relay_server/services/processor.rs

use std::borrow::Cow;
use std::collections::{BTreeSet, HashMap};
use std::error::Error;
use std::fmt::{Debug, Display};
use std::future::Future;
use std::io::Write;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;

use anyhow::Context;
use brotli::CompressorWriter as BrotliEncoder;
use bytes::Bytes;
use chrono::{DateTime, Utc};
use flate2::Compression;
use flate2::write::{GzEncoder, ZlibEncoder};
use futures::FutureExt;
use futures::future::BoxFuture;
use relay_base_schema::project::{ProjectId, ProjectKey};
use relay_cogs::{AppFeature, Cogs, FeatureWeights, ResourceId, Token};
use relay_common::time::UnixTimestamp;
use relay_config::{Config, HttpEncoding, RelayMode};
use relay_dynamic_config::{Feature, GlobalConfig};
use relay_event_normalization::{ClockDriftProcessor, GeoIpLookup};
use relay_event_schema::processor::ProcessingAction;
use relay_event_schema::protocol::{
    ClientReport, Event, EventId, Metrics, NetworkReportError, SpanV2,
};
use relay_filter::FilterStatKey;
use relay_metrics::{Bucket, BucketMetadata, BucketView, BucketsView, MetricNamespace};
use relay_pii::PiiConfigError;
use relay_protocol::Annotated;
use relay_quotas::{DataCategory, Quota, RateLimits, Scoping};
use relay_sampling::evaluation::{ReservoirCounters, SamplingDecision};
use relay_statsd::metric;
use relay_system::{Addr, FromMessage, NoResponse, Service};
use reqwest::header;
use smallvec::{SmallVec, smallvec};
use zstd::stream::Encoder as ZstdEncoder;

use crate::envelope::{
    self, AttachmentType, ContentType, Envelope, EnvelopeError, Item, ItemContainer, ItemType,
};
use crate::extractors::{PartialDsn, RequestMeta, RequestTrust};
use crate::integrations::{Integration, SpansIntegration};
use crate::managed::{InvalidProcessingGroupType, ManagedEnvelope, TypedEnvelope};
use crate::metrics::{MetricOutcomes, MetricsLimiter, MinimalTrackableBucket};
use crate::metrics_extraction::transactions::ExtractedMetrics;
use crate::metrics_extraction::transactions::types::ExtractMetricsError;
use crate::processing::check_ins::CheckInsProcessor;
use crate::processing::logs::LogsProcessor;
use crate::processing::profile_chunks::ProfileChunksProcessor;
use crate::processing::sessions::SessionsProcessor;
use crate::processing::spans::SpansProcessor;
use crate::processing::trace_attachments::TraceAttachmentsProcessor;
use crate::processing::trace_metrics::TraceMetricsProcessor;
use crate::processing::transactions::TransactionProcessor;
use crate::processing::utils::event::{
    EventFullyNormalized, EventMetricsExtracted, FiltersStatus, SpansExtracted, event_category,
    event_type,
};
use crate::processing::{Forward as _, Output, Outputs, QuotaRateLimiter};
use crate::service::ServiceError;
use crate::services::global_config::GlobalConfigHandle;
use crate::services::metrics::{Aggregator, FlushBuckets, MergeBuckets, ProjectBuckets};
use crate::services::outcome::{DiscardItemType, DiscardReason, Outcome, TrackOutcome};
use crate::services::projects::cache::ProjectCacheHandle;
use crate::services::projects::project::{ProjectInfo, ProjectState};
use crate::services::upstream::{
    SendRequest, Sign, SignatureType, UpstreamRelay, UpstreamRequest, UpstreamRequestError,
};
use crate::statsd::{RelayCounters, RelayDistributions, RelayTimers};
use crate::utils::{self, CheckLimits, EnvelopeLimiter};
use crate::{http, processing};
use relay_threading::AsyncPool;
#[cfg(feature = "processing")]
use {
    crate::services::processor::nnswitch::SwitchProcessingError,
    crate::services::store::{Store, StoreEnvelope},
    crate::services::upload::Upload,
    crate::utils::Enforcement,
    itertools::Itertools,
    relay_cardinality::{
        CardinalityLimit, CardinalityLimiter, CardinalityLimitsSplit, RedisSetLimiter,
        RedisSetLimiterOptions,
    },
    relay_dynamic_config::CardinalityLimiterMode,
    relay_quotas::{RateLimitingError, RedisRateLimiter},
    relay_redis::RedisClients,
    std::time::Instant,
    symbolic_unreal::{Unreal4Error, Unreal4ErrorKind},
};

mod attachment;
mod dynamic_sampling;
mod event;
mod metrics;
mod nel;
mod profile;
mod replay;
mod report;
mod span;

#[cfg(all(sentry, feature = "processing"))]
mod playstation;
mod standalone;
#[cfg(feature = "processing")]
mod unreal;

#[cfg(feature = "processing")]
mod nnswitch;

/// Expands the given block only when compiled with the `processing` feature.
///
/// The provided code block is executed only if the provided config has `processing_enabled` set.
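///
/// Illustrative usage, mirroring the call sites in this file (not compiled as a doc test):
///
/// ```ignore
/// if_processing!(config, {
///     // Runs only when processing is enabled.
/// } else {
///     // Runs in all other Relays.
/// });
/// ```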
macro_rules! if_processing {
    ($config:expr, $if_true:block) => {
        #[cfg(feature = "processing")] {
            if $config.processing_enabled() $if_true
        }
    };
    ($config:expr, $if_true:block else $if_false:block) => {
        {
            #[cfg(feature = "processing")] {
                if $config.processing_enabled() $if_true else $if_false
            }
            #[cfg(not(feature = "processing"))] {
                $if_false
            }
        }
    };
}

/// The minimum clock drift for correction to apply.
pub const MINIMUM_CLOCK_DRIFT: Duration = Duration::from_secs(55 * 60);

#[derive(Debug)]
pub struct GroupTypeError;

impl Display for GroupTypeError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("failed to convert processing group into corresponding type")
    }
}

impl std::error::Error for GroupTypeError {}

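/// Defines a marker type for a processing group.
///
/// Generates the unit struct, an infallible `From<$ty>` conversion into [`ProcessingGroup`], and
/// a fallible `TryFrom<ProcessingGroup>` conversion that also accepts the optional extra
/// variants.
///
/// Illustrative invocation, as used below (not compiled as a doc test):
///
/// ```ignore
/// processing_group!(TransactionGroup, Transaction);
/// ```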
macro_rules! processing_group {
    ($ty:ident, $variant:ident$(, $($other:ident),+)?) => {
        #[derive(Clone, Copy, Debug)]
        pub struct $ty;

        impl From<$ty> for ProcessingGroup {
            fn from(_: $ty) -> Self {
                ProcessingGroup::$variant
            }
        }

        impl TryFrom<ProcessingGroup> for $ty {
            type Error = GroupTypeError;

            fn try_from(value: ProcessingGroup) -> Result<Self, Self::Error> {
                if matches!(value, ProcessingGroup::$variant) {
                    return Ok($ty);
                }
                $($(
                    if matches!(value, ProcessingGroup::$other) {
                        return Ok($ty);
                    }
                )+)?
                return Err(GroupTypeError);
            }
        }
    };
}

/// A marker trait.
///
/// Should only be implemented by groups which are responsible for processing envelopes with
/// events.
pub trait EventProcessing {}

processing_group!(TransactionGroup, Transaction);
impl EventProcessing for TransactionGroup {}

processing_group!(ErrorGroup, Error);
impl EventProcessing for ErrorGroup {}

processing_group!(SessionGroup, Session);
processing_group!(StandaloneGroup, Standalone);
processing_group!(ClientReportGroup, ClientReport);
processing_group!(ReplayGroup, Replay);
processing_group!(CheckInGroup, CheckIn);
processing_group!(LogGroup, Log, Nel);
processing_group!(TraceMetricGroup, TraceMetric);
processing_group!(SpanGroup, Span);

processing_group!(ProfileChunkGroup, ProfileChunk);
processing_group!(MetricsGroup, Metrics);
processing_group!(ForwardUnknownGroup, ForwardUnknown);
processing_group!(Ungrouped, Ungrouped);

/// Processed group type marker.
///
/// Marks the envelopes which have passed through the processing pipeline.
#[derive(Clone, Copy, Debug)]
pub struct Processed;

/// Describes the groups of the processable items.
#[derive(Clone, Copy, Debug)]
pub enum ProcessingGroup {
    /// All transaction-related items.
    ///
    /// Includes transactions, related attachments, and profiles.
    Transaction,
    /// All items which require (have or create) events.
    ///
    /// This includes errors, NEL reports, security reports, user reports, and some of the
    /// attachments.
    Error,
    /// Session events.
    Session,
    /// Standalone items which can be sent alone without any event attached to them in the
    /// current envelope, e.g. some attachments and user reports.
    Standalone,
    /// Outcomes.
    ClientReport,
    /// Replays and ReplayRecordings.
    Replay,
    /// Crons.
    CheckIn,
    /// NEL reports.
    Nel,
    /// Logs.
    Log,
    /// Trace metrics.
    TraceMetric,
    /// Spans.
    Span,
    /// Span V2 spans.
    SpanV2,
    /// Metrics.
    Metrics,
    /// ProfileChunk.
    ProfileChunk,
    /// V2 attachments without a span / log association.
    TraceAttachment,
    /// Unknown item types will be forwarded upstream (to a processing Relay), where we will
    /// decide what to do with them.
    ForwardUnknown,
    /// All the items in the envelope that could not be grouped.
    Ungrouped,
}

impl ProcessingGroup {
    /// Splits the provided envelope into a list of groups with their associated envelopes.
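    ///
    /// A sketch of the contract (not compiled as a doc test; `envelope` and `project_info` are
    /// assumed to be in scope):
    ///
    /// ```ignore
    /// // Replays, sessions, spans, logs, etc. each end up in their own envelope,
    /// // tagged with the matching group:
    /// for (group, envelope) in ProcessingGroup::split_envelope(envelope, &project_info) {
    ///     relay_log::trace!("split out group: {}", group.variant());
    /// }
    /// ```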
    fn split_envelope(
        mut envelope: Envelope,
        project_info: &ProjectInfo,
    ) -> SmallVec<[(Self, Box<Envelope>); 3]> {
        let headers = envelope.headers().clone();
        let mut grouped_envelopes = smallvec![];

        // Extract replays.
        let replay_items = envelope.take_items_by(|item| {
            matches!(
                item.ty(),
                &ItemType::ReplayEvent | &ItemType::ReplayRecording | &ItemType::ReplayVideo
            )
        });
        if !replay_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Replay,
                Envelope::from_parts(headers.clone(), replay_items),
            ))
        }

        // Keep all the sessions together in one envelope.
        let session_items = envelope
            .take_items_by(|item| matches!(item.ty(), &ItemType::Session | &ItemType::Sessions));
        if !session_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Session,
                Envelope::from_parts(headers.clone(), session_items),
            ))
        }

        let span_v2_items = envelope.take_items_by(|item| {
            let exp_feature = project_info.has_feature(Feature::SpanV2ExperimentalProcessing);
            let otlp_feature = project_info.has_feature(Feature::SpanV2OtlpProcessing);
            let is_supported_integration = {
                matches!(
                    item.integration(),
                    Some(Integration::Spans(SpansIntegration::OtelV1 { .. }))
                )
            };
            let is_span = matches!(item.ty(), &ItemType::Span);
            let is_span_attachment = item.is_span_attachment();

            ItemContainer::<SpanV2>::is_container(item)
                || (exp_feature && is_span)
                || ((exp_feature || otlp_feature) && is_supported_integration)
                || (exp_feature && is_span_attachment)
        });

        if !span_v2_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::SpanV2,
                Envelope::from_parts(headers.clone(), span_v2_items),
            ))
        }

        // Extract spans.
        let span_items = envelope.take_items_by(|item| {
            matches!(item.ty(), &ItemType::Span)
                || matches!(item.integration(), Some(Integration::Spans(_)))
        });

        if !span_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Span,
                Envelope::from_parts(headers.clone(), span_items),
            ))
        }

        // Extract logs.
        let logs_items = envelope.take_items_by(|item| {
            matches!(item.ty(), &ItemType::Log)
                || matches!(item.integration(), Some(Integration::Logs(_)))
        });
        if !logs_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Log,
                Envelope::from_parts(headers.clone(), logs_items),
            ))
        }

        // Extract trace metrics.
        let trace_metric_items =
            envelope.take_items_by(|item| matches!(item.ty(), &ItemType::TraceMetric));
        if !trace_metric_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::TraceMetric,
                Envelope::from_parts(headers.clone(), trace_metric_items),
            ))
        }

        // NEL items are transformed into logs in their own processing step.
        let nel_items = envelope.take_items_by(|item| matches!(item.ty(), &ItemType::Nel));
        if !nel_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Nel,
                Envelope::from_parts(headers.clone(), nel_items),
            ))
        }

        // Extract all metric items.
        //
        // Note: Should only be relevant in proxy mode. In other modes we send metrics through
        // a separate pipeline.
        let metric_items = envelope.take_items_by(|i| i.ty().is_metrics());
        if !metric_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Metrics,
                Envelope::from_parts(headers.clone(), metric_items),
            ))
        }

        // Extract profile chunks.
        let profile_chunk_items =
            envelope.take_items_by(|item| matches!(item.ty(), &ItemType::ProfileChunk));
        if !profile_chunk_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::ProfileChunk,
                Envelope::from_parts(headers.clone(), profile_chunk_items),
            ))
        }

        let trace_attachment_items = envelope.take_items_by(Item::is_trace_attachment);
        if !trace_attachment_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::TraceAttachment,
                Envelope::from_parts(headers.clone(), trace_attachment_items),
            ))
        }

        // Extract all standalone items.
        //
        // Note: this happens only if there are no items in the envelope which can create events;
        // otherwise, they are kept in the same envelope as the items that require an event.
        if !envelope.items().any(Item::creates_event) {
            let standalone_items = envelope.take_items_by(Item::requires_event);
            if !standalone_items.is_empty() {
                grouped_envelopes.push((
                    ProcessingGroup::Standalone,
                    Envelope::from_parts(headers.clone(), standalone_items),
                ))
            }
        };

        // Make sure we create a separate envelope for each `RawSecurity` report.
        let security_reports_items = envelope
            .take_items_by(|i| matches!(i.ty(), &ItemType::RawSecurity))
            .into_iter()
            .map(|item| {
                let headers = headers.clone();
                let items: SmallVec<[Item; 3]> = smallvec![item.clone()];
                let mut envelope = Envelope::from_parts(headers, items);
                envelope.set_event_id(EventId::new());
                (ProcessingGroup::Error, envelope)
            });
        grouped_envelopes.extend(security_reports_items);

        // Extract all the items which require an event into a separate envelope.
        let require_event_items = envelope.take_items_by(Item::requires_event);
        if !require_event_items.is_empty() {
            let group = if require_event_items
                .iter()
                .any(|item| matches!(item.ty(), &ItemType::Transaction | &ItemType::Profile))
            {
                ProcessingGroup::Transaction
            } else {
                ProcessingGroup::Error
            };

            grouped_envelopes.push((
                group,
                Envelope::from_parts(headers.clone(), require_event_items),
            ))
        }

        // Get the rest of the envelopes, one per item.
        let envelopes = envelope.items_mut().map(|item| {
            let headers = headers.clone();
            let items: SmallVec<[Item; 3]> = smallvec![item.clone()];
            let envelope = Envelope::from_parts(headers, items);
            let item_type = item.ty();
            let group = if matches!(item_type, &ItemType::CheckIn) {
                ProcessingGroup::CheckIn
            } else if matches!(item_type, &ItemType::ClientReport) {
                ProcessingGroup::ClientReport
            } else if matches!(item_type, &ItemType::Unknown(_)) {
                ProcessingGroup::ForwardUnknown
            } else {
                // Cannot group this item type.
                ProcessingGroup::Ungrouped
            };

            (group, envelope)
        });
        grouped_envelopes.extend(envelopes);

        grouped_envelopes
    }

    /// Returns the name of the group.
    pub fn variant(&self) -> &'static str {
        match self {
            ProcessingGroup::Transaction => "transaction",
            ProcessingGroup::Error => "error",
            ProcessingGroup::Session => "session",
            ProcessingGroup::Standalone => "standalone",
            ProcessingGroup::ClientReport => "client_report",
            ProcessingGroup::Replay => "replay",
            ProcessingGroup::CheckIn => "check_in",
            ProcessingGroup::Log => "log",
            ProcessingGroup::TraceMetric => "trace_metric",
            ProcessingGroup::Nel => "nel",
            ProcessingGroup::Span => "span",
            ProcessingGroup::SpanV2 => "span_v2",
            ProcessingGroup::Metrics => "metrics",
            ProcessingGroup::ProfileChunk => "profile_chunk",
            ProcessingGroup::TraceAttachment => "trace_attachment",
            ProcessingGroup::ForwardUnknown => "forward_unknown",
            ProcessingGroup::Ungrouped => "ungrouped",
        }
    }
}

impl From<ProcessingGroup> for AppFeature {
    fn from(value: ProcessingGroup) -> Self {
        match value {
            ProcessingGroup::Transaction => AppFeature::Transactions,
            ProcessingGroup::Error => AppFeature::Errors,
            ProcessingGroup::Session => AppFeature::Sessions,
            ProcessingGroup::Standalone => AppFeature::UnattributedEnvelope,
            ProcessingGroup::ClientReport => AppFeature::ClientReports,
            ProcessingGroup::Replay => AppFeature::Replays,
            ProcessingGroup::CheckIn => AppFeature::CheckIns,
            ProcessingGroup::Log => AppFeature::Logs,
            ProcessingGroup::TraceMetric => AppFeature::TraceMetrics,
            ProcessingGroup::Nel => AppFeature::Logs,
            ProcessingGroup::Span => AppFeature::Spans,
            ProcessingGroup::SpanV2 => AppFeature::Spans,
            ProcessingGroup::Metrics => AppFeature::UnattributedMetrics,
            ProcessingGroup::ProfileChunk => AppFeature::Profiles,
            ProcessingGroup::ForwardUnknown => AppFeature::UnattributedEnvelope,
            ProcessingGroup::Ungrouped => AppFeature::UnattributedEnvelope,
            ProcessingGroup::TraceAttachment => AppFeature::TraceAttachments,
        }
    }
}

/// An error returned when handling [`ProcessEnvelope`].
#[derive(Debug, thiserror::Error)]
pub enum ProcessingError {
    #[error("invalid json in event")]
    InvalidJson(#[source] serde_json::Error),

    #[error("invalid message pack event payload")]
    InvalidMsgpack(#[from] rmp_serde::decode::Error),

    #[cfg(feature = "processing")]
    #[error("invalid unreal crash report")]
    InvalidUnrealReport(#[source] Unreal4Error),

    #[error("event payload too large")]
    PayloadTooLarge(DiscardItemType),

    #[error("invalid transaction event")]
    InvalidTransaction,

    #[error("envelope processor failed")]
    ProcessingFailed(#[from] ProcessingAction),

    #[error("duplicate {0} in event")]
    DuplicateItem(ItemType),

    #[error("failed to extract event payload")]
    NoEventPayload,

    #[error("missing project id in DSN")]
    MissingProjectId,

    #[error("invalid security report type: {0:?}")]
    InvalidSecurityType(Bytes),

    #[error("unsupported security report type")]
    UnsupportedSecurityType,

    #[error("invalid security report")]
    InvalidSecurityReport(#[source] serde_json::Error),

    #[error("invalid nel report")]
    InvalidNelReport(#[source] NetworkReportError),

    #[error("event filtered with reason: {0:?}")]
    EventFiltered(FilterStatKey),

    #[error("missing or invalid required event timestamp")]
    InvalidTimestamp,

    #[error("could not serialize event payload")]
    SerializeFailed(#[source] serde_json::Error),

    #[cfg(feature = "processing")]
    #[error("failed to apply quotas")]
    QuotasFailed(#[from] RateLimitingError),

    #[error("invalid pii config")]
    PiiConfigError(PiiConfigError),

    #[error("invalid processing group type")]
    InvalidProcessingGroup(Box<InvalidProcessingGroupType>),

    #[error("invalid replay")]
    InvalidReplay(DiscardReason),

    #[error("replay filtered with reason: {0:?}")]
    ReplayFiltered(FilterStatKey),

    #[cfg(feature = "processing")]
    #[error("nintendo switch dying message processing failed {0:?}")]
    InvalidNintendoDyingMessage(#[source] SwitchProcessingError),

    #[cfg(all(sentry, feature = "processing"))]
    #[error("playstation dump processing failed: {0}")]
    InvalidPlaystationDump(String),

    #[error("processing group does not match specific processor")]
    ProcessingGroupMismatch,

    #[error("new processing pipeline failed")]
    ProcessingFailure,
}

impl ProcessingError {
    pub fn to_outcome(&self) -> Option<Outcome> {
        match self {
            Self::PayloadTooLarge(payload_type) => {
                Some(Outcome::Invalid(DiscardReason::TooLarge(*payload_type)))
            }
            Self::InvalidJson(_) => Some(Outcome::Invalid(DiscardReason::InvalidJson)),
            Self::InvalidMsgpack(_) => Some(Outcome::Invalid(DiscardReason::InvalidMsgpack)),
            Self::InvalidSecurityType(_) => {
                Some(Outcome::Invalid(DiscardReason::SecurityReportType))
            }
            Self::InvalidSecurityReport(_) => Some(Outcome::Invalid(DiscardReason::SecurityReport)),
            Self::UnsupportedSecurityType => Some(Outcome::Filtered(FilterStatKey::InvalidCsp)),
            Self::InvalidNelReport(_) => Some(Outcome::Invalid(DiscardReason::InvalidJson)),
            Self::InvalidTransaction => Some(Outcome::Invalid(DiscardReason::InvalidTransaction)),
            Self::InvalidTimestamp => Some(Outcome::Invalid(DiscardReason::Timestamp)),
            Self::DuplicateItem(_) => Some(Outcome::Invalid(DiscardReason::DuplicateItem)),
            Self::NoEventPayload => Some(Outcome::Invalid(DiscardReason::NoEventPayload)),
            #[cfg(feature = "processing")]
            Self::InvalidNintendoDyingMessage(_) => Some(Outcome::Invalid(DiscardReason::Payload)),
            #[cfg(all(sentry, feature = "processing"))]
            Self::InvalidPlaystationDump(_) => Some(Outcome::Invalid(DiscardReason::Payload)),
            #[cfg(feature = "processing")]
            Self::InvalidUnrealReport(err) if err.kind() == Unreal4ErrorKind::BadCompression => {
                Some(Outcome::Invalid(DiscardReason::InvalidCompression))
            }
            #[cfg(feature = "processing")]
            Self::InvalidUnrealReport(_) => Some(Outcome::Invalid(DiscardReason::ProcessUnreal)),
            Self::SerializeFailed(_) | Self::ProcessingFailed(_) => {
                Some(Outcome::Invalid(DiscardReason::Internal))
            }
            #[cfg(feature = "processing")]
            Self::QuotasFailed(_) => Some(Outcome::Invalid(DiscardReason::Internal)),
            Self::PiiConfigError(_) => Some(Outcome::Invalid(DiscardReason::ProjectStatePii)),
            Self::MissingProjectId => None,
            Self::EventFiltered(_) => None,
            Self::InvalidProcessingGroup(_) => None,
            Self::InvalidReplay(reason) => Some(Outcome::Invalid(*reason)),
            Self::ReplayFiltered(key) => Some(Outcome::Filtered(key.clone())),
            Self::ProcessingGroupMismatch => Some(Outcome::Invalid(DiscardReason::Internal)),
            // Outcomes are emitted in the new processing pipeline already.
            Self::ProcessingFailure => None,
        }
    }

    fn is_unexpected(&self) -> bool {
        self.to_outcome()
            .is_some_and(|outcome| outcome.is_unexpected())
    }
}

#[cfg(feature = "processing")]
impl From<Unreal4Error> for ProcessingError {
    fn from(err: Unreal4Error) -> Self {
        match err.kind() {
            Unreal4ErrorKind::TooLarge => Self::PayloadTooLarge(ItemType::UnrealReport.into()),
            _ => ProcessingError::InvalidUnrealReport(err),
        }
    }
}

impl From<ExtractMetricsError> for ProcessingError {
    fn from(error: ExtractMetricsError) -> Self {
        match error {
            ExtractMetricsError::MissingTimestamp | ExtractMetricsError::InvalidTimestamp => {
                Self::InvalidTimestamp
            }
        }
    }
}

impl From<InvalidProcessingGroupType> for ProcessingError {
    fn from(value: InvalidProcessingGroupType) -> Self {
        Self::InvalidProcessingGroup(Box::new(value))
    }
}

type ExtractedEvent = (Annotated<Event>, usize);

/// A container for extracted metrics during processing.
///
/// The container enforces that the extracted metrics are correctly tagged
/// with the dynamic sampling decision.
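///
/// Illustrative usage (not compiled as a doc test; `extracted` is an assumed
/// [`ExtractedMetrics`] value):
///
/// ```ignore
/// let mut metrics = ProcessingExtractedMetrics::new();
/// // Buckets extended under a `Keep` decision are flagged as extracted from an indexed payload.
/// metrics.extend(extracted, Some(SamplingDecision::Keep));
/// ```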
#[derive(Debug)]
pub struct ProcessingExtractedMetrics {
    metrics: ExtractedMetrics,
}

impl ProcessingExtractedMetrics {
    pub fn new() -> Self {
        Self {
            metrics: ExtractedMetrics::default(),
        }
    }

    pub fn into_inner(self) -> ExtractedMetrics {
        self.metrics
    }

    /// Extends the contained metrics with [`ExtractedMetrics`].
    pub fn extend(
        &mut self,
        extracted: ExtractedMetrics,
        sampling_decision: Option<SamplingDecision>,
    ) {
        self.extend_project_metrics(extracted.project_metrics, sampling_decision);
        self.extend_sampling_metrics(extracted.sampling_metrics, sampling_decision);
    }

    /// Extends the contained project metrics.
    pub fn extend_project_metrics<I>(
        &mut self,
        buckets: I,
        sampling_decision: Option<SamplingDecision>,
    ) where
        I: IntoIterator<Item = Bucket>,
    {
        self.metrics
            .project_metrics
            .extend(buckets.into_iter().map(|mut bucket| {
                bucket.metadata.extracted_from_indexed =
                    sampling_decision == Some(SamplingDecision::Keep);
                bucket
            }));
    }

    /// Extends the contained sampling metrics.
    pub fn extend_sampling_metrics<I>(
        &mut self,
        buckets: I,
        sampling_decision: Option<SamplingDecision>,
    ) where
        I: IntoIterator<Item = Bucket>,
    {
        self.metrics
            .sampling_metrics
            .extend(buckets.into_iter().map(|mut bucket| {
                bucket.metadata.extracted_from_indexed =
                    sampling_decision == Some(SamplingDecision::Keep);
                bucket
            }));
    }

    /// Applies rate limits to the contained metrics.
    ///
    /// This is used to apply rate limits which have been enforced on sampled items of an envelope
    /// to also consistently apply to the metrics extracted from these items.
    #[cfg(feature = "processing")]
    fn apply_enforcement(&mut self, enforcement: &Enforcement, enforced_consistently: bool) {
        // Metric namespaces which need to be dropped.
        let mut drop_namespaces: SmallVec<[_; 2]> = smallvec![];
        // Metrics belonging to this metric namespace need to have the `extracted_from_indexed`
        // flag reset to `false`.
        let mut reset_extracted_from_indexed: SmallVec<[_; 2]> = smallvec![];

        for (namespace, limit, indexed) in [
            (
                MetricNamespace::Transactions,
                &enforcement.event,
                &enforcement.event_indexed,
            ),
            (
                MetricNamespace::Spans,
                &enforcement.spans,
                &enforcement.spans_indexed,
            ),
        ] {
            if limit.is_active() {
                drop_namespaces.push(namespace);
            } else if indexed.is_active() && !enforced_consistently {
                // If the enforcement was not computed by consistently checking the limits,
                // the quota for the metrics has not yet been incremented.
                // In this case we have a dropped indexed payload but a metric which still needs
                // to be accounted for; make sure the metric will still be rate limited.
                reset_extracted_from_indexed.push(namespace);
            }
        }

        if !drop_namespaces.is_empty() || !reset_extracted_from_indexed.is_empty() {
            self.retain_mut(|bucket| {
                let Some(namespace) = bucket.name.try_namespace() else {
                    return true;
                };

                if drop_namespaces.contains(&namespace) {
                    return false;
                }

                if reset_extracted_from_indexed.contains(&namespace) {
                    bucket.metadata.extracted_from_indexed = false;
                }

                true
            });
        }
    }

    #[cfg(feature = "processing")]
    fn retain_mut(&mut self, mut f: impl FnMut(&mut Bucket) -> bool) {
        self.metrics.project_metrics.retain_mut(&mut f);
        self.metrics.sampling_metrics.retain_mut(&mut f);
    }
}

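/// Sends the extracted metrics to the aggregator.
///
/// Project metrics are merged into the current project's buckets; sampling metrics are routed
/// to the sampling project if one is known, otherwise to the current project.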
fn send_metrics(
    metrics: ExtractedMetrics,
    project_key: ProjectKey,
    sampling_key: Option<ProjectKey>,
    aggregator: &Addr<Aggregator>,
) {
    let ExtractedMetrics {
        project_metrics,
        sampling_metrics,
    } = metrics;

    if !project_metrics.is_empty() {
        aggregator.send(MergeBuckets {
            project_key,
            buckets: project_metrics,
        });
    }

    if !sampling_metrics.is_empty() {
        // If no sampling project state is available, we associate the sampling
        // metrics with the current project.
        //
        // project_without_tracing         -> metrics go to self
        // dependent_project_with_tracing  -> metrics go to root
        // root_project_with_tracing       -> metrics go to root == self
        let sampling_project_key = sampling_key.unwrap_or(project_key);
        aggregator.send(MergeBuckets {
            project_key: sampling_project_key,
            buckets: sampling_metrics,
        });
    }
}

/// Function for on-off switches that filter specific item types (profiles, spans)
/// based on a feature flag.
///
/// If the project config did not come from the upstream, we keep the items.
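///
/// A sketch of the resulting behavior (not compiled as a doc test; `config` and `project_info`
/// are assumed to be in scope):
///
/// ```ignore
/// // Proxy mode never filters, regardless of the feature flag; managed mode
/// // filters whenever the project does not have the feature enabled.
/// let drop = should_filter(&config, &project_info, Feature::SpanV2OtlpProcessing);
/// ```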
fn should_filter(config: &Config, project_info: &ProjectInfo, feature: Feature) -> bool {
    match config.relay_mode() {
        RelayMode::Proxy => false,
        RelayMode::Managed => !project_info.has_feature(feature),
    }
}

/// The result of envelope processing: either the processed envelope along with its extracted
/// metrics, or the output of one of the new processors.
#[derive(Debug)]
#[expect(
    clippy::large_enum_variant,
    reason = "until we have a better solution to combat the excessive growth of Item, see #4819"
)]
enum ProcessingResult {
    Envelope {
        managed_envelope: TypedEnvelope<Processed>,
        extracted_metrics: ProcessingExtractedMetrics,
    },
    Output(Output<Outputs>),
}

impl ProcessingResult {
    /// Creates a [`ProcessingResult`] with no metrics.
    fn no_metrics(managed_envelope: TypedEnvelope<Processed>) -> Self {
        Self::Envelope {
            managed_envelope,
            extracted_metrics: ProcessingExtractedMetrics::new(),
        }
    }
}

/// All items which can be submitted upstream.
#[derive(Debug)]
#[expect(
    clippy::large_enum_variant,
    reason = "until we have a better solution to combat the excessive growth of Item, see #4819"
)]
enum Submit<'a> {
    /// A processed envelope.
    Envelope(TypedEnvelope<Processed>),
    /// The output of a [`processing::Processor`].
    Output {
        output: Outputs,
        ctx: processing::ForwardContext<'a>,
    },
}

/// Applies processing to all contents of the given envelope.
///
/// Depending on the contents of the envelope and Relay's mode, this includes:
///
///  - Basic normalization and validation for all item types.
///  - Clock drift correction if the required `sent_at` header is present.
///  - Expansion of certain item types (e.g. unreal).
///  - Store normalization for event payloads in processing mode.
///  - Rate limiters and inbound filters on events in processing mode.
#[derive(Debug)]
pub struct ProcessEnvelope {
    /// Envelope to process.
    pub envelope: ManagedEnvelope,
    /// The project info.
    pub project_info: Arc<ProjectInfo>,
    /// Currently active cached rate limits for this project.
    pub rate_limits: Arc<RateLimits>,
    /// Root sampling project info.
    pub sampling_project_info: Option<Arc<ProjectInfo>>,
    /// Sampling reservoir counters.
    pub reservoir_counters: ReservoirCounters,
}

/// Like [`ProcessEnvelope`], but for an envelope which has already been grouped.
#[derive(Debug)]
struct ProcessEnvelopeGrouped<'a> {
    /// The group the envelope belongs to.
    pub group: ProcessingGroup,
    /// Envelope to process.
    pub envelope: ManagedEnvelope,
    /// The processing context.
    pub ctx: processing::Context<'a>,
}

/// Parses a list of metrics or metric buckets and pushes them to the project's aggregator.
///
/// This parses and validates the metrics:
///  - For [`Metrics`](ItemType::Statsd), each metric is parsed separately, and invalid metrics are
///    ignored independently.
///  - For [`MetricBuckets`](ItemType::MetricBuckets), the entire list of buckets is parsed and
///    dropped together on parsing failure.
///  - Other envelope items will be ignored with an error message.
///
/// Additionally, processing applies clock drift correction using the system clock of this Relay, if
/// the Envelope specifies the [`sent_at`](Envelope::sent_at) header.
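///
/// For reference, a single statsd line in an [`ItemType::Statsd`] item looks roughly like this
/// (an illustrative sketch):
///
/// ```text
/// endpoint.response_time@millisecond:57|d
/// ```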
#[derive(Debug)]
pub struct ProcessMetrics {
    /// A list of metric items.
    pub data: MetricData,
    /// The target project.
    pub project_key: ProjectKey,
    /// Whether to keep or reset the metric metadata.
    pub source: BucketSource,
    /// The wall clock time at which the request was received.
    pub received_at: DateTime<Utc>,
    /// The value of the Envelope's [`sent_at`](Envelope::sent_at) header for clock drift
    /// correction.
    pub sent_at: Option<DateTime<Utc>>,
}

/// Raw unparsed metric data.
#[derive(Debug)]
pub enum MetricData {
    /// Raw data, unparsed envelope items.
    Raw(Vec<Item>),
    /// Already parsed but unprocessed buckets.
    Parsed(Vec<Bucket>),
}

impl MetricData {
    /// Consumes the metric data and parses the contained buckets.
    ///
    /// If the contained data is already parsed, the buckets are returned unchanged.
    /// Raw buckets are parsed and created with the passed `timestamp`.
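    ///
    /// Illustrative usage (not compiled as a doc test; `statsd_item` is an assumed envelope
    /// [`Item`] of type [`ItemType::Statsd`]):
    ///
    /// ```ignore
    /// let buckets = MetricData::Raw(vec![statsd_item]).into_buckets(UnixTimestamp::now());
    /// ```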
    fn into_buckets(self, timestamp: UnixTimestamp) -> Vec<Bucket> {
        let items = match self {
            Self::Parsed(buckets) => return buckets,
            Self::Raw(items) => items,
        };

        let mut buckets = Vec::new();
        for item in items {
            let payload = item.payload();
            if item.ty() == &ItemType::Statsd {
                for bucket_result in Bucket::parse_all(&payload, timestamp) {
                    match bucket_result {
                        Ok(bucket) => buckets.push(bucket),
                        Err(error) => relay_log::debug!(
                            error = &error as &dyn Error,
                            "failed to parse metric bucket from statsd format",
                        ),
                    }
                }
            } else if item.ty() == &ItemType::MetricBuckets {
                match serde_json::from_slice::<Vec<Bucket>>(&payload) {
                    Ok(parsed_buckets) => {
                        // Re-use the allocation of `parsed_buckets` if possible.
                        if buckets.is_empty() {
                            buckets = parsed_buckets;
                        } else {
                            buckets.extend(parsed_buckets);
                        }
                    }
                    Err(error) => {
                        relay_log::debug!(
                            error = &error as &dyn Error,
                            "failed to parse metric bucket",
                        );
                        metric!(counter(RelayCounters::MetricBucketsParsingFailed) += 1);
                    }
                }
            } else {
                relay_log::error!(
                    "invalid item of type {} passed to ProcessMetrics",
                    item.ty()
                );
            }
        }
        buckets
    }
}

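/// A batch of metric buckets serialized as JSON, submitted for parsing and aggregation.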
#[derive(Debug)]
pub struct ProcessBatchedMetrics {
    /// Metrics payload in JSON format.
    pub payload: Bytes,
    /// Whether to keep or reset the metric metadata.
    pub source: BucketSource,
    /// The wall clock time at which the request was received.
    pub received_at: DateTime<Utc>,
    /// The wall clock time at which the payload was sent, for clock drift correction.
    pub sent_at: Option<DateTime<Utc>>,
}

/// Describes the source a metric bucket originates from.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum BucketSource {
    /// The metric bucket originated from an internal Relay use case.
    ///
    /// The metric bucket either originates from within the same Relay
    /// or was accepted from another Relay which is registered as
    /// an internal Relay via Relay's configuration.
    Internal,
    /// The metric bucket originated from an untrusted source.
    ///
    /// Managed Relays sending extracted metrics are considered external;
    /// it is a project use case, but it comes from an untrusted source.
    External,
}

impl BucketSource {
    /// Infers the bucket source from [`RequestMeta::request_trust`].
    pub fn from_meta(meta: &RequestMeta) -> Self {
        match meta.request_trust() {
            RequestTrust::Trusted => Self::Internal,
            RequestTrust::Untrusted => Self::External,
        }
    }
}

/// Sends client reports to the upstream.
#[derive(Debug)]
pub struct SubmitClientReports {
    /// The client reports to be sent.
    pub client_reports: Vec<ClientReport>,
    /// Scoping information for the client reports.
    pub scoping: Scoping,
}

/// CPU-intensive processing tasks for envelopes.
#[derive(Debug)]
pub enum EnvelopeProcessor {
    ProcessEnvelope(Box<ProcessEnvelope>),
    ProcessProjectMetrics(Box<ProcessMetrics>),
    ProcessBatchedMetrics(Box<ProcessBatchedMetrics>),
    FlushBuckets(Box<FlushBuckets>),
    SubmitClientReports(Box<SubmitClientReports>),
}

impl EnvelopeProcessor {
    /// Returns the name of the message variant.
    pub fn variant(&self) -> &'static str {
        match self {
            EnvelopeProcessor::ProcessEnvelope(_) => "ProcessEnvelope",
            EnvelopeProcessor::ProcessProjectMetrics(_) => "ProcessProjectMetrics",
            EnvelopeProcessor::ProcessBatchedMetrics(_) => "ProcessBatchedMetrics",
            EnvelopeProcessor::FlushBuckets(_) => "FlushBuckets",
            EnvelopeProcessor::SubmitClientReports(_) => "SubmitClientReports",
        }
    }
}

impl relay_system::Interface for EnvelopeProcessor {}

impl FromMessage<ProcessEnvelope> for EnvelopeProcessor {
    type Response = NoResponse;

    fn from_message(message: ProcessEnvelope, _sender: ()) -> Self {
        Self::ProcessEnvelope(Box::new(message))
    }
}

impl FromMessage<ProcessMetrics> for EnvelopeProcessor {
    type Response = NoResponse;

    fn from_message(message: ProcessMetrics, _: ()) -> Self {
        Self::ProcessProjectMetrics(Box::new(message))
    }
}

impl FromMessage<ProcessBatchedMetrics> for EnvelopeProcessor {
    type Response = NoResponse;

    fn from_message(message: ProcessBatchedMetrics, _: ()) -> Self {
        Self::ProcessBatchedMetrics(Box::new(message))
    }
}

impl FromMessage<FlushBuckets> for EnvelopeProcessor {
    type Response = NoResponse;

    fn from_message(message: FlushBuckets, _: ()) -> Self {
        Self::FlushBuckets(Box::new(message))
    }
}

impl FromMessage<SubmitClientReports> for EnvelopeProcessor {
    type Response = NoResponse;

    fn from_message(message: SubmitClientReports, _: ()) -> Self {
        Self::SubmitClientReports(Box::new(message))
    }
}

/// The asynchronous thread pool used for scheduling processing tasks in the processor.
pub type EnvelopeProcessorServicePool = AsyncPool<BoxFuture<'static, ()>>;

/// Service implementing the [`EnvelopeProcessor`] interface.
///
/// This service handles messages in a worker pool with configurable concurrency.
#[derive(Clone)]
pub struct EnvelopeProcessorService {
    inner: Arc<InnerProcessor>,
}

/// Contains the addresses of services that the processor publishes to.
pub struct Addrs {
    pub outcome_aggregator: Addr<TrackOutcome>,
    pub upstream_relay: Addr<UpstreamRelay>,
    #[cfg(feature = "processing")]
    pub upload: Option<Addr<Upload>>,
    #[cfg(feature = "processing")]
    pub store_forwarder: Option<Addr<Store>>,
    pub aggregator: Addr<Aggregator>,
}

impl Default for Addrs {
    fn default() -> Self {
        Addrs {
            outcome_aggregator: Addr::dummy(),
            upstream_relay: Addr::dummy(),
            #[cfg(feature = "processing")]
            upload: None,
            #[cfg(feature = "processing")]
            store_forwarder: None,
            aggregator: Addr::dummy(),
        }
    }
}

struct InnerProcessor {
    pool: EnvelopeProcessorServicePool,
    config: Arc<Config>,
    global_config: GlobalConfigHandle,
    project_cache: ProjectCacheHandle,
    cogs: Cogs,
    addrs: Addrs,
    #[cfg(feature = "processing")]
    rate_limiter: Option<Arc<RedisRateLimiter>>,
    geoip_lookup: GeoIpLookup,
    #[cfg(feature = "processing")]
    cardinality_limiter: Option<CardinalityLimiter>,
    metric_outcomes: MetricOutcomes,
    processing: Processing,
}

struct Processing {
    logs: LogsProcessor,
    trace_metrics: TraceMetricsProcessor,
    spans: SpansProcessor,
    check_ins: CheckInsProcessor,
    sessions: SessionsProcessor,
    transactions: TransactionProcessor,
    profile_chunks: ProfileChunksProcessor,
    trace_attachments: TraceAttachmentsProcessor,
}

impl EnvelopeProcessorService {
    /// Creates a multi-threaded envelope processor.
    #[cfg_attr(feature = "processing", expect(clippy::too_many_arguments))]
    pub fn new(
        pool: EnvelopeProcessorServicePool,
        config: Arc<Config>,
        global_config: GlobalConfigHandle,
        project_cache: ProjectCacheHandle,
        cogs: Cogs,
        #[cfg(feature = "processing")] redis: Option<RedisClients>,
        addrs: Addrs,
        metric_outcomes: MetricOutcomes,
    ) -> Self {
        let geoip_lookup = config
            .geoip_path()
            .and_then(
                |p| match GeoIpLookup::open(p).context(ServiceError::GeoIp) {
                    Ok(geoip) => Some(geoip),
                    Err(err) => {
                        relay_log::error!("failed to open GeoIP db {p:?}: {err:?}");
                        None
                    }
                },
            )
            .unwrap_or_else(GeoIpLookup::empty);

        #[cfg(feature = "processing")]
        let (cardinality, quotas) = match redis {
            Some(RedisClients {
                cardinality,
                quotas,
                ..
            }) => (Some(cardinality), Some(quotas)),
            None => (None, None),
        };
        #[cfg(not(feature = "processing"))]
        let quotas = None;

        #[cfg(feature = "processing")]
        let rate_limiter = quotas.clone().map(|redis| {
            RedisRateLimiter::new(redis)
                .max_limit(config.max_rate_limit())
                .cache(config.quota_cache_ratio(), config.quota_cache_max())
        });

        let quota_limiter = Arc::new(QuotaRateLimiter::new(
            #[cfg(feature = "processing")]
            project_cache.clone(),
            #[cfg(feature = "processing")]
            rate_limiter.clone(),
        ));
        #[cfg(feature = "processing")]
        let rate_limiter = rate_limiter.map(Arc::new);

        let inner = InnerProcessor {
            pool,
            global_config,
            project_cache,
            cogs,
            #[cfg(feature = "processing")]
            rate_limiter,
            addrs,
            #[cfg(feature = "processing")]
            cardinality_limiter: cardinality
                .map(|cardinality| {
                    RedisSetLimiter::new(
                        RedisSetLimiterOptions {
                            cache_vacuum_interval: config
                                .cardinality_limiter_cache_vacuum_interval(),
                        },
                        cardinality,
                    )
                })
                .map(CardinalityLimiter::new),
            metric_outcomes,
            processing: Processing {
                logs: LogsProcessor::new(Arc::clone(&quota_limiter)),
                trace_metrics: TraceMetricsProcessor::new(Arc::clone(&quota_limiter)),
                spans: SpansProcessor::new(Arc::clone(&quota_limiter), geoip_lookup.clone()),
                check_ins: CheckInsProcessor::new(Arc::clone(&quota_limiter)),
                sessions: SessionsProcessor::new(Arc::clone(&quota_limiter)),
                transactions: TransactionProcessor::new(
                    Arc::clone(&quota_limiter),
                    geoip_lookup.clone(),
                    quotas.clone(),
                ),
                profile_chunks: ProfileChunksProcessor::new(Arc::clone(&quota_limiter)),
                trace_attachments: TraceAttachmentsProcessor::new(quota_limiter),
            },
            geoip_lookup,
            config,
        };

        Self {
            inner: Arc::new(inner),
        }
    }

    async fn enforce_quotas<Group>(
        &self,
        managed_envelope: &mut TypedEnvelope<Group>,
        event: Annotated<Event>,
        extracted_metrics: &mut ProcessingExtractedMetrics,
        ctx: processing::Context<'_>,
    ) -> Result<Annotated<Event>, ProcessingError> {
        // Cached quotas first: they are quick to evaluate, and some quotas (indexed) are not
        // applied in the fast path, so all cached quotas can be applied here.
        let cached_result = RateLimiter::Cached
            .enforce(managed_envelope, event, extracted_metrics, ctx)
            .await?;

        if_processing!(self.inner.config, {
            let rate_limiter = match self.inner.rate_limiter.clone() {
                Some(rate_limiter) => rate_limiter,
                None => return Ok(cached_result.event),
            };

            // Enforce all quotas consistently with Redis.
            let consistent_result = RateLimiter::Consistent(rate_limiter)
                .enforce(
                    managed_envelope,
                    cached_result.event,
                    extracted_metrics,
                    ctx
                )
                .await?;

            // Update cached rate limits with the freshly computed ones.
            if !consistent_result.rate_limits.is_empty() {
                self.inner
                    .project_cache
                    .get(managed_envelope.scoping().project_key)
                    .rate_limits()
                    .merge(consistent_result.rate_limits);
            }

            Ok(consistent_result.event)
        } else { Ok(cached_result.event) })
    }

    /// Processes general errors, and the items which require or create events.
    async fn process_errors(
        &self,
        managed_envelope: &mut TypedEnvelope<ErrorGroup>,
        project_id: ProjectId,
        mut ctx: processing::Context<'_>,
    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
        let mut event_fully_normalized = EventFullyNormalized::new(managed_envelope.envelope());
        let mut metrics = Metrics::default();
        let mut extracted_metrics = ProcessingExtractedMetrics::new();

        // Events can also contain user reports.
        report::process_user_reports(managed_envelope);

        if_processing!(self.inner.config, {
            unreal::expand(managed_envelope, &self.inner.config)?;
            #[cfg(sentry)]
            playstation::expand(managed_envelope, &self.inner.config, ctx.project_info)?;
            nnswitch::expand(managed_envelope)?;
        });

        let mut event = event::extract(
            managed_envelope,
            &mut metrics,
            event_fully_normalized,
            &self.inner.config,
        )?;

        if_processing!(self.inner.config, {
            if let Some(inner_event_fully_normalized) =
                unreal::process(managed_envelope, &mut event)?
            {
                event_fully_normalized = inner_event_fully_normalized;
            }
            #[cfg(sentry)]
            if let Some(inner_event_fully_normalized) =
                playstation::process(managed_envelope, &mut event, ctx.project_info)?
            {
                event_fully_normalized = inner_event_fully_normalized;
            }
            if let Some(inner_event_fully_normalized) =
                attachment::create_placeholders(managed_envelope, &mut event, &mut metrics)
            {
                event_fully_normalized = inner_event_fully_normalized;
            }
        });

        ctx.sampling_project_info = dynamic_sampling::validate_and_set_dsc(
            managed_envelope,
            &mut event,
            ctx.project_info,
            ctx.sampling_project_info,
        );

        let attachments = managed_envelope
            .envelope()
            .items()
            .filter(|item| item.attachment_type() == Some(&AttachmentType::Attachment));
        processing::utils::event::finalize(
            managed_envelope.envelope().headers(),
            &mut event,
            attachments,
            &mut metrics,
            ctx.config,
        )?;
        event_fully_normalized = processing::utils::event::normalize(
            managed_envelope.envelope().headers(),
            &mut event,
            event_fully_normalized,
            project_id,
            ctx,
            &self.inner.geoip_lookup,
        )?;
        let filter_run =
            processing::utils::event::filter(managed_envelope.envelope().headers(), &event, &ctx)
                .map_err(|err| {
                managed_envelope.reject(Outcome::Filtered(err.clone()));
                ProcessingError::EventFiltered(err)
            })?;

        if self.inner.config.processing_enabled() || matches!(filter_run, FiltersStatus::Ok) {
            dynamic_sampling::tag_error_with_sampling_decision(
                managed_envelope,
                &mut event,
                ctx.sampling_project_info,
                &self.inner.config,
            )
            .await;
        }

        event = self
            .enforce_quotas(managed_envelope, event, &mut extracted_metrics, ctx)
            .await?;

        if event.value().is_some() {
            processing::utils::event::scrub(&mut event, ctx.project_info)?;
            event::serialize(
                managed_envelope,
                &mut event,
                event_fully_normalized,
                EventMetricsExtracted(false),
                SpansExtracted(false),
            )?;
            event::emit_feedback_metrics(managed_envelope.envelope());
        }

        let attachments = managed_envelope
            .envelope_mut()
            .items_mut()
            .filter(|i| i.ty() == &ItemType::Attachment);
        processing::utils::attachments::scrub(attachments, ctx.project_info);

        if self.inner.config.processing_enabled() && !event_fully_normalized.0 {
            relay_log::error!(
                tags.project = %project_id,
                tags.ty = event_type(&event).map(|e| e.to_string()).unwrap_or("none".to_owned()),
                "ingested event without normalizing"
            );
        }

        Ok(Some(extracted_metrics))
    }
1434
1435    /// Processes standalone items that require an event ID, but do not have an event on the same envelope.
1436    async fn process_standalone(
1437        &self,
1438        managed_envelope: &mut TypedEnvelope<StandaloneGroup>,
1439        project_id: ProjectId,
1440        ctx: processing::Context<'_>,
1441    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
1442        let mut extracted_metrics = ProcessingExtractedMetrics::new();
1443
1444        standalone::process(managed_envelope);
1445
1446        profile::filter(
1447            managed_envelope,
1448            &Annotated::empty(),
1449            ctx.config,
1450            project_id,
1451            ctx.project_info,
1452        );
1453
1454        self.enforce_quotas(
1455            managed_envelope,
1456            Annotated::empty(),
1457            &mut extracted_metrics,
1458            ctx,
1459        )
1460        .await?;
1461
1462        report::process_user_reports(managed_envelope);
1463        let attachments = managed_envelope
1464            .envelope_mut()
1465            .items_mut()
1466            .filter(|i| i.ty() == &ItemType::Attachment);
1467        processing::utils::attachments::scrub(attachments, ctx.project_info);
1468
1469        Ok(Some(extracted_metrics))
1470    }
1471
    /// Processes client reports.
1473    async fn process_client_reports(
1474        &self,
1475        managed_envelope: &mut TypedEnvelope<ClientReportGroup>,
1476        ctx: processing::Context<'_>,
1477    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
1478        let mut extracted_metrics = ProcessingExtractedMetrics::new();
1479
1480        self.enforce_quotas(
1481            managed_envelope,
1482            Annotated::empty(),
1483            &mut extracted_metrics,
1484            ctx,
1485        )
1486        .await?;
1487
1488        report::process_client_reports(
1489            managed_envelope,
1490            ctx.config,
1491            ctx.project_info,
1492            self.inner.addrs.outcome_aggregator.clone(),
1493        );
1494
1495        Ok(Some(extracted_metrics))
1496    }
1497
1498    /// Processes replays.
1499    async fn process_replays(
1500        &self,
1501        managed_envelope: &mut TypedEnvelope<ReplayGroup>,
1502        ctx: processing::Context<'_>,
1503    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
1504        let mut extracted_metrics = ProcessingExtractedMetrics::new();
1505
1506        replay::process(
1507            managed_envelope,
1508            ctx.global_config,
1509            ctx.config,
1510            ctx.project_info,
1511            &self.inner.geoip_lookup,
1512        )?;
1513
1514        self.enforce_quotas(
1515            managed_envelope,
1516            Annotated::empty(),
1517            &mut extracted_metrics,
1518            ctx,
1519        )
1520        .await?;
1521
1522        Ok(Some(extracted_metrics))
1523    }
1524
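    /// Processes NEL (Network Error Logging) reports.
    ///
    /// The reports are converted into log items and then run through the logs processor.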
1525    async fn process_nel(
1526        &self,
1527        mut managed_envelope: ManagedEnvelope,
1528        ctx: processing::Context<'_>,
1529    ) -> Result<ProcessingResult, ProcessingError> {
1530        nel::convert_to_logs(&mut managed_envelope);
1531        self.process_with_processor(&self.inner.processing.logs, managed_envelope, ctx)
1532            .await
1533    }
1534
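    /// Runs a dedicated [`processing::Processor`] over the envelope.
    ///
    /// The processor takes over all items it is responsible for; the envelope must be empty
    /// afterwards, and any leftover items are rejected.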
1535    async fn process_with_processor<P: processing::Processor>(
1536        &self,
1537        processor: &P,
1538        mut managed_envelope: ManagedEnvelope,
1539        ctx: processing::Context<'_>,
1540    ) -> Result<ProcessingResult, ProcessingError>
1541    where
1542        Outputs: From<P::Output>,
1543    {
1544        let Some(work) = processor.prepare_envelope(&mut managed_envelope) else {
1545            debug_assert!(
1546                false,
1547                "there must be work for the {} processor",
1548                std::any::type_name::<P>(),
1549            );
1550            return Err(ProcessingError::ProcessingGroupMismatch);
1551        };
1552
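        // `prepare_envelope` has taken all items this processor is responsible for out of the
        // envelope. Anything left over indicates a processing-group mismatch, so the remainder
        // is rejected as an internal error below.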
1553        managed_envelope.update();
1554        match managed_envelope.envelope().is_empty() {
1555            true => managed_envelope.accept(),
1556            false => managed_envelope.reject(Outcome::Invalid(DiscardReason::Internal)),
1557        }
1558
1559        processor
1560            .process(work, ctx)
1561            .await
1562            .map_err(|err| {
1563                relay_log::debug!(
1564                    error = &err as &dyn std::error::Error,
1565                    "processing pipeline failed"
1566                );
1567                ProcessingError::ProcessingFailure
1568            })
1569            .map(|o| o.map(Into::into))
1570            .map(ProcessingResult::Output)
1571    }
1572
1573    /// Processes standalone spans.
1574    ///
1575    /// This function does *not* run for spans extracted from transactions.
1576    async fn process_standalone_spans(
1577        &self,
1578        managed_envelope: &mut TypedEnvelope<SpanGroup>,
1579        _project_id: ProjectId,
1580        ctx: processing::Context<'_>,
1581    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
1582        let mut extracted_metrics = ProcessingExtractedMetrics::new();
1583
1584        span::filter(managed_envelope, ctx.config, ctx.project_info);
1585        span::convert_otel_traces_data(managed_envelope);
1586
1587        if_processing!(self.inner.config, {
1588            span::process(
1589                managed_envelope,
1590                &mut Annotated::empty(),
1591                &mut extracted_metrics,
1592                _project_id,
1593                ctx,
1594                &self.inner.geoip_lookup,
1595            )
1596            .await;
1597        });
1598
1599        self.enforce_quotas(
1600            managed_envelope,
1601            Annotated::empty(),
1602            &mut extracted_metrics,
1603            ctx,
1604        )
1605        .await?;
1606
1607        Ok(Some(extracted_metrics))
1608    }
1609
1610    async fn process_envelope(
1611        &self,
1612        project_id: ProjectId,
1613        message: ProcessEnvelopeGrouped<'_>,
1614    ) -> Result<ProcessingResult, ProcessingError> {
1615        let ProcessEnvelopeGrouped {
1616            group,
1617            envelope: mut managed_envelope,
1618            ctx,
1619        } = message;
1620
1621        // Pre-process the envelope headers.
1622        if let Some(sampling_state) = ctx.sampling_project_info {
1623            // Both transactions and standalone span envelopes need a normalized DSC header
1624            // to make sampling rules based on the segment/transaction name work correctly.
1625            managed_envelope
1626                .envelope_mut()
1627                .parametrize_dsc_transaction(&sampling_state.config.tx_name_rules);
1628        }
1629
1630        // Set the event retention. Effectively, this value will only be available in processing
1631        // mode when the full project config is queried from the upstream.
1632        if let Some(retention) = ctx.project_info.config.event_retention {
1633            managed_envelope.envelope_mut().set_retention(retention);
1634        }
1635
        // Set the downsampled event retention. Effectively, this value will only be available
        // in processing mode when the full project config is queried from the upstream.
1638        if let Some(retention) = ctx.project_info.config.downsampled_event_retention {
1639            managed_envelope
1640                .envelope_mut()
1641                .set_downsampled_retention(retention);
1642        }
1643
1644        // Ensure the project ID is updated to the stored instance for this project cache. This can
1645        // differ in two cases:
1646        //  1. The envelope was sent to the legacy `/store/` endpoint without a project ID.
1647        //  2. The DSN was moved and the envelope sent to the old project ID.
1648        managed_envelope
1649            .envelope_mut()
1650            .meta_mut()
1651            .set_project_id(project_id);
1652
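        // Helper macro: converts the envelope into a `TypedEnvelope` for the current processing
        // group, invokes the given handler, and turns errors into outcomes before propagating
        // them. For example, `run!(process_errors, project_id, ctx)` calls
        // `self.process_errors(&mut managed_envelope, project_id, ctx)`.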
1653        macro_rules! run {
1654            ($fn_name:ident $(, $args:expr)*) => {
1655                async {
1656                    let mut managed_envelope = (managed_envelope, group).try_into()?;
1657                    match self.$fn_name(&mut managed_envelope, $($args),*).await {
1658                        Ok(extracted_metrics) => Ok(ProcessingResult::Envelope {
1659                            managed_envelope: managed_envelope.into_processed(),
                            extracted_metrics: extracted_metrics.unwrap_or_else(ProcessingExtractedMetrics::new)
1661                        }),
1662                        Err(error) => {
                            relay_log::trace!("Executing {func} failed: {error}", func = stringify!($fn_name), error = error);
1664                            if let Some(outcome) = error.to_outcome() {
1665                                managed_envelope.reject(outcome);
1666                            }
1667
1668                            return Err(error);
1669                        }
1670                    }
1671                }.await
1672            };
1673        }
1674
1675        relay_log::trace!("Processing {group} group", group = group.variant());
1676
1677        match group {
1678            ProcessingGroup::Error => run!(process_errors, project_id, ctx),
1679            ProcessingGroup::Transaction => {
1680                self.process_with_processor(
1681                    &self.inner.processing.transactions,
1682                    managed_envelope,
1683                    ctx,
1684                )
1685                .await
1686            }
1687            ProcessingGroup::Session => {
1688                self.process_with_processor(&self.inner.processing.sessions, managed_envelope, ctx)
1689                    .await
1690            }
1691            ProcessingGroup::Standalone => run!(process_standalone, project_id, ctx),
1692            ProcessingGroup::ClientReport => run!(process_client_reports, ctx),
1693            ProcessingGroup::Replay => {
1694                run!(process_replays, ctx)
1695            }
1696            ProcessingGroup::CheckIn => {
1697                self.process_with_processor(&self.inner.processing.check_ins, managed_envelope, ctx)
1698                    .await
1699            }
1700            ProcessingGroup::Nel => self.process_nel(managed_envelope, ctx).await,
1701            ProcessingGroup::Log => {
1702                self.process_with_processor(&self.inner.processing.logs, managed_envelope, ctx)
1703                    .await
1704            }
1705            ProcessingGroup::TraceMetric => {
1706                self.process_with_processor(
1707                    &self.inner.processing.trace_metrics,
1708                    managed_envelope,
1709                    ctx,
1710                )
1711                .await
1712            }
1713            ProcessingGroup::SpanV2 => {
1714                self.process_with_processor(&self.inner.processing.spans, managed_envelope, ctx)
1715                    .await
1716            }
1717            ProcessingGroup::TraceAttachment => {
1718                self.process_with_processor(
1719                    &self.inner.processing.trace_attachments,
1720                    managed_envelope,
1721                    ctx,
1722                )
1723                .await
1724            }
1725            ProcessingGroup::Span => run!(process_standalone_spans, project_id, ctx),
1726            ProcessingGroup::ProfileChunk => {
1727                self.process_with_processor(
1728                    &self.inner.processing.profile_chunks,
1729                    managed_envelope,
1730                    ctx,
1731                )
1732                .await
1733            }
            // Currently not used.
1735            ProcessingGroup::Metrics => {
1736                // In proxy mode we simply forward the metrics.
1737                // This group shouldn't be used outside of proxy mode.
1738                if self.inner.config.relay_mode() != RelayMode::Proxy {
1739                    relay_log::error!(
1740                        tags.project = %project_id,
1741                        items = ?managed_envelope.envelope().items().next().map(Item::ty),
1742                        "received metrics in the process_state"
1743                    );
1744                }
1745
1746                Ok(ProcessingResult::no_metrics(
1747                    managed_envelope.into_processed(),
1748                ))
1749            }
            // No processing group could be identified for these items; log an error and
            // forward the envelope unchanged.
1751            ProcessingGroup::Ungrouped => {
1752                relay_log::error!(
1753                    tags.project = %project_id,
1754                    items = ?managed_envelope.envelope().items().next().map(Item::ty),
1755                    "could not identify the processing group based on the envelope's items"
1756                );
1757
1758                Ok(ProcessingResult::no_metrics(
1759                    managed_envelope.into_processed(),
1760                ))
1761            }
1762            // Leave this group unchanged.
1763            //
1764            // This will later be forwarded to upstream.
1765            ProcessingGroup::ForwardUnknown => Ok(ProcessingResult::no_metrics(
1766                managed_envelope.into_processed(),
1767            )),
1768        }
1769    }
1770
1771    /// Processes the envelope and returns the processed envelope back.
1772    ///
1773    /// Returns `Some` if the envelope passed inbound filtering and rate limiting. Invalid items are
1774    /// removed from the envelope. Otherwise, if the envelope is empty or the entire envelope needs
1775    /// to be dropped, this is `None`.
1776    async fn process<'a>(
1777        &self,
1778        mut message: ProcessEnvelopeGrouped<'a>,
1779    ) -> Result<Option<Submit<'a>>, ProcessingError> {
1780        let ProcessEnvelopeGrouped {
1781            ref mut envelope,
1782            ctx,
1783            ..
1784        } = message;
1785
1786        // Prefer the project's project ID, and fall back to the stated project id from the
1787        // envelope. The project ID is available in all modes, other than in proxy mode, where
1788        // envelopes for unknown projects are forwarded blindly.
1789        //
1790        // Neither ID can be available in proxy mode on the /store/ endpoint. This is not supported,
1791        // since we cannot process an envelope without project ID, so drop it.
1792        let Some(project_id) = ctx
1793            .project_info
1794            .project_id
1795            .or_else(|| envelope.envelope().meta().project_id())
1796        else {
1797            envelope.reject(Outcome::Invalid(DiscardReason::Internal));
1798            return Err(ProcessingError::MissingProjectId);
1799        };
1800
1801        let client = envelope.envelope().meta().client().map(str::to_owned);
1802        let user_agent = envelope.envelope().meta().user_agent().map(str::to_owned);
1803        let project_key = envelope.envelope().meta().public_key();
1804        // Only allow sending to the sampling key, if we successfully loaded a sampling project
1805        // info relating to it. This filters out unknown/invalid project keys as well as project
1806        // keys from different organizations.
1807        let sampling_key = envelope
1808            .envelope()
1809            .sampling_key()
1810            .filter(|_| ctx.sampling_project_info.is_some());
1811
1812        // We set additional information on the scope, which will be removed after processing the
1813        // envelope.
1814        relay_log::configure_scope(|scope| {
1815            scope.set_tag("project", project_id);
1816            if let Some(client) = client {
1817                scope.set_tag("sdk", client);
1818            }
1819            if let Some(user_agent) = user_agent {
1820                scope.set_extra("user_agent", user_agent.into());
1821            }
1822        });
1823
1824        let result = match self.process_envelope(project_id, message).await {
1825            Ok(ProcessingResult::Envelope {
1826                mut managed_envelope,
1827                extracted_metrics,
1828            }) => {
1829                // The envelope could be modified or even emptied during processing, which
1830                // requires re-computation of the context.
1831                managed_envelope.update();
1832
1833                let has_metrics = !extracted_metrics.metrics.project_metrics.is_empty();
1834                send_metrics(
1835                    extracted_metrics.metrics,
1836                    project_key,
1837                    sampling_key,
1838                    &self.inner.addrs.aggregator,
1839                );
1840
1841                let envelope_response = if managed_envelope.envelope().is_empty() {
1842                    if !has_metrics {
1843                        // Individual rate limits have already been issued
1844                        managed_envelope.reject(Outcome::RateLimited(None));
1845                    } else {
1846                        managed_envelope.accept();
1847                    }
1848
1849                    None
1850                } else {
1851                    Some(managed_envelope)
1852                };
1853
1854                Ok(envelope_response.map(Submit::Envelope))
1855            }
1856            Ok(ProcessingResult::Output(Output { main, metrics })) => {
1857                if let Some(metrics) = metrics {
1858                    metrics.accept(|metrics| {
1859                        send_metrics(
1860                            metrics,
1861                            project_key,
1862                            sampling_key,
1863                            &self.inner.addrs.aggregator,
1864                        );
1865                    });
1866                }
1867
1868                let ctx = ctx.to_forward();
1869                Ok(main.map(|output| Submit::Output { output, ctx }))
1870            }
1871            Err(err) => Err(err),
1872        };
1873
1874        relay_log::configure_scope(|scope| {
1875            scope.remove_tag("project");
1876            scope.remove_tag("sdk");
1877            scope.remove_tag("user_agent");
1878        });
1879
1880        result
1881    }
1882
1883    async fn handle_process_envelope(&self, cogs: &mut Token, message: ProcessEnvelope) {
1884        let project_key = message.envelope.envelope().meta().public_key();
1885        let wait_time = message.envelope.age();
1886        metric!(timer(RelayTimers::EnvelopeWaitTime) = wait_time);
1887
        // This COGS handling may need an overhaul in the future:
        // cancel the passed-in token to start individual measurements per envelope instead.
1890        cogs.cancel();
1891
1892        let scoping = message.envelope.scoping();
1893        for (group, envelope) in ProcessingGroup::split_envelope(
1894            *message.envelope.into_envelope(),
1895            &message.project_info,
1896        ) {
1897            let mut cogs = self
1898                .inner
1899                .cogs
1900                .timed(ResourceId::Relay, AppFeature::from(group));
1901
1902            let mut envelope =
1903                ManagedEnvelope::new(envelope, self.inner.addrs.outcome_aggregator.clone());
1904            envelope.scope(scoping);
1905
1906            let global_config = self.inner.global_config.current();
1907
1908            let ctx = processing::Context {
1909                config: &self.inner.config,
1910                global_config: &global_config,
1911                project_info: &message.project_info,
1912                sampling_project_info: message.sampling_project_info.as_deref(),
1913                rate_limits: &message.rate_limits,
1914                reservoir_counters: &message.reservoir_counters,
1915            };
1916
1917            let message = ProcessEnvelopeGrouped {
1918                group,
1919                envelope,
1920                ctx,
1921            };
1922
1923            let result = metric!(
1924                timer(RelayTimers::EnvelopeProcessingTime),
1925                group = group.variant(),
1926                { self.process(message).await }
1927            );
1928
1929            match result {
1930                Ok(Some(envelope)) => self.submit_upstream(&mut cogs, envelope),
1931                Ok(None) => {}
1932                Err(error) if error.is_unexpected() => {
1933                    relay_log::error!(
1934                        tags.project_key = %project_key,
1935                        error = &error as &dyn Error,
1936                        "error processing envelope"
1937                    )
1938                }
1939                Err(error) => {
1940                    relay_log::debug!(
1941                        tags.project_key = %project_key,
1942                        error = &error as &dyn Error,
1943                        "error processing envelope"
1944                    )
1945                }
1946            }
1947        }
1948    }
1949
1950    fn handle_process_metrics(&self, cogs: &mut Token, message: ProcessMetrics) {
1951        let ProcessMetrics {
1952            data,
1953            project_key,
1954            received_at,
1955            sent_at,
1956            source,
1957        } = message;
1958
1959        let received_timestamp =
1960            UnixTimestamp::from_datetime(received_at).unwrap_or(UnixTimestamp::now());
1961
1962        let mut buckets = data.into_buckets(received_timestamp);
1963        if buckets.is_empty() {
1964            return;
        }
1966        cogs.update(relay_metrics::cogs::BySize(&buckets));
1967
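        // The clock drift processor shifts bucket timestamps by the drift between the client's
        // `sent_at` and the local `received_at`, but only when the drift exceeds the
        // `MINIMUM_CLOCK_DRIFT` threshold.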
1968        let clock_drift_processor =
1969            ClockDriftProcessor::new(sent_at, received_at).at_least(MINIMUM_CLOCK_DRIFT);
1970
1971        buckets.retain_mut(|bucket| {
1972            if let Err(error) = relay_metrics::normalize_bucket(bucket) {
1973                relay_log::debug!(error = &error as &dyn Error, "dropping bucket {bucket:?}");
1974                return false;
1975            }
1976
1977            if !self::metrics::is_valid_namespace(bucket) {
1978                return false;
1979            }
1980
1981            clock_drift_processor.process_timestamp(&mut bucket.timestamp);
1982
1983            if !matches!(source, BucketSource::Internal) {
1984                bucket.metadata = BucketMetadata::new(received_timestamp);
1985            }
1986
1987            true
1988        });
1989
1990        let project = self.inner.project_cache.get(project_key);
1991
        // Best-effort check to filter and rate limit buckets. If no project state is available
        // at the current time, we check again after flushing.
1994        let buckets = match project.state() {
1995            ProjectState::Enabled(project_info) => {
1996                let rate_limits = project.rate_limits().current_limits();
1997                self.check_buckets(project_key, project_info, &rate_limits, buckets)
1998            }
1999            _ => buckets,
2000        };
2001
2002        relay_log::trace!("merging metric buckets into the aggregator");
2003        self.inner
2004            .addrs
2005            .aggregator
2006            .send(MergeBuckets::new(project_key, buckets));
2007    }
2008
2009    fn handle_process_batched_metrics(&self, cogs: &mut Token, message: ProcessBatchedMetrics) {
2010        let ProcessBatchedMetrics {
2011            payload,
2012            source,
2013            received_at,
2014            sent_at,
2015        } = message;
2016
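        // The batched payload is expected to have the shape:
        // `{"buckets": {"<project_key>": [<bucket>, ...], ...}}`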
2017        #[derive(serde::Deserialize)]
2018        struct Wrapper {
2019            buckets: HashMap<ProjectKey, Vec<Bucket>>,
2020        }
2021
2022        let buckets = match serde_json::from_slice(&payload) {
2023            Ok(Wrapper { buckets }) => buckets,
2024            Err(error) => {
2025                relay_log::debug!(
2026                    error = &error as &dyn Error,
2027                    "failed to parse batched metrics",
2028                );
2029                metric!(counter(RelayCounters::MetricBucketsParsingFailed) += 1);
2030                return;
2031            }
2032        };
2033
2034        for (project_key, buckets) in buckets {
2035            self.handle_process_metrics(
2036                cogs,
2037                ProcessMetrics {
2038                    data: MetricData::Parsed(buckets),
2039                    project_key,
2040                    source,
2041                    received_at,
2042                    sent_at,
2043                },
2044            )
2045        }
2046    }
2047
2048    fn submit_upstream(&self, cogs: &mut Token, submit: Submit<'_>) {
2049        let _submit = cogs.start_category("submit");
2050
2051        #[cfg(feature = "processing")]
2052        if self.inner.config.processing_enabled()
2053            && let Some(store_forwarder) = &self.inner.addrs.store_forwarder
2054        {
2055            use crate::processing::StoreHandle;
2056
2057            let upload = self.inner.addrs.upload.as_ref();
2058            match submit {
2059                Submit::Envelope(envelope) => {
2060                    let envelope_has_attachments = envelope
2061                        .envelope()
2062                        .items()
2063                        .any(|item| *item.ty() == ItemType::Attachment);
                    // Whether Relay stores these attachments in the objectstore, or produces
                    // them directly to Kafka as before.
2065                    let use_objectstore = || {
2066                        let options = &self.inner.global_config.current().options;
2067                        utils::sample(options.objectstore_attachments_sample_rate).is_keep()
2068                    };
2069
2070                    if let Some(upload) = &self.inner.addrs.upload
2071                        && envelope_has_attachments
2072                        && use_objectstore()
2073                    {
                        // The `UploadService` uploads all attachments and then forwards the
                        // envelope to the `StoreService`.
2075                        upload.send(StoreEnvelope { envelope })
2076                    } else {
2077                        store_forwarder.send(StoreEnvelope { envelope })
2078                    }
2079                }
2080                Submit::Output { output, ctx } => output
2081                    .forward_store(StoreHandle::new(store_forwarder, upload), ctx)
2082                    .unwrap_or_else(|err| err.into_inner()),
2083            }
2084            return;
2085        }
2086
2087        let mut envelope = match submit {
2088            Submit::Envelope(envelope) => envelope,
2089            Submit::Output { output, ctx } => match output.serialize_envelope(ctx) {
2090                Ok(envelope) => ManagedEnvelope::from(envelope).into_processed(),
2091                Err(_) => {
2092                    relay_log::error!("failed to serialize output to an envelope");
2093                    return;
2094                }
2095            },
2096        };
2097
        if envelope.envelope().is_empty() {
2099            envelope.accept();
2100            return;
2101        }
2102
2103        // Override the `sent_at` timestamp. Since the envelope went through basic
2104        // normalization, all timestamps have been corrected. We propagate the new
2105        // `sent_at` to allow the next Relay to double-check this timestamp and
2106        // potentially apply correction again. This is done as close to sending as
2107        // possible so that we avoid internal delays.
2108        envelope.envelope_mut().set_sent_at(Utc::now());
2109
2110        relay_log::trace!("sending envelope to sentry endpoint");
2111        let http_encoding = self.inner.config.http_encoding();
2112        let result = envelope.envelope().to_vec().and_then(|v| {
2113            encode_payload(&v.into(), http_encoding).map_err(EnvelopeError::PayloadIoFailed)
2114        });
2115
2116        match result {
2117            Ok(body) => {
2118                self.inner
2119                    .addrs
2120                    .upstream_relay
2121                    .send(SendRequest(SendEnvelope {
2122                        envelope,
2123                        body,
2124                        http_encoding,
2125                        project_cache: self.inner.project_cache.clone(),
2126                    }));
2127            }
2128            Err(error) => {
2129                // Errors are only logged for what we consider an internal discard reason. These
2130                // indicate errors in the infrastructure or implementation bugs.
2131                relay_log::error!(
2132                    error = &error as &dyn Error,
2133                    tags.project_key = %envelope.scoping().project_key,
2134                    "failed to serialize envelope payload"
2135                );
2136
2137                envelope.reject(Outcome::Invalid(DiscardReason::Internal));
2138            }
2139        }
2140    }
2141
2142    fn handle_submit_client_reports(&self, cogs: &mut Token, message: SubmitClientReports) {
2143        let SubmitClientReports {
2144            client_reports,
2145            scoping,
2146        } = message;
2147
2148        let upstream = self.inner.config.upstream_descriptor();
2149        let dsn = PartialDsn::outbound(&scoping, upstream);
2150
2151        let mut envelope = Envelope::from_request(None, RequestMeta::outbound(dsn));
2152        for client_report in client_reports {
2153            match client_report.serialize() {
2154                Ok(payload) => {
2155                    let mut item = Item::new(ItemType::ClientReport);
2156                    item.set_payload(ContentType::Json, payload);
2157                    envelope.add_item(item);
2158                }
2159                Err(error) => {
2160                    relay_log::error!(
2161                        error = &error as &dyn std::error::Error,
2162                        "failed to serialize client report"
2163                    );
2164                }
2165            }
2166        }
2167
2168        let envelope = ManagedEnvelope::new(envelope, self.inner.addrs.outcome_aggregator.clone());
2169        self.submit_upstream(cogs, Submit::Envelope(envelope.into_processed()));
2170    }
2171
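    /// Filters and rate limits metric buckets against cached project state and quotas.
    ///
    /// Returns the buckets that remain after enforcement.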
2172    fn check_buckets(
2173        &self,
2174        project_key: ProjectKey,
2175        project_info: &ProjectInfo,
2176        rate_limits: &RateLimits,
2177        buckets: Vec<Bucket>,
2178    ) -> Vec<Bucket> {
2179        let Some(scoping) = project_info.scoping(project_key) else {
2180            relay_log::error!(
2181                tags.project_key = project_key.as_str(),
2182                "there is no scoping: dropping {} buckets",
2183                buckets.len(),
2184            );
2185            return Vec::new();
2186        };
2187
2188        let mut buckets = self::metrics::apply_project_info(
2189            buckets,
2190            &self.inner.metric_outcomes,
2191            project_info,
2192            scoping,
2193        );
2194
2195        let namespaces: BTreeSet<MetricNamespace> = buckets
2196            .iter()
2197            .filter_map(|bucket| bucket.name.try_namespace())
2198            .collect();
2199
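        // The `BTreeSet` deduplicates namespaces and yields them in a deterministic order.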
2200        for namespace in namespaces {
2201            let limits = rate_limits.check_with_quotas(
2202                project_info.get_quotas(),
                scoping.metric_bucket(namespace),
2204            );
2205
2206            if limits.is_limited() {
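                // Move all buckets of the limited namespace into `rejected` and track an
                // outcome for them.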
2207                let rejected;
2208                (buckets, rejected) = utils::split_off(buckets, |bucket| {
2209                    bucket.name.try_namespace() == Some(namespace)
2210                });
2211
2212                let reason_code = limits.longest().and_then(|limit| limit.reason_code.clone());
2213                self.inner.metric_outcomes.track(
2214                    scoping,
2215                    &rejected,
2216                    Outcome::RateLimited(reason_code),
2217                );
2218            }
2219        }
2220
2221        let quotas = project_info.config.quotas.clone();
2222        match MetricsLimiter::create(buckets, quotas, scoping) {
2223            Ok(mut bucket_limiter) => {
2224                bucket_limiter.enforce_limits(rate_limits, &self.inner.metric_outcomes);
2225                bucket_limiter.into_buckets()
2226            }
2227            Err(buckets) => buckets,
2228        }
2229    }
2230
2231    #[cfg(feature = "processing")]
2232    async fn rate_limit_buckets(
2233        &self,
2234        scoping: Scoping,
2235        project_info: &ProjectInfo,
2236        mut buckets: Vec<Bucket>,
2237    ) -> Vec<Bucket> {
2238        let Some(rate_limiter) = &self.inner.rate_limiter else {
2239            return buckets;
2240        };
2241
2242        let global_config = self.inner.global_config.current();
2243        let namespaces = buckets
2244            .iter()
2245            .filter_map(|bucket| bucket.name.try_namespace())
2246            .counts();
2247
2248        let quotas = CombinedQuotas::new(&global_config, project_info.get_quotas());
2249
2250        for (namespace, quantity) in namespaces {
2251            let item_scoping = scoping.metric_bucket(namespace);
2252
2253            let limits = match rate_limiter
2254                .is_rate_limited(quotas, item_scoping, quantity, false)
2255                .await
2256            {
2257                Ok(limits) => limits,
2258                Err(err) => {
2259                    relay_log::error!(
2260                        error = &err as &dyn std::error::Error,
2261                        "failed to check redis rate limits"
2262                    );
2263                    break;
2264                }
2265            };
2266
2267            if limits.is_limited() {
2268                let rejected;
2269                (buckets, rejected) = utils::split_off(buckets, |bucket| {
2270                    bucket.name.try_namespace() == Some(namespace)
2271                });
2272
2273                let reason_code = limits.longest().and_then(|limit| limit.reason_code.clone());
2274                self.inner.metric_outcomes.track(
2275                    scoping,
2276                    &rejected,
2277                    Outcome::RateLimited(reason_code),
2278                );
2279
2280                self.inner
2281                    .project_cache
2282                    .get(item_scoping.scoping.project_key)
2283                    .rate_limits()
2284                    .merge(limits);
2285            }
2286        }
2287
2288        match MetricsLimiter::create(buckets, project_info.config.quotas.clone(), scoping) {
2289            Err(buckets) => buckets,
2290            Ok(bucket_limiter) => self.apply_other_rate_limits(bucket_limiter).await,
2291        }
2292    }
2293
2294    /// Check and apply rate limits to metrics buckets for transactions and spans.
2295    #[cfg(feature = "processing")]
2296    async fn apply_other_rate_limits(&self, mut bucket_limiter: MetricsLimiter) -> Vec<Bucket> {
        relay_log::trace!("applying transaction and span rate limits to buckets");
2298
2299        let scoping = *bucket_limiter.scoping();
2300
2301        if let Some(rate_limiter) = self.inner.rate_limiter.as_ref() {
2302            let global_config = self.inner.global_config.current();
2303            let quotas = CombinedQuotas::new(&global_config, bucket_limiter.quotas());
2304
2305            // We set over_accept_once such that the limit is actually reached, which allows subsequent
2306            // calls with quantity=0 to be rate limited.
2307            let over_accept_once = true;
2308            let mut rate_limits = RateLimits::new();
2309
2310            for category in [DataCategory::Transaction, DataCategory::Span] {
2311                let count = bucket_limiter.count(category);
2312
2313                let timer = Instant::now();
2314                let mut is_limited = false;
2315
2316                if let Some(count) = count {
2317                    match rate_limiter
2318                        .is_rate_limited(quotas, scoping.item(category), count, over_accept_once)
2319                        .await
2320                    {
2321                        Ok(limits) => {
2322                            is_limited = limits.is_limited();
2323                            rate_limits.merge(limits)
2324                        }
2325                        Err(e) => relay_log::error!(error = &e as &dyn Error),
2326                    }
2327                }
2328
2329                relay_statsd::metric!(
2330                    timer(RelayTimers::RateLimitBucketsDuration) = timer.elapsed(),
2331                    category = category.name(),
2332                    limited = if is_limited { "true" } else { "false" },
2333                    count = match count {
2334                        None => "none",
2335                        Some(0) => "0",
2336                        Some(1) => "1",
                        Some(2..=10) => "10",
                        Some(11..=25) => "25",
                        Some(26..=50) => "50",
2340                        Some(51..=100) => "100",
2341                        Some(101..=500) => "500",
2342                        _ => "> 500",
2343                    },
2344                );
2345            }
2346
2347            if rate_limits.is_limited() {
2348                let was_enforced =
2349                    bucket_limiter.enforce_limits(&rate_limits, &self.inner.metric_outcomes);
2350
2351                if was_enforced {
2352                    // Update the rate limits in the project cache.
2353                    self.inner
2354                        .project_cache
2355                        .get(scoping.project_key)
2356                        .rate_limits()
2357                        .merge(rate_limits);
2358                }
2359            }
2360        }
2361
2362        bucket_limiter.into_buckets()
2363    }
2364
2365    /// Cardinality limits the passed buckets and returns a filtered vector of only accepted buckets.
2366    #[cfg(feature = "processing")]
2367    async fn cardinality_limit_buckets(
2368        &self,
2369        scoping: Scoping,
2370        limits: &[CardinalityLimit],
2371        buckets: Vec<Bucket>,
2372    ) -> Vec<Bucket> {
2373        let global_config = self.inner.global_config.current();
2374        let cardinality_limiter_mode = global_config.options.cardinality_limiter_mode;
2375
2376        if matches!(cardinality_limiter_mode, CardinalityLimiterMode::Disabled) {
2377            return buckets;
2378        }
2379
2380        let Some(ref limiter) = self.inner.cardinality_limiter else {
2381            return buckets;
2382        };
2383
2384        let scope = relay_cardinality::Scoping {
2385            organization_id: scoping.organization_id,
2386            project_id: scoping.project_id,
2387        };
2388
2389        let limits = match limiter
2390            .check_cardinality_limits(scope, limits, buckets)
2391            .await
2392        {
2393            Ok(limits) => limits,
2394            Err((buckets, error)) => {
2395                relay_log::error!(
2396                    error = &error as &dyn std::error::Error,
2397                    "cardinality limiter failed"
2398                );
2399                return buckets;
2400            }
2401        };
2402
2403        let error_sample_rate = global_config.options.cardinality_limiter_error_sample_rate;
2404        if !limits.exceeded_limits().is_empty() && utils::sample(error_sample_rate).is_keep() {
2405            for limit in limits.exceeded_limits() {
2406                relay_log::with_scope(
2407                    |scope| {
2408                        // Set the organization as user so we can alert on distinct org_ids.
2409                        scope.set_user(Some(relay_log::sentry::User {
2410                            id: Some(scoping.organization_id.to_string()),
2411                            ..Default::default()
2412                        }));
2413                    },
2414                    || {
2415                        relay_log::error!(
2416                            tags.organization_id = scoping.organization_id.value(),
2417                            tags.limit_id = limit.id,
2418                            tags.passive = limit.passive,
2419                            "Cardinality Limit"
2420                        );
2421                    },
2422                );
2423            }
2424        }
2425
2426        for (limit, reports) in limits.cardinality_reports() {
2427            for report in reports {
2428                self.inner
2429                    .metric_outcomes
2430                    .cardinality(scoping, limit, report);
2431            }
2432        }
2433
2434        if matches!(cardinality_limiter_mode, CardinalityLimiterMode::Passive) {
2435            return limits.into_source();
2436        }
2437
2438        let CardinalityLimitsSplit { accepted, rejected } = limits.into_split();
2439
2440        for (bucket, exceeded) in rejected {
2441            self.inner.metric_outcomes.track(
2442                scoping,
2443                &[bucket],
2444                Outcome::CardinalityLimited(exceeded.id.clone()),
2445            );
2446        }
2447        accepted
2448    }
2449
2450    /// Processes metric buckets and sends them to kafka.
2451    ///
2452    /// This function runs the following steps:
2453    ///  - cardinality limiting
2454    ///  - rate limiting
2455    ///  - submit to `StoreForwarder`
2456    #[cfg(feature = "processing")]
2457    async fn encode_metrics_processing(
2458        &self,
2459        message: FlushBuckets,
2460        store_forwarder: &Addr<Store>,
2461    ) {
2462        use crate::constants::DEFAULT_EVENT_RETENTION;
2463        use crate::services::store::StoreMetrics;
2464
2465        for ProjectBuckets {
2466            buckets,
2467            scoping,
2468            project_info,
2469            ..
2470        } in message.buckets.into_values()
2471        {
2472            let buckets = self
2473                .rate_limit_buckets(scoping, &project_info, buckets)
2474                .await;
2475
2476            let limits = project_info.get_cardinality_limits();
2477            let buckets = self
2478                .cardinality_limit_buckets(scoping, limits, buckets)
2479                .await;
2480
2481            if buckets.is_empty() {
2482                continue;
2483            }
2484
2485            let retention = project_info
2486                .config
2487                .event_retention
2488                .unwrap_or(DEFAULT_EVENT_RETENTION);
2489
2490            // The store forwarder takes care of bucket splitting internally, so we can submit the
2491            // entire list of buckets. There is no batching needed here.
2492            store_forwarder.send(StoreMetrics {
2493                buckets,
2494                scoping,
2495                retention,
2496            });
2497        }
2498    }
2499
2500    /// Serializes metric buckets to JSON and sends them to the upstream.
2501    ///
2502    /// This function runs the following steps:
2503    ///  - partitioning
2504    ///  - batching by configured size limit
2505    ///  - serialize to JSON and pack in an envelope
2506    ///  - submit the envelope to upstream or kafka depending on configuration
2507    ///
2508    /// Cardinality limiting and rate limiting run only in processing Relays as they both require
2509    /// access to the central Redis instance. Cached rate limits are applied in the project cache
2510    /// already.
2511    fn encode_metrics_envelope(&self, cogs: &mut Token, message: FlushBuckets) {
2512        let FlushBuckets {
2513            partition_key,
2514            buckets,
2515        } = message;
2516
2517        let batch_size = self.inner.config.metrics_max_batch_size_bytes();
2518        let upstream = self.inner.config.upstream_descriptor();
2519
2520        for ProjectBuckets {
2521            buckets, scoping, ..
2522        } in buckets.values()
2523        {
2524            let dsn = PartialDsn::outbound(scoping, upstream);
2525
2526            relay_statsd::metric!(
2527                distribution(RelayDistributions::PartitionKeys) = u64::from(partition_key)
2528            );
2529
2530            let mut num_batches = 0;
2531            for batch in BucketsView::from(buckets).by_size(batch_size) {
2532                let mut envelope = Envelope::from_request(None, RequestMeta::outbound(dsn.clone()));
2533
2534                let mut item = Item::new(ItemType::MetricBuckets);
2535                item.set_source_quantities(crate::metrics::extract_quantities(batch));
                item.set_payload(ContentType::Json, serde_json::to_vec(&batch).unwrap());
2537                envelope.add_item(item);
2538
2539                let mut envelope =
2540                    ManagedEnvelope::new(envelope, self.inner.addrs.outcome_aggregator.clone());
2541                envelope
2542                    .set_partition_key(Some(partition_key))
2543                    .scope(*scoping);
2544
2545                relay_statsd::metric!(
2546                    distribution(RelayDistributions::BucketsPerBatch) = batch.len() as u64
2547                );
2548
2549                self.submit_upstream(cogs, Submit::Envelope(envelope.into_processed()));
2550                num_batches += 1;
2551            }
2552
2553            relay_statsd::metric!(
2554                distribution(RelayDistributions::BatchesPerPartition) = num_batches
2555            );
2556        }
2557    }
2558
2559    /// Creates a [`SendMetricsRequest`] and sends it to the upstream relay.
2560    fn send_global_partition(&self, partition_key: u32, partition: &mut Partition<'_>) {
2561        if partition.is_empty() {
2562            return;
2563        }
2564
2565        let (unencoded, project_info) = partition.take();
2566        let http_encoding = self.inner.config.http_encoding();
2567        let encoded = match encode_payload(&unencoded, http_encoding) {
2568            Ok(payload) => payload,
2569            Err(error) => {
2570                let error = &error as &dyn std::error::Error;
2571                relay_log::error!(error, "failed to encode metrics payload");
2572                return;
2573            }
2574        };
2575
2576        let request = SendMetricsRequest {
2577            partition_key: partition_key.to_string(),
2578            unencoded,
2579            encoded,
2580            project_info,
2581            http_encoding,
2582            metric_outcomes: self.inner.metric_outcomes.clone(),
2583        };
2584
2585        self.inner.addrs.upstream_relay.send(SendRequest(request));
2586    }
2587
2588    /// Serializes metric buckets to JSON and sends them to the upstream via the global endpoint.
2589    ///
2590    /// This function is similar to [`Self::encode_metrics_envelope`], but sends a global batched
2591    /// payload directly instead of per-project Envelopes.
2592    ///
2593    /// This function runs the following steps:
2594    ///  - partitioning
2595    ///  - batching by configured size limit
2596    ///  - serialize to JSON
2597    ///  - submit directly to the upstream
2598    ///
2599    /// Cardinality limiting and rate limiting run only in processing Relays as they both require
2600    /// access to the central Redis instance. Cached rate limits are applied in the project cache
2601    /// already.
2602    fn encode_metrics_global(&self, message: FlushBuckets) {
2603        let FlushBuckets {
2604            partition_key,
2605            buckets,
2606        } = message;
2607
2608        let batch_size = self.inner.config.metrics_max_batch_size_bytes();
2609        let mut partition = Partition::new(batch_size);
2610        let mut partition_splits = 0;
2611
2612        for ProjectBuckets {
2613            buckets, scoping, ..
2614        } in buckets.values()
2615        {
2616            for bucket in buckets {
2617                let mut remaining = Some(BucketView::new(bucket));
2618
2619                while let Some(bucket) = remaining.take() {
2620                    if let Some(next) = partition.insert(bucket, *scoping) {
                        // A part of the bucket could not be inserted. Take the partition and
                        // submit it immediately. Repeat until the final part has been inserted;
                        // this should always result in a request, otherwise we would enter an
                        // endless loop.
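                        // For example, a bucket larger than the remaining capacity is split:
                        // the part that fits goes out with the current partition, and the
                        // leftover view is retried on the next iteration.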
2624                        self.send_global_partition(partition_key, &mut partition);
2625                        remaining = Some(next);
2626                        partition_splits += 1;
2627                    }
2628                }
2629            }
2630        }
2631
2632        if partition_splits > 0 {
2633            metric!(distribution(RelayDistributions::PartitionSplits) = partition_splits);
2634        }
2635
2636        self.send_global_partition(partition_key, &mut partition);
2637    }
2638
2639    async fn handle_flush_buckets(&self, cogs: &mut Token, mut message: FlushBuckets) {
2640        for (project_key, pb) in message.buckets.iter_mut() {
2641            let buckets = std::mem::take(&mut pb.buckets);
2642            pb.buckets =
2643                self.check_buckets(*project_key, &pb.project_info, &pb.rate_limits, buckets);
2644        }
2645
2646        #[cfg(feature = "processing")]
2647        if self.inner.config.processing_enabled()
2648            && let Some(ref store_forwarder) = self.inner.addrs.store_forwarder
2649        {
2650            return self
2651                .encode_metrics_processing(message, store_forwarder)
2652                .await;
2653        }
2654
2655        if self.inner.config.http_global_metrics() {
2656            self.encode_metrics_global(message)
2657        } else {
2658            self.encode_metrics_envelope(cogs, message)
2659        }
2660    }
2661
2662    #[cfg(all(test, feature = "processing"))]
2663    fn redis_rate_limiter_enabled(&self) -> bool {
2664        self.inner.rate_limiter.is_some()
2665    }
2666
2667    async fn handle_message(&self, message: EnvelopeProcessor) {
2668        let ty = message.variant();
2669        let feature_weights = self.feature_weights(&message);
2670
2671        metric!(timer(RelayTimers::ProcessMessageDuration), message = ty, {
2672            let mut cogs = self.inner.cogs.timed(ResourceId::Relay, feature_weights);
2673
2674            match message {
2675                EnvelopeProcessor::ProcessEnvelope(m) => {
2676                    self.handle_process_envelope(&mut cogs, *m).await
2677                }
2678                EnvelopeProcessor::ProcessProjectMetrics(m) => {
2679                    self.handle_process_metrics(&mut cogs, *m)
2680                }
2681                EnvelopeProcessor::ProcessBatchedMetrics(m) => {
2682                    self.handle_process_batched_metrics(&mut cogs, *m)
2683                }
2684                EnvelopeProcessor::FlushBuckets(m) => {
2685                    self.handle_flush_buckets(&mut cogs, *m).await
2686                }
2687                EnvelopeProcessor::SubmitClientReports(m) => {
2688                    self.handle_submit_client_reports(&mut cogs, *m)
2689                }
2690            }
2691        });
2692    }
2693
2694    fn feature_weights(&self, message: &EnvelopeProcessor) -> FeatureWeights {
2695        match message {
2696            // Envelope is split later and tokens are attributed then.
2697            EnvelopeProcessor::ProcessEnvelope(_) => AppFeature::Unattributed.into(),
2698            EnvelopeProcessor::ProcessProjectMetrics(_) => AppFeature::Unattributed.into(),
2699            EnvelopeProcessor::ProcessBatchedMetrics(_) => AppFeature::Unattributed.into(),
2700            EnvelopeProcessor::FlushBuckets(v) => v
2701                .buckets
2702                .values()
2703                .map(|s| {
2704                    if self.inner.config.processing_enabled() {
2705                        // Processing does not encode the metrics but instead rate and cardinality
2706                        // limits the metrics, which scales by count and not size.
2707                        relay_metrics::cogs::ByCount(&s.buckets).into()
2708                    } else {
2709                        relay_metrics::cogs::BySize(&s.buckets).into()
2710                    }
2711                })
2712                .fold(FeatureWeights::none(), FeatureWeights::merge),
2713            EnvelopeProcessor::SubmitClientReports(_) => AppFeature::ClientReports.into(),
2714        }
2715    }
2716}
2717
2718impl Service for EnvelopeProcessorService {
2719    type Interface = EnvelopeProcessor;
2720
2721    async fn run(self, mut rx: relay_system::Receiver<Self::Interface>) {
2722        while let Some(message) = rx.recv().await {
2723            let service = self.clone();
2724            self.inner
2725                .pool
2726                .spawn_async(
2727                    async move {
2728                        service.handle_message(message).await;
2729                    }
2730                    .boxed(),
2731                )
2732                .await;
2733        }
2734    }
2735}
2736
2737/// Result of the enforcement of rate limiting.
2738///
/// If the event was rate limited, or if it was already empty, the `event` field holds an
/// empty [`Annotated`].
2741struct EnforcementResult {
2742    event: Annotated<Event>,
2743    #[cfg_attr(not(feature = "processing"), expect(dead_code))]
2744    rate_limits: RateLimits,
2745}
2746
2747impl EnforcementResult {
2748    /// Creates a new [`EnforcementResult`].
2749    pub fn new(event: Annotated<Event>, rate_limits: RateLimits) -> Self {
2750        Self { event, rate_limits }
2751    }
2752}
2753
2754#[derive(Clone)]
2755enum RateLimiter {
2756    Cached,
2757    #[cfg(feature = "processing")]
2758    Consistent(Arc<RedisRateLimiter>),
2759}
2760
2761impl RateLimiter {
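    /// Enforces quotas and cached or consistent rate limits on the envelope and event.
    ///
    /// Returns an [`EnforcementResult`] carrying the surviving event (empty if the event itself
    /// was rate limited) and the rate limits that were applied.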
2762    async fn enforce<Group>(
2763        &self,
2764        managed_envelope: &mut TypedEnvelope<Group>,
2765        event: Annotated<Event>,
2766        _extracted_metrics: &mut ProcessingExtractedMetrics,
2767        ctx: processing::Context<'_>,
2768    ) -> Result<EnforcementResult, ProcessingError> {
2769        if managed_envelope.envelope().is_empty() && event.value().is_none() {
2770            return Ok(EnforcementResult::new(event, RateLimits::default()));
2771        }
2772
2773        let quotas = CombinedQuotas::new(ctx.global_config, ctx.project_info.get_quotas());
2774        if quotas.is_empty() {
2775            return Ok(EnforcementResult::new(event, RateLimits::default()));
2776        }
2777
2778        let event_category = event_category(&event);
2779
        // Clone `self` so the check closure can reach the Redis rate limiter when consistent
        // rate limiting is performed.
        //
        // When invoking the rate limiter, capture whether the event item has been rate limited,
        // so that it can also be removed from the processing state afterwards.
2785        let this = self.clone();
2786        let mut envelope_limiter =
2787            EnvelopeLimiter::new(CheckLimits::All, move |item_scope, _quantity| {
2788                let this = this.clone();
2789
2790                async move {
2791                    match this {
2792                        #[cfg(feature = "processing")]
2793                        RateLimiter::Consistent(rate_limiter) => Ok::<_, ProcessingError>(
2794                            rate_limiter
2795                                .is_rate_limited(quotas, item_scope, _quantity, false)
2796                                .await?,
2797                        ),
2798                        _ => Ok::<_, ProcessingError>(
2799                            ctx.rate_limits.check_with_quotas(quotas, item_scope),
2800                        ),
2801                    }
2802                }
2803            });
2804
2805        // Tell the envelope limiter about the event, since it has been removed from the Envelope at
2806        // this stage in processing.
2807        if let Some(category) = event_category {
2808            envelope_limiter.assume_event(category);
2809        }
2810
2811        let scoping = managed_envelope.scoping();
        let (enforcement, rate_limits) = metric!(
            timer(RelayTimers::EventProcessingRateLimiting),
            type = self.name(),
            {
                envelope_limiter
                    .compute(managed_envelope.envelope_mut(), &scoping)
                    .await
            }
        )?;
2817        let event_active = enforcement.is_event_active();
2818
2819        // Use the same rate limits as used for the envelope on the metrics.
2820        // Those rate limits should not be checked for expiry or similar to ensure a consistent
2821        // limiting of envelope items and metrics.
2822        #[cfg(feature = "processing")]
2823        _extracted_metrics.apply_enforcement(&enforcement, matches!(self, Self::Consistent(_)));
2824        enforcement.apply_with_outcomes(managed_envelope);
2825
2826        if event_active {
2827            debug_assert!(managed_envelope.envelope().is_empty());
2828            return Ok(EnforcementResult::new(Annotated::empty(), rate_limits));
2829        }
2830
2831        Ok(EnforcementResult::new(event, rate_limits))
2832    }
2833
2834    fn name(&self) -> &'static str {
2835        match self {
2836            Self::Cached => "cached",
2837            #[cfg(feature = "processing")]
2838            Self::Consistent(_) => "consistent",
2839        }
2840    }
2841}
2842
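/// Compresses an envelope body for upstream transmission with the given HTTP encoding.
///
/// A minimal usage sketch (ignored in doctests since this is a crate-internal helper;
/// the payload and the chosen encoding are illustrative assumptions):
///
/// ```ignore
/// use bytes::Bytes;
/// use relay_config::HttpEncoding;
///
/// let body = Bytes::from_static(b"{}");
/// // `Identity` returns the input unchanged; all other encodings compress it.
/// let encoded = encode_payload(&body, HttpEncoding::Gzip)?;
/// assert_ne!(encoded, body);
/// ```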
2843pub fn encode_payload(body: &Bytes, http_encoding: HttpEncoding) -> Result<Bytes, std::io::Error> {
2844    let envelope_body: Vec<u8> = match http_encoding {
2845        HttpEncoding::Identity => return Ok(body.clone()),
2846        HttpEncoding::Deflate => {
2847            let mut encoder = ZlibEncoder::new(Vec::new(), Compression::default());
2848            encoder.write_all(body.as_ref())?;
2849            encoder.finish()?
2850        }
2851        HttpEncoding::Gzip => {
2852            let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
2853            encoder.write_all(body.as_ref())?;
2854            encoder.finish()?
2855        }
2856        HttpEncoding::Br => {
2857            // Use default buffer size (via 0), medium quality (5), and the default lgwin (22).
2858            let mut encoder = BrotliEncoder::new(Vec::new(), 0, 5, 22);
2859            encoder.write_all(body.as_ref())?;
2860            encoder.into_inner()
2861        }
2862        HttpEncoding::Zstd => {
            // Use the fastest compression level; the objective is the best trade-off
            // between compression ratio and time spent.
2865            let mut encoder = ZstdEncoder::new(Vec::new(), 1)?;
2866            encoder.write_all(body.as_ref())?;
2867            encoder.finish()?
2868        }
2869    };
2870
2871    Ok(envelope_body.into())
2872}
2873
2874/// An upstream request that submits an envelope via HTTP.
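///
/// A construction sketch (ignored in doctests; `envelope_bytes`, `envelope`,
/// `http_encoding`, `project_cache`, and `upstream_relay` are assumed to be in scope):
///
/// ```ignore
/// let body = encode_payload(&envelope_bytes, http_encoding)?;
/// let request = SendEnvelope { envelope, body, http_encoding, project_cache };
/// upstream_relay.send(SendRequest(request));
/// ```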
2875#[derive(Debug)]
2876pub struct SendEnvelope {
2877    pub envelope: TypedEnvelope<Processed>,
2878    pub body: Bytes,
2879    pub http_encoding: HttpEncoding,
2880    pub project_cache: ProjectCacheHandle,
2881}
2882
2883impl UpstreamRequest for SendEnvelope {
2884    fn method(&self) -> reqwest::Method {
2885        reqwest::Method::POST
2886    }
2887
2888    fn path(&self) -> Cow<'_, str> {
2889        format!("/api/{}/envelope/", self.envelope.scoping().project_id).into()
2890    }
2891
2892    fn route(&self) -> &'static str {
2893        "envelope"
2894    }
2895
2896    fn build(&mut self, builder: &mut http::RequestBuilder) -> Result<(), http::HttpError> {
2897        let envelope_body = self.body.clone();
2898        metric!(
2899            distribution(RelayDistributions::UpstreamEnvelopeBodySize) = envelope_body.len() as u64
2900        );
2901
        let meta = self.envelope.meta();
2903        let shard = self.envelope.partition_key().map(|p| p.to_string());
2904        builder
2905            .content_encoding(self.http_encoding)
2906            .header_opt("Origin", meta.origin().map(|url| url.as_str()))
2907            .header_opt("User-Agent", meta.user_agent())
2908            .header("X-Sentry-Auth", meta.auth_header())
2909            .header("X-Forwarded-For", meta.forwarded_for())
2910            .header("Content-Type", envelope::CONTENT_TYPE)
2911            .header_opt("X-Sentry-Relay-Shard", shard)
2912            .body(envelope_body);
2913
2914        Ok(())
2915    }
2916
2917    fn sign(&mut self) -> Option<Sign> {
2918        Some(Sign::Optional(SignatureType::RequestSign))
2919    }
2920
2921    fn respond(
2922        self: Box<Self>,
2923        result: Result<http::Response, UpstreamRequestError>,
2924    ) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> {
2925        Box::pin(async move {
2926            let result = match result {
2927                Ok(mut response) => response.consume().await.map_err(UpstreamRequestError::Http),
2928                Err(error) => Err(error),
2929            };
2930
2931            match result {
2932                Ok(()) => self.envelope.accept(),
2933                Err(error) if error.is_received() => {
2934                    let scoping = self.envelope.scoping();
2935                    self.envelope.accept();
2936
2937                    if let UpstreamRequestError::RateLimited(limits) = error {
2938                        self.project_cache
2939                            .get(scoping.project_key)
2940                            .rate_limits()
2941                            .merge(limits.scope(&scoping));
2942                    }
2943                }
2944                Err(error) => {
                    // These errors are logged with an internal discard reason because they
                    // indicate infrastructure errors or implementation bugs.
2947                    let mut envelope = self.envelope;
2948                    envelope.reject(Outcome::Invalid(DiscardReason::Internal));
2949                    relay_log::error!(
2950                        error = &error as &dyn Error,
2951                        tags.project_key = %envelope.scoping().project_key,
2952                        "error sending envelope"
2953                    );
2954                }
2955            }
2956        })
2957    }
2958}
2959
2960/// A container for metric buckets from multiple projects.
2961///
/// This container is used to send metrics to the upstream in global batches as part of the
/// [`FlushBuckets`] message if the `http.global_metrics` option is enabled. The container
/// tracks the size of all metrics and allows splitting them into multiple batches. See
/// [`insert`](Self::insert) for more information.
2966#[derive(Debug)]
2967struct Partition<'a> {
2968    max_size: usize,
2969    remaining: usize,
2970    views: HashMap<ProjectKey, Vec<BucketView<'a>>>,
2971    project_info: HashMap<ProjectKey, Scoping>,
2972}
2973
2974impl<'a> Partition<'a> {
2975    /// Creates a new partition with the given maximum size in bytes.
2976    pub fn new(size: usize) -> Self {
2977        Self {
2978            max_size: size,
2979            remaining: size,
2980            views: HashMap::new(),
2981            project_info: HashMap::new(),
2982        }
2983    }
2984
2985    /// Inserts a bucket into the partition, splitting it if necessary.
2986    ///
2987    /// This function attempts to add the bucket to this partition. If the bucket does not fit
2988    /// entirely into the partition given its maximum size, the remaining part of the bucket is
2989    /// returned from this function call.
2990    ///
    /// If this function returns `Some(_)`, the partition is full and should be submitted to the
    /// upstream immediately. Use [`Self::take`] to retrieve the contents of the partition.
    /// Afterwards, the caller is responsible for calling this function again with the
    /// remaining bucket until it is fully inserted.
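    ///
    /// A sketch of the intended call pattern (ignored in doctests; `submit` is a
    /// hypothetical stand-in for sending the payload upstream):
    ///
    /// ```ignore
    /// let mut bucket = Some(bucket_view);
    /// while let Some(current) = bucket {
    ///     bucket = partition.insert(current, scoping);
    ///     if bucket.is_some() {
    ///         // The partition is full: flush it before inserting the remainder.
    ///         let (payload, scopings) = partition.take();
    ///         submit(payload, scopings);
    ///     }
    /// }
    /// ```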
2995    pub fn insert(&mut self, bucket: BucketView<'a>, scoping: Scoping) -> Option<BucketView<'a>> {
2996        let (current, next) = bucket.split(self.remaining, Some(self.max_size));
2997
2998        if let Some(current) = current {
2999            self.remaining = self.remaining.saturating_sub(current.estimated_size());
3000            self.views
3001                .entry(scoping.project_key)
3002                .or_default()
3003                .push(current);
3004
3005            self.project_info
3006                .entry(scoping.project_key)
3007                .or_insert(scoping);
3008        }
3009
3010        next
3011    }
3012
3013    /// Returns `true` if the partition does not hold any data.
3014    fn is_empty(&self) -> bool {
3015        self.views.is_empty()
3016    }
3017
3018    /// Returns the serialized buckets for this partition.
3019    ///
3020    /// This empties the partition, so that it can be reused.
3021    fn take(&mut self) -> (Bytes, HashMap<ProjectKey, Scoping>) {
3022        #[derive(serde::Serialize)]
3023        struct Wrapper<'a> {
3024            buckets: &'a HashMap<ProjectKey, Vec<BucketView<'a>>>,
3025        }
3026
3027        let buckets = &self.views;
3028        let payload = serde_json::to_vec(&Wrapper { buckets }).unwrap().into();
3029
3030        let scopings = self.project_info.clone();
3031        self.project_info.clear();
3032
3033        self.views.clear();
3034        self.remaining = self.max_size;
3035
3036        (payload, scopings)
3037    }
3038}
3039
3040/// An upstream request that submits metric buckets via HTTP.
3041///
/// This request is not awaited. It automatically tracks outcomes if the request was not
/// received by the upstream.
3043#[derive(Debug)]
3044struct SendMetricsRequest {
    /// The partition key for this batch, sent in the `X-Sentry-Relay-Shard` header.
3046    partition_key: String,
3047    /// Serialized metric buckets without encoding applied, used for signing.
3048    unencoded: Bytes,
3049    /// Serialized metric buckets with the stated HTTP encoding applied.
3050    encoded: Bytes,
3051    /// Mapping of all contained project keys to their scoping and extraction mode.
3052    ///
3053    /// Used to track outcomes for transmission failures.
3054    project_info: HashMap<ProjectKey, Scoping>,
3055    /// Encoding (compression) of the payload.
3056    http_encoding: HttpEncoding,
3057    /// Metric outcomes instance to send outcomes on error.
3058    metric_outcomes: MetricOutcomes,
3059}
3060
3061impl SendMetricsRequest {
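    /// Tracks `Internal` discard outcomes for all buckets contained in this request.
    ///
    /// This is called when the request failed before reaching the upstream. The buckets are
    /// recovered by parsing the unencoded payload again.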
3062    fn create_error_outcomes(self) {
3063        #[derive(serde::Deserialize)]
3064        struct Wrapper {
3065            buckets: HashMap<ProjectKey, Vec<MinimalTrackableBucket>>,
3066        }
3067
3068        let buckets = match serde_json::from_slice(&self.unencoded) {
3069            Ok(Wrapper { buckets }) => buckets,
3070            Err(err) => {
3071                relay_log::error!(
3072                    error = &err as &dyn std::error::Error,
3073                    "failed to parse buckets from failed transmission"
3074                );
3075                return;
3076            }
3077        };
3078
3079        for (key, buckets) in buckets {
3080            let Some(&scoping) = self.project_info.get(&key) else {
3081                relay_log::error!("missing scoping for project key");
3082                continue;
3083            };
3084
3085            self.metric_outcomes.track(
3086                scoping,
3087                &buckets,
3088                Outcome::Invalid(DiscardReason::Internal),
3089            );
3090        }
3091    }
3092}
3093
3094impl UpstreamRequest for SendMetricsRequest {
3095    fn set_relay_id(&self) -> bool {
3096        true
3097    }
3098
3099    fn sign(&mut self) -> Option<Sign> {
3100        Some(Sign::Required(SignatureType::Body(self.unencoded.clone())))
3101    }
3102
3103    fn method(&self) -> reqwest::Method {
3104        reqwest::Method::POST
3105    }
3106
3107    fn path(&self) -> Cow<'_, str> {
3108        "/api/0/relays/metrics/".into()
3109    }
3110
3111    fn route(&self) -> &'static str {
3112        "global_metrics"
3113    }
3114
3115    fn build(&mut self, builder: &mut http::RequestBuilder) -> Result<(), http::HttpError> {
3116        metric!(
3117            distribution(RelayDistributions::UpstreamMetricsBodySize) = self.encoded.len() as u64
3118        );
3119
3120        builder
3121            .content_encoding(self.http_encoding)
3122            .header("X-Sentry-Relay-Shard", self.partition_key.as_bytes())
3123            .header(header::CONTENT_TYPE, b"application/json")
3124            .body(self.encoded.clone());
3125
3126        Ok(())
3127    }
3128
3129    fn respond(
3130        self: Box<Self>,
3131        result: Result<http::Response, UpstreamRequestError>,
3132    ) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> {
3133        Box::pin(async {
3134            match result {
3135                Ok(mut response) => {
3136                    response.consume().await.ok();
3137                }
3138                Err(error) => {
                    relay_log::error!(error = &error as &dyn Error, "failed to send metrics batch");
3140
                    // If the request did not arrive at the upstream, we are responsible for
                    // outcomes. Otherwise, the upstream is responsible for logging them.
3143                    if error.is_received() {
3144                        return;
3145                    }
3146
3147                    self.create_error_outcomes()
3148                }
3149            }
3150        })
3151    }
3152}
3153
3154/// Container for global and project level [`Quota`].
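///
/// Iterating over [`CombinedQuotas`] yields all global quotas first, followed by all
/// project quotas. A short usage sketch (ignored in doctests; `global_config` and
/// `project_quotas` are assumed to be in scope):
///
/// ```ignore
/// let quotas = CombinedQuotas::new(&global_config, &project_quotas);
/// for quota in quotas {
///     println!("{:?}", quota.id);
/// }
/// ```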
3155#[derive(Copy, Clone, Debug)]
3156struct CombinedQuotas<'a> {
3157    global_quotas: &'a [Quota],
3158    project_quotas: &'a [Quota],
3159}
3160
3161impl<'a> CombinedQuotas<'a> {
3162    /// Returns a new [`CombinedQuotas`].
3163    pub fn new(global_config: &'a GlobalConfig, project_quotas: &'a [Quota]) -> Self {
3164        Self {
3165            global_quotas: &global_config.quotas,
3166            project_quotas,
3167        }
3168    }
3169
3170    /// Returns `true` if both global quotas and project quotas are empty.
3171    pub fn is_empty(&self) -> bool {
3172        self.len() == 0
3173    }
3174
3175    /// Returns the number of both global and project quotas.
3176    pub fn len(&self) -> usize {
3177        self.global_quotas.len() + self.project_quotas.len()
3178    }
3179}
3180
3181impl<'a> IntoIterator for CombinedQuotas<'a> {
3182    type Item = &'a Quota;
3183    type IntoIter = std::iter::Chain<std::slice::Iter<'a, Quota>, std::slice::Iter<'a, Quota>>;
3184
3185    fn into_iter(self) -> Self::IntoIter {
3186        self.global_quotas.iter().chain(self.project_quotas.iter())
3187    }
3188}
3189
3190#[cfg(test)]
3191mod tests {
3192    use std::collections::BTreeMap;
3193
3194    use insta::assert_debug_snapshot;
3195    use relay_base_schema::metrics::{DurationUnit, MetricUnit};
3196    use relay_common::glob2::LazyGlob;
3197    use relay_dynamic_config::ProjectConfig;
3198    use relay_event_normalization::{
3199        MeasurementsConfig, NormalizationConfig, RedactionRule, TransactionNameConfig,
3200        TransactionNameRule,
3201    };
3202    use relay_event_schema::protocol::TransactionSource;
3203    use relay_pii::DataScrubbingConfig;
3204    use similar_asserts::assert_eq;
3205
3206    use crate::metrics_extraction::IntoMetric;
3207    use crate::metrics_extraction::transactions::types::{
3208        CommonTags, TransactionMeasurementTags, TransactionMetric,
3209    };
3210    use crate::testutils::{create_test_processor, create_test_processor_with_addrs};
3211
3212    #[cfg(feature = "processing")]
3213    use {
3214        relay_metrics::BucketValue,
3215        relay_quotas::{QuotaScope, ReasonCode},
3216        relay_test::mock_service,
3217    };
3218
3219    use super::*;
3220
3221    #[cfg(feature = "processing")]
3222    fn mock_quota(id: &str) -> Quota {
3223        Quota {
3224            id: Some(id.into()),
3225            categories: [DataCategory::MetricBucket].into(),
3226            scope: QuotaScope::Organization,
3227            scope_id: None,
3228            limit: Some(0),
3229            window: None,
3230            reason_code: None,
3231            namespace: None,
3232        }
3233    }
3234
3235    #[cfg(feature = "processing")]
3236    #[test]
3237    fn test_dynamic_quotas() {
3238        let global_config = GlobalConfig {
3239            quotas: vec![mock_quota("foo"), mock_quota("bar")],
3240            ..Default::default()
3241        };
3242
3243        let project_quotas = vec![mock_quota("baz"), mock_quota("qux")];
3244
3245        let dynamic_quotas = CombinedQuotas::new(&global_config, &project_quotas);
3246
3247        assert_eq!(dynamic_quotas.len(), 4);
3248        assert!(!dynamic_quotas.is_empty());
3249
3250        let quota_ids = dynamic_quotas.into_iter().filter_map(|q| q.id.as_deref());
3251        assert!(quota_ids.eq(["foo", "bar", "baz", "qux"]));
3252    }
3253
    /// Ensures that if we rate limit one batch of buckets in a [`FlushBuckets`] message, the
    /// following batches in the same message are not automatically rate limited as well.
3256    #[cfg(feature = "processing")]
3257    #[tokio::test]
3258    async fn test_ratelimit_per_batch() {
3259        use relay_base_schema::organization::OrganizationId;
3260        use relay_protocol::FiniteF64;
3261
3262        let rate_limited_org = Scoping {
3263            organization_id: OrganizationId::new(1),
3264            project_id: ProjectId::new(21),
3265            project_key: ProjectKey::parse("00000000000000000000000000000000").unwrap(),
3266            key_id: Some(17),
3267        };
3268
3269        let not_rate_limited_org = Scoping {
3270            organization_id: OrganizationId::new(2),
3271            project_id: ProjectId::new(21),
3272            project_key: ProjectKey::parse("11111111111111111111111111111111").unwrap(),
3273            key_id: Some(17),
3274        };
3275
3276        let message = {
3277            let project_info = {
3278                let quota = Quota {
3279                    id: Some("testing".into()),
3280                    categories: [DataCategory::MetricBucket].into(),
3281                    scope: relay_quotas::QuotaScope::Organization,
3282                    scope_id: Some(rate_limited_org.organization_id.to_string().into()),
3283                    limit: Some(0),
3284                    window: None,
3285                    reason_code: Some(ReasonCode::new("test")),
3286                    namespace: None,
3287                };
3288
3289                let mut config = ProjectConfig::default();
3290                config.quotas.push(quota);
3291
3292                Arc::new(ProjectInfo {
3293                    config,
3294                    ..Default::default()
3295                })
3296            };
3297
3298            let project_metrics = |scoping| ProjectBuckets {
3299                buckets: vec![Bucket {
3300                    name: "d:transactions/bar".into(),
3301                    value: BucketValue::Counter(FiniteF64::new(1.0).unwrap()),
3302                    timestamp: UnixTimestamp::now(),
3303                    tags: Default::default(),
3304                    width: 10,
3305                    metadata: BucketMetadata::default(),
3306                }],
3307                rate_limits: Default::default(),
3308                project_info: project_info.clone(),
3309                scoping,
3310            };
3311
3312            let buckets = hashbrown::HashMap::from([
3313                (
3314                    rate_limited_org.project_key,
3315                    project_metrics(rate_limited_org),
3316                ),
3317                (
3318                    not_rate_limited_org.project_key,
3319                    project_metrics(not_rate_limited_org),
3320                ),
3321            ]);
3322
3323            FlushBuckets {
3324                partition_key: 0,
3325                buckets,
3326            }
3327        };
3328
        // Ensure the iteration order of the map is as expected.
3330        assert_eq!(message.buckets.keys().count(), 2);
3331
3332        let config = {
3333            let config_json = serde_json::json!({
3334                "processing": {
3335                    "enabled": true,
3336                    "kafka_config": [],
3337                    "redis": {
3338                        "server": std::env::var("RELAY_REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_owned()),
3339                    }
3340                }
3341            });
3342            Config::from_json_value(config_json).unwrap()
3343        };
3344
3345        let (store, handle) = {
3346            let f = |org_ids: &mut Vec<OrganizationId>, msg: Store| {
3347                let org_id = match msg {
3348                    Store::Metrics(x) => x.scoping.organization_id,
3349                    _ => panic!("received envelope when expecting only metrics"),
3350                };
3351                org_ids.push(org_id);
3352            };
3353
3354            mock_service("store_forwarder", vec![], f)
3355        };
3356
3357        let processor = create_test_processor(config).await;
3358        assert!(processor.redis_rate_limiter_enabled());
3359
3360        processor.encode_metrics_processing(message, &store).await;
3361
3362        drop(store);
3363        let orgs_not_ratelimited = handle.await.unwrap();
3364
3365        assert_eq!(
3366            orgs_not_ratelimited,
3367            vec![not_rate_limited_org.organization_id]
3368        );
3369    }
3370
3371    #[tokio::test]
3372    async fn test_browser_version_extraction_with_pii_like_data() {
3373        let processor = create_test_processor(Default::default()).await;
3374        let outcome_aggregator = Addr::dummy();
3375        let event_id = EventId::new();
3376
3377        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
3378            .parse()
3379            .unwrap();
3380
3381        let request_meta = RequestMeta::new(dsn);
3382        let mut envelope = Envelope::from_request(Some(event_id), request_meta);
3383
        envelope.add_item({
            let mut item = Item::new(ItemType::Event);
            item.set_payload(
                ContentType::Json,
                r#"
                {
                    "request": {
                        "headers": [
                            ["User-Agent", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36"]
                        ]
                    }
                }
                "#,
            );
            item
        });
3400
3401        let mut datascrubbing_settings = DataScrubbingConfig::default();
3402        // enable all the default scrubbing
3403        datascrubbing_settings.scrub_data = true;
3404        datascrubbing_settings.scrub_defaults = true;
3405        datascrubbing_settings.scrub_ip_addresses = true;
3406
3407        // Make sure to mask any IP-like looking data
3408        let pii_config = serde_json::from_str(r#"{"applications": {"**": ["@ip:mask"]}}"#).unwrap();
3409
3410        let config = ProjectConfig {
3411            datascrubbing_settings,
3412            pii_config: Some(pii_config),
3413            ..Default::default()
3414        };
3415
3416        let project_info = ProjectInfo {
3417            config,
3418            ..Default::default()
3419        };
3420
3421        let mut envelopes = ProcessingGroup::split_envelope(*envelope, &Default::default());
3422        assert_eq!(envelopes.len(), 1);
3423
3424        let (group, envelope) = envelopes.pop().unwrap();
3425        let envelope = ManagedEnvelope::new(envelope, outcome_aggregator);
3426
3427        let message = ProcessEnvelopeGrouped {
3428            group,
3429            envelope,
3430            ctx: processing::Context {
3431                project_info: &project_info,
3432                ..processing::Context::for_test()
3433            },
3434        };
3435
3436        let Ok(Some(Submit::Envelope(mut new_envelope))) = processor.process(message).await else {
3437            panic!();
3438        };
3439        let new_envelope = new_envelope.envelope_mut();
3440
3441        let event_item = new_envelope.items().last().unwrap();
3442        let annotated_event: Annotated<Event> =
3443            Annotated::from_json_bytes(&event_item.payload()).unwrap();
3444        let event = annotated_event.into_value().unwrap();
3445        let headers = event
3446            .request
3447            .into_value()
3448            .unwrap()
3449            .headers
3450            .into_value()
3451            .unwrap();
3452
3453        // IP-like data must be masked
3454        assert_eq!(
3455            Some(
3456                "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/********* Safari/537.36"
3457            ),
3458            headers.get_header("User-Agent")
3459        );
3460        // But we still get correct browser and version number
3461        let contexts = event.contexts.into_value().unwrap();
3462        let browser = contexts.0.get("browser").unwrap();
3463        assert_eq!(
3464            r#"{"browser":"Chrome 103.0.0","name":"Chrome","version":"103.0.0","type":"browser"}"#,
3465            browser.to_json().unwrap()
3466        );
3467    }
3468
3469    #[tokio::test]
3470    #[cfg(feature = "processing")]
3471    async fn test_materialize_dsc() {
3472        use crate::services::projects::project::PublicKeyConfig;
3473
3474        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
3475            .parse()
3476            .unwrap();
3477        let request_meta = RequestMeta::new(dsn);
3478        let mut envelope = Envelope::from_request(None, request_meta);
3479
3480        let dsc = r#"{
3481            "trace_id": "00000000-0000-0000-0000-000000000000",
3482            "public_key": "e12d836b15bb49d7bbf99e64295d995b",
3483            "sample_rate": "0.2"
3484        }"#;
3485        envelope.set_dsc(serde_json::from_str(dsc).unwrap());
3486
3487        let mut item = Item::new(ItemType::Event);
3488        item.set_payload(ContentType::Json, r#"{}"#);
3489        envelope.add_item(item);
3490
3491        let outcome_aggregator = Addr::dummy();
3492        let managed_envelope = ManagedEnvelope::new(envelope, outcome_aggregator);
3493
3494        let mut project_info = ProjectInfo::default();
3495        project_info.public_keys.push(PublicKeyConfig {
3496            public_key: ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(),
3497            numeric_id: Some(1),
3498        });
3499
3500        let config = serde_json::json!({
3501            "processing": {
3502                "enabled": true,
3503                "kafka_config": [],
3504            }
3505        });
3506
3507        let message = ProcessEnvelopeGrouped {
3508            group: ProcessingGroup::Error,
3509            envelope: managed_envelope,
3510            ctx: processing::Context {
3511                config: &Config::from_json_value(config.clone()).unwrap(),
3512                project_info: &project_info,
3513                sampling_project_info: Some(&project_info),
3514                ..processing::Context::for_test()
3515            },
3516        };
3517
3518        let processor = create_test_processor(Config::from_json_value(config).unwrap()).await;
3519        let Ok(Some(Submit::Envelope(envelope))) = processor.process(message).await else {
3520            panic!();
3521        };
3522        let event = envelope
3523            .envelope()
3524            .get_item_by(|item| item.ty() == &ItemType::Event)
3525            .unwrap();
3526
3527        let event = Annotated::<Event>::from_json_bytes(&event.payload()).unwrap();
3528        insta::assert_debug_snapshot!(event.value().unwrap()._dsc, @r###"
3529        Object(
3530            {
3531                "environment": ~,
3532                "public_key": String(
3533                    "e12d836b15bb49d7bbf99e64295d995b",
3534                ),
3535                "release": ~,
3536                "replay_id": ~,
3537                "sample_rate": String(
3538                    "0.2",
3539                ),
3540                "trace_id": String(
3541                    "00000000000000000000000000000000",
3542                ),
3543                "transaction": ~,
3544            },
3545        )
3546        "###);
3547    }
3548
3549    fn capture_test_event(transaction_name: &str, source: TransactionSource) -> Vec<String> {
3550        let mut event = Annotated::<Event>::from_json(
3551            r#"
3552            {
3553                "type": "transaction",
3554                "transaction": "/foo/",
3555                "timestamp": 946684810.0,
3556                "start_timestamp": 946684800.0,
3557                "contexts": {
3558                    "trace": {
3559                        "trace_id": "4c79f60c11214eb38604f4ae0781bfb2",
3560                        "span_id": "fa90fdead5f74053",
3561                        "op": "http.server",
3562                        "type": "trace"
3563                    }
3564                },
3565                "transaction_info": {
3566                    "source": "url"
3567                }
3568            }
3569            "#,
3570        )
3571        .unwrap();
3572        let e = event.value_mut().as_mut().unwrap();
3573        e.transaction.set_value(Some(transaction_name.into()));
3574
3575        e.transaction_info
3576            .value_mut()
3577            .as_mut()
3578            .unwrap()
3579            .source
3580            .set_value(Some(source));
3581
3582        relay_statsd::with_capturing_test_client(|| {
3583            utils::log_transaction_name_metrics(&mut event, |event| {
3584                let config = NormalizationConfig {
3585                    transaction_name_config: TransactionNameConfig {
3586                        rules: &[TransactionNameRule {
3587                            pattern: LazyGlob::new("/foo/*/**".to_owned()),
3588                            expiry: DateTime::<Utc>::MAX_UTC,
3589                            redaction: RedactionRule::Replace {
3590                                substitution: "*".to_owned(),
3591                            },
3592                        }],
3593                    },
3594                    ..Default::default()
3595                };
3596                relay_event_normalization::normalize_event(event, &config)
3597            });
3598        })
3599    }
3600
3601    #[test]
3602    fn test_log_transaction_metrics_none() {
3603        let captures = capture_test_event("/nothing", TransactionSource::Url);
3604        insta::assert_debug_snapshot!(captures, @r###"
3605        [
3606            "event.transaction_name_changes:1|c|#source_in:url,changes:none,source_out:sanitized,is_404:false",
3607        ]
3608        "###);
3609    }
3610
3611    #[test]
3612    fn test_log_transaction_metrics_rule() {
3613        let captures = capture_test_event("/foo/john/denver", TransactionSource::Url);
3614        insta::assert_debug_snapshot!(captures, @r###"
3615        [
3616            "event.transaction_name_changes:1|c|#source_in:url,changes:rule,source_out:sanitized,is_404:false",
3617        ]
3618        "###);
3619    }
3620
3621    #[test]
3622    fn test_log_transaction_metrics_pattern() {
3623        let captures = capture_test_event("/something/12345", TransactionSource::Url);
3624        insta::assert_debug_snapshot!(captures, @r###"
3625        [
3626            "event.transaction_name_changes:1|c|#source_in:url,changes:pattern,source_out:sanitized,is_404:false",
3627        ]
3628        "###);
3629    }
3630
3631    #[test]
3632    fn test_log_transaction_metrics_both() {
3633        let captures = capture_test_event("/foo/john/12345", TransactionSource::Url);
3634        insta::assert_debug_snapshot!(captures, @r###"
3635        [
3636            "event.transaction_name_changes:1|c|#source_in:url,changes:both,source_out:sanitized,is_404:false",
3637        ]
3638        "###);
3639    }
3640
3641    #[test]
3642    fn test_log_transaction_metrics_no_match() {
3643        let captures = capture_test_event("/foo/john/12345", TransactionSource::Route);
3644        insta::assert_debug_snapshot!(captures, @r###"
3645        [
3646            "event.transaction_name_changes:1|c|#source_in:route,changes:none,source_out:route,is_404:false",
3647        ]
3648        "###);
3649    }
3650
    /// Confirms that the hardcoded value we use for the fixed length of the measurement MRI is
    /// correct. The unit test is placed here because it depends on relay-server and therefore
    /// cannot live in relay-metrics.
3654    #[test]
3655    fn test_mri_overhead_constant() {
3656        let hardcoded_value = MeasurementsConfig::MEASUREMENT_MRI_OVERHEAD;
3657
3658        let derived_value = {
3659            let name = "foobar".to_owned();
3660            let value = 5.into(); // Arbitrary value.
3661            let unit = MetricUnit::Duration(DurationUnit::default());
3662            let tags = TransactionMeasurementTags {
3663                measurement_rating: None,
3664                universal_tags: CommonTags(BTreeMap::new()),
3665                score_profile_version: None,
3666            };
3667
3668            let measurement = TransactionMetric::Measurement {
3669                name: name.clone(),
3670                value,
3671                unit,
3672                tags,
3673            };
3674
3675            let metric: Bucket = measurement.into_metric(UnixTimestamp::now());
3676            metric.name.len() - unit.to_string().len() - name.len()
3677        };
3678        assert_eq!(
3679            hardcoded_value, derived_value,
3680            "Update `MEASUREMENT_MRI_OVERHEAD` if the naming scheme changed."
3681        );
3682    }
3683
3684    #[tokio::test]
3685    async fn test_process_metrics_bucket_metadata() {
3686        let mut token = Cogs::noop().timed(ResourceId::Relay, AppFeature::Unattributed);
3687        let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
3688        let received_at = Utc::now();
3689        let config = Config::default();
3690
3691        let (aggregator, mut aggregator_rx) = Addr::custom();
3692        let processor = create_test_processor_with_addrs(
3693            config,
3694            Addrs {
3695                aggregator,
3696                ..Default::default()
3697            },
3698        )
3699        .await;
3700
3701        let mut item = Item::new(ItemType::Statsd);
3702        item.set_payload(
3703            ContentType::Text,
3704            "transactions/foo:3182887624:4267882815|s",
3705        );
3706        for (source, expected_received_at) in [
3707            (
3708                BucketSource::External,
3709                Some(UnixTimestamp::from_datetime(received_at).unwrap()),
3710            ),
3711            (BucketSource::Internal, None),
3712        ] {
3713            let message = ProcessMetrics {
3714                data: MetricData::Raw(vec![item.clone()]),
3715                project_key,
3716                source,
3717                received_at,
3718                sent_at: Some(Utc::now()),
3719            };
3720            processor.handle_process_metrics(&mut token, message);
3721
3722            let Aggregator::MergeBuckets(merge_buckets) = aggregator_rx.recv().await.unwrap();
3723            let buckets = merge_buckets.buckets;
3724            assert_eq!(buckets.len(), 1);
3725            assert_eq!(buckets[0].metadata.received_at, expected_received_at);
3726        }
3727    }
3728
3729    #[tokio::test]
3730    async fn test_process_batched_metrics() {
3731        let mut token = Cogs::noop().timed(ResourceId::Relay, AppFeature::Unattributed);
3732        let received_at = Utc::now();
3733        let config = Config::default();
3734
3735        let (aggregator, mut aggregator_rx) = Addr::custom();
3736        let processor = create_test_processor_with_addrs(
3737            config,
3738            Addrs {
3739                aggregator,
3740                ..Default::default()
3741            },
3742        )
3743        .await;
3744
3745        let payload = r#"{
3746    "buckets": {
3747        "11111111111111111111111111111111": [
3748            {
3749                "timestamp": 1615889440,
3750                "width": 0,
3751                "name": "d:custom/endpoint.response_time@millisecond",
3752                "type": "d",
3753                "value": [
3754                  68.0
3755                ],
3756                "tags": {
3757                  "route": "user_index"
3758                }
3759            }
3760        ],
3761        "22222222222222222222222222222222": [
3762            {
3763                "timestamp": 1615889440,
3764                "width": 0,
3765                "name": "d:custom/endpoint.cache_rate@none",
3766                "type": "d",
3767                "value": [
3768                  36.0
3769                ]
3770            }
3771        ]
3772    }
3773}
3774"#;
3775        let message = ProcessBatchedMetrics {
3776            payload: Bytes::from(payload),
3777            source: BucketSource::Internal,
3778            received_at,
3779            sent_at: Some(Utc::now()),
3780        };
3781        processor.handle_process_batched_metrics(&mut token, message);
3782
3783        let Aggregator::MergeBuckets(mb1) = aggregator_rx.recv().await.unwrap();
3784        let Aggregator::MergeBuckets(mb2) = aggregator_rx.recv().await.unwrap();
3785
3786        let mut messages = vec![mb1, mb2];
3787        messages.sort_by_key(|mb| mb.project_key);
3788
3789        let actual = messages
3790            .into_iter()
3791            .map(|mb| (mb.project_key, mb.buckets))
3792            .collect::<Vec<_>>();
3793
3794        assert_debug_snapshot!(actual, @r###"
3795        [
3796            (
3797                ProjectKey("11111111111111111111111111111111"),
3798                [
3799                    Bucket {
3800                        timestamp: UnixTimestamp(1615889440),
3801                        width: 0,
3802                        name: MetricName(
3803                            "d:custom/endpoint.response_time@millisecond",
3804                        ),
3805                        value: Distribution(
3806                            [
3807                                68.0,
3808                            ],
3809                        ),
3810                        tags: {
3811                            "route": "user_index",
3812                        },
3813                        metadata: BucketMetadata {
3814                            merges: 1,
3815                            received_at: None,
3816                            extracted_from_indexed: false,
3817                        },
3818                    },
3819                ],
3820            ),
3821            (
3822                ProjectKey("22222222222222222222222222222222"),
3823                [
3824                    Bucket {
3825                        timestamp: UnixTimestamp(1615889440),
3826                        width: 0,
3827                        name: MetricName(
3828                            "d:custom/endpoint.cache_rate@none",
3829                        ),
3830                        value: Distribution(
3831                            [
3832                                36.0,
3833                            ],
3834                        ),
3835                        tags: {},
3836                        metadata: BucketMetadata {
3837                            merges: 1,
3838                            received_at: None,
3839                            extracted_from_indexed: false,
3840                        },
3841                    },
3842                ],
3843            ),
3844        ]
3845        "###);
3846    }
3847}