relay_quotas/
rate_limit.rs

1use std::fmt;
2use std::str::FromStr;
3use std::sync::{Arc, Mutex, PoisonError};
4use std::time::{Duration, Instant};
5
6use relay_base_schema::data_category::DataCategory;
7use relay_base_schema::metrics::MetricNamespace;
8use relay_base_schema::organization::OrganizationId;
9use relay_base_schema::project::{ProjectId, ProjectKey};
10use smallvec::SmallVec;
11
12use crate::REJECT_ALL_SECS;
13use crate::quota::{DataCategories, ItemScoping, Quota, QuotaScope, ReasonCode, Scoping};
14
15/// A monotonic expiration marker for rate limits.
16///
17/// [`RetryAfter`] represents a point in time when a rate limit expires. It allows checking
18/// whether the rate limit is still active or has expired, and calculating the remaining time
19/// until expiration.
20#[derive(Clone, Copy, Eq, Hash, Ord, PartialEq, PartialOrd)]
21pub struct RetryAfter {
22    when: Instant,
23}
24
25impl RetryAfter {
26    /// Creates a new [`RetryAfter`] instance that expires after the specified number of seconds.
27    #[inline]
28    pub fn from_secs(seconds: u64) -> Self {
29        let now = Instant::now();
30        let when = now.checked_add(Duration::from_secs(seconds)).unwrap_or(now);
31        Self { when }
32    }
33
34    /// Returns the remaining duration until the rate limit expires using the specified instant
35    /// as the reference point.
36    ///
37    /// If the rate limit has already expired at the given instant, returns `None`.
38    #[inline]
39    pub fn remaining_at(self, at: Instant) -> Option<Duration> {
40        if at >= self.when {
41            None
42        } else {
43            Some(self.when - at)
44        }
45    }
46
47    /// Returns the remaining duration until the rate limit expires.
48    ///
49    /// This uses the current instant as the reference point. If the rate limit has already
50    /// expired, returns `None`.
51    #[inline]
52    pub fn remaining(self) -> Option<Duration> {
53        self.remaining_at(Instant::now())
54    }
55
56    /// Returns the remaining seconds until the rate limit expires using the specified instant
57    /// as the reference point.
58    ///
59    /// This method rounds up to the next second to ensure that rate limits are strictly enforced.
60    /// If the rate limit has already expired, returns `0`.
61    #[inline]
62    pub fn remaining_seconds_at(self, at: Instant) -> u64 {
63        match self.remaining_at(at) {
64            // Compensate for the missing subsec part by adding 1s
65            Some(duration) if duration.subsec_nanos() == 0 => duration.as_secs(),
66            Some(duration) => duration.as_secs() + 1,
67            None => 0,
68        }
69    }
70
71    /// Returns the remaining seconds until the rate limit expires.
72    ///
73    /// This uses the current instant as the reference point. If the rate limit
74    /// has already expired, returns `0`.
75    #[inline]
76    pub fn remaining_seconds(self) -> u64 {
77        self.remaining_seconds_at(Instant::now())
78    }
79
80    /// Returns whether this rate limit has expired at the specified instant.
81    #[inline]
82    pub fn expired_at(self, at: Instant) -> bool {
83        self.remaining_at(at).is_none()
84    }
85
86    /// Returns whether this rate limit has expired at the current instant.
87    #[inline]
88    pub fn expired(self) -> bool {
89        self.remaining_at(Instant::now()).is_none()
90    }
91}
92
93impl fmt::Debug for RetryAfter {
94    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
95        match self.remaining_seconds() {
96            0 => write!(f, "RetryAfter(expired)"),
97            remaining => write!(f, "RetryAfter({remaining}s)"),
98        }
99    }
100}
101
102#[cfg(test)]
103impl serde::Serialize for RetryAfter {
104    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
105    where
106        S: serde::Serializer,
107    {
108        use serde::ser::SerializeTupleStruct;
109        let mut tup = serializer.serialize_tuple_struct("RetryAfter", 1)?;
110        tup.serialize_field(&self.remaining_seconds())?;
111        tup.end()
112    }
113}
114
115/// Error that occurs when parsing a [`RetryAfter`] from a string fails.
116#[derive(Debug)]
117pub enum InvalidRetryAfter {
118    /// The supplied delay in seconds was not valid.
119    InvalidDelay(std::num::ParseFloatError),
120}
121
122impl FromStr for RetryAfter {
123    type Err = InvalidRetryAfter;
124
125    fn from_str(s: &str) -> Result<Self, Self::Err> {
126        let float = s.parse::<f64>().map_err(InvalidRetryAfter::InvalidDelay)?;
127        let seconds = float.max(0.0).ceil() as u64;
128        Ok(RetryAfter::from_secs(seconds))
129    }
130}
131
132/// The scope that a rate limit applies to.
133///
134/// Unlike [`QuotaScope`], which only declares the class of the scope, this enum carries
135/// the specific identifiers of the individual scopes that a rate limit applies to.
136///
137/// Rate limits can be applied at different levels of granularity, from global (affecting all data)
138/// down to a specific project key.
139#[derive(Clone, Debug, Eq, PartialEq)]
140#[cfg_attr(test, derive(serde::Serialize))]
141pub enum RateLimitScope {
142    /// Global scope.
143    Global,
144    /// An organization with identifier.
145    Organization(OrganizationId),
146    /// A project with identifier.
147    Project(ProjectId),
148    /// A DSN public key.
149    Key(ProjectKey),
150}
151
152impl RateLimitScope {
153    /// Creates a rate limiting scope from the given item scoping for a specific quota.
154    ///
155    /// This extracts the appropriate scope identifier based on the quota's scope type.
156    /// For unknown scopes, it assumes the most specific scope (Key).
157    pub fn for_quota(scoping: Scoping, scope: QuotaScope) -> Self {
158        match scope {
159            QuotaScope::Global => Self::Global,
160            QuotaScope::Organization => Self::Organization(scoping.organization_id),
161            QuotaScope::Project => Self::Project(scoping.project_id),
162            QuotaScope::Key => Self::Key(scoping.project_key),
163            // For unknown scopes, assume the most specific scope:
164            QuotaScope::Unknown => Self::Key(scoping.project_key),
165        }
166    }
167
168    /// Returns the canonical name of this scope.
169    ///
170    /// This corresponds to the name of the corresponding [`QuotaScope`].
171    pub fn name(&self) -> &'static str {
172        match *self {
173            Self::Global => QuotaScope::Global.name(),
174            Self::Key(_) => QuotaScope::Key.name(),
175            Self::Project(_) => QuotaScope::Project.name(),
176            Self::Organization(_) => QuotaScope::Organization.name(),
177        }
178    }
179}
180
181/// An active rate limit that restricts data ingestion.
182///
183/// A rate limit defines restrictions for specific data categories within a particular scope.
184/// It includes an expiration time after which the limit is no longer enforced.
185///
186/// Rate limits can be created from [`Quota`]s or directly constructed with the needed parameters.
187#[derive(Clone, Debug, PartialEq)]
188#[cfg_attr(test, derive(serde::Serialize))]
189pub struct RateLimit {
190    /// A set of data categories that this quota applies to. If empty, this rate limit
191    /// applies to all data categories.
192    pub categories: DataCategories,
193
194    /// The scope of this rate limit.
195    pub scope: RateLimitScope,
196
197    /// A machine-readable reason indicating which quota caused this rate limit.
198    pub reason_code: Option<ReasonCode>,
199
200    /// A marker when this rate limit expires.
201    pub retry_after: RetryAfter,
202
203    /// The metric namespace of this rate limit.
204    ///
205    /// Only relevant for data categories of type metric bucket. If empty,
206    /// this rate limit applies to metrics of all namespaces.
207    pub namespaces: SmallVec<[MetricNamespace; 1]>,
208}
209
210impl RateLimit {
211    /// Creates a new rate limit from the given [`Quota`].
212    ///
213    /// This builds a rate limit with the appropriate scope derived from the quota and scoping
214    /// information. The categories and other properties are copied from the quota.
215    pub fn from_quota(quota: &Quota, scoping: Scoping, retry_after: RetryAfter) -> Self {
216        Self {
217            categories: quota.categories.clone(),
218            scope: RateLimitScope::for_quota(scoping, quota.scope),
219            reason_code: quota.reason_code.clone(),
220            retry_after,
221            namespaces: quota.namespace.into_iter().collect(),
222        }
223    }
224
225    /// Checks whether this rate limit applies to the given item.
226    ///
227    /// A rate limit applies if its scope matches the item's scope, and the item's
228    /// category and namespace match those of the rate limit.
229    pub fn matches(&self, scoping: ItemScoping) -> bool {
230        self.matches_scope(scoping)
231            && scoping.matches_categories(&self.categories)
232            && scoping.matches_namespaces(&self.namespaces)
233    }
234
235    /// Returns `true` if the rate limiting scope matches the given item.
236    fn matches_scope(&self, scoping: ItemScoping) -> bool {
237        match self.scope {
238            RateLimitScope::Global => true,
239            RateLimitScope::Organization(org_id) => scoping.organization_id == org_id,
240            RateLimitScope::Project(project_id) => scoping.project_id == project_id,
241            RateLimitScope::Key(key) => scoping.project_key == key,
242        }
243    }
244}
245
246/// A collection of scoped rate limits.
247///
248/// [`RateLimits`] manages a set of active rate limits that can be checked against
249/// incoming data.
250///
251/// The collection can be empty, indicated by [`is_ok`](Self::is_ok), meaning no rate limits
252/// are currently active.
253#[derive(Clone, Debug, Default)]
254#[cfg_attr(test, derive(serde::Serialize))]
255pub struct RateLimits {
256    limits: Vec<RateLimit>,
257}
258
259impl RateLimits {
260    /// Creates an empty [`RateLimits`] instance.
261    pub fn new() -> Self {
262        Self::default()
263    }
264
265    /// Adds a limit to this collection.
266    ///
267    /// If a rate limit with an overlapping scope already exists, the `retry_after` count is merged
268    /// with the existing limit. Otherwise, the new rate limit is added.
269    pub fn add(&mut self, mut limit: RateLimit) {
270        // Categories are logically a set, but not implemented as such.
271        limit.categories.sort();
272
273        let limit_opt = self.limits.iter_mut().find(|l| {
274            let RateLimit {
275                categories,
276                scope,
277                reason_code: _,
278                retry_after: _,
279                namespaces: namespace,
280            } = &limit;
281
282            *categories == l.categories && *scope == l.scope && *namespace == l.namespaces
283        });
284
285        match limit_opt {
286            None => self.limits.push(limit),
287            Some(existing) if existing.retry_after < limit.retry_after => *existing = limit,
288            Some(_) => (), // keep existing, longer limit
289        }
290    }
291
292    /// Merges all limits from another [`RateLimits`] instance into this one.
293    ///
294    /// This keeps all existing rate limits, adds new ones, and updates any existing ones
295    /// with a later expiration time. The resulting collection contains the maximum
296    /// constraints from both instances.
297    pub fn merge(&mut self, limits: Self) {
298        for limit in limits {
299            self.add(limit);
300        }
301    }
302
303    /// Merges all limits from another [`RateLimits`] with this one.
304    ///
305    /// See also: [`Self::merge`].
306    pub fn merge_with(mut self, other: Self) -> Self {
307        self.merge(other);
308        self
309    }
310
311    /// Returns `true` if this instance contains no active limits.
312    ///
313    /// This is the opposite of [`is_limited`](Self::is_limited).
314    pub fn is_ok(&self) -> bool {
315        !self.is_limited()
316    }
317
318    /// Returns `true` if this instance contains any active rate limits.
319    ///
320    /// A rate limit is considered active if it has not yet expired.
321    pub fn is_limited(&self) -> bool {
322        let now = Instant::now();
323        self.iter().any(|limit| !limit.retry_after.expired_at(now))
324    }
325
326    /// Returns `true` if this is instance contains any active rate limits
327    /// for the specified categories.
328    ///
329    /// A rate limit is considered active if it has not yet expired.
330    pub fn is_any_limited(&self, scopings: &[ItemScoping]) -> bool {
331        self.is_any_limited_with_quotas(&[], scopings)
332    }
333
334    /// Returns `true` if this is instance contains any active rate limits
335    /// for the specified categories.
336    ///
337    /// This is similar to [`Self::is_limited`], but additionally checks for quotas with a static
338    /// limit of `0`, which reject items even if there is no active rate limit in this instance.
339    ///
340    /// A rate limit is considered active if it has not yet expired.
341    pub fn is_any_limited_with_quotas<'a>(
342        &self,
343        quotas: impl IntoIterator<Item = &'a Quota>,
344        scopings: &[ItemScoping],
345    ) -> bool {
346        for quota in quotas {
347            for scoping in scopings {
348                if quota.limit == Some(0) && quota.matches(*scoping) {
349                    return true;
350                }
351            }
352        }
353
354        let now = Instant::now();
355        for scoping in scopings {
356            for limit in &self.limits {
357                if limit.matches(*scoping) && !limit.retry_after.expired_at(now) {
358                    return true;
359                }
360            }
361        }
362
363        false
364    }
365
366    /// Removes expired rate limits from this instance.
367    ///
368    /// This is useful for cleaning up rate limits that are no longer relevant,
369    /// reducing memory usage and improving performance of subsequent operations.
370    pub fn clean_expired(&mut self, now: Instant) {
371        self.limits
372            .retain(|limit| !limit.retry_after.expired_at(now));
373    }
374
375    /// Checks whether any rate limits apply to the given scoping.
376    ///
377    /// Returns a new [`RateLimits`] instance containing only the rate limits that match
378    /// the provided [`ItemScoping`]. If no limits match, the returned instance will be empty
379    /// and [`is_ok`](Self::is_ok) will return `true`.
380    pub fn check(&self, scoping: ItemScoping) -> Self {
381        self.check_with_quotas(&[], scoping)
382    }
383
384    /// Checks whether any rate limits or static quotas apply to the given scoping.
385    ///
386    /// This is similar to [`check`](Self::check), but additionally checks for quotas with a static
387    /// limit of `0`, which reject items even if there is no active rate limit in this instance.
388    ///
389    /// Returns a new [`RateLimits`] instance containing the rate limits that match the provided
390    /// [`ItemScoping`]. If no limits or quotas match, the returned instance will be empty.
391    pub fn check_with_quotas<'a>(
392        &self,
393        quotas: impl IntoIterator<Item = &'a Quota>,
394        scoping: ItemScoping,
395    ) -> Self {
396        let mut applied_limits = Self::new();
397
398        for quota in quotas {
399            if quota.limit == Some(0) && quota.matches(scoping) {
400                let retry_after = RetryAfter::from_secs(REJECT_ALL_SECS);
401                applied_limits.add(RateLimit::from_quota(quota, *scoping, retry_after));
402            }
403        }
404
405        for limit in &self.limits {
406            if limit.matches(scoping) {
407                applied_limits.add(limit.clone());
408            }
409        }
410
411        applied_limits
412    }
413
414    /// Returns an iterator over all rate limits in this collection.
415    pub fn iter(&self) -> RateLimitsIter<'_> {
416        RateLimitsIter {
417            iter: self.limits.iter(),
418        }
419    }
420
421    /// Returns the rate limit with the latest expiration time.
422    ///
423    /// If multiple rate limits have the same expiration time, any of them may be returned.
424    /// If the collection is empty, returns `None`.
425    pub fn longest(&self) -> Option<&RateLimit> {
426        self.iter().max_by_key(|limit| limit.retry_after)
427    }
428
429    /// Returns `true` if there are no rate limits in this collection.
430    ///
431    /// Note that an empty collection is not the same as having no active limits.
432    /// Use [`is_ok`](Self::is_ok) to check if there are no active limits.
433    pub fn is_empty(&self) -> bool {
434        self.limits.is_empty()
435    }
436}
437
438/// An iterator over rate limit references.
439///
440/// This struct is created by the [`iter`](RateLimits::iter) method on [`RateLimits`].
441/// It yields shared references to the rate limits in the collection.
442pub struct RateLimitsIter<'a> {
443    iter: std::slice::Iter<'a, RateLimit>,
444}
445
446impl<'a> Iterator for RateLimitsIter<'a> {
447    type Item = &'a RateLimit;
448
449    fn next(&mut self) -> Option<Self::Item> {
450        self.iter.next()
451    }
452}
453
454impl IntoIterator for RateLimits {
455    type IntoIter = RateLimitsIntoIter;
456    type Item = RateLimit;
457
458    fn into_iter(self) -> Self::IntoIter {
459        RateLimitsIntoIter {
460            iter: self.limits.into_iter(),
461        }
462    }
463}
464
465/// An iterator that consumes a [`RateLimits`] collection.
466///
467/// This struct is created by the `into_iter` method on [`RateLimits`], provided by the
468/// [`IntoIterator`] trait. It yields owned rate limits by value.
469pub struct RateLimitsIntoIter {
470    iter: std::vec::IntoIter<RateLimit>,
471}
472
473impl Iterator for RateLimitsIntoIter {
474    type Item = RateLimit;
475
476    fn next(&mut self) -> Option<Self::Item> {
477        self.iter.next()
478    }
479}
480
481impl<'a> IntoIterator for &'a RateLimits {
482    type IntoIter = RateLimitsIter<'a>;
483    type Item = &'a RateLimit;
484
485    fn into_iter(self) -> Self::IntoIter {
486        self.iter()
487    }
488}
489
490/// A thread-safe cache of rate limits with automatic expiration handling.
491///
492/// [`CachedRateLimits`] wraps a [`RateLimits`] collection with a mutex to allow safe
493/// concurrent access from multiple threads. It automatically removes expired rate limits
494/// when retrieving the current limits.
495///
496/// This is useful for maintaining a shared set of rate limits across multiple
497/// processing threads or tasks.
498#[derive(Debug, Default)]
499pub struct CachedRateLimits(Mutex<Arc<RateLimits>>);
500
501impl CachedRateLimits {
502    /// Creates a new, empty instance without any rate limits.
503    pub fn new() -> Self {
504        Self::default()
505    }
506
507    /// Adds a rate limit to this collection.
508    ///
509    /// This is a thread-safe wrapper around [`RateLimits::add`].
510    pub fn add(&self, limit: RateLimit) {
511        let mut inner = self.0.lock().unwrap_or_else(PoisonError::into_inner);
512        let current = Arc::make_mut(&mut inner);
513        current.add(limit);
514    }
515
516    /// Merges rate limits from another collection into this one.
517    ///
518    /// This is a thread-safe wrapper around [`RateLimits::merge`].
519    pub fn merge(&self, limits: RateLimits) {
520        if limits.is_empty() {
521            return;
522        }
523
524        let mut inner = self.0.lock().unwrap_or_else(PoisonError::into_inner);
525        let current = Arc::make_mut(&mut inner);
526        for mut limit in limits {
527            // To spice it up, we do have some special casing here for 'inherited categories',
528            // e.g. spans and transactions.
529            //
530            // The tldr; is, as transactions are just containers for spans,
531            // we can enforce span limits on transactions but also vice versa.
532            //
533            // So this is largely an enforcement problem, but since Relay propagates
534            // rate limits to clients, we clone the limits with the inherited category.
535            // This ensures old SDKs rate limit correctly, but also it simplifies client
536            // implementations. Only Relay needs to make this decision.
537            for i in 0..limit.categories.len() {
538                let Some(category) = limit.categories.get(i) else {
539                    debug_assert!(false, "logical error");
540                    break;
541                };
542
543                for inherited in inherited_categories(category) {
544                    if !limit.categories.contains(inherited) {
545                        limit.categories.push(*inherited);
546                    }
547                }
548            }
549
550            current.add(limit);
551        }
552    }
553
554    /// Returns a reference to the current rate limits.
555    ///
556    /// This method automatically removes any expired rate limits before returning,
557    /// ensuring that only active limits are included in the result.
558    pub fn current_limits(&self) -> Arc<RateLimits> {
559        let now = Instant::now();
560        let mut inner = self.0.lock().unwrap_or_else(PoisonError::into_inner);
561        Arc::make_mut(&mut inner).clean_expired(now);
562        Arc::clone(&inner)
563    }
564}
565
566/// Returns inherited rate limit categories for the passed category.
567///
568/// When a rate limit for a category can also be enforced in a different category,
569/// then it's an inherited category.
570///
571/// For example, a transaction rate limit can also be applied to spans and vice versa.
572///
573/// For a detailed explanation on span/transaction enforcement see:
574/// <https://develop.sentry.dev/ingestion/relay/transaction-span-ratelimits/>.
575fn inherited_categories(category: &DataCategory) -> &'static [DataCategory] {
576    match category {
577        DataCategory::Transaction => &[DataCategory::Span],
578        DataCategory::Span => &[DataCategory::Transaction],
579        _ => &[],
580    }
581}
582
583#[cfg(test)]
584mod tests {
585    use smallvec::smallvec;
586
587    use super::*;
588    use crate::MetricNamespaceScoping;
589    use crate::quota::DataCategory;
590
591    #[test]
592    fn test_parse_retry_after() {
593        // positive float always rounds up to the next integer
594        let retry_after = "17.1".parse::<RetryAfter>().expect("parse RetryAfter");
595        assert_eq!(retry_after.remaining_seconds(), 18);
596        assert!(!retry_after.expired());
597        let retry_after = "17.7".parse::<RetryAfter>().expect("parse RetryAfter");
598        assert_eq!(retry_after.remaining_seconds(), 18);
599        assert!(!retry_after.expired());
600
601        // positive int
602        let retry_after = "17".parse::<RetryAfter>().expect("parse RetryAfter");
603        assert_eq!(retry_after.remaining_seconds(), 17);
604        assert!(!retry_after.expired());
605
606        // negative numbers are treated as zero
607        let retry_after = "-2".parse::<RetryAfter>().expect("parse RetryAfter");
608        assert_eq!(retry_after.remaining_seconds(), 0);
609        assert!(retry_after.expired());
610        let retry_after = "-inf".parse::<RetryAfter>().expect("parse RetryAfter");
611        assert_eq!(retry_after.remaining_seconds(), 0);
612        assert!(retry_after.expired());
613
614        // inf and NaN are valid input and treated as zero
615        let retry_after = "inf".parse::<RetryAfter>().expect("parse RetryAfter");
616        assert_eq!(retry_after.remaining_seconds(), 0);
617        assert!(retry_after.expired());
618        let retry_after = "NaN".parse::<RetryAfter>().expect("parse RetryAfter");
619        assert_eq!(retry_after.remaining_seconds(), 0);
620        assert!(retry_after.expired());
621
622        // large inputs that would overflow are treated as zero
623        let retry_after = "100000000000000000000"
624            .parse::<RetryAfter>()
625            .expect("parse RetryAfter");
626        assert_eq!(retry_after.remaining_seconds(), 0);
627        assert!(retry_after.expired());
628
629        // invalid strings cause parse error
630        "".parse::<RetryAfter>().expect_err("error RetryAfter");
631        "nope".parse::<RetryAfter>().expect_err("error RetryAfter");
632        " 2 ".parse::<RetryAfter>().expect_err("error RetryAfter");
633        "6 0".parse::<RetryAfter>().expect_err("error RetryAfter");
634    }
635
636    #[test]
637    fn test_rate_limit_matches_categories() {
638        let rate_limit = RateLimit {
639            categories: smallvec![DataCategory::Unknown, DataCategory::Error],
640            scope: RateLimitScope::Organization(OrganizationId::new(42)),
641            reason_code: None,
642            retry_after: RetryAfter::from_secs(1),
643            namespaces: smallvec![],
644        };
645
646        assert!(rate_limit.matches(ItemScoping {
647            category: DataCategory::Error,
648            scoping: Scoping {
649                organization_id: OrganizationId::new(42),
650                project_id: ProjectId::new(21),
651                project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
652                key_id: None,
653            },
654            namespace: MetricNamespaceScoping::None,
655        }));
656
657        assert!(!rate_limit.matches(ItemScoping {
658            category: DataCategory::Transaction,
659            scoping: Scoping {
660                organization_id: OrganizationId::new(42),
661                project_id: ProjectId::new(21),
662                project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
663                key_id: None,
664            },
665            namespace: MetricNamespaceScoping::None,
666        }));
667    }
668
669    #[test]
670    fn test_rate_limit_matches_organization() {
671        let rate_limit = RateLimit {
672            categories: DataCategories::new(),
673            scope: RateLimitScope::Organization(OrganizationId::new(42)),
674            reason_code: None,
675            retry_after: RetryAfter::from_secs(1),
676            namespaces: smallvec![],
677        };
678
679        assert!(rate_limit.matches(ItemScoping {
680            category: DataCategory::Error,
681            scoping: Scoping {
682                organization_id: OrganizationId::new(42),
683                project_id: ProjectId::new(21),
684                project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
685                key_id: None,
686            },
687            namespace: MetricNamespaceScoping::None,
688        }));
689
690        assert!(!rate_limit.matches(ItemScoping {
691            category: DataCategory::Error,
692            scoping: Scoping {
693                organization_id: OrganizationId::new(0),
694                project_id: ProjectId::new(21),
695                project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
696                key_id: None,
697            },
698            namespace: MetricNamespaceScoping::None,
699        }));
700    }
701
702    #[test]
703    fn test_rate_limit_matches_project() {
704        let rate_limit = RateLimit {
705            categories: DataCategories::new(),
706            scope: RateLimitScope::Project(ProjectId::new(21)),
707            reason_code: None,
708            retry_after: RetryAfter::from_secs(1),
709            namespaces: smallvec![],
710        };
711
712        assert!(rate_limit.matches(ItemScoping {
713            category: DataCategory::Error,
714            scoping: Scoping {
715                organization_id: OrganizationId::new(42),
716                project_id: ProjectId::new(21),
717                project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
718                key_id: None,
719            },
720            namespace: MetricNamespaceScoping::None,
721        }));
722
723        assert!(!rate_limit.matches(ItemScoping {
724            category: DataCategory::Error,
725            scoping: Scoping {
726                organization_id: OrganizationId::new(42),
727                project_id: ProjectId::new(0),
728                project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
729                key_id: None,
730            },
731            namespace: MetricNamespaceScoping::None,
732        }));
733    }
734
735    #[test]
736    fn test_rate_limit_matches_namespaces() {
737        let rate_limit = RateLimit {
738            categories: smallvec![],
739            scope: RateLimitScope::Organization(OrganizationId::new(42)),
740            reason_code: None,
741            retry_after: RetryAfter::from_secs(1),
742            namespaces: smallvec![MetricNamespace::Custom],
743        };
744
745        let scoping = Scoping {
746            organization_id: OrganizationId::new(42),
747            project_id: ProjectId::new(21),
748            project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
749            key_id: None,
750        };
751
752        assert!(rate_limit.matches(ItemScoping {
753            category: DataCategory::MetricBucket,
754            scoping,
755            namespace: MetricNamespaceScoping::Some(MetricNamespace::Custom),
756        }));
757
758        assert!(!rate_limit.matches(ItemScoping {
759            category: DataCategory::MetricBucket,
760            scoping,
761            namespace: MetricNamespaceScoping::Some(MetricNamespace::Spans),
762        }));
763
764        let general_rate_limit = RateLimit {
765            categories: smallvec![],
766            scope: RateLimitScope::Organization(OrganizationId::new(42)),
767            reason_code: None,
768            retry_after: RetryAfter::from_secs(1),
769            namespaces: smallvec![], // all namespaces
770        };
771
772        assert!(general_rate_limit.matches(ItemScoping {
773            category: DataCategory::MetricBucket,
774            scoping,
775            namespace: MetricNamespaceScoping::Some(MetricNamespace::Spans),
776        }));
777
778        assert!(general_rate_limit.matches(ItemScoping {
779            category: DataCategory::MetricBucket,
780            scoping,
781            namespace: MetricNamespaceScoping::None,
782        }));
783    }
784
785    #[test]
786    fn test_rate_limit_matches_key() {
787        let rate_limit = RateLimit {
788            categories: DataCategories::new(),
789            scope: RateLimitScope::Key(
790                ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
791            ),
792            reason_code: None,
793            retry_after: RetryAfter::from_secs(1),
794            namespaces: smallvec![],
795        };
796
797        assert!(rate_limit.matches(ItemScoping {
798            category: DataCategory::Error,
799            scoping: Scoping {
800                organization_id: OrganizationId::new(42),
801                project_id: ProjectId::new(21),
802                project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
803                key_id: None,
804            },
805            namespace: MetricNamespaceScoping::None,
806        }));
807
808        assert!(!rate_limit.matches(ItemScoping {
809            category: DataCategory::Error,
810            scoping: Scoping {
811                organization_id: OrganizationId::new(0),
812                project_id: ProjectId::new(21),
813                project_key: ProjectKey::parse("deadbeefdeadbeefdeadbeefdeadbeef").unwrap(),
814                key_id: None,
815            },
816            namespace: MetricNamespaceScoping::None,
817        }));
818    }
819
820    #[test]
821    fn test_rate_limits_add_replacement() {
822        let mut rate_limits = RateLimits::new();
823
824        rate_limits.add(RateLimit {
825            categories: smallvec![DataCategory::Default, DataCategory::Error],
826            scope: RateLimitScope::Organization(OrganizationId::new(42)),
827            reason_code: Some(ReasonCode::new("first")),
828            retry_after: RetryAfter::from_secs(1),
829            namespaces: smallvec![],
830        });
831
832        // longer rate limit shadows shorter one
833        rate_limits.add(RateLimit {
834            categories: smallvec![DataCategory::Error, DataCategory::Default],
835            scope: RateLimitScope::Organization(OrganizationId::new(42)),
836            reason_code: Some(ReasonCode::new("second")),
837            retry_after: RetryAfter::from_secs(10),
838            namespaces: smallvec![],
839        });
840
841        insta::assert_ron_snapshot!(rate_limits, @r###"
842        RateLimits(
843          limits: [
844            RateLimit(
845              categories: [
846                default,
847                error,
848              ],
849              scope: Organization(OrganizationId(42)),
850              reason_code: Some(ReasonCode("second")),
851              retry_after: RetryAfter(10),
852              namespaces: [],
853            ),
854          ],
855        )
856        "###);
857    }
858
859    #[test]
860    fn test_rate_limits_add_shadowing() {
861        let mut rate_limits = RateLimits::new();
862
863        rate_limits.add(RateLimit {
864            categories: smallvec![DataCategory::Default, DataCategory::Error],
865            scope: RateLimitScope::Organization(OrganizationId::new(42)),
866            reason_code: Some(ReasonCode::new("first")),
867            retry_after: RetryAfter::from_secs(10),
868            namespaces: smallvec![],
869        });
870
871        // shorter rate limit is shadowed by existing one
872        rate_limits.add(RateLimit {
873            categories: smallvec![DataCategory::Error, DataCategory::Default],
874            scope: RateLimitScope::Organization(OrganizationId::new(42)),
875            reason_code: Some(ReasonCode::new("second")),
876            retry_after: RetryAfter::from_secs(1),
877            namespaces: smallvec![],
878        });
879
880        insta::assert_ron_snapshot!(rate_limits, @r###"
881        RateLimits(
882          limits: [
883            RateLimit(
884              categories: [
885                default,
886                error,
887              ],
888              scope: Organization(OrganizationId(42)),
889              reason_code: Some(ReasonCode("first")),
890              retry_after: RetryAfter(10),
891              namespaces: [],
892            ),
893          ],
894        )
895        "###);
896    }
897
898    #[test]
899    fn test_rate_limits_add_buckets() {
900        let mut rate_limits = RateLimits::new();
901
902        rate_limits.add(RateLimit {
903            categories: smallvec![DataCategory::Error],
904            scope: RateLimitScope::Organization(OrganizationId::new(42)),
905            reason_code: None,
906            retry_after: RetryAfter::from_secs(1),
907            namespaces: smallvec![],
908        });
909
910        // Same scope but different categories
911        rate_limits.add(RateLimit {
912            categories: smallvec![DataCategory::Transaction],
913            scope: RateLimitScope::Organization(OrganizationId::new(42)),
914            reason_code: None,
915            retry_after: RetryAfter::from_secs(1),
916            namespaces: smallvec![],
917        });
918
919        // Same categories but different scope
920        rate_limits.add(RateLimit {
921            categories: smallvec![DataCategory::Error],
922            scope: RateLimitScope::Project(ProjectId::new(21)),
923            reason_code: None,
924            retry_after: RetryAfter::from_secs(1),
925            namespaces: smallvec![],
926        });
927
928        insta::assert_ron_snapshot!(rate_limits, @r###"
929        RateLimits(
930          limits: [
931            RateLimit(
932              categories: [
933                error,
934              ],
935              scope: Organization(OrganizationId(42)),
936              reason_code: None,
937              retry_after: RetryAfter(1),
938              namespaces: [],
939            ),
940            RateLimit(
941              categories: [
942                transaction,
943              ],
944              scope: Organization(OrganizationId(42)),
945              reason_code: None,
946              retry_after: RetryAfter(1),
947              namespaces: [],
948            ),
949            RateLimit(
950              categories: [
951                error,
952              ],
953              scope: Project(ProjectId(21)),
954              reason_code: None,
955              retry_after: RetryAfter(1),
956              namespaces: [],
957            ),
958          ],
959        )
960        "###);
961    }
962
963    /// Regression test that ensures namespaces are correctly added to rate limits.
964    #[test]
965    fn test_rate_limits_add_namespaces() {
966        let mut rate_limits = RateLimits::new();
967
968        rate_limits.add(RateLimit {
969            categories: smallvec![DataCategory::MetricBucket],
970            scope: RateLimitScope::Organization(OrganizationId::new(42)),
971            reason_code: None,
972            retry_after: RetryAfter::from_secs(1),
973            namespaces: smallvec![MetricNamespace::Custom],
974        });
975
976        // Same category but different namespaces
977        rate_limits.add(RateLimit {
978            categories: smallvec![DataCategory::MetricBucket],
979            scope: RateLimitScope::Organization(OrganizationId::new(42)),
980            reason_code: None,
981            retry_after: RetryAfter::from_secs(1),
982            namespaces: smallvec![MetricNamespace::Spans],
983        });
984
985        insta::assert_ron_snapshot!(rate_limits, @r###"
986        RateLimits(
987          limits: [
988            RateLimit(
989              categories: [
990                metric_bucket,
991              ],
992              scope: Organization(OrganizationId(42)),
993              reason_code: None,
994              retry_after: RetryAfter(1),
995              namespaces: [
996                "custom",
997              ],
998            ),
999            RateLimit(
1000              categories: [
1001                metric_bucket,
1002              ],
1003              scope: Organization(OrganizationId(42)),
1004              reason_code: None,
1005              retry_after: RetryAfter(1),
1006              namespaces: [
1007                "spans",
1008              ],
1009            ),
1010          ],
1011        )
1012        "###);
1013    }
1014
1015    #[test]
1016    fn test_rate_limits_longest() {
1017        let mut rate_limits = RateLimits::new();
1018
1019        rate_limits.add(RateLimit {
1020            categories: smallvec![DataCategory::Error],
1021            scope: RateLimitScope::Organization(OrganizationId::new(42)),
1022            reason_code: Some(ReasonCode::new("first")),
1023            retry_after: RetryAfter::from_secs(1),
1024            namespaces: smallvec![],
1025        });
1026
1027        // Distinct scope to prevent deduplication
1028        rate_limits.add(RateLimit {
1029            categories: smallvec![DataCategory::Transaction],
1030            scope: RateLimitScope::Organization(OrganizationId::new(42)),
1031            reason_code: Some(ReasonCode::new("second")),
1032            retry_after: RetryAfter::from_secs(10),
1033            namespaces: smallvec![],
1034        });
1035
1036        let rate_limit = rate_limits.longest().unwrap();
1037        insta::assert_ron_snapshot!(rate_limit, @r###"
1038        RateLimit(
1039          categories: [
1040            transaction,
1041          ],
1042          scope: Organization(OrganizationId(42)),
1043          reason_code: Some(ReasonCode("second")),
1044          retry_after: RetryAfter(10),
1045          namespaces: [],
1046        )
1047        "###);
1048    }
1049
1050    #[test]
1051    fn test_rate_limits_clean_expired() {
1052        let mut rate_limits = RateLimits::new();
1053
1054        // Active error limit
1055        rate_limits.add(RateLimit {
1056            categories: smallvec![DataCategory::Error],
1057            scope: RateLimitScope::Organization(OrganizationId::new(42)),
1058            reason_code: None,
1059            retry_after: RetryAfter::from_secs(1),
1060            namespaces: smallvec![],
1061        });
1062
1063        // Inactive error limit with distinct scope
1064        rate_limits.add(RateLimit {
1065            categories: smallvec![DataCategory::Error],
1066            scope: RateLimitScope::Project(ProjectId::new(21)),
1067            reason_code: None,
1068            retry_after: RetryAfter::from_secs(0),
1069            namespaces: smallvec![],
1070        });
1071
1072        // Sanity check before running `clean_expired`
1073        assert_eq!(rate_limits.iter().count(), 2);
1074
1075        rate_limits.clean_expired(Instant::now());
1076
1077        // Check that the expired limit has been removed
1078        insta::assert_ron_snapshot!(rate_limits, @r###"
1079        RateLimits(
1080          limits: [
1081            RateLimit(
1082              categories: [
1083                error,
1084              ],
1085              scope: Organization(OrganizationId(42)),
1086              reason_code: None,
1087              retry_after: RetryAfter(1),
1088              namespaces: [],
1089            ),
1090          ],
1091        )
1092        "###);
1093    }
1094
1095    #[test]
1096    fn test_rate_limits_is_any_limited() {
1097        let mut rate_limits = RateLimits::new();
1098
1099        // Active error limit
1100        rate_limits.add(RateLimit {
1101            categories: smallvec![DataCategory::Error],
1102            scope: RateLimitScope::Organization(OrganizationId::new(42)),
1103            reason_code: None,
1104            retry_after: RetryAfter::from_secs(1),
1105            namespaces: smallvec![],
1106        });
1107
1108        // Active transaction limit
1109        rate_limits.add(RateLimit {
1110            categories: smallvec![DataCategory::Transaction],
1111            scope: RateLimitScope::Organization(OrganizationId::new(42)),
1112            reason_code: None,
1113            retry_after: RetryAfter::from_secs(1),
1114            namespaces: smallvec![],
1115        });
1116
1117        let scoping = Scoping {
1118            organization_id: OrganizationId::new(42),
1119            project_id: ProjectId::new(21),
1120            project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
1121            key_id: None,
1122        };
1123
1124        let quotas = &[Quota {
1125            id: None,
1126            categories: smallvec![DataCategory::Attachment],
1127            scope: QuotaScope::Organization,
1128            scope_id: Some("42".to_owned()),
1129            limit: Some(0),
1130            window: None,
1131            reason_code: Some(ReasonCode::new("zero")),
1132            namespace: None,
1133        }];
1134
1135        assert!(
1136            rate_limits.is_any_limited_with_quotas(quotas, &[scoping.item(DataCategory::Error)])
1137        );
1138        assert!(
1139            rate_limits
1140                .is_any_limited_with_quotas(quotas, &[scoping.item(DataCategory::Attachment)])
1141        );
1142        assert!(rate_limits.is_any_limited_with_quotas(
1143            quotas,
1144            &[
1145                scoping.item(DataCategory::Replay),
1146                scoping.item(DataCategory::Error)
1147            ]
1148        ));
1149
1150        assert!(
1151            !rate_limits.is_any_limited_with_quotas(quotas, &[scoping.item(DataCategory::Replay)])
1152        );
1153    }
1154
1155    #[test]
1156    fn test_rate_limits_check() {
1157        let mut rate_limits = RateLimits::new();
1158
1159        // Active error limit
1160        rate_limits.add(RateLimit {
1161            categories: smallvec![DataCategory::Error],
1162            scope: RateLimitScope::Organization(OrganizationId::new(42)),
1163            reason_code: None,
1164            retry_after: RetryAfter::from_secs(1),
1165            namespaces: smallvec![],
1166        });
1167
1168        // Active transaction limit
1169        rate_limits.add(RateLimit {
1170            categories: smallvec![DataCategory::Transaction],
1171            scope: RateLimitScope::Organization(OrganizationId::new(42)),
1172            reason_code: None,
1173            retry_after: RetryAfter::from_secs(1),
1174            namespaces: smallvec![],
1175        });
1176
1177        let applied_limits = rate_limits.check(ItemScoping {
1178            category: DataCategory::Error,
1179            scoping: Scoping {
1180                organization_id: OrganizationId::new(42),
1181                project_id: ProjectId::new(21),
1182                project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
1183                key_id: None,
1184            },
1185            namespace: MetricNamespaceScoping::None,
1186        });
1187
1188        // Check that the error limit is applied
1189        insta::assert_ron_snapshot!(applied_limits, @r###"
1190        RateLimits(
1191          limits: [
1192            RateLimit(
1193              categories: [
1194                error,
1195              ],
1196              scope: Organization(OrganizationId(42)),
1197              reason_code: None,
1198              retry_after: RetryAfter(1),
1199              namespaces: [],
1200            ),
1201          ],
1202        )
1203        "###);
1204    }
1205
1206    #[test]
1207    fn test_rate_limits_check_quotas() {
1208        let mut rate_limits = RateLimits::new();
1209
1210        // Active error limit
1211        rate_limits.add(RateLimit {
1212            categories: smallvec![DataCategory::Error],
1213            scope: RateLimitScope::Organization(OrganizationId::new(42)),
1214            reason_code: None,
1215            retry_after: RetryAfter::from_secs(1),
1216            namespaces: smallvec![],
1217        });
1218
1219        // Active transaction limit
1220        rate_limits.add(RateLimit {
1221            categories: smallvec![DataCategory::Transaction],
1222            scope: RateLimitScope::Organization(OrganizationId::new(42)),
1223            reason_code: None,
1224            retry_after: RetryAfter::from_secs(1),
1225            namespaces: smallvec![],
1226        });
1227
1228        let item_scoping = ItemScoping {
1229            category: DataCategory::Error,
1230            scoping: Scoping {
1231                organization_id: OrganizationId::new(42),
1232                project_id: ProjectId::new(21),
1233                project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
1234                key_id: None,
1235            },
1236            namespace: MetricNamespaceScoping::None,
1237        };
1238
1239        let quotas = &[Quota {
1240            id: None,
1241            categories: smallvec![DataCategory::Error],
1242            scope: QuotaScope::Organization,
1243            scope_id: Some("42".to_owned()),
1244            limit: Some(0),
1245            window: None,
1246            reason_code: Some(ReasonCode::new("zero")),
1247            namespace: None,
1248        }];
1249
1250        let applied_limits = rate_limits.check_with_quotas(quotas, item_scoping);
1251
1252        insta::assert_ron_snapshot!(applied_limits, @r###"
1253        RateLimits(
1254          limits: [
1255            RateLimit(
1256              categories: [
1257                error,
1258              ],
1259              scope: Organization(OrganizationId(42)),
1260              reason_code: Some(ReasonCode("zero")),
1261              retry_after: RetryAfter(60),
1262              namespaces: [],
1263            ),
1264          ],
1265        )
1266        "###);
1267    }
1268
1269    #[test]
1270    fn test_rate_limits_merge() {
1271        let mut rate_limits1 = RateLimits::new();
1272        let mut rate_limits2 = RateLimits::new();
1273
1274        rate_limits1.add(RateLimit {
1275            categories: smallvec![DataCategory::Error],
1276            scope: RateLimitScope::Organization(OrganizationId::new(42)),
1277            reason_code: Some(ReasonCode::new("first")),
1278            retry_after: RetryAfter::from_secs(1),
1279            namespaces: smallvec![],
1280        });
1281
1282        rate_limits1.add(RateLimit {
1283            categories: smallvec![DataCategory::TransactionIndexed],
1284            scope: RateLimitScope::Organization(OrganizationId::new(42)),
1285            reason_code: None,
1286            retry_after: RetryAfter::from_secs(1),
1287            namespaces: smallvec![],
1288        });
1289
1290        rate_limits2.add(RateLimit {
1291            categories: smallvec![DataCategory::Error],
1292            scope: RateLimitScope::Organization(OrganizationId::new(42)),
1293            reason_code: Some(ReasonCode::new("second")),
1294            retry_after: RetryAfter::from_secs(10),
1295            namespaces: smallvec![],
1296        });
1297
1298        rate_limits1.merge(rate_limits2);
1299
1300        insta::assert_ron_snapshot!(rate_limits1, @r###"
1301        RateLimits(
1302          limits: [
1303            RateLimit(
1304              categories: [
1305                error,
1306              ],
1307              scope: Organization(OrganizationId(42)),
1308              reason_code: Some(ReasonCode("second")),
1309              retry_after: RetryAfter(10),
1310              namespaces: [],
1311            ),
1312            RateLimit(
1313              categories: [
1314                transaction_indexed,
1315              ],
1316              scope: Organization(OrganizationId(42)),
1317              reason_code: None,
1318              retry_after: RetryAfter(1),
1319              namespaces: [],
1320            ),
1321          ],
1322        )
1323        "###);
1324    }
1325
1326    #[test]
1327    fn test_cached_rate_limits_expired() {
1328        let cached = CachedRateLimits::new();
1329
1330        // Active error limit
1331        cached.add(RateLimit {
1332            categories: smallvec![DataCategory::Error],
1333            scope: RateLimitScope::Organization(OrganizationId::new(42)),
1334            reason_code: None,
1335            retry_after: RetryAfter::from_secs(1),
1336            namespaces: smallvec![],
1337        });
1338
1339        // Inactive error limit with distinct scope
1340        cached.add(RateLimit {
1341            categories: smallvec![DataCategory::Error],
1342            scope: RateLimitScope::Project(ProjectId::new(21)),
1343            reason_code: None,
1344            retry_after: RetryAfter::from_secs(0),
1345            namespaces: smallvec![],
1346        });
1347
1348        let rate_limits = cached.current_limits();
1349
1350        insta::assert_ron_snapshot!(rate_limits, @r###"
1351        RateLimits(
1352          limits: [
1353            RateLimit(
1354              categories: [
1355                error,
1356              ],
1357              scope: Organization(OrganizationId(42)),
1358              reason_code: None,
1359              retry_after: RetryAfter(1),
1360              namespaces: [],
1361            ),
1362          ],
1363        )
1364        "###);
1365    }
1366}