use std::collections::BTreeMap;

use serde::{Deserialize, Serialize};
use thiserror::Error;

/// Errors that can occur while resolving Kafka topic configuration.
#[derive(Error, Debug)]
pub enum ConfigError {
    /// The named secondary Kafka configuration does not exist.
    #[error("unknown kafka config name")]
    UnknownKafkaConfigName,
    /// The sharded topic configuration is missing the shard with index 0.
    #[error("invalid kafka shard configuration: must have shard with index 0")]
    InvalidShard,
}

/// A Kafka topic to which messages can be produced.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub enum KafkaTopic {
    Events,
    Attachments,
    Transactions,
    Outcomes,
    OutcomesBilling,
    MetricsSessions,
    MetricsGeneric,
    Profiles,
    ReplayEvents,
    ReplayRecordings,
    Monitors,
    OurLogs,
    Spans,
    Feedback,
}

impl KafkaTopic {
    /// Returns an iterator over all topic variants.
    ///
    /// The static array below must be kept in sync with the enum when variants are added.
    pub fn iter() -> std::slice::Iter<'static, Self> {
        use KafkaTopic::*;
        static TOPICS: [KafkaTopic; 14] = [
            Events,
            Attachments,
            Transactions,
            Outcomes,
            OutcomesBilling,
            MetricsSessions,
            MetricsGeneric,
            Profiles,
            ReplayEvents,
            ReplayRecordings,
            Monitors,
            OurLogs,
            Spans,
            Feedback,
        ];
        TOPICS.iter()
    }
}

/// Generates the `TopicAssignments` struct along with its `Default` implementation
/// and a `get` accessor mapping each [`KafkaTopic`] variant to its assignment.
macro_rules! define_topic_assignments {
    ($($field_name:ident : ($kafka_topic:path, $default_topic:literal, $doc:literal)),* $(,)?) => {
        /// Mapping of each logical Kafka topic to its assigned topic and configuration.
        #[derive(Serialize, Deserialize, Debug)]
        #[serde(default)]
        pub struct TopicAssignments {
            $(
                #[serde(alias = $default_topic)]
                #[doc = $doc]
                pub $field_name: TopicAssignment,
            )*

            /// Catch-all for any topic assignment keys not covered by the fields above.
            #[serde(flatten)]
            pub unused: BTreeMap<String, TopicAssignment>,
        }

        impl TopicAssignments {
            /// Returns the topic assignment for the given [`KafkaTopic`].
            #[must_use]
            pub fn get(&self, kafka_topic: KafkaTopic) -> &TopicAssignment {
                match kafka_topic {
                    $(
                        $kafka_topic => &self.$field_name,
                    )*
                }
            }
        }

        impl Default for TopicAssignments {
            fn default() -> Self {
                Self {
                    $(
                        $field_name: $default_topic.to_owned().into(),
                    )*
                    unused: BTreeMap::new(),
                }
            }
        }
    };
}

define_topic_assignments! {
    events: (KafkaTopic::Events, "ingest-events", "Simple events topic name."),
    attachments: (KafkaTopic::Attachments, "ingest-attachments", "Events with attachments topic name."),
    transactions: (KafkaTopic::Transactions, "ingest-transactions", "Transaction events topic name."),
    outcomes: (KafkaTopic::Outcomes, "outcomes", "Outcomes topic name."),
    outcomes_billing: (KafkaTopic::OutcomesBilling, "outcomes-billing", "Outcomes topic name for billing critical outcomes."),
    metrics_sessions: (KafkaTopic::MetricsSessions, "ingest-metrics", "Topic name for metrics extracted from sessions, aka release health."),
    metrics_generic: (KafkaTopic::MetricsGeneric, "ingest-performance-metrics", "Topic name for all other kinds of metrics."),
    profiles: (KafkaTopic::Profiles, "profiles", "Stacktrace topic name."),
    replay_events: (KafkaTopic::ReplayEvents, "ingest-replay-events", "Replay Events topic name."),
    replay_recordings: (KafkaTopic::ReplayRecordings, "ingest-replay-recordings", "Recordings topic name."),
    ourlogs: (KafkaTopic::OurLogs, "snuba-ourlogs", "Logs from our logs product."),
    monitors: (KafkaTopic::Monitors, "ingest-monitors", "Monitor check-ins."),
    spans: (KafkaTopic::Spans, "snuba-spans", "Standalone spans without a transaction."),
    feedback: (KafkaTopic::Feedback, "ingest-feedback-events", "Feedback events topic."),
}
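
// Illustration only: with `#[serde(default)]` and the `#[serde(alias = ...)]`
// attributes generated above, a YAML snippet along these lines (topic names are
// hypothetical) deserializes into `TopicAssignments`, with all omitted fields
// falling back to their defaults:
//
//     events: "custom-ingest-events"
//     profiles:
//       name: "custom-profiles"
//       config: "secondary-cluster"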

/// Configuration for a single Kafka topic assignment.
#[derive(Serialize, Deserialize, Debug)]
#[serde(untagged)]
pub enum TopicAssignment {
    /// A plain topic name, produced to using the default Kafka configuration.
    Primary(String),
    /// A topic name on a secondary Kafka configuration, looked up by config name.
    Secondary(KafkaTopicConfig),
}

/// A topic produced to via a secondary Kafka configuration.
#[derive(Serialize, Deserialize, Debug)]
pub struct KafkaTopicConfig {
    /// The Kafka topic name.
    #[serde(rename = "name")]
    topic_name: String,
    /// The name of the secondary Kafka configuration to use for this topic.
    #[serde(rename = "config")]
    kafka_config_name: String,
}

/// Resolved Kafka parameters for producing to a single topic.
#[derive(Debug)]
pub struct KafkaParams<'a> {
    /// The topic name to produce to.
    pub topic_name: &'a str,
    /// The name of the secondary Kafka configuration, if any.
    pub config_name: Option<&'a str>,
    /// Kafka client configuration parameters.
    pub params: &'a [KafkaConfigParam],
}

impl From<String> for TopicAssignment {
    fn from(topic_name: String) -> Self {
        Self::Primary(topic_name)
    }
}
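
// The defaults above are built through this conversion; a plain string becomes a
// primary assignment (sketch):
//
//     let assignment: TopicAssignment = "ingest-events".to_owned().into();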

impl TopicAssignment {
    /// Resolves the Kafka configuration to use for this topic assignment.
    ///
    /// For a [`TopicAssignment::Primary`] topic, the default configuration is used. For a
    /// [`TopicAssignment::Secondary`] topic, the named entry is looked up in
    /// `secondary_configs`, and [`ConfigError::UnknownKafkaConfigName`] is returned if it
    /// does not exist.
    pub fn kafka_config<'a>(
        &'a self,
        default_config: &'a [KafkaConfigParam],
        secondary_configs: &'a BTreeMap<String, Vec<KafkaConfigParam>>,
    ) -> Result<KafkaParams<'a>, ConfigError> {
        let kafka_config = match self {
            Self::Primary(topic_name) => KafkaParams {
                topic_name,
                config_name: None,
                params: default_config,
            },
            Self::Secondary(KafkaTopicConfig {
                topic_name,
                kafka_config_name,
            }) => KafkaParams {
                config_name: Some(kafka_config_name),
                topic_name,
                params: secondary_configs
                    .get(kafka_config_name)
                    .ok_or(ConfigError::UnknownKafkaConfigName)?,
            },
        };

        Ok(kafka_config)
    }
}
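
// Minimal usage sketch, assuming `default_params: Vec<KafkaConfigParam>` and
// `secondary_params: BTreeMap<String, Vec<KafkaConfigParam>>` have already been
// loaded from configuration:
//
//     let assignment = TopicAssignment::Primary("ingest-events".to_owned());
//     let params = assignment
//         .kafka_config(&default_params, &secondary_params)
//         .expect("valid kafka config");
//     assert_eq!(params.topic_name, "ingest-events");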

/// A single Kafka producer configuration parameter.
#[derive(Serialize, Deserialize, Debug)]
pub struct KafkaConfigParam {
    /// The name of the configuration parameter.
    pub name: String,
    /// The value of the configuration parameter.
    pub value: String,
}
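
// In YAML, a list of these parameters deserializes from name/value pairs, for
// example (values are illustrative only):
//
//     - name: "bootstrap.servers"
//       value: "kafka:9092"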

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_kafka_config() {
        let yaml = r#"
ingest-events: "ingest-events-kafka-topic"
profiles:
  name: "ingest-profiles"
  config: "profiles"
ingest-metrics: "ingest-metrics-3"
transactions: "ingest-transactions-kafka-topic"
"#;

        let def_config = vec![KafkaConfigParam {
            name: "test".to_string(),
            value: "test-value".to_string(),
        }];
        let mut second_config = BTreeMap::new();
        second_config.insert(
            "profiles".to_string(),
            vec![KafkaConfigParam {
                name: "test".to_string(),
                value: "test-value".to_string(),
            }],
        );

        let topics: TopicAssignments = serde_yaml::from_str(yaml).unwrap();
        let events = topics.events;
        let profiles = topics.profiles;
        let metrics_sessions = topics.metrics_sessions;
        let transactions = topics.transactions;

        assert!(matches!(events, TopicAssignment::Primary(_)));
        assert!(matches!(profiles, TopicAssignment::Secondary { .. }));
        assert!(matches!(metrics_sessions, TopicAssignment::Primary(_)));
        assert!(matches!(transactions, TopicAssignment::Primary(_)));

        let events_config = events
            .kafka_config(&def_config, &second_config)
            .expect("Kafka config for events topic");
        assert!(matches!(
            events_config,
            KafkaParams {
                topic_name: "ingest-events-kafka-topic",
                ..
            }
        ));

        let profiles_config = profiles
            .kafka_config(&def_config, &second_config)
            .expect("Kafka config for profiles topic");
        assert!(matches!(
            profiles_config,
            KafkaParams {
                topic_name: "ingest-profiles",
                config_name: Some("profiles"),
                ..
            }
        ));

        let metrics_config = metrics_sessions
            .kafka_config(&def_config, &second_config)
            .expect("Kafka config for metrics topic");
        assert!(matches!(
            metrics_config,
            KafkaParams {
                topic_name: "ingest-metrics-3",
                ..
            }
        ));

        let transactions_config = transactions
            .kafka_config(&def_config, &second_config)
            .expect("Kafka config for transactions topic");
        assert!(matches!(
            transactions_config,
            KafkaParams {
                topic_name: "ingest-transactions-kafka-topic",
                ..
            }
        ));
    }

    #[test]
    fn test_default_topic_is_valid() {
        let topic_assignments = TopicAssignments::default();

        // Topics that do not (yet) have a schema registered in `sentry_kafka_schemas`.
        let currently_undefined_topics = [
            "ingest-attachments".to_string(),
            "ingest-transactions".to_string(),
            "profiles".to_string(),
            "ingest-monitors".to_string(),
        ];

        for topic in KafkaTopic::iter() {
            match topic_assignments.get(*topic) {
                TopicAssignment::Primary(logical_topic_name) => {
                    if !currently_undefined_topics.contains(logical_topic_name) {
                        assert!(sentry_kafka_schemas::get_schema(logical_topic_name, None).is_ok());
                    }
                }
                _ => panic!("invalid default"),
            }
        }
    }
}