relay_quotas/
rate_limit.rs

1use std::fmt;
2use std::str::FromStr;
3use std::sync::{Arc, Mutex, PoisonError};
4use std::time::{Duration, Instant};
5
6use relay_base_schema::data_category::DataCategory;
7use relay_base_schema::metrics::MetricNamespace;
8use relay_base_schema::organization::OrganizationId;
9use relay_base_schema::project::{ProjectId, ProjectKey};
10use smallvec::SmallVec;
11
12use crate::REJECT_ALL_SECS;
13use crate::quota::{DataCategories, ItemScoping, Quota, QuotaScope, ReasonCode, Scoping};
14
/// A monotonic expiration marker for rate limits.
///
/// A [`RetryAfter`] stores the [`Instant`] at which a rate limit stops being enforced. It can
/// report whether that instant has been reached and how much time is left until then.
#[derive(Clone, Copy, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub struct RetryAfter {
    /// The instant at which the rate limit expires.
    when: Instant,
}

impl RetryAfter {
    /// Creates a new [`RetryAfter`] that expires `seconds` from now.
    ///
    /// If adding `seconds` to the current instant overflows, the marker falls back to the
    /// current instant, which means it is expired right away.
    #[inline]
    pub fn from_secs(seconds: u64) -> Self {
        let created = Instant::now();
        match created.checked_add(Duration::from_secs(seconds)) {
            Some(when) => Self { when },
            None => Self { when: created },
        }
    }

    /// Returns the time left until expiration, measured from the instant `at`.
    ///
    /// Returns `None` if the rate limit has already expired at `at`, which includes the case
    /// where `at` is exactly the expiration instant.
    #[inline]
    pub fn remaining_at(self, at: Instant) -> Option<Duration> {
        (at < self.when).then(|| self.when - at)
    }

    /// Returns the time left until expiration, measured from the current instant.
    ///
    /// Returns `None` if the rate limit has already expired.
    #[inline]
    pub fn remaining(self) -> Option<Duration> {
        let now = Instant::now();
        self.remaining_at(now)
    }

    /// Returns the number of seconds left until expiration, measured from the instant `at`.
    ///
    /// Partial seconds are rounded up to the next full second so that rate limits are strictly
    /// enforced. Returns `0` if the rate limit has already expired.
    #[inline]
    pub fn remaining_seconds_at(self, at: Instant) -> u64 {
        let Some(left) = self.remaining_at(at) else {
            return 0;
        };

        let whole_seconds = left.as_secs();
        if left.subsec_nanos() > 0 {
            // Account for the truncated sub-second part by counting one more second.
            whole_seconds + 1
        } else {
            whole_seconds
        }
    }

    /// Returns the number of seconds left until expiration, measured from the current instant.
    ///
    /// Returns `0` if the rate limit has already expired.
    #[inline]
    pub fn remaining_seconds(self) -> u64 {
        let now = Instant::now();
        self.remaining_seconds_at(now)
    }

    /// Returns `true` if this rate limit has expired at the instant `at`.
    #[inline]
    pub fn expired_at(self, at: Instant) -> bool {
        at >= self.when
    }

    /// Returns `true` if this rate limit has expired at the current instant.
    #[inline]
    pub fn expired(self) -> bool {
        self.expired_at(Instant::now())
    }
}
92
93impl fmt::Debug for RetryAfter {
94    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
95        match self.remaining_seconds() {
96            0 => write!(f, "RetryAfter(expired)"),
97            remaining => write!(f, "RetryAfter({remaining}s)"),
98        }
99    }
100}
101
/// Test-only serialization that emits the remaining seconds as a one-element tuple struct.
#[cfg(test)]
impl serde::Serialize for RetryAfter {
    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
        use serde::ser::SerializeTupleStruct;

        let remaining = self.remaining_seconds();
        let mut state = serializer.serialize_tuple_struct("RetryAfter", 1)?;
        state.serialize_field(&remaining)?;
        state.end()
    }
}
114
/// Error that occurs when parsing a [`RetryAfter`] from a string fails.
///
/// Implements [`std::error::Error`] so it can be propagated with `?` into boxed or
/// type-erased errors; the underlying float parse error is exposed through `source()`.
#[derive(Debug)]
pub enum InvalidRetryAfter {
    /// The supplied delay in seconds was not valid.
    InvalidDelay(std::num::ParseFloatError),
}

impl fmt::Display for InvalidRetryAfter {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            // The cause is reported via `source()`, so it is not repeated in the message.
            Self::InvalidDelay(_) => f.write_str("invalid retry-after delay"),
        }
    }
}

impl std::error::Error for InvalidRetryAfter {
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            Self::InvalidDelay(source) => Some(source),
        }
    }
}
121
122impl FromStr for RetryAfter {
123    type Err = InvalidRetryAfter;
124
125    fn from_str(s: &str) -> Result<Self, Self::Err> {
126        let float = s.parse::<f64>().map_err(InvalidRetryAfter::InvalidDelay)?;
127        let seconds = float.max(0.0).ceil() as u64;
128        Ok(RetryAfter::from_secs(seconds))
129    }
130}
131
132/// The scope that a rate limit applies to.
133///
134/// Unlike [`QuotaScope`], which only declares the class of the scope, this enum carries
135/// the specific identifiers of the individual scopes that a rate limit applies to.
136///
137/// Rate limits can be applied at different levels of granularity, from global (affecting all data)
138/// down to a specific project key.
139#[derive(Clone, Debug, Eq, PartialEq)]
140#[cfg_attr(test, derive(serde::Serialize))]
141pub enum RateLimitScope {
142    /// Global scope.
143    Global,
144    /// An organization with identifier.
145    Organization(OrganizationId),
146    /// A project with identifier.
147    Project(ProjectId),
148    /// A DSN public key.
149    Key(ProjectKey),
150}
151
152impl RateLimitScope {
153    /// Creates a rate limiting scope from the given item scoping for a specific quota.
154    ///
155    /// This extracts the appropriate scope identifier based on the quota's scope type.
156    /// For unknown scopes, it assumes the most specific scope (Key).
157    pub fn for_quota(scoping: Scoping, scope: QuotaScope) -> Self {
158        match scope {
159            QuotaScope::Global => Self::Global,
160            QuotaScope::Organization => Self::Organization(scoping.organization_id),
161            QuotaScope::Project => Self::Project(scoping.project_id),
162            QuotaScope::Key => Self::Key(scoping.project_key),
163            // For unknown scopes, assume the most specific scope:
164            QuotaScope::Unknown => Self::Key(scoping.project_key),
165        }
166    }
167
168    /// Returns the canonical name of this scope.
169    ///
170    /// This corresponds to the name of the corresponding [`QuotaScope`].
171    pub fn name(&self) -> &'static str {
172        match *self {
173            Self::Global => QuotaScope::Global.name(),
174            Self::Key(_) => QuotaScope::Key.name(),
175            Self::Project(_) => QuotaScope::Project.name(),
176            Self::Organization(_) => QuotaScope::Organization.name(),
177        }
178    }
179}
180
181/// An active rate limit that restricts data ingestion.
182///
183/// A rate limit defines restrictions for specific data categories within a particular scope.
184/// It includes an expiration time after which the limit is no longer enforced.
185///
186/// Rate limits can be created from [`Quota`]s or directly constructed with the needed parameters.
187#[derive(Clone, Debug, PartialEq)]
188#[cfg_attr(test, derive(serde::Serialize))]
189pub struct RateLimit {
190    /// A set of data categories that this quota applies to. If empty, this rate limit
191    /// applies to all data categories.
192    pub categories: DataCategories,
193
194    /// The scope of this rate limit.
195    pub scope: RateLimitScope,
196
197    /// A machine-readable reason indicating which quota caused this rate limit.
198    pub reason_code: Option<ReasonCode>,
199
200    /// A marker when this rate limit expires.
201    pub retry_after: RetryAfter,
202
203    /// The metric namespace of this rate limit.
204    ///
205    /// Only relevant for data categories of type metric bucket. If empty,
206    /// this rate limit applies to metrics of all namespaces.
207    pub namespaces: SmallVec<[MetricNamespace; 1]>,
208}
209
210impl RateLimit {
211    /// Creates a new rate limit from the given [`Quota`].
212    ///
213    /// This builds a rate limit with the appropriate scope derived from the quota and scoping
214    /// information. The categories and other properties are copied from the quota.
215    pub fn from_quota(quota: &Quota, scoping: Scoping, retry_after: RetryAfter) -> Self {
216        Self {
217            categories: quota.categories.clone(),
218            scope: RateLimitScope::for_quota(scoping, quota.scope),
219            reason_code: quota.reason_code.clone(),
220            retry_after,
221            namespaces: quota.namespace.into_iter().collect(),
222        }
223    }
224
225    /// Checks whether this rate limit applies to the given item.
226    ///
227    /// A rate limit applies if its scope matches the item's scope, and the item's
228    /// category and namespace match those of the rate limit.
229    pub fn matches(&self, scoping: ItemScoping) -> bool {
230        self.matches_scope(scoping)
231            && scoping.matches_categories(&self.categories)
232            && scoping.matches_namespaces(&self.namespaces)
233    }
234
235    /// Returns `true` if the rate limiting scope matches the given item.
236    fn matches_scope(&self, scoping: ItemScoping) -> bool {
237        match self.scope {
238            RateLimitScope::Global => true,
239            RateLimitScope::Organization(org_id) => scoping.organization_id == org_id,
240            RateLimitScope::Project(project_id) => scoping.project_id == project_id,
241            RateLimitScope::Key(key) => scoping.project_key == key,
242        }
243    }
244}
245
246/// A collection of scoped rate limits.
247///
248/// [`RateLimits`] manages a set of active rate limits that can be checked against
249/// incoming data.
250///
251/// The collection can be empty, indicated by [`is_ok`](Self::is_ok), meaning no rate limits
252/// are currently active.
253#[derive(Clone, Debug, Default)]
254#[cfg_attr(test, derive(serde::Serialize))]
255pub struct RateLimits {
256    limits: Vec<RateLimit>,
257}
258
259impl RateLimits {
260    /// Creates an empty [`RateLimits`] instance.
261    pub fn new() -> Self {
262        Self::default()
263    }
264
265    /// Adds a limit to this collection.
266    ///
267    /// If a rate limit with an overlapping scope already exists, the `retry_after` count is merged
268    /// with the existing limit. Otherwise, the new rate limit is added.
269    pub fn add(&mut self, limit: RateLimit) {
270        let limit_opt = self.limits.iter_mut().find(|l| {
271            let RateLimit {
272                categories,
273                scope,
274                reason_code: _,
275                retry_after: _,
276                namespaces: namespace,
277            } = &limit;
278
279            *categories == l.categories && *scope == l.scope && *namespace == l.namespaces
280        });
281
282        match limit_opt {
283            None => self.limits.push(limit),
284            Some(existing) if existing.retry_after < limit.retry_after => *existing = limit,
285            Some(_) => (), // keep existing, longer limit
286        }
287    }
288
289    /// Merges all limits from another [`RateLimits`] instance into this one.
290    ///
291    /// This keeps all existing rate limits, adds new ones, and updates any existing ones
292    /// with a later expiration time. The resulting collection contains the maximum
293    /// constraints from both instances.
294    pub fn merge(&mut self, limits: Self) {
295        for limit in limits {
296            self.add(limit);
297        }
298    }
299
300    /// Merges all limits from another [`RateLimits`] with this one.
301    ///
302    /// See also: [`Self::merge`].
303    pub fn merge_with(mut self, other: Self) -> Self {
304        self.merge(other);
305        self
306    }
307
308    /// Returns `true` if this instance contains no active limits.
309    ///
310    /// This is the opposite of [`is_limited`](Self::is_limited).
311    pub fn is_ok(&self) -> bool {
312        !self.is_limited()
313    }
314
315    /// Returns `true` if this instance contains any active rate limits.
316    ///
317    /// A rate limit is considered active if it has not yet expired.
318    pub fn is_limited(&self) -> bool {
319        let now = Instant::now();
320        self.iter().any(|limit| !limit.retry_after.expired_at(now))
321    }
322
323    /// Returns `true` if this is instance contains any active rate limits
324    /// for the specified categories.
325    ///
326    /// A rate limit is considered active if it has not yet expired.
327    pub fn is_any_limited(&self, scopings: &[ItemScoping]) -> bool {
328        self.is_any_limited_with_quotas(&[], scopings)
329    }
330
331    /// Returns `true` if this is instance contains any active rate limits
332    /// for the specified categories.
333    ///
334    /// This is similar to [`Self::is_limited`], but additionally checks for quotas with a static
335    /// limit of `0`, which reject items even if there is no active rate limit in this instance.
336    ///
337    /// A rate limit is considered active if it has not yet expired.
338    pub fn is_any_limited_with_quotas<'a>(
339        &self,
340        quotas: impl IntoIterator<Item = &'a Quota>,
341        scopings: &[ItemScoping],
342    ) -> bool {
343        for quota in quotas {
344            for scoping in scopings {
345                if quota.limit == Some(0) && quota.matches(*scoping) {
346                    return true;
347                }
348            }
349        }
350
351        let now = Instant::now();
352        for scoping in scopings {
353            for limit in &self.limits {
354                if limit.matches(*scoping) && !limit.retry_after.expired_at(now) {
355                    return true;
356                }
357            }
358        }
359
360        false
361    }
362
363    /// Removes expired rate limits from this instance.
364    ///
365    /// This is useful for cleaning up rate limits that are no longer relevant,
366    /// reducing memory usage and improving performance of subsequent operations.
367    pub fn clean_expired(&mut self, now: Instant) {
368        self.limits
369            .retain(|limit| !limit.retry_after.expired_at(now));
370    }
371
372    /// Checks whether any rate limits apply to the given scoping.
373    ///
374    /// Returns a new [`RateLimits`] instance containing only the rate limits that match
375    /// the provided [`ItemScoping`]. If no limits match, the returned instance will be empty
376    /// and [`is_ok`](Self::is_ok) will return `true`.
377    pub fn check(&self, scoping: ItemScoping) -> Self {
378        self.check_with_quotas(&[], scoping)
379    }
380
381    /// Checks whether any rate limits or static quotas apply to the given scoping.
382    ///
383    /// This is similar to [`check`](Self::check), but additionally checks for quotas with a static
384    /// limit of `0`, which reject items even if there is no active rate limit in this instance.
385    ///
386    /// Returns a new [`RateLimits`] instance containing the rate limits that match the provided
387    /// [`ItemScoping`]. If no limits or quotas match, the returned instance will be empty.
388    pub fn check_with_quotas<'a>(
389        &self,
390        quotas: impl IntoIterator<Item = &'a Quota>,
391        scoping: ItemScoping,
392    ) -> Self {
393        let mut applied_limits = Self::new();
394
395        for quota in quotas {
396            if quota.limit == Some(0) && quota.matches(scoping) {
397                let retry_after = RetryAfter::from_secs(REJECT_ALL_SECS);
398                applied_limits.add(RateLimit::from_quota(quota, *scoping, retry_after));
399            }
400        }
401
402        for limit in &self.limits {
403            if limit.matches(scoping) {
404                applied_limits.add(limit.clone());
405            }
406        }
407
408        applied_limits
409    }
410
411    /// Returns an iterator over all rate limits in this collection.
412    pub fn iter(&self) -> RateLimitsIter<'_> {
413        RateLimitsIter {
414            iter: self.limits.iter(),
415        }
416    }
417
418    /// Returns the rate limit with the latest expiration time.
419    ///
420    /// If multiple rate limits have the same expiration time, any of them may be returned.
421    /// If the collection is empty, returns `None`.
422    pub fn longest(&self) -> Option<&RateLimit> {
423        self.iter().max_by_key(|limit| limit.retry_after)
424    }
425
426    /// Returns `true` if there are no rate limits in this collection.
427    ///
428    /// Note that an empty collection is not the same as having no active limits.
429    /// Use [`is_ok`](Self::is_ok) to check if there are no active limits.
430    pub fn is_empty(&self) -> bool {
431        self.limits.is_empty()
432    }
433}
434
435/// An iterator over rate limit references.
436///
437/// This struct is created by the [`iter`](RateLimits::iter) method on [`RateLimits`].
438/// It yields shared references to the rate limits in the collection.
439pub struct RateLimitsIter<'a> {
440    iter: std::slice::Iter<'a, RateLimit>,
441}
442
443impl<'a> Iterator for RateLimitsIter<'a> {
444    type Item = &'a RateLimit;
445
446    fn next(&mut self) -> Option<Self::Item> {
447        self.iter.next()
448    }
449}
450
451impl IntoIterator for RateLimits {
452    type IntoIter = RateLimitsIntoIter;
453    type Item = RateLimit;
454
455    fn into_iter(self) -> Self::IntoIter {
456        RateLimitsIntoIter {
457            iter: self.limits.into_iter(),
458        }
459    }
460}
461
462/// An iterator that consumes a [`RateLimits`] collection.
463///
464/// This struct is created by the `into_iter` method on [`RateLimits`], provided by the
465/// [`IntoIterator`] trait. It yields owned rate limits by value.
466pub struct RateLimitsIntoIter {
467    iter: std::vec::IntoIter<RateLimit>,
468}
469
470impl Iterator for RateLimitsIntoIter {
471    type Item = RateLimit;
472
473    fn next(&mut self) -> Option<Self::Item> {
474        self.iter.next()
475    }
476}
477
478impl<'a> IntoIterator for &'a RateLimits {
479    type IntoIter = RateLimitsIter<'a>;
480    type Item = &'a RateLimit;
481
482    fn into_iter(self) -> Self::IntoIter {
483        self.iter()
484    }
485}
486
487/// A thread-safe cache of rate limits with automatic expiration handling.
488///
489/// [`CachedRateLimits`] wraps a [`RateLimits`] collection with a mutex to allow safe
490/// concurrent access from multiple threads. It automatically removes expired rate limits
491/// when retrieving the current limits.
492///
493/// This is useful for maintaining a shared set of rate limits across multiple
494/// processing threads or tasks.
495#[derive(Debug, Default)]
496pub struct CachedRateLimits(Mutex<Arc<RateLimits>>);
497
498impl CachedRateLimits {
499    /// Creates a new, empty instance without any rate limits.
500    pub fn new() -> Self {
501        Self::default()
502    }
503
504    /// Adds a rate limit to this collection.
505    ///
506    /// This is a thread-safe wrapper around [`RateLimits::add`].
507    pub fn add(&self, limit: RateLimit) {
508        let mut inner = self.0.lock().unwrap_or_else(PoisonError::into_inner);
509        let current = Arc::make_mut(&mut inner);
510        current.add(limit);
511    }
512
513    /// Merges rate limits from another collection into this one.
514    ///
515    /// This is a thread-safe wrapper around [`RateLimits::merge`].
516    pub fn merge(&self, limits: RateLimits) {
517        if limits.is_empty() {
518            return;
519        }
520
521        let mut inner = self.0.lock().unwrap_or_else(PoisonError::into_inner);
522        let current = Arc::make_mut(&mut inner);
523        for mut limit in limits {
524            // To spice it up, we do have some special casing here for 'inherited categories',
525            // e.g. spans and transactions.
526            //
527            // The tldr; is, as transactions are just containers for spans,
528            // we can enforce span limits on transactions but also vice versa.
529            //
530            // So this is largely an enforcement problem, but since Relay propagates
531            // rate limits to clients, we clone the limits with the inherited category.
532            // This ensures old SDKs rate limit correctly, but also it simplifies client
533            // implementations. Only Relay needs to make this decision.
534            for i in 0..limit.categories.len() {
535                let Some(category) = limit.categories.get(i) else {
536                    debug_assert!(false, "logical error");
537                    break;
538                };
539
540                for inherited in inherited_categories(category) {
541                    if let Some(categories) = limit.categories.add(*inherited) {
542                        limit.categories = categories;
543                    }
544                }
545            }
546
547            current.add(limit);
548        }
549    }
550
551    /// Returns a reference to the current rate limits.
552    ///
553    /// This method automatically removes any expired rate limits before returning,
554    /// ensuring that only active limits are included in the result.
555    pub fn current_limits(&self) -> Arc<RateLimits> {
556        let now = Instant::now();
557        let mut inner = self.0.lock().unwrap_or_else(PoisonError::into_inner);
558        Arc::make_mut(&mut inner).clean_expired(now);
559        Arc::clone(&inner)
560    }
561}
562
563/// Returns inherited rate limit categories for the passed category.
564///
565/// When a rate limit for a category can also be enforced in a different category,
566/// then it's an inherited category.
567///
568/// For example, a transaction rate limit can also be applied to spans and vice versa.
569///
570/// For a detailed explanation on span/transaction enforcement see:
571/// <https://develop.sentry.dev/ingestion/relay/transaction-span-ratelimits/>.
572fn inherited_categories(category: &DataCategory) -> &'static [DataCategory] {
573    match category {
574        DataCategory::Transaction => &[DataCategory::Span],
575        DataCategory::Span => &[DataCategory::Transaction],
576        _ => &[],
577    }
578}
579
580#[cfg(test)]
581mod tests {
582    use smallvec::smallvec;
583
584    use super::*;
585    use crate::MetricNamespaceScoping;
586    use crate::quota::DataCategory;
587
588    #[test]
589    fn test_parse_retry_after() {
590        // positive float always rounds up to the next integer
591        let retry_after = "17.1".parse::<RetryAfter>().expect("parse RetryAfter");
592        assert_eq!(retry_after.remaining_seconds(), 18);
593        assert!(!retry_after.expired());
594        let retry_after = "17.7".parse::<RetryAfter>().expect("parse RetryAfter");
595        assert_eq!(retry_after.remaining_seconds(), 18);
596        assert!(!retry_after.expired());
597
598        // positive int
599        let retry_after = "17".parse::<RetryAfter>().expect("parse RetryAfter");
600        assert_eq!(retry_after.remaining_seconds(), 17);
601        assert!(!retry_after.expired());
602
603        // negative numbers are treated as zero
604        let retry_after = "-2".parse::<RetryAfter>().expect("parse RetryAfter");
605        assert_eq!(retry_after.remaining_seconds(), 0);
606        assert!(retry_after.expired());
607        let retry_after = "-inf".parse::<RetryAfter>().expect("parse RetryAfter");
608        assert_eq!(retry_after.remaining_seconds(), 0);
609        assert!(retry_after.expired());
610
611        // inf and NaN are valid input and treated as zero
612        let retry_after = "inf".parse::<RetryAfter>().expect("parse RetryAfter");
613        assert_eq!(retry_after.remaining_seconds(), 0);
614        assert!(retry_after.expired());
615        let retry_after = "NaN".parse::<RetryAfter>().expect("parse RetryAfter");
616        assert_eq!(retry_after.remaining_seconds(), 0);
617        assert!(retry_after.expired());
618
619        // large inputs that would overflow are treated as zero
620        let retry_after = "100000000000000000000"
621            .parse::<RetryAfter>()
622            .expect("parse RetryAfter");
623        assert_eq!(retry_after.remaining_seconds(), 0);
624        assert!(retry_after.expired());
625
626        // invalid strings cause parse error
627        "".parse::<RetryAfter>().expect_err("error RetryAfter");
628        "nope".parse::<RetryAfter>().expect_err("error RetryAfter");
629        " 2 ".parse::<RetryAfter>().expect_err("error RetryAfter");
630        "6 0".parse::<RetryAfter>().expect_err("error RetryAfter");
631    }
632
633    #[test]
634    fn test_rate_limit_matches_categories() {
635        let rate_limit = RateLimit {
636            categories: [DataCategory::Unknown, DataCategory::Error].into(),
637            scope: RateLimitScope::Organization(OrganizationId::new(42)),
638            reason_code: None,
639            retry_after: RetryAfter::from_secs(1),
640            namespaces: smallvec![],
641        };
642
643        assert!(rate_limit.matches(ItemScoping {
644            category: DataCategory::Error,
645            scoping: Scoping {
646                organization_id: OrganizationId::new(42),
647                project_id: ProjectId::new(21),
648                project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
649                key_id: None,
650            },
651            namespace: MetricNamespaceScoping::None,
652        }));
653
654        assert!(!rate_limit.matches(ItemScoping {
655            category: DataCategory::Transaction,
656            scoping: Scoping {
657                organization_id: OrganizationId::new(42),
658                project_id: ProjectId::new(21),
659                project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
660                key_id: None,
661            },
662            namespace: MetricNamespaceScoping::None,
663        }));
664    }
665
666    #[test]
667    fn test_rate_limit_matches_organization() {
668        let rate_limit = RateLimit {
669            categories: DataCategories::new(),
670            scope: RateLimitScope::Organization(OrganizationId::new(42)),
671            reason_code: None,
672            retry_after: RetryAfter::from_secs(1),
673            namespaces: smallvec![],
674        };
675
676        assert!(rate_limit.matches(ItemScoping {
677            category: DataCategory::Error,
678            scoping: Scoping {
679                organization_id: OrganizationId::new(42),
680                project_id: ProjectId::new(21),
681                project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
682                key_id: None,
683            },
684            namespace: MetricNamespaceScoping::None,
685        }));
686
687        assert!(!rate_limit.matches(ItemScoping {
688            category: DataCategory::Error,
689            scoping: Scoping {
690                organization_id: OrganizationId::new(0),
691                project_id: ProjectId::new(21),
692                project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
693                key_id: None,
694            },
695            namespace: MetricNamespaceScoping::None,
696        }));
697    }
698
699    #[test]
700    fn test_rate_limit_matches_project() {
701        let rate_limit = RateLimit {
702            categories: DataCategories::new(),
703            scope: RateLimitScope::Project(ProjectId::new(21)),
704            reason_code: None,
705            retry_after: RetryAfter::from_secs(1),
706            namespaces: smallvec![],
707        };
708
709        assert!(rate_limit.matches(ItemScoping {
710            category: DataCategory::Error,
711            scoping: Scoping {
712                organization_id: OrganizationId::new(42),
713                project_id: ProjectId::new(21),
714                project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
715                key_id: None,
716            },
717            namespace: MetricNamespaceScoping::None,
718        }));
719
720        assert!(!rate_limit.matches(ItemScoping {
721            category: DataCategory::Error,
722            scoping: Scoping {
723                organization_id: OrganizationId::new(42),
724                project_id: ProjectId::new(0),
725                project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
726                key_id: None,
727            },
728            namespace: MetricNamespaceScoping::None,
729        }));
730    }
731
732    #[test]
733    fn test_rate_limit_matches_namespaces() {
734        let rate_limit = RateLimit {
735            categories: Default::default(),
736            scope: RateLimitScope::Organization(OrganizationId::new(42)),
737            reason_code: None,
738            retry_after: RetryAfter::from_secs(1),
739            namespaces: smallvec![MetricNamespace::Custom],
740        };
741
742        let scoping = Scoping {
743            organization_id: OrganizationId::new(42),
744            project_id: ProjectId::new(21),
745            project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
746            key_id: None,
747        };
748
749        assert!(rate_limit.matches(ItemScoping {
750            category: DataCategory::MetricBucket,
751            scoping,
752            namespace: MetricNamespaceScoping::Some(MetricNamespace::Custom),
753        }));
754
755        assert!(!rate_limit.matches(ItemScoping {
756            category: DataCategory::MetricBucket,
757            scoping,
758            namespace: MetricNamespaceScoping::Some(MetricNamespace::Spans),
759        }));
760
761        let general_rate_limit = RateLimit {
762            categories: Default::default(),
763            scope: RateLimitScope::Organization(OrganizationId::new(42)),
764            reason_code: None,
765            retry_after: RetryAfter::from_secs(1),
766            namespaces: smallvec![], // all namespaces
767        };
768
769        assert!(general_rate_limit.matches(ItemScoping {
770            category: DataCategory::MetricBucket,
771            scoping,
772            namespace: MetricNamespaceScoping::Some(MetricNamespace::Spans),
773        }));
774
775        assert!(general_rate_limit.matches(ItemScoping {
776            category: DataCategory::MetricBucket,
777            scoping,
778            namespace: MetricNamespaceScoping::None,
779        }));
780    }
781
782    #[test]
783    fn test_rate_limit_matches_key() {
784        let rate_limit = RateLimit {
785            categories: DataCategories::new(),
786            scope: RateLimitScope::Key(
787                ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
788            ),
789            reason_code: None,
790            retry_after: RetryAfter::from_secs(1),
791            namespaces: smallvec![],
792        };
793
794        assert!(rate_limit.matches(ItemScoping {
795            category: DataCategory::Error,
796            scoping: Scoping {
797                organization_id: OrganizationId::new(42),
798                project_id: ProjectId::new(21),
799                project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
800                key_id: None,
801            },
802            namespace: MetricNamespaceScoping::None,
803        }));
804
805        assert!(!rate_limit.matches(ItemScoping {
806            category: DataCategory::Error,
807            scoping: Scoping {
808                organization_id: OrganizationId::new(0),
809                project_id: ProjectId::new(21),
810                project_key: ProjectKey::parse("deadbeefdeadbeefdeadbeefdeadbeef").unwrap(),
811                key_id: None,
812            },
813            namespace: MetricNamespaceScoping::None,
814        }));
815    }
816
817    #[test]
818    fn test_rate_limits_add_replacement() {
819        let mut rate_limits = RateLimits::new();
820
821        rate_limits.add(RateLimit {
822            categories: [DataCategory::Default, DataCategory::Error].into(),
823            scope: RateLimitScope::Organization(OrganizationId::new(42)),
824            reason_code: Some(ReasonCode::new("first")),
825            retry_after: RetryAfter::from_secs(1),
826            namespaces: smallvec![],
827        });
828
829        // longer rate limit shadows shorter one
830        rate_limits.add(RateLimit {
831            categories: [DataCategory::Error, DataCategory::Default].into(),
832            scope: RateLimitScope::Organization(OrganizationId::new(42)),
833            reason_code: Some(ReasonCode::new("second")),
834            retry_after: RetryAfter::from_secs(10),
835            namespaces: smallvec![],
836        });
837
838        insta::assert_ron_snapshot!(rate_limits, @r#"
839        RateLimits(
840          limits: [
841            RateLimit(
842              categories: [
843                "default",
844                "error",
845              ],
846              scope: Organization(OrganizationId(42)),
847              reason_code: Some(ReasonCode("second")),
848              retry_after: RetryAfter(10),
849              namespaces: [],
850            ),
851          ],
852        )
853        "#);
854    }
855
856    #[test]
857    fn test_rate_limits_add_shadowing() {
858        let mut rate_limits = RateLimits::new();
859
860        rate_limits.add(RateLimit {
861            categories: [DataCategory::Default, DataCategory::Error].into(),
862            scope: RateLimitScope::Organization(OrganizationId::new(42)),
863            reason_code: Some(ReasonCode::new("first")),
864            retry_after: RetryAfter::from_secs(10),
865            namespaces: smallvec![],
866        });
867
868        // shorter rate limit is shadowed by existing one
869        rate_limits.add(RateLimit {
870            categories: [DataCategory::Error, DataCategory::Default].into(),
871            scope: RateLimitScope::Organization(OrganizationId::new(42)),
872            reason_code: Some(ReasonCode::new("second")),
873            retry_after: RetryAfter::from_secs(1),
874            namespaces: smallvec![],
875        });
876
877        insta::assert_ron_snapshot!(rate_limits, @r#"
878        RateLimits(
879          limits: [
880            RateLimit(
881              categories: [
882                "default",
883                "error",
884              ],
885              scope: Organization(OrganizationId(42)),
886              reason_code: Some(ReasonCode("first")),
887              retry_after: RetryAfter(10),
888              namespaces: [],
889            ),
890          ],
891        )
892        "#);
893    }
894
895    #[test]
896    fn test_rate_limits_add_buckets() {
897        let mut rate_limits = RateLimits::new();
898
899        rate_limits.add(RateLimit {
900            categories: [DataCategory::Error].into(),
901            scope: RateLimitScope::Organization(OrganizationId::new(42)),
902            reason_code: None,
903            retry_after: RetryAfter::from_secs(1),
904            namespaces: smallvec![],
905        });
906
907        // Same scope but different categories
908        rate_limits.add(RateLimit {
909            categories: [DataCategory::Transaction].into(),
910            scope: RateLimitScope::Organization(OrganizationId::new(42)),
911            reason_code: None,
912            retry_after: RetryAfter::from_secs(1),
913            namespaces: smallvec![],
914        });
915
916        // Same categories but different scope
917        rate_limits.add(RateLimit {
918            categories: [DataCategory::Error].into(),
919            scope: RateLimitScope::Project(ProjectId::new(21)),
920            reason_code: None,
921            retry_after: RetryAfter::from_secs(1),
922            namespaces: smallvec![],
923        });
924
925        insta::assert_ron_snapshot!(rate_limits, @r#"
926        RateLimits(
927          limits: [
928            RateLimit(
929              categories: [
930                "error",
931              ],
932              scope: Organization(OrganizationId(42)),
933              reason_code: None,
934              retry_after: RetryAfter(1),
935              namespaces: [],
936            ),
937            RateLimit(
938              categories: [
939                "transaction",
940              ],
941              scope: Organization(OrganizationId(42)),
942              reason_code: None,
943              retry_after: RetryAfter(1),
944              namespaces: [],
945            ),
946            RateLimit(
947              categories: [
948                "error",
949              ],
950              scope: Project(ProjectId(21)),
951              reason_code: None,
952              retry_after: RetryAfter(1),
953              namespaces: [],
954            ),
955          ],
956        )
957        "#);
958    }
959
960    /// Regression test that ensures namespaces are correctly added to rate limits.
961    #[test]
962    fn test_rate_limits_add_namespaces() {
963        let mut rate_limits = RateLimits::new();
964
965        rate_limits.add(RateLimit {
966            categories: [DataCategory::MetricBucket].into(),
967            scope: RateLimitScope::Organization(OrganizationId::new(42)),
968            reason_code: None,
969            retry_after: RetryAfter::from_secs(1),
970            namespaces: smallvec![MetricNamespace::Custom],
971        });
972
973        // Same category but different namespaces
974        rate_limits.add(RateLimit {
975            categories: [DataCategory::MetricBucket].into(),
976            scope: RateLimitScope::Organization(OrganizationId::new(42)),
977            reason_code: None,
978            retry_after: RetryAfter::from_secs(1),
979            namespaces: smallvec![MetricNamespace::Spans],
980        });
981
982        insta::assert_ron_snapshot!(rate_limits, @r#"
983        RateLimits(
984          limits: [
985            RateLimit(
986              categories: [
987                "metric_bucket",
988              ],
989              scope: Organization(OrganizationId(42)),
990              reason_code: None,
991              retry_after: RetryAfter(1),
992              namespaces: [
993                "custom",
994              ],
995            ),
996            RateLimit(
997              categories: [
998                "metric_bucket",
999              ],
1000              scope: Organization(OrganizationId(42)),
1001              reason_code: None,
1002              retry_after: RetryAfter(1),
1003              namespaces: [
1004                "spans",
1005              ],
1006            ),
1007          ],
1008        )
1009        "#);
1010    }
1011
1012    #[test]
1013    fn test_rate_limits_longest() {
1014        let mut rate_limits = RateLimits::new();
1015
1016        rate_limits.add(RateLimit {
1017            categories: [DataCategory::Error].into(),
1018            scope: RateLimitScope::Organization(OrganizationId::new(42)),
1019            reason_code: Some(ReasonCode::new("first")),
1020            retry_after: RetryAfter::from_secs(1),
1021            namespaces: smallvec![],
1022        });
1023
1024        // Distinct scope to prevent deduplication
1025        rate_limits.add(RateLimit {
1026            categories: [DataCategory::Transaction].into(),
1027            scope: RateLimitScope::Organization(OrganizationId::new(42)),
1028            reason_code: Some(ReasonCode::new("second")),
1029            retry_after: RetryAfter::from_secs(10),
1030            namespaces: smallvec![],
1031        });
1032
1033        let rate_limit = rate_limits.longest().unwrap();
1034        insta::assert_ron_snapshot!(rate_limit, @r#"
1035        RateLimit(
1036          categories: [
1037            "transaction",
1038          ],
1039          scope: Organization(OrganizationId(42)),
1040          reason_code: Some(ReasonCode("second")),
1041          retry_after: RetryAfter(10),
1042          namespaces: [],
1043        )
1044        "#);
1045    }
1046
1047    #[test]
1048    fn test_rate_limits_clean_expired() {
1049        let mut rate_limits = RateLimits::new();
1050
1051        // Active error limit
1052        rate_limits.add(RateLimit {
1053            categories: [DataCategory::Error].into(),
1054            scope: RateLimitScope::Organization(OrganizationId::new(42)),
1055            reason_code: None,
1056            retry_after: RetryAfter::from_secs(1),
1057            namespaces: smallvec![],
1058        });
1059
1060        // Inactive error limit with distinct scope
1061        rate_limits.add(RateLimit {
1062            categories: [DataCategory::Error].into(),
1063            scope: RateLimitScope::Project(ProjectId::new(21)),
1064            reason_code: None,
1065            retry_after: RetryAfter::from_secs(0),
1066            namespaces: smallvec![],
1067        });
1068
1069        // Sanity check before running `clean_expired`
1070        assert_eq!(rate_limits.iter().count(), 2);
1071
1072        rate_limits.clean_expired(Instant::now());
1073
1074        // Check that the expired limit has been removed
1075        insta::assert_ron_snapshot!(rate_limits, @r#"
1076        RateLimits(
1077          limits: [
1078            RateLimit(
1079              categories: [
1080                "error",
1081              ],
1082              scope: Organization(OrganizationId(42)),
1083              reason_code: None,
1084              retry_after: RetryAfter(1),
1085              namespaces: [],
1086            ),
1087          ],
1088        )
1089        "#);
1090    }
1091
1092    #[test]
1093    fn test_rate_limits_is_any_limited() {
1094        let mut rate_limits = RateLimits::new();
1095
1096        // Active error limit
1097        rate_limits.add(RateLimit {
1098            categories: [DataCategory::Error].into(),
1099            scope: RateLimitScope::Organization(OrganizationId::new(42)),
1100            reason_code: None,
1101            retry_after: RetryAfter::from_secs(1),
1102            namespaces: smallvec![],
1103        });
1104
1105        // Active transaction limit
1106        rate_limits.add(RateLimit {
1107            categories: [DataCategory::Transaction].into(),
1108            scope: RateLimitScope::Organization(OrganizationId::new(42)),
1109            reason_code: None,
1110            retry_after: RetryAfter::from_secs(1),
1111            namespaces: smallvec![],
1112        });
1113
1114        let scoping = Scoping {
1115            organization_id: OrganizationId::new(42),
1116            project_id: ProjectId::new(21),
1117            project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
1118            key_id: None,
1119        };
1120
1121        let quotas = &[Quota {
1122            id: None,
1123            categories: [DataCategory::Attachment].into(),
1124            scope: QuotaScope::Organization,
1125            scope_id: Some("42".into()),
1126            limit: Some(0),
1127            window: None,
1128            reason_code: Some(ReasonCode::new("zero")),
1129            namespace: None,
1130        }];
1131
1132        assert!(
1133            rate_limits.is_any_limited_with_quotas(quotas, &[scoping.item(DataCategory::Error)])
1134        );
1135        assert!(
1136            rate_limits
1137                .is_any_limited_with_quotas(quotas, &[scoping.item(DataCategory::Attachment)])
1138        );
1139        assert!(rate_limits.is_any_limited_with_quotas(
1140            quotas,
1141            &[
1142                scoping.item(DataCategory::Replay),
1143                scoping.item(DataCategory::Error)
1144            ]
1145        ));
1146
1147        assert!(
1148            !rate_limits.is_any_limited_with_quotas(quotas, &[scoping.item(DataCategory::Replay)])
1149        );
1150    }
1151
1152    #[test]
1153    fn test_rate_limits_check() {
1154        let mut rate_limits = RateLimits::new();
1155
1156        // Active error limit
1157        rate_limits.add(RateLimit {
1158            categories: [DataCategory::Error].into(),
1159            scope: RateLimitScope::Organization(OrganizationId::new(42)),
1160            reason_code: None,
1161            retry_after: RetryAfter::from_secs(1),
1162            namespaces: smallvec![],
1163        });
1164
1165        // Active transaction limit
1166        rate_limits.add(RateLimit {
1167            categories: [DataCategory::Transaction].into(),
1168            scope: RateLimitScope::Organization(OrganizationId::new(42)),
1169            reason_code: None,
1170            retry_after: RetryAfter::from_secs(1),
1171            namespaces: smallvec![],
1172        });
1173
1174        let applied_limits = rate_limits.check(ItemScoping {
1175            category: DataCategory::Error,
1176            scoping: Scoping {
1177                organization_id: OrganizationId::new(42),
1178                project_id: ProjectId::new(21),
1179                project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
1180                key_id: None,
1181            },
1182            namespace: MetricNamespaceScoping::None,
1183        });
1184
1185        // Check that the error limit is applied
1186        insta::assert_ron_snapshot!(applied_limits, @r#"
1187        RateLimits(
1188          limits: [
1189            RateLimit(
1190              categories: [
1191                "error",
1192              ],
1193              scope: Organization(OrganizationId(42)),
1194              reason_code: None,
1195              retry_after: RetryAfter(1),
1196              namespaces: [],
1197            ),
1198          ],
1199        )
1200        "#);
1201    }
1202
1203    #[test]
1204    fn test_rate_limits_check_quotas() {
1205        let mut rate_limits = RateLimits::new();
1206
1207        // Active error limit
1208        rate_limits.add(RateLimit {
1209            categories: [DataCategory::Error].into(),
1210            scope: RateLimitScope::Organization(OrganizationId::new(42)),
1211            reason_code: None,
1212            retry_after: RetryAfter::from_secs(1),
1213            namespaces: smallvec![],
1214        });
1215
1216        // Active transaction limit
1217        rate_limits.add(RateLimit {
1218            categories: [DataCategory::Transaction].into(),
1219            scope: RateLimitScope::Organization(OrganizationId::new(42)),
1220            reason_code: None,
1221            retry_after: RetryAfter::from_secs(1),
1222            namespaces: smallvec![],
1223        });
1224
1225        let item_scoping = ItemScoping {
1226            category: DataCategory::Error,
1227            scoping: Scoping {
1228                organization_id: OrganizationId::new(42),
1229                project_id: ProjectId::new(21),
1230                project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
1231                key_id: None,
1232            },
1233            namespace: MetricNamespaceScoping::None,
1234        };
1235
1236        let quotas = &[Quota {
1237            id: None,
1238            categories: [DataCategory::Error].into(),
1239            scope: QuotaScope::Organization,
1240            scope_id: Some("42".into()),
1241            limit: Some(0),
1242            window: None,
1243            reason_code: Some(ReasonCode::new("zero")),
1244            namespace: None,
1245        }];
1246
1247        let applied_limits = rate_limits.check_with_quotas(quotas, item_scoping);
1248
1249        insta::assert_ron_snapshot!(applied_limits, @r#"
1250        RateLimits(
1251          limits: [
1252            RateLimit(
1253              categories: [
1254                "error",
1255              ],
1256              scope: Organization(OrganizationId(42)),
1257              reason_code: Some(ReasonCode("zero")),
1258              retry_after: RetryAfter(60),
1259              namespaces: [],
1260            ),
1261          ],
1262        )
1263        "#);
1264    }
1265
1266    #[test]
1267    fn test_rate_limits_merge() {
1268        let mut rate_limits1 = RateLimits::new();
1269        let mut rate_limits2 = RateLimits::new();
1270
1271        rate_limits1.add(RateLimit {
1272            categories: [DataCategory::Error].into(),
1273            scope: RateLimitScope::Organization(OrganizationId::new(42)),
1274            reason_code: Some(ReasonCode::new("first")),
1275            retry_after: RetryAfter::from_secs(1),
1276            namespaces: smallvec![],
1277        });
1278
1279        rate_limits1.add(RateLimit {
1280            categories: [DataCategory::TransactionIndexed].into(),
1281            scope: RateLimitScope::Organization(OrganizationId::new(42)),
1282            reason_code: None,
1283            retry_after: RetryAfter::from_secs(1),
1284            namespaces: smallvec![],
1285        });
1286
1287        rate_limits2.add(RateLimit {
1288            categories: [DataCategory::Error].into(),
1289            scope: RateLimitScope::Organization(OrganizationId::new(42)),
1290            reason_code: Some(ReasonCode::new("second")),
1291            retry_after: RetryAfter::from_secs(10),
1292            namespaces: smallvec![],
1293        });
1294
1295        rate_limits1.merge(rate_limits2);
1296
1297        insta::assert_ron_snapshot!(rate_limits1, @r#"
1298        RateLimits(
1299          limits: [
1300            RateLimit(
1301              categories: [
1302                "error",
1303              ],
1304              scope: Organization(OrganizationId(42)),
1305              reason_code: Some(ReasonCode("second")),
1306              retry_after: RetryAfter(10),
1307              namespaces: [],
1308            ),
1309            RateLimit(
1310              categories: [
1311                "transaction_indexed",
1312              ],
1313              scope: Organization(OrganizationId(42)),
1314              reason_code: None,
1315              retry_after: RetryAfter(1),
1316              namespaces: [],
1317            ),
1318          ],
1319        )
1320        "#);
1321    }
1322
1323    #[test]
1324    fn test_cached_rate_limits_expired() {
1325        let cached = CachedRateLimits::new();
1326
1327        // Active error limit
1328        cached.add(RateLimit {
1329            categories: [DataCategory::Error].into(),
1330            scope: RateLimitScope::Organization(OrganizationId::new(42)),
1331            reason_code: None,
1332            retry_after: RetryAfter::from_secs(1),
1333            namespaces: smallvec![],
1334        });
1335
1336        // Inactive error limit with distinct scope
1337        cached.add(RateLimit {
1338            categories: [DataCategory::Error].into(),
1339            scope: RateLimitScope::Project(ProjectId::new(21)),
1340            reason_code: None,
1341            retry_after: RetryAfter::from_secs(0),
1342            namespaces: smallvec![],
1343        });
1344
1345        let rate_limits = cached.current_limits();
1346
1347        insta::assert_ron_snapshot!(rate_limits, @r#"
1348        RateLimits(
1349          limits: [
1350            RateLimit(
1351              categories: [
1352                "error",
1353              ],
1354              scope: Organization(OrganizationId(42)),
1355              reason_code: None,
1356              retry_after: RetryAfter(1),
1357              namespaces: [],
1358            ),
1359          ],
1360        )
1361        "#);
1362    }
1363}