relay_server/utils/
rate_limits.rs

1use std::fmt::{self, Write};
2use std::future::Future;
3use std::marker::PhantomData;
4
5use relay_profiling::ProfileType;
6use relay_quotas::{
7    DataCategories, DataCategory, ItemScoping, QuotaScope, RateLimit, RateLimitScope, RateLimits,
8    ReasonCode, Scoping,
9};
10
11use crate::envelope::{Envelope, Item, ItemType};
12use crate::integrations::Integration;
13use crate::managed::ManagedEnvelope;
14use crate::services::outcome::Outcome;
15
/// Name of the rate limits header.
///
/// The value of this header is produced by [`format_rate_limits`] and read back by
/// [`parse_rate_limits`].
pub const RATE_LIMITS_HEADER: &str = "X-Sentry-Rate-Limits";
18
19/// Formats the `X-Sentry-Rate-Limits` header.
20pub fn format_rate_limits(rate_limits: &RateLimits) -> String {
21    let mut header = String::new();
22
23    for rate_limit in rate_limits {
24        if !header.is_empty() {
25            header.push_str(", ");
26        }
27
28        write!(header, "{}:", rate_limit.retry_after.remaining_seconds()).ok();
29
30        for (index, category) in rate_limit.categories.iter().enumerate() {
31            if index > 0 {
32                header.push(';');
33            }
34            write!(header, "{category}").ok();
35        }
36
37        write!(header, ":{}", rate_limit.scope.name()).ok();
38
39        if let Some(ref reason_code) = rate_limit.reason_code {
40            write!(header, ":{reason_code}").ok();
41        } else if !rate_limit.namespaces.is_empty() {
42            write!(header, ":").ok(); // delimits the empty reason code for namespaces
43        }
44
45        for (index, namespace) in rate_limit.namespaces.iter().enumerate() {
46            header.push(if index == 0 { ':' } else { ';' });
47            write!(header, "{namespace}").ok();
48        }
49    }
50
51    header
52}
53
54/// Parses the `X-Sentry-Rate-Limits` header.
55pub fn parse_rate_limits(scoping: &Scoping, string: &str) -> RateLimits {
56    let mut rate_limits = RateLimits::new();
57
58    for limit in string.split(',') {
59        let limit = limit.trim();
60        if limit.is_empty() {
61            continue;
62        }
63
64        let mut components = limit.split(':');
65
66        let retry_after = match components.next().and_then(|s| s.parse().ok()) {
67            Some(retry_after) => retry_after,
68            None => continue,
69        };
70
71        let mut categories = DataCategories::new();
72        for category in components.next().unwrap_or("").split(';') {
73            if !category.is_empty() {
74                categories.push(DataCategory::from_name(category));
75            }
76        }
77
78        let quota_scope = QuotaScope::from_name(components.next().unwrap_or(""));
79        let scope = RateLimitScope::for_quota(*scoping, quota_scope);
80
81        let reason_code = components
82            .next()
83            .filter(|s| !s.is_empty())
84            .map(ReasonCode::new);
85
86        let namespace = components
87            .next()
88            .unwrap_or("")
89            .split(';')
90            .filter(|s| !s.is_empty())
91            .filter_map(|s| s.parse().ok())
92            .collect();
93
94        rate_limits.add(RateLimit {
95            categories,
96            scope,
97            reason_code,
98            retry_after,
99            namespaces: namespace,
100        });
101    }
102
103    rate_limits
104}
105
106/// Infer the data category from an item.
107///
108/// Categories depend mostly on the item type, with a few special cases:
109/// - `Event`: the category is inferred from the event type. This requires the `event_type` header
110///   to be set on the event item.
111/// - `Attachment`: If the attachment creates an event (e.g. for minidumps), the category is assumed
112///   to be `Error`.
113fn infer_event_category(item: &Item) -> Option<DataCategory> {
114    match item.ty() {
115        ItemType::Event => Some(DataCategory::Error),
116        ItemType::Transaction => Some(DataCategory::Transaction),
117        ItemType::Security | ItemType::RawSecurity => Some(DataCategory::Security),
118        ItemType::UnrealReport => Some(DataCategory::Error),
119        ItemType::UserReportV2 => Some(DataCategory::UserReportV2),
120        ItemType::Attachment if item.creates_event() => Some(DataCategory::Error),
121        ItemType::Attachment => None,
122        ItemType::Session => None,
123        ItemType::Sessions => None,
124        ItemType::Statsd => None,
125        ItemType::MetricBuckets => None,
126        ItemType::FormData => None,
127        ItemType::UserReport => None,
128        ItemType::Profile => None,
129        ItemType::ReplayEvent => None,
130        ItemType::ReplayRecording => None,
131        ItemType::ReplayVideo => None,
132        ItemType::ClientReport => None,
133        ItemType::CheckIn => None,
134        ItemType::Nel => None,
135        ItemType::Log => None,
136        ItemType::TraceMetric => None,
137        ItemType::Span => None,
138        ItemType::ProfileChunk => None,
139        ItemType::Integration => None,
140        ItemType::Unknown(_) => None,
141    }
142}
143
/// A summary of `Envelope` contents.
///
/// Summarizes the contained event, size of attachments, session updates, and whether there are
/// plain attachments. This is used for efficient rate limiting or outcome handling.
///
/// Use [`EnvelopeSummary::compute`] to aggregate an envelope, or [`EnvelopeSummary::empty`] for
/// a summary with all quantities set to zero.
#[non_exhaustive]
#[derive(Clone, Copy, Debug, Default)]
pub struct EnvelopeSummary {
    /// The data category of the event in the envelope. `None` if there is no event.
    pub event_category: Option<DataCategory>,

    /// The quantity of all attachments combined in bytes.
    pub attachment_quantity: usize,

    /// The number of attachments.
    pub attachment_item_quantity: usize,

    /// The number of all session updates.
    pub session_quantity: usize,

    /// The number of profiles.
    ///
    /// This includes profiles reported through the source quantities of an item.
    pub profile_quantity: usize,

    /// The number of replays.
    pub replay_quantity: usize,

    /// The number of user reports (legacy item type for user feedback).
    pub user_report_quantity: usize,

    /// The number of monitor check-ins.
    pub monitor_quantity: usize,

    /// The number of log items sent for the log product.
    pub log_item_quantity: usize,

    /// The size of the logs sent for the log product, in bytes.
    pub log_byte_quantity: usize,

    /// Secondary number of transactions.
    ///
    /// This is 0 for envelopes which contain a transaction,
    /// only secondary transaction quantity should be tracked here,
    /// these are for example transaction counts extracted from metrics.
    ///
    /// A "primary" transaction is contained within the envelope,
    /// marking the envelope data category a [`DataCategory::Transaction`].
    pub secondary_transaction_quantity: usize,

    /// Secondary number of spans, see `secondary_transaction_quantity`.
    pub secondary_span_quantity: usize,

    /// The number of standalone spans.
    pub span_quantity: usize,

    /// Indicates that the envelope contains regular attachments that do not create event payloads.
    pub has_plain_attachments: bool,

    /// The payload size of this envelope.
    pub payload_size: usize,

    /// The number of profile chunks in this envelope.
    pub profile_chunk_quantity: usize,
    /// The number of UI profile chunks in this envelope.
    pub profile_chunk_ui_quantity: usize,

    /// The number of trace metrics in this envelope.
    pub trace_metric_quantity: usize,
}
211
212impl EnvelopeSummary {
213    /// Creates an empty summary.
214    pub fn empty() -> Self {
215        Self::default()
216    }
217
218    /// Creates an envelope summary and aggregates the given envelope.
219    pub fn compute(envelope: &Envelope) -> Self {
220        let mut summary = Self::empty();
221
222        for item in envelope.items() {
223            if item.creates_event() {
224                summary.infer_category(item);
225            } else if item.ty() == &ItemType::Attachment {
226                // Plain attachments do not create events.
227                summary.has_plain_attachments = true;
228            }
229
230            // If the item has been rate limited before, the quota has been consumed and outcomes
231            // emitted. We can skip it here.
232            if item.rate_limited() {
233                continue;
234            }
235
236            if let Some(source_quantities) = item.source_quantities() {
237                summary.secondary_transaction_quantity += source_quantities.transactions;
238                summary.secondary_span_quantity += source_quantities.spans;
239                summary.profile_quantity += source_quantities.profiles;
240            }
241
242            summary.payload_size += item.len();
243
244            for (category, quantity) in item.quantities() {
245                summary.add_quantity(category, quantity);
246            }
247
248            // Special case since v1 and v2 share a data category.
249            // Adding this in add_quantity would include v2 in the count.
250            if item.ty() == &ItemType::UserReport {
251                summary.user_report_quantity += 1;
252            }
253        }
254
255        summary
256    }
257
258    fn add_quantity(&mut self, category: DataCategory, quantity: usize) {
259        let target_quantity = match category {
260            DataCategory::Attachment => &mut self.attachment_quantity,
261            DataCategory::AttachmentItem => &mut self.attachment_item_quantity,
262            DataCategory::Session => &mut self.session_quantity,
263            DataCategory::Profile => &mut self.profile_quantity,
264            DataCategory::Replay => &mut self.replay_quantity,
265            DataCategory::DoNotUseReplayVideo => &mut self.replay_quantity,
266            DataCategory::Monitor => &mut self.monitor_quantity,
267            DataCategory::Span => &mut self.span_quantity,
268            DataCategory::TraceMetric => &mut self.trace_metric_quantity,
269            DataCategory::LogItem => &mut self.log_item_quantity,
270            DataCategory::LogByte => &mut self.log_byte_quantity,
271            DataCategory::ProfileChunk => &mut self.profile_chunk_quantity,
272            DataCategory::ProfileChunkUi => &mut self.profile_chunk_ui_quantity,
273            // TODO: This catch-all return looks dangerous
274            _ => return,
275        };
276        *target_quantity += quantity;
277    }
278
279    /// Infers the appropriate [`DataCategory`] for the envelope [`Item`].
280    ///
281    /// The inferred category is only applied to the [`EnvelopeSummary`] if there is not yet
282    /// a category set.
283    fn infer_category(&mut self, item: &Item) {
284        if matches!(self.event_category, None | Some(DataCategory::Default))
285            && let Some(category) = infer_event_category(item)
286        {
287            self.event_category = Some(category);
288        }
289    }
290}
291
/// Rate limiting information for a data category.
///
/// A limit is considered active if its `quantity` is greater than zero, see
/// [`CategoryLimit::is_active`].
#[derive(Debug)]
#[cfg_attr(test, derive(Clone))]
pub struct CategoryLimit {
    /// The limited data category.
    category: DataCategory,
    /// The total rate limited quantity across all items.
    ///
    /// This will be `0` if nothing was rate limited.
    quantity: usize,
    /// The reason code of the applied rate limit.
    ///
    /// Defaults to `None` if the quota does not declare a reason code.
    reason_code: Option<ReasonCode>,
}
307
308impl CategoryLimit {
309    /// Creates a new `CategoryLimit`.
310    ///
311    /// Returns an inactive limit if `quantity` is `0` or `rate_limit` is `None`.
312    fn new(category: DataCategory, quantity: usize, rate_limit: Option<&RateLimit>) -> Self {
313        match rate_limit {
314            Some(limit) => Self {
315                category,
316                quantity,
317                reason_code: limit.reason_code.clone(),
318            },
319            None => Self::default(),
320        }
321    }
322
323    /// Recreates the category limit, if active, for a new category with the same reason.
324    pub fn clone_for(&self, category: DataCategory, quantity: usize) -> CategoryLimit {
325        if !self.is_active() {
326            return Self::default();
327        }
328
329        Self {
330            category,
331            quantity,
332            reason_code: self.reason_code.clone(),
333        }
334    }
335
336    /// Returns `true` if this is an active limit.
337    ///
338    /// This indicates that the category is limited and a certain quantity is removed from the
339    /// Envelope. If the limit is inactive, there is no change.
340    pub fn is_active(&self) -> bool {
341        self.quantity > 0
342    }
343}
344
impl Default for CategoryLimit {
    /// Creates an inactive limit: default category, zero quantity, and no reason code.
    fn default() -> Self {
        Self {
            category: DataCategory::Default,
            quantity: 0,
            reason_code: None,
        }
    }
}
354
/// Information on the limited quantities returned by [`EnvelopeLimiter::compute`].
///
/// Every field is a [`CategoryLimit`] which is inactive by default and becomes active once a
/// non-zero quantity of that category has been rate limited.
#[derive(Default, Debug)]
#[cfg_attr(test, derive(Clone))]
pub struct Enforcement {
    /// The event item rate limit.
    pub event: CategoryLimit,
    /// The rate limit for the indexed category of the event.
    pub event_indexed: CategoryLimit,
    /// The combined attachment bytes rate limit.
    pub attachments: CategoryLimit,
    /// The combined attachment item rate limit.
    pub attachment_items: CategoryLimit,
    /// The combined session item rate limit.
    pub sessions: CategoryLimit,
    /// The combined profile item rate limit.
    pub profiles: CategoryLimit,
    /// The rate limit for the indexed profiles category.
    pub profiles_indexed: CategoryLimit,
    /// The combined replay item rate limit.
    pub replays: CategoryLimit,
    /// The combined check-in item rate limit.
    pub check_ins: CategoryLimit,
    /// The combined log item (our product logs) rate limit.
    pub log_items: CategoryLimit,
    /// The combined log byte (our product logs) rate limit.
    pub log_bytes: CategoryLimit,
    /// The combined spans rate limit.
    pub spans: CategoryLimit,
    /// The rate limit for the indexed span category.
    pub spans_indexed: CategoryLimit,
    /// The rate limit for user report v1.
    pub user_reports: CategoryLimit,
    /// The combined profile chunk item rate limit.
    pub profile_chunks: CategoryLimit,
    /// The combined profile chunk ui item rate limit.
    pub profile_chunks_ui: CategoryLimit,
    /// The combined trace metric item rate limit.
    pub trace_metrics: CategoryLimit,
}
394
395impl Enforcement {
396    /// Returns the `CategoryLimit` for the event.
397    ///
398    /// `None` if the event is not rate limited.
399    pub fn active_event(&self) -> Option<&CategoryLimit> {
400        if self.event.is_active() {
401            Some(&self.event)
402        } else if self.event_indexed.is_active() {
403            Some(&self.event_indexed)
404        } else {
405            None
406        }
407    }
408
409    /// Returns `true` if the event is rate limited.
410    pub fn is_event_active(&self) -> bool {
411        self.active_event().is_some()
412    }
413
414    /// Helper for `track_outcomes`.
415    fn get_outcomes(self) -> impl Iterator<Item = (Outcome, DataCategory, usize)> {
416        let Self {
417            event,
418            event_indexed,
419            attachments,
420            attachment_items,
421            sessions: _, // Do not report outcomes for sessions.
422            profiles,
423            profiles_indexed,
424            replays,
425            check_ins,
426            log_items,
427            log_bytes,
428            spans,
429            spans_indexed,
430            user_reports,
431            profile_chunks,
432            profile_chunks_ui,
433            trace_metrics,
434        } = self;
435
436        let limits = [
437            event,
438            event_indexed,
439            attachments,
440            attachment_items,
441            profiles,
442            profiles_indexed,
443            replays,
444            check_ins,
445            log_items,
446            log_bytes,
447            spans,
448            spans_indexed,
449            user_reports,
450            profile_chunks,
451            profile_chunks_ui,
452            trace_metrics,
453        ];
454
455        limits
456            .into_iter()
457            .filter(move |limit| limit.is_active())
458            .map(move |limit| {
459                (
460                    Outcome::RateLimited(limit.reason_code),
461                    limit.category,
462                    limit.quantity,
463                )
464            })
465    }
466
467    /// Applies the [`Enforcement`] on the [`Envelope`] by removing all items that were rate limited
468    /// and emits outcomes for each rate limited category.
469    ///
470    /// # Example
471    ///
472    /// ## Interaction between Events and Attachments
473    ///
474    /// An envelope with an `Error` event and an `Attachment`. Two quotas specify to drop all
475    /// attachments (reason `"a"`) and all errors (reason `"e"`). The result of enforcement will be:
476    ///
477    /// 1. All items are removed from the envelope.
478    /// 2. Enforcements report both the event and the attachment dropped with reason `"e"`, since
479    ///    dropping an event automatically drops all attachments with the same reason.
480    /// 3. Rate limits report the single event limit `"e"`, since attachment limits do not need to
481    ///    be checked in this case.
482    ///
483    /// ## Required Attachments
484    ///
485    /// An envelope with a single Minidump `Attachment`, and a single quota specifying to drop all
486    /// attachments with reason `"a"`:
487    ///
488    /// 1. Since the minidump creates an event and is required for processing, it remains in the
489    ///    envelope and is marked as `rate_limited`.
490    /// 2. Enforcements report the attachment dropped with reason `"a"`.
491    /// 3. Rate limits are empty since it is allowed to send required attachments even when rate
492    ///    limited.
493    ///
494    /// ## Previously Rate Limited Attachments
495    ///
496    /// An envelope with a single item marked as `rate_limited`, and a quota specifying to drop
497    /// everything with reason `"d"`:
498    ///
499    /// 1. The item remains in the envelope.
500    /// 2. Enforcements are empty. Rate limiting has occurred at an earlier stage in the pipeline.
501    /// 3. Rate limits are empty.
502    pub fn apply_with_outcomes(self, envelope: &mut ManagedEnvelope) {
503        envelope
504            .envelope_mut()
505            .retain_items(|item| self.retain_item(item));
506        self.track_outcomes(envelope);
507    }
508
509    /// Returns `true` when an [`Item`] can be retained, `false` otherwise.
510    fn retain_item(&self, item: &mut Item) -> bool {
511        // Remove event items and all items that depend on this event
512        if self.event.is_active() && item.requires_event() {
513            return false;
514        }
515
516        // When checking limits for categories that have an indexed variant,
517        // we only have to check the more specific, the indexed, variant
518        // to determine whether an item is limited.
519        match item.ty() {
520            ItemType::Attachment => {
521                if !(self.attachments.is_active() || self.attachment_items.is_active()) {
522                    return true;
523                }
524                if item.creates_event() {
525                    item.set_rate_limited(true);
526                    true
527                } else {
528                    false
529                }
530            }
531            ItemType::Session => !self.sessions.is_active(),
532            ItemType::Profile => !self.profiles_indexed.is_active(),
533            ItemType::ReplayEvent => !self.replays.is_active(),
534            ItemType::ReplayVideo => !self.replays.is_active(),
535            ItemType::ReplayRecording => !self.replays.is_active(),
536            ItemType::UserReport => !self.user_reports.is_active(),
537            ItemType::CheckIn => !self.check_ins.is_active(),
538            ItemType::Log => {
539                !(self.log_items.is_active() || self.log_bytes.is_active())
540            }
541            ItemType::Span => !self.spans_indexed.is_active(),
542            ItemType::ProfileChunk => match item.profile_type() {
543                Some(ProfileType::Backend) => !self.profile_chunks.is_active(),
544                Some(ProfileType::Ui) => !self.profile_chunks_ui.is_active(),
545                None => true,
546            },
547            ItemType::TraceMetric => !self.trace_metrics.is_active(),
548            ItemType::Integration => match item.integration() {
549                Some(Integration::Logs(_)) => !(self.log_items.is_active() || self.log_bytes.is_active()),
550                Some(Integration::Spans(_)) => !self.spans_indexed.is_active(),
551                None => true,
552            },
553            ItemType::Event
554            | ItemType::Transaction
555            | ItemType::Security
556            | ItemType::FormData
557            | ItemType::RawSecurity
558            | ItemType::Nel
559            | ItemType::UnrealReport
560            | ItemType::Sessions
561            | ItemType::Statsd
562            | ItemType::MetricBuckets
563            | ItemType::ClientReport
564            | ItemType::UserReportV2  // This is an event type.
565            | ItemType::Unknown(_) => true,
566        }
567    }
568
569    /// Invokes track outcome on all enforcements reported by the [`EnvelopeLimiter`].
570    ///
571    /// Relay generally does not emit outcomes for sessions, so those are skipped.
572    fn track_outcomes(self, envelope: &mut ManagedEnvelope) {
573        for (outcome, category, quantity) in self.get_outcomes() {
574            envelope.track_outcome(outcome, category, quantity)
575        }
576    }
577}
578
/// Which limits to check with the [`EnvelopeLimiter`].
#[derive(Debug, Copy, Clone)]
pub enum CheckLimits {
    /// Checks all limits except indexed categories.
    ///
    /// In the fast path it is necessary to apply cached rate limits, but indexed rate limits must
    /// not be enforced: at the time of the check it is not yet known whether the envelope will be
    /// sampled. Additionally, even if the item is later dropped by dynamic sampling, it must still
    /// be around to extract metrics and cannot be dropped too early.
    NonIndexed,
    /// Checks all limits against the envelope.
    All,
}
592
/// Wraps the rate limit `check` function together with the selection of limits to check.
struct Check<F, E, R> {
    /// Selects whether indexed categories are passed to `check` or skipped.
    limits: CheckLimits,
    /// The function invoked with the item scoping and the quantity to check.
    check: F,
    /// Marker for the otherwise unused error type parameter `E`.
    _1: PhantomData<E>,
    /// Marker for the otherwise unused future type parameter `R`.
    _2: PhantomData<R>,
}
599
600impl<F, E, R> Check<F, E, R>
601where
602    F: FnMut(ItemScoping, usize) -> R,
603    R: Future<Output = Result<RateLimits, E>>,
604{
605    async fn apply(&mut self, scoping: ItemScoping, quantity: usize) -> Result<RateLimits, E> {
606        if matches!(self.limits, CheckLimits::NonIndexed) && scoping.category.is_indexed() {
607            return Ok(RateLimits::default());
608        }
609
610        (self.check)(scoping, quantity).await
611    }
612}
613
/// Enforces rate limits with the given `check` function on items in the envelope.
///
/// The `check` function is called with the following rules:
///  - Once for a single event, if present in the envelope.
///  - Once for all comprised attachments, unless the event was rate limited.
///  - Once for all comprised sessions.
///
/// Items violating the rate limit are removed from the envelope. This follows a set of rules:
///  - If the event is removed, all items depending on the event are removed (e.g. attachments).
///  - Attachments are not removed if they create events (e.g. minidumps).
///  - Sessions are handled separately from all of the above.
pub struct EnvelopeLimiter<F, E, R> {
    /// The check function together with the selection of limits to check.
    check: Check<F, E, R>,
    /// An event category to assume regardless of the envelope contents, if any.
    event_category: Option<DataCategory>,
}
629
630impl<'a, F, E, R> EnvelopeLimiter<F, E, R>
631where
632    F: FnMut(ItemScoping, usize) -> R,
633    R: Future<Output = Result<RateLimits, E>>,
634{
635    /// Create a new `EnvelopeLimiter` with the given `check` function.
636    pub fn new(limits: CheckLimits, check: F) -> Self {
637        Self {
638            check: Check {
639                check,
640                limits,
641                _1: PhantomData,
642                _2: PhantomData,
643            },
644            event_category: None,
645        }
646    }
647
    /// Assume an event with the given category, even if no item is present in the envelope.
    ///
    /// This ensures that rate limits for the given data category are checked even if there is no
    /// matching item in the envelope. Other items are handled according to the rules as if the
    /// event item were present.
    ///
    /// The assumed category takes precedence over a category inferred from the envelope.
    pub fn assume_event(&mut self, category: DataCategory) {
        self.event_category = Some(category);
    }
656
657    /// Process rate limits for the envelope, returning applied limits.
658    ///
659    /// Returns a tuple of `Enforcement` and `RateLimits`:
660    ///
661    /// - Enforcements declare the quantities of categories that have been rate limited with the
662    ///   individual reason codes that caused rate limiting. If multiple rate limits applied to a
663    ///   category, then the longest limit is reported.
664    /// - Rate limits declare all active rate limits, regardless of whether they have been applied
665    ///   to items in the envelope. This excludes rate limits applied to required attachments, since
666    ///   clients are allowed to continue sending them.
667    pub async fn compute(
668        mut self,
669        envelope: &mut Envelope,
670        scoping: &'a Scoping,
671    ) -> Result<(Enforcement, RateLimits), E> {
672        let mut summary = EnvelopeSummary::compute(envelope);
673        summary.event_category = self.event_category.or(summary.event_category);
674
675        let (enforcement, rate_limits) = self.execute(&summary, scoping).await?;
676        Ok((enforcement, rate_limits))
677    }
678
679    async fn execute(
680        &mut self,
681        summary: &EnvelopeSummary,
682        scoping: &'a Scoping,
683    ) -> Result<(Enforcement, RateLimits), E> {
684        let mut rate_limits = RateLimits::new();
685        let mut enforcement = Enforcement::default();
686
687        // Handle event.
688        if let Some(category) = summary.event_category {
689            // Check the broad category for limits.
690            let mut event_limits = self.check.apply(scoping.item(category), 1).await?;
691            enforcement.event = CategoryLimit::new(category, 1, event_limits.longest());
692
693            if let Some(index_category) = category.index_category() {
694                // Check the specific/indexed category for limits only if the specific one has not already
695                // an enforced limit.
696                if event_limits.is_empty() {
697                    event_limits.merge(self.check.apply(scoping.item(index_category), 1).await?);
698                }
699
700                enforcement.event_indexed =
701                    CategoryLimit::new(index_category, 1, event_limits.longest());
702            };
703
704            rate_limits.merge(event_limits);
705        }
706
707        // Handle attachments.
708        if let Some(limit) = enforcement.active_event() {
709            let limit1 = limit.clone_for(DataCategory::Attachment, summary.attachment_quantity);
710            let limit2 = limit.clone_for(
711                DataCategory::AttachmentItem,
712                summary.attachment_item_quantity,
713            );
714            enforcement.attachments = limit1;
715            enforcement.attachment_items = limit2;
716        } else {
717            let mut attachment_limits = RateLimits::new();
718            if summary.attachment_quantity > 0 {
719                let item_scoping = scoping.item(DataCategory::Attachment);
720
721                let attachment_byte_limits = self
722                    .check
723                    .apply(item_scoping, summary.attachment_quantity)
724                    .await?;
725
726                enforcement.attachments = CategoryLimit::new(
727                    DataCategory::Attachment,
728                    summary.attachment_quantity,
729                    attachment_byte_limits.longest(),
730                );
731                enforcement.attachment_items = enforcement.attachments.clone_for(
732                    DataCategory::AttachmentItem,
733                    summary.attachment_item_quantity,
734                );
735                attachment_limits.merge(attachment_byte_limits);
736            }
737            if !attachment_limits.is_limited() && summary.attachment_item_quantity > 0 {
738                let item_scoping = scoping.item(DataCategory::AttachmentItem);
739
740                let attachment_item_limits = self
741                    .check
742                    .apply(item_scoping, summary.attachment_item_quantity)
743                    .await?;
744
745                enforcement.attachment_items = CategoryLimit::new(
746                    DataCategory::AttachmentItem,
747                    summary.attachment_item_quantity,
748                    attachment_item_limits.longest(),
749                );
750                enforcement.attachments = enforcement
751                    .attachment_items
752                    .clone_for(DataCategory::Attachment, summary.attachment_quantity);
753                attachment_limits.merge(attachment_item_limits);
754            }
755
756            // Only record rate limits for plain attachments. For all other attachments, it's
757            // perfectly "legal" to send them. They will still be discarded in Sentry, but clients
758            // can continue to send them.
759            if summary.has_plain_attachments {
760                rate_limits.merge(attachment_limits);
761            }
762        }
763
764        // Handle sessions.
765        if summary.session_quantity > 0 {
766            let item_scoping = scoping.item(DataCategory::Session);
767            let session_limits = self
768                .check
769                .apply(item_scoping, summary.session_quantity)
770                .await?;
771            enforcement.sessions = CategoryLimit::new(
772                DataCategory::Session,
773                summary.session_quantity,
774                session_limits.longest(),
775            );
776            rate_limits.merge(session_limits);
777        }
778
779        // Handle trace metrics.
780        if summary.trace_metric_quantity > 0 {
781            let item_scoping = scoping.item(DataCategory::TraceMetric);
782            let trace_metric_limits = self
783                .check
784                .apply(item_scoping, summary.trace_metric_quantity)
785                .await?;
786            enforcement.trace_metrics = CategoryLimit::new(
787                DataCategory::TraceMetric,
788                summary.trace_metric_quantity,
789                trace_metric_limits.longest(),
790            );
791            rate_limits.merge(trace_metric_limits);
792        }
793
794        // Handle logs.
795        if summary.log_item_quantity > 0 {
796            let item_scoping = scoping.item(DataCategory::LogItem);
797            let log_limits = self
798                .check
799                .apply(item_scoping, summary.log_item_quantity)
800                .await?;
801            enforcement.log_items = CategoryLimit::new(
802                DataCategory::LogItem,
803                summary.log_item_quantity,
804                log_limits.longest(),
805            );
806            rate_limits.merge(log_limits);
807        }
808        if summary.log_byte_quantity > 0 {
809            let item_scoping = scoping.item(DataCategory::LogByte);
810            let log_limits = self
811                .check
812                .apply(item_scoping, summary.log_byte_quantity)
813                .await?;
814            enforcement.log_bytes = CategoryLimit::new(
815                DataCategory::LogByte,
816                summary.log_byte_quantity,
817                log_limits.longest(),
818            );
819            rate_limits.merge(log_limits);
820        }
821
822        // Handle profiles.
823        if enforcement.is_event_active() {
824            enforcement.profiles = enforcement
825                .event
826                .clone_for(DataCategory::Profile, summary.profile_quantity);
827
828            enforcement.profiles_indexed = enforcement
829                .event_indexed
830                .clone_for(DataCategory::ProfileIndexed, summary.profile_quantity)
831        } else if summary.profile_quantity > 0 {
832            let mut profile_limits = self
833                .check
834                .apply(
835                    scoping.item(DataCategory::Profile),
836                    summary.profile_quantity,
837                )
838                .await?;
839
840            // Profiles can persist in envelopes without transaction if the transaction item
841            // was dropped by dynamic sampling.
842            if profile_limits.is_empty() && summary.event_category.is_none() {
843                profile_limits = self
844                    .check
845                    .apply(scoping.item(DataCategory::Transaction), 0)
846                    .await?;
847            }
848
849            enforcement.profiles = CategoryLimit::new(
850                DataCategory::Profile,
851                summary.profile_quantity,
852                profile_limits.longest(),
853            );
854
855            if profile_limits.is_empty() {
856                profile_limits.merge(
857                    self.check
858                        .apply(
859                            scoping.item(DataCategory::ProfileIndexed),
860                            summary.profile_quantity,
861                        )
862                        .await?,
863                );
864            }
865
866            enforcement.profiles_indexed = CategoryLimit::new(
867                DataCategory::ProfileIndexed,
868                summary.profile_quantity,
869                profile_limits.longest(),
870            );
871
872            rate_limits.merge(profile_limits);
873        }
874
875        // Handle replays.
876        if summary.replay_quantity > 0 {
877            let item_scoping = scoping.item(DataCategory::Replay);
878            let replay_limits = self
879                .check
880                .apply(item_scoping, summary.replay_quantity)
881                .await?;
882            enforcement.replays = CategoryLimit::new(
883                DataCategory::Replay,
884                summary.replay_quantity,
885                replay_limits.longest(),
886            );
887            rate_limits.merge(replay_limits);
888        }
889
890        // Handle user report v1s, which share limits with v2.
891        if summary.user_report_quantity > 0 {
892            let item_scoping = scoping.item(DataCategory::UserReportV2);
893            let user_report_v2_limits = self
894                .check
895                .apply(item_scoping, summary.user_report_quantity)
896                .await?;
897            enforcement.user_reports = CategoryLimit::new(
898                DataCategory::UserReportV2,
899                summary.user_report_quantity,
900                user_report_v2_limits.longest(),
901            );
902            rate_limits.merge(user_report_v2_limits);
903        }
904
905        // Handle monitor checkins.
906        if summary.monitor_quantity > 0 {
907            let item_scoping = scoping.item(DataCategory::Monitor);
908            let checkin_limits = self
909                .check
910                .apply(item_scoping, summary.monitor_quantity)
911                .await?;
912            enforcement.check_ins = CategoryLimit::new(
913                DataCategory::Monitor,
914                summary.monitor_quantity,
915                checkin_limits.longest(),
916            );
917            rate_limits.merge(checkin_limits);
918        }
919
920        // Handle spans.
921        if enforcement.is_event_active() {
922            enforcement.spans = enforcement
923                .event
924                .clone_for(DataCategory::Span, summary.span_quantity);
925
926            enforcement.spans_indexed = enforcement
927                .event_indexed
928                .clone_for(DataCategory::SpanIndexed, summary.span_quantity);
929        } else if summary.span_quantity > 0 {
930            let mut span_limits = self
931                .check
932                .apply(scoping.item(DataCategory::Span), summary.span_quantity)
933                .await?;
934            enforcement.spans = CategoryLimit::new(
935                DataCategory::Span,
936                summary.span_quantity,
937                span_limits.longest(),
938            );
939
940            if span_limits.is_empty() {
941                span_limits.merge(
942                    self.check
943                        .apply(
944                            scoping.item(DataCategory::SpanIndexed),
945                            summary.span_quantity,
946                        )
947                        .await?,
948                );
949            }
950
951            enforcement.spans_indexed = CategoryLimit::new(
952                DataCategory::SpanIndexed,
953                summary.span_quantity,
954                span_limits.longest(),
955            );
956
957            rate_limits.merge(span_limits);
958        }
959
960        // Handle profile chunks.
961        if summary.profile_chunk_quantity > 0 {
962            let item_scoping = scoping.item(DataCategory::ProfileChunk);
963            let limits = self
964                .check
965                .apply(item_scoping, summary.profile_chunk_quantity)
966                .await?;
967            enforcement.profile_chunks = CategoryLimit::new(
968                DataCategory::ProfileChunk,
969                summary.profile_chunk_quantity,
970                limits.longest(),
971            );
972            rate_limits.merge(limits);
973        }
974
975        if summary.profile_chunk_ui_quantity > 0 {
976            let item_scoping = scoping.item(DataCategory::ProfileChunkUi);
977            let limits = self
978                .check
979                .apply(item_scoping, summary.profile_chunk_ui_quantity)
980                .await?;
981            enforcement.profile_chunks_ui = CategoryLimit::new(
982                DataCategory::ProfileChunkUi,
983                summary.profile_chunk_ui_quantity,
984                limits.longest(),
985            );
986            rate_limits.merge(limits);
987        }
988
989        Ok((enforcement, rate_limits))
990    }
991}
992
993impl<F, E, R> fmt::Debug for EnvelopeLimiter<F, E, R> {
994    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
995        f.debug_struct("EnvelopeLimiter")
996            .field("event_category", &self.event_category)
997            .finish()
998    }
999}
1000
1001#[cfg(test)]
1002mod tests {
1003
1004    use std::collections::{BTreeMap, BTreeSet};
1005    use std::sync::Arc;
1006
1007    use relay_base_schema::organization::OrganizationId;
1008    use relay_base_schema::project::{ProjectId, ProjectKey};
1009    use relay_metrics::MetricNamespace;
1010    use relay_quotas::RetryAfter;
1011    use relay_system::Addr;
1012    use smallvec::smallvec;
1013    use tokio::sync::Mutex;
1014
1015    use super::*;
1016    use crate::{
1017        envelope::{AttachmentType, ContentType, SourceQuantities},
1018        extractors::RequestMeta,
1019    };
1020
1021    #[tokio::test]
1022    async fn test_format_rate_limits() {
1023        let mut rate_limits = RateLimits::new();
1024
1025        // Add a generic rate limit for all categories.
1026        rate_limits.add(RateLimit {
1027            categories: DataCategories::new(),
1028            scope: RateLimitScope::Organization(OrganizationId::new(42)),
1029            reason_code: Some(ReasonCode::new("my_limit")),
1030            retry_after: RetryAfter::from_secs(42),
1031            namespaces: smallvec![],
1032        });
1033
1034        // Add a more specific rate limit for just one category.
1035        rate_limits.add(RateLimit {
1036            categories: smallvec![DataCategory::Transaction, DataCategory::Security],
1037            scope: RateLimitScope::Project(ProjectId::new(21)),
1038            reason_code: None,
1039            retry_after: RetryAfter::from_secs(4711),
1040            namespaces: smallvec![],
1041        });
1042
1043        let formatted = format_rate_limits(&rate_limits);
1044        let expected = "42::organization:my_limit, 4711:transaction;security:project";
1045        assert_eq!(formatted, expected);
1046    }
1047
1048    #[tokio::test]
1049    async fn test_format_rate_limits_namespace() {
1050        let mut rate_limits = RateLimits::new();
1051
1052        // Rate limit with reason code and namespace.
1053        rate_limits.add(RateLimit {
1054            categories: smallvec![DataCategory::MetricBucket],
1055            scope: RateLimitScope::Organization(OrganizationId::new(42)),
1056            reason_code: Some(ReasonCode::new("my_limit")),
1057            retry_after: RetryAfter::from_secs(42),
1058            namespaces: smallvec![MetricNamespace::Custom, MetricNamespace::Spans],
1059        });
1060
1061        // Rate limit without reason code.
1062        rate_limits.add(RateLimit {
1063            categories: smallvec![DataCategory::MetricBucket],
1064            scope: RateLimitScope::Organization(OrganizationId::new(42)),
1065            reason_code: None,
1066            retry_after: RetryAfter::from_secs(42),
1067            namespaces: smallvec![MetricNamespace::Spans],
1068        });
1069
1070        let formatted = format_rate_limits(&rate_limits);
1071        let expected = "42:metric_bucket:organization:my_limit:custom;spans, 42:metric_bucket:organization::spans";
1072        assert_eq!(formatted, expected);
1073    }
1074
1075    #[tokio::test]
1076    async fn test_parse_invalid_rate_limits() {
1077        let scoping = Scoping {
1078            organization_id: OrganizationId::new(42),
1079            project_id: ProjectId::new(21),
1080            project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
1081            key_id: Some(17),
1082        };
1083
1084        assert!(parse_rate_limits(&scoping, "").is_ok());
1085        assert!(parse_rate_limits(&scoping, "invalid").is_ok());
1086        assert!(parse_rate_limits(&scoping, ",,,").is_ok());
1087    }
1088
1089    #[tokio::test]
1090    async fn test_parse_rate_limits() {
1091        let scoping = Scoping {
1092            organization_id: OrganizationId::new(42),
1093            project_id: ProjectId::new(21),
1094            project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
1095            key_id: Some(17),
1096        };
1097
1098        // contains "foobar", an unknown scope that should be mapped to Unknown
1099        let formatted =
1100            "42::organization:my_limit, invalid, 4711:foobar;transaction;security:project";
1101        let rate_limits: Vec<RateLimit> =
1102            parse_rate_limits(&scoping, formatted).into_iter().collect();
1103
1104        assert_eq!(
1105            rate_limits,
1106            vec![
1107                RateLimit {
1108                    categories: DataCategories::new(),
1109                    scope: RateLimitScope::Organization(OrganizationId::new(42)),
1110                    reason_code: Some(ReasonCode::new("my_limit")),
1111                    retry_after: rate_limits[0].retry_after,
1112                    namespaces: smallvec![],
1113                },
1114                RateLimit {
1115                    categories: smallvec![
1116                        DataCategory::Unknown,
1117                        DataCategory::Transaction,
1118                        DataCategory::Security,
1119                    ],
1120                    scope: RateLimitScope::Project(ProjectId::new(21)),
1121                    reason_code: None,
1122                    retry_after: rate_limits[1].retry_after,
1123                    namespaces: smallvec![],
1124                }
1125            ]
1126        );
1127
1128        assert_eq!(42, rate_limits[0].retry_after.remaining_seconds());
1129        assert_eq!(4711, rate_limits[1].retry_after.remaining_seconds());
1130    }
1131
1132    #[tokio::test]
1133    async fn test_parse_rate_limits_namespace() {
1134        let scoping = Scoping {
1135            organization_id: OrganizationId::new(42),
1136            project_id: ProjectId::new(21),
1137            project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
1138            key_id: Some(17),
1139        };
1140
1141        let formatted = "42:metric_bucket:organization::custom;spans";
1142        let rate_limits: Vec<RateLimit> =
1143            parse_rate_limits(&scoping, formatted).into_iter().collect();
1144
1145        assert_eq!(
1146            rate_limits,
1147            vec![RateLimit {
1148                categories: smallvec![DataCategory::MetricBucket],
1149                scope: RateLimitScope::Organization(OrganizationId::new(42)),
1150                reason_code: None,
1151                retry_after: rate_limits[0].retry_after,
1152                namespaces: smallvec![MetricNamespace::Custom, MetricNamespace::Spans],
1153            }]
1154        );
1155    }
1156
1157    #[tokio::test]
1158    async fn test_parse_rate_limits_empty_namespace() {
1159        let scoping = Scoping {
1160            organization_id: OrganizationId::new(42),
1161            project_id: ProjectId::new(21),
1162            project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
1163            key_id: Some(17),
1164        };
1165
1166        // notice the trailing colon
1167        let formatted = "42:metric_bucket:organization:some_reason:";
1168        let rate_limits: Vec<RateLimit> =
1169            parse_rate_limits(&scoping, formatted).into_iter().collect();
1170
1171        assert_eq!(
1172            rate_limits,
1173            vec![RateLimit {
1174                categories: smallvec![DataCategory::MetricBucket],
1175                scope: RateLimitScope::Organization(OrganizationId::new(42)),
1176                reason_code: Some(ReasonCode::new("some_reason")),
1177                retry_after: rate_limits[0].retry_after,
1178                namespaces: smallvec![],
1179            }]
1180        );
1181    }
1182
1183    #[tokio::test]
1184    async fn test_parse_rate_limits_only_unknown() {
1185        let scoping = Scoping {
1186            organization_id: OrganizationId::new(42),
1187            project_id: ProjectId::new(21),
1188            project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
1189            key_id: Some(17),
1190        };
1191
1192        let formatted = "42:foo;bar:organization";
1193        let rate_limits: Vec<RateLimit> =
1194            parse_rate_limits(&scoping, formatted).into_iter().collect();
1195
1196        assert_eq!(
1197            rate_limits,
1198            vec![RateLimit {
1199                categories: smallvec![DataCategory::Unknown, DataCategory::Unknown],
1200                scope: RateLimitScope::Organization(OrganizationId::new(42)),
1201                reason_code: None,
1202                retry_after: rate_limits[0].retry_after,
1203                namespaces: smallvec![],
1204            },]
1205        );
1206    }
1207
1208    macro_rules! envelope {
1209        ($( $item_type:ident $( :: $attachment_type:ident )? ),*) => {{
1210            let bytes = "{\"dsn\":\"https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42\"}";
1211            #[allow(unused_mut)]
1212            let mut envelope = Envelope::parse_bytes(bytes.into()).unwrap();
1213            $(
1214                let mut item = Item::new(ItemType::$item_type);
1215                item.set_payload(ContentType::OctetStream, "0123456789");
1216                $( item.set_attachment_type(AttachmentType::$attachment_type); )?
1217                envelope.add_item(item);
1218            )*
1219
1220            let (outcome_aggregator, _) = Addr::custom();
1221
1222            ManagedEnvelope::new(
1223                envelope,
1224                outcome_aggregator,
1225            )
1226        }}
1227    }
1228
1229    fn set_extracted(envelope: &mut Envelope, ty: ItemType) {
1230        envelope
1231            .get_item_by_mut(|item| *item.ty() == ty)
1232            .unwrap()
1233            .set_metrics_extracted(true);
1234    }
1235
1236    fn rate_limit(category: DataCategory) -> RateLimit {
1237        RateLimit {
1238            categories: vec![category].into(),
1239            scope: RateLimitScope::Organization(OrganizationId::new(42)),
1240            reason_code: None,
1241            retry_after: RetryAfter::from_secs(60),
1242            namespaces: smallvec![],
1243        }
1244    }
1245
1246    #[derive(Debug, Default)]
1247    struct MockLimiter {
1248        denied: Vec<DataCategory>,
1249        called: BTreeMap<DataCategory, usize>,
1250        checked: BTreeSet<DataCategory>,
1251    }
1252
1253    impl MockLimiter {
1254        pub fn deny(mut self, category: DataCategory) -> Self {
1255            self.denied.push(category);
1256            self
1257        }
1258
1259        pub fn check(&mut self, scoping: ItemScoping, quantity: usize) -> Result<RateLimits, ()> {
1260            let cat = scoping.category;
1261            let previous = self.called.insert(cat, quantity);
1262            assert!(previous.is_none(), "rate limiter invoked twice for {cat}");
1263
1264            let mut limits = RateLimits::new();
1265            if self.denied.contains(&cat) {
1266                limits.add(rate_limit(cat));
1267            }
1268            Ok(limits)
1269        }
1270
1271        #[track_caller]
1272        pub fn assert_call(&mut self, category: DataCategory, expected: usize) {
1273            self.checked.insert(category);
1274
1275            let quantity = self.called.get(&category).copied();
1276            assert_eq!(
1277                quantity,
1278                Some(expected),
1279                "Expected quantity `{expected}` for data category `{category}`, got {quantity:?}."
1280            );
1281        }
1282    }
1283
1284    impl Drop for MockLimiter {
1285        fn drop(&mut self) {
1286            if std::thread::panicking() {
1287                return;
1288            }
1289
1290            for checked in &self.checked {
1291                self.called.remove(checked);
1292            }
1293
1294            if self.called.is_empty() {
1295                return;
1296            }
1297
1298            let not_asserted = self
1299                .called
1300                .iter()
1301                .map(|(k, v)| format!("- {k}: {v}"))
1302                .collect::<Vec<_>>()
1303                .join("\n");
1304
1305            panic!("Following calls to the limiter were not asserted:\n{not_asserted}");
1306        }
1307    }
1308
1309    async fn enforce_and_apply(
1310        mock: Arc<Mutex<MockLimiter>>,
1311        envelope: &mut ManagedEnvelope,
1312        #[allow(unused_variables)] assume_event: Option<DataCategory>,
1313    ) -> (Enforcement, RateLimits) {
1314        let scoping = envelope.scoping();
1315
1316        #[allow(unused_mut)]
1317        let mut limiter = EnvelopeLimiter::new(CheckLimits::All, move |s, q| {
1318            let mock = mock.clone();
1319            async move {
1320                let mut mock = mock.lock().await;
1321                mock.check(s, q)
1322            }
1323        });
1324        #[cfg(feature = "processing")]
1325        if let Some(assume_event) = assume_event {
1326            limiter.assume_event(assume_event);
1327        }
1328
1329        let (enforcement, limits) = limiter
1330            .compute(envelope.envelope_mut(), &scoping)
1331            .await
1332            .unwrap();
1333
1334        // We implemented `clone` only for tests because we don't want to make `apply_with_outcomes`
1335        // &self because we want move semantics to prevent double tracking.
1336        enforcement.clone().apply_with_outcomes(envelope);
1337
1338        (enforcement, limits)
1339    }
1340
1341    fn mock_limiter(category: Option<DataCategory>) -> Arc<Mutex<MockLimiter>> {
1342        let mut mock = MockLimiter::default();
1343        if let Some(category) = category {
1344            mock = mock.deny(category);
1345        }
1346
1347        Arc::new(Mutex::new(mock))
1348    }
1349
1350    #[tokio::test]
1351    async fn test_enforce_pass_empty() {
1352        let mut envelope = envelope![];
1353
1354        let mock = mock_limiter(None);
1355        let (_, limits) = enforce_and_apply(mock, &mut envelope, None).await;
1356
1357        assert!(!limits.is_limited());
1358        assert!(envelope.envelope().is_empty());
1359    }
1360
1361    #[tokio::test]
1362    async fn test_enforce_limit_error_event() {
1363        let mut envelope = envelope![Event];
1364
1365        let mock = mock_limiter(Some(DataCategory::Error));
1366        let (_, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1367
1368        assert!(limits.is_limited());
1369        assert!(envelope.envelope().is_empty());
1370        mock.lock().await.assert_call(DataCategory::Error, 1);
1371    }
1372
1373    #[tokio::test]
1374    async fn test_enforce_limit_error_with_attachments() {
1375        let mut envelope = envelope![Event, Attachment];
1376
1377        let mock = mock_limiter(Some(DataCategory::Error));
1378        let (_, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1379
1380        assert!(limits.is_limited());
1381        assert!(envelope.envelope().is_empty());
1382        mock.lock().await.assert_call(DataCategory::Error, 1);
1383    }
1384
1385    #[tokio::test]
1386    async fn test_enforce_limit_minidump() {
1387        let mut envelope = envelope![Attachment::Minidump];
1388
1389        let mock = mock_limiter(Some(DataCategory::Error));
1390        let (_, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1391
1392        assert!(limits.is_limited());
1393        assert!(envelope.envelope().is_empty());
1394        mock.lock().await.assert_call(DataCategory::Error, 1);
1395    }
1396
1397    #[tokio::test]
1398    async fn test_enforce_limit_attachments() {
1399        let mut envelope = envelope![Attachment::Minidump, Attachment];
1400
1401        let mock = mock_limiter(Some(DataCategory::Attachment));
1402        let (_, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1403
1404        // Attachments would be limited, but crash reports create events and are thus allowed.
1405        assert!(limits.is_limited());
1406        assert_eq!(envelope.envelope().len(), 1);
1407        mock.lock().await.assert_call(DataCategory::Error, 1);
1408        mock.lock().await.assert_call(DataCategory::Attachment, 20);
1409    }
1410
1411    /// Limit stand-alone profiles.
1412    #[tokio::test]
1413    async fn test_enforce_limit_profiles() {
1414        let mut envelope = envelope![Profile, Profile];
1415
1416        let mock = mock_limiter(Some(DataCategory::Profile));
1417        let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1418
1419        assert!(limits.is_limited());
1420        assert_eq!(envelope.envelope().len(), 0);
1421        mock.lock().await.assert_call(DataCategory::Profile, 2);
1422
1423        assert_eq!(
1424            get_outcomes(enforcement),
1425            vec![
1426                (DataCategory::Profile, 2),
1427                (DataCategory::ProfileIndexed, 2)
1428            ]
1429        );
1430    }
1431
1432    /// Limit profile chunks.
1433    #[tokio::test]
1434    async fn test_enforce_limit_profile_chunks_no_profile_type() {
1435        // In this test we have profile chunks which have not yet been classified, which means they
1436        // should not be rate limited.
1437        let mut envelope = envelope![ProfileChunk, ProfileChunk];
1438
1439        let mock = mock_limiter(Some(DataCategory::ProfileChunk));
1440        let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1441        assert!(!limits.is_limited());
1442        assert_eq!(get_outcomes(enforcement), vec![]);
1443
1444        let mock = mock_limiter(Some(DataCategory::ProfileChunkUi));
1445        let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1446        assert!(!limits.is_limited());
1447        assert_eq!(get_outcomes(enforcement), vec![]);
1448
1449        assert_eq!(envelope.envelope().len(), 2);
1450    }
1451
1452    #[tokio::test]
1453    async fn test_enforce_limit_profile_chunks_ui() {
1454        let mut envelope = envelope![];
1455
1456        let mut item = Item::new(ItemType::ProfileChunk);
1457        item.set_profile_type(ProfileType::Backend);
1458        envelope.envelope_mut().add_item(item);
1459        let mut item = Item::new(ItemType::ProfileChunk);
1460        item.set_profile_type(ProfileType::Ui);
1461        envelope.envelope_mut().add_item(item);
1462
1463        let mock = mock_limiter(Some(DataCategory::ProfileChunkUi));
1464        let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1465
1466        assert!(limits.is_limited());
1467        assert_eq!(envelope.envelope().len(), 1);
1468        mock.lock()
1469            .await
1470            .assert_call(DataCategory::ProfileChunkUi, 1);
1471        mock.lock().await.assert_call(DataCategory::ProfileChunk, 1);
1472
1473        assert_eq!(
1474            get_outcomes(enforcement),
1475            vec![(DataCategory::ProfileChunkUi, 1)]
1476        );
1477    }
1478
1479    #[tokio::test]
1480    async fn test_enforce_limit_profile_chunks_backend() {
1481        let mut envelope = envelope![];
1482
1483        let mut item = Item::new(ItemType::ProfileChunk);
1484        item.set_profile_type(ProfileType::Backend);
1485        envelope.envelope_mut().add_item(item);
1486        let mut item = Item::new(ItemType::ProfileChunk);
1487        item.set_profile_type(ProfileType::Ui);
1488        envelope.envelope_mut().add_item(item);
1489
1490        let mock = mock_limiter(Some(DataCategory::ProfileChunk));
1491        let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1492
1493        assert!(limits.is_limited());
1494        assert_eq!(envelope.envelope().len(), 1);
1495        mock.lock()
1496            .await
1497            .assert_call(DataCategory::ProfileChunkUi, 1);
1498        mock.lock().await.assert_call(DataCategory::ProfileChunk, 1);
1499
1500        assert_eq!(
1501            get_outcomes(enforcement),
1502            vec![(DataCategory::ProfileChunk, 1)]
1503        );
1504    }
1505
1506    /// Limit replays.
1507    #[tokio::test]
1508    async fn test_enforce_limit_replays() {
1509        let mut envelope = envelope![ReplayEvent, ReplayRecording, ReplayVideo];
1510
1511        let mock = mock_limiter(Some(DataCategory::Replay));
1512        let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1513
1514        assert!(limits.is_limited());
1515        assert_eq!(envelope.envelope().len(), 0);
1516        mock.lock().await.assert_call(DataCategory::Replay, 3);
1517
1518        assert_eq!(get_outcomes(enforcement), vec![(DataCategory::Replay, 3),]);
1519    }
1520
1521    /// Limit monitor checkins.
1522    #[tokio::test]
1523    async fn test_enforce_limit_monitor_checkins() {
1524        let mut envelope = envelope![CheckIn];
1525
1526        let mock = mock_limiter(Some(DataCategory::Monitor));
1527        let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1528
1529        assert!(limits.is_limited());
1530        assert_eq!(envelope.envelope().len(), 0);
1531        mock.lock().await.assert_call(DataCategory::Monitor, 1);
1532
1533        assert_eq!(get_outcomes(enforcement), vec![(DataCategory::Monitor, 1)])
1534    }
1535
1536    #[tokio::test]
1537    async fn test_enforce_pass_minidump() {
1538        let mut envelope = envelope![Attachment::Minidump];
1539
1540        let mock = mock_limiter(Some(DataCategory::Attachment));
1541        let (_, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1542
1543        // If only crash report attachments are present, we don't emit a rate limit.
1544        assert!(!limits.is_limited());
1545        assert_eq!(envelope.envelope().len(), 1);
1546        mock.lock().await.assert_call(DataCategory::Error, 1);
1547        mock.lock().await.assert_call(DataCategory::Attachment, 10);
1548    }
1549
1550    #[tokio::test]
1551    async fn test_enforce_skip_rate_limited() {
1552        let mut envelope = envelope![];
1553
1554        let mut item = Item::new(ItemType::Attachment);
1555        item.set_payload(ContentType::OctetStream, "0123456789");
1556        item.set_rate_limited(true);
1557        envelope.envelope_mut().add_item(item);
1558
1559        let mock = mock_limiter(Some(DataCategory::Error));
1560        let (_, limits) = enforce_and_apply(mock, &mut envelope, None).await;
1561
1562        assert!(!limits.is_limited()); // No new rate limits applied.
1563        assert_eq!(envelope.envelope().len(), 1); // The item was retained
1564    }
1565
1566    #[tokio::test]
1567    async fn test_enforce_pass_sessions() {
1568        let mut envelope = envelope![Session, Session, Session];
1569
1570        let mock = mock_limiter(Some(DataCategory::Error));
1571        let (_, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1572
1573        // If only crash report attachments are present, we don't emit a rate limit.
1574        assert!(!limits.is_limited());
1575        assert_eq!(envelope.envelope().len(), 3);
1576        mock.lock().await.assert_call(DataCategory::Session, 3);
1577    }
1578
1579    #[tokio::test]
1580    async fn test_enforce_limit_sessions() {
1581        let mut envelope = envelope![Session, Session, Event];
1582
1583        let mock = mock_limiter(Some(DataCategory::Session));
1584        let (_, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1585
1586        // If only crash report attachments are present, we don't emit a rate limit.
1587        assert!(limits.is_limited());
1588        assert_eq!(envelope.envelope().len(), 1);
1589        mock.lock().await.assert_call(DataCategory::Error, 1);
1590        mock.lock().await.assert_call(DataCategory::Session, 2);
1591    }
1592
1593    #[tokio::test]
1594    #[cfg(feature = "processing")]
1595    async fn test_enforce_limit_assumed_event() {
1596        let mut envelope = envelope![];
1597
1598        let mock = mock_limiter(Some(DataCategory::Transaction));
1599        let (_, limits) =
1600            enforce_and_apply(mock.clone(), &mut envelope, Some(DataCategory::Transaction)).await;
1601
1602        assert!(limits.is_limited());
1603        assert!(envelope.envelope().is_empty()); // obviously
1604        mock.lock().await.assert_call(DataCategory::Transaction, 1);
1605    }
1606
1607    #[tokio::test]
1608    #[cfg(feature = "processing")]
1609    async fn test_enforce_limit_assumed_attachments() {
1610        let mut envelope = envelope![Attachment, Attachment];
1611
1612        let mock = mock_limiter(Some(DataCategory::Error));
1613        let (_, limits) =
1614            enforce_and_apply(mock.clone(), &mut envelope, Some(DataCategory::Error)).await;
1615
1616        assert!(limits.is_limited());
1617        assert!(envelope.envelope().is_empty());
1618        mock.lock().await.assert_call(DataCategory::Error, 1);
1619    }
1620
1621    #[tokio::test]
1622    async fn test_enforce_transaction() {
1623        let mut envelope = envelope![Transaction];
1624
1625        let mock = mock_limiter(Some(DataCategory::Transaction));
1626        let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1627
1628        assert!(limits.is_limited());
1629        assert!(enforcement.event_indexed.is_active());
1630        assert!(enforcement.event.is_active());
1631        mock.lock().await.assert_call(DataCategory::Transaction, 1);
1632
1633        assert_eq!(
1634            get_outcomes(enforcement),
1635            vec![
1636                (DataCategory::Transaction, 1),
1637                (DataCategory::TransactionIndexed, 1),
1638            ]
1639        );
1640    }
1641
1642    #[tokio::test]
1643    async fn test_enforce_transaction_non_indexed() {
1644        let mut envelope = envelope![Transaction, Profile];
1645        let scoping = envelope.scoping();
1646
1647        let mock = mock_limiter(Some(DataCategory::TransactionIndexed));
1648
1649        let mock_clone = mock.clone();
1650        let limiter = EnvelopeLimiter::new(CheckLimits::NonIndexed, move |s, q| {
1651            let mock_clone = mock_clone.clone();
1652            async move {
1653                let mut mock = mock_clone.lock().await;
1654                mock.check(s, q)
1655            }
1656        });
1657        let (enforcement, limits) = limiter
1658            .compute(envelope.envelope_mut(), &scoping)
1659            .await
1660            .unwrap();
1661        enforcement.clone().apply_with_outcomes(&mut envelope);
1662
1663        assert!(!limits.is_limited());
1664        assert!(!enforcement.event_indexed.is_active());
1665        assert!(!enforcement.event.is_active());
1666        assert!(!enforcement.profiles_indexed.is_active());
1667        assert!(!enforcement.profiles.is_active());
1668        mock.lock().await.assert_call(DataCategory::Transaction, 1);
1669        mock.lock().await.assert_call(DataCategory::Profile, 1);
1670    }
1671
1672    #[tokio::test]
1673    async fn test_enforce_transaction_no_indexing_quota() {
1674        let mut envelope = envelope![Transaction];
1675
1676        let mock = mock_limiter(Some(DataCategory::TransactionIndexed));
1677        let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1678
1679        assert!(limits.is_limited());
1680        assert!(enforcement.event_indexed.is_active());
1681        assert!(!enforcement.event.is_active());
1682        mock.lock().await.assert_call(DataCategory::Transaction, 1);
1683        mock.lock()
1684            .await
1685            .assert_call(DataCategory::TransactionIndexed, 1);
1686    }
1687
1688    #[tokio::test]
1689    async fn test_enforce_transaction_attachment_enforced() {
1690        let mut envelope = envelope![Transaction, Attachment];
1691
1692        let mock = mock_limiter(Some(DataCategory::Transaction));
1693        let (enforcement, _) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1694
1695        assert!(enforcement.event.is_active());
1696        assert!(enforcement.attachments.is_active());
1697        mock.lock().await.assert_call(DataCategory::Transaction, 1);
1698    }
1699
1700    fn get_outcomes(enforcement: Enforcement) -> Vec<(DataCategory, usize)> {
1701        enforcement
1702            .get_outcomes()
1703            .map(|(_, data_category, quantity)| (data_category, quantity))
1704            .collect::<Vec<_>>()
1705    }
1706
1707    #[tokio::test]
1708    async fn test_enforce_transaction_profile_enforced() {
1709        let mut envelope = envelope![Transaction, Profile];
1710
1711        let mock = mock_limiter(Some(DataCategory::Transaction));
1712        let (enforcement, _) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1713
1714        assert!(enforcement.event.is_active());
1715        assert!(enforcement.profiles.is_active());
1716        mock.lock().await.assert_call(DataCategory::Transaction, 1);
1717
1718        assert_eq!(
1719            get_outcomes(enforcement),
1720            vec![
1721                (DataCategory::Transaction, 1),
1722                (DataCategory::TransactionIndexed, 1),
1723                (DataCategory::Profile, 1),
1724                (DataCategory::ProfileIndexed, 1),
1725            ]
1726        );
1727    }
1728
1729    #[tokio::test]
1730    async fn test_enforce_transaction_standalone_profile_enforced() {
1731        // When the transaction is sampled, the profile survives as standalone.
1732        let mut envelope = envelope![Profile];
1733
1734        let mock = mock_limiter(Some(DataCategory::Transaction));
1735        let (enforcement, _) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1736
1737        assert!(enforcement.profiles.is_active());
1738        mock.lock().await.assert_call(DataCategory::Profile, 1);
1739        mock.lock().await.assert_call(DataCategory::Transaction, 0);
1740
1741        assert_eq!(
1742            get_outcomes(enforcement),
1743            vec![
1744                (DataCategory::Profile, 1),
1745                (DataCategory::ProfileIndexed, 1),
1746            ]
1747        );
1748    }
1749
1750    #[tokio::test]
1751    async fn test_enforce_transaction_attachment_enforced_indexing_quota() {
1752        let mut envelope = envelope![Transaction, Attachment];
1753        set_extracted(envelope.envelope_mut(), ItemType::Transaction);
1754
1755        let mock = mock_limiter(Some(DataCategory::TransactionIndexed));
1756        let (enforcement, _) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1757
1758        assert!(!enforcement.event.is_active());
1759        assert!(enforcement.event_indexed.is_active());
1760        assert!(enforcement.attachments.is_active());
1761        assert!(enforcement.attachment_items.is_active());
1762        mock.lock().await.assert_call(DataCategory::Transaction, 1);
1763        mock.lock()
1764            .await
1765            .assert_call(DataCategory::TransactionIndexed, 1);
1766
1767        assert_eq!(
1768            get_outcomes(enforcement),
1769            vec![
1770                (DataCategory::TransactionIndexed, 1),
1771                (DataCategory::Attachment, 10),
1772                (DataCategory::AttachmentItem, 1)
1773            ]
1774        );
1775    }
1776
1777    #[tokio::test]
1778    async fn test_enforce_span() {
1779        let mut envelope = envelope![Span, Span];
1780
1781        let mock = mock_limiter(Some(DataCategory::Span));
1782        let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1783
1784        assert!(limits.is_limited());
1785        assert!(enforcement.spans_indexed.is_active());
1786        assert!(enforcement.spans.is_active());
1787        mock.lock().await.assert_call(DataCategory::Span, 2);
1788
1789        assert_eq!(
1790            get_outcomes(enforcement),
1791            vec![(DataCategory::Span, 2), (DataCategory::SpanIndexed, 2)]
1792        );
1793    }
1794
1795    #[tokio::test]
1796    async fn test_enforce_span_no_indexing_quota() {
1797        let mut envelope = envelope![Span, Span];
1798
1799        let mock = mock_limiter(Some(DataCategory::SpanIndexed));
1800        let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1801
1802        assert!(limits.is_limited());
1803        assert!(enforcement.spans_indexed.is_active());
1804        assert!(!enforcement.spans.is_active());
1805        mock.lock().await.assert_call(DataCategory::Span, 2);
1806        mock.lock().await.assert_call(DataCategory::SpanIndexed, 2);
1807
1808        assert_eq!(
1809            get_outcomes(enforcement),
1810            vec![(DataCategory::SpanIndexed, 2)]
1811        );
1812    }
1813
1814    #[tokio::test]
1815    async fn test_enforce_span_metrics_extracted_no_indexing_quota() {
1816        let mut envelope = envelope![Span, Span];
1817        set_extracted(envelope.envelope_mut(), ItemType::Span);
1818
1819        let mock = mock_limiter(Some(DataCategory::SpanIndexed));
1820        let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1821
1822        assert!(limits.is_limited());
1823        assert!(enforcement.spans_indexed.is_active());
1824        assert!(!enforcement.spans.is_active());
1825        mock.lock().await.assert_call(DataCategory::Span, 2);
1826        mock.lock().await.assert_call(DataCategory::SpanIndexed, 2);
1827
1828        assert_eq!(
1829            get_outcomes(enforcement),
1830            vec![(DataCategory::SpanIndexed, 2)]
1831        );
1832    }
1833
1834    #[test]
1835    fn test_source_quantity_for_total_quantity() {
1836        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
1837            .parse()
1838            .unwrap();
1839        let request_meta = RequestMeta::new(dsn);
1840
1841        let mut envelope = Envelope::from_request(None, request_meta);
1842
1843        let mut item = Item::new(ItemType::MetricBuckets);
1844        item.set_source_quantities(SourceQuantities {
1845            transactions: 5,
1846            spans: 0,
1847            profiles: 2,
1848            buckets: 5,
1849        });
1850        envelope.add_item(item);
1851
1852        let mut item = Item::new(ItemType::MetricBuckets);
1853        item.set_source_quantities(SourceQuantities {
1854            transactions: 2,
1855            spans: 0,
1856            profiles: 0,
1857            buckets: 3,
1858        });
1859        envelope.add_item(item);
1860
1861        let summary = EnvelopeSummary::compute(&envelope);
1862
1863        assert_eq!(summary.profile_quantity, 2);
1864        assert_eq!(summary.secondary_transaction_quantity, 7);
1865    }
1866
1867    #[tokio::test]
1868    async fn test_enforce_limit_logs_count() {
1869        let mut envelope = envelope![Log, Log];
1870
1871        let mock = mock_limiter(Some(DataCategory::LogItem));
1872        let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1873
1874        assert!(limits.is_limited());
1875        assert_eq!(envelope.envelope().len(), 0);
1876        mock.lock().await.assert_call(DataCategory::LogItem, 2);
1877        mock.lock().await.assert_call(DataCategory::LogByte, 20);
1878
1879        assert_eq!(get_outcomes(enforcement), vec![(DataCategory::LogItem, 2)]);
1880    }
1881
1882    #[tokio::test]
1883    async fn test_enforce_limit_logs_bytes() {
1884        let mut envelope = envelope![Log, Log];
1885
1886        let mock = mock_limiter(Some(DataCategory::LogByte));
1887        let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1888
1889        assert!(limits.is_limited());
1890        assert_eq!(envelope.envelope().len(), 0);
1891        mock.lock().await.assert_call(DataCategory::LogItem, 2);
1892        mock.lock().await.assert_call(DataCategory::LogByte, 20);
1893
1894        assert_eq!(get_outcomes(enforcement), vec![(DataCategory::LogByte, 20)]);
1895    }
1896}