Skip to main content

relay_quotas/
rate_limit.rs

1use std::fmt;
2use std::str::FromStr;
3use std::sync::{Arc, Mutex, PoisonError};
4use std::time::{Duration, Instant};
5
6use relay_base_schema::data_category::DataCategory;
7use relay_base_schema::metrics::MetricNamespace;
8use relay_base_schema::organization::OrganizationId;
9use relay_base_schema::project::{ProjectId, ProjectKey};
10use smallvec::SmallVec;
11
12use crate::REJECT_ALL_SECS;
13use crate::quota::{DataCategories, ItemScoping, Quota, QuotaScope, ReasonCode, Scoping};
14
15/// A monotonic expiration marker for rate limits.
16///
17/// [`RetryAfter`] represents a point in time when a rate limit expires. It allows checking
18/// whether the rate limit is still active or has expired, and calculating the remaining time
19/// until expiration.
20#[derive(Clone, Copy, Eq, Hash, Ord, PartialEq, PartialOrd)]
21pub struct RetryAfter {
22    when: Instant,
23}
24
25impl RetryAfter {
26    /// Creates a new [`RetryAfter`] instance that expires after the specified number of seconds.
27    #[inline]
28    pub fn from_secs(seconds: u64) -> Self {
29        let now = Instant::now();
30        let when = now.checked_add(Duration::from_secs(seconds)).unwrap_or(now);
31        Self { when }
32    }
33
34    /// Returns the remaining duration until the rate limit expires using the specified instant
35    /// as the reference point.
36    ///
37    /// If the rate limit has already expired at the given instant, returns `None`.
38    #[inline]
39    pub fn remaining_at(self, at: Instant) -> Option<Duration> {
40        if at >= self.when {
41            None
42        } else {
43            Some(self.when - at)
44        }
45    }
46
47    /// Returns the remaining duration until the rate limit expires.
48    ///
49    /// This uses the current instant as the reference point. If the rate limit has already
50    /// expired, returns `None`.
51    #[inline]
52    pub fn remaining(self) -> Option<Duration> {
53        self.remaining_at(Instant::now())
54    }
55
56    /// Returns the remaining seconds until the rate limit expires using the specified instant
57    /// as the reference point.
58    ///
59    /// This method rounds up to the next second to ensure that rate limits are strictly enforced.
60    /// If the rate limit has already expired, returns `0`.
61    #[inline]
62    pub fn remaining_seconds_at(self, at: Instant) -> u64 {
63        match self.remaining_at(at) {
64            // Compensate for the missing subsec part by adding 1s
65            Some(duration) if duration.subsec_nanos() == 0 => duration.as_secs(),
66            Some(duration) => duration.as_secs() + 1,
67            None => 0,
68        }
69    }
70
71    /// Returns the remaining seconds until the rate limit expires.
72    ///
73    /// This uses the current instant as the reference point. If the rate limit
74    /// has already expired, returns `0`.
75    #[inline]
76    pub fn remaining_seconds(self) -> u64 {
77        self.remaining_seconds_at(Instant::now())
78    }
79
80    /// Returns whether this rate limit has expired at the specified instant.
81    #[inline]
82    pub fn expired_at(self, at: Instant) -> bool {
83        self.remaining_at(at).is_none()
84    }
85
86    /// Returns whether this rate limit has expired at the current instant.
87    #[inline]
88    pub fn expired(self) -> bool {
89        self.remaining_at(Instant::now()).is_none()
90    }
91}
92
93impl fmt::Debug for RetryAfter {
94    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
95        match self.remaining_seconds() {
96            0 => write!(f, "RetryAfter(expired)"),
97            remaining => write!(f, "RetryAfter({remaining}s)"),
98        }
99    }
100}
101
102#[cfg(test)]
103impl serde::Serialize for RetryAfter {
104    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
105    where
106        S: serde::Serializer,
107    {
108        use serde::ser::SerializeTupleStruct;
109        let mut tup = serializer.serialize_tuple_struct("RetryAfter", 1)?;
110        tup.serialize_field(&self.remaining_seconds())?;
111        tup.end()
112    }
113}
114
115/// Error that occurs when parsing a [`RetryAfter`] from a string fails.
116#[derive(Debug)]
117pub enum InvalidRetryAfter {
118    /// The supplied delay in seconds was not valid.
119    InvalidDelay(std::num::ParseFloatError),
120}
121
122impl FromStr for RetryAfter {
123    type Err = InvalidRetryAfter;
124
125    fn from_str(s: &str) -> Result<Self, Self::Err> {
126        let float = s.parse::<f64>().map_err(InvalidRetryAfter::InvalidDelay)?;
127        let seconds = float.max(0.0).ceil() as u64;
128        Ok(RetryAfter::from_secs(seconds))
129    }
130}
131
132/// The scope that a rate limit applies to.
133///
134/// Unlike [`QuotaScope`], which only declares the class of the scope, this enum carries
135/// the specific identifiers of the individual scopes that a rate limit applies to.
136///
137/// Rate limits can be applied at different levels of granularity, from global (affecting all data)
138/// down to a specific project key.
139#[derive(Clone, Debug, Eq, PartialEq)]
140#[cfg_attr(test, derive(serde::Serialize))]
141pub enum RateLimitScope {
142    /// An organization with identifier.
143    Organization(OrganizationId),
144    /// A project with identifier.
145    Project(ProjectId),
146    /// A DSN public key.
147    Key(ProjectKey),
148}
149
150impl RateLimitScope {
151    /// Creates a rate limiting scope from the given item scoping for a specific quota.
152    ///
153    /// This extracts the appropriate scope identifier based on the quota's scope type.
154    /// For unknown scopes, it assumes the most specific scope (Key).
155    pub fn for_quota(scoping: Scoping, scope: QuotaScope) -> Self {
156        match scope {
157            QuotaScope::Organization => Self::Organization(scoping.organization_id),
158            QuotaScope::Project => Self::Project(scoping.project_id),
159            QuotaScope::Key => Self::Key(scoping.project_key),
160            // For unknown scopes, assume the most specific scope:
161            QuotaScope::Unknown => Self::Key(scoping.project_key),
162        }
163    }
164
165    /// Returns the canonical name of this scope.
166    ///
167    /// This corresponds to the name of the corresponding [`QuotaScope`].
168    pub fn name(&self) -> &'static str {
169        match *self {
170            Self::Key(_) => QuotaScope::Key.name(),
171            Self::Project(_) => QuotaScope::Project.name(),
172            Self::Organization(_) => QuotaScope::Organization.name(),
173        }
174    }
175}
176
177/// An active rate limit that restricts data ingestion.
178///
179/// A rate limit defines restrictions for specific data categories within a particular scope.
180/// It includes an expiration time after which the limit is no longer enforced.
181///
182/// Rate limits can be created from [`Quota`]s or directly constructed with the needed parameters.
183#[derive(Clone, Debug, PartialEq)]
184#[cfg_attr(test, derive(serde::Serialize))]
185pub struct RateLimit {
186    /// A set of data categories that this quota applies to. If empty, this rate limit
187    /// applies to all data categories.
188    pub categories: DataCategories,
189
190    /// The scope of this rate limit.
191    pub scope: RateLimitScope,
192
193    /// A machine-readable reason indicating which quota caused this rate limit.
194    pub reason_code: Option<ReasonCode>,
195
196    /// A marker when this rate limit expires.
197    pub retry_after: RetryAfter,
198
199    /// The metric namespace of this rate limit.
200    ///
201    /// Only relevant for data categories of type metric bucket. If empty,
202    /// this rate limit applies to metrics of all namespaces.
203    pub namespaces: SmallVec<[MetricNamespace; 1]>,
204}
205
206impl RateLimit {
207    /// Creates a new rate limit from the given [`Quota`].
208    ///
209    /// This builds a rate limit with the appropriate scope derived from the quota and scoping
210    /// information. The categories and other properties are copied from the quota.
211    pub fn from_quota(quota: &Quota, scoping: Scoping, retry_after: RetryAfter) -> Self {
212        Self {
213            categories: quota.categories,
214            scope: RateLimitScope::for_quota(scoping, quota.scope),
215            reason_code: quota.reason_code.clone(),
216            retry_after,
217            namespaces: quota.namespace.into_iter().collect(),
218        }
219    }
220
221    /// Checks whether this rate limit applies to the given item.
222    ///
223    /// A rate limit applies if its scope matches the item's scope, and the item's
224    /// category and namespace match those of the rate limit.
225    pub fn matches(&self, scoping: ItemScoping) -> bool {
226        self.matches_scope(scoping)
227            && scoping.matches_categories(self.categories)
228            && scoping.matches_namespaces(&self.namespaces)
229    }
230
231    /// Returns `true` if the rate limiting scope matches the given item.
232    fn matches_scope(&self, scoping: ItemScoping) -> bool {
233        match self.scope {
234            RateLimitScope::Organization(org_id) => scoping.organization_id == org_id,
235            RateLimitScope::Project(project_id) => scoping.project_id == project_id,
236            RateLimitScope::Key(key) => scoping.project_key == key,
237        }
238    }
239}
240
241/// A collection of scoped rate limits.
242///
243/// [`RateLimits`] manages a set of active rate limits that can be checked against
244/// incoming data.
245///
246/// The collection can be empty, indicated by [`is_ok`](Self::is_ok), meaning no rate limits
247/// are currently active.
248#[derive(Clone, Debug, Default)]
249#[cfg_attr(test, derive(serde::Serialize))]
250pub struct RateLimits {
251    limits: Vec<RateLimit>,
252}
253
254impl RateLimits {
255    /// Creates an empty [`RateLimits`] instance.
256    pub fn new() -> Self {
257        Self::default()
258    }
259
260    /// Adds a limit to this collection.
261    ///
262    /// If a rate limit with an overlapping scope already exists, the `retry_after` count is merged
263    /// with the existing limit. Otherwise, the new rate limit is added.
264    pub fn add(&mut self, limit: RateLimit) {
265        let limit_opt = self.limits.iter_mut().find(|l| {
266            let RateLimit {
267                categories,
268                scope,
269                reason_code: _,
270                retry_after: _,
271                namespaces: namespace,
272            } = &limit;
273
274            *categories == l.categories && *scope == l.scope && *namespace == l.namespaces
275        });
276
277        match limit_opt {
278            None => self.limits.push(limit),
279            Some(existing) if existing.retry_after < limit.retry_after => *existing = limit,
280            Some(_) => (), // keep existing, longer limit
281        }
282    }
283
284    /// Merges all limits from another [`RateLimits`] instance into this one.
285    ///
286    /// This keeps all existing rate limits, adds new ones, and updates any existing ones
287    /// with a later expiration time. The resulting collection contains the maximum
288    /// constraints from both instances.
289    pub fn merge(&mut self, limits: Self) {
290        for limit in limits {
291            self.add(limit);
292        }
293    }
294
295    /// Merges all limits from another [`RateLimits`] with this one.
296    ///
297    /// See also: [`Self::merge`].
298    pub fn merge_with(mut self, other: Self) -> Self {
299        self.merge(other);
300        self
301    }
302
303    /// Returns `true` if this instance contains no active limits.
304    ///
305    /// This is the opposite of [`is_limited`](Self::is_limited).
306    pub fn is_ok(&self) -> bool {
307        !self.is_limited()
308    }
309
310    /// Returns `true` if this instance contains any active rate limits.
311    ///
312    /// A rate limit is considered active if it has not yet expired.
313    pub fn is_limited(&self) -> bool {
314        let now = Instant::now();
315        self.iter().any(|limit| !limit.retry_after.expired_at(now))
316    }
317
318    /// Returns `true` if this is instance contains any active rate limits
319    /// for the specified categories.
320    ///
321    /// A rate limit is considered active if it has not yet expired.
322    pub fn is_any_limited(&self, scopings: &[ItemScoping]) -> bool {
323        self.is_any_limited_with_quotas(&[], scopings)
324    }
325
326    /// Returns `true` if this is instance contains any active rate limits
327    /// for the specified categories.
328    ///
329    /// This is similar to [`Self::is_limited`], but additionally checks for quotas with a static
330    /// limit of `0`, which reject items even if there is no active rate limit in this instance.
331    ///
332    /// A rate limit is considered active if it has not yet expired.
333    pub fn is_any_limited_with_quotas<'a>(
334        &self,
335        quotas: impl IntoIterator<Item = &'a Quota>,
336        scopings: &[ItemScoping],
337    ) -> bool {
338        for quota in quotas {
339            for scoping in scopings {
340                if quota.limit == Some(0) && quota.matches(*scoping) {
341                    return true;
342                }
343            }
344        }
345
346        let now = Instant::now();
347        for scoping in scopings {
348            for limit in &self.limits {
349                if limit.matches(*scoping) && !limit.retry_after.expired_at(now) {
350                    return true;
351                }
352            }
353        }
354
355        false
356    }
357
358    /// Removes expired rate limits from this instance.
359    ///
360    /// This is useful for cleaning up rate limits that are no longer relevant,
361    /// reducing memory usage and improving performance of subsequent operations.
362    pub fn clean_expired(&mut self, now: Instant) {
363        self.limits
364            .retain(|limit| !limit.retry_after.expired_at(now));
365    }
366
367    /// Checks whether any rate limits apply to the given scoping.
368    ///
369    /// Returns a new [`RateLimits`] instance containing only the rate limits that match
370    /// the provided [`ItemScoping`]. If no limits match, the returned instance will be empty
371    /// and [`is_ok`](Self::is_ok) will return `true`.
372    pub fn check(&self, scoping: ItemScoping) -> Self {
373        self.check_with_quotas(&[], scoping)
374    }
375
376    /// Checks whether any rate limits or static quotas apply to the given scoping.
377    ///
378    /// This is similar to [`check`](Self::check), but additionally checks for quotas with a static
379    /// limit of `0`, which reject items even if there is no active rate limit in this instance.
380    ///
381    /// Returns a new [`RateLimits`] instance containing the rate limits that match the provided
382    /// [`ItemScoping`]. If no limits or quotas match, the returned instance will be empty.
383    pub fn check_with_quotas<'a>(
384        &self,
385        quotas: impl IntoIterator<Item = &'a Quota>,
386        scoping: ItemScoping,
387    ) -> Self {
388        let mut applied_limits = Self::new();
389
390        for quota in quotas {
391            if quota.limit == Some(0) && quota.matches(scoping) {
392                let retry_after = RetryAfter::from_secs(REJECT_ALL_SECS);
393                applied_limits.add(RateLimit::from_quota(quota, *scoping, retry_after));
394            }
395        }
396
397        for limit in &self.limits {
398            if limit.matches(scoping) {
399                applied_limits.add(limit.clone());
400            }
401        }
402
403        applied_limits
404    }
405
406    /// Returns an iterator over all rate limits in this collection.
407    pub fn iter(&self) -> RateLimitsIter<'_> {
408        RateLimitsIter {
409            iter: self.limits.iter(),
410        }
411    }
412
413    /// Returns the rate limit with the latest expiration time.
414    ///
415    /// If multiple rate limits have the same expiration time, any of them may be returned.
416    /// If the collection is empty, returns `None`.
417    pub fn longest(&self) -> Option<&RateLimit> {
418        self.iter().max_by_key(|limit| limit.retry_after)
419    }
420
421    /// Returns `true` if there are no rate limits in this collection.
422    ///
423    /// Note that an empty collection is not the same as having no active limits.
424    /// Use [`is_ok`](Self::is_ok) to check if there are no active limits.
425    pub fn is_empty(&self) -> bool {
426        self.limits.is_empty()
427    }
428}
429
430/// An iterator over rate limit references.
431///
432/// This struct is created by the [`iter`](RateLimits::iter) method on [`RateLimits`].
433/// It yields shared references to the rate limits in the collection.
434pub struct RateLimitsIter<'a> {
435    iter: std::slice::Iter<'a, RateLimit>,
436}
437
438impl<'a> Iterator for RateLimitsIter<'a> {
439    type Item = &'a RateLimit;
440
441    fn next(&mut self) -> Option<Self::Item> {
442        self.iter.next()
443    }
444}
445
446impl IntoIterator for RateLimits {
447    type IntoIter = RateLimitsIntoIter;
448    type Item = RateLimit;
449
450    fn into_iter(self) -> Self::IntoIter {
451        RateLimitsIntoIter {
452            iter: self.limits.into_iter(),
453        }
454    }
455}
456
457/// An iterator that consumes a [`RateLimits`] collection.
458///
459/// This struct is created by the `into_iter` method on [`RateLimits`], provided by the
460/// [`IntoIterator`] trait. It yields owned rate limits by value.
461pub struct RateLimitsIntoIter {
462    iter: std::vec::IntoIter<RateLimit>,
463}
464
465impl Iterator for RateLimitsIntoIter {
466    type Item = RateLimit;
467
468    fn next(&mut self) -> Option<Self::Item> {
469        self.iter.next()
470    }
471}
472
473impl<'a> IntoIterator for &'a RateLimits {
474    type IntoIter = RateLimitsIter<'a>;
475    type Item = &'a RateLimit;
476
477    fn into_iter(self) -> Self::IntoIter {
478        self.iter()
479    }
480}
481
482/// A thread-safe cache of rate limits with automatic expiration handling.
483///
484/// [`CachedRateLimits`] wraps a [`RateLimits`] collection with a mutex to allow safe
485/// concurrent access from multiple threads. It automatically removes expired rate limits
486/// when retrieving the current limits.
487///
488/// This is useful for maintaining a shared set of rate limits across multiple
489/// processing threads or tasks.
490#[derive(Debug, Default)]
491pub struct CachedRateLimits(Mutex<Arc<RateLimits>>);
492
493impl CachedRateLimits {
494    /// Creates a new, empty instance without any rate limits.
495    pub fn new() -> Self {
496        Self::default()
497    }
498
499    /// Adds a rate limit to this collection.
500    ///
501    /// This is a thread-safe wrapper around [`RateLimits::add`].
502    pub fn add(&self, limit: RateLimit) {
503        let mut inner = self.0.lock().unwrap_or_else(PoisonError::into_inner);
504        let current = Arc::make_mut(&mut inner);
505        current.add(limit);
506    }
507
508    /// Merges rate limits from another collection into this one.
509    ///
510    /// This is a thread-safe wrapper around [`RateLimits::merge`].
511    pub fn merge(&self, limits: RateLimits) {
512        if limits.is_empty() {
513            return;
514        }
515
516        let mut inner = self.0.lock().unwrap_or_else(PoisonError::into_inner);
517        let current = Arc::make_mut(&mut inner);
518        for mut limit in limits {
519            // To spice it up, we do have some special casing here for 'inherited categories',
520            // e.g. spans and transactions.
521            //
522            // The tldr; is, as transactions are just containers for spans,
523            // we can enforce span limits on transactions but also vice versa.
524            //
525            // So this is largely an enforcement problem, but since Relay propagates
526            // rate limits to clients, we clone the limits with the inherited category.
527            // This ensures old SDKs rate limit correctly, but also it simplifies client
528            // implementations. Only Relay needs to make this decision.
529            for category in limit.categories.iter() {
530                for inherited in inherited_categories(&category) {
531                    if let Some(categories) = limit.categories.add(*inherited) {
532                        limit.categories = categories;
533                    }
534                }
535            }
536
537            current.add(limit);
538        }
539    }
540
541    /// Returns a reference to the current rate limits.
542    ///
543    /// This method automatically removes any expired rate limits before returning,
544    /// ensuring that only active limits are included in the result.
545    pub fn current_limits(&self) -> Arc<RateLimits> {
546        let now = Instant::now();
547        let mut inner = self.0.lock().unwrap_or_else(PoisonError::into_inner);
548        Arc::make_mut(&mut inner).clean_expired(now);
549        Arc::clone(&inner)
550    }
551}
552
553/// Returns inherited rate limit categories for the passed category.
554///
555/// When a rate limit for a category can also be enforced in a different category,
556/// then it's an inherited category.
557///
558/// For example, a transaction rate limit can also be applied to spans and vice versa.
559///
560/// For a detailed explanation on span/transaction enforcement see:
561/// <https://develop.sentry.dev/ingestion/relay/transaction-span-ratelimits/>.
562fn inherited_categories(category: &DataCategory) -> &'static [DataCategory] {
563    match category {
564        DataCategory::Transaction => &[DataCategory::Span],
565        DataCategory::Span => &[DataCategory::Transaction],
566        _ => &[],
567    }
568}
569
570#[cfg(test)]
571mod tests {
572    use smallvec::smallvec;
573
574    use super::*;
575    use crate::MetricNamespaceScoping;
576    use crate::quota::DataCategory;
577
578    #[test]
579    fn test_parse_retry_after() {
580        // positive float always rounds up to the next integer
581        let retry_after = "17.1".parse::<RetryAfter>().expect("parse RetryAfter");
582        assert_eq!(retry_after.remaining_seconds(), 18);
583        assert!(!retry_after.expired());
584        let retry_after = "17.7".parse::<RetryAfter>().expect("parse RetryAfter");
585        assert_eq!(retry_after.remaining_seconds(), 18);
586        assert!(!retry_after.expired());
587
588        // positive int
589        let retry_after = "17".parse::<RetryAfter>().expect("parse RetryAfter");
590        assert_eq!(retry_after.remaining_seconds(), 17);
591        assert!(!retry_after.expired());
592
593        // negative numbers are treated as zero
594        let retry_after = "-2".parse::<RetryAfter>().expect("parse RetryAfter");
595        assert_eq!(retry_after.remaining_seconds(), 0);
596        assert!(retry_after.expired());
597        let retry_after = "-inf".parse::<RetryAfter>().expect("parse RetryAfter");
598        assert_eq!(retry_after.remaining_seconds(), 0);
599        assert!(retry_after.expired());
600
601        // inf and NaN are valid input and treated as zero
602        let retry_after = "inf".parse::<RetryAfter>().expect("parse RetryAfter");
603        assert_eq!(retry_after.remaining_seconds(), 0);
604        assert!(retry_after.expired());
605        let retry_after = "NaN".parse::<RetryAfter>().expect("parse RetryAfter");
606        assert_eq!(retry_after.remaining_seconds(), 0);
607        assert!(retry_after.expired());
608
609        // large inputs that would overflow are treated as zero
610        let retry_after = "100000000000000000000"
611            .parse::<RetryAfter>()
612            .expect("parse RetryAfter");
613        assert_eq!(retry_after.remaining_seconds(), 0);
614        assert!(retry_after.expired());
615
616        // invalid strings cause parse error
617        "".parse::<RetryAfter>().expect_err("error RetryAfter");
618        "nope".parse::<RetryAfter>().expect_err("error RetryAfter");
619        " 2 ".parse::<RetryAfter>().expect_err("error RetryAfter");
620        "6 0".parse::<RetryAfter>().expect_err("error RetryAfter");
621    }
622
623    #[test]
624    fn test_rate_limit_matches_categories() {
625        let rate_limit = RateLimit {
626            categories: [DataCategory::Unknown, DataCategory::Error].into(),
627            scope: RateLimitScope::Organization(OrganizationId::new(42)),
628            reason_code: None,
629            retry_after: RetryAfter::from_secs(1),
630            namespaces: smallvec![],
631        };
632
633        assert!(rate_limit.matches(ItemScoping {
634            category: DataCategory::Error,
635            scoping: Scoping {
636                organization_id: OrganizationId::new(42),
637                project_id: ProjectId::new(21),
638                project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
639                key_id: None,
640            },
641            namespace: MetricNamespaceScoping::None,
642        }));
643
644        assert!(!rate_limit.matches(ItemScoping {
645            category: DataCategory::Transaction,
646            scoping: Scoping {
647                organization_id: OrganizationId::new(42),
648                project_id: ProjectId::new(21),
649                project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
650                key_id: None,
651            },
652            namespace: MetricNamespaceScoping::None,
653        }));
654    }
655
656    #[test]
657    fn test_rate_limit_matches_organization() {
658        let rate_limit = RateLimit {
659            categories: DataCategories::new(),
660            scope: RateLimitScope::Organization(OrganizationId::new(42)),
661            reason_code: None,
662            retry_after: RetryAfter::from_secs(1),
663            namespaces: smallvec![],
664        };
665
666        assert!(rate_limit.matches(ItemScoping {
667            category: DataCategory::Error,
668            scoping: Scoping {
669                organization_id: OrganizationId::new(42),
670                project_id: ProjectId::new(21),
671                project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
672                key_id: None,
673            },
674            namespace: MetricNamespaceScoping::None,
675        }));
676
677        assert!(!rate_limit.matches(ItemScoping {
678            category: DataCategory::Error,
679            scoping: Scoping {
680                organization_id: OrganizationId::new(0),
681                project_id: ProjectId::new(21),
682                project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
683                key_id: None,
684            },
685            namespace: MetricNamespaceScoping::None,
686        }));
687    }
688
689    #[test]
690    fn test_rate_limit_matches_project() {
691        let rate_limit = RateLimit {
692            categories: DataCategories::new(),
693            scope: RateLimitScope::Project(ProjectId::new(21)),
694            reason_code: None,
695            retry_after: RetryAfter::from_secs(1),
696            namespaces: smallvec![],
697        };
698
699        assert!(rate_limit.matches(ItemScoping {
700            category: DataCategory::Error,
701            scoping: Scoping {
702                organization_id: OrganizationId::new(42),
703                project_id: ProjectId::new(21),
704                project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
705                key_id: None,
706            },
707            namespace: MetricNamespaceScoping::None,
708        }));
709
710        assert!(!rate_limit.matches(ItemScoping {
711            category: DataCategory::Error,
712            scoping: Scoping {
713                organization_id: OrganizationId::new(42),
714                project_id: ProjectId::new(0),
715                project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
716                key_id: None,
717            },
718            namespace: MetricNamespaceScoping::None,
719        }));
720    }
721
722    #[test]
723    fn test_rate_limit_matches_namespaces() {
724        let rate_limit = RateLimit {
725            categories: Default::default(),
726            scope: RateLimitScope::Organization(OrganizationId::new(42)),
727            reason_code: None,
728            retry_after: RetryAfter::from_secs(1),
729            namespaces: smallvec![MetricNamespace::Custom],
730        };
731
732        let scoping = Scoping {
733            organization_id: OrganizationId::new(42),
734            project_id: ProjectId::new(21),
735            project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
736            key_id: None,
737        };
738
739        assert!(rate_limit.matches(ItemScoping {
740            category: DataCategory::MetricBucket,
741            scoping,
742            namespace: MetricNamespaceScoping::Some(MetricNamespace::Custom),
743        }));
744
745        assert!(!rate_limit.matches(ItemScoping {
746            category: DataCategory::MetricBucket,
747            scoping,
748            namespace: MetricNamespaceScoping::Some(MetricNamespace::Spans),
749        }));
750
751        let general_rate_limit = RateLimit {
752            categories: Default::default(),
753            scope: RateLimitScope::Organization(OrganizationId::new(42)),
754            reason_code: None,
755            retry_after: RetryAfter::from_secs(1),
756            namespaces: smallvec![], // all namespaces
757        };
758
759        assert!(general_rate_limit.matches(ItemScoping {
760            category: DataCategory::MetricBucket,
761            scoping,
762            namespace: MetricNamespaceScoping::Some(MetricNamespace::Spans),
763        }));
764
765        assert!(general_rate_limit.matches(ItemScoping {
766            category: DataCategory::MetricBucket,
767            scoping,
768            namespace: MetricNamespaceScoping::None,
769        }));
770    }
771
772    #[test]
773    fn test_rate_limit_matches_key() {
774        let rate_limit = RateLimit {
775            categories: DataCategories::new(),
776            scope: RateLimitScope::Key(
777                ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
778            ),
779            reason_code: None,
780            retry_after: RetryAfter::from_secs(1),
781            namespaces: smallvec![],
782        };
783
784        assert!(rate_limit.matches(ItemScoping {
785            category: DataCategory::Error,
786            scoping: Scoping {
787                organization_id: OrganizationId::new(42),
788                project_id: ProjectId::new(21),
789                project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
790                key_id: None,
791            },
792            namespace: MetricNamespaceScoping::None,
793        }));
794
795        assert!(!rate_limit.matches(ItemScoping {
796            category: DataCategory::Error,
797            scoping: Scoping {
798                organization_id: OrganizationId::new(0),
799                project_id: ProjectId::new(21),
800                project_key: ProjectKey::parse("deadbeefdeadbeefdeadbeefdeadbeef").unwrap(),
801                key_id: None,
802            },
803            namespace: MetricNamespaceScoping::None,
804        }));
805    }
806
807    #[test]
808    fn test_rate_limits_add_replacement() {
809        let mut rate_limits = RateLimits::new();
810
811        rate_limits.add(RateLimit {
812            categories: [DataCategory::Default, DataCategory::Error].into(),
813            scope: RateLimitScope::Organization(OrganizationId::new(42)),
814            reason_code: Some(ReasonCode::new("first")),
815            retry_after: RetryAfter::from_secs(1),
816            namespaces: smallvec![],
817        });
818
819        // longer rate limit shadows shorter one
820        rate_limits.add(RateLimit {
821            categories: [DataCategory::Error, DataCategory::Default].into(),
822            scope: RateLimitScope::Organization(OrganizationId::new(42)),
823            reason_code: Some(ReasonCode::new("second")),
824            retry_after: RetryAfter::from_secs(10),
825            namespaces: smallvec![],
826        });
827
828        insta::assert_ron_snapshot!(rate_limits, @r#"
829        RateLimits(
830          limits: [
831            RateLimit(
832              categories: [
833                "default",
834                "error",
835              ],
836              scope: Organization(OrganizationId(42)),
837              reason_code: Some(ReasonCode("second")),
838              retry_after: RetryAfter(10),
839              namespaces: [],
840            ),
841          ],
842        )
843        "#);
844    }
845
846    #[test]
847    fn test_rate_limits_add_shadowing() {
848        let mut rate_limits = RateLimits::new();
849
850        rate_limits.add(RateLimit {
851            categories: [DataCategory::Default, DataCategory::Error].into(),
852            scope: RateLimitScope::Organization(OrganizationId::new(42)),
853            reason_code: Some(ReasonCode::new("first")),
854            retry_after: RetryAfter::from_secs(10),
855            namespaces: smallvec![],
856        });
857
858        // shorter rate limit is shadowed by existing one
859        rate_limits.add(RateLimit {
860            categories: [DataCategory::Error, DataCategory::Default].into(),
861            scope: RateLimitScope::Organization(OrganizationId::new(42)),
862            reason_code: Some(ReasonCode::new("second")),
863            retry_after: RetryAfter::from_secs(1),
864            namespaces: smallvec![],
865        });
866
867        insta::assert_ron_snapshot!(rate_limits, @r#"
868        RateLimits(
869          limits: [
870            RateLimit(
871              categories: [
872                "default",
873                "error",
874              ],
875              scope: Organization(OrganizationId(42)),
876              reason_code: Some(ReasonCode("first")),
877              retry_after: RetryAfter(10),
878              namespaces: [],
879            ),
880          ],
881        )
882        "#);
883    }
884
885    #[test]
886    fn test_rate_limits_add_buckets() {
887        let mut rate_limits = RateLimits::new();
888
889        rate_limits.add(RateLimit {
890            categories: [DataCategory::Error].into(),
891            scope: RateLimitScope::Organization(OrganizationId::new(42)),
892            reason_code: None,
893            retry_after: RetryAfter::from_secs(1),
894            namespaces: smallvec![],
895        });
896
897        // Same scope but different categories
898        rate_limits.add(RateLimit {
899            categories: [DataCategory::Transaction].into(),
900            scope: RateLimitScope::Organization(OrganizationId::new(42)),
901            reason_code: None,
902            retry_after: RetryAfter::from_secs(1),
903            namespaces: smallvec![],
904        });
905
906        // Same categories but different scope
907        rate_limits.add(RateLimit {
908            categories: [DataCategory::Error].into(),
909            scope: RateLimitScope::Project(ProjectId::new(21)),
910            reason_code: None,
911            retry_after: RetryAfter::from_secs(1),
912            namespaces: smallvec![],
913        });
914
915        insta::assert_ron_snapshot!(rate_limits, @r#"
916        RateLimits(
917          limits: [
918            RateLimit(
919              categories: [
920                "error",
921              ],
922              scope: Organization(OrganizationId(42)),
923              reason_code: None,
924              retry_after: RetryAfter(1),
925              namespaces: [],
926            ),
927            RateLimit(
928              categories: [
929                "transaction",
930              ],
931              scope: Organization(OrganizationId(42)),
932              reason_code: None,
933              retry_after: RetryAfter(1),
934              namespaces: [],
935            ),
936            RateLimit(
937              categories: [
938                "error",
939              ],
940              scope: Project(ProjectId(21)),
941              reason_code: None,
942              retry_after: RetryAfter(1),
943              namespaces: [],
944            ),
945          ],
946        )
947        "#);
948    }
949
950    /// Regression test that ensures namespaces are correctly added to rate limits.
951    #[test]
952    fn test_rate_limits_add_namespaces() {
953        let mut rate_limits = RateLimits::new();
954
955        rate_limits.add(RateLimit {
956            categories: [DataCategory::MetricBucket].into(),
957            scope: RateLimitScope::Organization(OrganizationId::new(42)),
958            reason_code: None,
959            retry_after: RetryAfter::from_secs(1),
960            namespaces: smallvec![MetricNamespace::Custom],
961        });
962
963        // Same category but different namespaces
964        rate_limits.add(RateLimit {
965            categories: [DataCategory::MetricBucket].into(),
966            scope: RateLimitScope::Organization(OrganizationId::new(42)),
967            reason_code: None,
968            retry_after: RetryAfter::from_secs(1),
969            namespaces: smallvec![MetricNamespace::Spans],
970        });
971
972        insta::assert_ron_snapshot!(rate_limits, @r#"
973        RateLimits(
974          limits: [
975            RateLimit(
976              categories: [
977                "metric_bucket",
978              ],
979              scope: Organization(OrganizationId(42)),
980              reason_code: None,
981              retry_after: RetryAfter(1),
982              namespaces: [
983                "custom",
984              ],
985            ),
986            RateLimit(
987              categories: [
988                "metric_bucket",
989              ],
990              scope: Organization(OrganizationId(42)),
991              reason_code: None,
992              retry_after: RetryAfter(1),
993              namespaces: [
994                "spans",
995              ],
996            ),
997          ],
998        )
999        "#);
1000    }
1001
1002    #[test]
1003    fn test_rate_limits_longest() {
1004        let mut rate_limits = RateLimits::new();
1005
1006        rate_limits.add(RateLimit {
1007            categories: [DataCategory::Error].into(),
1008            scope: RateLimitScope::Organization(OrganizationId::new(42)),
1009            reason_code: Some(ReasonCode::new("first")),
1010            retry_after: RetryAfter::from_secs(1),
1011            namespaces: smallvec![],
1012        });
1013
1014        // Distinct scope to prevent deduplication
1015        rate_limits.add(RateLimit {
1016            categories: [DataCategory::Transaction].into(),
1017            scope: RateLimitScope::Organization(OrganizationId::new(42)),
1018            reason_code: Some(ReasonCode::new("second")),
1019            retry_after: RetryAfter::from_secs(10),
1020            namespaces: smallvec![],
1021        });
1022
1023        let rate_limit = rate_limits.longest().unwrap();
1024        insta::assert_ron_snapshot!(rate_limit, @r#"
1025        RateLimit(
1026          categories: [
1027            "transaction",
1028          ],
1029          scope: Organization(OrganizationId(42)),
1030          reason_code: Some(ReasonCode("second")),
1031          retry_after: RetryAfter(10),
1032          namespaces: [],
1033        )
1034        "#);
1035    }
1036
1037    #[test]
1038    fn test_rate_limits_clean_expired() {
1039        let mut rate_limits = RateLimits::new();
1040
1041        // Active error limit
1042        rate_limits.add(RateLimit {
1043            categories: [DataCategory::Error].into(),
1044            scope: RateLimitScope::Organization(OrganizationId::new(42)),
1045            reason_code: None,
1046            retry_after: RetryAfter::from_secs(1),
1047            namespaces: smallvec![],
1048        });
1049
1050        // Inactive error limit with distinct scope
1051        rate_limits.add(RateLimit {
1052            categories: [DataCategory::Error].into(),
1053            scope: RateLimitScope::Project(ProjectId::new(21)),
1054            reason_code: None,
1055            retry_after: RetryAfter::from_secs(0),
1056            namespaces: smallvec![],
1057        });
1058
1059        // Sanity check before running `clean_expired`
1060        assert_eq!(rate_limits.iter().count(), 2);
1061
1062        rate_limits.clean_expired(Instant::now());
1063
1064        // Check that the expired limit has been removed
1065        insta::assert_ron_snapshot!(rate_limits, @r#"
1066        RateLimits(
1067          limits: [
1068            RateLimit(
1069              categories: [
1070                "error",
1071              ],
1072              scope: Organization(OrganizationId(42)),
1073              reason_code: None,
1074              retry_after: RetryAfter(1),
1075              namespaces: [],
1076            ),
1077          ],
1078        )
1079        "#);
1080    }
1081
1082    #[test]
1083    fn test_rate_limits_is_any_limited() {
1084        let mut rate_limits = RateLimits::new();
1085
1086        // Active error limit
1087        rate_limits.add(RateLimit {
1088            categories: [DataCategory::Error].into(),
1089            scope: RateLimitScope::Organization(OrganizationId::new(42)),
1090            reason_code: None,
1091            retry_after: RetryAfter::from_secs(1),
1092            namespaces: smallvec![],
1093        });
1094
1095        // Active transaction limit
1096        rate_limits.add(RateLimit {
1097            categories: [DataCategory::Transaction].into(),
1098            scope: RateLimitScope::Organization(OrganizationId::new(42)),
1099            reason_code: None,
1100            retry_after: RetryAfter::from_secs(1),
1101            namespaces: smallvec![],
1102        });
1103
1104        let scoping = Scoping {
1105            organization_id: OrganizationId::new(42),
1106            project_id: ProjectId::new(21),
1107            project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
1108            key_id: None,
1109        };
1110
1111        let quotas = &[Quota {
1112            id: None,
1113            categories: [DataCategory::Attachment].into(),
1114            scope: QuotaScope::Organization,
1115            scope_id: Some("42".into()),
1116            limit: Some(0),
1117            window: None,
1118            reason_code: Some(ReasonCode::new("zero")),
1119            namespace: None,
1120        }];
1121
1122        assert!(
1123            rate_limits.is_any_limited_with_quotas(quotas, &[scoping.item(DataCategory::Error)])
1124        );
1125        assert!(
1126            rate_limits
1127                .is_any_limited_with_quotas(quotas, &[scoping.item(DataCategory::Attachment)])
1128        );
1129        assert!(rate_limits.is_any_limited_with_quotas(
1130            quotas,
1131            &[
1132                scoping.item(DataCategory::Replay),
1133                scoping.item(DataCategory::Error)
1134            ]
1135        ));
1136
1137        assert!(
1138            !rate_limits.is_any_limited_with_quotas(quotas, &[scoping.item(DataCategory::Replay)])
1139        );
1140    }
1141
1142    #[test]
1143    fn test_rate_limits_check() {
1144        let mut rate_limits = RateLimits::new();
1145
1146        // Active error limit
1147        rate_limits.add(RateLimit {
1148            categories: [DataCategory::Error].into(),
1149            scope: RateLimitScope::Organization(OrganizationId::new(42)),
1150            reason_code: None,
1151            retry_after: RetryAfter::from_secs(1),
1152            namespaces: smallvec![],
1153        });
1154
1155        // Active transaction limit
1156        rate_limits.add(RateLimit {
1157            categories: [DataCategory::Transaction].into(),
1158            scope: RateLimitScope::Organization(OrganizationId::new(42)),
1159            reason_code: None,
1160            retry_after: RetryAfter::from_secs(1),
1161            namespaces: smallvec![],
1162        });
1163
1164        let applied_limits = rate_limits.check(ItemScoping {
1165            category: DataCategory::Error,
1166            scoping: Scoping {
1167                organization_id: OrganizationId::new(42),
1168                project_id: ProjectId::new(21),
1169                project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
1170                key_id: None,
1171            },
1172            namespace: MetricNamespaceScoping::None,
1173        });
1174
1175        // Check that the error limit is applied
1176        insta::assert_ron_snapshot!(applied_limits, @r#"
1177        RateLimits(
1178          limits: [
1179            RateLimit(
1180              categories: [
1181                "error",
1182              ],
1183              scope: Organization(OrganizationId(42)),
1184              reason_code: None,
1185              retry_after: RetryAfter(1),
1186              namespaces: [],
1187            ),
1188          ],
1189        )
1190        "#);
1191    }
1192
1193    #[test]
1194    fn test_rate_limits_check_quotas() {
1195        let mut rate_limits = RateLimits::new();
1196
1197        // Active error limit
1198        rate_limits.add(RateLimit {
1199            categories: [DataCategory::Error].into(),
1200            scope: RateLimitScope::Organization(OrganizationId::new(42)),
1201            reason_code: None,
1202            retry_after: RetryAfter::from_secs(1),
1203            namespaces: smallvec![],
1204        });
1205
1206        // Active transaction limit
1207        rate_limits.add(RateLimit {
1208            categories: [DataCategory::Transaction].into(),
1209            scope: RateLimitScope::Organization(OrganizationId::new(42)),
1210            reason_code: None,
1211            retry_after: RetryAfter::from_secs(1),
1212            namespaces: smallvec![],
1213        });
1214
1215        let item_scoping = ItemScoping {
1216            category: DataCategory::Error,
1217            scoping: Scoping {
1218                organization_id: OrganizationId::new(42),
1219                project_id: ProjectId::new(21),
1220                project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
1221                key_id: None,
1222            },
1223            namespace: MetricNamespaceScoping::None,
1224        };
1225
1226        let quotas = &[Quota {
1227            id: None,
1228            categories: [DataCategory::Error].into(),
1229            scope: QuotaScope::Organization,
1230            scope_id: Some("42".into()),
1231            limit: Some(0),
1232            window: None,
1233            reason_code: Some(ReasonCode::new("zero")),
1234            namespace: None,
1235        }];
1236
1237        let applied_limits = rate_limits.check_with_quotas(quotas, item_scoping);
1238
1239        insta::assert_ron_snapshot!(applied_limits, @r#"
1240        RateLimits(
1241          limits: [
1242            RateLimit(
1243              categories: [
1244                "error",
1245              ],
1246              scope: Organization(OrganizationId(42)),
1247              reason_code: Some(ReasonCode("zero")),
1248              retry_after: RetryAfter(60),
1249              namespaces: [],
1250            ),
1251          ],
1252        )
1253        "#);
1254    }
1255
1256    #[test]
1257    fn test_rate_limits_merge() {
1258        let mut rate_limits1 = RateLimits::new();
1259        let mut rate_limits2 = RateLimits::new();
1260
1261        rate_limits1.add(RateLimit {
1262            categories: [DataCategory::Error].into(),
1263            scope: RateLimitScope::Organization(OrganizationId::new(42)),
1264            reason_code: Some(ReasonCode::new("first")),
1265            retry_after: RetryAfter::from_secs(1),
1266            namespaces: smallvec![],
1267        });
1268
1269        rate_limits1.add(RateLimit {
1270            categories: [DataCategory::TransactionIndexed].into(),
1271            scope: RateLimitScope::Organization(OrganizationId::new(42)),
1272            reason_code: None,
1273            retry_after: RetryAfter::from_secs(1),
1274            namespaces: smallvec![],
1275        });
1276
1277        rate_limits2.add(RateLimit {
1278            categories: [DataCategory::Error].into(),
1279            scope: RateLimitScope::Organization(OrganizationId::new(42)),
1280            reason_code: Some(ReasonCode::new("second")),
1281            retry_after: RetryAfter::from_secs(10),
1282            namespaces: smallvec![],
1283        });
1284
1285        rate_limits1.merge(rate_limits2);
1286
1287        insta::assert_ron_snapshot!(rate_limits1, @r#"
1288        RateLimits(
1289          limits: [
1290            RateLimit(
1291              categories: [
1292                "error",
1293              ],
1294              scope: Organization(OrganizationId(42)),
1295              reason_code: Some(ReasonCode("second")),
1296              retry_after: RetryAfter(10),
1297              namespaces: [],
1298            ),
1299            RateLimit(
1300              categories: [
1301                "transaction_indexed",
1302              ],
1303              scope: Organization(OrganizationId(42)),
1304              reason_code: None,
1305              retry_after: RetryAfter(1),
1306              namespaces: [],
1307            ),
1308          ],
1309        )
1310        "#);
1311    }
1312
1313    #[test]
1314    fn test_cached_rate_limits_expired() {
1315        let cached = CachedRateLimits::new();
1316
1317        // Active error limit
1318        cached.add(RateLimit {
1319            categories: [DataCategory::Error].into(),
1320            scope: RateLimitScope::Organization(OrganizationId::new(42)),
1321            reason_code: None,
1322            retry_after: RetryAfter::from_secs(1),
1323            namespaces: smallvec![],
1324        });
1325
1326        // Inactive error limit with distinct scope
1327        cached.add(RateLimit {
1328            categories: [DataCategory::Error].into(),
1329            scope: RateLimitScope::Project(ProjectId::new(21)),
1330            reason_code: None,
1331            retry_after: RetryAfter::from_secs(0),
1332            namespaces: smallvec![],
1333        });
1334
1335        let rate_limits = cached.current_limits();
1336
1337        insta::assert_ron_snapshot!(rate_limits, @r#"
1338        RateLimits(
1339          limits: [
1340            RateLimit(
1341              categories: [
1342                "error",
1343              ],
1344              scope: Organization(OrganizationId(42)),
1345              reason_code: None,
1346              retry_after: RetryAfter(1),
1347              namespaces: [],
1348            ),
1349          ],
1350        )
1351        "#);
1352    }
1353}