relay_server/services/
processor.rs

1use std::borrow::Cow;
2use std::collections::{BTreeSet, HashMap};
3use std::error::Error;
4use std::fmt::{Debug, Display};
5use std::future::Future;
6use std::io::Write;
7use std::pin::Pin;
8use std::sync::Arc;
9use std::time::Duration;
10
11use anyhow::Context;
12use brotli::CompressorWriter as BrotliEncoder;
13use bytes::Bytes;
14use chrono::{DateTime, Utc};
15use flate2::Compression;
16use flate2::write::{GzEncoder, ZlibEncoder};
17use futures::FutureExt;
18use futures::future::BoxFuture;
19use relay_base_schema::project::{ProjectId, ProjectKey};
20use relay_cogs::{AppFeature, Cogs, FeatureWeights, ResourceId, Token};
21use relay_common::time::UnixTimestamp;
22use relay_config::{Config, HttpEncoding, RelayMode};
23use relay_dynamic_config::{ErrorBoundary, Feature, GlobalConfig};
24use relay_event_normalization::{ClockDriftProcessor, GeoIpLookup};
25use relay_event_schema::processor::ProcessingAction;
26use relay_event_schema::protocol::{
27    ClientReport, Event, EventId, Metrics, NetworkReportError, SpanV2,
28};
29use relay_filter::FilterStatKey;
30use relay_metrics::{Bucket, BucketMetadata, BucketView, BucketsView, MetricNamespace};
31use relay_pii::PiiConfigError;
32use relay_protocol::Annotated;
33use relay_quotas::{DataCategory, Quota, RateLimits, Scoping};
34use relay_sampling::evaluation::{ReservoirCounters, ReservoirEvaluator, SamplingDecision};
35use relay_statsd::metric;
36use relay_system::{Addr, FromMessage, NoResponse, Service};
37use reqwest::header;
38use smallvec::{SmallVec, smallvec};
39use zstd::stream::Encoder as ZstdEncoder;
40
41use crate::envelope::{
42    self, AttachmentType, ContentType, Envelope, EnvelopeError, Item, ItemContainer, ItemType,
43};
44use crate::extractors::{PartialDsn, RequestMeta, RequestTrust};
45use crate::integrations::{Integration, SpansIntegration};
46use crate::managed::{InvalidProcessingGroupType, ManagedEnvelope, TypedEnvelope};
47use crate::metrics::{MetricOutcomes, MetricsLimiter, MinimalTrackableBucket};
48use crate::metrics_extraction::transactions::ExtractedMetrics;
49use crate::metrics_extraction::transactions::types::ExtractMetricsError;
50use crate::processing::check_ins::CheckInsProcessor;
51use crate::processing::logs::LogsProcessor;
52use crate::processing::profile_chunks::ProfileChunksProcessor;
53use crate::processing::sessions::SessionsProcessor;
54use crate::processing::spans::SpansProcessor;
55use crate::processing::trace_attachments::TraceAttachmentsProcessor;
56use crate::processing::trace_metrics::TraceMetricsProcessor;
57use crate::processing::transactions::extraction::ExtractMetricsContext;
58use crate::processing::utils::event::{
59    EventFullyNormalized, EventMetricsExtracted, FiltersStatus, SpansExtracted, event_category,
60    event_type,
61};
62use crate::processing::{Forward as _, Output, Outputs, QuotaRateLimiter};
63use crate::service::ServiceError;
64use crate::services::global_config::GlobalConfigHandle;
65use crate::services::metrics::{Aggregator, FlushBuckets, MergeBuckets, ProjectBuckets};
66use crate::services::outcome::{DiscardItemType, DiscardReason, Outcome, TrackOutcome};
67use crate::services::projects::cache::ProjectCacheHandle;
68use crate::services::projects::project::{ProjectInfo, ProjectState};
69use crate::services::upstream::{
70    SendRequest, Sign, SignatureType, UpstreamRelay, UpstreamRequest, UpstreamRequestError,
71};
72use crate::statsd::{RelayCounters, RelayDistributions, RelayTimers};
73use crate::utils::{self, CheckLimits, EnvelopeLimiter, SamplingResult};
74use crate::{http, processing};
75use relay_threading::AsyncPool;
76#[cfg(feature = "processing")]
77use {
78    crate::services::processor::nnswitch::SwitchProcessingError,
79    crate::services::store::{Store, StoreEnvelope},
80    crate::services::upload::Upload,
81    crate::utils::Enforcement,
82    itertools::Itertools,
83    relay_cardinality::{
84        CardinalityLimit, CardinalityLimiter, CardinalityLimitsSplit, RedisSetLimiter,
85        RedisSetLimiterOptions,
86    },
87    relay_dynamic_config::CardinalityLimiterMode,
88    relay_quotas::{RateLimitingError, RedisRateLimiter},
89    relay_redis::{AsyncRedisClient, RedisClients},
90    std::time::Instant,
91    symbolic_unreal::{Unreal4Error, Unreal4ErrorKind},
92};
93
94mod attachment;
95mod dynamic_sampling;
96mod event;
97mod metrics;
98mod nel;
99mod profile;
100mod replay;
101mod report;
102mod span;
103
104#[cfg(all(sentry, feature = "processing"))]
105mod playstation;
106mod standalone;
107#[cfg(feature = "processing")]
108mod unreal;
109
110#[cfg(feature = "processing")]
111mod nnswitch;
112
/// Creates the block only if used with `processing` feature.
///
/// Provided code block will be executed only if the provided config has `processing_enabled` set.
macro_rules! if_processing {
    // Single-branch form: `$if_true` is compiled only with the `processing`
    // feature and executed only when `$config.processing_enabled()` is `true`.
    ($config:expr, $if_true:block) => {
        #[cfg(feature = "processing")] {
            if $config.processing_enabled() $if_true
        }
    };
    // Two-branch form: `$if_false` runs when processing is unavailable, either
    // at compile time (feature disabled) or at runtime (config flag off).
    ($config:expr, $if_true:block else $if_false:block) => {
        {
            #[cfg(feature = "processing")] {
                if $config.processing_enabled() $if_true else $if_false
            }
            #[cfg(not(feature = "processing"))] {
                $if_false
            }
        }
    };
}
133
/// The minimum clock drift for correction to apply.
///
/// Drifts below this threshold (55 minutes) are left uncorrected.
pub const MINIMUM_CLOCK_DRIFT: Duration = Duration::from_secs(55 * 60);
136
/// Error returned when a [`ProcessingGroup`] cannot be converted into the
/// requested group marker type.
#[derive(Debug)]
pub struct GroupTypeError;

impl Display for GroupTypeError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("failed to convert processing group into corresponding type")
    }
}

// `Error` is already imported from `std::error` at the top of the file; use it
// instead of repeating the full path.
impl Error for GroupTypeError {}
147
/// Generates a zero-sized marker type for a [`ProcessingGroup`] variant.
///
/// For the given `$ty` this emits:
/// - `From<$ty> for ProcessingGroup`, mapping the marker to `$variant`;
/// - `TryFrom<ProcessingGroup> for $ty`, succeeding for `$variant` (and any
///   extra `$other` variants) and failing with [`GroupTypeError`] otherwise.
macro_rules! processing_group {
    ($ty:ident, $variant:ident$(, $($other:ident),+)?) => {
        #[derive(Clone, Copy, Debug)]
        pub struct $ty;

        impl From<$ty> for ProcessingGroup {
            fn from(_: $ty) -> Self {
                ProcessingGroup::$variant
            }
        }

        impl TryFrom<ProcessingGroup> for $ty {
            type Error = GroupTypeError;

            fn try_from(value: ProcessingGroup) -> Result<Self, Self::Error> {
                if matches!(value, ProcessingGroup::$variant) {
                    return Ok($ty);
                }
                // Optional additional variants that also map to this marker.
                $($(
                    if matches!(value, ProcessingGroup::$other) {
                        return Ok($ty);
                    }
                )+)?
                return Err(GroupTypeError);
            }
        }
    };
}
176
/// A marker trait.
///
/// Should be used only with groups which are responsible for processing envelopes with events.
/// Implemented by [`TransactionGroup`] and [`ErrorGroup`].
pub trait EventProcessing {}
181
// Marker types for event-carrying groups.
processing_group!(TransactionGroup, Transaction);
impl EventProcessing for TransactionGroup {}

processing_group!(ErrorGroup, Error);
impl EventProcessing for ErrorGroup {}

// Marker types for the remaining groups. `LogGroup` also accepts the `Nel`
// group, since NEL reports are turned into logs during processing.
processing_group!(SessionGroup, Session);
processing_group!(StandaloneGroup, Standalone);
processing_group!(ClientReportGroup, ClientReport);
processing_group!(ReplayGroup, Replay);
processing_group!(CheckInGroup, CheckIn);
processing_group!(LogGroup, Log, Nel);
processing_group!(TraceMetricGroup, TraceMetric);
processing_group!(SpanGroup, Span);

processing_group!(ProfileChunkGroup, ProfileChunk);
processing_group!(MetricsGroup, Metrics);
processing_group!(ForwardUnknownGroup, ForwardUnknown);
processing_group!(Ungrouped, Ungrouped);
201
/// Processed group type marker.
///
/// Marks the envelopes which passed through the processing pipeline.
/// Used as the type state of [`TypedEnvelope`] after processing.
#[derive(Clone, Copy, Debug)]
pub struct Processed;
207
/// Describes the groups of the processable items.
///
/// Items are assigned to groups by `split_envelope`.
#[derive(Clone, Copy, Debug)]
pub enum ProcessingGroup {
    /// All the transaction related items.
    ///
    /// Includes transactions, related attachments, profiles.
    Transaction,
    /// All the items which require (have or create) events.
    ///
    /// This includes: errors, NEL, security reports, user reports, some of the
    /// attachments.
    Error,
    /// Session events.
    Session,
    /// Standalone items which can be sent alone without any event attached to it in the current
    /// envelope e.g. some attachments, user reports.
    Standalone,
    /// Outcomes.
    ClientReport,
    /// Replays and ReplayRecordings.
    Replay,
    /// Crons.
    CheckIn,
    /// NEL reports.
    Nel,
    /// Logs.
    Log,
    /// Trace metrics.
    TraceMetric,
    /// Spans.
    Span,
    /// Span V2 spans.
    SpanV2,
    /// Metrics.
    Metrics,
    /// ProfileChunk.
    ProfileChunk,
    /// V2 attachments without span / log association.
    TraceAttachment,
    /// Unknown item types will be forwarded upstream (to processing Relay), where we will
    /// decide what to do with them.
    ForwardUnknown,
    /// All the items in the envelope that could not be grouped.
    Ungrouped,
}
253
254impl ProcessingGroup {
255    /// Splits provided envelope into list of tuples of groups with associated envelopes.
256    fn split_envelope(
257        mut envelope: Envelope,
258        project_info: &ProjectInfo,
259    ) -> SmallVec<[(Self, Box<Envelope>); 3]> {
260        let headers = envelope.headers().clone();
261        let mut grouped_envelopes = smallvec![];
262
263        // Extract replays.
264        let replay_items = envelope.take_items_by(|item| {
265            matches!(
266                item.ty(),
267                &ItemType::ReplayEvent | &ItemType::ReplayRecording | &ItemType::ReplayVideo
268            )
269        });
270        if !replay_items.is_empty() {
271            grouped_envelopes.push((
272                ProcessingGroup::Replay,
273                Envelope::from_parts(headers.clone(), replay_items),
274            ))
275        }
276
277        // Keep all the sessions together in one envelope.
278        let session_items = envelope
279            .take_items_by(|item| matches!(item.ty(), &ItemType::Session | &ItemType::Sessions));
280        if !session_items.is_empty() {
281            grouped_envelopes.push((
282                ProcessingGroup::Session,
283                Envelope::from_parts(headers.clone(), session_items),
284            ))
285        }
286
287        let span_v2_items = envelope.take_items_by(|item| {
288            let exp_feature = project_info.has_feature(Feature::SpanV2ExperimentalProcessing);
289            let otlp_feature = project_info.has_feature(Feature::SpanV2OtlpProcessing);
290            let is_supported_integration = {
291                matches!(
292                    item.integration(),
293                    Some(Integration::Spans(SpansIntegration::OtelV1 { .. }))
294                )
295            };
296            let is_span = matches!(item.ty(), &ItemType::Span);
297            let is_span_attachment = item.is_span_attachment();
298
299            ItemContainer::<SpanV2>::is_container(item)
300                || (exp_feature && is_span)
301                || ((exp_feature || otlp_feature) && is_supported_integration)
302                || (exp_feature && is_span_attachment)
303        });
304
305        if !span_v2_items.is_empty() {
306            grouped_envelopes.push((
307                ProcessingGroup::SpanV2,
308                Envelope::from_parts(headers.clone(), span_v2_items),
309            ))
310        }
311
312        // Extract spans.
313        let span_items = envelope.take_items_by(|item| {
314            matches!(item.ty(), &ItemType::Span)
315                || matches!(item.integration(), Some(Integration::Spans(_)))
316        });
317
318        if !span_items.is_empty() {
319            grouped_envelopes.push((
320                ProcessingGroup::Span,
321                Envelope::from_parts(headers.clone(), span_items),
322            ))
323        }
324
325        // Extract logs.
326        let logs_items = envelope.take_items_by(|item| {
327            matches!(item.ty(), &ItemType::Log)
328                || matches!(item.integration(), Some(Integration::Logs(_)))
329        });
330        if !logs_items.is_empty() {
331            grouped_envelopes.push((
332                ProcessingGroup::Log,
333                Envelope::from_parts(headers.clone(), logs_items),
334            ))
335        }
336
337        // Extract trace metrics.
338        let trace_metric_items =
339            envelope.take_items_by(|item| matches!(item.ty(), &ItemType::TraceMetric));
340        if !trace_metric_items.is_empty() {
341            grouped_envelopes.push((
342                ProcessingGroup::TraceMetric,
343                Envelope::from_parts(headers.clone(), trace_metric_items),
344            ))
345        }
346
347        // NEL items are transformed into logs in their own processing step.
348        let nel_items = envelope.take_items_by(|item| matches!(item.ty(), &ItemType::Nel));
349        if !nel_items.is_empty() {
350            grouped_envelopes.push((
351                ProcessingGroup::Nel,
352                Envelope::from_parts(headers.clone(), nel_items),
353            ))
354        }
355
356        // Extract all metric items.
357        //
358        // Note: Should only be relevant in proxy mode. In other modes we send metrics through
359        // a separate pipeline.
360        let metric_items = envelope.take_items_by(|i| i.ty().is_metrics());
361        if !metric_items.is_empty() {
362            grouped_envelopes.push((
363                ProcessingGroup::Metrics,
364                Envelope::from_parts(headers.clone(), metric_items),
365            ))
366        }
367
368        // Extract profile chunks.
369        let profile_chunk_items =
370            envelope.take_items_by(|item| matches!(item.ty(), &ItemType::ProfileChunk));
371        if !profile_chunk_items.is_empty() {
372            grouped_envelopes.push((
373                ProcessingGroup::ProfileChunk,
374                Envelope::from_parts(headers.clone(), profile_chunk_items),
375            ))
376        }
377
378        let trace_attachment_items = envelope.take_items_by(Item::is_trace_attachment);
379        if !trace_attachment_items.is_empty() {
380            grouped_envelopes.push((
381                ProcessingGroup::TraceAttachment,
382                Envelope::from_parts(headers.clone(), trace_attachment_items),
383            ))
384        }
385
386        // Extract all standalone items.
387        //
388        // Note: only if there are no items in the envelope which can create events, otherwise they
389        // will be in the same envelope with all require event items.
390        if !envelope.items().any(Item::creates_event) {
391            let standalone_items = envelope.take_items_by(Item::requires_event);
392            if !standalone_items.is_empty() {
393                grouped_envelopes.push((
394                    ProcessingGroup::Standalone,
395                    Envelope::from_parts(headers.clone(), standalone_items),
396                ))
397            }
398        };
399
400        // Make sure we create separate envelopes for each `RawSecurity` report.
401        let security_reports_items = envelope
402            .take_items_by(|i| matches!(i.ty(), &ItemType::RawSecurity))
403            .into_iter()
404            .map(|item| {
405                let headers = headers.clone();
406                let items: SmallVec<[Item; 3]> = smallvec![item.clone()];
407                let mut envelope = Envelope::from_parts(headers, items);
408                envelope.set_event_id(EventId::new());
409                (ProcessingGroup::Error, envelope)
410            });
411        grouped_envelopes.extend(security_reports_items);
412
413        // Extract all the items which require an event into separate envelope.
414        let require_event_items = envelope.take_items_by(Item::requires_event);
415        if !require_event_items.is_empty() {
416            let group = if require_event_items
417                .iter()
418                .any(|item| matches!(item.ty(), &ItemType::Transaction | &ItemType::Profile))
419            {
420                ProcessingGroup::Transaction
421            } else {
422                ProcessingGroup::Error
423            };
424
425            grouped_envelopes.push((
426                group,
427                Envelope::from_parts(headers.clone(), require_event_items),
428            ))
429        }
430
431        // Get the rest of the envelopes, one per item.
432        let envelopes = envelope.items_mut().map(|item| {
433            let headers = headers.clone();
434            let items: SmallVec<[Item; 3]> = smallvec![item.clone()];
435            let envelope = Envelope::from_parts(headers, items);
436            let item_type = item.ty();
437            let group = if matches!(item_type, &ItemType::CheckIn) {
438                ProcessingGroup::CheckIn
439            } else if matches!(item.ty(), &ItemType::ClientReport) {
440                ProcessingGroup::ClientReport
441            } else if matches!(item_type, &ItemType::Unknown(_)) {
442                ProcessingGroup::ForwardUnknown
443            } else {
444                // Cannot group this item type.
445                ProcessingGroup::Ungrouped
446            };
447
448            (group, envelope)
449        });
450        grouped_envelopes.extend(envelopes);
451
452        grouped_envelopes
453    }
454
455    /// Returns the name of the group.
456    pub fn variant(&self) -> &'static str {
457        match self {
458            ProcessingGroup::Transaction => "transaction",
459            ProcessingGroup::Error => "error",
460            ProcessingGroup::Session => "session",
461            ProcessingGroup::Standalone => "standalone",
462            ProcessingGroup::ClientReport => "client_report",
463            ProcessingGroup::Replay => "replay",
464            ProcessingGroup::CheckIn => "check_in",
465            ProcessingGroup::Log => "log",
466            ProcessingGroup::TraceMetric => "trace_metric",
467            ProcessingGroup::Nel => "nel",
468            ProcessingGroup::Span => "span",
469            ProcessingGroup::SpanV2 => "span_v2",
470            ProcessingGroup::Metrics => "metrics",
471            ProcessingGroup::ProfileChunk => "profile_chunk",
472            ProcessingGroup::TraceAttachment => "trace_attachment",
473            ProcessingGroup::ForwardUnknown => "forward_unknown",
474            ProcessingGroup::Ungrouped => "ungrouped",
475        }
476    }
477}
478
479impl From<ProcessingGroup> for AppFeature {
480    fn from(value: ProcessingGroup) -> Self {
481        match value {
482            ProcessingGroup::Transaction => AppFeature::Transactions,
483            ProcessingGroup::Error => AppFeature::Errors,
484            ProcessingGroup::Session => AppFeature::Sessions,
485            ProcessingGroup::Standalone => AppFeature::UnattributedEnvelope,
486            ProcessingGroup::ClientReport => AppFeature::ClientReports,
487            ProcessingGroup::Replay => AppFeature::Replays,
488            ProcessingGroup::CheckIn => AppFeature::CheckIns,
489            ProcessingGroup::Log => AppFeature::Logs,
490            ProcessingGroup::TraceMetric => AppFeature::TraceMetrics,
491            ProcessingGroup::Nel => AppFeature::Logs,
492            ProcessingGroup::Span => AppFeature::Spans,
493            ProcessingGroup::SpanV2 => AppFeature::Spans,
494            ProcessingGroup::Metrics => AppFeature::UnattributedMetrics,
495            ProcessingGroup::ProfileChunk => AppFeature::Profiles,
496            ProcessingGroup::ForwardUnknown => AppFeature::UnattributedEnvelope,
497            ProcessingGroup::Ungrouped => AppFeature::UnattributedEnvelope,
498            ProcessingGroup::TraceAttachment => AppFeature::TraceAttachments,
499        }
500    }
501}
502
/// An error returned when handling [`ProcessEnvelope`].
///
/// Use [`Self::to_outcome`] to map an error to the outcome that should be
/// emitted for it, if any.
#[derive(Debug, thiserror::Error)]
pub enum ProcessingError {
    #[error("invalid json in event")]
    InvalidJson(#[source] serde_json::Error),

    #[error("invalid message pack event payload")]
    InvalidMsgpack(#[from] rmp_serde::decode::Error),

    #[cfg(feature = "processing")]
    #[error("invalid unreal crash report")]
    InvalidUnrealReport(#[source] Unreal4Error),

    #[error("event payload too large")]
    PayloadTooLarge(DiscardItemType),

    #[error("invalid transaction event")]
    InvalidTransaction,

    #[error("envelope processor failed")]
    ProcessingFailed(#[from] ProcessingAction),

    #[error("duplicate {0} in event")]
    DuplicateItem(ItemType),

    #[error("failed to extract event payload")]
    NoEventPayload,

    #[error("missing project id in DSN")]
    MissingProjectId,

    #[error("invalid security report type: {0:?}")]
    InvalidSecurityType(Bytes),

    #[error("unsupported security report type")]
    UnsupportedSecurityType,

    #[error("invalid security report")]
    InvalidSecurityReport(#[source] serde_json::Error),

    #[error("invalid nel report")]
    InvalidNelReport(#[source] NetworkReportError),

    #[error("event filtered with reason: {0:?}")]
    EventFiltered(FilterStatKey),

    #[error("missing or invalid required event timestamp")]
    InvalidTimestamp,

    #[error("could not serialize event payload")]
    SerializeFailed(#[source] serde_json::Error),

    #[cfg(feature = "processing")]
    #[error("failed to apply quotas")]
    QuotasFailed(#[from] RateLimitingError),

    #[error("invalid pii config")]
    PiiConfigError(PiiConfigError),

    #[error("invalid processing group type")]
    InvalidProcessingGroup(Box<InvalidProcessingGroupType>),

    #[error("invalid replay")]
    InvalidReplay(DiscardReason),

    #[error("replay filtered with reason: {0:?}")]
    ReplayFiltered(FilterStatKey),

    #[cfg(feature = "processing")]
    #[error("nintendo switch dying message processing failed {0:?}")]
    InvalidNintendoDyingMessage(#[source] SwitchProcessingError),

    #[cfg(all(sentry, feature = "processing"))]
    #[error("playstation dump processing failed: {0}")]
    InvalidPlaystationDump(String),

    // An envelope was dispatched to a processor of a different group.
    #[error("processing group does not match specific processor")]
    ProcessingGroupMismatch,
    // Failure in the new processing pipeline; outcomes are emitted there already.
    #[error("new processing pipeline failed")]
    ProcessingFailure,
}
584
585impl ProcessingError {
    /// Maps the error to the [`Outcome`] that should be emitted for it, if any.
    ///
    /// `None` means either no outcome applies (e.g. a missing project id or a
    /// filtered event) or the outcome has already been emitted elsewhere
    /// (`ProcessingFailure`).
    pub fn to_outcome(&self) -> Option<Outcome> {
        match self {
            Self::PayloadTooLarge(payload_type) => {
                Some(Outcome::Invalid(DiscardReason::TooLarge(*payload_type)))
            }
            Self::InvalidJson(_) => Some(Outcome::Invalid(DiscardReason::InvalidJson)),
            Self::InvalidMsgpack(_) => Some(Outcome::Invalid(DiscardReason::InvalidMsgpack)),
            Self::InvalidSecurityType(_) => {
                Some(Outcome::Invalid(DiscardReason::SecurityReportType))
            }
            Self::InvalidSecurityReport(_) => Some(Outcome::Invalid(DiscardReason::SecurityReport)),
            Self::UnsupportedSecurityType => Some(Outcome::Filtered(FilterStatKey::InvalidCsp)),
            Self::InvalidNelReport(_) => Some(Outcome::Invalid(DiscardReason::InvalidJson)),
            Self::InvalidTransaction => Some(Outcome::Invalid(DiscardReason::InvalidTransaction)),
            Self::InvalidTimestamp => Some(Outcome::Invalid(DiscardReason::Timestamp)),
            Self::DuplicateItem(_) => Some(Outcome::Invalid(DiscardReason::DuplicateItem)),
            Self::NoEventPayload => Some(Outcome::Invalid(DiscardReason::NoEventPayload)),
            #[cfg(feature = "processing")]
            Self::InvalidNintendoDyingMessage(_) => Some(Outcome::Invalid(DiscardReason::Payload)),
            #[cfg(all(sentry, feature = "processing"))]
            Self::InvalidPlaystationDump(_) => Some(Outcome::Invalid(DiscardReason::Payload)),
            #[cfg(feature = "processing")]
            Self::InvalidUnrealReport(err) if err.kind() == Unreal4ErrorKind::BadCompression => {
                Some(Outcome::Invalid(DiscardReason::InvalidCompression))
            }
            #[cfg(feature = "processing")]
            Self::InvalidUnrealReport(_) => Some(Outcome::Invalid(DiscardReason::ProcessUnreal)),
            Self::SerializeFailed(_) | Self::ProcessingFailed(_) => {
                Some(Outcome::Invalid(DiscardReason::Internal))
            }
            #[cfg(feature = "processing")]
            Self::QuotasFailed(_) => Some(Outcome::Invalid(DiscardReason::Internal)),
            Self::PiiConfigError(_) => Some(Outcome::Invalid(DiscardReason::ProjectStatePii)),
            Self::MissingProjectId => None,
            Self::EventFiltered(_) => None,
            Self::InvalidProcessingGroup(_) => None,
            Self::InvalidReplay(reason) => Some(Outcome::Invalid(*reason)),
            Self::ReplayFiltered(key) => Some(Outcome::Filtered(key.clone())),

            Self::ProcessingGroupMismatch => Some(Outcome::Invalid(DiscardReason::Internal)),
            // Outcomes are emitted in the new processing pipeline already.
            Self::ProcessingFailure => None,
        }
    }
630
631    fn is_unexpected(&self) -> bool {
632        self.to_outcome()
633            .is_some_and(|outcome| outcome.is_unexpected())
634    }
635}
636
#[cfg(feature = "processing")]
impl From<Unreal4Error> for ProcessingError {
    fn from(err: Unreal4Error) -> Self {
        // Oversized reports map to the payload-size error; everything else is
        // treated as an invalid unreal report.
        if err.kind() == Unreal4ErrorKind::TooLarge {
            Self::PayloadTooLarge(ItemType::UnrealReport.into())
        } else {
            Self::InvalidUnrealReport(err)
        }
    }
}
646
647impl From<ExtractMetricsError> for ProcessingError {
648    fn from(error: ExtractMetricsError) -> Self {
649        match error {
650            ExtractMetricsError::MissingTimestamp | ExtractMetricsError::InvalidTimestamp => {
651                Self::InvalidTimestamp
652            }
653        }
654    }
655}
656
657impl From<InvalidProcessingGroupType> for ProcessingError {
658    fn from(value: InvalidProcessingGroupType) -> Self {
659        Self::InvalidProcessingGroup(Box::new(value))
660    }
661}
662
/// An event parsed from an envelope, paired with a `usize` (presumably the
/// size of the originating payload — confirm at the use sites).
type ExtractedEvent = (Annotated<Event>, usize);
664
/// A container for extracted metrics during processing.
///
/// The container enforces that the extracted metrics are correctly tagged
/// with the dynamic sampling decision.
#[derive(Debug)]
pub struct ProcessingExtractedMetrics {
    // Holds both the project's own buckets and the sampling (root) project's
    // buckets.
    metrics: ExtractedMetrics,
}
673
674impl ProcessingExtractedMetrics {
675    pub fn new() -> Self {
676        Self {
677            metrics: ExtractedMetrics::default(),
678        }
679    }
680
    /// Consumes the container and returns the collected [`ExtractedMetrics`].
    pub fn into_inner(self) -> ExtractedMetrics {
        self.metrics
    }
684
685    /// Extends the contained metrics with [`ExtractedMetrics`].
686    pub fn extend(
687        &mut self,
688        extracted: ExtractedMetrics,
689        sampling_decision: Option<SamplingDecision>,
690    ) {
691        self.extend_project_metrics(extracted.project_metrics, sampling_decision);
692        self.extend_sampling_metrics(extracted.sampling_metrics, sampling_decision);
693    }
694
695    /// Extends the contained project metrics.
696    pub fn extend_project_metrics<I>(
697        &mut self,
698        buckets: I,
699        sampling_decision: Option<SamplingDecision>,
700    ) where
701        I: IntoIterator<Item = Bucket>,
702    {
703        self.metrics
704            .project_metrics
705            .extend(buckets.into_iter().map(|mut bucket| {
706                bucket.metadata.extracted_from_indexed =
707                    sampling_decision == Some(SamplingDecision::Keep);
708                bucket
709            }));
710    }
711
712    /// Extends the contained sampling metrics.
713    pub fn extend_sampling_metrics<I>(
714        &mut self,
715        buckets: I,
716        sampling_decision: Option<SamplingDecision>,
717    ) where
718        I: IntoIterator<Item = Bucket>,
719    {
720        self.metrics
721            .sampling_metrics
722            .extend(buckets.into_iter().map(|mut bucket| {
723                bucket.metadata.extracted_from_indexed =
724                    sampling_decision == Some(SamplingDecision::Keep);
725                bucket
726            }));
727    }
728
    /// Applies rate limits to the contained metrics.
    ///
    /// This is used to apply rate limits which have been enforced on sampled items of an envelope
    /// to also consistently apply to the metrics extracted from these items.
    ///
    /// Only the transaction and span metric namespaces are affected.
    #[cfg(feature = "processing")]
    fn apply_enforcement(&mut self, enforcement: &Enforcement, enforced_consistently: bool) {
        // Metric namespaces which need to be dropped.
        let mut drop_namespaces: SmallVec<[_; 2]> = smallvec![];
        // Metrics belonging to this metric namespace need to have the `extracted_from_indexed`
        // flag reset to `false`.
        let mut reset_extracted_from_indexed: SmallVec<[_; 2]> = smallvec![];

        for (namespace, limit, indexed) in [
            (
                MetricNamespace::Transactions,
                &enforcement.event,
                &enforcement.event_indexed,
            ),
            (
                MetricNamespace::Spans,
                &enforcement.spans,
                &enforcement.spans_indexed,
            ),
        ] {
            if limit.is_active() {
                drop_namespaces.push(namespace);
            } else if indexed.is_active() && !enforced_consistently {
                // If the enforcement was not computed by consistently checking the limits,
                // the quota for the metrics has not yet been incremented.
                // In this case we have a dropped indexed payload but a metric which still needs to
                // be accounted for, make sure the metric will still be rate limited.
                reset_extracted_from_indexed.push(namespace);
            }
        }

        if !drop_namespaces.is_empty() || !reset_extracted_from_indexed.is_empty() {
            self.retain_mut(|bucket| {
                // Buckets without a recognizable namespace are kept untouched.
                let Some(namespace) = bucket.name.try_namespace() else {
                    return true;
                };

                if drop_namespaces.contains(&namespace) {
                    return false;
                }

                if reset_extracted_from_indexed.contains(&namespace) {
                    bucket.metadata.extracted_from_indexed = false;
                }

                true
            });
        }
    }
782
    /// Retains only the buckets for which `f` returns `true`, allowing `f` to
    /// mutate each bucket. Applies to both project and sampling metrics.
    #[cfg(feature = "processing")]
    fn retain_mut(&mut self, mut f: impl FnMut(&mut Bucket) -> bool) {
        self.metrics.project_metrics.retain_mut(&mut f);
        self.metrics.sampling_metrics.retain_mut(&mut f);
    }
788}
789
790fn send_metrics(
791    metrics: ExtractedMetrics,
792    project_key: ProjectKey,
793    sampling_key: Option<ProjectKey>,
794    aggregator: &Addr<Aggregator>,
795) {
796    let ExtractedMetrics {
797        project_metrics,
798        sampling_metrics,
799    } = metrics;
800
801    if !project_metrics.is_empty() {
802        aggregator.send(MergeBuckets {
803            project_key,
804            buckets: project_metrics,
805        });
806    }
807
808    if !sampling_metrics.is_empty() {
809        // If no sampling project state is available, we associate the sampling
810        // metrics with the current project.
811        //
812        // project_without_tracing         -> metrics goes to self
813        // dependent_project_with_tracing  -> metrics goes to root
814        // root_project_with_tracing       -> metrics goes to root == self
815        let sampling_project_key = sampling_key.unwrap_or(project_key);
816        aggregator.send(MergeBuckets {
817            project_key: sampling_project_key,
818            buckets: sampling_metrics,
819        });
820    }
821}
822
823/// Function for on-off switches that filter specific item types (profiles, spans)
824/// based on a feature flag.
825///
826/// If the project config did not come from the upstream, we keep the items.
827fn should_filter(config: &Config, project_info: &ProjectInfo, feature: Feature) -> bool {
828    match config.relay_mode() {
829        RelayMode::Proxy => false,
830        RelayMode::Managed => !project_info.has_feature(feature),
831    }
832}
833
/// The result of the envelope processing containing the processed envelope along with the partial
/// result.
#[derive(Debug)]
#[expect(
    clippy::large_enum_variant,
    reason = "until we have a better solution to combat the excessive growth of Item, see #4819"
)]
enum ProcessingResult {
    /// A processed envelope together with metrics extracted during processing.
    Envelope {
        /// The processed envelope.
        managed_envelope: TypedEnvelope<Processed>,
        /// Metrics extracted while processing the envelope.
        extracted_metrics: ProcessingExtractedMetrics,
    },
    /// The output of a [`processing::Processor`].
    Output(Output<Outputs>),
}
848
849impl ProcessingResult {
850    /// Creates a [`ProcessingResult`] with no metrics.
851    fn no_metrics(managed_envelope: TypedEnvelope<Processed>) -> Self {
852        Self::Envelope {
853            managed_envelope,
854            extracted_metrics: ProcessingExtractedMetrics::new(),
855        }
856    }
857}
858
/// All items which can be submitted upstream.
#[derive(Debug)]
#[expect(
    clippy::large_enum_variant,
    reason = "until we have a better solution to combat the excessive growth of Item, see #4819"
)]
enum Submit<'a> {
    /// A processed envelope.
    Envelope(TypedEnvelope<Processed>),
    /// The output of a [`processing::Processor`].
    Output {
        /// The produced output.
        output: Outputs,
        /// Context required to forward the output, see [`processing::ForwardContext`].
        ctx: processing::ForwardContext<'a>,
    },
}
874
/// Applies processing to all contents of the given envelope.
///
/// Depending on the contents of the envelope and Relay's mode, this includes:
///
///  - Basic normalization and validation for all item types.
///  - Clock drift correction if the required `sent_at` header is present.
///  - Expansion of certain item types (e.g. unreal).
///  - Store normalization for event payloads in processing mode.
///  - Rate limiters and inbound filters on events in processing mode.
#[derive(Debug)]
pub struct ProcessEnvelope {
    /// Envelope to process.
    pub envelope: ManagedEnvelope,
    /// The project info.
    pub project_info: Arc<ProjectInfo>,
    /// Currently active cached rate limits for this project.
    pub rate_limits: Arc<RateLimits>,
    /// Root sampling project info.
    pub sampling_project_info: Option<Arc<ProjectInfo>>,
    /// Sampling reservoir counters, see [`ReservoirCounters`].
    pub reservoir_counters: ReservoirCounters,
}
897
/// Like a [`ProcessEnvelope`], but with an envelope which has been grouped.
#[derive(Debug)]
struct ProcessEnvelopeGrouped<'a> {
    /// The group the envelope belongs to, see [`ProcessingGroup`].
    pub group: ProcessingGroup,
    /// Envelope to process.
    pub envelope: ManagedEnvelope,
    /// The processing context.
    pub ctx: processing::Context<'a>,
}
908
/// Parses a list of metrics or metric buckets and pushes them to the project's aggregator.
///
/// This parses and validates the metrics:
///  - For [`Metrics`](ItemType::Statsd), each metric is parsed separately, and invalid metrics are
///    ignored independently.
///  - For [`MetricBuckets`](ItemType::MetricBuckets), the entire list of buckets is parsed and
///    dropped together on parsing failure.
///  - Other envelope items will be ignored with an error message.
///
/// Additionally, processing applies clock drift correction using the system clock of this Relay, if
/// the Envelope specifies the [`sent_at`](Envelope::sent_at) header.
#[derive(Debug)]
pub struct ProcessMetrics {
    /// A list of metric items, either raw or already parsed (see [`MetricData`]).
    pub data: MetricData,
    /// The target project.
    pub project_key: ProjectKey,
    /// Whether to keep or reset the metric metadata, see [`BucketSource`].
    pub source: BucketSource,
    /// The wall clock time at which the request was received.
    pub received_at: DateTime<Utc>,
    /// The value of the Envelope's [`sent_at`](Envelope::sent_at) header for clock drift
    /// correction.
    pub sent_at: Option<DateTime<Utc>>,
}
934
/// Raw unparsed metric data.
#[derive(Debug)]
pub enum MetricData {
    /// Raw data, unparsed envelope items.
    Raw(Vec<Item>),
    /// Already parsed buckets but unprocessed.
    Parsed(Vec<Bucket>),
}
943
944impl MetricData {
945    /// Consumes the metric data and parses the contained buckets.
946    ///
947    /// If the contained data is already parsed the buckets are returned unchanged.
948    /// Raw buckets are parsed and created with the passed `timestamp`.
949    fn into_buckets(self, timestamp: UnixTimestamp) -> Vec<Bucket> {
950        let items = match self {
951            Self::Parsed(buckets) => return buckets,
952            Self::Raw(items) => items,
953        };
954
955        let mut buckets = Vec::new();
956        for item in items {
957            let payload = item.payload();
958            if item.ty() == &ItemType::Statsd {
959                for bucket_result in Bucket::parse_all(&payload, timestamp) {
960                    match bucket_result {
961                        Ok(bucket) => buckets.push(bucket),
962                        Err(error) => relay_log::debug!(
963                            error = &error as &dyn Error,
964                            "failed to parse metric bucket from statsd format",
965                        ),
966                    }
967                }
968            } else if item.ty() == &ItemType::MetricBuckets {
969                match serde_json::from_slice::<Vec<Bucket>>(&payload) {
970                    Ok(parsed_buckets) => {
971                        // Re-use the allocation of `b` if possible.
972                        if buckets.is_empty() {
973                            buckets = parsed_buckets;
974                        } else {
975                            buckets.extend(parsed_buckets);
976                        }
977                    }
978                    Err(error) => {
979                        relay_log::debug!(
980                            error = &error as &dyn Error,
981                            "failed to parse metric bucket",
982                        );
983                        metric!(counter(RelayCounters::MetricBucketsParsingFailed) += 1);
984                    }
985                }
986            } else {
987                relay_log::error!(
988                    "invalid item of type {} passed to ProcessMetrics",
989                    item.ty()
990                );
991            }
992        }
993        buckets
994    }
995}
996
/// Parses a batch of metric buckets encoded as JSON and pushes them to the aggregator.
#[derive(Debug)]
pub struct ProcessBatchedMetrics {
    /// Metrics payload in JSON format.
    pub payload: Bytes,
    /// Whether to keep or reset the metric metadata.
    pub source: BucketSource,
    /// The wall clock time at which the request was received.
    pub received_at: DateTime<Utc>,
    /// The value of the Envelope's [`sent_at`](Envelope::sent_at) header for clock drift
    /// correction.
    pub sent_at: Option<DateTime<Utc>>,
}
1008
/// Source information where a metric bucket originates from.
///
/// Use [`BucketSource::from_meta`] to derive this from request metadata.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum BucketSource {
    /// The metric bucket originated from an internal Relay use case.
    ///
    /// The metric bucket originates either from within the same Relay
    /// or was accepted coming from another Relay which is registered as
    /// an internal Relay via Relay's configuration.
    Internal,
    /// The bucket source originated from an untrusted source.
    ///
    /// Managed Relays sending extracted metrics are considered external,
    /// it's a project use case but it comes from an untrusted source.
    External,
}
1024
1025impl BucketSource {
1026    /// Infers the bucket source from [`RequestMeta::request_trust`].
1027    pub fn from_meta(meta: &RequestMeta) -> Self {
1028        match meta.request_trust() {
1029            RequestTrust::Trusted => Self::Internal,
1030            RequestTrust::Untrusted => Self::External,
1031        }
1032    }
1033}
1034
/// Sends client reports to the upstream.
#[derive(Debug)]
pub struct SubmitClientReports {
    /// The client reports to be sent.
    pub client_reports: Vec<ClientReport>,
    /// Scoping information for the client reports.
    pub scoping: Scoping,
}
1043
/// CPU-intensive processing tasks for envelopes.
///
/// Variants are boxed to keep the message small, see the respective payload types for details.
#[derive(Debug)]
pub enum EnvelopeProcessor {
    /// Processes an envelope, see [`ProcessEnvelope`].
    ProcessEnvelope(Box<ProcessEnvelope>),
    /// Parses and aggregates metric items, see [`ProcessMetrics`].
    ProcessProjectMetrics(Box<ProcessMetrics>),
    /// Parses a batched metrics payload, see [`ProcessBatchedMetrics`].
    ProcessBatchedMetrics(Box<ProcessBatchedMetrics>),
    /// Handles flushed metric buckets, see [`FlushBuckets`].
    FlushBuckets(Box<FlushBuckets>),
    /// Sends client reports upstream, see [`SubmitClientReports`].
    SubmitClientReports(Box<SubmitClientReports>),
}
1053
1054impl EnvelopeProcessor {
1055    /// Returns the name of the message variant.
1056    pub fn variant(&self) -> &'static str {
1057        match self {
1058            EnvelopeProcessor::ProcessEnvelope(_) => "ProcessEnvelope",
1059            EnvelopeProcessor::ProcessProjectMetrics(_) => "ProcessProjectMetrics",
1060            EnvelopeProcessor::ProcessBatchedMetrics(_) => "ProcessBatchedMetrics",
1061            EnvelopeProcessor::FlushBuckets(_) => "FlushBuckets",
1062            EnvelopeProcessor::SubmitClientReports(_) => "SubmitClientReports",
1063        }
1064    }
1065}
1066
1067impl relay_system::Interface for EnvelopeProcessor {}
1068
1069impl FromMessage<ProcessEnvelope> for EnvelopeProcessor {
1070    type Response = relay_system::NoResponse;
1071
1072    fn from_message(message: ProcessEnvelope, _sender: ()) -> Self {
1073        Self::ProcessEnvelope(Box::new(message))
1074    }
1075}
1076
// Converts `ProcessMetrics` into the corresponding processor message variant.
impl FromMessage<ProcessMetrics> for EnvelopeProcessor {
    type Response = NoResponse;

    fn from_message(message: ProcessMetrics, _: ()) -> Self {
        Self::ProcessProjectMetrics(Box::new(message))
    }
}
1084
// Converts `ProcessBatchedMetrics` into the corresponding processor message variant.
impl FromMessage<ProcessBatchedMetrics> for EnvelopeProcessor {
    type Response = NoResponse;

    fn from_message(message: ProcessBatchedMetrics, _: ()) -> Self {
        Self::ProcessBatchedMetrics(Box::new(message))
    }
}
1092
// Converts `FlushBuckets` into the corresponding processor message variant.
impl FromMessage<FlushBuckets> for EnvelopeProcessor {
    type Response = NoResponse;

    fn from_message(message: FlushBuckets, _: ()) -> Self {
        Self::FlushBuckets(Box::new(message))
    }
}
1100
// Converts `SubmitClientReports` into the corresponding processor message variant.
impl FromMessage<SubmitClientReports> for EnvelopeProcessor {
    type Response = NoResponse;

    fn from_message(message: SubmitClientReports, _: ()) -> Self {
        Self::SubmitClientReports(Box::new(message))
    }
}
1108
/// The asynchronous thread pool used for scheduling processing tasks in the processor.
///
/// Tasks are boxed `'static` futures executed on the pool, see [`AsyncPool`].
pub type EnvelopeProcessorServicePool = AsyncPool<BoxFuture<'static, ()>>;
1111
/// Service implementing the [`EnvelopeProcessor`] interface.
///
/// This service handles messages in a worker pool with configurable concurrency.
///
/// Cloning is cheap: all state is shared behind an [`Arc`].
#[derive(Clone)]
pub struct EnvelopeProcessorService {
    // Shared state, see `InnerProcessor`.
    inner: Arc<InnerProcessor>,
}
1119
/// Contains the addresses of services that the processor publishes to.
pub struct Addrs {
    /// Address of the outcome aggregator service.
    pub outcome_aggregator: Addr<TrackOutcome>,
    /// Address of the upstream relay service.
    pub upstream_relay: Addr<UpstreamRelay>,
    /// Address of the upload service, if available (processing only).
    #[cfg(feature = "processing")]
    pub upload: Option<Addr<Upload>>,
    /// Address of the store forwarder, if available (processing only).
    #[cfg(feature = "processing")]
    pub store_forwarder: Option<Addr<Store>>,
    /// Address of the metrics aggregator service.
    pub aggregator: Addr<Aggregator>,
}
1130
1131impl Default for Addrs {
1132    fn default() -> Self {
1133        Addrs {
1134            outcome_aggregator: Addr::dummy(),
1135            upstream_relay: Addr::dummy(),
1136            #[cfg(feature = "processing")]
1137            upload: None,
1138            #[cfg(feature = "processing")]
1139            store_forwarder: None,
1140            aggregator: Addr::dummy(),
1141        }
1142    }
1143}
1144
/// Shared state of the [`EnvelopeProcessorService`].
struct InnerProcessor {
    /// Thread pool on which processing tasks are scheduled.
    pool: EnvelopeProcessorServicePool,
    /// Relay configuration.
    config: Arc<Config>,
    /// Handle to the current global configuration.
    global_config: GlobalConfigHandle,
    /// Handle to the project cache, used e.g. to merge freshly computed rate limits.
    project_cache: ProjectCacheHandle,
    cogs: Cogs,
    /// Redis client for quotas, also used for Redis-backed reservoir sampling counters.
    #[cfg(feature = "processing")]
    quotas_client: Option<AsyncRedisClient>,
    /// Addresses of downstream services.
    addrs: Addrs,
    /// Redis-backed rate limiter for consistent quota enforcement.
    #[cfg(feature = "processing")]
    rate_limiter: Option<Arc<RedisRateLimiter>>,
    /// GeoIP lookup; empty if no database is configured.
    geoip_lookup: GeoIpLookup,
    #[cfg(feature = "processing")]
    cardinality_limiter: Option<CardinalityLimiter>,
    metric_outcomes: MetricOutcomes,
    /// Per-signal processors, see [`Processing`].
    processing: Processing,
}
1162
/// Per-signal processors used by the [`EnvelopeProcessorService`].
///
/// All processors share a single quota rate limiter, see
/// [`EnvelopeProcessorService::new`].
struct Processing {
    logs: LogsProcessor,
    trace_metrics: TraceMetricsProcessor,
    spans: SpansProcessor,
    check_ins: CheckInsProcessor,
    sessions: SessionsProcessor,
    profile_chunks: ProfileChunksProcessor,
    trace_attachments: TraceAttachmentsProcessor,
}
1172
1173impl EnvelopeProcessorService {
    /// Creates a multi-threaded envelope processor.
    ///
    /// Builds the shared [`InnerProcessor`] state: the GeoIP lookup, the Redis-backed
    /// rate and cardinality limiters (processing only), and the per-signal processors
    /// which all share a single quota rate limiter.
    #[cfg_attr(feature = "processing", expect(clippy::too_many_arguments))]
    pub fn new(
        pool: EnvelopeProcessorServicePool,
        config: Arc<Config>,
        global_config: GlobalConfigHandle,
        project_cache: ProjectCacheHandle,
        cogs: Cogs,
        #[cfg(feature = "processing")] redis: Option<RedisClients>,
        addrs: Addrs,
        metric_outcomes: MetricOutcomes,
    ) -> Self {
        // Open the GeoIP database if configured. Failure to open is logged and
        // degrades to an empty lookup instead of aborting startup.
        let geoip_lookup = config
            .geoip_path()
            .and_then(
                |p| match GeoIpLookup::open(p).context(ServiceError::GeoIp) {
                    Ok(geoip) => Some(geoip),
                    Err(err) => {
                        relay_log::error!("failed to open GeoIP db {p:?}: {err:?}");
                        None
                    }
                },
            )
            .unwrap_or_else(GeoIpLookup::empty);

        // Split the Redis clients into the cardinality and quota clients.
        #[cfg(feature = "processing")]
        let (cardinality, quotas) = match redis {
            Some(RedisClients {
                cardinality,
                quotas,
                ..
            }) => (Some(cardinality), Some(quotas)),
            None => (None, None),
        };

        // Redis-backed rate limiter used for consistent quota enforcement.
        #[cfg(feature = "processing")]
        let rate_limiter = quotas.clone().map(|redis| {
            RedisRateLimiter::new(redis)
                .max_limit(config.max_rate_limit())
                .cache(config.quota_cache_ratio(), config.quota_cache_max())
        });

        let quota_limiter = Arc::new(QuotaRateLimiter::new(
            #[cfg(feature = "processing")]
            project_cache.clone(),
            #[cfg(feature = "processing")]
            rate_limiter.clone(),
        ));
        #[cfg(feature = "processing")]
        let rate_limiter = rate_limiter.map(Arc::new);

        let inner = InnerProcessor {
            pool,
            global_config,
            project_cache,
            cogs,
            #[cfg(feature = "processing")]
            quotas_client: quotas.clone(),
            #[cfg(feature = "processing")]
            rate_limiter,
            addrs,
            #[cfg(feature = "processing")]
            cardinality_limiter: cardinality
                .map(|cardinality| {
                    RedisSetLimiter::new(
                        RedisSetLimiterOptions {
                            cache_vacuum_interval: config
                                .cardinality_limiter_cache_vacuum_interval(),
                        },
                        cardinality,
                    )
                })
                .map(CardinalityLimiter::new),
            metric_outcomes,
            // All signal processors share the same quota rate limiter.
            processing: Processing {
                logs: LogsProcessor::new(Arc::clone(&quota_limiter)),
                trace_metrics: TraceMetricsProcessor::new(Arc::clone(&quota_limiter)),
                spans: SpansProcessor::new(Arc::clone(&quota_limiter), geoip_lookup.clone()),
                check_ins: CheckInsProcessor::new(Arc::clone(&quota_limiter)),
                sessions: SessionsProcessor::new(Arc::clone(&quota_limiter)),
                profile_chunks: ProfileChunksProcessor::new(Arc::clone(&quota_limiter)),
                trace_attachments: TraceAttachmentsProcessor::new(quota_limiter),
            },
            geoip_lookup,
            config,
        };

        Self {
            inner: Arc::new(inner),
        }
    }
1265
    /// Enforces rate limits and quotas on the envelope and its extracted metrics.
    ///
    /// First applies cached rate limits; on processing Relays with a Redis rate limiter
    /// configured, additionally enforces all quotas consistently via Redis and merges the
    /// resulting limits back into the project cache.
    ///
    /// Returns the (possibly emptied) event after enforcement.
    async fn enforce_quotas<Group>(
        &self,
        managed_envelope: &mut TypedEnvelope<Group>,
        event: Annotated<Event>,
        extracted_metrics: &mut ProcessingExtractedMetrics,
        ctx: processing::Context<'_>,
    ) -> Result<Annotated<Event>, ProcessingError> {
        // Cached quotas first, they are quick to evaluate and some quotas (indexed) are not
        // applied in the fast path, all cached quotas can be applied here.
        let cached_result = RateLimiter::Cached
            .enforce(managed_envelope, event, extracted_metrics, ctx)
            .await?;

        if_processing!(self.inner.config, {
            // Without a Redis rate limiter, the cached result is final.
            let rate_limiter = match self.inner.rate_limiter.clone() {
                Some(rate_limiter) => rate_limiter,
                None => return Ok(cached_result.event),
            };

            // Enforce all quotas consistently with Redis.
            let consistent_result = RateLimiter::Consistent(rate_limiter)
                .enforce(
                    managed_envelope,
                    cached_result.event,
                    extracted_metrics,
                    ctx
                )
                .await?;

            // Update cached rate limits with the freshly computed ones.
            if !consistent_result.rate_limits.is_empty() {
                self.inner
                    .project_cache
                    .get(managed_envelope.scoping().project_key)
                    .rate_limits()
                    .merge(consistent_result.rate_limits);
            }

            Ok(consistent_result.event)
        } else { Ok(cached_result.event) })
    }
1307
    /// Processes the general errors, and the items which require or create the events.
    ///
    /// Pipeline: expand platform-specific payloads (processing only), extract the event,
    /// validate the DSC, finalize and normalize, apply inbound filters, tag the sampling
    /// decision, enforce quotas, scrub, and serialize the event back into the envelope.
    ///
    /// Returns the metrics extracted during processing.
    async fn process_errors(
        &self,
        managed_envelope: &mut TypedEnvelope<ErrorGroup>,
        project_id: ProjectId,
        mut ctx: processing::Context<'_>,
    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
        let mut event_fully_normalized = EventFullyNormalized::new(managed_envelope.envelope());
        let mut metrics = Metrics::default();
        let mut extracted_metrics = ProcessingExtractedMetrics::new();

        // Events can also contain user reports.
        report::process_user_reports(managed_envelope);

        // Expand platform-specific container formats into regular envelope items.
        if_processing!(self.inner.config, {
            unreal::expand(managed_envelope, &self.inner.config)?;
            #[cfg(sentry)]
            playstation::expand(managed_envelope, &self.inner.config, ctx.project_info)?;
            nnswitch::expand(managed_envelope)?;
        });

        let extraction_result = event::extract(
            managed_envelope,
            &mut metrics,
            event_fully_normalized,
            &self.inner.config,
        )?;
        let mut event = extraction_result.event;

        // Post-process the expanded payloads into the extracted event; each step may
        // update the full-normalization flag.
        if_processing!(self.inner.config, {
            if let Some(inner_event_fully_normalized) =
                unreal::process(managed_envelope, &mut event)?
            {
                event_fully_normalized = inner_event_fully_normalized;
            }
            #[cfg(sentry)]
            if let Some(inner_event_fully_normalized) =
                playstation::process(managed_envelope, &mut event, ctx.project_info)?
            {
                event_fully_normalized = inner_event_fully_normalized;
            }
            if let Some(inner_event_fully_normalized) =
                attachment::create_placeholders(managed_envelope, &mut event, &mut metrics)
            {
                event_fully_normalized = inner_event_fully_normalized;
            }
        });

        ctx.sampling_project_info = dynamic_sampling::validate_and_set_dsc(
            managed_envelope,
            &mut event,
            ctx.project_info,
            ctx.sampling_project_info,
        );

        let attachments = managed_envelope
            .envelope()
            .items()
            .filter(|item| item.attachment_type() == Some(&AttachmentType::Attachment));
        processing::utils::event::finalize(
            managed_envelope.envelope().headers(),
            &mut event,
            attachments,
            &mut metrics,
            ctx.config,
        )?;
        event_fully_normalized = processing::utils::event::normalize(
            managed_envelope.envelope().headers(),
            &mut event,
            event_fully_normalized,
            project_id,
            ctx,
            &self.inner.geoip_lookup,
        )?;
        // Inbound filters: a filtered event rejects the envelope and aborts processing.
        let filter_run =
            processing::utils::event::filter(managed_envelope.envelope().headers(), &event, &ctx)
                .map_err(|err| {
                managed_envelope.reject(Outcome::Filtered(err.clone()));
                ProcessingError::EventFiltered(err)
            })?;

        if self.inner.config.processing_enabled() || matches!(filter_run, FiltersStatus::Ok) {
            dynamic_sampling::tag_error_with_sampling_decision(
                managed_envelope,
                &mut event,
                ctx.sampling_project_info,
                &self.inner.config,
            )
            .await;
        }

        event = self
            .enforce_quotas(managed_envelope, event, &mut extracted_metrics, ctx)
            .await?;

        // Quota enforcement may have dropped the event; only scrub and serialize
        // if it is still present.
        if event.value().is_some() {
            processing::utils::event::scrub(&mut event, ctx.project_info)?;
            event::serialize(
                managed_envelope,
                &mut event,
                event_fully_normalized,
                EventMetricsExtracted(false),
                SpansExtracted(false),
            )?;
            event::emit_feedback_metrics(managed_envelope.envelope());
        }

        let attachments = managed_envelope
            .envelope_mut()
            .items_mut()
            .filter(|i| i.ty() == &ItemType::Attachment);
        processing::utils::attachments::scrub(attachments, ctx.project_info);

        if self.inner.config.processing_enabled() && !event_fully_normalized.0 {
            relay_log::error!(
                tags.project = %project_id,
                tags.ty = event_type(&event).map(|e| e.to_string()).unwrap_or("none".to_owned()),
                "ingested event without normalizing"
            );
        }

        Ok(Some(extracted_metrics))
    }
1431
1432    /// Processes only transactions and transaction-related items.
1433    #[allow(unused_assignments)]
1434    async fn process_transactions(
1435        &self,
1436        managed_envelope: &mut TypedEnvelope<TransactionGroup>,
1437        cogs: &mut Token,
1438        project_id: ProjectId,
1439        mut ctx: processing::Context<'_>,
1440    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
1441        let mut event_fully_normalized = EventFullyNormalized::new(managed_envelope.envelope());
1442        let mut event_metrics_extracted = EventMetricsExtracted(false);
1443        let mut spans_extracted = SpansExtracted(false);
1444        let mut metrics = Metrics::default();
1445        let mut extracted_metrics = ProcessingExtractedMetrics::new();
1446
1447        // We extract the main event from the envelope.
1448        let extraction_result = event::extract(
1449            managed_envelope,
1450            &mut metrics,
1451            event_fully_normalized,
1452            &self.inner.config,
1453        )?;
1454
1455        // If metrics were extracted we mark that.
1456        if let Some(inner_event_metrics_extracted) = extraction_result.event_metrics_extracted {
1457            event_metrics_extracted = inner_event_metrics_extracted;
1458        }
1459        if let Some(inner_spans_extracted) = extraction_result.spans_extracted {
1460            spans_extracted = inner_spans_extracted;
1461        };
1462
1463        // We take the main event out of the result.
1464        let mut event = extraction_result.event;
1465
1466        let profile_id = profile::filter(
1467            managed_envelope,
1468            &event,
1469            ctx.config,
1470            project_id,
1471            ctx.project_info,
1472        );
1473        processing::transactions::profile::transfer_id(&mut event, profile_id);
1474        processing::transactions::profile::remove_context_if_rate_limited(
1475            &mut event,
1476            managed_envelope.scoping(),
1477            ctx,
1478        );
1479
1480        ctx.sampling_project_info = dynamic_sampling::validate_and_set_dsc(
1481            managed_envelope,
1482            &mut event,
1483            ctx.project_info,
1484            ctx.sampling_project_info,
1485        );
1486
1487        let attachments = managed_envelope
1488            .envelope()
1489            .items()
1490            .filter(|item| item.attachment_type() == Some(&AttachmentType::Attachment));
1491        processing::utils::event::finalize(
1492            managed_envelope.envelope().headers(),
1493            &mut event,
1494            attachments,
1495            &mut metrics,
1496            &self.inner.config,
1497        )?;
1498
1499        event_fully_normalized = processing::utils::event::normalize(
1500            managed_envelope.envelope().headers(),
1501            &mut event,
1502            event_fully_normalized,
1503            project_id,
1504            ctx,
1505            &self.inner.geoip_lookup,
1506        )?;
1507
1508        let filter_run =
1509            processing::utils::event::filter(managed_envelope.envelope().headers(), &event, &ctx)
1510                .map_err(|err| {
1511                managed_envelope.reject(Outcome::Filtered(err.clone()));
1512                ProcessingError::EventFiltered(err)
1513            })?;
1514
1515        // Always run dynamic sampling on processing Relays,
1516        // but delay decision until inbound filters have been fully processed.
1517        // Also, we require transaction metrics to be enabled before sampling.
1518        let run_dynamic_sampling = (matches!(filter_run, FiltersStatus::Ok)
1519            || self.inner.config.processing_enabled())
1520            && matches!(&ctx.project_info.config.transaction_metrics, Some(ErrorBoundary::Ok(c)) if c.is_enabled());
1521
1522        let sampling_result = match run_dynamic_sampling {
1523            true => {
1524                #[allow(unused_mut)]
1525                let mut reservoir = ReservoirEvaluator::new(Arc::clone(ctx.reservoir_counters));
1526                #[cfg(feature = "processing")]
1527                if let Some(quotas_client) = self.inner.quotas_client.as_ref() {
1528                    reservoir.set_redis(managed_envelope.scoping().organization_id, quotas_client);
1529                }
1530                processing::utils::dynamic_sampling::run(
1531                    managed_envelope.envelope().headers().dsc(),
1532                    event.value(),
1533                    &ctx,
1534                    Some(&reservoir),
1535                )
1536                .await
1537            }
1538            false => SamplingResult::Pending,
1539        };
1540
1541        relay_statsd::metric!(
1542            counter(RelayCounters::SamplingDecision) += 1,
1543            decision = sampling_result.decision().as_str(),
1544            item = "transaction"
1545        );
1546
1547        #[cfg(feature = "processing")]
1548        let server_sample_rate = sampling_result.sample_rate();
1549
1550        if let Some(outcome) = sampling_result.into_dropped_outcome() {
1551            // Process profiles before dropping the transaction, if necessary.
1552            // Before metric extraction to make sure the profile count is reflected correctly.
1553            profile::process(
1554                managed_envelope,
1555                &mut event,
1556                ctx.global_config,
1557                ctx.config,
1558                ctx.project_info,
1559            );
1560            // Extract metrics here, we're about to drop the event/transaction.
1561            event_metrics_extracted = processing::transactions::extraction::extract_metrics(
1562                &mut event,
1563                &mut extracted_metrics,
1564                ExtractMetricsContext {
1565                    dsc: managed_envelope.envelope().dsc(),
1566                    project_id,
1567                    ctx,
1568                    sampling_decision: SamplingDecision::Drop,
1569                    metrics_extracted: event_metrics_extracted.0,
1570                    spans_extracted: spans_extracted.0,
1571                },
1572            )?;
1573
1574            dynamic_sampling::drop_unsampled_items(
1575                managed_envelope,
1576                event,
1577                outcome,
1578                spans_extracted,
1579            );
1580
1581            // At this point we have:
1582            //  - An empty envelope.
1583            //  - An envelope containing only processed profiles.
1584            // We need to make sure there are enough quotas for these profiles.
1585            event = self
1586                .enforce_quotas(
1587                    managed_envelope,
1588                    Annotated::empty(),
1589                    &mut extracted_metrics,
1590                    ctx,
1591                )
1592                .await?;
1593
1594            return Ok(Some(extracted_metrics));
1595        }
1596
1597        let _post_ds = cogs.start_category("post_ds");
1598
1599        // Need to scrub the transaction before extracting spans.
1600        //
1601        // Unconditionally scrub to make sure PII is removed as early as possible.
1602        processing::utils::event::scrub(&mut event, ctx.project_info)?;
1603
1604        let attachments = managed_envelope
1605            .envelope_mut()
1606            .items_mut()
1607            .filter(|i| i.ty() == &ItemType::Attachment);
1608        processing::utils::attachments::scrub(attachments, ctx.project_info);
1609
1610        if_processing!(self.inner.config, {
1611            // Process profiles before extracting metrics, to make sure they are removed if they are invalid.
1612            let profile_id = profile::process(
1613                managed_envelope,
1614                &mut event,
1615                ctx.global_config,
1616                ctx.config,
1617                ctx.project_info,
1618            );
1619            processing::transactions::profile::transfer_id(&mut event, profile_id);
1620            processing::transactions::profile::scrub_profiler_id(&mut event);
1621
1622            // Always extract metrics in processing Relays for sampled items.
1623            event_metrics_extracted = processing::transactions::extraction::extract_metrics(
1624                &mut event,
1625                &mut extracted_metrics,
1626                ExtractMetricsContext {
1627                    dsc: managed_envelope.envelope().dsc(),
1628                    project_id,
1629                    ctx,
1630                    sampling_decision: SamplingDecision::Keep,
1631                    metrics_extracted: event_metrics_extracted.0,
1632                    spans_extracted: spans_extracted.0,
1633                },
1634            )?;
1635
1636            if let Some(spans) = processing::transactions::spans::extract_from_event(
1637                managed_envelope.envelope().dsc(),
1638                &event,
1639                ctx.global_config,
1640                ctx.config,
1641                server_sample_rate,
1642                event_metrics_extracted,
1643                spans_extracted,
1644            ) {
1645                spans_extracted = SpansExtracted(true);
1646                for item in spans {
1647                    match item {
1648                        Ok(item) => managed_envelope.envelope_mut().add_item(item),
1649                        Err(()) => managed_envelope.track_outcome(
1650                            Outcome::Invalid(DiscardReason::InvalidSpan),
1651                            DataCategory::SpanIndexed,
1652                            1,
1653                        ),
1654                        // TODO: also `DataCategory::Span`?
1655                    }
1656                }
1657            }
1658        });
1659
1660        event = self
1661            .enforce_quotas(managed_envelope, event, &mut extracted_metrics, ctx)
1662            .await?;
1663
1664        // Event may have been dropped because of a quota and the envelope can be empty.
1665        if event.value().is_some() {
1666            event::serialize(
1667                managed_envelope,
1668                &mut event,
1669                event_fully_normalized,
1670                event_metrics_extracted,
1671                spans_extracted,
1672            )?;
1673        }
1674
1675        if self.inner.config.processing_enabled() && !event_fully_normalized.0 {
1676            relay_log::error!(
1677                tags.project = %project_id,
1678                tags.ty = event_type(&event).map(|e| e.to_string()).unwrap_or("none".to_owned()),
1679                "ingested event without normalizing"
1680            );
1681        };
1682
1683        Ok(Some(extracted_metrics))
1684    }
1685
1686    /// Processes standalone items that require an event ID, but do not have an event on the same envelope.
1687    async fn process_standalone(
1688        &self,
1689        managed_envelope: &mut TypedEnvelope<StandaloneGroup>,
1690        project_id: ProjectId,
1691        ctx: processing::Context<'_>,
1692    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
1693        let mut extracted_metrics = ProcessingExtractedMetrics::new();
1694
1695        standalone::process(managed_envelope);
1696
1697        profile::filter(
1698            managed_envelope,
1699            &Annotated::empty(),
1700            ctx.config,
1701            project_id,
1702            ctx.project_info,
1703        );
1704
1705        self.enforce_quotas(
1706            managed_envelope,
1707            Annotated::empty(),
1708            &mut extracted_metrics,
1709            ctx,
1710        )
1711        .await?;
1712
1713        report::process_user_reports(managed_envelope);
1714        let attachments = managed_envelope
1715            .envelope_mut()
1716            .items_mut()
1717            .filter(|i| i.ty() == &ItemType::Attachment);
1718        processing::utils::attachments::scrub(attachments, ctx.project_info);
1719
1720        Ok(Some(extracted_metrics))
1721    }
1722
1723    /// Processes user and client reports.
1724    async fn process_client_reports(
1725        &self,
1726        managed_envelope: &mut TypedEnvelope<ClientReportGroup>,
1727        ctx: processing::Context<'_>,
1728    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
1729        let mut extracted_metrics = ProcessingExtractedMetrics::new();
1730
1731        self.enforce_quotas(
1732            managed_envelope,
1733            Annotated::empty(),
1734            &mut extracted_metrics,
1735            ctx,
1736        )
1737        .await?;
1738
1739        report::process_client_reports(
1740            managed_envelope,
1741            ctx.config,
1742            ctx.project_info,
1743            self.inner.addrs.outcome_aggregator.clone(),
1744        );
1745
1746        Ok(Some(extracted_metrics))
1747    }
1748
1749    /// Processes replays.
1750    async fn process_replays(
1751        &self,
1752        managed_envelope: &mut TypedEnvelope<ReplayGroup>,
1753        ctx: processing::Context<'_>,
1754    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
1755        let mut extracted_metrics = ProcessingExtractedMetrics::new();
1756
1757        replay::process(
1758            managed_envelope,
1759            ctx.global_config,
1760            ctx.config,
1761            ctx.project_info,
1762            &self.inner.geoip_lookup,
1763        )?;
1764
1765        self.enforce_quotas(
1766            managed_envelope,
1767            Annotated::empty(),
1768            &mut extracted_metrics,
1769            ctx,
1770        )
1771        .await?;
1772
1773        Ok(Some(extracted_metrics))
1774    }
1775
1776    async fn process_nel(
1777        &self,
1778        mut managed_envelope: ManagedEnvelope,
1779        ctx: processing::Context<'_>,
1780    ) -> Result<ProcessingResult, ProcessingError> {
1781        nel::convert_to_logs(&mut managed_envelope);
1782        self.process_with_processor(&self.inner.processing.logs, managed_envelope, ctx)
1783            .await
1784    }
1785
1786    async fn process_with_processor<P: processing::Processor>(
1787        &self,
1788        processor: &P,
1789        mut managed_envelope: ManagedEnvelope,
1790        ctx: processing::Context<'_>,
1791    ) -> Result<ProcessingResult, ProcessingError>
1792    where
1793        Outputs: From<P::Output>,
1794    {
1795        let Some(work) = processor.prepare_envelope(&mut managed_envelope) else {
1796            debug_assert!(
1797                false,
1798                "there must be work for the {} processor",
1799                std::any::type_name::<P>(),
1800            );
1801            return Err(ProcessingError::ProcessingGroupMismatch);
1802        };
1803
1804        managed_envelope.update();
1805        match managed_envelope.envelope().is_empty() {
1806            true => managed_envelope.accept(),
1807            false => managed_envelope.reject(Outcome::Invalid(DiscardReason::Internal)),
1808        }
1809
1810        processor
1811            .process(work, ctx)
1812            .await
1813            .map_err(|err| {
1814                relay_log::debug!(
1815                    error = &err as &dyn std::error::Error,
1816                    "processing pipeline failed"
1817                );
1818                ProcessingError::ProcessingFailure
1819            })
1820            .map(|o| o.map(Into::into))
1821            .map(ProcessingResult::Output)
1822    }
1823
1824    /// Processes standalone spans.
1825    ///
1826    /// This function does *not* run for spans extracted from transactions.
1827    async fn process_standalone_spans(
1828        &self,
1829        managed_envelope: &mut TypedEnvelope<SpanGroup>,
1830        _project_id: ProjectId,
1831        ctx: processing::Context<'_>,
1832    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
1833        let mut extracted_metrics = ProcessingExtractedMetrics::new();
1834
1835        span::filter(managed_envelope, ctx.config, ctx.project_info);
1836        span::convert_otel_traces_data(managed_envelope);
1837
1838        if_processing!(self.inner.config, {
1839            span::process(
1840                managed_envelope,
1841                &mut Annotated::empty(),
1842                &mut extracted_metrics,
1843                _project_id,
1844                ctx,
1845                &self.inner.geoip_lookup,
1846            )
1847            .await;
1848        });
1849
1850        self.enforce_quotas(
1851            managed_envelope,
1852            Annotated::empty(),
1853            &mut extracted_metrics,
1854            ctx,
1855        )
1856        .await?;
1857
1858        Ok(Some(extracted_metrics))
1859    }
1860
    /// Dispatches a grouped envelope to the processing routine for its [`ProcessingGroup`].
    ///
    /// Before dispatch, the envelope headers are normalized: the DSC transaction name
    /// is parametrized, the project's retention settings are applied, and the envelope's
    /// project ID is overridden with the one resolved from the project cache.
    async fn process_envelope(
        &self,
        cogs: &mut Token,
        project_id: ProjectId,
        message: ProcessEnvelopeGrouped<'_>,
    ) -> Result<ProcessingResult, ProcessingError> {
        let ProcessEnvelopeGrouped {
            group,
            envelope: mut managed_envelope,
            ctx,
        } = message;

        // Pre-process the envelope headers.
        if let Some(sampling_state) = ctx.sampling_project_info {
            // Both transactions and standalone span envelopes need a normalized DSC header
            // to make sampling rules based on the segment/transaction name work correctly.
            managed_envelope
                .envelope_mut()
                .parametrize_dsc_transaction(&sampling_state.config.tx_name_rules);
        }

        // Set the event retention. Effectively, this value will only be available in processing
        // mode when the full project config is queried from the upstream.
        if let Some(retention) = ctx.project_info.config.event_retention {
            managed_envelope.envelope_mut().set_retention(retention);
        }

        // Same as above, but for the retention of downsampled data, which can be configured
        // independently of the regular event retention.
        if let Some(retention) = ctx.project_info.config.downsampled_event_retention {
            managed_envelope
                .envelope_mut()
                .set_downsampled_retention(retention);
        }

        // Ensure the project ID is updated to the stored instance for this project cache. This can
        // differ in two cases:
        //  1. The envelope was sent to the legacy `/store/` endpoint without a project ID.
        //  2. The DSN was moved and the envelope sent to the old project ID.
        managed_envelope
            .envelope_mut()
            .meta_mut()
            .set_project_id(project_id);

        // Glue for the legacy per-group processing functions: converts the envelope into the
        // group's typed envelope, invokes `self.$fn_name` with the given extra arguments, and on
        // failure rejects the envelope with the error's outcome (if any) before propagating it.
        macro_rules! run {
            ($fn_name:ident $(, $args:expr)*) => {
                async {
                    let mut managed_envelope = (managed_envelope, group).try_into()?;
                    match self.$fn_name(&mut managed_envelope, $($args),*).await {
                        Ok(extracted_metrics) => Ok(ProcessingResult::Envelope {
                            managed_envelope: managed_envelope.into_processed(),
                            extracted_metrics: extracted_metrics.map_or(ProcessingExtractedMetrics::new(), |e| e)
                        }),
                        Err(error) => {
                            relay_log::trace!("Executing {fn} failed: {error}", fn = stringify!($fn_name), error = error);
                            if let Some(outcome) = error.to_outcome() {
                                managed_envelope.reject(outcome);
                            }

                            return Err(error);
                        }
                    }
                }.await
            };
        }

        relay_log::trace!("Processing {group} group", group = group.variant());

        match group {
            ProcessingGroup::Error => run!(process_errors, project_id, ctx),
            ProcessingGroup::Transaction => {
                run!(process_transactions, cogs, project_id, ctx)
            }
            ProcessingGroup::Session => {
                self.process_with_processor(&self.inner.processing.sessions, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::Standalone => run!(process_standalone, project_id, ctx),
            ProcessingGroup::ClientReport => run!(process_client_reports, ctx),
            ProcessingGroup::Replay => {
                run!(process_replays, ctx)
            }
            ProcessingGroup::CheckIn => {
                self.process_with_processor(&self.inner.processing.check_ins, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::Nel => self.process_nel(managed_envelope, ctx).await,
            ProcessingGroup::Log => {
                self.process_with_processor(&self.inner.processing.logs, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::TraceMetric => {
                self.process_with_processor(
                    &self.inner.processing.trace_metrics,
                    managed_envelope,
                    ctx,
                )
                .await
            }
            ProcessingGroup::SpanV2 => {
                self.process_with_processor(&self.inner.processing.spans, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::TraceAttachment => {
                self.process_with_processor(
                    &self.inner.processing.trace_attachments,
                    managed_envelope,
                    ctx,
                )
                .await
            }
            ProcessingGroup::Span => run!(process_standalone_spans, project_id, ctx),
            ProcessingGroup::ProfileChunk => {
                self.process_with_processor(
                    &self.inner.processing.profile_chunks,
                    managed_envelope,
                    ctx,
                )
                .await
            }
            // Currently is not used.
            ProcessingGroup::Metrics => {
                // In proxy mode we simply forward the metrics.
                // This group shouldn't be used outside of proxy mode.
                if self.inner.config.relay_mode() != RelayMode::Proxy {
                    relay_log::error!(
                        tags.project = %project_id,
                        items = ?managed_envelope.envelope().items().next().map(Item::ty),
                        "received metrics in the process_state"
                    );
                }

                Ok(ProcessingResult::no_metrics(
                    managed_envelope.into_processed(),
                ))
            }
            // Fallback to the legacy process_state implementation for Ungrouped events.
            ProcessingGroup::Ungrouped => {
                relay_log::error!(
                    tags.project = %project_id,
                    items = ?managed_envelope.envelope().items().next().map(Item::ty),
                    "could not identify the processing group based on the envelope's items"
                );

                Ok(ProcessingResult::no_metrics(
                    managed_envelope.into_processed(),
                ))
            }
            // Leave this group unchanged.
            //
            // This will later be forwarded to upstream.
            ProcessingGroup::ForwardUnknown => Ok(ProcessingResult::no_metrics(
                managed_envelope.into_processed(),
            )),
        }
    }
2017
2018    /// Processes the envelope and returns the processed envelope back.
2019    ///
2020    /// Returns `Some` if the envelope passed inbound filtering and rate limiting. Invalid items are
2021    /// removed from the envelope. Otherwise, if the envelope is empty or the entire envelope needs
2022    /// to be dropped, this is `None`.
2023    async fn process<'a>(
2024        &self,
2025        cogs: &mut Token,
2026        mut message: ProcessEnvelopeGrouped<'a>,
2027    ) -> Result<Option<Submit<'a>>, ProcessingError> {
2028        let ProcessEnvelopeGrouped {
2029            ref mut envelope,
2030            ctx,
2031            ..
2032        } = message;
2033
2034        // Prefer the project's project ID, and fall back to the stated project id from the
2035        // envelope. The project ID is available in all modes, other than in proxy mode, where
2036        // envelopes for unknown projects are forwarded blindly.
2037        //
2038        // Neither ID can be available in proxy mode on the /store/ endpoint. This is not supported,
2039        // since we cannot process an envelope without project ID, so drop it.
2040        let Some(project_id) = ctx
2041            .project_info
2042            .project_id
2043            .or_else(|| envelope.envelope().meta().project_id())
2044        else {
2045            envelope.reject(Outcome::Invalid(DiscardReason::Internal));
2046            return Err(ProcessingError::MissingProjectId);
2047        };
2048
2049        let client = envelope.envelope().meta().client().map(str::to_owned);
2050        let user_agent = envelope.envelope().meta().user_agent().map(str::to_owned);
2051        let project_key = envelope.envelope().meta().public_key();
2052        // Only allow sending to the sampling key, if we successfully loaded a sampling project
2053        // info relating to it. This filters out unknown/invalid project keys as well as project
2054        // keys from different organizations.
2055        let sampling_key = envelope
2056            .envelope()
2057            .sampling_key()
2058            .filter(|_| ctx.sampling_project_info.is_some());
2059
2060        // We set additional information on the scope, which will be removed after processing the
2061        // envelope.
2062        relay_log::configure_scope(|scope| {
2063            scope.set_tag("project", project_id);
2064            if let Some(client) = client {
2065                scope.set_tag("sdk", client);
2066            }
2067            if let Some(user_agent) = user_agent {
2068                scope.set_extra("user_agent", user_agent.into());
2069            }
2070        });
2071
2072        let result = match self.process_envelope(cogs, project_id, message).await {
2073            Ok(ProcessingResult::Envelope {
2074                mut managed_envelope,
2075                extracted_metrics,
2076            }) => {
2077                // The envelope could be modified or even emptied during processing, which
2078                // requires re-computation of the context.
2079                managed_envelope.update();
2080
2081                let has_metrics = !extracted_metrics.metrics.project_metrics.is_empty();
2082                send_metrics(
2083                    extracted_metrics.metrics,
2084                    project_key,
2085                    sampling_key,
2086                    &self.inner.addrs.aggregator,
2087                );
2088
2089                let envelope_response = if managed_envelope.envelope().is_empty() {
2090                    if !has_metrics {
2091                        // Individual rate limits have already been issued
2092                        managed_envelope.reject(Outcome::RateLimited(None));
2093                    } else {
2094                        managed_envelope.accept();
2095                    }
2096
2097                    None
2098                } else {
2099                    Some(managed_envelope)
2100                };
2101
2102                Ok(envelope_response.map(Submit::Envelope))
2103            }
2104            Ok(ProcessingResult::Output(Output { main, metrics })) => {
2105                if let Some(metrics) = metrics {
2106                    metrics.accept(|metrics| {
2107                        send_metrics(
2108                            metrics,
2109                            project_key,
2110                            sampling_key,
2111                            &self.inner.addrs.aggregator,
2112                        );
2113                    });
2114                }
2115
2116                let ctx = ctx.to_forward();
2117                Ok(main.map(|output| Submit::Output { output, ctx }))
2118            }
2119            Err(err) => Err(err),
2120        };
2121
2122        relay_log::configure_scope(|scope| {
2123            scope.remove_tag("project");
2124            scope.remove_tag("sdk");
2125            scope.remove_tag("user_agent");
2126        });
2127
2128        result
2129    }
2130
    /// Handles a [`ProcessEnvelope`] message.
    ///
    /// The envelope is split into per-[`ProcessingGroup`] sub-envelopes, and each
    /// sub-envelope is processed independently with its own COGS measurement.
    /// Successfully processed results are submitted upstream; errors are logged,
    /// at error level only when the error is unexpected.
    async fn handle_process_envelope(&self, cogs: &mut Token, message: ProcessEnvelope) {
        let project_key = message.envelope.envelope().meta().public_key();
        let wait_time = message.envelope.age();
        metric!(timer(RelayTimers::EnvelopeWaitTime) = wait_time);

        // This COGS handling may need an overhaul in the future:
        // Cancel the passed in token, to start individual measurements per envelope instead.
        cogs.cancel();

        // Capture the scoping before the envelope is consumed by the split below, so it can
        // be re-applied to every sub-envelope.
        let scoping = message.envelope.scoping();
        for (group, envelope) in ProcessingGroup::split_envelope(
            *message.envelope.into_envelope(),
            &message.project_info,
        ) {
            // One timed COGS token per processing group.
            let mut cogs = self
                .inner
                .cogs
                .timed(ResourceId::Relay, AppFeature::from(group));

            let mut envelope =
                ManagedEnvelope::new(envelope, self.inner.addrs.outcome_aggregator.clone());
            envelope.scope(scoping);

            let global_config = self.inner.global_config.current();

            let ctx = processing::Context {
                config: &self.inner.config,
                global_config: &global_config,
                project_info: &message.project_info,
                sampling_project_info: message.sampling_project_info.as_deref(),
                rate_limits: &message.rate_limits,
                reservoir_counters: &message.reservoir_counters,
            };

            let message = ProcessEnvelopeGrouped {
                group,
                envelope,
                ctx,
            };

            let result = metric!(
                timer(RelayTimers::EnvelopeProcessingTime),
                group = group.variant(),
                { self.process(&mut cogs, message).await }
            );

            match result {
                Ok(Some(envelope)) => self.submit_upstream(&mut cogs, envelope),
                Ok(None) => {}
                // Unexpected errors indicate a bug or infrastructure problem; log loudly.
                Err(error) if error.is_unexpected() => {
                    relay_log::error!(
                        tags.project_key = %project_key,
                        error = &error as &dyn Error,
                        "error processing envelope"
                    )
                }
                // Expected errors (e.g. filtered or rate limited) are only logged at debug level.
                Err(error) => {
                    relay_log::debug!(
                        tags.project_key = %project_key,
                        error = &error as &dyn Error,
                        "error processing envelope"
                    )
                }
            }
        }
    }
2197
2198    fn handle_process_metrics(&self, cogs: &mut Token, message: ProcessMetrics) {
2199        let ProcessMetrics {
2200            data,
2201            project_key,
2202            received_at,
2203            sent_at,
2204            source,
2205        } = message;
2206
2207        let received_timestamp =
2208            UnixTimestamp::from_datetime(received_at).unwrap_or(UnixTimestamp::now());
2209
2210        let mut buckets = data.into_buckets(received_timestamp);
2211        if buckets.is_empty() {
2212            return;
2213        };
2214        cogs.update(relay_metrics::cogs::BySize(&buckets));
2215
2216        let clock_drift_processor =
2217            ClockDriftProcessor::new(sent_at, received_at).at_least(MINIMUM_CLOCK_DRIFT);
2218
2219        buckets.retain_mut(|bucket| {
2220            if let Err(error) = relay_metrics::normalize_bucket(bucket) {
2221                relay_log::debug!(error = &error as &dyn Error, "dropping bucket {bucket:?}");
2222                return false;
2223            }
2224
2225            if !self::metrics::is_valid_namespace(bucket) {
2226                return false;
2227            }
2228
2229            clock_drift_processor.process_timestamp(&mut bucket.timestamp);
2230
2231            if !matches!(source, BucketSource::Internal) {
2232                bucket.metadata = BucketMetadata::new(received_timestamp);
2233            }
2234
2235            true
2236        });
2237
2238        let project = self.inner.project_cache.get(project_key);
2239
2240        // Best effort check to filter and rate limit buckets, if there is no project state
2241        // available at the current time, we will check again after flushing.
2242        let buckets = match project.state() {
2243            ProjectState::Enabled(project_info) => {
2244                let rate_limits = project.rate_limits().current_limits();
2245                self.check_buckets(project_key, project_info, &rate_limits, buckets)
2246            }
2247            _ => buckets,
2248        };
2249
2250        relay_log::trace!("merging metric buckets into the aggregator");
2251        self.inner
2252            .addrs
2253            .aggregator
2254            .send(MergeBuckets::new(project_key, buckets));
2255    }
2256
2257    fn handle_process_batched_metrics(&self, cogs: &mut Token, message: ProcessBatchedMetrics) {
2258        let ProcessBatchedMetrics {
2259            payload,
2260            source,
2261            received_at,
2262            sent_at,
2263        } = message;
2264
2265        #[derive(serde::Deserialize)]
2266        struct Wrapper {
2267            buckets: HashMap<ProjectKey, Vec<Bucket>>,
2268        }
2269
2270        let buckets = match serde_json::from_slice(&payload) {
2271            Ok(Wrapper { buckets }) => buckets,
2272            Err(error) => {
2273                relay_log::debug!(
2274                    error = &error as &dyn Error,
2275                    "failed to parse batched metrics",
2276                );
2277                metric!(counter(RelayCounters::MetricBucketsParsingFailed) += 1);
2278                return;
2279            }
2280        };
2281
2282        for (project_key, buckets) in buckets {
2283            self.handle_process_metrics(
2284                cogs,
2285                ProcessMetrics {
2286                    data: MetricData::Parsed(buckets),
2287                    project_key,
2288                    source,
2289                    received_at,
2290                    sent_at,
2291                },
2292            )
2293        }
2294    }
2295
    /// Submits a processed envelope or processor output to its destination.
    ///
    /// In processing Relays with a configured store forwarder, data is sent to the
    /// store (with attachments optionally routed through the upload service first,
    /// based on a sampled global option). In all other cases, the data is serialized
    /// into an envelope and forwarded to the upstream Relay over HTTP, using the
    /// configured HTTP encoding.
    fn submit_upstream(&self, cogs: &mut Token, submit: Submit<'_>) {
        let _submit = cogs.start_category("submit");

        #[cfg(feature = "processing")]
        if self.inner.config.processing_enabled()
            && let Some(store_forwarder) = &self.inner.addrs.store_forwarder
        {
            use crate::processing::StoreHandle;

            let upload = self.inner.addrs.upload.as_ref();
            match submit {
                Submit::Envelope(envelope) => {
                    let envelope_has_attachments = envelope
                        .envelope()
                        .items()
                        .any(|item| *item.ty() == ItemType::Attachment);
                    // Whether Relay will store this attachment in objectstore or use kafka like before.
                    let use_objectstore = || {
                        let options = &self.inner.global_config.current().options;
                        utils::sample(options.objectstore_attachments_sample_rate).is_keep()
                    };

                    if let Some(upload) = &self.inner.addrs.upload
                        && envelope_has_attachments
                        && use_objectstore()
                    {
                        // the `UploadService` will upload all attachments, and then forward the envelope to the `StoreService`.
                        upload.send(StoreEnvelope { envelope })
                    } else {
                        store_forwarder.send(StoreEnvelope { envelope })
                    }
                }
                Submit::Output { output, ctx } => output
                    .forward_store(StoreHandle::new(store_forwarder, upload), ctx)
                    .unwrap_or_else(|err| err.into_inner()),
            }
            return;
        }

        // Non-store path: everything is forwarded upstream as an envelope, so processor
        // output must be serialized back into envelope form first.
        let mut envelope = match submit {
            Submit::Envelope(envelope) => envelope,
            Submit::Output { output, ctx } => match output.serialize_envelope(ctx) {
                Ok(envelope) => ManagedEnvelope::from(envelope).into_processed(),
                Err(_) => {
                    relay_log::error!("failed to serialize output to an envelope");
                    return;
                }
            },
        };

        // Nothing left to forward; accept the envelope so no outcomes are emitted.
        if envelope.envelope_mut().is_empty() {
            envelope.accept();
            return;
        }

        // Override the `sent_at` timestamp. Since the envelope went through basic
        // normalization, all timestamps have been corrected. We propagate the new
        // `sent_at` to allow the next Relay to double-check this timestamp and
        // potentially apply correction again. This is done as close to sending as
        // possible so that we avoid internal delays.
        envelope.envelope_mut().set_sent_at(Utc::now());

        relay_log::trace!("sending envelope to sentry endpoint");
        let http_encoding = self.inner.config.http_encoding();
        let result = envelope.envelope().to_vec().and_then(|v| {
            encode_payload(&v.into(), http_encoding).map_err(EnvelopeError::PayloadIoFailed)
        });

        match result {
            Ok(body) => {
                self.inner
                    .addrs
                    .upstream_relay
                    .send(SendRequest(SendEnvelope {
                        envelope,
                        body,
                        http_encoding,
                        project_cache: self.inner.project_cache.clone(),
                    }));
            }
            Err(error) => {
                // Errors are only logged for what we consider an internal discard reason. These
                // indicate errors in the infrastructure or implementation bugs.
                relay_log::error!(
                    error = &error as &dyn Error,
                    tags.project_key = %envelope.scoping().project_key,
                    "failed to serialize envelope payload"
                );

                envelope.reject(Outcome::Invalid(DiscardReason::Internal));
            }
        }
    }
2389
2390    fn handle_submit_client_reports(&self, cogs: &mut Token, message: SubmitClientReports) {
2391        let SubmitClientReports {
2392            client_reports,
2393            scoping,
2394        } = message;
2395
2396        let upstream = self.inner.config.upstream_descriptor();
2397        let dsn = PartialDsn::outbound(&scoping, upstream);
2398
2399        let mut envelope = Envelope::from_request(None, RequestMeta::outbound(dsn));
2400        for client_report in client_reports {
2401            match client_report.serialize() {
2402                Ok(payload) => {
2403                    let mut item = Item::new(ItemType::ClientReport);
2404                    item.set_payload(ContentType::Json, payload);
2405                    envelope.add_item(item);
2406                }
2407                Err(error) => {
2408                    relay_log::error!(
2409                        error = &error as &dyn std::error::Error,
2410                        "failed to serialize client report"
2411                    );
2412                }
2413            }
2414        }
2415
2416        let envelope = ManagedEnvelope::new(envelope, self.inner.addrs.outcome_aggregator.clone());
2417        self.submit_upstream(cogs, Submit::Envelope(envelope.into_processed()));
2418    }
2419
    /// Applies project state and cached rate limits to a batch of metric buckets.
    ///
    /// Buckets are dropped entirely when the project has no valid scoping (outcomes could
    /// not be attributed otherwise). Surviving buckets then pass through project-info
    /// filtering, per-namespace cached rate limits, and finally quota enforcement via
    /// [`MetricsLimiter`]. Returns the buckets that passed all checks.
    fn check_buckets(
        &self,
        project_key: ProjectKey,
        project_info: &ProjectInfo,
        rate_limits: &RateLimits,
        buckets: Vec<Bucket>,
    ) -> Vec<Bucket> {
        let Some(scoping) = project_info.scoping(project_key) else {
            relay_log::error!(
                tags.project_key = project_key.as_str(),
                "there is no scoping: dropping {} buckets",
                buckets.len(),
            );
            return Vec::new();
        };

        let mut buckets = self::metrics::apply_project_info(
            buckets,
            &self.inner.metric_outcomes,
            project_info,
            scoping,
        );

        // Distinct namespaces present in the remaining buckets.
        let namespaces: BTreeSet<MetricNamespace> = buckets
            .iter()
            .filter_map(|bucket| bucket.name.try_namespace())
            .collect();

        for namespace in namespaces {
            // NOTE(review): this check does not vary with `namespace` — it always uses the
            // project-wide `MetricBucket` item scoping, unlike `rate_limit_buckets` which
            // scopes per namespace via `scoping.metric_bucket(namespace)`. Confirm intentional.
            let limits = rate_limits.check_with_quotas(
                project_info.get_quotas(),
                scoping.item(DataCategory::MetricBucket),
            );

            if limits.is_limited() {
                // Split off and reject all buckets of the limited namespace, tracking outcomes.
                let rejected;
                (buckets, rejected) = utils::split_off(buckets, |bucket| {
                    bucket.name.try_namespace() == Some(namespace)
                });

                let reason_code = limits.longest().and_then(|limit| limit.reason_code.clone());
                self.inner.metric_outcomes.track(
                    scoping,
                    &rejected,
                    Outcome::RateLimited(reason_code),
                );
            }
        }

        // Enforce quota-based limits; on creation failure the limiter hands the buckets back.
        let quotas = project_info.config.quotas.clone();
        match MetricsLimiter::create(buckets, quotas, scoping) {
            Ok(mut bucket_limiter) => {
                bucket_limiter.enforce_limits(rate_limits, &self.inner.metric_outcomes);
                bucket_limiter.into_buckets()
            }
            Err(buckets) => buckets,
        }
    }
2478
    /// Applies Redis-backed (consistent) rate limits to metric buckets, per namespace.
    ///
    /// For every namespace present in `buckets`, the combined global and project quotas are
    /// checked against the namespace's bucket count. Limited namespaces are removed from the
    /// result, their outcomes tracked, and the new limits merged into the project cache.
    /// Remaining buckets are additionally run through transaction/span limiting in
    /// [`Self::apply_other_rate_limits`].
    #[cfg(feature = "processing")]
    async fn rate_limit_buckets(
        &self,
        scoping: Scoping,
        project_info: &ProjectInfo,
        mut buckets: Vec<Bucket>,
    ) -> Vec<Bucket> {
        // Without a Redis rate limiter there is nothing to enforce consistently here.
        let Some(rate_limiter) = &self.inner.rate_limiter else {
            return buckets;
        };

        let global_config = self.inner.global_config.current();
        // Number of buckets per namespace; used as the rate-limiting quantity below.
        let namespaces = buckets
            .iter()
            .filter_map(|bucket| bucket.name.try_namespace())
            .counts();

        let quotas = CombinedQuotas::new(&global_config, project_info.get_quotas());

        for (namespace, quantity) in namespaces {
            let item_scoping = scoping.metric_bucket(namespace);

            let limits = match rate_limiter
                .is_rate_limited(quotas, item_scoping, quantity, false)
                .await
            {
                Ok(limits) => limits,
                Err(err) => {
                    // On Redis errors, stop limiting instead of risking data drops.
                    relay_log::error!(
                        error = &err as &dyn std::error::Error,
                        "failed to check redis rate limits"
                    );
                    break;
                }
            };

            if limits.is_limited() {
                // Remove and reject all buckets of the limited namespace.
                let rejected;
                (buckets, rejected) = utils::split_off(buckets, |bucket| {
                    bucket.name.try_namespace() == Some(namespace)
                });

                let reason_code = limits.longest().and_then(|limit| limit.reason_code.clone());
                self.inner.metric_outcomes.track(
                    scoping,
                    &rejected,
                    Outcome::RateLimited(reason_code),
                );

                // Make the new limits visible to subsequent traffic for this project.
                self.inner
                    .project_cache
                    .get(item_scoping.scoping.project_key)
                    .rate_limits()
                    .merge(limits);
            }
        }

        match MetricsLimiter::create(buckets, project_info.config.quotas.clone(), scoping) {
            Err(buckets) => buckets,
            Ok(bucket_limiter) => self.apply_other_rate_limits(bucket_limiter).await,
        }
    }
2541
2542    /// Check and apply rate limits to metrics buckets for transactions and spans.
2543    #[cfg(feature = "processing")]
2544    async fn apply_other_rate_limits(&self, mut bucket_limiter: MetricsLimiter) -> Vec<Bucket> {
2545        relay_log::trace!("handle_rate_limit_buckets");
2546
2547        let scoping = *bucket_limiter.scoping();
2548
2549        if let Some(rate_limiter) = self.inner.rate_limiter.as_ref() {
2550            let global_config = self.inner.global_config.current();
2551            let quotas = CombinedQuotas::new(&global_config, bucket_limiter.quotas());
2552
2553            // We set over_accept_once such that the limit is actually reached, which allows subsequent
2554            // calls with quantity=0 to be rate limited.
2555            let over_accept_once = true;
2556            let mut rate_limits = RateLimits::new();
2557
2558            for category in [DataCategory::Transaction, DataCategory::Span] {
2559                let count = bucket_limiter.count(category);
2560
2561                let timer = Instant::now();
2562                let mut is_limited = false;
2563
2564                if let Some(count) = count {
2565                    match rate_limiter
2566                        .is_rate_limited(quotas, scoping.item(category), count, over_accept_once)
2567                        .await
2568                    {
2569                        Ok(limits) => {
2570                            is_limited = limits.is_limited();
2571                            rate_limits.merge(limits)
2572                        }
2573                        Err(e) => relay_log::error!(error = &e as &dyn Error),
2574                    }
2575                }
2576
2577                relay_statsd::metric!(
2578                    timer(RelayTimers::RateLimitBucketsDuration) = timer.elapsed(),
2579                    category = category.name(),
2580                    limited = if is_limited { "true" } else { "false" },
2581                    count = match count {
2582                        None => "none",
2583                        Some(0) => "0",
2584                        Some(1) => "1",
2585                        Some(1..=10) => "10",
2586                        Some(1..=25) => "25",
2587                        Some(1..=50) => "50",
2588                        Some(51..=100) => "100",
2589                        Some(101..=500) => "500",
2590                        _ => "> 500",
2591                    },
2592                );
2593            }
2594
2595            if rate_limits.is_limited() {
2596                let was_enforced =
2597                    bucket_limiter.enforce_limits(&rate_limits, &self.inner.metric_outcomes);
2598
2599                if was_enforced {
2600                    // Update the rate limits in the project cache.
2601                    self.inner
2602                        .project_cache
2603                        .get(scoping.project_key)
2604                        .rate_limits()
2605                        .merge(rate_limits);
2606                }
2607            }
2608        }
2609
2610        bucket_limiter.into_buckets()
2611    }
2612
    /// Cardinality limits the passed buckets and returns a filtered vector of only accepted buckets.
    ///
    /// Behavior depends on the global `cardinality_limiter_mode`:
    /// - `Disabled`: all buckets pass through untouched.
    /// - `Passive`: limits are computed and reported, but not enforced.
    /// - otherwise: rejected buckets are dropped with `CardinalityLimited` outcomes.
    #[cfg(feature = "processing")]
    async fn cardinality_limit_buckets(
        &self,
        scoping: Scoping,
        limits: &[CardinalityLimit],
        buckets: Vec<Bucket>,
    ) -> Vec<Bucket> {
        let global_config = self.inner.global_config.current();
        let cardinality_limiter_mode = global_config.options.cardinality_limiter_mode;

        if matches!(cardinality_limiter_mode, CardinalityLimiterMode::Disabled) {
            return buckets;
        }

        // Without a configured limiter, buckets pass through unchanged.
        let Some(ref limiter) = self.inner.cardinality_limiter else {
            return buckets;
        };

        let scope = relay_cardinality::Scoping {
            organization_id: scoping.organization_id,
            project_id: scoping.project_id,
        };

        let limits = match limiter
            .check_cardinality_limits(scope, limits, buckets)
            .await
        {
            Ok(limits) => limits,
            Err((buckets, error)) => {
                // On limiter errors, pass all buckets through rather than dropping data.
                relay_log::error!(
                    error = &error as &dyn std::error::Error,
                    "cardinality limiter failed"
                );
                return buckets;
            }
        };

        // Report exceeded limits as internal errors, sampled by the configured rate.
        let error_sample_rate = global_config.options.cardinality_limiter_error_sample_rate;
        if !limits.exceeded_limits().is_empty() && utils::sample(error_sample_rate).is_keep() {
            for limit in limits.exceeded_limits() {
                relay_log::with_scope(
                    |scope| {
                        // Set the organization as user so we can alert on distinct org_ids.
                        scope.set_user(Some(relay_log::sentry::User {
                            id: Some(scoping.organization_id.to_string()),
                            ..Default::default()
                        }));
                    },
                    || {
                        relay_log::error!(
                            tags.organization_id = scoping.organization_id.value(),
                            tags.limit_id = limit.id,
                            tags.passive = limit.passive,
                            "Cardinality Limit"
                        );
                    },
                );
            }
        }

        // Emit cardinality reports regardless of enforcement mode.
        for (limit, reports) in limits.cardinality_reports() {
            for report in reports {
                self.inner
                    .metric_outcomes
                    .cardinality(scoping, limit, report);
            }
        }

        // Passive mode only observes; return the original buckets.
        if matches!(cardinality_limiter_mode, CardinalityLimiterMode::Passive) {
            return limits.into_source();
        }

        let CardinalityLimitsSplit { accepted, rejected } = limits.into_split();

        // Track an outcome for every rejected bucket, tagged with the exceeded limit id.
        for (bucket, exceeded) in rejected {
            self.inner.metric_outcomes.track(
                scoping,
                &[bucket],
                Outcome::CardinalityLimited(exceeded.id.clone()),
            );
        }
        accepted
    }
2697
2698    /// Processes metric buckets and sends them to kafka.
2699    ///
2700    /// This function runs the following steps:
2701    ///  - cardinality limiting
2702    ///  - rate limiting
2703    ///  - submit to `StoreForwarder`
2704    #[cfg(feature = "processing")]
2705    async fn encode_metrics_processing(
2706        &self,
2707        message: FlushBuckets,
2708        store_forwarder: &Addr<Store>,
2709    ) {
2710        use crate::constants::DEFAULT_EVENT_RETENTION;
2711        use crate::services::store::StoreMetrics;
2712
2713        for ProjectBuckets {
2714            buckets,
2715            scoping,
2716            project_info,
2717            ..
2718        } in message.buckets.into_values()
2719        {
2720            let buckets = self
2721                .rate_limit_buckets(scoping, &project_info, buckets)
2722                .await;
2723
2724            let limits = project_info.get_cardinality_limits();
2725            let buckets = self
2726                .cardinality_limit_buckets(scoping, limits, buckets)
2727                .await;
2728
2729            if buckets.is_empty() {
2730                continue;
2731            }
2732
2733            let retention = project_info
2734                .config
2735                .event_retention
2736                .unwrap_or(DEFAULT_EVENT_RETENTION);
2737
2738            // The store forwarder takes care of bucket splitting internally, so we can submit the
2739            // entire list of buckets. There is no batching needed here.
2740            store_forwarder.send(StoreMetrics {
2741                buckets,
2742                scoping,
2743                retention,
2744            });
2745        }
2746    }
2747
2748    /// Serializes metric buckets to JSON and sends them to the upstream.
2749    ///
2750    /// This function runs the following steps:
2751    ///  - partitioning
2752    ///  - batching by configured size limit
2753    ///  - serialize to JSON and pack in an envelope
2754    ///  - submit the envelope to upstream or kafka depending on configuration
2755    ///
2756    /// Cardinality limiting and rate limiting run only in processing Relays as they both require
2757    /// access to the central Redis instance. Cached rate limits are applied in the project cache
2758    /// already.
2759    fn encode_metrics_envelope(&self, cogs: &mut Token, message: FlushBuckets) {
2760        let FlushBuckets {
2761            partition_key,
2762            buckets,
2763        } = message;
2764
2765        let batch_size = self.inner.config.metrics_max_batch_size_bytes();
2766        let upstream = self.inner.config.upstream_descriptor();
2767
2768        for ProjectBuckets {
2769            buckets, scoping, ..
2770        } in buckets.values()
2771        {
2772            let dsn = PartialDsn::outbound(scoping, upstream);
2773
2774            relay_statsd::metric!(
2775                distribution(RelayDistributions::PartitionKeys) = u64::from(partition_key)
2776            );
2777
2778            let mut num_batches = 0;
2779            for batch in BucketsView::from(buckets).by_size(batch_size) {
2780                let mut envelope = Envelope::from_request(None, RequestMeta::outbound(dsn.clone()));
2781
2782                let mut item = Item::new(ItemType::MetricBuckets);
2783                item.set_source_quantities(crate::metrics::extract_quantities(batch));
2784                item.set_payload(ContentType::Json, serde_json::to_vec(&buckets).unwrap());
2785                envelope.add_item(item);
2786
2787                let mut envelope =
2788                    ManagedEnvelope::new(envelope, self.inner.addrs.outcome_aggregator.clone());
2789                envelope
2790                    .set_partition_key(Some(partition_key))
2791                    .scope(*scoping);
2792
2793                relay_statsd::metric!(
2794                    distribution(RelayDistributions::BucketsPerBatch) = batch.len() as u64
2795                );
2796
2797                self.submit_upstream(cogs, Submit::Envelope(envelope.into_processed()));
2798                num_batches += 1;
2799            }
2800
2801            relay_statsd::metric!(
2802                distribution(RelayDistributions::BatchesPerPartition) = num_batches
2803            );
2804        }
2805    }
2806
2807    /// Creates a [`SendMetricsRequest`] and sends it to the upstream relay.
2808    fn send_global_partition(&self, partition_key: u32, partition: &mut Partition<'_>) {
2809        if partition.is_empty() {
2810            return;
2811        }
2812
2813        let (unencoded, project_info) = partition.take();
2814        let http_encoding = self.inner.config.http_encoding();
2815        let encoded = match encode_payload(&unencoded, http_encoding) {
2816            Ok(payload) => payload,
2817            Err(error) => {
2818                let error = &error as &dyn std::error::Error;
2819                relay_log::error!(error, "failed to encode metrics payload");
2820                return;
2821            }
2822        };
2823
2824        let request = SendMetricsRequest {
2825            partition_key: partition_key.to_string(),
2826            unencoded,
2827            encoded,
2828            project_info,
2829            http_encoding,
2830            metric_outcomes: self.inner.metric_outcomes.clone(),
2831        };
2832
2833        self.inner.addrs.upstream_relay.send(SendRequest(request));
2834    }
2835
    /// Serializes metric buckets to JSON and sends them to the upstream via the global endpoint.
    ///
    /// This function is similar to [`Self::encode_metrics_envelope`], but sends a global batched
    /// payload directly instead of per-project Envelopes.
    ///
    /// This function runs the following steps:
    ///  - partitioning
    ///  - batching by configured size limit
    ///  - serialize to JSON
    ///  - submit directly to the upstream
    ///
    /// Cardinality limiting and rate limiting run only in processing Relays as they both require
    /// access to the central Redis instance. Cached rate limits are applied in the project cache
    /// already.
    fn encode_metrics_global(&self, message: FlushBuckets) {
        let FlushBuckets {
            partition_key,
            buckets,
        } = message;

        let batch_size = self.inner.config.metrics_max_batch_size_bytes();
        let mut partition = Partition::new(batch_size);
        let mut partition_splits = 0;

        for ProjectBuckets {
            buckets, scoping, ..
        } in buckets.values()
        {
            for bucket in buckets {
                // A bucket may be larger than one partition; insert it piecewise.
                let mut remaining = Some(BucketView::new(bucket));

                while let Some(bucket) = remaining.take() {
                    if let Some(next) = partition.insert(bucket, *scoping) {
                        // A part of the bucket could not be inserted. Take the partition and submit
                        // it immediately. Repeat until the final part was inserted. This should
                        // always result in a request, otherwise we would enter an endless loop.
                        self.send_global_partition(partition_key, &mut partition);
                        remaining = Some(next);
                        partition_splits += 1;
                    }
                }
            }
        }

        if partition_splits > 0 {
            metric!(distribution(RelayDistributions::PartitionSplits) = partition_splits);
        }

        // Flush whatever remains in the final (possibly partial) partition.
        self.send_global_partition(partition_key, &mut partition);
    }
2886
    /// Applies cached limits to all per-project buckets and forwards them.
    ///
    /// In processing Relays with a store forwarder, buckets are rate limited, cardinality
    /// limited, and sent to Kafka. Otherwise they are serialized and sent upstream, either
    /// as one global payload or as per-project envelopes depending on configuration.
    async fn handle_flush_buckets(&self, cogs: &mut Token, mut message: FlushBuckets) {
        // First enforce cached rate limits and project config on every project's buckets.
        for (project_key, pb) in message.buckets.iter_mut() {
            let buckets = std::mem::take(&mut pb.buckets);
            pb.buckets =
                self.check_buckets(*project_key, &pb.project_info, &pb.rate_limits, buckets);
        }

        #[cfg(feature = "processing")]
        if self.inner.config.processing_enabled()
            && let Some(ref store_forwarder) = self.inner.addrs.store_forwarder
        {
            return self
                .encode_metrics_processing(message, store_forwarder)
                .await;
        }

        if self.inner.config.http_global_metrics() {
            self.encode_metrics_global(message)
        } else {
            self.encode_metrics_envelope(cogs, message)
        }
    }
2909
    /// Returns `true` if a Redis rate limiter is configured. Test-only helper.
    #[cfg(all(test, feature = "processing"))]
    fn redis_rate_limiter_enabled(&self) -> bool {
        self.inner.rate_limiter.is_some()
    }
2914
    /// Dispatches a single [`EnvelopeProcessor`] message to its handler.
    ///
    /// Wraps the handler in a processing-duration timer tagged with the message variant,
    /// and scopes a COGS token attributed via [`Self::feature_weights`].
    async fn handle_message(&self, message: EnvelopeProcessor) {
        let ty = message.variant();
        let feature_weights = self.feature_weights(&message);

        metric!(timer(RelayTimers::ProcessMessageDuration), message = ty, {
            let mut cogs = self.inner.cogs.timed(ResourceId::Relay, feature_weights);

            match message {
                EnvelopeProcessor::ProcessEnvelope(m) => {
                    self.handle_process_envelope(&mut cogs, *m).await
                }
                EnvelopeProcessor::ProcessProjectMetrics(m) => {
                    self.handle_process_metrics(&mut cogs, *m)
                }
                EnvelopeProcessor::ProcessBatchedMetrics(m) => {
                    self.handle_process_batched_metrics(&mut cogs, *m)
                }
                EnvelopeProcessor::FlushBuckets(m) => {
                    self.handle_flush_buckets(&mut cogs, *m).await
                }
                EnvelopeProcessor::SubmitClientReports(m) => {
                    self.handle_submit_client_reports(&mut cogs, *m)
                }
            }
        });
    }
2941
2942    fn feature_weights(&self, message: &EnvelopeProcessor) -> FeatureWeights {
2943        match message {
2944            // Envelope is split later and tokens are attributed then.
2945            EnvelopeProcessor::ProcessEnvelope(_) => AppFeature::Unattributed.into(),
2946            EnvelopeProcessor::ProcessProjectMetrics(_) => AppFeature::Unattributed.into(),
2947            EnvelopeProcessor::ProcessBatchedMetrics(_) => AppFeature::Unattributed.into(),
2948            EnvelopeProcessor::FlushBuckets(v) => v
2949                .buckets
2950                .values()
2951                .map(|s| {
2952                    if self.inner.config.processing_enabled() {
2953                        // Processing does not encode the metrics but instead rate and cardinality
2954                        // limits the metrics, which scales by count and not size.
2955                        relay_metrics::cogs::ByCount(&s.buckets).into()
2956                    } else {
2957                        relay_metrics::cogs::BySize(&s.buckets).into()
2958                    }
2959                })
2960                .fold(FeatureWeights::none(), FeatureWeights::merge),
2961            EnvelopeProcessor::SubmitClientReports(_) => AppFeature::ClientReports.into(),
2962        }
2963    }
2964}
2965
impl Service for EnvelopeProcessorService {
    type Interface = EnvelopeProcessor;

    async fn run(self, mut rx: relay_system::Receiver<Self::Interface>) {
        // Drain the mailbox, handling each message on the processor's worker pool. A clone
        // of the service is moved into every spawned task.
        // NOTE(review): `spawn_async(..).await` presumably provides backpressure by resolving
        // once the pool accepts the task — confirm against the pool's documentation.
        while let Some(message) = rx.recv().await {
            let service = self.clone();
            self.inner
                .pool
                .spawn_async(
                    async move {
                        service.handle_message(message).await;
                    }
                    .boxed(),
                )
                .await;
        }
    }
}
2984
/// Result of the enforcement of rate limiting.
///
/// If the event is already `None` or it's rate limited, it will be `None`
/// within the [`Annotated`].
struct EnforcementResult {
    /// The (possibly emptied) event after enforcement.
    event: Annotated<Event>,
    /// Rate limits computed during enforcement; only read with the `processing` feature.
    #[cfg_attr(not(feature = "processing"), expect(dead_code))]
    rate_limits: RateLimits,
}
2994
impl EnforcementResult {
    /// Creates a new [`EnforcementResult`].
    ///
    /// `event` may already be empty if enforcement removed it; `rate_limits` are the
    /// limits computed during enforcement.
    pub fn new(event: Annotated<Event>, rate_limits: RateLimits) -> Self {
        Self { event, rate_limits }
    }
}
3001
/// Strategy used for enforcing rate limits on an envelope.
#[derive(Clone)]
enum RateLimiter {
    /// Enforces only locally cached rate limits.
    Cached,
    /// Enforces rate limits consistently against Redis (processing Relays).
    #[cfg(feature = "processing")]
    Consistent(Arc<RedisRateLimiter>),
}
3008
impl RateLimiter {
    /// Enforces rate limits on the envelope and its already-extracted event.
    ///
    /// Computes applicable limits via the [`EnvelopeLimiter`] — consistently against Redis
    /// for [`RateLimiter::Consistent`], or from cached limits otherwise — applies the
    /// resulting enforcement with outcomes, and mirrors it onto extracted metrics
    /// (processing feature only). Returns the possibly-emptied event together with the
    /// computed rate limits.
    async fn enforce<Group>(
        &self,
        managed_envelope: &mut TypedEnvelope<Group>,
        event: Annotated<Event>,
        _extracted_metrics: &mut ProcessingExtractedMetrics,
        ctx: processing::Context<'_>,
    ) -> Result<EnforcementResult, ProcessingError> {
        // Nothing to limit on a fully empty envelope without an event.
        if managed_envelope.envelope().is_empty() && event.value().is_none() {
            return Ok(EnforcementResult::new(event, RateLimits::default()));
        }

        // Without any quotas, no limit can ever apply.
        let quotas = CombinedQuotas::new(ctx.global_config, ctx.project_info.get_quotas());
        if quotas.is_empty() {
            return Ok(EnforcementResult::new(event, RateLimits::default()));
        }

        let event_category = event_category(&event);

        // We extract the rate limiters, in case we perform consistent rate limiting, since we will
        // need Redis access.
        //
        // When invoking the rate limiter, capture if the event item has been rate limited to also
        // remove it from the processing state eventually.
        let this = self.clone();
        let mut envelope_limiter =
            EnvelopeLimiter::new(CheckLimits::All, move |item_scope, _quantity| {
                let this = this.clone();

                async move {
                    match this {
                        #[cfg(feature = "processing")]
                        RateLimiter::Consistent(rate_limiter) => Ok::<_, ProcessingError>(
                            rate_limiter
                                .is_rate_limited(quotas, item_scope, _quantity, false)
                                .await?,
                        ),
                        _ => Ok::<_, ProcessingError>(
                            ctx.rate_limits.check_with_quotas(quotas, item_scope),
                        ),
                    }
                }
            });

        // Tell the envelope limiter about the event, since it has been removed from the Envelope at
        // this stage in processing.
        if let Some(category) = event_category {
            envelope_limiter.assume_event(category);
        }

        let scoping = managed_envelope.scoping();
        let (enforcement, rate_limits) = metric!(timer(RelayTimers::EventProcessingRateLimiting), type = self.name(), {
            envelope_limiter
                .compute(managed_envelope.envelope_mut(), &scoping)
                .await
        })?;
        let event_active = enforcement.is_event_active();

        // Use the same rate limits as used for the envelope on the metrics.
        // Those rate limits should not be checked for expiry or similar to ensure a consistent
        // limiting of envelope items and metrics.
        #[cfg(feature = "processing")]
        _extracted_metrics.apply_enforcement(&enforcement, matches!(self, Self::Consistent(_)));
        enforcement.apply_with_outcomes(managed_envelope);

        if event_active {
            // A limited event implies the entire envelope was dropped by enforcement.
            debug_assert!(managed_envelope.envelope().is_empty());
            return Ok(EnforcementResult::new(Annotated::empty(), rate_limits));
        }

        Ok(EnforcementResult::new(event, rate_limits))
    }

    /// Returns the limiter variant's short name, used as a metric tag.
    fn name(&self) -> &'static str {
        match self {
            Self::Cached => "cached",
            #[cfg(feature = "processing")]
            Self::Consistent(_) => "consistent",
        }
    }
}
3090
3091pub fn encode_payload(body: &Bytes, http_encoding: HttpEncoding) -> Result<Bytes, std::io::Error> {
3092    let envelope_body: Vec<u8> = match http_encoding {
3093        HttpEncoding::Identity => return Ok(body.clone()),
3094        HttpEncoding::Deflate => {
3095            let mut encoder = ZlibEncoder::new(Vec::new(), Compression::default());
3096            encoder.write_all(body.as_ref())?;
3097            encoder.finish()?
3098        }
3099        HttpEncoding::Gzip => {
3100            let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
3101            encoder.write_all(body.as_ref())?;
3102            encoder.finish()?
3103        }
3104        HttpEncoding::Br => {
3105            // Use default buffer size (via 0), medium quality (5), and the default lgwin (22).
3106            let mut encoder = BrotliEncoder::new(Vec::new(), 0, 5, 22);
3107            encoder.write_all(body.as_ref())?;
3108            encoder.into_inner()
3109        }
3110        HttpEncoding::Zstd => {
3111            // Use the fastest compression level, our main objective here is to get the best
3112            // compression ratio for least amount of time spent.
3113            let mut encoder = ZstdEncoder::new(Vec::new(), 1)?;
3114            encoder.write_all(body.as_ref())?;
3115            encoder.finish()?
3116        }
3117    };
3118
3119    Ok(envelope_body.into())
3120}
3121
/// An upstream request that submits an envelope via HTTP.
#[derive(Debug)]
pub struct SendEnvelope {
    /// The envelope being submitted; consumed when the request completes.
    pub envelope: TypedEnvelope<Processed>,
    /// The serialized and already content-encoded envelope payload.
    pub body: Bytes,
    /// The content encoding that `body` was compressed with.
    pub http_encoding: HttpEncoding,
    /// Handle used to merge upstream rate limits back into the project cache.
    pub project_cache: ProjectCacheHandle,
}
3130
3131impl UpstreamRequest for SendEnvelope {
    /// Envelopes are always submitted via HTTP POST.
    fn method(&self) -> reqwest::Method {
        reqwest::Method::POST
    }
3135
    /// The upstream envelope endpoint for the envelope's project.
    fn path(&self) -> Cow<'_, str> {
        format!("/api/{}/envelope/", self.envelope.scoping().project_id).into()
    }
3139
    /// Static route name used for metrics and logging.
    fn route(&self) -> &'static str {
        "envelope"
    }
3143
3144    fn build(&mut self, builder: &mut http::RequestBuilder) -> Result<(), http::HttpError> {
3145        let envelope_body = self.body.clone();
3146        metric!(
3147            distribution(RelayDistributions::UpstreamEnvelopeBodySize) = envelope_body.len() as u64
3148        );
3149
3150        let meta = &self.envelope.meta();
3151        let shard = self.envelope.partition_key().map(|p| p.to_string());
3152        builder
3153            .content_encoding(self.http_encoding)
3154            .header_opt("Origin", meta.origin().map(|url| url.as_str()))
3155            .header_opt("User-Agent", meta.user_agent())
3156            .header("X-Sentry-Auth", meta.auth_header())
3157            .header("X-Forwarded-For", meta.forwarded_for())
3158            .header("Content-Type", envelope::CONTENT_TYPE)
3159            .header_opt("X-Sentry-Relay-Shard", shard)
3160            .body(envelope_body);
3161
3162        Ok(())
3163    }
3164
3165    fn sign(&mut self) -> Option<Sign> {
3166        Some(Sign::Optional(SignatureType::RequestSign))
3167    }
3168
3169    fn respond(
3170        self: Box<Self>,
3171        result: Result<http::Response, UpstreamRequestError>,
3172    ) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> {
3173        Box::pin(async move {
3174            let result = match result {
3175                Ok(mut response) => response.consume().await.map_err(UpstreamRequestError::Http),
3176                Err(error) => Err(error),
3177            };
3178
3179            match result {
3180                Ok(()) => self.envelope.accept(),
3181                Err(error) if error.is_received() => {
3182                    let scoping = self.envelope.scoping();
3183                    self.envelope.accept();
3184
3185                    if let UpstreamRequestError::RateLimited(limits) = error {
3186                        self.project_cache
3187                            .get(scoping.project_key)
3188                            .rate_limits()
3189                            .merge(limits.scope(&scoping));
3190                    }
3191                }
3192                Err(error) => {
3193                    // Errors are only logged for what we consider an internal discard reason. These
3194                    // indicate errors in the infrastructure or implementation bugs.
3195                    let mut envelope = self.envelope;
3196                    envelope.reject(Outcome::Invalid(DiscardReason::Internal));
3197                    relay_log::error!(
3198                        error = &error as &dyn Error,
3199                        tags.project_key = %envelope.scoping().project_key,
3200                        "error sending envelope"
3201                    );
3202                }
3203            }
3204        })
3205    }
3206}
3207
/// A container for metric buckets from multiple projects.
///
/// This container is used to send metrics to the upstream in global batches as part of the
/// [`FlushBuckets`] message if the `http.global_metrics` option is enabled. The container monitors
/// the size of all metrics and allows to split them into multiple batches. See
/// [`insert`](Self::insert) for more information.
#[derive(Debug)]
struct Partition<'a> {
    /// Maximum payload size of this partition in bytes.
    max_size: usize,
    /// Remaining byte budget before the partition is considered full.
    remaining: usize,
    /// Bucket views collected so far, grouped by the owning project key.
    views: HashMap<ProjectKey, Vec<BucketView<'a>>>,
    /// Scoping for every project key present in `views`.
    project_info: HashMap<ProjectKey, Scoping>,
}
3221
3222impl<'a> Partition<'a> {
3223    /// Creates a new partition with the given maximum size in bytes.
3224    pub fn new(size: usize) -> Self {
3225        Self {
3226            max_size: size,
3227            remaining: size,
3228            views: HashMap::new(),
3229            project_info: HashMap::new(),
3230        }
3231    }
3232
3233    /// Inserts a bucket into the partition, splitting it if necessary.
3234    ///
3235    /// This function attempts to add the bucket to this partition. If the bucket does not fit
3236    /// entirely into the partition given its maximum size, the remaining part of the bucket is
3237    /// returned from this function call.
3238    ///
3239    /// If this function returns `Some(_)`, the partition is full and should be submitted to the
3240    /// upstream immediately. Use [`Self::take`] to retrieve the contents of the
3241    /// partition. Afterwards, the caller is responsible to call this function again with the
3242    /// remaining bucket until it is fully inserted.
3243    pub fn insert(&mut self, bucket: BucketView<'a>, scoping: Scoping) -> Option<BucketView<'a>> {
3244        let (current, next) = bucket.split(self.remaining, Some(self.max_size));
3245
3246        if let Some(current) = current {
3247            self.remaining = self.remaining.saturating_sub(current.estimated_size());
3248            self.views
3249                .entry(scoping.project_key)
3250                .or_default()
3251                .push(current);
3252
3253            self.project_info
3254                .entry(scoping.project_key)
3255                .or_insert(scoping);
3256        }
3257
3258        next
3259    }
3260
3261    /// Returns `true` if the partition does not hold any data.
3262    fn is_empty(&self) -> bool {
3263        self.views.is_empty()
3264    }
3265
3266    /// Returns the serialized buckets for this partition.
3267    ///
3268    /// This empties the partition, so that it can be reused.
3269    fn take(&mut self) -> (Bytes, HashMap<ProjectKey, Scoping>) {
3270        #[derive(serde::Serialize)]
3271        struct Wrapper<'a> {
3272            buckets: &'a HashMap<ProjectKey, Vec<BucketView<'a>>>,
3273        }
3274
3275        let buckets = &self.views;
3276        let payload = serde_json::to_vec(&Wrapper { buckets }).unwrap().into();
3277
3278        let scopings = self.project_info.clone();
3279        self.project_info.clear();
3280
3281        self.views.clear();
3282        self.remaining = self.max_size;
3283
3284        (payload, scopings)
3285    }
3286}
3287
/// An upstream request that submits metric buckets via HTTP.
///
/// This request is not awaited. It automatically tracks outcomes if the request is not received.
#[derive(Debug)]
struct SendMetricsRequest {
    /// The partition key, attached to the request as the `X-Sentry-Relay-Shard` header.
    partition_key: String,
    /// Serialized metric buckets without encoding applied, used for signing.
    unencoded: Bytes,
    /// Serialized metric buckets with the stated HTTP encoding applied.
    encoded: Bytes,
    /// Mapping of all contained project keys to their scoping and extraction mode.
    ///
    /// Used to track outcomes for transmission failures.
    project_info: HashMap<ProjectKey, Scoping>,
    /// Encoding (compression) of the payload.
    http_encoding: HttpEncoding,
    /// Metric outcomes instance to send outcomes on error.
    metric_outcomes: MetricOutcomes,
}
3308
3309impl SendMetricsRequest {
3310    fn create_error_outcomes(self) {
3311        #[derive(serde::Deserialize)]
3312        struct Wrapper {
3313            buckets: HashMap<ProjectKey, Vec<MinimalTrackableBucket>>,
3314        }
3315
3316        let buckets = match serde_json::from_slice(&self.unencoded) {
3317            Ok(Wrapper { buckets }) => buckets,
3318            Err(err) => {
3319                relay_log::error!(
3320                    error = &err as &dyn std::error::Error,
3321                    "failed to parse buckets from failed transmission"
3322                );
3323                return;
3324            }
3325        };
3326
3327        for (key, buckets) in buckets {
3328            let Some(&scoping) = self.project_info.get(&key) else {
3329                relay_log::error!("missing scoping for project key");
3330                continue;
3331            };
3332
3333            self.metric_outcomes.track(
3334                scoping,
3335                &buckets,
3336                Outcome::Invalid(DiscardReason::Internal),
3337            );
3338        }
3339    }
3340}
3341
3342impl UpstreamRequest for SendMetricsRequest {
3343    fn set_relay_id(&self) -> bool {
3344        true
3345    }
3346
3347    fn sign(&mut self) -> Option<Sign> {
3348        Some(Sign::Required(SignatureType::Body(self.unencoded.clone())))
3349    }
3350
3351    fn method(&self) -> reqwest::Method {
3352        reqwest::Method::POST
3353    }
3354
3355    fn path(&self) -> Cow<'_, str> {
3356        "/api/0/relays/metrics/".into()
3357    }
3358
3359    fn route(&self) -> &'static str {
3360        "global_metrics"
3361    }
3362
3363    fn build(&mut self, builder: &mut http::RequestBuilder) -> Result<(), http::HttpError> {
3364        metric!(
3365            distribution(RelayDistributions::UpstreamMetricsBodySize) = self.encoded.len() as u64
3366        );
3367
3368        builder
3369            .content_encoding(self.http_encoding)
3370            .header("X-Sentry-Relay-Shard", self.partition_key.as_bytes())
3371            .header(header::CONTENT_TYPE, b"application/json")
3372            .body(self.encoded.clone());
3373
3374        Ok(())
3375    }
3376
3377    fn respond(
3378        self: Box<Self>,
3379        result: Result<http::Response, UpstreamRequestError>,
3380    ) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> {
3381        Box::pin(async {
3382            match result {
3383                Ok(mut response) => {
3384                    response.consume().await.ok();
3385                }
3386                Err(error) => {
3387                    relay_log::error!(error = &error as &dyn Error, "Failed to send metrics batch");
3388
3389                    // If the request did not arrive at the upstream, we are responsible for outcomes.
3390                    // Otherwise, the upstream is responsible to log outcomes.
3391                    if error.is_received() {
3392                        return;
3393                    }
3394
3395                    self.create_error_outcomes()
3396                }
3397            }
3398        })
3399    }
3400}
3401
/// Container for global and project level [`Quota`].
#[derive(Copy, Clone, Debug)]
struct CombinedQuotas<'a> {
    /// Quotas taken from the [`GlobalConfig`].
    global_quotas: &'a [Quota],
    /// Quotas supplied for the individual project.
    project_quotas: &'a [Quota],
}
3408
3409impl<'a> CombinedQuotas<'a> {
3410    /// Returns a new [`CombinedQuotas`].
3411    pub fn new(global_config: &'a GlobalConfig, project_quotas: &'a [Quota]) -> Self {
3412        Self {
3413            global_quotas: &global_config.quotas,
3414            project_quotas,
3415        }
3416    }
3417
3418    /// Returns `true` if both global quotas and project quotas are empty.
3419    pub fn is_empty(&self) -> bool {
3420        self.len() == 0
3421    }
3422
3423    /// Returns the number of both global and project quotas.
3424    pub fn len(&self) -> usize {
3425        self.global_quotas.len() + self.project_quotas.len()
3426    }
3427}
3428
impl<'a> IntoIterator for CombinedQuotas<'a> {
    type Item = &'a Quota;
    type IntoIter = std::iter::Chain<std::slice::Iter<'a, Quota>, std::slice::Iter<'a, Quota>>;

    /// Yields all global quotas first, followed by all project quotas.
    fn into_iter(self) -> Self::IntoIter {
        self.global_quotas.iter().chain(self.project_quotas.iter())
    }
}
3437
3438#[cfg(test)]
3439mod tests {
3440    use std::collections::BTreeMap;
3441
3442    use insta::assert_debug_snapshot;
3443    use relay_base_schema::metrics::{DurationUnit, MetricUnit};
3444    use relay_common::glob2::LazyGlob;
3445    use relay_dynamic_config::ProjectConfig;
3446    use relay_event_normalization::{
3447        MeasurementsConfig, NormalizationConfig, RedactionRule, TransactionNameConfig,
3448        TransactionNameRule,
3449    };
3450    use relay_event_schema::protocol::TransactionSource;
3451    use relay_pii::DataScrubbingConfig;
3452    use similar_asserts::assert_eq;
3453
3454    use crate::metrics_extraction::IntoMetric;
3455    use crate::metrics_extraction::transactions::types::{
3456        CommonTags, TransactionMeasurementTags, TransactionMetric,
3457    };
3458    use crate::testutils::{create_test_processor, create_test_processor_with_addrs};
3459
3460    #[cfg(feature = "processing")]
3461    use {
3462        relay_metrics::BucketValue,
3463        relay_quotas::{QuotaScope, ReasonCode},
3464        relay_test::mock_service,
3465    };
3466
3467    use super::*;
3468
    /// Creates an organization-scoped quota for metric buckets with the given `id`
    /// and a limit of 0.
    #[cfg(feature = "processing")]
    fn mock_quota(id: &str) -> Quota {
        Quota {
            id: Some(id.into()),
            categories: [DataCategory::MetricBucket].into(),
            scope: QuotaScope::Organization,
            scope_id: None,
            limit: Some(0),
            window: None,
            reason_code: None,
            namespace: None,
        }
    }
3482
3483    #[cfg(feature = "processing")]
3484    #[test]
3485    fn test_dynamic_quotas() {
3486        let global_config = GlobalConfig {
3487            quotas: vec![mock_quota("foo"), mock_quota("bar")],
3488            ..Default::default()
3489        };
3490
3491        let project_quotas = vec![mock_quota("baz"), mock_quota("qux")];
3492
3493        let dynamic_quotas = CombinedQuotas::new(&global_config, &project_quotas);
3494
3495        assert_eq!(dynamic_quotas.len(), 4);
3496        assert!(!dynamic_quotas.is_empty());
3497
3498        let quota_ids = dynamic_quotas.into_iter().filter_map(|q| q.id.as_deref());
3499        assert!(quota_ids.eq(["foo", "bar", "baz", "qux"]));
3500    }
3501
    /// Ensures that if we ratelimit one batch of buckets in [`FlushBuckets`] message, it won't
    /// also ratelimit the next batches in the same message automatically.
    #[cfg(feature = "processing")]
    #[tokio::test]
    async fn test_ratelimit_per_batch() {
        use relay_base_schema::organization::OrganizationId;
        use relay_protocol::FiniteF64;

        // Two organizations: only org 1 is targeted by the zero-limit quota below.
        let rate_limited_org = Scoping {
            organization_id: OrganizationId::new(1),
            project_id: ProjectId::new(21),
            project_key: ProjectKey::parse("00000000000000000000000000000000").unwrap(),
            key_id: Some(17),
        };

        let not_rate_limited_org = Scoping {
            organization_id: OrganizationId::new(2),
            project_id: ProjectId::new(21),
            project_key: ProjectKey::parse("11111111111111111111111111111111").unwrap(),
            key_id: Some(17),
        };

        let message = {
            let project_info = {
                // Quota with `limit: 0` scoped to the rate-limited organization's id only.
                let quota = Quota {
                    id: Some("testing".into()),
                    categories: [DataCategory::MetricBucket].into(),
                    scope: relay_quotas::QuotaScope::Organization,
                    scope_id: Some(rate_limited_org.organization_id.to_string().into()),
                    limit: Some(0),
                    window: None,
                    reason_code: Some(ReasonCode::new("test")),
                    namespace: None,
                };

                let mut config = ProjectConfig::default();
                config.quotas.push(quota);

                Arc::new(ProjectInfo {
                    config,
                    ..Default::default()
                })
            };

            // One counter bucket per project; both projects share the same project config.
            let project_metrics = |scoping| ProjectBuckets {
                buckets: vec![Bucket {
                    name: "d:transactions/bar".into(),
                    value: BucketValue::Counter(FiniteF64::new(1.0).unwrap()),
                    timestamp: UnixTimestamp::now(),
                    tags: Default::default(),
                    width: 10,
                    metadata: BucketMetadata::default(),
                }],
                rate_limits: Default::default(),
                project_info: project_info.clone(),
                scoping,
            };

            let buckets = hashbrown::HashMap::from([
                (
                    rate_limited_org.project_key,
                    project_metrics(rate_limited_org),
                ),
                (
                    not_rate_limited_org.project_key,
                    project_metrics(not_rate_limited_org),
                ),
            ]);

            FlushBuckets {
                partition_key: 0,
                buckets,
            }
        };

        // ensure the order of the map while iterating is as expected.
        assert_eq!(message.buckets.keys().count(), 2);

        // Enable processing with Redis so the actual rate limiter is exercised.
        let config = {
            let config_json = serde_json::json!({
                "processing": {
                    "enabled": true,
                    "kafka_config": [],
                    "redis": {
                        "server": std::env::var("RELAY_REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_owned()),
                    }
                }
            });
            Config::from_json_value(config_json).unwrap()
        };

        // Mock store forwarder recording the organization id of every metrics batch it
        // receives; receiving anything other than metrics is a bug in this test.
        let (store, handle) = {
            let f = |org_ids: &mut Vec<OrganizationId>, msg: Store| {
                let org_id = match msg {
                    Store::Metrics(x) => x.scoping.organization_id,
                    _ => panic!("received envelope when expecting only metrics"),
                };
                org_ids.push(org_id);
            };

            mock_service("store_forwarder", vec![], f)
        };

        let processor = create_test_processor(config).await;
        assert!(processor.redis_rate_limiter_enabled());

        processor.encode_metrics_processing(message, &store).await;

        // Dropping the store shuts down the mock service so `handle` can resolve.
        drop(store);
        let orgs_not_ratelimited = handle.await.unwrap();

        // Only the batch of the organization that was not rate limited may reach the store.
        assert_eq!(
            orgs_not_ratelimited,
            vec![not_rate_limited_org.organization_id]
        );
    }
3618
    /// PII scrubbing with an `@ip:mask` rule masks the IP-like browser version inside the
    /// User-Agent header, while browser name/version extraction into contexts still works.
    #[tokio::test]
    async fn test_browser_version_extraction_with_pii_like_data() {
        let processor = create_test_processor(Default::default()).await;
        let outcome_aggregator = Addr::dummy();
        let event_id = EventId::new();

        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
            .parse()
            .unwrap();

        let request_meta = RequestMeta::new(dsn);
        let mut envelope = Envelope::from_request(Some(event_id), request_meta);

        // Event whose User-Agent version string ("103.0.0.0") looks like an IP address.
        envelope.add_item({
                let mut item = Item::new(ItemType::Event);
                item.set_payload(
                    ContentType::Json,
                    r#"
                    {
                        "request": {
                            "headers": [
                                ["User-Agent", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36"]
                            ]
                        }
                    }
                "#,
                );
                item
            });

        let mut datascrubbing_settings = DataScrubbingConfig::default();
        // enable all the default scrubbing
        datascrubbing_settings.scrub_data = true;
        datascrubbing_settings.scrub_defaults = true;
        datascrubbing_settings.scrub_ip_addresses = true;

        // Make sure to mask any IP-like looking data
        let pii_config = serde_json::from_str(r#"{"applications": {"**": ["@ip:mask"]}}"#).unwrap();

        let config = ProjectConfig {
            datascrubbing_settings,
            pii_config: Some(pii_config),
            ..Default::default()
        };

        let project_info = ProjectInfo {
            config,
            ..Default::default()
        };

        // A single event item yields exactly one processing group.
        let mut envelopes = ProcessingGroup::split_envelope(*envelope, &Default::default());
        assert_eq!(envelopes.len(), 1);

        let (group, envelope) = envelopes.pop().unwrap();
        let envelope = ManagedEnvelope::new(envelope, outcome_aggregator);

        let message = ProcessEnvelopeGrouped {
            group,
            envelope,
            ctx: processing::Context {
                project_info: &project_info,
                ..processing::Context::for_test()
            },
        };

        let Ok(Some(Submit::Envelope(mut new_envelope))) =
            processor.process(&mut Token::noop(), message).await
        else {
            panic!();
        };
        let new_envelope = new_envelope.envelope_mut();

        let event_item = new_envelope.items().last().unwrap();
        let annotated_event: Annotated<Event> =
            Annotated::from_json_bytes(&event_item.payload()).unwrap();
        let event = annotated_event.into_value().unwrap();
        let headers = event
            .request
            .into_value()
            .unwrap()
            .headers
            .into_value()
            .unwrap();

        // IP-like data must be masked
        assert_eq!(
            Some(
                "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/********* Safari/537.36"
            ),
            headers.get_header("User-Agent")
        );
        // But we still get correct browser and version number
        let contexts = event.contexts.into_value().unwrap();
        let browser = contexts.0.get("browser").unwrap();
        assert_eq!(
            r#"{"browser":"Chrome 103.0.0","name":"Chrome","version":"103.0.0","type":"browser"}"#,
            browser.to_json().unwrap()
        );
    }
3718
    /// The dynamic sampling context on the envelope is materialized into the event's `_dsc`
    /// field during processing.
    #[tokio::test]
    #[cfg(feature = "processing")]
    async fn test_materialize_dsc() {
        use crate::services::projects::project::PublicKeyConfig;

        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
            .parse()
            .unwrap();
        let request_meta = RequestMeta::new(dsn);
        let mut envelope = Envelope::from_request(None, request_meta);

        // DSC with a hyphenated trace id; the snapshot below shows it materialized in
        // normalized (unhyphenated) form.
        let dsc = r#"{
            "trace_id": "00000000-0000-0000-0000-000000000000",
            "public_key": "e12d836b15bb49d7bbf99e64295d995b",
            "sample_rate": "0.2"
        }"#;
        envelope.set_dsc(serde_json::from_str(dsc).unwrap());

        let mut item = Item::new(ItemType::Event);
        item.set_payload(ContentType::Json, r#"{}"#);
        envelope.add_item(item);

        let outcome_aggregator = Addr::dummy();
        let managed_envelope = ManagedEnvelope::new(envelope, outcome_aggregator);

        // The project must know the public key referenced by the DSC.
        let mut project_info = ProjectInfo::default();
        project_info.public_keys.push(PublicKeyConfig {
            public_key: ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(),
            numeric_id: Some(1),
        });

        let config = serde_json::json!({
            "processing": {
                "enabled": true,
                "kafka_config": [],
            }
        });

        let message = ProcessEnvelopeGrouped {
            group: ProcessingGroup::Transaction,
            envelope: managed_envelope,
            ctx: processing::Context {
                config: &Config::from_json_value(config.clone()).unwrap(),
                project_info: &project_info,
                sampling_project_info: Some(&project_info),
                ..processing::Context::for_test()
            },
        };

        let processor = create_test_processor(Config::from_json_value(config).unwrap()).await;
        let Ok(Some(Submit::Envelope(envelope))) =
            processor.process(&mut Token::noop(), message).await
        else {
            panic!();
        };
        let event = envelope
            .envelope()
            .get_item_by(|item| item.ty() == &ItemType::Event)
            .unwrap();

        let event = Annotated::<Event>::from_json_bytes(&event.payload()).unwrap();
        insta::assert_debug_snapshot!(event.value().unwrap()._dsc, @r###"
        Object(
            {
                "environment": ~,
                "public_key": String(
                    "e12d836b15bb49d7bbf99e64295d995b",
                ),
                "release": ~,
                "replay_id": ~,
                "sample_rate": String(
                    "0.2",
                ),
                "trace_id": String(
                    "00000000000000000000000000000000",
                ),
                "transaction": ~,
            },
        )
        "###);
    }
3800
    /// Normalizes a synthetic transaction event with the given `transaction_name` and
    /// `source`, returning all statsd metrics captured during normalization.
    ///
    /// A single transaction name rule for `/foo/*/**` with substitution `"*"` is
    /// configured, so callers can exercise rule-based renaming, pattern-based
    /// sanitization, both, or neither.
    fn capture_test_event(transaction_name: &str, source: TransactionSource) -> Vec<String> {
        let mut event = Annotated::<Event>::from_json(
            r#"
            {
                "type": "transaction",
                "transaction": "/foo/",
                "timestamp": 946684810.0,
                "start_timestamp": 946684800.0,
                "contexts": {
                    "trace": {
                        "trace_id": "4c79f60c11214eb38604f4ae0781bfb2",
                        "span_id": "fa90fdead5f74053",
                        "op": "http.server",
                        "type": "trace"
                    }
                },
                "transaction_info": {
                    "source": "url"
                }
            }
            "#,
        )
        .unwrap();
        // Override the fixture's transaction name and source with the caller's values.
        let e = event.value_mut().as_mut().unwrap();
        e.transaction.set_value(Some(transaction_name.into()));

        e.transaction_info
            .value_mut()
            .as_mut()
            .unwrap()
            .source
            .set_value(Some(source));

        relay_statsd::with_capturing_test_client(|| {
            utils::log_transaction_name_metrics(&mut event, |event| {
                let config = NormalizationConfig {
                    transaction_name_config: TransactionNameConfig {
                        rules: &[TransactionNameRule {
                            pattern: LazyGlob::new("/foo/*/**".to_owned()),
                            expiry: DateTime::<Utc>::MAX_UTC,
                            redaction: RedactionRule::Replace {
                                substitution: "*".to_owned(),
                            },
                        }],
                    },
                    ..Default::default()
                };
                relay_event_normalization::normalize_event(event, &config)
            });
        })
    }
3852
    /// Neither the configured rule nor pattern sanitization applies: `changes:none`.
    #[test]
    fn test_log_transaction_metrics_none() {
        let captures = capture_test_event("/nothing", TransactionSource::Url);
        insta::assert_debug_snapshot!(captures, @r###"
        [
            "event.transaction_name_changes:1|c|#source_in:url,changes:none,source_out:sanitized,is_404:false",
        ]
        "###);
    }
3862
    /// Only the configured `/foo/*/**` rule applies: `changes:rule`.
    #[test]
    fn test_log_transaction_metrics_rule() {
        let captures = capture_test_event("/foo/john/denver", TransactionSource::Url);
        insta::assert_debug_snapshot!(captures, @r###"
        [
            "event.transaction_name_changes:1|c|#source_in:url,changes:rule,source_out:sanitized,is_404:false",
        ]
        "###);
    }
3872
    /// Only pattern sanitization (of the `12345` segment) applies: `changes:pattern`.
    #[test]
    fn test_log_transaction_metrics_pattern() {
        let captures = capture_test_event("/something/12345", TransactionSource::Url);
        insta::assert_debug_snapshot!(captures, @r###"
        [
            "event.transaction_name_changes:1|c|#source_in:url,changes:pattern,source_out:sanitized,is_404:false",
        ]
        "###);
    }
3882
    /// Both the rule and pattern sanitization apply: `changes:both`.
    #[test]
    fn test_log_transaction_metrics_both() {
        let captures = capture_test_event("/foo/john/12345", TransactionSource::Url);
        insta::assert_debug_snapshot!(captures, @r###"
        [
            "event.transaction_name_changes:1|c|#source_in:url,changes:both,source_out:sanitized,is_404:false",
        ]
        "###);
    }
3892
    /// A `route` source is not sanitized at all: `changes:none` and `source_out:route`.
    #[test]
    fn test_log_transaction_metrics_no_match() {
        let captures = capture_test_event("/foo/john/12345", TransactionSource::Route);
        insta::assert_debug_snapshot!(captures, @r###"
        [
            "event.transaction_name_changes:1|c|#source_in:route,changes:none,source_out:route,is_404:false",
        ]
        "###);
    }
3902
3903    /// Confirms that the hardcoded value we use for the fixed length of the measurement MRI is
3904    /// correct. Unit test is placed here because it has dependencies to relay-server and therefore
3905    /// cannot be called from relay-metrics.
3906    #[test]
3907    fn test_mri_overhead_constant() {
3908        let hardcoded_value = MeasurementsConfig::MEASUREMENT_MRI_OVERHEAD;
3909
3910        let derived_value = {
3911            let name = "foobar".to_owned();
3912            let value = 5.into(); // Arbitrary value.
3913            let unit = MetricUnit::Duration(DurationUnit::default());
3914            let tags = TransactionMeasurementTags {
3915                measurement_rating: None,
3916                universal_tags: CommonTags(BTreeMap::new()),
3917                score_profile_version: None,
3918            };
3919
3920            let measurement = TransactionMetric::Measurement {
3921                name: name.clone(),
3922                value,
3923                unit,
3924                tags,
3925            };
3926
3927            let metric: Bucket = measurement.into_metric(UnixTimestamp::now());
3928            metric.name.len() - unit.to_string().len() - name.len()
3929        };
3930        assert_eq!(
3931            hardcoded_value, derived_value,
3932            "Update `MEASUREMENT_MRI_OVERHEAD` if the naming scheme changed."
3933        );
3934    }
3935
    /// Buckets from external sources get `received_at` stamped onto their metadata, while
    /// internally forwarded buckets keep it unset (`None`).
    #[tokio::test]
    async fn test_process_metrics_bucket_metadata() {
        let mut token = Cogs::noop().timed(ResourceId::Relay, AppFeature::Unattributed);
        let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
        let received_at = Utc::now();
        let config = Config::default();

        // Capture what the processor forwards to the aggregator.
        let (aggregator, mut aggregator_rx) = Addr::custom();
        let processor = create_test_processor_with_addrs(
            config,
            Addrs {
                aggregator,
                ..Default::default()
            },
        )
        .await;

        let mut item = Item::new(ItemType::Statsd);
        item.set_payload(
            ContentType::Text,
            "transactions/foo:3182887624:4267882815|s",
        );
        // External source: expect `received_at` set; internal source: expect `None`.
        for (source, expected_received_at) in [
            (
                BucketSource::External,
                Some(UnixTimestamp::from_datetime(received_at).unwrap()),
            ),
            (BucketSource::Internal, None),
        ] {
            let message = ProcessMetrics {
                data: MetricData::Raw(vec![item.clone()]),
                project_key,
                source,
                received_at,
                sent_at: Some(Utc::now()),
            };
            processor.handle_process_metrics(&mut token, message);

            let Aggregator::MergeBuckets(merge_buckets) = aggregator_rx.recv().await.unwrap();
            let buckets = merge_buckets.buckets;
            assert_eq!(buckets.len(), 1);
            assert_eq!(buckets[0].metadata.received_at, expected_received_at);
        }
    }
3980
3981    #[tokio::test]
3982    async fn test_process_batched_metrics() {
3983        let mut token = Cogs::noop().timed(ResourceId::Relay, AppFeature::Unattributed);
3984        let received_at = Utc::now();
3985        let config = Config::default();
3986
3987        let (aggregator, mut aggregator_rx) = Addr::custom();
3988        let processor = create_test_processor_with_addrs(
3989            config,
3990            Addrs {
3991                aggregator,
3992                ..Default::default()
3993            },
3994        )
3995        .await;
3996
3997        let payload = r#"{
3998    "buckets": {
3999        "11111111111111111111111111111111": [
4000            {
4001                "timestamp": 1615889440,
4002                "width": 0,
4003                "name": "d:custom/endpoint.response_time@millisecond",
4004                "type": "d",
4005                "value": [
4006                  68.0
4007                ],
4008                "tags": {
4009                  "route": "user_index"
4010                }
4011            }
4012        ],
4013        "22222222222222222222222222222222": [
4014            {
4015                "timestamp": 1615889440,
4016                "width": 0,
4017                "name": "d:custom/endpoint.cache_rate@none",
4018                "type": "d",
4019                "value": [
4020                  36.0
4021                ]
4022            }
4023        ]
4024    }
4025}
4026"#;
4027        let message = ProcessBatchedMetrics {
4028            payload: Bytes::from(payload),
4029            source: BucketSource::Internal,
4030            received_at,
4031            sent_at: Some(Utc::now()),
4032        };
4033        processor.handle_process_batched_metrics(&mut token, message);
4034
4035        let Aggregator::MergeBuckets(mb1) = aggregator_rx.recv().await.unwrap();
4036        let Aggregator::MergeBuckets(mb2) = aggregator_rx.recv().await.unwrap();
4037
4038        let mut messages = vec![mb1, mb2];
4039        messages.sort_by_key(|mb| mb.project_key);
4040
4041        let actual = messages
4042            .into_iter()
4043            .map(|mb| (mb.project_key, mb.buckets))
4044            .collect::<Vec<_>>();
4045
4046        assert_debug_snapshot!(actual, @r###"
4047        [
4048            (
4049                ProjectKey("11111111111111111111111111111111"),
4050                [
4051                    Bucket {
4052                        timestamp: UnixTimestamp(1615889440),
4053                        width: 0,
4054                        name: MetricName(
4055                            "d:custom/endpoint.response_time@millisecond",
4056                        ),
4057                        value: Distribution(
4058                            [
4059                                68.0,
4060                            ],
4061                        ),
4062                        tags: {
4063                            "route": "user_index",
4064                        },
4065                        metadata: BucketMetadata {
4066                            merges: 1,
4067                            received_at: None,
4068                            extracted_from_indexed: false,
4069                        },
4070                    },
4071                ],
4072            ),
4073            (
4074                ProjectKey("22222222222222222222222222222222"),
4075                [
4076                    Bucket {
4077                        timestamp: UnixTimestamp(1615889440),
4078                        width: 0,
4079                        name: MetricName(
4080                            "d:custom/endpoint.cache_rate@none",
4081                        ),
4082                        value: Distribution(
4083                            [
4084                                36.0,
4085                            ],
4086                        ),
4087                        tags: {},
4088                        metadata: BucketMetadata {
4089                            merges: 1,
4090                            received_at: None,
4091                            extracted_from_indexed: false,
4092                        },
4093                    },
4094                ],
4095            ),
4096        ]
4097        "###);
4098    }
4099}