relay_quotas/
rate_limit.rs

1use std::fmt;
2use std::str::FromStr;
3use std::sync::{Arc, Mutex, PoisonError};
4use std::time::{Duration, Instant};
5
6use relay_base_schema::metrics::MetricNamespace;
7use relay_base_schema::organization::OrganizationId;
8use relay_base_schema::project::{ProjectId, ProjectKey};
9use smallvec::SmallVec;
10
11use crate::REJECT_ALL_SECS;
12use crate::quota::{DataCategories, ItemScoping, Quota, QuotaScope, ReasonCode, Scoping};
13
/// A monotonic deadline at which a rate limit stops applying.
///
/// A [`RetryAfter`] wraps the [`Instant`] at which the limit expires. It can be queried for
/// whether that instant has passed and for how much time is left until it does, either
/// relative to "now" or relative to an explicitly supplied instant.
#[derive(Clone, Copy, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub struct RetryAfter {
    when: Instant,
}

impl RetryAfter {
    /// Creates a [`RetryAfter`] that expires `seconds` seconds from now.
    ///
    /// If adding the delay to the current instant overflows, the current instant itself is
    /// used as the deadline, which makes the result count as expired right away.
    #[inline]
    pub fn from_secs(seconds: u64) -> Self {
        let created = Instant::now();
        let when = match created.checked_add(Duration::from_secs(seconds)) {
            Some(deadline) => deadline,
            // Not representable: fall back to the creation instant.
            None => created,
        };

        Self { when }
    }

    /// Returns how much time is left until expiration, measured from `at`.
    ///
    /// Returns `None` if the deadline is at or before `at`.
    #[inline]
    pub fn remaining_at(self, at: Instant) -> Option<Duration> {
        (self.when > at).then(|| self.when - at)
    }

    /// Returns how much time is left until expiration, measured from the current instant.
    ///
    /// Returns `None` if the rate limit has already expired.
    #[inline]
    pub fn remaining(self) -> Option<Duration> {
        let now = Instant::now();
        self.remaining_at(now)
    }

    /// Returns the number of seconds left until expiration, measured from `at`.
    ///
    /// Partial seconds are rounded up to the next full second so that the reported value never
    /// undercuts the actual remaining time. Returns `0` if the deadline is at or before `at`.
    #[inline]
    pub fn remaining_seconds_at(self, at: Instant) -> u64 {
        self.remaining_at(at).map_or(0, |left| {
            // `as_secs` truncates, so a non-zero sub-second part counts as one more second.
            let round_up = u64::from(left.subsec_nanos() != 0);
            left.as_secs() + round_up
        })
    }

    /// Returns the number of seconds left until expiration, measured from the current instant.
    ///
    /// Returns `0` if the rate limit has already expired.
    #[inline]
    pub fn remaining_seconds(self) -> u64 {
        let now = Instant::now();
        self.remaining_seconds_at(now)
    }

    /// Returns `true` if the deadline is at or before `at`.
    #[inline]
    pub fn expired_at(self, at: Instant) -> bool {
        at >= self.when
    }

    /// Returns `true` if the deadline is at or before the current instant.
    #[inline]
    pub fn expired(self) -> bool {
        self.expired_at(Instant::now())
    }
}
91
92impl fmt::Debug for RetryAfter {
93    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
94        match self.remaining_seconds() {
95            0 => write!(f, "RetryAfter(expired)"),
96            remaining => write!(f, "RetryAfter({remaining}s)"),
97        }
98    }
99}
100
// Test-only serialization: emits the remaining seconds at the time of serialization as a
// one-field tuple struct named `RetryAfter`.
#[cfg(test)]
impl serde::Serialize for RetryAfter {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        use serde::ser::SerializeTupleStruct;

        let seconds = self.remaining_seconds();
        let mut state = serializer.serialize_tuple_struct("RetryAfter", 1)?;
        state.serialize_field(&seconds)?;
        state.end()
    }
}
113
/// Error that occurs when parsing a [`RetryAfter`] from a string fails.
///
/// Implements [`std::error::Error`] so it can be propagated with `?` into boxed or dynamic
/// error types. The underlying float parse error is exposed through
/// [`source`](std::error::Error::source) rather than being repeated in the display message.
#[derive(Debug)]
pub enum InvalidRetryAfter {
    /// The supplied delay in seconds was not valid.
    InvalidDelay(std::num::ParseFloatError),
}

impl fmt::Display for InvalidRetryAfter {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::InvalidDelay(_) => f.write_str("invalid retry-after delay"),
        }
    }
}

impl std::error::Error for InvalidRetryAfter {
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            Self::InvalidDelay(error) => Some(error),
        }
    }
}
120
121impl FromStr for RetryAfter {
122    type Err = InvalidRetryAfter;
123
124    fn from_str(s: &str) -> Result<Self, Self::Err> {
125        let float = s.parse::<f64>().map_err(InvalidRetryAfter::InvalidDelay)?;
126        let seconds = float.max(0.0).ceil() as u64;
127        Ok(RetryAfter::from_secs(seconds))
128    }
129}
130
131/// The scope that a rate limit applies to.
132///
133/// Unlike [`QuotaScope`], which only declares the class of the scope, this enum carries
134/// the specific identifiers of the individual scopes that a rate limit applies to.
135///
136/// Rate limits can be applied at different levels of granularity, from global (affecting all data)
137/// down to a specific project key.
138#[derive(Clone, Debug, Eq, PartialEq)]
139#[cfg_attr(test, derive(serde::Serialize))]
140pub enum RateLimitScope {
141    /// Global scope.
142    Global,
143    /// An organization with identifier.
144    Organization(OrganizationId),
145    /// A project with identifier.
146    Project(ProjectId),
147    /// A DSN public key.
148    Key(ProjectKey),
149}
150
151impl RateLimitScope {
152    /// Creates a rate limiting scope from the given item scoping for a specific quota.
153    ///
154    /// This extracts the appropriate scope identifier based on the quota's scope type.
155    /// For unknown scopes, it assumes the most specific scope (Key).
156    pub fn for_quota(scoping: Scoping, scope: QuotaScope) -> Self {
157        match scope {
158            QuotaScope::Global => Self::Global,
159            QuotaScope::Organization => Self::Organization(scoping.organization_id),
160            QuotaScope::Project => Self::Project(scoping.project_id),
161            QuotaScope::Key => Self::Key(scoping.project_key),
162            // For unknown scopes, assume the most specific scope:
163            QuotaScope::Unknown => Self::Key(scoping.project_key),
164        }
165    }
166
167    /// Returns the canonical name of this scope.
168    ///
169    /// This corresponds to the name of the corresponding [`QuotaScope`].
170    pub fn name(&self) -> &'static str {
171        match *self {
172            Self::Global => QuotaScope::Global.name(),
173            Self::Key(_) => QuotaScope::Key.name(),
174            Self::Project(_) => QuotaScope::Project.name(),
175            Self::Organization(_) => QuotaScope::Organization.name(),
176        }
177    }
178}
179
180/// An active rate limit that restricts data ingestion.
181///
182/// A rate limit defines restrictions for specific data categories within a particular scope.
183/// It includes an expiration time after which the limit is no longer enforced.
184///
185/// Rate limits can be created from [`Quota`]s or directly constructed with the needed parameters.
186#[derive(Clone, Debug, PartialEq)]
187#[cfg_attr(test, derive(serde::Serialize))]
188pub struct RateLimit {
189    /// A set of data categories that this quota applies to. If empty, this rate limit
190    /// applies to all data categories.
191    pub categories: DataCategories,
192
193    /// The scope of this rate limit.
194    pub scope: RateLimitScope,
195
196    /// A machine-readable reason indicating which quota caused this rate limit.
197    pub reason_code: Option<ReasonCode>,
198
199    /// A marker when this rate limit expires.
200    pub retry_after: RetryAfter,
201
202    /// The metric namespace of this rate limit.
203    ///
204    /// Only relevant for data categories of type metric bucket. If empty,
205    /// this rate limit applies to metrics of all namespaces.
206    pub namespaces: SmallVec<[MetricNamespace; 1]>,
207}
208
209impl RateLimit {
210    /// Creates a new rate limit from the given [`Quota`].
211    ///
212    /// This builds a rate limit with the appropriate scope derived from the quota and scoping
213    /// information. The categories and other properties are copied from the quota.
214    pub fn from_quota(quota: &Quota, scoping: Scoping, retry_after: RetryAfter) -> Self {
215        Self {
216            categories: quota.categories.clone(),
217            scope: RateLimitScope::for_quota(scoping, quota.scope),
218            reason_code: quota.reason_code.clone(),
219            retry_after,
220            namespaces: quota.namespace.into_iter().collect(),
221        }
222    }
223
224    /// Checks whether this rate limit applies to the given item.
225    ///
226    /// A rate limit applies if its scope matches the item's scope, and the item's
227    /// category and namespace match those of the rate limit.
228    pub fn matches(&self, scoping: ItemScoping) -> bool {
229        self.matches_scope(scoping)
230            && scoping.matches_categories(&self.categories)
231            && scoping.matches_namespaces(&self.namespaces)
232    }
233
234    /// Returns `true` if the rate limiting scope matches the given item.
235    fn matches_scope(&self, scoping: ItemScoping) -> bool {
236        match self.scope {
237            RateLimitScope::Global => true,
238            RateLimitScope::Organization(org_id) => scoping.organization_id == org_id,
239            RateLimitScope::Project(project_id) => scoping.project_id == project_id,
240            RateLimitScope::Key(key) => scoping.project_key == key,
241        }
242    }
243}
244
245/// A collection of scoped rate limits.
246///
247/// [`RateLimits`] manages a set of active rate limits that can be checked against
248/// incoming data.
249///
250/// The collection can be empty, indicated by [`is_ok`](Self::is_ok), meaning no rate limits
251/// are currently active.
252#[derive(Clone, Debug, Default)]
253#[cfg_attr(test, derive(serde::Serialize))]
254pub struct RateLimits {
255    limits: Vec<RateLimit>,
256}
257
258impl RateLimits {
259    /// Creates an empty [`RateLimits`] instance.
260    pub fn new() -> Self {
261        Self::default()
262    }
263
264    /// Adds a limit to this collection.
265    ///
266    /// If a rate limit with an overlapping scope already exists, the `retry_after` count is merged
267    /// with the existing limit. Otherwise, the new rate limit is added.
268    pub fn add(&mut self, mut limit: RateLimit) {
269        // Categories are logically a set, but not implemented as such.
270        limit.categories.sort();
271
272        let limit_opt = self.limits.iter_mut().find(|l| {
273            let RateLimit {
274                categories,
275                scope,
276                reason_code: _,
277                retry_after: _,
278                namespaces: namespace,
279            } = &limit;
280
281            *categories == l.categories && *scope == l.scope && *namespace == l.namespaces
282        });
283
284        match limit_opt {
285            None => self.limits.push(limit),
286            Some(existing) if existing.retry_after < limit.retry_after => *existing = limit,
287            Some(_) => (), // keep existing, longer limit
288        }
289    }
290
291    /// Merges all limits from another [`RateLimits`] instance into this one.
292    ///
293    /// This keeps all existing rate limits, adds new ones, and updates any existing ones
294    /// with a later expiration time. The resulting collection contains the maximum
295    /// constraints from both instances.
296    pub fn merge(&mut self, limits: Self) {
297        for limit in limits {
298            self.add(limit);
299        }
300    }
301
302    /// Returns `true` if this instance contains no active limits.
303    ///
304    /// This is the opposite of [`is_limited`](Self::is_limited).
305    pub fn is_ok(&self) -> bool {
306        !self.is_limited()
307    }
308
309    /// Returns `true` if this instance contains any active rate limits.
310    ///
311    /// A rate limit is considered active if it has not yet expired.
312    pub fn is_limited(&self) -> bool {
313        let now = Instant::now();
314        self.iter().any(|limit| !limit.retry_after.expired_at(now))
315    }
316
317    /// Removes expired rate limits from this instance.
318    ///
319    /// This is useful for cleaning up rate limits that are no longer relevant,
320    /// reducing memory usage and improving performance of subsequent operations.
321    pub fn clean_expired(&mut self, now: Instant) {
322        self.limits
323            .retain(|limit| !limit.retry_after.expired_at(now));
324    }
325
326    /// Checks whether any rate limits apply to the given scoping.
327    ///
328    /// Returns a new [`RateLimits`] instance containing only the rate limits that match
329    /// the provided [`ItemScoping`]. If no limits match, the returned instance will be empty
330    /// and [`is_ok`](Self::is_ok) will return `true`.
331    pub fn check(&self, scoping: ItemScoping) -> Self {
332        self.check_with_quotas(&[], scoping)
333    }
334
335    /// Checks whether any rate limits or static quotas apply to the given scoping.
336    ///
337    /// This is similar to [`check`](Self::check), but additionally checks for quotas with a static
338    /// limit of `0`, which reject items even if there is no active rate limit in this instance.
339    ///
340    /// Returns a new [`RateLimits`] instance containing the rate limits that match the provided
341    /// [`ItemScoping`]. If no limits or quotas match, the returned instance will be empty.
342    pub fn check_with_quotas<'a>(
343        &self,
344        quotas: impl IntoIterator<Item = &'a Quota>,
345        scoping: ItemScoping,
346    ) -> Self {
347        let mut applied_limits = Self::new();
348
349        for quota in quotas {
350            if quota.limit == Some(0) && quota.matches(scoping) {
351                let retry_after = RetryAfter::from_secs(REJECT_ALL_SECS);
352                applied_limits.add(RateLimit::from_quota(quota, *scoping, retry_after));
353            }
354        }
355
356        for limit in &self.limits {
357            if limit.matches(scoping) {
358                applied_limits.add(limit.clone());
359            }
360        }
361
362        applied_limits
363    }
364
365    /// Returns an iterator over all rate limits in this collection.
366    pub fn iter(&self) -> RateLimitsIter<'_> {
367        RateLimitsIter {
368            iter: self.limits.iter(),
369        }
370    }
371
372    /// Returns the rate limit with the latest expiration time.
373    ///
374    /// If multiple rate limits have the same expiration time, any of them may be returned.
375    /// If the collection is empty, returns `None`.
376    pub fn longest(&self) -> Option<&RateLimit> {
377        self.iter().max_by_key(|limit| limit.retry_after)
378    }
379
380    /// Returns `true` if there are no rate limits in this collection.
381    ///
382    /// Note that an empty collection is not the same as having no active limits.
383    /// Use [`is_ok`](Self::is_ok) to check if there are no active limits.
384    pub fn is_empty(&self) -> bool {
385        self.limits.is_empty()
386    }
387}
388
389/// An iterator over rate limit references.
390///
391/// This struct is created by the [`iter`](RateLimits::iter) method on [`RateLimits`].
392/// It yields shared references to the rate limits in the collection.
393pub struct RateLimitsIter<'a> {
394    iter: std::slice::Iter<'a, RateLimit>,
395}
396
397impl<'a> Iterator for RateLimitsIter<'a> {
398    type Item = &'a RateLimit;
399
400    fn next(&mut self) -> Option<Self::Item> {
401        self.iter.next()
402    }
403}
404
405impl IntoIterator for RateLimits {
406    type IntoIter = RateLimitsIntoIter;
407    type Item = RateLimit;
408
409    fn into_iter(self) -> Self::IntoIter {
410        RateLimitsIntoIter {
411            iter: self.limits.into_iter(),
412        }
413    }
414}
415
416/// An iterator that consumes a [`RateLimits`] collection.
417///
418/// This struct is created by the `into_iter` method on [`RateLimits`], provided by the
419/// [`IntoIterator`] trait. It yields owned rate limits by value.
420pub struct RateLimitsIntoIter {
421    iter: std::vec::IntoIter<RateLimit>,
422}
423
424impl Iterator for RateLimitsIntoIter {
425    type Item = RateLimit;
426
427    fn next(&mut self) -> Option<Self::Item> {
428        self.iter.next()
429    }
430}
431
432impl<'a> IntoIterator for &'a RateLimits {
433    type IntoIter = RateLimitsIter<'a>;
434    type Item = &'a RateLimit;
435
436    fn into_iter(self) -> Self::IntoIter {
437        self.iter()
438    }
439}
440
441/// A thread-safe cache of rate limits with automatic expiration handling.
442///
443/// [`CachedRateLimits`] wraps a [`RateLimits`] collection with a mutex to allow safe
444/// concurrent access from multiple threads. It automatically removes expired rate limits
445/// when retrieving the current limits.
446///
447/// This is useful for maintaining a shared set of rate limits across multiple
448/// processing threads or tasks.
449#[derive(Debug, Default)]
450pub struct CachedRateLimits(Mutex<Arc<RateLimits>>);
451
452impl CachedRateLimits {
453    /// Creates a new, empty instance without any rate limits.
454    pub fn new() -> Self {
455        Self::default()
456    }
457
458    /// Adds a rate limit to this collection.
459    ///
460    /// This is a thread-safe wrapper around [`RateLimits::add`].
461    pub fn add(&self, limit: RateLimit) {
462        let mut inner = self.0.lock().unwrap_or_else(PoisonError::into_inner);
463        let current = Arc::make_mut(&mut inner);
464        current.add(limit);
465    }
466
467    /// Merges rate limits from another collection into this one.
468    ///
469    /// This is a thread-safe wrapper around [`RateLimits::merge`].
470    pub fn merge(&self, limits: RateLimits) {
471        let mut inner = self.0.lock().unwrap_or_else(PoisonError::into_inner);
472        let current = Arc::make_mut(&mut inner);
473        for limit in limits {
474            current.add(limit)
475        }
476    }
477
478    /// Returns a reference to the current rate limits.
479    ///
480    /// This method automatically removes any expired rate limits before returning,
481    /// ensuring that only active limits are included in the result.
482    pub fn current_limits(&self) -> Arc<RateLimits> {
483        let now = Instant::now();
484        let mut inner = self.0.lock().unwrap_or_else(PoisonError::into_inner);
485        Arc::make_mut(&mut inner).clean_expired(now);
486        Arc::clone(&inner)
487    }
488}
489
490#[cfg(test)]
491mod tests {
492    use smallvec::smallvec;
493
494    use super::*;
495    use crate::MetricNamespaceScoping;
496    use crate::quota::DataCategory;
497
498    #[test]
499    fn test_parse_retry_after() {
500        // positive float always rounds up to the next integer
501        let retry_after = "17.1".parse::<RetryAfter>().expect("parse RetryAfter");
502        assert_eq!(retry_after.remaining_seconds(), 18);
503        assert!(!retry_after.expired());
504        let retry_after = "17.7".parse::<RetryAfter>().expect("parse RetryAfter");
505        assert_eq!(retry_after.remaining_seconds(), 18);
506        assert!(!retry_after.expired());
507
508        // positive int
509        let retry_after = "17".parse::<RetryAfter>().expect("parse RetryAfter");
510        assert_eq!(retry_after.remaining_seconds(), 17);
511        assert!(!retry_after.expired());
512
513        // negative numbers are treated as zero
514        let retry_after = "-2".parse::<RetryAfter>().expect("parse RetryAfter");
515        assert_eq!(retry_after.remaining_seconds(), 0);
516        assert!(retry_after.expired());
517        let retry_after = "-inf".parse::<RetryAfter>().expect("parse RetryAfter");
518        assert_eq!(retry_after.remaining_seconds(), 0);
519        assert!(retry_after.expired());
520
521        // inf and NaN are valid input and treated as zero
522        let retry_after = "inf".parse::<RetryAfter>().expect("parse RetryAfter");
523        assert_eq!(retry_after.remaining_seconds(), 0);
524        assert!(retry_after.expired());
525        let retry_after = "NaN".parse::<RetryAfter>().expect("parse RetryAfter");
526        assert_eq!(retry_after.remaining_seconds(), 0);
527        assert!(retry_after.expired());
528
529        // large inputs that would overflow are treated as zero
530        let retry_after = "100000000000000000000"
531            .parse::<RetryAfter>()
532            .expect("parse RetryAfter");
533        assert_eq!(retry_after.remaining_seconds(), 0);
534        assert!(retry_after.expired());
535
536        // invalid strings cause parse error
537        "".parse::<RetryAfter>().expect_err("error RetryAfter");
538        "nope".parse::<RetryAfter>().expect_err("error RetryAfter");
539        " 2 ".parse::<RetryAfter>().expect_err("error RetryAfter");
540        "6 0".parse::<RetryAfter>().expect_err("error RetryAfter");
541    }
542
543    #[test]
544    fn test_rate_limit_matches_categories() {
545        let rate_limit = RateLimit {
546            categories: smallvec![DataCategory::Unknown, DataCategory::Error],
547            scope: RateLimitScope::Organization(OrganizationId::new(42)),
548            reason_code: None,
549            retry_after: RetryAfter::from_secs(1),
550            namespaces: smallvec![],
551        };
552
553        assert!(rate_limit.matches(ItemScoping {
554            category: DataCategory::Error,
555            scoping: Scoping {
556                organization_id: OrganizationId::new(42),
557                project_id: ProjectId::new(21),
558                project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
559                key_id: None,
560            },
561            namespace: MetricNamespaceScoping::None,
562        }));
563
564        assert!(!rate_limit.matches(ItemScoping {
565            category: DataCategory::Transaction,
566            scoping: Scoping {
567                organization_id: OrganizationId::new(42),
568                project_id: ProjectId::new(21),
569                project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
570                key_id: None,
571            },
572            namespace: MetricNamespaceScoping::None,
573        }));
574    }
575
576    #[test]
577    fn test_rate_limit_matches_organization() {
578        let rate_limit = RateLimit {
579            categories: DataCategories::new(),
580            scope: RateLimitScope::Organization(OrganizationId::new(42)),
581            reason_code: None,
582            retry_after: RetryAfter::from_secs(1),
583            namespaces: smallvec![],
584        };
585
586        assert!(rate_limit.matches(ItemScoping {
587            category: DataCategory::Error,
588            scoping: Scoping {
589                organization_id: OrganizationId::new(42),
590                project_id: ProjectId::new(21),
591                project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
592                key_id: None,
593            },
594            namespace: MetricNamespaceScoping::None,
595        }));
596
597        assert!(!rate_limit.matches(ItemScoping {
598            category: DataCategory::Error,
599            scoping: Scoping {
600                organization_id: OrganizationId::new(0),
601                project_id: ProjectId::new(21),
602                project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
603                key_id: None,
604            },
605            namespace: MetricNamespaceScoping::None,
606        }));
607    }
608
609    #[test]
610    fn test_rate_limit_matches_project() {
611        let rate_limit = RateLimit {
612            categories: DataCategories::new(),
613            scope: RateLimitScope::Project(ProjectId::new(21)),
614            reason_code: None,
615            retry_after: RetryAfter::from_secs(1),
616            namespaces: smallvec![],
617        };
618
619        assert!(rate_limit.matches(ItemScoping {
620            category: DataCategory::Error,
621            scoping: Scoping {
622                organization_id: OrganizationId::new(42),
623                project_id: ProjectId::new(21),
624                project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
625                key_id: None,
626            },
627            namespace: MetricNamespaceScoping::None,
628        }));
629
630        assert!(!rate_limit.matches(ItemScoping {
631            category: DataCategory::Error,
632            scoping: Scoping {
633                organization_id: OrganizationId::new(42),
634                project_id: ProjectId::new(0),
635                project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
636                key_id: None,
637            },
638            namespace: MetricNamespaceScoping::None,
639        }));
640    }
641
642    #[test]
643    fn test_rate_limit_matches_namespaces() {
644        let rate_limit = RateLimit {
645            categories: smallvec![],
646            scope: RateLimitScope::Organization(OrganizationId::new(42)),
647            reason_code: None,
648            retry_after: RetryAfter::from_secs(1),
649            namespaces: smallvec![MetricNamespace::Custom],
650        };
651
652        let scoping = Scoping {
653            organization_id: OrganizationId::new(42),
654            project_id: ProjectId::new(21),
655            project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
656            key_id: None,
657        };
658
659        assert!(rate_limit.matches(ItemScoping {
660            category: DataCategory::MetricBucket,
661            scoping,
662            namespace: MetricNamespaceScoping::Some(MetricNamespace::Custom),
663        }));
664
665        assert!(!rate_limit.matches(ItemScoping {
666            category: DataCategory::MetricBucket,
667            scoping,
668            namespace: MetricNamespaceScoping::Some(MetricNamespace::Spans),
669        }));
670
671        let general_rate_limit = RateLimit {
672            categories: smallvec![],
673            scope: RateLimitScope::Organization(OrganizationId::new(42)),
674            reason_code: None,
675            retry_after: RetryAfter::from_secs(1),
676            namespaces: smallvec![], // all namespaces
677        };
678
679        assert!(general_rate_limit.matches(ItemScoping {
680            category: DataCategory::MetricBucket,
681            scoping,
682            namespace: MetricNamespaceScoping::Some(MetricNamespace::Spans),
683        }));
684
685        assert!(general_rate_limit.matches(ItemScoping {
686            category: DataCategory::MetricBucket,
687            scoping,
688            namespace: MetricNamespaceScoping::None,
689        }));
690    }
691
692    #[test]
693    fn test_rate_limit_matches_key() {
694        let rate_limit = RateLimit {
695            categories: DataCategories::new(),
696            scope: RateLimitScope::Key(
697                ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
698            ),
699            reason_code: None,
700            retry_after: RetryAfter::from_secs(1),
701            namespaces: smallvec![],
702        };
703
704        assert!(rate_limit.matches(ItemScoping {
705            category: DataCategory::Error,
706            scoping: Scoping {
707                organization_id: OrganizationId::new(42),
708                project_id: ProjectId::new(21),
709                project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
710                key_id: None,
711            },
712            namespace: MetricNamespaceScoping::None,
713        }));
714
715        assert!(!rate_limit.matches(ItemScoping {
716            category: DataCategory::Error,
717            scoping: Scoping {
718                organization_id: OrganizationId::new(0),
719                project_id: ProjectId::new(21),
720                project_key: ProjectKey::parse("deadbeefdeadbeefdeadbeefdeadbeef").unwrap(),
721                key_id: None,
722            },
723            namespace: MetricNamespaceScoping::None,
724        }));
725    }
726
727    #[test]
728    fn test_rate_limits_add_replacement() {
729        let mut rate_limits = RateLimits::new();
730
731        rate_limits.add(RateLimit {
732            categories: smallvec![DataCategory::Default, DataCategory::Error],
733            scope: RateLimitScope::Organization(OrganizationId::new(42)),
734            reason_code: Some(ReasonCode::new("first")),
735            retry_after: RetryAfter::from_secs(1),
736            namespaces: smallvec![],
737        });
738
739        // longer rate limit shadows shorter one
740        rate_limits.add(RateLimit {
741            categories: smallvec![DataCategory::Error, DataCategory::Default],
742            scope: RateLimitScope::Organization(OrganizationId::new(42)),
743            reason_code: Some(ReasonCode::new("second")),
744            retry_after: RetryAfter::from_secs(10),
745            namespaces: smallvec![],
746        });
747
748        insta::assert_ron_snapshot!(rate_limits, @r###"
749        RateLimits(
750          limits: [
751            RateLimit(
752              categories: [
753                default,
754                error,
755              ],
756              scope: Organization(OrganizationId(42)),
757              reason_code: Some(ReasonCode("second")),
758              retry_after: RetryAfter(10),
759              namespaces: [],
760            ),
761          ],
762        )
763        "###);
764    }
765
766    #[test]
767    fn test_rate_limits_add_shadowing() {
768        let mut rate_limits = RateLimits::new();
769
770        rate_limits.add(RateLimit {
771            categories: smallvec![DataCategory::Default, DataCategory::Error],
772            scope: RateLimitScope::Organization(OrganizationId::new(42)),
773            reason_code: Some(ReasonCode::new("first")),
774            retry_after: RetryAfter::from_secs(10),
775            namespaces: smallvec![],
776        });
777
778        // shorter rate limit is shadowed by existing one
779        rate_limits.add(RateLimit {
780            categories: smallvec![DataCategory::Error, DataCategory::Default],
781            scope: RateLimitScope::Organization(OrganizationId::new(42)),
782            reason_code: Some(ReasonCode::new("second")),
783            retry_after: RetryAfter::from_secs(1),
784            namespaces: smallvec![],
785        });
786
787        insta::assert_ron_snapshot!(rate_limits, @r###"
788        RateLimits(
789          limits: [
790            RateLimit(
791              categories: [
792                default,
793                error,
794              ],
795              scope: Organization(OrganizationId(42)),
796              reason_code: Some(ReasonCode("first")),
797              retry_after: RetryAfter(10),
798              namespaces: [],
799            ),
800          ],
801        )
802        "###);
803    }
804
805    #[test]
806    fn test_rate_limits_add_buckets() {
807        let mut rate_limits = RateLimits::new();
808
809        rate_limits.add(RateLimit {
810            categories: smallvec![DataCategory::Error],
811            scope: RateLimitScope::Organization(OrganizationId::new(42)),
812            reason_code: None,
813            retry_after: RetryAfter::from_secs(1),
814            namespaces: smallvec![],
815        });
816
817        // Same scope but different categories
818        rate_limits.add(RateLimit {
819            categories: smallvec![DataCategory::Transaction],
820            scope: RateLimitScope::Organization(OrganizationId::new(42)),
821            reason_code: None,
822            retry_after: RetryAfter::from_secs(1),
823            namespaces: smallvec![],
824        });
825
826        // Same categories but different scope
827        rate_limits.add(RateLimit {
828            categories: smallvec![DataCategory::Error],
829            scope: RateLimitScope::Project(ProjectId::new(21)),
830            reason_code: None,
831            retry_after: RetryAfter::from_secs(1),
832            namespaces: smallvec![],
833        });
834
835        insta::assert_ron_snapshot!(rate_limits, @r###"
836        RateLimits(
837          limits: [
838            RateLimit(
839              categories: [
840                error,
841              ],
842              scope: Organization(OrganizationId(42)),
843              reason_code: None,
844              retry_after: RetryAfter(1),
845              namespaces: [],
846            ),
847            RateLimit(
848              categories: [
849                transaction,
850              ],
851              scope: Organization(OrganizationId(42)),
852              reason_code: None,
853              retry_after: RetryAfter(1),
854              namespaces: [],
855            ),
856            RateLimit(
857              categories: [
858                error,
859              ],
860              scope: Project(ProjectId(21)),
861              reason_code: None,
862              retry_after: RetryAfter(1),
863              namespaces: [],
864            ),
865          ],
866        )
867        "###);
868    }
869
870    /// Regression test that ensures namespaces are correctly added to rate limits.
871    #[test]
872    fn test_rate_limits_add_namespaces() {
873        let mut rate_limits = RateLimits::new();
874
875        rate_limits.add(RateLimit {
876            categories: smallvec![DataCategory::MetricBucket],
877            scope: RateLimitScope::Organization(OrganizationId::new(42)),
878            reason_code: None,
879            retry_after: RetryAfter::from_secs(1),
880            namespaces: smallvec![MetricNamespace::Custom],
881        });
882
883        // Same category but different namespaces
884        rate_limits.add(RateLimit {
885            categories: smallvec![DataCategory::MetricBucket],
886            scope: RateLimitScope::Organization(OrganizationId::new(42)),
887            reason_code: None,
888            retry_after: RetryAfter::from_secs(1),
889            namespaces: smallvec![MetricNamespace::Spans],
890        });
891
892        insta::assert_ron_snapshot!(rate_limits, @r###"
893        RateLimits(
894          limits: [
895            RateLimit(
896              categories: [
897                metric_bucket,
898              ],
899              scope: Organization(OrganizationId(42)),
900              reason_code: None,
901              retry_after: RetryAfter(1),
902              namespaces: [
903                "custom",
904              ],
905            ),
906            RateLimit(
907              categories: [
908                metric_bucket,
909              ],
910              scope: Organization(OrganizationId(42)),
911              reason_code: None,
912              retry_after: RetryAfter(1),
913              namespaces: [
914                "spans",
915              ],
916            ),
917          ],
918        )
919        "###);
920    }
921
922    #[test]
923    fn test_rate_limits_longest() {
924        let mut rate_limits = RateLimits::new();
925
926        rate_limits.add(RateLimit {
927            categories: smallvec![DataCategory::Error],
928            scope: RateLimitScope::Organization(OrganizationId::new(42)),
929            reason_code: Some(ReasonCode::new("first")),
930            retry_after: RetryAfter::from_secs(1),
931            namespaces: smallvec![],
932        });
933
934        // Distinct scope to prevent deduplication
935        rate_limits.add(RateLimit {
936            categories: smallvec![DataCategory::Transaction],
937            scope: RateLimitScope::Organization(OrganizationId::new(42)),
938            reason_code: Some(ReasonCode::new("second")),
939            retry_after: RetryAfter::from_secs(10),
940            namespaces: smallvec![],
941        });
942
943        let rate_limit = rate_limits.longest().unwrap();
944        insta::assert_ron_snapshot!(rate_limit, @r###"
945        RateLimit(
946          categories: [
947            transaction,
948          ],
949          scope: Organization(OrganizationId(42)),
950          reason_code: Some(ReasonCode("second")),
951          retry_after: RetryAfter(10),
952          namespaces: [],
953        )
954        "###);
955    }
956
957    #[test]
958    fn test_rate_limits_clean_expired() {
959        let mut rate_limits = RateLimits::new();
960
961        // Active error limit
962        rate_limits.add(RateLimit {
963            categories: smallvec![DataCategory::Error],
964            scope: RateLimitScope::Organization(OrganizationId::new(42)),
965            reason_code: None,
966            retry_after: RetryAfter::from_secs(1),
967            namespaces: smallvec![],
968        });
969
970        // Inactive error limit with distinct scope
971        rate_limits.add(RateLimit {
972            categories: smallvec![DataCategory::Error],
973            scope: RateLimitScope::Project(ProjectId::new(21)),
974            reason_code: None,
975            retry_after: RetryAfter::from_secs(0),
976            namespaces: smallvec![],
977        });
978
979        // Sanity check before running `clean_expired`
980        assert_eq!(rate_limits.iter().count(), 2);
981
982        rate_limits.clean_expired(Instant::now());
983
984        // Check that the expired limit has been removed
985        insta::assert_ron_snapshot!(rate_limits, @r###"
986        RateLimits(
987          limits: [
988            RateLimit(
989              categories: [
990                error,
991              ],
992              scope: Organization(OrganizationId(42)),
993              reason_code: None,
994              retry_after: RetryAfter(1),
995              namespaces: [],
996            ),
997          ],
998        )
999        "###);
1000    }
1001
1002    #[test]
1003    fn test_rate_limits_check() {
1004        let mut rate_limits = RateLimits::new();
1005
1006        // Active error limit
1007        rate_limits.add(RateLimit {
1008            categories: smallvec![DataCategory::Error],
1009            scope: RateLimitScope::Organization(OrganizationId::new(42)),
1010            reason_code: None,
1011            retry_after: RetryAfter::from_secs(1),
1012            namespaces: smallvec![],
1013        });
1014
1015        // Active transaction limit
1016        rate_limits.add(RateLimit {
1017            categories: smallvec![DataCategory::Transaction],
1018            scope: RateLimitScope::Organization(OrganizationId::new(42)),
1019            reason_code: None,
1020            retry_after: RetryAfter::from_secs(1),
1021            namespaces: smallvec![],
1022        });
1023
1024        let applied_limits = rate_limits.check(ItemScoping {
1025            category: DataCategory::Error,
1026            scoping: Scoping {
1027                organization_id: OrganizationId::new(42),
1028                project_id: ProjectId::new(21),
1029                project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
1030                key_id: None,
1031            },
1032            namespace: MetricNamespaceScoping::None,
1033        });
1034
1035        // Check that the error limit is applied
1036        insta::assert_ron_snapshot!(applied_limits, @r###"
1037        RateLimits(
1038          limits: [
1039            RateLimit(
1040              categories: [
1041                error,
1042              ],
1043              scope: Organization(OrganizationId(42)),
1044              reason_code: None,
1045              retry_after: RetryAfter(1),
1046              namespaces: [],
1047            ),
1048          ],
1049        )
1050        "###);
1051    }
1052
1053    #[test]
1054    fn test_rate_limits_check_quotas() {
1055        let mut rate_limits = RateLimits::new();
1056
1057        // Active error limit
1058        rate_limits.add(RateLimit {
1059            categories: smallvec![DataCategory::Error],
1060            scope: RateLimitScope::Organization(OrganizationId::new(42)),
1061            reason_code: None,
1062            retry_after: RetryAfter::from_secs(1),
1063            namespaces: smallvec![],
1064        });
1065
1066        // Active transaction limit
1067        rate_limits.add(RateLimit {
1068            categories: smallvec![DataCategory::Transaction],
1069            scope: RateLimitScope::Organization(OrganizationId::new(42)),
1070            reason_code: None,
1071            retry_after: RetryAfter::from_secs(1),
1072            namespaces: smallvec![],
1073        });
1074
1075        let item_scoping = ItemScoping {
1076            category: DataCategory::Error,
1077            scoping: Scoping {
1078                organization_id: OrganizationId::new(42),
1079                project_id: ProjectId::new(21),
1080                project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
1081                key_id: None,
1082            },
1083            namespace: MetricNamespaceScoping::None,
1084        };
1085
1086        let quotas = &[Quota {
1087            id: None,
1088            categories: smallvec![DataCategory::Error],
1089            scope: QuotaScope::Organization,
1090            scope_id: Some("42".to_owned()),
1091            limit: Some(0),
1092            window: None,
1093            reason_code: Some(ReasonCode::new("zero")),
1094            namespace: None,
1095        }];
1096
1097        let applied_limits = rate_limits.check_with_quotas(quotas, item_scoping);
1098
1099        insta::assert_ron_snapshot!(applied_limits, @r###"
1100        RateLimits(
1101          limits: [
1102            RateLimit(
1103              categories: [
1104                error,
1105              ],
1106              scope: Organization(OrganizationId(42)),
1107              reason_code: Some(ReasonCode("zero")),
1108              retry_after: RetryAfter(60),
1109              namespaces: [],
1110            ),
1111          ],
1112        )
1113        "###);
1114    }
1115
1116    #[test]
1117    fn test_rate_limits_merge() {
1118        let mut rate_limits1 = RateLimits::new();
1119        let mut rate_limits2 = RateLimits::new();
1120
1121        rate_limits1.add(RateLimit {
1122            categories: smallvec![DataCategory::Error],
1123            scope: RateLimitScope::Organization(OrganizationId::new(42)),
1124            reason_code: Some(ReasonCode::new("first")),
1125            retry_after: RetryAfter::from_secs(1),
1126            namespaces: smallvec![],
1127        });
1128
1129        rate_limits1.add(RateLimit {
1130            categories: smallvec![DataCategory::TransactionIndexed],
1131            scope: RateLimitScope::Organization(OrganizationId::new(42)),
1132            reason_code: None,
1133            retry_after: RetryAfter::from_secs(1),
1134            namespaces: smallvec![],
1135        });
1136
1137        rate_limits2.add(RateLimit {
1138            categories: smallvec![DataCategory::Error],
1139            scope: RateLimitScope::Organization(OrganizationId::new(42)),
1140            reason_code: Some(ReasonCode::new("second")),
1141            retry_after: RetryAfter::from_secs(10),
1142            namespaces: smallvec![],
1143        });
1144
1145        rate_limits1.merge(rate_limits2);
1146
1147        insta::assert_ron_snapshot!(rate_limits1, @r###"
1148        RateLimits(
1149          limits: [
1150            RateLimit(
1151              categories: [
1152                error,
1153              ],
1154              scope: Organization(OrganizationId(42)),
1155              reason_code: Some(ReasonCode("second")),
1156              retry_after: RetryAfter(10),
1157              namespaces: [],
1158            ),
1159            RateLimit(
1160              categories: [
1161                transaction_indexed,
1162              ],
1163              scope: Organization(OrganizationId(42)),
1164              reason_code: None,
1165              retry_after: RetryAfter(1),
1166              namespaces: [],
1167            ),
1168          ],
1169        )
1170        "###);
1171    }
1172
1173    #[test]
1174    fn test_cached_rate_limits_expired() {
1175        let cached = CachedRateLimits::new();
1176
1177        // Active error limit
1178        cached.add(RateLimit {
1179            categories: smallvec![DataCategory::Error],
1180            scope: RateLimitScope::Organization(OrganizationId::new(42)),
1181            reason_code: None,
1182            retry_after: RetryAfter::from_secs(1),
1183            namespaces: smallvec![],
1184        });
1185
1186        // Inactive error limit with distinct scope
1187        cached.add(RateLimit {
1188            categories: smallvec![DataCategory::Error],
1189            scope: RateLimitScope::Project(ProjectId::new(21)),
1190            reason_code: None,
1191            retry_after: RetryAfter::from_secs(0),
1192            namespaces: smallvec![],
1193        });
1194
1195        let rate_limits = cached.current_limits();
1196
1197        insta::assert_ron_snapshot!(rate_limits, @r###"
1198        RateLimits(
1199          limits: [
1200            RateLimit(
1201              categories: [
1202                error,
1203              ],
1204              scope: Organization(OrganizationId(42)),
1205              reason_code: None,
1206              retry_after: RetryAfter(1),
1207              namespaces: [],
1208            ),
1209          ],
1210        )
1211        "###);
1212    }
1213}