1use std::collections::BTreeMap;
4
5use serde::{Deserialize, Serialize, de};
6use thiserror::Error;
7
8#[derive(Error, Debug)]
10pub enum ConfigError {
11 #[error("unknown kafka config name")]
13 UnknownKafkaConfigName,
14 #[error("invalid kafka shard configuration: must have shard with index 0")]
16 InvalidShard,
17}
18
19#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
21pub enum KafkaTopic {
22 Events,
24 Attachments,
26 Transactions,
28 Outcomes,
30 OutcomesBilling,
32 MetricsSessions,
34 MetricsGeneric,
36 Profiles,
38 ReplayRecordings,
40 Monitors,
42 Spans,
44 Feedback,
46 Items,
48}
49
50impl KafkaTopic {
51 pub fn iter() -> std::slice::Iter<'static, Self> {
54 use KafkaTopic::*;
55 static TOPICS: [KafkaTopic; 13] = [
56 Events,
57 Attachments,
58 Transactions,
59 Outcomes,
60 OutcomesBilling,
61 MetricsSessions,
62 MetricsGeneric,
63 Profiles,
64 ReplayRecordings,
65 Monitors,
66 Spans,
67 Feedback,
68 Items,
69 ];
70 TOPICS.iter()
71 }
72}
73
74macro_rules! define_topic_assignments {
75 ($($field_name:ident : ($kafka_topic:path, $default_topic:literal, $doc:literal)),* $(,)?) => {
76 #[derive(Deserialize, Serialize, Debug)]
78 #[serde(default)]
79 pub struct TopicAssignments {
80 $(
81 #[serde(alias = $default_topic)]
82 #[doc = $doc]
83 pub $field_name: TopicAssignment,
84 )*
85
86 #[serde(flatten, skip_serializing)]
88 pub unused: Unused,
89 }
90
91 impl TopicAssignments{
92 #[must_use]
94 pub fn get(&self, kafka_topic: KafkaTopic) -> &TopicAssignment {
95 match kafka_topic {
96 $(
97 $kafka_topic => &self.$field_name,
98 )*
99 }
100 }
101 }
102
103 impl KafkaTopic {
104 pub fn logical_topic_name(&self) -> &'static str {
106 match self {
107 $(
108 $kafka_topic => $default_topic,
109 )*
110 }
111 }
112 }
113
114 impl Default for TopicAssignments {
115 fn default() -> Self {
116 Self {
117 $(
118 $field_name: $default_topic.to_owned().into(),
119 )*
120 unused: Default::default()
121 }
122 }
123 }
124 };
125}
126
127define_topic_assignments! {
131 events: (KafkaTopic::Events, "ingest-events", "Simple events topic name."),
132 attachments: (KafkaTopic::Attachments, "ingest-attachments", "Events with attachments topic name."),
133 transactions: (KafkaTopic::Transactions, "ingest-transactions", "Transaction events topic name."),
134 outcomes: (KafkaTopic::Outcomes, "outcomes", "Outcomes topic name."),
135 outcomes_billing: (KafkaTopic::OutcomesBilling, "outcomes-billing", "Outcomes topic name for billing critical outcomes."),
136 metrics_sessions: (KafkaTopic::MetricsSessions, "ingest-metrics", "Topic name for metrics extracted from sessions, aka release health."),
137 metrics_generic: (KafkaTopic::MetricsGeneric, "ingest-performance-metrics", "Topic name for all other kinds of metrics."),
138 profiles: (KafkaTopic::Profiles, "profiles", "Stacktrace topic name"),
139 replay_recordings: (KafkaTopic::ReplayRecordings, "ingest-replay-recordings", "Recordings topic name."),
140 monitors: (KafkaTopic::Monitors, "ingest-monitors", "Monitor check-ins."),
141 spans: (KafkaTopic::Spans, "ingest-spans", "Standalone spans without a transaction."),
142 feedback: (KafkaTopic::Feedback, "ingest-feedback-events", "Feedback events topic."),
143 items: (KafkaTopic::Items, "snuba-items", "Items topic."),
144}
145
146#[derive(Debug, Default)]
148pub struct Unused(Vec<String>);
149
150impl Unused {
151 pub fn names(&self) -> &[String] {
153 &self.0
154 }
155}
156
157impl<'de> de::Deserialize<'de> for Unused {
158 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
159 where
160 D: de::Deserializer<'de>,
161 {
162 let topics = BTreeMap::<String, de::IgnoredAny>::deserialize(deserializer)?;
163 Ok(Self(topics.into_keys().collect()))
164 }
165}
166
167#[derive(Debug, Serialize)]
175pub struct TopicAssignment(Vec<TopicConfig>);
176
177impl<'de> de::Deserialize<'de> for TopicAssignment {
178 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
179 where
180 D: de::Deserializer<'de>,
181 {
182 #[derive(Deserialize, Debug)]
183 #[serde(untagged)]
184 enum Inner {
185 ShardedPrimary(Vec<String>),
187 ShardedSecondary(Vec<TopicConfig>),
188 Primary(String),
189 Secondary(TopicConfig),
190 }
191
192 let configs = match Inner::deserialize(deserializer)? {
193 Inner::Primary(topic_name) => vec![topic_name.into()],
194 Inner::Secondary(config) => vec![config],
195 Inner::ShardedPrimary(topic_names) => topic_names.into_iter().map(From::from).collect(),
196 Inner::ShardedSecondary(configs) => configs,
197 };
198
199 if configs.is_empty() {
200 return Err(de::Error::custom(
201 "topic assignment must have at least one shard",
202 ));
203 }
204
205 Ok(Self(configs))
206 }
207}
208
209#[derive(Debug, Deserialize, Serialize)]
211pub struct TopicConfig {
212 #[serde(rename = "name")]
214 topic_name: String,
215 #[serde(rename = "config", skip_serializing_if = "Option::is_none")]
219 kafka_config_name: Option<String>,
220 #[serde(default, skip_serializing_if = "Option::is_none")]
222 key_rate_limit: Option<KeyRateLimit>,
223}
224
225impl From<String> for TopicConfig {
226 fn from(topic_name: String) -> Self {
227 Self {
228 topic_name,
229 kafka_config_name: None,
230 key_rate_limit: None,
231 }
232 }
233}
234
235#[derive(Debug, Deserialize, Serialize, Clone, Copy)]
237pub struct KeyRateLimit {
238 pub limit_per_window: u64,
240
241 pub window_secs: u64,
245}
246
247#[derive(Debug)]
252pub struct KafkaTopicConfig<'a>(Vec<KafkaParams<'a>>);
253
254impl<'a> KafkaTopicConfig<'a> {
255 pub fn topics(&self) -> &[KafkaParams<'a>] {
257 &self.0
258 }
259}
260
261#[derive(Debug)]
263pub struct KafkaParams<'a> {
264 pub topic_name: String,
266 pub config_name: Option<&'a str>,
268 pub params: &'a [KafkaConfigParam],
270 pub key_rate_limit: Option<KeyRateLimit>,
272}
273
274impl From<String> for TopicAssignment {
275 fn from(topic_name: String) -> Self {
276 Self(vec![topic_name.into()])
277 }
278}
279
280impl TopicAssignment {
281 pub fn kafka_configs<'a>(
286 &'a self,
287 default_config: &'a Vec<KafkaConfigParam>,
288 secondary_configs: &'a BTreeMap<String, Vec<KafkaConfigParam>>,
289 ) -> Result<KafkaTopicConfig<'a>, ConfigError> {
290 let configs = self
291 .0
292 .iter()
293 .map(|tc| {
294 Ok(KafkaParams {
295 topic_name: tc.topic_name.clone(),
296 config_name: tc.kafka_config_name.as_deref(),
297 params: match &tc.kafka_config_name {
298 Some(config) => secondary_configs
299 .get(config)
300 .ok_or(ConfigError::UnknownKafkaConfigName)?,
301 None => default_config.as_slice(),
302 },
303 key_rate_limit: tc.key_rate_limit,
304 })
305 })
306 .collect::<Result<_, _>>()?;
307
308 Ok(KafkaTopicConfig(configs))
309 }
310}
311
312#[derive(Debug, Deserialize, Serialize)]
314pub struct KafkaConfigParam {
315 pub name: String,
317 pub value: String,
319}
320
321#[cfg(test)]
322mod tests {
323
324 use super::*;
325
326 #[test]
327 fn test_kafka_config() {
328 let yaml = r#"
329ingest-events: "ingest-events-kafka-topic"
330profiles:
331 name: "ingest-profiles"
332 config: "profiles"
333ingest-metrics: "ingest-metrics-3"
334transactions: "ingest-transactions-kafka-topic"
335"#;
336
337 let mut second_config = BTreeMap::new();
338 second_config.insert(
339 "profiles".to_owned(),
340 vec![KafkaConfigParam {
341 name: "test".to_owned(),
342 value: "test-value".to_owned(),
343 }],
344 );
345
346 let topics: TopicAssignments = serde_yaml::from_str(yaml).unwrap();
347 insta::assert_debug_snapshot!(topics, @r#"
348 TopicAssignments {
349 events: TopicAssignment(
350 [
351 TopicConfig {
352 topic_name: "ingest-events-kafka-topic",
353 kafka_config_name: None,
354 key_rate_limit: None,
355 },
356 ],
357 ),
358 attachments: TopicAssignment(
359 [
360 TopicConfig {
361 topic_name: "ingest-attachments",
362 kafka_config_name: None,
363 key_rate_limit: None,
364 },
365 ],
366 ),
367 transactions: TopicAssignment(
368 [
369 TopicConfig {
370 topic_name: "ingest-transactions-kafka-topic",
371 kafka_config_name: None,
372 key_rate_limit: None,
373 },
374 ],
375 ),
376 outcomes: TopicAssignment(
377 [
378 TopicConfig {
379 topic_name: "outcomes",
380 kafka_config_name: None,
381 key_rate_limit: None,
382 },
383 ],
384 ),
385 outcomes_billing: TopicAssignment(
386 [
387 TopicConfig {
388 topic_name: "outcomes-billing",
389 kafka_config_name: None,
390 key_rate_limit: None,
391 },
392 ],
393 ),
394 metrics_sessions: TopicAssignment(
395 [
396 TopicConfig {
397 topic_name: "ingest-metrics-3",
398 kafka_config_name: None,
399 key_rate_limit: None,
400 },
401 ],
402 ),
403 metrics_generic: TopicAssignment(
404 [
405 TopicConfig {
406 topic_name: "ingest-performance-metrics",
407 kafka_config_name: None,
408 key_rate_limit: None,
409 },
410 ],
411 ),
412 profiles: TopicAssignment(
413 [
414 TopicConfig {
415 topic_name: "ingest-profiles",
416 kafka_config_name: Some(
417 "profiles",
418 ),
419 key_rate_limit: None,
420 },
421 ],
422 ),
423 replay_recordings: TopicAssignment(
424 [
425 TopicConfig {
426 topic_name: "ingest-replay-recordings",
427 kafka_config_name: None,
428 key_rate_limit: None,
429 },
430 ],
431 ),
432 monitors: TopicAssignment(
433 [
434 TopicConfig {
435 topic_name: "ingest-monitors",
436 kafka_config_name: None,
437 key_rate_limit: None,
438 },
439 ],
440 ),
441 spans: TopicAssignment(
442 [
443 TopicConfig {
444 topic_name: "ingest-spans",
445 kafka_config_name: None,
446 key_rate_limit: None,
447 },
448 ],
449 ),
450 feedback: TopicAssignment(
451 [
452 TopicConfig {
453 topic_name: "ingest-feedback-events",
454 kafka_config_name: None,
455 key_rate_limit: None,
456 },
457 ],
458 ),
459 items: TopicAssignment(
460 [
461 TopicConfig {
462 topic_name: "snuba-items",
463 kafka_config_name: None,
464 key_rate_limit: None,
465 },
466 ],
467 ),
468 unused: Unused(
469 [],
470 ),
471 }
472 "#);
473 }
474
475 #[test]
476 fn test_default_topic_is_valid() {
477 for topic in KafkaTopic::iter() {
478 let name = topic.logical_topic_name();
479 assert!(sentry_kafka_schemas::get_schema(name, None).is_ok());
480 }
481 }
482
483 #[test]
484 fn test_sharded_kafka_config() {
485 let yaml = r#"
486events: ["ingest-events-1", "ingest-events-2"]
487profiles:
488 - name: "ingest-profiles-1"
489 config: "profiles"
490 - name: "ingest-profiles-2"
491 config: "profiles"
492"#;
493 let topics: TopicAssignments = serde_yaml::from_str(yaml).unwrap();
494
495 let def_config = vec![KafkaConfigParam {
496 name: "test".to_owned(),
497 value: "test-value".to_owned(),
498 }];
499 let mut second_config = BTreeMap::new();
500 second_config.insert(
501 "profiles".to_owned(),
502 vec![KafkaConfigParam {
503 name: "test".to_owned(),
504 value: "test-value".to_owned(),
505 }],
506 );
507
508 let events_configs = topics
509 .events
510 .kafka_configs(&def_config, &second_config)
511 .expect("Kafka config for sharded events topic");
512
513 insta::assert_debug_snapshot!(events_configs, @r###"
514 KafkaTopicConfig(
515 [
516 KafkaParams {
517 topic_name: "ingest-events-1",
518 config_name: None,
519 params: [
520 KafkaConfigParam {
521 name: "test",
522 value: "test-value",
523 },
524 ],
525 key_rate_limit: None,
526 },
527 KafkaParams {
528 topic_name: "ingest-events-2",
529 config_name: None,
530 params: [
531 KafkaConfigParam {
532 name: "test",
533 value: "test-value",
534 },
535 ],
536 key_rate_limit: None,
537 },
538 ],
539 )
540 "###);
541
542 let profiles_configs = topics
543 .profiles
544 .kafka_configs(&def_config, &second_config)
545 .expect("Kafka config for sharded profiles topic");
546
547 insta::assert_debug_snapshot!(profiles_configs, @r###"
548 KafkaTopicConfig(
549 [
550 KafkaParams {
551 topic_name: "ingest-profiles-1",
552 config_name: Some(
553 "profiles",
554 ),
555 params: [
556 KafkaConfigParam {
557 name: "test",
558 value: "test-value",
559 },
560 ],
561 key_rate_limit: None,
562 },
563 KafkaParams {
564 topic_name: "ingest-profiles-2",
565 config_name: Some(
566 "profiles",
567 ),
568 params: [
569 KafkaConfigParam {
570 name: "test",
571 value: "test-value",
572 },
573 ],
574 key_rate_limit: None,
575 },
576 ],
577 )
578 "###);
579 }
580
581 #[test]
582 fn test_per_shard_rate_limits() {
583 let yaml = r#"
584events:
585 - name: "shard-0"
586 config: "cluster1"
587 key_rate_limit:
588 limit_per_window: 100
589 window_secs: 60
590 - name: "shard-1"
591 config: "cluster2"
592 key_rate_limit:
593 limit_per_window: 200
594 window_secs: 120
595 - name: "shard-2" # No rate limit (Primary variant)
596"#;
597
598 let def_config = vec![KafkaConfigParam {
599 name: "bootstrap.servers".to_owned(),
600 value: "primary:9092".to_owned(),
601 }];
602 let mut second_config = BTreeMap::new();
603 second_config.insert(
604 "cluster1".to_owned(),
605 vec![KafkaConfigParam {
606 name: "bootstrap.servers".to_owned(),
607 value: "cluster1:9092".to_owned(),
608 }],
609 );
610 second_config.insert(
611 "cluster2".to_owned(),
612 vec![KafkaConfigParam {
613 name: "bootstrap.servers".to_owned(),
614 value: "cluster2:9092".to_owned(),
615 }],
616 );
617
618 let topics: TopicAssignments = serde_yaml::from_str(yaml).unwrap();
619
620 let events_configs = topics
621 .events
622 .kafka_configs(&def_config, &second_config)
623 .expect("Kafka config for per-shard rate limits");
624
625 insta::assert_debug_snapshot!(events_configs, @r###"
626 KafkaTopicConfig(
627 [
628 KafkaParams {
629 topic_name: "shard-0",
630 config_name: Some(
631 "cluster1",
632 ),
633 params: [
634 KafkaConfigParam {
635 name: "bootstrap.servers",
636 value: "cluster1:9092",
637 },
638 ],
639 key_rate_limit: Some(
640 KeyRateLimit {
641 limit_per_window: 100,
642 window_secs: 60,
643 },
644 ),
645 },
646 KafkaParams {
647 topic_name: "shard-1",
648 config_name: Some(
649 "cluster2",
650 ),
651 params: [
652 KafkaConfigParam {
653 name: "bootstrap.servers",
654 value: "cluster2:9092",
655 },
656 ],
657 key_rate_limit: Some(
658 KeyRateLimit {
659 limit_per_window: 200,
660 window_secs: 120,
661 },
662 ),
663 },
664 KafkaParams {
665 topic_name: "shard-2",
666 config_name: None,
667 params: [
668 KafkaConfigParam {
669 name: "bootstrap.servers",
670 value: "primary:9092",
671 },
672 ],
673 key_rate_limit: None,
674 },
675 ],
676 )
677 "###);
678 }
679}