1use std::collections::BTreeMap;
8
9use serde::{Deserialize, Serialize};
10use thiserror::Error;
11
12#[derive(Error, Debug)]
14pub enum ConfigError {
15 #[error("unknown kafka config name")]
17 UnknownKafkaConfigName,
18 #[error("invalid kafka shard configuration: must have shard with index 0")]
20 InvalidShard,
21}
22
/// Logical Kafka topics known to this module.
///
/// The derived ordering follows the declaration order of the variants, so the
/// order below must not be changed casually.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum KafkaTopic {
    /// Simple events.
    Events,
    /// Events with attachments.
    Attachments,
    /// Transaction events.
    Transactions,
    /// Outcomes.
    Outcomes,
    /// Billing critical outcomes.
    OutcomesBilling,
    /// Metrics extracted from sessions, aka release health.
    MetricsSessions,
    /// All other kinds of metrics.
    MetricsGeneric,
    /// Profiles.
    Profiles,
    /// Replay events.
    ReplayEvents,
    /// Replay recordings.
    ReplayRecordings,
    /// Monitor check-ins.
    Monitors,
    /// Logs from our logs product.
    OurLogs,
    /// Standalone spans without a transaction.
    Spans,
    /// Feedback events.
    Feedback,
    /// Items.
    Items,
}
57
58impl KafkaTopic {
59 pub fn iter() -> std::slice::Iter<'static, Self> {
62 use KafkaTopic::*;
63 static TOPICS: [KafkaTopic; 15] = [
64 Events,
65 Attachments,
66 Transactions,
67 Outcomes,
68 OutcomesBilling,
69 MetricsSessions,
70 MetricsGeneric,
71 Profiles,
72 ReplayEvents,
73 ReplayRecordings,
74 Monitors,
75 OurLogs,
76 Spans,
77 Feedback,
78 Items,
79 ];
80 TOPICS.iter()
81 }
82}
83
/// Generates the `TopicAssignments` struct from a table of entries.
///
/// Each entry has the form
/// `field: (KafkaTopic::Variant, "default-topic-name", "doc string")` and
/// contributes:
///
/// - a public `TopicAssignment` field documented with the doc string and
///   also accepted under the default topic name as a serde alias,
/// - a match arm in `TopicAssignments::get` mapping the variant to the field,
/// - a default value of the field built from the default topic name.
macro_rules! define_topic_assignments {
    (
        $(
            $field:ident : ($variant:path, $default_name:literal, $description:literal)
        ),*
        $(,)?
    ) => {
        /// Configuration for topics.
        ///
        /// Fields missing from the input fall back to the values produced by
        /// the `Default` implementation (`#[serde(default)]`).
        #[derive(Serialize, Deserialize, Debug)]
        #[serde(default)]
        pub struct TopicAssignments {
            $(
                #[doc = $description]
                #[serde(alias = $default_name)]
                pub $field: TopicAssignment,
            )*

            /// Flattened map holding the entries that did not match any of
            /// the named fields above.
            #[serde(flatten)]
            pub unused: BTreeMap<String, TopicAssignment>,
        }

        impl Default for TopicAssignments {
            /// Assigns every topic its default name as a primary assignment
            /// and leaves `unused` empty.
            fn default() -> Self {
                Self {
                    $(
                        $field: TopicAssignment::from(String::from($default_name)),
                    )*
                    unused: BTreeMap::new(),
                }
            }
        }

        impl TopicAssignments {
            /// Returns the assignment configured for the given logical topic.
            #[must_use]
            pub fn get(&self, kafka_topic: KafkaTopic) -> &TopicAssignment {
                // No wildcard arm: every `KafkaTopic` variant needs an entry
                // in the table, otherwise this match fails to compile.
                match kafka_topic {
                    $(
                        $variant => &self.$field,
                    )*
                }
            }
        }
    };
}
125
126define_topic_assignments! {
130 events: (KafkaTopic::Events, "ingest-events", "Simple events topic name."),
131 attachments: (KafkaTopic::Attachments, "ingest-attachments", "Events with attachments topic name."),
132 transactions: (KafkaTopic::Transactions, "ingest-transactions", "Transaction events topic name."),
133 outcomes: (KafkaTopic::Outcomes, "outcomes", "Outcomes topic name."),
134 outcomes_billing: (KafkaTopic::OutcomesBilling, "outcomes-billing", "Outcomes topic name for billing critical outcomes."),
135 metrics_sessions: (KafkaTopic::MetricsSessions, "ingest-metrics", "Topic name for metrics extracted from sessions, aka release health."),
136 metrics_generic: (KafkaTopic::MetricsGeneric, "ingest-performance-metrics", "Topic name for all other kinds of metrics."),
137 profiles: (KafkaTopic::Profiles, "profiles", "Stacktrace topic name"),
138 replay_events: (KafkaTopic::ReplayEvents, "ingest-replay-events", "Replay Events topic name."),
139 replay_recordings: (KafkaTopic::ReplayRecordings, "ingest-replay-recordings", "Recordings topic name."),
140 ourlogs: (KafkaTopic::OurLogs, "snuba-ourlogs", "Logs from our logs product."),
141 monitors: (KafkaTopic::Monitors, "ingest-monitors", "Monitor check-ins."),
142 spans: (KafkaTopic::Spans, "snuba-spans", "Standalone spans without a transaction."),
143 feedback: (KafkaTopic::Feedback, "ingest-feedback-events", "Feedback events topic."),
144 items: (KafkaTopic::Items, "snuba-items", "Items topic."),
145}
146
147#[derive(Serialize, Deserialize, Debug)]
155#[serde(untagged)]
156pub enum TopicAssignment {
157 Primary(String),
160 Secondary(KafkaTopicConfig),
164}
165
166#[derive(Serialize, Deserialize, Debug)]
168pub struct KafkaTopicConfig {
169 #[serde(rename = "name")]
171 topic_name: String,
172 #[serde(rename = "config")]
174 kafka_config_name: String,
175}
176
177#[derive(Debug)]
179pub struct KafkaParams<'a> {
180 pub topic_name: &'a str,
182 pub config_name: Option<&'a str>,
184 pub params: &'a [KafkaConfigParam],
186}
187
188impl From<String> for TopicAssignment {
189 fn from(topic_name: String) -> Self {
190 Self::Primary(topic_name)
191 }
192}
193
194impl TopicAssignment {
195 pub fn kafka_config<'a>(
200 &'a self,
201 default_config: &'a Vec<KafkaConfigParam>,
202 secondary_configs: &'a BTreeMap<String, Vec<KafkaConfigParam>>,
203 ) -> Result<KafkaParams<'a>, ConfigError> {
204 let kafka_config = match self {
205 Self::Primary(topic_name) => KafkaParams {
206 topic_name,
207 config_name: None,
208 params: default_config.as_slice(),
209 },
210 Self::Secondary(KafkaTopicConfig {
211 topic_name,
212 kafka_config_name,
213 }) => KafkaParams {
214 config_name: Some(kafka_config_name),
215 topic_name,
216 params: secondary_configs
217 .get(kafka_config_name)
218 .ok_or(ConfigError::UnknownKafkaConfigName)?,
219 },
220 };
221
222 Ok(kafka_config)
223 }
224}
225
226#[derive(Serialize, Deserialize, Debug)]
228pub struct KafkaConfigParam {
229 pub name: String,
231 pub value: String,
233}
234
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds the single-entry parameter list used for both the default and
    /// the secondary configuration in these tests.
    fn sample_params() -> Vec<KafkaConfigParam> {
        vec![KafkaConfigParam {
            name: "test".to_string(),
            value: "test-value".to_string(),
        }]
    }

    /// Topics can be configured under their field name or their default topic
    /// name, either as a plain string or as a `name`/`config` mapping.
    #[test]
    fn test_kafka_config() {
        let yaml = r#"
ingest-events: "ingest-events-kafka-topic"
profiles:
    name: "ingest-profiles"
    config: "profiles"
ingest-metrics: "ingest-metrics-3"
transactions: "ingest-transactions-kafka-topic"
"#;

        let default_params = sample_params();
        let secondary_params: BTreeMap<String, Vec<KafkaConfigParam>> =
            std::iter::once(("profiles".to_string(), sample_params())).collect();

        let TopicAssignments {
            events,
            profiles,
            metrics_sessions,
            transactions,
            ..
        } = serde_yaml::from_str(yaml).unwrap();

        // Plain strings become primary assignments, mappings become secondary ones.
        assert!(matches!(events, TopicAssignment::Primary(_)));
        assert!(matches!(profiles, TopicAssignment::Secondary(_)));
        assert!(matches!(metrics_sessions, TopicAssignment::Primary(_)));
        assert!(matches!(transactions, TopicAssignment::Primary(_)));

        let events_params = events
            .kafka_config(&default_params, &secondary_params)
            .expect("Kafka config for events topic");
        assert_eq!(events_params.topic_name, "ingest-events-kafka-topic");

        let profiles_params = profiles
            .kafka_config(&default_params, &secondary_params)
            .expect("Kafka config for profiles topic");
        assert_eq!(profiles_params.topic_name, "ingest-profiles");
        assert_eq!(profiles_params.config_name, Some("profiles"));

        let metrics_params = metrics_sessions
            .kafka_config(&default_params, &secondary_params)
            .expect("Kafka config for metrics topic");
        assert_eq!(metrics_params.topic_name, "ingest-metrics-3");

        let transactions_params = transactions
            .kafka_config(&default_params, &secondary_params)
            .expect("Kafka config for transactions topic");
        assert_eq!(
            transactions_params.topic_name,
            "ingest-transactions-kafka-topic"
        );
    }

    /// Every default assignment is a primary one, and its logical topic name
    /// resolves to a schema unless it is on the skip list.
    #[test]
    fn test_default_topic_is_valid() {
        // Default topic names that are excluded from the schema lookup below.
        let skipped_topics = [
            "ingest-attachments",
            "ingest-transactions",
            "profiles",
            "ingest-monitors",
        ];

        let defaults = TopicAssignments::default();

        for &topic in KafkaTopic::iter() {
            let logical_topic_name = match defaults.get(topic) {
                TopicAssignment::Primary(name) => name,
                TopicAssignment::Secondary(_) => panic!("invalid default"),
            };

            if skipped_topics.contains(&logical_topic_name.as_str()) {
                continue;
            }

            assert!(sentry_kafka_schemas::get_schema(logical_topic_name, None).is_ok());
        }
    }
}