Skip to main content

relay_server/utils/
rate_limits.rs

1use std::fmt::{self, Write};
2use std::future::Future;
3use std::marker::PhantomData;
4
5use relay_profiling::ProfileType;
6use relay_quotas::{
7    DataCategory, ItemScoping, QuotaScope, RateLimit, RateLimitScope, RateLimits, ReasonCode,
8    Scoping,
9};
10use smallvec::SmallVec;
11
12use crate::envelope::{AttachmentParentType, AttachmentType, Envelope, Item, ItemType};
13use crate::integrations::Integration;
14use crate::managed::Managed;
15use crate::services::outcome::Outcome;
16
/// The name of the header that carries rate limit information.
///
/// See [`format_rate_limits`] and [`parse_rate_limits`] for writing and reading its value.
pub const RATE_LIMITS_HEADER: &str = "X-Sentry-Rate-Limits";
19
20/// Formats the `X-Sentry-Rate-Limits` header.
21pub fn format_rate_limits(rate_limits: &RateLimits) -> String {
22    let mut header = String::new();
23
24    for rate_limit in rate_limits {
25        if !header.is_empty() {
26            header.push_str(", ");
27        }
28
29        write!(header, "{}:", rate_limit.retry_after.remaining_seconds()).ok();
30
31        for (index, category) in rate_limit.categories.iter().enumerate() {
32            if index > 0 {
33                header.push(';');
34            }
35            write!(header, "{category}").ok();
36        }
37
38        write!(header, ":{}", rate_limit.scope.name()).ok();
39
40        if let Some(ref reason_code) = rate_limit.reason_code {
41            write!(header, ":{reason_code}").ok();
42        } else if !rate_limit.namespaces.is_empty() {
43            write!(header, ":").ok(); // delimits the empty reason code for namespaces
44        }
45
46        for (index, namespace) in rate_limit.namespaces.iter().enumerate() {
47            header.push(if index == 0 { ':' } else { ';' });
48            write!(header, "{namespace}").ok();
49        }
50    }
51
52    header
53}
54
55/// Parses the `X-Sentry-Rate-Limits` header.
56pub fn parse_rate_limits(scoping: &Scoping, string: &str) -> RateLimits {
57    let mut rate_limits = RateLimits::new();
58
59    for limit in string.split(',') {
60        let limit = limit.trim();
61        if limit.is_empty() {
62            continue;
63        }
64
65        let mut components = limit.split(':');
66
67        let retry_after = match components.next().and_then(|s| s.parse().ok()) {
68            Some(retry_after) => retry_after,
69            None => continue,
70        };
71
72        let categories = components
73            .next()
74            .unwrap_or("")
75            .split(';')
76            .filter(|category| !category.is_empty())
77            .map(DataCategory::from_name)
78            .collect();
79
80        let quota_scope = QuotaScope::from_name(components.next().unwrap_or(""));
81        let scope = RateLimitScope::for_quota(*scoping, quota_scope);
82
83        let reason_code = components
84            .next()
85            .filter(|s| !s.is_empty())
86            .map(ReasonCode::new);
87
88        let namespace = components
89            .next()
90            .unwrap_or("")
91            .split(';')
92            .filter(|s| !s.is_empty())
93            .filter_map(|s| s.parse().ok())
94            .collect();
95
96        rate_limits.add(RateLimit {
97            categories,
98            scope,
99            reason_code,
100            retry_after,
101            namespaces: namespace,
102        });
103    }
104
105    rate_limits
106}
107
108/// Infer the data category from an item.
109///
110/// Categories depend mostly on the item type, with a few special cases:
111/// - `Event`: the category is inferred from the event type. This requires the `event_type` header
112///   to be set on the event item.
113/// - `Attachment`: If the attachment creates an event (e.g. for minidumps), the category is assumed
114///   to be `Error`.
115fn infer_event_category(item: &Item) -> Option<DataCategory> {
116    match item.ty() {
117        ItemType::Event => Some(DataCategory::Error),
118        ItemType::Transaction => Some(DataCategory::Transaction),
119        ItemType::Security | ItemType::RawSecurity => Some(DataCategory::Security),
120        ItemType::UnrealReport => Some(DataCategory::Error),
121        ItemType::UserReportV2 => Some(DataCategory::UserReportV2),
122        ItemType::Attachment if item.creates_event() => Some(DataCategory::Error),
123        ItemType::Attachment => None,
124        ItemType::Session => None,
125        ItemType::Sessions => None,
126        ItemType::Statsd => None,
127        ItemType::MetricBuckets => None,
128        ItemType::FormData => None,
129        ItemType::UserReport => None,
130        ItemType::Profile => None,
131        ItemType::ReplayEvent => None,
132        ItemType::ReplayRecording => None,
133        ItemType::ReplayVideo => None,
134        ItemType::ClientReport => None,
135        ItemType::CheckIn => None,
136        ItemType::Log => None,
137        ItemType::TraceMetric => None,
138        ItemType::Span => None,
139        ItemType::ProfileChunk => None,
140        ItemType::Integration => None,
141        ItemType::Unknown(_) => None,
142    }
143}
144
/// Count and byte size of the attachments in a single attachment category.
#[derive(Clone, Copy, Debug, Default)]
pub struct AttachmentQuantity {
    /// How many attachment items there are.
    pub count: usize,
    /// Combined size of the attachments, in bytes.
    pub bytes: usize,
}

impl AttachmentQuantity {
    /// Returns `true` if both the item count and the byte size are zero.
    pub fn is_empty(&self) -> bool {
        matches!(self, Self { count: 0, bytes: 0 })
    }
}
161
162/// Aggregated attachment quantities grouped by [`AttachmentParentType`].
163///
164/// This separation is necessary since rate limiting logic varies by [`AttachmentParentType`].
165#[derive(Clone, Copy, Debug, Default)]
166pub struct AttachmentQuantities {
167    /// Quantities of Event Attachments.
168    ///
169    /// See also: [`AttachmentParentType::Event`].
170    pub event: AttachmentQuantity,
171    /// Quantities of trace V2 Attachments.
172    pub trace: AttachmentQuantity,
173    /// Quantities of span V2 Attachments.
174    pub span: AttachmentQuantity,
175}
176
177impl AttachmentQuantities {
178    /// Returns the total count of all attachments across all parent types.
179    pub fn count(&self) -> usize {
180        let AttachmentQuantities { event, trace, span } = self;
181        event.count + trace.count + span.count
182    }
183
184    /// Returns the total size in bytes of all attachments across all parent types.
185    pub fn bytes(&self) -> usize {
186        let AttachmentQuantities { event, trace, span } = self;
187        event.bytes + trace.bytes + span.bytes
188    }
189}
190
/// Quantities of transaction profiles, broken down by category.
#[derive(Clone, Copy, Debug, Default)]
pub struct ProfileQuantities {
    /// Number of transaction profiles in the backend category.
    pub backend: usize,
    /// Number of transaction profiles in the ui category.
    pub ui: usize,
    /// Number of all transaction profiles.
    ///
    /// This covers the backend and ui profiles as well as profiles that belong to neither
    /// category.
    pub total: usize,
}
202
203/// A summary of `Envelope` contents.
204///
205/// Summarizes the contained event, size of attachments, session updates, and whether there are
206/// plain attachments. This is used for efficient rate limiting or outcome handling.
207#[non_exhaustive]
208#[derive(Clone, Copy, Debug, Default)]
209pub struct EnvelopeSummary {
210    /// The data category of the event in the envelope. `None` if there is no event.
211    pub event_category: Option<DataCategory>,
212
213    /// The quantities of all attachments combined.
214    pub attachment_quantities: AttachmentQuantities,
215
216    /// The number of all session updates.
217    pub session_quantity: usize,
218
219    /// The number of profiles.
220    pub profile_quantity: ProfileQuantities,
221
222    /// The number of replays.
223    pub replay_quantity: usize,
224
225    /// The number of user reports (legacy item type for user feedback).
226    pub user_report_quantity: usize,
227
228    /// The number of monitor check-ins.
229    pub monitor_quantity: usize,
230
231    /// The number of log for the log product sent.
232    pub log_item_quantity: usize,
233
234    /// The number of log bytes for the log product sent, in bytes
235    pub log_byte_quantity: usize,
236
237    /// Secondary number of transactions.
238    ///
239    /// This is 0 for envelopes which contain a transaction,
240    /// only secondary transaction quantity should be tracked here,
241    /// these are for example transaction counts extracted from metrics.
242    ///
243    /// A "primary" transaction is contained within the envelope,
244    /// marking the envelope data category a [`DataCategory::Transaction`].
245    pub secondary_transaction_quantity: usize,
246
247    /// See `secondary_transaction_quantity`.
248    pub secondary_span_quantity: usize,
249
250    /// The number of standalone spans.
251    pub span_quantity: usize,
252
253    /// Indicates that the envelope contains regular attachments that do not create event payloads.
254    pub has_plain_attachments: bool,
255
256    /// The payload size of this envelope.
257    pub payload_size: usize,
258
259    /// The number of profile chunks in this envelope.
260    pub profile_chunk_quantity: usize,
261    /// The number of UI profile chunks in this envelope.
262    pub profile_chunk_ui_quantity: usize,
263
264    /// The number of trace metrics in this envelope.
265    pub trace_metric_quantity: usize,
266
267    /// The number of trace metric bytes in this envelope.
268    pub trace_metric_byte_quantity: usize,
269}
270
271impl EnvelopeSummary {
272    /// Creates an empty summary.
273    pub fn empty() -> Self {
274        Self::default()
275    }
276
277    /// Creates an envelope summary and aggregates the given envelope.
278    pub fn compute(envelope: &Envelope) -> Self {
279        Self::compute_items(envelope.items())
280    }
281
282    pub fn compute_items<'a>(items: impl IntoIterator<Item = &'a Item>) -> Self {
283        let mut summary = Self::empty();
284
285        for item in items {
286            if item.creates_event() {
287                summary.infer_category(item);
288            } else if item.ty() == &ItemType::Attachment {
289                // Plain attachments do not create events.
290                summary.has_plain_attachments = true;
291            }
292
293            // If the item has been rate limited before, the quota has been consumed and outcomes
294            // emitted. We can skip it here.
295            if item.rate_limited() {
296                continue;
297            }
298
299            if let Some(source_quantities) = item.source_quantities() {
300                summary.secondary_transaction_quantity += source_quantities.transactions;
301                summary.secondary_span_quantity += source_quantities.spans;
302            }
303
304            summary.payload_size += item.len();
305
306            summary.add_quantities(item);
307
308            // Special case since v1 and v2 share a data category.
309            // Adding this in add_quantity would include v2 in the count.
310            if item.ty() == &ItemType::UserReport {
311                summary.user_report_quantity += 1;
312            }
313        }
314
315        summary
316    }
317
318    fn add_quantities(&mut self, item: &Item) {
319        // The Nintendo switch item is a special case which should've been modelled like the
320        // `Unreal4Context` as potentially a separate item type which does not have its own data
321        // category.
322        //
323        // Currently there is no outcome category for this item, as it will be dissolved into
324        // multiple different items once processed.
325        if item.attachment_type() == Some(AttachmentType::NintendoSwitchDyingMessage) {
326            return;
327        }
328
329        for (category, quantity) in item.quantities() {
330            let target_quantity = match category {
331                DataCategory::Attachment => match item.attachment_parent_type() {
332                    AttachmentParentType::Span => &mut self.attachment_quantities.span.bytes,
333                    AttachmentParentType::Trace => &mut self.attachment_quantities.trace.bytes,
334                    AttachmentParentType::Event => &mut self.attachment_quantities.event.bytes,
335                },
336                DataCategory::AttachmentItem => match item.attachment_parent_type() {
337                    AttachmentParentType::Span => &mut self.attachment_quantities.span.count,
338                    AttachmentParentType::Trace => &mut self.attachment_quantities.trace.count,
339                    AttachmentParentType::Event => &mut self.attachment_quantities.event.count,
340                },
341                DataCategory::Session => &mut self.session_quantity,
342                DataCategory::Profile => &mut self.profile_quantity.total,
343                DataCategory::ProfileBackend => &mut self.profile_quantity.backend,
344                DataCategory::ProfileUi => &mut self.profile_quantity.ui,
345                DataCategory::Replay => &mut self.replay_quantity,
346                DataCategory::DoNotUseReplayVideo => &mut self.replay_quantity,
347                DataCategory::Monitor => &mut self.monitor_quantity,
348                DataCategory::Span => &mut self.span_quantity,
349                DataCategory::TraceMetric => &mut self.trace_metric_quantity,
350                DataCategory::TraceMetricByte => &mut self.trace_metric_byte_quantity,
351                DataCategory::LogItem => &mut self.log_item_quantity,
352                DataCategory::LogByte => &mut self.log_byte_quantity,
353                DataCategory::ProfileChunk => &mut self.profile_chunk_quantity,
354                DataCategory::ProfileChunkUi => &mut self.profile_chunk_ui_quantity,
355                // TODO: This catch-all looks dangerous
356                _ => continue,
357            };
358            *target_quantity += quantity;
359        }
360    }
361
362    /// Infers the appropriate [`DataCategory`] for the envelope [`Item`].
363    ///
364    /// The inferred category is only applied to the [`EnvelopeSummary`] if there is not yet
365    /// a category set.
366    fn infer_category(&mut self, item: &Item) {
367        if matches!(self.event_category, None | Some(DataCategory::Default))
368            && let Some(category) = infer_event_category(item)
369        {
370            self.event_category = Some(category);
371        }
372    }
373
374    /// Returns `true` if the envelope contains items that depend on spans.
375    ///
376    /// This is used to determined if we should be checking span quota, as the quota should be
377    /// checked both if there are spans or if there are span dependent items (e.g. span attachments).
378    pub fn has_span_dependent_items(&self) -> bool {
379        !self.attachment_quantities.span.is_empty()
380    }
381}
382
383/// Rate limiting information for a data category.
384#[derive(Debug, Default, PartialEq)]
385#[cfg_attr(test, derive(Clone))]
386pub struct CategoryLimit {
387    /// The limited data category.
388    category: Option<DataCategory>,
389    /// Additional and optional data categories in which outcomes will be produced.
390    extra_outcome_categories: SmallVec<[DataCategory; 1]>,
391    /// The total rate limited quantity across all items.
392    ///
393    /// This will be `0` if nothing was rate limited.
394    quantity: usize,
395    /// The reason code of the applied rate limit.
396    ///
397    /// Defaults to `None` if the quota does not declare a reason code.
398    reason_code: Option<ReasonCode>,
399}
400
401impl CategoryLimit {
402    /// Creates a new `CategoryLimit`.
403    ///
404    /// Returns an inactive limit if `rate_limit` is `None`.
405    fn new(category: DataCategory, quantity: usize, rate_limit: Option<&RateLimit>) -> Self {
406        match rate_limit {
407            Some(limit) => Self {
408                category: Some(category),
409                quantity,
410                extra_outcome_categories: Default::default(),
411                reason_code: limit.reason_code.clone(),
412            },
413            None => Self::default(),
414        }
415    }
416
417    /// Adds an additional outcome in the specified category to the limit.
418    pub fn add_outcome_category(mut self, category: DataCategory) -> Self {
419        self.extra_outcome_categories.push(category);
420        self
421    }
422
423    /// Recreates the category limit, if active, for a new category with the same reason.
424    pub fn clone_for(&self, category: DataCategory, quantity: usize) -> CategoryLimit {
425        if !self.is_active() {
426            return Self::default();
427        }
428
429        Self {
430            category: Some(category),
431            extra_outcome_categories: Default::default(),
432            quantity,
433            reason_code: self.reason_code.clone(),
434        }
435    }
436
437    /// Returns `true` if this is an active limit.
438    ///
439    /// Inactive limits are placeholders with no category set.
440    pub fn is_active(&self) -> bool {
441        self.category.is_some()
442    }
443
444    fn outcomes(self) -> impl Iterator<Item = (Outcome, DataCategory, usize)> {
445        let Self {
446            category,
447            extra_outcome_categories,
448            quantity,
449            reason_code,
450        } = self;
451
452        if category.is_none() || quantity == 0 {
453            return either::Either::Left(std::iter::empty());
454        }
455
456        let outcomes = std::iter::chain(category, extra_outcome_categories).map(move |category| {
457            (
458                Outcome::RateLimited(reason_code.clone()),
459                category,
460                quantity,
461            )
462        });
463
464        either::Either::Right(outcomes)
465    }
466}
467
468/// Rate limiting information for a single category of attachments.
469#[derive(Default, Debug)]
470#[cfg_attr(test, derive(Clone))]
471pub struct AttachmentLimits {
472    /// Rate limit applied to attachment bytes ([`DataCategory::Attachment`]).
473    pub bytes: CategoryLimit,
474    /// Rate limit applied to attachment item count ([`DataCategory::AttachmentItem`]).
475    pub count: CategoryLimit,
476}
477
478impl AttachmentLimits {
479    fn is_active(&self) -> bool {
480        self.bytes.is_active() || self.count.is_active()
481    }
482}
483
484/// Rate limiting information for attachments grouped by [`AttachmentParentType`].
485///
486/// See [`AttachmentQuantities`] for the corresponding quantity tracking.
487#[derive(Default, Debug)]
488#[cfg_attr(test, derive(Clone))]
489pub struct AttachmentsLimits {
490    /// Limits for V1 Attachments.
491    pub event: AttachmentLimits,
492    /// Limits for trace V2 Attachments.
493    pub trace: AttachmentLimits,
494    /// Limits for span V2 Attachments.
495    pub span: AttachmentLimits,
496}
497
498/// Information on the limited quantities returned by [`EnvelopeLimiter::compute`].
499#[derive(Default, Debug)]
500#[cfg_attr(test, derive(Clone))]
501pub struct Enforcement {
502    /// The event item rate limit.
503    pub event: CategoryLimit,
504    /// The rate limit for the indexed category of the event.
505    pub event_indexed: CategoryLimit,
506    /// The attachments limits
507    pub attachments_limits: AttachmentsLimits,
508    /// The combined session item rate limit.
509    pub sessions: CategoryLimit,
510    /// The combined transaction profile item rate limits, for all transaction profiles.
511    ///
512    /// This is at least the sum of [`Self::profiles_backend`] and [`Self::profiles_ui`],
513    /// potentially more if there are profiles without a known platform.
514    pub profiles: CategoryLimit,
515    /// The combined backend transaction profile item rate limit.
516    pub profiles_backend: CategoryLimit,
517    /// The combined ui transaction profile item rate limit.
518    pub profiles_ui: CategoryLimit,
519    /// The rate limit for the indexed profiles category.
520    pub profiles_indexed: CategoryLimit,
521    /// The combined replay item rate limit.
522    pub replays: CategoryLimit,
523    /// The combined check-in item rate limit.
524    pub check_ins: CategoryLimit,
525    /// The combined logs (our product logs) rate limit.
526    pub log_items: CategoryLimit,
527    /// The combined logs (our product logs) rate limit.
528    pub log_bytes: CategoryLimit,
529    /// The combined spans rate limit.
530    pub spans: CategoryLimit,
531    /// The rate limit for the indexed span category.
532    pub spans_indexed: CategoryLimit,
533    /// The rate limit for user report v1.
534    pub user_reports: CategoryLimit,
535    /// The combined profile chunk item rate limit.
536    pub profile_chunks: CategoryLimit,
537    /// The combined profile chunk ui item rate limit.
538    pub profile_chunks_ui: CategoryLimit,
539    /// The combined trace metric item rate limit.
540    pub trace_metrics: CategoryLimit,
541    /// The combined trace metric byte rate limit.
542    pub trace_metrics_bytes: CategoryLimit,
543}
544
545impl Enforcement {
546    /// Returns the `CategoryLimit` for the event.
547    ///
548    /// `None` if the event is not rate limited.
549    pub fn active_event(&self) -> Option<&CategoryLimit> {
550        if self.event.is_active() {
551            Some(&self.event)
552        } else if self.event_indexed.is_active() {
553            Some(&self.event_indexed)
554        } else {
555            None
556        }
557    }
558
559    /// Returns `true` if the event is rate limited.
560    pub fn is_event_active(&self) -> bool {
561        self.active_event().is_some()
562    }
563
564    /// Helper for `track_outcomes`.
565    fn get_outcomes(self) -> impl Iterator<Item = (Outcome, DataCategory, usize)> {
566        let Self {
567            event,
568            event_indexed,
569            attachments_limits:
570                AttachmentsLimits {
571                    event:
572                        AttachmentLimits {
573                            bytes: event_attachment_bytes,
574                            count: event_attachment_item,
575                        },
576                    trace:
577                        AttachmentLimits {
578                            bytes: trace_attachment_bytes,
579                            count: trace_attachment_item,
580                        },
581                    span:
582                        AttachmentLimits {
583                            bytes: span_attachment_bytes,
584                            count: span_attachment_item,
585                        },
586                },
587            sessions: _, // Do not report outcomes for sessions.
588            profiles,
589            profiles_backend,
590            profiles_ui,
591            profiles_indexed,
592            replays,
593            check_ins,
594            log_items,
595            log_bytes,
596            spans,
597            spans_indexed,
598            user_reports,
599            profile_chunks,
600            profile_chunks_ui,
601            trace_metrics,
602            trace_metrics_bytes,
603        } = self;
604
605        let limits = [
606            event,
607            event_indexed,
608            event_attachment_bytes,
609            event_attachment_item,
610            trace_attachment_bytes,
611            trace_attachment_item,
612            span_attachment_bytes,
613            span_attachment_item,
614            profiles,
615            profiles_backend,
616            profiles_ui,
617            profiles_indexed,
618            replays,
619            check_ins,
620            log_items,
621            log_bytes,
622            spans,
623            spans_indexed,
624            user_reports,
625            profile_chunks,
626            profile_chunks_ui,
627            trace_metrics,
628            trace_metrics_bytes,
629        ];
630
631        limits.into_iter().flat_map(|limit| limit.outcomes())
632    }
633
634    /// Applies the [`Enforcement`] on the [`Envelope`] by removing all items that were rate limited
635    /// and emits outcomes for each rate limited category.
636    ///
637    /// # Example
638    ///
639    /// ## Interaction between Events and Attachments
640    ///
641    /// An envelope with an `Error` event and an `Attachment`. Two quotas specify to drop all
642    /// attachments (reason `"a"`) and all errors (reason `"e"`). The result of enforcement will be:
643    ///
644    /// 1. All items are removed from the envelope.
645    /// 2. Enforcements report both the event and the attachment dropped with reason `"e"`, since
646    ///    dropping an event automatically drops all attachments with the same reason.
647    /// 3. Rate limits report the single event limit `"e"`, since attachment limits do not need to
648    ///    be checked in this case.
649    ///
650    /// ## Required Attachments
651    ///
652    /// An envelope with a single Minidump `Attachment`, and a single quota specifying to drop all
653    /// attachments with reason `"a"`:
654    ///
655    /// 1. Since the minidump creates an event and is required for processing, it remains in the
656    ///    envelope and is marked as `rate_limited`.
657    /// 2. Enforcements report the attachment dropped with reason `"a"`.
658    /// 3. Rate limits are empty since it is allowed to send required attachments even when rate
659    ///    limited.
660    ///
661    /// ## Previously Rate Limited Attachments
662    ///
663    /// An envelope with a single item marked as `rate_limited`, and a quota specifying to drop
664    /// everything with reason `"d"`:
665    ///
666    /// 1. The item remains in the envelope.
667    /// 2. Enforcements are empty. Rate limiting has occurred at an earlier stage in the pipeline.
668    /// 3. Rate limits are empty.
669    pub fn apply_to_managed(self, envelope: &mut Managed<Box<Envelope>>) {
670        envelope.modify(|envelope, records| {
671            envelope.retain_items(|item| self.retain_item(item));
672
673            // Sessions currently do not emit any outcomes, but may be dropped.
674            records.lenient(DataCategory::Session);
675            // This is an existing bug in how user reports handle rate limits and emit outcomes.
676            //
677            // User report v1 and v2 (feedback) are counting into the same category, but that is not
678            // completely consistent leading to some mismatches when emitting outcomes from rate
679            // limiting vs how outcomes are counted on the `Managed` instance.
680            //
681            // Issue: <https://github.com/getsentry/relay/issues/5524>.
682            records.lenient(DataCategory::UserReportV2);
683
684            for (outcome, category, quantity) in self.get_outcomes() {
685                records.reject_err(outcome, (category, quantity))
686            }
687        });
688    }
689
690    /// Returns `true` when an [`Item`] can be retained, `false` otherwise.
691    fn retain_item(&self, item: &mut Item) -> bool {
692        // Remove event items and all items that depend on this event
693        if self.event.is_active() && item.requires_event() {
694            return false;
695        }
696
697        // When checking limits for categories that have an indexed variant,
698        // we only have to check the more specific, the indexed, variant
699        // to determine whether an item is limited.
700        match item.ty() {
701            ItemType::Attachment => {
702                match item.attachment_parent_type() {
703                    AttachmentParentType::Span => !self.attachments_limits.span.is_active(),
704                    AttachmentParentType::Trace => !self.attachments_limits.trace.is_active(),
705                    AttachmentParentType::Event => {
706                        if !self.attachments_limits.event.is_active() {
707                            return true;
708                        }
709                        if item.creates_event() {
710                            item.set_rate_limited(true);
711                            true
712                        } else {
713                            false
714                        }
715                    }
716                }
717            }
718            ItemType::Session => !self.sessions.is_active(),
719            ItemType::Profile => {
720                if self.profiles_indexed.is_active() {
721                    false
722                } else if let Some(platform) = item.profile_type() {
723                    match platform {
724                        ProfileType::Backend => !self.profiles_backend.is_active(),
725                        ProfileType::Ui => !self.profiles_ui.is_active(),
726                    }
727                } else {
728                    true
729                }
730            }
731            ItemType::ReplayEvent => !self.replays.is_active(),
732            ItemType::ReplayVideo => !self.replays.is_active(),
733            ItemType::ReplayRecording => !self.replays.is_active(),
734            ItemType::UserReport => !self.user_reports.is_active(),
735            ItemType::CheckIn => !self.check_ins.is_active(),
736            ItemType::Log => {
737                !(self.log_items.is_active() || self.log_bytes.is_active())
738            }
739            ItemType::Span => !self.spans_indexed.is_active(),
740            ItemType::ProfileChunk => match item.profile_type() {
741                Some(ProfileType::Backend) => !self.profile_chunks.is_active(),
742                Some(ProfileType::Ui) => !self.profile_chunks_ui.is_active(),
743                None => true,
744            },
745            ItemType::TraceMetric => !(self.trace_metrics.is_active() || self.trace_metrics_bytes.is_active()),
746            ItemType::Integration => match item.integration() {
747                Some(Integration::Logs(_)) => !(self.log_items.is_active() || self.log_bytes.is_active()),
748                Some(Integration::Spans(_)) => !self.spans_indexed.is_active(),
749                None => true,
750            },
751            ItemType::Event
752            | ItemType::Transaction
753            | ItemType::Security
754            | ItemType::FormData
755            | ItemType::RawSecurity
756            | ItemType::UnrealReport
757            | ItemType::Sessions
758            | ItemType::Statsd
759            | ItemType::MetricBuckets
760            | ItemType::ClientReport
761            | ItemType::UserReportV2  // This is an event type.
762            | ItemType::Unknown(_) => true,
763        }
764    }
765}
766
/// Selects the limits that the [`EnvelopeLimiter`] checks.
#[derive(Debug, Copy, Clone)]
pub enum CheckLimits {
    /// Checks all limits except for indexed categories.
    ///
    /// The fast path has to apply cached rate limits without enforcing indexed rate limits, since
    /// it is not yet known at the time of the check whether an envelope is sampled or not.
    /// Moreover, an item that is later dropped by dynamic sampling must still be around for metrics
    /// extraction and cannot be dropped too early.
    NonIndexed,
}
778
779struct Check<F, E, R> {
780    limits: CheckLimits,
781    check: F,
782    _1: PhantomData<E>,
783    _2: PhantomData<R>,
784}
785
786impl<F, E, R> Check<F, E, R>
787where
788    F: FnMut(ItemScoping, usize) -> R,
789    R: Future<Output = Result<RateLimits, E>>,
790{
791    async fn apply(&mut self, scoping: ItemScoping, quantity: usize) -> Result<RateLimits, E> {
792        if matches!(self.limits, CheckLimits::NonIndexed) && scoping.category.is_indexed() {
793            return Ok(RateLimits::default());
794        }
795
796        (self.check)(scoping, quantity).await
797    }
798}
799
800/// Enforces rate limits with the given `check` function on items in the envelope.
801///
802/// The `check` function is called with the following rules:
803///  - Once for a single event, if present in the envelope.
804///  - Once for all comprised attachments, unless the event was rate limited.
805///  - Once for all comprised sessions.
806///
807/// Items violating the rate limit are removed from the envelope. This follows a set of rules:
808///  - If the event is removed, all items depending on the event are removed (e.g. attachments).
809///  - Attachments are not removed if they create events (e.g. minidumps).
810///  - Sessions are handled separately from all of the above.
811pub struct EnvelopeLimiter<F, E, R> {
812    check: Check<F, E, R>,
813    event_category: Option<DataCategory>,
814}
815
816impl<'a, F, E, R> EnvelopeLimiter<F, E, R>
817where
818    F: FnMut(ItemScoping, usize) -> R,
819    R: Future<Output = Result<RateLimits, E>>,
820{
821    /// Create a new `EnvelopeLimiter` with the given `check` function.
822    pub fn new(limits: CheckLimits, check: F) -> Self {
823        Self {
824            check: Check {
825                check,
826                limits,
827                _1: PhantomData,
828                _2: PhantomData,
829            },
830            event_category: None,
831        }
832    }
833
834    /// Process rate limits for the envelope, returning applied limits.
835    ///
836    /// Returns a tuple of `Enforcement` and `RateLimits`:
837    ///
838    /// - Enforcements declare the quantities of categories that have been rate limited with the
839    ///   individual reason codes that caused rate limiting. If multiple rate limits applied to a
840    ///   category, then the longest limit is reported.
841    /// - Rate limits declare all active rate limits, regardless of whether they have been applied
842    ///   to items in the envelope. This excludes rate limits applied to required attachments, since
843    ///   clients are allowed to continue sending them.
844    pub async fn compute(
845        mut self,
846        envelope: &Envelope,
847        scoping: &'a Scoping,
848    ) -> Result<(Enforcement, RateLimits), E> {
849        let mut summary = EnvelopeSummary::compute(envelope);
850        summary.event_category = self.event_category.or(summary.event_category);
851
852        let (enforcement, rate_limits) = self.execute(&summary, scoping).await?;
853        Ok((enforcement, rate_limits))
854    }
855
856    async fn execute(
857        &mut self,
858        summary: &EnvelopeSummary,
859        scoping: &'a Scoping,
860    ) -> Result<(Enforcement, RateLimits), E> {
861        let mut rate_limits = RateLimits::new();
862        let mut enforcement = Enforcement::default();
863
864        // Handle event.
865        if let Some(category) = summary.event_category {
866            // Check the broad category for limits.
867            let mut event_limits = self.check.apply(scoping.item(category), 1).await?;
868            enforcement.event = CategoryLimit::new(category, 1, event_limits.longest());
869
870            if let Some(index_category) = category.index_category() {
871                // Check the specific/indexed category for limits only if the specific one has not already
872                // an enforced limit.
873                if event_limits.is_empty() {
874                    event_limits.merge(self.check.apply(scoping.item(index_category), 1).await?);
875                }
876
877                enforcement.event_indexed =
878                    CategoryLimit::new(index_category, 1, event_limits.longest());
879            };
880
881            rate_limits.merge(event_limits);
882        }
883
884        // Handle spans.
885        if enforcement.is_event_active() {
886            enforcement.spans = enforcement
887                .event
888                .clone_for(DataCategory::Span, summary.span_quantity);
889
890            enforcement.spans_indexed = enforcement
891                .event_indexed
892                .clone_for(DataCategory::SpanIndexed, summary.span_quantity);
893        } else if summary.span_quantity > 0 || summary.has_span_dependent_items() {
894            let mut span_limits = self
895                .check
896                .apply(scoping.item(DataCategory::Span), summary.span_quantity)
897                .await?;
898            enforcement.spans = CategoryLimit::new(
899                DataCategory::Span,
900                summary.span_quantity,
901                span_limits.longest(),
902            );
903
904            if span_limits.is_empty() {
905                span_limits.merge(
906                    self.check
907                        .apply(
908                            scoping.item(DataCategory::SpanIndexed),
909                            summary.span_quantity,
910                        )
911                        .await?,
912                );
913            }
914
915            enforcement.spans_indexed = CategoryLimit::new(
916                DataCategory::SpanIndexed,
917                summary.span_quantity,
918                span_limits.longest(),
919            );
920
921            rate_limits.merge(span_limits);
922        }
923
924        // Handle span attachments
925        if enforcement.spans_indexed.is_active() {
926            enforcement.attachments_limits.span.bytes = enforcement.spans_indexed.clone_for(
927                DataCategory::Attachment,
928                summary.attachment_quantities.span.bytes,
929            );
930            enforcement.attachments_limits.span.count = enforcement.spans_indexed.clone_for(
931                DataCategory::AttachmentItem,
932                summary.attachment_quantities.span.count,
933            );
934        } else if !summary.attachment_quantities.span.is_empty() {
935            // While we could combine this check with the check that we do for event and trace
936            // attachments, this would complicate the logic so we opted against doing that.
937            // In practice the performance impact should be negligible since different types
938            // of attachments should rarely be send together.
939            enforcement.attachments_limits.span = self
940                .check_attachment_limits(scoping, &summary.attachment_quantities.span)
941                .await?;
942        }
943
944        // Handle attachments.
945        if let Some(limit) = enforcement.active_event() {
946            let limit1 = limit.clone_for(
947                DataCategory::Attachment,
948                summary.attachment_quantities.event.bytes,
949            );
950            let limit2 = limit.clone_for(
951                DataCategory::AttachmentItem,
952                summary.attachment_quantities.event.count,
953            );
954
955            enforcement.attachments_limits.event.bytes = limit1;
956            enforcement.attachments_limits.event.count = limit2;
957        } else {
958            let mut attachment_limits = RateLimits::new();
959            if summary.attachment_quantities.event.bytes > 0 {
960                let item_scoping = scoping.item(DataCategory::Attachment);
961
962                let attachment_byte_limits = self
963                    .check
964                    .apply(item_scoping, summary.attachment_quantities.event.bytes)
965                    .await?;
966
967                enforcement.attachments_limits.event.bytes = CategoryLimit::new(
968                    DataCategory::Attachment,
969                    summary.attachment_quantities.event.bytes,
970                    attachment_byte_limits.longest(),
971                );
972                enforcement.attachments_limits.event.count =
973                    enforcement.attachments_limits.event.bytes.clone_for(
974                        DataCategory::AttachmentItem,
975                        summary.attachment_quantities.event.count,
976                    );
977                attachment_limits.merge(attachment_byte_limits);
978            }
979            if !attachment_limits.is_limited() && summary.attachment_quantities.event.count > 0 {
980                let item_scoping = scoping.item(DataCategory::AttachmentItem);
981
982                let attachment_item_limits = self
983                    .check
984                    .apply(item_scoping, summary.attachment_quantities.event.count)
985                    .await?;
986
987                enforcement.attachments_limits.event.count = CategoryLimit::new(
988                    DataCategory::AttachmentItem,
989                    summary.attachment_quantities.event.count,
990                    attachment_item_limits.longest(),
991                );
992                enforcement.attachments_limits.event.bytes =
993                    enforcement.attachments_limits.event.count.clone_for(
994                        DataCategory::Attachment,
995                        summary.attachment_quantities.event.bytes,
996                    );
997                attachment_limits.merge(attachment_item_limits);
998            }
999
1000            // Only record rate limits for plain attachments. For all other attachments, it's
1001            // perfectly "legal" to send them. They will still be discarded in Sentry, but clients
1002            // can continue to send them.
1003            if summary.has_plain_attachments {
1004                rate_limits.merge(attachment_limits);
1005            }
1006        }
1007
1008        // Handle trace attachments.
1009        if !summary.attachment_quantities.trace.is_empty() {
1010            enforcement.attachments_limits.trace = self
1011                .check_attachment_limits(scoping, &summary.attachment_quantities.trace)
1012                .await?;
1013        }
1014
1015        // Handle sessions.
1016        if summary.session_quantity > 0 {
1017            let item_scoping = scoping.item(DataCategory::Session);
1018            let session_limits = self
1019                .check
1020                .apply(item_scoping, summary.session_quantity)
1021                .await?;
1022            enforcement.sessions = CategoryLimit::new(
1023                DataCategory::Session,
1024                summary.session_quantity,
1025                session_limits.longest(),
1026            );
1027            rate_limits.merge(session_limits);
1028        }
1029
1030        // Handle trace metrics.
1031        let mut trace_metric_limits = RateLimits::new();
1032        if summary.trace_metric_quantity > 0 {
1033            let item_scoping = scoping.item(DataCategory::TraceMetric);
1034            trace_metric_limits = self
1035                .check
1036                .apply(item_scoping, summary.trace_metric_quantity)
1037                .await?;
1038            enforcement.trace_metrics = CategoryLimit::new(
1039                DataCategory::TraceMetric,
1040                summary.trace_metric_quantity,
1041                trace_metric_limits.longest(),
1042            );
1043            enforcement.trace_metrics_bytes = CategoryLimit::new(
1044                DataCategory::TraceMetricByte,
1045                summary.trace_metric_byte_quantity,
1046                trace_metric_limits.longest(),
1047            );
1048        }
1049        if !trace_metric_limits.is_limited() && summary.trace_metric_byte_quantity > 0 {
1050            let item_scoping = scoping.item(DataCategory::TraceMetricByte);
1051            trace_metric_limits = self
1052                .check
1053                .apply(item_scoping, summary.trace_metric_byte_quantity)
1054                .await?;
1055            enforcement.trace_metrics = CategoryLimit::new(
1056                DataCategory::TraceMetric,
1057                summary.trace_metric_quantity,
1058                trace_metric_limits.longest(),
1059            );
1060            enforcement.trace_metrics_bytes = CategoryLimit::new(
1061                DataCategory::TraceMetricByte,
1062                summary.trace_metric_byte_quantity,
1063                trace_metric_limits.longest(),
1064            );
1065        }
1066        rate_limits.merge(trace_metric_limits);
1067
1068        // Handle logs.
1069        let mut log_limits = RateLimits::new();
1070        if summary.log_item_quantity > 0 {
1071            let item_scoping = scoping.item(DataCategory::LogItem);
1072            log_limits = self
1073                .check
1074                .apply(item_scoping, summary.log_item_quantity)
1075                .await?;
1076            enforcement.log_bytes = CategoryLimit::new(
1077                DataCategory::LogByte,
1078                summary.log_byte_quantity,
1079                log_limits.longest(),
1080            );
1081            enforcement.log_items = CategoryLimit::new(
1082                DataCategory::LogItem,
1083                summary.log_item_quantity,
1084                log_limits.longest(),
1085            );
1086        }
1087        if !log_limits.is_limited() && summary.log_byte_quantity > 0 {
1088            let item_scoping = scoping.item(DataCategory::LogByte);
1089            log_limits = self
1090                .check
1091                .apply(item_scoping, summary.log_byte_quantity)
1092                .await?;
1093            enforcement.log_bytes = CategoryLimit::new(
1094                DataCategory::LogByte,
1095                summary.log_byte_quantity,
1096                log_limits.longest(),
1097            );
1098            enforcement.log_items = CategoryLimit::new(
1099                DataCategory::LogItem,
1100                summary.log_item_quantity,
1101                log_limits.longest(),
1102            );
1103        }
1104        rate_limits.merge(log_limits);
1105
1106        // Handle profiles.
1107        if enforcement.is_event_active() {
1108            enforcement.profiles = enforcement
1109                .event
1110                .clone_for(DataCategory::Profile, summary.profile_quantity.total);
1111            enforcement.profiles_indexed = enforcement
1112                .event_indexed
1113                .clone_for(DataCategory::ProfileIndexed, summary.profile_quantity.total);
1114
1115            enforcement.profiles_backend = enforcement.event.clone_for(
1116                DataCategory::ProfileBackend,
1117                summary.profile_quantity.backend,
1118            );
1119            enforcement.profiles_ui = enforcement
1120                .event
1121                .clone_for(DataCategory::ProfileUi, summary.profile_quantity.ui);
1122        } else if summary.profile_quantity.total > 0 {
1123            let mut profile_limits = self
1124                .check
1125                .apply(
1126                    scoping.item(DataCategory::Profile),
1127                    summary.profile_quantity.total,
1128                )
1129                .await?;
1130
1131            // Profiles can persist in envelopes without transaction if the transaction item
1132            // was dropped by dynamic sampling.
1133            if profile_limits.is_empty() && summary.event_category.is_none() {
1134                profile_limits = self
1135                    .check
1136                    .apply(scoping.item(DataCategory::Transaction), 0)
1137                    .await?;
1138            }
1139
1140            enforcement.profiles = CategoryLimit::new(
1141                DataCategory::Profile,
1142                summary.profile_quantity.total,
1143                profile_limits.longest(),
1144            );
1145
1146            if enforcement.profiles.quantity == 0 {
1147                if summary.profile_quantity.backend > 0 {
1148                    let limit = self
1149                        .check
1150                        .apply(
1151                            scoping.item(DataCategory::ProfileBackend),
1152                            summary.profile_quantity.backend,
1153                        )
1154                        .await?;
1155
1156                    enforcement.profiles_backend = CategoryLimit::new(
1157                        DataCategory::ProfileBackend,
1158                        summary.profile_quantity.backend,
1159                        limit.longest(),
1160                    )
1161                    .add_outcome_category(DataCategory::Profile);
1162
1163                    profile_limits.merge(limit);
1164                }
1165                if summary.profile_quantity.ui > 0 {
1166                    let limit = self
1167                        .check
1168                        .apply(
1169                            scoping.item(DataCategory::ProfileUi),
1170                            summary.profile_quantity.ui,
1171                        )
1172                        .await?;
1173
1174                    enforcement.profiles_ui = CategoryLimit::new(
1175                        DataCategory::ProfileUi,
1176                        summary.profile_quantity.ui,
1177                        limit.longest(),
1178                    )
1179                    .add_outcome_category(DataCategory::Profile);
1180
1181                    profile_limits.merge(limit);
1182                }
1183            } else {
1184                enforcement.profiles_backend = CategoryLimit::new(
1185                    DataCategory::ProfileBackend,
1186                    summary.profile_quantity.backend,
1187                    profile_limits.longest(),
1188                );
1189                enforcement.profiles_ui = CategoryLimit::new(
1190                    DataCategory::ProfileUi,
1191                    summary.profile_quantity.ui,
1192                    profile_limits.longest(),
1193                );
1194            }
1195
1196            if enforcement.profiles.quantity > 0 {
1197                enforcement.profiles_indexed = enforcement
1198                    .profiles
1199                    .clone_for(DataCategory::ProfileIndexed, summary.profile_quantity.total);
1200            } else {
1201                let limit = self
1202                    .check
1203                    .apply(
1204                        scoping.item(DataCategory::ProfileIndexed),
1205                        summary.profile_quantity.total,
1206                    )
1207                    .await?;
1208
1209                if !limit.is_empty() {
1210                    enforcement.profiles_indexed = CategoryLimit::new(
1211                        DataCategory::ProfileIndexed,
1212                        summary.profile_quantity.total,
1213                        limit.longest(),
1214                    );
1215
1216                    profile_limits.merge(limit);
1217                } else {
1218                    enforcement.profiles_backend = enforcement
1219                        .profiles_backend
1220                        .add_outcome_category(DataCategory::ProfileIndexed);
1221                    enforcement.profiles_ui = enforcement
1222                        .profiles_ui
1223                        .add_outcome_category(DataCategory::ProfileIndexed);
1224                }
1225            }
1226
1227            rate_limits.merge(profile_limits);
1228        }
1229
1230        // Handle replays.
1231        if summary.replay_quantity > 0 {
1232            let item_scoping = scoping.item(DataCategory::Replay);
1233            let replay_limits = self
1234                .check
1235                .apply(item_scoping, summary.replay_quantity)
1236                .await?;
1237            enforcement.replays = CategoryLimit::new(
1238                DataCategory::Replay,
1239                summary.replay_quantity,
1240                replay_limits.longest(),
1241            );
1242            rate_limits.merge(replay_limits);
1243        }
1244
1245        // Handle user report v1s, which share limits with v2.
1246        if summary.user_report_quantity > 0 {
1247            let item_scoping = scoping.item(DataCategory::UserReportV2);
1248            let user_report_v2_limits = self
1249                .check
1250                .apply(item_scoping, summary.user_report_quantity)
1251                .await?;
1252            enforcement.user_reports = CategoryLimit::new(
1253                DataCategory::UserReportV2,
1254                summary.user_report_quantity,
1255                user_report_v2_limits.longest(),
1256            );
1257            rate_limits.merge(user_report_v2_limits);
1258        }
1259
1260        // Handle monitor checkins.
1261        if summary.monitor_quantity > 0 {
1262            let item_scoping = scoping.item(DataCategory::Monitor);
1263            let checkin_limits = self
1264                .check
1265                .apply(item_scoping, summary.monitor_quantity)
1266                .await?;
1267            enforcement.check_ins = CategoryLimit::new(
1268                DataCategory::Monitor,
1269                summary.monitor_quantity,
1270                checkin_limits.longest(),
1271            );
1272            rate_limits.merge(checkin_limits);
1273        }
1274
1275        // Handle profile chunks.
1276        if summary.profile_chunk_quantity > 0 {
1277            let item_scoping = scoping.item(DataCategory::ProfileChunk);
1278            let limits = self
1279                .check
1280                .apply(item_scoping, summary.profile_chunk_quantity)
1281                .await?;
1282            enforcement.profile_chunks = CategoryLimit::new(
1283                DataCategory::ProfileChunk,
1284                summary.profile_chunk_quantity,
1285                limits.longest(),
1286            );
1287            rate_limits.merge(limits);
1288        }
1289
1290        if summary.profile_chunk_ui_quantity > 0 {
1291            let item_scoping = scoping.item(DataCategory::ProfileChunkUi);
1292            let limits = self
1293                .check
1294                .apply(item_scoping, summary.profile_chunk_ui_quantity)
1295                .await?;
1296            enforcement.profile_chunks_ui = CategoryLimit::new(
1297                DataCategory::ProfileChunkUi,
1298                summary.profile_chunk_ui_quantity,
1299                limits.longest(),
1300            );
1301            rate_limits.merge(limits);
1302        }
1303
1304        Ok((enforcement, rate_limits))
1305    }
1306
1307    async fn check_attachment_limits(
1308        &mut self,
1309        scoping: &Scoping,
1310        quantities: &AttachmentQuantity,
1311    ) -> Result<AttachmentLimits, E> {
1312        let mut attachment_limits = self
1313            .check
1314            .apply(scoping.item(DataCategory::Attachment), quantities.bytes)
1315            .await?;
1316
1317        // Note: The check here is taken from the attachments logic for consistency I think just
1318        // checking `is_empty` should be fine?
1319        if !attachment_limits.is_limited() && quantities.count > 0 {
1320            attachment_limits.merge(
1321                self.check
1322                    .apply(scoping.item(DataCategory::AttachmentItem), quantities.count)
1323                    .await?,
1324            );
1325        }
1326
1327        Ok(AttachmentLimits {
1328            bytes: CategoryLimit::new(
1329                DataCategory::Attachment,
1330                quantities.bytes,
1331                attachment_limits.longest(),
1332            ),
1333            count: CategoryLimit::new(
1334                DataCategory::AttachmentItem,
1335                quantities.count,
1336                attachment_limits.longest(),
1337            ),
1338        })
1339    }
1340}
1341
1342impl<F, E, R> fmt::Debug for EnvelopeLimiter<F, E, R> {
1343    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1344        f.debug_struct("EnvelopeLimiter")
1345            .field("event_category", &self.event_category)
1346            .finish()
1347    }
1348}
1349
1350#[cfg(test)]
1351mod tests {
1352
1353    use std::collections::{BTreeMap, BTreeSet};
1354    use std::sync::Arc;
1355
1356    use relay_base_schema::organization::OrganizationId;
1357    use relay_base_schema::project::{ProjectId, ProjectKey};
1358    use relay_metrics::MetricNamespace;
1359    use relay_quotas::RetryAfter;
1360    use relay_system::Addr;
1361    use smallvec::smallvec;
1362    use tokio::sync::Mutex;
1363
1364    use super::*;
1365    use crate::envelope::ParentId;
1366    use crate::{
1367        envelope::{AttachmentType, ContentType, SourceQuantities},
1368        extractors::RequestMeta,
1369    };
1370
1371    struct RateLimitTestCase {
1372        name: &'static str,
1373        denied_categories: &'static [DataCategory],
1374        expect_attachment_limit_active: bool,
1375        expected_limiter_calls: &'static [(DataCategory, usize)],
1376        expected_outcomes: &'static [(DataCategory, usize)],
1377    }
1378
1379    #[tokio::test]
1380    async fn test_format_rate_limits() {
1381        let mut rate_limits = RateLimits::new();
1382
1383        // Add a generic rate limit for all categories.
1384        rate_limits.add(RateLimit {
1385            categories: Default::default(),
1386            scope: RateLimitScope::Organization(OrganizationId::new(42)),
1387            reason_code: Some(ReasonCode::new("my_limit")),
1388            retry_after: RetryAfter::from_secs(42),
1389            namespaces: smallvec![],
1390        });
1391
1392        // Add a more specific rate limit for just one category.
1393        rate_limits.add(RateLimit {
1394            categories: [DataCategory::Transaction, DataCategory::Security].into(),
1395            scope: RateLimitScope::Project(ProjectId::new(21)),
1396            reason_code: None,
1397            retry_after: RetryAfter::from_secs(4711),
1398            namespaces: smallvec![],
1399        });
1400
1401        let formatted = format_rate_limits(&rate_limits);
1402        let expected = "42::organization:my_limit, 4711:transaction;security:project";
1403        assert_eq!(formatted, expected);
1404    }
1405
1406    #[tokio::test]
1407    async fn test_format_rate_limits_namespace() {
1408        let mut rate_limits = RateLimits::new();
1409
1410        // Rate limit with reason code and namespace.
1411        rate_limits.add(RateLimit {
1412            categories: [DataCategory::MetricBucket].into(),
1413            scope: RateLimitScope::Organization(OrganizationId::new(42)),
1414            reason_code: Some(ReasonCode::new("my_limit")),
1415            retry_after: RetryAfter::from_secs(42),
1416            namespaces: smallvec![MetricNamespace::Custom, MetricNamespace::Spans],
1417        });
1418
1419        // Rate limit without reason code.
1420        rate_limits.add(RateLimit {
1421            categories: [DataCategory::MetricBucket].into(),
1422            scope: RateLimitScope::Organization(OrganizationId::new(42)),
1423            reason_code: None,
1424            retry_after: RetryAfter::from_secs(42),
1425            namespaces: smallvec![MetricNamespace::Spans],
1426        });
1427
1428        let formatted = format_rate_limits(&rate_limits);
1429        let expected = "42:metric_bucket:organization:my_limit:custom;spans, 42:metric_bucket:organization::spans";
1430        assert_eq!(formatted, expected);
1431    }
1432
1433    #[tokio::test]
1434    async fn test_parse_invalid_rate_limits() {
1435        let scoping = Scoping {
1436            organization_id: OrganizationId::new(42),
1437            project_id: ProjectId::new(21),
1438            project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
1439            key_id: Some(17),
1440        };
1441
1442        assert!(parse_rate_limits(&scoping, "").is_ok());
1443        assert!(parse_rate_limits(&scoping, "invalid").is_ok());
1444        assert!(parse_rate_limits(&scoping, ",,,").is_ok());
1445    }
1446
1447    #[tokio::test]
1448    async fn test_parse_rate_limits() {
1449        let scoping = Scoping {
1450            organization_id: OrganizationId::new(42),
1451            project_id: ProjectId::new(21),
1452            project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
1453            key_id: Some(17),
1454        };
1455
1456        // contains "foobar", an unknown scope that should be mapped to Unknown
1457        let formatted =
1458            "42::organization:my_limit, invalid, 4711:foobar;transaction;security:project";
1459        let rate_limits: Vec<RateLimit> =
1460            parse_rate_limits(&scoping, formatted).into_iter().collect();
1461
1462        assert_eq!(
1463            rate_limits,
1464            vec![
1465                RateLimit {
1466                    categories: Default::default(),
1467                    scope: RateLimitScope::Organization(OrganizationId::new(42)),
1468                    reason_code: Some(ReasonCode::new("my_limit")),
1469                    retry_after: rate_limits[0].retry_after,
1470                    namespaces: smallvec![],
1471                },
1472                RateLimit {
1473                    categories: [
1474                        DataCategory::Unknown,
1475                        DataCategory::Transaction,
1476                        DataCategory::Security,
1477                    ]
1478                    .into(),
1479                    scope: RateLimitScope::Project(ProjectId::new(21)),
1480                    reason_code: None,
1481                    retry_after: rate_limits[1].retry_after,
1482                    namespaces: smallvec![],
1483                }
1484            ]
1485        );
1486
1487        assert_eq!(42, rate_limits[0].retry_after.remaining_seconds());
1488        assert_eq!(4711, rate_limits[1].retry_after.remaining_seconds());
1489    }
1490
1491    #[tokio::test]
1492    async fn test_parse_rate_limits_namespace() {
1493        let scoping = Scoping {
1494            organization_id: OrganizationId::new(42),
1495            project_id: ProjectId::new(21),
1496            project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
1497            key_id: Some(17),
1498        };
1499
1500        let formatted = "42:metric_bucket:organization::custom;spans";
1501        let rate_limits: Vec<RateLimit> =
1502            parse_rate_limits(&scoping, formatted).into_iter().collect();
1503
1504        assert_eq!(
1505            rate_limits,
1506            vec![RateLimit {
1507                categories: [DataCategory::MetricBucket].into(),
1508                scope: RateLimitScope::Organization(OrganizationId::new(42)),
1509                reason_code: None,
1510                retry_after: rate_limits[0].retry_after,
1511                namespaces: smallvec![MetricNamespace::Custom, MetricNamespace::Spans],
1512            }]
1513        );
1514    }
1515
1516    #[tokio::test]
1517    async fn test_parse_rate_limits_empty_namespace() {
1518        let scoping = Scoping {
1519            organization_id: OrganizationId::new(42),
1520            project_id: ProjectId::new(21),
1521            project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
1522            key_id: Some(17),
1523        };
1524
1525        // notice the trailing colon
1526        let formatted = "42:metric_bucket:organization:some_reason:";
1527        let rate_limits: Vec<RateLimit> =
1528            parse_rate_limits(&scoping, formatted).into_iter().collect();
1529
1530        assert_eq!(
1531            rate_limits,
1532            vec![RateLimit {
1533                categories: [DataCategory::MetricBucket].into(),
1534                scope: RateLimitScope::Organization(OrganizationId::new(42)),
1535                reason_code: Some(ReasonCode::new("some_reason")),
1536                retry_after: rate_limits[0].retry_after,
1537                namespaces: smallvec![],
1538            }]
1539        );
1540    }
1541
1542    #[tokio::test]
1543    async fn test_parse_rate_limits_only_unknown() {
1544        let scoping = Scoping {
1545            organization_id: OrganizationId::new(42),
1546            project_id: ProjectId::new(21),
1547            project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
1548            key_id: Some(17),
1549        };
1550
1551        let formatted = "42:foo;bar:organization";
1552        let rate_limits: Vec<RateLimit> =
1553            parse_rate_limits(&scoping, formatted).into_iter().collect();
1554
1555        assert_eq!(
1556            rate_limits,
1557            vec![RateLimit {
1558                categories: [DataCategory::Unknown, DataCategory::Unknown].into(),
1559                scope: RateLimitScope::Organization(OrganizationId::new(42)),
1560                reason_code: None,
1561                retry_after: rate_limits[0].retry_after,
1562                namespaces: smallvec![],
1563            },]
1564        );
1565    }
1566
/// Builds a test envelope from a fixed DSN header with one item per listed `ItemType` variant.
///
/// Every item carries the 10-byte payload `0123456789`. An optional `::Variant` suffix sets
/// the item's `AttachmentType`, for example `envelope![Attachment::Minidump, Attachment]`.
macro_rules! envelope {
    ($( $item_type:ident $( :: $attachment_type:ident )? ),*) => {{
        let raw_headers = r#"{"dsn":"https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"}"#;
        // With an empty item list the envelope is never mutated.
        #[allow(unused_mut)]
        let mut envelope = Envelope::parse_bytes(raw_headers.into()).unwrap();
        $(
            envelope.add_item({
                let mut item = Item::new(ItemType::$item_type);
                item.set_payload(ContentType::OctetStream, "0123456789");
                $( item.set_attachment_type(AttachmentType::$attachment_type); )?
                item
            });
        )*

        envelope
    }}
}
1582
1583    fn rate_limit(category: DataCategory) -> RateLimit {
1584        RateLimit {
1585            categories: [category].into(),
1586            scope: RateLimitScope::Organization(OrganizationId::new(42)),
1587            reason_code: None,
1588            retry_after: RetryAfter::from_secs(60),
1589            namespaces: smallvec![],
1590        }
1591    }
1592
1593    fn trace_attachment_item(bytes: usize, parent_id: Option<ParentId>) -> Item {
1594        let mut item = Item::new(ItemType::Attachment);
1595        item.set_payload(ContentType::TraceAttachment, "0".repeat(bytes));
1596        item.set_parent_id(parent_id);
1597        item
1598    }
1599
1600    #[derive(Debug, Default)]
1601    struct MockLimiter {
1602        denied: Vec<DataCategory>,
1603        called: BTreeMap<DataCategory, usize>,
1604        checked: BTreeSet<DataCategory>,
1605    }
1606
1607    impl MockLimiter {
1608        pub fn deny(mut self, category: DataCategory) -> Self {
1609            self.denied.push(category);
1610            self
1611        }
1612
1613        pub fn check(&mut self, scoping: ItemScoping, quantity: usize) -> Result<RateLimits, ()> {
1614            let cat = scoping.category;
1615            let previous = self.called.insert(cat, quantity);
1616            assert!(previous.is_none(), "rate limiter invoked twice for {cat}");
1617
1618            let mut limits = RateLimits::new();
1619            if self.denied.contains(&cat) {
1620                limits.add(rate_limit(cat));
1621            }
1622            Ok(limits)
1623        }
1624
1625        #[track_caller]
1626        pub fn assert_call(&mut self, category: DataCategory, expected: usize) {
1627            self.checked.insert(category);
1628
1629            let quantity = self.called.get(&category).copied();
1630            assert_eq!(
1631                quantity,
1632                Some(expected),
1633                "Expected quantity `{expected}` for data category `{category}`, got {quantity:?}."
1634            );
1635        }
1636    }
1637
1638    impl Drop for MockLimiter {
1639        fn drop(&mut self) {
1640            if std::thread::panicking() {
1641                return;
1642            }
1643
1644            for checked in &self.checked {
1645                self.called.remove(checked);
1646            }
1647
1648            if self.called.is_empty() {
1649                return;
1650            }
1651
1652            let not_asserted = self
1653                .called
1654                .iter()
1655                .map(|(k, v)| format!("- {k}: {v}"))
1656                .collect::<Vec<_>>()
1657                .join("\n");
1658
1659            panic!("Following calls to the limiter were not asserted:\n{not_asserted}");
1660        }
1661    }
1662
1663    async fn enforce_and_apply(
1664        mock: Arc<Mutex<MockLimiter>>,
1665        envelope: Box<Envelope>,
1666    ) -> (Box<Envelope>, Enforcement, RateLimits) {
1667        let mut envelope = Managed::from_envelope(envelope, Addr::custom().0);
1668        let scoping = envelope.scoping();
1669
1670        #[allow(unused_mut)]
1671        let mut limiter = EnvelopeLimiter::new(CheckLimits::NonIndexed, move |s, q| {
1672            let mock = mock.clone();
1673            async move {
1674                let mut mock = mock.lock().await;
1675                mock.check(s, q)
1676            }
1677        });
1678
1679        let (enforcement, limits) = limiter.compute(&envelope, &scoping).await.unwrap();
1680
1681        // We implemented `clone` only for tests because we don't want to make `apply_with_outcomes`
1682        // &self because we want move semantics to prevent double tracking.
1683        enforcement.clone().apply_to_managed(&mut envelope);
1684        let envelope = envelope.accept(|envelope| envelope);
1685
1686        (envelope, enforcement, limits)
1687    }
1688
1689    fn mock_limiter(categories: &[DataCategory]) -> Arc<Mutex<MockLimiter>> {
1690        let mut mock = MockLimiter::default();
1691        for &category in categories {
1692            mock = mock.deny(category);
1693        }
1694
1695        Arc::new(Mutex::new(mock))
1696    }
1697
1698    #[tokio::test]
1699    async fn test_enforce_pass_empty() {
1700        let envelope = envelope![];
1701
1702        let mock = mock_limiter(&[]);
1703        let (envelope, _, limits) = enforce_and_apply(mock, envelope).await;
1704
1705        assert!(!limits.is_limited());
1706        assert!(envelope.is_empty());
1707    }
1708
1709    #[tokio::test]
1710    async fn test_enforce_limit_error_event() {
1711        let envelope = envelope![Event];
1712
1713        let mock = mock_limiter(&[DataCategory::Error]);
1714        let (envelope, _, limits) = enforce_and_apply(mock.clone(), envelope).await;
1715
1716        assert!(limits.is_limited());
1717        assert!(envelope.is_empty());
1718        mock.lock().await.assert_call(DataCategory::Error, 1);
1719    }
1720
1721    #[tokio::test]
1722    async fn test_enforce_limit_error_with_attachments() {
1723        let envelope = envelope![Event, Attachment];
1724
1725        let mock = mock_limiter(&[DataCategory::Error]);
1726        let (envelope, _, limits) = enforce_and_apply(mock.clone(), envelope).await;
1727
1728        assert!(limits.is_limited());
1729        assert!(envelope.is_empty());
1730        mock.lock().await.assert_call(DataCategory::Error, 1);
1731    }
1732
1733    #[tokio::test]
1734    async fn test_enforce_limit_minidump() {
1735        let envelope = envelope![Attachment::Minidump];
1736
1737        let mock = mock_limiter(&[DataCategory::Error]);
1738        let (envelope, _, limits) = enforce_and_apply(mock.clone(), envelope).await;
1739
1740        assert!(limits.is_limited());
1741        assert!(envelope.is_empty());
1742        mock.lock().await.assert_call(DataCategory::Error, 1);
1743    }
1744
1745    #[tokio::test]
1746    async fn test_enforce_limit_attachments() {
1747        let envelope = envelope![Attachment::Minidump, Attachment];
1748
1749        let mock = mock_limiter(&[DataCategory::Attachment]);
1750        let (envelope, _, limits) = enforce_and_apply(mock.clone(), envelope).await;
1751
1752        // Attachments would be limited, but crash reports create events and are thus allowed.
1753        assert!(limits.is_limited());
1754        assert_eq!(envelope.len(), 1);
1755        mock.lock().await.assert_call(DataCategory::Error, 1);
1756        mock.lock().await.assert_call(DataCategory::Attachment, 20);
1757    }
1758
1759    /// Limit stand-alone profiles.
1760    #[tokio::test]
1761    async fn test_enforce_limit_profiles() {
1762        let envelope = envelope![Profile, Profile];
1763
1764        let mock = mock_limiter(&[DataCategory::Profile]);
1765        let (envelope, enforcement, limits) = enforce_and_apply(mock.clone(), envelope).await;
1766
1767        assert!(limits.is_limited());
1768        assert_eq!(envelope.len(), 0);
1769        mock.lock().await.assert_call(DataCategory::Profile, 2);
1770
1771        assert_eq!(
1772            get_outcomes(enforcement),
1773            vec![
1774                (DataCategory::Profile, 2),
1775                (DataCategory::ProfileIndexed, 2)
1776            ]
1777        );
1778    }
1779
1780    /// Limit profile chunks.
1781    #[tokio::test]
1782    async fn test_enforce_limit_profile_chunks_no_profile_type() {
1783        // In this test we have profile chunks which have not yet been classified, which means they
1784        // should not be rate limited.
1785        let envelope = envelope![ProfileChunk, ProfileChunk];
1786
1787        let mock = mock_limiter(&[DataCategory::ProfileChunk]);
1788        let (envelope, enforcement, limits) = enforce_and_apply(mock.clone(), envelope).await;
1789        assert!(!limits.is_limited());
1790        assert_eq!(get_outcomes(enforcement), vec![]);
1791
1792        let mock = mock_limiter(&[DataCategory::ProfileChunkUi]);
1793        let (envelope, enforcement, limits) = enforce_and_apply(mock.clone(), envelope).await;
1794        assert!(!limits.is_limited());
1795        assert_eq!(get_outcomes(enforcement), vec![]);
1796
1797        assert_eq!(envelope.len(), 2);
1798    }
1799
1800    #[tokio::test]
1801    async fn test_enforce_limit_profile_chunks_ui() {
1802        let mut envelope = envelope![];
1803
1804        let mut item = Item::new(ItemType::ProfileChunk);
1805        item.set_platform("python".to_owned());
1806        envelope.add_item(item);
1807        let mut item = Item::new(ItemType::ProfileChunk);
1808        item.set_platform("javascript".to_owned());
1809        envelope.add_item(item);
1810
1811        let mock = mock_limiter(&[DataCategory::ProfileChunkUi]);
1812        let (envelope, enforcement, limits) = enforce_and_apply(mock.clone(), envelope).await;
1813
1814        assert!(limits.is_limited());
1815        assert_eq!(envelope.len(), 1);
1816        mock.lock()
1817            .await
1818            .assert_call(DataCategory::ProfileChunkUi, 1);
1819        mock.lock().await.assert_call(DataCategory::ProfileChunk, 1);
1820
1821        assert_eq!(
1822            get_outcomes(enforcement),
1823            vec![(DataCategory::ProfileChunkUi, 1)]
1824        );
1825    }
1826
1827    #[tokio::test]
1828    async fn test_enforce_limit_profile_chunks_backend() {
1829        let mut envelope = envelope![];
1830
1831        let mut item = Item::new(ItemType::ProfileChunk);
1832        item.set_platform("python".to_owned());
1833        envelope.add_item(item);
1834        let mut item = Item::new(ItemType::ProfileChunk);
1835        item.set_platform("javascript".to_owned());
1836        envelope.add_item(item);
1837
1838        let mock = mock_limiter(&[DataCategory::ProfileChunk]);
1839        let (envelope, enforcement, limits) = enforce_and_apply(mock.clone(), envelope).await;
1840
1841        assert!(limits.is_limited());
1842        assert_eq!(envelope.len(), 1);
1843        mock.lock()
1844            .await
1845            .assert_call(DataCategory::ProfileChunkUi, 1);
1846        mock.lock().await.assert_call(DataCategory::ProfileChunk, 1);
1847
1848        assert_eq!(
1849            get_outcomes(enforcement),
1850            vec![(DataCategory::ProfileChunk, 1)]
1851        );
1852    }
1853
1854    /// Limit replays.
1855    #[tokio::test]
1856    async fn test_enforce_limit_replays() {
1857        let envelope = envelope![ReplayEvent, ReplayRecording, ReplayVideo];
1858
1859        let mock = mock_limiter(&[DataCategory::Replay]);
1860        let (envelope, enforcement, limits) = enforce_and_apply(mock.clone(), envelope).await;
1861
1862        assert!(limits.is_limited());
1863        assert_eq!(envelope.len(), 0);
1864        mock.lock().await.assert_call(DataCategory::Replay, 3);
1865
1866        assert_eq!(get_outcomes(enforcement), vec![(DataCategory::Replay, 3),]);
1867    }
1868
1869    /// Limit monitor checkins.
1870    #[tokio::test]
1871    async fn test_enforce_limit_monitor_checkins() {
1872        let envelope = envelope![CheckIn];
1873
1874        let mock = mock_limiter(&[DataCategory::Monitor]);
1875        let (envelope, enforcement, limits) = enforce_and_apply(mock.clone(), envelope).await;
1876
1877        assert!(limits.is_limited());
1878        assert_eq!(envelope.len(), 0);
1879        mock.lock().await.assert_call(DataCategory::Monitor, 1);
1880
1881        assert_eq!(get_outcomes(enforcement), vec![(DataCategory::Monitor, 1)])
1882    }
1883
1884    #[tokio::test]
1885    async fn test_enforce_pass_minidump() {
1886        let envelope = envelope![Attachment::Minidump];
1887
1888        let mock = mock_limiter(&[DataCategory::Attachment]);
1889        let (envelope, _, limits) = enforce_and_apply(mock.clone(), envelope).await;
1890
1891        // If only crash report attachments are present, we don't emit a rate limit.
1892        assert!(!limits.is_limited());
1893        assert_eq!(envelope.len(), 1);
1894        mock.lock().await.assert_call(DataCategory::Error, 1);
1895        mock.lock().await.assert_call(DataCategory::Attachment, 10);
1896    }
1897
1898    #[tokio::test]
1899    async fn test_enforce_skip_rate_limited() {
1900        let mut envelope = envelope![];
1901
1902        let mut item = Item::new(ItemType::Attachment);
1903        item.set_payload(ContentType::OctetStream, "0123456789");
1904        item.set_rate_limited(true);
1905        envelope.add_item(item);
1906
1907        let mock = mock_limiter(&[DataCategory::Error]);
1908        let (envelope, _, limits) = enforce_and_apply(mock, envelope).await;
1909
1910        assert!(!limits.is_limited()); // No new rate limits applied.
1911        assert_eq!(envelope.len(), 1); // The item was retained
1912    }
1913
1914    #[tokio::test]
1915    async fn test_enforce_pass_sessions() {
1916        let envelope = envelope![Session, Session, Session];
1917
1918        let mock = mock_limiter(&[DataCategory::Error]);
1919        let (envelope, _, limits) = enforce_and_apply(mock.clone(), envelope).await;
1920
1921        // If only crash report attachments are present, we don't emit a rate limit.
1922        assert!(!limits.is_limited());
1923        assert_eq!(envelope.len(), 3);
1924        mock.lock().await.assert_call(DataCategory::Session, 3);
1925    }
1926
1927    #[tokio::test]
1928    async fn test_enforce_limit_sessions() {
1929        let envelope = envelope![Session, Session, Event];
1930
1931        let mock = mock_limiter(&[DataCategory::Session]);
1932        let (envelope, _, limits) = enforce_and_apply(mock.clone(), envelope).await;
1933
1934        // If only crash report attachments are present, we don't emit a rate limit.
1935        assert!(limits.is_limited());
1936        assert_eq!(envelope.len(), 1);
1937        mock.lock().await.assert_call(DataCategory::Error, 1);
1938        mock.lock().await.assert_call(DataCategory::Session, 2);
1939    }
1940
/// A denied transaction quota removes the transaction item.
#[tokio::test]
#[cfg(feature = "processing")]
async fn test_enforce_limit_assumed_event() {
    let limiter = mock_limiter(&[DataCategory::Transaction]);

    let (remaining, _, rate_limits) =
        enforce_and_apply(limiter.clone(), envelope![Transaction]).await;

    assert!(rate_limits.is_limited());
    // The only item in the envelope was the limited transaction.
    assert!(remaining.is_empty());

    let mut calls = limiter.lock().await;
    calls.assert_call(DataCategory::Transaction, 1);
}
1953
/// A denied error quota removes the event and both of its attachments.
#[tokio::test]
#[cfg(feature = "processing")]
async fn test_enforce_limit_assumed_attachments() {
    let limiter = mock_limiter(&[DataCategory::Error]);

    let (remaining, _, rate_limits) =
        enforce_and_apply(limiter.clone(), envelope![Event, Attachment, Attachment]).await;

    assert!(rate_limits.is_limited());
    assert!(remaining.is_empty());

    let mut calls = limiter.lock().await;
    calls.assert_call(DataCategory::Error, 1);
}
1966
1967    #[tokio::test]
1968    async fn test_enforce_transaction() {
1969        let envelope = envelope![Transaction];
1970
1971        let mock = mock_limiter(&[DataCategory::Transaction]);
1972        let (_, enforcement, limits) = enforce_and_apply(mock.clone(), envelope).await;
1973
1974        assert!(limits.is_limited());
1975        assert!(enforcement.event_indexed.is_active());
1976        assert!(enforcement.event.is_active());
1977        mock.lock().await.assert_call(DataCategory::Transaction, 1);
1978
1979        assert_eq!(
1980            get_outcomes(enforcement),
1981            vec![
1982                (DataCategory::Transaction, 1),
1983                (DataCategory::TransactionIndexed, 1),
1984                (DataCategory::Span, 1),
1985                (DataCategory::SpanIndexed, 1),
1986            ]
1987        );
1988    }
1989
1990    #[tokio::test]
1991    async fn test_enforce_transaction_non_indexed() {
1992        let envelope = envelope![Transaction, Profile];
1993        let scoping = envelope
1994            .headers()
1995            .meta()
1996            .get_partial_scoping()
1997            .into_scoping();
1998
1999        let mock = mock_limiter(&[DataCategory::TransactionIndexed]);
2000
2001        let mock_clone = mock.clone();
2002        let limiter = EnvelopeLimiter::new(CheckLimits::NonIndexed, move |s, q| {
2003            let mock_clone = mock_clone.clone();
2004            async move {
2005                let mut mock = mock_clone.lock().await;
2006                mock.check(s, q)
2007            }
2008        });
2009        let (enforcement, limits) = limiter.compute(&envelope, &scoping).await.unwrap();
2010
2011        assert!(!limits.is_limited());
2012        assert!(!enforcement.event_indexed.is_active());
2013        assert!(!enforcement.event.is_active());
2014        assert!(!enforcement.profiles_indexed.is_active());
2015        assert!(!enforcement.profiles.is_active());
2016        assert!(!enforcement.spans.is_active());
2017        assert!(!enforcement.spans_indexed.is_active());
2018        mock.lock().await.assert_call(DataCategory::Transaction, 1);
2019        mock.lock().await.assert_call(DataCategory::Profile, 1);
2020        mock.lock().await.assert_call(DataCategory::Span, 1);
2021    }
2022
2023    #[tokio::test]
2024    async fn test_enforce_transaction_no_indexing_quota() {
2025        let envelope = envelope![Transaction];
2026
2027        let mock = mock_limiter(&[DataCategory::TransactionIndexed]);
2028        let (_, enforcement, limits) = enforce_and_apply(mock.clone(), envelope).await;
2029
2030        assert!(!limits.is_limited());
2031        assert!(!enforcement.event_indexed.is_active());
2032        assert!(!enforcement.event.is_active());
2033        mock.lock().await.assert_call(DataCategory::Transaction, 1);
2034        mock.lock().await.assert_call(DataCategory::Span, 1);
2035    }
2036
2037    #[tokio::test]
2038    async fn test_enforce_transaction_attachment_enforced() {
2039        let envelope = envelope![Transaction, Attachment];
2040
2041        let mock = mock_limiter(&[DataCategory::Transaction]);
2042        let (_, enforcement, _) = enforce_and_apply(mock.clone(), envelope).await;
2043
2044        assert!(enforcement.event.is_active());
2045        assert!(enforcement.attachments_limits.event.is_active());
2046        mock.lock().await.assert_call(DataCategory::Transaction, 1);
2047    }
2048
2049    fn get_outcomes(enforcement: Enforcement) -> Vec<(DataCategory, usize)> {
2050        enforcement
2051            .get_outcomes()
2052            .map(|(_, data_category, quantity)| (data_category, quantity))
2053            .collect::<Vec<_>>()
2054    }
2055
2056    #[tokio::test]
2057    async fn test_enforce_transaction_profile_enforced() {
2058        let envelope = envelope![Transaction, Profile];
2059
2060        let mock = mock_limiter(&[DataCategory::Transaction]);
2061        let (_, enforcement, _) = enforce_and_apply(mock.clone(), envelope).await;
2062
2063        assert!(enforcement.event.is_active());
2064        assert!(enforcement.profiles.is_active());
2065        mock.lock().await.assert_call(DataCategory::Transaction, 1);
2066
2067        assert_eq!(
2068            get_outcomes(enforcement),
2069            vec![
2070                (DataCategory::Transaction, 1),
2071                (DataCategory::TransactionIndexed, 1),
2072                (DataCategory::Profile, 1),
2073                (DataCategory::ProfileIndexed, 1),
2074                (DataCategory::Span, 1),
2075                (DataCategory::SpanIndexed, 1),
2076            ]
2077        );
2078    }
2079
2080    #[tokio::test]
2081    async fn test_enforce_transaction_standalone_profile_enforced() {
2082        // When the transaction is sampled, the profile survives as standalone.
2083        let envelope = envelope![Profile];
2084
2085        let mock = mock_limiter(&[DataCategory::Transaction]);
2086        let (_, enforcement, _) = enforce_and_apply(mock.clone(), envelope).await;
2087
2088        assert!(enforcement.profiles.is_active());
2089        mock.lock().await.assert_call(DataCategory::Profile, 1);
2090        mock.lock().await.assert_call(DataCategory::Transaction, 0);
2091
2092        assert_eq!(
2093            get_outcomes(enforcement),
2094            vec![
2095                (DataCategory::Profile, 1),
2096                (DataCategory::ProfileIndexed, 1),
2097            ]
2098        );
2099    }
2100
2101    #[tokio::test]
2102    async fn test_enforce_transaction_attachment_enforced_indexing_quota() {
2103        let envelope = envelope![Transaction, Attachment];
2104
2105        let mock = mock_limiter(&[DataCategory::TransactionIndexed]);
2106        let (_, enforcement, _) = enforce_and_apply(mock.clone(), envelope).await;
2107
2108        assert!(!enforcement.event.is_active());
2109        assert!(!enforcement.event_indexed.is_active());
2110        assert!(!enforcement.attachments_limits.event.is_active());
2111        mock.lock().await.assert_call(DataCategory::Transaction, 1);
2112        mock.lock().await.assert_call(DataCategory::Span, 1);
2113        mock.lock().await.assert_call(DataCategory::Attachment, 10);
2114        mock.lock()
2115            .await
2116            .assert_call(DataCategory::AttachmentItem, 1);
2117
2118        assert_eq!(get_outcomes(enforcement), vec![]);
2119    }
2120
2121    #[tokio::test]
2122    async fn test_enforce_span() {
2123        let envelope = envelope![Span, Span];
2124
2125        let mock = mock_limiter(&[DataCategory::Span]);
2126        let (_, enforcement, limits) = enforce_and_apply(mock.clone(), envelope).await;
2127
2128        assert!(limits.is_limited());
2129        assert!(enforcement.spans_indexed.is_active());
2130        assert!(enforcement.spans.is_active());
2131        mock.lock().await.assert_call(DataCategory::Span, 2);
2132
2133        assert_eq!(
2134            get_outcomes(enforcement),
2135            vec![(DataCategory::Span, 2), (DataCategory::SpanIndexed, 2)]
2136        );
2137    }
2138
2139    #[tokio::test]
2140    async fn test_enforce_span_no_indexing_quota() {
2141        let envelope = envelope![Span, Span];
2142
2143        let mock = mock_limiter(&[DataCategory::SpanIndexed]);
2144        let (_, enforcement, limits) = enforce_and_apply(mock.clone(), envelope).await;
2145
2146        assert!(!limits.is_limited());
2147        assert!(!enforcement.spans_indexed.is_active());
2148        assert!(!enforcement.spans.is_active());
2149        mock.lock().await.assert_call(DataCategory::Span, 2);
2150
2151        assert_eq!(get_outcomes(enforcement), vec![]);
2152    }
2153
/// The summary adds up the transaction source quantities of all metric bucket items.
#[test]
fn test_source_quantity_for_total_quantity() {
    let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
        .parse()
        .unwrap();
    let mut envelope = Envelope::from_request(None, RequestMeta::new(dsn));

    // Two bucket items with 5 and 2 source transactions respectively.
    for (transactions, buckets) in [(5, 5), (2, 3)] {
        let mut metric_item = Item::new(ItemType::MetricBuckets);
        metric_item.set_source_quantities(SourceQuantities {
            transactions,
            spans: 0,
            buckets,
        });
        envelope.add_item(metric_item);
    }

    let summary = EnvelopeSummary::compute(&envelope);

    assert_eq!(summary.secondary_transaction_quantity, 7);
}
2183
2184    #[tokio::test]
2185    async fn test_enforce_limit_logs_count() {
2186        let envelope = envelope![Log, Log];
2187
2188        let mock = mock_limiter(&[DataCategory::LogItem]);
2189        let (envelope, enforcement, limits) = enforce_and_apply(mock.clone(), envelope).await;
2190
2191        assert!(limits.is_limited());
2192        assert_eq!(envelope.len(), 0);
2193        mock.lock().await.assert_call(DataCategory::LogItem, 2);
2194
2195        assert_eq!(
2196            get_outcomes(enforcement),
2197            vec![(DataCategory::LogItem, 2), (DataCategory::LogByte, 20)]
2198        );
2199    }
2200
2201    #[tokio::test]
2202    async fn test_enforce_limit_logs_bytes() {
2203        let envelope = envelope![Log, Log];
2204
2205        let mock = mock_limiter(&[DataCategory::LogByte]);
2206        let (envelope, enforcement, limits) = enforce_and_apply(mock.clone(), envelope).await;
2207
2208        assert!(limits.is_limited());
2209        assert_eq!(envelope.len(), 0);
2210        mock.lock().await.assert_call(DataCategory::LogItem, 2);
2211        mock.lock().await.assert_call(DataCategory::LogByte, 20);
2212
2213        assert_eq!(
2214            get_outcomes(enforcement),
2215            vec![(DataCategory::LogItem, 2), (DataCategory::LogByte, 20)]
2216        );
2217    }
2218
2219    #[tokio::test]
2220    async fn test_enforce_standalone_span_attachment() {
2221        let test_cases = &[
2222            RateLimitTestCase {
2223                name: "span_limit",
2224                denied_categories: &[DataCategory::Span],
2225                expect_attachment_limit_active: true,
2226                expected_limiter_calls: &[(DataCategory::Span, 0)],
2227                expected_outcomes: &[
2228                    (DataCategory::Attachment, 7),
2229                    (DataCategory::AttachmentItem, 1),
2230                ],
2231            },
2232            RateLimitTestCase {
2233                name: "span_indexed_limit",
2234                denied_categories: &[DataCategory::SpanIndexed],
2235                expect_attachment_limit_active: false,
2236                expected_limiter_calls: &[
2237                    (DataCategory::Span, 0),
2238                    (DataCategory::Attachment, 7),
2239                    (DataCategory::AttachmentItem, 1),
2240                ],
2241                expected_outcomes: &[],
2242            },
2243            RateLimitTestCase {
2244                name: "attachment_limit",
2245                denied_categories: &[DataCategory::Attachment],
2246                expect_attachment_limit_active: true,
2247                expected_limiter_calls: &[(DataCategory::Span, 0), (DataCategory::Attachment, 7)],
2248                expected_outcomes: &[
2249                    (DataCategory::Attachment, 7),
2250                    (DataCategory::AttachmentItem, 1),
2251                ],
2252            },
2253            RateLimitTestCase {
2254                name: "attachment_indexed_limit",
2255                denied_categories: &[DataCategory::AttachmentItem],
2256                expect_attachment_limit_active: true,
2257                expected_limiter_calls: &[
2258                    (DataCategory::Span, 0),
2259                    (DataCategory::Attachment, 7),
2260                    (DataCategory::AttachmentItem, 1),
2261                ],
2262                expected_outcomes: &[
2263                    (DataCategory::Attachment, 7),
2264                    (DataCategory::AttachmentItem, 1),
2265                ],
2266            },
2267            RateLimitTestCase {
2268                name: "transaction_limit",
2269                denied_categories: &[DataCategory::Transaction],
2270                expect_attachment_limit_active: false,
2271                expected_limiter_calls: &[
2272                    (DataCategory::Span, 0),
2273                    (DataCategory::Attachment, 7),
2274                    (DataCategory::AttachmentItem, 1),
2275                ],
2276                expected_outcomes: &[],
2277            },
2278            RateLimitTestCase {
2279                name: "error_limit",
2280                denied_categories: &[DataCategory::Error],
2281                expect_attachment_limit_active: false,
2282                expected_limiter_calls: &[
2283                    (DataCategory::Span, 0),
2284                    (DataCategory::Attachment, 7),
2285                    (DataCategory::AttachmentItem, 1),
2286                ],
2287                expected_outcomes: &[],
2288            },
2289            RateLimitTestCase {
2290                name: "no_limits",
2291                denied_categories: &[],
2292                expect_attachment_limit_active: false,
2293                expected_limiter_calls: &[
2294                    (DataCategory::Span, 0),
2295                    (DataCategory::Attachment, 7),
2296                    (DataCategory::AttachmentItem, 1),
2297                ],
2298                expected_outcomes: &[],
2299            },
2300        ];
2301
2302        for RateLimitTestCase {
2303            name,
2304            denied_categories,
2305            expect_attachment_limit_active,
2306            expected_limiter_calls,
2307            expected_outcomes,
2308        } in test_cases
2309        {
2310            let mut envelope = envelope![];
2311            envelope.add_item(trace_attachment_item(7, Some(ParentId::SpanId(None))));
2312
2313            let mock = mock_limiter(denied_categories);
2314            let (_, enforcement, _) = enforce_and_apply(mock.clone(), envelope).await;
2315
2316            for &(category, quantity) in *expected_limiter_calls {
2317                mock.lock().await.assert_call(category, quantity);
2318            }
2319
2320            assert_eq!(
2321                enforcement.attachments_limits.span.bytes.is_active(),
2322                *expect_attachment_limit_active,
2323                "{name}: span_attachment byte limit mismatch"
2324            );
2325            assert_eq!(
2326                enforcement.attachments_limits.span.count.is_active(),
2327                *expect_attachment_limit_active,
2328                "{name}: span_attachment count limit mismatch"
2329            );
2330
2331            assert_eq!(
2332                get_outcomes(enforcement),
2333                *expected_outcomes,
2334                "{name}: outcome mismatch"
2335            );
2336        }
2337    }
2338
2339    #[tokio::test]
2340    async fn test_enforce_span_with_span_attachment() {
2341        let test_cases = &[
2342            RateLimitTestCase {
2343                name: "span_limit",
2344                denied_categories: &[DataCategory::Span, DataCategory::Attachment], // Attachment here has no effect
2345                expect_attachment_limit_active: true,
2346                expected_limiter_calls: &[(DataCategory::Span, 1)],
2347                expected_outcomes: &[
2348                    (DataCategory::Attachment, 7),
2349                    (DataCategory::AttachmentItem, 1),
2350                    (DataCategory::Span, 1),
2351                    (DataCategory::SpanIndexed, 1),
2352                ],
2353            },
2354            RateLimitTestCase {
2355                name: "span_indexed_limit",
2356                denied_categories: &[DataCategory::SpanIndexed, DataCategory::Attachment],
2357                expect_attachment_limit_active: true,
2358                expected_limiter_calls: &[(DataCategory::Span, 1), (DataCategory::Attachment, 7)],
2359                expected_outcomes: &[
2360                    (DataCategory::Attachment, 7),
2361                    (DataCategory::AttachmentItem, 1),
2362                ],
2363            },
2364            RateLimitTestCase {
2365                name: "attachment_limit",
2366                denied_categories: &[DataCategory::Attachment],
2367                expect_attachment_limit_active: true,
2368                expected_limiter_calls: &[(DataCategory::Span, 1), (DataCategory::Attachment, 7)],
2369                expected_outcomes: &[
2370                    (DataCategory::Attachment, 7),
2371                    (DataCategory::AttachmentItem, 1),
2372                ],
2373            },
2374            RateLimitTestCase {
2375                name: "attachment_indexed_limit",
2376                denied_categories: &[DataCategory::AttachmentItem],
2377                expect_attachment_limit_active: true,
2378                expected_limiter_calls: &[
2379                    (DataCategory::Span, 1),
2380                    (DataCategory::Attachment, 7),
2381                    (DataCategory::AttachmentItem, 1),
2382                ],
2383                expected_outcomes: &[
2384                    (DataCategory::Attachment, 7),
2385                    (DataCategory::AttachmentItem, 1),
2386                ],
2387            },
2388            RateLimitTestCase {
2389                name: "transaction_limit",
2390                denied_categories: &[DataCategory::Transaction],
2391                expect_attachment_limit_active: false,
2392                expected_limiter_calls: &[
2393                    (DataCategory::Span, 1),
2394                    (DataCategory::Attachment, 7),
2395                    (DataCategory::AttachmentItem, 1),
2396                ],
2397                expected_outcomes: &[],
2398            },
2399            RateLimitTestCase {
2400                name: "error_limit",
2401                denied_categories: &[DataCategory::Error],
2402                expect_attachment_limit_active: false,
2403                expected_limiter_calls: &[
2404                    (DataCategory::Span, 1),
2405                    (DataCategory::Attachment, 7),
2406                    (DataCategory::AttachmentItem, 1),
2407                ],
2408                expected_outcomes: &[],
2409            },
2410            RateLimitTestCase {
2411                name: "no_limits",
2412                denied_categories: &[],
2413                expect_attachment_limit_active: false,
2414                expected_limiter_calls: &[
2415                    (DataCategory::Span, 1),
2416                    (DataCategory::Attachment, 7),
2417                    (DataCategory::AttachmentItem, 1),
2418                ],
2419                expected_outcomes: &[],
2420            },
2421        ];
2422
2423        for RateLimitTestCase {
2424            name,
2425            denied_categories,
2426            expect_attachment_limit_active,
2427            expected_limiter_calls,
2428            expected_outcomes,
2429        } in test_cases
2430        {
2431            let mut envelope = envelope![Span];
2432            envelope.add_item(trace_attachment_item(7, Some(ParentId::SpanId(None))));
2433
2434            let mock = mock_limiter(denied_categories);
2435            let (_, enforcement, _) = enforce_and_apply(mock.clone(), envelope).await;
2436
2437            for &(category, quantity) in *expected_limiter_calls {
2438                mock.lock().await.assert_call(category, quantity);
2439            }
2440
2441            assert_eq!(
2442                enforcement.attachments_limits.span.bytes.is_active(),
2443                *expect_attachment_limit_active,
2444                "{name}: span_attachment byte limit mismatch"
2445            );
2446            assert_eq!(
2447                enforcement.attachments_limits.span.count.is_active(),
2448                *expect_attachment_limit_active,
2449                "{name}: span_attachment count limit mismatch"
2450            );
2451
2452            assert_eq!(
2453                get_outcomes(enforcement),
2454                *expected_outcomes,
2455                "{name}: outcome mismatch"
2456            );
2457        }
2458    }
2459
2460    #[tokio::test]
2461    async fn test_enforce_transaction_span_attachment() {
2462        let test_cases = &[
2463            RateLimitTestCase {
2464                name: "transaction_limit",
2465                denied_categories: &[DataCategory::Transaction],
2466                expect_attachment_limit_active: true,
2467                expected_limiter_calls: &[(DataCategory::Transaction, 1)],
2468                expected_outcomes: &[
2469                    (DataCategory::Transaction, 1),
2470                    (DataCategory::TransactionIndexed, 1),
2471                    (DataCategory::Attachment, 7),
2472                    (DataCategory::AttachmentItem, 1),
2473                    (DataCategory::Span, 1),
2474                    (DataCategory::SpanIndexed, 1),
2475                ],
2476            },
2477            RateLimitTestCase {
2478                name: "error_limit",
2479                denied_categories: &[DataCategory::Error],
2480                expect_attachment_limit_active: false,
2481                expected_limiter_calls: &[
2482                    (DataCategory::Transaction, 1),
2483                    (DataCategory::Span, 1),
2484                    (DataCategory::Attachment, 7),
2485                    (DataCategory::AttachmentItem, 1),
2486                ],
2487                expected_outcomes: &[],
2488            },
2489            RateLimitTestCase {
2490                name: "no_limits",
2491                denied_categories: &[],
2492                expect_attachment_limit_active: false,
2493                expected_limiter_calls: &[
2494                    (DataCategory::Transaction, 1),
2495                    (DataCategory::Span, 1),
2496                    (DataCategory::Attachment, 7),
2497                    (DataCategory::AttachmentItem, 1),
2498                ],
2499                expected_outcomes: &[],
2500            },
2501        ];
2502
2503        for RateLimitTestCase {
2504            name,
2505            denied_categories,
2506            expect_attachment_limit_active,
2507            expected_limiter_calls,
2508            expected_outcomes,
2509        } in test_cases
2510        {
2511            let mut envelope = envelope![Transaction];
2512            envelope.add_item(trace_attachment_item(7, Some(ParentId::SpanId(None))));
2513
2514            let mock = mock_limiter(denied_categories);
2515            let (_, enforcement, _) = enforce_and_apply(mock.clone(), envelope).await;
2516
2517            for &(category, quantity) in *expected_limiter_calls {
2518                mock.lock().await.assert_call(category, quantity);
2519            }
2520
2521            assert_eq!(
2522                enforcement.attachments_limits.span.bytes.is_active(),
2523                *expect_attachment_limit_active,
2524                "{name}: span_attachment byte limit mismatch"
2525            );
2526            assert_eq!(
2527                enforcement.attachments_limits.span.count.is_active(),
2528                *expect_attachment_limit_active,
2529                "{name}: span_attachment count limit mismatch"
2530            );
2531
2532            assert_eq!(
2533                get_outcomes(enforcement),
2534                *expected_outcomes,
2535                "{name}: outcome mismatch"
2536            );
2537        }
2538    }
2539
2540    #[tokio::test]
2541    async fn test_enforce_standalone_trace_attachment() {
2542        let test_cases = &[
2543            RateLimitTestCase {
2544                name: "attachment_limit",
2545                denied_categories: &[DataCategory::Attachment],
2546                expect_attachment_limit_active: true,
2547                expected_limiter_calls: &[(DataCategory::Attachment, 7)],
2548                expected_outcomes: &[
2549                    (DataCategory::Attachment, 7),
2550                    (DataCategory::AttachmentItem, 1),
2551                ],
2552            },
2553            RateLimitTestCase {
2554                name: "attachment_limit_and_attachment_item_limit",
2555                denied_categories: &[DataCategory::Attachment, DataCategory::AttachmentItem],
2556                expect_attachment_limit_active: true,
2557                expected_limiter_calls: &[(DataCategory::Attachment, 7)],
2558                expected_outcomes: &[
2559                    (DataCategory::Attachment, 7),
2560                    (DataCategory::AttachmentItem, 1),
2561                ],
2562            },
2563            RateLimitTestCase {
2564                name: "attachment_item_limit",
2565                denied_categories: &[DataCategory::AttachmentItem],
2566                expect_attachment_limit_active: true,
2567                expected_limiter_calls: &[
2568                    (DataCategory::Attachment, 7),
2569                    (DataCategory::AttachmentItem, 1),
2570                ],
2571                expected_outcomes: &[
2572                    (DataCategory::Attachment, 7),
2573                    (DataCategory::AttachmentItem, 1),
2574                ],
2575            },
2576            RateLimitTestCase {
2577                name: "no_limits",
2578                denied_categories: &[],
2579                expect_attachment_limit_active: false,
2580                expected_limiter_calls: &[
2581                    (DataCategory::Attachment, 7),
2582                    (DataCategory::AttachmentItem, 1),
2583                ],
2584                expected_outcomes: &[],
2585            },
2586        ];
2587
2588        for RateLimitTestCase {
2589            name,
2590            denied_categories,
2591            expect_attachment_limit_active,
2592            expected_limiter_calls,
2593            expected_outcomes,
2594        } in test_cases
2595        {
2596            let mut envelope = envelope![];
2597            envelope.add_item(trace_attachment_item(7, None));
2598
2599            let mock = mock_limiter(denied_categories);
2600            let (_, enforcement, _) = enforce_and_apply(mock.clone(), envelope).await;
2601
2602            for &(category, quantity) in *expected_limiter_calls {
2603                mock.lock().await.assert_call(category, quantity);
2604            }
2605
2606            assert_eq!(
2607                enforcement.attachments_limits.trace.bytes.is_active(),
2608                *expect_attachment_limit_active,
2609                "{name}: trace_attachment byte limit mismatch"
2610            );
2611            assert_eq!(
2612                enforcement.attachments_limits.trace.count.is_active(),
2613                *expect_attachment_limit_active,
2614                "{name}: trace_attachment count limit mismatch"
2615            );
2616
2617            assert_eq!(
2618                get_outcomes(enforcement),
2619                *expected_outcomes,
2620                "{name}: outcome mismatch"
2621            );
2622        }
2623    }
2624}