relay_quotas/
rate_limit.rs

1use std::fmt;
2use std::str::FromStr;
3use std::sync::{Arc, Mutex, PoisonError};
4use std::time::{Duration, Instant};
5
6use relay_base_schema::data_category::DataCategory;
7use relay_base_schema::metrics::MetricNamespace;
8use relay_base_schema::organization::OrganizationId;
9use relay_base_schema::project::{ProjectId, ProjectKey};
10use smallvec::SmallVec;
11
12use crate::REJECT_ALL_SECS;
13use crate::quota::{DataCategories, ItemScoping, Quota, QuotaScope, ReasonCode, Scoping};
14
/// A monotonic marker for the moment a rate limit stops applying.
///
/// A [`RetryAfter`] wraps the [`Instant`] at which a rate limit runs out. It can report
/// whether that moment has already passed and how much time is left until it arrives.
#[derive(Clone, Copy, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub struct RetryAfter {
    when: Instant,
}

impl RetryAfter {
    /// Builds a [`RetryAfter`] that runs out `seconds` seconds from now.
    ///
    /// If the resulting instant cannot be represented, the marker falls back to the current
    /// instant and is therefore already expired.
    #[inline]
    pub fn from_secs(seconds: u64) -> Self {
        let now = Instant::now();
        match now.checked_add(Duration::from_secs(seconds)) {
            Some(when) => Self { when },
            None => Self { when: now },
        }
    }

    /// Computes the time left until expiration, measured from the instant `at`.
    ///
    /// Yields `None` when the rate limit has already run out at `at`.
    #[inline]
    pub fn remaining_at(self, at: Instant) -> Option<Duration> {
        (at < self.when).then(|| self.when - at)
    }

    /// Computes the time left until expiration, measured from the current instant.
    ///
    /// Yields `None` when the rate limit has already run out.
    #[inline]
    pub fn remaining(self) -> Option<Duration> {
        let now = Instant::now();
        self.remaining_at(now)
    }

    /// Computes the number of seconds left until expiration, measured from the instant `at`.
    ///
    /// Any fractional second is rounded up to a full second so that the rate limit is never
    /// reported as shorter than it is. An expired rate limit yields `0`.
    #[inline]
    pub fn remaining_seconds_at(self, at: Instant) -> u64 {
        let Some(left) = self.remaining_at(at) else {
            return 0;
        };

        let whole_seconds = left.as_secs();
        if left.subsec_nanos() > 0 {
            // A started second counts as a full one.
            whole_seconds + 1
        } else {
            whole_seconds
        }
    }

    /// Computes the number of seconds left until expiration, measured from the current instant.
    ///
    /// An expired rate limit yields `0`.
    #[inline]
    pub fn remaining_seconds(self) -> u64 {
        let now = Instant::now();
        self.remaining_seconds_at(now)
    }

    /// Tells whether the rate limit has run out at the instant `at`.
    #[inline]
    pub fn expired_at(self, at: Instant) -> bool {
        at >= self.when
    }

    /// Tells whether the rate limit has run out at the current instant.
    #[inline]
    pub fn expired(self) -> bool {
        self.expired_at(Instant::now())
    }
}
92
93impl fmt::Debug for RetryAfter {
94    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
95        match self.remaining_seconds() {
96            0 => write!(f, "RetryAfter(expired)"),
97            remaining => write!(f, "RetryAfter({remaining}s)"),
98        }
99    }
100}
101
/// Test-only serialization: a one-element tuple struct holding the remaining seconds.
#[cfg(test)]
impl serde::Serialize for RetryAfter {
    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
        use serde::ser::SerializeTupleStruct;

        let remaining = self.remaining_seconds();
        let mut state = serializer.serialize_tuple_struct("RetryAfter", 1)?;
        state.serialize_field(&remaining)?;
        state.end()
    }
}
114
/// Error that occurs when parsing a [`RetryAfter`] from a string fails.
///
/// The type implements [`std::error::Error`]. The underlying float parsing failure is not
/// repeated in the [`Display`](fmt::Display) output but is available through
/// [`source`](std::error::Error::source), so error reporters that walk the source chain do
/// not print it twice.
#[derive(Debug)]
pub enum InvalidRetryAfter {
    /// The supplied delay in seconds was not valid.
    InvalidDelay(std::num::ParseFloatError),
}

impl fmt::Display for InvalidRetryAfter {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::InvalidDelay(_) => f.write_str("invalid retry-after delay"),
        }
    }
}

impl std::error::Error for InvalidRetryAfter {
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            Self::InvalidDelay(error) => Some(error),
        }
    }
}
121
122impl FromStr for RetryAfter {
123    type Err = InvalidRetryAfter;
124
125    fn from_str(s: &str) -> Result<Self, Self::Err> {
126        let float = s.parse::<f64>().map_err(InvalidRetryAfter::InvalidDelay)?;
127        let seconds = float.max(0.0).ceil() as u64;
128        Ok(RetryAfter::from_secs(seconds))
129    }
130}
131
132/// The scope that a rate limit applies to.
133///
134/// Unlike [`QuotaScope`], which only declares the class of the scope, this enum carries
135/// the specific identifiers of the individual scopes that a rate limit applies to.
136///
137/// Rate limits can be applied at different levels of granularity, from global (affecting all data)
138/// down to a specific project key.
139#[derive(Clone, Debug, Eq, PartialEq)]
140#[cfg_attr(test, derive(serde::Serialize))]
141pub enum RateLimitScope {
142    /// An organization with identifier.
143    Organization(OrganizationId),
144    /// A project with identifier.
145    Project(ProjectId),
146    /// A DSN public key.
147    Key(ProjectKey),
148}
149
150impl RateLimitScope {
151    /// Creates a rate limiting scope from the given item scoping for a specific quota.
152    ///
153    /// This extracts the appropriate scope identifier based on the quota's scope type.
154    /// For unknown scopes, it assumes the most specific scope (Key).
155    pub fn for_quota(scoping: Scoping, scope: QuotaScope) -> Self {
156        match scope {
157            QuotaScope::Organization => Self::Organization(scoping.organization_id),
158            QuotaScope::Project => Self::Project(scoping.project_id),
159            QuotaScope::Key => Self::Key(scoping.project_key),
160            // For unknown scopes, assume the most specific scope:
161            QuotaScope::Unknown => Self::Key(scoping.project_key),
162        }
163    }
164
165    /// Returns the canonical name of this scope.
166    ///
167    /// This corresponds to the name of the corresponding [`QuotaScope`].
168    pub fn name(&self) -> &'static str {
169        match *self {
170            Self::Key(_) => QuotaScope::Key.name(),
171            Self::Project(_) => QuotaScope::Project.name(),
172            Self::Organization(_) => QuotaScope::Organization.name(),
173        }
174    }
175}
176
177/// An active rate limit that restricts data ingestion.
178///
179/// A rate limit defines restrictions for specific data categories within a particular scope.
180/// It includes an expiration time after which the limit is no longer enforced.
181///
182/// Rate limits can be created from [`Quota`]s or directly constructed with the needed parameters.
183#[derive(Clone, Debug, PartialEq)]
184#[cfg_attr(test, derive(serde::Serialize))]
185pub struct RateLimit {
186    /// A set of data categories that this quota applies to. If empty, this rate limit
187    /// applies to all data categories.
188    pub categories: DataCategories,
189
190    /// The scope of this rate limit.
191    pub scope: RateLimitScope,
192
193    /// A machine-readable reason indicating which quota caused this rate limit.
194    pub reason_code: Option<ReasonCode>,
195
196    /// A marker when this rate limit expires.
197    pub retry_after: RetryAfter,
198
199    /// The metric namespace of this rate limit.
200    ///
201    /// Only relevant for data categories of type metric bucket. If empty,
202    /// this rate limit applies to metrics of all namespaces.
203    pub namespaces: SmallVec<[MetricNamespace; 1]>,
204}
205
206impl RateLimit {
207    /// Creates a new rate limit from the given [`Quota`].
208    ///
209    /// This builds a rate limit with the appropriate scope derived from the quota and scoping
210    /// information. The categories and other properties are copied from the quota.
211    pub fn from_quota(quota: &Quota, scoping: Scoping, retry_after: RetryAfter) -> Self {
212        Self {
213            categories: quota.categories.clone(),
214            scope: RateLimitScope::for_quota(scoping, quota.scope),
215            reason_code: quota.reason_code.clone(),
216            retry_after,
217            namespaces: quota.namespace.into_iter().collect(),
218        }
219    }
220
221    /// Checks whether this rate limit applies to the given item.
222    ///
223    /// A rate limit applies if its scope matches the item's scope, and the item's
224    /// category and namespace match those of the rate limit.
225    pub fn matches(&self, scoping: ItemScoping) -> bool {
226        self.matches_scope(scoping)
227            && scoping.matches_categories(&self.categories)
228            && scoping.matches_namespaces(&self.namespaces)
229    }
230
231    /// Returns `true` if the rate limiting scope matches the given item.
232    fn matches_scope(&self, scoping: ItemScoping) -> bool {
233        match self.scope {
234            RateLimitScope::Organization(org_id) => scoping.organization_id == org_id,
235            RateLimitScope::Project(project_id) => scoping.project_id == project_id,
236            RateLimitScope::Key(key) => scoping.project_key == key,
237        }
238    }
239}
240
241/// A collection of scoped rate limits.
242///
243/// [`RateLimits`] manages a set of active rate limits that can be checked against
244/// incoming data.
245///
246/// The collection can be empty, indicated by [`is_ok`](Self::is_ok), meaning no rate limits
247/// are currently active.
248#[derive(Clone, Debug, Default)]
249#[cfg_attr(test, derive(serde::Serialize))]
250pub struct RateLimits {
251    limits: Vec<RateLimit>,
252}
253
254impl RateLimits {
255    /// Creates an empty [`RateLimits`] instance.
256    pub fn new() -> Self {
257        Self::default()
258    }
259
260    /// Adds a limit to this collection.
261    ///
262    /// If a rate limit with an overlapping scope already exists, the `retry_after` count is merged
263    /// with the existing limit. Otherwise, the new rate limit is added.
264    pub fn add(&mut self, limit: RateLimit) {
265        let limit_opt = self.limits.iter_mut().find(|l| {
266            let RateLimit {
267                categories,
268                scope,
269                reason_code: _,
270                retry_after: _,
271                namespaces: namespace,
272            } = &limit;
273
274            *categories == l.categories && *scope == l.scope && *namespace == l.namespaces
275        });
276
277        match limit_opt {
278            None => self.limits.push(limit),
279            Some(existing) if existing.retry_after < limit.retry_after => *existing = limit,
280            Some(_) => (), // keep existing, longer limit
281        }
282    }
283
284    /// Merges all limits from another [`RateLimits`] instance into this one.
285    ///
286    /// This keeps all existing rate limits, adds new ones, and updates any existing ones
287    /// with a later expiration time. The resulting collection contains the maximum
288    /// constraints from both instances.
289    pub fn merge(&mut self, limits: Self) {
290        for limit in limits {
291            self.add(limit);
292        }
293    }
294
295    /// Merges all limits from another [`RateLimits`] with this one.
296    ///
297    /// See also: [`Self::merge`].
298    pub fn merge_with(mut self, other: Self) -> Self {
299        self.merge(other);
300        self
301    }
302
303    /// Returns `true` if this instance contains no active limits.
304    ///
305    /// This is the opposite of [`is_limited`](Self::is_limited).
306    pub fn is_ok(&self) -> bool {
307        !self.is_limited()
308    }
309
310    /// Returns `true` if this instance contains any active rate limits.
311    ///
312    /// A rate limit is considered active if it has not yet expired.
313    pub fn is_limited(&self) -> bool {
314        let now = Instant::now();
315        self.iter().any(|limit| !limit.retry_after.expired_at(now))
316    }
317
318    /// Returns `true` if this is instance contains any active rate limits
319    /// for the specified categories.
320    ///
321    /// A rate limit is considered active if it has not yet expired.
322    pub fn is_any_limited(&self, scopings: &[ItemScoping]) -> bool {
323        self.is_any_limited_with_quotas(&[], scopings)
324    }
325
326    /// Returns `true` if this is instance contains any active rate limits
327    /// for the specified categories.
328    ///
329    /// This is similar to [`Self::is_limited`], but additionally checks for quotas with a static
330    /// limit of `0`, which reject items even if there is no active rate limit in this instance.
331    ///
332    /// A rate limit is considered active if it has not yet expired.
333    pub fn is_any_limited_with_quotas<'a>(
334        &self,
335        quotas: impl IntoIterator<Item = &'a Quota>,
336        scopings: &[ItemScoping],
337    ) -> bool {
338        for quota in quotas {
339            for scoping in scopings {
340                if quota.limit == Some(0) && quota.matches(*scoping) {
341                    return true;
342                }
343            }
344        }
345
346        let now = Instant::now();
347        for scoping in scopings {
348            for limit in &self.limits {
349                if limit.matches(*scoping) && !limit.retry_after.expired_at(now) {
350                    return true;
351                }
352            }
353        }
354
355        false
356    }
357
358    /// Removes expired rate limits from this instance.
359    ///
360    /// This is useful for cleaning up rate limits that are no longer relevant,
361    /// reducing memory usage and improving performance of subsequent operations.
362    pub fn clean_expired(&mut self, now: Instant) {
363        self.limits
364            .retain(|limit| !limit.retry_after.expired_at(now));
365    }
366
367    /// Checks whether any rate limits apply to the given scoping.
368    ///
369    /// Returns a new [`RateLimits`] instance containing only the rate limits that match
370    /// the provided [`ItemScoping`]. If no limits match, the returned instance will be empty
371    /// and [`is_ok`](Self::is_ok) will return `true`.
372    pub fn check(&self, scoping: ItemScoping) -> Self {
373        self.check_with_quotas(&[], scoping)
374    }
375
376    /// Checks whether any rate limits or static quotas apply to the given scoping.
377    ///
378    /// This is similar to [`check`](Self::check), but additionally checks for quotas with a static
379    /// limit of `0`, which reject items even if there is no active rate limit in this instance.
380    ///
381    /// Returns a new [`RateLimits`] instance containing the rate limits that match the provided
382    /// [`ItemScoping`]. If no limits or quotas match, the returned instance will be empty.
383    pub fn check_with_quotas<'a>(
384        &self,
385        quotas: impl IntoIterator<Item = &'a Quota>,
386        scoping: ItemScoping,
387    ) -> Self {
388        let mut applied_limits = Self::new();
389
390        for quota in quotas {
391            if quota.limit == Some(0) && quota.matches(scoping) {
392                let retry_after = RetryAfter::from_secs(REJECT_ALL_SECS);
393                applied_limits.add(RateLimit::from_quota(quota, *scoping, retry_after));
394            }
395        }
396
397        for limit in &self.limits {
398            if limit.matches(scoping) {
399                applied_limits.add(limit.clone());
400            }
401        }
402
403        applied_limits
404    }
405
406    /// Returns an iterator over all rate limits in this collection.
407    pub fn iter(&self) -> RateLimitsIter<'_> {
408        RateLimitsIter {
409            iter: self.limits.iter(),
410        }
411    }
412
413    /// Returns the rate limit with the latest expiration time.
414    ///
415    /// If multiple rate limits have the same expiration time, any of them may be returned.
416    /// If the collection is empty, returns `None`.
417    pub fn longest(&self) -> Option<&RateLimit> {
418        self.iter().max_by_key(|limit| limit.retry_after)
419    }
420
421    /// Returns `true` if there are no rate limits in this collection.
422    ///
423    /// Note that an empty collection is not the same as having no active limits.
424    /// Use [`is_ok`](Self::is_ok) to check if there are no active limits.
425    pub fn is_empty(&self) -> bool {
426        self.limits.is_empty()
427    }
428}
429
430/// An iterator over rate limit references.
431///
432/// This struct is created by the [`iter`](RateLimits::iter) method on [`RateLimits`].
433/// It yields shared references to the rate limits in the collection.
434pub struct RateLimitsIter<'a> {
435    iter: std::slice::Iter<'a, RateLimit>,
436}
437
438impl<'a> Iterator for RateLimitsIter<'a> {
439    type Item = &'a RateLimit;
440
441    fn next(&mut self) -> Option<Self::Item> {
442        self.iter.next()
443    }
444}
445
446impl IntoIterator for RateLimits {
447    type IntoIter = RateLimitsIntoIter;
448    type Item = RateLimit;
449
450    fn into_iter(self) -> Self::IntoIter {
451        RateLimitsIntoIter {
452            iter: self.limits.into_iter(),
453        }
454    }
455}
456
457/// An iterator that consumes a [`RateLimits`] collection.
458///
459/// This struct is created by the `into_iter` method on [`RateLimits`], provided by the
460/// [`IntoIterator`] trait. It yields owned rate limits by value.
461pub struct RateLimitsIntoIter {
462    iter: std::vec::IntoIter<RateLimit>,
463}
464
465impl Iterator for RateLimitsIntoIter {
466    type Item = RateLimit;
467
468    fn next(&mut self) -> Option<Self::Item> {
469        self.iter.next()
470    }
471}
472
473impl<'a> IntoIterator for &'a RateLimits {
474    type IntoIter = RateLimitsIter<'a>;
475    type Item = &'a RateLimit;
476
477    fn into_iter(self) -> Self::IntoIter {
478        self.iter()
479    }
480}
481
482/// A thread-safe cache of rate limits with automatic expiration handling.
483///
484/// [`CachedRateLimits`] wraps a [`RateLimits`] collection with a mutex to allow safe
485/// concurrent access from multiple threads. It automatically removes expired rate limits
486/// when retrieving the current limits.
487///
488/// This is useful for maintaining a shared set of rate limits across multiple
489/// processing threads or tasks.
490#[derive(Debug, Default)]
491pub struct CachedRateLimits(Mutex<Arc<RateLimits>>);
492
493impl CachedRateLimits {
494    /// Creates a new, empty instance without any rate limits.
495    pub fn new() -> Self {
496        Self::default()
497    }
498
499    /// Adds a rate limit to this collection.
500    ///
501    /// This is a thread-safe wrapper around [`RateLimits::add`].
502    pub fn add(&self, limit: RateLimit) {
503        let mut inner = self.0.lock().unwrap_or_else(PoisonError::into_inner);
504        let current = Arc::make_mut(&mut inner);
505        current.add(limit);
506    }
507
508    /// Merges rate limits from another collection into this one.
509    ///
510    /// This is a thread-safe wrapper around [`RateLimits::merge`].
511    pub fn merge(&self, limits: RateLimits) {
512        if limits.is_empty() {
513            return;
514        }
515
516        let mut inner = self.0.lock().unwrap_or_else(PoisonError::into_inner);
517        let current = Arc::make_mut(&mut inner);
518        for mut limit in limits {
519            // To spice it up, we do have some special casing here for 'inherited categories',
520            // e.g. spans and transactions.
521            //
522            // The tldr; is, as transactions are just containers for spans,
523            // we can enforce span limits on transactions but also vice versa.
524            //
525            // So this is largely an enforcement problem, but since Relay propagates
526            // rate limits to clients, we clone the limits with the inherited category.
527            // This ensures old SDKs rate limit correctly, but also it simplifies client
528            // implementations. Only Relay needs to make this decision.
529            for i in 0..limit.categories.len() {
530                let Some(category) = limit.categories.get(i) else {
531                    debug_assert!(false, "logical error");
532                    break;
533                };
534
535                for inherited in inherited_categories(category) {
536                    if let Some(categories) = limit.categories.add(*inherited) {
537                        limit.categories = categories;
538                    }
539                }
540            }
541
542            current.add(limit);
543        }
544    }
545
546    /// Returns a reference to the current rate limits.
547    ///
548    /// This method automatically removes any expired rate limits before returning,
549    /// ensuring that only active limits are included in the result.
550    pub fn current_limits(&self) -> Arc<RateLimits> {
551        let now = Instant::now();
552        let mut inner = self.0.lock().unwrap_or_else(PoisonError::into_inner);
553        Arc::make_mut(&mut inner).clean_expired(now);
554        Arc::clone(&inner)
555    }
556}
557
558/// Returns inherited rate limit categories for the passed category.
559///
560/// When a rate limit for a category can also be enforced in a different category,
561/// then it's an inherited category.
562///
563/// For example, a transaction rate limit can also be applied to spans and vice versa.
564///
565/// For a detailed explanation on span/transaction enforcement see:
566/// <https://develop.sentry.dev/ingestion/relay/transaction-span-ratelimits/>.
567fn inherited_categories(category: &DataCategory) -> &'static [DataCategory] {
568    match category {
569        DataCategory::Transaction => &[DataCategory::Span],
570        DataCategory::Span => &[DataCategory::Transaction],
571        _ => &[],
572    }
573}
574
575#[cfg(test)]
576mod tests {
577    use smallvec::smallvec;
578
579    use super::*;
580    use crate::MetricNamespaceScoping;
581    use crate::quota::DataCategory;
582
583    #[test]
584    fn test_parse_retry_after() {
585        // positive float always rounds up to the next integer
586        let retry_after = "17.1".parse::<RetryAfter>().expect("parse RetryAfter");
587        assert_eq!(retry_after.remaining_seconds(), 18);
588        assert!(!retry_after.expired());
589        let retry_after = "17.7".parse::<RetryAfter>().expect("parse RetryAfter");
590        assert_eq!(retry_after.remaining_seconds(), 18);
591        assert!(!retry_after.expired());
592
593        // positive int
594        let retry_after = "17".parse::<RetryAfter>().expect("parse RetryAfter");
595        assert_eq!(retry_after.remaining_seconds(), 17);
596        assert!(!retry_after.expired());
597
598        // negative numbers are treated as zero
599        let retry_after = "-2".parse::<RetryAfter>().expect("parse RetryAfter");
600        assert_eq!(retry_after.remaining_seconds(), 0);
601        assert!(retry_after.expired());
602        let retry_after = "-inf".parse::<RetryAfter>().expect("parse RetryAfter");
603        assert_eq!(retry_after.remaining_seconds(), 0);
604        assert!(retry_after.expired());
605
606        // inf and NaN are valid input and treated as zero
607        let retry_after = "inf".parse::<RetryAfter>().expect("parse RetryAfter");
608        assert_eq!(retry_after.remaining_seconds(), 0);
609        assert!(retry_after.expired());
610        let retry_after = "NaN".parse::<RetryAfter>().expect("parse RetryAfter");
611        assert_eq!(retry_after.remaining_seconds(), 0);
612        assert!(retry_after.expired());
613
614        // large inputs that would overflow are treated as zero
615        let retry_after = "100000000000000000000"
616            .parse::<RetryAfter>()
617            .expect("parse RetryAfter");
618        assert_eq!(retry_after.remaining_seconds(), 0);
619        assert!(retry_after.expired());
620
621        // invalid strings cause parse error
622        "".parse::<RetryAfter>().expect_err("error RetryAfter");
623        "nope".parse::<RetryAfter>().expect_err("error RetryAfter");
624        " 2 ".parse::<RetryAfter>().expect_err("error RetryAfter");
625        "6 0".parse::<RetryAfter>().expect_err("error RetryAfter");
626    }
627
628    #[test]
629    fn test_rate_limit_matches_categories() {
630        let rate_limit = RateLimit {
631            categories: [DataCategory::Unknown, DataCategory::Error].into(),
632            scope: RateLimitScope::Organization(OrganizationId::new(42)),
633            reason_code: None,
634            retry_after: RetryAfter::from_secs(1),
635            namespaces: smallvec![],
636        };
637
638        assert!(rate_limit.matches(ItemScoping {
639            category: DataCategory::Error,
640            scoping: Scoping {
641                organization_id: OrganizationId::new(42),
642                project_id: ProjectId::new(21),
643                project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
644                key_id: None,
645            },
646            namespace: MetricNamespaceScoping::None,
647        }));
648
649        assert!(!rate_limit.matches(ItemScoping {
650            category: DataCategory::Transaction,
651            scoping: Scoping {
652                organization_id: OrganizationId::new(42),
653                project_id: ProjectId::new(21),
654                project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
655                key_id: None,
656            },
657            namespace: MetricNamespaceScoping::None,
658        }));
659    }
660
661    #[test]
662    fn test_rate_limit_matches_organization() {
663        let rate_limit = RateLimit {
664            categories: DataCategories::new(),
665            scope: RateLimitScope::Organization(OrganizationId::new(42)),
666            reason_code: None,
667            retry_after: RetryAfter::from_secs(1),
668            namespaces: smallvec![],
669        };
670
671        assert!(rate_limit.matches(ItemScoping {
672            category: DataCategory::Error,
673            scoping: Scoping {
674                organization_id: OrganizationId::new(42),
675                project_id: ProjectId::new(21),
676                project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
677                key_id: None,
678            },
679            namespace: MetricNamespaceScoping::None,
680        }));
681
682        assert!(!rate_limit.matches(ItemScoping {
683            category: DataCategory::Error,
684            scoping: Scoping {
685                organization_id: OrganizationId::new(0),
686                project_id: ProjectId::new(21),
687                project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
688                key_id: None,
689            },
690            namespace: MetricNamespaceScoping::None,
691        }));
692    }
693
694    #[test]
695    fn test_rate_limit_matches_project() {
696        let rate_limit = RateLimit {
697            categories: DataCategories::new(),
698            scope: RateLimitScope::Project(ProjectId::new(21)),
699            reason_code: None,
700            retry_after: RetryAfter::from_secs(1),
701            namespaces: smallvec![],
702        };
703
704        assert!(rate_limit.matches(ItemScoping {
705            category: DataCategory::Error,
706            scoping: Scoping {
707                organization_id: OrganizationId::new(42),
708                project_id: ProjectId::new(21),
709                project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
710                key_id: None,
711            },
712            namespace: MetricNamespaceScoping::None,
713        }));
714
715        assert!(!rate_limit.matches(ItemScoping {
716            category: DataCategory::Error,
717            scoping: Scoping {
718                organization_id: OrganizationId::new(42),
719                project_id: ProjectId::new(0),
720                project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
721                key_id: None,
722            },
723            namespace: MetricNamespaceScoping::None,
724        }));
725    }
726
727    #[test]
728    fn test_rate_limit_matches_namespaces() {
729        let rate_limit = RateLimit {
730            categories: Default::default(),
731            scope: RateLimitScope::Organization(OrganizationId::new(42)),
732            reason_code: None,
733            retry_after: RetryAfter::from_secs(1),
734            namespaces: smallvec![MetricNamespace::Custom],
735        };
736
737        let scoping = Scoping {
738            organization_id: OrganizationId::new(42),
739            project_id: ProjectId::new(21),
740            project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
741            key_id: None,
742        };
743
744        assert!(rate_limit.matches(ItemScoping {
745            category: DataCategory::MetricBucket,
746            scoping,
747            namespace: MetricNamespaceScoping::Some(MetricNamespace::Custom),
748        }));
749
750        assert!(!rate_limit.matches(ItemScoping {
751            category: DataCategory::MetricBucket,
752            scoping,
753            namespace: MetricNamespaceScoping::Some(MetricNamespace::Spans),
754        }));
755
756        let general_rate_limit = RateLimit {
757            categories: Default::default(),
758            scope: RateLimitScope::Organization(OrganizationId::new(42)),
759            reason_code: None,
760            retry_after: RetryAfter::from_secs(1),
761            namespaces: smallvec![], // all namespaces
762        };
763
764        assert!(general_rate_limit.matches(ItemScoping {
765            category: DataCategory::MetricBucket,
766            scoping,
767            namespace: MetricNamespaceScoping::Some(MetricNamespace::Spans),
768        }));
769
770        assert!(general_rate_limit.matches(ItemScoping {
771            category: DataCategory::MetricBucket,
772            scoping,
773            namespace: MetricNamespaceScoping::None,
774        }));
775    }
776
777    #[test]
778    fn test_rate_limit_matches_key() {
779        let rate_limit = RateLimit {
780            categories: DataCategories::new(),
781            scope: RateLimitScope::Key(
782                ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
783            ),
784            reason_code: None,
785            retry_after: RetryAfter::from_secs(1),
786            namespaces: smallvec![],
787        };
788
789        assert!(rate_limit.matches(ItemScoping {
790            category: DataCategory::Error,
791            scoping: Scoping {
792                organization_id: OrganizationId::new(42),
793                project_id: ProjectId::new(21),
794                project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
795                key_id: None,
796            },
797            namespace: MetricNamespaceScoping::None,
798        }));
799
800        assert!(!rate_limit.matches(ItemScoping {
801            category: DataCategory::Error,
802            scoping: Scoping {
803                organization_id: OrganizationId::new(0),
804                project_id: ProjectId::new(21),
805                project_key: ProjectKey::parse("deadbeefdeadbeefdeadbeefdeadbeef").unwrap(),
806                key_id: None,
807            },
808            namespace: MetricNamespaceScoping::None,
809        }));
810    }
811
812    #[test]
813    fn test_rate_limits_add_replacement() {
814        let mut rate_limits = RateLimits::new();
815
816        rate_limits.add(RateLimit {
817            categories: [DataCategory::Default, DataCategory::Error].into(),
818            scope: RateLimitScope::Organization(OrganizationId::new(42)),
819            reason_code: Some(ReasonCode::new("first")),
820            retry_after: RetryAfter::from_secs(1),
821            namespaces: smallvec![],
822        });
823
824        // longer rate limit shadows shorter one
825        rate_limits.add(RateLimit {
826            categories: [DataCategory::Error, DataCategory::Default].into(),
827            scope: RateLimitScope::Organization(OrganizationId::new(42)),
828            reason_code: Some(ReasonCode::new("second")),
829            retry_after: RetryAfter::from_secs(10),
830            namespaces: smallvec![],
831        });
832
833        insta::assert_ron_snapshot!(rate_limits, @r#"
834        RateLimits(
835          limits: [
836            RateLimit(
837              categories: [
838                "default",
839                "error",
840              ],
841              scope: Organization(OrganizationId(42)),
842              reason_code: Some(ReasonCode("second")),
843              retry_after: RetryAfter(10),
844              namespaces: [],
845            ),
846          ],
847        )
848        "#);
849    }
850
851    #[test]
852    fn test_rate_limits_add_shadowing() {
853        let mut rate_limits = RateLimits::new();
854
855        rate_limits.add(RateLimit {
856            categories: [DataCategory::Default, DataCategory::Error].into(),
857            scope: RateLimitScope::Organization(OrganizationId::new(42)),
858            reason_code: Some(ReasonCode::new("first")),
859            retry_after: RetryAfter::from_secs(10),
860            namespaces: smallvec![],
861        });
862
863        // shorter rate limit is shadowed by existing one
864        rate_limits.add(RateLimit {
865            categories: [DataCategory::Error, DataCategory::Default].into(),
866            scope: RateLimitScope::Organization(OrganizationId::new(42)),
867            reason_code: Some(ReasonCode::new("second")),
868            retry_after: RetryAfter::from_secs(1),
869            namespaces: smallvec![],
870        });
871
872        insta::assert_ron_snapshot!(rate_limits, @r#"
873        RateLimits(
874          limits: [
875            RateLimit(
876              categories: [
877                "default",
878                "error",
879              ],
880              scope: Organization(OrganizationId(42)),
881              reason_code: Some(ReasonCode("first")),
882              retry_after: RetryAfter(10),
883              namespaces: [],
884            ),
885          ],
886        )
887        "#);
888    }
889
890    #[test]
891    fn test_rate_limits_add_buckets() {
892        let mut rate_limits = RateLimits::new();
893
894        rate_limits.add(RateLimit {
895            categories: [DataCategory::Error].into(),
896            scope: RateLimitScope::Organization(OrganizationId::new(42)),
897            reason_code: None,
898            retry_after: RetryAfter::from_secs(1),
899            namespaces: smallvec![],
900        });
901
902        // Same scope but different categories
903        rate_limits.add(RateLimit {
904            categories: [DataCategory::Transaction].into(),
905            scope: RateLimitScope::Organization(OrganizationId::new(42)),
906            reason_code: None,
907            retry_after: RetryAfter::from_secs(1),
908            namespaces: smallvec![],
909        });
910
911        // Same categories but different scope
912        rate_limits.add(RateLimit {
913            categories: [DataCategory::Error].into(),
914            scope: RateLimitScope::Project(ProjectId::new(21)),
915            reason_code: None,
916            retry_after: RetryAfter::from_secs(1),
917            namespaces: smallvec![],
918        });
919
920        insta::assert_ron_snapshot!(rate_limits, @r#"
921        RateLimits(
922          limits: [
923            RateLimit(
924              categories: [
925                "error",
926              ],
927              scope: Organization(OrganizationId(42)),
928              reason_code: None,
929              retry_after: RetryAfter(1),
930              namespaces: [],
931            ),
932            RateLimit(
933              categories: [
934                "transaction",
935              ],
936              scope: Organization(OrganizationId(42)),
937              reason_code: None,
938              retry_after: RetryAfter(1),
939              namespaces: [],
940            ),
941            RateLimit(
942              categories: [
943                "error",
944              ],
945              scope: Project(ProjectId(21)),
946              reason_code: None,
947              retry_after: RetryAfter(1),
948              namespaces: [],
949            ),
950          ],
951        )
952        "#);
953    }
954
955    /// Regression test that ensures namespaces are correctly added to rate limits.
956    #[test]
957    fn test_rate_limits_add_namespaces() {
958        let mut rate_limits = RateLimits::new();
959
960        rate_limits.add(RateLimit {
961            categories: [DataCategory::MetricBucket].into(),
962            scope: RateLimitScope::Organization(OrganizationId::new(42)),
963            reason_code: None,
964            retry_after: RetryAfter::from_secs(1),
965            namespaces: smallvec![MetricNamespace::Custom],
966        });
967
968        // Same category but different namespaces
969        rate_limits.add(RateLimit {
970            categories: [DataCategory::MetricBucket].into(),
971            scope: RateLimitScope::Organization(OrganizationId::new(42)),
972            reason_code: None,
973            retry_after: RetryAfter::from_secs(1),
974            namespaces: smallvec![MetricNamespace::Spans],
975        });
976
977        insta::assert_ron_snapshot!(rate_limits, @r#"
978        RateLimits(
979          limits: [
980            RateLimit(
981              categories: [
982                "metric_bucket",
983              ],
984              scope: Organization(OrganizationId(42)),
985              reason_code: None,
986              retry_after: RetryAfter(1),
987              namespaces: [
988                "custom",
989              ],
990            ),
991            RateLimit(
992              categories: [
993                "metric_bucket",
994              ],
995              scope: Organization(OrganizationId(42)),
996              reason_code: None,
997              retry_after: RetryAfter(1),
998              namespaces: [
999                "spans",
1000              ],
1001            ),
1002          ],
1003        )
1004        "#);
1005    }
1006
1007    #[test]
1008    fn test_rate_limits_longest() {
1009        let mut rate_limits = RateLimits::new();
1010
1011        rate_limits.add(RateLimit {
1012            categories: [DataCategory::Error].into(),
1013            scope: RateLimitScope::Organization(OrganizationId::new(42)),
1014            reason_code: Some(ReasonCode::new("first")),
1015            retry_after: RetryAfter::from_secs(1),
1016            namespaces: smallvec![],
1017        });
1018
1019        // Distinct scope to prevent deduplication
1020        rate_limits.add(RateLimit {
1021            categories: [DataCategory::Transaction].into(),
1022            scope: RateLimitScope::Organization(OrganizationId::new(42)),
1023            reason_code: Some(ReasonCode::new("second")),
1024            retry_after: RetryAfter::from_secs(10),
1025            namespaces: smallvec![],
1026        });
1027
1028        let rate_limit = rate_limits.longest().unwrap();
1029        insta::assert_ron_snapshot!(rate_limit, @r#"
1030        RateLimit(
1031          categories: [
1032            "transaction",
1033          ],
1034          scope: Organization(OrganizationId(42)),
1035          reason_code: Some(ReasonCode("second")),
1036          retry_after: RetryAfter(10),
1037          namespaces: [],
1038        )
1039        "#);
1040    }
1041
1042    #[test]
1043    fn test_rate_limits_clean_expired() {
1044        let mut rate_limits = RateLimits::new();
1045
1046        // Active error limit
1047        rate_limits.add(RateLimit {
1048            categories: [DataCategory::Error].into(),
1049            scope: RateLimitScope::Organization(OrganizationId::new(42)),
1050            reason_code: None,
1051            retry_after: RetryAfter::from_secs(1),
1052            namespaces: smallvec![],
1053        });
1054
1055        // Inactive error limit with distinct scope
1056        rate_limits.add(RateLimit {
1057            categories: [DataCategory::Error].into(),
1058            scope: RateLimitScope::Project(ProjectId::new(21)),
1059            reason_code: None,
1060            retry_after: RetryAfter::from_secs(0),
1061            namespaces: smallvec![],
1062        });
1063
1064        // Sanity check before running `clean_expired`
1065        assert_eq!(rate_limits.iter().count(), 2);
1066
1067        rate_limits.clean_expired(Instant::now());
1068
1069        // Check that the expired limit has been removed
1070        insta::assert_ron_snapshot!(rate_limits, @r#"
1071        RateLimits(
1072          limits: [
1073            RateLimit(
1074              categories: [
1075                "error",
1076              ],
1077              scope: Organization(OrganizationId(42)),
1078              reason_code: None,
1079              retry_after: RetryAfter(1),
1080              namespaces: [],
1081            ),
1082          ],
1083        )
1084        "#);
1085    }
1086
1087    #[test]
1088    fn test_rate_limits_is_any_limited() {
1089        let mut rate_limits = RateLimits::new();
1090
1091        // Active error limit
1092        rate_limits.add(RateLimit {
1093            categories: [DataCategory::Error].into(),
1094            scope: RateLimitScope::Organization(OrganizationId::new(42)),
1095            reason_code: None,
1096            retry_after: RetryAfter::from_secs(1),
1097            namespaces: smallvec![],
1098        });
1099
1100        // Active transaction limit
1101        rate_limits.add(RateLimit {
1102            categories: [DataCategory::Transaction].into(),
1103            scope: RateLimitScope::Organization(OrganizationId::new(42)),
1104            reason_code: None,
1105            retry_after: RetryAfter::from_secs(1),
1106            namespaces: smallvec![],
1107        });
1108
1109        let scoping = Scoping {
1110            organization_id: OrganizationId::new(42),
1111            project_id: ProjectId::new(21),
1112            project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
1113            key_id: None,
1114        };
1115
1116        let quotas = &[Quota {
1117            id: None,
1118            categories: [DataCategory::Attachment].into(),
1119            scope: QuotaScope::Organization,
1120            scope_id: Some("42".into()),
1121            limit: Some(0),
1122            window: None,
1123            reason_code: Some(ReasonCode::new("zero")),
1124            namespace: None,
1125        }];
1126
1127        assert!(
1128            rate_limits.is_any_limited_with_quotas(quotas, &[scoping.item(DataCategory::Error)])
1129        );
1130        assert!(
1131            rate_limits
1132                .is_any_limited_with_quotas(quotas, &[scoping.item(DataCategory::Attachment)])
1133        );
1134        assert!(rate_limits.is_any_limited_with_quotas(
1135            quotas,
1136            &[
1137                scoping.item(DataCategory::Replay),
1138                scoping.item(DataCategory::Error)
1139            ]
1140        ));
1141
1142        assert!(
1143            !rate_limits.is_any_limited_with_quotas(quotas, &[scoping.item(DataCategory::Replay)])
1144        );
1145    }
1146
1147    #[test]
1148    fn test_rate_limits_check() {
1149        let mut rate_limits = RateLimits::new();
1150
1151        // Active error limit
1152        rate_limits.add(RateLimit {
1153            categories: [DataCategory::Error].into(),
1154            scope: RateLimitScope::Organization(OrganizationId::new(42)),
1155            reason_code: None,
1156            retry_after: RetryAfter::from_secs(1),
1157            namespaces: smallvec![],
1158        });
1159
1160        // Active transaction limit
1161        rate_limits.add(RateLimit {
1162            categories: [DataCategory::Transaction].into(),
1163            scope: RateLimitScope::Organization(OrganizationId::new(42)),
1164            reason_code: None,
1165            retry_after: RetryAfter::from_secs(1),
1166            namespaces: smallvec![],
1167        });
1168
1169        let applied_limits = rate_limits.check(ItemScoping {
1170            category: DataCategory::Error,
1171            scoping: Scoping {
1172                organization_id: OrganizationId::new(42),
1173                project_id: ProjectId::new(21),
1174                project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
1175                key_id: None,
1176            },
1177            namespace: MetricNamespaceScoping::None,
1178        });
1179
1180        // Check that the error limit is applied
1181        insta::assert_ron_snapshot!(applied_limits, @r#"
1182        RateLimits(
1183          limits: [
1184            RateLimit(
1185              categories: [
1186                "error",
1187              ],
1188              scope: Organization(OrganizationId(42)),
1189              reason_code: None,
1190              retry_after: RetryAfter(1),
1191              namespaces: [],
1192            ),
1193          ],
1194        )
1195        "#);
1196    }
1197
1198    #[test]
1199    fn test_rate_limits_check_quotas() {
1200        let mut rate_limits = RateLimits::new();
1201
1202        // Active error limit
1203        rate_limits.add(RateLimit {
1204            categories: [DataCategory::Error].into(),
1205            scope: RateLimitScope::Organization(OrganizationId::new(42)),
1206            reason_code: None,
1207            retry_after: RetryAfter::from_secs(1),
1208            namespaces: smallvec![],
1209        });
1210
1211        // Active transaction limit
1212        rate_limits.add(RateLimit {
1213            categories: [DataCategory::Transaction].into(),
1214            scope: RateLimitScope::Organization(OrganizationId::new(42)),
1215            reason_code: None,
1216            retry_after: RetryAfter::from_secs(1),
1217            namespaces: smallvec![],
1218        });
1219
1220        let item_scoping = ItemScoping {
1221            category: DataCategory::Error,
1222            scoping: Scoping {
1223                organization_id: OrganizationId::new(42),
1224                project_id: ProjectId::new(21),
1225                project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
1226                key_id: None,
1227            },
1228            namespace: MetricNamespaceScoping::None,
1229        };
1230
1231        let quotas = &[Quota {
1232            id: None,
1233            categories: [DataCategory::Error].into(),
1234            scope: QuotaScope::Organization,
1235            scope_id: Some("42".into()),
1236            limit: Some(0),
1237            window: None,
1238            reason_code: Some(ReasonCode::new("zero")),
1239            namespace: None,
1240        }];
1241
1242        let applied_limits = rate_limits.check_with_quotas(quotas, item_scoping);
1243
1244        insta::assert_ron_snapshot!(applied_limits, @r#"
1245        RateLimits(
1246          limits: [
1247            RateLimit(
1248              categories: [
1249                "error",
1250              ],
1251              scope: Organization(OrganizationId(42)),
1252              reason_code: Some(ReasonCode("zero")),
1253              retry_after: RetryAfter(60),
1254              namespaces: [],
1255            ),
1256          ],
1257        )
1258        "#);
1259    }
1260
1261    #[test]
1262    fn test_rate_limits_merge() {
1263        let mut rate_limits1 = RateLimits::new();
1264        let mut rate_limits2 = RateLimits::new();
1265
1266        rate_limits1.add(RateLimit {
1267            categories: [DataCategory::Error].into(),
1268            scope: RateLimitScope::Organization(OrganizationId::new(42)),
1269            reason_code: Some(ReasonCode::new("first")),
1270            retry_after: RetryAfter::from_secs(1),
1271            namespaces: smallvec![],
1272        });
1273
1274        rate_limits1.add(RateLimit {
1275            categories: [DataCategory::TransactionIndexed].into(),
1276            scope: RateLimitScope::Organization(OrganizationId::new(42)),
1277            reason_code: None,
1278            retry_after: RetryAfter::from_secs(1),
1279            namespaces: smallvec![],
1280        });
1281
1282        rate_limits2.add(RateLimit {
1283            categories: [DataCategory::Error].into(),
1284            scope: RateLimitScope::Organization(OrganizationId::new(42)),
1285            reason_code: Some(ReasonCode::new("second")),
1286            retry_after: RetryAfter::from_secs(10),
1287            namespaces: smallvec![],
1288        });
1289
1290        rate_limits1.merge(rate_limits2);
1291
1292        insta::assert_ron_snapshot!(rate_limits1, @r#"
1293        RateLimits(
1294          limits: [
1295            RateLimit(
1296              categories: [
1297                "error",
1298              ],
1299              scope: Organization(OrganizationId(42)),
1300              reason_code: Some(ReasonCode("second")),
1301              retry_after: RetryAfter(10),
1302              namespaces: [],
1303            ),
1304            RateLimit(
1305              categories: [
1306                "transaction_indexed",
1307              ],
1308              scope: Organization(OrganizationId(42)),
1309              reason_code: None,
1310              retry_after: RetryAfter(1),
1311              namespaces: [],
1312            ),
1313          ],
1314        )
1315        "#);
1316    }
1317
1318    #[test]
1319    fn test_cached_rate_limits_expired() {
1320        let cached = CachedRateLimits::new();
1321
1322        // Active error limit
1323        cached.add(RateLimit {
1324            categories: [DataCategory::Error].into(),
1325            scope: RateLimitScope::Organization(OrganizationId::new(42)),
1326            reason_code: None,
1327            retry_after: RetryAfter::from_secs(1),
1328            namespaces: smallvec![],
1329        });
1330
1331        // Inactive error limit with distinct scope
1332        cached.add(RateLimit {
1333            categories: [DataCategory::Error].into(),
1334            scope: RateLimitScope::Project(ProjectId::new(21)),
1335            reason_code: None,
1336            retry_after: RetryAfter::from_secs(0),
1337            namespaces: smallvec![],
1338        });
1339
1340        let rate_limits = cached.current_limits();
1341
1342        insta::assert_ron_snapshot!(rate_limits, @r#"
1343        RateLimits(
1344          limits: [
1345            RateLimit(
1346              categories: [
1347                "error",
1348              ],
1349              scope: Organization(OrganizationId(42)),
1350              reason_code: None,
1351              retry_after: RetryAfter(1),
1352              namespaces: [],
1353            ),
1354          ],
1355        )
1356        "#);
1357    }
1358}