relay_kafka/
config.rs

1//! Configuration primitives to configure the kafka producer and properly set up the connection.
2//!
//! The configuration can be either:
//! - [`TopicAssignment::Primary`] - the main and default kafka configuration,
//! - [`TopicAssignment::Secondary`] - used to configure any additional kafka topic.
6
7use std::collections::BTreeMap;
8
9use serde::{Deserialize, Serialize};
10use thiserror::Error;
11
12/// Kafka configuration errors.
13#[derive(Error, Debug)]
14pub enum ConfigError {
15    /// The user referenced a kafka config name that does not exist.
16    #[error("unknown kafka config name")]
17    UnknownKafkaConfigName,
18    /// The user did not configure 0 shard
19    #[error("invalid kafka shard configuration: must have shard with index 0")]
20    InvalidShard,
21}
22
/// The set of topics Relay uses to communicate with Sentry.
///
/// The variant order is significant: the derived `Ord`/`PartialOrd` follow it.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum KafkaTopic {
    /// Topic for simple events, i.e. events without attachments.
    Events,
    /// Topic for complex events, i.e. events with attachments.
    Attachments,
    /// Topic for transaction events.
    Transactions,
    /// Outcomes topic shared between Relay and Sentry.
    Outcomes,
    /// Override topic for billing critical outcomes.
    OutcomesBilling,
    /// Topic for any metric that is extracted from sessions.
    MetricsSessions,
    /// Topic for generic metrics, excluding sessions (release health).
    MetricsGeneric,
    /// Topic for profiles.
    Profiles,
    /// Topic for replay events: breadcrumb and session updates for replays.
    ReplayEvents,
    /// Topic for replay recordings: large blobs sent by the replay SDK.
    ReplayRecordings,
    /// Topic for monitor check-ins.
    Monitors,
    /// Topic for logs (our log product).
    OurLogs,
    /// Topic for standalone spans without a transaction.
    Spans,
    /// Topic for feedback events.
    Feedback,
    /// Topic for items.
    Items,
}
57
58impl KafkaTopic {
59    /// Returns iterator over the variants of [`KafkaTopic`].
60    /// It will have to be adjusted if the new variants are added.
61    pub fn iter() -> std::slice::Iter<'static, Self> {
62        use KafkaTopic::*;
63        static TOPICS: [KafkaTopic; 15] = [
64            Events,
65            Attachments,
66            Transactions,
67            Outcomes,
68            OutcomesBilling,
69            MetricsSessions,
70            MetricsGeneric,
71            Profiles,
72            ReplayEvents,
73            ReplayRecordings,
74            Monitors,
75            OurLogs,
76            Spans,
77            Feedback,
78            Items,
79        ];
80        TOPICS.iter()
81    }
82}
83
/// Generates the `TopicAssignments` struct together with its `Default` impl and a lookup by
/// [`KafkaTopic`].
///
/// Each entry has the shape `field: (KafkaTopic::Variant, "default-topic-name", "doc string")`.
/// The default topic name doubles as a serde alias for the field.
macro_rules! define_topic_assignments {
    ($($field_name:ident : ($kafka_topic:path, $default_topic:literal, $doc:literal)),* $(,)?) => {
        /// Configuration for topics.
        #[derive(Debug, Serialize, Deserialize)]
        #[serde(default)]
        pub struct TopicAssignments {
            $(
                #[doc = $doc]
                #[serde(alias = $default_topic)]
                pub $field_name: TopicAssignment,
            )*

            /// Additional topic assignments configured but currently unused by this Relay instance.
            #[serde(flatten)]
            pub unused: BTreeMap<String, TopicAssignment>,
        }

        impl Default for TopicAssignments {
            /// Assigns every topic to its default topic name and leaves `unused` empty.
            fn default() -> Self {
                Self {
                    $(
                        $field_name: TopicAssignment::from(String::from($default_topic)),
                    )*
                    unused: BTreeMap::new(),
                }
            }
        }

        impl TopicAssignments {
            /// Returns the topic assignment configured for the given [`KafkaTopic`].
            #[must_use]
            pub fn get(&self, kafka_topic: KafkaTopic) -> &TopicAssignment {
                match kafka_topic {
                    $(
                        $kafka_topic => &self.$field_name,
                    )*
                }
            }
        }
    };
}
125
126// WARNING: When adding a topic here, make sure that the kafka topic exists or can be auto-created.
127// Failure to do so will result in Relay crashing (if the `kafka_validate_topics` config flag is enabled),
128// or event loss in the store service.
129define_topic_assignments! {
130    events: (KafkaTopic::Events, "ingest-events", "Simple events topic name."),
131    attachments: (KafkaTopic::Attachments, "ingest-attachments", "Events with attachments topic name."),
132    transactions: (KafkaTopic::Transactions, "ingest-transactions", "Transaction events topic name."),
133    outcomes: (KafkaTopic::Outcomes, "outcomes", "Outcomes topic name."),
134    outcomes_billing: (KafkaTopic::OutcomesBilling, "outcomes-billing", "Outcomes topic name for billing critical outcomes."),
135    metrics_sessions: (KafkaTopic::MetricsSessions, "ingest-metrics", "Topic name for metrics extracted from sessions, aka release health."),
136    metrics_generic: (KafkaTopic::MetricsGeneric, "ingest-performance-metrics", "Topic name for all other kinds of metrics."),
137    profiles: (KafkaTopic::Profiles, "profiles", "Stacktrace topic name"),
138    replay_events: (KafkaTopic::ReplayEvents, "ingest-replay-events", "Replay Events topic name."),
139    replay_recordings: (KafkaTopic::ReplayRecordings, "ingest-replay-recordings", "Recordings topic name."),
140    ourlogs: (KafkaTopic::OurLogs, "snuba-ourlogs", "Logs from our logs product."),
141    monitors: (KafkaTopic::Monitors, "ingest-monitors", "Monitor check-ins."),
142    spans: (KafkaTopic::Spans, "snuba-spans", "Standalone spans without a transaction."),
143    feedback: (KafkaTopic::Feedback, "ingest-feedback-events", "Feedback events topic."),
144    items: (KafkaTopic::Items, "snuba-items", "Items topic."),
145}
146
147/// Configuration for a "logical" topic/datasink that Relay should forward data into.
148///
149/// Can be either a string containing the kafka topic name to produce into (using the default
150/// `kafka_config`), or an object containing keys `topic_name` and `kafka_config_name` for using a
151/// custom kafka cluster.
152///
153/// See documentation for `secondary_kafka_configs` for more information.
154#[derive(Serialize, Deserialize, Debug)]
155#[serde(untagged)]
156pub enum TopicAssignment {
157    /// String containing the kafka topic name. In this case the default kafka cluster configured
158    /// in `kafka_config` will be used.
159    Primary(String),
160    /// Object containing topic name and string identifier of one of the clusters configured in
161    /// `secondary_kafka_configs`. In this case that custom kafka config will be used to produce
162    /// data to the given topic name.
163    Secondary(KafkaTopicConfig),
164}
165
166/// Configuration for topic
167#[derive(Serialize, Deserialize, Debug)]
168pub struct KafkaTopicConfig {
169    /// The topic name to use.
170    #[serde(rename = "name")]
171    topic_name: String,
172    /// The Kafka config name will be used to produce data to the given topic.
173    #[serde(rename = "config")]
174    kafka_config_name: String,
175}
176
177/// Config for creating a Kafka producer.
178#[derive(Debug)]
179pub struct KafkaParams<'a> {
180    /// The topic name to use.
181    pub topic_name: &'a str,
182    /// The Kafka config name will be used to produce data.
183    pub config_name: Option<&'a str>,
184    /// Parameters for the Kafka producer configuration.
185    pub params: &'a [KafkaConfigParam],
186}
187
188impl From<String> for TopicAssignment {
189    fn from(topic_name: String) -> Self {
190        Self::Primary(topic_name)
191    }
192}
193
194impl TopicAssignment {
195    /// Get the kafka config for the current topic assignment.
196    ///
197    /// # Errors
198    /// Returns [`ConfigError`] if the configuration for the current topic assignment is invalid.
199    pub fn kafka_config<'a>(
200        &'a self,
201        default_config: &'a Vec<KafkaConfigParam>,
202        secondary_configs: &'a BTreeMap<String, Vec<KafkaConfigParam>>,
203    ) -> Result<KafkaParams<'a>, ConfigError> {
204        let kafka_config = match self {
205            Self::Primary(topic_name) => KafkaParams {
206                topic_name,
207                config_name: None,
208                params: default_config.as_slice(),
209            },
210            Self::Secondary(KafkaTopicConfig {
211                topic_name,
212                kafka_config_name,
213            }) => KafkaParams {
214                config_name: Some(kafka_config_name),
215                topic_name,
216                params: secondary_configs
217                    .get(kafka_config_name)
218                    .ok_or(ConfigError::UnknownKafkaConfigName)?,
219            },
220        };
221
222        Ok(kafka_config)
223    }
224}
225
226/// A name value pair of Kafka config parameter.
227#[derive(Serialize, Deserialize, Debug)]
228pub struct KafkaConfigParam {
229    /// Name of the Kafka config parameter.
230    pub name: String,
231    /// Value of the Kafka config parameter.
232    pub value: String,
233}
234
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds the one-entry parameter list used for both the default and the secondary config.
    fn sample_params() -> Vec<KafkaConfigParam> {
        vec![KafkaConfigParam {
            name: "test".to_string(),
            value: "test-value".to_string(),
        }]
    }

    /// Topic assignments deserialize from both alias keys and field-name keys, and resolve to
    /// the expected topic and config names.
    #[test]
    fn test_kafka_config() {
        let yaml = r#"
ingest-events: "ingest-events-kafka-topic"
profiles:
    name: "ingest-profiles"
    config: "profiles"
ingest-metrics: "ingest-metrics-3"
transactions: "ingest-transactions-kafka-topic"
"#;

        let default_params = sample_params();
        let mut secondary_params = BTreeMap::new();
        secondary_params.insert("profiles".to_string(), sample_params());

        let topics: TopicAssignments = serde_yaml::from_str(yaml).unwrap();
        let TopicAssignments {
            events,
            profiles,
            metrics_sessions,
            transactions,
            ..
        } = topics;

        assert!(matches!(events, TopicAssignment::Primary(_)));
        assert!(matches!(profiles, TopicAssignment::Secondary(_)));
        assert!(matches!(metrics_sessions, TopicAssignment::Primary(_)));
        assert!(matches!(transactions, TopicAssignment::Primary(_)));

        let resolved_events = events
            .kafka_config(&default_params, &secondary_params)
            .expect("Kafka config for events topic");
        assert_eq!(resolved_events.topic_name, "ingest-events-kafka-topic");

        let resolved_profiles = profiles
            .kafka_config(&default_params, &secondary_params)
            .expect("Kafka config for profiles topic");
        assert_eq!(resolved_profiles.topic_name, "ingest-profiles");
        assert_eq!(resolved_profiles.config_name, Some("profiles"));

        let resolved_metrics = metrics_sessions
            .kafka_config(&default_params, &secondary_params)
            .expect("Kafka config for metrics topic");
        assert_eq!(resolved_metrics.topic_name, "ingest-metrics-3");

        // Legacy keys are still supported
        let resolved_transactions = transactions
            .kafka_config(&default_params, &secondary_params)
            .expect("Kafka config for transactions topic");
        assert_eq!(
            resolved_transactions.topic_name,
            "ingest-transactions-kafka-topic"
        );
    }

    /// Every default assignment is a primary topic whose name has a schema, apart from a known
    /// list of exceptions.
    #[test]
    fn test_default_topic_is_valid() {
        let defaults = TopicAssignments::default();

        // A few topics are not defined currently, remove this once added to `sentry-kafka-schemas`.
        let topics_without_schema = [
            "ingest-attachments",
            "ingest-transactions",
            "profiles",
            "ingest-monitors",
        ];

        for &topic in KafkaTopic::iter() {
            match defaults.get(topic) {
                TopicAssignment::Primary(logical_topic_name) => {
                    if topics_without_schema.contains(&logical_topic_name.as_str()) {
                        continue;
                    }
                    assert!(sentry_kafka_schemas::get_schema(logical_topic_name, None).is_ok());
                }
                TopicAssignment::Secondary(_) => panic!("invalid default"),
            }
        }
    }
}