1use std::borrow::Cow;
2use std::collections::{BTreeSet, HashMap};
3use std::error::Error;
4use std::fmt::{Debug, Display};
5use std::future::Future;
6use std::io::Write;
7use std::pin::Pin;
8use std::sync::Arc;
9use std::time::Duration;
10
11use anyhow::Context;
12use brotli::CompressorWriter as BrotliEncoder;
13use bytes::Bytes;
14use chrono::{DateTime, Utc};
15use flate2::Compression;
16use flate2::write::{GzEncoder, ZlibEncoder};
17use futures::FutureExt;
18use futures::future::BoxFuture;
19use relay_base_schema::project::{ProjectId, ProjectKey};
20use relay_cogs::{AppFeature, Cogs, FeatureWeights, ResourceId, Token};
21use relay_common::time::UnixTimestamp;
22use relay_config::{Config, HttpEncoding, RelayMode};
23use relay_dynamic_config::{Feature, GlobalConfig};
24use relay_event_normalization::{ClockDriftProcessor, GeoIpLookup};
25use relay_event_schema::processor::ProcessingAction;
26use relay_event_schema::protocol::{
27 ClientReport, Event, EventId, Metrics, NetworkReportError, SpanV2,
28};
29use relay_filter::FilterStatKey;
30use relay_metrics::{Bucket, BucketMetadata, BucketView, BucketsView, MetricNamespace};
31use relay_pii::PiiConfigError;
32use relay_protocol::Annotated;
33use relay_quotas::{DataCategory, Quota, RateLimits, Scoping};
34use relay_sampling::evaluation::{ReservoirCounters, SamplingDecision};
35use relay_statsd::metric;
36use relay_system::{Addr, FromMessage, NoResponse, Service};
37use reqwest::header;
38use smallvec::{SmallVec, smallvec};
39use zstd::stream::Encoder as ZstdEncoder;
40
41use crate::envelope::{
42 self, AttachmentType, ContentType, Envelope, EnvelopeError, Item, ItemContainer, ItemType,
43};
44use crate::extractors::{PartialDsn, RequestMeta, RequestTrust};
45use crate::integrations::{Integration, SpansIntegration};
46use crate::managed::{InvalidProcessingGroupType, ManagedEnvelope, TypedEnvelope};
47use crate::metrics::{MetricOutcomes, MetricsLimiter, MinimalTrackableBucket};
48use crate::metrics_extraction::transactions::ExtractedMetrics;
49use crate::metrics_extraction::transactions::types::ExtractMetricsError;
50use crate::processing::check_ins::CheckInsProcessor;
51use crate::processing::logs::LogsProcessor;
52use crate::processing::profile_chunks::ProfileChunksProcessor;
53use crate::processing::sessions::SessionsProcessor;
54use crate::processing::spans::SpansProcessor;
55use crate::processing::trace_attachments::TraceAttachmentsProcessor;
56use crate::processing::trace_metrics::TraceMetricsProcessor;
57use crate::processing::transactions::TransactionProcessor;
58use crate::processing::utils::event::{
59 EventFullyNormalized, EventMetricsExtracted, FiltersStatus, SpansExtracted, event_category,
60 event_type,
61};
62use crate::processing::{Forward as _, Output, Outputs, QuotaRateLimiter};
63use crate::service::ServiceError;
64use crate::services::global_config::GlobalConfigHandle;
65use crate::services::metrics::{Aggregator, FlushBuckets, MergeBuckets, ProjectBuckets};
66use crate::services::outcome::{DiscardItemType, DiscardReason, Outcome, TrackOutcome};
67use crate::services::projects::cache::ProjectCacheHandle;
68use crate::services::projects::project::{ProjectInfo, ProjectState};
69use crate::services::upstream::{
70 SendRequest, Sign, SignatureType, UpstreamRelay, UpstreamRequest, UpstreamRequestError,
71};
72use crate::statsd::{RelayCounters, RelayDistributions, RelayTimers};
73use crate::utils::{self, CheckLimits, EnvelopeLimiter};
74use crate::{http, processing};
75use relay_threading::AsyncPool;
76#[cfg(feature = "processing")]
77use {
78 crate::services::processor::nnswitch::SwitchProcessingError,
79 crate::services::store::{Store, StoreEnvelope},
80 crate::services::upload::Upload,
81 crate::utils::Enforcement,
82 itertools::Itertools,
83 relay_cardinality::{
84 CardinalityLimit, CardinalityLimiter, CardinalityLimitsSplit, RedisSetLimiter,
85 RedisSetLimiterOptions,
86 },
87 relay_dynamic_config::CardinalityLimiterMode,
88 relay_quotas::{RateLimitingError, RedisRateLimiter},
89 relay_redis::RedisClients,
90 std::time::Instant,
91 symbolic_unreal::{Unreal4Error, Unreal4ErrorKind},
92};
93
94mod attachment;
95mod dynamic_sampling;
96mod event;
97mod metrics;
98mod nel;
99mod profile;
100mod replay;
101mod report;
102mod span;
103
104#[cfg(all(sentry, feature = "processing"))]
105mod playstation;
106mod standalone;
107#[cfg(feature = "processing")]
108mod unreal;
109
110#[cfg(feature = "processing")]
111mod nnswitch;
112
113macro_rules! if_processing {
117 ($config:expr, $if_true:block) => {
118 #[cfg(feature = "processing")] {
119 if $config.processing_enabled() $if_true
120 }
121 };
122 ($config:expr, $if_true:block else $if_false:block) => {
123 {
124 #[cfg(feature = "processing")] {
125 if $config.processing_enabled() $if_true else $if_false
126 }
127 #[cfg(not(feature = "processing"))] {
128 $if_false
129 }
130 }
131 };
132}
133
134pub const MINIMUM_CLOCK_DRIFT: Duration = Duration::from_secs(55 * 60);
136
137#[derive(Debug)]
138pub struct GroupTypeError;
139
140impl Display for GroupTypeError {
141 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
142 f.write_str("failed to convert processing group into corresponding type")
143 }
144}
145
146impl std::error::Error for GroupTypeError {}
147
148macro_rules! processing_group {
149 ($ty:ident, $variant:ident$(, $($other:ident),+)?) => {
150 #[derive(Clone, Copy, Debug)]
151 pub struct $ty;
152
153 impl From<$ty> for ProcessingGroup {
154 fn from(_: $ty) -> Self {
155 ProcessingGroup::$variant
156 }
157 }
158
159 impl TryFrom<ProcessingGroup> for $ty {
160 type Error = GroupTypeError;
161
162 fn try_from(value: ProcessingGroup) -> Result<Self, Self::Error> {
163 if matches!(value, ProcessingGroup::$variant) {
164 return Ok($ty);
165 }
166 $($(
167 if matches!(value, ProcessingGroup::$other) {
168 return Ok($ty);
169 }
170 )+)?
171 return Err(GroupTypeError);
172 }
173 }
174 };
175}
176
177pub trait EventProcessing {}
181
182processing_group!(TransactionGroup, Transaction);
183impl EventProcessing for TransactionGroup {}
184
185processing_group!(ErrorGroup, Error);
186impl EventProcessing for ErrorGroup {}
187
188processing_group!(SessionGroup, Session);
189processing_group!(StandaloneGroup, Standalone);
190processing_group!(ClientReportGroup, ClientReport);
191processing_group!(ReplayGroup, Replay);
192processing_group!(CheckInGroup, CheckIn);
193processing_group!(LogGroup, Log, Nel);
194processing_group!(TraceMetricGroup, TraceMetric);
195processing_group!(SpanGroup, Span);
196
197processing_group!(ProfileChunkGroup, ProfileChunk);
198processing_group!(MetricsGroup, Metrics);
199processing_group!(ForwardUnknownGroup, ForwardUnknown);
200processing_group!(Ungrouped, Ungrouped);
201
202#[derive(Clone, Copy, Debug)]
206pub struct Processed;
207
208#[derive(Clone, Copy, Debug)]
210pub enum ProcessingGroup {
211 Transaction,
215 Error,
220 Session,
222 Standalone,
225 ClientReport,
227 Replay,
229 CheckIn,
231 Nel,
233 Log,
235 TraceMetric,
237 Span,
239 SpanV2,
241 Metrics,
243 ProfileChunk,
245 TraceAttachment,
247 ForwardUnknown,
250 Ungrouped,
252}
253
254impl ProcessingGroup {
255 fn split_envelope(
257 mut envelope: Envelope,
258 project_info: &ProjectInfo,
259 ) -> SmallVec<[(Self, Box<Envelope>); 3]> {
260 let headers = envelope.headers().clone();
261 let mut grouped_envelopes = smallvec![];
262
263 let replay_items = envelope.take_items_by(|item| {
265 matches!(
266 item.ty(),
267 &ItemType::ReplayEvent | &ItemType::ReplayRecording | &ItemType::ReplayVideo
268 )
269 });
270 if !replay_items.is_empty() {
271 grouped_envelopes.push((
272 ProcessingGroup::Replay,
273 Envelope::from_parts(headers.clone(), replay_items),
274 ))
275 }
276
277 let session_items = envelope
279 .take_items_by(|item| matches!(item.ty(), &ItemType::Session | &ItemType::Sessions));
280 if !session_items.is_empty() {
281 grouped_envelopes.push((
282 ProcessingGroup::Session,
283 Envelope::from_parts(headers.clone(), session_items),
284 ))
285 }
286
287 let span_v2_items = envelope.take_items_by(|item| {
288 let exp_feature = project_info.has_feature(Feature::SpanV2ExperimentalProcessing);
289 let otlp_feature = project_info.has_feature(Feature::SpanV2OtlpProcessing);
290 let is_supported_integration = {
291 matches!(
292 item.integration(),
293 Some(Integration::Spans(SpansIntegration::OtelV1 { .. }))
294 )
295 };
296 let is_span = matches!(item.ty(), &ItemType::Span);
297 let is_span_attachment = item.is_span_attachment();
298
299 ItemContainer::<SpanV2>::is_container(item)
300 || (exp_feature && is_span)
301 || ((exp_feature || otlp_feature) && is_supported_integration)
302 || (exp_feature && is_span_attachment)
303 });
304
305 if !span_v2_items.is_empty() {
306 grouped_envelopes.push((
307 ProcessingGroup::SpanV2,
308 Envelope::from_parts(headers.clone(), span_v2_items),
309 ))
310 }
311
312 let span_items = envelope.take_items_by(|item| {
314 matches!(item.ty(), &ItemType::Span)
315 || matches!(item.integration(), Some(Integration::Spans(_)))
316 });
317
318 if !span_items.is_empty() {
319 grouped_envelopes.push((
320 ProcessingGroup::Span,
321 Envelope::from_parts(headers.clone(), span_items),
322 ))
323 }
324
325 let logs_items = envelope.take_items_by(|item| {
327 matches!(item.ty(), &ItemType::Log)
328 || matches!(item.integration(), Some(Integration::Logs(_)))
329 });
330 if !logs_items.is_empty() {
331 grouped_envelopes.push((
332 ProcessingGroup::Log,
333 Envelope::from_parts(headers.clone(), logs_items),
334 ))
335 }
336
337 let trace_metric_items =
339 envelope.take_items_by(|item| matches!(item.ty(), &ItemType::TraceMetric));
340 if !trace_metric_items.is_empty() {
341 grouped_envelopes.push((
342 ProcessingGroup::TraceMetric,
343 Envelope::from_parts(headers.clone(), trace_metric_items),
344 ))
345 }
346
347 let nel_items = envelope.take_items_by(|item| matches!(item.ty(), &ItemType::Nel));
349 if !nel_items.is_empty() {
350 grouped_envelopes.push((
351 ProcessingGroup::Nel,
352 Envelope::from_parts(headers.clone(), nel_items),
353 ))
354 }
355
356 let metric_items = envelope.take_items_by(|i| i.ty().is_metrics());
361 if !metric_items.is_empty() {
362 grouped_envelopes.push((
363 ProcessingGroup::Metrics,
364 Envelope::from_parts(headers.clone(), metric_items),
365 ))
366 }
367
368 let profile_chunk_items =
370 envelope.take_items_by(|item| matches!(item.ty(), &ItemType::ProfileChunk));
371 if !profile_chunk_items.is_empty() {
372 grouped_envelopes.push((
373 ProcessingGroup::ProfileChunk,
374 Envelope::from_parts(headers.clone(), profile_chunk_items),
375 ))
376 }
377
378 let trace_attachment_items = envelope.take_items_by(Item::is_trace_attachment);
379 if !trace_attachment_items.is_empty() {
380 grouped_envelopes.push((
381 ProcessingGroup::TraceAttachment,
382 Envelope::from_parts(headers.clone(), trace_attachment_items),
383 ))
384 }
385
386 if !envelope.items().any(Item::creates_event) {
391 let standalone_items = envelope.take_items_by(Item::requires_event);
392 if !standalone_items.is_empty() {
393 grouped_envelopes.push((
394 ProcessingGroup::Standalone,
395 Envelope::from_parts(headers.clone(), standalone_items),
396 ))
397 }
398 };
399
400 let security_reports_items = envelope
402 .take_items_by(|i| matches!(i.ty(), &ItemType::RawSecurity))
403 .into_iter()
404 .map(|item| {
405 let headers = headers.clone();
406 let items: SmallVec<[Item; 3]> = smallvec![item.clone()];
407 let mut envelope = Envelope::from_parts(headers, items);
408 envelope.set_event_id(EventId::new());
409 (ProcessingGroup::Error, envelope)
410 });
411 grouped_envelopes.extend(security_reports_items);
412
413 let require_event_items = envelope.take_items_by(Item::requires_event);
415 if !require_event_items.is_empty() {
416 let group = if require_event_items
417 .iter()
418 .any(|item| matches!(item.ty(), &ItemType::Transaction | &ItemType::Profile))
419 {
420 ProcessingGroup::Transaction
421 } else {
422 ProcessingGroup::Error
423 };
424
425 grouped_envelopes.push((
426 group,
427 Envelope::from_parts(headers.clone(), require_event_items),
428 ))
429 }
430
431 let envelopes = envelope.items_mut().map(|item| {
433 let headers = headers.clone();
434 let items: SmallVec<[Item; 3]> = smallvec![item.clone()];
435 let envelope = Envelope::from_parts(headers, items);
436 let item_type = item.ty();
437 let group = if matches!(item_type, &ItemType::CheckIn) {
438 ProcessingGroup::CheckIn
439 } else if matches!(item.ty(), &ItemType::ClientReport) {
440 ProcessingGroup::ClientReport
441 } else if matches!(item_type, &ItemType::Unknown(_)) {
442 ProcessingGroup::ForwardUnknown
443 } else {
444 ProcessingGroup::Ungrouped
446 };
447
448 (group, envelope)
449 });
450 grouped_envelopes.extend(envelopes);
451
452 grouped_envelopes
453 }
454
455 pub fn variant(&self) -> &'static str {
457 match self {
458 ProcessingGroup::Transaction => "transaction",
459 ProcessingGroup::Error => "error",
460 ProcessingGroup::Session => "session",
461 ProcessingGroup::Standalone => "standalone",
462 ProcessingGroup::ClientReport => "client_report",
463 ProcessingGroup::Replay => "replay",
464 ProcessingGroup::CheckIn => "check_in",
465 ProcessingGroup::Log => "log",
466 ProcessingGroup::TraceMetric => "trace_metric",
467 ProcessingGroup::Nel => "nel",
468 ProcessingGroup::Span => "span",
469 ProcessingGroup::SpanV2 => "span_v2",
470 ProcessingGroup::Metrics => "metrics",
471 ProcessingGroup::ProfileChunk => "profile_chunk",
472 ProcessingGroup::TraceAttachment => "trace_attachment",
473 ProcessingGroup::ForwardUnknown => "forward_unknown",
474 ProcessingGroup::Ungrouped => "ungrouped",
475 }
476 }
477}
478
479impl From<ProcessingGroup> for AppFeature {
480 fn from(value: ProcessingGroup) -> Self {
481 match value {
482 ProcessingGroup::Transaction => AppFeature::Transactions,
483 ProcessingGroup::Error => AppFeature::Errors,
484 ProcessingGroup::Session => AppFeature::Sessions,
485 ProcessingGroup::Standalone => AppFeature::UnattributedEnvelope,
486 ProcessingGroup::ClientReport => AppFeature::ClientReports,
487 ProcessingGroup::Replay => AppFeature::Replays,
488 ProcessingGroup::CheckIn => AppFeature::CheckIns,
489 ProcessingGroup::Log => AppFeature::Logs,
490 ProcessingGroup::TraceMetric => AppFeature::TraceMetrics,
491 ProcessingGroup::Nel => AppFeature::Logs,
492 ProcessingGroup::Span => AppFeature::Spans,
493 ProcessingGroup::SpanV2 => AppFeature::Spans,
494 ProcessingGroup::Metrics => AppFeature::UnattributedMetrics,
495 ProcessingGroup::ProfileChunk => AppFeature::Profiles,
496 ProcessingGroup::ForwardUnknown => AppFeature::UnattributedEnvelope,
497 ProcessingGroup::Ungrouped => AppFeature::UnattributedEnvelope,
498 ProcessingGroup::TraceAttachment => AppFeature::TraceAttachments,
499 }
500 }
501}
502
503#[derive(Debug, thiserror::Error)]
505pub enum ProcessingError {
506 #[error("invalid json in event")]
507 InvalidJson(#[source] serde_json::Error),
508
509 #[error("invalid message pack event payload")]
510 InvalidMsgpack(#[from] rmp_serde::decode::Error),
511
512 #[cfg(feature = "processing")]
513 #[error("invalid unreal crash report")]
514 InvalidUnrealReport(#[source] Unreal4Error),
515
516 #[error("event payload too large")]
517 PayloadTooLarge(DiscardItemType),
518
519 #[error("invalid transaction event")]
520 InvalidTransaction,
521
522 #[error("envelope processor failed")]
523 ProcessingFailed(#[from] ProcessingAction),
524
525 #[error("duplicate {0} in event")]
526 DuplicateItem(ItemType),
527
528 #[error("failed to extract event payload")]
529 NoEventPayload,
530
531 #[error("missing project id in DSN")]
532 MissingProjectId,
533
534 #[error("invalid security report type: {0:?}")]
535 InvalidSecurityType(Bytes),
536
537 #[error("unsupported security report type")]
538 UnsupportedSecurityType,
539
540 #[error("invalid security report")]
541 InvalidSecurityReport(#[source] serde_json::Error),
542
543 #[error("invalid nel report")]
544 InvalidNelReport(#[source] NetworkReportError),
545
546 #[error("event filtered with reason: {0:?}")]
547 EventFiltered(FilterStatKey),
548
549 #[error("missing or invalid required event timestamp")]
550 InvalidTimestamp,
551
552 #[error("could not serialize event payload")]
553 SerializeFailed(#[source] serde_json::Error),
554
555 #[cfg(feature = "processing")]
556 #[error("failed to apply quotas")]
557 QuotasFailed(#[from] RateLimitingError),
558
559 #[error("invalid pii config")]
560 PiiConfigError(PiiConfigError),
561
562 #[error("invalid processing group type")]
563 InvalidProcessingGroup(Box<InvalidProcessingGroupType>),
564
565 #[error("invalid replay")]
566 InvalidReplay(DiscardReason),
567
568 #[error("replay filtered with reason: {0:?}")]
569 ReplayFiltered(FilterStatKey),
570
571 #[cfg(feature = "processing")]
572 #[error("nintendo switch dying message processing failed {0:?}")]
573 InvalidNintendoDyingMessage(#[source] SwitchProcessingError),
574
575 #[cfg(all(sentry, feature = "processing"))]
576 #[error("playstation dump processing failed: {0}")]
577 InvalidPlaystationDump(String),
578
579 #[error("processing group does not match specific processor")]
580 ProcessingGroupMismatch,
581 #[error("new processing pipeline failed")]
582 ProcessingFailure,
583}
584
585impl ProcessingError {
586 pub fn to_outcome(&self) -> Option<Outcome> {
587 match self {
588 Self::PayloadTooLarge(payload_type) => {
589 Some(Outcome::Invalid(DiscardReason::TooLarge(*payload_type)))
590 }
591 Self::InvalidJson(_) => Some(Outcome::Invalid(DiscardReason::InvalidJson)),
592 Self::InvalidMsgpack(_) => Some(Outcome::Invalid(DiscardReason::InvalidMsgpack)),
593 Self::InvalidSecurityType(_) => {
594 Some(Outcome::Invalid(DiscardReason::SecurityReportType))
595 }
596 Self::InvalidSecurityReport(_) => Some(Outcome::Invalid(DiscardReason::SecurityReport)),
597 Self::UnsupportedSecurityType => Some(Outcome::Filtered(FilterStatKey::InvalidCsp)),
598 Self::InvalidNelReport(_) => Some(Outcome::Invalid(DiscardReason::InvalidJson)),
599 Self::InvalidTransaction => Some(Outcome::Invalid(DiscardReason::InvalidTransaction)),
600 Self::InvalidTimestamp => Some(Outcome::Invalid(DiscardReason::Timestamp)),
601 Self::DuplicateItem(_) => Some(Outcome::Invalid(DiscardReason::DuplicateItem)),
602 Self::NoEventPayload => Some(Outcome::Invalid(DiscardReason::NoEventPayload)),
603 #[cfg(feature = "processing")]
604 Self::InvalidNintendoDyingMessage(_) => Some(Outcome::Invalid(DiscardReason::Payload)),
605 #[cfg(all(sentry, feature = "processing"))]
606 Self::InvalidPlaystationDump(_) => Some(Outcome::Invalid(DiscardReason::Payload)),
607 #[cfg(feature = "processing")]
608 Self::InvalidUnrealReport(err) if err.kind() == Unreal4ErrorKind::BadCompression => {
609 Some(Outcome::Invalid(DiscardReason::InvalidCompression))
610 }
611 #[cfg(feature = "processing")]
612 Self::InvalidUnrealReport(_) => Some(Outcome::Invalid(DiscardReason::ProcessUnreal)),
613 Self::SerializeFailed(_) | Self::ProcessingFailed(_) => {
614 Some(Outcome::Invalid(DiscardReason::Internal))
615 }
616 #[cfg(feature = "processing")]
617 Self::QuotasFailed(_) => Some(Outcome::Invalid(DiscardReason::Internal)),
618 Self::PiiConfigError(_) => Some(Outcome::Invalid(DiscardReason::ProjectStatePii)),
619 Self::MissingProjectId => None,
620 Self::EventFiltered(_) => None,
621 Self::InvalidProcessingGroup(_) => None,
622 Self::InvalidReplay(reason) => Some(Outcome::Invalid(*reason)),
623 Self::ReplayFiltered(key) => Some(Outcome::Filtered(key.clone())),
624
625 Self::ProcessingGroupMismatch => Some(Outcome::Invalid(DiscardReason::Internal)),
626 Self::ProcessingFailure => None,
628 }
629 }
630
631 fn is_unexpected(&self) -> bool {
632 self.to_outcome()
633 .is_some_and(|outcome| outcome.is_unexpected())
634 }
635}
636
637#[cfg(feature = "processing")]
638impl From<Unreal4Error> for ProcessingError {
639 fn from(err: Unreal4Error) -> Self {
640 match err.kind() {
641 Unreal4ErrorKind::TooLarge => Self::PayloadTooLarge(ItemType::UnrealReport.into()),
642 _ => ProcessingError::InvalidUnrealReport(err),
643 }
644 }
645}
646
647impl From<ExtractMetricsError> for ProcessingError {
648 fn from(error: ExtractMetricsError) -> Self {
649 match error {
650 ExtractMetricsError::MissingTimestamp | ExtractMetricsError::InvalidTimestamp => {
651 Self::InvalidTimestamp
652 }
653 }
654 }
655}
656
657impl From<InvalidProcessingGroupType> for ProcessingError {
658 fn from(value: InvalidProcessingGroupType) -> Self {
659 Self::InvalidProcessingGroup(Box::new(value))
660 }
661}
662
663type ExtractedEvent = (Annotated<Event>, usize);
664
665#[derive(Debug)]
670pub struct ProcessingExtractedMetrics {
671 metrics: ExtractedMetrics,
672}
673
674impl ProcessingExtractedMetrics {
675 pub fn new() -> Self {
676 Self {
677 metrics: ExtractedMetrics::default(),
678 }
679 }
680
681 pub fn into_inner(self) -> ExtractedMetrics {
682 self.metrics
683 }
684
685 pub fn extend(
687 &mut self,
688 extracted: ExtractedMetrics,
689 sampling_decision: Option<SamplingDecision>,
690 ) {
691 self.extend_project_metrics(extracted.project_metrics, sampling_decision);
692 self.extend_sampling_metrics(extracted.sampling_metrics, sampling_decision);
693 }
694
695 pub fn extend_project_metrics<I>(
697 &mut self,
698 buckets: I,
699 sampling_decision: Option<SamplingDecision>,
700 ) where
701 I: IntoIterator<Item = Bucket>,
702 {
703 self.metrics
704 .project_metrics
705 .extend(buckets.into_iter().map(|mut bucket| {
706 bucket.metadata.extracted_from_indexed =
707 sampling_decision == Some(SamplingDecision::Keep);
708 bucket
709 }));
710 }
711
712 pub fn extend_sampling_metrics<I>(
714 &mut self,
715 buckets: I,
716 sampling_decision: Option<SamplingDecision>,
717 ) where
718 I: IntoIterator<Item = Bucket>,
719 {
720 self.metrics
721 .sampling_metrics
722 .extend(buckets.into_iter().map(|mut bucket| {
723 bucket.metadata.extracted_from_indexed =
724 sampling_decision == Some(SamplingDecision::Keep);
725 bucket
726 }));
727 }
728
729 #[cfg(feature = "processing")]
734 fn apply_enforcement(&mut self, enforcement: &Enforcement, enforced_consistently: bool) {
735 let mut drop_namespaces: SmallVec<[_; 2]> = smallvec![];
737 let mut reset_extracted_from_indexed: SmallVec<[_; 2]> = smallvec![];
740
741 for (namespace, limit, indexed) in [
742 (
743 MetricNamespace::Transactions,
744 &enforcement.event,
745 &enforcement.event_indexed,
746 ),
747 (
748 MetricNamespace::Spans,
749 &enforcement.spans,
750 &enforcement.spans_indexed,
751 ),
752 ] {
753 if limit.is_active() {
754 drop_namespaces.push(namespace);
755 } else if indexed.is_active() && !enforced_consistently {
756 reset_extracted_from_indexed.push(namespace);
761 }
762 }
763
764 if !drop_namespaces.is_empty() || !reset_extracted_from_indexed.is_empty() {
765 self.retain_mut(|bucket| {
766 let Some(namespace) = bucket.name.try_namespace() else {
767 return true;
768 };
769
770 if drop_namespaces.contains(&namespace) {
771 return false;
772 }
773
774 if reset_extracted_from_indexed.contains(&namespace) {
775 bucket.metadata.extracted_from_indexed = false;
776 }
777
778 true
779 });
780 }
781 }
782
783 #[cfg(feature = "processing")]
784 fn retain_mut(&mut self, mut f: impl FnMut(&mut Bucket) -> bool) {
785 self.metrics.project_metrics.retain_mut(&mut f);
786 self.metrics.sampling_metrics.retain_mut(&mut f);
787 }
788}
789
790fn send_metrics(
791 metrics: ExtractedMetrics,
792 project_key: ProjectKey,
793 sampling_key: Option<ProjectKey>,
794 aggregator: &Addr<Aggregator>,
795) {
796 let ExtractedMetrics {
797 project_metrics,
798 sampling_metrics,
799 } = metrics;
800
801 if !project_metrics.is_empty() {
802 aggregator.send(MergeBuckets {
803 project_key,
804 buckets: project_metrics,
805 });
806 }
807
808 if !sampling_metrics.is_empty() {
809 let sampling_project_key = sampling_key.unwrap_or(project_key);
816 aggregator.send(MergeBuckets {
817 project_key: sampling_project_key,
818 buckets: sampling_metrics,
819 });
820 }
821}
822
823fn should_filter(config: &Config, project_info: &ProjectInfo, feature: Feature) -> bool {
828 match config.relay_mode() {
829 RelayMode::Proxy => false,
830 RelayMode::Managed => !project_info.has_feature(feature),
831 }
832}
833
834#[derive(Debug)]
837#[expect(
838 clippy::large_enum_variant,
839 reason = "until we have a better solution to combat the excessive growth of Item, see #4819"
840)]
841enum ProcessingResult {
842 Envelope {
843 managed_envelope: TypedEnvelope<Processed>,
844 extracted_metrics: ProcessingExtractedMetrics,
845 },
846 Output(Output<Outputs>),
847}
848
849impl ProcessingResult {
850 fn no_metrics(managed_envelope: TypedEnvelope<Processed>) -> Self {
852 Self::Envelope {
853 managed_envelope,
854 extracted_metrics: ProcessingExtractedMetrics::new(),
855 }
856 }
857}
858
859#[derive(Debug)]
861#[expect(
862 clippy::large_enum_variant,
863 reason = "until we have a better solution to combat the excessive growth of Item, see #4819"
864)]
865enum Submit<'a> {
866 Envelope(TypedEnvelope<Processed>),
868 Output {
870 output: Outputs,
871 ctx: processing::ForwardContext<'a>,
872 },
873}
874
875#[derive(Debug)]
885pub struct ProcessEnvelope {
886 pub envelope: ManagedEnvelope,
888 pub project_info: Arc<ProjectInfo>,
890 pub rate_limits: Arc<RateLimits>,
892 pub sampling_project_info: Option<Arc<ProjectInfo>>,
894 pub reservoir_counters: ReservoirCounters,
896}
897
898#[derive(Debug)]
900struct ProcessEnvelopeGrouped<'a> {
901 pub group: ProcessingGroup,
903 pub envelope: ManagedEnvelope,
905 pub ctx: processing::Context<'a>,
907}
908
909#[derive(Debug)]
921pub struct ProcessMetrics {
922 pub data: MetricData,
924 pub project_key: ProjectKey,
926 pub source: BucketSource,
928 pub received_at: DateTime<Utc>,
930 pub sent_at: Option<DateTime<Utc>>,
933}
934
935#[derive(Debug)]
937pub enum MetricData {
938 Raw(Vec<Item>),
940 Parsed(Vec<Bucket>),
942}
943
944impl MetricData {
945 fn into_buckets(self, timestamp: UnixTimestamp) -> Vec<Bucket> {
950 let items = match self {
951 Self::Parsed(buckets) => return buckets,
952 Self::Raw(items) => items,
953 };
954
955 let mut buckets = Vec::new();
956 for item in items {
957 let payload = item.payload();
958 if item.ty() == &ItemType::Statsd {
959 for bucket_result in Bucket::parse_all(&payload, timestamp) {
960 match bucket_result {
961 Ok(bucket) => buckets.push(bucket),
962 Err(error) => relay_log::debug!(
963 error = &error as &dyn Error,
964 "failed to parse metric bucket from statsd format",
965 ),
966 }
967 }
968 } else if item.ty() == &ItemType::MetricBuckets {
969 match serde_json::from_slice::<Vec<Bucket>>(&payload) {
970 Ok(parsed_buckets) => {
971 if buckets.is_empty() {
973 buckets = parsed_buckets;
974 } else {
975 buckets.extend(parsed_buckets);
976 }
977 }
978 Err(error) => {
979 relay_log::debug!(
980 error = &error as &dyn Error,
981 "failed to parse metric bucket",
982 );
983 metric!(counter(RelayCounters::MetricBucketsParsingFailed) += 1);
984 }
985 }
986 } else {
987 relay_log::error!(
988 "invalid item of type {} passed to ProcessMetrics",
989 item.ty()
990 );
991 }
992 }
993 buckets
994 }
995}
996
997#[derive(Debug)]
998pub struct ProcessBatchedMetrics {
999 pub payload: Bytes,
1001 pub source: BucketSource,
1003 pub received_at: DateTime<Utc>,
1005 pub sent_at: Option<DateTime<Utc>>,
1007}
1008
1009#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
1011pub enum BucketSource {
1012 Internal,
1018 External,
1023}
1024
1025impl BucketSource {
1026 pub fn from_meta(meta: &RequestMeta) -> Self {
1028 match meta.request_trust() {
1029 RequestTrust::Trusted => Self::Internal,
1030 RequestTrust::Untrusted => Self::External,
1031 }
1032 }
1033}
1034
1035#[derive(Debug)]
1037pub struct SubmitClientReports {
1038 pub client_reports: Vec<ClientReport>,
1040 pub scoping: Scoping,
1042}
1043
1044#[derive(Debug)]
1046pub enum EnvelopeProcessor {
1047 ProcessEnvelope(Box<ProcessEnvelope>),
1048 ProcessProjectMetrics(Box<ProcessMetrics>),
1049 ProcessBatchedMetrics(Box<ProcessBatchedMetrics>),
1050 FlushBuckets(Box<FlushBuckets>),
1051 SubmitClientReports(Box<SubmitClientReports>),
1052}
1053
1054impl EnvelopeProcessor {
1055 pub fn variant(&self) -> &'static str {
1057 match self {
1058 EnvelopeProcessor::ProcessEnvelope(_) => "ProcessEnvelope",
1059 EnvelopeProcessor::ProcessProjectMetrics(_) => "ProcessProjectMetrics",
1060 EnvelopeProcessor::ProcessBatchedMetrics(_) => "ProcessBatchedMetrics",
1061 EnvelopeProcessor::FlushBuckets(_) => "FlushBuckets",
1062 EnvelopeProcessor::SubmitClientReports(_) => "SubmitClientReports",
1063 }
1064 }
1065}
1066
1067impl relay_system::Interface for EnvelopeProcessor {}
1068
1069impl FromMessage<ProcessEnvelope> for EnvelopeProcessor {
1070 type Response = relay_system::NoResponse;
1071
1072 fn from_message(message: ProcessEnvelope, _sender: ()) -> Self {
1073 Self::ProcessEnvelope(Box::new(message))
1074 }
1075}
1076
1077impl FromMessage<ProcessMetrics> for EnvelopeProcessor {
1078 type Response = NoResponse;
1079
1080 fn from_message(message: ProcessMetrics, _: ()) -> Self {
1081 Self::ProcessProjectMetrics(Box::new(message))
1082 }
1083}
1084
1085impl FromMessage<ProcessBatchedMetrics> for EnvelopeProcessor {
1086 type Response = NoResponse;
1087
1088 fn from_message(message: ProcessBatchedMetrics, _: ()) -> Self {
1089 Self::ProcessBatchedMetrics(Box::new(message))
1090 }
1091}
1092
1093impl FromMessage<FlushBuckets> for EnvelopeProcessor {
1094 type Response = NoResponse;
1095
1096 fn from_message(message: FlushBuckets, _: ()) -> Self {
1097 Self::FlushBuckets(Box::new(message))
1098 }
1099}
1100
1101impl FromMessage<SubmitClientReports> for EnvelopeProcessor {
1102 type Response = NoResponse;
1103
1104 fn from_message(message: SubmitClientReports, _: ()) -> Self {
1105 Self::SubmitClientReports(Box::new(message))
1106 }
1107}
1108
1109pub type EnvelopeProcessorServicePool = AsyncPool<BoxFuture<'static, ()>>;
1111
1112#[derive(Clone)]
1116pub struct EnvelopeProcessorService {
1117 inner: Arc<InnerProcessor>,
1118}
1119
1120pub struct Addrs {
1122 pub outcome_aggregator: Addr<TrackOutcome>,
1123 pub upstream_relay: Addr<UpstreamRelay>,
1124 #[cfg(feature = "processing")]
1125 pub upload: Option<Addr<Upload>>,
1126 #[cfg(feature = "processing")]
1127 pub store_forwarder: Option<Addr<Store>>,
1128 pub aggregator: Addr<Aggregator>,
1129}
1130
1131impl Default for Addrs {
1132 fn default() -> Self {
1133 Addrs {
1134 outcome_aggregator: Addr::dummy(),
1135 upstream_relay: Addr::dummy(),
1136 #[cfg(feature = "processing")]
1137 upload: None,
1138 #[cfg(feature = "processing")]
1139 store_forwarder: None,
1140 aggregator: Addr::dummy(),
1141 }
1142 }
1143}
1144
1145struct InnerProcessor {
1146 pool: EnvelopeProcessorServicePool,
1147 config: Arc<Config>,
1148 global_config: GlobalConfigHandle,
1149 project_cache: ProjectCacheHandle,
1150 cogs: Cogs,
1151 addrs: Addrs,
1152 #[cfg(feature = "processing")]
1153 rate_limiter: Option<Arc<RedisRateLimiter>>,
1154 geoip_lookup: GeoIpLookup,
1155 #[cfg(feature = "processing")]
1156 cardinality_limiter: Option<CardinalityLimiter>,
1157 metric_outcomes: MetricOutcomes,
1158 processing: Processing,
1159}
1160
1161struct Processing {
1162 logs: LogsProcessor,
1163 trace_metrics: TraceMetricsProcessor,
1164 spans: SpansProcessor,
1165 check_ins: CheckInsProcessor,
1166 sessions: SessionsProcessor,
1167 transactions: TransactionProcessor,
1168 profile_chunks: ProfileChunksProcessor,
1169 trace_attachments: TraceAttachmentsProcessor,
1170}
1171
1172impl EnvelopeProcessorService {
1173 #[cfg_attr(feature = "processing", expect(clippy::too_many_arguments))]
1175 pub fn new(
1176 pool: EnvelopeProcessorServicePool,
1177 config: Arc<Config>,
1178 global_config: GlobalConfigHandle,
1179 project_cache: ProjectCacheHandle,
1180 cogs: Cogs,
1181 #[cfg(feature = "processing")] redis: Option<RedisClients>,
1182 addrs: Addrs,
1183 metric_outcomes: MetricOutcomes,
1184 ) -> Self {
1185 let geoip_lookup = config
1186 .geoip_path()
1187 .and_then(
1188 |p| match GeoIpLookup::open(p).context(ServiceError::GeoIp) {
1189 Ok(geoip) => Some(geoip),
1190 Err(err) => {
1191 relay_log::error!("failed to open GeoIP db {p:?}: {err:?}");
1192 None
1193 }
1194 },
1195 )
1196 .unwrap_or_else(GeoIpLookup::empty);
1197
1198 #[cfg(feature = "processing")]
1199 let (cardinality, quotas) = match redis {
1200 Some(RedisClients {
1201 cardinality,
1202 quotas,
1203 ..
1204 }) => (Some(cardinality), Some(quotas)),
1205 None => (None, None),
1206 };
1207 #[cfg(not(feature = "processing"))]
1208 let quotas = None;
1209
1210 #[cfg(feature = "processing")]
1211 let rate_limiter = quotas.clone().map(|redis| {
1212 RedisRateLimiter::new(redis)
1213 .max_limit(config.max_rate_limit())
1214 .cache(config.quota_cache_ratio(), config.quota_cache_max())
1215 });
1216
1217 let quota_limiter = Arc::new(QuotaRateLimiter::new(
1218 #[cfg(feature = "processing")]
1219 project_cache.clone(),
1220 #[cfg(feature = "processing")]
1221 rate_limiter.clone(),
1222 ));
1223 #[cfg(feature = "processing")]
1224 let rate_limiter = rate_limiter.map(Arc::new);
1225
1226 let inner = InnerProcessor {
1227 pool,
1228 global_config,
1229 project_cache,
1230 cogs,
1231 #[cfg(feature = "processing")]
1232 rate_limiter,
1233 addrs,
1234 #[cfg(feature = "processing")]
1235 cardinality_limiter: cardinality
1236 .map(|cardinality| {
1237 RedisSetLimiter::new(
1238 RedisSetLimiterOptions {
1239 cache_vacuum_interval: config
1240 .cardinality_limiter_cache_vacuum_interval(),
1241 },
1242 cardinality,
1243 )
1244 })
1245 .map(CardinalityLimiter::new),
1246 metric_outcomes,
1247 processing: Processing {
1248 logs: LogsProcessor::new(Arc::clone("a_limiter)),
1249 trace_metrics: TraceMetricsProcessor::new(Arc::clone("a_limiter)),
1250 spans: SpansProcessor::new(Arc::clone("a_limiter), geoip_lookup.clone()),
1251 check_ins: CheckInsProcessor::new(Arc::clone("a_limiter)),
1252 sessions: SessionsProcessor::new(Arc::clone("a_limiter)),
1253 transactions: TransactionProcessor::new(
1254 Arc::clone("a_limiter),
1255 geoip_lookup.clone(),
1256 quotas.clone(),
1257 ),
1258 profile_chunks: ProfileChunksProcessor::new(Arc::clone("a_limiter)),
1259 trace_attachments: TraceAttachmentsProcessor::new(quota_limiter),
1260 },
1261 geoip_lookup,
1262 config,
1263 };
1264
1265 Self {
1266 inner: Arc::new(inner),
1267 }
1268 }
1269
1270 async fn enforce_quotas<Group>(
1271 &self,
1272 managed_envelope: &mut TypedEnvelope<Group>,
1273 event: Annotated<Event>,
1274 extracted_metrics: &mut ProcessingExtractedMetrics,
1275 ctx: processing::Context<'_>,
1276 ) -> Result<Annotated<Event>, ProcessingError> {
1277 let cached_result = RateLimiter::Cached
1280 .enforce(managed_envelope, event, extracted_metrics, ctx)
1281 .await?;
1282
1283 if_processing!(self.inner.config, {
1284 let rate_limiter = match self.inner.rate_limiter.clone() {
1285 Some(rate_limiter) => rate_limiter,
1286 None => return Ok(cached_result.event),
1287 };
1288
1289 let consistent_result = RateLimiter::Consistent(rate_limiter)
1291 .enforce(
1292 managed_envelope,
1293 cached_result.event,
1294 extracted_metrics,
1295 ctx
1296 )
1297 .await?;
1298
1299 if !consistent_result.rate_limits.is_empty() {
1301 self.inner
1302 .project_cache
1303 .get(managed_envelope.scoping().project_key)
1304 .rate_limits()
1305 .merge(consistent_result.rate_limits);
1306 }
1307
1308 Ok(consistent_result.event)
1309 } else { Ok(cached_result.event) })
1310 }
1311
1312 async fn process_errors(
1314 &self,
1315 managed_envelope: &mut TypedEnvelope<ErrorGroup>,
1316 project_id: ProjectId,
1317 mut ctx: processing::Context<'_>,
1318 ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
1319 let mut event_fully_normalized = EventFullyNormalized::new(managed_envelope.envelope());
1320 let mut metrics = Metrics::default();
1321 let mut extracted_metrics = ProcessingExtractedMetrics::new();
1322
1323 report::process_user_reports(managed_envelope);
1325
1326 if_processing!(self.inner.config, {
1327 unreal::expand(managed_envelope, &self.inner.config)?;
1328 #[cfg(sentry)]
1329 playstation::expand(managed_envelope, &self.inner.config, ctx.project_info)?;
1330 nnswitch::expand(managed_envelope)?;
1331 });
1332
1333 let mut event = event::extract(
1334 managed_envelope,
1335 &mut metrics,
1336 event_fully_normalized,
1337 &self.inner.config,
1338 )?;
1339
1340 if_processing!(self.inner.config, {
1341 if let Some(inner_event_fully_normalized) =
1342 unreal::process(managed_envelope, &mut event)?
1343 {
1344 event_fully_normalized = inner_event_fully_normalized;
1345 }
1346 #[cfg(sentry)]
1347 if let Some(inner_event_fully_normalized) =
1348 playstation::process(managed_envelope, &mut event, ctx.project_info)?
1349 {
1350 event_fully_normalized = inner_event_fully_normalized;
1351 }
1352 if let Some(inner_event_fully_normalized) =
1353 attachment::create_placeholders(managed_envelope, &mut event, &mut metrics)
1354 {
1355 event_fully_normalized = inner_event_fully_normalized;
1356 }
1357 });
1358
1359 ctx.sampling_project_info = dynamic_sampling::validate_and_set_dsc(
1360 managed_envelope,
1361 &mut event,
1362 ctx.project_info,
1363 ctx.sampling_project_info,
1364 );
1365
1366 let attachments = managed_envelope
1367 .envelope()
1368 .items()
1369 .filter(|item| item.attachment_type() == Some(&AttachmentType::Attachment));
1370 processing::utils::event::finalize(
1371 managed_envelope.envelope().headers(),
1372 &mut event,
1373 attachments,
1374 &mut metrics,
1375 ctx.config,
1376 )?;
1377 event_fully_normalized = processing::utils::event::normalize(
1378 managed_envelope.envelope().headers(),
1379 &mut event,
1380 event_fully_normalized,
1381 project_id,
1382 ctx,
1383 &self.inner.geoip_lookup,
1384 )?;
1385 let filter_run =
1386 processing::utils::event::filter(managed_envelope.envelope().headers(), &event, &ctx)
1387 .map_err(|err| {
1388 managed_envelope.reject(Outcome::Filtered(err.clone()));
1389 ProcessingError::EventFiltered(err)
1390 })?;
1391
1392 if self.inner.config.processing_enabled() || matches!(filter_run, FiltersStatus::Ok) {
1393 dynamic_sampling::tag_error_with_sampling_decision(
1394 managed_envelope,
1395 &mut event,
1396 ctx.sampling_project_info,
1397 &self.inner.config,
1398 )
1399 .await;
1400 }
1401
1402 event = self
1403 .enforce_quotas(managed_envelope, event, &mut extracted_metrics, ctx)
1404 .await?;
1405
1406 if event.value().is_some() {
1407 processing::utils::event::scrub(&mut event, ctx.project_info)?;
1408 event::serialize(
1409 managed_envelope,
1410 &mut event,
1411 event_fully_normalized,
1412 EventMetricsExtracted(false),
1413 SpansExtracted(false),
1414 )?;
1415 event::emit_feedback_metrics(managed_envelope.envelope());
1416 }
1417
1418 let attachments = managed_envelope
1419 .envelope_mut()
1420 .items_mut()
1421 .filter(|i| i.ty() == &ItemType::Attachment);
1422 processing::utils::attachments::scrub(attachments, ctx.project_info);
1423
1424 if self.inner.config.processing_enabled() && !event_fully_normalized.0 {
1425 relay_log::error!(
1426 tags.project = %project_id,
1427 tags.ty = event_type(&event).map(|e| e.to_string()).unwrap_or("none".to_owned()),
1428 "ingested event without normalizing"
1429 );
1430 }
1431
1432 Ok(Some(extracted_metrics))
1433 }
1434
1435 async fn process_standalone(
1437 &self,
1438 managed_envelope: &mut TypedEnvelope<StandaloneGroup>,
1439 project_id: ProjectId,
1440 ctx: processing::Context<'_>,
1441 ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
1442 let mut extracted_metrics = ProcessingExtractedMetrics::new();
1443
1444 standalone::process(managed_envelope);
1445
1446 profile::filter(
1447 managed_envelope,
1448 &Annotated::empty(),
1449 ctx.config,
1450 project_id,
1451 ctx.project_info,
1452 );
1453
1454 self.enforce_quotas(
1455 managed_envelope,
1456 Annotated::empty(),
1457 &mut extracted_metrics,
1458 ctx,
1459 )
1460 .await?;
1461
1462 report::process_user_reports(managed_envelope);
1463 let attachments = managed_envelope
1464 .envelope_mut()
1465 .items_mut()
1466 .filter(|i| i.ty() == &ItemType::Attachment);
1467 processing::utils::attachments::scrub(attachments, ctx.project_info);
1468
1469 Ok(Some(extracted_metrics))
1470 }
1471
1472 async fn process_client_reports(
1474 &self,
1475 managed_envelope: &mut TypedEnvelope<ClientReportGroup>,
1476 ctx: processing::Context<'_>,
1477 ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
1478 let mut extracted_metrics = ProcessingExtractedMetrics::new();
1479
1480 self.enforce_quotas(
1481 managed_envelope,
1482 Annotated::empty(),
1483 &mut extracted_metrics,
1484 ctx,
1485 )
1486 .await?;
1487
1488 report::process_client_reports(
1489 managed_envelope,
1490 ctx.config,
1491 ctx.project_info,
1492 self.inner.addrs.outcome_aggregator.clone(),
1493 );
1494
1495 Ok(Some(extracted_metrics))
1496 }
1497
1498 async fn process_replays(
1500 &self,
1501 managed_envelope: &mut TypedEnvelope<ReplayGroup>,
1502 ctx: processing::Context<'_>,
1503 ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
1504 let mut extracted_metrics = ProcessingExtractedMetrics::new();
1505
1506 replay::process(
1507 managed_envelope,
1508 ctx.global_config,
1509 ctx.config,
1510 ctx.project_info,
1511 &self.inner.geoip_lookup,
1512 )?;
1513
1514 self.enforce_quotas(
1515 managed_envelope,
1516 Annotated::empty(),
1517 &mut extracted_metrics,
1518 ctx,
1519 )
1520 .await?;
1521
1522 Ok(Some(extracted_metrics))
1523 }
1524
1525 async fn process_nel(
1526 &self,
1527 mut managed_envelope: ManagedEnvelope,
1528 ctx: processing::Context<'_>,
1529 ) -> Result<ProcessingResult, ProcessingError> {
1530 nel::convert_to_logs(&mut managed_envelope);
1531 self.process_with_processor(&self.inner.processing.logs, managed_envelope, ctx)
1532 .await
1533 }
1534
1535 async fn process_with_processor<P: processing::Processor>(
1536 &self,
1537 processor: &P,
1538 mut managed_envelope: ManagedEnvelope,
1539 ctx: processing::Context<'_>,
1540 ) -> Result<ProcessingResult, ProcessingError>
1541 where
1542 Outputs: From<P::Output>,
1543 {
1544 let Some(work) = processor.prepare_envelope(&mut managed_envelope) else {
1545 debug_assert!(
1546 false,
1547 "there must be work for the {} processor",
1548 std::any::type_name::<P>(),
1549 );
1550 return Err(ProcessingError::ProcessingGroupMismatch);
1551 };
1552
1553 managed_envelope.update();
1554 match managed_envelope.envelope().is_empty() {
1555 true => managed_envelope.accept(),
1556 false => managed_envelope.reject(Outcome::Invalid(DiscardReason::Internal)),
1557 }
1558
1559 processor
1560 .process(work, ctx)
1561 .await
1562 .map_err(|err| {
1563 relay_log::debug!(
1564 error = &err as &dyn std::error::Error,
1565 "processing pipeline failed"
1566 );
1567 ProcessingError::ProcessingFailure
1568 })
1569 .map(|o| o.map(Into::into))
1570 .map(ProcessingResult::Output)
1571 }
1572
1573 async fn process_standalone_spans(
1577 &self,
1578 managed_envelope: &mut TypedEnvelope<SpanGroup>,
1579 _project_id: ProjectId,
1580 ctx: processing::Context<'_>,
1581 ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
1582 let mut extracted_metrics = ProcessingExtractedMetrics::new();
1583
1584 span::filter(managed_envelope, ctx.config, ctx.project_info);
1585 span::convert_otel_traces_data(managed_envelope);
1586
1587 if_processing!(self.inner.config, {
1588 span::process(
1589 managed_envelope,
1590 &mut Annotated::empty(),
1591 &mut extracted_metrics,
1592 _project_id,
1593 ctx,
1594 &self.inner.geoip_lookup,
1595 )
1596 .await;
1597 });
1598
1599 self.enforce_quotas(
1600 managed_envelope,
1601 Annotated::empty(),
1602 &mut extracted_metrics,
1603 ctx,
1604 )
1605 .await?;
1606
1607 Ok(Some(extracted_metrics))
1608 }
1609
1610 async fn process_envelope(
1611 &self,
1612 project_id: ProjectId,
1613 message: ProcessEnvelopeGrouped<'_>,
1614 ) -> Result<ProcessingResult, ProcessingError> {
1615 let ProcessEnvelopeGrouped {
1616 group,
1617 envelope: mut managed_envelope,
1618 ctx,
1619 } = message;
1620
1621 if let Some(sampling_state) = ctx.sampling_project_info {
1623 managed_envelope
1626 .envelope_mut()
1627 .parametrize_dsc_transaction(&sampling_state.config.tx_name_rules);
1628 }
1629
1630 if let Some(retention) = ctx.project_info.config.event_retention {
1633 managed_envelope.envelope_mut().set_retention(retention);
1634 }
1635
1636 if let Some(retention) = ctx.project_info.config.downsampled_event_retention {
1639 managed_envelope
1640 .envelope_mut()
1641 .set_downsampled_retention(retention);
1642 }
1643
1644 managed_envelope
1649 .envelope_mut()
1650 .meta_mut()
1651 .set_project_id(project_id);
1652
1653 macro_rules! run {
1654 ($fn_name:ident $(, $args:expr)*) => {
1655 async {
1656 let mut managed_envelope = (managed_envelope, group).try_into()?;
1657 match self.$fn_name(&mut managed_envelope, $($args),*).await {
1658 Ok(extracted_metrics) => Ok(ProcessingResult::Envelope {
1659 managed_envelope: managed_envelope.into_processed(),
1660 extracted_metrics: extracted_metrics.map_or(ProcessingExtractedMetrics::new(), |e| e)
1661 }),
1662 Err(error) => {
1663 relay_log::trace!("Executing {fn} failed: {error}", fn = stringify!($fn_name), error = error);
1664 if let Some(outcome) = error.to_outcome() {
1665 managed_envelope.reject(outcome);
1666 }
1667
1668 return Err(error);
1669 }
1670 }
1671 }.await
1672 };
1673 }
1674
1675 relay_log::trace!("Processing {group} group", group = group.variant());
1676
1677 match group {
1678 ProcessingGroup::Error => run!(process_errors, project_id, ctx),
1679 ProcessingGroup::Transaction => {
1680 self.process_with_processor(
1681 &self.inner.processing.transactions,
1682 managed_envelope,
1683 ctx,
1684 )
1685 .await
1686 }
1687 ProcessingGroup::Session => {
1688 self.process_with_processor(&self.inner.processing.sessions, managed_envelope, ctx)
1689 .await
1690 }
1691 ProcessingGroup::Standalone => run!(process_standalone, project_id, ctx),
1692 ProcessingGroup::ClientReport => run!(process_client_reports, ctx),
1693 ProcessingGroup::Replay => {
1694 run!(process_replays, ctx)
1695 }
1696 ProcessingGroup::CheckIn => {
1697 self.process_with_processor(&self.inner.processing.check_ins, managed_envelope, ctx)
1698 .await
1699 }
1700 ProcessingGroup::Nel => self.process_nel(managed_envelope, ctx).await,
1701 ProcessingGroup::Log => {
1702 self.process_with_processor(&self.inner.processing.logs, managed_envelope, ctx)
1703 .await
1704 }
1705 ProcessingGroup::TraceMetric => {
1706 self.process_with_processor(
1707 &self.inner.processing.trace_metrics,
1708 managed_envelope,
1709 ctx,
1710 )
1711 .await
1712 }
1713 ProcessingGroup::SpanV2 => {
1714 self.process_with_processor(&self.inner.processing.spans, managed_envelope, ctx)
1715 .await
1716 }
1717 ProcessingGroup::TraceAttachment => {
1718 self.process_with_processor(
1719 &self.inner.processing.trace_attachments,
1720 managed_envelope,
1721 ctx,
1722 )
1723 .await
1724 }
1725 ProcessingGroup::Span => run!(process_standalone_spans, project_id, ctx),
1726 ProcessingGroup::ProfileChunk => {
1727 self.process_with_processor(
1728 &self.inner.processing.profile_chunks,
1729 managed_envelope,
1730 ctx,
1731 )
1732 .await
1733 }
1734 ProcessingGroup::Metrics => {
1736 if self.inner.config.relay_mode() != RelayMode::Proxy {
1739 relay_log::error!(
1740 tags.project = %project_id,
1741 items = ?managed_envelope.envelope().items().next().map(Item::ty),
1742 "received metrics in the process_state"
1743 );
1744 }
1745
1746 Ok(ProcessingResult::no_metrics(
1747 managed_envelope.into_processed(),
1748 ))
1749 }
1750 ProcessingGroup::Ungrouped => {
1752 relay_log::error!(
1753 tags.project = %project_id,
1754 items = ?managed_envelope.envelope().items().next().map(Item::ty),
1755 "could not identify the processing group based on the envelope's items"
1756 );
1757
1758 Ok(ProcessingResult::no_metrics(
1759 managed_envelope.into_processed(),
1760 ))
1761 }
1762 ProcessingGroup::ForwardUnknown => Ok(ProcessingResult::no_metrics(
1766 managed_envelope.into_processed(),
1767 )),
1768 }
1769 }
1770
1771 async fn process<'a>(
1777 &self,
1778 mut message: ProcessEnvelopeGrouped<'a>,
1779 ) -> Result<Option<Submit<'a>>, ProcessingError> {
1780 let ProcessEnvelopeGrouped {
1781 ref mut envelope,
1782 ctx,
1783 ..
1784 } = message;
1785
1786 let Some(project_id) = ctx
1793 .project_info
1794 .project_id
1795 .or_else(|| envelope.envelope().meta().project_id())
1796 else {
1797 envelope.reject(Outcome::Invalid(DiscardReason::Internal));
1798 return Err(ProcessingError::MissingProjectId);
1799 };
1800
1801 let client = envelope.envelope().meta().client().map(str::to_owned);
1802 let user_agent = envelope.envelope().meta().user_agent().map(str::to_owned);
1803 let project_key = envelope.envelope().meta().public_key();
1804 let sampling_key = envelope
1808 .envelope()
1809 .sampling_key()
1810 .filter(|_| ctx.sampling_project_info.is_some());
1811
1812 relay_log::configure_scope(|scope| {
1815 scope.set_tag("project", project_id);
1816 if let Some(client) = client {
1817 scope.set_tag("sdk", client);
1818 }
1819 if let Some(user_agent) = user_agent {
1820 scope.set_extra("user_agent", user_agent.into());
1821 }
1822 });
1823
1824 let result = match self.process_envelope(project_id, message).await {
1825 Ok(ProcessingResult::Envelope {
1826 mut managed_envelope,
1827 extracted_metrics,
1828 }) => {
1829 managed_envelope.update();
1832
1833 let has_metrics = !extracted_metrics.metrics.project_metrics.is_empty();
1834 send_metrics(
1835 extracted_metrics.metrics,
1836 project_key,
1837 sampling_key,
1838 &self.inner.addrs.aggregator,
1839 );
1840
1841 let envelope_response = if managed_envelope.envelope().is_empty() {
1842 if !has_metrics {
1843 managed_envelope.reject(Outcome::RateLimited(None));
1845 } else {
1846 managed_envelope.accept();
1847 }
1848
1849 None
1850 } else {
1851 Some(managed_envelope)
1852 };
1853
1854 Ok(envelope_response.map(Submit::Envelope))
1855 }
1856 Ok(ProcessingResult::Output(Output { main, metrics })) => {
1857 if let Some(metrics) = metrics {
1858 metrics.accept(|metrics| {
1859 send_metrics(
1860 metrics,
1861 project_key,
1862 sampling_key,
1863 &self.inner.addrs.aggregator,
1864 );
1865 });
1866 }
1867
1868 let ctx = ctx.to_forward();
1869 Ok(main.map(|output| Submit::Output { output, ctx }))
1870 }
1871 Err(err) => Err(err),
1872 };
1873
1874 relay_log::configure_scope(|scope| {
1875 scope.remove_tag("project");
1876 scope.remove_tag("sdk");
1877 scope.remove_tag("user_agent");
1878 });
1879
1880 result
1881 }
1882
1883 async fn handle_process_envelope(&self, cogs: &mut Token, message: ProcessEnvelope) {
1884 let project_key = message.envelope.envelope().meta().public_key();
1885 let wait_time = message.envelope.age();
1886 metric!(timer(RelayTimers::EnvelopeWaitTime) = wait_time);
1887
1888 cogs.cancel();
1891
1892 let scoping = message.envelope.scoping();
1893 for (group, envelope) in ProcessingGroup::split_envelope(
1894 *message.envelope.into_envelope(),
1895 &message.project_info,
1896 ) {
1897 let mut cogs = self
1898 .inner
1899 .cogs
1900 .timed(ResourceId::Relay, AppFeature::from(group));
1901
1902 let mut envelope =
1903 ManagedEnvelope::new(envelope, self.inner.addrs.outcome_aggregator.clone());
1904 envelope.scope(scoping);
1905
1906 let global_config = self.inner.global_config.current();
1907
1908 let ctx = processing::Context {
1909 config: &self.inner.config,
1910 global_config: &global_config,
1911 project_info: &message.project_info,
1912 sampling_project_info: message.sampling_project_info.as_deref(),
1913 rate_limits: &message.rate_limits,
1914 reservoir_counters: &message.reservoir_counters,
1915 };
1916
1917 let message = ProcessEnvelopeGrouped {
1918 group,
1919 envelope,
1920 ctx,
1921 };
1922
1923 let result = metric!(
1924 timer(RelayTimers::EnvelopeProcessingTime),
1925 group = group.variant(),
1926 { self.process(message).await }
1927 );
1928
1929 match result {
1930 Ok(Some(envelope)) => self.submit_upstream(&mut cogs, envelope),
1931 Ok(None) => {}
1932 Err(error) if error.is_unexpected() => {
1933 relay_log::error!(
1934 tags.project_key = %project_key,
1935 error = &error as &dyn Error,
1936 "error processing envelope"
1937 )
1938 }
1939 Err(error) => {
1940 relay_log::debug!(
1941 tags.project_key = %project_key,
1942 error = &error as &dyn Error,
1943 "error processing envelope"
1944 )
1945 }
1946 }
1947 }
1948 }
1949
1950 fn handle_process_metrics(&self, cogs: &mut Token, message: ProcessMetrics) {
1951 let ProcessMetrics {
1952 data,
1953 project_key,
1954 received_at,
1955 sent_at,
1956 source,
1957 } = message;
1958
1959 let received_timestamp =
1960 UnixTimestamp::from_datetime(received_at).unwrap_or(UnixTimestamp::now());
1961
1962 let mut buckets = data.into_buckets(received_timestamp);
1963 if buckets.is_empty() {
1964 return;
1965 };
1966 cogs.update(relay_metrics::cogs::BySize(&buckets));
1967
1968 let clock_drift_processor =
1969 ClockDriftProcessor::new(sent_at, received_at).at_least(MINIMUM_CLOCK_DRIFT);
1970
1971 buckets.retain_mut(|bucket| {
1972 if let Err(error) = relay_metrics::normalize_bucket(bucket) {
1973 relay_log::debug!(error = &error as &dyn Error, "dropping bucket {bucket:?}");
1974 return false;
1975 }
1976
1977 if !self::metrics::is_valid_namespace(bucket) {
1978 return false;
1979 }
1980
1981 clock_drift_processor.process_timestamp(&mut bucket.timestamp);
1982
1983 if !matches!(source, BucketSource::Internal) {
1984 bucket.metadata = BucketMetadata::new(received_timestamp);
1985 }
1986
1987 true
1988 });
1989
1990 let project = self.inner.project_cache.get(project_key);
1991
1992 let buckets = match project.state() {
1995 ProjectState::Enabled(project_info) => {
1996 let rate_limits = project.rate_limits().current_limits();
1997 self.check_buckets(project_key, project_info, &rate_limits, buckets)
1998 }
1999 _ => buckets,
2000 };
2001
2002 relay_log::trace!("merging metric buckets into the aggregator");
2003 self.inner
2004 .addrs
2005 .aggregator
2006 .send(MergeBuckets::new(project_key, buckets));
2007 }
2008
2009 fn handle_process_batched_metrics(&self, cogs: &mut Token, message: ProcessBatchedMetrics) {
2010 let ProcessBatchedMetrics {
2011 payload,
2012 source,
2013 received_at,
2014 sent_at,
2015 } = message;
2016
2017 #[derive(serde::Deserialize)]
2018 struct Wrapper {
2019 buckets: HashMap<ProjectKey, Vec<Bucket>>,
2020 }
2021
2022 let buckets = match serde_json::from_slice(&payload) {
2023 Ok(Wrapper { buckets }) => buckets,
2024 Err(error) => {
2025 relay_log::debug!(
2026 error = &error as &dyn Error,
2027 "failed to parse batched metrics",
2028 );
2029 metric!(counter(RelayCounters::MetricBucketsParsingFailed) += 1);
2030 return;
2031 }
2032 };
2033
2034 for (project_key, buckets) in buckets {
2035 self.handle_process_metrics(
2036 cogs,
2037 ProcessMetrics {
2038 data: MetricData::Parsed(buckets),
2039 project_key,
2040 source,
2041 received_at,
2042 sent_at,
2043 },
2044 )
2045 }
2046 }
2047
2048 fn submit_upstream(&self, cogs: &mut Token, submit: Submit<'_>) {
2049 let _submit = cogs.start_category("submit");
2050
2051 #[cfg(feature = "processing")]
2052 if self.inner.config.processing_enabled()
2053 && let Some(store_forwarder) = &self.inner.addrs.store_forwarder
2054 {
2055 use crate::processing::StoreHandle;
2056
2057 let upload = self.inner.addrs.upload.as_ref();
2058 match submit {
2059 Submit::Envelope(envelope) => {
2060 let envelope_has_attachments = envelope
2061 .envelope()
2062 .items()
2063 .any(|item| *item.ty() == ItemType::Attachment);
2064 let use_objectstore = || {
2066 let options = &self.inner.global_config.current().options;
2067 utils::sample(options.objectstore_attachments_sample_rate).is_keep()
2068 };
2069
2070 if let Some(upload) = &self.inner.addrs.upload
2071 && envelope_has_attachments
2072 && use_objectstore()
2073 {
2074 upload.send(StoreEnvelope { envelope })
2076 } else {
2077 store_forwarder.send(StoreEnvelope { envelope })
2078 }
2079 }
2080 Submit::Output { output, ctx } => output
2081 .forward_store(StoreHandle::new(store_forwarder, upload), ctx)
2082 .unwrap_or_else(|err| err.into_inner()),
2083 }
2084 return;
2085 }
2086
2087 let mut envelope = match submit {
2088 Submit::Envelope(envelope) => envelope,
2089 Submit::Output { output, ctx } => match output.serialize_envelope(ctx) {
2090 Ok(envelope) => ManagedEnvelope::from(envelope).into_processed(),
2091 Err(_) => {
2092 relay_log::error!("failed to serialize output to an envelope");
2093 return;
2094 }
2095 },
2096 };
2097
2098 if envelope.envelope_mut().is_empty() {
2099 envelope.accept();
2100 return;
2101 }
2102
2103 envelope.envelope_mut().set_sent_at(Utc::now());
2109
2110 relay_log::trace!("sending envelope to sentry endpoint");
2111 let http_encoding = self.inner.config.http_encoding();
2112 let result = envelope.envelope().to_vec().and_then(|v| {
2113 encode_payload(&v.into(), http_encoding).map_err(EnvelopeError::PayloadIoFailed)
2114 });
2115
2116 match result {
2117 Ok(body) => {
2118 self.inner
2119 .addrs
2120 .upstream_relay
2121 .send(SendRequest(SendEnvelope {
2122 envelope,
2123 body,
2124 http_encoding,
2125 project_cache: self.inner.project_cache.clone(),
2126 }));
2127 }
2128 Err(error) => {
2129 relay_log::error!(
2132 error = &error as &dyn Error,
2133 tags.project_key = %envelope.scoping().project_key,
2134 "failed to serialize envelope payload"
2135 );
2136
2137 envelope.reject(Outcome::Invalid(DiscardReason::Internal));
2138 }
2139 }
2140 }
2141
2142 fn handle_submit_client_reports(&self, cogs: &mut Token, message: SubmitClientReports) {
2143 let SubmitClientReports {
2144 client_reports,
2145 scoping,
2146 } = message;
2147
2148 let upstream = self.inner.config.upstream_descriptor();
2149 let dsn = PartialDsn::outbound(&scoping, upstream);
2150
2151 let mut envelope = Envelope::from_request(None, RequestMeta::outbound(dsn));
2152 for client_report in client_reports {
2153 match client_report.serialize() {
2154 Ok(payload) => {
2155 let mut item = Item::new(ItemType::ClientReport);
2156 item.set_payload(ContentType::Json, payload);
2157 envelope.add_item(item);
2158 }
2159 Err(error) => {
2160 relay_log::error!(
2161 error = &error as &dyn std::error::Error,
2162 "failed to serialize client report"
2163 );
2164 }
2165 }
2166 }
2167
2168 let envelope = ManagedEnvelope::new(envelope, self.inner.addrs.outcome_aggregator.clone());
2169 self.submit_upstream(cogs, Submit::Envelope(envelope.into_processed()));
2170 }
2171
2172 fn check_buckets(
2173 &self,
2174 project_key: ProjectKey,
2175 project_info: &ProjectInfo,
2176 rate_limits: &RateLimits,
2177 buckets: Vec<Bucket>,
2178 ) -> Vec<Bucket> {
2179 let Some(scoping) = project_info.scoping(project_key) else {
2180 relay_log::error!(
2181 tags.project_key = project_key.as_str(),
2182 "there is no scoping: dropping {} buckets",
2183 buckets.len(),
2184 );
2185 return Vec::new();
2186 };
2187
2188 let mut buckets = self::metrics::apply_project_info(
2189 buckets,
2190 &self.inner.metric_outcomes,
2191 project_info,
2192 scoping,
2193 );
2194
2195 let namespaces: BTreeSet<MetricNamespace> = buckets
2196 .iter()
2197 .filter_map(|bucket| bucket.name.try_namespace())
2198 .collect();
2199
2200 for namespace in namespaces {
2201 let limits = rate_limits.check_with_quotas(
2202 project_info.get_quotas(),
2203 scoping.item(DataCategory::MetricBucket),
2204 );
2205
2206 if limits.is_limited() {
2207 let rejected;
2208 (buckets, rejected) = utils::split_off(buckets, |bucket| {
2209 bucket.name.try_namespace() == Some(namespace)
2210 });
2211
2212 let reason_code = limits.longest().and_then(|limit| limit.reason_code.clone());
2213 self.inner.metric_outcomes.track(
2214 scoping,
2215 &rejected,
2216 Outcome::RateLimited(reason_code),
2217 );
2218 }
2219 }
2220
2221 let quotas = project_info.config.quotas.clone();
2222 match MetricsLimiter::create(buckets, quotas, scoping) {
2223 Ok(mut bucket_limiter) => {
2224 bucket_limiter.enforce_limits(rate_limits, &self.inner.metric_outcomes);
2225 bucket_limiter.into_buckets()
2226 }
2227 Err(buckets) => buckets,
2228 }
2229 }
2230
2231 #[cfg(feature = "processing")]
2232 async fn rate_limit_buckets(
2233 &self,
2234 scoping: Scoping,
2235 project_info: &ProjectInfo,
2236 mut buckets: Vec<Bucket>,
2237 ) -> Vec<Bucket> {
2238 let Some(rate_limiter) = &self.inner.rate_limiter else {
2239 return buckets;
2240 };
2241
2242 let global_config = self.inner.global_config.current();
2243 let namespaces = buckets
2244 .iter()
2245 .filter_map(|bucket| bucket.name.try_namespace())
2246 .counts();
2247
2248 let quotas = CombinedQuotas::new(&global_config, project_info.get_quotas());
2249
2250 for (namespace, quantity) in namespaces {
2251 let item_scoping = scoping.metric_bucket(namespace);
2252
2253 let limits = match rate_limiter
2254 .is_rate_limited(quotas, item_scoping, quantity, false)
2255 .await
2256 {
2257 Ok(limits) => limits,
2258 Err(err) => {
2259 relay_log::error!(
2260 error = &err as &dyn std::error::Error,
2261 "failed to check redis rate limits"
2262 );
2263 break;
2264 }
2265 };
2266
2267 if limits.is_limited() {
2268 let rejected;
2269 (buckets, rejected) = utils::split_off(buckets, |bucket| {
2270 bucket.name.try_namespace() == Some(namespace)
2271 });
2272
2273 let reason_code = limits.longest().and_then(|limit| limit.reason_code.clone());
2274 self.inner.metric_outcomes.track(
2275 scoping,
2276 &rejected,
2277 Outcome::RateLimited(reason_code),
2278 );
2279
2280 self.inner
2281 .project_cache
2282 .get(item_scoping.scoping.project_key)
2283 .rate_limits()
2284 .merge(limits);
2285 }
2286 }
2287
2288 match MetricsLimiter::create(buckets, project_info.config.quotas.clone(), scoping) {
2289 Err(buckets) => buckets,
2290 Ok(bucket_limiter) => self.apply_other_rate_limits(bucket_limiter).await,
2291 }
2292 }
2293
2294 #[cfg(feature = "processing")]
2296 async fn apply_other_rate_limits(&self, mut bucket_limiter: MetricsLimiter) -> Vec<Bucket> {
2297 relay_log::trace!("handle_rate_limit_buckets");
2298
2299 let scoping = *bucket_limiter.scoping();
2300
2301 if let Some(rate_limiter) = self.inner.rate_limiter.as_ref() {
2302 let global_config = self.inner.global_config.current();
2303 let quotas = CombinedQuotas::new(&global_config, bucket_limiter.quotas());
2304
2305 let over_accept_once = true;
2308 let mut rate_limits = RateLimits::new();
2309
2310 for category in [DataCategory::Transaction, DataCategory::Span] {
2311 let count = bucket_limiter.count(category);
2312
2313 let timer = Instant::now();
2314 let mut is_limited = false;
2315
2316 if let Some(count) = count {
2317 match rate_limiter
2318 .is_rate_limited(quotas, scoping.item(category), count, over_accept_once)
2319 .await
2320 {
2321 Ok(limits) => {
2322 is_limited = limits.is_limited();
2323 rate_limits.merge(limits)
2324 }
2325 Err(e) => relay_log::error!(error = &e as &dyn Error),
2326 }
2327 }
2328
2329 relay_statsd::metric!(
2330 timer(RelayTimers::RateLimitBucketsDuration) = timer.elapsed(),
2331 category = category.name(),
2332 limited = if is_limited { "true" } else { "false" },
2333 count = match count {
2334 None => "none",
2335 Some(0) => "0",
2336 Some(1) => "1",
2337 Some(1..=10) => "10",
2338 Some(1..=25) => "25",
2339 Some(1..=50) => "50",
2340 Some(51..=100) => "100",
2341 Some(101..=500) => "500",
2342 _ => "> 500",
2343 },
2344 );
2345 }
2346
2347 if rate_limits.is_limited() {
2348 let was_enforced =
2349 bucket_limiter.enforce_limits(&rate_limits, &self.inner.metric_outcomes);
2350
2351 if was_enforced {
2352 self.inner
2354 .project_cache
2355 .get(scoping.project_key)
2356 .rate_limits()
2357 .merge(rate_limits);
2358 }
2359 }
2360 }
2361
2362 bucket_limiter.into_buckets()
2363 }
2364
2365 #[cfg(feature = "processing")]
2367 async fn cardinality_limit_buckets(
2368 &self,
2369 scoping: Scoping,
2370 limits: &[CardinalityLimit],
2371 buckets: Vec<Bucket>,
2372 ) -> Vec<Bucket> {
2373 let global_config = self.inner.global_config.current();
2374 let cardinality_limiter_mode = global_config.options.cardinality_limiter_mode;
2375
2376 if matches!(cardinality_limiter_mode, CardinalityLimiterMode::Disabled) {
2377 return buckets;
2378 }
2379
2380 let Some(ref limiter) = self.inner.cardinality_limiter else {
2381 return buckets;
2382 };
2383
2384 let scope = relay_cardinality::Scoping {
2385 organization_id: scoping.organization_id,
2386 project_id: scoping.project_id,
2387 };
2388
2389 let limits = match limiter
2390 .check_cardinality_limits(scope, limits, buckets)
2391 .await
2392 {
2393 Ok(limits) => limits,
2394 Err((buckets, error)) => {
2395 relay_log::error!(
2396 error = &error as &dyn std::error::Error,
2397 "cardinality limiter failed"
2398 );
2399 return buckets;
2400 }
2401 };
2402
2403 let error_sample_rate = global_config.options.cardinality_limiter_error_sample_rate;
2404 if !limits.exceeded_limits().is_empty() && utils::sample(error_sample_rate).is_keep() {
2405 for limit in limits.exceeded_limits() {
2406 relay_log::with_scope(
2407 |scope| {
2408 scope.set_user(Some(relay_log::sentry::User {
2410 id: Some(scoping.organization_id.to_string()),
2411 ..Default::default()
2412 }));
2413 },
2414 || {
2415 relay_log::error!(
2416 tags.organization_id = scoping.organization_id.value(),
2417 tags.limit_id = limit.id,
2418 tags.passive = limit.passive,
2419 "Cardinality Limit"
2420 );
2421 },
2422 );
2423 }
2424 }
2425
2426 for (limit, reports) in limits.cardinality_reports() {
2427 for report in reports {
2428 self.inner
2429 .metric_outcomes
2430 .cardinality(scoping, limit, report);
2431 }
2432 }
2433
2434 if matches!(cardinality_limiter_mode, CardinalityLimiterMode::Passive) {
2435 return limits.into_source();
2436 }
2437
2438 let CardinalityLimitsSplit { accepted, rejected } = limits.into_split();
2439
2440 for (bucket, exceeded) in rejected {
2441 self.inner.metric_outcomes.track(
2442 scoping,
2443 &[bucket],
2444 Outcome::CardinalityLimited(exceeded.id.clone()),
2445 );
2446 }
2447 accepted
2448 }
2449
2450 #[cfg(feature = "processing")]
2457 async fn encode_metrics_processing(
2458 &self,
2459 message: FlushBuckets,
2460 store_forwarder: &Addr<Store>,
2461 ) {
2462 use crate::constants::DEFAULT_EVENT_RETENTION;
2463 use crate::services::store::StoreMetrics;
2464
2465 for ProjectBuckets {
2466 buckets,
2467 scoping,
2468 project_info,
2469 ..
2470 } in message.buckets.into_values()
2471 {
2472 let buckets = self
2473 .rate_limit_buckets(scoping, &project_info, buckets)
2474 .await;
2475
2476 let limits = project_info.get_cardinality_limits();
2477 let buckets = self
2478 .cardinality_limit_buckets(scoping, limits, buckets)
2479 .await;
2480
2481 if buckets.is_empty() {
2482 continue;
2483 }
2484
2485 let retention = project_info
2486 .config
2487 .event_retention
2488 .unwrap_or(DEFAULT_EVENT_RETENTION);
2489
2490 store_forwarder.send(StoreMetrics {
2493 buckets,
2494 scoping,
2495 retention,
2496 });
2497 }
2498 }
2499
2500 fn encode_metrics_envelope(&self, cogs: &mut Token, message: FlushBuckets) {
2512 let FlushBuckets {
2513 partition_key,
2514 buckets,
2515 } = message;
2516
2517 let batch_size = self.inner.config.metrics_max_batch_size_bytes();
2518 let upstream = self.inner.config.upstream_descriptor();
2519
2520 for ProjectBuckets {
2521 buckets, scoping, ..
2522 } in buckets.values()
2523 {
2524 let dsn = PartialDsn::outbound(scoping, upstream);
2525
2526 relay_statsd::metric!(
2527 distribution(RelayDistributions::PartitionKeys) = u64::from(partition_key)
2528 );
2529
2530 let mut num_batches = 0;
2531 for batch in BucketsView::from(buckets).by_size(batch_size) {
2532 let mut envelope = Envelope::from_request(None, RequestMeta::outbound(dsn.clone()));
2533
2534 let mut item = Item::new(ItemType::MetricBuckets);
2535 item.set_source_quantities(crate::metrics::extract_quantities(batch));
2536 item.set_payload(ContentType::Json, serde_json::to_vec(&buckets).unwrap());
2537 envelope.add_item(item);
2538
2539 let mut envelope =
2540 ManagedEnvelope::new(envelope, self.inner.addrs.outcome_aggregator.clone());
2541 envelope
2542 .set_partition_key(Some(partition_key))
2543 .scope(*scoping);
2544
2545 relay_statsd::metric!(
2546 distribution(RelayDistributions::BucketsPerBatch) = batch.len() as u64
2547 );
2548
2549 self.submit_upstream(cogs, Submit::Envelope(envelope.into_processed()));
2550 num_batches += 1;
2551 }
2552
2553 relay_statsd::metric!(
2554 distribution(RelayDistributions::BatchesPerPartition) = num_batches
2555 );
2556 }
2557 }
2558
2559 fn send_global_partition(&self, partition_key: u32, partition: &mut Partition<'_>) {
2561 if partition.is_empty() {
2562 return;
2563 }
2564
2565 let (unencoded, project_info) = partition.take();
2566 let http_encoding = self.inner.config.http_encoding();
2567 let encoded = match encode_payload(&unencoded, http_encoding) {
2568 Ok(payload) => payload,
2569 Err(error) => {
2570 let error = &error as &dyn std::error::Error;
2571 relay_log::error!(error, "failed to encode metrics payload");
2572 return;
2573 }
2574 };
2575
2576 let request = SendMetricsRequest {
2577 partition_key: partition_key.to_string(),
2578 unencoded,
2579 encoded,
2580 project_info,
2581 http_encoding,
2582 metric_outcomes: self.inner.metric_outcomes.clone(),
2583 };
2584
2585 self.inner.addrs.upstream_relay.send(SendRequest(request));
2586 }
2587
2588 fn encode_metrics_global(&self, message: FlushBuckets) {
2603 let FlushBuckets {
2604 partition_key,
2605 buckets,
2606 } = message;
2607
2608 let batch_size = self.inner.config.metrics_max_batch_size_bytes();
2609 let mut partition = Partition::new(batch_size);
2610 let mut partition_splits = 0;
2611
2612 for ProjectBuckets {
2613 buckets, scoping, ..
2614 } in buckets.values()
2615 {
2616 for bucket in buckets {
2617 let mut remaining = Some(BucketView::new(bucket));
2618
2619 while let Some(bucket) = remaining.take() {
2620 if let Some(next) = partition.insert(bucket, *scoping) {
2621 self.send_global_partition(partition_key, &mut partition);
2625 remaining = Some(next);
2626 partition_splits += 1;
2627 }
2628 }
2629 }
2630 }
2631
2632 if partition_splits > 0 {
2633 metric!(distribution(RelayDistributions::PartitionSplits) = partition_splits);
2634 }
2635
2636 self.send_global_partition(partition_key, &mut partition);
2637 }
2638
2639 async fn handle_flush_buckets(&self, cogs: &mut Token, mut message: FlushBuckets) {
2640 for (project_key, pb) in message.buckets.iter_mut() {
2641 let buckets = std::mem::take(&mut pb.buckets);
2642 pb.buckets =
2643 self.check_buckets(*project_key, &pb.project_info, &pb.rate_limits, buckets);
2644 }
2645
2646 #[cfg(feature = "processing")]
2647 if self.inner.config.processing_enabled()
2648 && let Some(ref store_forwarder) = self.inner.addrs.store_forwarder
2649 {
2650 return self
2651 .encode_metrics_processing(message, store_forwarder)
2652 .await;
2653 }
2654
2655 if self.inner.config.http_global_metrics() {
2656 self.encode_metrics_global(message)
2657 } else {
2658 self.encode_metrics_envelope(cogs, message)
2659 }
2660 }
2661
2662 #[cfg(all(test, feature = "processing"))]
2663 fn redis_rate_limiter_enabled(&self) -> bool {
2664 self.inner.rate_limiter.is_some()
2665 }
2666
2667 async fn handle_message(&self, message: EnvelopeProcessor) {
2668 let ty = message.variant();
2669 let feature_weights = self.feature_weights(&message);
2670
2671 metric!(timer(RelayTimers::ProcessMessageDuration), message = ty, {
2672 let mut cogs = self.inner.cogs.timed(ResourceId::Relay, feature_weights);
2673
2674 match message {
2675 EnvelopeProcessor::ProcessEnvelope(m) => {
2676 self.handle_process_envelope(&mut cogs, *m).await
2677 }
2678 EnvelopeProcessor::ProcessProjectMetrics(m) => {
2679 self.handle_process_metrics(&mut cogs, *m)
2680 }
2681 EnvelopeProcessor::ProcessBatchedMetrics(m) => {
2682 self.handle_process_batched_metrics(&mut cogs, *m)
2683 }
2684 EnvelopeProcessor::FlushBuckets(m) => {
2685 self.handle_flush_buckets(&mut cogs, *m).await
2686 }
2687 EnvelopeProcessor::SubmitClientReports(m) => {
2688 self.handle_submit_client_reports(&mut cogs, *m)
2689 }
2690 }
2691 });
2692 }
2693
2694 fn feature_weights(&self, message: &EnvelopeProcessor) -> FeatureWeights {
2695 match message {
2696 EnvelopeProcessor::ProcessEnvelope(_) => AppFeature::Unattributed.into(),
2698 EnvelopeProcessor::ProcessProjectMetrics(_) => AppFeature::Unattributed.into(),
2699 EnvelopeProcessor::ProcessBatchedMetrics(_) => AppFeature::Unattributed.into(),
2700 EnvelopeProcessor::FlushBuckets(v) => v
2701 .buckets
2702 .values()
2703 .map(|s| {
2704 if self.inner.config.processing_enabled() {
2705 relay_metrics::cogs::ByCount(&s.buckets).into()
2708 } else {
2709 relay_metrics::cogs::BySize(&s.buckets).into()
2710 }
2711 })
2712 .fold(FeatureWeights::none(), FeatureWeights::merge),
2713 EnvelopeProcessor::SubmitClientReports(_) => AppFeature::ClientReports.into(),
2714 }
2715 }
2716}
2717
2718impl Service for EnvelopeProcessorService {
2719 type Interface = EnvelopeProcessor;
2720
2721 async fn run(self, mut rx: relay_system::Receiver<Self::Interface>) {
2722 while let Some(message) = rx.recv().await {
2723 let service = self.clone();
2724 self.inner
2725 .pool
2726 .spawn_async(
2727 async move {
2728 service.handle_message(message).await;
2729 }
2730 .boxed(),
2731 )
2732 .await;
2733 }
2734 }
2735}
2736
2737struct EnforcementResult {
2742 event: Annotated<Event>,
2743 #[cfg_attr(not(feature = "processing"), expect(dead_code))]
2744 rate_limits: RateLimits,
2745}
2746
2747impl EnforcementResult {
2748 pub fn new(event: Annotated<Event>, rate_limits: RateLimits) -> Self {
2750 Self { event, rate_limits }
2751 }
2752}
2753
2754#[derive(Clone)]
2755enum RateLimiter {
2756 Cached,
2757 #[cfg(feature = "processing")]
2758 Consistent(Arc<RedisRateLimiter>),
2759}
2760
2761impl RateLimiter {
2762 async fn enforce<Group>(
2763 &self,
2764 managed_envelope: &mut TypedEnvelope<Group>,
2765 event: Annotated<Event>,
2766 _extracted_metrics: &mut ProcessingExtractedMetrics,
2767 ctx: processing::Context<'_>,
2768 ) -> Result<EnforcementResult, ProcessingError> {
2769 if managed_envelope.envelope().is_empty() && event.value().is_none() {
2770 return Ok(EnforcementResult::new(event, RateLimits::default()));
2771 }
2772
2773 let quotas = CombinedQuotas::new(ctx.global_config, ctx.project_info.get_quotas());
2774 if quotas.is_empty() {
2775 return Ok(EnforcementResult::new(event, RateLimits::default()));
2776 }
2777
2778 let event_category = event_category(&event);
2779
2780 let this = self.clone();
2786 let mut envelope_limiter =
2787 EnvelopeLimiter::new(CheckLimits::All, move |item_scope, _quantity| {
2788 let this = this.clone();
2789
2790 async move {
2791 match this {
2792 #[cfg(feature = "processing")]
2793 RateLimiter::Consistent(rate_limiter) => Ok::<_, ProcessingError>(
2794 rate_limiter
2795 .is_rate_limited(quotas, item_scope, _quantity, false)
2796 .await?,
2797 ),
2798 _ => Ok::<_, ProcessingError>(
2799 ctx.rate_limits.check_with_quotas(quotas, item_scope),
2800 ),
2801 }
2802 }
2803 });
2804
2805 if let Some(category) = event_category {
2808 envelope_limiter.assume_event(category);
2809 }
2810
2811 let scoping = managed_envelope.scoping();
2812 let (enforcement, rate_limits) = metric!(timer(RelayTimers::EventProcessingRateLimiting), type = self.name(), {
2813 envelope_limiter
2814 .compute(managed_envelope.envelope_mut(), &scoping)
2815 .await
2816 })?;
2817 let event_active = enforcement.is_event_active();
2818
2819 #[cfg(feature = "processing")]
2823 _extracted_metrics.apply_enforcement(&enforcement, matches!(self, Self::Consistent(_)));
2824 enforcement.apply_with_outcomes(managed_envelope);
2825
2826 if event_active {
2827 debug_assert!(managed_envelope.envelope().is_empty());
2828 return Ok(EnforcementResult::new(Annotated::empty(), rate_limits));
2829 }
2830
2831 Ok(EnforcementResult::new(event, rate_limits))
2832 }
2833
2834 fn name(&self) -> &'static str {
2835 match self {
2836 Self::Cached => "cached",
2837 #[cfg(feature = "processing")]
2838 Self::Consistent(_) => "consistent",
2839 }
2840 }
2841}
2842
2843pub fn encode_payload(body: &Bytes, http_encoding: HttpEncoding) -> Result<Bytes, std::io::Error> {
2844 let envelope_body: Vec<u8> = match http_encoding {
2845 HttpEncoding::Identity => return Ok(body.clone()),
2846 HttpEncoding::Deflate => {
2847 let mut encoder = ZlibEncoder::new(Vec::new(), Compression::default());
2848 encoder.write_all(body.as_ref())?;
2849 encoder.finish()?
2850 }
2851 HttpEncoding::Gzip => {
2852 let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
2853 encoder.write_all(body.as_ref())?;
2854 encoder.finish()?
2855 }
2856 HttpEncoding::Br => {
2857 let mut encoder = BrotliEncoder::new(Vec::new(), 0, 5, 22);
2859 encoder.write_all(body.as_ref())?;
2860 encoder.into_inner()
2861 }
2862 HttpEncoding::Zstd => {
2863 let mut encoder = ZstdEncoder::new(Vec::new(), 1)?;
2866 encoder.write_all(body.as_ref())?;
2867 encoder.finish()?
2868 }
2869 };
2870
2871 Ok(envelope_body.into())
2872}
2873
2874#[derive(Debug)]
2876pub struct SendEnvelope {
2877 pub envelope: TypedEnvelope<Processed>,
2878 pub body: Bytes,
2879 pub http_encoding: HttpEncoding,
2880 pub project_cache: ProjectCacheHandle,
2881}
2882
2883impl UpstreamRequest for SendEnvelope {
2884 fn method(&self) -> reqwest::Method {
2885 reqwest::Method::POST
2886 }
2887
2888 fn path(&self) -> Cow<'_, str> {
2889 format!("/api/{}/envelope/", self.envelope.scoping().project_id).into()
2890 }
2891
2892 fn route(&self) -> &'static str {
2893 "envelope"
2894 }
2895
2896 fn build(&mut self, builder: &mut http::RequestBuilder) -> Result<(), http::HttpError> {
2897 let envelope_body = self.body.clone();
2898 metric!(
2899 distribution(RelayDistributions::UpstreamEnvelopeBodySize) = envelope_body.len() as u64
2900 );
2901
2902 let meta = &self.envelope.meta();
2903 let shard = self.envelope.partition_key().map(|p| p.to_string());
2904 builder
2905 .content_encoding(self.http_encoding)
2906 .header_opt("Origin", meta.origin().map(|url| url.as_str()))
2907 .header_opt("User-Agent", meta.user_agent())
2908 .header("X-Sentry-Auth", meta.auth_header())
2909 .header("X-Forwarded-For", meta.forwarded_for())
2910 .header("Content-Type", envelope::CONTENT_TYPE)
2911 .header_opt("X-Sentry-Relay-Shard", shard)
2912 .body(envelope_body);
2913
2914 Ok(())
2915 }
2916
2917 fn sign(&mut self) -> Option<Sign> {
2918 Some(Sign::Optional(SignatureType::RequestSign))
2919 }
2920
2921 fn respond(
2922 self: Box<Self>,
2923 result: Result<http::Response, UpstreamRequestError>,
2924 ) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> {
2925 Box::pin(async move {
2926 let result = match result {
2927 Ok(mut response) => response.consume().await.map_err(UpstreamRequestError::Http),
2928 Err(error) => Err(error),
2929 };
2930
2931 match result {
2932 Ok(()) => self.envelope.accept(),
2933 Err(error) if error.is_received() => {
2934 let scoping = self.envelope.scoping();
2935 self.envelope.accept();
2936
2937 if let UpstreamRequestError::RateLimited(limits) = error {
2938 self.project_cache
2939 .get(scoping.project_key)
2940 .rate_limits()
2941 .merge(limits.scope(&scoping));
2942 }
2943 }
2944 Err(error) => {
2945 let mut envelope = self.envelope;
2948 envelope.reject(Outcome::Invalid(DiscardReason::Internal));
2949 relay_log::error!(
2950 error = &error as &dyn Error,
2951 tags.project_key = %envelope.scoping().project_key,
2952 "error sending envelope"
2953 );
2954 }
2955 }
2956 })
2957 }
2958}
2959
2960#[derive(Debug)]
2967struct Partition<'a> {
2968 max_size: usize,
2969 remaining: usize,
2970 views: HashMap<ProjectKey, Vec<BucketView<'a>>>,
2971 project_info: HashMap<ProjectKey, Scoping>,
2972}
2973
2974impl<'a> Partition<'a> {
2975 pub fn new(size: usize) -> Self {
2977 Self {
2978 max_size: size,
2979 remaining: size,
2980 views: HashMap::new(),
2981 project_info: HashMap::new(),
2982 }
2983 }
2984
2985 pub fn insert(&mut self, bucket: BucketView<'a>, scoping: Scoping) -> Option<BucketView<'a>> {
2996 let (current, next) = bucket.split(self.remaining, Some(self.max_size));
2997
2998 if let Some(current) = current {
2999 self.remaining = self.remaining.saturating_sub(current.estimated_size());
3000 self.views
3001 .entry(scoping.project_key)
3002 .or_default()
3003 .push(current);
3004
3005 self.project_info
3006 .entry(scoping.project_key)
3007 .or_insert(scoping);
3008 }
3009
3010 next
3011 }
3012
3013 fn is_empty(&self) -> bool {
3015 self.views.is_empty()
3016 }
3017
3018 fn take(&mut self) -> (Bytes, HashMap<ProjectKey, Scoping>) {
3022 #[derive(serde::Serialize)]
3023 struct Wrapper<'a> {
3024 buckets: &'a HashMap<ProjectKey, Vec<BucketView<'a>>>,
3025 }
3026
3027 let buckets = &self.views;
3028 let payload = serde_json::to_vec(&Wrapper { buckets }).unwrap().into();
3029
3030 let scopings = self.project_info.clone();
3031 self.project_info.clear();
3032
3033 self.views.clear();
3034 self.remaining = self.max_size;
3035
3036 (payload, scopings)
3037 }
3038}
3039
3040#[derive(Debug)]
3044struct SendMetricsRequest {
3045 partition_key: String,
3047 unencoded: Bytes,
3049 encoded: Bytes,
3051 project_info: HashMap<ProjectKey, Scoping>,
3055 http_encoding: HttpEncoding,
3057 metric_outcomes: MetricOutcomes,
3059}
3060
3061impl SendMetricsRequest {
3062 fn create_error_outcomes(self) {
3063 #[derive(serde::Deserialize)]
3064 struct Wrapper {
3065 buckets: HashMap<ProjectKey, Vec<MinimalTrackableBucket>>,
3066 }
3067
3068 let buckets = match serde_json::from_slice(&self.unencoded) {
3069 Ok(Wrapper { buckets }) => buckets,
3070 Err(err) => {
3071 relay_log::error!(
3072 error = &err as &dyn std::error::Error,
3073 "failed to parse buckets from failed transmission"
3074 );
3075 return;
3076 }
3077 };
3078
3079 for (key, buckets) in buckets {
3080 let Some(&scoping) = self.project_info.get(&key) else {
3081 relay_log::error!("missing scoping for project key");
3082 continue;
3083 };
3084
3085 self.metric_outcomes.track(
3086 scoping,
3087 &buckets,
3088 Outcome::Invalid(DiscardReason::Internal),
3089 );
3090 }
3091 }
3092}
3093
3094impl UpstreamRequest for SendMetricsRequest {
3095 fn set_relay_id(&self) -> bool {
3096 true
3097 }
3098
3099 fn sign(&mut self) -> Option<Sign> {
3100 Some(Sign::Required(SignatureType::Body(self.unencoded.clone())))
3101 }
3102
3103 fn method(&self) -> reqwest::Method {
3104 reqwest::Method::POST
3105 }
3106
3107 fn path(&self) -> Cow<'_, str> {
3108 "/api/0/relays/metrics/".into()
3109 }
3110
3111 fn route(&self) -> &'static str {
3112 "global_metrics"
3113 }
3114
3115 fn build(&mut self, builder: &mut http::RequestBuilder) -> Result<(), http::HttpError> {
3116 metric!(
3117 distribution(RelayDistributions::UpstreamMetricsBodySize) = self.encoded.len() as u64
3118 );
3119
3120 builder
3121 .content_encoding(self.http_encoding)
3122 .header("X-Sentry-Relay-Shard", self.partition_key.as_bytes())
3123 .header(header::CONTENT_TYPE, b"application/json")
3124 .body(self.encoded.clone());
3125
3126 Ok(())
3127 }
3128
3129 fn respond(
3130 self: Box<Self>,
3131 result: Result<http::Response, UpstreamRequestError>,
3132 ) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> {
3133 Box::pin(async {
3134 match result {
3135 Ok(mut response) => {
3136 response.consume().await.ok();
3137 }
3138 Err(error) => {
3139 relay_log::error!(error = &error as &dyn Error, "Failed to send metrics batch");
3140
3141 if error.is_received() {
3144 return;
3145 }
3146
3147 self.create_error_outcomes()
3148 }
3149 }
3150 })
3151 }
3152}
3153
3154#[derive(Copy, Clone, Debug)]
3156struct CombinedQuotas<'a> {
3157 global_quotas: &'a [Quota],
3158 project_quotas: &'a [Quota],
3159}
3160
3161impl<'a> CombinedQuotas<'a> {
3162 pub fn new(global_config: &'a GlobalConfig, project_quotas: &'a [Quota]) -> Self {
3164 Self {
3165 global_quotas: &global_config.quotas,
3166 project_quotas,
3167 }
3168 }
3169
3170 pub fn is_empty(&self) -> bool {
3172 self.len() == 0
3173 }
3174
3175 pub fn len(&self) -> usize {
3177 self.global_quotas.len() + self.project_quotas.len()
3178 }
3179}
3180
3181impl<'a> IntoIterator for CombinedQuotas<'a> {
3182 type Item = &'a Quota;
3183 type IntoIter = std::iter::Chain<std::slice::Iter<'a, Quota>, std::slice::Iter<'a, Quota>>;
3184
3185 fn into_iter(self) -> Self::IntoIter {
3186 self.global_quotas.iter().chain(self.project_quotas.iter())
3187 }
3188}
3189
3190#[cfg(test)]
3191mod tests {
3192 use std::collections::BTreeMap;
3193
3194 use insta::assert_debug_snapshot;
3195 use relay_base_schema::metrics::{DurationUnit, MetricUnit};
3196 use relay_common::glob2::LazyGlob;
3197 use relay_dynamic_config::ProjectConfig;
3198 use relay_event_normalization::{
3199 MeasurementsConfig, NormalizationConfig, RedactionRule, TransactionNameConfig,
3200 TransactionNameRule,
3201 };
3202 use relay_event_schema::protocol::TransactionSource;
3203 use relay_pii::DataScrubbingConfig;
3204 use similar_asserts::assert_eq;
3205
3206 use crate::metrics_extraction::IntoMetric;
3207 use crate::metrics_extraction::transactions::types::{
3208 CommonTags, TransactionMeasurementTags, TransactionMetric,
3209 };
3210 use crate::testutils::{create_test_processor, create_test_processor_with_addrs};
3211
3212 #[cfg(feature = "processing")]
3213 use {
3214 relay_metrics::BucketValue,
3215 relay_quotas::{QuotaScope, ReasonCode},
3216 relay_test::mock_service,
3217 };
3218
3219 use super::*;
3220
3221 #[cfg(feature = "processing")]
3222 fn mock_quota(id: &str) -> Quota {
3223 Quota {
3224 id: Some(id.into()),
3225 categories: [DataCategory::MetricBucket].into(),
3226 scope: QuotaScope::Organization,
3227 scope_id: None,
3228 limit: Some(0),
3229 window: None,
3230 reason_code: None,
3231 namespace: None,
3232 }
3233 }
3234
3235 #[cfg(feature = "processing")]
3236 #[test]
3237 fn test_dynamic_quotas() {
3238 let global_config = GlobalConfig {
3239 quotas: vec![mock_quota("foo"), mock_quota("bar")],
3240 ..Default::default()
3241 };
3242
3243 let project_quotas = vec![mock_quota("baz"), mock_quota("qux")];
3244
3245 let dynamic_quotas = CombinedQuotas::new(&global_config, &project_quotas);
3246
3247 assert_eq!(dynamic_quotas.len(), 4);
3248 assert!(!dynamic_quotas.is_empty());
3249
3250 let quota_ids = dynamic_quotas.into_iter().filter_map(|q| q.id.as_deref());
3251 assert!(quota_ids.eq(["foo", "bar", "baz", "qux"]));
3252 }
3253
3254 #[cfg(feature = "processing")]
3257 #[tokio::test]
3258 async fn test_ratelimit_per_batch() {
3259 use relay_base_schema::organization::OrganizationId;
3260 use relay_protocol::FiniteF64;
3261
3262 let rate_limited_org = Scoping {
3263 organization_id: OrganizationId::new(1),
3264 project_id: ProjectId::new(21),
3265 project_key: ProjectKey::parse("00000000000000000000000000000000").unwrap(),
3266 key_id: Some(17),
3267 };
3268
3269 let not_rate_limited_org = Scoping {
3270 organization_id: OrganizationId::new(2),
3271 project_id: ProjectId::new(21),
3272 project_key: ProjectKey::parse("11111111111111111111111111111111").unwrap(),
3273 key_id: Some(17),
3274 };
3275
3276 let message = {
3277 let project_info = {
3278 let quota = Quota {
3279 id: Some("testing".into()),
3280 categories: [DataCategory::MetricBucket].into(),
3281 scope: relay_quotas::QuotaScope::Organization,
3282 scope_id: Some(rate_limited_org.organization_id.to_string().into()),
3283 limit: Some(0),
3284 window: None,
3285 reason_code: Some(ReasonCode::new("test")),
3286 namespace: None,
3287 };
3288
3289 let mut config = ProjectConfig::default();
3290 config.quotas.push(quota);
3291
3292 Arc::new(ProjectInfo {
3293 config,
3294 ..Default::default()
3295 })
3296 };
3297
3298 let project_metrics = |scoping| ProjectBuckets {
3299 buckets: vec![Bucket {
3300 name: "d:transactions/bar".into(),
3301 value: BucketValue::Counter(FiniteF64::new(1.0).unwrap()),
3302 timestamp: UnixTimestamp::now(),
3303 tags: Default::default(),
3304 width: 10,
3305 metadata: BucketMetadata::default(),
3306 }],
3307 rate_limits: Default::default(),
3308 project_info: project_info.clone(),
3309 scoping,
3310 };
3311
3312 let buckets = hashbrown::HashMap::from([
3313 (
3314 rate_limited_org.project_key,
3315 project_metrics(rate_limited_org),
3316 ),
3317 (
3318 not_rate_limited_org.project_key,
3319 project_metrics(not_rate_limited_org),
3320 ),
3321 ]);
3322
3323 FlushBuckets {
3324 partition_key: 0,
3325 buckets,
3326 }
3327 };
3328
3329 assert_eq!(message.buckets.keys().count(), 2);
3331
3332 let config = {
3333 let config_json = serde_json::json!({
3334 "processing": {
3335 "enabled": true,
3336 "kafka_config": [],
3337 "redis": {
3338 "server": std::env::var("RELAY_REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_owned()),
3339 }
3340 }
3341 });
3342 Config::from_json_value(config_json).unwrap()
3343 };
3344
3345 let (store, handle) = {
3346 let f = |org_ids: &mut Vec<OrganizationId>, msg: Store| {
3347 let org_id = match msg {
3348 Store::Metrics(x) => x.scoping.organization_id,
3349 _ => panic!("received envelope when expecting only metrics"),
3350 };
3351 org_ids.push(org_id);
3352 };
3353
3354 mock_service("store_forwarder", vec![], f)
3355 };
3356
3357 let processor = create_test_processor(config).await;
3358 assert!(processor.redis_rate_limiter_enabled());
3359
3360 processor.encode_metrics_processing(message, &store).await;
3361
3362 drop(store);
3363 let orgs_not_ratelimited = handle.await.unwrap();
3364
3365 assert_eq!(
3366 orgs_not_ratelimited,
3367 vec![not_rate_limited_org.organization_id]
3368 );
3369 }
3370
3371 #[tokio::test]
3372 async fn test_browser_version_extraction_with_pii_like_data() {
3373 let processor = create_test_processor(Default::default()).await;
3374 let outcome_aggregator = Addr::dummy();
3375 let event_id = EventId::new();
3376
3377 let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
3378 .parse()
3379 .unwrap();
3380
3381 let request_meta = RequestMeta::new(dsn);
3382 let mut envelope = Envelope::from_request(Some(event_id), request_meta);
3383
3384 envelope.add_item({
3385 let mut item = Item::new(ItemType::Event);
3386 item.set_payload(
3387 ContentType::Json,
3388 r#"
3389 {
3390 "request": {
3391 "headers": [
3392 ["User-Agent", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36"]
3393 ]
3394 }
3395 }
3396 "#,
3397 );
3398 item
3399 });
3400
3401 let mut datascrubbing_settings = DataScrubbingConfig::default();
3402 datascrubbing_settings.scrub_data = true;
3404 datascrubbing_settings.scrub_defaults = true;
3405 datascrubbing_settings.scrub_ip_addresses = true;
3406
3407 let pii_config = serde_json::from_str(r#"{"applications": {"**": ["@ip:mask"]}}"#).unwrap();
3409
3410 let config = ProjectConfig {
3411 datascrubbing_settings,
3412 pii_config: Some(pii_config),
3413 ..Default::default()
3414 };
3415
3416 let project_info = ProjectInfo {
3417 config,
3418 ..Default::default()
3419 };
3420
3421 let mut envelopes = ProcessingGroup::split_envelope(*envelope, &Default::default());
3422 assert_eq!(envelopes.len(), 1);
3423
3424 let (group, envelope) = envelopes.pop().unwrap();
3425 let envelope = ManagedEnvelope::new(envelope, outcome_aggregator);
3426
3427 let message = ProcessEnvelopeGrouped {
3428 group,
3429 envelope,
3430 ctx: processing::Context {
3431 project_info: &project_info,
3432 ..processing::Context::for_test()
3433 },
3434 };
3435
3436 let Ok(Some(Submit::Envelope(mut new_envelope))) = processor.process(message).await else {
3437 panic!();
3438 };
3439 let new_envelope = new_envelope.envelope_mut();
3440
3441 let event_item = new_envelope.items().last().unwrap();
3442 let annotated_event: Annotated<Event> =
3443 Annotated::from_json_bytes(&event_item.payload()).unwrap();
3444 let event = annotated_event.into_value().unwrap();
3445 let headers = event
3446 .request
3447 .into_value()
3448 .unwrap()
3449 .headers
3450 .into_value()
3451 .unwrap();
3452
3453 assert_eq!(
3455 Some(
3456 "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/********* Safari/537.36"
3457 ),
3458 headers.get_header("User-Agent")
3459 );
3460 let contexts = event.contexts.into_value().unwrap();
3462 let browser = contexts.0.get("browser").unwrap();
3463 assert_eq!(
3464 r#"{"browser":"Chrome 103.0.0","name":"Chrome","version":"103.0.0","type":"browser"}"#,
3465 browser.to_json().unwrap()
3466 );
3467 }
3468
3469 #[tokio::test]
3470 #[cfg(feature = "processing")]
3471 async fn test_materialize_dsc() {
3472 use crate::services::projects::project::PublicKeyConfig;
3473
3474 let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
3475 .parse()
3476 .unwrap();
3477 let request_meta = RequestMeta::new(dsn);
3478 let mut envelope = Envelope::from_request(None, request_meta);
3479
3480 let dsc = r#"{
3481 "trace_id": "00000000-0000-0000-0000-000000000000",
3482 "public_key": "e12d836b15bb49d7bbf99e64295d995b",
3483 "sample_rate": "0.2"
3484 }"#;
3485 envelope.set_dsc(serde_json::from_str(dsc).unwrap());
3486
3487 let mut item = Item::new(ItemType::Event);
3488 item.set_payload(ContentType::Json, r#"{}"#);
3489 envelope.add_item(item);
3490
3491 let outcome_aggregator = Addr::dummy();
3492 let managed_envelope = ManagedEnvelope::new(envelope, outcome_aggregator);
3493
3494 let mut project_info = ProjectInfo::default();
3495 project_info.public_keys.push(PublicKeyConfig {
3496 public_key: ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(),
3497 numeric_id: Some(1),
3498 });
3499
3500 let config = serde_json::json!({
3501 "processing": {
3502 "enabled": true,
3503 "kafka_config": [],
3504 }
3505 });
3506
3507 let message = ProcessEnvelopeGrouped {
3508 group: ProcessingGroup::Error,
3509 envelope: managed_envelope,
3510 ctx: processing::Context {
3511 config: &Config::from_json_value(config.clone()).unwrap(),
3512 project_info: &project_info,
3513 sampling_project_info: Some(&project_info),
3514 ..processing::Context::for_test()
3515 },
3516 };
3517
3518 let processor = create_test_processor(Config::from_json_value(config).unwrap()).await;
3519 let Ok(Some(Submit::Envelope(envelope))) = processor.process(message).await else {
3520 panic!();
3521 };
3522 let event = envelope
3523 .envelope()
3524 .get_item_by(|item| item.ty() == &ItemType::Event)
3525 .unwrap();
3526
3527 let event = Annotated::<Event>::from_json_bytes(&event.payload()).unwrap();
3528 insta::assert_debug_snapshot!(event.value().unwrap()._dsc, @r###"
3529 Object(
3530 {
3531 "environment": ~,
3532 "public_key": String(
3533 "e12d836b15bb49d7bbf99e64295d995b",
3534 ),
3535 "release": ~,
3536 "replay_id": ~,
3537 "sample_rate": String(
3538 "0.2",
3539 ),
3540 "trace_id": String(
3541 "00000000000000000000000000000000",
3542 ),
3543 "transaction": ~,
3544 },
3545 )
3546 "###);
3547 }
3548
3549 fn capture_test_event(transaction_name: &str, source: TransactionSource) -> Vec<String> {
3550 let mut event = Annotated::<Event>::from_json(
3551 r#"
3552 {
3553 "type": "transaction",
3554 "transaction": "/foo/",
3555 "timestamp": 946684810.0,
3556 "start_timestamp": 946684800.0,
3557 "contexts": {
3558 "trace": {
3559 "trace_id": "4c79f60c11214eb38604f4ae0781bfb2",
3560 "span_id": "fa90fdead5f74053",
3561 "op": "http.server",
3562 "type": "trace"
3563 }
3564 },
3565 "transaction_info": {
3566 "source": "url"
3567 }
3568 }
3569 "#,
3570 )
3571 .unwrap();
3572 let e = event.value_mut().as_mut().unwrap();
3573 e.transaction.set_value(Some(transaction_name.into()));
3574
3575 e.transaction_info
3576 .value_mut()
3577 .as_mut()
3578 .unwrap()
3579 .source
3580 .set_value(Some(source));
3581
3582 relay_statsd::with_capturing_test_client(|| {
3583 utils::log_transaction_name_metrics(&mut event, |event| {
3584 let config = NormalizationConfig {
3585 transaction_name_config: TransactionNameConfig {
3586 rules: &[TransactionNameRule {
3587 pattern: LazyGlob::new("/foo/*/**".to_owned()),
3588 expiry: DateTime::<Utc>::MAX_UTC,
3589 redaction: RedactionRule::Replace {
3590 substitution: "*".to_owned(),
3591 },
3592 }],
3593 },
3594 ..Default::default()
3595 };
3596 relay_event_normalization::normalize_event(event, &config)
3597 });
3598 })
3599 }
3600
3601 #[test]
3602 fn test_log_transaction_metrics_none() {
3603 let captures = capture_test_event("/nothing", TransactionSource::Url);
3604 insta::assert_debug_snapshot!(captures, @r###"
3605 [
3606 "event.transaction_name_changes:1|c|#source_in:url,changes:none,source_out:sanitized,is_404:false",
3607 ]
3608 "###);
3609 }
3610
3611 #[test]
3612 fn test_log_transaction_metrics_rule() {
3613 let captures = capture_test_event("/foo/john/denver", TransactionSource::Url);
3614 insta::assert_debug_snapshot!(captures, @r###"
3615 [
3616 "event.transaction_name_changes:1|c|#source_in:url,changes:rule,source_out:sanitized,is_404:false",
3617 ]
3618 "###);
3619 }
3620
3621 #[test]
3622 fn test_log_transaction_metrics_pattern() {
3623 let captures = capture_test_event("/something/12345", TransactionSource::Url);
3624 insta::assert_debug_snapshot!(captures, @r###"
3625 [
3626 "event.transaction_name_changes:1|c|#source_in:url,changes:pattern,source_out:sanitized,is_404:false",
3627 ]
3628 "###);
3629 }
3630
3631 #[test]
3632 fn test_log_transaction_metrics_both() {
3633 let captures = capture_test_event("/foo/john/12345", TransactionSource::Url);
3634 insta::assert_debug_snapshot!(captures, @r###"
3635 [
3636 "event.transaction_name_changes:1|c|#source_in:url,changes:both,source_out:sanitized,is_404:false",
3637 ]
3638 "###);
3639 }
3640
3641 #[test]
3642 fn test_log_transaction_metrics_no_match() {
3643 let captures = capture_test_event("/foo/john/12345", TransactionSource::Route);
3644 insta::assert_debug_snapshot!(captures, @r###"
3645 [
3646 "event.transaction_name_changes:1|c|#source_in:route,changes:none,source_out:route,is_404:false",
3647 ]
3648 "###);
3649 }
3650
3651 #[test]
3655 fn test_mri_overhead_constant() {
3656 let hardcoded_value = MeasurementsConfig::MEASUREMENT_MRI_OVERHEAD;
3657
3658 let derived_value = {
3659 let name = "foobar".to_owned();
3660 let value = 5.into(); let unit = MetricUnit::Duration(DurationUnit::default());
3662 let tags = TransactionMeasurementTags {
3663 measurement_rating: None,
3664 universal_tags: CommonTags(BTreeMap::new()),
3665 score_profile_version: None,
3666 };
3667
3668 let measurement = TransactionMetric::Measurement {
3669 name: name.clone(),
3670 value,
3671 unit,
3672 tags,
3673 };
3674
3675 let metric: Bucket = measurement.into_metric(UnixTimestamp::now());
3676 metric.name.len() - unit.to_string().len() - name.len()
3677 };
3678 assert_eq!(
3679 hardcoded_value, derived_value,
3680 "Update `MEASUREMENT_MRI_OVERHEAD` if the naming scheme changed."
3681 );
3682 }
3683
3684 #[tokio::test]
3685 async fn test_process_metrics_bucket_metadata() {
3686 let mut token = Cogs::noop().timed(ResourceId::Relay, AppFeature::Unattributed);
3687 let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
3688 let received_at = Utc::now();
3689 let config = Config::default();
3690
3691 let (aggregator, mut aggregator_rx) = Addr::custom();
3692 let processor = create_test_processor_with_addrs(
3693 config,
3694 Addrs {
3695 aggregator,
3696 ..Default::default()
3697 },
3698 )
3699 .await;
3700
3701 let mut item = Item::new(ItemType::Statsd);
3702 item.set_payload(
3703 ContentType::Text,
3704 "transactions/foo:3182887624:4267882815|s",
3705 );
3706 for (source, expected_received_at) in [
3707 (
3708 BucketSource::External,
3709 Some(UnixTimestamp::from_datetime(received_at).unwrap()),
3710 ),
3711 (BucketSource::Internal, None),
3712 ] {
3713 let message = ProcessMetrics {
3714 data: MetricData::Raw(vec![item.clone()]),
3715 project_key,
3716 source,
3717 received_at,
3718 sent_at: Some(Utc::now()),
3719 };
3720 processor.handle_process_metrics(&mut token, message);
3721
3722 let Aggregator::MergeBuckets(merge_buckets) = aggregator_rx.recv().await.unwrap();
3723 let buckets = merge_buckets.buckets;
3724 assert_eq!(buckets.len(), 1);
3725 assert_eq!(buckets[0].metadata.received_at, expected_received_at);
3726 }
3727 }
3728
3729 #[tokio::test]
3730 async fn test_process_batched_metrics() {
3731 let mut token = Cogs::noop().timed(ResourceId::Relay, AppFeature::Unattributed);
3732 let received_at = Utc::now();
3733 let config = Config::default();
3734
3735 let (aggregator, mut aggregator_rx) = Addr::custom();
3736 let processor = create_test_processor_with_addrs(
3737 config,
3738 Addrs {
3739 aggregator,
3740 ..Default::default()
3741 },
3742 )
3743 .await;
3744
3745 let payload = r#"{
3746 "buckets": {
3747 "11111111111111111111111111111111": [
3748 {
3749 "timestamp": 1615889440,
3750 "width": 0,
3751 "name": "d:custom/endpoint.response_time@millisecond",
3752 "type": "d",
3753 "value": [
3754 68.0
3755 ],
3756 "tags": {
3757 "route": "user_index"
3758 }
3759 }
3760 ],
3761 "22222222222222222222222222222222": [
3762 {
3763 "timestamp": 1615889440,
3764 "width": 0,
3765 "name": "d:custom/endpoint.cache_rate@none",
3766 "type": "d",
3767 "value": [
3768 36.0
3769 ]
3770 }
3771 ]
3772 }
3773}
3774"#;
3775 let message = ProcessBatchedMetrics {
3776 payload: Bytes::from(payload),
3777 source: BucketSource::Internal,
3778 received_at,
3779 sent_at: Some(Utc::now()),
3780 };
3781 processor.handle_process_batched_metrics(&mut token, message);
3782
3783 let Aggregator::MergeBuckets(mb1) = aggregator_rx.recv().await.unwrap();
3784 let Aggregator::MergeBuckets(mb2) = aggregator_rx.recv().await.unwrap();
3785
3786 let mut messages = vec![mb1, mb2];
3787 messages.sort_by_key(|mb| mb.project_key);
3788
3789 let actual = messages
3790 .into_iter()
3791 .map(|mb| (mb.project_key, mb.buckets))
3792 .collect::<Vec<_>>();
3793
3794 assert_debug_snapshot!(actual, @r###"
3795 [
3796 (
3797 ProjectKey("11111111111111111111111111111111"),
3798 [
3799 Bucket {
3800 timestamp: UnixTimestamp(1615889440),
3801 width: 0,
3802 name: MetricName(
3803 "d:custom/endpoint.response_time@millisecond",
3804 ),
3805 value: Distribution(
3806 [
3807 68.0,
3808 ],
3809 ),
3810 tags: {
3811 "route": "user_index",
3812 },
3813 metadata: BucketMetadata {
3814 merges: 1,
3815 received_at: None,
3816 extracted_from_indexed: false,
3817 },
3818 },
3819 ],
3820 ),
3821 (
3822 ProjectKey("22222222222222222222222222222222"),
3823 [
3824 Bucket {
3825 timestamp: UnixTimestamp(1615889440),
3826 width: 0,
3827 name: MetricName(
3828 "d:custom/endpoint.cache_rate@none",
3829 ),
3830 value: Distribution(
3831 [
3832 36.0,
3833 ],
3834 ),
3835 tags: {},
3836 metadata: BucketMetadata {
3837 merges: 1,
3838 received_at: None,
3839 extracted_from_indexed: false,
3840 },
3841 },
3842 ],
3843 ),
3844 ]
3845 "###);
3846 }
3847}