1use std::fmt::{self, Write};
2use std::future::Future;
3use std::marker::PhantomData;
4
5use relay_profiling::ProfileType;
6use relay_quotas::{
7 DataCategories, DataCategory, ItemScoping, QuotaScope, RateLimit, RateLimitScope, RateLimits,
8 ReasonCode, Scoping,
9};
10
11use crate::envelope::{Envelope, Item, ItemType};
12use crate::managed::ManagedEnvelope;
13use crate::services::outcome::Outcome;
14
/// Name of the rate limits header, `X-Sentry-Rate-Limits`.
///
/// NOTE(review): presumably the header value is the string produced by `format_rate_limits` and
/// consumed by `parse_rate_limits` — confirm against the callers.
pub const RATE_LIMITS_HEADER: &str = "X-Sentry-Rate-Limits";
17
18pub fn format_rate_limits(rate_limits: &RateLimits) -> String {
20 let mut header = String::new();
21
22 for rate_limit in rate_limits {
23 if !header.is_empty() {
24 header.push_str(", ");
25 }
26
27 write!(header, "{}:", rate_limit.retry_after.remaining_seconds()).ok();
28
29 for (index, category) in rate_limit.categories.iter().enumerate() {
30 if index > 0 {
31 header.push(';');
32 }
33 write!(header, "{category}").ok();
34 }
35
36 write!(header, ":{}", rate_limit.scope.name()).ok();
37
38 if let Some(ref reason_code) = rate_limit.reason_code {
39 write!(header, ":{reason_code}").ok();
40 } else if !rate_limit.namespaces.is_empty() {
41 write!(header, ":").ok(); }
43
44 for (index, namespace) in rate_limit.namespaces.iter().enumerate() {
45 header.push(if index == 0 { ':' } else { ';' });
46 write!(header, "{namespace}").ok();
47 }
48 }
49
50 header
51}
52
53pub fn parse_rate_limits(scoping: &Scoping, string: &str) -> RateLimits {
55 let mut rate_limits = RateLimits::new();
56
57 for limit in string.split(',') {
58 let limit = limit.trim();
59 if limit.is_empty() {
60 continue;
61 }
62
63 let mut components = limit.split(':');
64
65 let retry_after = match components.next().and_then(|s| s.parse().ok()) {
66 Some(retry_after) => retry_after,
67 None => continue,
68 };
69
70 let mut categories = DataCategories::new();
71 for category in components.next().unwrap_or("").split(';') {
72 if !category.is_empty() {
73 categories.push(DataCategory::from_name(category));
74 }
75 }
76
77 let quota_scope = QuotaScope::from_name(components.next().unwrap_or(""));
78 let scope = RateLimitScope::for_quota(*scoping, quota_scope);
79
80 let reason_code = components
81 .next()
82 .filter(|s| !s.is_empty())
83 .map(ReasonCode::new);
84
85 let namespace = components
86 .next()
87 .unwrap_or("")
88 .split(';')
89 .filter(|s| !s.is_empty())
90 .filter_map(|s| s.parse().ok())
91 .collect();
92
93 rate_limits.add(RateLimit {
94 categories,
95 scope,
96 reason_code,
97 retry_after,
98 namespaces: namespace,
99 });
100 }
101
102 rate_limits
103}
104
105fn infer_event_category(item: &Item) -> Option<DataCategory> {
113 match item.ty() {
114 ItemType::Event => Some(DataCategory::Error),
115 ItemType::Transaction => Some(DataCategory::Transaction),
116 ItemType::Security | ItemType::RawSecurity => Some(DataCategory::Security),
117 ItemType::UnrealReport => Some(DataCategory::Error),
118 ItemType::UserReportV2 => Some(DataCategory::UserReportV2),
119 ItemType::Attachment if item.creates_event() => Some(DataCategory::Error),
120 ItemType::Attachment => None,
121 ItemType::Session => None,
122 ItemType::Sessions => None,
123 ItemType::Statsd => None,
124 ItemType::MetricBuckets => None,
125 ItemType::FormData => None,
126 ItemType::UserReport => None,
127 ItemType::Profile => None,
128 ItemType::ReplayEvent => None,
129 ItemType::ReplayRecording => None,
130 ItemType::ReplayVideo => None,
131 ItemType::ClientReport => None,
132 ItemType::CheckIn => None,
133 ItemType::Nel => None,
134 ItemType::Log => None,
135 ItemType::OtelLog => None,
136 ItemType::Span => None,
137 ItemType::OtelSpan => None,
138 ItemType::OtelTracesData => None,
139 ItemType::ProfileChunk => None,
140 ItemType::Unknown(_) => None,
141 }
142}
143
/// Totals describing the contents of an envelope, as gathered by `EnvelopeSummary::compute`.
///
/// Except for `event_category` and `has_plain_attachments`, items that are already marked as rate
/// limited do not contribute to any of the fields.
#[non_exhaustive]
#[derive(Clone, Copy, Debug, Default)]
pub struct EnvelopeSummary {
    /// Data category inferred from the event-creating items of the envelope, if any.
    pub event_category: Option<DataCategory>,

    /// Sum of the `DataCategory::Attachment` quantities reported by the items.
    ///
    /// NOTE(review): presumably measured in bytes — confirm against `Item::quantities`.
    pub attachment_quantity: usize,

    /// Sum of the `DataCategory::AttachmentItem` quantities reported by the items.
    pub attachment_item_quantity: usize,

    /// Sum of the `DataCategory::Session` quantities reported by the items.
    pub session_quantity: usize,

    /// Sum of the `DataCategory::Profile` quantities reported by the items, plus the `profiles`
    /// count of every item's source quantities.
    pub profile_quantity: usize,

    /// Sum of the `DataCategory::Replay` and `DataCategory::DoNotUseReplayVideo` quantities
    /// reported by the items.
    pub replay_quantity: usize,

    /// Number of `ItemType::UserReport` items; each such item counts as one.
    pub user_report_quantity: usize,

    /// Sum of the `DataCategory::Monitor` quantities reported by the items.
    pub monitor_quantity: usize,

    /// Sum of the `DataCategory::LogItem` quantities reported by the items.
    pub log_item_quantity: usize,

    /// Sum of the `DataCategory::LogByte` quantities reported by the items.
    pub log_byte_quantity: usize,

    /// Sum of the `transactions` count of every item's source quantities.
    pub secondary_transaction_quantity: usize,

    /// Sum of the `spans` count of every item's source quantities.
    pub secondary_span_quantity: usize,

    /// Sum of the `DataCategory::Span` quantities reported by the items.
    pub span_quantity: usize,

    /// Whether the envelope contains an attachment item that does not create an event.
    pub has_plain_attachments: bool,

    /// Sum of `Item::len` over the items.
    pub payload_size: usize,

    /// Sum of the `DataCategory::ProfileChunk` quantities reported by the items.
    pub profile_chunk_quantity: usize,
    /// Sum of the `DataCategory::ProfileChunkUi` quantities reported by the items.
    pub profile_chunk_ui_quantity: usize,
}
208
209impl EnvelopeSummary {
210 pub fn empty() -> Self {
212 Self::default()
213 }
214
215 pub fn compute(envelope: &Envelope) -> Self {
217 let mut summary = Self::empty();
218
219 for item in envelope.items() {
220 if item.creates_event() {
221 summary.infer_category(item);
222 } else if item.ty() == &ItemType::Attachment {
223 summary.has_plain_attachments = true;
225 }
226
227 if item.rate_limited() {
230 continue;
231 }
232
233 if let Some(source_quantities) = item.source_quantities() {
234 summary.secondary_transaction_quantity += source_quantities.transactions;
235 summary.secondary_span_quantity += source_quantities.spans;
236 summary.profile_quantity += source_quantities.profiles;
237 }
238
239 summary.payload_size += item.len();
240
241 for (category, quantity) in item.quantities() {
242 summary.add_quantity(category, quantity);
243 }
244
245 if item.ty() == &ItemType::UserReport {
248 summary.user_report_quantity += 1;
249 }
250 }
251
252 summary
253 }
254
255 fn add_quantity(&mut self, category: DataCategory, quantity: usize) {
256 let target_quantity = match category {
257 DataCategory::Attachment => &mut self.attachment_quantity,
258 DataCategory::AttachmentItem => &mut self.attachment_item_quantity,
259 DataCategory::Session => &mut self.session_quantity,
260 DataCategory::Profile => &mut self.profile_quantity,
261 DataCategory::Replay => &mut self.replay_quantity,
262 DataCategory::DoNotUseReplayVideo => &mut self.replay_quantity,
263 DataCategory::Monitor => &mut self.monitor_quantity,
264 DataCategory::Span => &mut self.span_quantity,
265 DataCategory::LogItem => &mut self.log_item_quantity,
266 DataCategory::LogByte => &mut self.log_byte_quantity,
267 DataCategory::ProfileChunk => &mut self.profile_chunk_quantity,
268 DataCategory::ProfileChunkUi => &mut self.profile_chunk_ui_quantity,
269 _ => return,
271 };
272 *target_quantity += quantity;
273 }
274
275 fn infer_category(&mut self, item: &Item) {
280 if matches!(self.event_category, None | Some(DataCategory::Default)) {
281 if let Some(category) = infer_event_category(item) {
282 self.event_category = Some(category);
283 }
284 }
285 }
286}
287
/// Rate limiting information for a single data category.
///
/// A limit counts as active when its `quantity` is greater than zero; the `Default` value is an
/// inactive limit.
#[derive(Debug)]
#[cfg_attr(test, derive(Clone))]
pub struct CategoryLimit {
    /// The data category this limit refers to.
    category: DataCategory,
    /// The quantity affected by the limit; `0` means that no limit applies.
    quantity: usize,
    /// The reason code copied from the rate limit, if the rate limit has one.
    reason_code: Option<ReasonCode>,
}
303
304impl CategoryLimit {
305 fn new(category: DataCategory, quantity: usize, rate_limit: Option<&RateLimit>) -> Self {
309 match rate_limit {
310 Some(limit) => Self {
311 category,
312 quantity,
313 reason_code: limit.reason_code.clone(),
314 },
315 None => Self::default(),
316 }
317 }
318
319 pub fn clone_for(&self, category: DataCategory, quantity: usize) -> CategoryLimit {
321 if !self.is_active() {
322 return Self::default();
323 }
324
325 Self {
326 category,
327 quantity,
328 reason_code: self.reason_code.clone(),
329 }
330 }
331
332 pub fn is_active(&self) -> bool {
337 self.quantity > 0
338 }
339}
340
341impl Default for CategoryLimit {
342 fn default() -> Self {
343 Self {
344 category: DataCategory::Default,
345 quantity: 0,
346 reason_code: None,
347 }
348 }
349}
350
/// The per-category limits determined for one envelope.
///
/// Every field defaults to an inactive `CategoryLimit`.
#[derive(Default, Debug)]
#[cfg_attr(test, derive(Clone))]
pub struct Enforcement {
    /// Limit on the event's own data category.
    pub event: CategoryLimit,
    /// Limit on the indexed variant of the event's data category.
    pub event_indexed: CategoryLimit,
    /// Limit on `DataCategory::Attachment`.
    pub attachments: CategoryLimit,
    /// Limit on `DataCategory::AttachmentItem`.
    pub attachment_items: CategoryLimit,
    /// Limit on `DataCategory::Session`.
    pub sessions: CategoryLimit,
    /// Limit on `DataCategory::Profile`.
    pub profiles: CategoryLimit,
    /// Limit on `DataCategory::ProfileIndexed`.
    pub profiles_indexed: CategoryLimit,
    /// Limit on `DataCategory::Replay`.
    pub replays: CategoryLimit,
    /// Limit on `DataCategory::Monitor`.
    pub check_ins: CategoryLimit,
    /// Limit on `DataCategory::LogItem`.
    pub log_items: CategoryLimit,
    /// Limit on `DataCategory::LogByte`.
    pub log_bytes: CategoryLimit,
    /// Limit on `DataCategory::Span`.
    pub spans: CategoryLimit,
    /// Limit on `DataCategory::SpanIndexed`.
    pub spans_indexed: CategoryLimit,
    /// Limit for `ItemType::UserReport` items, checked against `DataCategory::UserReportV2`.
    pub user_reports: CategoryLimit,
    /// Limit on `DataCategory::ProfileChunk`.
    pub profile_chunks: CategoryLimit,
    /// Limit on `DataCategory::ProfileChunkUi`.
    pub profile_chunks_ui: CategoryLimit,
}
388
389impl Enforcement {
390 pub fn active_event(&self) -> Option<&CategoryLimit> {
394 if self.event.is_active() {
395 Some(&self.event)
396 } else if self.event_indexed.is_active() {
397 Some(&self.event_indexed)
398 } else {
399 None
400 }
401 }
402
403 pub fn is_event_active(&self) -> bool {
405 self.active_event().is_some()
406 }
407
408 fn get_outcomes(self) -> impl Iterator<Item = (Outcome, DataCategory, usize)> {
410 let Self {
411 event,
412 event_indexed,
413 attachments,
414 attachment_items,
415 sessions: _, profiles,
417 profiles_indexed,
418 replays,
419 check_ins,
420 log_items,
421 log_bytes,
422 spans,
423 spans_indexed,
424 user_reports,
425 profile_chunks,
426 profile_chunks_ui,
427 } = self;
428
429 let limits = [
430 event,
431 event_indexed,
432 attachments,
433 attachment_items,
434 profiles,
435 profiles_indexed,
436 replays,
437 check_ins,
438 log_items,
439 log_bytes,
440 spans,
441 spans_indexed,
442 user_reports,
443 profile_chunks,
444 profile_chunks_ui,
445 ];
446
447 limits
448 .into_iter()
449 .filter(move |limit| limit.is_active())
450 .map(move |limit| {
451 (
452 Outcome::RateLimited(limit.reason_code),
453 limit.category,
454 limit.quantity,
455 )
456 })
457 }
458
459 pub fn apply_with_outcomes(self, envelope: &mut ManagedEnvelope) {
495 envelope
496 .envelope_mut()
497 .retain_items(|item| self.retain_item(item));
498 self.track_outcomes(envelope);
499 }
500
501 fn retain_item(&self, item: &mut Item) -> bool {
503 if self.event.is_active() && item.requires_event() {
505 return false;
506 }
507
508 match item.ty() {
512 ItemType::Attachment => {
513 if !(self.attachments.is_active() || self.attachment_items.is_active()) {
514 return true;
515 }
516 if item.creates_event() {
517 item.set_rate_limited(true);
518 true
519 } else {
520 false
521 }
522 }
523 ItemType::Session => !self.sessions.is_active(),
524 ItemType::Profile => !self.profiles_indexed.is_active(),
525 ItemType::ReplayEvent => !self.replays.is_active(),
526 ItemType::ReplayVideo => !self.replays.is_active(),
527 ItemType::ReplayRecording => !self.replays.is_active(),
528 ItemType::UserReport => !self.user_reports.is_active(),
529 ItemType::CheckIn => !self.check_ins.is_active(),
530 ItemType::OtelLog | ItemType::Log => {
531 !(self.log_items.is_active() || self.log_bytes.is_active())
532 }
533 ItemType::Span | ItemType::OtelSpan | ItemType::OtelTracesData => {
534 !self.spans_indexed.is_active()
535 }
536 ItemType::ProfileChunk => match item.profile_type() {
537 Some(ProfileType::Backend) => !self.profile_chunks.is_active(),
538 Some(ProfileType::Ui) => !self.profile_chunks_ui.is_active(),
539 None => true,
540 },
541 ItemType::Event
542 | ItemType::Transaction
543 | ItemType::Security
544 | ItemType::FormData
545 | ItemType::RawSecurity
546 | ItemType::Nel
547 | ItemType::UnrealReport
548 | ItemType::Sessions
549 | ItemType::Statsd
550 | ItemType::MetricBuckets
551 | ItemType::ClientReport
552 | ItemType::UserReportV2 | ItemType::Unknown(_) => true,
554 }
555 }
556
557 fn track_outcomes(self, envelope: &mut ManagedEnvelope) {
561 for (outcome, category, quantity) in self.get_outcomes() {
562 envelope.track_outcome(outcome, category, quantity)
563 }
564 }
565}
566
/// Selects which data categories are passed on to the rate limit check.
#[derive(Debug, Copy, Clone)]
pub enum CheckLimits {
    /// Skips the check for indexed data categories; those report no rate limits.
    NonIndexed,
    /// Runs the check for every data category.
    All,
}
580
/// Wraps the rate limit check function together with the `CheckLimits` mode applied to it.
///
/// `E` and `R` are the error and future types of the check function. They are only carried as
/// markers so the struct can name them.
struct Check<F, E, R> {
    /// Determines whether indexed categories are checked.
    limits: CheckLimits,
    /// The check function, called with the item scoping and the quantity.
    check: F,
    /// Marker for the error type `E`.
    _1: PhantomData<E>,
    /// Marker for the future type `R`.
    _2: PhantomData<R>,
}
587
588impl<F, E, R> Check<F, E, R>
589where
590 F: FnMut(ItemScoping, usize) -> R,
591 R: Future<Output = Result<RateLimits, E>>,
592{
593 async fn apply(&mut self, scoping: ItemScoping, quantity: usize) -> Result<RateLimits, E> {
594 if matches!(self.limits, CheckLimits::NonIndexed) && scoping.category.is_indexed() {
595 return Ok(RateLimits::default());
596 }
597
598 (self.check)(scoping, quantity).await
599 }
600}
601
/// Computes rate limit enforcement for an envelope using a caller-provided check function.
///
/// The check function `F` receives an `ItemScoping` and a quantity and returns a future `R` that
/// resolves to the applicable `RateLimits` or an error `E`.
pub struct EnvelopeLimiter<F, E, R> {
    /// The check function together with the configured `CheckLimits` mode.
    check: Check<F, E, R>,
    /// Event category that overrides the one inferred from the envelope, if set.
    event_category: Option<DataCategory>,
}
617
impl<'a, F, E, R> EnvelopeLimiter<F, E, R>
where
    F: FnMut(ItemScoping, usize) -> R,
    R: Future<Output = Result<RateLimits, E>>,
{
    /// Creates a limiter that runs `check` according to the given `limits` mode.
    ///
    /// No event category is assumed initially.
    pub fn new(limits: CheckLimits, check: F) -> Self {
        Self {
            check: Check {
                check,
                limits,
                _1: PhantomData,
                _2: PhantomData,
            },
            event_category: None,
        }
    }

    /// Sets the event category to use instead of the one inferred from the envelope's items.
    pub fn assume_event(&mut self, category: DataCategory) {
        self.event_category = Some(category);
    }

    /// Summarizes the envelope and determines the limits that apply to it.
    ///
    /// Returns the per-category `Enforcement` together with the collected `RateLimits`. The
    /// envelope itself is not modified here; removing items is left to
    /// `Enforcement::apply_with_outcomes`.
    ///
    /// # Errors
    ///
    /// Returns the error of the check function as soon as one of its invocations fails.
    pub async fn compute(
        mut self,
        envelope: &mut Envelope,
        scoping: &'a Scoping,
    ) -> Result<(Enforcement, RateLimits), E> {
        let mut summary = EnvelopeSummary::compute(envelope);
        // A category set through `assume_event` takes precedence over the inferred one.
        summary.event_category = self.event_category.or(summary.event_category);

        let (enforcement, rate_limits) = self.execute(&summary, scoping).await?;
        Ok((enforcement, rate_limits))
    }

    /// Runs the check for every category present in `summary` and records the results.
    ///
    /// Categories are processed in a fixed order: event, attachments, sessions, logs, profiles,
    /// replays, user reports, check-ins, spans and profile chunks. Categories with a quantity of
    /// zero are not checked. An active event limit is propagated to attachments, profiles and
    /// spans without checking those categories.
    async fn execute(
        &mut self,
        summary: &EnvelopeSummary,
        scoping: &'a Scoping,
    ) -> Result<(Enforcement, RateLimits), E> {
        let mut rate_limits = RateLimits::new();
        let mut enforcement = Enforcement::default();

        // Event: check the event's own category first, with a quantity of one.
        if let Some(category) = summary.event_category {
            let mut event_limits = self.check.apply(scoping.item(category), 1).await?;
            enforcement.event = CategoryLimit::new(category, 1, event_limits.longest());

            if let Some(index_category) = category.index_category() {
                // The indexed category is only checked if the broad category returned no limits.
                if event_limits.is_empty() {
                    event_limits.merge(self.check.apply(scoping.item(index_category), 1).await?);
                }

                // Uses the longest limit found so far, so a limit on the broad category also
                // activates the indexed enforcement.
                enforcement.event_indexed =
                    CategoryLimit::new(index_category, 1, event_limits.longest());
            };

            rate_limits.merge(event_limits);
        }

        // Attachments: inherit an active event limit, otherwise check them on their own.
        if let Some(limit) = enforcement.active_event() {
            let limit1 = limit.clone_for(DataCategory::Attachment, summary.attachment_quantity);
            let limit2 = limit.clone_for(
                DataCategory::AttachmentItem,
                summary.attachment_item_quantity,
            );
            enforcement.attachments = limit1;
            enforcement.attachment_items = limit2;
        } else {
            let mut attachment_limits = RateLimits::new();
            if summary.attachment_quantity > 0 {
                let item_scoping = scoping.item(DataCategory::Attachment);

                let attachment_byte_limits = self
                    .check
                    .apply(item_scoping, summary.attachment_quantity)
                    .await?;

                enforcement.attachments = CategoryLimit::new(
                    DataCategory::Attachment,
                    summary.attachment_quantity,
                    attachment_byte_limits.longest(),
                );
                attachment_limits.merge(attachment_byte_limits);
            }
            // The attachment item category is only checked if the attachment category is not
            // limited.
            if !attachment_limits.is_limited() && summary.attachment_item_quantity > 0 {
                let item_scoping = scoping.item(DataCategory::AttachmentItem);

                let attachment_item_limits = self
                    .check
                    .apply(item_scoping, summary.attachment_item_quantity)
                    .await?;

                enforcement.attachment_items = CategoryLimit::new(
                    DataCategory::AttachmentItem,
                    summary.attachment_item_quantity,
                    attachment_item_limits.longest(),
                );
                attachment_limits.merge(attachment_item_limits);
            }

            // Attachment limits are only added to the returned rate limits if the envelope
            // contains plain attachments. The enforcement above is recorded either way.
            if summary.has_plain_attachments {
                rate_limits.merge(attachment_limits);
            }
        }

        // Sessions.
        if summary.session_quantity > 0 {
            let item_scoping = scoping.item(DataCategory::Session);
            let session_limits = self
                .check
                .apply(item_scoping, summary.session_quantity)
                .await?;
            enforcement.sessions = CategoryLimit::new(
                DataCategory::Session,
                summary.session_quantity,
                session_limits.longest(),
            );
            rate_limits.merge(session_limits);
        }

        // Logs: the item count and the byte count are checked independently of each other.
        if summary.log_item_quantity > 0 {
            let item_scoping = scoping.item(DataCategory::LogItem);
            let log_limits = self
                .check
                .apply(item_scoping, summary.log_item_quantity)
                .await?;
            enforcement.log_items = CategoryLimit::new(
                DataCategory::LogItem,
                summary.log_item_quantity,
                log_limits.longest(),
            );
            rate_limits.merge(log_limits);
        }
        if summary.log_byte_quantity > 0 {
            let item_scoping = scoping.item(DataCategory::LogByte);
            let log_limits = self
                .check
                .apply(item_scoping, summary.log_byte_quantity)
                .await?;
            enforcement.log_bytes = CategoryLimit::new(
                DataCategory::LogByte,
                summary.log_byte_quantity,
                log_limits.longest(),
            );
            rate_limits.merge(log_limits);
        }

        // Profiles: inherit an active event limit, otherwise check them on their own.
        if enforcement.is_event_active() {
            enforcement.profiles = enforcement
                .event
                .clone_for(DataCategory::Profile, summary.profile_quantity);

            enforcement.profiles_indexed = enforcement
                .event_indexed
                .clone_for(DataCategory::ProfileIndexed, summary.profile_quantity)
        } else if summary.profile_quantity > 0 {
            let mut profile_limits = self
                .check
                .apply(
                    scoping.item(DataCategory::Profile),
                    summary.profile_quantity,
                )
                .await?;

            // Without an event category and without profile limits, the transaction category is
            // checked with a quantity of zero and its result replaces the profile limits.
            // NOTE(review): presumably this applies transaction limits to profiles whose
            // transaction is no longer part of the envelope — confirm.
            if profile_limits.is_empty() && summary.event_category.is_none() {
                profile_limits = self
                    .check
                    .apply(scoping.item(DataCategory::Transaction), 0)
                    .await?;
            }

            enforcement.profiles = CategoryLimit::new(
                DataCategory::Profile,
                summary.profile_quantity,
                profile_limits.longest(),
            );

            // The indexed category is only checked if nothing is limited so far.
            if profile_limits.is_empty() {
                profile_limits.merge(
                    self.check
                        .apply(
                            scoping.item(DataCategory::ProfileIndexed),
                            summary.profile_quantity,
                        )
                        .await?,
                );
            }

            enforcement.profiles_indexed = CategoryLimit::new(
                DataCategory::ProfileIndexed,
                summary.profile_quantity,
                profile_limits.longest(),
            );

            rate_limits.merge(profile_limits);
        }

        // Replays.
        if summary.replay_quantity > 0 {
            let item_scoping = scoping.item(DataCategory::Replay);
            let replay_limits = self
                .check
                .apply(item_scoping, summary.replay_quantity)
                .await?;
            enforcement.replays = CategoryLimit::new(
                DataCategory::Replay,
                summary.replay_quantity,
                replay_limits.longest(),
            );
            rate_limits.merge(replay_limits);
        }

        // User reports are checked against the `UserReportV2` category.
        if summary.user_report_quantity > 0 {
            let item_scoping = scoping.item(DataCategory::UserReportV2);
            let user_report_v2_limits = self
                .check
                .apply(item_scoping, summary.user_report_quantity)
                .await?;
            enforcement.user_reports = CategoryLimit::new(
                DataCategory::UserReportV2,
                summary.user_report_quantity,
                user_report_v2_limits.longest(),
            );
            rate_limits.merge(user_report_v2_limits);
        }

        // Monitor check-ins.
        if summary.monitor_quantity > 0 {
            let item_scoping = scoping.item(DataCategory::Monitor);
            let checkin_limits = self
                .check
                .apply(item_scoping, summary.monitor_quantity)
                .await?;
            enforcement.check_ins = CategoryLimit::new(
                DataCategory::Monitor,
                summary.monitor_quantity,
                checkin_limits.longest(),
            );
            rate_limits.merge(checkin_limits);
        }

        // Spans: inherit an active event limit, otherwise check them on their own.
        if enforcement.is_event_active() {
            enforcement.spans = enforcement
                .event
                .clone_for(DataCategory::Span, summary.span_quantity);

            enforcement.spans_indexed = enforcement
                .event_indexed
                .clone_for(DataCategory::SpanIndexed, summary.span_quantity);
        } else if summary.span_quantity > 0 {
            let mut span_limits = self
                .check
                .apply(scoping.item(DataCategory::Span), summary.span_quantity)
                .await?;
            enforcement.spans = CategoryLimit::new(
                DataCategory::Span,
                summary.span_quantity,
                span_limits.longest(),
            );

            // The indexed category is only checked if the span category returned no limits.
            if span_limits.is_empty() {
                span_limits.merge(
                    self.check
                        .apply(
                            scoping.item(DataCategory::SpanIndexed),
                            summary.span_quantity,
                        )
                        .await?,
                );
            }

            enforcement.spans_indexed = CategoryLimit::new(
                DataCategory::SpanIndexed,
                summary.span_quantity,
                span_limits.longest(),
            );

            rate_limits.merge(span_limits);
        }

        // Profile chunks.
        if summary.profile_chunk_quantity > 0 {
            let item_scoping = scoping.item(DataCategory::ProfileChunk);
            let limits = self
                .check
                .apply(item_scoping, summary.profile_chunk_quantity)
                .await?;
            enforcement.profile_chunks = CategoryLimit::new(
                DataCategory::ProfileChunk,
                summary.profile_chunk_quantity,
                limits.longest(),
            );
            rate_limits.merge(limits);
        }

        // UI profile chunks.
        if summary.profile_chunk_ui_quantity > 0 {
            let item_scoping = scoping.item(DataCategory::ProfileChunkUi);
            let limits = self
                .check
                .apply(item_scoping, summary.profile_chunk_ui_quantity)
                .await?;
            enforcement.profile_chunks_ui = CategoryLimit::new(
                DataCategory::ProfileChunkUi,
                summary.profile_chunk_ui_quantity,
                limits.longest(),
            );
            rate_limits.merge(limits);
        }

        Ok((enforcement, rate_limits))
    }
}
958
959impl<F, E, R> fmt::Debug for EnvelopeLimiter<F, E, R> {
960 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
961 f.debug_struct("EnvelopeLimiter")
962 .field("event_category", &self.event_category)
963 .finish()
964 }
965}
966
967#[cfg(test)]
968mod tests {
969
970 use std::collections::{BTreeMap, BTreeSet};
971 use std::sync::Arc;
972
973 use relay_base_schema::organization::OrganizationId;
974 use relay_base_schema::project::{ProjectId, ProjectKey};
975 use relay_metrics::MetricNamespace;
976 use relay_quotas::RetryAfter;
977 use relay_system::Addr;
978 use smallvec::smallvec;
979 use tokio::sync::Mutex;
980
981 use super::*;
982 use crate::{
983 envelope::{AttachmentType, ContentType, SourceQuantities},
984 extractors::RequestMeta,
985 };
986
987 #[tokio::test]
988 async fn test_format_rate_limits() {
989 let mut rate_limits = RateLimits::new();
990
991 rate_limits.add(RateLimit {
993 categories: DataCategories::new(),
994 scope: RateLimitScope::Organization(OrganizationId::new(42)),
995 reason_code: Some(ReasonCode::new("my_limit")),
996 retry_after: RetryAfter::from_secs(42),
997 namespaces: smallvec![],
998 });
999
1000 rate_limits.add(RateLimit {
1002 categories: smallvec![DataCategory::Transaction, DataCategory::Security],
1003 scope: RateLimitScope::Project(ProjectId::new(21)),
1004 reason_code: None,
1005 retry_after: RetryAfter::from_secs(4711),
1006 namespaces: smallvec![],
1007 });
1008
1009 let formatted = format_rate_limits(&rate_limits);
1010 let expected = "42::organization:my_limit, 4711:transaction;security:project";
1011 assert_eq!(formatted, expected);
1012 }
1013
1014 #[tokio::test]
1015 async fn test_format_rate_limits_namespace() {
1016 let mut rate_limits = RateLimits::new();
1017
1018 rate_limits.add(RateLimit {
1020 categories: smallvec![DataCategory::MetricBucket],
1021 scope: RateLimitScope::Organization(OrganizationId::new(42)),
1022 reason_code: Some(ReasonCode::new("my_limit")),
1023 retry_after: RetryAfter::from_secs(42),
1024 namespaces: smallvec![MetricNamespace::Custom, MetricNamespace::Spans],
1025 });
1026
1027 rate_limits.add(RateLimit {
1029 categories: smallvec![DataCategory::MetricBucket],
1030 scope: RateLimitScope::Organization(OrganizationId::new(42)),
1031 reason_code: None,
1032 retry_after: RetryAfter::from_secs(42),
1033 namespaces: smallvec![MetricNamespace::Spans],
1034 });
1035
1036 let formatted = format_rate_limits(&rate_limits);
1037 let expected = "42:metric_bucket:organization:my_limit:custom;spans, 42:metric_bucket:organization::spans";
1038 assert_eq!(formatted, expected);
1039 }
1040
1041 #[tokio::test]
1042 async fn test_parse_invalid_rate_limits() {
1043 let scoping = Scoping {
1044 organization_id: OrganizationId::new(42),
1045 project_id: ProjectId::new(21),
1046 project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
1047 key_id: Some(17),
1048 };
1049
1050 assert!(parse_rate_limits(&scoping, "").is_ok());
1051 assert!(parse_rate_limits(&scoping, "invalid").is_ok());
1052 assert!(parse_rate_limits(&scoping, ",,,").is_ok());
1053 }
1054
1055 #[tokio::test]
1056 async fn test_parse_rate_limits() {
1057 let scoping = Scoping {
1058 organization_id: OrganizationId::new(42),
1059 project_id: ProjectId::new(21),
1060 project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
1061 key_id: Some(17),
1062 };
1063
1064 let formatted =
1066 "42::organization:my_limit, invalid, 4711:foobar;transaction;security:project";
1067 let rate_limits: Vec<RateLimit> =
1068 parse_rate_limits(&scoping, formatted).into_iter().collect();
1069
1070 assert_eq!(
1071 rate_limits,
1072 vec![
1073 RateLimit {
1074 categories: DataCategories::new(),
1075 scope: RateLimitScope::Organization(OrganizationId::new(42)),
1076 reason_code: Some(ReasonCode::new("my_limit")),
1077 retry_after: rate_limits[0].retry_after,
1078 namespaces: smallvec![],
1079 },
1080 RateLimit {
1081 categories: smallvec![
1082 DataCategory::Unknown,
1083 DataCategory::Transaction,
1084 DataCategory::Security,
1085 ],
1086 scope: RateLimitScope::Project(ProjectId::new(21)),
1087 reason_code: None,
1088 retry_after: rate_limits[1].retry_after,
1089 namespaces: smallvec![],
1090 }
1091 ]
1092 );
1093
1094 assert_eq!(42, rate_limits[0].retry_after.remaining_seconds());
1095 assert_eq!(4711, rate_limits[1].retry_after.remaining_seconds());
1096 }
1097
1098 #[tokio::test]
1099 async fn test_parse_rate_limits_namespace() {
1100 let scoping = Scoping {
1101 organization_id: OrganizationId::new(42),
1102 project_id: ProjectId::new(21),
1103 project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
1104 key_id: Some(17),
1105 };
1106
1107 let formatted = "42:metric_bucket:organization::custom;spans";
1108 let rate_limits: Vec<RateLimit> =
1109 parse_rate_limits(&scoping, formatted).into_iter().collect();
1110
1111 assert_eq!(
1112 rate_limits,
1113 vec![RateLimit {
1114 categories: smallvec![DataCategory::MetricBucket],
1115 scope: RateLimitScope::Organization(OrganizationId::new(42)),
1116 reason_code: None,
1117 retry_after: rate_limits[0].retry_after,
1118 namespaces: smallvec![MetricNamespace::Custom, MetricNamespace::Spans],
1119 }]
1120 );
1121 }
1122
1123 #[tokio::test]
1124 async fn test_parse_rate_limits_empty_namespace() {
1125 let scoping = Scoping {
1126 organization_id: OrganizationId::new(42),
1127 project_id: ProjectId::new(21),
1128 project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
1129 key_id: Some(17),
1130 };
1131
1132 let formatted = "42:metric_bucket:organization:some_reason:";
1134 let rate_limits: Vec<RateLimit> =
1135 parse_rate_limits(&scoping, formatted).into_iter().collect();
1136
1137 assert_eq!(
1138 rate_limits,
1139 vec![RateLimit {
1140 categories: smallvec![DataCategory::MetricBucket],
1141 scope: RateLimitScope::Organization(OrganizationId::new(42)),
1142 reason_code: Some(ReasonCode::new("some_reason")),
1143 retry_after: rate_limits[0].retry_after,
1144 namespaces: smallvec![],
1145 }]
1146 );
1147 }
1148
1149 #[tokio::test]
1150 async fn test_parse_rate_limits_only_unknown() {
1151 let scoping = Scoping {
1152 organization_id: OrganizationId::new(42),
1153 project_id: ProjectId::new(21),
1154 project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
1155 key_id: Some(17),
1156 };
1157
1158 let formatted = "42:foo;bar:organization";
1159 let rate_limits: Vec<RateLimit> =
1160 parse_rate_limits(&scoping, formatted).into_iter().collect();
1161
1162 assert_eq!(
1163 rate_limits,
1164 vec![RateLimit {
1165 categories: smallvec![DataCategory::Unknown, DataCategory::Unknown],
1166 scope: RateLimitScope::Organization(OrganizationId::new(42)),
1167 reason_code: None,
1168 retry_after: rate_limits[0].retry_after,
1169 namespaces: smallvec![],
1170 },]
1171 );
1172 }
1173
/// Builds a `ManagedEnvelope` containing one item per listed item type.
///
/// Each entry is an `ItemType` variant name, optionally followed by `::` and an `AttachmentType`
/// variant name that is set on the item. Every item gets the same ten byte payload. The
/// envelope is parsed from a header that only contains a DSN, and the managed envelope is wired
/// to custom `Addr`s whose receiving ends are discarded.
macro_rules! envelope {
    ($( $item_type:ident $( :: $attachment_type:ident )? ),*) => {{
        let bytes = "{\"dsn\":\"https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42\"}";
        // Without any listed item the envelope is never mutated.
        #[allow(unused_mut)]
        let mut envelope = Envelope::parse_bytes(bytes.into()).unwrap();
        $(
            let mut item = Item::new(ItemType::$item_type);
            item.set_payload(ContentType::OctetStream, "0123456789");
            $( item.set_attachment_type(AttachmentType::$attachment_type); )?
            envelope.add_item(item);
        )*

        let (outcome_aggregator, _) = Addr::custom();
        let (test_store, _) = Addr::custom();

        ManagedEnvelope::new(
            envelope,
            outcome_aggregator,
            test_store,
        )
    }}
}
1196
1197 fn set_extracted(envelope: &mut Envelope, ty: ItemType) {
1198 envelope
1199 .get_item_by_mut(|item| *item.ty() == ty)
1200 .unwrap()
1201 .set_metrics_extracted(true);
1202 }
1203
1204 fn rate_limit(category: DataCategory) -> RateLimit {
1205 RateLimit {
1206 categories: vec![category].into(),
1207 scope: RateLimitScope::Organization(OrganizationId::new(42)),
1208 reason_code: None,
1209 retry_after: RetryAfter::from_secs(60),
1210 namespaces: smallvec![],
1211 }
1212 }
1213
/// Test double for the rate limit check that records every invocation.
///
/// On drop it panics if a recorded call was never verified through `assert_call`.
#[derive(Debug, Default)]
struct MockLimiter {
    /// Categories for which `check` returns a rate limit.
    denied: Vec<DataCategory>,
    /// Quantity passed to `check` for each category it was called with.
    called: BTreeMap<DataCategory, usize>,
    /// Categories that have been verified through `assert_call`.
    checked: BTreeSet<DataCategory>,
}
1220
1221 impl MockLimiter {
1222 pub fn deny(mut self, category: DataCategory) -> Self {
1223 self.denied.push(category);
1224 self
1225 }
1226
1227 pub fn check(&mut self, scoping: ItemScoping, quantity: usize) -> Result<RateLimits, ()> {
1228 let cat = scoping.category;
1229 let previous = self.called.insert(cat, quantity);
1230 assert!(previous.is_none(), "rate limiter invoked twice for {cat}");
1231
1232 let mut limits = RateLimits::new();
1233 if self.denied.contains(&cat) {
1234 limits.add(rate_limit(cat));
1235 }
1236 Ok(limits)
1237 }
1238
1239 #[track_caller]
1240 pub fn assert_call(&mut self, category: DataCategory, expected: usize) {
1241 self.checked.insert(category);
1242
1243 let quantity = self.called.get(&category).copied();
1244 assert_eq!(
1245 quantity,
1246 Some(expected),
1247 "Expected quantity `{expected}` for data category `{category}`, got {quantity:?}."
1248 );
1249 }
1250 }
1251
1252 impl Drop for MockLimiter {
1253 fn drop(&mut self) {
1254 if std::thread::panicking() {
1255 return;
1256 }
1257
1258 for checked in &self.checked {
1259 self.called.remove(checked);
1260 }
1261
1262 if self.called.is_empty() {
1263 return;
1264 }
1265
1266 let not_asserted = self
1267 .called
1268 .iter()
1269 .map(|(k, v)| format!("- {k}: {v}"))
1270 .collect::<Vec<_>>()
1271 .join("\n");
1272
1273 panic!("Following calls to the limiter were not asserted:\n{not_asserted}");
1274 }
1275 }
1276
/// Computes limits for the envelope with the mock limiter and applies them to the envelope.
///
/// All limits are checked (`CheckLimits::All`). `assume_event` is only passed on to the limiter
/// when the `processing` feature is enabled. Returns the computed enforcement and rate limits;
/// a clone of the enforcement has been applied to `envelope` by then.
async fn enforce_and_apply(
    mock: Arc<Mutex<MockLimiter>>,
    envelope: &mut ManagedEnvelope,
    // Unused when the `processing` feature is disabled.
    #[allow(unused_variables)] assume_event: Option<DataCategory>,
) -> (Enforcement, RateLimits) {
    let scoping = envelope.scoping();

    // Only mutated when the `processing` feature is enabled.
    #[allow(unused_mut)]
    let mut limiter = EnvelopeLimiter::new(CheckLimits::All, move |s, q| {
        // Each returned future holds its own handle to the shared mock.
        let mock = mock.clone();
        async move {
            let mut mock = mock.lock().await;
            mock.check(s, q)
        }
    });
    #[cfg(feature = "processing")]
    if let Some(assume_event) = assume_event {
        limiter.assume_event(assume_event);
    }

    let (enforcement, limits) = limiter
        .compute(envelope.envelope_mut(), &scoping)
        .await
        .unwrap();

    // Applying consumes the enforcement, so a clone is applied and the original is returned.
    enforcement.clone().apply_with_outcomes(envelope);

    (enforcement, limits)
}
1308
1309 fn mock_limiter(category: Option<DataCategory>) -> Arc<Mutex<MockLimiter>> {
1310 let mut mock = MockLimiter::default();
1311 if let Some(category) = category {
1312 mock = mock.deny(category);
1313 }
1314
1315 Arc::new(Mutex::new(mock))
1316 }
1317
1318 #[tokio::test]
1319 async fn test_enforce_pass_empty() {
1320 let mut envelope = envelope![];
1321
1322 let mock = mock_limiter(None);
1323 let (_, limits) = enforce_and_apply(mock, &mut envelope, None).await;
1324
1325 assert!(!limits.is_limited());
1326 assert!(envelope.envelope().is_empty());
1327 }
1328
1329 #[tokio::test]
1330 async fn test_enforce_limit_error_event() {
1331 let mut envelope = envelope![Event];
1332
1333 let mock = mock_limiter(Some(DataCategory::Error));
1334 let (_, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1335
1336 assert!(limits.is_limited());
1337 assert!(envelope.envelope().is_empty());
1338 mock.lock().await.assert_call(DataCategory::Error, 1);
1339 }
1340
1341 #[tokio::test]
1342 async fn test_enforce_limit_error_with_attachments() {
1343 let mut envelope = envelope![Event, Attachment];
1344
1345 let mock = mock_limiter(Some(DataCategory::Error));
1346 let (_, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1347
1348 assert!(limits.is_limited());
1349 assert!(envelope.envelope().is_empty());
1350 mock.lock().await.assert_call(DataCategory::Error, 1);
1351 }
1352
1353 #[tokio::test]
1354 async fn test_enforce_limit_minidump() {
1355 let mut envelope = envelope![Attachment::Minidump];
1356
1357 let mock = mock_limiter(Some(DataCategory::Error));
1358 let (_, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1359
1360 assert!(limits.is_limited());
1361 assert!(envelope.envelope().is_empty());
1362 mock.lock().await.assert_call(DataCategory::Error, 1);
1363 }
1364
1365 #[tokio::test]
1366 async fn test_enforce_limit_attachments() {
1367 let mut envelope = envelope![Attachment::Minidump, Attachment];
1368
1369 let mock = mock_limiter(Some(DataCategory::Attachment));
1370 let (_, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1371
1372 assert!(limits.is_limited());
1374 assert_eq!(envelope.envelope().len(), 1);
1375 mock.lock().await.assert_call(DataCategory::Error, 1);
1376 mock.lock().await.assert_call(DataCategory::Attachment, 20);
1377 }
1378
1379 #[tokio::test]
1381 async fn test_enforce_limit_profiles() {
1382 let mut envelope = envelope![Profile, Profile];
1383
1384 let mock = mock_limiter(Some(DataCategory::Profile));
1385 let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1386
1387 assert!(limits.is_limited());
1388 assert_eq!(envelope.envelope().len(), 0);
1389 mock.lock().await.assert_call(DataCategory::Profile, 2);
1390
1391 assert_eq!(
1392 get_outcomes(enforcement),
1393 vec![
1394 (DataCategory::Profile, 2),
1395 (DataCategory::ProfileIndexed, 2)
1396 ]
1397 );
1398 }
1399
1400 #[tokio::test]
1402 async fn test_enforce_limit_profile_chunks_no_profile_type() {
1403 let mut envelope = envelope![ProfileChunk, ProfileChunk];
1406
1407 let mock = mock_limiter(Some(DataCategory::ProfileChunk));
1408 let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1409 assert!(!limits.is_limited());
1410 assert_eq!(get_outcomes(enforcement), vec![]);
1411
1412 let mock = mock_limiter(Some(DataCategory::ProfileChunkUi));
1413 let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1414 assert!(!limits.is_limited());
1415 assert_eq!(get_outcomes(enforcement), vec![]);
1416
1417 assert_eq!(envelope.envelope().len(), 2);
1418 }
1419
1420 #[tokio::test]
1421 async fn test_enforce_limit_profile_chunks_ui() {
1422 let mut envelope = envelope![];
1423
1424 let mut item = Item::new(ItemType::ProfileChunk);
1425 item.set_profile_type(ProfileType::Backend);
1426 envelope.envelope_mut().add_item(item);
1427 let mut item = Item::new(ItemType::ProfileChunk);
1428 item.set_profile_type(ProfileType::Ui);
1429 envelope.envelope_mut().add_item(item);
1430
1431 let mock = mock_limiter(Some(DataCategory::ProfileChunkUi));
1432 let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1433
1434 assert!(limits.is_limited());
1435 assert_eq!(envelope.envelope().len(), 1);
1436 mock.lock()
1437 .await
1438 .assert_call(DataCategory::ProfileChunkUi, 1);
1439 mock.lock().await.assert_call(DataCategory::ProfileChunk, 1);
1440
1441 assert_eq!(
1442 get_outcomes(enforcement),
1443 vec![(DataCategory::ProfileChunkUi, 1)]
1444 );
1445 }
1446
1447 #[tokio::test]
1448 async fn test_enforce_limit_profile_chunks_backend() {
1449 let mut envelope = envelope![];
1450
1451 let mut item = Item::new(ItemType::ProfileChunk);
1452 item.set_profile_type(ProfileType::Backend);
1453 envelope.envelope_mut().add_item(item);
1454 let mut item = Item::new(ItemType::ProfileChunk);
1455 item.set_profile_type(ProfileType::Ui);
1456 envelope.envelope_mut().add_item(item);
1457
1458 let mock = mock_limiter(Some(DataCategory::ProfileChunk));
1459 let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1460
1461 assert!(limits.is_limited());
1462 assert_eq!(envelope.envelope().len(), 1);
1463 mock.lock()
1464 .await
1465 .assert_call(DataCategory::ProfileChunkUi, 1);
1466 mock.lock().await.assert_call(DataCategory::ProfileChunk, 1);
1467
1468 assert_eq!(
1469 get_outcomes(enforcement),
1470 vec![(DataCategory::ProfileChunk, 1)]
1471 );
1472 }
1473
1474 #[tokio::test]
1476 async fn test_enforce_limit_replays() {
1477 let mut envelope = envelope![ReplayEvent, ReplayRecording, ReplayVideo];
1478
1479 let mock = mock_limiter(Some(DataCategory::Replay));
1480 let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1481
1482 assert!(limits.is_limited());
1483 assert_eq!(envelope.envelope().len(), 0);
1484 mock.lock().await.assert_call(DataCategory::Replay, 3);
1485
1486 assert_eq!(get_outcomes(enforcement), vec![(DataCategory::Replay, 3),]);
1487 }
1488
1489 #[tokio::test]
1491 async fn test_enforce_limit_monitor_checkins() {
1492 let mut envelope = envelope![CheckIn];
1493
1494 let mock = mock_limiter(Some(DataCategory::Monitor));
1495 let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1496
1497 assert!(limits.is_limited());
1498 assert_eq!(envelope.envelope().len(), 0);
1499 mock.lock().await.assert_call(DataCategory::Monitor, 1);
1500
1501 assert_eq!(get_outcomes(enforcement), vec![(DataCategory::Monitor, 1)])
1502 }
1503
1504 #[tokio::test]
1505 async fn test_enforce_pass_minidump() {
1506 let mut envelope = envelope![Attachment::Minidump];
1507
1508 let mock = mock_limiter(Some(DataCategory::Attachment));
1509 let (_, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1510
1511 assert!(!limits.is_limited());
1513 assert_eq!(envelope.envelope().len(), 1);
1514 mock.lock().await.assert_call(DataCategory::Error, 1);
1515 mock.lock().await.assert_call(DataCategory::Attachment, 10);
1516 }
1517
1518 #[tokio::test]
1519 async fn test_enforce_skip_rate_limited() {
1520 let mut envelope = envelope![];
1521
1522 let mut item = Item::new(ItemType::Attachment);
1523 item.set_payload(ContentType::OctetStream, "0123456789");
1524 item.set_rate_limited(true);
1525 envelope.envelope_mut().add_item(item);
1526
1527 let mock = mock_limiter(Some(DataCategory::Error));
1528 let (_, limits) = enforce_and_apply(mock, &mut envelope, None).await;
1529
1530 assert!(!limits.is_limited()); assert_eq!(envelope.envelope().len(), 1); }
1533
1534 #[tokio::test]
1535 async fn test_enforce_pass_sessions() {
1536 let mut envelope = envelope![Session, Session, Session];
1537
1538 let mock = mock_limiter(Some(DataCategory::Error));
1539 let (_, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1540
1541 assert!(!limits.is_limited());
1543 assert_eq!(envelope.envelope().len(), 3);
1544 mock.lock().await.assert_call(DataCategory::Session, 3);
1545 }
1546
1547 #[tokio::test]
1548 async fn test_enforce_limit_sessions() {
1549 let mut envelope = envelope![Session, Session, Event];
1550
1551 let mock = mock_limiter(Some(DataCategory::Session));
1552 let (_, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1553
1554 assert!(limits.is_limited());
1556 assert_eq!(envelope.envelope().len(), 1);
1557 mock.lock().await.assert_call(DataCategory::Error, 1);
1558 mock.lock().await.assert_call(DataCategory::Session, 2);
1559 }
1560
/// With an assumed transaction event, an empty envelope is still checked against the
/// transaction category and reported as limited.
#[tokio::test]
#[cfg(feature = "processing")]
async fn test_enforce_limit_assumed_event() {
    let mut managed = envelope![];

    let limiter = mock_limiter(Some(DataCategory::Transaction));
    let assumed = Some(DataCategory::Transaction);
    let (_, rate_limits) = enforce_and_apply(Arc::clone(&limiter), &mut managed, assumed).await;

    assert!(rate_limits.is_limited());
    assert!(managed.envelope().is_empty());
    limiter
        .lock()
        .await
        .assert_call(DataCategory::Transaction, 1);
}
1574
/// With an assumed error event, denying the error category removes both attachments.
#[tokio::test]
#[cfg(feature = "processing")]
async fn test_enforce_limit_assumed_attachments() {
    let mut managed = envelope![Attachment, Attachment];

    let limiter = mock_limiter(Some(DataCategory::Error));
    let assumed = Some(DataCategory::Error);
    let (_, rate_limits) = enforce_and_apply(Arc::clone(&limiter), &mut managed, assumed).await;

    assert!(rate_limits.is_limited());
    assert!(managed.envelope().is_empty());
    limiter.lock().await.assert_call(DataCategory::Error, 1);
}
1588
1589 #[tokio::test]
1590 async fn test_enforce_transaction() {
1591 let mut envelope = envelope![Transaction];
1592
1593 let mock = mock_limiter(Some(DataCategory::Transaction));
1594 let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1595
1596 assert!(limits.is_limited());
1597 assert!(enforcement.event_indexed.is_active());
1598 assert!(enforcement.event.is_active());
1599 mock.lock().await.assert_call(DataCategory::Transaction, 1);
1600
1601 assert_eq!(
1602 get_outcomes(enforcement),
1603 vec![
1604 (DataCategory::Transaction, 1),
1605 (DataCategory::TransactionIndexed, 1),
1606 ]
1607 );
1608 }
1609
1610 #[tokio::test]
1611 async fn test_enforce_transaction_non_indexed() {
1612 let mut envelope = envelope![Transaction, Profile];
1613 let scoping = envelope.scoping();
1614
1615 let mock = mock_limiter(Some(DataCategory::TransactionIndexed));
1616
1617 let mock_clone = mock.clone();
1618 let limiter = EnvelopeLimiter::new(CheckLimits::NonIndexed, move |s, q| {
1619 let mock_clone = mock_clone.clone();
1620 async move {
1621 let mut mock = mock_clone.lock().await;
1622 mock.check(s, q)
1623 }
1624 });
1625 let (enforcement, limits) = limiter
1626 .compute(envelope.envelope_mut(), &scoping)
1627 .await
1628 .unwrap();
1629 enforcement.clone().apply_with_outcomes(&mut envelope);
1630
1631 assert!(!limits.is_limited());
1632 assert!(!enforcement.event_indexed.is_active());
1633 assert!(!enforcement.event.is_active());
1634 assert!(!enforcement.profiles_indexed.is_active());
1635 assert!(!enforcement.profiles.is_active());
1636 mock.lock().await.assert_call(DataCategory::Transaction, 1);
1637 mock.lock().await.assert_call(DataCategory::Profile, 1);
1638 }
1639
1640 #[tokio::test]
1641 async fn test_enforce_transaction_no_indexing_quota() {
1642 let mut envelope = envelope![Transaction];
1643
1644 let mock = mock_limiter(Some(DataCategory::TransactionIndexed));
1645 let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1646
1647 assert!(limits.is_limited());
1648 assert!(enforcement.event_indexed.is_active());
1649 assert!(!enforcement.event.is_active());
1650 mock.lock().await.assert_call(DataCategory::Transaction, 1);
1651 mock.lock()
1652 .await
1653 .assert_call(DataCategory::TransactionIndexed, 1);
1654 }
1655
1656 #[tokio::test]
1657 async fn test_enforce_transaction_attachment_enforced() {
1658 let mut envelope = envelope![Transaction, Attachment];
1659
1660 let mock = mock_limiter(Some(DataCategory::Transaction));
1661 let (enforcement, _) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1662
1663 assert!(enforcement.event.is_active());
1664 assert!(enforcement.attachments.is_active());
1665 mock.lock().await.assert_call(DataCategory::Transaction, 1);
1666 }
1667
1668 fn get_outcomes(enforcement: Enforcement) -> Vec<(DataCategory, usize)> {
1669 enforcement
1670 .get_outcomes()
1671 .map(|(_, data_category, quantity)| (data_category, quantity))
1672 .collect::<Vec<_>>()
1673 }
1674
1675 #[tokio::test]
1676 async fn test_enforce_transaction_profile_enforced() {
1677 let mut envelope = envelope![Transaction, Profile];
1678
1679 let mock = mock_limiter(Some(DataCategory::Transaction));
1680 let (enforcement, _) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1681
1682 assert!(enforcement.event.is_active());
1683 assert!(enforcement.profiles.is_active());
1684 mock.lock().await.assert_call(DataCategory::Transaction, 1);
1685
1686 assert_eq!(
1687 get_outcomes(enforcement),
1688 vec![
1689 (DataCategory::Transaction, 1),
1690 (DataCategory::TransactionIndexed, 1),
1691 (DataCategory::Profile, 1),
1692 (DataCategory::ProfileIndexed, 1),
1693 ]
1694 );
1695 }
1696
1697 #[tokio::test]
1698 async fn test_enforce_transaction_standalone_profile_enforced() {
1699 let mut envelope = envelope![Profile];
1701
1702 let mock = mock_limiter(Some(DataCategory::Transaction));
1703 let (enforcement, _) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1704
1705 assert!(enforcement.profiles.is_active());
1706 mock.lock().await.assert_call(DataCategory::Profile, 1);
1707 mock.lock().await.assert_call(DataCategory::Transaction, 0);
1708
1709 assert_eq!(
1710 get_outcomes(enforcement),
1711 vec![
1712 (DataCategory::Profile, 1),
1713 (DataCategory::ProfileIndexed, 1),
1714 ]
1715 );
1716 }
1717
1718 #[tokio::test]
1719 async fn test_enforce_transaction_attachment_enforced_indexing_quota() {
1720 let mut envelope = envelope![Transaction, Attachment];
1721 set_extracted(envelope.envelope_mut(), ItemType::Transaction);
1722
1723 let mock = mock_limiter(Some(DataCategory::TransactionIndexed));
1724 let (enforcement, _) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1725
1726 assert!(!enforcement.event.is_active());
1727 assert!(enforcement.event_indexed.is_active());
1728 assert!(enforcement.attachments.is_active());
1729 assert!(enforcement.attachment_items.is_active());
1730 mock.lock().await.assert_call(DataCategory::Transaction, 1);
1731 mock.lock()
1732 .await
1733 .assert_call(DataCategory::TransactionIndexed, 1);
1734
1735 assert_eq!(
1736 get_outcomes(enforcement),
1737 vec![
1738 (DataCategory::TransactionIndexed, 1),
1739 (DataCategory::Attachment, 10),
1740 (DataCategory::AttachmentItem, 1)
1741 ]
1742 );
1743 }
1744
1745 #[tokio::test]
1746 async fn test_enforce_span() {
1747 let mut envelope = envelope![Span, OtelSpan];
1748
1749 let mock = mock_limiter(Some(DataCategory::Span));
1750 let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1751
1752 assert!(limits.is_limited());
1753 assert!(enforcement.spans_indexed.is_active());
1754 assert!(enforcement.spans.is_active());
1755 mock.lock().await.assert_call(DataCategory::Span, 2);
1756
1757 assert_eq!(
1758 get_outcomes(enforcement),
1759 vec![(DataCategory::Span, 2), (DataCategory::SpanIndexed, 2)]
1760 );
1761 }
1762
1763 #[tokio::test]
1764 async fn test_enforce_span_no_indexing_quota() {
1765 let mut envelope = envelope![OtelSpan, Span];
1766
1767 let mock = mock_limiter(Some(DataCategory::SpanIndexed));
1768 let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1769
1770 assert!(limits.is_limited());
1771 assert!(enforcement.spans_indexed.is_active());
1772 assert!(!enforcement.spans.is_active());
1773 mock.lock().await.assert_call(DataCategory::Span, 2);
1774 mock.lock().await.assert_call(DataCategory::SpanIndexed, 2);
1775
1776 assert_eq!(
1777 get_outcomes(enforcement),
1778 vec![(DataCategory::SpanIndexed, 2)]
1779 );
1780 }
1781
1782 #[tokio::test]
1783 async fn test_enforce_span_metrics_extracted_no_indexing_quota() {
1784 let mut envelope = envelope![Span, OtelSpan];
1785 set_extracted(envelope.envelope_mut(), ItemType::Span);
1786
1787 let mock = mock_limiter(Some(DataCategory::SpanIndexed));
1788 let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1789
1790 assert!(limits.is_limited());
1791 assert!(enforcement.spans_indexed.is_active());
1792 assert!(!enforcement.spans.is_active());
1793 mock.lock().await.assert_call(DataCategory::Span, 2);
1794 mock.lock().await.assert_call(DataCategory::SpanIndexed, 2);
1795
1796 assert_eq!(
1797 get_outcomes(enforcement),
1798 vec![(DataCategory::SpanIndexed, 2)]
1799 );
1800 }
1801
/// The envelope summary adds up the source quantities of all metric bucket items.
#[test]
fn test_source_quantity_for_total_quantity() {
    let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
        .parse()
        .unwrap();
    let mut envelope = Envelope::from_request(None, RequestMeta::new(dsn));

    // One metric bucket item per set of source quantities.
    let per_item = [
        SourceQuantities {
            transactions: 5,
            spans: 0,
            profiles: 2,
            buckets: 5,
        },
        SourceQuantities {
            transactions: 2,
            spans: 0,
            profiles: 0,
            buckets: 3,
        },
    ];
    for source_quantities in per_item {
        let mut buckets = Item::new(ItemType::MetricBuckets);
        buckets.set_source_quantities(source_quantities);
        envelope.add_item(buckets);
    }

    let summary = EnvelopeSummary::compute(&envelope);

    // 2 + 0 profiles and 5 + 2 transactions.
    assert_eq!(summary.profile_quantity, 2);
    assert_eq!(summary.secondary_transaction_quantity, 7);
}
1834
1835 #[tokio::test]
1836 async fn test_enforce_limit_logs_count() {
1837 let mut envelope = envelope![Log, Log];
1838
1839 let mock = mock_limiter(Some(DataCategory::LogItem));
1840 let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1841
1842 assert!(limits.is_limited());
1843 assert_eq!(envelope.envelope().len(), 0);
1844 mock.lock().await.assert_call(DataCategory::LogItem, 2);
1845 mock.lock().await.assert_call(DataCategory::LogByte, 20);
1846
1847 assert_eq!(get_outcomes(enforcement), vec![(DataCategory::LogItem, 2)]);
1848 }
1849
1850 #[tokio::test]
1851 async fn test_enforce_limit_logs_bytes() {
1852 let mut envelope = envelope![Log, Log];
1853
1854 let mock = mock_limiter(Some(DataCategory::LogByte));
1855 let (enforcement, limits) = enforce_and_apply(mock.clone(), &mut envelope, None).await;
1856
1857 assert!(limits.is_limited());
1858 assert_eq!(envelope.envelope().len(), 0);
1859 mock.lock().await.assert_call(DataCategory::LogItem, 2);
1860 mock.lock().await.assert_call(DataCategory::LogByte, 20);
1861
1862 assert_eq!(get_outcomes(enforcement), vec![(DataCategory::LogByte, 20)]);
1863 }
1864}