1use std::fmt::{self, Write};
2use std::future::Future;
3use std::marker::PhantomData;
4
5use relay_profiling::ProfileType;
6use relay_quotas::{
7 DataCategories, DataCategory, ItemScoping, QuotaScope, RateLimit, RateLimitScope, RateLimits,
8 ReasonCode, Scoping,
9};
10
11use crate::envelope::{Envelope, Item, ItemType};
12use crate::managed::ManagedEnvelope;
13use crate::services::outcome::Outcome;
14
15pub const RATE_LIMITS_HEADER: &str = "X-Sentry-Rate-Limits";
17
18pub fn format_rate_limits(rate_limits: &RateLimits) -> String {
20 let mut header = String::new();
21
22 for rate_limit in rate_limits {
23 if !header.is_empty() {
24 header.push_str(", ");
25 }
26
27 write!(header, "{}:", rate_limit.retry_after.remaining_seconds()).ok();
28
29 for (index, category) in rate_limit.categories.iter().enumerate() {
30 if index > 0 {
31 header.push(';');
32 }
33 write!(header, "{category}").ok();
34 }
35
36 write!(header, ":{}", rate_limit.scope.name()).ok();
37
38 if let Some(ref reason_code) = rate_limit.reason_code {
39 write!(header, ":{reason_code}").ok();
40 } else if !rate_limit.namespaces.is_empty() {
41 write!(header, ":").ok(); }
43
44 for (index, namespace) in rate_limit.namespaces.iter().enumerate() {
45 header.push(if index == 0 { ':' } else { ';' });
46 write!(header, "{namespace}").ok();
47 }
48 }
49
50 header
51}
52
53pub fn parse_rate_limits(scoping: &Scoping, string: &str) -> RateLimits {
55 let mut rate_limits = RateLimits::new();
56
57 for limit in string.split(',') {
58 let limit = limit.trim();
59 if limit.is_empty() {
60 continue;
61 }
62
63 let mut components = limit.split(':');
64
65 let retry_after = match components.next().and_then(|s| s.parse().ok()) {
66 Some(retry_after) => retry_after,
67 None => continue,
68 };
69
70 let mut categories = DataCategories::new();
71 for category in components.next().unwrap_or("").split(';') {
72 if !category.is_empty() {
73 categories.push(DataCategory::from_name(category));
74 }
75 }
76
77 let quota_scope = QuotaScope::from_name(components.next().unwrap_or(""));
78 let scope = RateLimitScope::for_quota(*scoping, quota_scope);
79
80 let reason_code = components
81 .next()
82 .filter(|s| !s.is_empty())
83 .map(ReasonCode::new);
84
85 let namespace = components
86 .next()
87 .unwrap_or("")
88 .split(';')
89 .filter(|s| !s.is_empty())
90 .filter_map(|s| s.parse().ok())
91 .collect();
92
93 rate_limits.add(RateLimit {
94 categories,
95 scope,
96 reason_code,
97 retry_after,
98 namespaces: namespace,
99 });
100 }
101
102 rate_limits
103}
104
105fn infer_event_category(item: &Item) -> Option<DataCategory> {
113 match item.ty() {
114 ItemType::Event => Some(DataCategory::Error),
115 ItemType::Transaction => Some(DataCategory::Transaction),
116 ItemType::Security | ItemType::RawSecurity => Some(DataCategory::Security),
117 ItemType::UnrealReport => Some(DataCategory::Error),
118 ItemType::UserReportV2 => Some(DataCategory::UserReportV2),
119 ItemType::Attachment if item.creates_event() => Some(DataCategory::Error),
120 ItemType::Attachment => None,
121 ItemType::Session => None,
122 ItemType::Sessions => None,
123 ItemType::Statsd => None,
124 ItemType::MetricBuckets => None,
125 ItemType::FormData => None,
126 ItemType::UserReport => None,
127 ItemType::Profile => None,
128 ItemType::ReplayEvent => None,
129 ItemType::ReplayRecording => None,
130 ItemType::ReplayVideo => None,
131 ItemType::ClientReport => None,
132 ItemType::CheckIn => None,
133 ItemType::Nel => None,
134 ItemType::Log => None,
135 ItemType::Span => None,
136 ItemType::OtelSpan => None,
137 ItemType::OtelTracesData => None,
138 ItemType::OtelLogsData => None,
139 ItemType::ProfileChunk => None,
140 ItemType::Unknown(_) => None,
141 }
142}
143
144#[non_exhaustive]
149#[derive(Clone, Copy, Debug, Default)]
150pub struct EnvelopeSummary {
151 pub event_category: Option<DataCategory>,
153
154 pub attachment_quantity: usize,
156
157 pub attachment_item_quantity: usize,
159
160 pub session_quantity: usize,
162
163 pub profile_quantity: usize,
165
166 pub replay_quantity: usize,
168
169 pub user_report_quantity: usize,
171
172 pub monitor_quantity: usize,
174
175 pub log_item_quantity: usize,
177
178 pub log_byte_quantity: usize,
180
181 pub secondary_transaction_quantity: usize,
190
191 pub secondary_span_quantity: usize,
193
194 pub span_quantity: usize,
196
197 pub has_plain_attachments: bool,
199
200 pub payload_size: usize,
202
203 pub profile_chunk_quantity: usize,
205 pub profile_chunk_ui_quantity: usize,
207}
208
209impl EnvelopeSummary {
210 pub fn empty() -> Self {
212 Self::default()
213 }
214
215 pub fn compute(envelope: &Envelope) -> Self {
217 let mut summary = Self::empty();
218
219 for item in envelope.items() {
220 if item.creates_event() {
221 summary.infer_category(item);
222 } else if item.ty() == &ItemType::Attachment {
223 summary.has_plain_attachments = true;
225 }
226
227 if item.rate_limited() {
230 continue;
231 }
232
233 if let Some(source_quantities) = item.source_quantities() {
234 summary.secondary_transaction_quantity += source_quantities.transactions;
235 summary.secondary_span_quantity += source_quantities.spans;
236 summary.profile_quantity += source_quantities.profiles;
237 }
238
239 summary.payload_size += item.len();
240
241 for (category, quantity) in item.quantities() {
242 summary.add_quantity(category, quantity);
243 }
244
245 if item.ty() == &ItemType::UserReport {
248 summary.user_report_quantity += 1;
249 }
250 }
251
252 summary
253 }
254
255 fn add_quantity(&mut self, category: DataCategory, quantity: usize) {
256 let target_quantity = match category {
257 DataCategory::Attachment => &mut self.attachment_quantity,
258 DataCategory::AttachmentItem => &mut self.attachment_item_quantity,
259 DataCategory::Session => &mut self.session_quantity,
260 DataCategory::Profile => &mut self.profile_quantity,
261 DataCategory::Replay => &mut self.replay_quantity,
262 DataCategory::DoNotUseReplayVideo => &mut self.replay_quantity,
263 DataCategory::Monitor => &mut self.monitor_quantity,
264 DataCategory::Span => &mut self.span_quantity,
265 DataCategory::LogItem => &mut self.log_item_quantity,
266 DataCategory::LogByte => &mut self.log_byte_quantity,
267 DataCategory::ProfileChunk => &mut self.profile_chunk_quantity,
268 DataCategory::ProfileChunkUi => &mut self.profile_chunk_ui_quantity,
269 _ => return,
271 };
272 *target_quantity += quantity;
273 }
274
275 fn infer_category(&mut self, item: &Item) {
280 if matches!(self.event_category, None | Some(DataCategory::Default))
281 && let Some(category) = infer_event_category(item)
282 {
283 self.event_category = Some(category);
284 }
285 }
286}
287
288#[derive(Debug)]
290#[cfg_attr(test, derive(Clone))]
291pub struct CategoryLimit {
292 category: DataCategory,
294 quantity: usize,
298 reason_code: Option<ReasonCode>,
302}
303
304impl CategoryLimit {
305 fn new(category: DataCategory, quantity: usize, rate_limit: Option<&RateLimit>) -> Self {
309 match rate_limit {
310 Some(limit) => Self {
311 category,
312 quantity,
313 reason_code: limit.reason_code.clone(),
314 },
315 None => Self::default(),
316 }
317 }
318
319 pub fn clone_for(&self, category: DataCategory, quantity: usize) -> CategoryLimit {
321 if !self.is_active() {
322 return Self::default();
323 }
324
325 Self {
326 category,
327 quantity,
328 reason_code: self.reason_code.clone(),
329 }
330 }
331
332 pub fn is_active(&self) -> bool {
337 self.quantity > 0
338 }
339}
340
341impl Default for CategoryLimit {
342 fn default() -> Self {
343 Self {
344 category: DataCategory::Default,
345 quantity: 0,
346 reason_code: None,
347 }
348 }
349}
350
351#[derive(Default, Debug)]
353#[cfg_attr(test, derive(Clone))]
354pub struct Enforcement {
355 pub event: CategoryLimit,
357 pub event_indexed: CategoryLimit,
359 pub attachments: CategoryLimit,
361 pub attachment_items: CategoryLimit,
363 pub sessions: CategoryLimit,
365 pub profiles: CategoryLimit,
367 pub profiles_indexed: CategoryLimit,
369 pub replays: CategoryLimit,
371 pub check_ins: CategoryLimit,
373 pub log_items: CategoryLimit,
375 pub log_bytes: CategoryLimit,
377 pub spans: CategoryLimit,
379 pub spans_indexed: CategoryLimit,
381 pub user_reports: CategoryLimit,
383 pub profile_chunks: CategoryLimit,
385 pub profile_chunks_ui: CategoryLimit,
387}
388
389impl Enforcement {
390 pub fn active_event(&self) -> Option<&CategoryLimit> {
394 if self.event.is_active() {
395 Some(&self.event)
396 } else if self.event_indexed.is_active() {
397 Some(&self.event_indexed)
398 } else {
399 None
400 }
401 }
402
403 pub fn is_event_active(&self) -> bool {
405 self.active_event().is_some()
406 }
407
408 fn get_outcomes(self) -> impl Iterator<Item = (Outcome, DataCategory, usize)> {
410 let Self {
411 event,
412 event_indexed,
413 attachments,
414 attachment_items,
415 sessions: _, profiles,
417 profiles_indexed,
418 replays,
419 check_ins,
420 log_items,
421 log_bytes,
422 spans,
423 spans_indexed,
424 user_reports,
425 profile_chunks,
426 profile_chunks_ui,
427 } = self;
428
429 let limits = [
430 event,
431 event_indexed,
432 attachments,
433 attachment_items,
434 profiles,
435 profiles_indexed,
436 replays,
437 check_ins,
438 log_items,
439 log_bytes,
440 spans,
441 spans_indexed,
442 user_reports,
443 profile_chunks,
444 profile_chunks_ui,
445 ];
446
447 limits
448 .into_iter()
449 .filter(move |limit| limit.is_active())
450 .map(move |limit| {
451 (
452 Outcome::RateLimited(limit.reason_code),
453 limit.category,
454 limit.quantity,
455 )
456 })
457 }
458
459 pub fn apply_with_outcomes(self, envelope: &mut ManagedEnvelope) {
495 envelope
496 .envelope_mut()
497 .retain_items(|item| self.retain_item(item));
498 self.track_outcomes(envelope);
499 }
500
501 fn retain_item(&self, item: &mut Item) -> bool {
503 if self.event.is_active() && item.requires_event() {
505 return false;
506 }
507
508 match item.ty() {
512 ItemType::Attachment => {
513 if !(self.attachments.is_active() || self.attachment_items.is_active()) {
514 return true;
515 }
516 if item.creates_event() {
517 item.set_rate_limited(true);
518 true
519 } else {
520 false
521 }
522 }
523 ItemType::Session => !self.sessions.is_active(),
524 ItemType::Profile => !self.profiles_indexed.is_active(),
525 ItemType::ReplayEvent => !self.replays.is_active(),
526 ItemType::ReplayVideo => !self.replays.is_active(),
527 ItemType::ReplayRecording => !self.replays.is_active(),
528 ItemType::UserReport => !self.user_reports.is_active(),
529 ItemType::CheckIn => !self.check_ins.is_active(),
530 ItemType::Log | ItemType::OtelLogsData => {
531 !(self.log_items.is_active() || self.log_bytes.is_active())
532 }
533 ItemType::Span | ItemType::OtelSpan | ItemType::OtelTracesData => {
534 !self.spans_indexed.is_active()
535 }
536 ItemType::ProfileChunk => match item.profile_type() {
537 Some(ProfileType::Backend) => !self.profile_chunks.is_active(),
538 Some(ProfileType::Ui) => !self.profile_chunks_ui.is_active(),
539 None => true,
540 },
541 ItemType::Event
542 | ItemType::Transaction
543 | ItemType::Security
544 | ItemType::FormData
545 | ItemType::RawSecurity
546 | ItemType::Nel
547 | ItemType::UnrealReport
548 | ItemType::Sessions
549 | ItemType::Statsd
550 | ItemType::MetricBuckets
551 | ItemType::ClientReport
552 | ItemType::UserReportV2 | ItemType::Unknown(_) => true,
554 }
555 }
556
557 fn track_outcomes(self, envelope: &mut ManagedEnvelope) {
561 for (outcome, category, quantity) in self.get_outcomes() {
562 envelope.track_outcome(outcome, category, quantity)
563 }
564 }
565}
566
567#[derive(Debug, Copy, Clone)]
569pub enum CheckLimits {
570 NonIndexed,
577 All,
579}
580
581struct Check<F, E, R> {
582 limits: CheckLimits,
583 check: F,
584 _1: PhantomData<E>,
585 _2: PhantomData<R>,
586}
587
588impl<F, E, R> Check<F, E, R>
589where
590 F: FnMut(ItemScoping, usize) -> R,
591 R: Future<Output = Result<RateLimits, E>>,
592{
593 async fn apply(&mut self, scoping: ItemScoping, quantity: usize) -> Result<RateLimits, E> {
594 if matches!(self.limits, CheckLimits::NonIndexed) && scoping.category.is_indexed() {
595 return Ok(RateLimits::default());
596 }
597
598 (self.check)(scoping, quantity).await
599 }
600}
601
602pub struct EnvelopeLimiter<F, E, R> {
614 check: Check<F, E, R>,
615 event_category: Option<DataCategory>,
616}
617
618impl<'a, F, E, R> EnvelopeLimiter<F, E, R>
619where
620 F: FnMut(ItemScoping, usize) -> R,
621 R: Future<Output = Result<RateLimits, E>>,
622{
623 pub fn new(limits: CheckLimits, check: F) -> Self {
625 Self {
626 check: Check {
627 check,
628 limits,
629 _1: PhantomData,
630 _2: PhantomData,
631 },
632 event_category: None,
633 }
634 }
635
636 pub fn assume_event(&mut self, category: DataCategory) {
642 self.event_category = Some(category);
643 }
644
645 pub async fn compute(
656 mut self,
657 envelope: &mut Envelope,
658 scoping: &'a Scoping,
659 ) -> Result<(Enforcement, RateLimits), E> {
660 let mut summary = EnvelopeSummary::compute(envelope);
661 summary.event_category = self.event_category.or(summary.event_category);
662
663 let (enforcement, rate_limits) = self.execute(&summary, scoping).await?;
664 Ok((enforcement, rate_limits))
665 }
666
667 async fn execute(
668 &mut self,
669 summary: &EnvelopeSummary,
670 scoping: &'a Scoping,
671 ) -> Result<(Enforcement, RateLimits), E> {
672 let mut rate_limits = RateLimits::new();
673 let mut enforcement = Enforcement::default();
674
675 if let Some(category) = summary.event_category {
677 let mut event_limits = self.check.apply(scoping.item(category), 1).await?;
679 enforcement.event = CategoryLimit::new(category, 1, event_limits.longest());
680
681 if let Some(index_category) = category.index_category() {
682 if event_limits.is_empty() {
685 event_limits.merge(self.check.apply(scoping.item(index_category), 1).await?);
686 }
687
688 enforcement.event_indexed =
689 CategoryLimit::new(index_category, 1, event_limits.longest());
690 };
691
692 rate_limits.merge(event_limits);
693 }
694
695 if let Some(limit) = enforcement.active_event() {
697 let limit1 = limit.clone_for(DataCategory::Attachment, summary.attachment_quantity);
698 let limit2 = limit.clone_for(
699 DataCategory::AttachmentItem,
700 summary.attachment_item_quantity,
701 );
702 enforcement.attachments = limit1;
703 enforcement.attachment_items = limit2;
704 } else {
705 let mut attachment_limits = RateLimits::new();
706 if summary.attachment_quantity > 0 {
707 let item_scoping = scoping.item(DataCategory::Attachment);
708
709 let attachment_byte_limits = self
710 .check
711 .apply(item_scoping, summary.attachment_quantity)
712 .await?;
713
714 enforcement.attachments = CategoryLimit::new(
715 DataCategory::Attachment,
716 summary.attachment_quantity,
717 attachment_byte_limits.longest(),
718 );
719 attachment_limits.merge(attachment_byte_limits);
720 }
721 if !attachment_limits.is_limited() && summary.attachment_item_quantity > 0 {
722 let item_scoping = scoping.item(DataCategory::AttachmentItem);
723
724 let attachment_item_limits = self
725 .check
726 .apply(item_scoping, summary.attachment_item_quantity)
727 .await?;
728
729 enforcement.attachment_items = CategoryLimit::new(
730 DataCategory::AttachmentItem,
731 summary.attachment_item_quantity,
732 attachment_item_limits.longest(),
733 );
734 attachment_limits.merge(attachment_item_limits);
735 }
736
737 if summary.has_plain_attachments {
741 rate_limits.merge(attachment_limits);
742 }
743 }
744
745 if summary.session_quantity > 0 {
747 let item_scoping = scoping.item(DataCategory::Session);
748 let session_limits = self
749 .check
750 .apply(item_scoping, summary.session_quantity)
751 .await?;
752 enforcement.sessions = CategoryLimit::new(
753 DataCategory::Session,
754 summary.session_quantity,
755 session_limits.longest(),
756 );
757 rate_limits.merge(session_limits);
758 }
759
760 if summary.log_item_quantity > 0 {
762 let item_scoping = scoping.item(DataCategory::LogItem);
763 let log_limits = self
764 .check
765 .apply(item_scoping, summary.log_item_quantity)
766 .await?;
767 enforcement.log_items = CategoryLimit::new(
768 DataCategory::LogItem,
769 summary.log_item_quantity,
770 log_limits.longest(),
771 );
772 rate_limits.merge(log_limits);
773 }
774 if summary.log_byte_quantity > 0 {
775 let item_scoping = scoping.item(DataCategory::LogByte);
776 let log_limits = self
777 .check
778 .apply(item_scoping, summary.log_byte_quantity)
779 .await?;
780 enforcement.log_bytes = CategoryLimit::new(
781 DataCategory::LogByte,
782 summary.log_byte_quantity,
783 log_limits.longest(),
784 );
785 rate_limits.merge(log_limits);
786 }
787
788 if enforcement.is_event_active() {
790 enforcement.profiles = enforcement
791 .event
792 .clone_for(DataCategory::Profile, summary.profile_quantity);
793
794 enforcement.profiles_indexed = enforcement
795 .event_indexed
796 .clone_for(DataCategory::ProfileIndexed, summary.profile_quantity)
797 } else if summary.profile_quantity > 0 {
798 let mut profile_limits = self
799 .check
800 .apply(
801 scoping.item(DataCategory::Profile),
802 summary.profile_quantity,
803 )
804 .await?;
805
806 if profile_limits.is_empty() && summary.event_category.is_none() {
809 profile_limits = self
810 .check
811 .apply(scoping.item(DataCategory::Transaction), 0)
812 .await?;
813 }
814
815 enforcement.profiles = CategoryLimit::new(
816 DataCategory::Profile,
817 summary.profile_quantity,
818 profile_limits.longest(),
819 );
820
821 if profile_limits.is_empty() {
822 profile_limits.merge(
823 self.check
824 .apply(
825 scoping.item(DataCategory::ProfileIndexed),
826 summary.profile_quantity,
827 )
828 .await?,
829 );
830 }
831
832 enforcement.profiles_indexed = CategoryLimit::new(
833 DataCategory::ProfileIndexed,
834 summary.profile_quantity,
835 profile_limits.longest(),
836 );
837
838 rate_limits.merge(profile_limits);
839 }
840
841 if summary.replay_quantity > 0 {
843 let item_scoping = scoping.item(DataCategory::Replay);
844 let replay_limits = self
845 .check
846 .apply(item_scoping, summary.replay_quantity)
847 .await?;
848 enforcement.replays = CategoryLimit::new(
849 DataCategory::Replay,
850 summary.replay_quantity,
851 replay_limits.longest(),
852 );
853 rate_limits.merge(replay_limits);
854 }
855
856 if summary.user_report_quantity > 0 {
858 let item_scoping = scoping.item(DataCategory::UserReportV2);
859 let user_report_v2_limits = self
860 .check
861 .apply(item_scoping, summary.user_report_quantity)
862 .await?;
863 enforcement.user_reports = CategoryLimit::new(
864 DataCategory::UserReportV2,
865 summary.user_report_quantity,
866 user_report_v2_limits.longest(),
867 );
868 rate_limits.merge(user_report_v2_limits);
869 }
870
871 if summary.monitor_quantity > 0 {
873 let item_scoping = scoping.item(DataCategory::Monitor);
874 let checkin_limits = self
875 .check
876 .apply(item_scoping, summary.monitor_quantity)
877 .await?;
878 enforcement.check_ins = CategoryLimit::new(
879 DataCategory::Monitor,
880 summary.monitor_quantity,
881 checkin_limits.longest(),
882 );
883 rate_limits.merge(checkin_limits);
884 }
885
886 if enforcement.is_event_active() {
888 enforcement.spans = enforcement
889 .event
890 .clone_for(DataCategory::Span, summary.span_quantity);
891
892 enforcement.spans_indexed = enforcement
893 .event_indexed
894 .clone_for(DataCategory::SpanIndexed, summary.span_quantity);
895 } else if summary.span_quantity > 0 {
896 let mut span_limits = self
897 .check
898 .apply(scoping.item(DataCategory::Span), summary.span_quantity)
899 .await?;
900 enforcement.spans = CategoryLimit::new(
901 DataCategory::Span,
902 summary.span_quantity,
903 span_limits.longest(),
904 );
905
906 if span_limits.is_empty() {
907 span_limits.merge(
908 self.check
909 .apply(
910 scoping.item(DataCategory::SpanIndexed),
911 summary.span_quantity,
912 )
913 .await?,
914 );
915 }
916
917 enforcement.spans_indexed = CategoryLimit::new(
918 DataCategory::SpanIndexed,
919 summary.span_quantity,
920 span_limits.longest(),
921 );
922
923 rate_limits.merge(span_limits);
924 }
925
926 if summary.profile_chunk_quantity > 0 {
928 let item_scoping = scoping.item(DataCategory::ProfileChunk);
929 let limits = self
930 .check
931 .apply(item_scoping, summary.profile_chunk_quantity)
932 .await?;
933 enforcement.profile_chunks = CategoryLimit::new(
934 DataCategory::ProfileChunk,
935 summary.profile_chunk_quantity,
936 limits.longest(),
937 );
938 rate_limits.merge(limits);
939 }
940
941 if summary.profile_chunk_ui_quantity > 0 {
942 let item_scoping = scoping.item(DataCategory::ProfileChunkUi);
943 let limits = self
944 .check
945 .apply(item_scoping, summary.profile_chunk_ui_quantity)
946 .await?;
947 enforcement.profile_chunks_ui = CategoryLimit::new(
948 DataCategory::ProfileChunkUi,
949 summary.profile_chunk_ui_quantity,
950 limits.longest(),
951 );
952 rate_limits.merge(limits);
953 }
954
955 Ok((enforcement, rate_limits))
956 }
957}
958
959impl<F, E, R> fmt::Debug for EnvelopeLimiter<F, E, R> {
960 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
961 f.debug_struct("EnvelopeLimiter")
962 .field("event_category", &self.event_category)
963 .finish()
964 }
965}
966
967#[cfg(test)]
968mod tests {
969
970 use std::collections::{BTreeMap, BTreeSet};
971 use std::sync::Arc;
972
973 use relay_base_schema::organization::OrganizationId;
974 use relay_base_schema::project::{ProjectId, ProjectKey};
975 use relay_metrics::MetricNamespace;
976 use relay_quotas::RetryAfter;
977 use relay_system::Addr;
978 use smallvec::smallvec;
979 use tokio::sync::Mutex;
980
981 use super::*;
982 use crate::{
983 envelope::{AttachmentType, ContentType, SourceQuantities},
984 extractors::RequestMeta,
985 };
986
987 #[tokio::test]
988 async fn test_format_rate_limits() {
989 let mut rate_limits = RateLimits::new();
990
991 rate_limits.add(RateLimit {
993 categories: DataCategories::new(),
994 scope: RateLimitScope::Organization(OrganizationId::new(42)),
995 reason_code: Some(ReasonCode::new("my_limit")),
996 retry_after: RetryAfter::from_secs(42),
997 namespaces: smallvec![],
998 });
999
1000 rate_limits.add(RateLimit {
1002 categories: smallvec![DataCategory::Transaction, DataCategory::Security],
1003 scope: RateLimitScope::Project(ProjectId::new(21)),
1004 reason_code: None,
1005 retry_after: RetryAfter::from_secs(4711),
1006 namespaces: smallvec![],
1007 });
1008
1009 let formatted = format_rate_limits(&rate_limits);
1010 let expected = "42::organization:my_limit, 4711:transaction;security:project";
1011 assert_eq!(formatted, expected);
1012 }
1013
1014 #[tokio::test]
1015 async fn test_format_rate_limits_namespace() {
1016 let mut rate_limits = RateLimits::new();
1017
1018 rate_limits.add(RateLimit {
1020 categories: smallvec![DataCategory::MetricBucket],
1021 scope: RateLimitScope::Organization(OrganizationId::new(42)),
1022 reason_code: Some(ReasonCode::new("my_limit")),
1023 retry_after: RetryAfter::from_secs(42),
1024 namespaces: smallvec![MetricNamespace::Custom, MetricNamespace::Spans],
1025 });
1026
1027 rate_limits.add(RateLimit {
1029 categories: smallvec![DataCategory::MetricBucket],
1030 scope: RateLimitScope::Organization(OrganizationId::new(42)),
1031 reason_code: None,
1032 retry_after: RetryAfter::from_secs(42),
1033 namespaces: smallvec![MetricNamespace::Spans],
1034 });
1035
1036 let formatted = format_rate_limits(&rate_limits);
1037 let expected = "42:metric_bucket:organization:my_limit:custom;spans, 42:metric_bucket:organization::spans";
1038 assert_eq!(formatted, expected);
1039 }
1040
1041 #[tokio::test]
1042 async fn test_parse_invalid_rate_limits() {
1043 let scoping = Scoping {
1044 organization_id: OrganizationId::new(42),
1045 project_id: ProjectId::new(21),
1046 project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
1047 key_id: Some(17),
1048 };
1049
1050 assert!(parse_rate_limits(&scoping, "").is_ok());
1051 assert!(parse_rate_limits(&scoping, "invalid").is_ok());
1052 assert!(parse_rate_limits(&scoping, ",,,").is_ok());
1053 }
1054
1055 #[tokio::test]
1056 async fn test_parse_rate_limits() {
1057 let scoping = Scoping {
1058 organization_id: OrganizationId::new(42),
1059 project_id: ProjectId::new(21),
1060 project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
1061 key_id: Some(17),
1062 };
1063
1064 let formatted =
1066 "42::organization:my_limit, invalid, 4711:foobar;transaction;security:project";
1067 let rate_limits: Vec<RateLimit> =
1068 parse_rate_limits(&scoping, formatted).into_iter().collect();
1069
1070 assert_eq!(
1071 rate_limits,
1072 vec![
1073 RateLimit {
1074 categories: DataCategories::new(),
1075 scope: RateLimitScope::Organization(OrganizationId::new(42)),
1076 reason_code: Some(ReasonCode::new("my_limit")),
1077 retry_after: rate_limits[0].retry_after,
1078 namespaces: smallvec![],
1079 },
1080 RateLimit {
1081 categories: smallvec![
1082 DataCategory::Unknown,
1083 DataCategory::Transaction,
1084 DataCategory::Security,
1085 ],
1086 scope: RateLimitScope::Project(ProjectId::new(21)),
1087 reason_code: None,
1088 retry_after: rate_limits[1].retry_after,
1089 namespaces: smallvec![],
1090 }
1091 ]
1092 );
1093
1094 assert_eq!(42, rate_limits[0].retry_after.remaining_seconds());
1095 assert_eq!(4711, rate_limits[1].retry_after.remaining_seconds());
1096 }
1097
1098 #[tokio::test]
1099 async fn test_parse_rate_limits_namespace() {
1100 let scoping = Scoping {
1101 organization_id: OrganizationId::new(42),
1102 project_id: ProjectId::new(21),
1103 project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
1104 key_id: Some(17),
1105 };
1106
1107 let formatted = "42:metric_bucket:organization::custom;spans";
1108 let rate_limits: Vec<RateLimit> =
1109 parse_rate_limits(&scoping, formatted).into_iter().collect();
1110
1111 assert_eq!(
1112 rate_limits,
1113 vec![RateLimit {
1114 categories: smallvec![DataCategory::MetricBucket],
1115 scope: RateLimitScope::Organization(OrganizationId::new(42)),
1116 reason_code: None,
1117 retry_after: rate_limits[0].retry_after,
1118 namespaces: smallvec![MetricNamespace::Custom, MetricNamespace::Spans],
1119 }]
1120 );
1121 }
1122
1123 #[tokio::test]
1124 async fn test_parse_rate_limits_empty_namespace() {
1125 let scoping = Scoping {
1126 organization_id: OrganizationId::new(42),
1127 project_id: ProjectId::new(21),
1128 project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
1129 key_id: Some(17),
1130 };
1131
1132 let formatted = "42:metric_bucket:organization:some_reason:";
1134 let rate_limits: Vec<RateLimit> =
1135 parse_rate_limits(&scoping, formatted).into_iter().collect();
1136
1137 assert_eq!(
1138 rate_limits,
1139 vec![RateLimit {
1140 categories: smallvec![DataCategory::MetricBucket],
1141 scope: RateLimitScope::Organization(OrganizationId::new(42)),
1142 reason_code: Some(ReasonCode::new("some_reason")),
1143 retry_after: rate_limits[0].retry_after,
1144 namespaces: smallvec![],
1145 }]
1146 );
1147 }
1148
1149 #[tokio::test]
1150 async fn test_parse_rate_limits_only_unknown() {
1151 let scoping = Scoping {
1152 organization_id: OrganizationId::new(42),
1153 project_id: ProjectId::new(21),
1154 project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
1155 key_id: Some(17),
1156 };
1157
1158 let formatted = "42:foo;bar:organization";
1159 let rate_limits: Vec<RateLimit> =
1160 parse_rate_limits(&scoping, formatted).into_iter().collect();
1161
1162 assert_eq!(
1163 rate_limits,
1164 vec![RateLimit {
1165 categories: smallvec![DataCategory::Unknown, DataCategory::Unknown],
1166 scope: RateLimitScope::Organization(OrganizationId::new(42)),
1167 reason_code: None,
1168 retry_after: rate_limits[0].retry_after,
1169 namespaces: smallvec![],
1170 },]
1171 );
1172 }
1173
1174 macro_rules! envelope {
1175 ($( $item_type:ident $( :: $attachment_type:ident )? ),*) => {{
1176 let bytes = "{\"dsn\":\"https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42\"}";
1177 #[allow(unused_mut)]
1178 let mut envelope = Envelope::parse_bytes(bytes.into()).unwrap();
1179 $(
1180 let mut item = Item::new(ItemType::$item_type);
1181 item.set_payload(ContentType::OctetStream, "0123456789");
1182 $( item.set_attachment_type(AttachmentType::$attachment_type); )?
1183 envelope.add_item(item);
1184 )*
1185
1186 let (outcome_aggregator, _) = Addr::custom();
1187
1188 ManagedEnvelope::new(
1189 envelope,
1190 outcome_aggregator,
1191 )
1192 }}
1193 }
1194
1195 fn set_extracted(envelope: &mut Envelope, ty: ItemType) {
1196 envelope
1197 .get_item_by_mut(|item| *item.ty() == ty)
1198 .unwrap()
1199 .set_metrics_extracted(true);
1200 }
1201
1202 fn rate_limit(category: DataCategory) -> RateLimit {
1203 RateLimit {
1204 categories: vec![category].into(),
1205 scope: RateLimitScope::Organization(OrganizationId::new(42)),
1206 reason_code: None,
1207 retry_after: RetryAfter::from_secs(60),
1208 namespaces: smallvec![],
1209 }
1210 }
1211
1212 #[derive(Debug, Default)]
1213 struct MockLimiter {
1214 denied: Vec<DataCategory>,
1215 called: BTreeMap<DataCategory, usize>,
1216 checked: BTreeSet<DataCategory>,
1217 }
1218
1219 impl MockLimiter {
1220 pub fn deny(mut self, category: DataCategory) -> Self {
1221 self.denied.push(category);
1222 self
1223 }
1224
1225 pub fn check(&mut self, scoping: ItemScoping, quantity: usize) -> Result<RateLimits, ()> {
1226 let cat = scoping.category;
1227 let previous = self.called.insert(cat, quantity);
1228 assert!(previous.is_none(), "rate limiter invoked twice for {cat}");
1229
1230 let mut limits = RateLimits::new();
1231 if self.denied.contains(&cat) {
1232 limits.add(rate_limit(cat));
1233 }
1234 Ok(limits)
1235 }
1236
1237 #[track_caller]
1238 pub fn assert_call(&mut self, category: DataCategory, expected: usize) {
1239 self.checked.insert(category);
1240
1241 let quantity = self.called.get(&category).copied();
1242 assert_eq!(
1243 quantity,
1244 Some(expected),
1245 "Expected quantity `{expected}` for data category `{category}`, got {quantity:?}."
1246 );
1247 }
1248 }
1249
1250 impl Drop for MockLimiter {
1251 fn drop(&mut self) {
1252 if std::thread::panicking() {
1253 return;
1254 }
1255
1256 for checked in &self.checked {
1257 self.called.remove(checked);
1258 }
1259
1260 if self.called.is_empty() {
1261 return;
1262 }
1263
1264 let not_asserted = self
1265 .called
1266 .iter()
1267 .map(|(k, v)| format!("- {k}: {v}"))
1268 .collect::<Vec<_>>()
1269 .join("\n");
1270
1271 panic!("Following calls to the limiter were not asserted:\n{not_asserted}");
1272 }
1273 }
1274
1275 async fn enforce_and_apply(
1276 mock: Arc<Mutex<MockLimiter>>,
1277 envelope: &mut ManagedEnvelope,
1278 #[allow(unused_variables)] assume_event: Option<DataCategory>,
1279 ) -> (Enforcement, RateLimits) {
1280 let scoping = envelope.scoping();
1281
1282 #[allow(unused_mut)]
1283 let mut limiter = EnvelopeLimiter::new(CheckLimits::All, move |s, q| {
1284 let mock = mock.clone();
1285 async move {
1286 let mut mock = mock.lock().await;
1287 mock.check(s, q)
1288 }
1289 });
1290 #[cfg(feature = "processing")]
1291 if let Some(assume_event) = assume_event {
1292 limiter.assume_event(assume_event);
1293 }
1294
1295 let (enforcement, limits) = limiter
1296 .compute(envelope.envelope_mut(), &scoping)
1297 .await
1298 .unwrap();
1299
1300 enforcement.clone().apply_with_outcomes(envelope);
1303
1304 (enforcement, limits)
1305 }
1306
1307 fn mock_limiter(category: Option<DataCategory>) -> Arc<Mutex<MockLimiter>> {
1308 let mut mock = MockLimiter::default();
1309 if let Some(category) = category {
1310 mock = mock.deny(category);
1311 }
1312
1313 Arc::new(Mutex::new(mock))
1314 }
1315
1316 #[tokio::test]
1317 async fn test_enforce_pass_empty() {
1318 let mut envelope = envelope![];
1319
1320 let mock = mock_limiter(None);
1321 let (_, limits) = enforce_and_apply(mock, &mut envelope, None).await;
1322
1323 assert!(!limits.is_limited());
1324 assert!(envelope.envelope().is_empty());
1325 }
1326
1327 #[tokio::test]
1328 async fn test_enforce_limit_error_event() {
1329 let mut envelope = envelope![Event];
1330
1331 let mock = mock_limiter(Some(DataCategory::Error));
1332 let (_, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1333
1334 assert!(limits.is_limited());
1335 assert!(envelope.envelope().is_empty());
1336 mock.lock().await.assert_call(DataCategory::Error, 1);
1337 }
1338
1339 #[tokio::test]
1340 async fn test_enforce_limit_error_with_attachments() {
1341 let mut envelope = envelope![Event, Attachment];
1342
1343 let mock = mock_limiter(Some(DataCategory::Error));
1344 let (_, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1345
1346 assert!(limits.is_limited());
1347 assert!(envelope.envelope().is_empty());
1348 mock.lock().await.assert_call(DataCategory::Error, 1);
1349 }
1350
1351 #[tokio::test]
1352 async fn test_enforce_limit_minidump() {
1353 let mut envelope = envelope![Attachment::Minidump];
1354
1355 let mock = mock_limiter(Some(DataCategory::Error));
1356 let (_, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1357
1358 assert!(limits.is_limited());
1359 assert!(envelope.envelope().is_empty());
1360 mock.lock().await.assert_call(DataCategory::Error, 1);
1361 }
1362
1363 #[tokio::test]
1364 async fn test_enforce_limit_attachments() {
1365 let mut envelope = envelope![Attachment::Minidump, Attachment];
1366
1367 let mock = mock_limiter(Some(DataCategory::Attachment));
1368 let (_, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1369
1370 assert!(limits.is_limited());
1372 assert_eq!(envelope.envelope().len(), 1);
1373 mock.lock().await.assert_call(DataCategory::Error, 1);
1374 mock.lock().await.assert_call(DataCategory::Attachment, 20);
1375 }
1376
1377 #[tokio::test]
1379 async fn test_enforce_limit_profiles() {
1380 let mut envelope = envelope![Profile, Profile];
1381
1382 let mock = mock_limiter(Some(DataCategory::Profile));
1383 let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1384
1385 assert!(limits.is_limited());
1386 assert_eq!(envelope.envelope().len(), 0);
1387 mock.lock().await.assert_call(DataCategory::Profile, 2);
1388
1389 assert_eq!(
1390 get_outcomes(enforcement),
1391 vec![
1392 (DataCategory::Profile, 2),
1393 (DataCategory::ProfileIndexed, 2)
1394 ]
1395 );
1396 }
1397
1398 #[tokio::test]
1400 async fn test_enforce_limit_profile_chunks_no_profile_type() {
1401 let mut envelope = envelope![ProfileChunk, ProfileChunk];
1404
1405 let mock = mock_limiter(Some(DataCategory::ProfileChunk));
1406 let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1407 assert!(!limits.is_limited());
1408 assert_eq!(get_outcomes(enforcement), vec![]);
1409
1410 let mock = mock_limiter(Some(DataCategory::ProfileChunkUi));
1411 let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1412 assert!(!limits.is_limited());
1413 assert_eq!(get_outcomes(enforcement), vec![]);
1414
1415 assert_eq!(envelope.envelope().len(), 2);
1416 }
1417
1418 #[tokio::test]
1419 async fn test_enforce_limit_profile_chunks_ui() {
1420 let mut envelope = envelope![];
1421
1422 let mut item = Item::new(ItemType::ProfileChunk);
1423 item.set_profile_type(ProfileType::Backend);
1424 envelope.envelope_mut().add_item(item);
1425 let mut item = Item::new(ItemType::ProfileChunk);
1426 item.set_profile_type(ProfileType::Ui);
1427 envelope.envelope_mut().add_item(item);
1428
1429 let mock = mock_limiter(Some(DataCategory::ProfileChunkUi));
1430 let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1431
1432 assert!(limits.is_limited());
1433 assert_eq!(envelope.envelope().len(), 1);
1434 mock.lock()
1435 .await
1436 .assert_call(DataCategory::ProfileChunkUi, 1);
1437 mock.lock().await.assert_call(DataCategory::ProfileChunk, 1);
1438
1439 assert_eq!(
1440 get_outcomes(enforcement),
1441 vec![(DataCategory::ProfileChunkUi, 1)]
1442 );
1443 }
1444
1445 #[tokio::test]
1446 async fn test_enforce_limit_profile_chunks_backend() {
1447 let mut envelope = envelope![];
1448
1449 let mut item = Item::new(ItemType::ProfileChunk);
1450 item.set_profile_type(ProfileType::Backend);
1451 envelope.envelope_mut().add_item(item);
1452 let mut item = Item::new(ItemType::ProfileChunk);
1453 item.set_profile_type(ProfileType::Ui);
1454 envelope.envelope_mut().add_item(item);
1455
1456 let mock = mock_limiter(Some(DataCategory::ProfileChunk));
1457 let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1458
1459 assert!(limits.is_limited());
1460 assert_eq!(envelope.envelope().len(), 1);
1461 mock.lock()
1462 .await
1463 .assert_call(DataCategory::ProfileChunkUi, 1);
1464 mock.lock().await.assert_call(DataCategory::ProfileChunk, 1);
1465
1466 assert_eq!(
1467 get_outcomes(enforcement),
1468 vec![(DataCategory::ProfileChunk, 1)]
1469 );
1470 }
1471
1472 #[tokio::test]
1474 async fn test_enforce_limit_replays() {
1475 let mut envelope = envelope![ReplayEvent, ReplayRecording, ReplayVideo];
1476
1477 let mock = mock_limiter(Some(DataCategory::Replay));
1478 let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1479
1480 assert!(limits.is_limited());
1481 assert_eq!(envelope.envelope().len(), 0);
1482 mock.lock().await.assert_call(DataCategory::Replay, 3);
1483
1484 assert_eq!(get_outcomes(enforcement), vec![(DataCategory::Replay, 3),]);
1485 }
1486
1487 #[tokio::test]
1489 async fn test_enforce_limit_monitor_checkins() {
1490 let mut envelope = envelope![CheckIn];
1491
1492 let mock = mock_limiter(Some(DataCategory::Monitor));
1493 let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1494
1495 assert!(limits.is_limited());
1496 assert_eq!(envelope.envelope().len(), 0);
1497 mock.lock().await.assert_call(DataCategory::Monitor, 1);
1498
1499 assert_eq!(get_outcomes(enforcement), vec![(DataCategory::Monitor, 1)])
1500 }
1501
1502 #[tokio::test]
1503 async fn test_enforce_pass_minidump() {
1504 let mut envelope = envelope![Attachment::Minidump];
1505
1506 let mock = mock_limiter(Some(DataCategory::Attachment));
1507 let (_, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1508
1509 assert!(!limits.is_limited());
1511 assert_eq!(envelope.envelope().len(), 1);
1512 mock.lock().await.assert_call(DataCategory::Error, 1);
1513 mock.lock().await.assert_call(DataCategory::Attachment, 10);
1514 }
1515
1516 #[tokio::test]
1517 async fn test_enforce_skip_rate_limited() {
1518 let mut envelope = envelope![];
1519
1520 let mut item = Item::new(ItemType::Attachment);
1521 item.set_payload(ContentType::OctetStream, "0123456789");
1522 item.set_rate_limited(true);
1523 envelope.envelope_mut().add_item(item);
1524
1525 let mock = mock_limiter(Some(DataCategory::Error));
1526 let (_, limits) = enforce_and_apply(mock, &mut envelope, None).await;
1527
1528 assert!(!limits.is_limited()); assert_eq!(envelope.envelope().len(), 1); }
1531
1532 #[tokio::test]
1533 async fn test_enforce_pass_sessions() {
1534 let mut envelope = envelope![Session, Session, Session];
1535
1536 let mock = mock_limiter(Some(DataCategory::Error));
1537 let (_, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1538
1539 assert!(!limits.is_limited());
1541 assert_eq!(envelope.envelope().len(), 3);
1542 mock.lock().await.assert_call(DataCategory::Session, 3);
1543 }
1544
1545 #[tokio::test]
1546 async fn test_enforce_limit_sessions() {
1547 let mut envelope = envelope![Session, Session, Event];
1548
1549 let mock = mock_limiter(Some(DataCategory::Session));
1550 let (_, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1551
1552 assert!(limits.is_limited());
1554 assert_eq!(envelope.envelope().len(), 1);
1555 mock.lock().await.assert_call(DataCategory::Error, 1);
1556 mock.lock().await.assert_call(DataCategory::Session, 2);
1557 }
1558
1559 #[tokio::test]
1560 #[cfg(feature = "processing")]
1561 async fn test_enforce_limit_assumed_event() {
1562 let mut envelope = envelope![];
1563
1564 let mock = mock_limiter(Some(DataCategory::Transaction));
1565 let (_, limits) =
1566 enforce_and_apply(mock.clone(), &mut envelope, Some(DataCategory::Transaction)).await;
1567
1568 assert!(limits.is_limited());
1569 assert!(envelope.envelope().is_empty()); mock.lock().await.assert_call(DataCategory::Transaction, 1);
1571 }
1572
1573 #[tokio::test]
1574 #[cfg(feature = "processing")]
1575 async fn test_enforce_limit_assumed_attachments() {
1576 let mut envelope = envelope![Attachment, Attachment];
1577
1578 let mock = mock_limiter(Some(DataCategory::Error));
1579 let (_, limits) =
1580 enforce_and_apply(mock.clone(), &mut envelope, Some(DataCategory::Error)).await;
1581
1582 assert!(limits.is_limited());
1583 assert!(envelope.envelope().is_empty());
1584 mock.lock().await.assert_call(DataCategory::Error, 1);
1585 }
1586
1587 #[tokio::test]
1588 async fn test_enforce_transaction() {
1589 let mut envelope = envelope![Transaction];
1590
1591 let mock = mock_limiter(Some(DataCategory::Transaction));
1592 let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1593
1594 assert!(limits.is_limited());
1595 assert!(enforcement.event_indexed.is_active());
1596 assert!(enforcement.event.is_active());
1597 mock.lock().await.assert_call(DataCategory::Transaction, 1);
1598
1599 assert_eq!(
1600 get_outcomes(enforcement),
1601 vec![
1602 (DataCategory::Transaction, 1),
1603 (DataCategory::TransactionIndexed, 1),
1604 ]
1605 );
1606 }
1607
1608 #[tokio::test]
1609 async fn test_enforce_transaction_non_indexed() {
1610 let mut envelope = envelope![Transaction, Profile];
1611 let scoping = envelope.scoping();
1612
1613 let mock = mock_limiter(Some(DataCategory::TransactionIndexed));
1614
1615 let mock_clone = mock.clone();
1616 let limiter = EnvelopeLimiter::new(CheckLimits::NonIndexed, move |s, q| {
1617 let mock_clone = mock_clone.clone();
1618 async move {
1619 let mut mock = mock_clone.lock().await;
1620 mock.check(s, q)
1621 }
1622 });
1623 let (enforcement, limits) = limiter
1624 .compute(envelope.envelope_mut(), &scoping)
1625 .await
1626 .unwrap();
1627 enforcement.clone().apply_with_outcomes(&mut envelope);
1628
1629 assert!(!limits.is_limited());
1630 assert!(!enforcement.event_indexed.is_active());
1631 assert!(!enforcement.event.is_active());
1632 assert!(!enforcement.profiles_indexed.is_active());
1633 assert!(!enforcement.profiles.is_active());
1634 mock.lock().await.assert_call(DataCategory::Transaction, 1);
1635 mock.lock().await.assert_call(DataCategory::Profile, 1);
1636 }
1637
1638 #[tokio::test]
1639 async fn test_enforce_transaction_no_indexing_quota() {
1640 let mut envelope = envelope![Transaction];
1641
1642 let mock = mock_limiter(Some(DataCategory::TransactionIndexed));
1643 let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1644
1645 assert!(limits.is_limited());
1646 assert!(enforcement.event_indexed.is_active());
1647 assert!(!enforcement.event.is_active());
1648 mock.lock().await.assert_call(DataCategory::Transaction, 1);
1649 mock.lock()
1650 .await
1651 .assert_call(DataCategory::TransactionIndexed, 1);
1652 }
1653
1654 #[tokio::test]
1655 async fn test_enforce_transaction_attachment_enforced() {
1656 let mut envelope = envelope![Transaction, Attachment];
1657
1658 let mock = mock_limiter(Some(DataCategory::Transaction));
1659 let (enforcement, _) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1660
1661 assert!(enforcement.event.is_active());
1662 assert!(enforcement.attachments.is_active());
1663 mock.lock().await.assert_call(DataCategory::Transaction, 1);
1664 }
1665
1666 fn get_outcomes(enforcement: Enforcement) -> Vec<(DataCategory, usize)> {
1667 enforcement
1668 .get_outcomes()
1669 .map(|(_, data_category, quantity)| (data_category, quantity))
1670 .collect::<Vec<_>>()
1671 }
1672
1673 #[tokio::test]
1674 async fn test_enforce_transaction_profile_enforced() {
1675 let mut envelope = envelope![Transaction, Profile];
1676
1677 let mock = mock_limiter(Some(DataCategory::Transaction));
1678 let (enforcement, _) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1679
1680 assert!(enforcement.event.is_active());
1681 assert!(enforcement.profiles.is_active());
1682 mock.lock().await.assert_call(DataCategory::Transaction, 1);
1683
1684 assert_eq!(
1685 get_outcomes(enforcement),
1686 vec![
1687 (DataCategory::Transaction, 1),
1688 (DataCategory::TransactionIndexed, 1),
1689 (DataCategory::Profile, 1),
1690 (DataCategory::ProfileIndexed, 1),
1691 ]
1692 );
1693 }
1694
1695 #[tokio::test]
1696 async fn test_enforce_transaction_standalone_profile_enforced() {
1697 let mut envelope = envelope![Profile];
1699
1700 let mock = mock_limiter(Some(DataCategory::Transaction));
1701 let (enforcement, _) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1702
1703 assert!(enforcement.profiles.is_active());
1704 mock.lock().await.assert_call(DataCategory::Profile, 1);
1705 mock.lock().await.assert_call(DataCategory::Transaction, 0);
1706
1707 assert_eq!(
1708 get_outcomes(enforcement),
1709 vec![
1710 (DataCategory::Profile, 1),
1711 (DataCategory::ProfileIndexed, 1),
1712 ]
1713 );
1714 }
1715
1716 #[tokio::test]
1717 async fn test_enforce_transaction_attachment_enforced_indexing_quota() {
1718 let mut envelope = envelope![Transaction, Attachment];
1719 set_extracted(envelope.envelope_mut(), ItemType::Transaction);
1720
1721 let mock = mock_limiter(Some(DataCategory::TransactionIndexed));
1722 let (enforcement, _) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1723
1724 assert!(!enforcement.event.is_active());
1725 assert!(enforcement.event_indexed.is_active());
1726 assert!(enforcement.attachments.is_active());
1727 assert!(enforcement.attachment_items.is_active());
1728 mock.lock().await.assert_call(DataCategory::Transaction, 1);
1729 mock.lock()
1730 .await
1731 .assert_call(DataCategory::TransactionIndexed, 1);
1732
1733 assert_eq!(
1734 get_outcomes(enforcement),
1735 vec![
1736 (DataCategory::TransactionIndexed, 1),
1737 (DataCategory::Attachment, 10),
1738 (DataCategory::AttachmentItem, 1)
1739 ]
1740 );
1741 }
1742
1743 #[tokio::test]
1744 async fn test_enforce_span() {
1745 let mut envelope = envelope![Span, OtelSpan];
1746
1747 let mock = mock_limiter(Some(DataCategory::Span));
1748 let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1749
1750 assert!(limits.is_limited());
1751 assert!(enforcement.spans_indexed.is_active());
1752 assert!(enforcement.spans.is_active());
1753 mock.lock().await.assert_call(DataCategory::Span, 2);
1754
1755 assert_eq!(
1756 get_outcomes(enforcement),
1757 vec![(DataCategory::Span, 2), (DataCategory::SpanIndexed, 2)]
1758 );
1759 }
1760
1761 #[tokio::test]
1762 async fn test_enforce_span_no_indexing_quota() {
1763 let mut envelope = envelope![OtelSpan, Span];
1764
1765 let mock = mock_limiter(Some(DataCategory::SpanIndexed));
1766 let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1767
1768 assert!(limits.is_limited());
1769 assert!(enforcement.spans_indexed.is_active());
1770 assert!(!enforcement.spans.is_active());
1771 mock.lock().await.assert_call(DataCategory::Span, 2);
1772 mock.lock().await.assert_call(DataCategory::SpanIndexed, 2);
1773
1774 assert_eq!(
1775 get_outcomes(enforcement),
1776 vec![(DataCategory::SpanIndexed, 2)]
1777 );
1778 }
1779
1780 #[tokio::test]
1781 async fn test_enforce_span_metrics_extracted_no_indexing_quota() {
1782 let mut envelope = envelope![Span, OtelSpan];
1783 set_extracted(envelope.envelope_mut(), ItemType::Span);
1784
1785 let mock = mock_limiter(Some(DataCategory::SpanIndexed));
1786 let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1787
1788 assert!(limits.is_limited());
1789 assert!(enforcement.spans_indexed.is_active());
1790 assert!(!enforcement.spans.is_active());
1791 mock.lock().await.assert_call(DataCategory::Span, 2);
1792 mock.lock().await.assert_call(DataCategory::SpanIndexed, 2);
1793
1794 assert_eq!(
1795 get_outcomes(enforcement),
1796 vec![(DataCategory::SpanIndexed, 2)]
1797 );
1798 }
1799
1800 #[test]
1801 fn test_source_quantity_for_total_quantity() {
1802 let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
1803 .parse()
1804 .unwrap();
1805 let request_meta = RequestMeta::new(dsn);
1806
1807 let mut envelope = Envelope::from_request(None, request_meta);
1808
1809 let mut item = Item::new(ItemType::MetricBuckets);
1810 item.set_source_quantities(SourceQuantities {
1811 transactions: 5,
1812 spans: 0,
1813 profiles: 2,
1814 buckets: 5,
1815 });
1816 envelope.add_item(item);
1817
1818 let mut item = Item::new(ItemType::MetricBuckets);
1819 item.set_source_quantities(SourceQuantities {
1820 transactions: 2,
1821 spans: 0,
1822 profiles: 0,
1823 buckets: 3,
1824 });
1825 envelope.add_item(item);
1826
1827 let summary = EnvelopeSummary::compute(&envelope);
1828
1829 assert_eq!(summary.profile_quantity, 2);
1830 assert_eq!(summary.secondary_transaction_quantity, 7);
1831 }
1832
1833 #[tokio::test]
1834 async fn test_enforce_limit_logs_count() {
1835 let mut envelope = envelope![Log, Log];
1836
1837 let mock = mock_limiter(Some(DataCategory::LogItem));
1838 let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1839
1840 assert!(limits.is_limited());
1841 assert_eq!(envelope.envelope().len(), 0);
1842 mock.lock().await.assert_call(DataCategory::LogItem, 2);
1843 mock.lock().await.assert_call(DataCategory::LogByte, 20);
1844
1845 assert_eq!(get_outcomes(enforcement), vec![(DataCategory::LogItem, 2)]);
1846 }
1847
1848 #[tokio::test]
1849 async fn test_enforce_limit_logs_bytes() {
1850 let mut envelope = envelope![Log, Log];
1851
1852 let mock = mock_limiter(Some(DataCategory::LogByte));
1853 let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1854
1855 assert!(limits.is_limited());
1856 assert_eq!(envelope.envelope().len(), 0);
1857 mock.lock().await.assert_call(DataCategory::LogItem, 2);
1858 mock.lock().await.assert_call(DataCategory::LogByte, 20);
1859
1860 assert_eq!(get_outcomes(enforcement), vec![(DataCategory::LogByte, 20)]);
1861 }
1862}