1use std::fmt::{self, Write};
2use std::future::Future;
3use std::marker::PhantomData;
4
5use relay_profiling::ProfileType;
6use relay_quotas::{
7 DataCategories, DataCategory, ItemScoping, QuotaScope, RateLimit, RateLimitScope, RateLimits,
8 ReasonCode, Scoping,
9};
10
11use crate::envelope::{Envelope, Item, ItemType};
12use crate::integrations::Integration;
13use crate::managed::ManagedEnvelope;
14use crate::services::outcome::Outcome;
15
16pub const RATE_LIMITS_HEADER: &str = "X-Sentry-Rate-Limits";
18
19pub fn format_rate_limits(rate_limits: &RateLimits) -> String {
21 let mut header = String::new();
22
23 for rate_limit in rate_limits {
24 if !header.is_empty() {
25 header.push_str(", ");
26 }
27
28 write!(header, "{}:", rate_limit.retry_after.remaining_seconds()).ok();
29
30 for (index, category) in rate_limit.categories.iter().enumerate() {
31 if index > 0 {
32 header.push(';');
33 }
34 write!(header, "{category}").ok();
35 }
36
37 write!(header, ":{}", rate_limit.scope.name()).ok();
38
39 if let Some(ref reason_code) = rate_limit.reason_code {
40 write!(header, ":{reason_code}").ok();
41 } else if !rate_limit.namespaces.is_empty() {
42 write!(header, ":").ok(); }
44
45 for (index, namespace) in rate_limit.namespaces.iter().enumerate() {
46 header.push(if index == 0 { ':' } else { ';' });
47 write!(header, "{namespace}").ok();
48 }
49 }
50
51 header
52}
53
54pub fn parse_rate_limits(scoping: &Scoping, string: &str) -> RateLimits {
56 let mut rate_limits = RateLimits::new();
57
58 for limit in string.split(',') {
59 let limit = limit.trim();
60 if limit.is_empty() {
61 continue;
62 }
63
64 let mut components = limit.split(':');
65
66 let retry_after = match components.next().and_then(|s| s.parse().ok()) {
67 Some(retry_after) => retry_after,
68 None => continue,
69 };
70
71 let mut categories = DataCategories::new();
72 for category in components.next().unwrap_or("").split(';') {
73 if !category.is_empty() {
74 categories.push(DataCategory::from_name(category));
75 }
76 }
77
78 let quota_scope = QuotaScope::from_name(components.next().unwrap_or(""));
79 let scope = RateLimitScope::for_quota(*scoping, quota_scope);
80
81 let reason_code = components
82 .next()
83 .filter(|s| !s.is_empty())
84 .map(ReasonCode::new);
85
86 let namespace = components
87 .next()
88 .unwrap_or("")
89 .split(';')
90 .filter(|s| !s.is_empty())
91 .filter_map(|s| s.parse().ok())
92 .collect();
93
94 rate_limits.add(RateLimit {
95 categories,
96 scope,
97 reason_code,
98 retry_after,
99 namespaces: namespace,
100 });
101 }
102
103 rate_limits
104}
105
106fn infer_event_category(item: &Item) -> Option<DataCategory> {
114 match item.ty() {
115 ItemType::Event => Some(DataCategory::Error),
116 ItemType::Transaction => Some(DataCategory::Transaction),
117 ItemType::Security | ItemType::RawSecurity => Some(DataCategory::Security),
118 ItemType::UnrealReport => Some(DataCategory::Error),
119 ItemType::UserReportV2 => Some(DataCategory::UserReportV2),
120 ItemType::Attachment if item.creates_event() => Some(DataCategory::Error),
121 ItemType::Attachment => None,
122 ItemType::Session => None,
123 ItemType::Sessions => None,
124 ItemType::Statsd => None,
125 ItemType::MetricBuckets => None,
126 ItemType::FormData => None,
127 ItemType::UserReport => None,
128 ItemType::Profile => None,
129 ItemType::ReplayEvent => None,
130 ItemType::ReplayRecording => None,
131 ItemType::ReplayVideo => None,
132 ItemType::ClientReport => None,
133 ItemType::CheckIn => None,
134 ItemType::Nel => None,
135 ItemType::Log => None,
136 ItemType::TraceMetric => None,
137 ItemType::Span => None,
138 ItemType::ProfileChunk => None,
139 ItemType::Integration => None,
140 ItemType::Unknown(_) => None,
141 }
142}
143
144#[non_exhaustive]
149#[derive(Clone, Copy, Debug, Default)]
150pub struct EnvelopeSummary {
151 pub event_category: Option<DataCategory>,
153
154 pub attachment_quantity: usize,
156
157 pub attachment_item_quantity: usize,
159
160 pub session_quantity: usize,
162
163 pub profile_quantity: usize,
165
166 pub replay_quantity: usize,
168
169 pub user_report_quantity: usize,
171
172 pub monitor_quantity: usize,
174
175 pub log_item_quantity: usize,
177
178 pub log_byte_quantity: usize,
180
181 pub secondary_transaction_quantity: usize,
190
191 pub secondary_span_quantity: usize,
193
194 pub span_quantity: usize,
196
197 pub has_plain_attachments: bool,
199
200 pub payload_size: usize,
202
203 pub profile_chunk_quantity: usize,
205 pub profile_chunk_ui_quantity: usize,
207
208 pub trace_metric_quantity: usize,
210}
211
212impl EnvelopeSummary {
213 pub fn empty() -> Self {
215 Self::default()
216 }
217
218 pub fn compute(envelope: &Envelope) -> Self {
220 let mut summary = Self::empty();
221
222 for item in envelope.items() {
223 if item.creates_event() {
224 summary.infer_category(item);
225 } else if item.ty() == &ItemType::Attachment {
226 summary.has_plain_attachments = true;
228 }
229
230 if item.rate_limited() {
233 continue;
234 }
235
236 if let Some(source_quantities) = item.source_quantities() {
237 summary.secondary_transaction_quantity += source_quantities.transactions;
238 summary.secondary_span_quantity += source_quantities.spans;
239 summary.profile_quantity += source_quantities.profiles;
240 }
241
242 summary.payload_size += item.len();
243
244 for (category, quantity) in item.quantities() {
245 summary.add_quantity(category, quantity);
246 }
247
248 if item.ty() == &ItemType::UserReport {
251 summary.user_report_quantity += 1;
252 }
253 }
254
255 summary
256 }
257
258 fn add_quantity(&mut self, category: DataCategory, quantity: usize) {
259 let target_quantity = match category {
260 DataCategory::Attachment => &mut self.attachment_quantity,
261 DataCategory::AttachmentItem => &mut self.attachment_item_quantity,
262 DataCategory::Session => &mut self.session_quantity,
263 DataCategory::Profile => &mut self.profile_quantity,
264 DataCategory::Replay => &mut self.replay_quantity,
265 DataCategory::DoNotUseReplayVideo => &mut self.replay_quantity,
266 DataCategory::Monitor => &mut self.monitor_quantity,
267 DataCategory::Span => &mut self.span_quantity,
268 DataCategory::TraceMetric => &mut self.trace_metric_quantity,
269 DataCategory::LogItem => &mut self.log_item_quantity,
270 DataCategory::LogByte => &mut self.log_byte_quantity,
271 DataCategory::ProfileChunk => &mut self.profile_chunk_quantity,
272 DataCategory::ProfileChunkUi => &mut self.profile_chunk_ui_quantity,
273 _ => return,
275 };
276 *target_quantity += quantity;
277 }
278
279 fn infer_category(&mut self, item: &Item) {
284 if matches!(self.event_category, None | Some(DataCategory::Default))
285 && let Some(category) = infer_event_category(item)
286 {
287 self.event_category = Some(category);
288 }
289 }
290}
291
292#[derive(Debug)]
294#[cfg_attr(test, derive(Clone))]
295pub struct CategoryLimit {
296 category: DataCategory,
298 quantity: usize,
302 reason_code: Option<ReasonCode>,
306}
307
308impl CategoryLimit {
309 fn new(category: DataCategory, quantity: usize, rate_limit: Option<&RateLimit>) -> Self {
313 match rate_limit {
314 Some(limit) => Self {
315 category,
316 quantity,
317 reason_code: limit.reason_code.clone(),
318 },
319 None => Self::default(),
320 }
321 }
322
323 pub fn clone_for(&self, category: DataCategory, quantity: usize) -> CategoryLimit {
325 if !self.is_active() {
326 return Self::default();
327 }
328
329 Self {
330 category,
331 quantity,
332 reason_code: self.reason_code.clone(),
333 }
334 }
335
336 pub fn is_active(&self) -> bool {
341 self.quantity > 0
342 }
343}
344
345impl Default for CategoryLimit {
346 fn default() -> Self {
347 Self {
348 category: DataCategory::Default,
349 quantity: 0,
350 reason_code: None,
351 }
352 }
353}
354
355#[derive(Default, Debug)]
357#[cfg_attr(test, derive(Clone))]
358pub struct Enforcement {
359 pub event: CategoryLimit,
361 pub event_indexed: CategoryLimit,
363 pub attachments: CategoryLimit,
365 pub attachment_items: CategoryLimit,
367 pub sessions: CategoryLimit,
369 pub profiles: CategoryLimit,
371 pub profiles_indexed: CategoryLimit,
373 pub replays: CategoryLimit,
375 pub check_ins: CategoryLimit,
377 pub log_items: CategoryLimit,
379 pub log_bytes: CategoryLimit,
381 pub spans: CategoryLimit,
383 pub spans_indexed: CategoryLimit,
385 pub user_reports: CategoryLimit,
387 pub profile_chunks: CategoryLimit,
389 pub profile_chunks_ui: CategoryLimit,
391 pub trace_metrics: CategoryLimit,
393}
394
395impl Enforcement {
396 pub fn active_event(&self) -> Option<&CategoryLimit> {
400 if self.event.is_active() {
401 Some(&self.event)
402 } else if self.event_indexed.is_active() {
403 Some(&self.event_indexed)
404 } else {
405 None
406 }
407 }
408
409 pub fn is_event_active(&self) -> bool {
411 self.active_event().is_some()
412 }
413
414 fn get_outcomes(self) -> impl Iterator<Item = (Outcome, DataCategory, usize)> {
416 let Self {
417 event,
418 event_indexed,
419 attachments,
420 attachment_items,
421 sessions: _, profiles,
423 profiles_indexed,
424 replays,
425 check_ins,
426 log_items,
427 log_bytes,
428 spans,
429 spans_indexed,
430 user_reports,
431 profile_chunks,
432 profile_chunks_ui,
433 trace_metrics,
434 } = self;
435
436 let limits = [
437 event,
438 event_indexed,
439 attachments,
440 attachment_items,
441 profiles,
442 profiles_indexed,
443 replays,
444 check_ins,
445 log_items,
446 log_bytes,
447 spans,
448 spans_indexed,
449 user_reports,
450 profile_chunks,
451 profile_chunks_ui,
452 trace_metrics,
453 ];
454
455 limits
456 .into_iter()
457 .filter(move |limit| limit.is_active())
458 .map(move |limit| {
459 (
460 Outcome::RateLimited(limit.reason_code),
461 limit.category,
462 limit.quantity,
463 )
464 })
465 }
466
467 pub fn apply_with_outcomes(self, envelope: &mut ManagedEnvelope) {
503 envelope
504 .envelope_mut()
505 .retain_items(|item| self.retain_item(item));
506 self.track_outcomes(envelope);
507 }
508
509 fn retain_item(&self, item: &mut Item) -> bool {
511 if self.event.is_active() && item.requires_event() {
513 return false;
514 }
515
516 match item.ty() {
520 ItemType::Attachment => {
521 if !(self.attachments.is_active() || self.attachment_items.is_active()) {
522 return true;
523 }
524 if item.creates_event() {
525 item.set_rate_limited(true);
526 true
527 } else {
528 false
529 }
530 }
531 ItemType::Session => !self.sessions.is_active(),
532 ItemType::Profile => !self.profiles_indexed.is_active(),
533 ItemType::ReplayEvent => !self.replays.is_active(),
534 ItemType::ReplayVideo => !self.replays.is_active(),
535 ItemType::ReplayRecording => !self.replays.is_active(),
536 ItemType::UserReport => !self.user_reports.is_active(),
537 ItemType::CheckIn => !self.check_ins.is_active(),
538 ItemType::Log => {
539 !(self.log_items.is_active() || self.log_bytes.is_active())
540 }
541 ItemType::Span => !self.spans_indexed.is_active(),
542 ItemType::ProfileChunk => match item.profile_type() {
543 Some(ProfileType::Backend) => !self.profile_chunks.is_active(),
544 Some(ProfileType::Ui) => !self.profile_chunks_ui.is_active(),
545 None => true,
546 },
547 ItemType::TraceMetric => !self.trace_metrics.is_active(),
548 ItemType::Integration => match item.integration() {
549 Some(Integration::Logs(_)) => !(self.log_items.is_active() || self.log_bytes.is_active()),
550 Some(Integration::Spans(_)) => !self.spans_indexed.is_active(),
551 None => true,
552 },
553 ItemType::Event
554 | ItemType::Transaction
555 | ItemType::Security
556 | ItemType::FormData
557 | ItemType::RawSecurity
558 | ItemType::Nel
559 | ItemType::UnrealReport
560 | ItemType::Sessions
561 | ItemType::Statsd
562 | ItemType::MetricBuckets
563 | ItemType::ClientReport
564 | ItemType::UserReportV2 | ItemType::Unknown(_) => true,
566 }
567 }
568
569 fn track_outcomes(self, envelope: &mut ManagedEnvelope) {
573 for (outcome, category, quantity) in self.get_outcomes() {
574 envelope.track_outcome(outcome, category, quantity)
575 }
576 }
577}
578
579#[derive(Debug, Copy, Clone)]
581pub enum CheckLimits {
582 NonIndexed,
589 All,
591}
592
593struct Check<F, E, R> {
594 limits: CheckLimits,
595 check: F,
596 _1: PhantomData<E>,
597 _2: PhantomData<R>,
598}
599
600impl<F, E, R> Check<F, E, R>
601where
602 F: FnMut(ItemScoping, usize) -> R,
603 R: Future<Output = Result<RateLimits, E>>,
604{
605 async fn apply(&mut self, scoping: ItemScoping, quantity: usize) -> Result<RateLimits, E> {
606 if matches!(self.limits, CheckLimits::NonIndexed) && scoping.category.is_indexed() {
607 return Ok(RateLimits::default());
608 }
609
610 (self.check)(scoping, quantity).await
611 }
612}
613
614pub struct EnvelopeLimiter<F, E, R> {
626 check: Check<F, E, R>,
627 event_category: Option<DataCategory>,
628}
629
630impl<'a, F, E, R> EnvelopeLimiter<F, E, R>
631where
632 F: FnMut(ItemScoping, usize) -> R,
633 R: Future<Output = Result<RateLimits, E>>,
634{
635 pub fn new(limits: CheckLimits, check: F) -> Self {
637 Self {
638 check: Check {
639 check,
640 limits,
641 _1: PhantomData,
642 _2: PhantomData,
643 },
644 event_category: None,
645 }
646 }
647
648 pub fn assume_event(&mut self, category: DataCategory) {
654 self.event_category = Some(category);
655 }
656
657 pub async fn compute(
668 mut self,
669 envelope: &mut Envelope,
670 scoping: &'a Scoping,
671 ) -> Result<(Enforcement, RateLimits), E> {
672 let mut summary = EnvelopeSummary::compute(envelope);
673 summary.event_category = self.event_category.or(summary.event_category);
674
675 let (enforcement, rate_limits) = self.execute(&summary, scoping).await?;
676 Ok((enforcement, rate_limits))
677 }
678
679 async fn execute(
680 &mut self,
681 summary: &EnvelopeSummary,
682 scoping: &'a Scoping,
683 ) -> Result<(Enforcement, RateLimits), E> {
684 let mut rate_limits = RateLimits::new();
685 let mut enforcement = Enforcement::default();
686
687 if let Some(category) = summary.event_category {
689 let mut event_limits = self.check.apply(scoping.item(category), 1).await?;
691 enforcement.event = CategoryLimit::new(category, 1, event_limits.longest());
692
693 if let Some(index_category) = category.index_category() {
694 if event_limits.is_empty() {
697 event_limits.merge(self.check.apply(scoping.item(index_category), 1).await?);
698 }
699
700 enforcement.event_indexed =
701 CategoryLimit::new(index_category, 1, event_limits.longest());
702 };
703
704 rate_limits.merge(event_limits);
705 }
706
707 if let Some(limit) = enforcement.active_event() {
709 let limit1 = limit.clone_for(DataCategory::Attachment, summary.attachment_quantity);
710 let limit2 = limit.clone_for(
711 DataCategory::AttachmentItem,
712 summary.attachment_item_quantity,
713 );
714 enforcement.attachments = limit1;
715 enforcement.attachment_items = limit2;
716 } else {
717 let mut attachment_limits = RateLimits::new();
718 if summary.attachment_quantity > 0 {
719 let item_scoping = scoping.item(DataCategory::Attachment);
720
721 let attachment_byte_limits = self
722 .check
723 .apply(item_scoping, summary.attachment_quantity)
724 .await?;
725
726 enforcement.attachments = CategoryLimit::new(
727 DataCategory::Attachment,
728 summary.attachment_quantity,
729 attachment_byte_limits.longest(),
730 );
731 enforcement.attachment_items = enforcement.attachments.clone_for(
732 DataCategory::AttachmentItem,
733 summary.attachment_item_quantity,
734 );
735 attachment_limits.merge(attachment_byte_limits);
736 }
737 if !attachment_limits.is_limited() && summary.attachment_item_quantity > 0 {
738 let item_scoping = scoping.item(DataCategory::AttachmentItem);
739
740 let attachment_item_limits = self
741 .check
742 .apply(item_scoping, summary.attachment_item_quantity)
743 .await?;
744
745 enforcement.attachment_items = CategoryLimit::new(
746 DataCategory::AttachmentItem,
747 summary.attachment_item_quantity,
748 attachment_item_limits.longest(),
749 );
750 enforcement.attachments = enforcement
751 .attachment_items
752 .clone_for(DataCategory::Attachment, summary.attachment_quantity);
753 attachment_limits.merge(attachment_item_limits);
754 }
755
756 if summary.has_plain_attachments {
760 rate_limits.merge(attachment_limits);
761 }
762 }
763
764 if summary.session_quantity > 0 {
766 let item_scoping = scoping.item(DataCategory::Session);
767 let session_limits = self
768 .check
769 .apply(item_scoping, summary.session_quantity)
770 .await?;
771 enforcement.sessions = CategoryLimit::new(
772 DataCategory::Session,
773 summary.session_quantity,
774 session_limits.longest(),
775 );
776 rate_limits.merge(session_limits);
777 }
778
779 if summary.trace_metric_quantity > 0 {
781 let item_scoping = scoping.item(DataCategory::TraceMetric);
782 let trace_metric_limits = self
783 .check
784 .apply(item_scoping, summary.trace_metric_quantity)
785 .await?;
786 enforcement.trace_metrics = CategoryLimit::new(
787 DataCategory::TraceMetric,
788 summary.trace_metric_quantity,
789 trace_metric_limits.longest(),
790 );
791 rate_limits.merge(trace_metric_limits);
792 }
793
794 if summary.log_item_quantity > 0 {
796 let item_scoping = scoping.item(DataCategory::LogItem);
797 let log_limits = self
798 .check
799 .apply(item_scoping, summary.log_item_quantity)
800 .await?;
801 enforcement.log_items = CategoryLimit::new(
802 DataCategory::LogItem,
803 summary.log_item_quantity,
804 log_limits.longest(),
805 );
806 rate_limits.merge(log_limits);
807 }
808 if summary.log_byte_quantity > 0 {
809 let item_scoping = scoping.item(DataCategory::LogByte);
810 let log_limits = self
811 .check
812 .apply(item_scoping, summary.log_byte_quantity)
813 .await?;
814 enforcement.log_bytes = CategoryLimit::new(
815 DataCategory::LogByte,
816 summary.log_byte_quantity,
817 log_limits.longest(),
818 );
819 rate_limits.merge(log_limits);
820 }
821
822 if enforcement.is_event_active() {
824 enforcement.profiles = enforcement
825 .event
826 .clone_for(DataCategory::Profile, summary.profile_quantity);
827
828 enforcement.profiles_indexed = enforcement
829 .event_indexed
830 .clone_for(DataCategory::ProfileIndexed, summary.profile_quantity)
831 } else if summary.profile_quantity > 0 {
832 let mut profile_limits = self
833 .check
834 .apply(
835 scoping.item(DataCategory::Profile),
836 summary.profile_quantity,
837 )
838 .await?;
839
840 if profile_limits.is_empty() && summary.event_category.is_none() {
843 profile_limits = self
844 .check
845 .apply(scoping.item(DataCategory::Transaction), 0)
846 .await?;
847 }
848
849 enforcement.profiles = CategoryLimit::new(
850 DataCategory::Profile,
851 summary.profile_quantity,
852 profile_limits.longest(),
853 );
854
855 if profile_limits.is_empty() {
856 profile_limits.merge(
857 self.check
858 .apply(
859 scoping.item(DataCategory::ProfileIndexed),
860 summary.profile_quantity,
861 )
862 .await?,
863 );
864 }
865
866 enforcement.profiles_indexed = CategoryLimit::new(
867 DataCategory::ProfileIndexed,
868 summary.profile_quantity,
869 profile_limits.longest(),
870 );
871
872 rate_limits.merge(profile_limits);
873 }
874
875 if summary.replay_quantity > 0 {
877 let item_scoping = scoping.item(DataCategory::Replay);
878 let replay_limits = self
879 .check
880 .apply(item_scoping, summary.replay_quantity)
881 .await?;
882 enforcement.replays = CategoryLimit::new(
883 DataCategory::Replay,
884 summary.replay_quantity,
885 replay_limits.longest(),
886 );
887 rate_limits.merge(replay_limits);
888 }
889
890 if summary.user_report_quantity > 0 {
892 let item_scoping = scoping.item(DataCategory::UserReportV2);
893 let user_report_v2_limits = self
894 .check
895 .apply(item_scoping, summary.user_report_quantity)
896 .await?;
897 enforcement.user_reports = CategoryLimit::new(
898 DataCategory::UserReportV2,
899 summary.user_report_quantity,
900 user_report_v2_limits.longest(),
901 );
902 rate_limits.merge(user_report_v2_limits);
903 }
904
905 if summary.monitor_quantity > 0 {
907 let item_scoping = scoping.item(DataCategory::Monitor);
908 let checkin_limits = self
909 .check
910 .apply(item_scoping, summary.monitor_quantity)
911 .await?;
912 enforcement.check_ins = CategoryLimit::new(
913 DataCategory::Monitor,
914 summary.monitor_quantity,
915 checkin_limits.longest(),
916 );
917 rate_limits.merge(checkin_limits);
918 }
919
920 if enforcement.is_event_active() {
922 enforcement.spans = enforcement
923 .event
924 .clone_for(DataCategory::Span, summary.span_quantity);
925
926 enforcement.spans_indexed = enforcement
927 .event_indexed
928 .clone_for(DataCategory::SpanIndexed, summary.span_quantity);
929 } else if summary.span_quantity > 0 {
930 let mut span_limits = self
931 .check
932 .apply(scoping.item(DataCategory::Span), summary.span_quantity)
933 .await?;
934 enforcement.spans = CategoryLimit::new(
935 DataCategory::Span,
936 summary.span_quantity,
937 span_limits.longest(),
938 );
939
940 if span_limits.is_empty() {
941 span_limits.merge(
942 self.check
943 .apply(
944 scoping.item(DataCategory::SpanIndexed),
945 summary.span_quantity,
946 )
947 .await?,
948 );
949 }
950
951 enforcement.spans_indexed = CategoryLimit::new(
952 DataCategory::SpanIndexed,
953 summary.span_quantity,
954 span_limits.longest(),
955 );
956
957 rate_limits.merge(span_limits);
958 }
959
960 if summary.profile_chunk_quantity > 0 {
962 let item_scoping = scoping.item(DataCategory::ProfileChunk);
963 let limits = self
964 .check
965 .apply(item_scoping, summary.profile_chunk_quantity)
966 .await?;
967 enforcement.profile_chunks = CategoryLimit::new(
968 DataCategory::ProfileChunk,
969 summary.profile_chunk_quantity,
970 limits.longest(),
971 );
972 rate_limits.merge(limits);
973 }
974
975 if summary.profile_chunk_ui_quantity > 0 {
976 let item_scoping = scoping.item(DataCategory::ProfileChunkUi);
977 let limits = self
978 .check
979 .apply(item_scoping, summary.profile_chunk_ui_quantity)
980 .await?;
981 enforcement.profile_chunks_ui = CategoryLimit::new(
982 DataCategory::ProfileChunkUi,
983 summary.profile_chunk_ui_quantity,
984 limits.longest(),
985 );
986 rate_limits.merge(limits);
987 }
988
989 Ok((enforcement, rate_limits))
990 }
991}
992
993impl<F, E, R> fmt::Debug for EnvelopeLimiter<F, E, R> {
994 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
995 f.debug_struct("EnvelopeLimiter")
996 .field("event_category", &self.event_category)
997 .finish()
998 }
999}
1000
1001#[cfg(test)]
1002mod tests {
1003
1004 use std::collections::{BTreeMap, BTreeSet};
1005 use std::sync::Arc;
1006
1007 use relay_base_schema::organization::OrganizationId;
1008 use relay_base_schema::project::{ProjectId, ProjectKey};
1009 use relay_metrics::MetricNamespace;
1010 use relay_quotas::RetryAfter;
1011 use relay_system::Addr;
1012 use smallvec::smallvec;
1013 use tokio::sync::Mutex;
1014
1015 use super::*;
1016 use crate::{
1017 envelope::{AttachmentType, ContentType, SourceQuantities},
1018 extractors::RequestMeta,
1019 };
1020
1021 #[tokio::test]
1022 async fn test_format_rate_limits() {
1023 let mut rate_limits = RateLimits::new();
1024
1025 rate_limits.add(RateLimit {
1027 categories: DataCategories::new(),
1028 scope: RateLimitScope::Organization(OrganizationId::new(42)),
1029 reason_code: Some(ReasonCode::new("my_limit")),
1030 retry_after: RetryAfter::from_secs(42),
1031 namespaces: smallvec![],
1032 });
1033
1034 rate_limits.add(RateLimit {
1036 categories: smallvec![DataCategory::Transaction, DataCategory::Security],
1037 scope: RateLimitScope::Project(ProjectId::new(21)),
1038 reason_code: None,
1039 retry_after: RetryAfter::from_secs(4711),
1040 namespaces: smallvec![],
1041 });
1042
1043 let formatted = format_rate_limits(&rate_limits);
1044 let expected = "42::organization:my_limit, 4711:transaction;security:project";
1045 assert_eq!(formatted, expected);
1046 }
1047
1048 #[tokio::test]
1049 async fn test_format_rate_limits_namespace() {
1050 let mut rate_limits = RateLimits::new();
1051
1052 rate_limits.add(RateLimit {
1054 categories: smallvec![DataCategory::MetricBucket],
1055 scope: RateLimitScope::Organization(OrganizationId::new(42)),
1056 reason_code: Some(ReasonCode::new("my_limit")),
1057 retry_after: RetryAfter::from_secs(42),
1058 namespaces: smallvec![MetricNamespace::Custom, MetricNamespace::Spans],
1059 });
1060
1061 rate_limits.add(RateLimit {
1063 categories: smallvec![DataCategory::MetricBucket],
1064 scope: RateLimitScope::Organization(OrganizationId::new(42)),
1065 reason_code: None,
1066 retry_after: RetryAfter::from_secs(42),
1067 namespaces: smallvec![MetricNamespace::Spans],
1068 });
1069
1070 let formatted = format_rate_limits(&rate_limits);
1071 let expected = "42:metric_bucket:organization:my_limit:custom;spans, 42:metric_bucket:organization::spans";
1072 assert_eq!(formatted, expected);
1073 }
1074
1075 #[tokio::test]
1076 async fn test_parse_invalid_rate_limits() {
1077 let scoping = Scoping {
1078 organization_id: OrganizationId::new(42),
1079 project_id: ProjectId::new(21),
1080 project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
1081 key_id: Some(17),
1082 };
1083
1084 assert!(parse_rate_limits(&scoping, "").is_ok());
1085 assert!(parse_rate_limits(&scoping, "invalid").is_ok());
1086 assert!(parse_rate_limits(&scoping, ",,,").is_ok());
1087 }
1088
1089 #[tokio::test]
1090 async fn test_parse_rate_limits() {
1091 let scoping = Scoping {
1092 organization_id: OrganizationId::new(42),
1093 project_id: ProjectId::new(21),
1094 project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
1095 key_id: Some(17),
1096 };
1097
1098 let formatted =
1100 "42::organization:my_limit, invalid, 4711:foobar;transaction;security:project";
1101 let rate_limits: Vec<RateLimit> =
1102 parse_rate_limits(&scoping, formatted).into_iter().collect();
1103
1104 assert_eq!(
1105 rate_limits,
1106 vec![
1107 RateLimit {
1108 categories: DataCategories::new(),
1109 scope: RateLimitScope::Organization(OrganizationId::new(42)),
1110 reason_code: Some(ReasonCode::new("my_limit")),
1111 retry_after: rate_limits[0].retry_after,
1112 namespaces: smallvec![],
1113 },
1114 RateLimit {
1115 categories: smallvec![
1116 DataCategory::Unknown,
1117 DataCategory::Transaction,
1118 DataCategory::Security,
1119 ],
1120 scope: RateLimitScope::Project(ProjectId::new(21)),
1121 reason_code: None,
1122 retry_after: rate_limits[1].retry_after,
1123 namespaces: smallvec![],
1124 }
1125 ]
1126 );
1127
1128 assert_eq!(42, rate_limits[0].retry_after.remaining_seconds());
1129 assert_eq!(4711, rate_limits[1].retry_after.remaining_seconds());
1130 }
1131
1132 #[tokio::test]
1133 async fn test_parse_rate_limits_namespace() {
1134 let scoping = Scoping {
1135 organization_id: OrganizationId::new(42),
1136 project_id: ProjectId::new(21),
1137 project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
1138 key_id: Some(17),
1139 };
1140
1141 let formatted = "42:metric_bucket:organization::custom;spans";
1142 let rate_limits: Vec<RateLimit> =
1143 parse_rate_limits(&scoping, formatted).into_iter().collect();
1144
1145 assert_eq!(
1146 rate_limits,
1147 vec![RateLimit {
1148 categories: smallvec![DataCategory::MetricBucket],
1149 scope: RateLimitScope::Organization(OrganizationId::new(42)),
1150 reason_code: None,
1151 retry_after: rate_limits[0].retry_after,
1152 namespaces: smallvec![MetricNamespace::Custom, MetricNamespace::Spans],
1153 }]
1154 );
1155 }
1156
1157 #[tokio::test]
1158 async fn test_parse_rate_limits_empty_namespace() {
1159 let scoping = Scoping {
1160 organization_id: OrganizationId::new(42),
1161 project_id: ProjectId::new(21),
1162 project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
1163 key_id: Some(17),
1164 };
1165
1166 let formatted = "42:metric_bucket:organization:some_reason:";
1168 let rate_limits: Vec<RateLimit> =
1169 parse_rate_limits(&scoping, formatted).into_iter().collect();
1170
1171 assert_eq!(
1172 rate_limits,
1173 vec![RateLimit {
1174 categories: smallvec![DataCategory::MetricBucket],
1175 scope: RateLimitScope::Organization(OrganizationId::new(42)),
1176 reason_code: Some(ReasonCode::new("some_reason")),
1177 retry_after: rate_limits[0].retry_after,
1178 namespaces: smallvec![],
1179 }]
1180 );
1181 }
1182
1183 #[tokio::test]
1184 async fn test_parse_rate_limits_only_unknown() {
1185 let scoping = Scoping {
1186 organization_id: OrganizationId::new(42),
1187 project_id: ProjectId::new(21),
1188 project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
1189 key_id: Some(17),
1190 };
1191
1192 let formatted = "42:foo;bar:organization";
1193 let rate_limits: Vec<RateLimit> =
1194 parse_rate_limits(&scoping, formatted).into_iter().collect();
1195
1196 assert_eq!(
1197 rate_limits,
1198 vec![RateLimit {
1199 categories: smallvec![DataCategory::Unknown, DataCategory::Unknown],
1200 scope: RateLimitScope::Organization(OrganizationId::new(42)),
1201 reason_code: None,
1202 retry_after: rate_limits[0].retry_after,
1203 namespaces: smallvec![],
1204 },]
1205 );
1206 }
1207
1208 macro_rules! envelope {
1209 ($( $item_type:ident $( :: $attachment_type:ident )? ),*) => {{
1210 let bytes = "{\"dsn\":\"https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42\"}";
1211 #[allow(unused_mut)]
1212 let mut envelope = Envelope::parse_bytes(bytes.into()).unwrap();
1213 $(
1214 let mut item = Item::new(ItemType::$item_type);
1215 item.set_payload(ContentType::OctetStream, "0123456789");
1216 $( item.set_attachment_type(AttachmentType::$attachment_type); )?
1217 envelope.add_item(item);
1218 )*
1219
1220 let (outcome_aggregator, _) = Addr::custom();
1221
1222 ManagedEnvelope::new(
1223 envelope,
1224 outcome_aggregator,
1225 )
1226 }}
1227 }
1228
1229 fn set_extracted(envelope: &mut Envelope, ty: ItemType) {
1230 envelope
1231 .get_item_by_mut(|item| *item.ty() == ty)
1232 .unwrap()
1233 .set_metrics_extracted(true);
1234 }
1235
1236 fn rate_limit(category: DataCategory) -> RateLimit {
1237 RateLimit {
1238 categories: vec![category].into(),
1239 scope: RateLimitScope::Organization(OrganizationId::new(42)),
1240 reason_code: None,
1241 retry_after: RetryAfter::from_secs(60),
1242 namespaces: smallvec![],
1243 }
1244 }
1245
1246 #[derive(Debug, Default)]
1247 struct MockLimiter {
1248 denied: Vec<DataCategory>,
1249 called: BTreeMap<DataCategory, usize>,
1250 checked: BTreeSet<DataCategory>,
1251 }
1252
1253 impl MockLimiter {
1254 pub fn deny(mut self, category: DataCategory) -> Self {
1255 self.denied.push(category);
1256 self
1257 }
1258
1259 pub fn check(&mut self, scoping: ItemScoping, quantity: usize) -> Result<RateLimits, ()> {
1260 let cat = scoping.category;
1261 let previous = self.called.insert(cat, quantity);
1262 assert!(previous.is_none(), "rate limiter invoked twice for {cat}");
1263
1264 let mut limits = RateLimits::new();
1265 if self.denied.contains(&cat) {
1266 limits.add(rate_limit(cat));
1267 }
1268 Ok(limits)
1269 }
1270
1271 #[track_caller]
1272 pub fn assert_call(&mut self, category: DataCategory, expected: usize) {
1273 self.checked.insert(category);
1274
1275 let quantity = self.called.get(&category).copied();
1276 assert_eq!(
1277 quantity,
1278 Some(expected),
1279 "Expected quantity `{expected}` for data category `{category}`, got {quantity:?}."
1280 );
1281 }
1282 }
1283
1284 impl Drop for MockLimiter {
1285 fn drop(&mut self) {
1286 if std::thread::panicking() {
1287 return;
1288 }
1289
1290 for checked in &self.checked {
1291 self.called.remove(checked);
1292 }
1293
1294 if self.called.is_empty() {
1295 return;
1296 }
1297
1298 let not_asserted = self
1299 .called
1300 .iter()
1301 .map(|(k, v)| format!("- {k}: {v}"))
1302 .collect::<Vec<_>>()
1303 .join("\n");
1304
1305 panic!("Following calls to the limiter were not asserted:\n{not_asserted}");
1306 }
1307 }
1308
1309 async fn enforce_and_apply(
1310 mock: Arc<Mutex<MockLimiter>>,
1311 envelope: &mut ManagedEnvelope,
1312 #[allow(unused_variables)] assume_event: Option<DataCategory>,
1313 ) -> (Enforcement, RateLimits) {
1314 let scoping = envelope.scoping();
1315
1316 #[allow(unused_mut)]
1317 let mut limiter = EnvelopeLimiter::new(CheckLimits::All, move |s, q| {
1318 let mock = mock.clone();
1319 async move {
1320 let mut mock = mock.lock().await;
1321 mock.check(s, q)
1322 }
1323 });
1324 #[cfg(feature = "processing")]
1325 if let Some(assume_event) = assume_event {
1326 limiter.assume_event(assume_event);
1327 }
1328
1329 let (enforcement, limits) = limiter
1330 .compute(envelope.envelope_mut(), &scoping)
1331 .await
1332 .unwrap();
1333
1334 enforcement.clone().apply_with_outcomes(envelope);
1337
1338 (enforcement, limits)
1339 }
1340
1341 fn mock_limiter(category: Option<DataCategory>) -> Arc<Mutex<MockLimiter>> {
1342 let mut mock = MockLimiter::default();
1343 if let Some(category) = category {
1344 mock = mock.deny(category);
1345 }
1346
1347 Arc::new(Mutex::new(mock))
1348 }
1349
1350 #[tokio::test]
1351 async fn test_enforce_pass_empty() {
1352 let mut envelope = envelope![];
1353
1354 let mock = mock_limiter(None);
1355 let (_, limits) = enforce_and_apply(mock, &mut envelope, None).await;
1356
1357 assert!(!limits.is_limited());
1358 assert!(envelope.envelope().is_empty());
1359 }
1360
1361 #[tokio::test]
1362 async fn test_enforce_limit_error_event() {
1363 let mut envelope = envelope![Event];
1364
1365 let mock = mock_limiter(Some(DataCategory::Error));
1366 let (_, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1367
1368 assert!(limits.is_limited());
1369 assert!(envelope.envelope().is_empty());
1370 mock.lock().await.assert_call(DataCategory::Error, 1);
1371 }
1372
1373 #[tokio::test]
1374 async fn test_enforce_limit_error_with_attachments() {
1375 let mut envelope = envelope![Event, Attachment];
1376
1377 let mock = mock_limiter(Some(DataCategory::Error));
1378 let (_, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1379
1380 assert!(limits.is_limited());
1381 assert!(envelope.envelope().is_empty());
1382 mock.lock().await.assert_call(DataCategory::Error, 1);
1383 }
1384
1385 #[tokio::test]
1386 async fn test_enforce_limit_minidump() {
1387 let mut envelope = envelope![Attachment::Minidump];
1388
1389 let mock = mock_limiter(Some(DataCategory::Error));
1390 let (_, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1391
1392 assert!(limits.is_limited());
1393 assert!(envelope.envelope().is_empty());
1394 mock.lock().await.assert_call(DataCategory::Error, 1);
1395 }
1396
1397 #[tokio::test]
1398 async fn test_enforce_limit_attachments() {
1399 let mut envelope = envelope![Attachment::Minidump, Attachment];
1400
1401 let mock = mock_limiter(Some(DataCategory::Attachment));
1402 let (_, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1403
1404 assert!(limits.is_limited());
1406 assert_eq!(envelope.envelope().len(), 1);
1407 mock.lock().await.assert_call(DataCategory::Error, 1);
1408 mock.lock().await.assert_call(DataCategory::Attachment, 20);
1409 }
1410
1411 #[tokio::test]
1413 async fn test_enforce_limit_profiles() {
1414 let mut envelope = envelope![Profile, Profile];
1415
1416 let mock = mock_limiter(Some(DataCategory::Profile));
1417 let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1418
1419 assert!(limits.is_limited());
1420 assert_eq!(envelope.envelope().len(), 0);
1421 mock.lock().await.assert_call(DataCategory::Profile, 2);
1422
1423 assert_eq!(
1424 get_outcomes(enforcement),
1425 vec![
1426 (DataCategory::Profile, 2),
1427 (DataCategory::ProfileIndexed, 2)
1428 ]
1429 );
1430 }
1431
1432 #[tokio::test]
1434 async fn test_enforce_limit_profile_chunks_no_profile_type() {
1435 let mut envelope = envelope![ProfileChunk, ProfileChunk];
1438
1439 let mock = mock_limiter(Some(DataCategory::ProfileChunk));
1440 let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1441 assert!(!limits.is_limited());
1442 assert_eq!(get_outcomes(enforcement), vec![]);
1443
1444 let mock = mock_limiter(Some(DataCategory::ProfileChunkUi));
1445 let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1446 assert!(!limits.is_limited());
1447 assert_eq!(get_outcomes(enforcement), vec![]);
1448
1449 assert_eq!(envelope.envelope().len(), 2);
1450 }
1451
1452 #[tokio::test]
1453 async fn test_enforce_limit_profile_chunks_ui() {
1454 let mut envelope = envelope![];
1455
1456 let mut item = Item::new(ItemType::ProfileChunk);
1457 item.set_profile_type(ProfileType::Backend);
1458 envelope.envelope_mut().add_item(item);
1459 let mut item = Item::new(ItemType::ProfileChunk);
1460 item.set_profile_type(ProfileType::Ui);
1461 envelope.envelope_mut().add_item(item);
1462
1463 let mock = mock_limiter(Some(DataCategory::ProfileChunkUi));
1464 let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1465
1466 assert!(limits.is_limited());
1467 assert_eq!(envelope.envelope().len(), 1);
1468 mock.lock()
1469 .await
1470 .assert_call(DataCategory::ProfileChunkUi, 1);
1471 mock.lock().await.assert_call(DataCategory::ProfileChunk, 1);
1472
1473 assert_eq!(
1474 get_outcomes(enforcement),
1475 vec![(DataCategory::ProfileChunkUi, 1)]
1476 );
1477 }
1478
1479 #[tokio::test]
1480 async fn test_enforce_limit_profile_chunks_backend() {
1481 let mut envelope = envelope![];
1482
1483 let mut item = Item::new(ItemType::ProfileChunk);
1484 item.set_profile_type(ProfileType::Backend);
1485 envelope.envelope_mut().add_item(item);
1486 let mut item = Item::new(ItemType::ProfileChunk);
1487 item.set_profile_type(ProfileType::Ui);
1488 envelope.envelope_mut().add_item(item);
1489
1490 let mock = mock_limiter(Some(DataCategory::ProfileChunk));
1491 let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1492
1493 assert!(limits.is_limited());
1494 assert_eq!(envelope.envelope().len(), 1);
1495 mock.lock()
1496 .await
1497 .assert_call(DataCategory::ProfileChunkUi, 1);
1498 mock.lock().await.assert_call(DataCategory::ProfileChunk, 1);
1499
1500 assert_eq!(
1501 get_outcomes(enforcement),
1502 vec![(DataCategory::ProfileChunk, 1)]
1503 );
1504 }
1505
1506 #[tokio::test]
1508 async fn test_enforce_limit_replays() {
1509 let mut envelope = envelope![ReplayEvent, ReplayRecording, ReplayVideo];
1510
1511 let mock = mock_limiter(Some(DataCategory::Replay));
1512 let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1513
1514 assert!(limits.is_limited());
1515 assert_eq!(envelope.envelope().len(), 0);
1516 mock.lock().await.assert_call(DataCategory::Replay, 3);
1517
1518 assert_eq!(get_outcomes(enforcement), vec![(DataCategory::Replay, 3),]);
1519 }
1520
1521 #[tokio::test]
1523 async fn test_enforce_limit_monitor_checkins() {
1524 let mut envelope = envelope![CheckIn];
1525
1526 let mock = mock_limiter(Some(DataCategory::Monitor));
1527 let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1528
1529 assert!(limits.is_limited());
1530 assert_eq!(envelope.envelope().len(), 0);
1531 mock.lock().await.assert_call(DataCategory::Monitor, 1);
1532
1533 assert_eq!(get_outcomes(enforcement), vec![(DataCategory::Monitor, 1)])
1534 }
1535
1536 #[tokio::test]
1537 async fn test_enforce_pass_minidump() {
1538 let mut envelope = envelope![Attachment::Minidump];
1539
1540 let mock = mock_limiter(Some(DataCategory::Attachment));
1541 let (_, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1542
1543 assert!(!limits.is_limited());
1545 assert_eq!(envelope.envelope().len(), 1);
1546 mock.lock().await.assert_call(DataCategory::Error, 1);
1547 mock.lock().await.assert_call(DataCategory::Attachment, 10);
1548 }
1549
1550 #[tokio::test]
1551 async fn test_enforce_skip_rate_limited() {
1552 let mut envelope = envelope![];
1553
1554 let mut item = Item::new(ItemType::Attachment);
1555 item.set_payload(ContentType::OctetStream, "0123456789");
1556 item.set_rate_limited(true);
1557 envelope.envelope_mut().add_item(item);
1558
1559 let mock = mock_limiter(Some(DataCategory::Error));
1560 let (_, limits) = enforce_and_apply(mock, &mut envelope, None).await;
1561
1562 assert!(!limits.is_limited()); assert_eq!(envelope.envelope().len(), 1); }
1565
1566 #[tokio::test]
1567 async fn test_enforce_pass_sessions() {
1568 let mut envelope = envelope![Session, Session, Session];
1569
1570 let mock = mock_limiter(Some(DataCategory::Error));
1571 let (_, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1572
1573 assert!(!limits.is_limited());
1575 assert_eq!(envelope.envelope().len(), 3);
1576 mock.lock().await.assert_call(DataCategory::Session, 3);
1577 }
1578
1579 #[tokio::test]
1580 async fn test_enforce_limit_sessions() {
1581 let mut envelope = envelope![Session, Session, Event];
1582
1583 let mock = mock_limiter(Some(DataCategory::Session));
1584 let (_, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1585
1586 assert!(limits.is_limited());
1588 assert_eq!(envelope.envelope().len(), 1);
1589 mock.lock().await.assert_call(DataCategory::Error, 1);
1590 mock.lock().await.assert_call(DataCategory::Session, 2);
1591 }
1592
1593 #[tokio::test]
1594 #[cfg(feature = "processing")]
1595 async fn test_enforce_limit_assumed_event() {
1596 let mut envelope = envelope![];
1597
1598 let mock = mock_limiter(Some(DataCategory::Transaction));
1599 let (_, limits) =
1600 enforce_and_apply(mock.clone(), &mut envelope, Some(DataCategory::Transaction)).await;
1601
1602 assert!(limits.is_limited());
1603 assert!(envelope.envelope().is_empty()); mock.lock().await.assert_call(DataCategory::Transaction, 1);
1605 }
1606
1607 #[tokio::test]
1608 #[cfg(feature = "processing")]
1609 async fn test_enforce_limit_assumed_attachments() {
1610 let mut envelope = envelope![Attachment, Attachment];
1611
1612 let mock = mock_limiter(Some(DataCategory::Error));
1613 let (_, limits) =
1614 enforce_and_apply(mock.clone(), &mut envelope, Some(DataCategory::Error)).await;
1615
1616 assert!(limits.is_limited());
1617 assert!(envelope.envelope().is_empty());
1618 mock.lock().await.assert_call(DataCategory::Error, 1);
1619 }
1620
1621 #[tokio::test]
1622 async fn test_enforce_transaction() {
1623 let mut envelope = envelope![Transaction];
1624
1625 let mock = mock_limiter(Some(DataCategory::Transaction));
1626 let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1627
1628 assert!(limits.is_limited());
1629 assert!(enforcement.event_indexed.is_active());
1630 assert!(enforcement.event.is_active());
1631 mock.lock().await.assert_call(DataCategory::Transaction, 1);
1632
1633 assert_eq!(
1634 get_outcomes(enforcement),
1635 vec![
1636 (DataCategory::Transaction, 1),
1637 (DataCategory::TransactionIndexed, 1),
1638 ]
1639 );
1640 }
1641
1642 #[tokio::test]
1643 async fn test_enforce_transaction_non_indexed() {
1644 let mut envelope = envelope![Transaction, Profile];
1645 let scoping = envelope.scoping();
1646
1647 let mock = mock_limiter(Some(DataCategory::TransactionIndexed));
1648
1649 let mock_clone = mock.clone();
1650 let limiter = EnvelopeLimiter::new(CheckLimits::NonIndexed, move |s, q| {
1651 let mock_clone = mock_clone.clone();
1652 async move {
1653 let mut mock = mock_clone.lock().await;
1654 mock.check(s, q)
1655 }
1656 });
1657 let (enforcement, limits) = limiter
1658 .compute(envelope.envelope_mut(), &scoping)
1659 .await
1660 .unwrap();
1661 enforcement.clone().apply_with_outcomes(&mut envelope);
1662
1663 assert!(!limits.is_limited());
1664 assert!(!enforcement.event_indexed.is_active());
1665 assert!(!enforcement.event.is_active());
1666 assert!(!enforcement.profiles_indexed.is_active());
1667 assert!(!enforcement.profiles.is_active());
1668 mock.lock().await.assert_call(DataCategory::Transaction, 1);
1669 mock.lock().await.assert_call(DataCategory::Profile, 1);
1670 }
1671
1672 #[tokio::test]
1673 async fn test_enforce_transaction_no_indexing_quota() {
1674 let mut envelope = envelope![Transaction];
1675
1676 let mock = mock_limiter(Some(DataCategory::TransactionIndexed));
1677 let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1678
1679 assert!(limits.is_limited());
1680 assert!(enforcement.event_indexed.is_active());
1681 assert!(!enforcement.event.is_active());
1682 mock.lock().await.assert_call(DataCategory::Transaction, 1);
1683 mock.lock()
1684 .await
1685 .assert_call(DataCategory::TransactionIndexed, 1);
1686 }
1687
1688 #[tokio::test]
1689 async fn test_enforce_transaction_attachment_enforced() {
1690 let mut envelope = envelope![Transaction, Attachment];
1691
1692 let mock = mock_limiter(Some(DataCategory::Transaction));
1693 let (enforcement, _) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1694
1695 assert!(enforcement.event.is_active());
1696 assert!(enforcement.attachments.is_active());
1697 mock.lock().await.assert_call(DataCategory::Transaction, 1);
1698 }
1699
1700 fn get_outcomes(enforcement: Enforcement) -> Vec<(DataCategory, usize)> {
1701 enforcement
1702 .get_outcomes()
1703 .map(|(_, data_category, quantity)| (data_category, quantity))
1704 .collect::<Vec<_>>()
1705 }
1706
1707 #[tokio::test]
1708 async fn test_enforce_transaction_profile_enforced() {
1709 let mut envelope = envelope![Transaction, Profile];
1710
1711 let mock = mock_limiter(Some(DataCategory::Transaction));
1712 let (enforcement, _) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1713
1714 assert!(enforcement.event.is_active());
1715 assert!(enforcement.profiles.is_active());
1716 mock.lock().await.assert_call(DataCategory::Transaction, 1);
1717
1718 assert_eq!(
1719 get_outcomes(enforcement),
1720 vec![
1721 (DataCategory::Transaction, 1),
1722 (DataCategory::TransactionIndexed, 1),
1723 (DataCategory::Profile, 1),
1724 (DataCategory::ProfileIndexed, 1),
1725 ]
1726 );
1727 }
1728
1729 #[tokio::test]
1730 async fn test_enforce_transaction_standalone_profile_enforced() {
1731 let mut envelope = envelope![Profile];
1733
1734 let mock = mock_limiter(Some(DataCategory::Transaction));
1735 let (enforcement, _) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1736
1737 assert!(enforcement.profiles.is_active());
1738 mock.lock().await.assert_call(DataCategory::Profile, 1);
1739 mock.lock().await.assert_call(DataCategory::Transaction, 0);
1740
1741 assert_eq!(
1742 get_outcomes(enforcement),
1743 vec![
1744 (DataCategory::Profile, 1),
1745 (DataCategory::ProfileIndexed, 1),
1746 ]
1747 );
1748 }
1749
1750 #[tokio::test]
1751 async fn test_enforce_transaction_attachment_enforced_indexing_quota() {
1752 let mut envelope = envelope![Transaction, Attachment];
1753 set_extracted(envelope.envelope_mut(), ItemType::Transaction);
1754
1755 let mock = mock_limiter(Some(DataCategory::TransactionIndexed));
1756 let (enforcement, _) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1757
1758 assert!(!enforcement.event.is_active());
1759 assert!(enforcement.event_indexed.is_active());
1760 assert!(enforcement.attachments.is_active());
1761 assert!(enforcement.attachment_items.is_active());
1762 mock.lock().await.assert_call(DataCategory::Transaction, 1);
1763 mock.lock()
1764 .await
1765 .assert_call(DataCategory::TransactionIndexed, 1);
1766
1767 assert_eq!(
1768 get_outcomes(enforcement),
1769 vec![
1770 (DataCategory::TransactionIndexed, 1),
1771 (DataCategory::Attachment, 10),
1772 (DataCategory::AttachmentItem, 1)
1773 ]
1774 );
1775 }
1776
1777 #[tokio::test]
1778 async fn test_enforce_span() {
1779 let mut envelope = envelope![Span, Span];
1780
1781 let mock = mock_limiter(Some(DataCategory::Span));
1782 let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1783
1784 assert!(limits.is_limited());
1785 assert!(enforcement.spans_indexed.is_active());
1786 assert!(enforcement.spans.is_active());
1787 mock.lock().await.assert_call(DataCategory::Span, 2);
1788
1789 assert_eq!(
1790 get_outcomes(enforcement),
1791 vec![(DataCategory::Span, 2), (DataCategory::SpanIndexed, 2)]
1792 );
1793 }
1794
1795 #[tokio::test]
1796 async fn test_enforce_span_no_indexing_quota() {
1797 let mut envelope = envelope![Span, Span];
1798
1799 let mock = mock_limiter(Some(DataCategory::SpanIndexed));
1800 let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1801
1802 assert!(limits.is_limited());
1803 assert!(enforcement.spans_indexed.is_active());
1804 assert!(!enforcement.spans.is_active());
1805 mock.lock().await.assert_call(DataCategory::Span, 2);
1806 mock.lock().await.assert_call(DataCategory::SpanIndexed, 2);
1807
1808 assert_eq!(
1809 get_outcomes(enforcement),
1810 vec![(DataCategory::SpanIndexed, 2)]
1811 );
1812 }
1813
1814 #[tokio::test]
1815 async fn test_enforce_span_metrics_extracted_no_indexing_quota() {
1816 let mut envelope = envelope![Span, Span];
1817 set_extracted(envelope.envelope_mut(), ItemType::Span);
1818
1819 let mock = mock_limiter(Some(DataCategory::SpanIndexed));
1820 let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1821
1822 assert!(limits.is_limited());
1823 assert!(enforcement.spans_indexed.is_active());
1824 assert!(!enforcement.spans.is_active());
1825 mock.lock().await.assert_call(DataCategory::Span, 2);
1826 mock.lock().await.assert_call(DataCategory::SpanIndexed, 2);
1827
1828 assert_eq!(
1829 get_outcomes(enforcement),
1830 vec![(DataCategory::SpanIndexed, 2)]
1831 );
1832 }
1833
1834 #[test]
1835 fn test_source_quantity_for_total_quantity() {
1836 let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
1837 .parse()
1838 .unwrap();
1839 let request_meta = RequestMeta::new(dsn);
1840
1841 let mut envelope = Envelope::from_request(None, request_meta);
1842
1843 let mut item = Item::new(ItemType::MetricBuckets);
1844 item.set_source_quantities(SourceQuantities {
1845 transactions: 5,
1846 spans: 0,
1847 profiles: 2,
1848 buckets: 5,
1849 });
1850 envelope.add_item(item);
1851
1852 let mut item = Item::new(ItemType::MetricBuckets);
1853 item.set_source_quantities(SourceQuantities {
1854 transactions: 2,
1855 spans: 0,
1856 profiles: 0,
1857 buckets: 3,
1858 });
1859 envelope.add_item(item);
1860
1861 let summary = EnvelopeSummary::compute(&envelope);
1862
1863 assert_eq!(summary.profile_quantity, 2);
1864 assert_eq!(summary.secondary_transaction_quantity, 7);
1865 }
1866
1867 #[tokio::test]
1868 async fn test_enforce_limit_logs_count() {
1869 let mut envelope = envelope![Log, Log];
1870
1871 let mock = mock_limiter(Some(DataCategory::LogItem));
1872 let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1873
1874 assert!(limits.is_limited());
1875 assert_eq!(envelope.envelope().len(), 0);
1876 mock.lock().await.assert_call(DataCategory::LogItem, 2);
1877 mock.lock().await.assert_call(DataCategory::LogByte, 20);
1878
1879 assert_eq!(get_outcomes(enforcement), vec![(DataCategory::LogItem, 2)]);
1880 }
1881
1882 #[tokio::test]
1883 async fn test_enforce_limit_logs_bytes() {
1884 let mut envelope = envelope![Log, Log];
1885
1886 let mock = mock_limiter(Some(DataCategory::LogByte));
1887 let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1888
1889 assert!(limits.is_limited());
1890 assert_eq!(envelope.envelope().len(), 0);
1891 mock.lock().await.assert_call(DataCategory::LogItem, 2);
1892 mock.lock().await.assert_call(DataCategory::LogByte, 20);
1893
1894 assert_eq!(get_outcomes(enforcement), vec![(DataCategory::LogByte, 20)]);
1895 }
1896}