relay_cardinality/redis/
limiter.rs

1use async_trait::async_trait;
2use relay_redis::{AsyncRedisClient, AsyncRedisConnection};
3use relay_statsd::metric;
4use std::time::Duration;
5
6use crate::{
7    CardinalityLimit, Result,
8    limiter::{CardinalityReport, Entry, Limiter, Reporter, Scoping},
9    redis::{
10        cache::{Cache, CacheOutcome},
11        quota::QuotaScoping,
12        script::{CardinalityScript, CardinalityScriptResult, Status},
13        state::{LimitState, RedisEntry},
14    },
15    statsd::{CardinalityLimiterHistograms, CardinalityLimiterTimers},
16};
17use relay_common::time::UnixTimestamp;
18
/// Configuration options for the [`RedisSetLimiter`].
#[derive(Debug, Clone)]
pub struct RedisSetLimiterOptions {
    /// Cache vacuum interval for the in-memory cache.
    ///
    /// The cache will scan for expired values based on this interval.
    pub cache_vacuum_interval: Duration,
}
26
/// Implementation uses Redis sets to keep track of cardinality.
pub struct RedisSetLimiter {
    /// Client used to obtain connections to Redis.
    redis: AsyncRedisClient,
    /// Script used to build the pipelined invocations which check the limits in Redis.
    script: CardinalityScript,
    /// In-memory cache which is consulted before Redis is queried.
    cache: Cache,
    /// Fixed base timestamp used in tests instead of the current time.
    #[cfg(test)]
    timestamp: UnixTimestamp,
    /// Offset added to `timestamp` in tests to fast-forward time.
    #[cfg(test)]
    time_offset: std::time::Duration,
}
37
38/// A Redis based limiter using Redis sets to track cardinality and membership.
39impl RedisSetLimiter {
40    /// Creates a new [`RedisSetLimiter`].
41    pub fn new(options: RedisSetLimiterOptions, redis: AsyncRedisClient) -> Self {
42        Self {
43            redis,
44            script: CardinalityScript::load(),
45            cache: Cache::new(options.cache_vacuum_interval),
46            #[cfg(test)]
47            timestamp: UnixTimestamp::now(),
48            #[cfg(test)]
49            time_offset: std::time::Duration::from_secs(0),
50        }
51    }
52
53    /// Checks the limits for a specific scope.
54    ///
55    /// Returns an iterator over all entries which have been accepted.
56    async fn check_limits(
57        &self,
58        connection: &mut AsyncRedisConnection,
59        state: &mut LimitState<'_>,
60        timestamp: UnixTimestamp,
61    ) -> Result<Vec<CheckedLimits>> {
62        let limit = state.limit;
63
64        let scopes = state.take_scopes();
65
66        let mut num_hashes: u64 = 0;
67
68        let mut pipeline = self.script.pipe();
69        for (scope, entries) in &scopes {
70            let keys = scope.slots(timestamp).map(|slot| scope.to_redis_key(slot));
71
72            let hashes = entries.iter().map(|entry| entry.hash);
73            num_hashes += hashes.len() as u64;
74
75            // The expiry is a off by `window.granularity_seconds`,
76            // but since this is only used for cleanup, this is not an issue.
77            pipeline.add_invocation(limit, scope.redis_key_ttl(), hashes, keys);
78        }
79
80        metric!(
81            histogram(CardinalityLimiterHistograms::RedisCheckHashes) = num_hashes,
82            id = state.id(),
83        );
84
85        let results = pipeline.invoke(connection).await?;
86
87        debug_assert_eq!(results.len(), scopes.len());
88        scopes
89            .into_iter()
90            .zip(results)
91            .inspect(|(_, result)| {
92                metric!(
93                    histogram(CardinalityLimiterHistograms::RedisSetCardinality) =
94                        result.cardinality as u64,
95                    id = state.id(),
96                );
97            })
98            .map(|((scope, entries), result)| CheckedLimits::new(scope, entries, result))
99            .collect()
100    }
101}
102
#[async_trait]
impl Limiter for RedisSetLimiter {
    /// Checks `entries` against all `limits` for the given `scoping`.
    ///
    /// Entries are first checked against the in-memory cache. Only entries the cache cannot
    /// decide are collected and checked with Redis afterwards. Rejected entries and the
    /// cardinality of every scope checked with Redis are passed to `reporter`.
    ///
    /// Returns an error if the Redis connection cannot be acquired or a Redis check fails.
    async fn check_cardinality_limits<'a, 'b, E, R>(
        &self,
        scoping: Scoping,
        limits: &'a [CardinalityLimit],
        entries: E,
        reporter: &mut R,
    ) -> Result<()>
    where
        E: IntoIterator<Item = Entry<'b>> + Send,
        R: Reporter<'a> + Send,
    {
        #[cfg(not(test))]
        let timestamp = UnixTimestamp::now();
        // Allows to fast-forward time in tests using a fixed offset.
        #[cfg(test)]
        let timestamp = self.timestamp + self.time_offset;

        let mut states = LimitState::from_limits(scoping, limits);

        {
            // Acquire a read lock on the cache.
            // The surrounding block ends the cache access before the Redis connection is awaited.
            let cache = self.cache.read(timestamp);
            for entry in entries {
                for state in states.iter_mut() {
                    let Some(scope) = state.matching_scope(entry) else {
                        // Entry not relevant for limit.
                        continue;
                    };

                    match cache.check(&scope, entry.hash, state.limit) {
                        CacheOutcome::Accepted => {
                            // Accepted already, nothing to do.
                            state.cache_hit();
                            state.accepted();
                        }
                        CacheOutcome::Rejected => {
                            // Rejected, add it to the rejected list and move on.
                            reporter.reject(state.cardinality_limit(), entry.id);
                            state.cache_hit();
                            state.rejected();
                        }
                        CacheOutcome::Unknown => {
                            // Add the entry to the state -> needs to be checked with Redis.
                            state.add(scope, RedisEntry::new(entry.id, entry.hash));
                            state.cache_miss();
                        }
                    }
                }
            }
        }

        // NOTE(review): the connection is acquired before the states are inspected, so this
        // also happens when every state below turns out to be empty.
        let mut connection = self.redis.get_connection().await?;

        for mut state in states {
            if state.is_empty() {
                // Nothing was added for this limit, no Redis check required.
                continue;
            }

            let id = &state.id().to_string();
            let scopes = num_scopes_tag(&state);
            // Time the Redis check, tagged with the limit id and the bucketed number of scopes.
            let results = metric!(
                timer(CardinalityLimiterTimers::Redis),
                id = id,
                scopes = scopes,
                {
                    self.check_limits(&mut connection, &mut state, timestamp)
                        .await
                }
            )?;

            for result in results {
                reporter.report_cardinality(state.cardinality_limit(), result.to_report(timestamp));

                {
                    // This always acquires a write lock, but we only hit this
                    // if we previously didn't satisfy the request from the cache,
                    // -> there is a very high chance we actually need the lock.
                    //
                    // Acquire the write lock on the cache for the scope of this result.
                    let mut cache = self.cache.update(&result.scope, timestamp);
                    for (entry, status) in result {
                        if status.is_rejected() {
                            reporter.reject(state.cardinality_limit(), entry.id);
                            state.rejected();
                        } else {
                            // Remember the accepted hash in the cache.
                            cache.accept(entry.hash);
                            state.accepted();
                        }
                    }
                }
            }
        }

        Ok(())
    }
}
201
/// Result of checking the entries of a single scope with Redis.
struct CheckedLimits {
    /// Scope the entries were checked for.
    scope: QuotaScoping,
    /// Cardinality taken from the script result for this scope.
    cardinality: u32,
    /// Entries which were submitted for the scope.
    entries: Vec<RedisEntry>,
    /// Statuses taken from the script result, paired with `entries` by position.
    statuses: Vec<Status>,
}
208
209impl CheckedLimits {
210    fn new(
211        scope: QuotaScoping,
212        entries: Vec<RedisEntry>,
213        result: CardinalityScriptResult,
214    ) -> Result<Self> {
215        result.validate(entries.len())?;
216
217        Ok(Self {
218            scope,
219            entries,
220            cardinality: result.cardinality,
221            statuses: result.statuses,
222        })
223    }
224
225    fn to_report(&self, timestamp: UnixTimestamp) -> CardinalityReport {
226        CardinalityReport {
227            timestamp,
228            organization_id: self.scope.organization_id,
229            project_id: self.scope.project_id,
230            metric_type: self.scope.metric_type,
231            metric_name: self.scope.metric_name.clone(),
232            cardinality: self.cardinality,
233        }
234    }
235}
236
237impl IntoIterator for CheckedLimits {
238    type Item = (RedisEntry, Status);
239    type IntoIter = std::iter::Zip<std::vec::IntoIter<RedisEntry>, std::vec::IntoIter<Status>>;
240
241    fn into_iter(self) -> Self::IntoIter {
242        debug_assert_eq!(
243            self.entries.len(),
244            self.statuses.len(),
245            "expected same amount of entries as statuses"
246        );
247        std::iter::zip(self.entries, self.statuses)
248    }
249}
250
251/// Buckets the amount of scopes contained in a state into a metric tag.
252fn num_scopes_tag(state: &LimitState<'_>) -> &'static str {
253    match state.scopes().len() {
254        0 => "0",
255        1 => "1",
256        2 => "2",
257        3 => "3",
258        4 => "4",
259        5 => "5",
260        6..=10 => "10",
261        11..=25 => "25",
262        26..=50 => "50",
263        51..=100 => "100",
264        101..=500 => "500",
265        _ => "> 500",
266    }
267}
268
269#[cfg(test)]
270mod tests {
271    use std::collections::{BTreeMap, HashSet};
272    use std::sync::atomic::AtomicU64;
273
274    use relay_base_schema::metrics::{MetricName, MetricNamespace::*, MetricType};
275    use relay_base_schema::organization::OrganizationId;
276    use relay_base_schema::project::ProjectId;
277    use relay_redis::{RedisConfigOptions, redis};
278
279    use crate::limiter::EntryId;
280    use crate::redis::{KEY_PREFIX, KEY_VERSION};
281    use crate::{CardinalityScope, SlidingWindow};
282
283    use super::*;
284
285    fn build_limiter() -> RedisSetLimiter {
286        let url = std::env::var("RELAY_REDIS_URL")
287            .unwrap_or_else(|_| "redis://127.0.0.1:6379".to_owned());
288
289        let opts = RedisConfigOptions {
290            max_connections: 1,
291            ..Default::default()
292        };
293        let redis = AsyncRedisClient::single(&url, &opts).unwrap();
294
295        RedisSetLimiter::new(
296            RedisSetLimiterOptions {
297                cache_vacuum_interval: Duration::from_secs(5),
298            },
299            redis,
300        )
301    }
302
303    async fn new_scoping(limiter: &RedisSetLimiter) -> Scoping {
304        static ORGS: AtomicU64 = AtomicU64::new(100);
305
306        let scoping = Scoping {
307            organization_id: OrganizationId::new(
308                ORGS.fetch_add(1, std::sync::atomic::Ordering::SeqCst),
309            ),
310            project_id: ProjectId::new(1),
311        };
312
313        limiter.flush(scoping).await;
314
315        scoping
316    }
317
    /// Reporter which collects rejections and cardinality reports for assertions.
    #[derive(Debug, Default, PartialEq, Eq)]
    struct TestReporter {
        /// Ids of all entries which were rejected.
        entries: HashSet<EntryId>,
        /// Cardinality reports grouped by the limit they were reported for.
        reports: BTreeMap<CardinalityLimit, Vec<CardinalityReport>>,
    }
323
324    impl TestReporter {
325        fn contains_any(&self, ids: impl IntoIterator<Item = usize>) -> bool {
326            ids.into_iter()
327                .any(|id| self.entries.contains(&EntryId(id)))
328        }
329
330        #[track_caller]
331        fn assert_cardinality(&self, limit: &CardinalityLimit, cardinality: u32) {
332            let Some(r) = self.reports.get(limit) else {
333                panic!("expected cardinality report for limit {limit:?}");
334            };
335            assert_eq!(r.len(), 1, "expected one cardinality report");
336            assert_eq!(r[0].cardinality, cardinality);
337        }
338    }
339
340    impl<'a> super::Reporter<'a> for TestReporter {
341        fn reject(&mut self, _limit: &'a CardinalityLimit, entry_id: EntryId) {
342            self.entries.insert(entry_id);
343        }
344
345        fn report_cardinality(&mut self, limit: &'a CardinalityLimit, report: CardinalityReport) {
346            self.reports.entry(limit.clone()).or_default().push(report);
347        }
348    }
349
350    impl std::ops::Deref for TestReporter {
351        type Target = HashSet<EntryId>;
352
353        fn deref(&self) -> &Self::Target {
354            &self.entries
355        }
356    }
357
358    impl RedisSetLimiter {
359        /// Remove all redis state for an organization.
360        async fn flush(&self, scoping: Scoping) {
361            let pattern = format!(
362                "{KEY_PREFIX}:{KEY_VERSION}:scope-{{{o}-*",
363                o = scoping.organization_id
364            );
365
366            let mut connection = self.redis.get_connection().await.unwrap();
367
368            let keys = redis::cmd("KEYS")
369                .arg(pattern)
370                .query_async::<Vec<String>>(&mut connection)
371                .await
372                .unwrap();
373
374            if !keys.is_empty() {
375                let mut del = redis::cmd("DEL");
376                for key in keys {
377                    del.arg(key);
378                }
379                del.query_async::<()>(&mut connection).await.unwrap();
380            }
381        }
382
383        async fn redis_sets(&self, scoping: Scoping) -> Vec<(String, usize)> {
384            let pattern = format!(
385                "{KEY_PREFIX}:{KEY_VERSION}:scope-{{{o}-*",
386                o = scoping.organization_id
387            );
388
389            let mut connection = self.redis.get_connection().await.unwrap();
390
391            let keys: Vec<String> = redis::cmd("KEYS")
392                .arg(pattern)
393                .query_async(&mut connection)
394                .await
395                .unwrap();
396
397            let mut results = Vec::with_capacity(keys.len());
398            for key in keys {
399                let size = redis::cmd("SCARD")
400                    .arg(&key)
401                    .query_async(&mut connection)
402                    .await
403                    .unwrap();
404
405                results.push((key, size));
406            }
407
408            results
409        }
410
411        async fn test_limits<'a, I>(
412            &self,
413            scoping: Scoping,
414            limits: &'a [CardinalityLimit],
415            entries: I,
416        ) -> TestReporter
417        where
418            I: IntoIterator<Item = Entry<'a>> + Send,
419        {
420            let mut reporter = TestReporter::default();
421            self.check_cardinality_limits(scoping, limits, entries, &mut reporter)
422                .await
423                .unwrap();
424            for reports in reporter.reports.values_mut() {
425                reports.sort();
426            }
427            reporter
428        }
429
430        fn timestamp(&self) -> UnixTimestamp {
431            self.timestamp + self.time_offset
432        }
433    }
434
435    #[tokio::test]
436    async fn test_limiter_accept_previously_seen() {
437        let limiter = build_limiter();
438
439        let m0 = MetricName::from("a");
440        let m1 = MetricName::from("b");
441        let m2 = MetricName::from("c");
442        let m3 = MetricName::from("d");
443        let m4 = MetricName::from("e");
444        let m5 = MetricName::from("f");
445
446        let entries = [
447            Entry::new(EntryId(0), Custom, &m0, 0),
448            Entry::new(EntryId(1), Custom, &m1, 1),
449            Entry::new(EntryId(2), Custom, &m2, 2),
450            Entry::new(EntryId(3), Custom, &m3, 3),
451            Entry::new(EntryId(4), Custom, &m4, 4),
452            Entry::new(EntryId(5), Custom, &m5, 5),
453        ];
454
455        let scoping = new_scoping(&limiter).await;
456        let mut limit = CardinalityLimit {
457            id: "limit".to_owned(),
458            passive: false,
459            report: false,
460            window: SlidingWindow {
461                window_seconds: 3600,
462                granularity_seconds: 360,
463            },
464            limit: 5,
465            scope: CardinalityScope::Organization,
466            namespace: Some(Custom),
467        };
468
469        // 6 items, limit is 5 -> 1 rejection.
470        let rejected = limiter
471            .test_limits(scoping, &[limit.clone()], entries)
472            .await;
473        assert_eq!(rejected.len(), 1);
474
475        // We're at the limit but it should still accept already accepted elements, even with a
476        // samller limit than previously accepted.
477        limit.limit = 3;
478        let rejected2 = limiter
479            .test_limits(scoping, &[limit.clone()], entries)
480            .await;
481        assert_eq!(rejected2.entries, rejected.entries);
482
483        // A higher limit should accept everthing
484        limit.limit = 6;
485        let rejected3 = limiter.test_limits(scoping, &[limit], entries).await;
486        assert_eq!(rejected3.len(), 0);
487    }
488
489    #[tokio::test]
490    async fn test_limiter_name_limit() {
491        let limiter = build_limiter();
492
493        let m0 = MetricName::from("a");
494        let m1 = MetricName::from("b");
495
496        let entries = [
497            Entry::new(EntryId(0), Custom, &m0, 0),
498            Entry::new(EntryId(1), Custom, &m0, 1),
499            Entry::new(EntryId(2), Custom, &m0, 2),
500            Entry::new(EntryId(3), Custom, &m1, 3),
501            Entry::new(EntryId(4), Custom, &m1, 4),
502            Entry::new(EntryId(5), Custom, &m1, 5),
503        ];
504
505        let scoping = new_scoping(&limiter).await;
506        let limit = CardinalityLimit {
507            id: "limit".to_owned(),
508            passive: false,
509            report: true,
510            window: SlidingWindow {
511                window_seconds: 3600,
512                granularity_seconds: 360,
513            },
514            limit: 2,
515            scope: CardinalityScope::Name,
516            namespace: Some(Custom),
517        };
518
519        let rejected = limiter
520            .test_limits(scoping, &[limit.clone()], entries)
521            .await;
522        assert_eq!(rejected.len(), 2);
523        assert!(rejected.contains_any([0, 1, 2]));
524        assert!(rejected.contains_any([3, 4, 5]));
525
526        assert_eq!(rejected.reports.len(), 1);
527        let reports = rejected.reports.get(&limit).unwrap();
528        assert_eq!(
529            reports,
530            &[
531                CardinalityReport {
532                    timestamp: limiter.timestamp(),
533                    organization_id: Some(scoping.organization_id),
534                    project_id: Some(scoping.project_id),
535                    metric_type: None,
536                    metric_name: Some(m0),
537                    cardinality: 2,
538                },
539                CardinalityReport {
540                    timestamp: limiter.timestamp(),
541                    organization_id: Some(scoping.organization_id),
542                    project_id: Some(scoping.project_id),
543                    metric_type: None,
544                    metric_name: Some(m1),
545                    cardinality: 2,
546                },
547            ]
548        );
549    }
550
551    #[tokio::test]
552    async fn test_limiter_type_limit() {
553        let limiter = build_limiter();
554
555        let m0 = MetricName::from("c:custom/foo@none");
556        let m1 = MetricName::from("c:custom/bar@none");
557        let m2 = MetricName::from("d:custom/foo@none");
558
559        let entries = [
560            Entry::new(EntryId(0), Custom, &m0, 0),
561            Entry::new(EntryId(1), Custom, &m0, 1),
562            Entry::new(EntryId(2), Custom, &m1, 2),
563            Entry::new(EntryId(3), Custom, &m2, 3),
564            Entry::new(EntryId(4), Custom, &m2, 4),
565            Entry::new(EntryId(5), Custom, &m2, 5),
566        ];
567
568        let scoping = new_scoping(&limiter).await;
569        let limit = CardinalityLimit {
570            id: "limit".to_owned(),
571            passive: false,
572            report: true,
573            window: SlidingWindow {
574                window_seconds: 3600,
575                granularity_seconds: 360,
576            },
577            limit: 2,
578            scope: CardinalityScope::Type,
579            namespace: Some(Custom),
580        };
581
582        let rejected = limiter
583            .test_limits(scoping, &[limit.clone()], entries)
584            .await;
585        assert_eq!(rejected.len(), 2);
586        assert!(rejected.contains_any([0, 1, 2]));
587        assert!(rejected.contains_any([3, 4, 5]));
588
589        assert_eq!(rejected.reports.len(), 1);
590        let reports = rejected.reports.get(&limit).unwrap();
591        assert_eq!(
592            reports,
593            &[
594                CardinalityReport {
595                    timestamp: limiter.timestamp(),
596                    organization_id: Some(scoping.organization_id),
597                    project_id: Some(scoping.project_id),
598                    metric_type: Some(MetricType::Counter),
599                    metric_name: None,
600                    cardinality: 2,
601                },
602                CardinalityReport {
603                    timestamp: limiter.timestamp(),
604                    organization_id: Some(scoping.organization_id),
605                    project_id: Some(scoping.project_id),
606                    metric_type: Some(MetricType::Distribution),
607                    metric_name: None,
608                    cardinality: 2,
609                },
610            ]
611        );
612    }
613
614    #[tokio::test]
615    async fn test_limiter_org_based_time_shift() {
616        let mut limiter = build_limiter();
617
618        let granularity_seconds = 10_000;
619
620        let scoping1 = Scoping {
621            organization_id: OrganizationId::new(granularity_seconds),
622            project_id: ProjectId::new(1),
623        };
624        let scoping2 = Scoping {
625            // Time shift relative to `scoping1` should be half the granularity.
626            organization_id: OrganizationId::new(granularity_seconds / 2),
627            project_id: ProjectId::new(1),
628        };
629
630        let limits = &[CardinalityLimit {
631            id: "limit".to_owned(),
632            passive: false,
633            report: false,
634            window: SlidingWindow {
635                window_seconds: granularity_seconds * 3,
636                granularity_seconds,
637            },
638            limit: 1,
639            scope: CardinalityScope::Organization,
640            namespace: Some(Custom),
641        }];
642
643        let m = MetricName::from("a");
644
645        let entries1 = [Entry::new(EntryId(0), Custom, &m, 0)];
646        assert!(
647            limiter
648                .test_limits(scoping1, limits, entries1)
649                .await
650                .is_empty()
651        );
652        assert!(
653            limiter
654                .test_limits(scoping2, limits, entries1)
655                .await
656                .is_empty()
657        );
658
659        // Make sure `entries2` is not accepted.
660        let entries2 = [Entry::new(EntryId(1), Custom, &m, 1)];
661        assert_eq!(
662            limiter.test_limits(scoping1, limits, entries2).await.len(),
663            1
664        );
665        assert_eq!(
666            limiter.test_limits(scoping2, limits, entries2).await.len(),
667            1
668        );
669
670        let mut scoping1_accept = None;
671        let mut scoping2_accept = None;
672
673        // Measure time required until `entries2` is accepted.
674        for i in 0..100 {
675            let offset = i * granularity_seconds / 10;
676
677            limiter.time_offset = Duration::from_secs(offset);
678
679            if scoping1_accept.is_none()
680                && limiter
681                    .test_limits(scoping1, limits, entries2)
682                    .await
683                    .is_empty()
684            {
685                scoping1_accept = Some(offset as i64);
686            }
687
688            if scoping2_accept.is_none()
689                && limiter
690                    .test_limits(scoping2, limits, entries2)
691                    .await
692                    .is_empty()
693            {
694                scoping2_accept = Some(offset as i64);
695            }
696
697            if scoping1_accept.is_some() && scoping2_accept.is_some() {
698                break;
699            }
700        }
701
702        let scoping1_accept = scoping1_accept.unwrap();
703        let scoping2_accept = scoping2_accept.unwrap();
704
705        let diff = (scoping1_accept - scoping2_accept).abs();
706        let expected = granularity_seconds as i64 / 2;
707        // Make sure they are perfectly offset.
708        assert_eq!(diff, expected);
709    }
710
711    #[tokio::test]
712    async fn test_limiter_small_within_limits() {
713        let limiter = build_limiter();
714        let scoping = new_scoping(&limiter).await;
715
716        let limits = &[CardinalityLimit {
717            id: "limit".to_owned(),
718            passive: false,
719            report: false,
720            window: SlidingWindow {
721                window_seconds: 3600,
722                granularity_seconds: 360,
723            },
724            limit: 10_000,
725            scope: CardinalityScope::Organization,
726            namespace: Some(Custom),
727        }];
728
729        let m = MetricName::from("a");
730
731        let entries = (0..50)
732            .map(|i| Entry::new(EntryId(i as usize), Custom, &m, i))
733            .collect::<Vec<_>>();
734        let rejected = limiter.test_limits(scoping, limits, entries).await;
735        assert_eq!(rejected.len(), 0);
736
737        let entries = (100..150)
738            .map(|i| Entry::new(EntryId(i as usize), Custom, &m, i))
739            .collect::<Vec<_>>();
740        let rejected = limiter.test_limits(scoping, limits, entries).await;
741        assert_eq!(rejected.len(), 0);
742    }
743
744    #[tokio::test]
745    async fn test_limiter_big_limit() {
746        let limiter = build_limiter();
747
748        let scoping = new_scoping(&limiter).await;
749        let limits = &[CardinalityLimit {
750            id: "limit".to_owned(),
751            passive: false,
752            report: false,
753            window: SlidingWindow {
754                window_seconds: 3600,
755                granularity_seconds: 360,
756            },
757            limit: 80_000,
758            scope: CardinalityScope::Organization,
759            namespace: Some(Custom),
760        }];
761
762        let m = MetricName::from("a");
763
764        let entries = (0..100_000)
765            .map(|i| Entry::new(EntryId(i as usize), Custom, &m, i))
766            .collect::<Vec<_>>();
767
768        let rejected = limiter.test_limits(scoping, limits, entries).await;
769        assert_eq!(rejected.len(), 20_000);
770    }
771
772    #[tokio::test]
773    async fn test_limiter_sliding_window() {
774        let mut limiter = build_limiter();
775
776        let scoping = new_scoping(&limiter).await;
777        let window = SlidingWindow {
778            window_seconds: 3600,
779            granularity_seconds: 360,
780        };
781        let limits = &[CardinalityLimit {
782            id: "limit".to_owned(),
783            passive: false,
784            report: false,
785            window,
786            limit: 1,
787            scope: CardinalityScope::Organization,
788            namespace: Some(Custom),
789        }];
790
791        let m0 = MetricName::from("a");
792        let m1 = MetricName::from("b");
793
794        let entries1 = [Entry::new(EntryId(0), Custom, &m0, 0)];
795        let entries2 = [Entry::new(EntryId(1), Custom, &m1, 1)];
796
797        // 1 item and limit is 1 -> No rejections.
798        let rejected = limiter.test_limits(scoping, limits, entries1).await;
799        assert_eq!(rejected.len(), 0);
800
801        for i in 0..(window.window_seconds / window.granularity_seconds) * 2 {
802            // Fast forward time.
803            limiter.time_offset = Duration::from_secs(i * window.granularity_seconds);
804
805            // Should accept the already inserted item.
806            let rejected = limiter.test_limits(scoping, limits, entries1).await;
807            assert_eq!(rejected.len(), 0);
808            // Should reject the new item.
809            let rejected = limiter.test_limits(scoping, limits, entries2).await;
810            assert_eq!(rejected.len(), 1);
811        }
812
813        // Fast forward time to a fresh window.
814        limiter.time_offset = Duration::from_secs(1 + window.window_seconds * 3);
815        // Accept the new element.
816        let rejected = limiter.test_limits(scoping, limits, entries2).await;
817        assert_eq!(rejected.len(), 0);
818        // Reject the old element now.
819        let rejected = limiter.test_limits(scoping, limits, entries1).await;
820        assert_eq!(rejected.len(), 1);
821    }
822
823    #[tokio::test]
824    async fn test_limiter_no_namespace_limit_is_shared_limit() {
825        let limiter = build_limiter();
826        let scoping = new_scoping(&limiter).await;
827
828        let limits = &[CardinalityLimit {
829            id: "limit".to_owned(),
830            passive: false,
831            report: false,
832            window: SlidingWindow {
833                window_seconds: 3600,
834                granularity_seconds: 360,
835            },
836            limit: 2,
837            scope: CardinalityScope::Organization,
838            namespace: None,
839        }];
840
841        let m0 = MetricName::from("a");
842        let m1 = MetricName::from("b");
843        let m2 = MetricName::from("c");
844
845        let entries1 = [Entry::new(EntryId(0), Custom, &m0, 0)];
846        let entries2 = [Entry::new(EntryId(0), Spans, &m1, 1)];
847        let entries3 = [Entry::new(EntryId(0), Transactions, &m2, 2)];
848
849        let rejected = limiter.test_limits(scoping, limits, entries1).await;
850        assert_eq!(rejected.len(), 0);
851
852        let rejected = limiter.test_limits(scoping, limits, entries2).await;
853        assert_eq!(rejected.len(), 0);
854
855        let rejected = limiter.test_limits(scoping, limits, entries3).await;
856        assert_eq!(rejected.len(), 1);
857    }
858
859    #[tokio::test]
860    async fn test_limiter_multiple_limits() {
861        let limiter = build_limiter();
862        let scoping = new_scoping(&limiter).await;
863
864        let limits = &[
865            CardinalityLimit {
866                id: "limit1".to_owned(),
867                passive: false,
868                report: false,
869                window: SlidingWindow {
870                    window_seconds: 3600,
871                    granularity_seconds: 360,
872                },
873                limit: 1,
874                scope: CardinalityScope::Organization,
875                namespace: Some(Custom),
876            },
877            CardinalityLimit {
878                id: "limit2".to_owned(),
879                passive: false,
880                report: false,
881                window: SlidingWindow {
882                    window_seconds: 3600,
883                    granularity_seconds: 360,
884                },
885                limit: 1,
886                scope: CardinalityScope::Organization,
887                namespace: Some(Custom),
888            },
889            CardinalityLimit {
890                id: "limit3".to_owned(),
891                passive: false,
892                report: false,
893                window: SlidingWindow {
894                    window_seconds: 3600,
895                    granularity_seconds: 360,
896                },
897                limit: 1,
898                scope: CardinalityScope::Project,
899                namespace: Some(Spans),
900            },
901            CardinalityLimit {
902                id: "unknown_skipped".to_owned(),
903                passive: false,
904                report: false,
905                window: SlidingWindow {
906                    window_seconds: 3600,
907                    granularity_seconds: 360,
908                },
909                limit: 1,
910                scope: CardinalityScope::Unknown,
911                namespace: Some(Transactions),
912            },
913        ];
914
915        let m0 = MetricName::from("a");
916        let m1 = MetricName::from("b");
917        let m2 = MetricName::from("c");
918        let m3 = MetricName::from("d");
919        let m4 = MetricName::from("e");
920        let m5 = MetricName::from("f");
921
922        let entries = [
923            Entry::new(EntryId(0), Custom, &m0, 0),
924            Entry::new(EntryId(1), Custom, &m1, 1),
925            Entry::new(EntryId(2), Spans, &m2, 2),
926            Entry::new(EntryId(3), Spans, &m3, 3),
927            Entry::new(EntryId(4), Transactions, &m4, 4),
928            Entry::new(EntryId(5), Transactions, &m5, 5),
929        ];
930
931        // Run multiple times to make sure caching does not interfere.
932        for i in 0..3 {
933            let rejected = limiter.test_limits(scoping, limits, entries).await;
934
935            // 2 transactions + 1 span + 1 custom (4) accepted -> 2 (6-4) rejected.
936            assert_eq!(rejected.len(), 2);
937            // 1 custom rejected.
938            assert!(rejected.contains_any([0, 1]));
939            // 1 span rejected.
940            assert!(rejected.contains_any([2, 3]));
941            // 2 transactions accepted -> no rejections.
942            assert!(!rejected.contains(&EntryId(4)));
943            assert!(!rejected.contains(&EntryId(5)));
944
945            // Cardinality reports are only generated for items not coming from the cache,
946            // after the first iteration items may be coming from the cache.
947            if i == 0 {
948                assert_eq!(rejected.reports.len(), 3);
949                assert_eq!(
950                    rejected.reports.get(&limits[0]).unwrap(),
951                    &[CardinalityReport {
952                        timestamp: limiter.timestamp(),
953                        organization_id: Some(scoping.organization_id),
954                        project_id: None,
955                        metric_type: None,
956                        metric_name: None,
957                        cardinality: 1
958                    }]
959                );
960                assert_eq!(
961                    rejected.reports.get(&limits[1]).unwrap(),
962                    &[CardinalityReport {
963                        timestamp: limiter.timestamp(),
964                        organization_id: Some(scoping.organization_id),
965                        project_id: None,
966                        metric_type: None,
967                        metric_name: None,
968                        cardinality: 1
969                    }]
970                );
971                assert_eq!(
972                    rejected.reports.get(&limits[2]).unwrap(),
973                    &[CardinalityReport {
974                        timestamp: limiter.timestamp(),
975                        organization_id: Some(scoping.organization_id),
976                        project_id: Some(scoping.project_id),
977                        metric_type: None,
978                        metric_name: None,
979                        cardinality: 1
980                    }]
981                );
982                assert!(!rejected.reports.contains_key(&limits[3]));
983            } else {
984                // Coming from cache.
985                assert!(rejected.reports.is_empty());
986            }
987        }
988    }
989
990    #[tokio::test]
991    async fn test_project_limit() {
992        let limiter = build_limiter();
993
994        let scoping1 = new_scoping(&limiter).await;
995        let scoping2 = Scoping {
996            project_id: ProjectId::new(2),
997            ..scoping1
998        };
999
1000        let limits = &[CardinalityLimit {
1001            id: "limit".to_owned(),
1002            passive: false,
1003            report: false,
1004            window: SlidingWindow {
1005                window_seconds: 3600,
1006                granularity_seconds: 360,
1007            },
1008            limit: 1,
1009            scope: CardinalityScope::Project,
1010            namespace: None,
1011        }];
1012
1013        let m1 = MetricName::from("a");
1014        let m2 = MetricName::from("b");
1015        let entries1 = [Entry::new(EntryId(0), Custom, &m1, 0)];
1016        let entries2 = [Entry::new(EntryId(0), Custom, &m2, 1)];
1017
1018        // Accept different entries for different scopes.
1019        let rejected = limiter.test_limits(scoping1, limits, entries1).await;
1020        assert_eq!(rejected.len(), 0);
1021        let rejected = limiter.test_limits(scoping2, limits, entries2).await;
1022        assert_eq!(rejected.len(), 0);
1023
1024        // Make sure the other entry is not accepted.
1025        let rejected = limiter.test_limits(scoping1, limits, entries2).await;
1026        assert_eq!(rejected.len(), 1);
1027        let rejected = limiter.test_limits(scoping2, limits, entries1).await;
1028        assert_eq!(rejected.len(), 1);
1029    }
1030
1031    #[tokio::test]
1032    async fn test_limiter_sliding_window_full() {
1033        let mut limiter = build_limiter();
1034        let scoping = new_scoping(&limiter).await;
1035
1036        let window = SlidingWindow {
1037            window_seconds: 300,
1038            granularity_seconds: 100,
1039        };
1040        let limits = &[CardinalityLimit {
1041            id: "limit".to_owned(),
1042            passive: false,
1043            report: false,
1044            window,
1045            limit: 100,
1046            scope: CardinalityScope::Organization,
1047            namespace: Some(Custom),
1048        }];
1049
1050        let m = MetricName::from("foo");
1051        macro_rules! test {
1052            ($r:expr) => {{
1053                let entries = $r
1054                    .map(|i| Entry::new(EntryId(i as usize), Custom, &m, i))
1055                    .collect::<Vec<_>>();
1056
1057                limiter.test_limits(scoping, limits, entries).await
1058            }};
1059        }
1060
1061        macro_rules! assert_test {
1062            ($v:expr, $rejected:expr) => {{
1063                let report = test!($v);
1064                assert_eq!(report.len(), $rejected);
1065            }};
1066            ($v:expr, $rejected:expr, $cardinality:expr) => {{
1067                let report = test!($v);
1068                assert_eq!(report.len(), $rejected);
1069                report.assert_cardinality(&limits[0], $cardinality);
1070            }};
1071        }
1072
1073        // Fill the first window with values.
1074        assert_test!(0..100, 0, 100);
1075
1076        // Refresh 0..50 - Full.
1077        limiter.time_offset = Duration::from_secs(window.granularity_seconds);
1078        assert_test!(0..50, 0, 100);
1079        assert_test!(100..125, 25, 100);
1080
1081        // Refresh 0..50 - Full.
1082        limiter.time_offset = Duration::from_secs(window.granularity_seconds * 2);
1083        assert_test!(0..50, 0, 100);
1084        assert_test!(125..150, 25, 100);
1085
1086        // Refresh 0..50 - 50..100 fell out of the window, add 25 (size 50 -> 75).
1087        // --> Set 4 has size 75.
1088        limiter.time_offset = Duration::from_secs(window.granularity_seconds * 3);
1089        assert_test!(0..50, 0, 50);
1090        assert_test!(150..175, 0, 75);
1091
1092        // Refresh 0..50 - Still 25 available (size 75 -> 100).
1093        limiter.time_offset = Duration::from_secs(window.granularity_seconds * 4);
1094        assert_test!(0..50, 0, 75);
1095        assert_test!(175..200, 0, 100);
1096
1097        // From this point on it is repeating:
1098        //  - Always refresh 0..50.
1099        //  - First granule is full.
1100        //  - Next two granules each have space for 25 -> immediately filled.
1101        for i in 0..21 {
1102            let start = 200 + i * 25;
1103            let end = start + 25;
1104
1105            limiter.time_offset = Duration::from_secs(window.granularity_seconds * (i as u64 + 5));
1106
1107            if i % 3 == 0 {
1108                assert_test!(0..50, 0, 100);
1109                assert_test!(start..end, 25, 100);
1110            } else {
1111                assert_test!(0..50, 0, 75);
1112                assert_test!(start..end, 0, 100);
1113            }
1114        }
1115
1116        let mut sets = limiter.redis_sets(scoping).await;
1117        sets.sort();
1118
1119        let len = sets.len();
1120        for (i, (_, size)) in sets.into_iter().enumerate() {
1121            // Set 4 and the last set were never fully filled.
1122            if i == 3 || i + 1 == len {
1123                assert_eq!(size, 75);
1124            } else {
1125                assert_eq!(size, 100);
1126            }
1127        }
1128    }
1129
1130    #[tokio::test]
1131    async fn test_limiter_sliding_window_perfect() {
1132        let mut limiter = build_limiter();
1133        let scoping = new_scoping(&limiter).await;
1134
1135        let window = SlidingWindow {
1136            window_seconds: 300,
1137            granularity_seconds: 100,
1138        };
1139        let limits = &[CardinalityLimit {
1140            id: "limit".to_owned(),
1141            passive: false,
1142            report: false,
1143            window,
1144            limit: 100,
1145            scope: CardinalityScope::Organization,
1146            namespace: Some(Custom),
1147        }];
1148
1149        let m = MetricName::from("foo");
1150        macro_rules! test {
1151            ($r:expr) => {{
1152                let entries = $r
1153                    .map(|i| Entry::new(EntryId(i as usize), Custom, &m, i))
1154                    .collect::<Vec<_>>();
1155
1156                limiter.test_limits(scoping, limits, entries).await
1157            }};
1158        }
1159
1160        // Test Case:
1161        //  1. Fill the window
1162        //  2. Fast forward one granule and fill that with the same values
1163        //  3. Assert that values "confirmed" in the first granule are active for a full window
1164        //
1165        // [ ][ ][ ]{ }{ }{ }[ ][ ][ ]
1166        // [A][A][A]{ }{ }{ }[ ][ ][ ]
1167        //    [A][A]{A}{ }{ }[ ][ ][ ]
1168        //     |     \-- Assert this value.
1169        //     \-- Touch this value.
1170
1171        // Fill the first window with values.
1172        let report = test!(0..100);
1173        assert!(report.is_empty());
1174        report.assert_cardinality(&limits[0], 100);
1175
1176        // Fast forward one granule with the same values.
1177        limiter.time_offset = Duration::from_secs(window.granularity_seconds);
1178        let report = test!(0..100);
1179        assert!(report.is_empty());
1180        report.assert_cardinality(&limits[0], 100);
1181
1182        // Fast forward to the first granule after a full reset.
1183        limiter.time_offset = Duration::from_secs(window.window_seconds);
1184        // Make sure the window is full and does not accept new values.
1185        assert_eq!(test!(200..300).len(), 100);
1186        // Assert original values.
1187        assert!(test!(0..100).is_empty());
1188    }
1189}