relay_server/metrics/
metric_stats.rs

1use std::collections::BTreeMap;
2use std::sync::{Arc, OnceLock};
3
4use relay_base_schema::organization::OrganizationId;
5#[cfg(feature = "processing")]
6use relay_cardinality::{CardinalityLimit, CardinalityReport};
7use relay_config::Config;
8#[cfg(feature = "processing")]
9use relay_metrics::GaugeValue;
10use relay_metrics::{Bucket, BucketValue, MetricName, UnixTimestamp};
11use relay_quotas::Scoping;
12use relay_system::Addr;
13
14use crate::metrics::TrackableBucket;
15use crate::services::global_config::GlobalConfigHandle;
16use crate::services::metrics::{Aggregator, MergeBuckets};
17use crate::services::outcome::Outcome;
18use crate::utils::is_rolled_out;
19
20fn volume_metric_mri() -> MetricName {
21    static VOLUME_METRIC_MRI: OnceLock<MetricName> = OnceLock::new();
22
23    VOLUME_METRIC_MRI
24        .get_or_init(|| "c:metric_stats/volume@none".into())
25        .clone()
26}
27
/// Returns the MRI under which metric cardinality stats are emitted.
///
/// The name is created on first use and cached in a process-wide static;
/// every call hands out a clone of the cached value.
#[cfg(feature = "processing")]
fn cardinality_metric_mri() -> MetricName {
    static MRI: OnceLock<MetricName> = OnceLock::new();

    let mri = MRI.get_or_init(|| MetricName::from("g:metric_stats/cardinality@none"));
    mri.clone()
}
36
/// Tracks stats about metrics.
///
/// Metric stats are similar to outcomes for envelopes: they record
/// the final "fate" of a metric and its properties.
///
/// Unlike outcomes, metric stats are tracked on a per MRI basis
/// and contain additional metadata, like the cardinality of a metric.
///
/// Stats are themselves emitted as metric buckets and merged into the
/// aggregator. Volume is tracked via `MetricStats::track_metric`;
/// cardinality via `MetricStats::track_cardinality`, which is only
/// available with the `processing` feature.
#[derive(Clone, Debug)]
pub struct MetricStats {
    /// Static Relay configuration; provides the metric stats on/off switch.
    config: Arc<Config>,
    /// Handle to the global config, which carries the metric stats rollout rate.
    global_config: GlobalConfigHandle,
    /// Aggregator that receives the generated stat buckets.
    aggregator: Addr<Aggregator>,
}
50
51impl MetricStats {
52    /// Creates a new [`MetricStats`] instance.
53    pub fn new(
54        config: Arc<Config>,
55        global_config: GlobalConfigHandle,
56        aggregator: Addr<Aggregator>,
57    ) -> Self {
58        Self {
59            config,
60            global_config,
61            aggregator,
62        }
63    }
64
65    /// Tracks the metric volume and outcome for the bucket.
66    pub fn track_metric(&self, scoping: Scoping, bucket: impl TrackableBucket, outcome: &Outcome) {
67        if !self.is_enabled(scoping) {
68            return;
69        }
70
71        let Some(volume) = self.to_volume_metric(&bucket, outcome) else {
72            return;
73        };
74
75        relay_log::trace!(
76            "Tracking volume of {} for mri '{}': {}",
77            bucket.metadata().merges,
78            bucket.name(),
79            outcome
80        );
81        self.aggregator
82            .send(MergeBuckets::new(scoping.project_key, vec![volume]));
83    }
84
85    /// Tracks the cardinality of a metric.
86    #[cfg(feature = "processing")]
87    pub fn track_cardinality(
88        &self,
89        scoping: Scoping,
90        limit: &CardinalityLimit,
91        report: &CardinalityReport,
92    ) {
93        if !self.is_enabled(scoping) {
94            return;
95        }
96
97        let Some(cardinality) = self.to_cardinality_metric(limit, report) else {
98            return;
99        };
100
101        relay_log::trace!(
102            "Tracking cardinality '{}' for mri '{}': {}",
103            limit.id,
104            report.metric_name.as_deref().unwrap_or("-"),
105            report.cardinality,
106        );
107        self.aggregator
108            .send(MergeBuckets::new(scoping.project_key, vec![cardinality]));
109    }
110
111    fn is_enabled(&self, scoping: Scoping) -> bool {
112        self.config.metric_stats_enabled() && self.is_rolled_out(scoping.organization_id)
113    }
114
115    fn is_rolled_out(&self, organization_id: OrganizationId) -> bool {
116        let rate = self
117            .global_config
118            .current()
119            .options
120            .metric_stats_rollout_rate;
121
122        is_rolled_out(organization_id.value(), rate).is_keep()
123    }
124
125    fn to_volume_metric(&self, bucket: impl TrackableBucket, outcome: &Outcome) -> Option<Bucket> {
126        let volume = bucket.metadata().merges;
127        if volume == 0 {
128            return None;
129        }
130
131        let namespace = bucket.name().namespace();
132        if !namespace.has_metric_stats() {
133            return None;
134        }
135
136        let mut tags = BTreeMap::from([
137            ("mri".to_owned(), bucket.name().to_string()),
138            ("mri.type".to_owned(), bucket.ty().to_string()),
139            ("mri.namespace".to_owned(), namespace.to_string()),
140            (
141                "outcome.id".to_owned(),
142                outcome.to_outcome_id().as_u8().to_string(),
143            ),
144        ]);
145
146        if let Some(reason) = outcome.to_reason() {
147            tags.insert("outcome.reason".to_owned(), reason.into_owned());
148        }
149
150        Some(Bucket {
151            timestamp: UnixTimestamp::now(),
152            width: 0,
153            name: volume_metric_mri(),
154            value: BucketValue::Counter(volume.into()),
155            tags,
156            metadata: Default::default(),
157        })
158    }
159
160    #[cfg(feature = "processing")]
161    fn to_cardinality_metric(
162        &self,
163        limit: &CardinalityLimit,
164        report: &CardinalityReport,
165    ) -> Option<Bucket> {
166        let cardinality = report.cardinality;
167        if cardinality == 0 {
168            return None;
169        }
170
171        let mut tags = BTreeMap::from([
172            ("cardinality.limit".to_owned(), limit.id.clone()),
173            (
174                "cardinality.scope".to_owned(),
175                limit.scope.as_str().to_owned(),
176            ),
177            (
178                "cardinality.window".to_owned(),
179                limit.window.window_seconds.to_string(),
180            ),
181        ]);
182
183        if let Some(ref name) = report.metric_name {
184            tags.insert("mri".to_owned(), name.to_string());
185            tags.insert("mri.namespace".to_owned(), name.namespace().to_string());
186            if let Some(t) = name.try_type() {
187                tags.insert("mri.type".to_owned(), t.to_string());
188            }
189        } else {
190            if let Some(namespace) = limit.namespace {
191                tags.insert("mri.namespace".to_owned(), namespace.to_string());
192            }
193            if let Some(t) = report.metric_type {
194                tags.insert("mri.type".to_owned(), t.to_string());
195            }
196        }
197
198        Some(Bucket {
199            timestamp: report.timestamp,
200            width: 0,
201            name: cardinality_metric_mri(),
202            value: BucketValue::Gauge(GaugeValue::single(cardinality.into())),
203            tags,
204            metadata: Default::default(),
205        })
206    }
207}
208
#[cfg(test)]
mod tests {
    use relay_base_schema::project::{ProjectId, ProjectKey};
    #[cfg(feature = "processing")]
    use relay_cardinality::{CardinalityScope, SlidingWindow};
    use relay_dynamic_config::GlobalConfig;
    #[cfg(feature = "processing")]
    use relay_metrics::{MetricNamespace, MetricType};
    use relay_quotas::ReasonCode;
    use tokio::sync::mpsc::UnboundedReceiver;

    use super::*;

    impl MetricStats {
        /// Returns a fully rolled out [`MetricStats`] together with the
        /// receiver of everything it sends to its aggregator.
        pub fn test() -> (Self, UnboundedReceiver<Aggregator>) {
            create_metric_stats(1.0)
        }
    }

    /// Builds a [`MetricStats`] with processing enabled and the given metric
    /// stats rollout rate, plus the receiving end of its aggregator address.
    fn create_metric_stats(rollout_rate: f32) -> (MetricStats, UnboundedReceiver<Aggregator>) {
        let config = Config::from_json_value(serde_json::json!({
            "processing": {
                "enabled": true,
                "kafka_config": [],
            }
        }))
        .unwrap();

        let global_config = GlobalConfigHandle::fixed({
            let mut global = GlobalConfig::default();
            global.options.metric_stats_rollout_rate = rollout_rate;
            global
        });

        let (addr, receiver) = Addr::custom();
        let metric_stats = MetricStats::new(Arc::new(config), global_config, addr);
        (metric_stats, receiver)
    }

    /// Fixed scoping shared by all tests.
    fn scoping() -> Scoping {
        Scoping {
            organization_id: OrganizationId::new(42),
            project_id: ProjectId::new(21),
            project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
            key_id: Some(17),
        }
    }

    /// Builds an owned `BTreeMap<String, String>` from `(key, value)` pairs.
    macro_rules! tags {
        ($(($key:expr, $value:expr),)*) => {
            BTreeMap::from([
                $(($key.to_owned(), $value.to_owned())),*
            ])
        }
    }

    /// Receives the next aggregator message, asserts that it merges exactly one
    /// bucket into the project of `scoping`, and returns that bucket.
    fn recv_single_bucket(
        receiver: &mut UnboundedReceiver<Aggregator>,
        scoping: Scoping,
    ) -> Bucket {
        let Aggregator::MergeBuckets(mut merge) = receiver.blocking_recv().unwrap();
        assert_eq!(merge.project_key, scoping.project_key);
        assert_eq!(merge.buckets.len(), 1);
        merge.buckets.pop().unwrap()
    }

    #[test]
    fn test_metric_stats_volume() {
        let (stats, mut receiver) = create_metric_stats(1.0);
        let scoping = scoping();

        let mut bucket = Bucket::parse(b"rt@millisecond:57|d", UnixTimestamp::now()).unwrap();
        stats.track_metric(scoping, &bucket, &Outcome::Accepted);

        bucket.metadata.merges = bucket.metadata.merges.saturating_add(41);
        stats.track_metric(
            scoping,
            &bucket,
            &Outcome::RateLimited(Some(ReasonCode::new("foobar"))),
        );

        // Drop the tracker so that, once drained, the receiver reports the end
        // of the channel.
        drop(stats);

        let accepted = recv_single_bucket(&mut receiver, scoping);
        assert_eq!(&*accepted.name, "c:metric_stats/volume@none");
        assert_eq!(accepted.value, BucketValue::Counter(1.into()));
        assert_eq!(
            accepted.tags,
            tags!(
                ("mri", "d:custom/rt@millisecond"),
                ("mri.type", "d"),
                ("mri.namespace", "custom"),
                ("outcome.id", "0"),
            )
        );

        let rate_limited = recv_single_bucket(&mut receiver, scoping);
        assert_eq!(&*rate_limited.name, "c:metric_stats/volume@none");
        assert_eq!(rate_limited.value, BucketValue::Counter(42.into()));
        assert_eq!(
            rate_limited.tags,
            tags!(
                ("mri", "d:custom/rt@millisecond"),
                ("mri.type", "d"),
                ("mri.namespace", "custom"),
                ("outcome.id", "2"),
                ("outcome.reason", "foobar"),
            )
        );

        assert!(receiver.blocking_recv().is_none());
    }

    #[test]
    #[cfg(feature = "processing")]
    fn test_metric_stats_cardinality_name() {
        let (stats, mut receiver) = create_metric_stats(1.0);
        let scoping = scoping();

        let limit = CardinalityLimit {
            id: "test".to_owned(),
            passive: false,
            report: true,
            window: SlidingWindow {
                window_seconds: 246,
                granularity_seconds: 123,
            },
            limit: 99,
            scope: CardinalityScope::Name,
            namespace: None,
        };
        let report = CardinalityReport {
            timestamp: UnixTimestamp::from_secs(3333),
            organization_id: Some(scoping.organization_id),
            project_id: Some(scoping.project_id),
            metric_type: None,
            metric_name: Some(MetricName::from("d:custom/rt@millisecond")),
            cardinality: 12,
        };

        stats.track_cardinality(scoping, &limit, &report);
        drop(stats);

        let gauge = recv_single_bucket(&mut receiver, scoping);
        assert_eq!(&*gauge.name, "g:metric_stats/cardinality@none");
        assert_eq!(gauge.timestamp, UnixTimestamp::from_secs(3333));
        assert_eq!(
            gauge.value,
            BucketValue::Gauge(GaugeValue {
                last: 12.into(),
                min: 12.into(),
                max: 12.into(),
                sum: 12.into(),
                count: 1,
            })
        );
        assert_eq!(
            gauge.tags,
            tags!(
                ("mri", "d:custom/rt@millisecond"),
                ("mri.type", "d"),
                ("mri.namespace", "custom"),
                ("cardinality.limit", "test"),
                ("cardinality.scope", "name"),
                ("cardinality.window", "246"),
            )
        );

        assert!(receiver.blocking_recv().is_none());
    }

    #[test]
    #[cfg(feature = "processing")]
    fn test_metric_stats_cardinality_type() {
        let (stats, mut receiver) = create_metric_stats(1.0);
        let scoping = scoping();

        let limit = CardinalityLimit {
            id: "test".to_owned(),
            passive: false,
            report: true,
            window: SlidingWindow {
                window_seconds: 246,
                granularity_seconds: 123,
            },
            limit: 99,
            scope: CardinalityScope::Type,
            namespace: Some(MetricNamespace::Spans),
        };
        let report = CardinalityReport {
            timestamp: UnixTimestamp::from_secs(2222),
            organization_id: Some(scoping.organization_id),
            project_id: Some(scoping.project_id),
            metric_type: Some(MetricType::Distribution),
            metric_name: None,
            cardinality: 12,
        };

        stats.track_cardinality(scoping, &limit, &report);
        drop(stats);

        let gauge = recv_single_bucket(&mut receiver, scoping);
        assert_eq!(&*gauge.name, "g:metric_stats/cardinality@none");
        assert_eq!(gauge.timestamp, UnixTimestamp::from_secs(2222));
        assert_eq!(
            gauge.value,
            BucketValue::Gauge(GaugeValue {
                last: 12.into(),
                min: 12.into(),
                max: 12.into(),
                sum: 12.into(),
                count: 1,
            })
        );
        assert_eq!(
            gauge.tags,
            tags!(
                ("mri.type", "d"),
                ("mri.namespace", "spans"),
                ("cardinality.limit", "test"),
                ("cardinality.scope", "type"),
                ("cardinality.window", "246"),
            )
        );

        assert!(receiver.blocking_recv().is_none());
    }

    #[test]
    fn test_metric_stats_rollout_rate_disabled() {
        let (stats, mut receiver) = create_metric_stats(0.0);
        let scoping = scoping();

        let bucket = Bucket::parse(b"rt@millisecond:57|d", UnixTimestamp::now()).unwrap();
        stats.track_metric(scoping, &bucket, &Outcome::Accepted);
        drop(stats);

        // A rollout rate of zero must suppress all stats.
        assert!(receiver.blocking_recv().is_none());
    }

    #[test]
    fn test_metric_stats_disabled_namespace() {
        let (stats, mut receiver) = create_metric_stats(1.0);
        let scoping = scoping();

        let bucket =
            Bucket::parse(b"transactions/rt@millisecond:57|d", UnixTimestamp::now()).unwrap();
        stats.track_metric(scoping, &bucket, &Outcome::Accepted);
        drop(stats);

        // Transaction metrics are not tracked, even when fully rolled out.
        assert!(receiver.blocking_recv().is_none());
    }
}