relay_kafka/
config.rs

//! Configuration primitives for setting up the Kafka producer and its connection.

use std::collections::BTreeMap;

use serde::{Deserialize, Serialize, de};
use thiserror::Error;

/// Kafka configuration errors.
#[derive(Error, Debug)]
pub enum ConfigError {
    /// The user referenced a kafka config name that does not exist.
    #[error("unknown kafka config name")]
    UnknownKafkaConfigName,
    /// The user did not configure a shard with index 0.
    #[error("invalid kafka shard configuration: must have shard with index 0")]
    InvalidShard,
}

/// Defines the topics over which Relay communicates with Sentry.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub enum KafkaTopic {
    /// Simple events (without attachments) topic.
    Events,
    /// Complex events (with attachments) topic.
    Attachments,
    /// Transaction events topic.
    Transactions,
    /// Shared outcomes topic for Relay and Sentry.
    Outcomes,
    /// Override for billing-critical outcomes.
    OutcomesBilling,
    /// Any metric that is extracted from sessions.
    MetricsSessions,
    /// Generic metrics topic, excluding sessions (release health).
    MetricsGeneric,
    /// Profiles topic.
    Profiles,
    /// Replay events: breadcrumb + session updates for replays.
    ReplayEvents,
    /// Replay recordings: large blobs sent by the replay SDK.
    ReplayRecordings,
    /// Monitor check-ins.
    Monitors,
    /// Standalone spans without a transaction.
    Spans,
    /// Feedback events topic.
    Feedback,
    /// Items topic.
    Items,
}

impl KafkaTopic {
    /// Returns an iterator over the variants of [`KafkaTopic`].
    ///
    /// It will have to be adjusted if new variants are added.
    pub fn iter() -> std::slice::Iter<'static, Self> {
        use KafkaTopic::*;
        static TOPICS: [KafkaTopic; 14] = [
            Events,
            Attachments,
            Transactions,
            Outcomes,
            OutcomesBilling,
            MetricsSessions,
            MetricsGeneric,
            Profiles,
            ReplayEvents,
            ReplayRecordings,
            Monitors,
            Spans,
            Feedback,
            Items,
        ];
        TOPICS.iter()
    }
}

macro_rules! define_topic_assignments {
    ($($field_name:ident : ($kafka_topic:path, $default_topic:literal, $doc:literal)),* $(,)?) => {
        /// Configuration for topics.
        #[derive(Deserialize, Serialize, Debug)]
        #[serde(default)]
        pub struct TopicAssignments {
            $(
                #[serde(alias = $default_topic)]
                #[doc = $doc]
                pub $field_name: TopicAssignment,
            )*

            /// Additional topic assignments configured but currently unused by this Relay instance.
            #[serde(flatten, skip_serializing)]
            pub unused: Unused,
        }

        impl TopicAssignments {
            /// Get a topic assignment by [`KafkaTopic`] value.
            #[must_use]
            pub fn get(&self, kafka_topic: KafkaTopic) -> &TopicAssignment {
                match kafka_topic {
                    $(
                        $kafka_topic => &self.$field_name,
                    )*
                }
            }
        }

        impl KafkaTopic {
            /// Maps this [`KafkaTopic`] to the "logical topic", i.e. the default topic name.
            pub fn logical_topic_name(&self) -> &'static str {
                match self {
                    $(
                        $kafka_topic => $default_topic,
                    )*
                }
            }
        }

        impl Default for TopicAssignments {
            fn default() -> Self {
                Self {
                    $(
                        $field_name: $default_topic.to_owned().into(),
                    )*
                    unused: Default::default(),
                }
            }
        }
    };
}

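// For reference, a single entry such as `events: (KafkaTopic::Events, "ingest-events", "...")`
// below expands, roughly, to:
//  - a `pub events: TopicAssignment` field with `#[serde(alias = "ingest-events")]`,
//  - a `KafkaTopic::Events => &self.events` arm in `TopicAssignments::get`,
//  - a `KafkaTopic::Events => "ingest-events"` arm in `KafkaTopic::logical_topic_name`,
//  - an `events: "ingest-events".to_owned().into()` entry in the `Default` impl.
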
// WARNING: When adding a topic here, make sure that the kafka topic exists or can be auto-created.
// Failure to do so will result in Relay crashing (if the `kafka_validate_topics` config flag is enabled),
// or event loss in the store service.
define_topic_assignments! {
    events: (KafkaTopic::Events, "ingest-events", "Simple events topic name."),
    attachments: (KafkaTopic::Attachments, "ingest-attachments", "Events with attachments topic name."),
    transactions: (KafkaTopic::Transactions, "ingest-transactions", "Transaction events topic name."),
    outcomes: (KafkaTopic::Outcomes, "outcomes", "Outcomes topic name."),
    outcomes_billing: (KafkaTopic::OutcomesBilling, "outcomes-billing", "Outcomes topic name for billing-critical outcomes."),
    metrics_sessions: (KafkaTopic::MetricsSessions, "ingest-metrics", "Topic name for metrics extracted from sessions, aka release health."),
    metrics_generic: (KafkaTopic::MetricsGeneric, "ingest-performance-metrics", "Topic name for all other kinds of metrics."),
    profiles: (KafkaTopic::Profiles, "profiles", "Stacktrace topic name."),
    replay_events: (KafkaTopic::ReplayEvents, "ingest-replay-events", "Replay events topic name."),
    replay_recordings: (KafkaTopic::ReplayRecordings, "ingest-replay-recordings", "Recordings topic name."),
    monitors: (KafkaTopic::Monitors, "ingest-monitors", "Monitor check-ins topic name."),
    spans: (KafkaTopic::Spans, "snuba-spans", "Standalone spans without a transaction."),
    feedback: (KafkaTopic::Feedback, "ingest-feedback-events", "Feedback events topic name."),
    items: (KafkaTopic::Items, "snuba-items", "Items topic name."),
}

/// A list of topic configurations that are currently unused by this Relay.
///
/// Collected via the `#[serde(flatten)]` field on [`TopicAssignments`], so unknown
/// topic keys are recorded here instead of failing deserialization.
#[derive(Debug, Default)]
pub struct Unused(Vec<String>);

impl Unused {
    /// Returns all unused topic names.
    pub fn names(&self) -> &[String] {
        &self.0
    }
}

impl<'de> de::Deserialize<'de> for Unused {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: de::Deserializer<'de>,
    {
        let topics = BTreeMap::<String, de::IgnoredAny>::deserialize(deserializer)?;
        Ok(Self(topics.into_keys().collect()))
    }
}

/// Configuration for a "logical" topic/datasink that Relay should forward data into.
///
/// Can be either a string containing the kafka topic name to produce into (using the default
/// `kafka_config`), an object containing the keys `name` and `config` for using a custom kafka
/// cluster, or an array of topic names/configs for sharded topics.
///
/// See the documentation for `secondary_kafka_configs` for more information.
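///
/// # Example (YAML)
///
/// The three accepted forms, mirroring the shapes exercised by the tests in this module:
///
/// ```yaml
/// # A plain topic name, produced via the default kafka config:
/// events: "ingest-events"
///
/// # A custom cluster, referencing a key in `secondary_kafka_configs`:
/// profiles:
///   name: "ingest-profiles"
///   config: "profiles"
///
/// # A sharded topic, one entry per shard:
/// transactions: ["ingest-transactions-1", "ingest-transactions-2"]
/// ```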
#[derive(Debug, Serialize)]
pub struct TopicAssignment(Vec<TopicConfig>);

impl<'de> de::Deserialize<'de> for TopicAssignment {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: de::Deserializer<'de>,
    {
        #[derive(Deserialize, Debug)]
        #[serde(untagged)]
        enum Inner {
            // Order matters: untagged enums try variants in order, and serde can
            // deserialize structs from sequences, so the sharded (array) variants
            // must be tried before the single-topic ones.
            ShardedPrimary(Vec<String>),
            ShardedSecondary(Vec<TopicConfig>),
            Primary(String),
            Secondary(TopicConfig),
        }

        let configs = match Inner::deserialize(deserializer)? {
            Inner::Primary(topic_name) => vec![topic_name.into()],
            Inner::Secondary(config) => vec![config],
            Inner::ShardedPrimary(topic_names) => topic_names.into_iter().map(From::from).collect(),
            Inner::ShardedSecondary(configs) => configs,
        };

        if configs.is_empty() {
            return Err(de::Error::custom(
                "topic assignment must have at least one shard",
            ));
        }

        Ok(Self(configs))
    }
}

/// Configuration for a single topic.
#[derive(Debug, Deserialize, Serialize)]
pub struct TopicConfig {
    /// The topic name to use.
    #[serde(rename = "name")]
    topic_name: String,
    /// The name of the Kafka config used to produce data to the given topic.
    ///
    /// If the config is missing, the default config will be used.
    #[serde(rename = "config", skip_serializing_if = "Option::is_none")]
    kafka_config_name: Option<String>,
    /// Optionally, a rate limit per partition key to protect against partition imbalance.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    key_rate_limit: Option<KeyRateLimit>,
}

impl From<String> for TopicConfig {
    fn from(topic_name: String) -> Self {
        Self {
            topic_name,
            kafka_config_name: None,
            key_rate_limit: None,
        }
    }
}

/// Rate limit configuration applied when producing to a topic.
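///
/// For example, as part of a single topic shard's configuration (cf. the
/// `test_per_shard_rate_limits` test below):
///
/// ```yaml
/// key_rate_limit:
///   limit_per_window: 100
///   window_secs: 60
/// ```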
#[derive(Debug, Deserialize, Serialize, Clone, Copy)]
pub struct KeyRateLimit {
    /// Limit each partition key to N messages per `window_secs`.
    pub limit_per_window: u64,

    /// The size of the window to record counters for.
    ///
    /// Larger windows imply higher memory usage.
    pub window_secs: u64,
}

/// A Kafka config for a topic.
///
/// This internally includes configuration for multiple 'physical' Kafka topics,
/// as Relay can shard to multiple topics at once.
#[derive(Debug)]
pub struct KafkaTopicConfig<'a>(Vec<KafkaParams<'a>>);

impl<'a> KafkaTopicConfig<'a> {
    /// Kafka params for each physical shard.
    pub fn topics(&self) -> &[KafkaParams<'a>] {
        &self.0
    }
}

/// Config for creating a Kafka producer.
#[derive(Debug)]
pub struct KafkaParams<'a> {
    /// The name of the (physical) topic to produce to.
    pub topic_name: String,
    /// The name of the Kafka config used to produce data.
    pub config_name: Option<&'a str>,
    /// Parameters for the Kafka producer configuration.
    pub params: &'a [KafkaConfigParam],
    /// Optionally, a rate limit per partition key to protect against partition imbalance.
    pub key_rate_limit: Option<KeyRateLimit>,
}

impl From<String> for TopicAssignment {
    fn from(topic_name: String) -> Self {
        Self(vec![topic_name.into()])
    }
}

impl TopicAssignment {
    /// Get the Kafka configs for the current topic assignment.
    ///
    /// # Errors
    ///
    /// Returns [`ConfigError`] if the configuration for the current topic assignment is invalid.
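    ///
    /// # Example
    ///
    /// A minimal sketch of resolving a single-topic assignment against a default
    /// producer config (marked `ignore` since it assumes these types are re-exported
    /// from the crate root):
    ///
    /// ```ignore
    /// use std::collections::BTreeMap;
    ///
    /// use relay_kafka::{KafkaConfigParam, TopicAssignment};
    ///
    /// let assignment = TopicAssignment::from("ingest-events".to_owned());
    /// let default_config = vec![KafkaConfigParam {
    ///     name: "bootstrap.servers".to_owned(),
    ///     value: "localhost:9092".to_owned(),
    /// }];
    /// let secondary_configs = BTreeMap::new();
    ///
    /// // Without a `config` name, every shard falls back to `default_config`.
    /// let configs = assignment
    ///     .kafka_configs(&default_config, &secondary_configs)
    ///     .unwrap();
    /// assert_eq!(configs.topics().len(), 1);
    /// ```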
    pub fn kafka_configs<'a>(
        &'a self,
        default_config: &'a [KafkaConfigParam],
        secondary_configs: &'a BTreeMap<String, Vec<KafkaConfigParam>>,
    ) -> Result<KafkaTopicConfig<'a>, ConfigError> {
        let configs = self
            .0
            .iter()
            .map(|tc| {
                Ok(KafkaParams {
                    topic_name: tc.topic_name.clone(),
                    config_name: tc.kafka_config_name.as_deref(),
                    params: match &tc.kafka_config_name {
                        Some(config) => secondary_configs
                            .get(config)
                            .ok_or(ConfigError::UnknownKafkaConfigName)?,
                        None => default_config,
                    },
                    key_rate_limit: tc.key_rate_limit,
                })
            })
            .collect::<Result<_, _>>()?;

        Ok(KafkaTopicConfig(configs))
    }
}

/// A name/value pair of a Kafka config parameter.
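///
/// In YAML, a producer configuration is given as a list of such pairs, for example:
///
/// ```yaml
/// - name: "bootstrap.servers"
///   value: "localhost:9092"
/// ```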
#[derive(Debug, Deserialize, Serialize)]
pub struct KafkaConfigParam {
    /// Name of the Kafka config parameter.
    pub name: String,
    /// Value of the Kafka config parameter.
    pub value: String,
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_kafka_config() {
        let yaml = r#"
ingest-events: "ingest-events-kafka-topic"
profiles:
    name: "ingest-profiles"
    config: "profiles"
ingest-metrics: "ingest-metrics-3"
transactions: "ingest-transactions-kafka-topic"
"#;

        let topics: TopicAssignments = serde_yaml::from_str(yaml).unwrap();
        insta::assert_debug_snapshot!(topics, @r###"
        TopicAssignments {
            events: TopicAssignment(
                [
                    TopicConfig {
                        topic_name: "ingest-events-kafka-topic",
                        kafka_config_name: None,
                        key_rate_limit: None,
                    },
                ],
            ),
            attachments: TopicAssignment(
                [
                    TopicConfig {
                        topic_name: "ingest-attachments",
                        kafka_config_name: None,
                        key_rate_limit: None,
                    },
                ],
            ),
            transactions: TopicAssignment(
                [
                    TopicConfig {
                        topic_name: "ingest-transactions-kafka-topic",
                        kafka_config_name: None,
                        key_rate_limit: None,
                    },
                ],
            ),
            outcomes: TopicAssignment(
                [
                    TopicConfig {
                        topic_name: "outcomes",
                        kafka_config_name: None,
                        key_rate_limit: None,
                    },
                ],
            ),
            outcomes_billing: TopicAssignment(
                [
                    TopicConfig {
                        topic_name: "outcomes-billing",
                        kafka_config_name: None,
                        key_rate_limit: None,
                    },
                ],
            ),
            metrics_sessions: TopicAssignment(
                [
                    TopicConfig {
                        topic_name: "ingest-metrics-3",
                        kafka_config_name: None,
                        key_rate_limit: None,
                    },
                ],
            ),
            metrics_generic: TopicAssignment(
                [
                    TopicConfig {
                        topic_name: "ingest-performance-metrics",
                        kafka_config_name: None,
                        key_rate_limit: None,
                    },
                ],
            ),
            profiles: TopicAssignment(
                [
                    TopicConfig {
                        topic_name: "ingest-profiles",
                        kafka_config_name: Some(
                            "profiles",
                        ),
                        key_rate_limit: None,
                    },
                ],
            ),
            replay_events: TopicAssignment(
                [
                    TopicConfig {
                        topic_name: "ingest-replay-events",
                        kafka_config_name: None,
                        key_rate_limit: None,
                    },
                ],
            ),
            replay_recordings: TopicAssignment(
                [
                    TopicConfig {
                        topic_name: "ingest-replay-recordings",
                        kafka_config_name: None,
                        key_rate_limit: None,
                    },
                ],
            ),
            monitors: TopicAssignment(
                [
                    TopicConfig {
                        topic_name: "ingest-monitors",
                        kafka_config_name: None,
                        key_rate_limit: None,
                    },
                ],
            ),
            spans: TopicAssignment(
                [
                    TopicConfig {
                        topic_name: "snuba-spans",
                        kafka_config_name: None,
                        key_rate_limit: None,
                    },
                ],
            ),
            feedback: TopicAssignment(
                [
                    TopicConfig {
                        topic_name: "ingest-feedback-events",
                        kafka_config_name: None,
                        key_rate_limit: None,
                    },
                ],
            ),
            items: TopicAssignment(
                [
                    TopicConfig {
                        topic_name: "snuba-items",
                        kafka_config_name: None,
                        key_rate_limit: None,
                    },
                ],
            ),
            unused: Unused(
                [],
            ),
        }
        "###);
    }

    #[test]
    fn test_default_topic_is_valid() {
        // A few topics are currently not defined in `sentry-kafka-schemas`; remove
        // this list once they are added.
        let currently_undefined_topics = [
            "ingest-attachments",
            "ingest-transactions",
            "profiles",
            "ingest-monitors",
        ];

        for topic in KafkaTopic::iter() {
            let name = topic.logical_topic_name();
            if !currently_undefined_topics.contains(&name) {
                assert!(sentry_kafka_schemas::get_schema(name, None).is_ok());
            }
        }
    }

    #[test]
    fn test_sharded_kafka_config() {
        let yaml = r#"
events: ["ingest-events-1", "ingest-events-2"]
profiles:
  - name: "ingest-profiles-1"
    config: "profiles"
  - name: "ingest-profiles-2"
    config: "profiles"
"#;
        let topics: TopicAssignments = serde_yaml::from_str(yaml).unwrap();

        let def_config = vec![KafkaConfigParam {
            name: "test".to_owned(),
            value: "test-value".to_owned(),
        }];
        let mut second_config = BTreeMap::new();
        second_config.insert(
            "profiles".to_owned(),
            vec![KafkaConfigParam {
                name: "test".to_owned(),
                value: "test-value".to_owned(),
            }],
        );

        let events_configs = topics
            .events
            .kafka_configs(&def_config, &second_config)
            .expect("Kafka config for sharded events topic");

        insta::assert_debug_snapshot!(events_configs, @r###"
        KafkaTopicConfig(
            [
                KafkaParams {
                    topic_name: "ingest-events-1",
                    config_name: None,
                    params: [
                        KafkaConfigParam {
                            name: "test",
                            value: "test-value",
                        },
                    ],
                    key_rate_limit: None,
                },
                KafkaParams {
                    topic_name: "ingest-events-2",
                    config_name: None,
                    params: [
                        KafkaConfigParam {
                            name: "test",
                            value: "test-value",
                        },
                    ],
                    key_rate_limit: None,
                },
            ],
        )
        "###);

        let profiles_configs = topics
            .profiles
            .kafka_configs(&def_config, &second_config)
            .expect("Kafka config for sharded profiles topic");

        insta::assert_debug_snapshot!(profiles_configs, @r###"
        KafkaTopicConfig(
            [
                KafkaParams {
                    topic_name: "ingest-profiles-1",
                    config_name: Some(
                        "profiles",
                    ),
                    params: [
                        KafkaConfigParam {
                            name: "test",
                            value: "test-value",
                        },
                    ],
                    key_rate_limit: None,
                },
                KafkaParams {
                    topic_name: "ingest-profiles-2",
                    config_name: Some(
                        "profiles",
                    ),
                    params: [
                        KafkaConfigParam {
                            name: "test",
                            value: "test-value",
                        },
                    ],
                    key_rate_limit: None,
                },
            ],
        )
        "###);
    }

    #[test]
    fn test_per_shard_rate_limits() {
        let yaml = r#"
events:
  - name: "shard-0"
    config: "cluster1"
    key_rate_limit:
      limit_per_window: 100
      window_secs: 60
  - name: "shard-1"
    config: "cluster2"
    key_rate_limit:
      limit_per_window: 200
      window_secs: 120
  - name: "shard-2"  # No rate limit, falls back to the default kafka config
"#;

        let def_config = vec![KafkaConfigParam {
            name: "bootstrap.servers".to_owned(),
            value: "primary:9092".to_owned(),
        }];
        let mut second_config = BTreeMap::new();
        second_config.insert(
            "cluster1".to_owned(),
            vec![KafkaConfigParam {
                name: "bootstrap.servers".to_owned(),
                value: "cluster1:9092".to_owned(),
            }],
        );
        second_config.insert(
            "cluster2".to_owned(),
            vec![KafkaConfigParam {
                name: "bootstrap.servers".to_owned(),
                value: "cluster2:9092".to_owned(),
            }],
        );

        let topics: TopicAssignments = serde_yaml::from_str(yaml).unwrap();

        let events_configs = topics
            .events
            .kafka_configs(&def_config, &second_config)
            .expect("Kafka config for per-shard rate limits");

        insta::assert_debug_snapshot!(events_configs, @r###"
        KafkaTopicConfig(
            [
                KafkaParams {
                    topic_name: "shard-0",
                    config_name: Some(
                        "cluster1",
                    ),
                    params: [
                        KafkaConfigParam {
                            name: "bootstrap.servers",
                            value: "cluster1:9092",
                        },
                    ],
                    key_rate_limit: Some(
                        KeyRateLimit {
                            limit_per_window: 100,
                            window_secs: 60,
                        },
                    ),
                },
                KafkaParams {
                    topic_name: "shard-1",
                    config_name: Some(
                        "cluster2",
                    ),
                    params: [
                        KafkaConfigParam {
                            name: "bootstrap.servers",
                            value: "cluster2:9092",
                        },
                    ],
                    key_rate_limit: Some(
                        KeyRateLimit {
                            limit_per_window: 200,
                            window_secs: 120,
                        },
                    ),
                },
                KafkaParams {
                    topic_name: "shard-2",
                    config_name: None,
                    params: [
                        KafkaConfigParam {
                            name: "bootstrap.servers",
                            value: "primary:9092",
                        },
                    ],
                    key_rate_limit: None,
                },
            ],
        )
        "###);
    }
}