relay_kafka/
config.rs

//! Configuration primitives for setting up the Kafka producer and its connection.
//!
//! A topic assignment can be either:
//! - [`TopicAssignment::Primary`] - the main and default Kafka configuration,
//! - [`TopicAssignment::Secondary`] - used to configure any additional Kafka topic.

use std::collections::BTreeMap;

use serde::{Deserialize, Serialize};
use thiserror::Error;

/// Kafka configuration errors.
#[derive(Error, Debug)]
pub enum ConfigError {
    /// The user referenced a Kafka config name that does not exist.
    #[error("unknown kafka config name")]
    UnknownKafkaConfigName,
    /// The shard configuration is missing the shard with index 0.
    #[error("invalid kafka shard configuration: must have shard with index 0")]
    InvalidShard,
}

/// Defines the topics over which Relay communicates with Sentry.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub enum KafkaTopic {
    /// Simple events (without attachments) topic.
    Events,
    /// Complex events (with attachments) topic.
    Attachments,
    /// Transaction events topic.
    Transactions,
    /// Shared outcomes topic for Relay and Sentry.
    Outcomes,
    /// Override for billing critical outcomes.
    OutcomesBilling,
    /// Any metric that is extracted from sessions.
    MetricsSessions,
    /// Generic metrics topic, excluding sessions (release health).
    MetricsGeneric,
    /// Profiles topic.
    Profiles,
    /// Replay events: breadcrumb + session updates for replays.
    ReplayEvents,
    /// Replay recordings: large blobs sent by the replay SDK.
    ReplayRecordings,
    /// Monitor check-ins.
    Monitors,
    /// Logs (our log product).
    OurLogs,
    /// Standalone spans without a transaction.
    Spans,
    /// Feedback events topic.
    Feedback,
    /// Items topic.
    Items,
}

impl KafkaTopic {
    /// Returns an iterator over the variants of [`KafkaTopic`].
    /// It must be adjusted whenever new variants are added.
    pub fn iter() -> std::slice::Iter<'static, Self> {
        use KafkaTopic::*;
        static TOPICS: [KafkaTopic; 15] = [
            Events,
            Attachments,
            Transactions,
            Outcomes,
            OutcomesBilling,
            MetricsSessions,
            MetricsGeneric,
            Profiles,
            ReplayEvents,
            ReplayRecordings,
            Monitors,
            OurLogs,
            Spans,
            Feedback,
            Items,
        ];
        TOPICS.iter()
    }
}

macro_rules! define_topic_assignments {
    ($($field_name:ident : ($kafka_topic:path, $default_topic:literal, $doc:literal)),* $(,)?) => {
        /// Configuration for topics.
        #[derive(Serialize, Deserialize, Debug)]
        #[serde(default)]
        pub struct TopicAssignments {
            $(
                #[serde(alias = $default_topic)]
                #[doc = $doc]
                pub $field_name: TopicAssignment,
            )*

            /// Additional topic assignments configured but currently unused by this Relay instance.
            #[serde(flatten)]
            pub unused: BTreeMap<String, TopicAssignment>,
        }

        impl TopicAssignments {
            /// Get a topic assignment by [`KafkaTopic`] value.
            #[must_use]
            pub fn get(&self, kafka_topic: KafkaTopic) -> &TopicAssignment {
                match kafka_topic {
                    $(
                        $kafka_topic => &self.$field_name,
                    )*
                }
            }
        }

        impl Default for TopicAssignments {
            fn default() -> Self {
                Self {
                    $(
                        $field_name: $default_topic.to_owned().into(),
                    )*
                    unused: BTreeMap::new(),
                }
            }
        }
    };
}
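
// For a single entry such as `events: (KafkaTopic::Events, "ingest-events", "...")`
// in the assignment below, the macro above expands (roughly) to a struct field
//
//     #[serde(alias = "ingest-events")]
//     pub events: TopicAssignment,
//
// plus a matching arm in `TopicAssignments::get` and a `Default` entry mapping the
// field to its default topic name. The serde alias means a config may key an
// assignment either by field name or by default topic name (both forms appear in
// the tests below).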

// WARNING: When adding a topic here, make sure that the kafka topic exists or can be auto-created.
// Failure to do so will result in Relay crashing (if the `kafka_validate_topics` config flag is enabled),
// or event loss in the store service.
define_topic_assignments! {
    events: (KafkaTopic::Events, "ingest-events", "Simple events topic name."),
    attachments: (KafkaTopic::Attachments, "ingest-attachments", "Events with attachments topic name."),
    transactions: (KafkaTopic::Transactions, "ingest-transactions", "Transaction events topic name."),
    outcomes: (KafkaTopic::Outcomes, "outcomes", "Outcomes topic name."),
    outcomes_billing: (KafkaTopic::OutcomesBilling, "outcomes-billing", "Outcomes topic name for billing critical outcomes."),
    metrics_sessions: (KafkaTopic::MetricsSessions, "ingest-metrics", "Topic name for metrics extracted from sessions, aka release health."),
    metrics_generic: (KafkaTopic::MetricsGeneric, "ingest-performance-metrics", "Topic name for all other kinds of metrics."),
    profiles: (KafkaTopic::Profiles, "profiles", "Stacktrace topic name."),
    replay_events: (KafkaTopic::ReplayEvents, "ingest-replay-events", "Replay events topic name."),
    replay_recordings: (KafkaTopic::ReplayRecordings, "ingest-replay-recordings", "Recordings topic name."),
    ourlogs: (KafkaTopic::OurLogs, "snuba-ourlogs", "Logs from our logs product."),
    monitors: (KafkaTopic::Monitors, "ingest-monitors", "Monitor check-ins."),
    spans: (KafkaTopic::Spans, "snuba-spans", "Standalone spans without a transaction."),
    feedback: (KafkaTopic::Feedback, "ingest-feedback-events", "Feedback events topic."),
    items: (KafkaTopic::Items, "snuba-items", "Items topic."),
}

/// Configuration for a "logical" topic/data sink that Relay should forward data into.
///
/// Can be either a string containing the Kafka topic name to produce into (using the default
/// `kafka_config`), or an object with the keys `name` (the topic name) and `config` (the
/// identifier of a custom Kafka cluster).
///
/// See the documentation for `secondary_kafka_configs` for more information.
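///
/// A minimal YAML sketch of both forms (the cluster name `"profiles"` is illustrative
/// and must match an entry in `secondary_kafka_configs`):
///
/// ```yaml
/// events: "ingest-events"   # `Primary`: produce via the default cluster
/// profiles:                 # `Secondary`: produce via a named secondary cluster
///     name: "ingest-profiles"
///     config: "profiles"
/// ```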
#[derive(Serialize, Deserialize, Debug)]
#[serde(untagged)]
pub enum TopicAssignment {
    /// String containing the Kafka topic name. In this case the default Kafka cluster configured
    /// in `kafka_config` will be used.
    Primary(String),
    /// Object containing the topic name and a string identifier of one of the clusters configured
    /// in `secondary_kafka_configs`. In this case that custom Kafka config will be used to produce
    /// data to the given topic name.
    Secondary(KafkaTopicConfig),
}

/// Configuration for a topic on a secondary Kafka cluster.
#[derive(Serialize, Deserialize, Debug)]
pub struct KafkaTopicConfig {
    /// The topic name to use.
    #[serde(rename = "name")]
    topic_name: String,
    /// The name of the Kafka config used to produce data to the given topic.
    #[serde(rename = "config")]
    kafka_config_name: String,
    /// Optionally, a rate limit per partition key to protect against partition imbalance.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    key_rate_limit: Option<KeyRateLimit>,
}

/// Produce rate limit configuration for a topic.
#[derive(Serialize, Deserialize, Debug, Clone, Copy)]
pub struct KeyRateLimit {
    /// Limit each partition key to N messages per `window_secs`.
    pub limit_per_window: u64,

    /// The size of the window to record counters for.
    ///
    /// Larger windows imply higher memory usage.
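    ///
    /// For example, `limit_per_window: 300` with `window_secs: 60` allows at most
    /// 300 messages per partition key per minute.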
    pub window_secs: u64,
}

/// Config for creating a Kafka producer.
#[derive(Debug)]
pub struct KafkaParams<'a> {
    /// The topic name to use.
    pub topic_name: &'a str,
    /// The name of the Kafka config that will be used to produce data.
    pub config_name: Option<&'a str>,
    /// Parameters for the Kafka producer configuration.
    pub params: &'a [KafkaConfigParam],
    /// Optionally, a rate limit per partition key to protect against partition imbalance.
    pub key_rate_limit: Option<KeyRateLimit>,
}

impl From<String> for TopicAssignment {
    fn from(topic_name: String) -> Self {
        Self::Primary(topic_name)
    }
}

impl TopicAssignment {
    /// Get the Kafka config for the current topic assignment.
    ///
    /// # Errors
    /// Returns [`ConfigError`] if the configuration for the current topic assignment is invalid.
    pub fn kafka_config<'a>(
        &'a self,
        default_config: &'a [KafkaConfigParam],
        secondary_configs: &'a BTreeMap<String, Vec<KafkaConfigParam>>,
    ) -> Result<KafkaParams<'a>, ConfigError> {
        let kafka_config = match self {
            Self::Primary(topic_name) => KafkaParams {
                topic_name,
                config_name: None,
                params: default_config,
                // XXX: Rate limits can only be set if a non-default Kafka broker config is
                // used, i.e. in the `Secondary` code path.
                key_rate_limit: None,
            },
            Self::Secondary(KafkaTopicConfig {
                topic_name,
                kafka_config_name,
                key_rate_limit,
            }) => KafkaParams {
                config_name: Some(kafka_config_name),
                topic_name,
                params: secondary_configs
                    .get(kafka_config_name)
                    .ok_or(ConfigError::UnknownKafkaConfigName)?,
                key_rate_limit: *key_rate_limit,
            },
        };

        Ok(kafka_config)
    }
}

/// A name-value pair of a Kafka config parameter.
#[derive(Serialize, Deserialize, Debug)]
pub struct KafkaConfigParam {
    /// Name of the Kafka config parameter.
    pub name: String,
    /// Value of the Kafka config parameter.
    pub value: String,
}
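
// In YAML, producer parameters appear as a list of name/value pairs. A minimal
// sketch (the broker address and librdkafka keys are illustrative):
//
//     kafka_config:
//         - {name: "bootstrap.servers", value: "localhost:9092"}
//         - {name: "message.max.bytes", value: "50000000"}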

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_kafka_config() {
        let yaml = r#"
ingest-events: "ingest-events-kafka-topic"
profiles:
    name: "ingest-profiles"
    config: "profiles"
ingest-metrics: "ingest-metrics-3"
transactions: "ingest-transactions-kafka-topic"
"#;

        let def_config = vec![KafkaConfigParam {
            name: "test".to_string(),
            value: "test-value".to_string(),
        }];
        let mut second_config = BTreeMap::new();
        second_config.insert(
            "profiles".to_string(),
            vec![KafkaConfigParam {
                name: "test".to_string(),
                value: "test-value".to_string(),
            }],
        );

        let topics: TopicAssignments = serde_yaml::from_str(yaml).unwrap();
        let events = topics.events;
        let profiles = topics.profiles;
        let metrics_sessions = topics.metrics_sessions;
        let transactions = topics.transactions;

        assert!(matches!(events, TopicAssignment::Primary(_)));
        assert!(matches!(profiles, TopicAssignment::Secondary { .. }));
        assert!(matches!(metrics_sessions, TopicAssignment::Primary(_)));
        assert!(matches!(transactions, TopicAssignment::Primary(_)));

        let events_config = events
            .kafka_config(&def_config, &second_config)
            .expect("Kafka config for events topic");
        assert!(matches!(
            events_config,
            KafkaParams {
                topic_name: "ingest-events-kafka-topic",
                ..
            }
        ));

        let profiles_config = profiles
            .kafka_config(&def_config, &second_config)
            .expect("Kafka config for profiles topic");
        assert!(matches!(
            profiles_config,
            KafkaParams {
                topic_name: "ingest-profiles",
                config_name: Some("profiles"),
                ..
            }
        ));

        let metrics_config = metrics_sessions
            .kafka_config(&def_config, &second_config)
            .expect("Kafka config for metrics topic");
        assert!(matches!(
            metrics_config,
            KafkaParams {
                topic_name: "ingest-metrics-3",
                ..
            }
        ));

        // Legacy keys are still supported.
        let transactions_config = transactions
            .kafka_config(&def_config, &second_config)
            .expect("Kafka config for transactions topic");
        assert!(matches!(
            transactions_config,
            KafkaParams {
                topic_name: "ingest-transactions-kafka-topic",
                ..
            }
        ));
    }

    #[test]
    fn test_default_topic_is_valid() {
        let topic_assignments = TopicAssignments::default();

        // A few topics are currently not defined; remove this once they are added to
        // `sentry-kafka-schemas`.
        let currently_undefined_topics = [
            "ingest-attachments".to_string(),
            "ingest-transactions".to_string(),
            "profiles".to_string(),
            "ingest-monitors".to_string(),
        ];

        for topic in KafkaTopic::iter() {
            match topic_assignments.get(*topic) {
                TopicAssignment::Primary(logical_topic_name) => {
                    if !currently_undefined_topics.contains(logical_topic_name) {
                        assert!(sentry_kafka_schemas::get_schema(logical_topic_name, None).is_ok());
                    }
                }
                _ => panic!("invalid default"),
            }
        }
    }
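
    // A sketch of an additional check: `key_rate_limit` is optional on secondary
    // assignments and deserializes into `KeyRateLimit`. Topic and cluster names
    // here are illustrative.
    #[test]
    fn test_secondary_key_rate_limit() {
        let yaml = r#"
name: "ingest-profiles"
config: "profiles"
key_rate_limit:
    limit_per_window: 300
    window_secs: 60
"#;

        let assignment: TopicAssignment = serde_yaml::from_str(yaml).unwrap();
        match assignment {
            TopicAssignment::Secondary(KafkaTopicConfig {
                key_rate_limit: Some(limit),
                ..
            }) => {
                assert_eq!(limit.limit_per_window, 300);
                assert_eq!(limit.window_secs, 60);
            }
            _ => panic!("expected secondary assignment with key rate limit"),
        }
    }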
}