relay_kafka/
config.rs

1//! Configuration primitives to configure the kafka producer and properly set up the connection.
2//!
//! The configuration can be either:
//! - [`TopicAssignment::Primary`] - the main and default kafka configuration,
//! - [`TopicAssignment::Secondary`] - used to configure any additional kafka topic.
6
7use std::collections::BTreeMap;
8
9use serde::{Deserialize, Serialize};
10use thiserror::Error;
11
/// Kafka configuration errors.
#[derive(Error, Debug)]
pub enum ConfigError {
    /// The user referenced a kafka config name that does not exist.
    #[error("unknown kafka config name")]
    UnknownKafkaConfigName,
    /// The shard configuration is missing the mandatory shard with index 0.
    #[error("invalid kafka shard configuration: must have shard with index 0")]
    InvalidShard,
}
22
/// Define the topics over which Relay communicates with Sentry.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub enum KafkaTopic {
    /// Simple events (without attachments) topic.
    Events,
    /// Complex events (with attachments) topic.
    Attachments,
    /// Transaction events topic.
    Transactions,
    /// Shared outcomes topic for Relay and Sentry.
    Outcomes,
    /// Override for billing critical outcomes.
    OutcomesBilling,
    /// Any metric that is extracted from sessions.
    MetricsSessions,
    /// Generic metrics topic, excluding sessions (release health).
    MetricsGeneric,
    /// Profiles (stacktrace) topic.
    Profiles,
    /// Replay events: breadcrumb + session updates for replays.
    ReplayEvents,
    /// Replay recordings: large blobs sent by the replay SDK.
    ReplayRecordings,
    /// Monitor check-ins.
    Monitors,
    /// Logs (our log product).
    OurLogs,
    /// Standalone spans without a transaction.
    Spans,
    /// Feedback events topic.
    Feedback,
}
55
56impl KafkaTopic {
57    /// Returns iterator over the variants of [`KafkaTopic`].
58    /// It will have to be adjusted if the new variants are added.
59    pub fn iter() -> std::slice::Iter<'static, Self> {
60        use KafkaTopic::*;
61        static TOPICS: [KafkaTopic; 14] = [
62            Events,
63            Attachments,
64            Transactions,
65            Outcomes,
66            OutcomesBilling,
67            MetricsSessions,
68            MetricsGeneric,
69            Profiles,
70            ReplayEvents,
71            ReplayRecordings,
72            Monitors,
73            OurLogs,
74            Spans,
75            Feedback,
76        ];
77        TOPICS.iter()
78    }
79}
80
/// Generates the `TopicAssignments` struct together with its [`Default`] impl and a
/// [`KafkaTopic`] -> assignment lookup method.
///
/// Each entry maps a struct field to its [`KafkaTopic`] variant, the default kafka topic
/// name (also accepted as a serde alias when deserializing), and a doc string.
macro_rules! define_topic_assignments {
    ($($field_name:ident : ($kafka_topic:path, $default_topic:literal, $doc:literal)),* $(,)?) => {
        /// Configuration for topics.
        #[derive(Serialize, Deserialize, Debug)]
        #[serde(default)]
        pub struct TopicAssignments {
            $(
                // The default topic name doubles as a legacy config key.
                #[serde(alias = $default_topic)]
                #[doc = $doc]
                pub $field_name: TopicAssignment,
            )*

            /// Additional topic assignments configured but currently unused by this Relay instance.
            #[serde(flatten)]
            pub unused: BTreeMap<String, TopicAssignment>,
        }

        impl TopicAssignments{
            /// Get a topic assignment by [`KafkaTopic`] value.
            #[must_use]
            pub fn get(&self, kafka_topic: KafkaTopic) -> &TopicAssignment {
                match kafka_topic {
                    $(
                        $kafka_topic => &self.$field_name,
                    )*
                }
            }
        }

        impl Default for TopicAssignments {
            fn default() -> Self {
                Self {
                    $(
                        // Every field defaults to a primary assignment on the default topic.
                        $field_name: $default_topic.to_owned().into(),
                    )*
                    unused: BTreeMap::new(),
                }
            }
        }
    };
}
122
// WARNING: When adding a topic here, make sure that the kafka topic exists or can be auto-created.
// Failure to do so will result in Relay crashing (if the `kafka_validate_topics` config flag is enabled),
// or event loss in the store service.
//
// Entry format: field_name: (KafkaTopic variant, default/legacy topic name, doc string).
define_topic_assignments! {
    events: (KafkaTopic::Events, "ingest-events", "Simple events topic name."),
    attachments: (KafkaTopic::Attachments, "ingest-attachments", "Events with attachments topic name."),
    transactions: (KafkaTopic::Transactions, "ingest-transactions", "Transaction events topic name."),
    outcomes: (KafkaTopic::Outcomes, "outcomes", "Outcomes topic name."),
    outcomes_billing: (KafkaTopic::OutcomesBilling, "outcomes-billing", "Outcomes topic name for billing critical outcomes."),
    metrics_sessions: (KafkaTopic::MetricsSessions, "ingest-metrics", "Topic name for metrics extracted from sessions, aka release health."),
    metrics_generic: (KafkaTopic::MetricsGeneric, "ingest-performance-metrics", "Topic name for all other kinds of metrics."),
    profiles: (KafkaTopic::Profiles, "profiles", "Stacktrace topic name"),
    replay_events: (KafkaTopic::ReplayEvents, "ingest-replay-events", "Replay Events topic name."),
    replay_recordings: (KafkaTopic::ReplayRecordings, "ingest-replay-recordings", "Recordings topic name."),
    ourlogs: (KafkaTopic::OurLogs, "snuba-ourlogs", "Logs from our logs product."),
    monitors: (KafkaTopic::Monitors, "ingest-monitors", "Monitor check-ins."),
    spans: (KafkaTopic::Spans, "snuba-spans", "Standalone spans without a transaction."),
    feedback: (KafkaTopic::Feedback, "ingest-feedback-events", "Feedback events topic."),
}
142
/// Configuration for a "logical" topic/datasink that Relay should forward data into.
///
/// Can be either a string containing the kafka topic name to produce into (using the default
/// `kafka_config`), or an object containing keys `name` (topic name) and `config` (secondary
/// config name) for using a custom kafka cluster.
///
/// See documentation for `secondary_kafka_configs` for more information.
#[derive(Serialize, Deserialize, Debug)]
#[serde(untagged)]
pub enum TopicAssignment {
    /// String containing the kafka topic name. In this case the default kafka cluster configured
    /// in `kafka_config` will be used.
    Primary(String),
    /// Object containing topic name and string identifier of one of the clusters configured in
    /// `secondary_kafka_configs`. In this case that custom kafka config will be used to produce
    /// data to the given topic name.
    Secondary(KafkaTopicConfig),
}
161
/// Configuration for a secondary topic assignment (serialized as `{name, config}`).
#[derive(Serialize, Deserialize, Debug)]
pub struct KafkaTopicConfig {
    /// The topic name to use.
    #[serde(rename = "name")]
    topic_name: String,
    /// Name of a cluster in `secondary_kafka_configs` used to produce data to the given topic.
    #[serde(rename = "config")]
    kafka_config_name: String,
}
172
/// Config for creating a Kafka producer.
#[derive(Debug)]
pub struct KafkaParams<'a> {
    /// The topic name to produce to.
    pub topic_name: &'a str,
    /// Name of the secondary kafka config used, or `None` when the default cluster is used.
    pub config_name: Option<&'a str>,
    /// Parameters for the Kafka producer configuration.
    pub params: &'a [KafkaConfigParam],
}
183
184impl From<String> for TopicAssignment {
185    fn from(topic_name: String) -> Self {
186        Self::Primary(topic_name)
187    }
188}
189
190impl TopicAssignment {
191    /// Get the kafka config for the current topic assignment.
192    ///
193    /// # Errors
194    /// Returns [`ConfigError`] if the configuration for the current topic assignment is invalid.
195    pub fn kafka_config<'a>(
196        &'a self,
197        default_config: &'a Vec<KafkaConfigParam>,
198        secondary_configs: &'a BTreeMap<String, Vec<KafkaConfigParam>>,
199    ) -> Result<KafkaParams<'a>, ConfigError> {
200        let kafka_config = match self {
201            Self::Primary(topic_name) => KafkaParams {
202                topic_name,
203                config_name: None,
204                params: default_config.as_slice(),
205            },
206            Self::Secondary(KafkaTopicConfig {
207                topic_name,
208                kafka_config_name,
209            }) => KafkaParams {
210                config_name: Some(kafka_config_name),
211                topic_name,
212                params: secondary_configs
213                    .get(kafka_config_name)
214                    .ok_or(ConfigError::UnknownKafkaConfigName)?,
215            },
216        };
217
218        Ok(kafka_config)
219    }
220}
221
/// A name-value pair of Kafka config parameter, passed through to the producer configuration.
#[derive(Serialize, Deserialize, Debug)]
pub struct KafkaConfigParam {
    /// Name of the Kafka config parameter.
    pub name: String,
    /// Value of the Kafka config parameter.
    pub value: String,
}
230
#[cfg(test)]
mod tests {

    use super::*;

    /// Produces the single-entry parameter list shared by the test configs.
    fn test_params() -> Vec<KafkaConfigParam> {
        vec![KafkaConfigParam {
            name: "test".to_string(),
            value: "test-value".to_string(),
        }]
    }

    #[test]
    fn test_kafka_config() {
        let yaml = r#"
ingest-events: "ingest-events-kafka-topic"
profiles:
    name: "ingest-profiles"
    config: "profiles"
ingest-metrics: "ingest-metrics-3"
transactions: "ingest-transactions-kafka-topic"
"#;

        let default_params = test_params();
        let mut secondary_params = BTreeMap::new();
        secondary_params.insert("profiles".to_string(), test_params());

        let topics: TopicAssignments = serde_yaml::from_str(yaml).unwrap();

        assert!(matches!(topics.events, TopicAssignment::Primary(_)));
        assert!(matches!(topics.profiles, TopicAssignment::Secondary { .. }));
        assert!(matches!(topics.metrics_sessions, TopicAssignment::Primary(_)));
        assert!(matches!(topics.transactions, TopicAssignment::Primary(_)));

        let config = topics
            .events
            .kafka_config(&default_params, &secondary_params)
            .expect("Kafka config for events topic");
        assert!(matches!(
            config,
            KafkaParams {
                topic_name: "ingest-events-kafka-topic",
                ..
            }
        ));

        let config = topics
            .profiles
            .kafka_config(&default_params, &secondary_params)
            .expect("Kafka config for profiles topic");
        assert!(matches!(
            config,
            KafkaParams {
                topic_name: "ingest-profiles",
                config_name: Some("profiles"),
                ..
            }
        ));

        let config = topics
            .metrics_sessions
            .kafka_config(&default_params, &secondary_params)
            .expect("Kafka config for metrics topic");
        assert!(matches!(
            config,
            KafkaParams {
                topic_name: "ingest-metrics-3",
                ..
            }
        ));

        // Legacy keys are still supported
        let config = topics
            .transactions
            .kafka_config(&default_params, &secondary_params)
            .expect("Kafka config for transactions topic");
        assert!(matches!(
            config,
            KafkaParams {
                topic_name: "ingest-transactions-kafka-topic",
                ..
            }
        ));
    }

    #[test]
    fn test_default_topic_is_valid() {
        let topic_assignments = TopicAssignments::default();

        // A few topics are not defined currently, remove this once added to `sentry-kafka-schemas`.
        let currently_undefined_topics = [
            "ingest-attachments",
            "ingest-transactions",
            "profiles",
            "ingest-monitors",
        ];

        for topic in KafkaTopic::iter() {
            match topic_assignments.get(*topic) {
                TopicAssignment::Primary(logical_topic_name) => {
                    if !currently_undefined_topics.contains(&logical_topic_name.as_str()) {
                        assert!(sentry_kafka_schemas::get_schema(logical_topic_name, None).is_ok());
                    }
                }
                _ => panic!("invalid default"),
            }
        }
    }
}