relay_server/utils/
rate_limits.rs

1use std::fmt::{self, Write};
2use std::future::Future;
3use std::marker::PhantomData;
4
5use relay_profiling::ProfileType;
6use relay_quotas::{
7    DataCategory, ItemScoping, QuotaScope, RateLimit, RateLimitScope, RateLimits, ReasonCode,
8    Scoping,
9};
10
11use crate::envelope::{AttachmentParentType, Envelope, Item, ItemType};
12use crate::integrations::Integration;
13use crate::managed::ManagedEnvelope;
14use crate::services::outcome::Outcome;
15
16/// Name of the rate limits header.
17pub const RATE_LIMITS_HEADER: &str = "X-Sentry-Rate-Limits";
18
19/// Formats the `X-Sentry-Rate-Limits` header.
20pub fn format_rate_limits(rate_limits: &RateLimits) -> String {
21    let mut header = String::new();
22
23    for rate_limit in rate_limits {
24        if !header.is_empty() {
25            header.push_str(", ");
26        }
27
28        write!(header, "{}:", rate_limit.retry_after.remaining_seconds()).ok();
29
30        for (index, category) in rate_limit.categories.iter().enumerate() {
31            if index > 0 {
32                header.push(';');
33            }
34            write!(header, "{category}").ok();
35        }
36
37        write!(header, ":{}", rate_limit.scope.name()).ok();
38
39        if let Some(ref reason_code) = rate_limit.reason_code {
40            write!(header, ":{reason_code}").ok();
41        } else if !rate_limit.namespaces.is_empty() {
42            write!(header, ":").ok(); // delimits the empty reason code for namespaces
43        }
44
45        for (index, namespace) in rate_limit.namespaces.iter().enumerate() {
46            header.push(if index == 0 { ':' } else { ';' });
47            write!(header, "{namespace}").ok();
48        }
49    }
50
51    header
52}
53
54/// Parses the `X-Sentry-Rate-Limits` header.
55pub fn parse_rate_limits(scoping: &Scoping, string: &str) -> RateLimits {
56    let mut rate_limits = RateLimits::new();
57
58    for limit in string.split(',') {
59        let limit = limit.trim();
60        if limit.is_empty() {
61            continue;
62        }
63
64        let mut components = limit.split(':');
65
66        let retry_after = match components.next().and_then(|s| s.parse().ok()) {
67            Some(retry_after) => retry_after,
68            None => continue,
69        };
70
71        let categories = components
72            .next()
73            .unwrap_or("")
74            .split(';')
75            .filter(|category| !category.is_empty())
76            .map(DataCategory::from_name)
77            .collect();
78
79        let quota_scope = QuotaScope::from_name(components.next().unwrap_or(""));
80        let scope = RateLimitScope::for_quota(*scoping, quota_scope);
81
82        let reason_code = components
83            .next()
84            .filter(|s| !s.is_empty())
85            .map(ReasonCode::new);
86
87        let namespace = components
88            .next()
89            .unwrap_or("")
90            .split(';')
91            .filter(|s| !s.is_empty())
92            .filter_map(|s| s.parse().ok())
93            .collect();
94
95        rate_limits.add(RateLimit {
96            categories,
97            scope,
98            reason_code,
99            retry_after,
100            namespaces: namespace,
101        });
102    }
103
104    rate_limits
105}
106
107/// Infer the data category from an item.
108///
109/// Categories depend mostly on the item type, with a few special cases:
110/// - `Event`: the category is inferred from the event type. This requires the `event_type` header
111///   to be set on the event item.
112/// - `Attachment`: If the attachment creates an event (e.g. for minidumps), the category is assumed
113///   to be `Error`.
114fn infer_event_category(item: &Item) -> Option<DataCategory> {
115    match item.ty() {
116        ItemType::Event => Some(DataCategory::Error),
117        ItemType::Transaction => Some(DataCategory::Transaction),
118        ItemType::Security | ItemType::RawSecurity => Some(DataCategory::Security),
119        ItemType::UnrealReport => Some(DataCategory::Error),
120        ItemType::UserReportV2 => Some(DataCategory::UserReportV2),
121        ItemType::Attachment if item.creates_event() => Some(DataCategory::Error),
122        ItemType::Attachment => None,
123        ItemType::Session => None,
124        ItemType::Sessions => None,
125        ItemType::Statsd => None,
126        ItemType::MetricBuckets => None,
127        ItemType::FormData => None,
128        ItemType::UserReport => None,
129        ItemType::Profile => None,
130        ItemType::ReplayEvent => None,
131        ItemType::ReplayRecording => None,
132        ItemType::ReplayVideo => None,
133        ItemType::ClientReport => None,
134        ItemType::CheckIn => None,
135        ItemType::Nel => None,
136        ItemType::Log => None,
137        ItemType::TraceMetric => None,
138        ItemType::Span => None,
139        ItemType::ProfileChunk => None,
140        ItemType::Integration => None,
141        ItemType::Unknown(_) => None,
142    }
143}
144
145/// Quantity metrics for a single category of attachments.
146///
147/// Tracks both the count of attachments and size in bytes.
148#[derive(Clone, Copy, Debug, Default)]
149pub struct AttachmentQuantity {
150    /// Number of attachment items.
151    pub count: usize,
152    /// Total size of attachments in bytes.
153    pub bytes: usize,
154}
155
156impl AttachmentQuantity {
157    pub fn is_empty(&self) -> bool {
158        self.count == 0 && self.bytes == 0
159    }
160}
161
162/// Aggregated attachment quantities grouped by [`AttachmentParentType`].
163///
164/// This separation is necessary since rate limiting logic varies by [`AttachmentParentType`].
165#[derive(Clone, Copy, Debug, Default)]
166pub struct AttachmentQuantities {
167    /// Quantities of Event Attachments.
168    ///
169    /// See also: [`AttachmentParentType::Event`].
170    pub event: AttachmentQuantity,
171    /// Quantities of trace V2 Attachments.
172    pub trace: AttachmentQuantity,
173    /// Quantities of span V2 Attachments.
174    pub span: AttachmentQuantity,
175}
176
177impl AttachmentQuantities {
178    /// Returns the total count of all attachments across all parent types.
179    pub fn count(&self) -> usize {
180        let AttachmentQuantities { event, trace, span } = self;
181        event.count + trace.count + span.count
182    }
183
184    /// Returns the total size in bytes of all attachments across all parent types.
185    pub fn bytes(&self) -> usize {
186        let AttachmentQuantities { event, trace, span } = self;
187        event.bytes + trace.bytes + span.bytes
188    }
189}
190
191/// A summary of `Envelope` contents.
192///
193/// Summarizes the contained event, size of attachments, session updates, and whether there are
194/// plain attachments. This is used for efficient rate limiting or outcome handling.
195#[non_exhaustive]
196#[derive(Clone, Copy, Debug, Default)]
197pub struct EnvelopeSummary {
198    /// The data category of the event in the envelope. `None` if there is no event.
199    pub event_category: Option<DataCategory>,
200
201    /// The quantities of all attachments combined.
202    pub attachment_quantities: AttachmentQuantities,
203
204    /// The number of all session updates.
205    pub session_quantity: usize,
206
207    /// The number of profiles.
208    pub profile_quantity: usize,
209
210    /// The number of replays.
211    pub replay_quantity: usize,
212
213    /// The number of user reports (legacy item type for user feedback).
214    pub user_report_quantity: usize,
215
216    /// The number of monitor check-ins.
217    pub monitor_quantity: usize,
218
219    /// The number of log for the log product sent.
220    pub log_item_quantity: usize,
221
222    /// The number of log bytes for the log product sent, in bytes
223    pub log_byte_quantity: usize,
224
225    /// Secondary number of transactions.
226    ///
227    /// This is 0 for envelopes which contain a transaction,
228    /// only secondary transaction quantity should be tracked here,
229    /// these are for example transaction counts extracted from metrics.
230    ///
231    /// A "primary" transaction is contained within the envelope,
232    /// marking the envelope data category a [`DataCategory::Transaction`].
233    pub secondary_transaction_quantity: usize,
234
235    /// See `secondary_transaction_quantity`.
236    pub secondary_span_quantity: usize,
237
238    /// The number of standalone spans.
239    pub span_quantity: usize,
240
241    /// Indicates that the envelope contains regular attachments that do not create event payloads.
242    pub has_plain_attachments: bool,
243
244    /// The payload size of this envelope.
245    pub payload_size: usize,
246
247    /// The number of profile chunks in this envelope.
248    pub profile_chunk_quantity: usize,
249    /// The number of UI profile chunks in this envelope.
250    pub profile_chunk_ui_quantity: usize,
251
252    /// The number of trace metrics in this envelope.
253    pub trace_metric_quantity: usize,
254}
255
256impl EnvelopeSummary {
257    /// Creates an empty summary.
258    pub fn empty() -> Self {
259        Self::default()
260    }
261
262    /// Creates an envelope summary and aggregates the given envelope.
263    pub fn compute(envelope: &Envelope) -> Self {
264        let mut summary = Self::empty();
265
266        for item in envelope.items() {
267            if item.creates_event() {
268                summary.infer_category(item);
269            } else if item.ty() == &ItemType::Attachment {
270                // Plain attachments do not create events.
271                summary.has_plain_attachments = true;
272            }
273
274            // If the item has been rate limited before, the quota has been consumed and outcomes
275            // emitted. We can skip it here.
276            if item.rate_limited() {
277                continue;
278            }
279
280            if let Some(source_quantities) = item.source_quantities() {
281                summary.secondary_transaction_quantity += source_quantities.transactions;
282                summary.secondary_span_quantity += source_quantities.spans;
283                summary.profile_quantity += source_quantities.profiles;
284            }
285
286            summary.payload_size += item.len();
287
288            summary.add_quantities(item);
289
290            // Special case since v1 and v2 share a data category.
291            // Adding this in add_quantity would include v2 in the count.
292            if item.ty() == &ItemType::UserReport {
293                summary.user_report_quantity += 1;
294            }
295        }
296
297        summary
298    }
299
300    fn add_quantities(&mut self, item: &Item) {
301        for (category, quantity) in item.quantities() {
302            let target_quantity = match category {
303                DataCategory::Attachment => match item.attachment_parent_type() {
304                    AttachmentParentType::Span => &mut self.attachment_quantities.span.bytes,
305                    AttachmentParentType::Trace => &mut self.attachment_quantities.trace.bytes,
306                    AttachmentParentType::Event => &mut self.attachment_quantities.event.bytes,
307                },
308                DataCategory::AttachmentItem => match item.attachment_parent_type() {
309                    AttachmentParentType::Span => &mut self.attachment_quantities.span.count,
310                    AttachmentParentType::Trace => &mut self.attachment_quantities.trace.count,
311                    AttachmentParentType::Event => &mut self.attachment_quantities.event.count,
312                },
313                DataCategory::Session => &mut self.session_quantity,
314                DataCategory::Profile => &mut self.profile_quantity,
315                DataCategory::Replay => &mut self.replay_quantity,
316                DataCategory::DoNotUseReplayVideo => &mut self.replay_quantity,
317                DataCategory::Monitor => &mut self.monitor_quantity,
318                DataCategory::Span => &mut self.span_quantity,
319                DataCategory::TraceMetric => &mut self.trace_metric_quantity,
320                DataCategory::LogItem => &mut self.log_item_quantity,
321                DataCategory::LogByte => &mut self.log_byte_quantity,
322                DataCategory::ProfileChunk => &mut self.profile_chunk_quantity,
323                DataCategory::ProfileChunkUi => &mut self.profile_chunk_ui_quantity,
324                // TODO: This catch-all looks dangerous
325                _ => continue,
326            };
327            *target_quantity += quantity;
328        }
329    }
330
331    /// Infers the appropriate [`DataCategory`] for the envelope [`Item`].
332    ///
333    /// The inferred category is only applied to the [`EnvelopeSummary`] if there is not yet
334    /// a category set.
335    fn infer_category(&mut self, item: &Item) {
336        if matches!(self.event_category, None | Some(DataCategory::Default))
337            && let Some(category) = infer_event_category(item)
338        {
339            self.event_category = Some(category);
340        }
341    }
342
343    /// Returns `true` if the envelope contains items that depend on spans.
344    ///
345    /// This is used to determined if we should be checking span quota, as the quota should be
346    /// checked both if there are spans or if there are span dependent items (e.g. span attachments).
347    pub fn has_span_dependent_items(&self) -> bool {
348        !self.attachment_quantities.span.is_empty()
349    }
350}
351
352/// Rate limiting information for a data category.
353#[derive(Debug, Default, PartialEq)]
354#[cfg_attr(test, derive(Clone))]
355pub struct CategoryLimit {
356    /// The limited data category.
357    category: Option<DataCategory>,
358    /// The total rate limited quantity across all items.
359    ///
360    /// This will be `0` if nothing was rate limited.
361    quantity: usize,
362    /// The reason code of the applied rate limit.
363    ///
364    /// Defaults to `None` if the quota does not declare a reason code.
365    reason_code: Option<ReasonCode>,
366}
367
368impl CategoryLimit {
369    /// Creates a new `CategoryLimit`.
370    ///
371    /// Returns an inactive limit if `rate_limit` is `None`.
372    fn new(category: DataCategory, quantity: usize, rate_limit: Option<&RateLimit>) -> Self {
373        match rate_limit {
374            Some(limit) => Self {
375                category: Some(category),
376                quantity,
377                reason_code: limit.reason_code.clone(),
378            },
379            None => Self::default(),
380        }
381    }
382
383    /// Recreates the category limit, if active, for a new category with the same reason.
384    pub fn clone_for(&self, category: DataCategory, quantity: usize) -> CategoryLimit {
385        if !self.is_active() {
386            return Self::default();
387        }
388
389        Self {
390            category: Some(category),
391            quantity,
392            reason_code: self.reason_code.clone(),
393        }
394    }
395
396    /// Returns `true` if this is an active limit.
397    ///
398    /// Inactive limits are placeholders with no category set.
399    pub fn is_active(&self) -> bool {
400        self.category.is_some()
401    }
402}
403
404/// Rate limiting information for a single category of attachments.
405#[derive(Default, Debug)]
406#[cfg_attr(test, derive(Clone))]
407pub struct AttachmentLimits {
408    /// Rate limit applied to attachment bytes ([`DataCategory::Attachment`]).
409    pub bytes: CategoryLimit,
410    /// Rate limit applied to attachment item count ([`DataCategory::AttachmentItem`]).
411    pub count: CategoryLimit,
412}
413
414impl AttachmentLimits {
415    fn is_active(&self) -> bool {
416        self.bytes.is_active() || self.count.is_active()
417    }
418}
419
420/// Rate limiting information for attachments grouped by [`AttachmentParentType`].
421///
422/// See [`AttachmentQuantities`] for the corresponding quantity tracking.
423#[derive(Default, Debug)]
424#[cfg_attr(test, derive(Clone))]
425pub struct AttachmentsLimits {
426    /// Limits for V1 Attachments.
427    pub event: AttachmentLimits,
428    /// Limits for trace V2 Attachments.
429    pub trace: AttachmentLimits,
430    /// Limits for span V2 Attachments.
431    pub span: AttachmentLimits,
432}
433
434/// Information on the limited quantities returned by [`EnvelopeLimiter::compute`].
435#[derive(Default, Debug)]
436#[cfg_attr(test, derive(Clone))]
437pub struct Enforcement {
438    /// The event item rate limit.
439    pub event: CategoryLimit,
440    /// The rate limit for the indexed category of the event.
441    pub event_indexed: CategoryLimit,
442    /// The attachments limits
443    pub attachments_limits: AttachmentsLimits,
444    /// The combined session item rate limit.
445    pub sessions: CategoryLimit,
446    /// The combined profile item rate limit.
447    pub profiles: CategoryLimit,
448    /// The rate limit for the indexed profiles category.
449    pub profiles_indexed: CategoryLimit,
450    /// The combined replay item rate limit.
451    pub replays: CategoryLimit,
452    /// The combined check-in item rate limit.
453    pub check_ins: CategoryLimit,
454    /// The combined logs (our product logs) rate limit.
455    pub log_items: CategoryLimit,
456    /// The combined logs (our product logs) rate limit.
457    pub log_bytes: CategoryLimit,
458    /// The combined spans rate limit.
459    pub spans: CategoryLimit,
460    /// The rate limit for the indexed span category.
461    pub spans_indexed: CategoryLimit,
462    /// The rate limit for user report v1.
463    pub user_reports: CategoryLimit,
464    /// The combined profile chunk item rate limit.
465    pub profile_chunks: CategoryLimit,
466    /// The combined profile chunk ui item rate limit.
467    pub profile_chunks_ui: CategoryLimit,
468    /// The combined trace metric item rate limit.
469    pub trace_metrics: CategoryLimit,
470}
471
472impl Enforcement {
473    /// Returns the `CategoryLimit` for the event.
474    ///
475    /// `None` if the event is not rate limited.
476    pub fn active_event(&self) -> Option<&CategoryLimit> {
477        if self.event.is_active() {
478            Some(&self.event)
479        } else if self.event_indexed.is_active() {
480            Some(&self.event_indexed)
481        } else {
482            None
483        }
484    }
485
486    /// Returns `true` if the event is rate limited.
487    pub fn is_event_active(&self) -> bool {
488        self.active_event().is_some()
489    }
490
491    /// Helper for `track_outcomes`.
492    fn get_outcomes(self) -> impl Iterator<Item = (Outcome, DataCategory, usize)> {
493        let Self {
494            event,
495            event_indexed,
496            attachments_limits:
497                AttachmentsLimits {
498                    event:
499                        AttachmentLimits {
500                            bytes: event_attachment_bytes,
501                            count: event_attachment_item,
502                        },
503                    trace:
504                        AttachmentLimits {
505                            bytes: trace_attachment_bytes,
506                            count: trace_attachment_item,
507                        },
508                    span:
509                        AttachmentLimits {
510                            bytes: span_attachment_bytes,
511                            count: span_attachment_item,
512                        },
513                },
514            sessions: _, // Do not report outcomes for sessions.
515            profiles,
516            profiles_indexed,
517            replays,
518            check_ins,
519            log_items,
520            log_bytes,
521            spans,
522            spans_indexed,
523            user_reports,
524            profile_chunks,
525            profile_chunks_ui,
526            trace_metrics,
527        } = self;
528
529        let limits = [
530            event,
531            event_indexed,
532            event_attachment_bytes,
533            event_attachment_item,
534            trace_attachment_bytes,
535            trace_attachment_item,
536            span_attachment_bytes,
537            span_attachment_item,
538            profiles,
539            profiles_indexed,
540            replays,
541            check_ins,
542            log_items,
543            log_bytes,
544            spans,
545            spans_indexed,
546            user_reports,
547            profile_chunks,
548            profile_chunks_ui,
549            trace_metrics,
550        ];
551
552        limits
553            .into_iter()
554            .filter(|limit| limit.quantity > 0)
555            .filter_map(move |limit| {
556                Some((
557                    Outcome::RateLimited(limit.reason_code),
558                    limit.category?,
559                    limit.quantity,
560                ))
561            })
562    }
563
564    /// Applies the [`Enforcement`] on the [`Envelope`] by removing all items that were rate limited
565    /// and emits outcomes for each rate limited category.
566    ///
567    /// # Example
568    ///
569    /// ## Interaction between Events and Attachments
570    ///
571    /// An envelope with an `Error` event and an `Attachment`. Two quotas specify to drop all
572    /// attachments (reason `"a"`) and all errors (reason `"e"`). The result of enforcement will be:
573    ///
574    /// 1. All items are removed from the envelope.
575    /// 2. Enforcements report both the event and the attachment dropped with reason `"e"`, since
576    ///    dropping an event automatically drops all attachments with the same reason.
577    /// 3. Rate limits report the single event limit `"e"`, since attachment limits do not need to
578    ///    be checked in this case.
579    ///
580    /// ## Required Attachments
581    ///
582    /// An envelope with a single Minidump `Attachment`, and a single quota specifying to drop all
583    /// attachments with reason `"a"`:
584    ///
585    /// 1. Since the minidump creates an event and is required for processing, it remains in the
586    ///    envelope and is marked as `rate_limited`.
587    /// 2. Enforcements report the attachment dropped with reason `"a"`.
588    /// 3. Rate limits are empty since it is allowed to send required attachments even when rate
589    ///    limited.
590    ///
591    /// ## Previously Rate Limited Attachments
592    ///
593    /// An envelope with a single item marked as `rate_limited`, and a quota specifying to drop
594    /// everything with reason `"d"`:
595    ///
596    /// 1. The item remains in the envelope.
597    /// 2. Enforcements are empty. Rate limiting has occurred at an earlier stage in the pipeline.
598    /// 3. Rate limits are empty.
599    pub fn apply_with_outcomes(self, envelope: &mut ManagedEnvelope) {
600        envelope
601            .envelope_mut()
602            .retain_items(|item| self.retain_item(item));
603        self.track_outcomes(envelope);
604    }
605
606    /// Returns `true` when an [`Item`] can be retained, `false` otherwise.
607    fn retain_item(&self, item: &mut Item) -> bool {
608        // Remove event items and all items that depend on this event
609        if self.event.is_active() && item.requires_event() {
610            return false;
611        }
612
613        // When checking limits for categories that have an indexed variant,
614        // we only have to check the more specific, the indexed, variant
615        // to determine whether an item is limited.
616        match item.ty() {
617            ItemType::Attachment => {
618                match item.attachment_parent_type() {
619                    AttachmentParentType::Span => !self.attachments_limits.span.is_active(),
620                    AttachmentParentType::Trace => !self.attachments_limits.trace.is_active(),
621                    AttachmentParentType::Event => {
622                        if !self.attachments_limits.event.is_active() {
623                            return true;
624                        }
625                        if item.creates_event() {
626                            item.set_rate_limited(true);
627                            true
628                        } else {
629                            false
630                        }
631                    }
632                }
633            }
634            ItemType::Session => !self.sessions.is_active(),
635            ItemType::Profile => !self.profiles_indexed.is_active(),
636            ItemType::ReplayEvent => !self.replays.is_active(),
637            ItemType::ReplayVideo => !self.replays.is_active(),
638            ItemType::ReplayRecording => !self.replays.is_active(),
639            ItemType::UserReport => !self.user_reports.is_active(),
640            ItemType::CheckIn => !self.check_ins.is_active(),
641            ItemType::Log => {
642                !(self.log_items.is_active() || self.log_bytes.is_active())
643            }
644            ItemType::Span => !self.spans_indexed.is_active(),
645            ItemType::ProfileChunk => match item.profile_type() {
646                Some(ProfileType::Backend) => !self.profile_chunks.is_active(),
647                Some(ProfileType::Ui) => !self.profile_chunks_ui.is_active(),
648                None => true,
649            },
650            ItemType::TraceMetric => !self.trace_metrics.is_active(),
651            ItemType::Integration => match item.integration() {
652                Some(Integration::Logs(_)) => !(self.log_items.is_active() || self.log_bytes.is_active()),
653                Some(Integration::Spans(_)) => !self.spans_indexed.is_active(),
654                None => true,
655            },
656            ItemType::Event
657            | ItemType::Transaction
658            | ItemType::Security
659            | ItemType::FormData
660            | ItemType::RawSecurity
661            | ItemType::Nel
662            | ItemType::UnrealReport
663            | ItemType::Sessions
664            | ItemType::Statsd
665            | ItemType::MetricBuckets
666            | ItemType::ClientReport
667            | ItemType::UserReportV2  // This is an event type.
668            | ItemType::Unknown(_) => true,
669        }
670    }
671
672    /// Invokes track outcome on all enforcements reported by the [`EnvelopeLimiter`].
673    ///
674    /// Relay generally does not emit outcomes for sessions, so those are skipped.
675    fn track_outcomes(self, envelope: &mut ManagedEnvelope) {
676        for (outcome, category, quantity) in self.get_outcomes() {
677            envelope.track_outcome(outcome, category, quantity)
678        }
679    }
680}
681
682/// Which limits to check with the [`EnvelopeLimiter`].
683#[derive(Debug, Copy, Clone)]
684pub enum CheckLimits {
685    /// Checks all limits except indexed categories.
686    ///
687    /// In the fast path it is necessary to apply cached rate limits but to not enforce indexed rate limits.
688    /// Because at the time of the check the decision whether an envelope is sampled or not is not yet known.
689    /// Additionally even if the item is later dropped by dynamic sampling, it must still be around to extract metrics
690    /// and cannot be dropped too early.
691    NonIndexed,
692    /// Checks all limits against the envelope.
693    All,
694}
695
696struct Check<F, E, R> {
697    limits: CheckLimits,
698    check: F,
699    _1: PhantomData<E>,
700    _2: PhantomData<R>,
701}
702
703impl<F, E, R> Check<F, E, R>
704where
705    F: FnMut(ItemScoping, usize) -> R,
706    R: Future<Output = Result<RateLimits, E>>,
707{
708    async fn apply(&mut self, scoping: ItemScoping, quantity: usize) -> Result<RateLimits, E> {
709        if matches!(self.limits, CheckLimits::NonIndexed) && scoping.category.is_indexed() {
710            return Ok(RateLimits::default());
711        }
712
713        (self.check)(scoping, quantity).await
714    }
715}
716
717/// Enforces rate limits with the given `check` function on items in the envelope.
718///
719/// The `check` function is called with the following rules:
720///  - Once for a single event, if present in the envelope.
721///  - Once for all comprised attachments, unless the event was rate limited.
722///  - Once for all comprised sessions.
723///
724/// Items violating the rate limit are removed from the envelope. This follows a set of rules:
725///  - If the event is removed, all items depending on the event are removed (e.g. attachments).
726///  - Attachments are not removed if they create events (e.g. minidumps).
727///  - Sessions are handled separately from all of the above.
728pub struct EnvelopeLimiter<F, E, R> {
729    check: Check<F, E, R>,
730    event_category: Option<DataCategory>,
731}
732
733impl<'a, F, E, R> EnvelopeLimiter<F, E, R>
734where
735    F: FnMut(ItemScoping, usize) -> R,
736    R: Future<Output = Result<RateLimits, E>>,
737{
738    /// Create a new `EnvelopeLimiter` with the given `check` function.
739    pub fn new(limits: CheckLimits, check: F) -> Self {
740        Self {
741            check: Check {
742                check,
743                limits,
744                _1: PhantomData,
745                _2: PhantomData,
746            },
747            event_category: None,
748        }
749    }
750
751    /// Assume an event with the given category, even if no item is present in the envelope.
752    ///
753    /// This ensures that rate limits for the given data category are checked even if there is no
754    /// matching item in the envelope. Other items are handled according to the rules as if the
755    /// event item were present.
756    pub fn assume_event(&mut self, category: DataCategory) {
757        self.event_category = Some(category);
758    }
759
760    /// Process rate limits for the envelope, returning applied limits.
761    ///
762    /// Returns a tuple of `Enforcement` and `RateLimits`:
763    ///
764    /// - Enforcements declare the quantities of categories that have been rate limited with the
765    ///   individual reason codes that caused rate limiting. If multiple rate limits applied to a
766    ///   category, then the longest limit is reported.
767    /// - Rate limits declare all active rate limits, regardless of whether they have been applied
768    ///   to items in the envelope. This excludes rate limits applied to required attachments, since
769    ///   clients are allowed to continue sending them.
770    pub async fn compute(
771        mut self,
772        envelope: &mut Envelope,
773        scoping: &'a Scoping,
774    ) -> Result<(Enforcement, RateLimits), E> {
775        let mut summary = EnvelopeSummary::compute(envelope);
776        summary.event_category = self.event_category.or(summary.event_category);
777
778        let (enforcement, rate_limits) = self.execute(&summary, scoping).await?;
779        Ok((enforcement, rate_limits))
780    }
781
782    async fn execute(
783        &mut self,
784        summary: &EnvelopeSummary,
785        scoping: &'a Scoping,
786    ) -> Result<(Enforcement, RateLimits), E> {
787        let mut rate_limits = RateLimits::new();
788        let mut enforcement = Enforcement::default();
789
790        // Handle event.
791        if let Some(category) = summary.event_category {
792            // Check the broad category for limits.
793            let mut event_limits = self.check.apply(scoping.item(category), 1).await?;
794            enforcement.event = CategoryLimit::new(category, 1, event_limits.longest());
795
796            if let Some(index_category) = category.index_category() {
797                // Check the specific/indexed category for limits only if the specific one has not already
798                // an enforced limit.
799                if event_limits.is_empty() {
800                    event_limits.merge(self.check.apply(scoping.item(index_category), 1).await?);
801                }
802
803                enforcement.event_indexed =
804                    CategoryLimit::new(index_category, 1, event_limits.longest());
805            };
806
807            rate_limits.merge(event_limits);
808        }
809
810        // Handle spans.
811        if enforcement.is_event_active() {
812            enforcement.spans = enforcement
813                .event
814                .clone_for(DataCategory::Span, summary.span_quantity);
815
816            enforcement.spans_indexed = enforcement
817                .event_indexed
818                .clone_for(DataCategory::SpanIndexed, summary.span_quantity);
819        } else if summary.span_quantity > 0 || summary.has_span_dependent_items() {
820            let mut span_limits = self
821                .check
822                .apply(scoping.item(DataCategory::Span), summary.span_quantity)
823                .await?;
824            enforcement.spans = CategoryLimit::new(
825                DataCategory::Span,
826                summary.span_quantity,
827                span_limits.longest(),
828            );
829
830            if span_limits.is_empty() {
831                span_limits.merge(
832                    self.check
833                        .apply(
834                            scoping.item(DataCategory::SpanIndexed),
835                            summary.span_quantity,
836                        )
837                        .await?,
838                );
839            }
840
841            enforcement.spans_indexed = CategoryLimit::new(
842                DataCategory::SpanIndexed,
843                summary.span_quantity,
844                span_limits.longest(),
845            );
846
847            rate_limits.merge(span_limits);
848        }
849
850        // Handle span attachments
851        if enforcement.spans_indexed.is_active() {
852            enforcement.attachments_limits.span.bytes = enforcement.spans_indexed.clone_for(
853                DataCategory::Attachment,
854                summary.attachment_quantities.span.bytes,
855            );
856            enforcement.attachments_limits.span.count = enforcement.spans_indexed.clone_for(
857                DataCategory::AttachmentItem,
858                summary.attachment_quantities.span.count,
859            );
860        } else if !summary.attachment_quantities.span.is_empty() {
861            // While we could combine this check with the check that we do for event and trace
862            // attachments, this would complicate the logic so we opted against doing that.
863            // In practice the performance impact should be negligible since different types
864            // of attachments should rarely be send together.
865            enforcement.attachments_limits.span = self
866                .check_attachment_limits(scoping, &summary.attachment_quantities.span)
867                .await?;
868        }
869
870        // Handle attachments.
871        if let Some(limit) = enforcement.active_event() {
872            let limit1 = limit.clone_for(
873                DataCategory::Attachment,
874                summary.attachment_quantities.event.bytes,
875            );
876            let limit2 = limit.clone_for(
877                DataCategory::AttachmentItem,
878                summary.attachment_quantities.event.count,
879            );
880
881            enforcement.attachments_limits.event.bytes = limit1;
882            enforcement.attachments_limits.event.count = limit2;
883        } else {
884            let mut attachment_limits = RateLimits::new();
885            if summary.attachment_quantities.event.bytes > 0 {
886                let item_scoping = scoping.item(DataCategory::Attachment);
887
888                let attachment_byte_limits = self
889                    .check
890                    .apply(item_scoping, summary.attachment_quantities.event.bytes)
891                    .await?;
892
893                enforcement.attachments_limits.event.bytes = CategoryLimit::new(
894                    DataCategory::Attachment,
895                    summary.attachment_quantities.event.bytes,
896                    attachment_byte_limits.longest(),
897                );
898                enforcement.attachments_limits.event.count =
899                    enforcement.attachments_limits.event.bytes.clone_for(
900                        DataCategory::AttachmentItem,
901                        summary.attachment_quantities.event.count,
902                    );
903                attachment_limits.merge(attachment_byte_limits);
904            }
905            if !attachment_limits.is_limited() && summary.attachment_quantities.event.count > 0 {
906                let item_scoping = scoping.item(DataCategory::AttachmentItem);
907
908                let attachment_item_limits = self
909                    .check
910                    .apply(item_scoping, summary.attachment_quantities.event.count)
911                    .await?;
912
913                enforcement.attachments_limits.event.count = CategoryLimit::new(
914                    DataCategory::AttachmentItem,
915                    summary.attachment_quantities.event.count,
916                    attachment_item_limits.longest(),
917                );
918                enforcement.attachments_limits.event.bytes =
919                    enforcement.attachments_limits.event.count.clone_for(
920                        DataCategory::Attachment,
921                        summary.attachment_quantities.event.bytes,
922                    );
923                attachment_limits.merge(attachment_item_limits);
924            }
925
926            // Only record rate limits for plain attachments. For all other attachments, it's
927            // perfectly "legal" to send them. They will still be discarded in Sentry, but clients
928            // can continue to send them.
929            if summary.has_plain_attachments {
930                rate_limits.merge(attachment_limits);
931            }
932        }
933
934        // Handle trace attachments.
935        if !summary.attachment_quantities.trace.is_empty() {
936            enforcement.attachments_limits.trace = self
937                .check_attachment_limits(scoping, &summary.attachment_quantities.trace)
938                .await?;
939        }
940
941        // Handle sessions.
942        if summary.session_quantity > 0 {
943            let item_scoping = scoping.item(DataCategory::Session);
944            let session_limits = self
945                .check
946                .apply(item_scoping, summary.session_quantity)
947                .await?;
948            enforcement.sessions = CategoryLimit::new(
949                DataCategory::Session,
950                summary.session_quantity,
951                session_limits.longest(),
952            );
953            rate_limits.merge(session_limits);
954        }
955
956        // Handle trace metrics.
957        if summary.trace_metric_quantity > 0 {
958            let item_scoping = scoping.item(DataCategory::TraceMetric);
959            let trace_metric_limits = self
960                .check
961                .apply(item_scoping, summary.trace_metric_quantity)
962                .await?;
963            enforcement.trace_metrics = CategoryLimit::new(
964                DataCategory::TraceMetric,
965                summary.trace_metric_quantity,
966                trace_metric_limits.longest(),
967            );
968            rate_limits.merge(trace_metric_limits);
969        }
970
971        // Handle logs.
972        if summary.log_item_quantity > 0 {
973            let item_scoping = scoping.item(DataCategory::LogItem);
974            let log_limits = self
975                .check
976                .apply(item_scoping, summary.log_item_quantity)
977                .await?;
978            enforcement.log_items = CategoryLimit::new(
979                DataCategory::LogItem,
980                summary.log_item_quantity,
981                log_limits.longest(),
982            );
983            rate_limits.merge(log_limits);
984        }
985        if summary.log_byte_quantity > 0 {
986            let item_scoping = scoping.item(DataCategory::LogByte);
987            let log_limits = self
988                .check
989                .apply(item_scoping, summary.log_byte_quantity)
990                .await?;
991            enforcement.log_bytes = CategoryLimit::new(
992                DataCategory::LogByte,
993                summary.log_byte_quantity,
994                log_limits.longest(),
995            );
996            rate_limits.merge(log_limits);
997        }
998
999        // Handle profiles.
1000        if enforcement.is_event_active() {
1001            enforcement.profiles = enforcement
1002                .event
1003                .clone_for(DataCategory::Profile, summary.profile_quantity);
1004
1005            enforcement.profiles_indexed = enforcement
1006                .event_indexed
1007                .clone_for(DataCategory::ProfileIndexed, summary.profile_quantity)
1008        } else if summary.profile_quantity > 0 {
1009            let mut profile_limits = self
1010                .check
1011                .apply(
1012                    scoping.item(DataCategory::Profile),
1013                    summary.profile_quantity,
1014                )
1015                .await?;
1016
1017            // Profiles can persist in envelopes without transaction if the transaction item
1018            // was dropped by dynamic sampling.
1019            if profile_limits.is_empty() && summary.event_category.is_none() {
1020                profile_limits = self
1021                    .check
1022                    .apply(scoping.item(DataCategory::Transaction), 0)
1023                    .await?;
1024            }
1025
1026            enforcement.profiles = CategoryLimit::new(
1027                DataCategory::Profile,
1028                summary.profile_quantity,
1029                profile_limits.longest(),
1030            );
1031
1032            if profile_limits.is_empty() {
1033                profile_limits.merge(
1034                    self.check
1035                        .apply(
1036                            scoping.item(DataCategory::ProfileIndexed),
1037                            summary.profile_quantity,
1038                        )
1039                        .await?,
1040                );
1041            }
1042
1043            enforcement.profiles_indexed = CategoryLimit::new(
1044                DataCategory::ProfileIndexed,
1045                summary.profile_quantity,
1046                profile_limits.longest(),
1047            );
1048
1049            rate_limits.merge(profile_limits);
1050        }
1051
1052        // Handle replays.
1053        if summary.replay_quantity > 0 {
1054            let item_scoping = scoping.item(DataCategory::Replay);
1055            let replay_limits = self
1056                .check
1057                .apply(item_scoping, summary.replay_quantity)
1058                .await?;
1059            enforcement.replays = CategoryLimit::new(
1060                DataCategory::Replay,
1061                summary.replay_quantity,
1062                replay_limits.longest(),
1063            );
1064            rate_limits.merge(replay_limits);
1065        }
1066
1067        // Handle user report v1s, which share limits with v2.
1068        if summary.user_report_quantity > 0 {
1069            let item_scoping = scoping.item(DataCategory::UserReportV2);
1070            let user_report_v2_limits = self
1071                .check
1072                .apply(item_scoping, summary.user_report_quantity)
1073                .await?;
1074            enforcement.user_reports = CategoryLimit::new(
1075                DataCategory::UserReportV2,
1076                summary.user_report_quantity,
1077                user_report_v2_limits.longest(),
1078            );
1079            rate_limits.merge(user_report_v2_limits);
1080        }
1081
1082        // Handle monitor checkins.
1083        if summary.monitor_quantity > 0 {
1084            let item_scoping = scoping.item(DataCategory::Monitor);
1085            let checkin_limits = self
1086                .check
1087                .apply(item_scoping, summary.monitor_quantity)
1088                .await?;
1089            enforcement.check_ins = CategoryLimit::new(
1090                DataCategory::Monitor,
1091                summary.monitor_quantity,
1092                checkin_limits.longest(),
1093            );
1094            rate_limits.merge(checkin_limits);
1095        }
1096
1097        // Handle profile chunks.
1098        if summary.profile_chunk_quantity > 0 {
1099            let item_scoping = scoping.item(DataCategory::ProfileChunk);
1100            let limits = self
1101                .check
1102                .apply(item_scoping, summary.profile_chunk_quantity)
1103                .await?;
1104            enforcement.profile_chunks = CategoryLimit::new(
1105                DataCategory::ProfileChunk,
1106                summary.profile_chunk_quantity,
1107                limits.longest(),
1108            );
1109            rate_limits.merge(limits);
1110        }
1111
1112        if summary.profile_chunk_ui_quantity > 0 {
1113            let item_scoping = scoping.item(DataCategory::ProfileChunkUi);
1114            let limits = self
1115                .check
1116                .apply(item_scoping, summary.profile_chunk_ui_quantity)
1117                .await?;
1118            enforcement.profile_chunks_ui = CategoryLimit::new(
1119                DataCategory::ProfileChunkUi,
1120                summary.profile_chunk_ui_quantity,
1121                limits.longest(),
1122            );
1123            rate_limits.merge(limits);
1124        }
1125
1126        Ok((enforcement, rate_limits))
1127    }
1128
1129    async fn check_attachment_limits(
1130        &mut self,
1131        scoping: &Scoping,
1132        quantities: &AttachmentQuantity,
1133    ) -> Result<AttachmentLimits, E> {
1134        let mut attachment_limits = self
1135            .check
1136            .apply(scoping.item(DataCategory::Attachment), quantities.bytes)
1137            .await?;
1138
1139        // Note: The check here is taken from the attachments logic for consistency I think just
1140        // checking `is_empty` should be fine?
1141        if !attachment_limits.is_limited() && quantities.count > 0 {
1142            attachment_limits.merge(
1143                self.check
1144                    .apply(scoping.item(DataCategory::AttachmentItem), quantities.count)
1145                    .await?,
1146            );
1147        }
1148
1149        Ok(AttachmentLimits {
1150            bytes: CategoryLimit::new(
1151                DataCategory::Attachment,
1152                quantities.bytes,
1153                attachment_limits.longest(),
1154            ),
1155            count: CategoryLimit::new(
1156                DataCategory::AttachmentItem,
1157                quantities.count,
1158                attachment_limits.longest(),
1159            ),
1160        })
1161    }
1162}
1163
1164impl<F, E, R> fmt::Debug for EnvelopeLimiter<F, E, R> {
1165    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1166        f.debug_struct("EnvelopeLimiter")
1167            .field("event_category", &self.event_category)
1168            .finish()
1169    }
1170}
1171
1172#[cfg(test)]
1173mod tests {
1174
1175    use std::collections::{BTreeMap, BTreeSet};
1176    use std::sync::Arc;
1177
1178    use relay_base_schema::organization::OrganizationId;
1179    use relay_base_schema::project::{ProjectId, ProjectKey};
1180    use relay_metrics::MetricNamespace;
1181    use relay_quotas::RetryAfter;
1182    use relay_system::Addr;
1183    use smallvec::smallvec;
1184    use tokio::sync::Mutex;
1185
1186    use super::*;
1187    use crate::envelope::ParentId;
1188    use crate::{
1189        envelope::{AttachmentType, ContentType, SourceQuantities},
1190        extractors::RequestMeta,
1191    };
1192
1193    struct RateLimitTestCase {
1194        name: &'static str,
1195        denied_categories: &'static [DataCategory],
1196        expect_attachment_limit_active: bool,
1197        expected_limiter_calls: &'static [(DataCategory, usize)],
1198        expected_outcomes: &'static [(DataCategory, usize)],
1199    }
1200
1201    #[tokio::test]
1202    async fn test_format_rate_limits() {
1203        let mut rate_limits = RateLimits::new();
1204
1205        // Add a generic rate limit for all categories.
1206        rate_limits.add(RateLimit {
1207            categories: Default::default(),
1208            scope: RateLimitScope::Organization(OrganizationId::new(42)),
1209            reason_code: Some(ReasonCode::new("my_limit")),
1210            retry_after: RetryAfter::from_secs(42),
1211            namespaces: smallvec![],
1212        });
1213
1214        // Add a more specific rate limit for just one category.
1215        rate_limits.add(RateLimit {
1216            categories: [DataCategory::Transaction, DataCategory::Security].into(),
1217            scope: RateLimitScope::Project(ProjectId::new(21)),
1218            reason_code: None,
1219            retry_after: RetryAfter::from_secs(4711),
1220            namespaces: smallvec![],
1221        });
1222
1223        let formatted = format_rate_limits(&rate_limits);
1224        let expected = "42::organization:my_limit, 4711:transaction;security:project";
1225        assert_eq!(formatted, expected);
1226    }
1227
1228    #[tokio::test]
1229    async fn test_format_rate_limits_namespace() {
1230        let mut rate_limits = RateLimits::new();
1231
1232        // Rate limit with reason code and namespace.
1233        rate_limits.add(RateLimit {
1234            categories: [DataCategory::MetricBucket].into(),
1235            scope: RateLimitScope::Organization(OrganizationId::new(42)),
1236            reason_code: Some(ReasonCode::new("my_limit")),
1237            retry_after: RetryAfter::from_secs(42),
1238            namespaces: smallvec![MetricNamespace::Custom, MetricNamespace::Spans],
1239        });
1240
1241        // Rate limit without reason code.
1242        rate_limits.add(RateLimit {
1243            categories: [DataCategory::MetricBucket].into(),
1244            scope: RateLimitScope::Organization(OrganizationId::new(42)),
1245            reason_code: None,
1246            retry_after: RetryAfter::from_secs(42),
1247            namespaces: smallvec![MetricNamespace::Spans],
1248        });
1249
1250        let formatted = format_rate_limits(&rate_limits);
1251        let expected = "42:metric_bucket:organization:my_limit:custom;spans, 42:metric_bucket:organization::spans";
1252        assert_eq!(formatted, expected);
1253    }
1254
1255    #[tokio::test]
1256    async fn test_parse_invalid_rate_limits() {
1257        let scoping = Scoping {
1258            organization_id: OrganizationId::new(42),
1259            project_id: ProjectId::new(21),
1260            project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
1261            key_id: Some(17),
1262        };
1263
1264        assert!(parse_rate_limits(&scoping, "").is_ok());
1265        assert!(parse_rate_limits(&scoping, "invalid").is_ok());
1266        assert!(parse_rate_limits(&scoping, ",,,").is_ok());
1267    }
1268
1269    #[tokio::test]
1270    async fn test_parse_rate_limits() {
1271        let scoping = Scoping {
1272            organization_id: OrganizationId::new(42),
1273            project_id: ProjectId::new(21),
1274            project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
1275            key_id: Some(17),
1276        };
1277
1278        // contains "foobar", an unknown scope that should be mapped to Unknown
1279        let formatted =
1280            "42::organization:my_limit, invalid, 4711:foobar;transaction;security:project";
1281        let rate_limits: Vec<RateLimit> =
1282            parse_rate_limits(&scoping, formatted).into_iter().collect();
1283
1284        assert_eq!(
1285            rate_limits,
1286            vec![
1287                RateLimit {
1288                    categories: Default::default(),
1289                    scope: RateLimitScope::Organization(OrganizationId::new(42)),
1290                    reason_code: Some(ReasonCode::new("my_limit")),
1291                    retry_after: rate_limits[0].retry_after,
1292                    namespaces: smallvec![],
1293                },
1294                RateLimit {
1295                    categories: [
1296                        DataCategory::Unknown,
1297                        DataCategory::Transaction,
1298                        DataCategory::Security,
1299                    ]
1300                    .into(),
1301                    scope: RateLimitScope::Project(ProjectId::new(21)),
1302                    reason_code: None,
1303                    retry_after: rate_limits[1].retry_after,
1304                    namespaces: smallvec![],
1305                }
1306            ]
1307        );
1308
1309        assert_eq!(42, rate_limits[0].retry_after.remaining_seconds());
1310        assert_eq!(4711, rate_limits[1].retry_after.remaining_seconds());
1311    }
1312
1313    #[tokio::test]
1314    async fn test_parse_rate_limits_namespace() {
1315        let scoping = Scoping {
1316            organization_id: OrganizationId::new(42),
1317            project_id: ProjectId::new(21),
1318            project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
1319            key_id: Some(17),
1320        };
1321
1322        let formatted = "42:metric_bucket:organization::custom;spans";
1323        let rate_limits: Vec<RateLimit> =
1324            parse_rate_limits(&scoping, formatted).into_iter().collect();
1325
1326        assert_eq!(
1327            rate_limits,
1328            vec![RateLimit {
1329                categories: [DataCategory::MetricBucket].into(),
1330                scope: RateLimitScope::Organization(OrganizationId::new(42)),
1331                reason_code: None,
1332                retry_after: rate_limits[0].retry_after,
1333                namespaces: smallvec![MetricNamespace::Custom, MetricNamespace::Spans],
1334            }]
1335        );
1336    }
1337
1338    #[tokio::test]
1339    async fn test_parse_rate_limits_empty_namespace() {
1340        let scoping = Scoping {
1341            organization_id: OrganizationId::new(42),
1342            project_id: ProjectId::new(21),
1343            project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
1344            key_id: Some(17),
1345        };
1346
1347        // notice the trailing colon
1348        let formatted = "42:metric_bucket:organization:some_reason:";
1349        let rate_limits: Vec<RateLimit> =
1350            parse_rate_limits(&scoping, formatted).into_iter().collect();
1351
1352        assert_eq!(
1353            rate_limits,
1354            vec![RateLimit {
1355                categories: [DataCategory::MetricBucket].into(),
1356                scope: RateLimitScope::Organization(OrganizationId::new(42)),
1357                reason_code: Some(ReasonCode::new("some_reason")),
1358                retry_after: rate_limits[0].retry_after,
1359                namespaces: smallvec![],
1360            }]
1361        );
1362    }
1363
1364    #[tokio::test]
1365    async fn test_parse_rate_limits_only_unknown() {
1366        let scoping = Scoping {
1367            organization_id: OrganizationId::new(42),
1368            project_id: ProjectId::new(21),
1369            project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
1370            key_id: Some(17),
1371        };
1372
1373        let formatted = "42:foo;bar:organization";
1374        let rate_limits: Vec<RateLimit> =
1375            parse_rate_limits(&scoping, formatted).into_iter().collect();
1376
1377        assert_eq!(
1378            rate_limits,
1379            vec![RateLimit {
1380                categories: [DataCategory::Unknown, DataCategory::Unknown].into(),
1381                scope: RateLimitScope::Organization(OrganizationId::new(42)),
1382                reason_code: None,
1383                retry_after: rate_limits[0].retry_after,
1384                namespaces: smallvec![],
1385            },]
1386        );
1387    }
1388
1389    macro_rules! envelope {
1390        ($( $item_type:ident $( :: $attachment_type:ident )? ),*) => {{
1391            let bytes = "{\"dsn\":\"https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42\"}";
1392            #[allow(unused_mut)]
1393            let mut envelope = Envelope::parse_bytes(bytes.into()).unwrap();
1394            $(
1395                let mut item = Item::new(ItemType::$item_type);
1396                item.set_payload(ContentType::OctetStream, "0123456789");
1397                $( item.set_attachment_type(AttachmentType::$attachment_type); )?
1398                envelope.add_item(item);
1399            )*
1400
1401            let (outcome_aggregator, _) = Addr::custom();
1402
1403            ManagedEnvelope::new(
1404                envelope,
1405                outcome_aggregator,
1406            )
1407        }}
1408    }
1409
1410    fn set_extracted(envelope: &mut Envelope, ty: ItemType) {
1411        envelope
1412            .get_item_by_mut(|item| *item.ty() == ty)
1413            .unwrap()
1414            .set_metrics_extracted(true);
1415    }
1416
1417    fn rate_limit(category: DataCategory) -> RateLimit {
1418        RateLimit {
1419            categories: [category].into(),
1420            scope: RateLimitScope::Organization(OrganizationId::new(42)),
1421            reason_code: None,
1422            retry_after: RetryAfter::from_secs(60),
1423            namespaces: smallvec![],
1424        }
1425    }
1426
1427    fn trace_attachment_item(bytes: usize, parent_id: Option<ParentId>) -> Item {
1428        let mut item = Item::new(ItemType::Attachment);
1429        item.set_payload(ContentType::TraceAttachment, "0".repeat(bytes));
1430        item.set_parent_id(parent_id);
1431        item
1432    }
1433
1434    #[derive(Debug, Default)]
1435    struct MockLimiter {
1436        denied: Vec<DataCategory>,
1437        called: BTreeMap<DataCategory, usize>,
1438        checked: BTreeSet<DataCategory>,
1439    }
1440
1441    impl MockLimiter {
1442        pub fn deny(mut self, category: DataCategory) -> Self {
1443            self.denied.push(category);
1444            self
1445        }
1446
1447        pub fn check(&mut self, scoping: ItemScoping, quantity: usize) -> Result<RateLimits, ()> {
1448            let cat = scoping.category;
1449            let previous = self.called.insert(cat, quantity);
1450            assert!(previous.is_none(), "rate limiter invoked twice for {cat}");
1451
1452            let mut limits = RateLimits::new();
1453            if self.denied.contains(&cat) {
1454                limits.add(rate_limit(cat));
1455            }
1456            Ok(limits)
1457        }
1458
1459        #[track_caller]
1460        pub fn assert_call(&mut self, category: DataCategory, expected: usize) {
1461            self.checked.insert(category);
1462
1463            let quantity = self.called.get(&category).copied();
1464            assert_eq!(
1465                quantity,
1466                Some(expected),
1467                "Expected quantity `{expected}` for data category `{category}`, got {quantity:?}."
1468            );
1469        }
1470    }
1471
1472    impl Drop for MockLimiter {
1473        fn drop(&mut self) {
1474            if std::thread::panicking() {
1475                return;
1476            }
1477
1478            for checked in &self.checked {
1479                self.called.remove(checked);
1480            }
1481
1482            if self.called.is_empty() {
1483                return;
1484            }
1485
1486            let not_asserted = self
1487                .called
1488                .iter()
1489                .map(|(k, v)| format!("- {k}: {v}"))
1490                .collect::<Vec<_>>()
1491                .join("\n");
1492
1493            panic!("Following calls to the limiter were not asserted:\n{not_asserted}");
1494        }
1495    }
1496
1497    async fn enforce_and_apply(
1498        mock: Arc<Mutex<MockLimiter>>,
1499        envelope: &mut ManagedEnvelope,
1500        #[allow(unused_variables)] assume_event: Option<DataCategory>,
1501    ) -> (Enforcement, RateLimits) {
1502        let scoping = envelope.scoping();
1503
1504        #[allow(unused_mut)]
1505        let mut limiter = EnvelopeLimiter::new(CheckLimits::All, move |s, q| {
1506            let mock = mock.clone();
1507            async move {
1508                let mut mock = mock.lock().await;
1509                mock.check(s, q)
1510            }
1511        });
1512        #[cfg(feature = "processing")]
1513        if let Some(assume_event) = assume_event {
1514            limiter.assume_event(assume_event);
1515        }
1516
1517        let (enforcement, limits) = limiter
1518            .compute(envelope.envelope_mut(), &scoping)
1519            .await
1520            .unwrap();
1521
1522        // We implemented `clone` only for tests because we don't want to make `apply_with_outcomes`
1523        // &self because we want move semantics to prevent double tracking.
1524        enforcement.clone().apply_with_outcomes(envelope);
1525
1526        (enforcement, limits)
1527    }
1528
1529    fn mock_limiter(categories: &[DataCategory]) -> Arc<Mutex<MockLimiter>> {
1530        let mut mock = MockLimiter::default();
1531        for &category in categories {
1532            mock = mock.deny(category);
1533        }
1534
1535        Arc::new(Mutex::new(mock))
1536    }
1537
1538    #[tokio::test]
1539    async fn test_enforce_pass_empty() {
1540        let mut envelope = envelope![];
1541
1542        let mock = mock_limiter(&[]);
1543        let (_, limits) = enforce_and_apply(mock, &mut envelope, None).await;
1544
1545        assert!(!limits.is_limited());
1546        assert!(envelope.envelope().is_empty());
1547    }
1548
1549    #[tokio::test]
1550    async fn test_enforce_limit_error_event() {
1551        let mut envelope = envelope![Event];
1552
1553        let mock = mock_limiter(&[DataCategory::Error]);
1554        let (_, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1555
1556        assert!(limits.is_limited());
1557        assert!(envelope.envelope().is_empty());
1558        mock.lock().await.assert_call(DataCategory::Error, 1);
1559    }
1560
1561    #[tokio::test]
1562    async fn test_enforce_limit_error_with_attachments() {
1563        let mut envelope = envelope![Event, Attachment];
1564
1565        let mock = mock_limiter(&[DataCategory::Error]);
1566        let (_, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1567
1568        assert!(limits.is_limited());
1569        assert!(envelope.envelope().is_empty());
1570        mock.lock().await.assert_call(DataCategory::Error, 1);
1571    }
1572
1573    #[tokio::test]
1574    async fn test_enforce_limit_minidump() {
1575        let mut envelope = envelope![Attachment::Minidump];
1576
1577        let mock = mock_limiter(&[DataCategory::Error]);
1578        let (_, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1579
1580        assert!(limits.is_limited());
1581        assert!(envelope.envelope().is_empty());
1582        mock.lock().await.assert_call(DataCategory::Error, 1);
1583    }
1584
1585    #[tokio::test]
1586    async fn test_enforce_limit_attachments() {
1587        let mut envelope = envelope![Attachment::Minidump, Attachment];
1588
1589        let mock = mock_limiter(&[DataCategory::Attachment]);
1590        let (_, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1591
1592        // Attachments would be limited, but crash reports create events and are thus allowed.
1593        assert!(limits.is_limited());
1594        assert_eq!(envelope.envelope().len(), 1);
1595        mock.lock().await.assert_call(DataCategory::Error, 1);
1596        mock.lock().await.assert_call(DataCategory::Attachment, 20);
1597    }
1598
1599    /// Limit stand-alone profiles.
1600    #[tokio::test]
1601    async fn test_enforce_limit_profiles() {
1602        let mut envelope = envelope![Profile, Profile];
1603
1604        let mock = mock_limiter(&[DataCategory::Profile]);
1605        let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1606
1607        assert!(limits.is_limited());
1608        assert_eq!(envelope.envelope().len(), 0);
1609        mock.lock().await.assert_call(DataCategory::Profile, 2);
1610
1611        assert_eq!(
1612            get_outcomes(enforcement),
1613            vec![
1614                (DataCategory::Profile, 2),
1615                (DataCategory::ProfileIndexed, 2)
1616            ]
1617        );
1618    }
1619
1620    /// Limit profile chunks.
1621    #[tokio::test]
1622    async fn test_enforce_limit_profile_chunks_no_profile_type() {
1623        // In this test we have profile chunks which have not yet been classified, which means they
1624        // should not be rate limited.
1625        let mut envelope = envelope![ProfileChunk, ProfileChunk];
1626
1627        let mock = mock_limiter(&[DataCategory::ProfileChunk]);
1628        let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1629        assert!(!limits.is_limited());
1630        assert_eq!(get_outcomes(enforcement), vec![]);
1631
1632        let mock = mock_limiter(&[DataCategory::ProfileChunkUi]);
1633        let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1634        assert!(!limits.is_limited());
1635        assert_eq!(get_outcomes(enforcement), vec![]);
1636
1637        assert_eq!(envelope.envelope().len(), 2);
1638    }
1639
1640    #[tokio::test]
1641    async fn test_enforce_limit_profile_chunks_ui() {
1642        let mut envelope = envelope![];
1643
1644        let mut item = Item::new(ItemType::ProfileChunk);
1645        item.set_profile_type(ProfileType::Backend);
1646        envelope.envelope_mut().add_item(item);
1647        let mut item = Item::new(ItemType::ProfileChunk);
1648        item.set_profile_type(ProfileType::Ui);
1649        envelope.envelope_mut().add_item(item);
1650
1651        let mock = mock_limiter(&[DataCategory::ProfileChunkUi]);
1652        let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1653
1654        assert!(limits.is_limited());
1655        assert_eq!(envelope.envelope().len(), 1);
1656        mock.lock()
1657            .await
1658            .assert_call(DataCategory::ProfileChunkUi, 1);
1659        mock.lock().await.assert_call(DataCategory::ProfileChunk, 1);
1660
1661        assert_eq!(
1662            get_outcomes(enforcement),
1663            vec![(DataCategory::ProfileChunkUi, 1)]
1664        );
1665    }
1666
1667    #[tokio::test]
1668    async fn test_enforce_limit_profile_chunks_backend() {
1669        let mut envelope = envelope![];
1670
1671        let mut item = Item::new(ItemType::ProfileChunk);
1672        item.set_profile_type(ProfileType::Backend);
1673        envelope.envelope_mut().add_item(item);
1674        let mut item = Item::new(ItemType::ProfileChunk);
1675        item.set_profile_type(ProfileType::Ui);
1676        envelope.envelope_mut().add_item(item);
1677
1678        let mock = mock_limiter(&[DataCategory::ProfileChunk]);
1679        let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1680
1681        assert!(limits.is_limited());
1682        assert_eq!(envelope.envelope().len(), 1);
1683        mock.lock()
1684            .await
1685            .assert_call(DataCategory::ProfileChunkUi, 1);
1686        mock.lock().await.assert_call(DataCategory::ProfileChunk, 1);
1687
1688        assert_eq!(
1689            get_outcomes(enforcement),
1690            vec![(DataCategory::ProfileChunk, 1)]
1691        );
1692    }
1693
1694    /// Limit replays.
1695    #[tokio::test]
1696    async fn test_enforce_limit_replays() {
1697        let mut envelope = envelope![ReplayEvent, ReplayRecording, ReplayVideo];
1698
1699        let mock = mock_limiter(&[DataCategory::Replay]);
1700        let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1701
1702        assert!(limits.is_limited());
1703        assert_eq!(envelope.envelope().len(), 0);
1704        mock.lock().await.assert_call(DataCategory::Replay, 3);
1705
1706        assert_eq!(get_outcomes(enforcement), vec![(DataCategory::Replay, 3),]);
1707    }
1708
1709    /// Limit monitor checkins.
1710    #[tokio::test]
1711    async fn test_enforce_limit_monitor_checkins() {
1712        let mut envelope = envelope![CheckIn];
1713
1714        let mock = mock_limiter(&[DataCategory::Monitor]);
1715        let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1716
1717        assert!(limits.is_limited());
1718        assert_eq!(envelope.envelope().len(), 0);
1719        mock.lock().await.assert_call(DataCategory::Monitor, 1);
1720
1721        assert_eq!(get_outcomes(enforcement), vec![(DataCategory::Monitor, 1)])
1722    }
1723
1724    #[tokio::test]
1725    async fn test_enforce_pass_minidump() {
1726        let mut envelope = envelope![Attachment::Minidump];
1727
1728        let mock = mock_limiter(&[DataCategory::Attachment]);
1729        let (_, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1730
1731        // If only crash report attachments are present, we don't emit a rate limit.
1732        assert!(!limits.is_limited());
1733        assert_eq!(envelope.envelope().len(), 1);
1734        mock.lock().await.assert_call(DataCategory::Error, 1);
1735        mock.lock().await.assert_call(DataCategory::Attachment, 10);
1736    }
1737
1738    #[tokio::test]
1739    async fn test_enforce_skip_rate_limited() {
1740        let mut envelope = envelope![];
1741
1742        let mut item = Item::new(ItemType::Attachment);
1743        item.set_payload(ContentType::OctetStream, "0123456789");
1744        item.set_rate_limited(true);
1745        envelope.envelope_mut().add_item(item);
1746
1747        let mock = mock_limiter(&[DataCategory::Error]);
1748        let (_, limits) = enforce_and_apply(mock, &mut envelope, None).await;
1749
1750        assert!(!limits.is_limited()); // No new rate limits applied.
1751        assert_eq!(envelope.envelope().len(), 1); // The item was retained
1752    }
1753
1754    #[tokio::test]
1755    async fn test_enforce_pass_sessions() {
1756        let mut envelope = envelope![Session, Session, Session];
1757
1758        let mock = mock_limiter(&[DataCategory::Error]);
1759        let (_, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1760
1761        // If only crash report attachments are present, we don't emit a rate limit.
1762        assert!(!limits.is_limited());
1763        assert_eq!(envelope.envelope().len(), 3);
1764        mock.lock().await.assert_call(DataCategory::Session, 3);
1765    }
1766
1767    #[tokio::test]
1768    async fn test_enforce_limit_sessions() {
1769        let mut envelope = envelope![Session, Session, Event];
1770
1771        let mock = mock_limiter(&[DataCategory::Session]);
1772        let (_, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1773
1774        // If only crash report attachments are present, we don't emit a rate limit.
1775        assert!(limits.is_limited());
1776        assert_eq!(envelope.envelope().len(), 1);
1777        mock.lock().await.assert_call(DataCategory::Error, 1);
1778        mock.lock().await.assert_call(DataCategory::Session, 2);
1779    }
1780
1781    #[tokio::test]
1782    #[cfg(feature = "processing")]
1783    async fn test_enforce_limit_assumed_event() {
1784        let mut envelope = envelope![];
1785
1786        let mock = mock_limiter(&[DataCategory::Transaction]);
1787        let (_, limits) =
1788            enforce_and_apply(mock.clone(), &mut envelope, Some(DataCategory::Transaction)).await;
1789
1790        assert!(limits.is_limited());
1791        assert!(envelope.envelope().is_empty()); // obviously
1792        mock.lock().await.assert_call(DataCategory::Transaction, 1);
1793    }
1794
1795    #[tokio::test]
1796    #[cfg(feature = "processing")]
1797    async fn test_enforce_limit_assumed_attachments() {
1798        let mut envelope = envelope![Attachment, Attachment];
1799
1800        let mock = mock_limiter(&[DataCategory::Error]);
1801        let (_, limits) =
1802            enforce_and_apply(mock.clone(), &mut envelope, Some(DataCategory::Error)).await;
1803
1804        assert!(limits.is_limited());
1805        assert!(envelope.envelope().is_empty());
1806        mock.lock().await.assert_call(DataCategory::Error, 1);
1807    }
1808
1809    #[tokio::test]
1810    async fn test_enforce_transaction() {
1811        let mut envelope = envelope![Transaction];
1812
1813        let mock = mock_limiter(&[DataCategory::Transaction]);
1814        let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1815
1816        assert!(limits.is_limited());
1817        assert!(enforcement.event_indexed.is_active());
1818        assert!(enforcement.event.is_active());
1819        mock.lock().await.assert_call(DataCategory::Transaction, 1);
1820
1821        assert_eq!(
1822            get_outcomes(enforcement),
1823            vec![
1824                (DataCategory::Transaction, 1),
1825                (DataCategory::TransactionIndexed, 1),
1826                (DataCategory::Span, 1),
1827                (DataCategory::SpanIndexed, 1),
1828            ]
1829        );
1830    }
1831
1832    #[tokio::test]
1833    async fn test_enforce_transaction_non_indexed() {
1834        let mut envelope = envelope![Transaction, Profile];
1835        let scoping = envelope.scoping();
1836
1837        let mock = mock_limiter(&[DataCategory::TransactionIndexed]);
1838
1839        let mock_clone = mock.clone();
1840        let limiter = EnvelopeLimiter::new(CheckLimits::NonIndexed, move |s, q| {
1841            let mock_clone = mock_clone.clone();
1842            async move {
1843                let mut mock = mock_clone.lock().await;
1844                mock.check(s, q)
1845            }
1846        });
1847        let (enforcement, limits) = limiter
1848            .compute(envelope.envelope_mut(), &scoping)
1849            .await
1850            .unwrap();
1851        enforcement.clone().apply_with_outcomes(&mut envelope);
1852
1853        assert!(!limits.is_limited());
1854        assert!(!enforcement.event_indexed.is_active());
1855        assert!(!enforcement.event.is_active());
1856        assert!(!enforcement.profiles_indexed.is_active());
1857        assert!(!enforcement.profiles.is_active());
1858        assert!(!enforcement.spans.is_active());
1859        assert!(!enforcement.spans_indexed.is_active());
1860        mock.lock().await.assert_call(DataCategory::Transaction, 1);
1861        mock.lock().await.assert_call(DataCategory::Profile, 1);
1862        mock.lock().await.assert_call(DataCategory::Span, 1);
1863    }
1864
1865    #[tokio::test]
1866    async fn test_enforce_transaction_no_indexing_quota() {
1867        let mut envelope = envelope![Transaction];
1868
1869        let mock = mock_limiter(&[DataCategory::TransactionIndexed]);
1870        let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1871
1872        assert!(limits.is_limited());
1873        assert!(enforcement.event_indexed.is_active());
1874        assert!(!enforcement.event.is_active());
1875        mock.lock().await.assert_call(DataCategory::Transaction, 1);
1876        mock.lock()
1877            .await
1878            .assert_call(DataCategory::TransactionIndexed, 1);
1879    }
1880
1881    #[tokio::test]
1882    async fn test_enforce_transaction_attachment_enforced() {
1883        let mut envelope = envelope![Transaction, Attachment];
1884
1885        let mock = mock_limiter(&[DataCategory::Transaction]);
1886        let (enforcement, _) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1887
1888        assert!(enforcement.event.is_active());
1889        assert!(enforcement.attachments_limits.event.is_active());
1890        mock.lock().await.assert_call(DataCategory::Transaction, 1);
1891    }
1892
1893    fn get_outcomes(enforcement: Enforcement) -> Vec<(DataCategory, usize)> {
1894        enforcement
1895            .get_outcomes()
1896            .map(|(_, data_category, quantity)| (data_category, quantity))
1897            .collect::<Vec<_>>()
1898    }
1899
1900    #[tokio::test]
1901    async fn test_enforce_transaction_profile_enforced() {
1902        let mut envelope = envelope![Transaction, Profile];
1903
1904        let mock = mock_limiter(&[DataCategory::Transaction]);
1905        let (enforcement, _) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1906
1907        assert!(enforcement.event.is_active());
1908        assert!(enforcement.profiles.is_active());
1909        mock.lock().await.assert_call(DataCategory::Transaction, 1);
1910
1911        assert_eq!(
1912            get_outcomes(enforcement),
1913            vec![
1914                (DataCategory::Transaction, 1),
1915                (DataCategory::TransactionIndexed, 1),
1916                (DataCategory::Profile, 1),
1917                (DataCategory::ProfileIndexed, 1),
1918                (DataCategory::Span, 1),
1919                (DataCategory::SpanIndexed, 1),
1920            ]
1921        );
1922    }
1923
1924    #[tokio::test]
1925    async fn test_enforce_transaction_standalone_profile_enforced() {
1926        // When the transaction is sampled, the profile survives as standalone.
1927        let mut envelope = envelope![Profile];
1928
1929        let mock = mock_limiter(&[DataCategory::Transaction]);
1930        let (enforcement, _) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1931
1932        assert!(enforcement.profiles.is_active());
1933        mock.lock().await.assert_call(DataCategory::Profile, 1);
1934        mock.lock().await.assert_call(DataCategory::Transaction, 0);
1935
1936        assert_eq!(
1937            get_outcomes(enforcement),
1938            vec![
1939                (DataCategory::Profile, 1),
1940                (DataCategory::ProfileIndexed, 1),
1941            ]
1942        );
1943    }
1944
1945    #[tokio::test]
1946    async fn test_enforce_transaction_attachment_enforced_indexing_quota() {
1947        let mut envelope = envelope![Transaction, Attachment];
1948        set_extracted(envelope.envelope_mut(), ItemType::Transaction);
1949
1950        let mock = mock_limiter(&[DataCategory::TransactionIndexed]);
1951        let (enforcement, _) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1952
1953        assert!(!enforcement.event.is_active());
1954        assert!(enforcement.event_indexed.is_active());
1955        assert!(enforcement.attachments_limits.event.is_active());
1956        mock.lock().await.assert_call(DataCategory::Transaction, 1);
1957        mock.lock()
1958            .await
1959            .assert_call(DataCategory::TransactionIndexed, 1);
1960
1961        assert_eq!(
1962            get_outcomes(enforcement),
1963            vec![
1964                (DataCategory::TransactionIndexed, 1),
1965                (DataCategory::Attachment, 10),
1966                (DataCategory::AttachmentItem, 1),
1967                (DataCategory::SpanIndexed, 1),
1968            ]
1969        );
1970    }
1971
1972    #[tokio::test]
1973    async fn test_enforce_span() {
1974        let mut envelope = envelope![Span, Span];
1975
1976        let mock = mock_limiter(&[DataCategory::Span]);
1977        let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1978
1979        assert!(limits.is_limited());
1980        assert!(enforcement.spans_indexed.is_active());
1981        assert!(enforcement.spans.is_active());
1982        mock.lock().await.assert_call(DataCategory::Span, 2);
1983
1984        assert_eq!(
1985            get_outcomes(enforcement),
1986            vec![(DataCategory::Span, 2), (DataCategory::SpanIndexed, 2)]
1987        );
1988    }
1989
1990    #[tokio::test]
1991    async fn test_enforce_span_no_indexing_quota() {
1992        let mut envelope = envelope![Span, Span];
1993
1994        let mock = mock_limiter(&[DataCategory::SpanIndexed]);
1995        let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1996
1997        assert!(limits.is_limited());
1998        assert!(enforcement.spans_indexed.is_active());
1999        assert!(!enforcement.spans.is_active());
2000        mock.lock().await.assert_call(DataCategory::Span, 2);
2001        mock.lock().await.assert_call(DataCategory::SpanIndexed, 2);
2002
2003        assert_eq!(
2004            get_outcomes(enforcement),
2005            vec![(DataCategory::SpanIndexed, 2)]
2006        );
2007    }
2008
2009    #[tokio::test]
2010    async fn test_enforce_span_metrics_extracted_no_indexing_quota() {
2011        let mut envelope = envelope![Span, Span];
2012        set_extracted(envelope.envelope_mut(), ItemType::Span);
2013
2014        let mock = mock_limiter(&[DataCategory::SpanIndexed]);
2015        let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
2016
2017        assert!(limits.is_limited());
2018        assert!(enforcement.spans_indexed.is_active());
2019        assert!(!enforcement.spans.is_active());
2020        mock.lock().await.assert_call(DataCategory::Span, 2);
2021        mock.lock().await.assert_call(DataCategory::SpanIndexed, 2);
2022
2023        assert_eq!(
2024            get_outcomes(enforcement),
2025            vec![(DataCategory::SpanIndexed, 2)]
2026        );
2027    }
2028
2029    #[test]
2030    fn test_source_quantity_for_total_quantity() {
2031        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
2032            .parse()
2033            .unwrap();
2034        let request_meta = RequestMeta::new(dsn);
2035
2036        let mut envelope = Envelope::from_request(None, request_meta);
2037
2038        let mut item = Item::new(ItemType::MetricBuckets);
2039        item.set_source_quantities(SourceQuantities {
2040            transactions: 5,
2041            spans: 0,
2042            profiles: 2,
2043            buckets: 5,
2044        });
2045        envelope.add_item(item);
2046
2047        let mut item = Item::new(ItemType::MetricBuckets);
2048        item.set_source_quantities(SourceQuantities {
2049            transactions: 2,
2050            spans: 0,
2051            profiles: 0,
2052            buckets: 3,
2053        });
2054        envelope.add_item(item);
2055
2056        let summary = EnvelopeSummary::compute(&envelope);
2057
2058        assert_eq!(summary.profile_quantity, 2);
2059        assert_eq!(summary.secondary_transaction_quantity, 7);
2060    }
2061
2062    #[tokio::test]
2063    async fn test_enforce_limit_logs_count() {
2064        let mut envelope = envelope![Log, Log];
2065
2066        let mock = mock_limiter(&[DataCategory::LogItem]);
2067        let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
2068
2069        assert!(limits.is_limited());
2070        assert_eq!(envelope.envelope().len(), 0);
2071        mock.lock().await.assert_call(DataCategory::LogItem, 2);
2072        mock.lock().await.assert_call(DataCategory::LogByte, 20);
2073
2074        assert_eq!(get_outcomes(enforcement), vec![(DataCategory::LogItem, 2)]);
2075    }
2076
2077    #[tokio::test]
2078    async fn test_enforce_limit_logs_bytes() {
2079        let mut envelope = envelope![Log, Log];
2080
2081        let mock = mock_limiter(&[DataCategory::LogByte]);
2082        let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
2083
2084        assert!(limits.is_limited());
2085        assert_eq!(envelope.envelope().len(), 0);
2086        mock.lock().await.assert_call(DataCategory::LogItem, 2);
2087        mock.lock().await.assert_call(DataCategory::LogByte, 20);
2088
2089        assert_eq!(get_outcomes(enforcement), vec![(DataCategory::LogByte, 20)]);
2090    }
2091
2092    #[tokio::test]
2093    async fn test_enforce_standalone_span_attachment() {
2094        let test_cases = &[
2095            RateLimitTestCase {
2096                name: "span_limit",
2097                denied_categories: &[DataCategory::Span],
2098                expect_attachment_limit_active: true,
2099                expected_limiter_calls: &[(DataCategory::Span, 0)],
2100                expected_outcomes: &[
2101                    (DataCategory::Attachment, 7),
2102                    (DataCategory::AttachmentItem, 1),
2103                ],
2104            },
2105            RateLimitTestCase {
2106                name: "span_indexed_limit",
2107                denied_categories: &[DataCategory::SpanIndexed],
2108                expect_attachment_limit_active: true,
2109                expected_limiter_calls: &[(DataCategory::Span, 0), (DataCategory::SpanIndexed, 0)],
2110                expected_outcomes: &[
2111                    (DataCategory::Attachment, 7),
2112                    (DataCategory::AttachmentItem, 1),
2113                ],
2114            },
2115            RateLimitTestCase {
2116                name: "attachment_limit",
2117                denied_categories: &[DataCategory::Attachment],
2118                expect_attachment_limit_active: true,
2119                expected_limiter_calls: &[
2120                    (DataCategory::Span, 0),
2121                    (DataCategory::SpanIndexed, 0),
2122                    (DataCategory::Attachment, 7),
2123                ],
2124                expected_outcomes: &[
2125                    (DataCategory::Attachment, 7),
2126                    (DataCategory::AttachmentItem, 1),
2127                ],
2128            },
2129            RateLimitTestCase {
2130                name: "attachment_indexed_limit",
2131                denied_categories: &[DataCategory::AttachmentItem],
2132                expect_attachment_limit_active: true,
2133                expected_limiter_calls: &[
2134                    (DataCategory::Span, 0),
2135                    (DataCategory::SpanIndexed, 0),
2136                    (DataCategory::Attachment, 7),
2137                    (DataCategory::AttachmentItem, 1),
2138                ],
2139                expected_outcomes: &[
2140                    (DataCategory::Attachment, 7),
2141                    (DataCategory::AttachmentItem, 1),
2142                ],
2143            },
2144            RateLimitTestCase {
2145                name: "transaction_limit",
2146                denied_categories: &[DataCategory::Transaction],
2147                expect_attachment_limit_active: false,
2148                expected_limiter_calls: &[
2149                    (DataCategory::Span, 0),
2150                    (DataCategory::SpanIndexed, 0),
2151                    (DataCategory::Attachment, 7),
2152                    (DataCategory::AttachmentItem, 1),
2153                ],
2154                expected_outcomes: &[],
2155            },
2156            RateLimitTestCase {
2157                name: "error_limit",
2158                denied_categories: &[DataCategory::Error],
2159                expect_attachment_limit_active: false,
2160                expected_limiter_calls: &[
2161                    (DataCategory::Span, 0),
2162                    (DataCategory::SpanIndexed, 0),
2163                    (DataCategory::Attachment, 7),
2164                    (DataCategory::AttachmentItem, 1),
2165                ],
2166                expected_outcomes: &[],
2167            },
2168            RateLimitTestCase {
2169                name: "no_limits",
2170                denied_categories: &[],
2171                expect_attachment_limit_active: false,
2172                expected_limiter_calls: &[
2173                    (DataCategory::Span, 0),
2174                    (DataCategory::SpanIndexed, 0),
2175                    (DataCategory::Attachment, 7),
2176                    (DataCategory::AttachmentItem, 1),
2177                ],
2178                expected_outcomes: &[],
2179            },
2180        ];
2181
2182        for RateLimitTestCase {
2183            name,
2184            denied_categories,
2185            expect_attachment_limit_active,
2186            expected_limiter_calls,
2187            expected_outcomes,
2188        } in test_cases
2189        {
2190            let mut envelope = envelope![];
2191            envelope
2192                .envelope_mut()
2193                .add_item(trace_attachment_item(7, Some(ParentId::SpanId(None))));
2194
2195            let mock = mock_limiter(denied_categories);
2196            let (enforcement, _) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
2197
2198            for &(category, quantity) in *expected_limiter_calls {
2199                mock.lock().await.assert_call(category, quantity);
2200            }
2201
2202            assert_eq!(
2203                enforcement.attachments_limits.span.bytes.is_active(),
2204                *expect_attachment_limit_active,
2205                "{name}: span_attachment byte limit mismatch"
2206            );
2207            assert_eq!(
2208                enforcement.attachments_limits.span.count.is_active(),
2209                *expect_attachment_limit_active,
2210                "{name}: span_attachment count limit mismatch"
2211            );
2212
2213            assert_eq!(
2214                get_outcomes(enforcement),
2215                *expected_outcomes,
2216                "{name}: outcome mismatch"
2217            );
2218        }
2219    }
2220
2221    #[tokio::test]
2222    async fn test_enforce_span_with_span_attachment() {
2223        let test_cases = &[
2224            RateLimitTestCase {
2225                name: "span_limit",
2226                denied_categories: &[DataCategory::Span, DataCategory::Attachment], // Attachment here has no effect
2227                expect_attachment_limit_active: true,
2228                expected_limiter_calls: &[(DataCategory::Span, 1)],
2229                expected_outcomes: &[
2230                    (DataCategory::Attachment, 7),
2231                    (DataCategory::AttachmentItem, 1),
2232                    (DataCategory::Span, 1),
2233                    (DataCategory::SpanIndexed, 1),
2234                ],
2235            },
2236            RateLimitTestCase {
2237                name: "span_indexed_limit",
2238                denied_categories: &[DataCategory::SpanIndexed, DataCategory::Attachment],
2239                expect_attachment_limit_active: true,
2240                expected_limiter_calls: &[(DataCategory::Span, 1), (DataCategory::SpanIndexed, 1)],
2241                expected_outcomes: &[
2242                    (DataCategory::Attachment, 7),
2243                    (DataCategory::AttachmentItem, 1),
2244                    (DataCategory::SpanIndexed, 1),
2245                ],
2246            },
2247            RateLimitTestCase {
2248                name: "attachment_limit",
2249                denied_categories: &[DataCategory::Attachment],
2250                expect_attachment_limit_active: true,
2251                expected_limiter_calls: &[
2252                    (DataCategory::Span, 1),
2253                    (DataCategory::SpanIndexed, 1),
2254                    (DataCategory::Attachment, 7),
2255                ],
2256                expected_outcomes: &[
2257                    (DataCategory::Attachment, 7),
2258                    (DataCategory::AttachmentItem, 1),
2259                ],
2260            },
2261            RateLimitTestCase {
2262                name: "attachment_indexed_limit",
2263                denied_categories: &[DataCategory::AttachmentItem],
2264                expect_attachment_limit_active: true,
2265                expected_limiter_calls: &[
2266                    (DataCategory::Span, 1),
2267                    (DataCategory::SpanIndexed, 1),
2268                    (DataCategory::Attachment, 7),
2269                    (DataCategory::AttachmentItem, 1),
2270                ],
2271                expected_outcomes: &[
2272                    (DataCategory::Attachment, 7),
2273                    (DataCategory::AttachmentItem, 1),
2274                ],
2275            },
2276            RateLimitTestCase {
2277                name: "transaction_limit",
2278                denied_categories: &[DataCategory::Transaction],
2279                expect_attachment_limit_active: false,
2280                expected_limiter_calls: &[
2281                    (DataCategory::Span, 1),
2282                    (DataCategory::SpanIndexed, 1),
2283                    (DataCategory::Attachment, 7),
2284                    (DataCategory::AttachmentItem, 1),
2285                ],
2286                expected_outcomes: &[],
2287            },
2288            RateLimitTestCase {
2289                name: "error_limit",
2290                denied_categories: &[DataCategory::Error],
2291                expect_attachment_limit_active: false,
2292                expected_limiter_calls: &[
2293                    (DataCategory::Span, 1),
2294                    (DataCategory::SpanIndexed, 1),
2295                    (DataCategory::Attachment, 7),
2296                    (DataCategory::AttachmentItem, 1),
2297                ],
2298                expected_outcomes: &[],
2299            },
2300            RateLimitTestCase {
2301                name: "no_limits",
2302                denied_categories: &[],
2303                expect_attachment_limit_active: false,
2304                expected_limiter_calls: &[
2305                    (DataCategory::Span, 1),
2306                    (DataCategory::SpanIndexed, 1),
2307                    (DataCategory::Attachment, 7),
2308                    (DataCategory::AttachmentItem, 1),
2309                ],
2310                expected_outcomes: &[],
2311            },
2312        ];
2313
2314        for RateLimitTestCase {
2315            name,
2316            denied_categories,
2317            expect_attachment_limit_active,
2318            expected_limiter_calls,
2319            expected_outcomes,
2320        } in test_cases
2321        {
2322            let mut envelope = envelope![Span];
2323            envelope
2324                .envelope_mut()
2325                .add_item(trace_attachment_item(7, Some(ParentId::SpanId(None))));
2326
2327            let mock = mock_limiter(denied_categories);
2328            let (enforcement, _) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
2329
2330            for &(category, quantity) in *expected_limiter_calls {
2331                mock.lock().await.assert_call(category, quantity);
2332            }
2333
2334            assert_eq!(
2335                enforcement.attachments_limits.span.bytes.is_active(),
2336                *expect_attachment_limit_active,
2337                "{name}: span_attachment byte limit mismatch"
2338            );
2339            assert_eq!(
2340                enforcement.attachments_limits.span.count.is_active(),
2341                *expect_attachment_limit_active,
2342                "{name}: span_attachment count limit mismatch"
2343            );
2344
2345            assert_eq!(
2346                get_outcomes(enforcement),
2347                *expected_outcomes,
2348                "{name}: outcome mismatch"
2349            );
2350        }
2351    }
2352
2353    #[tokio::test]
2354    async fn test_enforce_transaction_span_attachment() {
2355        let test_cases = &[
2356            RateLimitTestCase {
2357                name: "span_limit",
2358                denied_categories: &[DataCategory::Span],
2359                expect_attachment_limit_active: true,
2360                expected_limiter_calls: &[
2361                    (DataCategory::Transaction, 1),
2362                    (DataCategory::TransactionIndexed, 1),
2363                    (DataCategory::Span, 1),
2364                ],
2365                expected_outcomes: &[
2366                    (DataCategory::Attachment, 7),
2367                    (DataCategory::AttachmentItem, 1),
2368                    (DataCategory::Span, 1),
2369                    (DataCategory::SpanIndexed, 1),
2370                ],
2371            },
2372            RateLimitTestCase {
2373                name: "transaction_limit",
2374                denied_categories: &[DataCategory::Transaction],
2375                expect_attachment_limit_active: true,
2376                expected_limiter_calls: &[(DataCategory::Transaction, 1)],
2377                expected_outcomes: &[
2378                    (DataCategory::Transaction, 1),
2379                    (DataCategory::TransactionIndexed, 1),
2380                    (DataCategory::Attachment, 7),
2381                    (DataCategory::AttachmentItem, 1),
2382                    (DataCategory::Span, 1),
2383                    (DataCategory::SpanIndexed, 1),
2384                ],
2385            },
2386            RateLimitTestCase {
2387                name: "error_limit",
2388                denied_categories: &[DataCategory::Error],
2389                expect_attachment_limit_active: false,
2390                expected_limiter_calls: &[
2391                    (DataCategory::Transaction, 1),
2392                    (DataCategory::TransactionIndexed, 1),
2393                    (DataCategory::Span, 1),
2394                    (DataCategory::SpanIndexed, 1),
2395                    (DataCategory::Attachment, 7),
2396                    (DataCategory::AttachmentItem, 1),
2397                ],
2398                expected_outcomes: &[],
2399            },
2400            RateLimitTestCase {
2401                name: "no_limits",
2402                denied_categories: &[],
2403                expect_attachment_limit_active: false,
2404                expected_limiter_calls: &[
2405                    (DataCategory::Transaction, 1),
2406                    (DataCategory::TransactionIndexed, 1),
2407                    (DataCategory::Span, 1),
2408                    (DataCategory::SpanIndexed, 1),
2409                    (DataCategory::Attachment, 7),
2410                    (DataCategory::AttachmentItem, 1),
2411                ],
2412                expected_outcomes: &[],
2413            },
2414        ];
2415
2416        for RateLimitTestCase {
2417            name,
2418            denied_categories,
2419            expect_attachment_limit_active,
2420            expected_limiter_calls,
2421            expected_outcomes,
2422        } in test_cases
2423        {
2424            let mut envelope = envelope![Transaction];
2425            envelope
2426                .envelope_mut()
2427                .add_item(trace_attachment_item(7, Some(ParentId::SpanId(None))));
2428
2429            let mock = mock_limiter(denied_categories);
2430            let (enforcement, _) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
2431
2432            for &(category, quantity) in *expected_limiter_calls {
2433                mock.lock().await.assert_call(category, quantity);
2434            }
2435
2436            assert_eq!(
2437                enforcement.attachments_limits.span.bytes.is_active(),
2438                *expect_attachment_limit_active,
2439                "{name}: span_attachment byte limit mismatch"
2440            );
2441            assert_eq!(
2442                enforcement.attachments_limits.span.count.is_active(),
2443                *expect_attachment_limit_active,
2444                "{name}: span_attachment count limit mismatch"
2445            );
2446
2447            assert_eq!(
2448                get_outcomes(enforcement),
2449                *expected_outcomes,
2450                "{name}: outcome mismatch"
2451            );
2452        }
2453    }
2454
2455    #[tokio::test]
2456    async fn test_enforce_standalone_trace_attachment() {
2457        let test_cases = &[
2458            RateLimitTestCase {
2459                name: "attachment_limit",
2460                denied_categories: &[DataCategory::Attachment],
2461                expect_attachment_limit_active: true,
2462                expected_limiter_calls: &[(DataCategory::Attachment, 7)],
2463                expected_outcomes: &[
2464                    (DataCategory::Attachment, 7),
2465                    (DataCategory::AttachmentItem, 1),
2466                ],
2467            },
2468            RateLimitTestCase {
2469                name: "attachment_limit_and_attachment_item_limit",
2470                denied_categories: &[DataCategory::Attachment, DataCategory::AttachmentItem],
2471                expect_attachment_limit_active: true,
2472                expected_limiter_calls: &[(DataCategory::Attachment, 7)],
2473                expected_outcomes: &[
2474                    (DataCategory::Attachment, 7),
2475                    (DataCategory::AttachmentItem, 1),
2476                ],
2477            },
2478            RateLimitTestCase {
2479                name: "attachment_item_limit",
2480                denied_categories: &[DataCategory::AttachmentItem],
2481                expect_attachment_limit_active: true,
2482                expected_limiter_calls: &[
2483                    (DataCategory::Attachment, 7),
2484                    (DataCategory::AttachmentItem, 1),
2485                ],
2486                expected_outcomes: &[
2487                    (DataCategory::Attachment, 7),
2488                    (DataCategory::AttachmentItem, 1),
2489                ],
2490            },
2491            RateLimitTestCase {
2492                name: "no_limits",
2493                denied_categories: &[],
2494                expect_attachment_limit_active: false,
2495                expected_limiter_calls: &[
2496                    (DataCategory::Attachment, 7),
2497                    (DataCategory::AttachmentItem, 1),
2498                ],
2499                expected_outcomes: &[],
2500            },
2501        ];
2502
2503        for RateLimitTestCase {
2504            name,
2505            denied_categories,
2506            expect_attachment_limit_active,
2507            expected_limiter_calls,
2508            expected_outcomes,
2509        } in test_cases
2510        {
2511            let mut envelope = envelope![];
2512            envelope
2513                .envelope_mut()
2514                .add_item(trace_attachment_item(7, None));
2515
2516            let mock = mock_limiter(denied_categories);
2517            let (enforcement, _) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
2518
2519            for &(category, quantity) in *expected_limiter_calls {
2520                mock.lock().await.assert_call(category, quantity);
2521            }
2522
2523            assert_eq!(
2524                enforcement.attachments_limits.trace.bytes.is_active(),
2525                *expect_attachment_limit_active,
2526                "{name}: trace_attachment byte limit mismatch"
2527            );
2528            assert_eq!(
2529                enforcement.attachments_limits.trace.count.is_active(),
2530                *expect_attachment_limit_active,
2531                "{name}: trace_attachment count limit mismatch"
2532            );
2533
2534            assert_eq!(
2535                get_outcomes(enforcement),
2536                *expected_outcomes,
2537                "{name}: outcome mismatch"
2538            );
2539        }
2540    }
2541}