relay_server/metrics_extraction/
generic.rs

1use std::borrow::Cow;
2use std::collections::BTreeMap;
3
4use relay_common::time::UnixTimestamp;
5use relay_dynamic_config::{CombinedMetricExtractionConfig, TagMapping, TagSource, TagSpec};
6use relay_metrics::{Bucket, BucketMetadata, BucketValue, MetricResourceIdentifier, MetricType};
7use relay_protocol::{FiniteF64, Getter, Val};
8use relay_quotas::DataCategory;
9
10/// Item from which metrics can be extracted.
11pub trait Extractable: Getter {
12    /// Data category for the metric spec to match on.
13    fn category(&self) -> DataCategory;
14
15    /// The timestamp to associate with the extracted metrics.
16    fn timestamp(&self) -> Option<UnixTimestamp>;
17}
18
19/// Extract metrics from any type that implements both [`Extractable`] and [`Getter`].
20///
21/// The instance must have a valid timestamp; if the timestamp is missing or invalid, no metrics are
22/// extracted. Timestamp and clock drift correction should occur before metrics extraction to ensure
23/// valid timestamps.
24///
25/// Any MRI can be defined multiple times in the config (this will create multiple buckets), but
26/// for every tag in a bucket, there can be only one value. The first encountered tag value wins.
27pub fn extract_metrics<T>(instance: &T, config: CombinedMetricExtractionConfig<'_>) -> Vec<Bucket>
28where
29    T: Extractable,
30{
31    let mut metrics = Vec::new();
32
33    let Some(timestamp) = instance.timestamp() else {
34        relay_log::error!("invalid event timestamp for metric extraction");
35        return metrics;
36    };
37
38    // For extracted metrics we assume the `received_at` timestamp is equivalent to the time
39    // in which the metric is extracted.
40    let received_at = if cfg!(not(test)) {
41        UnixTimestamp::now()
42    } else {
43        UnixTimestamp::from_secs(0)
44    };
45
46    for metric_spec in config.metrics() {
47        if metric_spec.category != instance.category() {
48            continue;
49        }
50
51        if let Some(condition) = &metric_spec.condition {
52            if !condition.matches(instance) {
53                continue;
54            }
55        }
56
57        // Parse the MRI so that we can obtain the type, but subsequently re-serialize it into the
58        // generated metric to ensure the MRI is normalized.
59        let Ok(mri) = MetricResourceIdentifier::parse(&metric_spec.mri) else {
60            relay_log::error!(mri = metric_spec.mri, "invalid MRI for metric extraction");
61            continue;
62        };
63
64        let Some(value) = read_metric_value(instance, metric_spec.field.as_deref(), mri.ty) else {
65            continue;
66        };
67
68        metrics.push(Bucket {
69            name: mri.to_string().into(),
70            width: 0,
71            value,
72            timestamp,
73            tags: extract_tags(instance, &metric_spec.tags),
74            metadata: BucketMetadata::new(received_at),
75        });
76    }
77
78    // TODO: Inline this again once transaction metric extraction has been moved to generic metrics.
79    tmp_apply_tags(&mut metrics, instance, config.tags());
80
81    metrics
82}
83
84pub fn tmp_apply_tags<'a, T>(
85    metrics: &mut [Bucket],
86    instance: &T,
87    mappings: impl IntoIterator<Item = &'a TagMapping>,
88) where
89    T: Getter,
90{
91    for mapping in mappings.into_iter() {
92        let mut lazy_tags = None;
93
94        for metric in &mut *metrics {
95            if mapping.matches(&metric.name) {
96                let tags = lazy_tags.get_or_insert_with(|| extract_tags(instance, &mapping.tags));
97
98                for (key, val) in tags {
99                    if !metric.tags.contains_key(key) {
100                        metric.tags.insert(key.clone(), val.clone());
101                    }
102                }
103            }
104        }
105    }
106}
107
108fn extract_tags<T>(instance: &T, tags: &[TagSpec]) -> BTreeMap<String, String>
109where
110    T: Getter,
111{
112    let mut map = BTreeMap::new();
113
114    for tag_spec in tags {
115        if let Some(ref condition) = tag_spec.condition {
116            if !condition.matches(instance) {
117                continue;
118            }
119        }
120
121        let value_opt = match tag_spec.source() {
122            TagSource::Literal(value) => Some(value.to_owned()),
123            TagSource::Field(field) => match instance.get_value(field) {
124                Some(Val::String(s)) => Some(s.to_owned()),
125                Some(Val::Bool(true)) => Some("True".to_owned()),
126                Some(Val::Bool(false)) => Some("False".to_owned()),
127                _ => None,
128            },
129            TagSource::Unknown => None,
130        };
131
132        if let Some(value) = value_opt {
133            // Explicitly do not override existing tags on a metric. First condition wins.
134            if !map.contains_key(&tag_spec.key) {
135                map.insert(tag_spec.key.clone(), value);
136            }
137        }
138    }
139
140    map
141}
142
143fn read_metric_value(
144    instance: &impl Getter,
145    field: Option<&str>,
146    ty: MetricType,
147) -> Option<BucketValue> {
148    let finite = |float: f64| match FiniteF64::new(float) {
149        Some(f) => Some(f),
150        None => {
151            relay_log::error!(
152                tags.field = field,
153                tags.metric_type = ?ty,
154                "non-finite float value in generic metric extraction"
155            );
156            None
157        }
158    };
159
160    Some(match ty {
161        MetricType::Counter => BucketValue::counter(match field {
162            Some(field) => finite(instance.get_value(field)?.as_f64()?)?,
163            None => 1.into(),
164        }),
165        MetricType::Distribution => {
166            BucketValue::distribution(finite(instance.get_value(field?)?.as_f64()?)?)
167        }
168        MetricType::Set => BucketValue::set_from_str(&match instance.get_value(field?)? {
169            Val::I64(num) => Cow::Owned(num.to_string()),
170            Val::U64(num) => Cow::Owned(num.to_string()),
171            Val::String(s) => Cow::Borrowed(s),
172            _ => return None,
173        }),
174        MetricType::Gauge => BucketValue::gauge(finite(instance.get_value(field?)?.as_f64()?)?),
175    })
176}
177
#[cfg(test)]
mod tests {
    use relay_event_schema::protocol::Event;
    use relay_protocol::FromValue;
    use serde_json::json;

    use super::*;

    /// Parses the event and the extraction config from JSON and runs [`extract_metrics`].
    ///
    /// Panics if the event has no value or the config does not deserialize.
    fn extract(event_json: serde_json::Value, config_json: serde_json::Value) -> Vec<Bucket> {
        let event = Event::from_value(event_json.into());
        let config = serde_json::from_value(config_json).unwrap();

        extract_metrics(
            event.value().unwrap(),
            CombinedMetricExtractionConfig::from(&config),
        )
    }

    /// A counter without a field counts `1` per matching item.
    #[test]
    fn extract_counter() {
        let buckets = extract(
            json!({
                "type": "transaction",
                "timestamp": 1597976302.0,
            }),
            json!({
                "version": 1,
                "metrics": [
                    {
                        "category": "transaction",
                        "mri": "c:transactions/counter@none",
                    }
                ]
            }),
        );

        insta::assert_debug_snapshot!(buckets, @r###"
        [
            Bucket {
                timestamp: UnixTimestamp(1597976302),
                width: 0,
                name: MetricName(
                    "c:transactions/counter@none",
                ),
                value: Counter(
                    1.0,
                ),
                tags: {},
                metadata: BucketMetadata {
                    merges: 1,
                    received_at: Some(
                        UnixTimestamp(0),
                    ),
                    extracted_from_indexed: false,
                },
            },
        ]
        "###);
    }

    /// A distribution reads its value from the configured field.
    #[test]
    fn extract_distribution() {
        let buckets = extract(
            json!({
                "type": "transaction",
                "start_timestamp": 1597976300.0,
                "timestamp": 1597976302.0,
            }),
            json!({
                "version": 1,
                "metrics": [
                    {
                        "category": "transaction",
                        "mri": "d:transactions/duration@none",
                        "field": "event.duration",
                    }
                ]
            }),
        );

        insta::assert_debug_snapshot!(buckets, @r###"
        [
            Bucket {
                timestamp: UnixTimestamp(1597976302),
                width: 0,
                name: MetricName(
                    "d:transactions/duration@none",
                ),
                value: Distribution(
                    [
                        2000.0,
                    ],
                ),
                tags: {},
                metadata: BucketMetadata {
                    merges: 1,
                    received_at: Some(
                        UnixTimestamp(0),
                    ),
                    extracted_from_indexed: false,
                },
            },
        ]
        "###);
    }

    /// A set accepts a string field value.
    #[test]
    fn extract_set() {
        let buckets = extract(
            json!({
                "type": "transaction",
                "timestamp": 1597976302.0,
                "user": {
                    "id": "4711",
                },
            }),
            json!({
                "version": 1,
                "metrics": [
                    {
                        "category": "transaction",
                        "mri": "s:transactions/users@none",
                        "field": "event.user.id",
                    }
                ]
            }),
        );

        insta::assert_debug_snapshot!(buckets, @r###"
        [
            Bucket {
                timestamp: UnixTimestamp(1597976302),
                width: 0,
                name: MetricName(
                    "s:transactions/users@none",
                ),
                value: Set(
                    {
                        943162418,
                    },
                ),
                tags: {},
                metadata: BucketMetadata {
                    merges: 1,
                    received_at: Some(
                        UnixTimestamp(0),
                    ),
                    extracted_from_indexed: false,
                },
            },
        ]
        "###);
    }

    /// A set also accepts a numeric field value.
    #[test]
    fn extract_set_numeric() {
        let buckets = extract(
            json!({
                "type": "transaction",
                "timestamp": 1597976302.0,
                "user": {
                    "id": -4711,
                },
            }),
            json!({
                "version": 1,
                "metrics": [
                    {
                        "category": "transaction",
                        "mri": "s:transactions/users@none",
                        "field": "event.user.id",
                    }
                ]
            }),
        );

        insta::assert_debug_snapshot!(buckets, @r###"
        [
            Bucket {
                timestamp: UnixTimestamp(1597976302),
                width: 0,
                name: MetricName(
                    "s:transactions/users@none",
                ),
                value: Set(
                    {
                        1893272827,
                    },
                ),
                tags: {},
                metadata: BucketMetadata {
                    merges: 1,
                    received_at: Some(
                        UnixTimestamp(0),
                    ),
                    extracted_from_indexed: false,
                },
            },
        ]
        "###);
    }

    /// Literal, field-based and conditional tags are all applied to the bucket.
    #[test]
    fn extract_tag_conditions() {
        let buckets = extract(
            json!({
                "type": "transaction",
                "start_timestamp": 1597976300.0,
                "timestamp": 1597976302.0,
                "release": "myapp@1.0.0",
            }),
            json!({
                "version": 1,
                "metrics": [
                    {
                        "category": "transaction",
                        "mri": "c:transactions/counter@none",
                        "tags": [
                            {"key": "id", "value": "4711"},
                            {"key": "release", "field": "event.release"},
                            {
                                "key": "fast",
                                "value": "yes",
                                "condition": {"op": "lt", "name": "event.duration", "value": 2000},
                            },
                            {
                                "key": "fast",
                                "value": "no",
                                "condition": {"op": "gte", "name": "event.duration", "value": 2000},
                            },
                        ]
                    }
                ]
            }),
        );

        insta::assert_debug_snapshot!(buckets, @r###"
        [
            Bucket {
                timestamp: UnixTimestamp(1597976302),
                width: 0,
                name: MetricName(
                    "c:transactions/counter@none",
                ),
                value: Counter(
                    1.0,
                ),
                tags: {
                    "fast": "no",
                    "id": "4711",
                    "release": "myapp@1.0.0",
                },
                metadata: BucketMetadata {
                    merges: 1,
                    received_at: Some(
                        UnixTimestamp(0),
                    ),
                    extracted_from_indexed: false,
                },
            },
        ]
        "###);
    }

    /// Within one metric spec, the first matching tag for a key wins.
    #[test]
    fn extract_tag_precedence() {
        // NOTE: The first condition should match and therefore the second tag should be skipped.
        let buckets = extract(
            json!({
                "type": "transaction",
                "start_timestamp": 1597976300.0,
                "timestamp": 1597976302.0,
                "release": "myapp@1.0.0",
            }),
            json!({
                "version": 1,
                "metrics": [
                    {
                        "category": "transaction",
                        "mri": "c:transactions/counter@none",
                        "tags": [
                            {
                                "key": "fast",
                                "value": "yes",
                                "condition": {"op": "lte", "name": "event.duration", "value": 2000},
                            },
                            {
                                "key": "fast",
                                "value": "no",
                            },
                        ]
                    }
                ]
            }),
        );

        insta::assert_debug_snapshot!(buckets, @r###"
        [
            Bucket {
                timestamp: UnixTimestamp(1597976302),
                width: 0,
                name: MetricName(
                    "c:transactions/counter@none",
                ),
                value: Counter(
                    1.0,
                ),
                tags: {
                    "fast": "yes",
                },
                metadata: BucketMetadata {
                    merges: 1,
                    received_at: Some(
                        UnixTimestamp(0),
                    ),
                    extracted_from_indexed: false,
                },
            },
        ]
        "###);
    }

    /// Across tag mappings, the first mapping that sets a key wins.
    #[test]
    fn extract_tag_precedence_multiple_rules() {
        // NOTE: The first tagging condition should match and the second one should be skipped.
        let buckets = extract(
            json!({
                "type": "transaction",
                "start_timestamp": 1597976300.0,
                "timestamp": 1597976302.0,
                "release": "myapp@1.0.0",
            }),
            json!({
                "version": 1,
                "metrics": [{
                    "category": "transaction",
                    "mri": "c:transactions/counter@none",
                }],
                "tags": [
                    {
                        "metrics": ["c:transactions/counter@none"],
                        "tags": [{
                            "key": "fast",
                            "value": "yes",
                            "condition": {"op": "lte", "name": "event.duration", "value": 2000},
                        }],
                    },
                    {
                        "metrics": ["c:transactions/counter@none"],
                        "tags": [{
                            "key": "fast",
                            "value": "no",
                        }]
                    },
                ]
            }),
        );

        insta::assert_debug_snapshot!(buckets, @r###"
        [
            Bucket {
                timestamp: UnixTimestamp(1597976302),
                width: 0,
                name: MetricName(
                    "c:transactions/counter@none",
                ),
                value: Counter(
                    1.0,
                ),
                tags: {
                    "fast": "yes",
                },
                metadata: BucketMetadata {
                    merges: 1,
                    received_at: Some(
                        UnixTimestamp(0),
                    ),
                    extracted_from_indexed: false,
                },
            },
        ]
        "###);
    }

    /// Boolean field values become the tag values `"True"` and `"False"`.
    #[test]
    fn extract_tag_bool() {
        let buckets = extract(
            json!({
                "type": "transaction",
                "start_timestamp": 1597976300.0,
                "timestamp": 1597976302.0,
                "extra": {
                    "flag": true,
                }
            }),
            json!({
                "version": 1,
                "metrics": [
                    {
                        "category": "transaction",
                        "mri": "c:transactions/counter@none",
                        "tags": [
                            {"key": "flag", "field": "event.extra.flag"},
                        ]
                    }
                ]
            }),
        );

        insta::assert_debug_snapshot!(buckets, @r###"
        [
            Bucket {
                timestamp: UnixTimestamp(1597976302),
                width: 0,
                name: MetricName(
                    "c:transactions/counter@none",
                ),
                value: Counter(
                    1.0,
                ),
                tags: {
                    "flag": "True",
                },
                metadata: BucketMetadata {
                    merges: 1,
                    received_at: Some(
                        UnixTimestamp(0),
                    ),
                    extracted_from_indexed: false,
                },
            },
        ]
        "###);
    }
}