relay_server/services/
processor.rs

1use std::borrow::Cow;
2use std::collections::{BTreeSet, HashMap};
3use std::error::Error;
4use std::fmt::{Debug, Display};
5use std::future::Future;
6use std::io::Write;
7use std::pin::Pin;
8use std::sync::{Arc, Once};
9use std::time::Duration;
10
11use anyhow::Context;
12use brotli::CompressorWriter as BrotliEncoder;
13use bytes::Bytes;
14use chrono::{DateTime, Utc};
15use flate2::Compression;
16use flate2::write::{GzEncoder, ZlibEncoder};
17use futures::FutureExt;
18use futures::future::BoxFuture;
19use relay_base_schema::project::{ProjectId, ProjectKey};
20use relay_cogs::{AppFeature, Cogs, FeatureWeights, ResourceId, Token};
21use relay_common::time::UnixTimestamp;
22use relay_config::{Config, HttpEncoding, NormalizationLevel, RelayMode};
23use relay_dynamic_config::{CombinedMetricExtractionConfig, ErrorBoundary, Feature, GlobalConfig};
24use relay_event_normalization::{
25    ClockDriftProcessor, CombinedMeasurementsConfig, EventValidationConfig, GeoIpLookup,
26    MeasurementsConfig, NormalizationConfig, RawUserAgentInfo, TransactionNameConfig,
27    normalize_event, validate_event,
28};
29use relay_event_schema::processor::ProcessingAction;
30use relay_event_schema::protocol::{
31    ClientReport, Event, EventId, EventType, IpAddr, Metrics, NetworkReportError, SpanV2,
32};
33use relay_filter::FilterStatKey;
34use relay_metrics::{Bucket, BucketMetadata, BucketView, BucketsView, MetricNamespace};
35use relay_pii::PiiConfigError;
36use relay_protocol::{Annotated, Empty};
37use relay_quotas::{DataCategory, Quota, RateLimits, Scoping};
38use relay_sampling::evaluation::{ReservoirCounters, ReservoirEvaluator, SamplingDecision};
39use relay_statsd::metric;
40use relay_system::{Addr, FromMessage, NoResponse, Service};
41use reqwest::header;
42use smallvec::{SmallVec, smallvec};
43use zstd::stream::Encoder as ZstdEncoder;
44
45use crate::constants::DEFAULT_EVENT_RETENTION;
46use crate::envelope::{self, ContentType, Envelope, EnvelopeError, Item, ItemContainer, ItemType};
47use crate::extractors::{PartialDsn, RequestMeta, RequestTrust};
48use crate::managed::{InvalidProcessingGroupType, ManagedEnvelope, TypedEnvelope};
49use crate::metrics::{MetricOutcomes, MetricsLimiter, MinimalTrackableBucket};
50use crate::metrics_extraction::transactions::types::ExtractMetricsError;
51use crate::metrics_extraction::transactions::{ExtractedMetrics, TransactionExtractor};
52use crate::processing::logs::LogsProcessor;
53use crate::processing::spans::SpansProcessor;
54use crate::processing::{Forward as _, Output, Outputs, QuotaRateLimiter};
55use crate::service::ServiceError;
56use crate::services::global_config::GlobalConfigHandle;
57use crate::services::metrics::{Aggregator, FlushBuckets, MergeBuckets, ProjectBuckets};
58use crate::services::outcome::{DiscardItemType, DiscardReason, Outcome, TrackOutcome};
59use crate::services::processor::event::FiltersStatus;
60use crate::services::projects::cache::ProjectCacheHandle;
61use crate::services::projects::project::{ProjectInfo, ProjectState};
62use crate::services::upstream::{
63    SendRequest, Sign, SignatureType, UpstreamRelay, UpstreamRequest, UpstreamRequestError,
64};
65use crate::statsd::{RelayCounters, RelayHistograms, RelayTimers};
66use crate::utils::{self, CheckLimits, EnvelopeLimiter, SamplingResult};
67use crate::{http, processing};
68use relay_base_schema::organization::OrganizationId;
69use relay_threading::AsyncPool;
70#[cfg(feature = "processing")]
71use {
72    crate::managed::ItemAction,
73    crate::services::global_rate_limits::{GlobalRateLimits, GlobalRateLimitsServiceHandle},
74    crate::services::processor::nnswitch::SwitchProcessingError,
75    crate::services::store::{Store, StoreEnvelope},
76    crate::utils::Enforcement,
77    itertools::Itertools,
78    relay_cardinality::{
79        CardinalityLimit, CardinalityLimiter, CardinalityLimitsSplit, RedisSetLimiter,
80        RedisSetLimiterOptions,
81    },
82    relay_dynamic_config::{CardinalityLimiterMode, MetricExtractionGroups},
83    relay_quotas::{RateLimitingError, RedisRateLimiter},
84    relay_redis::{AsyncRedisClient, RedisClients},
85    std::time::Instant,
86    symbolic_unreal::{Unreal4Error, Unreal4ErrorKind},
87};
88
mod attachment;
mod dynamic_sampling;
mod event;
mod metrics;
mod nel;
mod profile;
mod profile_chunk;
mod replay;
mod report;
mod session;
mod span;
pub use span::extract_transaction_span;

// Only compiled in builds with the custom `sentry` cfg and the `processing` feature.
#[cfg(all(sentry, feature = "processing"))]
mod playstation;
mod standalone;
#[cfg(feature = "processing")]
mod unreal;

#[cfg(feature = "processing")]
mod nnswitch;
/// Creates the block only if used with `processing` feature.
///
/// Provided code block will be executed only if the provided config has `processing_enabled` set.
macro_rules! if_processing {
    // Variant without an `else` branch: the block only exists in `processing` builds and is
    // executed when the config enables processing at runtime.
    ($config:expr, $if_true:block) => {
        #[cfg(feature = "processing")] {
            if $config.processing_enabled() $if_true
        }
    };
    // Variant with an `else` branch: non-`processing` builds always compile and take the
    // `else` branch; `processing` builds choose the branch at runtime.
    ($config:expr, $if_true:block else $if_false:block) => {
        {
            #[cfg(feature = "processing")] {
                if $config.processing_enabled() $if_true else $if_false
            }
            #[cfg(not(feature = "processing"))] {
                $if_false
            }
        }
    };
}
131
/// The minimum clock drift for correction to apply.
///
/// Set to 55 minutes; drift below this threshold is presumably left uncorrected by the clock
/// drift correction — confirm at the [`ClockDriftProcessor`] call sites.
const MINIMUM_CLOCK_DRIFT: Duration = Duration::from_secs(55 * 60);
134
/// Error returned when a [`ProcessingGroup`] cannot be converted into a specific group
/// marker type.
#[derive(Debug)]
pub struct GroupTypeError;

impl Display for GroupTypeError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "failed to convert processing group into corresponding type")
    }
}

impl std::error::Error for GroupTypeError {}
145
/// Generates a zero-sized marker type for a processing group along with conversions to and
/// from [`ProcessingGroup`].
///
/// `$ty` converts infallibly into `ProcessingGroup::$variant`. The reverse conversion is
/// fallible: it accepts `$variant` plus any additional `$other` variants and yields
/// [`GroupTypeError`] for everything else.
macro_rules! processing_group {
    ($ty:ident, $variant:ident$(, $($other:ident),+)?) => {
        #[derive(Clone, Copy, Debug)]
        pub struct $ty;

        impl From<$ty> for ProcessingGroup {
            fn from(_: $ty) -> Self {
                ProcessingGroup::$variant
            }
        }

        impl TryFrom<ProcessingGroup> for $ty {
            type Error = GroupTypeError;

            fn try_from(value: ProcessingGroup) -> Result<Self, Self::Error> {
                if matches!(value, ProcessingGroup::$variant) {
                    return Ok($ty);
                }
                $($(
                    // Also accept any extra groups listed in the invocation
                    // (e.g. `Nel` for `LogGroup`).
                    if matches!(value, ProcessingGroup::$other) {
                        return Ok($ty);
                    }
                )+)?
                return Err(GroupTypeError);
            }
        }
    };
}
174
/// A marker trait.
///
/// Should be used only with groups which are responsible for processing envelopes with events.
pub trait EventProcessing {}

/// A trait for processing groups that can be dynamically sampled.
pub trait Sampling {
    /// Whether dynamic sampling should run under the given project's conditions.
    fn supports_sampling(project_info: &ProjectInfo) -> bool;

    /// Whether reservoir sampling applies to this processing group (a.k.a. data type).
    fn supports_reservoir_sampling() -> bool;
}
188
processing_group!(TransactionGroup, Transaction);
impl EventProcessing for TransactionGroup {}

impl Sampling for TransactionGroup {
    fn supports_sampling(project_info: &ProjectInfo) -> bool {
        // For transactions, we require transaction metrics to be enabled before sampling.
        matches!(&project_info.config.transaction_metrics, Some(ErrorBoundary::Ok(c)) if c.is_enabled())
    }

    fn supports_reservoir_sampling() -> bool {
        // Transactions participate in reservoir sampling.
        true
    }
}
202
processing_group!(ErrorGroup, Error);
impl EventProcessing for ErrorGroup {}

processing_group!(SessionGroup, Session);
processing_group!(StandaloneGroup, Standalone);
processing_group!(ClientReportGroup, ClientReport);
processing_group!(ReplayGroup, Replay);
processing_group!(CheckInGroup, CheckIn);
// `LogGroup` also accepts the `Nel` group: NEL reports are transformed into logs in their own
// processing step.
processing_group!(LogGroup, Log, Nel);
processing_group!(SpanGroup, Span);

impl Sampling for SpanGroup {
    fn supports_sampling(project_info: &ProjectInfo) -> bool {
        // If no metrics could be extracted, do not sample anything.
        matches!(&project_info.config().metric_extraction, ErrorBoundary::Ok(c) if c.is_supported())
    }

    fn supports_reservoir_sampling() -> bool {
        // Standalone spans do not participate in reservoir sampling.
        false
    }
}
229
/// Processed group type marker.
///
/// Marks the envelopes which passed through the processing pipeline.
#[derive(Clone, Copy, Debug)]
pub struct Processed;

/// Describes the groups of the processable items.
#[derive(Clone, Copy, Debug)]
pub enum ProcessingGroup {
    /// All the transaction related items.
    ///
    /// Includes transactions, related attachments, profiles.
    Transaction,
    /// All the items which require (have or create) events.
    ///
    /// This includes: errors, NEL, security reports, user reports, some of the
    /// attachments.
    Error,
    /// Session events.
    Session,
    /// Standalone items which can be sent alone without any event attached to it in the current
    /// envelope e.g. some attachments, user reports.
    Standalone,
    /// Client reports (outcomes).
    ClientReport,
    /// Replays and ReplayRecordings.
    Replay,
    /// Crons.
    CheckIn,
    /// NEL reports.
    Nel,
    /// Logs.
    Log,
    /// Spans.
    Span,
    /// Span V2 spans.
    SpanV2,
    /// Metrics.
    Metrics,
    /// ProfileChunk.
    ProfileChunk,
    /// Unknown item types will be forwarded upstream (to processing Relay), where we will
    /// decide what to do with them.
    ForwardUnknown,
    /// All the items in the envelope that could not be grouped.
    Ungrouped,
}
277
278impl ProcessingGroup {
279    /// Splits provided envelope into list of tuples of groups with associated envelopes.
280    fn split_envelope(
281        mut envelope: Envelope,
282        project_info: &ProjectInfo,
283    ) -> SmallVec<[(Self, Box<Envelope>); 3]> {
284        let headers = envelope.headers().clone();
285        let mut grouped_envelopes = smallvec![];
286
287        // Extract replays.
288        let replay_items = envelope.take_items_by(|item| {
289            matches!(
290                item.ty(),
291                &ItemType::ReplayEvent | &ItemType::ReplayRecording | &ItemType::ReplayVideo
292            )
293        });
294        if !replay_items.is_empty() {
295            grouped_envelopes.push((
296                ProcessingGroup::Replay,
297                Envelope::from_parts(headers.clone(), replay_items),
298            ))
299        }
300
301        // Keep all the sessions together in one envelope.
302        let session_items = envelope
303            .take_items_by(|item| matches!(item.ty(), &ItemType::Session | &ItemType::Sessions));
304        if !session_items.is_empty() {
305            grouped_envelopes.push((
306                ProcessingGroup::Session,
307                Envelope::from_parts(headers.clone(), session_items),
308            ))
309        }
310
311        if project_info.has_feature(Feature::SpanV2ExperimentalProcessing) {
312            let span_v2_items = envelope.take_items_by(ItemContainer::<SpanV2>::is_container);
313
314            if !span_v2_items.is_empty() {
315                grouped_envelopes.push((
316                    ProcessingGroup::SpanV2,
317                    Envelope::from_parts(headers.clone(), span_v2_items),
318                ))
319            }
320        }
321
322        // Extract spans.
323        let span_items = envelope.take_items_by(|item| {
324            matches!(
325                item.ty(),
326                &ItemType::Span | &ItemType::OtelSpan | &ItemType::OtelTracesData
327            )
328        });
329
330        if !span_items.is_empty() {
331            grouped_envelopes.push((
332                ProcessingGroup::Span,
333                Envelope::from_parts(headers.clone(), span_items),
334            ))
335        }
336
337        // Extract logs.
338        let logs_items = envelope
339            .take_items_by(|item| matches!(item.ty(), &ItemType::Log | &ItemType::OtelLogsData));
340        if !logs_items.is_empty() {
341            grouped_envelopes.push((
342                ProcessingGroup::Log,
343                Envelope::from_parts(headers.clone(), logs_items),
344            ))
345        }
346
347        // NEL items are transformed into logs in their own processing step.
348        let nel_items = envelope.take_items_by(|item| matches!(item.ty(), &ItemType::Nel));
349        if !nel_items.is_empty() {
350            grouped_envelopes.push((
351                ProcessingGroup::Nel,
352                Envelope::from_parts(headers.clone(), nel_items),
353            ))
354        }
355
356        // Extract all metric items.
357        //
358        // Note: Should only be relevant in proxy mode. In other modes we send metrics through
359        // a separate pipeline.
360        let metric_items = envelope.take_items_by(|i| i.ty().is_metrics());
361        if !metric_items.is_empty() {
362            grouped_envelopes.push((
363                ProcessingGroup::Metrics,
364                Envelope::from_parts(headers.clone(), metric_items),
365            ))
366        }
367
368        // Extract profile chunks.
369        let profile_chunk_items =
370            envelope.take_items_by(|item| matches!(item.ty(), &ItemType::ProfileChunk));
371        if !profile_chunk_items.is_empty() {
372            grouped_envelopes.push((
373                ProcessingGroup::ProfileChunk,
374                Envelope::from_parts(headers.clone(), profile_chunk_items),
375            ))
376        }
377
378        // Extract all standalone items.
379        //
380        // Note: only if there are no items in the envelope which can create events, otherwise they
381        // will be in the same envelope with all require event items.
382        if !envelope.items().any(Item::creates_event) {
383            let standalone_items = envelope.take_items_by(Item::requires_event);
384            if !standalone_items.is_empty() {
385                grouped_envelopes.push((
386                    ProcessingGroup::Standalone,
387                    Envelope::from_parts(headers.clone(), standalone_items),
388                ))
389            }
390        };
391
392        // Make sure we create separate envelopes for each `RawSecurity` report.
393        let security_reports_items = envelope
394            .take_items_by(|i| matches!(i.ty(), &ItemType::RawSecurity))
395            .into_iter()
396            .map(|item| {
397                let headers = headers.clone();
398                let items: SmallVec<[Item; 3]> = smallvec![item.clone()];
399                let mut envelope = Envelope::from_parts(headers, items);
400                envelope.set_event_id(EventId::new());
401                (ProcessingGroup::Error, envelope)
402            });
403        grouped_envelopes.extend(security_reports_items);
404
405        // Extract all the items which require an event into separate envelope.
406        let require_event_items = envelope.take_items_by(Item::requires_event);
407        if !require_event_items.is_empty() {
408            let group = if require_event_items
409                .iter()
410                .any(|item| matches!(item.ty(), &ItemType::Transaction | &ItemType::Profile))
411            {
412                ProcessingGroup::Transaction
413            } else {
414                ProcessingGroup::Error
415            };
416
417            grouped_envelopes.push((
418                group,
419                Envelope::from_parts(headers.clone(), require_event_items),
420            ))
421        }
422
423        // Get the rest of the envelopes, one per item.
424        let envelopes = envelope.items_mut().map(|item| {
425            let headers = headers.clone();
426            let items: SmallVec<[Item; 3]> = smallvec![item.clone()];
427            let envelope = Envelope::from_parts(headers, items);
428            let item_type = item.ty();
429            let group = if matches!(item_type, &ItemType::CheckIn) {
430                ProcessingGroup::CheckIn
431            } else if matches!(item.ty(), &ItemType::ClientReport) {
432                ProcessingGroup::ClientReport
433            } else if matches!(item_type, &ItemType::Unknown(_)) {
434                ProcessingGroup::ForwardUnknown
435            } else {
436                // Cannot group this item type.
437                ProcessingGroup::Ungrouped
438            };
439
440            (group, envelope)
441        });
442        grouped_envelopes.extend(envelopes);
443
444        grouped_envelopes
445    }
446
447    /// Returns the name of the group.
448    pub fn variant(&self) -> &'static str {
449        match self {
450            ProcessingGroup::Transaction => "transaction",
451            ProcessingGroup::Error => "error",
452            ProcessingGroup::Session => "session",
453            ProcessingGroup::Standalone => "standalone",
454            ProcessingGroup::ClientReport => "client_report",
455            ProcessingGroup::Replay => "replay",
456            ProcessingGroup::CheckIn => "check_in",
457            ProcessingGroup::Log => "log",
458            ProcessingGroup::Nel => "nel",
459            ProcessingGroup::Span => "span",
460            ProcessingGroup::SpanV2 => "span_v2",
461            ProcessingGroup::Metrics => "metrics",
462            ProcessingGroup::ProfileChunk => "profile_chunk",
463            ProcessingGroup::ForwardUnknown => "forward_unknown",
464            ProcessingGroup::Ungrouped => "ungrouped",
465        }
466    }
467}
468
469impl From<ProcessingGroup> for AppFeature {
470    fn from(value: ProcessingGroup) -> Self {
471        match value {
472            ProcessingGroup::Transaction => AppFeature::Transactions,
473            ProcessingGroup::Error => AppFeature::Errors,
474            ProcessingGroup::Session => AppFeature::Sessions,
475            ProcessingGroup::Standalone => AppFeature::UnattributedEnvelope,
476            ProcessingGroup::ClientReport => AppFeature::ClientReports,
477            ProcessingGroup::Replay => AppFeature::Replays,
478            ProcessingGroup::CheckIn => AppFeature::CheckIns,
479            ProcessingGroup::Log => AppFeature::Logs,
480            ProcessingGroup::Nel => AppFeature::Logs,
481            ProcessingGroup::Span => AppFeature::Spans,
482            ProcessingGroup::SpanV2 => AppFeature::Spans,
483            ProcessingGroup::Metrics => AppFeature::UnattributedMetrics,
484            ProcessingGroup::ProfileChunk => AppFeature::Profiles,
485            ProcessingGroup::ForwardUnknown => AppFeature::UnattributedEnvelope,
486            ProcessingGroup::Ungrouped => AppFeature::UnattributedEnvelope,
487        }
488    }
489}
490
/// An error returned when handling [`ProcessEnvelope`].
///
/// Display messages come from the `thiserror` `#[error]` attributes; the mapping to outcomes
/// lives in [`ProcessingError::to_outcome`].
#[derive(Debug, thiserror::Error)]
pub enum ProcessingError {
    #[error("invalid json in event")]
    InvalidJson(#[source] serde_json::Error),

    #[error("invalid message pack event payload")]
    InvalidMsgpack(#[from] rmp_serde::decode::Error),

    #[cfg(feature = "processing")]
    #[error("invalid unreal crash report")]
    InvalidUnrealReport(#[source] Unreal4Error),

    #[error("event payload too large")]
    PayloadTooLarge(DiscardItemType),

    #[error("invalid transaction event")]
    InvalidTransaction,

    #[error("envelope processor failed")]
    ProcessingFailed(#[from] ProcessingAction),

    #[error("duplicate {0} in event")]
    DuplicateItem(ItemType),

    #[error("failed to extract event payload")]
    NoEventPayload,

    #[error("missing project id in DSN")]
    MissingProjectId,

    #[error("invalid security report type: {0:?}")]
    InvalidSecurityType(Bytes),

    #[error("unsupported security report type")]
    UnsupportedSecurityType,

    #[error("invalid security report")]
    InvalidSecurityReport(#[source] serde_json::Error),

    #[error("invalid nel report")]
    InvalidNelReport(#[source] NetworkReportError),

    #[error("event filtered with reason: {0:?}")]
    EventFiltered(FilterStatKey),

    #[error("missing or invalid required event timestamp")]
    InvalidTimestamp,

    #[error("could not serialize event payload")]
    SerializeFailed(#[source] serde_json::Error),

    #[cfg(feature = "processing")]
    #[error("failed to apply quotas")]
    QuotasFailed(#[from] RateLimitingError),

    #[error("invalid pii config")]
    PiiConfigError(PiiConfigError),

    // Boxed payload; see the `From<InvalidProcessingGroupType>` impl below.
    #[error("invalid processing group type")]
    InvalidProcessingGroup(Box<InvalidProcessingGroupType>),

    #[error("invalid replay")]
    InvalidReplay(DiscardReason),

    #[error("replay filtered with reason: {0:?}")]
    ReplayFiltered(FilterStatKey),

    #[cfg(feature = "processing")]
    #[error("nintendo switch dying message processing failed {0:?}")]
    InvalidNintendoDyingMessage(#[source] SwitchProcessingError),

    #[cfg(all(sentry, feature = "processing"))]
    #[error("playstation dump processing failed: {0}")]
    InvalidPlaystationDump(String),

    #[error("processing group does not match specific processor")]
    ProcessingGroupMismatch,
    /// Failure in the new processing pipeline; outcomes are emitted there already.
    #[error("new processing pipeline failed")]
    ProcessingFailure,
}
572
impl ProcessingError {
    /// Maps this error to the [`Outcome`] that should be recorded for the dropped data.
    ///
    /// Returns `None` for errors that must not produce an outcome here, either because no
    /// data is dropped or because the outcome is emitted elsewhere.
    pub fn to_outcome(&self) -> Option<Outcome> {
        match self {
            Self::PayloadTooLarge(payload_type) => {
                Some(Outcome::Invalid(DiscardReason::TooLarge(*payload_type)))
            }
            Self::InvalidJson(_) => Some(Outcome::Invalid(DiscardReason::InvalidJson)),
            Self::InvalidMsgpack(_) => Some(Outcome::Invalid(DiscardReason::InvalidMsgpack)),
            Self::InvalidSecurityType(_) => {
                Some(Outcome::Invalid(DiscardReason::SecurityReportType))
            }
            Self::InvalidSecurityReport(_) => Some(Outcome::Invalid(DiscardReason::SecurityReport)),
            Self::UnsupportedSecurityType => Some(Outcome::Filtered(FilterStatKey::InvalidCsp)),
            Self::InvalidNelReport(_) => Some(Outcome::Invalid(DiscardReason::InvalidJson)),
            Self::InvalidTransaction => Some(Outcome::Invalid(DiscardReason::InvalidTransaction)),
            Self::InvalidTimestamp => Some(Outcome::Invalid(DiscardReason::Timestamp)),
            Self::DuplicateItem(_) => Some(Outcome::Invalid(DiscardReason::DuplicateItem)),
            Self::NoEventPayload => Some(Outcome::Invalid(DiscardReason::NoEventPayload)),
            #[cfg(feature = "processing")]
            Self::InvalidNintendoDyingMessage(_) => Some(Outcome::Invalid(DiscardReason::Payload)),
            #[cfg(all(sentry, feature = "processing"))]
            Self::InvalidPlaystationDump(_) => Some(Outcome::Invalid(DiscardReason::Payload)),
            // Bad compression is reported distinctly from other Unreal failures.
            #[cfg(feature = "processing")]
            Self::InvalidUnrealReport(err) if err.kind() == Unreal4ErrorKind::BadCompression => {
                Some(Outcome::Invalid(DiscardReason::InvalidCompression))
            }
            #[cfg(feature = "processing")]
            Self::InvalidUnrealReport(_) => Some(Outcome::Invalid(DiscardReason::ProcessUnreal)),
            Self::SerializeFailed(_) | Self::ProcessingFailed(_) => {
                Some(Outcome::Invalid(DiscardReason::Internal))
            }
            #[cfg(feature = "processing")]
            Self::QuotasFailed(_) => Some(Outcome::Invalid(DiscardReason::Internal)),
            Self::PiiConfigError(_) => Some(Outcome::Invalid(DiscardReason::ProjectStatePii)),
            Self::MissingProjectId => None,
            Self::EventFiltered(_) => None,
            Self::InvalidProcessingGroup(_) => None,
            Self::InvalidReplay(reason) => Some(Outcome::Invalid(*reason)),
            Self::ReplayFiltered(key) => Some(Outcome::Filtered(key.clone())),

            Self::ProcessingGroupMismatch => Some(Outcome::Invalid(DiscardReason::Internal)),
            // Outcomes are emitted in the new processing pipeline already.
            Self::ProcessingFailure => None,
        }
    }

    /// Returns `true` if the mapped outcome is flagged as unexpected.
    fn is_unexpected(&self) -> bool {
        self.to_outcome()
            .is_some_and(|outcome| outcome.is_unexpected())
    }
}
624
#[cfg(feature = "processing")]
impl From<Unreal4Error> for ProcessingError {
    /// Converts Unreal crash-report errors; oversized reports map to a payload-size error,
    /// everything else to an invalid-report error.
    fn from(err: Unreal4Error) -> Self {
        if matches!(err.kind(), Unreal4ErrorKind::TooLarge) {
            Self::PayloadTooLarge(ItemType::UnrealReport.into())
        } else {
            Self::InvalidUnrealReport(err)
        }
    }
}
634
635impl From<ExtractMetricsError> for ProcessingError {
636    fn from(error: ExtractMetricsError) -> Self {
637        match error {
638            ExtractMetricsError::MissingTimestamp | ExtractMetricsError::InvalidTimestamp => {
639                Self::InvalidTimestamp
640            }
641        }
642    }
643}
644
impl From<InvalidProcessingGroupType> for ProcessingError {
    // The payload is boxed, presumably to keep `ProcessingError` itself small; it is only
    // touched on the error path.
    fn from(value: InvalidProcessingGroupType) -> Self {
        Self::InvalidProcessingGroup(Box::new(value))
    }
}
650
651type ExtractedEvent = (Annotated<Event>, usize);
652
/// A container for extracted metrics during processing.
///
/// The container enforces that the extracted metrics are correctly tagged
/// with the dynamic sampling decision.
#[derive(Debug)]
pub struct ProcessingExtractedMetrics {
    // Holds both project- and sampling-scoped buckets; see `ExtractedMetrics`.
    metrics: ExtractedMetrics,
}
661
662impl ProcessingExtractedMetrics {
663    pub fn new() -> Self {
664        Self {
665            metrics: ExtractedMetrics::default(),
666        }
667    }
668
669    /// Extends the contained metrics with [`ExtractedMetrics`].
670    pub fn extend(
671        &mut self,
672        extracted: ExtractedMetrics,
673        sampling_decision: Option<SamplingDecision>,
674    ) {
675        self.extend_project_metrics(extracted.project_metrics, sampling_decision);
676        self.extend_sampling_metrics(extracted.sampling_metrics, sampling_decision);
677    }
678
679    /// Extends the contained project metrics.
680    pub fn extend_project_metrics<I>(
681        &mut self,
682        buckets: I,
683        sampling_decision: Option<SamplingDecision>,
684    ) where
685        I: IntoIterator<Item = Bucket>,
686    {
687        self.metrics
688            .project_metrics
689            .extend(buckets.into_iter().map(|mut bucket| {
690                bucket.metadata.extracted_from_indexed =
691                    sampling_decision == Some(SamplingDecision::Keep);
692                bucket
693            }));
694    }
695
696    /// Extends the contained sampling metrics.
697    pub fn extend_sampling_metrics<I>(
698        &mut self,
699        buckets: I,
700        sampling_decision: Option<SamplingDecision>,
701    ) where
702        I: IntoIterator<Item = Bucket>,
703    {
704        self.metrics
705            .sampling_metrics
706            .extend(buckets.into_iter().map(|mut bucket| {
707                bucket.metadata.extracted_from_indexed =
708                    sampling_decision == Some(SamplingDecision::Keep);
709                bucket
710            }));
711    }
712
713    /// Applies rate limits to the contained metrics.
714    ///
715    /// This is used to apply rate limits which have been enforced on sampled items of an envelope
716    /// to also consistently apply to the metrics extracted from these items.
717    #[cfg(feature = "processing")]
718    fn apply_enforcement(&mut self, enforcement: &Enforcement, enforced_consistently: bool) {
719        // Metric namespaces which need to be dropped.
720        let mut drop_namespaces: SmallVec<[_; 2]> = smallvec![];
721        // Metrics belonging to this metric namespace need to have the `extracted_from_indexed`
722        // flag reset to `false`.
723        let mut reset_extracted_from_indexed: SmallVec<[_; 2]> = smallvec![];
724
725        for (namespace, limit, indexed) in [
726            (
727                MetricNamespace::Transactions,
728                &enforcement.event,
729                &enforcement.event_indexed,
730            ),
731            (
732                MetricNamespace::Spans,
733                &enforcement.spans,
734                &enforcement.spans_indexed,
735            ),
736        ] {
737            if limit.is_active() {
738                drop_namespaces.push(namespace);
739            } else if indexed.is_active() && !enforced_consistently {
740                // If the enforcement was not computed by consistently checking the limits,
741                // the quota for the metrics has not yet been incremented.
742                // In this case we have a dropped indexed payload but a metric which still needs to
743                // be accounted for, make sure the metric will still be rate limited.
744                reset_extracted_from_indexed.push(namespace);
745            }
746        }
747
748        if !drop_namespaces.is_empty() || !reset_extracted_from_indexed.is_empty() {
749            self.retain_mut(|bucket| {
750                let Some(namespace) = bucket.name.try_namespace() else {
751                    return true;
752                };
753
754                if drop_namespaces.contains(&namespace) {
755                    return false;
756                }
757
758                if reset_extracted_from_indexed.contains(&namespace) {
759                    bucket.metadata.extracted_from_indexed = false;
760                }
761
762                true
763            });
764        }
765    }
766
767    #[cfg(feature = "processing")]
768    fn retain_mut(&mut self, mut f: impl FnMut(&mut Bucket) -> bool) {
769        self.metrics.project_metrics.retain_mut(&mut f);
770        self.metrics.sampling_metrics.retain_mut(&mut f);
771    }
772}
773
774fn send_metrics(
775    metrics: ExtractedMetrics,
776    project_key: ProjectKey,
777    sampling_key: Option<ProjectKey>,
778    aggregator: &Addr<Aggregator>,
779) {
780    let ExtractedMetrics {
781        project_metrics,
782        sampling_metrics,
783    } = metrics;
784
785    if !project_metrics.is_empty() {
786        aggregator.send(MergeBuckets {
787            project_key,
788            buckets: project_metrics,
789        });
790    }
791
792    if !sampling_metrics.is_empty() {
793        // If no sampling project state is available, we associate the sampling
794        // metrics with the current project.
795        //
796        // project_without_tracing         -> metrics goes to self
797        // dependent_project_with_tracing  -> metrics goes to root
798        // root_project_with_tracing       -> metrics goes to root == self
799        let sampling_project_key = sampling_key.unwrap_or(project_key);
800        aggregator.send(MergeBuckets {
801            project_key: sampling_project_key,
802            buckets: sampling_metrics,
803        });
804    }
805}
806
807/// Returns the data category if there is an event.
808///
809/// The data category is computed from the event type. Both `Default` and `Error` events map to
810/// the `Error` data category. If there is no Event, `None` is returned.
811fn event_category(event: &Annotated<Event>) -> Option<DataCategory> {
812    event_type(event).map(DataCategory::from)
813}
814
815/// Returns the event type if there is an event.
816///
817/// If the event does not have a type, `Some(EventType::Default)` is assumed. If, in contrast, there
818/// is no event, `None` is returned.
819fn event_type(event: &Annotated<Event>) -> Option<EventType> {
820    event
821        .value()
822        .map(|event| event.ty.value().copied().unwrap_or_default())
823}
824
825/// Function for on-off switches that filter specific item types (profiles, spans)
826/// based on a feature flag.
827///
828/// If the project config did not come from the upstream, we keep the items.
829fn should_filter(config: &Config, project_info: &ProjectInfo, feature: Feature) -> bool {
830    match config.relay_mode() {
831        RelayMode::Proxy => false,
832        RelayMode::Managed => !project_info.has_feature(feature),
833    }
834}
835
836/// New type representing the normalization state of the event.
837#[derive(Copy, Clone)]
838struct EventFullyNormalized(bool);
839
840impl EventFullyNormalized {
841    /// Returns `true` if the event is fully normalized, `false` otherwise.
842    pub fn new(envelope: &Envelope) -> Self {
843        let event_fully_normalized = envelope.meta().request_trust().is_trusted()
844            && envelope
845                .items()
846                .any(|item| item.creates_event() && item.fully_normalized());
847
848        Self(event_fully_normalized)
849    }
850}
851
/// New type representing whether metrics were extracted from transactions/spans.
///
/// Wraps a boolean flag that is `true` once metric extraction has run for the event.
#[derive(Debug, Copy, Clone)]
struct EventMetricsExtracted(bool);

/// New type representing whether spans were extracted.
///
/// Wraps a boolean flag that is `true` once spans have been extracted from the event.
#[derive(Debug, Copy, Clone)]
struct SpansExtracted(bool);
859
/// The result of the envelope processing containing the processed envelope along with the partial
/// result.
#[derive(Debug)]
#[expect(
    clippy::large_enum_variant,
    reason = "until we have a better solution to combat the excessive growth of Item, see #4819"
)]
enum ProcessingResult {
    /// An envelope processed in place, together with the metrics extracted from it.
    Envelope {
        /// The processed envelope.
        managed_envelope: TypedEnvelope<Processed>,
        /// Metrics extracted while processing the envelope.
        extracted_metrics: ProcessingExtractedMetrics,
    },
    /// Output produced by one of the dedicated processors.
    Output(Output<Outputs>),
}
874
875impl ProcessingResult {
876    /// Creates a [`ProcessingResult`] with no metrics.
877    fn no_metrics(managed_envelope: TypedEnvelope<Processed>) -> Self {
878        Self::Envelope {
879            managed_envelope,
880            extracted_metrics: ProcessingExtractedMetrics::new(),
881        }
882    }
883}
884
/// All items which can be submitted upstream.
#[derive(Debug)]
#[expect(
    clippy::large_enum_variant,
    reason = "until we have a better solution to combat the excessive growth of Item, see #4819"
)]
enum Submit {
    /// A processed envelope.
    Envelope(TypedEnvelope<Processed>),
    /// The output of a [`processing::Processor`].
    ///
    /// Produced by the dedicated per-signal processors instead of the envelope pipeline.
    Output(Outputs),
}
897
/// Applies processing to all contents of the given envelope.
///
/// Depending on the contents of the envelope and Relay's mode, this includes:
///
///  - Basic normalization and validation for all item types.
///  - Clock drift correction if the required `sent_at` header is present.
///  - Expansion of certain item types (e.g. unreal).
///  - Store normalization for event payloads in processing mode.
///  - Rate limiters and inbound filters on events in processing mode.
#[derive(Debug)]
pub struct ProcessEnvelope {
    /// Envelope to process.
    pub envelope: ManagedEnvelope,
    /// The project info.
    pub project_info: Arc<ProjectInfo>,
    /// Currently active cached rate limits for this project.
    pub rate_limits: Arc<RateLimits>,
    /// Root sampling project info.
    ///
    /// `None` if no sampling (trace root) project is known for this envelope.
    pub sampling_project_info: Option<Arc<ProjectInfo>>,
    /// Sampling reservoir counters.
    pub reservoir_counters: ReservoirCounters,
}
920
/// Like a [`ProcessEnvelope`], but with an envelope which has been grouped.
#[derive(Debug)]
struct ProcessEnvelopeGrouped {
    /// The group the envelope belongs to.
    ///
    /// The group selects which processing path handles the envelope's items.
    pub group: ProcessingGroup,
    /// Envelope to process.
    pub envelope: ManagedEnvelope,
    /// The project info.
    pub project_info: Arc<ProjectInfo>,
    /// Currently active cached rate limits for this project.
    pub rate_limits: Arc<RateLimits>,
    /// Root sampling project info.
    pub sampling_project_info: Option<Arc<ProjectInfo>>,
    /// Sampling reservoir counters.
    pub reservoir_counters: ReservoirCounters,
}
937
/// Parses a list of metrics or metric buckets and pushes them to the project's aggregator.
///
/// This parses and validates the metrics:
///  - For [`Metrics`](ItemType::Statsd), each metric is parsed separately, and invalid metrics are
///    ignored independently.
///  - For [`MetricBuckets`](ItemType::MetricBuckets), the entire list of buckets is parsed and
///    dropped together on parsing failure.
///  - Other envelope items will be ignored with an error message.
///
/// Additionally, processing applies clock drift correction using the system clock of this Relay, if
/// the Envelope specifies the [`sent_at`](Envelope::sent_at) header.
#[derive(Debug)]
pub struct ProcessMetrics {
    /// A list of metric items.
    pub data: MetricData,
    /// The target project.
    pub project_key: ProjectKey,
    /// Whether to keep or reset the metric metadata.
    ///
    /// See [`BucketSource`] for the distinction between internal and external sources.
    pub source: BucketSource,
    /// The wall clock time at which the request was received.
    pub received_at: DateTime<Utc>,
    /// The value of the Envelope's [`sent_at`](Envelope::sent_at) header for clock drift
    /// correction.
    pub sent_at: Option<DateTime<Utc>>,
}
963
/// Raw unparsed metric data.
#[derive(Debug)]
pub enum MetricData {
    /// Raw data, unparsed envelope items.
    ///
    /// Expected to contain [`ItemType::Statsd`] or [`ItemType::MetricBuckets`] items.
    Raw(Vec<Item>),
    /// Already parsed buckets but unprocessed.
    Parsed(Vec<Bucket>),
}
972
973impl MetricData {
974    /// Consumes the metric data and parses the contained buckets.
975    ///
976    /// If the contained data is already parsed the buckets are returned unchanged.
977    /// Raw buckets are parsed and created with the passed `timestamp`.
978    fn into_buckets(self, timestamp: UnixTimestamp) -> Vec<Bucket> {
979        let items = match self {
980            Self::Parsed(buckets) => return buckets,
981            Self::Raw(items) => items,
982        };
983
984        let mut buckets = Vec::new();
985        for item in items {
986            let payload = item.payload();
987            if item.ty() == &ItemType::Statsd {
988                for bucket_result in Bucket::parse_all(&payload, timestamp) {
989                    match bucket_result {
990                        Ok(bucket) => buckets.push(bucket),
991                        Err(error) => relay_log::debug!(
992                            error = &error as &dyn Error,
993                            "failed to parse metric bucket from statsd format",
994                        ),
995                    }
996                }
997            } else if item.ty() == &ItemType::MetricBuckets {
998                match serde_json::from_slice::<Vec<Bucket>>(&payload) {
999                    Ok(parsed_buckets) => {
1000                        // Re-use the allocation of `b` if possible.
1001                        if buckets.is_empty() {
1002                            buckets = parsed_buckets;
1003                        } else {
1004                            buckets.extend(parsed_buckets);
1005                        }
1006                    }
1007                    Err(error) => {
1008                        relay_log::debug!(
1009                            error = &error as &dyn Error,
1010                            "failed to parse metric bucket",
1011                        );
1012                        metric!(counter(RelayCounters::MetricBucketsParsingFailed) += 1);
1013                    }
1014                }
1015            } else {
1016                relay_log::error!(
1017                    "invalid item of type {} passed to ProcessMetrics",
1018                    item.ty()
1019                );
1020            }
1021        }
1022        buckets
1023    }
1024}
1025
/// Parses a batched metrics payload and pushes the buckets to the matching projects.
#[derive(Debug)]
pub struct ProcessBatchedMetrics {
    /// Metrics payload in JSON format.
    pub payload: Bytes,
    /// Whether to keep or reset the metric metadata.
    pub source: BucketSource,
    /// The wall clock time at which the request was received.
    pub received_at: DateTime<Utc>,
    /// The value of the request's `sent_at` header for clock drift correction.
    pub sent_at: Option<DateTime<Utc>>,
}
1037
/// Source information where a metric bucket originates from.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum BucketSource {
    /// The metric bucket originated from an internal Relay use case.
    ///
    /// The metric bucket originates either from within the same Relay
    /// or was accepted coming from another Relay which is registered as
    /// an internal Relay via Relay's configuration.
    Internal,
    /// The bucket source originated from an untrusted source.
    ///
    /// Managed Relays sending extracted metrics are considered external,
    /// it's a project use case but it comes from an untrusted source.
    External,
}
1053
1054impl BucketSource {
1055    /// Infers the bucket source from [`RequestMeta::request_trust`].
1056    pub fn from_meta(meta: &RequestMeta) -> Self {
1057        match meta.request_trust() {
1058            RequestTrust::Trusted => Self::Internal,
1059            RequestTrust::Untrusted => Self::External,
1060        }
1061    }
1062}
1063
/// Sends a client report to the upstream.
#[derive(Debug)]
pub struct SubmitClientReports {
    /// The client report to be sent.
    pub client_reports: Vec<ClientReport>,
    /// Scoping information for the client report.
    ///
    /// Identifies the organization/project/key the reports belong to.
    pub scoping: Scoping,
}
1072
/// CPU-intensive processing tasks for envelopes.
///
/// Each variant wraps one of the processor's message types; see the respective
/// message struct for details.
#[derive(Debug)]
pub enum EnvelopeProcessor {
    /// Processes an envelope, see [`ProcessEnvelope`].
    ProcessEnvelope(Box<ProcessEnvelope>),
    /// Processes metrics for a project, see [`ProcessMetrics`].
    ProcessProjectMetrics(Box<ProcessMetrics>),
    /// Processes a batched metrics payload, see [`ProcessBatchedMetrics`].
    ProcessBatchedMetrics(Box<ProcessBatchedMetrics>),
    /// Flushes aggregated metric buckets, see [`FlushBuckets`].
    FlushBuckets(Box<FlushBuckets>),
    /// Submits client reports upstream, see [`SubmitClientReports`].
    SubmitClientReports(Box<SubmitClientReports>),
}
1082
1083impl EnvelopeProcessor {
1084    /// Returns the name of the message variant.
1085    pub fn variant(&self) -> &'static str {
1086        match self {
1087            EnvelopeProcessor::ProcessEnvelope(_) => "ProcessEnvelope",
1088            EnvelopeProcessor::ProcessProjectMetrics(_) => "ProcessProjectMetrics",
1089            EnvelopeProcessor::ProcessBatchedMetrics(_) => "ProcessBatchedMetrics",
1090            EnvelopeProcessor::FlushBuckets(_) => "FlushBuckets",
1091            EnvelopeProcessor::SubmitClientReports(_) => "SubmitClientReports",
1092        }
1093    }
1094}
1095
1096impl relay_system::Interface for EnvelopeProcessor {}
1097
1098impl FromMessage<ProcessEnvelope> for EnvelopeProcessor {
1099    type Response = relay_system::NoResponse;
1100
1101    fn from_message(message: ProcessEnvelope, _sender: ()) -> Self {
1102        Self::ProcessEnvelope(Box::new(message))
1103    }
1104}
1105
// The `FromMessage` implementations below box each message and wrap it in the
// corresponding `EnvelopeProcessor` variant; none of them produce a response.
impl FromMessage<ProcessMetrics> for EnvelopeProcessor {
    type Response = NoResponse;

    fn from_message(message: ProcessMetrics, _: ()) -> Self {
        Self::ProcessProjectMetrics(Box::new(message))
    }
}

impl FromMessage<ProcessBatchedMetrics> for EnvelopeProcessor {
    type Response = NoResponse;

    fn from_message(message: ProcessBatchedMetrics, _: ()) -> Self {
        Self::ProcessBatchedMetrics(Box::new(message))
    }
}

impl FromMessage<FlushBuckets> for EnvelopeProcessor {
    type Response = NoResponse;

    fn from_message(message: FlushBuckets, _: ()) -> Self {
        Self::FlushBuckets(Box::new(message))
    }
}

impl FromMessage<SubmitClientReports> for EnvelopeProcessor {
    type Response = NoResponse;

    fn from_message(message: SubmitClientReports, _: ()) -> Self {
        Self::SubmitClientReports(Box::new(message))
    }
}
1137
/// The asynchronous thread pool used for scheduling processing tasks in the processor.
///
/// Each scheduled task is a boxed `'static` future polled on the pool's workers.
pub type EnvelopeProcessorServicePool = AsyncPool<BoxFuture<'static, ()>>;
1140
/// Service implementing the [`EnvelopeProcessor`] interface.
///
/// This service handles messages in a worker pool with configurable concurrency.
///
/// Cloning the service is cheap: all state is shared behind an [`Arc`].
#[derive(Clone)]
pub struct EnvelopeProcessorService {
    // Shared state and service handles, see `InnerProcessor`.
    inner: Arc<InnerProcessor>,
}
1148
/// Contains the addresses of services that the processor publishes to.
pub struct Addrs {
    /// Address of the service tracking outcomes.
    pub outcome_aggregator: Addr<TrackOutcome>,
    /// Address of the upstream relay service.
    pub upstream_relay: Addr<UpstreamRelay>,
    /// Address of the store service, if store forwarding is enabled (processing mode only).
    #[cfg(feature = "processing")]
    pub store_forwarder: Option<Addr<Store>>,
    /// Address of the metrics aggregator service.
    pub aggregator: Addr<Aggregator>,
    /// Address of the global rate limits service, if configured (processing mode only).
    #[cfg(feature = "processing")]
    pub global_rate_limits: Option<Addr<GlobalRateLimits>>,
}
1159
1160impl Default for Addrs {
1161    fn default() -> Self {
1162        Addrs {
1163            outcome_aggregator: Addr::dummy(),
1164            upstream_relay: Addr::dummy(),
1165            #[cfg(feature = "processing")]
1166            store_forwarder: None,
1167            aggregator: Addr::dummy(),
1168            #[cfg(feature = "processing")]
1169            global_rate_limits: None,
1170        }
1171    }
1172}
1173
/// Shared state behind [`EnvelopeProcessorService`].
struct InnerProcessor {
    /// Worker pool for scheduling processing futures.
    pool: EnvelopeProcessorServicePool,
    /// Static Relay configuration.
    config: Arc<Config>,
    /// Handle yielding the current global config (see `current()` call sites).
    global_config: GlobalConfigHandle,
    /// Handle to the project cache, used e.g. to merge back computed rate limits.
    project_cache: ProjectCacheHandle,
    /// COGS (cost of goods sold) measurement sink.
    cogs: Cogs,
    /// Redis client used for quota enforcement, if configured.
    #[cfg(feature = "processing")]
    quotas_client: Option<AsyncRedisClient>,
    /// Addresses of downstream services the processor publishes to.
    addrs: Addrs,
    /// Redis-backed rate limiter for consistent quota enforcement, if configured.
    #[cfg(feature = "processing")]
    rate_limiter: Option<Arc<RedisRateLimiter<GlobalRateLimitsServiceHandle>>>,
    /// GeoIP lookup used during event normalization; empty if no database is configured.
    geoip_lookup: GeoIpLookup,
    /// Redis-backed cardinality limiter for metrics, if configured.
    #[cfg(feature = "processing")]
    cardinality_limiter: Option<CardinalityLimiter>,
    /// Sink for metric outcome reporting.
    metric_outcomes: MetricOutcomes,
    /// Dedicated per-signal processors (logs, spans).
    processing: Processing,
}
1191
/// Dedicated processors for individual signal types.
struct Processing {
    /// Processor for log items.
    logs: LogsProcessor,
    /// Processor for span items.
    spans: SpansProcessor,
}
1196
1197impl EnvelopeProcessorService {
    /// Creates a multi-threaded envelope processor.
    ///
    /// Wires up the processor's shared state: the GeoIP lookup, the Redis-backed
    /// rate and cardinality limiters (processing mode only), and the dedicated
    /// logs/spans processors, which share a single quota rate limiter.
    #[cfg_attr(feature = "processing", expect(clippy::too_many_arguments))]
    pub fn new(
        pool: EnvelopeProcessorServicePool,
        config: Arc<Config>,
        global_config: GlobalConfigHandle,
        project_cache: ProjectCacheHandle,
        cogs: Cogs,
        #[cfg(feature = "processing")] redis: Option<RedisClients>,
        addrs: Addrs,
        metric_outcomes: MetricOutcomes,
    ) -> Self {
        // Open the GeoIP database if configured. Failure to open it is logged
        // and degrades to an empty lookup instead of failing service creation.
        let geoip_lookup = config
            .geoip_path()
            .and_then(
                |p| match GeoIpLookup::open(p).context(ServiceError::GeoIp) {
                    Ok(geoip) => Some(geoip),
                    Err(err) => {
                        relay_log::error!("failed to open GeoIP db {p:?}: {err:?}");
                        None
                    }
                },
            )
            .unwrap_or_else(GeoIpLookup::empty);

        // Split the Redis clients into the cardinality and quotas pools.
        #[cfg(feature = "processing")]
        let (cardinality, quotas) = match redis {
            Some(RedisClients {
                cardinality,
                quotas,
                ..
            }) => (Some(cardinality), Some(quotas)),
            None => (None, None),
        };

        #[cfg(feature = "processing")]
        let global_rate_limits = addrs.global_rate_limits.clone().map(Into::into);

        // The consistent rate limiter requires both a quotas Redis client and
        // the global rate limits service; otherwise it stays disabled.
        #[cfg(feature = "processing")]
        let rate_limiter = match (quotas.clone(), global_rate_limits) {
            (Some(redis), Some(global)) => {
                Some(RedisRateLimiter::new(redis, global).max_limit(config.max_rate_limit()))
            }
            _ => None,
        };

        // Shared by the dedicated logs and spans processors constructed below.
        let quota_limiter = Arc::new(QuotaRateLimiter::new(
            #[cfg(feature = "processing")]
            project_cache.clone(),
            #[cfg(feature = "processing")]
            rate_limiter.clone(),
        ));
        #[cfg(feature = "processing")]
        let rate_limiter = rate_limiter.map(Arc::new);

        let inner = InnerProcessor {
            pool,
            global_config,
            project_cache,
            cogs,
            #[cfg(feature = "processing")]
            quotas_client: quotas.clone(),
            #[cfg(feature = "processing")]
            rate_limiter,
            addrs,
            #[cfg(feature = "processing")]
            cardinality_limiter: cardinality
                .map(|cardinality| {
                    RedisSetLimiter::new(
                        RedisSetLimiterOptions {
                            cache_vacuum_interval: config
                                .cardinality_limiter_cache_vacuum_interval(),
                        },
                        cardinality,
                    )
                })
                .map(CardinalityLimiter::new),
            metric_outcomes,
            processing: Processing {
                logs: LogsProcessor::new(Arc::clone(&quota_limiter)),
                spans: SpansProcessor::new(quota_limiter, geoip_lookup.clone()),
            },
            geoip_lookup,
            config,
        };

        Self {
            inner: Arc::new(inner),
        }
    }
1288
1289    /// Normalize monitor check-ins and remove invalid ones.
1290    #[cfg(feature = "processing")]
1291    fn normalize_checkins(
1292        &self,
1293        managed_envelope: &mut TypedEnvelope<CheckInGroup>,
1294        project_id: ProjectId,
1295    ) {
1296        managed_envelope.retain_items(|item| {
1297            if item.ty() != &ItemType::CheckIn {
1298                return ItemAction::Keep;
1299            }
1300
1301            match relay_monitors::process_check_in(&item.payload(), project_id) {
1302                Ok(result) => {
1303                    item.set_routing_hint(result.routing_hint);
1304                    item.set_payload(ContentType::Json, result.payload);
1305                    ItemAction::Keep
1306                }
1307                Err(error) => {
1308                    // TODO: Track an outcome.
1309                    relay_log::debug!(
1310                        error = &error as &dyn Error,
1311                        "dropped invalid monitor check-in"
1312                    );
1313                    ItemAction::DropSilently
1314                }
1315            }
1316        })
1317    }
1318
    /// Applies rate limiting quotas to the envelope, its event, and extracted metrics.
    ///
    /// Enforces the cached (local) quotas first. In processing mode with a
    /// configured Redis rate limiter, all quotas are then enforced consistently
    /// against Redis and the freshly computed limits are merged back into the
    /// project cache. Returns the (possibly rate limited) event on success.
    async fn enforce_quotas<Group>(
        &self,
        managed_envelope: &mut TypedEnvelope<Group>,
        event: Annotated<Event>,
        extracted_metrics: &mut ProcessingExtractedMetrics,
        project_info: &ProjectInfo,
        rate_limits: &RateLimits,
    ) -> Result<Annotated<Event>, ProcessingError> {
        let global_config = self.inner.global_config.current();
        // Cached quotas first, they are quick to evaluate and some quotas (indexed) are not
        // applied in the fast path, all cached quotas can be applied here.
        let cached_result = RateLimiter::Cached
            .enforce(
                managed_envelope,
                event,
                extracted_metrics,
                &global_config,
                project_info,
                rate_limits,
            )
            .await?;

        if_processing!(self.inner.config, {
            // Without a configured Redis rate limiter, the cached result is final.
            let rate_limiter = match self.inner.rate_limiter.clone() {
                Some(rate_limiter) => rate_limiter,
                None => return Ok(cached_result.event),
            };

            // Enforce all quotas consistently with Redis.
            let consistent_result = RateLimiter::Consistent(rate_limiter)
                .enforce(
                    managed_envelope,
                    cached_result.event,
                    extracted_metrics,
                    &global_config,
                    project_info,
                    rate_limits,
                )
                .await?;

            // Update cached rate limits with the freshly computed ones.
            if !consistent_result.rate_limits.is_empty() {
                self.inner
                    .project_cache
                    .get(managed_envelope.scoping().project_key)
                    .rate_limits()
                    .merge(consistent_result.rate_limits);
            }

            Ok(consistent_result.event)
        } else { Ok(cached_result.event) })
    }
1371
    /// Extract transaction metrics.
    ///
    /// Metrics are extracted at most once per event: if `event_metrics_extracted`
    /// is already set, or if there is no event, this is a no-op. Extraction also
    /// requires a supported generic `metric_extraction` config and an enabled
    /// legacy `transaction_metrics` config in the project config.
    ///
    /// Returns the updated extraction state (`true` once metrics were extracted).
    #[allow(clippy::too_many_arguments)]
    fn extract_transaction_metrics(
        &self,
        managed_envelope: &mut TypedEnvelope<TransactionGroup>,
        event: &mut Annotated<Event>,
        extracted_metrics: &mut ProcessingExtractedMetrics,
        project_id: ProjectId,
        project_info: Arc<ProjectInfo>,
        sampling_decision: SamplingDecision,
        event_metrics_extracted: EventMetricsExtracted,
        spans_extracted: SpansExtracted,
    ) -> Result<EventMetricsExtracted, ProcessingError> {
        if event_metrics_extracted.0 {
            return Ok(event_metrics_extracted);
        }
        let Some(event) = event.value_mut() else {
            return Ok(event_metrics_extracted);
        };

        // NOTE: This function requires a `metric_extraction` in the project config. Legacy configs
        // will upsert this configuration from transaction and conditional tagging fields, even if
        // it is not present in the actual project config payload.
        let global = self.inner.global_config.current();
        let combined_config = {
            let config = match &project_info.config.metric_extraction {
                ErrorBoundary::Ok(config) if config.is_supported() => config,
                _ => return Ok(event_metrics_extracted),
            };
            let global_config = match &global.metric_extraction {
                ErrorBoundary::Ok(global_config) => global_config,
                #[allow(unused_variables)]
                ErrorBoundary::Err(e) => {
                    if_processing!(self.inner.config, {
                        // Config is invalid, but we will try to extract what we can with just the
                        // project config.
                        relay_log::error!("Failed to parse global extraction config {e}");
                        MetricExtractionGroups::EMPTY
                    } else {
                        // If there's an error with global metrics extraction, it is safe to assume that this
                        // Relay instance is not up-to-date, and we should skip extraction.
                        relay_log::debug!("Failed to parse global extraction config: {e}");
                        return Ok(event_metrics_extracted);
                    })
                }
            };
            CombinedMetricExtractionConfig::new(global_config, config)
        };

        // Require a valid transaction metrics config.
        let tx_config = match &project_info.config.transaction_metrics {
            Some(ErrorBoundary::Ok(tx_config)) => tx_config,
            Some(ErrorBoundary::Err(e)) => {
                relay_log::debug!("Failed to parse legacy transaction metrics config: {e}");
                return Ok(event_metrics_extracted);
            }
            None => {
                relay_log::debug!("Legacy transaction metrics config is missing");
                return Ok(event_metrics_extracted);
            }
        };

        if !tx_config.is_enabled() {
            // Log the outdated-config error only once per process.
            static TX_CONFIG_ERROR: Once = Once::new();
            TX_CONFIG_ERROR.call_once(|| {
                if self.inner.config.processing_enabled() {
                    relay_log::error!(
                        "Processing Relay outdated, received tx config in version {}, which is not supported",
                        tx_config.version
                    );
                }
            });

            return Ok(event_metrics_extracted);
        }

        // If spans were already extracted for an event, we rely on span processing to extract metrics.
        let extract_spans = !spans_extracted.0
            && utils::sample(global.options.span_extraction_sample_rate.unwrap_or(1.0)).is_keep();

        let metrics = crate::metrics_extraction::event::extract_metrics(
            event,
            combined_config,
            sampling_decision,
            project_id,
            self.inner
                .config
                .aggregator_config_for(MetricNamespace::Spans)
                .max_tag_value_length,
            extract_spans,
        );

        extracted_metrics.extend(metrics, Some(sampling_decision));

        if !project_info.has_feature(Feature::DiscardTransaction) {
            let transaction_from_dsc = managed_envelope
                .envelope()
                .dsc()
                .and_then(|dsc| dsc.transaction.as_deref());

            let extractor = TransactionExtractor {
                config: tx_config,
                generic_config: Some(combined_config),
                transaction_from_dsc,
                sampling_decision,
                target_project_id: project_id,
            };

            extracted_metrics.extend(extractor.extract(event)?, Some(sampling_decision));
        }

        Ok(EventMetricsExtracted(true))
    }
1485
    /// Validates and normalizes the event payload in place.
    ///
    /// Depending on the configured [`NormalizationLevel`] and whether processing
    /// is enabled, either default or full normalization is performed. Empty
    /// events and events that are already fully normalized (in processing mode
    /// with default level) are left untouched.
    ///
    /// Returns the updated fully-normalized state of the event, or
    /// [`ProcessingError::InvalidTransaction`] if event validation fails.
    fn normalize_event<Group: EventProcessing>(
        &self,
        managed_envelope: &mut TypedEnvelope<Group>,
        event: &mut Annotated<Event>,
        project_id: ProjectId,
        project_info: Arc<ProjectInfo>,
        mut event_fully_normalized: EventFullyNormalized,
    ) -> Result<EventFullyNormalized, ProcessingError> {
        if event.value().is_empty() {
            // NOTE(iker): only processing relays create events from
            // attachments, so these events won't be normalized in
            // non-processing relays even if the config is set to run full
            // normalization.
            return Ok(event_fully_normalized);
        }

        // Decide between default and full normalization based on config and
        // whether this instance runs with processing enabled.
        let full_normalization = match self.inner.config.normalization_level() {
            NormalizationLevel::Full => true,
            NormalizationLevel::Default => {
                if self.inner.config.processing_enabled() && event_fully_normalized.0 {
                    return Ok(event_fully_normalized);
                }

                self.inner.config.processing_enabled()
            }
        };

        let request_meta = managed_envelope.envelope().meta();
        let client_ipaddr = request_meta.client_addr().map(IpAddr::from);

        let transaction_aggregator_config = self
            .inner
            .config
            .aggregator_config_for(MetricNamespace::Transactions);

        let global_config = self.inner.global_config.current();
        let ai_model_costs = global_config.ai_model_costs.as_ref().ok();
        let ai_operation_type_map = global_config.ai_operation_type_map.as_ref().ok();
        let http_span_allowed_hosts = global_config.options.http_span_allowed_hosts.as_slice();

        // Retention in days, used to bound how far in the past timestamps may lie.
        let retention_days: i64 = project_info
            .config
            .event_retention
            .unwrap_or(DEFAULT_EVENT_RETENTION)
            .into();

        utils::log_transaction_name_metrics(event, |event| {
            let event_validation_config = EventValidationConfig {
                received_at: Some(managed_envelope.received_at()),
                max_secs_in_past: Some(retention_days * 24 * 3600),
                max_secs_in_future: Some(self.inner.config.max_secs_in_future()),
                transaction_timestamp_range: Some(transaction_aggregator_config.timestamp_range()),
                is_validated: false,
            };

            let key_id = project_info
                .get_public_key_config()
                .and_then(|key| Some(key.numeric_id?.to_string()));
            if full_normalization && key_id.is_none() {
                relay_log::error!(
                    "project state for key {} is missing key id",
                    managed_envelope.envelope().meta().public_key()
                );
            }

            let normalization_config = NormalizationConfig {
                project_id: Some(project_id.value()),
                client: request_meta.client().map(str::to_owned),
                key_id,
                protocol_version: Some(request_meta.version().to_string()),
                grouping_config: project_info.config.grouping_config.clone(),
                client_ip: client_ipaddr.as_ref(),
                // if the setting is enabled we do not want to infer the ip address
                infer_ip_address: !project_info
                    .config
                    .datascrubbing_settings
                    .scrub_ip_addresses,
                client_sample_rate: managed_envelope
                    .envelope()
                    .dsc()
                    .and_then(|ctx| ctx.sample_rate),
                user_agent: RawUserAgentInfo {
                    user_agent: request_meta.user_agent(),
                    client_hints: request_meta.client_hints(),
                },
                max_name_and_unit_len: Some(
                    transaction_aggregator_config
                        .max_name_length
                        .saturating_sub(MeasurementsConfig::MEASUREMENT_MRI_OVERHEAD),
                ),
                breakdowns_config: project_info.config.breakdowns_v2.as_ref(),
                performance_score: project_info.config.performance_score.as_ref(),
                normalize_user_agent: Some(true),
                transaction_name_config: TransactionNameConfig {
                    rules: &project_info.config.tx_name_rules,
                },
                device_class_synthesis_config: project_info
                    .has_feature(Feature::DeviceClassSynthesis),
                enrich_spans: project_info.has_feature(Feature::ExtractSpansFromEvent),
                max_tag_value_length: self
                    .inner
                    .config
                    .aggregator_config_for(MetricNamespace::Spans)
                    .max_tag_value_length,
                is_renormalize: false,
                // Removal of unknown fields and event-error emission only happen
                // during full normalization.
                remove_other: full_normalization,
                emit_event_errors: full_normalization,
                span_description_rules: project_info.config.span_description_rules.as_ref(),
                geoip_lookup: Some(&self.inner.geoip_lookup),
                ai_model_costs,
                ai_operation_type_map,
                enable_trimming: true,
                measurements: Some(CombinedMeasurementsConfig::new(
                    project_info.config().measurements.as_ref(),
                    global_config.measurements.as_ref(),
                )),
                normalize_spans: true,
                replay_id: managed_envelope
                    .envelope()
                    .dsc()
                    .and_then(|ctx| ctx.replay_id),
                span_allowed_hosts: http_span_allowed_hosts,
                span_op_defaults: global_config.span_op_defaults.borrow(),
                performance_issues_spans: project_info.has_feature(Feature::PerformanceIssuesSpans),
            };

            metric!(timer(RelayTimers::EventProcessingNormalization), {
                validate_event(event, &event_validation_config)
                    .map_err(|_| ProcessingError::InvalidTransaction)?;
                normalize_event(event, &normalization_config);
                if full_normalization && event::has_unprintable_fields(event) {
                    metric!(counter(RelayCounters::EventCorrupted) += 1);
                }
                Result::<(), ProcessingError>::Ok(())
            })
        })?;

        event_fully_normalized.0 |= full_normalization;

        Ok(event_fully_normalized)
    }
1627
    /// Processes the general errors, and the items which require or create the events.
    ///
    /// Pipeline overview: handle user reports, expand vendor crash formats
    /// (processing mode only), extract the event, normalize and filter it,
    /// tag the sampling decision, enforce quotas, then scrub and serialize the
    /// event back into the envelope. Returns the metrics extracted along the
    /// way so the caller can forward them.
    async fn process_errors(
        &self,
        managed_envelope: &mut TypedEnvelope<ErrorGroup>,
        project_id: ProjectId,
        project_info: Arc<ProjectInfo>,
        mut sampling_project_info: Option<Arc<ProjectInfo>>,
        rate_limits: Arc<RateLimits>,
    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
        // Tracks whether the event has gone through full normalization; several
        // steps below may flip this flag.
        let mut event_fully_normalized = EventFullyNormalized::new(managed_envelope.envelope());
        let mut metrics = Metrics::default();
        let mut extracted_metrics = ProcessingExtractedMetrics::new();

        // Events can also contain user reports.
        report::process_user_reports(managed_envelope);

        // Expand vendor crash formats (`unreal`, `playstation`, `nnswitch`)
        // into separate envelope items. Only runs on processing Relays.
        if_processing!(self.inner.config, {
            unreal::expand(managed_envelope, &self.inner.config)?;
            #[cfg(sentry)]
            playstation::expand(managed_envelope, &self.inner.config, &project_info)?;
            nnswitch::expand(managed_envelope)?;
        });

        // Extract the main event item from the envelope.
        let extraction_result = event::extract(
            managed_envelope,
            &mut metrics,
            event_fully_normalized,
            &self.inner.config,
        )?;
        let mut event = extraction_result.event;

        // Processing-only steps that may complete the event from the expanded
        // crash data; each step can update the fully-normalized flag.
        if_processing!(self.inner.config, {
            if let Some(inner_event_fully_normalized) =
                unreal::process(managed_envelope, &mut event)?
            {
                event_fully_normalized = inner_event_fully_normalized;
            }
            #[cfg(sentry)]
            if let Some(inner_event_fully_normalized) =
                playstation::process(managed_envelope, &mut event, &project_info)?
            {
                event_fully_normalized = inner_event_fully_normalized;
            }
            if let Some(inner_event_fully_normalized) =
                attachment::create_placeholders(managed_envelope, &mut event, &mut metrics)
            {
                event_fully_normalized = inner_event_fully_normalized;
            }
        });

        // Validate the dynamic sampling context; may replace the sampling
        // project info.
        sampling_project_info = dynamic_sampling::validate_and_set_dsc(
            managed_envelope,
            &mut event,
            project_info.clone(),
            sampling_project_info,
        );
        event::finalize(
            managed_envelope,
            &mut event,
            &mut metrics,
            &self.inner.config,
        )?;
        event_fully_normalized = self.normalize_event(
            managed_envelope,
            &mut event,
            project_id,
            project_info.clone(),
            event_fully_normalized,
        )?;
        // Run inbound filters; the resulting status gates the sampling
        // decision tagging below.
        let filter_run = event::filter(
            managed_envelope,
            &mut event,
            project_info.clone(),
            &self.inner.global_config.current(),
        )?;

        // Tag the error with a sampling decision on processing Relays, or
        // whenever the inbound filters passed.
        if self.inner.config.processing_enabled() || matches!(filter_run, FiltersStatus::Ok) {
            dynamic_sampling::tag_error_with_sampling_decision(
                managed_envelope,
                &mut event,
                sampling_project_info,
                &self.inner.config,
            )
            .await;
        }

        // Enforce rate limits; this may drop the event entirely (checked via
        // `event.value()` below).
        event = self
            .enforce_quotas(
                managed_envelope,
                event,
                &mut extracted_metrics,
                &project_info,
                &rate_limits,
            )
            .await?;

        // If the event survived quota enforcement, scrub PII and write it back
        // into the envelope.
        if event.value().is_some() {
            event::scrub(&mut event, project_info.clone())?;
            event::serialize(
                managed_envelope,
                &mut event,
                event_fully_normalized,
                EventMetricsExtracted(false),
                SpansExtracted(false),
            )?;
            event::emit_feedback_metrics(managed_envelope.envelope());
        }

        attachment::scrub(managed_envelope, project_info);

        // A processing Relay should never emit an event that was not fully
        // normalized; log loudly if that happens.
        if self.inner.config.processing_enabled() && !event_fully_normalized.0 {
            relay_log::error!(
                tags.project = %project_id,
                tags.ty = event_type(&event).map(|e| e.to_string()).unwrap_or("none".to_owned()),
                "ingested event without normalizing"
            );
        }

        Ok(Some(extracted_metrics))
    }
1748
    /// Processes only transactions and transaction-related items.
    ///
    /// Pipeline overview: extract the transaction event, filter profiles,
    /// attach the DSC, normalize, run inbound filters, evaluate dynamic
    /// sampling, extract metrics, enforce quotas and serialize the event back
    /// into the envelope. If the sampling decision drops the transaction,
    /// metrics are still extracted and profiles processed *before* the items
    /// are dropped, and the function returns early.
    #[allow(unused_assignments)]
    #[allow(clippy::too_many_arguments)]
    async fn process_transactions(
        &self,
        managed_envelope: &mut TypedEnvelope<TransactionGroup>,
        cogs: &mut Token,
        config: Arc<Config>,
        project_id: ProjectId,
        project_info: Arc<ProjectInfo>,
        mut sampling_project_info: Option<Arc<ProjectInfo>>,
        rate_limits: Arc<RateLimits>,
        reservoir_counters: ReservoirCounters,
    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
        // State flags that several steps below may update.
        let mut event_fully_normalized = EventFullyNormalized::new(managed_envelope.envelope());
        let mut event_metrics_extracted = EventMetricsExtracted(false);
        let mut spans_extracted = SpansExtracted(false);
        let mut metrics = Metrics::default();
        let mut extracted_metrics = ProcessingExtractedMetrics::new();

        let global_config = self.inner.global_config.current();

        // We extract the main event from the envelope.
        let extraction_result = event::extract(
            managed_envelope,
            &mut metrics,
            event_fully_normalized,
            &self.inner.config,
        )?;

        // If metrics were extracted we mark that.
        if let Some(inner_event_metrics_extracted) = extraction_result.event_metrics_extracted {
            event_metrics_extracted = inner_event_metrics_extracted;
        }
        if let Some(inner_spans_extracted) = extraction_result.spans_extracted {
            spans_extracted = inner_spans_extracted;
        };

        // We take the main event out of the result.
        let mut event = extraction_result.event;

        // Filter profiles and link the surviving profile (if any) to the
        // event.
        let profile_id = profile::filter(
            managed_envelope,
            &event,
            config.clone(),
            project_id,
            &project_info,
        );
        profile::transfer_id(&mut event, profile_id);

        // Validate the dynamic sampling context; may replace the sampling
        // project info.
        sampling_project_info = dynamic_sampling::validate_and_set_dsc(
            managed_envelope,
            &mut event,
            project_info.clone(),
            sampling_project_info,
        );

        event::finalize(
            managed_envelope,
            &mut event,
            &mut metrics,
            &self.inner.config,
        )?;

        event_fully_normalized = self.normalize_event(
            managed_envelope,
            &mut event,
            project_id,
            project_info.clone(),
            event_fully_normalized,
        )?;

        let filter_run = event::filter(
            managed_envelope,
            &mut event,
            project_info.clone(),
            &self.inner.global_config.current(),
        )?;

        // Always run dynamic sampling on processing Relays,
        // but delay decision until inbound filters have been fully processed.
        let run_dynamic_sampling =
            matches!(filter_run, FiltersStatus::Ok) || self.inner.config.processing_enabled();

        let reservoir = self.new_reservoir_evaluator(
            managed_envelope.scoping().organization_id,
            reservoir_counters,
        );

        let sampling_result = match run_dynamic_sampling {
            true => {
                dynamic_sampling::run(
                    managed_envelope,
                    &mut event,
                    config.clone(),
                    project_info.clone(),
                    sampling_project_info,
                    &reservoir,
                )
                .await
            }
            false => SamplingResult::Pending,
        };

        // Only needed in the processing-only span extraction below.
        #[cfg(feature = "processing")]
        let server_sample_rate = sampling_result.sample_rate();

        // Dropped by dynamic sampling: extract metrics, keep profiles, drop
        // the rest, and return early.
        if let Some(outcome) = sampling_result.into_dropped_outcome() {
            // Process profiles before dropping the transaction, if necessary.
            // Before metric extraction to make sure the profile count is reflected correctly.
            profile::process(
                managed_envelope,
                &mut event,
                &global_config,
                config.clone(),
                project_info.clone(),
            );
            // Extract metrics here, we're about to drop the event/transaction.
            event_metrics_extracted = self.extract_transaction_metrics(
                managed_envelope,
                &mut event,
                &mut extracted_metrics,
                project_id,
                project_info.clone(),
                SamplingDecision::Drop,
                event_metrics_extracted,
                spans_extracted,
            )?;

            dynamic_sampling::drop_unsampled_items(
                managed_envelope,
                event,
                outcome,
                spans_extracted,
            );

            // At this point we have:
            //  - An empty envelope.
            //  - An envelope containing only processed profiles.
            // We need to make sure there are enough quotas for these profiles.
            event = self
                .enforce_quotas(
                    managed_envelope,
                    Annotated::empty(),
                    &mut extracted_metrics,
                    &project_info,
                    &rate_limits,
                )
                .await?;

            return Ok(Some(extracted_metrics));
        }

        // Account everything after the sampling decision to a separate COGS
        // category.
        let _post_ds = cogs.start_category("post_ds");

        // Need to scrub the transaction before extracting spans.
        //
        // Unconditionally scrub to make sure PII is removed as early as possible.
        event::scrub(&mut event, project_info.clone())?;

        // TODO: remove once `relay.drop-transaction-attachments` has graduated.
        attachment::scrub(managed_envelope, project_info.clone());

        if_processing!(self.inner.config, {
            // Process profiles before extracting metrics, to make sure they are removed if they are invalid.
            let profile_id = profile::process(
                managed_envelope,
                &mut event,
                &global_config,
                config.clone(),
                project_info.clone(),
            );
            profile::transfer_id(&mut event, profile_id);
            profile::scrub_profiler_id(&mut event);

            // Always extract metrics in processing Relays for sampled items.
            event_metrics_extracted = self.extract_transaction_metrics(
                managed_envelope,
                &mut event,
                &mut extracted_metrics,
                project_id,
                project_info.clone(),
                SamplingDecision::Keep,
                event_metrics_extracted,
                spans_extracted,
            )?;

            if project_info.has_feature(Feature::ExtractSpansFromEvent) {
                spans_extracted = span::extract_from_event(
                    managed_envelope,
                    &event,
                    &global_config,
                    config,
                    server_sample_rate,
                    event_metrics_extracted,
                    spans_extracted,
                );
            }
        });

        // Enforce rate limits; this may drop the event entirely.
        event = self
            .enforce_quotas(
                managed_envelope,
                event,
                &mut extracted_metrics,
                &project_info,
                &rate_limits,
            )
            .await?;

        if_processing!(self.inner.config, {
            event = span::maybe_discard_transaction(managed_envelope, event, project_info);
        });

        // Event may have been dropped because of a quota and the envelope can be empty.
        if event.value().is_some() {
            event::serialize(
                managed_envelope,
                &mut event,
                event_fully_normalized,
                event_metrics_extracted,
                spans_extracted,
            )?;
        }

        // A processing Relay should never emit an event that was not fully
        // normalized; log loudly if that happens.
        if self.inner.config.processing_enabled() && !event_fully_normalized.0 {
            relay_log::error!(
                tags.project = %project_id,
                tags.ty = event_type(&event).map(|e| e.to_string()).unwrap_or("none".to_owned()),
                "ingested event without normalizing"
            );
        };

        Ok(Some(extracted_metrics))
    }
1984
1985    async fn process_profile_chunks(
1986        &self,
1987        managed_envelope: &mut TypedEnvelope<ProfileChunkGroup>,
1988        project_info: Arc<ProjectInfo>,
1989        rate_limits: Arc<RateLimits>,
1990    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
1991        profile_chunk::filter(managed_envelope, project_info.clone());
1992
1993        if_processing!(self.inner.config, {
1994            profile_chunk::process(
1995                managed_envelope,
1996                &project_info,
1997                &self.inner.global_config.current(),
1998                &self.inner.config,
1999            );
2000        });
2001
2002        self.enforce_quotas(
2003            managed_envelope,
2004            Annotated::empty(),
2005            &mut ProcessingExtractedMetrics::new(),
2006            &project_info,
2007            &rate_limits,
2008        )
2009        .await?;
2010
2011        Ok(None)
2012    }
2013
2014    /// Processes standalone items that require an event ID, but do not have an event on the same envelope.
2015    async fn process_standalone(
2016        &self,
2017        managed_envelope: &mut TypedEnvelope<StandaloneGroup>,
2018        config: Arc<Config>,
2019        project_id: ProjectId,
2020        project_info: Arc<ProjectInfo>,
2021        rate_limits: Arc<RateLimits>,
2022    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
2023        let mut extracted_metrics = ProcessingExtractedMetrics::new();
2024
2025        standalone::process(managed_envelope);
2026
2027        profile::filter(
2028            managed_envelope,
2029            &Annotated::empty(),
2030            config,
2031            project_id,
2032            &project_info,
2033        );
2034
2035        self.enforce_quotas(
2036            managed_envelope,
2037            Annotated::empty(),
2038            &mut extracted_metrics,
2039            &project_info,
2040            &rate_limits,
2041        )
2042        .await?;
2043
2044        report::process_user_reports(managed_envelope);
2045        attachment::scrub(managed_envelope, project_info);
2046
2047        Ok(Some(extracted_metrics))
2048    }
2049
2050    /// Processes user sessions.
2051    async fn process_sessions(
2052        &self,
2053        managed_envelope: &mut TypedEnvelope<SessionGroup>,
2054        config: &Config,
2055        project_info: &ProjectInfo,
2056        rate_limits: &RateLimits,
2057    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
2058        let mut extracted_metrics = ProcessingExtractedMetrics::new();
2059
2060        session::process(
2061            managed_envelope,
2062            &self.inner.global_config.current(),
2063            config,
2064            &mut extracted_metrics,
2065            project_info,
2066        );
2067
2068        self.enforce_quotas(
2069            managed_envelope,
2070            Annotated::empty(),
2071            &mut extracted_metrics,
2072            project_info,
2073            rate_limits,
2074        )
2075        .await?;
2076
2077        Ok(Some(extracted_metrics))
2078    }
2079
2080    /// Processes user and client reports.
2081    async fn process_client_reports(
2082        &self,
2083        managed_envelope: &mut TypedEnvelope<ClientReportGroup>,
2084        config: Arc<Config>,
2085        project_info: Arc<ProjectInfo>,
2086        rate_limits: Arc<RateLimits>,
2087    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
2088        let mut extracted_metrics = ProcessingExtractedMetrics::new();
2089
2090        self.enforce_quotas(
2091            managed_envelope,
2092            Annotated::empty(),
2093            &mut extracted_metrics,
2094            &project_info,
2095            &rate_limits,
2096        )
2097        .await?;
2098
2099        report::process_client_reports(
2100            managed_envelope,
2101            &config,
2102            &project_info,
2103            self.inner.addrs.outcome_aggregator.clone(),
2104        );
2105
2106        Ok(Some(extracted_metrics))
2107    }
2108
2109    /// Processes replays.
2110    async fn process_replays(
2111        &self,
2112        managed_envelope: &mut TypedEnvelope<ReplayGroup>,
2113        config: Arc<Config>,
2114        project_info: Arc<ProjectInfo>,
2115        rate_limits: Arc<RateLimits>,
2116    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
2117        let mut extracted_metrics = ProcessingExtractedMetrics::new();
2118
2119        replay::process(
2120            managed_envelope,
2121            &self.inner.global_config.current(),
2122            &config,
2123            &project_info,
2124            &self.inner.geoip_lookup,
2125        )?;
2126
2127        self.enforce_quotas(
2128            managed_envelope,
2129            Annotated::empty(),
2130            &mut extracted_metrics,
2131            &project_info,
2132            &rate_limits,
2133        )
2134        .await?;
2135
2136        Ok(Some(extracted_metrics))
2137    }
2138
2139    /// Processes cron check-ins.
2140    async fn process_checkins(
2141        &self,
2142        managed_envelope: &mut TypedEnvelope<CheckInGroup>,
2143        _project_id: ProjectId,
2144        project_info: Arc<ProjectInfo>,
2145        rate_limits: Arc<RateLimits>,
2146    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
2147        self.enforce_quotas(
2148            managed_envelope,
2149            Annotated::empty(),
2150            &mut ProcessingExtractedMetrics::new(),
2151            &project_info,
2152            &rate_limits,
2153        )
2154        .await?;
2155
2156        if_processing!(self.inner.config, {
2157            self.normalize_checkins(managed_envelope, _project_id);
2158        });
2159
2160        Ok(None)
2161    }
2162
2163    async fn process_nel(
2164        &self,
2165        mut managed_envelope: ManagedEnvelope,
2166        ctx: processing::Context<'_>,
2167    ) -> Result<ProcessingResult, ProcessingError> {
2168        nel::convert_to_logs(&mut managed_envelope);
2169        self.process_with_processor(&self.inner.processing.logs, managed_envelope, ctx)
2170            .await
2171    }
2172
2173    async fn process_with_processor<P: processing::Processor>(
2174        &self,
2175        processor: &P,
2176        mut managed_envelope: ManagedEnvelope,
2177        ctx: processing::Context<'_>,
2178    ) -> Result<ProcessingResult, ProcessingError>
2179    where
2180        Outputs: From<P::Output>,
2181    {
2182        let Some(work) = processor.prepare_envelope(&mut managed_envelope) else {
2183            debug_assert!(
2184                false,
2185                "there must be work for the {} processor",
2186                std::any::type_name::<P>(),
2187            );
2188            return Err(ProcessingError::ProcessingGroupMismatch);
2189        };
2190
2191        managed_envelope.update();
2192        match managed_envelope.envelope().is_empty() {
2193            true => managed_envelope.accept(),
2194            false => managed_envelope.reject(Outcome::Invalid(DiscardReason::Internal)),
2195        }
2196
2197        processor
2198            .process(work, ctx)
2199            .await
2200            .map_err(|_| ProcessingError::ProcessingFailure)
2201            .map(|o| o.map(Into::into))
2202            .map(ProcessingResult::Output)
2203    }
2204
    /// Processes standalone spans.
    ///
    /// This function does *not* run for spans extracted from transactions.
    ///
    /// Parameters prefixed with an underscore are only used on processing
    /// Relays, inside the `if_processing!` block.
    #[allow(clippy::too_many_arguments)]
    async fn process_standalone_spans(
        &self,
        managed_envelope: &mut TypedEnvelope<SpanGroup>,
        config: Arc<Config>,
        _project_id: ProjectId,
        project_info: Arc<ProjectInfo>,
        _sampling_project_info: Option<Arc<ProjectInfo>>,
        rate_limits: Arc<RateLimits>,
        _reservoir_counters: ReservoirCounters,
    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
        let mut extracted_metrics = ProcessingExtractedMetrics::new();

        // Pre-process span items: expand V2 span containers, filter spans,
        // and convert OTel traces data.
        span::expand_v2_spans(managed_envelope)?;
        span::filter(managed_envelope, config.clone(), project_info.clone());
        span::convert_otel_traces_data(managed_envelope);

        // Full span processing only runs on processing Relays.
        if_processing!(self.inner.config, {
            let global_config = self.inner.global_config.current();
            let reservoir = self.new_reservoir_evaluator(
                managed_envelope.scoping().organization_id,
                _reservoir_counters,
            );

            // There is no enclosing transaction event for standalone spans,
            // hence the empty `Annotated`.
            span::process(
                managed_envelope,
                &mut Annotated::empty(),
                &mut extracted_metrics,
                &global_config,
                config,
                _project_id,
                project_info.clone(),
                _sampling_project_info,
                &self.inner.geoip_lookup,
                &reservoir,
            )
            .await;
        });

        // Enforce rate limits on whatever remains in the envelope.
        self.enforce_quotas(
            managed_envelope,
            Annotated::empty(),
            &mut extracted_metrics,
            &project_info,
            &rate_limits,
        )
        .await?;

        Ok(Some(extracted_metrics))
    }
2258
    /// Routes a grouped envelope to the processing function for its group.
    ///
    /// First pre-processes the envelope headers (DSC transaction
    /// parametrization, retention, project ID), then dispatches on the
    /// [`ProcessingGroup`]. Groups without a dedicated routine (`Metrics`,
    /// `Ungrouped`, `ForwardUnknown`) are passed through unchanged.
    async fn process_envelope(
        &self,
        cogs: &mut Token,
        project_id: ProjectId,
        message: ProcessEnvelopeGrouped,
    ) -> Result<ProcessingResult, ProcessingError> {
        let ProcessEnvelopeGrouped {
            group,
            envelope: mut managed_envelope,
            project_info,
            rate_limits,
            sampling_project_info,
            reservoir_counters,
        } = message;

        // Pre-process the envelope headers.
        if let Some(sampling_state) = sampling_project_info.as_ref() {
            // Both transactions and standalone span envelopes need a normalized DSC header
            // to make sampling rules based on the segment/transaction name work correctly.
            managed_envelope
                .envelope_mut()
                .parametrize_dsc_transaction(&sampling_state.config.tx_name_rules);
        }

        // Set the event retention. Effectively, this value will only be available in processing
        // mode when the full project config is queried from the upstream.
        if let Some(retention) = project_info.config.event_retention {
            managed_envelope.envelope_mut().set_retention(retention);
        }

        // Set the event retention. Effectively, this value will only be available in processing
        // mode when the full project config is queried from the upstream.
        if let Some(retention) = project_info.config.downsampled_event_retention {
            managed_envelope
                .envelope_mut()
                .set_downsampled_retention(retention);
        }

        // Ensure the project ID is updated to the stored instance for this project cache. This can
        // differ in two cases:
        //  1. The envelope was sent to the legacy `/store/` endpoint without a project ID.
        //  2. The DSN was moved and the envelope sent to the old project ID.
        managed_envelope
            .envelope_mut()
            .meta_mut()
            .set_project_id(project_id);

        // Dispatch helper: converts the envelope into the typed envelope for
        // `group`, invokes the given processing method, and wraps the result.
        // On error, the envelope is rejected with the error's outcome, if any.
        macro_rules! run {
            ($fn_name:ident $(, $args:expr)*) => {
                async {
                    let mut managed_envelope = (managed_envelope, group).try_into()?;
                    match self.$fn_name(&mut managed_envelope, $($args),*).await {
                        Ok(extracted_metrics) => Ok(ProcessingResult::Envelope {
                            managed_envelope: managed_envelope.into_processed(),
                            extracted_metrics: extracted_metrics.map_or(ProcessingExtractedMetrics::new(), |e| e)
                        }),
                        Err(error) => {
                            relay_log::trace!("Executing {fn} failed: {error}", fn = stringify!($fn_name), error = error);
                            if let Some(outcome) = error.to_outcome() {
                                managed_envelope.reject(outcome);
                            }

                            return Err(error);
                        }
                    }
                }.await
            };
        }

        // Context handed to the newer `processing::Processor`-based routines
        // (NEL, logs, span V2).
        let global_config = self.inner.global_config.current();
        let ctx = processing::Context {
            config: &self.inner.config,
            global_config: &global_config,
            project_info: &project_info,
            sampling_project_info: sampling_project_info.as_deref(),
            rate_limits: &rate_limits,
        };

        relay_log::trace!("Processing {group} group", group = group.variant());

        match group {
            ProcessingGroup::Error => run!(
                process_errors,
                project_id,
                project_info,
                sampling_project_info,
                rate_limits
            ),
            ProcessingGroup::Transaction => {
                run!(
                    process_transactions,
                    cogs,
                    self.inner.config.clone(),
                    project_id,
                    project_info,
                    sampling_project_info,
                    rate_limits,
                    reservoir_counters
                )
            }
            ProcessingGroup::Session => run!(
                process_sessions,
                &self.inner.config.clone(),
                &project_info,
                &rate_limits
            ),
            ProcessingGroup::Standalone => run!(
                process_standalone,
                self.inner.config.clone(),
                project_id,
                project_info,
                rate_limits
            ),
            ProcessingGroup::ClientReport => run!(
                process_client_reports,
                self.inner.config.clone(),
                project_info,
                rate_limits
            ),
            ProcessingGroup::Replay => {
                run!(
                    process_replays,
                    self.inner.config.clone(),
                    project_info,
                    rate_limits
                )
            }
            ProcessingGroup::CheckIn => {
                run!(process_checkins, project_id, project_info, rate_limits)
            }
            ProcessingGroup::Nel => self.process_nel(managed_envelope, ctx).await,
            ProcessingGroup::Log => {
                self.process_with_processor(&self.inner.processing.logs, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::SpanV2 => {
                self.process_with_processor(&self.inner.processing.spans, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::Span => run!(
                process_standalone_spans,
                self.inner.config.clone(),
                project_id,
                project_info,
                sampling_project_info,
                rate_limits,
                reservoir_counters
            ),
            ProcessingGroup::ProfileChunk => {
                run!(process_profile_chunks, project_info, rate_limits)
            }
            // Currently is not used.
            ProcessingGroup::Metrics => {
                // In proxy mode we simply forward the metrics.
                // This group shouldn't be used outside of proxy mode.
                if self.inner.config.relay_mode() != RelayMode::Proxy {
                    relay_log::error!(
                        tags.project = %project_id,
                        items = ?managed_envelope.envelope().items().next().map(Item::ty),
                        "received metrics in the process_state"
                    );
                }

                Ok(ProcessingResult::no_metrics(
                    managed_envelope.into_processed(),
                ))
            }
            // Fallback to the legacy process_state implementation for Ungrouped events.
            ProcessingGroup::Ungrouped => {
                relay_log::error!(
                    tags.project = %project_id,
                    items = ?managed_envelope.envelope().items().next().map(Item::ty),
                    "could not identify the processing group based on the envelope's items"
                );

                Ok(ProcessingResult::no_metrics(
                    managed_envelope.into_processed(),
                ))
            }
            // Leave this group unchanged.
            //
            // This will later be forwarded to upstream.
            ProcessingGroup::ForwardUnknown => Ok(ProcessingResult::no_metrics(
                managed_envelope.into_processed(),
            )),
        }
    }
2446
2447    /// Processes the envelope and returns the processed envelope back.
2448    ///
2449    /// Returns `Some` if the envelope passed inbound filtering and rate limiting. Invalid items are
2450    /// removed from the envelope. Otherwise, if the envelope is empty or the entire envelope needs
2451    /// to be dropped, this is `None`.
2452    async fn process(
2453        &self,
2454        cogs: &mut Token,
2455        mut message: ProcessEnvelopeGrouped,
2456    ) -> Result<Option<Submit>, ProcessingError> {
2457        let ProcessEnvelopeGrouped {
2458            ref mut envelope,
2459            ref project_info,
2460            ref sampling_project_info,
2461            ..
2462        } = message;
2463
2464        // Prefer the project's project ID, and fall back to the stated project id from the
2465        // envelope. The project ID is available in all modes, other than in proxy mode, where
2466        // envelopes for unknown projects are forwarded blindly.
2467        //
2468        // Neither ID can be available in proxy mode on the /store/ endpoint. This is not supported,
2469        // since we cannot process an envelope without project ID, so drop it.
2470        let Some(project_id) = project_info
2471            .project_id
2472            .or_else(|| envelope.envelope().meta().project_id())
2473        else {
2474            envelope.reject(Outcome::Invalid(DiscardReason::Internal));
2475            return Err(ProcessingError::MissingProjectId);
2476        };
2477
2478        let client = envelope.envelope().meta().client().map(str::to_owned);
2479        let user_agent = envelope.envelope().meta().user_agent().map(str::to_owned);
2480        let project_key = envelope.envelope().meta().public_key();
2481        // Only allow sending to the sampling key, if we successfully loaded a sampling project
2482        // info relating to it. This filters out unknown/invalid project keys as well as project
2483        // keys from different organizations.
2484        let sampling_key = envelope
2485            .envelope()
2486            .sampling_key()
2487            .filter(|_| sampling_project_info.is_some());
2488
2489        // We set additional information on the scope, which will be removed after processing the
2490        // envelope.
2491        relay_log::configure_scope(|scope| {
2492            scope.set_tag("project", project_id);
2493            if let Some(client) = client {
2494                scope.set_tag("sdk", client);
2495            }
2496            if let Some(user_agent) = user_agent {
2497                scope.set_extra("user_agent", user_agent.into());
2498            }
2499        });
2500
2501        let result = match self.process_envelope(cogs, project_id, message).await {
2502            Ok(ProcessingResult::Envelope {
2503                mut managed_envelope,
2504                extracted_metrics,
2505            }) => {
2506                // The envelope could be modified or even emptied during processing, which
2507                // requires re-computation of the context.
2508                managed_envelope.update();
2509
2510                let has_metrics = !extracted_metrics.metrics.project_metrics.is_empty();
2511                send_metrics(
2512                    extracted_metrics.metrics,
2513                    project_key,
2514                    sampling_key,
2515                    &self.inner.addrs.aggregator,
2516                );
2517
2518                let envelope_response = if managed_envelope.envelope().is_empty() {
2519                    if !has_metrics {
2520                        // Individual rate limits have already been issued
2521                        managed_envelope.reject(Outcome::RateLimited(None));
2522                    } else {
2523                        managed_envelope.accept();
2524                    }
2525
2526                    None
2527                } else {
2528                    Some(managed_envelope)
2529                };
2530
2531                Ok(envelope_response.map(Submit::Envelope))
2532            }
2533            Ok(ProcessingResult::Output(Output { main, metrics })) => {
2534                if let Some(metrics) = metrics {
2535                    metrics.accept(|metrics| {
2536                        send_metrics(
2537                            metrics,
2538                            project_key,
2539                            sampling_key,
2540                            &self.inner.addrs.aggregator,
2541                        );
2542                    });
2543                }
2544
2545                Ok(main.map(Submit::Output))
2546            }
2547            Err(err) => Err(err),
2548        };
2549
2550        relay_log::configure_scope(|scope| {
2551            scope.remove_tag("project");
2552            scope.remove_tag("sdk");
2553            scope.remove_tag("user_agent");
2554        });
2555
2556        result
2557    }
2558
    /// Splits an incoming envelope into processing groups and processes each part individually.
    ///
    /// Each split part gets its own COGS measurement and processing timer. Successful results
    /// are submitted upstream; failures are logged loudly only when the error is unexpected.
    async fn handle_process_envelope(&self, cogs: &mut Token, message: ProcessEnvelope) {
        let project_key = message.envelope.envelope().meta().public_key();
        let wait_time = message.envelope.age();
        metric!(timer(RelayTimers::EnvelopeWaitTime) = wait_time);

        // This COGS handling may need an overhaul in the future:
        // Cancel the passed in token, to start individual measurements per envelope instead.
        cogs.cancel();

        // Capture the scoping before the envelope is consumed by the split below.
        let scoping = message.envelope.scoping();
        for (group, envelope) in ProcessingGroup::split_envelope(
            *message.envelope.into_envelope(),
            &message.project_info,
        ) {
            // Start a fresh COGS measurement per split part, attributed to its group.
            let mut cogs = self
                .inner
                .cogs
                .timed(ResourceId::Relay, AppFeature::from(group));

            let mut envelope =
                ManagedEnvelope::new(envelope, self.inner.addrs.outcome_aggregator.clone());
            envelope.scope(scoping);

            let message = ProcessEnvelopeGrouped {
                group,
                envelope,
                project_info: Arc::clone(&message.project_info),
                rate_limits: Arc::clone(&message.rate_limits),
                sampling_project_info: message.sampling_project_info.as_ref().map(Arc::clone),
                reservoir_counters: Arc::clone(&message.reservoir_counters),
            };

            let result = metric!(
                timer(RelayTimers::EnvelopeProcessingTime),
                group = group.variant(),
                { self.process(&mut cogs, message).await }
            );

            match result {
                // Something to forward: hand it to the upstream submission path.
                Ok(Some(envelope)) => self.submit_upstream(&mut cogs, envelope),
                // Fully handled during processing (accepted or rejected); nothing to submit.
                Ok(None) => {}
                // Unexpected errors indicate implementation bugs or infrastructure problems.
                Err(error) if error.is_unexpected() => {
                    relay_log::error!(
                        tags.project_key = %project_key,
                        error = &error as &dyn Error,
                        "error processing envelope"
                    )
                }
                // Expected errors (e.g. invalid payloads) only warrant debug logging.
                Err(error) => {
                    relay_log::debug!(
                        tags.project_key = %project_key,
                        error = &error as &dyn Error,
                        "error processing envelope"
                    )
                }
            }
        }
    }
2617
    /// Normalizes incoming metric buckets and merges them into the aggregator.
    ///
    /// Buckets failing normalization or namespace validation are dropped. Timestamps are
    /// corrected for client clock drift, and bucket metadata is reset for buckets that did not
    /// originate from internal Relays. If project state is already available, filtering and
    /// cached rate limits are applied immediately; otherwise this happens after flushing.
    fn handle_process_metrics(&self, cogs: &mut Token, message: ProcessMetrics) {
        let ProcessMetrics {
            data,
            project_key,
            received_at,
            sent_at,
            source,
        } = message;

        // Fall back to "now" if `received_at` is not representable as a Unix timestamp.
        let received_timestamp =
            UnixTimestamp::from_datetime(received_at).unwrap_or(UnixTimestamp::now());

        let mut buckets = data.into_buckets(received_timestamp);
        if buckets.is_empty() {
            return;
        };
        // Attribute COGS cost proportional to the size of the parsed buckets.
        cogs.update(relay_metrics::cogs::BySize(&buckets));

        let clock_drift_processor =
            ClockDriftProcessor::new(sent_at, received_at).at_least(MINIMUM_CLOCK_DRIFT);

        buckets.retain_mut(|bucket| {
            if let Err(error) = relay_metrics::normalize_bucket(bucket) {
                relay_log::debug!(error = &error as &dyn Error, "dropping bucket {bucket:?}");
                return false;
            }

            if !self::metrics::is_valid_namespace(bucket) {
                return false;
            }

            clock_drift_processor.process_timestamp(&mut bucket.timestamp);

            // Only trust metadata from internal Relays; for external sources start fresh.
            if !matches!(source, BucketSource::Internal) {
                bucket.metadata = BucketMetadata::new(received_timestamp);
            }

            true
        });

        let project = self.inner.project_cache.get(project_key);

        // Best effort check to filter and rate limit buckets, if there is no project state
        // available at the current time, we will check again after flushing.
        let buckets = match project.state() {
            ProjectState::Enabled(project_info) => {
                let rate_limits = project.rate_limits().current_limits();
                self.check_buckets(project_key, project_info, &rate_limits, buckets)
            }
            _ => buckets,
        };

        relay_log::trace!("merging metric buckets into the aggregator");
        self.inner
            .addrs
            .aggregator
            .send(MergeBuckets::new(project_key, buckets));
    }
2676
2677    fn handle_process_batched_metrics(&self, cogs: &mut Token, message: ProcessBatchedMetrics) {
2678        let ProcessBatchedMetrics {
2679            payload,
2680            source,
2681            received_at,
2682            sent_at,
2683        } = message;
2684
2685        #[derive(serde::Deserialize)]
2686        struct Wrapper {
2687            buckets: HashMap<ProjectKey, Vec<Bucket>>,
2688        }
2689
2690        let buckets = match serde_json::from_slice(&payload) {
2691            Ok(Wrapper { buckets }) => buckets,
2692            Err(error) => {
2693                relay_log::debug!(
2694                    error = &error as &dyn Error,
2695                    "failed to parse batched metrics",
2696                );
2697                metric!(counter(RelayCounters::MetricBucketsParsingFailed) += 1);
2698                return;
2699            }
2700        };
2701
2702        for (project_key, buckets) in buckets {
2703            self.handle_process_metrics(
2704                cogs,
2705                ProcessMetrics {
2706                    data: MetricData::Parsed(buckets),
2707                    project_key,
2708                    source,
2709                    received_at,
2710                    sent_at,
2711                },
2712            )
2713        }
2714    }
2715
    /// Submits processed data to its next destination.
    ///
    /// In processing Relays with a configured store forwarder, data is sent directly to the
    /// store. Otherwise, the payload is serialized into an envelope, encoded with the configured
    /// HTTP encoding, and forwarded to the upstream endpoint.
    fn submit_upstream(&self, cogs: &mut Token, submit: Submit) {
        let _submit = cogs.start_category("submit");

        // Processing path: forward directly to the store forwarder and skip the HTTP upstream.
        #[cfg(feature = "processing")]
        if self.inner.config.processing_enabled()
            && let Some(store_forwarder) = &self.inner.addrs.store_forwarder
        {
            match submit {
                Submit::Envelope(envelope) => store_forwarder.send(StoreEnvelope { envelope }),
                Submit::Output(output) => output
                    .forward_store(store_forwarder)
                    .unwrap_or_else(|err| err.into_inner()),
            }
            return;
        }

        // Non-processing path: everything must be represented as an envelope before sending.
        let mut envelope = match submit {
            Submit::Envelope(envelope) => envelope,
            Submit::Output(output) => match output.serialize_envelope() {
                Ok(envelope) => ManagedEnvelope::from(envelope).into_processed(),
                Err(_) => {
                    relay_log::error!("failed to serialize output to an envelope");
                    return;
                }
            },
        };

        // Nothing left to send; account for the envelope as accepted.
        if envelope.envelope_mut().is_empty() {
            envelope.accept();
            return;
        }

        // Override the `sent_at` timestamp. Since the envelope went through basic
        // normalization, all timestamps have been corrected. We propagate the new
        // `sent_at` to allow the next Relay to double-check this timestamp and
        // potentially apply correction again. This is done as close to sending as
        // possible so that we avoid internal delays.
        envelope.envelope_mut().set_sent_at(Utc::now());

        relay_log::trace!("sending envelope to sentry endpoint");
        let http_encoding = self.inner.config.http_encoding();
        // Serialize the envelope and compress it with the configured HTTP encoding.
        let result = envelope.envelope().to_vec().and_then(|v| {
            encode_payload(&v.into(), http_encoding).map_err(EnvelopeError::PayloadIoFailed)
        });

        match result {
            Ok(body) => {
                self.inner
                    .addrs
                    .upstream_relay
                    .send(SendRequest(SendEnvelope {
                        envelope,
                        body,
                        http_encoding,
                        project_cache: self.inner.project_cache.clone(),
                    }));
            }
            Err(error) => {
                // Errors are only logged for what we consider an internal discard reason. These
                // indicate errors in the infrastructure or implementation bugs.
                relay_log::error!(
                    error = &error as &dyn Error,
                    tags.project_key = %envelope.scoping().project_key,
                    "failed to serialize envelope payload"
                );

                envelope.reject(Outcome::Invalid(DiscardReason::Internal));
            }
        }
    }
2786
2787    fn handle_submit_client_reports(&self, cogs: &mut Token, message: SubmitClientReports) {
2788        let SubmitClientReports {
2789            client_reports,
2790            scoping,
2791        } = message;
2792
2793        let upstream = self.inner.config.upstream_descriptor();
2794        let dsn = PartialDsn::outbound(&scoping, upstream);
2795
2796        let mut envelope = Envelope::from_request(None, RequestMeta::outbound(dsn));
2797        for client_report in client_reports {
2798            let mut item = Item::new(ItemType::ClientReport);
2799            item.set_payload(ContentType::Json, client_report.serialize().unwrap()); // TODO: unwrap OK?
2800            envelope.add_item(item);
2801        }
2802
2803        let envelope = ManagedEnvelope::new(envelope, self.inner.addrs.outcome_aggregator.clone());
2804        self.submit_upstream(cogs, Submit::Envelope(envelope.into_processed()));
2805    }
2806
2807    fn check_buckets(
2808        &self,
2809        project_key: ProjectKey,
2810        project_info: &ProjectInfo,
2811        rate_limits: &RateLimits,
2812        buckets: Vec<Bucket>,
2813    ) -> Vec<Bucket> {
2814        let Some(scoping) = project_info.scoping(project_key) else {
2815            relay_log::error!(
2816                tags.project_key = project_key.as_str(),
2817                "there is no scoping: dropping {} buckets",
2818                buckets.len(),
2819            );
2820            return Vec::new();
2821        };
2822
2823        let mut buckets = self::metrics::apply_project_info(
2824            buckets,
2825            &self.inner.metric_outcomes,
2826            project_info,
2827            scoping,
2828        );
2829
2830        let namespaces: BTreeSet<MetricNamespace> = buckets
2831            .iter()
2832            .filter_map(|bucket| bucket.name.try_namespace())
2833            .collect();
2834
2835        for namespace in namespaces {
2836            let limits = rate_limits.check_with_quotas(
2837                project_info.get_quotas(),
2838                scoping.item(DataCategory::MetricBucket),
2839            );
2840
2841            if limits.is_limited() {
2842                let rejected;
2843                (buckets, rejected) = utils::split_off(buckets, |bucket| {
2844                    bucket.name.try_namespace() == Some(namespace)
2845                });
2846
2847                let reason_code = limits.longest().and_then(|limit| limit.reason_code.clone());
2848                self.inner.metric_outcomes.track(
2849                    scoping,
2850                    &rejected,
2851                    Outcome::RateLimited(reason_code),
2852                );
2853            }
2854        }
2855
2856        let quotas = project_info.config.quotas.clone();
2857        match MetricsLimiter::create(buckets, quotas, scoping) {
2858            Ok(mut bucket_limiter) => {
2859                bucket_limiter.enforce_limits(rate_limits, &self.inner.metric_outcomes);
2860                bucket_limiter.into_buckets()
2861            }
2862            Err(buckets) => buckets,
2863        }
2864    }
2865
    /// Applies Redis-backed rate limits to metric buckets, per namespace.
    ///
    /// Only available in processing Relays. Rejected buckets are removed and tracked as
    /// `RateLimited` outcomes, and newly discovered limits are merged into the project cache.
    /// Afterwards, transaction/span count-based limits are enforced via
    /// [`Self::apply_other_rate_limits`].
    #[cfg(feature = "processing")]
    async fn rate_limit_buckets(
        &self,
        scoping: Scoping,
        project_info: &ProjectInfo,
        mut buckets: Vec<Bucket>,
    ) -> Vec<Bucket> {
        // Without a configured rate limiter (non-processing setup) nothing is enforced here.
        let Some(rate_limiter) = &self.inner.rate_limiter else {
            return buckets;
        };

        let global_config = self.inner.global_config.current();
        // Count buckets per namespace so the limiter is consulted once per namespace.
        let namespaces = buckets
            .iter()
            .filter_map(|bucket| bucket.name.try_namespace())
            .counts();

        let quotas = CombinedQuotas::new(&global_config, project_info.get_quotas());

        for (namespace, quantity) in namespaces {
            let item_scoping = scoping.metric_bucket(namespace);

            let limits = match rate_limiter
                .is_rate_limited(quotas, item_scoping, quantity, false)
                .await
            {
                Ok(limits) => limits,
                // On limiter errors, fail open: stop checking and keep remaining buckets.
                Err(err) => {
                    relay_log::error!(
                        error = &err as &dyn std::error::Error,
                        "failed to check redis rate limits"
                    );
                    break;
                }
            };

            if limits.is_limited() {
                // Split off and reject all buckets of the limited namespace.
                let rejected;
                (buckets, rejected) = utils::split_off(buckets, |bucket| {
                    bucket.name.try_namespace() == Some(namespace)
                });

                let reason_code = limits.longest().and_then(|limit| limit.reason_code.clone());
                self.inner.metric_outcomes.track(
                    scoping,
                    &rejected,
                    Outcome::RateLimited(reason_code),
                );

                // Cache the limits so subsequent envelopes are limited without Redis roundtrips.
                self.inner
                    .project_cache
                    .get(item_scoping.scoping.project_key)
                    .rate_limits()
                    .merge(limits);
            }
        }

        match MetricsLimiter::create(buckets, project_info.config.quotas.clone(), scoping) {
            // No transaction/span counts to limit; pass the buckets through unchanged.
            Err(buckets) => buckets,
            Ok(bucket_limiter) => self.apply_other_rate_limits(bucket_limiter).await,
        }
    }
2928
2929    /// Check and apply rate limits to metrics buckets for transactions and spans.
2930    #[cfg(feature = "processing")]
2931    async fn apply_other_rate_limits(&self, mut bucket_limiter: MetricsLimiter) -> Vec<Bucket> {
2932        relay_log::trace!("handle_rate_limit_buckets");
2933
2934        let scoping = *bucket_limiter.scoping();
2935
2936        if let Some(rate_limiter) = self.inner.rate_limiter.as_ref() {
2937            let global_config = self.inner.global_config.current();
2938            let quotas = CombinedQuotas::new(&global_config, bucket_limiter.quotas());
2939
2940            // We set over_accept_once such that the limit is actually reached, which allows subsequent
2941            // calls with quantity=0 to be rate limited.
2942            let over_accept_once = true;
2943            let mut rate_limits = RateLimits::new();
2944
2945            for category in [DataCategory::Transaction, DataCategory::Span] {
2946                let count = bucket_limiter.count(category);
2947
2948                let timer = Instant::now();
2949                let mut is_limited = false;
2950
2951                if let Some(count) = count {
2952                    match rate_limiter
2953                        .is_rate_limited(quotas, scoping.item(category), count, over_accept_once)
2954                        .await
2955                    {
2956                        Ok(limits) => {
2957                            is_limited = limits.is_limited();
2958                            rate_limits.merge(limits)
2959                        }
2960                        Err(e) => relay_log::error!(error = &e as &dyn Error),
2961                    }
2962                }
2963
2964                relay_statsd::metric!(
2965                    timer(RelayTimers::RateLimitBucketsDuration) = timer.elapsed(),
2966                    category = category.name(),
2967                    limited = if is_limited { "true" } else { "false" },
2968                    count = match count {
2969                        None => "none",
2970                        Some(0) => "0",
2971                        Some(1) => "1",
2972                        Some(1..=10) => "10",
2973                        Some(1..=25) => "25",
2974                        Some(1..=50) => "50",
2975                        Some(51..=100) => "100",
2976                        Some(101..=500) => "500",
2977                        _ => "> 500",
2978                    },
2979                );
2980            }
2981
2982            if rate_limits.is_limited() {
2983                let was_enforced =
2984                    bucket_limiter.enforce_limits(&rate_limits, &self.inner.metric_outcomes);
2985
2986                if was_enforced {
2987                    // Update the rate limits in the project cache.
2988                    self.inner
2989                        .project_cache
2990                        .get(scoping.project_key)
2991                        .rate_limits()
2992                        .merge(rate_limits);
2993                }
2994            }
2995        }
2996
2997        bucket_limiter.into_buckets()
2998    }
2999
    /// Cardinality limits the passed buckets and returns a filtered vector of only accepted buckets.
    ///
    /// Respects the global `cardinality_limiter_mode`: in `Disabled` mode buckets pass through
    /// untouched, in `Passive` mode limits are evaluated and reported but not enforced, and
    /// otherwise rejected buckets are removed and tracked as `CardinalityLimited` outcomes.
    #[cfg(feature = "processing")]
    async fn cardinality_limit_buckets(
        &self,
        scoping: Scoping,
        limits: &[CardinalityLimit],
        buckets: Vec<Bucket>,
    ) -> Vec<Bucket> {
        let global_config = self.inner.global_config.current();
        let cardinality_limiter_mode = global_config.options.cardinality_limiter_mode;

        if matches!(cardinality_limiter_mode, CardinalityLimiterMode::Disabled) {
            return buckets;
        }

        // Without a configured limiter (no Redis), fail open and keep all buckets.
        let Some(ref limiter) = self.inner.cardinality_limiter else {
            return buckets;
        };

        let scope = relay_cardinality::Scoping {
            organization_id: scoping.organization_id,
            project_id: scoping.project_id,
        };

        let limits = match limiter
            .check_cardinality_limits(scope, limits, buckets)
            .await
        {
            Ok(limits) => limits,
            // Limiter errors return the original buckets; fail open rather than drop data.
            Err((buckets, error)) => {
                relay_log::error!(
                    error = &error as &dyn std::error::Error,
                    "cardinality limiter failed"
                );
                return buckets;
            }
        };

        // Sampled error reporting for exceeded limits, so alerting stays bounded.
        let error_sample_rate = global_config.options.cardinality_limiter_error_sample_rate;
        if !limits.exceeded_limits().is_empty() && utils::sample(error_sample_rate).is_keep() {
            for limit in limits.exceeded_limits() {
                relay_log::with_scope(
                    |scope| {
                        // Set the organization as user so we can alert on distinct org_ids.
                        scope.set_user(Some(relay_log::sentry::User {
                            id: Some(scoping.organization_id.to_string()),
                            ..Default::default()
                        }));
                    },
                    || {
                        relay_log::error!(
                            tags.organization_id = scoping.organization_id.value(),
                            tags.limit_id = limit.id,
                            tags.passive = limit.passive,
                            "Cardinality Limit"
                        );
                    },
                );
            }
        }

        // Emit cardinality reports regardless of enforcement mode.
        for (limit, reports) in limits.cardinality_reports() {
            for report in reports {
                self.inner
                    .metric_outcomes
                    .cardinality(scoping, limit, report);
            }
        }

        // Passive mode: evaluate and report only, without enforcing the limits.
        if matches!(cardinality_limiter_mode, CardinalityLimiterMode::Passive) {
            return limits.into_source();
        }

        let CardinalityLimitsSplit { accepted, rejected } = limits.into_split();

        for (bucket, exceeded) in rejected {
            self.inner.metric_outcomes.track(
                scoping,
                &[bucket],
                Outcome::CardinalityLimited(exceeded.id.clone()),
            );
        }
        accepted
    }
3084
3085    /// Processes metric buckets and sends them to kafka.
3086    ///
3087    /// This function runs the following steps:
3088    ///  - cardinality limiting
3089    ///  - rate limiting
3090    ///  - submit to `StoreForwarder`
3091    #[cfg(feature = "processing")]
3092    async fn encode_metrics_processing(
3093        &self,
3094        message: FlushBuckets,
3095        store_forwarder: &Addr<Store>,
3096    ) {
3097        use crate::constants::DEFAULT_EVENT_RETENTION;
3098        use crate::services::store::StoreMetrics;
3099
3100        for ProjectBuckets {
3101            buckets,
3102            scoping,
3103            project_info,
3104            ..
3105        } in message.buckets.into_values()
3106        {
3107            let buckets = self
3108                .rate_limit_buckets(scoping, &project_info, buckets)
3109                .await;
3110
3111            let limits = project_info.get_cardinality_limits();
3112            let buckets = self
3113                .cardinality_limit_buckets(scoping, limits, buckets)
3114                .await;
3115
3116            if buckets.is_empty() {
3117                continue;
3118            }
3119
3120            let retention = project_info
3121                .config
3122                .event_retention
3123                .unwrap_or(DEFAULT_EVENT_RETENTION);
3124
3125            // The store forwarder takes care of bucket splitting internally, so we can submit the
3126            // entire list of buckets. There is no batching needed here.
3127            store_forwarder.send(StoreMetrics {
3128                buckets,
3129                scoping,
3130                retention,
3131            });
3132        }
3133    }
3134
3135    /// Serializes metric buckets to JSON and sends them to the upstream.
3136    ///
3137    /// This function runs the following steps:
3138    ///  - partitioning
3139    ///  - batching by configured size limit
3140    ///  - serialize to JSON and pack in an envelope
3141    ///  - submit the envelope to upstream or kafka depending on configuration
3142    ///
3143    /// Cardinality limiting and rate limiting run only in processing Relays as they both require
3144    /// access to the central Redis instance. Cached rate limits are applied in the project cache
3145    /// already.
3146    fn encode_metrics_envelope(&self, cogs: &mut Token, message: FlushBuckets) {
3147        let FlushBuckets {
3148            partition_key,
3149            buckets,
3150        } = message;
3151
3152        let batch_size = self.inner.config.metrics_max_batch_size_bytes();
3153        let upstream = self.inner.config.upstream_descriptor();
3154
3155        for ProjectBuckets {
3156            buckets, scoping, ..
3157        } in buckets.values()
3158        {
3159            let dsn = PartialDsn::outbound(scoping, upstream);
3160
3161            relay_statsd::metric!(
3162                histogram(RelayHistograms::PartitionKeys) = u64::from(partition_key)
3163            );
3164
3165            let mut num_batches = 0;
3166            for batch in BucketsView::from(buckets).by_size(batch_size) {
3167                let mut envelope = Envelope::from_request(None, RequestMeta::outbound(dsn.clone()));
3168
3169                let mut item = Item::new(ItemType::MetricBuckets);
3170                item.set_source_quantities(crate::metrics::extract_quantities(batch));
3171                item.set_payload(ContentType::Json, serde_json::to_vec(&buckets).unwrap());
3172                envelope.add_item(item);
3173
3174                let mut envelope =
3175                    ManagedEnvelope::new(envelope, self.inner.addrs.outcome_aggregator.clone());
3176                envelope
3177                    .set_partition_key(Some(partition_key))
3178                    .scope(*scoping);
3179
3180                relay_statsd::metric!(
3181                    histogram(RelayHistograms::BucketsPerBatch) = batch.len() as u64
3182                );
3183
3184                self.submit_upstream(cogs, Submit::Envelope(envelope.into_processed()));
3185                num_batches += 1;
3186            }
3187
3188            relay_statsd::metric!(histogram(RelayHistograms::BatchesPerPartition) = num_batches);
3189        }
3190    }
3191
3192    /// Creates a [`SendMetricsRequest`] and sends it to the upstream relay.
3193    fn send_global_partition(&self, partition_key: u32, partition: &mut Partition<'_>) {
3194        if partition.is_empty() {
3195            return;
3196        }
3197
3198        let (unencoded, project_info) = partition.take();
3199        let http_encoding = self.inner.config.http_encoding();
3200        let encoded = match encode_payload(&unencoded, http_encoding) {
3201            Ok(payload) => payload,
3202            Err(error) => {
3203                let error = &error as &dyn std::error::Error;
3204                relay_log::error!(error, "failed to encode metrics payload");
3205                return;
3206            }
3207        };
3208
3209        let request = SendMetricsRequest {
3210            partition_key: partition_key.to_string(),
3211            unencoded,
3212            encoded,
3213            project_info,
3214            http_encoding,
3215            metric_outcomes: self.inner.metric_outcomes.clone(),
3216        };
3217
3218        self.inner.addrs.upstream_relay.send(SendRequest(request));
3219    }
3220
3221    /// Serializes metric buckets to JSON and sends them to the upstream via the global endpoint.
3222    ///
3223    /// This function is similar to [`Self::encode_metrics_envelope`], but sends a global batched
3224    /// payload directly instead of per-project Envelopes.
3225    ///
3226    /// This function runs the following steps:
3227    ///  - partitioning
3228    ///  - batching by configured size limit
3229    ///  - serialize to JSON
3230    ///  - submit directly to the upstream
3231    ///
3232    /// Cardinality limiting and rate limiting run only in processing Relays as they both require
3233    /// access to the central Redis instance. Cached rate limits are applied in the project cache
3234    /// already.
3235    fn encode_metrics_global(&self, message: FlushBuckets) {
3236        let FlushBuckets {
3237            partition_key,
3238            buckets,
3239        } = message;
3240
3241        let batch_size = self.inner.config.metrics_max_batch_size_bytes();
3242        let mut partition = Partition::new(batch_size);
3243        let mut partition_splits = 0;
3244
3245        for ProjectBuckets {
3246            buckets, scoping, ..
3247        } in buckets.values()
3248        {
3249            for bucket in buckets {
3250                let mut remaining = Some(BucketView::new(bucket));
3251
3252                while let Some(bucket) = remaining.take() {
3253                    if let Some(next) = partition.insert(bucket, *scoping) {
3254                        // A part of the bucket could not be inserted. Take the partition and submit
3255                        // it immediately. Repeat until the final part was inserted. This should
3256                        // always result in a request, otherwise we would enter an endless loop.
3257                        self.send_global_partition(partition_key, &mut partition);
3258                        remaining = Some(next);
3259                        partition_splits += 1;
3260                    }
3261                }
3262            }
3263        }
3264
3265        if partition_splits > 0 {
3266            metric!(histogram(RelayHistograms::PartitionSplits) = partition_splits);
3267        }
3268
3269        self.send_global_partition(partition_key, &mut partition);
3270    }
3271
    /// Applies cached rate limits to all buckets and routes them to the appropriate encoder.
    ///
    /// Processing Relays with a store forwarder hand the buckets to
    /// `encode_metrics_processing`. Otherwise, buckets are either batched into a single
    /// global payload (`http.global_metrics`) or packed into per-project envelopes.
    async fn handle_flush_buckets(&self, cogs: &mut Token, mut message: FlushBuckets) {
        // Enforce cached per-project rate limits before any encoding happens.
        for (project_key, pb) in message.buckets.iter_mut() {
            let buckets = std::mem::take(&mut pb.buckets);
            pb.buckets =
                self.check_buckets(*project_key, &pb.project_info, &pb.rate_limits, buckets);
        }

        // Processing Relays bypass the HTTP encoders and forward to the store directly.
        #[cfg(feature = "processing")]
        if self.inner.config.processing_enabled()
            && let Some(ref store_forwarder) = self.inner.addrs.store_forwarder
        {
            return self
                .encode_metrics_processing(message, store_forwarder)
                .await;
        }

        if self.inner.config.http_global_metrics() {
            self.encode_metrics_global(message)
        } else {
            self.encode_metrics_envelope(cogs, message)
        }
    }
3294
3295    #[cfg(all(test, feature = "processing"))]
3296    fn redis_rate_limiter_enabled(&self) -> bool {
3297        self.inner.rate_limiter.is_some()
3298    }
3299
    /// Dispatches an [`EnvelopeProcessor`] message to its dedicated handler.
    ///
    /// Each invocation is wrapped in a timer metric tagged with the message variant, and a
    /// COGS token is opened with weights computed by [`Self::feature_weights`].
    async fn handle_message(&self, message: EnvelopeProcessor) {
        let ty = message.variant();
        // Weights must be computed before the message is consumed by the match below.
        let feature_weights = self.feature_weights(&message);

        metric!(timer(RelayTimers::ProcessMessageDuration), message = ty, {
            let mut cogs = self.inner.cogs.timed(ResourceId::Relay, feature_weights);

            match message {
                EnvelopeProcessor::ProcessEnvelope(m) => {
                    self.handle_process_envelope(&mut cogs, *m).await
                }
                EnvelopeProcessor::ProcessProjectMetrics(m) => {
                    self.handle_process_metrics(&mut cogs, *m)
                }
                EnvelopeProcessor::ProcessBatchedMetrics(m) => {
                    self.handle_process_batched_metrics(&mut cogs, *m)
                }
                EnvelopeProcessor::FlushBuckets(m) => {
                    self.handle_flush_buckets(&mut cogs, *m).await
                }
                EnvelopeProcessor::SubmitClientReports(m) => {
                    self.handle_submit_client_reports(&mut cogs, *m)
                }
            }
        });
    }
3326
3327    fn feature_weights(&self, message: &EnvelopeProcessor) -> FeatureWeights {
3328        match message {
3329            // Envelope is split later and tokens are attributed then.
3330            EnvelopeProcessor::ProcessEnvelope(_) => AppFeature::Unattributed.into(),
3331            EnvelopeProcessor::ProcessProjectMetrics(_) => AppFeature::Unattributed.into(),
3332            EnvelopeProcessor::ProcessBatchedMetrics(_) => AppFeature::Unattributed.into(),
3333            EnvelopeProcessor::FlushBuckets(v) => v
3334                .buckets
3335                .values()
3336                .map(|s| {
3337                    if self.inner.config.processing_enabled() {
3338                        // Processing does not encode the metrics but instead rate and cardinality
3339                        // limits the metrics, which scales by count and not size.
3340                        relay_metrics::cogs::ByCount(&s.buckets).into()
3341                    } else {
3342                        relay_metrics::cogs::BySize(&s.buckets).into()
3343                    }
3344                })
3345                .fold(FeatureWeights::none(), FeatureWeights::merge),
3346            EnvelopeProcessor::SubmitClientReports(_) => AppFeature::ClientReports.into(),
3347        }
3348    }
3349
    /// Creates a [`ReservoirEvaluator`] backed by the given reservoir counters.
    ///
    /// With the `processing` feature and a configured quotas client, the evaluator is
    /// additionally connected to Redis so reservoir counts can be shared.
    fn new_reservoir_evaluator(
        &self,
        _organization_id: OrganizationId,
        reservoir_counters: ReservoirCounters,
    ) -> ReservoirEvaluator<'_> {
        // `mut` is only needed when the processing feature wires up Redis below.
        #[cfg_attr(not(feature = "processing"), expect(unused_mut))]
        let mut reservoir = ReservoirEvaluator::new(reservoir_counters);

        #[cfg(feature = "processing")]
        if let Some(quotas_client) = self.inner.quotas_client.as_ref() {
            reservoir.set_redis(_organization_id, quotas_client);
        }

        reservoir
    }
3365}
3366
impl Service for EnvelopeProcessorService {
    type Interface = EnvelopeProcessor;

    /// Receives messages and hands each one to a worker on the processing pool.
    async fn run(self, mut rx: relay_system::Receiver<Self::Interface>) {
        while let Some(message) = rx.recv().await {
            // Clone the (internally `Arc`ed) service so the spawned task owns its handle.
            let service = self.clone();
            // `spawn_async(..).await` waits for pool capacity, providing backpressure on `rx`.
            self.inner
                .pool
                .spawn_async(
                    async move {
                        service.handle_message(message).await;
                    }
                    .boxed(),
                )
                .await;
        }
    }
}
3385
/// Result of the enforcement of rate limiting.
///
/// If the event is already `None` or it's rate limited, it will be `None`
/// within the [`Annotated`].
struct EnforcementResult {
    /// The event remaining after enforcement, or empty if it was rate limited.
    event: Annotated<Event>,
    /// Rate limits computed during enforcement.
    ///
    /// Only read with the `processing` feature enabled, hence the `dead_code` expectation.
    #[cfg_attr(not(feature = "processing"), expect(dead_code))]
    rate_limits: RateLimits,
}
3395
3396impl EnforcementResult {
3397    /// Creates a new [`EnforcementResult`].
3398    pub fn new(event: Annotated<Event>, rate_limits: RateLimits) -> Self {
3399        Self { event, rate_limits }
3400    }
3401}
3402
/// Strategy for enforcing rate limits during envelope processing.
#[derive(Clone)]
enum RateLimiter {
    /// Checks only against rate limits cached locally.
    Cached,
    /// Checks consistently against the central Redis-backed rate limiter
    /// (available in processing Relays only).
    #[cfg(feature = "processing")]
    Consistent(Arc<RedisRateLimiter<GlobalRateLimitsServiceHandle>>),
}
3409
impl RateLimiter {
    /// Enforces quotas on the envelope and the event extracted from it.
    ///
    /// Returns the (possibly emptied) event together with the rate limits produced during
    /// enforcement. Short-circuits when there is nothing to enforce: an empty envelope
    /// without an event, or no quotas configured at all.
    async fn enforce<Group>(
        &self,
        managed_envelope: &mut TypedEnvelope<Group>,
        event: Annotated<Event>,
        _extracted_metrics: &mut ProcessingExtractedMetrics,
        global_config: &GlobalConfig,
        project_info: &ProjectInfo,
        rate_limits: &RateLimits,
    ) -> Result<EnforcementResult, ProcessingError> {
        // Nothing to enforce on an empty envelope without an event.
        if managed_envelope.envelope().is_empty() && event.value().is_none() {
            return Ok(EnforcementResult::new(event, RateLimits::default()));
        }

        let quotas = CombinedQuotas::new(global_config, project_info.get_quotas());
        if quotas.is_empty() {
            return Ok(EnforcementResult::new(event, RateLimits::default()));
        }

        let event_category = event_category(&event);

        // We extract the rate limiters, in case we perform consistent rate limiting, since we will
        // need Redis access.
        //
        // When invoking the rate limiter, capture if the event item has been rate limited to also
        // remove it from the processing state eventually.
        let this = self.clone();
        let rate_limits_clone = rate_limits.clone();
        let mut envelope_limiter =
            EnvelopeLimiter::new(CheckLimits::All, move |item_scope, _quantity| {
                // Clone per invocation: the closure may be called once per item scope.
                let this = this.clone();
                let rate_limits_clone = rate_limits_clone.clone();

                async move {
                    match this {
                        // Consistent mode queries the central Redis rate limiter.
                        #[cfg(feature = "processing")]
                        RateLimiter::Consistent(rate_limiter) => Ok::<_, ProcessingError>(
                            rate_limiter
                                .is_rate_limited(quotas, item_scope, _quantity, false)
                                .await?,
                        ),
                        // Cached mode only consults the locally known rate limits.
                        _ => Ok::<_, ProcessingError>(
                            rate_limits_clone.check_with_quotas(quotas, item_scope),
                        ),
                    }
                }
            });

        // Tell the envelope limiter about the event, since it has been removed from the Envelope at
        // this stage in processing.
        if let Some(category) = event_category {
            envelope_limiter.assume_event(category);
        }

        let scoping = managed_envelope.scoping();
        let (enforcement, rate_limits) =
            metric!(timer(RelayTimers::EventProcessingRateLimiting), {
                envelope_limiter
                    .compute(managed_envelope.envelope_mut(), &scoping)
                    .await
            })?;
        let event_active = enforcement.is_event_active();

        // Use the same rate limits as used for the envelope on the metrics.
        // Those rate limits should not be checked for expiry or similar to ensure a consistent
        // limiting of envelope items and metrics.
        #[cfg(feature = "processing")]
        _extracted_metrics.apply_enforcement(&enforcement, matches!(self, Self::Consistent(_)));
        enforcement.apply_with_outcomes(managed_envelope);

        if event_active {
            // The event itself was rate limited; by now the envelope must be fully drained.
            debug_assert!(managed_envelope.envelope().is_empty());
            return Ok(EnforcementResult::new(Annotated::empty(), rate_limits));
        }

        Ok(EnforcementResult::new(event, rate_limits))
    }
}
3488
3489fn encode_payload(body: &Bytes, http_encoding: HttpEncoding) -> Result<Bytes, std::io::Error> {
3490    let envelope_body: Vec<u8> = match http_encoding {
3491        HttpEncoding::Identity => return Ok(body.clone()),
3492        HttpEncoding::Deflate => {
3493            let mut encoder = ZlibEncoder::new(Vec::new(), Compression::default());
3494            encoder.write_all(body.as_ref())?;
3495            encoder.finish()?
3496        }
3497        HttpEncoding::Gzip => {
3498            let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
3499            encoder.write_all(body.as_ref())?;
3500            encoder.finish()?
3501        }
3502        HttpEncoding::Br => {
3503            // Use default buffer size (via 0), medium quality (5), and the default lgwin (22).
3504            let mut encoder = BrotliEncoder::new(Vec::new(), 0, 5, 22);
3505            encoder.write_all(body.as_ref())?;
3506            encoder.into_inner()
3507        }
3508        HttpEncoding::Zstd => {
3509            // Use the fastest compression level, our main objective here is to get the best
3510            // compression ratio for least amount of time spent.
3511            let mut encoder = ZstdEncoder::new(Vec::new(), 1)?;
3512            encoder.write_all(body.as_ref())?;
3513            encoder.finish()?
3514        }
3515    };
3516
3517    Ok(envelope_body.into())
3518}
3519
/// An upstream request that submits an envelope via HTTP.
#[derive(Debug)]
pub struct SendEnvelope {
    /// The processed envelope; accepted or rejected in `respond`.
    envelope: TypedEnvelope<Processed>,
    /// The serialized envelope body, already encoded per `http_encoding`.
    body: Bytes,
    /// Content encoding applied to `body`.
    http_encoding: HttpEncoding,
    /// Project cache handle, used to merge rate limits returned by the upstream.
    project_cache: ProjectCacheHandle,
}
3528
impl UpstreamRequest for SendEnvelope {
    fn method(&self) -> reqwest::Method {
        reqwest::Method::POST
    }

    fn path(&self) -> Cow<'_, str> {
        format!("/api/{}/envelope/", self.envelope.scoping().project_id).into()
    }

    fn route(&self) -> &'static str {
        "envelope"
    }

    /// Builds the HTTP request from the envelope's request meta and the pre-encoded body.
    fn build(&mut self, builder: &mut http::RequestBuilder) -> Result<(), http::HttpError> {
        let envelope_body = self.body.clone();
        metric!(histogram(RelayHistograms::UpstreamEnvelopeBodySize) = envelope_body.len() as u64);

        let meta = &self.envelope.meta();
        // The partition key, if present, is forwarded as the shard header.
        let shard = self.envelope.partition_key().map(|p| p.to_string());
        builder
            .content_encoding(self.http_encoding)
            .header_opt("Origin", meta.origin().map(|url| url.as_str()))
            .header_opt("User-Agent", meta.user_agent())
            .header("X-Sentry-Auth", meta.auth_header())
            .header("X-Forwarded-For", meta.forwarded_for())
            .header("Content-Type", envelope::CONTENT_TYPE)
            .header_opt("X-Sentry-Relay-Shard", shard)
            .body(envelope_body);

        Ok(())
    }

    fn sign(&mut self) -> Option<Sign> {
        Some(Sign::Optional(SignatureType::RequestSign))
    }

    /// Resolves the envelope's outcome based on the upstream response.
    fn respond(
        self: Box<Self>,
        result: Result<http::Response, UpstreamRequestError>,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> {
        Box::pin(async move {
            let result = match result {
                Ok(mut response) => response.consume().await.map_err(UpstreamRequestError::Http),
                Err(error) => Err(error),
            };

            match result {
                Ok(()) => self.envelope.accept(),
                // The upstream received the request, so it is responsible for outcomes.
                // Accept the envelope here and only record any returned rate limits.
                Err(error) if error.is_received() => {
                    let scoping = self.envelope.scoping();
                    self.envelope.accept();

                    if let UpstreamRequestError::RateLimited(limits) = error {
                        self.project_cache
                            .get(scoping.project_key)
                            .rate_limits()
                            .merge(limits.scope(&scoping));
                    }
                }
                Err(error) => {
                    // Errors are only logged for what we consider an internal discard reason. These
                    // indicate errors in the infrastructure or implementation bugs.
                    let mut envelope = self.envelope;
                    envelope.reject(Outcome::Invalid(DiscardReason::Internal));
                    relay_log::error!(
                        error = &error as &dyn Error,
                        tags.project_key = %envelope.scoping().project_key,
                        "error sending envelope"
                    );
                }
            }
        })
    }
}
3603
/// A container for metric buckets from multiple projects.
///
/// This container is used to send metrics to the upstream in global batches as part of the
/// [`FlushBuckets`] message if the `http.global_metrics` option is enabled. The container monitors
/// the size of all metrics and allows to split them into multiple batches. See
/// [`insert`](Self::insert) for more information.
#[derive(Debug)]
struct Partition<'a> {
    /// Maximum partition size in bytes, as configured.
    max_size: usize,
    /// Remaining byte budget before the partition is full.
    remaining: usize,
    /// Bucket views collected so far, grouped by project key.
    views: HashMap<ProjectKey, Vec<BucketView<'a>>>,
    /// Scoping for each project key, used for outcomes on transmission failure.
    project_info: HashMap<ProjectKey, Scoping>,
}
3617
3618impl<'a> Partition<'a> {
3619    /// Creates a new partition with the given maximum size in bytes.
3620    pub fn new(size: usize) -> Self {
3621        Self {
3622            max_size: size,
3623            remaining: size,
3624            views: HashMap::new(),
3625            project_info: HashMap::new(),
3626        }
3627    }
3628
3629    /// Inserts a bucket into the partition, splitting it if necessary.
3630    ///
3631    /// This function attempts to add the bucket to this partition. If the bucket does not fit
3632    /// entirely into the partition given its maximum size, the remaining part of the bucket is
3633    /// returned from this function call.
3634    ///
3635    /// If this function returns `Some(_)`, the partition is full and should be submitted to the
3636    /// upstream immediately. Use [`Self::take`] to retrieve the contents of the
3637    /// partition. Afterwards, the caller is responsible to call this function again with the
3638    /// remaining bucket until it is fully inserted.
3639    pub fn insert(&mut self, bucket: BucketView<'a>, scoping: Scoping) -> Option<BucketView<'a>> {
3640        let (current, next) = bucket.split(self.remaining, Some(self.max_size));
3641
3642        if let Some(current) = current {
3643            self.remaining = self.remaining.saturating_sub(current.estimated_size());
3644            self.views
3645                .entry(scoping.project_key)
3646                .or_default()
3647                .push(current);
3648
3649            self.project_info
3650                .entry(scoping.project_key)
3651                .or_insert(scoping);
3652        }
3653
3654        next
3655    }
3656
3657    /// Returns `true` if the partition does not hold any data.
3658    fn is_empty(&self) -> bool {
3659        self.views.is_empty()
3660    }
3661
3662    /// Returns the serialized buckets for this partition.
3663    ///
3664    /// This empties the partition, so that it can be reused.
3665    fn take(&mut self) -> (Bytes, HashMap<ProjectKey, Scoping>) {
3666        #[derive(serde::Serialize)]
3667        struct Wrapper<'a> {
3668            buckets: &'a HashMap<ProjectKey, Vec<BucketView<'a>>>,
3669        }
3670
3671        let buckets = &self.views;
3672        let payload = serde_json::to_vec(&Wrapper { buckets }).unwrap().into();
3673
3674        let scopings = self.project_info.clone();
3675        self.project_info.clear();
3676
3677        self.views.clear();
3678        self.remaining = self.max_size;
3679
3680        (payload, scopings)
3681    }
3682}
3683
/// An upstream request that submits metric buckets via HTTP.
///
/// This request is not awaited. It automatically tracks outcomes if the request is not received.
#[derive(Debug)]
struct SendMetricsRequest {
    /// The partition key, transmitted in the `X-Sentry-Relay-Shard` header.
    partition_key: String,
    /// Serialized metric buckets without encoding applied, used for signing.
    unencoded: Bytes,
    /// Serialized metric buckets with the stated HTTP encoding applied.
    encoded: Bytes,
    /// Mapping of all contained project keys to their scoping and extraction mode.
    ///
    /// Used to track outcomes for transmission failures.
    project_info: HashMap<ProjectKey, Scoping>,
    /// Encoding (compression) of the payload.
    http_encoding: HttpEncoding,
    /// Metric outcomes instance to send outcomes on error.
    metric_outcomes: MetricOutcomes,
}
3704
3705impl SendMetricsRequest {
3706    fn create_error_outcomes(self) {
3707        #[derive(serde::Deserialize)]
3708        struct Wrapper {
3709            buckets: HashMap<ProjectKey, Vec<MinimalTrackableBucket>>,
3710        }
3711
3712        let buckets = match serde_json::from_slice(&self.unencoded) {
3713            Ok(Wrapper { buckets }) => buckets,
3714            Err(err) => {
3715                relay_log::error!(
3716                    error = &err as &dyn std::error::Error,
3717                    "failed to parse buckets from failed transmission"
3718                );
3719                return;
3720            }
3721        };
3722
3723        for (key, buckets) in buckets {
3724            let Some(&scoping) = self.project_info.get(&key) else {
3725                relay_log::error!("missing scoping for project key");
3726                continue;
3727            };
3728
3729            self.metric_outcomes.track(
3730                scoping,
3731                &buckets,
3732                Outcome::Invalid(DiscardReason::Internal),
3733            );
3734        }
3735    }
3736}
3737
impl UpstreamRequest for SendMetricsRequest {
    fn set_relay_id(&self) -> bool {
        true
    }

    /// Signing is required and covers the unencoded (pre-compression) body.
    fn sign(&mut self) -> Option<Sign> {
        Some(Sign::Required(SignatureType::Body(self.unencoded.clone())))
    }

    fn method(&self) -> reqwest::Method {
        reqwest::Method::POST
    }

    fn path(&self) -> Cow<'_, str> {
        "/api/0/relays/metrics/".into()
    }

    fn route(&self) -> &'static str {
        "global_metrics"
    }

    /// Builds the HTTP request with the encoded payload and the shard header.
    fn build(&mut self, builder: &mut http::RequestBuilder) -> Result<(), http::HttpError> {
        metric!(histogram(RelayHistograms::UpstreamMetricsBodySize) = self.encoded.len() as u64);

        builder
            .content_encoding(self.http_encoding)
            .header("X-Sentry-Relay-Shard", self.partition_key.as_bytes())
            .header(header::CONTENT_TYPE, b"application/json")
            .body(self.encoded.clone());

        Ok(())
    }

    /// On success consumes the response; on failure tracks outcomes if the upstream
    /// never received the request.
    fn respond(
        self: Box<Self>,
        result: Result<http::Response, UpstreamRequestError>,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> {
        Box::pin(async {
            match result {
                Ok(mut response) => {
                    response.consume().await.ok();
                }
                Err(error) => {
                    relay_log::error!(error = &error as &dyn Error, "Failed to send metrics batch");

                    // If the request did not arrive at the upstream, we are responsible for outcomes.
                    // Otherwise, the upstream is responsible to log outcomes.
                    if error.is_received() {
                        return;
                    }

                    self.create_error_outcomes()
                }
            }
        })
    }
}
3795
/// Container for global and project level [`Quota`].
#[derive(Copy, Clone, Debug)]
struct CombinedQuotas<'a> {
    /// Quotas from the global config, applying to all projects.
    global_quotas: &'a [Quota],
    /// Quotas from the project configuration.
    project_quotas: &'a [Quota],
}
3802
3803impl<'a> CombinedQuotas<'a> {
3804    /// Returns a new [`CombinedQuotas`].
3805    pub fn new(global_config: &'a GlobalConfig, project_quotas: &'a [Quota]) -> Self {
3806        Self {
3807            global_quotas: &global_config.quotas,
3808            project_quotas,
3809        }
3810    }
3811
3812    /// Returns `true` if both global quotas and project quotas are empty.
3813    pub fn is_empty(&self) -> bool {
3814        self.len() == 0
3815    }
3816
3817    /// Returns the number of both global and project quotas.
3818    pub fn len(&self) -> usize {
3819        self.global_quotas.len() + self.project_quotas.len()
3820    }
3821}
3822
3823impl<'a> IntoIterator for CombinedQuotas<'a> {
3824    type Item = &'a Quota;
3825    type IntoIter = std::iter::Chain<std::slice::Iter<'a, Quota>, std::slice::Iter<'a, Quota>>;
3826
3827    fn into_iter(self) -> Self::IntoIter {
3828        self.global_quotas.iter().chain(self.project_quotas.iter())
3829    }
3830}
3831
3832#[cfg(test)]
3833mod tests {
3834    use std::collections::BTreeMap;
3835    use std::env;
3836
3837    use insta::assert_debug_snapshot;
3838    use relay_base_schema::metrics::{DurationUnit, MetricUnit};
3839    use relay_common::glob2::LazyGlob;
3840    use relay_dynamic_config::ProjectConfig;
3841    use relay_event_normalization::{RedactionRule, TransactionNameRule};
3842    use relay_event_schema::protocol::TransactionSource;
3843    use relay_pii::DataScrubbingConfig;
3844    use similar_asserts::assert_eq;
3845
3846    use crate::metrics_extraction::IntoMetric;
3847    use crate::metrics_extraction::transactions::types::{
3848        CommonTags, TransactionMeasurementTags, TransactionMetric,
3849    };
3850    use crate::testutils::{create_test_processor, create_test_processor_with_addrs};
3851
3852    #[cfg(feature = "processing")]
3853    use {
3854        relay_metrics::BucketValue,
3855        relay_quotas::{QuotaScope, ReasonCode},
3856        relay_test::mock_service,
3857    };
3858
3859    use super::*;
3860
    #[cfg(feature = "processing")]
    fn mock_quota(id: &str) -> Quota {
        // A deny-all (limit 0) organization-scoped quota on metric buckets.
        Quota {
            id: Some(id.into()),
            categories: smallvec::smallvec![DataCategory::MetricBucket],
            scope: QuotaScope::Organization,
            scope_id: None,
            limit: Some(0),
            window: None,
            reason_code: None,
            namespace: None,
        }
    }
3874
    #[cfg(feature = "processing")]
    #[test]
    fn test_dynamic_quotas() {
        let global_config = GlobalConfig {
            quotas: vec![mock_quota("foo"), mock_quota("bar")],
            ..Default::default()
        };

        let project_quotas = vec![mock_quota("baz"), mock_quota("qux")];

        let dynamic_quotas = CombinedQuotas::new(&global_config, &project_quotas);

        assert_eq!(dynamic_quotas.len(), 4);
        assert!(!dynamic_quotas.is_empty());

        // Iteration order is global quotas first, then project quotas.
        let quota_ids = dynamic_quotas.into_iter().filter_map(|q| q.id.as_deref());
        assert!(quota_ids.eq(["foo", "bar", "baz", "qux"]));
    }
3893
    /// Ensures that if we ratelimit one batch of buckets in [`FlushBuckets`] message, it won't
    /// also ratelimit the next batches in the same message automatically.
    #[cfg(feature = "processing")]
    #[tokio::test]
    async fn test_ratelimit_per_batch() {
        use relay_base_schema::organization::OrganizationId;
        use relay_protocol::FiniteF64;

        // Org 1 is targeted by the deny-all quota below; org 2 must remain unaffected.
        let rate_limited_org = Scoping {
            organization_id: OrganizationId::new(1),
            project_id: ProjectId::new(21),
            project_key: ProjectKey::parse("00000000000000000000000000000000").unwrap(),
            key_id: Some(17),
        };

        let not_rate_limited_org = Scoping {
            organization_id: OrganizationId::new(2),
            project_id: ProjectId::new(21),
            project_key: ProjectKey::parse("11111111111111111111111111111111").unwrap(),
            key_id: Some(17),
        };

        let message = {
            let project_info = {
                // Deny-all quota scoped to org 1's organization id.
                let quota = Quota {
                    id: Some("testing".into()),
                    categories: vec![DataCategory::MetricBucket].into(),
                    scope: relay_quotas::QuotaScope::Organization,
                    scope_id: Some(rate_limited_org.organization_id.to_string()),
                    limit: Some(0),
                    window: None,
                    reason_code: Some(ReasonCode::new("test")),
                    namespace: None,
                };

                let mut config = ProjectConfig::default();
                config.quotas.push(quota);

                Arc::new(ProjectInfo {
                    config,
                    ..Default::default()
                })
            };

            // One counter bucket per org; both share the same project config.
            let project_metrics = |scoping| ProjectBuckets {
                buckets: vec![Bucket {
                    name: "d:transactions/bar".into(),
                    value: BucketValue::Counter(FiniteF64::new(1.0).unwrap()),
                    timestamp: UnixTimestamp::now(),
                    tags: Default::default(),
                    width: 10,
                    metadata: BucketMetadata::default(),
                }],
                rate_limits: Default::default(),
                project_info: project_info.clone(),
                scoping,
            };

            let buckets = hashbrown::HashMap::from([
                (
                    rate_limited_org.project_key,
                    project_metrics(rate_limited_org),
                ),
                (
                    not_rate_limited_org.project_key,
                    project_metrics(not_rate_limited_org),
                ),
            ]);

            FlushBuckets {
                partition_key: 0,
                buckets,
            }
        };

        // ensure the order of the map while iterating is as expected.
        assert_eq!(message.buckets.keys().count(), 2);

        let config = {
            let config_json = serde_json::json!({
                "processing": {
                    "enabled": true,
                    "kafka_config": [],
                    "redis": {
                        "server": std::env::var("RELAY_REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_owned()),
                    }
                }
            });
            Config::from_json_value(config_json).unwrap()
        };

        // Mock store forwarder that records the org id of every metrics message it receives.
        let (store, handle) = {
            let f = |org_ids: &mut Vec<OrganizationId>, msg: Store| {
                let org_id = match msg {
                    Store::Metrics(x) => x.scoping.organization_id,
                    _ => panic!("received envelope when expecting only metrics"),
                };
                org_ids.push(org_id);
            };

            mock_service("store_forwarder", vec![], f)
        };

        let processor = create_test_processor(config).await;
        assert!(processor.redis_rate_limiter_enabled());

        processor.encode_metrics_processing(message, &store).await;

        drop(store);
        let orgs_not_ratelimited = handle.await.unwrap();

        // Only org 2's buckets must have reached the store.
        assert_eq!(
            orgs_not_ratelimited,
            vec![not_rate_limited_org.organization_id]
        );
    }
4010
4011    #[tokio::test]
4012    async fn test_browser_version_extraction_with_pii_like_data() {
4013        let processor = create_test_processor(Default::default()).await;
4014        let outcome_aggregator = Addr::dummy();
4015        let event_id = EventId::new();
4016
4017        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
4018            .parse()
4019            .unwrap();
4020
4021        let request_meta = RequestMeta::new(dsn);
4022        let mut envelope = Envelope::from_request(Some(event_id), request_meta);
4023
4024        envelope.add_item({
4025                let mut item = Item::new(ItemType::Event);
4026                item.set_payload(
4027                    ContentType::Json,
4028                    r#"
4029                    {
4030                        "request": {
4031                            "headers": [
4032                                ["User-Agent", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36"]
4033                            ]
4034                        }
4035                    }
4036                "#,
4037                );
4038                item
4039            });
4040
4041        let mut datascrubbing_settings = DataScrubbingConfig::default();
4042        // enable all the default scrubbing
4043        datascrubbing_settings.scrub_data = true;
4044        datascrubbing_settings.scrub_defaults = true;
4045        datascrubbing_settings.scrub_ip_addresses = true;
4046
4047        // Make sure to mask any IP-like looking data
4048        let pii_config = serde_json::from_str(r#"{"applications": {"**": ["@ip:mask"]}}"#).unwrap();
4049
4050        let config = ProjectConfig {
4051            datascrubbing_settings,
4052            pii_config: Some(pii_config),
4053            ..Default::default()
4054        };
4055
4056        let project_info = ProjectInfo {
4057            config,
4058            ..Default::default()
4059        };
4060
4061        let mut envelopes = ProcessingGroup::split_envelope(*envelope, &Default::default());
4062        assert_eq!(envelopes.len(), 1);
4063
4064        let (group, envelope) = envelopes.pop().unwrap();
4065        let envelope = ManagedEnvelope::new(envelope, outcome_aggregator);
4066
4067        let message = ProcessEnvelopeGrouped {
4068            group,
4069            envelope,
4070            project_info: Arc::new(project_info),
4071            rate_limits: Default::default(),
4072            sampling_project_info: None,
4073            reservoir_counters: ReservoirCounters::default(),
4074        };
4075
4076        let Ok(Some(Submit::Envelope(mut new_envelope))) =
4077            processor.process(&mut Token::noop(), message).await
4078        else {
4079            panic!();
4080        };
4081        let new_envelope = new_envelope.envelope_mut();
4082
4083        let event_item = new_envelope.items().last().unwrap();
4084        let annotated_event: Annotated<Event> =
4085            Annotated::from_json_bytes(&event_item.payload()).unwrap();
4086        let event = annotated_event.into_value().unwrap();
4087        let headers = event
4088            .request
4089            .into_value()
4090            .unwrap()
4091            .headers
4092            .into_value()
4093            .unwrap();
4094
4095        // IP-like data must be masked
4096        assert_eq!(
4097            Some(
4098                "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/********* Safari/537.36"
4099            ),
4100            headers.get_header("User-Agent")
4101        );
4102        // But we still get correct browser and version number
4103        let contexts = event.contexts.into_value().unwrap();
4104        let browser = contexts.0.get("browser").unwrap();
4105        assert_eq!(
4106            r#"{"browser":"Chrome 103.0.0","name":"Chrome","version":"103.0.0","type":"browser"}"#,
4107            browser.to_json().unwrap()
4108        );
4109    }
4110
    /// Processing materializes the envelope's dynamic sampling context (DSC) into the
    /// event's `_dsc` field, with the trace id rendered in its normalized (dash-free) form.
    #[tokio::test]
    #[cfg(feature = "processing")]
    async fn test_materialize_dsc() {
        use crate::services::projects::project::PublicKeyConfig;

        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
            .parse()
            .unwrap();
        let request_meta = RequestMeta::new(dsn);
        let mut envelope = Envelope::from_request(None, request_meta);

        // DSC with a hyphenated trace id; the snapshot below asserts that the
        // materialized copy carries the normalized form without dashes.
        let dsc = r#"{
            "trace_id": "00000000-0000-0000-0000-000000000000",
            "public_key": "e12d836b15bb49d7bbf99e64295d995b",
            "sample_rate": "0.2"
        }"#;
        envelope.set_dsc(serde_json::from_str(dsc).unwrap());

        // Minimal event payload; only the DSC materialization is under test.
        let mut item = Item::new(ItemType::Event);
        item.set_payload(ContentType::Json, r#"{}"#);
        envelope.add_item(item);

        let outcome_aggregator = Addr::dummy();
        let managed_envelope = ManagedEnvelope::new(envelope, outcome_aggregator);

        // The project must know the public key referenced by the DSC.
        let mut project_info = ProjectInfo::default();
        project_info.public_keys.push(PublicKeyConfig {
            public_key: ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(),
            numeric_id: Some(1),
        });
        let project_info = Arc::new(project_info);

        // The same project serves as both the target and the sampling project.
        let message = ProcessEnvelopeGrouped {
            group: ProcessingGroup::Transaction,
            envelope: managed_envelope,
            project_info: project_info.clone(),
            rate_limits: Default::default(),
            sampling_project_info: Some(project_info),
            reservoir_counters: ReservoirCounters::default(),
        };

        // DSC materialization only happens in processing mode.
        let config = Config::from_json_value(serde_json::json!({
            "processing": {
                "enabled": true,
                "kafka_config": [],
            }
        }))
        .unwrap();

        let processor = create_test_processor(config).await;
        let Ok(Some(Submit::Envelope(envelope))) =
            processor.process(&mut Token::noop(), message).await
        else {
            panic!();
        };
        let event = envelope
            .envelope()
            .get_item_by(|item| item.ty() == &ItemType::Event)
            .unwrap();

        let event = Annotated::<Event>::from_json_bytes(&event.payload()).unwrap();
        insta::assert_debug_snapshot!(event.value().unwrap()._dsc, @r###"
        Object(
            {
                "environment": ~,
                "public_key": String(
                    "e12d836b15bb49d7bbf99e64295d995b",
                ),
                "release": ~,
                "replay_id": ~,
                "sample_rate": String(
                    "0.2",
                ),
                "trace_id": String(
                    "00000000000000000000000000000000",
                ),
                "transaction": ~,
            },
        )
        "###);
    }
4192
4193    fn capture_test_event(transaction_name: &str, source: TransactionSource) -> Vec<String> {
4194        let mut event = Annotated::<Event>::from_json(
4195            r#"
4196            {
4197                "type": "transaction",
4198                "transaction": "/foo/",
4199                "timestamp": 946684810.0,
4200                "start_timestamp": 946684800.0,
4201                "contexts": {
4202                    "trace": {
4203                        "trace_id": "4c79f60c11214eb38604f4ae0781bfb2",
4204                        "span_id": "fa90fdead5f74053",
4205                        "op": "http.server",
4206                        "type": "trace"
4207                    }
4208                },
4209                "transaction_info": {
4210                    "source": "url"
4211                }
4212            }
4213            "#,
4214        )
4215        .unwrap();
4216        let e = event.value_mut().as_mut().unwrap();
4217        e.transaction.set_value(Some(transaction_name.into()));
4218
4219        e.transaction_info
4220            .value_mut()
4221            .as_mut()
4222            .unwrap()
4223            .source
4224            .set_value(Some(source));
4225
4226        relay_statsd::with_capturing_test_client(|| {
4227            utils::log_transaction_name_metrics(&mut event, |event| {
4228                let config = NormalizationConfig {
4229                    transaction_name_config: TransactionNameConfig {
4230                        rules: &[TransactionNameRule {
4231                            pattern: LazyGlob::new("/foo/*/**".to_owned()),
4232                            expiry: DateTime::<Utc>::MAX_UTC,
4233                            redaction: RedactionRule::Replace {
4234                                substitution: "*".to_owned(),
4235                            },
4236                        }],
4237                    },
4238                    ..Default::default()
4239                };
4240                normalize_event(event, &config)
4241            });
4242        })
4243    }
4244
4245    #[test]
4246    fn test_log_transaction_metrics_none() {
4247        let captures = capture_test_event("/nothing", TransactionSource::Url);
4248        insta::assert_debug_snapshot!(captures, @r###"
4249        [
4250            "event.transaction_name_changes:1|c|#source_in:url,changes:none,source_out:sanitized,is_404:false",
4251        ]
4252        "###);
4253    }
4254
4255    #[test]
4256    fn test_log_transaction_metrics_rule() {
4257        let captures = capture_test_event("/foo/john/denver", TransactionSource::Url);
4258        insta::assert_debug_snapshot!(captures, @r###"
4259        [
4260            "event.transaction_name_changes:1|c|#source_in:url,changes:rule,source_out:sanitized,is_404:false",
4261        ]
4262        "###);
4263    }
4264
4265    #[test]
4266    fn test_log_transaction_metrics_pattern() {
4267        let captures = capture_test_event("/something/12345", TransactionSource::Url);
4268        insta::assert_debug_snapshot!(captures, @r###"
4269        [
4270            "event.transaction_name_changes:1|c|#source_in:url,changes:pattern,source_out:sanitized,is_404:false",
4271        ]
4272        "###);
4273    }
4274
4275    #[test]
4276    fn test_log_transaction_metrics_both() {
4277        let captures = capture_test_event("/foo/john/12345", TransactionSource::Url);
4278        insta::assert_debug_snapshot!(captures, @r###"
4279        [
4280            "event.transaction_name_changes:1|c|#source_in:url,changes:both,source_out:sanitized,is_404:false",
4281        ]
4282        "###);
4283    }
4284
4285    #[test]
4286    fn test_log_transaction_metrics_no_match() {
4287        let captures = capture_test_event("/foo/john/12345", TransactionSource::Route);
4288        insta::assert_debug_snapshot!(captures, @r###"
4289        [
4290            "event.transaction_name_changes:1|c|#source_in:route,changes:none,source_out:route,is_404:false",
4291        ]
4292        "###);
4293    }
4294
4295    /// Confirms that the hardcoded value we use for the fixed length of the measurement MRI is
4296    /// correct. Unit test is placed here because it has dependencies to relay-server and therefore
4297    /// cannot be called from relay-metrics.
4298    #[test]
4299    fn test_mri_overhead_constant() {
4300        let hardcoded_value = MeasurementsConfig::MEASUREMENT_MRI_OVERHEAD;
4301
4302        let derived_value = {
4303            let name = "foobar".to_owned();
4304            let value = 5.into(); // Arbitrary value.
4305            let unit = MetricUnit::Duration(DurationUnit::default());
4306            let tags = TransactionMeasurementTags {
4307                measurement_rating: None,
4308                universal_tags: CommonTags(BTreeMap::new()),
4309                score_profile_version: None,
4310            };
4311
4312            let measurement = TransactionMetric::Measurement {
4313                name: name.clone(),
4314                value,
4315                unit,
4316                tags,
4317            };
4318
4319            let metric: Bucket = measurement.into_metric(UnixTimestamp::now());
4320            metric.name.len() - unit.to_string().len() - name.len()
4321        };
4322        assert_eq!(
4323            hardcoded_value, derived_value,
4324            "Update `MEASUREMENT_MRI_OVERHEAD` if the naming scheme changed."
4325        );
4326    }
4327
    /// Bucket metadata `received_at` handling depends on the bucket source: externally
    /// submitted buckets are stamped with the receive time, while buckets from internal
    /// Relays keep their metadata untouched (`None`).
    #[tokio::test]
    async fn test_process_metrics_bucket_metadata() {
        let mut token = Cogs::noop().timed(ResourceId::Relay, AppFeature::Unattributed);
        let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
        let received_at = Utc::now();
        let config = Config::default();

        // Capture the buckets the processor forwards to the aggregator.
        let (aggregator, mut aggregator_rx) = Addr::custom();
        let processor = create_test_processor_with_addrs(
            config,
            Addrs {
                aggregator,
                ..Default::default()
            },
        )
        .await;

        let mut item = Item::new(ItemType::Statsd);
        item.set_payload(
            ContentType::Text,
            "transactions/foo:3182887624:4267882815|s",
        );
        // External submissions expect `received_at` to be stamped; internal ones expect `None`.
        for (source, expected_received_at) in [
            (
                BucketSource::External,
                Some(UnixTimestamp::from_datetime(received_at).unwrap()),
            ),
            (BucketSource::Internal, None),
        ] {
            let message = ProcessMetrics {
                data: MetricData::Raw(vec![item.clone()]),
                project_key,
                source,
                received_at,
                sent_at: Some(Utc::now()),
            };
            processor.handle_process_metrics(&mut token, message);

            let Aggregator::MergeBuckets(merge_buckets) = aggregator_rx.recv().await.unwrap();
            let buckets = merge_buckets.buckets;
            assert_eq!(buckets.len(), 1);
            assert_eq!(buckets[0].metadata.received_at, expected_received_at);
        }
    }
4372
    /// A batched metrics payload spanning multiple projects is split into one
    /// `MergeBuckets` message per project key.
    #[tokio::test]
    async fn test_process_batched_metrics() {
        let mut token = Cogs::noop().timed(ResourceId::Relay, AppFeature::Unattributed);
        let received_at = Utc::now();
        let config = Config::default();

        // Capture everything the processor forwards to the aggregator.
        let (aggregator, mut aggregator_rx) = Addr::custom();
        let processor = create_test_processor_with_addrs(
            config,
            Addrs {
                aggregator,
                ..Default::default()
            },
        )
        .await;

        // Two projects, one distribution bucket each.
        let payload = r#"{
    "buckets": {
        "11111111111111111111111111111111": [
            {
                "timestamp": 1615889440,
                "width": 0,
                "name": "d:custom/endpoint.response_time@millisecond",
                "type": "d",
                "value": [
                  68.0
                ],
                "tags": {
                  "route": "user_index"
                }
            }
        ],
        "22222222222222222222222222222222": [
            {
                "timestamp": 1615889440,
                "width": 0,
                "name": "d:custom/endpoint.cache_rate@none",
                "type": "d",
                "value": [
                  36.0
                ]
            }
        ]
    }
}
"#;
        let message = ProcessBatchedMetrics {
            payload: Bytes::from(payload),
            source: BucketSource::Internal,
            received_at,
            sent_at: Some(Utc::now()),
        };
        processor.handle_process_batched_metrics(&mut token, message);

        // Exactly one aggregator message is expected per project in the payload.
        let Aggregator::MergeBuckets(mb1) = aggregator_rx.recv().await.unwrap();
        let Aggregator::MergeBuckets(mb2) = aggregator_rx.recv().await.unwrap();

        // Sort by project key so the snapshot comparison is deterministic.
        let mut messages = vec![mb1, mb2];
        messages.sort_by_key(|mb| mb.project_key);

        let actual = messages
            .into_iter()
            .map(|mb| (mb.project_key, mb.buckets))
            .collect::<Vec<_>>();

        assert_debug_snapshot!(actual, @r###"
        [
            (
                ProjectKey("11111111111111111111111111111111"),
                [
                    Bucket {
                        timestamp: UnixTimestamp(1615889440),
                        width: 0,
                        name: MetricName(
                            "d:custom/endpoint.response_time@millisecond",
                        ),
                        value: Distribution(
                            [
                                68.0,
                            ],
                        ),
                        tags: {
                            "route": "user_index",
                        },
                        metadata: BucketMetadata {
                            merges: 1,
                            received_at: None,
                            extracted_from_indexed: false,
                        },
                    },
                ],
            ),
            (
                ProjectKey("22222222222222222222222222222222"),
                [
                    Bucket {
                        timestamp: UnixTimestamp(1615889440),
                        width: 0,
                        name: MetricName(
                            "d:custom/endpoint.cache_rate@none",
                        ),
                        value: Distribution(
                            [
                                36.0,
                            ],
                        ),
                        tags: {},
                        metadata: BucketMetadata {
                            merges: 1,
                            received_at: None,
                            extracted_from_indexed: false,
                        },
                    },
                ],
            ),
        ]
        "###);
    }
4491}