// relay_kafka/config.rs

1//! Configuration primitives to configure the kafka producer and properly set up the connection.
2
3use std::collections::BTreeMap;
4
5use serde::{Deserialize, Serialize, de};
6use thiserror::Error;
7
/// Kafka configuration errors.
#[derive(Error, Debug)]
pub enum ConfigError {
    /// The user referenced a kafka config name that does not exist.
    #[error("unknown kafka config name")]
    UnknownKafkaConfigName,
    /// A sharded topic configuration is missing the shard with index 0.
    #[error("invalid kafka shard configuration: must have shard with index 0")]
    InvalidShard,
}
18
/// Define the topics over which Relay communicates with Sentry.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub enum KafkaTopic {
    /// Simple events (without attachments) topic.
    Events,
    /// Complex events (with attachments) topic.
    Attachments,
    /// Transaction events topic.
    Transactions,
    /// Shared outcomes topic for Relay and Sentry.
    Outcomes,
    /// Override for billing critical outcomes.
    OutcomesBilling,
    /// Any metric that is extracted from sessions.
    MetricsSessions,
    /// Generic metrics topic, excluding sessions (release health).
    MetricsGeneric,
    /// Profiles topic.
    Profiles,
    /// Replay events: breadcrumb + session updates for replays.
    ReplayEvents,
    /// Replay recordings: large blobs sent by the replay SDK.
    ReplayRecordings,
    /// Monitor check-ins.
    Monitors,
    /// Logs (our log product).
    OurLogs,
    /// Standalone spans without a transaction.
    Spans,
    /// Feedback events topic.
    Feedback,
    /// Items topic.
    Items,
}
53
54impl KafkaTopic {
55    /// Returns iterator over the variants of [`KafkaTopic`].
56    /// It will have to be adjusted if the new variants are added.
57    pub fn iter() -> std::slice::Iter<'static, Self> {
58        use KafkaTopic::*;
59        static TOPICS: [KafkaTopic; 15] = [
60            Events,
61            Attachments,
62            Transactions,
63            Outcomes,
64            OutcomesBilling,
65            MetricsSessions,
66            MetricsGeneric,
67            Profiles,
68            ReplayEvents,
69            ReplayRecordings,
70            Monitors,
71            OurLogs,
72            Spans,
73            Feedback,
74            Items,
75        ];
76        TOPICS.iter()
77    }
78}
79
/// Defines [`TopicAssignments`] and related impls from a list of
/// `field_name: (KafkaTopic variant, default topic name, doc string)` tuples.
///
/// For each entry this generates:
/// - a `pub` field on `TopicAssignments`, deserializable under either the
///   field name or the default topic name (via `#[serde(alias)]`),
/// - a match arm in `TopicAssignments::get`,
/// - a match arm in `KafkaTopic::logical_topic_name`,
/// - a default assignment (the default topic name) in `Default for TopicAssignments`.
macro_rules! define_topic_assignments {
    ($($field_name:ident : ($kafka_topic:path, $default_topic:literal, $doc:literal)),* $(,)?) => {
        /// Configuration for topics.
        #[derive(Deserialize, Serialize, Debug)]
        #[serde(default)]
        pub struct TopicAssignments {
            $(
                #[serde(alias = $default_topic)]
                #[doc = $doc]
                pub $field_name: TopicAssignment,
            )*

            /// Additional topic assignments configured but currently unused by this Relay instance.
            // Flattened so any unrecognized top-level keys are captured instead of rejected.
            #[serde(flatten, skip_serializing)]
            pub unused: Unused,
        }

        impl TopicAssignments{
            /// Get a topic assignment by [`KafkaTopic`] value
            #[must_use]
            pub fn get(&self, kafka_topic: KafkaTopic) -> &TopicAssignment {
                match kafka_topic {
                    $(
                        $kafka_topic => &self.$field_name,
                    )*
                }
            }
        }

        impl KafkaTopic {
            /// Map this KafkaTopic to the "logical topic", i.e. the default topic name.
            pub fn logical_topic_name(&self) -> &'static str {
                match self {
                    $(
                        $kafka_topic => $default_topic,
                    )*
                }
            }
        }

        impl Default for TopicAssignments {
            fn default() -> Self {
                Self {
                    $(
                        $field_name: $default_topic.to_owned().into(),
                    )*
                    unused: Default::default()
                }
            }
        }
    };
}
132
// WARNING: When adding a topic here, make sure that the kafka topic exists or can be auto-created.
// Failure to do so will result in Relay crashing (if the `kafka_validate_topics` config flag is enabled),
// or event loss in the store service.
//
// Each entry is `field_name: (KafkaTopic variant, logical/default topic name, field doc)`.
// The logical topic name is also accepted as a configuration alias for the field.
define_topic_assignments! {
    events: (KafkaTopic::Events, "ingest-events", "Simple events topic name."),
    attachments: (KafkaTopic::Attachments, "ingest-attachments", "Events with attachments topic name."),
    transactions: (KafkaTopic::Transactions, "ingest-transactions", "Transaction events topic name."),
    outcomes: (KafkaTopic::Outcomes, "outcomes", "Outcomes topic name."),
    outcomes_billing: (KafkaTopic::OutcomesBilling, "outcomes-billing", "Outcomes topic name for billing critical outcomes."),
    metrics_sessions: (KafkaTopic::MetricsSessions, "ingest-metrics", "Topic name for metrics extracted from sessions, aka release health."),
    metrics_generic: (KafkaTopic::MetricsGeneric, "ingest-performance-metrics", "Topic name for all other kinds of metrics."),
    profiles: (KafkaTopic::Profiles, "profiles", "Stacktrace topic name"),
    replay_events: (KafkaTopic::ReplayEvents, "ingest-replay-events", "Replay Events topic name."),
    replay_recordings: (KafkaTopic::ReplayRecordings, "ingest-replay-recordings", "Recordings topic name."),
    ourlogs: (KafkaTopic::OurLogs, "snuba-ourlogs", "Logs from our logs product."),
    monitors: (KafkaTopic::Monitors, "ingest-monitors", "Monitor check-ins."),
    spans: (KafkaTopic::Spans, "snuba-spans", "Standalone spans without a transaction."),
    feedback: (KafkaTopic::Feedback, "ingest-feedback-events", "Feedback events topic."),
    items: (KafkaTopic::Items, "snuba-items", "Items topic."),
}
153
/// Topic assignments present in the configuration but not used by this Relay instance.
#[derive(Debug, Default)]
pub struct Unused(Vec<String>);

impl Unused {
    /// The names of all unused topics.
    pub fn names(&self) -> &[String] {
        self.0.as_slice()
    }
}
164
165impl<'de> de::Deserialize<'de> for Unused {
166    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
167    where
168        D: de::Deserializer<'de>,
169    {
170        let topics = BTreeMap::<String, de::IgnoredAny>::deserialize(deserializer)?;
171        Ok(Self(topics.into_keys().collect()))
172    }
173}
174
/// Configuration for a "logical" topic/datasink that Relay should forward data into.
///
/// Can be either a string containing the kafka topic name to produce into (using the default
/// `kafka_config`), an object containing keys `topic_name` and `kafka_config_name` for using a
/// custom kafka cluster, or an array of topic names/configs for sharded topics.
///
/// See documentation for `secondary_kafka_configs` for more information.
#[derive(Debug, Serialize)]
// Invariant: the inner vector is never empty; the `Deserialize` impl rejects empty shard lists.
pub struct TopicAssignment(Vec<TopicConfig>);
184
impl<'de> de::Deserialize<'de> for TopicAssignment {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: de::Deserializer<'de>,
    {
        // Untagged helper mirroring the accepted input shapes.
        #[derive(Deserialize, Debug)]
        #[serde(untagged)]
        enum Inner {
            // order matters. structs can be deserialized from arrays.
            // Untagged enums try variants top to bottom, so the sequence forms
            // must come first; otherwise a sharded list could be (mis)matched
            // as a single `TopicConfig` in self-describing formats.
            ShardedPrimary(Vec<String>),
            ShardedSecondary(Vec<TopicConfig>),
            Primary(String),
            Secondary(TopicConfig),
        }

        // Normalize all four shapes into a list of `TopicConfig` shards.
        let configs = match Inner::deserialize(deserializer)? {
            Inner::Primary(topic_name) => vec![topic_name.into()],
            Inner::Secondary(config) => vec![config],
            Inner::ShardedPrimary(topic_names) => topic_names.into_iter().map(From::from).collect(),
            Inner::ShardedSecondary(configs) => configs,
        };

        // An empty shard list would make the assignment unusable; reject it here.
        if configs.is_empty() {
            return Err(de::Error::custom(
                "topic assignment must have at least one shard",
            ));
        }

        Ok(Self(configs))
    }
}
216
/// Configuration for a single (physical) topic shard.
#[derive(Debug, Deserialize, Serialize)]
pub struct TopicConfig {
    /// The topic name to use.
    #[serde(rename = "name")]
    topic_name: String,
    /// The Kafka config name will be used to produce data to the given topic.
    ///
    /// If the config is missing, the default config will be used.
    #[serde(rename = "config", skip_serializing_if = "Option::is_none")]
    kafka_config_name: Option<String>,
    /// Optionally, a rate limit per partition key to protect against partition imbalance.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    key_rate_limit: Option<KeyRateLimit>,
}
232
233impl From<String> for TopicConfig {
234    fn from(topic_name: String) -> Self {
235        Self {
236            topic_name,
237            kafka_config_name: None,
238            key_rate_limit: None,
239        }
240    }
241}
242
/// Produce rate limit configuration for a topic.
#[derive(Debug, Deserialize, Serialize, Clone, Copy)]
pub struct KeyRateLimit {
    /// Limit each partition key to N messages per `window_secs`.
    pub limit_per_window: u64,

    /// The size of the window to record counters for, in seconds.
    ///
    /// Larger windows imply higher memory usage.
    pub window_secs: u64,
}
254
255/// A Kafka config for a topic.
256///
257/// This internally includes configuration for multiple 'physical' Kafka topics,
258/// as Relay can shard to multiple topics at once.
259#[derive(Debug)]
260pub struct KafkaTopicConfig<'a>(Vec<KafkaParams<'a>>);
261
262impl<'a> KafkaTopicConfig<'a> {
263    /// Kafka params for each psysical shard.
264    pub fn topics(&self) -> &[KafkaParams<'a>] {
265        &self.0
266    }
267}
268
/// Config for creating a Kafka producer.
#[derive(Debug)]
pub struct KafkaParams<'a> {
    /// The topic name to produce to.
    ///
    /// This is a single physical topic; sharding is represented by multiple
    /// [`KafkaParams`] entries inside a [`KafkaTopicConfig`].
    pub topic_name: String,
    /// The Kafka config name will be used to produce data.
    pub config_name: Option<&'a str>,
    /// Parameters for the Kafka producer configuration.
    pub params: &'a [KafkaConfigParam],
    /// Optionally, a rate limit per partition key to protect against partition imbalance.
    pub key_rate_limit: Option<KeyRateLimit>,
}
281
282impl From<String> for TopicAssignment {
283    fn from(topic_name: String) -> Self {
284        Self(vec![topic_name.into()])
285    }
286}
287
288impl TopicAssignment {
289    /// Get the Kafka configs for the current topic assignment.
290    ///
291    /// # Errors
292    /// Returns [`ConfigError`] if the configuration for the current topic assignment is invalid.
293    pub fn kafka_configs<'a>(
294        &'a self,
295        default_config: &'a Vec<KafkaConfigParam>,
296        secondary_configs: &'a BTreeMap<String, Vec<KafkaConfigParam>>,
297    ) -> Result<KafkaTopicConfig<'a>, ConfigError> {
298        let configs = self
299            .0
300            .iter()
301            .map(|tc| {
302                Ok(KafkaParams {
303                    topic_name: tc.topic_name.clone(),
304                    config_name: tc.kafka_config_name.as_deref(),
305                    params: match &tc.kafka_config_name {
306                        Some(config) => secondary_configs
307                            .get(config)
308                            .ok_or(ConfigError::UnknownKafkaConfigName)?,
309                        None => default_config.as_slice(),
310                    },
311                    key_rate_limit: tc.key_rate_limit,
312                })
313            })
314            .collect::<Result<_, _>>()?;
315
316        Ok(KafkaTopicConfig(configs))
317    }
318}
319
320/// A name value pair of Kafka config parameter.
321#[derive(Debug, Deserialize, Serialize)]
322pub struct KafkaConfigParam {
323    /// Name of the Kafka config parameter.
324    pub name: String,
325    /// Value of the Kafka config parameter.
326    pub value: String,
327}
328
#[cfg(test)]
mod tests {

    use super::*;

    #[test]
    fn test_kafka_config() {
        let yaml = r#"
ingest-events: "ingest-events-kafka-topic"
profiles:
    name: "ingest-profiles"
    config: "profiles"
ingest-metrics: "ingest-metrics-3"
transactions: "ingest-transactions-kafka-topic"
"#;

        // Parsing alone needs no secondary configs; those are exercised in the
        // sharded tests below.
        let topics: TopicAssignments = serde_yaml::from_str(yaml).unwrap();
        insta::assert_debug_snapshot!(topics, @r###"
        TopicAssignments {
            events: TopicAssignment(
                [
                    TopicConfig {
                        topic_name: "ingest-events-kafka-topic",
                        kafka_config_name: None,
                        key_rate_limit: None,
                    },
                ],
            ),
            attachments: TopicAssignment(
                [
                    TopicConfig {
                        topic_name: "ingest-attachments",
                        kafka_config_name: None,
                        key_rate_limit: None,
                    },
                ],
            ),
            transactions: TopicAssignment(
                [
                    TopicConfig {
                        topic_name: "ingest-transactions-kafka-topic",
                        kafka_config_name: None,
                        key_rate_limit: None,
                    },
                ],
            ),
            outcomes: TopicAssignment(
                [
                    TopicConfig {
                        topic_name: "outcomes",
                        kafka_config_name: None,
                        key_rate_limit: None,
                    },
                ],
            ),
            outcomes_billing: TopicAssignment(
                [
                    TopicConfig {
                        topic_name: "outcomes-billing",
                        kafka_config_name: None,
                        key_rate_limit: None,
                    },
                ],
            ),
            metrics_sessions: TopicAssignment(
                [
                    TopicConfig {
                        topic_name: "ingest-metrics-3",
                        kafka_config_name: None,
                        key_rate_limit: None,
                    },
                ],
            ),
            metrics_generic: TopicAssignment(
                [
                    TopicConfig {
                        topic_name: "ingest-performance-metrics",
                        kafka_config_name: None,
                        key_rate_limit: None,
                    },
                ],
            ),
            profiles: TopicAssignment(
                [
                    TopicConfig {
                        topic_name: "ingest-profiles",
                        kafka_config_name: Some(
                            "profiles",
                        ),
                        key_rate_limit: None,
                    },
                ],
            ),
            replay_events: TopicAssignment(
                [
                    TopicConfig {
                        topic_name: "ingest-replay-events",
                        kafka_config_name: None,
                        key_rate_limit: None,
                    },
                ],
            ),
            replay_recordings: TopicAssignment(
                [
                    TopicConfig {
                        topic_name: "ingest-replay-recordings",
                        kafka_config_name: None,
                        key_rate_limit: None,
                    },
                ],
            ),
            ourlogs: TopicAssignment(
                [
                    TopicConfig {
                        topic_name: "snuba-ourlogs",
                        kafka_config_name: None,
                        key_rate_limit: None,
                    },
                ],
            ),
            monitors: TopicAssignment(
                [
                    TopicConfig {
                        topic_name: "ingest-monitors",
                        kafka_config_name: None,
                        key_rate_limit: None,
                    },
                ],
            ),
            spans: TopicAssignment(
                [
                    TopicConfig {
                        topic_name: "snuba-spans",
                        kafka_config_name: None,
                        key_rate_limit: None,
                    },
                ],
            ),
            feedback: TopicAssignment(
                [
                    TopicConfig {
                        topic_name: "ingest-feedback-events",
                        kafka_config_name: None,
                        key_rate_limit: None,
                    },
                ],
            ),
            items: TopicAssignment(
                [
                    TopicConfig {
                        topic_name: "snuba-items",
                        kafka_config_name: None,
                        key_rate_limit: None,
                    },
                ],
            ),
            unused: Unused(
                [],
            ),
        }
        "###);
    }

    #[test]
    fn test_default_topic_is_valid() {
        // A few topics are not defined currently, remove this once added to `sentry-kafka-schemas`.
        let currently_undefined_topics = [
            "ingest-attachments",
            "ingest-transactions",
            "profiles",
            "ingest-monitors",
        ];

        for topic in KafkaTopic::iter() {
            let name = topic.logical_topic_name();
            if !currently_undefined_topics.contains(&name) {
                assert!(sentry_kafka_schemas::get_schema(name, None).is_ok());
            }
        }
    }

    #[test]
    fn test_sharded_kafka_config() {
        let yaml = r#"
events: ["ingest-events-1", "ingest-events-2"]
profiles:
  - name: "ingest-profiles-1"
    config: "profiles"
  - name: "ingest-profiles-2"
    config: "profiles"
"#;
        let topics: TopicAssignments = serde_yaml::from_str(yaml).unwrap();

        let def_config = vec![KafkaConfigParam {
            name: "test".to_owned(),
            value: "test-value".to_owned(),
        }];
        let mut second_config = BTreeMap::new();
        second_config.insert(
            "profiles".to_owned(),
            vec![KafkaConfigParam {
                name: "test".to_owned(),
                value: "test-value".to_owned(),
            }],
        );

        let events_configs = topics
            .events
            .kafka_configs(&def_config, &second_config)
            .expect("Kafka config for sharded events topic");

        insta::assert_debug_snapshot!(events_configs, @r###"
        KafkaTopicConfig(
            [
                KafkaParams {
                    topic_name: "ingest-events-1",
                    config_name: None,
                    params: [
                        KafkaConfigParam {
                            name: "test",
                            value: "test-value",
                        },
                    ],
                    key_rate_limit: None,
                },
                KafkaParams {
                    topic_name: "ingest-events-2",
                    config_name: None,
                    params: [
                        KafkaConfigParam {
                            name: "test",
                            value: "test-value",
                        },
                    ],
                    key_rate_limit: None,
                },
            ],
        )
        "###);

        let profiles_configs = topics
            .profiles
            .kafka_configs(&def_config, &second_config)
            .expect("Kafka config for sharded profiles topic");

        insta::assert_debug_snapshot!(profiles_configs, @r###"
        KafkaTopicConfig(
            [
                KafkaParams {
                    topic_name: "ingest-profiles-1",
                    config_name: Some(
                        "profiles",
                    ),
                    params: [
                        KafkaConfigParam {
                            name: "test",
                            value: "test-value",
                        },
                    ],
                    key_rate_limit: None,
                },
                KafkaParams {
                    topic_name: "ingest-profiles-2",
                    config_name: Some(
                        "profiles",
                    ),
                    params: [
                        KafkaConfigParam {
                            name: "test",
                            value: "test-value",
                        },
                    ],
                    key_rate_limit: None,
                },
            ],
        )
        "###);
    }

    #[test]
    fn test_per_shard_rate_limits() {
        let yaml = r#"
events:
  - name: "shard-0"
    config: "cluster1"
    key_rate_limit:
      limit_per_window: 100
      window_secs: 60
  - name: "shard-1"
    config: "cluster2"
    key_rate_limit:
      limit_per_window: 200
      window_secs: 120
  - name: "shard-2"  # No rate limit (Primary variant)
"#;

        let def_config = vec![KafkaConfigParam {
            name: "bootstrap.servers".to_owned(),
            value: "primary:9092".to_owned(),
        }];
        let mut second_config = BTreeMap::new();
        second_config.insert(
            "cluster1".to_owned(),
            vec![KafkaConfigParam {
                name: "bootstrap.servers".to_owned(),
                value: "cluster1:9092".to_owned(),
            }],
        );
        second_config.insert(
            "cluster2".to_owned(),
            vec![KafkaConfigParam {
                name: "bootstrap.servers".to_owned(),
                value: "cluster2:9092".to_owned(),
            }],
        );

        let topics: TopicAssignments = serde_yaml::from_str(yaml).unwrap();

        let events_configs = topics
            .events
            .kafka_configs(&def_config, &second_config)
            .expect("Kafka config for per-shard rate limits");

        insta::assert_debug_snapshot!(events_configs, @r###"
        KafkaTopicConfig(
            [
                KafkaParams {
                    topic_name: "shard-0",
                    config_name: Some(
                        "cluster1",
                    ),
                    params: [
                        KafkaConfigParam {
                            name: "bootstrap.servers",
                            value: "cluster1:9092",
                        },
                    ],
                    key_rate_limit: Some(
                        KeyRateLimit {
                            limit_per_window: 100,
                            window_secs: 60,
                        },
                    ),
                },
                KafkaParams {
                    topic_name: "shard-1",
                    config_name: Some(
                        "cluster2",
                    ),
                    params: [
                        KafkaConfigParam {
                            name: "bootstrap.servers",
                            value: "cluster2:9092",
                        },
                    ],
                    key_rate_limit: Some(
                        KeyRateLimit {
                            limit_per_window: 200,
                            window_secs: 120,
                        },
                    ),
                },
                KafkaParams {
                    topic_name: "shard-2",
                    config_name: None,
                    params: [
                        KafkaConfigParam {
                            name: "bootstrap.servers",
                            value: "primary:9092",
                        },
                    ],
                    key_rate_limit: None,
                },
            ],
        )
        "###);
    }
}