1use std::collections::BTreeMap;
2use std::sync::{Arc, OnceLock};
3
4use relay_base_schema::organization::OrganizationId;
5#[cfg(feature = "processing")]
6use relay_cardinality::{CardinalityLimit, CardinalityReport};
7use relay_config::Config;
8#[cfg(feature = "processing")]
9use relay_metrics::GaugeValue;
10use relay_metrics::{Bucket, BucketValue, MetricName, UnixTimestamp};
11use relay_quotas::Scoping;
12use relay_system::Addr;
13
14use crate::metrics::TrackableBucket;
15use crate::services::global_config::GlobalConfigHandle;
16use crate::services::metrics::{Aggregator, MergeBuckets};
17use crate::services::outcome::Outcome;
18use crate::utils::is_rolled_out;
19
20fn volume_metric_mri() -> MetricName {
21 static VOLUME_METRIC_MRI: OnceLock<MetricName> = OnceLock::new();
22
23 VOLUME_METRIC_MRI
24 .get_or_init(|| "c:metric_stats/volume@none".into())
25 .clone()
26}
27
28#[cfg(feature = "processing")]
29fn cardinality_metric_mri() -> MetricName {
30 static CARDINALITY_METRIC_MRI: OnceLock<MetricName> = OnceLock::new();
31
32 CARDINALITY_METRIC_MRI
33 .get_or_init(|| "g:metric_stats/cardinality@none".into())
34 .clone()
35}
36
37#[derive(Clone, Debug)]
45pub struct MetricStats {
46 config: Arc<Config>,
47 global_config: GlobalConfigHandle,
48 aggregator: Addr<Aggregator>,
49}
50
51impl MetricStats {
52 pub fn new(
54 config: Arc<Config>,
55 global_config: GlobalConfigHandle,
56 aggregator: Addr<Aggregator>,
57 ) -> Self {
58 Self {
59 config,
60 global_config,
61 aggregator,
62 }
63 }
64
65 pub fn track_metric(&self, scoping: Scoping, bucket: impl TrackableBucket, outcome: &Outcome) {
67 if !self.is_enabled(scoping) {
68 return;
69 }
70
71 let Some(volume) = self.to_volume_metric(&bucket, outcome) else {
72 return;
73 };
74
75 relay_log::trace!(
76 "Tracking volume of {} for mri '{}': {}",
77 bucket.metadata().merges,
78 bucket.name(),
79 outcome
80 );
81 self.aggregator
82 .send(MergeBuckets::new(scoping.project_key, vec![volume]));
83 }
84
85 #[cfg(feature = "processing")]
87 pub fn track_cardinality(
88 &self,
89 scoping: Scoping,
90 limit: &CardinalityLimit,
91 report: &CardinalityReport,
92 ) {
93 if !self.is_enabled(scoping) {
94 return;
95 }
96
97 let Some(cardinality) = self.to_cardinality_metric(limit, report) else {
98 return;
99 };
100
101 relay_log::trace!(
102 "Tracking cardinality '{}' for mri '{}': {}",
103 limit.id,
104 report.metric_name.as_deref().unwrap_or("-"),
105 report.cardinality,
106 );
107 self.aggregator
108 .send(MergeBuckets::new(scoping.project_key, vec![cardinality]));
109 }
110
111 fn is_enabled(&self, scoping: Scoping) -> bool {
112 self.config.metric_stats_enabled() && self.is_rolled_out(scoping.organization_id)
113 }
114
115 fn is_rolled_out(&self, organization_id: OrganizationId) -> bool {
116 let rate = self
117 .global_config
118 .current()
119 .options
120 .metric_stats_rollout_rate;
121
122 is_rolled_out(organization_id.value(), rate).is_keep()
123 }
124
125 fn to_volume_metric(&self, bucket: impl TrackableBucket, outcome: &Outcome) -> Option<Bucket> {
126 let volume = bucket.metadata().merges;
127 if volume == 0 {
128 return None;
129 }
130
131 let namespace = bucket.name().namespace();
132 if !namespace.has_metric_stats() {
133 return None;
134 }
135
136 let mut tags = BTreeMap::from([
137 ("mri".to_owned(), bucket.name().to_string()),
138 ("mri.type".to_owned(), bucket.ty().to_string()),
139 ("mri.namespace".to_owned(), namespace.to_string()),
140 (
141 "outcome.id".to_owned(),
142 outcome.to_outcome_id().as_u8().to_string(),
143 ),
144 ]);
145
146 if let Some(reason) = outcome.to_reason() {
147 tags.insert("outcome.reason".to_owned(), reason.into_owned());
148 }
149
150 Some(Bucket {
151 timestamp: UnixTimestamp::now(),
152 width: 0,
153 name: volume_metric_mri(),
154 value: BucketValue::Counter(volume.into()),
155 tags,
156 metadata: Default::default(),
157 })
158 }
159
160 #[cfg(feature = "processing")]
161 fn to_cardinality_metric(
162 &self,
163 limit: &CardinalityLimit,
164 report: &CardinalityReport,
165 ) -> Option<Bucket> {
166 let cardinality = report.cardinality;
167 if cardinality == 0 {
168 return None;
169 }
170
171 let mut tags = BTreeMap::from([
172 ("cardinality.limit".to_owned(), limit.id.clone()),
173 (
174 "cardinality.scope".to_owned(),
175 limit.scope.as_str().to_owned(),
176 ),
177 (
178 "cardinality.window".to_owned(),
179 limit.window.window_seconds.to_string(),
180 ),
181 ]);
182
183 if let Some(ref name) = report.metric_name {
184 tags.insert("mri".to_owned(), name.to_string());
185 tags.insert("mri.namespace".to_owned(), name.namespace().to_string());
186 if let Some(t) = name.try_type() {
187 tags.insert("mri.type".to_owned(), t.to_string());
188 }
189 } else {
190 if let Some(namespace) = limit.namespace {
191 tags.insert("mri.namespace".to_owned(), namespace.to_string());
192 }
193 if let Some(t) = report.metric_type {
194 tags.insert("mri.type".to_owned(), t.to_string());
195 }
196 }
197
198 Some(Bucket {
199 timestamp: report.timestamp,
200 width: 0,
201 name: cardinality_metric_mri(),
202 value: BucketValue::Gauge(GaugeValue::single(cardinality.into())),
203 tags,
204 metadata: Default::default(),
205 })
206 }
207}
208
209#[cfg(test)]
210mod tests {
211 use relay_base_schema::project::{ProjectId, ProjectKey};
212 #[cfg(feature = "processing")]
213 use relay_cardinality::{CardinalityScope, SlidingWindow};
214 use relay_dynamic_config::GlobalConfig;
215 #[cfg(feature = "processing")]
216 use relay_metrics::{MetricNamespace, MetricType};
217 use relay_quotas::ReasonCode;
218 use tokio::sync::mpsc::UnboundedReceiver;
219
220 use super::*;
221
222 impl MetricStats {
223 pub fn test() -> (Self, UnboundedReceiver<Aggregator>) {
224 create_metric_stats(1.0)
225 }
226 }
227
228 fn create_metric_stats(rollout_rate: f32) -> (MetricStats, UnboundedReceiver<Aggregator>) {
229 let config = Config::from_json_value(serde_json::json!({
230 "processing": {
231 "enabled": true,
232 "kafka_config": [],
233 }
234 }))
235 .unwrap();
236
237 let mut global_config = GlobalConfig::default();
238 global_config.options.metric_stats_rollout_rate = rollout_rate;
239 let global_config = GlobalConfigHandle::fixed(global_config);
240
241 let (addr, receiver) = Addr::custom();
242 let ms = MetricStats::new(Arc::new(config), global_config, addr);
243
244 (ms, receiver)
245 }
246
247 fn scoping() -> Scoping {
248 Scoping {
249 organization_id: OrganizationId::new(42),
250 project_id: ProjectId::new(21),
251 project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
252 key_id: Some(17),
253 }
254 }
255
256 macro_rules! tags {
257 ($(($key:expr, $value:expr),)*) => {
258 BTreeMap::from([
259 $(($key.to_owned(), $value.to_owned())),*
260 ])
261 }
262 }
263
264 #[test]
265 fn test_metric_stats_volume() {
266 let (ms, mut receiver) = create_metric_stats(1.0);
267
268 let scoping = scoping();
269 let mut bucket = Bucket::parse(b"rt@millisecond:57|d", UnixTimestamp::now()).unwrap();
270
271 ms.track_metric(scoping, &bucket, &Outcome::Accepted);
272
273 bucket.metadata.merges = bucket.metadata.merges.saturating_add(41);
274 ms.track_metric(
275 scoping,
276 &bucket,
277 &Outcome::RateLimited(Some(ReasonCode::new("foobar"))),
278 );
279
280 drop(ms);
281
282 let Aggregator::MergeBuckets(mut mb) = receiver.blocking_recv().unwrap();
283 assert_eq!(mb.project_key, scoping.project_key);
284
285 assert_eq!(mb.buckets.len(), 1);
286 let bucket = mb.buckets.pop().unwrap();
287
288 assert_eq!(&*bucket.name, "c:metric_stats/volume@none");
289 assert_eq!(bucket.value, BucketValue::Counter(1.into()));
290 assert_eq!(
291 bucket.tags,
292 tags!(
293 ("mri", "d:custom/rt@millisecond"),
294 ("mri.type", "d"),
295 ("mri.namespace", "custom"),
296 ("outcome.id", "0"),
297 )
298 );
299
300 let Aggregator::MergeBuckets(mut mb) = receiver.blocking_recv().unwrap();
301 assert_eq!(mb.project_key, scoping.project_key);
302
303 assert_eq!(mb.buckets.len(), 1);
304 let bucket = mb.buckets.pop().unwrap();
305
306 assert_eq!(&*bucket.name, "c:metric_stats/volume@none");
307 assert_eq!(bucket.value, BucketValue::Counter(42.into()));
308 assert_eq!(
309 bucket.tags,
310 tags!(
311 ("mri", "d:custom/rt@millisecond"),
312 ("mri.type", "d"),
313 ("mri.namespace", "custom"),
314 ("outcome.id", "2"),
315 ("outcome.reason", "foobar"),
316 )
317 );
318
319 assert!(receiver.blocking_recv().is_none());
320 }
321
322 #[test]
323 #[cfg(feature = "processing")]
324 fn test_metric_stats_cardinality_name() {
325 let (ms, mut receiver) = create_metric_stats(1.0);
326
327 let scoping = scoping();
328 let limit = CardinalityLimit {
329 id: "test".to_owned(),
330 passive: false,
331 report: true,
332 window: SlidingWindow {
333 window_seconds: 246,
334 granularity_seconds: 123,
335 },
336 limit: 99,
337 scope: CardinalityScope::Name,
338 namespace: None,
339 };
340 let report = CardinalityReport {
341 timestamp: UnixTimestamp::from_secs(3333),
342 organization_id: Some(scoping.organization_id),
343 project_id: Some(scoping.project_id),
344 metric_type: None,
345 metric_name: Some(MetricName::from("d:custom/rt@millisecond")),
346 cardinality: 12,
347 };
348
349 ms.track_cardinality(scoping, &limit, &report);
350
351 drop(ms);
352
353 let Aggregator::MergeBuckets(mut mb) = receiver.blocking_recv().unwrap();
354 assert_eq!(mb.project_key, scoping.project_key);
355
356 assert_eq!(mb.buckets.len(), 1);
357 let bucket = mb.buckets.pop().unwrap();
358
359 assert_eq!(&*bucket.name, "g:metric_stats/cardinality@none");
360 assert_eq!(bucket.timestamp, UnixTimestamp::from_secs(3333));
361 assert_eq!(
362 bucket.value,
363 BucketValue::Gauge(GaugeValue {
364 last: 12.into(),
365 min: 12.into(),
366 max: 12.into(),
367 sum: 12.into(),
368 count: 1,
369 })
370 );
371 assert_eq!(
372 bucket.tags,
373 tags!(
374 ("mri", "d:custom/rt@millisecond"),
375 ("mri.type", "d"),
376 ("mri.namespace", "custom"),
377 ("cardinality.limit", "test"),
378 ("cardinality.scope", "name"),
379 ("cardinality.window", "246"),
380 )
381 );
382
383 assert!(receiver.blocking_recv().is_none());
384 }
385
386 #[test]
387 #[cfg(feature = "processing")]
388 fn test_metric_stats_cardinality_type() {
389 let (ms, mut receiver) = create_metric_stats(1.0);
390
391 let scoping = scoping();
392 let limit = CardinalityLimit {
393 id: "test".to_owned(),
394 passive: false,
395 report: true,
396 window: SlidingWindow {
397 window_seconds: 246,
398 granularity_seconds: 123,
399 },
400 limit: 99,
401 scope: CardinalityScope::Type,
402 namespace: Some(MetricNamespace::Spans),
403 };
404 let report = CardinalityReport {
405 timestamp: UnixTimestamp::from_secs(2222),
406 organization_id: Some(scoping.organization_id),
407 project_id: Some(scoping.project_id),
408 metric_type: Some(MetricType::Distribution),
409 metric_name: None,
410 cardinality: 12,
411 };
412
413 ms.track_cardinality(scoping, &limit, &report);
414
415 drop(ms);
416
417 let Aggregator::MergeBuckets(mut mb) = receiver.blocking_recv().unwrap();
418 assert_eq!(mb.project_key, scoping.project_key);
419
420 assert_eq!(mb.buckets.len(), 1);
421 let bucket = mb.buckets.pop().unwrap();
422
423 assert_eq!(&*bucket.name, "g:metric_stats/cardinality@none");
424 assert_eq!(bucket.timestamp, UnixTimestamp::from_secs(2222));
425 assert_eq!(
426 bucket.value,
427 BucketValue::Gauge(GaugeValue {
428 last: 12.into(),
429 min: 12.into(),
430 max: 12.into(),
431 sum: 12.into(),
432 count: 1,
433 })
434 );
435 assert_eq!(
436 bucket.tags,
437 tags!(
438 ("mri.type", "d"),
439 ("mri.namespace", "spans"),
440 ("cardinality.limit", "test"),
441 ("cardinality.scope", "type"),
442 ("cardinality.window", "246"),
443 )
444 );
445
446 assert!(receiver.blocking_recv().is_none());
447 }
448
449 #[test]
450 fn test_metric_stats_rollout_rate_disabled() {
451 let (ms, mut receiver) = create_metric_stats(0.0);
452
453 let scoping = scoping();
454 let bucket = Bucket::parse(b"rt@millisecond:57|d", UnixTimestamp::now()).unwrap();
455 ms.track_metric(scoping, &bucket, &Outcome::Accepted);
456
457 drop(ms);
458
459 assert!(receiver.blocking_recv().is_none());
460 }
461
462 #[test]
463 fn test_metric_stats_disabled_namespace() {
464 let (ms, mut receiver) = create_metric_stats(1.0);
465
466 let scoping = scoping();
467 let bucket =
468 Bucket::parse(b"transactions/rt@millisecond:57|d", UnixTimestamp::now()).unwrap();
469 ms.track_metric(scoping, &bucket, &Outcome::Accepted);
470
471 drop(ms);
472
473 assert!(receiver.blocking_recv().is_none());
474 }
475}