relay_server/services/processor.rs

use std::borrow::Cow;
use std::collections::{BTreeSet, HashMap};
use std::error::Error;
use std::fmt::{Debug, Display};
use std::future::Future;
use std::io::Write;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;

use anyhow::Context;
use brotli::CompressorWriter as BrotliEncoder;
use bytes::Bytes;
use chrono::{DateTime, Utc};
use flate2::Compression;
use flate2::write::{GzEncoder, ZlibEncoder};
use futures::FutureExt;
use futures::future::BoxFuture;
use relay_base_schema::project::{ProjectId, ProjectKey};
use relay_cogs::{AppFeature, Cogs, FeatureWeights, ResourceId, Token};
use relay_common::time::UnixTimestamp;
use relay_config::{Config, HttpEncoding, RelayMode};
use relay_dynamic_config::{ErrorBoundary, Feature, GlobalConfig};
use relay_event_normalization::{ClockDriftProcessor, GeoIpLookup};
use relay_event_schema::processor::ProcessingAction;
use relay_event_schema::protocol::{
    ClientReport, Event, EventId, Metrics, NetworkReportError, SpanV2,
};
use relay_filter::FilterStatKey;
use relay_metrics::{Bucket, BucketMetadata, BucketView, BucketsView, MetricNamespace};
use relay_pii::PiiConfigError;
use relay_protocol::Annotated;
use relay_quotas::{DataCategory, Quota, RateLimits, Scoping};
use relay_sampling::evaluation::{ReservoirCounters, ReservoirEvaluator, SamplingDecision};
use relay_statsd::metric;
use relay_system::{Addr, FromMessage, NoResponse, Service};
use relay_threading::AsyncPool;
use reqwest::header;
use smallvec::{SmallVec, smallvec};
use zstd::stream::Encoder as ZstdEncoder;

use crate::envelope::{
    self, AttachmentType, ContentType, Envelope, EnvelopeError, Item, ItemContainer, ItemType,
};
use crate::extractors::{PartialDsn, RequestMeta, RequestTrust};
use crate::integrations::{Integration, SpansIntegration};
use crate::managed::{InvalidProcessingGroupType, ManagedEnvelope, TypedEnvelope};
use crate::metrics::{MetricOutcomes, MetricsLimiter, MinimalTrackableBucket};
use crate::metrics_extraction::transactions::ExtractedMetrics;
use crate::metrics_extraction::transactions::types::ExtractMetricsError;
use crate::processing::check_ins::CheckInsProcessor;
use crate::processing::logs::LogsProcessor;
use crate::processing::profile_chunks::ProfileChunksProcessor;
use crate::processing::sessions::SessionsProcessor;
use crate::processing::spans::SpansProcessor;
use crate::processing::trace_attachments::TraceAttachmentsProcessor;
use crate::processing::trace_metrics::TraceMetricsProcessor;
use crate::processing::transactions::extraction::ExtractMetricsContext;
use crate::processing::utils::event::{
    EventFullyNormalized, EventMetricsExtracted, FiltersStatus, SpansExtracted, event_category,
    event_type,
};
use crate::processing::{Forward as _, Output, Outputs, QuotaRateLimiter};
use crate::service::ServiceError;
use crate::services::global_config::GlobalConfigHandle;
use crate::services::metrics::{Aggregator, FlushBuckets, MergeBuckets, ProjectBuckets};
use crate::services::outcome::{DiscardItemType, DiscardReason, Outcome, TrackOutcome};
use crate::services::projects::cache::ProjectCacheHandle;
use crate::services::projects::project::{ProjectInfo, ProjectState};
use crate::services::upstream::{
    SendRequest, Sign, SignatureType, UpstreamRelay, UpstreamRequest, UpstreamRequestError,
};
use crate::statsd::{RelayCounters, RelayDistributions, RelayTimers};
use crate::utils::{self, CheckLimits, EnvelopeLimiter, SamplingResult};
use crate::{http, processing};
#[cfg(feature = "processing")]
use {
    crate::services::global_rate_limits::{GlobalRateLimits, GlobalRateLimitsServiceHandle},
    crate::services::processor::nnswitch::SwitchProcessingError,
    crate::services::store::{Store, StoreEnvelope},
    crate::services::upload::Upload,
    crate::utils::Enforcement,
    itertools::Itertools,
    relay_cardinality::{
        CardinalityLimit, CardinalityLimiter, CardinalityLimitsSplit, RedisSetLimiter,
        RedisSetLimiterOptions,
    },
    relay_dynamic_config::CardinalityLimiterMode,
    relay_quotas::{RateLimitingError, RedisRateLimiter},
    relay_redis::{AsyncRedisClient, RedisClients},
    std::time::Instant,
    symbolic_unreal::{Unreal4Error, Unreal4ErrorKind},
};

mod attachment;
mod dynamic_sampling;
mod event;
mod metrics;
mod nel;
mod profile;
mod replay;
mod report;
mod span;

#[cfg(all(sentry, feature = "processing"))]
mod playstation;
mod standalone;
#[cfg(feature = "processing")]
mod unreal;

#[cfg(feature = "processing")]
mod nnswitch;
/// Expands the block only when compiled with the `processing` feature.
///
/// The provided code block is executed only if the given config has
/// `processing_enabled` set; otherwise, the optional `else` block runs.
macro_rules! if_processing {
    ($config:expr, $if_true:block) => {
        #[cfg(feature = "processing")] {
            if $config.processing_enabled() $if_true
        }
    };
    ($config:expr, $if_true:block else $if_false:block) => {
        {
            #[cfg(feature = "processing")] {
                if $config.processing_enabled() $if_true else $if_false
            }
            #[cfg(not(feature = "processing"))] {
                $if_false
            }
        }
    };
}
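
// Illustrative usage (a sketch; `config` and both branch bodies are hypothetical):
//
//     if_processing!(config, {
//         // Runs only with the `processing` feature and `processing_enabled`.
//         enforce_redis_quotas()
//     } else {
//         // Runs in all other cases.
//         skip_redis_quotas()
//     });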

/// The minimum clock drift for correction to apply.
pub const MINIMUM_CLOCK_DRIFT: Duration = Duration::from_secs(55 * 60);

#[derive(Debug)]
pub struct GroupTypeError;

impl Display for GroupTypeError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("failed to convert processing group into corresponding type")
    }
}

impl std::error::Error for GroupTypeError {}

macro_rules! processing_group {
    ($ty:ident, $variant:ident$(, $($other:ident),+)?) => {
        #[derive(Clone, Copy, Debug)]
        pub struct $ty;

        impl From<$ty> for ProcessingGroup {
            fn from(_: $ty) -> Self {
                ProcessingGroup::$variant
            }
        }

        impl TryFrom<ProcessingGroup> for $ty {
            type Error = GroupTypeError;

            fn try_from(value: ProcessingGroup) -> Result<Self, Self::Error> {
                if matches!(value, ProcessingGroup::$variant) {
                    return Ok($ty);
                }
                $($(
                    if matches!(value, ProcessingGroup::$other) {
                        return Ok($ty);
                    }
                )+)?
                return Err(GroupTypeError);
            }
        }
    };
}
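
// For example, `processing_group!(LogGroup, Log, Nel)` expands to a marker
// struct `LogGroup` that converts into `ProcessingGroup::Log`, with a
// `TryFrom<ProcessingGroup>` impl accepting both `ProcessingGroup::Log` and
// `ProcessingGroup::Nel`.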

/// A marker trait.
///
/// Implemented only by groups that are responsible for processing envelopes
/// containing events.
pub trait EventProcessing {}

processing_group!(TransactionGroup, Transaction);
impl EventProcessing for TransactionGroup {}

processing_group!(ErrorGroup, Error);
impl EventProcessing for ErrorGroup {}

processing_group!(SessionGroup, Session);
processing_group!(StandaloneGroup, Standalone);
processing_group!(ClientReportGroup, ClientReport);
processing_group!(ReplayGroup, Replay);
processing_group!(CheckInGroup, CheckIn);
processing_group!(LogGroup, Log, Nel);
processing_group!(TraceMetricGroup, TraceMetric);
processing_group!(SpanGroup, Span);

processing_group!(ProfileChunkGroup, ProfileChunk);
processing_group!(MetricsGroup, Metrics);
processing_group!(ForwardUnknownGroup, ForwardUnknown);
processing_group!(Ungrouped, Ungrouped);

/// Processed group type marker.
///
/// Marks the envelopes which passed through the processing pipeline.
#[derive(Clone, Copy, Debug)]
pub struct Processed;

/// Describes the groups of processable items.
#[derive(Clone, Copy, Debug)]
pub enum ProcessingGroup {
    /// All transaction-related items.
    ///
    /// Includes transactions, related attachments, and profiles.
    Transaction,
    /// All items which require (have or create) events.
    ///
    /// This includes: errors, NEL reports, security reports, user reports, and
    /// some attachments.
    Error,
    /// Session events.
    Session,
    /// Standalone items which can be sent without any event attached to them in
    /// the current envelope, e.g. some attachments and user reports.
    Standalone,
    /// Outcomes.
    ClientReport,
    /// Replays and ReplayRecordings.
    Replay,
    /// Crons.
    CheckIn,
    /// NEL reports.
    Nel,
    /// Logs.
    Log,
    /// Trace metrics.
    TraceMetric,
    /// Spans.
    Span,
    /// Span V2 spans.
    SpanV2,
    /// Metrics.
    Metrics,
    /// ProfileChunk.
    ProfileChunk,
    /// V2 attachments without span / log association.
    TraceAttachment,
    /// Unknown item types will be forwarded upstream (to a processing Relay),
    /// where we will decide what to do with them.
    ForwardUnknown,
    /// All the items in the envelope that could not be grouped.
    Ungrouped,
}

impl ProcessingGroup {
    /// Splits the provided envelope into a list of (group, envelope) pairs.
    fn split_envelope(
        mut envelope: Envelope,
        project_info: &ProjectInfo,
    ) -> SmallVec<[(Self, Box<Envelope>); 3]> {
        let headers = envelope.headers().clone();
        let mut grouped_envelopes = smallvec![];

        // Extract replays.
        let replay_items = envelope.take_items_by(|item| {
            matches!(
                item.ty(),
                &ItemType::ReplayEvent | &ItemType::ReplayRecording | &ItemType::ReplayVideo
            )
        });
        if !replay_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Replay,
                Envelope::from_parts(headers.clone(), replay_items),
            ))
        }

        // Keep all the sessions together in one envelope.
        let session_items = envelope
            .take_items_by(|item| matches!(item.ty(), &ItemType::Session | &ItemType::Sessions));
        if !session_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Session,
                Envelope::from_parts(headers.clone(), session_items),
            ))
        }

        let span_v2_items = envelope.take_items_by(|item| {
            let exp_feature = project_info.has_feature(Feature::SpanV2ExperimentalProcessing);
            let otlp_feature = project_info.has_feature(Feature::SpanV2OtlpProcessing);
            let is_supported_integration = {
                matches!(
                    item.integration(),
                    Some(Integration::Spans(SpansIntegration::OtelV1 { .. }))
                )
            };
            let is_span = matches!(item.ty(), &ItemType::Span);
            let is_span_attachment = item.is_span_attachment();

            ItemContainer::<SpanV2>::is_container(item)
                || (exp_feature && is_span)
                || ((exp_feature || otlp_feature) && is_supported_integration)
                || (exp_feature && is_span_attachment)
        });

        if !span_v2_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::SpanV2,
                Envelope::from_parts(headers.clone(), span_v2_items),
            ))
        }

        // Extract spans.
        let span_items = envelope.take_items_by(|item| {
            matches!(item.ty(), &ItemType::Span)
                || matches!(item.integration(), Some(Integration::Spans(_)))
        });

        if !span_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Span,
                Envelope::from_parts(headers.clone(), span_items),
            ))
        }

        // Extract logs.
        let logs_items = envelope.take_items_by(|item| {
            matches!(item.ty(), &ItemType::Log)
                || matches!(item.integration(), Some(Integration::Logs(_)))
        });
        if !logs_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Log,
                Envelope::from_parts(headers.clone(), logs_items),
            ))
        }

        // Extract trace metrics.
        let trace_metric_items =
            envelope.take_items_by(|item| matches!(item.ty(), &ItemType::TraceMetric));
        if !trace_metric_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::TraceMetric,
                Envelope::from_parts(headers.clone(), trace_metric_items),
            ))
        }

        // NEL items are transformed into logs in their own processing step.
        let nel_items = envelope.take_items_by(|item| matches!(item.ty(), &ItemType::Nel));
        if !nel_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Nel,
                Envelope::from_parts(headers.clone(), nel_items),
            ))
        }

        // Extract all metric items.
        //
        // Note: Should only be relevant in proxy mode. In other modes we send metrics through
        // a separate pipeline.
        let metric_items = envelope.take_items_by(|i| i.ty().is_metrics());
        if !metric_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Metrics,
                Envelope::from_parts(headers.clone(), metric_items),
            ))
        }

        // Extract profile chunks.
        let profile_chunk_items =
            envelope.take_items_by(|item| matches!(item.ty(), &ItemType::ProfileChunk));
        if !profile_chunk_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::ProfileChunk,
                Envelope::from_parts(headers.clone(), profile_chunk_items),
            ))
        }

        let trace_attachment_items = envelope.take_items_by(Item::is_trace_attachment);
        if !trace_attachment_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::TraceAttachment,
                Envelope::from_parts(headers.clone(), trace_attachment_items),
            ))
        }

        // Extract all standalone items.
        //
        // Note: this happens only if there are no items in the envelope which can create
        // events; otherwise, standalone items remain in the same envelope as the items
        // that require an event.
        if !envelope.items().any(Item::creates_event) {
            let standalone_items = envelope.take_items_by(Item::requires_event);
            if !standalone_items.is_empty() {
                grouped_envelopes.push((
                    ProcessingGroup::Standalone,
                    Envelope::from_parts(headers.clone(), standalone_items),
                ))
            }
        }

        // Make sure we create a separate envelope for each `RawSecurity` report.
        let security_reports_items = envelope
            .take_items_by(|i| matches!(i.ty(), &ItemType::RawSecurity))
            .into_iter()
            .map(|item| {
                let headers = headers.clone();
                let items: SmallVec<[Item; 3]> = smallvec![item.clone()];
                let mut envelope = Envelope::from_parts(headers, items);
                envelope.set_event_id(EventId::new());
                (ProcessingGroup::Error, envelope)
            });
        grouped_envelopes.extend(security_reports_items);

        // Extract all the items which require an event into a separate envelope.
        let require_event_items = envelope.take_items_by(Item::requires_event);
        if !require_event_items.is_empty() {
            let group = if require_event_items
                .iter()
                .any(|item| matches!(item.ty(), &ItemType::Transaction | &ItemType::Profile))
            {
                ProcessingGroup::Transaction
            } else {
                ProcessingGroup::Error
            };

            grouped_envelopes.push((
                group,
                Envelope::from_parts(headers.clone(), require_event_items),
            ))
        }

        // Get the rest of the envelopes, one per item.
        let envelopes = envelope.items_mut().map(|item| {
            let headers = headers.clone();
            let items: SmallVec<[Item; 3]> = smallvec![item.clone()];
            let envelope = Envelope::from_parts(headers, items);
            let item_type = item.ty();
            let group = if matches!(item_type, &ItemType::CheckIn) {
                ProcessingGroup::CheckIn
            } else if matches!(item_type, &ItemType::ClientReport) {
                ProcessingGroup::ClientReport
            } else if matches!(item_type, &ItemType::Unknown(_)) {
                ProcessingGroup::ForwardUnknown
            } else {
                // Cannot group this item type.
                ProcessingGroup::Ungrouped
            };

            (group, envelope)
        });
        grouped_envelopes.extend(envelopes);

        grouped_envelopes
    }
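
    // Illustrative outcome (a sketch): an envelope carrying a session, a
    // transaction, and a profile splits into two envelopes, roughly:
    //
    //     [(ProcessingGroup::Session, <session items>),
    //      (ProcessingGroup::Transaction, <transaction + profile items>)]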

    /// Returns the name of the group.
    pub fn variant(&self) -> &'static str {
        match self {
            ProcessingGroup::Transaction => "transaction",
            ProcessingGroup::Error => "error",
            ProcessingGroup::Session => "session",
            ProcessingGroup::Standalone => "standalone",
            ProcessingGroup::ClientReport => "client_report",
            ProcessingGroup::Replay => "replay",
            ProcessingGroup::CheckIn => "check_in",
            ProcessingGroup::Log => "log",
            ProcessingGroup::TraceMetric => "trace_metric",
            ProcessingGroup::Nel => "nel",
            ProcessingGroup::Span => "span",
            ProcessingGroup::SpanV2 => "span_v2",
            ProcessingGroup::Metrics => "metrics",
            ProcessingGroup::ProfileChunk => "profile_chunk",
            ProcessingGroup::TraceAttachment => "trace_attachment",
            ProcessingGroup::ForwardUnknown => "forward_unknown",
            ProcessingGroup::Ungrouped => "ungrouped",
        }
    }
}

impl From<ProcessingGroup> for AppFeature {
    fn from(value: ProcessingGroup) -> Self {
        match value {
            ProcessingGroup::Transaction => AppFeature::Transactions,
            ProcessingGroup::Error => AppFeature::Errors,
            ProcessingGroup::Session => AppFeature::Sessions,
            ProcessingGroup::Standalone => AppFeature::UnattributedEnvelope,
            ProcessingGroup::ClientReport => AppFeature::ClientReports,
            ProcessingGroup::Replay => AppFeature::Replays,
            ProcessingGroup::CheckIn => AppFeature::CheckIns,
            ProcessingGroup::Log => AppFeature::Logs,
            ProcessingGroup::TraceMetric => AppFeature::TraceMetrics,
            ProcessingGroup::Nel => AppFeature::Logs,
            ProcessingGroup::Span => AppFeature::Spans,
            ProcessingGroup::SpanV2 => AppFeature::Spans,
            ProcessingGroup::Metrics => AppFeature::UnattributedMetrics,
            ProcessingGroup::ProfileChunk => AppFeature::Profiles,
            ProcessingGroup::ForwardUnknown => AppFeature::UnattributedEnvelope,
            ProcessingGroup::Ungrouped => AppFeature::UnattributedEnvelope,
            ProcessingGroup::TraceAttachment => AppFeature::TraceAttachments,
        }
    }
}

/// An error returned when handling [`ProcessEnvelope`].
#[derive(Debug, thiserror::Error)]
pub enum ProcessingError {
    #[error("invalid json in event")]
    InvalidJson(#[source] serde_json::Error),

    #[error("invalid message pack event payload")]
    InvalidMsgpack(#[from] rmp_serde::decode::Error),

    #[cfg(feature = "processing")]
    #[error("invalid unreal crash report")]
    InvalidUnrealReport(#[source] Unreal4Error),

    #[error("event payload too large")]
    PayloadTooLarge(DiscardItemType),

    #[error("invalid transaction event")]
    InvalidTransaction,

    #[error("envelope processor failed")]
    ProcessingFailed(#[from] ProcessingAction),

    #[error("duplicate {0} in event")]
    DuplicateItem(ItemType),

    #[error("failed to extract event payload")]
    NoEventPayload,

    #[error("missing project id in DSN")]
    MissingProjectId,

    #[error("invalid security report type: {0:?}")]
    InvalidSecurityType(Bytes),

    #[error("unsupported security report type")]
    UnsupportedSecurityType,

    #[error("invalid security report")]
    InvalidSecurityReport(#[source] serde_json::Error),

    #[error("invalid nel report")]
    InvalidNelReport(#[source] NetworkReportError),

    #[error("event filtered with reason: {0:?}")]
    EventFiltered(FilterStatKey),

    #[error("missing or invalid required event timestamp")]
    InvalidTimestamp,

    #[error("could not serialize event payload")]
    SerializeFailed(#[source] serde_json::Error),

    #[cfg(feature = "processing")]
    #[error("failed to apply quotas")]
    QuotasFailed(#[from] RateLimitingError),

    #[error("invalid pii config")]
    PiiConfigError(PiiConfigError),

    #[error("invalid processing group type")]
    InvalidProcessingGroup(Box<InvalidProcessingGroupType>),

    #[error("invalid replay")]
    InvalidReplay(DiscardReason),

    #[error("replay filtered with reason: {0:?}")]
    ReplayFiltered(FilterStatKey),

    #[cfg(feature = "processing")]
    #[error("nintendo switch dying message processing failed {0:?}")]
    InvalidNintendoDyingMessage(#[source] SwitchProcessingError),

    #[cfg(all(sentry, feature = "processing"))]
    #[error("playstation dump processing failed: {0}")]
    InvalidPlaystationDump(String),

    #[error("processing group does not match specific processor")]
    ProcessingGroupMismatch,

    #[error("new processing pipeline failed")]
    ProcessingFailure,
}

impl ProcessingError {
    pub fn to_outcome(&self) -> Option<Outcome> {
        match self {
            Self::PayloadTooLarge(payload_type) => {
                Some(Outcome::Invalid(DiscardReason::TooLarge(*payload_type)))
            }
            Self::InvalidJson(_) => Some(Outcome::Invalid(DiscardReason::InvalidJson)),
            Self::InvalidMsgpack(_) => Some(Outcome::Invalid(DiscardReason::InvalidMsgpack)),
            Self::InvalidSecurityType(_) => {
                Some(Outcome::Invalid(DiscardReason::SecurityReportType))
            }
            Self::InvalidSecurityReport(_) => Some(Outcome::Invalid(DiscardReason::SecurityReport)),
            Self::UnsupportedSecurityType => Some(Outcome::Filtered(FilterStatKey::InvalidCsp)),
            Self::InvalidNelReport(_) => Some(Outcome::Invalid(DiscardReason::InvalidJson)),
            Self::InvalidTransaction => Some(Outcome::Invalid(DiscardReason::InvalidTransaction)),
            Self::InvalidTimestamp => Some(Outcome::Invalid(DiscardReason::Timestamp)),
            Self::DuplicateItem(_) => Some(Outcome::Invalid(DiscardReason::DuplicateItem)),
            Self::NoEventPayload => Some(Outcome::Invalid(DiscardReason::NoEventPayload)),
            #[cfg(feature = "processing")]
            Self::InvalidNintendoDyingMessage(_) => Some(Outcome::Invalid(DiscardReason::Payload)),
            #[cfg(all(sentry, feature = "processing"))]
            Self::InvalidPlaystationDump(_) => Some(Outcome::Invalid(DiscardReason::Payload)),
            #[cfg(feature = "processing")]
            Self::InvalidUnrealReport(err) if err.kind() == Unreal4ErrorKind::BadCompression => {
                Some(Outcome::Invalid(DiscardReason::InvalidCompression))
            }
            #[cfg(feature = "processing")]
            Self::InvalidUnrealReport(_) => Some(Outcome::Invalid(DiscardReason::ProcessUnreal)),
            Self::SerializeFailed(_) | Self::ProcessingFailed(_) => {
                Some(Outcome::Invalid(DiscardReason::Internal))
            }
            #[cfg(feature = "processing")]
            Self::QuotasFailed(_) => Some(Outcome::Invalid(DiscardReason::Internal)),
            Self::PiiConfigError(_) => Some(Outcome::Invalid(DiscardReason::ProjectStatePii)),
            Self::MissingProjectId => None,
            Self::EventFiltered(_) => None,
            Self::InvalidProcessingGroup(_) => None,
            Self::InvalidReplay(reason) => Some(Outcome::Invalid(*reason)),
            Self::ReplayFiltered(key) => Some(Outcome::Filtered(key.clone())),
            Self::ProcessingGroupMismatch => Some(Outcome::Invalid(DiscardReason::Internal)),
            // Outcomes are emitted in the new processing pipeline already.
            Self::ProcessingFailure => None,
        }
    }

    fn is_unexpected(&self) -> bool {
        self.to_outcome()
            .is_some_and(|outcome| outcome.is_unexpected())
    }
}

#[cfg(feature = "processing")]
impl From<Unreal4Error> for ProcessingError {
    fn from(err: Unreal4Error) -> Self {
        match err.kind() {
            Unreal4ErrorKind::TooLarge => Self::PayloadTooLarge(ItemType::UnrealReport.into()),
            _ => ProcessingError::InvalidUnrealReport(err),
        }
    }
}

impl From<ExtractMetricsError> for ProcessingError {
    fn from(error: ExtractMetricsError) -> Self {
        match error {
            ExtractMetricsError::MissingTimestamp | ExtractMetricsError::InvalidTimestamp => {
                Self::InvalidTimestamp
            }
        }
    }
}

impl From<InvalidProcessingGroupType> for ProcessingError {
    fn from(value: InvalidProcessingGroupType) -> Self {
        Self::InvalidProcessingGroup(Box::new(value))
    }
}

type ExtractedEvent = (Annotated<Event>, usize);

/// A container for extracted metrics during processing.
///
/// The container enforces that the extracted metrics are correctly tagged
/// with the dynamic sampling decision.
#[derive(Debug)]
pub struct ProcessingExtractedMetrics {
    metrics: ExtractedMetrics,
}

impl ProcessingExtractedMetrics {
    pub fn new() -> Self {
        Self {
            metrics: ExtractedMetrics::default(),
        }
    }

    pub fn into_inner(self) -> ExtractedMetrics {
        self.metrics
    }

    /// Extends the contained metrics with [`ExtractedMetrics`].
    pub fn extend(
        &mut self,
        extracted: ExtractedMetrics,
        sampling_decision: Option<SamplingDecision>,
    ) {
        self.extend_project_metrics(extracted.project_metrics, sampling_decision);
        self.extend_sampling_metrics(extracted.sampling_metrics, sampling_decision);
    }

    /// Extends the contained project metrics.
    pub fn extend_project_metrics<I>(
        &mut self,
        buckets: I,
        sampling_decision: Option<SamplingDecision>,
    ) where
        I: IntoIterator<Item = Bucket>,
    {
        self.metrics
            .project_metrics
            .extend(buckets.into_iter().map(|mut bucket| {
                bucket.metadata.extracted_from_indexed =
                    sampling_decision == Some(SamplingDecision::Keep);
                bucket
            }));
    }

    /// Extends the contained sampling metrics.
    pub fn extend_sampling_metrics<I>(
        &mut self,
        buckets: I,
        sampling_decision: Option<SamplingDecision>,
    ) where
        I: IntoIterator<Item = Bucket>,
    {
        self.metrics
            .sampling_metrics
            .extend(buckets.into_iter().map(|mut bucket| {
                bucket.metadata.extracted_from_indexed =
                    sampling_decision == Some(SamplingDecision::Keep);
                bucket
            }));
    }

    /// Applies rate limits to the contained metrics.
    ///
    /// This is used to apply rate limits which have been enforced on sampled items of an envelope
    /// to also consistently apply to the metrics extracted from these items.
    #[cfg(feature = "processing")]
    fn apply_enforcement(&mut self, enforcement: &Enforcement, enforced_consistently: bool) {
        // Metric namespaces which need to be dropped.
        let mut drop_namespaces: SmallVec<[_; 2]> = smallvec![];
        // Metrics belonging to this metric namespace need to have the `extracted_from_indexed`
        // flag reset to `false`.
        let mut reset_extracted_from_indexed: SmallVec<[_; 2]> = smallvec![];

        for (namespace, limit, indexed) in [
            (
                MetricNamespace::Transactions,
                &enforcement.event,
                &enforcement.event_indexed,
            ),
            (
                MetricNamespace::Spans,
                &enforcement.spans,
                &enforcement.spans_indexed,
            ),
        ] {
            if limit.is_active() {
                drop_namespaces.push(namespace);
            } else if indexed.is_active() && !enforced_consistently {
                // If the enforcement was not computed by consistently checking the limits,
                // the quota for the metrics has not yet been incremented.
                // In this case we have a dropped indexed payload but a metric which still
                // needs to be accounted for; make sure the metric will still be rate limited.
                reset_extracted_from_indexed.push(namespace);
            }
        }

        if !drop_namespaces.is_empty() || !reset_extracted_from_indexed.is_empty() {
            self.retain_mut(|bucket| {
                let Some(namespace) = bucket.name.try_namespace() else {
                    return true;
                };

                if drop_namespaces.contains(&namespace) {
                    return false;
                }

                if reset_extracted_from_indexed.contains(&namespace) {
                    bucket.metadata.extracted_from_indexed = false;
                }

                true
            });
        }
    }

    #[cfg(feature = "processing")]
    fn retain_mut(&mut self, mut f: impl FnMut(&mut Bucket) -> bool) {
        self.metrics.project_metrics.retain_mut(&mut f);
        self.metrics.sampling_metrics.retain_mut(&mut f);
    }
}
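
// Sketch of the tagging contract (illustrative; `metrics` is hypothetical):
// metrics extracted from items that dynamic sampling decided to keep are marked
// `extracted_from_indexed`, so indexed rate limits can later be applied to them
// consistently:
//
//     let mut extracted = ProcessingExtractedMetrics::new();
//     extracted.extend(metrics, Some(SamplingDecision::Keep));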

fn send_metrics(
    metrics: ExtractedMetrics,
    project_key: ProjectKey,
    sampling_key: Option<ProjectKey>,
    aggregator: &Addr<Aggregator>,
) {
    let ExtractedMetrics {
        project_metrics,
        sampling_metrics,
    } = metrics;

    if !project_metrics.is_empty() {
        aggregator.send(MergeBuckets {
            project_key,
            buckets: project_metrics,
        });
    }

    if !sampling_metrics.is_empty() {
        // If no sampling project state is available, we associate the sampling
        // metrics with the current project.
        //
        // project_without_tracing         -> metrics go to self
        // dependent_project_with_tracing  -> metrics go to root
        // root_project_with_tracing       -> metrics go to root == self
        let sampling_project_key = sampling_key.unwrap_or(project_key);
        aggregator.send(MergeBuckets {
            project_key: sampling_project_key,
            buckets: sampling_metrics,
        });
    }
}

/// Decides whether to filter out (drop) specific item types (e.g. profiles,
/// spans) that are gated behind an on-off feature flag.
///
/// If the project config did not come from the upstream, we keep the items.
fn should_filter(config: &Config, project_info: &ProjectInfo, feature: Feature) -> bool {
    match config.relay_mode() {
        RelayMode::Proxy => false,
        RelayMode::Managed => !project_info.has_feature(feature),
    }
}
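
// Illustrative use (a sketch): in proxy mode items are always kept; in managed
// mode they are dropped when the project lacks the feature:
//
//     if should_filter(&config, &project_info, Feature::SpanV2ExperimentalProcessing) {
//         let _dropped = envelope.take_items_by(|item| item.ty() == &ItemType::Span);
//     }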

/// The result of envelope processing: either a processed envelope together with
/// the metrics extracted from it, or the output of one of the new processing
/// pipelines.
#[derive(Debug)]
#[expect(
    clippy::large_enum_variant,
    reason = "until we have a better solution to combat the excessive growth of Item, see #4819"
)]
enum ProcessingResult {
    Envelope {
        managed_envelope: TypedEnvelope<Processed>,
        extracted_metrics: ProcessingExtractedMetrics,
    },
    Output(Output<Outputs>),
}

impl ProcessingResult {
    /// Creates a [`ProcessingResult`] with no metrics.
    fn no_metrics(managed_envelope: TypedEnvelope<Processed>) -> Self {
        Self::Envelope {
            managed_envelope,
            extracted_metrics: ProcessingExtractedMetrics::new(),
        }
    }
}

/// All items which can be submitted upstream.
#[derive(Debug)]
#[expect(
    clippy::large_enum_variant,
    reason = "until we have a better solution to combat the excessive growth of Item, see #4819"
)]
enum Submit<'a> {
    /// A processed envelope.
    Envelope(TypedEnvelope<Processed>),
    /// The output of a [`processing::Processor`].
    Output {
        output: Outputs,
        ctx: processing::ForwardContext<'a>,
    },
}

/// Applies processing to all contents of the given envelope.
///
/// Depending on the contents of the envelope and Relay's mode, this includes:
///
///  - Basic normalization and validation for all item types.
///  - Clock drift correction if the required `sent_at` header is present.
///  - Expansion of certain item types (e.g. unreal).
///  - Store normalization for event payloads in processing mode.
///  - Rate limiters and inbound filters on events in processing mode.
#[derive(Debug)]
pub struct ProcessEnvelope {
    /// Envelope to process.
    pub envelope: ManagedEnvelope,
    /// The project info.
    pub project_info: Arc<ProjectInfo>,
    /// Currently active cached rate limits for this project.
    pub rate_limits: Arc<RateLimits>,
    /// Root sampling project info.
    pub sampling_project_info: Option<Arc<ProjectInfo>>,
    /// Sampling reservoir counters.
    pub reservoir_counters: ReservoirCounters,
}

/// Like a [`ProcessEnvelope`], but with an envelope which has been grouped.
#[derive(Debug)]
struct ProcessEnvelopeGrouped<'a> {
    /// The group the envelope belongs to.
    pub group: ProcessingGroup,
    /// Envelope to process.
    pub envelope: ManagedEnvelope,
    /// The processing context.
    pub ctx: processing::Context<'a>,
}

/// Parses a list of metrics or metric buckets and pushes them to the project's aggregator.
///
/// This parses and validates the metrics:
///  - For [`Statsd`](ItemType::Statsd), each metric is parsed separately, and invalid metrics are
///    ignored independently.
///  - For [`MetricBuckets`](ItemType::MetricBuckets), the entire list of buckets is parsed and
///    dropped together on parsing failure.
///  - Other envelope items will be ignored with an error message.
///
/// Additionally, processing applies clock drift correction using the system clock of this Relay, if
/// the Envelope specifies the [`sent_at`](Envelope::sent_at) header.
#[derive(Debug)]
pub struct ProcessMetrics {
    /// A list of metric items.
    pub data: MetricData,
    /// The target project.
    pub project_key: ProjectKey,
    /// Whether to keep or reset the metric metadata.
    pub source: BucketSource,
    /// The wall clock time at which the request was received.
    pub received_at: DateTime<Utc>,
    /// The value of the Envelope's [`sent_at`](Envelope::sent_at) header for clock drift
    /// correction.
    pub sent_at: Option<DateTime<Utc>>,
}

/// Raw unparsed metric data.
#[derive(Debug)]
pub enum MetricData {
    /// Raw data, unparsed envelope items.
    Raw(Vec<Item>),
    /// Already parsed but unprocessed buckets.
    Parsed(Vec<Bucket>),
}

impl MetricData {
    /// Consumes the metric data and parses the contained buckets.
    ///
    /// If the contained data is already parsed, the buckets are returned unchanged.
    /// Raw buckets are parsed and created with the passed `timestamp`.
    fn into_buckets(self, timestamp: UnixTimestamp) -> Vec<Bucket> {
        let items = match self {
            Self::Parsed(buckets) => return buckets,
            Self::Raw(items) => items,
        };

        let mut buckets = Vec::new();
        for item in items {
            let payload = item.payload();
            if item.ty() == &ItemType::Statsd {
                for bucket_result in Bucket::parse_all(&payload, timestamp) {
                    match bucket_result {
                        Ok(bucket) => buckets.push(bucket),
                        Err(error) => relay_log::debug!(
                            error = &error as &dyn Error,
                            "failed to parse metric bucket from statsd format",
                        ),
                    }
                }
            } else if item.ty() == &ItemType::MetricBuckets {
                match serde_json::from_slice::<Vec<Bucket>>(&payload) {
                    Ok(parsed_buckets) => {
                        // Re-use the allocation of `parsed_buckets` if possible.
                        if buckets.is_empty() {
                            buckets = parsed_buckets;
                        } else {
                            buckets.extend(parsed_buckets);
                        }
                    }
                    Err(error) => {
                        relay_log::debug!(
                            error = &error as &dyn Error,
                            "failed to parse metric bucket",
                        );
                        metric!(counter(RelayCounters::MetricBucketsParsingFailed) += 1);
                    }
                }
            } else {
                relay_log::error!(
                    "invalid item of type {} passed to ProcessMetrics",
                    item.ty()
                );
            }
        }
        buckets
    }
}
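
// Sketch of turning raw envelope items into buckets (`items` is hypothetical):
//
//     let data = MetricData::Raw(items);
//     let buckets = data.into_buckets(UnixTimestamp::now());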

#[derive(Debug)]
pub struct ProcessBatchedMetrics {
    /// Metrics payload in JSON format.
    pub payload: Bytes,
    /// Whether to keep or reset the metric metadata.
    pub source: BucketSource,
    /// The wall clock time at which the request was received.
    pub received_at: DateTime<Utc>,
    /// The wall clock time at which the request was sent, for clock drift correction.
    pub sent_at: Option<DateTime<Utc>>,
}

/// The source a metric bucket originates from.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum BucketSource {
    /// The metric bucket originated from an internal Relay use case.
    ///
    /// The metric bucket originates either from within the same Relay
    /// or was accepted coming from another Relay which is registered as
    /// an internal Relay via Relay's configuration.
    Internal,
    /// The metric bucket originated from an untrusted source.
    ///
    /// Managed Relays sending extracted metrics are considered external;
    /// it's a project use case, but it comes from an untrusted source.
    External,
}

impl BucketSource {
    /// Infers the bucket source from [`RequestMeta::request_trust`].
    pub fn from_meta(meta: &RequestMeta) -> Self {
        match meta.request_trust() {
            RequestTrust::Trusted => Self::Internal,
            RequestTrust::Untrusted => Self::External,
        }
    }
}
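
// Sketch: buckets submitted by registered internal Relays are trusted; everything
// else is treated as external (`meta` is a hypothetical `RequestMeta`):
//
//     let source = BucketSource::from_meta(&meta);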

/// Sends client reports to the upstream.
#[derive(Debug)]
pub struct SubmitClientReports {
    /// The client reports to be sent.
    pub client_reports: Vec<ClientReport>,
    /// Scoping information for the client reports.
    pub scoping: Scoping,
}

/// CPU-intensive processing tasks for envelopes.
#[derive(Debug)]
pub enum EnvelopeProcessor {
    ProcessEnvelope(Box<ProcessEnvelope>),
    ProcessProjectMetrics(Box<ProcessMetrics>),
    ProcessBatchedMetrics(Box<ProcessBatchedMetrics>),
    FlushBuckets(Box<FlushBuckets>),
    SubmitClientReports(Box<SubmitClientReports>),
}

impl EnvelopeProcessor {
    /// Returns the name of the message variant.
    pub fn variant(&self) -> &'static str {
        match self {
            EnvelopeProcessor::ProcessEnvelope(_) => "ProcessEnvelope",
            EnvelopeProcessor::ProcessProjectMetrics(_) => "ProcessProjectMetrics",
            EnvelopeProcessor::ProcessBatchedMetrics(_) => "ProcessBatchedMetrics",
            EnvelopeProcessor::FlushBuckets(_) => "FlushBuckets",
            EnvelopeProcessor::SubmitClientReports(_) => "SubmitClientReports",
        }
    }
}

impl relay_system::Interface for EnvelopeProcessor {}

impl FromMessage<ProcessEnvelope> for EnvelopeProcessor {
    type Response = NoResponse;

    fn from_message(message: ProcessEnvelope, _: ()) -> Self {
        Self::ProcessEnvelope(Box::new(message))
    }
}

impl FromMessage<ProcessMetrics> for EnvelopeProcessor {
    type Response = NoResponse;

    fn from_message(message: ProcessMetrics, _: ()) -> Self {
        Self::ProcessProjectMetrics(Box::new(message))
    }
}

impl FromMessage<ProcessBatchedMetrics> for EnvelopeProcessor {
    type Response = NoResponse;

    fn from_message(message: ProcessBatchedMetrics, _: ()) -> Self {
        Self::ProcessBatchedMetrics(Box::new(message))
    }
}

impl FromMessage<FlushBuckets> for EnvelopeProcessor {
    type Response = NoResponse;

    fn from_message(message: FlushBuckets, _: ()) -> Self {
        Self::FlushBuckets(Box::new(message))
    }
}

impl FromMessage<SubmitClientReports> for EnvelopeProcessor {
    type Response = NoResponse;

    fn from_message(message: SubmitClientReports, _: ()) -> Self {
        Self::SubmitClientReports(Box::new(message))
    }
}

/// The asynchronous thread pool used for scheduling processing tasks in the processor.
pub type EnvelopeProcessorServicePool = AsyncPool<BoxFuture<'static, ()>>;

/// Service implementing the [`EnvelopeProcessor`] interface.
///
/// This service handles messages in a worker pool with configurable concurrency.
#[derive(Clone)]
pub struct EnvelopeProcessorService {
    inner: Arc<InnerProcessor>,
}

/// Contains the addresses of services that the processor publishes to.
pub struct Addrs {
    pub outcome_aggregator: Addr<TrackOutcome>,
    pub upstream_relay: Addr<UpstreamRelay>,
    #[cfg(feature = "processing")]
    pub upload: Option<Addr<Upload>>,
    #[cfg(feature = "processing")]
    pub store_forwarder: Option<Addr<Store>>,
    pub aggregator: Addr<Aggregator>,
    #[cfg(feature = "processing")]
    pub global_rate_limits: Option<Addr<GlobalRateLimits>>,
}

impl Default for Addrs {
    fn default() -> Self {
        Addrs {
            outcome_aggregator: Addr::dummy(),
            upstream_relay: Addr::dummy(),
            #[cfg(feature = "processing")]
            upload: None,
            #[cfg(feature = "processing")]
            store_forwarder: None,
            aggregator: Addr::dummy(),
            #[cfg(feature = "processing")]
            global_rate_limits: None,
        }
    }
}

struct InnerProcessor {
    pool: EnvelopeProcessorServicePool,
    config: Arc<Config>,
    global_config: GlobalConfigHandle,
    project_cache: ProjectCacheHandle,
    cogs: Cogs,
    #[cfg(feature = "processing")]
    quotas_client: Option<AsyncRedisClient>,
    addrs: Addrs,
    #[cfg(feature = "processing")]
    rate_limiter: Option<Arc<RedisRateLimiter<GlobalRateLimitsServiceHandle>>>,
    geoip_lookup: GeoIpLookup,
    #[cfg(feature = "processing")]
    cardinality_limiter: Option<CardinalityLimiter>,
    metric_outcomes: MetricOutcomes,
    processing: Processing,
}

struct Processing {
    logs: LogsProcessor,
    trace_metrics: TraceMetricsProcessor,
    spans: SpansProcessor,
    check_ins: CheckInsProcessor,
    sessions: SessionsProcessor,
    profile_chunks: ProfileChunksProcessor,
    trace_attachments: TraceAttachmentsProcessor,
}

impl EnvelopeProcessorService {
    /// Creates a multi-threaded envelope processor.
    #[cfg_attr(feature = "processing", expect(clippy::too_many_arguments))]
    pub fn new(
        pool: EnvelopeProcessorServicePool,
        config: Arc<Config>,
        global_config: GlobalConfigHandle,
        project_cache: ProjectCacheHandle,
        cogs: Cogs,
        #[cfg(feature = "processing")] redis: Option<RedisClients>,
        addrs: Addrs,
        metric_outcomes: MetricOutcomes,
    ) -> Self {
        let geoip_lookup = config
            .geoip_path()
            .and_then(
                |p| match GeoIpLookup::open(p).context(ServiceError::GeoIp) {
                    Ok(geoip) => Some(geoip),
                    Err(err) => {
                        relay_log::error!("failed to open GeoIP db {p:?}: {err:?}");
                        None
                    }
                },
            )
            .unwrap_or_else(GeoIpLookup::empty);

        #[cfg(feature = "processing")]
        let (cardinality, quotas) = match redis {
            Some(RedisClients {
                cardinality,
                quotas,
                ..
            }) => (Some(cardinality), Some(quotas)),
            None => (None, None),
        };

        #[cfg(feature = "processing")]
        let global_rate_limits = addrs.global_rate_limits.clone().map(Into::into);

        #[cfg(feature = "processing")]
        let rate_limiter = match (quotas.clone(), global_rate_limits) {
            (Some(redis), Some(global)) => Some(
                RedisRateLimiter::new(redis, global)
                    .max_limit(config.max_rate_limit())
                    .cache(config.quota_cache_ratio(), config.quota_cache_max()),
            ),
            _ => None,
        };

        let quota_limiter = Arc::new(QuotaRateLimiter::new(
            #[cfg(feature = "processing")]
            project_cache.clone(),
            #[cfg(feature = "processing")]
            rate_limiter.clone(),
        ));
        #[cfg(feature = "processing")]
        let rate_limiter = rate_limiter.map(Arc::new);

        let inner = InnerProcessor {
            pool,
            global_config,
            project_cache,
            cogs,
            #[cfg(feature = "processing")]
            quotas_client: quotas.clone(),
            #[cfg(feature = "processing")]
            rate_limiter,
            addrs,
            #[cfg(feature = "processing")]
            cardinality_limiter: cardinality
                .map(|cardinality| {
                    RedisSetLimiter::new(
                        RedisSetLimiterOptions {
                            cache_vacuum_interval: config
                                .cardinality_limiter_cache_vacuum_interval(),
                        },
                        cardinality,
                    )
                })
                .map(CardinalityLimiter::new),
            metric_outcomes,
            processing: Processing {
                logs: LogsProcessor::new(Arc::clone(&quota_limiter)),
                trace_metrics: TraceMetricsProcessor::new(Arc::clone(&quota_limiter)),
                spans: SpansProcessor::new(Arc::clone(&quota_limiter), geoip_lookup.clone()),
                check_ins: CheckInsProcessor::new(Arc::clone(&quota_limiter)),
                sessions: SessionsProcessor::new(Arc::clone(&quota_limiter)),
                profile_chunks: ProfileChunksProcessor::new(Arc::clone(&quota_limiter)),
                trace_attachments: TraceAttachmentsProcessor::new(quota_limiter),
            },
            geoip_lookup,
            config,
        };

        Self {
            inner: Arc::new(inner),
        }
    }

    async fn enforce_quotas<Group>(
        &self,
        managed_envelope: &mut TypedEnvelope<Group>,
        event: Annotated<Event>,
        extracted_metrics: &mut ProcessingExtractedMetrics,
        ctx: processing::Context<'_>,
    ) -> Result<Annotated<Event>, ProcessingError> {
        // Cached quotas first: they are quick to evaluate, and since some quotas (indexed)
        // are not applied in the fast path, all cached quotas can be applied here.
        let cached_result = RateLimiter::Cached
            .enforce(managed_envelope, event, extracted_metrics, ctx)
            .await?;

        if_processing!(self.inner.config, {
            let rate_limiter = match self.inner.rate_limiter.clone() {
                Some(rate_limiter) => rate_limiter,
                None => return Ok(cached_result.event),
            };

            // Enforce all quotas consistently with Redis.
            let consistent_result = RateLimiter::Consistent(rate_limiter)
                .enforce(
                    managed_envelope,
                    cached_result.event,
                    extracted_metrics,
                    ctx
                )
                .await?;

            // Update cached rate limits with the freshly computed ones.
            if !consistent_result.rate_limits.is_empty() {
                self.inner
                    .project_cache
                    .get(managed_envelope.scoping().project_key)
                    .rate_limits()
                    .merge(consistent_result.rate_limits);
            }

            Ok(consistent_result.event)
        } else { Ok(cached_result.event) })
    }

    /// Processes general errors and the items which require or create events.
    async fn process_errors(
        &self,
        managed_envelope: &mut TypedEnvelope<ErrorGroup>,
        project_id: ProjectId,
        mut ctx: processing::Context<'_>,
    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
        let mut event_fully_normalized = EventFullyNormalized::new(managed_envelope.envelope());
        let mut metrics = Metrics::default();
        let mut extracted_metrics = ProcessingExtractedMetrics::new();

        // Events can also contain user reports.
        report::process_user_reports(managed_envelope);

        if_processing!(self.inner.config, {
            unreal::expand(managed_envelope, &self.inner.config)?;
            #[cfg(sentry)]
            playstation::expand(managed_envelope, &self.inner.config, ctx.project_info)?;
            nnswitch::expand(managed_envelope)?;
        });

        let extraction_result = event::extract(
            managed_envelope,
            &mut metrics,
            event_fully_normalized,
            &self.inner.config,
        )?;
        let mut event = extraction_result.event;

        if_processing!(self.inner.config, {
            if let Some(inner_event_fully_normalized) =
                unreal::process(managed_envelope, &mut event)?
            {
                event_fully_normalized = inner_event_fully_normalized;
            }
            #[cfg(sentry)]
            if let Some(inner_event_fully_normalized) =
                playstation::process(managed_envelope, &mut event, ctx.project_info)?
            {
                event_fully_normalized = inner_event_fully_normalized;
            }
            if let Some(inner_event_fully_normalized) =
                attachment::create_placeholders(managed_envelope, &mut event, &mut metrics)
            {
                event_fully_normalized = inner_event_fully_normalized;
            }
        });

        ctx.sampling_project_info = dynamic_sampling::validate_and_set_dsc(
            managed_envelope,
            &mut event,
            ctx.project_info,
            ctx.sampling_project_info,
        );

        let attachments = managed_envelope
            .envelope()
            .items()
            .filter(|item| item.attachment_type() == Some(&AttachmentType::Attachment));
        processing::utils::event::finalize(
            managed_envelope.envelope().headers(),
            &mut event,
            attachments,
            &mut metrics,
            ctx.config,
        )?;
        event_fully_normalized = processing::utils::event::normalize(
            managed_envelope.envelope().headers(),
            &mut event,
            event_fully_normalized,
            project_id,
            ctx,
            &self.inner.geoip_lookup,
        )?;
        let filter_run =
            processing::utils::event::filter(managed_envelope.envelope().headers(), &event, &ctx)
                .map_err(|err| {
                    managed_envelope.reject(Outcome::Filtered(err.clone()));
                    ProcessingError::EventFiltered(err)
                })?;

        if self.inner.config.processing_enabled() || matches!(filter_run, FiltersStatus::Ok) {
            dynamic_sampling::tag_error_with_sampling_decision(
                managed_envelope,
                &mut event,
                ctx.sampling_project_info,
                &self.inner.config,
            )
            .await;
        }

        event = self
            .enforce_quotas(managed_envelope, event, &mut extracted_metrics, ctx)
            .await?;

        if event.value().is_some() {
            processing::utils::event::scrub(&mut event, ctx.project_info)?;
            event::serialize(
                managed_envelope,
                &mut event,
                event_fully_normalized,
                EventMetricsExtracted(false),
                SpansExtracted(false),
            )?;
            event::emit_feedback_metrics(managed_envelope.envelope());
        }

        let attachments = managed_envelope
            .envelope_mut()
            .items_mut()
            .filter(|i| i.ty() == &ItemType::Attachment);
        processing::utils::attachments::scrub(attachments, ctx.project_info);

        if self.inner.config.processing_enabled() && !event_fully_normalized.0 {
            relay_log::error!(
                tags.project = %project_id,
                tags.ty = event_type(&event).map(|e| e.to_string()).unwrap_or("none".to_owned()),
                "ingested event without normalizing"
            );
        }

        Ok(Some(extracted_metrics))
    }
1442
1443    /// Processes only transactions and transaction-related items.
1444    #[allow(unused_assignments)]
1445    async fn process_transactions(
1446        &self,
1447        managed_envelope: &mut TypedEnvelope<TransactionGroup>,
1448        cogs: &mut Token,
1449        project_id: ProjectId,
1450        mut ctx: processing::Context<'_>,
1451    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
1452        let mut event_fully_normalized = EventFullyNormalized::new(managed_envelope.envelope());
1453        let mut event_metrics_extracted = EventMetricsExtracted(false);
1454        let mut spans_extracted = SpansExtracted(false);
1455        let mut metrics = Metrics::default();
1456        let mut extracted_metrics = ProcessingExtractedMetrics::new();
1457
1458        // We extract the main event from the envelope.
1459        let extraction_result = event::extract(
1460            managed_envelope,
1461            &mut metrics,
1462            event_fully_normalized,
1463            &self.inner.config,
1464        )?;
1465
1466        // If metrics were extracted we mark that.
1467        if let Some(inner_event_metrics_extracted) = extraction_result.event_metrics_extracted {
1468            event_metrics_extracted = inner_event_metrics_extracted;
1469        }
1470        if let Some(inner_spans_extracted) = extraction_result.spans_extracted {
1471            spans_extracted = inner_spans_extracted;
1472        }
1473
1474        // We take the main event out of the result.
1475        let mut event = extraction_result.event;
1476
1477        let profile_id = profile::filter(
1478            managed_envelope,
1479            &event,
1480            ctx.config,
1481            project_id,
1482            ctx.project_info,
1483        );
1484        processing::transactions::profile::transfer_id(&mut event, profile_id);
1485        processing::transactions::profile::remove_context_if_rate_limited(
1486            &mut event,
1487            managed_envelope.scoping(),
1488            ctx,
1489        );
1490
1491        ctx.sampling_project_info = dynamic_sampling::validate_and_set_dsc(
1492            managed_envelope,
1493            &mut event,
1494            ctx.project_info,
1495            ctx.sampling_project_info,
1496        );
1497
1498        let attachments = managed_envelope
1499            .envelope()
1500            .items()
1501            .filter(|item| item.attachment_type() == Some(&AttachmentType::Attachment));
1502        processing::utils::event::finalize(
1503            managed_envelope.envelope().headers(),
1504            &mut event,
1505            attachments,
1506            &mut metrics,
1507            &self.inner.config,
1508        )?;
1509
1510        event_fully_normalized = processing::utils::event::normalize(
1511            managed_envelope.envelope().headers(),
1512            &mut event,
1513            event_fully_normalized,
1514            project_id,
1515            ctx,
1516            &self.inner.geoip_lookup,
1517        )?;
1518
1519        let filter_run =
1520            processing::utils::event::filter(managed_envelope.envelope().headers(), &event, &ctx)
1521                .map_err(|err| {
1522                managed_envelope.reject(Outcome::Filtered(err.clone()));
1523                ProcessingError::EventFiltered(err)
1524            })?;
1525
1526        // Always run dynamic sampling on processing Relays,
1527        // but delay decision until inbound filters have been fully processed.
1528        // Also, we require transaction metrics to be enabled before sampling.
1529        let run_dynamic_sampling = (matches!(filter_run, FiltersStatus::Ok)
1530            || self.inner.config.processing_enabled())
1531            && matches!(&ctx.project_info.config.transaction_metrics, Some(ErrorBoundary::Ok(c)) if c.is_enabled());
1532
1533        let sampling_result = match run_dynamic_sampling {
1534            true => {
1535                #[allow(unused_mut)]
1536                let mut reservoir = ReservoirEvaluator::new(Arc::clone(ctx.reservoir_counters));
1537                #[cfg(feature = "processing")]
1538                if let Some(quotas_client) = self.inner.quotas_client.as_ref() {
1539                    reservoir.set_redis(managed_envelope.scoping().organization_id, quotas_client);
1540                }
1541                processing::utils::dynamic_sampling::run(
1542                    managed_envelope.envelope().headers().dsc(),
1543                    event.value(),
1544                    &ctx,
1545                    Some(&reservoir),
1546                )
1547                .await
1548            }
1549            false => SamplingResult::Pending,
1550        };
1551
1552        relay_statsd::metric!(
1553            counter(RelayCounters::SamplingDecision) += 1,
1554            decision = sampling_result.decision().as_str(),
1555            item = "transaction"
1556        );
1557
1558        #[cfg(feature = "processing")]
1559        let server_sample_rate = sampling_result.sample_rate();
1560
1561        if let Some(outcome) = sampling_result.into_dropped_outcome() {
1562            // Process profiles before dropping the transaction, if necessary.
1563            // This runs before metric extraction to make sure the profile count is reflected correctly.
1564            profile::process(
1565                managed_envelope,
1566                &mut event,
1567                ctx.global_config,
1568                ctx.config,
1569                ctx.project_info,
1570            );
1571            // Extract metrics here, we're about to drop the event/transaction.
1572            event_metrics_extracted = processing::transactions::extraction::extract_metrics(
1573                &mut event,
1574                &mut extracted_metrics,
1575                ExtractMetricsContext {
1576                    dsc: managed_envelope.envelope().dsc(),
1577                    project_id,
1578                    ctx,
1579                    sampling_decision: SamplingDecision::Drop,
1580                    metrics_extracted: event_metrics_extracted.0,
1581                    spans_extracted: spans_extracted.0,
1582                },
1583            )?;
1584
1585            dynamic_sampling::drop_unsampled_items(
1586                managed_envelope,
1587                event,
1588                outcome,
1589                spans_extracted,
1590            );
1591
1592            // At this point we have either:
1593            //  - an empty envelope, or
1594            //  - an envelope containing only processed profiles.
1595            // We need to make sure there are enough quotas for these profiles.
1596            event = self
1597                .enforce_quotas(
1598                    managed_envelope,
1599                    Annotated::empty(),
1600                    &mut extracted_metrics,
1601                    ctx,
1602                )
1603                .await?;
1604
1605            return Ok(Some(extracted_metrics));
1606        }
1607
1608        let _post_ds = cogs.start_category("post_ds");
1609
1610        // Need to scrub the transaction before extracting spans.
1611        //
1612        // Unconditionally scrub to make sure PII is removed as early as possible.
1613        processing::utils::event::scrub(&mut event, ctx.project_info)?;
1614
1615        let attachments = managed_envelope
1616            .envelope_mut()
1617            .items_mut()
1618            .filter(|i| i.ty() == &ItemType::Attachment);
1619        processing::utils::attachments::scrub(attachments, ctx.project_info);
1620
1621        if_processing!(self.inner.config, {
1622            // Process profiles before extracting metrics, to make sure they are removed if they are invalid.
1623            let profile_id = profile::process(
1624                managed_envelope,
1625                &mut event,
1626                ctx.global_config,
1627                ctx.config,
1628                ctx.project_info,
1629            );
1630            processing::transactions::profile::transfer_id(&mut event, profile_id);
1631            processing::transactions::profile::scrub_profiler_id(&mut event);
1632
1633            // Always extract metrics in processing Relays for sampled items.
1634            event_metrics_extracted = processing::transactions::extraction::extract_metrics(
1635                &mut event,
1636                &mut extracted_metrics,
1637                ExtractMetricsContext {
1638                    dsc: managed_envelope.envelope().dsc(),
1639                    project_id,
1640                    ctx,
1641                    sampling_decision: SamplingDecision::Keep,
1642                    metrics_extracted: event_metrics_extracted.0,
1643                    spans_extracted: spans_extracted.0,
1644                },
1645            )?;
1646
1647            if let Some(spans) = processing::transactions::spans::extract_from_event(
1648                managed_envelope.envelope().dsc(),
1649                &event,
1650                ctx.global_config,
1651                ctx.config,
1652                server_sample_rate,
1653                event_metrics_extracted,
1654                spans_extracted,
1655            ) {
1656                spans_extracted = SpansExtracted(true);
1657                for item in spans {
1658                    match item {
1659                        Ok(item) => managed_envelope.envelope_mut().add_item(item),
1660                        Err(()) => managed_envelope.track_outcome(
1661                            Outcome::Invalid(DiscardReason::InvalidSpan),
1662                            // TODO: also track `DataCategory::Span`?
1663                            DataCategory::SpanIndexed,
1664                            1,
1665                        ),
1666                    }
1667                }
1668            }
1669        });
1670
1671        event = self
1672            .enforce_quotas(managed_envelope, event, &mut extracted_metrics, ctx)
1673            .await?;
1674
1675        // Event may have been dropped because of a quota and the envelope can be empty.
1676        if event.value().is_some() {
1677            event::serialize(
1678                managed_envelope,
1679                &mut event,
1680                event_fully_normalized,
1681                event_metrics_extracted,
1682                spans_extracted,
1683            )?;
1684        }
1685
1686        if self.inner.config.processing_enabled() && !event_fully_normalized.0 {
1687            relay_log::error!(
1688                tags.project = %project_id,
1689                tags.ty = event_type(&event).map(|e| e.to_string()).unwrap_or("none".to_owned()),
1690                "ingested event without normalizing"
1691            );
1692        }
1693
1694        Ok(Some(extracted_metrics))
1695    }
1696
1697    /// Processes standalone items that require an event ID, but do not have an event on the same envelope.
1698    async fn process_standalone(
1699        &self,
1700        managed_envelope: &mut TypedEnvelope<StandaloneGroup>,
1701        project_id: ProjectId,
1702        ctx: processing::Context<'_>,
1703    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
1704        let mut extracted_metrics = ProcessingExtractedMetrics::new();
1705
1706        standalone::process(managed_envelope);
1707
1708        profile::filter(
1709            managed_envelope,
1710            &Annotated::empty(),
1711            ctx.config,
1712            project_id,
1713            ctx.project_info,
1714        );
1715
1716        self.enforce_quotas(
1717            managed_envelope,
1718            Annotated::empty(),
1719            &mut extracted_metrics,
1720            ctx,
1721        )
1722        .await?;
1723
1724        report::process_user_reports(managed_envelope);
1725        let attachments = managed_envelope
1726            .envelope_mut()
1727            .items_mut()
1728            .filter(|i| i.ty() == &ItemType::Attachment);
1729        processing::utils::attachments::scrub(attachments, ctx.project_info);
1730
1731        Ok(Some(extracted_metrics))
1732    }
1733
1734    /// Processes client reports.
1735    async fn process_client_reports(
1736        &self,
1737        managed_envelope: &mut TypedEnvelope<ClientReportGroup>,
1738        ctx: processing::Context<'_>,
1739    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
1740        let mut extracted_metrics = ProcessingExtractedMetrics::new();
1741
1742        self.enforce_quotas(
1743            managed_envelope,
1744            Annotated::empty(),
1745            &mut extracted_metrics,
1746            ctx,
1747        )
1748        .await?;
1749
1750        report::process_client_reports(
1751            managed_envelope,
1752            ctx.config,
1753            ctx.project_info,
1754            self.inner.addrs.outcome_aggregator.clone(),
1755        );
1756
1757        Ok(Some(extracted_metrics))
1758    }
1759
1760    /// Processes replays.
1761    async fn process_replays(
1762        &self,
1763        managed_envelope: &mut TypedEnvelope<ReplayGroup>,
1764        ctx: processing::Context<'_>,
1765    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
1766        let mut extracted_metrics = ProcessingExtractedMetrics::new();
1767
1768        replay::process(
1769            managed_envelope,
1770            ctx.global_config,
1771            ctx.config,
1772            ctx.project_info,
1773            &self.inner.geoip_lookup,
1774        )?;
1775
1776        self.enforce_quotas(
1777            managed_envelope,
1778            Annotated::empty(),
1779            &mut extracted_metrics,
1780            ctx,
1781        )
1782        .await?;
1783
1784        Ok(Some(extracted_metrics))
1785    }
1786
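    /// Processes NEL (Network Error Logging) reports.
    ///
    /// The reports are converted into logs and then handled by the logs processor.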
1787    async fn process_nel(
1788        &self,
1789        mut managed_envelope: ManagedEnvelope,
1790        ctx: processing::Context<'_>,
1791    ) -> Result<ProcessingResult, ProcessingError> {
1792        nel::convert_to_logs(&mut managed_envelope);
1793        self.process_with_processor(&self.inner.processing.logs, managed_envelope, ctx)
1794            .await
1795    }
1796
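    /// Runs an envelope through a dedicated [`processing::Processor`] implementation.
    ///
    /// The processor takes ownership of all items it can handle; any items left in the
    /// envelope afterwards are rejected as internal errors.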
1797    async fn process_with_processor<P: processing::Processor>(
1798        &self,
1799        processor: &P,
1800        mut managed_envelope: ManagedEnvelope,
1801        ctx: processing::Context<'_>,
1802    ) -> Result<ProcessingResult, ProcessingError>
1803    where
1804        Outputs: From<P::Output>,
1805    {
1806        let Some(work) = processor.prepare_envelope(&mut managed_envelope) else {
1807            debug_assert!(
1808                false,
1809                "there must be work for the {} processor",
1810                std::any::type_name::<P>(),
1811            );
1812            return Err(ProcessingError::ProcessingGroupMismatch);
1813        };
1814
1815        managed_envelope.update();
1816        match managed_envelope.envelope().is_empty() {
1817            true => managed_envelope.accept(),
1818            false => managed_envelope.reject(Outcome::Invalid(DiscardReason::Internal)),
1819        }
1820
1821        processor
1822            .process(work, ctx)
1823            .await
1824            .map_err(|err| {
1825                relay_log::debug!(
1826                    error = &err as &dyn std::error::Error,
1827                    "processing pipeline failed"
1828                );
1829                ProcessingError::ProcessingFailure
1830            })
1831            .map(|o| o.map(Into::into))
1832            .map(ProcessingResult::Output)
1833    }
1834
1835    /// Processes standalone spans.
1836    ///
1837    /// This function does *not* run for spans extracted from transactions.
1838    async fn process_standalone_spans(
1839        &self,
1840        managed_envelope: &mut TypedEnvelope<SpanGroup>,
1841        _project_id: ProjectId,
1842        ctx: processing::Context<'_>,
1843    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
1844        let mut extracted_metrics = ProcessingExtractedMetrics::new();
1845
1846        span::filter(managed_envelope, ctx.config, ctx.project_info);
1847        span::convert_otel_traces_data(managed_envelope);
1848
1849        if_processing!(self.inner.config, {
1850            span::process(
1851                managed_envelope,
1852                &mut Annotated::empty(),
1853                &mut extracted_metrics,
1854                _project_id,
1855                ctx,
1856                &self.inner.geoip_lookup,
1857            )
1858            .await;
1859        });
1860
1861        self.enforce_quotas(
1862            managed_envelope,
1863            Annotated::empty(),
1864            &mut extracted_metrics,
1865            ctx,
1866        )
1867        .await?;
1868
1869        Ok(Some(extracted_metrics))
1870    }
1871
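    /// Dispatches the envelope to the processing logic matching its [`ProcessingGroup`].
    ///
    /// Envelope headers, such as the DSC, retention, and project ID, are normalized first.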
1872    async fn process_envelope(
1873        &self,
1874        cogs: &mut Token,
1875        project_id: ProjectId,
1876        message: ProcessEnvelopeGrouped<'_>,
1877    ) -> Result<ProcessingResult, ProcessingError> {
1878        let ProcessEnvelopeGrouped {
1879            group,
1880            envelope: mut managed_envelope,
1881            ctx,
1882        } = message;
1883
1884        // Pre-process the envelope headers.
1885        if let Some(sampling_state) = ctx.sampling_project_info {
1886            // Both transactions and standalone span envelopes need a normalized DSC header
1887            // to make sampling rules based on the segment/transaction name work correctly.
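            // For example (illustrative values), a transaction name rule may normalize the
            // DSC transaction "/users/123/settings" to "/users/*/settings".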
1888            managed_envelope
1889                .envelope_mut()
1890                .parametrize_dsc_transaction(&sampling_state.config.tx_name_rules);
1891        }
1892
1893        // Set the event retention. Effectively, this value will only be available in processing
1894        // mode when the full project config is queried from the upstream.
1895        if let Some(retention) = ctx.project_info.config.event_retention {
1896            managed_envelope.envelope_mut().set_retention(retention);
1897        }
1898
1899        // Set the downsampled event retention. Like the regular event retention, this value is
1900        // only available in processing mode when the full project config is queried from the upstream.
1901        if let Some(retention) = ctx.project_info.config.downsampled_event_retention {
1902            managed_envelope
1903                .envelope_mut()
1904                .set_downsampled_retention(retention);
1905        }
1906
1907        // Ensure the project ID is updated to the stored instance for this project cache. This can
1908        // differ in two cases:
1909        //  1. The envelope was sent to the legacy `/store/` endpoint without a project ID.
1910        //  2. The DSN was moved and the envelope sent to the old project ID.
1911        managed_envelope
1912            .envelope_mut()
1913            .meta_mut()
1914            .set_project_id(project_id);
1915
1916        macro_rules! run {
1917            ($fn_name:ident $(, $args:expr)*) => {
1918                async {
1919                    let mut managed_envelope = (managed_envelope, group).try_into()?;
1920                    match self.$fn_name(&mut managed_envelope, $($args),*).await {
1921                        Ok(extracted_metrics) => Ok(ProcessingResult::Envelope {
1922                            managed_envelope: managed_envelope.into_processed(),
1923                            extracted_metrics: extracted_metrics.unwrap_or_else(ProcessingExtractedMetrics::new)
1924                        }),
1925                        Err(error) => {
1926                            relay_log::trace!("Executing {fn_name} failed: {error}", fn_name = stringify!($fn_name), error = error);
1927                            if let Some(outcome) = error.to_outcome() {
1928                                managed_envelope.reject(outcome);
1929                            }
1930
1931                            return Err(error);
1932                        }
1933                    }
1934                }.await
1935            };
1936        }
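        // For illustration, `run!(process_replays, ctx)` expands to roughly:
        //
        //     let mut managed_envelope = (managed_envelope, group).try_into()?;
        //     match self.process_replays(&mut managed_envelope, ctx).await {
        //         Ok(extracted_metrics) => Ok(ProcessingResult::Envelope { .. }),
        //         Err(error) => { /* reject with the error's outcome and return it */ }
        //     }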
1937
1938        relay_log::trace!("Processing {group} group", group = group.variant());
1939
1940        match group {
1941            ProcessingGroup::Error => run!(process_errors, project_id, ctx),
1942            ProcessingGroup::Transaction => {
1943                run!(process_transactions, cogs, project_id, ctx)
1944            }
1945            ProcessingGroup::Session => {
1946                self.process_with_processor(&self.inner.processing.sessions, managed_envelope, ctx)
1947                    .await
1948            }
1949            ProcessingGroup::Standalone => run!(process_standalone, project_id, ctx),
1950            ProcessingGroup::ClientReport => run!(process_client_reports, ctx),
1951            ProcessingGroup::Replay => {
1952                run!(process_replays, ctx)
1953            }
1954            ProcessingGroup::CheckIn => {
1955                self.process_with_processor(&self.inner.processing.check_ins, managed_envelope, ctx)
1956                    .await
1957            }
1958            ProcessingGroup::Nel => self.process_nel(managed_envelope, ctx).await,
1959            ProcessingGroup::Log => {
1960                self.process_with_processor(&self.inner.processing.logs, managed_envelope, ctx)
1961                    .await
1962            }
1963            ProcessingGroup::TraceMetric => {
1964                self.process_with_processor(
1965                    &self.inner.processing.trace_metrics,
1966                    managed_envelope,
1967                    ctx,
1968                )
1969                .await
1970            }
1971            ProcessingGroup::SpanV2 => {
1972                self.process_with_processor(&self.inner.processing.spans, managed_envelope, ctx)
1973                    .await
1974            }
1975            ProcessingGroup::TraceAttachment => {
1976                self.process_with_processor(
1977                    &self.inner.processing.trace_attachments,
1978                    managed_envelope,
1979                    ctx,
1980                )
1981                .await
1982            }
1983            ProcessingGroup::Span => run!(process_standalone_spans, project_id, ctx),
1984            ProcessingGroup::ProfileChunk => {
1985                self.process_with_processor(
1986                    &self.inner.processing.profile_chunks,
1987                    managed_envelope,
1988                    ctx,
1989                )
1990                .await
1991            }
1992            // Currently unused.
1993            ProcessingGroup::Metrics => {
1994                // In proxy mode we simply forward the metrics.
1995                // This group shouldn't be used outside of proxy mode.
1996                if self.inner.config.relay_mode() != RelayMode::Proxy {
1997                    relay_log::error!(
1998                        tags.project = %project_id,
1999                        items = ?managed_envelope.envelope().items().next().map(Item::ty),
2000                        "received metrics in the process_state"
2001                    );
2002                }
2003
2004                Ok(ProcessingResult::no_metrics(
2005                    managed_envelope.into_processed(),
2006                ))
2007            }
2008            // Fallback to the legacy process_state implementation for Ungrouped events.
2009            ProcessingGroup::Ungrouped => {
2010                relay_log::error!(
2011                    tags.project = %project_id,
2012                    items = ?managed_envelope.envelope().items().next().map(Item::ty),
2013                    "could not identify the processing group based on the envelope's items"
2014                );
2015
2016                Ok(ProcessingResult::no_metrics(
2017                    managed_envelope.into_processed(),
2018                ))
2019            }
2020            // Leave this group unchanged.
2021            //
2022            // This will later be forwarded to upstream.
2023            ProcessingGroup::ForwardUnknown => Ok(ProcessingResult::no_metrics(
2024                managed_envelope.into_processed(),
2025            )),
2026        }
2027    }
2028
2029    /// Processes the envelope and returns the processed envelope.
2030    ///
2031    /// Returns `Some` if the envelope passed inbound filtering and rate limiting; invalid items
2032    /// are removed from the envelope. Returns `None` if the envelope is empty or the entire
2033    /// envelope needs to be dropped.
2034    async fn process<'a>(
2035        &self,
2036        cogs: &mut Token,
2037        mut message: ProcessEnvelopeGrouped<'a>,
2038    ) -> Result<Option<Submit<'a>>, ProcessingError> {
2039        let ProcessEnvelopeGrouped {
2040            ref mut envelope,
2041            ctx,
2042            ..
2043        } = message;
2044
2045        // Prefer the project's project ID, and fall back to the stated project id from the
2046        // envelope. The project ID is available in all modes, other than in proxy mode, where
2047        // envelopes for unknown projects are forwarded blindly.
2048        //
2049        // Neither ID can be available in proxy mode on the /store/ endpoint. This is not supported,
2050        // since we cannot process an envelope without project ID, so drop it.
2051        let Some(project_id) = ctx
2052            .project_info
2053            .project_id
2054            .or_else(|| envelope.envelope().meta().project_id())
2055        else {
2056            envelope.reject(Outcome::Invalid(DiscardReason::Internal));
2057            return Err(ProcessingError::MissingProjectId);
2058        };
2059
2060        let client = envelope.envelope().meta().client().map(str::to_owned);
2061        let user_agent = envelope.envelope().meta().user_agent().map(str::to_owned);
2062        let project_key = envelope.envelope().meta().public_key();
2063        // Only allow sending to the sampling key, if we successfully loaded a sampling project
2064        // info relating to it. This filters out unknown/invalid project keys as well as project
2065        // keys from different organizations.
2066        let sampling_key = envelope
2067            .envelope()
2068            .sampling_key()
2069            .filter(|_| ctx.sampling_project_info.is_some());
2070
2071        // We set additional information on the scope, which will be removed after processing the
2072        // envelope.
2073        relay_log::configure_scope(|scope| {
2074            scope.set_tag("project", project_id);
2075            if let Some(client) = client {
2076                scope.set_tag("sdk", client);
2077            }
2078            if let Some(user_agent) = user_agent {
2079                scope.set_extra("user_agent", user_agent.into());
2080            }
2081        });
2082
2083        let result = match self.process_envelope(cogs, project_id, message).await {
2084            Ok(ProcessingResult::Envelope {
2085                mut managed_envelope,
2086                extracted_metrics,
2087            }) => {
2088                // The envelope could be modified or even emptied during processing, which
2089                // requires re-computation of the context.
2090                managed_envelope.update();
2091
2092                let has_metrics = !extracted_metrics.metrics.project_metrics.is_empty();
2093                send_metrics(
2094                    extracted_metrics.metrics,
2095                    project_key,
2096                    sampling_key,
2097                    &self.inner.addrs.aggregator,
2098                );
2099
2100                let envelope_response = if managed_envelope.envelope().is_empty() {
2101                    if !has_metrics {
2102                        // Individual rate limits have already been issued
2103                        managed_envelope.reject(Outcome::RateLimited(None));
2104                    } else {
2105                        managed_envelope.accept();
2106                    }
2107
2108                    None
2109                } else {
2110                    Some(managed_envelope)
2111                };
2112
2113                Ok(envelope_response.map(Submit::Envelope))
2114            }
2115            Ok(ProcessingResult::Output(Output { main, metrics })) => {
2116                if let Some(metrics) = metrics {
2117                    metrics.accept(|metrics| {
2118                        send_metrics(
2119                            metrics,
2120                            project_key,
2121                            sampling_key,
2122                            &self.inner.addrs.aggregator,
2123                        );
2124                    });
2125                }
2126
2127                let ctx = ctx.to_forward();
2128                Ok(main.map(|output| Submit::Output { output, ctx }))
2129            }
2130            Err(err) => Err(err),
2131        };
2132
2133        relay_log::configure_scope(|scope| {
2134            scope.remove_tag("project");
2135            scope.remove_tag("sdk");
2136            scope.remove_extra("user_agent");
2137        });
2138
2139        result
2140    }
2141
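    /// Splits the incoming envelope by processing group and processes each sub-envelope
    /// separately, with its own COGS measurement and processing context.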
2142    async fn handle_process_envelope(&self, cogs: &mut Token, message: ProcessEnvelope) {
2143        let project_key = message.envelope.envelope().meta().public_key();
2144        let wait_time = message.envelope.age();
2145        metric!(timer(RelayTimers::EnvelopeWaitTime) = wait_time);
2146
2147        // This COGS handling may need an overhaul in the future:
2148        // Cancel the passed-in token to start individual measurements per envelope instead.
2149        cogs.cancel();
2150
2151        let scoping = message.envelope.scoping();
2152        for (group, envelope) in ProcessingGroup::split_envelope(
2153            *message.envelope.into_envelope(),
2154            &message.project_info,
2155        ) {
2156            let mut cogs = self
2157                .inner
2158                .cogs
2159                .timed(ResourceId::Relay, AppFeature::from(group));
2160
2161            let mut envelope =
2162                ManagedEnvelope::new(envelope, self.inner.addrs.outcome_aggregator.clone());
2163            envelope.scope(scoping);
2164
2165            let global_config = self.inner.global_config.current();
2166
2167            let ctx = processing::Context {
2168                config: &self.inner.config,
2169                global_config: &global_config,
2170                project_info: &message.project_info,
2171                sampling_project_info: message.sampling_project_info.as_deref(),
2172                rate_limits: &message.rate_limits,
2173                reservoir_counters: &message.reservoir_counters,
2174            };
2175
2176            let message = ProcessEnvelopeGrouped {
2177                group,
2178                envelope,
2179                ctx,
2180            };
2181
2182            let result = metric!(
2183                timer(RelayTimers::EnvelopeProcessingTime),
2184                group = group.variant(),
2185                { self.process(&mut cogs, message).await }
2186            );
2187
2188            match result {
2189                Ok(Some(envelope)) => self.submit_upstream(&mut cogs, envelope),
2190                Ok(None) => {}
2191                Err(error) if error.is_unexpected() => {
2192                    relay_log::error!(
2193                        tags.project_key = %project_key,
2194                        error = &error as &dyn Error,
2195                        "error processing envelope"
2196                    )
2197                }
2198                Err(error) => {
2199                    relay_log::debug!(
2200                        tags.project_key = %project_key,
2201                        error = &error as &dyn Error,
2202                        "error processing envelope"
2203                    )
2204                }
2205            }
2206        }
2207    }
2208
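    /// Parses, normalizes, and validates metric buckets, corrects clock drift, applies
    /// cached filters and rate limits, and merges the buckets into the aggregator.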
2209    fn handle_process_metrics(&self, cogs: &mut Token, message: ProcessMetrics) {
2210        let ProcessMetrics {
2211            data,
2212            project_key,
2213            received_at,
2214            sent_at,
2215            source,
2216        } = message;
2217
2218        let received_timestamp =
2219            UnixTimestamp::from_datetime(received_at).unwrap_or(UnixTimestamp::now());
2220
2221        let mut buckets = data.into_buckets(received_timestamp);
2222        if buckets.is_empty() {
2223            return;
2224        }
2225        cogs.update(relay_metrics::cogs::BySize(&buckets));
2226
2227        let clock_drift_processor =
2228            ClockDriftProcessor::new(sent_at, received_at).at_least(MINIMUM_CLOCK_DRIFT);
2229
2230        buckets.retain_mut(|bucket| {
2231            if let Err(error) = relay_metrics::normalize_bucket(bucket) {
2232                relay_log::debug!(error = &error as &dyn Error, "dropping bucket {bucket:?}");
2233                return false;
2234            }
2235
2236            if !self::metrics::is_valid_namespace(bucket) {
2237                return false;
2238            }
2239
2240            clock_drift_processor.process_timestamp(&mut bucket.timestamp);
2241
2242            if !matches!(source, BucketSource::Internal) {
2243                bucket.metadata = BucketMetadata::new(received_timestamp);
2244            }
2245
2246            true
2247        });
2248
2249        let project = self.inner.project_cache.get(project_key);
2250
2251        // Best-effort check to filter and rate limit buckets. If there is no project state
2252        // available at the current time, we will check again after flushing.
2253        let buckets = match project.state() {
2254            ProjectState::Enabled(project_info) => {
2255                let rate_limits = project.rate_limits().current_limits();
2256                self.check_buckets(project_key, project_info, &rate_limits, buckets)
2257            }
2258            _ => buckets,
2259        };
2260
2261        relay_log::trace!("merging metric buckets into the aggregator");
2262        self.inner
2263            .addrs
2264            .aggregator
2265            .send(MergeBuckets::new(project_key, buckets));
2266    }
2267
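    /// Parses a batched metrics payload and forwards each project's buckets to
    /// [`Self::handle_process_metrics`].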
2268    fn handle_process_batched_metrics(&self, cogs: &mut Token, message: ProcessBatchedMetrics) {
2269        let ProcessBatchedMetrics {
2270            payload,
2271            source,
2272            received_at,
2273            sent_at,
2274        } = message;
2275
2276        #[derive(serde::Deserialize)]
2277        struct Wrapper {
2278            buckets: HashMap<ProjectKey, Vec<Bucket>>,
2279        }
2280
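        // For illustration, the payload is expected to have this shape (placeholder values):
        //
        //     {"buckets": {"<project_key>": [<bucket>, ...]}}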
2281        let buckets = match serde_json::from_slice(&payload) {
2282            Ok(Wrapper { buckets }) => buckets,
2283            Err(error) => {
2284                relay_log::debug!(
2285                    error = &error as &dyn Error,
2286                    "failed to parse batched metrics",
2287                );
2288                metric!(counter(RelayCounters::MetricBucketsParsingFailed) += 1);
2289                return;
2290            }
2291        };
2292
2293        for (project_key, buckets) in buckets {
2294            self.handle_process_metrics(
2295                cogs,
2296                ProcessMetrics {
2297                    data: MetricData::Parsed(buckets),
2298                    project_key,
2299                    source,
2300                    received_at,
2301                    sent_at,
2302                },
2303            )
2304        }
2305    }
2306
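    /// Submits processed data to the store or upload service in processing mode, or
    /// serializes it into an envelope and sends it to the upstream otherwise.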
2307    fn submit_upstream(&self, cogs: &mut Token, submit: Submit<'_>) {
2308        let _submit = cogs.start_category("submit");
2309
2310        #[cfg(feature = "processing")]
2311        if self.inner.config.processing_enabled()
2312            && let Some(store_forwarder) = &self.inner.addrs.store_forwarder
2313        {
2314            use crate::processing::StoreHandle;
2315
2316            let upload = self.inner.addrs.upload.as_ref();
2317            match submit {
2318                Submit::Envelope(envelope) => {
2319                    let envelope_has_attachments = envelope
2320                        .envelope()
2321                        .items()
2322                        .any(|item| *item.ty() == ItemType::Attachment);
2323                    // Whether Relay stores these attachments in the object store or produces them to Kafka as before.
2324                    let use_objectstore = || {
2325                        let options = &self.inner.global_config.current().options;
2326                        utils::sample(options.objectstore_attachments_sample_rate).is_keep()
2327                    };
2328
2329                    if let Some(upload) = &self.inner.addrs.upload
2330                        && envelope_has_attachments
2331                        && use_objectstore()
2332                    {
2333                        // The `UploadService` uploads all attachments and then forwards the envelope to the `StoreService`.
2334                        upload.send(StoreEnvelope { envelope })
2335                    } else {
2336                        store_forwarder.send(StoreEnvelope { envelope })
2337                    }
2338                }
2339                Submit::Output { output, ctx } => output
2340                    .forward_store(StoreHandle::new(store_forwarder, upload), ctx)
2341                    .unwrap_or_else(|err| err.into_inner()),
2342            }
2343            return;
2344        }
2345
2346        let mut envelope = match submit {
2347            Submit::Envelope(envelope) => envelope,
2348            Submit::Output { output, ctx } => match output.serialize_envelope(ctx) {
2349                Ok(envelope) => ManagedEnvelope::from(envelope).into_processed(),
2350                Err(_) => {
2351                    relay_log::error!("failed to serialize output to an envelope");
2352                    return;
2353                }
2354            },
2355        };
2356
2357        if envelope.envelope_mut().is_empty() {
2358            envelope.accept();
2359            return;
2360        }
2361
2362        // Override the `sent_at` timestamp. Since the envelope went through basic
2363        // normalization, all timestamps have been corrected. We propagate the new
2364        // `sent_at` to allow the next Relay to double-check this timestamp and
2365        // potentially apply correction again. This is done as close to sending as
2366        // possible so that we avoid internal delays.
2367        envelope.envelope_mut().set_sent_at(Utc::now());
2368
2369        relay_log::trace!("sending envelope to sentry endpoint");
2370        let http_encoding = self.inner.config.http_encoding();
2371        let result = envelope.envelope().to_vec().and_then(|v| {
2372            encode_payload(&v.into(), http_encoding).map_err(EnvelopeError::PayloadIoFailed)
2373        });
2374
2375        match result {
2376            Ok(body) => {
2377                self.inner
2378                    .addrs
2379                    .upstream_relay
2380                    .send(SendRequest(SendEnvelope {
2381                        envelope,
2382                        body,
2383                        http_encoding,
2384                        project_cache: self.inner.project_cache.clone(),
2385                    }));
2386            }
2387            Err(error) => {
2388                // Errors are only logged for what we consider an internal discard reason. These
2389                // indicate errors in the infrastructure or implementation bugs.
2390                relay_log::error!(
2391                    error = &error as &dyn Error,
2392                    tags.project_key = %envelope.scoping().project_key,
2393                    "failed to serialize envelope payload"
2394                );
2395
2396                envelope.reject(Outcome::Invalid(DiscardReason::Internal));
2397            }
2398        }
2399    }
2400
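    /// Serializes the given client reports into a new envelope and submits it upstream.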
2401    fn handle_submit_client_reports(&self, cogs: &mut Token, message: SubmitClientReports) {
2402        let SubmitClientReports {
2403            client_reports,
2404            scoping,
2405        } = message;
2406
2407        let upstream = self.inner.config.upstream_descriptor();
2408        let dsn = PartialDsn::outbound(&scoping, upstream);
2409
2410        let mut envelope = Envelope::from_request(None, RequestMeta::outbound(dsn));
2411        for client_report in client_reports {
2412            match client_report.serialize() {
2413                Ok(payload) => {
2414                    let mut item = Item::new(ItemType::ClientReport);
2415                    item.set_payload(ContentType::Json, payload);
2416                    envelope.add_item(item);
2417                }
2418                Err(error) => {
2419                    relay_log::error!(
2420                        error = &error as &dyn std::error::Error,
2421                        "failed to serialize client report"
2422                    );
2423                }
2424            }
2425        }
2426
2427        let envelope = ManagedEnvelope::new(envelope, self.inner.addrs.outcome_aggregator.clone());
2428        self.submit_upstream(cogs, Submit::Envelope(envelope.into_processed()));
2429    }
2430
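    /// Applies project configuration and cached rate limits to metric buckets.
    ///
    /// Buckets without a valid scoping are dropped entirely.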
2431    fn check_buckets(
2432        &self,
2433        project_key: ProjectKey,
2434        project_info: &ProjectInfo,
2435        rate_limits: &RateLimits,
2436        buckets: Vec<Bucket>,
2437    ) -> Vec<Bucket> {
2438        let Some(scoping) = project_info.scoping(project_key) else {
2439            relay_log::error!(
2440                tags.project_key = project_key.as_str(),
2441                "there is no scoping: dropping {} buckets",
2442                buckets.len(),
2443            );
2444            return Vec::new();
2445        };
2446
2447        let mut buckets = self::metrics::apply_project_info(
2448            buckets,
2449            &self.inner.metric_outcomes,
2450            project_info,
2451            scoping,
2452        );
2453
2454        let namespaces: BTreeSet<MetricNamespace> = buckets
2455            .iter()
2456            .filter_map(|bucket| bucket.name.try_namespace())
2457            .collect();
2458
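        // Check rate limits once per namespace; if a namespace is limited, split off all of
        // its buckets and track a rate-limited outcome for them.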
2459        for namespace in namespaces {
2460            let limits = rate_limits.check_with_quotas(
2461                project_info.get_quotas(),
2462                scoping.metric_bucket(namespace),
2463            );
2464
2465            if limits.is_limited() {
2466                let rejected;
2467                (buckets, rejected) = utils::split_off(buckets, |bucket| {
2468                    bucket.name.try_namespace() == Some(namespace)
2469                });
2470
2471                let reason_code = limits.longest().and_then(|limit| limit.reason_code.clone());
2472                self.inner.metric_outcomes.track(
2473                    scoping,
2474                    &rejected,
2475                    Outcome::RateLimited(reason_code),
2476                );
2477            }
2478        }
2479
2480        let quotas = project_info.config.quotas.clone();
2481        match MetricsLimiter::create(buckets, quotas, scoping) {
2482            Ok(mut bucket_limiter) => {
2483                bucket_limiter.enforce_limits(rate_limits, &self.inner.metric_outcomes);
2484                bucket_limiter.into_buckets()
2485            }
2486            Err(buckets) => buckets,
2487        }
2488    }
2489
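    /// Applies Redis-backed rate limits to metric buckets, namespace by namespace.
    ///
    /// Exceeded limits are merged back into the project's cached rate limits.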
2490    #[cfg(feature = "processing")]
2491    async fn rate_limit_buckets(
2492        &self,
2493        scoping: Scoping,
2494        project_info: &ProjectInfo,
2495        mut buckets: Vec<Bucket>,
2496    ) -> Vec<Bucket> {
2497        let Some(rate_limiter) = &self.inner.rate_limiter else {
2498            return buckets;
2499        };
2500
2501        let global_config = self.inner.global_config.current();
2502        let namespaces = buckets
2503            .iter()
2504            .filter_map(|bucket| bucket.name.try_namespace())
2505            .counts();
2506
2507        let quotas = CombinedQuotas::new(&global_config, project_info.get_quotas());
2508
2509        for (namespace, quantity) in namespaces {
2510            let item_scoping = scoping.metric_bucket(namespace);
2511
2512            let limits = match rate_limiter
2513                .is_rate_limited(quotas, item_scoping, quantity, false)
2514                .await
2515            {
2516                Ok(limits) => limits,
2517                Err(err) => {
2518                    relay_log::error!(
2519                        error = &err as &dyn std::error::Error,
2520                        "failed to check redis rate limits"
2521                    );
2522                    break;
2523                }
2524            };
2525
2526            if limits.is_limited() {
2527                let rejected;
2528                (buckets, rejected) = utils::split_off(buckets, |bucket| {
2529                    bucket.name.try_namespace() == Some(namespace)
2530                });
2531
2532                let reason_code = limits.longest().and_then(|limit| limit.reason_code.clone());
2533                self.inner.metric_outcomes.track(
2534                    scoping,
2535                    &rejected,
2536                    Outcome::RateLimited(reason_code),
2537                );
2538
2539                self.inner
2540                    .project_cache
2541                    .get(item_scoping.scoping.project_key)
2542                    .rate_limits()
2543                    .merge(limits);
2544            }
2545        }
2546
2547        match MetricsLimiter::create(buckets, project_info.config.quotas.clone(), scoping) {
2548            Err(buckets) => buckets,
2549            Ok(bucket_limiter) => self.apply_other_rate_limits(bucket_limiter).await,
2550        }
2551    }
2552
2553    /// Check and apply rate limits to metrics buckets for transactions and spans.
2554    #[cfg(feature = "processing")]
2555    async fn apply_other_rate_limits(&self, mut bucket_limiter: MetricsLimiter) -> Vec<Bucket> {
2556        relay_log::trace!("handle_rate_limit_buckets");
2557
2558        let scoping = *bucket_limiter.scoping();
2559
2560        if let Some(rate_limiter) = self.inner.rate_limiter.as_ref() {
2561            let global_config = self.inner.global_config.current();
2562            let quotas = CombinedQuotas::new(&global_config, bucket_limiter.quotas());
2563
2564            // We set over_accept_once such that the limit is actually reached, which allows subsequent
2565            // calls with quantity=0 to be rate limited.
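            // For example (illustrative numbers): with a quota of 100 and a count of 150,
            // the batch is still accepted, but the limit is recorded as reached, so the
            // next check with quantity=0 reports the rate limit.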
2566            let over_accept_once = true;
2567            let mut rate_limits = RateLimits::new();
2568
2569            for category in [DataCategory::Transaction, DataCategory::Span] {
2570                let count = bucket_limiter.count(category);
2571
2572                let timer = Instant::now();
2573                let mut is_limited = false;
2574
2575                if let Some(count) = count {
2576                    match rate_limiter
2577                        .is_rate_limited(quotas, scoping.item(category), count, over_accept_once)
2578                        .await
2579                    {
2580                        Ok(limits) => {
2581                            is_limited = limits.is_limited();
2582                            rate_limits.merge(limits)
2583                        }
2584                        Err(e) => relay_log::error!(error = &e as &dyn Error),
2585                    }
2586                }
2587
2588                relay_statsd::metric!(
2589                    timer(RelayTimers::RateLimitBucketsDuration) = timer.elapsed(),
2590                    category = category.name(),
2591                    limited = if is_limited { "true" } else { "false" },
2592                    count = match count {
2593                        None => "none",
2594                        Some(0) => "0",
2595                        Some(1) => "1",
2596                        Some(2..=10) => "10",
2597                        Some(11..=25) => "25",
2598                        Some(26..=50) => "50",
2599                        Some(51..=100) => "100",
2600                        Some(101..=500) => "500",
2601                        _ => "> 500",
2602                    },
2603                );
2604            }
2605
2606            if rate_limits.is_limited() {
2607                let was_enforced =
2608                    bucket_limiter.enforce_limits(&rate_limits, &self.inner.metric_outcomes);
2609
2610                if was_enforced {
2611                    // Update the rate limits in the project cache.
2612                    self.inner
2613                        .project_cache
2614                        .get(scoping.project_key)
2615                        .rate_limits()
2616                        .merge(rate_limits);
2617                }
2618            }
2619        }
2620
2621        bucket_limiter.into_buckets()
2622    }
2623
2624    /// Applies cardinality limits to the passed buckets and returns only the accepted buckets.
2625    #[cfg(feature = "processing")]
2626    async fn cardinality_limit_buckets(
2627        &self,
2628        scoping: Scoping,
2629        limits: &[CardinalityLimit],
2630        buckets: Vec<Bucket>,
2631    ) -> Vec<Bucket> {
2632        let global_config = self.inner.global_config.current();
2633        let cardinality_limiter_mode = global_config.options.cardinality_limiter_mode;
2634
2635        if matches!(cardinality_limiter_mode, CardinalityLimiterMode::Disabled) {
2636            return buckets;
2637        }
2638
2639        let Some(ref limiter) = self.inner.cardinality_limiter else {
2640            return buckets;
2641        };
2642
2643        let scope = relay_cardinality::Scoping {
2644            organization_id: scoping.organization_id,
2645            project_id: scoping.project_id,
2646        };
2647
2648        let limits = match limiter
2649            .check_cardinality_limits(scope, limits, buckets)
2650            .await
2651        {
2652            Ok(limits) => limits,
2653            Err((buckets, error)) => {
2654                relay_log::error!(
2655                    error = &error as &dyn std::error::Error,
2656                    "cardinality limiter failed"
2657                );
2658                return buckets;
2659            }
2660        };
2661
2662        let error_sample_rate = global_config.options.cardinality_limiter_error_sample_rate;
2663        if !limits.exceeded_limits().is_empty() && utils::sample(error_sample_rate).is_keep() {
2664            for limit in limits.exceeded_limits() {
2665                relay_log::with_scope(
2666                    |scope| {
2667                        // Set the organization as user so we can alert on distinct org_ids.
2668                        scope.set_user(Some(relay_log::sentry::User {
2669                            id: Some(scoping.organization_id.to_string()),
2670                            ..Default::default()
2671                        }));
2672                    },
2673                    || {
2674                        relay_log::error!(
2675                            tags.organization_id = scoping.organization_id.value(),
2676                            tags.limit_id = limit.id,
2677                            tags.passive = limit.passive,
2678                            "Cardinality Limit"
2679                        );
2680                    },
2681                );
2682            }
2683        }
2684
2685        for (limit, reports) in limits.cardinality_reports() {
2686            for report in reports {
2687                self.inner
2688                    .metric_outcomes
2689                    .cardinality(scoping, limit, report);
2690            }
2691        }
2692
2693        if matches!(cardinality_limiter_mode, CardinalityLimiterMode::Passive) {
2694            return limits.into_source();
2695        }
2696
2697        let CardinalityLimitsSplit { accepted, rejected } = limits.into_split();
2698
2699        for (bucket, exceeded) in rejected {
2700            self.inner.metric_outcomes.track(
2701                scoping,
2702                &[bucket],
2703                Outcome::CardinalityLimited(exceeded.id.clone()),
2704            );
2705        }
2706        accepted
2707    }
2708
2709    /// Processes metric buckets and sends them to kafka.
2710    ///
2711    /// This function runs the following steps:
2712    ///  - cardinality limiting
2713    ///  - rate limiting
2714    ///  - submit to `StoreForwarder`
2715    #[cfg(feature = "processing")]
2716    async fn encode_metrics_processing(
2717        &self,
2718        message: FlushBuckets,
2719        store_forwarder: &Addr<Store>,
2720    ) {
2721        use crate::constants::DEFAULT_EVENT_RETENTION;
2722        use crate::services::store::StoreMetrics;
2723
2724        for ProjectBuckets {
2725            buckets,
2726            scoping,
2727            project_info,
2728            ..
2729        } in message.buckets.into_values()
2730        {
2731            let buckets = self
2732                .rate_limit_buckets(scoping, &project_info, buckets)
2733                .await;
2734
2735            let limits = project_info.get_cardinality_limits();
2736            let buckets = self
2737                .cardinality_limit_buckets(scoping, limits, buckets)
2738                .await;
2739
2740            if buckets.is_empty() {
2741                continue;
2742            }
2743
2744            let retention = project_info
2745                .config
2746                .event_retention
2747                .unwrap_or(DEFAULT_EVENT_RETENTION);
2748
2749            // The store forwarder takes care of bucket splitting internally, so we can submit the
2750            // entire list of buckets. There is no batching needed here.
2751            store_forwarder.send(StoreMetrics {
2752                buckets,
2753                scoping,
2754                retention,
2755            });
2756        }
2757    }
2758
2759    /// Serializes metric buckets to JSON and sends them to the upstream.
2760    ///
2761    /// This function runs the following steps:
2762    ///  - partitioning
2763    ///  - batching by configured size limit
2764    ///  - serialize to JSON and pack in an envelope
2765    ///  - submit the envelope to upstream or kafka depending on configuration
2766    ///
2767    /// Cardinality limiting and rate limiting run only in processing Relays as they both require
2768    /// access to the central Redis instance. Cached rate limits are applied in the project cache
2769    /// already.
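    ///
    /// For example (illustrative sizes): with a batch size limit of 1 MiB, a project's
    /// buckets are split into consecutive views of at most 1 MiB of serialized metrics,
    /// and each view is packed into a `MetricBuckets` item in its own envelope.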
2770    fn encode_metrics_envelope(&self, cogs: &mut Token, message: FlushBuckets) {
2771        let FlushBuckets {
2772            partition_key,
2773            buckets,
2774        } = message;
2775
2776        let batch_size = self.inner.config.metrics_max_batch_size_bytes();
2777        let upstream = self.inner.config.upstream_descriptor();
2778
2779        for ProjectBuckets {
2780            buckets, scoping, ..
2781        } in buckets.values()
2782        {
2783            let dsn = PartialDsn::outbound(scoping, upstream);
2784
2785            relay_statsd::metric!(
2786                distribution(RelayDistributions::PartitionKeys) = u64::from(partition_key)
2787            );
2788
2789            let mut num_batches = 0;
2790            for batch in BucketsView::from(buckets).by_size(batch_size) {
2791                let mut envelope = Envelope::from_request(None, RequestMeta::outbound(dsn.clone()));
2792
2793                let mut item = Item::new(ItemType::MetricBuckets);
2794                item.set_source_quantities(crate::metrics::extract_quantities(batch));
2795                item.set_payload(ContentType::Json, serde_json::to_vec(&batch).unwrap());
2796                envelope.add_item(item);
2797
2798                let mut envelope =
2799                    ManagedEnvelope::new(envelope, self.inner.addrs.outcome_aggregator.clone());
2800                envelope
2801                    .set_partition_key(Some(partition_key))
2802                    .scope(*scoping);
2803
2804                relay_statsd::metric!(
2805                    distribution(RelayDistributions::BucketsPerBatch) = batch.len() as u64
2806                );
2807
2808                self.submit_upstream(cogs, Submit::Envelope(envelope.into_processed()));
2809                num_batches += 1;
2810            }
2811
2812            relay_statsd::metric!(
2813                distribution(RelayDistributions::BatchesPerPartition) = num_batches
2814            );
2815        }
2816    }
2817
    /// Creates a [`SendMetricsRequest`] and sends it to the upstream relay.
    fn send_global_partition(&self, partition_key: u32, partition: &mut Partition<'_>) {
        if partition.is_empty() {
            return;
        }

        let (unencoded, project_info) = partition.take();
        let http_encoding = self.inner.config.http_encoding();
        let encoded = match encode_payload(&unencoded, http_encoding) {
            Ok(payload) => payload,
            Err(error) => {
                let error = &error as &dyn std::error::Error;
                relay_log::error!(error, "failed to encode metrics payload");
                return;
            }
        };

        let request = SendMetricsRequest {
            partition_key: partition_key.to_string(),
            unencoded,
            encoded,
            project_info,
            http_encoding,
            metric_outcomes: self.inner.metric_outcomes.clone(),
        };

        self.inner.addrs.upstream_relay.send(SendRequest(request));
    }

    /// Serializes metric buckets to JSON and sends them to the upstream via the global endpoint.
    ///
    /// This function is similar to [`Self::encode_metrics_envelope`], but sends a global batched
    /// payload directly instead of per-project Envelopes.
    ///
    /// This function runs the following steps:
    ///  - partitioning
    ///  - batching by configured size limit
    ///  - serialize to JSON
    ///  - submit directly to the upstream
    ///
    /// Cardinality limiting and rate limiting run only in processing Relays as they both require
    /// access to the central Redis instance. Cached rate limits are applied in the project cache
    /// already.
    fn encode_metrics_global(&self, message: FlushBuckets) {
        let FlushBuckets {
            partition_key,
            buckets,
        } = message;

        let batch_size = self.inner.config.metrics_max_batch_size_bytes();
        let mut partition = Partition::new(batch_size);
        let mut partition_splits = 0;

        for ProjectBuckets {
            buckets, scoping, ..
        } in buckets.values()
        {
            for bucket in buckets {
                let mut remaining = Some(BucketView::new(bucket));

                while let Some(bucket) = remaining.take() {
                    if let Some(next) = partition.insert(bucket, *scoping) {
                        // A part of the bucket could not be inserted. Take the partition and submit
                        // it immediately. Repeat until the final part has been inserted. This should
                        // always result in a request, otherwise we would enter an endless loop.
                        self.send_global_partition(partition_key, &mut partition);
                        remaining = Some(next);
                        partition_splits += 1;
                    }
                }
            }
        }

        if partition_splits > 0 {
            metric!(distribution(RelayDistributions::PartitionSplits) = partition_splits);
        }

        self.send_global_partition(partition_key, &mut partition);
    }

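    /// Handles the [`FlushBuckets`] message.
    ///
    /// Buckets are first checked against cached rate limits per project. Processing Relays
    /// then forward the remaining buckets to the store, while other Relays encode them for
    /// the upstream, either globally batched or as per-project envelopes.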
    async fn handle_flush_buckets(&self, cogs: &mut Token, mut message: FlushBuckets) {
        for (project_key, pb) in message.buckets.iter_mut() {
            let buckets = std::mem::take(&mut pb.buckets);
            pb.buckets =
                self.check_buckets(*project_key, &pb.project_info, &pb.rate_limits, buckets);
        }

        #[cfg(feature = "processing")]
        if self.inner.config.processing_enabled()
            && let Some(ref store_forwarder) = self.inner.addrs.store_forwarder
        {
            return self
                .encode_metrics_processing(message, store_forwarder)
                .await;
        }

        if self.inner.config.http_global_metrics() {
            self.encode_metrics_global(message)
        } else {
            self.encode_metrics_envelope(cogs, message)
        }
    }

    #[cfg(all(test, feature = "processing"))]
    fn redis_rate_limiter_enabled(&self) -> bool {
        self.inner.rate_limiter.is_some()
    }

    async fn handle_message(&self, message: EnvelopeProcessor) {
        let ty = message.variant();
        let feature_weights = self.feature_weights(&message);

        metric!(timer(RelayTimers::ProcessMessageDuration), message = ty, {
            let mut cogs = self.inner.cogs.timed(ResourceId::Relay, feature_weights);

            match message {
                EnvelopeProcessor::ProcessEnvelope(m) => {
                    self.handle_process_envelope(&mut cogs, *m).await
                }
                EnvelopeProcessor::ProcessProjectMetrics(m) => {
                    self.handle_process_metrics(&mut cogs, *m)
                }
                EnvelopeProcessor::ProcessBatchedMetrics(m) => {
                    self.handle_process_batched_metrics(&mut cogs, *m)
                }
                EnvelopeProcessor::FlushBuckets(m) => {
                    self.handle_flush_buckets(&mut cogs, *m).await
                }
                EnvelopeProcessor::SubmitClientReports(m) => {
                    self.handle_submit_client_reports(&mut cogs, *m)
                }
            }
        });
    }

    fn feature_weights(&self, message: &EnvelopeProcessor) -> FeatureWeights {
        match message {
            // Envelope is split later and tokens are attributed then.
            EnvelopeProcessor::ProcessEnvelope(_) => AppFeature::Unattributed.into(),
            EnvelopeProcessor::ProcessProjectMetrics(_) => AppFeature::Unattributed.into(),
            EnvelopeProcessor::ProcessBatchedMetrics(_) => AppFeature::Unattributed.into(),
            EnvelopeProcessor::FlushBuckets(v) => v
                .buckets
                .values()
                .map(|s| {
                    if self.inner.config.processing_enabled() {
                        // Processing does not encode the metrics but instead rate and cardinality
                        // limits the metrics, which scales by count and not size.
                        relay_metrics::cogs::ByCount(&s.buckets).into()
                    } else {
                        relay_metrics::cogs::BySize(&s.buckets).into()
                    }
                })
                .fold(FeatureWeights::none(), FeatureWeights::merge),
            EnvelopeProcessor::SubmitClientReports(_) => AppFeature::ClientReports.into(),
        }
    }
}

impl Service for EnvelopeProcessorService {
    type Interface = EnvelopeProcessor;

    async fn run(self, mut rx: relay_system::Receiver<Self::Interface>) {
        while let Some(message) = rx.recv().await {
            let service = self.clone();
            self.inner
                .pool
                .spawn_async(
                    async move {
                        service.handle_message(message).await;
                    }
                    .boxed(),
                )
                .await;
        }
    }
}

/// Result of the enforcement of rate limiting.
///
/// If the event was already `None` or has been rate limited, the `event` field holds
/// `None` within the [`Annotated`].
struct EnforcementResult {
    event: Annotated<Event>,
    #[cfg_attr(not(feature = "processing"), expect(dead_code))]
    rate_limits: RateLimits,
}

impl EnforcementResult {
    /// Creates a new [`EnforcementResult`].
    pub fn new(event: Annotated<Event>, rate_limits: RateLimits) -> Self {
        Self { event, rate_limits }
    }
}

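/// Strategy for enforcing rate limits during envelope processing.
///
/// `Cached` only consults rate limits already cached for the project, while `Consistent`
/// (available in processing Relays) additionally checks quotas against the central Redis
/// instance.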
#[derive(Clone)]
enum RateLimiter {
    Cached,
    #[cfg(feature = "processing")]
    Consistent(Arc<RedisRateLimiter<GlobalRateLimitsServiceHandle>>),
}

impl RateLimiter {
    async fn enforce<Group>(
        &self,
        managed_envelope: &mut TypedEnvelope<Group>,
        event: Annotated<Event>,
        _extracted_metrics: &mut ProcessingExtractedMetrics,
        ctx: processing::Context<'_>,
    ) -> Result<EnforcementResult, ProcessingError> {
        if managed_envelope.envelope().is_empty() && event.value().is_none() {
            return Ok(EnforcementResult::new(event, RateLimits::default()));
        }

        let quotas = CombinedQuotas::new(ctx.global_config, ctx.project_info.get_quotas());
        if quotas.is_empty() {
            return Ok(EnforcementResult::new(event, RateLimits::default()));
        }

        let event_category = event_category(&event);

        // Clone the rate limiter so it can be moved into the check closure, since consistent
        // rate limiting needs Redis access from there.
        //
        // When invoking the rate limiter, capture whether the event item has been rate limited
        // so it can also be removed from the processing state eventually.
        let this = self.clone();
        let mut envelope_limiter =
            EnvelopeLimiter::new(CheckLimits::All, move |item_scope, _quantity| {
                let this = this.clone();

                async move {
                    match this {
                        #[cfg(feature = "processing")]
                        RateLimiter::Consistent(rate_limiter) => Ok::<_, ProcessingError>(
                            rate_limiter
                                .is_rate_limited(quotas, item_scope, _quantity, false)
                                .await?,
                        ),
                        _ => Ok::<_, ProcessingError>(
                            ctx.rate_limits.check_with_quotas(quotas, item_scope),
                        ),
                    }
                }
            });

        // Tell the envelope limiter about the event, since it has been removed from the Envelope at
        // this stage in processing.
        if let Some(category) = event_category {
            envelope_limiter.assume_event(category);
        }

        let scoping = managed_envelope.scoping();
        let (enforcement, rate_limits) = metric!(
            timer(RelayTimers::EventProcessingRateLimiting),
            type = self.name(),
            {
                envelope_limiter
                    .compute(managed_envelope.envelope_mut(), &scoping)
                    .await
            }
        )?;
        let event_active = enforcement.is_event_active();

        // Use the same rate limits as used for the envelope on the metrics.
        // Those rate limits should not be checked for expiry or similar to ensure a consistent
        // limiting of envelope items and metrics.
        #[cfg(feature = "processing")]
        _extracted_metrics.apply_enforcement(&enforcement, matches!(self, Self::Consistent(_)));
        enforcement.apply_with_outcomes(managed_envelope);

        if event_active {
            debug_assert!(managed_envelope.envelope().is_empty());
            return Ok(EnforcementResult::new(Annotated::empty(), rate_limits));
        }

        Ok(EnforcementResult::new(event, rate_limits))
    }

    fn name(&self) -> &'static str {
        match self {
            Self::Cached => "cached",
            #[cfg(feature = "processing")]
            Self::Consistent(_) => "consistent",
        }
    }
}

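/// Applies the configured HTTP `Content-Encoding` to a request body.
///
/// For [`HttpEncoding::Identity`] the body is returned unchanged. A minimal usage
/// sketch (marked `ignore` since this is a crate-internal helper and not compiled as a
/// doctest):
///
/// ```ignore
/// let body = Bytes::from_static(b"{\"buckets\":{}}");
/// let encoded = encode_payload(&body, HttpEncoding::Gzip)?;
/// ```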
pub fn encode_payload(body: &Bytes, http_encoding: HttpEncoding) -> Result<Bytes, std::io::Error> {
    let envelope_body: Vec<u8> = match http_encoding {
        HttpEncoding::Identity => return Ok(body.clone()),
        HttpEncoding::Deflate => {
            let mut encoder = ZlibEncoder::new(Vec::new(), Compression::default());
            encoder.write_all(body.as_ref())?;
            encoder.finish()?
        }
        HttpEncoding::Gzip => {
            let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
            encoder.write_all(body.as_ref())?;
            encoder.finish()?
        }
        HttpEncoding::Br => {
            // Use the default buffer size (via 0), medium quality (5), and the default lgwin (22).
            let mut encoder = BrotliEncoder::new(Vec::new(), 0, 5, 22);
            encoder.write_all(body.as_ref())?;
            encoder.into_inner()
        }
        HttpEncoding::Zstd => {
            // Use the fastest compression level; our main objective here is to get the best
            // compression ratio for the least amount of time spent.
            let mut encoder = ZstdEncoder::new(Vec::new(), 1)?;
            encoder.write_all(body.as_ref())?;
            encoder.finish()?
        }
    };

    Ok(envelope_body.into())
}

/// An upstream request that submits an envelope via HTTP.
#[derive(Debug)]
pub struct SendEnvelope {
    pub envelope: TypedEnvelope<Processed>,
    pub body: Bytes,
    pub http_encoding: HttpEncoding,
    pub project_cache: ProjectCacheHandle,
}

impl UpstreamRequest for SendEnvelope {
    fn method(&self) -> reqwest::Method {
        reqwest::Method::POST
    }

    fn path(&self) -> Cow<'_, str> {
        format!("/api/{}/envelope/", self.envelope.scoping().project_id).into()
    }

    fn route(&self) -> &'static str {
        "envelope"
    }

    fn build(&mut self, builder: &mut http::RequestBuilder) -> Result<(), http::HttpError> {
        let envelope_body = self.body.clone();
        metric!(
            distribution(RelayDistributions::UpstreamEnvelopeBodySize) = envelope_body.len() as u64
        );

        let meta = &self.envelope.meta();
        let shard = self.envelope.partition_key().map(|p| p.to_string());
        builder
            .content_encoding(self.http_encoding)
            .header_opt("Origin", meta.origin().map(|url| url.as_str()))
            .header_opt("User-Agent", meta.user_agent())
            .header("X-Sentry-Auth", meta.auth_header())
            .header("X-Forwarded-For", meta.forwarded_for())
            .header("Content-Type", envelope::CONTENT_TYPE)
            .header_opt("X-Sentry-Relay-Shard", shard)
            .body(envelope_body);

        Ok(())
    }

    fn sign(&mut self) -> Option<Sign> {
        Some(Sign::Optional(SignatureType::RequestSign))
    }

    fn respond(
        self: Box<Self>,
        result: Result<http::Response, UpstreamRequestError>,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> {
        Box::pin(async move {
            let result = match result {
                Ok(mut response) => response.consume().await.map_err(UpstreamRequestError::Http),
                Err(error) => Err(error),
            };

            match result {
                Ok(()) => self.envelope.accept(),
                Err(error) if error.is_received() => {
                    let scoping = self.envelope.scoping();
                    self.envelope.accept();

                    if let UpstreamRequestError::RateLimited(limits) = error {
                        self.project_cache
                            .get(scoping.project_key)
                            .rate_limits()
                            .merge(limits.scope(&scoping));
                    }
                }
                Err(error) => {
                    // Errors are only logged for what we consider an internal discard reason. These
                    // indicate errors in the infrastructure or implementation bugs.
                    let mut envelope = self.envelope;
                    envelope.reject(Outcome::Invalid(DiscardReason::Internal));
                    relay_log::error!(
                        error = &error as &dyn Error,
                        tags.project_key = %envelope.scoping().project_key,
                        "error sending envelope"
                    );
                }
            }
        })
    }
}

/// A container for metric buckets from multiple projects.
///
/// This container is used to send metrics to the upstream in global batches as part of the
/// [`FlushBuckets`] message if the `http.global_metrics` option is enabled. The container monitors
/// the size of all metrics and allows splitting them into multiple batches. See
/// [`insert`](Self::insert) for more information.
#[derive(Debug)]
struct Partition<'a> {
    max_size: usize,
    remaining: usize,
    views: HashMap<ProjectKey, Vec<BucketView<'a>>>,
    project_info: HashMap<ProjectKey, Scoping>,
}

impl<'a> Partition<'a> {
    /// Creates a new partition with the given maximum size in bytes.
    pub fn new(size: usize) -> Self {
        Self {
            max_size: size,
            remaining: size,
            views: HashMap::new(),
            project_info: HashMap::new(),
        }
    }

    /// Inserts a bucket into the partition, splitting it if necessary.
    ///
    /// This function attempts to add the bucket to this partition. If the bucket does not fit
    /// entirely into the partition given its maximum size, the remaining part of the bucket is
    /// returned from this function call.
    ///
    /// If this function returns `Some(_)`, the partition is full and should be submitted to the
    /// upstream immediately. Use [`Self::take`] to retrieve the contents of the
    /// partition. Afterwards, the caller is responsible for calling this function again with the
    /// remaining bucket until it is fully inserted.
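    ///
    /// A sketch of the intended call pattern (this mirrors the loop in
    /// `encode_metrics_global`; `submit` stands in for sending the partition upstream):
    ///
    /// ```ignore
    /// let mut remaining = Some(BucketView::new(&bucket));
    /// while let Some(view) = remaining.take() {
    ///     if let Some(rest) = partition.insert(view, scoping) {
    ///         submit(partition.take()); // the partition was full; flush and retry
    ///         remaining = Some(rest);
    ///     }
    /// }
    /// ```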
    pub fn insert(&mut self, bucket: BucketView<'a>, scoping: Scoping) -> Option<BucketView<'a>> {
        let (current, next) = bucket.split(self.remaining, Some(self.max_size));

        if let Some(current) = current {
            self.remaining = self.remaining.saturating_sub(current.estimated_size());
            self.views
                .entry(scoping.project_key)
                .or_default()
                .push(current);

            self.project_info
                .entry(scoping.project_key)
                .or_insert(scoping);
        }

        next
    }

    /// Returns `true` if the partition does not hold any data.
    fn is_empty(&self) -> bool {
        self.views.is_empty()
    }

    /// Returns the serialized buckets for this partition.
    ///
    /// This empties the partition, so that it can be reused.
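    ///
    /// The payload is a JSON object of the form `{"buckets": {"<project_key>": [...]}}`,
    /// which is the same shape that [`SendMetricsRequest::create_error_outcomes`] parses
    /// back in case of transmission failures.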
    fn take(&mut self) -> (Bytes, HashMap<ProjectKey, Scoping>) {
        #[derive(serde::Serialize)]
        struct Wrapper<'a> {
            buckets: &'a HashMap<ProjectKey, Vec<BucketView<'a>>>,
        }

        let buckets = &self.views;
        let payload = serde_json::to_vec(&Wrapper { buckets }).unwrap().into();

        let scopings = self.project_info.clone();
        self.project_info.clear();

        self.views.clear();
        self.remaining = self.max_size;

        (payload, scopings)
    }
}

/// An upstream request that submits metric buckets via HTTP.
///
/// This request is not awaited. It automatically tracks outcomes if the request is not received.
#[derive(Debug)]
struct SendMetricsRequest {
    /// If the partition key is set, the request is marked with `X-Sentry-Relay-Shard`.
    partition_key: String,
    /// Serialized metric buckets without encoding applied, used for signing.
    unencoded: Bytes,
    /// Serialized metric buckets with the stated HTTP encoding applied.
    encoded: Bytes,
    /// Mapping of all contained project keys to their scoping and extraction mode.
    ///
    /// Used to track outcomes for transmission failures.
    project_info: HashMap<ProjectKey, Scoping>,
    /// Encoding (compression) of the payload.
    http_encoding: HttpEncoding,
    /// Metric outcomes instance to send outcomes on error.
    metric_outcomes: MetricOutcomes,
}

impl SendMetricsRequest {
    fn create_error_outcomes(self) {
        #[derive(serde::Deserialize)]
        struct Wrapper {
            buckets: HashMap<ProjectKey, Vec<MinimalTrackableBucket>>,
        }

        let buckets = match serde_json::from_slice(&self.unencoded) {
            Ok(Wrapper { buckets }) => buckets,
            Err(err) => {
                relay_log::error!(
                    error = &err as &dyn std::error::Error,
                    "failed to parse buckets from failed transmission"
                );
                return;
            }
        };

        for (key, buckets) in buckets {
            let Some(&scoping) = self.project_info.get(&key) else {
                relay_log::error!("missing scoping for project key");
                continue;
            };

            self.metric_outcomes.track(
                scoping,
                &buckets,
                Outcome::Invalid(DiscardReason::Internal),
            );
        }
    }
}

impl UpstreamRequest for SendMetricsRequest {
    fn set_relay_id(&self) -> bool {
        true
    }

    fn sign(&mut self) -> Option<Sign> {
        Some(Sign::Required(SignatureType::Body(self.unencoded.clone())))
    }

    fn method(&self) -> reqwest::Method {
        reqwest::Method::POST
    }

    fn path(&self) -> Cow<'_, str> {
        "/api/0/relays/metrics/".into()
    }

    fn route(&self) -> &'static str {
        "global_metrics"
    }

    fn build(&mut self, builder: &mut http::RequestBuilder) -> Result<(), http::HttpError> {
        metric!(
            distribution(RelayDistributions::UpstreamMetricsBodySize) = self.encoded.len() as u64
        );

        builder
            .content_encoding(self.http_encoding)
            .header("X-Sentry-Relay-Shard", self.partition_key.as_bytes())
            .header(header::CONTENT_TYPE, b"application/json")
            .body(self.encoded.clone());

        Ok(())
    }

    fn respond(
        self: Box<Self>,
        result: Result<http::Response, UpstreamRequestError>,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> {
        Box::pin(async {
            match result {
                Ok(mut response) => {
                    response.consume().await.ok();
                }
                Err(error) => {
                    relay_log::error!(error = &error as &dyn Error, "failed to send metrics batch");

                    // If the request did not arrive at the upstream, we are responsible for outcomes.
                    // Otherwise, the upstream is responsible for logging outcomes.
                    if error.is_received() {
                        return;
                    }

                    self.create_error_outcomes()
                }
            }
        })
    }
}

/// Container for global and project level [`Quota`].
#[derive(Copy, Clone, Debug)]
struct CombinedQuotas<'a> {
    global_quotas: &'a [Quota],
    project_quotas: &'a [Quota],
}

impl<'a> CombinedQuotas<'a> {
    /// Returns a new [`CombinedQuotas`].
    pub fn new(global_config: &'a GlobalConfig, project_quotas: &'a [Quota]) -> Self {
        Self {
            global_quotas: &global_config.quotas,
            project_quotas,
        }
    }

    /// Returns `true` if both global quotas and project quotas are empty.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Returns the number of both global and project quotas.
    pub fn len(&self) -> usize {
        self.global_quotas.len() + self.project_quotas.len()
    }
}

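/// Yields all global quotas first, followed by all project quotas.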
impl<'a> IntoIterator for CombinedQuotas<'a> {
    type Item = &'a Quota;
    type IntoIter = std::iter::Chain<std::slice::Iter<'a, Quota>, std::slice::Iter<'a, Quota>>;

    fn into_iter(self) -> Self::IntoIter {
        self.global_quotas.iter().chain(self.project_quotas.iter())
    }
}

#[cfg(test)]
mod tests {
    use std::collections::BTreeMap;

    use insta::assert_debug_snapshot;
    use relay_base_schema::metrics::{DurationUnit, MetricUnit};
    use relay_common::glob2::LazyGlob;
    use relay_dynamic_config::ProjectConfig;
    use relay_event_normalization::{
        MeasurementsConfig, NormalizationConfig, RedactionRule, TransactionNameConfig,
        TransactionNameRule,
    };
    use relay_event_schema::protocol::TransactionSource;
    use relay_pii::DataScrubbingConfig;
    use similar_asserts::assert_eq;

    use crate::metrics_extraction::IntoMetric;
    use crate::metrics_extraction::transactions::types::{
        CommonTags, TransactionMeasurementTags, TransactionMetric,
    };
    use crate::testutils::{create_test_processor, create_test_processor_with_addrs};

    #[cfg(feature = "processing")]
    use {
        relay_metrics::BucketValue,
        relay_quotas::{QuotaScope, ReasonCode},
        relay_test::mock_service,
    };

    use super::*;

    #[cfg(feature = "processing")]
    fn mock_quota(id: &str) -> Quota {
        Quota {
            id: Some(id.into()),
            categories: [DataCategory::MetricBucket].into(),
            scope: QuotaScope::Organization,
            scope_id: None,
            limit: Some(0),
            window: None,
            reason_code: None,
            namespace: None,
        }
    }

    #[cfg(feature = "processing")]
    #[test]
    fn test_dynamic_quotas() {
        let global_config = GlobalConfig {
            quotas: vec![mock_quota("foo"), mock_quota("bar")],
            ..Default::default()
        };

        let project_quotas = vec![mock_quota("baz"), mock_quota("qux")];

        let dynamic_quotas = CombinedQuotas::new(&global_config, &project_quotas);

        assert_eq!(dynamic_quotas.len(), 4);
        assert!(!dynamic_quotas.is_empty());

        let quota_ids = dynamic_quotas.into_iter().filter_map(|q| q.id.as_deref());
        assert!(quota_ids.eq(["foo", "bar", "baz", "qux"]));
    }

    /// Ensures that if we ratelimit one batch of buckets in a [`FlushBuckets`] message, it won't
    /// also ratelimit the next batches in the same message automatically.
    #[cfg(feature = "processing")]
    #[tokio::test]
    async fn test_ratelimit_per_batch() {
        use relay_base_schema::organization::OrganizationId;
        use relay_protocol::FiniteF64;

        let rate_limited_org = Scoping {
            organization_id: OrganizationId::new(1),
            project_id: ProjectId::new(21),
            project_key: ProjectKey::parse("00000000000000000000000000000000").unwrap(),
            key_id: Some(17),
        };

        let not_rate_limited_org = Scoping {
            organization_id: OrganizationId::new(2),
            project_id: ProjectId::new(21),
            project_key: ProjectKey::parse("11111111111111111111111111111111").unwrap(),
            key_id: Some(17),
        };

        let message = {
            let project_info = {
                let quota = Quota {
                    id: Some("testing".into()),
                    categories: [DataCategory::MetricBucket].into(),
                    scope: relay_quotas::QuotaScope::Organization,
                    scope_id: Some(rate_limited_org.organization_id.to_string().into()),
                    limit: Some(0),
                    window: None,
                    reason_code: Some(ReasonCode::new("test")),
                    namespace: None,
                };

                let mut config = ProjectConfig::default();
                config.quotas.push(quota);

                Arc::new(ProjectInfo {
                    config,
                    ..Default::default()
                })
            };

            let project_metrics = |scoping| ProjectBuckets {
                buckets: vec![Bucket {
                    name: "d:transactions/bar".into(),
                    value: BucketValue::Counter(FiniteF64::new(1.0).unwrap()),
                    timestamp: UnixTimestamp::now(),
                    tags: Default::default(),
                    width: 10,
                    metadata: BucketMetadata::default(),
                }],
                rate_limits: Default::default(),
                project_info: project_info.clone(),
                scoping,
            };

            let buckets = hashbrown::HashMap::from([
                (
                    rate_limited_org.project_key,
                    project_metrics(rate_limited_org),
                ),
                (
                    not_rate_limited_org.project_key,
                    project_metrics(not_rate_limited_org),
                ),
            ]);

            FlushBuckets {
                partition_key: 0,
                buckets,
            }
        };

        // ensure the order of the map while iterating is as expected.
        assert_eq!(message.buckets.keys().count(), 2);

        let config = {
            let config_json = serde_json::json!({
                "processing": {
                    "enabled": true,
                    "kafka_config": [],
                    "redis": {
                        "server": std::env::var("RELAY_REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_owned()),
                    }
                }
            });
            Config::from_json_value(config_json).unwrap()
        };

        let (store, handle) = {
            let f = |org_ids: &mut Vec<OrganizationId>, msg: Store| {
                let org_id = match msg {
                    Store::Metrics(x) => x.scoping.organization_id,
                    _ => panic!("received envelope when expecting only metrics"),
                };
                org_ids.push(org_id);
            };

            mock_service("store_forwarder", vec![], f)
        };

        let processor = create_test_processor(config).await;
        assert!(processor.redis_rate_limiter_enabled());

        processor.encode_metrics_processing(message, &store).await;

        drop(store);
        let orgs_not_ratelimited = handle.await.unwrap();

        assert_eq!(
            orgs_not_ratelimited,
            vec![not_rate_limited_org.organization_id]
        );
    }

    #[tokio::test]
    async fn test_browser_version_extraction_with_pii_like_data() {
        let processor = create_test_processor(Default::default()).await;
        let outcome_aggregator = Addr::dummy();
        let event_id = EventId::new();

        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
            .parse()
            .unwrap();

        let request_meta = RequestMeta::new(dsn);
        let mut envelope = Envelope::from_request(Some(event_id), request_meta);

        envelope.add_item({
            let mut item = Item::new(ItemType::Event);
            item.set_payload(
                ContentType::Json,
                r#"
                {
                    "request": {
                        "headers": [
                            ["User-Agent", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36"]
                        ]
                    }
                }
            "#,
            );
            item
        });

        let mut datascrubbing_settings = DataScrubbingConfig::default();
        // enable all the default scrubbing
        datascrubbing_settings.scrub_data = true;
        datascrubbing_settings.scrub_defaults = true;
        datascrubbing_settings.scrub_ip_addresses = true;

        // Make sure to mask any IP-like looking data
        let pii_config = serde_json::from_str(r#"{"applications": {"**": ["@ip:mask"]}}"#).unwrap();

        let config = ProjectConfig {
            datascrubbing_settings,
            pii_config: Some(pii_config),
            ..Default::default()
        };

        let project_info = ProjectInfo {
            config,
            ..Default::default()
        };

        let mut envelopes = ProcessingGroup::split_envelope(*envelope, &Default::default());
        assert_eq!(envelopes.len(), 1);

        let (group, envelope) = envelopes.pop().unwrap();
        let envelope = ManagedEnvelope::new(envelope, outcome_aggregator);

        let message = ProcessEnvelopeGrouped {
            group,
            envelope,
            ctx: processing::Context {
                project_info: &project_info,
                ..processing::Context::for_test()
            },
        };

        let Ok(Some(Submit::Envelope(mut new_envelope))) =
            processor.process(&mut Token::noop(), message).await
        else {
            panic!();
        };
        let new_envelope = new_envelope.envelope_mut();

        let event_item = new_envelope.items().last().unwrap();
        let annotated_event: Annotated<Event> =
            Annotated::from_json_bytes(&event_item.payload()).unwrap();
        let event = annotated_event.into_value().unwrap();
        let headers = event
            .request
            .into_value()
            .unwrap()
            .headers
            .into_value()
            .unwrap();

        // IP-like data must be masked
        assert_eq!(
            Some(
                "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/********* Safari/537.36"
            ),
            headers.get_header("User-Agent")
        );
        // But we still get correct browser and version number
        let contexts = event.contexts.into_value().unwrap();
        let browser = contexts.0.get("browser").unwrap();
        assert_eq!(
            r#"{"browser":"Chrome 103.0.0","name":"Chrome","version":"103.0.0","type":"browser"}"#,
            browser.to_json().unwrap()
        );
    }

    #[tokio::test]
    #[cfg(feature = "processing")]
    async fn test_materialize_dsc() {
        use crate::services::projects::project::PublicKeyConfig;

        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
            .parse()
            .unwrap();
        let request_meta = RequestMeta::new(dsn);
        let mut envelope = Envelope::from_request(None, request_meta);

        let dsc = r#"{
            "trace_id": "00000000-0000-0000-0000-000000000000",
            "public_key": "e12d836b15bb49d7bbf99e64295d995b",
            "sample_rate": "0.2"
        }"#;
        envelope.set_dsc(serde_json::from_str(dsc).unwrap());

        let mut item = Item::new(ItemType::Event);
        item.set_payload(ContentType::Json, r#"{}"#);
        envelope.add_item(item);

        let outcome_aggregator = Addr::dummy();
        let managed_envelope = ManagedEnvelope::new(envelope, outcome_aggregator);

        let mut project_info = ProjectInfo::default();
        project_info.public_keys.push(PublicKeyConfig {
            public_key: ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(),
            numeric_id: Some(1),
        });

        let config = serde_json::json!({
            "processing": {
                "enabled": true,
                "kafka_config": [],
            }
        });

        let message = ProcessEnvelopeGrouped {
            group: ProcessingGroup::Transaction,
            envelope: managed_envelope,
            ctx: processing::Context {
                config: &Config::from_json_value(config.clone()).unwrap(),
                project_info: &project_info,
                sampling_project_info: Some(&project_info),
                ..processing::Context::for_test()
            },
        };

        let processor = create_test_processor(Config::from_json_value(config).unwrap()).await;
        let Ok(Some(Submit::Envelope(envelope))) =
            processor.process(&mut Token::noop(), message).await
        else {
            panic!();
        };
        let event = envelope
            .envelope()
            .get_item_by(|item| item.ty() == &ItemType::Event)
            .unwrap();

        let event = Annotated::<Event>::from_json_bytes(&event.payload()).unwrap();
        insta::assert_debug_snapshot!(event.value().unwrap()._dsc, @r###"
        Object(
            {
                "environment": ~,
                "public_key": String(
                    "e12d836b15bb49d7bbf99e64295d995b",
                ),
                "release": ~,
                "replay_id": ~,
                "sample_rate": String(
                    "0.2",
                ),
                "trace_id": String(
                    "00000000000000000000000000000000",
                ),
                "transaction": ~,
            },
        )
        "###);
    }

    fn capture_test_event(transaction_name: &str, source: TransactionSource) -> Vec<String> {
        let mut event = Annotated::<Event>::from_json(
            r#"
            {
                "type": "transaction",
                "transaction": "/foo/",
                "timestamp": 946684810.0,
                "start_timestamp": 946684800.0,
                "contexts": {
                    "trace": {
                        "trace_id": "4c79f60c11214eb38604f4ae0781bfb2",
                        "span_id": "fa90fdead5f74053",
                        "op": "http.server",
                        "type": "trace"
                    }
                },
                "transaction_info": {
                    "source": "url"
                }
            }
            "#,
        )
        .unwrap();
        let e = event.value_mut().as_mut().unwrap();
        e.transaction.set_value(Some(transaction_name.into()));

        e.transaction_info
            .value_mut()
            .as_mut()
            .unwrap()
            .source
            .set_value(Some(source));

        relay_statsd::with_capturing_test_client(|| {
            utils::log_transaction_name_metrics(&mut event, |event| {
                let config = NormalizationConfig {
                    transaction_name_config: TransactionNameConfig {
                        rules: &[TransactionNameRule {
                            pattern: LazyGlob::new("/foo/*/**".to_owned()),
                            expiry: DateTime::<Utc>::MAX_UTC,
                            redaction: RedactionRule::Replace {
                                substitution: "*".to_owned(),
                            },
                        }],
                    },
                    ..Default::default()
                };
                relay_event_normalization::normalize_event(event, &config)
            });
        })
    }

    #[test]
    fn test_log_transaction_metrics_none() {
        let captures = capture_test_event("/nothing", TransactionSource::Url);
        insta::assert_debug_snapshot!(captures, @r###"
        [
            "event.transaction_name_changes:1|c|#source_in:url,changes:none,source_out:sanitized,is_404:false",
        ]
        "###);
    }

    #[test]
    fn test_log_transaction_metrics_rule() {
        let captures = capture_test_event("/foo/john/denver", TransactionSource::Url);
        insta::assert_debug_snapshot!(captures, @r###"
        [
            "event.transaction_name_changes:1|c|#source_in:url,changes:rule,source_out:sanitized,is_404:false",
        ]
        "###);
    }

    #[test]
    fn test_log_transaction_metrics_pattern() {
        let captures = capture_test_event("/something/12345", TransactionSource::Url);
        insta::assert_debug_snapshot!(captures, @r###"
        [
            "event.transaction_name_changes:1|c|#source_in:url,changes:pattern,source_out:sanitized,is_404:false",
        ]
        "###);
    }

    #[test]
    fn test_log_transaction_metrics_both() {
        let captures = capture_test_event("/foo/john/12345", TransactionSource::Url);
        insta::assert_debug_snapshot!(captures, @r###"
        [
            "event.transaction_name_changes:1|c|#source_in:url,changes:both,source_out:sanitized,is_404:false",
        ]
        "###);
    }

    #[test]
    fn test_log_transaction_metrics_no_match() {
        let captures = capture_test_event("/foo/john/12345", TransactionSource::Route);
        insta::assert_debug_snapshot!(captures, @r###"
        [
            "event.transaction_name_changes:1|c|#source_in:route,changes:none,source_out:route,is_404:false",
        ]
        "###);
    }

    /// Confirms that the hardcoded value we use for the fixed length of the measurement MRI is
    /// correct. The unit test is placed here because it has dependencies on relay-server and
    /// therefore cannot be called from relay-metrics.
    #[test]
    fn test_mri_overhead_constant() {
        let hardcoded_value = MeasurementsConfig::MEASUREMENT_MRI_OVERHEAD;

        let derived_value = {
            let name = "foobar".to_owned();
            let value = 5.into(); // Arbitrary value.
            let unit = MetricUnit::Duration(DurationUnit::default());
            let tags = TransactionMeasurementTags {
                measurement_rating: None,
                universal_tags: CommonTags(BTreeMap::new()),
                score_profile_version: None,
            };

            let measurement = TransactionMetric::Measurement {
                name: name.clone(),
                value,
                unit,
                tags,
            };

            let metric: Bucket = measurement.into_metric(UnixTimestamp::now());
            metric.name.len() - unit.to_string().len() - name.len()
        };
        assert_eq!(
            hardcoded_value, derived_value,
            "Update `MEASUREMENT_MRI_OVERHEAD` if the naming scheme changed."
        );
    }

    #[tokio::test]
    async fn test_process_metrics_bucket_metadata() {
        let mut token = Cogs::noop().timed(ResourceId::Relay, AppFeature::Unattributed);
        let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
        let received_at = Utc::now();
        let config = Config::default();

        let (aggregator, mut aggregator_rx) = Addr::custom();
        let processor = create_test_processor_with_addrs(
            config,
            Addrs {
                aggregator,
                ..Default::default()
            },
        )
        .await;

        let mut item = Item::new(ItemType::Statsd);
        item.set_payload(
            ContentType::Text,
            "transactions/foo:3182887624:4267882815|s",
        );
        for (source, expected_received_at) in [
            (
                BucketSource::External,
                Some(UnixTimestamp::from_datetime(received_at).unwrap()),
            ),
            (BucketSource::Internal, None),
        ] {
            let message = ProcessMetrics {
                data: MetricData::Raw(vec![item.clone()]),
                project_key,
                source,
                received_at,
                sent_at: Some(Utc::now()),
            };
            processor.handle_process_metrics(&mut token, message);

            let Aggregator::MergeBuckets(merge_buckets) = aggregator_rx.recv().await.unwrap();
            let buckets = merge_buckets.buckets;
            assert_eq!(buckets.len(), 1);
            assert_eq!(buckets[0].metadata.received_at, expected_received_at);
        }
    }

    #[tokio::test]
    async fn test_process_batched_metrics() {
        let mut token = Cogs::noop().timed(ResourceId::Relay, AppFeature::Unattributed);
        let received_at = Utc::now();
        let config = Config::default();

        let (aggregator, mut aggregator_rx) = Addr::custom();
        let processor = create_test_processor_with_addrs(
            config,
            Addrs {
                aggregator,
                ..Default::default()
            },
        )
        .await;

        let payload = r#"{
    "buckets": {
        "11111111111111111111111111111111": [
            {
                "timestamp": 1615889440,
                "width": 0,
                "name": "d:custom/endpoint.response_time@millisecond",
                "type": "d",
                "value": [
                  68.0
                ],
                "tags": {
                  "route": "user_index"
                }
            }
        ],
        "22222222222222222222222222222222": [
            {
                "timestamp": 1615889440,
                "width": 0,
                "name": "d:custom/endpoint.cache_rate@none",
                "type": "d",
                "value": [
                  36.0
                ]
            }
        ]
    }
}
"#;
        let message = ProcessBatchedMetrics {
            payload: Bytes::from(payload),
            source: BucketSource::Internal,
            received_at,
            sent_at: Some(Utc::now()),
        };
        processor.handle_process_batched_metrics(&mut token, message);

        let Aggregator::MergeBuckets(mb1) = aggregator_rx.recv().await.unwrap();
        let Aggregator::MergeBuckets(mb2) = aggregator_rx.recv().await.unwrap();

        let mut messages = vec![mb1, mb2];
        messages.sort_by_key(|mb| mb.project_key);

        let actual = messages
            .into_iter()
            .map(|mb| (mb.project_key, mb.buckets))
            .collect::<Vec<_>>();

        assert_debug_snapshot!(actual, @r###"
        [
            (
                ProjectKey("11111111111111111111111111111111"),
                [
                    Bucket {
                        timestamp: UnixTimestamp(1615889440),
                        width: 0,
                        name: MetricName(
                            "d:custom/endpoint.response_time@millisecond",
                        ),
                        value: Distribution(
                            [
                                68.0,
                            ],
                        ),
                        tags: {
                            "route": "user_index",
                        },
                        metadata: BucketMetadata {
                            merges: 1,
                            received_at: None,
                            extracted_from_indexed: false,
                        },
                    },
                ],
            ),
            (
                ProjectKey("22222222222222222222222222222222"),
                [
                    Bucket {
                        timestamp: UnixTimestamp(1615889440),
                        width: 0,
                        name: MetricName(
                            "d:custom/endpoint.cache_rate@none",
                        ),
                        value: Distribution(
                            [
                                36.0,
                            ],
                        ),
                        tags: {},
                        metadata: BucketMetadata {
                            merges: 1,
                            received_at: None,
                            extracted_from_indexed: false,
                        },
                    },
                ],
            ),
        ]
        "###);
    }
}