relay_server/utils/
rate_limits.rs

1use std::fmt::{self, Write};
2use std::future::Future;
3use std::marker::PhantomData;
4
5use relay_profiling::ProfileType;
6use relay_quotas::{
7    DataCategory, ItemScoping, QuotaScope, RateLimit, RateLimitScope, RateLimits, ReasonCode,
8    Scoping,
9};
10
11use crate::envelope::{Envelope, Item, ItemType, ParentId};
12use crate::integrations::Integration;
13use crate::managed::ManagedEnvelope;
14use crate::services::outcome::Outcome;
15
16/// Name of the rate limits header.
17pub const RATE_LIMITS_HEADER: &str = "X-Sentry-Rate-Limits";
18
19/// Formats the `X-Sentry-Rate-Limits` header.
20pub fn format_rate_limits(rate_limits: &RateLimits) -> String {
21    let mut header = String::new();
22
23    for rate_limit in rate_limits {
24        if !header.is_empty() {
25            header.push_str(", ");
26        }
27
28        write!(header, "{}:", rate_limit.retry_after.remaining_seconds()).ok();
29
30        for (index, category) in rate_limit.categories.iter().enumerate() {
31            if index > 0 {
32                header.push(';');
33            }
34            write!(header, "{category}").ok();
35        }
36
37        write!(header, ":{}", rate_limit.scope.name()).ok();
38
39        if let Some(ref reason_code) = rate_limit.reason_code {
40            write!(header, ":{reason_code}").ok();
41        } else if !rate_limit.namespaces.is_empty() {
42            write!(header, ":").ok(); // delimits the empty reason code for namespaces
43        }
44
45        for (index, namespace) in rate_limit.namespaces.iter().enumerate() {
46            header.push(if index == 0 { ':' } else { ';' });
47            write!(header, "{namespace}").ok();
48        }
49    }
50
51    header
52}
53
54/// Parses the `X-Sentry-Rate-Limits` header.
55pub fn parse_rate_limits(scoping: &Scoping, string: &str) -> RateLimits {
56    let mut rate_limits = RateLimits::new();
57
58    for limit in string.split(',') {
59        let limit = limit.trim();
60        if limit.is_empty() {
61            continue;
62        }
63
64        let mut components = limit.split(':');
65
66        let retry_after = match components.next().and_then(|s| s.parse().ok()) {
67            Some(retry_after) => retry_after,
68            None => continue,
69        };
70
71        let categories = components
72            .next()
73            .unwrap_or("")
74            .split(';')
75            .filter(|category| !category.is_empty())
76            .map(DataCategory::from_name)
77            .collect();
78
79        let quota_scope = QuotaScope::from_name(components.next().unwrap_or(""));
80        let scope = RateLimitScope::for_quota(*scoping, quota_scope);
81
82        let reason_code = components
83            .next()
84            .filter(|s| !s.is_empty())
85            .map(ReasonCode::new);
86
87        let namespace = components
88            .next()
89            .unwrap_or("")
90            .split(';')
91            .filter(|s| !s.is_empty())
92            .filter_map(|s| s.parse().ok())
93            .collect();
94
95        rate_limits.add(RateLimit {
96            categories,
97            scope,
98            reason_code,
99            retry_after,
100            namespaces: namespace,
101        });
102    }
103
104    rate_limits
105}
106
107/// Infer the data category from an item.
108///
109/// Categories depend mostly on the item type, with a few special cases:
110/// - `Event`: the category is inferred from the event type. This requires the `event_type` header
111///   to be set on the event item.
112/// - `Attachment`: If the attachment creates an event (e.g. for minidumps), the category is assumed
113///   to be `Error`.
114fn infer_event_category(item: &Item) -> Option<DataCategory> {
115    match item.ty() {
116        ItemType::Event => Some(DataCategory::Error),
117        ItemType::Transaction => Some(DataCategory::Transaction),
118        ItemType::Security | ItemType::RawSecurity => Some(DataCategory::Security),
119        ItemType::UnrealReport => Some(DataCategory::Error),
120        ItemType::UserReportV2 => Some(DataCategory::UserReportV2),
121        ItemType::Attachment if item.creates_event() => Some(DataCategory::Error),
122        ItemType::Attachment => None,
123        ItemType::Session => None,
124        ItemType::Sessions => None,
125        ItemType::Statsd => None,
126        ItemType::MetricBuckets => None,
127        ItemType::FormData => None,
128        ItemType::UserReport => None,
129        ItemType::Profile => None,
130        ItemType::ReplayEvent => None,
131        ItemType::ReplayRecording => None,
132        ItemType::ReplayVideo => None,
133        ItemType::ClientReport => None,
134        ItemType::CheckIn => None,
135        ItemType::Nel => None,
136        ItemType::Log => None,
137        ItemType::TraceMetric => None,
138        ItemType::Span => None,
139        ItemType::ProfileChunk => None,
140        ItemType::Integration => None,
141        ItemType::Unknown(_) => None,
142    }
143}
144
145/// A summary of `Envelope` contents.
146///
147/// Summarizes the contained event, size of attachments, session updates, and whether there are
148/// plain attachments. This is used for efficient rate limiting or outcome handling.
149#[non_exhaustive]
150#[derive(Clone, Copy, Debug, Default)]
151pub struct EnvelopeSummary {
152    /// The data category of the event in the envelope. `None` if there is no event.
153    pub event_category: Option<DataCategory>,
154
155    /// The quantity of all attachments combined in bytes.
156    pub attachment_quantity: usize,
157
158    /// The number of attachments.
159    pub attachment_item_quantity: usize,
160
161    /// The number of all session updates.
162    pub session_quantity: usize,
163
164    /// The number of profiles.
165    pub profile_quantity: usize,
166
167    /// The number of replays.
168    pub replay_quantity: usize,
169
170    /// The number of user reports (legacy item type for user feedback).
171    pub user_report_quantity: usize,
172
173    /// The number of monitor check-ins.
174    pub monitor_quantity: usize,
175
176    /// The number of log for the log product sent.
177    pub log_item_quantity: usize,
178
179    /// The number of log bytes for the log product sent, in bytes
180    pub log_byte_quantity: usize,
181
182    /// Secondary number of transactions.
183    ///
184    /// This is 0 for envelopes which contain a transaction,
185    /// only secondary transaction quantity should be tracked here,
186    /// these are for example transaction counts extracted from metrics.
187    ///
188    /// A "primary" transaction is contained within the envelope,
189    /// marking the envelope data category a [`DataCategory::Transaction`].
190    pub secondary_transaction_quantity: usize,
191
192    /// See `secondary_transaction_quantity`.
193    pub secondary_span_quantity: usize,
194
195    /// The number of standalone spans.
196    pub span_quantity: usize,
197
198    /// Indicates that the envelope contains regular attachments that do not create event payloads.
199    pub has_plain_attachments: bool,
200
201    /// The payload size of this envelope.
202    pub payload_size: usize,
203
204    /// The number of profile chunks in this envelope.
205    pub profile_chunk_quantity: usize,
206    /// The number of UI profile chunks in this envelope.
207    pub profile_chunk_ui_quantity: usize,
208
209    /// The number of trace metrics in this envelope.
210    pub trace_metric_quantity: usize,
211}
212
213impl EnvelopeSummary {
214    /// Creates an empty summary.
215    pub fn empty() -> Self {
216        Self::default()
217    }
218
219    /// Creates an envelope summary and aggregates the given envelope.
220    pub fn compute(envelope: &Envelope) -> Self {
221        let mut summary = Self::empty();
222
223        for item in envelope.items() {
224            if item.creates_event() {
225                summary.infer_category(item);
226            } else if item.ty() == &ItemType::Attachment {
227                // Plain attachments do not create events.
228                summary.has_plain_attachments = true;
229            }
230
231            // If the item has been rate limited before, the quota has been consumed and outcomes
232            // emitted. We can skip it here.
233            if item.rate_limited() {
234                continue;
235            }
236
237            if let Some(source_quantities) = item.source_quantities() {
238                summary.secondary_transaction_quantity += source_quantities.transactions;
239                summary.secondary_span_quantity += source_quantities.spans;
240                summary.profile_quantity += source_quantities.profiles;
241            }
242
243            summary.payload_size += item.len();
244
245            for (category, quantity) in item.quantities() {
246                summary.add_quantity(category, quantity);
247            }
248
249            // Special case since v1 and v2 share a data category.
250            // Adding this in add_quantity would include v2 in the count.
251            if item.ty() == &ItemType::UserReport {
252                summary.user_report_quantity += 1;
253            }
254        }
255
256        summary
257    }
258
259    fn add_quantity(&mut self, category: DataCategory, quantity: usize) {
260        let target_quantity = match category {
261            DataCategory::Attachment => &mut self.attachment_quantity,
262            DataCategory::AttachmentItem => &mut self.attachment_item_quantity,
263            DataCategory::Session => &mut self.session_quantity,
264            DataCategory::Profile => &mut self.profile_quantity,
265            DataCategory::Replay => &mut self.replay_quantity,
266            DataCategory::DoNotUseReplayVideo => &mut self.replay_quantity,
267            DataCategory::Monitor => &mut self.monitor_quantity,
268            DataCategory::Span => &mut self.span_quantity,
269            DataCategory::TraceMetric => &mut self.trace_metric_quantity,
270            DataCategory::LogItem => &mut self.log_item_quantity,
271            DataCategory::LogByte => &mut self.log_byte_quantity,
272            DataCategory::ProfileChunk => &mut self.profile_chunk_quantity,
273            DataCategory::ProfileChunkUi => &mut self.profile_chunk_ui_quantity,
274            // TODO: This catch-all return looks dangerous
275            _ => return,
276        };
277        *target_quantity += quantity;
278    }
279
280    /// Infers the appropriate [`DataCategory`] for the envelope [`Item`].
281    ///
282    /// The inferred category is only applied to the [`EnvelopeSummary`] if there is not yet
283    /// a category set.
284    fn infer_category(&mut self, item: &Item) {
285        if matches!(self.event_category, None | Some(DataCategory::Default))
286            && let Some(category) = infer_event_category(item)
287        {
288            self.event_category = Some(category);
289        }
290    }
291}
292
293/// Rate limiting information for a data category.
294#[derive(Debug)]
295#[cfg_attr(test, derive(Clone))]
296pub struct CategoryLimit {
297    /// The limited data category.
298    category: DataCategory,
299    /// The total rate limited quantity across all items.
300    ///
301    /// This will be `0` if nothing was rate limited.
302    quantity: usize,
303    /// The reason code of the applied rate limit.
304    ///
305    /// Defaults to `None` if the quota does not declare a reason code.
306    reason_code: Option<ReasonCode>,
307}
308
309impl CategoryLimit {
310    /// Creates a new `CategoryLimit`.
311    ///
312    /// Returns an inactive limit if `quantity` is `0` or `rate_limit` is `None`.
313    fn new(category: DataCategory, quantity: usize, rate_limit: Option<&RateLimit>) -> Self {
314        match rate_limit {
315            Some(limit) => Self {
316                category,
317                quantity,
318                reason_code: limit.reason_code.clone(),
319            },
320            None => Self::default(),
321        }
322    }
323
324    /// Recreates the category limit, if active, for a new category with the same reason.
325    pub fn clone_for(&self, category: DataCategory, quantity: usize) -> CategoryLimit {
326        if !self.is_active() {
327            return Self::default();
328        }
329
330        Self {
331            category,
332            quantity,
333            reason_code: self.reason_code.clone(),
334        }
335    }
336
337    /// Returns `true` if this is an active limit.
338    ///
339    /// This indicates that the category is limited and a certain quantity is removed from the
340    /// Envelope. If the limit is inactive, there is no change.
341    pub fn is_active(&self) -> bool {
342        self.quantity > 0
343    }
344}
345
346impl Default for CategoryLimit {
347    fn default() -> Self {
348        Self {
349            category: DataCategory::Default,
350            quantity: 0,
351            reason_code: None,
352        }
353    }
354}
355
356/// Information on the limited quantities returned by [`EnvelopeLimiter::compute`].
357#[derive(Default, Debug)]
358#[cfg_attr(test, derive(Clone))]
359pub struct Enforcement {
360    /// The event item rate limit.
361    pub event: CategoryLimit,
362    /// The rate limit for the indexed category of the event.
363    pub event_indexed: CategoryLimit,
364    /// The combined attachment bytes rate limit.
365    pub attachments: CategoryLimit,
366    /// The combined attachment item rate limit.
367    pub attachment_items: CategoryLimit,
368    /// The combined session item rate limit.
369    pub sessions: CategoryLimit,
370    /// The combined profile item rate limit.
371    pub profiles: CategoryLimit,
372    /// The rate limit for the indexed profiles category.
373    pub profiles_indexed: CategoryLimit,
374    /// The combined replay item rate limit.
375    pub replays: CategoryLimit,
376    /// The combined check-in item rate limit.
377    pub check_ins: CategoryLimit,
378    /// The combined logs (our product logs) rate limit.
379    pub log_items: CategoryLimit,
380    /// The combined logs (our product logs) rate limit.
381    pub log_bytes: CategoryLimit,
382    /// The combined spans rate limit.
383    pub spans: CategoryLimit,
384    /// The rate limit for the indexed span category.
385    pub spans_indexed: CategoryLimit,
386    /// The rate limit for user report v1.
387    pub user_reports: CategoryLimit,
388    /// The combined profile chunk item rate limit.
389    pub profile_chunks: CategoryLimit,
390    /// The combined profile chunk ui item rate limit.
391    pub profile_chunks_ui: CategoryLimit,
392    /// The combined trace metric item rate limit.
393    pub trace_metrics: CategoryLimit,
394}
395
396impl Enforcement {
397    /// Returns the `CategoryLimit` for the event.
398    ///
399    /// `None` if the event is not rate limited.
400    pub fn active_event(&self) -> Option<&CategoryLimit> {
401        if self.event.is_active() {
402            Some(&self.event)
403        } else if self.event_indexed.is_active() {
404            Some(&self.event_indexed)
405        } else {
406            None
407        }
408    }
409
410    /// Returns `true` if the event is rate limited.
411    pub fn is_event_active(&self) -> bool {
412        self.active_event().is_some()
413    }
414
415    /// Helper for `track_outcomes`.
416    fn get_outcomes(self) -> impl Iterator<Item = (Outcome, DataCategory, usize)> {
417        let Self {
418            event,
419            event_indexed,
420            attachments,
421            attachment_items,
422            sessions: _, // Do not report outcomes for sessions.
423            profiles,
424            profiles_indexed,
425            replays,
426            check_ins,
427            log_items,
428            log_bytes,
429            spans,
430            spans_indexed,
431            user_reports,
432            profile_chunks,
433            profile_chunks_ui,
434            trace_metrics,
435        } = self;
436
437        let limits = [
438            event,
439            event_indexed,
440            attachments,
441            attachment_items,
442            profiles,
443            profiles_indexed,
444            replays,
445            check_ins,
446            log_items,
447            log_bytes,
448            spans,
449            spans_indexed,
450            user_reports,
451            profile_chunks,
452            profile_chunks_ui,
453            trace_metrics,
454        ];
455
456        limits
457            .into_iter()
458            .filter(move |limit| limit.is_active())
459            .map(move |limit| {
460                (
461                    Outcome::RateLimited(limit.reason_code),
462                    limit.category,
463                    limit.quantity,
464                )
465            })
466    }
467
468    /// Applies the [`Enforcement`] on the [`Envelope`] by removing all items that were rate limited
469    /// and emits outcomes for each rate limited category.
470    ///
471    /// # Example
472    ///
473    /// ## Interaction between Events and Attachments
474    ///
475    /// An envelope with an `Error` event and an `Attachment`. Two quotas specify to drop all
476    /// attachments (reason `"a"`) and all errors (reason `"e"`). The result of enforcement will be:
477    ///
478    /// 1. All items are removed from the envelope.
479    /// 2. Enforcements report both the event and the attachment dropped with reason `"e"`, since
480    ///    dropping an event automatically drops all attachments with the same reason.
481    /// 3. Rate limits report the single event limit `"e"`, since attachment limits do not need to
482    ///    be checked in this case.
483    ///
484    /// ## Required Attachments
485    ///
486    /// An envelope with a single Minidump `Attachment`, and a single quota specifying to drop all
487    /// attachments with reason `"a"`:
488    ///
489    /// 1. Since the minidump creates an event and is required for processing, it remains in the
490    ///    envelope and is marked as `rate_limited`.
491    /// 2. Enforcements report the attachment dropped with reason `"a"`.
492    /// 3. Rate limits are empty since it is allowed to send required attachments even when rate
493    ///    limited.
494    ///
495    /// ## Previously Rate Limited Attachments
496    ///
497    /// An envelope with a single item marked as `rate_limited`, and a quota specifying to drop
498    /// everything with reason `"d"`:
499    ///
500    /// 1. The item remains in the envelope.
501    /// 2. Enforcements are empty. Rate limiting has occurred at an earlier stage in the pipeline.
502    /// 3. Rate limits are empty.
503    pub fn apply_with_outcomes(self, envelope: &mut ManagedEnvelope) {
504        envelope
505            .envelope_mut()
506            .retain_items(|item| self.retain_item(item));
507        self.track_outcomes(envelope);
508    }
509
510    /// Returns `true` when an [`Item`] can be retained, `false` otherwise.
511    fn retain_item(&self, item: &mut Item) -> bool {
512        // Remove event items and all items that depend on this event
513        if self.event.is_active() && item.requires_event() {
514            return false;
515        }
516
517        // When checking limits for categories that have an indexed variant,
518        // we only have to check the more specific, the indexed, variant
519        // to determine whether an item is limited.
520        match item.ty() {
521            ItemType::Attachment => {
522                // Drop span attachments if they have a span_id item header and span quota is null.
523                if item.is_attachment_v2() && matches!(item.parent_id(), Some(ParentId::SpanId(_))) && (self.spans_indexed.is_active() || self.spans.is_active()) {
524                    return false;
525                }
526                if !(self.attachments.is_active() || self.attachment_items.is_active()) {
527                    return true;
528                }
529                if item.creates_event() {
530                    item.set_rate_limited(true);
531                    true
532                } else {
533                    false
534                }
535            }
536            ItemType::Session => !self.sessions.is_active(),
537            ItemType::Profile => !self.profiles_indexed.is_active(),
538            ItemType::ReplayEvent => !self.replays.is_active(),
539            ItemType::ReplayVideo => !self.replays.is_active(),
540            ItemType::ReplayRecording => !self.replays.is_active(),
541            ItemType::UserReport => !self.user_reports.is_active(),
542            ItemType::CheckIn => !self.check_ins.is_active(),
543            ItemType::Log => {
544                !(self.log_items.is_active() || self.log_bytes.is_active())
545            }
546            ItemType::Span => !self.spans_indexed.is_active(),
547            ItemType::ProfileChunk => match item.profile_type() {
548                Some(ProfileType::Backend) => !self.profile_chunks.is_active(),
549                Some(ProfileType::Ui) => !self.profile_chunks_ui.is_active(),
550                None => true,
551            },
552            ItemType::TraceMetric => !self.trace_metrics.is_active(),
553            ItemType::Integration => match item.integration() {
554                Some(Integration::Logs(_)) => !(self.log_items.is_active() || self.log_bytes.is_active()),
555                Some(Integration::Spans(_)) => !self.spans_indexed.is_active(),
556                None => true,
557            },
558            ItemType::Event
559            | ItemType::Transaction
560            | ItemType::Security
561            | ItemType::FormData
562            | ItemType::RawSecurity
563            | ItemType::Nel
564            | ItemType::UnrealReport
565            | ItemType::Sessions
566            | ItemType::Statsd
567            | ItemType::MetricBuckets
568            | ItemType::ClientReport
569            | ItemType::UserReportV2  // This is an event type.
570            | ItemType::Unknown(_) => true,
571        }
572    }
573
574    /// Invokes track outcome on all enforcements reported by the [`EnvelopeLimiter`].
575    ///
576    /// Relay generally does not emit outcomes for sessions, so those are skipped.
577    fn track_outcomes(self, envelope: &mut ManagedEnvelope) {
578        for (outcome, category, quantity) in self.get_outcomes() {
579            envelope.track_outcome(outcome, category, quantity)
580        }
581    }
582}
583
584/// Which limits to check with the [`EnvelopeLimiter`].
585#[derive(Debug, Copy, Clone)]
586pub enum CheckLimits {
587    /// Checks all limits except indexed categories.
588    ///
589    /// In the fast path it is necessary to apply cached rate limits but to not enforce indexed rate limits.
590    /// Because at the time of the check the decision whether an envelope is sampled or not is not yet known.
591    /// Additionally even if the item is later dropped by dynamic sampling, it must still be around to extract metrics
592    /// and cannot be dropped too early.
593    NonIndexed,
594    /// Checks all limits against the envelope.
595    All,
596}
597
598struct Check<F, E, R> {
599    limits: CheckLimits,
600    check: F,
601    _1: PhantomData<E>,
602    _2: PhantomData<R>,
603}
604
605impl<F, E, R> Check<F, E, R>
606where
607    F: FnMut(ItemScoping, usize) -> R,
608    R: Future<Output = Result<RateLimits, E>>,
609{
610    async fn apply(&mut self, scoping: ItemScoping, quantity: usize) -> Result<RateLimits, E> {
611        if matches!(self.limits, CheckLimits::NonIndexed) && scoping.category.is_indexed() {
612            return Ok(RateLimits::default());
613        }
614
615        (self.check)(scoping, quantity).await
616    }
617}
618
619/// Enforces rate limits with the given `check` function on items in the envelope.
620///
621/// The `check` function is called with the following rules:
622///  - Once for a single event, if present in the envelope.
623///  - Once for all comprised attachments, unless the event was rate limited.
624///  - Once for all comprised sessions.
625///
626/// Items violating the rate limit are removed from the envelope. This follows a set of rules:
627///  - If the event is removed, all items depending on the event are removed (e.g. attachments).
628///  - Attachments are not removed if they create events (e.g. minidumps).
629///  - Sessions are handled separately from all of the above.
630pub struct EnvelopeLimiter<F, E, R> {
631    check: Check<F, E, R>,
632    event_category: Option<DataCategory>,
633}
634
635impl<'a, F, E, R> EnvelopeLimiter<F, E, R>
636where
637    F: FnMut(ItemScoping, usize) -> R,
638    R: Future<Output = Result<RateLimits, E>>,
639{
640    /// Create a new `EnvelopeLimiter` with the given `check` function.
641    pub fn new(limits: CheckLimits, check: F) -> Self {
642        Self {
643            check: Check {
644                check,
645                limits,
646                _1: PhantomData,
647                _2: PhantomData,
648            },
649            event_category: None,
650        }
651    }
652
653    /// Assume an event with the given category, even if no item is present in the envelope.
654    ///
655    /// This ensures that rate limits for the given data category are checked even if there is no
656    /// matching item in the envelope. Other items are handled according to the rules as if the
657    /// event item were present.
658    pub fn assume_event(&mut self, category: DataCategory) {
659        self.event_category = Some(category);
660    }
661
662    /// Process rate limits for the envelope, returning applied limits.
663    ///
664    /// Returns a tuple of `Enforcement` and `RateLimits`:
665    ///
666    /// - Enforcements declare the quantities of categories that have been rate limited with the
667    ///   individual reason codes that caused rate limiting. If multiple rate limits applied to a
668    ///   category, then the longest limit is reported.
669    /// - Rate limits declare all active rate limits, regardless of whether they have been applied
670    ///   to items in the envelope. This excludes rate limits applied to required attachments, since
671    ///   clients are allowed to continue sending them.
672    pub async fn compute(
673        mut self,
674        envelope: &mut Envelope,
675        scoping: &'a Scoping,
676    ) -> Result<(Enforcement, RateLimits), E> {
677        let mut summary = EnvelopeSummary::compute(envelope);
678        summary.event_category = self.event_category.or(summary.event_category);
679
680        let (enforcement, rate_limits) = self.execute(&summary, scoping).await?;
681        Ok((enforcement, rate_limits))
682    }
683
684    async fn execute(
685        &mut self,
686        summary: &EnvelopeSummary,
687        scoping: &'a Scoping,
688    ) -> Result<(Enforcement, RateLimits), E> {
689        let mut rate_limits = RateLimits::new();
690        let mut enforcement = Enforcement::default();
691
692        // Handle event.
693        if let Some(category) = summary.event_category {
694            // Check the broad category for limits.
695            let mut event_limits = self.check.apply(scoping.item(category), 1).await?;
696            enforcement.event = CategoryLimit::new(category, 1, event_limits.longest());
697
698            if let Some(index_category) = category.index_category() {
699                // Check the specific/indexed category for limits only if the specific one has not already
700                // an enforced limit.
701                if event_limits.is_empty() {
702                    event_limits.merge(self.check.apply(scoping.item(index_category), 1).await?);
703                }
704
705                enforcement.event_indexed =
706                    CategoryLimit::new(index_category, 1, event_limits.longest());
707            };
708
709            rate_limits.merge(event_limits);
710        }
711
712        // Handle attachments.
713        if let Some(limit) = enforcement.active_event() {
714            let limit1 = limit.clone_for(DataCategory::Attachment, summary.attachment_quantity);
715            let limit2 = limit.clone_for(
716                DataCategory::AttachmentItem,
717                summary.attachment_item_quantity,
718            );
719            enforcement.attachments = limit1;
720            enforcement.attachment_items = limit2;
721        } else {
722            let mut attachment_limits = RateLimits::new();
723            if summary.attachment_quantity > 0 {
724                let item_scoping = scoping.item(DataCategory::Attachment);
725
726                let attachment_byte_limits = self
727                    .check
728                    .apply(item_scoping, summary.attachment_quantity)
729                    .await?;
730
731                enforcement.attachments = CategoryLimit::new(
732                    DataCategory::Attachment,
733                    summary.attachment_quantity,
734                    attachment_byte_limits.longest(),
735                );
736                enforcement.attachment_items = enforcement.attachments.clone_for(
737                    DataCategory::AttachmentItem,
738                    summary.attachment_item_quantity,
739                );
740                attachment_limits.merge(attachment_byte_limits);
741            }
742            if !attachment_limits.is_limited() && summary.attachment_item_quantity > 0 {
743                let item_scoping = scoping.item(DataCategory::AttachmentItem);
744
745                let attachment_item_limits = self
746                    .check
747                    .apply(item_scoping, summary.attachment_item_quantity)
748                    .await?;
749
750                enforcement.attachment_items = CategoryLimit::new(
751                    DataCategory::AttachmentItem,
752                    summary.attachment_item_quantity,
753                    attachment_item_limits.longest(),
754                );
755                enforcement.attachments = enforcement
756                    .attachment_items
757                    .clone_for(DataCategory::Attachment, summary.attachment_quantity);
758                attachment_limits.merge(attachment_item_limits);
759            }
760
761            // Only record rate limits for plain attachments. For all other attachments, it's
762            // perfectly "legal" to send them. They will still be discarded in Sentry, but clients
763            // can continue to send them.
764            if summary.has_plain_attachments {
765                rate_limits.merge(attachment_limits);
766            }
767        }
768
769        // Handle sessions.
770        if summary.session_quantity > 0 {
771            let item_scoping = scoping.item(DataCategory::Session);
772            let session_limits = self
773                .check
774                .apply(item_scoping, summary.session_quantity)
775                .await?;
776            enforcement.sessions = CategoryLimit::new(
777                DataCategory::Session,
778                summary.session_quantity,
779                session_limits.longest(),
780            );
781            rate_limits.merge(session_limits);
782        }
783
784        // Handle trace metrics.
785        if summary.trace_metric_quantity > 0 {
786            let item_scoping = scoping.item(DataCategory::TraceMetric);
787            let trace_metric_limits = self
788                .check
789                .apply(item_scoping, summary.trace_metric_quantity)
790                .await?;
791            enforcement.trace_metrics = CategoryLimit::new(
792                DataCategory::TraceMetric,
793                summary.trace_metric_quantity,
794                trace_metric_limits.longest(),
795            );
796            rate_limits.merge(trace_metric_limits);
797        }
798
799        // Handle logs.
800        if summary.log_item_quantity > 0 {
801            let item_scoping = scoping.item(DataCategory::LogItem);
802            let log_limits = self
803                .check
804                .apply(item_scoping, summary.log_item_quantity)
805                .await?;
806            enforcement.log_items = CategoryLimit::new(
807                DataCategory::LogItem,
808                summary.log_item_quantity,
809                log_limits.longest(),
810            );
811            rate_limits.merge(log_limits);
812        }
813        if summary.log_byte_quantity > 0 {
814            let item_scoping = scoping.item(DataCategory::LogByte);
815            let log_limits = self
816                .check
817                .apply(item_scoping, summary.log_byte_quantity)
818                .await?;
819            enforcement.log_bytes = CategoryLimit::new(
820                DataCategory::LogByte,
821                summary.log_byte_quantity,
822                log_limits.longest(),
823            );
824            rate_limits.merge(log_limits);
825        }
826
827        // Handle profiles.
828        if enforcement.is_event_active() {
829            enforcement.profiles = enforcement
830                .event
831                .clone_for(DataCategory::Profile, summary.profile_quantity);
832
833            enforcement.profiles_indexed = enforcement
834                .event_indexed
835                .clone_for(DataCategory::ProfileIndexed, summary.profile_quantity)
836        } else if summary.profile_quantity > 0 {
837            let mut profile_limits = self
838                .check
839                .apply(
840                    scoping.item(DataCategory::Profile),
841                    summary.profile_quantity,
842                )
843                .await?;
844
845            // Profiles can persist in envelopes without transaction if the transaction item
846            // was dropped by dynamic sampling.
847            if profile_limits.is_empty() && summary.event_category.is_none() {
848                profile_limits = self
849                    .check
850                    .apply(scoping.item(DataCategory::Transaction), 0)
851                    .await?;
852            }
853
854            enforcement.profiles = CategoryLimit::new(
855                DataCategory::Profile,
856                summary.profile_quantity,
857                profile_limits.longest(),
858            );
859
860            if profile_limits.is_empty() {
861                profile_limits.merge(
862                    self.check
863                        .apply(
864                            scoping.item(DataCategory::ProfileIndexed),
865                            summary.profile_quantity,
866                        )
867                        .await?,
868                );
869            }
870
871            enforcement.profiles_indexed = CategoryLimit::new(
872                DataCategory::ProfileIndexed,
873                summary.profile_quantity,
874                profile_limits.longest(),
875            );
876
877            rate_limits.merge(profile_limits);
878        }
879
880        // Handle replays.
881        if summary.replay_quantity > 0 {
882            let item_scoping = scoping.item(DataCategory::Replay);
883            let replay_limits = self
884                .check
885                .apply(item_scoping, summary.replay_quantity)
886                .await?;
887            enforcement.replays = CategoryLimit::new(
888                DataCategory::Replay,
889                summary.replay_quantity,
890                replay_limits.longest(),
891            );
892            rate_limits.merge(replay_limits);
893        }
894
895        // Handle user report v1s, which share limits with v2.
896        if summary.user_report_quantity > 0 {
897            let item_scoping = scoping.item(DataCategory::UserReportV2);
898            let user_report_v2_limits = self
899                .check
900                .apply(item_scoping, summary.user_report_quantity)
901                .await?;
902            enforcement.user_reports = CategoryLimit::new(
903                DataCategory::UserReportV2,
904                summary.user_report_quantity,
905                user_report_v2_limits.longest(),
906            );
907            rate_limits.merge(user_report_v2_limits);
908        }
909
910        // Handle monitor checkins.
911        if summary.monitor_quantity > 0 {
912            let item_scoping = scoping.item(DataCategory::Monitor);
913            let checkin_limits = self
914                .check
915                .apply(item_scoping, summary.monitor_quantity)
916                .await?;
917            enforcement.check_ins = CategoryLimit::new(
918                DataCategory::Monitor,
919                summary.monitor_quantity,
920                checkin_limits.longest(),
921            );
922            rate_limits.merge(checkin_limits);
923        }
924
925        // Handle spans.
926        if enforcement.is_event_active() {
927            enforcement.spans = enforcement
928                .event
929                .clone_for(DataCategory::Span, summary.span_quantity);
930
931            enforcement.spans_indexed = enforcement
932                .event_indexed
933                .clone_for(DataCategory::SpanIndexed, summary.span_quantity);
934        } else if summary.span_quantity > 0 {
935            let mut span_limits = self
936                .check
937                .apply(scoping.item(DataCategory::Span), summary.span_quantity)
938                .await?;
939            enforcement.spans = CategoryLimit::new(
940                DataCategory::Span,
941                summary.span_quantity,
942                span_limits.longest(),
943            );
944
945            if span_limits.is_empty() {
946                span_limits.merge(
947                    self.check
948                        .apply(
949                            scoping.item(DataCategory::SpanIndexed),
950                            summary.span_quantity,
951                        )
952                        .await?,
953                );
954            }
955
956            enforcement.spans_indexed = CategoryLimit::new(
957                DataCategory::SpanIndexed,
958                summary.span_quantity,
959                span_limits.longest(),
960            );
961
962            rate_limits.merge(span_limits);
963        }
964
965        // Handle profile chunks.
966        if summary.profile_chunk_quantity > 0 {
967            let item_scoping = scoping.item(DataCategory::ProfileChunk);
968            let limits = self
969                .check
970                .apply(item_scoping, summary.profile_chunk_quantity)
971                .await?;
972            enforcement.profile_chunks = CategoryLimit::new(
973                DataCategory::ProfileChunk,
974                summary.profile_chunk_quantity,
975                limits.longest(),
976            );
977            rate_limits.merge(limits);
978        }
979
980        if summary.profile_chunk_ui_quantity > 0 {
981            let item_scoping = scoping.item(DataCategory::ProfileChunkUi);
982            let limits = self
983                .check
984                .apply(item_scoping, summary.profile_chunk_ui_quantity)
985                .await?;
986            enforcement.profile_chunks_ui = CategoryLimit::new(
987                DataCategory::ProfileChunkUi,
988                summary.profile_chunk_ui_quantity,
989                limits.longest(),
990            );
991            rate_limits.merge(limits);
992        }
993
994        Ok((enforcement, rate_limits))
995    }
996}
997
998impl<F, E, R> fmt::Debug for EnvelopeLimiter<F, E, R> {
999    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1000        f.debug_struct("EnvelopeLimiter")
1001            .field("event_category", &self.event_category)
1002            .finish()
1003    }
1004}
1005
1006#[cfg(test)]
1007mod tests {
1008
1009    use std::collections::{BTreeMap, BTreeSet};
1010    use std::sync::Arc;
1011
1012    use relay_base_schema::organization::OrganizationId;
1013    use relay_base_schema::project::{ProjectId, ProjectKey};
1014    use relay_metrics::MetricNamespace;
1015    use relay_quotas::RetryAfter;
1016    use relay_system::Addr;
1017    use smallvec::smallvec;
1018    use tokio::sync::Mutex;
1019
1020    use super::*;
1021    use crate::{
1022        envelope::{AttachmentType, ContentType, SourceQuantities},
1023        extractors::RequestMeta,
1024    };
1025
1026    #[tokio::test]
1027    async fn test_format_rate_limits() {
1028        let mut rate_limits = RateLimits::new();
1029
1030        // Add a generic rate limit for all categories.
1031        rate_limits.add(RateLimit {
1032            categories: Default::default(),
1033            scope: RateLimitScope::Organization(OrganizationId::new(42)),
1034            reason_code: Some(ReasonCode::new("my_limit")),
1035            retry_after: RetryAfter::from_secs(42),
1036            namespaces: smallvec![],
1037        });
1038
1039        // Add a more specific rate limit for just one category.
1040        rate_limits.add(RateLimit {
1041            categories: [DataCategory::Transaction, DataCategory::Security].into(),
1042            scope: RateLimitScope::Project(ProjectId::new(21)),
1043            reason_code: None,
1044            retry_after: RetryAfter::from_secs(4711),
1045            namespaces: smallvec![],
1046        });
1047
1048        let formatted = format_rate_limits(&rate_limits);
1049        let expected = "42::organization:my_limit, 4711:transaction;security:project";
1050        assert_eq!(formatted, expected);
1051    }
1052
1053    #[tokio::test]
1054    async fn test_format_rate_limits_namespace() {
1055        let mut rate_limits = RateLimits::new();
1056
1057        // Rate limit with reason code and namespace.
1058        rate_limits.add(RateLimit {
1059            categories: [DataCategory::MetricBucket].into(),
1060            scope: RateLimitScope::Organization(OrganizationId::new(42)),
1061            reason_code: Some(ReasonCode::new("my_limit")),
1062            retry_after: RetryAfter::from_secs(42),
1063            namespaces: smallvec![MetricNamespace::Custom, MetricNamespace::Spans],
1064        });
1065
1066        // Rate limit without reason code.
1067        rate_limits.add(RateLimit {
1068            categories: [DataCategory::MetricBucket].into(),
1069            scope: RateLimitScope::Organization(OrganizationId::new(42)),
1070            reason_code: None,
1071            retry_after: RetryAfter::from_secs(42),
1072            namespaces: smallvec![MetricNamespace::Spans],
1073        });
1074
1075        let formatted = format_rate_limits(&rate_limits);
1076        let expected = "42:metric_bucket:organization:my_limit:custom;spans, 42:metric_bucket:organization::spans";
1077        assert_eq!(formatted, expected);
1078    }
1079
1080    #[tokio::test]
1081    async fn test_parse_invalid_rate_limits() {
1082        let scoping = Scoping {
1083            organization_id: OrganizationId::new(42),
1084            project_id: ProjectId::new(21),
1085            project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
1086            key_id: Some(17),
1087        };
1088
1089        assert!(parse_rate_limits(&scoping, "").is_ok());
1090        assert!(parse_rate_limits(&scoping, "invalid").is_ok());
1091        assert!(parse_rate_limits(&scoping, ",,,").is_ok());
1092    }
1093
1094    #[tokio::test]
1095    async fn test_parse_rate_limits() {
1096        let scoping = Scoping {
1097            organization_id: OrganizationId::new(42),
1098            project_id: ProjectId::new(21),
1099            project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
1100            key_id: Some(17),
1101        };
1102
1103        // contains "foobar", an unknown scope that should be mapped to Unknown
1104        let formatted =
1105            "42::organization:my_limit, invalid, 4711:foobar;transaction;security:project";
1106        let rate_limits: Vec<RateLimit> =
1107            parse_rate_limits(&scoping, formatted).into_iter().collect();
1108
1109        assert_eq!(
1110            rate_limits,
1111            vec![
1112                RateLimit {
1113                    categories: Default::default(),
1114                    scope: RateLimitScope::Organization(OrganizationId::new(42)),
1115                    reason_code: Some(ReasonCode::new("my_limit")),
1116                    retry_after: rate_limits[0].retry_after,
1117                    namespaces: smallvec![],
1118                },
1119                RateLimit {
1120                    categories: [
1121                        DataCategory::Unknown,
1122                        DataCategory::Transaction,
1123                        DataCategory::Security,
1124                    ]
1125                    .into(),
1126                    scope: RateLimitScope::Project(ProjectId::new(21)),
1127                    reason_code: None,
1128                    retry_after: rate_limits[1].retry_after,
1129                    namespaces: smallvec![],
1130                }
1131            ]
1132        );
1133
1134        assert_eq!(42, rate_limits[0].retry_after.remaining_seconds());
1135        assert_eq!(4711, rate_limits[1].retry_after.remaining_seconds());
1136    }
1137
1138    #[tokio::test]
1139    async fn test_parse_rate_limits_namespace() {
1140        let scoping = Scoping {
1141            organization_id: OrganizationId::new(42),
1142            project_id: ProjectId::new(21),
1143            project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
1144            key_id: Some(17),
1145        };
1146
1147        let formatted = "42:metric_bucket:organization::custom;spans";
1148        let rate_limits: Vec<RateLimit> =
1149            parse_rate_limits(&scoping, formatted).into_iter().collect();
1150
1151        assert_eq!(
1152            rate_limits,
1153            vec![RateLimit {
1154                categories: [DataCategory::MetricBucket].into(),
1155                scope: RateLimitScope::Organization(OrganizationId::new(42)),
1156                reason_code: None,
1157                retry_after: rate_limits[0].retry_after,
1158                namespaces: smallvec![MetricNamespace::Custom, MetricNamespace::Spans],
1159            }]
1160        );
1161    }
1162
1163    #[tokio::test]
1164    async fn test_parse_rate_limits_empty_namespace() {
1165        let scoping = Scoping {
1166            organization_id: OrganizationId::new(42),
1167            project_id: ProjectId::new(21),
1168            project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
1169            key_id: Some(17),
1170        };
1171
1172        // notice the trailing colon
1173        let formatted = "42:metric_bucket:organization:some_reason:";
1174        let rate_limits: Vec<RateLimit> =
1175            parse_rate_limits(&scoping, formatted).into_iter().collect();
1176
1177        assert_eq!(
1178            rate_limits,
1179            vec![RateLimit {
1180                categories: [DataCategory::MetricBucket].into(),
1181                scope: RateLimitScope::Organization(OrganizationId::new(42)),
1182                reason_code: Some(ReasonCode::new("some_reason")),
1183                retry_after: rate_limits[0].retry_after,
1184                namespaces: smallvec![],
1185            }]
1186        );
1187    }
1188
1189    #[tokio::test]
1190    async fn test_parse_rate_limits_only_unknown() {
1191        let scoping = Scoping {
1192            organization_id: OrganizationId::new(42),
1193            project_id: ProjectId::new(21),
1194            project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
1195            key_id: Some(17),
1196        };
1197
1198        let formatted = "42:foo;bar:organization";
1199        let rate_limits: Vec<RateLimit> =
1200            parse_rate_limits(&scoping, formatted).into_iter().collect();
1201
1202        assert_eq!(
1203            rate_limits,
1204            vec![RateLimit {
1205                categories: [DataCategory::Unknown, DataCategory::Unknown].into(),
1206                scope: RateLimitScope::Organization(OrganizationId::new(42)),
1207                reason_code: None,
1208                retry_after: rate_limits[0].retry_after,
1209                namespaces: smallvec![],
1210            },]
1211        );
1212    }
1213
1214    macro_rules! envelope {
1215        ($( $item_type:ident $( :: $attachment_type:ident )? ),*) => {{
1216            let bytes = "{\"dsn\":\"https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42\"}";
1217            #[allow(unused_mut)]
1218            let mut envelope = Envelope::parse_bytes(bytes.into()).unwrap();
1219            $(
1220                let mut item = Item::new(ItemType::$item_type);
1221                item.set_payload(ContentType::OctetStream, "0123456789");
1222                $( item.set_attachment_type(AttachmentType::$attachment_type); )?
1223                envelope.add_item(item);
1224            )*
1225
1226            let (outcome_aggregator, _) = Addr::custom();
1227
1228            ManagedEnvelope::new(
1229                envelope,
1230                outcome_aggregator,
1231            )
1232        }}
1233    }
1234
1235    fn set_extracted(envelope: &mut Envelope, ty: ItemType) {
1236        envelope
1237            .get_item_by_mut(|item| *item.ty() == ty)
1238            .unwrap()
1239            .set_metrics_extracted(true);
1240    }
1241
1242    fn rate_limit(category: DataCategory) -> RateLimit {
1243        RateLimit {
1244            categories: [category].into(),
1245            scope: RateLimitScope::Organization(OrganizationId::new(42)),
1246            reason_code: None,
1247            retry_after: RetryAfter::from_secs(60),
1248            namespaces: smallvec![],
1249        }
1250    }
1251
1252    #[derive(Debug, Default)]
1253    struct MockLimiter {
1254        denied: Vec<DataCategory>,
1255        called: BTreeMap<DataCategory, usize>,
1256        checked: BTreeSet<DataCategory>,
1257    }
1258
1259    impl MockLimiter {
1260        pub fn deny(mut self, category: DataCategory) -> Self {
1261            self.denied.push(category);
1262            self
1263        }
1264
1265        pub fn check(&mut self, scoping: ItemScoping, quantity: usize) -> Result<RateLimits, ()> {
1266            let cat = scoping.category;
1267            let previous = self.called.insert(cat, quantity);
1268            assert!(previous.is_none(), "rate limiter invoked twice for {cat}");
1269
1270            let mut limits = RateLimits::new();
1271            if self.denied.contains(&cat) {
1272                limits.add(rate_limit(cat));
1273            }
1274            Ok(limits)
1275        }
1276
1277        #[track_caller]
1278        pub fn assert_call(&mut self, category: DataCategory, expected: usize) {
1279            self.checked.insert(category);
1280
1281            let quantity = self.called.get(&category).copied();
1282            assert_eq!(
1283                quantity,
1284                Some(expected),
1285                "Expected quantity `{expected}` for data category `{category}`, got {quantity:?}."
1286            );
1287        }
1288    }
1289
1290    impl Drop for MockLimiter {
1291        fn drop(&mut self) {
1292            if std::thread::panicking() {
1293                return;
1294            }
1295
1296            for checked in &self.checked {
1297                self.called.remove(checked);
1298            }
1299
1300            if self.called.is_empty() {
1301                return;
1302            }
1303
1304            let not_asserted = self
1305                .called
1306                .iter()
1307                .map(|(k, v)| format!("- {k}: {v}"))
1308                .collect::<Vec<_>>()
1309                .join("\n");
1310
1311            panic!("Following calls to the limiter were not asserted:\n{not_asserted}");
1312        }
1313    }
1314
1315    async fn enforce_and_apply(
1316        mock: Arc<Mutex<MockLimiter>>,
1317        envelope: &mut ManagedEnvelope,
1318        #[allow(unused_variables)] assume_event: Option<DataCategory>,
1319    ) -> (Enforcement, RateLimits) {
1320        let scoping = envelope.scoping();
1321
1322        #[allow(unused_mut)]
1323        let mut limiter = EnvelopeLimiter::new(CheckLimits::All, move |s, q| {
1324            let mock = mock.clone();
1325            async move {
1326                let mut mock = mock.lock().await;
1327                mock.check(s, q)
1328            }
1329        });
1330        #[cfg(feature = "processing")]
1331        if let Some(assume_event) = assume_event {
1332            limiter.assume_event(assume_event);
1333        }
1334
1335        let (enforcement, limits) = limiter
1336            .compute(envelope.envelope_mut(), &scoping)
1337            .await
1338            .unwrap();
1339
1340        // We implemented `clone` only for tests because we don't want to make `apply_with_outcomes`
1341        // &self because we want move semantics to prevent double tracking.
1342        enforcement.clone().apply_with_outcomes(envelope);
1343
1344        (enforcement, limits)
1345    }
1346
1347    fn mock_limiter(category: Option<DataCategory>) -> Arc<Mutex<MockLimiter>> {
1348        let mut mock = MockLimiter::default();
1349        if let Some(category) = category {
1350            mock = mock.deny(category);
1351        }
1352
1353        Arc::new(Mutex::new(mock))
1354    }
1355
1356    #[tokio::test]
1357    async fn test_enforce_pass_empty() {
1358        let mut envelope = envelope![];
1359
1360        let mock = mock_limiter(None);
1361        let (_, limits) = enforce_and_apply(mock, &mut envelope, None).await;
1362
1363        assert!(!limits.is_limited());
1364        assert!(envelope.envelope().is_empty());
1365    }
1366
1367    #[tokio::test]
1368    async fn test_enforce_limit_error_event() {
1369        let mut envelope = envelope![Event];
1370
1371        let mock = mock_limiter(Some(DataCategory::Error));
1372        let (_, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1373
1374        assert!(limits.is_limited());
1375        assert!(envelope.envelope().is_empty());
1376        mock.lock().await.assert_call(DataCategory::Error, 1);
1377    }
1378
1379    #[tokio::test]
1380    async fn test_enforce_limit_error_with_attachments() {
1381        let mut envelope = envelope![Event, Attachment];
1382
1383        let mock = mock_limiter(Some(DataCategory::Error));
1384        let (_, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1385
1386        assert!(limits.is_limited());
1387        assert!(envelope.envelope().is_empty());
1388        mock.lock().await.assert_call(DataCategory::Error, 1);
1389    }
1390
1391    #[tokio::test]
1392    async fn test_enforce_limit_minidump() {
1393        let mut envelope = envelope![Attachment::Minidump];
1394
1395        let mock = mock_limiter(Some(DataCategory::Error));
1396        let (_, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1397
1398        assert!(limits.is_limited());
1399        assert!(envelope.envelope().is_empty());
1400        mock.lock().await.assert_call(DataCategory::Error, 1);
1401    }
1402
1403    #[tokio::test]
1404    async fn test_enforce_limit_attachments() {
1405        let mut envelope = envelope![Attachment::Minidump, Attachment];
1406
1407        let mock = mock_limiter(Some(DataCategory::Attachment));
1408        let (_, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1409
1410        // Attachments would be limited, but crash reports create events and are thus allowed.
1411        assert!(limits.is_limited());
1412        assert_eq!(envelope.envelope().len(), 1);
1413        mock.lock().await.assert_call(DataCategory::Error, 1);
1414        mock.lock().await.assert_call(DataCategory::Attachment, 20);
1415    }
1416
1417    /// Limit stand-alone profiles.
1418    #[tokio::test]
1419    async fn test_enforce_limit_profiles() {
1420        let mut envelope = envelope![Profile, Profile];
1421
1422        let mock = mock_limiter(Some(DataCategory::Profile));
1423        let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1424
1425        assert!(limits.is_limited());
1426        assert_eq!(envelope.envelope().len(), 0);
1427        mock.lock().await.assert_call(DataCategory::Profile, 2);
1428
1429        assert_eq!(
1430            get_outcomes(enforcement),
1431            vec![
1432                (DataCategory::Profile, 2),
1433                (DataCategory::ProfileIndexed, 2)
1434            ]
1435        );
1436    }
1437
1438    /// Limit profile chunks.
1439    #[tokio::test]
1440    async fn test_enforce_limit_profile_chunks_no_profile_type() {
1441        // In this test we have profile chunks which have not yet been classified, which means they
1442        // should not be rate limited.
1443        let mut envelope = envelope![ProfileChunk, ProfileChunk];
1444
1445        let mock = mock_limiter(Some(DataCategory::ProfileChunk));
1446        let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1447        assert!(!limits.is_limited());
1448        assert_eq!(get_outcomes(enforcement), vec![]);
1449
1450        let mock = mock_limiter(Some(DataCategory::ProfileChunkUi));
1451        let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1452        assert!(!limits.is_limited());
1453        assert_eq!(get_outcomes(enforcement), vec![]);
1454
1455        assert_eq!(envelope.envelope().len(), 2);
1456    }
1457
1458    #[tokio::test]
1459    async fn test_enforce_limit_profile_chunks_ui() {
1460        let mut envelope = envelope![];
1461
1462        let mut item = Item::new(ItemType::ProfileChunk);
1463        item.set_profile_type(ProfileType::Backend);
1464        envelope.envelope_mut().add_item(item);
1465        let mut item = Item::new(ItemType::ProfileChunk);
1466        item.set_profile_type(ProfileType::Ui);
1467        envelope.envelope_mut().add_item(item);
1468
1469        let mock = mock_limiter(Some(DataCategory::ProfileChunkUi));
1470        let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1471
1472        assert!(limits.is_limited());
1473        assert_eq!(envelope.envelope().len(), 1);
1474        mock.lock()
1475            .await
1476            .assert_call(DataCategory::ProfileChunkUi, 1);
1477        mock.lock().await.assert_call(DataCategory::ProfileChunk, 1);
1478
1479        assert_eq!(
1480            get_outcomes(enforcement),
1481            vec![(DataCategory::ProfileChunkUi, 1)]
1482        );
1483    }
1484
1485    #[tokio::test]
1486    async fn test_enforce_limit_profile_chunks_backend() {
1487        let mut envelope = envelope![];
1488
1489        let mut item = Item::new(ItemType::ProfileChunk);
1490        item.set_profile_type(ProfileType::Backend);
1491        envelope.envelope_mut().add_item(item);
1492        let mut item = Item::new(ItemType::ProfileChunk);
1493        item.set_profile_type(ProfileType::Ui);
1494        envelope.envelope_mut().add_item(item);
1495
1496        let mock = mock_limiter(Some(DataCategory::ProfileChunk));
1497        let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1498
1499        assert!(limits.is_limited());
1500        assert_eq!(envelope.envelope().len(), 1);
1501        mock.lock()
1502            .await
1503            .assert_call(DataCategory::ProfileChunkUi, 1);
1504        mock.lock().await.assert_call(DataCategory::ProfileChunk, 1);
1505
1506        assert_eq!(
1507            get_outcomes(enforcement),
1508            vec![(DataCategory::ProfileChunk, 1)]
1509        );
1510    }
1511
1512    /// Limit replays.
1513    #[tokio::test]
1514    async fn test_enforce_limit_replays() {
1515        let mut envelope = envelope![ReplayEvent, ReplayRecording, ReplayVideo];
1516
1517        let mock = mock_limiter(Some(DataCategory::Replay));
1518        let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1519
1520        assert!(limits.is_limited());
1521        assert_eq!(envelope.envelope().len(), 0);
1522        mock.lock().await.assert_call(DataCategory::Replay, 3);
1523
1524        assert_eq!(get_outcomes(enforcement), vec![(DataCategory::Replay, 3),]);
1525    }
1526
1527    /// Limit monitor checkins.
1528    #[tokio::test]
1529    async fn test_enforce_limit_monitor_checkins() {
1530        let mut envelope = envelope![CheckIn];
1531
1532        let mock = mock_limiter(Some(DataCategory::Monitor));
1533        let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1534
1535        assert!(limits.is_limited());
1536        assert_eq!(envelope.envelope().len(), 0);
1537        mock.lock().await.assert_call(DataCategory::Monitor, 1);
1538
1539        assert_eq!(get_outcomes(enforcement), vec![(DataCategory::Monitor, 1)])
1540    }
1541
1542    #[tokio::test]
1543    async fn test_enforce_pass_minidump() {
1544        let mut envelope = envelope![Attachment::Minidump];
1545
1546        let mock = mock_limiter(Some(DataCategory::Attachment));
1547        let (_, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1548
1549        // If only crash report attachments are present, we don't emit a rate limit.
1550        assert!(!limits.is_limited());
1551        assert_eq!(envelope.envelope().len(), 1);
1552        mock.lock().await.assert_call(DataCategory::Error, 1);
1553        mock.lock().await.assert_call(DataCategory::Attachment, 10);
1554    }
1555
1556    #[tokio::test]
1557    async fn test_enforce_skip_rate_limited() {
1558        let mut envelope = envelope![];
1559
1560        let mut item = Item::new(ItemType::Attachment);
1561        item.set_payload(ContentType::OctetStream, "0123456789");
1562        item.set_rate_limited(true);
1563        envelope.envelope_mut().add_item(item);
1564
1565        let mock = mock_limiter(Some(DataCategory::Error));
1566        let (_, limits) = enforce_and_apply(mock, &mut envelope, None).await;
1567
1568        assert!(!limits.is_limited()); // No new rate limits applied.
1569        assert_eq!(envelope.envelope().len(), 1); // The item was retained
1570    }
1571
1572    #[tokio::test]
1573    async fn test_enforce_pass_sessions() {
1574        let mut envelope = envelope![Session, Session, Session];
1575
1576        let mock = mock_limiter(Some(DataCategory::Error));
1577        let (_, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1578
1579        // If only crash report attachments are present, we don't emit a rate limit.
1580        assert!(!limits.is_limited());
1581        assert_eq!(envelope.envelope().len(), 3);
1582        mock.lock().await.assert_call(DataCategory::Session, 3);
1583    }
1584
1585    #[tokio::test]
1586    async fn test_enforce_limit_sessions() {
1587        let mut envelope = envelope![Session, Session, Event];
1588
1589        let mock = mock_limiter(Some(DataCategory::Session));
1590        let (_, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1591
1592        // If only crash report attachments are present, we don't emit a rate limit.
1593        assert!(limits.is_limited());
1594        assert_eq!(envelope.envelope().len(), 1);
1595        mock.lock().await.assert_call(DataCategory::Error, 1);
1596        mock.lock().await.assert_call(DataCategory::Session, 2);
1597    }
1598
1599    #[tokio::test]
1600    #[cfg(feature = "processing")]
1601    async fn test_enforce_limit_assumed_event() {
1602        let mut envelope = envelope![];
1603
1604        let mock = mock_limiter(Some(DataCategory::Transaction));
1605        let (_, limits) =
1606            enforce_and_apply(mock.clone(), &mut envelope, Some(DataCategory::Transaction)).await;
1607
1608        assert!(limits.is_limited());
1609        assert!(envelope.envelope().is_empty()); // obviously
1610        mock.lock().await.assert_call(DataCategory::Transaction, 1);
1611    }
1612
1613    #[tokio::test]
1614    #[cfg(feature = "processing")]
1615    async fn test_enforce_limit_assumed_attachments() {
1616        let mut envelope = envelope![Attachment, Attachment];
1617
1618        let mock = mock_limiter(Some(DataCategory::Error));
1619        let (_, limits) =
1620            enforce_and_apply(mock.clone(), &mut envelope, Some(DataCategory::Error)).await;
1621
1622        assert!(limits.is_limited());
1623        assert!(envelope.envelope().is_empty());
1624        mock.lock().await.assert_call(DataCategory::Error, 1);
1625    }
1626
1627    #[tokio::test]
1628    async fn test_enforce_transaction() {
1629        let mut envelope = envelope![Transaction];
1630
1631        let mock = mock_limiter(Some(DataCategory::Transaction));
1632        let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1633
1634        assert!(limits.is_limited());
1635        assert!(enforcement.event_indexed.is_active());
1636        assert!(enforcement.event.is_active());
1637        mock.lock().await.assert_call(DataCategory::Transaction, 1);
1638
1639        assert_eq!(
1640            get_outcomes(enforcement),
1641            vec![
1642                (DataCategory::Transaction, 1),
1643                (DataCategory::TransactionIndexed, 1),
1644                (DataCategory::Span, 1),
1645                (DataCategory::SpanIndexed, 1),
1646            ]
1647        );
1648    }
1649
1650    #[tokio::test]
1651    async fn test_enforce_transaction_non_indexed() {
1652        let mut envelope = envelope![Transaction, Profile];
1653        let scoping = envelope.scoping();
1654
1655        let mock = mock_limiter(Some(DataCategory::TransactionIndexed));
1656
1657        let mock_clone = mock.clone();
1658        let limiter = EnvelopeLimiter::new(CheckLimits::NonIndexed, move |s, q| {
1659            let mock_clone = mock_clone.clone();
1660            async move {
1661                let mut mock = mock_clone.lock().await;
1662                mock.check(s, q)
1663            }
1664        });
1665        let (enforcement, limits) = limiter
1666            .compute(envelope.envelope_mut(), &scoping)
1667            .await
1668            .unwrap();
1669        enforcement.clone().apply_with_outcomes(&mut envelope);
1670
1671        assert!(!limits.is_limited());
1672        assert!(!enforcement.event_indexed.is_active());
1673        assert!(!enforcement.event.is_active());
1674        assert!(!enforcement.profiles_indexed.is_active());
1675        assert!(!enforcement.profiles.is_active());
1676        assert!(!enforcement.spans.is_active());
1677        assert!(!enforcement.spans_indexed.is_active());
1678        mock.lock().await.assert_call(DataCategory::Transaction, 1);
1679        mock.lock().await.assert_call(DataCategory::Profile, 1);
1680        mock.lock().await.assert_call(DataCategory::Span, 1);
1681    }
1682
1683    #[tokio::test]
1684    async fn test_enforce_transaction_no_indexing_quota() {
1685        let mut envelope = envelope![Transaction];
1686
1687        let mock = mock_limiter(Some(DataCategory::TransactionIndexed));
1688        let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1689
1690        assert!(limits.is_limited());
1691        assert!(enforcement.event_indexed.is_active());
1692        assert!(!enforcement.event.is_active());
1693        mock.lock().await.assert_call(DataCategory::Transaction, 1);
1694        mock.lock()
1695            .await
1696            .assert_call(DataCategory::TransactionIndexed, 1);
1697    }
1698
1699    #[tokio::test]
1700    async fn test_enforce_transaction_attachment_enforced() {
1701        let mut envelope = envelope![Transaction, Attachment];
1702
1703        let mock = mock_limiter(Some(DataCategory::Transaction));
1704        let (enforcement, _) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1705
1706        assert!(enforcement.event.is_active());
1707        assert!(enforcement.attachments.is_active());
1708        mock.lock().await.assert_call(DataCategory::Transaction, 1);
1709    }
1710
1711    fn get_outcomes(enforcement: Enforcement) -> Vec<(DataCategory, usize)> {
1712        enforcement
1713            .get_outcomes()
1714            .map(|(_, data_category, quantity)| (data_category, quantity))
1715            .collect::<Vec<_>>()
1716    }
1717
1718    #[tokio::test]
1719    async fn test_enforce_transaction_profile_enforced() {
1720        let mut envelope = envelope![Transaction, Profile];
1721
1722        let mock = mock_limiter(Some(DataCategory::Transaction));
1723        let (enforcement, _) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1724
1725        assert!(enforcement.event.is_active());
1726        assert!(enforcement.profiles.is_active());
1727        mock.lock().await.assert_call(DataCategory::Transaction, 1);
1728
1729        assert_eq!(
1730            get_outcomes(enforcement),
1731            vec![
1732                (DataCategory::Transaction, 1),
1733                (DataCategory::TransactionIndexed, 1),
1734                (DataCategory::Profile, 1),
1735                (DataCategory::ProfileIndexed, 1),
1736                (DataCategory::Span, 1),
1737                (DataCategory::SpanIndexed, 1),
1738            ]
1739        );
1740    }
1741
1742    #[tokio::test]
1743    async fn test_enforce_transaction_standalone_profile_enforced() {
1744        // When the transaction is sampled, the profile survives as standalone.
1745        let mut envelope = envelope![Profile];
1746
1747        let mock = mock_limiter(Some(DataCategory::Transaction));
1748        let (enforcement, _) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1749
1750        assert!(enforcement.profiles.is_active());
1751        mock.lock().await.assert_call(DataCategory::Profile, 1);
1752        mock.lock().await.assert_call(DataCategory::Transaction, 0);
1753
1754        assert_eq!(
1755            get_outcomes(enforcement),
1756            vec![
1757                (DataCategory::Profile, 1),
1758                (DataCategory::ProfileIndexed, 1),
1759            ]
1760        );
1761    }
1762
1763    #[tokio::test]
1764    async fn test_enforce_transaction_attachment_enforced_indexing_quota() {
1765        let mut envelope = envelope![Transaction, Attachment];
1766        set_extracted(envelope.envelope_mut(), ItemType::Transaction);
1767
1768        let mock = mock_limiter(Some(DataCategory::TransactionIndexed));
1769        let (enforcement, _) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1770
1771        assert!(!enforcement.event.is_active());
1772        assert!(enforcement.event_indexed.is_active());
1773        assert!(enforcement.attachments.is_active());
1774        assert!(enforcement.attachment_items.is_active());
1775        mock.lock().await.assert_call(DataCategory::Transaction, 1);
1776        mock.lock()
1777            .await
1778            .assert_call(DataCategory::TransactionIndexed, 1);
1779
1780        assert_eq!(
1781            get_outcomes(enforcement),
1782            vec![
1783                (DataCategory::TransactionIndexed, 1),
1784                (DataCategory::Attachment, 10),
1785                (DataCategory::AttachmentItem, 1),
1786                (DataCategory::SpanIndexed, 1),
1787            ]
1788        );
1789    }
1790
1791    #[tokio::test]
1792    async fn test_enforce_span() {
1793        let mut envelope = envelope![Span, Span];
1794
1795        let mock = mock_limiter(Some(DataCategory::Span));
1796        let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1797
1798        assert!(limits.is_limited());
1799        assert!(enforcement.spans_indexed.is_active());
1800        assert!(enforcement.spans.is_active());
1801        mock.lock().await.assert_call(DataCategory::Span, 2);
1802
1803        assert_eq!(
1804            get_outcomes(enforcement),
1805            vec![(DataCategory::Span, 2), (DataCategory::SpanIndexed, 2)]
1806        );
1807    }
1808
1809    #[tokio::test]
1810    async fn test_enforce_span_no_indexing_quota() {
1811        let mut envelope = envelope![Span, Span];
1812
1813        let mock = mock_limiter(Some(DataCategory::SpanIndexed));
1814        let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1815
1816        assert!(limits.is_limited());
1817        assert!(enforcement.spans_indexed.is_active());
1818        assert!(!enforcement.spans.is_active());
1819        mock.lock().await.assert_call(DataCategory::Span, 2);
1820        mock.lock().await.assert_call(DataCategory::SpanIndexed, 2);
1821
1822        assert_eq!(
1823            get_outcomes(enforcement),
1824            vec![(DataCategory::SpanIndexed, 2)]
1825        );
1826    }
1827
1828    #[tokio::test]
1829    async fn test_enforce_span_metrics_extracted_no_indexing_quota() {
1830        let mut envelope = envelope![Span, Span];
1831        set_extracted(envelope.envelope_mut(), ItemType::Span);
1832
1833        let mock = mock_limiter(Some(DataCategory::SpanIndexed));
1834        let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1835
1836        assert!(limits.is_limited());
1837        assert!(enforcement.spans_indexed.is_active());
1838        assert!(!enforcement.spans.is_active());
1839        mock.lock().await.assert_call(DataCategory::Span, 2);
1840        mock.lock().await.assert_call(DataCategory::SpanIndexed, 2);
1841
1842        assert_eq!(
1843            get_outcomes(enforcement),
1844            vec![(DataCategory::SpanIndexed, 2)]
1845        );
1846    }
1847
1848    #[test]
1849    fn test_source_quantity_for_total_quantity() {
1850        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
1851            .parse()
1852            .unwrap();
1853        let request_meta = RequestMeta::new(dsn);
1854
1855        let mut envelope = Envelope::from_request(None, request_meta);
1856
1857        let mut item = Item::new(ItemType::MetricBuckets);
1858        item.set_source_quantities(SourceQuantities {
1859            transactions: 5,
1860            spans: 0,
1861            profiles: 2,
1862            buckets: 5,
1863        });
1864        envelope.add_item(item);
1865
1866        let mut item = Item::new(ItemType::MetricBuckets);
1867        item.set_source_quantities(SourceQuantities {
1868            transactions: 2,
1869            spans: 0,
1870            profiles: 0,
1871            buckets: 3,
1872        });
1873        envelope.add_item(item);
1874
1875        let summary = EnvelopeSummary::compute(&envelope);
1876
1877        assert_eq!(summary.profile_quantity, 2);
1878        assert_eq!(summary.secondary_transaction_quantity, 7);
1879    }
1880
1881    #[tokio::test]
1882    async fn test_enforce_limit_logs_count() {
1883        let mut envelope = envelope![Log, Log];
1884
1885        let mock = mock_limiter(Some(DataCategory::LogItem));
1886        let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1887
1888        assert!(limits.is_limited());
1889        assert_eq!(envelope.envelope().len(), 0);
1890        mock.lock().await.assert_call(DataCategory::LogItem, 2);
1891        mock.lock().await.assert_call(DataCategory::LogByte, 20);
1892
1893        assert_eq!(get_outcomes(enforcement), vec![(DataCategory::LogItem, 2)]);
1894    }
1895
1896    #[tokio::test]
1897    async fn test_enforce_limit_logs_bytes() {
1898        let mut envelope = envelope![Log, Log];
1899
1900        let mock = mock_limiter(Some(DataCategory::LogByte));
1901        let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1902
1903        assert!(limits.is_limited());
1904        assert_eq!(envelope.envelope().len(), 0);
1905        mock.lock().await.assert_call(DataCategory::LogItem, 2);
1906        mock.lock().await.assert_call(DataCategory::LogByte, 20);
1907
1908        assert_eq!(get_outcomes(enforcement), vec![(DataCategory::LogByte, 20)]);
1909    }
1910}