1use std::fmt;
2use std::str::FromStr;
3use std::sync::{Arc, Mutex, PoisonError};
4use std::time::{Duration, Instant};
5
6use relay_base_schema::data_category::DataCategory;
7use relay_base_schema::metrics::MetricNamespace;
8use relay_base_schema::organization::OrganizationId;
9use relay_base_schema::project::{ProjectId, ProjectKey};
10use smallvec::SmallVec;
11
12use crate::REJECT_ALL_SECS;
13use crate::quota::{DataCategories, ItemScoping, Quota, QuotaScope, ReasonCode, Scoping};
14
/// A monotonic deadline after which a rate limit no longer applies.
///
/// The deadline is stored as an [`Instant`], so comparing two values orders them by the point in
/// time at which they expire.
#[derive(Clone, Copy, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub struct RetryAfter {
    when: Instant,
}

impl RetryAfter {
    /// Creates a deadline that lies `seconds` in the future, measured from now.
    ///
    /// If the deadline cannot be represented (adding the delay overflows the clock), the current
    /// instant is used instead, which yields an already expired value.
    #[inline]
    pub fn from_secs(seconds: u64) -> Self {
        let now = Instant::now();
        let delay = Duration::from_secs(seconds);

        Self {
            when: now.checked_add(delay).unwrap_or(now),
        }
    }

    /// Returns the time left until the deadline as seen from `at`.
    ///
    /// Returns `None` once `at` has reached or passed the deadline.
    #[inline]
    pub fn remaining_at(self, at: Instant) -> Option<Duration> {
        // A zero distance means the deadline is exactly `at`, which counts as expired.
        self.when
            .checked_duration_since(at)
            .filter(|left| !left.is_zero())
    }

    /// Returns the time left until the deadline, measured from now.
    ///
    /// Returns `None` if the deadline has passed.
    #[inline]
    pub fn remaining(self) -> Option<Duration> {
        self.remaining_at(Instant::now())
    }

    /// Returns the number of seconds left until the deadline as seen from `at`.
    ///
    /// Partial seconds are rounded up. Returns `0` once the deadline has passed.
    #[inline]
    pub fn remaining_seconds_at(self, at: Instant) -> u64 {
        self.remaining_at(at).map_or(0, |left| {
            let whole = left.as_secs();
            if left.subsec_nanos() > 0 {
                whole + 1
            } else {
                whole
            }
        })
    }

    /// Returns the number of seconds left until the deadline, measured from now.
    ///
    /// Partial seconds are rounded up. Returns `0` once the deadline has passed.
    #[inline]
    pub fn remaining_seconds(self) -> u64 {
        self.remaining_seconds_at(Instant::now())
    }

    /// Returns whether the deadline has been reached as seen from `at`.
    #[inline]
    pub fn expired_at(self, at: Instant) -> bool {
        self.remaining_at(at).is_none()
    }

    /// Returns whether the deadline has been reached by now.
    #[inline]
    pub fn expired(self) -> bool {
        self.expired_at(Instant::now())
    }
}
92
93impl fmt::Debug for RetryAfter {
94 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
95 match self.remaining_seconds() {
96 0 => write!(f, "RetryAfter(expired)"),
97 remaining => write!(f, "RetryAfter({remaining}s)"),
98 }
99 }
100}
101
/// Test-only serialization used by snapshot assertions.
///
/// Emits a one-element tuple struct named `RetryAfter` holding the remaining seconds (rounded
/// up) at the time of serialization.
#[cfg(test)]
impl serde::Serialize for RetryAfter {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        use serde::ser::SerializeTupleStruct as _;

        let mut state = serializer.serialize_tuple_struct("RetryAfter", 1)?;
        let remaining = self.remaining_seconds();
        state.serialize_field(&remaining)?;
        state.end()
    }
}
114
/// The error returned when parsing a [`RetryAfter`] from a string fails.
#[derive(Debug)]
pub enum InvalidRetryAfter {
    /// The supplied delay could not be parsed as a floating point number of seconds.
    InvalidDelay(std::num::ParseFloatError),
}

impl fmt::Display for InvalidRetryAfter {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::InvalidDelay(err) => write!(f, "invalid retry-after delay: {err}"),
        }
    }
}

impl std::error::Error for InvalidRetryAfter {
    /// Exposes the underlying float parse error so callers can walk the error chain.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            Self::InvalidDelay(err) => Some(err),
        }
    }
}
121
122impl FromStr for RetryAfter {
123 type Err = InvalidRetryAfter;
124
125 fn from_str(s: &str) -> Result<Self, Self::Err> {
126 let float = s.parse::<f64>().map_err(InvalidRetryAfter::InvalidDelay)?;
127 let seconds = float.max(0.0).ceil() as u64;
128 Ok(RetryAfter::from_secs(seconds))
129 }
130}
131
132#[derive(Clone, Debug, Eq, PartialEq)]
140#[cfg_attr(test, derive(serde::Serialize))]
141pub enum RateLimitScope {
142 Organization(OrganizationId),
144 Project(ProjectId),
146 Key(ProjectKey),
148}
149
150impl RateLimitScope {
151 pub fn for_quota(scoping: Scoping, scope: QuotaScope) -> Self {
156 match scope {
157 QuotaScope::Organization => Self::Organization(scoping.organization_id),
158 QuotaScope::Project => Self::Project(scoping.project_id),
159 QuotaScope::Key => Self::Key(scoping.project_key),
160 QuotaScope::Unknown => Self::Key(scoping.project_key),
162 }
163 }
164
165 pub fn name(&self) -> &'static str {
169 match *self {
170 Self::Key(_) => QuotaScope::Key.name(),
171 Self::Project(_) => QuotaScope::Project.name(),
172 Self::Organization(_) => QuotaScope::Organization.name(),
173 }
174 }
175}
176
177#[derive(Clone, Debug, PartialEq)]
184#[cfg_attr(test, derive(serde::Serialize))]
185pub struct RateLimit {
186 pub categories: DataCategories,
189
190 pub scope: RateLimitScope,
192
193 pub reason_code: Option<ReasonCode>,
195
196 pub retry_after: RetryAfter,
198
199 pub namespaces: SmallVec<[MetricNamespace; 1]>,
204}
205
206impl RateLimit {
207 pub fn from_quota(quota: &Quota, scoping: Scoping, retry_after: RetryAfter) -> Self {
212 Self {
213 categories: quota.categories,
214 scope: RateLimitScope::for_quota(scoping, quota.scope),
215 reason_code: quota.reason_code.clone(),
216 retry_after,
217 namespaces: quota.namespace.into_iter().collect(),
218 }
219 }
220
221 pub fn matches(&self, scoping: ItemScoping) -> bool {
226 self.matches_scope(scoping)
227 && scoping.matches_categories(self.categories)
228 && scoping.matches_namespaces(&self.namespaces)
229 }
230
231 fn matches_scope(&self, scoping: ItemScoping) -> bool {
233 match self.scope {
234 RateLimitScope::Organization(org_id) => scoping.organization_id == org_id,
235 RateLimitScope::Project(project_id) => scoping.project_id == project_id,
236 RateLimitScope::Key(key) => scoping.project_key == key,
237 }
238 }
239}
240
241#[derive(Clone, Debug, Default)]
249#[cfg_attr(test, derive(serde::Serialize))]
250pub struct RateLimits {
251 limits: Vec<RateLimit>,
252}
253
254impl RateLimits {
255 pub fn new() -> Self {
257 Self::default()
258 }
259
260 pub fn add(&mut self, limit: RateLimit) {
265 let limit_opt = self.limits.iter_mut().find(|l| {
266 let RateLimit {
267 categories,
268 scope,
269 reason_code: _,
270 retry_after: _,
271 namespaces: namespace,
272 } = &limit;
273
274 *categories == l.categories && *scope == l.scope && *namespace == l.namespaces
275 });
276
277 match limit_opt {
278 None => self.limits.push(limit),
279 Some(existing) if existing.retry_after < limit.retry_after => *existing = limit,
280 Some(_) => (), }
282 }
283
284 pub fn merge(&mut self, limits: Self) {
290 for limit in limits {
291 self.add(limit);
292 }
293 }
294
295 pub fn merge_with(mut self, other: Self) -> Self {
299 self.merge(other);
300 self
301 }
302
303 pub fn is_ok(&self) -> bool {
307 !self.is_limited()
308 }
309
310 pub fn is_limited(&self) -> bool {
314 let now = Instant::now();
315 self.iter().any(|limit| !limit.retry_after.expired_at(now))
316 }
317
318 pub fn is_any_limited(&self, scopings: &[ItemScoping]) -> bool {
323 self.is_any_limited_with_quotas(&[], scopings)
324 }
325
326 pub fn is_any_limited_with_quotas<'a>(
334 &self,
335 quotas: impl IntoIterator<Item = &'a Quota>,
336 scopings: &[ItemScoping],
337 ) -> bool {
338 for quota in quotas {
339 for scoping in scopings {
340 if quota.limit == Some(0) && quota.matches(*scoping) {
341 return true;
342 }
343 }
344 }
345
346 let now = Instant::now();
347 for scoping in scopings {
348 for limit in &self.limits {
349 if limit.matches(*scoping) && !limit.retry_after.expired_at(now) {
350 return true;
351 }
352 }
353 }
354
355 false
356 }
357
358 pub fn clean_expired(&mut self, now: Instant) {
363 self.limits
364 .retain(|limit| !limit.retry_after.expired_at(now));
365 }
366
367 pub fn check(&self, scoping: ItemScoping) -> Self {
373 self.check_with_quotas(&[], scoping)
374 }
375
376 pub fn check_with_quotas<'a>(
384 &self,
385 quotas: impl IntoIterator<Item = &'a Quota>,
386 scoping: ItemScoping,
387 ) -> Self {
388 let mut applied_limits = Self::new();
389
390 for quota in quotas {
391 if quota.limit == Some(0) && quota.matches(scoping) {
392 let retry_after = RetryAfter::from_secs(REJECT_ALL_SECS);
393 applied_limits.add(RateLimit::from_quota(quota, *scoping, retry_after));
394 }
395 }
396
397 for limit in &self.limits {
398 if limit.matches(scoping) {
399 applied_limits.add(limit.clone());
400 }
401 }
402
403 applied_limits
404 }
405
406 pub fn iter(&self) -> RateLimitsIter<'_> {
408 RateLimitsIter {
409 iter: self.limits.iter(),
410 }
411 }
412
413 pub fn longest(&self) -> Option<&RateLimit> {
418 self.iter().max_by_key(|limit| limit.retry_after)
419 }
420
421 pub fn is_empty(&self) -> bool {
426 self.limits.is_empty()
427 }
428}
429
430pub struct RateLimitsIter<'a> {
435 iter: std::slice::Iter<'a, RateLimit>,
436}
437
438impl<'a> Iterator for RateLimitsIter<'a> {
439 type Item = &'a RateLimit;
440
441 fn next(&mut self) -> Option<Self::Item> {
442 self.iter.next()
443 }
444}
445
446impl IntoIterator for RateLimits {
447 type IntoIter = RateLimitsIntoIter;
448 type Item = RateLimit;
449
450 fn into_iter(self) -> Self::IntoIter {
451 RateLimitsIntoIter {
452 iter: self.limits.into_iter(),
453 }
454 }
455}
456
457pub struct RateLimitsIntoIter {
462 iter: std::vec::IntoIter<RateLimit>,
463}
464
465impl Iterator for RateLimitsIntoIter {
466 type Item = RateLimit;
467
468 fn next(&mut self) -> Option<Self::Item> {
469 self.iter.next()
470 }
471}
472
473impl<'a> IntoIterator for &'a RateLimits {
474 type IntoIter = RateLimitsIter<'a>;
475 type Item = &'a RateLimit;
476
477 fn into_iter(self) -> Self::IntoIter {
478 self.iter()
479 }
480}
481
482#[derive(Debug, Default)]
491pub struct CachedRateLimits(Mutex<Arc<RateLimits>>);
492
493impl CachedRateLimits {
494 pub fn new() -> Self {
496 Self::default()
497 }
498
499 pub fn add(&self, limit: RateLimit) {
503 let mut inner = self.0.lock().unwrap_or_else(PoisonError::into_inner);
504 let current = Arc::make_mut(&mut inner);
505 current.add(limit);
506 }
507
508 pub fn merge(&self, limits: RateLimits) {
512 if limits.is_empty() {
513 return;
514 }
515
516 let mut inner = self.0.lock().unwrap_or_else(PoisonError::into_inner);
517 let current = Arc::make_mut(&mut inner);
518 for mut limit in limits {
519 for category in limit.categories.iter() {
530 for inherited in inherited_categories(&category) {
531 if let Some(categories) = limit.categories.add(*inherited) {
532 limit.categories = categories;
533 }
534 }
535 }
536
537 current.add(limit);
538 }
539 }
540
541 pub fn current_limits(&self) -> Arc<RateLimits> {
546 let now = Instant::now();
547 let mut inner = self.0.lock().unwrap_or_else(PoisonError::into_inner);
548 Arc::make_mut(&mut inner).clean_expired(now);
549 Arc::clone(&inner)
550 }
551}
552
553fn inherited_categories(category: &DataCategory) -> &'static [DataCategory] {
563 match category {
564 DataCategory::Transaction => &[DataCategory::Span],
565 DataCategory::Span => &[DataCategory::Transaction],
566 _ => &[],
567 }
568}
569
#[cfg(test)]
mod tests {
    use smallvec::smallvec;

    use super::*;
    use crate::MetricNamespaceScoping;
    use crate::quota::DataCategory;

    /// Project key used by the default fixtures.
    const PROJECT_KEY: &str = "a94ae32be2584e0bbd7a4cbb95971fee";

    /// Builds a scoping without a key id from the given identifiers.
    fn scoping_with(organization_id: OrganizationId, project_id: ProjectId, key: &str) -> Scoping {
        Scoping {
            organization_id,
            project_id,
            project_key: ProjectKey::parse(key).unwrap(),
            key_id: None,
        }
    }

    /// Scoping for organization 42, project 21 and the default project key.
    fn default_scoping() -> Scoping {
        scoping_with(OrganizationId::new(42), ProjectId::new(21), PROJECT_KEY)
    }

    /// Item scoping for `category` without a metric namespace.
    fn item(category: DataCategory, scoping: Scoping) -> ItemScoping {
        ItemScoping {
            category,
            scoping,
            namespace: MetricNamespaceScoping::None,
        }
    }

    /// Rate limit scope for organization 42.
    fn org_scope() -> RateLimitScope {
        RateLimitScope::Organization(OrganizationId::new(42))
    }

    /// Rate limit scope for project 21.
    fn project_scope() -> RateLimitScope {
        RateLimitScope::Project(ProjectId::new(21))
    }

    /// Rate limit without namespaces that expires `seconds` from now.
    fn limit(
        categories: DataCategories,
        scope: RateLimitScope,
        reason_code: Option<ReasonCode>,
        seconds: u64,
    ) -> RateLimit {
        RateLimit {
            categories,
            scope,
            reason_code,
            retry_after: RetryAfter::from_secs(seconds),
            namespaces: smallvec![],
        }
    }

    /// Organization quota for organization 42 with a limit of zero on `category`.
    fn zero_quota(category: DataCategory) -> Quota {
        Quota {
            id: None,
            categories: [category].into(),
            scope: QuotaScope::Organization,
            scope_id: Some("42".into()),
            limit: Some(0),
            window: None,
            reason_code: Some(ReasonCode::new("zero")),
            namespace: None,
        }
    }

    #[test]
    fn test_parse_retry_after() {
        // Input and the expected remaining seconds; zero means the value is already expired.
        // Covers rounding up, plain integers, clamped negatives, and values that overflow.
        let valid = [
            ("17.1", 18),
            ("17.7", 18),
            ("17", 17),
            ("-2", 0),
            ("-inf", 0),
            ("inf", 0),
            ("NaN", 0),
            ("100000000000000000000", 0),
        ];

        for (input, expected) in valid {
            let retry_after = input.parse::<RetryAfter>().expect("parse RetryAfter");
            assert_eq!(retry_after.remaining_seconds(), expected);
            assert_eq!(retry_after.expired(), expected == 0);
        }

        for input in ["", "nope", " 2 ", "6 0"] {
            input.parse::<RetryAfter>().expect_err("error RetryAfter");
        }
    }

    #[test]
    fn test_rate_limit_matches_categories() {
        let rate_limit = limit(
            [DataCategory::Unknown, DataCategory::Error].into(),
            org_scope(),
            None,
            1,
        );

        assert!(rate_limit.matches(item(DataCategory::Error, default_scoping())));
        assert!(!rate_limit.matches(item(DataCategory::Transaction, default_scoping())));
    }

    #[test]
    fn test_rate_limit_matches_organization() {
        let rate_limit = limit(DataCategories::new(), org_scope(), None, 1);

        assert!(rate_limit.matches(item(DataCategory::Error, default_scoping())));

        let other_org = scoping_with(OrganizationId::new(0), ProjectId::new(21), PROJECT_KEY);
        assert!(!rate_limit.matches(item(DataCategory::Error, other_org)));
    }

    #[test]
    fn test_rate_limit_matches_project() {
        let rate_limit = limit(DataCategories::new(), project_scope(), None, 1);

        assert!(rate_limit.matches(item(DataCategory::Error, default_scoping())));

        let other_project = scoping_with(OrganizationId::new(42), ProjectId::new(0), PROJECT_KEY);
        assert!(!rate_limit.matches(item(DataCategory::Error, other_project)));
    }

    #[test]
    fn test_rate_limit_matches_namespaces() {
        let rate_limit = RateLimit {
            categories: Default::default(),
            scope: org_scope(),
            reason_code: None,
            retry_after: RetryAfter::from_secs(1),
            namespaces: smallvec![MetricNamespace::Custom],
        };

        let scoping = default_scoping();
        let bucket_in = |namespace: MetricNamespaceScoping| ItemScoping {
            category: DataCategory::MetricBucket,
            scoping,
            namespace,
        };

        assert!(rate_limit.matches(bucket_in(MetricNamespaceScoping::Some(
            MetricNamespace::Custom
        ))));
        assert!(!rate_limit.matches(bucket_in(MetricNamespaceScoping::Some(
            MetricNamespace::Spans
        ))));

        // A limit without namespaces applies regardless of the item's namespace.
        let general_rate_limit = limit(Default::default(), org_scope(), None, 1);

        assert!(general_rate_limit.matches(bucket_in(MetricNamespaceScoping::Some(
            MetricNamespace::Spans
        ))));
        assert!(general_rate_limit.matches(bucket_in(MetricNamespaceScoping::None)));
    }

    #[test]
    fn test_rate_limit_matches_key() {
        let rate_limit = limit(
            DataCategories::new(),
            RateLimitScope::Key(ProjectKey::parse(PROJECT_KEY).unwrap()),
            None,
            1,
        );

        assert!(rate_limit.matches(item(DataCategory::Error, default_scoping())));

        let other_key = scoping_with(
            OrganizationId::new(0),
            ProjectId::new(21),
            "deadbeefdeadbeefdeadbeefdeadbeef",
        );
        assert!(!rate_limit.matches(item(DataCategory::Error, other_key)));
    }

    #[test]
    fn test_rate_limits_add_replacement() {
        let mut rate_limits = RateLimits::new();

        rate_limits.add(limit(
            [DataCategory::Default, DataCategory::Error].into(),
            org_scope(),
            Some(ReasonCode::new("first")),
            1,
        ));

        // Same bucket with a longer duration: replaces the first limit.
        rate_limits.add(limit(
            [DataCategory::Error, DataCategory::Default].into(),
            org_scope(),
            Some(ReasonCode::new("second")),
            10,
        ));

        insta::assert_ron_snapshot!(rate_limits, @r#"
        RateLimits(
          limits: [
            RateLimit(
              categories: [
                "default",
                "error",
              ],
              scope: Organization(OrganizationId(42)),
              reason_code: Some(ReasonCode("second")),
              retry_after: RetryAfter(10),
              namespaces: [],
            ),
          ],
        )
        "#);
    }

    #[test]
    fn test_rate_limits_add_shadowing() {
        let mut rate_limits = RateLimits::new();

        rate_limits.add(limit(
            [DataCategory::Default, DataCategory::Error].into(),
            org_scope(),
            Some(ReasonCode::new("first")),
            10,
        ));

        // Same bucket with a shorter duration: the first limit stays.
        rate_limits.add(limit(
            [DataCategory::Error, DataCategory::Default].into(),
            org_scope(),
            Some(ReasonCode::new("second")),
            1,
        ));

        insta::assert_ron_snapshot!(rate_limits, @r#"
        RateLimits(
          limits: [
            RateLimit(
              categories: [
                "default",
                "error",
              ],
              scope: Organization(OrganizationId(42)),
              reason_code: Some(ReasonCode("first")),
              retry_after: RetryAfter(10),
              namespaces: [],
            ),
          ],
        )
        "#);
    }

    #[test]
    fn test_rate_limits_add_buckets() {
        let mut rate_limits = RateLimits::new();

        rate_limits.add(limit([DataCategory::Error].into(), org_scope(), None, 1));

        // Different category, same scope.
        rate_limits.add(limit(
            [DataCategory::Transaction].into(),
            org_scope(),
            None,
            1,
        ));

        // Same category, different scope.
        rate_limits.add(limit(
            [DataCategory::Error].into(),
            project_scope(),
            None,
            1,
        ));

        insta::assert_ron_snapshot!(rate_limits, @r#"
        RateLimits(
          limits: [
            RateLimit(
              categories: [
                "error",
              ],
              scope: Organization(OrganizationId(42)),
              reason_code: None,
              retry_after: RetryAfter(1),
              namespaces: [],
            ),
            RateLimit(
              categories: [
                "transaction",
              ],
              scope: Organization(OrganizationId(42)),
              reason_code: None,
              retry_after: RetryAfter(1),
              namespaces: [],
            ),
            RateLimit(
              categories: [
                "error",
              ],
              scope: Project(ProjectId(21)),
              reason_code: None,
              retry_after: RetryAfter(1),
              namespaces: [],
            ),
          ],
        )
        "#);
    }

    #[test]
    fn test_rate_limits_add_namespaces() {
        let mut rate_limits = RateLimits::new();

        rate_limits.add(RateLimit {
            categories: [DataCategory::MetricBucket].into(),
            scope: org_scope(),
            reason_code: None,
            retry_after: RetryAfter::from_secs(1),
            namespaces: smallvec![MetricNamespace::Custom],
        });

        // Same category and scope, different namespace.
        rate_limits.add(RateLimit {
            categories: [DataCategory::MetricBucket].into(),
            scope: org_scope(),
            reason_code: None,
            retry_after: RetryAfter::from_secs(1),
            namespaces: smallvec![MetricNamespace::Spans],
        });

        insta::assert_ron_snapshot!(rate_limits, @r#"
        RateLimits(
          limits: [
            RateLimit(
              categories: [
                "metric_bucket",
              ],
              scope: Organization(OrganizationId(42)),
              reason_code: None,
              retry_after: RetryAfter(1),
              namespaces: [
                "custom",
              ],
            ),
            RateLimit(
              categories: [
                "metric_bucket",
              ],
              scope: Organization(OrganizationId(42)),
              reason_code: None,
              retry_after: RetryAfter(1),
              namespaces: [
                "spans",
              ],
            ),
          ],
        )
        "#);
    }

    #[test]
    fn test_rate_limits_longest() {
        let mut rate_limits = RateLimits::new();

        rate_limits.add(limit(
            [DataCategory::Error].into(),
            org_scope(),
            Some(ReasonCode::new("first")),
            1,
        ));

        // Distinct bucket with the longer duration.
        rate_limits.add(limit(
            [DataCategory::Transaction].into(),
            org_scope(),
            Some(ReasonCode::new("second")),
            10,
        ));

        let rate_limit = rate_limits.longest().unwrap();
        insta::assert_ron_snapshot!(rate_limit, @r#"
        RateLimit(
          categories: [
            "transaction",
          ],
          scope: Organization(OrganizationId(42)),
          reason_code: Some(ReasonCode("second")),
          retry_after: RetryAfter(10),
          namespaces: [],
        )
        "#);
    }

    #[test]
    fn test_rate_limits_clean_expired() {
        let mut rate_limits = RateLimits::new();

        // Still active when cleaning.
        rate_limits.add(limit([DataCategory::Error].into(), org_scope(), None, 1));

        // Already expired when cleaning.
        rate_limits.add(limit(
            [DataCategory::Error].into(),
            project_scope(),
            None,
            0,
        ));

        // Both limits are stored until they are cleaned explicitly.
        assert_eq!(rate_limits.iter().count(), 2);

        rate_limits.clean_expired(Instant::now());

        insta::assert_ron_snapshot!(rate_limits, @r#"
        RateLimits(
          limits: [
            RateLimit(
              categories: [
                "error",
              ],
              scope: Organization(OrganizationId(42)),
              reason_code: None,
              retry_after: RetryAfter(1),
              namespaces: [],
            ),
          ],
        )
        "#);
    }

    #[test]
    fn test_rate_limits_is_any_limited() {
        let mut rate_limits = RateLimits::new();

        rate_limits.add(limit([DataCategory::Error].into(), org_scope(), None, 1));
        rate_limits.add(limit(
            [DataCategory::Transaction].into(),
            org_scope(),
            None,
            1,
        ));

        let scoping = default_scoping();
        let quotas = [zero_quota(DataCategory::Attachment)];

        // Matched by a cached limit.
        assert!(
            rate_limits.is_any_limited_with_quotas(&quotas, &[scoping.item(DataCategory::Error)])
        );

        // Matched by the zero-sized quota.
        assert!(
            rate_limits
                .is_any_limited_with_quotas(&quotas, &[scoping.item(DataCategory::Attachment)])
        );

        // One of several scopings matches.
        assert!(rate_limits.is_any_limited_with_quotas(
            &quotas,
            &[
                scoping.item(DataCategory::Replay),
                scoping.item(DataCategory::Error)
            ]
        ));

        // Neither a limit nor the quota matches.
        assert!(
            !rate_limits.is_any_limited_with_quotas(&quotas, &[scoping.item(DataCategory::Replay)])
        );
    }

    #[test]
    fn test_rate_limits_check() {
        let mut rate_limits = RateLimits::new();

        rate_limits.add(limit([DataCategory::Error].into(), org_scope(), None, 1));
        rate_limits.add(limit(
            [DataCategory::Transaction].into(),
            org_scope(),
            None,
            1,
        ));

        let applied_limits = rate_limits.check(item(DataCategory::Error, default_scoping()));

        // Only the error limit applies.
        insta::assert_ron_snapshot!(applied_limits, @r#"
        RateLimits(
          limits: [
            RateLimit(
              categories: [
                "error",
              ],
              scope: Organization(OrganizationId(42)),
              reason_code: None,
              retry_after: RetryAfter(1),
              namespaces: [],
            ),
          ],
        )
        "#);
    }

    #[test]
    fn test_rate_limits_check_quotas() {
        let mut rate_limits = RateLimits::new();

        rate_limits.add(limit([DataCategory::Error].into(), org_scope(), None, 1));
        rate_limits.add(limit(
            [DataCategory::Transaction].into(),
            org_scope(),
            None,
            1,
        ));

        let item_scoping = item(DataCategory::Error, default_scoping());
        let quotas = [zero_quota(DataCategory::Error)];

        let applied_limits = rate_limits.check_with_quotas(&quotas, item_scoping);

        insta::assert_ron_snapshot!(applied_limits, @r#"
        RateLimits(
          limits: [
            RateLimit(
              categories: [
                "error",
              ],
              scope: Organization(OrganizationId(42)),
              reason_code: Some(ReasonCode("zero")),
              retry_after: RetryAfter(60),
              namespaces: [],
            ),
          ],
        )
        "#);
    }

    #[test]
    fn test_rate_limits_merge() {
        let mut rate_limits1 = RateLimits::new();
        let mut rate_limits2 = RateLimits::new();

        rate_limits1.add(limit(
            [DataCategory::Error].into(),
            org_scope(),
            Some(ReasonCode::new("first")),
            1,
        ));

        rate_limits1.add(limit(
            [DataCategory::TransactionIndexed].into(),
            org_scope(),
            None,
            1,
        ));

        rate_limits2.add(limit(
            [DataCategory::Error].into(),
            org_scope(),
            Some(ReasonCode::new("second")),
            10,
        ));

        rate_limits1.merge(rate_limits2);

        insta::assert_ron_snapshot!(rate_limits1, @r#"
        RateLimits(
          limits: [
            RateLimit(
              categories: [
                "error",
              ],
              scope: Organization(OrganizationId(42)),
              reason_code: Some(ReasonCode("second")),
              retry_after: RetryAfter(10),
              namespaces: [],
            ),
            RateLimit(
              categories: [
                "transaction_indexed",
              ],
              scope: Organization(OrganizationId(42)),
              reason_code: None,
              retry_after: RetryAfter(1),
              namespaces: [],
            ),
          ],
        )
        "#);
    }

    #[test]
    fn test_cached_rate_limits_expired() {
        let cached = CachedRateLimits::new();

        // Still active when the snapshot is taken.
        cached.add(limit([DataCategory::Error].into(), org_scope(), None, 1));

        // Already expired when the snapshot is taken.
        cached.add(limit(
            [DataCategory::Error].into(),
            project_scope(),
            None,
            0,
        ));

        let rate_limits = cached.current_limits();

        insta::assert_ron_snapshot!(rate_limits, @r#"
        RateLimits(
          limits: [
            RateLimit(
              categories: [
                "error",
              ],
              scope: Organization(OrganizationId(42)),
              reason_code: None,
              retry_after: RetryAfter(1),
              namespaces: [],
            ),
          ],
        )
        "#);
    }
}