1use std::fmt;
2use std::str::FromStr;
3use std::sync::{Arc, Mutex, PoisonError};
4use std::time::{Duration, Instant};
5
6use relay_base_schema::data_category::DataCategory;
7use relay_base_schema::metrics::MetricNamespace;
8use relay_base_schema::organization::OrganizationId;
9use relay_base_schema::project::{ProjectId, ProjectKey};
10use smallvec::SmallVec;
11
12use crate::REJECT_ALL_SECS;
13use crate::quota::{DataCategories, ItemScoping, Quota, QuotaScope, ReasonCode, Scoping};
14
/// A point in time after which a rate limit no longer applies.
///
/// The marker wraps a single [`Instant`]. Equality, ordering and hashing are all derived from
/// that instant, so a marker with a later deadline compares greater than one with an earlier
/// deadline.
#[derive(Clone, Copy, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub struct RetryAfter {
    /// The instant at which this marker counts as expired.
    when: Instant,
}
24
25impl RetryAfter {
26 #[inline]
28 pub fn from_secs(seconds: u64) -> Self {
29 let now = Instant::now();
30 let when = now.checked_add(Duration::from_secs(seconds)).unwrap_or(now);
31 Self { when }
32 }
33
34 #[inline]
39 pub fn remaining_at(self, at: Instant) -> Option<Duration> {
40 if at >= self.when {
41 None
42 } else {
43 Some(self.when - at)
44 }
45 }
46
47 #[inline]
52 pub fn remaining(self) -> Option<Duration> {
53 self.remaining_at(Instant::now())
54 }
55
56 #[inline]
62 pub fn remaining_seconds_at(self, at: Instant) -> u64 {
63 match self.remaining_at(at) {
64 Some(duration) if duration.subsec_nanos() == 0 => duration.as_secs(),
66 Some(duration) => duration.as_secs() + 1,
67 None => 0,
68 }
69 }
70
71 #[inline]
76 pub fn remaining_seconds(self) -> u64 {
77 self.remaining_seconds_at(Instant::now())
78 }
79
80 #[inline]
82 pub fn expired_at(self, at: Instant) -> bool {
83 self.remaining_at(at).is_none()
84 }
85
86 #[inline]
88 pub fn expired(self) -> bool {
89 self.remaining_at(Instant::now()).is_none()
90 }
91}
92
93impl fmt::Debug for RetryAfter {
94 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
95 match self.remaining_seconds() {
96 0 => write!(f, "RetryAfter(expired)"),
97 remaining => write!(f, "RetryAfter({remaining}s)"),
98 }
99 }
100}
101
/// Test-only serialization: a one-field tuple struct named `RetryAfter` that holds the
/// remaining whole seconds at the time of serialization.
#[cfg(test)]
impl serde::Serialize for RetryAfter {
    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
        use serde::ser::SerializeTupleStruct as _;

        let mut state = serializer.serialize_tuple_struct("RetryAfter", 1)?;
        let remaining = self.remaining_seconds();
        state.serialize_field(&remaining)?;
        state.end()
    }
}
114
/// Error returned when a [`RetryAfter`] cannot be parsed from a string.
#[derive(Debug)]
pub enum InvalidRetryAfter {
    /// The input is not a valid floating point number of seconds.
    InvalidDelay(std::num::ParseFloatError),
}

impl fmt::Display for InvalidRetryAfter {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            // The underlying parse error is exposed through `Error::source`.
            Self::InvalidDelay(_) => f.write_str("invalid retry-after delay"),
        }
    }
}

impl std::error::Error for InvalidRetryAfter {
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            Self::InvalidDelay(err) => Some(err),
        }
    }
}
121
122impl FromStr for RetryAfter {
123 type Err = InvalidRetryAfter;
124
125 fn from_str(s: &str) -> Result<Self, Self::Err> {
126 let float = s.parse::<f64>().map_err(InvalidRetryAfter::InvalidDelay)?;
127 let seconds = float.max(0.0).ceil() as u64;
128 Ok(RetryAfter::from_secs(seconds))
129 }
130}
131
/// The scope a rate limit applies to.
///
/// Each variant carries the identifier that an item's scoping is compared against when the
/// limit is matched.
#[derive(Clone, Debug, Eq, PartialEq)]
#[cfg_attr(test, derive(serde::Serialize))]
pub enum RateLimitScope {
    /// Applies to items whose organization ID equals the contained ID.
    Organization(OrganizationId),
    /// Applies to items whose project ID equals the contained ID.
    Project(ProjectId),
    /// Applies to items whose project key equals the contained key.
    Key(ProjectKey),
}
149
150impl RateLimitScope {
151 pub fn for_quota(scoping: Scoping, scope: QuotaScope) -> Self {
156 match scope {
157 QuotaScope::Organization => Self::Organization(scoping.organization_id),
158 QuotaScope::Project => Self::Project(scoping.project_id),
159 QuotaScope::Key => Self::Key(scoping.project_key),
160 QuotaScope::Unknown => Self::Key(scoping.project_key),
162 }
163 }
164
165 pub fn name(&self) -> &'static str {
169 match *self {
170 Self::Key(_) => QuotaScope::Key.name(),
171 Self::Project(_) => QuotaScope::Project.name(),
172 Self::Organization(_) => QuotaScope::Organization.name(),
173 }
174 }
175}
176
/// A rate limit bounded by data categories, a scope, optional metric namespaces and a deadline.
///
/// A limit applies to an item when its scope, categories and namespaces all match the item's
/// scoping (see `RateLimit::matches`).
#[derive(Clone, Debug, PartialEq)]
#[cfg_attr(test, derive(serde::Serialize))]
pub struct RateLimit {
    /// Data categories this limit applies to, checked through `ItemScoping::matches_categories`.
    pub categories: DataCategories,

    /// Organization, project or key this limit is bound to.
    pub scope: RateLimitScope,

    /// Optional reason code, cloned from the quota when built with `RateLimit::from_quota`.
    pub reason_code: Option<ReasonCode>,

    /// Deadline after which this limit counts as expired.
    pub retry_after: RetryAfter,

    /// Metric namespaces this limit applies to, checked through
    /// `ItemScoping::matches_namespaces`.
    pub namespaces: SmallVec<[MetricNamespace; 1]>,
}
205
206impl RateLimit {
207 pub fn from_quota(quota: &Quota, scoping: Scoping, retry_after: RetryAfter) -> Self {
212 Self {
213 categories: quota.categories.clone(),
214 scope: RateLimitScope::for_quota(scoping, quota.scope),
215 reason_code: quota.reason_code.clone(),
216 retry_after,
217 namespaces: quota.namespace.into_iter().collect(),
218 }
219 }
220
221 pub fn matches(&self, scoping: ItemScoping) -> bool {
226 self.matches_scope(scoping)
227 && scoping.matches_categories(&self.categories)
228 && scoping.matches_namespaces(&self.namespaces)
229 }
230
231 fn matches_scope(&self, scoping: ItemScoping) -> bool {
233 match self.scope {
234 RateLimitScope::Organization(org_id) => scoping.organization_id == org_id,
235 RateLimitScope::Project(project_id) => scoping.project_id == project_id,
236 RateLimitScope::Key(key) => scoping.project_key == key,
237 }
238 }
239}
240
/// A collection of [`RateLimit`]s.
///
/// Limits added through `RateLimits::add` are deduplicated by categories, scope and namespaces,
/// keeping the entry with the later deadline.
#[derive(Clone, Debug, Default)]
#[cfg_attr(test, derive(serde::Serialize))]
pub struct RateLimits {
    /// Stored limits, in insertion order.
    limits: Vec<RateLimit>,
}
253
254impl RateLimits {
255 pub fn new() -> Self {
257 Self::default()
258 }
259
260 pub fn add(&mut self, limit: RateLimit) {
265 let limit_opt = self.limits.iter_mut().find(|l| {
266 let RateLimit {
267 categories,
268 scope,
269 reason_code: _,
270 retry_after: _,
271 namespaces: namespace,
272 } = &limit;
273
274 *categories == l.categories && *scope == l.scope && *namespace == l.namespaces
275 });
276
277 match limit_opt {
278 None => self.limits.push(limit),
279 Some(existing) if existing.retry_after < limit.retry_after => *existing = limit,
280 Some(_) => (), }
282 }
283
284 pub fn merge(&mut self, limits: Self) {
290 for limit in limits {
291 self.add(limit);
292 }
293 }
294
295 pub fn merge_with(mut self, other: Self) -> Self {
299 self.merge(other);
300 self
301 }
302
303 pub fn is_ok(&self) -> bool {
307 !self.is_limited()
308 }
309
310 pub fn is_limited(&self) -> bool {
314 let now = Instant::now();
315 self.iter().any(|limit| !limit.retry_after.expired_at(now))
316 }
317
318 pub fn is_any_limited(&self, scopings: &[ItemScoping]) -> bool {
323 self.is_any_limited_with_quotas(&[], scopings)
324 }
325
326 pub fn is_any_limited_with_quotas<'a>(
334 &self,
335 quotas: impl IntoIterator<Item = &'a Quota>,
336 scopings: &[ItemScoping],
337 ) -> bool {
338 for quota in quotas {
339 for scoping in scopings {
340 if quota.limit == Some(0) && quota.matches(*scoping) {
341 return true;
342 }
343 }
344 }
345
346 let now = Instant::now();
347 for scoping in scopings {
348 for limit in &self.limits {
349 if limit.matches(*scoping) && !limit.retry_after.expired_at(now) {
350 return true;
351 }
352 }
353 }
354
355 false
356 }
357
358 pub fn clean_expired(&mut self, now: Instant) {
363 self.limits
364 .retain(|limit| !limit.retry_after.expired_at(now));
365 }
366
367 pub fn check(&self, scoping: ItemScoping) -> Self {
373 self.check_with_quotas(&[], scoping)
374 }
375
376 pub fn check_with_quotas<'a>(
384 &self,
385 quotas: impl IntoIterator<Item = &'a Quota>,
386 scoping: ItemScoping,
387 ) -> Self {
388 let mut applied_limits = Self::new();
389
390 for quota in quotas {
391 if quota.limit == Some(0) && quota.matches(scoping) {
392 let retry_after = RetryAfter::from_secs(REJECT_ALL_SECS);
393 applied_limits.add(RateLimit::from_quota(quota, *scoping, retry_after));
394 }
395 }
396
397 for limit in &self.limits {
398 if limit.matches(scoping) {
399 applied_limits.add(limit.clone());
400 }
401 }
402
403 applied_limits
404 }
405
406 pub fn iter(&self) -> RateLimitsIter<'_> {
408 RateLimitsIter {
409 iter: self.limits.iter(),
410 }
411 }
412
413 pub fn longest(&self) -> Option<&RateLimit> {
418 self.iter().max_by_key(|limit| limit.retry_after)
419 }
420
421 pub fn is_empty(&self) -> bool {
426 self.limits.is_empty()
427 }
428}
429
/// Iterator over references to the limits of a [`RateLimits`] collection.
///
/// Returned by `RateLimits::iter`.
pub struct RateLimitsIter<'a> {
    /// Iterator over the slice of stored limits.
    iter: std::slice::Iter<'a, RateLimit>,
}
437
438impl<'a> Iterator for RateLimitsIter<'a> {
439 type Item = &'a RateLimit;
440
441 fn next(&mut self) -> Option<Self::Item> {
442 self.iter.next()
443 }
444}
445
446impl IntoIterator for RateLimits {
447 type IntoIter = RateLimitsIntoIter;
448 type Item = RateLimit;
449
450 fn into_iter(self) -> Self::IntoIter {
451 RateLimitsIntoIter {
452 iter: self.limits.into_iter(),
453 }
454 }
455}
456
/// Iterator over the owned limits of a [`RateLimits`] collection.
///
/// Returned by the `IntoIterator` implementation of `RateLimits`.
pub struct RateLimitsIntoIter {
    /// Iterator that owns the vector of stored limits.
    iter: std::vec::IntoIter<RateLimit>,
}
464
465impl Iterator for RateLimitsIntoIter {
466 type Item = RateLimit;
467
468 fn next(&mut self) -> Option<Self::Item> {
469 self.iter.next()
470 }
471}
472
473impl<'a> IntoIterator for &'a RateLimits {
474 type IntoIter = RateLimitsIter<'a>;
475 type Item = &'a RateLimit;
476
477 fn into_iter(self) -> Self::IntoIter {
478 self.iter()
479 }
480}
481
/// A set of rate limits stored as an `Arc<RateLimits>` behind a [`Mutex`].
///
/// All methods take `&self` and lock the mutex for the duration of the operation.
#[derive(Debug, Default)]
pub struct CachedRateLimits(Mutex<Arc<RateLimits>>);
492
493impl CachedRateLimits {
494 pub fn new() -> Self {
496 Self::default()
497 }
498
499 pub fn add(&self, limit: RateLimit) {
503 let mut inner = self.0.lock().unwrap_or_else(PoisonError::into_inner);
504 let current = Arc::make_mut(&mut inner);
505 current.add(limit);
506 }
507
508 pub fn merge(&self, limits: RateLimits) {
512 if limits.is_empty() {
513 return;
514 }
515
516 let mut inner = self.0.lock().unwrap_or_else(PoisonError::into_inner);
517 let current = Arc::make_mut(&mut inner);
518 for mut limit in limits {
519 for i in 0..limit.categories.len() {
530 let Some(category) = limit.categories.get(i) else {
531 debug_assert!(false, "logical error");
532 break;
533 };
534
535 for inherited in inherited_categories(category) {
536 if let Some(categories) = limit.categories.add(*inherited) {
537 limit.categories = categories;
538 }
539 }
540 }
541
542 current.add(limit);
543 }
544 }
545
546 pub fn current_limits(&self) -> Arc<RateLimits> {
551 let now = Instant::now();
552 let mut inner = self.0.lock().unwrap_or_else(PoisonError::into_inner);
553 Arc::make_mut(&mut inner).clean_expired(now);
554 Arc::clone(&inner)
555 }
556}
557
558fn inherited_categories(category: &DataCategory) -> &'static [DataCategory] {
568 match category {
569 DataCategory::Transaction => &[DataCategory::Span],
570 DataCategory::Span => &[DataCategory::Transaction],
571 _ => &[],
572 }
573}
574
#[cfg(test)]
mod tests {
    use smallvec::smallvec;

    use super::*;
    use crate::MetricNamespaceScoping;
    use crate::quota::DataCategory;

    /// Project key shared by all fixtures that are expected to match.
    const KEY: &str = "a94ae32be2584e0bbd7a4cbb95971fee";

    /// Builds a scoping without key ID from its identifiers.
    fn make_scoping(organization_id: OrganizationId, project_id: ProjectId, key: &str) -> Scoping {
        Scoping {
            organization_id,
            project_id,
            project_key: ProjectKey::parse(key).unwrap(),
            key_id: None,
        }
    }

    /// The scoping used by most tests: organization 42, project 21 and the shared key.
    fn default_scoping() -> Scoping {
        make_scoping(OrganizationId::new(42), ProjectId::new(21), KEY)
    }

    /// Builds an item scoping with an explicit metric namespace.
    fn make_item_in(
        category: DataCategory,
        scoping: Scoping,
        namespace: MetricNamespaceScoping,
    ) -> ItemScoping {
        ItemScoping {
            category,
            scoping,
            namespace,
        }
    }

    /// Builds an item scoping without a metric namespace.
    fn make_item(category: DataCategory, scoping: Scoping) -> ItemScoping {
        make_item_in(category, scoping, MetricNamespaceScoping::None)
    }

    /// The organization scope used by most tests.
    fn org_scope() -> RateLimitScope {
        RateLimitScope::Organization(OrganizationId::new(42))
    }

    /// Wraps a reason code literal for use in a rate limit fixture.
    fn reason(code: &'static str) -> Option<ReasonCode> {
        Some(ReasonCode::new(code))
    }

    /// Builds a rate limit without namespaces that expires `seconds` from now.
    fn make_limit(
        categories: DataCategories,
        scope: RateLimitScope,
        reason_code: Option<ReasonCode>,
        seconds: u64,
    ) -> RateLimit {
        RateLimit {
            categories,
            scope,
            reason_code,
            retry_after: RetryAfter::from_secs(seconds),
            namespaces: smallvec![],
        }
    }

    /// Builds an organization quota with a limit of zero for the given categories.
    fn make_zero_quota(categories: DataCategories) -> Quota {
        Quota {
            id: None,
            categories,
            scope: QuotaScope::Organization,
            scope_id: Some("42".into()),
            limit: Some(0),
            window: None,
            reason_code: Some(ReasonCode::new("zero")),
            namespace: None,
        }
    }

    #[test]
    fn test_parse_retry_after() {
        // Input and the expected remaining seconds; zero means "already expired".
        let valid = [
            // Fractions round up to the next full second.
            ("17.1", 18),
            ("17.7", 18),
            // Whole numbers are taken as they are.
            ("17", 17),
            // Negative delays expire right away.
            ("-2", 0),
            ("-inf", 0),
            // Special or overflowing values expire right away as well.
            ("inf", 0),
            ("NaN", 0),
            ("100000000000000000000", 0),
        ];

        for (input, expected) in valid {
            let retry_after = input.parse::<RetryAfter>().expect("parse RetryAfter");
            assert_eq!(retry_after.remaining_seconds(), expected);
            assert_eq!(retry_after.expired(), expected == 0);
        }

        for input in ["", "nope", " 2 ", "6 0"] {
            input.parse::<RetryAfter>().expect_err("error RetryAfter");
        }
    }

    #[test]
    fn test_rate_limit_matches_categories() {
        let rate_limit = make_limit(
            [DataCategory::Unknown, DataCategory::Error].into(),
            org_scope(),
            None,
            1,
        );
        let scoping = default_scoping();

        assert!(rate_limit.matches(make_item(DataCategory::Error, scoping)));
        assert!(!rate_limit.matches(make_item(DataCategory::Transaction, scoping)));
    }

    #[test]
    fn test_rate_limit_matches_organization() {
        let rate_limit = make_limit(DataCategories::new(), org_scope(), None, 1);

        assert!(rate_limit.matches(make_item(DataCategory::Error, default_scoping())));

        let other_org = make_scoping(OrganizationId::new(0), ProjectId::new(21), KEY);
        assert!(!rate_limit.matches(make_item(DataCategory::Error, other_org)));
    }

    #[test]
    fn test_rate_limit_matches_project() {
        let rate_limit = make_limit(
            DataCategories::new(),
            RateLimitScope::Project(ProjectId::new(21)),
            None,
            1,
        );

        assert!(rate_limit.matches(make_item(DataCategory::Error, default_scoping())));

        let other_project = make_scoping(OrganizationId::new(42), ProjectId::new(0), KEY);
        assert!(!rate_limit.matches(make_item(DataCategory::Error, other_project)));
    }

    #[test]
    fn test_rate_limit_matches_namespaces() {
        let rate_limit = RateLimit {
            namespaces: smallvec![MetricNamespace::Custom],
            ..make_limit(Default::default(), org_scope(), None, 1)
        };
        let scoping = default_scoping();

        assert!(rate_limit.matches(make_item_in(
            DataCategory::MetricBucket,
            scoping,
            MetricNamespaceScoping::Some(MetricNamespace::Custom),
        )));

        assert!(!rate_limit.matches(make_item_in(
            DataCategory::MetricBucket,
            scoping,
            MetricNamespaceScoping::Some(MetricNamespace::Spans),
        )));

        // A limit without namespaces applies regardless of the item's namespace.
        let general_rate_limit = make_limit(Default::default(), org_scope(), None, 1);

        assert!(general_rate_limit.matches(make_item_in(
            DataCategory::MetricBucket,
            scoping,
            MetricNamespaceScoping::Some(MetricNamespace::Spans),
        )));

        assert!(general_rate_limit.matches(make_item_in(
            DataCategory::MetricBucket,
            scoping,
            MetricNamespaceScoping::None,
        )));
    }

    #[test]
    fn test_rate_limit_matches_key() {
        let rate_limit = make_limit(
            DataCategories::new(),
            RateLimitScope::Key(ProjectKey::parse(KEY).unwrap()),
            None,
            1,
        );

        assert!(rate_limit.matches(make_item(DataCategory::Error, default_scoping())));

        let other_key = make_scoping(
            OrganizationId::new(0),
            ProjectId::new(21),
            "deadbeefdeadbeefdeadbeefdeadbeef",
        );
        assert!(!rate_limit.matches(make_item(DataCategory::Error, other_key)));
    }

    #[test]
    fn test_rate_limits_add_replacement() {
        let mut rate_limits = RateLimits::new();

        rate_limits.add(make_limit(
            [DataCategory::Default, DataCategory::Error].into(),
            org_scope(),
            reason("first"),
            1,
        ));

        // Same bucket with the categories listed in a different order and a longer deadline.
        rate_limits.add(make_limit(
            [DataCategory::Error, DataCategory::Default].into(),
            org_scope(),
            reason("second"),
            10,
        ));

        insta::assert_ron_snapshot!(rate_limits, @r#"
        RateLimits(
          limits: [
            RateLimit(
              categories: [
                "default",
                "error",
              ],
              scope: Organization(OrganizationId(42)),
              reason_code: Some(ReasonCode("second")),
              retry_after: RetryAfter(10),
              namespaces: [],
            ),
          ],
        )
        "#);
    }

    #[test]
    fn test_rate_limits_add_shadowing() {
        let mut rate_limits = RateLimits::new();

        rate_limits.add(make_limit(
            [DataCategory::Default, DataCategory::Error].into(),
            org_scope(),
            reason("first"),
            10,
        ));

        // Same bucket with a shorter deadline: the stored limit must win.
        rate_limits.add(make_limit(
            [DataCategory::Error, DataCategory::Default].into(),
            org_scope(),
            reason("second"),
            1,
        ));

        insta::assert_ron_snapshot!(rate_limits, @r#"
        RateLimits(
          limits: [
            RateLimit(
              categories: [
                "default",
                "error",
              ],
              scope: Organization(OrganizationId(42)),
              reason_code: Some(ReasonCode("first")),
              retry_after: RetryAfter(10),
              namespaces: [],
            ),
          ],
        )
        "#);
    }

    #[test]
    fn test_rate_limits_add_buckets() {
        let mut rate_limits = RateLimits::new();

        rate_limits.add(make_limit([DataCategory::Error].into(), org_scope(), None, 1));

        // Same scope, different category.
        rate_limits.add(make_limit(
            [DataCategory::Transaction].into(),
            org_scope(),
            None,
            1,
        ));

        // Same category, different scope.
        rate_limits.add(make_limit(
            [DataCategory::Error].into(),
            RateLimitScope::Project(ProjectId::new(21)),
            None,
            1,
        ));

        insta::assert_ron_snapshot!(rate_limits, @r#"
        RateLimits(
          limits: [
            RateLimit(
              categories: [
                "error",
              ],
              scope: Organization(OrganizationId(42)),
              reason_code: None,
              retry_after: RetryAfter(1),
              namespaces: [],
            ),
            RateLimit(
              categories: [
                "transaction",
              ],
              scope: Organization(OrganizationId(42)),
              reason_code: None,
              retry_after: RetryAfter(1),
              namespaces: [],
            ),
            RateLimit(
              categories: [
                "error",
              ],
              scope: Project(ProjectId(21)),
              reason_code: None,
              retry_after: RetryAfter(1),
              namespaces: [],
            ),
          ],
        )
        "#);
    }

    #[test]
    fn test_rate_limits_add_namespaces() {
        let mut rate_limits = RateLimits::new();

        rate_limits.add(RateLimit {
            namespaces: smallvec![MetricNamespace::Custom],
            ..make_limit([DataCategory::MetricBucket].into(), org_scope(), None, 1)
        });

        // Same category and scope, different namespace.
        rate_limits.add(RateLimit {
            namespaces: smallvec![MetricNamespace::Spans],
            ..make_limit([DataCategory::MetricBucket].into(), org_scope(), None, 1)
        });

        insta::assert_ron_snapshot!(rate_limits, @r#"
        RateLimits(
          limits: [
            RateLimit(
              categories: [
                "metric_bucket",
              ],
              scope: Organization(OrganizationId(42)),
              reason_code: None,
              retry_after: RetryAfter(1),
              namespaces: [
                "custom",
              ],
            ),
            RateLimit(
              categories: [
                "metric_bucket",
              ],
              scope: Organization(OrganizationId(42)),
              reason_code: None,
              retry_after: RetryAfter(1),
              namespaces: [
                "spans",
              ],
            ),
          ],
        )
        "#);
    }

    #[test]
    fn test_rate_limits_longest() {
        let mut rate_limits = RateLimits::new();

        rate_limits.add(make_limit(
            [DataCategory::Error].into(),
            org_scope(),
            reason("first"),
            1,
        ));

        // Different bucket with the longer deadline.
        rate_limits.add(make_limit(
            [DataCategory::Transaction].into(),
            org_scope(),
            reason("second"),
            10,
        ));

        let rate_limit = rate_limits.longest().unwrap();
        insta::assert_ron_snapshot!(rate_limit, @r#"
        RateLimit(
          categories: [
            "transaction",
          ],
          scope: Organization(OrganizationId(42)),
          reason_code: Some(ReasonCode("second")),
          retry_after: RetryAfter(10),
          namespaces: [],
        )
        "#);
    }

    #[test]
    fn test_rate_limits_clean_expired() {
        let mut rate_limits = RateLimits::new();

        // Still active when cleaning.
        rate_limits.add(make_limit([DataCategory::Error].into(), org_scope(), None, 1));

        // Already expired when cleaning.
        rate_limits.add(make_limit(
            [DataCategory::Error].into(),
            RateLimitScope::Project(ProjectId::new(21)),
            None,
            0,
        ));

        // Both limits are stored until the collection is cleaned.
        assert_eq!(rate_limits.iter().count(), 2);

        rate_limits.clean_expired(Instant::now());

        insta::assert_ron_snapshot!(rate_limits, @r#"
        RateLimits(
          limits: [
            RateLimit(
              categories: [
                "error",
              ],
              scope: Organization(OrganizationId(42)),
              reason_code: None,
              retry_after: RetryAfter(1),
              namespaces: [],
            ),
          ],
        )
        "#);
    }

    #[test]
    fn test_rate_limits_is_any_limited() {
        let mut rate_limits = RateLimits::new();
        rate_limits.add(make_limit([DataCategory::Error].into(), org_scope(), None, 1));
        rate_limits.add(make_limit(
            [DataCategory::Transaction].into(),
            org_scope(),
            None,
            1,
        ));

        let scoping = default_scoping();
        let quotas = &[make_zero_quota([DataCategory::Attachment].into())];

        // Rejected by a stored limit.
        let errors = [scoping.item(DataCategory::Error)];
        assert!(rate_limits.is_any_limited_with_quotas(quotas, &errors));

        // Rejected by the zero quota.
        let attachments = [scoping.item(DataCategory::Attachment)];
        assert!(rate_limits.is_any_limited_with_quotas(quotas, &attachments));

        // One rejected scoping is enough.
        let mixed = [
            scoping.item(DataCategory::Replay),
            scoping.item(DataCategory::Error),
        ];
        assert!(rate_limits.is_any_limited_with_quotas(quotas, &mixed));

        // Neither a stored limit nor the quota applies.
        let replays = [scoping.item(DataCategory::Replay)];
        assert!(!rate_limits.is_any_limited_with_quotas(quotas, &replays));
    }

    #[test]
    fn test_rate_limits_check() {
        let mut rate_limits = RateLimits::new();
        rate_limits.add(make_limit([DataCategory::Error].into(), org_scope(), None, 1));
        rate_limits.add(make_limit(
            [DataCategory::Transaction].into(),
            org_scope(),
            None,
            1,
        ));

        let applied_limits = rate_limits.check(make_item(DataCategory::Error, default_scoping()));

        // Only the error limit applies to an error item.
        insta::assert_ron_snapshot!(applied_limits, @r#"
        RateLimits(
          limits: [
            RateLimit(
              categories: [
                "error",
              ],
              scope: Organization(OrganizationId(42)),
              reason_code: None,
              retry_after: RetryAfter(1),
              namespaces: [],
            ),
          ],
        )
        "#);
    }

    #[test]
    fn test_rate_limits_check_quotas() {
        let mut rate_limits = RateLimits::new();
        rate_limits.add(make_limit([DataCategory::Error].into(), org_scope(), None, 1));
        rate_limits.add(make_limit(
            [DataCategory::Transaction].into(),
            org_scope(),
            None,
            1,
        ));

        let item_scoping = make_item(DataCategory::Error, default_scoping());
        let quotas = &[make_zero_quota([DataCategory::Error].into())];

        let applied_limits = rate_limits.check_with_quotas(quotas, item_scoping);

        insta::assert_ron_snapshot!(applied_limits, @r#"
        RateLimits(
          limits: [
            RateLimit(
              categories: [
                "error",
              ],
              scope: Organization(OrganizationId(42)),
              reason_code: Some(ReasonCode("zero")),
              retry_after: RetryAfter(60),
              namespaces: [],
            ),
          ],
        )
        "#);
    }

    #[test]
    fn test_rate_limits_merge() {
        let mut rate_limits1 = RateLimits::new();
        let mut rate_limits2 = RateLimits::new();

        rate_limits1.add(make_limit(
            [DataCategory::Error].into(),
            org_scope(),
            reason("first"),
            1,
        ));

        rate_limits1.add(make_limit(
            [DataCategory::TransactionIndexed].into(),
            org_scope(),
            None,
            1,
        ));

        rate_limits2.add(make_limit(
            [DataCategory::Error].into(),
            org_scope(),
            reason("second"),
            10,
        ));

        rate_limits1.merge(rate_limits2);

        insta::assert_ron_snapshot!(rate_limits1, @r#"
        RateLimits(
          limits: [
            RateLimit(
              categories: [
                "error",
              ],
              scope: Organization(OrganizationId(42)),
              reason_code: Some(ReasonCode("second")),
              retry_after: RetryAfter(10),
              namespaces: [],
            ),
            RateLimit(
              categories: [
                "transaction_indexed",
              ],
              scope: Organization(OrganizationId(42)),
              reason_code: None,
              retry_after: RetryAfter(1),
              namespaces: [],
            ),
          ],
        )
        "#);
    }

    #[test]
    fn test_cached_rate_limits_expired() {
        let cached = CachedRateLimits::new();

        // Still active when the limits are read.
        cached.add(make_limit([DataCategory::Error].into(), org_scope(), None, 1));

        // Already expired when the limits are read.
        cached.add(make_limit(
            [DataCategory::Error].into(),
            RateLimitScope::Project(ProjectId::new(21)),
            None,
            0,
        ));

        let rate_limits = cached.current_limits();

        insta::assert_ron_snapshot!(rate_limits, @r#"
        RateLimits(
          limits: [
            RateLimit(
              categories: [
                "error",
              ],
              scope: Organization(OrganizationId(42)),
              reason_code: None,
              retry_after: RetryAfter(1),
              namespaces: [],
            ),
          ],
        )
        "#);
    }
}