relay_server/services/
processor.rs

1use std::borrow::Cow;
2use std::collections::{BTreeSet, HashMap};
3use std::error::Error;
4use std::fmt::{Debug, Display};
5use std::future::Future;
6use std::io::Write;
7use std::pin::Pin;
8use std::sync::Arc;
9use std::time::Duration;
10
11use anyhow::Context;
12use brotli::CompressorWriter as BrotliEncoder;
13use bytes::Bytes;
14use chrono::{DateTime, Utc};
15use flate2::Compression;
16use flate2::write::{GzEncoder, ZlibEncoder};
17use futures::FutureExt;
18use futures::future::BoxFuture;
19use relay_base_schema::project::{ProjectId, ProjectKey};
20use relay_cogs::{AppFeature, Cogs, FeatureWeights, ResourceId, Token};
21use relay_common::time::UnixTimestamp;
22use relay_config::{Config, HttpEncoding, RelayMode};
23use relay_dynamic_config::{ErrorBoundary, Feature, GlobalConfig};
24use relay_event_normalization::{ClockDriftProcessor, GeoIpLookup};
25use relay_event_schema::processor::ProcessingAction;
26use relay_event_schema::protocol::{
27    ClientReport, Event, EventId, Metrics, NetworkReportError, SpanV2,
28};
29use relay_filter::FilterStatKey;
30use relay_metrics::{Bucket, BucketMetadata, BucketView, BucketsView, MetricNamespace};
31use relay_pii::PiiConfigError;
32use relay_protocol::Annotated;
33use relay_quotas::{DataCategory, Quota, RateLimits, Scoping};
34use relay_sampling::evaluation::{ReservoirCounters, ReservoirEvaluator, SamplingDecision};
35use relay_statsd::metric;
36use relay_system::{Addr, FromMessage, NoResponse, Service};
37use reqwest::header;
38use smallvec::{SmallVec, smallvec};
39use zstd::stream::Encoder as ZstdEncoder;
40
41use crate::envelope::{
42    self, AttachmentType, ContentType, Envelope, EnvelopeError, Item, ItemContainer, ItemType,
43};
44use crate::extractors::{PartialDsn, RequestMeta, RequestTrust};
45use crate::integrations::{Integration, SpansIntegration};
46use crate::managed::{InvalidProcessingGroupType, ManagedEnvelope, TypedEnvelope};
47use crate::metrics::{MetricOutcomes, MetricsLimiter, MinimalTrackableBucket};
48use crate::metrics_extraction::transactions::ExtractedMetrics;
49use crate::metrics_extraction::transactions::types::ExtractMetricsError;
50use crate::processing::check_ins::CheckInsProcessor;
51use crate::processing::logs::LogsProcessor;
52use crate::processing::sessions::SessionsProcessor;
53use crate::processing::spans::SpansProcessor;
54use crate::processing::trace_metrics::TraceMetricsProcessor;
55use crate::processing::utils::event::{
56    EventFullyNormalized, EventMetricsExtracted, FiltersStatus, SpansExtracted, event_category,
57    event_type,
58};
59use crate::processing::utils::transaction::ExtractMetricsContext;
60use crate::processing::{Forward as _, Output, Outputs, QuotaRateLimiter};
61use crate::service::ServiceError;
62use crate::services::global_config::GlobalConfigHandle;
63use crate::services::metrics::{Aggregator, FlushBuckets, MergeBuckets, ProjectBuckets};
64use crate::services::outcome::{DiscardItemType, DiscardReason, Outcome, TrackOutcome};
65use crate::services::projects::cache::ProjectCacheHandle;
66use crate::services::projects::project::{ProjectInfo, ProjectState};
67use crate::services::upstream::{
68    SendRequest, Sign, SignatureType, UpstreamRelay, UpstreamRequest, UpstreamRequestError,
69};
70use crate::statsd::{RelayCounters, RelayHistograms, RelayTimers};
71use crate::utils::{self, CheckLimits, EnvelopeLimiter, SamplingResult};
72use crate::{http, processing};
73use relay_threading::AsyncPool;
74#[cfg(feature = "processing")]
75use {
76    crate::services::global_rate_limits::{GlobalRateLimits, GlobalRateLimitsServiceHandle},
77    crate::services::processor::nnswitch::SwitchProcessingError,
78    crate::services::store::{Store, StoreEnvelope},
79    crate::utils::Enforcement,
80    itertools::Itertools,
81    relay_cardinality::{
82        CardinalityLimit, CardinalityLimiter, CardinalityLimitsSplit, RedisSetLimiter,
83        RedisSetLimiterOptions,
84    },
85    relay_dynamic_config::CardinalityLimiterMode,
86    relay_quotas::{RateLimitingError, RedisRateLimiter},
87    relay_redis::{AsyncRedisClient, RedisClients},
88    std::time::Instant,
89    symbolic_unreal::{Unreal4Error, Unreal4ErrorKind},
90};
91
92mod attachment;
93mod dynamic_sampling;
94mod event;
95mod metrics;
96mod nel;
97mod profile;
98mod profile_chunk;
99mod replay;
100mod report;
101mod span;
102
103#[cfg(all(sentry, feature = "processing"))]
104mod playstation;
105mod standalone;
106#[cfg(feature = "processing")]
107mod unreal;
108
109#[cfg(feature = "processing")]
110mod nnswitch;
111
/// Creates the block only if used with `processing` feature.
///
/// Provided code block will be executed only if the provided config has `processing_enabled` set.
///
/// The two-arm form additionally runs `$if_false` when either the feature is not
/// compiled in or the config does not have processing enabled; the `$if_false`
/// block is the only code compiled without the `processing` feature.
macro_rules! if_processing {
    ($config:expr, $if_true:block) => {
        #[cfg(feature = "processing")] {
            if $config.processing_enabled() $if_true
        }
    };
    ($config:expr, $if_true:block else $if_false:block) => {
        {
            #[cfg(feature = "processing")] {
                if $config.processing_enabled() $if_true else $if_false
            }
            #[cfg(not(feature = "processing"))] {
                $if_false
            }
        }
    };
}
132
/// The minimum clock drift for correction to apply.
///
/// 55 * 60 seconds = 55 minutes. NOTE(review): presumably smaller drifts are left
/// uncorrected — confirm against the `ClockDriftProcessor` call sites.
pub const MINIMUM_CLOCK_DRIFT: Duration = Duration::from_secs(55 * 60);
135
/// Error returned when a [`ProcessingGroup`] cannot be converted into the
/// requested marker group type.
#[derive(Debug)]
pub struct GroupTypeError;

impl Display for GroupTypeError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "failed to convert processing group into corresponding type")
    }
}

impl std::error::Error for GroupTypeError {}
146
/// Generates a zero-sized marker type for one (or more) [`ProcessingGroup`] variants.
///
/// The generated type converts infallibly into [`ProcessingGroup`] via `From` and can
/// be fallibly recovered from a [`ProcessingGroup`] via `TryFrom`, which fails with
/// [`GroupTypeError`] for non-matching variants. Additional variants listed after the
/// first map to the same marker type (e.g. both `Log` and `Nel` yield `LogGroup`).
macro_rules! processing_group {
    ($ty:ident, $variant:ident$(, $($other:ident),+)?) => {
        #[derive(Clone, Copy, Debug)]
        pub struct $ty;

        impl From<$ty> for ProcessingGroup {
            fn from(_: $ty) -> Self {
                ProcessingGroup::$variant
            }
        }

        impl TryFrom<ProcessingGroup> for $ty {
            type Error = GroupTypeError;

            fn try_from(value: ProcessingGroup) -> Result<Self, Self::Error> {
                if matches!(value, ProcessingGroup::$variant) {
                    return Ok($ty);
                }
                $($(
                    // Accept every additional variant mapped to this marker type.
                    if matches!(value, ProcessingGroup::$other) {
                        return Ok($ty);
                    }
                )+)?
                return Err(GroupTypeError);
            }
        }
    };
}
175
/// A marker trait.
///
/// Should be used only with groups which are responsible for processing envelopes with events.
/// Within this file, only [`TransactionGroup`] and [`ErrorGroup`] implement it.
pub trait EventProcessing {}
180
// Event-carrying groups: only these implement the `EventProcessing` marker.
processing_group!(TransactionGroup, Transaction);
impl EventProcessing for TransactionGroup {}

processing_group!(ErrorGroup, Error);
impl EventProcessing for ErrorGroup {}

processing_group!(SessionGroup, Session);
processing_group!(StandaloneGroup, Standalone);
processing_group!(ClientReportGroup, ClientReport);
processing_group!(ReplayGroup, Replay);
processing_group!(CheckInGroup, CheckIn);
// Both `Log` and `Nel` resolve to `LogGroup`: NEL reports are turned into logs.
processing_group!(LogGroup, Log, Nel);
processing_group!(TraceMetricGroup, TraceMetric);
processing_group!(SpanGroup, Span);

processing_group!(ProfileChunkGroup, ProfileChunk);
processing_group!(MetricsGroup, Metrics);
processing_group!(ForwardUnknownGroup, ForwardUnknown);
processing_group!(Ungrouped, Ungrouped);
200
/// Processed group type marker.
///
/// Marks the envelopes which passed through the processing pipeline.
/// Used as a type-state parameter, e.g. in [`TypedEnvelope<Processed>`].
#[derive(Clone, Copy, Debug)]
pub struct Processed;
206
/// Describes the groups of the processable items.
#[derive(Clone, Copy, Debug)]
pub enum ProcessingGroup {
    /// All the transaction related items.
    ///
    /// Includes transactions, related attachments, profiles.
    Transaction,
    /// All the items which require (have or create) events.
    ///
    /// This includes: errors, NEL, security reports, user reports, some of the
    /// attachments.
    Error,
    /// Session events.
    Session,
    /// Standalone items which can be sent alone without any event attached to it in the current
    /// envelope e.g. some attachments, user reports.
    Standalone,
    /// Client reports (outcomes).
    ClientReport,
    /// Replays and ReplayRecordings.
    Replay,
    /// Crons.
    CheckIn,
    /// NEL reports.
    Nel,
    /// Logs.
    Log,
    /// Trace metrics.
    TraceMetric,
    /// Spans.
    Span,
    /// Span V2 spans.
    SpanV2,
    /// Metrics.
    Metrics,
    /// ProfileChunk.
    ProfileChunk,
    /// Unknown item types will be forwarded upstream (to processing Relay), where we will
    /// decide what to do with them.
    ForwardUnknown,
    /// All the items in the envelope that could not be grouped.
    Ungrouped,
}
250
251impl ProcessingGroup {
252    /// Splits provided envelope into list of tuples of groups with associated envelopes.
253    fn split_envelope(
254        mut envelope: Envelope,
255        project_info: &ProjectInfo,
256    ) -> SmallVec<[(Self, Box<Envelope>); 3]> {
257        let headers = envelope.headers().clone();
258        let mut grouped_envelopes = smallvec![];
259
260        // Extract replays.
261        let replay_items = envelope.take_items_by(|item| {
262            matches!(
263                item.ty(),
264                &ItemType::ReplayEvent | &ItemType::ReplayRecording | &ItemType::ReplayVideo
265            )
266        });
267        if !replay_items.is_empty() {
268            grouped_envelopes.push((
269                ProcessingGroup::Replay,
270                Envelope::from_parts(headers.clone(), replay_items),
271            ))
272        }
273
274        // Keep all the sessions together in one envelope.
275        let session_items = envelope
276            .take_items_by(|item| matches!(item.ty(), &ItemType::Session | &ItemType::Sessions));
277        if !session_items.is_empty() {
278            grouped_envelopes.push((
279                ProcessingGroup::Session,
280                Envelope::from_parts(headers.clone(), session_items),
281            ))
282        }
283
284        let span_v2_items = envelope.take_items_by(|item| {
285            let exp_feature = project_info.has_feature(Feature::SpanV2ExperimentalProcessing);
286            let is_supported_integration = matches!(
287                item.integration(),
288                Some(Integration::Spans(SpansIntegration::OtelV1 { .. }))
289            );
290            let is_span = matches!(item.ty(), &ItemType::Span);
291
292            ItemContainer::<SpanV2>::is_container(item)
293                || (exp_feature && is_span)
294                || (exp_feature && is_supported_integration)
295        });
296
297        if !span_v2_items.is_empty() {
298            grouped_envelopes.push((
299                ProcessingGroup::SpanV2,
300                Envelope::from_parts(headers.clone(), span_v2_items),
301            ))
302        }
303
304        // Extract spans.
305        let span_items = envelope.take_items_by(|item| {
306            matches!(item.ty(), &ItemType::Span)
307                || matches!(item.integration(), Some(Integration::Spans(_)))
308        });
309
310        if !span_items.is_empty() {
311            grouped_envelopes.push((
312                ProcessingGroup::Span,
313                Envelope::from_parts(headers.clone(), span_items),
314            ))
315        }
316
317        // Extract logs.
318        let logs_items = envelope.take_items_by(|item| {
319            matches!(item.ty(), &ItemType::Log)
320                || matches!(item.integration(), Some(Integration::Logs(_)))
321        });
322        if !logs_items.is_empty() {
323            grouped_envelopes.push((
324                ProcessingGroup::Log,
325                Envelope::from_parts(headers.clone(), logs_items),
326            ))
327        }
328
329        // Extract trace metrics.
330        let trace_metric_items =
331            envelope.take_items_by(|item| matches!(item.ty(), &ItemType::TraceMetric));
332        if !trace_metric_items.is_empty() {
333            grouped_envelopes.push((
334                ProcessingGroup::TraceMetric,
335                Envelope::from_parts(headers.clone(), trace_metric_items),
336            ))
337        }
338
339        // NEL items are transformed into logs in their own processing step.
340        let nel_items = envelope.take_items_by(|item| matches!(item.ty(), &ItemType::Nel));
341        if !nel_items.is_empty() {
342            grouped_envelopes.push((
343                ProcessingGroup::Nel,
344                Envelope::from_parts(headers.clone(), nel_items),
345            ))
346        }
347
348        // Extract all metric items.
349        //
350        // Note: Should only be relevant in proxy mode. In other modes we send metrics through
351        // a separate pipeline.
352        let metric_items = envelope.take_items_by(|i| i.ty().is_metrics());
353        if !metric_items.is_empty() {
354            grouped_envelopes.push((
355                ProcessingGroup::Metrics,
356                Envelope::from_parts(headers.clone(), metric_items),
357            ))
358        }
359
360        // Extract profile chunks.
361        let profile_chunk_items =
362            envelope.take_items_by(|item| matches!(item.ty(), &ItemType::ProfileChunk));
363        if !profile_chunk_items.is_empty() {
364            grouped_envelopes.push((
365                ProcessingGroup::ProfileChunk,
366                Envelope::from_parts(headers.clone(), profile_chunk_items),
367            ))
368        }
369
370        // Extract all standalone items.
371        //
372        // Note: only if there are no items in the envelope which can create events, otherwise they
373        // will be in the same envelope with all require event items.
374        if !envelope.items().any(Item::creates_event) {
375            let standalone_items = envelope.take_items_by(Item::requires_event);
376            if !standalone_items.is_empty() {
377                grouped_envelopes.push((
378                    ProcessingGroup::Standalone,
379                    Envelope::from_parts(headers.clone(), standalone_items),
380                ))
381            }
382        };
383
384        // Make sure we create separate envelopes for each `RawSecurity` report.
385        let security_reports_items = envelope
386            .take_items_by(|i| matches!(i.ty(), &ItemType::RawSecurity))
387            .into_iter()
388            .map(|item| {
389                let headers = headers.clone();
390                let items: SmallVec<[Item; 3]> = smallvec![item.clone()];
391                let mut envelope = Envelope::from_parts(headers, items);
392                envelope.set_event_id(EventId::new());
393                (ProcessingGroup::Error, envelope)
394            });
395        grouped_envelopes.extend(security_reports_items);
396
397        // Extract all the items which require an event into separate envelope.
398        let require_event_items = envelope.take_items_by(Item::requires_event);
399        if !require_event_items.is_empty() {
400            let group = if require_event_items
401                .iter()
402                .any(|item| matches!(item.ty(), &ItemType::Transaction | &ItemType::Profile))
403            {
404                ProcessingGroup::Transaction
405            } else {
406                ProcessingGroup::Error
407            };
408
409            grouped_envelopes.push((
410                group,
411                Envelope::from_parts(headers.clone(), require_event_items),
412            ))
413        }
414
415        // Get the rest of the envelopes, one per item.
416        let envelopes = envelope.items_mut().map(|item| {
417            let headers = headers.clone();
418            let items: SmallVec<[Item; 3]> = smallvec![item.clone()];
419            let envelope = Envelope::from_parts(headers, items);
420            let item_type = item.ty();
421            let group = if matches!(item_type, &ItemType::CheckIn) {
422                ProcessingGroup::CheckIn
423            } else if matches!(item.ty(), &ItemType::ClientReport) {
424                ProcessingGroup::ClientReport
425            } else if matches!(item_type, &ItemType::Unknown(_)) {
426                ProcessingGroup::ForwardUnknown
427            } else {
428                // Cannot group this item type.
429                ProcessingGroup::Ungrouped
430            };
431
432            (group, envelope)
433        });
434        grouped_envelopes.extend(envelopes);
435
436        grouped_envelopes
437    }
438
439    /// Returns the name of the group.
440    pub fn variant(&self) -> &'static str {
441        match self {
442            ProcessingGroup::Transaction => "transaction",
443            ProcessingGroup::Error => "error",
444            ProcessingGroup::Session => "session",
445            ProcessingGroup::Standalone => "standalone",
446            ProcessingGroup::ClientReport => "client_report",
447            ProcessingGroup::Replay => "replay",
448            ProcessingGroup::CheckIn => "check_in",
449            ProcessingGroup::Log => "log",
450            ProcessingGroup::TraceMetric => "trace_metric",
451            ProcessingGroup::Nel => "nel",
452            ProcessingGroup::Span => "span",
453            ProcessingGroup::SpanV2 => "span_v2",
454            ProcessingGroup::Metrics => "metrics",
455            ProcessingGroup::ProfileChunk => "profile_chunk",
456            ProcessingGroup::ForwardUnknown => "forward_unknown",
457            ProcessingGroup::Ungrouped => "ungrouped",
458        }
459    }
460}
461
462impl From<ProcessingGroup> for AppFeature {
463    fn from(value: ProcessingGroup) -> Self {
464        match value {
465            ProcessingGroup::Transaction => AppFeature::Transactions,
466            ProcessingGroup::Error => AppFeature::Errors,
467            ProcessingGroup::Session => AppFeature::Sessions,
468            ProcessingGroup::Standalone => AppFeature::UnattributedEnvelope,
469            ProcessingGroup::ClientReport => AppFeature::ClientReports,
470            ProcessingGroup::Replay => AppFeature::Replays,
471            ProcessingGroup::CheckIn => AppFeature::CheckIns,
472            ProcessingGroup::Log => AppFeature::Logs,
473            ProcessingGroup::TraceMetric => AppFeature::TraceMetrics,
474            ProcessingGroup::Nel => AppFeature::Logs,
475            ProcessingGroup::Span => AppFeature::Spans,
476            ProcessingGroup::SpanV2 => AppFeature::Spans,
477            ProcessingGroup::Metrics => AppFeature::UnattributedMetrics,
478            ProcessingGroup::ProfileChunk => AppFeature::Profiles,
479            ProcessingGroup::ForwardUnknown => AppFeature::UnattributedEnvelope,
480            ProcessingGroup::Ungrouped => AppFeature::UnattributedEnvelope,
481        }
482    }
483}
484
/// An error returned when handling [`ProcessEnvelope`].
#[derive(Debug, thiserror::Error)]
pub enum ProcessingError {
    #[error("invalid json in event")]
    InvalidJson(#[source] serde_json::Error),

    #[error("invalid message pack event payload")]
    InvalidMsgpack(#[from] rmp_serde::decode::Error),

    #[cfg(feature = "processing")]
    #[error("invalid unreal crash report")]
    InvalidUnrealReport(#[source] Unreal4Error),

    #[error("event payload too large")]
    PayloadTooLarge(DiscardItemType),

    #[error("invalid transaction event")]
    InvalidTransaction,

    #[error("envelope processor failed")]
    ProcessingFailed(#[from] ProcessingAction),

    #[error("duplicate {0} in event")]
    DuplicateItem(ItemType),

    #[error("failed to extract event payload")]
    NoEventPayload,

    #[error("missing project id in DSN")]
    MissingProjectId,

    #[error("invalid security report type: {0:?}")]
    InvalidSecurityType(Bytes),

    #[error("unsupported security report type")]
    UnsupportedSecurityType,

    #[error("invalid security report")]
    InvalidSecurityReport(#[source] serde_json::Error),

    #[error("invalid nel report")]
    InvalidNelReport(#[source] NetworkReportError),

    #[error("event filtered with reason: {0:?}")]
    EventFiltered(FilterStatKey),

    #[error("missing or invalid required event timestamp")]
    InvalidTimestamp,

    #[error("could not serialize event payload")]
    SerializeFailed(#[source] serde_json::Error),

    #[cfg(feature = "processing")]
    #[error("failed to apply quotas")]
    QuotasFailed(#[from] RateLimitingError),

    #[error("invalid pii config")]
    PiiConfigError(PiiConfigError),

    // Boxed to keep the overall error type small.
    #[error("invalid processing group type")]
    InvalidProcessingGroup(Box<InvalidProcessingGroupType>),

    #[error("invalid replay")]
    InvalidReplay(DiscardReason),

    #[error("replay filtered with reason: {0:?}")]
    ReplayFiltered(FilterStatKey),

    #[cfg(feature = "processing")]
    #[error("nintendo switch dying message processing failed {0:?}")]
    InvalidNintendoDyingMessage(#[source] SwitchProcessingError),

    #[cfg(all(sentry, feature = "processing"))]
    #[error("playstation dump processing failed: {0}")]
    InvalidPlaystationDump(String),

    // Variants emitted by the new processing pipeline:
    #[error("processing group does not match specific processor")]
    ProcessingGroupMismatch,
    #[error("new processing pipeline failed")]
    ProcessingFailure,
}
566
impl ProcessingError {
    /// Maps the error to the [`Outcome`] that should be emitted for the dropped data.
    ///
    /// Returns `None` when no outcome should be emitted here — either none applies
    /// (e.g. [`Self::MissingProjectId`]) or outcomes have already been emitted
    /// elsewhere (e.g. [`Self::ProcessingFailure`]).
    pub fn to_outcome(&self) -> Option<Outcome> {
        match self {
            Self::PayloadTooLarge(payload_type) => {
                Some(Outcome::Invalid(DiscardReason::TooLarge(*payload_type)))
            }
            Self::InvalidJson(_) => Some(Outcome::Invalid(DiscardReason::InvalidJson)),
            Self::InvalidMsgpack(_) => Some(Outcome::Invalid(DiscardReason::InvalidMsgpack)),
            Self::InvalidSecurityType(_) => {
                Some(Outcome::Invalid(DiscardReason::SecurityReportType))
            }
            Self::InvalidSecurityReport(_) => Some(Outcome::Invalid(DiscardReason::SecurityReport)),
            Self::UnsupportedSecurityType => Some(Outcome::Filtered(FilterStatKey::InvalidCsp)),
            Self::InvalidNelReport(_) => Some(Outcome::Invalid(DiscardReason::InvalidJson)),
            Self::InvalidTransaction => Some(Outcome::Invalid(DiscardReason::InvalidTransaction)),
            Self::InvalidTimestamp => Some(Outcome::Invalid(DiscardReason::Timestamp)),
            Self::DuplicateItem(_) => Some(Outcome::Invalid(DiscardReason::DuplicateItem)),
            Self::NoEventPayload => Some(Outcome::Invalid(DiscardReason::NoEventPayload)),
            #[cfg(feature = "processing")]
            Self::InvalidNintendoDyingMessage(_) => Some(Outcome::Invalid(DiscardReason::Payload)),
            #[cfg(all(sentry, feature = "processing"))]
            Self::InvalidPlaystationDump(_) => Some(Outcome::Invalid(DiscardReason::Payload)),
            // Note: this guarded arm must stay before the generic unreal arm below.
            #[cfg(feature = "processing")]
            Self::InvalidUnrealReport(err) if err.kind() == Unreal4ErrorKind::BadCompression => {
                Some(Outcome::Invalid(DiscardReason::InvalidCompression))
            }
            #[cfg(feature = "processing")]
            Self::InvalidUnrealReport(_) => Some(Outcome::Invalid(DiscardReason::ProcessUnreal)),
            Self::SerializeFailed(_) | Self::ProcessingFailed(_) => {
                Some(Outcome::Invalid(DiscardReason::Internal))
            }
            #[cfg(feature = "processing")]
            Self::QuotasFailed(_) => Some(Outcome::Invalid(DiscardReason::Internal)),
            Self::PiiConfigError(_) => Some(Outcome::Invalid(DiscardReason::ProjectStatePii)),
            Self::MissingProjectId => None,
            Self::EventFiltered(_) => None,
            Self::InvalidProcessingGroup(_) => None,
            Self::InvalidReplay(reason) => Some(Outcome::Invalid(*reason)),
            Self::ReplayFiltered(key) => Some(Outcome::Filtered(key.clone())),

            Self::ProcessingGroupMismatch => Some(Outcome::Invalid(DiscardReason::Internal)),
            // Outcomes are emitted in the new processing pipeline already.
            Self::ProcessingFailure => None,
        }
    }

    /// Returns `true` if the error maps to an outcome flagged as unexpected.
    fn is_unexpected(&self) -> bool {
        self.to_outcome()
            .is_some_and(|outcome| outcome.is_unexpected())
    }
}
618
#[cfg(feature = "processing")]
impl From<Unreal4Error> for ProcessingError {
    /// Oversized unreal reports become [`Self::PayloadTooLarge`]; any other error
    /// kind is preserved as [`Self::InvalidUnrealReport`].
    fn from(err: Unreal4Error) -> Self {
        if err.kind() == Unreal4ErrorKind::TooLarge {
            Self::PayloadTooLarge(ItemType::UnrealReport.into())
        } else {
            Self::InvalidUnrealReport(err)
        }
    }
}
628
629impl From<ExtractMetricsError> for ProcessingError {
630    fn from(error: ExtractMetricsError) -> Self {
631        match error {
632            ExtractMetricsError::MissingTimestamp | ExtractMetricsError::InvalidTimestamp => {
633                Self::InvalidTimestamp
634            }
635        }
636    }
637}
638
639impl From<InvalidProcessingGroupType> for ProcessingError {
640    fn from(value: InvalidProcessingGroupType) -> Self {
641        Self::InvalidProcessingGroup(Box::new(value))
642    }
643}
644
645type ExtractedEvent = (Annotated<Event>, usize);
646
/// A container for extracted metrics during processing.
///
/// The container enforces that the extracted metrics are correctly tagged
/// with the dynamic sampling decision.
#[derive(Debug)]
pub struct ProcessingExtractedMetrics {
    // Buckets stored here have been tagged with `extracted_from_indexed` by the
    // `extend_*` methods before insertion.
    metrics: ExtractedMetrics,
}
655
656impl ProcessingExtractedMetrics {
657    pub fn new() -> Self {
658        Self {
659            metrics: ExtractedMetrics::default(),
660        }
661    }
662
663    /// Extends the contained metrics with [`ExtractedMetrics`].
664    pub fn extend(
665        &mut self,
666        extracted: ExtractedMetrics,
667        sampling_decision: Option<SamplingDecision>,
668    ) {
669        self.extend_project_metrics(extracted.project_metrics, sampling_decision);
670        self.extend_sampling_metrics(extracted.sampling_metrics, sampling_decision);
671    }
672
673    /// Extends the contained project metrics.
674    pub fn extend_project_metrics<I>(
675        &mut self,
676        buckets: I,
677        sampling_decision: Option<SamplingDecision>,
678    ) where
679        I: IntoIterator<Item = Bucket>,
680    {
681        self.metrics
682            .project_metrics
683            .extend(buckets.into_iter().map(|mut bucket| {
684                bucket.metadata.extracted_from_indexed =
685                    sampling_decision == Some(SamplingDecision::Keep);
686                bucket
687            }));
688    }
689
690    /// Extends the contained sampling metrics.
691    pub fn extend_sampling_metrics<I>(
692        &mut self,
693        buckets: I,
694        sampling_decision: Option<SamplingDecision>,
695    ) where
696        I: IntoIterator<Item = Bucket>,
697    {
698        self.metrics
699            .sampling_metrics
700            .extend(buckets.into_iter().map(|mut bucket| {
701                bucket.metadata.extracted_from_indexed =
702                    sampling_decision == Some(SamplingDecision::Keep);
703                bucket
704            }));
705    }
706
707    /// Applies rate limits to the contained metrics.
708    ///
709    /// This is used to apply rate limits which have been enforced on sampled items of an envelope
710    /// to also consistently apply to the metrics extracted from these items.
711    #[cfg(feature = "processing")]
712    fn apply_enforcement(&mut self, enforcement: &Enforcement, enforced_consistently: bool) {
713        // Metric namespaces which need to be dropped.
714        let mut drop_namespaces: SmallVec<[_; 2]> = smallvec![];
715        // Metrics belonging to this metric namespace need to have the `extracted_from_indexed`
716        // flag reset to `false`.
717        let mut reset_extracted_from_indexed: SmallVec<[_; 2]> = smallvec![];
718
719        for (namespace, limit, indexed) in [
720            (
721                MetricNamespace::Transactions,
722                &enforcement.event,
723                &enforcement.event_indexed,
724            ),
725            (
726                MetricNamespace::Spans,
727                &enforcement.spans,
728                &enforcement.spans_indexed,
729            ),
730        ] {
731            if limit.is_active() {
732                drop_namespaces.push(namespace);
733            } else if indexed.is_active() && !enforced_consistently {
734                // If the enforcement was not computed by consistently checking the limits,
735                // the quota for the metrics has not yet been incremented.
736                // In this case we have a dropped indexed payload but a metric which still needs to
737                // be accounted for, make sure the metric will still be rate limited.
738                reset_extracted_from_indexed.push(namespace);
739            }
740        }
741
742        if !drop_namespaces.is_empty() || !reset_extracted_from_indexed.is_empty() {
743            self.retain_mut(|bucket| {
744                let Some(namespace) = bucket.name.try_namespace() else {
745                    return true;
746                };
747
748                if drop_namespaces.contains(&namespace) {
749                    return false;
750                }
751
752                if reset_extracted_from_indexed.contains(&namespace) {
753                    bucket.metadata.extracted_from_indexed = false;
754                }
755
756                true
757            });
758        }
759    }
760
761    #[cfg(feature = "processing")]
762    fn retain_mut(&mut self, mut f: impl FnMut(&mut Bucket) -> bool) {
763        self.metrics.project_metrics.retain_mut(&mut f);
764        self.metrics.sampling_metrics.retain_mut(&mut f);
765    }
766}
767
768fn send_metrics(
769    metrics: ExtractedMetrics,
770    project_key: ProjectKey,
771    sampling_key: Option<ProjectKey>,
772    aggregator: &Addr<Aggregator>,
773) {
774    let ExtractedMetrics {
775        project_metrics,
776        sampling_metrics,
777    } = metrics;
778
779    if !project_metrics.is_empty() {
780        aggregator.send(MergeBuckets {
781            project_key,
782            buckets: project_metrics,
783        });
784    }
785
786    if !sampling_metrics.is_empty() {
787        // If no sampling project state is available, we associate the sampling
788        // metrics with the current project.
789        //
790        // project_without_tracing         -> metrics goes to self
791        // dependent_project_with_tracing  -> metrics goes to root
792        // root_project_with_tracing       -> metrics goes to root == self
793        let sampling_project_key = sampling_key.unwrap_or(project_key);
794        aggregator.send(MergeBuckets {
795            project_key: sampling_project_key,
796            buckets: sampling_metrics,
797        });
798    }
799}
800
801/// Function for on-off switches that filter specific item types (profiles, spans)
802/// based on a feature flag.
803///
804/// If the project config did not come from the upstream, we keep the items.
805fn should_filter(config: &Config, project_info: &ProjectInfo, feature: Feature) -> bool {
806    match config.relay_mode() {
807        RelayMode::Proxy => false,
808        RelayMode::Managed => !project_info.has_feature(feature),
809    }
810}
811
/// The result of the envelope processing containing the processed envelope along with the partial
/// result.
#[derive(Debug)]
#[expect(
    clippy::large_enum_variant,
    reason = "until we have a better solution to combat the excessive growth of Item, see #4819"
)]
enum ProcessingResult {
    /// A processed envelope together with the metrics extracted while processing it.
    Envelope {
        /// The fully processed envelope.
        managed_envelope: TypedEnvelope<Processed>,
        /// Metrics extracted from the envelope contents during processing.
        extracted_metrics: ProcessingExtractedMetrics,
    },
    /// The output produced by a `processing::Processor`-based pipeline.
    Output(Output<Outputs>),
}
826
827impl ProcessingResult {
828    /// Creates a [`ProcessingResult`] with no metrics.
829    fn no_metrics(managed_envelope: TypedEnvelope<Processed>) -> Self {
830        Self::Envelope {
831            managed_envelope,
832            extracted_metrics: ProcessingExtractedMetrics::new(),
833        }
834    }
835}
836
/// All items which can be submitted upstream.
#[derive(Debug)]
#[expect(
    clippy::large_enum_variant,
    reason = "until we have a better solution to combat the excessive growth of Item, see #4819"
)]
enum Submit<'a> {
    /// A processed envelope.
    Envelope(TypedEnvelope<Processed>),
    /// The output of a [`processing::Processor`].
    Output {
        /// The processor output to forward.
        output: Outputs,
        /// Context required to forward the output upstream.
        ctx: processing::ForwardContext<'a>,
    },
}
852
/// Applies processing to all contents of the given envelope.
///
/// Depending on the contents of the envelope and Relay's mode, this includes:
///
///  - Basic normalization and validation for all item types.
///  - Clock drift correction if the required `sent_at` header is present.
///  - Expansion of certain item types (e.g. unreal).
///  - Store normalization for event payloads in processing mode.
///  - Rate limiters and inbound filters on events in processing mode.
#[derive(Debug)]
pub struct ProcessEnvelope {
    /// Envelope to process.
    pub envelope: ManagedEnvelope,
    /// The project info.
    pub project_info: Arc<ProjectInfo>,
    /// Currently active cached rate limits for this project.
    pub rate_limits: Arc<RateLimits>,
    /// Root sampling project info.
    ///
    /// `None` if no sampling (trace root) project state is available.
    pub sampling_project_info: Option<Arc<ProjectInfo>>,
    /// Sampling reservoir counters.
    pub reservoir_counters: ReservoirCounters,
}
875
/// Like a [`ProcessEnvelope`], but with an envelope which has been grouped.
#[derive(Debug)]
struct ProcessEnvelopeGrouped<'a> {
    /// The group the envelope belongs to.
    pub group: ProcessingGroup,
    /// Envelope to process.
    pub envelope: ManagedEnvelope,
    /// The processing context.
    pub ctx: processing::Context<'a>,
    /// Sampling reservoir counters.
    pub reservoir_counters: &'a ReservoirCounters,
}
888
/// Parses a list of metrics or metric buckets and pushes them to the project's aggregator.
///
/// This parses and validates the metrics:
///  - For [`Metrics`](ItemType::Statsd), each metric is parsed separately, and invalid metrics are
///    ignored independently.
///  - For [`MetricBuckets`](ItemType::MetricBuckets), the entire list of buckets is parsed and
///    dropped together on parsing failure.
///  - Other envelope items will be ignored with an error message.
///
/// Additionally, processing applies clock drift correction using the system clock of this Relay, if
/// the Envelope specifies the [`sent_at`](Envelope::sent_at) header.
#[derive(Debug)]
pub struct ProcessMetrics {
    /// A list of metric items.
    pub data: MetricData,
    /// The target project.
    pub project_key: ProjectKey,
    /// Whether to keep or reset the metric metadata.
    pub source: BucketSource,
    /// The wall clock time at which the request was received.
    pub received_at: DateTime<Utc>,
    /// The value of the Envelope's [`sent_at`](Envelope::sent_at) header for clock drift
    /// correction.
    pub sent_at: Option<DateTime<Utc>>,
}
914
/// Raw unparsed metric data.
#[derive(Debug)]
pub enum MetricData {
    /// Raw data, unparsed envelope items.
    ///
    /// Items are expected to be of type [`ItemType::Statsd`] or
    /// [`ItemType::MetricBuckets`]; anything else is rejected during parsing.
    Raw(Vec<Item>),
    /// Already parsed buckets but unprocessed.
    Parsed(Vec<Bucket>),
}
923
924impl MetricData {
925    /// Consumes the metric data and parses the contained buckets.
926    ///
927    /// If the contained data is already parsed the buckets are returned unchanged.
928    /// Raw buckets are parsed and created with the passed `timestamp`.
929    fn into_buckets(self, timestamp: UnixTimestamp) -> Vec<Bucket> {
930        let items = match self {
931            Self::Parsed(buckets) => return buckets,
932            Self::Raw(items) => items,
933        };
934
935        let mut buckets = Vec::new();
936        for item in items {
937            let payload = item.payload();
938            if item.ty() == &ItemType::Statsd {
939                for bucket_result in Bucket::parse_all(&payload, timestamp) {
940                    match bucket_result {
941                        Ok(bucket) => buckets.push(bucket),
942                        Err(error) => relay_log::debug!(
943                            error = &error as &dyn Error,
944                            "failed to parse metric bucket from statsd format",
945                        ),
946                    }
947                }
948            } else if item.ty() == &ItemType::MetricBuckets {
949                match serde_json::from_slice::<Vec<Bucket>>(&payload) {
950                    Ok(parsed_buckets) => {
951                        // Re-use the allocation of `b` if possible.
952                        if buckets.is_empty() {
953                            buckets = parsed_buckets;
954                        } else {
955                            buckets.extend(parsed_buckets);
956                        }
957                    }
958                    Err(error) => {
959                        relay_log::debug!(
960                            error = &error as &dyn Error,
961                            "failed to parse metric bucket",
962                        );
963                        metric!(counter(RelayCounters::MetricBucketsParsingFailed) += 1);
964                    }
965                }
966            } else {
967                relay_log::error!(
968                    "invalid item of type {} passed to ProcessMetrics",
969                    item.ty()
970                );
971            }
972        }
973        buckets
974    }
975}
976
/// Parses a batched payload of metric buckets and pushes them to the aggregator.
#[derive(Debug)]
pub struct ProcessBatchedMetrics {
    /// Metrics payload in JSON format.
    pub payload: Bytes,
    /// Whether to keep or reset the metric metadata.
    pub source: BucketSource,
    /// The wall clock time at which the request was received.
    pub received_at: DateTime<Utc>,
    /// The value of the `sent_at` header for clock drift correction.
    pub sent_at: Option<DateTime<Utc>>,
}
988
/// Source information where a metric bucket originates from.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum BucketSource {
    /// The metric bucket originated from an internal Relay use case.
    ///
    /// The metric bucket originates either from within the same Relay
    /// or was accepted coming from another Relay which is registered as
    /// an internal Relay via Relay's configuration.
    Internal,
    /// The bucket source originated from an untrusted source.
    ///
    /// Managed Relays sending extracted metrics are considered external,
    /// it's a project use case but it comes from an untrusted source.
    External,
}
1004
1005impl BucketSource {
1006    /// Infers the bucket source from [`RequestMeta::request_trust`].
1007    pub fn from_meta(meta: &RequestMeta) -> Self {
1008        match meta.request_trust() {
1009            RequestTrust::Trusted => Self::Internal,
1010            RequestTrust::Untrusted => Self::External,
1011        }
1012    }
1013}
1014
/// Sends client reports to the upstream.
#[derive(Debug)]
pub struct SubmitClientReports {
    /// The client reports to be sent.
    pub client_reports: Vec<ClientReport>,
    /// Scoping information for the client reports.
    pub scoping: Scoping,
}
1023
/// CPU-intensive processing tasks for envelopes.
///
/// Each variant is boxed to keep the enum small; see [`FromMessage`]
/// implementations below for how messages are wrapped.
#[derive(Debug)]
pub enum EnvelopeProcessor {
    /// Processes a complete envelope, see [`ProcessEnvelope`].
    ProcessEnvelope(Box<ProcessEnvelope>),
    /// Parses and aggregates metrics of a single project, see [`ProcessMetrics`].
    ProcessProjectMetrics(Box<ProcessMetrics>),
    /// Processes a batched metrics payload, see [`ProcessBatchedMetrics`].
    ProcessBatchedMetrics(Box<ProcessBatchedMetrics>),
    /// Flushes aggregated metric buckets upstream.
    FlushBuckets(Box<FlushBuckets>),
    /// Sends client reports upstream, see [`SubmitClientReports`].
    SubmitClientReports(Box<SubmitClientReports>),
}
1033
1034impl EnvelopeProcessor {
1035    /// Returns the name of the message variant.
1036    pub fn variant(&self) -> &'static str {
1037        match self {
1038            EnvelopeProcessor::ProcessEnvelope(_) => "ProcessEnvelope",
1039            EnvelopeProcessor::ProcessProjectMetrics(_) => "ProcessProjectMetrics",
1040            EnvelopeProcessor::ProcessBatchedMetrics(_) => "ProcessBatchedMetrics",
1041            EnvelopeProcessor::FlushBuckets(_) => "FlushBuckets",
1042            EnvelopeProcessor::SubmitClientReports(_) => "SubmitClientReports",
1043        }
1044    }
1045}
1046
// Marks `EnvelopeProcessor` as a service interface message type.
impl relay_system::Interface for EnvelopeProcessor {}
1048
1049impl FromMessage<ProcessEnvelope> for EnvelopeProcessor {
1050    type Response = relay_system::NoResponse;
1051
1052    fn from_message(message: ProcessEnvelope, _sender: ()) -> Self {
1053        Self::ProcessEnvelope(Box::new(message))
1054    }
1055}
1056
1057impl FromMessage<ProcessMetrics> for EnvelopeProcessor {
1058    type Response = NoResponse;
1059
1060    fn from_message(message: ProcessMetrics, _: ()) -> Self {
1061        Self::ProcessProjectMetrics(Box::new(message))
1062    }
1063}
1064
1065impl FromMessage<ProcessBatchedMetrics> for EnvelopeProcessor {
1066    type Response = NoResponse;
1067
1068    fn from_message(message: ProcessBatchedMetrics, _: ()) -> Self {
1069        Self::ProcessBatchedMetrics(Box::new(message))
1070    }
1071}
1072
1073impl FromMessage<FlushBuckets> for EnvelopeProcessor {
1074    type Response = NoResponse;
1075
1076    fn from_message(message: FlushBuckets, _: ()) -> Self {
1077        Self::FlushBuckets(Box::new(message))
1078    }
1079}
1080
1081impl FromMessage<SubmitClientReports> for EnvelopeProcessor {
1082    type Response = NoResponse;
1083
1084    fn from_message(message: SubmitClientReports, _: ()) -> Self {
1085        Self::SubmitClientReports(Box::new(message))
1086    }
1087}
1088
/// The asynchronous thread pool used for scheduling processing tasks in the processor.
///
/// Tasks are submitted as boxed `'static` futures and polled by the pool.
pub type EnvelopeProcessorServicePool = AsyncPool<BoxFuture<'static, ()>>;
1091
/// Service implementing the [`EnvelopeProcessor`] interface.
///
/// This service handles messages in a worker pool with configurable concurrency.
///
/// Cloning is cheap: all state lives behind a shared [`Arc`].
#[derive(Clone)]
pub struct EnvelopeProcessorService {
    // Shared state of the processor; cloned handles point to the same inner.
    inner: Arc<InnerProcessor>,
}
1099
/// Contains the addresses of services that the processor publishes to.
pub struct Addrs {
    /// Address of the outcome aggregator, receives [`TrackOutcome`] messages.
    pub outcome_aggregator: Addr<TrackOutcome>,
    /// Address of the upstream relay service.
    pub upstream_relay: Addr<UpstreamRelay>,
    /// Address of the store service; `None` when store forwarding is disabled.
    #[cfg(feature = "processing")]
    pub store_forwarder: Option<Addr<Store>>,
    /// Address of the metrics aggregator.
    pub aggregator: Addr<Aggregator>,
    /// Address of the global rate limiting service, if configured.
    #[cfg(feature = "processing")]
    pub global_rate_limits: Option<Addr<GlobalRateLimits>>,
}
1110
1111impl Default for Addrs {
1112    fn default() -> Self {
1113        Addrs {
1114            outcome_aggregator: Addr::dummy(),
1115            upstream_relay: Addr::dummy(),
1116            #[cfg(feature = "processing")]
1117            store_forwarder: None,
1118            aggregator: Addr::dummy(),
1119            #[cfg(feature = "processing")]
1120            global_rate_limits: None,
1121        }
1122    }
1123}
1124
/// Shared state behind [`EnvelopeProcessorService`].
struct InnerProcessor {
    // Worker pool on which processing tasks are spawned.
    pool: EnvelopeProcessorServicePool,
    // Static Relay configuration.
    config: Arc<Config>,
    // Handle to the current global configuration.
    global_config: GlobalConfigHandle,
    // Handle to cached per-project state and rate limits.
    project_cache: ProjectCacheHandle,
    // COGS (cost attribution) measurement sink.
    cogs: Cogs,
    // Redis client used for quotas; also reused for reservoir sampling counters.
    #[cfg(feature = "processing")]
    quotas_client: Option<AsyncRedisClient>,
    // Addresses of downstream services this processor publishes to.
    addrs: Addrs,
    // Redis-backed rate limiter; `None` when Redis is not configured.
    #[cfg(feature = "processing")]
    rate_limiter: Option<Arc<RedisRateLimiter<GlobalRateLimitsServiceHandle>>>,
    // GeoIP database handle; an empty lookup when no database is configured.
    geoip_lookup: GeoIpLookup,
    // Redis-backed cardinality limiter; `None` when Redis is not configured.
    #[cfg(feature = "processing")]
    cardinality_limiter: Option<CardinalityLimiter>,
    // Outcome tracking for metrics.
    metric_outcomes: MetricOutcomes,
    // Type-specific processors for the new processing pipeline.
    processing: Processing,
}
1142
/// Bundles the per-item-type processors of the processing pipeline.
struct Processing {
    // Processor for log items.
    logs: LogsProcessor,
    // Processor for trace metric items.
    trace_metrics: TraceMetricsProcessor,
    // Processor for span items.
    spans: SpansProcessor,
    // Processor for check-in (cron monitor) items.
    check_ins: CheckInsProcessor,
    // Processor for session items.
    sessions: SessionsProcessor,
}
1150
1151impl EnvelopeProcessorService {
    /// Creates a multi-threaded envelope processor.
    ///
    /// Opens the GeoIP database if configured (logging and falling back to an
    /// empty lookup on failure), wires up the optional Redis-backed rate and
    /// cardinality limiters (processing feature only), and constructs the
    /// per-item-type processors sharing a single quota rate limiter.
    #[cfg_attr(feature = "processing", expect(clippy::too_many_arguments))]
    pub fn new(
        pool: EnvelopeProcessorServicePool,
        config: Arc<Config>,
        global_config: GlobalConfigHandle,
        project_cache: ProjectCacheHandle,
        cogs: Cogs,
        #[cfg(feature = "processing")] redis: Option<RedisClients>,
        addrs: Addrs,
        metric_outcomes: MetricOutcomes,
    ) -> Self {
        // A failed GeoIP open is not fatal: log it and run without geo enrichment.
        let geoip_lookup = config
            .geoip_path()
            .and_then(
                |p| match GeoIpLookup::open(p).context(ServiceError::GeoIp) {
                    Ok(geoip) => Some(geoip),
                    Err(err) => {
                        relay_log::error!("failed to open GeoIP db {p:?}: {err:?}");
                        None
                    }
                },
            )
            .unwrap_or_else(GeoIpLookup::empty);

        // Split the Redis clients into the pieces used below; both are absent
        // when Redis is not configured.
        #[cfg(feature = "processing")]
        let (cardinality, quotas) = match redis {
            Some(RedisClients {
                cardinality,
                quotas,
                ..
            }) => (Some(cardinality), Some(quotas)),
            None => (None, None),
        };

        #[cfg(feature = "processing")]
        let global_rate_limits = addrs.global_rate_limits.clone().map(Into::into);

        // The consistent rate limiter requires both Redis quotas and the
        // global rate limiting service to be available.
        #[cfg(feature = "processing")]
        let rate_limiter = match (quotas.clone(), global_rate_limits) {
            (Some(redis), Some(global)) => {
                Some(RedisRateLimiter::new(redis, global).max_limit(config.max_rate_limit()))
            }
            _ => None,
        };

        // Shared by all per-item-type processors below.
        let quota_limiter = Arc::new(QuotaRateLimiter::new(
            #[cfg(feature = "processing")]
            project_cache.clone(),
            #[cfg(feature = "processing")]
            rate_limiter.clone(),
        ));
        #[cfg(feature = "processing")]
        let rate_limiter = rate_limiter.map(Arc::new);

        let inner = InnerProcessor {
            pool,
            global_config,
            project_cache,
            cogs,
            #[cfg(feature = "processing")]
            quotas_client: quotas.clone(),
            #[cfg(feature = "processing")]
            rate_limiter,
            addrs,
            #[cfg(feature = "processing")]
            cardinality_limiter: cardinality
                .map(|cardinality| {
                    RedisSetLimiter::new(
                        RedisSetLimiterOptions {
                            cache_vacuum_interval: config
                                .cardinality_limiter_cache_vacuum_interval(),
                        },
                        cardinality,
                    )
                })
                .map(CardinalityLimiter::new),
            metric_outcomes,
            processing: Processing {
                logs: LogsProcessor::new(Arc::clone(&quota_limiter)),
                trace_metrics: TraceMetricsProcessor::new(Arc::clone(&quota_limiter)),
                spans: SpansProcessor::new(Arc::clone(&quota_limiter), geoip_lookup.clone()),
                check_ins: CheckInsProcessor::new(Arc::clone(&quota_limiter)),
                sessions: SessionsProcessor::new(quota_limiter),
            },
            geoip_lookup,
            config,
        };

        Self {
            inner: Arc::new(inner),
        }
    }
1245
    /// Enforces rate limits and quotas on the envelope and the given event.
    ///
    /// Runs the cached (fast) rate limiter first, then — in processing mode
    /// with Redis available — the consistent Redis-backed limiter. Newly
    /// computed limits are merged back into the project's cached limits.
    ///
    /// Returns the (possibly dropped/emptied) event on success.
    async fn enforce_quotas<Group>(
        &self,
        managed_envelope: &mut TypedEnvelope<Group>,
        event: Annotated<Event>,
        extracted_metrics: &mut ProcessingExtractedMetrics,
        ctx: processing::Context<'_>,
    ) -> Result<Annotated<Event>, ProcessingError> {
        // Cached quotas first, they are quick to evaluate and some quotas (indexed) are not
        // applied in the fast path, all cached quotas can be applied here.
        let cached_result = RateLimiter::Cached
            .enforce(managed_envelope, event, extracted_metrics, ctx)
            .await?;

        if_processing!(self.inner.config, {
            // Without a Redis rate limiter there is nothing more to enforce.
            let rate_limiter = match self.inner.rate_limiter.clone() {
                Some(rate_limiter) => rate_limiter,
                None => return Ok(cached_result.event),
            };

            // Enforce all quotas consistently with Redis.
            let consistent_result = RateLimiter::Consistent(rate_limiter)
                .enforce(
                    managed_envelope,
                    cached_result.event,
                    extracted_metrics,
                    ctx
                )
                .await?;

            // Update cached rate limits with the freshly computed ones.
            if !consistent_result.rate_limits.is_empty() {
                self.inner
                    .project_cache
                    .get(managed_envelope.scoping().project_key)
                    .rate_limits()
                    .merge(consistent_result.rate_limits);
            }

            Ok(consistent_result.event)
        } else { Ok(cached_result.event) })
    }
1287
    /// Processes the general errors, and the items which require or create the events.
    ///
    /// Pipeline (order matters): user reports, payload expansion (unreal,
    /// playstation, nnswitch — processing mode only), event extraction,
    /// DSC validation, finalization, normalization, inbound filters,
    /// sampling-decision tagging, quota enforcement, PII scrubbing and
    /// serialization.
    ///
    /// Returns the metrics extracted during processing, or an error if any
    /// stage rejects the envelope.
    async fn process_errors(
        &self,
        managed_envelope: &mut TypedEnvelope<ErrorGroup>,
        project_id: ProjectId,
        mut ctx: processing::Context<'_>,
    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
        let mut event_fully_normalized = EventFullyNormalized::new(managed_envelope.envelope());
        let mut metrics = Metrics::default();
        let mut extracted_metrics = ProcessingExtractedMetrics::new();

        // Events can also contain user reports.
        report::process_user_reports(managed_envelope);

        // Expand bundled crash-report formats into standard items
        // (processing mode only).
        if_processing!(self.inner.config, {
            unreal::expand(managed_envelope, &self.inner.config)?;
            #[cfg(sentry)]
            playstation::expand(managed_envelope, &self.inner.config, ctx.project_info)?;
            nnswitch::expand(managed_envelope)?;
        });

        // Pull the event payload out of the envelope items.
        let extraction_result = event::extract(
            managed_envelope,
            &mut metrics,
            event_fully_normalized,
            &self.inner.config,
        )?;
        let mut event = extraction_result.event;

        // Merge crash-report data into the event (processing mode only).
        if_processing!(self.inner.config, {
            if let Some(inner_event_fully_normalized) =
                unreal::process(managed_envelope, &mut event)?
            {
                event_fully_normalized = inner_event_fully_normalized;
            }
            #[cfg(sentry)]
            if let Some(inner_event_fully_normalized) =
                playstation::process(managed_envelope, &mut event, ctx.project_info)?
            {
                event_fully_normalized = inner_event_fully_normalized;
            }
            if let Some(inner_event_fully_normalized) =
                attachment::create_placeholders(managed_envelope, &mut event, &mut metrics)
            {
                event_fully_normalized = inner_event_fully_normalized;
            }
        });

        // Validate the dynamic sampling context and keep only a valid
        // sampling project.
        ctx.sampling_project_info = dynamic_sampling::validate_and_set_dsc(
            managed_envelope,
            &mut event,
            ctx.project_info,
            ctx.sampling_project_info,
        );

        let attachments = managed_envelope
            .envelope()
            .items()
            .filter(|item| item.attachment_type() == Some(&AttachmentType::Attachment));
        processing::utils::event::finalize(
            managed_envelope.envelope().headers(),
            &mut event,
            attachments,
            &mut metrics,
            &self.inner.config,
        )?;
        event_fully_normalized = processing::utils::event::normalize(
            managed_envelope.envelope().headers(),
            &mut event,
            event_fully_normalized,
            &ctx,
            &self.inner.geoip_lookup,
        )?;
        // Inbound filters: a filtered event rejects the whole envelope.
        let filter_run = processing::utils::event::filter(
            managed_envelope.envelope().headers(),
            &mut event,
            &ctx,
        )
        .map_err(|err| {
            managed_envelope.reject(Outcome::Filtered(err.clone()));
            ProcessingError::EventFiltered(err)
        })?;

        // Only tag with a sampling decision once filters have fully run
        // (always the case on processing Relays).
        if self.inner.config.processing_enabled() || matches!(filter_run, FiltersStatus::Ok) {
            dynamic_sampling::tag_error_with_sampling_decision(
                managed_envelope,
                &mut event,
                ctx.sampling_project_info,
                &self.inner.config,
            )
            .await;
        }

        event = self
            .enforce_quotas(managed_envelope, event, &mut extracted_metrics, ctx)
            .await?;

        // The event may have been dropped by quotas; only scrub and
        // serialize if it survived.
        if event.value().is_some() {
            event::scrub(&mut event, ctx.project_info)?;
            event::serialize(
                managed_envelope,
                &mut event,
                event_fully_normalized,
                EventMetricsExtracted(false),
                SpansExtracted(false),
            )?;
            event::emit_feedback_metrics(managed_envelope.envelope());
        }

        attachment::scrub(managed_envelope, ctx.project_info);

        // On processing Relays every ingested event must be normalized;
        // anything else indicates a bug upstream.
        if self.inner.config.processing_enabled() && !event_fully_normalized.0 {
            relay_log::error!(
                tags.project = %project_id,
                tags.ty = event_type(&event).map(|e| e.to_string()).unwrap_or("none".to_owned()),
                "ingested event without normalizing"
            );
        }

        Ok(Some(extracted_metrics))
    }
1409
    /// Processes only transactions and transaction-related items.
    ///
    /// Pipeline (order matters): event extraction, profile filtering, DSC
    /// validation, finalization, normalization, inbound filters, dynamic
    /// sampling, metric extraction, span extraction (processing mode), quota
    /// enforcement, PII scrubbing and serialization. When the sampling
    /// decision is "drop", metrics are still extracted and profiles are
    /// processed before the transaction is discarded.
    ///
    /// Returns the metrics extracted during processing.
    #[allow(unused_assignments)]
    #[allow(clippy::too_many_arguments)]
    async fn process_transactions(
        &self,
        managed_envelope: &mut TypedEnvelope<TransactionGroup>,
        cogs: &mut Token,
        project_id: ProjectId,
        mut ctx: processing::Context<'_>,
        reservoir_counters: &ReservoirCounters,
    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
        let mut event_fully_normalized = EventFullyNormalized::new(managed_envelope.envelope());
        let mut event_metrics_extracted = EventMetricsExtracted(false);
        let mut spans_extracted = SpansExtracted(false);
        let mut metrics = Metrics::default();
        let mut extracted_metrics = ProcessingExtractedMetrics::new();

        // We extract the main event from the envelope.
        let extraction_result = event::extract(
            managed_envelope,
            &mut metrics,
            event_fully_normalized,
            &self.inner.config,
        )?;

        // If metrics were extracted we mark that.
        if let Some(inner_event_metrics_extracted) = extraction_result.event_metrics_extracted {
            event_metrics_extracted = inner_event_metrics_extracted;
        }
        if let Some(inner_spans_extracted) = extraction_result.spans_extracted {
            spans_extracted = inner_spans_extracted;
        };

        // We take the main event out of the result.
        let mut event = extraction_result.event;

        // Filter profiles early and link the surviving profile to the event.
        let profile_id = profile::filter(
            managed_envelope,
            &event,
            ctx.config,
            project_id,
            ctx.project_info,
        );
        profile::transfer_id(&mut event, profile_id);

        // Validate the dynamic sampling context and keep only a valid
        // sampling project.
        ctx.sampling_project_info = dynamic_sampling::validate_and_set_dsc(
            managed_envelope,
            &mut event,
            ctx.project_info,
            ctx.sampling_project_info,
        );

        let attachments = managed_envelope
            .envelope()
            .items()
            .filter(|item| item.attachment_type() == Some(&AttachmentType::Attachment));
        processing::utils::event::finalize(
            managed_envelope.envelope().headers(),
            &mut event,
            attachments,
            &mut metrics,
            &self.inner.config,
        )?;

        event_fully_normalized = processing::utils::event::normalize(
            managed_envelope.envelope().headers(),
            &mut event,
            event_fully_normalized,
            &ctx,
            &self.inner.geoip_lookup,
        )?;

        // Inbound filters: a filtered event rejects the whole envelope.
        let filter_run = processing::utils::event::filter(
            managed_envelope.envelope().headers(),
            &mut event,
            &ctx,
        )
        .map_err(|err| {
            managed_envelope.reject(Outcome::Filtered(err.clone()));
            ProcessingError::EventFiltered(err)
        })?;

        // Always run dynamic sampling on processing Relays,
        // but delay decision until inbound filters have been fully processed.
        // Also, we require transaction metrics to be enabled before sampling.
        let run_dynamic_sampling = (matches!(filter_run, FiltersStatus::Ok)
            || self.inner.config.processing_enabled())
            && matches!(&ctx.project_info.config.transaction_metrics, Some(ErrorBoundary::Ok(c)) if c.is_enabled());

        let sampling_result = match run_dynamic_sampling {
            true => {
                #[allow(unused_mut)]
                let mut reservoir = ReservoirEvaluator::new(Arc::clone(reservoir_counters));
                #[cfg(feature = "processing")]
                if let Some(quotas_client) = self.inner.quotas_client.as_ref() {
                    reservoir.set_redis(managed_envelope.scoping().organization_id, quotas_client);
                }
                processing::utils::dynamic_sampling::run(
                    managed_envelope.envelope().headers().dsc(),
                    &mut event,
                    &ctx,
                    Some(&reservoir),
                )
                .await
            }
            false => SamplingResult::Pending,
        };

        relay_statsd::metric!(
            counter(RelayCounters::SamplingDecision) += 1,
            decision = sampling_result.decision().as_str(),
            item = "transaction"
        );

        #[cfg(feature = "processing")]
        let server_sample_rate = sampling_result.sample_rate();

        // Drop path: the transaction was not sampled.
        if let Some(outcome) = sampling_result.into_dropped_outcome() {
            // Process profiles before dropping the transaction, if necessary.
            // Before metric extraction to make sure the profile count is reflected correctly.
            profile::process(
                managed_envelope,
                &mut event,
                ctx.global_config,
                ctx.config,
                ctx.project_info,
            );
            // Extract metrics here, we're about to drop the event/transaction.
            event_metrics_extracted = processing::utils::transaction::extract_metrics(
                &mut event,
                &mut extracted_metrics,
                ExtractMetricsContext {
                    dsc: managed_envelope.envelope().dsc(),
                    project_id,
                    ctx: &ctx,
                    sampling_decision: SamplingDecision::Drop,
                    event_metrics_extracted,
                    spans_extracted,
                },
            )?;

            dynamic_sampling::drop_unsampled_items(
                managed_envelope,
                event,
                outcome,
                spans_extracted,
            );

            // At this point we have:
            //  - An empty envelope.
            //  - An envelope containing only processed profiles.
            // We need to make sure there are enough quotas for these profiles.
            event = self
                .enforce_quotas(
                    managed_envelope,
                    Annotated::empty(),
                    &mut extracted_metrics,
                    ctx,
                )
                .await?;

            return Ok(Some(extracted_metrics));
        }

        // Keep path: account remaining work to the post-dynamic-sampling
        // COGS category.
        let _post_ds = cogs.start_category("post_ds");

        // Need to scrub the transaction before extracting spans.
        //
        // Unconditionally scrub to make sure PII is removed as early as possible.
        event::scrub(&mut event, ctx.project_info)?;

        attachment::scrub(managed_envelope, ctx.project_info);

        if_processing!(self.inner.config, {
            // Process profiles before extracting metrics, to make sure they are removed if they are invalid.
            let profile_id = profile::process(
                managed_envelope,
                &mut event,
                ctx.global_config,
                ctx.config,
                ctx.project_info,
            );
            profile::transfer_id(&mut event, profile_id);
            profile::scrub_profiler_id(&mut event);

            // Always extract metrics in processing Relays for sampled items.
            event_metrics_extracted = processing::utils::transaction::extract_metrics(
                &mut event,
                &mut extracted_metrics,
                ExtractMetricsContext {
                    dsc: managed_envelope.envelope().dsc(),
                    project_id,
                    ctx: &ctx,
                    sampling_decision: SamplingDecision::Keep,
                    event_metrics_extracted,
                    spans_extracted,
                },
            )?;

            spans_extracted = span::extract_from_event(
                managed_envelope,
                &event,
                ctx.global_config,
                ctx.config,
                server_sample_rate,
                event_metrics_extracted,
                spans_extracted,
            );
        });

        event = self
            .enforce_quotas(managed_envelope, event, &mut extracted_metrics, ctx)
            .await?;

        if_processing!(self.inner.config, {
            event = span::maybe_discard_transaction(managed_envelope, event, ctx.project_info);
        });

        // Event may have been dropped because of a quota and the envelope can be empty.
        if event.value().is_some() {
            event::serialize(
                managed_envelope,
                &mut event,
                event_fully_normalized,
                event_metrics_extracted,
                spans_extracted,
            )?;
        }

        // On processing Relays every ingested event must be normalized;
        // anything else indicates a bug upstream.
        if self.inner.config.processing_enabled() && !event_fully_normalized.0 {
            relay_log::error!(
                tags.project = %project_id,
                tags.ty = event_type(&event).map(|e| e.to_string()).unwrap_or("none".to_owned()),
                "ingested event without normalizing"
            );
        };

        Ok(Some(extracted_metrics))
    }
1649
1650    async fn process_profile_chunks(
1651        &self,
1652        managed_envelope: &mut TypedEnvelope<ProfileChunkGroup>,
1653        ctx: processing::Context<'_>,
1654    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
1655        profile_chunk::filter(managed_envelope, ctx.project_info);
1656
1657        if_processing!(self.inner.config, {
1658            profile_chunk::process(
1659                managed_envelope,
1660                ctx.project_info,
1661                ctx.global_config,
1662                ctx.config,
1663            );
1664        });
1665
1666        self.enforce_quotas(
1667            managed_envelope,
1668            Annotated::empty(),
1669            &mut ProcessingExtractedMetrics::new(),
1670            ctx,
1671        )
1672        .await?;
1673
1674        Ok(None)
1675    }
1676
1677    /// Processes standalone items that require an event ID, but do not have an event on the same envelope.
1678    async fn process_standalone(
1679        &self,
1680        managed_envelope: &mut TypedEnvelope<StandaloneGroup>,
1681        project_id: ProjectId,
1682        ctx: processing::Context<'_>,
1683    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
1684        let mut extracted_metrics = ProcessingExtractedMetrics::new();
1685
1686        standalone::process(managed_envelope);
1687
1688        profile::filter(
1689            managed_envelope,
1690            &Annotated::empty(),
1691            ctx.config,
1692            project_id,
1693            ctx.project_info,
1694        );
1695
1696        self.enforce_quotas(
1697            managed_envelope,
1698            Annotated::empty(),
1699            &mut extracted_metrics,
1700            ctx,
1701        )
1702        .await?;
1703
1704        report::process_user_reports(managed_envelope);
1705        attachment::scrub(managed_envelope, ctx.project_info);
1706
1707        Ok(Some(extracted_metrics))
1708    }
1709
1710    /// Processes user and client reports.
1711    async fn process_client_reports(
1712        &self,
1713        managed_envelope: &mut TypedEnvelope<ClientReportGroup>,
1714        ctx: processing::Context<'_>,
1715    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
1716        let mut extracted_metrics = ProcessingExtractedMetrics::new();
1717
1718        self.enforce_quotas(
1719            managed_envelope,
1720            Annotated::empty(),
1721            &mut extracted_metrics,
1722            ctx,
1723        )
1724        .await?;
1725
1726        report::process_client_reports(
1727            managed_envelope,
1728            ctx.config,
1729            ctx.project_info,
1730            self.inner.addrs.outcome_aggregator.clone(),
1731        );
1732
1733        Ok(Some(extracted_metrics))
1734    }
1735
1736    /// Processes replays.
1737    async fn process_replays(
1738        &self,
1739        managed_envelope: &mut TypedEnvelope<ReplayGroup>,
1740        ctx: processing::Context<'_>,
1741    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
1742        let mut extracted_metrics = ProcessingExtractedMetrics::new();
1743
1744        replay::process(
1745            managed_envelope,
1746            ctx.global_config,
1747            ctx.config,
1748            ctx.project_info,
1749            &self.inner.geoip_lookup,
1750        )?;
1751
1752        self.enforce_quotas(
1753            managed_envelope,
1754            Annotated::empty(),
1755            &mut extracted_metrics,
1756            ctx,
1757        )
1758        .await?;
1759
1760        Ok(Some(extracted_metrics))
1761    }
1762
1763    async fn process_nel(
1764        &self,
1765        mut managed_envelope: ManagedEnvelope,
1766        ctx: processing::Context<'_>,
1767    ) -> Result<ProcessingResult, ProcessingError> {
1768        nel::convert_to_logs(&mut managed_envelope);
1769        self.process_with_processor(&self.inner.processing.logs, managed_envelope, ctx)
1770            .await
1771    }
1772
1773    async fn process_with_processor<P: processing::Processor>(
1774        &self,
1775        processor: &P,
1776        mut managed_envelope: ManagedEnvelope,
1777        ctx: processing::Context<'_>,
1778    ) -> Result<ProcessingResult, ProcessingError>
1779    where
1780        Outputs: From<P::Output>,
1781    {
1782        let Some(work) = processor.prepare_envelope(&mut managed_envelope) else {
1783            debug_assert!(
1784                false,
1785                "there must be work for the {} processor",
1786                std::any::type_name::<P>(),
1787            );
1788            return Err(ProcessingError::ProcessingGroupMismatch);
1789        };
1790
1791        managed_envelope.update();
1792        match managed_envelope.envelope().is_empty() {
1793            true => managed_envelope.accept(),
1794            false => managed_envelope.reject(Outcome::Invalid(DiscardReason::Internal)),
1795        }
1796
1797        processor
1798            .process(work, ctx)
1799            .await
1800            .map_err(|err| {
1801                relay_log::debug!(
1802                    error = &err as &dyn std::error::Error,
1803                    "processing pipeline failed"
1804                );
1805                ProcessingError::ProcessingFailure
1806            })
1807            .map(|o| o.map(Into::into))
1808            .map(ProcessingResult::Output)
1809    }
1810
1811    /// Processes standalone spans.
1812    ///
1813    /// This function does *not* run for spans extracted from transactions.
1814    async fn process_standalone_spans(
1815        &self,
1816        managed_envelope: &mut TypedEnvelope<SpanGroup>,
1817        _project_id: ProjectId,
1818        ctx: processing::Context<'_>,
1819    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
1820        let mut extracted_metrics = ProcessingExtractedMetrics::new();
1821
1822        span::filter(managed_envelope, ctx.config, ctx.project_info);
1823        span::convert_otel_traces_data(managed_envelope);
1824
1825        if_processing!(self.inner.config, {
1826            span::process(
1827                managed_envelope,
1828                &mut Annotated::empty(),
1829                &mut extracted_metrics,
1830                _project_id,
1831                ctx,
1832                &self.inner.geoip_lookup,
1833            )
1834            .await;
1835        });
1836
1837        self.enforce_quotas(
1838            managed_envelope,
1839            Annotated::empty(),
1840            &mut extracted_metrics,
1841            ctx,
1842        )
1843        .await?;
1844
1845        Ok(Some(extracted_metrics))
1846    }
1847
    /// Processes a single envelope that has already been assigned a [`ProcessingGroup`].
    ///
    /// First normalizes envelope headers (DSC transaction parametrization, retention,
    /// project ID), then dispatches to the group-specific processing routine. Returns
    /// either a processed envelope together with extracted metrics, or a processor
    /// output for the newer pipeline-based groups.
    async fn process_envelope(
        &self,
        cogs: &mut Token,
        project_id: ProjectId,
        message: ProcessEnvelopeGrouped<'_>,
    ) -> Result<ProcessingResult, ProcessingError> {
        let ProcessEnvelopeGrouped {
            group,
            envelope: mut managed_envelope,
            ctx,
            reservoir_counters,
        } = message;

        // Pre-process the envelope headers.
        if let Some(sampling_state) = ctx.sampling_project_info {
            // Both transactions and standalone span envelopes need a normalized DSC header
            // to make sampling rules based on the segment/transaction name work correctly.
            managed_envelope
                .envelope_mut()
                .parametrize_dsc_transaction(&sampling_state.config.tx_name_rules);
        }

        // Set the event retention. Effectively, this value will only be available in processing
        // mode when the full project config is queried from the upstream.
        if let Some(retention) = ctx.project_info.config.event_retention {
            managed_envelope.envelope_mut().set_retention(retention);
        }

        // Set the downsampled event retention analogously. Like above, this value will only be
        // available in processing mode when the full project config is queried from the upstream.
        if let Some(retention) = ctx.project_info.config.downsampled_event_retention {
            managed_envelope
                .envelope_mut()
                .set_downsampled_retention(retention);
        }

        // Ensure the project ID is updated to the stored instance for this project cache. This can
        // differ in two cases:
        //  1. The envelope was sent to the legacy `/store/` endpoint without a project ID.
        //  2. The DSN was moved and the envelope sent to the old project ID.
        managed_envelope
            .envelope_mut()
            .meta_mut()
            .set_project_id(project_id);

        // Invokes the given `self` method on a `TypedEnvelope` converted for the current
        // group and wraps its extracted metrics into a `ProcessingResult::Envelope`. On
        // error, the envelope is rejected with the error's outcome (if any) before the
        // error is propagated.
        macro_rules! run {
            ($fn_name:ident $(, $args:expr)*) => {
                async {
                    let mut managed_envelope = (managed_envelope, group).try_into()?;
                    match self.$fn_name(&mut managed_envelope, $($args),*).await {
                        Ok(extracted_metrics) => Ok(ProcessingResult::Envelope {
                            managed_envelope: managed_envelope.into_processed(),
                            extracted_metrics: extracted_metrics.map_or(ProcessingExtractedMetrics::new(), |e| e)
                        }),
                        Err(error) => {
                            relay_log::trace!("Executing {fn} failed: {error}", fn = stringify!($fn_name), error = error);
                            if let Some(outcome) = error.to_outcome() {
                                managed_envelope.reject(outcome);
                            }

                            return Err(error);
                        }
                    }
                }.await
            };
        }

        relay_log::trace!("Processing {group} group", group = group.variant());

        match group {
            ProcessingGroup::Error => run!(process_errors, project_id, ctx),
            ProcessingGroup::Transaction => {
                run!(
                    process_transactions,
                    cogs,
                    project_id,
                    ctx,
                    reservoir_counters
                )
            }
            ProcessingGroup::Session => {
                self.process_with_processor(&self.inner.processing.sessions, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::Standalone => run!(process_standalone, project_id, ctx),
            ProcessingGroup::ClientReport => run!(process_client_reports, ctx),
            ProcessingGroup::Replay => {
                run!(process_replays, ctx)
            }
            ProcessingGroup::CheckIn => {
                self.process_with_processor(&self.inner.processing.check_ins, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::Nel => self.process_nel(managed_envelope, ctx).await,
            ProcessingGroup::Log => {
                self.process_with_processor(&self.inner.processing.logs, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::TraceMetric => {
                self.process_with_processor(
                    &self.inner.processing.trace_metrics,
                    managed_envelope,
                    ctx,
                )
                .await
            }
            ProcessingGroup::SpanV2 => {
                self.process_with_processor(&self.inner.processing.spans, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::Span => run!(process_standalone_spans, project_id, ctx),
            ProcessingGroup::ProfileChunk => {
                run!(process_profile_chunks, ctx)
            }
            // This group is only expected in proxy mode; it performs no processing.
            ProcessingGroup::Metrics => {
                // In proxy mode we simply forward the metrics.
                // This group shouldn't be used outside of proxy mode.
                if self.inner.config.relay_mode() != RelayMode::Proxy {
                    relay_log::error!(
                        tags.project = %project_id,
                        items = ?managed_envelope.envelope().items().next().map(Item::ty),
                        "received metrics in the process_state"
                    );
                }

                Ok(ProcessingResult::no_metrics(
                    managed_envelope.into_processed(),
                ))
            }
            // Fallback to the legacy process_state implementation for Ungrouped events.
            ProcessingGroup::Ungrouped => {
                relay_log::error!(
                    tags.project = %project_id,
                    items = ?managed_envelope.envelope().items().next().map(Item::ty),
                    "could not identify the processing group based on the envelope's items"
                );

                Ok(ProcessingResult::no_metrics(
                    managed_envelope.into_processed(),
                ))
            }
            // Leave this group unchanged.
            //
            // This will later be forwarded to upstream.
            ProcessingGroup::ForwardUnknown => Ok(ProcessingResult::no_metrics(
                managed_envelope.into_processed(),
            )),
        }
    }
1998
    /// Processes the envelope and returns the processed envelope back.
    ///
    /// Returns `Some` if the envelope passed inbound filtering and rate limiting. Invalid items are
    /// removed from the envelope. Otherwise, if the envelope is empty or the entire envelope needs
    /// to be dropped, this is `None`.
    async fn process<'a>(
        &self,
        cogs: &mut Token,
        mut message: ProcessEnvelopeGrouped<'a>,
    ) -> Result<Option<Submit<'a>>, ProcessingError> {
        let ProcessEnvelopeGrouped {
            ref mut envelope,
            ctx,
            ..
        } = message;

        // Prefer the project's project ID, and fall back to the stated project id from the
        // envelope. The project ID is available in all modes, other than in proxy mode, where
        // envelopes for unknown projects are forwarded blindly.
        //
        // Neither ID can be available in proxy mode on the /store/ endpoint. This is not supported,
        // since we cannot process an envelope without project ID, so drop it.
        let Some(project_id) = ctx
            .project_info
            .project_id
            .or_else(|| envelope.envelope().meta().project_id())
        else {
            envelope.reject(Outcome::Invalid(DiscardReason::Internal));
            return Err(ProcessingError::MissingProjectId);
        };

        // Capture request metadata up front, before `message` is moved into
        // `process_envelope` below.
        let client = envelope.envelope().meta().client().map(str::to_owned);
        let user_agent = envelope.envelope().meta().user_agent().map(str::to_owned);
        let project_key = envelope.envelope().meta().public_key();
        // Only allow sending to the sampling key, if we successfully loaded a sampling project
        // info relating to it. This filters out unknown/invalid project keys as well as project
        // keys from different organizations.
        let sampling_key = envelope
            .envelope()
            .sampling_key()
            .filter(|_| ctx.sampling_project_info.is_some());

        // We set additional information on the scope, which will be removed after processing the
        // envelope.
        relay_log::configure_scope(|scope| {
            scope.set_tag("project", project_id);
            if let Some(client) = client {
                scope.set_tag("sdk", client);
            }
            if let Some(user_agent) = user_agent {
                scope.set_extra("user_agent", user_agent.into());
            }
        });

        let result = match self.process_envelope(cogs, project_id, message).await {
            Ok(ProcessingResult::Envelope {
                mut managed_envelope,
                extracted_metrics,
            }) => {
                // The envelope could be modified or even emptied during processing, which
                // requires re-computation of the context.
                managed_envelope.update();

                // Metrics are forwarded to the aggregator regardless of whether the
                // envelope itself survives.
                let has_metrics = !extracted_metrics.metrics.project_metrics.is_empty();
                send_metrics(
                    extracted_metrics.metrics,
                    project_key,
                    sampling_key,
                    &self.inner.addrs.aggregator,
                );

                let envelope_response = if managed_envelope.envelope().is_empty() {
                    if !has_metrics {
                        // Individual rate limits have already been issued
                        managed_envelope.reject(Outcome::RateLimited(None));
                    } else {
                        managed_envelope.accept();
                    }

                    None
                } else {
                    Some(managed_envelope)
                };

                Ok(envelope_response.map(Submit::Envelope))
            }
            Ok(ProcessingResult::Output(Output { main, metrics })) => {
                if let Some(metrics) = metrics {
                    metrics.accept(|metrics| {
                        send_metrics(
                            metrics,
                            project_key,
                            sampling_key,
                            &self.inner.addrs.aggregator,
                        );
                    });
                }

                let ctx = ctx.to_forward();
                Ok(main.map(|output| Submit::Output { output, ctx }))
            }
            Err(err) => Err(err),
        };

        // Remove the scope decorations that were added above.
        relay_log::configure_scope(|scope| {
            scope.remove_tag("project");
            scope.remove_tag("sdk");
            scope.remove_tag("user_agent");
        });

        result
    }
2111
    /// Handles a [`ProcessEnvelope`] message.
    ///
    /// Splits the incoming envelope by [`ProcessingGroup`] and processes each resulting
    /// sub-envelope independently: successful results are submitted upstream, failures
    /// are logged (at error level only if the error is unexpected).
    async fn handle_process_envelope(&self, cogs: &mut Token, message: ProcessEnvelope) {
        let project_key = message.envelope.envelope().meta().public_key();
        let wait_time = message.envelope.age();
        metric!(timer(RelayTimers::EnvelopeWaitTime) = wait_time);

        // This COGS handling may need an overhaul in the future:
        // Cancel the passed in token, to start individual measurements per envelope instead.
        cogs.cancel();

        let scoping = message.envelope.scoping();
        for (group, envelope) in ProcessingGroup::split_envelope(
            *message.envelope.into_envelope(),
            &message.project_info,
        ) {
            // Start a fresh COGS measurement per split envelope, attributed to the
            // group's app feature.
            let mut cogs = self
                .inner
                .cogs
                .timed(ResourceId::Relay, AppFeature::from(group));

            let mut envelope =
                ManagedEnvelope::new(envelope, self.inner.addrs.outcome_aggregator.clone());
            envelope.scope(scoping);

            let global_config = self.inner.global_config.current();

            let ctx = processing::Context {
                config: &self.inner.config,
                global_config: &global_config,
                project_info: &message.project_info,
                sampling_project_info: message.sampling_project_info.as_deref(),
                rate_limits: &message.rate_limits,
            };

            let message = ProcessEnvelopeGrouped {
                group,
                envelope,
                ctx,
                reservoir_counters: &message.reservoir_counters,
            };

            let result = metric!(
                timer(RelayTimers::EnvelopeProcessingTime),
                group = group.variant(),
                { self.process(&mut cogs, message).await }
            );

            match result {
                Ok(Some(envelope)) => self.submit_upstream(&mut cogs, envelope),
                Ok(None) => {}
                // Unexpected errors indicate an internal problem and are surfaced loudly.
                Err(error) if error.is_unexpected() => {
                    relay_log::error!(
                        tags.project_key = %project_key,
                        error = &error as &dyn Error,
                        "error processing envelope"
                    )
                }
                // Expected errors (e.g. filtered or rate limited envelopes) are only
                // logged for debugging.
                Err(error) => {
                    relay_log::debug!(
                        tags.project_key = %project_key,
                        error = &error as &dyn Error,
                        "error processing envelope"
                    )
                }
            }
        }
    }
2178
2179    fn handle_process_metrics(&self, cogs: &mut Token, message: ProcessMetrics) {
2180        let ProcessMetrics {
2181            data,
2182            project_key,
2183            received_at,
2184            sent_at,
2185            source,
2186        } = message;
2187
2188        let received_timestamp =
2189            UnixTimestamp::from_datetime(received_at).unwrap_or(UnixTimestamp::now());
2190
2191        let mut buckets = data.into_buckets(received_timestamp);
2192        if buckets.is_empty() {
2193            return;
2194        };
2195        cogs.update(relay_metrics::cogs::BySize(&buckets));
2196
2197        let clock_drift_processor =
2198            ClockDriftProcessor::new(sent_at, received_at).at_least(MINIMUM_CLOCK_DRIFT);
2199
2200        buckets.retain_mut(|bucket| {
2201            if let Err(error) = relay_metrics::normalize_bucket(bucket) {
2202                relay_log::debug!(error = &error as &dyn Error, "dropping bucket {bucket:?}");
2203                return false;
2204            }
2205
2206            if !self::metrics::is_valid_namespace(bucket) {
2207                return false;
2208            }
2209
2210            clock_drift_processor.process_timestamp(&mut bucket.timestamp);
2211
2212            if !matches!(source, BucketSource::Internal) {
2213                bucket.metadata = BucketMetadata::new(received_timestamp);
2214            }
2215
2216            true
2217        });
2218
2219        let project = self.inner.project_cache.get(project_key);
2220
2221        // Best effort check to filter and rate limit buckets, if there is no project state
2222        // available at the current time, we will check again after flushing.
2223        let buckets = match project.state() {
2224            ProjectState::Enabled(project_info) => {
2225                let rate_limits = project.rate_limits().current_limits();
2226                self.check_buckets(project_key, project_info, &rate_limits, buckets)
2227            }
2228            _ => buckets,
2229        };
2230
2231        relay_log::trace!("merging metric buckets into the aggregator");
2232        self.inner
2233            .addrs
2234            .aggregator
2235            .send(MergeBuckets::new(project_key, buckets));
2236    }
2237
2238    fn handle_process_batched_metrics(&self, cogs: &mut Token, message: ProcessBatchedMetrics) {
2239        let ProcessBatchedMetrics {
2240            payload,
2241            source,
2242            received_at,
2243            sent_at,
2244        } = message;
2245
2246        #[derive(serde::Deserialize)]
2247        struct Wrapper {
2248            buckets: HashMap<ProjectKey, Vec<Bucket>>,
2249        }
2250
2251        let buckets = match serde_json::from_slice(&payload) {
2252            Ok(Wrapper { buckets }) => buckets,
2253            Err(error) => {
2254                relay_log::debug!(
2255                    error = &error as &dyn Error,
2256                    "failed to parse batched metrics",
2257                );
2258                metric!(counter(RelayCounters::MetricBucketsParsingFailed) += 1);
2259                return;
2260            }
2261        };
2262
2263        for (project_key, buckets) in buckets {
2264            self.handle_process_metrics(
2265                cogs,
2266                ProcessMetrics {
2267                    data: MetricData::Parsed(buckets),
2268                    project_key,
2269                    source,
2270                    received_at,
2271                    sent_at,
2272                },
2273            )
2274        }
2275    }
2276
    /// Submits a processed result to the store (in processing mode) or upstream.
    ///
    /// When processing is enabled and a store forwarder is configured, results are
    /// forwarded to the store directly. Otherwise the result is serialized into an
    /// envelope, encoded with the configured HTTP encoding, and sent to the upstream
    /// Relay or Sentry endpoint.
    fn submit_upstream(&self, cogs: &mut Token, submit: Submit<'_>) {
        let _submit = cogs.start_category("submit");

        #[cfg(feature = "processing")]
        if self.inner.config.processing_enabled()
            && let Some(store_forwarder) = &self.inner.addrs.store_forwarder
        {
            match submit {
                Submit::Envelope(envelope) => store_forwarder.send(StoreEnvelope { envelope }),
                Submit::Output { output, ctx } => output
                    .forward_store(store_forwarder, ctx)
                    .unwrap_or_else(|err| err.into_inner()),
            }
            return;
        }

        // Not in processing mode: everything goes upstream as an envelope, so convert
        // processor outputs into envelopes first.
        let mut envelope = match submit {
            Submit::Envelope(envelope) => envelope,
            Submit::Output { output, ctx } => match output.serialize_envelope(ctx) {
                Ok(envelope) => ManagedEnvelope::from(envelope).into_processed(),
                Err(_) => {
                    relay_log::error!("failed to serialize output to an envelope");
                    return;
                }
            },
        };

        // Nothing left to send; mark the empty envelope as accepted.
        if envelope.envelope_mut().is_empty() {
            envelope.accept();
            return;
        }

        // Override the `sent_at` timestamp. Since the envelope went through basic
        // normalization, all timestamps have been corrected. We propagate the new
        // `sent_at` to allow the next Relay to double-check this timestamp and
        // potentially apply correction again. This is done as close to sending as
        // possible so that we avoid internal delays.
        envelope.envelope_mut().set_sent_at(Utc::now());

        relay_log::trace!("sending envelope to sentry endpoint");
        let http_encoding = self.inner.config.http_encoding();
        // Serialize the envelope and apply the configured HTTP payload encoding.
        let result = envelope.envelope().to_vec().and_then(|v| {
            encode_payload(&v.into(), http_encoding).map_err(EnvelopeError::PayloadIoFailed)
        });

        match result {
            Ok(body) => {
                self.inner
                    .addrs
                    .upstream_relay
                    .send(SendRequest(SendEnvelope {
                        envelope,
                        body,
                        http_encoding,
                        project_cache: self.inner.project_cache.clone(),
                    }));
            }
            Err(error) => {
                // Errors are only logged for what we consider an internal discard reason. These
                // indicate errors in the infrastructure or implementation bugs.
                relay_log::error!(
                    error = &error as &dyn Error,
                    tags.project_key = %envelope.scoping().project_key,
                    "failed to serialize envelope payload"
                );

                envelope.reject(Outcome::Invalid(DiscardReason::Internal));
            }
        }
    }
2347
2348    fn handle_submit_client_reports(&self, cogs: &mut Token, message: SubmitClientReports) {
2349        let SubmitClientReports {
2350            client_reports,
2351            scoping,
2352        } = message;
2353
2354        let upstream = self.inner.config.upstream_descriptor();
2355        let dsn = PartialDsn::outbound(&scoping, upstream);
2356
2357        let mut envelope = Envelope::from_request(None, RequestMeta::outbound(dsn));
2358        for client_report in client_reports {
2359            match client_report.serialize() {
2360                Ok(payload) => {
2361                    let mut item = Item::new(ItemType::ClientReport);
2362                    item.set_payload(ContentType::Json, payload);
2363                    envelope.add_item(item);
2364                }
2365                Err(error) => {
2366                    relay_log::error!(
2367                        error = &error as &dyn std::error::Error,
2368                        "failed to serialize client report"
2369                    );
2370                }
2371            }
2372        }
2373
2374        let envelope = ManagedEnvelope::new(envelope, self.inner.addrs.outcome_aggregator.clone());
2375        self.submit_upstream(cogs, Submit::Envelope(envelope.into_processed()));
2376    }
2377
2378    fn check_buckets(
2379        &self,
2380        project_key: ProjectKey,
2381        project_info: &ProjectInfo,
2382        rate_limits: &RateLimits,
2383        buckets: Vec<Bucket>,
2384    ) -> Vec<Bucket> {
2385        let Some(scoping) = project_info.scoping(project_key) else {
2386            relay_log::error!(
2387                tags.project_key = project_key.as_str(),
2388                "there is no scoping: dropping {} buckets",
2389                buckets.len(),
2390            );
2391            return Vec::new();
2392        };
2393
2394        let mut buckets = self::metrics::apply_project_info(
2395            buckets,
2396            &self.inner.metric_outcomes,
2397            project_info,
2398            scoping,
2399        );
2400
2401        let namespaces: BTreeSet<MetricNamespace> = buckets
2402            .iter()
2403            .filter_map(|bucket| bucket.name.try_namespace())
2404            .collect();
2405
2406        for namespace in namespaces {
2407            let limits = rate_limits.check_with_quotas(
2408                project_info.get_quotas(),
2409                scoping.item(DataCategory::MetricBucket),
2410            );
2411
2412            if limits.is_limited() {
2413                let rejected;
2414                (buckets, rejected) = utils::split_off(buckets, |bucket| {
2415                    bucket.name.try_namespace() == Some(namespace)
2416                });
2417
2418                let reason_code = limits.longest().and_then(|limit| limit.reason_code.clone());
2419                self.inner.metric_outcomes.track(
2420                    scoping,
2421                    &rejected,
2422                    Outcome::RateLimited(reason_code),
2423                );
2424            }
2425        }
2426
2427        let quotas = project_info.config.quotas.clone();
2428        match MetricsLimiter::create(buckets, quotas, scoping) {
2429            Ok(mut bucket_limiter) => {
2430                bucket_limiter.enforce_limits(rate_limits, &self.inner.metric_outcomes);
2431                bucket_limiter.into_buckets()
2432            }
2433            Err(buckets) => buckets,
2434        }
2435    }
2436
    /// Applies Redis-backed (consistent) rate limits to metric buckets.
    ///
    /// Checks each metric namespace present in `buckets` against the combined
    /// global and project quotas, drops rejected buckets with a `RateLimited`
    /// outcome, caches newly hit limits in the project cache, and finally
    /// applies transaction/span based limits via
    /// [`Self::apply_other_rate_limits`].
    #[cfg(feature = "processing")]
    async fn rate_limit_buckets(
        &self,
        scoping: Scoping,
        project_info: &ProjectInfo,
        mut buckets: Vec<Bucket>,
    ) -> Vec<Bucket> {
        // Without a Redis rate limiter there is nothing to enforce here.
        let Some(rate_limiter) = &self.inner.rate_limiter else {
            return buckets;
        };

        let global_config = self.inner.global_config.current();
        // Number of buckets per namespace; used as the rate-limit quantity.
        let namespaces = buckets
            .iter()
            .filter_map(|bucket| bucket.name.try_namespace())
            .counts();

        let quotas = CombinedQuotas::new(&global_config, project_info.get_quotas());

        for (namespace, quantity) in namespaces {
            let item_scoping = scoping.metric_bucket(namespace);

            let limits = match rate_limiter
                .is_rate_limited(quotas, item_scoping, quantity, false)
                .await
            {
                Ok(limits) => limits,
                Err(err) => {
                    // Fail open on Redis errors: stop limiting instead of dropping data.
                    relay_log::error!(
                        error = &err as &dyn std::error::Error,
                        "failed to check redis rate limits"
                    );
                    break;
                }
            };

            if limits.is_limited() {
                // Split off all buckets of the limited namespace and record outcomes.
                let rejected;
                (buckets, rejected) = utils::split_off(buckets, |bucket| {
                    bucket.name.try_namespace() == Some(namespace)
                });

                let reason_code = limits.longest().and_then(|limit| limit.reason_code.clone());
                self.inner.metric_outcomes.track(
                    scoping,
                    &rejected,
                    Outcome::RateLimited(reason_code),
                );

                // Cache the limit so subsequent data is limited without Redis access.
                self.inner
                    .project_cache
                    .get(item_scoping.scoping.project_key)
                    .rate_limits()
                    .merge(limits);
            }
        }

        match MetricsLimiter::create(buckets, project_info.config.quotas.clone(), scoping) {
            Err(buckets) => buckets,
            Ok(bucket_limiter) => self.apply_other_rate_limits(bucket_limiter).await,
        }
    }
2499
2500    /// Check and apply rate limits to metrics buckets for transactions and spans.
2501    #[cfg(feature = "processing")]
2502    async fn apply_other_rate_limits(&self, mut bucket_limiter: MetricsLimiter) -> Vec<Bucket> {
2503        relay_log::trace!("handle_rate_limit_buckets");
2504
2505        let scoping = *bucket_limiter.scoping();
2506
2507        if let Some(rate_limiter) = self.inner.rate_limiter.as_ref() {
2508            let global_config = self.inner.global_config.current();
2509            let quotas = CombinedQuotas::new(&global_config, bucket_limiter.quotas());
2510
2511            // We set over_accept_once such that the limit is actually reached, which allows subsequent
2512            // calls with quantity=0 to be rate limited.
2513            let over_accept_once = true;
2514            let mut rate_limits = RateLimits::new();
2515
2516            for category in [DataCategory::Transaction, DataCategory::Span] {
2517                let count = bucket_limiter.count(category);
2518
2519                let timer = Instant::now();
2520                let mut is_limited = false;
2521
2522                if let Some(count) = count {
2523                    match rate_limiter
2524                        .is_rate_limited(quotas, scoping.item(category), count, over_accept_once)
2525                        .await
2526                    {
2527                        Ok(limits) => {
2528                            is_limited = limits.is_limited();
2529                            rate_limits.merge(limits)
2530                        }
2531                        Err(e) => relay_log::error!(error = &e as &dyn Error),
2532                    }
2533                }
2534
2535                relay_statsd::metric!(
2536                    timer(RelayTimers::RateLimitBucketsDuration) = timer.elapsed(),
2537                    category = category.name(),
2538                    limited = if is_limited { "true" } else { "false" },
2539                    count = match count {
2540                        None => "none",
2541                        Some(0) => "0",
2542                        Some(1) => "1",
2543                        Some(1..=10) => "10",
2544                        Some(1..=25) => "25",
2545                        Some(1..=50) => "50",
2546                        Some(51..=100) => "100",
2547                        Some(101..=500) => "500",
2548                        _ => "> 500",
2549                    },
2550                );
2551            }
2552
2553            if rate_limits.is_limited() {
2554                let was_enforced =
2555                    bucket_limiter.enforce_limits(&rate_limits, &self.inner.metric_outcomes);
2556
2557                if was_enforced {
2558                    // Update the rate limits in the project cache.
2559                    self.inner
2560                        .project_cache
2561                        .get(scoping.project_key)
2562                        .rate_limits()
2563                        .merge(rate_limits);
2564                }
2565            }
2566        }
2567
2568        bucket_limiter.into_buckets()
2569    }
2570
    /// Cardinality limits the passed buckets and returns a filtered vector of only accepted buckets.
    #[cfg(feature = "processing")]
    async fn cardinality_limit_buckets(
        &self,
        scoping: Scoping,
        limits: &[CardinalityLimit],
        buckets: Vec<Bucket>,
    ) -> Vec<Bucket> {
        let global_config = self.inner.global_config.current();
        let cardinality_limiter_mode = global_config.options.cardinality_limiter_mode;

        // Limiting disabled globally: pass everything through untouched.
        if matches!(cardinality_limiter_mode, CardinalityLimiterMode::Disabled) {
            return buckets;
        }

        // No limiter configured for this Relay: nothing to enforce.
        let Some(ref limiter) = self.inner.cardinality_limiter else {
            return buckets;
        };

        let scope = relay_cardinality::Scoping {
            organization_id: scoping.organization_id,
            project_id: scoping.project_id,
        };

        let limits = match limiter
            .check_cardinality_limits(scope, limits, buckets)
            .await
        {
            Ok(limits) => limits,
            // Fail open: on limiter errors the original buckets are returned unfiltered.
            Err((buckets, error)) => {
                relay_log::error!(
                    error = &error as &dyn std::error::Error,
                    "cardinality limiter failed"
                );
                return buckets;
            }
        };

        // Sampled error logging for exceeded limits, grouped by organization.
        let error_sample_rate = global_config.options.cardinality_limiter_error_sample_rate;
        if !limits.exceeded_limits().is_empty() && utils::sample(error_sample_rate).is_keep() {
            for limit in limits.exceeded_limits() {
                relay_log::with_scope(
                    |scope| {
                        // Set the organization as user so we can alert on distinct org_ids.
                        scope.set_user(Some(relay_log::sentry::User {
                            id: Some(scoping.organization_id.to_string()),
                            ..Default::default()
                        }));
                    },
                    || {
                        relay_log::error!(
                            tags.organization_id = scoping.organization_id.value(),
                            tags.limit_id = limit.id,
                            tags.passive = limit.passive,
                            "Cardinality Limit"
                        );
                    },
                );
            }
        }

        // Emit cardinality reports as metric outcomes.
        for (limit, reports) in limits.cardinality_reports() {
            for report in reports {
                self.inner
                    .metric_outcomes
                    .cardinality(scoping, limit, report);
            }
        }

        // In passive mode, limits are reported above but never enforced.
        if matches!(cardinality_limiter_mode, CardinalityLimiterMode::Passive) {
            return limits.into_source();
        }

        let CardinalityLimitsSplit { accepted, rejected } = limits.into_split();

        // Track an outcome for every rejected bucket, then return the accepted rest.
        for (bucket, exceeded) in rejected {
            self.inner.metric_outcomes.track(
                scoping,
                &[bucket],
                Outcome::CardinalityLimited(exceeded.id.clone()),
            );
        }
        accepted
    }
2655
2656    /// Processes metric buckets and sends them to kafka.
2657    ///
2658    /// This function runs the following steps:
2659    ///  - cardinality limiting
2660    ///  - rate limiting
2661    ///  - submit to `StoreForwarder`
2662    #[cfg(feature = "processing")]
2663    async fn encode_metrics_processing(
2664        &self,
2665        message: FlushBuckets,
2666        store_forwarder: &Addr<Store>,
2667    ) {
2668        use crate::constants::DEFAULT_EVENT_RETENTION;
2669        use crate::services::store::StoreMetrics;
2670
2671        for ProjectBuckets {
2672            buckets,
2673            scoping,
2674            project_info,
2675            ..
2676        } in message.buckets.into_values()
2677        {
2678            let buckets = self
2679                .rate_limit_buckets(scoping, &project_info, buckets)
2680                .await;
2681
2682            let limits = project_info.get_cardinality_limits();
2683            let buckets = self
2684                .cardinality_limit_buckets(scoping, limits, buckets)
2685                .await;
2686
2687            if buckets.is_empty() {
2688                continue;
2689            }
2690
2691            let retention = project_info
2692                .config
2693                .event_retention
2694                .unwrap_or(DEFAULT_EVENT_RETENTION);
2695
2696            // The store forwarder takes care of bucket splitting internally, so we can submit the
2697            // entire list of buckets. There is no batching needed here.
2698            store_forwarder.send(StoreMetrics {
2699                buckets,
2700                scoping,
2701                retention,
2702            });
2703        }
2704    }
2705
2706    /// Serializes metric buckets to JSON and sends them to the upstream.
2707    ///
2708    /// This function runs the following steps:
2709    ///  - partitioning
2710    ///  - batching by configured size limit
2711    ///  - serialize to JSON and pack in an envelope
2712    ///  - submit the envelope to upstream or kafka depending on configuration
2713    ///
2714    /// Cardinality limiting and rate limiting run only in processing Relays as they both require
2715    /// access to the central Redis instance. Cached rate limits are applied in the project cache
2716    /// already.
2717    fn encode_metrics_envelope(&self, cogs: &mut Token, message: FlushBuckets) {
2718        let FlushBuckets {
2719            partition_key,
2720            buckets,
2721        } = message;
2722
2723        let batch_size = self.inner.config.metrics_max_batch_size_bytes();
2724        let upstream = self.inner.config.upstream_descriptor();
2725
2726        for ProjectBuckets {
2727            buckets, scoping, ..
2728        } in buckets.values()
2729        {
2730            let dsn = PartialDsn::outbound(scoping, upstream);
2731
2732            relay_statsd::metric!(
2733                histogram(RelayHistograms::PartitionKeys) = u64::from(partition_key)
2734            );
2735
2736            let mut num_batches = 0;
2737            for batch in BucketsView::from(buckets).by_size(batch_size) {
2738                let mut envelope = Envelope::from_request(None, RequestMeta::outbound(dsn.clone()));
2739
2740                let mut item = Item::new(ItemType::MetricBuckets);
2741                item.set_source_quantities(crate::metrics::extract_quantities(batch));
2742                item.set_payload(ContentType::Json, serde_json::to_vec(&buckets).unwrap());
2743                envelope.add_item(item);
2744
2745                let mut envelope =
2746                    ManagedEnvelope::new(envelope, self.inner.addrs.outcome_aggregator.clone());
2747                envelope
2748                    .set_partition_key(Some(partition_key))
2749                    .scope(*scoping);
2750
2751                relay_statsd::metric!(
2752                    histogram(RelayHistograms::BucketsPerBatch) = batch.len() as u64
2753                );
2754
2755                self.submit_upstream(cogs, Submit::Envelope(envelope.into_processed()));
2756                num_batches += 1;
2757            }
2758
2759            relay_statsd::metric!(histogram(RelayHistograms::BatchesPerPartition) = num_batches);
2760        }
2761    }
2762
2763    /// Creates a [`SendMetricsRequest`] and sends it to the upstream relay.
2764    fn send_global_partition(&self, partition_key: u32, partition: &mut Partition<'_>) {
2765        if partition.is_empty() {
2766            return;
2767        }
2768
2769        let (unencoded, project_info) = partition.take();
2770        let http_encoding = self.inner.config.http_encoding();
2771        let encoded = match encode_payload(&unencoded, http_encoding) {
2772            Ok(payload) => payload,
2773            Err(error) => {
2774                let error = &error as &dyn std::error::Error;
2775                relay_log::error!(error, "failed to encode metrics payload");
2776                return;
2777            }
2778        };
2779
2780        let request = SendMetricsRequest {
2781            partition_key: partition_key.to_string(),
2782            unencoded,
2783            encoded,
2784            project_info,
2785            http_encoding,
2786            metric_outcomes: self.inner.metric_outcomes.clone(),
2787        };
2788
2789        self.inner.addrs.upstream_relay.send(SendRequest(request));
2790    }
2791
    /// Serializes metric buckets to JSON and sends them to the upstream via the global endpoint.
    ///
    /// This function is similar to [`Self::encode_metrics_envelope`], but sends a global batched
    /// payload directly instead of per-project Envelopes.
    ///
    /// This function runs the following steps:
    ///  - partitioning
    ///  - batching by configured size limit
    ///  - serialize to JSON
    ///  - submit directly to the upstream
    ///
    /// Cardinality limiting and rate limiting run only in processing Relays as they both require
    /// access to the central Redis instance. Cached rate limits are applied in the project cache
    /// already.
    fn encode_metrics_global(&self, message: FlushBuckets) {
        let FlushBuckets {
            partition_key,
            buckets,
        } = message;

        let batch_size = self.inner.config.metrics_max_batch_size_bytes();
        let mut partition = Partition::new(batch_size);
        let mut partition_splits = 0;

        for ProjectBuckets {
            buckets, scoping, ..
        } in buckets.values()
        {
            for bucket in buckets {
                // A bucket larger than the remaining partition budget is split;
                // `remaining` holds the part that has not been inserted yet.
                let mut remaining = Some(BucketView::new(bucket));

                while let Some(bucket) = remaining.take() {
                    if let Some(next) = partition.insert(bucket, *scoping) {
                        // A part of the bucket could not be inserted. Take the partition and submit
                        // it immediately. Repeat until the final part was inserted. This should
                        // always result in a request, otherwise we would enter an endless loop.
                        self.send_global_partition(partition_key, &mut partition);
                        remaining = Some(next);
                        partition_splits += 1;
                    }
                }
            }
        }

        if partition_splits > 0 {
            metric!(histogram(RelayHistograms::PartitionSplits) = partition_splits);
        }

        // Flush the final, partially filled partition.
        self.send_global_partition(partition_key, &mut partition);
    }
2842
2843    async fn handle_flush_buckets(&self, cogs: &mut Token, mut message: FlushBuckets) {
2844        for (project_key, pb) in message.buckets.iter_mut() {
2845            let buckets = std::mem::take(&mut pb.buckets);
2846            pb.buckets =
2847                self.check_buckets(*project_key, &pb.project_info, &pb.rate_limits, buckets);
2848        }
2849
2850        #[cfg(feature = "processing")]
2851        if self.inner.config.processing_enabled()
2852            && let Some(ref store_forwarder) = self.inner.addrs.store_forwarder
2853        {
2854            return self
2855                .encode_metrics_processing(message, store_forwarder)
2856                .await;
2857        }
2858
2859        if self.inner.config.http_global_metrics() {
2860            self.encode_metrics_global(message)
2861        } else {
2862            self.encode_metrics_envelope(cogs, message)
2863        }
2864    }
2865
    /// Returns `true` if a Redis-backed rate limiter is configured (test-only helper).
    #[cfg(all(test, feature = "processing"))]
    fn redis_rate_limiter_enabled(&self) -> bool {
        self.inner.rate_limiter.is_some()
    }
2870
    /// Dispatches an [`EnvelopeProcessor`] message to its specific handler.
    ///
    /// Records the processing duration per message variant and meters COGS
    /// costs using the weights estimated by [`Self::feature_weights`].
    async fn handle_message(&self, message: EnvelopeProcessor) {
        let ty = message.variant();
        let feature_weights = self.feature_weights(&message);

        metric!(timer(RelayTimers::ProcessMessageDuration), message = ty, {
            // The token attributes elapsed processing time to the COGS features.
            let mut cogs = self.inner.cogs.timed(ResourceId::Relay, feature_weights);

            match message {
                EnvelopeProcessor::ProcessEnvelope(m) => {
                    self.handle_process_envelope(&mut cogs, *m).await
                }
                EnvelopeProcessor::ProcessProjectMetrics(m) => {
                    self.handle_process_metrics(&mut cogs, *m)
                }
                EnvelopeProcessor::ProcessBatchedMetrics(m) => {
                    self.handle_process_batched_metrics(&mut cogs, *m)
                }
                EnvelopeProcessor::FlushBuckets(m) => {
                    self.handle_flush_buckets(&mut cogs, *m).await
                }
                EnvelopeProcessor::SubmitClientReports(m) => {
                    self.handle_submit_client_reports(&mut cogs, *m)
                }
            }
        });
    }
2897
2898    fn feature_weights(&self, message: &EnvelopeProcessor) -> FeatureWeights {
2899        match message {
2900            // Envelope is split later and tokens are attributed then.
2901            EnvelopeProcessor::ProcessEnvelope(_) => AppFeature::Unattributed.into(),
2902            EnvelopeProcessor::ProcessProjectMetrics(_) => AppFeature::Unattributed.into(),
2903            EnvelopeProcessor::ProcessBatchedMetrics(_) => AppFeature::Unattributed.into(),
2904            EnvelopeProcessor::FlushBuckets(v) => v
2905                .buckets
2906                .values()
2907                .map(|s| {
2908                    if self.inner.config.processing_enabled() {
2909                        // Processing does not encode the metrics but instead rate and cardinality
2910                        // limits the metrics, which scales by count and not size.
2911                        relay_metrics::cogs::ByCount(&s.buckets).into()
2912                    } else {
2913                        relay_metrics::cogs::BySize(&s.buckets).into()
2914                    }
2915                })
2916                .fold(FeatureWeights::none(), FeatureWeights::merge),
2917            EnvelopeProcessor::SubmitClientReports(_) => AppFeature::ClientReports.into(),
2918        }
2919    }
2920}
2921
impl Service for EnvelopeProcessorService {
    type Interface = EnvelopeProcessor;

    async fn run(self, mut rx: relay_system::Receiver<Self::Interface>) {
        // Forward every incoming message to a worker on the thread pool.
        // `spawn_async(..).await` applies backpressure: the loop only pulls the
        // next message once the pool accepted the previous task.
        while let Some(message) = rx.recv().await {
            let service = self.clone();
            self.inner
                .pool
                .spawn_async(
                    async move {
                        service.handle_message(message).await;
                    }
                    .boxed(),
                )
                .await;
        }
    }
}
2940
/// Result of the enforcement of rate limiting.
///
/// If the event is already `None` or it's rate limited, it will be `None`
/// within the [`Annotated`].
struct EnforcementResult {
    /// The event after enforcement; empty if the event was rate limited.
    event: Annotated<Event>,
    // Only read in processing builds; kept in all builds for a uniform shape.
    #[cfg_attr(not(feature = "processing"), expect(dead_code))]
    rate_limits: RateLimits,
}
2950
impl EnforcementResult {
    /// Creates a new [`EnforcementResult`].
    ///
    /// `event` may already be empty if enforcement removed it; `rate_limits`
    /// carries the limits computed during enforcement.
    pub fn new(event: Annotated<Event>, rate_limits: RateLimits) -> Self {
        Self { event, rate_limits }
    }
}
2957
/// Strategy for enforcing rate limits on envelopes.
#[derive(Clone)]
enum RateLimiter {
    /// Checks only rate limits cached in the project state.
    Cached,
    /// Checks rate limits consistently against the shared Redis instance
    /// (processing Relays only).
    #[cfg(feature = "processing")]
    Consistent(Arc<RedisRateLimiter<GlobalRateLimitsServiceHandle>>),
}
2964
impl RateLimiter {
    /// Enforces rate limits on the envelope and its already-extracted event.
    ///
    /// Computes applicable limits from cached state or, for
    /// [`RateLimiter::Consistent`], from Redis; removes limited items from the
    /// envelope with outcomes and returns the surviving event together with
    /// the rate limits that applied. If the event itself was limited, the
    /// returned event is [`Annotated::empty`].
    async fn enforce<Group>(
        &self,
        managed_envelope: &mut TypedEnvelope<Group>,
        event: Annotated<Event>,
        _extracted_metrics: &mut ProcessingExtractedMetrics,
        ctx: processing::Context<'_>,
    ) -> Result<EnforcementResult, ProcessingError> {
        // Nothing to enforce on an empty envelope without an event.
        if managed_envelope.envelope().is_empty() && event.value().is_none() {
            return Ok(EnforcementResult::new(event, RateLimits::default()));
        }

        // Without any configured quotas there is nothing to check either.
        let quotas = CombinedQuotas::new(ctx.global_config, ctx.project_info.get_quotas());
        if quotas.is_empty() {
            return Ok(EnforcementResult::new(event, RateLimits::default()));
        }

        let event_category = event_category(&event);

        // We extract the rate limiters, in case we perform consistent rate limiting, since we will
        // need Redis access.
        //
        // When invoking the rate limiter, capture if the event item has been rate limited to also
        // remove it from the processing state eventually.
        let this = self.clone();
        let mut envelope_limiter =
            EnvelopeLimiter::new(CheckLimits::All, move |item_scope, _quantity| {
                let this = this.clone();

                async move {
                    match this {
                        #[cfg(feature = "processing")]
                        RateLimiter::Consistent(rate_limiter) => Ok::<_, ProcessingError>(
                            rate_limiter
                                .is_rate_limited(quotas, item_scope, _quantity, false)
                                .await?,
                        ),
                        _ => Ok::<_, ProcessingError>(
                            ctx.rate_limits.check_with_quotas(quotas, item_scope),
                        ),
                    }
                }
            });

        // Tell the envelope limiter about the event, since it has been removed from the Envelope at
        // this stage in processing.
        if let Some(category) = event_category {
            envelope_limiter.assume_event(category);
        }

        let scoping = managed_envelope.scoping();
        let (enforcement, rate_limits) =
            metric!(timer(RelayTimers::EventProcessingRateLimiting), {
                envelope_limiter
                    .compute(managed_envelope.envelope_mut(), &scoping)
                    .await
            })?;
        let event_active = enforcement.is_event_active();

        // Use the same rate limits as used for the envelope on the metrics.
        // Those rate limits should not be checked for expiry or similar to ensure a consistent
        // limiting of envelope items and metrics.
        #[cfg(feature = "processing")]
        _extracted_metrics.apply_enforcement(&enforcement, matches!(self, Self::Consistent(_)));
        enforcement.apply_with_outcomes(managed_envelope);

        if event_active {
            debug_assert!(managed_envelope.envelope().is_empty());
            return Ok(EnforcementResult::new(Annotated::empty(), rate_limits));
        }

        Ok(EnforcementResult::new(event, rate_limits))
    }
}
3039
3040pub fn encode_payload(body: &Bytes, http_encoding: HttpEncoding) -> Result<Bytes, std::io::Error> {
3041    let envelope_body: Vec<u8> = match http_encoding {
3042        HttpEncoding::Identity => return Ok(body.clone()),
3043        HttpEncoding::Deflate => {
3044            let mut encoder = ZlibEncoder::new(Vec::new(), Compression::default());
3045            encoder.write_all(body.as_ref())?;
3046            encoder.finish()?
3047        }
3048        HttpEncoding::Gzip => {
3049            let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
3050            encoder.write_all(body.as_ref())?;
3051            encoder.finish()?
3052        }
3053        HttpEncoding::Br => {
3054            // Use default buffer size (via 0), medium quality (5), and the default lgwin (22).
3055            let mut encoder = BrotliEncoder::new(Vec::new(), 0, 5, 22);
3056            encoder.write_all(body.as_ref())?;
3057            encoder.into_inner()
3058        }
3059        HttpEncoding::Zstd => {
3060            // Use the fastest compression level, our main objective here is to get the best
3061            // compression ratio for least amount of time spent.
3062            let mut encoder = ZstdEncoder::new(Vec::new(), 1)?;
3063            encoder.write_all(body.as_ref())?;
3064            encoder.finish()?
3065        }
3066    };
3067
3068    Ok(envelope_body.into())
3069}
3070
/// An upstream request that submits an envelope via HTTP.
#[derive(Debug)]
pub struct SendEnvelope {
    /// The envelope to submit; accepted or rejected in `respond`.
    pub envelope: TypedEnvelope<Processed>,
    /// The serialized envelope payload, encoded according to `http_encoding`.
    pub body: Bytes,
    /// Content encoding advertised for `body` on the request.
    pub http_encoding: HttpEncoding,
    /// Used to merge upstream rate limits back into the project cache.
    pub project_cache: ProjectCacheHandle,
}
3079
impl UpstreamRequest for SendEnvelope {
    fn method(&self) -> reqwest::Method {
        reqwest::Method::POST
    }

    fn path(&self) -> Cow<'_, str> {
        format!("/api/{}/envelope/", self.envelope.scoping().project_id).into()
    }

    fn route(&self) -> &'static str {
        "envelope"
    }

    /// Builds the HTTP request, forwarding the original client metadata headers.
    fn build(&mut self, builder: &mut http::RequestBuilder) -> Result<(), http::HttpError> {
        let envelope_body = self.body.clone();
        metric!(histogram(RelayHistograms::UpstreamEnvelopeBodySize) = envelope_body.len() as u64);

        let meta = &self.envelope.meta();
        let shard = self.envelope.partition_key().map(|p| p.to_string());
        builder
            .content_encoding(self.http_encoding)
            .header_opt("Origin", meta.origin().map(|url| url.as_str()))
            .header_opt("User-Agent", meta.user_agent())
            .header("X-Sentry-Auth", meta.auth_header())
            .header("X-Forwarded-For", meta.forwarded_for())
            .header("Content-Type", envelope::CONTENT_TYPE)
            .header_opt("X-Sentry-Relay-Shard", shard)
            .body(envelope_body);

        Ok(())
    }

    fn sign(&mut self) -> Option<Sign> {
        Some(Sign::Optional(SignatureType::RequestSign))
    }

    /// Consumes the upstream response and settles the envelope's outcome.
    fn respond(
        self: Box<Self>,
        result: Result<http::Response, UpstreamRequestError>,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> {
        Box::pin(async move {
            let result = match result {
                Ok(mut response) => response.consume().await.map_err(UpstreamRequestError::Http),
                Err(error) => Err(error),
            };

            match result {
                Ok(()) => self.envelope.accept(),
                // The upstream received the envelope: accept it locally even on
                // error, so outcomes are not reported twice.
                Err(error) if error.is_received() => {
                    let scoping = self.envelope.scoping();
                    self.envelope.accept();

                    // Cache upstream rate limits for subsequent envelopes of this project.
                    if let UpstreamRequestError::RateLimited(limits) = error {
                        self.project_cache
                            .get(scoping.project_key)
                            .rate_limits()
                            .merge(limits.scope(&scoping));
                    }
                }
                Err(error) => {
                    // Errors are only logged for what we consider an internal discard reason. These
                    // indicate errors in the infrastructure or implementation bugs.
                    let mut envelope = self.envelope;
                    envelope.reject(Outcome::Invalid(DiscardReason::Internal));
                    relay_log::error!(
                        error = &error as &dyn Error,
                        tags.project_key = %envelope.scoping().project_key,
                        "error sending envelope"
                    );
                }
            }
        })
    }
}
3154
/// A container for metric buckets from multiple projects.
///
/// This container is used to send metrics to the upstream in global batches as part of the
/// [`FlushBuckets`] message if the `http.global_metrics` option is enabled. The container monitors
/// the size of all metrics and allows to split them into multiple batches. See
/// [`insert`](Self::insert) for more information.
#[derive(Debug)]
struct Partition<'a> {
    /// Maximum payload size of this partition in bytes.
    max_size: usize,
    /// Estimated bytes still available before the partition is full.
    remaining: usize,
    /// Accumulated bucket views, grouped by project.
    views: HashMap<ProjectKey, Vec<BucketView<'a>>>,
    /// Scoping of each contributing project, kept for outcome attribution.
    project_info: HashMap<ProjectKey, Scoping>,
}
3168
3169impl<'a> Partition<'a> {
    /// Creates a new partition with the given maximum size in bytes.
    pub fn new(size: usize) -> Self {
        Self {
            max_size: size,
            // The partition starts out empty, so the full budget is available.
            remaining: size,
            views: HashMap::new(),
            project_info: HashMap::new(),
        }
    }
3179
3180    /// Inserts a bucket into the partition, splitting it if necessary.
3181    ///
3182    /// This function attempts to add the bucket to this partition. If the bucket does not fit
3183    /// entirely into the partition given its maximum size, the remaining part of the bucket is
3184    /// returned from this function call.
3185    ///
3186    /// If this function returns `Some(_)`, the partition is full and should be submitted to the
3187    /// upstream immediately. Use [`Self::take`] to retrieve the contents of the
3188    /// partition. Afterwards, the caller is responsible to call this function again with the
3189    /// remaining bucket until it is fully inserted.
3190    pub fn insert(&mut self, bucket: BucketView<'a>, scoping: Scoping) -> Option<BucketView<'a>> {
3191        let (current, next) = bucket.split(self.remaining, Some(self.max_size));
3192
3193        if let Some(current) = current {
3194            self.remaining = self.remaining.saturating_sub(current.estimated_size());
3195            self.views
3196                .entry(scoping.project_key)
3197                .or_default()
3198                .push(current);
3199
3200            self.project_info
3201                .entry(scoping.project_key)
3202                .or_insert(scoping);
3203        }
3204
3205        next
3206    }
3207
3208    /// Returns `true` if the partition does not hold any data.
3209    fn is_empty(&self) -> bool {
3210        self.views.is_empty()
3211    }
3212
3213    /// Returns the serialized buckets for this partition.
3214    ///
3215    /// This empties the partition, so that it can be reused.
3216    fn take(&mut self) -> (Bytes, HashMap<ProjectKey, Scoping>) {
3217        #[derive(serde::Serialize)]
3218        struct Wrapper<'a> {
3219            buckets: &'a HashMap<ProjectKey, Vec<BucketView<'a>>>,
3220        }
3221
3222        let buckets = &self.views;
3223        let payload = serde_json::to_vec(&Wrapper { buckets }).unwrap().into();
3224
3225        let scopings = self.project_info.clone();
3226        self.project_info.clear();
3227
3228        self.views.clear();
3229        self.remaining = self.max_size;
3230
3231        (payload, scopings)
3232    }
3233}
3234
/// An upstream request that submits metric buckets via HTTP.
///
/// This request is not awaited. It automatically tracks outcomes if the request is not received.
#[derive(Debug)]
struct SendMetricsRequest {
    /// If the partition key is set, the request is marked with `X-Sentry-Relay-Shard`.
    partition_key: String,
    /// Serialized metric buckets without encoding applied, used for signing.
    ///
    /// Also re-parsed to reconstruct the buckets when outcomes must be emitted for a failed send.
    unencoded: Bytes,
    /// Serialized metric buckets with the stated HTTP encoding applied.
    encoded: Bytes,
    /// Mapping of all contained project keys to their scoping and extraction mode.
    ///
    /// Used to track outcomes for transmission failures.
    project_info: HashMap<ProjectKey, Scoping>,
    /// Encoding (compression) of the payload.
    http_encoding: HttpEncoding,
    /// Metric outcomes instance to send outcomes on error.
    metric_outcomes: MetricOutcomes,
}
3255
3256impl SendMetricsRequest {
3257    fn create_error_outcomes(self) {
3258        #[derive(serde::Deserialize)]
3259        struct Wrapper {
3260            buckets: HashMap<ProjectKey, Vec<MinimalTrackableBucket>>,
3261        }
3262
3263        let buckets = match serde_json::from_slice(&self.unencoded) {
3264            Ok(Wrapper { buckets }) => buckets,
3265            Err(err) => {
3266                relay_log::error!(
3267                    error = &err as &dyn std::error::Error,
3268                    "failed to parse buckets from failed transmission"
3269                );
3270                return;
3271            }
3272        };
3273
3274        for (key, buckets) in buckets {
3275            let Some(&scoping) = self.project_info.get(&key) else {
3276                relay_log::error!("missing scoping for project key");
3277                continue;
3278            };
3279
3280            self.metric_outcomes.track(
3281                scoping,
3282                &buckets,
3283                Outcome::Invalid(DiscardReason::Internal),
3284            );
3285        }
3286    }
3287}
3288
3289impl UpstreamRequest for SendMetricsRequest {
3290    fn set_relay_id(&self) -> bool {
3291        true
3292    }
3293
3294    fn sign(&mut self) -> Option<Sign> {
3295        Some(Sign::Required(SignatureType::Body(self.unencoded.clone())))
3296    }
3297
3298    fn method(&self) -> reqwest::Method {
3299        reqwest::Method::POST
3300    }
3301
3302    fn path(&self) -> Cow<'_, str> {
3303        "/api/0/relays/metrics/".into()
3304    }
3305
3306    fn route(&self) -> &'static str {
3307        "global_metrics"
3308    }
3309
3310    fn build(&mut self, builder: &mut http::RequestBuilder) -> Result<(), http::HttpError> {
3311        metric!(histogram(RelayHistograms::UpstreamMetricsBodySize) = self.encoded.len() as u64);
3312
3313        builder
3314            .content_encoding(self.http_encoding)
3315            .header("X-Sentry-Relay-Shard", self.partition_key.as_bytes())
3316            .header(header::CONTENT_TYPE, b"application/json")
3317            .body(self.encoded.clone());
3318
3319        Ok(())
3320    }
3321
3322    fn respond(
3323        self: Box<Self>,
3324        result: Result<http::Response, UpstreamRequestError>,
3325    ) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> {
3326        Box::pin(async {
3327            match result {
3328                Ok(mut response) => {
3329                    response.consume().await.ok();
3330                }
3331                Err(error) => {
3332                    relay_log::error!(error = &error as &dyn Error, "Failed to send metrics batch");
3333
3334                    // If the request did not arrive at the upstream, we are responsible for outcomes.
3335                    // Otherwise, the upstream is responsible to log outcomes.
3336                    if error.is_received() {
3337                        return;
3338                    }
3339
3340                    self.create_error_outcomes()
3341                }
3342            }
3343        })
3344    }
3345}
3346
/// Container for global and project level [`Quota`].
#[derive(Copy, Clone, Debug)]
struct CombinedQuotas<'a> {
    /// Quotas from the [`GlobalConfig`].
    global_quotas: &'a [Quota],
    /// Quotas from the project configuration.
    project_quotas: &'a [Quota],
}
3353
3354impl<'a> CombinedQuotas<'a> {
3355    /// Returns a new [`CombinedQuotas`].
3356    pub fn new(global_config: &'a GlobalConfig, project_quotas: &'a [Quota]) -> Self {
3357        Self {
3358            global_quotas: &global_config.quotas,
3359            project_quotas,
3360        }
3361    }
3362
3363    /// Returns `true` if both global quotas and project quotas are empty.
3364    pub fn is_empty(&self) -> bool {
3365        self.len() == 0
3366    }
3367
3368    /// Returns the number of both global and project quotas.
3369    pub fn len(&self) -> usize {
3370        self.global_quotas.len() + self.project_quotas.len()
3371    }
3372}
3373
3374impl<'a> IntoIterator for CombinedQuotas<'a> {
3375    type Item = &'a Quota;
3376    type IntoIter = std::iter::Chain<std::slice::Iter<'a, Quota>, std::slice::Iter<'a, Quota>>;
3377
3378    fn into_iter(self) -> Self::IntoIter {
3379        self.global_quotas.iter().chain(self.project_quotas.iter())
3380    }
3381}
3382
#[cfg(test)]
mod tests {
    use std::collections::BTreeMap;

    use insta::assert_debug_snapshot;
    use relay_base_schema::metrics::{DurationUnit, MetricUnit};
    use relay_common::glob2::LazyGlob;
    use relay_dynamic_config::ProjectConfig;
    use relay_event_normalization::{
        MeasurementsConfig, NormalizationConfig, RedactionRule, TransactionNameConfig,
        TransactionNameRule,
    };
    use relay_event_schema::protocol::TransactionSource;
    use relay_pii::DataScrubbingConfig;
    use similar_asserts::assert_eq;

    use crate::metrics_extraction::IntoMetric;
    use crate::metrics_extraction::transactions::types::{
        CommonTags, TransactionMeasurementTags, TransactionMetric,
    };
    use crate::testutils::{create_test_processor, create_test_processor_with_addrs};

    #[cfg(feature = "processing")]
    use {
        relay_metrics::BucketValue,
        relay_quotas::{QuotaScope, ReasonCode},
        relay_test::mock_service,
    };

    use super::*;

    /// Creates a reject-all quota (`limit: Some(0)`) for metric buckets, scoped to the
    /// organization.
    #[cfg(feature = "processing")]
    fn mock_quota(id: &str) -> Quota {
        Quota {
            id: Some(id.into()),
            categories: smallvec::smallvec![DataCategory::MetricBucket],
            scope: QuotaScope::Organization,
            scope_id: None,
            limit: Some(0),
            window: None,
            reason_code: None,
            namespace: None,
        }
    }

    /// Global and project quotas must be counted together and iterated with global quotas first.
    #[cfg(feature = "processing")]
    #[test]
    fn test_dynamic_quotas() {
        let global_config = GlobalConfig {
            quotas: vec![mock_quota("foo"), mock_quota("bar")],
            ..Default::default()
        };

        let project_quotas = vec![mock_quota("baz"), mock_quota("qux")];

        let dynamic_quotas = CombinedQuotas::new(&global_config, &project_quotas);

        assert_eq!(dynamic_quotas.len(), 4);
        assert!(!dynamic_quotas.is_empty());

        let quota_ids = dynamic_quotas.into_iter().filter_map(|q| q.id.as_deref());
        assert!(quota_ids.eq(["foo", "bar", "baz", "qux"]));
    }

    /// Ensures that if we ratelimit one batch of buckets in [`FlushBuckets`] message, it won't
    /// also ratelimit the next batches in the same message automatically.
    #[cfg(feature = "processing")]
    #[tokio::test]
    async fn test_ratelimit_per_batch() {
        use relay_base_schema::organization::OrganizationId;
        use relay_protocol::FiniteF64;

        let rate_limited_org = Scoping {
            organization_id: OrganizationId::new(1),
            project_id: ProjectId::new(21),
            project_key: ProjectKey::parse("00000000000000000000000000000000").unwrap(),
            key_id: Some(17),
        };

        let not_rate_limited_org = Scoping {
            organization_id: OrganizationId::new(2),
            project_id: ProjectId::new(21),
            project_key: ProjectKey::parse("11111111111111111111111111111111").unwrap(),
            key_id: Some(17),
        };

        let message = {
            let project_info = {
                // Quota that rejects everything for `rate_limited_org` only.
                let quota = Quota {
                    id: Some("testing".into()),
                    categories: vec![DataCategory::MetricBucket].into(),
                    scope: relay_quotas::QuotaScope::Organization,
                    scope_id: Some(rate_limited_org.organization_id.to_string()),
                    limit: Some(0),
                    window: None,
                    reason_code: Some(ReasonCode::new("test")),
                    namespace: None,
                };

                let mut config = ProjectConfig::default();
                config.quotas.push(quota);

                Arc::new(ProjectInfo {
                    config,
                    ..Default::default()
                })
            };

            let project_metrics = |scoping| ProjectBuckets {
                buckets: vec![Bucket {
                    name: "d:transactions/bar".into(),
                    value: BucketValue::Counter(FiniteF64::new(1.0).unwrap()),
                    timestamp: UnixTimestamp::now(),
                    tags: Default::default(),
                    width: 10,
                    metadata: BucketMetadata::default(),
                }],
                rate_limits: Default::default(),
                project_info: project_info.clone(),
                scoping,
            };

            let buckets = hashbrown::HashMap::from([
                (
                    rate_limited_org.project_key,
                    project_metrics(rate_limited_org),
                ),
                (
                    not_rate_limited_org.project_key,
                    project_metrics(not_rate_limited_org),
                ),
            ]);

            FlushBuckets {
                partition_key: 0,
                buckets,
            }
        };

        // ensure the order of the map while iterating is as expected.
        assert_eq!(message.buckets.keys().count(), 2);

        let config = {
            let config_json = serde_json::json!({
                "processing": {
                    "enabled": true,
                    "kafka_config": [],
                    "redis": {
                        "server": std::env::var("RELAY_REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_owned()),
                    }
                }
            });
            Config::from_json_value(config_json).unwrap()
        };

        let (store, handle) = {
            // Collects the organization id of every metrics message reaching the store.
            let f = |org_ids: &mut Vec<OrganizationId>, msg: Store| {
                let org_id = match msg {
                    Store::Metrics(x) => x.scoping.organization_id,
                    _ => panic!("received envelope when expecting only metrics"),
                };
                org_ids.push(org_id);
            };

            mock_service("store_forwarder", vec![], f)
        };

        let processor = create_test_processor(config).await;
        assert!(processor.redis_rate_limiter_enabled());

        processor.encode_metrics_processing(message, &store).await;

        drop(store);
        let orgs_not_ratelimited = handle.await.unwrap();

        assert_eq!(
            orgs_not_ratelimited,
            vec![not_rate_limited_org.organization_id]
        );
    }

    /// PII scrubbing must mask IP-like digits in the User-Agent header while browser name and
    /// version are still extracted into the browser context.
    #[tokio::test]
    async fn test_browser_version_extraction_with_pii_like_data() {
        let processor = create_test_processor(Default::default()).await;
        let outcome_aggregator = Addr::dummy();
        let event_id = EventId::new();

        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
            .parse()
            .unwrap();

        let request_meta = RequestMeta::new(dsn);
        let mut envelope = Envelope::from_request(Some(event_id), request_meta);

        envelope.add_item({
                let mut item = Item::new(ItemType::Event);
                item.set_payload(
                    ContentType::Json,
                    r#"
                    {
                        "request": {
                            "headers": [
                                ["User-Agent", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36"]
                            ]
                        }
                    }
                "#,
                );
                item
            });

        let mut datascrubbing_settings = DataScrubbingConfig::default();
        // enable all the default scrubbing
        datascrubbing_settings.scrub_data = true;
        datascrubbing_settings.scrub_defaults = true;
        datascrubbing_settings.scrub_ip_addresses = true;

        // Make sure to mask any IP-like looking data
        let pii_config = serde_json::from_str(r#"{"applications": {"**": ["@ip:mask"]}}"#).unwrap();

        let config = ProjectConfig {
            datascrubbing_settings,
            pii_config: Some(pii_config),
            ..Default::default()
        };

        let project_info = ProjectInfo {
            config,
            ..Default::default()
        };

        let mut envelopes = ProcessingGroup::split_envelope(*envelope, &Default::default());
        assert_eq!(envelopes.len(), 1);

        let (group, envelope) = envelopes.pop().unwrap();
        let envelope = ManagedEnvelope::new(envelope, outcome_aggregator);

        let message = ProcessEnvelopeGrouped {
            group,
            envelope,
            ctx: processing::Context {
                project_info: &project_info,
                ..processing::Context::for_test()
            },
            reservoir_counters: &ReservoirCounters::default(),
        };

        let Ok(Some(Submit::Envelope(mut new_envelope))) =
            processor.process(&mut Token::noop(), message).await
        else {
            panic!();
        };
        let new_envelope = new_envelope.envelope_mut();

        let event_item = new_envelope.items().last().unwrap();
        let annotated_event: Annotated<Event> =
            Annotated::from_json_bytes(&event_item.payload()).unwrap();
        let event = annotated_event.into_value().unwrap();
        let headers = event
            .request
            .into_value()
            .unwrap()
            .headers
            .into_value()
            .unwrap();

        // IP-like data must be masked
        assert_eq!(
            Some(
                "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/********* Safari/537.36"
            ),
            headers.get_header("User-Agent")
        );
        // But we still get correct browser and version number
        let contexts = event.contexts.into_value().unwrap();
        let browser = contexts.0.get("browser").unwrap();
        assert_eq!(
            r#"{"browser":"Chrome 103.0.0","name":"Chrome","version":"103.0.0","type":"browser"}"#,
            browser.to_json().unwrap()
        );
    }

    /// The envelope's dynamic sampling context must be materialized into the event's `_dsc`
    /// field during processing.
    #[tokio::test]
    #[cfg(feature = "processing")]
    async fn test_materialize_dsc() {
        use crate::services::projects::project::PublicKeyConfig;

        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
            .parse()
            .unwrap();
        let request_meta = RequestMeta::new(dsn);
        let mut envelope = Envelope::from_request(None, request_meta);

        let dsc = r#"{
            "trace_id": "00000000-0000-0000-0000-000000000000",
            "public_key": "e12d836b15bb49d7bbf99e64295d995b",
            "sample_rate": "0.2"
        }"#;
        envelope.set_dsc(serde_json::from_str(dsc).unwrap());

        let mut item = Item::new(ItemType::Event);
        item.set_payload(ContentType::Json, r#"{}"#);
        envelope.add_item(item);

        let outcome_aggregator = Addr::dummy();
        let managed_envelope = ManagedEnvelope::new(envelope, outcome_aggregator);

        let mut project_info = ProjectInfo::default();
        project_info.public_keys.push(PublicKeyConfig {
            public_key: ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(),
            numeric_id: Some(1),
        });

        let config = serde_json::json!({
            "processing": {
                "enabled": true,
                "kafka_config": [],
            }
        });

        let message = ProcessEnvelopeGrouped {
            group: ProcessingGroup::Transaction,
            envelope: managed_envelope,
            ctx: processing::Context {
                config: &Config::from_json_value(config.clone()).unwrap(),
                project_info: &project_info,
                sampling_project_info: Some(&project_info),
                ..processing::Context::for_test()
            },
            reservoir_counters: &ReservoirCounters::default(),
        };

        let processor = create_test_processor(Config::from_json_value(config).unwrap()).await;
        let Ok(Some(Submit::Envelope(envelope))) =
            processor.process(&mut Token::noop(), message).await
        else {
            panic!();
        };
        let event = envelope
            .envelope()
            .get_item_by(|item| item.ty() == &ItemType::Event)
            .unwrap();

        let event = Annotated::<Event>::from_json_bytes(&event.payload()).unwrap();
        insta::assert_debug_snapshot!(event.value().unwrap()._dsc, @r###"
        Object(
            {
                "environment": ~,
                "public_key": String(
                    "e12d836b15bb49d7bbf99e64295d995b",
                ),
                "release": ~,
                "replay_id": ~,
                "sample_rate": String(
                    "0.2",
                ),
                "trace_id": String(
                    "00000000000000000000000000000000",
                ),
                "transaction": ~,
            },
        )
        "###);
    }

    /// Normalizes a test transaction event with the given name and source, returning the statsd
    /// metrics captured during normalization.
    fn capture_test_event(transaction_name: &str, source: TransactionSource) -> Vec<String> {
        let mut event = Annotated::<Event>::from_json(
            r#"
            {
                "type": "transaction",
                "transaction": "/foo/",
                "timestamp": 946684810.0,
                "start_timestamp": 946684800.0,
                "contexts": {
                    "trace": {
                        "trace_id": "4c79f60c11214eb38604f4ae0781bfb2",
                        "span_id": "fa90fdead5f74053",
                        "op": "http.server",
                        "type": "trace"
                    }
                },
                "transaction_info": {
                    "source": "url"
                }
            }
            "#,
        )
        .unwrap();
        let e = event.value_mut().as_mut().unwrap();
        e.transaction.set_value(Some(transaction_name.into()));

        e.transaction_info
            .value_mut()
            .as_mut()
            .unwrap()
            .source
            .set_value(Some(source));

        relay_statsd::with_capturing_test_client(|| {
            utils::log_transaction_name_metrics(&mut event, |event| {
                let config = NormalizationConfig {
                    transaction_name_config: TransactionNameConfig {
                        rules: &[TransactionNameRule {
                            pattern: LazyGlob::new("/foo/*/**".to_owned()),
                            expiry: DateTime::<Utc>::MAX_UTC,
                            redaction: RedactionRule::Replace {
                                substitution: "*".to_owned(),
                            },
                        }],
                    },
                    ..Default::default()
                };
                relay_event_normalization::normalize_event(event, &config)
            });
        })
    }

    /// Neither the rule nor the pattern applies: `changes:none`.
    #[test]
    fn test_log_transaction_metrics_none() {
        let captures = capture_test_event("/nothing", TransactionSource::Url);
        insta::assert_debug_snapshot!(captures, @r###"
        [
            "event.transaction_name_changes:1|c|#source_in:url,changes:none,source_out:sanitized,is_404:false",
        ]
        "###);
    }

    /// Only the configured rule applies: `changes:rule`.
    #[test]
    fn test_log_transaction_metrics_rule() {
        let captures = capture_test_event("/foo/john/denver", TransactionSource::Url);
        insta::assert_debug_snapshot!(captures, @r###"
        [
            "event.transaction_name_changes:1|c|#source_in:url,changes:rule,source_out:sanitized,is_404:false",
        ]
        "###);
    }

    /// Only the pattern applies: `changes:pattern`.
    #[test]
    fn test_log_transaction_metrics_pattern() {
        let captures = capture_test_event("/something/12345", TransactionSource::Url);
        insta::assert_debug_snapshot!(captures, @r###"
        [
            "event.transaction_name_changes:1|c|#source_in:url,changes:pattern,source_out:sanitized,is_404:false",
        ]
        "###);
    }

    /// Both the rule and the pattern apply: `changes:both`.
    #[test]
    fn test_log_transaction_metrics_both() {
        let captures = capture_test_event("/foo/john/12345", TransactionSource::Url);
        insta::assert_debug_snapshot!(captures, @r###"
        [
            "event.transaction_name_changes:1|c|#source_in:url,changes:both,source_out:sanitized,is_404:false",
        ]
        "###);
    }

    /// A `route` source is not sanitized: `changes:none`, `source_out:route`.
    #[test]
    fn test_log_transaction_metrics_no_match() {
        let captures = capture_test_event("/foo/john/12345", TransactionSource::Route);
        insta::assert_debug_snapshot!(captures, @r###"
        [
            "event.transaction_name_changes:1|c|#source_in:route,changes:none,source_out:route,is_404:false",
        ]
        "###);
    }

    /// Confirms that the hardcoded value we use for the fixed length of the measurement MRI is
    /// correct. Unit test is placed here because it has dependencies to relay-server and therefore
    /// cannot be called from relay-metrics.
    #[test]
    fn test_mri_overhead_constant() {
        let hardcoded_value = MeasurementsConfig::MEASUREMENT_MRI_OVERHEAD;

        let derived_value = {
            let name = "foobar".to_owned();
            let value = 5.into(); // Arbitrary value.
            let unit = MetricUnit::Duration(DurationUnit::default());
            let tags = TransactionMeasurementTags {
                measurement_rating: None,
                universal_tags: CommonTags(BTreeMap::new()),
                score_profile_version: None,
            };

            let measurement = TransactionMetric::Measurement {
                name: name.clone(),
                value,
                unit,
                tags,
            };

            let metric: Bucket = measurement.into_metric(UnixTimestamp::now());
            metric.name.len() - unit.to_string().len() - name.len()
        };
        assert_eq!(
            hardcoded_value, derived_value,
            "Update `MEASUREMENT_MRI_OVERHEAD` if the naming scheme changed."
        );
    }

    /// `received_at` bucket metadata must be backfilled for external buckets and left unset for
    /// internal ones.
    #[tokio::test]
    async fn test_process_metrics_bucket_metadata() {
        let mut token = Cogs::noop().timed(ResourceId::Relay, AppFeature::Unattributed);
        let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
        let received_at = Utc::now();
        let config = Config::default();

        let (aggregator, mut aggregator_rx) = Addr::custom();
        let processor = create_test_processor_with_addrs(
            config,
            Addrs {
                aggregator,
                ..Default::default()
            },
        )
        .await;

        let mut item = Item::new(ItemType::Statsd);
        item.set_payload(
            ContentType::Text,
            "transactions/foo:3182887624:4267882815|s",
        );
        for (source, expected_received_at) in [
            (
                BucketSource::External,
                Some(UnixTimestamp::from_datetime(received_at).unwrap()),
            ),
            (BucketSource::Internal, None),
        ] {
            let message = ProcessMetrics {
                data: MetricData::Raw(vec![item.clone()]),
                project_key,
                source,
                received_at,
                sent_at: Some(Utc::now()),
            };
            processor.handle_process_metrics(&mut token, message);

            let Aggregator::MergeBuckets(merge_buckets) = aggregator_rx.recv().await.unwrap();
            let buckets = merge_buckets.buckets;
            assert_eq!(buckets.len(), 1);
            assert_eq!(buckets[0].metadata.received_at, expected_received_at);
        }
    }

    /// A batched metrics payload must be split into one `MergeBuckets` message per project key.
    #[tokio::test]
    async fn test_process_batched_metrics() {
        let mut token = Cogs::noop().timed(ResourceId::Relay, AppFeature::Unattributed);
        let received_at = Utc::now();
        let config = Config::default();

        let (aggregator, mut aggregator_rx) = Addr::custom();
        let processor = create_test_processor_with_addrs(
            config,
            Addrs {
                aggregator,
                ..Default::default()
            },
        )
        .await;

        let payload = r#"{
    "buckets": {
        "11111111111111111111111111111111": [
            {
                "timestamp": 1615889440,
                "width": 0,
                "name": "d:custom/endpoint.response_time@millisecond",
                "type": "d",
                "value": [
                  68.0
                ],
                "tags": {
                  "route": "user_index"
                }
            }
        ],
        "22222222222222222222222222222222": [
            {
                "timestamp": 1615889440,
                "width": 0,
                "name": "d:custom/endpoint.cache_rate@none",
                "type": "d",
                "value": [
                  36.0
                ]
            }
        ]
    }
}
"#;
        let message = ProcessBatchedMetrics {
            payload: Bytes::from(payload),
            source: BucketSource::Internal,
            received_at,
            sent_at: Some(Utc::now()),
        };
        processor.handle_process_batched_metrics(&mut token, message);

        let Aggregator::MergeBuckets(mb1) = aggregator_rx.recv().await.unwrap();
        let Aggregator::MergeBuckets(mb2) = aggregator_rx.recv().await.unwrap();

        let mut messages = vec![mb1, mb2];
        messages.sort_by_key(|mb| mb.project_key);

        let actual = messages
            .into_iter()
            .map(|mb| (mb.project_key, mb.buckets))
            .collect::<Vec<_>>();

        assert_debug_snapshot!(actual, @r###"
        [
            (
                ProjectKey("11111111111111111111111111111111"),
                [
                    Bucket {
                        timestamp: UnixTimestamp(1615889440),
                        width: 0,
                        name: MetricName(
                            "d:custom/endpoint.response_time@millisecond",
                        ),
                        value: Distribution(
                            [
                                68.0,
                            ],
                        ),
                        tags: {
                            "route": "user_index",
                        },
                        metadata: BucketMetadata {
                            merges: 1,
                            received_at: None,
                            extracted_from_indexed: false,
                        },
                    },
                ],
            ),
            (
                ProjectKey("22222222222222222222222222222222"),
                [
                    Bucket {
                        timestamp: UnixTimestamp(1615889440),
                        width: 0,
                        name: MetricName(
                            "d:custom/endpoint.cache_rate@none",
                        ),
                        value: Distribution(
                            [
                                36.0,
                            ],
                        ),
                        tags: {},
                        metadata: BucketMetadata {
                            merges: 1,
                            received_at: None,
                            extracted_from_indexed: false,
                        },
                    },
                ],
            ),
        ]
        "###);
    }
}