relay_server/utils/
rate_limits.rs

1use std::fmt::{self, Write};
2use std::future::Future;
3use std::marker::PhantomData;
4
5use relay_profiling::ProfileType;
6use relay_quotas::{
7    DataCategories, DataCategory, ItemScoping, QuotaScope, RateLimit, RateLimitScope, RateLimits,
8    ReasonCode, Scoping,
9};
10
11use crate::envelope::{Envelope, Item, ItemType};
12use crate::managed::ManagedEnvelope;
13use crate::services::outcome::Outcome;
14
/// Name of the rate limits header.
///
/// The header value is produced by [`format_rate_limits`] and read back by
/// [`parse_rate_limits`].
pub const RATE_LIMITS_HEADER: &str = "X-Sentry-Rate-Limits";
17
18/// Formats the `X-Sentry-Rate-Limits` header.
19pub fn format_rate_limits(rate_limits: &RateLimits) -> String {
20    let mut header = String::new();
21
22    for rate_limit in rate_limits {
23        if !header.is_empty() {
24            header.push_str(", ");
25        }
26
27        write!(header, "{}:", rate_limit.retry_after.remaining_seconds()).ok();
28
29        for (index, category) in rate_limit.categories.iter().enumerate() {
30            if index > 0 {
31                header.push(';');
32            }
33            write!(header, "{category}").ok();
34        }
35
36        write!(header, ":{}", rate_limit.scope.name()).ok();
37
38        if let Some(ref reason_code) = rate_limit.reason_code {
39            write!(header, ":{reason_code}").ok();
40        } else if !rate_limit.namespaces.is_empty() {
41            write!(header, ":").ok(); // delimits the empty reason code for namespaces
42        }
43
44        for (index, namespace) in rate_limit.namespaces.iter().enumerate() {
45            header.push(if index == 0 { ':' } else { ';' });
46            write!(header, "{namespace}").ok();
47        }
48    }
49
50    header
51}
52
53/// Parses the `X-Sentry-Rate-Limits` header.
54pub fn parse_rate_limits(scoping: &Scoping, string: &str) -> RateLimits {
55    let mut rate_limits = RateLimits::new();
56
57    for limit in string.split(',') {
58        let limit = limit.trim();
59        if limit.is_empty() {
60            continue;
61        }
62
63        let mut components = limit.split(':');
64
65        let retry_after = match components.next().and_then(|s| s.parse().ok()) {
66            Some(retry_after) => retry_after,
67            None => continue,
68        };
69
70        let mut categories = DataCategories::new();
71        for category in components.next().unwrap_or("").split(';') {
72            if !category.is_empty() {
73                categories.push(DataCategory::from_name(category));
74            }
75        }
76
77        let quota_scope = QuotaScope::from_name(components.next().unwrap_or(""));
78        let scope = RateLimitScope::for_quota(*scoping, quota_scope);
79
80        let reason_code = components
81            .next()
82            .filter(|s| !s.is_empty())
83            .map(ReasonCode::new);
84
85        let namespace = components
86            .next()
87            .unwrap_or("")
88            .split(';')
89            .filter(|s| !s.is_empty())
90            .filter_map(|s| s.parse().ok())
91            .collect();
92
93        rate_limits.add(RateLimit {
94            categories,
95            scope,
96            reason_code,
97            retry_after,
98            namespaces: namespace,
99        });
100    }
101
102    rate_limits
103}
104
105/// Infer the data category from an item.
106///
107/// Categories depend mostly on the item type, with a few special cases:
108/// - `Event`: the category is inferred from the event type. This requires the `event_type` header
109///   to be set on the event item.
110/// - `Attachment`: If the attachment creates an event (e.g. for minidumps), the category is assumed
111///   to be `Error`.
112fn infer_event_category(item: &Item) -> Option<DataCategory> {
113    match item.ty() {
114        ItemType::Event => Some(DataCategory::Error),
115        ItemType::Transaction => Some(DataCategory::Transaction),
116        ItemType::Security | ItemType::RawSecurity => Some(DataCategory::Security),
117        ItemType::UnrealReport => Some(DataCategory::Error),
118        ItemType::UserReportV2 => Some(DataCategory::UserReportV2),
119        ItemType::Attachment if item.creates_event() => Some(DataCategory::Error),
120        ItemType::Attachment => None,
121        ItemType::Session => None,
122        ItemType::Sessions => None,
123        ItemType::Statsd => None,
124        ItemType::MetricBuckets => None,
125        ItemType::FormData => None,
126        ItemType::UserReport => None,
127        ItemType::Profile => None,
128        ItemType::ReplayEvent => None,
129        ItemType::ReplayRecording => None,
130        ItemType::ReplayVideo => None,
131        ItemType::ClientReport => None,
132        ItemType::CheckIn => None,
133        ItemType::Nel => None,
134        ItemType::Log => None,
135        ItemType::OtelLog => None,
136        ItemType::Span => None,
137        ItemType::OtelSpan => None,
138        ItemType::OtelTracesData => None,
139        ItemType::ProfileChunk => None,
140        ItemType::Unknown(_) => None,
141    }
142}
143
/// A summary of `Envelope` contents.
///
/// Summarizes the contained event, size of attachments, session updates, and whether there are
/// plain attachments. This is used for efficient rate limiting or outcome handling.
#[non_exhaustive]
#[derive(Clone, Copy, Debug, Default)]
pub struct EnvelopeSummary {
    /// The data category of the event in the envelope. `None` if there is no event.
    pub event_category: Option<DataCategory>,

    /// The quantity of all attachments combined in bytes.
    pub attachment_quantity: usize,

    /// The number of attachments.
    pub attachment_item_quantity: usize,

    /// The number of all session updates.
    pub session_quantity: usize,

    /// The number of profiles.
    pub profile_quantity: usize,

    /// The number of replays.
    pub replay_quantity: usize,

    /// The number of user reports (legacy item type for user feedback).
    pub user_report_quantity: usize,

    /// The number of monitor check-ins.
    pub monitor_quantity: usize,

    /// The number of log items sent for the logs product.
    pub log_item_quantity: usize,

    /// The size of all logs sent for the logs product, in bytes.
    pub log_byte_quantity: usize,

    /// Secondary number of transactions.
    ///
    /// This is 0 for envelopes which contain a transaction,
    /// only secondary transaction quantity should be tracked here,
    /// these are for example transaction counts extracted from metrics.
    ///
    /// A "primary" transaction is contained within the envelope,
    /// marking the envelope data category a [`DataCategory::Transaction`].
    pub secondary_transaction_quantity: usize,

    /// Secondary number of spans, see `secondary_transaction_quantity`.
    pub secondary_span_quantity: usize,

    /// The number of standalone spans.
    pub span_quantity: usize,

    /// Indicates that the envelope contains regular attachments that do not create event payloads.
    pub has_plain_attachments: bool,

    /// The payload size of this envelope.
    ///
    /// This is the sum of the lengths of all items that have not been rate limited before.
    pub payload_size: usize,

    /// The number of profile chunks in this envelope.
    pub profile_chunk_quantity: usize,
    /// The number of UI profile chunks in this envelope.
    pub profile_chunk_ui_quantity: usize,
}
208
209impl EnvelopeSummary {
210    /// Creates an empty summary.
211    pub fn empty() -> Self {
212        Self::default()
213    }
214
215    /// Creates an envelope summary and aggregates the given envelope.
216    pub fn compute(envelope: &Envelope) -> Self {
217        let mut summary = Self::empty();
218
219        for item in envelope.items() {
220            if item.creates_event() {
221                summary.infer_category(item);
222            } else if item.ty() == &ItemType::Attachment {
223                // Plain attachments do not create events.
224                summary.has_plain_attachments = true;
225            }
226
227            // If the item has been rate limited before, the quota has been consumed and outcomes
228            // emitted. We can skip it here.
229            if item.rate_limited() {
230                continue;
231            }
232
233            if let Some(source_quantities) = item.source_quantities() {
234                summary.secondary_transaction_quantity += source_quantities.transactions;
235                summary.secondary_span_quantity += source_quantities.spans;
236                summary.profile_quantity += source_quantities.profiles;
237            }
238
239            summary.payload_size += item.len();
240
241            for (category, quantity) in item.quantities() {
242                summary.add_quantity(category, quantity);
243            }
244
245            // Special case since v1 and v2 share a data category.
246            // Adding this in add_quantity would include v2 in the count.
247            if item.ty() == &ItemType::UserReport {
248                summary.user_report_quantity += 1;
249            }
250        }
251
252        summary
253    }
254
255    fn add_quantity(&mut self, category: DataCategory, quantity: usize) {
256        let target_quantity = match category {
257            DataCategory::Attachment => &mut self.attachment_quantity,
258            DataCategory::AttachmentItem => &mut self.attachment_item_quantity,
259            DataCategory::Session => &mut self.session_quantity,
260            DataCategory::Profile => &mut self.profile_quantity,
261            DataCategory::Replay => &mut self.replay_quantity,
262            DataCategory::DoNotUseReplayVideo => &mut self.replay_quantity,
263            DataCategory::Monitor => &mut self.monitor_quantity,
264            DataCategory::Span => &mut self.span_quantity,
265            DataCategory::LogItem => &mut self.log_item_quantity,
266            DataCategory::LogByte => &mut self.log_byte_quantity,
267            DataCategory::ProfileChunk => &mut self.profile_chunk_quantity,
268            DataCategory::ProfileChunkUi => &mut self.profile_chunk_ui_quantity,
269            // TODO: This catch-all return looks dangerous
270            _ => return,
271        };
272        *target_quantity += quantity;
273    }
274
275    /// Infers the appropriate [`DataCategory`] for the envelope [`Item`].
276    ///
277    /// The inferred category is only applied to the [`EnvelopeSummary`] if there is not yet
278    /// a category set.
279    fn infer_category(&mut self, item: &Item) {
280        if matches!(self.event_category, None | Some(DataCategory::Default)) {
281            if let Some(category) = infer_event_category(item) {
282                self.event_category = Some(category);
283            }
284        }
285    }
286}
287
/// Rate limiting information for a data category.
///
/// A limit is considered active if its `quantity` is greater than zero.
#[derive(Debug)]
#[cfg_attr(test, derive(Clone))]
pub struct CategoryLimit {
    /// The limited data category.
    category: DataCategory,
    /// The total rate limited quantity across all items.
    ///
    /// This will be `0` if nothing was rate limited.
    quantity: usize,
    /// The reason code of the applied rate limit.
    ///
    /// Defaults to `None` if the quota does not declare a reason code.
    reason_code: Option<ReasonCode>,
}
303
304impl CategoryLimit {
305    /// Creates a new `CategoryLimit`.
306    ///
307    /// Returns an inactive limit if `quantity` is `0` or `rate_limit` is `None`.
308    fn new(category: DataCategory, quantity: usize, rate_limit: Option<&RateLimit>) -> Self {
309        match rate_limit {
310            Some(limit) => Self {
311                category,
312                quantity,
313                reason_code: limit.reason_code.clone(),
314            },
315            None => Self::default(),
316        }
317    }
318
319    /// Recreates the category limit, if active, for a new category with the same reason.
320    pub fn clone_for(&self, category: DataCategory, quantity: usize) -> CategoryLimit {
321        if !self.is_active() {
322            return Self::default();
323        }
324
325        Self {
326            category,
327            quantity,
328            reason_code: self.reason_code.clone(),
329        }
330    }
331
332    /// Returns `true` if this is an active limit.
333    ///
334    /// This indicates that the category is limited and a certain quantity is removed from the
335    /// Envelope. If the limit is inactive, there is no change.
336    pub fn is_active(&self) -> bool {
337        self.quantity > 0
338    }
339}
340
341impl Default for CategoryLimit {
342    fn default() -> Self {
343        Self {
344            category: DataCategory::Default,
345            quantity: 0,
346            reason_code: None,
347        }
348    }
349}
350
/// Information on the limited quantities returned by [`EnvelopeLimiter::compute`].
#[derive(Default, Debug)]
#[cfg_attr(test, derive(Clone))]
pub struct Enforcement {
    /// The event item rate limit.
    pub event: CategoryLimit,
    /// The rate limit for the indexed category of the event.
    pub event_indexed: CategoryLimit,
    /// The combined attachment bytes rate limit.
    pub attachments: CategoryLimit,
    /// The combined attachment item rate limit.
    pub attachment_items: CategoryLimit,
    /// The combined session item rate limit.
    pub sessions: CategoryLimit,
    /// The combined profile item rate limit.
    pub profiles: CategoryLimit,
    /// The rate limit for the indexed profiles category.
    pub profiles_indexed: CategoryLimit,
    /// The combined replay item rate limit.
    pub replays: CategoryLimit,
    /// The combined check-in item rate limit.
    pub check_ins: CategoryLimit,
    /// The combined rate limit for the number of log items (our product logs).
    pub log_items: CategoryLimit,
    /// The combined rate limit for the bytes of logs (our product logs).
    pub log_bytes: CategoryLimit,
    /// The combined spans rate limit.
    pub spans: CategoryLimit,
    /// The rate limit for the indexed span category.
    pub spans_indexed: CategoryLimit,
    /// The rate limit for user report v1.
    pub user_reports: CategoryLimit,
    /// The combined profile chunk item rate limit.
    pub profile_chunks: CategoryLimit,
    /// The combined profile chunk ui item rate limit.
    pub profile_chunks_ui: CategoryLimit,
}
388
389impl Enforcement {
390    /// Returns the `CategoryLimit` for the event.
391    ///
392    /// `None` if the event is not rate limited.
393    pub fn active_event(&self) -> Option<&CategoryLimit> {
394        if self.event.is_active() {
395            Some(&self.event)
396        } else if self.event_indexed.is_active() {
397            Some(&self.event_indexed)
398        } else {
399            None
400        }
401    }
402
403    /// Returns `true` if the event is rate limited.
404    pub fn is_event_active(&self) -> bool {
405        self.active_event().is_some()
406    }
407
408    /// Helper for `track_outcomes`.
409    fn get_outcomes(self) -> impl Iterator<Item = (Outcome, DataCategory, usize)> {
410        let Self {
411            event,
412            event_indexed,
413            attachments,
414            attachment_items,
415            sessions: _, // Do not report outcomes for sessions.
416            profiles,
417            profiles_indexed,
418            replays,
419            check_ins,
420            log_items,
421            log_bytes,
422            spans,
423            spans_indexed,
424            user_reports,
425            profile_chunks,
426            profile_chunks_ui,
427        } = self;
428
429        let limits = [
430            event,
431            event_indexed,
432            attachments,
433            attachment_items,
434            profiles,
435            profiles_indexed,
436            replays,
437            check_ins,
438            log_items,
439            log_bytes,
440            spans,
441            spans_indexed,
442            user_reports,
443            profile_chunks,
444            profile_chunks_ui,
445        ];
446
447        limits
448            .into_iter()
449            .filter(move |limit| limit.is_active())
450            .map(move |limit| {
451                (
452                    Outcome::RateLimited(limit.reason_code),
453                    limit.category,
454                    limit.quantity,
455                )
456            })
457    }
458
459    /// Applies the [`Enforcement`] on the [`Envelope`] by removing all items that were rate limited
460    /// and emits outcomes for each rate limited category.
461    ///
462    /// # Example
463    ///
464    /// ## Interaction between Events and Attachments
465    ///
466    /// An envelope with an `Error` event and an `Attachment`. Two quotas specify to drop all
467    /// attachments (reason `"a"`) and all errors (reason `"e"`). The result of enforcement will be:
468    ///
469    /// 1. All items are removed from the envelope.
470    /// 2. Enforcements report both the event and the attachment dropped with reason `"e"`, since
471    ///    dropping an event automatically drops all attachments with the same reason.
472    /// 3. Rate limits report the single event limit `"e"`, since attachment limits do not need to
473    ///    be checked in this case.
474    ///
475    /// ## Required Attachments
476    ///
477    /// An envelope with a single Minidump `Attachment`, and a single quota specifying to drop all
478    /// attachments with reason `"a"`:
479    ///
480    /// 1. Since the minidump creates an event and is required for processing, it remains in the
481    ///    envelope and is marked as `rate_limited`.
482    /// 2. Enforcements report the attachment dropped with reason `"a"`.
483    /// 3. Rate limits are empty since it is allowed to send required attachments even when rate
484    ///    limited.
485    ///
486    /// ## Previously Rate Limited Attachments
487    ///
488    /// An envelope with a single item marked as `rate_limited`, and a quota specifying to drop
489    /// everything with reason `"d"`:
490    ///
491    /// 1. The item remains in the envelope.
492    /// 2. Enforcements are empty. Rate limiting has occurred at an earlier stage in the pipeline.
493    /// 3. Rate limits are empty.
494    pub fn apply_with_outcomes(self, envelope: &mut ManagedEnvelope) {
495        envelope
496            .envelope_mut()
497            .retain_items(|item| self.retain_item(item));
498        self.track_outcomes(envelope);
499    }
500
501    /// Returns `true` when an [`Item`] can be retained, `false` otherwise.
502    fn retain_item(&self, item: &mut Item) -> bool {
503        // Remove event items and all items that depend on this event
504        if self.event.is_active() && item.requires_event() {
505            return false;
506        }
507
508        // When checking limits for categories that have an indexed variant,
509        // we only have to check the more specific, the indexed, variant
510        // to determine whether an item is limited.
511        match item.ty() {
512            ItemType::Attachment => {
513                if !(self.attachments.is_active() || self.attachment_items.is_active()) {
514                    return true;
515                }
516                if item.creates_event() {
517                    item.set_rate_limited(true);
518                    true
519                } else {
520                    false
521                }
522            }
523            ItemType::Session => !self.sessions.is_active(),
524            ItemType::Profile => !self.profiles_indexed.is_active(),
525            ItemType::ReplayEvent => !self.replays.is_active(),
526            ItemType::ReplayVideo => !self.replays.is_active(),
527            ItemType::ReplayRecording => !self.replays.is_active(),
528            ItemType::UserReport => !self.user_reports.is_active(),
529            ItemType::CheckIn => !self.check_ins.is_active(),
530            ItemType::OtelLog | ItemType::Log => {
531                !(self.log_items.is_active() || self.log_bytes.is_active())
532            }
533            ItemType::Span | ItemType::OtelSpan | ItemType::OtelTracesData => {
534                !self.spans_indexed.is_active()
535            }
536            ItemType::ProfileChunk => match item.profile_type() {
537                Some(ProfileType::Backend) => !self.profile_chunks.is_active(),
538                Some(ProfileType::Ui) => !self.profile_chunks_ui.is_active(),
539                None => true,
540            },
541            ItemType::Event
542            | ItemType::Transaction
543            | ItemType::Security
544            | ItemType::FormData
545            | ItemType::RawSecurity
546            | ItemType::Nel
547            | ItemType::UnrealReport
548            | ItemType::Sessions
549            | ItemType::Statsd
550            | ItemType::MetricBuckets
551            | ItemType::ClientReport
552            | ItemType::UserReportV2  // This is an event type.
553            | ItemType::Unknown(_) => true,
554        }
555    }
556
557    /// Invokes track outcome on all enforcements reported by the [`EnvelopeLimiter`].
558    ///
559    /// Relay generally does not emit outcomes for sessions, so those are skipped.
560    fn track_outcomes(self, envelope: &mut ManagedEnvelope) {
561        for (outcome, category, quantity) in self.get_outcomes() {
562            envelope.track_outcome(outcome, category, quantity)
563        }
564    }
565}
566
/// Which limits to check with the [`EnvelopeLimiter`].
#[derive(Debug, Copy, Clone)]
pub enum CheckLimits {
    /// Checks all limits except indexed categories.
    ///
    /// In the fast path it is necessary to apply cached rate limits without enforcing indexed
    /// rate limits, because at the time of the check it is not yet known whether the envelope
    /// is sampled. Additionally, even if the item is later dropped by dynamic sampling, it must
    /// still be around to extract metrics and cannot be dropped too early.
    NonIndexed,
    /// Checks all limits against the envelope.
    All,
}
580
/// Wraps the rate limit check callback together with the selection of limits to check.
struct Check<F, E, R> {
    /// Determines whether checks for indexed categories are skipped.
    limits: CheckLimits,
    /// The callback that is invoked with the item scoping and quantity to check.
    check: F,
    /// Marker for the error type `E`, which is not stored in any field.
    _1: PhantomData<E>,
    /// Marker for the future type `R`, which is not stored in any field.
    _2: PhantomData<R>,
}
587
588impl<F, E, R> Check<F, E, R>
589where
590    F: FnMut(ItemScoping, usize) -> R,
591    R: Future<Output = Result<RateLimits, E>>,
592{
593    async fn apply(&mut self, scoping: ItemScoping, quantity: usize) -> Result<RateLimits, E> {
594        if matches!(self.limits, CheckLimits::NonIndexed) && scoping.category.is_indexed() {
595            return Ok(RateLimits::default());
596        }
597
598        (self.check)(scoping, quantity).await
599    }
600}
601
/// Enforces rate limits with the given `check` function on items in the envelope.
///
/// The `check` function is called with the following rules:
///  - Once for a single event, if present in the envelope.
///  - Once for all comprised attachments, unless the event was rate limited.
///  - Once for all comprised sessions.
///
/// Items violating the rate limit are removed from the envelope. This follows a set of rules:
///  - If the event is removed, all items depending on the event are removed (e.g. attachments).
///  - Attachments are not removed if they create events (e.g. minidumps).
///  - Sessions are handled separately from all of the above.
pub struct EnvelopeLimiter<F, E, R> {
    /// The `check` function together with the selection of limits to check.
    check: Check<F, E, R>,
    /// An event category to assume for the envelope, set through `assume_event`.
    event_category: Option<DataCategory>,
}
617
618impl<'a, F, E, R> EnvelopeLimiter<F, E, R>
619where
620    F: FnMut(ItemScoping, usize) -> R,
621    R: Future<Output = Result<RateLimits, E>>,
622{
623    /// Create a new `EnvelopeLimiter` with the given `check` function.
624    pub fn new(limits: CheckLimits, check: F) -> Self {
625        Self {
626            check: Check {
627                check,
628                limits,
629                _1: PhantomData,
630                _2: PhantomData,
631            },
632            event_category: None,
633        }
634    }
635
    /// Assume an event with the given category, even if no item is present in the envelope.
    ///
    /// This ensures that rate limits for the given data category are checked even if there is no
    /// matching item in the envelope. Other items are handled according to the rules as if the
    /// event item were present.
    ///
    /// The assumed category takes precedence over the category inferred from the envelope.
    pub fn assume_event(&mut self, category: DataCategory) {
        self.event_category = Some(category);
    }
644
645    /// Process rate limits for the envelope, returning applied limits.
646    ///
647    /// Returns a tuple of `Enforcement` and `RateLimits`:
648    ///
649    /// - Enforcements declare the quantities of categories that have been rate limited with the
650    ///   individual reason codes that caused rate limiting. If multiple rate limits applied to a
651    ///   category, then the longest limit is reported.
652    /// - Rate limits declare all active rate limits, regardless of whether they have been applied
653    ///   to items in the envelope. This excludes rate limits applied to required attachments, since
654    ///   clients are allowed to continue sending them.
655    pub async fn compute(
656        mut self,
657        envelope: &mut Envelope,
658        scoping: &'a Scoping,
659    ) -> Result<(Enforcement, RateLimits), E> {
660        let mut summary = EnvelopeSummary::compute(envelope);
661        summary.event_category = self.event_category.or(summary.event_category);
662
663        let (enforcement, rate_limits) = self.execute(&summary, scoping).await?;
664        Ok((enforcement, rate_limits))
665    }
666
667    async fn execute(
668        &mut self,
669        summary: &EnvelopeSummary,
670        scoping: &'a Scoping,
671    ) -> Result<(Enforcement, RateLimits), E> {
672        let mut rate_limits = RateLimits::new();
673        let mut enforcement = Enforcement::default();
674
675        // Handle event.
676        if let Some(category) = summary.event_category {
677            // Check the broad category for limits.
678            let mut event_limits = self.check.apply(scoping.item(category), 1).await?;
679            enforcement.event = CategoryLimit::new(category, 1, event_limits.longest());
680
681            if let Some(index_category) = category.index_category() {
682                // Check the specific/indexed category for limits only if the specific one has not already
683                // an enforced limit.
684                if event_limits.is_empty() {
685                    event_limits.merge(self.check.apply(scoping.item(index_category), 1).await?);
686                }
687
688                enforcement.event_indexed =
689                    CategoryLimit::new(index_category, 1, event_limits.longest());
690            };
691
692            rate_limits.merge(event_limits);
693        }
694
695        // Handle attachments.
696        if let Some(limit) = enforcement.active_event() {
697            let limit1 = limit.clone_for(DataCategory::Attachment, summary.attachment_quantity);
698            let limit2 = limit.clone_for(
699                DataCategory::AttachmentItem,
700                summary.attachment_item_quantity,
701            );
702            enforcement.attachments = limit1;
703            enforcement.attachment_items = limit2;
704        } else {
705            let mut attachment_limits = RateLimits::new();
706            if summary.attachment_quantity > 0 {
707                let item_scoping = scoping.item(DataCategory::Attachment);
708
709                let attachment_byte_limits = self
710                    .check
711                    .apply(item_scoping, summary.attachment_quantity)
712                    .await?;
713
714                enforcement.attachments = CategoryLimit::new(
715                    DataCategory::Attachment,
716                    summary.attachment_quantity,
717                    attachment_byte_limits.longest(),
718                );
719                attachment_limits.merge(attachment_byte_limits);
720            }
721            if !attachment_limits.is_limited() && summary.attachment_item_quantity > 0 {
722                let item_scoping = scoping.item(DataCategory::AttachmentItem);
723
724                let attachment_item_limits = self
725                    .check
726                    .apply(item_scoping, summary.attachment_item_quantity)
727                    .await?;
728
729                enforcement.attachment_items = CategoryLimit::new(
730                    DataCategory::AttachmentItem,
731                    summary.attachment_item_quantity,
732                    attachment_item_limits.longest(),
733                );
734                attachment_limits.merge(attachment_item_limits);
735            }
736
737            // Only record rate limits for plain attachments. For all other attachments, it's
738            // perfectly "legal" to send them. They will still be discarded in Sentry, but clients
739            // can continue to send them.
740            if summary.has_plain_attachments {
741                rate_limits.merge(attachment_limits);
742            }
743        }
744
745        // Handle sessions.
746        if summary.session_quantity > 0 {
747            let item_scoping = scoping.item(DataCategory::Session);
748            let session_limits = self
749                .check
750                .apply(item_scoping, summary.session_quantity)
751                .await?;
752            enforcement.sessions = CategoryLimit::new(
753                DataCategory::Session,
754                summary.session_quantity,
755                session_limits.longest(),
756            );
757            rate_limits.merge(session_limits);
758        }
759
760        // Handle logs.
761        if summary.log_item_quantity > 0 {
762            let item_scoping = scoping.item(DataCategory::LogItem);
763            let log_limits = self
764                .check
765                .apply(item_scoping, summary.log_item_quantity)
766                .await?;
767            enforcement.log_items = CategoryLimit::new(
768                DataCategory::LogItem,
769                summary.log_item_quantity,
770                log_limits.longest(),
771            );
772            rate_limits.merge(log_limits);
773        }
774        if summary.log_byte_quantity > 0 {
775            let item_scoping = scoping.item(DataCategory::LogByte);
776            let log_limits = self
777                .check
778                .apply(item_scoping, summary.log_byte_quantity)
779                .await?;
780            enforcement.log_bytes = CategoryLimit::new(
781                DataCategory::LogByte,
782                summary.log_byte_quantity,
783                log_limits.longest(),
784            );
785            rate_limits.merge(log_limits);
786        }
787
788        // Handle profiles.
789        if enforcement.is_event_active() {
790            enforcement.profiles = enforcement
791                .event
792                .clone_for(DataCategory::Profile, summary.profile_quantity);
793
794            enforcement.profiles_indexed = enforcement
795                .event_indexed
796                .clone_for(DataCategory::ProfileIndexed, summary.profile_quantity)
797        } else if summary.profile_quantity > 0 {
798            let mut profile_limits = self
799                .check
800                .apply(
801                    scoping.item(DataCategory::Profile),
802                    summary.profile_quantity,
803                )
804                .await?;
805
806            // Profiles can persist in envelopes without transaction if the transaction item
807            // was dropped by dynamic sampling.
808            if profile_limits.is_empty() && summary.event_category.is_none() {
809                profile_limits = self
810                    .check
811                    .apply(scoping.item(DataCategory::Transaction), 0)
812                    .await?;
813            }
814
815            enforcement.profiles = CategoryLimit::new(
816                DataCategory::Profile,
817                summary.profile_quantity,
818                profile_limits.longest(),
819            );
820
821            if profile_limits.is_empty() {
822                profile_limits.merge(
823                    self.check
824                        .apply(
825                            scoping.item(DataCategory::ProfileIndexed),
826                            summary.profile_quantity,
827                        )
828                        .await?,
829                );
830            }
831
832            enforcement.profiles_indexed = CategoryLimit::new(
833                DataCategory::ProfileIndexed,
834                summary.profile_quantity,
835                profile_limits.longest(),
836            );
837
838            rate_limits.merge(profile_limits);
839        }
840
841        // Handle replays.
842        if summary.replay_quantity > 0 {
843            let item_scoping = scoping.item(DataCategory::Replay);
844            let replay_limits = self
845                .check
846                .apply(item_scoping, summary.replay_quantity)
847                .await?;
848            enforcement.replays = CategoryLimit::new(
849                DataCategory::Replay,
850                summary.replay_quantity,
851                replay_limits.longest(),
852            );
853            rate_limits.merge(replay_limits);
854        }
855
856        // Handle user report v1s, which share limits with v2.
857        if summary.user_report_quantity > 0 {
858            let item_scoping = scoping.item(DataCategory::UserReportV2);
859            let user_report_v2_limits = self
860                .check
861                .apply(item_scoping, summary.user_report_quantity)
862                .await?;
863            enforcement.user_reports = CategoryLimit::new(
864                DataCategory::UserReportV2,
865                summary.user_report_quantity,
866                user_report_v2_limits.longest(),
867            );
868            rate_limits.merge(user_report_v2_limits);
869        }
870
871        // Handle monitor checkins.
872        if summary.monitor_quantity > 0 {
873            let item_scoping = scoping.item(DataCategory::Monitor);
874            let checkin_limits = self
875                .check
876                .apply(item_scoping, summary.monitor_quantity)
877                .await?;
878            enforcement.check_ins = CategoryLimit::new(
879                DataCategory::Monitor,
880                summary.monitor_quantity,
881                checkin_limits.longest(),
882            );
883            rate_limits.merge(checkin_limits);
884        }
885
886        // Handle spans.
887        if enforcement.is_event_active() {
888            enforcement.spans = enforcement
889                .event
890                .clone_for(DataCategory::Span, summary.span_quantity);
891
892            enforcement.spans_indexed = enforcement
893                .event_indexed
894                .clone_for(DataCategory::SpanIndexed, summary.span_quantity);
895        } else if summary.span_quantity > 0 {
896            let mut span_limits = self
897                .check
898                .apply(scoping.item(DataCategory::Span), summary.span_quantity)
899                .await?;
900            enforcement.spans = CategoryLimit::new(
901                DataCategory::Span,
902                summary.span_quantity,
903                span_limits.longest(),
904            );
905
906            if span_limits.is_empty() {
907                span_limits.merge(
908                    self.check
909                        .apply(
910                            scoping.item(DataCategory::SpanIndexed),
911                            summary.span_quantity,
912                        )
913                        .await?,
914                );
915            }
916
917            enforcement.spans_indexed = CategoryLimit::new(
918                DataCategory::SpanIndexed,
919                summary.span_quantity,
920                span_limits.longest(),
921            );
922
923            rate_limits.merge(span_limits);
924        }
925
926        // Handle profile chunks.
927        if summary.profile_chunk_quantity > 0 {
928            let item_scoping = scoping.item(DataCategory::ProfileChunk);
929            let limits = self
930                .check
931                .apply(item_scoping, summary.profile_chunk_quantity)
932                .await?;
933            enforcement.profile_chunks = CategoryLimit::new(
934                DataCategory::ProfileChunk,
935                summary.profile_chunk_quantity,
936                limits.longest(),
937            );
938            rate_limits.merge(limits);
939        }
940
941        if summary.profile_chunk_ui_quantity > 0 {
942            let item_scoping = scoping.item(DataCategory::ProfileChunkUi);
943            let limits = self
944                .check
945                .apply(item_scoping, summary.profile_chunk_ui_quantity)
946                .await?;
947            enforcement.profile_chunks_ui = CategoryLimit::new(
948                DataCategory::ProfileChunkUi,
949                summary.profile_chunk_ui_quantity,
950                limits.longest(),
951            );
952            rate_limits.merge(limits);
953        }
954
955        Ok((enforcement, rate_limits))
956    }
957}
958
959impl<F, E, R> fmt::Debug for EnvelopeLimiter<F, E, R> {
960    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
961        f.debug_struct("EnvelopeLimiter")
962            .field("event_category", &self.event_category)
963            .finish()
964    }
965}
966
967#[cfg(test)]
968mod tests {
969
970    use std::collections::{BTreeMap, BTreeSet};
971    use std::sync::Arc;
972
973    use relay_base_schema::organization::OrganizationId;
974    use relay_base_schema::project::{ProjectId, ProjectKey};
975    use relay_metrics::MetricNamespace;
976    use relay_quotas::RetryAfter;
977    use relay_system::Addr;
978    use smallvec::smallvec;
979    use tokio::sync::Mutex;
980
981    use super::*;
982    use crate::{
983        envelope::{AttachmentType, ContentType, SourceQuantities},
984        extractors::RequestMeta,
985    };
986
987    #[tokio::test]
988    async fn test_format_rate_limits() {
989        let mut rate_limits = RateLimits::new();
990
991        // Add a generic rate limit for all categories.
992        rate_limits.add(RateLimit {
993            categories: DataCategories::new(),
994            scope: RateLimitScope::Organization(OrganizationId::new(42)),
995            reason_code: Some(ReasonCode::new("my_limit")),
996            retry_after: RetryAfter::from_secs(42),
997            namespaces: smallvec![],
998        });
999
1000        // Add a more specific rate limit for just one category.
1001        rate_limits.add(RateLimit {
1002            categories: smallvec![DataCategory::Transaction, DataCategory::Security],
1003            scope: RateLimitScope::Project(ProjectId::new(21)),
1004            reason_code: None,
1005            retry_after: RetryAfter::from_secs(4711),
1006            namespaces: smallvec![],
1007        });
1008
1009        let formatted = format_rate_limits(&rate_limits);
1010        let expected = "42::organization:my_limit, 4711:transaction;security:project";
1011        assert_eq!(formatted, expected);
1012    }
1013
1014    #[tokio::test]
1015    async fn test_format_rate_limits_namespace() {
1016        let mut rate_limits = RateLimits::new();
1017
1018        // Rate limit with reason code and namespace.
1019        rate_limits.add(RateLimit {
1020            categories: smallvec![DataCategory::MetricBucket],
1021            scope: RateLimitScope::Organization(OrganizationId::new(42)),
1022            reason_code: Some(ReasonCode::new("my_limit")),
1023            retry_after: RetryAfter::from_secs(42),
1024            namespaces: smallvec![MetricNamespace::Custom, MetricNamespace::Spans],
1025        });
1026
1027        // Rate limit without reason code.
1028        rate_limits.add(RateLimit {
1029            categories: smallvec![DataCategory::MetricBucket],
1030            scope: RateLimitScope::Organization(OrganizationId::new(42)),
1031            reason_code: None,
1032            retry_after: RetryAfter::from_secs(42),
1033            namespaces: smallvec![MetricNamespace::Spans],
1034        });
1035
1036        let formatted = format_rate_limits(&rate_limits);
1037        let expected = "42:metric_bucket:organization:my_limit:custom;spans, 42:metric_bucket:organization::spans";
1038        assert_eq!(formatted, expected);
1039    }
1040
1041    #[tokio::test]
1042    async fn test_parse_invalid_rate_limits() {
1043        let scoping = Scoping {
1044            organization_id: OrganizationId::new(42),
1045            project_id: ProjectId::new(21),
1046            project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
1047            key_id: Some(17),
1048        };
1049
1050        assert!(parse_rate_limits(&scoping, "").is_ok());
1051        assert!(parse_rate_limits(&scoping, "invalid").is_ok());
1052        assert!(parse_rate_limits(&scoping, ",,,").is_ok());
1053    }
1054
1055    #[tokio::test]
1056    async fn test_parse_rate_limits() {
1057        let scoping = Scoping {
1058            organization_id: OrganizationId::new(42),
1059            project_id: ProjectId::new(21),
1060            project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
1061            key_id: Some(17),
1062        };
1063
1064        // contains "foobar", an unknown scope that should be mapped to Unknown
1065        let formatted =
1066            "42::organization:my_limit, invalid, 4711:foobar;transaction;security:project";
1067        let rate_limits: Vec<RateLimit> =
1068            parse_rate_limits(&scoping, formatted).into_iter().collect();
1069
1070        assert_eq!(
1071            rate_limits,
1072            vec![
1073                RateLimit {
1074                    categories: DataCategories::new(),
1075                    scope: RateLimitScope::Organization(OrganizationId::new(42)),
1076                    reason_code: Some(ReasonCode::new("my_limit")),
1077                    retry_after: rate_limits[0].retry_after,
1078                    namespaces: smallvec![],
1079                },
1080                RateLimit {
1081                    categories: smallvec![
1082                        DataCategory::Unknown,
1083                        DataCategory::Transaction,
1084                        DataCategory::Security,
1085                    ],
1086                    scope: RateLimitScope::Project(ProjectId::new(21)),
1087                    reason_code: None,
1088                    retry_after: rate_limits[1].retry_after,
1089                    namespaces: smallvec![],
1090                }
1091            ]
1092        );
1093
1094        assert_eq!(42, rate_limits[0].retry_after.remaining_seconds());
1095        assert_eq!(4711, rate_limits[1].retry_after.remaining_seconds());
1096    }
1097
1098    #[tokio::test]
1099    async fn test_parse_rate_limits_namespace() {
1100        let scoping = Scoping {
1101            organization_id: OrganizationId::new(42),
1102            project_id: ProjectId::new(21),
1103            project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
1104            key_id: Some(17),
1105        };
1106
1107        let formatted = "42:metric_bucket:organization::custom;spans";
1108        let rate_limits: Vec<RateLimit> =
1109            parse_rate_limits(&scoping, formatted).into_iter().collect();
1110
1111        assert_eq!(
1112            rate_limits,
1113            vec![RateLimit {
1114                categories: smallvec![DataCategory::MetricBucket],
1115                scope: RateLimitScope::Organization(OrganizationId::new(42)),
1116                reason_code: None,
1117                retry_after: rate_limits[0].retry_after,
1118                namespaces: smallvec![MetricNamespace::Custom, MetricNamespace::Spans],
1119            }]
1120        );
1121    }
1122
1123    #[tokio::test]
1124    async fn test_parse_rate_limits_empty_namespace() {
1125        let scoping = Scoping {
1126            organization_id: OrganizationId::new(42),
1127            project_id: ProjectId::new(21),
1128            project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
1129            key_id: Some(17),
1130        };
1131
1132        // notice the trailing colon
1133        let formatted = "42:metric_bucket:organization:some_reason:";
1134        let rate_limits: Vec<RateLimit> =
1135            parse_rate_limits(&scoping, formatted).into_iter().collect();
1136
1137        assert_eq!(
1138            rate_limits,
1139            vec![RateLimit {
1140                categories: smallvec![DataCategory::MetricBucket],
1141                scope: RateLimitScope::Organization(OrganizationId::new(42)),
1142                reason_code: Some(ReasonCode::new("some_reason")),
1143                retry_after: rate_limits[0].retry_after,
1144                namespaces: smallvec![],
1145            }]
1146        );
1147    }
1148
1149    #[tokio::test]
1150    async fn test_parse_rate_limits_only_unknown() {
1151        let scoping = Scoping {
1152            organization_id: OrganizationId::new(42),
1153            project_id: ProjectId::new(21),
1154            project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
1155            key_id: Some(17),
1156        };
1157
1158        let formatted = "42:foo;bar:organization";
1159        let rate_limits: Vec<RateLimit> =
1160            parse_rate_limits(&scoping, formatted).into_iter().collect();
1161
1162        assert_eq!(
1163            rate_limits,
1164            vec![RateLimit {
1165                categories: smallvec![DataCategory::Unknown, DataCategory::Unknown],
1166                scope: RateLimitScope::Organization(OrganizationId::new(42)),
1167                reason_code: None,
1168                retry_after: rate_limits[0].retry_after,
1169                namespaces: smallvec![],
1170            },]
1171        );
1172    }
1173
1174    macro_rules! envelope {
1175        ($( $item_type:ident $( :: $attachment_type:ident )? ),*) => {{
1176            let bytes = "{\"dsn\":\"https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42\"}";
1177            #[allow(unused_mut)]
1178            let mut envelope = Envelope::parse_bytes(bytes.into()).unwrap();
1179            $(
1180                let mut item = Item::new(ItemType::$item_type);
1181                item.set_payload(ContentType::OctetStream, "0123456789");
1182                $( item.set_attachment_type(AttachmentType::$attachment_type); )?
1183                envelope.add_item(item);
1184            )*
1185
1186            let (outcome_aggregator, _) = Addr::custom();
1187            let (test_store, _) = Addr::custom();
1188
1189            ManagedEnvelope::new(
1190                envelope,
1191                outcome_aggregator,
1192                test_store,
1193            )
1194        }}
1195    }
1196
1197    fn set_extracted(envelope: &mut Envelope, ty: ItemType) {
1198        envelope
1199            .get_item_by_mut(|item| *item.ty() == ty)
1200            .unwrap()
1201            .set_metrics_extracted(true);
1202    }
1203
1204    fn rate_limit(category: DataCategory) -> RateLimit {
1205        RateLimit {
1206            categories: vec![category].into(),
1207            scope: RateLimitScope::Organization(OrganizationId::new(42)),
1208            reason_code: None,
1209            retry_after: RetryAfter::from_secs(60),
1210            namespaces: smallvec![],
1211        }
1212    }
1213
1214    #[derive(Debug, Default)]
1215    struct MockLimiter {
1216        denied: Vec<DataCategory>,
1217        called: BTreeMap<DataCategory, usize>,
1218        checked: BTreeSet<DataCategory>,
1219    }
1220
1221    impl MockLimiter {
1222        pub fn deny(mut self, category: DataCategory) -> Self {
1223            self.denied.push(category);
1224            self
1225        }
1226
1227        pub fn check(&mut self, scoping: ItemScoping, quantity: usize) -> Result<RateLimits, ()> {
1228            let cat = scoping.category;
1229            let previous = self.called.insert(cat, quantity);
1230            assert!(previous.is_none(), "rate limiter invoked twice for {cat}");
1231
1232            let mut limits = RateLimits::new();
1233            if self.denied.contains(&cat) {
1234                limits.add(rate_limit(cat));
1235            }
1236            Ok(limits)
1237        }
1238
1239        #[track_caller]
1240        pub fn assert_call(&mut self, category: DataCategory, expected: usize) {
1241            self.checked.insert(category);
1242
1243            let quantity = self.called.get(&category).copied();
1244            assert_eq!(
1245                quantity,
1246                Some(expected),
1247                "Expected quantity `{expected}` for data category `{category}`, got {quantity:?}."
1248            );
1249        }
1250    }
1251
1252    impl Drop for MockLimiter {
1253        fn drop(&mut self) {
1254            if std::thread::panicking() {
1255                return;
1256            }
1257
1258            for checked in &self.checked {
1259                self.called.remove(checked);
1260            }
1261
1262            if self.called.is_empty() {
1263                return;
1264            }
1265
1266            let not_asserted = self
1267                .called
1268                .iter()
1269                .map(|(k, v)| format!("- {k}: {v}"))
1270                .collect::<Vec<_>>()
1271                .join("\n");
1272
1273            panic!("Following calls to the limiter were not asserted:\n{not_asserted}");
1274        }
1275    }
1276
1277    async fn enforce_and_apply(
1278        mock: Arc<Mutex<MockLimiter>>,
1279        envelope: &mut ManagedEnvelope,
1280        #[allow(unused_variables)] assume_event: Option<DataCategory>,
1281    ) -> (Enforcement, RateLimits) {
1282        let scoping = envelope.scoping();
1283
1284        #[allow(unused_mut)]
1285        let mut limiter = EnvelopeLimiter::new(CheckLimits::All, move |s, q| {
1286            let mock = mock.clone();
1287            async move {
1288                let mut mock = mock.lock().await;
1289                mock.check(s, q)
1290            }
1291        });
1292        #[cfg(feature = "processing")]
1293        if let Some(assume_event) = assume_event {
1294            limiter.assume_event(assume_event);
1295        }
1296
1297        let (enforcement, limits) = limiter
1298            .compute(envelope.envelope_mut(), &scoping)
1299            .await
1300            .unwrap();
1301
1302        // We implemented `clone` only for tests because we don't want to make `apply_with_outcomes`
1303        // &self because we want move semantics to prevent double tracking.
1304        enforcement.clone().apply_with_outcomes(envelope);
1305
1306        (enforcement, limits)
1307    }
1308
1309    fn mock_limiter(category: Option<DataCategory>) -> Arc<Mutex<MockLimiter>> {
1310        let mut mock = MockLimiter::default();
1311        if let Some(category) = category {
1312            mock = mock.deny(category);
1313        }
1314
1315        Arc::new(Mutex::new(mock))
1316    }
1317
1318    #[tokio::test]
1319    async fn test_enforce_pass_empty() {
1320        let mut envelope = envelope![];
1321
1322        let mock = mock_limiter(None);
1323        let (_, limits) = enforce_and_apply(mock, &mut envelope, None).await;
1324
1325        assert!(!limits.is_limited());
1326        assert!(envelope.envelope().is_empty());
1327    }
1328
1329    #[tokio::test]
1330    async fn test_enforce_limit_error_event() {
1331        let mut envelope = envelope![Event];
1332
1333        let mock = mock_limiter(Some(DataCategory::Error));
1334        let (_, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1335
1336        assert!(limits.is_limited());
1337        assert!(envelope.envelope().is_empty());
1338        mock.lock().await.assert_call(DataCategory::Error, 1);
1339    }
1340
1341    #[tokio::test]
1342    async fn test_enforce_limit_error_with_attachments() {
1343        let mut envelope = envelope![Event, Attachment];
1344
1345        let mock = mock_limiter(Some(DataCategory::Error));
1346        let (_, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1347
1348        assert!(limits.is_limited());
1349        assert!(envelope.envelope().is_empty());
1350        mock.lock().await.assert_call(DataCategory::Error, 1);
1351    }
1352
1353    #[tokio::test]
1354    async fn test_enforce_limit_minidump() {
1355        let mut envelope = envelope![Attachment::Minidump];
1356
1357        let mock = mock_limiter(Some(DataCategory::Error));
1358        let (_, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1359
1360        assert!(limits.is_limited());
1361        assert!(envelope.envelope().is_empty());
1362        mock.lock().await.assert_call(DataCategory::Error, 1);
1363    }
1364
1365    #[tokio::test]
1366    async fn test_enforce_limit_attachments() {
1367        let mut envelope = envelope![Attachment::Minidump, Attachment];
1368
1369        let mock = mock_limiter(Some(DataCategory::Attachment));
1370        let (_, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1371
1372        // Attachments would be limited, but crash reports create events and are thus allowed.
1373        assert!(limits.is_limited());
1374        assert_eq!(envelope.envelope().len(), 1);
1375        mock.lock().await.assert_call(DataCategory::Error, 1);
1376        mock.lock().await.assert_call(DataCategory::Attachment, 20);
1377    }
1378
1379    /// Limit stand-alone profiles.
1380    #[tokio::test]
1381    async fn test_enforce_limit_profiles() {
1382        let mut envelope = envelope![Profile, Profile];
1383
1384        let mock = mock_limiter(Some(DataCategory::Profile));
1385        let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1386
1387        assert!(limits.is_limited());
1388        assert_eq!(envelope.envelope().len(), 0);
1389        mock.lock().await.assert_call(DataCategory::Profile, 2);
1390
1391        assert_eq!(
1392            get_outcomes(enforcement),
1393            vec![
1394                (DataCategory::Profile, 2),
1395                (DataCategory::ProfileIndexed, 2)
1396            ]
1397        );
1398    }
1399
1400    /// Limit profile chunks.
1401    #[tokio::test]
1402    async fn test_enforce_limit_profile_chunks_no_profile_type() {
1403        // In this test we have profile chunks which have not yet been classified, which means they
1404        // should not be rate limited.
1405        let mut envelope = envelope![ProfileChunk, ProfileChunk];
1406
1407        let mock = mock_limiter(Some(DataCategory::ProfileChunk));
1408        let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1409        assert!(!limits.is_limited());
1410        assert_eq!(get_outcomes(enforcement), vec![]);
1411
1412        let mock = mock_limiter(Some(DataCategory::ProfileChunkUi));
1413        let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1414        assert!(!limits.is_limited());
1415        assert_eq!(get_outcomes(enforcement), vec![]);
1416
1417        assert_eq!(envelope.envelope().len(), 2);
1418    }
1419
1420    #[tokio::test]
1421    async fn test_enforce_limit_profile_chunks_ui() {
1422        let mut envelope = envelope![];
1423
1424        let mut item = Item::new(ItemType::ProfileChunk);
1425        item.set_profile_type(ProfileType::Backend);
1426        envelope.envelope_mut().add_item(item);
1427        let mut item = Item::new(ItemType::ProfileChunk);
1428        item.set_profile_type(ProfileType::Ui);
1429        envelope.envelope_mut().add_item(item);
1430
1431        let mock = mock_limiter(Some(DataCategory::ProfileChunkUi));
1432        let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1433
1434        assert!(limits.is_limited());
1435        assert_eq!(envelope.envelope().len(), 1);
1436        mock.lock()
1437            .await
1438            .assert_call(DataCategory::ProfileChunkUi, 1);
1439        mock.lock().await.assert_call(DataCategory::ProfileChunk, 1);
1440
1441        assert_eq!(
1442            get_outcomes(enforcement),
1443            vec![(DataCategory::ProfileChunkUi, 1)]
1444        );
1445    }
1446
1447    #[tokio::test]
1448    async fn test_enforce_limit_profile_chunks_backend() {
1449        let mut envelope = envelope![];
1450
1451        let mut item = Item::new(ItemType::ProfileChunk);
1452        item.set_profile_type(ProfileType::Backend);
1453        envelope.envelope_mut().add_item(item);
1454        let mut item = Item::new(ItemType::ProfileChunk);
1455        item.set_profile_type(ProfileType::Ui);
1456        envelope.envelope_mut().add_item(item);
1457
1458        let mock = mock_limiter(Some(DataCategory::ProfileChunk));
1459        let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1460
1461        assert!(limits.is_limited());
1462        assert_eq!(envelope.envelope().len(), 1);
1463        mock.lock()
1464            .await
1465            .assert_call(DataCategory::ProfileChunkUi, 1);
1466        mock.lock().await.assert_call(DataCategory::ProfileChunk, 1);
1467
1468        assert_eq!(
1469            get_outcomes(enforcement),
1470            vec![(DataCategory::ProfileChunk, 1)]
1471        );
1472    }
1473
1474    /// Limit replays.
1475    #[tokio::test]
1476    async fn test_enforce_limit_replays() {
1477        let mut envelope = envelope![ReplayEvent, ReplayRecording, ReplayVideo];
1478
1479        let mock = mock_limiter(Some(DataCategory::Replay));
1480        let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1481
1482        assert!(limits.is_limited());
1483        assert_eq!(envelope.envelope().len(), 0);
1484        mock.lock().await.assert_call(DataCategory::Replay, 3);
1485
1486        assert_eq!(get_outcomes(enforcement), vec![(DataCategory::Replay, 3),]);
1487    }
1488
1489    /// Limit monitor checkins.
1490    #[tokio::test]
1491    async fn test_enforce_limit_monitor_checkins() {
1492        let mut envelope = envelope![CheckIn];
1493
1494        let mock = mock_limiter(Some(DataCategory::Monitor));
1495        let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1496
1497        assert!(limits.is_limited());
1498        assert_eq!(envelope.envelope().len(), 0);
1499        mock.lock().await.assert_call(DataCategory::Monitor, 1);
1500
1501        assert_eq!(get_outcomes(enforcement), vec![(DataCategory::Monitor, 1)])
1502    }
1503
1504    #[tokio::test]
1505    async fn test_enforce_pass_minidump() {
1506        let mut envelope = envelope![Attachment::Minidump];
1507
1508        let mock = mock_limiter(Some(DataCategory::Attachment));
1509        let (_, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1510
1511        // If only crash report attachments are present, we don't emit a rate limit.
1512        assert!(!limits.is_limited());
1513        assert_eq!(envelope.envelope().len(), 1);
1514        mock.lock().await.assert_call(DataCategory::Error, 1);
1515        mock.lock().await.assert_call(DataCategory::Attachment, 10);
1516    }
1517
1518    #[tokio::test]
1519    async fn test_enforce_skip_rate_limited() {
1520        let mut envelope = envelope![];
1521
1522        let mut item = Item::new(ItemType::Attachment);
1523        item.set_payload(ContentType::OctetStream, "0123456789");
1524        item.set_rate_limited(true);
1525        envelope.envelope_mut().add_item(item);
1526
1527        let mock = mock_limiter(Some(DataCategory::Error));
1528        let (_, limits) = enforce_and_apply(mock, &mut envelope, None).await;
1529
1530        assert!(!limits.is_limited()); // No new rate limits applied.
1531        assert_eq!(envelope.envelope().len(), 1); // The item was retained
1532    }
1533
1534    #[tokio::test]
1535    async fn test_enforce_pass_sessions() {
1536        let mut envelope = envelope![Session, Session, Session];
1537
1538        let mock = mock_limiter(Some(DataCategory::Error));
1539        let (_, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1540
1541        // If only crash report attachments are present, we don't emit a rate limit.
1542        assert!(!limits.is_limited());
1543        assert_eq!(envelope.envelope().len(), 3);
1544        mock.lock().await.assert_call(DataCategory::Session, 3);
1545    }
1546
1547    #[tokio::test]
1548    async fn test_enforce_limit_sessions() {
1549        let mut envelope = envelope![Session, Session, Event];
1550
1551        let mock = mock_limiter(Some(DataCategory::Session));
1552        let (_, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1553
1554        // If only crash report attachments are present, we don't emit a rate limit.
1555        assert!(limits.is_limited());
1556        assert_eq!(envelope.envelope().len(), 1);
1557        mock.lock().await.assert_call(DataCategory::Error, 1);
1558        mock.lock().await.assert_call(DataCategory::Session, 2);
1559    }
1560
1561    #[tokio::test]
1562    #[cfg(feature = "processing")]
1563    async fn test_enforce_limit_assumed_event() {
1564        let mut envelope = envelope![];
1565
1566        let mock = mock_limiter(Some(DataCategory::Transaction));
1567        let (_, limits) =
1568            enforce_and_apply(mock.clone(), &mut envelope, Some(DataCategory::Transaction)).await;
1569
1570        assert!(limits.is_limited());
1571        assert!(envelope.envelope().is_empty()); // obviously
1572        mock.lock().await.assert_call(DataCategory::Transaction, 1);
1573    }
1574
1575    #[tokio::test]
1576    #[cfg(feature = "processing")]
1577    async fn test_enforce_limit_assumed_attachments() {
1578        let mut envelope = envelope![Attachment, Attachment];
1579
1580        let mock = mock_limiter(Some(DataCategory::Error));
1581        let (_, limits) =
1582            enforce_and_apply(mock.clone(), &mut envelope, Some(DataCategory::Error)).await;
1583
1584        assert!(limits.is_limited());
1585        assert!(envelope.envelope().is_empty());
1586        mock.lock().await.assert_call(DataCategory::Error, 1);
1587    }
1588
1589    #[tokio::test]
1590    async fn test_enforce_transaction() {
1591        let mut envelope = envelope![Transaction];
1592
1593        let mock = mock_limiter(Some(DataCategory::Transaction));
1594        let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1595
1596        assert!(limits.is_limited());
1597        assert!(enforcement.event_indexed.is_active());
1598        assert!(enforcement.event.is_active());
1599        mock.lock().await.assert_call(DataCategory::Transaction, 1);
1600
1601        assert_eq!(
1602            get_outcomes(enforcement),
1603            vec![
1604                (DataCategory::Transaction, 1),
1605                (DataCategory::TransactionIndexed, 1),
1606            ]
1607        );
1608    }
1609
1610    #[tokio::test]
1611    async fn test_enforce_transaction_non_indexed() {
1612        let mut envelope = envelope![Transaction, Profile];
1613        let scoping = envelope.scoping();
1614
1615        let mock = mock_limiter(Some(DataCategory::TransactionIndexed));
1616
1617        let mock_clone = mock.clone();
1618        let limiter = EnvelopeLimiter::new(CheckLimits::NonIndexed, move |s, q| {
1619            let mock_clone = mock_clone.clone();
1620            async move {
1621                let mut mock = mock_clone.lock().await;
1622                mock.check(s, q)
1623            }
1624        });
1625        let (enforcement, limits) = limiter
1626            .compute(envelope.envelope_mut(), &scoping)
1627            .await
1628            .unwrap();
1629        enforcement.clone().apply_with_outcomes(&mut envelope);
1630
1631        assert!(!limits.is_limited());
1632        assert!(!enforcement.event_indexed.is_active());
1633        assert!(!enforcement.event.is_active());
1634        assert!(!enforcement.profiles_indexed.is_active());
1635        assert!(!enforcement.profiles.is_active());
1636        mock.lock().await.assert_call(DataCategory::Transaction, 1);
1637        mock.lock().await.assert_call(DataCategory::Profile, 1);
1638    }
1639
1640    #[tokio::test]
1641    async fn test_enforce_transaction_no_indexing_quota() {
1642        let mut envelope = envelope![Transaction];
1643
1644        let mock = mock_limiter(Some(DataCategory::TransactionIndexed));
1645        let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1646
1647        assert!(limits.is_limited());
1648        assert!(enforcement.event_indexed.is_active());
1649        assert!(!enforcement.event.is_active());
1650        mock.lock().await.assert_call(DataCategory::Transaction, 1);
1651        mock.lock()
1652            .await
1653            .assert_call(DataCategory::TransactionIndexed, 1);
1654    }
1655
1656    #[tokio::test]
1657    async fn test_enforce_transaction_attachment_enforced() {
1658        let mut envelope = envelope![Transaction, Attachment];
1659
1660        let mock = mock_limiter(Some(DataCategory::Transaction));
1661        let (enforcement, _) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1662
1663        assert!(enforcement.event.is_active());
1664        assert!(enforcement.attachments.is_active());
1665        mock.lock().await.assert_call(DataCategory::Transaction, 1);
1666    }
1667
1668    fn get_outcomes(enforcement: Enforcement) -> Vec<(DataCategory, usize)> {
1669        enforcement
1670            .get_outcomes()
1671            .map(|(_, data_category, quantity)| (data_category, quantity))
1672            .collect::<Vec<_>>()
1673    }
1674
1675    #[tokio::test]
1676    async fn test_enforce_transaction_profile_enforced() {
1677        let mut envelope = envelope![Transaction, Profile];
1678
1679        let mock = mock_limiter(Some(DataCategory::Transaction));
1680        let (enforcement, _) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1681
1682        assert!(enforcement.event.is_active());
1683        assert!(enforcement.profiles.is_active());
1684        mock.lock().await.assert_call(DataCategory::Transaction, 1);
1685
1686        assert_eq!(
1687            get_outcomes(enforcement),
1688            vec![
1689                (DataCategory::Transaction, 1),
1690                (DataCategory::TransactionIndexed, 1),
1691                (DataCategory::Profile, 1),
1692                (DataCategory::ProfileIndexed, 1),
1693            ]
1694        );
1695    }
1696
1697    #[tokio::test]
1698    async fn test_enforce_transaction_standalone_profile_enforced() {
1699        // When the transaction is sampled, the profile survives as standalone.
1700        let mut envelope = envelope![Profile];
1701
1702        let mock = mock_limiter(Some(DataCategory::Transaction));
1703        let (enforcement, _) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1704
1705        assert!(enforcement.profiles.is_active());
1706        mock.lock().await.assert_call(DataCategory::Profile, 1);
1707        mock.lock().await.assert_call(DataCategory::Transaction, 0);
1708
1709        assert_eq!(
1710            get_outcomes(enforcement),
1711            vec![
1712                (DataCategory::Profile, 1),
1713                (DataCategory::ProfileIndexed, 1),
1714            ]
1715        );
1716    }
1717
1718    #[tokio::test]
1719    async fn test_enforce_transaction_attachment_enforced_indexing_quota() {
1720        let mut envelope = envelope![Transaction, Attachment];
1721        set_extracted(envelope.envelope_mut(), ItemType::Transaction);
1722
1723        let mock = mock_limiter(Some(DataCategory::TransactionIndexed));
1724        let (enforcement, _) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1725
1726        assert!(!enforcement.event.is_active());
1727        assert!(enforcement.event_indexed.is_active());
1728        assert!(enforcement.attachments.is_active());
1729        assert!(enforcement.attachment_items.is_active());
1730        mock.lock().await.assert_call(DataCategory::Transaction, 1);
1731        mock.lock()
1732            .await
1733            .assert_call(DataCategory::TransactionIndexed, 1);
1734
1735        assert_eq!(
1736            get_outcomes(enforcement),
1737            vec![
1738                (DataCategory::TransactionIndexed, 1),
1739                (DataCategory::Attachment, 10),
1740                (DataCategory::AttachmentItem, 1)
1741            ]
1742        );
1743    }
1744
1745    #[tokio::test]
1746    async fn test_enforce_span() {
1747        let mut envelope = envelope![Span, OtelSpan];
1748
1749        let mock = mock_limiter(Some(DataCategory::Span));
1750        let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1751
1752        assert!(limits.is_limited());
1753        assert!(enforcement.spans_indexed.is_active());
1754        assert!(enforcement.spans.is_active());
1755        mock.lock().await.assert_call(DataCategory::Span, 2);
1756
1757        assert_eq!(
1758            get_outcomes(enforcement),
1759            vec![(DataCategory::Span, 2), (DataCategory::SpanIndexed, 2)]
1760        );
1761    }
1762
1763    #[tokio::test]
1764    async fn test_enforce_span_no_indexing_quota() {
1765        let mut envelope = envelope![OtelSpan, Span];
1766
1767        let mock = mock_limiter(Some(DataCategory::SpanIndexed));
1768        let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1769
1770        assert!(limits.is_limited());
1771        assert!(enforcement.spans_indexed.is_active());
1772        assert!(!enforcement.spans.is_active());
1773        mock.lock().await.assert_call(DataCategory::Span, 2);
1774        mock.lock().await.assert_call(DataCategory::SpanIndexed, 2);
1775
1776        assert_eq!(
1777            get_outcomes(enforcement),
1778            vec![(DataCategory::SpanIndexed, 2)]
1779        );
1780    }
1781
1782    #[tokio::test]
1783    async fn test_enforce_span_metrics_extracted_no_indexing_quota() {
1784        let mut envelope = envelope![Span, OtelSpan];
1785        set_extracted(envelope.envelope_mut(), ItemType::Span);
1786
1787        let mock = mock_limiter(Some(DataCategory::SpanIndexed));
1788        let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1789
1790        assert!(limits.is_limited());
1791        assert!(enforcement.spans_indexed.is_active());
1792        assert!(!enforcement.spans.is_active());
1793        mock.lock().await.assert_call(DataCategory::Span, 2);
1794        mock.lock().await.assert_call(DataCategory::SpanIndexed, 2);
1795
1796        assert_eq!(
1797            get_outcomes(enforcement),
1798            vec![(DataCategory::SpanIndexed, 2)]
1799        );
1800    }
1801
/// Checks that `EnvelopeSummary::compute` adds up the source quantities of several
/// `MetricBuckets` items.
///
/// Two items carry 5 + 2 transactions and 2 + 0 profiles; the summary must report
/// `profile_quantity == 2` and `secondary_transaction_quantity == 7`.
#[test]
fn test_source_quantity_for_total_quantity() {
    let request_meta = RequestMeta::new(
        "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
            .parse()
            .unwrap(),
    );
    let mut envelope = Envelope::from_request(None, request_meta);

    // (transactions, profiles, buckets) per metric bucket item; spans are always zero here.
    for (transactions, profiles, buckets) in [(5, 2, 5), (2, 0, 3)] {
        let mut bucket_item = Item::new(ItemType::MetricBuckets);
        bucket_item.set_source_quantities(SourceQuantities {
            transactions,
            spans: 0,
            profiles,
            buckets,
        });
        envelope.add_item(bucket_item);
    }

    let summary = EnvelopeSummary::compute(&envelope);

    assert_eq!(summary.profile_quantity, 2);
    assert_eq!(summary.secondary_transaction_quantity, 7);
}
1834
1835    #[tokio::test]
1836    async fn test_enforce_limit_logs_count() {
1837        let mut envelope = envelope![Log, Log];
1838
1839        let mock = mock_limiter(Some(DataCategory::LogItem));
1840        let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1841
1842        assert!(limits.is_limited());
1843        assert_eq!(envelope.envelope().len(), 0);
1844        mock.lock().await.assert_call(DataCategory::LogItem, 2);
1845        mock.lock().await.assert_call(DataCategory::LogByte, 20);
1846
1847        assert_eq!(get_outcomes(enforcement), vec![(DataCategory::LogItem, 2)]);
1848    }
1849
1850    #[tokio::test]
1851    async fn test_enforce_limit_logs_bytes() {
1852        let mut envelope = envelope![Log, Log];
1853
1854        let mock = mock_limiter(Some(DataCategory::LogByte));
1855        let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1856
1857        assert!(limits.is_limited());
1858        assert_eq!(envelope.envelope().len(), 0);
1859        mock.lock().await.assert_call(DataCategory::LogItem, 2);
1860        mock.lock().await.assert_call(DataCategory::LogByte, 20);
1861
1862        assert_eq!(get_outcomes(enforcement), vec![(DataCategory::LogByte, 20)]);
1863    }
1864}