relay_quotas/
rate_limit.rs

1use std::fmt;
2use std::str::FromStr;
3use std::sync::{Arc, Mutex, PoisonError};
4use std::time::{Duration, Instant};
5
6use relay_base_schema::metrics::MetricNamespace;
7use relay_base_schema::organization::OrganizationId;
8use relay_base_schema::project::{ProjectId, ProjectKey};
9use smallvec::SmallVec;
10
11use crate::REJECT_ALL_SECS;
12use crate::quota::{DataCategories, ItemScoping, Quota, QuotaScope, ReasonCode, Scoping};
13
/// A point on the monotonic clock at which a rate limit stops applying.
///
/// A [`RetryAfter`] wraps the [`Instant`] of expiration. It can be queried for whether that
/// instant has been reached and for how much time is left until then, either relative to the
/// current time or relative to an explicitly supplied reference instant.
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct RetryAfter {
    when: Instant,
}

impl RetryAfter {
    /// Creates a [`RetryAfter`] that expires `seconds` seconds from now.
    ///
    /// If the resulting instant cannot be represented, the current instant is used instead,
    /// which yields a marker that is already expired.
    #[inline]
    pub fn from_secs(seconds: u64) -> Self {
        let now = Instant::now();
        let delay = Duration::from_secs(seconds);

        match now.checked_add(delay) {
            Some(when) => Self { when },
            None => Self { when: now },
        }
    }

    /// Returns the time left until expiration, measured from the instant `at`.
    ///
    /// Returns `None` if `at` lies at or after the expiration instant.
    #[inline]
    pub fn remaining_at(self, at: Instant) -> Option<Duration> {
        if self.when > at {
            Some(self.when - at)
        } else {
            None
        }
    }

    /// Returns the time left until expiration, measured from the current instant.
    ///
    /// Returns `None` if the rate limit has already expired.
    #[inline]
    pub fn remaining(self) -> Option<Duration> {
        let now = Instant::now();
        self.remaining_at(now)
    }

    /// Returns the number of seconds left until expiration, measured from the instant `at`.
    ///
    /// The result is rounded up to the next full second, so a limit is never reported as
    /// shorter than it is. Returns `0` if the rate limit has expired at `at`.
    #[inline]
    pub fn remaining_seconds_at(self, at: Instant) -> u64 {
        self.remaining_at(at).map_or(0, |left| {
            // `as_secs` truncates, so any sub-second remainder counts as one more second.
            let partial_second = u64::from(left.subsec_nanos() > 0);
            left.as_secs() + partial_second
        })
    }

    /// Returns the number of seconds left until expiration, measured from the current instant.
    ///
    /// Returns `0` if the rate limit has already expired.
    #[inline]
    pub fn remaining_seconds(self) -> u64 {
        let now = Instant::now();
        self.remaining_seconds_at(now)
    }

    /// Returns `true` if the rate limit is expired when observed at the instant `at`.
    #[inline]
    pub fn expired_at(self, at: Instant) -> bool {
        self.when <= at
    }

    /// Returns `true` if the rate limit is expired when observed at the current instant.
    #[inline]
    pub fn expired(self) -> bool {
        self.expired_at(Instant::now())
    }
}
91
92impl fmt::Debug for RetryAfter {
93    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
94        match self.remaining_seconds() {
95            0 => write!(f, "RetryAfter(expired)"),
96            remaining => write!(f, "RetryAfter({remaining}s)"),
97        }
98    }
99}
100
// Test-only serialization: emits a one-field tuple struct named `RetryAfter` that holds the
// remaining seconds at the time of serialization.
#[cfg(test)]
impl serde::Serialize for RetryAfter {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        use serde::ser::SerializeTupleStruct as _;

        let mut state = serializer.serialize_tuple_struct("RetryAfter", 1)?;
        let seconds = self.remaining_seconds();
        state.serialize_field(&seconds)?;
        state.end()
    }
}
113
/// Error that occurs when parsing a [`RetryAfter`] from a string fails.
///
/// Implements [`std::error::Error`], so it can be boxed, propagated with `?` into generic error
/// types, and inspected through [`source`](std::error::Error::source).
#[derive(Debug)]
pub enum InvalidRetryAfter {
    /// The supplied delay in seconds was not valid.
    InvalidDelay(std::num::ParseFloatError),
}

impl fmt::Display for InvalidRetryAfter {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // The underlying parse error is exposed via `source()` and therefore not repeated here.
        match self {
            Self::InvalidDelay(_) => f.write_str("invalid retry-after delay"),
        }
    }
}

impl std::error::Error for InvalidRetryAfter {
    /// Returns the float parse error that caused this error.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            Self::InvalidDelay(err) => Some(err),
        }
    }
}
120
121impl FromStr for RetryAfter {
122    type Err = InvalidRetryAfter;
123
124    fn from_str(s: &str) -> Result<Self, Self::Err> {
125        let float = s.parse::<f64>().map_err(InvalidRetryAfter::InvalidDelay)?;
126        let seconds = float.max(0.0).ceil() as u64;
127        Ok(RetryAfter::from_secs(seconds))
128    }
129}
130
131/// The scope that a rate limit applies to.
132///
133/// Unlike [`QuotaScope`], which only declares the class of the scope, this enum carries
134/// the specific identifiers of the individual scopes that a rate limit applies to.
135///
136/// Rate limits can be applied at different levels of granularity, from global (affecting all data)
137/// down to a specific project key.
138#[derive(Clone, Debug, Eq, PartialEq)]
139#[cfg_attr(test, derive(serde::Serialize))]
140pub enum RateLimitScope {
141    /// Global scope.
142    Global,
143    /// An organization with identifier.
144    Organization(OrganizationId),
145    /// A project with identifier.
146    Project(ProjectId),
147    /// A DSN public key.
148    Key(ProjectKey),
149}
150
151impl RateLimitScope {
152    /// Creates a rate limiting scope from the given item scoping for a specific quota.
153    ///
154    /// This extracts the appropriate scope identifier based on the quota's scope type.
155    /// For unknown scopes, it assumes the most specific scope (Key).
156    pub fn for_quota(scoping: Scoping, scope: QuotaScope) -> Self {
157        match scope {
158            QuotaScope::Global => Self::Global,
159            QuotaScope::Organization => Self::Organization(scoping.organization_id),
160            QuotaScope::Project => Self::Project(scoping.project_id),
161            QuotaScope::Key => Self::Key(scoping.project_key),
162            // For unknown scopes, assume the most specific scope:
163            QuotaScope::Unknown => Self::Key(scoping.project_key),
164        }
165    }
166
167    /// Returns the canonical name of this scope.
168    ///
169    /// This corresponds to the name of the corresponding [`QuotaScope`].
170    pub fn name(&self) -> &'static str {
171        match *self {
172            Self::Global => QuotaScope::Global.name(),
173            Self::Key(_) => QuotaScope::Key.name(),
174            Self::Project(_) => QuotaScope::Project.name(),
175            Self::Organization(_) => QuotaScope::Organization.name(),
176        }
177    }
178}
179
180/// An active rate limit that restricts data ingestion.
181///
182/// A rate limit defines restrictions for specific data categories within a particular scope.
183/// It includes an expiration time after which the limit is no longer enforced.
184///
185/// Rate limits can be created from [`Quota`]s or directly constructed with the needed parameters.
186#[derive(Clone, Debug, PartialEq)]
187#[cfg_attr(test, derive(serde::Serialize))]
188pub struct RateLimit {
189    /// A set of data categories that this quota applies to. If empty, this rate limit
190    /// applies to all data categories.
191    pub categories: DataCategories,
192
193    /// The scope of this rate limit.
194    pub scope: RateLimitScope,
195
196    /// A machine-readable reason indicating which quota caused this rate limit.
197    pub reason_code: Option<ReasonCode>,
198
199    /// A marker when this rate limit expires.
200    pub retry_after: RetryAfter,
201
202    /// The metric namespace of this rate limit.
203    ///
204    /// Only relevant for data categories of type metric bucket. If empty,
205    /// this rate limit applies to metrics of all namespaces.
206    pub namespaces: SmallVec<[MetricNamespace; 1]>,
207}
208
209impl RateLimit {
210    /// Creates a new rate limit from the given [`Quota`].
211    ///
212    /// This builds a rate limit with the appropriate scope derived from the quota and scoping
213    /// information. The categories and other properties are copied from the quota.
214    pub fn from_quota(quota: &Quota, scoping: Scoping, retry_after: RetryAfter) -> Self {
215        Self {
216            categories: quota.categories.clone(),
217            scope: RateLimitScope::for_quota(scoping, quota.scope),
218            reason_code: quota.reason_code.clone(),
219            retry_after,
220            namespaces: quota.namespace.into_iter().collect(),
221        }
222    }
223
224    /// Checks whether this rate limit applies to the given item.
225    ///
226    /// A rate limit applies if its scope matches the item's scope, and the item's
227    /// category and namespace match those of the rate limit.
228    pub fn matches(&self, scoping: ItemScoping) -> bool {
229        self.matches_scope(scoping)
230            && scoping.matches_categories(&self.categories)
231            && scoping.matches_namespaces(&self.namespaces)
232    }
233
234    /// Returns `true` if the rate limiting scope matches the given item.
235    fn matches_scope(&self, scoping: ItemScoping) -> bool {
236        match self.scope {
237            RateLimitScope::Global => true,
238            RateLimitScope::Organization(org_id) => scoping.organization_id == org_id,
239            RateLimitScope::Project(project_id) => scoping.project_id == project_id,
240            RateLimitScope::Key(key) => scoping.project_key == key,
241        }
242    }
243}
244
245/// A collection of scoped rate limits.
246///
247/// [`RateLimits`] manages a set of active rate limits that can be checked against
248/// incoming data.
249///
250/// The collection can be empty, indicated by [`is_ok`](Self::is_ok), meaning no rate limits
251/// are currently active.
252#[derive(Clone, Debug, Default)]
253#[cfg_attr(test, derive(serde::Serialize))]
254pub struct RateLimits {
255    limits: Vec<RateLimit>,
256}
257
258impl RateLimits {
259    /// Creates an empty [`RateLimits`] instance.
260    pub fn new() -> Self {
261        Self::default()
262    }
263
264    /// Adds a limit to this collection.
265    ///
266    /// If a rate limit with an overlapping scope already exists, the `retry_after` count is merged
267    /// with the existing limit. Otherwise, the new rate limit is added.
268    pub fn add(&mut self, mut limit: RateLimit) {
269        // Categories are logically a set, but not implemented as such.
270        limit.categories.sort();
271
272        let limit_opt = self.limits.iter_mut().find(|l| {
273            let RateLimit {
274                categories,
275                scope,
276                reason_code: _,
277                retry_after: _,
278                namespaces: namespace,
279            } = &limit;
280
281            *categories == l.categories && *scope == l.scope && *namespace == l.namespaces
282        });
283
284        match limit_opt {
285            None => self.limits.push(limit),
286            Some(existing) if existing.retry_after < limit.retry_after => *existing = limit,
287            Some(_) => (), // keep existing, longer limit
288        }
289    }
290
291    /// Merges all limits from another [`RateLimits`] instance into this one.
292    ///
293    /// This keeps all existing rate limits, adds new ones, and updates any existing ones
294    /// with a later expiration time. The resulting collection contains the maximum
295    /// constraints from both instances.
296    pub fn merge(&mut self, limits: Self) {
297        for limit in limits {
298            self.add(limit);
299        }
300    }
301
302    /// Merges all limits from another [`RateLimits`] with this one.
303    ///
304    /// See also: [`Self::merge`].
305    pub fn merge_with(mut self, other: Self) -> Self {
306        self.merge(other);
307        self
308    }
309
310    /// Returns `true` if this instance contains no active limits.
311    ///
312    /// This is the opposite of [`is_limited`](Self::is_limited).
313    pub fn is_ok(&self) -> bool {
314        !self.is_limited()
315    }
316
317    /// Returns `true` if this instance contains any active rate limits.
318    ///
319    /// A rate limit is considered active if it has not yet expired.
320    pub fn is_limited(&self) -> bool {
321        let now = Instant::now();
322        self.iter().any(|limit| !limit.retry_after.expired_at(now))
323    }
324
325    /// Removes expired rate limits from this instance.
326    ///
327    /// This is useful for cleaning up rate limits that are no longer relevant,
328    /// reducing memory usage and improving performance of subsequent operations.
329    pub fn clean_expired(&mut self, now: Instant) {
330        self.limits
331            .retain(|limit| !limit.retry_after.expired_at(now));
332    }
333
334    /// Checks whether any rate limits apply to the given scoping.
335    ///
336    /// Returns a new [`RateLimits`] instance containing only the rate limits that match
337    /// the provided [`ItemScoping`]. If no limits match, the returned instance will be empty
338    /// and [`is_ok`](Self::is_ok) will return `true`.
339    pub fn check(&self, scoping: ItemScoping) -> Self {
340        self.check_with_quotas(&[], scoping)
341    }
342
343    /// Checks whether any rate limits or static quotas apply to the given scoping.
344    ///
345    /// This is similar to [`check`](Self::check), but additionally checks for quotas with a static
346    /// limit of `0`, which reject items even if there is no active rate limit in this instance.
347    ///
348    /// Returns a new [`RateLimits`] instance containing the rate limits that match the provided
349    /// [`ItemScoping`]. If no limits or quotas match, the returned instance will be empty.
350    pub fn check_with_quotas<'a>(
351        &self,
352        quotas: impl IntoIterator<Item = &'a Quota>,
353        scoping: ItemScoping,
354    ) -> Self {
355        let mut applied_limits = Self::new();
356
357        for quota in quotas {
358            if quota.limit == Some(0) && quota.matches(scoping) {
359                let retry_after = RetryAfter::from_secs(REJECT_ALL_SECS);
360                applied_limits.add(RateLimit::from_quota(quota, *scoping, retry_after));
361            }
362        }
363
364        for limit in &self.limits {
365            if limit.matches(scoping) {
366                applied_limits.add(limit.clone());
367            }
368        }
369
370        applied_limits
371    }
372
373    /// Returns an iterator over all rate limits in this collection.
374    pub fn iter(&self) -> RateLimitsIter<'_> {
375        RateLimitsIter {
376            iter: self.limits.iter(),
377        }
378    }
379
380    /// Returns the rate limit with the latest expiration time.
381    ///
382    /// If multiple rate limits have the same expiration time, any of them may be returned.
383    /// If the collection is empty, returns `None`.
384    pub fn longest(&self) -> Option<&RateLimit> {
385        self.iter().max_by_key(|limit| limit.retry_after)
386    }
387
388    /// Returns `true` if there are no rate limits in this collection.
389    ///
390    /// Note that an empty collection is not the same as having no active limits.
391    /// Use [`is_ok`](Self::is_ok) to check if there are no active limits.
392    pub fn is_empty(&self) -> bool {
393        self.limits.is_empty()
394    }
395}
396
397/// An iterator over rate limit references.
398///
399/// This struct is created by the [`iter`](RateLimits::iter) method on [`RateLimits`].
400/// It yields shared references to the rate limits in the collection.
401pub struct RateLimitsIter<'a> {
402    iter: std::slice::Iter<'a, RateLimit>,
403}
404
405impl<'a> Iterator for RateLimitsIter<'a> {
406    type Item = &'a RateLimit;
407
408    fn next(&mut self) -> Option<Self::Item> {
409        self.iter.next()
410    }
411}
412
413impl IntoIterator for RateLimits {
414    type IntoIter = RateLimitsIntoIter;
415    type Item = RateLimit;
416
417    fn into_iter(self) -> Self::IntoIter {
418        RateLimitsIntoIter {
419            iter: self.limits.into_iter(),
420        }
421    }
422}
423
424/// An iterator that consumes a [`RateLimits`] collection.
425///
426/// This struct is created by the `into_iter` method on [`RateLimits`], provided by the
427/// [`IntoIterator`] trait. It yields owned rate limits by value.
428pub struct RateLimitsIntoIter {
429    iter: std::vec::IntoIter<RateLimit>,
430}
431
432impl Iterator for RateLimitsIntoIter {
433    type Item = RateLimit;
434
435    fn next(&mut self) -> Option<Self::Item> {
436        self.iter.next()
437    }
438}
439
440impl<'a> IntoIterator for &'a RateLimits {
441    type IntoIter = RateLimitsIter<'a>;
442    type Item = &'a RateLimit;
443
444    fn into_iter(self) -> Self::IntoIter {
445        self.iter()
446    }
447}
448
449/// A thread-safe cache of rate limits with automatic expiration handling.
450///
451/// [`CachedRateLimits`] wraps a [`RateLimits`] collection with a mutex to allow safe
452/// concurrent access from multiple threads. It automatically removes expired rate limits
453/// when retrieving the current limits.
454///
455/// This is useful for maintaining a shared set of rate limits across multiple
456/// processing threads or tasks.
457#[derive(Debug, Default)]
458pub struct CachedRateLimits(Mutex<Arc<RateLimits>>);
459
460impl CachedRateLimits {
461    /// Creates a new, empty instance without any rate limits.
462    pub fn new() -> Self {
463        Self::default()
464    }
465
466    /// Adds a rate limit to this collection.
467    ///
468    /// This is a thread-safe wrapper around [`RateLimits::add`].
469    pub fn add(&self, limit: RateLimit) {
470        let mut inner = self.0.lock().unwrap_or_else(PoisonError::into_inner);
471        let current = Arc::make_mut(&mut inner);
472        current.add(limit);
473    }
474
475    /// Merges rate limits from another collection into this one.
476    ///
477    /// This is a thread-safe wrapper around [`RateLimits::merge`].
478    pub fn merge(&self, limits: RateLimits) {
479        if limits.is_empty() {
480            return;
481        }
482
483        let mut inner = self.0.lock().unwrap_or_else(PoisonError::into_inner);
484        let current = Arc::make_mut(&mut inner);
485        for limit in limits {
486            current.add(limit)
487        }
488    }
489
490    /// Returns a reference to the current rate limits.
491    ///
492    /// This method automatically removes any expired rate limits before returning,
493    /// ensuring that only active limits are included in the result.
494    pub fn current_limits(&self) -> Arc<RateLimits> {
495        let now = Instant::now();
496        let mut inner = self.0.lock().unwrap_or_else(PoisonError::into_inner);
497        Arc::make_mut(&mut inner).clean_expired(now);
498        Arc::clone(&inner)
499    }
500}
501
502#[cfg(test)]
503mod tests {
504    use smallvec::smallvec;
505
506    use super::*;
507    use crate::MetricNamespaceScoping;
508    use crate::quota::DataCategory;
509
510    #[test]
511    fn test_parse_retry_after() {
512        // positive float always rounds up to the next integer
513        let retry_after = "17.1".parse::<RetryAfter>().expect("parse RetryAfter");
514        assert_eq!(retry_after.remaining_seconds(), 18);
515        assert!(!retry_after.expired());
516        let retry_after = "17.7".parse::<RetryAfter>().expect("parse RetryAfter");
517        assert_eq!(retry_after.remaining_seconds(), 18);
518        assert!(!retry_after.expired());
519
520        // positive int
521        let retry_after = "17".parse::<RetryAfter>().expect("parse RetryAfter");
522        assert_eq!(retry_after.remaining_seconds(), 17);
523        assert!(!retry_after.expired());
524
525        // negative numbers are treated as zero
526        let retry_after = "-2".parse::<RetryAfter>().expect("parse RetryAfter");
527        assert_eq!(retry_after.remaining_seconds(), 0);
528        assert!(retry_after.expired());
529        let retry_after = "-inf".parse::<RetryAfter>().expect("parse RetryAfter");
530        assert_eq!(retry_after.remaining_seconds(), 0);
531        assert!(retry_after.expired());
532
533        // inf and NaN are valid input and treated as zero
534        let retry_after = "inf".parse::<RetryAfter>().expect("parse RetryAfter");
535        assert_eq!(retry_after.remaining_seconds(), 0);
536        assert!(retry_after.expired());
537        let retry_after = "NaN".parse::<RetryAfter>().expect("parse RetryAfter");
538        assert_eq!(retry_after.remaining_seconds(), 0);
539        assert!(retry_after.expired());
540
541        // large inputs that would overflow are treated as zero
542        let retry_after = "100000000000000000000"
543            .parse::<RetryAfter>()
544            .expect("parse RetryAfter");
545        assert_eq!(retry_after.remaining_seconds(), 0);
546        assert!(retry_after.expired());
547
548        // invalid strings cause parse error
549        "".parse::<RetryAfter>().expect_err("error RetryAfter");
550        "nope".parse::<RetryAfter>().expect_err("error RetryAfter");
551        " 2 ".parse::<RetryAfter>().expect_err("error RetryAfter");
552        "6 0".parse::<RetryAfter>().expect_err("error RetryAfter");
553    }
554
555    #[test]
556    fn test_rate_limit_matches_categories() {
557        let rate_limit = RateLimit {
558            categories: smallvec![DataCategory::Unknown, DataCategory::Error],
559            scope: RateLimitScope::Organization(OrganizationId::new(42)),
560            reason_code: None,
561            retry_after: RetryAfter::from_secs(1),
562            namespaces: smallvec![],
563        };
564
565        assert!(rate_limit.matches(ItemScoping {
566            category: DataCategory::Error,
567            scoping: Scoping {
568                organization_id: OrganizationId::new(42),
569                project_id: ProjectId::new(21),
570                project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
571                key_id: None,
572            },
573            namespace: MetricNamespaceScoping::None,
574        }));
575
576        assert!(!rate_limit.matches(ItemScoping {
577            category: DataCategory::Transaction,
578            scoping: Scoping {
579                organization_id: OrganizationId::new(42),
580                project_id: ProjectId::new(21),
581                project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
582                key_id: None,
583            },
584            namespace: MetricNamespaceScoping::None,
585        }));
586    }
587
588    #[test]
589    fn test_rate_limit_matches_organization() {
590        let rate_limit = RateLimit {
591            categories: DataCategories::new(),
592            scope: RateLimitScope::Organization(OrganizationId::new(42)),
593            reason_code: None,
594            retry_after: RetryAfter::from_secs(1),
595            namespaces: smallvec![],
596        };
597
598        assert!(rate_limit.matches(ItemScoping {
599            category: DataCategory::Error,
600            scoping: Scoping {
601                organization_id: OrganizationId::new(42),
602                project_id: ProjectId::new(21),
603                project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
604                key_id: None,
605            },
606            namespace: MetricNamespaceScoping::None,
607        }));
608
609        assert!(!rate_limit.matches(ItemScoping {
610            category: DataCategory::Error,
611            scoping: Scoping {
612                organization_id: OrganizationId::new(0),
613                project_id: ProjectId::new(21),
614                project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
615                key_id: None,
616            },
617            namespace: MetricNamespaceScoping::None,
618        }));
619    }
620
621    #[test]
622    fn test_rate_limit_matches_project() {
623        let rate_limit = RateLimit {
624            categories: DataCategories::new(),
625            scope: RateLimitScope::Project(ProjectId::new(21)),
626            reason_code: None,
627            retry_after: RetryAfter::from_secs(1),
628            namespaces: smallvec![],
629        };
630
631        assert!(rate_limit.matches(ItemScoping {
632            category: DataCategory::Error,
633            scoping: Scoping {
634                organization_id: OrganizationId::new(42),
635                project_id: ProjectId::new(21),
636                project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
637                key_id: None,
638            },
639            namespace: MetricNamespaceScoping::None,
640        }));
641
642        assert!(!rate_limit.matches(ItemScoping {
643            category: DataCategory::Error,
644            scoping: Scoping {
645                organization_id: OrganizationId::new(42),
646                project_id: ProjectId::new(0),
647                project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
648                key_id: None,
649            },
650            namespace: MetricNamespaceScoping::None,
651        }));
652    }
653
654    #[test]
655    fn test_rate_limit_matches_namespaces() {
656        let rate_limit = RateLimit {
657            categories: smallvec![],
658            scope: RateLimitScope::Organization(OrganizationId::new(42)),
659            reason_code: None,
660            retry_after: RetryAfter::from_secs(1),
661            namespaces: smallvec![MetricNamespace::Custom],
662        };
663
664        let scoping = Scoping {
665            organization_id: OrganizationId::new(42),
666            project_id: ProjectId::new(21),
667            project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
668            key_id: None,
669        };
670
671        assert!(rate_limit.matches(ItemScoping {
672            category: DataCategory::MetricBucket,
673            scoping,
674            namespace: MetricNamespaceScoping::Some(MetricNamespace::Custom),
675        }));
676
677        assert!(!rate_limit.matches(ItemScoping {
678            category: DataCategory::MetricBucket,
679            scoping,
680            namespace: MetricNamespaceScoping::Some(MetricNamespace::Spans),
681        }));
682
683        let general_rate_limit = RateLimit {
684            categories: smallvec![],
685            scope: RateLimitScope::Organization(OrganizationId::new(42)),
686            reason_code: None,
687            retry_after: RetryAfter::from_secs(1),
688            namespaces: smallvec![], // all namespaces
689        };
690
691        assert!(general_rate_limit.matches(ItemScoping {
692            category: DataCategory::MetricBucket,
693            scoping,
694            namespace: MetricNamespaceScoping::Some(MetricNamespace::Spans),
695        }));
696
697        assert!(general_rate_limit.matches(ItemScoping {
698            category: DataCategory::MetricBucket,
699            scoping,
700            namespace: MetricNamespaceScoping::None,
701        }));
702    }
703
704    #[test]
705    fn test_rate_limit_matches_key() {
706        let rate_limit = RateLimit {
707            categories: DataCategories::new(),
708            scope: RateLimitScope::Key(
709                ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
710            ),
711            reason_code: None,
712            retry_after: RetryAfter::from_secs(1),
713            namespaces: smallvec![],
714        };
715
716        assert!(rate_limit.matches(ItemScoping {
717            category: DataCategory::Error,
718            scoping: Scoping {
719                organization_id: OrganizationId::new(42),
720                project_id: ProjectId::new(21),
721                project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
722                key_id: None,
723            },
724            namespace: MetricNamespaceScoping::None,
725        }));
726
727        assert!(!rate_limit.matches(ItemScoping {
728            category: DataCategory::Error,
729            scoping: Scoping {
730                organization_id: OrganizationId::new(0),
731                project_id: ProjectId::new(21),
732                project_key: ProjectKey::parse("deadbeefdeadbeefdeadbeefdeadbeef").unwrap(),
733                key_id: None,
734            },
735            namespace: MetricNamespaceScoping::None,
736        }));
737    }
738
739    #[test]
740    fn test_rate_limits_add_replacement() {
741        let mut rate_limits = RateLimits::new();
742
743        rate_limits.add(RateLimit {
744            categories: smallvec![DataCategory::Default, DataCategory::Error],
745            scope: RateLimitScope::Organization(OrganizationId::new(42)),
746            reason_code: Some(ReasonCode::new("first")),
747            retry_after: RetryAfter::from_secs(1),
748            namespaces: smallvec![],
749        });
750
751        // longer rate limit shadows shorter one
752        rate_limits.add(RateLimit {
753            categories: smallvec![DataCategory::Error, DataCategory::Default],
754            scope: RateLimitScope::Organization(OrganizationId::new(42)),
755            reason_code: Some(ReasonCode::new("second")),
756            retry_after: RetryAfter::from_secs(10),
757            namespaces: smallvec![],
758        });
759
760        insta::assert_ron_snapshot!(rate_limits, @r###"
761        RateLimits(
762          limits: [
763            RateLimit(
764              categories: [
765                default,
766                error,
767              ],
768              scope: Organization(OrganizationId(42)),
769              reason_code: Some(ReasonCode("second")),
770              retry_after: RetryAfter(10),
771              namespaces: [],
772            ),
773          ],
774        )
775        "###);
776    }
777
778    #[test]
779    fn test_rate_limits_add_shadowing() {
780        let mut rate_limits = RateLimits::new();
781
782        rate_limits.add(RateLimit {
783            categories: smallvec![DataCategory::Default, DataCategory::Error],
784            scope: RateLimitScope::Organization(OrganizationId::new(42)),
785            reason_code: Some(ReasonCode::new("first")),
786            retry_after: RetryAfter::from_secs(10),
787            namespaces: smallvec![],
788        });
789
790        // shorter rate limit is shadowed by existing one
791        rate_limits.add(RateLimit {
792            categories: smallvec![DataCategory::Error, DataCategory::Default],
793            scope: RateLimitScope::Organization(OrganizationId::new(42)),
794            reason_code: Some(ReasonCode::new("second")),
795            retry_after: RetryAfter::from_secs(1),
796            namespaces: smallvec![],
797        });
798
799        insta::assert_ron_snapshot!(rate_limits, @r###"
800        RateLimits(
801          limits: [
802            RateLimit(
803              categories: [
804                default,
805                error,
806              ],
807              scope: Organization(OrganizationId(42)),
808              reason_code: Some(ReasonCode("first")),
809              retry_after: RetryAfter(10),
810              namespaces: [],
811            ),
812          ],
813        )
814        "###);
815    }
816
/// Limits that differ in either category or scope are kept as separate entries, in the order
/// in which they were added.
#[test]
fn test_rate_limits_add_buckets() {
    // Every limit in this test is a one second, single-category limit without reason code or
    // namespaces; only the category and the scope vary.
    let limit_for = |category: DataCategory, scope: RateLimitScope| RateLimit {
        categories: smallvec![category],
        scope,
        reason_code: None,
        retry_after: RetryAfter::from_secs(1),
        namespaces: smallvec![],
    };
    let org_scope = || RateLimitScope::Organization(OrganizationId::new(42));

    let mut rate_limits = RateLimits::new();

    // Baseline limit.
    rate_limits.add(limit_for(DataCategory::Error, org_scope()));

    // Same scope as the baseline, but a different category.
    rate_limits.add(limit_for(DataCategory::Transaction, org_scope()));

    // Same category as the baseline, but a different scope.
    rate_limits.add(limit_for(
        DataCategory::Error,
        RateLimitScope::Project(ProjectId::new(21)),
    ));

    insta::assert_ron_snapshot!(rate_limits, @r###"
        RateLimits(
          limits: [
            RateLimit(
              categories: [
                error,
              ],
              scope: Organization(OrganizationId(42)),
              reason_code: None,
              retry_after: RetryAfter(1),
              namespaces: [],
            ),
            RateLimit(
              categories: [
                transaction,
              ],
              scope: Organization(OrganizationId(42)),
              reason_code: None,
              retry_after: RetryAfter(1),
              namespaces: [],
            ),
            RateLimit(
              categories: [
                error,
              ],
              scope: Project(ProjectId(21)),
              reason_code: None,
              retry_after: RetryAfter(1),
              namespaces: [],
            ),
          ],
        )
        "###);
}
881
/// Regression test: two limits that agree on category and scope but carry different metric
/// namespaces must both be retained, each with its own namespace list.
#[test]
fn test_rate_limits_add_namespaces() {
    // Builds a one second metric bucket limit for organization 42, restricted to `namespace`.
    let limit_for = |namespace: MetricNamespace| RateLimit {
        categories: smallvec![DataCategory::MetricBucket],
        scope: RateLimitScope::Organization(OrganizationId::new(42)),
        reason_code: None,
        retry_after: RetryAfter::from_secs(1),
        namespaces: smallvec![namespace],
    };

    let mut rate_limits = RateLimits::new();

    rate_limits.add(limit_for(MetricNamespace::Custom));

    // Identical category and scope; only the namespace differs.
    rate_limits.add(limit_for(MetricNamespace::Spans));

    insta::assert_ron_snapshot!(rate_limits, @r###"
        RateLimits(
          limits: [
            RateLimit(
              categories: [
                metric_bucket,
              ],
              scope: Organization(OrganizationId(42)),
              reason_code: None,
              retry_after: RetryAfter(1),
              namespaces: [
                "custom",
              ],
            ),
            RateLimit(
              categories: [
                metric_bucket,
              ],
              scope: Organization(OrganizationId(42)),
              reason_code: None,
              retry_after: RetryAfter(1),
              namespaces: [
                "spans",
              ],
            ),
          ],
        )
        "###);
}
933
/// `longest()` is expected to yield the limit with the latest expiry among those stored.
#[test]
fn test_rate_limits_longest() {
    let org_scope = || RateLimitScope::Organization(OrganizationId::new(42));

    let mut rate_limits = RateLimits::new();

    // Short error limit.
    let short_error_limit = RateLimit {
        categories: smallvec![DataCategory::Error],
        scope: org_scope(),
        reason_code: Some(ReasonCode::new("first")),
        retry_after: RetryAfter::from_secs(1),
        namespaces: smallvec![],
    };
    rate_limits.add(short_error_limit);

    // Same scope as above; the distinct category is what keeps the two limits from being
    // deduplicated.
    let long_transaction_limit = RateLimit {
        categories: smallvec![DataCategory::Transaction],
        scope: org_scope(),
        reason_code: Some(ReasonCode::new("second")),
        retry_after: RetryAfter::from_secs(10),
        namespaces: smallvec![],
    };
    rate_limits.add(long_transaction_limit);

    let rate_limit = rate_limits.longest().unwrap();
    insta::assert_ron_snapshot!(rate_limit, @r###"
        RateLimit(
          categories: [
            transaction,
          ],
          scope: Organization(OrganizationId(42)),
          reason_code: Some(ReasonCode("second")),
          retry_after: RetryAfter(10),
          namespaces: [],
        )
        "###);
}
968
/// `clean_expired` is expected to drop limits that have run out at the given instant while
/// keeping the ones that are still active.
#[test]
fn test_rate_limits_clean_expired() {
    // Builds an error limit for `scope` that expires `seconds` from now.
    let error_limit = |scope: RateLimitScope, seconds: u64| RateLimit {
        categories: smallvec![DataCategory::Error],
        scope,
        reason_code: None,
        retry_after: RetryAfter::from_secs(seconds),
        namespaces: smallvec![],
    };

    let mut rate_limits = RateLimits::new();

    // Still active: expires one second from now.
    rate_limits.add(error_limit(
        RateLimitScope::Organization(OrganizationId::new(42)),
        1,
    ));

    // Already inactive: zero seconds of lifetime. The distinct scope keeps it as a separate
    // entry next to the active limit.
    rate_limits.add(error_limit(RateLimitScope::Project(ProjectId::new(21)), 0));

    // Sanity check: both limits are stored before cleaning.
    assert_eq!(rate_limits.iter().count(), 2);

    let now = Instant::now();
    rate_limits.clean_expired(now);

    // Only the active organization limit may be left.
    insta::assert_ron_snapshot!(rate_limits, @r###"
        RateLimits(
          limits: [
            RateLimit(
              categories: [
                error,
              ],
              scope: Organization(OrganizationId(42)),
              reason_code: None,
              retry_after: RetryAfter(1),
              namespaces: [],
            ),
          ],
        )
        "###);
}
1013
/// `check` is expected to return only those limits that apply to the given item scoping.
#[test]
fn test_rate_limits_check() {
    // Builds a one second organization-wide limit for a single category.
    let org_limit = |category: DataCategory| RateLimit {
        categories: smallvec![category],
        scope: RateLimitScope::Organization(OrganizationId::new(42)),
        reason_code: None,
        retry_after: RetryAfter::from_secs(1),
        namespaces: smallvec![],
    };

    let mut rate_limits = RateLimits::new();

    // Two active limits: one for errors, one for transactions.
    rate_limits.add(org_limit(DataCategory::Error));
    rate_limits.add(org_limit(DataCategory::Transaction));

    // The checked item is an error belonging to organization 42.
    let scoping = Scoping {
        organization_id: OrganizationId::new(42),
        project_id: ProjectId::new(21),
        project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
        key_id: None,
    };
    let item_scoping = ItemScoping {
        category: DataCategory::Error,
        scoping,
        namespace: MetricNamespaceScoping::None,
    };

    let applied_limits = rate_limits.check(item_scoping);

    // Only the error limit applies; the transaction limit must not show up.
    insta::assert_ron_snapshot!(applied_limits, @r###"
        RateLimits(
          limits: [
            RateLimit(
              categories: [
                error,
              ],
              scope: Organization(OrganizationId(42)),
              reason_code: None,
              retry_after: RetryAfter(1),
              namespaces: [],
            ),
          ],
        )
        "###);
}
1064
/// `check_with_quotas` takes the supplied quotas into account in addition to the stored
/// limits. The snapshot expects a single 60 second error limit carrying the quota's reason
/// code rather than the stored one second error limit.
#[test]
fn test_rate_limits_check_quotas() {
    // Builds a one second organization-wide limit for a single category.
    let org_limit = |category: DataCategory| RateLimit {
        categories: smallvec![category],
        scope: RateLimitScope::Organization(OrganizationId::new(42)),
        reason_code: None,
        retry_after: RetryAfter::from_secs(1),
        namespaces: smallvec![],
    };

    let mut rate_limits = RateLimits::new();

    // Two active limits: one for errors, one for transactions.
    rate_limits.add(org_limit(DataCategory::Error));
    rate_limits.add(org_limit(DataCategory::Transaction));

    // The checked item is an error belonging to organization 42.
    let scoping = Scoping {
        organization_id: OrganizationId::new(42),
        project_id: ProjectId::new(21),
        project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
        key_id: None,
    };
    let item_scoping = ItemScoping {
        category: DataCategory::Error,
        scoping,
        namespace: MetricNamespaceScoping::None,
    };

    // Organization-scoped error quota with a limit of zero and no window.
    let zero_quota = Quota {
        id: None,
        categories: smallvec![DataCategory::Error],
        scope: QuotaScope::Organization,
        scope_id: Some("42".to_owned()),
        limit: Some(0),
        window: None,
        reason_code: Some(ReasonCode::new("zero")),
        namespace: None,
    };
    let quotas = [zero_quota];

    let applied_limits = rate_limits.check_with_quotas(&quotas, item_scoping);

    insta::assert_ron_snapshot!(applied_limits, @r###"
        RateLimits(
          limits: [
            RateLimit(
              categories: [
                error,
              ],
              scope: Organization(OrganizationId(42)),
              reason_code: Some(ReasonCode("zero")),
              retry_after: RetryAfter(60),
              namespaces: [],
            ),
          ],
        )
        "###);
}
1127
/// Merging one set of limits into another: for the category and scope present in both sets
/// the snapshot expects the longer limit to win, while the unrelated limit is left untouched.
#[test]
fn test_rate_limits_merge() {
    let org_scope = || RateLimitScope::Organization(OrganizationId::new(42));

    let mut rate_limits1 = RateLimits::new();
    let mut rate_limits2 = RateLimits::new();

    // Receiving set: a short error limit plus an indexed transaction limit.
    let short_error_limit = RateLimit {
        categories: smallvec![DataCategory::Error],
        scope: org_scope(),
        reason_code: Some(ReasonCode::new("first")),
        retry_after: RetryAfter::from_secs(1),
        namespaces: smallvec![],
    };
    rate_limits1.add(short_error_limit);

    let transaction_indexed_limit = RateLimit {
        categories: smallvec![DataCategory::TransactionIndexed],
        scope: org_scope(),
        reason_code: None,
        retry_after: RetryAfter::from_secs(1),
        namespaces: smallvec![],
    };
    rate_limits1.add(transaction_indexed_limit);

    // Merged-in set: an error limit with the same scope but a longer expiry.
    let long_error_limit = RateLimit {
        categories: smallvec![DataCategory::Error],
        scope: org_scope(),
        reason_code: Some(ReasonCode::new("second")),
        retry_after: RetryAfter::from_secs(10),
        namespaces: smallvec![],
    };
    rate_limits2.add(long_error_limit);

    rate_limits1.merge(rate_limits2);

    insta::assert_ron_snapshot!(rate_limits1, @r###"
        RateLimits(
          limits: [
            RateLimit(
              categories: [
                error,
              ],
              scope: Organization(OrganizationId(42)),
              reason_code: Some(ReasonCode("second")),
              retry_after: RetryAfter(10),
              namespaces: [],
            ),
            RateLimit(
              categories: [
                transaction_indexed,
              ],
              scope: Organization(OrganizationId(42)),
              reason_code: None,
              retry_after: RetryAfter(1),
              namespaces: [],
            ),
          ],
        )
        "###);
}
1184
/// `current_limits()` on the cached container is expected to expose only limits that are
/// still active; the limit added with zero seconds of lifetime must be absent.
#[test]
fn test_cached_rate_limits_expired() {
    // Builds an error limit for `scope` that expires `seconds` from now.
    let error_limit = |scope: RateLimitScope, seconds: u64| RateLimit {
        categories: smallvec![DataCategory::Error],
        scope,
        reason_code: None,
        retry_after: RetryAfter::from_secs(seconds),
        namespaces: smallvec![],
    };

    let cached = CachedRateLimits::new();

    // Still active: expires one second from now.
    cached.add(error_limit(
        RateLimitScope::Organization(OrganizationId::new(42)),
        1,
    ));

    // Already inactive: zero seconds of lifetime. The distinct scope keeps it as a separate
    // entry next to the active limit.
    cached.add(error_limit(RateLimitScope::Project(ProjectId::new(21)), 0));

    let rate_limits = cached.current_limits();

    insta::assert_ron_snapshot!(rate_limits, @r###"
        RateLimits(
          limits: [
            RateLimit(
              categories: [
                error,
              ],
              scope: Organization(OrganizationId(42)),
              reason_code: None,
              retry_after: RetryAfter(1),
              namespaces: [],
            ),
          ],
        )
        "###);
}
1225}