relay_kafka/producer/
schemas.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
use std::collections::btree_map::Entry;
use std::collections::BTreeMap;
use std::sync::{Arc, Mutex};

use sentry_kafka_schemas::{Schema as SentrySchema, SchemaError as SentrySchemaError};
use thiserror::Error;

use crate::config::{KafkaTopic, TopicAssignment, TopicAssignments};

/// Errors raised while resolving a topic's schema or validating a message against it.
#[derive(Debug, Error)]
pub enum SchemaError {
    /// The logical topic name for a `KafkaTopic` could not be determined.
    ///
    /// The "logical topic" is a concept in sentry-kafka-schemas and Snuba that identifies topics
    /// irrespective of how they are structured in production. Snuba has a mapping from "logical
    /// topic" to "physical topic" that is very similar to our `TopicAssignments`.
    ///
    /// The name of a logical topic is almost always equal to the Kafka topic's name in local
    /// development. So, in order to determine a logical topic name from a `KafkaTopic`, we get
    /// `TopicAssignments::default()` and try to resolve the name through that.
    ///
    /// When doing that we assume that Relay's default topic assignments use neither
    /// slicing/sharding nor custom clusters, i.e. that every default assignment is a
    /// `TopicAssignment::Primary`.
    ///
    /// If somebody changes `impl Default for TopicAssignments` to have more complex defaults, this
    /// error will start occurring. But it should not happen in prod.
    #[error("failed to determine logical topic")]
    LogicalTopic,

    /// The schema lookup for a topic failed for a reason other than the topic being unknown.
    ///
    /// The underlying sentry-kafka-schemas error is attached as the source.
    #[error("failed to compile schema")]
    SchemaCompiled(#[source] SentrySchemaError),

    /// The message JSON failed validation against the topic's schema.
    ///
    /// The underlying sentry-kafka-schemas error is attached as the source.
    #[error("message violates schema")]
    Validation(#[source] SentrySchemaError),
}

/// Validates payloads for their given topic's schema.
///
/// Schemas are looked up lazily per topic and cached, so repeated validations for the same topic
/// reuse the same `SentrySchema` instance.
#[derive(Debug, Default)]
pub struct Validator {
    /// Caches the schema for given topics.
    ///
    /// An entry of `None` records that no schema exists for the topic, in which case messages
    /// for that topic are not validated. The map is wrapped in a `Mutex` so that the cache can
    /// be filled through a shared reference to the validator.
    schemas: Mutex<BTreeMap<KafkaTopic, Option<Arc<SentrySchema>>>>,
}

impl Validator {
    /// Checks `message` against the schema registered for `topic`.
    ///
    /// Topics without a known schema are accepted without any validation.
    ///
    /// # Errors
    ///
    /// Returns [`SchemaError::Validation`] if the message does not conform to the schema, or any
    /// error produced while resolving the schema for `topic`.
    pub fn validate_message_schema(
        &self,
        topic: KafkaTopic,
        message: &[u8],
    ) -> Result<(), SchemaError> {
        if let Some(schema) = self.get_schema(topic)? {
            schema
                .validate_json(message)
                .map_err(SchemaError::Validation)?;
        }

        Ok(())
    }

    /// Returns the cached schema for `topic`, loading and caching it on first use.
    ///
    /// `Ok(None)` means that no schema exists for the topic; that outcome is cached as well.
    /// Failures are returned to the caller without being cached, so the lookup is attempted
    /// again on the next call.
    fn get_schema(&self, topic: KafkaTopic) -> Result<Option<Arc<SentrySchema>>, SchemaError> {
        // A poisoned lock is recovered rather than propagated as a panic.
        let mut cache = self.schemas.lock().unwrap_or_else(|poisoned| poisoned.into_inner());

        let slot = match cache.entry(topic) {
            Entry::Occupied(hit) => hit.into_mut(),
            Entry::Vacant(miss) => {
                // The logical topic name is taken from Relay's default topic assignments.
                let assignments = TopicAssignments::default();
                let TopicAssignment::Primary(logical_name) = assignments.get(topic) else {
                    return Err(SchemaError::LogicalTopic);
                };

                let loaded = match sentry_kafka_schemas::get_schema(logical_name, None) {
                    Ok(schema) => Some(Arc::new(schema)),
                    // An unknown topic is not an error: remember that there is nothing to
                    // validate against.
                    Err(SentrySchemaError::TopicNotFound) => None,
                    Err(err) => return Err(SchemaError::SchemaCompiled(err)),
                };

                miss.insert(loaded)
            }
        };

        // Cloning the `Option<Arc<_>>` only bumps the reference count.
        Ok(slot.clone())
    }
}