1use std::fmt::{self, Write};
2use std::future::Future;
3use std::marker::PhantomData;
4
5use relay_profiling::ProfileType;
6use relay_quotas::{
7 DataCategory, ItemScoping, QuotaScope, RateLimit, RateLimitScope, RateLimits, ReasonCode,
8 Scoping,
9};
10
11use crate::envelope::{Envelope, Item, ItemType, ParentId};
12use crate::integrations::Integration;
13use crate::managed::ManagedEnvelope;
14use crate::services::outcome::Outcome;
15
16pub const RATE_LIMITS_HEADER: &str = "X-Sentry-Rate-Limits";
18
19pub fn format_rate_limits(rate_limits: &RateLimits) -> String {
21 let mut header = String::new();
22
23 for rate_limit in rate_limits {
24 if !header.is_empty() {
25 header.push_str(", ");
26 }
27
28 write!(header, "{}:", rate_limit.retry_after.remaining_seconds()).ok();
29
30 for (index, category) in rate_limit.categories.iter().enumerate() {
31 if index > 0 {
32 header.push(';');
33 }
34 write!(header, "{category}").ok();
35 }
36
37 write!(header, ":{}", rate_limit.scope.name()).ok();
38
39 if let Some(ref reason_code) = rate_limit.reason_code {
40 write!(header, ":{reason_code}").ok();
41 } else if !rate_limit.namespaces.is_empty() {
42 write!(header, ":").ok(); }
44
45 for (index, namespace) in rate_limit.namespaces.iter().enumerate() {
46 header.push(if index == 0 { ':' } else { ';' });
47 write!(header, "{namespace}").ok();
48 }
49 }
50
51 header
52}
53
54pub fn parse_rate_limits(scoping: &Scoping, string: &str) -> RateLimits {
56 let mut rate_limits = RateLimits::new();
57
58 for limit in string.split(',') {
59 let limit = limit.trim();
60 if limit.is_empty() {
61 continue;
62 }
63
64 let mut components = limit.split(':');
65
66 let retry_after = match components.next().and_then(|s| s.parse().ok()) {
67 Some(retry_after) => retry_after,
68 None => continue,
69 };
70
71 let categories = components
72 .next()
73 .unwrap_or("")
74 .split(';')
75 .filter(|category| !category.is_empty())
76 .map(DataCategory::from_name)
77 .collect();
78
79 let quota_scope = QuotaScope::from_name(components.next().unwrap_or(""));
80 let scope = RateLimitScope::for_quota(*scoping, quota_scope);
81
82 let reason_code = components
83 .next()
84 .filter(|s| !s.is_empty())
85 .map(ReasonCode::new);
86
87 let namespace = components
88 .next()
89 .unwrap_or("")
90 .split(';')
91 .filter(|s| !s.is_empty())
92 .filter_map(|s| s.parse().ok())
93 .collect();
94
95 rate_limits.add(RateLimit {
96 categories,
97 scope,
98 reason_code,
99 retry_after,
100 namespaces: namespace,
101 });
102 }
103
104 rate_limits
105}
106
107fn infer_event_category(item: &Item) -> Option<DataCategory> {
115 match item.ty() {
116 ItemType::Event => Some(DataCategory::Error),
117 ItemType::Transaction => Some(DataCategory::Transaction),
118 ItemType::Security | ItemType::RawSecurity => Some(DataCategory::Security),
119 ItemType::UnrealReport => Some(DataCategory::Error),
120 ItemType::UserReportV2 => Some(DataCategory::UserReportV2),
121 ItemType::Attachment if item.creates_event() => Some(DataCategory::Error),
122 ItemType::Attachment => None,
123 ItemType::Session => None,
124 ItemType::Sessions => None,
125 ItemType::Statsd => None,
126 ItemType::MetricBuckets => None,
127 ItemType::FormData => None,
128 ItemType::UserReport => None,
129 ItemType::Profile => None,
130 ItemType::ReplayEvent => None,
131 ItemType::ReplayRecording => None,
132 ItemType::ReplayVideo => None,
133 ItemType::ClientReport => None,
134 ItemType::CheckIn => None,
135 ItemType::Nel => None,
136 ItemType::Log => None,
137 ItemType::TraceMetric => None,
138 ItemType::Span => None,
139 ItemType::ProfileChunk => None,
140 ItemType::Integration => None,
141 ItemType::Unknown(_) => None,
142 }
143}
144
145#[non_exhaustive]
150#[derive(Clone, Copy, Debug, Default)]
151pub struct EnvelopeSummary {
152 pub event_category: Option<DataCategory>,
154
155 pub attachment_quantity: usize,
157
158 pub attachment_item_quantity: usize,
160
161 pub session_quantity: usize,
163
164 pub profile_quantity: usize,
166
167 pub replay_quantity: usize,
169
170 pub user_report_quantity: usize,
172
173 pub monitor_quantity: usize,
175
176 pub log_item_quantity: usize,
178
179 pub log_byte_quantity: usize,
181
182 pub secondary_transaction_quantity: usize,
191
192 pub secondary_span_quantity: usize,
194
195 pub span_quantity: usize,
197
198 pub has_plain_attachments: bool,
200
201 pub payload_size: usize,
203
204 pub profile_chunk_quantity: usize,
206 pub profile_chunk_ui_quantity: usize,
208
209 pub trace_metric_quantity: usize,
211}
212
213impl EnvelopeSummary {
214 pub fn empty() -> Self {
216 Self::default()
217 }
218
219 pub fn compute(envelope: &Envelope) -> Self {
221 let mut summary = Self::empty();
222
223 for item in envelope.items() {
224 if item.creates_event() {
225 summary.infer_category(item);
226 } else if item.ty() == &ItemType::Attachment {
227 summary.has_plain_attachments = true;
229 }
230
231 if item.rate_limited() {
234 continue;
235 }
236
237 if let Some(source_quantities) = item.source_quantities() {
238 summary.secondary_transaction_quantity += source_quantities.transactions;
239 summary.secondary_span_quantity += source_quantities.spans;
240 summary.profile_quantity += source_quantities.profiles;
241 }
242
243 summary.payload_size += item.len();
244
245 for (category, quantity) in item.quantities() {
246 summary.add_quantity(category, quantity);
247 }
248
249 if item.ty() == &ItemType::UserReport {
252 summary.user_report_quantity += 1;
253 }
254 }
255
256 summary
257 }
258
259 fn add_quantity(&mut self, category: DataCategory, quantity: usize) {
260 let target_quantity = match category {
261 DataCategory::Attachment => &mut self.attachment_quantity,
262 DataCategory::AttachmentItem => &mut self.attachment_item_quantity,
263 DataCategory::Session => &mut self.session_quantity,
264 DataCategory::Profile => &mut self.profile_quantity,
265 DataCategory::Replay => &mut self.replay_quantity,
266 DataCategory::DoNotUseReplayVideo => &mut self.replay_quantity,
267 DataCategory::Monitor => &mut self.monitor_quantity,
268 DataCategory::Span => &mut self.span_quantity,
269 DataCategory::TraceMetric => &mut self.trace_metric_quantity,
270 DataCategory::LogItem => &mut self.log_item_quantity,
271 DataCategory::LogByte => &mut self.log_byte_quantity,
272 DataCategory::ProfileChunk => &mut self.profile_chunk_quantity,
273 DataCategory::ProfileChunkUi => &mut self.profile_chunk_ui_quantity,
274 _ => return,
276 };
277 *target_quantity += quantity;
278 }
279
280 fn infer_category(&mut self, item: &Item) {
285 if matches!(self.event_category, None | Some(DataCategory::Default))
286 && let Some(category) = infer_event_category(item)
287 {
288 self.event_category = Some(category);
289 }
290 }
291}
292
293#[derive(Debug)]
295#[cfg_attr(test, derive(Clone))]
296pub struct CategoryLimit {
297 category: DataCategory,
299 quantity: usize,
303 reason_code: Option<ReasonCode>,
307}
308
309impl CategoryLimit {
310 fn new(category: DataCategory, quantity: usize, rate_limit: Option<&RateLimit>) -> Self {
314 match rate_limit {
315 Some(limit) => Self {
316 category,
317 quantity,
318 reason_code: limit.reason_code.clone(),
319 },
320 None => Self::default(),
321 }
322 }
323
324 pub fn clone_for(&self, category: DataCategory, quantity: usize) -> CategoryLimit {
326 if !self.is_active() {
327 return Self::default();
328 }
329
330 Self {
331 category,
332 quantity,
333 reason_code: self.reason_code.clone(),
334 }
335 }
336
337 pub fn is_active(&self) -> bool {
342 self.quantity > 0
343 }
344}
345
346impl Default for CategoryLimit {
347 fn default() -> Self {
348 Self {
349 category: DataCategory::Default,
350 quantity: 0,
351 reason_code: None,
352 }
353 }
354}
355
356#[derive(Default, Debug)]
358#[cfg_attr(test, derive(Clone))]
359pub struct Enforcement {
360 pub event: CategoryLimit,
362 pub event_indexed: CategoryLimit,
364 pub attachments: CategoryLimit,
366 pub attachment_items: CategoryLimit,
368 pub sessions: CategoryLimit,
370 pub profiles: CategoryLimit,
372 pub profiles_indexed: CategoryLimit,
374 pub replays: CategoryLimit,
376 pub check_ins: CategoryLimit,
378 pub log_items: CategoryLimit,
380 pub log_bytes: CategoryLimit,
382 pub spans: CategoryLimit,
384 pub spans_indexed: CategoryLimit,
386 pub user_reports: CategoryLimit,
388 pub profile_chunks: CategoryLimit,
390 pub profile_chunks_ui: CategoryLimit,
392 pub trace_metrics: CategoryLimit,
394}
395
396impl Enforcement {
397 pub fn active_event(&self) -> Option<&CategoryLimit> {
401 if self.event.is_active() {
402 Some(&self.event)
403 } else if self.event_indexed.is_active() {
404 Some(&self.event_indexed)
405 } else {
406 None
407 }
408 }
409
410 pub fn is_event_active(&self) -> bool {
412 self.active_event().is_some()
413 }
414
415 fn get_outcomes(self) -> impl Iterator<Item = (Outcome, DataCategory, usize)> {
417 let Self {
418 event,
419 event_indexed,
420 attachments,
421 attachment_items,
422 sessions: _, profiles,
424 profiles_indexed,
425 replays,
426 check_ins,
427 log_items,
428 log_bytes,
429 spans,
430 spans_indexed,
431 user_reports,
432 profile_chunks,
433 profile_chunks_ui,
434 trace_metrics,
435 } = self;
436
437 let limits = [
438 event,
439 event_indexed,
440 attachments,
441 attachment_items,
442 profiles,
443 profiles_indexed,
444 replays,
445 check_ins,
446 log_items,
447 log_bytes,
448 spans,
449 spans_indexed,
450 user_reports,
451 profile_chunks,
452 profile_chunks_ui,
453 trace_metrics,
454 ];
455
456 limits
457 .into_iter()
458 .filter(move |limit| limit.is_active())
459 .map(move |limit| {
460 (
461 Outcome::RateLimited(limit.reason_code),
462 limit.category,
463 limit.quantity,
464 )
465 })
466 }
467
468 pub fn apply_with_outcomes(self, envelope: &mut ManagedEnvelope) {
504 envelope
505 .envelope_mut()
506 .retain_items(|item| self.retain_item(item));
507 self.track_outcomes(envelope);
508 }
509
510 fn retain_item(&self, item: &mut Item) -> bool {
512 if self.event.is_active() && item.requires_event() {
514 return false;
515 }
516
517 match item.ty() {
521 ItemType::Attachment => {
522 if item.is_attachment_v2() && matches!(item.parent_id(), Some(ParentId::SpanId(_))) && (self.spans_indexed.is_active() || self.spans.is_active()) {
524 return false;
525 }
526 if !(self.attachments.is_active() || self.attachment_items.is_active()) {
527 return true;
528 }
529 if item.creates_event() {
530 item.set_rate_limited(true);
531 true
532 } else {
533 false
534 }
535 }
536 ItemType::Session => !self.sessions.is_active(),
537 ItemType::Profile => !self.profiles_indexed.is_active(),
538 ItemType::ReplayEvent => !self.replays.is_active(),
539 ItemType::ReplayVideo => !self.replays.is_active(),
540 ItemType::ReplayRecording => !self.replays.is_active(),
541 ItemType::UserReport => !self.user_reports.is_active(),
542 ItemType::CheckIn => !self.check_ins.is_active(),
543 ItemType::Log => {
544 !(self.log_items.is_active() || self.log_bytes.is_active())
545 }
546 ItemType::Span => !self.spans_indexed.is_active(),
547 ItemType::ProfileChunk => match item.profile_type() {
548 Some(ProfileType::Backend) => !self.profile_chunks.is_active(),
549 Some(ProfileType::Ui) => !self.profile_chunks_ui.is_active(),
550 None => true,
551 },
552 ItemType::TraceMetric => !self.trace_metrics.is_active(),
553 ItemType::Integration => match item.integration() {
554 Some(Integration::Logs(_)) => !(self.log_items.is_active() || self.log_bytes.is_active()),
555 Some(Integration::Spans(_)) => !self.spans_indexed.is_active(),
556 None => true,
557 },
558 ItemType::Event
559 | ItemType::Transaction
560 | ItemType::Security
561 | ItemType::FormData
562 | ItemType::RawSecurity
563 | ItemType::Nel
564 | ItemType::UnrealReport
565 | ItemType::Sessions
566 | ItemType::Statsd
567 | ItemType::MetricBuckets
568 | ItemType::ClientReport
569 | ItemType::UserReportV2 | ItemType::Unknown(_) => true,
571 }
572 }
573
574 fn track_outcomes(self, envelope: &mut ManagedEnvelope) {
578 for (outcome, category, quantity) in self.get_outcomes() {
579 envelope.track_outcome(outcome, category, quantity)
580 }
581 }
582}
583
584#[derive(Debug, Copy, Clone)]
586pub enum CheckLimits {
587 NonIndexed,
594 All,
596}
597
598struct Check<F, E, R> {
599 limits: CheckLimits,
600 check: F,
601 _1: PhantomData<E>,
602 _2: PhantomData<R>,
603}
604
605impl<F, E, R> Check<F, E, R>
606where
607 F: FnMut(ItemScoping, usize) -> R,
608 R: Future<Output = Result<RateLimits, E>>,
609{
610 async fn apply(&mut self, scoping: ItemScoping, quantity: usize) -> Result<RateLimits, E> {
611 if matches!(self.limits, CheckLimits::NonIndexed) && scoping.category.is_indexed() {
612 return Ok(RateLimits::default());
613 }
614
615 (self.check)(scoping, quantity).await
616 }
617}
618
619pub struct EnvelopeLimiter<F, E, R> {
631 check: Check<F, E, R>,
632 event_category: Option<DataCategory>,
633}
634
635impl<'a, F, E, R> EnvelopeLimiter<F, E, R>
636where
637 F: FnMut(ItemScoping, usize) -> R,
638 R: Future<Output = Result<RateLimits, E>>,
639{
640 pub fn new(limits: CheckLimits, check: F) -> Self {
642 Self {
643 check: Check {
644 check,
645 limits,
646 _1: PhantomData,
647 _2: PhantomData,
648 },
649 event_category: None,
650 }
651 }
652
653 pub fn assume_event(&mut self, category: DataCategory) {
659 self.event_category = Some(category);
660 }
661
662 pub async fn compute(
673 mut self,
674 envelope: &mut Envelope,
675 scoping: &'a Scoping,
676 ) -> Result<(Enforcement, RateLimits), E> {
677 let mut summary = EnvelopeSummary::compute(envelope);
678 summary.event_category = self.event_category.or(summary.event_category);
679
680 let (enforcement, rate_limits) = self.execute(&summary, scoping).await?;
681 Ok((enforcement, rate_limits))
682 }
683
684 async fn execute(
685 &mut self,
686 summary: &EnvelopeSummary,
687 scoping: &'a Scoping,
688 ) -> Result<(Enforcement, RateLimits), E> {
689 let mut rate_limits = RateLimits::new();
690 let mut enforcement = Enforcement::default();
691
692 if let Some(category) = summary.event_category {
694 let mut event_limits = self.check.apply(scoping.item(category), 1).await?;
696 enforcement.event = CategoryLimit::new(category, 1, event_limits.longest());
697
698 if let Some(index_category) = category.index_category() {
699 if event_limits.is_empty() {
702 event_limits.merge(self.check.apply(scoping.item(index_category), 1).await?);
703 }
704
705 enforcement.event_indexed =
706 CategoryLimit::new(index_category, 1, event_limits.longest());
707 };
708
709 rate_limits.merge(event_limits);
710 }
711
712 if let Some(limit) = enforcement.active_event() {
714 let limit1 = limit.clone_for(DataCategory::Attachment, summary.attachment_quantity);
715 let limit2 = limit.clone_for(
716 DataCategory::AttachmentItem,
717 summary.attachment_item_quantity,
718 );
719 enforcement.attachments = limit1;
720 enforcement.attachment_items = limit2;
721 } else {
722 let mut attachment_limits = RateLimits::new();
723 if summary.attachment_quantity > 0 {
724 let item_scoping = scoping.item(DataCategory::Attachment);
725
726 let attachment_byte_limits = self
727 .check
728 .apply(item_scoping, summary.attachment_quantity)
729 .await?;
730
731 enforcement.attachments = CategoryLimit::new(
732 DataCategory::Attachment,
733 summary.attachment_quantity,
734 attachment_byte_limits.longest(),
735 );
736 enforcement.attachment_items = enforcement.attachments.clone_for(
737 DataCategory::AttachmentItem,
738 summary.attachment_item_quantity,
739 );
740 attachment_limits.merge(attachment_byte_limits);
741 }
742 if !attachment_limits.is_limited() && summary.attachment_item_quantity > 0 {
743 let item_scoping = scoping.item(DataCategory::AttachmentItem);
744
745 let attachment_item_limits = self
746 .check
747 .apply(item_scoping, summary.attachment_item_quantity)
748 .await?;
749
750 enforcement.attachment_items = CategoryLimit::new(
751 DataCategory::AttachmentItem,
752 summary.attachment_item_quantity,
753 attachment_item_limits.longest(),
754 );
755 enforcement.attachments = enforcement
756 .attachment_items
757 .clone_for(DataCategory::Attachment, summary.attachment_quantity);
758 attachment_limits.merge(attachment_item_limits);
759 }
760
761 if summary.has_plain_attachments {
765 rate_limits.merge(attachment_limits);
766 }
767 }
768
769 if summary.session_quantity > 0 {
771 let item_scoping = scoping.item(DataCategory::Session);
772 let session_limits = self
773 .check
774 .apply(item_scoping, summary.session_quantity)
775 .await?;
776 enforcement.sessions = CategoryLimit::new(
777 DataCategory::Session,
778 summary.session_quantity,
779 session_limits.longest(),
780 );
781 rate_limits.merge(session_limits);
782 }
783
784 if summary.trace_metric_quantity > 0 {
786 let item_scoping = scoping.item(DataCategory::TraceMetric);
787 let trace_metric_limits = self
788 .check
789 .apply(item_scoping, summary.trace_metric_quantity)
790 .await?;
791 enforcement.trace_metrics = CategoryLimit::new(
792 DataCategory::TraceMetric,
793 summary.trace_metric_quantity,
794 trace_metric_limits.longest(),
795 );
796 rate_limits.merge(trace_metric_limits);
797 }
798
799 if summary.log_item_quantity > 0 {
801 let item_scoping = scoping.item(DataCategory::LogItem);
802 let log_limits = self
803 .check
804 .apply(item_scoping, summary.log_item_quantity)
805 .await?;
806 enforcement.log_items = CategoryLimit::new(
807 DataCategory::LogItem,
808 summary.log_item_quantity,
809 log_limits.longest(),
810 );
811 rate_limits.merge(log_limits);
812 }
813 if summary.log_byte_quantity > 0 {
814 let item_scoping = scoping.item(DataCategory::LogByte);
815 let log_limits = self
816 .check
817 .apply(item_scoping, summary.log_byte_quantity)
818 .await?;
819 enforcement.log_bytes = CategoryLimit::new(
820 DataCategory::LogByte,
821 summary.log_byte_quantity,
822 log_limits.longest(),
823 );
824 rate_limits.merge(log_limits);
825 }
826
827 if enforcement.is_event_active() {
829 enforcement.profiles = enforcement
830 .event
831 .clone_for(DataCategory::Profile, summary.profile_quantity);
832
833 enforcement.profiles_indexed = enforcement
834 .event_indexed
835 .clone_for(DataCategory::ProfileIndexed, summary.profile_quantity)
836 } else if summary.profile_quantity > 0 {
837 let mut profile_limits = self
838 .check
839 .apply(
840 scoping.item(DataCategory::Profile),
841 summary.profile_quantity,
842 )
843 .await?;
844
845 if profile_limits.is_empty() && summary.event_category.is_none() {
848 profile_limits = self
849 .check
850 .apply(scoping.item(DataCategory::Transaction), 0)
851 .await?;
852 }
853
854 enforcement.profiles = CategoryLimit::new(
855 DataCategory::Profile,
856 summary.profile_quantity,
857 profile_limits.longest(),
858 );
859
860 if profile_limits.is_empty() {
861 profile_limits.merge(
862 self.check
863 .apply(
864 scoping.item(DataCategory::ProfileIndexed),
865 summary.profile_quantity,
866 )
867 .await?,
868 );
869 }
870
871 enforcement.profiles_indexed = CategoryLimit::new(
872 DataCategory::ProfileIndexed,
873 summary.profile_quantity,
874 profile_limits.longest(),
875 );
876
877 rate_limits.merge(profile_limits);
878 }
879
880 if summary.replay_quantity > 0 {
882 let item_scoping = scoping.item(DataCategory::Replay);
883 let replay_limits = self
884 .check
885 .apply(item_scoping, summary.replay_quantity)
886 .await?;
887 enforcement.replays = CategoryLimit::new(
888 DataCategory::Replay,
889 summary.replay_quantity,
890 replay_limits.longest(),
891 );
892 rate_limits.merge(replay_limits);
893 }
894
895 if summary.user_report_quantity > 0 {
897 let item_scoping = scoping.item(DataCategory::UserReportV2);
898 let user_report_v2_limits = self
899 .check
900 .apply(item_scoping, summary.user_report_quantity)
901 .await?;
902 enforcement.user_reports = CategoryLimit::new(
903 DataCategory::UserReportV2,
904 summary.user_report_quantity,
905 user_report_v2_limits.longest(),
906 );
907 rate_limits.merge(user_report_v2_limits);
908 }
909
910 if summary.monitor_quantity > 0 {
912 let item_scoping = scoping.item(DataCategory::Monitor);
913 let checkin_limits = self
914 .check
915 .apply(item_scoping, summary.monitor_quantity)
916 .await?;
917 enforcement.check_ins = CategoryLimit::new(
918 DataCategory::Monitor,
919 summary.monitor_quantity,
920 checkin_limits.longest(),
921 );
922 rate_limits.merge(checkin_limits);
923 }
924
925 if enforcement.is_event_active() {
927 enforcement.spans = enforcement
928 .event
929 .clone_for(DataCategory::Span, summary.span_quantity);
930
931 enforcement.spans_indexed = enforcement
932 .event_indexed
933 .clone_for(DataCategory::SpanIndexed, summary.span_quantity);
934 } else if summary.span_quantity > 0 {
935 let mut span_limits = self
936 .check
937 .apply(scoping.item(DataCategory::Span), summary.span_quantity)
938 .await?;
939 enforcement.spans = CategoryLimit::new(
940 DataCategory::Span,
941 summary.span_quantity,
942 span_limits.longest(),
943 );
944
945 if span_limits.is_empty() {
946 span_limits.merge(
947 self.check
948 .apply(
949 scoping.item(DataCategory::SpanIndexed),
950 summary.span_quantity,
951 )
952 .await?,
953 );
954 }
955
956 enforcement.spans_indexed = CategoryLimit::new(
957 DataCategory::SpanIndexed,
958 summary.span_quantity,
959 span_limits.longest(),
960 );
961
962 rate_limits.merge(span_limits);
963 }
964
965 if summary.profile_chunk_quantity > 0 {
967 let item_scoping = scoping.item(DataCategory::ProfileChunk);
968 let limits = self
969 .check
970 .apply(item_scoping, summary.profile_chunk_quantity)
971 .await?;
972 enforcement.profile_chunks = CategoryLimit::new(
973 DataCategory::ProfileChunk,
974 summary.profile_chunk_quantity,
975 limits.longest(),
976 );
977 rate_limits.merge(limits);
978 }
979
980 if summary.profile_chunk_ui_quantity > 0 {
981 let item_scoping = scoping.item(DataCategory::ProfileChunkUi);
982 let limits = self
983 .check
984 .apply(item_scoping, summary.profile_chunk_ui_quantity)
985 .await?;
986 enforcement.profile_chunks_ui = CategoryLimit::new(
987 DataCategory::ProfileChunkUi,
988 summary.profile_chunk_ui_quantity,
989 limits.longest(),
990 );
991 rate_limits.merge(limits);
992 }
993
994 Ok((enforcement, rate_limits))
995 }
996}
997
998impl<F, E, R> fmt::Debug for EnvelopeLimiter<F, E, R> {
999 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1000 f.debug_struct("EnvelopeLimiter")
1001 .field("event_category", &self.event_category)
1002 .finish()
1003 }
1004}
1005
1006#[cfg(test)]
1007mod tests {
1008
1009 use std::collections::{BTreeMap, BTreeSet};
1010 use std::sync::Arc;
1011
1012 use relay_base_schema::organization::OrganizationId;
1013 use relay_base_schema::project::{ProjectId, ProjectKey};
1014 use relay_metrics::MetricNamespace;
1015 use relay_quotas::RetryAfter;
1016 use relay_system::Addr;
1017 use smallvec::smallvec;
1018 use tokio::sync::Mutex;
1019
1020 use super::*;
1021 use crate::{
1022 envelope::{AttachmentType, ContentType, SourceQuantities},
1023 extractors::RequestMeta,
1024 };
1025
1026 #[tokio::test]
1027 async fn test_format_rate_limits() {
1028 let mut rate_limits = RateLimits::new();
1029
1030 rate_limits.add(RateLimit {
1032 categories: Default::default(),
1033 scope: RateLimitScope::Organization(OrganizationId::new(42)),
1034 reason_code: Some(ReasonCode::new("my_limit")),
1035 retry_after: RetryAfter::from_secs(42),
1036 namespaces: smallvec![],
1037 });
1038
1039 rate_limits.add(RateLimit {
1041 categories: [DataCategory::Transaction, DataCategory::Security].into(),
1042 scope: RateLimitScope::Project(ProjectId::new(21)),
1043 reason_code: None,
1044 retry_after: RetryAfter::from_secs(4711),
1045 namespaces: smallvec![],
1046 });
1047
1048 let formatted = format_rate_limits(&rate_limits);
1049 let expected = "42::organization:my_limit, 4711:transaction;security:project";
1050 assert_eq!(formatted, expected);
1051 }
1052
1053 #[tokio::test]
1054 async fn test_format_rate_limits_namespace() {
1055 let mut rate_limits = RateLimits::new();
1056
1057 rate_limits.add(RateLimit {
1059 categories: [DataCategory::MetricBucket].into(),
1060 scope: RateLimitScope::Organization(OrganizationId::new(42)),
1061 reason_code: Some(ReasonCode::new("my_limit")),
1062 retry_after: RetryAfter::from_secs(42),
1063 namespaces: smallvec![MetricNamespace::Custom, MetricNamespace::Spans],
1064 });
1065
1066 rate_limits.add(RateLimit {
1068 categories: [DataCategory::MetricBucket].into(),
1069 scope: RateLimitScope::Organization(OrganizationId::new(42)),
1070 reason_code: None,
1071 retry_after: RetryAfter::from_secs(42),
1072 namespaces: smallvec![MetricNamespace::Spans],
1073 });
1074
1075 let formatted = format_rate_limits(&rate_limits);
1076 let expected = "42:metric_bucket:organization:my_limit:custom;spans, 42:metric_bucket:organization::spans";
1077 assert_eq!(formatted, expected);
1078 }
1079
1080 #[tokio::test]
1081 async fn test_parse_invalid_rate_limits() {
1082 let scoping = Scoping {
1083 organization_id: OrganizationId::new(42),
1084 project_id: ProjectId::new(21),
1085 project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
1086 key_id: Some(17),
1087 };
1088
1089 assert!(parse_rate_limits(&scoping, "").is_ok());
1090 assert!(parse_rate_limits(&scoping, "invalid").is_ok());
1091 assert!(parse_rate_limits(&scoping, ",,,").is_ok());
1092 }
1093
1094 #[tokio::test]
1095 async fn test_parse_rate_limits() {
1096 let scoping = Scoping {
1097 organization_id: OrganizationId::new(42),
1098 project_id: ProjectId::new(21),
1099 project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
1100 key_id: Some(17),
1101 };
1102
1103 let formatted =
1105 "42::organization:my_limit, invalid, 4711:foobar;transaction;security:project";
1106 let rate_limits: Vec<RateLimit> =
1107 parse_rate_limits(&scoping, formatted).into_iter().collect();
1108
1109 assert_eq!(
1110 rate_limits,
1111 vec![
1112 RateLimit {
1113 categories: Default::default(),
1114 scope: RateLimitScope::Organization(OrganizationId::new(42)),
1115 reason_code: Some(ReasonCode::new("my_limit")),
1116 retry_after: rate_limits[0].retry_after,
1117 namespaces: smallvec![],
1118 },
1119 RateLimit {
1120 categories: [
1121 DataCategory::Unknown,
1122 DataCategory::Transaction,
1123 DataCategory::Security,
1124 ]
1125 .into(),
1126 scope: RateLimitScope::Project(ProjectId::new(21)),
1127 reason_code: None,
1128 retry_after: rate_limits[1].retry_after,
1129 namespaces: smallvec![],
1130 }
1131 ]
1132 );
1133
1134 assert_eq!(42, rate_limits[0].retry_after.remaining_seconds());
1135 assert_eq!(4711, rate_limits[1].retry_after.remaining_seconds());
1136 }
1137
1138 #[tokio::test]
1139 async fn test_parse_rate_limits_namespace() {
1140 let scoping = Scoping {
1141 organization_id: OrganizationId::new(42),
1142 project_id: ProjectId::new(21),
1143 project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
1144 key_id: Some(17),
1145 };
1146
1147 let formatted = "42:metric_bucket:organization::custom;spans";
1148 let rate_limits: Vec<RateLimit> =
1149 parse_rate_limits(&scoping, formatted).into_iter().collect();
1150
1151 assert_eq!(
1152 rate_limits,
1153 vec![RateLimit {
1154 categories: [DataCategory::MetricBucket].into(),
1155 scope: RateLimitScope::Organization(OrganizationId::new(42)),
1156 reason_code: None,
1157 retry_after: rate_limits[0].retry_after,
1158 namespaces: smallvec![MetricNamespace::Custom, MetricNamespace::Spans],
1159 }]
1160 );
1161 }
1162
1163 #[tokio::test]
1164 async fn test_parse_rate_limits_empty_namespace() {
1165 let scoping = Scoping {
1166 organization_id: OrganizationId::new(42),
1167 project_id: ProjectId::new(21),
1168 project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
1169 key_id: Some(17),
1170 };
1171
1172 let formatted = "42:metric_bucket:organization:some_reason:";
1174 let rate_limits: Vec<RateLimit> =
1175 parse_rate_limits(&scoping, formatted).into_iter().collect();
1176
1177 assert_eq!(
1178 rate_limits,
1179 vec![RateLimit {
1180 categories: [DataCategory::MetricBucket].into(),
1181 scope: RateLimitScope::Organization(OrganizationId::new(42)),
1182 reason_code: Some(ReasonCode::new("some_reason")),
1183 retry_after: rate_limits[0].retry_after,
1184 namespaces: smallvec![],
1185 }]
1186 );
1187 }
1188
1189 #[tokio::test]
1190 async fn test_parse_rate_limits_only_unknown() {
1191 let scoping = Scoping {
1192 organization_id: OrganizationId::new(42),
1193 project_id: ProjectId::new(21),
1194 project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
1195 key_id: Some(17),
1196 };
1197
1198 let formatted = "42:foo;bar:organization";
1199 let rate_limits: Vec<RateLimit> =
1200 parse_rate_limits(&scoping, formatted).into_iter().collect();
1201
1202 assert_eq!(
1203 rate_limits,
1204 vec![RateLimit {
1205 categories: [DataCategory::Unknown, DataCategory::Unknown].into(),
1206 scope: RateLimitScope::Organization(OrganizationId::new(42)),
1207 reason_code: None,
1208 retry_after: rate_limits[0].retry_after,
1209 namespaces: smallvec![],
1210 },]
1211 );
1212 }
1213
1214 macro_rules! envelope {
1215 ($( $item_type:ident $( :: $attachment_type:ident )? ),*) => {{
1216 let bytes = "{\"dsn\":\"https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42\"}";
1217 #[allow(unused_mut)]
1218 let mut envelope = Envelope::parse_bytes(bytes.into()).unwrap();
1219 $(
1220 let mut item = Item::new(ItemType::$item_type);
1221 item.set_payload(ContentType::OctetStream, "0123456789");
1222 $( item.set_attachment_type(AttachmentType::$attachment_type); )?
1223 envelope.add_item(item);
1224 )*
1225
1226 let (outcome_aggregator, _) = Addr::custom();
1227
1228 ManagedEnvelope::new(
1229 envelope,
1230 outcome_aggregator,
1231 )
1232 }}
1233 }
1234
1235 fn set_extracted(envelope: &mut Envelope, ty: ItemType) {
1236 envelope
1237 .get_item_by_mut(|item| *item.ty() == ty)
1238 .unwrap()
1239 .set_metrics_extracted(true);
1240 }
1241
1242 fn rate_limit(category: DataCategory) -> RateLimit {
1243 RateLimit {
1244 categories: [category].into(),
1245 scope: RateLimitScope::Organization(OrganizationId::new(42)),
1246 reason_code: None,
1247 retry_after: RetryAfter::from_secs(60),
1248 namespaces: smallvec![],
1249 }
1250 }
1251
1252 #[derive(Debug, Default)]
1253 struct MockLimiter {
1254 denied: Vec<DataCategory>,
1255 called: BTreeMap<DataCategory, usize>,
1256 checked: BTreeSet<DataCategory>,
1257 }
1258
1259 impl MockLimiter {
1260 pub fn deny(mut self, category: DataCategory) -> Self {
1261 self.denied.push(category);
1262 self
1263 }
1264
1265 pub fn check(&mut self, scoping: ItemScoping, quantity: usize) -> Result<RateLimits, ()> {
1266 let cat = scoping.category;
1267 let previous = self.called.insert(cat, quantity);
1268 assert!(previous.is_none(), "rate limiter invoked twice for {cat}");
1269
1270 let mut limits = RateLimits::new();
1271 if self.denied.contains(&cat) {
1272 limits.add(rate_limit(cat));
1273 }
1274 Ok(limits)
1275 }
1276
1277 #[track_caller]
1278 pub fn assert_call(&mut self, category: DataCategory, expected: usize) {
1279 self.checked.insert(category);
1280
1281 let quantity = self.called.get(&category).copied();
1282 assert_eq!(
1283 quantity,
1284 Some(expected),
1285 "Expected quantity `{expected}` for data category `{category}`, got {quantity:?}."
1286 );
1287 }
1288 }
1289
1290 impl Drop for MockLimiter {
1291 fn drop(&mut self) {
1292 if std::thread::panicking() {
1293 return;
1294 }
1295
1296 for checked in &self.checked {
1297 self.called.remove(checked);
1298 }
1299
1300 if self.called.is_empty() {
1301 return;
1302 }
1303
1304 let not_asserted = self
1305 .called
1306 .iter()
1307 .map(|(k, v)| format!("- {k}: {v}"))
1308 .collect::<Vec<_>>()
1309 .join("\n");
1310
1311 panic!("Following calls to the limiter were not asserted:\n{not_asserted}");
1312 }
1313 }
1314
1315 async fn enforce_and_apply(
1316 mock: Arc<Mutex<MockLimiter>>,
1317 envelope: &mut ManagedEnvelope,
1318 #[allow(unused_variables)] assume_event: Option<DataCategory>,
1319 ) -> (Enforcement, RateLimits) {
1320 let scoping = envelope.scoping();
1321
1322 #[allow(unused_mut)]
1323 let mut limiter = EnvelopeLimiter::new(CheckLimits::All, move |s, q| {
1324 let mock = mock.clone();
1325 async move {
1326 let mut mock = mock.lock().await;
1327 mock.check(s, q)
1328 }
1329 });
1330 #[cfg(feature = "processing")]
1331 if let Some(assume_event) = assume_event {
1332 limiter.assume_event(assume_event);
1333 }
1334
1335 let (enforcement, limits) = limiter
1336 .compute(envelope.envelope_mut(), &scoping)
1337 .await
1338 .unwrap();
1339
1340 enforcement.clone().apply_with_outcomes(envelope);
1343
1344 (enforcement, limits)
1345 }
1346
1347 fn mock_limiter(category: Option<DataCategory>) -> Arc<Mutex<MockLimiter>> {
1348 let mut mock = MockLimiter::default();
1349 if let Some(category) = category {
1350 mock = mock.deny(category);
1351 }
1352
1353 Arc::new(Mutex::new(mock))
1354 }
1355
1356 #[tokio::test]
1357 async fn test_enforce_pass_empty() {
1358 let mut envelope = envelope![];
1359
1360 let mock = mock_limiter(None);
1361 let (_, limits) = enforce_and_apply(mock, &mut envelope, None).await;
1362
1363 assert!(!limits.is_limited());
1364 assert!(envelope.envelope().is_empty());
1365 }
1366
1367 #[tokio::test]
1368 async fn test_enforce_limit_error_event() {
1369 let mut envelope = envelope![Event];
1370
1371 let mock = mock_limiter(Some(DataCategory::Error));
1372 let (_, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1373
1374 assert!(limits.is_limited());
1375 assert!(envelope.envelope().is_empty());
1376 mock.lock().await.assert_call(DataCategory::Error, 1);
1377 }
1378
1379 #[tokio::test]
1380 async fn test_enforce_limit_error_with_attachments() {
1381 let mut envelope = envelope![Event, Attachment];
1382
1383 let mock = mock_limiter(Some(DataCategory::Error));
1384 let (_, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1385
1386 assert!(limits.is_limited());
1387 assert!(envelope.envelope().is_empty());
1388 mock.lock().await.assert_call(DataCategory::Error, 1);
1389 }
1390
1391 #[tokio::test]
1392 async fn test_enforce_limit_minidump() {
1393 let mut envelope = envelope![Attachment::Minidump];
1394
1395 let mock = mock_limiter(Some(DataCategory::Error));
1396 let (_, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1397
1398 assert!(limits.is_limited());
1399 assert!(envelope.envelope().is_empty());
1400 mock.lock().await.assert_call(DataCategory::Error, 1);
1401 }
1402
1403 #[tokio::test]
1404 async fn test_enforce_limit_attachments() {
1405 let mut envelope = envelope![Attachment::Minidump, Attachment];
1406
1407 let mock = mock_limiter(Some(DataCategory::Attachment));
1408 let (_, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1409
1410 assert!(limits.is_limited());
1412 assert_eq!(envelope.envelope().len(), 1);
1413 mock.lock().await.assert_call(DataCategory::Error, 1);
1414 mock.lock().await.assert_call(DataCategory::Attachment, 20);
1415 }
1416
1417 #[tokio::test]
1419 async fn test_enforce_limit_profiles() {
1420 let mut envelope = envelope![Profile, Profile];
1421
1422 let mock = mock_limiter(Some(DataCategory::Profile));
1423 let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1424
1425 assert!(limits.is_limited());
1426 assert_eq!(envelope.envelope().len(), 0);
1427 mock.lock().await.assert_call(DataCategory::Profile, 2);
1428
1429 assert_eq!(
1430 get_outcomes(enforcement),
1431 vec![
1432 (DataCategory::Profile, 2),
1433 (DataCategory::ProfileIndexed, 2)
1434 ]
1435 );
1436 }
1437
1438 #[tokio::test]
1440 async fn test_enforce_limit_profile_chunks_no_profile_type() {
1441 let mut envelope = envelope![ProfileChunk, ProfileChunk];
1444
1445 let mock = mock_limiter(Some(DataCategory::ProfileChunk));
1446 let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1447 assert!(!limits.is_limited());
1448 assert_eq!(get_outcomes(enforcement), vec![]);
1449
1450 let mock = mock_limiter(Some(DataCategory::ProfileChunkUi));
1451 let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1452 assert!(!limits.is_limited());
1453 assert_eq!(get_outcomes(enforcement), vec![]);
1454
1455 assert_eq!(envelope.envelope().len(), 2);
1456 }
1457
1458 #[tokio::test]
1459 async fn test_enforce_limit_profile_chunks_ui() {
1460 let mut envelope = envelope![];
1461
1462 let mut item = Item::new(ItemType::ProfileChunk);
1463 item.set_profile_type(ProfileType::Backend);
1464 envelope.envelope_mut().add_item(item);
1465 let mut item = Item::new(ItemType::ProfileChunk);
1466 item.set_profile_type(ProfileType::Ui);
1467 envelope.envelope_mut().add_item(item);
1468
1469 let mock = mock_limiter(Some(DataCategory::ProfileChunkUi));
1470 let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1471
1472 assert!(limits.is_limited());
1473 assert_eq!(envelope.envelope().len(), 1);
1474 mock.lock()
1475 .await
1476 .assert_call(DataCategory::ProfileChunkUi, 1);
1477 mock.lock().await.assert_call(DataCategory::ProfileChunk, 1);
1478
1479 assert_eq!(
1480 get_outcomes(enforcement),
1481 vec![(DataCategory::ProfileChunkUi, 1)]
1482 );
1483 }
1484
1485 #[tokio::test]
1486 async fn test_enforce_limit_profile_chunks_backend() {
1487 let mut envelope = envelope![];
1488
1489 let mut item = Item::new(ItemType::ProfileChunk);
1490 item.set_profile_type(ProfileType::Backend);
1491 envelope.envelope_mut().add_item(item);
1492 let mut item = Item::new(ItemType::ProfileChunk);
1493 item.set_profile_type(ProfileType::Ui);
1494 envelope.envelope_mut().add_item(item);
1495
1496 let mock = mock_limiter(Some(DataCategory::ProfileChunk));
1497 let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1498
1499 assert!(limits.is_limited());
1500 assert_eq!(envelope.envelope().len(), 1);
1501 mock.lock()
1502 .await
1503 .assert_call(DataCategory::ProfileChunkUi, 1);
1504 mock.lock().await.assert_call(DataCategory::ProfileChunk, 1);
1505
1506 assert_eq!(
1507 get_outcomes(enforcement),
1508 vec![(DataCategory::ProfileChunk, 1)]
1509 );
1510 }
1511
1512 #[tokio::test]
1514 async fn test_enforce_limit_replays() {
1515 let mut envelope = envelope![ReplayEvent, ReplayRecording, ReplayVideo];
1516
1517 let mock = mock_limiter(Some(DataCategory::Replay));
1518 let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1519
1520 assert!(limits.is_limited());
1521 assert_eq!(envelope.envelope().len(), 0);
1522 mock.lock().await.assert_call(DataCategory::Replay, 3);
1523
1524 assert_eq!(get_outcomes(enforcement), vec![(DataCategory::Replay, 3),]);
1525 }
1526
1527 #[tokio::test]
1529 async fn test_enforce_limit_monitor_checkins() {
1530 let mut envelope = envelope![CheckIn];
1531
1532 let mock = mock_limiter(Some(DataCategory::Monitor));
1533 let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1534
1535 assert!(limits.is_limited());
1536 assert_eq!(envelope.envelope().len(), 0);
1537 mock.lock().await.assert_call(DataCategory::Monitor, 1);
1538
1539 assert_eq!(get_outcomes(enforcement), vec![(DataCategory::Monitor, 1)])
1540 }
1541
1542 #[tokio::test]
1543 async fn test_enforce_pass_minidump() {
1544 let mut envelope = envelope![Attachment::Minidump];
1545
1546 let mock = mock_limiter(Some(DataCategory::Attachment));
1547 let (_, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1548
1549 assert!(!limits.is_limited());
1551 assert_eq!(envelope.envelope().len(), 1);
1552 mock.lock().await.assert_call(DataCategory::Error, 1);
1553 mock.lock().await.assert_call(DataCategory::Attachment, 10);
1554 }
1555
1556 #[tokio::test]
1557 async fn test_enforce_skip_rate_limited() {
1558 let mut envelope = envelope![];
1559
1560 let mut item = Item::new(ItemType::Attachment);
1561 item.set_payload(ContentType::OctetStream, "0123456789");
1562 item.set_rate_limited(true);
1563 envelope.envelope_mut().add_item(item);
1564
1565 let mock = mock_limiter(Some(DataCategory::Error));
1566 let (_, limits) = enforce_and_apply(mock, &mut envelope, None).await;
1567
1568 assert!(!limits.is_limited()); assert_eq!(envelope.envelope().len(), 1); }
1571
1572 #[tokio::test]
1573 async fn test_enforce_pass_sessions() {
1574 let mut envelope = envelope![Session, Session, Session];
1575
1576 let mock = mock_limiter(Some(DataCategory::Error));
1577 let (_, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1578
1579 assert!(!limits.is_limited());
1581 assert_eq!(envelope.envelope().len(), 3);
1582 mock.lock().await.assert_call(DataCategory::Session, 3);
1583 }
1584
1585 #[tokio::test]
1586 async fn test_enforce_limit_sessions() {
1587 let mut envelope = envelope![Session, Session, Event];
1588
1589 let mock = mock_limiter(Some(DataCategory::Session));
1590 let (_, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1591
1592 assert!(limits.is_limited());
1594 assert_eq!(envelope.envelope().len(), 1);
1595 mock.lock().await.assert_call(DataCategory::Error, 1);
1596 mock.lock().await.assert_call(DataCategory::Session, 2);
1597 }
1598
1599 #[tokio::test]
1600 #[cfg(feature = "processing")]
1601 async fn test_enforce_limit_assumed_event() {
1602 let mut envelope = envelope![];
1603
1604 let mock = mock_limiter(Some(DataCategory::Transaction));
1605 let (_, limits) =
1606 enforce_and_apply(mock.clone(), &mut envelope, Some(DataCategory::Transaction)).await;
1607
1608 assert!(limits.is_limited());
1609 assert!(envelope.envelope().is_empty()); mock.lock().await.assert_call(DataCategory::Transaction, 1);
1611 }
1612
1613 #[tokio::test]
1614 #[cfg(feature = "processing")]
1615 async fn test_enforce_limit_assumed_attachments() {
1616 let mut envelope = envelope![Attachment, Attachment];
1617
1618 let mock = mock_limiter(Some(DataCategory::Error));
1619 let (_, limits) =
1620 enforce_and_apply(mock.clone(), &mut envelope, Some(DataCategory::Error)).await;
1621
1622 assert!(limits.is_limited());
1623 assert!(envelope.envelope().is_empty());
1624 mock.lock().await.assert_call(DataCategory::Error, 1);
1625 }
1626
1627 #[tokio::test]
1628 async fn test_enforce_transaction() {
1629 let mut envelope = envelope![Transaction];
1630
1631 let mock = mock_limiter(Some(DataCategory::Transaction));
1632 let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1633
1634 assert!(limits.is_limited());
1635 assert!(enforcement.event_indexed.is_active());
1636 assert!(enforcement.event.is_active());
1637 mock.lock().await.assert_call(DataCategory::Transaction, 1);
1638
1639 assert_eq!(
1640 get_outcomes(enforcement),
1641 vec![
1642 (DataCategory::Transaction, 1),
1643 (DataCategory::TransactionIndexed, 1),
1644 (DataCategory::Span, 1),
1645 (DataCategory::SpanIndexed, 1),
1646 ]
1647 );
1648 }
1649
1650 #[tokio::test]
1651 async fn test_enforce_transaction_non_indexed() {
1652 let mut envelope = envelope![Transaction, Profile];
1653 let scoping = envelope.scoping();
1654
1655 let mock = mock_limiter(Some(DataCategory::TransactionIndexed));
1656
1657 let mock_clone = mock.clone();
1658 let limiter = EnvelopeLimiter::new(CheckLimits::NonIndexed, move |s, q| {
1659 let mock_clone = mock_clone.clone();
1660 async move {
1661 let mut mock = mock_clone.lock().await;
1662 mock.check(s, q)
1663 }
1664 });
1665 let (enforcement, limits) = limiter
1666 .compute(envelope.envelope_mut(), &scoping)
1667 .await
1668 .unwrap();
1669 enforcement.clone().apply_with_outcomes(&mut envelope);
1670
1671 assert!(!limits.is_limited());
1672 assert!(!enforcement.event_indexed.is_active());
1673 assert!(!enforcement.event.is_active());
1674 assert!(!enforcement.profiles_indexed.is_active());
1675 assert!(!enforcement.profiles.is_active());
1676 assert!(!enforcement.spans.is_active());
1677 assert!(!enforcement.spans_indexed.is_active());
1678 mock.lock().await.assert_call(DataCategory::Transaction, 1);
1679 mock.lock().await.assert_call(DataCategory::Profile, 1);
1680 mock.lock().await.assert_call(DataCategory::Span, 1);
1681 }
1682
1683 #[tokio::test]
1684 async fn test_enforce_transaction_no_indexing_quota() {
1685 let mut envelope = envelope![Transaction];
1686
1687 let mock = mock_limiter(Some(DataCategory::TransactionIndexed));
1688 let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1689
1690 assert!(limits.is_limited());
1691 assert!(enforcement.event_indexed.is_active());
1692 assert!(!enforcement.event.is_active());
1693 mock.lock().await.assert_call(DataCategory::Transaction, 1);
1694 mock.lock()
1695 .await
1696 .assert_call(DataCategory::TransactionIndexed, 1);
1697 }
1698
1699 #[tokio::test]
1700 async fn test_enforce_transaction_attachment_enforced() {
1701 let mut envelope = envelope![Transaction, Attachment];
1702
1703 let mock = mock_limiter(Some(DataCategory::Transaction));
1704 let (enforcement, _) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1705
1706 assert!(enforcement.event.is_active());
1707 assert!(enforcement.attachments.is_active());
1708 mock.lock().await.assert_call(DataCategory::Transaction, 1);
1709 }
1710
1711 fn get_outcomes(enforcement: Enforcement) -> Vec<(DataCategory, usize)> {
1712 enforcement
1713 .get_outcomes()
1714 .map(|(_, data_category, quantity)| (data_category, quantity))
1715 .collect::<Vec<_>>()
1716 }
1717
1718 #[tokio::test]
1719 async fn test_enforce_transaction_profile_enforced() {
1720 let mut envelope = envelope![Transaction, Profile];
1721
1722 let mock = mock_limiter(Some(DataCategory::Transaction));
1723 let (enforcement, _) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1724
1725 assert!(enforcement.event.is_active());
1726 assert!(enforcement.profiles.is_active());
1727 mock.lock().await.assert_call(DataCategory::Transaction, 1);
1728
1729 assert_eq!(
1730 get_outcomes(enforcement),
1731 vec![
1732 (DataCategory::Transaction, 1),
1733 (DataCategory::TransactionIndexed, 1),
1734 (DataCategory::Profile, 1),
1735 (DataCategory::ProfileIndexed, 1),
1736 (DataCategory::Span, 1),
1737 (DataCategory::SpanIndexed, 1),
1738 ]
1739 );
1740 }
1741
1742 #[tokio::test]
1743 async fn test_enforce_transaction_standalone_profile_enforced() {
1744 let mut envelope = envelope![Profile];
1746
1747 let mock = mock_limiter(Some(DataCategory::Transaction));
1748 let (enforcement, _) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1749
1750 assert!(enforcement.profiles.is_active());
1751 mock.lock().await.assert_call(DataCategory::Profile, 1);
1752 mock.lock().await.assert_call(DataCategory::Transaction, 0);
1753
1754 assert_eq!(
1755 get_outcomes(enforcement),
1756 vec![
1757 (DataCategory::Profile, 1),
1758 (DataCategory::ProfileIndexed, 1),
1759 ]
1760 );
1761 }
1762
1763 #[tokio::test]
1764 async fn test_enforce_transaction_attachment_enforced_indexing_quota() {
1765 let mut envelope = envelope![Transaction, Attachment];
1766 set_extracted(envelope.envelope_mut(), ItemType::Transaction);
1767
1768 let mock = mock_limiter(Some(DataCategory::TransactionIndexed));
1769 let (enforcement, _) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1770
1771 assert!(!enforcement.event.is_active());
1772 assert!(enforcement.event_indexed.is_active());
1773 assert!(enforcement.attachments.is_active());
1774 assert!(enforcement.attachment_items.is_active());
1775 mock.lock().await.assert_call(DataCategory::Transaction, 1);
1776 mock.lock()
1777 .await
1778 .assert_call(DataCategory::TransactionIndexed, 1);
1779
1780 assert_eq!(
1781 get_outcomes(enforcement),
1782 vec![
1783 (DataCategory::TransactionIndexed, 1),
1784 (DataCategory::Attachment, 10),
1785 (DataCategory::AttachmentItem, 1),
1786 (DataCategory::SpanIndexed, 1),
1787 ]
1788 );
1789 }
1790
1791 #[tokio::test]
1792 async fn test_enforce_span() {
1793 let mut envelope = envelope![Span, Span];
1794
1795 let mock = mock_limiter(Some(DataCategory::Span));
1796 let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1797
1798 assert!(limits.is_limited());
1799 assert!(enforcement.spans_indexed.is_active());
1800 assert!(enforcement.spans.is_active());
1801 mock.lock().await.assert_call(DataCategory::Span, 2);
1802
1803 assert_eq!(
1804 get_outcomes(enforcement),
1805 vec![(DataCategory::Span, 2), (DataCategory::SpanIndexed, 2)]
1806 );
1807 }
1808
1809 #[tokio::test]
1810 async fn test_enforce_span_no_indexing_quota() {
1811 let mut envelope = envelope![Span, Span];
1812
1813 let mock = mock_limiter(Some(DataCategory::SpanIndexed));
1814 let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1815
1816 assert!(limits.is_limited());
1817 assert!(enforcement.spans_indexed.is_active());
1818 assert!(!enforcement.spans.is_active());
1819 mock.lock().await.assert_call(DataCategory::Span, 2);
1820 mock.lock().await.assert_call(DataCategory::SpanIndexed, 2);
1821
1822 assert_eq!(
1823 get_outcomes(enforcement),
1824 vec![(DataCategory::SpanIndexed, 2)]
1825 );
1826 }
1827
1828 #[tokio::test]
1829 async fn test_enforce_span_metrics_extracted_no_indexing_quota() {
1830 let mut envelope = envelope![Span, Span];
1831 set_extracted(envelope.envelope_mut(), ItemType::Span);
1832
1833 let mock = mock_limiter(Some(DataCategory::SpanIndexed));
1834 let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1835
1836 assert!(limits.is_limited());
1837 assert!(enforcement.spans_indexed.is_active());
1838 assert!(!enforcement.spans.is_active());
1839 mock.lock().await.assert_call(DataCategory::Span, 2);
1840 mock.lock().await.assert_call(DataCategory::SpanIndexed, 2);
1841
1842 assert_eq!(
1843 get_outcomes(enforcement),
1844 vec![(DataCategory::SpanIndexed, 2)]
1845 );
1846 }
1847
1848 #[test]
1849 fn test_source_quantity_for_total_quantity() {
1850 let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
1851 .parse()
1852 .unwrap();
1853 let request_meta = RequestMeta::new(dsn);
1854
1855 let mut envelope = Envelope::from_request(None, request_meta);
1856
1857 let mut item = Item::new(ItemType::MetricBuckets);
1858 item.set_source_quantities(SourceQuantities {
1859 transactions: 5,
1860 spans: 0,
1861 profiles: 2,
1862 buckets: 5,
1863 });
1864 envelope.add_item(item);
1865
1866 let mut item = Item::new(ItemType::MetricBuckets);
1867 item.set_source_quantities(SourceQuantities {
1868 transactions: 2,
1869 spans: 0,
1870 profiles: 0,
1871 buckets: 3,
1872 });
1873 envelope.add_item(item);
1874
1875 let summary = EnvelopeSummary::compute(&envelope);
1876
1877 assert_eq!(summary.profile_quantity, 2);
1878 assert_eq!(summary.secondary_transaction_quantity, 7);
1879 }
1880
1881 #[tokio::test]
1882 async fn test_enforce_limit_logs_count() {
1883 let mut envelope = envelope![Log, Log];
1884
1885 let mock = mock_limiter(Some(DataCategory::LogItem));
1886 let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1887
1888 assert!(limits.is_limited());
1889 assert_eq!(envelope.envelope().len(), 0);
1890 mock.lock().await.assert_call(DataCategory::LogItem, 2);
1891 mock.lock().await.assert_call(DataCategory::LogByte, 20);
1892
1893 assert_eq!(get_outcomes(enforcement), vec![(DataCategory::LogItem, 2)]);
1894 }
1895
1896 #[tokio::test]
1897 async fn test_enforce_limit_logs_bytes() {
1898 let mut envelope = envelope![Log, Log];
1899
1900 let mock = mock_limiter(Some(DataCategory::LogByte));
1901 let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1902
1903 assert!(limits.is_limited());
1904 assert_eq!(envelope.envelope().len(), 0);
1905 mock.lock().await.assert_call(DataCategory::LogItem, 2);
1906 mock.lock().await.assert_call(DataCategory::LogByte, 20);
1907
1908 assert_eq!(get_outcomes(enforcement), vec![(DataCategory::LogByte, 20)]);
1909 }
1910}