1use std::borrow::Cow;
2use std::collections::BTreeMap;
3
4use relay_common::time::UnixTimestamp;
5use relay_dynamic_config::{CombinedMetricExtractionConfig, TagMapping, TagSource, TagSpec};
6use relay_metrics::{Bucket, BucketMetadata, BucketValue, MetricResourceIdentifier, MetricType};
7use relay_protocol::{FiniteF64, Getter, Val};
8use relay_quotas::DataCategory;
9
/// Item from which [`extract_metrics`] can produce metric buckets.
///
/// Field access for conditions, tags and metric values comes from the [`Getter`] supertrait.
pub trait Extractable: Getter {
    /// Data category of this item.
    ///
    /// [`extract_metrics`] skips every metric spec whose category differs from this value.
    fn category(&self) -> DataCategory;

    /// Timestamp assigned to every bucket extracted from this item.
    ///
    /// If this returns `None`, [`extract_metrics`] logs an error and extracts nothing.
    fn timestamp(&self) -> Option<UnixTimestamp>;
}
18
19pub fn extract_metrics<T>(instance: &T, config: CombinedMetricExtractionConfig<'_>) -> Vec<Bucket>
28where
29 T: Extractable,
30{
31 let mut metrics = Vec::new();
32
33 let Some(timestamp) = instance.timestamp() else {
34 relay_log::error!("invalid event timestamp for metric extraction");
35 return metrics;
36 };
37
38 let received_at = if cfg!(not(test)) {
41 UnixTimestamp::now()
42 } else {
43 UnixTimestamp::from_secs(0)
44 };
45
46 for metric_spec in config.metrics() {
47 if metric_spec.category != instance.category() {
48 continue;
49 }
50
51 if let Some(condition) = &metric_spec.condition {
52 if !condition.matches(instance) {
53 continue;
54 }
55 }
56
57 let Ok(mri) = MetricResourceIdentifier::parse(&metric_spec.mri) else {
60 relay_log::error!(mri = metric_spec.mri, "invalid MRI for metric extraction");
61 continue;
62 };
63
64 let Some(value) = read_metric_value(instance, metric_spec.field.as_deref(), mri.ty) else {
65 continue;
66 };
67
68 metrics.push(Bucket {
69 name: mri.to_string().into(),
70 width: 0,
71 value,
72 timestamp,
73 tags: extract_tags(instance, &metric_spec.tags),
74 metadata: BucketMetadata::new(received_at),
75 });
76 }
77
78 tmp_apply_tags(&mut metrics, instance, config.tags());
80
81 metrics
82}
83
84pub fn tmp_apply_tags<'a, T>(
85 metrics: &mut [Bucket],
86 instance: &T,
87 mappings: impl IntoIterator<Item = &'a TagMapping>,
88) where
89 T: Getter,
90{
91 for mapping in mappings.into_iter() {
92 let mut lazy_tags = None;
93
94 for metric in &mut *metrics {
95 if mapping.matches(&metric.name) {
96 let tags = lazy_tags.get_or_insert_with(|| extract_tags(instance, &mapping.tags));
97
98 for (key, val) in tags {
99 if !metric.tags.contains_key(key) {
100 metric.tags.insert(key.clone(), val.clone());
101 }
102 }
103 }
104 }
105 }
106}
107
108fn extract_tags<T>(instance: &T, tags: &[TagSpec]) -> BTreeMap<String, String>
109where
110 T: Getter,
111{
112 let mut map = BTreeMap::new();
113
114 for tag_spec in tags {
115 if let Some(ref condition) = tag_spec.condition {
116 if !condition.matches(instance) {
117 continue;
118 }
119 }
120
121 let value_opt = match tag_spec.source() {
122 TagSource::Literal(value) => Some(value.to_owned()),
123 TagSource::Field(field) => match instance.get_value(field) {
124 Some(Val::String(s)) => Some(s.to_owned()),
125 Some(Val::Bool(true)) => Some("True".to_owned()),
126 Some(Val::Bool(false)) => Some("False".to_owned()),
127 _ => None,
128 },
129 TagSource::Unknown => None,
130 };
131
132 if let Some(value) = value_opt {
133 if !map.contains_key(&tag_spec.key) {
135 map.insert(tag_spec.key.clone(), value);
136 }
137 }
138 }
139
140 map
141}
142
143fn read_metric_value(
144 instance: &impl Getter,
145 field: Option<&str>,
146 ty: MetricType,
147) -> Option<BucketValue> {
148 let finite = |float: f64| match FiniteF64::new(float) {
149 Some(f) => Some(f),
150 None => {
151 relay_log::error!(
152 tags.field = field,
153 tags.metric_type = ?ty,
154 "non-finite float value in generic metric extraction"
155 );
156 None
157 }
158 };
159
160 Some(match ty {
161 MetricType::Counter => BucketValue::counter(match field {
162 Some(field) => finite(instance.get_value(field)?.as_f64()?)?,
163 None => 1.into(),
164 }),
165 MetricType::Distribution => {
166 BucketValue::distribution(finite(instance.get_value(field?)?.as_f64()?)?)
167 }
168 MetricType::Set => BucketValue::set_from_str(&match instance.get_value(field?)? {
169 Val::I64(num) => Cow::Owned(num.to_string()),
170 Val::U64(num) => Cow::Owned(num.to_string()),
171 Val::String(s) => Cow::Borrowed(s),
172 _ => return None,
173 }),
174 MetricType::Gauge => BucketValue::gauge(finite(instance.get_value(field?)?.as_f64()?)?),
175 })
176}
177
#[cfg(test)]
mod tests {
    use relay_event_schema::protocol::Event;
    use relay_protocol::FromValue;
    use serde_json::json;

    use super::*;

    /// Parses `event_json` into an [`Event`] and `config_json` into an extraction config, then
    /// runs [`extract_metrics`] on them.
    ///
    /// Panics if the event has no value or the config fails to deserialize.
    fn run_extraction(
        event_json: serde_json::Value,
        config_json: serde_json::Value,
    ) -> Vec<Bucket> {
        let event = Event::from_value(event_json.into());
        let config = serde_json::from_value(config_json).unwrap();

        extract_metrics(
            event.value().unwrap(),
            CombinedMetricExtractionConfig::from(&config),
        )
    }

    /// A counter spec without a field counts `1` per event.
    #[test]
    fn extract_counter() {
        let metrics = run_extraction(
            json!({
                "type": "transaction",
                "timestamp": 1597976302.0,
            }),
            json!({
                "version": 1,
                "metrics": [
                    {
                        "category": "transaction",
                        "mri": "c:transactions/counter@none",
                    }
                ]
            }),
        );

        insta::assert_debug_snapshot!(metrics, @r###"
        [
            Bucket {
                timestamp: UnixTimestamp(1597976302),
                width: 0,
                name: MetricName(
                    "c:transactions/counter@none",
                ),
                value: Counter(
                    1.0,
                ),
                tags: {},
                metadata: BucketMetadata {
                    merges: 1,
                    received_at: Some(
                        UnixTimestamp(0),
                    ),
                    extracted_from_indexed: false,
                },
            },
        ]
        "###);
    }

    /// A distribution spec records the numeric value of its field.
    #[test]
    fn extract_distribution() {
        let metrics = run_extraction(
            json!({
                "type": "transaction",
                "start_timestamp": 1597976300.0,
                "timestamp": 1597976302.0,
            }),
            json!({
                "version": 1,
                "metrics": [
                    {
                        "category": "transaction",
                        "mri": "d:transactions/duration@none",
                        "field": "event.duration",
                    }
                ]
            }),
        );

        insta::assert_debug_snapshot!(metrics, @r###"
        [
            Bucket {
                timestamp: UnixTimestamp(1597976302),
                width: 0,
                name: MetricName(
                    "d:transactions/duration@none",
                ),
                value: Distribution(
                    [
                        2000.0,
                    ],
                ),
                tags: {},
                metadata: BucketMetadata {
                    merges: 1,
                    received_at: Some(
                        UnixTimestamp(0),
                    ),
                    extracted_from_indexed: false,
                },
            },
        ]
        "###);
    }

    /// A set spec accepts a string field value.
    #[test]
    fn extract_set() {
        let metrics = run_extraction(
            json!({
                "type": "transaction",
                "timestamp": 1597976302.0,
                "user": {
                    "id": "4711",
                },
            }),
            json!({
                "version": 1,
                "metrics": [
                    {
                        "category": "transaction",
                        "mri": "s:transactions/users@none",
                        "field": "event.user.id",
                    }
                ]
            }),
        );

        insta::assert_debug_snapshot!(metrics, @r###"
        [
            Bucket {
                timestamp: UnixTimestamp(1597976302),
                width: 0,
                name: MetricName(
                    "s:transactions/users@none",
                ),
                value: Set(
                    {
                        943162418,
                    },
                ),
                tags: {},
                metadata: BucketMetadata {
                    merges: 1,
                    received_at: Some(
                        UnixTimestamp(0),
                    ),
                    extracted_from_indexed: false,
                },
            },
        ]
        "###);
    }

    /// A set spec also yields a bucket when the event JSON supplies a (negative) integer.
    #[test]
    fn extract_set_numeric() {
        let metrics = run_extraction(
            json!({
                "type": "transaction",
                "timestamp": 1597976302.0,
                "user": {
                    "id": -4711,
                },
            }),
            json!({
                "version": 1,
                "metrics": [
                    {
                        "category": "transaction",
                        "mri": "s:transactions/users@none",
                        "field": "event.user.id",
                    }
                ]
            }),
        );

        insta::assert_debug_snapshot!(metrics, @r###"
        [
            Bucket {
                timestamp: UnixTimestamp(1597976302),
                width: 0,
                name: MetricName(
                    "s:transactions/users@none",
                ),
                value: Set(
                    {
                        1893272827,
                    },
                ),
                tags: {},
                metadata: BucketMetadata {
                    merges: 1,
                    received_at: Some(
                        UnixTimestamp(0),
                    ),
                    extracted_from_indexed: false,
                },
            },
        ]
        "###);
    }

    /// Literal, field and conditional tags are combined; only tags whose condition matches
    /// are emitted.
    #[test]
    fn extract_tag_conditions() {
        let metrics = run_extraction(
            json!({
                "type": "transaction",
                "start_timestamp": 1597976300.0,
                "timestamp": 1597976302.0,
                "release": "myapp@1.0.0",
            }),
            json!({
                "version": 1,
                "metrics": [
                    {
                        "category": "transaction",
                        "mri": "c:transactions/counter@none",
                        "tags": [
                            {"key": "id", "value": "4711"},
                            {"key": "release", "field": "event.release"},
                            {
                                "key": "fast",
                                "value": "yes",
                                "condition": {"op": "lt", "name": "event.duration", "value": 2000},
                            },
                            {
                                "key": "fast",
                                "value": "no",
                                "condition": {"op": "gte", "name": "event.duration", "value": 2000},
                            },
                        ]
                    }
                ]
            }),
        );

        insta::assert_debug_snapshot!(metrics, @r###"
        [
            Bucket {
                timestamp: UnixTimestamp(1597976302),
                width: 0,
                name: MetricName(
                    "c:transactions/counter@none",
                ),
                value: Counter(
                    1.0,
                ),
                tags: {
                    "fast": "no",
                    "id": "4711",
                    "release": "myapp@1.0.0",
                },
                metadata: BucketMetadata {
                    merges: 1,
                    received_at: Some(
                        UnixTimestamp(0),
                    ),
                    extracted_from_indexed: false,
                },
            },
        ]
        "###);
    }

    /// Within one metric spec, the first tag spec that produces a value for a key wins.
    #[test]
    fn extract_tag_precedence() {
        let metrics = run_extraction(
            json!({
                "type": "transaction",
                "start_timestamp": 1597976300.0,
                "timestamp": 1597976302.0,
                "release": "myapp@1.0.0",
            }),
            // The first tag matches, so the unconditional second one must be ignored.
            json!({
                "version": 1,
                "metrics": [
                    {
                        "category": "transaction",
                        "mri": "c:transactions/counter@none",
                        "tags": [
                            {
                                "key": "fast",
                                "value": "yes",
                                "condition": {"op": "lte", "name": "event.duration", "value": 2000},
                            },
                            {
                                "key": "fast",
                                "value": "no",
                            },
                        ]
                    }
                ]
            }),
        );

        insta::assert_debug_snapshot!(metrics, @r###"
        [
            Bucket {
                timestamp: UnixTimestamp(1597976302),
                width: 0,
                name: MetricName(
                    "c:transactions/counter@none",
                ),
                value: Counter(
                    1.0,
                ),
                tags: {
                    "fast": "yes",
                },
                metadata: BucketMetadata {
                    merges: 1,
                    received_at: Some(
                        UnixTimestamp(0),
                    ),
                    extracted_from_indexed: false,
                },
            },
        ]
        "###);
    }

    /// Across several tag mappings, the first mapping that sets a key wins.
    #[test]
    fn extract_tag_precedence_multiple_rules() {
        let metrics = run_extraction(
            json!({
                "type": "transaction",
                "start_timestamp": 1597976300.0,
                "timestamp": 1597976302.0,
                "release": "myapp@1.0.0",
            }),
            // The first mapping matches, so the tag from the second one must be ignored.
            json!({
                "version": 1,
                "metrics": [{
                    "category": "transaction",
                    "mri": "c:transactions/counter@none",
                }],
                "tags": [
                    {
                        "metrics": ["c:transactions/counter@none"],
                        "tags": [{
                            "key": "fast",
                            "value": "yes",
                            "condition": {"op": "lte", "name": "event.duration", "value": 2000},
                        }],
                    },
                    {
                        "metrics": ["c:transactions/counter@none"],
                        "tags": [{
                            "key": "fast",
                            "value": "no",
                        }]
                    },
                ]
            }),
        );

        insta::assert_debug_snapshot!(metrics, @r###"
        [
            Bucket {
                timestamp: UnixTimestamp(1597976302),
                width: 0,
                name: MetricName(
                    "c:transactions/counter@none",
                ),
                value: Counter(
                    1.0,
                ),
                tags: {
                    "fast": "yes",
                },
                metadata: BucketMetadata {
                    merges: 1,
                    received_at: Some(
                        UnixTimestamp(0),
                    ),
                    extracted_from_indexed: false,
                },
            },
        ]
        "###);
    }

    /// Boolean field values are emitted as the tag value `"True"` / `"False"`.
    #[test]
    fn extract_tag_bool() {
        let metrics = run_extraction(
            json!({
                "type": "transaction",
                "start_timestamp": 1597976300.0,
                "timestamp": 1597976302.0,
                "extra": {
                    "flag": true,
                }
            }),
            json!({
                "version": 1,
                "metrics": [
                    {
                        "category": "transaction",
                        "mri": "c:transactions/counter@none",
                        "tags": [
                            {"key": "flag", "field": "event.extra.flag"},
                        ]
                    }
                ]
            }),
        );

        insta::assert_debug_snapshot!(metrics, @r###"
        [
            Bucket {
                timestamp: UnixTimestamp(1597976302),
                width: 0,
                name: MetricName(
                    "c:transactions/counter@none",
                ),
                value: Counter(
                    1.0,
                ),
                tags: {
                    "flag": "True",
                },
                metadata: BucketMetadata {
                    merges: 1,
                    received_at: Some(
                        UnixTimestamp(0),
                    ),
                    extracted_from_indexed: false,
                },
            },
        ]
        "###);
    }
}