use std::fmt::{self, Write};
use relay_quotas::{
DataCategories, DataCategory, ItemScoping, QuotaScope, RateLimit, RateLimitScope, RateLimits,
ReasonCode, Scoping,
};
use crate::envelope::{CountFor, Envelope, Item, ItemType};
use crate::services::outcome::Outcome;
use crate::utils::ManagedEnvelope;
pub const RATE_LIMITS_HEADER: &str = "X-Sentry-Rate-Limits";
pub fn format_rate_limits(rate_limits: &RateLimits) -> String {
let mut header = String::new();
for rate_limit in rate_limits {
if !header.is_empty() {
header.push_str(", ");
}
write!(header, "{}:", rate_limit.retry_after.remaining_seconds()).ok();
for (index, category) in rate_limit.categories.iter().enumerate() {
if index > 0 {
header.push(';');
}
write!(header, "{category}").ok();
}
write!(header, ":{}", rate_limit.scope.name()).ok();
if let Some(ref reason_code) = rate_limit.reason_code {
write!(header, ":{reason_code}").ok();
} else if !rate_limit.namespaces.is_empty() {
write!(header, ":").ok(); }
for (index, namespace) in rate_limit.namespaces.iter().enumerate() {
header.push(if index == 0 { ':' } else { ';' });
write!(header, "{namespace}").ok();
}
}
header
}
pub fn parse_rate_limits(scoping: &Scoping, string: &str) -> RateLimits {
let mut rate_limits = RateLimits::new();
for limit in string.split(',') {
let limit = limit.trim();
if limit.is_empty() {
continue;
}
let mut components = limit.split(':');
let retry_after = match components.next().and_then(|s| s.parse().ok()) {
Some(retry_after) => retry_after,
None => continue,
};
let mut categories = DataCategories::new();
for category in components.next().unwrap_or("").split(';') {
if !category.is_empty() {
categories.push(DataCategory::from_name(category));
}
}
let quota_scope = QuotaScope::from_name(components.next().unwrap_or(""));
let scope = RateLimitScope::for_quota(scoping, quota_scope);
let reason_code = components
.next()
.filter(|s| !s.is_empty())
.map(ReasonCode::new);
let namespace = components
.next()
.unwrap_or("")
.split(';')
.filter(|s| !s.is_empty())
.filter_map(|s| s.parse().ok())
.collect();
rate_limits.add(RateLimit {
categories,
scope,
reason_code,
retry_after,
namespaces: namespace,
});
}
rate_limits
}
fn infer_event_category(item: &Item) -> Option<DataCategory> {
match item.ty() {
ItemType::Event => Some(DataCategory::Error),
ItemType::Transaction => Some(DataCategory::Transaction),
ItemType::Security | ItemType::RawSecurity => Some(DataCategory::Security),
ItemType::Nel => Some(DataCategory::Error),
ItemType::UnrealReport => Some(DataCategory::Error),
ItemType::UserReportV2 => Some(DataCategory::UserReportV2),
ItemType::Attachment if item.creates_event() => Some(DataCategory::Error),
ItemType::Attachment => None,
ItemType::Session => None,
ItemType::Sessions => None,
ItemType::Statsd => None,
ItemType::MetricBuckets => None,
ItemType::FormData => None,
ItemType::UserReport => None,
ItemType::Profile => None,
ItemType::ReplayEvent => None,
ItemType::ReplayRecording => None,
ItemType::ReplayVideo => None,
ItemType::ClientReport => None,
ItemType::CheckIn => None,
ItemType::Span => None,
ItemType::OtelSpan => None,
ItemType::OtelTracesData => None,
ItemType::ProfileChunk => Some(DataCategory::ProfileChunk),
ItemType::Unknown(_) => None,
}
}
#[non_exhaustive]
#[derive(Clone, Copy, Debug, Default)]
pub struct EnvelopeSummary {
pub event_category: Option<DataCategory>,
pub attachment_quantity: usize,
pub attachment_item_quantity: usize,
pub session_quantity: usize,
pub profile_quantity: usize,
pub replay_quantity: usize,
pub monitor_quantity: usize,
pub secondary_transaction_quantity: usize,
pub secondary_span_quantity: usize,
pub span_quantity: usize,
pub has_plain_attachments: bool,
pub payload_size: usize,
pub profile_chunk_quantity: usize,
}
impl EnvelopeSummary {
pub fn empty() -> Self {
Self::default()
}
pub fn compute(envelope: &Envelope) -> Self {
let mut summary = Self::empty();
for item in envelope.items() {
if item.creates_event() {
summary.infer_category(item);
} else if item.ty() == &ItemType::Attachment {
summary.has_plain_attachments = true;
}
if item.rate_limited() {
continue;
}
if let Some(source_quantities) = item.source_quantities() {
summary.secondary_transaction_quantity += source_quantities.transactions;
summary.secondary_span_quantity += source_quantities.spans;
summary.profile_quantity += source_quantities.profiles;
}
summary.payload_size += item.len();
for (category, quantity) in item.quantities(CountFor::RateLimits) {
summary.add_quantity(category, quantity);
}
}
summary
}
fn add_quantity(&mut self, category: DataCategory, quantity: usize) {
let target_quantity = match category {
DataCategory::Attachment => &mut self.attachment_quantity,
DataCategory::AttachmentItem => &mut self.attachment_item_quantity,
DataCategory::Session => &mut self.session_quantity,
DataCategory::Profile => &mut self.profile_quantity,
DataCategory::Replay => &mut self.replay_quantity,
DataCategory::ReplayVideo => &mut self.replay_quantity,
DataCategory::Monitor => &mut self.monitor_quantity,
DataCategory::Span => &mut self.span_quantity,
DataCategory::ProfileChunk => &mut self.profile_chunk_quantity,
_ => return,
};
*target_quantity += quantity;
}
fn infer_category(&mut self, item: &Item) {
if matches!(self.event_category, None | Some(DataCategory::Default)) {
if let Some(category) = infer_event_category(item) {
self.event_category = Some(category);
}
}
}
}
#[derive(Debug)]
#[cfg_attr(test, derive(Clone))]
pub struct CategoryLimit {
category: DataCategory,
quantity: usize,
reason_code: Option<ReasonCode>,
}
impl CategoryLimit {
fn new(category: DataCategory, quantity: usize, rate_limit: Option<&RateLimit>) -> Self {
match rate_limit {
Some(limit) => Self {
category,
quantity,
reason_code: limit.reason_code.clone(),
},
None => Self::default(),
}
}
pub fn clone_for(&self, category: DataCategory, quantity: usize) -> CategoryLimit {
if !self.is_active() {
return Self::default();
}
Self {
category,
quantity,
reason_code: self.reason_code.clone(),
}
}
pub fn is_active(&self) -> bool {
self.quantity > 0
}
}
impl Default for CategoryLimit {
fn default() -> Self {
Self {
category: DataCategory::Default,
quantity: 0,
reason_code: None,
}
}
}
#[derive(Default, Debug)]
#[cfg_attr(test, derive(Clone))]
pub struct Enforcement {
pub event: CategoryLimit,
pub event_indexed: CategoryLimit,
pub attachments: CategoryLimit,
pub attachment_items: CategoryLimit,
pub sessions: CategoryLimit,
pub profiles: CategoryLimit,
pub profiles_indexed: CategoryLimit,
pub replays: CategoryLimit,
pub check_ins: CategoryLimit,
pub spans: CategoryLimit,
pub spans_indexed: CategoryLimit,
pub user_reports_v2: CategoryLimit,
pub profile_chunks: CategoryLimit,
}
impl Enforcement {
pub fn active_event(&self) -> Option<&CategoryLimit> {
if self.event.is_active() {
Some(&self.event)
} else if self.event_indexed.is_active() {
Some(&self.event_indexed)
} else {
None
}
}
pub fn is_event_active(&self) -> bool {
self.active_event().is_some()
}
fn get_outcomes(self) -> impl Iterator<Item = (Outcome, DataCategory, usize)> {
let Self {
event,
event_indexed,
attachments,
attachment_items,
sessions: _, profiles,
profiles_indexed,
replays,
check_ins,
spans,
spans_indexed,
user_reports_v2,
profile_chunks,
} = self;
let limits = [
event,
event_indexed,
attachments,
attachment_items,
profiles,
profiles_indexed,
replays,
check_ins,
spans,
spans_indexed,
user_reports_v2,
profile_chunks,
];
limits
.into_iter()
.filter(move |limit| limit.is_active())
.map(move |limit| {
(
Outcome::RateLimited(limit.reason_code),
limit.category,
limit.quantity,
)
})
}
pub fn apply_with_outcomes(self, envelope: &mut ManagedEnvelope) {
envelope
.envelope_mut()
.retain_items(|item| self.retain_item(item));
self.track_outcomes(envelope);
}
fn retain_item(&self, item: &mut Item) -> bool {
if self.event.is_active() && item.requires_event() {
return false;
}
match item.ty() {
ItemType::Attachment => {
if !(self.attachments.is_active() || self.attachment_items.is_active()) {
return true;
}
if item.creates_event() {
item.set_rate_limited(true);
true
} else {
false
}
}
ItemType::Session => !self.sessions.is_active(),
ItemType::Profile => !self.profiles_indexed.is_active(),
ItemType::ReplayEvent => !self.replays.is_active(),
ItemType::ReplayVideo => !self.replays.is_active(),
ItemType::ReplayRecording => !self.replays.is_active(),
ItemType::CheckIn => !self.check_ins.is_active(),
ItemType::Span | ItemType::OtelSpan | ItemType::OtelTracesData => {
!self.spans_indexed.is_active()
}
ItemType::ProfileChunk => !self.profile_chunks.is_active(),
ItemType::Event
| ItemType::Transaction
| ItemType::Security
| ItemType::FormData
| ItemType::RawSecurity
| ItemType::Nel
| ItemType::UnrealReport
| ItemType::UserReport
| ItemType::Sessions
| ItemType::Statsd
| ItemType::MetricBuckets
| ItemType::ClientReport
| ItemType::UserReportV2
| ItemType::Unknown(_) => true,
}
}
fn track_outcomes(self, envelope: &mut ManagedEnvelope) {
for (outcome, category, quantity) in self.get_outcomes() {
envelope.track_outcome(outcome, category, quantity)
}
}
}
#[derive(Debug, Copy, Clone)]
pub enum CheckLimits {
NonIndexed,
#[cfg_attr(not(any(feature = "processing", test)), expect(dead_code))]
All,
}
struct Check<F> {
limits: CheckLimits,
check: F,
}
impl<F, E> Check<F>
where
F: FnMut(ItemScoping<'_>, usize) -> Result<RateLimits, E>,
{
fn apply(&mut self, scoping: ItemScoping<'_>, quantity: usize) -> Result<RateLimits, E> {
if matches!(self.limits, CheckLimits::NonIndexed) && scoping.category.is_indexed() {
return Ok(RateLimits::default());
}
(self.check)(scoping, quantity)
}
}
pub struct EnvelopeLimiter<F> {
check: Check<F>,
event_category: Option<DataCategory>,
}
impl<E, F> EnvelopeLimiter<F>
where
F: FnMut(ItemScoping<'_>, usize) -> Result<RateLimits, E>,
{
pub fn new(limits: CheckLimits, check: F) -> Self {
Self {
check: Check { check, limits },
event_category: None,
}
}
#[cfg(feature = "processing")]
pub fn assume_event(&mut self, category: DataCategory) {
self.event_category = Some(category);
}
pub fn compute(
mut self,
envelope: &mut Envelope,
scoping: &Scoping,
) -> Result<(Enforcement, RateLimits), E> {
let mut summary = EnvelopeSummary::compute(envelope);
summary.event_category = self.event_category.or(summary.event_category);
let (enforcement, rate_limits) = self.execute(&summary, scoping)?;
Ok((enforcement, rate_limits))
}
fn execute(
&mut self,
summary: &EnvelopeSummary,
scoping: &Scoping,
) -> Result<(Enforcement, RateLimits), E> {
let mut rate_limits = RateLimits::new();
let mut enforcement = Enforcement::default();
if let Some(category) = summary.event_category {
let mut event_limits = self.check.apply(scoping.item(category), 1)?;
enforcement.event = CategoryLimit::new(category, 1, event_limits.longest());
if let Some(index_category) = category.index_category() {
if event_limits.is_empty() {
event_limits.merge(self.check.apply(scoping.item(index_category), 1)?);
}
enforcement.event_indexed =
CategoryLimit::new(index_category, 1, event_limits.longest());
};
rate_limits.merge(event_limits);
}
if let Some(limit) = enforcement.active_event() {
let limit1 = limit.clone_for(DataCategory::Attachment, summary.attachment_quantity);
let limit2 = limit.clone_for(
DataCategory::AttachmentItem,
summary.attachment_item_quantity,
);
enforcement.attachments = limit1;
enforcement.attachment_items = limit2;
} else {
let mut attachment_limits = RateLimits::new();
if summary.attachment_quantity > 0 {
let item_scoping = scoping.item(DataCategory::Attachment);
let attachment_byte_limits = self
.check
.apply(item_scoping, summary.attachment_quantity)?;
enforcement.attachments = CategoryLimit::new(
DataCategory::Attachment,
summary.attachment_quantity,
attachment_byte_limits.longest(),
);
attachment_limits.merge(attachment_byte_limits);
}
if !attachment_limits.is_limited() && summary.attachment_item_quantity > 0 {
let item_scoping = scoping.item(DataCategory::AttachmentItem);
let attachment_item_limits = self
.check
.apply(item_scoping, summary.attachment_item_quantity)?;
enforcement.attachment_items = CategoryLimit::new(
DataCategory::AttachmentItem,
summary.attachment_item_quantity,
attachment_item_limits.longest(),
);
attachment_limits.merge(attachment_item_limits);
}
if summary.has_plain_attachments {
rate_limits.merge(attachment_limits);
}
}
if summary.session_quantity > 0 {
let item_scoping = scoping.item(DataCategory::Session);
let session_limits = self.check.apply(item_scoping, summary.session_quantity)?;
enforcement.sessions = CategoryLimit::new(
DataCategory::Session,
summary.session_quantity,
session_limits.longest(),
);
rate_limits.merge(session_limits);
}
if enforcement.is_event_active() {
enforcement.profiles = enforcement
.event
.clone_for(DataCategory::Profile, summary.profile_quantity);
enforcement.profiles_indexed = enforcement
.event_indexed
.clone_for(DataCategory::ProfileIndexed, summary.profile_quantity)
} else if summary.profile_quantity > 0 {
let mut profile_limits = self.check.apply(
scoping.item(DataCategory::Profile),
summary.profile_quantity,
)?;
if profile_limits.is_empty() && summary.event_category.is_none() {
profile_limits = self
.check
.apply(scoping.item(DataCategory::Transaction), 0)?;
}
enforcement.profiles = CategoryLimit::new(
DataCategory::Profile,
summary.profile_quantity,
profile_limits.longest(),
);
if profile_limits.is_empty() {
profile_limits.merge(self.check.apply(
scoping.item(DataCategory::ProfileIndexed),
summary.profile_quantity,
)?);
}
enforcement.profiles_indexed = CategoryLimit::new(
DataCategory::ProfileIndexed,
summary.profile_quantity,
profile_limits.longest(),
);
rate_limits.merge(profile_limits);
}
if summary.replay_quantity > 0 {
let item_scoping = scoping.item(DataCategory::Replay);
let replay_limits = self.check.apply(item_scoping, summary.replay_quantity)?;
enforcement.replays = CategoryLimit::new(
DataCategory::Replay,
summary.replay_quantity,
replay_limits.longest(),
);
rate_limits.merge(replay_limits);
}
if summary.monitor_quantity > 0 {
let item_scoping = scoping.item(DataCategory::Monitor);
let checkin_limits = self.check.apply(item_scoping, summary.monitor_quantity)?;
enforcement.check_ins = CategoryLimit::new(
DataCategory::Monitor,
summary.monitor_quantity,
checkin_limits.longest(),
);
rate_limits.merge(checkin_limits);
}
if enforcement.is_event_active() {
enforcement.spans = enforcement
.event
.clone_for(DataCategory::Span, summary.span_quantity);
enforcement.spans_indexed = enforcement
.event_indexed
.clone_for(DataCategory::SpanIndexed, summary.span_quantity);
} else if summary.span_quantity > 0 {
let mut span_limits = self
.check
.apply(scoping.item(DataCategory::Span), summary.span_quantity)?;
enforcement.spans = CategoryLimit::new(
DataCategory::Span,
summary.span_quantity,
span_limits.longest(),
);
if span_limits.is_empty() {
span_limits.merge(self.check.apply(
scoping.item(DataCategory::SpanIndexed),
summary.span_quantity,
)?);
}
enforcement.spans_indexed = CategoryLimit::new(
DataCategory::SpanIndexed,
summary.span_quantity,
span_limits.longest(),
);
rate_limits.merge(span_limits);
}
if summary.profile_chunk_quantity > 0 {
let item_scoping = scoping.item(DataCategory::ProfileChunk);
let profile_chunk_limits = self
.check
.apply(item_scoping, summary.profile_chunk_quantity)?;
enforcement.profile_chunks = CategoryLimit::new(
DataCategory::ProfileChunk,
summary.profile_chunk_quantity,
profile_chunk_limits.longest(),
);
rate_limits.merge(profile_chunk_limits);
}
Ok((enforcement, rate_limits))
}
}
impl<F> fmt::Debug for EnvelopeLimiter<F> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("EnvelopeLimiter")
.field("event_category", &self.event_category)
.finish()
}
}
#[cfg(test)]
mod tests {
use std::collections::{BTreeMap, BTreeSet};
use relay_base_schema::organization::OrganizationId;
use relay_base_schema::project::{ProjectId, ProjectKey};
use relay_metrics::MetricNamespace;
use relay_quotas::RetryAfter;
use relay_system::Addr;
use smallvec::smallvec;
use super::*;
use crate::services::processor::ProcessingGroup;
use crate::{
envelope::{AttachmentType, ContentType, SourceQuantities},
extractors::RequestMeta,
};
#[test]
fn test_format_rate_limits() {
let mut rate_limits = RateLimits::new();
rate_limits.add(RateLimit {
categories: DataCategories::new(),
scope: RateLimitScope::Organization(OrganizationId::new(42)),
reason_code: Some(ReasonCode::new("my_limit")),
retry_after: RetryAfter::from_secs(42),
namespaces: smallvec![],
});
rate_limits.add(RateLimit {
categories: smallvec![DataCategory::Transaction, DataCategory::Security],
scope: RateLimitScope::Project(ProjectId::new(21)),
reason_code: None,
retry_after: RetryAfter::from_secs(4711),
namespaces: smallvec![],
});
let formatted = format_rate_limits(&rate_limits);
let expected = "42::organization:my_limit, 4711:transaction;security:project";
assert_eq!(formatted, expected);
}
#[test]
fn test_format_rate_limits_namespace() {
let mut rate_limits = RateLimits::new();
rate_limits.add(RateLimit {
categories: smallvec![DataCategory::MetricBucket],
scope: RateLimitScope::Organization(OrganizationId::new(42)),
reason_code: Some(ReasonCode::new("my_limit")),
retry_after: RetryAfter::from_secs(42),
namespaces: smallvec![MetricNamespace::Custom, MetricNamespace::Spans],
});
rate_limits.add(RateLimit {
categories: smallvec![DataCategory::MetricBucket],
scope: RateLimitScope::Organization(OrganizationId::new(42)),
reason_code: None,
retry_after: RetryAfter::from_secs(42),
namespaces: smallvec![MetricNamespace::Spans],
});
let formatted = format_rate_limits(&rate_limits);
let expected =
"42:metric_bucket:organization:my_limit:custom;spans, 42:metric_bucket:organization::spans";
assert_eq!(formatted, expected);
}
#[test]
fn test_parse_invalid_rate_limits() {
let scoping = Scoping {
organization_id: OrganizationId::new(42),
project_id: ProjectId::new(21),
project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
key_id: Some(17),
};
assert!(parse_rate_limits(&scoping, "").is_ok());
assert!(parse_rate_limits(&scoping, "invalid").is_ok());
assert!(parse_rate_limits(&scoping, ",,,").is_ok());
}
#[test]
fn test_parse_rate_limits() {
let scoping = Scoping {
organization_id: OrganizationId::new(42),
project_id: ProjectId::new(21),
project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
key_id: Some(17),
};
let formatted =
"42::organization:my_limit, invalid, 4711:foobar;transaction;security:project";
let rate_limits: Vec<RateLimit> =
parse_rate_limits(&scoping, formatted).into_iter().collect();
assert_eq!(
rate_limits,
vec![
RateLimit {
categories: DataCategories::new(),
scope: RateLimitScope::Organization(OrganizationId::new(42)),
reason_code: Some(ReasonCode::new("my_limit")),
retry_after: rate_limits[0].retry_after,
namespaces: smallvec![],
},
RateLimit {
categories: smallvec![
DataCategory::Unknown,
DataCategory::Transaction,
DataCategory::Security,
],
scope: RateLimitScope::Project(ProjectId::new(21)),
reason_code: None,
retry_after: rate_limits[1].retry_after,
namespaces: smallvec![],
}
]
);
assert_eq!(42, rate_limits[0].retry_after.remaining_seconds());
assert_eq!(4711, rate_limits[1].retry_after.remaining_seconds());
}
#[test]
fn test_parse_rate_limits_namespace() {
let scoping = Scoping {
organization_id: OrganizationId::new(42),
project_id: ProjectId::new(21),
project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
key_id: Some(17),
};
let formatted = "42:metric_bucket:organization::custom;spans";
let rate_limits: Vec<RateLimit> =
parse_rate_limits(&scoping, formatted).into_iter().collect();
assert_eq!(
rate_limits,
vec![RateLimit {
categories: smallvec![DataCategory::MetricBucket],
scope: RateLimitScope::Organization(OrganizationId::new(42)),
reason_code: None,
retry_after: rate_limits[0].retry_after,
namespaces: smallvec![MetricNamespace::Custom, MetricNamespace::Spans],
}]
);
}
#[test]
fn test_parse_rate_limits_empty_namespace() {
let scoping = Scoping {
organization_id: OrganizationId::new(42),
project_id: ProjectId::new(21),
project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
key_id: Some(17),
};
let formatted = "42:metric_bucket:organization:some_reason:";
let rate_limits: Vec<RateLimit> =
parse_rate_limits(&scoping, formatted).into_iter().collect();
assert_eq!(
rate_limits,
vec![RateLimit {
categories: smallvec![DataCategory::MetricBucket],
scope: RateLimitScope::Organization(OrganizationId::new(42)),
reason_code: Some(ReasonCode::new("some_reason")),
retry_after: rate_limits[0].retry_after,
namespaces: smallvec![],
}]
);
}
#[test]
fn test_parse_rate_limits_only_unknown() {
let scoping = Scoping {
organization_id: OrganizationId::new(42),
project_id: ProjectId::new(21),
project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
key_id: Some(17),
};
let formatted = "42:foo;bar:organization";
let rate_limits: Vec<RateLimit> =
parse_rate_limits(&scoping, formatted).into_iter().collect();
assert_eq!(
rate_limits,
vec![RateLimit {
categories: smallvec![DataCategory::Unknown, DataCategory::Unknown],
scope: RateLimitScope::Organization(OrganizationId::new(42)),
reason_code: None,
retry_after: rate_limits[0].retry_after,
namespaces: smallvec![],
},]
);
}
macro_rules! envelope {
($( $item_type:ident $( :: $attachment_type:ident )? ),*) => {{
let bytes = "{\"dsn\":\"https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42\"}";
#[allow(unused_mut)]
let mut envelope = Envelope::parse_bytes(bytes.into()).unwrap();
$(
let mut item = Item::new(ItemType::$item_type);
item.set_payload(ContentType::OctetStream, "0123456789");
$( item.set_attachment_type(AttachmentType::$attachment_type); )?
envelope.add_item(item);
)*
let (outcome_aggregator, _) = Addr::custom();
let (test_store, _) = Addr::custom();
ManagedEnvelope::new(
envelope,
outcome_aggregator,
test_store,
ProcessingGroup::Ungrouped,
)
}}
}
fn set_extracted(envelope: &mut Envelope, ty: ItemType) {
envelope
.get_item_by_mut(|item| *item.ty() == ty)
.unwrap()
.set_metrics_extracted(true);
}
fn rate_limit(category: DataCategory) -> RateLimit {
RateLimit {
categories: vec![category].into(),
scope: RateLimitScope::Organization(OrganizationId::new(42)),
reason_code: None,
retry_after: RetryAfter::from_secs(60),
namespaces: smallvec![],
}
}
#[derive(Debug, Default)]
struct MockLimiter {
denied: Vec<DataCategory>,
called: BTreeMap<DataCategory, usize>,
checked: BTreeSet<DataCategory>,
}
impl MockLimiter {
pub fn deny(mut self, category: DataCategory) -> Self {
self.denied.push(category);
self
}
pub fn check(
&mut self,
scoping: ItemScoping<'_>,
quantity: usize,
) -> Result<RateLimits, ()> {
let cat = scoping.category;
let previous = self.called.insert(cat, quantity);
assert!(previous.is_none(), "rate limiter invoked twice for {cat}");
let mut limits = RateLimits::new();
if self.denied.contains(&cat) {
limits.add(rate_limit(cat));
}
Ok(limits)
}
#[track_caller]
pub fn assert_call(&mut self, category: DataCategory, expected: usize) {
self.checked.insert(category);
let quantity = self.called.get(&category).copied();
assert_eq!(
quantity,
Some(expected),
"Expected quantity `{expected}` for data category `{category}`, got {quantity:?}."
);
}
}
impl Drop for MockLimiter {
fn drop(&mut self) {
if std::thread::panicking() {
return;
}
for checked in &self.checked {
self.called.remove(checked);
}
if self.called.is_empty() {
return;
}
let not_asserted = self
.called
.iter()
.map(|(k, v)| format!("- {k}: {v}"))
.collect::<Vec<_>>()
.join("\n");
panic!("Following calls to the limiter were not asserted:\n{not_asserted}");
}
}
fn enforce_and_apply(
mock: &mut MockLimiter,
envelope: &mut ManagedEnvelope,
#[allow(unused_variables)] assume_event: Option<DataCategory>,
) -> (Enforcement, RateLimits) {
let scoping = envelope.scoping();
#[allow(unused_mut)]
let mut limiter = EnvelopeLimiter::new(CheckLimits::All, |s, q| mock.check(s, q));
#[cfg(feature = "processing")]
if let Some(assume_event) = assume_event {
limiter.assume_event(assume_event);
}
let (enforcement, limits) = limiter.compute(envelope.envelope_mut(), &scoping).unwrap();
enforcement.clone().apply_with_outcomes(envelope);
(enforcement, limits)
}
#[test]
fn test_enforce_pass_empty() {
let mut envelope = envelope![];
let mut mock = MockLimiter::default();
let (_, limits) = enforce_and_apply(&mut mock, &mut envelope, None);
assert!(!limits.is_limited());
assert!(envelope.envelope().is_empty());
}
#[test]
fn test_enforce_limit_error_event() {
let mut envelope = envelope![Event];
let mut mock = MockLimiter::default().deny(DataCategory::Error);
let (_, limits) = enforce_and_apply(&mut mock, &mut envelope, None);
assert!(limits.is_limited());
assert!(envelope.envelope().is_empty());
mock.assert_call(DataCategory::Error, 1);
}
#[test]
fn test_enforce_limit_error_with_attachments() {
let mut envelope = envelope![Event, Attachment];
let mut mock = MockLimiter::default().deny(DataCategory::Error);
let (_, limits) = enforce_and_apply(&mut mock, &mut envelope, None);
assert!(limits.is_limited());
assert!(envelope.envelope().is_empty());
mock.assert_call(DataCategory::Error, 1);
}
#[test]
fn test_enforce_limit_minidump() {
let mut envelope = envelope![Attachment::Minidump];
let mut mock = MockLimiter::default().deny(DataCategory::Error);
let (_, limits) = enforce_and_apply(&mut mock, &mut envelope, None);
assert!(limits.is_limited());
assert!(envelope.envelope().is_empty());
mock.assert_call(DataCategory::Error, 1);
}
#[test]
fn test_enforce_limit_attachments() {
let mut envelope = envelope![Attachment::Minidump, Attachment];
let mut mock = MockLimiter::default().deny(DataCategory::Attachment);
let (_, limits) = enforce_and_apply(&mut mock, &mut envelope, None);
assert!(limits.is_limited());
assert_eq!(envelope.envelope().len(), 1);
mock.assert_call(DataCategory::Error, 1);
mock.assert_call(DataCategory::Attachment, 20);
}
#[test]
fn test_enforce_limit_profiles() {
let mut envelope = envelope![Profile, Profile];
let mut mock = MockLimiter::default().deny(DataCategory::Profile);
let (enforcement, limits) = enforce_and_apply(&mut mock, &mut envelope, None);
assert!(limits.is_limited());
assert_eq!(envelope.envelope().len(), 0);
mock.assert_call(DataCategory::Profile, 2);
assert_eq!(
get_outcomes(enforcement),
vec![
(DataCategory::Profile, 2),
(DataCategory::ProfileIndexed, 2)
]
);
}
#[test]
fn test_enforce_limit_profile_chunks() {
let mut envelope = envelope![ProfileChunk, ProfileChunk];
let mut mock = MockLimiter::default().deny(DataCategory::ProfileChunk);
let (enforcement, limits) = enforce_and_apply(&mut mock, &mut envelope, None);
assert!(limits.is_limited());
assert_eq!(envelope.envelope().len(), 0);
mock.assert_call(DataCategory::ProfileChunk, 2);
assert_eq!(
get_outcomes(enforcement),
vec![(DataCategory::ProfileChunk, 2),]
);
}
#[test]
fn test_enforce_limit_replays() {
let mut envelope = envelope![ReplayEvent, ReplayRecording, ReplayVideo];
let mut mock = MockLimiter::default().deny(DataCategory::Replay);
let (enforcement, limits) = enforce_and_apply(&mut mock, &mut envelope, None);
assert!(limits.is_limited());
assert_eq!(envelope.envelope().len(), 0);
mock.assert_call(DataCategory::Replay, 3);
assert_eq!(get_outcomes(enforcement), vec![(DataCategory::Replay, 3),]);
}
#[test]
fn test_enforce_limit_monitor_checkins() {
let mut envelope = envelope![CheckIn];
let mut mock = MockLimiter::default().deny(DataCategory::Monitor);
let (enforcement, limits) = enforce_and_apply(&mut mock, &mut envelope, None);
assert!(limits.is_limited());
assert_eq!(envelope.envelope().len(), 0);
mock.assert_call(DataCategory::Monitor, 1);
assert_eq!(get_outcomes(enforcement), vec![(DataCategory::Monitor, 1)])
}
#[test]
fn test_enforce_pass_minidump() {
let mut envelope = envelope![Attachment::Minidump];
let mut mock = MockLimiter::default().deny(DataCategory::Attachment);
let (_, limits) = enforce_and_apply(&mut mock, &mut envelope, None);
assert!(!limits.is_limited());
assert_eq!(envelope.envelope().len(), 1);
mock.assert_call(DataCategory::Error, 1);
mock.assert_call(DataCategory::Attachment, 10);
}
#[test]
fn test_enforce_skip_rate_limited() {
let mut envelope = envelope![];
let mut item = Item::new(ItemType::Attachment);
item.set_payload(ContentType::OctetStream, "0123456789");
item.set_rate_limited(true);
envelope.envelope_mut().add_item(item);
let mut mock = MockLimiter::default().deny(DataCategory::Error);
let (_, limits) = enforce_and_apply(&mut mock, &mut envelope, None);
assert!(!limits.is_limited()); assert_eq!(envelope.envelope().len(), 1); }
#[test]
fn test_enforce_pass_sessions() {
let mut envelope = envelope![Session, Session, Session];
let mut mock = MockLimiter::default().deny(DataCategory::Error);
let (_, limits) = enforce_and_apply(&mut mock, &mut envelope, None);
assert!(!limits.is_limited());
assert_eq!(envelope.envelope().len(), 3);
mock.assert_call(DataCategory::Session, 3);
}
#[test]
fn test_enforce_limit_sessions() {
let mut envelope = envelope![Session, Session, Event];
let mut mock = MockLimiter::default().deny(DataCategory::Session);
let (_, limits) = enforce_and_apply(&mut mock, &mut envelope, None);
assert!(limits.is_limited());
assert_eq!(envelope.envelope().len(), 1);
mock.assert_call(DataCategory::Error, 1);
mock.assert_call(DataCategory::Session, 2);
}
#[test]
#[cfg(feature = "processing")]
fn test_enforce_limit_assumed_event() {
let mut envelope = envelope![];
let mut mock = MockLimiter::default().deny(DataCategory::Transaction);
let (_, limits) =
enforce_and_apply(&mut mock, &mut envelope, Some(DataCategory::Transaction));
assert!(limits.is_limited());
assert!(envelope.envelope().is_empty()); mock.assert_call(DataCategory::Transaction, 1);
}
#[test]
#[cfg(feature = "processing")]
fn test_enforce_limit_assumed_attachments() {
let mut envelope = envelope![Attachment, Attachment];
let mut mock = MockLimiter::default().deny(DataCategory::Error);
let (_, limits) = enforce_and_apply(&mut mock, &mut envelope, Some(DataCategory::Error));
assert!(limits.is_limited());
assert!(envelope.envelope().is_empty());
mock.assert_call(DataCategory::Error, 1);
}
#[test]
fn test_enforce_transaction() {
let mut envelope = envelope![Transaction];
let mut mock = MockLimiter::default().deny(DataCategory::Transaction);
let (enforcement, limits) = enforce_and_apply(&mut mock, &mut envelope, None);
assert!(limits.is_limited());
assert!(enforcement.event_indexed.is_active());
assert!(enforcement.event.is_active());
mock.assert_call(DataCategory::Transaction, 1);
assert_eq!(
get_outcomes(enforcement),
vec![
(DataCategory::Transaction, 1),
(DataCategory::TransactionIndexed, 1),
]
);
}
#[test]
fn test_enforce_transaction_non_indexed() {
let mut envelope = envelope![Transaction, Profile];
let scoping = envelope.scoping();
let mut mock = MockLimiter::default().deny(DataCategory::TransactionIndexed);
let limiter = EnvelopeLimiter::new(CheckLimits::NonIndexed, |s, q| mock.check(s, q));
let (enforcement, limits) = limiter.compute(envelope.envelope_mut(), &scoping).unwrap();
enforcement.clone().apply_with_outcomes(&mut envelope);
assert!(!limits.is_limited());
assert!(!enforcement.event_indexed.is_active());
assert!(!enforcement.event.is_active());
assert!(!enforcement.profiles_indexed.is_active());
assert!(!enforcement.profiles.is_active());
mock.assert_call(DataCategory::Transaction, 1);
mock.assert_call(DataCategory::Profile, 1);
}
#[test]
fn test_enforce_transaction_no_indexing_quota() {
let mut envelope = envelope![Transaction];
let mut mock = MockLimiter::default().deny(DataCategory::TransactionIndexed);
let (enforcement, limits) = enforce_and_apply(&mut mock, &mut envelope, None);
assert!(limits.is_limited());
assert!(enforcement.event_indexed.is_active());
assert!(!enforcement.event.is_active());
mock.assert_call(DataCategory::Transaction, 1);
mock.assert_call(DataCategory::TransactionIndexed, 1);
}
#[test]
fn test_enforce_transaction_attachment_enforced() {
let mut envelope = envelope![Transaction, Attachment];
let mut mock = MockLimiter::default().deny(DataCategory::Transaction);
let (enforcement, _) = enforce_and_apply(&mut mock, &mut envelope, None);
assert!(enforcement.event.is_active());
assert!(enforcement.attachments.is_active());
mock.assert_call(DataCategory::Transaction, 1);
}
fn get_outcomes(enforcement: Enforcement) -> Vec<(DataCategory, usize)> {
enforcement
.get_outcomes()
.map(|(_, data_category, quantity)| (data_category, quantity))
.collect::<Vec<_>>()
}
#[test]
fn test_enforce_transaction_profile_enforced() {
let mut envelope = envelope![Transaction, Profile];
let mut mock = MockLimiter::default().deny(DataCategory::Transaction);
let (enforcement, _) = enforce_and_apply(&mut mock, &mut envelope, None);
assert!(enforcement.event.is_active());
assert!(enforcement.profiles.is_active());
mock.assert_call(DataCategory::Transaction, 1);
assert_eq!(
get_outcomes(enforcement),
vec![
(DataCategory::Transaction, 1),
(DataCategory::TransactionIndexed, 1),
(DataCategory::Profile, 1),
(DataCategory::ProfileIndexed, 1),
]
);
}
#[test]
fn test_enforce_transaction_standalone_profile_enforced() {
let mut envelope = envelope![Profile];
let mut mock = MockLimiter::default().deny(DataCategory::Transaction);
let (enforcement, _) = enforce_and_apply(&mut mock, &mut envelope, None);
assert!(enforcement.profiles.is_active());
mock.assert_call(DataCategory::Profile, 1);
mock.assert_call(DataCategory::Transaction, 0);
assert_eq!(
get_outcomes(enforcement),
vec![
(DataCategory::Profile, 1),
(DataCategory::ProfileIndexed, 1),
]
);
}
#[test]
fn test_enforce_transaction_attachment_enforced_indexing_quota() {
let mut envelope = envelope![Transaction, Attachment];
set_extracted(envelope.envelope_mut(), ItemType::Transaction);
let mut mock = MockLimiter::default().deny(DataCategory::TransactionIndexed);
let (enforcement, _) = enforce_and_apply(&mut mock, &mut envelope, None);
assert!(!enforcement.event.is_active());
assert!(enforcement.event_indexed.is_active());
assert!(enforcement.attachments.is_active());
assert!(enforcement.attachment_items.is_active());
mock.assert_call(DataCategory::Transaction, 1);
mock.assert_call(DataCategory::TransactionIndexed, 1);
assert_eq!(
get_outcomes(enforcement),
vec![
(DataCategory::TransactionIndexed, 1),
(DataCategory::Attachment, 10),
(DataCategory::AttachmentItem, 1)
]
);
}
#[test]
fn test_enforce_span() {
let mut envelope = envelope![Span, OtelSpan];
let mut mock = MockLimiter::default().deny(DataCategory::Span);
let (enforcement, limits) = enforce_and_apply(&mut mock, &mut envelope, None);
assert!(limits.is_limited());
assert!(enforcement.spans_indexed.is_active());
assert!(enforcement.spans.is_active());
mock.assert_call(DataCategory::Span, 2);
assert_eq!(
get_outcomes(enforcement),
vec![(DataCategory::Span, 2), (DataCategory::SpanIndexed, 2)]
);
}
#[test]
fn test_enforce_span_no_indexing_quota() {
let mut envelope = envelope![OtelSpan, Span];
let mut mock = MockLimiter::default().deny(DataCategory::SpanIndexed);
let (enforcement, limits) = enforce_and_apply(&mut mock, &mut envelope, None);
assert!(limits.is_limited());
assert!(enforcement.spans_indexed.is_active());
assert!(!enforcement.spans.is_active());
mock.assert_call(DataCategory::Span, 2);
mock.assert_call(DataCategory::SpanIndexed, 2);
assert_eq!(
get_outcomes(enforcement),
vec![(DataCategory::SpanIndexed, 2)]
);
}
#[test]
fn test_enforce_span_metrics_extracted_no_indexing_quota() {
let mut envelope = envelope![Span, OtelSpan];
set_extracted(envelope.envelope_mut(), ItemType::Span);
let mut mock = MockLimiter::default().deny(DataCategory::SpanIndexed);
let (enforcement, limits) = enforce_and_apply(&mut mock, &mut envelope, None);
assert!(limits.is_limited());
assert!(enforcement.spans_indexed.is_active());
assert!(!enforcement.spans.is_active());
mock.assert_call(DataCategory::Span, 2);
mock.assert_call(DataCategory::SpanIndexed, 2);
assert_eq!(
get_outcomes(enforcement),
vec![(DataCategory::SpanIndexed, 2)]
);
}
#[test]
fn test_source_quantity_for_total_quantity() {
let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
.parse()
.unwrap();
let request_meta = RequestMeta::new(dsn);
let mut envelope = Envelope::from_request(None, request_meta);
let mut item = Item::new(ItemType::MetricBuckets);
item.set_source_quantities(SourceQuantities {
transactions: 5,
spans: 0,
profiles: 2,
buckets: 5,
});
envelope.add_item(item);
let mut item = Item::new(ItemType::MetricBuckets);
item.set_source_quantities(SourceQuantities {
transactions: 2,
spans: 0,
profiles: 0,
buckets: 3,
});
envelope.add_item(item);
let summary = EnvelopeSummary::compute(&envelope);
assert_eq!(summary.profile_quantity, 2);
assert_eq!(summary.secondary_transaction_quantity, 7);
}
}