relay_server/services/
processor.rs

1use std::borrow::Cow;
2use std::collections::{BTreeSet, HashMap};
3use std::error::Error;
4use std::fmt::{Debug, Display};
5use std::future::Future;
6use std::io::Write;
7use std::pin::Pin;
8use std::sync::Arc;
9use std::time::Duration;
10
11use anyhow::Context;
12use brotli::CompressorWriter as BrotliEncoder;
13use bytes::Bytes;
14use chrono::{DateTime, Utc};
15use flate2::Compression;
16use flate2::write::{GzEncoder, ZlibEncoder};
17use futures::FutureExt;
18use futures::future::BoxFuture;
19use relay_base_schema::project::{ProjectId, ProjectKey};
20use relay_cogs::{AppFeature, Cogs, FeatureWeights, ResourceId, Token};
21use relay_common::time::UnixTimestamp;
22use relay_config::{Config, HttpEncoding, RelayMode};
23use relay_dynamic_config::{ErrorBoundary, Feature, GlobalConfig};
24use relay_event_normalization::{ClockDriftProcessor, GeoIpLookup};
25use relay_event_schema::processor::ProcessingAction;
26use relay_event_schema::protocol::{
27    ClientReport, Event, EventId, Metrics, NetworkReportError, SpanV2,
28};
29use relay_filter::FilterStatKey;
30use relay_metrics::{Bucket, BucketMetadata, BucketView, BucketsView, MetricNamespace};
31use relay_pii::PiiConfigError;
32use relay_protocol::Annotated;
33use relay_quotas::{DataCategory, Quota, RateLimits, Scoping};
34use relay_sampling::evaluation::{ReservoirCounters, ReservoirEvaluator, SamplingDecision};
35use relay_statsd::metric;
36use relay_system::{Addr, FromMessage, NoResponse, Service};
37use reqwest::header;
38use smallvec::{SmallVec, smallvec};
39use zstd::stream::Encoder as ZstdEncoder;
40
41use crate::envelope::{
42    self, AttachmentType, ContentType, Envelope, EnvelopeError, Item, ItemContainer, ItemType,
43};
44use crate::extractors::{PartialDsn, RequestMeta, RequestTrust};
45use crate::integrations::{Integration, SpansIntegration};
46use crate::managed::{InvalidProcessingGroupType, ManagedEnvelope, TypedEnvelope};
47use crate::metrics::{MetricOutcomes, MetricsLimiter, MinimalTrackableBucket};
48use crate::metrics_extraction::transactions::ExtractedMetrics;
49use crate::metrics_extraction::transactions::types::ExtractMetricsError;
50use crate::processing::check_ins::CheckInsProcessor;
51use crate::processing::logs::LogsProcessor;
52use crate::processing::sessions::SessionsProcessor;
53use crate::processing::spans::SpansProcessor;
54use crate::processing::trace_metrics::TraceMetricsProcessor;
55use crate::processing::transactions::extraction::ExtractMetricsContext;
56use crate::processing::utils::event::{
57    EventFullyNormalized, EventMetricsExtracted, FiltersStatus, SpansExtracted, event_category,
58    event_type,
59};
60use crate::processing::{Forward as _, Output, Outputs, QuotaRateLimiter};
61use crate::service::ServiceError;
62use crate::services::global_config::GlobalConfigHandle;
63use crate::services::metrics::{Aggregator, FlushBuckets, MergeBuckets, ProjectBuckets};
64use crate::services::outcome::{DiscardItemType, DiscardReason, Outcome, TrackOutcome};
65use crate::services::projects::cache::ProjectCacheHandle;
66use crate::services::projects::project::{ProjectInfo, ProjectState};
67use crate::services::upstream::{
68    SendRequest, Sign, SignatureType, UpstreamRelay, UpstreamRequest, UpstreamRequestError,
69};
70use crate::statsd::{RelayCounters, RelayDistributions, RelayTimers};
71use crate::utils::{self, CheckLimits, EnvelopeLimiter, SamplingResult};
72use crate::{http, processing};
73use relay_threading::AsyncPool;
74#[cfg(feature = "processing")]
75use {
76    crate::services::global_rate_limits::{GlobalRateLimits, GlobalRateLimitsServiceHandle},
77    crate::services::processor::nnswitch::SwitchProcessingError,
78    crate::services::store::{Store, StoreEnvelope},
79    crate::services::upload::UploadAttachments,
80    crate::utils::Enforcement,
81    itertools::Itertools,
82    relay_cardinality::{
83        CardinalityLimit, CardinalityLimiter, CardinalityLimitsSplit, RedisSetLimiter,
84        RedisSetLimiterOptions,
85    },
86    relay_dynamic_config::CardinalityLimiterMode,
87    relay_quotas::{RateLimitingError, RedisRateLimiter},
88    relay_redis::{AsyncRedisClient, RedisClients},
89    std::time::Instant,
90    symbolic_unreal::{Unreal4Error, Unreal4ErrorKind},
91};
92
93mod attachment;
94mod dynamic_sampling;
95mod event;
96mod metrics;
97mod nel;
98mod profile;
99mod profile_chunk;
100mod replay;
101mod report;
102mod span;
103
104#[cfg(all(sentry, feature = "processing"))]
105mod playstation;
106mod standalone;
107#[cfg(feature = "processing")]
108mod unreal;
109
110#[cfg(feature = "processing")]
111mod nnswitch;
112
/// Creates the block only if used with `processing` feature.
///
/// Provided code block will be executed only if the provided config has `processing_enabled` set.
///
/// Two forms are accepted:
///
///  - `if_processing!(config, { .. })`: the block is compiled only with the `processing` feature
///    and runs only when `config.processing_enabled()` returns `true`.
///  - `if_processing!(config, { .. } else { .. })`: same as above, but the `else` block runs when
///    processing is disabled at runtime, and is the only code emitted when the crate is compiled
///    without the `processing` feature.
macro_rules! if_processing {
    // Single-block form: nothing is emitted without the `processing` feature.
    ($config:expr, $if_true:block) => {
        #[cfg(feature = "processing")] {
            if $config.processing_enabled() $if_true
        }
    };
    // Block-with-fallback form: exactly one of the two inner blocks survives `cfg` evaluation,
    // and the outer braces wrap whichever one remains.
    ($config:expr, $if_true:block else $if_false:block) => {
        {
            #[cfg(feature = "processing")] {
                if $config.processing_enabled() $if_true else $if_false
            }
            #[cfg(not(feature = "processing"))] {
                $if_false
            }
        }
    };
}
133
/// The minimum clock drift for correction to apply.
///
/// The value is 55 minutes, expressed in seconds.
pub const MINIMUM_CLOCK_DRIFT: Duration = Duration::from_secs(55 * 60);
136
/// Error returned when a processing group cannot be converted into the requested marker type.
#[derive(Debug)]
pub struct GroupTypeError;

impl Display for GroupTypeError {
    /// Writes the fixed, human readable description of the failed conversion.
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            formatter,
            "failed to convert processing group into corresponding type"
        )
    }
}

impl Error for GroupTypeError {}
147
/// Declares a zero-sized marker type `$ty` for one or more [`ProcessingGroup`] variants.
///
/// The generated type converts into `ProcessingGroup::$variant`, and can be obtained through
/// `TryFrom<ProcessingGroup>` from `$variant` or from any of the optional additional variants
/// listed after it. Every other group yields a [`GroupTypeError`].
macro_rules! processing_group {
    ($ty:ident, $variant:ident$(, $($other:ident),+)?) => {
        #[derive(Clone, Copy, Debug)]
        pub struct $ty;

        impl From<$ty> for ProcessingGroup {
            fn from(_: $ty) -> Self {
                Self::$variant
            }
        }

        impl TryFrom<ProcessingGroup> for $ty {
            type Error = GroupTypeError;

            fn try_from(group: ProcessingGroup) -> Result<Self, Self::Error> {
                match group {
                    // The primary variant plus all additionally accepted variants.
                    ProcessingGroup::$variant $($(| ProcessingGroup::$other)+)? => Ok($ty),
                    _ => Err(GroupTypeError),
                }
            }
        }
    };
}
176
/// A marker trait.
///
/// Should be used only with groups which are responsible for processing envelopes with events.
pub trait EventProcessing {}

// Marker types for the individual processing groups. Each invocation declares a unit struct
// which converts into, and can be fallibly created from, the listed `ProcessingGroup` variants.

processing_group!(TransactionGroup, Transaction);
impl EventProcessing for TransactionGroup {}

processing_group!(ErrorGroup, Error);
impl EventProcessing for ErrorGroup {}

processing_group!(SessionGroup, Session);
processing_group!(StandaloneGroup, Standalone);
processing_group!(ClientReportGroup, ClientReport);
processing_group!(ReplayGroup, Replay);
processing_group!(CheckInGroup, CheckIn);
// `LogGroup` converts into `ProcessingGroup::Log`, but can also be created from
// `ProcessingGroup::Nel`.
processing_group!(LogGroup, Log, Nel);
processing_group!(TraceMetricGroup, TraceMetric);
processing_group!(SpanGroup, Span);

processing_group!(ProfileChunkGroup, ProfileChunk);
processing_group!(MetricsGroup, Metrics);
processing_group!(ForwardUnknownGroup, ForwardUnknown);
processing_group!(Ungrouped, Ungrouped);
201
/// Processed group type marker.
///
/// Marks the envelopes which passed through the processing pipeline.
///
/// Used as the type parameter of [`TypedEnvelope`] for envelopes carried by
/// `ProcessingResult::Envelope` and `Submit::Envelope`.
#[derive(Clone, Copy, Debug)]
pub struct Processed;
207
/// Describes the groups of the processable items.
///
/// An incoming envelope is partitioned into these groups by `ProcessingGroup::split_envelope`.
#[derive(Clone, Copy, Debug)]
pub enum ProcessingGroup {
    /// All the transaction related items.
    ///
    /// Includes transactions, related attachments, profiles.
    Transaction,
    /// All the items which require (have or create) events.
    ///
    /// This includes: errors, security reports, user reports, some of the
    /// attachments.
    ///
    /// NEL reports are not part of this group, they are split into [`Self::Nel`].
    Error,
    /// Session events.
    Session,
    /// Standalone items which can be sent alone without any event attached to it in the current
    /// envelope e.g. some attachments, user reports.
    Standalone,
    /// Outcomes.
    ClientReport,
    /// Replays and ReplayRecordings.
    Replay,
    /// Crons.
    CheckIn,
    /// NEL reports.
    Nel,
    /// Logs.
    Log,
    /// Trace metrics.
    TraceMetric,
    /// Spans.
    Span,
    /// Span V2 spans.
    SpanV2,
    /// Metrics.
    Metrics,
    /// ProfileChunk.
    ProfileChunk,
    /// Unknown item types will be forwarded upstream (to processing Relay), where we will
    /// decide what to do with them.
    ForwardUnknown,
    /// All the items in the envelope that could not be grouped.
    Ungrouped,
}
251
252impl ProcessingGroup {
253    /// Splits provided envelope into list of tuples of groups with associated envelopes.
254    fn split_envelope(
255        mut envelope: Envelope,
256        project_info: &ProjectInfo,
257    ) -> SmallVec<[(Self, Box<Envelope>); 3]> {
258        let headers = envelope.headers().clone();
259        let mut grouped_envelopes = smallvec![];
260
261        // Extract replays.
262        let replay_items = envelope.take_items_by(|item| {
263            matches!(
264                item.ty(),
265                &ItemType::ReplayEvent | &ItemType::ReplayRecording | &ItemType::ReplayVideo
266            )
267        });
268        if !replay_items.is_empty() {
269            grouped_envelopes.push((
270                ProcessingGroup::Replay,
271                Envelope::from_parts(headers.clone(), replay_items),
272            ))
273        }
274
275        // Keep all the sessions together in one envelope.
276        let session_items = envelope
277            .take_items_by(|item| matches!(item.ty(), &ItemType::Session | &ItemType::Sessions));
278        if !session_items.is_empty() {
279            grouped_envelopes.push((
280                ProcessingGroup::Session,
281                Envelope::from_parts(headers.clone(), session_items),
282            ))
283        }
284
285        let span_v2_items = envelope.take_items_by(|item| {
286            let exp_feature = project_info.has_feature(Feature::SpanV2ExperimentalProcessing);
287            let is_supported_integration = matches!(
288                item.integration(),
289                Some(Integration::Spans(SpansIntegration::OtelV1 { .. }))
290            );
291            let is_span = matches!(item.ty(), &ItemType::Span);
292
293            ItemContainer::<SpanV2>::is_container(item)
294                || (exp_feature && is_span)
295                || (exp_feature && is_supported_integration)
296        });
297
298        if !span_v2_items.is_empty() {
299            grouped_envelopes.push((
300                ProcessingGroup::SpanV2,
301                Envelope::from_parts(headers.clone(), span_v2_items),
302            ))
303        }
304
305        // Extract spans.
306        let span_items = envelope.take_items_by(|item| {
307            matches!(item.ty(), &ItemType::Span)
308                || matches!(item.integration(), Some(Integration::Spans(_)))
309        });
310
311        if !span_items.is_empty() {
312            grouped_envelopes.push((
313                ProcessingGroup::Span,
314                Envelope::from_parts(headers.clone(), span_items),
315            ))
316        }
317
318        // Extract logs.
319        let logs_items = envelope.take_items_by(|item| {
320            matches!(item.ty(), &ItemType::Log)
321                || matches!(item.integration(), Some(Integration::Logs(_)))
322        });
323        if !logs_items.is_empty() {
324            grouped_envelopes.push((
325                ProcessingGroup::Log,
326                Envelope::from_parts(headers.clone(), logs_items),
327            ))
328        }
329
330        // Extract trace metrics.
331        let trace_metric_items =
332            envelope.take_items_by(|item| matches!(item.ty(), &ItemType::TraceMetric));
333        if !trace_metric_items.is_empty() {
334            grouped_envelopes.push((
335                ProcessingGroup::TraceMetric,
336                Envelope::from_parts(headers.clone(), trace_metric_items),
337            ))
338        }
339
340        // NEL items are transformed into logs in their own processing step.
341        let nel_items = envelope.take_items_by(|item| matches!(item.ty(), &ItemType::Nel));
342        if !nel_items.is_empty() {
343            grouped_envelopes.push((
344                ProcessingGroup::Nel,
345                Envelope::from_parts(headers.clone(), nel_items),
346            ))
347        }
348
349        // Extract all metric items.
350        //
351        // Note: Should only be relevant in proxy mode. In other modes we send metrics through
352        // a separate pipeline.
353        let metric_items = envelope.take_items_by(|i| i.ty().is_metrics());
354        if !metric_items.is_empty() {
355            grouped_envelopes.push((
356                ProcessingGroup::Metrics,
357                Envelope::from_parts(headers.clone(), metric_items),
358            ))
359        }
360
361        // Extract profile chunks.
362        let profile_chunk_items =
363            envelope.take_items_by(|item| matches!(item.ty(), &ItemType::ProfileChunk));
364        if !profile_chunk_items.is_empty() {
365            grouped_envelopes.push((
366                ProcessingGroup::ProfileChunk,
367                Envelope::from_parts(headers.clone(), profile_chunk_items),
368            ))
369        }
370
371        // Extract all standalone items.
372        //
373        // Note: only if there are no items in the envelope which can create events, otherwise they
374        // will be in the same envelope with all require event items.
375        if !envelope.items().any(Item::creates_event) {
376            let standalone_items = envelope.take_items_by(Item::requires_event);
377            if !standalone_items.is_empty() {
378                grouped_envelopes.push((
379                    ProcessingGroup::Standalone,
380                    Envelope::from_parts(headers.clone(), standalone_items),
381                ))
382            }
383        };
384
385        // Make sure we create separate envelopes for each `RawSecurity` report.
386        let security_reports_items = envelope
387            .take_items_by(|i| matches!(i.ty(), &ItemType::RawSecurity))
388            .into_iter()
389            .map(|item| {
390                let headers = headers.clone();
391                let items: SmallVec<[Item; 3]> = smallvec![item.clone()];
392                let mut envelope = Envelope::from_parts(headers, items);
393                envelope.set_event_id(EventId::new());
394                (ProcessingGroup::Error, envelope)
395            });
396        grouped_envelopes.extend(security_reports_items);
397
398        // Extract all the items which require an event into separate envelope.
399        let require_event_items = envelope.take_items_by(Item::requires_event);
400        if !require_event_items.is_empty() {
401            let group = if require_event_items
402                .iter()
403                .any(|item| matches!(item.ty(), &ItemType::Transaction | &ItemType::Profile))
404            {
405                ProcessingGroup::Transaction
406            } else {
407                ProcessingGroup::Error
408            };
409
410            grouped_envelopes.push((
411                group,
412                Envelope::from_parts(headers.clone(), require_event_items),
413            ))
414        }
415
416        // Get the rest of the envelopes, one per item.
417        let envelopes = envelope.items_mut().map(|item| {
418            let headers = headers.clone();
419            let items: SmallVec<[Item; 3]> = smallvec![item.clone()];
420            let envelope = Envelope::from_parts(headers, items);
421            let item_type = item.ty();
422            let group = if matches!(item_type, &ItemType::CheckIn) {
423                ProcessingGroup::CheckIn
424            } else if matches!(item.ty(), &ItemType::ClientReport) {
425                ProcessingGroup::ClientReport
426            } else if matches!(item_type, &ItemType::Unknown(_)) {
427                ProcessingGroup::ForwardUnknown
428            } else {
429                // Cannot group this item type.
430                ProcessingGroup::Ungrouped
431            };
432
433            (group, envelope)
434        });
435        grouped_envelopes.extend(envelopes);
436
437        grouped_envelopes
438    }
439
440    /// Returns the name of the group.
441    pub fn variant(&self) -> &'static str {
442        match self {
443            ProcessingGroup::Transaction => "transaction",
444            ProcessingGroup::Error => "error",
445            ProcessingGroup::Session => "session",
446            ProcessingGroup::Standalone => "standalone",
447            ProcessingGroup::ClientReport => "client_report",
448            ProcessingGroup::Replay => "replay",
449            ProcessingGroup::CheckIn => "check_in",
450            ProcessingGroup::Log => "log",
451            ProcessingGroup::TraceMetric => "trace_metric",
452            ProcessingGroup::Nel => "nel",
453            ProcessingGroup::Span => "span",
454            ProcessingGroup::SpanV2 => "span_v2",
455            ProcessingGroup::Metrics => "metrics",
456            ProcessingGroup::ProfileChunk => "profile_chunk",
457            ProcessingGroup::ForwardUnknown => "forward_unknown",
458            ProcessingGroup::Ungrouped => "ungrouped",
459        }
460    }
461}
462
463impl From<ProcessingGroup> for AppFeature {
464    fn from(value: ProcessingGroup) -> Self {
465        match value {
466            ProcessingGroup::Transaction => AppFeature::Transactions,
467            ProcessingGroup::Error => AppFeature::Errors,
468            ProcessingGroup::Session => AppFeature::Sessions,
469            ProcessingGroup::Standalone => AppFeature::UnattributedEnvelope,
470            ProcessingGroup::ClientReport => AppFeature::ClientReports,
471            ProcessingGroup::Replay => AppFeature::Replays,
472            ProcessingGroup::CheckIn => AppFeature::CheckIns,
473            ProcessingGroup::Log => AppFeature::Logs,
474            ProcessingGroup::TraceMetric => AppFeature::TraceMetrics,
475            ProcessingGroup::Nel => AppFeature::Logs,
476            ProcessingGroup::Span => AppFeature::Spans,
477            ProcessingGroup::SpanV2 => AppFeature::Spans,
478            ProcessingGroup::Metrics => AppFeature::UnattributedMetrics,
479            ProcessingGroup::ProfileChunk => AppFeature::Profiles,
480            ProcessingGroup::ForwardUnknown => AppFeature::UnattributedEnvelope,
481            ProcessingGroup::Ungrouped => AppFeature::UnattributedEnvelope,
482        }
483    }
484}
485
/// An error returned when handling [`ProcessEnvelope`].
///
/// Use [`ProcessingError::to_outcome`] to obtain the outcome, if any, which should be reported
/// for an error.
#[derive(Debug, thiserror::Error)]
pub enum ProcessingError {
    /// The event payload is not valid JSON.
    #[error("invalid json in event")]
    InvalidJson(#[source] serde_json::Error),

    /// The event payload is not valid MessagePack.
    #[error("invalid message pack event payload")]
    InvalidMsgpack(#[from] rmp_serde::decode::Error),

    /// An Unreal crash report could not be processed.
    #[cfg(feature = "processing")]
    #[error("invalid unreal crash report")]
    InvalidUnrealReport(#[source] Unreal4Error),

    /// A payload exceeds the size limit, the contained type identifies the offending item.
    #[error("event payload too large")]
    PayloadTooLarge(DiscardItemType),

    #[error("invalid transaction event")]
    InvalidTransaction,

    #[error("envelope processor failed")]
    ProcessingFailed(#[from] ProcessingAction),

    /// An item type which may only occur once was found multiple times.
    #[error("duplicate {0} in event")]
    DuplicateItem(ItemType),

    #[error("failed to extract event payload")]
    NoEventPayload,

    /// No outcome is reported for this error.
    #[error("missing project id in DSN")]
    MissingProjectId,

    /// Carries the raw bytes which could not be interpreted as a security report type.
    #[error("invalid security report type: {0:?}")]
    InvalidSecurityType(Bytes),

    /// Reported as a filtered outcome rather than an invalid one.
    #[error("unsupported security report type")]
    UnsupportedSecurityType,

    #[error("invalid security report")]
    InvalidSecurityReport(#[source] serde_json::Error),

    #[error("invalid nel report")]
    InvalidNelReport(#[source] NetworkReportError),

    /// No outcome is reported for this error by [`ProcessingError::to_outcome`].
    #[error("event filtered with reason: {0:?}")]
    EventFiltered(FilterStatKey),

    #[error("missing or invalid required event timestamp")]
    InvalidTimestamp,

    #[error("could not serialize event payload")]
    SerializeFailed(#[source] serde_json::Error),

    #[cfg(feature = "processing")]
    #[error("failed to apply quotas")]
    QuotasFailed(#[from] RateLimitingError),

    #[error("invalid pii config")]
    PiiConfigError(PiiConfigError),

    /// The error is boxed, no outcome is reported for it.
    #[error("invalid processing group type")]
    InvalidProcessingGroup(Box<InvalidProcessingGroupType>),

    /// The contained reason is used verbatim for the invalid outcome.
    #[error("invalid replay")]
    InvalidReplay(DiscardReason),

    #[error("replay filtered with reason: {0:?}")]
    ReplayFiltered(FilterStatKey),

    #[cfg(feature = "processing")]
    #[error("nintendo switch dying message processing failed {0:?}")]
    InvalidNintendoDyingMessage(#[source] SwitchProcessingError),

    #[cfg(all(sentry, feature = "processing"))]
    #[error("playstation dump processing failed: {0}")]
    InvalidPlaystationDump(String),

    #[error("processing group does not match specific processor")]
    ProcessingGroupMismatch,
    /// A failure in the new processing pipeline, which already emitted its outcomes.
    #[error("new processing pipeline failed")]
    ProcessingFailure,
}
567
568impl ProcessingError {
569    pub fn to_outcome(&self) -> Option<Outcome> {
570        match self {
571            Self::PayloadTooLarge(payload_type) => {
572                Some(Outcome::Invalid(DiscardReason::TooLarge(*payload_type)))
573            }
574            Self::InvalidJson(_) => Some(Outcome::Invalid(DiscardReason::InvalidJson)),
575            Self::InvalidMsgpack(_) => Some(Outcome::Invalid(DiscardReason::InvalidMsgpack)),
576            Self::InvalidSecurityType(_) => {
577                Some(Outcome::Invalid(DiscardReason::SecurityReportType))
578            }
579            Self::InvalidSecurityReport(_) => Some(Outcome::Invalid(DiscardReason::SecurityReport)),
580            Self::UnsupportedSecurityType => Some(Outcome::Filtered(FilterStatKey::InvalidCsp)),
581            Self::InvalidNelReport(_) => Some(Outcome::Invalid(DiscardReason::InvalidJson)),
582            Self::InvalidTransaction => Some(Outcome::Invalid(DiscardReason::InvalidTransaction)),
583            Self::InvalidTimestamp => Some(Outcome::Invalid(DiscardReason::Timestamp)),
584            Self::DuplicateItem(_) => Some(Outcome::Invalid(DiscardReason::DuplicateItem)),
585            Self::NoEventPayload => Some(Outcome::Invalid(DiscardReason::NoEventPayload)),
586            #[cfg(feature = "processing")]
587            Self::InvalidNintendoDyingMessage(_) => Some(Outcome::Invalid(DiscardReason::Payload)),
588            #[cfg(all(sentry, feature = "processing"))]
589            Self::InvalidPlaystationDump(_) => Some(Outcome::Invalid(DiscardReason::Payload)),
590            #[cfg(feature = "processing")]
591            Self::InvalidUnrealReport(err) if err.kind() == Unreal4ErrorKind::BadCompression => {
592                Some(Outcome::Invalid(DiscardReason::InvalidCompression))
593            }
594            #[cfg(feature = "processing")]
595            Self::InvalidUnrealReport(_) => Some(Outcome::Invalid(DiscardReason::ProcessUnreal)),
596            Self::SerializeFailed(_) | Self::ProcessingFailed(_) => {
597                Some(Outcome::Invalid(DiscardReason::Internal))
598            }
599            #[cfg(feature = "processing")]
600            Self::QuotasFailed(_) => Some(Outcome::Invalid(DiscardReason::Internal)),
601            Self::PiiConfigError(_) => Some(Outcome::Invalid(DiscardReason::ProjectStatePii)),
602            Self::MissingProjectId => None,
603            Self::EventFiltered(_) => None,
604            Self::InvalidProcessingGroup(_) => None,
605            Self::InvalidReplay(reason) => Some(Outcome::Invalid(*reason)),
606            Self::ReplayFiltered(key) => Some(Outcome::Filtered(key.clone())),
607
608            Self::ProcessingGroupMismatch => Some(Outcome::Invalid(DiscardReason::Internal)),
609            // Outcomes are emitted in the new processing pipeline already.
610            Self::ProcessingFailure => None,
611        }
612    }
613
614    fn is_unexpected(&self) -> bool {
615        self.to_outcome()
616            .is_some_and(|outcome| outcome.is_unexpected())
617    }
618}
619
/// Converts Unreal errors, mapping oversized reports to [`ProcessingError::PayloadTooLarge`].
#[cfg(feature = "processing")]
impl From<Unreal4Error> for ProcessingError {
    fn from(err: Unreal4Error) -> Self {
        if matches!(err.kind(), Unreal4ErrorKind::TooLarge) {
            return Self::PayloadTooLarge(ItemType::UnrealReport.into());
        }

        // Every other kind keeps the original error as its source.
        Self::InvalidUnrealReport(err)
    }
}
629
630impl From<ExtractMetricsError> for ProcessingError {
631    fn from(error: ExtractMetricsError) -> Self {
632        match error {
633            ExtractMetricsError::MissingTimestamp | ExtractMetricsError::InvalidTimestamp => {
634                Self::InvalidTimestamp
635            }
636        }
637    }
638}
639
640impl From<InvalidProcessingGroupType> for ProcessingError {
641    fn from(value: InvalidProcessingGroupType) -> Self {
642        Self::InvalidProcessingGroup(Box::new(value))
643    }
644}
645
/// An annotated event paired with a `usize`.
///
// NOTE(review): the `usize` is presumably the size of the event payload in bytes, confirm
// against the code constructing this tuple.
type ExtractedEvent = (Annotated<Event>, usize);
647
648/// A container for extracted metrics during processing.
649///
650/// The container enforces that the extracted metrics are correctly tagged
651/// with the dynamic sampling decision.
652#[derive(Debug)]
653pub struct ProcessingExtractedMetrics {
654    metrics: ExtractedMetrics,
655}
656
657impl ProcessingExtractedMetrics {
658    pub fn new() -> Self {
659        Self {
660            metrics: ExtractedMetrics::default(),
661        }
662    }
663
664    pub fn into_inner(self) -> ExtractedMetrics {
665        self.metrics
666    }
667
668    /// Extends the contained metrics with [`ExtractedMetrics`].
669    pub fn extend(
670        &mut self,
671        extracted: ExtractedMetrics,
672        sampling_decision: Option<SamplingDecision>,
673    ) {
674        self.extend_project_metrics(extracted.project_metrics, sampling_decision);
675        self.extend_sampling_metrics(extracted.sampling_metrics, sampling_decision);
676    }
677
678    /// Extends the contained project metrics.
679    pub fn extend_project_metrics<I>(
680        &mut self,
681        buckets: I,
682        sampling_decision: Option<SamplingDecision>,
683    ) where
684        I: IntoIterator<Item = Bucket>,
685    {
686        self.metrics
687            .project_metrics
688            .extend(buckets.into_iter().map(|mut bucket| {
689                bucket.metadata.extracted_from_indexed =
690                    sampling_decision == Some(SamplingDecision::Keep);
691                bucket
692            }));
693    }
694
695    /// Extends the contained sampling metrics.
696    pub fn extend_sampling_metrics<I>(
697        &mut self,
698        buckets: I,
699        sampling_decision: Option<SamplingDecision>,
700    ) where
701        I: IntoIterator<Item = Bucket>,
702    {
703        self.metrics
704            .sampling_metrics
705            .extend(buckets.into_iter().map(|mut bucket| {
706                bucket.metadata.extracted_from_indexed =
707                    sampling_decision == Some(SamplingDecision::Keep);
708                bucket
709            }));
710    }
711
712    /// Applies rate limits to the contained metrics.
713    ///
714    /// This is used to apply rate limits which have been enforced on sampled items of an envelope
715    /// to also consistently apply to the metrics extracted from these items.
716    #[cfg(feature = "processing")]
717    fn apply_enforcement(&mut self, enforcement: &Enforcement, enforced_consistently: bool) {
718        // Metric namespaces which need to be dropped.
719        let mut drop_namespaces: SmallVec<[_; 2]> = smallvec![];
720        // Metrics belonging to this metric namespace need to have the `extracted_from_indexed`
721        // flag reset to `false`.
722        let mut reset_extracted_from_indexed: SmallVec<[_; 2]> = smallvec![];
723
724        for (namespace, limit, indexed) in [
725            (
726                MetricNamespace::Transactions,
727                &enforcement.event,
728                &enforcement.event_indexed,
729            ),
730            (
731                MetricNamespace::Spans,
732                &enforcement.spans,
733                &enforcement.spans_indexed,
734            ),
735        ] {
736            if limit.is_active() {
737                drop_namespaces.push(namespace);
738            } else if indexed.is_active() && !enforced_consistently {
739                // If the enforcement was not computed by consistently checking the limits,
740                // the quota for the metrics has not yet been incremented.
741                // In this case we have a dropped indexed payload but a metric which still needs to
742                // be accounted for, make sure the metric will still be rate limited.
743                reset_extracted_from_indexed.push(namespace);
744            }
745        }
746
747        if !drop_namespaces.is_empty() || !reset_extracted_from_indexed.is_empty() {
748            self.retain_mut(|bucket| {
749                let Some(namespace) = bucket.name.try_namespace() else {
750                    return true;
751                };
752
753                if drop_namespaces.contains(&namespace) {
754                    return false;
755                }
756
757                if reset_extracted_from_indexed.contains(&namespace) {
758                    bucket.metadata.extracted_from_indexed = false;
759                }
760
761                true
762            });
763        }
764    }
765
766    #[cfg(feature = "processing")]
767    fn retain_mut(&mut self, mut f: impl FnMut(&mut Bucket) -> bool) {
768        self.metrics.project_metrics.retain_mut(&mut f);
769        self.metrics.sampling_metrics.retain_mut(&mut f);
770    }
771}
772
773fn send_metrics(
774    metrics: ExtractedMetrics,
775    project_key: ProjectKey,
776    sampling_key: Option<ProjectKey>,
777    aggregator: &Addr<Aggregator>,
778) {
779    let ExtractedMetrics {
780        project_metrics,
781        sampling_metrics,
782    } = metrics;
783
784    if !project_metrics.is_empty() {
785        aggregator.send(MergeBuckets {
786            project_key,
787            buckets: project_metrics,
788        });
789    }
790
791    if !sampling_metrics.is_empty() {
792        // If no sampling project state is available, we associate the sampling
793        // metrics with the current project.
794        //
795        // project_without_tracing         -> metrics goes to self
796        // dependent_project_with_tracing  -> metrics goes to root
797        // root_project_with_tracing       -> metrics goes to root == self
798        let sampling_project_key = sampling_key.unwrap_or(project_key);
799        aggregator.send(MergeBuckets {
800            project_key: sampling_project_key,
801            buckets: sampling_metrics,
802        });
803    }
804}
805
806/// Function for on-off switches that filter specific item types (profiles, spans)
807/// based on a feature flag.
808///
809/// If the project config did not come from the upstream, we keep the items.
810fn should_filter(config: &Config, project_info: &ProjectInfo, feature: Feature) -> bool {
811    match config.relay_mode() {
812        RelayMode::Proxy => false,
813        RelayMode::Managed => !project_info.has_feature(feature),
814    }
815}
816
/// The result of the envelope processing containing the processed envelope along with the partial
/// result.
///
/// A result is either a processed envelope with its extracted metrics, or an [`Output`].
#[derive(Debug)]
#[expect(
    clippy::large_enum_variant,
    reason = "until we have a better solution to combat the excessive growth of Item, see #4819"
)]
enum ProcessingResult {
    /// A processed envelope together with the metrics extracted for it.
    Envelope {
        /// The envelope in its processed state.
        managed_envelope: TypedEnvelope<Processed>,
        /// The metrics extracted for the envelope.
        extracted_metrics: ProcessingExtractedMetrics,
    },
    /// An [`Output`] of [`Outputs`].
    ///
    /// NOTE(review): presumably produced by one of the `processing` processors; confirm at the
    /// construction sites.
    Output(Output<Outputs>),
}
831
832impl ProcessingResult {
833    /// Creates a [`ProcessingResult`] with no metrics.
834    fn no_metrics(managed_envelope: TypedEnvelope<Processed>) -> Self {
835        Self::Envelope {
836            managed_envelope,
837            extracted_metrics: ProcessingExtractedMetrics::new(),
838        }
839    }
840}
841
/// All items which can be submitted upstream.
///
/// The lifetime `'a` is the lifetime of the [`processing::ForwardContext`] carried by
/// [`Submit::Output`].
#[derive(Debug)]
#[expect(
    clippy::large_enum_variant,
    reason = "until we have a better solution to combat the excessive growth of Item, see #4819"
)]
enum Submit<'a> {
    /// A processed envelope.
    Envelope(TypedEnvelope<Processed>),
    /// The output of a [`processing::Processor`].
    Output {
        /// The output to submit.
        output: Outputs,
        /// The forward context accompanying the output.
        ctx: processing::ForwardContext<'a>,
    },
}
857
/// Applies processing to all contents of the given envelope.
///
/// Depending on the contents of the envelope and Relay's mode, this includes:
///
///  - Basic normalization and validation for all item types.
///  - Clock drift correction if the required `sent_at` header is present.
///  - Expansion of certain item types (e.g. unreal).
///  - Store normalization for event payloads in processing mode.
///  - Rate limiters and inbound filters on events in processing mode.
#[derive(Debug)]
pub struct ProcessEnvelope {
    /// Envelope to process.
    pub envelope: ManagedEnvelope,
    /// The project info of the project the envelope belongs to.
    pub project_info: Arc<ProjectInfo>,
    /// Currently active cached rate limits for this project.
    pub rate_limits: Arc<RateLimits>,
    /// Root sampling project info, or `None` if it is not available.
    pub sampling_project_info: Option<Arc<ProjectInfo>>,
    /// Sampling reservoir counters.
    pub reservoir_counters: ReservoirCounters,
}
880
/// Like a [`ProcessEnvelope`], but with an envelope which has been grouped.
///
/// Instead of individual project and rate limit fields, this carries a borrowed
/// [`processing::Context`].
#[derive(Debug)]
struct ProcessEnvelopeGrouped<'a> {
    /// The group the envelope belongs to.
    pub group: ProcessingGroup,
    /// Envelope to process.
    pub envelope: ManagedEnvelope,
    /// The processing context.
    pub ctx: processing::Context<'a>,
}
891
/// Parses a list of metrics or metric buckets and pushes them to the project's aggregator.
///
/// This parses and validates the metrics:
///  - For [`Metrics`](ItemType::Statsd), each metric is parsed separately, and invalid metrics are
///    ignored independently.
///  - For [`MetricBuckets`](ItemType::MetricBuckets), the entire list of buckets is parsed and
///    dropped together on parsing failure.
///  - Other envelope items will be ignored with an error message.
///
/// Additionally, processing applies clock drift correction using the system clock of this Relay, if
/// the Envelope specifies the [`sent_at`](Envelope::sent_at) header.
#[derive(Debug)]
pub struct ProcessMetrics {
    /// The metric data, either as raw envelope items or as already parsed buckets.
    pub data: MetricData,
    /// The target project.
    pub project_key: ProjectKey,
    /// Whether to keep or reset the metric metadata.
    pub source: BucketSource,
    /// The wall clock time at which the request was received.
    pub received_at: DateTime<Utc>,
    /// The value of the Envelope's [`sent_at`](Envelope::sent_at) header for clock drift
    /// correction.
    pub sent_at: Option<DateTime<Utc>>,
}
917
/// Metric data which is either still raw or already parsed, but not yet processed.
#[derive(Debug)]
pub enum MetricData {
    /// Raw data, unparsed envelope items.
    Raw(Vec<Item>),
    /// Already parsed buckets but unprocessed.
    Parsed(Vec<Bucket>),
}
926
927impl MetricData {
928    /// Consumes the metric data and parses the contained buckets.
929    ///
930    /// If the contained data is already parsed the buckets are returned unchanged.
931    /// Raw buckets are parsed and created with the passed `timestamp`.
932    fn into_buckets(self, timestamp: UnixTimestamp) -> Vec<Bucket> {
933        let items = match self {
934            Self::Parsed(buckets) => return buckets,
935            Self::Raw(items) => items,
936        };
937
938        let mut buckets = Vec::new();
939        for item in items {
940            let payload = item.payload();
941            if item.ty() == &ItemType::Statsd {
942                for bucket_result in Bucket::parse_all(&payload, timestamp) {
943                    match bucket_result {
944                        Ok(bucket) => buckets.push(bucket),
945                        Err(error) => relay_log::debug!(
946                            error = &error as &dyn Error,
947                            "failed to parse metric bucket from statsd format",
948                        ),
949                    }
950                }
951            } else if item.ty() == &ItemType::MetricBuckets {
952                match serde_json::from_slice::<Vec<Bucket>>(&payload) {
953                    Ok(parsed_buckets) => {
954                        // Re-use the allocation of `b` if possible.
955                        if buckets.is_empty() {
956                            buckets = parsed_buckets;
957                        } else {
958                            buckets.extend(parsed_buckets);
959                        }
960                    }
961                    Err(error) => {
962                        relay_log::debug!(
963                            error = &error as &dyn Error,
964                            "failed to parse metric bucket",
965                        );
966                        metric!(counter(RelayCounters::MetricBucketsParsingFailed) += 1);
967                    }
968                }
969            } else {
970                relay_log::error!(
971                    "invalid item of type {} passed to ProcessMetrics",
972                    item.ty()
973                );
974            }
975        }
976        buckets
977    }
978}
979
/// A batched metrics payload which has not been parsed yet.
#[derive(Debug)]
pub struct ProcessBatchedMetrics {
    /// Metrics payload in JSON format.
    pub payload: Bytes,
    /// Whether to keep or reset the metric metadata.
    pub source: BucketSource,
    /// The wall clock time at which the request was received.
    pub received_at: DateTime<Utc>,
    /// The time at which the request was sent, if known.
    ///
    /// NOTE(review): presumably used for clock drift correction like `ProcessMetrics::sent_at`;
    /// confirm against the handler.
    pub sent_at: Option<DateTime<Utc>>,
}
991
/// Source information where a metric bucket originates from.
///
/// Use [`BucketSource::from_meta`] to derive the source from the trust level of a request.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum BucketSource {
    /// The metric bucket originated from an internal Relay use case.
    ///
    /// The metric bucket originates either from within the same Relay
    /// or was accepted coming from another Relay which is registered as
    /// an internal Relay via Relay's configuration.
    Internal,
    /// The metric bucket originated from an untrusted source.
    ///
    /// Managed Relays sending extracted metrics are considered external,
    /// it's a project use case but it comes from an untrusted source.
    External,
}
1007
1008impl BucketSource {
1009    /// Infers the bucket source from [`RequestMeta::request_trust`].
1010    pub fn from_meta(meta: &RequestMeta) -> Self {
1011        match meta.request_trust() {
1012            RequestTrust::Trusted => Self::Internal,
1013            RequestTrust::Untrusted => Self::External,
1014        }
1015    }
1016}
1017
/// Sends client reports to the upstream.
#[derive(Debug)]
pub struct SubmitClientReports {
    /// The client reports to be sent.
    pub client_reports: Vec<ClientReport>,
    /// Scoping information for the client reports.
    pub scoping: Scoping,
}
1026
/// CPU-intensive processing tasks for envelopes.
///
/// Every variant boxes its message payload.
#[derive(Debug)]
pub enum EnvelopeProcessor {
    /// See [`ProcessEnvelope`].
    ProcessEnvelope(Box<ProcessEnvelope>),
    /// See [`ProcessMetrics`].
    ProcessProjectMetrics(Box<ProcessMetrics>),
    /// See [`ProcessBatchedMetrics`].
    ProcessBatchedMetrics(Box<ProcessBatchedMetrics>),
    /// See [`FlushBuckets`].
    FlushBuckets(Box<FlushBuckets>),
    /// See [`SubmitClientReports`].
    SubmitClientReports(Box<SubmitClientReports>),
}
1036
1037impl EnvelopeProcessor {
1038    /// Returns the name of the message variant.
1039    pub fn variant(&self) -> &'static str {
1040        match self {
1041            EnvelopeProcessor::ProcessEnvelope(_) => "ProcessEnvelope",
1042            EnvelopeProcessor::ProcessProjectMetrics(_) => "ProcessProjectMetrics",
1043            EnvelopeProcessor::ProcessBatchedMetrics(_) => "ProcessBatchedMetrics",
1044            EnvelopeProcessor::FlushBuckets(_) => "FlushBuckets",
1045            EnvelopeProcessor::SubmitClientReports(_) => "SubmitClientReports",
1046        }
1047    }
1048}
1049
/// Marks [`EnvelopeProcessor`] as a `relay_system` interface; the messages it accepts are
/// declared by its [`FromMessage`] implementations.
impl relay_system::Interface for EnvelopeProcessor {}
1051
1052impl FromMessage<ProcessEnvelope> for EnvelopeProcessor {
1053    type Response = relay_system::NoResponse;
1054
1055    fn from_message(message: ProcessEnvelope, _sender: ()) -> Self {
1056        Self::ProcessEnvelope(Box::new(message))
1057    }
1058}
1059
/// Boxes a [`ProcessMetrics`] message into [`EnvelopeProcessor::ProcessProjectMetrics`].
///
/// The message has no response.
impl FromMessage<ProcessMetrics> for EnvelopeProcessor {
    type Response = NoResponse;

    fn from_message(message: ProcessMetrics, _: ()) -> Self {
        Self::ProcessProjectMetrics(Box::new(message))
    }
}
1067
/// Boxes a [`ProcessBatchedMetrics`] message into [`EnvelopeProcessor::ProcessBatchedMetrics`].
///
/// The message has no response.
impl FromMessage<ProcessBatchedMetrics> for EnvelopeProcessor {
    type Response = NoResponse;

    fn from_message(message: ProcessBatchedMetrics, _: ()) -> Self {
        Self::ProcessBatchedMetrics(Box::new(message))
    }
}
1075
/// Boxes a [`FlushBuckets`] message into [`EnvelopeProcessor::FlushBuckets`].
///
/// The message has no response.
impl FromMessage<FlushBuckets> for EnvelopeProcessor {
    type Response = NoResponse;

    fn from_message(message: FlushBuckets, _: ()) -> Self {
        Self::FlushBuckets(Box::new(message))
    }
}
1083
/// Boxes a [`SubmitClientReports`] message into [`EnvelopeProcessor::SubmitClientReports`].
///
/// The message has no response.
impl FromMessage<SubmitClientReports> for EnvelopeProcessor {
    type Response = NoResponse;

    fn from_message(message: SubmitClientReports, _: ()) -> Self {
        Self::SubmitClientReports(Box::new(message))
    }
}
1091
/// The asynchronous thread pool used for scheduling processing tasks in the processor.
///
/// Tasks are boxed `'static` futures which resolve to `()`.
pub type EnvelopeProcessorServicePool = AsyncPool<BoxFuture<'static, ()>>;
1094
/// Service implementing the [`EnvelopeProcessor`] interface.
///
/// This service handles messages in a worker pool with configurable concurrency.
///
/// All state lives behind an [`Arc`], so cloning the service only clones the pointer.
#[derive(Clone)]
pub struct EnvelopeProcessorService {
    /// The shared state of the processor.
    inner: Arc<InnerProcessor>,
}
1102
/// Contains the addresses of services that the processor publishes to.
///
/// Addresses which only exist with the `processing` feature are optional.
pub struct Addrs {
    /// Address of the [`TrackOutcome`] interface.
    pub outcome_aggregator: Addr<TrackOutcome>,
    /// Address of the [`UpstreamRelay`] interface.
    pub upstream_relay: Addr<UpstreamRelay>,
    /// Address of the [`UploadAttachments`] interface, if available.
    #[cfg(feature = "processing")]
    pub upload: Option<Addr<UploadAttachments>>,
    /// Address of the [`Store`] interface, if available.
    #[cfg(feature = "processing")]
    pub store_forwarder: Option<Addr<Store>>,
    /// Address of the [`Aggregator`] interface which receives metric buckets.
    pub aggregator: Addr<Aggregator>,
    /// Address of the [`GlobalRateLimits`] interface, if available.
    #[cfg(feature = "processing")]
    pub global_rate_limits: Option<Addr<GlobalRateLimits>>,
}
1115
1116impl Default for Addrs {
1117    fn default() -> Self {
1118        Addrs {
1119            outcome_aggregator: Addr::dummy(),
1120            upstream_relay: Addr::dummy(),
1121            #[cfg(feature = "processing")]
1122            upload: None,
1123            #[cfg(feature = "processing")]
1124            store_forwarder: None,
1125            aggregator: Addr::dummy(),
1126            #[cfg(feature = "processing")]
1127            global_rate_limits: None,
1128        }
1129    }
1130}
1131
/// The shared state of the [`EnvelopeProcessorService`].
struct InnerProcessor {
    /// The pool on which processing tasks are scheduled.
    pool: EnvelopeProcessorServicePool,
    /// The static Relay configuration.
    config: Arc<Config>,
    /// Handle to the global config.
    global_config: GlobalConfigHandle,
    /// Handle to the project cache; also used to update a project's cached rate limits.
    project_cache: ProjectCacheHandle,
    /// COGS handle.
    cogs: Cogs,
    /// Redis client for quotas, if Redis is configured.
    #[cfg(feature = "processing")]
    quotas_client: Option<AsyncRedisClient>,
    /// Addresses of the services the processor publishes to.
    addrs: Addrs,
    /// Redis-backed rate limiter, available only if both the quotas client and the global rate
    /// limits service are configured.
    #[cfg(feature = "processing")]
    rate_limiter: Option<Arc<RedisRateLimiter<GlobalRateLimitsServiceHandle>>>,
    /// GeoIP lookup; empty if no database is configured or it failed to open.
    geoip_lookup: GeoIpLookup,
    /// Cardinality limiter, if Redis is configured.
    #[cfg(feature = "processing")]
    cardinality_limiter: Option<CardinalityLimiter>,
    /// Metric outcomes handle.
    metric_outcomes: MetricOutcomes,
    /// The item-specific processors.
    processing: Processing,
}

1149
/// The item-specific processors owned by the [`InnerProcessor`].
///
/// All processors are constructed with the same shared quota limiter.
struct Processing {
    /// Processor for logs.
    logs: LogsProcessor,
    /// Processor for trace metrics.
    trace_metrics: TraceMetricsProcessor,
    /// Processor for spans; additionally constructed with the GeoIP lookup.
    spans: SpansProcessor,
    /// Processor for check-ins.
    check_ins: CheckInsProcessor,
    /// Processor for sessions.
    sessions: SessionsProcessor,
}
1157
1158impl EnvelopeProcessorService {
1159    /// Creates a multi-threaded envelope processor.
1160    #[cfg_attr(feature = "processing", expect(clippy::too_many_arguments))]
1161    pub fn new(
1162        pool: EnvelopeProcessorServicePool,
1163        config: Arc<Config>,
1164        global_config: GlobalConfigHandle,
1165        project_cache: ProjectCacheHandle,
1166        cogs: Cogs,
1167        #[cfg(feature = "processing")] redis: Option<RedisClients>,
1168        addrs: Addrs,
1169        metric_outcomes: MetricOutcomes,
1170    ) -> Self {
1171        let geoip_lookup = config
1172            .geoip_path()
1173            .and_then(
1174                |p| match GeoIpLookup::open(p).context(ServiceError::GeoIp) {
1175                    Ok(geoip) => Some(geoip),
1176                    Err(err) => {
1177                        relay_log::error!("failed to open GeoIP db {p:?}: {err:?}");
1178                        None
1179                    }
1180                },
1181            )
1182            .unwrap_or_else(GeoIpLookup::empty);
1183
1184        #[cfg(feature = "processing")]
1185        let (cardinality, quotas) = match redis {
1186            Some(RedisClients {
1187                cardinality,
1188                quotas,
1189                ..
1190            }) => (Some(cardinality), Some(quotas)),
1191            None => (None, None),
1192        };
1193
1194        #[cfg(feature = "processing")]
1195        let global_rate_limits = addrs.global_rate_limits.clone().map(Into::into);
1196
1197        #[cfg(feature = "processing")]
1198        let rate_limiter = match (quotas.clone(), global_rate_limits) {
1199            (Some(redis), Some(global)) => {
1200                Some(RedisRateLimiter::new(redis, global).max_limit(config.max_rate_limit()))
1201            }
1202            _ => None,
1203        };
1204
1205        let quota_limiter = Arc::new(QuotaRateLimiter::new(
1206            #[cfg(feature = "processing")]
1207            project_cache.clone(),
1208            #[cfg(feature = "processing")]
1209            rate_limiter.clone(),
1210        ));
1211        #[cfg(feature = "processing")]
1212        let rate_limiter = rate_limiter.map(Arc::new);
1213
1214        let inner = InnerProcessor {
1215            pool,
1216            global_config,
1217            project_cache,
1218            cogs,
1219            #[cfg(feature = "processing")]
1220            quotas_client: quotas.clone(),
1221            #[cfg(feature = "processing")]
1222            rate_limiter,
1223            addrs,
1224            #[cfg(feature = "processing")]
1225            cardinality_limiter: cardinality
1226                .map(|cardinality| {
1227                    RedisSetLimiter::new(
1228                        RedisSetLimiterOptions {
1229                            cache_vacuum_interval: config
1230                                .cardinality_limiter_cache_vacuum_interval(),
1231                        },
1232                        cardinality,
1233                    )
1234                })
1235                .map(CardinalityLimiter::new),
1236            metric_outcomes,
1237            processing: Processing {
1238                logs: LogsProcessor::new(Arc::clone(&quota_limiter)),
1239                trace_metrics: TraceMetricsProcessor::new(Arc::clone(&quota_limiter)),
1240                spans: SpansProcessor::new(Arc::clone(&quota_limiter), geoip_lookup.clone()),
1241                check_ins: CheckInsProcessor::new(Arc::clone(&quota_limiter)),
1242                sessions: SessionsProcessor::new(quota_limiter),
1243            },
1244            geoip_lookup,
1245            config,
1246        };
1247
1248        Self {
1249            inner: Arc::new(inner),
1250        }
1251    }
1252
    /// Enforces quotas and rate limits on the envelope and its event.
    ///
    /// Cached rate limits are always enforced first. Inside the `if_processing!` block, if a
    /// Redis rate limiter is configured, quotas are additionally enforced consistently with
    /// Redis, and any resulting rate limits are merged into the cached rate limits of the
    /// envelope's project.
    ///
    /// Returns the event as returned by the last enforcement step which ran.
    ///
    /// # Errors
    ///
    /// Propagates the [`ProcessingError`] of a failing enforcement step.
    async fn enforce_quotas<Group>(
        &self,
        managed_envelope: &mut TypedEnvelope<Group>,
        event: Annotated<Event>,
        extracted_metrics: &mut ProcessingExtractedMetrics,
        ctx: processing::Context<'_>,
    ) -> Result<Annotated<Event>, ProcessingError> {
        // Cached quotas first, they are quick to evaluate and some quotas (indexed) are not
        // applied in the fast path, all cached quotas can be applied here.
        let cached_result = RateLimiter::Cached
            .enforce(managed_envelope, event, extracted_metrics, ctx)
            .await?;

        // NOTE(review): `if_processing!` presumably selects the first block only on processing
        // Relays and the `else` block otherwise; confirm against the macro definition.
        if_processing!(self.inner.config, {
            // Without a Redis rate limiter, the cached result is final.
            let rate_limiter = match self.inner.rate_limiter.clone() {
                Some(rate_limiter) => rate_limiter,
                None => return Ok(cached_result.event),
            };

            // Enforce all quotas consistently with Redis.
            let consistent_result = RateLimiter::Consistent(rate_limiter)
                .enforce(
                    managed_envelope,
                    cached_result.event,
                    extracted_metrics,
                    ctx
                )
                .await?;

            // Update cached rate limits with the freshly computed ones.
            if !consistent_result.rate_limits.is_empty() {
                self.inner
                    .project_cache
                    .get(managed_envelope.scoping().project_key)
                    .rate_limits()
                    .merge(consistent_result.rate_limits);
            }

            Ok(consistent_result.event)
        } else { Ok(cached_result.event) })
    }
1294
1295    /// Processes the general errors, and the items which require or create the events.
1296    async fn process_errors(
1297        &self,
1298        managed_envelope: &mut TypedEnvelope<ErrorGroup>,
1299        project_id: ProjectId,
1300        mut ctx: processing::Context<'_>,
1301    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
1302        let mut event_fully_normalized = EventFullyNormalized::new(managed_envelope.envelope());
1303        let mut metrics = Metrics::default();
1304        let mut extracted_metrics = ProcessingExtractedMetrics::new();
1305
1306        // Events can also contain user reports.
1307        report::process_user_reports(managed_envelope);
1308
1309        if_processing!(self.inner.config, {
1310            unreal::expand(managed_envelope, &self.inner.config)?;
1311            #[cfg(sentry)]
1312            playstation::expand(managed_envelope, &self.inner.config, ctx.project_info)?;
1313            nnswitch::expand(managed_envelope)?;
1314        });
1315
1316        let extraction_result = event::extract(
1317            managed_envelope,
1318            &mut metrics,
1319            event_fully_normalized,
1320            &self.inner.config,
1321        )?;
1322        let mut event = extraction_result.event;
1323
1324        if_processing!(self.inner.config, {
1325            if let Some(inner_event_fully_normalized) =
1326                unreal::process(managed_envelope, &mut event)?
1327            {
1328                event_fully_normalized = inner_event_fully_normalized;
1329            }
1330            #[cfg(sentry)]
1331            if let Some(inner_event_fully_normalized) =
1332                playstation::process(managed_envelope, &mut event, ctx.project_info)?
1333            {
1334                event_fully_normalized = inner_event_fully_normalized;
1335            }
1336            if let Some(inner_event_fully_normalized) =
1337                attachment::create_placeholders(managed_envelope, &mut event, &mut metrics)
1338            {
1339                event_fully_normalized = inner_event_fully_normalized;
1340            }
1341        });
1342
1343        ctx.sampling_project_info = dynamic_sampling::validate_and_set_dsc(
1344            managed_envelope,
1345            &mut event,
1346            ctx.project_info,
1347            ctx.sampling_project_info,
1348        );
1349
1350        let attachments = managed_envelope
1351            .envelope()
1352            .items()
1353            .filter(|item| item.attachment_type() == Some(&AttachmentType::Attachment));
1354        processing::utils::event::finalize(
1355            managed_envelope.envelope().headers(),
1356            &mut event,
1357            attachments,
1358            &mut metrics,
1359            ctx.config,
1360        )?;
1361        event_fully_normalized = processing::utils::event::normalize(
1362            managed_envelope.envelope().headers(),
1363            &mut event,
1364            event_fully_normalized,
1365            project_id,
1366            ctx,
1367            &self.inner.geoip_lookup,
1368        )?;
1369        let filter_run =
1370            processing::utils::event::filter(managed_envelope.envelope().headers(), &event, &ctx)
1371                .map_err(|err| {
1372                managed_envelope.reject(Outcome::Filtered(err.clone()));
1373                ProcessingError::EventFiltered(err)
1374            })?;
1375
1376        if self.inner.config.processing_enabled() || matches!(filter_run, FiltersStatus::Ok) {
1377            dynamic_sampling::tag_error_with_sampling_decision(
1378                managed_envelope,
1379                &mut event,
1380                ctx.sampling_project_info,
1381                &self.inner.config,
1382            )
1383            .await;
1384        }
1385
1386        event = self
1387            .enforce_quotas(managed_envelope, event, &mut extracted_metrics, ctx)
1388            .await?;
1389
1390        if event.value().is_some() {
1391            processing::utils::event::scrub(&mut event, ctx.project_info)?;
1392            event::serialize(
1393                managed_envelope,
1394                &mut event,
1395                event_fully_normalized,
1396                EventMetricsExtracted(false),
1397                SpansExtracted(false),
1398            )?;
1399            event::emit_feedback_metrics(managed_envelope.envelope());
1400        }
1401
1402        let attachments = managed_envelope
1403            .envelope_mut()
1404            .items_mut()
1405            .filter(|i| i.ty() == &ItemType::Attachment);
1406        processing::utils::attachments::scrub(attachments, ctx.project_info);
1407
1408        if self.inner.config.processing_enabled() && !event_fully_normalized.0 {
1409            relay_log::error!(
1410                tags.project = %project_id,
1411                tags.ty = event_type(&event).map(|e| e.to_string()).unwrap_or("none".to_owned()),
1412                "ingested event without normalizing"
1413            );
1414        }
1415
1416        Ok(Some(extracted_metrics))
1417    }
1418
1419    /// Processes only transactions and transaction-related items.
1420    #[allow(unused_assignments)]
1421    async fn process_transactions(
1422        &self,
1423        managed_envelope: &mut TypedEnvelope<TransactionGroup>,
1424        cogs: &mut Token,
1425        project_id: ProjectId,
1426        mut ctx: processing::Context<'_>,
1427    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
1428        let mut event_fully_normalized = EventFullyNormalized::new(managed_envelope.envelope());
1429        let mut event_metrics_extracted = EventMetricsExtracted(false);
1430        let mut spans_extracted = SpansExtracted(false);
1431        let mut metrics = Metrics::default();
1432        let mut extracted_metrics = ProcessingExtractedMetrics::new();
1433
1434        // We extract the main event from the envelope.
1435        let extraction_result = event::extract(
1436            managed_envelope,
1437            &mut metrics,
1438            event_fully_normalized,
1439            &self.inner.config,
1440        )?;
1441
1442        // If metrics were extracted we mark that.
1443        if let Some(inner_event_metrics_extracted) = extraction_result.event_metrics_extracted {
1444            event_metrics_extracted = inner_event_metrics_extracted;
1445        }
1446        if let Some(inner_spans_extracted) = extraction_result.spans_extracted {
1447            spans_extracted = inner_spans_extracted;
1448        };
1449
1450        // We take the main event out of the result.
1451        let mut event = extraction_result.event;
1452
1453        let profile_id = profile::filter(
1454            managed_envelope,
1455            &event,
1456            ctx.config,
1457            project_id,
1458            ctx.project_info,
1459        );
1460        processing::transactions::profile::transfer_id(&mut event, profile_id);
1461        processing::transactions::profile::remove_context_if_rate_limited(
1462            &mut event,
1463            managed_envelope.scoping(),
1464            ctx,
1465        );
1466
1467        ctx.sampling_project_info = dynamic_sampling::validate_and_set_dsc(
1468            managed_envelope,
1469            &mut event,
1470            ctx.project_info,
1471            ctx.sampling_project_info,
1472        );
1473
1474        let attachments = managed_envelope
1475            .envelope()
1476            .items()
1477            .filter(|item| item.attachment_type() == Some(&AttachmentType::Attachment));
1478        processing::utils::event::finalize(
1479            managed_envelope.envelope().headers(),
1480            &mut event,
1481            attachments,
1482            &mut metrics,
1483            &self.inner.config,
1484        )?;
1485
1486        event_fully_normalized = processing::utils::event::normalize(
1487            managed_envelope.envelope().headers(),
1488            &mut event,
1489            event_fully_normalized,
1490            project_id,
1491            ctx,
1492            &self.inner.geoip_lookup,
1493        )?;
1494
1495        let filter_run =
1496            processing::utils::event::filter(managed_envelope.envelope().headers(), &event, &ctx)
1497                .map_err(|err| {
1498                managed_envelope.reject(Outcome::Filtered(err.clone()));
1499                ProcessingError::EventFiltered(err)
1500            })?;
1501
1502        // Always run dynamic sampling on processing Relays,
1503        // but delay decision until inbound filters have been fully processed.
1504        // Also, we require transaction metrics to be enabled before sampling.
1505        let run_dynamic_sampling = (matches!(filter_run, FiltersStatus::Ok)
1506            || self.inner.config.processing_enabled())
1507            && matches!(&ctx.project_info.config.transaction_metrics, Some(ErrorBoundary::Ok(c)) if c.is_enabled());
1508
1509        let sampling_result = match run_dynamic_sampling {
1510            true => {
1511                #[allow(unused_mut)]
1512                let mut reservoir = ReservoirEvaluator::new(Arc::clone(ctx.reservoir_counters));
1513                #[cfg(feature = "processing")]
1514                if let Some(quotas_client) = self.inner.quotas_client.as_ref() {
1515                    reservoir.set_redis(managed_envelope.scoping().organization_id, quotas_client);
1516                }
1517                processing::utils::dynamic_sampling::run(
1518                    managed_envelope.envelope().headers().dsc(),
1519                    &event,
1520                    &ctx,
1521                    Some(&reservoir),
1522                )
1523                .await
1524            }
1525            false => SamplingResult::Pending,
1526        };
1527
1528        relay_statsd::metric!(
1529            counter(RelayCounters::SamplingDecision) += 1,
1530            decision = sampling_result.decision().as_str(),
1531            item = "transaction"
1532        );
1533
1534        #[cfg(feature = "processing")]
1535        let server_sample_rate = sampling_result.sample_rate();
1536
1537        if let Some(outcome) = sampling_result.into_dropped_outcome() {
1538            // Process profiles before dropping the transaction, if necessary.
1539            // Before metric extraction to make sure the profile count is reflected correctly.
1540            profile::process(
1541                managed_envelope,
1542                &mut event,
1543                ctx.global_config,
1544                ctx.config,
1545                ctx.project_info,
1546            );
1547            // Extract metrics here, we're about to drop the event/transaction.
1548            event_metrics_extracted = processing::transactions::extraction::extract_metrics(
1549                &mut event,
1550                &mut extracted_metrics,
1551                ExtractMetricsContext {
1552                    dsc: managed_envelope.envelope().dsc(),
1553                    project_id,
1554                    ctx,
1555                    sampling_decision: SamplingDecision::Drop,
1556                    metrics_extracted: event_metrics_extracted.0,
1557                    spans_extracted: spans_extracted.0,
1558                },
1559            )?;
1560
1561            dynamic_sampling::drop_unsampled_items(
1562                managed_envelope,
1563                event,
1564                outcome,
1565                spans_extracted,
1566            );
1567
1568            // At this point we have:
1569            //  - An empty envelope.
1570            //  - An envelope containing only processed profiles.
1571            // We need to make sure there are enough quotas for these profiles.
1572            event = self
1573                .enforce_quotas(
1574                    managed_envelope,
1575                    Annotated::empty(),
1576                    &mut extracted_metrics,
1577                    ctx,
1578                )
1579                .await?;
1580
1581            return Ok(Some(extracted_metrics));
1582        }
1583
1584        let _post_ds = cogs.start_category("post_ds");
1585
1586        // Need to scrub the transaction before extracting spans.
1587        //
1588        // Unconditionally scrub to make sure PII is removed as early as possible.
1589        processing::utils::event::scrub(&mut event, ctx.project_info)?;
1590
1591        let attachments = managed_envelope
1592            .envelope_mut()
1593            .items_mut()
1594            .filter(|i| i.ty() == &ItemType::Attachment);
1595        processing::utils::attachments::scrub(attachments, ctx.project_info);
1596
1597        if_processing!(self.inner.config, {
1598            // Process profiles before extracting metrics, to make sure they are removed if they are invalid.
1599            let profile_id = profile::process(
1600                managed_envelope,
1601                &mut event,
1602                ctx.global_config,
1603                ctx.config,
1604                ctx.project_info,
1605            );
1606            processing::transactions::profile::transfer_id(&mut event, profile_id);
1607            processing::transactions::profile::scrub_profiler_id(&mut event);
1608
1609            // Always extract metrics in processing Relays for sampled items.
1610            event_metrics_extracted = processing::transactions::extraction::extract_metrics(
1611                &mut event,
1612                &mut extracted_metrics,
1613                ExtractMetricsContext {
1614                    dsc: managed_envelope.envelope().dsc(),
1615                    project_id,
1616                    ctx,
1617                    sampling_decision: SamplingDecision::Keep,
1618                    metrics_extracted: event_metrics_extracted.0,
1619                    spans_extracted: spans_extracted.0,
1620                },
1621            )?;
1622
1623            if let Some(spans) = processing::transactions::spans::extract_from_event(
1624                managed_envelope.envelope().dsc(),
1625                &event,
1626                ctx.global_config,
1627                ctx.config,
1628                server_sample_rate,
1629                event_metrics_extracted,
1630                spans_extracted,
1631            ) {
1632                spans_extracted = SpansExtracted(true);
1633                for item in spans {
1634                    match item {
1635                        Ok(item) => managed_envelope.envelope_mut().add_item(item),
1636                        Err(()) => managed_envelope.track_outcome(
1637                            Outcome::Invalid(DiscardReason::InvalidSpan),
1638                            DataCategory::SpanIndexed,
1639                            1,
1640                        ),
1641                        // TODO: also `DataCategory::Span`?
1642                    }
1643                }
1644            }
1645        });
1646
1647        event = self
1648            .enforce_quotas(managed_envelope, event, &mut extracted_metrics, ctx)
1649            .await?;
1650
1651        // Event may have been dropped because of a quota and the envelope can be empty.
1652        if event.value().is_some() {
1653            event::serialize(
1654                managed_envelope,
1655                &mut event,
1656                event_fully_normalized,
1657                event_metrics_extracted,
1658                spans_extracted,
1659            )?;
1660        }
1661
1662        if self.inner.config.processing_enabled() && !event_fully_normalized.0 {
1663            relay_log::error!(
1664                tags.project = %project_id,
1665                tags.ty = event_type(&event).map(|e| e.to_string()).unwrap_or("none".to_owned()),
1666                "ingested event without normalizing"
1667            );
1668        };
1669
1670        Ok(Some(extracted_metrics))
1671    }
1672
1673    async fn process_profile_chunks(
1674        &self,
1675        managed_envelope: &mut TypedEnvelope<ProfileChunkGroup>,
1676        ctx: processing::Context<'_>,
1677    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
1678        profile_chunk::filter(managed_envelope, ctx.project_info);
1679
1680        if_processing!(self.inner.config, {
1681            profile_chunk::process(
1682                managed_envelope,
1683                ctx.project_info,
1684                ctx.global_config,
1685                ctx.config,
1686            );
1687        });
1688
1689        self.enforce_quotas(
1690            managed_envelope,
1691            Annotated::empty(),
1692            &mut ProcessingExtractedMetrics::new(),
1693            ctx,
1694        )
1695        .await?;
1696
1697        Ok(None)
1698    }
1699
1700    /// Processes standalone items that require an event ID, but do not have an event on the same envelope.
1701    async fn process_standalone(
1702        &self,
1703        managed_envelope: &mut TypedEnvelope<StandaloneGroup>,
1704        project_id: ProjectId,
1705        ctx: processing::Context<'_>,
1706    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
1707        let mut extracted_metrics = ProcessingExtractedMetrics::new();
1708
1709        standalone::process(managed_envelope);
1710
1711        profile::filter(
1712            managed_envelope,
1713            &Annotated::empty(),
1714            ctx.config,
1715            project_id,
1716            ctx.project_info,
1717        );
1718
1719        self.enforce_quotas(
1720            managed_envelope,
1721            Annotated::empty(),
1722            &mut extracted_metrics,
1723            ctx,
1724        )
1725        .await?;
1726
1727        report::process_user_reports(managed_envelope);
1728        let attachments = managed_envelope
1729            .envelope_mut()
1730            .items_mut()
1731            .filter(|i| i.ty() == &ItemType::Attachment);
1732        processing::utils::attachments::scrub(attachments, ctx.project_info);
1733
1734        Ok(Some(extracted_metrics))
1735    }
1736
1737    /// Processes user and client reports.
1738    async fn process_client_reports(
1739        &self,
1740        managed_envelope: &mut TypedEnvelope<ClientReportGroup>,
1741        ctx: processing::Context<'_>,
1742    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
1743        let mut extracted_metrics = ProcessingExtractedMetrics::new();
1744
1745        self.enforce_quotas(
1746            managed_envelope,
1747            Annotated::empty(),
1748            &mut extracted_metrics,
1749            ctx,
1750        )
1751        .await?;
1752
1753        report::process_client_reports(
1754            managed_envelope,
1755            ctx.config,
1756            ctx.project_info,
1757            self.inner.addrs.outcome_aggregator.clone(),
1758        );
1759
1760        Ok(Some(extracted_metrics))
1761    }
1762
1763    /// Processes replays.
1764    async fn process_replays(
1765        &self,
1766        managed_envelope: &mut TypedEnvelope<ReplayGroup>,
1767        ctx: processing::Context<'_>,
1768    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
1769        let mut extracted_metrics = ProcessingExtractedMetrics::new();
1770
1771        replay::process(
1772            managed_envelope,
1773            ctx.global_config,
1774            ctx.config,
1775            ctx.project_info,
1776            &self.inner.geoip_lookup,
1777        )?;
1778
1779        self.enforce_quotas(
1780            managed_envelope,
1781            Annotated::empty(),
1782            &mut extracted_metrics,
1783            ctx,
1784        )
1785        .await?;
1786
1787        Ok(Some(extracted_metrics))
1788    }
1789
1790    async fn process_nel(
1791        &self,
1792        mut managed_envelope: ManagedEnvelope,
1793        ctx: processing::Context<'_>,
1794    ) -> Result<ProcessingResult, ProcessingError> {
1795        nel::convert_to_logs(&mut managed_envelope);
1796        self.process_with_processor(&self.inner.processing.logs, managed_envelope, ctx)
1797            .await
1798    }
1799
1800    async fn process_with_processor<P: processing::Processor>(
1801        &self,
1802        processor: &P,
1803        mut managed_envelope: ManagedEnvelope,
1804        ctx: processing::Context<'_>,
1805    ) -> Result<ProcessingResult, ProcessingError>
1806    where
1807        Outputs: From<P::Output>,
1808    {
1809        let Some(work) = processor.prepare_envelope(&mut managed_envelope) else {
1810            debug_assert!(
1811                false,
1812                "there must be work for the {} processor",
1813                std::any::type_name::<P>(),
1814            );
1815            return Err(ProcessingError::ProcessingGroupMismatch);
1816        };
1817
1818        managed_envelope.update();
1819        match managed_envelope.envelope().is_empty() {
1820            true => managed_envelope.accept(),
1821            false => managed_envelope.reject(Outcome::Invalid(DiscardReason::Internal)),
1822        }
1823
1824        processor
1825            .process(work, ctx)
1826            .await
1827            .map_err(|err| {
1828                relay_log::debug!(
1829                    error = &err as &dyn std::error::Error,
1830                    "processing pipeline failed"
1831                );
1832                ProcessingError::ProcessingFailure
1833            })
1834            .map(|o| o.map(Into::into))
1835            .map(ProcessingResult::Output)
1836    }
1837
1838    /// Processes standalone spans.
1839    ///
1840    /// This function does *not* run for spans extracted from transactions.
1841    async fn process_standalone_spans(
1842        &self,
1843        managed_envelope: &mut TypedEnvelope<SpanGroup>,
1844        _project_id: ProjectId,
1845        ctx: processing::Context<'_>,
1846    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
1847        let mut extracted_metrics = ProcessingExtractedMetrics::new();
1848
1849        span::filter(managed_envelope, ctx.config, ctx.project_info);
1850        span::convert_otel_traces_data(managed_envelope);
1851
1852        if_processing!(self.inner.config, {
1853            span::process(
1854                managed_envelope,
1855                &mut Annotated::empty(),
1856                &mut extracted_metrics,
1857                _project_id,
1858                ctx,
1859                &self.inner.geoip_lookup,
1860            )
1861            .await;
1862        });
1863
1864        self.enforce_quotas(
1865            managed_envelope,
1866            Annotated::empty(),
1867            &mut extracted_metrics,
1868            ctx,
1869        )
1870        .await?;
1871
1872        Ok(Some(extracted_metrics))
1873    }
1874
1875    async fn process_envelope(
1876        &self,
1877        cogs: &mut Token,
1878        project_id: ProjectId,
1879        message: ProcessEnvelopeGrouped<'_>,
1880    ) -> Result<ProcessingResult, ProcessingError> {
1881        let ProcessEnvelopeGrouped {
1882            group,
1883            envelope: mut managed_envelope,
1884            ctx,
1885        } = message;
1886
1887        // Pre-process the envelope headers.
1888        if let Some(sampling_state) = ctx.sampling_project_info {
1889            // Both transactions and standalone span envelopes need a normalized DSC header
1890            // to make sampling rules based on the segment/transaction name work correctly.
1891            managed_envelope
1892                .envelope_mut()
1893                .parametrize_dsc_transaction(&sampling_state.config.tx_name_rules);
1894        }
1895
1896        // Set the event retention. Effectively, this value will only be available in processing
1897        // mode when the full project config is queried from the upstream.
1898        if let Some(retention) = ctx.project_info.config.event_retention {
1899            managed_envelope.envelope_mut().set_retention(retention);
1900        }
1901
1902        // Set the event retention. Effectively, this value will only be available in processing
1903        // mode when the full project config is queried from the upstream.
1904        if let Some(retention) = ctx.project_info.config.downsampled_event_retention {
1905            managed_envelope
1906                .envelope_mut()
1907                .set_downsampled_retention(retention);
1908        }
1909
1910        // Ensure the project ID is updated to the stored instance for this project cache. This can
1911        // differ in two cases:
1912        //  1. The envelope was sent to the legacy `/store/` endpoint without a project ID.
1913        //  2. The DSN was moved and the envelope sent to the old project ID.
1914        managed_envelope
1915            .envelope_mut()
1916            .meta_mut()
1917            .set_project_id(project_id);
1918
1919        macro_rules! run {
1920            ($fn_name:ident $(, $args:expr)*) => {
1921                async {
1922                    let mut managed_envelope = (managed_envelope, group).try_into()?;
1923                    match self.$fn_name(&mut managed_envelope, $($args),*).await {
1924                        Ok(extracted_metrics) => Ok(ProcessingResult::Envelope {
1925                            managed_envelope: managed_envelope.into_processed(),
1926                            extracted_metrics: extracted_metrics.map_or(ProcessingExtractedMetrics::new(), |e| e)
1927                        }),
1928                        Err(error) => {
1929                            relay_log::trace!("Executing {fn} failed: {error}", fn = stringify!($fn_name), error = error);
1930                            if let Some(outcome) = error.to_outcome() {
1931                                managed_envelope.reject(outcome);
1932                            }
1933
1934                            return Err(error);
1935                        }
1936                    }
1937                }.await
1938            };
1939        }
1940
1941        relay_log::trace!("Processing {group} group", group = group.variant());
1942
1943        match group {
1944            ProcessingGroup::Error => run!(process_errors, project_id, ctx),
1945            ProcessingGroup::Transaction => {
1946                run!(process_transactions, cogs, project_id, ctx)
1947            }
1948            ProcessingGroup::Session => {
1949                self.process_with_processor(&self.inner.processing.sessions, managed_envelope, ctx)
1950                    .await
1951            }
1952            ProcessingGroup::Standalone => run!(process_standalone, project_id, ctx),
1953            ProcessingGroup::ClientReport => run!(process_client_reports, ctx),
1954            ProcessingGroup::Replay => {
1955                run!(process_replays, ctx)
1956            }
1957            ProcessingGroup::CheckIn => {
1958                self.process_with_processor(&self.inner.processing.check_ins, managed_envelope, ctx)
1959                    .await
1960            }
1961            ProcessingGroup::Nel => self.process_nel(managed_envelope, ctx).await,
1962            ProcessingGroup::Log => {
1963                self.process_with_processor(&self.inner.processing.logs, managed_envelope, ctx)
1964                    .await
1965            }
1966            ProcessingGroup::TraceMetric => {
1967                self.process_with_processor(
1968                    &self.inner.processing.trace_metrics,
1969                    managed_envelope,
1970                    ctx,
1971                )
1972                .await
1973            }
1974            ProcessingGroup::SpanV2 => {
1975                self.process_with_processor(&self.inner.processing.spans, managed_envelope, ctx)
1976                    .await
1977            }
1978            ProcessingGroup::Span => run!(process_standalone_spans, project_id, ctx),
1979            ProcessingGroup::ProfileChunk => {
1980                run!(process_profile_chunks, ctx)
1981            }
1982            // Currently is not used.
1983            ProcessingGroup::Metrics => {
1984                // In proxy mode we simply forward the metrics.
1985                // This group shouldn't be used outside of proxy mode.
1986                if self.inner.config.relay_mode() != RelayMode::Proxy {
1987                    relay_log::error!(
1988                        tags.project = %project_id,
1989                        items = ?managed_envelope.envelope().items().next().map(Item::ty),
1990                        "received metrics in the process_state"
1991                    );
1992                }
1993
1994                Ok(ProcessingResult::no_metrics(
1995                    managed_envelope.into_processed(),
1996                ))
1997            }
1998            // Fallback to the legacy process_state implementation for Ungrouped events.
1999            ProcessingGroup::Ungrouped => {
2000                relay_log::error!(
2001                    tags.project = %project_id,
2002                    items = ?managed_envelope.envelope().items().next().map(Item::ty),
2003                    "could not identify the processing group based on the envelope's items"
2004                );
2005
2006                Ok(ProcessingResult::no_metrics(
2007                    managed_envelope.into_processed(),
2008                ))
2009            }
2010            // Leave this group unchanged.
2011            //
2012            // This will later be forwarded to upstream.
2013            ProcessingGroup::ForwardUnknown => Ok(ProcessingResult::no_metrics(
2014                managed_envelope.into_processed(),
2015            )),
2016        }
2017    }
2018
2019    /// Processes the envelope and returns the processed envelope back.
2020    ///
2021    /// Returns `Some` if the envelope passed inbound filtering and rate limiting. Invalid items are
2022    /// removed from the envelope. Otherwise, if the envelope is empty or the entire envelope needs
2023    /// to be dropped, this is `None`.
2024    async fn process<'a>(
2025        &self,
2026        cogs: &mut Token,
2027        mut message: ProcessEnvelopeGrouped<'a>,
2028    ) -> Result<Option<Submit<'a>>, ProcessingError> {
2029        let ProcessEnvelopeGrouped {
2030            ref mut envelope,
2031            ctx,
2032            ..
2033        } = message;
2034
2035        // Prefer the project's project ID, and fall back to the stated project id from the
2036        // envelope. The project ID is available in all modes, other than in proxy mode, where
2037        // envelopes for unknown projects are forwarded blindly.
2038        //
2039        // Neither ID can be available in proxy mode on the /store/ endpoint. This is not supported,
2040        // since we cannot process an envelope without project ID, so drop it.
2041        let Some(project_id) = ctx
2042            .project_info
2043            .project_id
2044            .or_else(|| envelope.envelope().meta().project_id())
2045        else {
2046            envelope.reject(Outcome::Invalid(DiscardReason::Internal));
2047            return Err(ProcessingError::MissingProjectId);
2048        };
2049
2050        let client = envelope.envelope().meta().client().map(str::to_owned);
2051        let user_agent = envelope.envelope().meta().user_agent().map(str::to_owned);
2052        let project_key = envelope.envelope().meta().public_key();
2053        // Only allow sending to the sampling key, if we successfully loaded a sampling project
2054        // info relating to it. This filters out unknown/invalid project keys as well as project
2055        // keys from different organizations.
2056        let sampling_key = envelope
2057            .envelope()
2058            .sampling_key()
2059            .filter(|_| ctx.sampling_project_info.is_some());
2060
2061        // We set additional information on the scope, which will be removed after processing the
2062        // envelope.
2063        relay_log::configure_scope(|scope| {
2064            scope.set_tag("project", project_id);
2065            if let Some(client) = client {
2066                scope.set_tag("sdk", client);
2067            }
2068            if let Some(user_agent) = user_agent {
2069                scope.set_extra("user_agent", user_agent.into());
2070            }
2071        });
2072
2073        let result = match self.process_envelope(cogs, project_id, message).await {
2074            Ok(ProcessingResult::Envelope {
2075                mut managed_envelope,
2076                extracted_metrics,
2077            }) => {
2078                // The envelope could be modified or even emptied during processing, which
2079                // requires re-computation of the context.
2080                managed_envelope.update();
2081
2082                let has_metrics = !extracted_metrics.metrics.project_metrics.is_empty();
2083                send_metrics(
2084                    extracted_metrics.metrics,
2085                    project_key,
2086                    sampling_key,
2087                    &self.inner.addrs.aggregator,
2088                );
2089
2090                let envelope_response = if managed_envelope.envelope().is_empty() {
2091                    if !has_metrics {
2092                        // Individual rate limits have already been issued
2093                        managed_envelope.reject(Outcome::RateLimited(None));
2094                    } else {
2095                        managed_envelope.accept();
2096                    }
2097
2098                    None
2099                } else {
2100                    Some(managed_envelope)
2101                };
2102
2103                Ok(envelope_response.map(Submit::Envelope))
2104            }
2105            Ok(ProcessingResult::Output(Output { main, metrics })) => {
2106                if let Some(metrics) = metrics {
2107                    metrics.accept(|metrics| {
2108                        send_metrics(
2109                            metrics,
2110                            project_key,
2111                            sampling_key,
2112                            &self.inner.addrs.aggregator,
2113                        );
2114                    });
2115                }
2116
2117                let ctx = ctx.to_forward();
2118                Ok(main.map(|output| Submit::Output { output, ctx }))
2119            }
2120            Err(err) => Err(err),
2121        };
2122
2123        relay_log::configure_scope(|scope| {
2124            scope.remove_tag("project");
2125            scope.remove_tag("sdk");
2126            scope.remove_tag("user_agent");
2127        });
2128
2129        result
2130    }
2131
2132    async fn handle_process_envelope(&self, cogs: &mut Token, message: ProcessEnvelope) {
2133        let project_key = message.envelope.envelope().meta().public_key();
2134        let wait_time = message.envelope.age();
2135        metric!(timer(RelayTimers::EnvelopeWaitTime) = wait_time);
2136
2137        // This COGS handling may need an overhaul in the future:
2138        // Cancel the passed in token, to start individual measurements per envelope instead.
2139        cogs.cancel();
2140
2141        let scoping = message.envelope.scoping();
2142        for (group, envelope) in ProcessingGroup::split_envelope(
2143            *message.envelope.into_envelope(),
2144            &message.project_info,
2145        ) {
2146            let mut cogs = self
2147                .inner
2148                .cogs
2149                .timed(ResourceId::Relay, AppFeature::from(group));
2150
2151            let mut envelope =
2152                ManagedEnvelope::new(envelope, self.inner.addrs.outcome_aggregator.clone());
2153            envelope.scope(scoping);
2154
2155            let global_config = self.inner.global_config.current();
2156
2157            let ctx = processing::Context {
2158                config: &self.inner.config,
2159                global_config: &global_config,
2160                project_info: &message.project_info,
2161                sampling_project_info: message.sampling_project_info.as_deref(),
2162                rate_limits: &message.rate_limits,
2163                reservoir_counters: &message.reservoir_counters,
2164            };
2165
2166            let message = ProcessEnvelopeGrouped {
2167                group,
2168                envelope,
2169                ctx,
2170            };
2171
2172            let result = metric!(
2173                timer(RelayTimers::EnvelopeProcessingTime),
2174                group = group.variant(),
2175                { self.process(&mut cogs, message).await }
2176            );
2177
2178            match result {
2179                Ok(Some(envelope)) => self.submit_upstream(&mut cogs, envelope),
2180                Ok(None) => {}
2181                Err(error) if error.is_unexpected() => {
2182                    relay_log::error!(
2183                        tags.project_key = %project_key,
2184                        error = &error as &dyn Error,
2185                        "error processing envelope"
2186                    )
2187                }
2188                Err(error) => {
2189                    relay_log::debug!(
2190                        tags.project_key = %project_key,
2191                        error = &error as &dyn Error,
2192                        "error processing envelope"
2193                    )
2194                }
2195            }
2196        }
2197    }
2198
2199    fn handle_process_metrics(&self, cogs: &mut Token, message: ProcessMetrics) {
2200        let ProcessMetrics {
2201            data,
2202            project_key,
2203            received_at,
2204            sent_at,
2205            source,
2206        } = message;
2207
2208        let received_timestamp =
2209            UnixTimestamp::from_datetime(received_at).unwrap_or(UnixTimestamp::now());
2210
2211        let mut buckets = data.into_buckets(received_timestamp);
2212        if buckets.is_empty() {
2213            return;
2214        };
2215        cogs.update(relay_metrics::cogs::BySize(&buckets));
2216
2217        let clock_drift_processor =
2218            ClockDriftProcessor::new(sent_at, received_at).at_least(MINIMUM_CLOCK_DRIFT);
2219
2220        buckets.retain_mut(|bucket| {
2221            if let Err(error) = relay_metrics::normalize_bucket(bucket) {
2222                relay_log::debug!(error = &error as &dyn Error, "dropping bucket {bucket:?}");
2223                return false;
2224            }
2225
2226            if !self::metrics::is_valid_namespace(bucket) {
2227                return false;
2228            }
2229
2230            clock_drift_processor.process_timestamp(&mut bucket.timestamp);
2231
2232            if !matches!(source, BucketSource::Internal) {
2233                bucket.metadata = BucketMetadata::new(received_timestamp);
2234            }
2235
2236            true
2237        });
2238
2239        let project = self.inner.project_cache.get(project_key);
2240
2241        // Best effort check to filter and rate limit buckets, if there is no project state
2242        // available at the current time, we will check again after flushing.
2243        let buckets = match project.state() {
2244            ProjectState::Enabled(project_info) => {
2245                let rate_limits = project.rate_limits().current_limits();
2246                self.check_buckets(project_key, project_info, &rate_limits, buckets)
2247            }
2248            _ => buckets,
2249        };
2250
2251        relay_log::trace!("merging metric buckets into the aggregator");
2252        self.inner
2253            .addrs
2254            .aggregator
2255            .send(MergeBuckets::new(project_key, buckets));
2256    }
2257
2258    fn handle_process_batched_metrics(&self, cogs: &mut Token, message: ProcessBatchedMetrics) {
2259        let ProcessBatchedMetrics {
2260            payload,
2261            source,
2262            received_at,
2263            sent_at,
2264        } = message;
2265
2266        #[derive(serde::Deserialize)]
2267        struct Wrapper {
2268            buckets: HashMap<ProjectKey, Vec<Bucket>>,
2269        }
2270
2271        let buckets = match serde_json::from_slice(&payload) {
2272            Ok(Wrapper { buckets }) => buckets,
2273            Err(error) => {
2274                relay_log::debug!(
2275                    error = &error as &dyn Error,
2276                    "failed to parse batched metrics",
2277                );
2278                metric!(counter(RelayCounters::MetricBucketsParsingFailed) += 1);
2279                return;
2280            }
2281        };
2282
2283        for (project_key, buckets) in buckets {
2284            self.handle_process_metrics(
2285                cogs,
2286                ProcessMetrics {
2287                    data: MetricData::Parsed(buckets),
2288                    project_key,
2289                    source,
2290                    received_at,
2291                    sent_at,
2292                },
2293            )
2294        }
2295    }
2296
2297    fn submit_upstream(&self, cogs: &mut Token, submit: Submit<'_>) {
2298        let _submit = cogs.start_category("submit");
2299
2300        #[cfg(feature = "processing")]
2301        if self.inner.config.processing_enabled()
2302            && let Some(store_forwarder) = &self.inner.addrs.store_forwarder
2303        {
2304            match submit {
2305                Submit::Envelope(envelope) => {
2306                    let envelope_has_attachments = envelope
2307                        .envelope()
2308                        .items()
2309                        .any(|item| *item.ty() == ItemType::Attachment);
2310                    // Whether Relay will store this attachment in objectstore or use kafka like before.
2311                    let use_objectstore = || {
2312                        let options = &self.inner.global_config.current().options;
2313                        utils::sample(options.objectstore_attachments_sample_rate).is_keep()
2314                    };
2315
2316                    if let Some(upload) = &self.inner.addrs.upload
2317                        && envelope_has_attachments
2318                        && use_objectstore()
2319                    {
2320                        // the `UploadService` will upload all attachments, and then forward the envelope to the `StoreService`.
2321                        upload.send(UploadAttachments { envelope })
2322                    } else {
2323                        store_forwarder.send(StoreEnvelope { envelope })
2324                    }
2325                }
2326                Submit::Output { output, ctx } => output
2327                    .forward_store(store_forwarder, ctx)
2328                    .unwrap_or_else(|err| err.into_inner()),
2329            }
2330            return;
2331        }
2332
2333        let mut envelope = match submit {
2334            Submit::Envelope(envelope) => envelope,
2335            Submit::Output { output, ctx } => match output.serialize_envelope(ctx) {
2336                Ok(envelope) => ManagedEnvelope::from(envelope).into_processed(),
2337                Err(_) => {
2338                    relay_log::error!("failed to serialize output to an envelope");
2339                    return;
2340                }
2341            },
2342        };
2343
2344        if envelope.envelope_mut().is_empty() {
2345            envelope.accept();
2346            return;
2347        }
2348
2349        // Override the `sent_at` timestamp. Since the envelope went through basic
2350        // normalization, all timestamps have been corrected. We propagate the new
2351        // `sent_at` to allow the next Relay to double-check this timestamp and
2352        // potentially apply correction again. This is done as close to sending as
2353        // possible so that we avoid internal delays.
2354        envelope.envelope_mut().set_sent_at(Utc::now());
2355
2356        relay_log::trace!("sending envelope to sentry endpoint");
2357        let http_encoding = self.inner.config.http_encoding();
2358        let result = envelope.envelope().to_vec().and_then(|v| {
2359            encode_payload(&v.into(), http_encoding).map_err(EnvelopeError::PayloadIoFailed)
2360        });
2361
2362        match result {
2363            Ok(body) => {
2364                self.inner
2365                    .addrs
2366                    .upstream_relay
2367                    .send(SendRequest(SendEnvelope {
2368                        envelope,
2369                        body,
2370                        http_encoding,
2371                        project_cache: self.inner.project_cache.clone(),
2372                    }));
2373            }
2374            Err(error) => {
2375                // Errors are only logged for what we consider an internal discard reason. These
2376                // indicate errors in the infrastructure or implementation bugs.
2377                relay_log::error!(
2378                    error = &error as &dyn Error,
2379                    tags.project_key = %envelope.scoping().project_key,
2380                    "failed to serialize envelope payload"
2381                );
2382
2383                envelope.reject(Outcome::Invalid(DiscardReason::Internal));
2384            }
2385        }
2386    }
2387
2388    fn handle_submit_client_reports(&self, cogs: &mut Token, message: SubmitClientReports) {
2389        let SubmitClientReports {
2390            client_reports,
2391            scoping,
2392        } = message;
2393
2394        let upstream = self.inner.config.upstream_descriptor();
2395        let dsn = PartialDsn::outbound(&scoping, upstream);
2396
2397        let mut envelope = Envelope::from_request(None, RequestMeta::outbound(dsn));
2398        for client_report in client_reports {
2399            match client_report.serialize() {
2400                Ok(payload) => {
2401                    let mut item = Item::new(ItemType::ClientReport);
2402                    item.set_payload(ContentType::Json, payload);
2403                    envelope.add_item(item);
2404                }
2405                Err(error) => {
2406                    relay_log::error!(
2407                        error = &error as &dyn std::error::Error,
2408                        "failed to serialize client report"
2409                    );
2410                }
2411            }
2412        }
2413
2414        let envelope = ManagedEnvelope::new(envelope, self.inner.addrs.outcome_aggregator.clone());
2415        self.submit_upstream(cogs, Submit::Envelope(envelope.into_processed()));
2416    }
2417
2418    fn check_buckets(
2419        &self,
2420        project_key: ProjectKey,
2421        project_info: &ProjectInfo,
2422        rate_limits: &RateLimits,
2423        buckets: Vec<Bucket>,
2424    ) -> Vec<Bucket> {
2425        let Some(scoping) = project_info.scoping(project_key) else {
2426            relay_log::error!(
2427                tags.project_key = project_key.as_str(),
2428                "there is no scoping: dropping {} buckets",
2429                buckets.len(),
2430            );
2431            return Vec::new();
2432        };
2433
2434        let mut buckets = self::metrics::apply_project_info(
2435            buckets,
2436            &self.inner.metric_outcomes,
2437            project_info,
2438            scoping,
2439        );
2440
2441        let namespaces: BTreeSet<MetricNamespace> = buckets
2442            .iter()
2443            .filter_map(|bucket| bucket.name.try_namespace())
2444            .collect();
2445
2446        for namespace in namespaces {
2447            let limits = rate_limits.check_with_quotas(
2448                project_info.get_quotas(),
2449                scoping.item(DataCategory::MetricBucket),
2450            );
2451
2452            if limits.is_limited() {
2453                let rejected;
2454                (buckets, rejected) = utils::split_off(buckets, |bucket| {
2455                    bucket.name.try_namespace() == Some(namespace)
2456                });
2457
2458                let reason_code = limits.longest().and_then(|limit| limit.reason_code.clone());
2459                self.inner.metric_outcomes.track(
2460                    scoping,
2461                    &rejected,
2462                    Outcome::RateLimited(reason_code),
2463                );
2464            }
2465        }
2466
2467        let quotas = project_info.config.quotas.clone();
2468        match MetricsLimiter::create(buckets, quotas, scoping) {
2469            Ok(mut bucket_limiter) => {
2470                bucket_limiter.enforce_limits(rate_limits, &self.inner.metric_outcomes);
2471                bucket_limiter.into_buckets()
2472            }
2473            Err(buckets) => buckets,
2474        }
2475    }
2476
2477    #[cfg(feature = "processing")]
2478    async fn rate_limit_buckets(
2479        &self,
2480        scoping: Scoping,
2481        project_info: &ProjectInfo,
2482        mut buckets: Vec<Bucket>,
2483    ) -> Vec<Bucket> {
2484        let Some(rate_limiter) = &self.inner.rate_limiter else {
2485            return buckets;
2486        };
2487
2488        let global_config = self.inner.global_config.current();
2489        let namespaces = buckets
2490            .iter()
2491            .filter_map(|bucket| bucket.name.try_namespace())
2492            .counts();
2493
2494        let quotas = CombinedQuotas::new(&global_config, project_info.get_quotas());
2495
2496        for (namespace, quantity) in namespaces {
2497            let item_scoping = scoping.metric_bucket(namespace);
2498
2499            let limits = match rate_limiter
2500                .is_rate_limited(quotas, item_scoping, quantity, false)
2501                .await
2502            {
2503                Ok(limits) => limits,
2504                Err(err) => {
2505                    relay_log::error!(
2506                        error = &err as &dyn std::error::Error,
2507                        "failed to check redis rate limits"
2508                    );
2509                    break;
2510                }
2511            };
2512
2513            if limits.is_limited() {
2514                let rejected;
2515                (buckets, rejected) = utils::split_off(buckets, |bucket| {
2516                    bucket.name.try_namespace() == Some(namespace)
2517                });
2518
2519                let reason_code = limits.longest().and_then(|limit| limit.reason_code.clone());
2520                self.inner.metric_outcomes.track(
2521                    scoping,
2522                    &rejected,
2523                    Outcome::RateLimited(reason_code),
2524                );
2525
2526                self.inner
2527                    .project_cache
2528                    .get(item_scoping.scoping.project_key)
2529                    .rate_limits()
2530                    .merge(limits);
2531            }
2532        }
2533
2534        match MetricsLimiter::create(buckets, project_info.config.quotas.clone(), scoping) {
2535            Err(buckets) => buckets,
2536            Ok(bucket_limiter) => self.apply_other_rate_limits(bucket_limiter).await,
2537        }
2538    }
2539
    /// Check and apply rate limits to metrics buckets for transactions and spans.
    ///
    /// For each of the two categories, the count reported by the `bucket_limiter` is checked
    /// against the rate limiter. All returned limits are collected and, if any is active, enforced
    /// on the buckets. When enforcement took place, the limits are also merged into the project's
    /// cached rate limits.
    ///
    /// Returns the buckets left in the `bucket_limiter`. Without a configured rate limiter, the
    /// buckets are returned without any check.
    #[cfg(feature = "processing")]
    async fn apply_other_rate_limits(&self, mut bucket_limiter: MetricsLimiter) -> Vec<Bucket> {
        relay_log::trace!("handle_rate_limit_buckets");

        let scoping = *bucket_limiter.scoping();

        if let Some(rate_limiter) = self.inner.rate_limiter.as_ref() {
            let global_config = self.inner.global_config.current();
            let quotas = CombinedQuotas::new(&global_config, bucket_limiter.quotas());

            // We set over_accept_once such that the limit is actually reached, which allows subsequent
            // calls with quantity=0 to be rate limited.
            let over_accept_once = true;
            let mut rate_limits = RateLimits::new();

            for category in [DataCategory::Transaction, DataCategory::Span] {
                let count = bucket_limiter.count(category);

                // The timer is started for every category, even if there is no count to check.
                let timer = Instant::now();
                let mut is_limited = false;

                if let Some(count) = count {
                    match rate_limiter
                        .is_rate_limited(quotas, scoping.item(category), count, over_accept_once)
                        .await
                    {
                        Ok(limits) => {
                            is_limited = limits.is_limited();
                            rate_limits.merge(limits)
                        }
                        // A failed check is only logged; the category is treated as not limited.
                        Err(e) => relay_log::error!(error = &e as &dyn Error),
                    }
                }

                // The `count` tag is bucketed. The range patterns overlap, but match arms are
                // tried in order, so each count gets the label of the first range containing it.
                relay_statsd::metric!(
                    timer(RelayTimers::RateLimitBucketsDuration) = timer.elapsed(),
                    category = category.name(),
                    limited = if is_limited { "true" } else { "false" },
                    count = match count {
                        None => "none",
                        Some(0) => "0",
                        Some(1) => "1",
                        Some(1..=10) => "10",
                        Some(1..=25) => "25",
                        Some(1..=50) => "50",
                        Some(51..=100) => "100",
                        Some(101..=500) => "500",
                        _ => "> 500",
                    },
                );
            }

            if rate_limits.is_limited() {
                let was_enforced =
                    bucket_limiter.enforce_limits(&rate_limits, &self.inner.metric_outcomes);

                if was_enforced {
                    // Update the rate limits in the project cache.
                    self.inner
                        .project_cache
                        .get(scoping.project_key)
                        .rate_limits()
                        .merge(rate_limits);
                }
            }
        }

        bucket_limiter.into_buckets()
    }
2610
2611    /// Cardinality limits the passed buckets and returns a filtered vector of only accepted buckets.
2612    #[cfg(feature = "processing")]
2613    async fn cardinality_limit_buckets(
2614        &self,
2615        scoping: Scoping,
2616        limits: &[CardinalityLimit],
2617        buckets: Vec<Bucket>,
2618    ) -> Vec<Bucket> {
2619        let global_config = self.inner.global_config.current();
2620        let cardinality_limiter_mode = global_config.options.cardinality_limiter_mode;
2621
2622        if matches!(cardinality_limiter_mode, CardinalityLimiterMode::Disabled) {
2623            return buckets;
2624        }
2625
2626        let Some(ref limiter) = self.inner.cardinality_limiter else {
2627            return buckets;
2628        };
2629
2630        let scope = relay_cardinality::Scoping {
2631            organization_id: scoping.organization_id,
2632            project_id: scoping.project_id,
2633        };
2634
2635        let limits = match limiter
2636            .check_cardinality_limits(scope, limits, buckets)
2637            .await
2638        {
2639            Ok(limits) => limits,
2640            Err((buckets, error)) => {
2641                relay_log::error!(
2642                    error = &error as &dyn std::error::Error,
2643                    "cardinality limiter failed"
2644                );
2645                return buckets;
2646            }
2647        };
2648
2649        let error_sample_rate = global_config.options.cardinality_limiter_error_sample_rate;
2650        if !limits.exceeded_limits().is_empty() && utils::sample(error_sample_rate).is_keep() {
2651            for limit in limits.exceeded_limits() {
2652                relay_log::with_scope(
2653                    |scope| {
2654                        // Set the organization as user so we can alert on distinct org_ids.
2655                        scope.set_user(Some(relay_log::sentry::User {
2656                            id: Some(scoping.organization_id.to_string()),
2657                            ..Default::default()
2658                        }));
2659                    },
2660                    || {
2661                        relay_log::error!(
2662                            tags.organization_id = scoping.organization_id.value(),
2663                            tags.limit_id = limit.id,
2664                            tags.passive = limit.passive,
2665                            "Cardinality Limit"
2666                        );
2667                    },
2668                );
2669            }
2670        }
2671
2672        for (limit, reports) in limits.cardinality_reports() {
2673            for report in reports {
2674                self.inner
2675                    .metric_outcomes
2676                    .cardinality(scoping, limit, report);
2677            }
2678        }
2679
2680        if matches!(cardinality_limiter_mode, CardinalityLimiterMode::Passive) {
2681            return limits.into_source();
2682        }
2683
2684        let CardinalityLimitsSplit { accepted, rejected } = limits.into_split();
2685
2686        for (bucket, exceeded) in rejected {
2687            self.inner.metric_outcomes.track(
2688                scoping,
2689                &[bucket],
2690                Outcome::CardinalityLimited(exceeded.id.clone()),
2691            );
2692        }
2693        accepted
2694    }
2695
2696    /// Processes metric buckets and sends them to kafka.
2697    ///
2698    /// This function runs the following steps:
2699    ///  - cardinality limiting
2700    ///  - rate limiting
2701    ///  - submit to `StoreForwarder`
2702    #[cfg(feature = "processing")]
2703    async fn encode_metrics_processing(
2704        &self,
2705        message: FlushBuckets,
2706        store_forwarder: &Addr<Store>,
2707    ) {
2708        use crate::constants::DEFAULT_EVENT_RETENTION;
2709        use crate::services::store::StoreMetrics;
2710
2711        for ProjectBuckets {
2712            buckets,
2713            scoping,
2714            project_info,
2715            ..
2716        } in message.buckets.into_values()
2717        {
2718            let buckets = self
2719                .rate_limit_buckets(scoping, &project_info, buckets)
2720                .await;
2721
2722            let limits = project_info.get_cardinality_limits();
2723            let buckets = self
2724                .cardinality_limit_buckets(scoping, limits, buckets)
2725                .await;
2726
2727            if buckets.is_empty() {
2728                continue;
2729            }
2730
2731            let retention = project_info
2732                .config
2733                .event_retention
2734                .unwrap_or(DEFAULT_EVENT_RETENTION);
2735
2736            // The store forwarder takes care of bucket splitting internally, so we can submit the
2737            // entire list of buckets. There is no batching needed here.
2738            store_forwarder.send(StoreMetrics {
2739                buckets,
2740                scoping,
2741                retention,
2742            });
2743        }
2744    }
2745
2746    /// Serializes metric buckets to JSON and sends them to the upstream.
2747    ///
2748    /// This function runs the following steps:
2749    ///  - partitioning
2750    ///  - batching by configured size limit
2751    ///  - serialize to JSON and pack in an envelope
2752    ///  - submit the envelope to upstream or kafka depending on configuration
2753    ///
2754    /// Cardinality limiting and rate limiting run only in processing Relays as they both require
2755    /// access to the central Redis instance. Cached rate limits are applied in the project cache
2756    /// already.
2757    fn encode_metrics_envelope(&self, cogs: &mut Token, message: FlushBuckets) {
2758        let FlushBuckets {
2759            partition_key,
2760            buckets,
2761        } = message;
2762
2763        let batch_size = self.inner.config.metrics_max_batch_size_bytes();
2764        let upstream = self.inner.config.upstream_descriptor();
2765
2766        for ProjectBuckets {
2767            buckets, scoping, ..
2768        } in buckets.values()
2769        {
2770            let dsn = PartialDsn::outbound(scoping, upstream);
2771
2772            relay_statsd::metric!(
2773                distribution(RelayDistributions::PartitionKeys) = u64::from(partition_key)
2774            );
2775
2776            let mut num_batches = 0;
2777            for batch in BucketsView::from(buckets).by_size(batch_size) {
2778                let mut envelope = Envelope::from_request(None, RequestMeta::outbound(dsn.clone()));
2779
2780                let mut item = Item::new(ItemType::MetricBuckets);
2781                item.set_source_quantities(crate::metrics::extract_quantities(batch));
2782                item.set_payload(ContentType::Json, serde_json::to_vec(&buckets).unwrap());
2783                envelope.add_item(item);
2784
2785                let mut envelope =
2786                    ManagedEnvelope::new(envelope, self.inner.addrs.outcome_aggregator.clone());
2787                envelope
2788                    .set_partition_key(Some(partition_key))
2789                    .scope(*scoping);
2790
2791                relay_statsd::metric!(
2792                    distribution(RelayDistributions::BucketsPerBatch) = batch.len() as u64
2793                );
2794
2795                self.submit_upstream(cogs, Submit::Envelope(envelope.into_processed()));
2796                num_batches += 1;
2797            }
2798
2799            relay_statsd::metric!(
2800                distribution(RelayDistributions::BatchesPerPartition) = num_batches
2801            );
2802        }
2803    }
2804
2805    /// Creates a [`SendMetricsRequest`] and sends it to the upstream relay.
2806    fn send_global_partition(&self, partition_key: u32, partition: &mut Partition<'_>) {
2807        if partition.is_empty() {
2808            return;
2809        }
2810
2811        let (unencoded, project_info) = partition.take();
2812        let http_encoding = self.inner.config.http_encoding();
2813        let encoded = match encode_payload(&unencoded, http_encoding) {
2814            Ok(payload) => payload,
2815            Err(error) => {
2816                let error = &error as &dyn std::error::Error;
2817                relay_log::error!(error, "failed to encode metrics payload");
2818                return;
2819            }
2820        };
2821
2822        let request = SendMetricsRequest {
2823            partition_key: partition_key.to_string(),
2824            unencoded,
2825            encoded,
2826            project_info,
2827            http_encoding,
2828            metric_outcomes: self.inner.metric_outcomes.clone(),
2829        };
2830
2831        self.inner.addrs.upstream_relay.send(SendRequest(request));
2832    }
2833
2834    /// Serializes metric buckets to JSON and sends them to the upstream via the global endpoint.
2835    ///
2836    /// This function is similar to [`Self::encode_metrics_envelope`], but sends a global batched
2837    /// payload directly instead of per-project Envelopes.
2838    ///
2839    /// This function runs the following steps:
2840    ///  - partitioning
2841    ///  - batching by configured size limit
2842    ///  - serialize to JSON
2843    ///  - submit directly to the upstream
2844    ///
2845    /// Cardinality limiting and rate limiting run only in processing Relays as they both require
2846    /// access to the central Redis instance. Cached rate limits are applied in the project cache
2847    /// already.
2848    fn encode_metrics_global(&self, message: FlushBuckets) {
2849        let FlushBuckets {
2850            partition_key,
2851            buckets,
2852        } = message;
2853
2854        let batch_size = self.inner.config.metrics_max_batch_size_bytes();
2855        let mut partition = Partition::new(batch_size);
2856        let mut partition_splits = 0;
2857
2858        for ProjectBuckets {
2859            buckets, scoping, ..
2860        } in buckets.values()
2861        {
2862            for bucket in buckets {
2863                let mut remaining = Some(BucketView::new(bucket));
2864
2865                while let Some(bucket) = remaining.take() {
2866                    if let Some(next) = partition.insert(bucket, *scoping) {
2867                        // A part of the bucket could not be inserted. Take the partition and submit
2868                        // it immediately. Repeat until the final part was inserted. This should
2869                        // always result in a request, otherwise we would enter an endless loop.
2870                        self.send_global_partition(partition_key, &mut partition);
2871                        remaining = Some(next);
2872                        partition_splits += 1;
2873                    }
2874                }
2875            }
2876        }
2877
2878        if partition_splits > 0 {
2879            metric!(distribution(RelayDistributions::PartitionSplits) = partition_splits);
2880        }
2881
2882        self.send_global_partition(partition_key, &mut partition);
2883    }
2884
2885    async fn handle_flush_buckets(&self, cogs: &mut Token, mut message: FlushBuckets) {
2886        for (project_key, pb) in message.buckets.iter_mut() {
2887            let buckets = std::mem::take(&mut pb.buckets);
2888            pb.buckets =
2889                self.check_buckets(*project_key, &pb.project_info, &pb.rate_limits, buckets);
2890        }
2891
2892        #[cfg(feature = "processing")]
2893        if self.inner.config.processing_enabled()
2894            && let Some(ref store_forwarder) = self.inner.addrs.store_forwarder
2895        {
2896            return self
2897                .encode_metrics_processing(message, store_forwarder)
2898                .await;
2899        }
2900
2901        if self.inner.config.http_global_metrics() {
2902            self.encode_metrics_global(message)
2903        } else {
2904            self.encode_metrics_envelope(cogs, message)
2905        }
2906    }
2907
2908    #[cfg(all(test, feature = "processing"))]
2909    fn redis_rate_limiter_enabled(&self) -> bool {
2910        self.inner.rate_limiter.is_some()
2911    }
2912
2913    async fn handle_message(&self, message: EnvelopeProcessor) {
2914        let ty = message.variant();
2915        let feature_weights = self.feature_weights(&message);
2916
2917        metric!(timer(RelayTimers::ProcessMessageDuration), message = ty, {
2918            let mut cogs = self.inner.cogs.timed(ResourceId::Relay, feature_weights);
2919
2920            match message {
2921                EnvelopeProcessor::ProcessEnvelope(m) => {
2922                    self.handle_process_envelope(&mut cogs, *m).await
2923                }
2924                EnvelopeProcessor::ProcessProjectMetrics(m) => {
2925                    self.handle_process_metrics(&mut cogs, *m)
2926                }
2927                EnvelopeProcessor::ProcessBatchedMetrics(m) => {
2928                    self.handle_process_batched_metrics(&mut cogs, *m)
2929                }
2930                EnvelopeProcessor::FlushBuckets(m) => {
2931                    self.handle_flush_buckets(&mut cogs, *m).await
2932                }
2933                EnvelopeProcessor::SubmitClientReports(m) => {
2934                    self.handle_submit_client_reports(&mut cogs, *m)
2935                }
2936            }
2937        });
2938    }
2939
2940    fn feature_weights(&self, message: &EnvelopeProcessor) -> FeatureWeights {
2941        match message {
2942            // Envelope is split later and tokens are attributed then.
2943            EnvelopeProcessor::ProcessEnvelope(_) => AppFeature::Unattributed.into(),
2944            EnvelopeProcessor::ProcessProjectMetrics(_) => AppFeature::Unattributed.into(),
2945            EnvelopeProcessor::ProcessBatchedMetrics(_) => AppFeature::Unattributed.into(),
2946            EnvelopeProcessor::FlushBuckets(v) => v
2947                .buckets
2948                .values()
2949                .map(|s| {
2950                    if self.inner.config.processing_enabled() {
2951                        // Processing does not encode the metrics but instead rate and cardinality
2952                        // limits the metrics, which scales by count and not size.
2953                        relay_metrics::cogs::ByCount(&s.buckets).into()
2954                    } else {
2955                        relay_metrics::cogs::BySize(&s.buckets).into()
2956                    }
2957                })
2958                .fold(FeatureWeights::none(), FeatureWeights::merge),
2959            EnvelopeProcessor::SubmitClientReports(_) => AppFeature::ClientReports.into(),
2960        }
2961    }
2962}
2963
2964impl Service for EnvelopeProcessorService {
2965    type Interface = EnvelopeProcessor;
2966
2967    async fn run(self, mut rx: relay_system::Receiver<Self::Interface>) {
2968        while let Some(message) = rx.recv().await {
2969            let service = self.clone();
2970            self.inner
2971                .pool
2972                .spawn_async(
2973                    async move {
2974                        service.handle_message(message).await;
2975                    }
2976                    .boxed(),
2977                )
2978                .await;
2979        }
2980    }
2981}
2982
/// Result of the enforcement of rate limiting.
///
/// If the event is already `None` or it's rate limited, it will be `None`
/// within the [`Annotated`].
struct EnforcementResult {
    /// The event after enforcement.
    event: Annotated<Event>,
    /// The rate limits returned alongside the enforcement.
    ///
    /// Without the `processing` feature this field is expected to be dead code.
    #[cfg_attr(not(feature = "processing"), expect(dead_code))]
    rate_limits: RateLimits,
}

impl EnforcementResult {
    /// Creates a new [`EnforcementResult`].
    pub fn new(event: Annotated<Event>, rate_limits: RateLimits) -> Self {
        Self { event, rate_limits }
    }
}
2999
/// Selects how rate limits are checked in [`RateLimiter::enforce`].
#[derive(Clone)]
enum RateLimiter {
    /// Checks against the rate limits passed in through the processing context.
    Cached,
    /// Checks through the wrapped [`RedisRateLimiter`]; only available with the `processing`
    /// feature.
    #[cfg(feature = "processing")]
    Consistent(Arc<RedisRateLimiter<GlobalRateLimitsServiceHandle>>),
}
3006
impl RateLimiter {
    /// Enforces rate limits on the envelope and the event extracted from it.
    ///
    /// Returns early with default (empty) rate limits and the unchanged event if there is nothing
    /// to limit (empty envelope and no event) or if there are no quotas. Otherwise the envelope
    /// limiter computes an enforcement, which is applied to the envelope with outcomes and, with
    /// the `processing` feature, to the extracted metrics.
    ///
    /// The returned [`EnforcementResult`] carries an empty event if the enforcement reports the
    /// event as active, and the original event otherwise.
    ///
    /// # Errors
    ///
    /// Returns a [`ProcessingError`] if computing the enforcement fails.
    async fn enforce<Group>(
        &self,
        managed_envelope: &mut TypedEnvelope<Group>,
        event: Annotated<Event>,
        _extracted_metrics: &mut ProcessingExtractedMetrics,
        ctx: processing::Context<'_>,
    ) -> Result<EnforcementResult, ProcessingError> {
        // Nothing to rate limit.
        if managed_envelope.envelope().is_empty() && event.value().is_none() {
            return Ok(EnforcementResult::new(event, RateLimits::default()));
        }

        let quotas = CombinedQuotas::new(ctx.global_config, ctx.project_info.get_quotas());
        if quotas.is_empty() {
            return Ok(EnforcementResult::new(event, RateLimits::default()));
        }

        let event_category = event_category(&event);

        // We extract the rate limiters, in case we perform consistent rate limiting, since we will
        // need Redis access.
        //
        // When invoking the rate limiter, capture if the event item has been rate limited to also
        // remove it from the processing state eventually.
        let this = self.clone();
        let mut envelope_limiter =
            EnvelopeLimiter::new(CheckLimits::All, move |item_scope, _quantity| {
                // The closure may be invoked multiple times, so each returned future gets its own
                // clone of the limiter.
                let this = this.clone();

                async move {
                    match this {
                        #[cfg(feature = "processing")]
                        RateLimiter::Consistent(rate_limiter) => Ok::<_, ProcessingError>(
                            rate_limiter
                                .is_rate_limited(quotas, item_scope, _quantity, false)
                                .await?,
                        ),
                        // All other variants check against the rate limits from the context.
                        _ => Ok::<_, ProcessingError>(
                            ctx.rate_limits.check_with_quotas(quotas, item_scope),
                        ),
                    }
                }
            });

        // Tell the envelope limiter about the event, since it has been removed from the Envelope at
        // this stage in processing.
        if let Some(category) = event_category {
            envelope_limiter.assume_event(category);
        }

        let scoping = managed_envelope.scoping();
        let (enforcement, rate_limits) = metric!(timer(RelayTimers::EventProcessingRateLimiting), type = self.name(), {
            envelope_limiter
                .compute(managed_envelope.envelope_mut(), &scoping)
                .await
        })?;
        // Read this before `apply_with_outcomes` takes the enforcement by value.
        let event_active = enforcement.is_event_active();

        // Use the same rate limits as used for the envelope on the metrics.
        // Those rate limits should not be checked for expiry or similar to ensure a consistent
        // limiting of envelope items and metrics.
        #[cfg(feature = "processing")]
        _extracted_metrics.apply_enforcement(&enforcement, matches!(self, Self::Consistent(_)));
        enforcement.apply_with_outcomes(managed_envelope);

        // NOTE(review): an active event enforcement presumably means the event itself was rate
        // limited; in that case the envelope is expected to be empty and the event is dropped.
        if event_active {
            debug_assert!(managed_envelope.envelope().is_empty());
            return Ok(EnforcementResult::new(Annotated::empty(), rate_limits));
        }

        Ok(EnforcementResult::new(event, rate_limits))
    }

    /// Returns a static name for this rate limiter variant, used as the `type` tag of the rate
    /// limiting timer metric.
    fn name(&self) -> &'static str {
        match self {
            Self::Cached => "cached",
            #[cfg(feature = "processing")]
            Self::Consistent(_) => "consistent",
        }
    }
}
3088
3089pub fn encode_payload(body: &Bytes, http_encoding: HttpEncoding) -> Result<Bytes, std::io::Error> {
3090    let envelope_body: Vec<u8> = match http_encoding {
3091        HttpEncoding::Identity => return Ok(body.clone()),
3092        HttpEncoding::Deflate => {
3093            let mut encoder = ZlibEncoder::new(Vec::new(), Compression::default());
3094            encoder.write_all(body.as_ref())?;
3095            encoder.finish()?
3096        }
3097        HttpEncoding::Gzip => {
3098            let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
3099            encoder.write_all(body.as_ref())?;
3100            encoder.finish()?
3101        }
3102        HttpEncoding::Br => {
3103            // Use default buffer size (via 0), medium quality (5), and the default lgwin (22).
3104            let mut encoder = BrotliEncoder::new(Vec::new(), 0, 5, 22);
3105            encoder.write_all(body.as_ref())?;
3106            encoder.into_inner()
3107        }
3108        HttpEncoding::Zstd => {
3109            // Use the fastest compression level, our main objective here is to get the best
3110            // compression ratio for least amount of time spent.
3111            let mut encoder = ZstdEncoder::new(Vec::new(), 1)?;
3112            encoder.write_all(body.as_ref())?;
3113            encoder.finish()?
3114        }
3115    };
3116
3117    Ok(envelope_body.into())
3118}
3119
/// An upstream request that submits an envelope via HTTP.
#[derive(Debug)]
pub struct SendEnvelope {
    /// The managed envelope this request submits.
    pub envelope: TypedEnvelope<Processed>,
    /// The request body.
    pub body: Bytes,
    /// The content encoding set on the request.
    pub http_encoding: HttpEncoding,
    /// Handle to the project cache.
    pub project_cache: ProjectCacheHandle,
}
3128
impl UpstreamRequest for SendEnvelope {
    /// Envelopes are always submitted with `POST`.
    fn method(&self) -> reqwest::Method {
        reqwest::Method::POST
    }

    /// Returns the envelope endpoint path for the project id from the envelope's scoping.
    fn path(&self) -> Cow<'_, str> {
        format!("/api/{}/envelope/", self.envelope.scoping().project_id).into()
    }

    /// Returns the static route name of this request.
    fn route(&self) -> &'static str {
        "envelope"
    }

    /// Sets the content encoding, headers taken from the envelope's request meta, and the body
    /// on the request builder, and records the body size as a metric.
    fn build(&mut self, builder: &mut http::RequestBuilder) -> Result<(), http::HttpError> {
        let envelope_body = self.body.clone();
        metric!(
            distribution(RelayDistributions::UpstreamEnvelopeBodySize) = envelope_body.len() as u64
        );

        let meta = &self.envelope.meta();
        let shard = self.envelope.partition_key().map(|p| p.to_string());
        builder
            .content_encoding(self.http_encoding)
            .header_opt("Origin", meta.origin().map(|url| url.as_str()))
            .header_opt("User-Agent", meta.user_agent())
            .header("X-Sentry-Auth", meta.auth_header())
            .header("X-Forwarded-For", meta.forwarded_for())
            .header("Content-Type", envelope::CONTENT_TYPE)
            .header_opt("X-Sentry-Relay-Shard", shard)
            .body(envelope_body);

        Ok(())
    }

    /// Requests an optional request signature.
    fn sign(&mut self) -> Option<Sign> {
        Some(Sign::Optional(SignatureType::RequestSign))
    }

    /// Resolves the envelope based on the upstream response.
    ///
    /// The envelope is accepted if the response was consumed successfully, or if the error
    /// reports that the request was received; in the latter case, rate limits carried by the
    /// error are merged into the project's cached rate limits. Any other error rejects the
    /// envelope as an internal error and is logged.
    fn respond(
        self: Box<Self>,
        result: Result<http::Response, UpstreamRequestError>,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> {
        Box::pin(async move {
            // Consume the response so that a failure while consuming it is handled like any
            // other request error.
            let result = match result {
                Ok(mut response) => response.consume().await.map_err(UpstreamRequestError::Http),
                Err(error) => Err(error),
            };

            match result {
                Ok(()) => self.envelope.accept(),
                Err(error) if error.is_received() => {
                    // Read the scoping before accepting the envelope; it is needed for the rate
                    // limits below.
                    let scoping = self.envelope.scoping();
                    self.envelope.accept();

                    if let UpstreamRequestError::RateLimited(limits) = error {
                        self.project_cache
                            .get(scoping.project_key)
                            .rate_limits()
                            .merge(limits.scope(&scoping));
                    }
                }
                Err(error) => {
                    // Errors are only logged for what we consider an internal discard reason. These
                    // indicate errors in the infrastructure or implementation bugs.
                    let mut envelope = self.envelope;
                    envelope.reject(Outcome::Invalid(DiscardReason::Internal));
                    relay_log::error!(
                        error = &error as &dyn Error,
                        tags.project_key = %envelope.scoping().project_key,
                        "error sending envelope"
                    );
                }
            }
        })
    }
}
3205
3206/// A container for metric buckets from multiple projects.
3207///
3208/// This container is used to send metrics to the upstream in global batches as part of the
3209/// [`FlushBuckets`] message if the `http.global_metrics` option is enabled. The container monitors
3210/// the size of all metrics and allows to split them into multiple batches. See
3211/// [`insert`](Self::insert) for more information.
3212#[derive(Debug)]
3213struct Partition<'a> {
3214    max_size: usize,
3215    remaining: usize,
3216    views: HashMap<ProjectKey, Vec<BucketView<'a>>>,
3217    project_info: HashMap<ProjectKey, Scoping>,
3218}
3219
3220impl<'a> Partition<'a> {
3221    /// Creates a new partition with the given maximum size in bytes.
3222    pub fn new(size: usize) -> Self {
3223        Self {
3224            max_size: size,
3225            remaining: size,
3226            views: HashMap::new(),
3227            project_info: HashMap::new(),
3228        }
3229    }
3230
3231    /// Inserts a bucket into the partition, splitting it if necessary.
3232    ///
3233    /// This function attempts to add the bucket to this partition. If the bucket does not fit
3234    /// entirely into the partition given its maximum size, the remaining part of the bucket is
3235    /// returned from this function call.
3236    ///
3237    /// If this function returns `Some(_)`, the partition is full and should be submitted to the
3238    /// upstream immediately. Use [`Self::take`] to retrieve the contents of the
3239    /// partition. Afterwards, the caller is responsible to call this function again with the
3240    /// remaining bucket until it is fully inserted.
3241    pub fn insert(&mut self, bucket: BucketView<'a>, scoping: Scoping) -> Option<BucketView<'a>> {
3242        let (current, next) = bucket.split(self.remaining, Some(self.max_size));
3243
3244        if let Some(current) = current {
3245            self.remaining = self.remaining.saturating_sub(current.estimated_size());
3246            self.views
3247                .entry(scoping.project_key)
3248                .or_default()
3249                .push(current);
3250
3251            self.project_info
3252                .entry(scoping.project_key)
3253                .or_insert(scoping);
3254        }
3255
3256        next
3257    }
3258
3259    /// Returns `true` if the partition does not hold any data.
3260    fn is_empty(&self) -> bool {
3261        self.views.is_empty()
3262    }
3263
3264    /// Returns the serialized buckets for this partition.
3265    ///
3266    /// This empties the partition, so that it can be reused.
3267    fn take(&mut self) -> (Bytes, HashMap<ProjectKey, Scoping>) {
3268        #[derive(serde::Serialize)]
3269        struct Wrapper<'a> {
3270            buckets: &'a HashMap<ProjectKey, Vec<BucketView<'a>>>,
3271        }
3272
3273        let buckets = &self.views;
3274        let payload = serde_json::to_vec(&Wrapper { buckets }).unwrap().into();
3275
3276        let scopings = self.project_info.clone();
3277        self.project_info.clear();
3278
3279        self.views.clear();
3280        self.remaining = self.max_size;
3281
3282        (payload, scopings)
3283    }
3284}
3285
3286/// An upstream request that submits metric buckets via HTTP.
3287///
3288/// This request is not awaited. It automatically tracks outcomes if the request is not received.
3289#[derive(Debug)]
3290struct SendMetricsRequest {
3291    /// If the partition key is set, the request is marked with `X-Sentry-Relay-Shard`.
3292    partition_key: String,
3293    /// Serialized metric buckets without encoding applied, used for signing.
3294    unencoded: Bytes,
3295    /// Serialized metric buckets with the stated HTTP encoding applied.
3296    encoded: Bytes,
3297    /// Mapping of all contained project keys to their scoping and extraction mode.
3298    ///
3299    /// Used to track outcomes for transmission failures.
3300    project_info: HashMap<ProjectKey, Scoping>,
3301    /// Encoding (compression) of the payload.
3302    http_encoding: HttpEncoding,
3303    /// Metric outcomes instance to send outcomes on error.
3304    metric_outcomes: MetricOutcomes,
3305}
3306
3307impl SendMetricsRequest {
3308    fn create_error_outcomes(self) {
3309        #[derive(serde::Deserialize)]
3310        struct Wrapper {
3311            buckets: HashMap<ProjectKey, Vec<MinimalTrackableBucket>>,
3312        }
3313
3314        let buckets = match serde_json::from_slice(&self.unencoded) {
3315            Ok(Wrapper { buckets }) => buckets,
3316            Err(err) => {
3317                relay_log::error!(
3318                    error = &err as &dyn std::error::Error,
3319                    "failed to parse buckets from failed transmission"
3320                );
3321                return;
3322            }
3323        };
3324
3325        for (key, buckets) in buckets {
3326            let Some(&scoping) = self.project_info.get(&key) else {
3327                relay_log::error!("missing scoping for project key");
3328                continue;
3329            };
3330
3331            self.metric_outcomes.track(
3332                scoping,
3333                &buckets,
3334                Outcome::Invalid(DiscardReason::Internal),
3335            );
3336        }
3337    }
3338}
3339
3340impl UpstreamRequest for SendMetricsRequest {
3341    fn set_relay_id(&self) -> bool {
3342        true
3343    }
3344
3345    fn sign(&mut self) -> Option<Sign> {
3346        Some(Sign::Required(SignatureType::Body(self.unencoded.clone())))
3347    }
3348
3349    fn method(&self) -> reqwest::Method {
3350        reqwest::Method::POST
3351    }
3352
3353    fn path(&self) -> Cow<'_, str> {
3354        "/api/0/relays/metrics/".into()
3355    }
3356
3357    fn route(&self) -> &'static str {
3358        "global_metrics"
3359    }
3360
3361    fn build(&mut self, builder: &mut http::RequestBuilder) -> Result<(), http::HttpError> {
3362        metric!(
3363            distribution(RelayDistributions::UpstreamMetricsBodySize) = self.encoded.len() as u64
3364        );
3365
3366        builder
3367            .content_encoding(self.http_encoding)
3368            .header("X-Sentry-Relay-Shard", self.partition_key.as_bytes())
3369            .header(header::CONTENT_TYPE, b"application/json")
3370            .body(self.encoded.clone());
3371
3372        Ok(())
3373    }
3374
3375    fn respond(
3376        self: Box<Self>,
3377        result: Result<http::Response, UpstreamRequestError>,
3378    ) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> {
3379        Box::pin(async {
3380            match result {
3381                Ok(mut response) => {
3382                    response.consume().await.ok();
3383                }
3384                Err(error) => {
3385                    relay_log::error!(error = &error as &dyn Error, "Failed to send metrics batch");
3386
3387                    // If the request did not arrive at the upstream, we are responsible for outcomes.
3388                    // Otherwise, the upstream is responsible to log outcomes.
3389                    if error.is_received() {
3390                        return;
3391                    }
3392
3393                    self.create_error_outcomes()
3394                }
3395            }
3396        })
3397    }
3398}
3399
3400/// Container for global and project level [`Quota`].
3401#[derive(Copy, Clone, Debug)]
3402struct CombinedQuotas<'a> {
3403    global_quotas: &'a [Quota],
3404    project_quotas: &'a [Quota],
3405}
3406
3407impl<'a> CombinedQuotas<'a> {
3408    /// Returns a new [`CombinedQuotas`].
3409    pub fn new(global_config: &'a GlobalConfig, project_quotas: &'a [Quota]) -> Self {
3410        Self {
3411            global_quotas: &global_config.quotas,
3412            project_quotas,
3413        }
3414    }
3415
3416    /// Returns `true` if both global quotas and project quotas are empty.
3417    pub fn is_empty(&self) -> bool {
3418        self.len() == 0
3419    }
3420
3421    /// Returns the number of both global and project quotas.
3422    pub fn len(&self) -> usize {
3423        self.global_quotas.len() + self.project_quotas.len()
3424    }
3425}
3426
3427impl<'a> IntoIterator for CombinedQuotas<'a> {
3428    type Item = &'a Quota;
3429    type IntoIter = std::iter::Chain<std::slice::Iter<'a, Quota>, std::slice::Iter<'a, Quota>>;
3430
3431    fn into_iter(self) -> Self::IntoIter {
3432        self.global_quotas.iter().chain(self.project_quotas.iter())
3433    }
3434}
3435
3436#[cfg(test)]
3437mod tests {
3438    use std::collections::BTreeMap;
3439
3440    use insta::assert_debug_snapshot;
3441    use relay_base_schema::metrics::{DurationUnit, MetricUnit};
3442    use relay_common::glob2::LazyGlob;
3443    use relay_dynamic_config::ProjectConfig;
3444    use relay_event_normalization::{
3445        MeasurementsConfig, NormalizationConfig, RedactionRule, TransactionNameConfig,
3446        TransactionNameRule,
3447    };
3448    use relay_event_schema::protocol::TransactionSource;
3449    use relay_pii::DataScrubbingConfig;
3450    use similar_asserts::assert_eq;
3451
3452    use crate::metrics_extraction::IntoMetric;
3453    use crate::metrics_extraction::transactions::types::{
3454        CommonTags, TransactionMeasurementTags, TransactionMetric,
3455    };
3456    use crate::testutils::{create_test_processor, create_test_processor_with_addrs};
3457
3458    #[cfg(feature = "processing")]
3459    use {
3460        relay_metrics::BucketValue,
3461        relay_quotas::{QuotaScope, ReasonCode},
3462        relay_test::mock_service,
3463    };
3464
3465    use super::*;
3466
3467    #[cfg(feature = "processing")]
3468    fn mock_quota(id: &str) -> Quota {
3469        Quota {
3470            id: Some(id.into()),
3471            categories: [DataCategory::MetricBucket].into(),
3472            scope: QuotaScope::Organization,
3473            scope_id: None,
3474            limit: Some(0),
3475            window: None,
3476            reason_code: None,
3477            namespace: None,
3478        }
3479    }
3480
3481    #[cfg(feature = "processing")]
3482    #[test]
3483    fn test_dynamic_quotas() {
3484        let global_config = GlobalConfig {
3485            quotas: vec![mock_quota("foo"), mock_quota("bar")],
3486            ..Default::default()
3487        };
3488
3489        let project_quotas = vec![mock_quota("baz"), mock_quota("qux")];
3490
3491        let dynamic_quotas = CombinedQuotas::new(&global_config, &project_quotas);
3492
3493        assert_eq!(dynamic_quotas.len(), 4);
3494        assert!(!dynamic_quotas.is_empty());
3495
3496        let quota_ids = dynamic_quotas.into_iter().filter_map(|q| q.id.as_deref());
3497        assert!(quota_ids.eq(["foo", "bar", "baz", "qux"]));
3498    }
3499
3500    /// Ensures that if we ratelimit one batch of buckets in [`FlushBuckets`] message, it won't
3501    /// also ratelimit the next batches in the same message automatically.
3502    #[cfg(feature = "processing")]
3503    #[tokio::test]
3504    async fn test_ratelimit_per_batch() {
3505        use relay_base_schema::organization::OrganizationId;
3506        use relay_protocol::FiniteF64;
3507
3508        let rate_limited_org = Scoping {
3509            organization_id: OrganizationId::new(1),
3510            project_id: ProjectId::new(21),
3511            project_key: ProjectKey::parse("00000000000000000000000000000000").unwrap(),
3512            key_id: Some(17),
3513        };
3514
3515        let not_rate_limited_org = Scoping {
3516            organization_id: OrganizationId::new(2),
3517            project_id: ProjectId::new(21),
3518            project_key: ProjectKey::parse("11111111111111111111111111111111").unwrap(),
3519            key_id: Some(17),
3520        };
3521
3522        let message = {
3523            let project_info = {
3524                let quota = Quota {
3525                    id: Some("testing".into()),
3526                    categories: [DataCategory::MetricBucket].into(),
3527                    scope: relay_quotas::QuotaScope::Organization,
3528                    scope_id: Some(rate_limited_org.organization_id.to_string().into()),
3529                    limit: Some(0),
3530                    window: None,
3531                    reason_code: Some(ReasonCode::new("test")),
3532                    namespace: None,
3533                };
3534
3535                let mut config = ProjectConfig::default();
3536                config.quotas.push(quota);
3537
3538                Arc::new(ProjectInfo {
3539                    config,
3540                    ..Default::default()
3541                })
3542            };
3543
3544            let project_metrics = |scoping| ProjectBuckets {
3545                buckets: vec![Bucket {
3546                    name: "d:transactions/bar".into(),
3547                    value: BucketValue::Counter(FiniteF64::new(1.0).unwrap()),
3548                    timestamp: UnixTimestamp::now(),
3549                    tags: Default::default(),
3550                    width: 10,
3551                    metadata: BucketMetadata::default(),
3552                }],
3553                rate_limits: Default::default(),
3554                project_info: project_info.clone(),
3555                scoping,
3556            };
3557
3558            let buckets = hashbrown::HashMap::from([
3559                (
3560                    rate_limited_org.project_key,
3561                    project_metrics(rate_limited_org),
3562                ),
3563                (
3564                    not_rate_limited_org.project_key,
3565                    project_metrics(not_rate_limited_org),
3566                ),
3567            ]);
3568
3569            FlushBuckets {
3570                partition_key: 0,
3571                buckets,
3572            }
3573        };
3574
3575        // ensure the order of the map while iterating is as expected.
3576        assert_eq!(message.buckets.keys().count(), 2);
3577
3578        let config = {
3579            let config_json = serde_json::json!({
3580                "processing": {
3581                    "enabled": true,
3582                    "kafka_config": [],
3583                    "redis": {
3584                        "server": std::env::var("RELAY_REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_owned()),
3585                    }
3586                }
3587            });
3588            Config::from_json_value(config_json).unwrap()
3589        };
3590
3591        let (store, handle) = {
3592            let f = |org_ids: &mut Vec<OrganizationId>, msg: Store| {
3593                let org_id = match msg {
3594                    Store::Metrics(x) => x.scoping.organization_id,
3595                    _ => panic!("received envelope when expecting only metrics"),
3596                };
3597                org_ids.push(org_id);
3598            };
3599
3600            mock_service("store_forwarder", vec![], f)
3601        };
3602
3603        let processor = create_test_processor(config).await;
3604        assert!(processor.redis_rate_limiter_enabled());
3605
3606        processor.encode_metrics_processing(message, &store).await;
3607
3608        drop(store);
3609        let orgs_not_ratelimited = handle.await.unwrap();
3610
3611        assert_eq!(
3612            orgs_not_ratelimited,
3613            vec![not_rate_limited_org.organization_id]
3614        );
3615    }
3616
3617    #[tokio::test]
3618    async fn test_browser_version_extraction_with_pii_like_data() {
3619        let processor = create_test_processor(Default::default()).await;
3620        let outcome_aggregator = Addr::dummy();
3621        let event_id = EventId::new();
3622
3623        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
3624            .parse()
3625            .unwrap();
3626
3627        let request_meta = RequestMeta::new(dsn);
3628        let mut envelope = Envelope::from_request(Some(event_id), request_meta);
3629
3630        envelope.add_item({
3631                let mut item = Item::new(ItemType::Event);
3632                item.set_payload(
3633                    ContentType::Json,
3634                    r#"
3635                    {
3636                        "request": {
3637                            "headers": [
3638                                ["User-Agent", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36"]
3639                            ]
3640                        }
3641                    }
3642                "#,
3643                );
3644                item
3645            });
3646
3647        let mut datascrubbing_settings = DataScrubbingConfig::default();
3648        // enable all the default scrubbing
3649        datascrubbing_settings.scrub_data = true;
3650        datascrubbing_settings.scrub_defaults = true;
3651        datascrubbing_settings.scrub_ip_addresses = true;
3652
3653        // Make sure to mask any IP-like looking data
3654        let pii_config = serde_json::from_str(r#"{"applications": {"**": ["@ip:mask"]}}"#).unwrap();
3655
3656        let config = ProjectConfig {
3657            datascrubbing_settings,
3658            pii_config: Some(pii_config),
3659            ..Default::default()
3660        };
3661
3662        let project_info = ProjectInfo {
3663            config,
3664            ..Default::default()
3665        };
3666
3667        let mut envelopes = ProcessingGroup::split_envelope(*envelope, &Default::default());
3668        assert_eq!(envelopes.len(), 1);
3669
3670        let (group, envelope) = envelopes.pop().unwrap();
3671        let envelope = ManagedEnvelope::new(envelope, outcome_aggregator);
3672
3673        let message = ProcessEnvelopeGrouped {
3674            group,
3675            envelope,
3676            ctx: processing::Context {
3677                project_info: &project_info,
3678                ..processing::Context::for_test()
3679            },
3680        };
3681
3682        let Ok(Some(Submit::Envelope(mut new_envelope))) =
3683            processor.process(&mut Token::noop(), message).await
3684        else {
3685            panic!();
3686        };
3687        let new_envelope = new_envelope.envelope_mut();
3688
3689        let event_item = new_envelope.items().last().unwrap();
3690        let annotated_event: Annotated<Event> =
3691            Annotated::from_json_bytes(&event_item.payload()).unwrap();
3692        let event = annotated_event.into_value().unwrap();
3693        let headers = event
3694            .request
3695            .into_value()
3696            .unwrap()
3697            .headers
3698            .into_value()
3699            .unwrap();
3700
3701        // IP-like data must be masked
3702        assert_eq!(
3703            Some(
3704                "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/********* Safari/537.36"
3705            ),
3706            headers.get_header("User-Agent")
3707        );
3708        // But we still get correct browser and version number
3709        let contexts = event.contexts.into_value().unwrap();
3710        let browser = contexts.0.get("browser").unwrap();
3711        assert_eq!(
3712            r#"{"browser":"Chrome 103.0.0","name":"Chrome","version":"103.0.0","type":"browser"}"#,
3713            browser.to_json().unwrap()
3714        );
3715    }
3716
3717    #[tokio::test]
3718    #[cfg(feature = "processing")]
3719    async fn test_materialize_dsc() {
3720        use crate::services::projects::project::PublicKeyConfig;
3721
3722        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
3723            .parse()
3724            .unwrap();
3725        let request_meta = RequestMeta::new(dsn);
3726        let mut envelope = Envelope::from_request(None, request_meta);
3727
3728        let dsc = r#"{
3729            "trace_id": "00000000-0000-0000-0000-000000000000",
3730            "public_key": "e12d836b15bb49d7bbf99e64295d995b",
3731            "sample_rate": "0.2"
3732        }"#;
3733        envelope.set_dsc(serde_json::from_str(dsc).unwrap());
3734
3735        let mut item = Item::new(ItemType::Event);
3736        item.set_payload(ContentType::Json, r#"{}"#);
3737        envelope.add_item(item);
3738
3739        let outcome_aggregator = Addr::dummy();
3740        let managed_envelope = ManagedEnvelope::new(envelope, outcome_aggregator);
3741
3742        let mut project_info = ProjectInfo::default();
3743        project_info.public_keys.push(PublicKeyConfig {
3744            public_key: ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(),
3745            numeric_id: Some(1),
3746        });
3747
3748        let config = serde_json::json!({
3749            "processing": {
3750                "enabled": true,
3751                "kafka_config": [],
3752            }
3753        });
3754
3755        let message = ProcessEnvelopeGrouped {
3756            group: ProcessingGroup::Transaction,
3757            envelope: managed_envelope,
3758            ctx: processing::Context {
3759                config: &Config::from_json_value(config.clone()).unwrap(),
3760                project_info: &project_info,
3761                sampling_project_info: Some(&project_info),
3762                ..processing::Context::for_test()
3763            },
3764        };
3765
3766        let processor = create_test_processor(Config::from_json_value(config).unwrap()).await;
3767        let Ok(Some(Submit::Envelope(envelope))) =
3768            processor.process(&mut Token::noop(), message).await
3769        else {
3770            panic!();
3771        };
3772        let event = envelope
3773            .envelope()
3774            .get_item_by(|item| item.ty() == &ItemType::Event)
3775            .unwrap();
3776
3777        let event = Annotated::<Event>::from_json_bytes(&event.payload()).unwrap();
3778        insta::assert_debug_snapshot!(event.value().unwrap()._dsc, @r###"
3779        Object(
3780            {
3781                "environment": ~,
3782                "public_key": String(
3783                    "e12d836b15bb49d7bbf99e64295d995b",
3784                ),
3785                "release": ~,
3786                "replay_id": ~,
3787                "sample_rate": String(
3788                    "0.2",
3789                ),
3790                "trace_id": String(
3791                    "00000000000000000000000000000000",
3792                ),
3793                "transaction": ~,
3794            },
3795        )
3796        "###);
3797    }
3798
3799    fn capture_test_event(transaction_name: &str, source: TransactionSource) -> Vec<String> {
3800        let mut event = Annotated::<Event>::from_json(
3801            r#"
3802            {
3803                "type": "transaction",
3804                "transaction": "/foo/",
3805                "timestamp": 946684810.0,
3806                "start_timestamp": 946684800.0,
3807                "contexts": {
3808                    "trace": {
3809                        "trace_id": "4c79f60c11214eb38604f4ae0781bfb2",
3810                        "span_id": "fa90fdead5f74053",
3811                        "op": "http.server",
3812                        "type": "trace"
3813                    }
3814                },
3815                "transaction_info": {
3816                    "source": "url"
3817                }
3818            }
3819            "#,
3820        )
3821        .unwrap();
3822        let e = event.value_mut().as_mut().unwrap();
3823        e.transaction.set_value(Some(transaction_name.into()));
3824
3825        e.transaction_info
3826            .value_mut()
3827            .as_mut()
3828            .unwrap()
3829            .source
3830            .set_value(Some(source));
3831
3832        relay_statsd::with_capturing_test_client(|| {
3833            utils::log_transaction_name_metrics(&mut event, |event| {
3834                let config = NormalizationConfig {
3835                    transaction_name_config: TransactionNameConfig {
3836                        rules: &[TransactionNameRule {
3837                            pattern: LazyGlob::new("/foo/*/**".to_owned()),
3838                            expiry: DateTime::<Utc>::MAX_UTC,
3839                            redaction: RedactionRule::Replace {
3840                                substitution: "*".to_owned(),
3841                            },
3842                        }],
3843                    },
3844                    ..Default::default()
3845                };
3846                relay_event_normalization::normalize_event(event, &config)
3847            });
3848        })
3849    }
3850
3851    #[test]
3852    fn test_log_transaction_metrics_none() {
3853        let captures = capture_test_event("/nothing", TransactionSource::Url);
3854        insta::assert_debug_snapshot!(captures, @r###"
3855        [
3856            "event.transaction_name_changes:1|c|#source_in:url,changes:none,source_out:sanitized,is_404:false",
3857        ]
3858        "###);
3859    }
3860
3861    #[test]
3862    fn test_log_transaction_metrics_rule() {
3863        let captures = capture_test_event("/foo/john/denver", TransactionSource::Url);
3864        insta::assert_debug_snapshot!(captures, @r###"
3865        [
3866            "event.transaction_name_changes:1|c|#source_in:url,changes:rule,source_out:sanitized,is_404:false",
3867        ]
3868        "###);
3869    }
3870
3871    #[test]
3872    fn test_log_transaction_metrics_pattern() {
3873        let captures = capture_test_event("/something/12345", TransactionSource::Url);
3874        insta::assert_debug_snapshot!(captures, @r###"
3875        [
3876            "event.transaction_name_changes:1|c|#source_in:url,changes:pattern,source_out:sanitized,is_404:false",
3877        ]
3878        "###);
3879    }
3880
3881    #[test]
3882    fn test_log_transaction_metrics_both() {
3883        let captures = capture_test_event("/foo/john/12345", TransactionSource::Url);
3884        insta::assert_debug_snapshot!(captures, @r###"
3885        [
3886            "event.transaction_name_changes:1|c|#source_in:url,changes:both,source_out:sanitized,is_404:false",
3887        ]
3888        "###);
3889    }
3890
3891    #[test]
3892    fn test_log_transaction_metrics_no_match() {
3893        let captures = capture_test_event("/foo/john/12345", TransactionSource::Route);
3894        insta::assert_debug_snapshot!(captures, @r###"
3895        [
3896            "event.transaction_name_changes:1|c|#source_in:route,changes:none,source_out:route,is_404:false",
3897        ]
3898        "###);
3899    }
3900
3901    /// Confirms that the hardcoded value we use for the fixed length of the measurement MRI is
3902    /// correct. Unit test is placed here because it has dependencies to relay-server and therefore
3903    /// cannot be called from relay-metrics.
3904    #[test]
3905    fn test_mri_overhead_constant() {
3906        let hardcoded_value = MeasurementsConfig::MEASUREMENT_MRI_OVERHEAD;
3907
3908        let derived_value = {
3909            let name = "foobar".to_owned();
3910            let value = 5.into(); // Arbitrary value.
3911            let unit = MetricUnit::Duration(DurationUnit::default());
3912            let tags = TransactionMeasurementTags {
3913                measurement_rating: None,
3914                universal_tags: CommonTags(BTreeMap::new()),
3915                score_profile_version: None,
3916            };
3917
3918            let measurement = TransactionMetric::Measurement {
3919                name: name.clone(),
3920                value,
3921                unit,
3922                tags,
3923            };
3924
3925            let metric: Bucket = measurement.into_metric(UnixTimestamp::now());
3926            metric.name.len() - unit.to_string().len() - name.len()
3927        };
3928        assert_eq!(
3929            hardcoded_value, derived_value,
3930            "Update `MEASUREMENT_MRI_OVERHEAD` if the naming scheme changed."
3931        );
3932    }
3933
3934    #[tokio::test]
3935    async fn test_process_metrics_bucket_metadata() {
3936        let mut token = Cogs::noop().timed(ResourceId::Relay, AppFeature::Unattributed);
3937        let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
3938        let received_at = Utc::now();
3939        let config = Config::default();
3940
3941        let (aggregator, mut aggregator_rx) = Addr::custom();
3942        let processor = create_test_processor_with_addrs(
3943            config,
3944            Addrs {
3945                aggregator,
3946                ..Default::default()
3947            },
3948        )
3949        .await;
3950
3951        let mut item = Item::new(ItemType::Statsd);
3952        item.set_payload(
3953            ContentType::Text,
3954            "transactions/foo:3182887624:4267882815|s",
3955        );
3956        for (source, expected_received_at) in [
3957            (
3958                BucketSource::External,
3959                Some(UnixTimestamp::from_datetime(received_at).unwrap()),
3960            ),
3961            (BucketSource::Internal, None),
3962        ] {
3963            let message = ProcessMetrics {
3964                data: MetricData::Raw(vec![item.clone()]),
3965                project_key,
3966                source,
3967                received_at,
3968                sent_at: Some(Utc::now()),
3969            };
3970            processor.handle_process_metrics(&mut token, message);
3971
3972            let Aggregator::MergeBuckets(merge_buckets) = aggregator_rx.recv().await.unwrap();
3973            let buckets = merge_buckets.buckets;
3974            assert_eq!(buckets.len(), 1);
3975            assert_eq!(buckets[0].metadata.received_at, expected_received_at);
3976        }
3977    }
3978
3979    #[tokio::test]
3980    async fn test_process_batched_metrics() {
3981        let mut token = Cogs::noop().timed(ResourceId::Relay, AppFeature::Unattributed);
3982        let received_at = Utc::now();
3983        let config = Config::default();
3984
3985        let (aggregator, mut aggregator_rx) = Addr::custom();
3986        let processor = create_test_processor_with_addrs(
3987            config,
3988            Addrs {
3989                aggregator,
3990                ..Default::default()
3991            },
3992        )
3993        .await;
3994
3995        let payload = r#"{
3996    "buckets": {
3997        "11111111111111111111111111111111": [
3998            {
3999                "timestamp": 1615889440,
4000                "width": 0,
4001                "name": "d:custom/endpoint.response_time@millisecond",
4002                "type": "d",
4003                "value": [
4004                  68.0
4005                ],
4006                "tags": {
4007                  "route": "user_index"
4008                }
4009            }
4010        ],
4011        "22222222222222222222222222222222": [
4012            {
4013                "timestamp": 1615889440,
4014                "width": 0,
4015                "name": "d:custom/endpoint.cache_rate@none",
4016                "type": "d",
4017                "value": [
4018                  36.0
4019                ]
4020            }
4021        ]
4022    }
4023}
4024"#;
4025        let message = ProcessBatchedMetrics {
4026            payload: Bytes::from(payload),
4027            source: BucketSource::Internal,
4028            received_at,
4029            sent_at: Some(Utc::now()),
4030        };
4031        processor.handle_process_batched_metrics(&mut token, message);
4032
4033        let Aggregator::MergeBuckets(mb1) = aggregator_rx.recv().await.unwrap();
4034        let Aggregator::MergeBuckets(mb2) = aggregator_rx.recv().await.unwrap();
4035
4036        let mut messages = vec![mb1, mb2];
4037        messages.sort_by_key(|mb| mb.project_key);
4038
4039        let actual = messages
4040            .into_iter()
4041            .map(|mb| (mb.project_key, mb.buckets))
4042            .collect::<Vec<_>>();
4043
4044        assert_debug_snapshot!(actual, @r###"
4045        [
4046            (
4047                ProjectKey("11111111111111111111111111111111"),
4048                [
4049                    Bucket {
4050                        timestamp: UnixTimestamp(1615889440),
4051                        width: 0,
4052                        name: MetricName(
4053                            "d:custom/endpoint.response_time@millisecond",
4054                        ),
4055                        value: Distribution(
4056                            [
4057                                68.0,
4058                            ],
4059                        ),
4060                        tags: {
4061                            "route": "user_index",
4062                        },
4063                        metadata: BucketMetadata {
4064                            merges: 1,
4065                            received_at: None,
4066                            extracted_from_indexed: false,
4067                        },
4068                    },
4069                ],
4070            ),
4071            (
4072                ProjectKey("22222222222222222222222222222222"),
4073                [
4074                    Bucket {
4075                        timestamp: UnixTimestamp(1615889440),
4076                        width: 0,
4077                        name: MetricName(
4078                            "d:custom/endpoint.cache_rate@none",
4079                        ),
4080                        value: Distribution(
4081                            [
4082                                36.0,
4083                            ],
4084                        ),
4085                        tags: {},
4086                        metadata: BucketMetadata {
4087                            merges: 1,
4088                            received_at: None,
4089                            extracted_from_indexed: false,
4090                        },
4091                    },
4092                ],
4093            ),
4094        ]
4095        "###);
4096    }
4097}