use std::collections::BTreeMap;

use serde::{Deserialize, Serialize};
use thiserror::Error;

/// Errors that can occur while resolving Kafka topic configuration.
#[derive(Error, Debug)]
pub enum ConfigError {
    /// A topic assignment refers to a secondary Kafka config name that is not defined.
    #[error("unknown kafka config name")]
    UnknownKafkaConfigName,
    /// The sharded Kafka configuration does not contain a shard with index 0.
    #[error("invalid kafka shard configuration: must have shard with index 0")]
    InvalidShard,
}

/// Well-known Kafka topics to which payloads are produced.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub enum KafkaTopic {
    Events,
    Attachments,
    Transactions,
    Outcomes,
    OutcomesBilling,
    MetricsSessions,
    MetricsGeneric,
    Profiles,
    ReplayEvents,
    ReplayRecordings,
    Monitors,
    OurLogs,
    Spans,
    Feedback,
    Items,
}

impl KafkaTopic {
    /// Returns an iterator over all [`KafkaTopic`] variants.
    ///
    /// The static list below must be kept in sync when variants are added.
    pub fn iter() -> std::slice::Iter<'static, Self> {
        use KafkaTopic::*;
        static TOPICS: [KafkaTopic; 15] = [
            Events,
            Attachments,
            Transactions,
            Outcomes,
            OutcomesBilling,
            MetricsSessions,
            MetricsGeneric,
            Profiles,
            ReplayEvents,
            ReplayRecordings,
            Monitors,
            OurLogs,
            Spans,
            Feedback,
            Items,
        ];
        TOPICS.iter()
    }
}

/// Generates the `TopicAssignments` struct, its `Default` implementation, and the
/// `TopicAssignments::get` lookup from a list of
/// `field_name: (KafkaTopic variant, default topic name, doc string)` entries.
macro_rules! define_topic_assignments {
    ($($field_name:ident : ($kafka_topic:path, $default_topic:literal, $doc:literal)),* $(,)?) => {
        /// Assignment of logical Kafka topics to physical topic names and configs.
        #[derive(Serialize, Deserialize, Debug)]
        #[serde(default)]
        pub struct TopicAssignments {
            $(
                #[serde(alias = $default_topic)]
                #[doc = $doc]
                pub $field_name: TopicAssignment,
            )*

            /// Additional topic assignments that do not match any known topic,
            /// preserved via `#[serde(flatten)]`.
            #[serde(flatten)]
            pub unused: BTreeMap<String, TopicAssignment>,
        }

        impl TopicAssignments {
            /// Returns the topic assignment for the given [`KafkaTopic`].
            #[must_use]
            pub fn get(&self, kafka_topic: KafkaTopic) -> &TopicAssignment {
                match kafka_topic {
                    $(
                        $kafka_topic => &self.$field_name,
                    )*
                }
            }
        }

        impl Default for TopicAssignments {
            fn default() -> Self {
                Self {
                    $(
                        $field_name: $default_topic.to_owned().into(),
                    )*
                    unused: BTreeMap::new(),
                }
            }
        }
    };
}

define_topic_assignments! {
    events: (KafkaTopic::Events, "ingest-events", "Simple events topic name."),
    attachments: (KafkaTopic::Attachments, "ingest-attachments", "Events with attachments topic name."),
    transactions: (KafkaTopic::Transactions, "ingest-transactions", "Transaction events topic name."),
    outcomes: (KafkaTopic::Outcomes, "outcomes", "Outcomes topic name."),
    outcomes_billing: (KafkaTopic::OutcomesBilling, "outcomes-billing", "Outcomes topic name for billing critical outcomes."),
    metrics_sessions: (KafkaTopic::MetricsSessions, "ingest-metrics", "Topic name for metrics extracted from sessions, aka release health."),
    metrics_generic: (KafkaTopic::MetricsGeneric, "ingest-performance-metrics", "Topic name for all other kinds of metrics."),
    profiles: (KafkaTopic::Profiles, "profiles", "Stacktrace topic name."),
    replay_events: (KafkaTopic::ReplayEvents, "ingest-replay-events", "Replay Events topic name."),
    replay_recordings: (KafkaTopic::ReplayRecordings, "ingest-replay-recordings", "Recordings topic name."),
    ourlogs: (KafkaTopic::OurLogs, "snuba-ourlogs", "Logs from our logs product."),
    monitors: (KafkaTopic::Monitors, "ingest-monitors", "Monitor check-ins."),
    spans: (KafkaTopic::Spans, "snuba-spans", "Standalone spans without a transaction."),
    feedback: (KafkaTopic::Feedback, "ingest-feedback-events", "Feedback events topic."),
    items: (KafkaTopic::Items, "snuba-items", "Items topic."),
}
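
// For reference, the generated `TopicAssignments` struct accepts configuration shaped
// like the following (an illustrative sketch; the topic and config names are examples,
// not required values). A plain string selects the default Kafka cluster, while the
// mapping form selects a named secondary config:
//
//     events: "ingest-events"
//     profiles:
//       name: "ingest-profiles"
//       config: "profiles"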

/// Describes where a logical topic is produced.
///
/// Deserialized untagged: a plain string is a [`TopicAssignment::Primary`] topic name on the
/// default Kafka cluster, while a mapping with `name`/`config` keys selects a secondary config.
#[derive(Serialize, Deserialize, Debug)]
#[serde(untagged)]
pub enum TopicAssignment {
    /// A topic name on the default Kafka cluster.
    Primary(String),
    /// A topic name plus the name of a secondary Kafka config to produce with.
    Secondary(KafkaTopicConfig),
}

/// Configuration for a topic that uses a secondary Kafka config.
#[derive(Serialize, Deserialize, Debug)]
pub struct KafkaTopicConfig {
    /// The physical topic name to produce to.
    #[serde(rename = "name")]
    topic_name: String,
    /// Name of the Kafka config to use when producing to this topic.
    #[serde(rename = "config")]
    kafka_config_name: String,
    /// Optional per-key produce rate limit for this topic.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    key_rate_limit: Option<KeyRateLimit>,
}

/// A produce rate limit applied per message key.
#[derive(Serialize, Deserialize, Debug, Clone, Copy)]
pub struct KeyRateLimit {
    /// Maximum number of messages per key within the time window.
    pub limit_per_window: u64,
    /// Size of the time window in seconds.
    pub window_secs: u64,
}

/// Resolved parameters needed to produce to a single Kafka topic.
#[derive(Debug)]
pub struct KafkaParams<'a> {
    /// The physical topic name.
    pub topic_name: &'a str,
    /// Name of the secondary Kafka config, if any.
    pub config_name: Option<&'a str>,
    /// Kafka producer configuration parameters.
    pub params: &'a [KafkaConfigParam],
    /// Optional per-key produce rate limit.
    pub key_rate_limit: Option<KeyRateLimit>,
}

impl From<String> for TopicAssignment {
    fn from(topic_name: String) -> Self {
        Self::Primary(topic_name)
    }
}

impl TopicAssignment {
    /// Resolves this assignment into [`KafkaParams`].
    ///
    /// Primary assignments use `default_config`; secondary assignments look up their named
    /// config in `secondary_configs` and fail with [`ConfigError::UnknownKafkaConfigName`]
    /// if it is missing.
    pub fn kafka_config<'a>(
        &'a self,
        default_config: &'a Vec<KafkaConfigParam>,
        secondary_configs: &'a BTreeMap<String, Vec<KafkaConfigParam>>,
    ) -> Result<KafkaParams<'a>, ConfigError> {
        let kafka_config = match self {
            Self::Primary(topic_name) => KafkaParams {
                topic_name,
                config_name: None,
                params: default_config.as_slice(),
                key_rate_limit: None,
            },
            Self::Secondary(KafkaTopicConfig {
                topic_name,
                kafka_config_name,
                key_rate_limit,
            }) => KafkaParams {
                config_name: Some(kafka_config_name),
                topic_name,
                params: secondary_configs
                    .get(kafka_config_name)
                    .ok_or(ConfigError::UnknownKafkaConfigName)?,
                key_rate_limit: *key_rate_limit,
            },
        };

        Ok(kafka_config)
    }
}

/// A single Kafka producer configuration parameter.
#[derive(Serialize, Deserialize, Debug)]
pub struct KafkaConfigParam {
    /// Name of the configuration parameter.
    pub name: String,
    /// Value of the configuration parameter.
    pub value: String,
}

#[cfg(test)]
mod tests {

    use super::*;

    #[test]
    fn test_kafka_config() {
        let yaml = r#"
ingest-events: "ingest-events-kafka-topic"
profiles:
  name: "ingest-profiles"
  config: "profiles"
ingest-metrics: "ingest-metrics-3"
transactions: "ingest-transactions-kafka-topic"
"#;

        let def_config = vec![KafkaConfigParam {
            name: "test".to_string(),
            value: "test-value".to_string(),
        }];
        let mut second_config = BTreeMap::new();
        second_config.insert(
            "profiles".to_string(),
            vec![KafkaConfigParam {
                name: "test".to_string(),
                value: "test-value".to_string(),
            }],
        );

        let topics: TopicAssignments = serde_yaml::from_str(yaml).unwrap();
        let events = topics.events;
        let profiles = topics.profiles;
        let metrics_sessions = topics.metrics_sessions;
        let transactions = topics.transactions;

        assert!(matches!(events, TopicAssignment::Primary(_)));
        assert!(matches!(profiles, TopicAssignment::Secondary { .. }));
        assert!(matches!(metrics_sessions, TopicAssignment::Primary(_)));
        assert!(matches!(transactions, TopicAssignment::Primary(_)));

        let events_config = events
            .kafka_config(&def_config, &second_config)
            .expect("Kafka config for events topic");
        assert!(matches!(
            events_config,
            KafkaParams {
                topic_name: "ingest-events-kafka-topic",
                ..
            }
        ));

        let profiles_config = profiles
            .kafka_config(&def_config, &second_config)
            .expect("Kafka config for profiles topic");
        assert!(matches!(
            profiles_config,
            KafkaParams {
                topic_name: "ingest-profiles",
                config_name: Some("profiles"),
                ..
            }
        ));

        let metrics_sessions_config = metrics_sessions
            .kafka_config(&def_config, &second_config)
            .expect("Kafka config for metrics topic");
        assert!(matches!(
            metrics_sessions_config,
            KafkaParams {
                topic_name: "ingest-metrics-3",
                ..
            }
        ));

        let transactions_config = transactions
            .kafka_config(&def_config, &second_config)
            .expect("Kafka config for transactions topic");
        assert!(matches!(
            transactions_config,
            KafkaParams {
                topic_name: "ingest-transactions-kafka-topic",
                ..
            }
        ));
    }
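
    // A supplementary sketch, not part of the original test suite: it exercises the
    // optional `key_rate_limit` block on a secondary assignment and checks that the
    // limit is forwarded into `KafkaParams`. The topic/config names and numeric values
    // are arbitrary examples.
    #[test]
    fn test_secondary_key_rate_limit() {
        let yaml = r#"
profiles:
  name: "ingest-profiles"
  config: "profiles"
  key_rate_limit:
    limit_per_window: 100
    window_secs: 60
"#;

        let def_config: Vec<KafkaConfigParam> = vec![];
        let mut second_config: BTreeMap<String, Vec<KafkaConfigParam>> = BTreeMap::new();
        second_config.insert("profiles".to_string(), vec![]);

        let topics: TopicAssignments = serde_yaml::from_str(yaml).unwrap();
        let params = topics
            .profiles
            .kafka_config(&def_config, &second_config)
            .expect("Kafka config for profiles topic");

        let limit = params.key_rate_limit.expect("key rate limit should be set");
        assert_eq!(limit.limit_per_window, 100);
        assert_eq!(limit.window_secs, 60);
    }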

    #[test]
    fn test_default_topic_is_valid() {
        let topic_assignments = TopicAssignments::default();

        let currently_undefined_topics = [
            "ingest-attachments".to_string(),
            "ingest-transactions".to_string(),
            "profiles".to_string(),
            "ingest-monitors".to_string(),
        ];

        for topic in KafkaTopic::iter() {
            match topic_assignments.get(*topic) {
                TopicAssignment::Primary(logical_topic_name) => {
                    if !currently_undefined_topics.contains(logical_topic_name) {
                        assert!(sentry_kafka_schemas::get_schema(logical_topic_name, None).is_ok());
                    }
                }
                _ => panic!("invalid default"),
            }
        }
    }
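
    // A supplementary sketch, not part of the original test suite: it checks that
    // unrecognized top-level keys are collected into the `unused` map via
    // `#[serde(flatten)]`, and that a bare string converts into a primary assignment
    // through `From<String>`. The key `"some-future-topic"` is an arbitrary example,
    // not a real topic.
    #[test]
    fn test_unused_topic_assignments() {
        let yaml = r#"
some-future-topic: "some-topic-name"
"#;

        let topics: TopicAssignments = serde_yaml::from_str(yaml).unwrap();

        // The unknown key lands in `unused` as a primary assignment.
        match topics.unused.get("some-future-topic") {
            Some(TopicAssignment::Primary(name)) => assert_eq!(name, "some-topic-name"),
            _ => panic!("expected primary assignment for unknown key"),
        }

        // Known fields fall back to their defaults.
        match &topics.events {
            TopicAssignment::Primary(name) => assert_eq!(name, "ingest-events"),
            _ => panic!("expected default primary assignment"),
        }

        // `From<String>` produces a primary assignment.
        let assignment = TopicAssignment::from("ad-hoc-topic".to_string());
        assert!(matches!(assignment, TopicAssignment::Primary(_)));
    }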
}