use std::collections::BTreeMap;

use serde::{Deserialize, Serialize, de};
use thiserror::Error;

/// Kafka configuration errors.
#[derive(Error, Debug)]
pub enum ConfigError {
    /// The referenced Kafka config name does not exist.
    #[error("unknown kafka config name")]
    UnknownKafkaConfigName,
    /// A sharded topic assignment does not define the shard with index 0.
    #[error("invalid kafka shard configuration: must have shard with index 0")]
    InvalidShard,
}

/// The Kafka topics over which Relay communicates with Sentry.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub enum KafkaTopic {
    /// Simple events topic.
    Events,
    /// Events with attachments topic.
    Attachments,
    /// Transaction events topic.
    Transactions,
    /// Outcomes topic.
    Outcomes,
    /// Outcomes topic for billing-critical outcomes.
    OutcomesBilling,
    /// Metrics extracted from sessions, aka release health.
    MetricsSessions,
    /// All other kinds of metrics.
    MetricsGeneric,
    /// Profiles (stacktraces) topic.
    Profiles,
    /// Replay events topic.
    ReplayEvents,
    /// Replay recordings topic.
    ReplayRecordings,
    /// Monitor check-ins topic.
    Monitors,
    /// Logs from our logs product.
    OurLogs,
    /// Standalone spans without a transaction.
    Spans,
    /// Feedback events topic.
    Feedback,
    /// Items topic.
    Items,
}

impl KafkaTopic {
    /// Returns an iterator over all variants of [`KafkaTopic`].
    ///
    /// The array below must be kept in sync with the variants of the enum.
    pub fn iter() -> std::slice::Iter<'static, Self> {
        use KafkaTopic::*;
        static TOPICS: [KafkaTopic; 15] = [
            Events,
            Attachments,
            Transactions,
            Outcomes,
            OutcomesBilling,
            MetricsSessions,
            MetricsGeneric,
            Profiles,
            ReplayEvents,
            ReplayRecordings,
            Monitors,
            OurLogs,
            Spans,
            Feedback,
            Items,
        ];
        TOPICS.iter()
    }
}

macro_rules! define_topic_assignments {
    ($($field_name:ident : ($kafka_topic:path, $default_topic:literal, $doc:literal)),* $(,)?) => {
        /// Configuration for all Kafka topic assignments.
        #[derive(Deserialize, Serialize, Debug)]
        #[serde(default)]
        pub struct TopicAssignments {
            $(
                #[serde(alias = $default_topic)]
                #[doc = $doc]
                pub $field_name: TopicAssignment,
            )*

            /// Catch-all for topic names that match no known assignment.
            #[serde(flatten, skip_serializing)]
            pub unused: Unused,
        }

        impl TopicAssignments {
            /// Returns the topic assignment for a [`KafkaTopic`].
            #[must_use]
            pub fn get(&self, kafka_topic: KafkaTopic) -> &TopicAssignment {
                match kafka_topic {
                    $(
                        $kafka_topic => &self.$field_name,
                    )*
                }
            }
        }

        impl KafkaTopic {
            /// Returns the logical topic name, which is also the default physical name.
            pub fn logical_topic_name(&self) -> &'static str {
                match self {
                    $(
                        $kafka_topic => $default_topic,
                    )*
                }
            }
        }

        impl Default for TopicAssignments {
            fn default() -> Self {
                Self {
                    $(
                        $field_name: $default_topic.to_owned().into(),
                    )*
                    unused: Default::default(),
                }
            }
        }
    };
}
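
// Illustrative sketch of the expansion (not generated code): an entry such as
//
//     events: (KafkaTopic::Events, "ingest-events", "Simple events topic name."),
//
// produces roughly
//
//     pub struct TopicAssignments {
//         #[serde(alias = "ingest-events")]
//         /// Simple events topic name.
//         pub events: TopicAssignment,
//         // ...
//     }
//
// along with the `TopicAssignments::get`, `KafkaTopic::logical_topic_name`,
// and `Default` impls that match on, or fill in, each topic.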
define_topic_assignments! {
    events: (KafkaTopic::Events, "ingest-events", "Simple events topic name."),
    attachments: (KafkaTopic::Attachments, "ingest-attachments", "Events with attachments topic name."),
    transactions: (KafkaTopic::Transactions, "ingest-transactions", "Transaction events topic name."),
    outcomes: (KafkaTopic::Outcomes, "outcomes", "Outcomes topic name."),
    outcomes_billing: (KafkaTopic::OutcomesBilling, "outcomes-billing", "Outcomes topic name for billing-critical outcomes."),
    metrics_sessions: (KafkaTopic::MetricsSessions, "ingest-metrics", "Topic name for metrics extracted from sessions, aka release health."),
    metrics_generic: (KafkaTopic::MetricsGeneric, "ingest-performance-metrics", "Topic name for all other kinds of metrics."),
    profiles: (KafkaTopic::Profiles, "profiles", "Stacktrace topic name."),
    replay_events: (KafkaTopic::ReplayEvents, "ingest-replay-events", "Replay events topic name."),
    replay_recordings: (KafkaTopic::ReplayRecordings, "ingest-replay-recordings", "Replay recordings topic name."),
    ourlogs: (KafkaTopic::OurLogs, "snuba-ourlogs", "Logs from our logs product."),
    monitors: (KafkaTopic::Monitors, "ingest-monitors", "Monitor check-ins."),
    spans: (KafkaTopic::Spans, "snuba-spans", "Standalone spans without a transaction."),
    feedback: (KafkaTopic::Feedback, "ingest-feedback-events", "Feedback events topic."),
    items: (KafkaTopic::Items, "snuba-items", "Items topic."),
}

/// Topic names that did not match any known topic assignment.
///
/// Collecting them instead of rejecting them lets deserialization proceed
/// while still surfacing unrecognized (e.g. misspelled) topic names.
#[derive(Debug, Default)]
pub struct Unused(Vec<String>);

impl Unused {
    /// Returns the unrecognized topic names.
    pub fn names(&self) -> &[String] {
        &self.0
    }
}

impl<'de> de::Deserialize<'de> for Unused {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: de::Deserializer<'de>,
    {
        let topics = BTreeMap::<String, de::IgnoredAny>::deserialize(deserializer)?;
        Ok(Self(topics.into_keys().collect()))
    }
}

/// A topic assignment: one or more physical topic shards.
///
/// A single shard is the common case; multiple shards spread a logical topic
/// over several physical topics, each with its own optional Kafka config.
#[derive(Debug, Serialize)]
pub struct TopicAssignment(Vec<TopicConfig>);

impl<'de> de::Deserialize<'de> for TopicAssignment {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: de::Deserializer<'de>,
    {
        #[derive(Deserialize, Debug)]
        #[serde(untagged)]
        enum Inner {
            /// A list of topic names, all using the default Kafka config.
            ShardedPrimary(Vec<String>),
            /// A list of full shard configurations.
            ShardedSecondary(Vec<TopicConfig>),
            /// A single topic name using the default Kafka config.
            Primary(String),
            /// A single full shard configuration.
            Secondary(TopicConfig),
        }

        let configs = match Inner::deserialize(deserializer)? {
            Inner::Primary(topic_name) => vec![topic_name.into()],
            Inner::Secondary(config) => vec![config],
            Inner::ShardedPrimary(topic_names) => {
                topic_names.into_iter().map(From::from).collect()
            }
            Inner::ShardedSecondary(configs) => configs,
        };

        if configs.is_empty() {
            return Err(de::Error::custom(
                "topic assignment must have at least one shard",
            ));
        }

        Ok(Self(configs))
    }
}
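
// For reference, the four shapes accepted by `Inner` above, in YAML
// (topic and config names are placeholders):
//
//     events: "topic-name"                        # Primary
//     events: ["shard-0", "shard-1"]              # ShardedPrimary
//     events: {name: "topic", config: "cluster"}  # Secondary
//     events:                                     # ShardedSecondary
//       - name: "shard-0"
//         config: "cluster"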

/// Configuration for a single topic shard.
#[derive(Debug, Deserialize, Serialize)]
pub struct TopicConfig {
    /// The physical topic name.
    #[serde(rename = "name")]
    topic_name: String,
    /// The name of a secondary Kafka config to use instead of the default.
    #[serde(rename = "config", skip_serializing_if = "Option::is_none")]
    kafka_config_name: Option<String>,
    /// An optional per-key rate limit for this shard.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    key_rate_limit: Option<KeyRateLimit>,
}

impl From<String> for TopicConfig {
    fn from(topic_name: String) -> Self {
        Self {
            topic_name,
            kafka_config_name: None,
            key_rate_limit: None,
        }
    }
}

/// A limit on the number of messages produced per key and time window.
#[derive(Debug, Deserialize, Serialize, Clone, Copy)]
pub struct KeyRateLimit {
    /// Maximum number of messages per key within the window.
    pub limit_per_window: u64,

    /// Size of the rate limiting window in seconds.
    pub window_secs: u64,
}

/// The resolved Kafka producer parameters for all shards of a topic.
#[derive(Debug)]
pub struct KafkaTopicConfig<'a>(Vec<KafkaParams<'a>>);

impl<'a> KafkaTopicConfig<'a> {
    /// Returns the Kafka parameters for each shard.
    pub fn topics(&self) -> &[KafkaParams<'a>] {
        &self.0
    }
}

/// Producer parameters for a single topic shard.
#[derive(Debug)]
pub struct KafkaParams<'a> {
    /// The physical topic name.
    pub topic_name: String,
    /// The name of the secondary Kafka config, if any.
    pub config_name: Option<&'a str>,
    /// Parameters for the Kafka producer.
    pub params: &'a [KafkaConfigParam],
    /// An optional per-key rate limit for this shard.
    pub key_rate_limit: Option<KeyRateLimit>,
}

impl From<String> for TopicAssignment {
    fn from(topic_name: String) -> Self {
        Self(vec![topic_name.into()])
    }
}

impl TopicAssignment {
    /// Resolves the Kafka producer parameters for every shard of this topic.
    ///
    /// Shards that name a secondary config are looked up in
    /// `secondary_configs`; all other shards use `default_config`. Fails with
    /// [`ConfigError::UnknownKafkaConfigName`] if a named config is missing.
    pub fn kafka_configs<'a>(
        &'a self,
        default_config: &'a Vec<KafkaConfigParam>,
        secondary_configs: &'a BTreeMap<String, Vec<KafkaConfigParam>>,
    ) -> Result<KafkaTopicConfig<'a>, ConfigError> {
        let configs = self
            .0
            .iter()
            .map(|tc| {
                Ok(KafkaParams {
                    topic_name: tc.topic_name.clone(),
                    config_name: tc.kafka_config_name.as_deref(),
                    params: match &tc.kafka_config_name {
                        Some(config) => secondary_configs
                            .get(config)
                            .ok_or(ConfigError::UnknownKafkaConfigName)?,
                        None => default_config.as_slice(),
                    },
                    key_rate_limit: tc.key_rate_limit,
                })
            })
            .collect::<Result<_, _>>()?;

        Ok(KafkaTopicConfig(configs))
    }
}

/// A single Kafka producer configuration parameter.
#[derive(Debug, Deserialize, Serialize)]
pub struct KafkaConfigParam {
    /// Name of the parameter.
    pub name: String,
    /// Value of the parameter.
    pub value: String,
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_kafka_config() {
        let yaml = r#"
ingest-events: "ingest-events-kafka-topic"
profiles:
    name: "ingest-profiles"
    config: "profiles"
ingest-metrics: "ingest-metrics-3"
transactions: "ingest-transactions-kafka-topic"
"#;

        let mut second_config = BTreeMap::new();
        second_config.insert(
            "profiles".to_owned(),
            vec![KafkaConfigParam {
                name: "test".to_owned(),
                value: "test-value".to_owned(),
            }],
        );

        let topics: TopicAssignments = serde_yaml::from_str(yaml).unwrap();
        insta::assert_debug_snapshot!(topics, @r###"
        TopicAssignments {
            events: TopicAssignment(
                [
                    TopicConfig {
                        topic_name: "ingest-events-kafka-topic",
                        kafka_config_name: None,
                        key_rate_limit: None,
                    },
                ],
            ),
            attachments: TopicAssignment(
                [
                    TopicConfig {
                        topic_name: "ingest-attachments",
                        kafka_config_name: None,
                        key_rate_limit: None,
                    },
                ],
            ),
            transactions: TopicAssignment(
                [
                    TopicConfig {
                        topic_name: "ingest-transactions-kafka-topic",
                        kafka_config_name: None,
                        key_rate_limit: None,
                    },
                ],
            ),
            outcomes: TopicAssignment(
                [
                    TopicConfig {
                        topic_name: "outcomes",
                        kafka_config_name: None,
                        key_rate_limit: None,
                    },
                ],
            ),
            outcomes_billing: TopicAssignment(
                [
                    TopicConfig {
                        topic_name: "outcomes-billing",
                        kafka_config_name: None,
                        key_rate_limit: None,
                    },
                ],
            ),
            metrics_sessions: TopicAssignment(
                [
                    TopicConfig {
                        topic_name: "ingest-metrics-3",
                        kafka_config_name: None,
                        key_rate_limit: None,
                    },
                ],
            ),
            metrics_generic: TopicAssignment(
                [
                    TopicConfig {
                        topic_name: "ingest-performance-metrics",
                        kafka_config_name: None,
                        key_rate_limit: None,
                    },
                ],
            ),
            profiles: TopicAssignment(
                [
                    TopicConfig {
                        topic_name: "ingest-profiles",
                        kafka_config_name: Some(
                            "profiles",
                        ),
                        key_rate_limit: None,
                    },
                ],
            ),
            replay_events: TopicAssignment(
                [
                    TopicConfig {
                        topic_name: "ingest-replay-events",
                        kafka_config_name: None,
                        key_rate_limit: None,
                    },
                ],
            ),
            replay_recordings: TopicAssignment(
                [
                    TopicConfig {
                        topic_name: "ingest-replay-recordings",
                        kafka_config_name: None,
                        key_rate_limit: None,
                    },
                ],
            ),
            ourlogs: TopicAssignment(
                [
                    TopicConfig {
                        topic_name: "snuba-ourlogs",
                        kafka_config_name: None,
                        key_rate_limit: None,
                    },
                ],
            ),
            monitors: TopicAssignment(
                [
                    TopicConfig {
                        topic_name: "ingest-monitors",
                        kafka_config_name: None,
                        key_rate_limit: None,
                    },
                ],
            ),
            spans: TopicAssignment(
                [
                    TopicConfig {
                        topic_name: "snuba-spans",
                        kafka_config_name: None,
                        key_rate_limit: None,
                    },
                ],
            ),
            feedback: TopicAssignment(
                [
                    TopicConfig {
                        topic_name: "ingest-feedback-events",
                        kafka_config_name: None,
                        key_rate_limit: None,
                    },
                ],
            ),
            items: TopicAssignment(
                [
                    TopicConfig {
                        topic_name: "snuba-items",
                        kafka_config_name: None,
                        key_rate_limit: None,
                    },
                ],
            ),
            unused: Unused(
                [],
            ),
        }
        "###);
    }

    #[test]
    fn test_default_topic_is_valid() {
        // Topics that currently have no schema registered in `sentry_kafka_schemas`.
        let currently_undefined_topics = [
            "ingest-attachments",
            "ingest-transactions",
            "profiles",
            "ingest-monitors",
        ];

        for topic in KafkaTopic::iter() {
            let name = topic.logical_topic_name();
            if !currently_undefined_topics.contains(&name) {
                assert!(sentry_kafka_schemas::get_schema(name, None).is_ok());
            }
        }
    }
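
    // A minimal sketch exercising the `Unused` catch-all: topic keys that
    // match no known assignment are collected instead of failing
    // deserialization. The key "not-a-topic" is an arbitrary placeholder.
    #[test]
    fn test_unknown_topic_names_are_collected() {
        let topics: TopicAssignments =
            serde_yaml::from_str(r#"not-a-topic: "some-topic-name""#).unwrap();
        assert_eq!(topics.unused.names(), ["not-a-topic".to_owned()].as_slice());
    }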

    #[test]
    fn test_sharded_kafka_config() {
        let yaml = r#"
events: ["ingest-events-1", "ingest-events-2"]
profiles:
    - name: "ingest-profiles-1"
      config: "profiles"
    - name: "ingest-profiles-2"
      config: "profiles"
"#;
        let topics: TopicAssignments = serde_yaml::from_str(yaml).unwrap();

        let def_config = vec![KafkaConfigParam {
            name: "test".to_owned(),
            value: "test-value".to_owned(),
        }];
        let mut second_config = BTreeMap::new();
        second_config.insert(
            "profiles".to_owned(),
            vec![KafkaConfigParam {
                name: "test".to_owned(),
                value: "test-value".to_owned(),
            }],
        );

        let events_configs = topics
            .events
            .kafka_configs(&def_config, &second_config)
            .expect("Kafka config for sharded events topic");

        insta::assert_debug_snapshot!(events_configs, @r###"
        KafkaTopicConfig(
            [
                KafkaParams {
                    topic_name: "ingest-events-1",
                    config_name: None,
                    params: [
                        KafkaConfigParam {
                            name: "test",
                            value: "test-value",
                        },
                    ],
                    key_rate_limit: None,
                },
                KafkaParams {
                    topic_name: "ingest-events-2",
                    config_name: None,
                    params: [
                        KafkaConfigParam {
                            name: "test",
                            value: "test-value",
                        },
                    ],
                    key_rate_limit: None,
                },
            ],
        )
        "###);

        let profiles_configs = topics
            .profiles
            .kafka_configs(&def_config, &second_config)
            .expect("Kafka config for sharded profiles topic");

        insta::assert_debug_snapshot!(profiles_configs, @r###"
        KafkaTopicConfig(
            [
                KafkaParams {
                    topic_name: "ingest-profiles-1",
                    config_name: Some(
                        "profiles",
                    ),
                    params: [
                        KafkaConfigParam {
                            name: "test",
                            value: "test-value",
                        },
                    ],
                    key_rate_limit: None,
                },
                KafkaParams {
                    topic_name: "ingest-profiles-2",
                    config_name: Some(
                        "profiles",
                    ),
                    params: [
                        KafkaConfigParam {
                            name: "test",
                            value: "test-value",
                        },
                    ],
                    key_rate_limit: None,
                },
            ],
        )
        "###);
    }
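
    // A minimal sketch of the validation in `TopicAssignment`'s deserializer:
    // an empty shard list is rejected rather than yielding an assignment with
    // no shards.
    #[test]
    fn test_empty_shard_list_is_rejected() {
        let result: Result<TopicAssignment, _> = serde_yaml::from_str("[]");
        assert!(result.is_err());
    }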

    #[test]
    fn test_per_shard_rate_limits() {
        let yaml = r#"
events:
    - name: "shard-0"
      config: "cluster1"
      key_rate_limit:
        limit_per_window: 100
        window_secs: 60
    - name: "shard-1"
      config: "cluster2"
      key_rate_limit:
        limit_per_window: 200
        window_secs: 120
    - name: "shard-2" # No rate limit, uses the default config
"#;

        let def_config = vec![KafkaConfigParam {
            name: "bootstrap.servers".to_owned(),
            value: "primary:9092".to_owned(),
        }];
        let mut second_config = BTreeMap::new();
        second_config.insert(
            "cluster1".to_owned(),
            vec![KafkaConfigParam {
                name: "bootstrap.servers".to_owned(),
                value: "cluster1:9092".to_owned(),
            }],
        );
        second_config.insert(
            "cluster2".to_owned(),
            vec![KafkaConfigParam {
                name: "bootstrap.servers".to_owned(),
                value: "cluster2:9092".to_owned(),
            }],
        );

        let topics: TopicAssignments = serde_yaml::from_str(yaml).unwrap();

        let events_configs = topics
            .events
            .kafka_configs(&def_config, &second_config)
            .expect("Kafka config for per-shard rate limits");

        insta::assert_debug_snapshot!(events_configs, @r###"
        KafkaTopicConfig(
            [
                KafkaParams {
                    topic_name: "shard-0",
                    config_name: Some(
                        "cluster1",
                    ),
                    params: [
                        KafkaConfigParam {
                            name: "bootstrap.servers",
                            value: "cluster1:9092",
                        },
                    ],
                    key_rate_limit: Some(
                        KeyRateLimit {
                            limit_per_window: 100,
                            window_secs: 60,
                        },
                    ),
                },
                KafkaParams {
                    topic_name: "shard-1",
                    config_name: Some(
                        "cluster2",
                    ),
                    params: [
                        KafkaConfigParam {
                            name: "bootstrap.servers",
                            value: "cluster2:9092",
                        },
                    ],
                    key_rate_limit: Some(
                        KeyRateLimit {
                            limit_per_window: 200,
                            window_secs: 120,
                        },
                    ),
                },
                KafkaParams {
                    topic_name: "shard-2",
                    config_name: None,
                    params: [
                        KafkaConfigParam {
                            name: "bootstrap.servers",
                            value: "primary:9092",
                        },
                    ],
                    key_rate_limit: None,
                },
            ],
        )
        "###);
    }
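
    // A minimal sketch of the error path in `kafka_configs`: referencing a
    // secondary config name that is not registered must fail with
    // `ConfigError::UnknownKafkaConfigName`. All names here are placeholders.
    #[test]
    fn test_unknown_config_name_is_rejected() {
        let assignment: TopicAssignment =
            serde_yaml::from_str(r#"{name: "events", config: "missing-cluster"}"#).unwrap();

        let def_config = vec![];
        let second_config = BTreeMap::new();

        assert!(matches!(
            assignment.kafka_configs(&def_config, &second_config),
            Err(ConfigError::UnknownKafkaConfigName)
        ));
    }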
}