// relay_quotas/rate_limit.rs

1use std::fmt;
2use std::str::FromStr;
3use std::sync::{Arc, Mutex, PoisonError};
4use std::time::{Duration, Instant};
5
6use relay_base_schema::data_category::DataCategory;
7use relay_base_schema::metrics::MetricNamespace;
8use relay_base_schema::organization::OrganizationId;
9use relay_base_schema::project::{ProjectId, ProjectKey};
10use smallvec::SmallVec;
11
12use crate::REJECT_ALL_SECS;
13use crate::quota::{DataCategories, ItemScoping, Quota, QuotaScope, ReasonCode, Scoping};
14
/// A monotonic expiration marker for rate limits.
///
/// A [`RetryAfter`] stores the [`Instant`] at which a rate limit stops applying. It can be
/// queried for whether that instant has passed and for how much time is left until then.
#[derive(Clone, Copy, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub struct RetryAfter {
    when: Instant,
}

impl RetryAfter {
    /// Creates a [`RetryAfter`] that expires `seconds` from now.
    ///
    /// If adding the delay to the current instant overflows, the marker is pinned to the
    /// current instant instead and therefore counts as expired right away.
    #[inline]
    pub fn from_secs(seconds: u64) -> Self {
        let created = Instant::now();
        let when = match created.checked_add(Duration::from_secs(seconds)) {
            Some(deadline) => deadline,
            None => created,
        };
        Self { when }
    }

    /// Returns how much time is left until expiration, measured from `at`.
    ///
    /// Returns `None` once `at` has reached or passed the expiration instant.
    #[inline]
    pub fn remaining_at(self, at: Instant) -> Option<Duration> {
        (at < self.when).then(|| self.when - at)
    }

    /// Returns how much time is left until expiration, measured from the current instant.
    ///
    /// Returns `None` if the rate limit has already expired.
    #[inline]
    pub fn remaining(self) -> Option<Duration> {
        let now = Instant::now();
        self.remaining_at(now)
    }

    /// Returns the number of seconds left until expiration, measured from `at`.
    ///
    /// Any fractional second is rounded up to a full second so that the reported delay never
    /// undercuts the real one. Returns `0` if the rate limit has expired at `at`.
    #[inline]
    pub fn remaining_seconds_at(self, at: Instant) -> u64 {
        let Some(left) = self.remaining_at(at) else {
            return 0;
        };

        let whole_seconds = left.as_secs();
        if left.subsec_nanos() > 0 {
            // A started second counts as a full one.
            whole_seconds + 1
        } else {
            whole_seconds
        }
    }

    /// Returns the number of seconds left until expiration, measured from the current instant.
    ///
    /// Returns `0` if the rate limit has already expired.
    #[inline]
    pub fn remaining_seconds(self) -> u64 {
        let now = Instant::now();
        self.remaining_seconds_at(now)
    }

    /// Returns `true` if the rate limit is no longer active at the instant `at`.
    #[inline]
    pub fn expired_at(self, at: Instant) -> bool {
        at >= self.when
    }

    /// Returns `true` if the rate limit is no longer active at the current instant.
    #[inline]
    pub fn expired(self) -> bool {
        self.expired_at(Instant::now())
    }
}
92
93impl fmt::Debug for RetryAfter {
94    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
95        match self.remaining_seconds() {
96            0 => write!(f, "RetryAfter(expired)"),
97            remaining => write!(f, "RetryAfter({remaining}s)"),
98        }
99    }
100}
101
// Test-only serialization: emits a one-field tuple struct named `RetryAfter` that holds the
// remaining whole seconds at the time of serialization.
#[cfg(test)]
impl serde::Serialize for RetryAfter {
    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
        use serde::ser::SerializeTupleStruct;

        let mut state = serializer.serialize_tuple_struct("RetryAfter", 1)?;
        let seconds = self.remaining_seconds();
        state.serialize_field(&seconds)?;
        state.end()
    }
}
114
/// Error that occurs when parsing a [`RetryAfter`] from a string fails.
///
/// The type implements [`fmt::Display`] and [`std::error::Error`], so it can be propagated
/// with `?` into boxed or dynamic error types. The underlying parse failure is exposed
/// through [`std::error::Error::source`].
#[derive(Debug)]
pub enum InvalidRetryAfter {
    /// The supplied delay in seconds was not valid.
    InvalidDelay(std::num::ParseFloatError),
}

impl fmt::Display for InvalidRetryAfter {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            // The cause is reported via `source()`, so it is not repeated in the message.
            Self::InvalidDelay(_) => f.write_str("invalid retry-after delay"),
        }
    }
}

impl std::error::Error for InvalidRetryAfter {
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            Self::InvalidDelay(err) => Some(err),
        }
    }
}
121
122impl FromStr for RetryAfter {
123    type Err = InvalidRetryAfter;
124
125    fn from_str(s: &str) -> Result<Self, Self::Err> {
126        let float = s.parse::<f64>().map_err(InvalidRetryAfter::InvalidDelay)?;
127        let seconds = float.max(0.0).ceil() as u64;
128        Ok(RetryAfter::from_secs(seconds))
129    }
130}
131
132/// The scope that a rate limit applies to.
133///
134/// Unlike [`QuotaScope`], which only declares the class of the scope, this enum carries
135/// the specific identifiers of the individual scopes that a rate limit applies to.
136///
137/// Rate limits can be applied at different levels of granularity, from global (affecting all data)
138/// down to a specific project key.
139#[derive(Clone, Debug, Eq, PartialEq)]
140#[cfg_attr(test, derive(serde::Serialize))]
141pub enum RateLimitScope {
142    /// Global scope.
143    Global,
144    /// An organization with identifier.
145    Organization(OrganizationId),
146    /// A project with identifier.
147    Project(ProjectId),
148    /// A DSN public key.
149    Key(ProjectKey),
150}
151
152impl RateLimitScope {
153    /// Creates a rate limiting scope from the given item scoping for a specific quota.
154    ///
155    /// This extracts the appropriate scope identifier based on the quota's scope type.
156    /// For unknown scopes, it assumes the most specific scope (Key).
157    pub fn for_quota(scoping: Scoping, scope: QuotaScope) -> Self {
158        match scope {
159            QuotaScope::Global => Self::Global,
160            QuotaScope::Organization => Self::Organization(scoping.organization_id),
161            QuotaScope::Project => Self::Project(scoping.project_id),
162            QuotaScope::Key => Self::Key(scoping.project_key),
163            // For unknown scopes, assume the most specific scope:
164            QuotaScope::Unknown => Self::Key(scoping.project_key),
165        }
166    }
167
168    /// Returns the canonical name of this scope.
169    ///
170    /// This corresponds to the name of the corresponding [`QuotaScope`].
171    pub fn name(&self) -> &'static str {
172        match *self {
173            Self::Global => QuotaScope::Global.name(),
174            Self::Key(_) => QuotaScope::Key.name(),
175            Self::Project(_) => QuotaScope::Project.name(),
176            Self::Organization(_) => QuotaScope::Organization.name(),
177        }
178    }
179}
180
181/// An active rate limit that restricts data ingestion.
182///
183/// A rate limit defines restrictions for specific data categories within a particular scope.
184/// It includes an expiration time after which the limit is no longer enforced.
185///
186/// Rate limits can be created from [`Quota`]s or directly constructed with the needed parameters.
187#[derive(Clone, Debug, PartialEq)]
188#[cfg_attr(test, derive(serde::Serialize))]
189pub struct RateLimit {
190    /// A set of data categories that this quota applies to. If empty, this rate limit
191    /// applies to all data categories.
192    pub categories: DataCategories,
193
194    /// The scope of this rate limit.
195    pub scope: RateLimitScope,
196
197    /// A machine-readable reason indicating which quota caused this rate limit.
198    pub reason_code: Option<ReasonCode>,
199
200    /// A marker when this rate limit expires.
201    pub retry_after: RetryAfter,
202
203    /// The metric namespace of this rate limit.
204    ///
205    /// Only relevant for data categories of type metric bucket. If empty,
206    /// this rate limit applies to metrics of all namespaces.
207    pub namespaces: SmallVec<[MetricNamespace; 1]>,
208}
209
210impl RateLimit {
211    /// Creates a new rate limit from the given [`Quota`].
212    ///
213    /// This builds a rate limit with the appropriate scope derived from the quota and scoping
214    /// information. The categories and other properties are copied from the quota.
215    pub fn from_quota(quota: &Quota, scoping: Scoping, retry_after: RetryAfter) -> Self {
216        Self {
217            categories: quota.categories.clone(),
218            scope: RateLimitScope::for_quota(scoping, quota.scope),
219            reason_code: quota.reason_code.clone(),
220            retry_after,
221            namespaces: quota.namespace.into_iter().collect(),
222        }
223    }
224
225    /// Checks whether this rate limit applies to the given item.
226    ///
227    /// A rate limit applies if its scope matches the item's scope, and the item's
228    /// category and namespace match those of the rate limit.
229    pub fn matches(&self, scoping: ItemScoping) -> bool {
230        self.matches_scope(scoping)
231            && scoping.matches_categories(&self.categories)
232            && scoping.matches_namespaces(&self.namespaces)
233    }
234
235    /// Returns `true` if the rate limiting scope matches the given item.
236    fn matches_scope(&self, scoping: ItemScoping) -> bool {
237        match self.scope {
238            RateLimitScope::Global => true,
239            RateLimitScope::Organization(org_id) => scoping.organization_id == org_id,
240            RateLimitScope::Project(project_id) => scoping.project_id == project_id,
241            RateLimitScope::Key(key) => scoping.project_key == key,
242        }
243    }
244}
245
246/// A collection of scoped rate limits.
247///
248/// [`RateLimits`] manages a set of active rate limits that can be checked against
249/// incoming data.
250///
251/// The collection can be empty, indicated by [`is_ok`](Self::is_ok), meaning no rate limits
252/// are currently active.
253#[derive(Clone, Debug, Default)]
254#[cfg_attr(test, derive(serde::Serialize))]
255pub struct RateLimits {
256    limits: Vec<RateLimit>,
257}
258
259impl RateLimits {
260    /// Creates an empty [`RateLimits`] instance.
261    pub fn new() -> Self {
262        Self::default()
263    }
264
265    /// Adds a limit to this collection.
266    ///
267    /// If a rate limit with an overlapping scope already exists, the `retry_after` count is merged
268    /// with the existing limit. Otherwise, the new rate limit is added.
269    pub fn add(&mut self, mut limit: RateLimit) {
270        // Categories are logically a set, but not implemented as such.
271        limit.categories.sort();
272
273        let limit_opt = self.limits.iter_mut().find(|l| {
274            let RateLimit {
275                categories,
276                scope,
277                reason_code: _,
278                retry_after: _,
279                namespaces: namespace,
280            } = &limit;
281
282            *categories == l.categories && *scope == l.scope && *namespace == l.namespaces
283        });
284
285        match limit_opt {
286            None => self.limits.push(limit),
287            Some(existing) if existing.retry_after < limit.retry_after => *existing = limit,
288            Some(_) => (), // keep existing, longer limit
289        }
290    }
291
292    /// Merges all limits from another [`RateLimits`] instance into this one.
293    ///
294    /// This keeps all existing rate limits, adds new ones, and updates any existing ones
295    /// with a later expiration time. The resulting collection contains the maximum
296    /// constraints from both instances.
297    pub fn merge(&mut self, limits: Self) {
298        for limit in limits {
299            self.add(limit);
300        }
301    }
302
303    /// Merges all limits from another [`RateLimits`] with this one.
304    ///
305    /// See also: [`Self::merge`].
306    pub fn merge_with(mut self, other: Self) -> Self {
307        self.merge(other);
308        self
309    }
310
311    /// Returns `true` if this instance contains no active limits.
312    ///
313    /// This is the opposite of [`is_limited`](Self::is_limited).
314    pub fn is_ok(&self) -> bool {
315        !self.is_limited()
316    }
317
318    /// Returns `true` if this instance contains any active rate limits.
319    ///
320    /// A rate limit is considered active if it has not yet expired.
321    pub fn is_limited(&self) -> bool {
322        let now = Instant::now();
323        self.iter().any(|limit| !limit.retry_after.expired_at(now))
324    }
325
326    /// Removes expired rate limits from this instance.
327    ///
328    /// This is useful for cleaning up rate limits that are no longer relevant,
329    /// reducing memory usage and improving performance of subsequent operations.
330    pub fn clean_expired(&mut self, now: Instant) {
331        self.limits
332            .retain(|limit| !limit.retry_after.expired_at(now));
333    }
334
335    /// Checks whether any rate limits apply to the given scoping.
336    ///
337    /// Returns a new [`RateLimits`] instance containing only the rate limits that match
338    /// the provided [`ItemScoping`]. If no limits match, the returned instance will be empty
339    /// and [`is_ok`](Self::is_ok) will return `true`.
340    pub fn check(&self, scoping: ItemScoping) -> Self {
341        self.check_with_quotas(&[], scoping)
342    }
343
344    /// Checks whether any rate limits or static quotas apply to the given scoping.
345    ///
346    /// This is similar to [`check`](Self::check), but additionally checks for quotas with a static
347    /// limit of `0`, which reject items even if there is no active rate limit in this instance.
348    ///
349    /// Returns a new [`RateLimits`] instance containing the rate limits that match the provided
350    /// [`ItemScoping`]. If no limits or quotas match, the returned instance will be empty.
351    pub fn check_with_quotas<'a>(
352        &self,
353        quotas: impl IntoIterator<Item = &'a Quota>,
354        scoping: ItemScoping,
355    ) -> Self {
356        let mut applied_limits = Self::new();
357
358        for quota in quotas {
359            if quota.limit == Some(0) && quota.matches(scoping) {
360                let retry_after = RetryAfter::from_secs(REJECT_ALL_SECS);
361                applied_limits.add(RateLimit::from_quota(quota, *scoping, retry_after));
362            }
363        }
364
365        for limit in &self.limits {
366            if limit.matches(scoping) {
367                applied_limits.add(limit.clone());
368            }
369        }
370
371        applied_limits
372    }
373
374    /// Returns an iterator over all rate limits in this collection.
375    pub fn iter(&self) -> RateLimitsIter<'_> {
376        RateLimitsIter {
377            iter: self.limits.iter(),
378        }
379    }
380
381    /// Returns the rate limit with the latest expiration time.
382    ///
383    /// If multiple rate limits have the same expiration time, any of them may be returned.
384    /// If the collection is empty, returns `None`.
385    pub fn longest(&self) -> Option<&RateLimit> {
386        self.iter().max_by_key(|limit| limit.retry_after)
387    }
388
389    /// Returns `true` if there are no rate limits in this collection.
390    ///
391    /// Note that an empty collection is not the same as having no active limits.
392    /// Use [`is_ok`](Self::is_ok) to check if there are no active limits.
393    pub fn is_empty(&self) -> bool {
394        self.limits.is_empty()
395    }
396}
397
398/// An iterator over rate limit references.
399///
400/// This struct is created by the [`iter`](RateLimits::iter) method on [`RateLimits`].
401/// It yields shared references to the rate limits in the collection.
402pub struct RateLimitsIter<'a> {
403    iter: std::slice::Iter<'a, RateLimit>,
404}
405
406impl<'a> Iterator for RateLimitsIter<'a> {
407    type Item = &'a RateLimit;
408
409    fn next(&mut self) -> Option<Self::Item> {
410        self.iter.next()
411    }
412}
413
414impl IntoIterator for RateLimits {
415    type IntoIter = RateLimitsIntoIter;
416    type Item = RateLimit;
417
418    fn into_iter(self) -> Self::IntoIter {
419        RateLimitsIntoIter {
420            iter: self.limits.into_iter(),
421        }
422    }
423}
424
425/// An iterator that consumes a [`RateLimits`] collection.
426///
427/// This struct is created by the `into_iter` method on [`RateLimits`], provided by the
428/// [`IntoIterator`] trait. It yields owned rate limits by value.
429pub struct RateLimitsIntoIter {
430    iter: std::vec::IntoIter<RateLimit>,
431}
432
433impl Iterator for RateLimitsIntoIter {
434    type Item = RateLimit;
435
436    fn next(&mut self) -> Option<Self::Item> {
437        self.iter.next()
438    }
439}
440
441impl<'a> IntoIterator for &'a RateLimits {
442    type IntoIter = RateLimitsIter<'a>;
443    type Item = &'a RateLimit;
444
445    fn into_iter(self) -> Self::IntoIter {
446        self.iter()
447    }
448}
449
450/// A thread-safe cache of rate limits with automatic expiration handling.
451///
452/// [`CachedRateLimits`] wraps a [`RateLimits`] collection with a mutex to allow safe
453/// concurrent access from multiple threads. It automatically removes expired rate limits
454/// when retrieving the current limits.
455///
456/// This is useful for maintaining a shared set of rate limits across multiple
457/// processing threads or tasks.
458#[derive(Debug, Default)]
459pub struct CachedRateLimits(Mutex<Arc<RateLimits>>);
460
461impl CachedRateLimits {
462    /// Creates a new, empty instance without any rate limits.
463    pub fn new() -> Self {
464        Self::default()
465    }
466
467    /// Adds a rate limit to this collection.
468    ///
469    /// This is a thread-safe wrapper around [`RateLimits::add`].
470    pub fn add(&self, limit: RateLimit) {
471        let mut inner = self.0.lock().unwrap_or_else(PoisonError::into_inner);
472        let current = Arc::make_mut(&mut inner);
473        current.add(limit);
474    }
475
476    /// Merges rate limits from another collection into this one.
477    ///
478    /// This is a thread-safe wrapper around [`RateLimits::merge`].
479    pub fn merge(&self, limits: RateLimits) {
480        if limits.is_empty() {
481            return;
482        }
483
484        let mut inner = self.0.lock().unwrap_or_else(PoisonError::into_inner);
485        let current = Arc::make_mut(&mut inner);
486        for mut limit in limits {
487            // To spice it up, we do have some special casing here for 'inherited categories',
488            // e.g. spans and transactions.
489            //
490            // The tldr; is, as transactions are just containers for spans,
491            // we can enforce span limits on transactions but also vice versa.
492            //
493            // So this is largely an enforcement problem, but since Relay propagates
494            // rate limits to clients, we clone the limits with the inherited category.
495            // This ensures old SDKs rate limit correctly, but also it simplifies client
496            // implementations. Only Relay needs to make this decision.
497            for i in 0..limit.categories.len() {
498                let Some(category) = limit.categories.get(i) else {
499                    debug_assert!(false, "logical error");
500                    break;
501                };
502
503                for inherited in inherited_categories(category) {
504                    if !limit.categories.contains(inherited) {
505                        limit.categories.push(*inherited);
506                    }
507                }
508            }
509
510            current.add(limit);
511        }
512    }
513
514    /// Returns a reference to the current rate limits.
515    ///
516    /// This method automatically removes any expired rate limits before returning,
517    /// ensuring that only active limits are included in the result.
518    pub fn current_limits(&self) -> Arc<RateLimits> {
519        let now = Instant::now();
520        let mut inner = self.0.lock().unwrap_or_else(PoisonError::into_inner);
521        Arc::make_mut(&mut inner).clean_expired(now);
522        Arc::clone(&inner)
523    }
524}
525
526/// Returns inherited rate limit categories for the passed category.
527///
528/// When a rate limit for a category can also be enforced in a different category,
529/// then it's an inherited category.
530///
531/// For example, a transaction rate limit can also be applied to spans and vice versa.
532///
533/// For a detailed explanation on span/transaction enforcement see:
534/// <https://develop.sentry.dev/ingestion/relay/transaction-span-ratelimits/>.
535fn inherited_categories(category: &DataCategory) -> &'static [DataCategory] {
536    match category {
537        DataCategory::Transaction => &[DataCategory::Span],
538        DataCategory::Span => &[DataCategory::Transaction],
539        _ => &[],
540    }
541}
542
543#[cfg(test)]
544mod tests {
545    use smallvec::smallvec;
546
547    use super::*;
548    use crate::MetricNamespaceScoping;
549    use crate::quota::DataCategory;
550
551    #[test]
552    fn test_parse_retry_after() {
553        // positive float always rounds up to the next integer
554        let retry_after = "17.1".parse::<RetryAfter>().expect("parse RetryAfter");
555        assert_eq!(retry_after.remaining_seconds(), 18);
556        assert!(!retry_after.expired());
557        let retry_after = "17.7".parse::<RetryAfter>().expect("parse RetryAfter");
558        assert_eq!(retry_after.remaining_seconds(), 18);
559        assert!(!retry_after.expired());
560
561        // positive int
562        let retry_after = "17".parse::<RetryAfter>().expect("parse RetryAfter");
563        assert_eq!(retry_after.remaining_seconds(), 17);
564        assert!(!retry_after.expired());
565
566        // negative numbers are treated as zero
567        let retry_after = "-2".parse::<RetryAfter>().expect("parse RetryAfter");
568        assert_eq!(retry_after.remaining_seconds(), 0);
569        assert!(retry_after.expired());
570        let retry_after = "-inf".parse::<RetryAfter>().expect("parse RetryAfter");
571        assert_eq!(retry_after.remaining_seconds(), 0);
572        assert!(retry_after.expired());
573
574        // inf and NaN are valid input and treated as zero
575        let retry_after = "inf".parse::<RetryAfter>().expect("parse RetryAfter");
576        assert_eq!(retry_after.remaining_seconds(), 0);
577        assert!(retry_after.expired());
578        let retry_after = "NaN".parse::<RetryAfter>().expect("parse RetryAfter");
579        assert_eq!(retry_after.remaining_seconds(), 0);
580        assert!(retry_after.expired());
581
582        // large inputs that would overflow are treated as zero
583        let retry_after = "100000000000000000000"
584            .parse::<RetryAfter>()
585            .expect("parse RetryAfter");
586        assert_eq!(retry_after.remaining_seconds(), 0);
587        assert!(retry_after.expired());
588
589        // invalid strings cause parse error
590        "".parse::<RetryAfter>().expect_err("error RetryAfter");
591        "nope".parse::<RetryAfter>().expect_err("error RetryAfter");
592        " 2 ".parse::<RetryAfter>().expect_err("error RetryAfter");
593        "6 0".parse::<RetryAfter>().expect_err("error RetryAfter");
594    }
595
596    #[test]
597    fn test_rate_limit_matches_categories() {
598        let rate_limit = RateLimit {
599            categories: smallvec![DataCategory::Unknown, DataCategory::Error],
600            scope: RateLimitScope::Organization(OrganizationId::new(42)),
601            reason_code: None,
602            retry_after: RetryAfter::from_secs(1),
603            namespaces: smallvec![],
604        };
605
606        assert!(rate_limit.matches(ItemScoping {
607            category: DataCategory::Error,
608            scoping: Scoping {
609                organization_id: OrganizationId::new(42),
610                project_id: ProjectId::new(21),
611                project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
612                key_id: None,
613            },
614            namespace: MetricNamespaceScoping::None,
615        }));
616
617        assert!(!rate_limit.matches(ItemScoping {
618            category: DataCategory::Transaction,
619            scoping: Scoping {
620                organization_id: OrganizationId::new(42),
621                project_id: ProjectId::new(21),
622                project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
623                key_id: None,
624            },
625            namespace: MetricNamespaceScoping::None,
626        }));
627    }
628
629    #[test]
630    fn test_rate_limit_matches_organization() {
631        let rate_limit = RateLimit {
632            categories: DataCategories::new(),
633            scope: RateLimitScope::Organization(OrganizationId::new(42)),
634            reason_code: None,
635            retry_after: RetryAfter::from_secs(1),
636            namespaces: smallvec![],
637        };
638
639        assert!(rate_limit.matches(ItemScoping {
640            category: DataCategory::Error,
641            scoping: Scoping {
642                organization_id: OrganizationId::new(42),
643                project_id: ProjectId::new(21),
644                project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
645                key_id: None,
646            },
647            namespace: MetricNamespaceScoping::None,
648        }));
649
650        assert!(!rate_limit.matches(ItemScoping {
651            category: DataCategory::Error,
652            scoping: Scoping {
653                organization_id: OrganizationId::new(0),
654                project_id: ProjectId::new(21),
655                project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
656                key_id: None,
657            },
658            namespace: MetricNamespaceScoping::None,
659        }));
660    }
661
662    #[test]
663    fn test_rate_limit_matches_project() {
664        let rate_limit = RateLimit {
665            categories: DataCategories::new(),
666            scope: RateLimitScope::Project(ProjectId::new(21)),
667            reason_code: None,
668            retry_after: RetryAfter::from_secs(1),
669            namespaces: smallvec![],
670        };
671
672        assert!(rate_limit.matches(ItemScoping {
673            category: DataCategory::Error,
674            scoping: Scoping {
675                organization_id: OrganizationId::new(42),
676                project_id: ProjectId::new(21),
677                project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
678                key_id: None,
679            },
680            namespace: MetricNamespaceScoping::None,
681        }));
682
683        assert!(!rate_limit.matches(ItemScoping {
684            category: DataCategory::Error,
685            scoping: Scoping {
686                organization_id: OrganizationId::new(42),
687                project_id: ProjectId::new(0),
688                project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
689                key_id: None,
690            },
691            namespace: MetricNamespaceScoping::None,
692        }));
693    }
694
695    #[test]
696    fn test_rate_limit_matches_namespaces() {
697        let rate_limit = RateLimit {
698            categories: smallvec![],
699            scope: RateLimitScope::Organization(OrganizationId::new(42)),
700            reason_code: None,
701            retry_after: RetryAfter::from_secs(1),
702            namespaces: smallvec![MetricNamespace::Custom],
703        };
704
705        let scoping = Scoping {
706            organization_id: OrganizationId::new(42),
707            project_id: ProjectId::new(21),
708            project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
709            key_id: None,
710        };
711
712        assert!(rate_limit.matches(ItemScoping {
713            category: DataCategory::MetricBucket,
714            scoping,
715            namespace: MetricNamespaceScoping::Some(MetricNamespace::Custom),
716        }));
717
718        assert!(!rate_limit.matches(ItemScoping {
719            category: DataCategory::MetricBucket,
720            scoping,
721            namespace: MetricNamespaceScoping::Some(MetricNamespace::Spans),
722        }));
723
724        let general_rate_limit = RateLimit {
725            categories: smallvec![],
726            scope: RateLimitScope::Organization(OrganizationId::new(42)),
727            reason_code: None,
728            retry_after: RetryAfter::from_secs(1),
729            namespaces: smallvec![], // all namespaces
730        };
731
732        assert!(general_rate_limit.matches(ItemScoping {
733            category: DataCategory::MetricBucket,
734            scoping,
735            namespace: MetricNamespaceScoping::Some(MetricNamespace::Spans),
736        }));
737
738        assert!(general_rate_limit.matches(ItemScoping {
739            category: DataCategory::MetricBucket,
740            scoping,
741            namespace: MetricNamespaceScoping::None,
742        }));
743    }
744
745    #[test]
746    fn test_rate_limit_matches_key() {
747        let rate_limit = RateLimit {
748            categories: DataCategories::new(),
749            scope: RateLimitScope::Key(
750                ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
751            ),
752            reason_code: None,
753            retry_after: RetryAfter::from_secs(1),
754            namespaces: smallvec![],
755        };
756
757        assert!(rate_limit.matches(ItemScoping {
758            category: DataCategory::Error,
759            scoping: Scoping {
760                organization_id: OrganizationId::new(42),
761                project_id: ProjectId::new(21),
762                project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
763                key_id: None,
764            },
765            namespace: MetricNamespaceScoping::None,
766        }));
767
768        assert!(!rate_limit.matches(ItemScoping {
769            category: DataCategory::Error,
770            scoping: Scoping {
771                organization_id: OrganizationId::new(0),
772                project_id: ProjectId::new(21),
773                project_key: ProjectKey::parse("deadbeefdeadbeefdeadbeefdeadbeef").unwrap(),
774                key_id: None,
775            },
776            namespace: MetricNamespaceScoping::None,
777        }));
778    }
779
780    #[test]
781    fn test_rate_limits_add_replacement() {
782        let mut rate_limits = RateLimits::new();
783
784        rate_limits.add(RateLimit {
785            categories: smallvec![DataCategory::Default, DataCategory::Error],
786            scope: RateLimitScope::Organization(OrganizationId::new(42)),
787            reason_code: Some(ReasonCode::new("first")),
788            retry_after: RetryAfter::from_secs(1),
789            namespaces: smallvec![],
790        });
791
792        // longer rate limit shadows shorter one
793        rate_limits.add(RateLimit {
794            categories: smallvec![DataCategory::Error, DataCategory::Default],
795            scope: RateLimitScope::Organization(OrganizationId::new(42)),
796            reason_code: Some(ReasonCode::new("second")),
797            retry_after: RetryAfter::from_secs(10),
798            namespaces: smallvec![],
799        });
800
801        insta::assert_ron_snapshot!(rate_limits, @r###"
802        RateLimits(
803          limits: [
804            RateLimit(
805              categories: [
806                default,
807                error,
808              ],
809              scope: Organization(OrganizationId(42)),
810              reason_code: Some(ReasonCode("second")),
811              retry_after: RetryAfter(10),
812              namespaces: [],
813            ),
814          ],
815        )
816        "###);
817    }
818
819    #[test]
820    fn test_rate_limits_add_shadowing() {
821        let mut rate_limits = RateLimits::new();
822
823        rate_limits.add(RateLimit {
824            categories: smallvec![DataCategory::Default, DataCategory::Error],
825            scope: RateLimitScope::Organization(OrganizationId::new(42)),
826            reason_code: Some(ReasonCode::new("first")),
827            retry_after: RetryAfter::from_secs(10),
828            namespaces: smallvec![],
829        });
830
831        // shorter rate limit is shadowed by existing one
832        rate_limits.add(RateLimit {
833            categories: smallvec![DataCategory::Error, DataCategory::Default],
834            scope: RateLimitScope::Organization(OrganizationId::new(42)),
835            reason_code: Some(ReasonCode::new("second")),
836            retry_after: RetryAfter::from_secs(1),
837            namespaces: smallvec![],
838        });
839
840        insta::assert_ron_snapshot!(rate_limits, @r###"
841        RateLimits(
842          limits: [
843            RateLimit(
844              categories: [
845                default,
846                error,
847              ],
848              scope: Organization(OrganizationId(42)),
849              reason_code: Some(ReasonCode("first")),
850              retry_after: RetryAfter(10),
851              namespaces: [],
852            ),
853          ],
854        )
855        "###);
856    }
857
858    #[test]
859    fn test_rate_limits_add_buckets() {
860        let mut rate_limits = RateLimits::new();
861
862        rate_limits.add(RateLimit {
863            categories: smallvec![DataCategory::Error],
864            scope: RateLimitScope::Organization(OrganizationId::new(42)),
865            reason_code: None,
866            retry_after: RetryAfter::from_secs(1),
867            namespaces: smallvec![],
868        });
869
870        // Same scope but different categories
871        rate_limits.add(RateLimit {
872            categories: smallvec![DataCategory::Transaction],
873            scope: RateLimitScope::Organization(OrganizationId::new(42)),
874            reason_code: None,
875            retry_after: RetryAfter::from_secs(1),
876            namespaces: smallvec![],
877        });
878
879        // Same categories but different scope
880        rate_limits.add(RateLimit {
881            categories: smallvec![DataCategory::Error],
882            scope: RateLimitScope::Project(ProjectId::new(21)),
883            reason_code: None,
884            retry_after: RetryAfter::from_secs(1),
885            namespaces: smallvec![],
886        });
887
888        insta::assert_ron_snapshot!(rate_limits, @r###"
889        RateLimits(
890          limits: [
891            RateLimit(
892              categories: [
893                error,
894              ],
895              scope: Organization(OrganizationId(42)),
896              reason_code: None,
897              retry_after: RetryAfter(1),
898              namespaces: [],
899            ),
900            RateLimit(
901              categories: [
902                transaction,
903              ],
904              scope: Organization(OrganizationId(42)),
905              reason_code: None,
906              retry_after: RetryAfter(1),
907              namespaces: [],
908            ),
909            RateLimit(
910              categories: [
911                error,
912              ],
913              scope: Project(ProjectId(21)),
914              reason_code: None,
915              retry_after: RetryAfter(1),
916              namespaces: [],
917            ),
918          ],
919        )
920        "###);
921    }
922
923    /// Regression test that ensures namespaces are correctly added to rate limits.
924    #[test]
925    fn test_rate_limits_add_namespaces() {
926        let mut rate_limits = RateLimits::new();
927
928        rate_limits.add(RateLimit {
929            categories: smallvec![DataCategory::MetricBucket],
930            scope: RateLimitScope::Organization(OrganizationId::new(42)),
931            reason_code: None,
932            retry_after: RetryAfter::from_secs(1),
933            namespaces: smallvec![MetricNamespace::Custom],
934        });
935
936        // Same category but different namespaces
937        rate_limits.add(RateLimit {
938            categories: smallvec![DataCategory::MetricBucket],
939            scope: RateLimitScope::Organization(OrganizationId::new(42)),
940            reason_code: None,
941            retry_after: RetryAfter::from_secs(1),
942            namespaces: smallvec![MetricNamespace::Spans],
943        });
944
945        insta::assert_ron_snapshot!(rate_limits, @r###"
946        RateLimits(
947          limits: [
948            RateLimit(
949              categories: [
950                metric_bucket,
951              ],
952              scope: Organization(OrganizationId(42)),
953              reason_code: None,
954              retry_after: RetryAfter(1),
955              namespaces: [
956                "custom",
957              ],
958            ),
959            RateLimit(
960              categories: [
961                metric_bucket,
962              ],
963              scope: Organization(OrganizationId(42)),
964              reason_code: None,
965              retry_after: RetryAfter(1),
966              namespaces: [
967                "spans",
968              ],
969            ),
970          ],
971        )
972        "###);
973    }
974
975    #[test]
976    fn test_rate_limits_longest() {
977        let mut rate_limits = RateLimits::new();
978
979        rate_limits.add(RateLimit {
980            categories: smallvec![DataCategory::Error],
981            scope: RateLimitScope::Organization(OrganizationId::new(42)),
982            reason_code: Some(ReasonCode::new("first")),
983            retry_after: RetryAfter::from_secs(1),
984            namespaces: smallvec![],
985        });
986
987        // Distinct scope to prevent deduplication
988        rate_limits.add(RateLimit {
989            categories: smallvec![DataCategory::Transaction],
990            scope: RateLimitScope::Organization(OrganizationId::new(42)),
991            reason_code: Some(ReasonCode::new("second")),
992            retry_after: RetryAfter::from_secs(10),
993            namespaces: smallvec![],
994        });
995
996        let rate_limit = rate_limits.longest().unwrap();
997        insta::assert_ron_snapshot!(rate_limit, @r###"
998        RateLimit(
999          categories: [
1000            transaction,
1001          ],
1002          scope: Organization(OrganizationId(42)),
1003          reason_code: Some(ReasonCode("second")),
1004          retry_after: RetryAfter(10),
1005          namespaces: [],
1006        )
1007        "###);
1008    }
1009
1010    #[test]
1011    fn test_rate_limits_clean_expired() {
1012        let mut rate_limits = RateLimits::new();
1013
1014        // Active error limit
1015        rate_limits.add(RateLimit {
1016            categories: smallvec![DataCategory::Error],
1017            scope: RateLimitScope::Organization(OrganizationId::new(42)),
1018            reason_code: None,
1019            retry_after: RetryAfter::from_secs(1),
1020            namespaces: smallvec![],
1021        });
1022
1023        // Inactive error limit with distinct scope
1024        rate_limits.add(RateLimit {
1025            categories: smallvec![DataCategory::Error],
1026            scope: RateLimitScope::Project(ProjectId::new(21)),
1027            reason_code: None,
1028            retry_after: RetryAfter::from_secs(0),
1029            namespaces: smallvec![],
1030        });
1031
1032        // Sanity check before running `clean_expired`
1033        assert_eq!(rate_limits.iter().count(), 2);
1034
1035        rate_limits.clean_expired(Instant::now());
1036
1037        // Check that the expired limit has been removed
1038        insta::assert_ron_snapshot!(rate_limits, @r###"
1039        RateLimits(
1040          limits: [
1041            RateLimit(
1042              categories: [
1043                error,
1044              ],
1045              scope: Organization(OrganizationId(42)),
1046              reason_code: None,
1047              retry_after: RetryAfter(1),
1048              namespaces: [],
1049            ),
1050          ],
1051        )
1052        "###);
1053    }
1054
1055    #[test]
1056    fn test_rate_limits_check() {
1057        let mut rate_limits = RateLimits::new();
1058
1059        // Active error limit
1060        rate_limits.add(RateLimit {
1061            categories: smallvec![DataCategory::Error],
1062            scope: RateLimitScope::Organization(OrganizationId::new(42)),
1063            reason_code: None,
1064            retry_after: RetryAfter::from_secs(1),
1065            namespaces: smallvec![],
1066        });
1067
1068        // Active transaction limit
1069        rate_limits.add(RateLimit {
1070            categories: smallvec![DataCategory::Transaction],
1071            scope: RateLimitScope::Organization(OrganizationId::new(42)),
1072            reason_code: None,
1073            retry_after: RetryAfter::from_secs(1),
1074            namespaces: smallvec![],
1075        });
1076
1077        let applied_limits = rate_limits.check(ItemScoping {
1078            category: DataCategory::Error,
1079            scoping: Scoping {
1080                organization_id: OrganizationId::new(42),
1081                project_id: ProjectId::new(21),
1082                project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
1083                key_id: None,
1084            },
1085            namespace: MetricNamespaceScoping::None,
1086        });
1087
1088        // Check that the error limit is applied
1089        insta::assert_ron_snapshot!(applied_limits, @r###"
1090        RateLimits(
1091          limits: [
1092            RateLimit(
1093              categories: [
1094                error,
1095              ],
1096              scope: Organization(OrganizationId(42)),
1097              reason_code: None,
1098              retry_after: RetryAfter(1),
1099              namespaces: [],
1100            ),
1101          ],
1102        )
1103        "###);
1104    }
1105
1106    #[test]
1107    fn test_rate_limits_check_quotas() {
1108        let mut rate_limits = RateLimits::new();
1109
1110        // Active error limit
1111        rate_limits.add(RateLimit {
1112            categories: smallvec![DataCategory::Error],
1113            scope: RateLimitScope::Organization(OrganizationId::new(42)),
1114            reason_code: None,
1115            retry_after: RetryAfter::from_secs(1),
1116            namespaces: smallvec![],
1117        });
1118
1119        // Active transaction limit
1120        rate_limits.add(RateLimit {
1121            categories: smallvec![DataCategory::Transaction],
1122            scope: RateLimitScope::Organization(OrganizationId::new(42)),
1123            reason_code: None,
1124            retry_after: RetryAfter::from_secs(1),
1125            namespaces: smallvec![],
1126        });
1127
1128        let item_scoping = ItemScoping {
1129            category: DataCategory::Error,
1130            scoping: Scoping {
1131                organization_id: OrganizationId::new(42),
1132                project_id: ProjectId::new(21),
1133                project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
1134                key_id: None,
1135            },
1136            namespace: MetricNamespaceScoping::None,
1137        };
1138
1139        let quotas = &[Quota {
1140            id: None,
1141            categories: smallvec![DataCategory::Error],
1142            scope: QuotaScope::Organization,
1143            scope_id: Some("42".to_owned()),
1144            limit: Some(0),
1145            window: None,
1146            reason_code: Some(ReasonCode::new("zero")),
1147            namespace: None,
1148        }];
1149
1150        let applied_limits = rate_limits.check_with_quotas(quotas, item_scoping);
1151
1152        insta::assert_ron_snapshot!(applied_limits, @r###"
1153        RateLimits(
1154          limits: [
1155            RateLimit(
1156              categories: [
1157                error,
1158              ],
1159              scope: Organization(OrganizationId(42)),
1160              reason_code: Some(ReasonCode("zero")),
1161              retry_after: RetryAfter(60),
1162              namespaces: [],
1163            ),
1164          ],
1165        )
1166        "###);
1167    }
1168
1169    #[test]
1170    fn test_rate_limits_merge() {
1171        let mut rate_limits1 = RateLimits::new();
1172        let mut rate_limits2 = RateLimits::new();
1173
1174        rate_limits1.add(RateLimit {
1175            categories: smallvec![DataCategory::Error],
1176            scope: RateLimitScope::Organization(OrganizationId::new(42)),
1177            reason_code: Some(ReasonCode::new("first")),
1178            retry_after: RetryAfter::from_secs(1),
1179            namespaces: smallvec![],
1180        });
1181
1182        rate_limits1.add(RateLimit {
1183            categories: smallvec![DataCategory::TransactionIndexed],
1184            scope: RateLimitScope::Organization(OrganizationId::new(42)),
1185            reason_code: None,
1186            retry_after: RetryAfter::from_secs(1),
1187            namespaces: smallvec![],
1188        });
1189
1190        rate_limits2.add(RateLimit {
1191            categories: smallvec![DataCategory::Error],
1192            scope: RateLimitScope::Organization(OrganizationId::new(42)),
1193            reason_code: Some(ReasonCode::new("second")),
1194            retry_after: RetryAfter::from_secs(10),
1195            namespaces: smallvec![],
1196        });
1197
1198        rate_limits1.merge(rate_limits2);
1199
1200        insta::assert_ron_snapshot!(rate_limits1, @r###"
1201        RateLimits(
1202          limits: [
1203            RateLimit(
1204              categories: [
1205                error,
1206              ],
1207              scope: Organization(OrganizationId(42)),
1208              reason_code: Some(ReasonCode("second")),
1209              retry_after: RetryAfter(10),
1210              namespaces: [],
1211            ),
1212            RateLimit(
1213              categories: [
1214                transaction_indexed,
1215              ],
1216              scope: Organization(OrganizationId(42)),
1217              reason_code: None,
1218              retry_after: RetryAfter(1),
1219              namespaces: [],
1220            ),
1221          ],
1222        )
1223        "###);
1224    }
1225
1226    #[test]
1227    fn test_cached_rate_limits_expired() {
1228        let cached = CachedRateLimits::new();
1229
1230        // Active error limit
1231        cached.add(RateLimit {
1232            categories: smallvec![DataCategory::Error],
1233            scope: RateLimitScope::Organization(OrganizationId::new(42)),
1234            reason_code: None,
1235            retry_after: RetryAfter::from_secs(1),
1236            namespaces: smallvec![],
1237        });
1238
1239        // Inactive error limit with distinct scope
1240        cached.add(RateLimit {
1241            categories: smallvec![DataCategory::Error],
1242            scope: RateLimitScope::Project(ProjectId::new(21)),
1243            reason_code: None,
1244            retry_after: RetryAfter::from_secs(0),
1245            namespaces: smallvec![],
1246        });
1247
1248        let rate_limits = cached.current_limits();
1249
1250        insta::assert_ron_snapshot!(rate_limits, @r###"
1251        RateLimits(
1252          limits: [
1253            RateLimit(
1254              categories: [
1255                error,
1256              ],
1257              scope: Organization(OrganizationId(42)),
1258              reason_code: None,
1259              retry_after: RetryAfter(1),
1260              namespaces: [],
1261            ),
1262          ],
1263        )
1264        "###);
1265    }
1266}