relay_server/services/
processor.rs

1use std::borrow::Cow;
2use std::collections::{BTreeSet, HashMap};
3use std::error::Error;
4use std::fmt::{Debug, Display};
5use std::future::Future;
6use std::io::Write;
7use std::pin::Pin;
8use std::sync::Arc;
9use std::time::Duration;
10
11use anyhow::Context;
12use brotli::CompressorWriter as BrotliEncoder;
13use bytes::Bytes;
14use chrono::{DateTime, Utc};
15use flate2::Compression;
16use flate2::write::{GzEncoder, ZlibEncoder};
17use futures::FutureExt;
18use futures::future::BoxFuture;
19use relay_base_schema::project::{ProjectId, ProjectKey};
20use relay_cogs::{AppFeature, Cogs, FeatureWeights, ResourceId, Token};
21use relay_common::time::UnixTimestamp;
22use relay_config::{Config, HttpEncoding, RelayMode};
23use relay_dynamic_config::{ErrorBoundary, Feature, GlobalConfig};
24use relay_event_normalization::{ClockDriftProcessor, GeoIpLookup};
25use relay_event_schema::processor::ProcessingAction;
26use relay_event_schema::protocol::{
27    ClientReport, Event, EventId, Metrics, NetworkReportError, SpanV2,
28};
29use relay_filter::FilterStatKey;
30use relay_metrics::{Bucket, BucketMetadata, BucketView, BucketsView, MetricNamespace};
31use relay_pii::PiiConfigError;
32use relay_protocol::Annotated;
33use relay_quotas::{DataCategory, Quota, RateLimits, Scoping};
34use relay_sampling::evaluation::{ReservoirCounters, ReservoirEvaluator, SamplingDecision};
35use relay_statsd::metric;
36use relay_system::{Addr, FromMessage, NoResponse, Service};
37use reqwest::header;
38use smallvec::{SmallVec, smallvec};
39use zstd::stream::Encoder as ZstdEncoder;
40
41use crate::envelope::{
42    self, AttachmentType, ContentType, Envelope, EnvelopeError, Item, ItemContainer, ItemType,
43};
44use crate::extractors::{PartialDsn, RequestMeta, RequestTrust};
45use crate::integrations::{Integration, SpansIntegration};
46use crate::managed::{InvalidProcessingGroupType, ManagedEnvelope, TypedEnvelope};
47use crate::metrics::{MetricOutcomes, MetricsLimiter, MinimalTrackableBucket};
48use crate::metrics_extraction::transactions::ExtractedMetrics;
49use crate::metrics_extraction::transactions::types::ExtractMetricsError;
50use crate::processing::check_ins::CheckInsProcessor;
51use crate::processing::logs::LogsProcessor;
52use crate::processing::sessions::SessionsProcessor;
53use crate::processing::spans::SpansProcessor;
54use crate::processing::trace_attachments::TraceAttachmentsProcessor;
55use crate::processing::trace_metrics::TraceMetricsProcessor;
56use crate::processing::transactions::extraction::ExtractMetricsContext;
57use crate::processing::utils::event::{
58    EventFullyNormalized, EventMetricsExtracted, FiltersStatus, SpansExtracted, event_category,
59    event_type,
60};
61use crate::processing::{Forward as _, Output, Outputs, QuotaRateLimiter};
62use crate::service::ServiceError;
63use crate::services::global_config::GlobalConfigHandle;
64use crate::services::metrics::{Aggregator, FlushBuckets, MergeBuckets, ProjectBuckets};
65use crate::services::outcome::{DiscardItemType, DiscardReason, Outcome, TrackOutcome};
66use crate::services::projects::cache::ProjectCacheHandle;
67use crate::services::projects::project::{ProjectInfo, ProjectState};
68use crate::services::upstream::{
69    SendRequest, Sign, SignatureType, UpstreamRelay, UpstreamRequest, UpstreamRequestError,
70};
71use crate::statsd::{RelayCounters, RelayDistributions, RelayTimers};
72use crate::utils::{self, CheckLimits, EnvelopeLimiter, SamplingResult};
73use crate::{http, processing};
74use relay_threading::AsyncPool;
75#[cfg(feature = "processing")]
76use {
77    crate::services::global_rate_limits::{GlobalRateLimits, GlobalRateLimitsServiceHandle},
78    crate::services::processor::nnswitch::SwitchProcessingError,
79    crate::services::store::{Store, StoreEnvelope},
80    crate::services::upload::Upload,
81    crate::utils::Enforcement,
82    itertools::Itertools,
83    relay_cardinality::{
84        CardinalityLimit, CardinalityLimiter, CardinalityLimitsSplit, RedisSetLimiter,
85        RedisSetLimiterOptions,
86    },
87    relay_dynamic_config::CardinalityLimiterMode,
88    relay_quotas::{RateLimitingError, RedisRateLimiter},
89    relay_redis::{AsyncRedisClient, RedisClients},
90    std::time::Instant,
91    symbolic_unreal::{Unreal4Error, Unreal4ErrorKind},
92};
93
94mod attachment;
95mod dynamic_sampling;
96mod event;
97mod metrics;
98mod nel;
99mod profile;
100mod profile_chunk;
101mod replay;
102mod report;
103mod span;
104
105#[cfg(all(sentry, feature = "processing"))]
106mod playstation;
107mod standalone;
108#[cfg(feature = "processing")]
109mod unreal;
110
111#[cfg(feature = "processing")]
112mod nnswitch;
113
/// Creates the block only if used with `processing` feature.
///
/// Provided code block will be executed only if the provided config has `processing_enabled` set.
///
/// With the two-branch form, the `else` block runs both when the `processing` feature is
/// compiled in but disabled at runtime, and when the feature is compiled out entirely.
/// Note that with the single-branch form, nothing at all is emitted without the feature.
macro_rules! if_processing {
    ($config:expr, $if_true:block) => {
        #[cfg(feature = "processing")] {
            if $config.processing_enabled() $if_true
        }
    };
    ($config:expr, $if_true:block else $if_false:block) => {
        {
            #[cfg(feature = "processing")] {
                if $config.processing_enabled() $if_true else $if_false
            }
            #[cfg(not(feature = "processing"))] {
                $if_false
            }
        }
    };
}
134
/// The minimum clock drift for correction to apply.
///
/// Drifts below this threshold (55 minutes) are left uncorrected.
pub const MINIMUM_CLOCK_DRIFT: Duration = Duration::from_secs(55 * 60);
137
/// Error returned when a [`ProcessingGroup`] cannot be converted into a
/// specific group marker type.
#[derive(Debug)]
pub struct GroupTypeError;

impl Display for GroupTypeError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "failed to convert processing group into corresponding type")
    }
}

impl std::error::Error for GroupTypeError {}
148
/// Declares a marker type for a [`ProcessingGroup`] variant.
///
/// Generates the unit struct `$ty`, an infallible conversion into
/// [`ProcessingGroup`] (always `$variant`), and a fallible conversion back.
/// The `TryFrom` impl accepts the primary `$variant` as well as any additional
/// variants listed after it; every other group yields a [`GroupTypeError`].
macro_rules! processing_group {
    ($ty:ident, $variant:ident$(, $($other:ident),+)?) => {
        #[derive(Clone, Copy, Debug)]
        pub struct $ty;

        impl From<$ty> for ProcessingGroup {
            fn from(_: $ty) -> Self {
                ProcessingGroup::$variant
            }
        }

        impl TryFrom<ProcessingGroup> for $ty {
            type Error = GroupTypeError;

            fn try_from(value: ProcessingGroup) -> Result<Self, Self::Error> {
                // A single or-pattern covers the primary variant plus any
                // extra variants this marker type accepts.
                if matches!(value, ProcessingGroup::$variant $($(| ProcessingGroup::$other)+)?) {
                    Ok($ty)
                } else {
                    Err(GroupTypeError)
                }
            }
        }
    };
}
177
/// A marker trait.
///
/// Should be used only with groups which are responsible for processing envelopes with events.
pub trait EventProcessing {}

// Groups that carry (or create) an event payload.
processing_group!(TransactionGroup, Transaction);
impl EventProcessing for TransactionGroup {}

processing_group!(ErrorGroup, Error);
impl EventProcessing for ErrorGroup {}

// Groups for items that do not require an event.
processing_group!(SessionGroup, Session);
processing_group!(StandaloneGroup, Standalone);
processing_group!(ClientReportGroup, ClientReport);
processing_group!(ReplayGroup, Replay);
processing_group!(CheckInGroup, CheckIn);
// `LogGroup` also accepts `ProcessingGroup::Nel`, since NEL reports are
// transformed into logs during processing.
processing_group!(LogGroup, Log, Nel);
processing_group!(TraceMetricGroup, TraceMetric);
processing_group!(SpanGroup, Span);

processing_group!(ProfileChunkGroup, ProfileChunk);
processing_group!(MetricsGroup, Metrics);
processing_group!(ForwardUnknownGroup, ForwardUnknown);
processing_group!(Ungrouped, Ungrouped);
202
/// Processed group type marker.
///
/// Marks the envelopes which passed through the processing pipeline.
/// Used as the type parameter of [`TypedEnvelope`] to distinguish processed
/// envelopes from unprocessed ones at compile time.
#[derive(Clone, Copy, Debug)]
pub struct Processed;
208
/// Describes the groups of the processable items.
#[derive(Clone, Copy, Debug)]
pub enum ProcessingGroup {
    /// All the transaction related items.
    ///
    /// Includes transactions, related attachments, profiles.
    Transaction,
    /// All the items which require (have or create) events.
    ///
    /// This includes: errors, NEL, security reports, user reports, some of the
    /// attachments.
    Error,
    /// Session events.
    Session,
    /// Standalone items which can be sent alone without any event attached to it in the current
    /// envelope e.g. some attachments, user reports.
    Standalone,
    /// Client reports, which carry outcome information reported by SDKs.
    ClientReport,
    /// Replays and ReplayRecordings.
    Replay,
    /// Check-ins (crons).
    CheckIn,
    /// NEL reports.
    Nel,
    /// Logs.
    Log,
    /// Trace metrics.
    TraceMetric,
    /// Spans.
    Span,
    /// Span V2 spans.
    SpanV2,
    /// Metrics.
    Metrics,
    /// ProfileChunk.
    ProfileChunk,
    /// V2 attachments without span / log association.
    TraceAttachment,
    /// Unknown item types will be forwarded upstream (to processing Relay), where we will
    /// decide what to do with them.
    ForwardUnknown,
    /// All the items in the envelope that could not be grouped.
    Ungrouped,
}
254
impl ProcessingGroup {
    /// Splits provided envelope into list of tuples of groups with associated envelopes.
    ///
    /// Items are peeled off the envelope in a fixed order: replays, sessions,
    /// span v2 items, spans, logs, trace metrics, NEL reports, metrics,
    /// profile chunks, trace attachments, standalone items, security reports,
    /// and finally all items that require an event. Each non-empty set becomes
    /// a fresh envelope sharing the original headers; whatever is left over is
    /// emitted one envelope per item. The extraction order matters: each
    /// `take_items_by` only sees items not consumed by an earlier step.
    fn split_envelope(
        mut envelope: Envelope,
        project_info: &ProjectInfo,
    ) -> SmallVec<[(Self, Box<Envelope>); 3]> {
        let headers = envelope.headers().clone();
        let mut grouped_envelopes = smallvec![];

        // Extract replays.
        let replay_items = envelope.take_items_by(|item| {
            matches!(
                item.ty(),
                &ItemType::ReplayEvent | &ItemType::ReplayRecording | &ItemType::ReplayVideo
            )
        });
        if !replay_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Replay,
                Envelope::from_parts(headers.clone(), replay_items),
            ))
        }

        // Keep all the sessions together in one envelope.
        let session_items = envelope
            .take_items_by(|item| matches!(item.ty(), &ItemType::Session | &ItemType::Sessions));
        if !session_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Session,
                Envelope::from_parts(headers.clone(), session_items),
            ))
        }

        // Extract span v2 items: explicit span containers always, plus — only
        // behind the experimental feature flag — plain spans, supported OTel
        // span integrations, and span attachments.
        let span_v2_items = envelope.take_items_by(|item| {
            let exp_feature = project_info.has_feature(Feature::SpanV2ExperimentalProcessing);
            let is_supported_integration = {
                matches!(
                    item.integration(),
                    Some(Integration::Spans(SpansIntegration::OtelV1 { .. }))
                )
            };
            let is_span = matches!(item.ty(), &ItemType::Span);
            let is_span_attachment = item.is_span_attachment();

            ItemContainer::<SpanV2>::is_container(item)
                || (exp_feature && is_span)
                || (exp_feature && is_supported_integration)
                || (exp_feature && is_span_attachment)
        });

        if !span_v2_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::SpanV2,
                Envelope::from_parts(headers.clone(), span_v2_items),
            ))
        }

        // Extract spans.
        let span_items = envelope.take_items_by(|item| {
            matches!(item.ty(), &ItemType::Span)
                || matches!(item.integration(), Some(Integration::Spans(_)))
        });

        if !span_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Span,
                Envelope::from_parts(headers.clone(), span_items),
            ))
        }

        // Extract logs.
        let logs_items = envelope.take_items_by(|item| {
            matches!(item.ty(), &ItemType::Log)
                || matches!(item.integration(), Some(Integration::Logs(_)))
        });
        if !logs_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Log,
                Envelope::from_parts(headers.clone(), logs_items),
            ))
        }

        // Extract trace metrics.
        let trace_metric_items =
            envelope.take_items_by(|item| matches!(item.ty(), &ItemType::TraceMetric));
        if !trace_metric_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::TraceMetric,
                Envelope::from_parts(headers.clone(), trace_metric_items),
            ))
        }

        // NEL items are transformed into logs in their own processing step.
        let nel_items = envelope.take_items_by(|item| matches!(item.ty(), &ItemType::Nel));
        if !nel_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Nel,
                Envelope::from_parts(headers.clone(), nel_items),
            ))
        }

        // Extract all metric items.
        //
        // Note: Should only be relevant in proxy mode. In other modes we send metrics through
        // a separate pipeline.
        let metric_items = envelope.take_items_by(|i| i.ty().is_metrics());
        if !metric_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Metrics,
                Envelope::from_parts(headers.clone(), metric_items),
            ))
        }

        // Extract profile chunks.
        let profile_chunk_items =
            envelope.take_items_by(|item| matches!(item.ty(), &ItemType::ProfileChunk));
        if !profile_chunk_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::ProfileChunk,
                Envelope::from_parts(headers.clone(), profile_chunk_items),
            ))
        }

        // Extract trace attachments (v2 attachments without a span/log association).
        let trace_attachment_items = envelope.take_items_by(Item::is_trace_attachment);
        if !trace_attachment_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::TraceAttachment,
                Envelope::from_parts(headers.clone(), trace_attachment_items),
            ))
        }

        // Extract all standalone items.
        //
        // Note: only if there are no items in the envelope which can create events, otherwise they
        // will be in the same envelope with all require event items.
        if !envelope.items().any(Item::creates_event) {
            let standalone_items = envelope.take_items_by(Item::requires_event);
            if !standalone_items.is_empty() {
                grouped_envelopes.push((
                    ProcessingGroup::Standalone,
                    Envelope::from_parts(headers.clone(), standalone_items),
                ))
            }
        };

        // Make sure we create separate envelopes for each `RawSecurity` report.
        // Each one gets a fresh event id, since each report creates its own event.
        let security_reports_items = envelope
            .take_items_by(|i| matches!(i.ty(), &ItemType::RawSecurity))
            .into_iter()
            .map(|item| {
                let headers = headers.clone();
                let items: SmallVec<[Item; 3]> = smallvec![item.clone()];
                let mut envelope = Envelope::from_parts(headers, items);
                envelope.set_event_id(EventId::new());
                (ProcessingGroup::Error, envelope)
            });
        grouped_envelopes.extend(security_reports_items);

        // Extract all the items which require an event into separate envelope.
        let require_event_items = envelope.take_items_by(Item::requires_event);
        if !require_event_items.is_empty() {
            // Transactions and profiles route the whole set to the transaction
            // group; anything else is treated as an error event.
            let group = if require_event_items
                .iter()
                .any(|item| matches!(item.ty(), &ItemType::Transaction | &ItemType::Profile))
            {
                ProcessingGroup::Transaction
            } else {
                ProcessingGroup::Error
            };

            grouped_envelopes.push((
                group,
                Envelope::from_parts(headers.clone(), require_event_items),
            ))
        }

        // Get the rest of the envelopes, one per item.
        let envelopes = envelope.items_mut().map(|item| {
            let headers = headers.clone();
            let items: SmallVec<[Item; 3]> = smallvec![item.clone()];
            let envelope = Envelope::from_parts(headers, items);
            let item_type = item.ty();
            let group = if matches!(item_type, &ItemType::CheckIn) {
                ProcessingGroup::CheckIn
            } else if matches!(item.ty(), &ItemType::ClientReport) {
                ProcessingGroup::ClientReport
            } else if matches!(item_type, &ItemType::Unknown(_)) {
                ProcessingGroup::ForwardUnknown
            } else {
                // Cannot group this item type.
                ProcessingGroup::Ungrouped
            };

            (group, envelope)
        });
        grouped_envelopes.extend(envelopes);

        grouped_envelopes
    }

    /// Returns the name of the group.
    pub fn variant(&self) -> &'static str {
        match self {
            ProcessingGroup::Transaction => "transaction",
            ProcessingGroup::Error => "error",
            ProcessingGroup::Session => "session",
            ProcessingGroup::Standalone => "standalone",
            ProcessingGroup::ClientReport => "client_report",
            ProcessingGroup::Replay => "replay",
            ProcessingGroup::CheckIn => "check_in",
            ProcessingGroup::Log => "log",
            ProcessingGroup::TraceMetric => "trace_metric",
            ProcessingGroup::Nel => "nel",
            ProcessingGroup::Span => "span",
            ProcessingGroup::SpanV2 => "span_v2",
            ProcessingGroup::Metrics => "metrics",
            ProcessingGroup::ProfileChunk => "profile_chunk",
            ProcessingGroup::TraceAttachment => "trace_attachment",
            ProcessingGroup::ForwardUnknown => "forward_unknown",
            ProcessingGroup::Ungrouped => "ungrouped",
        }
    }
}
478
479impl From<ProcessingGroup> for AppFeature {
480    fn from(value: ProcessingGroup) -> Self {
481        match value {
482            ProcessingGroup::Transaction => AppFeature::Transactions,
483            ProcessingGroup::Error => AppFeature::Errors,
484            ProcessingGroup::Session => AppFeature::Sessions,
485            ProcessingGroup::Standalone => AppFeature::UnattributedEnvelope,
486            ProcessingGroup::ClientReport => AppFeature::ClientReports,
487            ProcessingGroup::Replay => AppFeature::Replays,
488            ProcessingGroup::CheckIn => AppFeature::CheckIns,
489            ProcessingGroup::Log => AppFeature::Logs,
490            ProcessingGroup::TraceMetric => AppFeature::TraceMetrics,
491            ProcessingGroup::Nel => AppFeature::Logs,
492            ProcessingGroup::Span => AppFeature::Spans,
493            ProcessingGroup::SpanV2 => AppFeature::Spans,
494            ProcessingGroup::Metrics => AppFeature::UnattributedMetrics,
495            ProcessingGroup::ProfileChunk => AppFeature::Profiles,
496            ProcessingGroup::ForwardUnknown => AppFeature::UnattributedEnvelope,
497            ProcessingGroup::Ungrouped => AppFeature::UnattributedEnvelope,
498            ProcessingGroup::TraceAttachment => AppFeature::TraceAttachments,
499        }
500    }
501}
502
/// An error returned when handling [`ProcessEnvelope`].
#[derive(Debug, thiserror::Error)]
pub enum ProcessingError {
    #[error("invalid json in event")]
    InvalidJson(#[source] serde_json::Error),

    #[error("invalid message pack event payload")]
    InvalidMsgpack(#[from] rmp_serde::decode::Error),

    #[cfg(feature = "processing")]
    #[error("invalid unreal crash report")]
    InvalidUnrealReport(#[source] Unreal4Error),

    /// The payload exceeded the size limit; carries the item type for the outcome.
    #[error("event payload too large")]
    PayloadTooLarge(DiscardItemType),

    #[error("invalid transaction event")]
    InvalidTransaction,

    #[error("envelope processor failed")]
    ProcessingFailed(#[from] ProcessingAction),

    /// An item type occurred more than once where only one is allowed.
    #[error("duplicate {0} in event")]
    DuplicateItem(ItemType),

    #[error("failed to extract event payload")]
    NoEventPayload,

    #[error("missing project id in DSN")]
    MissingProjectId,

    /// The security report type could not be recognized; carries the raw payload.
    #[error("invalid security report type: {0:?}")]
    InvalidSecurityType(Bytes),

    #[error("unsupported security report type")]
    UnsupportedSecurityType,

    #[error("invalid security report")]
    InvalidSecurityReport(#[source] serde_json::Error),

    #[error("invalid nel report")]
    InvalidNelReport(#[source] NetworkReportError),

    /// The event was dropped by inbound filters; no outcome is emitted for this.
    #[error("event filtered with reason: {0:?}")]
    EventFiltered(FilterStatKey),

    #[error("missing or invalid required event timestamp")]
    InvalidTimestamp,

    #[error("could not serialize event payload")]
    SerializeFailed(#[source] serde_json::Error),

    #[cfg(feature = "processing")]
    #[error("failed to apply quotas")]
    QuotasFailed(#[from] RateLimitingError),

    #[error("invalid pii config")]
    PiiConfigError(PiiConfigError),

    /// Boxed to keep the error enum small (see `InvalidProcessingGroupType`).
    #[error("invalid processing group type")]
    InvalidProcessingGroup(Box<InvalidProcessingGroupType>),

    #[error("invalid replay")]
    InvalidReplay(DiscardReason),

    #[error("replay filtered with reason: {0:?}")]
    ReplayFiltered(FilterStatKey),

    #[cfg(feature = "processing")]
    #[error("nintendo switch dying message processing failed {0:?}")]
    InvalidNintendoDyingMessage(#[source] SwitchProcessingError),

    #[cfg(all(sentry, feature = "processing"))]
    #[error("playstation dump processing failed: {0}")]
    InvalidPlaystationDump(String),

    /// An envelope was routed to a processor that does not handle its group.
    #[error("processing group does not match specific processor")]
    ProcessingGroupMismatch,
    /// Failure in the new processing pipeline; outcomes are emitted there already.
    #[error("new processing pipeline failed")]
    ProcessingFailure,
}
584
impl ProcessingError {
    /// Returns the [`Outcome`] to emit for this error, if any.
    ///
    /// `None` means no outcome must be emitted here — either the condition does
    /// not produce an outcome (e.g. a missing project id) or outcomes are
    /// already emitted elsewhere (e.g. [`Self::ProcessingFailure`]).
    pub fn to_outcome(&self) -> Option<Outcome> {
        match self {
            Self::PayloadTooLarge(payload_type) => {
                Some(Outcome::Invalid(DiscardReason::TooLarge(*payload_type)))
            }
            Self::InvalidJson(_) => Some(Outcome::Invalid(DiscardReason::InvalidJson)),
            Self::InvalidMsgpack(_) => Some(Outcome::Invalid(DiscardReason::InvalidMsgpack)),
            Self::InvalidSecurityType(_) => {
                Some(Outcome::Invalid(DiscardReason::SecurityReportType))
            }
            Self::InvalidSecurityReport(_) => Some(Outcome::Invalid(DiscardReason::SecurityReport)),
            Self::UnsupportedSecurityType => Some(Outcome::Filtered(FilterStatKey::InvalidCsp)),
            Self::InvalidNelReport(_) => Some(Outcome::Invalid(DiscardReason::InvalidJson)),
            Self::InvalidTransaction => Some(Outcome::Invalid(DiscardReason::InvalidTransaction)),
            Self::InvalidTimestamp => Some(Outcome::Invalid(DiscardReason::Timestamp)),
            Self::DuplicateItem(_) => Some(Outcome::Invalid(DiscardReason::DuplicateItem)),
            Self::NoEventPayload => Some(Outcome::Invalid(DiscardReason::NoEventPayload)),
            #[cfg(feature = "processing")]
            Self::InvalidNintendoDyingMessage(_) => Some(Outcome::Invalid(DiscardReason::Payload)),
            #[cfg(all(sentry, feature = "processing"))]
            Self::InvalidPlaystationDump(_) => Some(Outcome::Invalid(DiscardReason::Payload)),
            // Bad compression is reported distinctly from other unreal failures.
            #[cfg(feature = "processing")]
            Self::InvalidUnrealReport(err) if err.kind() == Unreal4ErrorKind::BadCompression => {
                Some(Outcome::Invalid(DiscardReason::InvalidCompression))
            }
            #[cfg(feature = "processing")]
            Self::InvalidUnrealReport(_) => Some(Outcome::Invalid(DiscardReason::ProcessUnreal)),
            Self::SerializeFailed(_) | Self::ProcessingFailed(_) => {
                Some(Outcome::Invalid(DiscardReason::Internal))
            }
            #[cfg(feature = "processing")]
            Self::QuotasFailed(_) => Some(Outcome::Invalid(DiscardReason::Internal)),
            Self::PiiConfigError(_) => Some(Outcome::Invalid(DiscardReason::ProjectStatePii)),
            Self::MissingProjectId => None,
            Self::EventFiltered(_) => None,
            Self::InvalidProcessingGroup(_) => None,
            Self::InvalidReplay(reason) => Some(Outcome::Invalid(*reason)),
            Self::ReplayFiltered(key) => Some(Outcome::Filtered(key.clone())),

            Self::ProcessingGroupMismatch => Some(Outcome::Invalid(DiscardReason::Internal)),
            // Outcomes are emitted in the new processing pipeline already.
            Self::ProcessingFailure => None,
        }
    }

    /// Returns `true` if the error's outcome signals an unexpected condition.
    ///
    /// Errors without an outcome are never considered unexpected.
    fn is_unexpected(&self) -> bool {
        self.to_outcome()
            .is_some_and(|outcome| outcome.is_unexpected())
    }
}
636
#[cfg(feature = "processing")]
impl From<Unreal4Error> for ProcessingError {
    /// Converts unreal report errors, mapping oversized reports to the
    /// dedicated payload-size error and everything else to an invalid report.
    fn from(err: Unreal4Error) -> Self {
        if err.kind() == Unreal4ErrorKind::TooLarge {
            Self::PayloadTooLarge(ItemType::UnrealReport.into())
        } else {
            Self::InvalidUnrealReport(err)
        }
    }
}
646
647impl From<ExtractMetricsError> for ProcessingError {
648    fn from(error: ExtractMetricsError) -> Self {
649        match error {
650            ExtractMetricsError::MissingTimestamp | ExtractMetricsError::InvalidTimestamp => {
651                Self::InvalidTimestamp
652            }
653        }
654    }
655}
656
657impl From<InvalidProcessingGroupType> for ProcessingError {
658    fn from(value: InvalidProcessingGroupType) -> Self {
659        Self::InvalidProcessingGroup(Box::new(value))
660    }
661}
662
663type ExtractedEvent = (Annotated<Event>, usize);
664
/// A container for extracted metrics during processing.
///
/// The container enforces that the extracted metrics are correctly tagged
/// with the dynamic sampling decision.
#[derive(Debug)]
pub struct ProcessingExtractedMetrics {
    // Project- and sampling-scoped buckets collected while processing an envelope.
    metrics: ExtractedMetrics,
}
673
674impl ProcessingExtractedMetrics {
675    pub fn new() -> Self {
676        Self {
677            metrics: ExtractedMetrics::default(),
678        }
679    }
680
681    pub fn into_inner(self) -> ExtractedMetrics {
682        self.metrics
683    }
684
685    /// Extends the contained metrics with [`ExtractedMetrics`].
686    pub fn extend(
687        &mut self,
688        extracted: ExtractedMetrics,
689        sampling_decision: Option<SamplingDecision>,
690    ) {
691        self.extend_project_metrics(extracted.project_metrics, sampling_decision);
692        self.extend_sampling_metrics(extracted.sampling_metrics, sampling_decision);
693    }
694
695    /// Extends the contained project metrics.
696    pub fn extend_project_metrics<I>(
697        &mut self,
698        buckets: I,
699        sampling_decision: Option<SamplingDecision>,
700    ) where
701        I: IntoIterator<Item = Bucket>,
702    {
703        self.metrics
704            .project_metrics
705            .extend(buckets.into_iter().map(|mut bucket| {
706                bucket.metadata.extracted_from_indexed =
707                    sampling_decision == Some(SamplingDecision::Keep);
708                bucket
709            }));
710    }
711
712    /// Extends the contained sampling metrics.
713    pub fn extend_sampling_metrics<I>(
714        &mut self,
715        buckets: I,
716        sampling_decision: Option<SamplingDecision>,
717    ) where
718        I: IntoIterator<Item = Bucket>,
719    {
720        self.metrics
721            .sampling_metrics
722            .extend(buckets.into_iter().map(|mut bucket| {
723                bucket.metadata.extracted_from_indexed =
724                    sampling_decision == Some(SamplingDecision::Keep);
725                bucket
726            }));
727    }
728
729    /// Applies rate limits to the contained metrics.
730    ///
731    /// This is used to apply rate limits which have been enforced on sampled items of an envelope
732    /// to also consistently apply to the metrics extracted from these items.
733    #[cfg(feature = "processing")]
734    fn apply_enforcement(&mut self, enforcement: &Enforcement, enforced_consistently: bool) {
735        // Metric namespaces which need to be dropped.
736        let mut drop_namespaces: SmallVec<[_; 2]> = smallvec![];
737        // Metrics belonging to this metric namespace need to have the `extracted_from_indexed`
738        // flag reset to `false`.
739        let mut reset_extracted_from_indexed: SmallVec<[_; 2]> = smallvec![];
740
741        for (namespace, limit, indexed) in [
742            (
743                MetricNamespace::Transactions,
744                &enforcement.event,
745                &enforcement.event_indexed,
746            ),
747            (
748                MetricNamespace::Spans,
749                &enforcement.spans,
750                &enforcement.spans_indexed,
751            ),
752        ] {
753            if limit.is_active() {
754                drop_namespaces.push(namespace);
755            } else if indexed.is_active() && !enforced_consistently {
756                // If the enforcement was not computed by consistently checking the limits,
757                // the quota for the metrics has not yet been incremented.
758                // In this case we have a dropped indexed payload but a metric which still needs to
759                // be accounted for, make sure the metric will still be rate limited.
760                reset_extracted_from_indexed.push(namespace);
761            }
762        }
763
764        if !drop_namespaces.is_empty() || !reset_extracted_from_indexed.is_empty() {
765            self.retain_mut(|bucket| {
766                let Some(namespace) = bucket.name.try_namespace() else {
767                    return true;
768                };
769
770                if drop_namespaces.contains(&namespace) {
771                    return false;
772                }
773
774                if reset_extracted_from_indexed.contains(&namespace) {
775                    bucket.metadata.extracted_from_indexed = false;
776                }
777
778                true
779            });
780        }
781    }
782
783    #[cfg(feature = "processing")]
784    fn retain_mut(&mut self, mut f: impl FnMut(&mut Bucket) -> bool) {
785        self.metrics.project_metrics.retain_mut(&mut f);
786        self.metrics.sampling_metrics.retain_mut(&mut f);
787    }
788}
789
790fn send_metrics(
791    metrics: ExtractedMetrics,
792    project_key: ProjectKey,
793    sampling_key: Option<ProjectKey>,
794    aggregator: &Addr<Aggregator>,
795) {
796    let ExtractedMetrics {
797        project_metrics,
798        sampling_metrics,
799    } = metrics;
800
801    if !project_metrics.is_empty() {
802        aggregator.send(MergeBuckets {
803            project_key,
804            buckets: project_metrics,
805        });
806    }
807
808    if !sampling_metrics.is_empty() {
809        // If no sampling project state is available, we associate the sampling
810        // metrics with the current project.
811        //
812        // project_without_tracing         -> metrics goes to self
813        // dependent_project_with_tracing  -> metrics goes to root
814        // root_project_with_tracing       -> metrics goes to root == self
815        let sampling_project_key = sampling_key.unwrap_or(project_key);
816        aggregator.send(MergeBuckets {
817            project_key: sampling_project_key,
818            buckets: sampling_metrics,
819        });
820    }
821}
822
823/// Function for on-off switches that filter specific item types (profiles, spans)
824/// based on a feature flag.
825///
826/// If the project config did not come from the upstream, we keep the items.
827fn should_filter(config: &Config, project_info: &ProjectInfo, feature: Feature) -> bool {
828    match config.relay_mode() {
829        RelayMode::Proxy => false,
830        RelayMode::Managed => !project_info.has_feature(feature),
831    }
832}
833
/// The result of the envelope processing containing the processed envelope along with the partial
/// result.
#[derive(Debug)]
#[expect(
    clippy::large_enum_variant,
    reason = "until we have a better solution to combat the excessive growth of Item, see #4819"
)]
enum ProcessingResult {
    /// A processed envelope together with the metrics extracted while processing it.
    Envelope {
        /// The envelope after processing.
        managed_envelope: TypedEnvelope<Processed>,
        /// Metrics extracted during processing of the envelope.
        extracted_metrics: ProcessingExtractedMetrics,
    },
    /// The output of a [`processing::Processor`].
    Output(Output<Outputs>),
}
848
849impl ProcessingResult {
850    /// Creates a [`ProcessingResult`] with no metrics.
851    fn no_metrics(managed_envelope: TypedEnvelope<Processed>) -> Self {
852        Self::Envelope {
853            managed_envelope,
854            extracted_metrics: ProcessingExtractedMetrics::new(),
855        }
856    }
857}
858
/// All items which can be submitted upstream.
#[derive(Debug)]
#[expect(
    clippy::large_enum_variant,
    reason = "until we have a better solution to combat the excessive growth of Item, see #4819"
)]
enum Submit<'a> {
    /// A processed envelope.
    Envelope(TypedEnvelope<Processed>),
    /// The output of a [`processing::Processor`].
    Output {
        /// The produced processor output.
        output: Outputs,
        /// Context carried along for forwarding the output.
        ctx: processing::ForwardContext<'a>,
    },
}
874
/// Applies processing to all contents of the given envelope.
///
/// Depending on the contents of the envelope and Relay's mode, this includes:
///
///  - Basic normalization and validation for all item types.
///  - Clock drift correction if the required `sent_at` header is present.
///  - Expansion of certain item types (e.g. unreal).
///  - Store normalization for event payloads in processing mode.
///  - Rate limiters and inbound filters on events in processing mode.
///
/// This message is dispatched to the [`EnvelopeProcessor`] service.
#[derive(Debug)]
pub struct ProcessEnvelope {
    /// Envelope to process.
    pub envelope: ManagedEnvelope,
    /// The project info.
    pub project_info: Arc<ProjectInfo>,
    /// Currently active cached rate limits for this project.
    pub rate_limits: Arc<RateLimits>,
    /// Root sampling project info.
    pub sampling_project_info: Option<Arc<ProjectInfo>>,
    /// Sampling reservoir counters.
    pub reservoir_counters: ReservoirCounters,
}
897
/// Like a [`ProcessEnvelope`], but with an envelope which has been grouped.
///
/// Unlike [`ProcessEnvelope`], the envelope here has already been assigned to a
/// single [`ProcessingGroup`].
#[derive(Debug)]
struct ProcessEnvelopeGrouped<'a> {
    /// The group the envelope belongs to.
    pub group: ProcessingGroup,
    /// Envelope to process.
    pub envelope: ManagedEnvelope,
    /// The processing context.
    pub ctx: processing::Context<'a>,
}
908
/// Parses a list of metrics or metric buckets and pushes them to the project's aggregator.
///
/// This parses and validates the metrics:
///  - For [`Metrics`](ItemType::Statsd), each metric is parsed separately, and invalid metrics are
///    ignored independently.
///  - For [`MetricBuckets`](ItemType::MetricBuckets), the entire list of buckets is parsed and
///    dropped together on parsing failure.
///  - Other envelope items will be ignored with an error message.
///
/// Additionally, processing applies clock drift correction using the system clock of this Relay, if
/// the Envelope specifies the [`sent_at`](Envelope::sent_at) header.
#[derive(Debug)]
pub struct ProcessMetrics {
    /// A list of metric items.
    ///
    /// Raw items are parsed lazily, see [`MetricData::into_buckets`].
    pub data: MetricData,
    /// The target project.
    pub project_key: ProjectKey,
    /// Whether to keep or reset the metric metadata.
    pub source: BucketSource,
    /// The wall clock time at which the request was received.
    pub received_at: DateTime<Utc>,
    /// The value of the Envelope's [`sent_at`](Envelope::sent_at) header for clock drift
    /// correction.
    pub sent_at: Option<DateTime<Utc>>,
}
934
/// Raw unparsed metric data.
#[derive(Debug)]
pub enum MetricData {
    /// Raw data, unparsed envelope items.
    ///
    /// Only items of type [`ItemType::Statsd`] and [`ItemType::MetricBuckets`]
    /// are parsed; other item types are ignored with an error log.
    Raw(Vec<Item>),
    /// Already parsed buckets but unprocessed.
    Parsed(Vec<Bucket>),
}
943
944impl MetricData {
945    /// Consumes the metric data and parses the contained buckets.
946    ///
947    /// If the contained data is already parsed the buckets are returned unchanged.
948    /// Raw buckets are parsed and created with the passed `timestamp`.
949    fn into_buckets(self, timestamp: UnixTimestamp) -> Vec<Bucket> {
950        let items = match self {
951            Self::Parsed(buckets) => return buckets,
952            Self::Raw(items) => items,
953        };
954
955        let mut buckets = Vec::new();
956        for item in items {
957            let payload = item.payload();
958            if item.ty() == &ItemType::Statsd {
959                for bucket_result in Bucket::parse_all(&payload, timestamp) {
960                    match bucket_result {
961                        Ok(bucket) => buckets.push(bucket),
962                        Err(error) => relay_log::debug!(
963                            error = &error as &dyn Error,
964                            "failed to parse metric bucket from statsd format",
965                        ),
966                    }
967                }
968            } else if item.ty() == &ItemType::MetricBuckets {
969                match serde_json::from_slice::<Vec<Bucket>>(&payload) {
970                    Ok(parsed_buckets) => {
971                        // Re-use the allocation of `b` if possible.
972                        if buckets.is_empty() {
973                            buckets = parsed_buckets;
974                        } else {
975                            buckets.extend(parsed_buckets);
976                        }
977                    }
978                    Err(error) => {
979                        relay_log::debug!(
980                            error = &error as &dyn Error,
981                            "failed to parse metric bucket",
982                        );
983                        metric!(counter(RelayCounters::MetricBucketsParsingFailed) += 1);
984                    }
985                }
986            } else {
987                relay_log::error!(
988                    "invalid item of type {} passed to ProcessMetrics",
989                    item.ty()
990                );
991            }
992        }
993        buckets
994    }
995}
996
/// Parses a batched payload of metric buckets and processes them.
#[derive(Debug)]
pub struct ProcessBatchedMetrics {
    /// Metrics payload in JSON format.
    pub payload: Bytes,
    /// Whether to keep or reset the metric metadata.
    pub source: BucketSource,
    /// The wall clock time at which the request was received.
    pub received_at: DateTime<Utc>,
    /// The value of the Envelope's [`sent_at`](Envelope::sent_at) header for clock drift
    /// correction.
    pub sent_at: Option<DateTime<Utc>>,
}
1008
/// Source information where a metric bucket originates from.
///
/// Use [`BucketSource::from_meta`] to derive the source from request metadata.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum BucketSource {
    /// The metric bucket originated from an internal Relay use case.
    ///
    /// The metric bucket originates either from within the same Relay
    /// or was accepted coming from another Relay which is registered as
    /// an internal Relay via Relay's configuration.
    Internal,
    /// The bucket source originated from an untrusted source.
    ///
    /// Managed Relays sending extracted metrics are considered external,
    /// it's a project use case but it comes from an untrusted source.
    External,
}
1024
1025impl BucketSource {
1026    /// Infers the bucket source from [`RequestMeta::request_trust`].
1027    pub fn from_meta(meta: &RequestMeta) -> Self {
1028        match meta.request_trust() {
1029            RequestTrust::Trusted => Self::Internal,
1030            RequestTrust::Untrusted => Self::External,
1031        }
1032    }
1033}
1034
/// Sends client reports to the upstream.
#[derive(Debug)]
pub struct SubmitClientReports {
    /// The client reports to be sent.
    pub client_reports: Vec<ClientReport>,
    /// Scoping information for the client reports.
    pub scoping: Scoping,
}
1043
/// CPU-intensive processing tasks for envelopes.
#[derive(Debug)]
pub enum EnvelopeProcessor {
    /// Processes an envelope, see [`ProcessEnvelope`].
    ProcessEnvelope(Box<ProcessEnvelope>),
    /// Processes metrics of a project, see [`ProcessMetrics`].
    ProcessProjectMetrics(Box<ProcessMetrics>),
    /// Processes a batched metrics payload, see [`ProcessBatchedMetrics`].
    ProcessBatchedMetrics(Box<ProcessBatchedMetrics>),
    /// Flushes aggregated metric buckets, see [`FlushBuckets`].
    FlushBuckets(Box<FlushBuckets>),
    /// Submits client reports upstream, see [`SubmitClientReports`].
    SubmitClientReports(Box<SubmitClientReports>),
}
1053
1054impl EnvelopeProcessor {
1055    /// Returns the name of the message variant.
1056    pub fn variant(&self) -> &'static str {
1057        match self {
1058            EnvelopeProcessor::ProcessEnvelope(_) => "ProcessEnvelope",
1059            EnvelopeProcessor::ProcessProjectMetrics(_) => "ProcessProjectMetrics",
1060            EnvelopeProcessor::ProcessBatchedMetrics(_) => "ProcessBatchedMetrics",
1061            EnvelopeProcessor::FlushBuckets(_) => "FlushBuckets",
1062            EnvelopeProcessor::SubmitClientReports(_) => "SubmitClientReports",
1063        }
1064    }
1065}
1066
// Marks `EnvelopeProcessor` as the message interface of the processor service.
impl relay_system::Interface for EnvelopeProcessor {}
1068
1069impl FromMessage<ProcessEnvelope> for EnvelopeProcessor {
1070    type Response = relay_system::NoResponse;
1071
1072    fn from_message(message: ProcessEnvelope, _sender: ()) -> Self {
1073        Self::ProcessEnvelope(Box::new(message))
1074    }
1075}
1076
1077impl FromMessage<ProcessMetrics> for EnvelopeProcessor {
1078    type Response = NoResponse;
1079
1080    fn from_message(message: ProcessMetrics, _: ()) -> Self {
1081        Self::ProcessProjectMetrics(Box::new(message))
1082    }
1083}
1084
1085impl FromMessage<ProcessBatchedMetrics> for EnvelopeProcessor {
1086    type Response = NoResponse;
1087
1088    fn from_message(message: ProcessBatchedMetrics, _: ()) -> Self {
1089        Self::ProcessBatchedMetrics(Box::new(message))
1090    }
1091}
1092
1093impl FromMessage<FlushBuckets> for EnvelopeProcessor {
1094    type Response = NoResponse;
1095
1096    fn from_message(message: FlushBuckets, _: ()) -> Self {
1097        Self::FlushBuckets(Box::new(message))
1098    }
1099}
1100
1101impl FromMessage<SubmitClientReports> for EnvelopeProcessor {
1102    type Response = NoResponse;
1103
1104    fn from_message(message: SubmitClientReports, _: ()) -> Self {
1105        Self::SubmitClientReports(Box::new(message))
1106    }
1107}
1108
/// The asynchronous thread pool used for scheduling processing tasks in the processor.
///
/// Tasks are submitted as type-erased boxed futures.
pub type EnvelopeProcessorServicePool = AsyncPool<BoxFuture<'static, ()>>;
1111
/// Service implementing the [`EnvelopeProcessor`] interface.
///
/// This service handles messages in a worker pool with configurable concurrency.
///
/// Cloning the service is cheap: all state is shared behind an [`Arc`].
#[derive(Clone)]
pub struct EnvelopeProcessorService {
    // Shared state of the service, see [`InnerProcessor`].
    inner: Arc<InnerProcessor>,
}
1119
/// Contains the addresses of services that the processor publishes to.
pub struct Addrs {
    /// Address of the service receiving [`TrackOutcome`] messages.
    pub outcome_aggregator: Addr<TrackOutcome>,
    /// Address of the upstream relay service.
    pub upstream_relay: Addr<UpstreamRelay>,
    /// Address of the upload service, only available with the `processing` feature.
    #[cfg(feature = "processing")]
    pub upload: Option<Addr<Upload>>,
    /// Address of the store service, only available with the `processing` feature.
    #[cfg(feature = "processing")]
    pub store_forwarder: Option<Addr<Store>>,
    /// Address of the metrics aggregator service.
    pub aggregator: Addr<Aggregator>,
    /// Address of the global rate limits service, only available with the
    /// `processing` feature.
    #[cfg(feature = "processing")]
    pub global_rate_limits: Option<Addr<GlobalRateLimits>>,
}
1132
1133impl Default for Addrs {
1134    fn default() -> Self {
1135        Addrs {
1136            outcome_aggregator: Addr::dummy(),
1137            upstream_relay: Addr::dummy(),
1138            #[cfg(feature = "processing")]
1139            upload: None,
1140            #[cfg(feature = "processing")]
1141            store_forwarder: None,
1142            aggregator: Addr::dummy(),
1143            #[cfg(feature = "processing")]
1144            global_rate_limits: None,
1145        }
1146    }
1147}
1148
/// State shared between all clones of [`EnvelopeProcessorService`].
struct InnerProcessor {
    // Worker pool executing the processing tasks.
    pool: EnvelopeProcessorServicePool,
    config: Arc<Config>,
    global_config: GlobalConfigHandle,
    project_cache: ProjectCacheHandle,
    cogs: Cogs,
    // Redis client used for quotas, only with the `processing` feature.
    #[cfg(feature = "processing")]
    quotas_client: Option<AsyncRedisClient>,
    addrs: Addrs,
    // Redis-backed rate limiter; `None` when Redis is not configured.
    #[cfg(feature = "processing")]
    rate_limiter: Option<Arc<RedisRateLimiter<GlobalRateLimitsServiceHandle>>>,
    geoip_lookup: GeoIpLookup,
    // Cardinality limiter; `None` when Redis is not configured.
    #[cfg(feature = "processing")]
    cardinality_limiter: Option<CardinalityLimiter>,
    metric_outcomes: MetricOutcomes,
    // Type-specific processors, see [`Processing`].
    processing: Processing,
}
1166
/// Bundles the item-type specific processors used by the service.
struct Processing {
    // Processor for log items.
    logs: LogsProcessor,
    // Processor for trace metric items.
    trace_metrics: TraceMetricsProcessor,
    // Processor for span items.
    spans: SpansProcessor,
    // Processor for check-in (cron monitor) items.
    check_ins: CheckInsProcessor,
    // Processor for session items.
    sessions: SessionsProcessor,
    // Processor for trace attachment items.
    trace_attachments: TraceAttachmentsProcessor,
}
1175
1176impl EnvelopeProcessorService {
    /// Creates a multi-threaded envelope processor.
    ///
    /// Constructs the shared [`InnerProcessor`] state: a GeoIP lookup, the
    /// Redis-backed rate limiter and cardinality limiter (processing feature
    /// only), and one type-specific processor per item kind, all sharing a
    /// single quota rate limiter.
    #[cfg_attr(feature = "processing", expect(clippy::too_many_arguments))]
    pub fn new(
        pool: EnvelopeProcessorServicePool,
        config: Arc<Config>,
        global_config: GlobalConfigHandle,
        project_cache: ProjectCacheHandle,
        cogs: Cogs,
        #[cfg(feature = "processing")] redis: Option<RedisClients>,
        addrs: Addrs,
        metric_outcomes: MetricOutcomes,
    ) -> Self {
        // Open the configured GeoIP database. A failure to open it is logged
        // and degrades to an empty lookup instead of aborting startup.
        let geoip_lookup = config
            .geoip_path()
            .and_then(
                |p| match GeoIpLookup::open(p).context(ServiceError::GeoIp) {
                    Ok(geoip) => Some(geoip),
                    Err(err) => {
                        relay_log::error!("failed to open GeoIP db {p:?}: {err:?}");
                        None
                    }
                },
            )
            .unwrap_or_else(GeoIpLookup::empty);

        // Split the Redis clients into the cardinality-limiter and quotas clients.
        #[cfg(feature = "processing")]
        let (cardinality, quotas) = match redis {
            Some(RedisClients {
                cardinality,
                quotas,
                ..
            }) => (Some(cardinality), Some(quotas)),
            None => (None, None),
        };

        #[cfg(feature = "processing")]
        let global_rate_limits = addrs.global_rate_limits.clone().map(Into::into);

        // The Redis rate limiter requires both a quotas client and the global
        // rate limits service; otherwise rate limiting stays disabled.
        #[cfg(feature = "processing")]
        let rate_limiter = match (quotas.clone(), global_rate_limits) {
            (Some(redis), Some(global)) => Some(
                RedisRateLimiter::new(redis, global)
                    .max_limit(config.max_rate_limit())
                    .cache(config.quota_cache_ratio(), config.quota_cache_max()),
            ),
            _ => None,
        };

        // Quota limiter shared by all type-specific processors below.
        let quota_limiter = Arc::new(QuotaRateLimiter::new(
            #[cfg(feature = "processing")]
            project_cache.clone(),
            #[cfg(feature = "processing")]
            rate_limiter.clone(),
        ));
        #[cfg(feature = "processing")]
        let rate_limiter = rate_limiter.map(Arc::new);

        let inner = InnerProcessor {
            pool,
            global_config,
            project_cache,
            cogs,
            #[cfg(feature = "processing")]
            quotas_client: quotas.clone(),
            #[cfg(feature = "processing")]
            rate_limiter,
            addrs,
            #[cfg(feature = "processing")]
            cardinality_limiter: cardinality
                .map(|cardinality| {
                    RedisSetLimiter::new(
                        RedisSetLimiterOptions {
                            cache_vacuum_interval: config
                                .cardinality_limiter_cache_vacuum_interval(),
                        },
                        cardinality,
                    )
                })
                .map(CardinalityLimiter::new),
            metric_outcomes,
            processing: Processing {
                logs: LogsProcessor::new(Arc::clone(&quota_limiter)),
                trace_metrics: TraceMetricsProcessor::new(Arc::clone(&quota_limiter)),
                spans: SpansProcessor::new(Arc::clone(&quota_limiter), geoip_lookup.clone()),
                check_ins: CheckInsProcessor::new(Arc::clone(&quota_limiter)),
                sessions: SessionsProcessor::new(Arc::clone(&quota_limiter)),
                trace_attachments: TraceAttachmentsProcessor::new(quota_limiter),
            },
            geoip_lookup,
            config,
        };

        Self {
            inner: Arc::new(inner),
        }
    }
1273
    /// Enforces rate limits and quotas on the envelope and the given event.
    ///
    /// Cached rate limits are applied first. On processing Relays with a
    /// configured Redis rate limiter, all quotas are additionally enforced
    /// consistently via Redis, and freshly computed rate limits are merged
    /// back into the project cache.
    ///
    /// Returns the event as it came out of enforcement.
    async fn enforce_quotas<Group>(
        &self,
        managed_envelope: &mut TypedEnvelope<Group>,
        event: Annotated<Event>,
        extracted_metrics: &mut ProcessingExtractedMetrics,
        ctx: processing::Context<'_>,
    ) -> Result<Annotated<Event>, ProcessingError> {
        // Cached quotas first, they are quick to evaluate and some quotas (indexed) are not
        // applied in the fast path, all cached quotas can be applied here.
        let cached_result = RateLimiter::Cached
            .enforce(managed_envelope, event, extracted_metrics, ctx)
            .await?;

        if_processing!(self.inner.config, {
            // Without a Redis rate limiter, cached enforcement is all we can do.
            let rate_limiter = match self.inner.rate_limiter.clone() {
                Some(rate_limiter) => rate_limiter,
                None => return Ok(cached_result.event),
            };

            // Enforce all quotas consistently with Redis.
            let consistent_result = RateLimiter::Consistent(rate_limiter)
                .enforce(
                    managed_envelope,
                    cached_result.event,
                    extracted_metrics,
                    ctx
                )
                .await?;

            // Update cached rate limits with the freshly computed ones.
            if !consistent_result.rate_limits.is_empty() {
                self.inner
                    .project_cache
                    .get(managed_envelope.scoping().project_key)
                    .rate_limits()
                    .merge(consistent_result.rate_limits);
            }

            Ok(consistent_result.event)
        } else { Ok(cached_result.event) })
    }
1315
    /// Processes the general errors, and the items which require or create the events.
    ///
    /// Extracts the event from the envelope, expands platform-specific crash
    /// payloads (processing mode only), normalizes and filters the event,
    /// enforces quotas, scrubs PII, and serializes the event back into the
    /// envelope. Returns the metrics extracted during processing.
    async fn process_errors(
        &self,
        managed_envelope: &mut TypedEnvelope<ErrorGroup>,
        project_id: ProjectId,
        mut ctx: processing::Context<'_>,
    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
        let mut event_fully_normalized = EventFullyNormalized::new(managed_envelope.envelope());
        let mut metrics = Metrics::default();
        let mut extracted_metrics = ProcessingExtractedMetrics::new();

        // Events can also contain user reports.
        report::process_user_reports(managed_envelope);

        // Expand container formats that can only be unpacked on processing
        // Relays (Unreal, PlayStation, Nintendo Switch).
        if_processing!(self.inner.config, {
            unreal::expand(managed_envelope, &self.inner.config)?;
            #[cfg(sentry)]
            playstation::expand(managed_envelope, &self.inner.config, ctx.project_info)?;
            nnswitch::expand(managed_envelope)?;
        });

        // Extract the main event item from the envelope, if present.
        let extraction_result = event::extract(
            managed_envelope,
            &mut metrics,
            event_fully_normalized,
            &self.inner.config,
        )?;
        let mut event = extraction_result.event;

        // Turn expanded platform-specific payloads into event data and create
        // attachment placeholders (processing mode only). Each step may update
        // the normalization state of the event.
        if_processing!(self.inner.config, {
            if let Some(inner_event_fully_normalized) =
                unreal::process(managed_envelope, &mut event)?
            {
                event_fully_normalized = inner_event_fully_normalized;
            }
            #[cfg(sentry)]
            if let Some(inner_event_fully_normalized) =
                playstation::process(managed_envelope, &mut event, ctx.project_info)?
            {
                event_fully_normalized = inner_event_fully_normalized;
            }
            if let Some(inner_event_fully_normalized) =
                attachment::create_placeholders(managed_envelope, &mut event, &mut metrics)
            {
                event_fully_normalized = inner_event_fully_normalized;
            }
        });

        // Validate the dynamic sampling context; the sampling project info is
        // replaced with whatever the validation yields.
        ctx.sampling_project_info = dynamic_sampling::validate_and_set_dsc(
            managed_envelope,
            &mut event,
            ctx.project_info,
            ctx.sampling_project_info,
        );

        // Finalize and normalize the event payload.
        let attachments = managed_envelope
            .envelope()
            .items()
            .filter(|item| item.attachment_type() == Some(&AttachmentType::Attachment));
        processing::utils::event::finalize(
            managed_envelope.envelope().headers(),
            &mut event,
            attachments,
            &mut metrics,
            ctx.config,
        )?;
        event_fully_normalized = processing::utils::event::normalize(
            managed_envelope.envelope().headers(),
            &mut event,
            event_fully_normalized,
            project_id,
            ctx,
            &self.inner.geoip_lookup,
        )?;
        // Run inbound filters; filtered envelopes are rejected with a
        // `Filtered` outcome.
        let filter_run =
            processing::utils::event::filter(managed_envelope.envelope().headers(), &event, &ctx)
                .map_err(|err| {
                managed_envelope.reject(Outcome::Filtered(err.clone()));
                ProcessingError::EventFiltered(err)
            })?;

        // Tag the error with a sampling decision if filters passed, or
        // unconditionally on processing Relays.
        if self.inner.config.processing_enabled() || matches!(filter_run, FiltersStatus::Ok) {
            dynamic_sampling::tag_error_with_sampling_decision(
                managed_envelope,
                &mut event,
                ctx.sampling_project_info,
                &self.inner.config,
            )
            .await;
        }

        // Apply cached and, on processing Relays, consistent rate limits.
        event = self
            .enforce_quotas(managed_envelope, event, &mut extracted_metrics, ctx)
            .await?;

        // Unless the event was dropped during enforcement, scrub PII and write
        // the event back into the envelope.
        if event.value().is_some() {
            processing::utils::event::scrub(&mut event, ctx.project_info)?;
            event::serialize(
                managed_envelope,
                &mut event,
                event_fully_normalized,
                EventMetricsExtracted(false),
                SpansExtracted(false),
            )?;
            event::emit_feedback_metrics(managed_envelope.envelope());
        }

        // Scrub the remaining attachments with the project's PII config.
        let attachments = managed_envelope
            .envelope_mut()
            .items_mut()
            .filter(|i| i.ty() == &ItemType::Attachment);
        processing::utils::attachments::scrub(attachments, ctx.project_info);

        // Processing Relays must never forward unnormalized events; log if it
        // happens anyway.
        if self.inner.config.processing_enabled() && !event_fully_normalized.0 {
            relay_log::error!(
                tags.project = %project_id,
                tags.ty = event_type(&event).map(|e| e.to_string()).unwrap_or("none".to_owned()),
                "ingested event without normalizing"
            );
        }

        Ok(Some(extracted_metrics))
    }
1439
1440    /// Processes only transactions and transaction-related items.
1441    #[allow(unused_assignments)]
1442    async fn process_transactions(
1443        &self,
1444        managed_envelope: &mut TypedEnvelope<TransactionGroup>,
1445        cogs: &mut Token,
1446        project_id: ProjectId,
1447        mut ctx: processing::Context<'_>,
1448    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
1449        let mut event_fully_normalized = EventFullyNormalized::new(managed_envelope.envelope());
1450        let mut event_metrics_extracted = EventMetricsExtracted(false);
1451        let mut spans_extracted = SpansExtracted(false);
1452        let mut metrics = Metrics::default();
1453        let mut extracted_metrics = ProcessingExtractedMetrics::new();
1454
1455        // We extract the main event from the envelope.
1456        let extraction_result = event::extract(
1457            managed_envelope,
1458            &mut metrics,
1459            event_fully_normalized,
1460            &self.inner.config,
1461        )?;
1462
1463        // If metrics were extracted we mark that.
1464        if let Some(inner_event_metrics_extracted) = extraction_result.event_metrics_extracted {
1465            event_metrics_extracted = inner_event_metrics_extracted;
1466        }
1467        if let Some(inner_spans_extracted) = extraction_result.spans_extracted {
1468            spans_extracted = inner_spans_extracted;
1469        };
1470
1471        // We take the main event out of the result.
1472        let mut event = extraction_result.event;
1473
1474        let profile_id = profile::filter(
1475            managed_envelope,
1476            &event,
1477            ctx.config,
1478            project_id,
1479            ctx.project_info,
1480        );
1481        processing::transactions::profile::transfer_id(&mut event, profile_id);
1482        processing::transactions::profile::remove_context_if_rate_limited(
1483            &mut event,
1484            managed_envelope.scoping(),
1485            ctx,
1486        );
1487
1488        ctx.sampling_project_info = dynamic_sampling::validate_and_set_dsc(
1489            managed_envelope,
1490            &mut event,
1491            ctx.project_info,
1492            ctx.sampling_project_info,
1493        );
1494
1495        let attachments = managed_envelope
1496            .envelope()
1497            .items()
1498            .filter(|item| item.attachment_type() == Some(&AttachmentType::Attachment));
1499        processing::utils::event::finalize(
1500            managed_envelope.envelope().headers(),
1501            &mut event,
1502            attachments,
1503            &mut metrics,
1504            &self.inner.config,
1505        )?;
1506
1507        event_fully_normalized = processing::utils::event::normalize(
1508            managed_envelope.envelope().headers(),
1509            &mut event,
1510            event_fully_normalized,
1511            project_id,
1512            ctx,
1513            &self.inner.geoip_lookup,
1514        )?;
1515
1516        let filter_run =
1517            processing::utils::event::filter(managed_envelope.envelope().headers(), &event, &ctx)
1518                .map_err(|err| {
1519                managed_envelope.reject(Outcome::Filtered(err.clone()));
1520                ProcessingError::EventFiltered(err)
1521            })?;
1522
1523        // Always run dynamic sampling on processing Relays,
1524        // but delay decision until inbound filters have been fully processed.
1525        // Also, we require transaction metrics to be enabled before sampling.
1526        let run_dynamic_sampling = (matches!(filter_run, FiltersStatus::Ok)
1527            || self.inner.config.processing_enabled())
1528            && matches!(&ctx.project_info.config.transaction_metrics, Some(ErrorBoundary::Ok(c)) if c.is_enabled());
1529
1530        let sampling_result = match run_dynamic_sampling {
1531            true => {
1532                #[allow(unused_mut)]
1533                let mut reservoir = ReservoirEvaluator::new(Arc::clone(ctx.reservoir_counters));
1534                #[cfg(feature = "processing")]
1535                if let Some(quotas_client) = self.inner.quotas_client.as_ref() {
1536                    reservoir.set_redis(managed_envelope.scoping().organization_id, quotas_client);
1537                }
1538                processing::utils::dynamic_sampling::run(
1539                    managed_envelope.envelope().headers().dsc(),
1540                    event.value(),
1541                    &ctx,
1542                    Some(&reservoir),
1543                )
1544                .await
1545            }
1546            false => SamplingResult::Pending,
1547        };
1548
1549        relay_statsd::metric!(
1550            counter(RelayCounters::SamplingDecision) += 1,
1551            decision = sampling_result.decision().as_str(),
1552            item = "transaction"
1553        );
1554
1555        #[cfg(feature = "processing")]
1556        let server_sample_rate = sampling_result.sample_rate();
1557
1558        if let Some(outcome) = sampling_result.into_dropped_outcome() {
1559            // Process profiles before dropping the transaction, if necessary.
1560            // Before metric extraction to make sure the profile count is reflected correctly.
1561            profile::process(
1562                managed_envelope,
1563                &mut event,
1564                ctx.global_config,
1565                ctx.config,
1566                ctx.project_info,
1567            );
1568            // Extract metrics here, we're about to drop the event/transaction.
1569            event_metrics_extracted = processing::transactions::extraction::extract_metrics(
1570                &mut event,
1571                &mut extracted_metrics,
1572                ExtractMetricsContext {
1573                    dsc: managed_envelope.envelope().dsc(),
1574                    project_id,
1575                    ctx,
1576                    sampling_decision: SamplingDecision::Drop,
1577                    metrics_extracted: event_metrics_extracted.0,
1578                    spans_extracted: spans_extracted.0,
1579                },
1580            )?;
1581
1582            dynamic_sampling::drop_unsampled_items(
1583                managed_envelope,
1584                event,
1585                outcome,
1586                spans_extracted,
1587            );
1588
1589            // At this point we have:
1590            //  - An empty envelope.
1591            //  - An envelope containing only processed profiles.
1592            // We need to make sure there are enough quotas for these profiles.
1593            event = self
1594                .enforce_quotas(
1595                    managed_envelope,
1596                    Annotated::empty(),
1597                    &mut extracted_metrics,
1598                    ctx,
1599                )
1600                .await?;
1601
1602            return Ok(Some(extracted_metrics));
1603        }
1604
1605        let _post_ds = cogs.start_category("post_ds");
1606
1607        // Need to scrub the transaction before extracting spans.
1608        //
1609        // Unconditionally scrub to make sure PII is removed as early as possible.
1610        processing::utils::event::scrub(&mut event, ctx.project_info)?;
1611
1612        let attachments = managed_envelope
1613            .envelope_mut()
1614            .items_mut()
1615            .filter(|i| i.ty() == &ItemType::Attachment);
1616        processing::utils::attachments::scrub(attachments, ctx.project_info);
1617
1618        if_processing!(self.inner.config, {
1619            // Process profiles before extracting metrics, to make sure they are removed if they are invalid.
1620            let profile_id = profile::process(
1621                managed_envelope,
1622                &mut event,
1623                ctx.global_config,
1624                ctx.config,
1625                ctx.project_info,
1626            );
1627            processing::transactions::profile::transfer_id(&mut event, profile_id);
1628            processing::transactions::profile::scrub_profiler_id(&mut event);
1629
1630            // Always extract metrics in processing Relays for sampled items.
1631            event_metrics_extracted = processing::transactions::extraction::extract_metrics(
1632                &mut event,
1633                &mut extracted_metrics,
1634                ExtractMetricsContext {
1635                    dsc: managed_envelope.envelope().dsc(),
1636                    project_id,
1637                    ctx,
1638                    sampling_decision: SamplingDecision::Keep,
1639                    metrics_extracted: event_metrics_extracted.0,
1640                    spans_extracted: spans_extracted.0,
1641                },
1642            )?;
1643
1644            if let Some(spans) = processing::transactions::spans::extract_from_event(
1645                managed_envelope.envelope().dsc(),
1646                &event,
1647                ctx.global_config,
1648                ctx.config,
1649                server_sample_rate,
1650                event_metrics_extracted,
1651                spans_extracted,
1652            ) {
1653                spans_extracted = SpansExtracted(true);
1654                for item in spans {
1655                    match item {
1656                        Ok(item) => managed_envelope.envelope_mut().add_item(item),
1657                        Err(()) => managed_envelope.track_outcome(
1658                            Outcome::Invalid(DiscardReason::InvalidSpan),
1659                            DataCategory::SpanIndexed,
1660                            1,
1661                        ),
1662                        // TODO: also `DataCategory::Span`?
1663                    }
1664                }
1665            }
1666        });
1667
1668        event = self
1669            .enforce_quotas(managed_envelope, event, &mut extracted_metrics, ctx)
1670            .await?;
1671
1672        // Event may have been dropped because of a quota and the envelope can be empty.
1673        if event.value().is_some() {
1674            event::serialize(
1675                managed_envelope,
1676                &mut event,
1677                event_fully_normalized,
1678                event_metrics_extracted,
1679                spans_extracted,
1680            )?;
1681        }
1682
1683        if self.inner.config.processing_enabled() && !event_fully_normalized.0 {
1684            relay_log::error!(
1685                tags.project = %project_id,
1686                tags.ty = event_type(&event).map(|e| e.to_string()).unwrap_or("none".to_owned()),
1687                "ingested event without normalizing"
1688            );
1689        };
1690
1691        Ok(Some(extracted_metrics))
1692    }
1693
1694    async fn process_profile_chunks(
1695        &self,
1696        managed_envelope: &mut TypedEnvelope<ProfileChunkGroup>,
1697        ctx: processing::Context<'_>,
1698    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
1699        profile_chunk::filter(managed_envelope, ctx.project_info);
1700
1701        if_processing!(self.inner.config, {
1702            profile_chunk::process(
1703                managed_envelope,
1704                ctx.project_info,
1705                ctx.global_config,
1706                ctx.config,
1707            );
1708        });
1709
1710        self.enforce_quotas(
1711            managed_envelope,
1712            Annotated::empty(),
1713            &mut ProcessingExtractedMetrics::new(),
1714            ctx,
1715        )
1716        .await?;
1717
1718        Ok(None)
1719    }
1720
1721    /// Processes standalone items that require an event ID, but do not have an event on the same envelope.
1722    async fn process_standalone(
1723        &self,
1724        managed_envelope: &mut TypedEnvelope<StandaloneGroup>,
1725        project_id: ProjectId,
1726        ctx: processing::Context<'_>,
1727    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
1728        let mut extracted_metrics = ProcessingExtractedMetrics::new();
1729
1730        standalone::process(managed_envelope);
1731
1732        profile::filter(
1733            managed_envelope,
1734            &Annotated::empty(),
1735            ctx.config,
1736            project_id,
1737            ctx.project_info,
1738        );
1739
1740        self.enforce_quotas(
1741            managed_envelope,
1742            Annotated::empty(),
1743            &mut extracted_metrics,
1744            ctx,
1745        )
1746        .await?;
1747
1748        report::process_user_reports(managed_envelope);
1749        let attachments = managed_envelope
1750            .envelope_mut()
1751            .items_mut()
1752            .filter(|i| i.ty() == &ItemType::Attachment);
1753        processing::utils::attachments::scrub(attachments, ctx.project_info);
1754
1755        Ok(Some(extracted_metrics))
1756    }
1757
1758    /// Processes user and client reports.
1759    async fn process_client_reports(
1760        &self,
1761        managed_envelope: &mut TypedEnvelope<ClientReportGroup>,
1762        ctx: processing::Context<'_>,
1763    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
1764        let mut extracted_metrics = ProcessingExtractedMetrics::new();
1765
1766        self.enforce_quotas(
1767            managed_envelope,
1768            Annotated::empty(),
1769            &mut extracted_metrics,
1770            ctx,
1771        )
1772        .await?;
1773
1774        report::process_client_reports(
1775            managed_envelope,
1776            ctx.config,
1777            ctx.project_info,
1778            self.inner.addrs.outcome_aggregator.clone(),
1779        );
1780
1781        Ok(Some(extracted_metrics))
1782    }
1783
1784    /// Processes replays.
1785    async fn process_replays(
1786        &self,
1787        managed_envelope: &mut TypedEnvelope<ReplayGroup>,
1788        ctx: processing::Context<'_>,
1789    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
1790        let mut extracted_metrics = ProcessingExtractedMetrics::new();
1791
1792        replay::process(
1793            managed_envelope,
1794            ctx.global_config,
1795            ctx.config,
1796            ctx.project_info,
1797            &self.inner.geoip_lookup,
1798        )?;
1799
1800        self.enforce_quotas(
1801            managed_envelope,
1802            Annotated::empty(),
1803            &mut extracted_metrics,
1804            ctx,
1805        )
1806        .await?;
1807
1808        Ok(Some(extracted_metrics))
1809    }
1810
1811    async fn process_nel(
1812        &self,
1813        mut managed_envelope: ManagedEnvelope,
1814        ctx: processing::Context<'_>,
1815    ) -> Result<ProcessingResult, ProcessingError> {
1816        nel::convert_to_logs(&mut managed_envelope);
1817        self.process_with_processor(&self.inner.processing.logs, managed_envelope, ctx)
1818            .await
1819    }
1820
1821    async fn process_with_processor<P: processing::Processor>(
1822        &self,
1823        processor: &P,
1824        mut managed_envelope: ManagedEnvelope,
1825        ctx: processing::Context<'_>,
1826    ) -> Result<ProcessingResult, ProcessingError>
1827    where
1828        Outputs: From<P::Output>,
1829    {
1830        let Some(work) = processor.prepare_envelope(&mut managed_envelope) else {
1831            debug_assert!(
1832                false,
1833                "there must be work for the {} processor",
1834                std::any::type_name::<P>(),
1835            );
1836            return Err(ProcessingError::ProcessingGroupMismatch);
1837        };
1838
1839        managed_envelope.update();
1840        match managed_envelope.envelope().is_empty() {
1841            true => managed_envelope.accept(),
1842            false => managed_envelope.reject(Outcome::Invalid(DiscardReason::Internal)),
1843        }
1844
1845        processor
1846            .process(work, ctx)
1847            .await
1848            .map_err(|err| {
1849                relay_log::debug!(
1850                    error = &err as &dyn std::error::Error,
1851                    "processing pipeline failed"
1852                );
1853                ProcessingError::ProcessingFailure
1854            })
1855            .map(|o| o.map(Into::into))
1856            .map(ProcessingResult::Output)
1857    }
1858
1859    /// Processes standalone spans.
1860    ///
1861    /// This function does *not* run for spans extracted from transactions.
1862    async fn process_standalone_spans(
1863        &self,
1864        managed_envelope: &mut TypedEnvelope<SpanGroup>,
1865        _project_id: ProjectId,
1866        ctx: processing::Context<'_>,
1867    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
1868        let mut extracted_metrics = ProcessingExtractedMetrics::new();
1869
1870        span::filter(managed_envelope, ctx.config, ctx.project_info);
1871        span::convert_otel_traces_data(managed_envelope);
1872
1873        if_processing!(self.inner.config, {
1874            span::process(
1875                managed_envelope,
1876                &mut Annotated::empty(),
1877                &mut extracted_metrics,
1878                _project_id,
1879                ctx,
1880                &self.inner.geoip_lookup,
1881            )
1882            .await;
1883        });
1884
1885        self.enforce_quotas(
1886            managed_envelope,
1887            Annotated::empty(),
1888            &mut extracted_metrics,
1889            ctx,
1890        )
1891        .await?;
1892
1893        Ok(Some(extracted_metrics))
1894    }
1895
    /// Dispatches an envelope to the processing logic for its [`ProcessingGroup`].
    ///
    /// Pre-processes envelope headers (DSC transaction parametrization, retention,
    /// project ID) and then routes the envelope: legacy groups go through the
    /// `run!` macro into `process_*` methods, newer groups are handled by
    /// dedicated [`processing::Processor`] pipelines.
    async fn process_envelope(
        &self,
        cogs: &mut Token,
        project_id: ProjectId,
        message: ProcessEnvelopeGrouped<'_>,
    ) -> Result<ProcessingResult, ProcessingError> {
        let ProcessEnvelopeGrouped {
            group,
            envelope: mut managed_envelope,
            ctx,
        } = message;

        // Pre-process the envelope headers.
        if let Some(sampling_state) = ctx.sampling_project_info {
            // Both transactions and standalone span envelopes need a normalized DSC header
            // to make sampling rules based on the segment/transaction name work correctly.
            managed_envelope
                .envelope_mut()
                .parametrize_dsc_transaction(&sampling_state.config.tx_name_rules);
        }

        // Set the event retention. Effectively, this value will only be available in processing
        // mode when the full project config is queried from the upstream.
        if let Some(retention) = ctx.project_info.config.event_retention {
            managed_envelope.envelope_mut().set_retention(retention);
        }

        // Set the downsampled event retention. Like the regular retention above, this value
        // will only be available in processing mode when the full project config is queried
        // from the upstream.
        if let Some(retention) = ctx.project_info.config.downsampled_event_retention {
            managed_envelope
                .envelope_mut()
                .set_downsampled_retention(retention);
        }

        // Ensure the project ID is updated to the stored instance for this project cache. This can
        // differ in two cases:
        //  1. The envelope was sent to the legacy `/store/` endpoint without a project ID.
        //  2. The DSN was moved and the envelope sent to the old project ID.
        managed_envelope
            .envelope_mut()
            .meta_mut()
            .set_project_id(project_id);

        // Invokes a legacy `process_*` method: converts the envelope into the typed envelope
        // for this group, runs the method, and pairs the processed envelope with any extracted
        // metrics. On failure, the envelope is rejected with the error's outcome (if it has
        // one) before the error is propagated.
        macro_rules! run {
            ($fn_name:ident $(, $args:expr)*) => {
                async {
                    let mut managed_envelope = (managed_envelope, group).try_into()?;
                    match self.$fn_name(&mut managed_envelope, $($args),*).await {
                        Ok(extracted_metrics) => Ok(ProcessingResult::Envelope {
                            managed_envelope: managed_envelope.into_processed(),
                            extracted_metrics: extracted_metrics.map_or(ProcessingExtractedMetrics::new(), |e| e)
                        }),
                        Err(error) => {
                            relay_log::trace!("Executing {fn} failed: {error}", fn = stringify!($fn_name), error = error);
                            if let Some(outcome) = error.to_outcome() {
                                managed_envelope.reject(outcome);
                            }

                            return Err(error);
                        }
                    }
                }.await
            };
        }

        relay_log::trace!("Processing {group} group", group = group.variant());

        match group {
            ProcessingGroup::Error => run!(process_errors, project_id, ctx),
            ProcessingGroup::Transaction => {
                run!(process_transactions, cogs, project_id, ctx)
            }
            ProcessingGroup::Session => {
                self.process_with_processor(&self.inner.processing.sessions, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::Standalone => run!(process_standalone, project_id, ctx),
            ProcessingGroup::ClientReport => run!(process_client_reports, ctx),
            ProcessingGroup::Replay => {
                run!(process_replays, ctx)
            }
            ProcessingGroup::CheckIn => {
                self.process_with_processor(&self.inner.processing.check_ins, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::Nel => self.process_nel(managed_envelope, ctx).await,
            ProcessingGroup::Log => {
                self.process_with_processor(&self.inner.processing.logs, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::TraceMetric => {
                self.process_with_processor(
                    &self.inner.processing.trace_metrics,
                    managed_envelope,
                    ctx,
                )
                .await
            }
            ProcessingGroup::SpanV2 => {
                self.process_with_processor(&self.inner.processing.spans, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::TraceAttachment => {
                self.process_with_processor(
                    &self.inner.processing.trace_attachments,
                    managed_envelope,
                    ctx,
                )
                .await
            }
            ProcessingGroup::Span => run!(process_standalone_spans, project_id, ctx),
            ProcessingGroup::ProfileChunk => {
                run!(process_profile_chunks, ctx)
            }
            // Currently is not used.
            ProcessingGroup::Metrics => {
                // In proxy mode we simply forward the metrics.
                // This group shouldn't be used outside of proxy mode.
                if self.inner.config.relay_mode() != RelayMode::Proxy {
                    relay_log::error!(
                        tags.project = %project_id,
                        items = ?managed_envelope.envelope().items().next().map(Item::ty),
                        "received metrics in the process_state"
                    );
                }

                Ok(ProcessingResult::no_metrics(
                    managed_envelope.into_processed(),
                ))
            }
            // Fallback to the legacy process_state implementation for Ungrouped events.
            ProcessingGroup::Ungrouped => {
                relay_log::error!(
                    tags.project = %project_id,
                    items = ?managed_envelope.envelope().items().next().map(Item::ty),
                    "could not identify the processing group based on the envelope's items"
                );

                Ok(ProcessingResult::no_metrics(
                    managed_envelope.into_processed(),
                ))
            }
            // Leave this group unchanged.
            //
            // This will later be forwarded to upstream.
            ProcessingGroup::ForwardUnknown => Ok(ProcessingResult::no_metrics(
                managed_envelope.into_processed(),
            )),
        }
    }
2047
2048    /// Processes the envelope and returns the processed envelope back.
2049    ///
2050    /// Returns `Some` if the envelope passed inbound filtering and rate limiting. Invalid items are
2051    /// removed from the envelope. Otherwise, if the envelope is empty or the entire envelope needs
2052    /// to be dropped, this is `None`.
2053    async fn process<'a>(
2054        &self,
2055        cogs: &mut Token,
2056        mut message: ProcessEnvelopeGrouped<'a>,
2057    ) -> Result<Option<Submit<'a>>, ProcessingError> {
2058        let ProcessEnvelopeGrouped {
2059            ref mut envelope,
2060            ctx,
2061            ..
2062        } = message;
2063
2064        // Prefer the project's project ID, and fall back to the stated project id from the
2065        // envelope. The project ID is available in all modes, other than in proxy mode, where
2066        // envelopes for unknown projects are forwarded blindly.
2067        //
2068        // Neither ID can be available in proxy mode on the /store/ endpoint. This is not supported,
2069        // since we cannot process an envelope without project ID, so drop it.
2070        let Some(project_id) = ctx
2071            .project_info
2072            .project_id
2073            .or_else(|| envelope.envelope().meta().project_id())
2074        else {
2075            envelope.reject(Outcome::Invalid(DiscardReason::Internal));
2076            return Err(ProcessingError::MissingProjectId);
2077        };
2078
2079        let client = envelope.envelope().meta().client().map(str::to_owned);
2080        let user_agent = envelope.envelope().meta().user_agent().map(str::to_owned);
2081        let project_key = envelope.envelope().meta().public_key();
2082        // Only allow sending to the sampling key, if we successfully loaded a sampling project
2083        // info relating to it. This filters out unknown/invalid project keys as well as project
2084        // keys from different organizations.
2085        let sampling_key = envelope
2086            .envelope()
2087            .sampling_key()
2088            .filter(|_| ctx.sampling_project_info.is_some());
2089
2090        // We set additional information on the scope, which will be removed after processing the
2091        // envelope.
2092        relay_log::configure_scope(|scope| {
2093            scope.set_tag("project", project_id);
2094            if let Some(client) = client {
2095                scope.set_tag("sdk", client);
2096            }
2097            if let Some(user_agent) = user_agent {
2098                scope.set_extra("user_agent", user_agent.into());
2099            }
2100        });
2101
2102        let result = match self.process_envelope(cogs, project_id, message).await {
2103            Ok(ProcessingResult::Envelope {
2104                mut managed_envelope,
2105                extracted_metrics,
2106            }) => {
2107                // The envelope could be modified or even emptied during processing, which
2108                // requires re-computation of the context.
2109                managed_envelope.update();
2110
2111                let has_metrics = !extracted_metrics.metrics.project_metrics.is_empty();
2112                send_metrics(
2113                    extracted_metrics.metrics,
2114                    project_key,
2115                    sampling_key,
2116                    &self.inner.addrs.aggregator,
2117                );
2118
2119                let envelope_response = if managed_envelope.envelope().is_empty() {
2120                    if !has_metrics {
2121                        // Individual rate limits have already been issued
2122                        managed_envelope.reject(Outcome::RateLimited(None));
2123                    } else {
2124                        managed_envelope.accept();
2125                    }
2126
2127                    None
2128                } else {
2129                    Some(managed_envelope)
2130                };
2131
2132                Ok(envelope_response.map(Submit::Envelope))
2133            }
2134            Ok(ProcessingResult::Output(Output { main, metrics })) => {
2135                if let Some(metrics) = metrics {
2136                    metrics.accept(|metrics| {
2137                        send_metrics(
2138                            metrics,
2139                            project_key,
2140                            sampling_key,
2141                            &self.inner.addrs.aggregator,
2142                        );
2143                    });
2144                }
2145
2146                let ctx = ctx.to_forward();
2147                Ok(main.map(|output| Submit::Output { output, ctx }))
2148            }
2149            Err(err) => Err(err),
2150        };
2151
2152        relay_log::configure_scope(|scope| {
2153            scope.remove_tag("project");
2154            scope.remove_tag("sdk");
2155            scope.remove_tag("user_agent");
2156        });
2157
2158        result
2159    }
2160
2161    async fn handle_process_envelope(&self, cogs: &mut Token, message: ProcessEnvelope) {
2162        let project_key = message.envelope.envelope().meta().public_key();
2163        let wait_time = message.envelope.age();
2164        metric!(timer(RelayTimers::EnvelopeWaitTime) = wait_time);
2165
2166        // This COGS handling may need an overhaul in the future:
2167        // Cancel the passed in token, to start individual measurements per envelope instead.
2168        cogs.cancel();
2169
2170        let scoping = message.envelope.scoping();
2171        for (group, envelope) in ProcessingGroup::split_envelope(
2172            *message.envelope.into_envelope(),
2173            &message.project_info,
2174        ) {
2175            let mut cogs = self
2176                .inner
2177                .cogs
2178                .timed(ResourceId::Relay, AppFeature::from(group));
2179
2180            let mut envelope =
2181                ManagedEnvelope::new(envelope, self.inner.addrs.outcome_aggregator.clone());
2182            envelope.scope(scoping);
2183
2184            let global_config = self.inner.global_config.current();
2185
2186            let ctx = processing::Context {
2187                config: &self.inner.config,
2188                global_config: &global_config,
2189                project_info: &message.project_info,
2190                sampling_project_info: message.sampling_project_info.as_deref(),
2191                rate_limits: &message.rate_limits,
2192                reservoir_counters: &message.reservoir_counters,
2193            };
2194
2195            let message = ProcessEnvelopeGrouped {
2196                group,
2197                envelope,
2198                ctx,
2199            };
2200
2201            let result = metric!(
2202                timer(RelayTimers::EnvelopeProcessingTime),
2203                group = group.variant(),
2204                { self.process(&mut cogs, message).await }
2205            );
2206
2207            match result {
2208                Ok(Some(envelope)) => self.submit_upstream(&mut cogs, envelope),
2209                Ok(None) => {}
2210                Err(error) if error.is_unexpected() => {
2211                    relay_log::error!(
2212                        tags.project_key = %project_key,
2213                        error = &error as &dyn Error,
2214                        "error processing envelope"
2215                    )
2216                }
2217                Err(error) => {
2218                    relay_log::debug!(
2219                        tags.project_key = %project_key,
2220                        error = &error as &dyn Error,
2221                        "error processing envelope"
2222                    )
2223                }
2224            }
2225        }
2226    }
2227
2228    fn handle_process_metrics(&self, cogs: &mut Token, message: ProcessMetrics) {
2229        let ProcessMetrics {
2230            data,
2231            project_key,
2232            received_at,
2233            sent_at,
2234            source,
2235        } = message;
2236
2237        let received_timestamp =
2238            UnixTimestamp::from_datetime(received_at).unwrap_or(UnixTimestamp::now());
2239
2240        let mut buckets = data.into_buckets(received_timestamp);
2241        if buckets.is_empty() {
2242            return;
2243        };
2244        cogs.update(relay_metrics::cogs::BySize(&buckets));
2245
2246        let clock_drift_processor =
2247            ClockDriftProcessor::new(sent_at, received_at).at_least(MINIMUM_CLOCK_DRIFT);
2248
2249        buckets.retain_mut(|bucket| {
2250            if let Err(error) = relay_metrics::normalize_bucket(bucket) {
2251                relay_log::debug!(error = &error as &dyn Error, "dropping bucket {bucket:?}");
2252                return false;
2253            }
2254
2255            if !self::metrics::is_valid_namespace(bucket) {
2256                return false;
2257            }
2258
2259            clock_drift_processor.process_timestamp(&mut bucket.timestamp);
2260
2261            if !matches!(source, BucketSource::Internal) {
2262                bucket.metadata = BucketMetadata::new(received_timestamp);
2263            }
2264
2265            true
2266        });
2267
2268        let project = self.inner.project_cache.get(project_key);
2269
2270        // Best effort check to filter and rate limit buckets, if there is no project state
2271        // available at the current time, we will check again after flushing.
2272        let buckets = match project.state() {
2273            ProjectState::Enabled(project_info) => {
2274                let rate_limits = project.rate_limits().current_limits();
2275                self.check_buckets(project_key, project_info, &rate_limits, buckets)
2276            }
2277            _ => buckets,
2278        };
2279
2280        relay_log::trace!("merging metric buckets into the aggregator");
2281        self.inner
2282            .addrs
2283            .aggregator
2284            .send(MergeBuckets::new(project_key, buckets));
2285    }
2286
2287    fn handle_process_batched_metrics(&self, cogs: &mut Token, message: ProcessBatchedMetrics) {
2288        let ProcessBatchedMetrics {
2289            payload,
2290            source,
2291            received_at,
2292            sent_at,
2293        } = message;
2294
2295        #[derive(serde::Deserialize)]
2296        struct Wrapper {
2297            buckets: HashMap<ProjectKey, Vec<Bucket>>,
2298        }
2299
2300        let buckets = match serde_json::from_slice(&payload) {
2301            Ok(Wrapper { buckets }) => buckets,
2302            Err(error) => {
2303                relay_log::debug!(
2304                    error = &error as &dyn Error,
2305                    "failed to parse batched metrics",
2306                );
2307                metric!(counter(RelayCounters::MetricBucketsParsingFailed) += 1);
2308                return;
2309            }
2310        };
2311
2312        for (project_key, buckets) in buckets {
2313            self.handle_process_metrics(
2314                cogs,
2315                ProcessMetrics {
2316                    data: MetricData::Parsed(buckets),
2317                    project_key,
2318                    source,
2319                    received_at,
2320                    sent_at,
2321                },
2322            )
2323        }
2324    }
2325
    /// Submits processed results to the next stage.
    ///
    /// In processing Relays with a store forwarder configured, data is handed to
    /// the store (and optionally the upload service for attachments). Otherwise,
    /// the result is serialized into an envelope and sent to the upstream Relay
    /// over HTTP.
    fn submit_upstream(&self, cogs: &mut Token, submit: Submit<'_>) {
        // Attribute all remaining work for this submission to the "submit" COGS category.
        let _submit = cogs.start_category("submit");

        #[cfg(feature = "processing")]
        if self.inner.config.processing_enabled()
            && let Some(store_forwarder) = &self.inner.addrs.store_forwarder
        {
            use crate::processing::StoreHandle;

            let upload = self.inner.addrs.upload.as_ref();
            match submit {
                Submit::Envelope(envelope) => {
                    let envelope_has_attachments = envelope
                        .envelope()
                        .items()
                        .any(|item| *item.ty() == ItemType::Attachment);
                    // Whether Relay will store this attachment in objectstore or use kafka like before.
                    // Sampled from a global option, presumably to allow a gradual rollout.
                    let use_objectstore = || {
                        let options = &self.inner.global_config.current().options;
                        utils::sample(options.objectstore_attachments_sample_rate).is_keep()
                    };

                    if let Some(upload) = &self.inner.addrs.upload
                        && envelope_has_attachments
                        && use_objectstore()
                    {
                        // the `UploadService` will upload all attachments, and then forward the envelope to the `StoreService`.
                        upload.send(StoreEnvelope { envelope })
                    } else {
                        store_forwarder.send(StoreEnvelope { envelope })
                    }
                }
                Submit::Output { output, ctx } => output
                    .forward_store(StoreHandle::new(store_forwarder, upload), ctx)
                    .unwrap_or_else(|err| err.into_inner()),
            }
            return;
        }

        // Non-processing path: everything is forwarded upstream as an envelope.
        let mut envelope = match submit {
            Submit::Envelope(envelope) => envelope,
            Submit::Output { output, ctx } => match output.serialize_envelope(ctx) {
                Ok(envelope) => ManagedEnvelope::from(envelope).into_processed(),
                Err(_) => {
                    relay_log::error!("failed to serialize output to an envelope");
                    return;
                }
            },
        };

        // Nothing left to send; mark the envelope as accepted.
        if envelope.envelope_mut().is_empty() {
            envelope.accept();
            return;
        }

        // Override the `sent_at` timestamp. Since the envelope went through basic
        // normalization, all timestamps have been corrected. We propagate the new
        // `sent_at` to allow the next Relay to double-check this timestamp and
        // potentially apply correction again. This is done as close to sending as
        // possible so that we avoid internal delays.
        envelope.envelope_mut().set_sent_at(Utc::now());

        relay_log::trace!("sending envelope to sentry endpoint");
        let http_encoding = self.inner.config.http_encoding();
        // Serialize the envelope and compress it with the configured HTTP encoding.
        let result = envelope.envelope().to_vec().and_then(|v| {
            encode_payload(&v.into(), http_encoding).map_err(EnvelopeError::PayloadIoFailed)
        });

        match result {
            Ok(body) => {
                self.inner
                    .addrs
                    .upstream_relay
                    .send(SendRequest(SendEnvelope {
                        envelope,
                        body,
                        http_encoding,
                        project_cache: self.inner.project_cache.clone(),
                    }));
            }
            Err(error) => {
                // Errors are only logged for what we consider an internal discard reason. These
                // indicate errors in the infrastructure or implementation bugs.
                relay_log::error!(
                    error = &error as &dyn Error,
                    tags.project_key = %envelope.scoping().project_key,
                    "failed to serialize envelope payload"
                );

                envelope.reject(Outcome::Invalid(DiscardReason::Internal));
            }
        }
    }
2419
2420    fn handle_submit_client_reports(&self, cogs: &mut Token, message: SubmitClientReports) {
2421        let SubmitClientReports {
2422            client_reports,
2423            scoping,
2424        } = message;
2425
2426        let upstream = self.inner.config.upstream_descriptor();
2427        let dsn = PartialDsn::outbound(&scoping, upstream);
2428
2429        let mut envelope = Envelope::from_request(None, RequestMeta::outbound(dsn));
2430        for client_report in client_reports {
2431            match client_report.serialize() {
2432                Ok(payload) => {
2433                    let mut item = Item::new(ItemType::ClientReport);
2434                    item.set_payload(ContentType::Json, payload);
2435                    envelope.add_item(item);
2436                }
2437                Err(error) => {
2438                    relay_log::error!(
2439                        error = &error as &dyn std::error::Error,
2440                        "failed to serialize client report"
2441                    );
2442                }
2443            }
2444        }
2445
2446        let envelope = ManagedEnvelope::new(envelope, self.inner.addrs.outcome_aggregator.clone());
2447        self.submit_upstream(cogs, Submit::Envelope(envelope.into_processed()));
2448    }
2449
2450    fn check_buckets(
2451        &self,
2452        project_key: ProjectKey,
2453        project_info: &ProjectInfo,
2454        rate_limits: &RateLimits,
2455        buckets: Vec<Bucket>,
2456    ) -> Vec<Bucket> {
2457        let Some(scoping) = project_info.scoping(project_key) else {
2458            relay_log::error!(
2459                tags.project_key = project_key.as_str(),
2460                "there is no scoping: dropping {} buckets",
2461                buckets.len(),
2462            );
2463            return Vec::new();
2464        };
2465
2466        let mut buckets = self::metrics::apply_project_info(
2467            buckets,
2468            &self.inner.metric_outcomes,
2469            project_info,
2470            scoping,
2471        );
2472
2473        let namespaces: BTreeSet<MetricNamespace> = buckets
2474            .iter()
2475            .filter_map(|bucket| bucket.name.try_namespace())
2476            .collect();
2477
2478        for namespace in namespaces {
2479            let limits = rate_limits.check_with_quotas(
2480                project_info.get_quotas(),
2481                scoping.item(DataCategory::MetricBucket),
2482            );
2483
2484            if limits.is_limited() {
2485                let rejected;
2486                (buckets, rejected) = utils::split_off(buckets, |bucket| {
2487                    bucket.name.try_namespace() == Some(namespace)
2488                });
2489
2490                let reason_code = limits.longest().and_then(|limit| limit.reason_code.clone());
2491                self.inner.metric_outcomes.track(
2492                    scoping,
2493                    &rejected,
2494                    Outcome::RateLimited(reason_code),
2495                );
2496            }
2497        }
2498
2499        let quotas = project_info.config.quotas.clone();
2500        match MetricsLimiter::create(buckets, quotas, scoping) {
2501            Ok(mut bucket_limiter) => {
2502                bucket_limiter.enforce_limits(rate_limits, &self.inner.metric_outcomes);
2503                bucket_limiter.into_buckets()
2504            }
2505            Err(buckets) => buckets,
2506        }
2507    }
2508
    /// Applies rate limits to metric buckets using the central Redis rate limiter.
    ///
    /// Limits are checked once per metric namespace, with the number of buckets in
    /// that namespace as the quantity. Buckets of a limited namespace are split off,
    /// tracked as rate-limited outcomes, and the observed limits are merged into the
    /// project cache. Finally, transaction/span count based limits are applied via
    /// [`Self::apply_other_rate_limits`].
    #[cfg(feature = "processing")]
    async fn rate_limit_buckets(
        &self,
        scoping: Scoping,
        project_info: &ProjectInfo,
        mut buckets: Vec<Bucket>,
    ) -> Vec<Bucket> {
        // Without a Redis rate limiter there is nothing to enforce here.
        let Some(rate_limiter) = &self.inner.rate_limiter else {
            return buckets;
        };

        let global_config = self.inner.global_config.current();
        // Bucket count per namespace; used as the rate limiting quantity below.
        let namespaces = buckets
            .iter()
            .filter_map(|bucket| bucket.name.try_namespace())
            .counts();

        let quotas = CombinedQuotas::new(&global_config, project_info.get_quotas());

        for (namespace, quantity) in namespaces {
            let item_scoping = scoping.metric_bucket(namespace);

            let limits = match rate_limiter
                .is_rate_limited(quotas, item_scoping, quantity, false)
                .await
            {
                Ok(limits) => limits,
                Err(err) => {
                    relay_log::error!(
                        error = &err as &dyn std::error::Error,
                        "failed to check redis rate limits"
                    );
                    // Redis is unavailable: stop checking the remaining namespaces
                    // instead of repeating the failing call.
                    break;
                }
            };

            if limits.is_limited() {
                // Remove all buckets of the limited namespace and report them as dropped.
                let rejected;
                (buckets, rejected) = utils::split_off(buckets, |bucket| {
                    bucket.name.try_namespace() == Some(namespace)
                });

                let reason_code = limits.longest().and_then(|limit| limit.reason_code.clone());
                self.inner.metric_outcomes.track(
                    scoping,
                    &rejected,
                    Outcome::RateLimited(reason_code),
                );

                // Cache the observed limits so subsequent traffic is limited without
                // another Redis round-trip.
                self.inner
                    .project_cache
                    .get(item_scoping.scoping.project_key)
                    .rate_limits()
                    .merge(limits);
            }
        }

        match MetricsLimiter::create(buckets, project_info.config.quotas.clone(), scoping) {
            // No transaction/span quotas apply; return the buckets unchanged.
            Err(buckets) => buckets,
            Ok(bucket_limiter) => self.apply_other_rate_limits(bucket_limiter).await,
        }
    }
2571
2572    /// Check and apply rate limits to metrics buckets for transactions and spans.
2573    #[cfg(feature = "processing")]
2574    async fn apply_other_rate_limits(&self, mut bucket_limiter: MetricsLimiter) -> Vec<Bucket> {
2575        relay_log::trace!("handle_rate_limit_buckets");
2576
2577        let scoping = *bucket_limiter.scoping();
2578
2579        if let Some(rate_limiter) = self.inner.rate_limiter.as_ref() {
2580            let global_config = self.inner.global_config.current();
2581            let quotas = CombinedQuotas::new(&global_config, bucket_limiter.quotas());
2582
2583            // We set over_accept_once such that the limit is actually reached, which allows subsequent
2584            // calls with quantity=0 to be rate limited.
2585            let over_accept_once = true;
2586            let mut rate_limits = RateLimits::new();
2587
2588            for category in [DataCategory::Transaction, DataCategory::Span] {
2589                let count = bucket_limiter.count(category);
2590
2591                let timer = Instant::now();
2592                let mut is_limited = false;
2593
2594                if let Some(count) = count {
2595                    match rate_limiter
2596                        .is_rate_limited(quotas, scoping.item(category), count, over_accept_once)
2597                        .await
2598                    {
2599                        Ok(limits) => {
2600                            is_limited = limits.is_limited();
2601                            rate_limits.merge(limits)
2602                        }
2603                        Err(e) => relay_log::error!(error = &e as &dyn Error),
2604                    }
2605                }
2606
2607                relay_statsd::metric!(
2608                    timer(RelayTimers::RateLimitBucketsDuration) = timer.elapsed(),
2609                    category = category.name(),
2610                    limited = if is_limited { "true" } else { "false" },
2611                    count = match count {
2612                        None => "none",
2613                        Some(0) => "0",
2614                        Some(1) => "1",
2615                        Some(1..=10) => "10",
2616                        Some(1..=25) => "25",
2617                        Some(1..=50) => "50",
2618                        Some(51..=100) => "100",
2619                        Some(101..=500) => "500",
2620                        _ => "> 500",
2621                    },
2622                );
2623            }
2624
2625            if rate_limits.is_limited() {
2626                let was_enforced =
2627                    bucket_limiter.enforce_limits(&rate_limits, &self.inner.metric_outcomes);
2628
2629                if was_enforced {
2630                    // Update the rate limits in the project cache.
2631                    self.inner
2632                        .project_cache
2633                        .get(scoping.project_key)
2634                        .rate_limits()
2635                        .merge(rate_limits);
2636                }
2637            }
2638        }
2639
2640        bucket_limiter.into_buckets()
2641    }
2642
    /// Cardinality limits the passed buckets and returns a filtered vector of only accepted buckets.
    ///
    /// Depending on `cardinality_limiter_mode` this either skips limiting entirely
    /// (`Disabled`), only reports what would have been limited (`Passive`), or drops
    /// the exceeding buckets and tracks an outcome for each of them.
    #[cfg(feature = "processing")]
    async fn cardinality_limit_buckets(
        &self,
        scoping: Scoping,
        limits: &[CardinalityLimit],
        buckets: Vec<Bucket>,
    ) -> Vec<Bucket> {
        let global_config = self.inner.global_config.current();
        let cardinality_limiter_mode = global_config.options.cardinality_limiter_mode;

        if matches!(cardinality_limiter_mode, CardinalityLimiterMode::Disabled) {
            return buckets;
        }

        // No limiter configured: pass everything through unchanged.
        let Some(ref limiter) = self.inner.cardinality_limiter else {
            return buckets;
        };

        let scope = relay_cardinality::Scoping {
            organization_id: scoping.organization_id,
            project_id: scoping.project_id,
        };

        let limits = match limiter
            .check_cardinality_limits(scope, limits, buckets)
            .await
        {
            Ok(limits) => limits,
            // On limiter failure the buckets are returned untouched (fail open).
            Err((buckets, error)) => {
                relay_log::error!(
                    error = &error as &dyn std::error::Error,
                    "cardinality limiter failed"
                );
                return buckets;
            }
        };

        // Sampled error logging for exceeded limits, tagged per organization.
        let error_sample_rate = global_config.options.cardinality_limiter_error_sample_rate;
        if !limits.exceeded_limits().is_empty() && utils::sample(error_sample_rate).is_keep() {
            for limit in limits.exceeded_limits() {
                relay_log::with_scope(
                    |scope| {
                        // Set the organization as user so we can alert on distinct org_ids.
                        scope.set_user(Some(relay_log::sentry::User {
                            id: Some(scoping.organization_id.to_string()),
                            ..Default::default()
                        }));
                    },
                    || {
                        relay_log::error!(
                            tags.organization_id = scoping.organization_id.value(),
                            tags.limit_id = limit.id,
                            tags.passive = limit.passive,
                            "Cardinality Limit"
                        );
                    },
                );
            }
        }

        // Emit cardinality reports regardless of the limiter mode.
        for (limit, reports) in limits.cardinality_reports() {
            for report in reports {
                self.inner
                    .metric_outcomes
                    .cardinality(scoping, limit, report);
            }
        }

        // Passive mode only observes; all buckets continue unmodified.
        if matches!(cardinality_limiter_mode, CardinalityLimiterMode::Passive) {
            return limits.into_source();
        }

        let CardinalityLimitsSplit { accepted, rejected } = limits.into_split();

        // Track an outcome per rejected bucket before discarding it.
        for (bucket, exceeded) in rejected {
            self.inner.metric_outcomes.track(
                scoping,
                &[bucket],
                Outcome::CardinalityLimited(exceeded.id.clone()),
            );
        }
        accepted
    }
2727
2728    /// Processes metric buckets and sends them to kafka.
2729    ///
2730    /// This function runs the following steps:
2731    ///  - cardinality limiting
2732    ///  - rate limiting
2733    ///  - submit to `StoreForwarder`
2734    #[cfg(feature = "processing")]
2735    async fn encode_metrics_processing(
2736        &self,
2737        message: FlushBuckets,
2738        store_forwarder: &Addr<Store>,
2739    ) {
2740        use crate::constants::DEFAULT_EVENT_RETENTION;
2741        use crate::services::store::StoreMetrics;
2742
2743        for ProjectBuckets {
2744            buckets,
2745            scoping,
2746            project_info,
2747            ..
2748        } in message.buckets.into_values()
2749        {
2750            let buckets = self
2751                .rate_limit_buckets(scoping, &project_info, buckets)
2752                .await;
2753
2754            let limits = project_info.get_cardinality_limits();
2755            let buckets = self
2756                .cardinality_limit_buckets(scoping, limits, buckets)
2757                .await;
2758
2759            if buckets.is_empty() {
2760                continue;
2761            }
2762
2763            let retention = project_info
2764                .config
2765                .event_retention
2766                .unwrap_or(DEFAULT_EVENT_RETENTION);
2767
2768            // The store forwarder takes care of bucket splitting internally, so we can submit the
2769            // entire list of buckets. There is no batching needed here.
2770            store_forwarder.send(StoreMetrics {
2771                buckets,
2772                scoping,
2773                retention,
2774            });
2775        }
2776    }
2777
2778    /// Serializes metric buckets to JSON and sends them to the upstream.
2779    ///
2780    /// This function runs the following steps:
2781    ///  - partitioning
2782    ///  - batching by configured size limit
2783    ///  - serialize to JSON and pack in an envelope
2784    ///  - submit the envelope to upstream or kafka depending on configuration
2785    ///
2786    /// Cardinality limiting and rate limiting run only in processing Relays as they both require
2787    /// access to the central Redis instance. Cached rate limits are applied in the project cache
2788    /// already.
2789    fn encode_metrics_envelope(&self, cogs: &mut Token, message: FlushBuckets) {
2790        let FlushBuckets {
2791            partition_key,
2792            buckets,
2793        } = message;
2794
2795        let batch_size = self.inner.config.metrics_max_batch_size_bytes();
2796        let upstream = self.inner.config.upstream_descriptor();
2797
2798        for ProjectBuckets {
2799            buckets, scoping, ..
2800        } in buckets.values()
2801        {
2802            let dsn = PartialDsn::outbound(scoping, upstream);
2803
2804            relay_statsd::metric!(
2805                distribution(RelayDistributions::PartitionKeys) = u64::from(partition_key)
2806            );
2807
2808            let mut num_batches = 0;
2809            for batch in BucketsView::from(buckets).by_size(batch_size) {
2810                let mut envelope = Envelope::from_request(None, RequestMeta::outbound(dsn.clone()));
2811
2812                let mut item = Item::new(ItemType::MetricBuckets);
2813                item.set_source_quantities(crate::metrics::extract_quantities(batch));
2814                item.set_payload(ContentType::Json, serde_json::to_vec(&buckets).unwrap());
2815                envelope.add_item(item);
2816
2817                let mut envelope =
2818                    ManagedEnvelope::new(envelope, self.inner.addrs.outcome_aggregator.clone());
2819                envelope
2820                    .set_partition_key(Some(partition_key))
2821                    .scope(*scoping);
2822
2823                relay_statsd::metric!(
2824                    distribution(RelayDistributions::BucketsPerBatch) = batch.len() as u64
2825                );
2826
2827                self.submit_upstream(cogs, Submit::Envelope(envelope.into_processed()));
2828                num_batches += 1;
2829            }
2830
2831            relay_statsd::metric!(
2832                distribution(RelayDistributions::BatchesPerPartition) = num_batches
2833            );
2834        }
2835    }
2836
2837    /// Creates a [`SendMetricsRequest`] and sends it to the upstream relay.
2838    fn send_global_partition(&self, partition_key: u32, partition: &mut Partition<'_>) {
2839        if partition.is_empty() {
2840            return;
2841        }
2842
2843        let (unencoded, project_info) = partition.take();
2844        let http_encoding = self.inner.config.http_encoding();
2845        let encoded = match encode_payload(&unencoded, http_encoding) {
2846            Ok(payload) => payload,
2847            Err(error) => {
2848                let error = &error as &dyn std::error::Error;
2849                relay_log::error!(error, "failed to encode metrics payload");
2850                return;
2851            }
2852        };
2853
2854        let request = SendMetricsRequest {
2855            partition_key: partition_key.to_string(),
2856            unencoded,
2857            encoded,
2858            project_info,
2859            http_encoding,
2860            metric_outcomes: self.inner.metric_outcomes.clone(),
2861        };
2862
2863        self.inner.addrs.upstream_relay.send(SendRequest(request));
2864    }
2865
2866    /// Serializes metric buckets to JSON and sends them to the upstream via the global endpoint.
2867    ///
2868    /// This function is similar to [`Self::encode_metrics_envelope`], but sends a global batched
2869    /// payload directly instead of per-project Envelopes.
2870    ///
2871    /// This function runs the following steps:
2872    ///  - partitioning
2873    ///  - batching by configured size limit
2874    ///  - serialize to JSON
2875    ///  - submit directly to the upstream
2876    ///
2877    /// Cardinality limiting and rate limiting run only in processing Relays as they both require
2878    /// access to the central Redis instance. Cached rate limits are applied in the project cache
2879    /// already.
2880    fn encode_metrics_global(&self, message: FlushBuckets) {
2881        let FlushBuckets {
2882            partition_key,
2883            buckets,
2884        } = message;
2885
2886        let batch_size = self.inner.config.metrics_max_batch_size_bytes();
2887        let mut partition = Partition::new(batch_size);
2888        let mut partition_splits = 0;
2889
2890        for ProjectBuckets {
2891            buckets, scoping, ..
2892        } in buckets.values()
2893        {
2894            for bucket in buckets {
2895                let mut remaining = Some(BucketView::new(bucket));
2896
2897                while let Some(bucket) = remaining.take() {
2898                    if let Some(next) = partition.insert(bucket, *scoping) {
2899                        // A part of the bucket could not be inserted. Take the partition and submit
2900                        // it immediately. Repeat until the final part was inserted. This should
2901                        // always result in a request, otherwise we would enter an endless loop.
2902                        self.send_global_partition(partition_key, &mut partition);
2903                        remaining = Some(next);
2904                        partition_splits += 1;
2905                    }
2906                }
2907            }
2908        }
2909
2910        if partition_splits > 0 {
2911            metric!(distribution(RelayDistributions::PartitionSplits) = partition_splits);
2912        }
2913
2914        self.send_global_partition(partition_key, &mut partition);
2915    }
2916
2917    async fn handle_flush_buckets(&self, cogs: &mut Token, mut message: FlushBuckets) {
2918        for (project_key, pb) in message.buckets.iter_mut() {
2919            let buckets = std::mem::take(&mut pb.buckets);
2920            pb.buckets =
2921                self.check_buckets(*project_key, &pb.project_info, &pb.rate_limits, buckets);
2922        }
2923
2924        #[cfg(feature = "processing")]
2925        if self.inner.config.processing_enabled()
2926            && let Some(ref store_forwarder) = self.inner.addrs.store_forwarder
2927        {
2928            return self
2929                .encode_metrics_processing(message, store_forwarder)
2930                .await;
2931        }
2932
2933        if self.inner.config.http_global_metrics() {
2934            self.encode_metrics_global(message)
2935        } else {
2936            self.encode_metrics_envelope(cogs, message)
2937        }
2938    }
2939
    /// Returns `true` if a Redis rate limiter is configured (test-only helper).
    #[cfg(all(test, feature = "processing"))]
    fn redis_rate_limiter_enabled(&self) -> bool {
        self.inner.rate_limiter.is_some()
    }
2944
2945    async fn handle_message(&self, message: EnvelopeProcessor) {
2946        let ty = message.variant();
2947        let feature_weights = self.feature_weights(&message);
2948
2949        metric!(timer(RelayTimers::ProcessMessageDuration), message = ty, {
2950            let mut cogs = self.inner.cogs.timed(ResourceId::Relay, feature_weights);
2951
2952            match message {
2953                EnvelopeProcessor::ProcessEnvelope(m) => {
2954                    self.handle_process_envelope(&mut cogs, *m).await
2955                }
2956                EnvelopeProcessor::ProcessProjectMetrics(m) => {
2957                    self.handle_process_metrics(&mut cogs, *m)
2958                }
2959                EnvelopeProcessor::ProcessBatchedMetrics(m) => {
2960                    self.handle_process_batched_metrics(&mut cogs, *m)
2961                }
2962                EnvelopeProcessor::FlushBuckets(m) => {
2963                    self.handle_flush_buckets(&mut cogs, *m).await
2964                }
2965                EnvelopeProcessor::SubmitClientReports(m) => {
2966                    self.handle_submit_client_reports(&mut cogs, *m)
2967                }
2968            }
2969        });
2970    }
2971
2972    fn feature_weights(&self, message: &EnvelopeProcessor) -> FeatureWeights {
2973        match message {
2974            // Envelope is split later and tokens are attributed then.
2975            EnvelopeProcessor::ProcessEnvelope(_) => AppFeature::Unattributed.into(),
2976            EnvelopeProcessor::ProcessProjectMetrics(_) => AppFeature::Unattributed.into(),
2977            EnvelopeProcessor::ProcessBatchedMetrics(_) => AppFeature::Unattributed.into(),
2978            EnvelopeProcessor::FlushBuckets(v) => v
2979                .buckets
2980                .values()
2981                .map(|s| {
2982                    if self.inner.config.processing_enabled() {
2983                        // Processing does not encode the metrics but instead rate and cardinality
2984                        // limits the metrics, which scales by count and not size.
2985                        relay_metrics::cogs::ByCount(&s.buckets).into()
2986                    } else {
2987                        relay_metrics::cogs::BySize(&s.buckets).into()
2988                    }
2989                })
2990                .fold(FeatureWeights::none(), FeatureWeights::merge),
2991            EnvelopeProcessor::SubmitClientReports(_) => AppFeature::ClientReports.into(),
2992        }
2993    }
2994}
2995
2996impl Service for EnvelopeProcessorService {
2997    type Interface = EnvelopeProcessor;
2998
2999    async fn run(self, mut rx: relay_system::Receiver<Self::Interface>) {
3000        while let Some(message) = rx.recv().await {
3001            let service = self.clone();
3002            self.inner
3003                .pool
3004                .spawn_async(
3005                    async move {
3006                        service.handle_message(message).await;
3007                    }
3008                    .boxed(),
3009                )
3010                .await;
3011        }
3012    }
3013}
3014
/// Result of the enforcement of rate limiting.
///
/// If the event is already `None` or it's rate limited, it will be `None`
/// within the [`Annotated`].
struct EnforcementResult {
    /// The (possibly removed) event after enforcement.
    event: Annotated<Event>,
    /// Rate limits produced during enforcement; only read in processing builds.
    #[cfg_attr(not(feature = "processing"), expect(dead_code))]
    rate_limits: RateLimits,
}
3024
3025impl EnforcementResult {
3026    /// Creates a new [`EnforcementResult`].
3027    pub fn new(event: Annotated<Event>, rate_limits: RateLimits) -> Self {
3028        Self { event, rate_limits }
3029    }
3030}
3031
/// Strategy used to enforce rate limits during envelope processing.
#[derive(Clone)]
enum RateLimiter {
    /// Checks only rate limits cached from previous decisions.
    Cached,
    /// Consistently checks rate limits against Redis (processing builds only).
    #[cfg(feature = "processing")]
    Consistent(Arc<RedisRateLimiter<GlobalRateLimitsServiceHandle>>),
}
3038
impl RateLimiter {
    /// Enforces rate limits on the envelope and the already-extracted event.
    ///
    /// Short-circuits when there is nothing to limit (empty envelope and no event)
    /// or when no quotas are configured. Otherwise computes enforcement via
    /// [`EnvelopeLimiter`] — consulting Redis for the `Consistent` variant and the
    /// cached limits otherwise — applies it to the envelope items and the extracted
    /// metrics, and returns the remaining event together with the active rate
    /// limits. If the event itself was rate limited, the returned event is empty.
    async fn enforce<Group>(
        &self,
        managed_envelope: &mut TypedEnvelope<Group>,
        event: Annotated<Event>,
        _extracted_metrics: &mut ProcessingExtractedMetrics,
        ctx: processing::Context<'_>,
    ) -> Result<EnforcementResult, ProcessingError> {
        // Nothing to limit: no envelope items and no event.
        if managed_envelope.envelope().is_empty() && event.value().is_none() {
            return Ok(EnforcementResult::new(event, RateLimits::default()));
        }

        let quotas = CombinedQuotas::new(ctx.global_config, ctx.project_info.get_quotas());
        if quotas.is_empty() {
            return Ok(EnforcementResult::new(event, RateLimits::default()));
        }

        let event_category = event_category(&event);

        // We extract the rate limiters, in case we perform consistent rate limiting, since we will
        // need Redis access.
        //
        // When invoking the rate limiter, capture if the event item has been rate limited to also
        // remove it from the processing state eventually.
        let this = self.clone();
        let mut envelope_limiter =
            EnvelopeLimiter::new(CheckLimits::All, move |item_scope, _quantity| {
                let this = this.clone();

                async move {
                    match this {
                        #[cfg(feature = "processing")]
                        RateLimiter::Consistent(rate_limiter) => Ok::<_, ProcessingError>(
                            rate_limiter
                                .is_rate_limited(quotas, item_scope, _quantity, false)
                                .await?,
                        ),
                        // Cached variant: only consult limits already known locally.
                        _ => Ok::<_, ProcessingError>(
                            ctx.rate_limits.check_with_quotas(quotas, item_scope),
                        ),
                    }
                }
            });

        // Tell the envelope limiter about the event, since it has been removed from the Envelope at
        // this stage in processing.
        if let Some(category) = event_category {
            envelope_limiter.assume_event(category);
        }

        let scoping = managed_envelope.scoping();
        let (enforcement, rate_limits) = metric!(timer(RelayTimers::EventProcessingRateLimiting), type = self.name(), {
            envelope_limiter
                .compute(managed_envelope.envelope_mut(), &scoping)
                .await
        })?;
        let event_active = enforcement.is_event_active();

        // Use the same rate limits as used for the envelope on the metrics.
        // Those rate limits should not be checked for expiry or similar to ensure a consistent
        // limiting of envelope items and metrics.
        #[cfg(feature = "processing")]
        _extracted_metrics.apply_enforcement(&enforcement, matches!(self, Self::Consistent(_)));
        enforcement.apply_with_outcomes(managed_envelope);

        if event_active {
            debug_assert!(managed_envelope.envelope().is_empty());
            return Ok(EnforcementResult::new(Annotated::empty(), rate_limits));
        }

        Ok(EnforcementResult::new(event, rate_limits))
    }

    /// Returns the strategy name, used as the `type` tag on the rate limiting timer metric.
    fn name(&self) -> &'static str {
        match self {
            Self::Cached => "cached",
            #[cfg(feature = "processing")]
            Self::Consistent(_) => "consistent",
        }
    }
}
3120
3121pub fn encode_payload(body: &Bytes, http_encoding: HttpEncoding) -> Result<Bytes, std::io::Error> {
3122    let envelope_body: Vec<u8> = match http_encoding {
3123        HttpEncoding::Identity => return Ok(body.clone()),
3124        HttpEncoding::Deflate => {
3125            let mut encoder = ZlibEncoder::new(Vec::new(), Compression::default());
3126            encoder.write_all(body.as_ref())?;
3127            encoder.finish()?
3128        }
3129        HttpEncoding::Gzip => {
3130            let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
3131            encoder.write_all(body.as_ref())?;
3132            encoder.finish()?
3133        }
3134        HttpEncoding::Br => {
3135            // Use default buffer size (via 0), medium quality (5), and the default lgwin (22).
3136            let mut encoder = BrotliEncoder::new(Vec::new(), 0, 5, 22);
3137            encoder.write_all(body.as_ref())?;
3138            encoder.into_inner()
3139        }
3140        HttpEncoding::Zstd => {
3141            // Use the fastest compression level, our main objective here is to get the best
3142            // compression ratio for least amount of time spent.
3143            let mut encoder = ZstdEncoder::new(Vec::new(), 1)?;
3144            encoder.write_all(body.as_ref())?;
3145            encoder.finish()?
3146        }
3147    };
3148
3149    Ok(envelope_body.into())
3150}
3151
/// An upstream request that submits an envelope via HTTP.
#[derive(Debug)]
pub struct SendEnvelope {
    /// The processed envelope to submit; provides scoping and request metadata.
    pub envelope: TypedEnvelope<Processed>,
    /// The serialized envelope payload, sent as the HTTP request body.
    pub body: Bytes,
    /// Compression applied to `body`, declared via `Content-Encoding`.
    pub http_encoding: HttpEncoding,
    /// Handle to the project cache, used to merge rate limits returned by the upstream.
    pub project_cache: ProjectCacheHandle,
}
3160
impl UpstreamRequest for SendEnvelope {
    fn method(&self) -> reqwest::Method {
        reqwest::Method::POST
    }

    fn path(&self) -> Cow<'_, str> {
        // The envelope endpoint is scoped by the project id from the envelope's scoping.
        format!("/api/{}/envelope/", self.envelope.scoping().project_id).into()
    }

    fn route(&self) -> &'static str {
        "envelope"
    }

    /// Builds the HTTP request: records the body size metric and sets headers from
    /// the envelope's request metadata before attaching the serialized body.
    fn build(&mut self, builder: &mut http::RequestBuilder) -> Result<(), http::HttpError> {
        let envelope_body = self.body.clone();
        metric!(
            distribution(RelayDistributions::UpstreamEnvelopeBodySize) = envelope_body.len() as u64
        );

        let meta = &self.envelope.meta();
        // Forward the partition key, if any, as a shard header.
        let shard = self.envelope.partition_key().map(|p| p.to_string());
        builder
            .content_encoding(self.http_encoding)
            .header_opt("Origin", meta.origin().map(|url| url.as_str()))
            .header_opt("User-Agent", meta.user_agent())
            .header("X-Sentry-Auth", meta.auth_header())
            .header("X-Forwarded-For", meta.forwarded_for())
            .header("Content-Type", envelope::CONTENT_TYPE)
            .header_opt("X-Sentry-Relay-Shard", shard)
            .body(envelope_body);

        Ok(())
    }

    fn sign(&mut self) -> Option<Sign> {
        Some(Sign::Optional(SignatureType::RequestSign))
    }

    /// Handles the upstream response: accepts the envelope on success or when the
    /// upstream received the request, rejects it otherwise.
    fn respond(
        self: Box<Self>,
        result: Result<http::Response, UpstreamRequestError>,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> {
        Box::pin(async move {
            let result = match result {
                Ok(mut response) => response.consume().await.map_err(UpstreamRequestError::Http),
                Err(error) => Err(error),
            };

            match result {
                Ok(()) => self.envelope.accept(),
                // The upstream received the request, so it is responsible for outcomes
                // from here on; accept the envelope locally.
                Err(error) if error.is_received() => {
                    let scoping = self.envelope.scoping();
                    self.envelope.accept();

                    // Propagate rate limits returned by the upstream into the project cache.
                    if let UpstreamRequestError::RateLimited(limits) = error {
                        self.project_cache
                            .get(scoping.project_key)
                            .rate_limits()
                            .merge(limits.scope(&scoping));
                    }
                }
                Err(error) => {
                    // Errors are only logged for what we consider an internal discard reason. These
                    // indicate errors in the infrastructure or implementation bugs.
                    let mut envelope = self.envelope;
                    envelope.reject(Outcome::Invalid(DiscardReason::Internal));
                    relay_log::error!(
                        error = &error as &dyn Error,
                        tags.project_key = %envelope.scoping().project_key,
                        "error sending envelope"
                    );
                }
            }
        })
    }
}
3237
/// A container for metric buckets from multiple projects.
///
/// This container is used to send metrics to the upstream in global batches as part of the
/// [`FlushBuckets`] message if the `http.global_metrics` option is enabled. The container monitors
/// the size of all metrics and allows to split them into multiple batches. See
/// [`insert`](Self::insert) for more information.
#[derive(Debug)]
struct Partition<'a> {
    /// Maximum size of the partition in bytes.
    max_size: usize,
    /// Remaining capacity in bytes before the partition is full.
    remaining: usize,
    /// Bucket views collected so far, grouped by project key.
    views: HashMap<ProjectKey, Vec<BucketView<'a>>>,
    /// Scoping for each project key contained in `views`.
    project_info: HashMap<ProjectKey, Scoping>,
}
3251
3252impl<'a> Partition<'a> {
3253    /// Creates a new partition with the given maximum size in bytes.
3254    pub fn new(size: usize) -> Self {
3255        Self {
3256            max_size: size,
3257            remaining: size,
3258            views: HashMap::new(),
3259            project_info: HashMap::new(),
3260        }
3261    }
3262
3263    /// Inserts a bucket into the partition, splitting it if necessary.
3264    ///
3265    /// This function attempts to add the bucket to this partition. If the bucket does not fit
3266    /// entirely into the partition given its maximum size, the remaining part of the bucket is
3267    /// returned from this function call.
3268    ///
3269    /// If this function returns `Some(_)`, the partition is full and should be submitted to the
3270    /// upstream immediately. Use [`Self::take`] to retrieve the contents of the
3271    /// partition. Afterwards, the caller is responsible to call this function again with the
3272    /// remaining bucket until it is fully inserted.
3273    pub fn insert(&mut self, bucket: BucketView<'a>, scoping: Scoping) -> Option<BucketView<'a>> {
3274        let (current, next) = bucket.split(self.remaining, Some(self.max_size));
3275
3276        if let Some(current) = current {
3277            self.remaining = self.remaining.saturating_sub(current.estimated_size());
3278            self.views
3279                .entry(scoping.project_key)
3280                .or_default()
3281                .push(current);
3282
3283            self.project_info
3284                .entry(scoping.project_key)
3285                .or_insert(scoping);
3286        }
3287
3288        next
3289    }
3290
3291    /// Returns `true` if the partition does not hold any data.
3292    fn is_empty(&self) -> bool {
3293        self.views.is_empty()
3294    }
3295
3296    /// Returns the serialized buckets for this partition.
3297    ///
3298    /// This empties the partition, so that it can be reused.
3299    fn take(&mut self) -> (Bytes, HashMap<ProjectKey, Scoping>) {
3300        #[derive(serde::Serialize)]
3301        struct Wrapper<'a> {
3302            buckets: &'a HashMap<ProjectKey, Vec<BucketView<'a>>>,
3303        }
3304
3305        let buckets = &self.views;
3306        let payload = serde_json::to_vec(&Wrapper { buckets }).unwrap().into();
3307
3308        let scopings = self.project_info.clone();
3309        self.project_info.clear();
3310
3311        self.views.clear();
3312        self.remaining = self.max_size;
3313
3314        (payload, scopings)
3315    }
3316}
3317
/// An upstream request that submits metric buckets via HTTP.
///
/// This request is not awaited. It automatically tracks outcomes if the request is not received.
#[derive(Debug)]
struct SendMetricsRequest {
    /// If the partition key is set, the request is marked with `X-Sentry-Relay-Shard`.
    partition_key: String,
    /// Serialized metric buckets without encoding applied, used for signing.
    ///
    /// Also parsed back into buckets to produce outcomes if the request never
    /// reaches the upstream.
    unencoded: Bytes,
    /// Serialized metric buckets with the stated HTTP encoding applied.
    encoded: Bytes,
    /// Mapping of all contained project keys to their scoping and extraction mode.
    ///
    /// Used to track outcomes for transmission failures.
    project_info: HashMap<ProjectKey, Scoping>,
    /// Encoding (compression) of the payload.
    http_encoding: HttpEncoding,
    /// Metric outcomes instance to send outcomes on error.
    metric_outcomes: MetricOutcomes,
}
3338
3339impl SendMetricsRequest {
3340    fn create_error_outcomes(self) {
3341        #[derive(serde::Deserialize)]
3342        struct Wrapper {
3343            buckets: HashMap<ProjectKey, Vec<MinimalTrackableBucket>>,
3344        }
3345
3346        let buckets = match serde_json::from_slice(&self.unencoded) {
3347            Ok(Wrapper { buckets }) => buckets,
3348            Err(err) => {
3349                relay_log::error!(
3350                    error = &err as &dyn std::error::Error,
3351                    "failed to parse buckets from failed transmission"
3352                );
3353                return;
3354            }
3355        };
3356
3357        for (key, buckets) in buckets {
3358            let Some(&scoping) = self.project_info.get(&key) else {
3359                relay_log::error!("missing scoping for project key");
3360                continue;
3361            };
3362
3363            self.metric_outcomes.track(
3364                scoping,
3365                &buckets,
3366                Outcome::Invalid(DiscardReason::Internal),
3367            );
3368        }
3369    }
3370}
3371
impl UpstreamRequest for SendMetricsRequest {
    fn set_relay_id(&self) -> bool {
        true
    }

    fn sign(&mut self) -> Option<Sign> {
        // Sign the unencoded payload so the signature is independent of the HTTP encoding.
        Some(Sign::Required(SignatureType::Body(self.unencoded.clone())))
    }

    fn method(&self) -> reqwest::Method {
        reqwest::Method::POST
    }

    fn path(&self) -> Cow<'_, str> {
        "/api/0/relays/metrics/".into()
    }

    fn route(&self) -> &'static str {
        "global_metrics"
    }

    /// Builds the HTTP request: records the encoded body size metric, sets the shard
    /// and content-type headers, and attaches the encoded payload.
    fn build(&mut self, builder: &mut http::RequestBuilder) -> Result<(), http::HttpError> {
        metric!(
            distribution(RelayDistributions::UpstreamMetricsBodySize) = self.encoded.len() as u64
        );

        builder
            .content_encoding(self.http_encoding)
            .header("X-Sentry-Relay-Shard", self.partition_key.as_bytes())
            .header(header::CONTENT_TYPE, b"application/json")
            .body(self.encoded.clone());

        Ok(())
    }

    /// Handles the upstream response. On transmission failure (request never received
    /// by the upstream), tracks discard outcomes for all contained buckets.
    fn respond(
        self: Box<Self>,
        result: Result<http::Response, UpstreamRequestError>,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> {
        Box::pin(async {
            match result {
                Ok(mut response) => {
                    response.consume().await.ok();
                }
                Err(error) => {
                    relay_log::error!(error = &error as &dyn Error, "Failed to send metrics batch");

                    // If the request did not arrive at the upstream, we are responsible for outcomes.
                    // Otherwise, the upstream is responsible to log outcomes.
                    if error.is_received() {
                        return;
                    }

                    self.create_error_outcomes()
                }
            }
        })
    }
}
3431
/// Container for global and project level [`Quota`].
#[derive(Copy, Clone, Debug)]
struct CombinedQuotas<'a> {
    /// Quotas taken from the [`GlobalConfig`].
    global_quotas: &'a [Quota],
    /// Quotas configured on the project.
    project_quotas: &'a [Quota],
}
3438
3439impl<'a> CombinedQuotas<'a> {
3440    /// Returns a new [`CombinedQuotas`].
3441    pub fn new(global_config: &'a GlobalConfig, project_quotas: &'a [Quota]) -> Self {
3442        Self {
3443            global_quotas: &global_config.quotas,
3444            project_quotas,
3445        }
3446    }
3447
3448    /// Returns `true` if both global quotas and project quotas are empty.
3449    pub fn is_empty(&self) -> bool {
3450        self.len() == 0
3451    }
3452
3453    /// Returns the number of both global and project quotas.
3454    pub fn len(&self) -> usize {
3455        self.global_quotas.len() + self.project_quotas.len()
3456    }
3457}
3458
3459impl<'a> IntoIterator for CombinedQuotas<'a> {
3460    type Item = &'a Quota;
3461    type IntoIter = std::iter::Chain<std::slice::Iter<'a, Quota>, std::slice::Iter<'a, Quota>>;
3462
3463    fn into_iter(self) -> Self::IntoIter {
3464        self.global_quotas.iter().chain(self.project_quotas.iter())
3465    }
3466}
3467
3468#[cfg(test)]
3469mod tests {
3470    use std::collections::BTreeMap;
3471
3472    use insta::assert_debug_snapshot;
3473    use relay_base_schema::metrics::{DurationUnit, MetricUnit};
3474    use relay_common::glob2::LazyGlob;
3475    use relay_dynamic_config::ProjectConfig;
3476    use relay_event_normalization::{
3477        MeasurementsConfig, NormalizationConfig, RedactionRule, TransactionNameConfig,
3478        TransactionNameRule,
3479    };
3480    use relay_event_schema::protocol::TransactionSource;
3481    use relay_pii::DataScrubbingConfig;
3482    use similar_asserts::assert_eq;
3483
3484    use crate::metrics_extraction::IntoMetric;
3485    use crate::metrics_extraction::transactions::types::{
3486        CommonTags, TransactionMeasurementTags, TransactionMetric,
3487    };
3488    use crate::testutils::{create_test_processor, create_test_processor_with_addrs};
3489
3490    #[cfg(feature = "processing")]
3491    use {
3492        relay_metrics::BucketValue,
3493        relay_quotas::{QuotaScope, ReasonCode},
3494        relay_test::mock_service,
3495    };
3496
3497    use super::*;
3498
    /// Builds a [`Quota`] fixture for metric buckets with a limit of 0, i.e. one
    /// that rejects everything in its scope.
    #[cfg(feature = "processing")]
    fn mock_quota(id: &str) -> Quota {
        Quota {
            id: Some(id.into()),
            categories: [DataCategory::MetricBucket].into(),
            scope: QuotaScope::Organization,
            scope_id: None,
            limit: Some(0),
            window: None,
            reason_code: None,
            namespace: None,
        }
    }
3512
    /// [`CombinedQuotas`] must report the combined count of global and project quotas
    /// and iterate global quotas before project quotas.
    #[cfg(feature = "processing")]
    #[test]
    fn test_dynamic_quotas() {
        let global_config = GlobalConfig {
            quotas: vec![mock_quota("foo"), mock_quota("bar")],
            ..Default::default()
        };

        let project_quotas = vec![mock_quota("baz"), mock_quota("qux")];

        let dynamic_quotas = CombinedQuotas::new(&global_config, &project_quotas);

        assert_eq!(dynamic_quotas.len(), 4);
        assert!(!dynamic_quotas.is_empty());

        // Global quotas ("foo", "bar") come first, then project quotas ("baz", "qux").
        let quota_ids = dynamic_quotas.into_iter().filter_map(|q| q.id.as_deref());
        assert!(quota_ids.eq(["foo", "bar", "baz", "qux"]));
    }
3531
    /// Ensures that if we ratelimit one batch of buckets in [`FlushBuckets`] message, it won't
    /// also ratelimit the next batches in the same message automatically.
    #[cfg(feature = "processing")]
    #[tokio::test]
    async fn test_ratelimit_per_batch() {
        use relay_base_schema::organization::OrganizationId;
        use relay_protocol::FiniteF64;

        // Two organizations: org 1 will carry a zero-limit quota, org 2 will not.
        let rate_limited_org = Scoping {
            organization_id: OrganizationId::new(1),
            project_id: ProjectId::new(21),
            project_key: ProjectKey::parse("00000000000000000000000000000000").unwrap(),
            key_id: Some(17),
        };

        let not_rate_limited_org = Scoping {
            organization_id: OrganizationId::new(2),
            project_id: ProjectId::new(21),
            project_key: ProjectKey::parse("11111111111111111111111111111111").unwrap(),
            key_id: Some(17),
        };

        let message = {
            let project_info = {
                // Zero-limit quota scoped to the rate limited org's id, so only that
                // org's metric buckets are rejected.
                let quota = Quota {
                    id: Some("testing".into()),
                    categories: [DataCategory::MetricBucket].into(),
                    scope: relay_quotas::QuotaScope::Organization,
                    scope_id: Some(rate_limited_org.organization_id.to_string().into()),
                    limit: Some(0),
                    window: None,
                    reason_code: Some(ReasonCode::new("test")),
                    namespace: None,
                };

                let mut config = ProjectConfig::default();
                config.quotas.push(quota);

                Arc::new(ProjectInfo {
                    config,
                    ..Default::default()
                })
            };

            // One counter bucket per org, both sharing the same project config.
            let project_metrics = |scoping| ProjectBuckets {
                buckets: vec![Bucket {
                    name: "d:transactions/bar".into(),
                    value: BucketValue::Counter(FiniteF64::new(1.0).unwrap()),
                    timestamp: UnixTimestamp::now(),
                    tags: Default::default(),
                    width: 10,
                    metadata: BucketMetadata::default(),
                }],
                rate_limits: Default::default(),
                project_info: project_info.clone(),
                scoping,
            };

            let buckets = hashbrown::HashMap::from([
                (
                    rate_limited_org.project_key,
                    project_metrics(rate_limited_org),
                ),
                (
                    not_rate_limited_org.project_key,
                    project_metrics(not_rate_limited_org),
                ),
            ]);

            FlushBuckets {
                partition_key: 0,
                buckets,
            }
        };

        // ensure the order of the map while iterating is as expected.
        assert_eq!(message.buckets.keys().count(), 2);

        let config = {
            let config_json = serde_json::json!({
                "processing": {
                    "enabled": true,
                    "kafka_config": [],
                    "redis": {
                        "server": std::env::var("RELAY_REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_owned()),
                    }
                }
            });
            Config::from_json_value(config_json).unwrap()
        };

        // Mock store forwarder that records the org id of every metrics message it receives.
        let (store, handle) = {
            let f = |org_ids: &mut Vec<OrganizationId>, msg: Store| {
                let org_id = match msg {
                    Store::Metrics(x) => x.scoping.organization_id,
                    _ => panic!("received envelope when expecting only metrics"),
                };
                org_ids.push(org_id);
            };

            mock_service("store_forwarder", vec![], f)
        };

        let processor = create_test_processor(config).await;
        assert!(processor.redis_rate_limiter_enabled());

        processor.encode_metrics_processing(message, &store).await;

        drop(store);
        let orgs_not_ratelimited = handle.await.unwrap();

        // Only the rate limited org must be dropped; the other org's buckets still
        // reach the store forwarder.
        assert_eq!(
            orgs_not_ratelimited,
            vec![not_rate_limited_org.organization_id]
        );
    }
3648
    /// The browser context (name/version) must be extracted from the `User-Agent`
    /// header even when IP-like fragments in that header are masked by PII scrubbing.
    #[tokio::test]
    async fn test_browser_version_extraction_with_pii_like_data() {
        let processor = create_test_processor(Default::default()).await;
        let outcome_aggregator = Addr::dummy();
        let event_id = EventId::new();

        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
            .parse()
            .unwrap();

        let request_meta = RequestMeta::new(dsn);
        let mut envelope = Envelope::from_request(Some(event_id), request_meta);

        // Event whose User-Agent contains a Chrome version that looks like an IP address.
        envelope.add_item({
                let mut item = Item::new(ItemType::Event);
                item.set_payload(
                    ContentType::Json,
                    r#"
                    {
                        "request": {
                            "headers": [
                                ["User-Agent", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36"]
                            ]
                        }
                    }
                "#,
                );
                item
            });

        let mut datascrubbing_settings = DataScrubbingConfig::default();
        // enable all the default scrubbing
        datascrubbing_settings.scrub_data = true;
        datascrubbing_settings.scrub_defaults = true;
        datascrubbing_settings.scrub_ip_addresses = true;

        // Make sure to mask any IP-like looking data
        let pii_config = serde_json::from_str(r#"{"applications": {"**": ["@ip:mask"]}}"#).unwrap();

        let config = ProjectConfig {
            datascrubbing_settings,
            pii_config: Some(pii_config),
            ..Default::default()
        };

        let project_info = ProjectInfo {
            config,
            ..Default::default()
        };

        let mut envelopes = ProcessingGroup::split_envelope(*envelope, &Default::default());
        assert_eq!(envelopes.len(), 1);

        let (group, envelope) = envelopes.pop().unwrap();
        let envelope = ManagedEnvelope::new(envelope, outcome_aggregator);

        let message = ProcessEnvelopeGrouped {
            group,
            envelope,
            ctx: processing::Context {
                project_info: &project_info,
                ..processing::Context::for_test()
            },
        };

        let Ok(Some(Submit::Envelope(mut new_envelope))) =
            processor.process(&mut Token::noop(), message).await
        else {
            panic!();
        };
        let new_envelope = new_envelope.envelope_mut();

        let event_item = new_envelope.items().last().unwrap();
        let annotated_event: Annotated<Event> =
            Annotated::from_json_bytes(&event_item.payload()).unwrap();
        let event = annotated_event.into_value().unwrap();
        let headers = event
            .request
            .into_value()
            .unwrap()
            .headers
            .into_value()
            .unwrap();

        // IP-like data must be masked
        assert_eq!(
            Some(
                "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/********* Safari/537.36"
            ),
            headers.get_header("User-Agent")
        );
        // But we still get correct browser and version number
        let contexts = event.contexts.into_value().unwrap();
        let browser = contexts.0.get("browser").unwrap();
        assert_eq!(
            r#"{"browser":"Chrome 103.0.0","name":"Chrome","version":"103.0.0","type":"browser"}"#,
            browser.to_json().unwrap()
        );
    }
3748
    /// Processing must materialize the envelope's dynamic sampling context (DSC) into
    /// the event's `_dsc` field, normalizing the trace id format.
    #[tokio::test]
    #[cfg(feature = "processing")]
    async fn test_materialize_dsc() {
        use crate::services::projects::project::PublicKeyConfig;

        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
            .parse()
            .unwrap();
        let request_meta = RequestMeta::new(dsn);
        let mut envelope = Envelope::from_request(None, request_meta);

        // DSC with a hyphenated trace id; the snapshot below expects it un-hyphenated.
        let dsc = r#"{
            "trace_id": "00000000-0000-0000-0000-000000000000",
            "public_key": "e12d836b15bb49d7bbf99e64295d995b",
            "sample_rate": "0.2"
        }"#;
        envelope.set_dsc(serde_json::from_str(dsc).unwrap());

        let mut item = Item::new(ItemType::Event);
        item.set_payload(ContentType::Json, r#"{}"#);
        envelope.add_item(item);

        let outcome_aggregator = Addr::dummy();
        let managed_envelope = ManagedEnvelope::new(envelope, outcome_aggregator);

        let mut project_info = ProjectInfo::default();
        project_info.public_keys.push(PublicKeyConfig {
            public_key: ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(),
            numeric_id: Some(1),
        });

        let config = serde_json::json!({
            "processing": {
                "enabled": true,
                "kafka_config": [],
            }
        });

        let message = ProcessEnvelopeGrouped {
            group: ProcessingGroup::Transaction,
            envelope: managed_envelope,
            ctx: processing::Context {
                config: &Config::from_json_value(config.clone()).unwrap(),
                project_info: &project_info,
                sampling_project_info: Some(&project_info),
                ..processing::Context::for_test()
            },
        };

        let processor = create_test_processor(Config::from_json_value(config).unwrap()).await;
        let Ok(Some(Submit::Envelope(envelope))) =
            processor.process(&mut Token::noop(), message).await
        else {
            panic!();
        };
        let event = envelope
            .envelope()
            .get_item_by(|item| item.ty() == &ItemType::Event)
            .unwrap();

        let event = Annotated::<Event>::from_json_bytes(&event.payload()).unwrap();
        insta::assert_debug_snapshot!(event.value().unwrap()._dsc, @r###"
        Object(
            {
                "environment": ~,
                "public_key": String(
                    "e12d836b15bb49d7bbf99e64295d995b",
                ),
                "release": ~,
                "replay_id": ~,
                "sample_rate": String(
                    "0.2",
                ),
                "trace_id": String(
                    "00000000000000000000000000000000",
                ),
                "transaction": ~,
            },
        )
        "###);
    }
3830
    /// Normalizes a fixture transaction event with the given name and source, and
    /// returns the statsd metrics captured while logging transaction name changes.
    fn capture_test_event(transaction_name: &str, source: TransactionSource) -> Vec<String> {
        let mut event = Annotated::<Event>::from_json(
            r#"
            {
                "type": "transaction",
                "transaction": "/foo/",
                "timestamp": 946684810.0,
                "start_timestamp": 946684800.0,
                "contexts": {
                    "trace": {
                        "trace_id": "4c79f60c11214eb38604f4ae0781bfb2",
                        "span_id": "fa90fdead5f74053",
                        "op": "http.server",
                        "type": "trace"
                    }
                },
                "transaction_info": {
                    "source": "url"
                }
            }
            "#,
        )
        .unwrap();
        // Override the fixture's transaction name and source with the test inputs.
        let e = event.value_mut().as_mut().unwrap();
        e.transaction.set_value(Some(transaction_name.into()));

        e.transaction_info
            .value_mut()
            .as_mut()
            .unwrap()
            .source
            .set_value(Some(source));

        relay_statsd::with_capturing_test_client(|| {
            utils::log_transaction_name_metrics(&mut event, |event| {
                // Single non-expiring rule that replaces anything below `/foo/`.
                let config = NormalizationConfig {
                    transaction_name_config: TransactionNameConfig {
                        rules: &[TransactionNameRule {
                            pattern: LazyGlob::new("/foo/*/**".to_owned()),
                            expiry: DateTime::<Utc>::MAX_UTC,
                            redaction: RedactionRule::Replace {
                                substitution: "*".to_owned(),
                            },
                        }],
                    },
                    ..Default::default()
                };
                relay_event_normalization::normalize_event(event, &config)
            });
        })
    }
3882
    /// A URL transaction matching neither the rule nor any pattern records `changes:none`.
    #[test]
    fn test_log_transaction_metrics_none() {
        let captures = capture_test_event("/nothing", TransactionSource::Url);
        insta::assert_debug_snapshot!(captures, @r###"
        [
            "event.transaction_name_changes:1|c|#source_in:url,changes:none,source_out:sanitized,is_404:false",
        ]
        "###);
    }
3892
    /// A URL transaction rewritten only by the custom rule records `changes:rule`.
    #[test]
    fn test_log_transaction_metrics_rule() {
        let captures = capture_test_event("/foo/john/denver", TransactionSource::Url);
        insta::assert_debug_snapshot!(captures, @r###"
        [
            "event.transaction_name_changes:1|c|#source_in:url,changes:rule,source_out:sanitized,is_404:false",
        ]
        "###);
    }
3902
    /// A URL transaction rewritten only by the built-in pattern records `changes:pattern`.
    #[test]
    fn test_log_transaction_metrics_pattern() {
        let captures = capture_test_event("/something/12345", TransactionSource::Url);
        insta::assert_debug_snapshot!(captures, @r###"
        [
            "event.transaction_name_changes:1|c|#source_in:url,changes:pattern,source_out:sanitized,is_404:false",
        ]
        "###);
    }
3912
    /// A URL transaction rewritten by both the rule and the pattern records `changes:both`.
    #[test]
    fn test_log_transaction_metrics_both() {
        let captures = capture_test_event("/foo/john/12345", TransactionSource::Url);
        insta::assert_debug_snapshot!(captures, @r###"
        [
            "event.transaction_name_changes:1|c|#source_in:url,changes:both,source_out:sanitized,is_404:false",
        ]
        "###);
    }
3922
    /// A route-sourced transaction is not sanitized, so no changes are recorded.
    #[test]
    fn test_log_transaction_metrics_no_match() {
        let captures = capture_test_event("/foo/john/12345", TransactionSource::Route);
        insta::assert_debug_snapshot!(captures, @r###"
        [
            "event.transaction_name_changes:1|c|#source_in:route,changes:none,source_out:route,is_404:false",
        ]
        "###);
    }
3932
    /// Confirms that the hardcoded value we use for the fixed length of the measurement MRI is
    /// correct. Unit test is placed here because it has dependencies to relay-server and therefore
    /// cannot be called from relay-metrics.
    #[test]
    fn test_mri_overhead_constant() {
        let hardcoded_value = MeasurementsConfig::MEASUREMENT_MRI_OVERHEAD;

        // Derive the overhead by building a real measurement metric and subtracting the
        // dynamic parts (unit string and name) from the full MRI length.
        let derived_value = {
            let name = "foobar".to_owned();
            let value = 5.into(); // Arbitrary value.
            let unit = MetricUnit::Duration(DurationUnit::default());
            let tags = TransactionMeasurementTags {
                measurement_rating: None,
                universal_tags: CommonTags(BTreeMap::new()),
                score_profile_version: None,
            };

            let measurement = TransactionMetric::Measurement {
                name: name.clone(),
                value,
                unit,
                tags,
            };

            let metric: Bucket = measurement.into_metric(UnixTimestamp::now());
            metric.name.len() - unit.to_string().len() - name.len()
        };
        assert_eq!(
            hardcoded_value, derived_value,
            "Update `MEASUREMENT_MRI_OVERHEAD` if the naming scheme changed."
        );
    }
3965
    /// Bucket metadata `received_at` must be set for external buckets and left as
    /// `None` for buckets originating from internal sources.
    #[tokio::test]
    async fn test_process_metrics_bucket_metadata() {
        let mut token = Cogs::noop().timed(ResourceId::Relay, AppFeature::Unattributed);
        let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
        let received_at = Utc::now();
        let config = Config::default();

        // Capture the aggregator messages the processor emits.
        let (aggregator, mut aggregator_rx) = Addr::custom();
        let processor = create_test_processor_with_addrs(
            config,
            Addrs {
                aggregator,
                ..Default::default()
            },
        )
        .await;

        let mut item = Item::new(ItemType::Statsd);
        item.set_payload(
            ContentType::Text,
            "transactions/foo:3182887624:4267882815|s",
        );
        for (source, expected_received_at) in [
            (
                BucketSource::External,
                Some(UnixTimestamp::from_datetime(received_at).unwrap()),
            ),
            (BucketSource::Internal, None),
        ] {
            let message = ProcessMetrics {
                data: MetricData::Raw(vec![item.clone()]),
                project_key,
                source,
                received_at,
                sent_at: Some(Utc::now()),
            };
            processor.handle_process_metrics(&mut token, message);

            let Aggregator::MergeBuckets(merge_buckets) = aggregator_rx.recv().await.unwrap();
            let buckets = merge_buckets.buckets;
            assert_eq!(buckets.len(), 1);
            assert_eq!(buckets[0].metadata.received_at, expected_received_at);
        }
    }
4010
    /// Verifies that a batched-metrics payload spanning multiple project keys is
    /// split into one `MergeBuckets` message per project, with bucket contents
    /// parsed intact and `received_at` left unset for internally-sourced metrics.
    #[tokio::test]
    async fn test_process_batched_metrics() {
        let mut token = Cogs::noop().timed(ResourceId::Relay, AppFeature::Unattributed);
        let received_at = Utc::now();
        let config = Config::default();

        // Capture the processor's aggregator output on a channel so the emitted
        // messages can be inspected below.
        let (aggregator, mut aggregator_rx) = Addr::custom();
        let processor = create_test_processor_with_addrs(
            config,
            Addrs {
                aggregator,
                ..Default::default()
            },
        )
        .await;

        // Two project keys with one distribution bucket each; the outer map keys
        // determine which project the buckets are routed to.
        let payload = r#"{
    "buckets": {
        "11111111111111111111111111111111": [
            {
                "timestamp": 1615889440,
                "width": 0,
                "name": "d:custom/endpoint.response_time@millisecond",
                "type": "d",
                "value": [
                  68.0
                ],
                "tags": {
                  "route": "user_index"
                }
            }
        ],
        "22222222222222222222222222222222": [
            {
                "timestamp": 1615889440,
                "width": 0,
                "name": "d:custom/endpoint.cache_rate@none",
                "type": "d",
                "value": [
                  36.0
                ]
            }
        ]
    }
}
"#;
        let message = ProcessBatchedMetrics {
            payload: Bytes::from(payload),
            source: BucketSource::Internal,
            received_at,
            sent_at: Some(Utc::now()),
        };
        processor.handle_process_batched_metrics(&mut token, message);

        // One MergeBuckets message is expected per project key in the payload.
        let Aggregator::MergeBuckets(mb1) = aggregator_rx.recv().await.unwrap();
        let Aggregator::MergeBuckets(mb2) = aggregator_rx.recv().await.unwrap();

        // Message order off the channel is not guaranteed; sort by project key so
        // the snapshot below is deterministic.
        let mut messages = vec![mb1, mb2];
        messages.sort_by_key(|mb| mb.project_key);

        let actual = messages
            .into_iter()
            .map(|mb| (mb.project_key, mb.buckets))
            .collect::<Vec<_>>();

        // Note: `received_at: None` because `BucketSource::Internal` metrics do
        // not get a receive timestamp stamped into their metadata.
        assert_debug_snapshot!(actual, @r###"
        [
            (
                ProjectKey("11111111111111111111111111111111"),
                [
                    Bucket {
                        timestamp: UnixTimestamp(1615889440),
                        width: 0,
                        name: MetricName(
                            "d:custom/endpoint.response_time@millisecond",
                        ),
                        value: Distribution(
                            [
                                68.0,
                            ],
                        ),
                        tags: {
                            "route": "user_index",
                        },
                        metadata: BucketMetadata {
                            merges: 1,
                            received_at: None,
                            extracted_from_indexed: false,
                        },
                    },
                ],
            ),
            (
                ProjectKey("22222222222222222222222222222222"),
                [
                    Bucket {
                        timestamp: UnixTimestamp(1615889440),
                        width: 0,
                        name: MetricName(
                            "d:custom/endpoint.cache_rate@none",
                        ),
                        value: Distribution(
                            [
                                36.0,
                            ],
                        ),
                        tags: {},
                        metadata: BucketMetadata {
                            merges: 1,
                            received_at: None,
                            extracted_from_indexed: false,
                        },
                    },
                ],
            ),
        ]
        "###);
    }
4129}