1use std::cmp::Reverse;
4use std::collections::BTreeMap;
5
6use async_trait::async_trait;
7use hashbrown::{HashMap, HashSet};
8use relay_base_schema::metrics::{MetricName, MetricNamespace, MetricType};
9use relay_base_schema::organization::OrganizationId;
10use relay_base_schema::project::ProjectId;
11use relay_common::time::UnixTimestamp;
12use relay_statsd::metric;
13
14use crate::statsd::CardinalityLimiterTimers;
15use crate::{CardinalityLimit, Error, Result};
16
/// The organization and project a cardinality check is performed for.
///
/// [`CardinalityLimiter::check_cardinality_limits`] passes it unchanged to the [`Limiter`].
// Field order matters: the derived `PartialOrd`/`Ord` compare `organization_id` first.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct Scoping {
    /// Organization of the checked items.
    pub organization_id: OrganizationId,
    /// Project of the checked items.
    pub project_id: ProjectId,
}
27
/// A cardinality measurement a [`Limiter`] hands to a [`Reporter`] for a specific limit.
///
/// The optional fields are the dimensions the measurement belongs to.
/// NOTE(review): presumably a dimension is `None` when the limit's scope does not include it —
/// confirm against the limiter implementations.
// Field order matters: the derived `PartialOrd`/`Ord` compare fields top to bottom.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct CardinalityReport {
    /// Timestamp of the report.
    pub timestamp: UnixTimestamp,

    /// Organization the cardinality was measured for, if any.
    pub organization_id: Option<OrganizationId>,
    /// Project the cardinality was measured for, if any.
    pub project_id: Option<ProjectId>,
    /// Metric type the cardinality was measured for, if any.
    pub metric_type: Option<MetricType>,
    /// Metric name the cardinality was measured for, if any.
    pub metric_name: Option<MetricName>,

    /// The measured cardinality.
    pub cardinality: u32,
}
61
/// Sink through which a [`Limiter`] communicates the results of a check.
pub trait Reporter<'a> {
    /// Reports that the entry identified by `entry_id` exceeded `limit`.
    fn reject(&mut self, limit: &'a CardinalityLimit, entry_id: EntryId);

    /// Reports a cardinality measurement for `limit`.
    fn report_cardinality(&mut self, limit: &'a CardinalityLimit, report: CardinalityReport);
}
76
/// Backend that checks entries against cardinality limits.
#[async_trait]
pub trait Limiter {
    /// Checks `entries` belonging to `scoping` against `limits`.
    ///
    /// Results are communicated exclusively through `reporter`: exceeded limits via
    /// [`Reporter::reject`] and measurements via [`Reporter::report_cardinality`].
    ///
    /// # Errors
    ///
    /// Returns an error if the check could not be performed. [`CardinalityLimiter`] then
    /// discards everything reported so far and hands all items back alongside the error.
    async fn check_cardinality_limits<'a, 'b, E, R>(
        &self,
        scoping: Scoping,
        limits: &'a [CardinalityLimit],
        entries: E,
        reporter: &mut R,
    ) -> Result<()>
    where
        E: IntoIterator<Item = Entry<'b>> + Send,
        R: Reporter<'a> + Send;
}
94
/// An item that can be checked by a [`CardinalityLimiter`].
pub trait CardinalityItem {
    /// Hash of the item; forwarded to the [`Limiter`] as [`Entry::hash`].
    fn to_hash(&self) -> u32;

    /// Namespace of the item.
    ///
    /// Items returning `None` are not handed to the [`Limiter`] as entries.
    fn namespace(&self) -> Option<MetricNamespace>;

    /// Metric name of the item; forwarded to the [`Limiter`] as [`Entry::name`].
    fn name(&self) -> &MetricName;
}
108
/// An item as presented to a [`Limiter`].
#[derive(Clone, Copy, Debug)]
pub struct Entry<'a> {
    /// Identifier used to refer back to the item, e.g. in [`Reporter::reject`].
    pub id: EntryId,
    /// Namespace of the item.
    pub namespace: MetricNamespace,
    /// Metric name of the item.
    pub name: &'a MetricName,
    /// Hash of the item, see [`CardinalityItem::to_hash`].
    pub hash: u32,
}
121
/// Identifier of an [`Entry`].
///
/// [`CardinalityLimiter`] uses the index of the item in its input `Vec` as the id.
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug, Hash)]
pub struct EntryId(pub usize);
129
130impl<'a> Entry<'a> {
131 pub fn new(id: EntryId, namespace: MetricNamespace, name: &'a MetricName, hash: u32) -> Self {
133 Self {
134 id,
135 namespace,
136 name,
137 hash,
138 }
139 }
140}
141
/// Applies cardinality limits to items, delegating the actual checks to a [`Limiter`].
pub struct CardinalityLimiter<T: Limiter> {
    /// Backend performing the checks.
    limiter: T,
}
148
149impl<T: Limiter> CardinalityLimiter<T> {
150 pub fn new(limiter: T) -> Self {
152 Self { limiter }
153 }
154
155 pub async fn check_cardinality_limits<'a, I: CardinalityItem>(
159 &self,
160 scoping: Scoping,
161 limits: &'a [CardinalityLimit],
162 items: Vec<I>,
163 ) -> Result<CardinalityLimits<'a, I>, (Vec<I>, Error)> {
164 if limits.is_empty() {
165 return Ok(CardinalityLimits::new(items, Default::default()));
166 }
167
168 metric!(timer(CardinalityLimiterTimers::CardinalityLimiter), {
169 let entries = items
170 .iter()
171 .enumerate()
172 .filter_map(|(id, item)| {
173 Some(Entry::new(
174 EntryId(id),
175 item.namespace()?,
176 item.name(),
177 item.to_hash(),
178 ))
179 })
180 .collect::<Vec<_>>();
181
182 let mut rejections = DefaultReporter::default();
183 if let Err(err) = self
184 .limiter
185 .check_cardinality_limits(scoping, limits, entries, &mut rejections)
186 .await
187 {
188 return Err((items, err));
189 }
190
191 if !rejections.entries.is_empty() {
192 relay_log::debug!(
193 scoping = ?scoping,
194 "rejected {} metrics due to cardinality limit",
195 rejections.entries.len(),
196 );
197 }
198
199 Ok(CardinalityLimits::new(items, rejections))
200 })
201 }
202}
203
/// [`Reporter`] used by [`CardinalityLimiter`] to collect the results of a [`Limiter`].
#[derive(Debug, Default)]
struct DefaultReporter<'a> {
    /// Every limit reported as exceeded, passive limits included.
    exceeded_limits: HashSet<&'a CardinalityLimit>,
    /// Rejected entries keyed by the [`EntryId`] value, mapped to the limit chosen as the
    /// rejection reason. Only non-passive limits are stored here.
    entries: HashMap<usize, &'a CardinalityLimit>,
    /// Cardinality reports grouped by limit; only limits with `report` enabled are stored.
    reports: BTreeMap<&'a CardinalityLimit, Vec<CardinalityReport>>,
}
218
219impl<'a> Reporter<'a> for DefaultReporter<'a> {
220 #[inline(always)]
221 fn reject(&mut self, limit: &'a CardinalityLimit, entry_id: EntryId) {
222 self.exceeded_limits.insert(limit);
223 if !limit.passive {
224 self.entries
227 .entry(entry_id.0)
228 .and_modify(|existing_limit| {
229 if (Reverse(limit.scope), limit.limit)
231 < (Reverse(existing_limit.scope), existing_limit.limit)
232 {
233 *existing_limit = limit;
234 }
235 })
236 .or_insert(limit);
237 }
238 }
239
240 #[inline(always)]
241 fn report_cardinality(&mut self, limit: &'a CardinalityLimit, report: CardinalityReport) {
242 if !limit.report {
243 return;
244 }
245 self.reports.entry(limit).or_default().push(report);
246 }
247}
248
/// Checked items partitioned into accepted and rejected ones, see
/// [`CardinalityLimits::into_split`].
#[derive(Debug)]
pub struct CardinalityLimitsSplit<'a, T> {
    /// Items that were not rejected, in their original order.
    pub accepted: Vec<T>,
    /// Rejected items in their original order, each paired with the limit that rejected it.
    pub rejected: Vec<(T, &'a CardinalityLimit)>,
}
258
259impl<T> CardinalityLimitsSplit<'_, T> {
260 fn with_capacity(accepted_capacity: usize, rejected_capacity: usize) -> Self {
263 CardinalityLimitsSplit {
264 accepted: Vec::with_capacity(accepted_capacity),
265 rejected: Vec::with_capacity(rejected_capacity),
266 }
267 }
268}
269
/// Result of [`CardinalityLimiter::check_cardinality_limits`].
///
/// Owns the checked items together with everything the [`Limiter`] reported about them.
#[derive(Debug)]
pub struct CardinalityLimits<'a, T> {
    /// The checked items, in their original order.
    source: Vec<T>,
    /// Indices into `source` of rejected items, mapped to the limit that rejected them.
    rejections: HashMap<usize, &'a CardinalityLimit>,
    /// All limits reported as exceeded, passive limits included.
    exceeded_limits: HashSet<&'a CardinalityLimit>,
    /// Cardinality reports grouped by limit.
    reports: BTreeMap<&'a CardinalityLimit, Vec<CardinalityReport>>,
}
282
283impl<'a, T> CardinalityLimits<'a, T> {
284 fn new(source: Vec<T>, reporter: DefaultReporter<'a>) -> Self {
285 Self {
286 source,
287 rejections: reporter.entries,
288 exceeded_limits: reporter.exceeded_limits,
289 reports: reporter.reports,
290 }
291 }
292
293 pub fn has_rejections(&self) -> bool {
295 !self.rejections.is_empty()
296 }
297
298 pub fn exceeded_limits(&self) -> &HashSet<&'a CardinalityLimit> {
302 &self.exceeded_limits
303 }
304
305 pub fn cardinality_reports(&self) -> &BTreeMap<&'a CardinalityLimit, Vec<CardinalityReport>> {
310 &self.reports
311 }
312
313 pub fn into_source(self) -> Vec<T> {
315 self.source
316 }
317
318 pub fn rejected(&self) -> impl Iterator<Item = &T> {
320 self.rejections.keys().filter_map(|&i| self.source.get(i))
321 }
322
323 pub fn into_split(mut self) -> CardinalityLimitsSplit<'a, T> {
325 if self.rejections.is_empty() {
326 return CardinalityLimitsSplit {
327 accepted: self.source,
328 rejected: Vec::new(),
329 };
330 }
331 let source_len = self.source.len();
334 let rejections_len = self.rejections.len();
335 self.source.into_iter().enumerate().fold(
336 CardinalityLimitsSplit::with_capacity(source_len - rejections_len, rejections_len),
337 |mut split, (i, item)| {
338 if let Some(exceeded) = self.rejections.remove(&i) {
339 split.rejected.push((item, exceeded));
340 } else {
341 split.accepted.push(item);
342 };
343
344 split
345 },
346 )
347 }
348}
349
#[cfg(test)]
mod tests {
    use crate::{CardinalityScope, SlidingWindow};

    use super::*;

    /// Minimal [`CardinalityItem`] for the tests; every item is named `foobar` and is
    /// distinguished by its `hash` and `namespace`.
    #[derive(Debug, Clone, Hash, PartialEq, Eq)]
    struct Item {
        hash: u32,
        namespace: Option<MetricNamespace>,
        name: MetricName,
    }

    impl Item {
        fn new(hash: u32, namespace: impl Into<Option<MetricNamespace>>) -> Self {
            Self {
                hash,
                namespace: namespace.into(),
                name: MetricName::from("foobar"),
            }
        }
    }

    impl CardinalityItem for Item {
        fn to_hash(&self) -> u32 {
            self.hash
        }

        fn namespace(&self) -> Option<MetricNamespace> {
            self.namespace
        }

        fn name(&self) -> &MetricName {
            &self.name
        }
    }

    /// A single enforced (non-passive), non-reporting organization limit.
    fn build_limits() -> [CardinalityLimit; 1] {
        [CardinalityLimit {
            id: "limit".to_owned(),
            passive: false,
            report: false,
            window: SlidingWindow {
                window_seconds: 3600,
                granularity_seconds: 360,
            },
            limit: 10_000,
            scope: CardinalityScope::Organization,
            namespace: None,
        }]
    }

    fn build_scoping() -> Scoping {
        Scoping {
            organization_id: OrganizationId::new(1),
            project_id: ProjectId::new(1),
        }
    }

    /// `into_split` partitions the source by rejection while keeping the original order.
    #[test]
    fn test_accepted() {
        let limit = CardinalityLimit {
            id: "dummy_limit".to_owned(),
            passive: false,
            report: false,
            window: SlidingWindow {
                window_seconds: 0,
                granularity_seconds: 0,
            },
            limit: 0,
            scope: CardinalityScope::Organization,
            namespace: None,
        };

        // Some items rejected.
        let limits = CardinalityLimits {
            source: vec!['a', 'b', 'c', 'd', 'e'],
            rejections: HashMap::from([(0, &limit), (1, &limit), (3, &limit)]),
            exceeded_limits: HashSet::new(),
            reports: BTreeMap::new(),
        };
        assert!(limits.has_rejections());
        let split = limits.into_split();
        assert_eq!(
            split.rejected,
            vec![('a', &limit), ('b', &limit), ('d', &limit)]
        );
        assert_eq!(split.accepted, vec!['c', 'e']);

        // No items rejected.
        let limits = CardinalityLimits {
            source: vec!['a', 'b', 'c', 'd', 'e'],
            rejections: HashMap::from([]),
            exceeded_limits: HashSet::new(),
            reports: BTreeMap::new(),
        };
        assert!(!limits.has_rejections());
        let split = limits.into_split();
        assert!(split.rejected.is_empty());
        assert_eq!(split.accepted, vec!['a', 'b', 'c', 'd', 'e']);

        // All items rejected.
        let limits = CardinalityLimits {
            source: vec!['a', 'b', 'c', 'd', 'e'],
            rejections: HashMap::from([
                (0, &limit),
                (1, &limit),
                (2, &limit),
                (3, &limit),
                (4, &limit),
            ]),
            exceeded_limits: HashSet::new(),
            reports: BTreeMap::new(),
        };
        assert!(limits.has_rejections());
        let split = limits.into_split();
        assert_eq!(
            split.rejected,
            vec![
                ('a', &limit),
                ('b', &limit),
                ('c', &limit),
                ('d', &limit),
                ('e', &limit)
            ]
        );
        assert!(split.accepted.is_empty());
    }

    /// A limiter rejecting every entry with the first limit rejects all items.
    #[tokio::test]
    async fn test_limiter_reject_all() {
        struct RejectAllLimiter;

        #[async_trait]
        impl Limiter for RejectAllLimiter {
            async fn check_cardinality_limits<'a, 'b, I, T>(
                &self,
                _scoping: Scoping,
                limits: &'a [CardinalityLimit],
                entries: I,
                rejections: &mut T,
            ) -> Result<()>
            where
                I: IntoIterator<Item = Entry<'b>> + Send,
                T: Reporter<'a> + Send,
            {
                for entry in entries {
                    rejections.reject(&limits[0], entry.id);
                }

                Ok(())
            }
        }

        let limiter = CardinalityLimiter::new(RejectAllLimiter);

        let items = vec![
            Item::new(0, MetricNamespace::Transactions),
            Item::new(1, MetricNamespace::Transactions),
        ];
        let limits = build_limits();
        let result = limiter
            .check_cardinality_limits(build_scoping(), &limits, items.clone())
            .await
            .unwrap();

        let expected_items = items
            .into_iter()
            .zip(std::iter::repeat(&limits[0]))
            .collect::<Vec<_>>();

        assert_eq!(result.exceeded_limits(), &HashSet::from([&limits[0]]));
        let split = result.into_split();
        assert_eq!(split.rejected, expected_items);
        assert!(split.accepted.is_empty());
    }

    /// A limiter that never rejects leaves all items accepted.
    #[tokio::test]
    async fn test_limiter_accept_all() {
        struct AcceptAllLimiter;

        #[async_trait]
        impl Limiter for AcceptAllLimiter {
            async fn check_cardinality_limits<'a, 'b, I, T>(
                &self,
                _scoping: Scoping,
                _limits: &'a [CardinalityLimit],
                _entries: I,
                _reporter: &mut T,
            ) -> Result<()>
            where
                I: IntoIterator<Item = Entry<'b>> + Send,
                T: Reporter<'a> + Send,
            {
                Ok(())
            }
        }

        let limiter = CardinalityLimiter::new(AcceptAllLimiter);

        let items = vec![
            Item::new(0, MetricNamespace::Transactions),
            Item::new(1, MetricNamespace::Spans),
        ];
        let limits = build_limits();
        let result = limiter
            .check_cardinality_limits(build_scoping(), &limits, items.clone())
            .await
            .unwrap();

        let split = result.into_split();
        assert!(split.rejected.is_empty());
        assert_eq!(split.accepted, items);
    }

    /// Entry ids map back to item positions: rejecting even ids rejects even items.
    #[tokio::test]
    async fn test_limiter_accept_odd_reject_even() {
        struct RejectEvenLimiter;

        #[async_trait]
        impl Limiter for RejectEvenLimiter {
            async fn check_cardinality_limits<'a, 'b, I, T>(
                &self,
                scoping: Scoping,
                limits: &'a [CardinalityLimit],
                entries: I,
                reporter: &mut T,
            ) -> Result<()>
            where
                I: IntoIterator<Item = Entry<'b>> + Send,
                T: Reporter<'a> + Send,
            {
                assert_eq!(scoping, build_scoping());
                assert_eq!(limits, &build_limits());

                for entry in entries {
                    if entry.id.0 % 2 == 0 {
                        reporter.reject(&limits[0], entry.id);
                    }
                }

                Ok(())
            }
        }

        let limiter = CardinalityLimiter::new(RejectEvenLimiter);

        let items = vec![
            Item::new(0, MetricNamespace::Sessions),
            Item::new(1, MetricNamespace::Transactions),
            Item::new(2, MetricNamespace::Spans),
            Item::new(3, MetricNamespace::Custom),
            Item::new(4, MetricNamespace::Custom),
            Item::new(5, MetricNamespace::Transactions),
            Item::new(6, MetricNamespace::Spans),
        ];
        let limits = build_limits();
        let split = limiter
            .check_cardinality_limits(build_scoping(), &limits, items)
            .await
            .unwrap()
            .into_split();

        assert_eq!(
            split.rejected,
            vec![
                (Item::new(0, MetricNamespace::Sessions), &limits[0]),
                (Item::new(2, MetricNamespace::Spans), &limits[0]),
                (Item::new(4, MetricNamespace::Custom), &limits[0]),
                (Item::new(6, MetricNamespace::Spans), &limits[0]),
            ]
        );
        assert_eq!(
            split.accepted,
            vec![
                Item::new(1, MetricNamespace::Transactions),
                Item::new(3, MetricNamespace::Custom),
                Item::new(5, MetricNamespace::Transactions),
            ]
        );
    }

    /// Passive limits are reported as exceeded but never reject items.
    #[tokio::test]
    async fn test_limiter_passive() {
        struct RejectLimits;

        #[async_trait]
        impl Limiter for RejectLimits {
            async fn check_cardinality_limits<'a, 'b, I, T>(
                &self,
                _scoping: Scoping,
                limits: &'a [CardinalityLimit],
                entries: I,
                reporter: &mut T,
            ) -> Result<()>
            where
                I: IntoIterator<Item = Entry<'b>> + Send,
                T: Reporter<'a> + Send,
            {
                // Even entries exceed the enforced limit, odd entries the passive one.
                for entry in entries {
                    reporter.reject(&limits[entry.id.0 % limits.len()], entry.id);
                }
                Ok(())
            }
        }

        let limiter = CardinalityLimiter::new(RejectLimits);
        let limits = &[
            CardinalityLimit {
                id: "limit_enforced".to_owned(),
                passive: false,
                report: false,
                window: SlidingWindow {
                    window_seconds: 3600,
                    granularity_seconds: 360,
                },
                limit: 10_000,
                scope: CardinalityScope::Organization,
                namespace: None,
            },
            CardinalityLimit {
                id: "limit_passive".to_owned(),
                passive: true,
                report: false,
                window: SlidingWindow {
                    window_seconds: 3600,
                    granularity_seconds: 360,
                },
                limit: 10_000,
                scope: CardinalityScope::Organization,
                namespace: None,
            },
        ];

        let items = vec![
            Item::new(0, MetricNamespace::Custom),
            Item::new(1, MetricNamespace::Custom),
            Item::new(2, MetricNamespace::Custom),
            Item::new(3, MetricNamespace::Custom),
            Item::new(4, MetricNamespace::Custom),
            Item::new(5, MetricNamespace::Custom),
        ];
        let limited = limiter
            .check_cardinality_limits(build_scoping(), limits, items)
            .await
            .unwrap();

        assert!(limited.has_rejections());
        assert_eq!(limited.exceeded_limits(), &limits.iter().collect());

        let split = limited.into_split();
        assert_eq!(
            split.rejected,
            vec![
                (Item::new(0, MetricNamespace::Custom), &limits[0]),
                (Item::new(2, MetricNamespace::Custom), &limits[0]),
                (Item::new(4, MetricNamespace::Custom), &limits[0]),
            ]
        );
        assert_eq!(
            split.accepted,
            vec![
                Item::new(1, MetricNamespace::Custom),
                Item::new(3, MetricNamespace::Custom),
                Item::new(5, MetricNamespace::Custom),
            ]
        );
    }

    /// Only reports for limits with `report` enabled are kept, grouped by limit.
    #[tokio::test]
    async fn test_cardinality_report() {
        struct CreateReports;

        #[async_trait]
        impl Limiter for CreateReports {
            async fn check_cardinality_limits<'a, 'b, I, T>(
                &self,
                scoping: Scoping,
                limits: &'a [CardinalityLimit],
                _entries: I,
                reporter: &mut T,
            ) -> Result<()>
            where
                I: IntoIterator<Item = Entry<'b>> + Send,
                T: Reporter<'a> + Send,
            {
                reporter.report_cardinality(
                    &limits[0],
                    CardinalityReport {
                        timestamp: UnixTimestamp::from_secs(5000),
                        organization_id: Some(scoping.organization_id),
                        project_id: Some(scoping.project_id),
                        metric_type: None,
                        metric_name: Some(MetricName::from("foo")),
                        cardinality: 1,
                    },
                );

                reporter.report_cardinality(
                    &limits[0],
                    CardinalityReport {
                        timestamp: UnixTimestamp::from_secs(5001),
                        organization_id: Some(scoping.organization_id),
                        project_id: Some(scoping.project_id),
                        metric_type: None,
                        metric_name: Some(MetricName::from("bar")),
                        cardinality: 2,
                    },
                );

                // Reported for a limit with `report: false`; must be dropped by the reporter.
                reporter.report_cardinality(
                    &limits[1],
                    CardinalityReport {
                        timestamp: UnixTimestamp::from_secs(5003),
                        organization_id: Some(scoping.organization_id),
                        project_id: Some(scoping.project_id),
                        metric_type: None,
                        metric_name: None,
                        cardinality: 4,
                    },
                );

                reporter.report_cardinality(
                    &limits[2],
                    CardinalityReport {
                        timestamp: UnixTimestamp::from_secs(5002),
                        organization_id: Some(scoping.organization_id),
                        project_id: Some(scoping.project_id),
                        metric_type: None,
                        metric_name: None,
                        cardinality: 3,
                    },
                );

                Ok(())
            }
        }

        let window = SlidingWindow {
            window_seconds: 3600,
            granularity_seconds: 360,
        };

        let limits = &[
            CardinalityLimit {
                id: "report".to_owned(),
                passive: false,
                report: true,
                window,
                limit: 10_000,
                scope: CardinalityScope::Organization,
                namespace: None,
            },
            CardinalityLimit {
                id: "no_report".to_owned(),
                passive: false,
                report: false,
                window,
                limit: 10_000,
                scope: CardinalityScope::Organization,
                namespace: None,
            },
            CardinalityLimit {
                id: "report_again".to_owned(),
                passive: true,
                report: true,
                window,
                limit: 10_000,
                scope: CardinalityScope::Organization,
                namespace: None,
            },
        ];
        let scoping = build_scoping();
        let items = vec![Item::new(0, MetricNamespace::Custom)];

        let limiter = CardinalityLimiter::new(CreateReports);
        let limited = limiter
            .check_cardinality_limits(scoping, limits, items)
            .await
            .unwrap();

        let reports = limited.cardinality_reports();
        assert_eq!(reports.len(), 2);
        assert!(reports.get(&limits[1]).is_none());
        assert_eq!(
            reports.get(&limits[0]).unwrap(),
            &[
                CardinalityReport {
                    timestamp: UnixTimestamp::from_secs(5000),
                    organization_id: Some(scoping.organization_id),
                    project_id: Some(scoping.project_id),
                    metric_type: None,
                    metric_name: Some(MetricName::from("foo")),
                    cardinality: 1
                },
                CardinalityReport {
                    timestamp: UnixTimestamp::from_secs(5001),
                    organization_id: Some(scoping.organization_id),
                    project_id: Some(scoping.project_id),
                    metric_type: None,
                    metric_name: Some(MetricName::from("bar")),
                    cardinality: 2
                }
            ]
        );
        assert_eq!(
            reports.get(&limits[2]).unwrap(),
            &[CardinalityReport {
                timestamp: UnixTimestamp::from_secs(5002),
                organization_id: Some(scoping.organization_id),
                project_id: Some(scoping.project_id),
                metric_type: None,
                metric_name: None,
                cardinality: 3
            }]
        );
    }
}