relay_server/utils/
rate_limits.rs

1use std::fmt::{self, Write};
2use std::future::Future;
3use std::marker::PhantomData;
4
5use relay_profiling::ProfileType;
6use relay_quotas::{
7    DataCategory, ItemScoping, QuotaScope, RateLimit, RateLimitScope, RateLimits, ReasonCode,
8    Scoping,
9};
10
11use crate::envelope::{AttachmentParentType, Envelope, Item, ItemType};
12use crate::integrations::Integration;
13use crate::managed::{Managed, ManagedEnvelope};
14use crate::services::outcome::Outcome;
15
/// Name of the rate limits header.
///
/// The value of this header is produced by [`format_rate_limits`] and read back by
/// [`parse_rate_limits`].
pub const RATE_LIMITS_HEADER: &str = "X-Sentry-Rate-Limits";
18
19/// Formats the `X-Sentry-Rate-Limits` header.
20pub fn format_rate_limits(rate_limits: &RateLimits) -> String {
21    let mut header = String::new();
22
23    for rate_limit in rate_limits {
24        if !header.is_empty() {
25            header.push_str(", ");
26        }
27
28        write!(header, "{}:", rate_limit.retry_after.remaining_seconds()).ok();
29
30        for (index, category) in rate_limit.categories.iter().enumerate() {
31            if index > 0 {
32                header.push(';');
33            }
34            write!(header, "{category}").ok();
35        }
36
37        write!(header, ":{}", rate_limit.scope.name()).ok();
38
39        if let Some(ref reason_code) = rate_limit.reason_code {
40            write!(header, ":{reason_code}").ok();
41        } else if !rate_limit.namespaces.is_empty() {
42            write!(header, ":").ok(); // delimits the empty reason code for namespaces
43        }
44
45        for (index, namespace) in rate_limit.namespaces.iter().enumerate() {
46            header.push(if index == 0 { ':' } else { ';' });
47            write!(header, "{namespace}").ok();
48        }
49    }
50
51    header
52}
53
54/// Parses the `X-Sentry-Rate-Limits` header.
55pub fn parse_rate_limits(scoping: &Scoping, string: &str) -> RateLimits {
56    let mut rate_limits = RateLimits::new();
57
58    for limit in string.split(',') {
59        let limit = limit.trim();
60        if limit.is_empty() {
61            continue;
62        }
63
64        let mut components = limit.split(':');
65
66        let retry_after = match components.next().and_then(|s| s.parse().ok()) {
67            Some(retry_after) => retry_after,
68            None => continue,
69        };
70
71        let categories = components
72            .next()
73            .unwrap_or("")
74            .split(';')
75            .filter(|category| !category.is_empty())
76            .map(DataCategory::from_name)
77            .collect();
78
79        let quota_scope = QuotaScope::from_name(components.next().unwrap_or(""));
80        let scope = RateLimitScope::for_quota(*scoping, quota_scope);
81
82        let reason_code = components
83            .next()
84            .filter(|s| !s.is_empty())
85            .map(ReasonCode::new);
86
87        let namespace = components
88            .next()
89            .unwrap_or("")
90            .split(';')
91            .filter(|s| !s.is_empty())
92            .filter_map(|s| s.parse().ok())
93            .collect();
94
95        rate_limits.add(RateLimit {
96            categories,
97            scope,
98            reason_code,
99            retry_after,
100            namespaces: namespace,
101        });
102    }
103
104    rate_limits
105}
106
107/// Infer the data category from an item.
108///
109/// Categories depend mostly on the item type, with a few special cases:
110/// - `Event`: the category is inferred from the event type. This requires the `event_type` header
111///   to be set on the event item.
112/// - `Attachment`: If the attachment creates an event (e.g. for minidumps), the category is assumed
113///   to be `Error`.
114fn infer_event_category(item: &Item) -> Option<DataCategory> {
115    match item.ty() {
116        ItemType::Event => Some(DataCategory::Error),
117        ItemType::Transaction => Some(DataCategory::Transaction),
118        ItemType::Security | ItemType::RawSecurity => Some(DataCategory::Security),
119        ItemType::UnrealReport => Some(DataCategory::Error),
120        ItemType::UserReportV2 => Some(DataCategory::UserReportV2),
121        ItemType::Attachment if item.creates_event() => Some(DataCategory::Error),
122        ItemType::Attachment => None,
123        ItemType::Session => None,
124        ItemType::Sessions => None,
125        ItemType::Statsd => None,
126        ItemType::MetricBuckets => None,
127        ItemType::FormData => None,
128        ItemType::UserReport => None,
129        ItemType::Profile => None,
130        ItemType::ReplayEvent => None,
131        ItemType::ReplayRecording => None,
132        ItemType::ReplayVideo => None,
133        ItemType::ClientReport => None,
134        ItemType::CheckIn => None,
135        ItemType::Nel => None,
136        ItemType::Log => None,
137        ItemType::TraceMetric => None,
138        ItemType::Span => None,
139        ItemType::ProfileChunk => None,
140        ItemType::Integration => None,
141        ItemType::Unknown(_) => None,
142    }
143}
144
/// Quantities for a single category of attachments.
///
/// Holds the number of attachment items together with their combined size in bytes.
#[derive(Clone, Copy, Debug, Default)]
pub struct AttachmentQuantity {
    /// Number of attachment items.
    pub count: usize,
    /// Total size of attachments in bytes.
    pub bytes: usize,
}

impl AttachmentQuantity {
    /// Returns `true` if both the item count and the byte size are zero.
    pub fn is_empty(&self) -> bool {
        let Self { count, bytes } = *self;
        (count, bytes) == (0, 0)
    }
}
161
162/// Aggregated attachment quantities grouped by [`AttachmentParentType`].
163///
164/// This separation is necessary since rate limiting logic varies by [`AttachmentParentType`].
165#[derive(Clone, Copy, Debug, Default)]
166pub struct AttachmentQuantities {
167    /// Quantities of Event Attachments.
168    ///
169    /// See also: [`AttachmentParentType::Event`].
170    pub event: AttachmentQuantity,
171    /// Quantities of trace V2 Attachments.
172    pub trace: AttachmentQuantity,
173    /// Quantities of span V2 Attachments.
174    pub span: AttachmentQuantity,
175}
176
177impl AttachmentQuantities {
178    /// Returns the total count of all attachments across all parent types.
179    pub fn count(&self) -> usize {
180        let AttachmentQuantities { event, trace, span } = self;
181        event.count + trace.count + span.count
182    }
183
184    /// Returns the total size in bytes of all attachments across all parent types.
185    pub fn bytes(&self) -> usize {
186        let AttachmentQuantities { event, trace, span } = self;
187        event.bytes + trace.bytes + span.bytes
188    }
189}
190
/// A summary of `Envelope` contents.
///
/// Summarizes the contained event, size of attachments, session updates, and whether there are
/// plain attachments. This is used for efficient rate limiting or outcome handling.
///
/// Use [`EnvelopeSummary::compute`] to aggregate an envelope into a summary.
#[non_exhaustive]
#[derive(Clone, Copy, Debug, Default)]
pub struct EnvelopeSummary {
    /// The data category of the event in the envelope. `None` if there is no event.
    pub event_category: Option<DataCategory>,

    /// The quantities of all attachments, grouped by their parent type.
    pub attachment_quantities: AttachmentQuantities,

    /// The number of all session updates.
    pub session_quantity: usize,

    /// The number of profiles.
    pub profile_quantity: usize,

    /// The number of replays.
    pub replay_quantity: usize,

    /// The number of user reports (legacy item type for user feedback).
    pub user_report_quantity: usize,

    /// The number of monitor check-ins.
    pub monitor_quantity: usize,

    /// The number of log items sent for the log product.
    pub log_item_quantity: usize,

    /// The size of logs sent for the log product, in bytes.
    pub log_byte_quantity: usize,

    /// Secondary number of transactions.
    ///
    /// This is 0 for envelopes which contain a transaction,
    /// only secondary transaction quantity should be tracked here,
    /// these are for example transaction counts extracted from metrics.
    ///
    /// A "primary" transaction is contained within the envelope,
    /// marking the envelope data category a [`DataCategory::Transaction`].
    pub secondary_transaction_quantity: usize,

    /// Secondary number of spans.
    ///
    /// See `secondary_transaction_quantity`.
    pub secondary_span_quantity: usize,

    /// The number of standalone spans.
    pub span_quantity: usize,

    /// Indicates that the envelope contains regular attachments that do not create event payloads.
    pub has_plain_attachments: bool,

    /// The payload size of this envelope.
    pub payload_size: usize,

    /// The number of profile chunks in this envelope.
    pub profile_chunk_quantity: usize,
    /// The number of UI profile chunks in this envelope.
    pub profile_chunk_ui_quantity: usize,

    /// The number of trace metrics in this envelope.
    pub trace_metric_quantity: usize,
}
255
256impl EnvelopeSummary {
257    /// Creates an empty summary.
258    pub fn empty() -> Self {
259        Self::default()
260    }
261
262    /// Creates an envelope summary and aggregates the given envelope.
263    pub fn compute(envelope: &Envelope) -> Self {
264        let mut summary = Self::empty();
265
266        for item in envelope.items() {
267            if item.creates_event() {
268                summary.infer_category(item);
269            } else if item.ty() == &ItemType::Attachment {
270                // Plain attachments do not create events.
271                summary.has_plain_attachments = true;
272            }
273
274            // If the item has been rate limited before, the quota has been consumed and outcomes
275            // emitted. We can skip it here.
276            if item.rate_limited() {
277                continue;
278            }
279
280            if let Some(source_quantities) = item.source_quantities() {
281                summary.secondary_transaction_quantity += source_quantities.transactions;
282                summary.secondary_span_quantity += source_quantities.spans;
283            }
284
285            summary.payload_size += item.len();
286
287            summary.add_quantities(item);
288
289            // Special case since v1 and v2 share a data category.
290            // Adding this in add_quantity would include v2 in the count.
291            if item.ty() == &ItemType::UserReport {
292                summary.user_report_quantity += 1;
293            }
294        }
295
296        summary
297    }
298
299    fn add_quantities(&mut self, item: &Item) {
300        for (category, quantity) in item.quantities() {
301            let target_quantity = match category {
302                DataCategory::Attachment => match item.attachment_parent_type() {
303                    AttachmentParentType::Span => &mut self.attachment_quantities.span.bytes,
304                    AttachmentParentType::Trace => &mut self.attachment_quantities.trace.bytes,
305                    AttachmentParentType::Event => &mut self.attachment_quantities.event.bytes,
306                },
307                DataCategory::AttachmentItem => match item.attachment_parent_type() {
308                    AttachmentParentType::Span => &mut self.attachment_quantities.span.count,
309                    AttachmentParentType::Trace => &mut self.attachment_quantities.trace.count,
310                    AttachmentParentType::Event => &mut self.attachment_quantities.event.count,
311                },
312                DataCategory::Session => &mut self.session_quantity,
313                DataCategory::Profile => &mut self.profile_quantity,
314                DataCategory::Replay => &mut self.replay_quantity,
315                DataCategory::DoNotUseReplayVideo => &mut self.replay_quantity,
316                DataCategory::Monitor => &mut self.monitor_quantity,
317                DataCategory::Span => &mut self.span_quantity,
318                DataCategory::TraceMetric => &mut self.trace_metric_quantity,
319                DataCategory::LogItem => &mut self.log_item_quantity,
320                DataCategory::LogByte => &mut self.log_byte_quantity,
321                DataCategory::ProfileChunk => &mut self.profile_chunk_quantity,
322                DataCategory::ProfileChunkUi => &mut self.profile_chunk_ui_quantity,
323                // TODO: This catch-all looks dangerous
324                _ => continue,
325            };
326            *target_quantity += quantity;
327        }
328    }
329
330    /// Infers the appropriate [`DataCategory`] for the envelope [`Item`].
331    ///
332    /// The inferred category is only applied to the [`EnvelopeSummary`] if there is not yet
333    /// a category set.
334    fn infer_category(&mut self, item: &Item) {
335        if matches!(self.event_category, None | Some(DataCategory::Default))
336            && let Some(category) = infer_event_category(item)
337        {
338            self.event_category = Some(category);
339        }
340    }
341
342    /// Returns `true` if the envelope contains items that depend on spans.
343    ///
344    /// This is used to determined if we should be checking span quota, as the quota should be
345    /// checked both if there are spans or if there are span dependent items (e.g. span attachments).
346    pub fn has_span_dependent_items(&self) -> bool {
347        !self.attachment_quantities.span.is_empty()
348    }
349}
350
351/// Rate limiting information for a data category.
352#[derive(Debug, Default, PartialEq)]
353#[cfg_attr(test, derive(Clone))]
354pub struct CategoryLimit {
355    /// The limited data category.
356    category: Option<DataCategory>,
357    /// The total rate limited quantity across all items.
358    ///
359    /// This will be `0` if nothing was rate limited.
360    quantity: usize,
361    /// The reason code of the applied rate limit.
362    ///
363    /// Defaults to `None` if the quota does not declare a reason code.
364    reason_code: Option<ReasonCode>,
365}
366
367impl CategoryLimit {
368    /// Creates a new `CategoryLimit`.
369    ///
370    /// Returns an inactive limit if `rate_limit` is `None`.
371    fn new(category: DataCategory, quantity: usize, rate_limit: Option<&RateLimit>) -> Self {
372        match rate_limit {
373            Some(limit) => Self {
374                category: Some(category),
375                quantity,
376                reason_code: limit.reason_code.clone(),
377            },
378            None => Self::default(),
379        }
380    }
381
382    /// Recreates the category limit, if active, for a new category with the same reason.
383    pub fn clone_for(&self, category: DataCategory, quantity: usize) -> CategoryLimit {
384        if !self.is_active() {
385            return Self::default();
386        }
387
388        Self {
389            category: Some(category),
390            quantity,
391            reason_code: self.reason_code.clone(),
392        }
393    }
394
395    /// Returns `true` if this is an active limit.
396    ///
397    /// Inactive limits are placeholders with no category set.
398    pub fn is_active(&self) -> bool {
399        self.category.is_some()
400    }
401}
402
403/// Rate limiting information for a single category of attachments.
404#[derive(Default, Debug)]
405#[cfg_attr(test, derive(Clone))]
406pub struct AttachmentLimits {
407    /// Rate limit applied to attachment bytes ([`DataCategory::Attachment`]).
408    pub bytes: CategoryLimit,
409    /// Rate limit applied to attachment item count ([`DataCategory::AttachmentItem`]).
410    pub count: CategoryLimit,
411}
412
413impl AttachmentLimits {
414    fn is_active(&self) -> bool {
415        self.bytes.is_active() || self.count.is_active()
416    }
417}
418
/// Rate limiting information for attachments grouped by [`AttachmentParentType`].
///
/// Each field holds the bytes and count limits for attachments of one parent type.
///
/// See [`AttachmentQuantities`] for the corresponding quantity tracking.
#[derive(Default, Debug)]
#[cfg_attr(test, derive(Clone))]
pub struct AttachmentsLimits {
    /// Limits for V1 Attachments.
    pub event: AttachmentLimits,
    /// Limits for trace V2 Attachments.
    pub trace: AttachmentLimits,
    /// Limits for span V2 Attachments.
    pub span: AttachmentLimits,
}
432
/// Information on the limited quantities returned by [`EnvelopeLimiter::compute`].
///
/// Every field is a [`CategoryLimit`] (or a group of them) that is either active, if the
/// corresponding items are rate limited, or an inactive placeholder.
#[derive(Default, Debug)]
#[cfg_attr(test, derive(Clone))]
pub struct Enforcement {
    /// The event item rate limit.
    pub event: CategoryLimit,
    /// The rate limit for the indexed category of the event.
    pub event_indexed: CategoryLimit,
    /// The attachment limits, grouped by attachment parent type.
    pub attachments_limits: AttachmentsLimits,
    /// The combined session item rate limit.
    pub sessions: CategoryLimit,
    /// The combined profile item rate limit.
    pub profiles: CategoryLimit,
    /// The rate limit for the indexed profiles category.
    pub profiles_indexed: CategoryLimit,
    /// The combined replay item rate limit.
    pub replays: CategoryLimit,
    /// The combined check-in item rate limit.
    pub check_ins: CategoryLimit,
    /// The combined logs (our product logs) rate limit for log items.
    pub log_items: CategoryLimit,
    /// The combined logs (our product logs) rate limit for log bytes.
    pub log_bytes: CategoryLimit,
    /// The combined spans rate limit.
    pub spans: CategoryLimit,
    /// The rate limit for the indexed span category.
    pub spans_indexed: CategoryLimit,
    /// The rate limit for user report v1.
    pub user_reports: CategoryLimit,
    /// The combined profile chunk item rate limit.
    pub profile_chunks: CategoryLimit,
    /// The combined profile chunk ui item rate limit.
    pub profile_chunks_ui: CategoryLimit,
    /// The combined trace metric item rate limit.
    pub trace_metrics: CategoryLimit,
}
470
471impl Enforcement {
472    /// Returns the `CategoryLimit` for the event.
473    ///
474    /// `None` if the event is not rate limited.
475    pub fn active_event(&self) -> Option<&CategoryLimit> {
476        if self.event.is_active() {
477            Some(&self.event)
478        } else if self.event_indexed.is_active() {
479            Some(&self.event_indexed)
480        } else {
481            None
482        }
483    }
484
485    /// Returns `true` if the event is rate limited.
486    pub fn is_event_active(&self) -> bool {
487        self.active_event().is_some()
488    }
489
490    /// Helper for `track_outcomes`.
491    fn get_outcomes(self) -> impl Iterator<Item = (Outcome, DataCategory, usize)> {
492        let Self {
493            event,
494            event_indexed,
495            attachments_limits:
496                AttachmentsLimits {
497                    event:
498                        AttachmentLimits {
499                            bytes: event_attachment_bytes,
500                            count: event_attachment_item,
501                        },
502                    trace:
503                        AttachmentLimits {
504                            bytes: trace_attachment_bytes,
505                            count: trace_attachment_item,
506                        },
507                    span:
508                        AttachmentLimits {
509                            bytes: span_attachment_bytes,
510                            count: span_attachment_item,
511                        },
512                },
513            sessions: _, // Do not report outcomes for sessions.
514            profiles,
515            profiles_indexed,
516            replays,
517            check_ins,
518            log_items,
519            log_bytes,
520            spans,
521            spans_indexed,
522            user_reports,
523            profile_chunks,
524            profile_chunks_ui,
525            trace_metrics,
526        } = self;
527
528        let limits = [
529            event,
530            event_indexed,
531            event_attachment_bytes,
532            event_attachment_item,
533            trace_attachment_bytes,
534            trace_attachment_item,
535            span_attachment_bytes,
536            span_attachment_item,
537            profiles,
538            profiles_indexed,
539            replays,
540            check_ins,
541            log_items,
542            log_bytes,
543            spans,
544            spans_indexed,
545            user_reports,
546            profile_chunks,
547            profile_chunks_ui,
548            trace_metrics,
549        ];
550
551        limits
552            .into_iter()
553            .filter(|limit| limit.quantity > 0)
554            .filter_map(move |limit| {
555                Some((
556                    Outcome::RateLimited(limit.reason_code),
557                    limit.category?,
558                    limit.quantity,
559                ))
560            })
561    }
562
563    /// Applies the [`Enforcement`] on the [`Envelope`] by removing all items that were rate limited
564    /// and emits outcomes for each rate limited category.
565    ///
566    /// # Example
567    ///
568    /// ## Interaction between Events and Attachments
569    ///
570    /// An envelope with an `Error` event and an `Attachment`. Two quotas specify to drop all
571    /// attachments (reason `"a"`) and all errors (reason `"e"`). The result of enforcement will be:
572    ///
573    /// 1. All items are removed from the envelope.
574    /// 2. Enforcements report both the event and the attachment dropped with reason `"e"`, since
575    ///    dropping an event automatically drops all attachments with the same reason.
576    /// 3. Rate limits report the single event limit `"e"`, since attachment limits do not need to
577    ///    be checked in this case.
578    ///
579    /// ## Required Attachments
580    ///
581    /// An envelope with a single Minidump `Attachment`, and a single quota specifying to drop all
582    /// attachments with reason `"a"`:
583    ///
584    /// 1. Since the minidump creates an event and is required for processing, it remains in the
585    ///    envelope and is marked as `rate_limited`.
586    /// 2. Enforcements report the attachment dropped with reason `"a"`.
587    /// 3. Rate limits are empty since it is allowed to send required attachments even when rate
588    ///    limited.
589    ///
590    /// ## Previously Rate Limited Attachments
591    ///
592    /// An envelope with a single item marked as `rate_limited`, and a quota specifying to drop
593    /// everything with reason `"d"`:
594    ///
595    /// 1. The item remains in the envelope.
596    /// 2. Enforcements are empty. Rate limiting has occurred at an earlier stage in the pipeline.
597    /// 3. Rate limits are empty.
598    pub fn apply_with_outcomes(self, envelope: &mut ManagedEnvelope) {
599        envelope
600            .envelope_mut()
601            .retain_items(|item| self.retain_item(item));
602        self.track_outcomes(envelope);
603    }
604
605    /// Applies the [`Enforcement`] on the [`Envelope`] by removing all items that were rate limited
606    /// and emits outcomes for each rate limited category.
607    ///
608    /// Works exactly like [`Self::apply_with_outcomes`], but instead operates on [`Managed`]
609    /// instead of [`ManagedEnvelope`].
610    pub fn apply_to_managed(self, envelope: &mut Managed<Box<Envelope>>) {
611        envelope.modify(|envelope, records| {
612            envelope.retain_items(|item| self.retain_item(item));
613
614            // Sessions currently do not emit any outcomes, but may be dropped.
615            records.lenient(DataCategory::Session);
616            // This is an existing bug in how user reports handle rate limits and emit outcomes.
617            //
618            // User report v1 and v2 (feedback) are counting into the same category, but that is not
619            // completely consistent leading to some mismatches when emitting outcomes from rate
620            // limiting vs how outcomes are counted on the `Managed` instance.
621            //
622            // Issue: <https://github.com/getsentry/relay/issues/5524>.
623            records.lenient(DataCategory::UserReportV2);
624
625            for (outcome, category, quantity) in self.get_outcomes() {
626                records.reject_err(outcome, (category, quantity))
627            }
628        });
629    }
630
631    /// Returns `true` when an [`Item`] can be retained, `false` otherwise.
632    fn retain_item(&self, item: &mut Item) -> bool {
633        // Remove event items and all items that depend on this event
634        if self.event.is_active() && item.requires_event() {
635            return false;
636        }
637
638        // When checking limits for categories that have an indexed variant,
639        // we only have to check the more specific, the indexed, variant
640        // to determine whether an item is limited.
641        match item.ty() {
642            ItemType::Attachment => {
643                match item.attachment_parent_type() {
644                    AttachmentParentType::Span => !self.attachments_limits.span.is_active(),
645                    AttachmentParentType::Trace => !self.attachments_limits.trace.is_active(),
646                    AttachmentParentType::Event => {
647                        if !self.attachments_limits.event.is_active() {
648                            return true;
649                        }
650                        if item.creates_event() {
651                            item.set_rate_limited(true);
652                            true
653                        } else {
654                            false
655                        }
656                    }
657                }
658            }
659            ItemType::Session => !self.sessions.is_active(),
660            ItemType::Profile => !self.profiles_indexed.is_active(),
661            ItemType::ReplayEvent => !self.replays.is_active(),
662            ItemType::ReplayVideo => !self.replays.is_active(),
663            ItemType::ReplayRecording => !self.replays.is_active(),
664            ItemType::UserReport => !self.user_reports.is_active(),
665            ItemType::CheckIn => !self.check_ins.is_active(),
666            ItemType::Log => {
667                !(self.log_items.is_active() || self.log_bytes.is_active())
668            }
669            ItemType::Span => !self.spans_indexed.is_active(),
670            ItemType::ProfileChunk => match item.profile_type() {
671                Some(ProfileType::Backend) => !self.profile_chunks.is_active(),
672                Some(ProfileType::Ui) => !self.profile_chunks_ui.is_active(),
673                None => true,
674            },
675            ItemType::TraceMetric => !self.trace_metrics.is_active(),
676            ItemType::Integration => match item.integration() {
677                Some(Integration::Logs(_)) => !(self.log_items.is_active() || self.log_bytes.is_active()),
678                Some(Integration::Spans(_)) => !self.spans_indexed.is_active(),
679                None => true,
680            },
681            ItemType::Event
682            | ItemType::Transaction
683            | ItemType::Security
684            | ItemType::FormData
685            | ItemType::RawSecurity
686            | ItemType::Nel
687            | ItemType::UnrealReport
688            | ItemType::Sessions
689            | ItemType::Statsd
690            | ItemType::MetricBuckets
691            | ItemType::ClientReport
692            | ItemType::UserReportV2  // This is an event type.
693            | ItemType::Unknown(_) => true,
694        }
695    }
696
697    /// Invokes track outcome on all enforcements reported by the [`EnvelopeLimiter`].
698    ///
699    /// Relay generally does not emit outcomes for sessions, so those are skipped.
700    fn track_outcomes(self, envelope: &mut ManagedEnvelope) {
701        for (outcome, category, quantity) in self.get_outcomes() {
702            envelope.track_outcome(outcome, category, quantity)
703        }
704    }
705}
706
/// Which limits to check with the [`EnvelopeLimiter`].
#[derive(Debug, Copy, Clone)]
pub enum CheckLimits {
    /// Checks all limits except indexed categories.
    ///
    /// In the fast path it is necessary to apply cached rate limits but to not enforce indexed
    /// rate limits, because at the time of the check the decision whether an envelope is sampled
    /// or not is not yet known. Additionally, even if the item is later dropped by dynamic
    /// sampling, it must still be around to extract metrics and cannot be dropped too early.
    NonIndexed,
    /// Checks all limits against the envelope.
    All,
}
720
721struct Check<F, E, R> {
722    limits: CheckLimits,
723    check: F,
724    _1: PhantomData<E>,
725    _2: PhantomData<R>,
726}
727
728impl<F, E, R> Check<F, E, R>
729where
730    F: FnMut(ItemScoping, usize) -> R,
731    R: Future<Output = Result<RateLimits, E>>,
732{
733    async fn apply(&mut self, scoping: ItemScoping, quantity: usize) -> Result<RateLimits, E> {
734        if matches!(self.limits, CheckLimits::NonIndexed) && scoping.category.is_indexed() {
735            return Ok(RateLimits::default());
736        }
737
738        (self.check)(scoping, quantity).await
739    }
740}
741
/// Enforces rate limits with the given `check` function on items in the envelope.
///
/// The `check` function is called with the following rules:
///  - Once for a single event, if present in the envelope.
///  - Once for all comprised attachments, unless the event was rate limited.
///  - Once for all comprised sessions.
///
/// Items violating the rate limit are removed from the envelope. This follows a set of rules:
///  - If the event is removed, all items depending on the event are removed (e.g. attachments).
///  - Attachments are not removed if they create events (e.g. minidumps).
///  - Sessions are handled separately from all of the above.
pub struct EnvelopeLimiter<F, E, R> {
    /// The check function together with the [`CheckLimits`] mode.
    check: Check<F, E, R>,
    /// Event category set through `assume_event`, `None` by default.
    event_category: Option<DataCategory>,
}
757
758impl<'a, F, E, R> EnvelopeLimiter<F, E, R>
759where
760    F: FnMut(ItemScoping, usize) -> R,
761    R: Future<Output = Result<RateLimits, E>>,
762{
763    /// Create a new `EnvelopeLimiter` with the given `check` function.
764    pub fn new(limits: CheckLimits, check: F) -> Self {
765        Self {
766            check: Check {
767                check,
768                limits,
769                _1: PhantomData,
770                _2: PhantomData,
771            },
772            event_category: None,
773        }
774    }
775
776    /// Assume an event with the given category, even if no item is present in the envelope.
777    ///
778    /// This ensures that rate limits for the given data category are checked even if there is no
779    /// matching item in the envelope. Other items are handled according to the rules as if the
780    /// event item were present.
781    pub fn assume_event(&mut self, category: DataCategory) {
782        self.event_category = Some(category);
783    }
784
785    /// Process rate limits for the envelope, returning applied limits.
786    ///
787    /// Returns a tuple of `Enforcement` and `RateLimits`:
788    ///
789    /// - Enforcements declare the quantities of categories that have been rate limited with the
790    ///   individual reason codes that caused rate limiting. If multiple rate limits applied to a
791    ///   category, then the longest limit is reported.
792    /// - Rate limits declare all active rate limits, regardless of whether they have been applied
793    ///   to items in the envelope. This excludes rate limits applied to required attachments, since
794    ///   clients are allowed to continue sending them.
795    pub async fn compute(
796        mut self,
797        envelope: &Envelope,
798        scoping: &'a Scoping,
799    ) -> Result<(Enforcement, RateLimits), E> {
800        let mut summary = EnvelopeSummary::compute(envelope);
801        summary.event_category = self.event_category.or(summary.event_category);
802
803        let (enforcement, rate_limits) = self.execute(&summary, scoping).await?;
804        Ok((enforcement, rate_limits))
805    }
806
807    async fn execute(
808        &mut self,
809        summary: &EnvelopeSummary,
810        scoping: &'a Scoping,
811    ) -> Result<(Enforcement, RateLimits), E> {
812        let mut rate_limits = RateLimits::new();
813        let mut enforcement = Enforcement::default();
814
815        // Handle event.
816        if let Some(category) = summary.event_category {
817            // Check the broad category for limits.
818            let mut event_limits = self.check.apply(scoping.item(category), 1).await?;
819            enforcement.event = CategoryLimit::new(category, 1, event_limits.longest());
820
821            if let Some(index_category) = category.index_category() {
822                // Check the specific/indexed category for limits only if the specific one has not already
823                // an enforced limit.
824                if event_limits.is_empty() {
825                    event_limits.merge(self.check.apply(scoping.item(index_category), 1).await?);
826                }
827
828                enforcement.event_indexed =
829                    CategoryLimit::new(index_category, 1, event_limits.longest());
830            };
831
832            rate_limits.merge(event_limits);
833        }
834
835        // Handle spans.
836        if enforcement.is_event_active() {
837            enforcement.spans = enforcement
838                .event
839                .clone_for(DataCategory::Span, summary.span_quantity);
840
841            enforcement.spans_indexed = enforcement
842                .event_indexed
843                .clone_for(DataCategory::SpanIndexed, summary.span_quantity);
844        } else if summary.span_quantity > 0 || summary.has_span_dependent_items() {
845            let mut span_limits = self
846                .check
847                .apply(scoping.item(DataCategory::Span), summary.span_quantity)
848                .await?;
849            enforcement.spans = CategoryLimit::new(
850                DataCategory::Span,
851                summary.span_quantity,
852                span_limits.longest(),
853            );
854
855            if span_limits.is_empty() {
856                span_limits.merge(
857                    self.check
858                        .apply(
859                            scoping.item(DataCategory::SpanIndexed),
860                            summary.span_quantity,
861                        )
862                        .await?,
863                );
864            }
865
866            enforcement.spans_indexed = CategoryLimit::new(
867                DataCategory::SpanIndexed,
868                summary.span_quantity,
869                span_limits.longest(),
870            );
871
872            rate_limits.merge(span_limits);
873        }
874
875        // Handle span attachments
876        if enforcement.spans_indexed.is_active() {
877            enforcement.attachments_limits.span.bytes = enforcement.spans_indexed.clone_for(
878                DataCategory::Attachment,
879                summary.attachment_quantities.span.bytes,
880            );
881            enforcement.attachments_limits.span.count = enforcement.spans_indexed.clone_for(
882                DataCategory::AttachmentItem,
883                summary.attachment_quantities.span.count,
884            );
885        } else if !summary.attachment_quantities.span.is_empty() {
886            // While we could combine this check with the check that we do for event and trace
887            // attachments, this would complicate the logic so we opted against doing that.
888            // In practice the performance impact should be negligible since different types
889            // of attachments should rarely be send together.
890            enforcement.attachments_limits.span = self
891                .check_attachment_limits(scoping, &summary.attachment_quantities.span)
892                .await?;
893        }
894
895        // Handle attachments.
896        if let Some(limit) = enforcement.active_event() {
897            let limit1 = limit.clone_for(
898                DataCategory::Attachment,
899                summary.attachment_quantities.event.bytes,
900            );
901            let limit2 = limit.clone_for(
902                DataCategory::AttachmentItem,
903                summary.attachment_quantities.event.count,
904            );
905
906            enforcement.attachments_limits.event.bytes = limit1;
907            enforcement.attachments_limits.event.count = limit2;
908        } else {
909            let mut attachment_limits = RateLimits::new();
910            if summary.attachment_quantities.event.bytes > 0 {
911                let item_scoping = scoping.item(DataCategory::Attachment);
912
913                let attachment_byte_limits = self
914                    .check
915                    .apply(item_scoping, summary.attachment_quantities.event.bytes)
916                    .await?;
917
918                enforcement.attachments_limits.event.bytes = CategoryLimit::new(
919                    DataCategory::Attachment,
920                    summary.attachment_quantities.event.bytes,
921                    attachment_byte_limits.longest(),
922                );
923                enforcement.attachments_limits.event.count =
924                    enforcement.attachments_limits.event.bytes.clone_for(
925                        DataCategory::AttachmentItem,
926                        summary.attachment_quantities.event.count,
927                    );
928                attachment_limits.merge(attachment_byte_limits);
929            }
930            if !attachment_limits.is_limited() && summary.attachment_quantities.event.count > 0 {
931                let item_scoping = scoping.item(DataCategory::AttachmentItem);
932
933                let attachment_item_limits = self
934                    .check
935                    .apply(item_scoping, summary.attachment_quantities.event.count)
936                    .await?;
937
938                enforcement.attachments_limits.event.count = CategoryLimit::new(
939                    DataCategory::AttachmentItem,
940                    summary.attachment_quantities.event.count,
941                    attachment_item_limits.longest(),
942                );
943                enforcement.attachments_limits.event.bytes =
944                    enforcement.attachments_limits.event.count.clone_for(
945                        DataCategory::Attachment,
946                        summary.attachment_quantities.event.bytes,
947                    );
948                attachment_limits.merge(attachment_item_limits);
949            }
950
951            // Only record rate limits for plain attachments. For all other attachments, it's
952            // perfectly "legal" to send them. They will still be discarded in Sentry, but clients
953            // can continue to send them.
954            if summary.has_plain_attachments {
955                rate_limits.merge(attachment_limits);
956            }
957        }
958
959        // Handle trace attachments.
960        if !summary.attachment_quantities.trace.is_empty() {
961            enforcement.attachments_limits.trace = self
962                .check_attachment_limits(scoping, &summary.attachment_quantities.trace)
963                .await?;
964        }
965
966        // Handle sessions.
967        if summary.session_quantity > 0 {
968            let item_scoping = scoping.item(DataCategory::Session);
969            let session_limits = self
970                .check
971                .apply(item_scoping, summary.session_quantity)
972                .await?;
973            enforcement.sessions = CategoryLimit::new(
974                DataCategory::Session,
975                summary.session_quantity,
976                session_limits.longest(),
977            );
978            rate_limits.merge(session_limits);
979        }
980
981        // Handle trace metrics.
982        if summary.trace_metric_quantity > 0 {
983            let item_scoping = scoping.item(DataCategory::TraceMetric);
984            let trace_metric_limits = self
985                .check
986                .apply(item_scoping, summary.trace_metric_quantity)
987                .await?;
988            enforcement.trace_metrics = CategoryLimit::new(
989                DataCategory::TraceMetric,
990                summary.trace_metric_quantity,
991                trace_metric_limits.longest(),
992            );
993            rate_limits.merge(trace_metric_limits);
994        }
995
996        // Handle logs.
997        if summary.log_item_quantity > 0 {
998            let item_scoping = scoping.item(DataCategory::LogItem);
999            let log_limits = self
1000                .check
1001                .apply(item_scoping, summary.log_item_quantity)
1002                .await?;
1003            enforcement.log_items = CategoryLimit::new(
1004                DataCategory::LogItem,
1005                summary.log_item_quantity,
1006                log_limits.longest(),
1007            );
1008            rate_limits.merge(log_limits);
1009        }
1010        if summary.log_byte_quantity > 0 {
1011            let item_scoping = scoping.item(DataCategory::LogByte);
1012            let log_limits = self
1013                .check
1014                .apply(item_scoping, summary.log_byte_quantity)
1015                .await?;
1016            enforcement.log_bytes = CategoryLimit::new(
1017                DataCategory::LogByte,
1018                summary.log_byte_quantity,
1019                log_limits.longest(),
1020            );
1021            rate_limits.merge(log_limits);
1022        }
1023
1024        // Handle profiles.
1025        if enforcement.is_event_active() {
1026            enforcement.profiles = enforcement
1027                .event
1028                .clone_for(DataCategory::Profile, summary.profile_quantity);
1029
1030            enforcement.profiles_indexed = enforcement
1031                .event_indexed
1032                .clone_for(DataCategory::ProfileIndexed, summary.profile_quantity)
1033        } else if summary.profile_quantity > 0 {
1034            let mut profile_limits = self
1035                .check
1036                .apply(
1037                    scoping.item(DataCategory::Profile),
1038                    summary.profile_quantity,
1039                )
1040                .await?;
1041
1042            // Profiles can persist in envelopes without transaction if the transaction item
1043            // was dropped by dynamic sampling.
1044            if profile_limits.is_empty() && summary.event_category.is_none() {
1045                profile_limits = self
1046                    .check
1047                    .apply(scoping.item(DataCategory::Transaction), 0)
1048                    .await?;
1049            }
1050
1051            enforcement.profiles = CategoryLimit::new(
1052                DataCategory::Profile,
1053                summary.profile_quantity,
1054                profile_limits.longest(),
1055            );
1056
1057            if profile_limits.is_empty() {
1058                profile_limits.merge(
1059                    self.check
1060                        .apply(
1061                            scoping.item(DataCategory::ProfileIndexed),
1062                            summary.profile_quantity,
1063                        )
1064                        .await?,
1065                );
1066            }
1067
1068            enforcement.profiles_indexed = CategoryLimit::new(
1069                DataCategory::ProfileIndexed,
1070                summary.profile_quantity,
1071                profile_limits.longest(),
1072            );
1073
1074            rate_limits.merge(profile_limits);
1075        }
1076
1077        // Handle replays.
1078        if summary.replay_quantity > 0 {
1079            let item_scoping = scoping.item(DataCategory::Replay);
1080            let replay_limits = self
1081                .check
1082                .apply(item_scoping, summary.replay_quantity)
1083                .await?;
1084            enforcement.replays = CategoryLimit::new(
1085                DataCategory::Replay,
1086                summary.replay_quantity,
1087                replay_limits.longest(),
1088            );
1089            rate_limits.merge(replay_limits);
1090        }
1091
1092        // Handle user report v1s, which share limits with v2.
1093        if summary.user_report_quantity > 0 {
1094            let item_scoping = scoping.item(DataCategory::UserReportV2);
1095            let user_report_v2_limits = self
1096                .check
1097                .apply(item_scoping, summary.user_report_quantity)
1098                .await?;
1099            enforcement.user_reports = CategoryLimit::new(
1100                DataCategory::UserReportV2,
1101                summary.user_report_quantity,
1102                user_report_v2_limits.longest(),
1103            );
1104            rate_limits.merge(user_report_v2_limits);
1105        }
1106
1107        // Handle monitor checkins.
1108        if summary.monitor_quantity > 0 {
1109            let item_scoping = scoping.item(DataCategory::Monitor);
1110            let checkin_limits = self
1111                .check
1112                .apply(item_scoping, summary.monitor_quantity)
1113                .await?;
1114            enforcement.check_ins = CategoryLimit::new(
1115                DataCategory::Monitor,
1116                summary.monitor_quantity,
1117                checkin_limits.longest(),
1118            );
1119            rate_limits.merge(checkin_limits);
1120        }
1121
1122        // Handle profile chunks.
1123        if summary.profile_chunk_quantity > 0 {
1124            let item_scoping = scoping.item(DataCategory::ProfileChunk);
1125            let limits = self
1126                .check
1127                .apply(item_scoping, summary.profile_chunk_quantity)
1128                .await?;
1129            enforcement.profile_chunks = CategoryLimit::new(
1130                DataCategory::ProfileChunk,
1131                summary.profile_chunk_quantity,
1132                limits.longest(),
1133            );
1134            rate_limits.merge(limits);
1135        }
1136
1137        if summary.profile_chunk_ui_quantity > 0 {
1138            let item_scoping = scoping.item(DataCategory::ProfileChunkUi);
1139            let limits = self
1140                .check
1141                .apply(item_scoping, summary.profile_chunk_ui_quantity)
1142                .await?;
1143            enforcement.profile_chunks_ui = CategoryLimit::new(
1144                DataCategory::ProfileChunkUi,
1145                summary.profile_chunk_ui_quantity,
1146                limits.longest(),
1147            );
1148            rate_limits.merge(limits);
1149        }
1150
1151        Ok((enforcement, rate_limits))
1152    }
1153
1154    async fn check_attachment_limits(
1155        &mut self,
1156        scoping: &Scoping,
1157        quantities: &AttachmentQuantity,
1158    ) -> Result<AttachmentLimits, E> {
1159        let mut attachment_limits = self
1160            .check
1161            .apply(scoping.item(DataCategory::Attachment), quantities.bytes)
1162            .await?;
1163
1164        // Note: The check here is taken from the attachments logic for consistency I think just
1165        // checking `is_empty` should be fine?
1166        if !attachment_limits.is_limited() && quantities.count > 0 {
1167            attachment_limits.merge(
1168                self.check
1169                    .apply(scoping.item(DataCategory::AttachmentItem), quantities.count)
1170                    .await?,
1171            );
1172        }
1173
1174        Ok(AttachmentLimits {
1175            bytes: CategoryLimit::new(
1176                DataCategory::Attachment,
1177                quantities.bytes,
1178                attachment_limits.longest(),
1179            ),
1180            count: CategoryLimit::new(
1181                DataCategory::AttachmentItem,
1182                quantities.count,
1183                attachment_limits.longest(),
1184            ),
1185        })
1186    }
1187}
1188
1189impl<F, E, R> fmt::Debug for EnvelopeLimiter<F, E, R> {
1190    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1191        f.debug_struct("EnvelopeLimiter")
1192            .field("event_category", &self.event_category)
1193            .finish()
1194    }
1195}
1196
1197#[cfg(test)]
1198mod tests {
1199
1200    use std::collections::{BTreeMap, BTreeSet};
1201    use std::sync::Arc;
1202
1203    use relay_base_schema::organization::OrganizationId;
1204    use relay_base_schema::project::{ProjectId, ProjectKey};
1205    use relay_metrics::MetricNamespace;
1206    use relay_quotas::RetryAfter;
1207    use relay_system::Addr;
1208    use smallvec::smallvec;
1209    use tokio::sync::Mutex;
1210
1211    use super::*;
1212    use crate::envelope::ParentId;
1213    use crate::{
1214        envelope::{AttachmentType, ContentType, SourceQuantities},
1215        extractors::RequestMeta,
1216    };
1217
    /// Inputs and expectations of a single table-driven rate limit test.
    // NOTE(review): the tests using this struct are not part of this excerpt. The field
    // descriptions below are derived from the names and types only — confirm against usage.
    struct RateLimitTestCase {
        /// Label of the test case.
        name: &'static str,
        /// Categories the limiter should deny (presumably passed to `mock_limiter`).
        denied_categories: &'static [DataCategory],
        /// Whether an attachment limit is expected to be active after enforcement.
        expect_attachment_limit_active: bool,
        /// Expected `(category, quantity)` pairs of limiter invocations.
        expected_limiter_calls: &'static [(DataCategory, usize)],
        /// Expected `(category, quantity)` pairs of outcomes.
        expected_outcomes: &'static [(DataCategory, usize)],
    }
1225
1226    #[tokio::test]
1227    async fn test_format_rate_limits() {
1228        let mut rate_limits = RateLimits::new();
1229
1230        // Add a generic rate limit for all categories.
1231        rate_limits.add(RateLimit {
1232            categories: Default::default(),
1233            scope: RateLimitScope::Organization(OrganizationId::new(42)),
1234            reason_code: Some(ReasonCode::new("my_limit")),
1235            retry_after: RetryAfter::from_secs(42),
1236            namespaces: smallvec![],
1237        });
1238
1239        // Add a more specific rate limit for just one category.
1240        rate_limits.add(RateLimit {
1241            categories: [DataCategory::Transaction, DataCategory::Security].into(),
1242            scope: RateLimitScope::Project(ProjectId::new(21)),
1243            reason_code: None,
1244            retry_after: RetryAfter::from_secs(4711),
1245            namespaces: smallvec![],
1246        });
1247
1248        let formatted = format_rate_limits(&rate_limits);
1249        let expected = "42::organization:my_limit, 4711:transaction;security:project";
1250        assert_eq!(formatted, expected);
1251    }
1252
1253    #[tokio::test]
1254    async fn test_format_rate_limits_namespace() {
1255        let mut rate_limits = RateLimits::new();
1256
1257        // Rate limit with reason code and namespace.
1258        rate_limits.add(RateLimit {
1259            categories: [DataCategory::MetricBucket].into(),
1260            scope: RateLimitScope::Organization(OrganizationId::new(42)),
1261            reason_code: Some(ReasonCode::new("my_limit")),
1262            retry_after: RetryAfter::from_secs(42),
1263            namespaces: smallvec![MetricNamespace::Custom, MetricNamespace::Spans],
1264        });
1265
1266        // Rate limit without reason code.
1267        rate_limits.add(RateLimit {
1268            categories: [DataCategory::MetricBucket].into(),
1269            scope: RateLimitScope::Organization(OrganizationId::new(42)),
1270            reason_code: None,
1271            retry_after: RetryAfter::from_secs(42),
1272            namespaces: smallvec![MetricNamespace::Spans],
1273        });
1274
1275        let formatted = format_rate_limits(&rate_limits);
1276        let expected = "42:metric_bucket:organization:my_limit:custom;spans, 42:metric_bucket:organization::spans";
1277        assert_eq!(formatted, expected);
1278    }
1279
1280    #[tokio::test]
1281    async fn test_parse_invalid_rate_limits() {
1282        let scoping = Scoping {
1283            organization_id: OrganizationId::new(42),
1284            project_id: ProjectId::new(21),
1285            project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
1286            key_id: Some(17),
1287        };
1288
1289        assert!(parse_rate_limits(&scoping, "").is_ok());
1290        assert!(parse_rate_limits(&scoping, "invalid").is_ok());
1291        assert!(parse_rate_limits(&scoping, ",,,").is_ok());
1292    }
1293
1294    #[tokio::test]
1295    async fn test_parse_rate_limits() {
1296        let scoping = Scoping {
1297            organization_id: OrganizationId::new(42),
1298            project_id: ProjectId::new(21),
1299            project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
1300            key_id: Some(17),
1301        };
1302
1303        // contains "foobar", an unknown scope that should be mapped to Unknown
1304        let formatted =
1305            "42::organization:my_limit, invalid, 4711:foobar;transaction;security:project";
1306        let rate_limits: Vec<RateLimit> =
1307            parse_rate_limits(&scoping, formatted).into_iter().collect();
1308
1309        assert_eq!(
1310            rate_limits,
1311            vec![
1312                RateLimit {
1313                    categories: Default::default(),
1314                    scope: RateLimitScope::Organization(OrganizationId::new(42)),
1315                    reason_code: Some(ReasonCode::new("my_limit")),
1316                    retry_after: rate_limits[0].retry_after,
1317                    namespaces: smallvec![],
1318                },
1319                RateLimit {
1320                    categories: [
1321                        DataCategory::Unknown,
1322                        DataCategory::Transaction,
1323                        DataCategory::Security,
1324                    ]
1325                    .into(),
1326                    scope: RateLimitScope::Project(ProjectId::new(21)),
1327                    reason_code: None,
1328                    retry_after: rate_limits[1].retry_after,
1329                    namespaces: smallvec![],
1330                }
1331            ]
1332        );
1333
1334        assert_eq!(42, rate_limits[0].retry_after.remaining_seconds());
1335        assert_eq!(4711, rate_limits[1].retry_after.remaining_seconds());
1336    }
1337
1338    #[tokio::test]
1339    async fn test_parse_rate_limits_namespace() {
1340        let scoping = Scoping {
1341            organization_id: OrganizationId::new(42),
1342            project_id: ProjectId::new(21),
1343            project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
1344            key_id: Some(17),
1345        };
1346
1347        let formatted = "42:metric_bucket:organization::custom;spans";
1348        let rate_limits: Vec<RateLimit> =
1349            parse_rate_limits(&scoping, formatted).into_iter().collect();
1350
1351        assert_eq!(
1352            rate_limits,
1353            vec![RateLimit {
1354                categories: [DataCategory::MetricBucket].into(),
1355                scope: RateLimitScope::Organization(OrganizationId::new(42)),
1356                reason_code: None,
1357                retry_after: rate_limits[0].retry_after,
1358                namespaces: smallvec![MetricNamespace::Custom, MetricNamespace::Spans],
1359            }]
1360        );
1361    }
1362
1363    #[tokio::test]
1364    async fn test_parse_rate_limits_empty_namespace() {
1365        let scoping = Scoping {
1366            organization_id: OrganizationId::new(42),
1367            project_id: ProjectId::new(21),
1368            project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
1369            key_id: Some(17),
1370        };
1371
1372        // notice the trailing colon
1373        let formatted = "42:metric_bucket:organization:some_reason:";
1374        let rate_limits: Vec<RateLimit> =
1375            parse_rate_limits(&scoping, formatted).into_iter().collect();
1376
1377        assert_eq!(
1378            rate_limits,
1379            vec![RateLimit {
1380                categories: [DataCategory::MetricBucket].into(),
1381                scope: RateLimitScope::Organization(OrganizationId::new(42)),
1382                reason_code: Some(ReasonCode::new("some_reason")),
1383                retry_after: rate_limits[0].retry_after,
1384                namespaces: smallvec![],
1385            }]
1386        );
1387    }
1388
1389    #[tokio::test]
1390    async fn test_parse_rate_limits_only_unknown() {
1391        let scoping = Scoping {
1392            organization_id: OrganizationId::new(42),
1393            project_id: ProjectId::new(21),
1394            project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
1395            key_id: Some(17),
1396        };
1397
1398        let formatted = "42:foo;bar:organization";
1399        let rate_limits: Vec<RateLimit> =
1400            parse_rate_limits(&scoping, formatted).into_iter().collect();
1401
1402        assert_eq!(
1403            rate_limits,
1404            vec![RateLimit {
1405                categories: [DataCategory::Unknown, DataCategory::Unknown].into(),
1406                scope: RateLimitScope::Organization(OrganizationId::new(42)),
1407                reason_code: None,
1408                retry_after: rate_limits[0].retry_after,
1409                namespaces: smallvec![],
1410            },]
1411        );
1412    }
1413
    /// Builds a `ManagedEnvelope` for tests from a comma-separated list of item types.
    ///
    /// Each entry is an `ItemType` variant, optionally followed by `::` and an `AttachmentType`
    /// variant, for example `envelope![Event, Attachment::Minidump]`. Every item gets the same
    /// 10-byte payload.
    macro_rules! envelope {
        ($( $item_type:ident $( :: $attachment_type:ident )? ),*) => {{
            // Envelope headers only; items are added below.
            let bytes = "{\"dsn\":\"https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42\"}";
            // `mut` is unused when the macro is invoked without items.
            #[allow(unused_mut)]
            let mut envelope = Envelope::parse_bytes(bytes.into()).unwrap();
            $(
                let mut item = Item::new(ItemType::$item_type);
                item.set_payload(ContentType::OctetStream, "0123456789");
                // Only expanded if an attachment type was given for this item.
                $( item.set_attachment_type(AttachmentType::$attachment_type); )?
                envelope.add_item(item);
            )*

            // The second value returned by `Addr::custom` is discarded.
            let (outcome_aggregator, _) = Addr::custom();

            ManagedEnvelope::new(
                envelope,
                outcome_aggregator,
            )
        }}
    }
1434
1435    fn set_extracted(envelope: &mut Envelope, ty: ItemType) {
1436        envelope
1437            .get_item_by_mut(|item| *item.ty() == ty)
1438            .unwrap()
1439            .set_metrics_extracted(true);
1440    }
1441
1442    fn rate_limit(category: DataCategory) -> RateLimit {
1443        RateLimit {
1444            categories: [category].into(),
1445            scope: RateLimitScope::Organization(OrganizationId::new(42)),
1446            reason_code: None,
1447            retry_after: RetryAfter::from_secs(60),
1448            namespaces: smallvec![],
1449        }
1450    }
1451
1452    fn trace_attachment_item(bytes: usize, parent_id: Option<ParentId>) -> Item {
1453        let mut item = Item::new(ItemType::Attachment);
1454        item.set_payload(ContentType::TraceAttachment, "0".repeat(bytes));
1455        item.set_parent_id(parent_id);
1456        item
1457    }
1458
    /// Rate limiter stand-in for tests that records every check it receives.
    #[derive(Debug, Default)]
    struct MockLimiter {
        /// Categories for which `check` returns a rate limit.
        denied: Vec<DataCategory>,
        /// Quantity passed to `check`, keyed by the checked category.
        called: BTreeMap<DataCategory, usize>,
        /// Categories that have been verified through `assert_call`.
        checked: BTreeSet<DataCategory>,
    }
1465
1466    impl MockLimiter {
1467        pub fn deny(mut self, category: DataCategory) -> Self {
1468            self.denied.push(category);
1469            self
1470        }
1471
1472        pub fn check(&mut self, scoping: ItemScoping, quantity: usize) -> Result<RateLimits, ()> {
1473            let cat = scoping.category;
1474            let previous = self.called.insert(cat, quantity);
1475            assert!(previous.is_none(), "rate limiter invoked twice for {cat}");
1476
1477            let mut limits = RateLimits::new();
1478            if self.denied.contains(&cat) {
1479                limits.add(rate_limit(cat));
1480            }
1481            Ok(limits)
1482        }
1483
1484        #[track_caller]
1485        pub fn assert_call(&mut self, category: DataCategory, expected: usize) {
1486            self.checked.insert(category);
1487
1488            let quantity = self.called.get(&category).copied();
1489            assert_eq!(
1490                quantity,
1491                Some(expected),
1492                "Expected quantity `{expected}` for data category `{category}`, got {quantity:?}."
1493            );
1494        }
1495    }
1496
1497    impl Drop for MockLimiter {
1498        fn drop(&mut self) {
1499            if std::thread::panicking() {
1500                return;
1501            }
1502
1503            for checked in &self.checked {
1504                self.called.remove(checked);
1505            }
1506
1507            if self.called.is_empty() {
1508                return;
1509            }
1510
1511            let not_asserted = self
1512                .called
1513                .iter()
1514                .map(|(k, v)| format!("- {k}: {v}"))
1515                .collect::<Vec<_>>()
1516                .join("\n");
1517
1518            panic!("Following calls to the limiter were not asserted:\n{not_asserted}");
1519        }
1520    }
1521
1522    async fn enforce_and_apply(
1523        mock: Arc<Mutex<MockLimiter>>,
1524        envelope: &mut ManagedEnvelope,
1525        #[allow(unused_variables)] assume_event: Option<DataCategory>,
1526    ) -> (Enforcement, RateLimits) {
1527        let scoping = envelope.scoping();
1528
1529        #[allow(unused_mut)]
1530        let mut limiter = EnvelopeLimiter::new(CheckLimits::All, move |s, q| {
1531            let mock = mock.clone();
1532            async move {
1533                let mut mock = mock.lock().await;
1534                mock.check(s, q)
1535            }
1536        });
1537        #[cfg(feature = "processing")]
1538        if let Some(assume_event) = assume_event {
1539            limiter.assume_event(assume_event);
1540        }
1541
1542        let (enforcement, limits) = limiter
1543            .compute(envelope.envelope_mut(), &scoping)
1544            .await
1545            .unwrap();
1546
1547        // We implemented `clone` only for tests because we don't want to make `apply_with_outcomes`
1548        // &self because we want move semantics to prevent double tracking.
1549        enforcement.clone().apply_with_outcomes(envelope);
1550
1551        (enforcement, limits)
1552    }
1553
1554    fn mock_limiter(categories: &[DataCategory]) -> Arc<Mutex<MockLimiter>> {
1555        let mut mock = MockLimiter::default();
1556        for &category in categories {
1557            mock = mock.deny(category);
1558        }
1559
1560        Arc::new(Mutex::new(mock))
1561    }
1562
1563    #[tokio::test]
1564    async fn test_enforce_pass_empty() {
1565        let mut envelope = envelope![];
1566
1567        let mock = mock_limiter(&[]);
1568        let (_, limits) = enforce_and_apply(mock, &mut envelope, None).await;
1569
1570        assert!(!limits.is_limited());
1571        assert!(envelope.envelope().is_empty());
1572    }
1573
1574    #[tokio::test]
1575    async fn test_enforce_limit_error_event() {
1576        let mut envelope = envelope![Event];
1577
1578        let mock = mock_limiter(&[DataCategory::Error]);
1579        let (_, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1580
1581        assert!(limits.is_limited());
1582        assert!(envelope.envelope().is_empty());
1583        mock.lock().await.assert_call(DataCategory::Error, 1);
1584    }
1585
1586    #[tokio::test]
1587    async fn test_enforce_limit_error_with_attachments() {
1588        let mut envelope = envelope![Event, Attachment];
1589
1590        let mock = mock_limiter(&[DataCategory::Error]);
1591        let (_, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1592
1593        assert!(limits.is_limited());
1594        assert!(envelope.envelope().is_empty());
1595        mock.lock().await.assert_call(DataCategory::Error, 1);
1596    }
1597
1598    #[tokio::test]
1599    async fn test_enforce_limit_minidump() {
1600        let mut envelope = envelope![Attachment::Minidump];
1601
1602        let mock = mock_limiter(&[DataCategory::Error]);
1603        let (_, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1604
1605        assert!(limits.is_limited());
1606        assert!(envelope.envelope().is_empty());
1607        mock.lock().await.assert_call(DataCategory::Error, 1);
1608    }
1609
1610    #[tokio::test]
1611    async fn test_enforce_limit_attachments() {
1612        let mut envelope = envelope![Attachment::Minidump, Attachment];
1613
1614        let mock = mock_limiter(&[DataCategory::Attachment]);
1615        let (_, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1616
1617        // Attachments would be limited, but crash reports create events and are thus allowed.
1618        assert!(limits.is_limited());
1619        assert_eq!(envelope.envelope().len(), 1);
1620        mock.lock().await.assert_call(DataCategory::Error, 1);
1621        mock.lock().await.assert_call(DataCategory::Attachment, 20);
1622    }
1623
1624    /// Limit stand-alone profiles.
1625    #[tokio::test]
1626    async fn test_enforce_limit_profiles() {
1627        let mut envelope = envelope![Profile, Profile];
1628
1629        let mock = mock_limiter(&[DataCategory::Profile]);
1630        let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1631
1632        assert!(limits.is_limited());
1633        assert_eq!(envelope.envelope().len(), 0);
1634        mock.lock().await.assert_call(DataCategory::Profile, 2);
1635
1636        assert_eq!(
1637            get_outcomes(enforcement),
1638            vec![
1639                (DataCategory::Profile, 2),
1640                (DataCategory::ProfileIndexed, 2)
1641            ]
1642        );
1643    }
1644
1645    /// Limit profile chunks.
1646    #[tokio::test]
1647    async fn test_enforce_limit_profile_chunks_no_profile_type() {
1648        // In this test we have profile chunks which have not yet been classified, which means they
1649        // should not be rate limited.
1650        let mut envelope = envelope![ProfileChunk, ProfileChunk];
1651
1652        let mock = mock_limiter(&[DataCategory::ProfileChunk]);
1653        let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1654        assert!(!limits.is_limited());
1655        assert_eq!(get_outcomes(enforcement), vec![]);
1656
1657        let mock = mock_limiter(&[DataCategory::ProfileChunkUi]);
1658        let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1659        assert!(!limits.is_limited());
1660        assert_eq!(get_outcomes(enforcement), vec![]);
1661
1662        assert_eq!(envelope.envelope().len(), 2);
1663    }
1664
1665    #[tokio::test]
1666    async fn test_enforce_limit_profile_chunks_ui() {
1667        let mut envelope = envelope![];
1668
1669        let mut item = Item::new(ItemType::ProfileChunk);
1670        item.set_profile_type(ProfileType::Backend);
1671        envelope.envelope_mut().add_item(item);
1672        let mut item = Item::new(ItemType::ProfileChunk);
1673        item.set_profile_type(ProfileType::Ui);
1674        envelope.envelope_mut().add_item(item);
1675
1676        let mock = mock_limiter(&[DataCategory::ProfileChunkUi]);
1677        let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1678
1679        assert!(limits.is_limited());
1680        assert_eq!(envelope.envelope().len(), 1);
1681        mock.lock()
1682            .await
1683            .assert_call(DataCategory::ProfileChunkUi, 1);
1684        mock.lock().await.assert_call(DataCategory::ProfileChunk, 1);
1685
1686        assert_eq!(
1687            get_outcomes(enforcement),
1688            vec![(DataCategory::ProfileChunkUi, 1)]
1689        );
1690    }
1691
1692    #[tokio::test]
1693    async fn test_enforce_limit_profile_chunks_backend() {
1694        let mut envelope = envelope![];
1695
1696        let mut item = Item::new(ItemType::ProfileChunk);
1697        item.set_profile_type(ProfileType::Backend);
1698        envelope.envelope_mut().add_item(item);
1699        let mut item = Item::new(ItemType::ProfileChunk);
1700        item.set_profile_type(ProfileType::Ui);
1701        envelope.envelope_mut().add_item(item);
1702
1703        let mock = mock_limiter(&[DataCategory::ProfileChunk]);
1704        let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1705
1706        assert!(limits.is_limited());
1707        assert_eq!(envelope.envelope().len(), 1);
1708        mock.lock()
1709            .await
1710            .assert_call(DataCategory::ProfileChunkUi, 1);
1711        mock.lock().await.assert_call(DataCategory::ProfileChunk, 1);
1712
1713        assert_eq!(
1714            get_outcomes(enforcement),
1715            vec![(DataCategory::ProfileChunk, 1)]
1716        );
1717    }
1718
1719    /// Limit replays.
1720    #[tokio::test]
1721    async fn test_enforce_limit_replays() {
1722        let mut envelope = envelope![ReplayEvent, ReplayRecording, ReplayVideo];
1723
1724        let mock = mock_limiter(&[DataCategory::Replay]);
1725        let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1726
1727        assert!(limits.is_limited());
1728        assert_eq!(envelope.envelope().len(), 0);
1729        mock.lock().await.assert_call(DataCategory::Replay, 3);
1730
1731        assert_eq!(get_outcomes(enforcement), vec![(DataCategory::Replay, 3),]);
1732    }
1733
1734    /// Limit monitor checkins.
1735    #[tokio::test]
1736    async fn test_enforce_limit_monitor_checkins() {
1737        let mut envelope = envelope![CheckIn];
1738
1739        let mock = mock_limiter(&[DataCategory::Monitor]);
1740        let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1741
1742        assert!(limits.is_limited());
1743        assert_eq!(envelope.envelope().len(), 0);
1744        mock.lock().await.assert_call(DataCategory::Monitor, 1);
1745
1746        assert_eq!(get_outcomes(enforcement), vec![(DataCategory::Monitor, 1)])
1747    }
1748
1749    #[tokio::test]
1750    async fn test_enforce_pass_minidump() {
1751        let mut envelope = envelope![Attachment::Minidump];
1752
1753        let mock = mock_limiter(&[DataCategory::Attachment]);
1754        let (_, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1755
1756        // If only crash report attachments are present, we don't emit a rate limit.
1757        assert!(!limits.is_limited());
1758        assert_eq!(envelope.envelope().len(), 1);
1759        mock.lock().await.assert_call(DataCategory::Error, 1);
1760        mock.lock().await.assert_call(DataCategory::Attachment, 10);
1761    }
1762
1763    #[tokio::test]
1764    async fn test_enforce_skip_rate_limited() {
1765        let mut envelope = envelope![];
1766
1767        let mut item = Item::new(ItemType::Attachment);
1768        item.set_payload(ContentType::OctetStream, "0123456789");
1769        item.set_rate_limited(true);
1770        envelope.envelope_mut().add_item(item);
1771
1772        let mock = mock_limiter(&[DataCategory::Error]);
1773        let (_, limits) = enforce_and_apply(mock, &mut envelope, None).await;
1774
1775        assert!(!limits.is_limited()); // No new rate limits applied.
1776        assert_eq!(envelope.envelope().len(), 1); // The item was retained
1777    }
1778
1779    #[tokio::test]
1780    async fn test_enforce_pass_sessions() {
1781        let mut envelope = envelope![Session, Session, Session];
1782
1783        let mock = mock_limiter(&[DataCategory::Error]);
1784        let (_, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1785
1786        // If only crash report attachments are present, we don't emit a rate limit.
1787        assert!(!limits.is_limited());
1788        assert_eq!(envelope.envelope().len(), 3);
1789        mock.lock().await.assert_call(DataCategory::Session, 3);
1790    }
1791
1792    #[tokio::test]
1793    async fn test_enforce_limit_sessions() {
1794        let mut envelope = envelope![Session, Session, Event];
1795
1796        let mock = mock_limiter(&[DataCategory::Session]);
1797        let (_, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1798
1799        // If only crash report attachments are present, we don't emit a rate limit.
1800        assert!(limits.is_limited());
1801        assert_eq!(envelope.envelope().len(), 1);
1802        mock.lock().await.assert_call(DataCategory::Error, 1);
1803        mock.lock().await.assert_call(DataCategory::Session, 2);
1804    }
1805
1806    #[tokio::test]
1807    #[cfg(feature = "processing")]
1808    async fn test_enforce_limit_assumed_event() {
1809        let mut envelope = envelope![];
1810
1811        let mock = mock_limiter(&[DataCategory::Transaction]);
1812        let (_, limits) =
1813            enforce_and_apply(mock.clone(), &mut envelope, Some(DataCategory::Transaction)).await;
1814
1815        assert!(limits.is_limited());
1816        assert!(envelope.envelope().is_empty()); // obviously
1817        mock.lock().await.assert_call(DataCategory::Transaction, 1);
1818    }
1819
1820    #[tokio::test]
1821    #[cfg(feature = "processing")]
1822    async fn test_enforce_limit_assumed_attachments() {
1823        let mut envelope = envelope![Attachment, Attachment];
1824
1825        let mock = mock_limiter(&[DataCategory::Error]);
1826        let (_, limits) =
1827            enforce_and_apply(mock.clone(), &mut envelope, Some(DataCategory::Error)).await;
1828
1829        assert!(limits.is_limited());
1830        assert!(envelope.envelope().is_empty());
1831        mock.lock().await.assert_call(DataCategory::Error, 1);
1832    }
1833
1834    #[tokio::test]
1835    async fn test_enforce_transaction() {
1836        let mut envelope = envelope![Transaction];
1837
1838        let mock = mock_limiter(&[DataCategory::Transaction]);
1839        let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1840
1841        assert!(limits.is_limited());
1842        assert!(enforcement.event_indexed.is_active());
1843        assert!(enforcement.event.is_active());
1844        mock.lock().await.assert_call(DataCategory::Transaction, 1);
1845
1846        assert_eq!(
1847            get_outcomes(enforcement),
1848            vec![
1849                (DataCategory::Transaction, 1),
1850                (DataCategory::TransactionIndexed, 1),
1851                (DataCategory::Span, 1),
1852                (DataCategory::SpanIndexed, 1),
1853            ]
1854        );
1855    }
1856
1857    #[tokio::test]
1858    async fn test_enforce_transaction_non_indexed() {
1859        let mut envelope = envelope![Transaction, Profile];
1860        let scoping = envelope.scoping();
1861
1862        let mock = mock_limiter(&[DataCategory::TransactionIndexed]);
1863
1864        let mock_clone = mock.clone();
1865        let limiter = EnvelopeLimiter::new(CheckLimits::NonIndexed, move |s, q| {
1866            let mock_clone = mock_clone.clone();
1867            async move {
1868                let mut mock = mock_clone.lock().await;
1869                mock.check(s, q)
1870            }
1871        });
1872        let (enforcement, limits) = limiter
1873            .compute(envelope.envelope_mut(), &scoping)
1874            .await
1875            .unwrap();
1876        enforcement.clone().apply_with_outcomes(&mut envelope);
1877
1878        assert!(!limits.is_limited());
1879        assert!(!enforcement.event_indexed.is_active());
1880        assert!(!enforcement.event.is_active());
1881        assert!(!enforcement.profiles_indexed.is_active());
1882        assert!(!enforcement.profiles.is_active());
1883        assert!(!enforcement.spans.is_active());
1884        assert!(!enforcement.spans_indexed.is_active());
1885        mock.lock().await.assert_call(DataCategory::Transaction, 1);
1886        mock.lock().await.assert_call(DataCategory::Profile, 1);
1887        mock.lock().await.assert_call(DataCategory::Span, 1);
1888    }
1889
1890    #[tokio::test]
1891    async fn test_enforce_transaction_no_indexing_quota() {
1892        let mut envelope = envelope![Transaction];
1893
1894        let mock = mock_limiter(&[DataCategory::TransactionIndexed]);
1895        let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1896
1897        assert!(limits.is_limited());
1898        assert!(enforcement.event_indexed.is_active());
1899        assert!(!enforcement.event.is_active());
1900        mock.lock().await.assert_call(DataCategory::Transaction, 1);
1901        mock.lock()
1902            .await
1903            .assert_call(DataCategory::TransactionIndexed, 1);
1904    }
1905
1906    #[tokio::test]
1907    async fn test_enforce_transaction_attachment_enforced() {
1908        let mut envelope = envelope![Transaction, Attachment];
1909
1910        let mock = mock_limiter(&[DataCategory::Transaction]);
1911        let (enforcement, _) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1912
1913        assert!(enforcement.event.is_active());
1914        assert!(enforcement.attachments_limits.event.is_active());
1915        mock.lock().await.assert_call(DataCategory::Transaction, 1);
1916    }
1917
1918    fn get_outcomes(enforcement: Enforcement) -> Vec<(DataCategory, usize)> {
1919        enforcement
1920            .get_outcomes()
1921            .map(|(_, data_category, quantity)| (data_category, quantity))
1922            .collect::<Vec<_>>()
1923    }
1924
1925    #[tokio::test]
1926    async fn test_enforce_transaction_profile_enforced() {
1927        let mut envelope = envelope![Transaction, Profile];
1928
1929        let mock = mock_limiter(&[DataCategory::Transaction]);
1930        let (enforcement, _) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1931
1932        assert!(enforcement.event.is_active());
1933        assert!(enforcement.profiles.is_active());
1934        mock.lock().await.assert_call(DataCategory::Transaction, 1);
1935
1936        assert_eq!(
1937            get_outcomes(enforcement),
1938            vec![
1939                (DataCategory::Transaction, 1),
1940                (DataCategory::TransactionIndexed, 1),
1941                (DataCategory::Profile, 1),
1942                (DataCategory::ProfileIndexed, 1),
1943                (DataCategory::Span, 1),
1944                (DataCategory::SpanIndexed, 1),
1945            ]
1946        );
1947    }
1948
1949    #[tokio::test]
1950    async fn test_enforce_transaction_standalone_profile_enforced() {
1951        // When the transaction is sampled, the profile survives as standalone.
1952        let mut envelope = envelope![Profile];
1953
1954        let mock = mock_limiter(&[DataCategory::Transaction]);
1955        let (enforcement, _) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1956
1957        assert!(enforcement.profiles.is_active());
1958        mock.lock().await.assert_call(DataCategory::Profile, 1);
1959        mock.lock().await.assert_call(DataCategory::Transaction, 0);
1960
1961        assert_eq!(
1962            get_outcomes(enforcement),
1963            vec![
1964                (DataCategory::Profile, 1),
1965                (DataCategory::ProfileIndexed, 1),
1966            ]
1967        );
1968    }
1969
1970    #[tokio::test]
1971    async fn test_enforce_transaction_attachment_enforced_indexing_quota() {
1972        let mut envelope = envelope![Transaction, Attachment];
1973        set_extracted(envelope.envelope_mut(), ItemType::Transaction);
1974
1975        let mock = mock_limiter(&[DataCategory::TransactionIndexed]);
1976        let (enforcement, _) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1977
1978        assert!(!enforcement.event.is_active());
1979        assert!(enforcement.event_indexed.is_active());
1980        assert!(enforcement.attachments_limits.event.is_active());
1981        mock.lock().await.assert_call(DataCategory::Transaction, 1);
1982        mock.lock()
1983            .await
1984            .assert_call(DataCategory::TransactionIndexed, 1);
1985
1986        assert_eq!(
1987            get_outcomes(enforcement),
1988            vec![
1989                (DataCategory::TransactionIndexed, 1),
1990                (DataCategory::Attachment, 10),
1991                (DataCategory::AttachmentItem, 1),
1992                (DataCategory::SpanIndexed, 1),
1993            ]
1994        );
1995    }
1996
1997    #[tokio::test]
1998    async fn test_enforce_span() {
1999        let mut envelope = envelope![Span, Span];
2000
2001        let mock = mock_limiter(&[DataCategory::Span]);
2002        let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
2003
2004        assert!(limits.is_limited());
2005        assert!(enforcement.spans_indexed.is_active());
2006        assert!(enforcement.spans.is_active());
2007        mock.lock().await.assert_call(DataCategory::Span, 2);
2008
2009        assert_eq!(
2010            get_outcomes(enforcement),
2011            vec![(DataCategory::Span, 2), (DataCategory::SpanIndexed, 2)]
2012        );
2013    }
2014
2015    #[tokio::test]
2016    async fn test_enforce_span_no_indexing_quota() {
2017        let mut envelope = envelope![Span, Span];
2018
2019        let mock = mock_limiter(&[DataCategory::SpanIndexed]);
2020        let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
2021
2022        assert!(limits.is_limited());
2023        assert!(enforcement.spans_indexed.is_active());
2024        assert!(!enforcement.spans.is_active());
2025        mock.lock().await.assert_call(DataCategory::Span, 2);
2026        mock.lock().await.assert_call(DataCategory::SpanIndexed, 2);
2027
2028        assert_eq!(
2029            get_outcomes(enforcement),
2030            vec![(DataCategory::SpanIndexed, 2)]
2031        );
2032    }
2033
2034    #[tokio::test]
2035    async fn test_enforce_span_metrics_extracted_no_indexing_quota() {
2036        let mut envelope = envelope![Span, Span];
2037        set_extracted(envelope.envelope_mut(), ItemType::Span);
2038
2039        let mock = mock_limiter(&[DataCategory::SpanIndexed]);
2040        let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
2041
2042        assert!(limits.is_limited());
2043        assert!(enforcement.spans_indexed.is_active());
2044        assert!(!enforcement.spans.is_active());
2045        mock.lock().await.assert_call(DataCategory::Span, 2);
2046        mock.lock().await.assert_call(DataCategory::SpanIndexed, 2);
2047
2048        assert_eq!(
2049            get_outcomes(enforcement),
2050            vec![(DataCategory::SpanIndexed, 2)]
2051        );
2052    }
2053
2054    #[test]
2055    fn test_source_quantity_for_total_quantity() {
2056        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
2057            .parse()
2058            .unwrap();
2059        let request_meta = RequestMeta::new(dsn);
2060
2061        let mut envelope = Envelope::from_request(None, request_meta);
2062
2063        let mut item = Item::new(ItemType::MetricBuckets);
2064        item.set_source_quantities(SourceQuantities {
2065            transactions: 5,
2066            spans: 0,
2067            buckets: 5,
2068        });
2069        envelope.add_item(item);
2070
2071        let mut item = Item::new(ItemType::MetricBuckets);
2072        item.set_source_quantities(SourceQuantities {
2073            transactions: 2,
2074            spans: 0,
2075            buckets: 3,
2076        });
2077        envelope.add_item(item);
2078
2079        let summary = EnvelopeSummary::compute(&envelope);
2080
2081        assert_eq!(summary.secondary_transaction_quantity, 7);
2082    }
2083
2084    #[tokio::test]
2085    async fn test_enforce_limit_logs_count() {
2086        let mut envelope = envelope![Log, Log];
2087
2088        let mock = mock_limiter(&[DataCategory::LogItem]);
2089        let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
2090
2091        assert!(limits.is_limited());
2092        assert_eq!(envelope.envelope().len(), 0);
2093        mock.lock().await.assert_call(DataCategory::LogItem, 2);
2094        mock.lock().await.assert_call(DataCategory::LogByte, 20);
2095
2096        assert_eq!(get_outcomes(enforcement), vec![(DataCategory::LogItem, 2)]);
2097    }
2098
2099    #[tokio::test]
2100    async fn test_enforce_limit_logs_bytes() {
2101        let mut envelope = envelope![Log, Log];
2102
2103        let mock = mock_limiter(&[DataCategory::LogByte]);
2104        let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
2105
2106        assert!(limits.is_limited());
2107        assert_eq!(envelope.envelope().len(), 0);
2108        mock.lock().await.assert_call(DataCategory::LogItem, 2);
2109        mock.lock().await.assert_call(DataCategory::LogByte, 20);
2110
2111        assert_eq!(get_outcomes(enforcement), vec![(DataCategory::LogByte, 20)]);
2112    }
2113
2114    #[tokio::test]
2115    async fn test_enforce_standalone_span_attachment() {
2116        let test_cases = &[
2117            RateLimitTestCase {
2118                name: "span_limit",
2119                denied_categories: &[DataCategory::Span],
2120                expect_attachment_limit_active: true,
2121                expected_limiter_calls: &[(DataCategory::Span, 0)],
2122                expected_outcomes: &[
2123                    (DataCategory::Attachment, 7),
2124                    (DataCategory::AttachmentItem, 1),
2125                ],
2126            },
2127            RateLimitTestCase {
2128                name: "span_indexed_limit",
2129                denied_categories: &[DataCategory::SpanIndexed],
2130                expect_attachment_limit_active: true,
2131                expected_limiter_calls: &[(DataCategory::Span, 0), (DataCategory::SpanIndexed, 0)],
2132                expected_outcomes: &[
2133                    (DataCategory::Attachment, 7),
2134                    (DataCategory::AttachmentItem, 1),
2135                ],
2136            },
2137            RateLimitTestCase {
2138                name: "attachment_limit",
2139                denied_categories: &[DataCategory::Attachment],
2140                expect_attachment_limit_active: true,
2141                expected_limiter_calls: &[
2142                    (DataCategory::Span, 0),
2143                    (DataCategory::SpanIndexed, 0),
2144                    (DataCategory::Attachment, 7),
2145                ],
2146                expected_outcomes: &[
2147                    (DataCategory::Attachment, 7),
2148                    (DataCategory::AttachmentItem, 1),
2149                ],
2150            },
2151            RateLimitTestCase {
2152                name: "attachment_indexed_limit",
2153                denied_categories: &[DataCategory::AttachmentItem],
2154                expect_attachment_limit_active: true,
2155                expected_limiter_calls: &[
2156                    (DataCategory::Span, 0),
2157                    (DataCategory::SpanIndexed, 0),
2158                    (DataCategory::Attachment, 7),
2159                    (DataCategory::AttachmentItem, 1),
2160                ],
2161                expected_outcomes: &[
2162                    (DataCategory::Attachment, 7),
2163                    (DataCategory::AttachmentItem, 1),
2164                ],
2165            },
2166            RateLimitTestCase {
2167                name: "transaction_limit",
2168                denied_categories: &[DataCategory::Transaction],
2169                expect_attachment_limit_active: false,
2170                expected_limiter_calls: &[
2171                    (DataCategory::Span, 0),
2172                    (DataCategory::SpanIndexed, 0),
2173                    (DataCategory::Attachment, 7),
2174                    (DataCategory::AttachmentItem, 1),
2175                ],
2176                expected_outcomes: &[],
2177            },
2178            RateLimitTestCase {
2179                name: "error_limit",
2180                denied_categories: &[DataCategory::Error],
2181                expect_attachment_limit_active: false,
2182                expected_limiter_calls: &[
2183                    (DataCategory::Span, 0),
2184                    (DataCategory::SpanIndexed, 0),
2185                    (DataCategory::Attachment, 7),
2186                    (DataCategory::AttachmentItem, 1),
2187                ],
2188                expected_outcomes: &[],
2189            },
2190            RateLimitTestCase {
2191                name: "no_limits",
2192                denied_categories: &[],
2193                expect_attachment_limit_active: false,
2194                expected_limiter_calls: &[
2195                    (DataCategory::Span, 0),
2196                    (DataCategory::SpanIndexed, 0),
2197                    (DataCategory::Attachment, 7),
2198                    (DataCategory::AttachmentItem, 1),
2199                ],
2200                expected_outcomes: &[],
2201            },
2202        ];
2203
2204        for RateLimitTestCase {
2205            name,
2206            denied_categories,
2207            expect_attachment_limit_active,
2208            expected_limiter_calls,
2209            expected_outcomes,
2210        } in test_cases
2211        {
2212            let mut envelope = envelope![];
2213            envelope
2214                .envelope_mut()
2215                .add_item(trace_attachment_item(7, Some(ParentId::SpanId(None))));
2216
2217            let mock = mock_limiter(denied_categories);
2218            let (enforcement, _) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
2219
2220            for &(category, quantity) in *expected_limiter_calls {
2221                mock.lock().await.assert_call(category, quantity);
2222            }
2223
2224            assert_eq!(
2225                enforcement.attachments_limits.span.bytes.is_active(),
2226                *expect_attachment_limit_active,
2227                "{name}: span_attachment byte limit mismatch"
2228            );
2229            assert_eq!(
2230                enforcement.attachments_limits.span.count.is_active(),
2231                *expect_attachment_limit_active,
2232                "{name}: span_attachment count limit mismatch"
2233            );
2234
2235            assert_eq!(
2236                get_outcomes(enforcement),
2237                *expected_outcomes,
2238                "{name}: outcome mismatch"
2239            );
2240        }
2241    }
2242
2243    #[tokio::test]
2244    async fn test_enforce_span_with_span_attachment() {
2245        let test_cases = &[
2246            RateLimitTestCase {
2247                name: "span_limit",
2248                denied_categories: &[DataCategory::Span, DataCategory::Attachment], // Attachment here has no effect
2249                expect_attachment_limit_active: true,
2250                expected_limiter_calls: &[(DataCategory::Span, 1)],
2251                expected_outcomes: &[
2252                    (DataCategory::Attachment, 7),
2253                    (DataCategory::AttachmentItem, 1),
2254                    (DataCategory::Span, 1),
2255                    (DataCategory::SpanIndexed, 1),
2256                ],
2257            },
2258            RateLimitTestCase {
2259                name: "span_indexed_limit",
2260                denied_categories: &[DataCategory::SpanIndexed, DataCategory::Attachment],
2261                expect_attachment_limit_active: true,
2262                expected_limiter_calls: &[(DataCategory::Span, 1), (DataCategory::SpanIndexed, 1)],
2263                expected_outcomes: &[
2264                    (DataCategory::Attachment, 7),
2265                    (DataCategory::AttachmentItem, 1),
2266                    (DataCategory::SpanIndexed, 1),
2267                ],
2268            },
2269            RateLimitTestCase {
2270                name: "attachment_limit",
2271                denied_categories: &[DataCategory::Attachment],
2272                expect_attachment_limit_active: true,
2273                expected_limiter_calls: &[
2274                    (DataCategory::Span, 1),
2275                    (DataCategory::SpanIndexed, 1),
2276                    (DataCategory::Attachment, 7),
2277                ],
2278                expected_outcomes: &[
2279                    (DataCategory::Attachment, 7),
2280                    (DataCategory::AttachmentItem, 1),
2281                ],
2282            },
2283            RateLimitTestCase {
2284                name: "attachment_indexed_limit",
2285                denied_categories: &[DataCategory::AttachmentItem],
2286                expect_attachment_limit_active: true,
2287                expected_limiter_calls: &[
2288                    (DataCategory::Span, 1),
2289                    (DataCategory::SpanIndexed, 1),
2290                    (DataCategory::Attachment, 7),
2291                    (DataCategory::AttachmentItem, 1),
2292                ],
2293                expected_outcomes: &[
2294                    (DataCategory::Attachment, 7),
2295                    (DataCategory::AttachmentItem, 1),
2296                ],
2297            },
2298            RateLimitTestCase {
2299                name: "transaction_limit",
2300                denied_categories: &[DataCategory::Transaction],
2301                expect_attachment_limit_active: false,
2302                expected_limiter_calls: &[
2303                    (DataCategory::Span, 1),
2304                    (DataCategory::SpanIndexed, 1),
2305                    (DataCategory::Attachment, 7),
2306                    (DataCategory::AttachmentItem, 1),
2307                ],
2308                expected_outcomes: &[],
2309            },
2310            RateLimitTestCase {
2311                name: "error_limit",
2312                denied_categories: &[DataCategory::Error],
2313                expect_attachment_limit_active: false,
2314                expected_limiter_calls: &[
2315                    (DataCategory::Span, 1),
2316                    (DataCategory::SpanIndexed, 1),
2317                    (DataCategory::Attachment, 7),
2318                    (DataCategory::AttachmentItem, 1),
2319                ],
2320                expected_outcomes: &[],
2321            },
2322            RateLimitTestCase {
2323                name: "no_limits",
2324                denied_categories: &[],
2325                expect_attachment_limit_active: false,
2326                expected_limiter_calls: &[
2327                    (DataCategory::Span, 1),
2328                    (DataCategory::SpanIndexed, 1),
2329                    (DataCategory::Attachment, 7),
2330                    (DataCategory::AttachmentItem, 1),
2331                ],
2332                expected_outcomes: &[],
2333            },
2334        ];
2335
2336        for RateLimitTestCase {
2337            name,
2338            denied_categories,
2339            expect_attachment_limit_active,
2340            expected_limiter_calls,
2341            expected_outcomes,
2342        } in test_cases
2343        {
2344            let mut envelope = envelope![Span];
2345            envelope
2346                .envelope_mut()
2347                .add_item(trace_attachment_item(7, Some(ParentId::SpanId(None))));
2348
2349            let mock = mock_limiter(denied_categories);
2350            let (enforcement, _) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
2351
2352            for &(category, quantity) in *expected_limiter_calls {
2353                mock.lock().await.assert_call(category, quantity);
2354            }
2355
2356            assert_eq!(
2357                enforcement.attachments_limits.span.bytes.is_active(),
2358                *expect_attachment_limit_active,
2359                "{name}: span_attachment byte limit mismatch"
2360            );
2361            assert_eq!(
2362                enforcement.attachments_limits.span.count.is_active(),
2363                *expect_attachment_limit_active,
2364                "{name}: span_attachment count limit mismatch"
2365            );
2366
2367            assert_eq!(
2368                get_outcomes(enforcement),
2369                *expected_outcomes,
2370                "{name}: outcome mismatch"
2371            );
2372        }
2373    }
2374
2375    #[tokio::test]
2376    async fn test_enforce_transaction_span_attachment() {
2377        let test_cases = &[
2378            RateLimitTestCase {
2379                name: "span_limit",
2380                denied_categories: &[DataCategory::Span],
2381                expect_attachment_limit_active: true,
2382                expected_limiter_calls: &[
2383                    (DataCategory::Transaction, 1),
2384                    (DataCategory::TransactionIndexed, 1),
2385                    (DataCategory::Span, 1),
2386                ],
2387                expected_outcomes: &[
2388                    (DataCategory::Attachment, 7),
2389                    (DataCategory::AttachmentItem, 1),
2390                    (DataCategory::Span, 1),
2391                    (DataCategory::SpanIndexed, 1),
2392                ],
2393            },
2394            RateLimitTestCase {
2395                name: "transaction_limit",
2396                denied_categories: &[DataCategory::Transaction],
2397                expect_attachment_limit_active: true,
2398                expected_limiter_calls: &[(DataCategory::Transaction, 1)],
2399                expected_outcomes: &[
2400                    (DataCategory::Transaction, 1),
2401                    (DataCategory::TransactionIndexed, 1),
2402                    (DataCategory::Attachment, 7),
2403                    (DataCategory::AttachmentItem, 1),
2404                    (DataCategory::Span, 1),
2405                    (DataCategory::SpanIndexed, 1),
2406                ],
2407            },
2408            RateLimitTestCase {
2409                name: "error_limit",
2410                denied_categories: &[DataCategory::Error],
2411                expect_attachment_limit_active: false,
2412                expected_limiter_calls: &[
2413                    (DataCategory::Transaction, 1),
2414                    (DataCategory::TransactionIndexed, 1),
2415                    (DataCategory::Span, 1),
2416                    (DataCategory::SpanIndexed, 1),
2417                    (DataCategory::Attachment, 7),
2418                    (DataCategory::AttachmentItem, 1),
2419                ],
2420                expected_outcomes: &[],
2421            },
2422            RateLimitTestCase {
2423                name: "no_limits",
2424                denied_categories: &[],
2425                expect_attachment_limit_active: false,
2426                expected_limiter_calls: &[
2427                    (DataCategory::Transaction, 1),
2428                    (DataCategory::TransactionIndexed, 1),
2429                    (DataCategory::Span, 1),
2430                    (DataCategory::SpanIndexed, 1),
2431                    (DataCategory::Attachment, 7),
2432                    (DataCategory::AttachmentItem, 1),
2433                ],
2434                expected_outcomes: &[],
2435            },
2436        ];
2437
2438        for RateLimitTestCase {
2439            name,
2440            denied_categories,
2441            expect_attachment_limit_active,
2442            expected_limiter_calls,
2443            expected_outcomes,
2444        } in test_cases
2445        {
2446            let mut envelope = envelope![Transaction];
2447            envelope
2448                .envelope_mut()
2449                .add_item(trace_attachment_item(7, Some(ParentId::SpanId(None))));
2450
2451            let mock = mock_limiter(denied_categories);
2452            let (enforcement, _) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
2453
2454            for &(category, quantity) in *expected_limiter_calls {
2455                mock.lock().await.assert_call(category, quantity);
2456            }
2457
2458            assert_eq!(
2459                enforcement.attachments_limits.span.bytes.is_active(),
2460                *expect_attachment_limit_active,
2461                "{name}: span_attachment byte limit mismatch"
2462            );
2463            assert_eq!(
2464                enforcement.attachments_limits.span.count.is_active(),
2465                *expect_attachment_limit_active,
2466                "{name}: span_attachment count limit mismatch"
2467            );
2468
2469            assert_eq!(
2470                get_outcomes(enforcement),
2471                *expected_outcomes,
2472                "{name}: outcome mismatch"
2473            );
2474        }
2475    }
2476
2477    #[tokio::test]
2478    async fn test_enforce_standalone_trace_attachment() {
2479        let test_cases = &[
2480            RateLimitTestCase {
2481                name: "attachment_limit",
2482                denied_categories: &[DataCategory::Attachment],
2483                expect_attachment_limit_active: true,
2484                expected_limiter_calls: &[(DataCategory::Attachment, 7)],
2485                expected_outcomes: &[
2486                    (DataCategory::Attachment, 7),
2487                    (DataCategory::AttachmentItem, 1),
2488                ],
2489            },
2490            RateLimitTestCase {
2491                name: "attachment_limit_and_attachment_item_limit",
2492                denied_categories: &[DataCategory::Attachment, DataCategory::AttachmentItem],
2493                expect_attachment_limit_active: true,
2494                expected_limiter_calls: &[(DataCategory::Attachment, 7)],
2495                expected_outcomes: &[
2496                    (DataCategory::Attachment, 7),
2497                    (DataCategory::AttachmentItem, 1),
2498                ],
2499            },
2500            RateLimitTestCase {
2501                name: "attachment_item_limit",
2502                denied_categories: &[DataCategory::AttachmentItem],
2503                expect_attachment_limit_active: true,
2504                expected_limiter_calls: &[
2505                    (DataCategory::Attachment, 7),
2506                    (DataCategory::AttachmentItem, 1),
2507                ],
2508                expected_outcomes: &[
2509                    (DataCategory::Attachment, 7),
2510                    (DataCategory::AttachmentItem, 1),
2511                ],
2512            },
2513            RateLimitTestCase {
2514                name: "no_limits",
2515                denied_categories: &[],
2516                expect_attachment_limit_active: false,
2517                expected_limiter_calls: &[
2518                    (DataCategory::Attachment, 7),
2519                    (DataCategory::AttachmentItem, 1),
2520                ],
2521                expected_outcomes: &[],
2522            },
2523        ];
2524
2525        for RateLimitTestCase {
2526            name,
2527            denied_categories,
2528            expect_attachment_limit_active,
2529            expected_limiter_calls,
2530            expected_outcomes,
2531        } in test_cases
2532        {
2533            let mut envelope = envelope![];
2534            envelope
2535                .envelope_mut()
2536                .add_item(trace_attachment_item(7, None));
2537
2538            let mock = mock_limiter(denied_categories);
2539            let (enforcement, _) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
2540
2541            for &(category, quantity) in *expected_limiter_calls {
2542                mock.lock().await.assert_call(category, quantity);
2543            }
2544
2545            assert_eq!(
2546                enforcement.attachments_limits.trace.bytes.is_active(),
2547                *expect_attachment_limit_active,
2548                "{name}: trace_attachment byte limit mismatch"
2549            );
2550            assert_eq!(
2551                enforcement.attachments_limits.trace.count.is_active(),
2552                *expect_attachment_limit_active,
2553                "{name}: trace_attachment count limit mismatch"
2554            );
2555
2556            assert_eq!(
2557                get_outcomes(enforcement),
2558                *expected_outcomes,
2559                "{name}: outcome mismatch"
2560            );
2561        }
2562    }
2563}