use std::collections::BTreeMap;
use serde::{Deserialize, Serialize};
use thiserror::Error;
/// Errors that can occur while resolving Kafka configuration.
#[derive(Error, Debug)]
pub enum ConfigError {
/// A topic assignment names a Kafka configuration that is not present in the
/// secondary configurations passed to `TopicAssignment::kafka_config`.
#[error("unknown kafka config name")]
UnknownKafkaConfigName,
/// The shard configuration has no shard with index 0.
// NOTE(review): this variant is not constructed anywhere in the visible part
// of this file; the description above is taken from its error message.
#[error("invalid kafka shard configuration: must have shard with index 0")]
InvalidShard,
}
/// The logical Kafka topics known to this module.
///
/// Each variant corresponds to one field of `TopicAssignments`, which holds
/// the concrete topic assignment for it.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub enum KafkaTopic {
/// Simple events.
Events,
/// Events with attachments.
Attachments,
/// Transaction events.
Transactions,
/// Outcomes.
Outcomes,
/// Billing critical outcomes.
OutcomesBilling,
/// Metrics extracted from sessions, aka release health.
MetricsSessions,
/// All other kinds of metrics.
MetricsGeneric,
/// Profiles.
Profiles,
/// Replay events.
ReplayEvents,
/// Replay recordings.
ReplayRecordings,
/// Monitor check-ins.
Monitors,
/// Standalone spans without a transaction.
Spans,
/// Feedback events.
Feedback,
}
impl KafkaTopic {
pub fn iter() -> std::slice::Iter<'static, Self> {
use KafkaTopic::*;
static TOPICS: [KafkaTopic; 13] = [
Events,
Attachments,
Transactions,
Outcomes,
OutcomesBilling,
MetricsSessions,
MetricsGeneric,
Profiles,
ReplayEvents,
ReplayRecordings,
Monitors,
Spans,
Feedback,
];
TOPICS.iter()
}
}
// Generates the `TopicAssignments` struct together with its `get` accessor and
// its `Default` implementation.
//
// Every entry has the form
// `field_name: (KafkaTopic variant, default topic name, doc string)`.
macro_rules! define_topic_assignments {
($($field_name:ident : ($kafka_topic:path, $default_topic:literal, $doc:literal)),* $(,)?) => {
/// Topic assignment for every `KafkaTopic`.
///
/// Because of `#[serde(default)]`, fields missing from the input take their
/// value from the `Default` implementation.
#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct TopicAssignments {
$(
// The default topic name is also accepted as an alias for the field name
// when deserializing.
#[serde(alias = $default_topic)]
#[doc = $doc]
pub $field_name: TopicAssignment,
)*
/// Flattened map of additional entries that do not correspond to one of the
/// named fields above.
#[serde(flatten)]
pub unused: BTreeMap<String, TopicAssignment>,
}
impl TopicAssignments{
/// Returns the assignment stored for the given `kafka_topic`.
#[must_use]
pub fn get(&self, kafka_topic: KafkaTopic) -> &TopicAssignment {
// There is no wildcard arm: the match only compiles when the macro
// invocation lists every `KafkaTopic` variant.
match kafka_topic {
$(
$kafka_topic => &self.$field_name,
)*
}
}
}
impl Default for TopicAssignments {
/// Assigns every field its default topic name and leaves `unused` empty.
fn default() -> Self {
Self {
$(
// `String` converts into `TopicAssignment` through its `From<String>` impl.
$field_name: $default_topic.to_owned().into(),
)*
unused: BTreeMap::new(),
}
}
}
};
}
// Declares one `TopicAssignments` field per `KafkaTopic` variant. Each entry is
// `field: (variant, default topic name, field documentation)`.
define_topic_assignments! {
events: (KafkaTopic::Events, "ingest-events", "Simple events topic name."),
attachments: (KafkaTopic::Attachments, "ingest-attachments", "Events with attachments topic name."),
transactions: (KafkaTopic::Transactions, "ingest-transactions", "Transaction events topic name."),
outcomes: (KafkaTopic::Outcomes, "outcomes", "Outcomes topic name."),
outcomes_billing: (KafkaTopic::OutcomesBilling, "outcomes-billing", "Outcomes topic name for billing critical outcomes."),
metrics_sessions: (KafkaTopic::MetricsSessions, "ingest-metrics", "Topic name for metrics extracted from sessions, aka release health."),
metrics_generic: (KafkaTopic::MetricsGeneric, "ingest-performance-metrics", "Topic name for all other kinds of metrics."),
profiles: (KafkaTopic::Profiles, "profiles", "Stacktrace topic name"),
replay_events: (KafkaTopic::ReplayEvents, "ingest-replay-events", "Replay Events topic name."),
replay_recordings: (KafkaTopic::ReplayRecordings, "ingest-replay-recordings", "Recordings topic name."),
monitors: (KafkaTopic::Monitors, "ingest-monitors", "Monitor check-ins."),
spans: (KafkaTopic::Spans, "snuba-spans", "Standalone spans without a transaction."),
feedback: (KafkaTopic::Feedback, "ingest-feedback-events", "Feedback events topic."),
}
/// Assignment of a logical topic to a concrete Kafka topic.
///
/// The enum is `#[serde(untagged)]`: a plain string is read as `Primary`, and
/// a map with `name` and `config` keys is read as `Secondary`.
#[derive(Serialize, Deserialize, Debug)]
#[serde(untagged)]
pub enum TopicAssignment {
/// A topic name only; `kafka_config` pairs it with the default configuration.
Primary(String),
/// A topic name plus the name of the Kafka configuration to use, which
/// `kafka_config` looks up in the secondary configurations.
Secondary(KafkaTopicConfig),
}
/// A topic name together with the name of the Kafka configuration to use.
#[derive(Serialize, Deserialize, Debug)]
pub struct KafkaTopicConfig {
/// The topic name; (de)serialized under the key `name`.
#[serde(rename = "name")]
topic_name: String,
/// The name of the Kafka configuration to use for this topic;
/// (de)serialized under the key `config`.
#[serde(rename = "config")]
kafka_config_name: String,
}
/// Borrowed view of the Kafka settings resolved for a single topic, as
/// returned by `TopicAssignment::kafka_config`.
#[derive(Debug)]
pub struct KafkaParams<'a> {
/// The topic name.
pub topic_name: &'a str,
/// The name of the secondary Kafka configuration, or `None` when the default
/// configuration is used.
pub config_name: Option<&'a str>,
/// The Kafka configuration parameters that apply to the topic.
pub params: &'a [KafkaConfigParam],
}
impl From<String> for TopicAssignment {
fn from(topic_name: String) -> Self {
Self::Primary(topic_name)
}
}
impl TopicAssignment {
    /// Resolves the topic name and Kafka parameters for this assignment.
    ///
    /// - A [`TopicAssignment::Primary`] assignment is paired with
    ///   `default_config`; the returned `config_name` is `None`.
    /// - A [`TopicAssignment::Secondary`] assignment is paired with the entry of
    ///   `secondary_configs` stored under its configuration name; the returned
    ///   `config_name` is that name.
    ///
    /// `default_config` is taken as a slice, so callers may pass a `&Vec<_>`,
    /// an array reference, or any other slice.
    ///
    /// # Errors
    ///
    /// Returns [`ConfigError::UnknownKafkaConfigName`] if a secondary
    /// assignment names a configuration that is missing from
    /// `secondary_configs`.
    pub fn kafka_config<'a>(
        &'a self,
        default_config: &'a [KafkaConfigParam],
        secondary_configs: &'a BTreeMap<String, Vec<KafkaConfigParam>>,
    ) -> Result<KafkaParams<'a>, ConfigError> {
        match self {
            Self::Primary(topic_name) => Ok(KafkaParams {
                topic_name,
                config_name: None,
                params: default_config,
            }),
            Self::Secondary(KafkaTopicConfig {
                topic_name,
                kafka_config_name,
            }) => {
                let params = secondary_configs
                    .get(kafka_config_name)
                    .ok_or(ConfigError::UnknownKafkaConfigName)?;

                Ok(KafkaParams {
                    topic_name,
                    config_name: Some(kafka_config_name),
                    params,
                })
            }
        }
    }
}
/// A single Kafka configuration parameter, stored as a name/value pair.
#[derive(Serialize, Deserialize, Debug)]
pub struct KafkaConfigParam {
/// Name of the parameter.
pub name: String,
/// Value of the parameter.
pub value: String,
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Deserializes a mix of primary and secondary assignments (addressed both
    /// by field name and by alias) and resolves each to its Kafka parameters.
    #[test]
    fn test_kafka_config() {
        // `name` and `config` must be nested under `profiles`; without the
        // indentation YAML reads them as top-level keys and no secondary
        // assignment is produced.
        let yaml = r#"
ingest-events: "ingest-events-kafka-topic"
profiles:
    name: "ingest-profiles"
    config: "profiles"
ingest-metrics: "ingest-metrics-3"
transactions: "ingest-transactions-kafka-topic"
"#;

        let def_config = vec![KafkaConfigParam {
            name: "test".to_string(),
            value: "test-value".to_string(),
        }];

        let mut second_config = BTreeMap::new();
        second_config.insert(
            "profiles".to_string(),
            vec![KafkaConfigParam {
                name: "test".to_string(),
                value: "test-value".to_string(),
            }],
        );

        let topics: TopicAssignments = serde_yaml::from_str(yaml).unwrap();
        let events = topics.events;
        let profiles = topics.profiles;
        let metrics_sessions = topics.metrics_sessions;
        let transactions = topics.transactions;

        assert!(matches!(events, TopicAssignment::Primary(_)));
        assert!(matches!(profiles, TopicAssignment::Secondary(_)));
        assert!(matches!(metrics_sessions, TopicAssignment::Primary(_)));
        assert!(matches!(transactions, TopicAssignment::Primary(_)));

        let events_config = events
            .kafka_config(&def_config, &second_config)
            .expect("Kafka config for events topic");
        assert!(matches!(
            events_config,
            KafkaParams {
                topic_name: "ingest-events-kafka-topic",
                ..
            }
        ));

        let profiles_config = profiles
            .kafka_config(&def_config, &second_config)
            .expect("Kafka config for profiles topic");
        assert!(matches!(
            profiles_config,
            KafkaParams {
                topic_name: "ingest-profiles",
                config_name: Some("profiles"),
                ..
            }
        ));

        let metrics_config = metrics_sessions
            .kafka_config(&def_config, &second_config)
            .expect("Kafka config for metrics topic");
        assert!(matches!(
            metrics_config,
            KafkaParams {
                topic_name: "ingest-metrics-3",
                ..
            }
        ));

        let transactions_config = transactions
            .kafka_config(&def_config, &second_config)
            .expect("Kafka config for transactions topic");
        assert!(matches!(
            transactions_config,
            KafkaParams {
                topic_name: "ingest-transactions-kafka-topic",
                ..
            }
        ));
    }

    /// Checks that every default assignment is a primary assignment and, with
    /// the exceptions listed below, has a schema in `sentry_kafka_schemas`.
    #[test]
    fn test_default_topic_is_valid() {
        let topic_assignments = TopicAssignments::default();

        // Default topic names that are skipped by the schema lookup.
        // NOTE(review): presumably no schema is registered for these yet.
        let currently_undefined_topics = [
            "ingest-attachments",
            "ingest-transactions",
            "profiles",
            "ingest-monitors",
        ];

        for topic in KafkaTopic::iter() {
            match topic_assignments.get(*topic) {
                TopicAssignment::Primary(logical_topic_name) => {
                    if !currently_undefined_topics.contains(&logical_topic_name.as_str()) {
                        assert!(sentry_kafka_schemas::get_schema(logical_topic_name, None).is_ok());
                    }
                }
                // Defaults are built from plain topic names only.
                TopicAssignment::Secondary(_) => panic!("invalid default"),
            }
        }
    }
}