1use async_trait::async_trait;
2use relay_redis::{AsyncRedisClient, AsyncRedisConnection};
3use relay_statsd::metric;
4use std::time::Duration;
5
6use crate::{
7 CardinalityLimit, Result,
8 limiter::{CardinalityReport, Entry, Limiter, Reporter, Scoping},
9 redis::{
10 cache::{Cache, CacheOutcome},
11 quota::QuotaScoping,
12 script::{CardinalityScript, CardinalityScriptResult, Status},
13 state::{LimitState, RedisEntry},
14 },
15 statsd::{CardinalityLimiterHistograms, CardinalityLimiterTimers},
16};
17use relay_common::time::UnixTimestamp;
18
19pub struct RedisSetLimiterOptions {
21 pub cache_vacuum_interval: Duration,
25}
26
27pub struct RedisSetLimiter {
29 redis: AsyncRedisClient,
30 script: CardinalityScript,
31 cache: Cache,
32 #[cfg(test)]
33 timestamp: UnixTimestamp,
34 #[cfg(test)]
35 time_offset: std::time::Duration,
36}
37
38impl RedisSetLimiter {
40 pub fn new(options: RedisSetLimiterOptions, redis: AsyncRedisClient) -> Self {
42 Self {
43 redis,
44 script: CardinalityScript::load(),
45 cache: Cache::new(options.cache_vacuum_interval),
46 #[cfg(test)]
47 timestamp: UnixTimestamp::now(),
48 #[cfg(test)]
49 time_offset: std::time::Duration::from_secs(0),
50 }
51 }
52
53 async fn check_limits(
57 &self,
58 connection: &mut AsyncRedisConnection,
59 state: &mut LimitState<'_>,
60 timestamp: UnixTimestamp,
61 ) -> Result<Vec<CheckedLimits>> {
62 let limit = state.limit;
63
64 let scopes = state.take_scopes();
65
66 let mut num_hashes: u64 = 0;
67
68 let mut pipeline = self.script.pipe();
69 for (scope, entries) in &scopes {
70 let keys = scope.slots(timestamp).map(|slot| scope.to_redis_key(slot));
71
72 let hashes = entries.iter().map(|entry| entry.hash);
73 num_hashes += hashes.len() as u64;
74
75 pipeline.add_invocation(limit, scope.redis_key_ttl(), hashes, keys);
78 }
79
80 metric!(
81 histogram(CardinalityLimiterHistograms::RedisCheckHashes) = num_hashes,
82 id = state.id(),
83 );
84
85 let results = pipeline.invoke(connection).await?;
86
87 debug_assert_eq!(results.len(), scopes.len());
88 scopes
89 .into_iter()
90 .zip(results)
91 .inspect(|(_, result)| {
92 metric!(
93 histogram(CardinalityLimiterHistograms::RedisSetCardinality) =
94 result.cardinality as u64,
95 id = state.id(),
96 );
97 })
98 .map(|((scope, entries), result)| CheckedLimits::new(scope, entries, result))
99 .collect()
100 }
101}
102
103#[async_trait]
104impl Limiter for RedisSetLimiter {
105 async fn check_cardinality_limits<'a, 'b, E, R>(
106 &self,
107 scoping: Scoping,
108 limits: &'a [CardinalityLimit],
109 entries: E,
110 reporter: &mut R,
111 ) -> Result<()>
112 where
113 E: IntoIterator<Item = Entry<'b>> + Send,
114 R: Reporter<'a> + Send,
115 {
116 #[cfg(not(test))]
117 let timestamp = UnixTimestamp::now();
118 #[cfg(test)]
120 let timestamp = self.timestamp + self.time_offset;
121
122 let mut states = LimitState::from_limits(scoping, limits);
123
124 {
125 let cache = self.cache.read(timestamp);
127 for entry in entries {
128 for state in states.iter_mut() {
129 let Some(scope) = state.matching_scope(entry) else {
130 continue;
132 };
133
134 match cache.check(&scope, entry.hash, state.limit) {
135 CacheOutcome::Accepted => {
136 state.cache_hit();
138 state.accepted();
139 }
140 CacheOutcome::Rejected => {
141 reporter.reject(state.cardinality_limit(), entry.id);
143 state.cache_hit();
144 state.rejected();
145 }
146 CacheOutcome::Unknown => {
147 state.add(scope, RedisEntry::new(entry.id, entry.hash));
149 state.cache_miss();
150 }
151 }
152 }
153 }
154 }
155
156 let mut connection = self.redis.get_connection().await?;
157
158 for mut state in states {
159 if state.is_empty() {
160 continue;
161 }
162
163 let id = &state.id().to_string();
164 let scopes = num_scopes_tag(&state);
165 let results = metric!(
166 timer(CardinalityLimiterTimers::Redis),
167 id = id,
168 scopes = scopes,
169 {
170 self.check_limits(&mut connection, &mut state, timestamp)
171 .await
172 }
173 )?;
174
175 for result in results {
176 reporter.report_cardinality(state.cardinality_limit(), result.to_report(timestamp));
177
178 {
179 let mut cache = self.cache.update(&result.scope, timestamp);
185 for (entry, status) in result {
186 if status.is_rejected() {
187 reporter.reject(state.cardinality_limit(), entry.id);
188 state.rejected();
189 } else {
190 cache.accept(entry.hash);
191 state.accepted();
192 }
193 }
194 }
195 }
196 }
197
198 Ok(())
199 }
200}
201
202struct CheckedLimits {
203 scope: QuotaScoping,
204 cardinality: u32,
205 entries: Vec<RedisEntry>,
206 statuses: Vec<Status>,
207}
208
209impl CheckedLimits {
210 fn new(
211 scope: QuotaScoping,
212 entries: Vec<RedisEntry>,
213 result: CardinalityScriptResult,
214 ) -> Result<Self> {
215 result.validate(entries.len())?;
216
217 Ok(Self {
218 scope,
219 entries,
220 cardinality: result.cardinality,
221 statuses: result.statuses,
222 })
223 }
224
225 fn to_report(&self, timestamp: UnixTimestamp) -> CardinalityReport {
226 CardinalityReport {
227 timestamp,
228 organization_id: self.scope.organization_id,
229 project_id: self.scope.project_id,
230 metric_type: self.scope.metric_type,
231 metric_name: self.scope.metric_name.clone(),
232 cardinality: self.cardinality,
233 }
234 }
235}
236
237impl IntoIterator for CheckedLimits {
238 type Item = (RedisEntry, Status);
239 type IntoIter = std::iter::Zip<std::vec::IntoIter<RedisEntry>, std::vec::IntoIter<Status>>;
240
241 fn into_iter(self) -> Self::IntoIter {
242 debug_assert_eq!(
243 self.entries.len(),
244 self.statuses.len(),
245 "expected same amount of entries as statuses"
246 );
247 std::iter::zip(self.entries, self.statuses)
248 }
249}
250
251fn num_scopes_tag(state: &LimitState<'_>) -> &'static str {
253 match state.scopes().len() {
254 0 => "0",
255 1 => "1",
256 2 => "2",
257 3 => "3",
258 4 => "4",
259 5 => "5",
260 6..=10 => "10",
261 11..=25 => "25",
262 26..=50 => "50",
263 51..=100 => "100",
264 101..=500 => "500",
265 _ => "> 500",
266 }
267}
268
269#[cfg(test)]
270mod tests {
271 use std::collections::{BTreeMap, HashSet};
272 use std::sync::atomic::AtomicU64;
273
274 use relay_base_schema::metrics::{MetricName, MetricNamespace::*, MetricType};
275 use relay_base_schema::organization::OrganizationId;
276 use relay_base_schema::project::ProjectId;
277 use relay_redis::{RedisConfigOptions, redis};
278
279 use crate::limiter::EntryId;
280 use crate::redis::{KEY_PREFIX, KEY_VERSION};
281 use crate::{CardinalityScope, SlidingWindow};
282
283 use super::*;
284
285 fn build_limiter() -> RedisSetLimiter {
286 let url = std::env::var("RELAY_REDIS_URL")
287 .unwrap_or_else(|_| "redis://127.0.0.1:6379".to_owned());
288
289 let opts = RedisConfigOptions {
290 max_connections: 1,
291 ..Default::default()
292 };
293 let redis = AsyncRedisClient::single(&url, &opts).unwrap();
294
295 RedisSetLimiter::new(
296 RedisSetLimiterOptions {
297 cache_vacuum_interval: Duration::from_secs(5),
298 },
299 redis,
300 )
301 }
302
303 async fn new_scoping(limiter: &RedisSetLimiter) -> Scoping {
304 static ORGS: AtomicU64 = AtomicU64::new(100);
305
306 let scoping = Scoping {
307 organization_id: OrganizationId::new(
308 ORGS.fetch_add(1, std::sync::atomic::Ordering::SeqCst),
309 ),
310 project_id: ProjectId::new(1),
311 };
312
313 limiter.flush(scoping).await;
314
315 scoping
316 }
317
318 #[derive(Debug, Default, PartialEq, Eq)]
319 struct TestReporter {
320 entries: HashSet<EntryId>,
321 reports: BTreeMap<CardinalityLimit, Vec<CardinalityReport>>,
322 }
323
324 impl TestReporter {
325 fn contains_any(&self, ids: impl IntoIterator<Item = usize>) -> bool {
326 ids.into_iter()
327 .any(|id| self.entries.contains(&EntryId(id)))
328 }
329
330 #[track_caller]
331 fn assert_cardinality(&self, limit: &CardinalityLimit, cardinality: u32) {
332 let Some(r) = self.reports.get(limit) else {
333 panic!("expected cardinality report for limit {limit:?}");
334 };
335 assert_eq!(r.len(), 1, "expected one cardinality report");
336 assert_eq!(r[0].cardinality, cardinality);
337 }
338 }
339
340 impl<'a> super::Reporter<'a> for TestReporter {
341 fn reject(&mut self, _limit: &'a CardinalityLimit, entry_id: EntryId) {
342 self.entries.insert(entry_id);
343 }
344
345 fn report_cardinality(&mut self, limit: &'a CardinalityLimit, report: CardinalityReport) {
346 self.reports.entry(limit.clone()).or_default().push(report);
347 }
348 }
349
350 impl std::ops::Deref for TestReporter {
351 type Target = HashSet<EntryId>;
352
353 fn deref(&self) -> &Self::Target {
354 &self.entries
355 }
356 }
357
358 impl RedisSetLimiter {
359 async fn flush(&self, scoping: Scoping) {
361 let pattern = format!(
362 "{KEY_PREFIX}:{KEY_VERSION}:scope-{{{o}-*",
363 o = scoping.organization_id
364 );
365
366 let mut connection = self.redis.get_connection().await.unwrap();
367
368 let keys = redis::cmd("KEYS")
369 .arg(pattern)
370 .query_async::<Vec<String>>(&mut connection)
371 .await
372 .unwrap();
373
374 if !keys.is_empty() {
375 let mut del = redis::cmd("DEL");
376 for key in keys {
377 del.arg(key);
378 }
379 del.query_async::<()>(&mut connection).await.unwrap();
380 }
381 }
382
383 async fn redis_sets(&self, scoping: Scoping) -> Vec<(String, usize)> {
384 let pattern = format!(
385 "{KEY_PREFIX}:{KEY_VERSION}:scope-{{{o}-*",
386 o = scoping.organization_id
387 );
388
389 let mut connection = self.redis.get_connection().await.unwrap();
390
391 let keys: Vec<String> = redis::cmd("KEYS")
392 .arg(pattern)
393 .query_async(&mut connection)
394 .await
395 .unwrap();
396
397 let mut results = Vec::with_capacity(keys.len());
398 for key in keys {
399 let size = redis::cmd("SCARD")
400 .arg(&key)
401 .query_async(&mut connection)
402 .await
403 .unwrap();
404
405 results.push((key, size));
406 }
407
408 results
409 }
410
411 async fn test_limits<'a, I>(
412 &self,
413 scoping: Scoping,
414 limits: &'a [CardinalityLimit],
415 entries: I,
416 ) -> TestReporter
417 where
418 I: IntoIterator<Item = Entry<'a>> + Send,
419 {
420 let mut reporter = TestReporter::default();
421 self.check_cardinality_limits(scoping, limits, entries, &mut reporter)
422 .await
423 .unwrap();
424 for reports in reporter.reports.values_mut() {
425 reports.sort();
426 }
427 reporter
428 }
429
430 fn timestamp(&self) -> UnixTimestamp {
431 self.timestamp + self.time_offset
432 }
433 }
434
435 #[tokio::test]
436 async fn test_limiter_accept_previously_seen() {
437 let limiter = build_limiter();
438
439 let m0 = MetricName::from("a");
440 let m1 = MetricName::from("b");
441 let m2 = MetricName::from("c");
442 let m3 = MetricName::from("d");
443 let m4 = MetricName::from("e");
444 let m5 = MetricName::from("f");
445
446 let entries = [
447 Entry::new(EntryId(0), Custom, &m0, 0),
448 Entry::new(EntryId(1), Custom, &m1, 1),
449 Entry::new(EntryId(2), Custom, &m2, 2),
450 Entry::new(EntryId(3), Custom, &m3, 3),
451 Entry::new(EntryId(4), Custom, &m4, 4),
452 Entry::new(EntryId(5), Custom, &m5, 5),
453 ];
454
455 let scoping = new_scoping(&limiter).await;
456 let mut limit = CardinalityLimit {
457 id: "limit".to_owned(),
458 passive: false,
459 report: false,
460 window: SlidingWindow {
461 window_seconds: 3600,
462 granularity_seconds: 360,
463 },
464 limit: 5,
465 scope: CardinalityScope::Organization,
466 namespace: Some(Custom),
467 };
468
469 let rejected = limiter
471 .test_limits(scoping, &[limit.clone()], entries)
472 .await;
473 assert_eq!(rejected.len(), 1);
474
475 limit.limit = 3;
478 let rejected2 = limiter
479 .test_limits(scoping, &[limit.clone()], entries)
480 .await;
481 assert_eq!(rejected2.entries, rejected.entries);
482
483 limit.limit = 6;
485 let rejected3 = limiter.test_limits(scoping, &[limit], entries).await;
486 assert_eq!(rejected3.len(), 0);
487 }
488
489 #[tokio::test]
490 async fn test_limiter_name_limit() {
491 let limiter = build_limiter();
492
493 let m0 = MetricName::from("a");
494 let m1 = MetricName::from("b");
495
496 let entries = [
497 Entry::new(EntryId(0), Custom, &m0, 0),
498 Entry::new(EntryId(1), Custom, &m0, 1),
499 Entry::new(EntryId(2), Custom, &m0, 2),
500 Entry::new(EntryId(3), Custom, &m1, 3),
501 Entry::new(EntryId(4), Custom, &m1, 4),
502 Entry::new(EntryId(5), Custom, &m1, 5),
503 ];
504
505 let scoping = new_scoping(&limiter).await;
506 let limit = CardinalityLimit {
507 id: "limit".to_owned(),
508 passive: false,
509 report: true,
510 window: SlidingWindow {
511 window_seconds: 3600,
512 granularity_seconds: 360,
513 },
514 limit: 2,
515 scope: CardinalityScope::Name,
516 namespace: Some(Custom),
517 };
518
519 let rejected = limiter
520 .test_limits(scoping, &[limit.clone()], entries)
521 .await;
522 assert_eq!(rejected.len(), 2);
523 assert!(rejected.contains_any([0, 1, 2]));
524 assert!(rejected.contains_any([3, 4, 5]));
525
526 assert_eq!(rejected.reports.len(), 1);
527 let reports = rejected.reports.get(&limit).unwrap();
528 assert_eq!(
529 reports,
530 &[
531 CardinalityReport {
532 timestamp: limiter.timestamp(),
533 organization_id: Some(scoping.organization_id),
534 project_id: Some(scoping.project_id),
535 metric_type: None,
536 metric_name: Some(m0),
537 cardinality: 2,
538 },
539 CardinalityReport {
540 timestamp: limiter.timestamp(),
541 organization_id: Some(scoping.organization_id),
542 project_id: Some(scoping.project_id),
543 metric_type: None,
544 metric_name: Some(m1),
545 cardinality: 2,
546 },
547 ]
548 );
549 }
550
551 #[tokio::test]
552 async fn test_limiter_type_limit() {
553 let limiter = build_limiter();
554
555 let m0 = MetricName::from("c:custom/foo@none");
556 let m1 = MetricName::from("c:custom/bar@none");
557 let m2 = MetricName::from("d:custom/foo@none");
558
559 let entries = [
560 Entry::new(EntryId(0), Custom, &m0, 0),
561 Entry::new(EntryId(1), Custom, &m0, 1),
562 Entry::new(EntryId(2), Custom, &m1, 2),
563 Entry::new(EntryId(3), Custom, &m2, 3),
564 Entry::new(EntryId(4), Custom, &m2, 4),
565 Entry::new(EntryId(5), Custom, &m2, 5),
566 ];
567
568 let scoping = new_scoping(&limiter).await;
569 let limit = CardinalityLimit {
570 id: "limit".to_owned(),
571 passive: false,
572 report: true,
573 window: SlidingWindow {
574 window_seconds: 3600,
575 granularity_seconds: 360,
576 },
577 limit: 2,
578 scope: CardinalityScope::Type,
579 namespace: Some(Custom),
580 };
581
582 let rejected = limiter
583 .test_limits(scoping, &[limit.clone()], entries)
584 .await;
585 assert_eq!(rejected.len(), 2);
586 assert!(rejected.contains_any([0, 1, 2]));
587 assert!(rejected.contains_any([3, 4, 5]));
588
589 assert_eq!(rejected.reports.len(), 1);
590 let reports = rejected.reports.get(&limit).unwrap();
591 assert_eq!(
592 reports,
593 &[
594 CardinalityReport {
595 timestamp: limiter.timestamp(),
596 organization_id: Some(scoping.organization_id),
597 project_id: Some(scoping.project_id),
598 metric_type: Some(MetricType::Counter),
599 metric_name: None,
600 cardinality: 2,
601 },
602 CardinalityReport {
603 timestamp: limiter.timestamp(),
604 organization_id: Some(scoping.organization_id),
605 project_id: Some(scoping.project_id),
606 metric_type: Some(MetricType::Distribution),
607 metric_name: None,
608 cardinality: 2,
609 },
610 ]
611 );
612 }
613
614 #[tokio::test]
615 async fn test_limiter_org_based_time_shift() {
616 let mut limiter = build_limiter();
617
618 let granularity_seconds = 10_000;
619
620 let scoping1 = Scoping {
621 organization_id: OrganizationId::new(granularity_seconds),
622 project_id: ProjectId::new(1),
623 };
624 let scoping2 = Scoping {
625 organization_id: OrganizationId::new(granularity_seconds / 2),
627 project_id: ProjectId::new(1),
628 };
629
630 let limits = &[CardinalityLimit {
631 id: "limit".to_owned(),
632 passive: false,
633 report: false,
634 window: SlidingWindow {
635 window_seconds: granularity_seconds * 3,
636 granularity_seconds,
637 },
638 limit: 1,
639 scope: CardinalityScope::Organization,
640 namespace: Some(Custom),
641 }];
642
643 let m = MetricName::from("a");
644
645 let entries1 = [Entry::new(EntryId(0), Custom, &m, 0)];
646 assert!(
647 limiter
648 .test_limits(scoping1, limits, entries1)
649 .await
650 .is_empty()
651 );
652 assert!(
653 limiter
654 .test_limits(scoping2, limits, entries1)
655 .await
656 .is_empty()
657 );
658
659 let entries2 = [Entry::new(EntryId(1), Custom, &m, 1)];
661 assert_eq!(
662 limiter.test_limits(scoping1, limits, entries2).await.len(),
663 1
664 );
665 assert_eq!(
666 limiter.test_limits(scoping2, limits, entries2).await.len(),
667 1
668 );
669
670 let mut scoping1_accept = None;
671 let mut scoping2_accept = None;
672
673 for i in 0..100 {
675 let offset = i * granularity_seconds / 10;
676
677 limiter.time_offset = Duration::from_secs(offset);
678
679 if scoping1_accept.is_none()
680 && limiter
681 .test_limits(scoping1, limits, entries2)
682 .await
683 .is_empty()
684 {
685 scoping1_accept = Some(offset as i64);
686 }
687
688 if scoping2_accept.is_none()
689 && limiter
690 .test_limits(scoping2, limits, entries2)
691 .await
692 .is_empty()
693 {
694 scoping2_accept = Some(offset as i64);
695 }
696
697 if scoping1_accept.is_some() && scoping2_accept.is_some() {
698 break;
699 }
700 }
701
702 let scoping1_accept = scoping1_accept.unwrap();
703 let scoping2_accept = scoping2_accept.unwrap();
704
705 let diff = (scoping1_accept - scoping2_accept).abs();
706 let expected = granularity_seconds as i64 / 2;
707 assert_eq!(diff, expected);
709 }
710
711 #[tokio::test]
712 async fn test_limiter_small_within_limits() {
713 let limiter = build_limiter();
714 let scoping = new_scoping(&limiter).await;
715
716 let limits = &[CardinalityLimit {
717 id: "limit".to_owned(),
718 passive: false,
719 report: false,
720 window: SlidingWindow {
721 window_seconds: 3600,
722 granularity_seconds: 360,
723 },
724 limit: 10_000,
725 scope: CardinalityScope::Organization,
726 namespace: Some(Custom),
727 }];
728
729 let m = MetricName::from("a");
730
731 let entries = (0..50)
732 .map(|i| Entry::new(EntryId(i as usize), Custom, &m, i))
733 .collect::<Vec<_>>();
734 let rejected = limiter.test_limits(scoping, limits, entries).await;
735 assert_eq!(rejected.len(), 0);
736
737 let entries = (100..150)
738 .map(|i| Entry::new(EntryId(i as usize), Custom, &m, i))
739 .collect::<Vec<_>>();
740 let rejected = limiter.test_limits(scoping, limits, entries).await;
741 assert_eq!(rejected.len(), 0);
742 }
743
744 #[tokio::test]
745 async fn test_limiter_big_limit() {
746 let limiter = build_limiter();
747
748 let scoping = new_scoping(&limiter).await;
749 let limits = &[CardinalityLimit {
750 id: "limit".to_owned(),
751 passive: false,
752 report: false,
753 window: SlidingWindow {
754 window_seconds: 3600,
755 granularity_seconds: 360,
756 },
757 limit: 80_000,
758 scope: CardinalityScope::Organization,
759 namespace: Some(Custom),
760 }];
761
762 let m = MetricName::from("a");
763
764 let entries = (0..100_000)
765 .map(|i| Entry::new(EntryId(i as usize), Custom, &m, i))
766 .collect::<Vec<_>>();
767
768 let rejected = limiter.test_limits(scoping, limits, entries).await;
769 assert_eq!(rejected.len(), 20_000);
770 }
771
772 #[tokio::test]
773 async fn test_limiter_sliding_window() {
774 let mut limiter = build_limiter();
775
776 let scoping = new_scoping(&limiter).await;
777 let window = SlidingWindow {
778 window_seconds: 3600,
779 granularity_seconds: 360,
780 };
781 let limits = &[CardinalityLimit {
782 id: "limit".to_owned(),
783 passive: false,
784 report: false,
785 window,
786 limit: 1,
787 scope: CardinalityScope::Organization,
788 namespace: Some(Custom),
789 }];
790
791 let m0 = MetricName::from("a");
792 let m1 = MetricName::from("b");
793
794 let entries1 = [Entry::new(EntryId(0), Custom, &m0, 0)];
795 let entries2 = [Entry::new(EntryId(1), Custom, &m1, 1)];
796
797 let rejected = limiter.test_limits(scoping, limits, entries1).await;
799 assert_eq!(rejected.len(), 0);
800
801 for i in 0..(window.window_seconds / window.granularity_seconds) * 2 {
802 limiter.time_offset = Duration::from_secs(i * window.granularity_seconds);
804
805 let rejected = limiter.test_limits(scoping, limits, entries1).await;
807 assert_eq!(rejected.len(), 0);
808 let rejected = limiter.test_limits(scoping, limits, entries2).await;
810 assert_eq!(rejected.len(), 1);
811 }
812
813 limiter.time_offset = Duration::from_secs(1 + window.window_seconds * 3);
815 let rejected = limiter.test_limits(scoping, limits, entries2).await;
817 assert_eq!(rejected.len(), 0);
818 let rejected = limiter.test_limits(scoping, limits, entries1).await;
820 assert_eq!(rejected.len(), 1);
821 }
822
823 #[tokio::test]
824 async fn test_limiter_no_namespace_limit_is_shared_limit() {
825 let limiter = build_limiter();
826 let scoping = new_scoping(&limiter).await;
827
828 let limits = &[CardinalityLimit {
829 id: "limit".to_owned(),
830 passive: false,
831 report: false,
832 window: SlidingWindow {
833 window_seconds: 3600,
834 granularity_seconds: 360,
835 },
836 limit: 2,
837 scope: CardinalityScope::Organization,
838 namespace: None,
839 }];
840
841 let m0 = MetricName::from("a");
842 let m1 = MetricName::from("b");
843 let m2 = MetricName::from("c");
844
845 let entries1 = [Entry::new(EntryId(0), Custom, &m0, 0)];
846 let entries2 = [Entry::new(EntryId(0), Spans, &m1, 1)];
847 let entries3 = [Entry::new(EntryId(0), Transactions, &m2, 2)];
848
849 let rejected = limiter.test_limits(scoping, limits, entries1).await;
850 assert_eq!(rejected.len(), 0);
851
852 let rejected = limiter.test_limits(scoping, limits, entries2).await;
853 assert_eq!(rejected.len(), 0);
854
855 let rejected = limiter.test_limits(scoping, limits, entries3).await;
856 assert_eq!(rejected.len(), 1);
857 }
858
859 #[tokio::test]
860 async fn test_limiter_multiple_limits() {
861 let limiter = build_limiter();
862 let scoping = new_scoping(&limiter).await;
863
864 let limits = &[
865 CardinalityLimit {
866 id: "limit1".to_owned(),
867 passive: false,
868 report: false,
869 window: SlidingWindow {
870 window_seconds: 3600,
871 granularity_seconds: 360,
872 },
873 limit: 1,
874 scope: CardinalityScope::Organization,
875 namespace: Some(Custom),
876 },
877 CardinalityLimit {
878 id: "limit2".to_owned(),
879 passive: false,
880 report: false,
881 window: SlidingWindow {
882 window_seconds: 3600,
883 granularity_seconds: 360,
884 },
885 limit: 1,
886 scope: CardinalityScope::Organization,
887 namespace: Some(Custom),
888 },
889 CardinalityLimit {
890 id: "limit3".to_owned(),
891 passive: false,
892 report: false,
893 window: SlidingWindow {
894 window_seconds: 3600,
895 granularity_seconds: 360,
896 },
897 limit: 1,
898 scope: CardinalityScope::Project,
899 namespace: Some(Spans),
900 },
901 CardinalityLimit {
902 id: "unknown_skipped".to_owned(),
903 passive: false,
904 report: false,
905 window: SlidingWindow {
906 window_seconds: 3600,
907 granularity_seconds: 360,
908 },
909 limit: 1,
910 scope: CardinalityScope::Unknown,
911 namespace: Some(Transactions),
912 },
913 ];
914
915 let m0 = MetricName::from("a");
916 let m1 = MetricName::from("b");
917 let m2 = MetricName::from("c");
918 let m3 = MetricName::from("d");
919 let m4 = MetricName::from("e");
920 let m5 = MetricName::from("f");
921
922 let entries = [
923 Entry::new(EntryId(0), Custom, &m0, 0),
924 Entry::new(EntryId(1), Custom, &m1, 1),
925 Entry::new(EntryId(2), Spans, &m2, 2),
926 Entry::new(EntryId(3), Spans, &m3, 3),
927 Entry::new(EntryId(4), Transactions, &m4, 4),
928 Entry::new(EntryId(5), Transactions, &m5, 5),
929 ];
930
931 for i in 0..3 {
933 let rejected = limiter.test_limits(scoping, limits, entries).await;
934
935 assert_eq!(rejected.len(), 2);
937 assert!(rejected.contains_any([0, 1]));
939 assert!(rejected.contains_any([2, 3]));
941 assert!(!rejected.contains(&EntryId(4)));
943 assert!(!rejected.contains(&EntryId(5)));
944
945 if i == 0 {
948 assert_eq!(rejected.reports.len(), 3);
949 assert_eq!(
950 rejected.reports.get(&limits[0]).unwrap(),
951 &[CardinalityReport {
952 timestamp: limiter.timestamp(),
953 organization_id: Some(scoping.organization_id),
954 project_id: None,
955 metric_type: None,
956 metric_name: None,
957 cardinality: 1
958 }]
959 );
960 assert_eq!(
961 rejected.reports.get(&limits[1]).unwrap(),
962 &[CardinalityReport {
963 timestamp: limiter.timestamp(),
964 organization_id: Some(scoping.organization_id),
965 project_id: None,
966 metric_type: None,
967 metric_name: None,
968 cardinality: 1
969 }]
970 );
971 assert_eq!(
972 rejected.reports.get(&limits[2]).unwrap(),
973 &[CardinalityReport {
974 timestamp: limiter.timestamp(),
975 organization_id: Some(scoping.organization_id),
976 project_id: Some(scoping.project_id),
977 metric_type: None,
978 metric_name: None,
979 cardinality: 1
980 }]
981 );
982 assert!(!rejected.reports.contains_key(&limits[3]));
983 } else {
984 assert!(rejected.reports.is_empty());
986 }
987 }
988 }
989
990 #[tokio::test]
991 async fn test_project_limit() {
992 let limiter = build_limiter();
993
994 let scoping1 = new_scoping(&limiter).await;
995 let scoping2 = Scoping {
996 project_id: ProjectId::new(2),
997 ..scoping1
998 };
999
1000 let limits = &[CardinalityLimit {
1001 id: "limit".to_owned(),
1002 passive: false,
1003 report: false,
1004 window: SlidingWindow {
1005 window_seconds: 3600,
1006 granularity_seconds: 360,
1007 },
1008 limit: 1,
1009 scope: CardinalityScope::Project,
1010 namespace: None,
1011 }];
1012
1013 let m1 = MetricName::from("a");
1014 let m2 = MetricName::from("b");
1015 let entries1 = [Entry::new(EntryId(0), Custom, &m1, 0)];
1016 let entries2 = [Entry::new(EntryId(0), Custom, &m2, 1)];
1017
1018 let rejected = limiter.test_limits(scoping1, limits, entries1).await;
1020 assert_eq!(rejected.len(), 0);
1021 let rejected = limiter.test_limits(scoping2, limits, entries2).await;
1022 assert_eq!(rejected.len(), 0);
1023
1024 let rejected = limiter.test_limits(scoping1, limits, entries2).await;
1026 assert_eq!(rejected.len(), 1);
1027 let rejected = limiter.test_limits(scoping2, limits, entries1).await;
1028 assert_eq!(rejected.len(), 1);
1029 }
1030
1031 #[tokio::test]
1032 async fn test_limiter_sliding_window_full() {
1033 let mut limiter = build_limiter();
1034 let scoping = new_scoping(&limiter).await;
1035
1036 let window = SlidingWindow {
1037 window_seconds: 300,
1038 granularity_seconds: 100,
1039 };
1040 let limits = &[CardinalityLimit {
1041 id: "limit".to_owned(),
1042 passive: false,
1043 report: false,
1044 window,
1045 limit: 100,
1046 scope: CardinalityScope::Organization,
1047 namespace: Some(Custom),
1048 }];
1049
1050 let m = MetricName::from("foo");
1051 macro_rules! test {
1052 ($r:expr) => {{
1053 let entries = $r
1054 .map(|i| Entry::new(EntryId(i as usize), Custom, &m, i))
1055 .collect::<Vec<_>>();
1056
1057 limiter.test_limits(scoping, limits, entries).await
1058 }};
1059 }
1060
1061 macro_rules! assert_test {
1062 ($v:expr, $rejected:expr) => {{
1063 let report = test!($v);
1064 assert_eq!(report.len(), $rejected);
1065 }};
1066 ($v:expr, $rejected:expr, $cardinality:expr) => {{
1067 let report = test!($v);
1068 assert_eq!(report.len(), $rejected);
1069 report.assert_cardinality(&limits[0], $cardinality);
1070 }};
1071 }
1072
1073 assert_test!(0..100, 0, 100);
1075
1076 limiter.time_offset = Duration::from_secs(window.granularity_seconds);
1078 assert_test!(0..50, 0, 100);
1079 assert_test!(100..125, 25, 100);
1080
1081 limiter.time_offset = Duration::from_secs(window.granularity_seconds * 2);
1083 assert_test!(0..50, 0, 100);
1084 assert_test!(125..150, 25, 100);
1085
1086 limiter.time_offset = Duration::from_secs(window.granularity_seconds * 3);
1089 assert_test!(0..50, 0, 50);
1090 assert_test!(150..175, 0, 75);
1091
1092 limiter.time_offset = Duration::from_secs(window.granularity_seconds * 4);
1094 assert_test!(0..50, 0, 75);
1095 assert_test!(175..200, 0, 100);
1096
1097 for i in 0..21 {
1102 let start = 200 + i * 25;
1103 let end = start + 25;
1104
1105 limiter.time_offset = Duration::from_secs(window.granularity_seconds * (i as u64 + 5));
1106
1107 if i % 3 == 0 {
1108 assert_test!(0..50, 0, 100);
1109 assert_test!(start..end, 25, 100);
1110 } else {
1111 assert_test!(0..50, 0, 75);
1112 assert_test!(start..end, 0, 100);
1113 }
1114 }
1115
1116 let mut sets = limiter.redis_sets(scoping).await;
1117 sets.sort();
1118
1119 let len = sets.len();
1120 for (i, (_, size)) in sets.into_iter().enumerate() {
1121 if i == 3 || i + 1 == len {
1123 assert_eq!(size, 75);
1124 } else {
1125 assert_eq!(size, 100);
1126 }
1127 }
1128 }
1129
1130 #[tokio::test]
1131 async fn test_limiter_sliding_window_perfect() {
1132 let mut limiter = build_limiter();
1133 let scoping = new_scoping(&limiter).await;
1134
1135 let window = SlidingWindow {
1136 window_seconds: 300,
1137 granularity_seconds: 100,
1138 };
1139 let limits = &[CardinalityLimit {
1140 id: "limit".to_owned(),
1141 passive: false,
1142 report: false,
1143 window,
1144 limit: 100,
1145 scope: CardinalityScope::Organization,
1146 namespace: Some(Custom),
1147 }];
1148
1149 let m = MetricName::from("foo");
1150 macro_rules! test {
1151 ($r:expr) => {{
1152 let entries = $r
1153 .map(|i| Entry::new(EntryId(i as usize), Custom, &m, i))
1154 .collect::<Vec<_>>();
1155
1156 limiter.test_limits(scoping, limits, entries).await
1157 }};
1158 }
1159
1160 let report = test!(0..100);
1173 assert!(report.is_empty());
1174 report.assert_cardinality(&limits[0], 100);
1175
1176 limiter.time_offset = Duration::from_secs(window.granularity_seconds);
1178 let report = test!(0..100);
1179 assert!(report.is_empty());
1180 report.assert_cardinality(&limits[0], 100);
1181
1182 limiter.time_offset = Duration::from_secs(window.window_seconds);
1184 assert_eq!(test!(200..300).len(), 100);
1186 assert!(test!(0..100).is_empty());
1188 }
1189}