1use std::collections::BTreeMap;
4
5use serde::{Deserialize, Serialize, de};
6use thiserror::Error;
7
/// Errors that can occur while resolving Kafka topic configuration.
#[derive(Error, Debug)]
pub enum ConfigError {
    /// A shard referenced a named Kafka config that is not present in the
    /// secondary configs map (see [`TopicAssignment::kafka_configs`]).
    #[error("unknown kafka config name")]
    UnknownKafkaConfigName,
    /// Invalid sharded topic configuration.
    // NOTE(review): not constructed anywhere in this file; presumably used by
    // callers elsewhere — confirm before removing.
    #[error("invalid kafka shard configuration: must have shard with index 0")]
    InvalidShard,
}
18
/// All logical Kafka topics this configuration knows about.
///
/// The default physical topic name for each variant is defined in the
/// `define_topic_assignments!` invocation below.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub enum KafkaTopic {
    /// Simple events topic.
    Events,
    /// Events with attachments.
    Attachments,
    /// Transaction events.
    Transactions,
    /// Outcomes.
    Outcomes,
    /// Billing-critical outcomes.
    OutcomesBilling,
    /// Metrics extracted from sessions (release health).
    MetricsSessions,
    /// All other kinds of metrics.
    MetricsGeneric,
    /// Profiles (stacktraces).
    Profiles,
    /// Replay events.
    ReplayEvents,
    /// Replay recordings.
    ReplayRecordings,
    /// Monitor check-ins.
    Monitors,
    /// Standalone spans without a transaction.
    Spans,
    /// Feedback events.
    Feedback,
    /// Items.
    Items,
}

impl KafkaTopic {
    /// Returns an iterator over all variants of [`KafkaTopic`].
    pub fn iter() -> std::slice::Iter<'static, Self> {
        // Keep this list in sync with the variant declarations above.
        static TOPICS: [KafkaTopic; 14] = [
            KafkaTopic::Events,
            KafkaTopic::Attachments,
            KafkaTopic::Transactions,
            KafkaTopic::Outcomes,
            KafkaTopic::OutcomesBilling,
            KafkaTopic::MetricsSessions,
            KafkaTopic::MetricsGeneric,
            KafkaTopic::Profiles,
            KafkaTopic::ReplayEvents,
            KafkaTopic::ReplayRecordings,
            KafkaTopic::Monitors,
            KafkaTopic::Spans,
            KafkaTopic::Feedback,
            KafkaTopic::Items,
        ];
        TOPICS.iter()
    }
}
76
// Generates the `TopicAssignments` struct (one field per topic), the
// `TopicAssignments::get` accessor, `KafkaTopic::logical_topic_name`, and a
// `Default` impl — all from a single table of
// `field_name: (KafkaTopic variant, default topic name, doc string)` entries.
macro_rules! define_topic_assignments {
    ($($field_name:ident : ($kafka_topic:path, $default_topic:literal, $doc:literal)),* $(,)?) => {
        // Assignment of logical topics to physical topic shards, as read from
        // configuration.
        #[derive(Deserialize, Serialize, Debug)]
        #[serde(default)]
        pub struct TopicAssignments {
            $(
                // The default topic name is accepted as an alias, so a config
                // may key the field by either name (e.g. `events` or
                // `ingest-events`; see `test_kafka_config`).
                #[serde(alias = $default_topic)]
                #[doc = $doc]
                pub $field_name: TopicAssignment,
            )*

            // Collects keys that match no known topic field, so callers can
            // inspect them via `Unused::names`.
            #[serde(flatten, skip_serializing)]
            pub unused: Unused,
        }

        impl TopicAssignments{
            // Returns the topic assignment for the given logical topic.
            #[must_use]
            pub fn get(&self, kafka_topic: KafkaTopic) -> &TopicAssignment {
                match kafka_topic {
                    $(
                        $kafka_topic => &self.$field_name,
                    )*
                }
            }
        }

        impl KafkaTopic {
            // Returns the logical (default) topic name for this topic.
            pub fn logical_topic_name(&self) -> &'static str {
                match self {
                    $(
                        $kafka_topic => $default_topic,
                    )*
                }
            }
        }

        impl Default for TopicAssignments {
            fn default() -> Self {
                Self {
                    // Each field defaults to a single-shard assignment on the
                    // default topic name (via `From<String>`).
                    $(
                        $field_name: $default_topic.to_owned().into(),
                    )*
                    unused: Default::default()
                }
            }
        }
    };
}
129
// Table format: field name: (enum variant, default/logical topic name, doc).
define_topic_assignments! {
    events: (KafkaTopic::Events, "ingest-events", "Simple events topic name."),
    attachments: (KafkaTopic::Attachments, "ingest-attachments", "Events with attachments topic name."),
    transactions: (KafkaTopic::Transactions, "ingest-transactions", "Transaction events topic name."),
    outcomes: (KafkaTopic::Outcomes, "outcomes", "Outcomes topic name."),
    outcomes_billing: (KafkaTopic::OutcomesBilling, "outcomes-billing", "Outcomes topic name for billing critical outcomes."),
    metrics_sessions: (KafkaTopic::MetricsSessions, "ingest-metrics", "Topic name for metrics extracted from sessions, aka release health."),
    metrics_generic: (KafkaTopic::MetricsGeneric, "ingest-performance-metrics", "Topic name for all other kinds of metrics."),
    profiles: (KafkaTopic::Profiles, "profiles", "Stacktrace topic name"),
    replay_events: (KafkaTopic::ReplayEvents, "ingest-replay-events", "Replay Events topic name."),
    replay_recordings: (KafkaTopic::ReplayRecordings, "ingest-replay-recordings", "Recordings topic name."),
    monitors: (KafkaTopic::Monitors, "ingest-monitors", "Monitor check-ins."),
    spans: (KafkaTopic::Spans, "ingest-spans", "Standalone spans without a transaction."),
    feedback: (KafkaTopic::Feedback, "ingest-feedback-events", "Feedback events topic."),
    items: (KafkaTopic::Items, "snuba-items", "Items topic."),
}
149
/// Names of configuration keys that did not match any known topic field.
#[derive(Debug, Default)]
pub struct Unused(Vec<String>);

impl Unused {
    /// Returns the unrecognized key names.
    pub fn names(&self) -> &[String] {
        self.0.as_slice()
    }
}
160
161impl<'de> de::Deserialize<'de> for Unused {
162 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
163 where
164 D: de::Deserializer<'de>,
165 {
166 let topics = BTreeMap::<String, de::IgnoredAny>::deserialize(deserializer)?;
167 Ok(Self(topics.into_keys().collect()))
168 }
169}
170
/// Assignment of a logical topic to one or more physical topic shards.
///
/// The custom `Deserialize` impl below guarantees the shard list is non-empty.
#[derive(Debug, Serialize)]
pub struct TopicAssignment(Vec<TopicConfig>);
180
impl<'de> de::Deserialize<'de> for TopicAssignment {
    /// Accepts four input shapes: a plain topic name, a single shard config
    /// object, a list of topic names, or a list of shard config objects.
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: de::Deserializer<'de>,
    {
        // With `untagged`, serde tries variants in declaration order and picks
        // the first that matches — keep this order when editing.
        #[derive(Deserialize, Debug)]
        #[serde(untagged)]
        enum Inner {
            // `["name-1", "name-2"]`: multiple shards on the default cluster.
            ShardedPrimary(Vec<String>),
            // `[{name: ..., config: ...}, ...]`: multiple shards with
            // per-shard settings.
            ShardedSecondary(Vec<TopicConfig>),
            // `"name"`: a single shard on the default cluster.
            Primary(String),
            // `{name: ..., config: ...}`: a single shard with settings.
            Secondary(TopicConfig),
        }

        // Normalize all shapes into a list of `TopicConfig` shards.
        let configs = match Inner::deserialize(deserializer)? {
            Inner::Primary(topic_name) => vec![topic_name.into()],
            Inner::Secondary(config) => vec![config],
            Inner::ShardedPrimary(topic_names) => topic_names.into_iter().map(From::from).collect(),
            Inner::ShardedSecondary(configs) => configs,
        };

        // An empty shard list would leave nothing to produce to; reject it at
        // deserialization time.
        if configs.is_empty() {
            return Err(de::Error::custom(
                "topic assignment must have at least one shard",
            ));
        }

        Ok(Self(configs))
    }
}
212
/// Configuration for a single physical topic shard.
#[derive(Debug, Deserialize, Serialize)]
pub struct TopicConfig {
    /// The physical Kafka topic name.
    #[serde(rename = "name")]
    topic_name: String,
    /// Name of a secondary Kafka config to use for this shard; `None` means
    /// the default config applies.
    #[serde(rename = "config", skip_serializing_if = "Option::is_none")]
    kafka_config_name: Option<String>,
    /// Optional per-key rate limit for this shard, carried into
    /// [`KafkaParams`] unchanged.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    key_rate_limit: Option<KeyRateLimit>,
}
228
229impl From<String> for TopicConfig {
230 fn from(topic_name: String) -> Self {
231 Self {
232 topic_name,
233 kafka_config_name: None,
234 key_rate_limit: None,
235 }
236 }
237}
238
/// A rate limit expressed as a maximum count per time window.
// NOTE(review): enforcement is not visible in this file; presumably applied
// per message key by the producer — confirm against the consumer of
// `KafkaParams::key_rate_limit`.
#[derive(Debug, Deserialize, Serialize, Clone, Copy)]
pub struct KeyRateLimit {
    /// Maximum number of items allowed within one window.
    pub limit_per_window: u64,

    /// Length of the window in seconds.
    pub window_secs: u64,
}
250
251#[derive(Debug)]
256pub struct KafkaTopicConfig<'a>(Vec<KafkaParams<'a>>);
257
258impl<'a> KafkaTopicConfig<'a> {
259 pub fn topics(&self) -> &[KafkaParams<'a>] {
261 &self.0
262 }
263}
264
/// Resolved Kafka parameters for a single topic shard.
#[derive(Debug)]
pub struct KafkaParams<'a> {
    /// Physical Kafka topic name.
    pub topic_name: String,
    /// Name of the secondary config used for this shard, if any.
    pub config_name: Option<&'a str>,
    /// Kafka client parameters resolved for this shard's cluster.
    pub params: &'a [KafkaConfigParam],
    /// Per-key rate limit copied from the shard's [`TopicConfig`], if set.
    pub key_rate_limit: Option<KeyRateLimit>,
}
277
278impl From<String> for TopicAssignment {
279 fn from(topic_name: String) -> Self {
280 Self(vec![topic_name.into()])
281 }
282}
283
284impl TopicAssignment {
285 pub fn kafka_configs<'a>(
290 &'a self,
291 default_config: &'a Vec<KafkaConfigParam>,
292 secondary_configs: &'a BTreeMap<String, Vec<KafkaConfigParam>>,
293 ) -> Result<KafkaTopicConfig<'a>, ConfigError> {
294 let configs = self
295 .0
296 .iter()
297 .map(|tc| {
298 Ok(KafkaParams {
299 topic_name: tc.topic_name.clone(),
300 config_name: tc.kafka_config_name.as_deref(),
301 params: match &tc.kafka_config_name {
302 Some(config) => secondary_configs
303 .get(config)
304 .ok_or(ConfigError::UnknownKafkaConfigName)?,
305 None => default_config.as_slice(),
306 },
307 key_rate_limit: tc.key_rate_limit,
308 })
309 })
310 .collect::<Result<_, _>>()?;
311
312 Ok(KafkaTopicConfig(configs))
313 }
314}
315
/// A single Kafka client configuration parameter as a name/value pair,
/// e.g. `bootstrap.servers` (see the tests below).
#[derive(Debug, Deserialize, Serialize)]
pub struct KafkaConfigParam {
    /// Parameter name.
    pub name: String,
    /// Parameter value.
    pub value: String,
}
324
#[cfg(test)]
mod tests {

    use super::*;

    // Mixed-shape config: plain string assignments (keyed by field name or
    // logical-name alias) plus an object form with a secondary config name.
    // Unlisted topics must fall back to their defaults.
    #[test]
    fn test_kafka_config() {
        let yaml = r#"
ingest-events: "ingest-events-kafka-topic"
profiles:
    name: "ingest-profiles"
    config: "profiles"
ingest-metrics: "ingest-metrics-3"
transactions: "ingest-transactions-kafka-topic"
"#;

        // NOTE(review): `second_config` is built but never used in this test.
        let mut second_config = BTreeMap::new();
        second_config.insert(
            "profiles".to_owned(),
            vec![KafkaConfigParam {
                name: "test".to_owned(),
                value: "test-value".to_owned(),
            }],
        );

        let topics: TopicAssignments = serde_yaml::from_str(yaml).unwrap();
        insta::assert_debug_snapshot!(topics, @r###"
        TopicAssignments {
            events: TopicAssignment(
                [
                    TopicConfig {
                        topic_name: "ingest-events-kafka-topic",
                        kafka_config_name: None,
                        key_rate_limit: None,
                    },
                ],
            ),
            attachments: TopicAssignment(
                [
                    TopicConfig {
                        topic_name: "ingest-attachments",
                        kafka_config_name: None,
                        key_rate_limit: None,
                    },
                ],
            ),
            transactions: TopicAssignment(
                [
                    TopicConfig {
                        topic_name: "ingest-transactions-kafka-topic",
                        kafka_config_name: None,
                        key_rate_limit: None,
                    },
                ],
            ),
            outcomes: TopicAssignment(
                [
                    TopicConfig {
                        topic_name: "outcomes",
                        kafka_config_name: None,
                        key_rate_limit: None,
                    },
                ],
            ),
            outcomes_billing: TopicAssignment(
                [
                    TopicConfig {
                        topic_name: "outcomes-billing",
                        kafka_config_name: None,
                        key_rate_limit: None,
                    },
                ],
            ),
            metrics_sessions: TopicAssignment(
                [
                    TopicConfig {
                        topic_name: "ingest-metrics-3",
                        kafka_config_name: None,
                        key_rate_limit: None,
                    },
                ],
            ),
            metrics_generic: TopicAssignment(
                [
                    TopicConfig {
                        topic_name: "ingest-performance-metrics",
                        kafka_config_name: None,
                        key_rate_limit: None,
                    },
                ],
            ),
            profiles: TopicAssignment(
                [
                    TopicConfig {
                        topic_name: "ingest-profiles",
                        kafka_config_name: Some(
                            "profiles",
                        ),
                        key_rate_limit: None,
                    },
                ],
            ),
            replay_events: TopicAssignment(
                [
                    TopicConfig {
                        topic_name: "ingest-replay-events",
                        kafka_config_name: None,
                        key_rate_limit: None,
                    },
                ],
            ),
            replay_recordings: TopicAssignment(
                [
                    TopicConfig {
                        topic_name: "ingest-replay-recordings",
                        kafka_config_name: None,
                        key_rate_limit: None,
                    },
                ],
            ),
            monitors: TopicAssignment(
                [
                    TopicConfig {
                        topic_name: "ingest-monitors",
                        kafka_config_name: None,
                        key_rate_limit: None,
                    },
                ],
            ),
            spans: TopicAssignment(
                [
                    TopicConfig {
                        topic_name: "ingest-spans",
                        kafka_config_name: None,
                        key_rate_limit: None,
                    },
                ],
            ),
            feedback: TopicAssignment(
                [
                    TopicConfig {
                        topic_name: "ingest-feedback-events",
                        kafka_config_name: None,
                        key_rate_limit: None,
                    },
                ],
            ),
            items: TopicAssignment(
                [
                    TopicConfig {
                        topic_name: "snuba-items",
                        kafka_config_name: None,
                        key_rate_limit: None,
                    },
                ],
            ),
            unused: Unused(
                [],
            ),
        }
        "###);
    }

    // Every default (logical) topic name must have a schema registered in
    // `sentry_kafka_schemas`.
    #[test]
    fn test_default_topic_is_valid() {
        for topic in KafkaTopic::iter() {
            let name = topic.logical_topic_name();
            assert!(sentry_kafka_schemas::get_schema(name, None).is_ok());
        }
    }

    // Sharded assignments: a list of plain names (default cluster) and a list
    // of config objects (secondary cluster) must each resolve to one
    // `KafkaParams` per shard.
    #[test]
    fn test_sharded_kafka_config() {
        let yaml = r#"
events: ["ingest-events-1", "ingest-events-2"]
profiles:
    - name: "ingest-profiles-1"
      config: "profiles"
    - name: "ingest-profiles-2"
      config: "profiles"
"#;
        let topics: TopicAssignments = serde_yaml::from_str(yaml).unwrap();

        let def_config = vec![KafkaConfigParam {
            name: "test".to_owned(),
            value: "test-value".to_owned(),
        }];
        let mut second_config = BTreeMap::new();
        second_config.insert(
            "profiles".to_owned(),
            vec![KafkaConfigParam {
                name: "test".to_owned(),
                value: "test-value".to_owned(),
            }],
        );

        let events_configs = topics
            .events
            .kafka_configs(&def_config, &second_config)
            .expect("Kafka config for sharded events topic");

        insta::assert_debug_snapshot!(events_configs, @r###"
        KafkaTopicConfig(
            [
                KafkaParams {
                    topic_name: "ingest-events-1",
                    config_name: None,
                    params: [
                        KafkaConfigParam {
                            name: "test",
                            value: "test-value",
                        },
                    ],
                    key_rate_limit: None,
                },
                KafkaParams {
                    topic_name: "ingest-events-2",
                    config_name: None,
                    params: [
                        KafkaConfigParam {
                            name: "test",
                            value: "test-value",
                        },
                    ],
                    key_rate_limit: None,
                },
            ],
        )
        "###);

        let profiles_configs = topics
            .profiles
            .kafka_configs(&def_config, &second_config)
            .expect("Kafka config for sharded profiles topic");

        insta::assert_debug_snapshot!(profiles_configs, @r###"
        KafkaTopicConfig(
            [
                KafkaParams {
                    topic_name: "ingest-profiles-1",
                    config_name: Some(
                        "profiles",
                    ),
                    params: [
                        KafkaConfigParam {
                            name: "test",
                            value: "test-value",
                        },
                    ],
                    key_rate_limit: None,
                },
                KafkaParams {
                    topic_name: "ingest-profiles-2",
                    config_name: Some(
                        "profiles",
                    ),
                    params: [
                        KafkaConfigParam {
                            name: "test",
                            value: "test-value",
                        },
                    ],
                    key_rate_limit: None,
                },
            ],
        )
        "###);
    }

    // Per-shard `key_rate_limit` settings: each shard keeps its own limit
    // (or none), independently of its cluster config.
    #[test]
    fn test_per_shard_rate_limits() {
        let yaml = r#"
events:
    - name: "shard-0"
      config: "cluster1"
      key_rate_limit:
        limit_per_window: 100
        window_secs: 60
    - name: "shard-1"
      config: "cluster2"
      key_rate_limit:
        limit_per_window: 200
        window_secs: 120
    - name: "shard-2" # No rate limit (Primary variant)
"#;

        let def_config = vec![KafkaConfigParam {
            name: "bootstrap.servers".to_owned(),
            value: "primary:9092".to_owned(),
        }];
        let mut second_config = BTreeMap::new();
        second_config.insert(
            "cluster1".to_owned(),
            vec![KafkaConfigParam {
                name: "bootstrap.servers".to_owned(),
                value: "cluster1:9092".to_owned(),
            }],
        );
        second_config.insert(
            "cluster2".to_owned(),
            vec![KafkaConfigParam {
                name: "bootstrap.servers".to_owned(),
                value: "cluster2:9092".to_owned(),
            }],
        );

        let topics: TopicAssignments = serde_yaml::from_str(yaml).unwrap();

        let events_configs = topics
            .events
            .kafka_configs(&def_config, &second_config)
            .expect("Kafka config for per-shard rate limits");

        insta::assert_debug_snapshot!(events_configs, @r###"
        KafkaTopicConfig(
            [
                KafkaParams {
                    topic_name: "shard-0",
                    config_name: Some(
                        "cluster1",
                    ),
                    params: [
                        KafkaConfigParam {
                            name: "bootstrap.servers",
                            value: "cluster1:9092",
                        },
                    ],
                    key_rate_limit: Some(
                        KeyRateLimit {
                            limit_per_window: 100,
                            window_secs: 60,
                        },
                    ),
                },
                KafkaParams {
                    topic_name: "shard-1",
                    config_name: Some(
                        "cluster2",
                    ),
                    params: [
                        KafkaConfigParam {
                            name: "bootstrap.servers",
                            value: "cluster2:9092",
                        },
                    ],
                    key_rate_limit: Some(
                        KeyRateLimit {
                            limit_per_window: 200,
                            window_secs: 120,
                        },
                    ),
                },
                KafkaParams {
                    topic_name: "shard-2",
                    config_name: None,
                    params: [
                        KafkaConfigParam {
                            name: "bootstrap.servers",
                            value: "primary:9092",
                        },
                    ],
                    key_rate_limit: None,
                },
            ],
        )
        "###);
    }
}