relay_server/services/
processor.rs

1use std::borrow::Cow;
2use std::collections::{BTreeSet, HashMap};
3use std::error::Error;
4use std::fmt::{Debug, Display};
5use std::future::Future;
6use std::io::Write;
7use std::pin::Pin;
8use std::sync::Arc;
9use std::time::Duration;
10
11use anyhow::Context;
12use brotli::CompressorWriter as BrotliEncoder;
13use bytes::Bytes;
14use chrono::{DateTime, Utc};
15use flate2::Compression;
16use flate2::write::{GzEncoder, ZlibEncoder};
17use futures::FutureExt;
18use futures::future::BoxFuture;
19use relay_base_schema::project::{ProjectId, ProjectKey};
20use relay_cogs::{AppFeature, Cogs, FeatureWeights, ResourceId, Token};
21use relay_common::time::UnixTimestamp;
22use relay_config::{Config, HttpEncoding, RelayMode};
23use relay_dynamic_config::{ErrorBoundary, Feature, GlobalConfig};
24use relay_event_normalization::{ClockDriftProcessor, GeoIpLookup};
25use relay_event_schema::processor::ProcessingAction;
26use relay_event_schema::protocol::{
27    ClientReport, Event, EventId, Metrics, NetworkReportError, SpanV2,
28};
29use relay_filter::FilterStatKey;
30use relay_metrics::{Bucket, BucketMetadata, BucketView, BucketsView, MetricNamespace};
31use relay_pii::PiiConfigError;
32use relay_protocol::Annotated;
33use relay_quotas::{DataCategory, Quota, RateLimits, Scoping};
34use relay_sampling::evaluation::{ReservoirCounters, ReservoirEvaluator, SamplingDecision};
35use relay_statsd::metric;
36use relay_system::{Addr, FromMessage, NoResponse, Service};
37use reqwest::header;
38use smallvec::{SmallVec, smallvec};
39use zstd::stream::Encoder as ZstdEncoder;
40
41use crate::envelope::{
42    self, AttachmentType, ContentType, Envelope, EnvelopeError, Item, ItemContainer, ItemType,
43};
44use crate::extractors::{PartialDsn, RequestMeta, RequestTrust};
45use crate::integrations::{Integration, SpansIntegration};
46use crate::managed::{InvalidProcessingGroupType, ManagedEnvelope, TypedEnvelope};
47use crate::metrics::{MetricOutcomes, MetricsLimiter, MinimalTrackableBucket};
48use crate::metrics_extraction::transactions::ExtractedMetrics;
49use crate::metrics_extraction::transactions::types::ExtractMetricsError;
50use crate::processing::check_ins::CheckInsProcessor;
51use crate::processing::logs::LogsProcessor;
52use crate::processing::sessions::SessionsProcessor;
53use crate::processing::spans::SpansProcessor;
54use crate::processing::trace_metrics::TraceMetricsProcessor;
55use crate::processing::transactions::extraction::ExtractMetricsContext;
56use crate::processing::utils::event::{
57    EventFullyNormalized, EventMetricsExtracted, FiltersStatus, SpansExtracted, event_category,
58    event_type,
59};
60use crate::processing::{Forward as _, Output, Outputs, QuotaRateLimiter};
61use crate::service::ServiceError;
62use crate::services::global_config::GlobalConfigHandle;
63use crate::services::metrics::{Aggregator, FlushBuckets, MergeBuckets, ProjectBuckets};
64use crate::services::outcome::{DiscardItemType, DiscardReason, Outcome, TrackOutcome};
65use crate::services::projects::cache::ProjectCacheHandle;
66use crate::services::projects::project::{ProjectInfo, ProjectState};
67use crate::services::upstream::{
68    SendRequest, Sign, SignatureType, UpstreamRelay, UpstreamRequest, UpstreamRequestError,
69};
70use crate::statsd::{RelayCounters, RelayDistributions, RelayTimers};
71use crate::utils::{self, CheckLimits, EnvelopeLimiter, SamplingResult};
72use crate::{http, processing};
73use relay_threading::AsyncPool;
74#[cfg(feature = "processing")]
75use {
76    crate::services::global_rate_limits::{GlobalRateLimits, GlobalRateLimitsServiceHandle},
77    crate::services::processor::nnswitch::SwitchProcessingError,
78    crate::services::store::{Store, StoreEnvelope},
79    crate::services::upload::Upload,
80    crate::utils::Enforcement,
81    itertools::Itertools,
82    relay_cardinality::{
83        CardinalityLimit, CardinalityLimiter, CardinalityLimitsSplit, RedisSetLimiter,
84        RedisSetLimiterOptions,
85    },
86    relay_dynamic_config::CardinalityLimiterMode,
87    relay_quotas::{RateLimitingError, RedisRateLimiter},
88    relay_redis::{AsyncRedisClient, RedisClients},
89    std::time::Instant,
90    symbolic_unreal::{Unreal4Error, Unreal4ErrorKind},
91};
92
93mod attachment;
94mod dynamic_sampling;
95mod event;
96mod metrics;
97mod nel;
98mod profile;
99mod profile_chunk;
100mod replay;
101mod report;
102mod span;
103
104#[cfg(all(sentry, feature = "processing"))]
105mod playstation;
106mod standalone;
107#[cfg(feature = "processing")]
108mod unreal;
109
110#[cfg(feature = "processing")]
111mod nnswitch;
112
/// Creates the block only if used with `processing` feature.
///
/// Provided code block will be executed only if the provided config has `processing_enabled` set.
macro_rules! if_processing {
    // Variant without a fallback: the entire block is compiled out when the
    // `processing` feature is disabled.
    ($config:expr, $if_true:block) => {
        #[cfg(feature = "processing")] {
            if $config.processing_enabled() $if_true
        }
    };
    // Variant with a fallback: `$if_false` runs when processing is disabled at
    // runtime, and is the only code compiled without the `processing` feature.
    ($config:expr, $if_true:block else $if_false:block) => {
        {
            #[cfg(feature = "processing")] {
                if $config.processing_enabled() $if_true else $if_false
            }
            #[cfg(not(feature = "processing"))] {
                $if_false
            }
        }
    };
}
133
/// The minimum clock drift for correction to apply.
///
/// Drifts below this threshold (55 minutes) are left uncorrected.
pub const MINIMUM_CLOCK_DRIFT: Duration = Duration::from_secs(55 * 60);
136
/// Error returned when a [`ProcessingGroup`] cannot be converted into the
/// requested concrete group marker type.
#[derive(Debug)]
pub struct GroupTypeError;

impl Display for GroupTypeError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "failed to convert processing group into corresponding type")
    }
}

impl std::error::Error for GroupTypeError {}
147
/// Generates a zero-sized marker type for a [`ProcessingGroup`] variant.
///
/// Emits the marker struct plus conversions in both directions:
/// `From<$ty> for ProcessingGroup` (always mapping to `$variant`), and a
/// fallible `TryFrom<ProcessingGroup>` which accepts `$variant` as well as any
/// additional `$other` variants listed, failing with [`GroupTypeError`]
/// otherwise.
macro_rules! processing_group {
    ($ty:ident, $variant:ident$(, $($other:ident),+)?) => {
        #[derive(Clone, Copy, Debug)]
        pub struct $ty;

        impl From<$ty> for ProcessingGroup {
            fn from(_: $ty) -> Self {
                ProcessingGroup::$variant
            }
        }

        impl TryFrom<ProcessingGroup> for $ty {
            type Error = GroupTypeError;

            fn try_from(value: ProcessingGroup) -> Result<Self, Self::Error> {
                if matches!(value, ProcessingGroup::$variant) {
                    return Ok($ty);
                }
                // Also accept any extra variants declared for this marker type.
                $($(
                    if matches!(value, ProcessingGroup::$other) {
                        return Ok($ty);
                    }
                )+)?
                return Err(GroupTypeError);
            }
        }
    };
}
176
/// A marker trait.
///
/// Should be used only with groups which are responsible for processing envelopes with events.
pub trait EventProcessing {}

processing_group!(TransactionGroup, Transaction);
impl EventProcessing for TransactionGroup {}

processing_group!(ErrorGroup, Error);
impl EventProcessing for ErrorGroup {}

processing_group!(SessionGroup, Session);
processing_group!(StandaloneGroup, Standalone);
processing_group!(ClientReportGroup, ClientReport);
processing_group!(ReplayGroup, Replay);
processing_group!(CheckInGroup, CheckIn);
// `LogGroup` also accepts the `Nel` group, since NEL reports are transformed
// into logs during processing.
processing_group!(LogGroup, Log, Nel);
processing_group!(TraceMetricGroup, TraceMetric);
processing_group!(SpanGroup, Span);

processing_group!(ProfileChunkGroup, ProfileChunk);
processing_group!(MetricsGroup, Metrics);
processing_group!(ForwardUnknownGroup, ForwardUnknown);
processing_group!(Ungrouped, Ungrouped);
201
/// Processed group type marker.
///
/// Marks the envelopes which passed through the processing pipeline.
#[derive(Clone, Copy, Debug)]
pub struct Processed;

/// Describes the groups of the processable items.
#[derive(Clone, Copy, Debug)]
pub enum ProcessingGroup {
    /// All the transaction related items.
    ///
    /// Includes transactions, related attachments, profiles.
    Transaction,
    /// All the items which require (have or create) events.
    ///
    /// This includes: errors, NEL, security reports, user reports, some of the
    /// attachments.
    Error,
    /// Session events.
    Session,
    /// Standalone items which can be sent alone without any event attached to it in the current
    /// envelope e.g. some attachments, user reports.
    Standalone,
    /// Outcomes.
    ClientReport,
    /// Replays and ReplayRecordings.
    Replay,
    /// Crons.
    CheckIn,
    /// NEL reports.
    ///
    /// Transformed into logs in their own processing step.
    Nel,
    /// Logs.
    Log,
    /// Trace metrics.
    TraceMetric,
    /// Spans.
    Span,
    /// Span V2 spans.
    SpanV2,
    /// Metrics.
    Metrics,
    /// ProfileChunk.
    ProfileChunk,
    /// Unknown item types will be forwarded upstream (to processing Relay), where we will
    /// decide what to do with them.
    ForwardUnknown,
    /// All the items in the envelope that could not be grouped.
    Ungrouped,
}
251
impl ProcessingGroup {
    /// Splits provided envelope into list of tuples of groups with associated envelopes.
    ///
    /// Items are peeled off the envelope in a fixed order (replays, sessions,
    /// span v2, spans, logs, trace metrics, NEL, metrics, profile chunks,
    /// standalone items, security reports, event-requiring items, then the
    /// remainder). Every produced envelope receives a clone of the original
    /// envelope headers.
    fn split_envelope(
        mut envelope: Envelope,
        project_info: &ProjectInfo,
    ) -> SmallVec<[(Self, Box<Envelope>); 3]> {
        let headers = envelope.headers().clone();
        let mut grouped_envelopes = smallvec![];

        // Extract replays.
        let replay_items = envelope.take_items_by(|item| {
            matches!(
                item.ty(),
                &ItemType::ReplayEvent | &ItemType::ReplayRecording | &ItemType::ReplayVideo
            )
        });
        if !replay_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Replay,
                Envelope::from_parts(headers.clone(), replay_items),
            ))
        }

        // Keep all the sessions together in one envelope.
        let session_items = envelope
            .take_items_by(|item| matches!(item.ty(), &ItemType::Session | &ItemType::Sessions));
        if !session_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Session,
                Envelope::from_parts(headers.clone(), session_items),
            ))
        }

        // Extract span v2 items. Containers are always routed here; plain spans,
        // supported span integrations, and v2 attachments only when the project
        // has the experimental span v2 processing feature enabled.
        let span_v2_items = envelope.take_items_by(|item| {
            let exp_feature = project_info.has_feature(Feature::SpanV2ExperimentalProcessing);
            let is_supported_integration = matches!(
                item.integration(),
                Some(Integration::Spans(SpansIntegration::OtelV1 { .. }))
            );
            let is_span = matches!(item.ty(), &ItemType::Span);
            let is_span_attachment = item.is_attachment_v2();

            ItemContainer::<SpanV2>::is_container(item)
                || (exp_feature && is_span)
                || (exp_feature && is_supported_integration)
                || (exp_feature && is_span_attachment)
        });

        if !span_v2_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::SpanV2,
                Envelope::from_parts(headers.clone(), span_v2_items),
            ))
        }

        // Extract spans.
        let span_items = envelope.take_items_by(|item| {
            matches!(item.ty(), &ItemType::Span)
                || matches!(item.integration(), Some(Integration::Spans(_)))
        });

        if !span_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Span,
                Envelope::from_parts(headers.clone(), span_items),
            ))
        }

        // Extract logs.
        let logs_items = envelope.take_items_by(|item| {
            matches!(item.ty(), &ItemType::Log)
                || matches!(item.integration(), Some(Integration::Logs(_)))
        });
        if !logs_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Log,
                Envelope::from_parts(headers.clone(), logs_items),
            ))
        }

        // Extract trace metrics.
        let trace_metric_items =
            envelope.take_items_by(|item| matches!(item.ty(), &ItemType::TraceMetric));
        if !trace_metric_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::TraceMetric,
                Envelope::from_parts(headers.clone(), trace_metric_items),
            ))
        }

        // NEL items are transformed into logs in their own processing step.
        let nel_items = envelope.take_items_by(|item| matches!(item.ty(), &ItemType::Nel));
        if !nel_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Nel,
                Envelope::from_parts(headers.clone(), nel_items),
            ))
        }

        // Extract all metric items.
        //
        // Note: Should only be relevant in proxy mode. In other modes we send metrics through
        // a separate pipeline.
        let metric_items = envelope.take_items_by(|i| i.ty().is_metrics());
        if !metric_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Metrics,
                Envelope::from_parts(headers.clone(), metric_items),
            ))
        }

        // Extract profile chunks.
        let profile_chunk_items =
            envelope.take_items_by(|item| matches!(item.ty(), &ItemType::ProfileChunk));
        if !profile_chunk_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::ProfileChunk,
                Envelope::from_parts(headers.clone(), profile_chunk_items),
            ))
        }

        // Extract all standalone items.
        //
        // Note: only if there are no items in the envelope which can create events, otherwise they
        // will be in the same envelope with all require event items.
        if !envelope.items().any(Item::creates_event) {
            let standalone_items = envelope.take_items_by(Item::requires_event);
            if !standalone_items.is_empty() {
                grouped_envelopes.push((
                    ProcessingGroup::Standalone,
                    Envelope::from_parts(headers.clone(), standalone_items),
                ))
            }
        };

        // Make sure we create separate envelopes for each `RawSecurity` report.
        // Each report gets its own freshly generated event id.
        let security_reports_items = envelope
            .take_items_by(|i| matches!(i.ty(), &ItemType::RawSecurity))
            .into_iter()
            .map(|item| {
                let headers = headers.clone();
                let items: SmallVec<[Item; 3]> = smallvec![item.clone()];
                let mut envelope = Envelope::from_parts(headers, items);
                envelope.set_event_id(EventId::new());
                (ProcessingGroup::Error, envelope)
            });
        grouped_envelopes.extend(security_reports_items);

        // Extract all the items which require an event into separate envelope.
        let require_event_items = envelope.take_items_by(Item::requires_event);
        if !require_event_items.is_empty() {
            // Any transaction or profile item routes the whole event envelope
            // into the transaction group; everything else goes to the error group.
            let group = if require_event_items
                .iter()
                .any(|item| matches!(item.ty(), &ItemType::Transaction | &ItemType::Profile))
            {
                ProcessingGroup::Transaction
            } else {
                ProcessingGroup::Error
            };

            grouped_envelopes.push((
                group,
                Envelope::from_parts(headers.clone(), require_event_items),
            ))
        }

        // Get the rest of the envelopes, one per item.
        let envelopes = envelope.items_mut().map(|item| {
            let headers = headers.clone();
            let items: SmallVec<[Item; 3]> = smallvec![item.clone()];
            let envelope = Envelope::from_parts(headers, items);
            // NOTE(review): the branches below mix `item_type` and `item.ty()`;
            // both are the same value, the inconsistency is purely cosmetic.
            let item_type = item.ty();
            let group = if matches!(item_type, &ItemType::CheckIn) {
                ProcessingGroup::CheckIn
            } else if matches!(item.ty(), &ItemType::ClientReport) {
                ProcessingGroup::ClientReport
            } else if matches!(item_type, &ItemType::Unknown(_)) {
                ProcessingGroup::ForwardUnknown
            } else {
                // Cannot group this item type.
                ProcessingGroup::Ungrouped
            };

            (group, envelope)
        });
        grouped_envelopes.extend(envelopes);

        grouped_envelopes
    }

    /// Returns the name of the group.
    pub fn variant(&self) -> &'static str {
        match self {
            ProcessingGroup::Transaction => "transaction",
            ProcessingGroup::Error => "error",
            ProcessingGroup::Session => "session",
            ProcessingGroup::Standalone => "standalone",
            ProcessingGroup::ClientReport => "client_report",
            ProcessingGroup::Replay => "replay",
            ProcessingGroup::CheckIn => "check_in",
            ProcessingGroup::Log => "log",
            ProcessingGroup::TraceMetric => "trace_metric",
            ProcessingGroup::Nel => "nel",
            ProcessingGroup::Span => "span",
            ProcessingGroup::SpanV2 => "span_v2",
            ProcessingGroup::Metrics => "metrics",
            ProcessingGroup::ProfileChunk => "profile_chunk",
            ProcessingGroup::ForwardUnknown => "forward_unknown",
            ProcessingGroup::Ungrouped => "ungrouped",
        }
    }
}
464
465impl From<ProcessingGroup> for AppFeature {
466    fn from(value: ProcessingGroup) -> Self {
467        match value {
468            ProcessingGroup::Transaction => AppFeature::Transactions,
469            ProcessingGroup::Error => AppFeature::Errors,
470            ProcessingGroup::Session => AppFeature::Sessions,
471            ProcessingGroup::Standalone => AppFeature::UnattributedEnvelope,
472            ProcessingGroup::ClientReport => AppFeature::ClientReports,
473            ProcessingGroup::Replay => AppFeature::Replays,
474            ProcessingGroup::CheckIn => AppFeature::CheckIns,
475            ProcessingGroup::Log => AppFeature::Logs,
476            ProcessingGroup::TraceMetric => AppFeature::TraceMetrics,
477            ProcessingGroup::Nel => AppFeature::Logs,
478            ProcessingGroup::Span => AppFeature::Spans,
479            ProcessingGroup::SpanV2 => AppFeature::Spans,
480            ProcessingGroup::Metrics => AppFeature::UnattributedMetrics,
481            ProcessingGroup::ProfileChunk => AppFeature::Profiles,
482            ProcessingGroup::ForwardUnknown => AppFeature::UnattributedEnvelope,
483            ProcessingGroup::Ungrouped => AppFeature::UnattributedEnvelope,
484        }
485    }
486}
487
/// An error returned when handling [`ProcessEnvelope`].
#[derive(Debug, thiserror::Error)]
pub enum ProcessingError {
    #[error("invalid json in event")]
    InvalidJson(#[source] serde_json::Error),

    #[error("invalid message pack event payload")]
    InvalidMsgpack(#[from] rmp_serde::decode::Error),

    #[cfg(feature = "processing")]
    #[error("invalid unreal crash report")]
    InvalidUnrealReport(#[source] Unreal4Error),

    #[error("event payload too large")]
    PayloadTooLarge(DiscardItemType),

    #[error("invalid transaction event")]
    InvalidTransaction,

    #[error("envelope processor failed")]
    ProcessingFailed(#[from] ProcessingAction),

    #[error("duplicate {0} in event")]
    DuplicateItem(ItemType),

    #[error("failed to extract event payload")]
    NoEventPayload,

    #[error("missing project id in DSN")]
    MissingProjectId,

    #[error("invalid security report type: {0:?}")]
    InvalidSecurityType(Bytes),

    #[error("unsupported security report type")]
    UnsupportedSecurityType,

    #[error("invalid security report")]
    InvalidSecurityReport(#[source] serde_json::Error),

    #[error("invalid nel report")]
    InvalidNelReport(#[source] NetworkReportError),

    #[error("event filtered with reason: {0:?}")]
    EventFiltered(FilterStatKey),

    #[error("missing or invalid required event timestamp")]
    InvalidTimestamp,

    #[error("could not serialize event payload")]
    SerializeFailed(#[source] serde_json::Error),

    #[cfg(feature = "processing")]
    #[error("failed to apply quotas")]
    QuotasFailed(#[from] RateLimitingError),

    #[error("invalid pii config")]
    PiiConfigError(PiiConfigError),

    /// The payload is boxed, presumably to keep `ProcessingError` itself small.
    #[error("invalid processing group type")]
    InvalidProcessingGroup(Box<InvalidProcessingGroupType>),

    #[error("invalid replay")]
    InvalidReplay(DiscardReason),

    #[error("replay filtered with reason: {0:?}")]
    ReplayFiltered(FilterStatKey),

    #[cfg(feature = "processing")]
    #[error("nintendo switch dying message processing failed {0:?}")]
    InvalidNintendoDyingMessage(#[source] SwitchProcessingError),

    #[cfg(all(sentry, feature = "processing"))]
    #[error("playstation dump processing failed: {0}")]
    InvalidPlaystationDump(String),

    /// A processing group did not match the processor it was dispatched to.
    #[error("processing group does not match specific processor")]
    ProcessingGroupMismatch,
    /// A failure in the new processing pipeline; outcomes for it are emitted there.
    #[error("new processing pipeline failed")]
    ProcessingFailure,
}
569
impl ProcessingError {
    /// Maps this error to the [`Outcome`] that should be emitted for it, if any.
    ///
    /// Returns `None` when no outcome should be recorded for this error, either
    /// because it does not represent data loss or because outcomes have already
    /// been emitted elsewhere (see `ProcessingFailure`).
    pub fn to_outcome(&self) -> Option<Outcome> {
        match self {
            Self::PayloadTooLarge(payload_type) => {
                Some(Outcome::Invalid(DiscardReason::TooLarge(*payload_type)))
            }
            Self::InvalidJson(_) => Some(Outcome::Invalid(DiscardReason::InvalidJson)),
            Self::InvalidMsgpack(_) => Some(Outcome::Invalid(DiscardReason::InvalidMsgpack)),
            Self::InvalidSecurityType(_) => {
                Some(Outcome::Invalid(DiscardReason::SecurityReportType))
            }
            Self::InvalidSecurityReport(_) => Some(Outcome::Invalid(DiscardReason::SecurityReport)),
            Self::UnsupportedSecurityType => Some(Outcome::Filtered(FilterStatKey::InvalidCsp)),
            Self::InvalidNelReport(_) => Some(Outcome::Invalid(DiscardReason::InvalidJson)),
            Self::InvalidTransaction => Some(Outcome::Invalid(DiscardReason::InvalidTransaction)),
            Self::InvalidTimestamp => Some(Outcome::Invalid(DiscardReason::Timestamp)),
            Self::DuplicateItem(_) => Some(Outcome::Invalid(DiscardReason::DuplicateItem)),
            Self::NoEventPayload => Some(Outcome::Invalid(DiscardReason::NoEventPayload)),
            #[cfg(feature = "processing")]
            Self::InvalidNintendoDyingMessage(_) => Some(Outcome::Invalid(DiscardReason::Payload)),
            #[cfg(all(sentry, feature = "processing"))]
            Self::InvalidPlaystationDump(_) => Some(Outcome::Invalid(DiscardReason::Payload)),
            // Bad compression is reported specifically; all other unreal errors
            // fall through to the generic `ProcessUnreal` reason below.
            #[cfg(feature = "processing")]
            Self::InvalidUnrealReport(err) if err.kind() == Unreal4ErrorKind::BadCompression => {
                Some(Outcome::Invalid(DiscardReason::InvalidCompression))
            }
            #[cfg(feature = "processing")]
            Self::InvalidUnrealReport(_) => Some(Outcome::Invalid(DiscardReason::ProcessUnreal)),
            Self::SerializeFailed(_) | Self::ProcessingFailed(_) => {
                Some(Outcome::Invalid(DiscardReason::Internal))
            }
            #[cfg(feature = "processing")]
            Self::QuotasFailed(_) => Some(Outcome::Invalid(DiscardReason::Internal)),
            Self::PiiConfigError(_) => Some(Outcome::Invalid(DiscardReason::ProjectStatePii)),
            Self::MissingProjectId => None,
            Self::EventFiltered(_) => None,
            Self::InvalidProcessingGroup(_) => None,
            Self::InvalidReplay(reason) => Some(Outcome::Invalid(*reason)),
            Self::ReplayFiltered(key) => Some(Outcome::Filtered(key.clone())),

            Self::ProcessingGroupMismatch => Some(Outcome::Invalid(DiscardReason::Internal)),
            // Outcomes are emitted in the new processing pipeline already.
            Self::ProcessingFailure => None,
        }
    }

    /// Returns `true` if the outcome mapped from this error is unexpected.
    fn is_unexpected(&self) -> bool {
        self.to_outcome()
            .is_some_and(|outcome| outcome.is_unexpected())
    }
}
621
#[cfg(feature = "processing")]
impl From<Unreal4Error> for ProcessingError {
    /// Oversized reports map to the generic "payload too large" error; every
    /// other unreal failure is preserved as the error source.
    fn from(err: Unreal4Error) -> Self {
        if err.kind() == Unreal4ErrorKind::TooLarge {
            Self::PayloadTooLarge(ItemType::UnrealReport.into())
        } else {
            Self::InvalidUnrealReport(err)
        }
    }
}
631
632impl From<ExtractMetricsError> for ProcessingError {
633    fn from(error: ExtractMetricsError) -> Self {
634        match error {
635            ExtractMetricsError::MissingTimestamp | ExtractMetricsError::InvalidTimestamp => {
636                Self::InvalidTimestamp
637            }
638        }
639    }
640}
641
642impl From<InvalidProcessingGroupType> for ProcessingError {
643    fn from(value: InvalidProcessingGroupType) -> Self {
644        Self::InvalidProcessingGroup(Box::new(value))
645    }
646}
647
/// An event extracted from an envelope, paired with a `usize` value
/// (presumably the size of the originating payload — confirm at call sites).
type ExtractedEvent = (Annotated<Event>, usize);
649
/// A container for extracted metrics during processing.
///
/// The container enforces that the extracted metrics are correctly tagged
/// with the dynamic sampling decision.
#[derive(Debug)]
pub struct ProcessingExtractedMetrics {
    // Collected project- and sampling-scoped metric buckets.
    metrics: ExtractedMetrics,
}
658
659impl ProcessingExtractedMetrics {
660    pub fn new() -> Self {
661        Self {
662            metrics: ExtractedMetrics::default(),
663        }
664    }
665
666    pub fn into_inner(self) -> ExtractedMetrics {
667        self.metrics
668    }
669
670    /// Extends the contained metrics with [`ExtractedMetrics`].
671    pub fn extend(
672        &mut self,
673        extracted: ExtractedMetrics,
674        sampling_decision: Option<SamplingDecision>,
675    ) {
676        self.extend_project_metrics(extracted.project_metrics, sampling_decision);
677        self.extend_sampling_metrics(extracted.sampling_metrics, sampling_decision);
678    }
679
680    /// Extends the contained project metrics.
681    pub fn extend_project_metrics<I>(
682        &mut self,
683        buckets: I,
684        sampling_decision: Option<SamplingDecision>,
685    ) where
686        I: IntoIterator<Item = Bucket>,
687    {
688        self.metrics
689            .project_metrics
690            .extend(buckets.into_iter().map(|mut bucket| {
691                bucket.metadata.extracted_from_indexed =
692                    sampling_decision == Some(SamplingDecision::Keep);
693                bucket
694            }));
695    }
696
697    /// Extends the contained sampling metrics.
698    pub fn extend_sampling_metrics<I>(
699        &mut self,
700        buckets: I,
701        sampling_decision: Option<SamplingDecision>,
702    ) where
703        I: IntoIterator<Item = Bucket>,
704    {
705        self.metrics
706            .sampling_metrics
707            .extend(buckets.into_iter().map(|mut bucket| {
708                bucket.metadata.extracted_from_indexed =
709                    sampling_decision == Some(SamplingDecision::Keep);
710                bucket
711            }));
712    }
713
714    /// Applies rate limits to the contained metrics.
715    ///
716    /// This is used to apply rate limits which have been enforced on sampled items of an envelope
717    /// to also consistently apply to the metrics extracted from these items.
718    #[cfg(feature = "processing")]
719    fn apply_enforcement(&mut self, enforcement: &Enforcement, enforced_consistently: bool) {
720        // Metric namespaces which need to be dropped.
721        let mut drop_namespaces: SmallVec<[_; 2]> = smallvec![];
722        // Metrics belonging to this metric namespace need to have the `extracted_from_indexed`
723        // flag reset to `false`.
724        let mut reset_extracted_from_indexed: SmallVec<[_; 2]> = smallvec![];
725
726        for (namespace, limit, indexed) in [
727            (
728                MetricNamespace::Transactions,
729                &enforcement.event,
730                &enforcement.event_indexed,
731            ),
732            (
733                MetricNamespace::Spans,
734                &enforcement.spans,
735                &enforcement.spans_indexed,
736            ),
737        ] {
738            if limit.is_active() {
739                drop_namespaces.push(namespace);
740            } else if indexed.is_active() && !enforced_consistently {
741                // If the enforcement was not computed by consistently checking the limits,
742                // the quota for the metrics has not yet been incremented.
743                // In this case we have a dropped indexed payload but a metric which still needs to
744                // be accounted for, make sure the metric will still be rate limited.
745                reset_extracted_from_indexed.push(namespace);
746            }
747        }
748
749        if !drop_namespaces.is_empty() || !reset_extracted_from_indexed.is_empty() {
750            self.retain_mut(|bucket| {
751                let Some(namespace) = bucket.name.try_namespace() else {
752                    return true;
753                };
754
755                if drop_namespaces.contains(&namespace) {
756                    return false;
757                }
758
759                if reset_extracted_from_indexed.contains(&namespace) {
760                    bucket.metadata.extracted_from_indexed = false;
761                }
762
763                true
764            });
765        }
766    }
767
768    #[cfg(feature = "processing")]
769    fn retain_mut(&mut self, mut f: impl FnMut(&mut Bucket) -> bool) {
770        self.metrics.project_metrics.retain_mut(&mut f);
771        self.metrics.sampling_metrics.retain_mut(&mut f);
772    }
773}
774
775fn send_metrics(
776    metrics: ExtractedMetrics,
777    project_key: ProjectKey,
778    sampling_key: Option<ProjectKey>,
779    aggregator: &Addr<Aggregator>,
780) {
781    let ExtractedMetrics {
782        project_metrics,
783        sampling_metrics,
784    } = metrics;
785
786    if !project_metrics.is_empty() {
787        aggregator.send(MergeBuckets {
788            project_key,
789            buckets: project_metrics,
790        });
791    }
792
793    if !sampling_metrics.is_empty() {
794        // If no sampling project state is available, we associate the sampling
795        // metrics with the current project.
796        //
797        // project_without_tracing         -> metrics goes to self
798        // dependent_project_with_tracing  -> metrics goes to root
799        // root_project_with_tracing       -> metrics goes to root == self
800        let sampling_project_key = sampling_key.unwrap_or(project_key);
801        aggregator.send(MergeBuckets {
802            project_key: sampling_project_key,
803            buckets: sampling_metrics,
804        });
805    }
806}
807
808/// Function for on-off switches that filter specific item types (profiles, spans)
809/// based on a feature flag.
810///
811/// If the project config did not come from the upstream, we keep the items.
812fn should_filter(config: &Config, project_info: &ProjectInfo, feature: Feature) -> bool {
813    match config.relay_mode() {
814        RelayMode::Proxy => false,
815        RelayMode::Managed => !project_info.has_feature(feature),
816    }
817}
818
/// The result of the envelope processing containing the processed envelope along with the partial
/// result.
#[derive(Debug)]
#[expect(
    clippy::large_enum_variant,
    reason = "until we have a better solution to combat the excessive growth of Item, see #4819"
)]
enum ProcessingResult {
    /// A processed envelope along with the metrics extracted while processing it.
    Envelope {
        /// The envelope after processing, typed as fully processed.
        managed_envelope: TypedEnvelope<Processed>,
        /// Metrics extracted from the envelope's contents during processing.
        extracted_metrics: ProcessingExtractedMetrics,
    },
    /// The output of a [`processing::Processor`] based pipeline.
    Output(Output<Outputs>),
}
833
834impl ProcessingResult {
835    /// Creates a [`ProcessingResult`] with no metrics.
836    fn no_metrics(managed_envelope: TypedEnvelope<Processed>) -> Self {
837        Self::Envelope {
838            managed_envelope,
839            extracted_metrics: ProcessingExtractedMetrics::new(),
840        }
841    }
842}
843
/// All items which can be submitted upstream.
#[derive(Debug)]
#[expect(
    clippy::large_enum_variant,
    reason = "until we have a better solution to combat the excessive growth of Item, see #4819"
)]
enum Submit<'a> {
    /// A processed envelope.
    Envelope(TypedEnvelope<Processed>),
    /// The output of a [`processing::Processor`].
    Output {
        /// The produced output to be forwarded.
        output: Outputs,
        /// Context required for forwarding the output.
        ctx: processing::ForwardContext<'a>,
    },
}
859
/// Applies processing to all contents of the given envelope.
///
/// Depending on the contents of the envelope and Relay's mode, this includes:
///
///  - Basic normalization and validation for all item types.
///  - Clock drift correction if the required `sent_at` header is present.
///  - Expansion of certain item types (e.g. unreal).
///  - Store normalization for event payloads in processing mode.
///  - Rate limiters and inbound filters on events in processing mode.
#[derive(Debug)]
pub struct ProcessEnvelope {
    /// Envelope to process.
    pub envelope: ManagedEnvelope,
    /// The project info.
    pub project_info: Arc<ProjectInfo>,
    /// Currently active cached rate limits for this project.
    pub rate_limits: Arc<RateLimits>,
    /// Root sampling project info.
    ///
    /// `None` when no sampling (root) project is known for this envelope.
    pub sampling_project_info: Option<Arc<ProjectInfo>>,
    /// Sampling reservoir counters, shared across messages via `Arc`-like handle.
    pub reservoir_counters: ReservoirCounters,
}
882
/// Like a [`ProcessEnvelope`], but with an envelope which has been grouped.
#[derive(Debug)]
struct ProcessEnvelopeGrouped<'a> {
    /// The group the envelope belongs to.
    pub group: ProcessingGroup,
    /// Envelope to process.
    ///
    /// NOTE(review): items are presumably homogeneous w.r.t. `group` after
    /// grouping — confirm with the grouping step.
    pub envelope: ManagedEnvelope,
    /// The processing context.
    pub ctx: processing::Context<'a>,
}
893
/// Parses a list of metrics or metric buckets and pushes them to the project's aggregator.
///
/// This parses and validates the metrics:
///  - For [`Metrics`](ItemType::Statsd), each metric is parsed separately, and invalid metrics are
///    ignored independently.
///  - For [`MetricBuckets`](ItemType::MetricBuckets), the entire list of buckets is parsed and
///    dropped together on parsing failure.
///  - Other envelope items will be ignored with an error message.
///
/// Additionally, processing applies clock drift correction using the system clock of this Relay, if
/// the Envelope specifies the [`sent_at`](Envelope::sent_at) header.
#[derive(Debug)]
pub struct ProcessMetrics {
    /// A list of metric items.
    pub data: MetricData,
    /// The target project.
    pub project_key: ProjectKey,
    /// Whether to keep or reset the metric metadata, see [`BucketSource`].
    pub source: BucketSource,
    /// The wall clock time at which the request was received.
    pub received_at: DateTime<Utc>,
    /// The value of the Envelope's [`sent_at`](Envelope::sent_at) header for clock drift
    /// correction.
    pub sent_at: Option<DateTime<Utc>>,
}
919
/// Raw unparsed metric data.
///
/// Converted into buckets via [`MetricData::into_buckets`].
#[derive(Debug)]
pub enum MetricData {
    /// Raw data, unparsed envelope items.
    Raw(Vec<Item>),
    /// Already parsed buckets but unprocessed.
    Parsed(Vec<Bucket>),
}
928
929impl MetricData {
930    /// Consumes the metric data and parses the contained buckets.
931    ///
932    /// If the contained data is already parsed the buckets are returned unchanged.
933    /// Raw buckets are parsed and created with the passed `timestamp`.
934    fn into_buckets(self, timestamp: UnixTimestamp) -> Vec<Bucket> {
935        let items = match self {
936            Self::Parsed(buckets) => return buckets,
937            Self::Raw(items) => items,
938        };
939
940        let mut buckets = Vec::new();
941        for item in items {
942            let payload = item.payload();
943            if item.ty() == &ItemType::Statsd {
944                for bucket_result in Bucket::parse_all(&payload, timestamp) {
945                    match bucket_result {
946                        Ok(bucket) => buckets.push(bucket),
947                        Err(error) => relay_log::debug!(
948                            error = &error as &dyn Error,
949                            "failed to parse metric bucket from statsd format",
950                        ),
951                    }
952                }
953            } else if item.ty() == &ItemType::MetricBuckets {
954                match serde_json::from_slice::<Vec<Bucket>>(&payload) {
955                    Ok(parsed_buckets) => {
956                        // Re-use the allocation of `b` if possible.
957                        if buckets.is_empty() {
958                            buckets = parsed_buckets;
959                        } else {
960                            buckets.extend(parsed_buckets);
961                        }
962                    }
963                    Err(error) => {
964                        relay_log::debug!(
965                            error = &error as &dyn Error,
966                            "failed to parse metric bucket",
967                        );
968                        metric!(counter(RelayCounters::MetricBucketsParsingFailed) += 1);
969                    }
970                }
971            } else {
972                relay_log::error!(
973                    "invalid item of type {} passed to ProcessMetrics",
974                    item.ty()
975                );
976            }
977        }
978        buckets
979    }
980}
981
/// Parses a batched metrics payload and pushes the buckets to the aggregator.
#[derive(Debug)]
pub struct ProcessBatchedMetrics {
    /// Metrics payload in JSON format.
    pub payload: Bytes,
    /// Whether to keep or reset the metric metadata.
    pub source: BucketSource,
    /// The wall clock time at which the request was received.
    pub received_at: DateTime<Utc>,
    /// The value of the `sent_at` header for clock drift correction.
    pub sent_at: Option<DateTime<Utc>>,
}
993
/// Source information where a metric bucket originates from.
///
/// Derived from the request's trust level, see [`BucketSource::from_meta`].
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum BucketSource {
    /// The metric bucket originated from an internal Relay use case.
    ///
    /// The metric bucket originates either from within the same Relay
    /// or was accepted coming from another Relay which is registered as
    /// an internal Relay via Relay's configuration.
    Internal,
    /// The bucket source originated from an untrusted source.
    ///
    /// Managed Relays sending extracted metrics are considered external,
    /// it's a project use case but it comes from an untrusted source.
    External,
}
1009
1010impl BucketSource {
1011    /// Infers the bucket source from [`RequestMeta::request_trust`].
1012    pub fn from_meta(meta: &RequestMeta) -> Self {
1013        match meta.request_trust() {
1014            RequestTrust::Trusted => Self::Internal,
1015            RequestTrust::Untrusted => Self::External,
1016        }
1017    }
1018}
1019
/// Sends a client report to the upstream.
#[derive(Debug)]
pub struct SubmitClientReports {
    /// The client reports to be sent.
    pub client_reports: Vec<ClientReport>,
    /// Scoping information for the client reports.
    pub scoping: Scoping,
}
1028
/// CPU-intensive processing tasks for envelopes.
///
/// Each variant wraps one of the processor's message types; payloads are boxed
/// to keep the enum small (see the corresponding [`FromMessage`] impls).
#[derive(Debug)]
pub enum EnvelopeProcessor {
    /// See [`ProcessEnvelope`].
    ProcessEnvelope(Box<ProcessEnvelope>),
    /// See [`ProcessMetrics`].
    ProcessProjectMetrics(Box<ProcessMetrics>),
    /// See [`ProcessBatchedMetrics`].
    ProcessBatchedMetrics(Box<ProcessBatchedMetrics>),
    /// See [`FlushBuckets`].
    FlushBuckets(Box<FlushBuckets>),
    /// See [`SubmitClientReports`].
    SubmitClientReports(Box<SubmitClientReports>),
}
1038
1039impl EnvelopeProcessor {
1040    /// Returns the name of the message variant.
1041    pub fn variant(&self) -> &'static str {
1042        match self {
1043            EnvelopeProcessor::ProcessEnvelope(_) => "ProcessEnvelope",
1044            EnvelopeProcessor::ProcessProjectMetrics(_) => "ProcessProjectMetrics",
1045            EnvelopeProcessor::ProcessBatchedMetrics(_) => "ProcessBatchedMetrics",
1046            EnvelopeProcessor::FlushBuckets(_) => "FlushBuckets",
1047            EnvelopeProcessor::SubmitClientReports(_) => "SubmitClientReports",
1048        }
1049    }
1050}
1051
// Marker impl: `EnvelopeProcessor` is the message interface implemented by
// `EnvelopeProcessorService`.
impl relay_system::Interface for EnvelopeProcessor {}
1053
1054impl FromMessage<ProcessEnvelope> for EnvelopeProcessor {
1055    type Response = relay_system::NoResponse;
1056
1057    fn from_message(message: ProcessEnvelope, _sender: ()) -> Self {
1058        Self::ProcessEnvelope(Box::new(message))
1059    }
1060}
1061
1062impl FromMessage<ProcessMetrics> for EnvelopeProcessor {
1063    type Response = NoResponse;
1064
1065    fn from_message(message: ProcessMetrics, _: ()) -> Self {
1066        Self::ProcessProjectMetrics(Box::new(message))
1067    }
1068}
1069
1070impl FromMessage<ProcessBatchedMetrics> for EnvelopeProcessor {
1071    type Response = NoResponse;
1072
1073    fn from_message(message: ProcessBatchedMetrics, _: ()) -> Self {
1074        Self::ProcessBatchedMetrics(Box::new(message))
1075    }
1076}
1077
1078impl FromMessage<FlushBuckets> for EnvelopeProcessor {
1079    type Response = NoResponse;
1080
1081    fn from_message(message: FlushBuckets, _: ()) -> Self {
1082        Self::FlushBuckets(Box::new(message))
1083    }
1084}
1085
1086impl FromMessage<SubmitClientReports> for EnvelopeProcessor {
1087    type Response = NoResponse;
1088
1089    fn from_message(message: SubmitClientReports, _: ()) -> Self {
1090        Self::SubmitClientReports(Box::new(message))
1091    }
1092}
1093
/// The asynchronous thread pool used for scheduling processing tasks in the processor.
///
/// Tasks are `'static` boxed futures with no output.
pub type EnvelopeProcessorServicePool = AsyncPool<BoxFuture<'static, ()>>;
1096
/// Service implementing the [`EnvelopeProcessor`] interface.
///
/// This service handles messages in a worker pool with configurable concurrency.
#[derive(Clone)]
pub struct EnvelopeProcessorService {
    // Shared state behind an `Arc`, making `Clone` a cheap refcount bump.
    inner: Arc<InnerProcessor>,
}
1104
/// Contains the addresses of services that the processor publishes to.
pub struct Addrs {
    /// Address of the service receiving [`TrackOutcome`] messages.
    pub outcome_aggregator: Addr<TrackOutcome>,
    /// Address of the upstream relay service.
    pub upstream_relay: Addr<UpstreamRelay>,
    /// Address of the upload service, if one is configured.
    #[cfg(feature = "processing")]
    pub upload: Option<Addr<Upload>>,
    /// Address of the store service ([`Store`]), if one is configured.
    #[cfg(feature = "processing")]
    pub store_forwarder: Option<Addr<Store>>,
    /// Address of the metrics aggregator service.
    pub aggregator: Addr<Aggregator>,
    /// Address of the global rate limits service, if one is configured.
    #[cfg(feature = "processing")]
    pub global_rate_limits: Option<Addr<GlobalRateLimits>>,
}
1117
1118impl Default for Addrs {
1119    fn default() -> Self {
1120        Addrs {
1121            outcome_aggregator: Addr::dummy(),
1122            upstream_relay: Addr::dummy(),
1123            #[cfg(feature = "processing")]
1124            upload: None,
1125            #[cfg(feature = "processing")]
1126            store_forwarder: None,
1127            aggregator: Addr::dummy(),
1128            #[cfg(feature = "processing")]
1129            global_rate_limits: None,
1130        }
1131    }
1132}
1133
/// Shared state of the [`EnvelopeProcessorService`].
struct InnerProcessor {
    /// Pool executing the processor's asynchronous tasks.
    pool: EnvelopeProcessorServicePool,
    /// Relay's static configuration.
    config: Arc<Config>,
    /// Handle to the current global configuration.
    global_config: GlobalConfigHandle,
    /// Handle to the project cache.
    project_cache: ProjectCacheHandle,
    /// COGS measurement sink.
    cogs: Cogs,
    /// Redis client used for quotas (e.g. reservoir sampling counters).
    #[cfg(feature = "processing")]
    quotas_client: Option<AsyncRedisClient>,
    /// Addresses of downstream services.
    addrs: Addrs,
    /// Redis-backed rate limiter for consistent quota enforcement.
    #[cfg(feature = "processing")]
    rate_limiter: Option<Arc<RedisRateLimiter<GlobalRateLimitsServiceHandle>>>,
    /// GeoIP lookup, empty when no database is configured.
    geoip_lookup: GeoIpLookup,
    /// Redis-backed cardinality limiter.
    #[cfg(feature = "processing")]
    cardinality_limiter: Option<CardinalityLimiter>,
    /// Sink for metric outcomes.
    metric_outcomes: MetricOutcomes,
    /// Item-type specific processors, see [`Processing`].
    processing: Processing,
}
1151
/// Item-type specific processors used by the [`EnvelopeProcessorService`].
struct Processing {
    /// Processor for log items.
    logs: LogsProcessor,
    /// Processor for trace metric items.
    trace_metrics: TraceMetricsProcessor,
    /// Processor for span items.
    spans: SpansProcessor,
    /// Processor for check-in (cron monitor) items.
    check_ins: CheckInsProcessor,
    /// Processor for session items.
    sessions: SessionsProcessor,
}
1159
1160impl EnvelopeProcessorService {
    /// Creates a multi-threaded envelope processor.
    ///
    /// Processing-only components (Redis rate limiter, cardinality limiter,
    /// quotas client) are constructed only when the corresponding Redis
    /// clients are provided and the `processing` feature is enabled.
    #[cfg_attr(feature = "processing", expect(clippy::too_many_arguments))]
    pub fn new(
        pool: EnvelopeProcessorServicePool,
        config: Arc<Config>,
        global_config: GlobalConfigHandle,
        project_cache: ProjectCacheHandle,
        cogs: Cogs,
        #[cfg(feature = "processing")] redis: Option<RedisClients>,
        addrs: Addrs,
        metric_outcomes: MetricOutcomes,
    ) -> Self {
        // Failing to open the GeoIP database is logged but not fatal: the
        // processor falls back to an empty lookup.
        let geoip_lookup = config
            .geoip_path()
            .and_then(
                |p| match GeoIpLookup::open(p).context(ServiceError::GeoIp) {
                    Ok(geoip) => Some(geoip),
                    Err(err) => {
                        relay_log::error!("failed to open GeoIP db {p:?}: {err:?}");
                        None
                    }
                },
            )
            .unwrap_or_else(GeoIpLookup::empty);

        // Split the Redis clients into their cardinality and quotas roles.
        #[cfg(feature = "processing")]
        let (cardinality, quotas) = match redis {
            Some(RedisClients {
                cardinality,
                quotas,
                ..
            }) => (Some(cardinality), Some(quotas)),
            None => (None, None),
        };

        #[cfg(feature = "processing")]
        let global_rate_limits = addrs.global_rate_limits.clone().map(Into::into);

        // The consistent rate limiter requires both a quotas Redis client and
        // the global rate limits service.
        #[cfg(feature = "processing")]
        let rate_limiter = match (quotas.clone(), global_rate_limits) {
            (Some(redis), Some(global)) => Some(
                RedisRateLimiter::new(redis, global)
                    .max_limit(config.max_rate_limit())
                    .cache(config.quota_cache_ratio(), config.quota_cache_max()),
            ),
            _ => None,
        };

        let quota_limiter = Arc::new(QuotaRateLimiter::new(
            #[cfg(feature = "processing")]
            project_cache.clone(),
            #[cfg(feature = "processing")]
            rate_limiter.clone(),
        ));
        // Wrap in `Arc` only after the quota limiter captured its own clone.
        #[cfg(feature = "processing")]
        let rate_limiter = rate_limiter.map(Arc::new);

        let inner = InnerProcessor {
            pool,
            global_config,
            project_cache,
            cogs,
            #[cfg(feature = "processing")]
            quotas_client: quotas.clone(),
            #[cfg(feature = "processing")]
            rate_limiter,
            addrs,
            #[cfg(feature = "processing")]
            cardinality_limiter: cardinality
                .map(|cardinality| {
                    RedisSetLimiter::new(
                        RedisSetLimiterOptions {
                            cache_vacuum_interval: config
                                .cardinality_limiter_cache_vacuum_interval(),
                        },
                        cardinality,
                    )
                })
                .map(CardinalityLimiter::new),
            metric_outcomes,
            // All item-type processors share the same quota limiter.
            processing: Processing {
                logs: LogsProcessor::new(Arc::clone(&quota_limiter)),
                trace_metrics: TraceMetricsProcessor::new(Arc::clone(&quota_limiter)),
                spans: SpansProcessor::new(Arc::clone(&quota_limiter), geoip_lookup.clone()),
                check_ins: CheckInsProcessor::new(Arc::clone(&quota_limiter)),
                sessions: SessionsProcessor::new(quota_limiter),
            },
            geoip_lookup,
            config,
        };

        Self {
            inner: Arc::new(inner),
        }
    }
1256
    /// Enforces rate limits and quotas on the envelope and the given `event`.
    ///
    /// Cached quotas are always applied first. On processing Relays with a
    /// configured Redis rate limiter, all quotas are additionally enforced
    /// consistently via Redis, and freshly computed limits are merged back
    /// into the project's cached rate limits.
    ///
    /// Returns the (possibly dropped/limited) event.
    async fn enforce_quotas<Group>(
        &self,
        managed_envelope: &mut TypedEnvelope<Group>,
        event: Annotated<Event>,
        extracted_metrics: &mut ProcessingExtractedMetrics,
        ctx: processing::Context<'_>,
    ) -> Result<Annotated<Event>, ProcessingError> {
        // Cached quotas first, they are quick to evaluate and some quotas (indexed) are not
        // applied in the fast path, all cached quotas can be applied here.
        let cached_result = RateLimiter::Cached
            .enforce(managed_envelope, event, extracted_metrics, ctx)
            .await?;

        if_processing!(self.inner.config, {
            // Without a Redis rate limiter, the cached result is final.
            let rate_limiter = match self.inner.rate_limiter.clone() {
                Some(rate_limiter) => rate_limiter,
                None => return Ok(cached_result.event),
            };

            // Enforce all quotas consistently with Redis.
            let consistent_result = RateLimiter::Consistent(rate_limiter)
                .enforce(
                    managed_envelope,
                    cached_result.event,
                    extracted_metrics,
                    ctx
                )
                .await?;

            // Update cached rate limits with the freshly computed ones.
            if !consistent_result.rate_limits.is_empty() {
                self.inner
                    .project_cache
                    .get(managed_envelope.scoping().project_key)
                    .rate_limits()
                    .merge(consistent_result.rate_limits);
            }

            Ok(consistent_result.event)
        } else { Ok(cached_result.event) })
    }
1298
    /// Processes the general errors, and the items which require or create the events.
    ///
    /// Pipeline order matters: crash-report expansion (processing only), event
    /// extraction, normalization, inbound filters, dynamic-sampling tagging,
    /// quota enforcement, then PII scrubbing and serialization. Returns the
    /// metrics extracted along the way.
    async fn process_errors(
        &self,
        managed_envelope: &mut TypedEnvelope<ErrorGroup>,
        project_id: ProjectId,
        mut ctx: processing::Context<'_>,
    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
        let mut event_fully_normalized = EventFullyNormalized::new(managed_envelope.envelope());
        let mut metrics = Metrics::default();
        let mut extracted_metrics = ProcessingExtractedMetrics::new();

        // Events can also contain user reports.
        report::process_user_reports(managed_envelope);

        // Expand platform-specific crash report formats into standard items
        // before event extraction (processing Relays only).
        if_processing!(self.inner.config, {
            unreal::expand(managed_envelope, &self.inner.config)?;
            #[cfg(sentry)]
            playstation::expand(managed_envelope, &self.inner.config, ctx.project_info)?;
            nnswitch::expand(managed_envelope)?;
        });

        let extraction_result = event::extract(
            managed_envelope,
            &mut metrics,
            event_fully_normalized,
            &self.inner.config,
        )?;
        let mut event = extraction_result.event;

        if_processing!(self.inner.config, {
            if let Some(inner_event_fully_normalized) =
                unreal::process(managed_envelope, &mut event)?
            {
                event_fully_normalized = inner_event_fully_normalized;
            }
            #[cfg(sentry)]
            if let Some(inner_event_fully_normalized) =
                playstation::process(managed_envelope, &mut event, ctx.project_info)?
            {
                event_fully_normalized = inner_event_fully_normalized;
            }
            if let Some(inner_event_fully_normalized) =
                attachment::create_placeholders(managed_envelope, &mut event, &mut metrics)
            {
                event_fully_normalized = inner_event_fully_normalized;
            }
        });

        ctx.sampling_project_info = dynamic_sampling::validate_and_set_dsc(
            managed_envelope,
            &mut event,
            ctx.project_info,
            ctx.sampling_project_info,
        );

        let attachments = managed_envelope
            .envelope()
            .items()
            .filter(|item| item.attachment_type() == Some(&AttachmentType::Attachment));
        processing::utils::event::finalize(
            managed_envelope.envelope().headers(),
            &mut event,
            attachments,
            &mut metrics,
            ctx.config,
        )?;
        event_fully_normalized = processing::utils::event::normalize(
            managed_envelope.envelope().headers(),
            &mut event,
            event_fully_normalized,
            project_id,
            ctx,
            &self.inner.geoip_lookup,
        )?;
        // Inbound filters: a filtered event rejects the whole envelope.
        let filter_run =
            processing::utils::event::filter(managed_envelope.envelope().headers(), &event, &ctx)
                .map_err(|err| {
                managed_envelope.reject(Outcome::Filtered(err.clone()));
                ProcessingError::EventFiltered(err)
            })?;

        if self.inner.config.processing_enabled() || matches!(filter_run, FiltersStatus::Ok) {
            dynamic_sampling::tag_error_with_sampling_decision(
                managed_envelope,
                &mut event,
                ctx.sampling_project_info,
                &self.inner.config,
            )
            .await;
        }

        event = self
            .enforce_quotas(managed_envelope, event, &mut extracted_metrics, ctx)
            .await?;

        // Only scrub and serialize if quota enforcement kept the event.
        if event.value().is_some() {
            processing::utils::event::scrub(&mut event, ctx.project_info)?;
            event::serialize(
                managed_envelope,
                &mut event,
                event_fully_normalized,
                EventMetricsExtracted(false),
                SpansExtracted(false),
            )?;
            event::emit_feedback_metrics(managed_envelope.envelope());
        }

        let attachments = managed_envelope
            .envelope_mut()
            .items_mut()
            .filter(|i| i.ty() == &ItemType::Attachment);
        processing::utils::attachments::scrub(attachments, ctx.project_info);

        // Invariant check: processing Relays must never forward events that
        // were not fully normalized.
        if self.inner.config.processing_enabled() && !event_fully_normalized.0 {
            relay_log::error!(
                tags.project = %project_id,
                tags.ty = event_type(&event).map(|e| e.to_string()).unwrap_or("none".to_owned()),
                "ingested event without normalizing"
            );
        }

        Ok(Some(extracted_metrics))
    }
1422
1423    /// Processes only transactions and transaction-related items.
1424    #[allow(unused_assignments)]
1425    async fn process_transactions(
1426        &self,
1427        managed_envelope: &mut TypedEnvelope<TransactionGroup>,
1428        cogs: &mut Token,
1429        project_id: ProjectId,
1430        mut ctx: processing::Context<'_>,
1431    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
1432        let mut event_fully_normalized = EventFullyNormalized::new(managed_envelope.envelope());
1433        let mut event_metrics_extracted = EventMetricsExtracted(false);
1434        let mut spans_extracted = SpansExtracted(false);
1435        let mut metrics = Metrics::default();
1436        let mut extracted_metrics = ProcessingExtractedMetrics::new();
1437
1438        // We extract the main event from the envelope.
1439        let extraction_result = event::extract(
1440            managed_envelope,
1441            &mut metrics,
1442            event_fully_normalized,
1443            &self.inner.config,
1444        )?;
1445
1446        // If metrics were extracted we mark that.
1447        if let Some(inner_event_metrics_extracted) = extraction_result.event_metrics_extracted {
1448            event_metrics_extracted = inner_event_metrics_extracted;
1449        }
1450        if let Some(inner_spans_extracted) = extraction_result.spans_extracted {
1451            spans_extracted = inner_spans_extracted;
1452        };
1453
1454        // We take the main event out of the result.
1455        let mut event = extraction_result.event;
1456
1457        let profile_id = profile::filter(
1458            managed_envelope,
1459            &event,
1460            ctx.config,
1461            project_id,
1462            ctx.project_info,
1463        );
1464        processing::transactions::profile::transfer_id(&mut event, profile_id);
1465        processing::transactions::profile::remove_context_if_rate_limited(
1466            &mut event,
1467            managed_envelope.scoping(),
1468            ctx,
1469        );
1470
1471        ctx.sampling_project_info = dynamic_sampling::validate_and_set_dsc(
1472            managed_envelope,
1473            &mut event,
1474            ctx.project_info,
1475            ctx.sampling_project_info,
1476        );
1477
1478        let attachments = managed_envelope
1479            .envelope()
1480            .items()
1481            .filter(|item| item.attachment_type() == Some(&AttachmentType::Attachment));
1482        processing::utils::event::finalize(
1483            managed_envelope.envelope().headers(),
1484            &mut event,
1485            attachments,
1486            &mut metrics,
1487            &self.inner.config,
1488        )?;
1489
1490        event_fully_normalized = processing::utils::event::normalize(
1491            managed_envelope.envelope().headers(),
1492            &mut event,
1493            event_fully_normalized,
1494            project_id,
1495            ctx,
1496            &self.inner.geoip_lookup,
1497        )?;
1498
1499        let filter_run =
1500            processing::utils::event::filter(managed_envelope.envelope().headers(), &event, &ctx)
1501                .map_err(|err| {
1502                managed_envelope.reject(Outcome::Filtered(err.clone()));
1503                ProcessingError::EventFiltered(err)
1504            })?;
1505
1506        // Always run dynamic sampling on processing Relays,
1507        // but delay decision until inbound filters have been fully processed.
1508        // Also, we require transaction metrics to be enabled before sampling.
1509        let run_dynamic_sampling = (matches!(filter_run, FiltersStatus::Ok)
1510            || self.inner.config.processing_enabled())
1511            && matches!(&ctx.project_info.config.transaction_metrics, Some(ErrorBoundary::Ok(c)) if c.is_enabled());
1512
1513        let sampling_result = match run_dynamic_sampling {
1514            true => {
1515                #[allow(unused_mut)]
1516                let mut reservoir = ReservoirEvaluator::new(Arc::clone(ctx.reservoir_counters));
1517                #[cfg(feature = "processing")]
1518                if let Some(quotas_client) = self.inner.quotas_client.as_ref() {
1519                    reservoir.set_redis(managed_envelope.scoping().organization_id, quotas_client);
1520                }
1521                processing::utils::dynamic_sampling::run(
1522                    managed_envelope.envelope().headers().dsc(),
1523                    &event,
1524                    &ctx,
1525                    Some(&reservoir),
1526                )
1527                .await
1528            }
1529            false => SamplingResult::Pending,
1530        };
1531
1532        relay_statsd::metric!(
1533            counter(RelayCounters::SamplingDecision) += 1,
1534            decision = sampling_result.decision().as_str(),
1535            item = "transaction"
1536        );
1537
1538        #[cfg(feature = "processing")]
1539        let server_sample_rate = sampling_result.sample_rate();
1540
1541        if let Some(outcome) = sampling_result.into_dropped_outcome() {
1542            // Process profiles before dropping the transaction, if necessary.
1543            // Before metric extraction to make sure the profile count is reflected correctly.
1544            profile::process(
1545                managed_envelope,
1546                &mut event,
1547                ctx.global_config,
1548                ctx.config,
1549                ctx.project_info,
1550            );
1551            // Extract metrics here, we're about to drop the event/transaction.
1552            event_metrics_extracted = processing::transactions::extraction::extract_metrics(
1553                &mut event,
1554                &mut extracted_metrics,
1555                ExtractMetricsContext {
1556                    dsc: managed_envelope.envelope().dsc(),
1557                    project_id,
1558                    ctx,
1559                    sampling_decision: SamplingDecision::Drop,
1560                    metrics_extracted: event_metrics_extracted.0,
1561                    spans_extracted: spans_extracted.0,
1562                },
1563            )?;
1564
1565            dynamic_sampling::drop_unsampled_items(
1566                managed_envelope,
1567                event,
1568                outcome,
1569                spans_extracted,
1570            );
1571
1572            // At this point we have:
1573            //  - An empty envelope.
1574            //  - An envelope containing only processed profiles.
1575            // We need to make sure there are enough quotas for these profiles.
1576            event = self
1577                .enforce_quotas(
1578                    managed_envelope,
1579                    Annotated::empty(),
1580                    &mut extracted_metrics,
1581                    ctx,
1582                )
1583                .await?;
1584
1585            return Ok(Some(extracted_metrics));
1586        }
1587
1588        let _post_ds = cogs.start_category("post_ds");
1589
1590        // Need to scrub the transaction before extracting spans.
1591        //
1592        // Unconditionally scrub to make sure PII is removed as early as possible.
1593        processing::utils::event::scrub(&mut event, ctx.project_info)?;
1594
1595        let attachments = managed_envelope
1596            .envelope_mut()
1597            .items_mut()
1598            .filter(|i| i.ty() == &ItemType::Attachment);
1599        processing::utils::attachments::scrub(attachments, ctx.project_info);
1600
1601        if_processing!(self.inner.config, {
1602            // Process profiles before extracting metrics, to make sure they are removed if they are invalid.
1603            let profile_id = profile::process(
1604                managed_envelope,
1605                &mut event,
1606                ctx.global_config,
1607                ctx.config,
1608                ctx.project_info,
1609            );
1610            processing::transactions::profile::transfer_id(&mut event, profile_id);
1611            processing::transactions::profile::scrub_profiler_id(&mut event);
1612
1613            // Always extract metrics in processing Relays for sampled items.
1614            event_metrics_extracted = processing::transactions::extraction::extract_metrics(
1615                &mut event,
1616                &mut extracted_metrics,
1617                ExtractMetricsContext {
1618                    dsc: managed_envelope.envelope().dsc(),
1619                    project_id,
1620                    ctx,
1621                    sampling_decision: SamplingDecision::Keep,
1622                    metrics_extracted: event_metrics_extracted.0,
1623                    spans_extracted: spans_extracted.0,
1624                },
1625            )?;
1626
1627            if let Some(spans) = processing::transactions::spans::extract_from_event(
1628                managed_envelope.envelope().dsc(),
1629                &event,
1630                ctx.global_config,
1631                ctx.config,
1632                server_sample_rate,
1633                event_metrics_extracted,
1634                spans_extracted,
1635            ) {
1636                spans_extracted = SpansExtracted(true);
1637                for item in spans {
1638                    match item {
1639                        Ok(item) => managed_envelope.envelope_mut().add_item(item),
1640                        Err(()) => managed_envelope.track_outcome(
1641                            Outcome::Invalid(DiscardReason::InvalidSpan),
1642                            DataCategory::SpanIndexed,
1643                            1,
1644                        ),
1645                        // TODO: also `DataCategory::Span`?
1646                    }
1647                }
1648            }
1649        });
1650
1651        event = self
1652            .enforce_quotas(managed_envelope, event, &mut extracted_metrics, ctx)
1653            .await?;
1654
1655        // Event may have been dropped because of a quota and the envelope can be empty.
1656        if event.value().is_some() {
1657            event::serialize(
1658                managed_envelope,
1659                &mut event,
1660                event_fully_normalized,
1661                event_metrics_extracted,
1662                spans_extracted,
1663            )?;
1664        }
1665
1666        if self.inner.config.processing_enabled() && !event_fully_normalized.0 {
1667            relay_log::error!(
1668                tags.project = %project_id,
1669                tags.ty = event_type(&event).map(|e| e.to_string()).unwrap_or("none".to_owned()),
1670                "ingested event without normalizing"
1671            );
1672        };
1673
1674        Ok(Some(extracted_metrics))
1675    }
1676
1677    async fn process_profile_chunks(
1678        &self,
1679        managed_envelope: &mut TypedEnvelope<ProfileChunkGroup>,
1680        ctx: processing::Context<'_>,
1681    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
1682        profile_chunk::filter(managed_envelope, ctx.project_info);
1683
1684        if_processing!(self.inner.config, {
1685            profile_chunk::process(
1686                managed_envelope,
1687                ctx.project_info,
1688                ctx.global_config,
1689                ctx.config,
1690            );
1691        });
1692
1693        self.enforce_quotas(
1694            managed_envelope,
1695            Annotated::empty(),
1696            &mut ProcessingExtractedMetrics::new(),
1697            ctx,
1698        )
1699        .await?;
1700
1701        Ok(None)
1702    }
1703
1704    /// Processes standalone items that require an event ID, but do not have an event on the same envelope.
1705    async fn process_standalone(
1706        &self,
1707        managed_envelope: &mut TypedEnvelope<StandaloneGroup>,
1708        project_id: ProjectId,
1709        ctx: processing::Context<'_>,
1710    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
1711        let mut extracted_metrics = ProcessingExtractedMetrics::new();
1712
1713        standalone::process(managed_envelope);
1714
1715        profile::filter(
1716            managed_envelope,
1717            &Annotated::empty(),
1718            ctx.config,
1719            project_id,
1720            ctx.project_info,
1721        );
1722
1723        self.enforce_quotas(
1724            managed_envelope,
1725            Annotated::empty(),
1726            &mut extracted_metrics,
1727            ctx,
1728        )
1729        .await?;
1730
1731        report::process_user_reports(managed_envelope);
1732        let attachments = managed_envelope
1733            .envelope_mut()
1734            .items_mut()
1735            .filter(|i| i.ty() == &ItemType::Attachment);
1736        processing::utils::attachments::scrub(attachments, ctx.project_info);
1737
1738        Ok(Some(extracted_metrics))
1739    }
1740
1741    /// Processes user and client reports.
1742    async fn process_client_reports(
1743        &self,
1744        managed_envelope: &mut TypedEnvelope<ClientReportGroup>,
1745        ctx: processing::Context<'_>,
1746    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
1747        let mut extracted_metrics = ProcessingExtractedMetrics::new();
1748
1749        self.enforce_quotas(
1750            managed_envelope,
1751            Annotated::empty(),
1752            &mut extracted_metrics,
1753            ctx,
1754        )
1755        .await?;
1756
1757        report::process_client_reports(
1758            managed_envelope,
1759            ctx.config,
1760            ctx.project_info,
1761            self.inner.addrs.outcome_aggregator.clone(),
1762        );
1763
1764        Ok(Some(extracted_metrics))
1765    }
1766
1767    /// Processes replays.
1768    async fn process_replays(
1769        &self,
1770        managed_envelope: &mut TypedEnvelope<ReplayGroup>,
1771        ctx: processing::Context<'_>,
1772    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
1773        let mut extracted_metrics = ProcessingExtractedMetrics::new();
1774
1775        replay::process(
1776            managed_envelope,
1777            ctx.global_config,
1778            ctx.config,
1779            ctx.project_info,
1780            &self.inner.geoip_lookup,
1781        )?;
1782
1783        self.enforce_quotas(
1784            managed_envelope,
1785            Annotated::empty(),
1786            &mut extracted_metrics,
1787            ctx,
1788        )
1789        .await?;
1790
1791        Ok(Some(extracted_metrics))
1792    }
1793
1794    async fn process_nel(
1795        &self,
1796        mut managed_envelope: ManagedEnvelope,
1797        ctx: processing::Context<'_>,
1798    ) -> Result<ProcessingResult, ProcessingError> {
1799        nel::convert_to_logs(&mut managed_envelope);
1800        self.process_with_processor(&self.inner.processing.logs, managed_envelope, ctx)
1801            .await
1802    }
1803
1804    async fn process_with_processor<P: processing::Processor>(
1805        &self,
1806        processor: &P,
1807        mut managed_envelope: ManagedEnvelope,
1808        ctx: processing::Context<'_>,
1809    ) -> Result<ProcessingResult, ProcessingError>
1810    where
1811        Outputs: From<P::Output>,
1812    {
1813        let Some(work) = processor.prepare_envelope(&mut managed_envelope) else {
1814            debug_assert!(
1815                false,
1816                "there must be work for the {} processor",
1817                std::any::type_name::<P>(),
1818            );
1819            return Err(ProcessingError::ProcessingGroupMismatch);
1820        };
1821
1822        managed_envelope.update();
1823        match managed_envelope.envelope().is_empty() {
1824            true => managed_envelope.accept(),
1825            false => managed_envelope.reject(Outcome::Invalid(DiscardReason::Internal)),
1826        }
1827
1828        processor
1829            .process(work, ctx)
1830            .await
1831            .map_err(|err| {
1832                relay_log::debug!(
1833                    error = &err as &dyn std::error::Error,
1834                    "processing pipeline failed"
1835                );
1836                ProcessingError::ProcessingFailure
1837            })
1838            .map(|o| o.map(Into::into))
1839            .map(ProcessingResult::Output)
1840    }
1841
1842    /// Processes standalone spans.
1843    ///
1844    /// This function does *not* run for spans extracted from transactions.
1845    async fn process_standalone_spans(
1846        &self,
1847        managed_envelope: &mut TypedEnvelope<SpanGroup>,
1848        _project_id: ProjectId,
1849        ctx: processing::Context<'_>,
1850    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
1851        let mut extracted_metrics = ProcessingExtractedMetrics::new();
1852
1853        span::filter(managed_envelope, ctx.config, ctx.project_info);
1854        span::convert_otel_traces_data(managed_envelope);
1855
1856        if_processing!(self.inner.config, {
1857            span::process(
1858                managed_envelope,
1859                &mut Annotated::empty(),
1860                &mut extracted_metrics,
1861                _project_id,
1862                ctx,
1863                &self.inner.geoip_lookup,
1864            )
1865            .await;
1866        });
1867
1868        self.enforce_quotas(
1869            managed_envelope,
1870            Annotated::empty(),
1871            &mut extracted_metrics,
1872            ctx,
1873        )
1874        .await?;
1875
1876        Ok(Some(extracted_metrics))
1877    }
1878
    /// Dispatches the envelope to the processing function for its [`ProcessingGroup`].
    ///
    /// Before dispatching, envelope headers are pre-processed: the DSC transaction name
    /// is parametrized for sampling, retention values from the project config are
    /// applied, and the project ID is normalized to the one from the project cache.
    async fn process_envelope(
        &self,
        cogs: &mut Token,
        project_id: ProjectId,
        message: ProcessEnvelopeGrouped<'_>,
    ) -> Result<ProcessingResult, ProcessingError> {
        let ProcessEnvelopeGrouped {
            group,
            envelope: mut managed_envelope,
            ctx,
        } = message;

        // Pre-process the envelope headers.
        if let Some(sampling_state) = ctx.sampling_project_info {
            // Both transactions and standalone span envelopes need a normalized DSC header
            // to make sampling rules based on the segment/transaction name work correctly.
            managed_envelope
                .envelope_mut()
                .parametrize_dsc_transaction(&sampling_state.config.tx_name_rules);
        }

        // Set the event retention. Effectively, this value will only be available in processing
        // mode when the full project config is queried from the upstream.
        if let Some(retention) = ctx.project_info.config.event_retention {
            managed_envelope.envelope_mut().set_retention(retention);
        }

        // Set the downsampled event retention. Like the regular retention above, this value
        // will only be available in processing mode when the full project config is queried
        // from the upstream.
        if let Some(retention) = ctx.project_info.config.downsampled_event_retention {
            managed_envelope
                .envelope_mut()
                .set_downsampled_retention(retention);
        }

        // Ensure the project ID is updated to the stored instance for this project cache. This can
        // differ in two cases:
        //  1. The envelope was sent to the legacy `/store/` endpoint without a project ID.
        //  2. The DSN was moved and the envelope sent to the old project ID.
        managed_envelope
            .envelope_mut()
            .meta_mut()
            .set_project_id(project_id);

        // Converts the untyped envelope into the group's typed envelope, runs the given
        // processing method on it, and wraps the result into a `ProcessingResult`. On
        // error, the envelope is rejected with the error's outcome (if any).
        macro_rules! run {
            ($fn_name:ident $(, $args:expr)*) => {
                async {
                    let mut managed_envelope = (managed_envelope, group).try_into()?;
                    match self.$fn_name(&mut managed_envelope, $($args),*).await {
                        Ok(extracted_metrics) => Ok(ProcessingResult::Envelope {
                            managed_envelope: managed_envelope.into_processed(),
                            extracted_metrics: extracted_metrics.map_or(ProcessingExtractedMetrics::new(), |e| e)
                        }),
                        Err(error) => {
                            relay_log::trace!("Executing {fn} failed: {error}", fn = stringify!($fn_name), error = error);
                            if let Some(outcome) = error.to_outcome() {
                                managed_envelope.reject(outcome);
                            }

                            return Err(error);
                        }
                    }
                }.await
            };
        }

        relay_log::trace!("Processing {group} group", group = group.variant());

        match group {
            ProcessingGroup::Error => run!(process_errors, project_id, ctx),
            ProcessingGroup::Transaction => {
                run!(process_transactions, cogs, project_id, ctx)
            }
            ProcessingGroup::Session => {
                self.process_with_processor(&self.inner.processing.sessions, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::Standalone => run!(process_standalone, project_id, ctx),
            ProcessingGroup::ClientReport => run!(process_client_reports, ctx),
            ProcessingGroup::Replay => {
                run!(process_replays, ctx)
            }
            ProcessingGroup::CheckIn => {
                self.process_with_processor(&self.inner.processing.check_ins, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::Nel => self.process_nel(managed_envelope, ctx).await,
            ProcessingGroup::Log => {
                self.process_with_processor(&self.inner.processing.logs, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::TraceMetric => {
                self.process_with_processor(
                    &self.inner.processing.trace_metrics,
                    managed_envelope,
                    ctx,
                )
                .await
            }
            ProcessingGroup::SpanV2 => {
                self.process_with_processor(&self.inner.processing.spans, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::Span => run!(process_standalone_spans, project_id, ctx),
            ProcessingGroup::ProfileChunk => {
                run!(process_profile_chunks, ctx)
            }
            // Currently is not used.
            ProcessingGroup::Metrics => {
                // In proxy mode we simply forward the metrics.
                // This group shouldn't be used outside of proxy mode.
                if self.inner.config.relay_mode() != RelayMode::Proxy {
                    relay_log::error!(
                        tags.project = %project_id,
                        items = ?managed_envelope.envelope().items().next().map(Item::ty),
                        "received metrics in the process_state"
                    );
                }

                Ok(ProcessingResult::no_metrics(
                    managed_envelope.into_processed(),
                ))
            }
            // Fallback to the legacy process_state implementation for Ungrouped events.
            ProcessingGroup::Ungrouped => {
                relay_log::error!(
                    tags.project = %project_id,
                    items = ?managed_envelope.envelope().items().next().map(Item::ty),
                    "could not identify the processing group based on the envelope's items"
                );

                Ok(ProcessingResult::no_metrics(
                    managed_envelope.into_processed(),
                ))
            }
            // Leave this group unchanged.
            //
            // This will later be forwarded to upstream.
            ProcessingGroup::ForwardUnknown => Ok(ProcessingResult::no_metrics(
                managed_envelope.into_processed(),
            )),
        }
    }
2022
2023    /// Processes the envelope and returns the processed envelope back.
2024    ///
2025    /// Returns `Some` if the envelope passed inbound filtering and rate limiting. Invalid items are
2026    /// removed from the envelope. Otherwise, if the envelope is empty or the entire envelope needs
2027    /// to be dropped, this is `None`.
2028    async fn process<'a>(
2029        &self,
2030        cogs: &mut Token,
2031        mut message: ProcessEnvelopeGrouped<'a>,
2032    ) -> Result<Option<Submit<'a>>, ProcessingError> {
2033        let ProcessEnvelopeGrouped {
2034            ref mut envelope,
2035            ctx,
2036            ..
2037        } = message;
2038
2039        // Prefer the project's project ID, and fall back to the stated project id from the
2040        // envelope. The project ID is available in all modes, other than in proxy mode, where
2041        // envelopes for unknown projects are forwarded blindly.
2042        //
2043        // Neither ID can be available in proxy mode on the /store/ endpoint. This is not supported,
2044        // since we cannot process an envelope without project ID, so drop it.
2045        let Some(project_id) = ctx
2046            .project_info
2047            .project_id
2048            .or_else(|| envelope.envelope().meta().project_id())
2049        else {
2050            envelope.reject(Outcome::Invalid(DiscardReason::Internal));
2051            return Err(ProcessingError::MissingProjectId);
2052        };
2053
2054        let client = envelope.envelope().meta().client().map(str::to_owned);
2055        let user_agent = envelope.envelope().meta().user_agent().map(str::to_owned);
2056        let project_key = envelope.envelope().meta().public_key();
2057        // Only allow sending to the sampling key, if we successfully loaded a sampling project
2058        // info relating to it. This filters out unknown/invalid project keys as well as project
2059        // keys from different organizations.
2060        let sampling_key = envelope
2061            .envelope()
2062            .sampling_key()
2063            .filter(|_| ctx.sampling_project_info.is_some());
2064
2065        // We set additional information on the scope, which will be removed after processing the
2066        // envelope.
2067        relay_log::configure_scope(|scope| {
2068            scope.set_tag("project", project_id);
2069            if let Some(client) = client {
2070                scope.set_tag("sdk", client);
2071            }
2072            if let Some(user_agent) = user_agent {
2073                scope.set_extra("user_agent", user_agent.into());
2074            }
2075        });
2076
2077        let result = match self.process_envelope(cogs, project_id, message).await {
2078            Ok(ProcessingResult::Envelope {
2079                mut managed_envelope,
2080                extracted_metrics,
2081            }) => {
2082                // The envelope could be modified or even emptied during processing, which
2083                // requires re-computation of the context.
2084                managed_envelope.update();
2085
2086                let has_metrics = !extracted_metrics.metrics.project_metrics.is_empty();
2087                send_metrics(
2088                    extracted_metrics.metrics,
2089                    project_key,
2090                    sampling_key,
2091                    &self.inner.addrs.aggregator,
2092                );
2093
2094                let envelope_response = if managed_envelope.envelope().is_empty() {
2095                    if !has_metrics {
2096                        // Individual rate limits have already been issued
2097                        managed_envelope.reject(Outcome::RateLimited(None));
2098                    } else {
2099                        managed_envelope.accept();
2100                    }
2101
2102                    None
2103                } else {
2104                    Some(managed_envelope)
2105                };
2106
2107                Ok(envelope_response.map(Submit::Envelope))
2108            }
2109            Ok(ProcessingResult::Output(Output { main, metrics })) => {
2110                if let Some(metrics) = metrics {
2111                    metrics.accept(|metrics| {
2112                        send_metrics(
2113                            metrics,
2114                            project_key,
2115                            sampling_key,
2116                            &self.inner.addrs.aggregator,
2117                        );
2118                    });
2119                }
2120
2121                let ctx = ctx.to_forward();
2122                Ok(main.map(|output| Submit::Output { output, ctx }))
2123            }
2124            Err(err) => Err(err),
2125        };
2126
2127        relay_log::configure_scope(|scope| {
2128            scope.remove_tag("project");
2129            scope.remove_tag("sdk");
2130            scope.remove_tag("user_agent");
2131        });
2132
2133        result
2134    }
2135
2136    async fn handle_process_envelope(&self, cogs: &mut Token, message: ProcessEnvelope) {
2137        let project_key = message.envelope.envelope().meta().public_key();
2138        let wait_time = message.envelope.age();
2139        metric!(timer(RelayTimers::EnvelopeWaitTime) = wait_time);
2140
2141        // This COGS handling may need an overhaul in the future:
2142        // Cancel the passed in token, to start individual measurements per envelope instead.
2143        cogs.cancel();
2144
2145        let scoping = message.envelope.scoping();
2146        for (group, envelope) in ProcessingGroup::split_envelope(
2147            *message.envelope.into_envelope(),
2148            &message.project_info,
2149        ) {
2150            let mut cogs = self
2151                .inner
2152                .cogs
2153                .timed(ResourceId::Relay, AppFeature::from(group));
2154
2155            let mut envelope =
2156                ManagedEnvelope::new(envelope, self.inner.addrs.outcome_aggregator.clone());
2157            envelope.scope(scoping);
2158
2159            let global_config = self.inner.global_config.current();
2160
2161            let ctx = processing::Context {
2162                config: &self.inner.config,
2163                global_config: &global_config,
2164                project_info: &message.project_info,
2165                sampling_project_info: message.sampling_project_info.as_deref(),
2166                rate_limits: &message.rate_limits,
2167                reservoir_counters: &message.reservoir_counters,
2168            };
2169
2170            let message = ProcessEnvelopeGrouped {
2171                group,
2172                envelope,
2173                ctx,
2174            };
2175
2176            let result = metric!(
2177                timer(RelayTimers::EnvelopeProcessingTime),
2178                group = group.variant(),
2179                { self.process(&mut cogs, message).await }
2180            );
2181
2182            match result {
2183                Ok(Some(envelope)) => self.submit_upstream(&mut cogs, envelope),
2184                Ok(None) => {}
2185                Err(error) if error.is_unexpected() => {
2186                    relay_log::error!(
2187                        tags.project_key = %project_key,
2188                        error = &error as &dyn Error,
2189                        "error processing envelope"
2190                    )
2191                }
2192                Err(error) => {
2193                    relay_log::debug!(
2194                        tags.project_key = %project_key,
2195                        error = &error as &dyn Error,
2196                        "error processing envelope"
2197                    )
2198                }
2199            }
2200        }
2201    }
2202
2203    fn handle_process_metrics(&self, cogs: &mut Token, message: ProcessMetrics) {
2204        let ProcessMetrics {
2205            data,
2206            project_key,
2207            received_at,
2208            sent_at,
2209            source,
2210        } = message;
2211
2212        let received_timestamp =
2213            UnixTimestamp::from_datetime(received_at).unwrap_or(UnixTimestamp::now());
2214
2215        let mut buckets = data.into_buckets(received_timestamp);
2216        if buckets.is_empty() {
2217            return;
2218        };
2219        cogs.update(relay_metrics::cogs::BySize(&buckets));
2220
2221        let clock_drift_processor =
2222            ClockDriftProcessor::new(sent_at, received_at).at_least(MINIMUM_CLOCK_DRIFT);
2223
2224        buckets.retain_mut(|bucket| {
2225            if let Err(error) = relay_metrics::normalize_bucket(bucket) {
2226                relay_log::debug!(error = &error as &dyn Error, "dropping bucket {bucket:?}");
2227                return false;
2228            }
2229
2230            if !self::metrics::is_valid_namespace(bucket) {
2231                return false;
2232            }
2233
2234            clock_drift_processor.process_timestamp(&mut bucket.timestamp);
2235
2236            if !matches!(source, BucketSource::Internal) {
2237                bucket.metadata = BucketMetadata::new(received_timestamp);
2238            }
2239
2240            true
2241        });
2242
2243        let project = self.inner.project_cache.get(project_key);
2244
2245        // Best effort check to filter and rate limit buckets, if there is no project state
2246        // available at the current time, we will check again after flushing.
2247        let buckets = match project.state() {
2248            ProjectState::Enabled(project_info) => {
2249                let rate_limits = project.rate_limits().current_limits();
2250                self.check_buckets(project_key, project_info, &rate_limits, buckets)
2251            }
2252            _ => buckets,
2253        };
2254
2255        relay_log::trace!("merging metric buckets into the aggregator");
2256        self.inner
2257            .addrs
2258            .aggregator
2259            .send(MergeBuckets::new(project_key, buckets));
2260    }
2261
2262    fn handle_process_batched_metrics(&self, cogs: &mut Token, message: ProcessBatchedMetrics) {
2263        let ProcessBatchedMetrics {
2264            payload,
2265            source,
2266            received_at,
2267            sent_at,
2268        } = message;
2269
2270        #[derive(serde::Deserialize)]
2271        struct Wrapper {
2272            buckets: HashMap<ProjectKey, Vec<Bucket>>,
2273        }
2274
2275        let buckets = match serde_json::from_slice(&payload) {
2276            Ok(Wrapper { buckets }) => buckets,
2277            Err(error) => {
2278                relay_log::debug!(
2279                    error = &error as &dyn Error,
2280                    "failed to parse batched metrics",
2281                );
2282                metric!(counter(RelayCounters::MetricBucketsParsingFailed) += 1);
2283                return;
2284            }
2285        };
2286
2287        for (project_key, buckets) in buckets {
2288            self.handle_process_metrics(
2289                cogs,
2290                ProcessMetrics {
2291                    data: MetricData::Parsed(buckets),
2292                    project_key,
2293                    source,
2294                    received_at,
2295                    sent_at,
2296                },
2297            )
2298        }
2299    }
2300
    /// Submits processed results either to the store (processing mode) or to the upstream Relay.
    ///
    /// In processing mode with a store forwarder configured, envelopes and outputs are
    /// forwarded to the store/upload services. Otherwise, the result is serialized into an
    /// envelope, encoded, and sent to the upstream Relay over HTTP.
    fn submit_upstream(&self, cogs: &mut Token, submit: Submit<'_>) {
        let _submit = cogs.start_category("submit");

        #[cfg(feature = "processing")]
        if self.inner.config.processing_enabled()
            && let Some(store_forwarder) = &self.inner.addrs.store_forwarder
        {
            use crate::processing::StoreHandle;

            let upload = self.inner.addrs.upload.as_ref();
            match submit {
                Submit::Envelope(envelope) => {
                    let envelope_has_attachments = envelope
                        .envelope()
                        .items()
                        .any(|item| *item.ty() == ItemType::Attachment);
                    // Whether Relay will store this attachment in objectstore or use kafka like before.
                    // Evaluated lazily so the global config is only read when there are attachments.
                    let use_objectstore = || {
                        let options = &self.inner.global_config.current().options;
                        utils::sample(options.objectstore_attachments_sample_rate).is_keep()
                    };

                    if let Some(upload) = &self.inner.addrs.upload
                        && envelope_has_attachments
                        && use_objectstore()
                    {
                        // the `UploadService` will upload all attachments, and then forward the envelope to the `StoreService`.
                        upload.send(StoreEnvelope { envelope })
                    } else {
                        store_forwarder.send(StoreEnvelope { envelope })
                    }
                }
                Submit::Output { output, ctx } => output
                    .forward_store(StoreHandle::new(store_forwarder, upload), ctx)
                    .unwrap_or_else(|err| err.into_inner()),
            }
            return;
        }

        // Not in processing mode (or no store forwarder): turn the result into an
        // envelope for the upstream.
        let mut envelope = match submit {
            Submit::Envelope(envelope) => envelope,
            Submit::Output { output, ctx } => match output.serialize_envelope(ctx) {
                Ok(envelope) => ManagedEnvelope::from(envelope).into_processed(),
                Err(_) => {
                    relay_log::error!("failed to serialize output to an envelope");
                    return;
                }
            },
        };

        // Nothing left to forward; mark the envelope as accepted.
        if envelope.envelope_mut().is_empty() {
            envelope.accept();
            return;
        }

        // Override the `sent_at` timestamp. Since the envelope went through basic
        // normalization, all timestamps have been corrected. We propagate the new
        // `sent_at` to allow the next Relay to double-check this timestamp and
        // potentially apply correction again. This is done as close to sending as
        // possible so that we avoid internal delays.
        envelope.envelope_mut().set_sent_at(Utc::now());

        relay_log::trace!("sending envelope to sentry endpoint");
        let http_encoding = self.inner.config.http_encoding();
        let result = envelope.envelope().to_vec().and_then(|v| {
            encode_payload(&v.into(), http_encoding).map_err(EnvelopeError::PayloadIoFailed)
        });

        match result {
            Ok(body) => {
                self.inner
                    .addrs
                    .upstream_relay
                    .send(SendRequest(SendEnvelope {
                        envelope,
                        body,
                        http_encoding,
                        project_cache: self.inner.project_cache.clone(),
                    }));
            }
            Err(error) => {
                // Errors are only logged for what we consider an internal discard reason. These
                // indicate errors in the infrastructure or implementation bugs.
                relay_log::error!(
                    error = &error as &dyn Error,
                    tags.project_key = %envelope.scoping().project_key,
                    "failed to serialize envelope payload"
                );

                envelope.reject(Outcome::Invalid(DiscardReason::Internal));
            }
        }
    }
2394
2395    fn handle_submit_client_reports(&self, cogs: &mut Token, message: SubmitClientReports) {
2396        let SubmitClientReports {
2397            client_reports,
2398            scoping,
2399        } = message;
2400
2401        let upstream = self.inner.config.upstream_descriptor();
2402        let dsn = PartialDsn::outbound(&scoping, upstream);
2403
2404        let mut envelope = Envelope::from_request(None, RequestMeta::outbound(dsn));
2405        for client_report in client_reports {
2406            match client_report.serialize() {
2407                Ok(payload) => {
2408                    let mut item = Item::new(ItemType::ClientReport);
2409                    item.set_payload(ContentType::Json, payload);
2410                    envelope.add_item(item);
2411                }
2412                Err(error) => {
2413                    relay_log::error!(
2414                        error = &error as &dyn std::error::Error,
2415                        "failed to serialize client report"
2416                    );
2417                }
2418            }
2419        }
2420
2421        let envelope = ManagedEnvelope::new(envelope, self.inner.addrs.outcome_aggregator.clone());
2422        self.submit_upstream(cogs, Submit::Envelope(envelope.into_processed()));
2423    }
2424
2425    fn check_buckets(
2426        &self,
2427        project_key: ProjectKey,
2428        project_info: &ProjectInfo,
2429        rate_limits: &RateLimits,
2430        buckets: Vec<Bucket>,
2431    ) -> Vec<Bucket> {
2432        let Some(scoping) = project_info.scoping(project_key) else {
2433            relay_log::error!(
2434                tags.project_key = project_key.as_str(),
2435                "there is no scoping: dropping {} buckets",
2436                buckets.len(),
2437            );
2438            return Vec::new();
2439        };
2440
2441        let mut buckets = self::metrics::apply_project_info(
2442            buckets,
2443            &self.inner.metric_outcomes,
2444            project_info,
2445            scoping,
2446        );
2447
2448        let namespaces: BTreeSet<MetricNamespace> = buckets
2449            .iter()
2450            .filter_map(|bucket| bucket.name.try_namespace())
2451            .collect();
2452
2453        for namespace in namespaces {
2454            let limits = rate_limits.check_with_quotas(
2455                project_info.get_quotas(),
2456                scoping.item(DataCategory::MetricBucket),
2457            );
2458
2459            if limits.is_limited() {
2460                let rejected;
2461                (buckets, rejected) = utils::split_off(buckets, |bucket| {
2462                    bucket.name.try_namespace() == Some(namespace)
2463                });
2464
2465                let reason_code = limits.longest().and_then(|limit| limit.reason_code.clone());
2466                self.inner.metric_outcomes.track(
2467                    scoping,
2468                    &rejected,
2469                    Outcome::RateLimited(reason_code),
2470                );
2471            }
2472        }
2473
2474        let quotas = project_info.config.quotas.clone();
2475        match MetricsLimiter::create(buckets, quotas, scoping) {
2476            Ok(mut bucket_limiter) => {
2477                bucket_limiter.enforce_limits(rate_limits, &self.inner.metric_outcomes);
2478                bucket_limiter.into_buckets()
2479            }
2480            Err(buckets) => buckets,
2481        }
2482    }
2483
    /// Applies consistent (Redis-backed) rate limits to metric buckets.
    ///
    /// Only available in processing Relays. Buckets of a rate-limited namespace
    /// are dropped and tracked as [`Outcome::RateLimited`], and the active
    /// limits are merged back into the project cache so later checks do not
    /// need a Redis roundtrip. Finally, indirect transaction/span limits are
    /// enforced through [`Self::apply_other_rate_limits`].
    #[cfg(feature = "processing")]
    async fn rate_limit_buckets(
        &self,
        scoping: Scoping,
        project_info: &ProjectInfo,
        mut buckets: Vec<Bucket>,
    ) -> Vec<Bucket> {
        // Without a configured rate limiter there is nothing to enforce.
        let Some(rate_limiter) = &self.inner.rate_limiter else {
            return buckets;
        };

        let global_config = self.inner.global_config.current();
        // Bucket count per namespace; used as the rate limiting quantity below.
        let namespaces = buckets
            .iter()
            .filter_map(|bucket| bucket.name.try_namespace())
            .counts();

        let quotas = CombinedQuotas::new(&global_config, project_info.get_quotas());

        for (namespace, quantity) in namespaces {
            let item_scoping = scoping.metric_bucket(namespace);

            let limits = match rate_limiter
                .is_rate_limited(quotas, item_scoping, quantity, false)
                .await
            {
                Ok(limits) => limits,
                Err(err) => {
                    relay_log::error!(
                        error = &err as &dyn std::error::Error,
                        "failed to check redis rate limits"
                    );
                    // Redis is likely unavailable; stop checking the remaining
                    // namespaces rather than failing each one individually.
                    break;
                }
            };

            if limits.is_limited() {
                // Split off and reject all buckets of the limited namespace.
                let rejected;
                (buckets, rejected) = utils::split_off(buckets, |bucket| {
                    bucket.name.try_namespace() == Some(namespace)
                });

                let reason_code = limits.longest().and_then(|limit| limit.reason_code.clone());
                self.inner.metric_outcomes.track(
                    scoping,
                    &rejected,
                    Outcome::RateLimited(reason_code),
                );

                // Cache the active limits so subsequent cached checks can apply
                // them without consulting Redis.
                self.inner
                    .project_cache
                    .get(item_scoping.scoping.project_key)
                    .rate_limits()
                    .merge(limits);
            }
        }

        match MetricsLimiter::create(buckets, project_info.config.quotas.clone(), scoping) {
            Err(buckets) => buckets,
            Ok(bucket_limiter) => self.apply_other_rate_limits(bucket_limiter).await,
        }
    }
2546
2547    /// Check and apply rate limits to metrics buckets for transactions and spans.
2548    #[cfg(feature = "processing")]
2549    async fn apply_other_rate_limits(&self, mut bucket_limiter: MetricsLimiter) -> Vec<Bucket> {
2550        relay_log::trace!("handle_rate_limit_buckets");
2551
2552        let scoping = *bucket_limiter.scoping();
2553
2554        if let Some(rate_limiter) = self.inner.rate_limiter.as_ref() {
2555            let global_config = self.inner.global_config.current();
2556            let quotas = CombinedQuotas::new(&global_config, bucket_limiter.quotas());
2557
2558            // We set over_accept_once such that the limit is actually reached, which allows subsequent
2559            // calls with quantity=0 to be rate limited.
2560            let over_accept_once = true;
2561            let mut rate_limits = RateLimits::new();
2562
2563            for category in [DataCategory::Transaction, DataCategory::Span] {
2564                let count = bucket_limiter.count(category);
2565
2566                let timer = Instant::now();
2567                let mut is_limited = false;
2568
2569                if let Some(count) = count {
2570                    match rate_limiter
2571                        .is_rate_limited(quotas, scoping.item(category), count, over_accept_once)
2572                        .await
2573                    {
2574                        Ok(limits) => {
2575                            is_limited = limits.is_limited();
2576                            rate_limits.merge(limits)
2577                        }
2578                        Err(e) => relay_log::error!(error = &e as &dyn Error),
2579                    }
2580                }
2581
2582                relay_statsd::metric!(
2583                    timer(RelayTimers::RateLimitBucketsDuration) = timer.elapsed(),
2584                    category = category.name(),
2585                    limited = if is_limited { "true" } else { "false" },
2586                    count = match count {
2587                        None => "none",
2588                        Some(0) => "0",
2589                        Some(1) => "1",
2590                        Some(1..=10) => "10",
2591                        Some(1..=25) => "25",
2592                        Some(1..=50) => "50",
2593                        Some(51..=100) => "100",
2594                        Some(101..=500) => "500",
2595                        _ => "> 500",
2596                    },
2597                );
2598            }
2599
2600            if rate_limits.is_limited() {
2601                let was_enforced =
2602                    bucket_limiter.enforce_limits(&rate_limits, &self.inner.metric_outcomes);
2603
2604                if was_enforced {
2605                    // Update the rate limits in the project cache.
2606                    self.inner
2607                        .project_cache
2608                        .get(scoping.project_key)
2609                        .rate_limits()
2610                        .merge(rate_limits);
2611                }
2612            }
2613        }
2614
2615        bucket_limiter.into_buckets()
2616    }
2617
    /// Cardinality limits the passed buckets and returns a filtered vector of only accepted buckets.
    ///
    /// Behavior depends on the globally configured limiter mode:
    /// - `Disabled`: all buckets pass through unchanged.
    /// - `Passive`: limits are checked and reported, but not enforced.
    /// - otherwise: buckets exceeding a limit are rejected with
    ///   [`Outcome::CardinalityLimited`].
    #[cfg(feature = "processing")]
    async fn cardinality_limit_buckets(
        &self,
        scoping: Scoping,
        limits: &[CardinalityLimit],
        buckets: Vec<Bucket>,
    ) -> Vec<Bucket> {
        let global_config = self.inner.global_config.current();
        let cardinality_limiter_mode = global_config.options.cardinality_limiter_mode;

        // Limiter disabled globally: skip entirely.
        if matches!(cardinality_limiter_mode, CardinalityLimiterMode::Disabled) {
            return buckets;
        }

        // No limiter configured for this Relay instance.
        let Some(ref limiter) = self.inner.cardinality_limiter else {
            return buckets;
        };

        let scope = relay_cardinality::Scoping {
            organization_id: scoping.organization_id,
            project_id: scoping.project_id,
        };

        let limits = match limiter
            .check_cardinality_limits(scope, limits, buckets)
            .await
        {
            Ok(limits) => limits,
            // On limiter failure, accept all buckets rather than dropping data.
            Err((buckets, error)) => {
                relay_log::error!(
                    error = &error as &dyn std::error::Error,
                    "cardinality limiter failed"
                );
                return buckets;
            }
        };

        // Sampled error logging of exceeded limits, keyed by organization.
        let error_sample_rate = global_config.options.cardinality_limiter_error_sample_rate;
        if !limits.exceeded_limits().is_empty() && utils::sample(error_sample_rate).is_keep() {
            for limit in limits.exceeded_limits() {
                relay_log::with_scope(
                    |scope| {
                        // Set the organization as user so we can alert on distinct org_ids.
                        scope.set_user(Some(relay_log::sentry::User {
                            id: Some(scoping.organization_id.to_string()),
                            ..Default::default()
                        }));
                    },
                    || {
                        relay_log::error!(
                            tags.organization_id = scoping.organization_id.value(),
                            tags.limit_id = limit.id,
                            tags.passive = limit.passive,
                            "Cardinality Limit"
                        );
                    },
                );
            }
        }

        // Emit cardinality reports regardless of enforcement mode.
        for (limit, reports) in limits.cardinality_reports() {
            for report in reports {
                self.inner
                    .metric_outcomes
                    .cardinality(scoping, limit, report);
            }
        }

        // Passive mode: report only, return all buckets unenforced.
        if matches!(cardinality_limiter_mode, CardinalityLimiterMode::Passive) {
            return limits.into_source();
        }

        let CardinalityLimitsSplit { accepted, rejected } = limits.into_split();

        // Track an outcome for every rejected bucket with the limit it exceeded.
        for (bucket, exceeded) in rejected {
            self.inner.metric_outcomes.track(
                scoping,
                &[bucket],
                Outcome::CardinalityLimited(exceeded.id.clone()),
            );
        }
        accepted
    }
2702
2703    /// Processes metric buckets and sends them to kafka.
2704    ///
2705    /// This function runs the following steps:
2706    ///  - cardinality limiting
2707    ///  - rate limiting
2708    ///  - submit to `StoreForwarder`
2709    #[cfg(feature = "processing")]
2710    async fn encode_metrics_processing(
2711        &self,
2712        message: FlushBuckets,
2713        store_forwarder: &Addr<Store>,
2714    ) {
2715        use crate::constants::DEFAULT_EVENT_RETENTION;
2716        use crate::services::store::StoreMetrics;
2717
2718        for ProjectBuckets {
2719            buckets,
2720            scoping,
2721            project_info,
2722            ..
2723        } in message.buckets.into_values()
2724        {
2725            let buckets = self
2726                .rate_limit_buckets(scoping, &project_info, buckets)
2727                .await;
2728
2729            let limits = project_info.get_cardinality_limits();
2730            let buckets = self
2731                .cardinality_limit_buckets(scoping, limits, buckets)
2732                .await;
2733
2734            if buckets.is_empty() {
2735                continue;
2736            }
2737
2738            let retention = project_info
2739                .config
2740                .event_retention
2741                .unwrap_or(DEFAULT_EVENT_RETENTION);
2742
2743            // The store forwarder takes care of bucket splitting internally, so we can submit the
2744            // entire list of buckets. There is no batching needed here.
2745            store_forwarder.send(StoreMetrics {
2746                buckets,
2747                scoping,
2748                retention,
2749            });
2750        }
2751    }
2752
2753    /// Serializes metric buckets to JSON and sends them to the upstream.
2754    ///
2755    /// This function runs the following steps:
2756    ///  - partitioning
2757    ///  - batching by configured size limit
2758    ///  - serialize to JSON and pack in an envelope
2759    ///  - submit the envelope to upstream or kafka depending on configuration
2760    ///
2761    /// Cardinality limiting and rate limiting run only in processing Relays as they both require
2762    /// access to the central Redis instance. Cached rate limits are applied in the project cache
2763    /// already.
2764    fn encode_metrics_envelope(&self, cogs: &mut Token, message: FlushBuckets) {
2765        let FlushBuckets {
2766            partition_key,
2767            buckets,
2768        } = message;
2769
2770        let batch_size = self.inner.config.metrics_max_batch_size_bytes();
2771        let upstream = self.inner.config.upstream_descriptor();
2772
2773        for ProjectBuckets {
2774            buckets, scoping, ..
2775        } in buckets.values()
2776        {
2777            let dsn = PartialDsn::outbound(scoping, upstream);
2778
2779            relay_statsd::metric!(
2780                distribution(RelayDistributions::PartitionKeys) = u64::from(partition_key)
2781            );
2782
2783            let mut num_batches = 0;
2784            for batch in BucketsView::from(buckets).by_size(batch_size) {
2785                let mut envelope = Envelope::from_request(None, RequestMeta::outbound(dsn.clone()));
2786
2787                let mut item = Item::new(ItemType::MetricBuckets);
2788                item.set_source_quantities(crate::metrics::extract_quantities(batch));
2789                item.set_payload(ContentType::Json, serde_json::to_vec(&buckets).unwrap());
2790                envelope.add_item(item);
2791
2792                let mut envelope =
2793                    ManagedEnvelope::new(envelope, self.inner.addrs.outcome_aggregator.clone());
2794                envelope
2795                    .set_partition_key(Some(partition_key))
2796                    .scope(*scoping);
2797
2798                relay_statsd::metric!(
2799                    distribution(RelayDistributions::BucketsPerBatch) = batch.len() as u64
2800                );
2801
2802                self.submit_upstream(cogs, Submit::Envelope(envelope.into_processed()));
2803                num_batches += 1;
2804            }
2805
2806            relay_statsd::metric!(
2807                distribution(RelayDistributions::BatchesPerPartition) = num_batches
2808            );
2809        }
2810    }
2811
2812    /// Creates a [`SendMetricsRequest`] and sends it to the upstream relay.
2813    fn send_global_partition(&self, partition_key: u32, partition: &mut Partition<'_>) {
2814        if partition.is_empty() {
2815            return;
2816        }
2817
2818        let (unencoded, project_info) = partition.take();
2819        let http_encoding = self.inner.config.http_encoding();
2820        let encoded = match encode_payload(&unencoded, http_encoding) {
2821            Ok(payload) => payload,
2822            Err(error) => {
2823                let error = &error as &dyn std::error::Error;
2824                relay_log::error!(error, "failed to encode metrics payload");
2825                return;
2826            }
2827        };
2828
2829        let request = SendMetricsRequest {
2830            partition_key: partition_key.to_string(),
2831            unencoded,
2832            encoded,
2833            project_info,
2834            http_encoding,
2835            metric_outcomes: self.inner.metric_outcomes.clone(),
2836        };
2837
2838        self.inner.addrs.upstream_relay.send(SendRequest(request));
2839    }
2840
    /// Serializes metric buckets to JSON and sends them to the upstream via the global endpoint.
    ///
    /// This function is similar to [`Self::encode_metrics_envelope`], but sends a global batched
    /// payload directly instead of per-project Envelopes.
    ///
    /// This function runs the following steps:
    ///  - partitioning
    ///  - batching by configured size limit
    ///  - serialize to JSON
    ///  - submit directly to the upstream
    ///
    /// Cardinality limiting and rate limiting run only in processing Relays as they both require
    /// access to the central Redis instance. Cached rate limits are applied in the project cache
    /// already.
    fn encode_metrics_global(&self, message: FlushBuckets) {
        let FlushBuckets {
            partition_key,
            buckets,
        } = message;

        let batch_size = self.inner.config.metrics_max_batch_size_bytes();
        let mut partition = Partition::new(batch_size);
        let mut partition_splits = 0;

        for ProjectBuckets {
            buckets, scoping, ..
        } in buckets.values()
        {
            for bucket in buckets {
                // A bucket may exceed the remaining partition capacity; `insert`
                // then returns the part that did not fit, which is retried below.
                let mut remaining = Some(BucketView::new(bucket));

                while let Some(bucket) = remaining.take() {
                    if let Some(next) = partition.insert(bucket, *scoping) {
                        // A part of the bucket could not be inserted. Take the partition and submit
                        // it immediately. Repeat until the final part was inserted. This should
                        // always result in a request, otherwise we would enter an endless loop.
                        self.send_global_partition(partition_key, &mut partition);
                        remaining = Some(next);
                        partition_splits += 1;
                    }
                }
            }
        }

        if partition_splits > 0 {
            metric!(distribution(RelayDistributions::PartitionSplits) = partition_splits);
        }

        // Flush whatever remains in the final, partially-filled partition.
        self.send_global_partition(partition_key, &mut partition);
    }
2891
2892    async fn handle_flush_buckets(&self, cogs: &mut Token, mut message: FlushBuckets) {
2893        for (project_key, pb) in message.buckets.iter_mut() {
2894            let buckets = std::mem::take(&mut pb.buckets);
2895            pb.buckets =
2896                self.check_buckets(*project_key, &pb.project_info, &pb.rate_limits, buckets);
2897        }
2898
2899        #[cfg(feature = "processing")]
2900        if self.inner.config.processing_enabled()
2901            && let Some(ref store_forwarder) = self.inner.addrs.store_forwarder
2902        {
2903            return self
2904                .encode_metrics_processing(message, store_forwarder)
2905                .await;
2906        }
2907
2908        if self.inner.config.http_global_metrics() {
2909            self.encode_metrics_global(message)
2910        } else {
2911            self.encode_metrics_envelope(cogs, message)
2912        }
2913    }
2914
    /// Returns whether a Redis-backed rate limiter is configured.
    ///
    /// Test-only helper, available in processing builds.
    #[cfg(all(test, feature = "processing"))]
    fn redis_rate_limiter_enabled(&self) -> bool {
        self.inner.rate_limiter.is_some()
    }
2919
2920    async fn handle_message(&self, message: EnvelopeProcessor) {
2921        let ty = message.variant();
2922        let feature_weights = self.feature_weights(&message);
2923
2924        metric!(timer(RelayTimers::ProcessMessageDuration), message = ty, {
2925            let mut cogs = self.inner.cogs.timed(ResourceId::Relay, feature_weights);
2926
2927            match message {
2928                EnvelopeProcessor::ProcessEnvelope(m) => {
2929                    self.handle_process_envelope(&mut cogs, *m).await
2930                }
2931                EnvelopeProcessor::ProcessProjectMetrics(m) => {
2932                    self.handle_process_metrics(&mut cogs, *m)
2933                }
2934                EnvelopeProcessor::ProcessBatchedMetrics(m) => {
2935                    self.handle_process_batched_metrics(&mut cogs, *m)
2936                }
2937                EnvelopeProcessor::FlushBuckets(m) => {
2938                    self.handle_flush_buckets(&mut cogs, *m).await
2939                }
2940                EnvelopeProcessor::SubmitClientReports(m) => {
2941                    self.handle_submit_client_reports(&mut cogs, *m)
2942                }
2943            }
2944        });
2945    }
2946
2947    fn feature_weights(&self, message: &EnvelopeProcessor) -> FeatureWeights {
2948        match message {
2949            // Envelope is split later and tokens are attributed then.
2950            EnvelopeProcessor::ProcessEnvelope(_) => AppFeature::Unattributed.into(),
2951            EnvelopeProcessor::ProcessProjectMetrics(_) => AppFeature::Unattributed.into(),
2952            EnvelopeProcessor::ProcessBatchedMetrics(_) => AppFeature::Unattributed.into(),
2953            EnvelopeProcessor::FlushBuckets(v) => v
2954                .buckets
2955                .values()
2956                .map(|s| {
2957                    if self.inner.config.processing_enabled() {
2958                        // Processing does not encode the metrics but instead rate and cardinality
2959                        // limits the metrics, which scales by count and not size.
2960                        relay_metrics::cogs::ByCount(&s.buckets).into()
2961                    } else {
2962                        relay_metrics::cogs::BySize(&s.buckets).into()
2963                    }
2964                })
2965                .fold(FeatureWeights::none(), FeatureWeights::merge),
2966            EnvelopeProcessor::SubmitClientReports(_) => AppFeature::ClientReports.into(),
2967        }
2968    }
2969}
2970
2971impl Service for EnvelopeProcessorService {
2972    type Interface = EnvelopeProcessor;
2973
2974    async fn run(self, mut rx: relay_system::Receiver<Self::Interface>) {
2975        while let Some(message) = rx.recv().await {
2976            let service = self.clone();
2977            self.inner
2978                .pool
2979                .spawn_async(
2980                    async move {
2981                        service.handle_message(message).await;
2982                    }
2983                    .boxed(),
2984                )
2985                .await;
2986        }
2987    }
2988}
2989
/// Result of the enforcement of rate limiting.
///
/// If the event is already `None` or it's rate limited, it will be `None`
/// within the [`Annotated`].
struct EnforcementResult {
    /// The event remaining after enforcement; empty if it was rate limited.
    event: Annotated<Event>,
    /// The rate limits computed during enforcement; only read with the
    /// `processing` feature enabled.
    #[cfg_attr(not(feature = "processing"), expect(dead_code))]
    rate_limits: RateLimits,
}
2999
3000impl EnforcementResult {
3001    /// Creates a new [`EnforcementResult`].
3002    pub fn new(event: Annotated<Event>, rate_limits: RateLimits) -> Self {
3003        Self { event, rate_limits }
3004    }
3005}
3006
/// Strategy used to enforce rate limits during envelope processing.
#[derive(Clone)]
enum RateLimiter {
    /// Checks only rate limits cached locally from earlier responses.
    Cached,
    /// Checks rate limits consistently against the shared Redis state
    /// (processing Relays only).
    #[cfg(feature = "processing")]
    Consistent(Arc<RedisRateLimiter<GlobalRateLimitsServiceHandle>>),
}
3013
impl RateLimiter {
    /// Enforces quotas and rate limits on the envelope and the extracted event.
    ///
    /// Computes enforcement via [`EnvelopeLimiter`], applies the resulting
    /// outcomes to the envelope items, and returns the event (emptied if it was
    /// rate limited) together with the rate limits that applied.
    async fn enforce<Group>(
        &self,
        managed_envelope: &mut TypedEnvelope<Group>,
        event: Annotated<Event>,
        _extracted_metrics: &mut ProcessingExtractedMetrics,
        ctx: processing::Context<'_>,
    ) -> Result<EnforcementResult, ProcessingError> {
        // Nothing to limit for an empty envelope without an event.
        if managed_envelope.envelope().is_empty() && event.value().is_none() {
            return Ok(EnforcementResult::new(event, RateLimits::default()));
        }

        // Without any quotas configured, enforcement is a no-op.
        let quotas = CombinedQuotas::new(ctx.global_config, ctx.project_info.get_quotas());
        if quotas.is_empty() {
            return Ok(EnforcementResult::new(event, RateLimits::default()));
        }

        let event_category = event_category(&event);

        // We extract the rate limiters, in case we perform consistent rate limiting, since we will
        // need Redis access.
        //
        // When invoking the rate limiter, capture if the event item has been rate limited to also
        // remove it from the processing state eventually.
        let this = self.clone();
        let mut envelope_limiter =
            EnvelopeLimiter::new(CheckLimits::All, move |item_scope, _quantity| {
                let this = this.clone();

                async move {
                    match this {
                        // Consistent limiting checks against the shared Redis state.
                        #[cfg(feature = "processing")]
                        RateLimiter::Consistent(rate_limiter) => Ok::<_, ProcessingError>(
                            rate_limiter
                                .is_rate_limited(quotas, item_scope, _quantity, false)
                                .await?,
                        ),
                        // Cached limiting consults only locally known limits.
                        _ => Ok::<_, ProcessingError>(
                            ctx.rate_limits.check_with_quotas(quotas, item_scope),
                        ),
                    }
                }
            });

        // Tell the envelope limiter about the event, since it has been removed from the Envelope at
        // this stage in processing.
        if let Some(category) = event_category {
            envelope_limiter.assume_event(category);
        }

        let scoping = managed_envelope.scoping();
        let (enforcement, rate_limits) = metric!(timer(RelayTimers::EventProcessingRateLimiting), type = self.name(), {
            envelope_limiter
                .compute(managed_envelope.envelope_mut(), &scoping)
                .await
        })?;
        let event_active = enforcement.is_event_active();

        // Use the same rate limits as used for the envelope on the metrics.
        // Those rate limits should not be checked for expiry or similar to ensure a consistent
        // limiting of envelope items and metrics.
        #[cfg(feature = "processing")]
        _extracted_metrics.apply_enforcement(&enforcement, matches!(self, Self::Consistent(_)));
        enforcement.apply_with_outcomes(managed_envelope);

        if event_active {
            debug_assert!(managed_envelope.envelope().is_empty());
            return Ok(EnforcementResult::new(Annotated::empty(), rate_limits));
        }

        Ok(EnforcementResult::new(event, rate_limits))
    }

    /// Returns the metric tag value identifying this rate limiting strategy.
    fn name(&self) -> &'static str {
        match self {
            Self::Cached => "cached",
            #[cfg(feature = "processing")]
            Self::Consistent(_) => "consistent",
        }
    }
}
3095
3096pub fn encode_payload(body: &Bytes, http_encoding: HttpEncoding) -> Result<Bytes, std::io::Error> {
3097    let envelope_body: Vec<u8> = match http_encoding {
3098        HttpEncoding::Identity => return Ok(body.clone()),
3099        HttpEncoding::Deflate => {
3100            let mut encoder = ZlibEncoder::new(Vec::new(), Compression::default());
3101            encoder.write_all(body.as_ref())?;
3102            encoder.finish()?
3103        }
3104        HttpEncoding::Gzip => {
3105            let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
3106            encoder.write_all(body.as_ref())?;
3107            encoder.finish()?
3108        }
3109        HttpEncoding::Br => {
3110            // Use default buffer size (via 0), medium quality (5), and the default lgwin (22).
3111            let mut encoder = BrotliEncoder::new(Vec::new(), 0, 5, 22);
3112            encoder.write_all(body.as_ref())?;
3113            encoder.into_inner()
3114        }
3115        HttpEncoding::Zstd => {
3116            // Use the fastest compression level, our main objective here is to get the best
3117            // compression ratio for least amount of time spent.
3118            let mut encoder = ZstdEncoder::new(Vec::new(), 1)?;
3119            encoder.write_all(body.as_ref())?;
3120            encoder.finish()?
3121        }
3122    };
3123
3124    Ok(envelope_body.into())
3125}
3126
/// An upstream request that submits an envelope via HTTP.
#[derive(Debug)]
pub struct SendEnvelope {
    /// The envelope being submitted; used for scoping, headers, and outcomes.
    pub envelope: TypedEnvelope<Processed>,
    /// The pre-serialized, already encoded envelope body.
    pub body: Bytes,
    /// The content encoding that was applied to `body`.
    pub http_encoding: HttpEncoding,
    /// Handle used to merge rate limits returned by the upstream into the
    /// project cache.
    pub project_cache: ProjectCacheHandle,
}
3135
3136impl UpstreamRequest for SendEnvelope {
    /// Envelopes are always submitted via POST.
    fn method(&self) -> reqwest::Method {
        reqwest::Method::POST
    }
3140
    /// Returns the project-scoped envelope endpoint path.
    fn path(&self) -> Cow<'_, str> {
        format!("/api/{}/envelope/", self.envelope.scoping().project_id).into()
    }
3144
    /// Static route name identifying this request type.
    fn route(&self) -> &'static str {
        "envelope"
    }
3148
3149    fn build(&mut self, builder: &mut http::RequestBuilder) -> Result<(), http::HttpError> {
3150        let envelope_body = self.body.clone();
3151        metric!(
3152            distribution(RelayDistributions::UpstreamEnvelopeBodySize) = envelope_body.len() as u64
3153        );
3154
3155        let meta = &self.envelope.meta();
3156        let shard = self.envelope.partition_key().map(|p| p.to_string());
3157        builder
3158            .content_encoding(self.http_encoding)
3159            .header_opt("Origin", meta.origin().map(|url| url.as_str()))
3160            .header_opt("User-Agent", meta.user_agent())
3161            .header("X-Sentry-Auth", meta.auth_header())
3162            .header("X-Forwarded-For", meta.forwarded_for())
3163            .header("Content-Type", envelope::CONTENT_TYPE)
3164            .header_opt("X-Sentry-Relay-Shard", shard)
3165            .body(envelope_body);
3166
3167        Ok(())
3168    }
3169
3170    fn sign(&mut self) -> Option<Sign> {
3171        Some(Sign::Optional(SignatureType::RequestSign))
3172    }
3173
3174    fn respond(
3175        self: Box<Self>,
3176        result: Result<http::Response, UpstreamRequestError>,
3177    ) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> {
3178        Box::pin(async move {
3179            let result = match result {
3180                Ok(mut response) => response.consume().await.map_err(UpstreamRequestError::Http),
3181                Err(error) => Err(error),
3182            };
3183
3184            match result {
3185                Ok(()) => self.envelope.accept(),
3186                Err(error) if error.is_received() => {
3187                    let scoping = self.envelope.scoping();
3188                    self.envelope.accept();
3189
3190                    if let UpstreamRequestError::RateLimited(limits) = error {
3191                        self.project_cache
3192                            .get(scoping.project_key)
3193                            .rate_limits()
3194                            .merge(limits.scope(&scoping));
3195                    }
3196                }
3197                Err(error) => {
3198                    // Errors are only logged for what we consider an internal discard reason. These
3199                    // indicate errors in the infrastructure or implementation bugs.
3200                    let mut envelope = self.envelope;
3201                    envelope.reject(Outcome::Invalid(DiscardReason::Internal));
3202                    relay_log::error!(
3203                        error = &error as &dyn Error,
3204                        tags.project_key = %envelope.scoping().project_key,
3205                        "error sending envelope"
3206                    );
3207                }
3208            }
3209        })
3210    }
3211}
3212
/// A container for metric buckets from multiple projects.
///
/// This container is used to send metrics to the upstream in global batches as part of the
/// [`FlushBuckets`] message if the `http.global_metrics` option is enabled. The container monitors
/// the size of all metrics and allows to split them into multiple batches. See
/// [`insert`](Self::insert) for more information.
#[derive(Debug)]
struct Partition<'a> {
    /// Maximum size of the partition in bytes.
    max_size: usize,
    /// Number of bytes still available before the partition is full.
    remaining: usize,
    /// Bucket views collected so far, grouped by project key.
    views: HashMap<ProjectKey, Vec<BucketView<'a>>>,
    /// Scoping information for every project key present in `views`.
    project_info: HashMap<ProjectKey, Scoping>,
}
3226
3227impl<'a> Partition<'a> {
3228    /// Creates a new partition with the given maximum size in bytes.
3229    pub fn new(size: usize) -> Self {
3230        Self {
3231            max_size: size,
3232            remaining: size,
3233            views: HashMap::new(),
3234            project_info: HashMap::new(),
3235        }
3236    }
3237
3238    /// Inserts a bucket into the partition, splitting it if necessary.
3239    ///
3240    /// This function attempts to add the bucket to this partition. If the bucket does not fit
3241    /// entirely into the partition given its maximum size, the remaining part of the bucket is
3242    /// returned from this function call.
3243    ///
3244    /// If this function returns `Some(_)`, the partition is full and should be submitted to the
3245    /// upstream immediately. Use [`Self::take`] to retrieve the contents of the
3246    /// partition. Afterwards, the caller is responsible to call this function again with the
3247    /// remaining bucket until it is fully inserted.
3248    pub fn insert(&mut self, bucket: BucketView<'a>, scoping: Scoping) -> Option<BucketView<'a>> {
3249        let (current, next) = bucket.split(self.remaining, Some(self.max_size));
3250
3251        if let Some(current) = current {
3252            self.remaining = self.remaining.saturating_sub(current.estimated_size());
3253            self.views
3254                .entry(scoping.project_key)
3255                .or_default()
3256                .push(current);
3257
3258            self.project_info
3259                .entry(scoping.project_key)
3260                .or_insert(scoping);
3261        }
3262
3263        next
3264    }
3265
3266    /// Returns `true` if the partition does not hold any data.
3267    fn is_empty(&self) -> bool {
3268        self.views.is_empty()
3269    }
3270
3271    /// Returns the serialized buckets for this partition.
3272    ///
3273    /// This empties the partition, so that it can be reused.
3274    fn take(&mut self) -> (Bytes, HashMap<ProjectKey, Scoping>) {
3275        #[derive(serde::Serialize)]
3276        struct Wrapper<'a> {
3277            buckets: &'a HashMap<ProjectKey, Vec<BucketView<'a>>>,
3278        }
3279
3280        let buckets = &self.views;
3281        let payload = serde_json::to_vec(&Wrapper { buckets }).unwrap().into();
3282
3283        let scopings = self.project_info.clone();
3284        self.project_info.clear();
3285
3286        self.views.clear();
3287        self.remaining = self.max_size;
3288
3289        (payload, scopings)
3290    }
3291}
3292
/// An upstream request that submits metric buckets via HTTP.
///
/// This request is not awaited. It automatically tracks outcomes if the request is not received.
#[derive(Debug)]
struct SendMetricsRequest {
    /// If the partition key is set, the request is marked with `X-Sentry-Relay-Shard`.
    partition_key: String,
    /// Serialized metric buckets without encoding applied, used for signing.
    ///
    /// Also re-parsed to emit outcomes when the transmission fails before reaching the upstream.
    unencoded: Bytes,
    /// Serialized metric buckets with the stated HTTP encoding applied.
    encoded: Bytes,
    /// Mapping of all contained project keys to their scoping and extraction mode.
    ///
    /// Used to track outcomes for transmission failures.
    project_info: HashMap<ProjectKey, Scoping>,
    /// Encoding (compression) of the payload.
    http_encoding: HttpEncoding,
    /// Metric outcomes instance to send outcomes on error.
    metric_outcomes: MetricOutcomes,
}
3313
3314impl SendMetricsRequest {
3315    fn create_error_outcomes(self) {
3316        #[derive(serde::Deserialize)]
3317        struct Wrapper {
3318            buckets: HashMap<ProjectKey, Vec<MinimalTrackableBucket>>,
3319        }
3320
3321        let buckets = match serde_json::from_slice(&self.unencoded) {
3322            Ok(Wrapper { buckets }) => buckets,
3323            Err(err) => {
3324                relay_log::error!(
3325                    error = &err as &dyn std::error::Error,
3326                    "failed to parse buckets from failed transmission"
3327                );
3328                return;
3329            }
3330        };
3331
3332        for (key, buckets) in buckets {
3333            let Some(&scoping) = self.project_info.get(&key) else {
3334                relay_log::error!("missing scoping for project key");
3335                continue;
3336            };
3337
3338            self.metric_outcomes.track(
3339                scoping,
3340                &buckets,
3341                Outcome::Invalid(DiscardReason::Internal),
3342            );
3343        }
3344    }
3345}
3346
3347impl UpstreamRequest for SendMetricsRequest {
3348    fn set_relay_id(&self) -> bool {
3349        true
3350    }
3351
3352    fn sign(&mut self) -> Option<Sign> {
3353        Some(Sign::Required(SignatureType::Body(self.unencoded.clone())))
3354    }
3355
3356    fn method(&self) -> reqwest::Method {
3357        reqwest::Method::POST
3358    }
3359
3360    fn path(&self) -> Cow<'_, str> {
3361        "/api/0/relays/metrics/".into()
3362    }
3363
3364    fn route(&self) -> &'static str {
3365        "global_metrics"
3366    }
3367
3368    fn build(&mut self, builder: &mut http::RequestBuilder) -> Result<(), http::HttpError> {
3369        metric!(
3370            distribution(RelayDistributions::UpstreamMetricsBodySize) = self.encoded.len() as u64
3371        );
3372
3373        builder
3374            .content_encoding(self.http_encoding)
3375            .header("X-Sentry-Relay-Shard", self.partition_key.as_bytes())
3376            .header(header::CONTENT_TYPE, b"application/json")
3377            .body(self.encoded.clone());
3378
3379        Ok(())
3380    }
3381
3382    fn respond(
3383        self: Box<Self>,
3384        result: Result<http::Response, UpstreamRequestError>,
3385    ) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> {
3386        Box::pin(async {
3387            match result {
3388                Ok(mut response) => {
3389                    response.consume().await.ok();
3390                }
3391                Err(error) => {
3392                    relay_log::error!(error = &error as &dyn Error, "Failed to send metrics batch");
3393
3394                    // If the request did not arrive at the upstream, we are responsible for outcomes.
3395                    // Otherwise, the upstream is responsible to log outcomes.
3396                    if error.is_received() {
3397                        return;
3398                    }
3399
3400                    self.create_error_outcomes()
3401                }
3402            }
3403        })
3404    }
3405}
3406
/// Container for global and project level [`Quota`].
#[derive(Copy, Clone, Debug)]
struct CombinedQuotas<'a> {
    /// Quotas from the global configuration, applying to all projects.
    global_quotas: &'a [Quota],
    /// Quotas from the project configuration.
    project_quotas: &'a [Quota],
}
3413
3414impl<'a> CombinedQuotas<'a> {
3415    /// Returns a new [`CombinedQuotas`].
3416    pub fn new(global_config: &'a GlobalConfig, project_quotas: &'a [Quota]) -> Self {
3417        Self {
3418            global_quotas: &global_config.quotas,
3419            project_quotas,
3420        }
3421    }
3422
3423    /// Returns `true` if both global quotas and project quotas are empty.
3424    pub fn is_empty(&self) -> bool {
3425        self.len() == 0
3426    }
3427
3428    /// Returns the number of both global and project quotas.
3429    pub fn len(&self) -> usize {
3430        self.global_quotas.len() + self.project_quotas.len()
3431    }
3432}
3433
3434impl<'a> IntoIterator for CombinedQuotas<'a> {
3435    type Item = &'a Quota;
3436    type IntoIter = std::iter::Chain<std::slice::Iter<'a, Quota>, std::slice::Iter<'a, Quota>>;
3437
3438    fn into_iter(self) -> Self::IntoIter {
3439        self.global_quotas.iter().chain(self.project_quotas.iter())
3440    }
3441}
3442
3443#[cfg(test)]
3444mod tests {
3445    use std::collections::BTreeMap;
3446
3447    use insta::assert_debug_snapshot;
3448    use relay_base_schema::metrics::{DurationUnit, MetricUnit};
3449    use relay_common::glob2::LazyGlob;
3450    use relay_dynamic_config::ProjectConfig;
3451    use relay_event_normalization::{
3452        MeasurementsConfig, NormalizationConfig, RedactionRule, TransactionNameConfig,
3453        TransactionNameRule,
3454    };
3455    use relay_event_schema::protocol::TransactionSource;
3456    use relay_pii::DataScrubbingConfig;
3457    use similar_asserts::assert_eq;
3458
3459    use crate::metrics_extraction::IntoMetric;
3460    use crate::metrics_extraction::transactions::types::{
3461        CommonTags, TransactionMeasurementTags, TransactionMetric,
3462    };
3463    use crate::testutils::{create_test_processor, create_test_processor_with_addrs};
3464
3465    #[cfg(feature = "processing")]
3466    use {
3467        relay_metrics::BucketValue,
3468        relay_quotas::{QuotaScope, ReasonCode},
3469        relay_test::mock_service,
3470    };
3471
3472    use super::*;
3473
    /// Builds a reject-all (limit 0), organization-scoped quota on metric buckets
    /// with the given id, used to trigger rate limiting in tests.
    #[cfg(feature = "processing")]
    fn mock_quota(id: &str) -> Quota {
        Quota {
            id: Some(id.into()),
            categories: [DataCategory::MetricBucket].into(),
            scope: QuotaScope::Organization,
            scope_id: None,
            limit: Some(0),
            window: None,
            reason_code: None,
            namespace: None,
        }
    }
3487
3488    #[cfg(feature = "processing")]
3489    #[test]
3490    fn test_dynamic_quotas() {
3491        let global_config = GlobalConfig {
3492            quotas: vec![mock_quota("foo"), mock_quota("bar")],
3493            ..Default::default()
3494        };
3495
3496        let project_quotas = vec![mock_quota("baz"), mock_quota("qux")];
3497
3498        let dynamic_quotas = CombinedQuotas::new(&global_config, &project_quotas);
3499
3500        assert_eq!(dynamic_quotas.len(), 4);
3501        assert!(!dynamic_quotas.is_empty());
3502
3503        let quota_ids = dynamic_quotas.into_iter().filter_map(|q| q.id.as_deref());
3504        assert!(quota_ids.eq(["foo", "bar", "baz", "qux"]));
3505    }
3506
    /// Ensures that if we ratelimit one batch of buckets in [`FlushBuckets`] message, it won't
    /// also ratelimit the next batches in the same message automatically.
    #[cfg(feature = "processing")]
    #[tokio::test]
    async fn test_ratelimit_per_batch() {
        use relay_base_schema::organization::OrganizationId;
        use relay_protocol::FiniteF64;

        // Org 1 is targeted by the reject-all quota below; org 2 is not.
        let rate_limited_org = Scoping {
            organization_id: OrganizationId::new(1),
            project_id: ProjectId::new(21),
            project_key: ProjectKey::parse("00000000000000000000000000000000").unwrap(),
            key_id: Some(17),
        };

        let not_rate_limited_org = Scoping {
            organization_id: OrganizationId::new(2),
            project_id: ProjectId::new(21),
            project_key: ProjectKey::parse("11111111111111111111111111111111").unwrap(),
            key_id: Some(17),
        };

        let message = {
            let project_info = {
                // Reject-all quota scoped to the rate limited organization only.
                let quota = Quota {
                    id: Some("testing".into()),
                    categories: [DataCategory::MetricBucket].into(),
                    scope: relay_quotas::QuotaScope::Organization,
                    scope_id: Some(rate_limited_org.organization_id.to_string().into()),
                    limit: Some(0),
                    window: None,
                    reason_code: Some(ReasonCode::new("test")),
                    namespace: None,
                };

                let mut config = ProjectConfig::default();
                config.quotas.push(quota);

                Arc::new(ProjectInfo {
                    config,
                    ..Default::default()
                })
            };

            // One counter bucket per organization; both share the same project config.
            let project_metrics = |scoping| ProjectBuckets {
                buckets: vec![Bucket {
                    name: "d:transactions/bar".into(),
                    value: BucketValue::Counter(FiniteF64::new(1.0).unwrap()),
                    timestamp: UnixTimestamp::now(),
                    tags: Default::default(),
                    width: 10,
                    metadata: BucketMetadata::default(),
                }],
                rate_limits: Default::default(),
                project_info: project_info.clone(),
                scoping,
            };

            let buckets = hashbrown::HashMap::from([
                (
                    rate_limited_org.project_key,
                    project_metrics(rate_limited_org),
                ),
                (
                    not_rate_limited_org.project_key,
                    project_metrics(not_rate_limited_org),
                ),
            ]);

            FlushBuckets {
                partition_key: 0,
                buckets,
            }
        };

        // ensure the order of the map while iterating is as expected.
        assert_eq!(message.buckets.keys().count(), 2);

        // Processing-enabled config with a real Redis connection for the rate limiter
        // (overridable via RELAY_REDIS_URL).
        let config = {
            let config_json = serde_json::json!({
                "processing": {
                    "enabled": true,
                    "kafka_config": [],
                    "redis": {
                        "server": std::env::var("RELAY_REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_owned()),
                    }
                }
            });
            Config::from_json_value(config_json).unwrap()
        };

        // Mock store forwarder that records the organization id of every metrics
        // message it receives; any envelope is unexpected here.
        let (store, handle) = {
            let f = |org_ids: &mut Vec<OrganizationId>, msg: Store| {
                let org_id = match msg {
                    Store::Metrics(x) => x.scoping.organization_id,
                    _ => panic!("received envelope when expecting only metrics"),
                };
                org_ids.push(org_id);
            };

            mock_service("store_forwarder", vec![], f)
        };

        let processor = create_test_processor(config).await;
        assert!(processor.redis_rate_limiter_enabled());

        processor.encode_metrics_processing(message, &store).await;

        // Dropping the store shuts down the mock service so `handle` can resolve.
        drop(store);
        let orgs_not_ratelimited = handle.await.unwrap();

        // Only the rate limited org's buckets must be dropped; the other batch
        // must still reach the store.
        assert_eq!(
            orgs_not_ratelimited,
            vec![not_rate_limited_org.organization_id]
        );
    }
3623
    /// PII scrubbing masks IP-like data in the User-Agent header, while browser
    /// context extraction must still yield the correct browser name and version.
    #[tokio::test]
    async fn test_browser_version_extraction_with_pii_like_data() {
        let processor = create_test_processor(Default::default()).await;
        let outcome_aggregator = Addr::dummy();
        let event_id = EventId::new();

        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
            .parse()
            .unwrap();

        let request_meta = RequestMeta::new(dsn);
        let mut envelope = Envelope::from_request(Some(event_id), request_meta);

        // Chrome user agent whose version string ("103.0.0.0") looks like an IP address.
        envelope.add_item({
                let mut item = Item::new(ItemType::Event);
                item.set_payload(
                    ContentType::Json,
                    r#"
                    {
                        "request": {
                            "headers": [
                                ["User-Agent", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36"]
                            ]
                        }
                    }
                "#,
                );
                item
            });

        let mut datascrubbing_settings = DataScrubbingConfig::default();
        // enable all the default scrubbing
        datascrubbing_settings.scrub_data = true;
        datascrubbing_settings.scrub_defaults = true;
        datascrubbing_settings.scrub_ip_addresses = true;

        // Make sure to mask any IP-like looking data
        let pii_config = serde_json::from_str(r#"{"applications": {"**": ["@ip:mask"]}}"#).unwrap();

        let config = ProjectConfig {
            datascrubbing_settings,
            pii_config: Some(pii_config),
            ..Default::default()
        };

        let project_info = ProjectInfo {
            config,
            ..Default::default()
        };

        let mut envelopes = ProcessingGroup::split_envelope(*envelope, &Default::default());
        assert_eq!(envelopes.len(), 1);

        let (group, envelope) = envelopes.pop().unwrap();
        let envelope = ManagedEnvelope::new(envelope, outcome_aggregator);

        let message = ProcessEnvelopeGrouped {
            group,
            envelope,
            ctx: processing::Context {
                project_info: &project_info,
                ..processing::Context::for_test()
            },
        };

        let Ok(Some(Submit::Envelope(mut new_envelope))) =
            processor.process(&mut Token::noop(), message).await
        else {
            panic!();
        };
        let new_envelope = new_envelope.envelope_mut();

        let event_item = new_envelope.items().last().unwrap();
        let annotated_event: Annotated<Event> =
            Annotated::from_json_bytes(&event_item.payload()).unwrap();
        let event = annotated_event.into_value().unwrap();
        let headers = event
            .request
            .into_value()
            .unwrap()
            .headers
            .into_value()
            .unwrap();

        // IP-like data must be masked
        assert_eq!(
            Some(
                "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/********* Safari/537.36"
            ),
            headers.get_header("User-Agent")
        );
        // But we still get correct browser and version number
        let contexts = event.contexts.into_value().unwrap();
        let browser = contexts.0.get("browser").unwrap();
        assert_eq!(
            r#"{"browser":"Chrome 103.0.0","name":"Chrome","version":"103.0.0","type":"browser"}"#,
            browser.to_json().unwrap()
        );
    }
3723
    /// The envelope's dynamic sampling context must be materialized into the
    /// event payload (`_dsc`), with the trace id normalized to its
    /// non-hyphenated form.
    #[tokio::test]
    #[cfg(feature = "processing")]
    async fn test_materialize_dsc() {
        use crate::services::projects::project::PublicKeyConfig;

        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
            .parse()
            .unwrap();
        let request_meta = RequestMeta::new(dsn);
        let mut envelope = Envelope::from_request(None, request_meta);

        // DSC with a hyphenated trace id; the snapshot below expects it normalized.
        let dsc = r#"{
            "trace_id": "00000000-0000-0000-0000-000000000000",
            "public_key": "e12d836b15bb49d7bbf99e64295d995b",
            "sample_rate": "0.2"
        }"#;
        envelope.set_dsc(serde_json::from_str(dsc).unwrap());

        let mut item = Item::new(ItemType::Event);
        item.set_payload(ContentType::Json, r#"{}"#);
        envelope.add_item(item);

        let outcome_aggregator = Addr::dummy();
        let managed_envelope = ManagedEnvelope::new(envelope, outcome_aggregator);

        let mut project_info = ProjectInfo::default();
        project_info.public_keys.push(PublicKeyConfig {
            public_key: ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(),
            numeric_id: Some(1),
        });

        let config = serde_json::json!({
            "processing": {
                "enabled": true,
                "kafka_config": [],
            }
        });

        let message = ProcessEnvelopeGrouped {
            group: ProcessingGroup::Transaction,
            envelope: managed_envelope,
            ctx: processing::Context {
                config: &Config::from_json_value(config.clone()).unwrap(),
                project_info: &project_info,
                sampling_project_info: Some(&project_info),
                ..processing::Context::for_test()
            },
        };

        let processor = create_test_processor(Config::from_json_value(config).unwrap()).await;
        let Ok(Some(Submit::Envelope(envelope))) =
            processor.process(&mut Token::noop(), message).await
        else {
            panic!();
        };
        let event = envelope
            .envelope()
            .get_item_by(|item| item.ty() == &ItemType::Event)
            .unwrap();

        let event = Annotated::<Event>::from_json_bytes(&event.payload()).unwrap();
        insta::assert_debug_snapshot!(event.value().unwrap()._dsc, @r###"
        Object(
            {
                "environment": ~,
                "public_key": String(
                    "e12d836b15bb49d7bbf99e64295d995b",
                ),
                "release": ~,
                "replay_id": ~,
                "sample_rate": String(
                    "0.2",
                ),
                "trace_id": String(
                    "00000000000000000000000000000000",
                ),
                "transaction": ~,
            },
        )
        "###);
    }
3805
    /// Runs transaction-name normalization on a synthetic transaction event and
    /// returns the statsd metrics captured during the run.
    ///
    /// The normalization config contains a single rule replacing `/foo/*/**`
    /// segments, so `transaction_name` and `source` control which variant of the
    /// `transaction_name_changes` metric is emitted.
    fn capture_test_event(transaction_name: &str, source: TransactionSource) -> Vec<String> {
        let mut event = Annotated::<Event>::from_json(
            r#"
            {
                "type": "transaction",
                "transaction": "/foo/",
                "timestamp": 946684810.0,
                "start_timestamp": 946684800.0,
                "contexts": {
                    "trace": {
                        "trace_id": "4c79f60c11214eb38604f4ae0781bfb2",
                        "span_id": "fa90fdead5f74053",
                        "op": "http.server",
                        "type": "trace"
                    }
                },
                "transaction_info": {
                    "source": "url"
                }
            }
            "#,
        )
        .unwrap();
        // Override the transaction name and source from the fixture above.
        let e = event.value_mut().as_mut().unwrap();
        e.transaction.set_value(Some(transaction_name.into()));

        e.transaction_info
            .value_mut()
            .as_mut()
            .unwrap()
            .source
            .set_value(Some(source));

        relay_statsd::with_capturing_test_client(|| {
            utils::log_transaction_name_metrics(&mut event, |event| {
                let config = NormalizationConfig {
                    transaction_name_config: TransactionNameConfig {
                        rules: &[TransactionNameRule {
                            pattern: LazyGlob::new("/foo/*/**".to_owned()),
                            expiry: DateTime::<Utc>::MAX_UTC,
                            redaction: RedactionRule::Replace {
                                substitution: "*".to_owned(),
                            },
                        }],
                    },
                    ..Default::default()
                };
                relay_event_normalization::normalize_event(event, &config)
            });
        })
    }
3857
    /// A URL transaction name matching neither the rule nor a pattern reports `changes:none`.
    #[test]
    fn test_log_transaction_metrics_none() {
        let captures = capture_test_event("/nothing", TransactionSource::Url);
        insta::assert_debug_snapshot!(captures, @r###"
        [
            "event.transaction_name_changes:1|c|#source_in:url,changes:none,source_out:sanitized,is_404:false",
        ]
        "###);
    }
3867
    /// A name changed only by the configured `/foo/*/**` rule reports `changes:rule`.
    #[test]
    fn test_log_transaction_metrics_rule() {
        let captures = capture_test_event("/foo/john/denver", TransactionSource::Url);
        insta::assert_debug_snapshot!(captures, @r###"
        [
            "event.transaction_name_changes:1|c|#source_in:url,changes:rule,source_out:sanitized,is_404:false",
        ]
        "###);
    }
3877
    /// A name changed only by built-in pattern sanitization reports `changes:pattern`.
    #[test]
    fn test_log_transaction_metrics_pattern() {
        let captures = capture_test_event("/something/12345", TransactionSource::Url);
        insta::assert_debug_snapshot!(captures, @r###"
        [
            "event.transaction_name_changes:1|c|#source_in:url,changes:pattern,source_out:sanitized,is_404:false",
        ]
        "###);
    }
3887
    /// A name changed by both the rule and pattern sanitization reports `changes:both`.
    #[test]
    fn test_log_transaction_metrics_both() {
        let captures = capture_test_event("/foo/john/12345", TransactionSource::Url);
        insta::assert_debug_snapshot!(captures, @r###"
        [
            "event.transaction_name_changes:1|c|#source_in:url,changes:both,source_out:sanitized,is_404:false",
        ]
        "###);
    }
3897
    /// A `Route`-sourced transaction name is left untouched: `changes:none`, `source_out:route`.
    #[test]
    fn test_log_transaction_metrics_no_match() {
        let captures = capture_test_event("/foo/john/12345", TransactionSource::Route);
        insta::assert_debug_snapshot!(captures, @r###"
        [
            "event.transaction_name_changes:1|c|#source_in:route,changes:none,source_out:route,is_404:false",
        ]
        "###);
    }
3907
    /// Confirms that the hardcoded value we use for the fixed length of the measurement MRI is
    /// correct. Unit test is placed here because it has dependencies to relay-server and therefore
    /// cannot be called from relay-metrics.
    #[test]
    fn test_mri_overhead_constant() {
        let hardcoded_value = MeasurementsConfig::MEASUREMENT_MRI_OVERHEAD;

        // Build a real measurement metric and count how many characters the MRI
        // adds on top of the raw measurement name and unit.
        let derived_value = {
            let name = "foobar".to_owned();
            let value = 5.into(); // Arbitrary value.
            let unit = MetricUnit::Duration(DurationUnit::default());
            let tags = TransactionMeasurementTags {
                measurement_rating: None,
                universal_tags: CommonTags(BTreeMap::new()),
                score_profile_version: None,
            };

            let measurement = TransactionMetric::Measurement {
                name: name.clone(),
                value,
                unit,
                tags,
            };

            let metric: Bucket = measurement.into_metric(UnixTimestamp::now());
            metric.name.len() - unit.to_string().len() - name.len()
        };
        assert_eq!(
            hardcoded_value, derived_value,
            "Update `MEASUREMENT_MRI_OVERHEAD` if the naming scheme changed."
        );
    }
3940
    /// Buckets from external sources get `received_at` stamped into their metadata,
    /// while internally submitted buckets end up with no `received_at`.
    #[tokio::test]
    async fn test_process_metrics_bucket_metadata() {
        let mut token = Cogs::noop().timed(ResourceId::Relay, AppFeature::Unattributed);
        let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
        let received_at = Utc::now();
        let config = Config::default();

        // Capture the buckets the processor forwards to the aggregator.
        let (aggregator, mut aggregator_rx) = Addr::custom();
        let processor = create_test_processor_with_addrs(
            config,
            Addrs {
                aggregator,
                ..Default::default()
            },
        )
        .await;

        let mut item = Item::new(ItemType::Statsd);
        item.set_payload(
            ContentType::Text,
            "transactions/foo:3182887624:4267882815|s",
        );
        for (source, expected_received_at) in [
            (
                BucketSource::External,
                Some(UnixTimestamp::from_datetime(received_at).unwrap()),
            ),
            (BucketSource::Internal, None),
        ] {
            let message = ProcessMetrics {
                data: MetricData::Raw(vec![item.clone()]),
                project_key,
                source,
                received_at,
                sent_at: Some(Utc::now()),
            };
            processor.handle_process_metrics(&mut token, message);

            let Aggregator::MergeBuckets(merge_buckets) = aggregator_rx.recv().await.unwrap();
            let buckets = merge_buckets.buckets;
            assert_eq!(buckets.len(), 1);
            assert_eq!(buckets[0].metadata.received_at, expected_received_at);
        }
    }
3985
3986    #[tokio::test]
3987    async fn test_process_batched_metrics() {
3988        let mut token = Cogs::noop().timed(ResourceId::Relay, AppFeature::Unattributed);
3989        let received_at = Utc::now();
3990        let config = Config::default();
3991
3992        let (aggregator, mut aggregator_rx) = Addr::custom();
3993        let processor = create_test_processor_with_addrs(
3994            config,
3995            Addrs {
3996                aggregator,
3997                ..Default::default()
3998            },
3999        )
4000        .await;
4001
4002        let payload = r#"{
4003    "buckets": {
4004        "11111111111111111111111111111111": [
4005            {
4006                "timestamp": 1615889440,
4007                "width": 0,
4008                "name": "d:custom/endpoint.response_time@millisecond",
4009                "type": "d",
4010                "value": [
4011                  68.0
4012                ],
4013                "tags": {
4014                  "route": "user_index"
4015                }
4016            }
4017        ],
4018        "22222222222222222222222222222222": [
4019            {
4020                "timestamp": 1615889440,
4021                "width": 0,
4022                "name": "d:custom/endpoint.cache_rate@none",
4023                "type": "d",
4024                "value": [
4025                  36.0
4026                ]
4027            }
4028        ]
4029    }
4030}
4031"#;
4032        let message = ProcessBatchedMetrics {
4033            payload: Bytes::from(payload),
4034            source: BucketSource::Internal,
4035            received_at,
4036            sent_at: Some(Utc::now()),
4037        };
4038        processor.handle_process_batched_metrics(&mut token, message);
4039
4040        let Aggregator::MergeBuckets(mb1) = aggregator_rx.recv().await.unwrap();
4041        let Aggregator::MergeBuckets(mb2) = aggregator_rx.recv().await.unwrap();
4042
4043        let mut messages = vec![mb1, mb2];
4044        messages.sort_by_key(|mb| mb.project_key);
4045
4046        let actual = messages
4047            .into_iter()
4048            .map(|mb| (mb.project_key, mb.buckets))
4049            .collect::<Vec<_>>();
4050
4051        assert_debug_snapshot!(actual, @r###"
4052        [
4053            (
4054                ProjectKey("11111111111111111111111111111111"),
4055                [
4056                    Bucket {
4057                        timestamp: UnixTimestamp(1615889440),
4058                        width: 0,
4059                        name: MetricName(
4060                            "d:custom/endpoint.response_time@millisecond",
4061                        ),
4062                        value: Distribution(
4063                            [
4064                                68.0,
4065                            ],
4066                        ),
4067                        tags: {
4068                            "route": "user_index",
4069                        },
4070                        metadata: BucketMetadata {
4071                            merges: 1,
4072                            received_at: None,
4073                            extracted_from_indexed: false,
4074                        },
4075                    },
4076                ],
4077            ),
4078            (
4079                ProjectKey("22222222222222222222222222222222"),
4080                [
4081                    Bucket {
4082                        timestamp: UnixTimestamp(1615889440),
4083                        width: 0,
4084                        name: MetricName(
4085                            "d:custom/endpoint.cache_rate@none",
4086                        ),
4087                        value: Distribution(
4088                            [
4089                                36.0,
4090                            ],
4091                        ),
4092                        tags: {},
4093                        metadata: BucketMetadata {
4094                            merges: 1,
4095                            received_at: None,
4096                            extracted_from_indexed: false,
4097                        },
4098                    },
4099                ],
4100            ),
4101        ]
4102        "###);
4103    }
4104}