// relay_server/services/processor.rs

1use std::borrow::Cow;
2use std::collections::{BTreeSet, HashMap};
3use std::error::Error;
4use std::fmt::{Debug, Display};
5use std::future::Future;
6use std::io::Write;
7use std::pin::Pin;
8use std::sync::Arc;
9use std::time::Duration;
10
11use anyhow::Context;
12use brotli::CompressorWriter as BrotliEncoder;
13use bytes::Bytes;
14use chrono::{DateTime, Utc};
15use flate2::Compression;
16use flate2::write::{GzEncoder, ZlibEncoder};
17use futures::FutureExt;
18use futures::future::BoxFuture;
19use relay_base_schema::project::{ProjectId, ProjectKey};
20use relay_cogs::{AppFeature, Cogs, FeatureWeights, ResourceId, Token};
21use relay_common::time::UnixTimestamp;
22use relay_config::{Config, HttpEncoding, RelayMode};
23use relay_dynamic_config::{Feature, GlobalConfig};
24use relay_event_normalization::{ClockDriftProcessor, GeoIpLookup};
25use relay_event_schema::processor::ProcessingAction;
26use relay_event_schema::protocol::{
27    ClientReport, Event, EventId, Metrics, NetworkReportError, SpanV2,
28};
29use relay_filter::FilterStatKey;
30use relay_metrics::{Bucket, BucketMetadata, BucketView, BucketsView, MetricNamespace};
31use relay_pii::PiiConfigError;
32use relay_protocol::Annotated;
33use relay_quotas::{DataCategory, Quota, RateLimits, Scoping};
34use relay_sampling::evaluation::{ReservoirCounters, SamplingDecision};
35use relay_statsd::metric;
36use relay_system::{Addr, FromMessage, NoResponse, Service};
37use reqwest::header;
38use smallvec::{SmallVec, smallvec};
39use zstd::stream::Encoder as ZstdEncoder;
40
41use crate::envelope::{
42    self, AttachmentType, ContentType, Envelope, EnvelopeError, Item, ItemContainer, ItemType,
43};
44use crate::extractors::{PartialDsn, RequestMeta, RequestTrust};
45use crate::integrations::Integration;
46use crate::managed::{InvalidProcessingGroupType, ManagedEnvelope, TypedEnvelope};
47use crate::metrics::{MetricOutcomes, MetricsLimiter, MinimalTrackableBucket};
48use crate::metrics_extraction::transactions::ExtractedMetrics;
49use crate::metrics_extraction::transactions::types::ExtractMetricsError;
50use crate::processing::check_ins::CheckInsProcessor;
51use crate::processing::logs::LogsProcessor;
52use crate::processing::profile_chunks::ProfileChunksProcessor;
53use crate::processing::replays::ReplaysProcessor;
54use crate::processing::sessions::SessionsProcessor;
55use crate::processing::spans::SpansProcessor;
56use crate::processing::trace_attachments::TraceAttachmentsProcessor;
57use crate::processing::trace_metrics::TraceMetricsProcessor;
58use crate::processing::transactions::TransactionProcessor;
59use crate::processing::utils::event::{
60    EventFullyNormalized, EventMetricsExtracted, FiltersStatus, SpansExtracted, event_category,
61    event_type,
62};
63use crate::processing::{Forward as _, Output, Outputs, QuotaRateLimiter};
64use crate::service::ServiceError;
65use crate::services::global_config::GlobalConfigHandle;
66use crate::services::metrics::{Aggregator, FlushBuckets, MergeBuckets, ProjectBuckets};
67use crate::services::outcome::{DiscardItemType, DiscardReason, Outcome, TrackOutcome};
68use crate::services::projects::cache::ProjectCacheHandle;
69use crate::services::projects::project::{ProjectInfo, ProjectState};
70use crate::services::upstream::{
71    SendRequest, Sign, SignatureType, UpstreamRelay, UpstreamRequest, UpstreamRequestError,
72};
73use crate::statsd::{RelayCounters, RelayDistributions, RelayTimers};
74use crate::utils::{self, CheckLimits, EnvelopeLimiter};
75use crate::{http, processing};
76use relay_threading::AsyncPool;
77#[cfg(feature = "processing")]
78use {
79    crate::services::processor::nnswitch::SwitchProcessingError,
80    crate::services::store::{Store, StoreEnvelope},
81    crate::services::upload::Upload,
82    crate::utils::Enforcement,
83    itertools::Itertools,
84    relay_cardinality::{
85        CardinalityLimit, CardinalityLimiter, CardinalityLimitsSplit, RedisSetLimiter,
86        RedisSetLimiterOptions,
87    },
88    relay_dynamic_config::CardinalityLimiterMode,
89    relay_quotas::{RateLimitingError, RedisRateLimiter},
90    relay_redis::RedisClients,
91    std::time::Instant,
92    symbolic_unreal::{Unreal4Error, Unreal4ErrorKind},
93};
94
// Sub-modules implementing per-signal processing steps used by this processor.
mod attachment;
mod dynamic_sampling;
mod event;
mod metrics;
mod nel;
mod profile;
mod replay;
mod report;
mod span;

// Only available in the internal Sentry build with processing enabled.
#[cfg(all(sentry, feature = "processing"))]
mod playstation;
mod standalone;
#[cfg(feature = "processing")]
mod unreal;
113
/// Conditionally compiles and runs a block only with the `processing` feature.
///
/// The `$if_true` block is compiled only when the `processing` feature is
/// enabled, and executed only if the provided config has `processing_enabled`
/// set at runtime. The two-armed form runs `$if_false` in all other cases,
/// including builds without the feature.
macro_rules! if_processing {
    ($config:expr, $if_true:block) => {
        #[cfg(feature = "processing")] {
            if $config.processing_enabled() $if_true
        }
    };
    ($config:expr, $if_true:block else $if_false:block) => {
        {
            #[cfg(feature = "processing")] {
                if $config.processing_enabled() $if_true else $if_false
            }
            // Without the feature, the runtime check cannot succeed: always
            // take the fallback branch.
            #[cfg(not(feature = "processing"))] {
                $if_false
            }
        }
    };
}
134
/// The minimum clock drift for correction to apply.
///
/// Drift smaller than this threshold (55 minutes) is left uncorrected.
pub const MINIMUM_CLOCK_DRIFT: Duration = Duration::from_secs(55 * 60);
137
/// Error returned when a [`ProcessingGroup`] cannot be converted into a
/// specific group marker type.
#[derive(Debug)]
pub struct GroupTypeError;

impl Display for GroupTypeError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "failed to convert processing group into corresponding type")
    }
}

impl std::error::Error for GroupTypeError {}
148
/// Generates a marker type for a [`ProcessingGroup`] variant.
///
/// Defines the unit struct `$ty` with an infallible conversion into
/// [`ProcessingGroup`] (producing `$variant`) and a fallible conversion back.
/// The optional trailing variant list lets additional [`ProcessingGroup`]
/// variants convert into the same marker type (e.g. both `Log` and `Nel` map
/// to `LogGroup`).
macro_rules! processing_group {
    ($ty:ident, $variant:ident$(, $($other:ident),+)?) => {
        #[derive(Clone, Copy, Debug)]
        pub struct $ty;

        impl From<$ty> for ProcessingGroup {
            fn from(_: $ty) -> Self {
                ProcessingGroup::$variant
            }
        }

        impl TryFrom<ProcessingGroup> for $ty {
            type Error = GroupTypeError;

            fn try_from(value: ProcessingGroup) -> Result<Self, Self::Error> {
                // The primary variant always converts.
                if matches!(value, ProcessingGroup::$variant) {
                    return Ok($ty);
                }
                // Any of the optional additional variants also convert.
                $($(
                    if matches!(value, ProcessingGroup::$other) {
                        return Ok($ty);
                    }
                )+)?
                return Err(GroupTypeError);
            }
        }
    };
}
177
/// A marker trait.
///
/// Should be used only with groups which are responsible for processing envelopes with events.
pub trait EventProcessing {}

processing_group!(TransactionGroup, Transaction);
impl EventProcessing for TransactionGroup {}

processing_group!(ErrorGroup, Error);
impl EventProcessing for ErrorGroup {}

processing_group!(SessionGroup, Session);
processing_group!(StandaloneGroup, Standalone);
processing_group!(ClientReportGroup, ClientReport);
processing_group!(ReplayGroup, Replay);
processing_group!(CheckInGroup, CheckIn);
// NEL items are transformed into logs, so both `Log` and `Nel` groups convert
// into `LogGroup`.
processing_group!(LogGroup, Log, Nel);
processing_group!(TraceMetricGroup, TraceMetric);
processing_group!(SpanGroup, Span);

processing_group!(ProfileChunkGroup, ProfileChunk);
processing_group!(MetricsGroup, Metrics);
processing_group!(ForwardUnknownGroup, ForwardUnknown);
processing_group!(Ungrouped, Ungrouped);
202
/// Processed group type marker.
///
/// Marks the envelopes which passed through the processing pipeline.
/// Used as the type-state parameter of [`TypedEnvelope`].
#[derive(Clone, Copy, Debug)]
pub struct Processed;
208
/// Describes the groups of the processable items.
#[derive(Clone, Copy, Debug)]
pub enum ProcessingGroup {
    /// All the transaction related items.
    ///
    /// Includes transactions, related attachments, profiles.
    Transaction,
    /// All the items which require (have or create) events.
    ///
    /// This includes: errors, NEL, security reports, user reports, some of the
    /// attachments.
    Error,
    /// Session events.
    Session,
    /// Standalone items which can be sent alone without any event attached to it in the current
    /// envelope e.g. some attachments, user reports.
    Standalone,
    /// Client reports, which carry outcome data.
    ClientReport,
    /// Replays and ReplayRecordings.
    Replay,
    /// Crons.
    CheckIn,
    /// NEL reports.
    Nel,
    /// Logs.
    Log,
    /// Trace metrics.
    TraceMetric,
    /// Spans.
    Span,
    /// Span V2 spans.
    SpanV2,
    /// Metrics.
    Metrics,
    /// ProfileChunk.
    ProfileChunk,
    /// V2 attachments without span / log association.
    TraceAttachment,
    /// Unknown item types will be forwarded upstream (to processing Relay), where we will
    /// decide what to do with them.
    ForwardUnknown,
    /// All the items in the envelope that could not be grouped.
    Ungrouped,
}
254
255impl ProcessingGroup {
256    /// Splits provided envelope into list of tuples of groups with associated envelopes.
257    fn split_envelope(
258        mut envelope: Envelope,
259        project_info: &ProjectInfo,
260    ) -> SmallVec<[(Self, Box<Envelope>); 3]> {
261        let headers = envelope.headers().clone();
262        let mut grouped_envelopes = smallvec![];
263
264        // Extract replays.
265        let replay_items = envelope.take_items_by(|item| {
266            matches!(
267                item.ty(),
268                &ItemType::ReplayEvent | &ItemType::ReplayRecording | &ItemType::ReplayVideo
269            )
270        });
271        if !replay_items.is_empty() {
272            grouped_envelopes.push((
273                ProcessingGroup::Replay,
274                Envelope::from_parts(headers.clone(), replay_items),
275            ))
276        }
277
278        // Keep all the sessions together in one envelope.
279        let session_items = envelope
280            .take_items_by(|item| matches!(item.ty(), &ItemType::Session | &ItemType::Sessions));
281        if !session_items.is_empty() {
282            grouped_envelopes.push((
283                ProcessingGroup::Session,
284                Envelope::from_parts(headers.clone(), session_items),
285            ))
286        }
287
288        let span_v2_items = envelope.take_items_by(|item| {
289            let exp_feature = project_info.has_feature(Feature::SpanV2ExperimentalProcessing);
290
291            ItemContainer::<SpanV2>::is_container(item)
292                || matches!(item.integration(), Some(Integration::Spans(_)))
293                // Process standalone spans (v1) via the v2 pipeline
294                || (exp_feature && matches!(item.ty(), &ItemType::Span))
295                || (exp_feature && item.is_span_attachment())
296        });
297
298        if !span_v2_items.is_empty() {
299            grouped_envelopes.push((
300                ProcessingGroup::SpanV2,
301                Envelope::from_parts(headers.clone(), span_v2_items),
302            ))
303        }
304
305        // Extract spans.
306        let span_items = envelope.take_items_by(|item| matches!(item.ty(), &ItemType::Span));
307        if !span_items.is_empty() {
308            grouped_envelopes.push((
309                ProcessingGroup::Span,
310                Envelope::from_parts(headers.clone(), span_items),
311            ))
312        }
313
314        // Extract logs.
315        let logs_items = envelope.take_items_by(|item| {
316            matches!(item.ty(), &ItemType::Log)
317                || matches!(item.integration(), Some(Integration::Logs(_)))
318        });
319        if !logs_items.is_empty() {
320            grouped_envelopes.push((
321                ProcessingGroup::Log,
322                Envelope::from_parts(headers.clone(), logs_items),
323            ))
324        }
325
326        // Extract trace metrics.
327        let trace_metric_items =
328            envelope.take_items_by(|item| matches!(item.ty(), &ItemType::TraceMetric));
329        if !trace_metric_items.is_empty() {
330            grouped_envelopes.push((
331                ProcessingGroup::TraceMetric,
332                Envelope::from_parts(headers.clone(), trace_metric_items),
333            ))
334        }
335
336        // NEL items are transformed into logs in their own processing step.
337        let nel_items = envelope.take_items_by(|item| matches!(item.ty(), &ItemType::Nel));
338        if !nel_items.is_empty() {
339            grouped_envelopes.push((
340                ProcessingGroup::Nel,
341                Envelope::from_parts(headers.clone(), nel_items),
342            ))
343        }
344
345        // Extract all metric items.
346        //
347        // Note: Should only be relevant in proxy mode. In other modes we send metrics through
348        // a separate pipeline.
349        let metric_items = envelope.take_items_by(|i| i.ty().is_metrics());
350        if !metric_items.is_empty() {
351            grouped_envelopes.push((
352                ProcessingGroup::Metrics,
353                Envelope::from_parts(headers.clone(), metric_items),
354            ))
355        }
356
357        // Extract profile chunks.
358        let profile_chunk_items =
359            envelope.take_items_by(|item| matches!(item.ty(), &ItemType::ProfileChunk));
360        if !profile_chunk_items.is_empty() {
361            grouped_envelopes.push((
362                ProcessingGroup::ProfileChunk,
363                Envelope::from_parts(headers.clone(), profile_chunk_items),
364            ))
365        }
366
367        let trace_attachment_items = envelope.take_items_by(Item::is_trace_attachment);
368        if !trace_attachment_items.is_empty() {
369            grouped_envelopes.push((
370                ProcessingGroup::TraceAttachment,
371                Envelope::from_parts(headers.clone(), trace_attachment_items),
372            ))
373        }
374
375        // Extract all standalone items.
376        //
377        // Note: only if there are no items in the envelope which can create events, otherwise they
378        // will be in the same envelope with all require event items.
379        if !envelope.items().any(Item::creates_event) {
380            let standalone_items = envelope.take_items_by(Item::requires_event);
381            if !standalone_items.is_empty() {
382                grouped_envelopes.push((
383                    ProcessingGroup::Standalone,
384                    Envelope::from_parts(headers.clone(), standalone_items),
385                ))
386            }
387        };
388
389        // Make sure we create separate envelopes for each `RawSecurity` report.
390        let security_reports_items = envelope
391            .take_items_by(|i| matches!(i.ty(), &ItemType::RawSecurity))
392            .into_iter()
393            .map(|item| {
394                let headers = headers.clone();
395                let items: SmallVec<[Item; 3]> = smallvec![item.clone()];
396                let mut envelope = Envelope::from_parts(headers, items);
397                envelope.set_event_id(EventId::new());
398                (ProcessingGroup::Error, envelope)
399            });
400        grouped_envelopes.extend(security_reports_items);
401
402        // Extract all the items which require an event into separate envelope.
403        let require_event_items = envelope.take_items_by(Item::requires_event);
404        if !require_event_items.is_empty() {
405            let group = if require_event_items
406                .iter()
407                .any(|item| matches!(item.ty(), &ItemType::Transaction | &ItemType::Profile))
408            {
409                ProcessingGroup::Transaction
410            } else {
411                ProcessingGroup::Error
412            };
413
414            grouped_envelopes.push((
415                group,
416                Envelope::from_parts(headers.clone(), require_event_items),
417            ))
418        }
419
420        // Get the rest of the envelopes, one per item.
421        let envelopes = envelope.items_mut().map(|item| {
422            let headers = headers.clone();
423            let items: SmallVec<[Item; 3]> = smallvec![item.clone()];
424            let envelope = Envelope::from_parts(headers, items);
425            let item_type = item.ty();
426            let group = if matches!(item_type, &ItemType::CheckIn) {
427                ProcessingGroup::CheckIn
428            } else if matches!(item.ty(), &ItemType::ClientReport) {
429                ProcessingGroup::ClientReport
430            } else if matches!(item_type, &ItemType::Unknown(_)) {
431                ProcessingGroup::ForwardUnknown
432            } else {
433                // Cannot group this item type.
434                ProcessingGroup::Ungrouped
435            };
436
437            (group, envelope)
438        });
439        grouped_envelopes.extend(envelopes);
440
441        grouped_envelopes
442    }
443
444    /// Returns the name of the group.
445    pub fn variant(&self) -> &'static str {
446        match self {
447            ProcessingGroup::Transaction => "transaction",
448            ProcessingGroup::Error => "error",
449            ProcessingGroup::Session => "session",
450            ProcessingGroup::Standalone => "standalone",
451            ProcessingGroup::ClientReport => "client_report",
452            ProcessingGroup::Replay => "replay",
453            ProcessingGroup::CheckIn => "check_in",
454            ProcessingGroup::Log => "log",
455            ProcessingGroup::TraceMetric => "trace_metric",
456            ProcessingGroup::Nel => "nel",
457            ProcessingGroup::Span => "span",
458            ProcessingGroup::SpanV2 => "span_v2",
459            ProcessingGroup::Metrics => "metrics",
460            ProcessingGroup::ProfileChunk => "profile_chunk",
461            ProcessingGroup::TraceAttachment => "trace_attachment",
462            ProcessingGroup::ForwardUnknown => "forward_unknown",
463            ProcessingGroup::Ungrouped => "ungrouped",
464        }
465    }
466}
467
468impl From<ProcessingGroup> for AppFeature {
469    fn from(value: ProcessingGroup) -> Self {
470        match value {
471            ProcessingGroup::Transaction => AppFeature::Transactions,
472            ProcessingGroup::Error => AppFeature::Errors,
473            ProcessingGroup::Session => AppFeature::Sessions,
474            ProcessingGroup::Standalone => AppFeature::UnattributedEnvelope,
475            ProcessingGroup::ClientReport => AppFeature::ClientReports,
476            ProcessingGroup::Replay => AppFeature::Replays,
477            ProcessingGroup::CheckIn => AppFeature::CheckIns,
478            ProcessingGroup::Log => AppFeature::Logs,
479            ProcessingGroup::TraceMetric => AppFeature::TraceMetrics,
480            ProcessingGroup::Nel => AppFeature::Logs,
481            ProcessingGroup::Span => AppFeature::Spans,
482            ProcessingGroup::SpanV2 => AppFeature::Spans,
483            ProcessingGroup::Metrics => AppFeature::UnattributedMetrics,
484            ProcessingGroup::ProfileChunk => AppFeature::Profiles,
485            ProcessingGroup::ForwardUnknown => AppFeature::UnattributedEnvelope,
486            ProcessingGroup::Ungrouped => AppFeature::UnattributedEnvelope,
487            ProcessingGroup::TraceAttachment => AppFeature::TraceAttachments,
488        }
489    }
490}
491
/// An error returned when handling [`ProcessEnvelope`].
///
/// The outcome (if any) emitted for each variant is determined by
/// [`ProcessingError::to_outcome`].
#[derive(Debug, thiserror::Error)]
pub enum ProcessingError {
    #[error("invalid json in event")]
    InvalidJson(#[source] serde_json::Error),

    #[error("invalid message pack event payload")]
    InvalidMsgpack(#[from] rmp_serde::decode::Error),

    #[cfg(feature = "processing")]
    #[error("invalid unreal crash report")]
    InvalidUnrealReport(#[source] Unreal4Error),

    #[error("event payload too large")]
    PayloadTooLarge(DiscardItemType),

    #[error("invalid transaction event")]
    InvalidTransaction,

    #[error("envelope processor failed")]
    ProcessingFailed(#[from] ProcessingAction),

    #[error("duplicate {0} in event")]
    DuplicateItem(ItemType),

    #[error("failed to extract event payload")]
    NoEventPayload,

    #[error("missing project id in DSN")]
    MissingProjectId,

    #[error("invalid security report type: {0:?}")]
    InvalidSecurityType(Bytes),

    #[error("unsupported security report type")]
    UnsupportedSecurityType,

    #[error("invalid security report")]
    InvalidSecurityReport(#[source] serde_json::Error),

    #[error("invalid nel report")]
    InvalidNelReport(#[source] NetworkReportError),

    #[error("event filtered with reason: {0:?}")]
    EventFiltered(FilterStatKey),

    #[error("missing or invalid required event timestamp")]
    InvalidTimestamp,

    #[error("could not serialize event payload")]
    SerializeFailed(#[source] serde_json::Error),

    #[cfg(feature = "processing")]
    #[error("failed to apply quotas")]
    QuotasFailed(#[from] RateLimitingError),

    #[error("invalid pii config")]
    PiiConfigError(PiiConfigError),

    /// Boxed to keep the error type small.
    #[error("invalid processing group type")]
    InvalidProcessingGroup(Box<InvalidProcessingGroupType>),

    #[error("invalid replay")]
    InvalidReplay(DiscardReason),

    #[error("replay filtered with reason: {0:?}")]
    ReplayFiltered(FilterStatKey),

    #[cfg(feature = "processing")]
    #[error("nintendo switch dying message processing failed {0:?}")]
    InvalidNintendoDyingMessage(#[source] SwitchProcessingError),

    #[cfg(all(sentry, feature = "processing"))]
    #[error("playstation dump processing failed: {0}")]
    InvalidPlaystationDump(String),

    #[error("processing group does not match specific processor")]
    ProcessingGroupMismatch,

    /// Failure in the new processing pipeline; outcomes are emitted there.
    #[error("new processing pipeline failed")]
    ProcessingFailure,
}
573
impl ProcessingError {
    /// Maps this error to the [`Outcome`] to emit for the affected items.
    ///
    /// Returns `None` when no outcome should be emitted for this error (e.g.
    /// for [`Self::ProcessingFailure`], where outcomes are already emitted by
    /// the new processing pipeline).
    pub fn to_outcome(&self) -> Option<Outcome> {
        match self {
            Self::PayloadTooLarge(payload_type) => {
                Some(Outcome::Invalid(DiscardReason::TooLarge(*payload_type)))
            }
            Self::InvalidJson(_) => Some(Outcome::Invalid(DiscardReason::InvalidJson)),
            Self::InvalidMsgpack(_) => Some(Outcome::Invalid(DiscardReason::InvalidMsgpack)),
            Self::InvalidSecurityType(_) => {
                Some(Outcome::Invalid(DiscardReason::SecurityReportType))
            }
            Self::InvalidSecurityReport(_) => Some(Outcome::Invalid(DiscardReason::SecurityReport)),
            Self::UnsupportedSecurityType => Some(Outcome::Filtered(FilterStatKey::InvalidCsp)),
            Self::InvalidNelReport(_) => Some(Outcome::Invalid(DiscardReason::InvalidJson)),
            Self::InvalidTransaction => Some(Outcome::Invalid(DiscardReason::InvalidTransaction)),
            Self::InvalidTimestamp => Some(Outcome::Invalid(DiscardReason::Timestamp)),
            Self::DuplicateItem(_) => Some(Outcome::Invalid(DiscardReason::DuplicateItem)),
            Self::NoEventPayload => Some(Outcome::Invalid(DiscardReason::NoEventPayload)),
            #[cfg(feature = "processing")]
            Self::InvalidNintendoDyingMessage(_) => Some(Outcome::Invalid(DiscardReason::Payload)),
            #[cfg(all(sentry, feature = "processing"))]
            Self::InvalidPlaystationDump(_) => Some(Outcome::Invalid(DiscardReason::Payload)),
            // Bad compression gets its own discard reason; all other Unreal
            // errors map to the generic one.
            #[cfg(feature = "processing")]
            Self::InvalidUnrealReport(err) if err.kind() == Unreal4ErrorKind::BadCompression => {
                Some(Outcome::Invalid(DiscardReason::InvalidCompression))
            }
            #[cfg(feature = "processing")]
            Self::InvalidUnrealReport(_) => Some(Outcome::Invalid(DiscardReason::ProcessUnreal)),
            Self::SerializeFailed(_) | Self::ProcessingFailed(_) => {
                Some(Outcome::Invalid(DiscardReason::Internal))
            }
            #[cfg(feature = "processing")]
            Self::QuotasFailed(_) => Some(Outcome::Invalid(DiscardReason::Internal)),
            Self::PiiConfigError(_) => Some(Outcome::Invalid(DiscardReason::ProjectStatePii)),
            Self::MissingProjectId => None,
            Self::EventFiltered(_) => None,
            Self::InvalidReplay(reason) => Some(Outcome::Invalid(*reason)),
            Self::ReplayFiltered(key) => Some(Outcome::Filtered(key.clone())),
            Self::InvalidProcessingGroup(_) => None,

            Self::ProcessingGroupMismatch => Some(Outcome::Invalid(DiscardReason::Internal)),
            // Outcomes are emitted in the new processing pipeline already.
            Self::ProcessingFailure => None,
        }
    }

    /// Returns `true` if the mapped outcome indicates an unexpected condition.
    fn is_unexpected(&self) -> bool {
        self.to_outcome()
            .is_some_and(|outcome| outcome.is_unexpected())
    }
}
625
#[cfg(feature = "processing")]
impl From<Unreal4Error> for ProcessingError {
    fn from(err: Unreal4Error) -> Self {
        // Oversized reports map to the generic payload-too-large error; all
        // other kinds keep the original error as the source.
        if err.kind() == Unreal4ErrorKind::TooLarge {
            Self::PayloadTooLarge(ItemType::UnrealReport.into())
        } else {
            Self::InvalidUnrealReport(err)
        }
    }
}
635
636impl From<ExtractMetricsError> for ProcessingError {
637    fn from(error: ExtractMetricsError) -> Self {
638        match error {
639            ExtractMetricsError::MissingTimestamp | ExtractMetricsError::InvalidTimestamp => {
640                Self::InvalidTimestamp
641            }
642        }
643    }
644}
645
646impl From<InvalidProcessingGroupType> for ProcessingError {
647    fn from(value: InvalidProcessingGroupType) -> Self {
648        Self::InvalidProcessingGroup(Box::new(value))
649    }
650}
651
/// An extracted event paired with an associated length.
// NOTE(review): the `usize` appears to be the size of the payload the event
// was extracted from — confirm at the extraction call sites.
type ExtractedEvent = (Annotated<Event>, usize);
653
/// A container for extracted metrics during processing.
///
/// The container enforces that the extracted metrics are correctly tagged
/// with the dynamic sampling decision.
#[derive(Debug)]
pub struct ProcessingExtractedMetrics {
    // Project and sampling metrics collected so far.
    metrics: ExtractedMetrics,
}
662
663impl ProcessingExtractedMetrics {
664    pub fn new() -> Self {
665        Self {
666            metrics: ExtractedMetrics::default(),
667        }
668    }
669
670    pub fn into_inner(self) -> ExtractedMetrics {
671        self.metrics
672    }
673
674    /// Extends the contained metrics with [`ExtractedMetrics`].
675    pub fn extend(
676        &mut self,
677        extracted: ExtractedMetrics,
678        sampling_decision: Option<SamplingDecision>,
679    ) {
680        self.extend_project_metrics(extracted.project_metrics, sampling_decision);
681        self.extend_sampling_metrics(extracted.sampling_metrics, sampling_decision);
682    }
683
684    /// Extends the contained project metrics.
685    pub fn extend_project_metrics<I>(
686        &mut self,
687        buckets: I,
688        sampling_decision: Option<SamplingDecision>,
689    ) where
690        I: IntoIterator<Item = Bucket>,
691    {
692        self.metrics
693            .project_metrics
694            .extend(buckets.into_iter().map(|mut bucket| {
695                bucket.metadata.extracted_from_indexed =
696                    sampling_decision == Some(SamplingDecision::Keep);
697                bucket
698            }));
699    }
700
701    /// Extends the contained sampling metrics.
702    pub fn extend_sampling_metrics<I>(
703        &mut self,
704        buckets: I,
705        sampling_decision: Option<SamplingDecision>,
706    ) where
707        I: IntoIterator<Item = Bucket>,
708    {
709        self.metrics
710            .sampling_metrics
711            .extend(buckets.into_iter().map(|mut bucket| {
712                bucket.metadata.extracted_from_indexed =
713                    sampling_decision == Some(SamplingDecision::Keep);
714                bucket
715            }));
716    }
717
718    /// Applies rate limits to the contained metrics.
719    ///
720    /// This is used to apply rate limits which have been enforced on sampled items of an envelope
721    /// to also consistently apply to the metrics extracted from these items.
722    #[cfg(feature = "processing")]
723    fn apply_enforcement(&mut self, enforcement: &Enforcement, enforced_consistently: bool) {
724        // Metric namespaces which need to be dropped.
725        let mut drop_namespaces: SmallVec<[_; 2]> = smallvec![];
726        // Metrics belonging to this metric namespace need to have the `extracted_from_indexed`
727        // flag reset to `false`.
728        let mut reset_extracted_from_indexed: SmallVec<[_; 2]> = smallvec![];
729
730        for (namespace, limit, indexed) in [
731            (
732                MetricNamespace::Transactions,
733                &enforcement.event,
734                &enforcement.event_indexed,
735            ),
736            (
737                MetricNamespace::Spans,
738                &enforcement.spans,
739                &enforcement.spans_indexed,
740            ),
741        ] {
742            if limit.is_active() {
743                drop_namespaces.push(namespace);
744            } else if indexed.is_active() && !enforced_consistently {
745                // If the enforcement was not computed by consistently checking the limits,
746                // the quota for the metrics has not yet been incremented.
747                // In this case we have a dropped indexed payload but a metric which still needs to
748                // be accounted for, make sure the metric will still be rate limited.
749                reset_extracted_from_indexed.push(namespace);
750            }
751        }
752
753        if !drop_namespaces.is_empty() || !reset_extracted_from_indexed.is_empty() {
754            self.retain_mut(|bucket| {
755                let Some(namespace) = bucket.name.try_namespace() else {
756                    return true;
757                };
758
759                if drop_namespaces.contains(&namespace) {
760                    return false;
761                }
762
763                if reset_extracted_from_indexed.contains(&namespace) {
764                    bucket.metadata.extracted_from_indexed = false;
765                }
766
767                true
768            });
769        }
770    }
771
772    #[cfg(feature = "processing")]
773    fn retain_mut(&mut self, mut f: impl FnMut(&mut Bucket) -> bool) {
774        self.metrics.project_metrics.retain_mut(&mut f);
775        self.metrics.sampling_metrics.retain_mut(&mut f);
776    }
777}
778
779fn send_metrics(
780    metrics: ExtractedMetrics,
781    project_key: ProjectKey,
782    sampling_key: Option<ProjectKey>,
783    aggregator: &Addr<Aggregator>,
784) {
785    let ExtractedMetrics {
786        project_metrics,
787        sampling_metrics,
788    } = metrics;
789
790    if !project_metrics.is_empty() {
791        aggregator.send(MergeBuckets {
792            project_key,
793            buckets: project_metrics,
794        });
795    }
796
797    if !sampling_metrics.is_empty() {
798        // If no sampling project state is available, we associate the sampling
799        // metrics with the current project.
800        //
801        // project_without_tracing         -> metrics goes to self
802        // dependent_project_with_tracing  -> metrics goes to root
803        // root_project_with_tracing       -> metrics goes to root == self
804        let sampling_project_key = sampling_key.unwrap_or(project_key);
805        aggregator.send(MergeBuckets {
806            project_key: sampling_project_key,
807            buckets: sampling_metrics,
808        });
809    }
810}
811
812/// Function for on-off switches that filter specific item types (profiles, spans)
813/// based on a feature flag.
814///
815/// If the project config did not come from the upstream, we keep the items.
816fn should_filter(config: &Config, project_info: &ProjectInfo, feature: Feature) -> bool {
817    match config.relay_mode() {
818        RelayMode::Proxy => false,
819        RelayMode::Managed => !project_info.has_feature(feature),
820    }
821}
822
/// The result of the envelope processing containing the processed envelope along with the partial
/// result.
#[derive(Debug)]
#[expect(
    clippy::large_enum_variant,
    reason = "until we have a better solution to combat the excessive growth of Item, see #4819"
)]
enum ProcessingResult {
    /// A processed envelope together with the metrics extracted while processing it.
    Envelope {
        /// The envelope after processing.
        managed_envelope: TypedEnvelope<Processed>,
        /// Metrics extracted from the envelope during processing.
        extracted_metrics: ProcessingExtractedMetrics,
    },
    /// The output of a [`processing::Processor`] pipeline.
    Output(Output<Outputs>),
}
837
838impl ProcessingResult {
839    /// Creates a [`ProcessingResult`] with no metrics.
840    fn no_metrics(managed_envelope: TypedEnvelope<Processed>) -> Self {
841        Self::Envelope {
842            managed_envelope,
843            extracted_metrics: ProcessingExtractedMetrics::new(),
844        }
845    }
846}
847
/// All items which can be submitted upstream.
#[derive(Debug)]
#[expect(
    clippy::large_enum_variant,
    reason = "until we have a better solution to combat the excessive growth of Item, see #4819"
)]
enum Submit<'a> {
    /// A processed envelope.
    Envelope(TypedEnvelope<Processed>),
    /// The output of a [`processing::Processor`].
    Output {
        /// The processor's output to be forwarded.
        output: Outputs,
        /// Context required for forwarding the output.
        ctx: processing::ForwardContext<'a>,
    },
}
863
/// Applies processing to all contents of the given envelope.
///
/// Depending on the contents of the envelope and Relay's mode, this includes:
///
///  - Basic normalization and validation for all item types.
///  - Clock drift correction if the required `sent_at` header is present.
///  - Expansion of certain item types (e.g. unreal).
///  - Store normalization for event payloads in processing mode.
///  - Rate limiters and inbound filters on events in processing mode.
#[derive(Debug)]
pub struct ProcessEnvelope {
    /// Envelope to process.
    pub envelope: ManagedEnvelope,
    /// The project info.
    pub project_info: Arc<ProjectInfo>,
    /// Currently active cached rate limits for this project.
    pub rate_limits: Arc<RateLimits>,
    /// Root sampling project info, if the envelope has a trace rooted in another project.
    pub sampling_project_info: Option<Arc<ProjectInfo>>,
    /// Counters for reservoir-based dynamic sampling.
    pub reservoir_counters: ReservoirCounters,
}
886
/// Like a [`ProcessEnvelope`], but with an envelope which has been grouped.
///
/// See [`ProcessingGroup`] for the available groups.
#[derive(Debug)]
struct ProcessEnvelopeGrouped<'a> {
    /// The group the envelope belongs to.
    pub group: ProcessingGroup,
    /// Envelope to process.
    pub envelope: ManagedEnvelope,
    /// The processing context.
    pub ctx: processing::Context<'a>,
}
897
/// Parses a list of metrics or metric buckets and pushes them to the project's aggregator.
///
/// This parses and validates the metrics:
///  - For [`Metrics`](ItemType::Statsd), each metric is parsed separately, and invalid metrics are
///    ignored independently.
///  - For [`MetricBuckets`](ItemType::MetricBuckets), the entire list of buckets is parsed and
///    dropped together on parsing failure.
///  - Other envelope items will be ignored with an error message.
///
/// Additionally, processing applies clock drift correction using the system clock of this Relay, if
/// the Envelope specifies the [`sent_at`](Envelope::sent_at) header.
#[derive(Debug)]
pub struct ProcessMetrics {
    /// A list of metric items.
    pub data: MetricData,
    /// The target project.
    pub project_key: ProjectKey,
    /// The source of the metrics.
    ///
    /// Determines whether to keep or reset the metric metadata; see [`BucketSource`].
    pub source: BucketSource,
    /// The wall clock time at which the request was received.
    pub received_at: DateTime<Utc>,
    /// The value of the Envelope's [`sent_at`](Envelope::sent_at) header for clock drift
    /// correction.
    pub sent_at: Option<DateTime<Utc>>,
}
923
/// Raw unparsed metric data.
///
/// Use [`MetricData::into_buckets`] to obtain parsed buckets in either case.
#[derive(Debug)]
pub enum MetricData {
    /// Raw data, unparsed envelope items.
    Raw(Vec<Item>),
    /// Already parsed buckets but unprocessed.
    Parsed(Vec<Bucket>),
}
932
933impl MetricData {
934    /// Consumes the metric data and parses the contained buckets.
935    ///
936    /// If the contained data is already parsed the buckets are returned unchanged.
937    /// Raw buckets are parsed and created with the passed `timestamp`.
938    fn into_buckets(self, timestamp: UnixTimestamp) -> Vec<Bucket> {
939        let items = match self {
940            Self::Parsed(buckets) => return buckets,
941            Self::Raw(items) => items,
942        };
943
944        let mut buckets = Vec::new();
945        for item in items {
946            let payload = item.payload();
947            if item.ty() == &ItemType::Statsd {
948                for bucket_result in Bucket::parse_all(&payload, timestamp) {
949                    match bucket_result {
950                        Ok(bucket) => buckets.push(bucket),
951                        Err(error) => relay_log::debug!(
952                            error = &error as &dyn Error,
953                            "failed to parse metric bucket from statsd format",
954                        ),
955                    }
956                }
957            } else if item.ty() == &ItemType::MetricBuckets {
958                match serde_json::from_slice::<Vec<Bucket>>(&payload) {
959                    Ok(parsed_buckets) => {
960                        // Re-use the allocation of `b` if possible.
961                        if buckets.is_empty() {
962                            buckets = parsed_buckets;
963                        } else {
964                            buckets.extend(parsed_buckets);
965                        }
966                    }
967                    Err(error) => {
968                        relay_log::debug!(
969                            error = &error as &dyn Error,
970                            "failed to parse metric bucket",
971                        );
972                        metric!(counter(RelayCounters::MetricBucketsParsingFailed) += 1);
973                    }
974                }
975            } else {
976                relay_log::error!(
977                    "invalid item of type {} passed to ProcessMetrics",
978                    item.ty()
979                );
980            }
981        }
982        buckets
983    }
984}
985
/// A batched metrics payload to be parsed and processed.
#[derive(Debug)]
pub struct ProcessBatchedMetrics {
    /// Metrics payload in JSON format.
    pub payload: Bytes,
    /// Whether to keep or reset the metric metadata.
    pub source: BucketSource,
    /// The wall clock time at which the request was received.
    pub received_at: DateTime<Utc>,
    /// The value of the request's `sent_at` header for clock drift correction.
    pub sent_at: Option<DateTime<Utc>>,
}
997
/// Source information where a metric bucket originates from.
///
/// Use [`BucketSource::from_meta`] to derive this from request metadata.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum BucketSource {
    /// The metric bucket originated from an internal Relay use case.
    ///
    /// The metric bucket originates either from within the same Relay
    /// or was accepted coming from another Relay which is registered as
    /// an internal Relay via Relay's configuration.
    Internal,
    /// The bucket source originated from an untrusted source.
    ///
    /// Managed Relays sending extracted metrics are considered external,
    /// it's a project use case but it comes from an untrusted source.
    External,
}
1013
1014impl BucketSource {
1015    /// Infers the bucket source from [`RequestMeta::request_trust`].
1016    pub fn from_meta(meta: &RequestMeta) -> Self {
1017        match meta.request_trust() {
1018            RequestTrust::Trusted => Self::Internal,
1019            RequestTrust::Untrusted => Self::External,
1020        }
1021    }
1022}
1023
/// Sends a client report to the upstream.
#[derive(Debug)]
pub struct SubmitClientReports {
    /// The client reports to be sent.
    pub client_reports: Vec<ClientReport>,
    /// Scoping information for the client reports.
    pub scoping: Scoping,
}
1032
/// CPU-intensive processing tasks for envelopes.
#[derive(Debug)]
pub enum EnvelopeProcessor {
    /// Processes an envelope, see [`ProcessEnvelope`].
    ProcessEnvelope(Box<ProcessEnvelope>),
    /// Processes metrics for a single project, see [`ProcessMetrics`].
    ProcessProjectMetrics(Box<ProcessMetrics>),
    /// Processes a batched metrics payload, see [`ProcessBatchedMetrics`].
    ProcessBatchedMetrics(Box<ProcessBatchedMetrics>),
    /// Handles metric buckets flushed from the aggregator, see [`FlushBuckets`].
    FlushBuckets(Box<FlushBuckets>),
    /// Sends client reports to the upstream, see [`SubmitClientReports`].
    SubmitClientReports(Box<SubmitClientReports>),
}
1042
1043impl EnvelopeProcessor {
1044    /// Returns the name of the message variant.
1045    pub fn variant(&self) -> &'static str {
1046        match self {
1047            EnvelopeProcessor::ProcessEnvelope(_) => "ProcessEnvelope",
1048            EnvelopeProcessor::ProcessProjectMetrics(_) => "ProcessProjectMetrics",
1049            EnvelopeProcessor::ProcessBatchedMetrics(_) => "ProcessBatchedMetrics",
1050            EnvelopeProcessor::FlushBuckets(_) => "FlushBuckets",
1051            EnvelopeProcessor::SubmitClientReports(_) => "SubmitClientReports",
1052        }
1053    }
1054}
1055
1056impl relay_system::Interface for EnvelopeProcessor {}
1057
1058impl FromMessage<ProcessEnvelope> for EnvelopeProcessor {
1059    type Response = relay_system::NoResponse;
1060
1061    fn from_message(message: ProcessEnvelope, _sender: ()) -> Self {
1062        Self::ProcessEnvelope(Box::new(message))
1063    }
1064}
1065
1066impl FromMessage<ProcessMetrics> for EnvelopeProcessor {
1067    type Response = NoResponse;
1068
1069    fn from_message(message: ProcessMetrics, _: ()) -> Self {
1070        Self::ProcessProjectMetrics(Box::new(message))
1071    }
1072}
1073
1074impl FromMessage<ProcessBatchedMetrics> for EnvelopeProcessor {
1075    type Response = NoResponse;
1076
1077    fn from_message(message: ProcessBatchedMetrics, _: ()) -> Self {
1078        Self::ProcessBatchedMetrics(Box::new(message))
1079    }
1080}
1081
1082impl FromMessage<FlushBuckets> for EnvelopeProcessor {
1083    type Response = NoResponse;
1084
1085    fn from_message(message: FlushBuckets, _: ()) -> Self {
1086        Self::FlushBuckets(Box::new(message))
1087    }
1088}
1089
1090impl FromMessage<SubmitClientReports> for EnvelopeProcessor {
1091    type Response = NoResponse;
1092
1093    fn from_message(message: SubmitClientReports, _: ()) -> Self {
1094        Self::SubmitClientReports(Box::new(message))
1095    }
1096}
1097
/// The asynchronous thread pool used for scheduling processing tasks in the processor.
///
/// Tasks are boxed futures with no output.
pub type EnvelopeProcessorServicePool = AsyncPool<BoxFuture<'static, ()>>;
1100
/// Service implementing the [`EnvelopeProcessor`] interface.
///
/// This service handles messages in a worker pool with configurable concurrency.
#[derive(Clone)]
pub struct EnvelopeProcessorService {
    // Shared state; `Clone` on the service is a cheap `Arc` clone.
    inner: Arc<InnerProcessor>,
}
1108
/// Contains the addresses of services that the processor publishes to.
pub struct Addrs {
    /// Address of the outcome aggregator service.
    pub outcome_aggregator: Addr<TrackOutcome>,
    /// Address of the upstream relay service.
    pub upstream_relay: Addr<UpstreamRelay>,
    /// Address of the upload service, if configured (processing mode only).
    #[cfg(feature = "processing")]
    pub upload: Option<Addr<Upload>>,
    /// Address of the store service, if configured (processing mode only).
    #[cfg(feature = "processing")]
    pub store_forwarder: Option<Addr<Store>>,
    /// Address of the metrics aggregator service.
    pub aggregator: Addr<Aggregator>,
}
1119
1120impl Default for Addrs {
1121    fn default() -> Self {
1122        Addrs {
1123            outcome_aggregator: Addr::dummy(),
1124            upstream_relay: Addr::dummy(),
1125            #[cfg(feature = "processing")]
1126            upload: None,
1127            #[cfg(feature = "processing")]
1128            store_forwarder: None,
1129            aggregator: Addr::dummy(),
1130        }
1131    }
1132}
1133
/// Shared state of the [`EnvelopeProcessorService`].
struct InnerProcessor {
    /// Pool on which processing tasks are scheduled.
    pool: EnvelopeProcessorServicePool,
    /// Relay configuration.
    config: Arc<Config>,
    /// Handle to the current global configuration.
    global_config: GlobalConfigHandle,
    /// Handle to the project cache, used e.g. to merge updated rate limits.
    project_cache: ProjectCacheHandle,
    /// COGS measurement sink.
    cogs: Cogs,
    /// Addresses of downstream services the processor publishes to.
    addrs: Addrs,
    /// Redis-backed rate limiter, if Redis is configured (processing mode only).
    #[cfg(feature = "processing")]
    rate_limiter: Option<Arc<RedisRateLimiter>>,
    /// GeoIP lookup; empty if no database is configured.
    geoip_lookup: GeoIpLookup,
    /// Redis-backed cardinality limiter, if Redis is configured (processing mode only).
    #[cfg(feature = "processing")]
    cardinality_limiter: Option<CardinalityLimiter>,
    /// Outcome tracking for metric buckets.
    metric_outcomes: MetricOutcomes,
    /// Type-specific processors of the processing pipeline.
    processing: Processing,
}
1149
/// Type-specific processors of the processing pipeline.
struct Processing {
    /// Processor for logs.
    logs: LogsProcessor,
    /// Processor for trace metrics.
    trace_metrics: TraceMetricsProcessor,
    /// Processor for standalone spans.
    spans: SpansProcessor,
    /// Processor for check-ins.
    check_ins: CheckInsProcessor,
    /// Processor for sessions.
    sessions: SessionsProcessor,
    /// Processor for transactions.
    transactions: TransactionProcessor,
    /// Processor for profile chunks.
    profile_chunks: ProfileChunksProcessor,
    /// Processor for trace attachments.
    trace_attachments: TraceAttachmentsProcessor,
    /// Processor for replays.
    replays: ReplaysProcessor,
}
1161
1162impl EnvelopeProcessorService {
    /// Creates a multi-threaded envelope processor.
    #[cfg_attr(feature = "processing", expect(clippy::too_many_arguments))]
    pub fn new(
        pool: EnvelopeProcessorServicePool,
        config: Arc<Config>,
        global_config: GlobalConfigHandle,
        project_cache: ProjectCacheHandle,
        cogs: Cogs,
        #[cfg(feature = "processing")] redis: Option<RedisClients>,
        addrs: Addrs,
        metric_outcomes: MetricOutcomes,
    ) -> Self {
        // Open the configured GeoIP database. On failure, log the error and fall
        // back to an empty lookup so the processor can still operate without geo
        // enrichment.
        let geoip_lookup = config
            .geoip_path()
            .and_then(
                |p| match GeoIpLookup::open(p).context(ServiceError::GeoIp) {
                    Ok(geoip) => Some(geoip),
                    Err(err) => {
                        relay_log::error!("failed to open GeoIP db {p:?}: {err:?}");
                        None
                    }
                },
            )
            .unwrap_or_else(GeoIpLookup::empty);

        // Split the Redis clients into the pools used for cardinality limiting
        // and for quota enforcement.
        #[cfg(feature = "processing")]
        let (cardinality, quotas) = match redis {
            Some(RedisClients {
                cardinality,
                quotas,
                ..
            }) => (Some(cardinality), Some(quotas)),
            None => (None, None),
        };
        #[cfg(not(feature = "processing"))]
        let quotas = None;

        // Redis-backed rate limiter, only available with the processing feature.
        #[cfg(feature = "processing")]
        let rate_limiter = quotas.clone().map(|redis| {
            RedisRateLimiter::new(redis)
                .max_limit(config.max_rate_limit())
                .cache(config.quota_cache_ratio(), config.quota_cache_max())
        });

        // Quota limiter shared across all type-specific processors below.
        let quota_limiter = Arc::new(QuotaRateLimiter::new(
            #[cfg(feature = "processing")]
            project_cache.clone(),
            #[cfg(feature = "processing")]
            rate_limiter.clone(),
        ));
        #[cfg(feature = "processing")]
        let rate_limiter = rate_limiter.map(Arc::new);

        let inner = InnerProcessor {
            pool,
            global_config,
            project_cache,
            cogs,
            #[cfg(feature = "processing")]
            rate_limiter,
            addrs,
            #[cfg(feature = "processing")]
            cardinality_limiter: cardinality
                .map(|cardinality| {
                    RedisSetLimiter::new(
                        RedisSetLimiterOptions {
                            cache_vacuum_interval: config
                                .cardinality_limiter_cache_vacuum_interval(),
                        },
                        cardinality,
                    )
                })
                .map(CardinalityLimiter::new),
            metric_outcomes,
            // Each processor shares the quota limiter; processors that normalize
            // geo data additionally get a clone of the GeoIP lookup.
            processing: Processing {
                logs: LogsProcessor::new(Arc::clone(&quota_limiter)),
                trace_metrics: TraceMetricsProcessor::new(Arc::clone(&quota_limiter)),
                spans: SpansProcessor::new(Arc::clone(&quota_limiter), geoip_lookup.clone()),
                check_ins: CheckInsProcessor::new(Arc::clone(&quota_limiter)),
                sessions: SessionsProcessor::new(Arc::clone(&quota_limiter)),
                transactions: TransactionProcessor::new(
                    Arc::clone(&quota_limiter),
                    geoip_lookup.clone(),
                    quotas.clone(),
                ),
                profile_chunks: ProfileChunksProcessor::new(Arc::clone(&quota_limiter)),
                trace_attachments: TraceAttachmentsProcessor::new(Arc::clone(&quota_limiter)),
                replays: ReplaysProcessor::new(quota_limiter, geoip_lookup.clone()),
            },
            geoip_lookup,
            config,
        };

        Self {
            inner: Arc::new(inner),
        }
    }
1260
    /// Applies rate limits and quotas to the envelope and its extracted metrics.
    ///
    /// Cached rate limits are always enforced. With the `processing` feature and a
    /// configured Redis rate limiter, quotas are additionally enforced consistently
    /// against Redis and the resulting rate limits are merged back into the
    /// project's cached rate limits.
    ///
    /// Returns the possibly updated event after enforcement.
    async fn enforce_quotas<Group>(
        &self,
        managed_envelope: &mut TypedEnvelope<Group>,
        event: Annotated<Event>,
        extracted_metrics: &mut ProcessingExtractedMetrics,
        ctx: processing::Context<'_>,
    ) -> Result<Annotated<Event>, ProcessingError> {
        // Cached quotas first, they are quick to evaluate and some quotas (indexed) are not
        // applied in the fast path, all cached quotas can be applied here.
        let cached_result = RateLimiter::Cached
            .enforce(managed_envelope, event, extracted_metrics, ctx)
            .await?;

        if_processing!(self.inner.config, {
            let rate_limiter = match self.inner.rate_limiter.clone() {
                Some(rate_limiter) => rate_limiter,
                // Without a Redis rate limiter, cached enforcement is all we can do.
                None => return Ok(cached_result.event),
            };

            // Enforce all quotas consistently with Redis.
            let consistent_result = RateLimiter::Consistent(rate_limiter)
                .enforce(
                    managed_envelope,
                    cached_result.event,
                    extracted_metrics,
                    ctx
                )
                .await?;

            // Update cached rate limits with the freshly computed ones.
            if !consistent_result.rate_limits.is_empty() {
                self.inner
                    .project_cache
                    .get(managed_envelope.scoping().project_key)
                    .rate_limits()
                    .merge(consistent_result.rate_limits);
            }

            Ok(consistent_result.event)
        } else { Ok(cached_result.event) })
    }
1302
    /// Processes the general errors, and the items which require or create the events.
    async fn process_errors(
        &self,
        managed_envelope: &mut TypedEnvelope<ErrorGroup>,
        project_id: ProjectId,
        mut ctx: processing::Context<'_>,
    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
        let mut event_fully_normalized = EventFullyNormalized::new(managed_envelope.envelope());
        let mut metrics = Metrics::default();
        let mut extracted_metrics = ProcessingExtractedMetrics::new();

        // Events can also contain user reports.
        report::process_user_reports(managed_envelope);

        // Expand platform-specific crash report formats (Unreal, PlayStation, Nintendo
        // Switch) into standard envelope items. Processing mode only.
        if_processing!(self.inner.config, {
            unreal::expand(managed_envelope, &self.inner.config)?;
            #[cfg(sentry)]
            playstation::expand(managed_envelope, &self.inner.config, ctx.project_info)?;
            nnswitch::expand(managed_envelope)?;
        });

        // Extract the event item from the envelope.
        let mut event = event::extract(
            managed_envelope,
            &mut metrics,
            event_fully_normalized,
            &self.inner.config,
        )?;

        // Derive event payloads from the expanded crash reports and attachments.
        if_processing!(self.inner.config, {
            if let Some(inner_event_fully_normalized) =
                unreal::process(managed_envelope, &mut event)?
            {
                event_fully_normalized = inner_event_fully_normalized;
            }
            #[cfg(sentry)]
            if let Some(inner_event_fully_normalized) =
                playstation::process(managed_envelope, &mut event, ctx.project_info)?
            {
                event_fully_normalized = inner_event_fully_normalized;
            }
            if let Some(inner_event_fully_normalized) =
                attachment::create_placeholders(managed_envelope, &mut event, &mut metrics)
            {
                event_fully_normalized = inner_event_fully_normalized;
            }
        });

        // Validate the dynamic sampling context and update the sampling project info.
        ctx.sampling_project_info = dynamic_sampling::validate_and_set_dsc(
            managed_envelope,
            &mut event,
            ctx.project_info,
            ctx.sampling_project_info,
        );

        // Finalize the event using its plain attachments, then normalize it.
        let attachments = managed_envelope
            .envelope()
            .items()
            .filter(|item| item.attachment_type() == Some(&AttachmentType::Attachment));
        processing::utils::event::finalize(
            managed_envelope.envelope().headers(),
            &mut event,
            attachments,
            &mut metrics,
            ctx.config,
        )?;
        event_fully_normalized = processing::utils::event::normalize(
            managed_envelope.envelope().headers(),
            &mut event,
            event_fully_normalized,
            project_id,
            ctx,
            &self.inner.geoip_lookup,
        )?;
        // Run inbound filters; a filtered event rejects the envelope and aborts
        // processing with an `EventFiltered` error.
        let filter_run =
            processing::utils::event::filter(managed_envelope.envelope().headers(), &event, &ctx)
                .map_err(|err| {
                managed_envelope.reject(Outcome::Filtered(err.clone()));
                ProcessingError::EventFiltered(err)
            })?;

        // Only tag the error with a sampling decision when filters ran through
        // successfully or this Relay has processing enabled.
        if self.inner.config.processing_enabled() || matches!(filter_run, FiltersStatus::Ok) {
            dynamic_sampling::tag_error_with_sampling_decision(
                managed_envelope,
                &mut event,
                ctx.sampling_project_info,
                &self.inner.config,
            )
            .await;
        }

        event = self
            .enforce_quotas(managed_envelope, event, &mut extracted_metrics, ctx)
            .await?;

        // If the event survived rate limiting, scrub PII and write it back into
        // the envelope.
        if event.value().is_some() {
            processing::utils::event::scrub(&mut event, ctx.project_info)?;
            event::serialize(
                managed_envelope,
                &mut event,
                event_fully_normalized,
                EventMetricsExtracted(false),
                SpansExtracted(false),
            )?;
            event::emit_feedback_metrics(managed_envelope.envelope());
        }

        // Scrub PII from the remaining attachment items.
        let attachments = managed_envelope
            .envelope_mut()
            .items_mut()
            .filter(|i| i.ty() == &ItemType::Attachment);
        processing::utils::attachments::scrub(attachments, ctx.project_info);

        // In processing mode, events must leave Relay fully normalized; log if not.
        if self.inner.config.processing_enabled() && !event_fully_normalized.0 {
            relay_log::error!(
                tags.project = %project_id,
                tags.ty = event_type(&event).map(|e| e.to_string()).unwrap_or("none".to_owned()),
                "ingested event without normalizing"
            );
        }

        Ok(Some(extracted_metrics))
    }
1425
    /// Processes standalone items that require an event ID, but do not have an event on the same envelope.
    async fn process_standalone(
        &self,
        managed_envelope: &mut TypedEnvelope<StandaloneGroup>,
        ctx: processing::Context<'_>,
    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
        let mut extracted_metrics = ProcessingExtractedMetrics::new();

        standalone::process(managed_envelope);

        // Filter profile items based on config and project state, see `profile::filter`.
        profile::filter(managed_envelope, ctx.config, ctx.project_info);

        // Apply rate limits; there is no event on this envelope, hence the empty
        // `Annotated`.
        self.enforce_quotas(
            managed_envelope,
            Annotated::empty(),
            &mut extracted_metrics,
            ctx,
        )
        .await?;

        report::process_user_reports(managed_envelope);
        // Scrub PII from all attachment items remaining in the envelope.
        let attachments = managed_envelope
            .envelope_mut()
            .items_mut()
            .filter(|i| i.ty() == &ItemType::Attachment);
        processing::utils::attachments::scrub(attachments, ctx.project_info);

        Ok(Some(extracted_metrics))
    }
1455
    /// Processes user and client reports.
    async fn process_client_reports(
        &self,
        managed_envelope: &mut TypedEnvelope<ClientReportGroup>,
        ctx: processing::Context<'_>,
    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
        let mut extracted_metrics = ProcessingExtractedMetrics::new();

        // Apply rate limits; there is no event on this envelope, hence the empty
        // `Annotated`.
        self.enforce_quotas(
            managed_envelope,
            Annotated::empty(),
            &mut extracted_metrics,
            ctx,
        )
        .await?;

        // Process the client reports, forwarding results to the outcome aggregator.
        report::process_client_reports(
            managed_envelope,
            ctx.config,
            ctx.project_info,
            self.inner.addrs.outcome_aggregator.clone(),
        );

        Ok(Some(extracted_metrics))
    }
1481
    /// Processes replays.
    async fn process_replays(
        &self,
        managed_envelope: &mut TypedEnvelope<ReplayGroup>,
        ctx: processing::Context<'_>,
    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
        let mut extracted_metrics = ProcessingExtractedMetrics::new();

        // Run replay-specific processing; the GeoIP lookup is made available for
        // enrichment, see `replay::process`.
        replay::process(
            managed_envelope,
            ctx.global_config,
            ctx.config,
            ctx.project_info,
            &self.inner.geoip_lookup,
        )?;

        // Apply rate limits; replays have no associated event, hence the empty
        // `Annotated`.
        self.enforce_quotas(
            managed_envelope,
            Annotated::empty(),
            &mut extracted_metrics,
            ctx,
        )
        .await?;

        Ok(Some(extracted_metrics))
    }
1508
    /// Processes NEL (network error logging) reports.
    ///
    /// The report items are converted into log items and then run through the
    /// regular logs processor.
    async fn process_nel(
        &self,
        mut managed_envelope: ManagedEnvelope,
        ctx: processing::Context<'_>,
    ) -> Result<ProcessingResult, ProcessingError> {
        nel::convert_to_logs(&mut managed_envelope);
        self.process_with_processor(&self.inner.processing.logs, managed_envelope, ctx)
            .await
    }
1518
    /// Runs an envelope through the given [`processing::Processor`].
    ///
    /// The processor first extracts all items it is responsible for from the
    /// envelope. The remaining envelope is accepted if empty, otherwise any
    /// leftover items are rejected as an internal error. The extracted work is
    /// then processed and the output returned as [`ProcessingResult::Output`].
    async fn process_with_processor<P: processing::Processor>(
        &self,
        processor: &P,
        mut managed_envelope: ManagedEnvelope,
        ctx: processing::Context<'_>,
    ) -> Result<ProcessingResult, ProcessingError>
    where
        Outputs: From<P::Output>,
    {
        let Some(work) = processor.prepare_envelope(&mut managed_envelope) else {
            // An envelope routed to a processor which has no work for it indicates
            // a routing bug upstream.
            debug_assert!(
                false,
                "there must be work for the {} processor",
                std::any::type_name::<P>(),
            );
            return Err(ProcessingError::ProcessingGroupMismatch);
        };

        // All relevant items have been moved into `work`; the envelope is expected
        // to be empty at this point.
        managed_envelope.update();
        match managed_envelope.envelope().is_empty() {
            true => managed_envelope.accept(),
            false => managed_envelope.reject(Outcome::Invalid(DiscardReason::Internal)),
        }

        // Failures of the pipeline are logged at debug level and mapped to a
        // generic processing failure.
        processor
            .process(work, ctx)
            .await
            .map_err(|err| {
                relay_log::debug!(
                    error = &err as &dyn std::error::Error,
                    "processing pipeline failed"
                );
                ProcessingError::ProcessingFailure
            })
            .map(|o| o.map(Into::into))
            .map(ProcessingResult::Output)
    }
1556
    /// Processes standalone spans.
    ///
    /// This function does *not* run for spans extracted from transactions.
    async fn process_standalone_spans(
        &self,
        managed_envelope: &mut TypedEnvelope<SpanGroup>,
        _project_id: ProjectId,
        ctx: processing::Context<'_>,
    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
        let mut extracted_metrics = ProcessingExtractedMetrics::new();

        // Filter span items based on config and project state, see `span::filter`.
        span::filter(managed_envelope, ctx.config, ctx.project_info);

        // Full span processing only runs in processing mode; standalone spans have
        // no associated event, hence the empty `Annotated`.
        if_processing!(self.inner.config, {
            span::process(
                managed_envelope,
                &mut Annotated::empty(),
                &mut extracted_metrics,
                _project_id,
                ctx,
                &self.inner.geoip_lookup,
            )
            .await;
        });

        self.enforce_quotas(
            managed_envelope,
            Annotated::empty(),
            &mut extracted_metrics,
            ctx,
        )
        .await?;

        Ok(Some(extracted_metrics))
    }
1592
    /// Routes a grouped envelope to the processing pipeline for its [`ProcessingGroup`].
    ///
    /// First pre-processes envelope headers (DSC transaction parametrization, retention,
    /// project ID), then dispatches either to a typed processor from `self.inner.processing`
    /// or to one of the legacy `process_*` methods via the `run!` macro.
    async fn process_envelope(
        &self,
        project_id: ProjectId,
        message: ProcessEnvelopeGrouped<'_>,
    ) -> Result<ProcessingResult, ProcessingError> {
        let ProcessEnvelopeGrouped {
            group,
            envelope: mut managed_envelope,
            ctx,
        } = message;

        // Pre-process the envelope headers.
        if let Some(sampling_state) = ctx.sampling_project_info {
            // Both transactions and standalone span envelopes need a normalized DSC header
            // to make sampling rules based on the segment/transaction name work correctly.
            managed_envelope
                .envelope_mut()
                .parametrize_dsc_transaction(&sampling_state.config.tx_name_rules);
        }

        // Set the event retention. Effectively, this value will only be available in processing
        // mode when the full project config is queried from the upstream.
        if let Some(retention) = ctx.project_info.config.event_retention {
            managed_envelope.envelope_mut().set_retention(retention);
        }

        // Set the downsampled event retention. Like the event retention above, this value will
        // only be available in processing mode when the full project config is queried from the
        // upstream.
        if let Some(retention) = ctx.project_info.config.downsampled_event_retention {
            managed_envelope
                .envelope_mut()
                .set_downsampled_retention(retention);
        }

        // Ensure the project ID is updated to the stored instance for this project cache. This can
        // differ in two cases:
        //  1. The envelope was sent to the legacy `/store/` endpoint without a project ID.
        //  2. The DSN was moved and the envelope sent to the old project ID.
        managed_envelope
            .envelope_mut()
            .meta_mut()
            .set_project_id(project_id);

        // Converts the envelope to the typed envelope of `group`, runs the given legacy
        // processing function on it, and rejects the envelope with the error's outcome (if any)
        // on failure.
        macro_rules! run {
            ($fn_name:ident $(, $args:expr)*) => {
                async {
                    let mut managed_envelope = (managed_envelope, group).try_into()?;
                    match self.$fn_name(&mut managed_envelope, $($args),*).await {
                        Ok(extracted_metrics) => Ok(ProcessingResult::Envelope {
                            managed_envelope: managed_envelope.into_processed(),
                            extracted_metrics: extracted_metrics.map_or(ProcessingExtractedMetrics::new(), |e| e)
                        }),
                        Err(error) => {
                            relay_log::trace!("Executing {fn} failed: {error}", fn = stringify!($fn_name), error = error);
                            if let Some(outcome) = error.to_outcome() {
                                managed_envelope.reject(outcome);
                            }

                            return Err(error);
                        }
                    }
                }.await
            };
        }

        relay_log::trace!("Processing {group} group", group = group.variant());

        match group {
            ProcessingGroup::Error => run!(process_errors, project_id, ctx),
            ProcessingGroup::Transaction => {
                self.process_with_processor(
                    &self.inner.processing.transactions,
                    managed_envelope,
                    ctx,
                )
                .await
            }
            ProcessingGroup::Session => {
                self.process_with_processor(&self.inner.processing.sessions, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::Standalone => run!(process_standalone, ctx),
            ProcessingGroup::ClientReport => run!(process_client_reports, ctx),
            ProcessingGroup::Replay => {
                // Replays are migrated to the typed processor behind a feature flag; fall back
                // to the legacy implementation otherwise.
                if ctx.project_info.has_feature(Feature::NewReplayProcessing) {
                    self.process_with_processor(
                        &self.inner.processing.replays,
                        managed_envelope,
                        ctx,
                    )
                    .await
                } else {
                    run!(process_replays, ctx)
                }
            }
            ProcessingGroup::CheckIn => {
                self.process_with_processor(&self.inner.processing.check_ins, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::Nel => self.process_nel(managed_envelope, ctx).await,
            ProcessingGroup::Log => {
                self.process_with_processor(&self.inner.processing.logs, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::TraceMetric => {
                self.process_with_processor(
                    &self.inner.processing.trace_metrics,
                    managed_envelope,
                    ctx,
                )
                .await
            }
            ProcessingGroup::SpanV2 => {
                self.process_with_processor(&self.inner.processing.spans, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::TraceAttachment => {
                self.process_with_processor(
                    &self.inner.processing.trace_attachments,
                    managed_envelope,
                    ctx,
                )
                .await
            }
            ProcessingGroup::Span => run!(process_standalone_spans, project_id, ctx),
            ProcessingGroup::ProfileChunk => {
                self.process_with_processor(
                    &self.inner.processing.profile_chunks,
                    managed_envelope,
                    ctx,
                )
                .await
            }
            // Currently is not used.
            ProcessingGroup::Metrics => {
                // In proxy mode we simply forward the metrics.
                // This group shouldn't be used outside of proxy mode.
                if self.inner.config.relay_mode() != RelayMode::Proxy {
                    relay_log::error!(
                        tags.project = %project_id,
                        items = ?managed_envelope.envelope().items().next().map(Item::ty),
                        "received metrics in the process_state"
                    );
                }

                Ok(ProcessingResult::no_metrics(
                    managed_envelope.into_processed(),
                ))
            }
            // Fallback to the legacy process_state implementation for Ungrouped events.
            ProcessingGroup::Ungrouped => {
                relay_log::error!(
                    tags.project = %project_id,
                    items = ?managed_envelope.envelope().items().next().map(Item::ty),
                    "could not identify the processing group based on the envelope's items"
                );

                Ok(ProcessingResult::no_metrics(
                    managed_envelope.into_processed(),
                ))
            }
            // Leave this group unchanged.
            //
            // This will later be forwarded to upstream.
            ProcessingGroup::ForwardUnknown => Ok(ProcessingResult::no_metrics(
                managed_envelope.into_processed(),
            )),
        }
    }
1762
1763    /// Processes the envelope and returns the processed envelope back.
1764    ///
1765    /// Returns `Some` if the envelope passed inbound filtering and rate limiting. Invalid items are
1766    /// removed from the envelope. Otherwise, if the envelope is empty or the entire envelope needs
1767    /// to be dropped, this is `None`.
1768    async fn process<'a>(
1769        &self,
1770        mut message: ProcessEnvelopeGrouped<'a>,
1771    ) -> Result<Option<Submit<'a>>, ProcessingError> {
1772        let ProcessEnvelopeGrouped {
1773            ref mut envelope,
1774            ctx,
1775            ..
1776        } = message;
1777
1778        // Prefer the project's project ID, and fall back to the stated project id from the
1779        // envelope. The project ID is available in all modes, other than in proxy mode, where
1780        // envelopes for unknown projects are forwarded blindly.
1781        //
1782        // Neither ID can be available in proxy mode on the /store/ endpoint. This is not supported,
1783        // since we cannot process an envelope without project ID, so drop it.
1784        let Some(project_id) = ctx
1785            .project_info
1786            .project_id
1787            .or_else(|| envelope.envelope().meta().project_id())
1788        else {
1789            envelope.reject(Outcome::Invalid(DiscardReason::Internal));
1790            return Err(ProcessingError::MissingProjectId);
1791        };
1792
1793        let client = envelope.envelope().meta().client().map(str::to_owned);
1794        let user_agent = envelope.envelope().meta().user_agent().map(str::to_owned);
1795        let project_key = envelope.envelope().meta().public_key();
1796        // Only allow sending to the sampling key, if we successfully loaded a sampling project
1797        // info relating to it. This filters out unknown/invalid project keys as well as project
1798        // keys from different organizations.
1799        let sampling_key = envelope
1800            .envelope()
1801            .sampling_key()
1802            .filter(|_| ctx.sampling_project_info.is_some());
1803
1804        // We set additional information on the scope, which will be removed after processing the
1805        // envelope.
1806        relay_log::configure_scope(|scope| {
1807            scope.set_tag("project", project_id);
1808            if let Some(client) = client {
1809                scope.set_tag("sdk", client);
1810            }
1811            if let Some(user_agent) = user_agent {
1812                scope.set_extra("user_agent", user_agent.into());
1813            }
1814        });
1815
1816        let result = match self.process_envelope(project_id, message).await {
1817            Ok(ProcessingResult::Envelope {
1818                mut managed_envelope,
1819                extracted_metrics,
1820            }) => {
1821                // The envelope could be modified or even emptied during processing, which
1822                // requires re-computation of the context.
1823                managed_envelope.update();
1824
1825                let has_metrics = !extracted_metrics.metrics.project_metrics.is_empty();
1826                send_metrics(
1827                    extracted_metrics.metrics,
1828                    project_key,
1829                    sampling_key,
1830                    &self.inner.addrs.aggregator,
1831                );
1832
1833                let envelope_response = if managed_envelope.envelope().is_empty() {
1834                    if !has_metrics {
1835                        // Individual rate limits have already been issued
1836                        managed_envelope.reject(Outcome::RateLimited(None));
1837                    } else {
1838                        managed_envelope.accept();
1839                    }
1840
1841                    None
1842                } else {
1843                    Some(managed_envelope)
1844                };
1845
1846                Ok(envelope_response.map(Submit::Envelope))
1847            }
1848            Ok(ProcessingResult::Output(Output { main, metrics })) => {
1849                if let Some(metrics) = metrics {
1850                    metrics.accept(|metrics| {
1851                        send_metrics(
1852                            metrics,
1853                            project_key,
1854                            sampling_key,
1855                            &self.inner.addrs.aggregator,
1856                        );
1857                    });
1858                }
1859
1860                let ctx = ctx.to_forward();
1861                Ok(main.map(|output| Submit::Output { output, ctx }))
1862            }
1863            Err(err) => Err(err),
1864        };
1865
1866        relay_log::configure_scope(|scope| {
1867            scope.remove_tag("project");
1868            scope.remove_tag("sdk");
1869            scope.remove_tag("user_agent");
1870        });
1871
1872        result
1873    }
1874
    /// Handles the [`ProcessEnvelope`] message by splitting the envelope into processing groups
    /// and processing each group independently.
    ///
    /// Each split-off envelope gets its own COGS measurement, its own managed wrapper scoped to
    /// the original envelope's scoping, and a freshly assembled processing [`Context`]. Results
    /// are submitted upstream; errors are logged (at error level only if unexpected).
    async fn handle_process_envelope(&self, cogs: &mut Token, message: ProcessEnvelope) {
        let project_key = message.envelope.envelope().meta().public_key();
        let wait_time = message.envelope.age();
        metric!(timer(RelayTimers::EnvelopeWaitTime) = wait_time);

        // This COGS handling may need an overhaul in the future:
        // Cancel the passed in token, to start individual measurements per envelope instead.
        cogs.cancel();

        // Capture the scoping before the envelope is consumed by `split_envelope` below.
        let scoping = message.envelope.scoping();
        for (group, envelope) in ProcessingGroup::split_envelope(
            *message.envelope.into_envelope(),
            &message.project_info,
        ) {
            // Per-group COGS measurement, replacing the cancelled outer token.
            let mut cogs = self
                .inner
                .cogs
                .timed(ResourceId::Relay, AppFeature::from(group));

            let mut envelope =
                ManagedEnvelope::new(envelope, self.inner.addrs.outcome_aggregator.clone());
            envelope.scope(scoping);

            // Bound to a local so the context below can borrow it for this iteration.
            let global_config = self.inner.global_config.current();

            let ctx = processing::Context {
                config: &self.inner.config,
                global_config: &global_config,
                project_info: &message.project_info,
                sampling_project_info: message.sampling_project_info.as_deref(),
                rate_limits: &message.rate_limits,
                reservoir_counters: &message.reservoir_counters,
            };

            let message = ProcessEnvelopeGrouped {
                group,
                envelope,
                ctx,
            };

            let result = metric!(
                timer(RelayTimers::EnvelopeProcessingTime),
                group = group.variant(),
                { self.process(message).await }
            );

            match result {
                Ok(Some(envelope)) => self.submit_upstream(&mut cogs, envelope),
                Ok(None) => {}
                // Unexpected errors indicate a bug or infrastructure problem; log loudly.
                Err(error) if error.is_unexpected() => {
                    relay_log::error!(
                        tags.project_key = %project_key,
                        error = &error as &dyn Error,
                        "error processing envelope"
                    )
                }
                Err(error) => {
                    relay_log::debug!(
                        tags.project_key = %project_key,
                        error = &error as &dyn Error,
                        "error processing envelope"
                    )
                }
            }
        }
    }
1941
1942    fn handle_process_metrics(&self, cogs: &mut Token, message: ProcessMetrics) {
1943        let ProcessMetrics {
1944            data,
1945            project_key,
1946            received_at,
1947            sent_at,
1948            source,
1949        } = message;
1950
1951        let received_timestamp =
1952            UnixTimestamp::from_datetime(received_at).unwrap_or(UnixTimestamp::now());
1953
1954        let mut buckets = data.into_buckets(received_timestamp);
1955        if buckets.is_empty() {
1956            return;
1957        };
1958        cogs.update(relay_metrics::cogs::BySize(&buckets));
1959
1960        let clock_drift_processor =
1961            ClockDriftProcessor::new(sent_at, received_at).at_least(MINIMUM_CLOCK_DRIFT);
1962
1963        buckets.retain_mut(|bucket| {
1964            if let Err(error) = relay_metrics::normalize_bucket(bucket) {
1965                relay_log::debug!(error = &error as &dyn Error, "dropping bucket {bucket:?}");
1966                return false;
1967            }
1968
1969            if !self::metrics::is_valid_namespace(bucket) {
1970                return false;
1971            }
1972
1973            clock_drift_processor.process_timestamp(&mut bucket.timestamp);
1974
1975            if !matches!(source, BucketSource::Internal) {
1976                bucket.metadata = BucketMetadata::new(received_timestamp);
1977            }
1978
1979            true
1980        });
1981
1982        let project = self.inner.project_cache.get(project_key);
1983
1984        // Best effort check to filter and rate limit buckets, if there is no project state
1985        // available at the current time, we will check again after flushing.
1986        let buckets = match project.state() {
1987            ProjectState::Enabled(project_info) => {
1988                let rate_limits = project.rate_limits().current_limits();
1989                self.check_buckets(project_key, project_info, &rate_limits, buckets)
1990            }
1991            _ => buckets,
1992        };
1993
1994        relay_log::trace!("merging metric buckets into the aggregator");
1995        self.inner
1996            .addrs
1997            .aggregator
1998            .send(MergeBuckets::new(project_key, buckets));
1999    }
2000
2001    fn handle_process_batched_metrics(&self, cogs: &mut Token, message: ProcessBatchedMetrics) {
2002        let ProcessBatchedMetrics {
2003            payload,
2004            source,
2005            received_at,
2006            sent_at,
2007        } = message;
2008
2009        #[derive(serde::Deserialize)]
2010        struct Wrapper {
2011            buckets: HashMap<ProjectKey, Vec<Bucket>>,
2012        }
2013
2014        let buckets = match serde_json::from_slice(&payload) {
2015            Ok(Wrapper { buckets }) => buckets,
2016            Err(error) => {
2017                relay_log::debug!(
2018                    error = &error as &dyn Error,
2019                    "failed to parse batched metrics",
2020                );
2021                metric!(counter(RelayCounters::MetricBucketsParsingFailed) += 1);
2022                return;
2023            }
2024        };
2025
2026        for (project_key, buckets) in buckets {
2027            self.handle_process_metrics(
2028                cogs,
2029                ProcessMetrics {
2030                    data: MetricData::Parsed(buckets),
2031                    project_key,
2032                    source,
2033                    received_at,
2034                    sent_at,
2035                },
2036            )
2037        }
2038    }
2039
    /// Submits a processing result either to the store (processing mode) or to the upstream Relay.
    ///
    /// In processing mode with a store forwarder configured, envelopes and outputs are forwarded
    /// to Kafka (optionally routing attachment uploads through the `UploadService`). Otherwise,
    /// the result is serialized into an envelope, encoded with the configured HTTP encoding, and
    /// sent to the upstream endpoint.
    fn submit_upstream(&self, cogs: &mut Token, submit: Submit<'_>) {
        let _submit = cogs.start_category("submit");

        #[cfg(feature = "processing")]
        if self.inner.config.processing_enabled()
            && let Some(store_forwarder) = &self.inner.addrs.store_forwarder
        {
            use crate::processing::StoreHandle;

            let upload = self.inner.addrs.upload.as_ref();
            match submit {
                Submit::Envelope(envelope) => {
                    let envelope_has_attachments = envelope
                        .envelope()
                        .items()
                        .any(|item| *item.ty() == ItemType::Attachment);
                    // Whether Relay will store this attachment in objectstore or use kafka like before.
                    // Evaluated lazily so the global config read and sampling only happen when
                    // the envelope actually has attachments and an upload service exists.
                    let use_objectstore = || {
                        let options = &self.inner.global_config.current().options;
                        utils::sample(options.objectstore_attachments_sample_rate).is_keep()
                    };

                    if let Some(upload) = &self.inner.addrs.upload
                        && envelope_has_attachments
                        && use_objectstore()
                    {
                        // the `UploadService` will upload all attachments, and then forward the envelope to the `StoreService`.
                        upload.send(StoreEnvelope { envelope })
                    } else {
                        store_forwarder.send(StoreEnvelope { envelope })
                    }
                }
                Submit::Output { output, ctx } => output
                    .forward_store(StoreHandle::new(store_forwarder, upload), ctx)
                    .unwrap_or_else(|err| err.into_inner()),
            }
            return;
        }

        // Non-processing path: everything is converted into an envelope for the upstream.
        let mut envelope = match submit {
            Submit::Envelope(envelope) => envelope,
            Submit::Output { output, ctx } => match output.serialize_envelope(ctx) {
                Ok(envelope) => ManagedEnvelope::from(envelope).into_processed(),
                Err(_) => {
                    relay_log::error!("failed to serialize output to an envelope");
                    return;
                }
            },
        };

        if envelope.envelope_mut().is_empty() {
            envelope.accept();
            return;
        }

        // Override the `sent_at` timestamp. Since the envelope went through basic
        // normalization, all timestamps have been corrected. We propagate the new
        // `sent_at` to allow the next Relay to double-check this timestamp and
        // potentially apply correction again. This is done as close to sending as
        // possible so that we avoid internal delays.
        envelope.envelope_mut().set_sent_at(Utc::now());

        relay_log::trace!("sending envelope to sentry endpoint");
        let http_encoding = self.inner.config.http_encoding();
        let result = envelope.envelope().to_vec().and_then(|v| {
            encode_payload(&v.into(), http_encoding).map_err(EnvelopeError::PayloadIoFailed)
        });

        match result {
            Ok(body) => {
                self.inner
                    .addrs
                    .upstream_relay
                    .send(SendRequest(SendEnvelope {
                        envelope,
                        body,
                        http_encoding,
                        project_cache: self.inner.project_cache.clone(),
                    }));
            }
            Err(error) => {
                // Errors are only logged for what we consider an internal discard reason. These
                // indicate errors in the infrastructure or implementation bugs.
                relay_log::error!(
                    error = &error as &dyn Error,
                    tags.project_key = %envelope.scoping().project_key,
                    "failed to serialize envelope payload"
                );

                envelope.reject(Outcome::Invalid(DiscardReason::Internal));
            }
        }
    }
2133
2134    fn handle_submit_client_reports(&self, cogs: &mut Token, message: SubmitClientReports) {
2135        let SubmitClientReports {
2136            client_reports,
2137            scoping,
2138        } = message;
2139
2140        let upstream = self.inner.config.upstream_descriptor();
2141        let dsn = PartialDsn::outbound(&scoping, upstream);
2142
2143        let mut envelope = Envelope::from_request(None, RequestMeta::outbound(dsn));
2144        for client_report in client_reports {
2145            match client_report.serialize() {
2146                Ok(payload) => {
2147                    let mut item = Item::new(ItemType::ClientReport);
2148                    item.set_payload(ContentType::Json, payload);
2149                    envelope.add_item(item);
2150                }
2151                Err(error) => {
2152                    relay_log::error!(
2153                        error = &error as &dyn std::error::Error,
2154                        "failed to serialize client report"
2155                    );
2156                }
2157            }
2158        }
2159
2160        let envelope = ManagedEnvelope::new(envelope, self.inner.addrs.outcome_aggregator.clone());
2161        self.submit_upstream(cogs, Submit::Envelope(envelope.into_processed()));
2162    }
2163
2164    fn check_buckets(
2165        &self,
2166        project_key: ProjectKey,
2167        project_info: &ProjectInfo,
2168        rate_limits: &RateLimits,
2169        buckets: Vec<Bucket>,
2170    ) -> Vec<Bucket> {
2171        let Some(scoping) = project_info.scoping(project_key) else {
2172            relay_log::error!(
2173                tags.project_key = project_key.as_str(),
2174                "there is no scoping: dropping {} buckets",
2175                buckets.len(),
2176            );
2177            return Vec::new();
2178        };
2179
2180        let mut buckets = self::metrics::apply_project_info(
2181            buckets,
2182            &self.inner.metric_outcomes,
2183            project_info,
2184            scoping,
2185        );
2186
2187        let namespaces: BTreeSet<MetricNamespace> = buckets
2188            .iter()
2189            .filter_map(|bucket| bucket.name.try_namespace())
2190            .collect();
2191
2192        for namespace in namespaces {
2193            let limits = rate_limits.check_with_quotas(
2194                project_info.get_quotas(),
2195                scoping.item(DataCategory::MetricBucket),
2196            );
2197
2198            if limits.is_limited() {
2199                let rejected;
2200                (buckets, rejected) = utils::split_off(buckets, |bucket| {
2201                    bucket.name.try_namespace() == Some(namespace)
2202                });
2203
2204                let reason_code = limits.longest().and_then(|limit| limit.reason_code.clone());
2205                self.inner.metric_outcomes.track(
2206                    scoping,
2207                    &rejected,
2208                    Outcome::RateLimited(reason_code),
2209                );
2210            }
2211        }
2212
2213        let quotas = project_info.config.quotas.clone();
2214        match MetricsLimiter::create(buckets, quotas, scoping) {
2215            Ok(mut bucket_limiter) => {
2216                bucket_limiter.enforce_limits(rate_limits, &self.inner.metric_outcomes);
2217                bucket_limiter.into_buckets()
2218            }
2219            Err(buckets) => buckets,
2220        }
2221    }
2222
    /// Rate limits metric buckets against Redis-backed quotas (processing mode only).
    ///
    /// Counts buckets per namespace, checks each namespace against the combined global and
    /// project quotas, drops limited buckets with a `RateLimited` outcome, and caches new
    /// limits in the project cache. Remaining buckets are then passed through the
    /// transaction/span limiter. Returns the surviving buckets.
    #[cfg(feature = "processing")]
    async fn rate_limit_buckets(
        &self,
        scoping: Scoping,
        project_info: &ProjectInfo,
        mut buckets: Vec<Bucket>,
    ) -> Vec<Bucket> {
        // Without a configured rate limiter (e.g. Redis disabled), pass everything through.
        let Some(rate_limiter) = &self.inner.rate_limiter else {
            return buckets;
        };

        let global_config = self.inner.global_config.current();
        // Quantity of buckets per namespace, used as the rate-limit increment below.
        let namespaces = buckets
            .iter()
            .filter_map(|bucket| bucket.name.try_namespace())
            .counts();

        let quotas = CombinedQuotas::new(&global_config, project_info.get_quotas());

        for (namespace, quantity) in namespaces {
            let item_scoping = scoping.metric_bucket(namespace);

            let limits = match rate_limiter
                .is_rate_limited(quotas, item_scoping, quantity, false)
                .await
            {
                Ok(limits) => limits,
                Err(err) => {
                    // On Redis failure, stop limiting rather than dropping data.
                    relay_log::error!(
                        error = &err as &dyn std::error::Error,
                        "failed to check redis rate limits"
                    );
                    break;
                }
            };

            if limits.is_limited() {
                let rejected;
                (buckets, rejected) = utils::split_off(buckets, |bucket| {
                    bucket.name.try_namespace() == Some(namespace)
                });

                let reason_code = limits.longest().and_then(|limit| limit.reason_code.clone());
                self.inner.metric_outcomes.track(
                    scoping,
                    &rejected,
                    Outcome::RateLimited(reason_code),
                );

                // Cache the new limits so subsequent envelopes are limited without Redis trips.
                self.inner
                    .project_cache
                    .get(item_scoping.scoping.project_key)
                    .rate_limits()
                    .merge(limits);
            }
        }

        match MetricsLimiter::create(buckets, project_info.config.quotas.clone(), scoping) {
            // `Err` means no transaction/span-counted buckets were present; pass them through.
            Err(buckets) => buckets,
            Ok(bucket_limiter) => self.apply_other_rate_limits(bucket_limiter).await,
        }
    }
2285
2286    /// Check and apply rate limits to metrics buckets for transactions and spans.
2287    #[cfg(feature = "processing")]
2288    async fn apply_other_rate_limits(&self, mut bucket_limiter: MetricsLimiter) -> Vec<Bucket> {
2289        relay_log::trace!("handle_rate_limit_buckets");
2290
2291        let scoping = *bucket_limiter.scoping();
2292
2293        if let Some(rate_limiter) = self.inner.rate_limiter.as_ref() {
2294            let global_config = self.inner.global_config.current();
2295            let quotas = CombinedQuotas::new(&global_config, bucket_limiter.quotas());
2296
2297            // We set over_accept_once such that the limit is actually reached, which allows subsequent
2298            // calls with quantity=0 to be rate limited.
2299            let over_accept_once = true;
2300            let mut rate_limits = RateLimits::new();
2301
2302            for category in [DataCategory::Transaction, DataCategory::Span] {
2303                let count = bucket_limiter.count(category);
2304
2305                let timer = Instant::now();
2306                let mut is_limited = false;
2307
2308                if let Some(count) = count {
2309                    match rate_limiter
2310                        .is_rate_limited(quotas, scoping.item(category), count, over_accept_once)
2311                        .await
2312                    {
2313                        Ok(limits) => {
2314                            is_limited = limits.is_limited();
2315                            rate_limits.merge(limits)
2316                        }
2317                        Err(e) => relay_log::error!(error = &e as &dyn Error),
2318                    }
2319                }
2320
2321                relay_statsd::metric!(
2322                    timer(RelayTimers::RateLimitBucketsDuration) = timer.elapsed(),
2323                    category = category.name(),
2324                    limited = if is_limited { "true" } else { "false" },
2325                    count = match count {
2326                        None => "none",
2327                        Some(0) => "0",
2328                        Some(1) => "1",
2329                        Some(1..=10) => "10",
2330                        Some(1..=25) => "25",
2331                        Some(1..=50) => "50",
2332                        Some(51..=100) => "100",
2333                        Some(101..=500) => "500",
2334                        _ => "> 500",
2335                    },
2336                );
2337            }
2338
2339            if rate_limits.is_limited() {
2340                let was_enforced =
2341                    bucket_limiter.enforce_limits(&rate_limits, &self.inner.metric_outcomes);
2342
2343                if was_enforced {
2344                    // Update the rate limits in the project cache.
2345                    self.inner
2346                        .project_cache
2347                        .get(scoping.project_key)
2348                        .rate_limits()
2349                        .merge(rate_limits);
2350                }
2351            }
2352        }
2353
2354        bucket_limiter.into_buckets()
2355    }
2356
    /// Cardinality limits the passed buckets and returns a filtered vector of only accepted buckets.
    #[cfg(feature = "processing")]
    async fn cardinality_limit_buckets(
        &self,
        scoping: Scoping,
        limits: &[CardinalityLimit],
        buckets: Vec<Bucket>,
    ) -> Vec<Bucket> {
        let global_config = self.inner.global_config.current();
        let cardinality_limiter_mode = global_config.options.cardinality_limiter_mode;

        // Limiting disabled globally: pass all buckets through untouched.
        if matches!(cardinality_limiter_mode, CardinalityLimiterMode::Disabled) {
            return buckets;
        }

        // No limiter configured on this instance: nothing to enforce.
        let Some(ref limiter) = self.inner.cardinality_limiter else {
            return buckets;
        };

        let scope = relay_cardinality::Scoping {
            organization_id: scoping.organization_id,
            project_id: scoping.project_id,
        };

        let limits = match limiter
            .check_cardinality_limits(scope, limits, buckets)
            .await
        {
            Ok(limits) => limits,
            // On limiter failure the original buckets are handed back and accepted as-is.
            Err((buckets, error)) => {
                relay_log::error!(
                    error = &error as &dyn std::error::Error,
                    "cardinality limiter failed"
                );
                return buckets;
            }
        };

        // Sampled error logging for exceeded limits, tagged per organization.
        let error_sample_rate = global_config.options.cardinality_limiter_error_sample_rate;
        if !limits.exceeded_limits().is_empty() && utils::sample(error_sample_rate).is_keep() {
            for limit in limits.exceeded_limits() {
                relay_log::with_scope(
                    |scope| {
                        // Set the organization as user so we can alert on distinct org_ids.
                        scope.set_user(Some(relay_log::sentry::User {
                            id: Some(scoping.organization_id.to_string()),
                            ..Default::default()
                        }));
                    },
                    || {
                        relay_log::error!(
                            tags.organization_id = scoping.organization_id.value(),
                            tags.limit_id = limit.id,
                            tags.passive = limit.passive,
                            "Cardinality Limit"
                        );
                    },
                );
            }
        }

        // Emit cardinality report outcomes regardless of the limiter mode.
        for (limit, reports) in limits.cardinality_reports() {
            for report in reports {
                self.inner
                    .metric_outcomes
                    .cardinality(scoping, limit, report);
            }
        }

        // Passive mode records limits and reports but rejects nothing.
        if matches!(cardinality_limiter_mode, CardinalityLimiterMode::Passive) {
            return limits.into_source();
        }

        let CardinalityLimitsSplit { accepted, rejected } = limits.into_split();

        // Track an outcome for every rejected bucket before dropping it.
        for (bucket, exceeded) in rejected {
            self.inner.metric_outcomes.track(
                scoping,
                &[bucket],
                Outcome::CardinalityLimited(exceeded.id.clone()),
            );
        }
        accepted
    }
2441
2442    /// Processes metric buckets and sends them to kafka.
2443    ///
2444    /// This function runs the following steps:
2445    ///  - cardinality limiting
2446    ///  - rate limiting
2447    ///  - submit to `StoreForwarder`
2448    #[cfg(feature = "processing")]
2449    async fn encode_metrics_processing(
2450        &self,
2451        message: FlushBuckets,
2452        store_forwarder: &Addr<Store>,
2453    ) {
2454        use crate::constants::DEFAULT_EVENT_RETENTION;
2455        use crate::services::store::StoreMetrics;
2456
2457        for ProjectBuckets {
2458            buckets,
2459            scoping,
2460            project_info,
2461            ..
2462        } in message.buckets.into_values()
2463        {
2464            let buckets = self
2465                .rate_limit_buckets(scoping, &project_info, buckets)
2466                .await;
2467
2468            let limits = project_info.get_cardinality_limits();
2469            let buckets = self
2470                .cardinality_limit_buckets(scoping, limits, buckets)
2471                .await;
2472
2473            if buckets.is_empty() {
2474                continue;
2475            }
2476
2477            let retention = project_info
2478                .config
2479                .event_retention
2480                .unwrap_or(DEFAULT_EVENT_RETENTION);
2481
2482            // The store forwarder takes care of bucket splitting internally, so we can submit the
2483            // entire list of buckets. There is no batching needed here.
2484            store_forwarder.send(StoreMetrics {
2485                buckets,
2486                scoping,
2487                retention,
2488            });
2489        }
2490    }
2491
2492    /// Serializes metric buckets to JSON and sends them to the upstream.
2493    ///
2494    /// This function runs the following steps:
2495    ///  - partitioning
2496    ///  - batching by configured size limit
2497    ///  - serialize to JSON and pack in an envelope
2498    ///  - submit the envelope to upstream or kafka depending on configuration
2499    ///
2500    /// Cardinality limiting and rate limiting run only in processing Relays as they both require
2501    /// access to the central Redis instance. Cached rate limits are applied in the project cache
2502    /// already.
2503    fn encode_metrics_envelope(&self, cogs: &mut Token, message: FlushBuckets) {
2504        let FlushBuckets {
2505            partition_key,
2506            buckets,
2507        } = message;
2508
2509        let batch_size = self.inner.config.metrics_max_batch_size_bytes();
2510        let upstream = self.inner.config.upstream_descriptor();
2511
2512        for ProjectBuckets {
2513            buckets, scoping, ..
2514        } in buckets.values()
2515        {
2516            let dsn = PartialDsn::outbound(scoping, upstream);
2517
2518            relay_statsd::metric!(
2519                distribution(RelayDistributions::PartitionKeys) = u64::from(partition_key)
2520            );
2521
2522            let mut num_batches = 0;
2523            for batch in BucketsView::from(buckets).by_size(batch_size) {
2524                let mut envelope = Envelope::from_request(None, RequestMeta::outbound(dsn.clone()));
2525
2526                let mut item = Item::new(ItemType::MetricBuckets);
2527                item.set_source_quantities(crate::metrics::extract_quantities(batch));
2528                item.set_payload(ContentType::Json, serde_json::to_vec(&buckets).unwrap());
2529                envelope.add_item(item);
2530
2531                let mut envelope =
2532                    ManagedEnvelope::new(envelope, self.inner.addrs.outcome_aggregator.clone());
2533                envelope
2534                    .set_partition_key(Some(partition_key))
2535                    .scope(*scoping);
2536
2537                relay_statsd::metric!(
2538                    distribution(RelayDistributions::BucketsPerBatch) = batch.len() as u64
2539                );
2540
2541                self.submit_upstream(cogs, Submit::Envelope(envelope.into_processed()));
2542                num_batches += 1;
2543            }
2544
2545            relay_statsd::metric!(
2546                distribution(RelayDistributions::BatchesPerPartition) = num_batches
2547            );
2548        }
2549    }
2550
2551    /// Creates a [`SendMetricsRequest`] and sends it to the upstream relay.
2552    fn send_global_partition(&self, partition_key: u32, partition: &mut Partition<'_>) {
2553        if partition.is_empty() {
2554            return;
2555        }
2556
2557        let (unencoded, project_info) = partition.take();
2558        let http_encoding = self.inner.config.http_encoding();
2559        let encoded = match encode_payload(&unencoded, http_encoding) {
2560            Ok(payload) => payload,
2561            Err(error) => {
2562                let error = &error as &dyn std::error::Error;
2563                relay_log::error!(error, "failed to encode metrics payload");
2564                return;
2565            }
2566        };
2567
2568        let request = SendMetricsRequest {
2569            partition_key: partition_key.to_string(),
2570            unencoded,
2571            encoded,
2572            project_info,
2573            http_encoding,
2574            metric_outcomes: self.inner.metric_outcomes.clone(),
2575        };
2576
2577        self.inner.addrs.upstream_relay.send(SendRequest(request));
2578    }
2579
    /// Serializes metric buckets to JSON and sends them to the upstream via the global endpoint.
    ///
    /// This function is similar to [`Self::encode_metrics_envelope`], but sends a global batched
    /// payload directly instead of per-project Envelopes.
    ///
    /// This function runs the following steps:
    ///  - partitioning
    ///  - batching by configured size limit
    ///  - serialize to JSON
    ///  - submit directly to the upstream
    ///
    /// Cardinality limiting and rate limiting run only in processing Relays as they both require
    /// access to the central Redis instance. Cached rate limits are applied in the project cache
    /// already.
    fn encode_metrics_global(&self, message: FlushBuckets) {
        let FlushBuckets {
            partition_key,
            buckets,
        } = message;

        let batch_size = self.inner.config.metrics_max_batch_size_bytes();
        let mut partition = Partition::new(batch_size);
        let mut partition_splits = 0;

        for ProjectBuckets {
            buckets, scoping, ..
        } in buckets.values()
        {
            for bucket in buckets {
                // A single bucket can exceed the batch size; it is then split across
                // multiple partition payloads by the loop below.
                let mut remaining = Some(BucketView::new(bucket));

                while let Some(bucket) = remaining.take() {
                    if let Some(next) = partition.insert(bucket, *scoping) {
                        // A part of the bucket could not be inserted. Take the partition and submit
                        // it immediately. Repeat until the final part was inserted. This should
                        // always result in a request, otherwise we would enter an endless loop.
                        self.send_global_partition(partition_key, &mut partition);
                        remaining = Some(next);
                        partition_splits += 1;
                    }
                }
            }
        }

        if partition_splits > 0 {
            metric!(distribution(RelayDistributions::PartitionSplits) = partition_splits);
        }

        // Flush whatever remains in the final, partially filled partition.
        self.send_global_partition(partition_key, &mut partition);
    }
2630
    /// Applies cached rate limits to flushed buckets and forwards them for encoding.
    ///
    /// In processing Relays with a configured store forwarder, buckets are submitted to
    /// the store; otherwise they are serialized and sent upstream, either as one global
    /// batched payload or as per-project envelopes depending on `http.global_metrics`.
    async fn handle_flush_buckets(&self, cogs: &mut Token, mut message: FlushBuckets) {
        // Check buckets against the project's cached rate limits before any encoding.
        for (project_key, pb) in message.buckets.iter_mut() {
            let buckets = std::mem::take(&mut pb.buckets);
            pb.buckets =
                self.check_buckets(*project_key, &pb.project_info, &pb.rate_limits, buckets);
        }

        #[cfg(feature = "processing")]
        if self.inner.config.processing_enabled()
            && let Some(ref store_forwarder) = self.inner.addrs.store_forwarder
        {
            return self
                .encode_metrics_processing(message, store_forwarder)
                .await;
        }

        if self.inner.config.http_global_metrics() {
            self.encode_metrics_global(message)
        } else {
            self.encode_metrics_envelope(cogs, message)
        }
    }
2653
    /// Returns `true` if this processor was built with a Redis rate limiter (test helper).
    #[cfg(all(test, feature = "processing"))]
    fn redis_rate_limiter_enabled(&self) -> bool {
        self.inner.rate_limiter.is_some()
    }
2658
    /// Dispatches a single [`EnvelopeProcessor`] message to its handler.
    ///
    /// Records the processing duration per message variant and attributes COGS
    /// tokens based on [`Self::feature_weights`].
    async fn handle_message(&self, message: EnvelopeProcessor) {
        let ty = message.variant();
        let feature_weights = self.feature_weights(&message);

        metric!(timer(RelayTimers::ProcessMessageDuration), message = ty, {
            let mut cogs = self.inner.cogs.timed(ResourceId::Relay, feature_weights);

            match message {
                EnvelopeProcessor::ProcessEnvelope(m) => {
                    self.handle_process_envelope(&mut cogs, *m).await
                }
                EnvelopeProcessor::ProcessProjectMetrics(m) => {
                    self.handle_process_metrics(&mut cogs, *m)
                }
                EnvelopeProcessor::ProcessBatchedMetrics(m) => {
                    self.handle_process_batched_metrics(&mut cogs, *m)
                }
                EnvelopeProcessor::FlushBuckets(m) => {
                    self.handle_flush_buckets(&mut cogs, *m).await
                }
                EnvelopeProcessor::SubmitClientReports(m) => {
                    self.handle_submit_client_reports(&mut cogs, *m)
                }
            }
        });
    }
2685
2686    fn feature_weights(&self, message: &EnvelopeProcessor) -> FeatureWeights {
2687        match message {
2688            // Envelope is split later and tokens are attributed then.
2689            EnvelopeProcessor::ProcessEnvelope(_) => AppFeature::Unattributed.into(),
2690            EnvelopeProcessor::ProcessProjectMetrics(_) => AppFeature::Unattributed.into(),
2691            EnvelopeProcessor::ProcessBatchedMetrics(_) => AppFeature::Unattributed.into(),
2692            EnvelopeProcessor::FlushBuckets(v) => v
2693                .buckets
2694                .values()
2695                .map(|s| {
2696                    if self.inner.config.processing_enabled() {
2697                        // Processing does not encode the metrics but instead rate and cardinality
2698                        // limits the metrics, which scales by count and not size.
2699                        relay_metrics::cogs::ByCount(&s.buckets).into()
2700                    } else {
2701                        relay_metrics::cogs::BySize(&s.buckets).into()
2702                    }
2703                })
2704                .fold(FeatureWeights::none(), FeatureWeights::merge),
2705            EnvelopeProcessor::SubmitClientReports(_) => AppFeature::ClientReports.into(),
2706        }
2707    }
2708}
2709
impl Service for EnvelopeProcessorService {
    type Interface = EnvelopeProcessor;

    async fn run(self, mut rx: relay_system::Receiver<Self::Interface>) {
        // Receive messages until all senders are dropped. Each message is handled
        // on the processor's worker pool rather than on this receiver task; the
        // clone shares the inner state via `Arc`-like internals (the service
        // derives `Clone`).
        while let Some(message) = rx.recv().await {
            let service = self.clone();
            self.inner
                .pool
                .spawn_async(
                    async move {
                        service.handle_message(message).await;
                    }
                    .boxed(),
                )
                .await;
        }
    }
}
2728
/// Result of the enforcement of rate limiting.
///
/// If the event is already `None` or it's rate limited, it will be `None`
/// within the [`Annotated`].
struct EnforcementResult {
    /// The event remaining after enforcement, or an empty [`Annotated`] when rate limited.
    event: Annotated<Event>,
    /// Rate limits active during enforcement; only read when the `processing` feature is on.
    #[cfg_attr(not(feature = "processing"), expect(dead_code))]
    rate_limits: RateLimits,
}
2738
impl EnforcementResult {
    /// Creates a new [`EnforcementResult`] from an event and the rate limits that applied.
    pub fn new(event: Annotated<Event>, rate_limits: RateLimits) -> Self {
        Self { event, rate_limits }
    }
}
2745
/// Strategy used to enforce rate limits on an envelope.
#[derive(Clone)]
enum RateLimiter {
    /// Checks only rate limits cached locally in the project state.
    Cached,
    /// Consistently enforces rate limits against the central Redis instance.
    #[cfg(feature = "processing")]
    Consistent(Arc<RedisRateLimiter>),
}
2752
impl RateLimiter {
    /// Enforces rate limits on the envelope and the extracted event.
    ///
    /// Limited items are removed from the envelope with outcomes recorded. Returns an
    /// [`EnforcementResult`] carrying the event (emptied if the event itself was
    /// limited) and the rate limits that were active.
    async fn enforce<Group>(
        &self,
        managed_envelope: &mut TypedEnvelope<Group>,
        event: Annotated<Event>,
        _extracted_metrics: &mut ProcessingExtractedMetrics,
        ctx: processing::Context<'_>,
    ) -> Result<EnforcementResult, ProcessingError> {
        // Nothing to enforce on an empty envelope without an event.
        if managed_envelope.envelope().is_empty() && event.value().is_none() {
            return Ok(EnforcementResult::new(event, RateLimits::default()));
        }

        let quotas = CombinedQuotas::new(ctx.global_config, ctx.project_info.get_quotas());
        if quotas.is_empty() {
            return Ok(EnforcementResult::new(event, RateLimits::default()));
        }

        let event_category = event_category(&event);

        // We extract the rate limiters, in case we perform consistent rate limiting, since we will
        // need Redis access.
        //
        // When invoking the rate limiter, capture if the event item has been rate limited to also
        // remove it from the processing state eventually.
        let this = self.clone();
        let mut envelope_limiter =
            EnvelopeLimiter::new(CheckLimits::All, move |item_scope, _quantity| {
                let this = this.clone();

                async move {
                    match this {
                        // Consistent mode checks quotas against Redis.
                        #[cfg(feature = "processing")]
                        RateLimiter::Consistent(rate_limiter) => Ok::<_, ProcessingError>(
                            rate_limiter
                                .is_rate_limited(quotas, item_scope, _quantity, false)
                                .await?,
                        ),
                        // Cached mode only consults locally cached rate limits.
                        _ => Ok::<_, ProcessingError>(
                            ctx.rate_limits.check_with_quotas(quotas, item_scope),
                        ),
                    }
                }
            });

        // Tell the envelope limiter about the event, since it has been removed from the Envelope at
        // this stage in processing.
        if let Some(category) = event_category {
            envelope_limiter.assume_event(category);
        }

        let scoping = managed_envelope.scoping();
        let (enforcement, rate_limits) = metric!(timer(RelayTimers::EventProcessingRateLimiting), type = self.name(), {
            envelope_limiter
                .compute(managed_envelope.envelope_mut(), &scoping)
                .await
        })?;
        let event_active = enforcement.is_event_active();

        // Use the same rate limits as used for the envelope on the metrics.
        // Those rate limits should not be checked for expiry or similar to ensure a consistent
        // limiting of envelope items and metrics.
        #[cfg(feature = "processing")]
        _extracted_metrics.apply_enforcement(&enforcement, matches!(self, Self::Consistent(_)));
        enforcement.apply_with_outcomes(managed_envelope);

        if event_active {
            debug_assert!(managed_envelope.envelope().is_empty());
            return Ok(EnforcementResult::new(Annotated::empty(), rate_limits));
        }

        Ok(EnforcementResult::new(event, rate_limits))
    }

    /// Returns the limiter's name for use as a metric tag.
    fn name(&self) -> &'static str {
        match self {
            Self::Cached => "cached",
            #[cfg(feature = "processing")]
            Self::Consistent(_) => "consistent",
        }
    }
}
2834
2835pub fn encode_payload(body: &Bytes, http_encoding: HttpEncoding) -> Result<Bytes, std::io::Error> {
2836    let envelope_body: Vec<u8> = match http_encoding {
2837        HttpEncoding::Identity => return Ok(body.clone()),
2838        HttpEncoding::Deflate => {
2839            let mut encoder = ZlibEncoder::new(Vec::new(), Compression::default());
2840            encoder.write_all(body.as_ref())?;
2841            encoder.finish()?
2842        }
2843        HttpEncoding::Gzip => {
2844            let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
2845            encoder.write_all(body.as_ref())?;
2846            encoder.finish()?
2847        }
2848        HttpEncoding::Br => {
2849            // Use default buffer size (via 0), medium quality (5), and the default lgwin (22).
2850            let mut encoder = BrotliEncoder::new(Vec::new(), 0, 5, 22);
2851            encoder.write_all(body.as_ref())?;
2852            encoder.into_inner()
2853        }
2854        HttpEncoding::Zstd => {
2855            // Use the fastest compression level, our main objective here is to get the best
2856            // compression ratio for least amount of time spent.
2857            let mut encoder = ZstdEncoder::new(Vec::new(), 1)?;
2858            encoder.write_all(body.as_ref())?;
2859            encoder.finish()?
2860        }
2861    };
2862
2863    Ok(envelope_body.into())
2864}
2865
/// An upstream request that submits an envelope via HTTP.
#[derive(Debug)]
pub struct SendEnvelope {
    /// The envelope to submit; outcomes are resolved in [`UpstreamRequest::respond`].
    pub envelope: TypedEnvelope<Processed>,
    /// The serialized envelope body, already encoded per `http_encoding`.
    pub body: Bytes,
    /// Content encoding applied to `body`.
    pub http_encoding: HttpEncoding,
    /// Project cache handle used to merge rate limits returned by the upstream.
    pub project_cache: ProjectCacheHandle,
}
2874
impl UpstreamRequest for SendEnvelope {
    fn method(&self) -> reqwest::Method {
        reqwest::Method::POST
    }

    fn path(&self) -> Cow<'_, str> {
        format!("/api/{}/envelope/", self.envelope.scoping().project_id).into()
    }

    fn route(&self) -> &'static str {
        "envelope"
    }

    /// Builds the HTTP request: forwards the client's request metadata as headers and
    /// attaches the pre-encoded envelope body.
    fn build(&mut self, builder: &mut http::RequestBuilder) -> Result<(), http::HttpError> {
        let envelope_body = self.body.clone();
        metric!(
            distribution(RelayDistributions::UpstreamEnvelopeBodySize) = envelope_body.len() as u64
        );

        let meta = &self.envelope.meta();
        let shard = self.envelope.partition_key().map(|p| p.to_string());
        builder
            .content_encoding(self.http_encoding)
            .header_opt("Origin", meta.origin().map(|url| url.as_str()))
            .header_opt("User-Agent", meta.user_agent())
            .header("X-Sentry-Auth", meta.auth_header())
            .header("X-Forwarded-For", meta.forwarded_for())
            .header("Content-Type", envelope::CONTENT_TYPE)
            .header_opt("X-Sentry-Relay-Shard", shard)
            .body(envelope_body);

        Ok(())
    }

    fn sign(&mut self) -> Option<Sign> {
        Some(Sign::Optional(SignatureType::RequestSign))
    }

    /// Resolves envelope outcomes based on the upstream response:
    /// success and "received" errors accept the envelope (the upstream then owns
    /// outcome reporting); rate-limit responses additionally merge the returned
    /// limits into the project cache; all other errors reject and log.
    fn respond(
        self: Box<Self>,
        result: Result<http::Response, UpstreamRequestError>,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> {
        Box::pin(async move {
            let result = match result {
                Ok(mut response) => response.consume().await.map_err(UpstreamRequestError::Http),
                Err(error) => Err(error),
            };

            match result {
                Ok(()) => self.envelope.accept(),
                Err(error) if error.is_received() => {
                    let scoping = self.envelope.scoping();
                    self.envelope.accept();

                    if let UpstreamRequestError::RateLimited(limits) = error {
                        self.project_cache
                            .get(scoping.project_key)
                            .rate_limits()
                            .merge(limits.scope(&scoping));
                    }
                }
                Err(error) => {
                    // Errors are only logged for what we consider an internal discard reason. These
                    // indicate errors in the infrastructure or implementation bugs.
                    let mut envelope = self.envelope;
                    envelope.reject(Outcome::Invalid(DiscardReason::Internal));
                    relay_log::error!(
                        error = &error as &dyn Error,
                        tags.project_key = %envelope.scoping().project_key,
                        "error sending envelope"
                    );
                }
            }
        })
    }
}
2951
/// A container for metric buckets from multiple projects.
///
/// This container is used to send metrics to the upstream in global batches as part of the
/// [`FlushBuckets`] message if the `http.global_metrics` option is enabled. The container monitors
/// the size of all metrics and allows to split them into multiple batches. See
/// [`insert`](Self::insert) for more information.
#[derive(Debug)]
struct Partition<'a> {
    /// Maximum serialized size of one partition payload in bytes.
    max_size: usize,
    /// Remaining byte budget before this partition is full.
    remaining: usize,
    /// Bucket views accumulated so far, grouped by project key.
    views: HashMap<ProjectKey, Vec<BucketView<'a>>>,
    /// Scoping for each contained project, returned by [`Self::take`] for outcome tracking.
    project_info: HashMap<ProjectKey, Scoping>,
}
2965
impl<'a> Partition<'a> {
    /// Creates a new partition with the given maximum size in bytes.
    pub fn new(size: usize) -> Self {
        Self {
            max_size: size,
            remaining: size,
            views: HashMap::new(),
            project_info: HashMap::new(),
        }
    }

    /// Inserts a bucket into the partition, splitting it if necessary.
    ///
    /// This function attempts to add the bucket to this partition. If the bucket does not fit
    /// entirely into the partition given its maximum size, the remaining part of the bucket is
    /// returned from this function call.
    ///
    /// If this function returns `Some(_)`, the partition is full and should be submitted to the
    /// upstream immediately. Use [`Self::take`] to retrieve the contents of the
    /// partition. Afterwards, the caller is responsible to call this function again with the
    /// remaining bucket until it is fully inserted.
    pub fn insert(&mut self, bucket: BucketView<'a>, scoping: Scoping) -> Option<BucketView<'a>> {
        // Split off the part that still fits into the remaining budget; `next` is the
        // overflow that needs another partition.
        let (current, next) = bucket.split(self.remaining, Some(self.max_size));

        if let Some(current) = current {
            self.remaining = self.remaining.saturating_sub(current.estimated_size());
            self.views
                .entry(scoping.project_key)
                .or_default()
                .push(current);

            // Remember the scoping once per project; it is handed out by `take`.
            self.project_info
                .entry(scoping.project_key)
                .or_insert(scoping);
        }

        next
    }

    /// Returns `true` if the partition does not hold any data.
    fn is_empty(&self) -> bool {
        self.views.is_empty()
    }

    /// Returns the serialized buckets for this partition.
    ///
    /// This empties the partition, so that it can be reused.
    fn take(&mut self) -> (Bytes, HashMap<ProjectKey, Scoping>) {
        // Serialization wrapper producing `{"buckets": {<project_key>: [...]}}`.
        #[derive(serde::Serialize)]
        struct Wrapper<'a> {
            buckets: &'a HashMap<ProjectKey, Vec<BucketView<'a>>>,
        }

        let buckets = &self.views;
        let payload = serde_json::to_vec(&Wrapper { buckets }).unwrap().into();

        let scopings = self.project_info.clone();
        self.project_info.clear();

        // Reset the partition for reuse.
        self.views.clear();
        self.remaining = self.max_size;

        (payload, scopings)
    }
}
3031
/// An upstream request that submits metric buckets via HTTP.
///
/// This request is not awaited. It automatically tracks outcomes if the request is not received.
#[derive(Debug)]
struct SendMetricsRequest {
    /// The partition key, sent to the upstream in the `X-Sentry-Relay-Shard` header.
    partition_key: String,
    /// Serialized metric buckets without encoding applied, used for signing.
    unencoded: Bytes,
    /// Serialized metric buckets with the stated HTTP encoding applied.
    encoded: Bytes,
    /// Mapping of all contained project keys to their scoping and extraction mode.
    ///
    /// Used to track outcomes for transmission failures.
    project_info: HashMap<ProjectKey, Scoping>,
    /// Encoding (compression) of the payload.
    http_encoding: HttpEncoding,
    /// Metric outcomes instance to send outcomes on error.
    metric_outcomes: MetricOutcomes,
}
3052
3053impl SendMetricsRequest {
3054    fn create_error_outcomes(self) {
3055        #[derive(serde::Deserialize)]
3056        struct Wrapper {
3057            buckets: HashMap<ProjectKey, Vec<MinimalTrackableBucket>>,
3058        }
3059
3060        let buckets = match serde_json::from_slice(&self.unencoded) {
3061            Ok(Wrapper { buckets }) => buckets,
3062            Err(err) => {
3063                relay_log::error!(
3064                    error = &err as &dyn std::error::Error,
3065                    "failed to parse buckets from failed transmission"
3066                );
3067                return;
3068            }
3069        };
3070
3071        for (key, buckets) in buckets {
3072            let Some(&scoping) = self.project_info.get(&key) else {
3073                relay_log::error!("missing scoping for project key");
3074                continue;
3075            };
3076
3077            self.metric_outcomes.track(
3078                scoping,
3079                &buckets,
3080                Outcome::Invalid(DiscardReason::Internal),
3081            );
3082        }
3083    }
3084}
3085
impl UpstreamRequest for SendMetricsRequest {
    fn set_relay_id(&self) -> bool {
        true
    }

    // Signing covers the unencoded body, not the compressed bytes.
    fn sign(&mut self) -> Option<Sign> {
        Some(Sign::Required(SignatureType::Body(self.unencoded.clone())))
    }

    fn method(&self) -> reqwest::Method {
        reqwest::Method::POST
    }

    fn path(&self) -> Cow<'_, str> {
        "/api/0/relays/metrics/".into()
    }

    fn route(&self) -> &'static str {
        "global_metrics"
    }

    /// Builds the HTTP request with the encoded payload and partition shard header.
    fn build(&mut self, builder: &mut http::RequestBuilder) -> Result<(), http::HttpError> {
        metric!(
            distribution(RelayDistributions::UpstreamMetricsBodySize) = self.encoded.len() as u64
        );

        builder
            .content_encoding(self.http_encoding)
            .header("X-Sentry-Relay-Shard", self.partition_key.as_bytes())
            .header(header::CONTENT_TYPE, b"application/json")
            .body(self.encoded.clone());

        Ok(())
    }

    fn respond(
        self: Box<Self>,
        result: Result<http::Response, UpstreamRequestError>,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> {
        Box::pin(async {
            match result {
                Ok(mut response) => {
                    response.consume().await.ok();
                }
                Err(error) => {
                    relay_log::error!(error = &error as &dyn Error, "Failed to send metrics batch");

                    // If the request did not arrive at the upstream, we are responsible for outcomes.
                    // Otherwise, the upstream is responsible to log outcomes.
                    if error.is_received() {
                        return;
                    }

                    self.create_error_outcomes()
                }
            }
        })
    }
}
3145
/// Container for global and project level [`Quota`].
#[derive(Copy, Clone, Debug)]
struct CombinedQuotas<'a> {
    /// Quotas from the global config, applied to all projects.
    global_quotas: &'a [Quota],
    /// Quotas configured for the specific project.
    project_quotas: &'a [Quota],
}
3152
3153impl<'a> CombinedQuotas<'a> {
3154    /// Returns a new [`CombinedQuotas`].
3155    pub fn new(global_config: &'a GlobalConfig, project_quotas: &'a [Quota]) -> Self {
3156        Self {
3157            global_quotas: &global_config.quotas,
3158            project_quotas,
3159        }
3160    }
3161
3162    /// Returns `true` if both global quotas and project quotas are empty.
3163    pub fn is_empty(&self) -> bool {
3164        self.len() == 0
3165    }
3166
3167    /// Returns the number of both global and project quotas.
3168    pub fn len(&self) -> usize {
3169        self.global_quotas.len() + self.project_quotas.len()
3170    }
3171}
3172
impl<'a> IntoIterator for CombinedQuotas<'a> {
    type Item = &'a Quota;
    type IntoIter = std::iter::Chain<std::slice::Iter<'a, Quota>, std::slice::Iter<'a, Quota>>;

    // Yields all global quotas first, followed by all project quotas.
    fn into_iter(self) -> Self::IntoIter {
        self.global_quotas.iter().chain(self.project_quotas.iter())
    }
}
3181
3182#[cfg(test)]
3183mod tests {
3184    use std::collections::BTreeMap;
3185
3186    use insta::assert_debug_snapshot;
3187    use relay_base_schema::metrics::{DurationUnit, MetricUnit};
3188    use relay_common::glob2::LazyGlob;
3189    use relay_dynamic_config::ProjectConfig;
3190    use relay_event_normalization::{
3191        MeasurementsConfig, NormalizationConfig, RedactionRule, TransactionNameConfig,
3192        TransactionNameRule,
3193    };
3194    use relay_event_schema::protocol::TransactionSource;
3195    use relay_pii::DataScrubbingConfig;
3196    use similar_asserts::assert_eq;
3197
3198    use crate::metrics_extraction::IntoMetric;
3199    use crate::metrics_extraction::transactions::types::{
3200        CommonTags, TransactionMeasurementTags, TransactionMetric,
3201    };
3202    use crate::testutils::{create_test_processor, create_test_processor_with_addrs};
3203
3204    #[cfg(feature = "processing")]
3205    use {
3206        relay_metrics::BucketValue,
3207        relay_quotas::{QuotaScope, ReasonCode},
3208        relay_test::mock_service,
3209    };
3210
3211    use super::*;
3212
    /// Creates an organization-scoped metric-bucket [`Quota`] with the given
    /// id and a limit of zero.
    #[cfg(feature = "processing")]
    fn mock_quota(id: &str) -> Quota {
        Quota {
            id: Some(id.into()),
            categories: [DataCategory::MetricBucket].into(),
            scope: QuotaScope::Organization,
            scope_id: None,
            limit: Some(0),
            window: None,
            reason_code: None,
            namespace: None,
        }
    }
3226
3227    #[cfg(feature = "processing")]
3228    #[test]
3229    fn test_dynamic_quotas() {
3230        let global_config = GlobalConfig {
3231            quotas: vec![mock_quota("foo"), mock_quota("bar")],
3232            ..Default::default()
3233        };
3234
3235        let project_quotas = vec![mock_quota("baz"), mock_quota("qux")];
3236
3237        let dynamic_quotas = CombinedQuotas::new(&global_config, &project_quotas);
3238
3239        assert_eq!(dynamic_quotas.len(), 4);
3240        assert!(!dynamic_quotas.is_empty());
3241
3242        let quota_ids = dynamic_quotas.into_iter().filter_map(|q| q.id.as_deref());
3243        assert!(quota_ids.eq(["foo", "bar", "baz", "qux"]));
3244    }
3245
    /// Ensures that if we ratelimit one batch of buckets in [`FlushBuckets`] message, it won't
    /// also ratelimit the next batches in the same message automatically.
    #[cfg(feature = "processing")]
    #[tokio::test]
    async fn test_ratelimit_per_batch() {
        use relay_base_schema::organization::OrganizationId;
        use relay_protocol::FiniteF64;

        // Scoping of the organization that the quota below applies to.
        let rate_limited_org = Scoping {
            organization_id: OrganizationId::new(1),
            project_id: ProjectId::new(21),
            project_key: ProjectKey::parse("00000000000000000000000000000000").unwrap(),
            key_id: Some(17),
        };

        // Scoping of a second organization that the quota's `scope_id` does
        // not match.
        let not_rate_limited_org = Scoping {
            organization_id: OrganizationId::new(2),
            project_id: ProjectId::new(21),
            project_key: ProjectKey::parse("11111111111111111111111111111111").unwrap(),
            key_id: Some(17),
        };

        let message = {
            let project_info = {
                // Organization-scoped quota with `limit: 0` targeting only
                // organization 1 (see `scope_id`).
                let quota = Quota {
                    id: Some("testing".into()),
                    categories: [DataCategory::MetricBucket].into(),
                    scope: relay_quotas::QuotaScope::Organization,
                    scope_id: Some(rate_limited_org.organization_id.to_string().into()),
                    limit: Some(0),
                    window: None,
                    reason_code: Some(ReasonCode::new("test")),
                    namespace: None,
                };

                let mut config = ProjectConfig::default();
                config.quotas.push(quota);

                Arc::new(ProjectInfo {
                    config,
                    ..Default::default()
                })
            };

            // Builds a single counter bucket for the given scoping; both
            // entries share the same `project_info` and thus the same quota.
            let project_metrics = |scoping| ProjectBuckets {
                buckets: vec![Bucket {
                    name: "d:transactions/bar".into(),
                    value: BucketValue::Counter(FiniteF64::new(1.0).unwrap()),
                    timestamp: UnixTimestamp::now(),
                    tags: Default::default(),
                    width: 10,
                    metadata: BucketMetadata::default(),
                }],
                rate_limits: Default::default(),
                project_info: project_info.clone(),
                scoping,
            };

            // One batch per organization within the same flush message.
            let buckets = hashbrown::HashMap::from([
                (
                    rate_limited_org.project_key,
                    project_metrics(rate_limited_org),
                ),
                (
                    not_rate_limited_org.project_key,
                    project_metrics(not_rate_limited_org),
                ),
            ]);

            FlushBuckets {
                partition_key: 0,
                buckets,
            }
        };

        // ensure the order of the map while iterating is as expected.
        assert_eq!(message.buckets.keys().count(), 2);

        let config = {
            let config_json = serde_json::json!({
                "processing": {
                    "enabled": true,
                    "kafka_config": [],
                    "redis": {
                        "server": std::env::var("RELAY_REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_owned()),
                    }
                }
            });
            Config::from_json_value(config_json).unwrap()
        };

        // Mock store forwarder recording the organization id of every metrics
        // message that reaches it; panics on any other message kind.
        let (store, handle) = {
            let f = |org_ids: &mut Vec<OrganizationId>, msg: Store| {
                let org_id = match msg {
                    Store::Metrics(x) => x.scoping.organization_id,
                    _ => panic!("received envelope when expecting only metrics"),
                };
                org_ids.push(org_id);
            };

            mock_service("store_forwarder", vec![], f)
        };

        let processor = create_test_processor(config).await;
        assert!(processor.redis_rate_limiter_enabled());

        processor.encode_metrics_processing(message, &store).await;

        drop(store);
        let orgs_not_ratelimited = handle.await.unwrap();

        // Only the batch of the rate limited organization is dropped; the
        // other organization's batch must still reach the store.
        assert_eq!(
            orgs_not_ratelimited,
            vec![not_rate_limited_org.organization_id]
        );
    }
3362
    /// PII scrubbing with an `@ip:mask` rule masks the IP-like browser version
    /// in the User-Agent header, but the browser context still ends up with
    /// the correct name and version.
    #[tokio::test]
    async fn test_browser_version_extraction_with_pii_like_data() {
        let processor = create_test_processor(Default::default()).await;
        let outcome_aggregator = Addr::dummy();
        let event_id = EventId::new();

        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
            .parse()
            .unwrap();

        let request_meta = RequestMeta::new(dsn);
        let mut envelope = Envelope::from_request(Some(event_id), request_meta);

        // Event whose User-Agent contains a Chrome version ("103.0.0.0") that
        // looks like an IP address to the PII rule below.
        envelope.add_item({
                let mut item = Item::new(ItemType::Event);
                item.set_payload(
                    ContentType::Json,
                    r#"
                    {
                        "request": {
                            "headers": [
                                ["User-Agent", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36"]
                            ]
                        }
                    }
                "#,
                );
                item
            });

        let mut datascrubbing_settings = DataScrubbingConfig::default();
        // enable all the default scrubbing
        datascrubbing_settings.scrub_data = true;
        datascrubbing_settings.scrub_defaults = true;
        datascrubbing_settings.scrub_ip_addresses = true;

        // Make sure to mask any IP-like looking data
        let pii_config = serde_json::from_str(r#"{"applications": {"**": ["@ip:mask"]}}"#).unwrap();

        let config = ProjectConfig {
            datascrubbing_settings,
            pii_config: Some(pii_config),
            ..Default::default()
        };

        let project_info = ProjectInfo {
            config,
            ..Default::default()
        };

        // The single event item must end up in exactly one processing group.
        let mut envelopes = ProcessingGroup::split_envelope(*envelope, &Default::default());
        assert_eq!(envelopes.len(), 1);

        let (group, envelope) = envelopes.pop().unwrap();
        let envelope = ManagedEnvelope::new(envelope, outcome_aggregator);

        let message = ProcessEnvelopeGrouped {
            group,
            envelope,
            ctx: processing::Context {
                project_info: &project_info,
                ..processing::Context::for_test()
            },
        };

        let Ok(Some(Submit::Envelope(mut new_envelope))) = processor.process(message).await else {
            panic!();
        };
        let new_envelope = new_envelope.envelope_mut();

        let event_item = new_envelope.items().last().unwrap();
        let annotated_event: Annotated<Event> =
            Annotated::from_json_bytes(&event_item.payload()).unwrap();
        let event = annotated_event.into_value().unwrap();
        let headers = event
            .request
            .into_value()
            .unwrap()
            .headers
            .into_value()
            .unwrap();

        // IP-like data must be masked
        assert_eq!(
            Some(
                "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/********* Safari/537.36"
            ),
            headers.get_header("User-Agent")
        );
        // But we still get correct browser and version number
        let contexts = event.contexts.into_value().unwrap();
        let browser = contexts.0.get("browser").unwrap();
        assert_eq!(
            r#"{"browser":"Chrome 103.0.0","name":"Chrome","version":"103.0.0","type":"browser"}"#,
            browser.to_json().unwrap()
        );
    }
3460
    /// Processing materializes the envelope's dynamic sampling context into
    /// the event's `_dsc` attribute.
    #[tokio::test]
    #[cfg(feature = "processing")]
    async fn test_materialize_dsc() {
        use crate::services::projects::project::PublicKeyConfig;

        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
            .parse()
            .unwrap();
        let request_meta = RequestMeta::new(dsn);
        let mut envelope = Envelope::from_request(None, request_meta);

        // DSC with a hyphenated trace id; the snapshot below shows the
        // materialized output uses the dash-free form.
        let dsc = r#"{
            "trace_id": "00000000-0000-0000-0000-000000000000",
            "public_key": "e12d836b15bb49d7bbf99e64295d995b",
            "sample_rate": "0.2"
        }"#;
        envelope.set_dsc(serde_json::from_str(dsc).unwrap());

        let mut item = Item::new(ItemType::Event);
        item.set_payload(ContentType::Json, r#"{}"#);
        envelope.add_item(item);

        let outcome_aggregator = Addr::dummy();
        let managed_envelope = ManagedEnvelope::new(envelope, outcome_aggregator);

        // Register the DSC's public key on the project so it resolves.
        let mut project_info = ProjectInfo::default();
        project_info.public_keys.push(PublicKeyConfig {
            public_key: ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(),
            numeric_id: Some(1),
        });

        let config = serde_json::json!({
            "processing": {
                "enabled": true,
                "kafka_config": [],
            }
        });

        let message = ProcessEnvelopeGrouped {
            group: ProcessingGroup::Error,
            envelope: managed_envelope,
            ctx: processing::Context {
                config: &Config::from_json_value(config.clone()).unwrap(),
                project_info: &project_info,
                sampling_project_info: Some(&project_info),
                ..processing::Context::for_test()
            },
        };

        let processor = create_test_processor(Config::from_json_value(config).unwrap()).await;
        let Ok(Some(Submit::Envelope(envelope))) = processor.process(message).await else {
            panic!();
        };
        let event = envelope
            .envelope()
            .get_item_by(|item| item.ty() == &ItemType::Event)
            .unwrap();

        let event = Annotated::<Event>::from_json_bytes(&event.payload()).unwrap();
        insta::assert_debug_snapshot!(event.value().unwrap()._dsc, @r###"
        Object(
            {
                "environment": ~,
                "public_key": String(
                    "e12d836b15bb49d7bbf99e64295d995b",
                ),
                "release": ~,
                "replay_id": ~,
                "sample_rate": String(
                    "0.2",
                ),
                "trace_id": String(
                    "00000000000000000000000000000000",
                ),
                "transaction": ~,
            },
        )
        "###);
    }
3540
    /// Normalizes a transaction event with the given name and source and
    /// returns the statsd metrics captured while doing so.
    fn capture_test_event(transaction_name: &str, source: TransactionSource) -> Vec<String> {
        let mut event = Annotated::<Event>::from_json(
            r#"
            {
                "type": "transaction",
                "transaction": "/foo/",
                "timestamp": 946684810.0,
                "start_timestamp": 946684800.0,
                "contexts": {
                    "trace": {
                        "trace_id": "4c79f60c11214eb38604f4ae0781bfb2",
                        "span_id": "fa90fdead5f74053",
                        "op": "http.server",
                        "type": "trace"
                    }
                },
                "transaction_info": {
                    "source": "url"
                }
            }
            "#,
        )
        .unwrap();
        // Override the payload's transaction name and source with the
        // function parameters.
        let e = event.value_mut().as_mut().unwrap();
        e.transaction.set_value(Some(transaction_name.into()));

        e.transaction_info
            .value_mut()
            .as_mut()
            .unwrap()
            .source
            .set_value(Some(source));

        relay_statsd::with_capturing_test_client(|| {
            utils::log_transaction_name_metrics(&mut event, |event| {
                // Single rename rule matching `/foo/*/**` with `*` as the
                // replacement for redacted segments.
                let config = NormalizationConfig {
                    transaction_name_config: TransactionNameConfig {
                        rules: &[TransactionNameRule {
                            pattern: LazyGlob::new("/foo/*/**".to_owned()),
                            expiry: DateTime::<Utc>::MAX_UTC,
                            redaction: RedactionRule::Replace {
                                substitution: "*".to_owned(),
                            },
                        }],
                    },
                    ..Default::default()
                };
                relay_event_normalization::normalize_event(event, &config)
            });
        })
    }
3592
3593    #[test]
3594    fn test_log_transaction_metrics_none() {
3595        let captures = capture_test_event("/nothing", TransactionSource::Url);
3596        insta::assert_debug_snapshot!(captures, @r###"
3597        [
3598            "event.transaction_name_changes:1|c|#source_in:url,changes:none,source_out:sanitized,is_404:false",
3599        ]
3600        "###);
3601    }
3602
3603    #[test]
3604    fn test_log_transaction_metrics_rule() {
3605        let captures = capture_test_event("/foo/john/denver", TransactionSource::Url);
3606        insta::assert_debug_snapshot!(captures, @r###"
3607        [
3608            "event.transaction_name_changes:1|c|#source_in:url,changes:rule,source_out:sanitized,is_404:false",
3609        ]
3610        "###);
3611    }
3612
3613    #[test]
3614    fn test_log_transaction_metrics_pattern() {
3615        let captures = capture_test_event("/something/12345", TransactionSource::Url);
3616        insta::assert_debug_snapshot!(captures, @r###"
3617        [
3618            "event.transaction_name_changes:1|c|#source_in:url,changes:pattern,source_out:sanitized,is_404:false",
3619        ]
3620        "###);
3621    }
3622
3623    #[test]
3624    fn test_log_transaction_metrics_both() {
3625        let captures = capture_test_event("/foo/john/12345", TransactionSource::Url);
3626        insta::assert_debug_snapshot!(captures, @r###"
3627        [
3628            "event.transaction_name_changes:1|c|#source_in:url,changes:both,source_out:sanitized,is_404:false",
3629        ]
3630        "###);
3631    }
3632
3633    #[test]
3634    fn test_log_transaction_metrics_no_match() {
3635        let captures = capture_test_event("/foo/john/12345", TransactionSource::Route);
3636        insta::assert_debug_snapshot!(captures, @r###"
3637        [
3638            "event.transaction_name_changes:1|c|#source_in:route,changes:none,source_out:route,is_404:false",
3639        ]
3640        "###);
3641    }
3642
3643    /// Confirms that the hardcoded value we use for the fixed length of the measurement MRI is
3644    /// correct. Unit test is placed here because it has dependencies to relay-server and therefore
3645    /// cannot be called from relay-metrics.
3646    #[test]
3647    fn test_mri_overhead_constant() {
3648        let hardcoded_value = MeasurementsConfig::MEASUREMENT_MRI_OVERHEAD;
3649
3650        let derived_value = {
3651            let name = "foobar".to_owned();
3652            let value = 5.into(); // Arbitrary value.
3653            let unit = MetricUnit::Duration(DurationUnit::default());
3654            let tags = TransactionMeasurementTags {
3655                measurement_rating: None,
3656                universal_tags: CommonTags(BTreeMap::new()),
3657                score_profile_version: None,
3658            };
3659
3660            let measurement = TransactionMetric::Measurement {
3661                name: name.clone(),
3662                value,
3663                unit,
3664                tags,
3665            };
3666
3667            let metric: Bucket = measurement.into_metric(UnixTimestamp::now());
3668            metric.name.len() - unit.to_string().len() - name.len()
3669        };
3670        assert_eq!(
3671            hardcoded_value, derived_value,
3672            "Update `MEASUREMENT_MRI_OVERHEAD` if the naming scheme changed."
3673        );
3674    }
3675
    /// Buckets from external sources get `received_at` stamped into their
    /// metadata, while buckets from internal sources do not.
    #[tokio::test]
    async fn test_process_metrics_bucket_metadata() {
        let mut token = Cogs::noop().timed(ResourceId::Relay, AppFeature::Unattributed);
        let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
        let received_at = Utc::now();
        let config = Config::default();

        // Capture the buckets the processor forwards to the aggregator.
        let (aggregator, mut aggregator_rx) = Addr::custom();
        let processor = create_test_processor_with_addrs(
            config,
            Addrs {
                aggregator,
                ..Default::default()
            },
        )
        .await;

        let mut item = Item::new(ItemType::Statsd);
        item.set_payload(
            ContentType::Text,
            "transactions/foo:3182887624:4267882815|s",
        );
        // Process the same statsd item once per bucket source and check the
        // resulting metadata.
        for (source, expected_received_at) in [
            (
                BucketSource::External,
                Some(UnixTimestamp::from_datetime(received_at).unwrap()),
            ),
            (BucketSource::Internal, None),
        ] {
            let message = ProcessMetrics {
                data: MetricData::Raw(vec![item.clone()]),
                project_key,
                source,
                received_at,
                sent_at: Some(Utc::now()),
            };
            processor.handle_process_metrics(&mut token, message);

            let Aggregator::MergeBuckets(merge_buckets) = aggregator_rx.recv().await.unwrap();
            let buckets = merge_buckets.buckets;
            assert_eq!(buckets.len(), 1);
            assert_eq!(buckets[0].metadata.received_at, expected_received_at);
        }
    }
3720
    /// A batched metrics payload spanning two project keys is split into one
    /// `MergeBuckets` message per project key.
    #[tokio::test]
    async fn test_process_batched_metrics() {
        let mut token = Cogs::noop().timed(ResourceId::Relay, AppFeature::Unattributed);
        let received_at = Utc::now();
        let config = Config::default();

        // Capture the buckets the processor forwards to the aggregator.
        let (aggregator, mut aggregator_rx) = Addr::custom();
        let processor = create_test_processor_with_addrs(
            config,
            Addrs {
                aggregator,
                ..Default::default()
            },
        )
        .await;

        // Batched payload with one distribution bucket for each of two
        // project keys.
        let payload = r#"{
    "buckets": {
        "11111111111111111111111111111111": [
            {
                "timestamp": 1615889440,
                "width": 0,
                "name": "d:custom/endpoint.response_time@millisecond",
                "type": "d",
                "value": [
                  68.0
                ],
                "tags": {
                  "route": "user_index"
                }
            }
        ],
        "22222222222222222222222222222222": [
            {
                "timestamp": 1615889440,
                "width": 0,
                "name": "d:custom/endpoint.cache_rate@none",
                "type": "d",
                "value": [
                  36.0
                ]
            }
        ]
    }
}
"#;
        let message = ProcessBatchedMetrics {
            payload: Bytes::from(payload),
            source: BucketSource::Internal,
            received_at,
            sent_at: Some(Utc::now()),
        };
        processor.handle_process_batched_metrics(&mut token, message);

        // Expect exactly one message per project key.
        let Aggregator::MergeBuckets(mb1) = aggregator_rx.recv().await.unwrap();
        let Aggregator::MergeBuckets(mb2) = aggregator_rx.recv().await.unwrap();

        // Sort by project key so the snapshot is independent of the order in
        // which the two messages arrive.
        let mut messages = vec![mb1, mb2];
        messages.sort_by_key(|mb| mb.project_key);

        let actual = messages
            .into_iter()
            .map(|mb| (mb.project_key, mb.buckets))
            .collect::<Vec<_>>();

        assert_debug_snapshot!(actual, @r###"
        [
            (
                ProjectKey("11111111111111111111111111111111"),
                [
                    Bucket {
                        timestamp: UnixTimestamp(1615889440),
                        width: 0,
                        name: MetricName(
                            "d:custom/endpoint.response_time@millisecond",
                        ),
                        value: Distribution(
                            [
                                68.0,
                            ],
                        ),
                        tags: {
                            "route": "user_index",
                        },
                        metadata: BucketMetadata {
                            merges: 1,
                            received_at: None,
                            extracted_from_indexed: false,
                        },
                    },
                ],
            ),
            (
                ProjectKey("22222222222222222222222222222222"),
                [
                    Bucket {
                        timestamp: UnixTimestamp(1615889440),
                        width: 0,
                        name: MetricName(
                            "d:custom/endpoint.cache_rate@none",
                        ),
                        value: Distribution(
                            [
                                36.0,
                            ],
                        ),
                        tags: {},
                        metadata: BucketMetadata {
                            merges: 1,
                            received_at: None,
                            extracted_from_indexed: false,
                        },
                    },
                ],
            ),
        ]
        "###);
    }
3839}