relay_config/
aggregator.rs

//! Metrics aggregator configuration.
2
3use relay_metrics::aggregator::AggregatorConfig;
4use relay_metrics::{MetricNamespace, UnixTimestamp};
5use serde::{Deserialize, Serialize};
6
7/// Parameters used for metric aggregation.
8#[derive(Clone, Debug, Deserialize, Serialize)]
9#[serde(default)]
10pub struct AggregatorServiceConfig {
11    /// The config used by the internal aggregator.
12    #[serde(flatten)]
13    pub aggregator: AggregatorConfig,
14
15    /// The length the name of a metric is allowed to be.
16    ///
17    /// Defaults to `200` bytes.
18    pub max_name_length: usize,
19
20    /// The length the tag key is allowed to be.
21    ///
22    /// Defaults to `200` bytes.
23    pub max_tag_key_length: usize,
24
25    /// The length the tag value is allowed to be.
26    ///
27    /// Defaults to `200` chars.
28    pub max_tag_value_length: usize,
29
30    /// The approximate maximum number of bytes submitted within one flush cycle.
31    ///
32    /// This controls how big flushed batches of buckets get, depending on the number of buckets,
33    /// the cumulative length of their keys, and the number of raw values. Since final serialization
34    /// adds some additional overhead, this number is approximate and some safety margin should be
35    /// left to hard limits.
36    pub max_flush_bytes: usize,
37}
38
39impl AggregatorServiceConfig {
40    /// Returns the valid range for metrics timestamps.
41    ///
42    /// Metrics or buckets outside of this range should be discarded.
43    pub fn timestamp_range(&self) -> std::ops::Range<UnixTimestamp> {
44        let now = UnixTimestamp::now().as_secs();
45        let min_timestamp =
46            UnixTimestamp::from_secs(now.saturating_sub(self.aggregator.max_secs_in_past));
47        let max_timestamp =
48            UnixTimestamp::from_secs(now.saturating_add(self.aggregator.max_secs_in_future));
49        min_timestamp..max_timestamp
50    }
51}
52
53impl Default for AggregatorServiceConfig {
54    fn default() -> Self {
55        Self {
56            aggregator: AggregatorConfig::default(),
57            max_name_length: 200,
58            max_tag_key_length: 200,
59            max_tag_value_length: 200,
60            max_flush_bytes: 5_000_000, // 5 MB
61        }
62    }
63}
64
65/// Contains an [`AggregatorServiceConfig`] for a specific scope.
66///
67/// For now, the only way to scope an aggregator is by [`MetricNamespace`].
68#[derive(Clone, Debug, Deserialize, Serialize)]
69pub struct ScopedAggregatorConfig {
70    /// Name of the aggregator, used to tag statsd metrics.
71    pub name: String,
72    /// Condition that needs to be met for a metric or bucket to be routed to a
73    /// secondary aggregator.
74    pub condition: Condition,
75    /// The configuration of the secondary aggregator.
76    pub config: AggregatorServiceConfig,
77}
78
79/// Condition that needs to be met for a metric or bucket to be routed to a
80/// secondary aggregator.
81#[derive(Clone, Debug, Deserialize, Serialize)]
82#[serde(tag = "op", rename_all = "lowercase")]
83pub enum Condition {
84    /// Checks for equality on a specific field.
85    Eq(FieldCondition),
86    /// Matches if all conditions are true.
87    And {
88        /// Inner rules to combine.
89        inner: Vec<Condition>,
90    },
91    /// Matches if any condition is true.
92    Or {
93        /// Inner rules to combine.
94        inner: Vec<Condition>,
95    },
96    /// Inverts the condition.
97    Not {
98        /// Inner rule to negate.
99        inner: Box<Condition>,
100    },
101}
102
103impl Condition {
104    /// Checks if the condition matches the given namespace.
105    pub fn matches(&self, namespace: Option<MetricNamespace>) -> bool {
106        match self {
107            Condition::Eq(field) => field.matches(namespace),
108            Condition::And { inner } => inner.iter().all(|cond| cond.matches(namespace)),
109            Condition::Or { inner } => inner.iter().any(|cond| cond.matches(namespace)),
110            Condition::Not { inner } => !inner.matches(namespace),
111        }
112    }
113}
114
115/// Defines a field and a field value to compare to when a [`Condition`] is evaluated.
116#[derive(Clone, Debug, Deserialize, Serialize)]
117#[serde(tag = "field", content = "value", rename_all = "lowercase")]
118pub enum FieldCondition {
119    /// Field that allows comparison to a metric or bucket's namespace.
120    Namespace(MetricNamespace),
121}
122
123impl FieldCondition {
124    fn matches(&self, namespace: Option<MetricNamespace>) -> bool {
125        match (self, namespace) {
126            (FieldCondition::Namespace(expected), Some(actual)) => expected == &actual,
127            _ => false,
128        }
129    }
130}
131
#[cfg(test)]
mod tests {
    use insta::assert_debug_snapshot;
    use relay_metrics::MetricNamespace;
    use serde_json::json;

    use super::*;

    #[test]
    fn condition_roundtrip() {
        let json = json!({"op": "eq", "field": "namespace", "value": "spans"});
        assert_debug_snapshot!(
            serde_json::from_value::<Condition>(json).unwrap(),
            @r###"
        Eq(
            Namespace(
                Spans,
            ),
        )
        "###
        );
    }

    #[test]
    fn condition_multiple_namespaces() {
        let json = json!({
            "op": "or",
            "inner": [
                {"op": "eq", "field": "namespace", "value": "spans"},
                {"op": "eq", "field": "namespace", "value": "custom"}
            ]
        });

        let condition = serde_json::from_value::<Condition>(json).unwrap();
        assert!(condition.matches(Some(MetricNamespace::Spans)));
        assert!(condition.matches(Some(MetricNamespace::Custom)));
        assert!(!condition.matches(Some(MetricNamespace::Transactions)));
    }

    #[test]
    fn condition_not_and_missing_namespace() {
        let json = json!({
            "op": "not",
            "inner": {"op": "eq", "field": "namespace", "value": "spans"}
        });

        let condition = serde_json::from_value::<Condition>(json).unwrap();
        assert!(!condition.matches(Some(MetricNamespace::Spans)));
        assert!(condition.matches(Some(MetricNamespace::Custom)));
        // A missing namespace never satisfies `eq`, so its negation matches.
        assert!(condition.matches(None));
    }

    #[test]
    fn condition_and() {
        let json = json!({
            "op": "and",
            "inner": [
                {"op": "eq", "field": "namespace", "value": "spans"},
                {"op": "not", "inner": {"op": "eq", "field": "namespace", "value": "custom"}}
            ]
        });

        let condition = serde_json::from_value::<Condition>(json).unwrap();
        assert!(condition.matches(Some(MetricNamespace::Spans)));
        assert!(!condition.matches(Some(MetricNamespace::Custom)));
        assert!(!condition.matches(Some(MetricNamespace::Transactions)));
        assert!(!condition.matches(None));
    }
}