relay_kafka/
config.rs

1//! Configuration primitives to configure the kafka producer and properly set up the connection.
2
3use std::collections::BTreeMap;
4
5use serde::{Deserialize, Serialize, de};
6use thiserror::Error;
7
/// Kafka configuration errors.
#[derive(Error, Debug)]
pub enum ConfigError {
    /// The user referenced a kafka config name that does not exist.
    #[error("unknown kafka config name")]
    UnknownKafkaConfigName,
    /// The user did not configure a shard with index 0.
    #[error("invalid kafka shard configuration: must have shard with index 0")]
    InvalidShard,
}
18
/// Define the topics over which Relay communicates with Sentry.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub enum KafkaTopic {
    /// Simple events (without attachments) topic.
    Events,
    /// Complex events (with attachments) topic.
    Attachments,
    /// Transaction events topic.
    Transactions,
    /// Shared outcomes topic for Relay and Sentry.
    Outcomes,
    /// Override for billing critical outcomes.
    OutcomesBilling,
    /// Any metric that is extracted from sessions.
    MetricsSessions,
    /// Generic metrics topic, excluding sessions (release health).
    MetricsGeneric,
    /// Profiles topic.
    Profiles,
    /// Replay recordings; large blobs sent by the replay SDK.
    ReplayRecordings,
    /// Monitor check-ins.
    Monitors,
    /// Standalone spans without a transaction.
    Spans,
    /// Feedback events topic.
    Feedback,
    /// Items topic.
    Items,
}
49
50impl KafkaTopic {
51    /// Returns iterator over the variants of [`KafkaTopic`].
52    /// It will have to be adjusted if the new variants are added.
53    pub fn iter() -> std::slice::Iter<'static, Self> {
54        use KafkaTopic::*;
55        static TOPICS: [KafkaTopic; 13] = [
56            Events,
57            Attachments,
58            Transactions,
59            Outcomes,
60            OutcomesBilling,
61            MetricsSessions,
62            MetricsGeneric,
63            Profiles,
64            ReplayRecordings,
65            Monitors,
66            Spans,
67            Feedback,
68            Items,
69        ];
70        TOPICS.iter()
71    }
72}
73
/// Generates [`TopicAssignments`] and the related topic lookups.
///
/// For each `field_name: (KafkaTopic::Variant, "default-topic", "doc")` entry this emits:
/// - a `pub field_name: TopicAssignment` field on [`TopicAssignments`], where the default
///   topic name doubles as a serde alias for the field,
/// - an arm in [`TopicAssignments::get`] mapping the [`KafkaTopic`] variant to that field,
/// - an arm in [`KafkaTopic::logical_topic_name`] returning the default topic name,
/// - a [`Default`] impl assigning every field its default topic name.
macro_rules! define_topic_assignments {
    ($($field_name:ident : ($kafka_topic:path, $default_topic:literal, $doc:literal)),* $(,)?) => {
        /// Configuration for topics.
        #[derive(Deserialize, Serialize, Debug)]
        #[serde(default)]
        pub struct TopicAssignments {
            $(
                // The default topic name is accepted as an alternative key when
                // deserializing, so configs may use either the field name or the
                // logical topic name.
                #[serde(alias = $default_topic)]
                #[doc = $doc]
                pub $field_name: TopicAssignment,
            )*

            /// Additional topic assignments configured but currently unused by this Relay instance.
            #[serde(flatten, skip_serializing)]
            pub unused: Unused,
        }

        impl TopicAssignments{
            /// Get a topic assignment by [`KafkaTopic`] value
            #[must_use]
            pub fn get(&self, kafka_topic: KafkaTopic) -> &TopicAssignment {
                // Exhaustive match: adding a `KafkaTopic` variant without a macro
                // entry is a compile error.
                match kafka_topic {
                    $(
                        $kafka_topic => &self.$field_name,
                    )*
                }
            }
        }

        impl KafkaTopic {
            /// Map this KafkaTopic to the "logical topic", i.e. the default topic name.
            pub fn logical_topic_name(&self) -> &'static str {
                match self {
                    $(
                        $kafka_topic => $default_topic,
                    )*
                }
            }
        }

        impl Default for TopicAssignments {
            fn default() -> Self {
                Self {
                    $(
                        // Uses `From<String> for TopicAssignment`: a single-shard
                        // assignment on the default kafka config.
                        $field_name: $default_topic.to_owned().into(),
                    )*
                    unused: Default::default()
                }
            }
        }
    };
}
126
// WARNING: When adding a topic here, make sure that the kafka topic exists or can be auto-created.
// Failure to do so will result in Relay crashing (if the `kafka_validate_topics` config flag is enabled),
// or event loss in the store service.
//
// Entry shape: field_name: (KafkaTopic variant, default/logical topic name, field doc).
// The logical topic name also acts as a serde alias for the field.
define_topic_assignments! {
    events: (KafkaTopic::Events, "ingest-events", "Simple events topic name."),
    attachments: (KafkaTopic::Attachments, "ingest-attachments", "Events with attachments topic name."),
    transactions: (KafkaTopic::Transactions, "ingest-transactions", "Transaction events topic name."),
    outcomes: (KafkaTopic::Outcomes, "outcomes", "Outcomes topic name."),
    outcomes_billing: (KafkaTopic::OutcomesBilling, "outcomes-billing", "Outcomes topic name for billing critical outcomes."),
    metrics_sessions: (KafkaTopic::MetricsSessions, "ingest-metrics", "Topic name for metrics extracted from sessions, aka release health."),
    metrics_generic: (KafkaTopic::MetricsGeneric, "ingest-performance-metrics", "Topic name for all other kinds of metrics."),
    profiles: (KafkaTopic::Profiles, "profiles", "Stacktrace topic name"),
    replay_recordings: (KafkaTopic::ReplayRecordings, "ingest-replay-recordings", "Recordings topic name."),
    monitors: (KafkaTopic::Monitors, "ingest-monitors", "Monitor check-ins."),
    spans: (KafkaTopic::Spans, "ingest-spans", "Standalone spans without a transaction."),
    feedback: (KafkaTopic::Feedback, "ingest-feedback-events", "Feedback events topic."),
    items: (KafkaTopic::Items, "snuba-items", "Items topic."),
}
145
/// A list of all topic configurations that are currently unused by this Relay.
///
/// Collected from unrecognized keys during deserialization of [`TopicAssignments`].
#[derive(Debug, Default)]
pub struct Unused(Vec<String>);
149
150impl Unused {
151    /// Returns all unused topic names.
152    pub fn names(&self) -> &[String] {
153        &self.0
154    }
155}
156
157impl<'de> de::Deserialize<'de> for Unused {
158    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
159    where
160        D: de::Deserializer<'de>,
161    {
162        let topics = BTreeMap::<String, de::IgnoredAny>::deserialize(deserializer)?;
163        Ok(Self(topics.into_keys().collect()))
164    }
165}
166
/// Configuration for a "logical" topic/datasink that Relay should forward data into.
///
/// Can be either a string containing the kafka topic name to produce into (using the default
/// `kafka_config`), an object containing keys `topic_name` and `kafka_config_name` for using a
/// custom kafka cluster, or an array of topic names/configs for sharded topics.
///
/// The inner list always holds at least one shard; this invariant is enforced by the
/// `Deserialize` impl below.
///
/// See documentation for `secondary_kafka_configs` for more information.
#[derive(Debug, Serialize)]
pub struct TopicAssignment(Vec<TopicConfig>);
176
impl<'de> de::Deserialize<'de> for TopicAssignment {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: de::Deserializer<'de>,
    {
        // Accepted input shapes for a topic assignment.
        #[derive(Deserialize, Debug)]
        #[serde(untagged)]
        enum Inner {
            // order matters. structs can be deserialized from arrays.
            // `untagged` tries variants top to bottom, so the sequence forms must come
            // first: a sequence of strings could otherwise be consumed as positional
            // fields of a single `TopicConfig` (the `Secondary` variant).
            ShardedPrimary(Vec<String>),
            ShardedSecondary(Vec<TopicConfig>),
            Primary(String),
            Secondary(TopicConfig),
        }

        // Normalize every accepted shape into a flat list of shard configs.
        let configs = match Inner::deserialize(deserializer)? {
            Inner::Primary(topic_name) => vec![topic_name.into()],
            Inner::Secondary(config) => vec![config],
            Inner::ShardedPrimary(topic_names) => topic_names.into_iter().map(From::from).collect(),
            Inner::ShardedSecondary(configs) => configs,
        };

        // An assignment with zero shards has nothing to produce to; reject it early.
        if configs.is_empty() {
            return Err(de::Error::custom(
                "topic assignment must have at least one shard",
            ));
        }

        Ok(Self(configs))
    }
}
208
/// Configuration for a single (physical) topic shard.
#[derive(Debug, Deserialize, Serialize)]
pub struct TopicConfig {
    /// The topic name to use.
    #[serde(rename = "name")]
    topic_name: String,
    /// The Kafka config name will be used to produce data to the given topic.
    ///
    /// If the config is missing, the default config will be used.
    #[serde(rename = "config", skip_serializing_if = "Option::is_none")]
    kafka_config_name: Option<String>,
    /// Optionally, a rate limit per partition key to protect against partition imbalance.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    key_rate_limit: Option<KeyRateLimit>,
}
224
225impl From<String> for TopicConfig {
226    fn from(topic_name: String) -> Self {
227        Self {
228            topic_name,
229            kafka_config_name: None,
230            key_rate_limit: None,
231        }
232    }
233}
234
/// Produce rate limit configuration for a topic.
#[derive(Debug, Deserialize, Serialize, Clone, Copy)]
pub struct KeyRateLimit {
    /// Limit each partition key to N messages per `window_secs`.
    pub limit_per_window: u64,

    /// The size of the window to record counters for.
    ///
    /// Larger windows imply higher memory usage.
    pub window_secs: u64,
}
246
/// A Kafka config for a topic.
///
/// This internally includes configuration for multiple 'physical' Kafka topics,
/// as Relay can shard to multiple topics at once.
#[derive(Debug)]
pub struct KafkaTopicConfig<'a>(Vec<KafkaParams<'a>>);
253
impl<'a> KafkaTopicConfig<'a> {
    /// Kafka params for each physical shard.
    pub fn topics(&self) -> &[KafkaParams<'a>] {
        &self.0
    }
}
260
/// Config for creating a Kafka producer.
///
/// One `KafkaParams` describes a single physical shard; a sharded assignment
/// yields multiple of these via [`KafkaTopicConfig`].
#[derive(Debug)]
pub struct KafkaParams<'a> {
    /// The name of the physical topic to produce to.
    pub topic_name: String,
    /// The Kafka config name will be used to produce data.
    ///
    /// `None` means the default config is used.
    pub config_name: Option<&'a str>,
    /// Parameters for the Kafka producer configuration.
    pub params: &'a [KafkaConfigParam],
    /// Optionally, a rate limit per partition key to protect against partition imbalance.
    pub key_rate_limit: Option<KeyRateLimit>,
}
273
274impl From<String> for TopicAssignment {
275    fn from(topic_name: String) -> Self {
276        Self(vec![topic_name.into()])
277    }
278}
279
280impl TopicAssignment {
281    /// Get the Kafka configs for the current topic assignment.
282    ///
283    /// # Errors
284    /// Returns [`ConfigError`] if the configuration for the current topic assignment is invalid.
285    pub fn kafka_configs<'a>(
286        &'a self,
287        default_config: &'a Vec<KafkaConfigParam>,
288        secondary_configs: &'a BTreeMap<String, Vec<KafkaConfigParam>>,
289    ) -> Result<KafkaTopicConfig<'a>, ConfigError> {
290        let configs = self
291            .0
292            .iter()
293            .map(|tc| {
294                Ok(KafkaParams {
295                    topic_name: tc.topic_name.clone(),
296                    config_name: tc.kafka_config_name.as_deref(),
297                    params: match &tc.kafka_config_name {
298                        Some(config) => secondary_configs
299                            .get(config)
300                            .ok_or(ConfigError::UnknownKafkaConfigName)?,
301                        None => default_config.as_slice(),
302                    },
303                    key_rate_limit: tc.key_rate_limit,
304                })
305            })
306            .collect::<Result<_, _>>()?;
307
308        Ok(KafkaTopicConfig(configs))
309    }
310}
311
/// A name-value pair of a Kafka config parameter.
#[derive(Debug, Deserialize, Serialize)]
pub struct KafkaConfigParam {
    /// Name of the Kafka config parameter.
    pub name: String,
    /// Value of the Kafka config parameter.
    pub value: String,
}
320
#[cfg(test)]
mod tests {

    use super::*;

    /// Deserializes a `TopicAssignments` mapping that mixes logical-name aliases
    /// (`ingest-events`, `ingest-metrics`), field names, and a secondary-cluster
    /// entry, and snapshots the full result.
    #[test]
    fn test_kafka_config() {
        let yaml = r#"
ingest-events: "ingest-events-kafka-topic"
profiles:
    name: "ingest-profiles"
    config: "profiles"
ingest-metrics: "ingest-metrics-3"
transactions: "ingest-transactions-kafka-topic"
"#;

        // NOTE: the former `second_config` setup here was dead code — this test only
        // checks deserialization and never resolves kafka configs — so it was removed.
        let topics: TopicAssignments = serde_yaml::from_str(yaml).unwrap();
        insta::assert_debug_snapshot!(topics, @r#"
        TopicAssignments {
            events: TopicAssignment(
                [
                    TopicConfig {
                        topic_name: "ingest-events-kafka-topic",
                        kafka_config_name: None,
                        key_rate_limit: None,
                    },
                ],
            ),
            attachments: TopicAssignment(
                [
                    TopicConfig {
                        topic_name: "ingest-attachments",
                        kafka_config_name: None,
                        key_rate_limit: None,
                    },
                ],
            ),
            transactions: TopicAssignment(
                [
                    TopicConfig {
                        topic_name: "ingest-transactions-kafka-topic",
                        kafka_config_name: None,
                        key_rate_limit: None,
                    },
                ],
            ),
            outcomes: TopicAssignment(
                [
                    TopicConfig {
                        topic_name: "outcomes",
                        kafka_config_name: None,
                        key_rate_limit: None,
                    },
                ],
            ),
            outcomes_billing: TopicAssignment(
                [
                    TopicConfig {
                        topic_name: "outcomes-billing",
                        kafka_config_name: None,
                        key_rate_limit: None,
                    },
                ],
            ),
            metrics_sessions: TopicAssignment(
                [
                    TopicConfig {
                        topic_name: "ingest-metrics-3",
                        kafka_config_name: None,
                        key_rate_limit: None,
                    },
                ],
            ),
            metrics_generic: TopicAssignment(
                [
                    TopicConfig {
                        topic_name: "ingest-performance-metrics",
                        kafka_config_name: None,
                        key_rate_limit: None,
                    },
                ],
            ),
            profiles: TopicAssignment(
                [
                    TopicConfig {
                        topic_name: "ingest-profiles",
                        kafka_config_name: Some(
                            "profiles",
                        ),
                        key_rate_limit: None,
                    },
                ],
            ),
            replay_recordings: TopicAssignment(
                [
                    TopicConfig {
                        topic_name: "ingest-replay-recordings",
                        kafka_config_name: None,
                        key_rate_limit: None,
                    },
                ],
            ),
            monitors: TopicAssignment(
                [
                    TopicConfig {
                        topic_name: "ingest-monitors",
                        kafka_config_name: None,
                        key_rate_limit: None,
                    },
                ],
            ),
            spans: TopicAssignment(
                [
                    TopicConfig {
                        topic_name: "ingest-spans",
                        kafka_config_name: None,
                        key_rate_limit: None,
                    },
                ],
            ),
            feedback: TopicAssignment(
                [
                    TopicConfig {
                        topic_name: "ingest-feedback-events",
                        kafka_config_name: None,
                        key_rate_limit: None,
                    },
                ],
            ),
            items: TopicAssignment(
                [
                    TopicConfig {
                        topic_name: "snuba-items",
                        kafka_config_name: None,
                        key_rate_limit: None,
                    },
                ],
            ),
            unused: Unused(
                [],
            ),
        }
        "#);
    }

    /// Every default (logical) topic name must have a schema registered in
    /// `sentry_kafka_schemas`.
    #[test]
    fn test_default_topic_is_valid() {
        for topic in KafkaTopic::iter() {
            let name = topic.logical_topic_name();
            assert!(sentry_kafka_schemas::get_schema(name, None).is_ok());
        }
    }

    /// Sharded assignments: a plain string list and a list of topic configs with a
    /// secondary cluster both resolve to one `KafkaParams` per shard.
    #[test]
    fn test_sharded_kafka_config() {
        let yaml = r#"
events: ["ingest-events-1", "ingest-events-2"]
profiles:
  - name: "ingest-profiles-1"
    config: "profiles"
  - name: "ingest-profiles-2"
    config: "profiles"
"#;
        let topics: TopicAssignments = serde_yaml::from_str(yaml).unwrap();

        let def_config = vec![KafkaConfigParam {
            name: "test".to_owned(),
            value: "test-value".to_owned(),
        }];
        let mut second_config = BTreeMap::new();
        second_config.insert(
            "profiles".to_owned(),
            vec![KafkaConfigParam {
                name: "test".to_owned(),
                value: "test-value".to_owned(),
            }],
        );

        let events_configs = topics
            .events
            .kafka_configs(&def_config, &second_config)
            .expect("Kafka config for sharded events topic");

        insta::assert_debug_snapshot!(events_configs, @r###"
        KafkaTopicConfig(
            [
                KafkaParams {
                    topic_name: "ingest-events-1",
                    config_name: None,
                    params: [
                        KafkaConfigParam {
                            name: "test",
                            value: "test-value",
                        },
                    ],
                    key_rate_limit: None,
                },
                KafkaParams {
                    topic_name: "ingest-events-2",
                    config_name: None,
                    params: [
                        KafkaConfigParam {
                            name: "test",
                            value: "test-value",
                        },
                    ],
                    key_rate_limit: None,
                },
            ],
        )
        "###);

        let profiles_configs = topics
            .profiles
            .kafka_configs(&def_config, &second_config)
            .expect("Kafka config for sharded profiles topic");

        insta::assert_debug_snapshot!(profiles_configs, @r###"
        KafkaTopicConfig(
            [
                KafkaParams {
                    topic_name: "ingest-profiles-1",
                    config_name: Some(
                        "profiles",
                    ),
                    params: [
                        KafkaConfigParam {
                            name: "test",
                            value: "test-value",
                        },
                    ],
                    key_rate_limit: None,
                },
                KafkaParams {
                    topic_name: "ingest-profiles-2",
                    config_name: Some(
                        "profiles",
                    ),
                    params: [
                        KafkaConfigParam {
                            name: "test",
                            value: "test-value",
                        },
                    ],
                    key_rate_limit: None,
                },
            ],
        )
        "###);
    }

    /// Per-shard `key_rate_limit` settings are resolved independently; a shard
    /// without one falls back to no limit and the default cluster.
    #[test]
    fn test_per_shard_rate_limits() {
        let yaml = r#"
events:
  - name: "shard-0"
    config: "cluster1"
    key_rate_limit:
      limit_per_window: 100
      window_secs: 60
  - name: "shard-1"
    config: "cluster2"
    key_rate_limit:
      limit_per_window: 200
      window_secs: 120
  - name: "shard-2"  # No rate limit (Primary variant)
"#;

        let def_config = vec![KafkaConfigParam {
            name: "bootstrap.servers".to_owned(),
            value: "primary:9092".to_owned(),
        }];
        let mut second_config = BTreeMap::new();
        second_config.insert(
            "cluster1".to_owned(),
            vec![KafkaConfigParam {
                name: "bootstrap.servers".to_owned(),
                value: "cluster1:9092".to_owned(),
            }],
        );
        second_config.insert(
            "cluster2".to_owned(),
            vec![KafkaConfigParam {
                name: "bootstrap.servers".to_owned(),
                value: "cluster2:9092".to_owned(),
            }],
        );

        let topics: TopicAssignments = serde_yaml::from_str(yaml).unwrap();

        let events_configs = topics
            .events
            .kafka_configs(&def_config, &second_config)
            .expect("Kafka config for per-shard rate limits");

        insta::assert_debug_snapshot!(events_configs, @r###"
        KafkaTopicConfig(
            [
                KafkaParams {
                    topic_name: "shard-0",
                    config_name: Some(
                        "cluster1",
                    ),
                    params: [
                        KafkaConfigParam {
                            name: "bootstrap.servers",
                            value: "cluster1:9092",
                        },
                    ],
                    key_rate_limit: Some(
                        KeyRateLimit {
                            limit_per_window: 100,
                            window_secs: 60,
                        },
                    ),
                },
                KafkaParams {
                    topic_name: "shard-1",
                    config_name: Some(
                        "cluster2",
                    ),
                    params: [
                        KafkaConfigParam {
                            name: "bootstrap.servers",
                            value: "cluster2:9092",
                        },
                    ],
                    key_rate_limit: Some(
                        KeyRateLimit {
                            limit_per_window: 200,
                            window_secs: 120,
                        },
                    ),
                },
                KafkaParams {
                    topic_name: "shard-2",
                    config_name: None,
                    params: [
                        KafkaConfigParam {
                            name: "bootstrap.servers",
                            value: "primary:9092",
                        },
                    ],
                    key_rate_limit: None,
                },
            ],
        )
        "###);
    }
}