relay_server/services/processor.rs

use std::borrow::Cow;
use std::collections::{BTreeSet, HashMap};
use std::error::Error;
use std::fmt::{Debug, Display};
use std::future::Future;
use std::io::Write;
use std::pin::Pin;
use std::sync::{Arc, Once};
use std::time::Duration;

use anyhow::Context;
use brotli::CompressorWriter as BrotliEncoder;
use bytes::Bytes;
use chrono::{DateTime, Utc};
use flate2::Compression;
use flate2::write::{GzEncoder, ZlibEncoder};
use futures::FutureExt;
use futures::future::BoxFuture;
use relay_base_schema::project::{ProjectId, ProjectKey};
use relay_cogs::{AppFeature, Cogs, FeatureWeights, ResourceId, Token};
use relay_common::time::UnixTimestamp;
use relay_config::{Config, HttpEncoding, NormalizationLevel, RelayMode};
use relay_dynamic_config::{CombinedMetricExtractionConfig, ErrorBoundary, Feature, GlobalConfig};
use relay_event_normalization::{
    ClockDriftProcessor, CombinedMeasurementsConfig, EventValidationConfig, GeoIpLookup,
    MeasurementsConfig, NormalizationConfig, RawUserAgentInfo, TransactionNameConfig,
    normalize_event, validate_event,
};
use relay_event_schema::processor::ProcessingAction;
use relay_event_schema::protocol::{
    ClientReport, Event, EventId, EventType, IpAddr, Metrics, NetworkReportError,
};
use relay_filter::FilterStatKey;
use relay_metrics::{Bucket, BucketMetadata, BucketView, BucketsView, MetricNamespace};
use relay_pii::PiiConfigError;
use relay_protocol::{Annotated, Empty};
use relay_quotas::{DataCategory, Quota, RateLimits, Scoping};
use relay_sampling::evaluation::{ReservoirCounters, ReservoirEvaluator, SamplingDecision};
use relay_statsd::metric;
use relay_system::{Addr, FromMessage, NoResponse, Service};
use reqwest::header;
use smallvec::{SmallVec, smallvec};
use zstd::stream::Encoder as ZstdEncoder;

use crate::constants::DEFAULT_EVENT_RETENTION;
use crate::envelope::{self, ContentType, Envelope, EnvelopeError, Item, ItemType};
use crate::extractors::{PartialDsn, RequestMeta};
use crate::managed::{InvalidProcessingGroupType, ManagedEnvelope, TypedEnvelope};
use crate::metrics::{MetricOutcomes, MetricsLimiter, MinimalTrackableBucket};
use crate::metrics_extraction::transactions::types::ExtractMetricsError;
use crate::metrics_extraction::transactions::{ExtractedMetrics, TransactionExtractor};
use crate::processing::logs::{LogOutput, LogsProcessor};
use crate::processing::{Forward as _, Output, Processor as _, QuotaRateLimiter};
use crate::service::ServiceError;
use crate::services::global_config::GlobalConfigHandle;
use crate::services::metrics::{Aggregator, FlushBuckets, MergeBuckets, ProjectBuckets};
use crate::services::outcome::{DiscardItemType, DiscardReason, Outcome, TrackOutcome};
use crate::services::processor::event::FiltersStatus;
use crate::services::projects::cache::ProjectCacheHandle;
use crate::services::projects::project::{ProjectInfo, ProjectState};
use crate::services::test_store::{Capture, TestStore};
use crate::services::upstream::{
    SendRequest, Sign, SignatureType, UpstreamRelay, UpstreamRequest, UpstreamRequestError,
};
use crate::statsd::{RelayCounters, RelayHistograms, RelayTimers};
use crate::utils::{self, CheckLimits, EnvelopeLimiter, SamplingResult};
use crate::{http, processing};
use relay_base_schema::organization::OrganizationId;
use relay_threading::AsyncPool;
#[cfg(feature = "processing")]
use {
    crate::managed::ItemAction,
    crate::services::global_rate_limits::{GlobalRateLimits, GlobalRateLimitsServiceHandle},
    crate::services::processor::nnswitch::SwitchProcessingError,
    crate::services::store::{Store, StoreEnvelope},
    crate::utils::Enforcement,
    itertools::Itertools,
    relay_cardinality::{
        CardinalityLimit, CardinalityLimiter, CardinalityLimitsSplit, RedisSetLimiter,
        RedisSetLimiterOptions,
    },
    relay_dynamic_config::{CardinalityLimiterMode, MetricExtractionGroups},
    relay_quotas::{RateLimitingError, RedisRateLimiter},
    relay_redis::{AsyncRedisClient, RedisClients},
    std::time::Instant,
    symbolic_unreal::{Unreal4Error, Unreal4ErrorKind},
};

mod attachment;
mod dynamic_sampling;
mod event;
mod metrics;
mod nel;
mod profile;
mod profile_chunk;
mod replay;
mod report;
mod session;
mod span;
mod transaction;
pub use span::extract_transaction_span;

#[cfg(all(sentry, feature = "processing"))]
mod playstation;
mod standalone;
#[cfg(feature = "processing")]
mod unreal;

#[cfg(feature = "processing")]
mod nnswitch;

/// Compiles the block only when built with the `processing` feature.
///
/// The provided code block is executed only if the provided config has `processing_enabled` set.
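///
/// A minimal usage sketch (hypothetical call site):
///
/// ```ignore
/// if_processing!(config, {
///     // Runs only with the `processing` feature when the config enables processing.
/// } else {
///     // Fallback; also the only branch compiled without the feature.
/// });
/// ```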
macro_rules! if_processing {
    ($config:expr, $if_true:block) => {
        #[cfg(feature = "processing")] {
            if $config.processing_enabled() $if_true
        }
    };
    ($config:expr, $if_true:block else $if_false:block) => {
        {
            #[cfg(feature = "processing")] {
                if $config.processing_enabled() $if_true else $if_false
            }
            #[cfg(not(feature = "processing"))] {
                $if_false
            }
        }
    };
}

/// The minimum clock drift for correction to apply.
const MINIMUM_CLOCK_DRIFT: Duration = Duration::from_secs(55 * 60);

#[derive(Debug)]
pub struct GroupTypeError;

impl Display for GroupTypeError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("failed to convert processing group into corresponding type")
    }
}

impl std::error::Error for GroupTypeError {}

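/// Generates a marker type for a [`ProcessingGroup`] variant.
///
/// A sketch of the conversions generated, e.g. by `processing_group!(LogGroup, Log, Nel)` below:
///
/// ```ignore
/// let group: ProcessingGroup = LogGroup.into();        // ProcessingGroup::Log
/// let log = LogGroup::try_from(ProcessingGroup::Nel);  // Ok(LogGroup)
/// ```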
macro_rules! processing_group {
    ($ty:ident, $variant:ident$(, $($other:ident),+)?) => {
        #[derive(Clone, Copy, Debug)]
        pub struct $ty;

        impl From<$ty> for ProcessingGroup {
            fn from(_: $ty) -> Self {
                ProcessingGroup::$variant
            }
        }

        impl TryFrom<ProcessingGroup> for $ty {
            type Error = GroupTypeError;

            fn try_from(value: ProcessingGroup) -> Result<Self, Self::Error> {
                if matches!(value, ProcessingGroup::$variant) {
                    return Ok($ty);
                }
                $($(
                    if matches!(value, ProcessingGroup::$other) {
                        return Ok($ty);
                    }
                )+)?
                return Err(GroupTypeError);
            }
        }
    };
}

/// A marker trait.
///
/// Should only be implemented by groups that are responsible for processing envelopes with events.
pub trait EventProcessing {}

/// A trait for processing groups that can be dynamically sampled.
pub trait Sampling {
    /// Whether dynamic sampling should run under the given project's conditions.
    fn supports_sampling(project_info: &ProjectInfo) -> bool;

    /// Whether reservoir sampling applies to this processing group (a.k.a. data type).
    fn supports_reservoir_sampling() -> bool;
}

processing_group!(TransactionGroup, Transaction);
impl EventProcessing for TransactionGroup {}

impl Sampling for TransactionGroup {
    fn supports_sampling(project_info: &ProjectInfo) -> bool {
        // For transactions, we require transaction metrics to be enabled before sampling.
        matches!(&project_info.config.transaction_metrics, Some(ErrorBoundary::Ok(c)) if c.is_enabled())
    }

    fn supports_reservoir_sampling() -> bool {
        true
    }
}

processing_group!(ErrorGroup, Error);
impl EventProcessing for ErrorGroup {}

processing_group!(SessionGroup, Session);
processing_group!(StandaloneGroup, Standalone);
processing_group!(ClientReportGroup, ClientReport);
processing_group!(ReplayGroup, Replay);
processing_group!(CheckInGroup, CheckIn);
processing_group!(LogGroup, Log, Nel);
processing_group!(SpanGroup, Span);

impl Sampling for SpanGroup {
    fn supports_sampling(project_info: &ProjectInfo) -> bool {
        // If no metrics can be extracted, do not sample anything.
        matches!(&project_info.config().metric_extraction, ErrorBoundary::Ok(c) if c.is_supported())
    }

    fn supports_reservoir_sampling() -> bool {
        false
    }
}

processing_group!(ProfileChunkGroup, ProfileChunk);
processing_group!(MetricsGroup, Metrics);
processing_group!(ForwardUnknownGroup, ForwardUnknown);
processing_group!(Ungrouped, Ungrouped);

/// Processed group type marker.
///
/// Marks the envelopes which have passed through the processing pipeline.
#[derive(Clone, Copy, Debug)]
pub struct Processed;

/// Describes the groups of processable items.
#[derive(Clone, Copy, Debug)]
pub enum ProcessingGroup {
    /// All the transaction related items.
    ///
    /// Includes transactions, related attachments, and profiles.
    Transaction,
    /// All the items which require (have or create) events.
    ///
    /// This includes: errors, NEL, security reports, user reports, and some of the
    /// attachments.
    Error,
    /// Session events.
    Session,
    /// Standalone items which can be sent alone without any event attached to them in the current
    /// envelope, e.g. some attachments and user reports.
    Standalone,
    /// Outcomes.
    ClientReport,
    /// Replays and ReplayRecordings.
    Replay,
    /// Crons.
    CheckIn,
    /// NEL reports.
    Nel,
    /// Logs.
    Log,
    /// Spans.
    Span,
    /// Metrics.
    Metrics,
    /// ProfileChunk.
    ProfileChunk,
    /// Unknown item types will be forwarded upstream (to a processing Relay), where we will
    /// decide what to do with them.
    ForwardUnknown,
    /// All the items in the envelope that could not be grouped.
    Ungrouped,
}

impl ProcessingGroup {
    /// Splits the provided envelope into a list of (group, envelope) tuples.
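    ///
    /// A sketch of the intended call pattern (names assumed):
    ///
    /// ```ignore
    /// for (group, envelope) in ProcessingGroup::split_envelope(envelope) {
    ///     // Each returned envelope only contains items that belong to `group`.
    /// }
    /// ```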
    pub fn split_envelope(mut envelope: Envelope) -> SmallVec<[(Self, Box<Envelope>); 3]> {
        let headers = envelope.headers().clone();
        let mut grouped_envelopes = smallvec![];

        // Extract replays.
        let replay_items = envelope.take_items_by(|item| {
            matches!(
                item.ty(),
                &ItemType::ReplayEvent | &ItemType::ReplayRecording | &ItemType::ReplayVideo
            )
        });
        if !replay_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Replay,
                Envelope::from_parts(headers.clone(), replay_items),
            ))
        }

        // Keep all the sessions together in one envelope.
        let session_items = envelope
            .take_items_by(|item| matches!(item.ty(), &ItemType::Session | &ItemType::Sessions));
        if !session_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Session,
                Envelope::from_parts(headers.clone(), session_items),
            ))
        }

        // Extract spans.
        let span_items = envelope.take_items_by(|item| {
            matches!(
                item.ty(),
                &ItemType::Span | &ItemType::OtelSpan | &ItemType::OtelTracesData
            )
        });

        if !span_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Span,
                Envelope::from_parts(headers.clone(), span_items),
            ))
        }

        // Extract logs.
        let logs_items =
            envelope.take_items_by(|item| matches!(item.ty(), &ItemType::Log | &ItemType::OtelLog));

        if !logs_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Log,
                Envelope::from_parts(headers.clone(), logs_items),
            ))
        }

        // NEL items are transformed into logs in their own processing step.
        let nel_items = envelope.take_items_by(|item| matches!(item.ty(), &ItemType::Nel));
        if !nel_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Nel,
                Envelope::from_parts(headers.clone(), nel_items),
            ))
        }

        // Extract all metric items.
        //
        // Note: Should only be relevant in proxy mode. In other modes we send metrics through
        // a separate pipeline.
        let metric_items = envelope.take_items_by(|i| i.ty().is_metrics());
        if !metric_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Metrics,
                Envelope::from_parts(headers.clone(), metric_items),
            ))
        }

        // Extract profile chunks.
        let profile_chunk_items =
            envelope.take_items_by(|item| matches!(item.ty(), &ItemType::ProfileChunk));
        if !profile_chunk_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::ProfileChunk,
                Envelope::from_parts(headers.clone(), profile_chunk_items),
            ))
        }

        // Extract all standalone items.
        //
        // Note: this only happens if there are no items in the envelope which can create events;
        // otherwise they stay in the same envelope as the event-creating items.
        if !envelope.items().any(Item::creates_event) {
            let standalone_items = envelope.take_items_by(Item::requires_event);
            if !standalone_items.is_empty() {
                grouped_envelopes.push((
                    ProcessingGroup::Standalone,
                    Envelope::from_parts(headers.clone(), standalone_items),
                ))
            }
        };

        // Make sure we create a separate envelope for each `RawSecurity` report.
        let security_reports_items = envelope
            .take_items_by(|i| matches!(i.ty(), &ItemType::RawSecurity))
            .into_iter()
            .map(|item| {
                let headers = headers.clone();
                let items: SmallVec<[Item; 3]> = smallvec![item.clone()];
                let mut envelope = Envelope::from_parts(headers, items);
                envelope.set_event_id(EventId::new());
                (ProcessingGroup::Error, envelope)
            });
        grouped_envelopes.extend(security_reports_items);

        // Extract all the items which require an event into a separate envelope.
        let require_event_items = envelope.take_items_by(Item::requires_event);
        if !require_event_items.is_empty() {
            let group = if require_event_items
                .iter()
                .any(|item| matches!(item.ty(), &ItemType::Transaction | &ItemType::Profile))
            {
                ProcessingGroup::Transaction
            } else {
                ProcessingGroup::Error
            };

            grouped_envelopes.push((
                group,
                Envelope::from_parts(headers.clone(), require_event_items),
            ))
        }

        // Get the rest of the envelopes, one per item.
        let envelopes = envelope.items_mut().map(|item| {
            let headers = headers.clone();
            let items: SmallVec<[Item; 3]> = smallvec![item.clone()];
            let envelope = Envelope::from_parts(headers, items);
            let item_type = item.ty();
            let group = if matches!(item_type, &ItemType::CheckIn) {
                ProcessingGroup::CheckIn
            } else if matches!(item.ty(), &ItemType::ClientReport) {
                ProcessingGroup::ClientReport
            } else if matches!(item_type, &ItemType::Unknown(_)) {
                ProcessingGroup::ForwardUnknown
            } else {
                // Cannot group this item type.
                ProcessingGroup::Ungrouped
            };

            (group, envelope)
        });
        grouped_envelopes.extend(envelopes);

        grouped_envelopes
    }

    /// Returns the name of the group.
    pub fn variant(&self) -> &'static str {
        match self {
            ProcessingGroup::Transaction => "transaction",
            ProcessingGroup::Error => "error",
            ProcessingGroup::Session => "session",
            ProcessingGroup::Standalone => "standalone",
            ProcessingGroup::ClientReport => "client_report",
            ProcessingGroup::Replay => "replay",
            ProcessingGroup::CheckIn => "check_in",
            ProcessingGroup::Log => "log",
            ProcessingGroup::Nel => "nel",
            ProcessingGroup::Span => "span",
            ProcessingGroup::Metrics => "metrics",
            ProcessingGroup::ProfileChunk => "profile_chunk",
            ProcessingGroup::ForwardUnknown => "forward_unknown",
            ProcessingGroup::Ungrouped => "ungrouped",
        }
    }
}

impl From<ProcessingGroup> for AppFeature {
    fn from(value: ProcessingGroup) -> Self {
        match value {
            ProcessingGroup::Transaction => AppFeature::Transactions,
            ProcessingGroup::Error => AppFeature::Errors,
            ProcessingGroup::Session => AppFeature::Sessions,
            ProcessingGroup::Standalone => AppFeature::UnattributedEnvelope,
            ProcessingGroup::ClientReport => AppFeature::ClientReports,
            ProcessingGroup::Replay => AppFeature::Replays,
            ProcessingGroup::CheckIn => AppFeature::CheckIns,
            ProcessingGroup::Log => AppFeature::Logs,
            ProcessingGroup::Nel => AppFeature::Logs,
            ProcessingGroup::Span => AppFeature::Spans,
            ProcessingGroup::Metrics => AppFeature::UnattributedMetrics,
            ProcessingGroup::ProfileChunk => AppFeature::Profiles,
            ProcessingGroup::ForwardUnknown => AppFeature::UnattributedEnvelope,
            ProcessingGroup::Ungrouped => AppFeature::UnattributedEnvelope,
        }
    }
}

/// An error returned when handling [`ProcessEnvelope`].
#[derive(Debug, thiserror::Error)]
pub enum ProcessingError {
    #[error("invalid json in event")]
    InvalidJson(#[source] serde_json::Error),

    #[error("invalid message pack event payload")]
    InvalidMsgpack(#[from] rmp_serde::decode::Error),

    #[cfg(feature = "processing")]
    #[error("invalid unreal crash report")]
    InvalidUnrealReport(#[source] Unreal4Error),

    #[error("event payload too large")]
    PayloadTooLarge(DiscardItemType),

    #[error("invalid transaction event")]
    InvalidTransaction,

    #[error("envelope processor failed")]
    ProcessingFailed(#[from] ProcessingAction),

    #[error("duplicate {0} in event")]
    DuplicateItem(ItemType),

    #[error("failed to extract event payload")]
    NoEventPayload,

    #[error("missing project id in DSN")]
    MissingProjectId,

    #[error("invalid security report type: {0:?}")]
    InvalidSecurityType(Bytes),

    #[error("unsupported security report type")]
    UnsupportedSecurityType,

    #[error("invalid security report")]
    InvalidSecurityReport(#[source] serde_json::Error),

    #[error("invalid nel report")]
    InvalidNelReport(#[source] NetworkReportError),

    #[error("event filtered with reason: {0:?}")]
    EventFiltered(FilterStatKey),

    #[error("missing or invalid required event timestamp")]
    InvalidTimestamp,

    #[error("could not serialize event payload")]
    SerializeFailed(#[source] serde_json::Error),

    #[cfg(feature = "processing")]
    #[error("failed to apply quotas")]
    QuotasFailed(#[from] RateLimitingError),

    #[error("invalid pii config")]
    PiiConfigError(PiiConfigError),

    #[error("invalid processing group type")]
    InvalidProcessingGroup(Box<InvalidProcessingGroupType>),

    #[error("invalid replay")]
    InvalidReplay(DiscardReason),

    #[error("replay filtered with reason: {0:?}")]
    ReplayFiltered(FilterStatKey),

    #[cfg(feature = "processing")]
    #[error("nintendo switch dying message processing failed: {0:?}")]
    InvalidNintendoDyingMessage(#[source] SwitchProcessingError),

    #[cfg(all(sentry, feature = "processing"))]
    #[error("playstation dump processing failed: {0}")]
    InvalidPlaystationDump(String),

    #[error("processing group does not match specific processor")]
    ProcessingGroupMismatch,

    #[error("new processing pipeline failed")]
    ProcessingFailure,

    #[error("failed to serialize processing result to an envelope")]
    ProcessingEnvelopeSerialization,
}

impl ProcessingError {
    pub fn to_outcome(&self) -> Option<Outcome> {
        match self {
            Self::PayloadTooLarge(payload_type) => {
                Some(Outcome::Invalid(DiscardReason::TooLarge(*payload_type)))
            }
            Self::InvalidJson(_) => Some(Outcome::Invalid(DiscardReason::InvalidJson)),
            Self::InvalidMsgpack(_) => Some(Outcome::Invalid(DiscardReason::InvalidMsgpack)),
            Self::InvalidSecurityType(_) => {
                Some(Outcome::Invalid(DiscardReason::SecurityReportType))
            }
            Self::InvalidSecurityReport(_) => Some(Outcome::Invalid(DiscardReason::SecurityReport)),
            Self::UnsupportedSecurityType => Some(Outcome::Filtered(FilterStatKey::InvalidCsp)),
            Self::InvalidNelReport(_) => Some(Outcome::Invalid(DiscardReason::InvalidJson)),
            Self::InvalidTransaction => Some(Outcome::Invalid(DiscardReason::InvalidTransaction)),
            Self::InvalidTimestamp => Some(Outcome::Invalid(DiscardReason::Timestamp)),
            Self::DuplicateItem(_) => Some(Outcome::Invalid(DiscardReason::DuplicateItem)),
            Self::NoEventPayload => Some(Outcome::Invalid(DiscardReason::NoEventPayload)),
            #[cfg(feature = "processing")]
            Self::InvalidNintendoDyingMessage(_) => Some(Outcome::Invalid(DiscardReason::Payload)),
            #[cfg(all(sentry, feature = "processing"))]
            Self::InvalidPlaystationDump(_) => Some(Outcome::Invalid(DiscardReason::Payload)),
            #[cfg(feature = "processing")]
            Self::InvalidUnrealReport(err) if err.kind() == Unreal4ErrorKind::BadCompression => {
                Some(Outcome::Invalid(DiscardReason::InvalidCompression))
            }
            #[cfg(feature = "processing")]
            Self::InvalidUnrealReport(_) => Some(Outcome::Invalid(DiscardReason::ProcessUnreal)),
            Self::SerializeFailed(_) | Self::ProcessingFailed(_) => {
                Some(Outcome::Invalid(DiscardReason::Internal))
            }
            #[cfg(feature = "processing")]
            Self::QuotasFailed(_) => Some(Outcome::Invalid(DiscardReason::Internal)),
            Self::PiiConfigError(_) => Some(Outcome::Invalid(DiscardReason::ProjectStatePii)),
            Self::MissingProjectId => None,
            Self::EventFiltered(_) => None,
            Self::InvalidProcessingGroup(_) => None,
            Self::InvalidReplay(reason) => Some(Outcome::Invalid(*reason)),
            Self::ReplayFiltered(key) => Some(Outcome::Filtered(key.clone())),

            Self::ProcessingGroupMismatch => Some(Outcome::Invalid(DiscardReason::Internal)),
            // Outcomes are emitted in the new processing pipeline already.
            Self::ProcessingFailure => None,
            Self::ProcessingEnvelopeSerialization => {
                Some(Outcome::Invalid(DiscardReason::Internal))
            }
        }
    }

    fn is_unexpected(&self) -> bool {
        self.to_outcome()
            .is_some_and(|outcome| outcome.is_unexpected())
    }
}

#[cfg(feature = "processing")]
impl From<Unreal4Error> for ProcessingError {
    fn from(err: Unreal4Error) -> Self {
        match err.kind() {
            Unreal4ErrorKind::TooLarge => Self::PayloadTooLarge(ItemType::UnrealReport.into()),
            _ => ProcessingError::InvalidUnrealReport(err),
        }
    }
}

impl From<ExtractMetricsError> for ProcessingError {
    fn from(error: ExtractMetricsError) -> Self {
        match error {
            ExtractMetricsError::MissingTimestamp | ExtractMetricsError::InvalidTimestamp => {
                Self::InvalidTimestamp
            }
        }
    }
}

impl From<InvalidProcessingGroupType> for ProcessingError {
    fn from(value: InvalidProcessingGroupType) -> Self {
        Self::InvalidProcessingGroup(Box::new(value))
    }
}

type ExtractedEvent = (Annotated<Event>, usize);

/// A container for extracted metrics during processing.
///
/// The container enforces that the extracted metrics are correctly tagged
/// with the dynamic sampling decision.
#[derive(Debug)]
pub struct ProcessingExtractedMetrics {
    metrics: ExtractedMetrics,
}

impl ProcessingExtractedMetrics {
    pub fn new() -> Self {
        Self {
            metrics: ExtractedMetrics::default(),
        }
    }

    /// Extends the contained metrics with [`ExtractedMetrics`].
    pub fn extend(
        &mut self,
        extracted: ExtractedMetrics,
        sampling_decision: Option<SamplingDecision>,
    ) {
        self.extend_project_metrics(extracted.project_metrics, sampling_decision);
        self.extend_sampling_metrics(extracted.sampling_metrics, sampling_decision);
    }

    /// Extends the contained project metrics.
    pub fn extend_project_metrics<I>(
        &mut self,
        buckets: I,
        sampling_decision: Option<SamplingDecision>,
    ) where
        I: IntoIterator<Item = Bucket>,
    {
        self.metrics
            .project_metrics
            .extend(buckets.into_iter().map(|mut bucket| {
                bucket.metadata.extracted_from_indexed =
                    sampling_decision == Some(SamplingDecision::Keep);
                bucket
            }));
    }

    /// Extends the contained sampling metrics.
    pub fn extend_sampling_metrics<I>(
        &mut self,
        buckets: I,
        sampling_decision: Option<SamplingDecision>,
    ) where
        I: IntoIterator<Item = Bucket>,
    {
        self.metrics
            .sampling_metrics
            .extend(buckets.into_iter().map(|mut bucket| {
                bucket.metadata.extracted_from_indexed =
                    sampling_decision == Some(SamplingDecision::Keep);
                bucket
            }));
    }

    /// Applies rate limits to the contained metrics.
    ///
    /// This is used so that rate limits which have been enforced on the sampled items of an
    /// envelope also apply consistently to the metrics extracted from those items.
    #[cfg(feature = "processing")]
    fn apply_enforcement(&mut self, enforcement: &Enforcement, enforced_consistently: bool) {
        // Metric namespaces which need to be dropped.
        let mut drop_namespaces: SmallVec<[_; 2]> = smallvec![];
        // Metrics belonging to these namespaces need to have the `extracted_from_indexed`
        // flag reset to `false`.
        let mut reset_extracted_from_indexed: SmallVec<[_; 2]> = smallvec![];

        for (namespace, limit, indexed) in [
            (
                MetricNamespace::Transactions,
                &enforcement.event,
                &enforcement.event_indexed,
            ),
            (
                MetricNamespace::Spans,
                &enforcement.spans,
                &enforcement.spans_indexed,
            ),
        ] {
            if limit.is_active() {
                drop_namespaces.push(namespace);
            } else if indexed.is_active() && !enforced_consistently {
                // If the enforcement was not computed by consistently checking the limits,
                // the quota for the metrics has not yet been incremented.
                // In this case we have a dropped indexed payload but a metric which still needs
                // to be accounted for; make sure the metric is still rate limited.
                reset_extracted_from_indexed.push(namespace);
            }
        }

        if !drop_namespaces.is_empty() || !reset_extracted_from_indexed.is_empty() {
            self.retain_mut(|bucket| {
                let Some(namespace) = bucket.name.try_namespace() else {
                    return true;
                };

                if drop_namespaces.contains(&namespace) {
                    return false;
                }

                if reset_extracted_from_indexed.contains(&namespace) {
                    bucket.metadata.extracted_from_indexed = false;
                }

                true
            });
        }
    }

    #[cfg(feature = "processing")]
    fn retain_mut(&mut self, mut f: impl FnMut(&mut Bucket) -> bool) {
        self.metrics.project_metrics.retain_mut(&mut f);
        self.metrics.sampling_metrics.retain_mut(&mut f);
    }
}

fn send_metrics(
    metrics: ExtractedMetrics,
    project_key: ProjectKey,
    sampling_key: Option<ProjectKey>,
    aggregator: &Addr<Aggregator>,
) {
    let ExtractedMetrics {
        project_metrics,
        sampling_metrics,
    } = metrics;

    if !project_metrics.is_empty() {
        aggregator.send(MergeBuckets {
            project_key,
            buckets: project_metrics,
        });
    }

    if !sampling_metrics.is_empty() {
        // If no sampling project state is available, we associate the sampling
        // metrics with the current project.
        //
        // project_without_tracing         -> metrics go to self
        // dependent_project_with_tracing  -> metrics go to root
        // root_project_with_tracing       -> metrics go to root == self
        let sampling_project_key = sampling_key.unwrap_or(project_key);
        aggregator.send(MergeBuckets {
            project_key: sampling_project_key,
            buckets: sampling_metrics,
        });
    }
}

/// Returns the data category if there is an event.
///
/// The data category is computed from the event type. Both `Default` and `Error` events map to
/// the `Error` data category. If there is no event, `None` is returned.
fn event_category(event: &Annotated<Event>) -> Option<DataCategory> {
    event_type(event).map(DataCategory::from)
}

/// Returns the event type if there is an event.
///
/// If the event does not have a type, `Some(EventType::Default)` is assumed. If, in contrast, there
/// is no event, `None` is returned.
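///
/// A minimal sketch of the mapping:
///
/// ```ignore
/// assert_eq!(event_type(&Annotated::new(Event::default())), Some(EventType::Default));
/// assert_eq!(event_type(&Annotated::empty()), None);
/// ```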
fn event_type(event: &Annotated<Event>) -> Option<EventType> {
    event
        .value()
        .map(|event| event.ty.value().copied().unwrap_or_default())
}

/// Function for on-off switches that filter specific item types (profiles, spans)
/// based on a feature flag.
///
/// If the project config did not come from the upstream, we keep the items.
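///
/// A minimal sketch of the resulting behavior (`feature` is any [`Feature`] flag):
///
/// ```ignore
/// // Items are filtered only in managed mode, and only if the project lacks the feature.
/// let drop_items = should_filter(&config, &project_info, feature);
/// ```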
fn should_filter(config: &Config, project_info: &ProjectInfo, feature: Feature) -> bool {
    match config.relay_mode() {
        RelayMode::Proxy | RelayMode::Static | RelayMode::Capture => false,
        RelayMode::Managed => !project_info.has_feature(feature),
    }
}

/// New type representing the normalization state of the event.
#[derive(Copy, Clone)]
struct EventFullyNormalized(bool);

impl EventFullyNormalized {
    /// Creates a flag that is `true` if the event is fully normalized, `false` otherwise.
    pub fn new(envelope: &Envelope) -> Self {
        let event_fully_normalized = envelope.meta().is_from_internal_relay()
            && envelope
                .items()
                .any(|item| item.creates_event() && item.fully_normalized());

        Self(event_fully_normalized)
    }
}

/// New type representing whether metrics were extracted from transactions/spans.
#[derive(Debug, Copy, Clone)]
struct EventMetricsExtracted(bool);

/// New type representing whether spans were extracted.
#[derive(Debug, Copy, Clone)]
struct SpansExtracted(bool);

/// The result of envelope processing, containing the processed envelope along with the partial
/// result.
#[expect(
    clippy::large_enum_variant,
    reason = "until we have a better solution to combat the excessive growth of Item, see #4819"
)]
#[derive(Debug)]
enum ProcessingResult {
    Envelope {
        managed_envelope: TypedEnvelope<Processed>,
        extracted_metrics: ProcessingExtractedMetrics,
    },
    Logs(Output<LogOutput>),
}

impl ProcessingResult {
    /// Creates a [`ProcessingResult`] with no metrics.
    fn no_metrics(managed_envelope: TypedEnvelope<Processed>) -> Self {
        Self::Envelope {
            managed_envelope,
            extracted_metrics: ProcessingExtractedMetrics::new(),
        }
    }
}

/// All items which can be submitted upstream.
#[expect(
    clippy::large_enum_variant,
    reason = "until we have a better solution to combat the excessive growth of Item, see #4819"
)]
enum Submit {
    /// A processed envelope.
    Envelope(TypedEnvelope<Processed>),
    /// The output of the [log processor](LogsProcessor).
    Logs(LogOutput),
}

/// Applies processing to all contents of the given envelope.
///
/// Depending on the contents of the envelope and Relay's mode, this includes:
///
///  - Basic normalization and validation for all item types.
///  - Clock drift correction if the required `sent_at` header is present.
///  - Expansion of certain item types (e.g. unreal).
///  - Store normalization for event payloads in processing mode.
///  - Rate limiters and inbound filters on events in processing mode.
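///
/// A sketch of constructing this message (field values assumed available at the call site):
///
/// ```ignore
/// let message = ProcessEnvelope {
///     envelope: managed_envelope,
///     project_info,
///     rate_limits,
///     sampling_project_info: None,
///     reservoir_counters: ReservoirCounters::default(),
/// };
/// envelope_processor.send(message);
/// ```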
#[derive(Debug)]
pub struct ProcessEnvelope {
    /// Envelope to process.
    pub envelope: ManagedEnvelope,
    /// The project info.
    pub project_info: Arc<ProjectInfo>,
    /// Currently active cached rate limits for this project.
    pub rate_limits: Arc<RateLimits>,
    /// Root sampling project info.
    pub sampling_project_info: Option<Arc<ProjectInfo>>,
    /// Sampling reservoir counters.
    pub reservoir_counters: ReservoirCounters,
}

/// Like a [`ProcessEnvelope`], but with an envelope which has been grouped.
#[derive(Debug)]
struct ProcessEnvelopeGrouped {
    /// The group the envelope belongs to.
    pub group: ProcessingGroup,
    /// Envelope to process.
    pub envelope: ManagedEnvelope,
    /// The project info.
    pub project_info: Arc<ProjectInfo>,
    /// Currently active cached rate limits for this project.
    pub rate_limits: Arc<RateLimits>,
    /// Root sampling project info.
    pub sampling_project_info: Option<Arc<ProjectInfo>>,
    /// Sampling reservoir counters.
    pub reservoir_counters: ReservoirCounters,
}

/// Parses a list of metrics or metric buckets and pushes them to the project's aggregator.
///
/// This parses and validates the metrics:
///  - For [`Metrics`](ItemType::Statsd), each metric is parsed separately, and invalid metrics are
///    ignored independently.
///  - For [`MetricBuckets`](ItemType::MetricBuckets), the entire list of buckets is parsed and
///    dropped together on parsing failure.
///  - Other envelope items will be ignored with an error message.
///
/// Additionally, processing applies clock drift correction using the system clock of this Relay, if
/// the Envelope specifies the [`sent_at`](Envelope::sent_at) header.
#[derive(Debug)]
pub struct ProcessMetrics {
    /// A list of metric items.
    pub data: MetricData,
    /// The target project.
    pub project_key: ProjectKey,
    /// Whether to keep or reset the metric metadata.
    pub source: BucketSource,
    /// The wall clock time at which the request was received.
    pub received_at: DateTime<Utc>,
    /// The value of the Envelope's [`sent_at`](Envelope::sent_at) header for clock drift
    /// correction.
    pub sent_at: Option<DateTime<Utc>>,
}

/// Raw unparsed metric data.
#[derive(Debug)]
pub enum MetricData {
    /// Raw data, unparsed envelope items.
    Raw(Vec<Item>),
    /// Already parsed buckets but unprocessed.
    Parsed(Vec<Bucket>),
}

impl MetricData {
    /// Consumes the metric data and parses the contained buckets.
    ///
    /// If the contained data is already parsed the buckets are returned unchanged.
    /// Raw buckets are parsed and created with the passed `timestamp`.
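    ///
    /// A usage sketch (assuming `items` holds statsd or bucket envelope items):
    ///
    /// ```ignore
    /// let buckets = MetricData::Raw(items).into_buckets(UnixTimestamp::now());
    /// ```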
    fn into_buckets(self, timestamp: UnixTimestamp) -> Vec<Bucket> {
        let items = match self {
            Self::Parsed(buckets) => return buckets,
            Self::Raw(items) => items,
        };

        let mut buckets = Vec::new();
        for item in items {
            let payload = item.payload();
            if item.ty() == &ItemType::Statsd {
                for bucket_result in Bucket::parse_all(&payload, timestamp) {
                    match bucket_result {
                        Ok(bucket) => buckets.push(bucket),
                        Err(error) => relay_log::debug!(
                            error = &error as &dyn Error,
                            "failed to parse metric bucket from statsd format",
                        ),
                    }
                }
            } else if item.ty() == &ItemType::MetricBuckets {
                match serde_json::from_slice::<Vec<Bucket>>(&payload) {
                    Ok(parsed_buckets) => {
                        // Re-use the allocation of `parsed_buckets` if possible.
                        if buckets.is_empty() {
                            buckets = parsed_buckets;
                        } else {
                            buckets.extend(parsed_buckets);
                        }
                    }
                    Err(error) => {
                        relay_log::debug!(
                            error = &error as &dyn Error,
                            "failed to parse metric bucket",
                        );
                        metric!(counter(RelayCounters::MetricBucketsParsingFailed) += 1);
                    }
                }
            } else {
                relay_log::error!(
                    "invalid item of type {} passed to ProcessMetrics",
                    item.ty()
                );
            }
        }
        buckets
    }
}

#[derive(Debug)]
pub struct ProcessBatchedMetrics {
    /// Metrics payload in JSON format.
    pub payload: Bytes,
    /// Whether to keep or reset the metric metadata.
    pub source: BucketSource,
    /// The wall clock time at which the request was received.
    pub received_at: DateTime<Utc>,
    /// The value of the request's `sent_at` header for clock drift correction.
    pub sent_at: Option<DateTime<Utc>>,
}

/// Source information where a metric bucket originates from.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum BucketSource {
    /// The metric bucket originated from an internal Relay use case.
    ///
    /// The metric bucket either originates from within the same Relay
    /// or was accepted from another Relay which is registered as
    /// an internal Relay via Relay's configuration.
    Internal,
    /// The metric bucket originated from an untrusted source.
    ///
    /// Managed Relays sending extracted metrics are considered external;
    /// it's a project use case, but it comes from an untrusted source.
    External,
}

impl BucketSource {
    /// Infers the bucket source from [`RequestMeta::is_from_internal_relay`].
    pub fn from_meta(meta: &RequestMeta) -> Self {
        match meta.is_from_internal_relay() {
            true => Self::Internal,
            false => Self::External,
        }
    }
}

/// Sends client reports to the upstream.
#[derive(Debug)]
pub struct SubmitClientReports {
    /// The client reports to be sent.
    pub client_reports: Vec<ClientReport>,
    /// Scoping information for the client reports.
    pub scoping: Scoping,
}

/// CPU-intensive processing tasks for envelopes.
#[derive(Debug)]
pub enum EnvelopeProcessor {
    ProcessEnvelope(Box<ProcessEnvelope>),
    ProcessProjectMetrics(Box<ProcessMetrics>),
    ProcessBatchedMetrics(Box<ProcessBatchedMetrics>),
    FlushBuckets(Box<FlushBuckets>),
    SubmitClientReports(Box<SubmitClientReports>),
}

impl EnvelopeProcessor {
    /// Returns the name of the message variant.
    pub fn variant(&self) -> &'static str {
        match self {
            EnvelopeProcessor::ProcessEnvelope(_) => "ProcessEnvelope",
            EnvelopeProcessor::ProcessProjectMetrics(_) => "ProcessProjectMetrics",
            EnvelopeProcessor::ProcessBatchedMetrics(_) => "ProcessBatchedMetrics",
            EnvelopeProcessor::FlushBuckets(_) => "FlushBuckets",
            EnvelopeProcessor::SubmitClientReports(_) => "SubmitClientReports",
        }
    }
}

impl relay_system::Interface for EnvelopeProcessor {}

impl FromMessage<ProcessEnvelope> for EnvelopeProcessor {
    type Response = relay_system::NoResponse;

    fn from_message(message: ProcessEnvelope, _sender: ()) -> Self {
        Self::ProcessEnvelope(Box::new(message))
    }
}

impl FromMessage<ProcessMetrics> for EnvelopeProcessor {
    type Response = NoResponse;

    fn from_message(message: ProcessMetrics, _: ()) -> Self {
        Self::ProcessProjectMetrics(Box::new(message))
    }
}

impl FromMessage<ProcessBatchedMetrics> for EnvelopeProcessor {
    type Response = NoResponse;

    fn from_message(message: ProcessBatchedMetrics, _: ()) -> Self {
        Self::ProcessBatchedMetrics(Box::new(message))
    }
}

impl FromMessage<FlushBuckets> for EnvelopeProcessor {
    type Response = NoResponse;

    fn from_message(message: FlushBuckets, _: ()) -> Self {
        Self::FlushBuckets(Box::new(message))
    }
}

impl FromMessage<SubmitClientReports> for EnvelopeProcessor {
    type Response = NoResponse;

    fn from_message(message: SubmitClientReports, _: ()) -> Self {
        Self::SubmitClientReports(Box::new(message))
    }
}

/// The asynchronous thread pool used for scheduling processing tasks in the processor.
pub type EnvelopeProcessorServicePool = AsyncPool<BoxFuture<'static, ()>>;

/// Service implementing the [`EnvelopeProcessor`] interface.
///
/// This service handles messages in a worker pool with configurable concurrency.
#[derive(Clone)]
pub struct EnvelopeProcessorService {
    inner: Arc<InnerProcessor>,
}

/// Contains the addresses of services that the processor publishes to.
pub struct Addrs {
    pub outcome_aggregator: Addr<TrackOutcome>,
    pub upstream_relay: Addr<UpstreamRelay>,
    pub test_store: Addr<TestStore>,
    #[cfg(feature = "processing")]
    pub store_forwarder: Option<Addr<Store>>,
    pub aggregator: Addr<Aggregator>,
    #[cfg(feature = "processing")]
    pub global_rate_limits: Option<Addr<GlobalRateLimits>>,
}

impl Default for Addrs {
    fn default() -> Self {
        Addrs {
            outcome_aggregator: Addr::dummy(),
            upstream_relay: Addr::dummy(),
            test_store: Addr::dummy(),
            #[cfg(feature = "processing")]
            store_forwarder: None,
            aggregator: Addr::dummy(),
            #[cfg(feature = "processing")]
            global_rate_limits: None,
        }
    }
}

struct InnerProcessor {
    pool: EnvelopeProcessorServicePool,
    config: Arc<Config>,
    global_config: GlobalConfigHandle,
    project_cache: ProjectCacheHandle,
    cogs: Cogs,
    #[cfg(feature = "processing")]
    quotas_client: Option<AsyncRedisClient>,
    addrs: Addrs,
    #[cfg(feature = "processing")]
    rate_limiter: Option<Arc<RedisRateLimiter<GlobalRateLimitsServiceHandle>>>,
    geoip_lookup: Option<GeoIpLookup>,
    #[cfg(feature = "processing")]
    cardinality_limiter: Option<CardinalityLimiter>,
    metric_outcomes: MetricOutcomes,
    processing: Processing,
}

struct Processing {
    logs: LogsProcessor,
}

impl EnvelopeProcessorService {
    /// Creates a multi-threaded envelope processor.
    #[cfg_attr(feature = "processing", expect(clippy::too_many_arguments))]
    pub fn new(
        pool: EnvelopeProcessorServicePool,
        config: Arc<Config>,
        global_config: GlobalConfigHandle,
        project_cache: ProjectCacheHandle,
        cogs: Cogs,
        #[cfg(feature = "processing")] redis: Option<RedisClients>,
        addrs: Addrs,
        metric_outcomes: MetricOutcomes,
    ) -> Self {
        let geoip_lookup = config.geoip_path().and_then(|p| {
            match GeoIpLookup::open(p).context(ServiceError::GeoIp) {
                Ok(geoip) => Some(geoip),
                Err(err) => {
                    relay_log::error!("failed to open GeoIP db {p:?}: {err:?}");
                    None
                }
            }
        });

        #[cfg(feature = "processing")]
        let (cardinality, quotas) = match redis {
            Some(RedisClients {
                cardinality,
                quotas,
                ..
            }) => (Some(cardinality), Some(quotas)),
            None => (None, None),
        };

        #[cfg(feature = "processing")]
        let global_rate_limits = addrs.global_rate_limits.clone().map(Into::into);

        #[cfg(feature = "processing")]
        let rate_limiter = match (quotas.clone(), global_rate_limits) {
            (Some(redis), Some(global)) => {
                Some(RedisRateLimiter::new(redis, global).max_limit(config.max_rate_limit()))
            }
            _ => None,
        };

        let quota_limiter = Arc::new(QuotaRateLimiter::new(
            #[cfg(feature = "processing")]
            project_cache.clone(),
            #[cfg(feature = "processing")]
            rate_limiter.clone(),
        ));
        #[cfg(feature = "processing")]
        let rate_limiter = rate_limiter.map(Arc::new);

        let inner = InnerProcessor {
            pool,
            global_config,
            project_cache,
            cogs,
            #[cfg(feature = "processing")]
            quotas_client: quotas.clone(),
            #[cfg(feature = "processing")]
            rate_limiter,
            addrs,
            geoip_lookup,
            #[cfg(feature = "processing")]
            cardinality_limiter: cardinality
                .map(|cardinality| {
                    RedisSetLimiter::new(
                        RedisSetLimiterOptions {
                            cache_vacuum_interval: config
                                .cardinality_limiter_cache_vacuum_interval(),
                        },
                        cardinality,
                    )
                })
                .map(CardinalityLimiter::new),
            metric_outcomes,
            processing: Processing {
                logs: LogsProcessor::new(quota_limiter),
            },
            config,
        };

        Self {
            inner: Arc::new(inner),
        }
    }

    /// Normalize monitor check-ins and remove invalid ones.
    #[cfg(feature = "processing")]
    fn normalize_checkins(
        &self,
        managed_envelope: &mut TypedEnvelope<CheckInGroup>,
        project_id: ProjectId,
    ) {
        managed_envelope.retain_items(|item| {
            if item.ty() != &ItemType::CheckIn {
                return ItemAction::Keep;
            }

            match relay_monitors::process_check_in(&item.payload(), project_id) {
                Ok(result) => {
                    item.set_routing_hint(result.routing_hint);
                    item.set_payload(ContentType::Json, result.payload);
                    ItemAction::Keep
                }
                Err(error) => {
                    // TODO: Track an outcome.
                    relay_log::debug!(
                        error = &error as &dyn Error,
                        "dropped invalid monitor check-in"
                    );
                    ItemAction::DropSilently
                }
            }
        })
    }

    async fn enforce_quotas<Group>(
        &self,
        managed_envelope: &mut TypedEnvelope<Group>,
        event: Annotated<Event>,
        extracted_metrics: &mut ProcessingExtractedMetrics,
        project_info: &ProjectInfo,
        rate_limits: &RateLimits,
    ) -> Result<Annotated<Event>, ProcessingError> {
        let global_config = self.inner.global_config.current();
        // Cached quotas first: they are quick to evaluate, and since some quotas (indexed) are
        // not applied in the fast path, all cached quotas can be applied here.
        let cached_result = RateLimiter::Cached
            .enforce(
                managed_envelope,
                event,
                extracted_metrics,
                &global_config,
                project_info,
                rate_limits,
            )
            .await?;

        if_processing!(self.inner.config, {
            let rate_limiter = match self.inner.rate_limiter.clone() {
                Some(rate_limiter) => rate_limiter,
                None => return Ok(cached_result.event),
            };

            // Enforce all quotas consistently with Redis.
            let consistent_result = RateLimiter::Consistent(rate_limiter)
                .enforce(
                    managed_envelope,
                    cached_result.event,
                    extracted_metrics,
                    &global_config,
                    project_info,
                    rate_limits,
                )
                .await?;

            // Update cached rate limits with the freshly computed ones.
            if !consistent_result.rate_limits.is_empty() {
                self.inner
                    .project_cache
                    .get(managed_envelope.scoping().project_key)
                    .rate_limits()
                    .merge(consistent_result.rate_limits);
            }

            Ok(consistent_result.event)
        } else { Ok(cached_result.event) })
    }

1357    /// Extract transaction metrics.
1358    #[allow(clippy::too_many_arguments)]
1359    fn extract_transaction_metrics(
1360        &self,
1361        managed_envelope: &mut TypedEnvelope<TransactionGroup>,
1362        event: &mut Annotated<Event>,
1363        extracted_metrics: &mut ProcessingExtractedMetrics,
1364        project_id: ProjectId,
1365        project_info: Arc<ProjectInfo>,
1366        sampling_decision: SamplingDecision,
1367        event_metrics_extracted: EventMetricsExtracted,
1368        spans_extracted: SpansExtracted,
1369    ) -> Result<EventMetricsExtracted, ProcessingError> {
1370        if event_metrics_extracted.0 {
1371            return Ok(event_metrics_extracted);
1372        }
1373        let Some(event) = event.value_mut() else {
1374            return Ok(event_metrics_extracted);
1375        };
1376
1377        // NOTE: This function requires a `metric_extraction` in the project config. Legacy configs
1378        // will upsert this configuration from transaction and conditional tagging fields, even if
1379        // it is not present in the actual project config payload.
1380        let global = self.inner.global_config.current();
1381        let combined_config = {
1382            let config = match &project_info.config.metric_extraction {
1383                ErrorBoundary::Ok(config) if config.is_supported() => config,
1384                _ => return Ok(event_metrics_extracted),
1385            };
1386            let global_config = match &global.metric_extraction {
1387                ErrorBoundary::Ok(global_config) => global_config,
1388                #[allow(unused_variables)]
1389                ErrorBoundary::Err(e) => {
1390                    if_processing!(self.inner.config, {
1391                        // Config is invalid, but we will try to extract what we can with just the
1392                        // project config.
1393                        relay_log::error!("Failed to parse global extraction config {e}");
                        MetricExtractionGroups::EMPTY
                    } else {
                        // If there's an error with global metrics extraction, it is safe to assume that this
                        // Relay instance is not up-to-date, and we should skip extraction.
                        relay_log::debug!("Failed to parse global extraction config: {e}");
                        return Ok(event_metrics_extracted);
                    })
                }
            };
            CombinedMetricExtractionConfig::new(global_config, config)
        };

        // Require a valid transaction metrics config.
        let tx_config = match &project_info.config.transaction_metrics {
            Some(ErrorBoundary::Ok(tx_config)) => tx_config,
            Some(ErrorBoundary::Err(e)) => {
                relay_log::debug!("Failed to parse legacy transaction metrics config: {e}");
                return Ok(event_metrics_extracted);
            }
            None => {
                relay_log::debug!("Legacy transaction metrics config is missing");
                return Ok(event_metrics_extracted);
            }
        };

        if !tx_config.is_enabled() {
            static TX_CONFIG_ERROR: Once = Once::new();
            TX_CONFIG_ERROR.call_once(|| {
                if self.inner.config.processing_enabled() {
                    relay_log::error!(
                        "Processing Relay outdated, received tx config in version {}, which is not supported",
                        tx_config.version
                    );
                }
            });

            return Ok(event_metrics_extracted);
        }

        // If spans were already extracted for an event, we rely on span processing to extract metrics.
        let extract_spans = !spans_extracted.0
            && project_info.config.features.produces_spans()
            && utils::sample(global.options.span_extraction_sample_rate.unwrap_or(1.0)).is_keep();

        let metrics = crate::metrics_extraction::event::extract_metrics(
            event,
            combined_config,
            sampling_decision,
            project_id,
            self.inner
                .config
                .aggregator_config_for(MetricNamespace::Spans)
                .max_tag_value_length,
            extract_spans,
        );

        extracted_metrics.extend(metrics, Some(sampling_decision));

        if !project_info.has_feature(Feature::DiscardTransaction) {
            let transaction_from_dsc = managed_envelope
                .envelope()
                .dsc()
                .and_then(|dsc| dsc.transaction.as_deref());

            let extractor = TransactionExtractor {
                config: tx_config,
                generic_config: Some(combined_config),
                transaction_from_dsc,
                sampling_decision,
                target_project_id: project_id,
            };

            extracted_metrics.extend(extractor.extract(event)?, Some(sampling_decision));
        }

        Ok(EventMetricsExtracted(true))
    }

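    /// Validates and normalizes the event payload.
    ///
    /// Light normalization runs on every Relay. Full normalization additionally
    /// runs when this Relay is configured for full normalization or has
    /// processing enabled, unless a previous Relay already performed it.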
    fn normalize_event<Group: EventProcessing>(
        &self,
        managed_envelope: &mut TypedEnvelope<Group>,
        event: &mut Annotated<Event>,
        project_id: ProjectId,
        project_info: Arc<ProjectInfo>,
        mut event_fully_normalized: EventFullyNormalized,
    ) -> Result<EventFullyNormalized, ProcessingError> {
        if event.value().is_empty() {
            // NOTE(iker): only processing relays create events from
            // attachments, so these events won't be normalized in
            // non-processing relays even if the config is set to run full
            // normalization.
            return Ok(event_fully_normalized);
        }

        let full_normalization = match self.inner.config.normalization_level() {
            NormalizationLevel::Full => true,
            NormalizationLevel::Default => {
                if self.inner.config.processing_enabled() && event_fully_normalized.0 {
                    return Ok(event_fully_normalized);
                }

                self.inner.config.processing_enabled()
            }
        };

        let request_meta = managed_envelope.envelope().meta();
        let client_ipaddr = request_meta.client_addr().map(IpAddr::from);

        let transaction_aggregator_config = self
            .inner
            .config
            .aggregator_config_for(MetricNamespace::Transactions);

        let global_config = self.inner.global_config.current();
        let ai_model_costs = global_config.ai_model_costs.clone().ok();
        let http_span_allowed_hosts = global_config.options.http_span_allowed_hosts.as_slice();

        let retention_days: i64 = project_info
            .config
            .event_retention
            .unwrap_or(DEFAULT_EVENT_RETENTION)
            .into();

        utils::log_transaction_name_metrics(event, |event| {
            let event_validation_config = EventValidationConfig {
                received_at: Some(managed_envelope.received_at()),
                max_secs_in_past: Some(retention_days * 24 * 3600),
                max_secs_in_future: Some(self.inner.config.max_secs_in_future()),
                transaction_timestamp_range: Some(transaction_aggregator_config.timestamp_range()),
                is_validated: false,
            };

            let key_id = project_info
                .get_public_key_config()
                .and_then(|key| Some(key.numeric_id?.to_string()));
            if full_normalization && key_id.is_none() {
                relay_log::error!(
                    "project state for key {} is missing key id",
                    managed_envelope.envelope().meta().public_key()
                );
            }

            let normalization_config = NormalizationConfig {
                project_id: Some(project_id.value()),
                client: request_meta.client().map(str::to_owned),
                key_id,
                protocol_version: Some(request_meta.version().to_string()),
                grouping_config: project_info.config.grouping_config.clone(),
                client_ip: client_ipaddr.as_ref(),
                // If the setting is enabled, we do not want to infer the IP address.
                infer_ip_address: !project_info
                    .config
                    .datascrubbing_settings
                    .scrub_ip_addresses,
                client_sample_rate: managed_envelope
                    .envelope()
                    .dsc()
                    .and_then(|ctx| ctx.sample_rate),
                user_agent: RawUserAgentInfo {
                    user_agent: request_meta.user_agent(),
                    client_hints: request_meta.client_hints().as_deref(),
                },
                max_name_and_unit_len: Some(
                    transaction_aggregator_config
                        .max_name_length
                        .saturating_sub(MeasurementsConfig::MEASUREMENT_MRI_OVERHEAD),
                ),
                breakdowns_config: project_info.config.breakdowns_v2.as_ref(),
                performance_score: project_info.config.performance_score.as_ref(),
                normalize_user_agent: Some(true),
                transaction_name_config: TransactionNameConfig {
                    rules: &project_info.config.tx_name_rules,
                },
                device_class_synthesis_config: project_info
                    .has_feature(Feature::DeviceClassSynthesis),
                enrich_spans: project_info.has_feature(Feature::ExtractSpansFromEvent)
                    || project_info.has_feature(Feature::ExtractCommonSpanMetricsFromEvent),
                max_tag_value_length: self
                    .inner
                    .config
                    .aggregator_config_for(MetricNamespace::Spans)
                    .max_tag_value_length,
                is_renormalize: false,
                remove_other: full_normalization,
                emit_event_errors: full_normalization,
                span_description_rules: project_info.config.span_description_rules.as_ref(),
                geoip_lookup: self.inner.geoip_lookup.as_ref(),
                ai_model_costs: ai_model_costs.as_ref(),
                enable_trimming: true,
                measurements: Some(CombinedMeasurementsConfig::new(
                    project_info.config().measurements.as_ref(),
                    global_config.measurements.as_ref(),
                )),
                normalize_spans: true,
                replay_id: managed_envelope
                    .envelope()
                    .dsc()
                    .and_then(|ctx| ctx.replay_id),
                span_allowed_hosts: http_span_allowed_hosts,
                span_op_defaults: global_config.span_op_defaults.borrow(),
                performance_issues_spans: project_info.has_feature(Feature::PerformanceIssuesSpans),
            };

            metric!(timer(RelayTimers::EventProcessingNormalization), {
                validate_event(event, &event_validation_config)
                    .map_err(|_| ProcessingError::InvalidTransaction)?;
                normalize_event(event, &normalization_config);
                if full_normalization && event::has_unprintable_fields(event) {
                    metric!(counter(RelayCounters::EventCorrupted) += 1);
                }
                Result::<(), ProcessingError>::Ok(())
            })
        })?;

        event_fully_normalized.0 |= full_normalization;

        Ok(event_fully_normalized)
    }

    /// Processes error events, as well as items that require or create events.
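    ///
    /// This covers crash-report expansion (on processing Relays), event
    /// extraction, normalization, inbound filtering, sampling-decision
    /// tagging, quota enforcement, and PII scrubbing, in that order.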
    async fn process_errors(
        &self,
        managed_envelope: &mut TypedEnvelope<ErrorGroup>,
        project_id: ProjectId,
        project_info: Arc<ProjectInfo>,
        mut sampling_project_info: Option<Arc<ProjectInfo>>,
        rate_limits: Arc<RateLimits>,
    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
        let mut event_fully_normalized = EventFullyNormalized::new(managed_envelope.envelope());
        let mut metrics = Metrics::default();
        let mut extracted_metrics = ProcessingExtractedMetrics::new();

        // Events can also contain user reports.
        report::process_user_reports(managed_envelope);

        if_processing!(self.inner.config, {
            unreal::expand(managed_envelope, &self.inner.config)?;
            #[cfg(sentry)]
            playstation::expand(managed_envelope, &self.inner.config, &project_info)?;
            nnswitch::expand(managed_envelope)?;
        });

        let extraction_result = event::extract(
            managed_envelope,
            &mut metrics,
            event_fully_normalized,
            &self.inner.config,
        )?;
        let mut event = extraction_result.event;

        if_processing!(self.inner.config, {
            if let Some(inner_event_fully_normalized) =
                unreal::process(managed_envelope, &mut event)?
            {
                event_fully_normalized = inner_event_fully_normalized;
            }
            #[cfg(sentry)]
            if let Some(inner_event_fully_normalized) =
                playstation::process(managed_envelope, &mut event, &project_info)?
            {
                event_fully_normalized = inner_event_fully_normalized;
            }
            if let Some(inner_event_fully_normalized) =
                attachment::create_placeholders(managed_envelope, &mut event, &mut metrics)
            {
                event_fully_normalized = inner_event_fully_normalized;
            }
        });

        sampling_project_info = dynamic_sampling::validate_and_set_dsc(
            managed_envelope,
            &mut event,
            project_info.clone(),
            sampling_project_info,
        );
        event::finalize(
            managed_envelope,
            &mut event,
            &mut metrics,
            &self.inner.config,
        )?;
        event_fully_normalized = self.normalize_event(
            managed_envelope,
            &mut event,
            project_id,
            project_info.clone(),
            event_fully_normalized,
        )?;
        let filter_run = event::filter(
            managed_envelope,
            &mut event,
            project_info.clone(),
            &self.inner.global_config.current(),
        )?;

        if self.inner.config.processing_enabled() || matches!(filter_run, FiltersStatus::Ok) {
            dynamic_sampling::tag_error_with_sampling_decision(
                managed_envelope,
                &mut event,
                sampling_project_info,
                &self.inner.config,
            )
            .await;
        }

        event = self
            .enforce_quotas(
                managed_envelope,
                event,
                &mut extracted_metrics,
                &project_info,
                &rate_limits,
            )
            .await?;

        if event.value().is_some() {
            event::scrub(&mut event, project_info.clone())?;
            event::serialize(
                managed_envelope,
                &mut event,
                event_fully_normalized,
                EventMetricsExtracted(false),
                SpansExtracted(false),
            )?;
            event::emit_feedback_metrics(managed_envelope.envelope());
        }

        attachment::scrub(managed_envelope, project_info);

        if self.inner.config.processing_enabled() && !event_fully_normalized.0 {
            relay_log::error!(
                tags.project = %project_id,
                tags.ty = event_type(&event).map(|e| e.to_string()).unwrap_or("none".to_owned()),
                "ingested event without normalizing"
            );
        }

        Ok(Some(extracted_metrics))
    }

    /// Processes only transactions and transaction-related items.
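    ///
    /// Unlike error processing, this path runs dynamic sampling, extracts
    /// transaction metrics, and (on processing Relays) may extract spans from
    /// the transaction before quotas are enforced.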
    #[allow(unused_assignments)]
    #[allow(clippy::too_many_arguments)]
    async fn process_transactions(
        &self,
        managed_envelope: &mut TypedEnvelope<TransactionGroup>,
        cogs: &mut Token,
        config: Arc<Config>,
        project_id: ProjectId,
        project_info: Arc<ProjectInfo>,
        mut sampling_project_info: Option<Arc<ProjectInfo>>,
        rate_limits: Arc<RateLimits>,
        reservoir_counters: ReservoirCounters,
    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
        let mut event_fully_normalized = EventFullyNormalized::new(managed_envelope.envelope());
        let mut event_metrics_extracted = EventMetricsExtracted(false);
        let mut spans_extracted = SpansExtracted(false);
        let mut metrics = Metrics::default();
        let mut extracted_metrics = ProcessingExtractedMetrics::new();

        let global_config = self.inner.global_config.current();

        transaction::drop_invalid_items(managed_envelope, &global_config);

        relay_cogs::with!(cogs, "event_extract", {
            // We extract the main event from the envelope.
            let extraction_result = event::extract(
                managed_envelope,
                &mut metrics,
                event_fully_normalized,
                &self.inner.config,
            )?;
        });

        // If metrics were extracted we mark that.
        if let Some(inner_event_metrics_extracted) = extraction_result.event_metrics_extracted {
            event_metrics_extracted = inner_event_metrics_extracted;
        }
        if let Some(inner_spans_extracted) = extraction_result.spans_extracted {
            spans_extracted = inner_spans_extracted;
        }

        // We take the main event out of the result.
        let mut event = extraction_result.event;

        relay_cogs::with!(cogs, "profile_filter", {
            let profile_id = profile::filter(
                managed_envelope,
                &event,
                config.clone(),
                project_id,
                &project_info,
            );
            profile::transfer_id(&mut event, profile_id);
        });

        relay_cogs::with!(cogs, "dynamic_sampling_dsc", {
            sampling_project_info = dynamic_sampling::validate_and_set_dsc(
                managed_envelope,
                &mut event,
                project_info.clone(),
                sampling_project_info,
            );
        });

        relay_cogs::with!(cogs, "event_finalize", {
            event::finalize(
                managed_envelope,
                &mut event,
                &mut metrics,
                &self.inner.config,
            )?;
        });

        relay_cogs::with!(cogs, "event_normalize", {
            event_fully_normalized = self.normalize_event(
                managed_envelope,
                &mut event,
                project_id,
                project_info.clone(),
                event_fully_normalized,
            )?;
        });

        relay_cogs::with!(cogs, "filter", {
            let filter_run = event::filter(
                managed_envelope,
                &mut event,
                project_info.clone(),
                &self.inner.global_config.current(),
            )?;
        });

        // Always run dynamic sampling on processing Relays,
        // but delay decision until inbound filters have been fully processed.
        let run_dynamic_sampling =
            matches!(filter_run, FiltersStatus::Ok) || self.inner.config.processing_enabled();

        let reservoir = self.new_reservoir_evaluator(
            managed_envelope.scoping().organization_id,
            reservoir_counters,
        );

        relay_cogs::with!(cogs, "dynamic_sampling_run", {
            let sampling_result = match run_dynamic_sampling {
                true => {
                    dynamic_sampling::run(
                        managed_envelope,
                        &mut event,
                        config.clone(),
                        project_info.clone(),
                        sampling_project_info,
                        &reservoir,
                    )
                    .await
                }
                false => SamplingResult::Pending,
            };
        });

        #[cfg(feature = "processing")]
        let server_sample_rate = match sampling_result {
            SamplingResult::Match(ref sampling_match) => Some(sampling_match.sample_rate()),
            SamplingResult::NoMatch | SamplingResult::Pending => None,
        };

        if let Some(outcome) = sampling_result.into_dropped_outcome() {
            // Process profiles before dropping the transaction, if necessary.
            // Before metric extraction to make sure the profile count is reflected correctly.
            profile::process(
                managed_envelope,
                &mut event,
                &global_config,
                config.clone(),
                project_info.clone(),
            );
            // Extract metrics here, we're about to drop the event/transaction.
            event_metrics_extracted = self.extract_transaction_metrics(
                managed_envelope,
                &mut event,
                &mut extracted_metrics,
                project_id,
                project_info.clone(),
                SamplingDecision::Drop,
                event_metrics_extracted,
                spans_extracted,
            )?;

            dynamic_sampling::drop_unsampled_items(
                managed_envelope,
                event,
                outcome,
                spans_extracted,
            );

            // At this point the envelope is either empty or contains only
            // processed profiles. We need to make sure there are enough quotas
            // for these profiles.
            event = self
                .enforce_quotas(
                    managed_envelope,
                    Annotated::empty(),
                    &mut extracted_metrics,
                    &project_info,
                    &rate_limits,
                )
                .await?;

            return Ok(Some(extracted_metrics));
        }

        let _post_ds = cogs.start_category("post_ds");

        // Need to scrub the transaction before extracting spans.
        //
        // Unconditionally scrub to make sure PII is removed as early as possible.
        event::scrub(&mut event, project_info.clone())?;

        // TODO: remove once `relay.drop-transaction-attachments` has graduated.
        attachment::scrub(managed_envelope, project_info.clone());

        if_processing!(self.inner.config, {
            // Process profiles before extracting metrics, to make sure they are removed if they are invalid.
            let profile_id = profile::process(
                managed_envelope,
                &mut event,
                &global_config,
                config.clone(),
                project_info.clone(),
            );
            profile::transfer_id(&mut event, profile_id);
            profile::scrub_profiler_id(&mut event);

            // Always extract metrics in processing Relays for sampled items.
            event_metrics_extracted = self.extract_transaction_metrics(
                managed_envelope,
                &mut event,
                &mut extracted_metrics,
                project_id,
                project_info.clone(),
                SamplingDecision::Keep,
                event_metrics_extracted,
                spans_extracted,
            )?;

            if project_info.has_feature(Feature::ExtractSpansFromEvent) {
                spans_extracted = span::extract_from_event(
                    managed_envelope,
                    &event,
                    &global_config,
                    config,
                    server_sample_rate,
                    event_metrics_extracted,
                    spans_extracted,
                );
            }
        });

        event = self
            .enforce_quotas(
                managed_envelope,
                event,
                &mut extracted_metrics,
                &project_info,
                &rate_limits,
            )
            .await?;

        if_processing!(self.inner.config, {
            event = span::maybe_discard_transaction(managed_envelope, event, project_info);
        });

        // Event may have been dropped because of a quota and the envelope can be empty.
        if event.value().is_some() {
            event::serialize(
                managed_envelope,
                &mut event,
                event_fully_normalized,
                event_metrics_extracted,
                spans_extracted,
            )?;
        }

        if self.inner.config.processing_enabled() && !event_fully_normalized.0 {
            relay_log::error!(
                tags.project = %project_id,
                tags.ty = event_type(&event).map(|e| e.to_string()).unwrap_or("none".to_owned()),
                "ingested event without normalizing"
            );
        }

        Ok(Some(extracted_metrics))
    }

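    /// Processes profile chunks.
    ///
    /// Chunks are filtered against project features and, on processing Relays,
    /// run through full profile-chunk processing before quotas are enforced.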
    async fn process_profile_chunks(
        &self,
        managed_envelope: &mut TypedEnvelope<ProfileChunkGroup>,
        project_info: Arc<ProjectInfo>,
        rate_limits: Arc<RateLimits>,
    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
        profile_chunk::filter(managed_envelope, project_info.clone());

        if_processing!(self.inner.config, {
            profile_chunk::process(
                managed_envelope,
                &project_info,
                &self.inner.global_config.current(),
                &self.inner.config,
            );
        });

        self.enforce_quotas(
            managed_envelope,
            Annotated::empty(),
            &mut ProcessingExtractedMetrics::new(),
            &project_info,
            &rate_limits,
        )
        .await?;

        Ok(None)
    }

    /// Processes standalone items that require an event ID, but do not have an event on the same envelope.
    async fn process_standalone(
        &self,
        managed_envelope: &mut TypedEnvelope<StandaloneGroup>,
        config: Arc<Config>,
        project_id: ProjectId,
        project_info: Arc<ProjectInfo>,
        rate_limits: Arc<RateLimits>,
    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
        let mut extracted_metrics = ProcessingExtractedMetrics::new();

        standalone::process(managed_envelope);

        profile::filter(
            managed_envelope,
            &Annotated::empty(),
            config,
            project_id,
            &project_info,
        );

        self.enforce_quotas(
            managed_envelope,
            Annotated::empty(),
            &mut extracted_metrics,
            &project_info,
            &rate_limits,
        )
        .await?;

        report::process_user_reports(managed_envelope);
        attachment::scrub(managed_envelope, project_info);

        Ok(Some(extracted_metrics))
    }

    /// Processes user sessions.
    async fn process_sessions(
        &self,
        managed_envelope: &mut TypedEnvelope<SessionGroup>,
        config: &Config,
        project_info: &ProjectInfo,
        rate_limits: &RateLimits,
    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
        let mut extracted_metrics = ProcessingExtractedMetrics::new();

        session::process(
            managed_envelope,
            &self.inner.global_config.current(),
            config,
            &mut extracted_metrics,
            project_info,
        );

        self.enforce_quotas(
            managed_envelope,
            Annotated::empty(),
            &mut extracted_metrics,
            project_info,
            rate_limits,
        )
        .await?;

        Ok(Some(extracted_metrics))
    }

    /// Processes client reports.
    async fn process_client_reports(
        &self,
        managed_envelope: &mut TypedEnvelope<ClientReportGroup>,
        config: Arc<Config>,
        project_info: Arc<ProjectInfo>,
        rate_limits: Arc<RateLimits>,
    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
        let mut extracted_metrics = ProcessingExtractedMetrics::new();

        self.enforce_quotas(
            managed_envelope,
            Annotated::empty(),
            &mut extracted_metrics,
            &project_info,
            &rate_limits,
        )
        .await?;

        report::process_client_reports(
            managed_envelope,
            &config,
            &project_info,
            self.inner.addrs.outcome_aggregator.clone(),
        );

        Ok(Some(extracted_metrics))
    }

    /// Processes replays.
    async fn process_replays(
        &self,
        managed_envelope: &mut TypedEnvelope<ReplayGroup>,
        config: Arc<Config>,
        project_info: Arc<ProjectInfo>,
        rate_limits: Arc<RateLimits>,
    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
        let mut extracted_metrics = ProcessingExtractedMetrics::new();

        replay::process(
            managed_envelope,
            &self.inner.global_config.current(),
            &config,
            &project_info,
            self.inner.geoip_lookup.as_ref(),
        )?;

        self.enforce_quotas(
            managed_envelope,
            Annotated::empty(),
            &mut extracted_metrics,
            &project_info,
            &rate_limits,
        )
        .await?;

        Ok(Some(extracted_metrics))
    }

    /// Processes cron check-ins.
    async fn process_checkins(
        &self,
        managed_envelope: &mut TypedEnvelope<CheckInGroup>,
        _project_id: ProjectId,
        project_info: Arc<ProjectInfo>,
        rate_limits: Arc<RateLimits>,
    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
        self.enforce_quotas(
            managed_envelope,
            Annotated::empty(),
            &mut ProcessingExtractedMetrics::new(),
            &project_info,
            &rate_limits,
        )
        .await?;

        if_processing!(self.inner.config, {
            self.normalize_checkins(managed_envelope, _project_id);
        });

        Ok(None)
    }

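    /// Processes NEL (Network Error Logging) reports.
    ///
    /// NEL reports are converted into log items and then handled by the
    /// regular logs pipeline.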
    async fn process_nel(
        &self,
        mut managed_envelope: ManagedEnvelope,
        ctx: processing::Context<'_>,
    ) -> Result<ProcessingResult, ProcessingError> {
        nel::convert_to_logs(&mut managed_envelope);
        self.process_logs(managed_envelope, ctx).await
    }

    /// Processes logs.
    async fn process_logs(
        &self,
        mut managed_envelope: ManagedEnvelope,
        ctx: processing::Context<'_>,
    ) -> Result<ProcessingResult, ProcessingError> {
        let processor = &self.inner.processing.logs;
        let Some(logs) = processor.prepare_envelope(&mut managed_envelope) else {
            debug_assert!(
                false,
                "there must be work for the logs processor in the logs processing group"
            );
            return Err(ProcessingError::ProcessingGroupMismatch);
        };

        managed_envelope.update();
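        // All log items have been moved into the logs processor. Any items
        // left in the envelope at this point are unexpected and are rejected
        // as an internal error.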
        match managed_envelope.envelope().is_empty() {
            true => managed_envelope.accept(),
            false => managed_envelope.reject(Outcome::Invalid(DiscardReason::Internal)),
        }

        processor
            .process(logs, ctx)
            .await
            .map_err(|_| ProcessingError::ProcessingFailure)
            .map(ProcessingResult::Logs)
    }

    /// Processes standalone spans.
    ///
    /// This function does *not* run for spans extracted from transactions.
    #[allow(clippy::too_many_arguments)]
    async fn process_standalone_spans(
        &self,
        managed_envelope: &mut TypedEnvelope<SpanGroup>,
        config: Arc<Config>,
        _project_id: ProjectId,
        project_info: Arc<ProjectInfo>,
        _sampling_project_info: Option<Arc<ProjectInfo>>,
        rate_limits: Arc<RateLimits>,
        _reservoir_counters: ReservoirCounters,
    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
        let mut extracted_metrics = ProcessingExtractedMetrics::new();

        span::expand_v2_spans(managed_envelope)?;
        span::filter(managed_envelope, config.clone(), project_info.clone());
        span::convert_otel_traces_data(managed_envelope);

        if_processing!(self.inner.config, {
            let global_config = self.inner.global_config.current();
            let reservoir = self.new_reservoir_evaluator(
                managed_envelope.scoping().organization_id,
                _reservoir_counters,
            );

            span::process(
                managed_envelope,
                &mut Annotated::empty(),
                &mut extracted_metrics,
                &global_config,
                config,
                _project_id,
                project_info.clone(),
                _sampling_project_info,
                self.inner.geoip_lookup.as_ref(),
                &reservoir,
            )
            .await;
        });

        self.enforce_quotas(
            managed_envelope,
            Annotated::empty(),
            &mut extracted_metrics,
            &project_info,
            &rate_limits,
        )
        .await?;

        Ok(Some(extracted_metrics))
    }

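    /// Dispatches an envelope to the processing function for its group.
    ///
    /// Before dispatching, this normalizes the DSC on the envelope headers,
    /// applies the configured event retention, and pins the envelope to the
    /// project ID stored in the project cache.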
    async fn process_envelope(
        &self,
        cogs: &mut Token,
        project_id: ProjectId,
        message: ProcessEnvelopeGrouped,
    ) -> Result<ProcessingResult, ProcessingError> {
        let ProcessEnvelopeGrouped {
            group,
            envelope: mut managed_envelope,
            project_info,
            rate_limits,
            sampling_project_info,
            reservoir_counters,
        } = message;

        // Pre-process the envelope headers.
        if let Some(sampling_state) = sampling_project_info.as_ref() {
            // Both transactions and standalone span envelopes need a normalized DSC header
            // to make sampling rules based on the segment/transaction name work correctly.
            managed_envelope
                .envelope_mut()
                .parametrize_dsc_transaction(&sampling_state.config.tx_name_rules);
        }

        // Set the event retention. Effectively, this value will only be available in processing
        // mode when the full project config is queried from the upstream.
        if let Some(retention) = project_info.config.event_retention {
            managed_envelope.envelope_mut().set_retention(retention);
        }

        // Ensure the project ID is updated to the stored instance for this project cache. This can
        // differ in two cases:
        //  1. The envelope was sent to the legacy `/store/` endpoint without a project ID.
        //  2. The DSN was moved and the envelope sent to the old project ID.
        managed_envelope
            .envelope_mut()
            .meta_mut()
            .set_project_id(project_id);

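        // Converts the envelope into the typed envelope for the given group,
        // runs the corresponding processing function, and rejects the envelope
        // with the error's outcome if processing fails.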
        macro_rules! run {
            ($fn_name:ident $(, $args:expr)*) => {
                async {
                    let mut managed_envelope = (managed_envelope, group).try_into()?;
                    match self.$fn_name(&mut managed_envelope, $($args),*).await {
                        Ok(extracted_metrics) => Ok(ProcessingResult::Envelope {
                            managed_envelope: managed_envelope.into_processed(),
                            extracted_metrics: extracted_metrics.unwrap_or_else(ProcessingExtractedMetrics::new)
                        }),
                        Err(error) => {
                            relay_log::trace!("Executing {func} failed: {error}", func = stringify!($fn_name), error = error);
                            if let Some(outcome) = error.to_outcome() {
                                managed_envelope.reject(outcome);
                            }

                            Err(error)
                        }
                    }
                }.await
            };
        }

        let global_config = self.inner.global_config.current();
        let ctx = processing::Context {
            config: &self.inner.config,
            global_config: &global_config,
            project_info: &project_info,
            rate_limits: &rate_limits,
        };

        relay_log::trace!("Processing {group} group", group = group.variant());

        match group {
            ProcessingGroup::Error => run!(
                process_errors,
                project_id,
                project_info,
                sampling_project_info,
                rate_limits
            ),
            ProcessingGroup::Transaction => {
                run!(
                    process_transactions,
                    cogs,
                    self.inner.config.clone(),
                    project_id,
                    project_info,
                    sampling_project_info,
                    rate_limits,
                    reservoir_counters
                )
            }
            ProcessingGroup::Session => run!(
                process_sessions,
                &self.inner.config.clone(),
                &project_info,
                &rate_limits
            ),
            ProcessingGroup::Standalone => run!(
                process_standalone,
                self.inner.config.clone(),
                project_id,
                project_info,
                rate_limits
            ),
            ProcessingGroup::ClientReport => run!(
                process_client_reports,
                self.inner.config.clone(),
                project_info,
                rate_limits
            ),
            ProcessingGroup::Replay => {
                run!(
                    process_replays,
                    self.inner.config.clone(),
                    project_info,
                    rate_limits
                )
            }
            ProcessingGroup::CheckIn => {
                run!(process_checkins, project_id, project_info, rate_limits)
            }
            ProcessingGroup::Log => self.process_logs(managed_envelope, ctx).await,
            ProcessingGroup::Nel => self.process_nel(managed_envelope, ctx).await,
            ProcessingGroup::Span => run!(
                process_standalone_spans,
                self.inner.config.clone(),
                project_id,
                project_info,
                sampling_project_info,
                rate_limits,
                reservoir_counters
            ),
            ProcessingGroup::ProfileChunk => {
                run!(process_profile_chunks, project_info, rate_limits)
            }
            // Currently not used.
            ProcessingGroup::Metrics => {
                // In proxy mode we simply forward the metrics.
                // This group shouldn't be used outside of proxy mode.
                if self.inner.config.relay_mode() != RelayMode::Proxy {
                    relay_log::error!(
                        tags.project = %project_id,
                        items = ?managed_envelope.envelope().items().next().map(Item::ty),
                        "received metrics in the process_state"
                    );
                }

                Ok(ProcessingResult::no_metrics(
                    managed_envelope.into_processed(),
                ))
            }
            // Fall back to the legacy process_state implementation for Ungrouped events.
            ProcessingGroup::Ungrouped => {
                relay_log::error!(
                    tags.project = %project_id,
                    items = ?managed_envelope.envelope().items().next().map(Item::ty),
                    "could not identify the processing group based on the envelope's items"
                );

                Ok(ProcessingResult::no_metrics(
                    managed_envelope.into_processed(),
                ))
            }
            // Leave this group unchanged.
            //
            // This will later be forwarded to upstream.
            ProcessingGroup::ForwardUnknown => Ok(ProcessingResult::no_metrics(
                managed_envelope.into_processed(),
            )),
        }
    }

    /// Processes the envelope and returns the processed envelope back.
    ///
    /// Returns `Some` if the envelope passed inbound filtering and rate limiting. Invalid items are
    /// removed from the envelope. Otherwise, if the envelope is empty or the entire envelope needs
    /// to be dropped, this is `None`.
    async fn process(
        &self,
        cogs: &mut Token,
        mut message: ProcessEnvelopeGrouped,
    ) -> Result<Option<Submit>, ProcessingError> {
        let ProcessEnvelopeGrouped {
            ref mut envelope,
            ref project_info,
            ..
        } = message;

        // Prefer the project's project ID, and fall back to the project ID stated in the
        // envelope. The project ID is available in all modes, other than in proxy mode, where
        // envelopes for unknown projects are forwarded blindly.
        //
        // Neither ID can be available in proxy mode on the /store/ endpoint. This is not
        // supported, since we cannot process an envelope without a project ID, so drop it.
        let Some(project_id) = project_info
            .project_id
            .or_else(|| envelope.envelope().meta().project_id())
        else {
            envelope.reject(Outcome::Invalid(DiscardReason::Internal));
            return Err(ProcessingError::MissingProjectId);
        };

        let client = envelope.envelope().meta().client().map(str::to_owned);
        let user_agent = envelope.envelope().meta().user_agent().map(str::to_owned);
        let project_key = envelope.envelope().meta().public_key();
        let sampling_key = envelope.envelope().sampling_key();

        let has_ourlogs_new_byte_count =
            project_info.has_feature(Feature::OurLogsCalculatedByteCount);

        // We set additional information on the scope, which will be removed after processing the
        // envelope.
        relay_log::configure_scope(|scope| {
            scope.set_tag("project", project_id);
            if let Some(client) = client {
                scope.set_tag("sdk", client);
            }
            if let Some(user_agent) = user_agent {
                scope.set_extra("user_agent", user_agent.into());
            }
        });

        let result = match self.process_envelope(cogs, project_id, message).await {
            Ok(ProcessingResult::Envelope {
                mut managed_envelope,
                extracted_metrics,
            }) => {
                // The envelope could be modified or even emptied during processing, which
                // requires re-computation of the context.
                managed_envelope.update();

                let has_metrics = !extracted_metrics.metrics.project_metrics.is_empty();
                send_metrics(
                    extracted_metrics.metrics,
                    project_key,
                    sampling_key,
                    &self.inner.addrs.aggregator,
                );

                let envelope_response = if managed_envelope.envelope().is_empty() {
                    if !has_metrics {
                        // Individual rate limits have already been issued.
                        managed_envelope.reject(Outcome::RateLimited(None));
                    } else {
                        managed_envelope.accept();
                    }

                    None
                } else {
                    Some(managed_envelope)
                };

                Ok(envelope_response.map(Submit::Envelope))
            }
            Ok(ProcessingResult::Logs(Output { main, metrics })) => {
                send_metrics(
                    metrics.metrics,
                    project_key,
                    sampling_key,
                    &self.inner.addrs.aggregator,
                );

                if has_ourlogs_new_byte_count {
                    Ok(Some(Submit::Logs(main)))
                } else {
                    let envelope = main
                        .serialize_envelope()
                        .map_err(|_| ProcessingError::ProcessingEnvelopeSerialization)?;
                    let managed_envelope = ManagedEnvelope::from(envelope);

                    Ok(Some(Submit::Envelope(managed_envelope.into_processed())))
                }
            }
            Err(err) => Err(err),
        };

        relay_log::configure_scope(|scope| {
            scope.remove_tag("project");
            scope.remove_tag("sdk");
            scope.remove_extra("user_agent");
        });

        result
    }

    async fn handle_process_envelope(&self, cogs: &mut Token, message: ProcessEnvelope) {
        let project_key = message.envelope.envelope().meta().public_key();
        let wait_time = message.envelope.age();
        metric!(timer(RelayTimers::EnvelopeWaitTime) = wait_time);

        // This COGS handling may need an overhaul in the future:
        // Cancel the passed in token, to start individual measurements per envelope instead.
        cogs.cancel();

        let scoping = message.envelope.scoping();
        for (group, envelope) in ProcessingGroup::split_envelope(*message.envelope.into_envelope())
        {
            let mut cogs = self
                .inner
                .cogs
                .timed(ResourceId::Relay, AppFeature::from(group));

            let mut envelope = ManagedEnvelope::new(
                envelope,
                self.inner.addrs.outcome_aggregator.clone(),
                self.inner.addrs.test_store.clone(),
            );
            envelope.scope(scoping);

            let message = ProcessEnvelopeGrouped {
                group,
                envelope,
                project_info: Arc::clone(&message.project_info),
                rate_limits: Arc::clone(&message.rate_limits),
                sampling_project_info: message.sampling_project_info.as_ref().map(Arc::clone),
                reservoir_counters: Arc::clone(&message.reservoir_counters),
            };

            let result = metric!(
                timer(RelayTimers::EnvelopeProcessingTime),
                group = group.variant(),
                { self.process(&mut cogs, message).await }
            );

            match result {
                Ok(Some(envelope)) => self.submit_upstream(&mut cogs, envelope),
                Ok(None) => {}
                Err(error) if error.is_unexpected() => {
                    relay_log::error!(
                        tags.project_key = %project_key,
                        error = &error as &dyn Error,
                        "error processing envelope"
                    )
                }
                Err(error) => {
                    relay_log::debug!(
                        tags.project_key = %project_key,
                        error = &error as &dyn Error,
                        "error processing envelope"
                    )
                }
            }
        }
    }

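    /// Handles a batch of metric buckets for a single project.
    ///
    /// Buckets are normalized, checked against the allowed namespaces for
    /// their source, and corrected for clock drift. If the project state is
    /// already cached, buckets are filtered and rate limited before they are
    /// merged into the aggregator.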
    fn handle_process_metrics(&self, cogs: &mut Token, message: ProcessMetrics) {
        let ProcessMetrics {
            data,
            project_key,
            received_at,
            sent_at,
            source,
        } = message;

        let received_timestamp =
            UnixTimestamp::from_datetime(received_at).unwrap_or(UnixTimestamp::now());

        let mut buckets = data.into_buckets(received_timestamp);
        if buckets.is_empty() {
            return;
        }
        cogs.update(relay_metrics::cogs::BySize(&buckets));

        let clock_drift_processor =
            ClockDriftProcessor::new(sent_at, received_at).at_least(MINIMUM_CLOCK_DRIFT);

        buckets.retain_mut(|bucket| {
            if let Err(error) = relay_metrics::normalize_bucket(bucket) {
                relay_log::debug!(error = &error as &dyn Error, "dropping bucket {bucket:?}");
                return false;
            }

            if !self::metrics::is_valid_namespace(bucket, source) {
                return false;
            }

            clock_drift_processor.process_timestamp(&mut bucket.timestamp);

            if !matches!(source, BucketSource::Internal) {
                bucket.metadata = BucketMetadata::new(received_timestamp);
            }

            true
        });

        let project = self.inner.project_cache.get(project_key);

        // Best-effort check to filter and rate limit buckets; if there is no project state
        // available at the current time, we will check again after flushing.
        let buckets = match project.state() {
            ProjectState::Enabled(project_info) => {
                let rate_limits = project.rate_limits().current_limits();
                self.check_buckets(project_key, project_info, &rate_limits, buckets)
            }
            _ => buckets,
        };

        relay_log::trace!("merging metric buckets into the aggregator");
        self.inner
            .addrs
            .aggregator
            .send(MergeBuckets::new(project_key, buckets));
    }

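    /// Handles a pre-aggregated batch of metric buckets for multiple projects.
    ///
    /// The payload is a JSON object mapping project keys to lists of buckets,
    /// matching the `Wrapper` struct below. A minimal sketch of such a
    /// payload, assuming a hypothetical project key and an empty bucket list:
    ///
    /// ```ignore
    /// let payload: bytes::Bytes =
    ///     br#"{"buckets": {"a94ae32be2584e0bbd7a4cbb95971fee": []}}"#
    ///         .as_slice()
    ///         .into();
    /// ```
    ///
    /// Each entry is dispatched to [`Self::handle_process_metrics`] separately.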
    fn handle_process_batched_metrics(&self, cogs: &mut Token, message: ProcessBatchedMetrics) {
        let ProcessBatchedMetrics {
            payload,
            source,
            received_at,
            sent_at,
        } = message;

        #[derive(serde::Deserialize)]
        struct Wrapper {
            buckets: HashMap<ProjectKey, Vec<Bucket>>,
        }

        let buckets = match serde_json::from_slice(&payload) {
            Ok(Wrapper { buckets }) => buckets,
            Err(error) => {
                relay_log::debug!(
                    error = &error as &dyn Error,
                    "failed to parse batched metrics",
                );
                metric!(counter(RelayCounters::MetricBucketsParsingFailed) += 1);
                return;
            }
        };

        for (project_key, buckets) in buckets {
            self.handle_process_metrics(
                cogs,
                ProcessMetrics {
                    data: MetricData::Parsed(buckets),
                    project_key,
                    source,
                    received_at,
                    sent_at,
                },
            )
        }
    }

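    /// Submits a processed envelope (or processed logs) upstream.
    ///
    /// On processing Relays this forwards to the store; in capture mode the
    /// envelope is stashed in the test store instead. Otherwise, the envelope
    /// is serialized with the configured HTTP encoding and sent to the
    /// upstream Relay or Sentry.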
    fn submit_upstream(&self, cogs: &mut Token, submit: Submit) {
        let _submit = cogs.start_category("submit");

        #[cfg(feature = "processing")]
        if self.inner.config.processing_enabled() {
            if let Some(store_forwarder) = &self.inner.addrs.store_forwarder {
                match submit {
                    Submit::Envelope(envelope) => store_forwarder.send(StoreEnvelope { envelope }),
                    Submit::Logs(output) => output
                        .forward_store(store_forwarder)
                        .unwrap_or_else(|err| err.into_inner()),
                }
                return;
            }
        }

        let mut envelope = match submit {
            Submit::Envelope(envelope) => envelope,
            Submit::Logs(output) => match output.serialize_envelope() {
                Ok(envelope) => ManagedEnvelope::from(envelope).into_processed(),
                Err(_) => {
                    relay_log::error!("failed to serialize output to an envelope");
                    return;
                }
            },
        };

        // If we are in capture mode, we stash away the event instead of forwarding it.
        if Capture::should_capture(&self.inner.config) {
            relay_log::trace!("capturing envelope in memory");
            self.inner
                .addrs
                .test_store
                .send(Capture::accepted(envelope));
            return;
        }

        // Override the `sent_at` timestamp. Since the envelope went through basic
        // normalization, all timestamps have been corrected. We propagate the new
        // `sent_at` to allow the next Relay to double-check this timestamp and
        // potentially apply correction again. This is done as close to sending as
        // possible so that we avoid internal delays.
        envelope.envelope_mut().set_sent_at(Utc::now());

        relay_log::trace!("sending envelope to sentry endpoint");
        let http_encoding = self.inner.config.http_encoding();
        let result = envelope.envelope().to_vec().and_then(|v| {
            encode_payload(&v.into(), http_encoding).map_err(EnvelopeError::PayloadIoFailed)
        });

        match result {
            Ok(body) => {
                self.inner
                    .addrs
                    .upstream_relay
                    .send(SendRequest(SendEnvelope {
                        envelope,
                        body,
                        http_encoding,
                        project_cache: self.inner.project_cache.clone(),
                    }));
            }
            Err(error) => {
                // Errors are only logged for what we consider an internal discard reason. These
                // indicate errors in the infrastructure or implementation bugs.
                relay_log::error!(
                    error = &error as &dyn Error,
                    tags.project_key = %envelope.scoping().project_key,
                    "failed to serialize envelope payload"
                );

                envelope.reject(Outcome::Invalid(DiscardReason::Internal));
            }
        }
    }

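    /// Wraps client reports into an envelope and submits them upstream.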
    fn handle_submit_client_reports(&self, cogs: &mut Token, message: SubmitClientReports) {
        let SubmitClientReports {
            client_reports,
            scoping,
        } = message;

        let upstream = self.inner.config.upstream_descriptor();
        let dsn = PartialDsn::outbound(&scoping, upstream);

        let mut envelope = Envelope::from_request(None, RequestMeta::outbound(dsn));
        for client_report in client_reports {
            let mut item = Item::new(ItemType::ClientReport);
            item.set_payload(ContentType::Json, client_report.serialize().unwrap()); // TODO: unwrap OK?
            envelope.add_item(item);
        }

        let envelope = ManagedEnvelope::new(
            envelope,
            self.inner.addrs.outcome_aggregator.clone(),
            self.inner.addrs.test_store.clone(),
        );
        self.submit_upstream(cogs, Submit::Envelope(envelope.into_processed()));
    }

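    /// Filters and rate limits metric buckets against cached project state.
    ///
    /// Buckets without a valid scoping are dropped. Cached rate limits are
    /// enforced per metric namespace, and the remaining buckets are run
    /// through the quota-based [`MetricsLimiter`].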
2802    fn check_buckets(
2803        &self,
2804        project_key: ProjectKey,
2805        project_info: &ProjectInfo,
2806        rate_limits: &RateLimits,
2807        buckets: Vec<Bucket>,
2808    ) -> Vec<Bucket> {
2809        let Some(scoping) = project_info.scoping(project_key) else {
2810            relay_log::error!(
2811                tags.project_key = project_key.as_str(),
2812                "there is no scoping: dropping {} buckets",
2813                buckets.len(),
2814            );
2815            return Vec::new();
2816        };
2817
2818        let mut buckets = self::metrics::apply_project_info(
2819            buckets,
2820            &self.inner.metric_outcomes,
2821            project_info,
2822            scoping,
2823        );
2824
2825        let namespaces: BTreeSet<MetricNamespace> = buckets
2826            .iter()
2827            .filter_map(|bucket| bucket.name.try_namespace())
2828            .collect();
2829
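        // Metric bucket quotas may be scoped to a single namespace, so check and
        // enforce rate limits per namespace.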
2830        for namespace in namespaces {
2831            let limits = rate_limits.check_with_quotas(
2832                project_info.get_quotas(),
2833                scoping.metric_bucket(namespace),
2834            );
2835
2836            if limits.is_limited() {
2837                let rejected;
2838                (buckets, rejected) = utils::split_off(buckets, |bucket| {
2839                    bucket.name.try_namespace() == Some(namespace)
2840                });
2841
2842                let reason_code = limits.longest().and_then(|limit| limit.reason_code.clone());
2843                self.inner.metric_outcomes.track(
2844                    scoping,
2845                    &rejected,
2846                    Outcome::RateLimited(reason_code),
2847                );
2848            }
2849        }
2850
2851        let quotas = project_info.config.quotas.clone();
2852        match MetricsLimiter::create(buckets, quotas, scoping) {
2853            Ok(mut bucket_limiter) => {
2854                bucket_limiter.enforce_limits(rate_limits, &self.inner.metric_outcomes);
2855                bucket_limiter.into_buckets()
2856            }
2857            Err(buckets) => buckets,
2858        }
2859    }
2860
2861    #[cfg(feature = "processing")]
2862    async fn rate_limit_buckets(
2863        &self,
2864        scoping: Scoping,
2865        project_info: &ProjectInfo,
2866        mut buckets: Vec<Bucket>,
2867    ) -> Vec<Bucket> {
2868        let Some(rate_limiter) = &self.inner.rate_limiter else {
2869            return buckets;
2870        };
2871
2872        let global_config = self.inner.global_config.current();
2873        let namespaces = buckets
2874            .iter()
2875            .filter_map(|bucket| bucket.name.try_namespace())
2876            .counts();
2877
2878        let quotas = CombinedQuotas::new(&global_config, project_info.get_quotas());
2879
2880        for (namespace, quantity) in namespaces {
2881            let item_scoping = scoping.metric_bucket(namespace);
2882
2883            let limits = match rate_limiter
2884                .is_rate_limited(quotas, item_scoping, quantity, false)
2885                .await
2886            {
2887                Ok(limits) => limits,
2888                Err(err) => {
2889                    relay_log::error!(
2890                        error = &err as &dyn std::error::Error,
2891                        "failed to check redis rate limits"
2892                    );
2893                    break;
2894                }
2895            };
2896
2897            if limits.is_limited() {
2898                let rejected;
2899                (buckets, rejected) = utils::split_off(buckets, |bucket| {
2900                    bucket.name.try_namespace() == Some(namespace)
2901                });
2902
2903                let reason_code = limits.longest().and_then(|limit| limit.reason_code.clone());
2904                self.inner.metric_outcomes.track(
2905                    scoping,
2906                    &rejected,
2907                    Outcome::RateLimited(reason_code),
2908                );
2909
2910                self.inner
2911                    .project_cache
2912                    .get(item_scoping.scoping.project_key)
2913                    .rate_limits()
2914                    .merge(limits);
2915            }
2916        }
2917
2918        match MetricsLimiter::create(buckets, project_info.config.quotas.clone(), scoping) {
2919            Err(buckets) => buckets,
2920            Ok(bucket_limiter) => self.apply_other_rate_limits(bucket_limiter).await,
2921        }
2922    }
2923
2924    /// Check and apply rate limits to metrics buckets for transactions and spans.
2925    #[cfg(feature = "processing")]
2926    async fn apply_other_rate_limits(&self, mut bucket_limiter: MetricsLimiter) -> Vec<Bucket> {
2927        relay_log::trace!("apply_other_rate_limits");
2928
2929        let scoping = *bucket_limiter.scoping();
2930
2931        if let Some(rate_limiter) = self.inner.rate_limiter.as_ref() {
2932            let global_config = self.inner.global_config.current();
2933            let quotas = CombinedQuotas::new(&global_config, bucket_limiter.quotas());
2934
2935            // We set over_accept_once such that the limit is actually reached, which allows subsequent
2936            // calls with quantity=0 to be rate limited.
2937            let over_accept_once = true;
2938            let mut rate_limits = RateLimits::new();
2939
2940            for category in [DataCategory::Transaction, DataCategory::Span] {
2941                let count = bucket_limiter.count(category);
2942
2943                let timer = Instant::now();
2944                let mut is_limited = false;
2945
2946                if let Some(count) = count {
2947                    match rate_limiter
2948                        .is_rate_limited(quotas, scoping.item(category), count, over_accept_once)
2949                        .await
2950                    {
2951                        Ok(limits) => {
2952                            is_limited = limits.is_limited();
2953                            rate_limits.merge(limits)
2954                        }
2955                        Err(e) => relay_log::error!(error = &e as &dyn Error),
2956                    }
2957                }
2958
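                // Record the rate limiting duration, bucketing the count into coarse
                // tag values to keep the tag cardinality low.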
2959                relay_statsd::metric!(
2960                    timer(RelayTimers::RateLimitBucketsDuration) = timer.elapsed(),
2961                    category = category.name(),
2962                    limited = if is_limited { "true" } else { "false" },
2963                    count = match count {
2964                        None => "none",
2965                        Some(0) => "0",
2966                        Some(1) => "1",
2967                        Some(2..=10) => "10",
2968                        Some(11..=25) => "25",
2969                        Some(26..=50) => "50",
2970                        Some(51..=100) => "100",
2971                        Some(101..=500) => "500",
2972                        _ => "> 500",
2973                    },
2974                );
2975            }
2976
2977            if rate_limits.is_limited() {
2978                let was_enforced =
2979                    bucket_limiter.enforce_limits(&rate_limits, &self.inner.metric_outcomes);
2980
2981                if was_enforced {
2982                    // Update the rate limits in the project cache.
2983                    self.inner
2984                        .project_cache
2985                        .get(scoping.project_key)
2986                        .rate_limits()
2987                        .merge(rate_limits);
2988                }
2989            }
2990        }
2991
2992        bucket_limiter.into_buckets()
2993    }
2994
2995    /// Cardinality limits the passed buckets and returns a filtered vector of only accepted buckets.
2996    #[cfg(feature = "processing")]
2997    async fn cardinality_limit_buckets(
2998        &self,
2999        scoping: Scoping,
3000        limits: &[CardinalityLimit],
3001        buckets: Vec<Bucket>,
3002    ) -> Vec<Bucket> {
3003        let global_config = self.inner.global_config.current();
3004        let cardinality_limiter_mode = global_config.options.cardinality_limiter_mode;
3005
3006        if matches!(cardinality_limiter_mode, CardinalityLimiterMode::Disabled) {
3007            return buckets;
3008        }
3009
3010        let Some(ref limiter) = self.inner.cardinality_limiter else {
3011            return buckets;
3012        };
3013
3014        let scope = relay_cardinality::Scoping {
3015            organization_id: scoping.organization_id,
3016            project_id: scoping.project_id,
3017        };
3018
3019        let limits = match limiter
3020            .check_cardinality_limits(scope, limits, buckets)
3021            .await
3022        {
3023            Ok(limits) => limits,
3024            Err((buckets, error)) => {
3025                relay_log::error!(
3026                    error = &error as &dyn std::error::Error,
3027                    "cardinality limiter failed"
3028                );
3029                return buckets;
3030            }
3031        };
3032
3033        let error_sample_rate = global_config.options.cardinality_limiter_error_sample_rate;
3034        if !limits.exceeded_limits().is_empty() && utils::sample(error_sample_rate).is_keep() {
3035            for limit in limits.exceeded_limits() {
3036                relay_log::with_scope(
3037                    |scope| {
3038                        // Set the organization as user so we can alert on distinct org_ids.
3039                        scope.set_user(Some(relay_log::sentry::User {
3040                            id: Some(scoping.organization_id.to_string()),
3041                            ..Default::default()
3042                        }));
3043                    },
3044                    || {
3045                        relay_log::error!(
3046                            tags.organization_id = scoping.organization_id.value(),
3047                            tags.limit_id = limit.id,
3048                            tags.passive = limit.passive,
3049                            "Cardinality Limit"
3050                        );
3051                    },
3052                );
3053            }
3054        }
3055
3056        for (limit, reports) in limits.cardinality_reports() {
3057            for report in reports {
3058                self.inner
3059                    .metric_outcomes
3060                    .cardinality(scoping, limit, report);
3061            }
3062        }
3063
3064        if matches!(cardinality_limiter_mode, CardinalityLimiterMode::Passive) {
3065            return limits.into_source();
3066        }
3067
3068        let CardinalityLimitsSplit { accepted, rejected } = limits.into_split();
3069
3070        for (bucket, exceeded) in rejected {
3071            self.inner.metric_outcomes.track(
3072                scoping,
3073                &[bucket],
3074                Outcome::CardinalityLimited(exceeded.id.clone()),
3075            );
3076        }
3077        accepted
3078    }
3079
3080    /// Processes metric buckets and sends them to Kafka.
3081    ///
3082    /// This function runs the following steps:
3083    ///  - cardinality limiting
3084    ///  - rate limiting
3085    ///  - submit to `StoreForwarder`
3086    #[cfg(feature = "processing")]
3087    async fn encode_metrics_processing(
3088        &self,
3089        message: FlushBuckets,
3090        store_forwarder: &Addr<Store>,
3091    ) {
3092        use crate::constants::DEFAULT_EVENT_RETENTION;
3093        use crate::services::store::StoreMetrics;
3094
3095        for ProjectBuckets {
3096            buckets,
3097            scoping,
3098            project_info,
3099            ..
3100        } in message.buckets.into_values()
3101        {
3102            let buckets = self
3103                .rate_limit_buckets(scoping, &project_info, buckets)
3104                .await;
3105
3106            let limits = project_info.get_cardinality_limits();
3107            let buckets = self
3108                .cardinality_limit_buckets(scoping, limits, buckets)
3109                .await;
3110
3111            if buckets.is_empty() {
3112                continue;
3113            }
3114
3115            let retention = project_info
3116                .config
3117                .event_retention
3118                .unwrap_or(DEFAULT_EVENT_RETENTION);
3119
3120            // The store forwarder takes care of bucket splitting internally, so we can submit the
3121            // entire list of buckets. There is no batching needed here.
3122            store_forwarder.send(StoreMetrics {
3123                buckets,
3124                scoping,
3125                retention,
3126            });
3127        }
3128    }
3129
3130    /// Serializes metric buckets to JSON and sends them to the upstream.
3131    ///
3132    /// This function runs the following steps:
3133    ///  - partitioning
3134    ///  - batching by configured size limit
3135    ///  - serialize to JSON and pack in an envelope
3136    ///  - submit the envelope to the upstream or Kafka depending on configuration
3137    ///
3138    /// Cardinality limiting and rate limiting run only in processing Relays as they both require
3139    /// access to the central Redis instance. Cached rate limits are applied in the project cache
3140    /// already.
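    ///
    /// A sketch of the batching step below (names as used in the function body):
    ///
    /// ```ignore
    /// for batch in BucketsView::from(buckets).by_size(batch_size) {
    ///     let payload = serde_json::to_vec(&batch)?;
    ///     // ... pack `payload` into an envelope item and submit it upstream
    /// }
    /// ```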
3141    fn encode_metrics_envelope(&self, cogs: &mut Token, message: FlushBuckets) {
3142        let FlushBuckets {
3143            partition_key,
3144            buckets,
3145        } = message;
3146
3147        let batch_size = self.inner.config.metrics_max_batch_size_bytes();
3148        let upstream = self.inner.config.upstream_descriptor();
3149
3150        for ProjectBuckets {
3151            buckets, scoping, ..
3152        } in buckets.values()
3153        {
3154            let dsn = PartialDsn::outbound(scoping, upstream);
3155
3156            relay_statsd::metric!(
3157                histogram(RelayHistograms::PartitionKeys) = u64::from(partition_key)
3158            );
3159
3160            let mut num_batches = 0;
3161            for batch in BucketsView::from(buckets).by_size(batch_size) {
3162                let mut envelope = Envelope::from_request(None, RequestMeta::outbound(dsn.clone()));
3163
3164                let mut item = Item::new(ItemType::MetricBuckets);
3165                item.set_source_quantities(crate::metrics::extract_quantities(batch));
3166                item.set_payload(ContentType::Json, serde_json::to_vec(&batch).unwrap());
3167                envelope.add_item(item);
3168
3169                let mut envelope = ManagedEnvelope::new(
3170                    envelope,
3171                    self.inner.addrs.outcome_aggregator.clone(),
3172                    self.inner.addrs.test_store.clone(),
3173                );
3174                envelope
3175                    .set_partition_key(Some(partition_key))
3176                    .scope(*scoping);
3177
3178                relay_statsd::metric!(
3179                    histogram(RelayHistograms::BucketsPerBatch) = batch.len() as u64
3180                );
3181
3182                self.submit_upstream(cogs, Submit::Envelope(envelope.into_processed()));
3183                num_batches += 1;
3184            }
3185
3186            relay_statsd::metric!(histogram(RelayHistograms::BatchesPerPartition) = num_batches);
3187        }
3188    }
3189
3190    /// Creates a [`SendMetricsRequest`] and sends it to the upstream relay.
3191    fn send_global_partition(&self, partition_key: u32, partition: &mut Partition<'_>) {
3192        if partition.is_empty() {
3193            return;
3194        }
3195
3196        let (unencoded, project_info) = partition.take();
3197        let http_encoding = self.inner.config.http_encoding();
3198        let encoded = match encode_payload(&unencoded, http_encoding) {
3199            Ok(payload) => payload,
3200            Err(error) => {
3201                let error = &error as &dyn std::error::Error;
3202                relay_log::error!(error, "failed to encode metrics payload");
3203                return;
3204            }
3205        };
3206
3207        let request = SendMetricsRequest {
3208            partition_key: partition_key.to_string(),
3209            unencoded,
3210            encoded,
3211            project_info,
3212            http_encoding,
3213            metric_outcomes: self.inner.metric_outcomes.clone(),
3214        };
3215
3216        self.inner.addrs.upstream_relay.send(SendRequest(request));
3217    }
3218
3219    /// Serializes metric buckets to JSON and sends them to the upstream via the global endpoint.
3220    ///
3221    /// This function is similar to [`Self::encode_metrics_envelope`], but sends a global batched
3222    /// payload directly instead of per-project Envelopes.
3223    ///
3224    /// This function runs the following steps:
3225    ///  - partitioning
3226    ///  - batching by configured size limit
3227    ///  - serialize to JSON
3228    ///  - submit directly to the upstream
3229    ///
3230    /// Cardinality limiting and rate limiting run only in processing Relays as they both require
3231    /// access to the central Redis instance. Cached rate limits are applied in the project cache
3232    /// already.
3233    fn encode_metrics_global(&self, message: FlushBuckets) {
3234        let FlushBuckets {
3235            partition_key,
3236            buckets,
3237        } = message;
3238
3239        let batch_size = self.inner.config.metrics_max_batch_size_bytes();
3240        let mut partition = Partition::new(batch_size);
3241        let mut partition_splits = 0;
3242
3243        for ProjectBuckets {
3244            buckets, scoping, ..
3245        } in buckets.values()
3246        {
3247            for bucket in buckets {
3248                let mut remaining = Some(BucketView::new(bucket));
3249
3250                while let Some(bucket) = remaining.take() {
3251                    if let Some(next) = partition.insert(bucket, *scoping) {
3252                        // A part of the bucket could not be inserted. Take the partition and submit
3253                        // it immediately. Repeat until the final part has been inserted. Each
3254                        // iteration must produce a request, otherwise we would enter an endless loop.
3255                        self.send_global_partition(partition_key, &mut partition);
3256                        remaining = Some(next);
3257                        partition_splits += 1;
3258                    }
3259                }
3260            }
3261        }
3262
3263        if partition_splits > 0 {
3264            metric!(histogram(RelayHistograms::PartitionSplits) = partition_splits);
3265        }
3266
3267        self.send_global_partition(partition_key, &mut partition);
3268    }
3269
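    /// Handles the [`FlushBuckets`] message.
    ///
    /// First applies cached checks per project via `check_buckets`, then routes the
    /// buckets to the store forwarder (processing Relays), the global metrics
    /// endpoint, or per-project envelopes, depending on configuration.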
3270    async fn handle_flush_buckets(&self, cogs: &mut Token, mut message: FlushBuckets) {
3271        for (project_key, pb) in message.buckets.iter_mut() {
3272            let buckets = std::mem::take(&mut pb.buckets);
3273            pb.buckets =
3274                self.check_buckets(*project_key, &pb.project_info, &pb.rate_limits, buckets);
3275        }
3276
3277        #[cfg(feature = "processing")]
3278        if self.inner.config.processing_enabled() {
3279            if let Some(ref store_forwarder) = self.inner.addrs.store_forwarder {
3280                return self
3281                    .encode_metrics_processing(message, store_forwarder)
3282                    .await;
3283            }
3284        }
3285
3286        if self.inner.config.http_global_metrics() {
3287            self.encode_metrics_global(message)
3288        } else {
3289            self.encode_metrics_envelope(cogs, message)
3290        }
3291    }
3292
3293    #[cfg(all(test, feature = "processing"))]
3294    fn redis_rate_limiter_enabled(&self) -> bool {
3295        self.inner.rate_limiter.is_some()
3296    }
3297
3298    async fn handle_message(&self, message: EnvelopeProcessor) {
3299        let ty = message.variant();
3300        let feature_weights = self.feature_weights(&message);
3301
3302        metric!(timer(RelayTimers::ProcessMessageDuration), message = ty, {
3303            let mut cogs = self.inner.cogs.timed(ResourceId::Relay, feature_weights);
3304
3305            match message {
3306                EnvelopeProcessor::ProcessEnvelope(m) => {
3307                    self.handle_process_envelope(&mut cogs, *m).await
3308                }
3309                EnvelopeProcessor::ProcessProjectMetrics(m) => {
3310                    self.handle_process_metrics(&mut cogs, *m)
3311                }
3312                EnvelopeProcessor::ProcessBatchedMetrics(m) => {
3313                    self.handle_process_batched_metrics(&mut cogs, *m)
3314                }
3315                EnvelopeProcessor::FlushBuckets(m) => {
3316                    self.handle_flush_buckets(&mut cogs, *m).await
3317                }
3318                EnvelopeProcessor::SubmitClientReports(m) => {
3319                    self.handle_submit_client_reports(&mut cogs, *m)
3320                }
3321            }
3322        });
3323    }
3324
3325    fn feature_weights(&self, message: &EnvelopeProcessor) -> FeatureWeights {
3326        match message {
3327            // Envelope is split later and tokens are attributed then.
3328            EnvelopeProcessor::ProcessEnvelope(_) => AppFeature::Unattributed.into(),
3329            EnvelopeProcessor::ProcessProjectMetrics(_) => AppFeature::Unattributed.into(),
3330            EnvelopeProcessor::ProcessBatchedMetrics(_) => AppFeature::Unattributed.into(),
3331            EnvelopeProcessor::FlushBuckets(v) => v
3332                .buckets
3333                .values()
3334                .map(|s| {
3335                    if self.inner.config.processing_enabled() {
3336                        // Processing does not encode the metrics but instead rate and cardinality
3337                        // limits the metrics, which scales by count and not size.
3338                        relay_metrics::cogs::ByCount(&s.buckets).into()
3339                    } else {
3340                        relay_metrics::cogs::BySize(&s.buckets).into()
3341                    }
3342                })
3343                .fold(FeatureWeights::none(), FeatureWeights::merge),
3344            EnvelopeProcessor::SubmitClientReports(_) => AppFeature::ClientReports.into(),
3345        }
3346    }
3347
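    /// Creates a new [`ReservoirEvaluator`] for reservoir-based dynamic sampling.
    ///
    /// In processing Relays, the evaluator is additionally connected to Redis so that
    /// reservoir counts can be coordinated across Relay instances.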
3348    fn new_reservoir_evaluator(
3349        &self,
3350        _organization_id: OrganizationId,
3351        reservoir_counters: ReservoirCounters,
3352    ) -> ReservoirEvaluator {
3353        #[cfg_attr(not(feature = "processing"), expect(unused_mut))]
3354        let mut reservoir = ReservoirEvaluator::new(reservoir_counters);
3355
3356        #[cfg(feature = "processing")]
3357        if let Some(quotas_client) = self.inner.quotas_client.as_ref() {
3358            reservoir.set_redis(_organization_id, quotas_client);
3359        }
3360
3361        reservoir
3362    }
3363}
3364
3365impl Service for EnvelopeProcessorService {
3366    type Interface = EnvelopeProcessor;
3367
3368    async fn run(self, mut rx: relay_system::Receiver<Self::Interface>) {
3369        while let Some(message) = rx.recv().await {
3370            let service = self.clone();
3371            self.inner
3372                .pool
3373                .spawn_async(
3374                    async move {
3375                        service.handle_message(message).await;
3376                    }
3377                    .boxed(),
3378                )
3379                .await;
3380        }
3381    }
3382}
3383
3384/// Result of the enforcement of rate limiting.
3385///
3386 /// If the event was already `None`, or if it was rate limited, the `event` field
3387 /// holds an empty [`Annotated`].
3388struct EnforcementResult {
3389    event: Annotated<Event>,
3390    #[cfg_attr(not(feature = "processing"), expect(dead_code))]
3391    rate_limits: RateLimits,
3392}
3393
3394impl EnforcementResult {
3395    /// Creates a new [`EnforcementResult`].
3396    pub fn new(event: Annotated<Event>, rate_limits: RateLimits) -> Self {
3397        Self { event, rate_limits }
3398    }
3399}
3400
3401#[derive(Clone)]
3402enum RateLimiter {
3403    Cached,
3404    #[cfg(feature = "processing")]
3405    Consistent(Arc<RedisRateLimiter<GlobalRateLimitsServiceHandle>>),
3406}
3407
3408impl RateLimiter {
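    /// Enforces rate limits on the given envelope and the event extracted from it.
    ///
    /// The `Cached` variant checks against rate limits cached in the project state,
    /// while the `Consistent` variant (processing only) checks quotas against Redis.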
3409    async fn enforce<Group>(
3410        &self,
3411        managed_envelope: &mut TypedEnvelope<Group>,
3412        event: Annotated<Event>,
3413        _extracted_metrics: &mut ProcessingExtractedMetrics,
3414        global_config: &GlobalConfig,
3415        project_info: &ProjectInfo,
3416        rate_limits: &RateLimits,
3417    ) -> Result<EnforcementResult, ProcessingError> {
3418        if managed_envelope.envelope().is_empty() && event.value().is_none() {
3419            return Ok(EnforcementResult::new(event, RateLimits::default()));
3420        }
3421
3422        let quotas = CombinedQuotas::new(global_config, project_info.get_quotas());
3423        if quotas.is_empty() {
3424            return Ok(EnforcementResult::new(event, RateLimits::default()));
3425        }
3426
3427        let event_category = event_category(&event);
3428
3429        // Clone the rate limiter into the check closure, since consistent rate limiting
3430        // requires Redis access from within the closure.
3431        //
3432        // When invoking the rate limiter, capture whether the event item has been rate limited
3433        // to also remove it from the processing state eventually.
3434        let this = self.clone();
3435        let rate_limits_clone = rate_limits.clone();
3436        let mut envelope_limiter =
3437            EnvelopeLimiter::new(CheckLimits::All, move |item_scope, _quantity| {
3438                let this = this.clone();
3439                let rate_limits_clone = rate_limits_clone.clone();
3440
3441                async move {
3442                    match this {
3443                        #[cfg(feature = "processing")]
3444                        RateLimiter::Consistent(rate_limiter) => Ok::<_, ProcessingError>(
3445                            rate_limiter
3446                                .is_rate_limited(quotas, item_scope, _quantity, false)
3447                                .await?,
3448                        ),
3449                        _ => Ok::<_, ProcessingError>(
3450                            rate_limits_clone.check_with_quotas(quotas, item_scope),
3451                        ),
3452                    }
3453                }
3454            });
3455
3456        // Tell the envelope limiter about the event, since it has been removed from the Envelope at
3457        // this stage in processing.
3458        if let Some(category) = event_category {
3459            envelope_limiter.assume_event(category);
3460        }
3461
3462        let scoping = managed_envelope.scoping();
3463        let (enforcement, rate_limits) =
3464            metric!(timer(RelayTimers::EventProcessingRateLimiting), {
3465                envelope_limiter
3466                    .compute(managed_envelope.envelope_mut(), &scoping)
3467                    .await
3468            })?;
3469        let event_active = enforcement.is_event_active();
3470
3471        // Use the same rate limits as used for the envelope on the metrics.
3472        // Those rate limits should not be checked for expiry or similar to ensure a consistent
3473        // limiting of envelope items and metrics.
3474        #[cfg(feature = "processing")]
3475        _extracted_metrics.apply_enforcement(&enforcement, matches!(self, Self::Consistent(_)));
3476        enforcement.apply_with_outcomes(managed_envelope);
3477
3478        if event_active {
3479            debug_assert!(managed_envelope.envelope().is_empty());
3480            return Ok(EnforcementResult::new(Annotated::empty(), rate_limits));
3481        }
3482
3483        Ok(EnforcementResult::new(event, rate_limits))
3484    }
3485}
3486
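/// Compresses a serialized payload with the given HTTP `Content-Encoding`.
///
/// `HttpEncoding::Identity` returns the payload unchanged. A minimal usage sketch
/// (the payload here is a hypothetical placeholder):
///
/// ```ignore
/// let body = Bytes::from_static(b"{}");
/// let encoded = encode_payload(&body, HttpEncoding::Gzip)?;
/// ```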
3487fn encode_payload(body: &Bytes, http_encoding: HttpEncoding) -> Result<Bytes, std::io::Error> {
3488    let envelope_body: Vec<u8> = match http_encoding {
3489        HttpEncoding::Identity => return Ok(body.clone()),
3490        HttpEncoding::Deflate => {
3491            let mut encoder = ZlibEncoder::new(Vec::new(), Compression::default());
3492            encoder.write_all(body.as_ref())?;
3493            encoder.finish()?
3494        }
3495        HttpEncoding::Gzip => {
3496            let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
3497            encoder.write_all(body.as_ref())?;
3498            encoder.finish()?
3499        }
3500        HttpEncoding::Br => {
3501            // Use default buffer size (via 0), medium quality (5), and the default lgwin (22).
3502            let mut encoder = BrotliEncoder::new(Vec::new(), 0, 5, 22);
3503            encoder.write_all(body.as_ref())?;
3504            encoder.into_inner()
3505        }
3506        HttpEncoding::Zstd => {
3507            // Use the fastest compression level; the objective is the best compression
3508            // ratio for the least amount of time spent.
3509            let mut encoder = ZstdEncoder::new(Vec::new(), 1)?;
3510            encoder.write_all(body.as_ref())?;
3511            encoder.finish()?
3512        }
3513    };
3514
3515    Ok(envelope_body.into())
3516}
3517
3518/// An upstream request that submits an envelope via HTTP.
3519#[derive(Debug)]
3520pub struct SendEnvelope {
3521    envelope: TypedEnvelope<Processed>,
3522    body: Bytes,
3523    http_encoding: HttpEncoding,
3524    project_cache: ProjectCacheHandle,
3525}
3526
3527impl UpstreamRequest for SendEnvelope {
3528    fn method(&self) -> reqwest::Method {
3529        reqwest::Method::POST
3530    }
3531
3532    fn path(&self) -> Cow<'_, str> {
3533        format!("/api/{}/envelope/", self.envelope.scoping().project_id).into()
3534    }
3535
3536    fn route(&self) -> &'static str {
3537        "envelope"
3538    }
3539
3540    fn build(&mut self, builder: &mut http::RequestBuilder) -> Result<(), http::HttpError> {
3541        let envelope_body = self.body.clone();
3542        metric!(histogram(RelayHistograms::UpstreamEnvelopeBodySize) = envelope_body.len() as u64);
3543
3544        let meta = &self.envelope.meta();
3545        let shard = self.envelope.partition_key().map(|p| p.to_string());
3546        builder
3547            .content_encoding(self.http_encoding)
3548            .header_opt("Origin", meta.origin().map(|url| url.as_str()))
3549            .header_opt("User-Agent", meta.user_agent())
3550            .header("X-Sentry-Auth", meta.auth_header())
3551            .header("X-Forwarded-For", meta.forwarded_for())
3552            .header("Content-Type", envelope::CONTENT_TYPE)
3553            .header_opt("X-Sentry-Relay-Shard", shard)
3554            .body(envelope_body);
3555
3556        Ok(())
3557    }
3558
3559    fn sign(&mut self) -> Option<Sign> {
3560        Some(Sign::Optional(SignatureType::RequestSign))
3561    }
3562
3563    fn respond(
3564        self: Box<Self>,
3565        result: Result<http::Response, UpstreamRequestError>,
3566    ) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> {
3567        Box::pin(async move {
3568            let result = match result {
3569                Ok(mut response) => response.consume().await.map_err(UpstreamRequestError::Http),
3570                Err(error) => Err(error),
3571            };
3572
3573            match result {
3574                Ok(()) => self.envelope.accept(),
3575                Err(error) if error.is_received() => {
3576                    let scoping = self.envelope.scoping();
3577                    self.envelope.accept();
3578
3579                    if let UpstreamRequestError::RateLimited(limits) = error {
3580                        self.project_cache
3581                            .get(scoping.project_key)
3582                            .rate_limits()
3583                            .merge(limits.scope(&scoping));
3584                    }
3585                }
3586                Err(error) => {
3587                    // Errors are only logged for what we consider an internal discard reason. These
3588                    // indicate errors in the infrastructure or implementation bugs.
3589                    let mut envelope = self.envelope;
3590                    envelope.reject(Outcome::Invalid(DiscardReason::Internal));
3591                    relay_log::error!(
3592                        error = &error as &dyn Error,
3593                        tags.project_key = %envelope.scoping().project_key,
3594                        "error sending envelope"
3595                    );
3596                }
3597            }
3598        })
3599    }
3600}
3601
3602/// A container for metric buckets from multiple projects.
3603///
3604/// This container is used to send metrics to the upstream in global batches as part of the
3605/// [`FlushBuckets`] message if the `http.global_metrics` option is enabled. The container monitors
3606 /// the size of all metrics and allows splitting them into multiple batches. See
3607/// [`insert`](Self::insert) for more information.
3608#[derive(Debug)]
3609struct Partition<'a> {
3610    max_size: usize,
3611    remaining: usize,
3612    views: HashMap<ProjectKey, Vec<BucketView<'a>>>,
3613    project_info: HashMap<ProjectKey, Scoping>,
3614}
3615
3616impl<'a> Partition<'a> {
3617    /// Creates a new partition with the given maximum size in bytes.
3618    pub fn new(size: usize) -> Self {
3619        Self {
3620            max_size: size,
3621            remaining: size,
3622            views: HashMap::new(),
3623            project_info: HashMap::new(),
3624        }
3625    }
3626
3627    /// Inserts a bucket into the partition, splitting it if necessary.
3628    ///
3629    /// This function attempts to add the bucket to this partition. If the bucket does not fit
3630    /// entirely into the partition given its maximum size, the remaining part of the bucket is
3631    /// returned from this function call.
3632    ///
3633    /// If this function returns `Some(_)`, the partition is full and should be submitted to the
3634    /// upstream immediately. Use [`Self::take`] to retrieve the contents of the
3635    /// partition. Afterwards, the caller is responsible for calling this function again with the
3636    /// remaining bucket until it is fully inserted, as shown in the example below.
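    ///
    /// # Example
    ///
    /// A minimal sketch of the intended insert loop (`partition`, `bucket`, `scoping`,
    /// and the `submit` helper are assumed to be in scope):
    ///
    /// ```ignore
    /// let mut remaining = Some(bucket);
    /// while let Some(current) = remaining.take() {
    ///     if let Some(rest) = partition.insert(current, scoping) {
    ///         // The partition is full: flush it, then retry with the remainder.
    ///         submit(partition.take());
    ///         remaining = Some(rest);
    ///     }
    /// }
    /// ```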
3637    pub fn insert(&mut self, bucket: BucketView<'a>, scoping: Scoping) -> Option<BucketView<'a>> {
3638        let (current, next) = bucket.split(self.remaining, Some(self.max_size));
3639
3640        if let Some(current) = current {
3641            self.remaining = self.remaining.saturating_sub(current.estimated_size());
3642            self.views
3643                .entry(scoping.project_key)
3644                .or_default()
3645                .push(current);
3646
3647            self.project_info
3648                .entry(scoping.project_key)
3649                .or_insert(scoping);
3650        }
3651
3652        next
3653    }
3654
3655    /// Returns `true` if the partition does not hold any data.
3656    fn is_empty(&self) -> bool {
3657        self.views.is_empty()
3658    }
3659
3660    /// Returns the serialized buckets for this partition.
3661    ///
3662    /// This empties the partition, so that it can be reused.
3663    fn take(&mut self) -> (Bytes, HashMap<ProjectKey, Scoping>) {
3664        #[derive(serde::Serialize)]
3665        struct Wrapper<'a> {
3666            buckets: &'a HashMap<ProjectKey, Vec<BucketView<'a>>>,
3667        }
3668
3669        let buckets = &self.views;
3670        let payload = serde_json::to_vec(&Wrapper { buckets }).unwrap().into();
3671
3672        let scopings = self.project_info.clone();
3673        self.project_info.clear();
3674
3675        self.views.clear();
3676        self.remaining = self.max_size;
3677
3678        (payload, scopings)
3679    }
3680}
3681
3682/// An upstream request that submits metric buckets via HTTP.
3683///
3684 /// This request is not awaited. If the request was not received by the upstream, outcomes are tracked automatically.
3685#[derive(Debug)]
3686struct SendMetricsRequest {
3687    /// If the partition key is set, the request is marked with `X-Sentry-Relay-Shard`.
3688    /// The partition key, sent to the upstream in the `X-Sentry-Relay-Shard` header.
3689    /// Serialized metric buckets without encoding applied, used for signing.
3690    unencoded: Bytes,
3691    /// Serialized metric buckets with the stated HTTP encoding applied.
3692    encoded: Bytes,
3693    /// Mapping of all contained project keys to their scoping.
3694    ///
3695    /// Used to track outcomes for transmission failures.
3696    project_info: HashMap<ProjectKey, Scoping>,
3697    /// Encoding (compression) of the payload.
3698    http_encoding: HttpEncoding,
3699    /// Metric outcomes instance to send outcomes on error.
3700    metric_outcomes: MetricOutcomes,
3701}
3702
3703impl SendMetricsRequest {
3704    fn create_error_outcomes(self) {
3705        #[derive(serde::Deserialize)]
3706        struct Wrapper {
3707            buckets: HashMap<ProjectKey, Vec<MinimalTrackableBucket>>,
3708        }
3709
3710        let buckets = match serde_json::from_slice(&self.unencoded) {
3711            Ok(Wrapper { buckets }) => buckets,
3712            Err(err) => {
3713                relay_log::error!(
3714                    error = &err as &dyn std::error::Error,
3715                    "failed to parse buckets from failed transmission"
3716                );
3717                return;
3718            }
3719        };
3720
3721        for (key, buckets) in buckets {
3722            let Some(&scoping) = self.project_info.get(&key) else {
3723                relay_log::error!("missing scoping for project key");
3724                continue;
3725            };
3726
3727            self.metric_outcomes.track(
3728                scoping,
3729                &buckets,
3730                Outcome::Invalid(DiscardReason::Internal),
3731            );
3732        }
3733    }
3734}
3735
3736impl UpstreamRequest for SendMetricsRequest {
3737    fn set_relay_id(&self) -> bool {
3738        true
3739    }
3740
3741    fn sign(&mut self) -> Option<Sign> {
3742        Some(Sign::Required(SignatureType::Body(self.unencoded.clone())))
3743    }
3744
3745    fn method(&self) -> reqwest::Method {
3746        reqwest::Method::POST
3747    }
3748
3749    fn path(&self) -> Cow<'_, str> {
3750        "/api/0/relays/metrics/".into()
3751    }
3752
3753    fn route(&self) -> &'static str {
3754        "global_metrics"
3755    }
3756
3757    fn build(&mut self, builder: &mut http::RequestBuilder) -> Result<(), http::HttpError> {
3758        metric!(histogram(RelayHistograms::UpstreamMetricsBodySize) = self.encoded.len() as u64);
3759
3760        builder
3761            .content_encoding(self.http_encoding)
3762            .header("X-Sentry-Relay-Shard", self.partition_key.as_bytes())
3763            .header(header::CONTENT_TYPE, b"application/json")
3764            .body(self.encoded.clone());
3765
3766        Ok(())
3767    }
3768
3769    fn respond(
3770        self: Box<Self>,
3771        result: Result<http::Response, UpstreamRequestError>,
3772    ) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> {
3773        Box::pin(async {
3774            match result {
3775                Ok(mut response) => {
3776                    response.consume().await.ok();
3777                }
3778                Err(error) => {
3779                    relay_log::error!(error = &error as &dyn Error, "Failed to send metrics batch");
3780
3781                    // If the request did not arrive at the upstream, we are responsible for outcomes.
3782                    // Otherwise, the upstream is responsible for logging outcomes.
3783                    if error.is_received() {
3784                        return;
3785                    }
3786
3787                    self.create_error_outcomes()
3788                }
3789            }
3790        })
3791    }
3792}
3793
3794/// Container for global and project level [`Quota`].
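///
/// Iterating over [`CombinedQuotas`] yields all global quotas first, followed by all
/// project quotas:
///
/// ```ignore
/// let quotas = CombinedQuotas::new(&global_config, project_info.get_quotas());
/// for quota in quotas {
///     // Global quotas are yielded before project quotas.
/// }
/// ```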
3795#[derive(Copy, Clone, Debug)]
3796struct CombinedQuotas<'a> {
3797    global_quotas: &'a [Quota],
3798    project_quotas: &'a [Quota],
3799}
3800
3801impl<'a> CombinedQuotas<'a> {
3802    /// Returns a new [`CombinedQuotas`].
3803    pub fn new(global_config: &'a GlobalConfig, project_quotas: &'a [Quota]) -> Self {
3804        Self {
3805            global_quotas: &global_config.quotas,
3806            project_quotas,
3807        }
3808    }
3809
3810    /// Returns `true` if both global quotas and project quotas are empty.
3811    pub fn is_empty(&self) -> bool {
3812        self.len() == 0
3813    }
3814
3815    /// Returns the number of both global and project quotas.
3816    pub fn len(&self) -> usize {
3817        self.global_quotas.len() + self.project_quotas.len()
3818    }
3819}
3820
3821impl<'a> IntoIterator for CombinedQuotas<'a> {
3822    type Item = &'a Quota;
3823    type IntoIter = std::iter::Chain<std::slice::Iter<'a, Quota>, std::slice::Iter<'a, Quota>>;
3824
3825    fn into_iter(self) -> Self::IntoIter {
3826        self.global_quotas.iter().chain(self.project_quotas.iter())
3827    }
3828}
3829
3830#[cfg(test)]
3831mod tests {
3832    use std::collections::BTreeMap;
3833    use std::env;
3834
3835    use insta::assert_debug_snapshot;
3836    use relay_base_schema::metrics::{DurationUnit, MetricUnit};
3837    use relay_common::glob2::LazyGlob;
3838    use relay_dynamic_config::ProjectConfig;
3839    use relay_event_normalization::{RedactionRule, TransactionNameRule};
3840    use relay_event_schema::protocol::TransactionSource;
3841    use relay_pii::DataScrubbingConfig;
3842    use similar_asserts::assert_eq;
3843
3844    use crate::metrics_extraction::IntoMetric;
3845    use crate::metrics_extraction::transactions::types::{
3846        CommonTags, TransactionMeasurementTags, TransactionMetric,
3847    };
3848    use crate::testutils::{self, create_test_processor, create_test_processor_with_addrs};
3849
3850    #[cfg(feature = "processing")]
3851    use {
3852        relay_metrics::BucketValue,
3853        relay_quotas::{QuotaScope, ReasonCode},
3854        relay_test::mock_service,
3855    };
3856
3857    use super::*;
3858
3859    #[cfg(feature = "processing")]
3860    fn mock_quota(id: &str) -> Quota {
3861        Quota {
3862            id: Some(id.into()),
3863            categories: smallvec::smallvec![DataCategory::MetricBucket],
3864            scope: QuotaScope::Organization,
3865            scope_id: None,
3866            limit: Some(0),
3867            window: None,
3868            reason_code: None,
3869            namespace: None,
3870        }
3871    }
3872
3873    #[cfg(feature = "processing")]
3874    #[test]
3875    fn test_dynamic_quotas() {
3876        let global_config = GlobalConfig {
3877            quotas: vec![mock_quota("foo"), mock_quota("bar")],
3878            ..Default::default()
3879        };
3880
3881        let project_quotas = vec![mock_quota("baz"), mock_quota("qux")];
3882
3883        let dynamic_quotas = CombinedQuotas::new(&global_config, &project_quotas);
3884
3885        assert_eq!(dynamic_quotas.len(), 4);
3886        assert!(!dynamic_quotas.is_empty());
3887
3888        let quota_ids = dynamic_quotas.into_iter().filter_map(|q| q.id.as_deref());
3889        assert!(quota_ids.eq(["foo", "bar", "baz", "qux"]));
3890    }
3891
3892    /// Ensures that if we rate limit one batch of buckets in a [`FlushBuckets`] message, it won't
3893    /// also rate limit the next batches in the same message automatically.
3894    #[cfg(feature = "processing")]
3895    #[tokio::test]
3896    async fn test_ratelimit_per_batch() {
3897        use relay_base_schema::organization::OrganizationId;
3898        use relay_protocol::FiniteF64;
3899
3900        let rate_limited_org = Scoping {
3901            organization_id: OrganizationId::new(1),
3902            project_id: ProjectId::new(21),
3903            project_key: ProjectKey::parse("00000000000000000000000000000000").unwrap(),
3904            key_id: Some(17),
3905        };
3906
3907        let not_rate_limited_org = Scoping {
3908            organization_id: OrganizationId::new(2),
3909            project_id: ProjectId::new(21),
3910            project_key: ProjectKey::parse("11111111111111111111111111111111").unwrap(),
3911            key_id: Some(17),
3912        };
3913
3914        let message = {
3915            let project_info = {
3916                let quota = Quota {
3917                    id: Some("testing".into()),
3918                    categories: vec![DataCategory::MetricBucket].into(),
3919                    scope: relay_quotas::QuotaScope::Organization,
3920                    scope_id: Some(rate_limited_org.organization_id.to_string()),
3921                    limit: Some(0),
3922                    window: None,
3923                    reason_code: Some(ReasonCode::new("test")),
3924                    namespace: None,
3925                };
3926
3927                let mut config = ProjectConfig::default();
3928                config.quotas.push(quota);
3929
3930                Arc::new(ProjectInfo {
3931                    config,
3932                    ..Default::default()
3933                })
3934            };
3935
3936            let project_metrics = |scoping| ProjectBuckets {
3937                buckets: vec![Bucket {
3938                    name: "d:transactions/bar".into(),
3939                    value: BucketValue::Counter(FiniteF64::new(1.0).unwrap()),
3940                    timestamp: UnixTimestamp::now(),
3941                    tags: Default::default(),
3942                    width: 10,
3943                    metadata: BucketMetadata::default(),
3944                }],
3945                rate_limits: Default::default(),
3946                project_info: project_info.clone(),
3947                scoping,
3948            };
3949
3950            let buckets = hashbrown::HashMap::from([
3951                (
3952                    rate_limited_org.project_key,
3953                    project_metrics(rate_limited_org),
3954                ),
3955                (
3956                    not_rate_limited_org.project_key,
3957                    project_metrics(not_rate_limited_org),
3958                ),
3959            ]);
3960
3961            FlushBuckets {
3962                partition_key: 0,
3963                buckets,
3964            }
3965        };
3966
3967        // Sanity check: both projects must be contained in the message.
3968        assert_eq!(message.buckets.keys().count(), 2);
3969
3970        let config = {
3971            let config_json = serde_json::json!({
3972                "processing": {
3973                    "enabled": true,
3974                    "kafka_config": [],
3975                    "redis": {
3976                        "server": std::env::var("RELAY_REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_owned()),
3977                    }
3978                }
3979            });
3980            Config::from_json_value(config_json).unwrap()
3981        };
3982
3983        let (store, handle) = {
3984            let f = |org_ids: &mut Vec<OrganizationId>, msg: Store| {
3985                let org_id = match msg {
3986                    Store::Metrics(x) => x.scoping.organization_id,
3987                    _ => panic!("received envelope when expecting only metrics"),
3988                };
3989                org_ids.push(org_id);
3990            };
3991
3992            mock_service("store_forwarder", vec![], f)
3993        };
3994
3995        let processor = create_test_processor(config).await;
3996        assert!(processor.redis_rate_limiter_enabled());
3997
3998        processor.encode_metrics_processing(message, &store).await;
3999
4000        drop(store);
4001        let orgs_not_ratelimited = handle.await.unwrap();
4002
4003        assert_eq!(
4004            orgs_not_ratelimited,
4005            vec![not_rate_limited_org.organization_id]
4006        );
4007    }
4008
4009    #[tokio::test]
4010    async fn test_browser_version_extraction_with_pii_like_data() {
4011        let processor = create_test_processor(Default::default()).await;
4012        let (outcome_aggregator, test_store) = testutils::processor_services();
4013        let event_id = EventId::new();
4014
4015        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
4016            .parse()
4017            .unwrap();
4018
4019        let request_meta = RequestMeta::new(dsn);
4020        let mut envelope = Envelope::from_request(Some(event_id), request_meta);
4021
4022        envelope.add_item({
4023                let mut item = Item::new(ItemType::Event);
4024                item.set_payload(
4025                    ContentType::Json,
4026                    r#"
4027                    {
4028                        "request": {
4029                            "headers": [
4030                                ["User-Agent", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36"]
4031                            ]
4032                        }
4033                    }
4034                "#,
4035                );
4036                item
4037            });
4038
4039        let mut datascrubbing_settings = DataScrubbingConfig::default();
4040        // enable all the default scrubbing
4041        datascrubbing_settings.scrub_data = true;
4042        datascrubbing_settings.scrub_defaults = true;
4043        datascrubbing_settings.scrub_ip_addresses = true;
4044
4045        // Make sure to mask any IP-like data.
4046        let pii_config = serde_json::from_str(r#"{"applications": {"**": ["@ip:mask"]}}"#).unwrap();
4047
4048        let config = ProjectConfig {
4049            datascrubbing_settings,
4050            pii_config: Some(pii_config),
4051            ..Default::default()
4052        };
4053
4054        let project_info = ProjectInfo {
4055            config,
4056            ..Default::default()
4057        };
4058
4059        let mut envelopes = ProcessingGroup::split_envelope(*envelope);
4060        assert_eq!(envelopes.len(), 1);
4061
4062        let (group, envelope) = envelopes.pop().unwrap();
4063        let envelope = ManagedEnvelope::new(envelope, outcome_aggregator, test_store);
4064
4065        let message = ProcessEnvelopeGrouped {
4066            group,
4067            envelope,
4068            project_info: Arc::new(project_info),
4069            rate_limits: Default::default(),
4070            sampling_project_info: None,
4071            reservoir_counters: ReservoirCounters::default(),
4072        };
4073
4074        let Ok(Some(Submit::Envelope(mut new_envelope))) =
4075            processor.process(&mut Token::noop(), message).await
4076        else {
4077            panic!();
4078        };
4079        let new_envelope = new_envelope.envelope_mut();
4080
4081        let event_item = new_envelope.items().last().unwrap();
4082        let annotated_event: Annotated<Event> =
4083            Annotated::from_json_bytes(&event_item.payload()).unwrap();
4084        let event = annotated_event.into_value().unwrap();
4085        let headers = event
4086            .request
4087            .into_value()
4088            .unwrap()
4089            .headers
4090            .into_value()
4091            .unwrap();
4092
4093        // IP-like data must be masked
4094        assert_eq!(
4095            Some(
4096                "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/********* Safari/537.36"
4097            ),
4098            headers.get_header("User-Agent")
4099        );
4100        // But we still get correct browser and version number
4101        let contexts = event.contexts.into_value().unwrap();
4102        let browser = contexts.0.get("browser").unwrap();
4103        assert_eq!(
4104            r#"{"browser":"Chrome 103.0.0","name":"Chrome","version":"103.0.0","type":"browser"}"#,
4105            browser.to_json().unwrap()
4106        );
4107    }
4108
4109    #[tokio::test]
4110    #[cfg(feature = "processing")]
4111    async fn test_materialize_dsc() {
4112        use crate::services::projects::project::PublicKeyConfig;
4113
4114        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
4115            .parse()
4116            .unwrap();
4117        let request_meta = RequestMeta::new(dsn);
4118        let mut envelope = Envelope::from_request(None, request_meta);
4119
4120        let dsc = r#"{
4121            "trace_id": "00000000-0000-0000-0000-000000000000",
4122            "public_key": "e12d836b15bb49d7bbf99e64295d995b",
4123            "sample_rate": "0.2"
4124        }"#;
4125        envelope.set_dsc(serde_json::from_str(dsc).unwrap());
4126
4127        let mut item = Item::new(ItemType::Event);
4128        item.set_payload(ContentType::Json, r#"{}"#);
4129        envelope.add_item(item);
4130
4131        let (outcome_aggregator, test_store) = testutils::processor_services();
4132        let managed_envelope = ManagedEnvelope::new(envelope, outcome_aggregator, test_store);
4133
4134        let mut project_info = ProjectInfo::default();
4135        project_info.public_keys.push(PublicKeyConfig {
4136            public_key: ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(),
4137            numeric_id: Some(1),
4138        });
4139        let project_info = Arc::new(project_info);
4140
4141        let message = ProcessEnvelopeGrouped {
4142            group: ProcessingGroup::Transaction,
4143            envelope: managed_envelope,
4144            project_info: project_info.clone(),
4145            rate_limits: Default::default(),
4146            sampling_project_info: Some(project_info),
4147            reservoir_counters: ReservoirCounters::default(),
4148        };
4149
4150        let config = Config::from_json_value(serde_json::json!({
4151            "processing": {
4152                "enabled": true,
4153                "kafka_config": [],
4154            }
4155        }))
4156        .unwrap();
4157
4158        let processor = create_test_processor(config).await;
4159        let Ok(Some(Submit::Envelope(envelope))) =
4160            processor.process(&mut Token::noop(), message).await
4161        else {
4162            panic!();
4163        };
4164        let event = envelope
4165            .envelope()
4166            .get_item_by(|item| item.ty() == &ItemType::Event)
4167            .unwrap();
4168
4169        let event = Annotated::<Event>::from_json_bytes(&event.payload()).unwrap();
4170        insta::assert_debug_snapshot!(event.value().unwrap()._dsc, @r#"
4171        Object(
4172            {
4173                "environment": ~,
4174                "public_key": String(
4175                    "e12d836b15bb49d7bbf99e64295d995b",
4176                ),
4177                "release": ~,
4178                "replay_id": ~,
4179                "sample_rate": String(
4180                    "0.2",
4181                ),
4182                "trace_id": String(
4183                    "00000000000000000000000000000000",
4184                ),
4185                "transaction": ~,
4186            },
4187        )
4188        "#);
4189    }
4190
4191    fn capture_test_event(transaction_name: &str, source: TransactionSource) -> Vec<String> {
4192        let mut event = Annotated::<Event>::from_json(
4193            r#"
4194            {
4195                "type": "transaction",
4196                "transaction": "/foo/",
4197                "timestamp": 946684810.0,
4198                "start_timestamp": 946684800.0,
4199                "contexts": {
4200                    "trace": {
4201                        "trace_id": "4c79f60c11214eb38604f4ae0781bfb2",
4202                        "span_id": "fa90fdead5f74053",
4203                        "op": "http.server",
4204                        "type": "trace"
4205                    }
4206                },
4207                "transaction_info": {
4208                    "source": "url"
4209                }
4210            }
4211            "#,
4212        )
4213        .unwrap();
4214        let e = event.value_mut().as_mut().unwrap();
4215        e.transaction.set_value(Some(transaction_name.into()));
4216
4217        e.transaction_info
4218            .value_mut()
4219            .as_mut()
4220            .unwrap()
4221            .source
4222            .set_value(Some(source));
4223
        relay_statsd::with_capturing_test_client(|| {
            utils::log_transaction_name_metrics(&mut event, |event| {
                let config = NormalizationConfig {
                    transaction_name_config: TransactionNameConfig {
                        rules: &[TransactionNameRule {
                            pattern: LazyGlob::new("/foo/*/**".to_owned()),
                            expiry: DateTime::<Utc>::MAX_UTC,
                            redaction: RedactionRule::Replace {
                                substitution: "*".to_owned(),
                            },
                        }],
                    },
                    ..Default::default()
                };
                normalize_event(event, &config)
            });
        })
    }

    #[test]
    fn test_log_transaction_metrics_none() {
        let captures = capture_test_event("/nothing", TransactionSource::Url);
        insta::assert_debug_snapshot!(captures, @r#"
        [
            "event.transaction_name_changes:1|c|#source_in:url,changes:none,source_out:sanitized,is_404:false",
        ]
        "#);
    }

    #[test]
    fn test_log_transaction_metrics_rule() {
        let captures = capture_test_event("/foo/john/denver", TransactionSource::Url);
        insta::assert_debug_snapshot!(captures, @r#"
        [
            "event.transaction_name_changes:1|c|#source_in:url,changes:rule,source_out:sanitized,is_404:false",
        ]
        "#);
    }

    #[test]
    fn test_log_transaction_metrics_pattern() {
        let captures = capture_test_event("/something/12345", TransactionSource::Url);
        insta::assert_debug_snapshot!(captures, @r#"
        [
            "event.transaction_name_changes:1|c|#source_in:url,changes:pattern,source_out:sanitized,is_404:false",
        ]
        "#);
    }

    #[test]
    fn test_log_transaction_metrics_both() {
        let captures = capture_test_event("/foo/john/12345", TransactionSource::Url);
        insta::assert_debug_snapshot!(captures, @r#"
        [
            "event.transaction_name_changes:1|c|#source_in:url,changes:both,source_out:sanitized,is_404:false",
        ]
        "#);
    }

    #[test]
    fn test_log_transaction_metrics_no_match() {
        let captures = capture_test_event("/foo/john/12345", TransactionSource::Route);
        insta::assert_debug_snapshot!(captures, @r#"
        [
            "event.transaction_name_changes:1|c|#source_in:route,changes:none,source_out:route,is_404:false",
        ]
        "#);
    }

    /// Confirms that the hardcoded value we use for the fixed length of the measurement MRI is
    /// correct. The test lives here because it depends on relay-server and therefore cannot be
    /// run from relay-metrics.
    #[test]
    fn test_mri_overhead_constant() {
        let hardcoded_value = MeasurementsConfig::MEASUREMENT_MRI_OVERHEAD;

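        // Derive the overhead by building a real measurement bucket and subtracting the
        // variable parts (metric name and unit) from the full MRI length; what remains is
        // the fixed prefix and separators.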
        let derived_value = {
            let name = "foobar".to_owned();
            let value = 5.into(); // Arbitrary value.
            let unit = MetricUnit::Duration(DurationUnit::default());
            let tags = TransactionMeasurementTags {
                measurement_rating: None,
                universal_tags: CommonTags(BTreeMap::new()),
                score_profile_version: None,
            };

            let measurement = TransactionMetric::Measurement {
                name: name.clone(),
                value,
                unit,
                tags,
            };

            let metric: Bucket = measurement.into_metric(UnixTimestamp::now());
            metric.name.len() - unit.to_string().len() - name.len()
        };
        assert_eq!(
            hardcoded_value, derived_value,
            "Update `MEASUREMENT_MRI_OVERHEAD` if the naming scheme changed."
        );
    }

    #[tokio::test]
    async fn test_process_metrics_bucket_metadata() {
        let mut token = Cogs::noop().timed(ResourceId::Relay, AppFeature::Unattributed);
        let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
        let received_at = Utc::now();
        let config = Config::default();

        let (aggregator, mut aggregator_rx) = Addr::custom();
        let processor = create_test_processor_with_addrs(
            config,
            Addrs {
                aggregator,
                ..Default::default()
            },
        )
        .await;

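        // A statsd-formatted set metric ("|s" type) with two values, submitted as a raw
        // `Statsd` envelope item.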
        let mut item = Item::new(ItemType::Statsd);
        item.set_payload(
            ContentType::Text,
            "transactions/foo:3182887624:4267882815|s",
        );
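        // Buckets received from external clients are stamped with `received_at`, while
        // buckets forwarded by internal Relays keep `received_at` unset.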
        for (source, expected_received_at) in [
            (
                BucketSource::External,
                Some(UnixTimestamp::from_datetime(received_at).unwrap()),
            ),
            (BucketSource::Internal, None),
        ] {
            let message = ProcessMetrics {
                data: MetricData::Raw(vec![item.clone()]),
                project_key,
                source,
                received_at,
                sent_at: Some(Utc::now()),
            };
            processor.handle_process_metrics(&mut token, message);

            let Aggregator::MergeBuckets(merge_buckets) = aggregator_rx.recv().await.unwrap();
            let buckets = merge_buckets.buckets;
            assert_eq!(buckets.len(), 1);
            assert_eq!(buckets[0].metadata.received_at, expected_received_at);
        }
    }

    #[tokio::test]
    async fn test_process_batched_metrics() {
        let mut token = Cogs::noop().timed(ResourceId::Relay, AppFeature::Unattributed);
        let received_at = Utc::now();
        let config = Config::default();

        let (aggregator, mut aggregator_rx) = Addr::custom();
        let processor = create_test_processor_with_addrs(
            config,
            Addrs {
                aggregator,
                ..Default::default()
            },
        )
        .await;

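        // A batched-metrics payload containing one bucket for each of two project keys; the
        // processor splits it into one `MergeBuckets` message per project.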
        let payload = r#"{
    "buckets": {
        "11111111111111111111111111111111": [
            {
                "timestamp": 1615889440,
                "width": 0,
                "name": "d:custom/endpoint.response_time@millisecond",
                "type": "d",
                "value": [
                  68.0
                ],
                "tags": {
                  "route": "user_index"
                }
            }
        ],
        "22222222222222222222222222222222": [
            {
                "timestamp": 1615889440,
                "width": 0,
                "name": "d:custom/endpoint.cache_rate@none",
                "type": "d",
                "value": [
                  36.0
                ]
            }
        ]
    }
}
"#;
        let message = ProcessBatchedMetrics {
            payload: Bytes::from(payload),
            source: BucketSource::Internal,
            received_at,
            sent_at: Some(Utc::now()),
        };
        processor.handle_process_batched_metrics(&mut token, message);

        let Aggregator::MergeBuckets(mb1) = aggregator_rx.recv().await.unwrap();
        let Aggregator::MergeBuckets(mb2) = aggregator_rx.recv().await.unwrap();

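        // The per-project messages can arrive in either order, so sort by project key to keep
        // the snapshot below deterministic.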
        let mut messages = vec![mb1, mb2];
        messages.sort_by_key(|mb| mb.project_key);

        let actual = messages
            .into_iter()
            .map(|mb| (mb.project_key, mb.buckets))
            .collect::<Vec<_>>();

        assert_debug_snapshot!(actual, @r###"
        [
            (
                ProjectKey("11111111111111111111111111111111"),
                [
                    Bucket {
                        timestamp: UnixTimestamp(1615889440),
                        width: 0,
                        name: MetricName(
                            "d:custom/endpoint.response_time@millisecond",
                        ),
                        value: Distribution(
                            [
                                68.0,
                            ],
                        ),
                        tags: {
                            "route": "user_index",
                        },
                        metadata: BucketMetadata {
                            merges: 1,
                            received_at: None,
                            extracted_from_indexed: false,
                        },
                    },
                ],
            ),
            (
                ProjectKey("22222222222222222222222222222222"),
                [
                    Bucket {
                        timestamp: UnixTimestamp(1615889440),
                        width: 0,
                        name: MetricName(
                            "d:custom/endpoint.cache_rate@none",
                        ),
                        value: Distribution(
                            [
                                36.0,
                            ],
                        ),
                        tags: {},
                        metadata: BucketMetadata {
                            merges: 1,
                            received_at: None,
                            extracted_from_indexed: false,
                        },
                    },
                ],
            ),
        ]
        "###);
    }
}