1use std::fmt;
2use std::str::FromStr;
3use std::sync::{Arc, Mutex, PoisonError};
4use std::time::{Duration, Instant};
5
6use relay_base_schema::data_category::DataCategory;
7use relay_base_schema::metrics::MetricNamespace;
8use relay_base_schema::organization::OrganizationId;
9use relay_base_schema::project::{ProjectId, ProjectKey};
10use smallvec::SmallVec;
11
12use crate::REJECT_ALL_SECS;
13use crate::quota::{DataCategories, ItemScoping, Quota, QuotaScope, ReasonCode, Scoping};
14
/// A monotonic expiration marker for a rate limit.
///
/// Wraps the [`Instant`] at which a limit stops applying. Comparison and ordering are
/// derived from that instant, so a greater `RetryAfter` expires later.
#[derive(Clone, Copy, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub struct RetryAfter {
    /// The instant at which this marker expires.
    when: Instant,
}

impl RetryAfter {
    /// Creates a marker that expires `seconds` from now.
    ///
    /// If the deadline cannot be represented as an [`Instant`], the current instant is
    /// used instead, which makes the marker expired right away.
    #[inline]
    pub fn from_secs(seconds: u64) -> Self {
        let now = Instant::now();
        let when = match now.checked_add(Duration::from_secs(seconds)) {
            Some(deadline) => deadline,
            None => now,
        };
        Self { when }
    }

    /// Returns the time left until expiry as seen from `at`.
    ///
    /// Returns `None` once `at` has reached or passed the deadline.
    #[inline]
    pub fn remaining_at(self, at: Instant) -> Option<Duration> {
        (at < self.when).then(|| self.when - at)
    }

    /// Returns the time left until expiry as seen from the current instant.
    ///
    /// Returns `None` if the marker has already expired.
    #[inline]
    pub fn remaining(self) -> Option<Duration> {
        self.remaining_at(Instant::now())
    }

    /// Returns the remaining time at `at` in whole seconds, rounded up.
    ///
    /// Returns `0` if the marker is expired at `at`.
    #[inline]
    pub fn remaining_seconds_at(self, at: Instant) -> u64 {
        let Some(left) = self.remaining_at(at) else {
            return 0;
        };

        let whole_seconds = left.as_secs();
        if left.subsec_nanos() == 0 {
            whole_seconds
        } else {
            // A started second counts as a full one.
            whole_seconds + 1
        }
    }

    /// Returns the remaining time from now in whole seconds, rounded up.
    ///
    /// Returns `0` if the marker has already expired.
    #[inline]
    pub fn remaining_seconds(self) -> u64 {
        self.remaining_seconds_at(Instant::now())
    }

    /// Returns `true` if the marker is expired at the given instant.
    #[inline]
    pub fn expired_at(self, at: Instant) -> bool {
        self.remaining_at(at).is_none()
    }

    /// Returns `true` if the marker is expired at the current instant.
    #[inline]
    pub fn expired(self) -> bool {
        self.expired_at(Instant::now())
    }
}
92
93impl fmt::Debug for RetryAfter {
94 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
95 match self.remaining_seconds() {
96 0 => write!(f, "RetryAfter(expired)"),
97 remaining => write!(f, "RetryAfter({remaining}s)"),
98 }
99 }
100}
101
/// Test-only serialization: a one-field tuple struct holding the remaining seconds at
/// the time of serialization.
#[cfg(test)]
impl serde::Serialize for RetryAfter {
    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
        use serde::ser::SerializeTupleStruct;

        let remaining_seconds = self.remaining_seconds();
        let mut state = serializer.serialize_tuple_struct("RetryAfter", 1)?;
        state.serialize_field(&remaining_seconds)?;
        state.end()
    }
}
114
/// Error returned when a [`RetryAfter`] cannot be parsed from a string.
#[derive(Debug)]
pub enum InvalidRetryAfter {
    /// The supplied delay could not be parsed as a floating point number.
    InvalidDelay(std::num::ParseFloatError),
}

impl fmt::Display for InvalidRetryAfter {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            // The underlying parse error is exposed through `Error::source` rather
            // than repeated in this message.
            Self::InvalidDelay(_) => f.write_str("invalid delay in retry-after value"),
        }
    }
}

impl std::error::Error for InvalidRetryAfter {
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            Self::InvalidDelay(err) => Some(err),
        }
    }
}
121
122impl FromStr for RetryAfter {
123 type Err = InvalidRetryAfter;
124
125 fn from_str(s: &str) -> Result<Self, Self::Err> {
126 let float = s.parse::<f64>().map_err(InvalidRetryAfter::InvalidDelay)?;
127 let seconds = float.max(0.0).ceil() as u64;
128 Ok(RetryAfter::from_secs(seconds))
129 }
130}
131
/// The scope that a rate limit applies to.
///
/// Each non-global variant carries the identifier that an item's scoping is compared
/// against when the limit is matched.
#[derive(Clone, Debug, Eq, PartialEq)]
#[cfg_attr(test, derive(serde::Serialize))]
pub enum RateLimitScope {
    /// Not bound to an identifier; matches every scoping.
    Global,
    /// Bound to the organization with the given identifier.
    Organization(OrganizationId),
    /// Bound to the project with the given identifier.
    Project(ProjectId),
    /// Bound to the given project key.
    Key(ProjectKey),
}
151
152impl RateLimitScope {
153 pub fn for_quota(scoping: Scoping, scope: QuotaScope) -> Self {
158 match scope {
159 QuotaScope::Global => Self::Global,
160 QuotaScope::Organization => Self::Organization(scoping.organization_id),
161 QuotaScope::Project => Self::Project(scoping.project_id),
162 QuotaScope::Key => Self::Key(scoping.project_key),
163 QuotaScope::Unknown => Self::Key(scoping.project_key),
165 }
166 }
167
168 pub fn name(&self) -> &'static str {
172 match *self {
173 Self::Global => QuotaScope::Global.name(),
174 Self::Key(_) => QuotaScope::Key.name(),
175 Self::Project(_) => QuotaScope::Project.name(),
176 Self::Organization(_) => QuotaScope::Organization.name(),
177 }
178 }
179}
180
/// A rate limit that applies to items matching its scope, categories, and namespaces
/// until `retry_after` expires.
#[derive(Clone, Debug, PartialEq)]
#[cfg_attr(test, derive(serde::Serialize))]
pub struct RateLimit {
    /// Data categories this limit applies to.
    ///
    /// NOTE(review): the tests in this file match an empty set against any category;
    /// confirm against `ItemScoping::matches_categories`.
    pub categories: DataCategories,

    /// The scope, including its identifier, that this limit is bound to.
    pub scope: RateLimitScope,

    /// Optional reason code; copied from the quota when built via `from_quota`.
    pub reason_code: Option<ReasonCode>,

    /// Marker for the point in time at which this limit expires.
    pub retry_after: RetryAfter,

    /// Metric namespaces this limit applies to.
    ///
    /// NOTE(review): the tests in this file match an empty list against any namespace;
    /// confirm against `ItemScoping::matches_namespaces`.
    pub namespaces: SmallVec<[MetricNamespace; 1]>,
}
209
210impl RateLimit {
211 pub fn from_quota(quota: &Quota, scoping: Scoping, retry_after: RetryAfter) -> Self {
216 Self {
217 categories: quota.categories.clone(),
218 scope: RateLimitScope::for_quota(scoping, quota.scope),
219 reason_code: quota.reason_code.clone(),
220 retry_after,
221 namespaces: quota.namespace.into_iter().collect(),
222 }
223 }
224
225 pub fn matches(&self, scoping: ItemScoping) -> bool {
230 self.matches_scope(scoping)
231 && scoping.matches_categories(&self.categories)
232 && scoping.matches_namespaces(&self.namespaces)
233 }
234
235 fn matches_scope(&self, scoping: ItemScoping) -> bool {
237 match self.scope {
238 RateLimitScope::Global => true,
239 RateLimitScope::Organization(org_id) => scoping.organization_id == org_id,
240 RateLimitScope::Project(project_id) => scoping.project_id == project_id,
241 RateLimitScope::Key(key) => scoping.project_key == key,
242 }
243 }
244}
245
/// A collection of [`RateLimit`]s.
///
/// When limits are inserted through `add`, at most one limit is kept per combination
/// of categories, scope, and namespaces.
#[derive(Clone, Debug, Default)]
#[cfg_attr(test, derive(serde::Serialize))]
pub struct RateLimits {
    /// The stored limits, in insertion order.
    limits: Vec<RateLimit>,
}
258
259impl RateLimits {
260 pub fn new() -> Self {
262 Self::default()
263 }
264
265 pub fn add(&mut self, limit: RateLimit) {
270 let limit_opt = self.limits.iter_mut().find(|l| {
271 let RateLimit {
272 categories,
273 scope,
274 reason_code: _,
275 retry_after: _,
276 namespaces: namespace,
277 } = &limit;
278
279 *categories == l.categories && *scope == l.scope && *namespace == l.namespaces
280 });
281
282 match limit_opt {
283 None => self.limits.push(limit),
284 Some(existing) if existing.retry_after < limit.retry_after => *existing = limit,
285 Some(_) => (), }
287 }
288
289 pub fn merge(&mut self, limits: Self) {
295 for limit in limits {
296 self.add(limit);
297 }
298 }
299
300 pub fn merge_with(mut self, other: Self) -> Self {
304 self.merge(other);
305 self
306 }
307
308 pub fn is_ok(&self) -> bool {
312 !self.is_limited()
313 }
314
315 pub fn is_limited(&self) -> bool {
319 let now = Instant::now();
320 self.iter().any(|limit| !limit.retry_after.expired_at(now))
321 }
322
323 pub fn is_any_limited(&self, scopings: &[ItemScoping]) -> bool {
328 self.is_any_limited_with_quotas(&[], scopings)
329 }
330
331 pub fn is_any_limited_with_quotas<'a>(
339 &self,
340 quotas: impl IntoIterator<Item = &'a Quota>,
341 scopings: &[ItemScoping],
342 ) -> bool {
343 for quota in quotas {
344 for scoping in scopings {
345 if quota.limit == Some(0) && quota.matches(*scoping) {
346 return true;
347 }
348 }
349 }
350
351 let now = Instant::now();
352 for scoping in scopings {
353 for limit in &self.limits {
354 if limit.matches(*scoping) && !limit.retry_after.expired_at(now) {
355 return true;
356 }
357 }
358 }
359
360 false
361 }
362
363 pub fn clean_expired(&mut self, now: Instant) {
368 self.limits
369 .retain(|limit| !limit.retry_after.expired_at(now));
370 }
371
372 pub fn check(&self, scoping: ItemScoping) -> Self {
378 self.check_with_quotas(&[], scoping)
379 }
380
381 pub fn check_with_quotas<'a>(
389 &self,
390 quotas: impl IntoIterator<Item = &'a Quota>,
391 scoping: ItemScoping,
392 ) -> Self {
393 let mut applied_limits = Self::new();
394
395 for quota in quotas {
396 if quota.limit == Some(0) && quota.matches(scoping) {
397 let retry_after = RetryAfter::from_secs(REJECT_ALL_SECS);
398 applied_limits.add(RateLimit::from_quota(quota, *scoping, retry_after));
399 }
400 }
401
402 for limit in &self.limits {
403 if limit.matches(scoping) {
404 applied_limits.add(limit.clone());
405 }
406 }
407
408 applied_limits
409 }
410
411 pub fn iter(&self) -> RateLimitsIter<'_> {
413 RateLimitsIter {
414 iter: self.limits.iter(),
415 }
416 }
417
418 pub fn longest(&self) -> Option<&RateLimit> {
423 self.iter().max_by_key(|limit| limit.retry_after)
424 }
425
426 pub fn is_empty(&self) -> bool {
431 self.limits.is_empty()
432 }
433}
434
435pub struct RateLimitsIter<'a> {
440 iter: std::slice::Iter<'a, RateLimit>,
441}
442
443impl<'a> Iterator for RateLimitsIter<'a> {
444 type Item = &'a RateLimit;
445
446 fn next(&mut self) -> Option<Self::Item> {
447 self.iter.next()
448 }
449}
450
451impl IntoIterator for RateLimits {
452 type IntoIter = RateLimitsIntoIter;
453 type Item = RateLimit;
454
455 fn into_iter(self) -> Self::IntoIter {
456 RateLimitsIntoIter {
457 iter: self.limits.into_iter(),
458 }
459 }
460}
461
462pub struct RateLimitsIntoIter {
467 iter: std::vec::IntoIter<RateLimit>,
468}
469
470impl Iterator for RateLimitsIntoIter {
471 type Item = RateLimit;
472
473 fn next(&mut self) -> Option<Self::Item> {
474 self.iter.next()
475 }
476}
477
478impl<'a> IntoIterator for &'a RateLimits {
479 type IntoIter = RateLimitsIter<'a>;
480 type Item = &'a RateLimit;
481
482 fn into_iter(self) -> Self::IntoIter {
483 self.iter()
484 }
485}
486
/// A [`RateLimits`] collection behind a mutex that can be updated through a shared
/// reference.
///
/// The limits are kept in an [`Arc`], so `current_limits` can hand out the current
/// state without copying it on every call.
#[derive(Debug, Default)]
pub struct CachedRateLimits(Mutex<Arc<RateLimits>>);
497
498impl CachedRateLimits {
499 pub fn new() -> Self {
501 Self::default()
502 }
503
504 pub fn add(&self, limit: RateLimit) {
508 let mut inner = self.0.lock().unwrap_or_else(PoisonError::into_inner);
509 let current = Arc::make_mut(&mut inner);
510 current.add(limit);
511 }
512
513 pub fn merge(&self, limits: RateLimits) {
517 if limits.is_empty() {
518 return;
519 }
520
521 let mut inner = self.0.lock().unwrap_or_else(PoisonError::into_inner);
522 let current = Arc::make_mut(&mut inner);
523 for mut limit in limits {
524 for i in 0..limit.categories.len() {
535 let Some(category) = limit.categories.get(i) else {
536 debug_assert!(false, "logical error");
537 break;
538 };
539
540 for inherited in inherited_categories(category) {
541 if let Some(categories) = limit.categories.add(*inherited) {
542 limit.categories = categories;
543 }
544 }
545 }
546
547 current.add(limit);
548 }
549 }
550
551 pub fn current_limits(&self) -> Arc<RateLimits> {
556 let now = Instant::now();
557 let mut inner = self.0.lock().unwrap_or_else(PoisonError::into_inner);
558 Arc::make_mut(&mut inner).clean_expired(now);
559 Arc::clone(&inner)
560 }
561}
562
563fn inherited_categories(category: &DataCategory) -> &'static [DataCategory] {
573 match category {
574 DataCategory::Transaction => &[DataCategory::Span],
575 DataCategory::Span => &[DataCategory::Transaction],
576 _ => &[],
577 }
578}
579
580#[cfg(test)]
581mod tests {
582 use smallvec::smallvec;
583
584 use super::*;
585 use crate::MetricNamespaceScoping;
586 use crate::quota::DataCategory;
587
588 #[test]
589 fn test_parse_retry_after() {
590 let retry_after = "17.1".parse::<RetryAfter>().expect("parse RetryAfter");
592 assert_eq!(retry_after.remaining_seconds(), 18);
593 assert!(!retry_after.expired());
594 let retry_after = "17.7".parse::<RetryAfter>().expect("parse RetryAfter");
595 assert_eq!(retry_after.remaining_seconds(), 18);
596 assert!(!retry_after.expired());
597
598 let retry_after = "17".parse::<RetryAfter>().expect("parse RetryAfter");
600 assert_eq!(retry_after.remaining_seconds(), 17);
601 assert!(!retry_after.expired());
602
603 let retry_after = "-2".parse::<RetryAfter>().expect("parse RetryAfter");
605 assert_eq!(retry_after.remaining_seconds(), 0);
606 assert!(retry_after.expired());
607 let retry_after = "-inf".parse::<RetryAfter>().expect("parse RetryAfter");
608 assert_eq!(retry_after.remaining_seconds(), 0);
609 assert!(retry_after.expired());
610
611 let retry_after = "inf".parse::<RetryAfter>().expect("parse RetryAfter");
613 assert_eq!(retry_after.remaining_seconds(), 0);
614 assert!(retry_after.expired());
615 let retry_after = "NaN".parse::<RetryAfter>().expect("parse RetryAfter");
616 assert_eq!(retry_after.remaining_seconds(), 0);
617 assert!(retry_after.expired());
618
619 let retry_after = "100000000000000000000"
621 .parse::<RetryAfter>()
622 .expect("parse RetryAfter");
623 assert_eq!(retry_after.remaining_seconds(), 0);
624 assert!(retry_after.expired());
625
626 "".parse::<RetryAfter>().expect_err("error RetryAfter");
628 "nope".parse::<RetryAfter>().expect_err("error RetryAfter");
629 " 2 ".parse::<RetryAfter>().expect_err("error RetryAfter");
630 "6 0".parse::<RetryAfter>().expect_err("error RetryAfter");
631 }
632
633 #[test]
634 fn test_rate_limit_matches_categories() {
635 let rate_limit = RateLimit {
636 categories: [DataCategory::Unknown, DataCategory::Error].into(),
637 scope: RateLimitScope::Organization(OrganizationId::new(42)),
638 reason_code: None,
639 retry_after: RetryAfter::from_secs(1),
640 namespaces: smallvec![],
641 };
642
643 assert!(rate_limit.matches(ItemScoping {
644 category: DataCategory::Error,
645 scoping: Scoping {
646 organization_id: OrganizationId::new(42),
647 project_id: ProjectId::new(21),
648 project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
649 key_id: None,
650 },
651 namespace: MetricNamespaceScoping::None,
652 }));
653
654 assert!(!rate_limit.matches(ItemScoping {
655 category: DataCategory::Transaction,
656 scoping: Scoping {
657 organization_id: OrganizationId::new(42),
658 project_id: ProjectId::new(21),
659 project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
660 key_id: None,
661 },
662 namespace: MetricNamespaceScoping::None,
663 }));
664 }
665
666 #[test]
667 fn test_rate_limit_matches_organization() {
668 let rate_limit = RateLimit {
669 categories: DataCategories::new(),
670 scope: RateLimitScope::Organization(OrganizationId::new(42)),
671 reason_code: None,
672 retry_after: RetryAfter::from_secs(1),
673 namespaces: smallvec![],
674 };
675
676 assert!(rate_limit.matches(ItemScoping {
677 category: DataCategory::Error,
678 scoping: Scoping {
679 organization_id: OrganizationId::new(42),
680 project_id: ProjectId::new(21),
681 project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
682 key_id: None,
683 },
684 namespace: MetricNamespaceScoping::None,
685 }));
686
687 assert!(!rate_limit.matches(ItemScoping {
688 category: DataCategory::Error,
689 scoping: Scoping {
690 organization_id: OrganizationId::new(0),
691 project_id: ProjectId::new(21),
692 project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
693 key_id: None,
694 },
695 namespace: MetricNamespaceScoping::None,
696 }));
697 }
698
699 #[test]
700 fn test_rate_limit_matches_project() {
701 let rate_limit = RateLimit {
702 categories: DataCategories::new(),
703 scope: RateLimitScope::Project(ProjectId::new(21)),
704 reason_code: None,
705 retry_after: RetryAfter::from_secs(1),
706 namespaces: smallvec![],
707 };
708
709 assert!(rate_limit.matches(ItemScoping {
710 category: DataCategory::Error,
711 scoping: Scoping {
712 organization_id: OrganizationId::new(42),
713 project_id: ProjectId::new(21),
714 project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
715 key_id: None,
716 },
717 namespace: MetricNamespaceScoping::None,
718 }));
719
720 assert!(!rate_limit.matches(ItemScoping {
721 category: DataCategory::Error,
722 scoping: Scoping {
723 organization_id: OrganizationId::new(42),
724 project_id: ProjectId::new(0),
725 project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
726 key_id: None,
727 },
728 namespace: MetricNamespaceScoping::None,
729 }));
730 }
731
732 #[test]
733 fn test_rate_limit_matches_namespaces() {
734 let rate_limit = RateLimit {
735 categories: Default::default(),
736 scope: RateLimitScope::Organization(OrganizationId::new(42)),
737 reason_code: None,
738 retry_after: RetryAfter::from_secs(1),
739 namespaces: smallvec![MetricNamespace::Custom],
740 };
741
742 let scoping = Scoping {
743 organization_id: OrganizationId::new(42),
744 project_id: ProjectId::new(21),
745 project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
746 key_id: None,
747 };
748
749 assert!(rate_limit.matches(ItemScoping {
750 category: DataCategory::MetricBucket,
751 scoping,
752 namespace: MetricNamespaceScoping::Some(MetricNamespace::Custom),
753 }));
754
755 assert!(!rate_limit.matches(ItemScoping {
756 category: DataCategory::MetricBucket,
757 scoping,
758 namespace: MetricNamespaceScoping::Some(MetricNamespace::Spans),
759 }));
760
761 let general_rate_limit = RateLimit {
762 categories: Default::default(),
763 scope: RateLimitScope::Organization(OrganizationId::new(42)),
764 reason_code: None,
765 retry_after: RetryAfter::from_secs(1),
766 namespaces: smallvec![], };
768
769 assert!(general_rate_limit.matches(ItemScoping {
770 category: DataCategory::MetricBucket,
771 scoping,
772 namespace: MetricNamespaceScoping::Some(MetricNamespace::Spans),
773 }));
774
775 assert!(general_rate_limit.matches(ItemScoping {
776 category: DataCategory::MetricBucket,
777 scoping,
778 namespace: MetricNamespaceScoping::None,
779 }));
780 }
781
782 #[test]
783 fn test_rate_limit_matches_key() {
784 let rate_limit = RateLimit {
785 categories: DataCategories::new(),
786 scope: RateLimitScope::Key(
787 ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
788 ),
789 reason_code: None,
790 retry_after: RetryAfter::from_secs(1),
791 namespaces: smallvec![],
792 };
793
794 assert!(rate_limit.matches(ItemScoping {
795 category: DataCategory::Error,
796 scoping: Scoping {
797 organization_id: OrganizationId::new(42),
798 project_id: ProjectId::new(21),
799 project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
800 key_id: None,
801 },
802 namespace: MetricNamespaceScoping::None,
803 }));
804
805 assert!(!rate_limit.matches(ItemScoping {
806 category: DataCategory::Error,
807 scoping: Scoping {
808 organization_id: OrganizationId::new(0),
809 project_id: ProjectId::new(21),
810 project_key: ProjectKey::parse("deadbeefdeadbeefdeadbeefdeadbeef").unwrap(),
811 key_id: None,
812 },
813 namespace: MetricNamespaceScoping::None,
814 }));
815 }
816
    /// A later-expiring limit for the same categories and scope replaces the stored one.
    #[test]
    fn test_rate_limits_add_replacement() {
        let mut rate_limits = RateLimits::new();

        rate_limits.add(RateLimit {
            categories: [DataCategory::Default, DataCategory::Error].into(),
            scope: RateLimitScope::Organization(OrganizationId::new(42)),
            reason_code: Some(ReasonCode::new("first")),
            retry_after: RetryAfter::from_secs(1),
            namespaces: smallvec![],
        });

        // Same categories in a different order, same scope, longer retry-after.
        rate_limits.add(RateLimit {
            categories: [DataCategory::Error, DataCategory::Default].into(),
            scope: RateLimitScope::Organization(OrganizationId::new(42)),
            reason_code: Some(ReasonCode::new("second")),
            retry_after: RetryAfter::from_secs(10),
            namespaces: smallvec![],
        });

        // Only the second limit remains.
        insta::assert_ron_snapshot!(rate_limits, @r#"
        RateLimits(
          limits: [
            RateLimit(
              categories: [
                "default",
                "error",
              ],
              scope: Organization(OrganizationId(42)),
              reason_code: Some(ReasonCode("second")),
              retry_after: RetryAfter(10),
              namespaces: [],
            ),
          ],
        )
        "#);
    }
855
    /// An earlier-expiring limit for the same categories and scope is dropped.
    #[test]
    fn test_rate_limits_add_shadowing() {
        let mut rate_limits = RateLimits::new();

        rate_limits.add(RateLimit {
            categories: [DataCategory::Default, DataCategory::Error].into(),
            scope: RateLimitScope::Organization(OrganizationId::new(42)),
            reason_code: Some(ReasonCode::new("first")),
            retry_after: RetryAfter::from_secs(10),
            namespaces: smallvec![],
        });

        // Same categories in a different order, same scope, shorter retry-after.
        rate_limits.add(RateLimit {
            categories: [DataCategory::Error, DataCategory::Default].into(),
            scope: RateLimitScope::Organization(OrganizationId::new(42)),
            reason_code: Some(ReasonCode::new("second")),
            retry_after: RetryAfter::from_secs(1),
            namespaces: smallvec![],
        });

        // Only the first limit remains.
        insta::assert_ron_snapshot!(rate_limits, @r#"
        RateLimits(
          limits: [
            RateLimit(
              categories: [
                "default",
                "error",
              ],
              scope: Organization(OrganizationId(42)),
              reason_code: Some(ReasonCode("first")),
              retry_after: RetryAfter(10),
              namespaces: [],
            ),
          ],
        )
        "#);
    }
894
    /// Limits that differ in categories or scope are stored side by side.
    #[test]
    fn test_rate_limits_add_buckets() {
        let mut rate_limits = RateLimits::new();

        rate_limits.add(RateLimit {
            categories: [DataCategory::Error].into(),
            scope: RateLimitScope::Organization(OrganizationId::new(42)),
            reason_code: None,
            retry_after: RetryAfter::from_secs(1),
            namespaces: smallvec![],
        });

        // Same scope, different category.
        rate_limits.add(RateLimit {
            categories: [DataCategory::Transaction].into(),
            scope: RateLimitScope::Organization(OrganizationId::new(42)),
            reason_code: None,
            retry_after: RetryAfter::from_secs(1),
            namespaces: smallvec![],
        });

        // Same category, different scope.
        rate_limits.add(RateLimit {
            categories: [DataCategory::Error].into(),
            scope: RateLimitScope::Project(ProjectId::new(21)),
            reason_code: None,
            retry_after: RetryAfter::from_secs(1),
            namespaces: smallvec![],
        });

        insta::assert_ron_snapshot!(rate_limits, @r#"
        RateLimits(
          limits: [
            RateLimit(
              categories: [
                "error",
              ],
              scope: Organization(OrganizationId(42)),
              reason_code: None,
              retry_after: RetryAfter(1),
              namespaces: [],
            ),
            RateLimit(
              categories: [
                "transaction",
              ],
              scope: Organization(OrganizationId(42)),
              reason_code: None,
              retry_after: RetryAfter(1),
              namespaces: [],
            ),
            RateLimit(
              categories: [
                "error",
              ],
              scope: Project(ProjectId(21)),
              reason_code: None,
              retry_after: RetryAfter(1),
              namespaces: [],
            ),
          ],
        )
        "#);
    }
959
    /// Limits that differ only in their namespaces are stored side by side.
    #[test]
    fn test_rate_limits_add_namespaces() {
        let mut rate_limits = RateLimits::new();

        rate_limits.add(RateLimit {
            categories: [DataCategory::MetricBucket].into(),
            scope: RateLimitScope::Organization(OrganizationId::new(42)),
            reason_code: None,
            retry_after: RetryAfter::from_secs(1),
            namespaces: smallvec![MetricNamespace::Custom],
        });

        // Same category and scope, different namespace.
        rate_limits.add(RateLimit {
            categories: [DataCategory::MetricBucket].into(),
            scope: RateLimitScope::Organization(OrganizationId::new(42)),
            reason_code: None,
            retry_after: RetryAfter::from_secs(1),
            namespaces: smallvec![MetricNamespace::Spans],
        });

        insta::assert_ron_snapshot!(rate_limits, @r#"
        RateLimits(
          limits: [
            RateLimit(
              categories: [
                "metric_bucket",
              ],
              scope: Organization(OrganizationId(42)),
              reason_code: None,
              retry_after: RetryAfter(1),
              namespaces: [
                "custom",
              ],
            ),
            RateLimit(
              categories: [
                "metric_bucket",
              ],
              scope: Organization(OrganizationId(42)),
              reason_code: None,
              retry_after: RetryAfter(1),
              namespaces: [
                "spans",
              ],
            ),
          ],
        )
        "#);
    }
1011
    /// `longest` returns the limit with the latest retry-after.
    #[test]
    fn test_rate_limits_longest() {
        let mut rate_limits = RateLimits::new();

        rate_limits.add(RateLimit {
            categories: [DataCategory::Error].into(),
            scope: RateLimitScope::Organization(OrganizationId::new(42)),
            reason_code: Some(ReasonCode::new("first")),
            retry_after: RetryAfter::from_secs(1),
            namespaces: smallvec![],
        });

        // Different category with a longer retry-after; stored as a separate limit.
        rate_limits.add(RateLimit {
            categories: [DataCategory::Transaction].into(),
            scope: RateLimitScope::Organization(OrganizationId::new(42)),
            reason_code: Some(ReasonCode::new("second")),
            retry_after: RetryAfter::from_secs(10),
            namespaces: smallvec![],
        });

        let rate_limit = rate_limits.longest().unwrap();
        insta::assert_ron_snapshot!(rate_limit, @r#"
        RateLimit(
          categories: [
            "transaction",
          ],
          scope: Organization(OrganizationId(42)),
          reason_code: Some(ReasonCode("second")),
          retry_after: RetryAfter(10),
          namespaces: [],
        )
        "#);
    }
1046
    /// `clean_expired` removes expired limits and keeps active ones.
    #[test]
    fn test_rate_limits_clean_expired() {
        let mut rate_limits = RateLimits::new();

        // Still active when `clean_expired` runs.
        rate_limits.add(RateLimit {
            categories: [DataCategory::Error].into(),
            scope: RateLimitScope::Organization(OrganizationId::new(42)),
            reason_code: None,
            retry_after: RetryAfter::from_secs(1),
            namespaces: smallvec![],
        });

        // A zero-second retry-after is expired immediately.
        rate_limits.add(RateLimit {
            categories: [DataCategory::Error].into(),
            scope: RateLimitScope::Project(ProjectId::new(21)),
            reason_code: None,
            retry_after: RetryAfter::from_secs(0),
            namespaces: smallvec![],
        });

        // Both limits are stored before cleaning.
        assert_eq!(rate_limits.iter().count(), 2);

        rate_limits.clean_expired(Instant::now());

        // Only the active limit is left.
        insta::assert_ron_snapshot!(rate_limits, @r#"
        RateLimits(
          limits: [
            RateLimit(
              categories: [
                "error",
              ],
              scope: Organization(OrganizationId(42)),
              reason_code: None,
              retry_after: RetryAfter(1),
              namespaces: [],
            ),
          ],
        )
        "#);
    }
1091
1092 #[test]
1093 fn test_rate_limits_is_any_limited() {
1094 let mut rate_limits = RateLimits::new();
1095
1096 rate_limits.add(RateLimit {
1098 categories: [DataCategory::Error].into(),
1099 scope: RateLimitScope::Organization(OrganizationId::new(42)),
1100 reason_code: None,
1101 retry_after: RetryAfter::from_secs(1),
1102 namespaces: smallvec![],
1103 });
1104
1105 rate_limits.add(RateLimit {
1107 categories: [DataCategory::Transaction].into(),
1108 scope: RateLimitScope::Organization(OrganizationId::new(42)),
1109 reason_code: None,
1110 retry_after: RetryAfter::from_secs(1),
1111 namespaces: smallvec![],
1112 });
1113
1114 let scoping = Scoping {
1115 organization_id: OrganizationId::new(42),
1116 project_id: ProjectId::new(21),
1117 project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
1118 key_id: None,
1119 };
1120
1121 let quotas = &[Quota {
1122 id: None,
1123 categories: [DataCategory::Attachment].into(),
1124 scope: QuotaScope::Organization,
1125 scope_id: Some("42".into()),
1126 limit: Some(0),
1127 window: None,
1128 reason_code: Some(ReasonCode::new("zero")),
1129 namespace: None,
1130 }];
1131
1132 assert!(
1133 rate_limits.is_any_limited_with_quotas(quotas, &[scoping.item(DataCategory::Error)])
1134 );
1135 assert!(
1136 rate_limits
1137 .is_any_limited_with_quotas(quotas, &[scoping.item(DataCategory::Attachment)])
1138 );
1139 assert!(rate_limits.is_any_limited_with_quotas(
1140 quotas,
1141 &[
1142 scoping.item(DataCategory::Replay),
1143 scoping.item(DataCategory::Error)
1144 ]
1145 ));
1146
1147 assert!(
1148 !rate_limits.is_any_limited_with_quotas(quotas, &[scoping.item(DataCategory::Replay)])
1149 );
1150 }
1151
    /// `check` returns only the stored limits that match the item scoping.
    #[test]
    fn test_rate_limits_check() {
        let mut rate_limits = RateLimits::new();

        // Matches the error item checked below.
        rate_limits.add(RateLimit {
            categories: [DataCategory::Error].into(),
            scope: RateLimitScope::Organization(OrganizationId::new(42)),
            reason_code: None,
            retry_after: RetryAfter::from_secs(1),
            namespaces: smallvec![],
        });

        // Different category; must not be returned.
        rate_limits.add(RateLimit {
            categories: [DataCategory::Transaction].into(),
            scope: RateLimitScope::Organization(OrganizationId::new(42)),
            reason_code: None,
            retry_after: RetryAfter::from_secs(1),
            namespaces: smallvec![],
        });

        let applied_limits = rate_limits.check(ItemScoping {
            category: DataCategory::Error,
            scoping: Scoping {
                organization_id: OrganizationId::new(42),
                project_id: ProjectId::new(21),
                project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
                key_id: None,
            },
            namespace: MetricNamespaceScoping::None,
        });

        // Only the error limit applies.
        insta::assert_ron_snapshot!(applied_limits, @r#"
        RateLimits(
          limits: [
            RateLimit(
              categories: [
                "error",
              ],
              scope: Organization(OrganizationId(42)),
              reason_code: None,
              retry_after: RetryAfter(1),
              namespaces: [],
            ),
          ],
        )
        "#);
    }
1202
    /// A matching zero quota yields a limit that wins over a shorter stored limit with
    /// the same categories and scope.
    #[test]
    fn test_rate_limits_check_quotas() {
        let mut rate_limits = RateLimits::new();

        // Same categories and scope as the limit derived from the quota below.
        rate_limits.add(RateLimit {
            categories: [DataCategory::Error].into(),
            scope: RateLimitScope::Organization(OrganizationId::new(42)),
            reason_code: None,
            retry_after: RetryAfter::from_secs(1),
            namespaces: smallvec![],
        });

        // Different category; must not be returned.
        rate_limits.add(RateLimit {
            categories: [DataCategory::Transaction].into(),
            scope: RateLimitScope::Organization(OrganizationId::new(42)),
            reason_code: None,
            retry_after: RetryAfter::from_secs(1),
            namespaces: smallvec![],
        });

        let item_scoping = ItemScoping {
            category: DataCategory::Error,
            scoping: Scoping {
                organization_id: OrganizationId::new(42),
                project_id: ProjectId::new(21),
                project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
                key_id: None,
            },
            namespace: MetricNamespaceScoping::None,
        };

        // A quota with a limit of zero for errors in the organization.
        let quotas = &[Quota {
            id: None,
            categories: [DataCategory::Error].into(),
            scope: QuotaScope::Organization,
            scope_id: Some("42".into()),
            limit: Some(0),
            window: None,
            reason_code: Some(ReasonCode::new("zero")),
            namespace: None,
        }];

        let applied_limits = rate_limits.check_with_quotas(quotas, item_scoping);

        // The quota-derived limit is added first and has the later retry-after, so the
        // stored one-second limit does not replace it.
        insta::assert_ron_snapshot!(applied_limits, @r#"
        RateLimits(
          limits: [
            RateLimit(
              categories: [
                "error",
              ],
              scope: Organization(OrganizationId(42)),
              reason_code: Some(ReasonCode("zero")),
              retry_after: RetryAfter(60),
              namespaces: [],
            ),
          ],
        )
        "#);
    }
1265
    /// `merge` applies the rules of `add` to every limit of the other collection.
    #[test]
    fn test_rate_limits_merge() {
        let mut rate_limits1 = RateLimits::new();
        let mut rate_limits2 = RateLimits::new();

        rate_limits1.add(RateLimit {
            categories: [DataCategory::Error].into(),
            scope: RateLimitScope::Organization(OrganizationId::new(42)),
            reason_code: Some(ReasonCode::new("first")),
            retry_after: RetryAfter::from_secs(1),
            namespaces: smallvec![],
        });

        rate_limits1.add(RateLimit {
            categories: [DataCategory::TransactionIndexed].into(),
            scope: RateLimitScope::Organization(OrganizationId::new(42)),
            reason_code: None,
            retry_after: RetryAfter::from_secs(1),
            namespaces: smallvec![],
        });

        // Same categories and scope as the first limit above, but longer.
        rate_limits2.add(RateLimit {
            categories: [DataCategory::Error].into(),
            scope: RateLimitScope::Organization(OrganizationId::new(42)),
            reason_code: Some(ReasonCode::new("second")),
            retry_after: RetryAfter::from_secs(10),
            namespaces: smallvec![],
        });

        rate_limits1.merge(rate_limits2);

        // The error limit was replaced in place; the other limit is untouched.
        insta::assert_ron_snapshot!(rate_limits1, @r#"
        RateLimits(
          limits: [
            RateLimit(
              categories: [
                "error",
              ],
              scope: Organization(OrganizationId(42)),
              reason_code: Some(ReasonCode("second")),
              retry_after: RetryAfter(10),
              namespaces: [],
            ),
            RateLimit(
              categories: [
                "transaction_indexed",
              ],
              scope: Organization(OrganizationId(42)),
              reason_code: None,
              retry_after: RetryAfter(1),
              namespaces: [],
            ),
          ],
        )
        "#);
    }
1322
    /// `current_limits` does not return limits that have already expired.
    #[test]
    fn test_cached_rate_limits_expired() {
        let cached = CachedRateLimits::new();

        // Still active when the limits are read.
        cached.add(RateLimit {
            categories: [DataCategory::Error].into(),
            scope: RateLimitScope::Organization(OrganizationId::new(42)),
            reason_code: None,
            retry_after: RetryAfter::from_secs(1),
            namespaces: smallvec![],
        });

        // A zero-second retry-after is expired immediately.
        cached.add(RateLimit {
            categories: [DataCategory::Error].into(),
            scope: RateLimitScope::Project(ProjectId::new(21)),
            reason_code: None,
            retry_after: RetryAfter::from_secs(0),
            namespaces: smallvec![],
        });

        let rate_limits = cached.current_limits();

        insta::assert_ron_snapshot!(rate_limits, @r#"
        RateLimits(
          limits: [
            RateLimit(
              categories: [
                "error",
              ],
              scope: Organization(OrganizationId(42)),
              reason_code: None,
              retry_after: RetryAfter(1),
              namespaces: [],
            ),
          ],
        )
        "#);
    }
1363}