relay_cardinality/
limiter.rs

1//! Relay Cardinality Limiter
2
3use std::cmp::Reverse;
4use std::collections::BTreeMap;
5
6use async_trait::async_trait;
7use hashbrown::{HashMap, HashSet};
8use relay_base_schema::metrics::{MetricName, MetricNamespace, MetricType};
9use relay_base_schema::organization::OrganizationId;
10use relay_base_schema::project::ProjectId;
11use relay_common::time::UnixTimestamp;
12use relay_statsd::metric;
13
14use crate::statsd::CardinalityLimiterTimers;
15use crate::{CardinalityLimit, Error, Result};
16
17/// Data scoping information.
18///
19/// This structure holds information of all scopes required for attributing entries to limits.
20#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
21pub struct Scoping {
22    /// The organization id.
23    pub organization_id: OrganizationId,
24    /// The project id.
25    pub project_id: ProjectId,
26}
27
28/// Cardinality report for a specific limit.
29///
30/// Contains scoping information for the enforced limit and the current cardinality.
31/// If all of the scoping information is `None`, the limit is a global cardinality limit.
32#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
33pub struct CardinalityReport {
34    /// Time for which the cardinality limit was enforced.
35    pub timestamp: UnixTimestamp,
36
37    /// Organization id for which the cardinality limit was applied.
38    ///
39    /// Only available if the the limit was at least scoped to
40    /// [`CardinalityScope::Organization`](crate::CardinalityScope::Organization).
41    pub organization_id: Option<OrganizationId>,
42    /// Project id for which the cardinality limit was applied.
43    ///
44    /// Only available if the the limit was at least scoped to
45    /// [`CardinalityScope::Project`](crate::CardinalityScope::Project).
46    pub project_id: Option<ProjectId>,
47    /// Metric type for which the cardinality limit was applied.
48    ///
49    /// Only available if the the limit was scoped to
50    /// [`CardinalityScope::Type`](crate::CardinalityScope::Type).
51    pub metric_type: Option<MetricType>,
52    /// Metric name for which the cardinality limit was applied.
53    ///
54    /// Only available if the limit was scoped to
55    /// [`CardinalityScope::Name`](crate::CardinalityScope::Name).
56    pub metric_name: Option<MetricName>,
57
58    /// The current cardinality.
59    pub cardinality: u32,
60}
61
62/// Accumulator of all cardinality limiter decisions.
63pub trait Reporter<'a> {
64    /// Called for ever [`Entry`] which was rejected from the [`Limiter`].
65    fn reject(&mut self, limit: &'a CardinalityLimit, entry_id: EntryId);
66
67    /// Called for every individual limit applied.
68    ///
69    /// The callback can be called multiple times with different reports
70    /// for the same `limit` or not at all if there was no change in cardinality.
71    ///
72    /// For example, with a name scoped limit can be called once for every
73    /// metric name matching the limit.
74    fn report_cardinality(&mut self, limit: &'a CardinalityLimit, report: CardinalityReport);
75}
76
77/// Limiter responsible to enforce limits.
78#[async_trait]
79pub trait Limiter {
80    /// Verifies cardinality limits.
81    ///
82    /// Returns an iterator containing only accepted entries.
83    async fn check_cardinality_limits<'a, 'b, E, R>(
84        &self,
85        scoping: Scoping,
86        limits: &'a [CardinalityLimit],
87        entries: E,
88        reporter: &mut R,
89    ) -> Result<()>
90    where
91        E: IntoIterator<Item = Entry<'b>> + Send,
92        R: Reporter<'a> + Send;
93}
94
95/// Unit of operation for the cardinality limiter.
96pub trait CardinalityItem {
97    /// Transforms this item into a consistent hash.
98    fn to_hash(&self) -> u32;
99
100    /// Metric namespace of the item.
101    ///
102    /// If this method returns `None` the item is automatically rejected.
103    fn namespace(&self) -> Option<MetricNamespace>;
104
105    /// Name of the item.
106    fn name(&self) -> &MetricName;
107}
108
109/// A single entry to check cardinality for.
110#[derive(Clone, Copy, Debug)]
111pub struct Entry<'a> {
112    /// Opaque entry id, used to keep track of indices and buckets.
113    pub id: EntryId,
114    /// Metric namespace to which the cardinality limit can be scoped.
115    pub namespace: MetricNamespace,
116    /// Name to which the cardinality limit can be scoped.
117    pub name: &'a MetricName,
118    /// Hash of the metric name and tags.
119    pub hash: u32,
120}
121
122/// Represents a unique Id for a bucket within one invocation
123/// of the cardinality limiter.
124///
125/// Opaque data structure used by [`CardinalityLimiter`] to track
126/// which buckets have been accepted and rejected.
127#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug, Hash)]
128pub struct EntryId(pub usize);
129
130impl<'a> Entry<'a> {
131    /// Creates a new entry.
132    pub fn new(id: EntryId, namespace: MetricNamespace, name: &'a MetricName, hash: u32) -> Self {
133        Self {
134            id,
135            namespace,
136            name,
137            hash,
138        }
139    }
140}
141
142/// Cardinality Limiter enforcing cardinality limits on buckets.
143///
144/// Delegates enforcement to a [`Limiter`].
145pub struct CardinalityLimiter<T: Limiter> {
146    limiter: T,
147}
148
149impl<T: Limiter> CardinalityLimiter<T> {
150    /// Creates a new cardinality limiter.
151    pub fn new(limiter: T) -> Self {
152        Self { limiter }
153    }
154
155    /// Checks cardinality limits of a list of buckets.
156    ///
157    /// Returns an iterator of all buckets that have been accepted.
158    pub async fn check_cardinality_limits<'a, I: CardinalityItem>(
159        &self,
160        scoping: Scoping,
161        limits: &'a [CardinalityLimit],
162        items: Vec<I>,
163    ) -> Result<CardinalityLimits<'a, I>, (Vec<I>, Error)> {
164        if limits.is_empty() {
165            return Ok(CardinalityLimits::new(items, Default::default()));
166        }
167
168        metric!(timer(CardinalityLimiterTimers::CardinalityLimiter), {
169            let entries = items
170                .iter()
171                .enumerate()
172                .filter_map(|(id, item)| {
173                    Some(Entry::new(
174                        EntryId(id),
175                        item.namespace()?,
176                        item.name(),
177                        item.to_hash(),
178                    ))
179                })
180                .collect::<Vec<_>>();
181
182            let mut rejections = DefaultReporter::default();
183            if let Err(err) = self
184                .limiter
185                .check_cardinality_limits(scoping, limits, entries, &mut rejections)
186                .await
187            {
188                return Err((items, err));
189            }
190
191            if !rejections.entries.is_empty() {
192                relay_log::debug!(
193                    scoping = ?scoping,
194                    "rejected {} metrics due to cardinality limit",
195                    rejections.entries.len(),
196                );
197            }
198
199            Ok(CardinalityLimits::new(items, rejections))
200        })
201    }
202}
203
204/// Internal outcome accumulator tracking the raw value from an [`EntryId`].
205///
206/// The result can be used directly by [`CardinalityLimits`].
207#[derive(Debug, Default)]
208struct DefaultReporter<'a> {
209    /// All limits that have been exceeded.
210    exceeded_limits: HashSet<&'a CardinalityLimit>,
211    /// A map from entries that have been rejected to the most
212    /// specific non-passive limit that they exceeded.
213    ///
214    /// "Specificity" is determined by scope and limit, in that order.
215    entries: HashMap<usize, &'a CardinalityLimit>,
216    reports: BTreeMap<&'a CardinalityLimit, Vec<CardinalityReport>>,
217}
218
219impl<'a> Reporter<'a> for DefaultReporter<'a> {
220    #[inline(always)]
221    fn reject(&mut self, limit: &'a CardinalityLimit, entry_id: EntryId) {
222        self.exceeded_limits.insert(limit);
223        if !limit.passive {
224            // Write `limit` into the entry if it's more specific than the existing limit
225            // (or if there isn't one)
226            self.entries
227                .entry(entry_id.0)
228                .and_modify(|existing_limit| {
229                    // Scopes are ordered by reverse specificity (org is the smallest), so we use `Reverse` here
230                    if (Reverse(limit.scope), limit.limit)
231                        < (Reverse(existing_limit.scope), existing_limit.limit)
232                    {
233                        *existing_limit = limit;
234                    }
235                })
236                .or_insert(limit);
237        }
238    }
239
240    #[inline(always)]
241    fn report_cardinality(&mut self, limit: &'a CardinalityLimit, report: CardinalityReport) {
242        if !limit.report {
243            return;
244        }
245        self.reports.entry(limit).or_default().push(report);
246    }
247}
248
249/// Split of the original source containing accepted and rejected source elements.
250#[derive(Debug)]
251pub struct CardinalityLimitsSplit<'a, T> {
252    /// The list of accepted elements of the source.
253    pub accepted: Vec<T>,
254    /// The list of rejected elements of the source, together
255    /// with the most specific limit they exceeded.
256    pub rejected: Vec<(T, &'a CardinalityLimit)>,
257}
258
259impl<T> CardinalityLimitsSplit<'_, T> {
260    /// Creates a new cardinality limits split with a given capacity for `accepted` and `rejected`
261    /// elements.
262    fn with_capacity(accepted_capacity: usize, rejected_capacity: usize) -> Self {
263        CardinalityLimitsSplit {
264            accepted: Vec::with_capacity(accepted_capacity),
265            rejected: Vec::with_capacity(rejected_capacity),
266        }
267    }
268}
269
270/// Result of [`CardinalityLimiter::check_cardinality_limits`].
271#[derive(Debug)]
272pub struct CardinalityLimits<'a, T> {
273    /// The source.
274    source: Vec<T>,
275    /// List of rejected item indices pointing into `source`.
276    rejections: HashMap<usize, &'a CardinalityLimit>,
277    /// All non-passive exceeded limits.
278    exceeded_limits: HashSet<&'a CardinalityLimit>,
279    /// Generated cardinality reports.
280    reports: BTreeMap<&'a CardinalityLimit, Vec<CardinalityReport>>,
281}
282
283impl<'a, T> CardinalityLimits<'a, T> {
284    fn new(source: Vec<T>, reporter: DefaultReporter<'a>) -> Self {
285        Self {
286            source,
287            rejections: reporter.entries,
288            exceeded_limits: reporter.exceeded_limits,
289            reports: reporter.reports,
290        }
291    }
292
293    /// Returns `true` if any items have been rejected.
294    pub fn has_rejections(&self) -> bool {
295        !self.rejections.is_empty()
296    }
297
298    /// Returns all id's of cardinality limits which were exceeded.
299    ///
300    /// This includes passive limits.
301    pub fn exceeded_limits(&self) -> &HashSet<&'a CardinalityLimit> {
302        &self.exceeded_limits
303    }
304
305    /// Returns all cardinality reports grouped by the cardinality limit.
306    ///
307    /// Cardinality reports are generated for all cardinality limits with reporting enabled
308    /// and the current cardinality changed.
309    pub fn cardinality_reports(&self) -> &BTreeMap<&'a CardinalityLimit, Vec<CardinalityReport>> {
310        &self.reports
311    }
312
313    /// Recovers the original list of items passed to the cardinality limiter.
314    pub fn into_source(self) -> Vec<T> {
315        self.source
316    }
317
318    /// Returns an iterator yielding only rejected items.
319    pub fn rejected(&self) -> impl Iterator<Item = &T> {
320        self.rejections.keys().filter_map(|&i| self.source.get(i))
321    }
322
323    /// Consumes the result and returns [`CardinalityLimitsSplit`] containing all accepted and rejected items.
324    pub fn into_split(mut self) -> CardinalityLimitsSplit<'a, T> {
325        if self.rejections.is_empty() {
326            return CardinalityLimitsSplit {
327                accepted: self.source,
328                rejected: Vec::new(),
329            };
330        }
331        // TODO: we might want to optimize this method later, by reusing one of the arrays and
332        // swap removing elements from it.
333        let source_len = self.source.len();
334        let rejections_len = self.rejections.len();
335        self.source.into_iter().enumerate().fold(
336            CardinalityLimitsSplit::with_capacity(source_len - rejections_len, rejections_len),
337            |mut split, (i, item)| {
338                if let Some(exceeded) = self.rejections.remove(&i) {
339                    split.rejected.push((item, exceeded));
340                } else {
341                    split.accepted.push(item);
342                };
343
344                split
345            },
346        )
347    }
348}
349
350#[cfg(test)]
351mod tests {
352    use crate::{CardinalityScope, SlidingWindow};
353
354    use super::*;
355
356    #[derive(Debug, Clone, Hash, PartialEq, Eq)]
357    struct Item {
358        hash: u32,
359        namespace: Option<MetricNamespace>,
360        name: MetricName,
361    }
362
363    impl Item {
364        fn new(hash: u32, namespace: impl Into<Option<MetricNamespace>>) -> Self {
365            Self {
366                hash,
367                namespace: namespace.into(),
368                name: MetricName::from("foobar"),
369            }
370        }
371    }
372
373    impl CardinalityItem for Item {
374        fn to_hash(&self) -> u32 {
375            self.hash
376        }
377
378        fn namespace(&self) -> Option<MetricNamespace> {
379            self.namespace
380        }
381
382        fn name(&self) -> &MetricName {
383            &self.name
384        }
385    }
386
387    fn build_limits() -> [CardinalityLimit; 1] {
388        [CardinalityLimit {
389            id: "limit".to_owned(),
390            passive: false,
391            report: false,
392            window: SlidingWindow {
393                window_seconds: 3600,
394                granularity_seconds: 360,
395            },
396            limit: 10_000,
397            scope: CardinalityScope::Organization,
398            namespace: None,
399        }]
400    }
401
402    fn build_scoping() -> Scoping {
403        Scoping {
404            organization_id: OrganizationId::new(1),
405            project_id: ProjectId::new(1),
406        }
407    }
408
409    #[tokio::test]
410    async fn test_accepted() {
411        // HACK: we need to make Windows happy.
412        fn assert_eq(value: Vec<char>, expected_value: Vec<char>) {
413            assert_eq!(value, expected_value)
414        }
415
416        let limit = CardinalityLimit {
417            id: "dummy_limit".to_owned(),
418            passive: false,
419            report: false,
420            window: SlidingWindow {
421                window_seconds: 0,
422                granularity_seconds: 0,
423            },
424            limit: 0,
425            scope: CardinalityScope::Organization,
426            namespace: None,
427        };
428
429        let limits = CardinalityLimits {
430            source: vec!['a', 'b', 'c', 'd', 'e'],
431            rejections: HashMap::from([(0, &limit), (1, &limit), (3, &limit)]),
432            exceeded_limits: HashSet::new(),
433            reports: BTreeMap::new(),
434        };
435        assert!(limits.has_rejections());
436        let split = limits.into_split();
437        assert_eq!(
438            split.rejected,
439            vec![('a', &limit), ('b', &limit), ('d', &limit)]
440        );
441        assert_eq!(split.accepted, vec!['c', 'e']);
442
443        let limits = CardinalityLimits {
444            source: vec!['a', 'b', 'c', 'd', 'e'],
445            rejections: HashMap::from([]),
446            exceeded_limits: HashSet::new(),
447            reports: BTreeMap::new(),
448        };
449        assert!(!limits.has_rejections());
450        let split = limits.into_split();
451        assert!(split.rejected.is_empty());
452        assert_eq!(split.accepted, vec!['a', 'b', 'c', 'd', 'e']);
453
454        let limits = CardinalityLimits {
455            source: vec!['a', 'b', 'c', 'd', 'e'],
456            rejections: HashMap::from([
457                (0, &limit),
458                (1, &limit),
459                (2, &limit),
460                (3, &limit),
461                (4, &limit),
462            ]),
463            exceeded_limits: HashSet::new(),
464            reports: BTreeMap::new(),
465        };
466        assert!(limits.has_rejections());
467        let split = limits.into_split();
468        assert_eq!(
469            split.rejected,
470            vec![
471                ('a', &limit),
472                ('b', &limit),
473                ('c', &limit),
474                ('d', &limit),
475                ('e', &limit)
476            ]
477        );
478        assert_eq(split.accepted, vec![]);
479    }
480
481    #[tokio::test]
482    async fn test_limiter_reject_all() {
483        struct RejectAllLimiter;
484
485        #[async_trait]
486        impl Limiter for RejectAllLimiter {
487            async fn check_cardinality_limits<'a, 'b, I, T>(
488                &self,
489                _scoping: Scoping,
490                limits: &'a [CardinalityLimit],
491                entries: I,
492                rejections: &mut T,
493            ) -> Result<()>
494            where
495                I: IntoIterator<Item = Entry<'b>> + Send,
496                T: Reporter<'a> + Send,
497            {
498                for entry in entries {
499                    rejections.reject(&limits[0], entry.id);
500                }
501
502                Ok(())
503            }
504        }
505
506        let limiter = CardinalityLimiter::new(RejectAllLimiter);
507
508        let items = vec![
509            Item::new(0, MetricNamespace::Transactions),
510            Item::new(1, MetricNamespace::Transactions),
511        ];
512        let limits = build_limits();
513        let result = limiter
514            .check_cardinality_limits(build_scoping(), &limits, items.clone())
515            .await
516            .unwrap();
517
518        let expected_items = items
519            .into_iter()
520            .zip(std::iter::repeat(&limits[0]))
521            .collect::<Vec<_>>();
522
523        assert_eq!(result.exceeded_limits(), &HashSet::from([&limits[0]]));
524        let split = result.into_split();
525        assert_eq!(split.rejected, expected_items);
526        assert!(split.accepted.is_empty());
527    }
528
529    #[tokio::test]
530    async fn test_limiter_accept_all() {
531        struct AcceptAllLimiter;
532
533        #[async_trait]
534        impl Limiter for AcceptAllLimiter {
535            async fn check_cardinality_limits<'a, 'b, I, T>(
536                &self,
537                _scoping: Scoping,
538                _limits: &'a [CardinalityLimit],
539                _entries: I,
540                _reporter: &mut T,
541            ) -> Result<()>
542            where
543                I: IntoIterator<Item = Entry<'b>> + Send,
544                T: Reporter<'a> + Send,
545            {
546                Ok(())
547            }
548        }
549
550        let limiter = CardinalityLimiter::new(AcceptAllLimiter);
551
552        let items = vec![
553            Item::new(0, MetricNamespace::Transactions),
554            Item::new(1, MetricNamespace::Spans),
555        ];
556        let limits = build_limits();
557        let result = limiter
558            .check_cardinality_limits(build_scoping(), &limits, items.clone())
559            .await
560            .unwrap();
561
562        let split = result.into_split();
563        assert!(split.rejected.is_empty());
564        assert_eq!(split.accepted, items);
565    }
566
567    #[tokio::test]
568    async fn test_limiter_accept_odd_reject_even() {
569        struct RejectEvenLimiter;
570
571        #[async_trait]
572        impl Limiter for RejectEvenLimiter {
573            async fn check_cardinality_limits<'a, 'b, I, T>(
574                &self,
575                scoping: Scoping,
576                limits: &'a [CardinalityLimit],
577                entries: I,
578                reporter: &mut T,
579            ) -> Result<()>
580            where
581                I: IntoIterator<Item = Entry<'b>> + Send,
582                T: Reporter<'a> + Send,
583            {
584                assert_eq!(scoping, build_scoping());
585                assert_eq!(limits, &build_limits());
586
587                for entry in entries {
588                    if entry.id.0 % 2 == 0 {
589                        reporter.reject(&limits[0], entry.id);
590                    }
591                }
592
593                Ok(())
594            }
595        }
596
597        let limiter = CardinalityLimiter::new(RejectEvenLimiter);
598
599        let items = vec![
600            Item::new(0, MetricNamespace::Sessions),
601            Item::new(1, MetricNamespace::Transactions),
602            Item::new(2, MetricNamespace::Spans),
603            Item::new(3, MetricNamespace::Custom),
604            Item::new(4, MetricNamespace::Custom),
605            Item::new(5, MetricNamespace::Transactions),
606            Item::new(6, MetricNamespace::Spans),
607        ];
608        let limits = build_limits();
609        let split = limiter
610            .check_cardinality_limits(build_scoping(), &limits, items)
611            .await
612            .unwrap()
613            .into_split();
614
615        assert_eq!(
616            split.rejected,
617            vec![
618                (Item::new(0, MetricNamespace::Sessions), &limits[0]),
619                (Item::new(2, MetricNamespace::Spans), &limits[0]),
620                (Item::new(4, MetricNamespace::Custom), &limits[0]),
621                (Item::new(6, MetricNamespace::Spans), &limits[0]),
622            ]
623        );
624        assert_eq!(
625            split.accepted,
626            vec![
627                Item::new(1, MetricNamespace::Transactions),
628                Item::new(3, MetricNamespace::Custom),
629                Item::new(5, MetricNamespace::Transactions),
630            ]
631        );
632    }
633
634    #[tokio::test]
635    async fn test_limiter_passive() {
636        struct RejectLimits;
637
638        #[async_trait]
639        impl Limiter for RejectLimits {
640            async fn check_cardinality_limits<'a, 'b, I, T>(
641                &self,
642                _scoping: Scoping,
643                limits: &'a [CardinalityLimit],
644                entries: I,
645                reporter: &mut T,
646            ) -> Result<()>
647            where
648                I: IntoIterator<Item = Entry<'b>> + Send,
649                T: Reporter<'a> + Send,
650            {
651                for entry in entries {
652                    reporter.reject(&limits[entry.id.0 % limits.len()], entry.id);
653                }
654                Ok(())
655            }
656        }
657
658        let limiter = CardinalityLimiter::new(RejectLimits);
659        let limits = &[
660            CardinalityLimit {
661                id: "limit_passive".to_owned(),
662                passive: false,
663                report: false,
664                window: SlidingWindow {
665                    window_seconds: 3600,
666                    granularity_seconds: 360,
667                },
668                limit: 10_000,
669                scope: CardinalityScope::Organization,
670                namespace: None,
671            },
672            CardinalityLimit {
673                id: "limit_enforced".to_owned(),
674                passive: true,
675                report: false,
676                window: SlidingWindow {
677                    window_seconds: 3600,
678                    granularity_seconds: 360,
679                },
680                limit: 10_000,
681                scope: CardinalityScope::Organization,
682                namespace: None,
683            },
684        ];
685
686        let items = vec![
687            Item::new(0, MetricNamespace::Custom),
688            Item::new(1, MetricNamespace::Custom),
689            Item::new(2, MetricNamespace::Custom),
690            Item::new(3, MetricNamespace::Custom),
691            Item::new(4, MetricNamespace::Custom),
692            Item::new(5, MetricNamespace::Custom),
693        ];
694        let limited = limiter
695            .check_cardinality_limits(build_scoping(), limits, items)
696            .await
697            .unwrap();
698
699        assert!(limited.has_rejections());
700        assert_eq!(limited.exceeded_limits(), &limits.iter().collect());
701
702        let split = limited.into_split();
703        assert_eq!(
704            split.rejected,
705            vec![
706                (Item::new(0, MetricNamespace::Custom), &limits[0]),
707                (Item::new(2, MetricNamespace::Custom), &limits[0]),
708                (Item::new(4, MetricNamespace::Custom), &limits[0]),
709            ]
710        );
711        assert_eq!(
712            split.accepted,
713            vec![
714                Item::new(1, MetricNamespace::Custom),
715                Item::new(3, MetricNamespace::Custom),
716                Item::new(5, MetricNamespace::Custom),
717            ]
718        );
719    }
720
721    #[tokio::test]
722    async fn test_cardinality_report() {
723        struct CreateReports;
724
725        #[async_trait]
726        impl Limiter for CreateReports {
727            async fn check_cardinality_limits<'a, 'b, I, T>(
728                &self,
729                scoping: Scoping,
730                limits: &'a [CardinalityLimit],
731                _entries: I,
732                reporter: &mut T,
733            ) -> Result<()>
734            where
735                I: IntoIterator<Item = Entry<'b>> + Send,
736                T: Reporter<'a> + Send,
737            {
738                reporter.report_cardinality(
739                    &limits[0],
740                    CardinalityReport {
741                        timestamp: UnixTimestamp::from_secs(5000),
742                        organization_id: Some(scoping.organization_id),
743                        project_id: Some(scoping.project_id),
744                        metric_type: None,
745                        metric_name: Some(MetricName::from("foo")),
746                        cardinality: 1,
747                    },
748                );
749
750                reporter.report_cardinality(
751                    &limits[0],
752                    CardinalityReport {
753                        timestamp: UnixTimestamp::from_secs(5001),
754                        organization_id: Some(scoping.organization_id),
755                        project_id: Some(scoping.project_id),
756                        metric_type: None,
757                        metric_name: Some(MetricName::from("bar")),
758                        cardinality: 2,
759                    },
760                );
761
762                reporter.report_cardinality(
763                    &limits[2],
764                    CardinalityReport {
765                        timestamp: UnixTimestamp::from_secs(5002),
766                        organization_id: Some(scoping.organization_id),
767                        project_id: Some(scoping.project_id),
768                        metric_type: None,
769                        metric_name: None,
770                        cardinality: 3,
771                    },
772                );
773
774                Ok(())
775            }
776        }
777
778        let window = SlidingWindow {
779            window_seconds: 3600,
780            granularity_seconds: 360,
781        };
782
783        let limits = &[
784            CardinalityLimit {
785                id: "report".to_owned(),
786                passive: false,
787                report: true,
788                window,
789                limit: 10_000,
790                scope: CardinalityScope::Organization,
791                namespace: None,
792            },
793            CardinalityLimit {
794                id: "no_report".to_owned(),
795                passive: false,
796                report: false,
797                window,
798                limit: 10_000,
799                scope: CardinalityScope::Organization,
800                namespace: None,
801            },
802            CardinalityLimit {
803                id: "report_again".to_owned(),
804                passive: true,
805                report: true,
806                window,
807                limit: 10_000,
808                scope: CardinalityScope::Organization,
809                namespace: None,
810            },
811        ];
812        let scoping = build_scoping();
813        let items = vec![Item::new(0, MetricNamespace::Custom)];
814
815        let limiter = CardinalityLimiter::new(CreateReports);
816        let limited = limiter
817            .check_cardinality_limits(scoping, limits, items)
818            .await
819            .unwrap();
820
821        let reports = limited.cardinality_reports();
822        assert_eq!(reports.len(), 2);
823        assert_eq!(
824            reports.get(&limits[0]).unwrap(),
825            &[
826                CardinalityReport {
827                    timestamp: UnixTimestamp::from_secs(5000),
828                    organization_id: Some(scoping.organization_id),
829                    project_id: Some(scoping.project_id),
830                    metric_type: None,
831                    metric_name: Some(MetricName::from("foo")),
832                    cardinality: 1
833                },
834                CardinalityReport {
835                    timestamp: UnixTimestamp::from_secs(5001),
836                    organization_id: Some(scoping.organization_id),
837                    project_id: Some(scoping.project_id),
838                    metric_type: None,
839                    metric_name: Some(MetricName::from("bar")),
840                    cardinality: 2
841                }
842            ]
843        );
844        assert_eq!(
845            reports.get(&limits[2]).unwrap(),
846            &[CardinalityReport {
847                timestamp: UnixTimestamp::from_secs(5002),
848                organization_id: Some(scoping.organization_id),
849                project_id: Some(scoping.project_id),
850                metric_type: None,
851                metric_name: None,
852                cardinality: 3
853            }]
854        );
855    }
856}