1use std::collections::BTreeMap;
4
5use serde::{Deserialize, Serialize, de};
6use thiserror::Error;
7
8#[derive(Error, Debug)]
10pub enum ConfigError {
11 #[error("unknown kafka config name")]
13 UnknownKafkaConfigName,
14 #[error("invalid kafka shard configuration: must have shard with index 0")]
16 InvalidShard,
17}
18
19#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
21pub enum KafkaTopic {
22 Events,
24 Attachments,
26 Transactions,
28 Outcomes,
30 OutcomesBilling,
32 MetricsSessions,
34 MetricsGeneric,
36 Profiles,
38 ReplayEvents,
40 ReplayRecordings,
42 Monitors,
44 Spans,
46 Feedback,
48 Items,
50}
51
52impl KafkaTopic {
53 pub fn iter() -> std::slice::Iter<'static, Self> {
56 use KafkaTopic::*;
57 static TOPICS: [KafkaTopic; 14] = [
58 Events,
59 Attachments,
60 Transactions,
61 Outcomes,
62 OutcomesBilling,
63 MetricsSessions,
64 MetricsGeneric,
65 Profiles,
66 ReplayEvents,
67 ReplayRecordings,
68 Monitors,
69 Spans,
70 Feedback,
71 Items,
72 ];
73 TOPICS.iter()
74 }
75}
76
77macro_rules! define_topic_assignments {
78 ($($field_name:ident : ($kafka_topic:path, $default_topic:literal, $doc:literal)),* $(,)?) => {
79 #[derive(Deserialize, Serialize, Debug)]
81 #[serde(default)]
82 pub struct TopicAssignments {
83 $(
84 #[serde(alias = $default_topic)]
85 #[doc = $doc]
86 pub $field_name: TopicAssignment,
87 )*
88
89 #[serde(flatten, skip_serializing)]
91 pub unused: Unused,
92 }
93
94 impl TopicAssignments{
95 #[must_use]
97 pub fn get(&self, kafka_topic: KafkaTopic) -> &TopicAssignment {
98 match kafka_topic {
99 $(
100 $kafka_topic => &self.$field_name,
101 )*
102 }
103 }
104 }
105
106 impl KafkaTopic {
107 pub fn logical_topic_name(&self) -> &'static str {
109 match self {
110 $(
111 $kafka_topic => $default_topic,
112 )*
113 }
114 }
115 }
116
117 impl Default for TopicAssignments {
118 fn default() -> Self {
119 Self {
120 $(
121 $field_name: $default_topic.to_owned().into(),
122 )*
123 unused: Default::default()
124 }
125 }
126 }
127 };
128}
129
130define_topic_assignments! {
134 events: (KafkaTopic::Events, "ingest-events", "Simple events topic name."),
135 attachments: (KafkaTopic::Attachments, "ingest-attachments", "Events with attachments topic name."),
136 transactions: (KafkaTopic::Transactions, "ingest-transactions", "Transaction events topic name."),
137 outcomes: (KafkaTopic::Outcomes, "outcomes", "Outcomes topic name."),
138 outcomes_billing: (KafkaTopic::OutcomesBilling, "outcomes-billing", "Outcomes topic name for billing critical outcomes."),
139 metrics_sessions: (KafkaTopic::MetricsSessions, "ingest-metrics", "Topic name for metrics extracted from sessions, aka release health."),
140 metrics_generic: (KafkaTopic::MetricsGeneric, "ingest-performance-metrics", "Topic name for all other kinds of metrics."),
141 profiles: (KafkaTopic::Profiles, "profiles", "Stacktrace topic name"),
142 replay_events: (KafkaTopic::ReplayEvents, "ingest-replay-events", "Replay Events topic name."),
143 replay_recordings: (KafkaTopic::ReplayRecordings, "ingest-replay-recordings", "Recordings topic name."),
144 monitors: (KafkaTopic::Monitors, "ingest-monitors", "Monitor check-ins."),
145 spans: (KafkaTopic::Spans, "snuba-spans", "Standalone spans without a transaction."),
146 feedback: (KafkaTopic::Feedback, "ingest-feedback-events", "Feedback events topic."),
147 items: (KafkaTopic::Items, "snuba-items", "Items topic."),
148}
149
150#[derive(Debug, Default)]
152pub struct Unused(Vec<String>);
153
154impl Unused {
155 pub fn names(&self) -> &[String] {
157 &self.0
158 }
159}
160
161impl<'de> de::Deserialize<'de> for Unused {
162 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
163 where
164 D: de::Deserializer<'de>,
165 {
166 let topics = BTreeMap::<String, de::IgnoredAny>::deserialize(deserializer)?;
167 Ok(Self(topics.into_keys().collect()))
168 }
169}
170
171#[derive(Debug, Serialize)]
179pub struct TopicAssignment(Vec<TopicConfig>);
180
181impl<'de> de::Deserialize<'de> for TopicAssignment {
182 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
183 where
184 D: de::Deserializer<'de>,
185 {
186 #[derive(Deserialize, Debug)]
187 #[serde(untagged)]
188 enum Inner {
189 ShardedPrimary(Vec<String>),
191 ShardedSecondary(Vec<TopicConfig>),
192 Primary(String),
193 Secondary(TopicConfig),
194 }
195
196 let configs = match Inner::deserialize(deserializer)? {
197 Inner::Primary(topic_name) => vec![topic_name.into()],
198 Inner::Secondary(config) => vec![config],
199 Inner::ShardedPrimary(topic_names) => topic_names.into_iter().map(From::from).collect(),
200 Inner::ShardedSecondary(configs) => configs,
201 };
202
203 if configs.is_empty() {
204 return Err(de::Error::custom(
205 "topic assignment must have at least one shard",
206 ));
207 }
208
209 Ok(Self(configs))
210 }
211}
212
213#[derive(Debug, Deserialize, Serialize)]
215pub struct TopicConfig {
216 #[serde(rename = "name")]
218 topic_name: String,
219 #[serde(rename = "config", skip_serializing_if = "Option::is_none")]
223 kafka_config_name: Option<String>,
224 #[serde(default, skip_serializing_if = "Option::is_none")]
226 key_rate_limit: Option<KeyRateLimit>,
227}
228
229impl From<String> for TopicConfig {
230 fn from(topic_name: String) -> Self {
231 Self {
232 topic_name,
233 kafka_config_name: None,
234 key_rate_limit: None,
235 }
236 }
237}
238
239#[derive(Debug, Deserialize, Serialize, Clone, Copy)]
241pub struct KeyRateLimit {
242 pub limit_per_window: u64,
244
245 pub window_secs: u64,
249}
250
251#[derive(Debug)]
256pub struct KafkaTopicConfig<'a>(Vec<KafkaParams<'a>>);
257
258impl<'a> KafkaTopicConfig<'a> {
259 pub fn topics(&self) -> &[KafkaParams<'a>] {
261 &self.0
262 }
263}
264
265#[derive(Debug)]
267pub struct KafkaParams<'a> {
268 pub topic_name: String,
270 pub config_name: Option<&'a str>,
272 pub params: &'a [KafkaConfigParam],
274 pub key_rate_limit: Option<KeyRateLimit>,
276}
277
278impl From<String> for TopicAssignment {
279 fn from(topic_name: String) -> Self {
280 Self(vec![topic_name.into()])
281 }
282}
283
284impl TopicAssignment {
285 pub fn kafka_configs<'a>(
290 &'a self,
291 default_config: &'a Vec<KafkaConfigParam>,
292 secondary_configs: &'a BTreeMap<String, Vec<KafkaConfigParam>>,
293 ) -> Result<KafkaTopicConfig<'a>, ConfigError> {
294 let configs = self
295 .0
296 .iter()
297 .map(|tc| {
298 Ok(KafkaParams {
299 topic_name: tc.topic_name.clone(),
300 config_name: tc.kafka_config_name.as_deref(),
301 params: match &tc.kafka_config_name {
302 Some(config) => secondary_configs
303 .get(config)
304 .ok_or(ConfigError::UnknownKafkaConfigName)?,
305 None => default_config.as_slice(),
306 },
307 key_rate_limit: tc.key_rate_limit,
308 })
309 })
310 .collect::<Result<_, _>>()?;
311
312 Ok(KafkaTopicConfig(configs))
313 }
314}
315
316#[derive(Debug, Deserialize, Serialize)]
318pub struct KafkaConfigParam {
319 pub name: String,
321 pub value: String,
323}
324
325#[cfg(test)]
326mod tests {
327
328 use super::*;
329
330 #[test]
331 fn test_kafka_config() {
332 let yaml = r#"
333ingest-events: "ingest-events-kafka-topic"
334profiles:
335 name: "ingest-profiles"
336 config: "profiles"
337ingest-metrics: "ingest-metrics-3"
338transactions: "ingest-transactions-kafka-topic"
339"#;
340
341 let mut second_config = BTreeMap::new();
342 second_config.insert(
343 "profiles".to_owned(),
344 vec![KafkaConfigParam {
345 name: "test".to_owned(),
346 value: "test-value".to_owned(),
347 }],
348 );
349
350 let topics: TopicAssignments = serde_yaml::from_str(yaml).unwrap();
351 insta::assert_debug_snapshot!(topics, @r###"
352 TopicAssignments {
353 events: TopicAssignment(
354 [
355 TopicConfig {
356 topic_name: "ingest-events-kafka-topic",
357 kafka_config_name: None,
358 key_rate_limit: None,
359 },
360 ],
361 ),
362 attachments: TopicAssignment(
363 [
364 TopicConfig {
365 topic_name: "ingest-attachments",
366 kafka_config_name: None,
367 key_rate_limit: None,
368 },
369 ],
370 ),
371 transactions: TopicAssignment(
372 [
373 TopicConfig {
374 topic_name: "ingest-transactions-kafka-topic",
375 kafka_config_name: None,
376 key_rate_limit: None,
377 },
378 ],
379 ),
380 outcomes: TopicAssignment(
381 [
382 TopicConfig {
383 topic_name: "outcomes",
384 kafka_config_name: None,
385 key_rate_limit: None,
386 },
387 ],
388 ),
389 outcomes_billing: TopicAssignment(
390 [
391 TopicConfig {
392 topic_name: "outcomes-billing",
393 kafka_config_name: None,
394 key_rate_limit: None,
395 },
396 ],
397 ),
398 metrics_sessions: TopicAssignment(
399 [
400 TopicConfig {
401 topic_name: "ingest-metrics-3",
402 kafka_config_name: None,
403 key_rate_limit: None,
404 },
405 ],
406 ),
407 metrics_generic: TopicAssignment(
408 [
409 TopicConfig {
410 topic_name: "ingest-performance-metrics",
411 kafka_config_name: None,
412 key_rate_limit: None,
413 },
414 ],
415 ),
416 profiles: TopicAssignment(
417 [
418 TopicConfig {
419 topic_name: "ingest-profiles",
420 kafka_config_name: Some(
421 "profiles",
422 ),
423 key_rate_limit: None,
424 },
425 ],
426 ),
427 replay_events: TopicAssignment(
428 [
429 TopicConfig {
430 topic_name: "ingest-replay-events",
431 kafka_config_name: None,
432 key_rate_limit: None,
433 },
434 ],
435 ),
436 replay_recordings: TopicAssignment(
437 [
438 TopicConfig {
439 topic_name: "ingest-replay-recordings",
440 kafka_config_name: None,
441 key_rate_limit: None,
442 },
443 ],
444 ),
445 monitors: TopicAssignment(
446 [
447 TopicConfig {
448 topic_name: "ingest-monitors",
449 kafka_config_name: None,
450 key_rate_limit: None,
451 },
452 ],
453 ),
454 spans: TopicAssignment(
455 [
456 TopicConfig {
457 topic_name: "snuba-spans",
458 kafka_config_name: None,
459 key_rate_limit: None,
460 },
461 ],
462 ),
463 feedback: TopicAssignment(
464 [
465 TopicConfig {
466 topic_name: "ingest-feedback-events",
467 kafka_config_name: None,
468 key_rate_limit: None,
469 },
470 ],
471 ),
472 items: TopicAssignment(
473 [
474 TopicConfig {
475 topic_name: "snuba-items",
476 kafka_config_name: None,
477 key_rate_limit: None,
478 },
479 ],
480 ),
481 unused: Unused(
482 [],
483 ),
484 }
485 "###);
486 }
487
488 #[test]
489 fn test_default_topic_is_valid() {
490 let currrently_undefined_topics = [
492 "ingest-attachments",
493 "ingest-transactions",
494 "profiles",
495 "ingest-monitors",
496 ];
497
498 for topic in KafkaTopic::iter() {
499 let name = topic.logical_topic_name();
500 if !currrently_undefined_topics.contains(&name) {
501 assert!(sentry_kafka_schemas::get_schema(name, None).is_ok());
502 }
503 }
504 }
505
506 #[test]
507 fn test_sharded_kafka_config() {
508 let yaml = r#"
509events: ["ingest-events-1", "ingest-events-2"]
510profiles:
511 - name: "ingest-profiles-1"
512 config: "profiles"
513 - name: "ingest-profiles-2"
514 config: "profiles"
515"#;
516 let topics: TopicAssignments = serde_yaml::from_str(yaml).unwrap();
517
518 let def_config = vec![KafkaConfigParam {
519 name: "test".to_owned(),
520 value: "test-value".to_owned(),
521 }];
522 let mut second_config = BTreeMap::new();
523 second_config.insert(
524 "profiles".to_owned(),
525 vec![KafkaConfigParam {
526 name: "test".to_owned(),
527 value: "test-value".to_owned(),
528 }],
529 );
530
531 let events_configs = topics
532 .events
533 .kafka_configs(&def_config, &second_config)
534 .expect("Kafka config for sharded events topic");
535
536 insta::assert_debug_snapshot!(events_configs, @r###"
537 KafkaTopicConfig(
538 [
539 KafkaParams {
540 topic_name: "ingest-events-1",
541 config_name: None,
542 params: [
543 KafkaConfigParam {
544 name: "test",
545 value: "test-value",
546 },
547 ],
548 key_rate_limit: None,
549 },
550 KafkaParams {
551 topic_name: "ingest-events-2",
552 config_name: None,
553 params: [
554 KafkaConfigParam {
555 name: "test",
556 value: "test-value",
557 },
558 ],
559 key_rate_limit: None,
560 },
561 ],
562 )
563 "###);
564
565 let profiles_configs = topics
566 .profiles
567 .kafka_configs(&def_config, &second_config)
568 .expect("Kafka config for sharded profiles topic");
569
570 insta::assert_debug_snapshot!(profiles_configs, @r###"
571 KafkaTopicConfig(
572 [
573 KafkaParams {
574 topic_name: "ingest-profiles-1",
575 config_name: Some(
576 "profiles",
577 ),
578 params: [
579 KafkaConfigParam {
580 name: "test",
581 value: "test-value",
582 },
583 ],
584 key_rate_limit: None,
585 },
586 KafkaParams {
587 topic_name: "ingest-profiles-2",
588 config_name: Some(
589 "profiles",
590 ),
591 params: [
592 KafkaConfigParam {
593 name: "test",
594 value: "test-value",
595 },
596 ],
597 key_rate_limit: None,
598 },
599 ],
600 )
601 "###);
602 }
603
604 #[test]
605 fn test_per_shard_rate_limits() {
606 let yaml = r#"
607events:
608 - name: "shard-0"
609 config: "cluster1"
610 key_rate_limit:
611 limit_per_window: 100
612 window_secs: 60
613 - name: "shard-1"
614 config: "cluster2"
615 key_rate_limit:
616 limit_per_window: 200
617 window_secs: 120
618 - name: "shard-2" # No rate limit (Primary variant)
619"#;
620
621 let def_config = vec![KafkaConfigParam {
622 name: "bootstrap.servers".to_owned(),
623 value: "primary:9092".to_owned(),
624 }];
625 let mut second_config = BTreeMap::new();
626 second_config.insert(
627 "cluster1".to_owned(),
628 vec![KafkaConfigParam {
629 name: "bootstrap.servers".to_owned(),
630 value: "cluster1:9092".to_owned(),
631 }],
632 );
633 second_config.insert(
634 "cluster2".to_owned(),
635 vec![KafkaConfigParam {
636 name: "bootstrap.servers".to_owned(),
637 value: "cluster2:9092".to_owned(),
638 }],
639 );
640
641 let topics: TopicAssignments = serde_yaml::from_str(yaml).unwrap();
642
643 let events_configs = topics
644 .events
645 .kafka_configs(&def_config, &second_config)
646 .expect("Kafka config for per-shard rate limits");
647
648 insta::assert_debug_snapshot!(events_configs, @r###"
649 KafkaTopicConfig(
650 [
651 KafkaParams {
652 topic_name: "shard-0",
653 config_name: Some(
654 "cluster1",
655 ),
656 params: [
657 KafkaConfigParam {
658 name: "bootstrap.servers",
659 value: "cluster1:9092",
660 },
661 ],
662 key_rate_limit: Some(
663 KeyRateLimit {
664 limit_per_window: 100,
665 window_secs: 60,
666 },
667 ),
668 },
669 KafkaParams {
670 topic_name: "shard-1",
671 config_name: Some(
672 "cluster2",
673 ),
674 params: [
675 KafkaConfigParam {
676 name: "bootstrap.servers",
677 value: "cluster2:9092",
678 },
679 ],
680 key_rate_limit: Some(
681 KeyRateLimit {
682 limit_per_window: 200,
683 window_secs: 120,
684 },
685 ),
686 },
687 KafkaParams {
688 topic_name: "shard-2",
689 config_name: None,
690 params: [
691 KafkaConfigParam {
692 name: "bootstrap.servers",
693 value: "primary:9092",
694 },
695 ],
696 key_rate_limit: None,
697 },
698 ],
699 )
700 "###);
701 }
702}