use std::borrow::Cow;
use std::collections::{BTreeSet, HashMap};
use std::error::Error;
use std::fmt::{Debug, Display};
use std::future::Future;
use std::io::Write;
use std::pin::Pin;
use std::sync::{Arc, Once};
use std::time::Duration;

use anyhow::Context;
use brotli::CompressorWriter as BrotliEncoder;
use bytes::Bytes;
use chrono::{DateTime, Utc};
use flate2::Compression;
use flate2::write::{GzEncoder, ZlibEncoder};
use futures::FutureExt;
use futures::future::BoxFuture;
use relay_base_schema::project::{ProjectId, ProjectKey};
use relay_cogs::{AppFeature, Cogs, FeatureWeights, ResourceId, Token};
use relay_common::time::UnixTimestamp;
use relay_config::{Config, HttpEncoding, RelayMode};
use relay_dynamic_config::{CombinedMetricExtractionConfig, ErrorBoundary, Feature, GlobalConfig};
use relay_event_normalization::{ClockDriftProcessor, GeoIpLookup};
use relay_event_schema::processor::ProcessingAction;
use relay_event_schema::protocol::{
    ClientReport, Event, EventId, Metrics, NetworkReportError, SpanV2,
};
use relay_filter::FilterStatKey;
use relay_metrics::{Bucket, BucketMetadata, BucketView, BucketsView, MetricNamespace};
use relay_pii::PiiConfigError;
use relay_protocol::Annotated;
use relay_quotas::{DataCategory, Quota, RateLimits, Scoping};
use relay_sampling::evaluation::{ReservoirCounters, ReservoirEvaluator, SamplingDecision};
use relay_statsd::metric;
use relay_system::{Addr, FromMessage, NoResponse, Service};
use reqwest::header;
use smallvec::{SmallVec, smallvec};
use zstd::stream::Encoder as ZstdEncoder;

use crate::envelope::{
    self, AttachmentType, ContentType, Envelope, EnvelopeError, Item, ItemContainer, ItemType,
};
use crate::extractors::{PartialDsn, RequestMeta, RequestTrust};
use crate::integrations::{Integration, SpansIntegration};
use crate::managed::{InvalidProcessingGroupType, ManagedEnvelope, TypedEnvelope};
use crate::metrics::{MetricOutcomes, MetricsLimiter, MinimalTrackableBucket};
use crate::metrics_extraction::transactions::types::ExtractMetricsError;
use crate::metrics_extraction::transactions::{ExtractedMetrics, TransactionExtractor};
use crate::processing::check_ins::CheckInsProcessor;
use crate::processing::logs::LogsProcessor;
use crate::processing::sessions::SessionsProcessor;
use crate::processing::spans::SpansProcessor;
use crate::processing::trace_metrics::TraceMetricsProcessor;
use crate::processing::utils::event::{
    EventFullyNormalized, EventMetricsExtracted, SpansExtracted, event_category, event_type,
};
use crate::processing::{Forward as _, Output, Outputs, QuotaRateLimiter};
use crate::service::ServiceError;
use crate::services::global_config::GlobalConfigHandle;
use crate::services::metrics::{Aggregator, FlushBuckets, MergeBuckets, ProjectBuckets};
use crate::services::outcome::{DiscardItemType, DiscardReason, Outcome, TrackOutcome};
use crate::services::processor::event::FiltersStatus;
use crate::services::projects::cache::ProjectCacheHandle;
use crate::services::projects::project::{ProjectInfo, ProjectState};
use crate::services::upstream::{
    SendRequest, Sign, SignatureType, UpstreamRelay, UpstreamRequest, UpstreamRequestError,
};
use crate::statsd::{RelayCounters, RelayHistograms, RelayTimers};
use crate::utils::{self, CheckLimits, EnvelopeLimiter, SamplingResult};
use crate::{http, processing};
use relay_base_schema::organization::OrganizationId;
use relay_threading::AsyncPool;
#[cfg(feature = "processing")]
use {
    crate::services::global_rate_limits::{GlobalRateLimits, GlobalRateLimitsServiceHandle},
    crate::services::processor::nnswitch::SwitchProcessingError,
    crate::services::store::{Store, StoreEnvelope},
    crate::utils::Enforcement,
    itertools::Itertools,
    relay_cardinality::{
        CardinalityLimit, CardinalityLimiter, CardinalityLimitsSplit, RedisSetLimiter,
        RedisSetLimiterOptions,
    },
    relay_dynamic_config::{CardinalityLimiterMode, MetricExtractionGroups},
    relay_quotas::{RateLimitingError, RedisRateLimiter},
    relay_redis::{AsyncRedisClient, RedisClients},
    std::time::Instant,
    symbolic_unreal::{Unreal4Error, Unreal4ErrorKind},
};

mod attachment;
mod dynamic_sampling;
mod event;
mod metrics;
mod nel;
mod profile;
mod profile_chunk;
mod replay;
mod report;
mod span;

#[cfg(all(sentry, feature = "processing"))]
mod playstation;
mod standalone;
#[cfg(feature = "processing")]
mod unreal;

#[cfg(feature = "processing")]
mod nnswitch;

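/// Runs a block only when the `processing` feature is compiled in *and*
/// processing is enabled in the configuration; the optional `else` block
/// covers every other case.
///
/// Illustrative usage, mirroring how the macro is invoked further below:
///
/// ```ignore
/// if_processing!(self.inner.config, {
///     // runs only on processing Relays
/// } else {
///     // runs on all other Relays
/// });
/// ```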
macro_rules! if_processing {
    ($config:expr, $if_true:block) => {
        #[cfg(feature = "processing")] {
            if $config.processing_enabled() $if_true
        }
    };
    ($config:expr, $if_true:block else $if_false:block) => {
        {
            #[cfg(feature = "processing")] {
                if $config.processing_enabled() $if_true else $if_false
            }
            #[cfg(not(feature = "processing"))] {
                $if_false
            }
        }
    };
}

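/// The smallest clock drift for which timestamps are corrected, presumably in
/// concert with the [`ClockDriftProcessor`] used elsewhere in this module.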
pub const MINIMUM_CLOCK_DRIFT: Duration = Duration::from_secs(55 * 60);

#[derive(Debug)]
pub struct GroupTypeError;

impl Display for GroupTypeError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("failed to convert processing group into corresponding type")
    }
}

impl std::error::Error for GroupTypeError {}

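/// Defines a marker type for a processing group and wires up the conversions
/// to and from [`ProcessingGroup`].
///
/// For example, `processing_group!(LogGroup, Log, Nel)` below generates a
/// `LogGroup` type that converts into `ProcessingGroup::Log` and can be
/// recovered from both the `Log` and `Nel` variants.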
macro_rules! processing_group {
    ($ty:ident, $variant:ident$(, $($other:ident),+)?) => {
        #[derive(Clone, Copy, Debug)]
        pub struct $ty;

        impl From<$ty> for ProcessingGroup {
            fn from(_: $ty) -> Self {
                ProcessingGroup::$variant
            }
        }

        impl TryFrom<ProcessingGroup> for $ty {
            type Error = GroupTypeError;

            fn try_from(value: ProcessingGroup) -> Result<Self, Self::Error> {
                if matches!(value, ProcessingGroup::$variant) {
                    return Ok($ty);
                }
                $($(
                    if matches!(value, ProcessingGroup::$other) {
                        return Ok($ty);
                    }
                )+)?
                return Err(GroupTypeError);
            }
        }
    };
}

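/// Marker trait for processing groups whose envelopes carry an event payload
/// (implemented below for errors and transactions).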
pub trait EventProcessing {}

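/// Implemented by processing groups that participate in dynamic sampling.
///
/// `supports_sampling` decides per project configuration whether sampling
/// applies at all; `supports_reservoir_sampling` additionally gates the
/// reservoir mechanism.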
pub trait Sampling {
    fn supports_sampling(project_info: &ProjectInfo) -> bool;

    fn supports_reservoir_sampling() -> bool;
}

processing_group!(TransactionGroup, Transaction);
impl EventProcessing for TransactionGroup {}

impl Sampling for TransactionGroup {
    fn supports_sampling(project_info: &ProjectInfo) -> bool {
        matches!(&project_info.config.transaction_metrics, Some(ErrorBoundary::Ok(c)) if c.is_enabled())
    }

    fn supports_reservoir_sampling() -> bool {
        true
    }
}

processing_group!(ErrorGroup, Error);
impl EventProcessing for ErrorGroup {}

processing_group!(SessionGroup, Session);
processing_group!(StandaloneGroup, Standalone);
processing_group!(ClientReportGroup, ClientReport);
processing_group!(ReplayGroup, Replay);
processing_group!(CheckInGroup, CheckIn);
processing_group!(LogGroup, Log, Nel);
processing_group!(TraceMetricGroup, TraceMetric);
processing_group!(SpanGroup, Span);

impl Sampling for SpanGroup {
    fn supports_sampling(project_info: &ProjectInfo) -> bool {
        matches!(&project_info.config().metric_extraction, ErrorBoundary::Ok(c) if c.is_supported())
    }

    fn supports_reservoir_sampling() -> bool {
        false
    }
}

processing_group!(ProfileChunkGroup, ProfileChunk);
processing_group!(MetricsGroup, Metrics);
processing_group!(ForwardUnknownGroup, ForwardUnknown);
processing_group!(Ungrouped, Ungrouped);

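/// Type-state marker for envelopes that have completed processing, used as
/// the parameter of `TypedEnvelope<Processed>`.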
#[derive(Clone, Copy, Debug)]
pub struct Processed;

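/// The group of an envelope's items, determining which processing pipeline
/// handles them.
///
/// Envelopes are split by `split_envelope` below so that each resulting
/// envelope contains items of exactly one group.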
#[derive(Clone, Copy, Debug)]
pub enum ProcessingGroup {
    Transaction,
    Error,
    Session,
    Standalone,
    ClientReport,
    Replay,
    CheckIn,
    Nel,
    Log,
    TraceMetric,
    Span,
    SpanV2,
    Metrics,
    ProfileChunk,
    ForwardUnknown,
    Ungrouped,
}

impl ProcessingGroup {
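    /// Splits the envelope into per-group envelopes by item type, copying the
    /// original headers onto every resulting envelope.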
    fn split_envelope(
        mut envelope: Envelope,
        project_info: &ProjectInfo,
    ) -> SmallVec<[(Self, Box<Envelope>); 3]> {
        let headers = envelope.headers().clone();
        let mut grouped_envelopes = smallvec![];

        let replay_items = envelope.take_items_by(|item| {
            matches!(
                item.ty(),
                &ItemType::ReplayEvent | &ItemType::ReplayRecording | &ItemType::ReplayVideo
            )
        });
        if !replay_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Replay,
                Envelope::from_parts(headers.clone(), replay_items),
            ))
        }

        let session_items = envelope
            .take_items_by(|item| matches!(item.ty(), &ItemType::Session | &ItemType::Sessions));
        if !session_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Session,
                Envelope::from_parts(headers.clone(), session_items),
            ))
        }

        let span_v2_items = envelope.take_items_by(|item| {
            let exp_feature = project_info.has_feature(Feature::SpanV2ExperimentalProcessing);
            let is_supported_integration = matches!(
                item.integration(),
                Some(Integration::Spans(SpansIntegration::OtelV1 { .. }))
            );
            let is_span = matches!(item.ty(), &ItemType::Span);

            ItemContainer::<SpanV2>::is_container(item)
                || (exp_feature && is_span)
                || (exp_feature && is_supported_integration)
        });

        if !span_v2_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::SpanV2,
                Envelope::from_parts(headers.clone(), span_v2_items),
            ))
        }

        let span_items = envelope.take_items_by(|item| {
            matches!(item.ty(), &ItemType::Span)
                || matches!(item.integration(), Some(Integration::Spans(_)))
        });

        if !span_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Span,
                Envelope::from_parts(headers.clone(), span_items),
            ))
        }

        let logs_items = envelope.take_items_by(|item| {
            matches!(item.ty(), &ItemType::Log)
                || matches!(item.integration(), Some(Integration::Logs(_)))
        });
        if !logs_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Log,
                Envelope::from_parts(headers.clone(), logs_items),
            ))
        }

        let trace_metric_items =
            envelope.take_items_by(|item| matches!(item.ty(), &ItemType::TraceMetric));
        if !trace_metric_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::TraceMetric,
                Envelope::from_parts(headers.clone(), trace_metric_items),
            ))
        }

        let nel_items = envelope.take_items_by(|item| matches!(item.ty(), &ItemType::Nel));
        if !nel_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Nel,
                Envelope::from_parts(headers.clone(), nel_items),
            ))
        }

        let metric_items = envelope.take_items_by(|i| i.ty().is_metrics());
        if !metric_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Metrics,
                Envelope::from_parts(headers.clone(), metric_items),
            ))
        }

        let profile_chunk_items =
            envelope.take_items_by(|item| matches!(item.ty(), &ItemType::ProfileChunk));
        if !profile_chunk_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::ProfileChunk,
                Envelope::from_parts(headers.clone(), profile_chunk_items),
            ))
        }

        if !envelope.items().any(Item::creates_event) {
            let standalone_items = envelope.take_items_by(Item::requires_event);
            if !standalone_items.is_empty() {
                grouped_envelopes.push((
                    ProcessingGroup::Standalone,
                    Envelope::from_parts(headers.clone(), standalone_items),
                ))
            }
        };

        let security_reports_items = envelope
            .take_items_by(|i| matches!(i.ty(), &ItemType::RawSecurity))
            .into_iter()
            .map(|item| {
                let headers = headers.clone();
                let items: SmallVec<[Item; 3]> = smallvec![item.clone()];
                let mut envelope = Envelope::from_parts(headers, items);
                envelope.set_event_id(EventId::new());
                (ProcessingGroup::Error, envelope)
            });
        grouped_envelopes.extend(security_reports_items);

        let require_event_items = envelope.take_items_by(Item::requires_event);
        if !require_event_items.is_empty() {
            let group = if require_event_items
                .iter()
                .any(|item| matches!(item.ty(), &ItemType::Transaction | &ItemType::Profile))
            {
                ProcessingGroup::Transaction
            } else {
                ProcessingGroup::Error
            };

            grouped_envelopes.push((
                group,
                Envelope::from_parts(headers.clone(), require_event_items),
            ))
        }

        let envelopes = envelope.items_mut().map(|item| {
            let headers = headers.clone();
            let items: SmallVec<[Item; 3]> = smallvec![item.clone()];
            let envelope = Envelope::from_parts(headers, items);
            let item_type = item.ty();
            let group = if matches!(item_type, &ItemType::CheckIn) {
                ProcessingGroup::CheckIn
            } else if matches!(item_type, &ItemType::ClientReport) {
                ProcessingGroup::ClientReport
            } else if matches!(item_type, &ItemType::Unknown(_)) {
                ProcessingGroup::ForwardUnknown
            } else {
                ProcessingGroup::Ungrouped
            };

            (group, envelope)
        });
        grouped_envelopes.extend(envelopes);

        grouped_envelopes
    }

    pub fn variant(&self) -> &'static str {
        match self {
            ProcessingGroup::Transaction => "transaction",
            ProcessingGroup::Error => "error",
            ProcessingGroup::Session => "session",
            ProcessingGroup::Standalone => "standalone",
            ProcessingGroup::ClientReport => "client_report",
            ProcessingGroup::Replay => "replay",
            ProcessingGroup::CheckIn => "check_in",
            ProcessingGroup::Log => "log",
            ProcessingGroup::TraceMetric => "trace_metric",
            ProcessingGroup::Nel => "nel",
            ProcessingGroup::Span => "span",
            ProcessingGroup::SpanV2 => "span_v2",
            ProcessingGroup::Metrics => "metrics",
            ProcessingGroup::ProfileChunk => "profile_chunk",
            ProcessingGroup::ForwardUnknown => "forward_unknown",
            ProcessingGroup::Ungrouped => "ungrouped",
        }
    }
}

impl From<ProcessingGroup> for AppFeature {
    fn from(value: ProcessingGroup) -> Self {
        match value {
            ProcessingGroup::Transaction => AppFeature::Transactions,
            ProcessingGroup::Error => AppFeature::Errors,
            ProcessingGroup::Session => AppFeature::Sessions,
            ProcessingGroup::Standalone => AppFeature::UnattributedEnvelope,
            ProcessingGroup::ClientReport => AppFeature::ClientReports,
            ProcessingGroup::Replay => AppFeature::Replays,
            ProcessingGroup::CheckIn => AppFeature::CheckIns,
            ProcessingGroup::Log => AppFeature::Logs,
            ProcessingGroup::TraceMetric => AppFeature::TraceMetrics,
            ProcessingGroup::Nel => AppFeature::Logs,
            ProcessingGroup::Span => AppFeature::Spans,
            ProcessingGroup::SpanV2 => AppFeature::Spans,
            ProcessingGroup::Metrics => AppFeature::UnattributedMetrics,
            ProcessingGroup::ProfileChunk => AppFeature::Profiles,
            ProcessingGroup::ForwardUnknown => AppFeature::UnattributedEnvelope,
            ProcessingGroup::Ungrouped => AppFeature::UnattributedEnvelope,
        }
    }
}

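/// An error returned while processing an envelope.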
#[derive(Debug, thiserror::Error)]
pub enum ProcessingError {
    #[error("invalid json in event")]
    InvalidJson(#[source] serde_json::Error),

    #[error("invalid message pack event payload")]
    InvalidMsgpack(#[from] rmp_serde::decode::Error),

    #[cfg(feature = "processing")]
    #[error("invalid unreal crash report")]
    InvalidUnrealReport(#[source] Unreal4Error),

    #[error("event payload too large")]
    PayloadTooLarge(DiscardItemType),

    #[error("invalid transaction event")]
    InvalidTransaction,

    #[error("envelope processor failed")]
    ProcessingFailed(#[from] ProcessingAction),

    #[error("duplicate {0} in event")]
    DuplicateItem(ItemType),

    #[error("failed to extract event payload")]
    NoEventPayload,

    #[error("missing project id in DSN")]
    MissingProjectId,

    #[error("invalid security report type: {0:?}")]
    InvalidSecurityType(Bytes),

    #[error("unsupported security report type")]
    UnsupportedSecurityType,

    #[error("invalid security report")]
    InvalidSecurityReport(#[source] serde_json::Error),

    #[error("invalid nel report")]
    InvalidNelReport(#[source] NetworkReportError),

    #[error("event filtered with reason: {0:?}")]
    EventFiltered(FilterStatKey),

    #[error("missing or invalid required event timestamp")]
    InvalidTimestamp,

    #[error("could not serialize event payload")]
    SerializeFailed(#[source] serde_json::Error),

    #[cfg(feature = "processing")]
    #[error("failed to apply quotas")]
    QuotasFailed(#[from] RateLimitingError),

    #[error("invalid pii config")]
    PiiConfigError(PiiConfigError),

    #[error("invalid processing group type")]
    InvalidProcessingGroup(Box<InvalidProcessingGroupType>),

    #[error("invalid replay")]
    InvalidReplay(DiscardReason),

    #[error("replay filtered with reason: {0:?}")]
    ReplayFiltered(FilterStatKey),

    #[cfg(feature = "processing")]
    #[error("nintendo switch dying message processing failed: {0:?}")]
    InvalidNintendoDyingMessage(#[source] SwitchProcessingError),

    #[cfg(all(sentry, feature = "processing"))]
    #[error("playstation dump processing failed: {0}")]
    InvalidPlaystationDump(String),

    #[error("processing group does not match specific processor")]
    ProcessingGroupMismatch,

    #[error("new processing pipeline failed")]
    ProcessingFailure,
}

impl ProcessingError {
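    /// Maps the error to the outcome to emit for the affected items, if any.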
    pub fn to_outcome(&self) -> Option<Outcome> {
        match self {
            Self::PayloadTooLarge(payload_type) => {
                Some(Outcome::Invalid(DiscardReason::TooLarge(*payload_type)))
            }
            Self::InvalidJson(_) => Some(Outcome::Invalid(DiscardReason::InvalidJson)),
            Self::InvalidMsgpack(_) => Some(Outcome::Invalid(DiscardReason::InvalidMsgpack)),
            Self::InvalidSecurityType(_) => {
                Some(Outcome::Invalid(DiscardReason::SecurityReportType))
            }
            Self::InvalidSecurityReport(_) => Some(Outcome::Invalid(DiscardReason::SecurityReport)),
            Self::UnsupportedSecurityType => Some(Outcome::Filtered(FilterStatKey::InvalidCsp)),
            Self::InvalidNelReport(_) => Some(Outcome::Invalid(DiscardReason::InvalidJson)),
            Self::InvalidTransaction => Some(Outcome::Invalid(DiscardReason::InvalidTransaction)),
            Self::InvalidTimestamp => Some(Outcome::Invalid(DiscardReason::Timestamp)),
            Self::DuplicateItem(_) => Some(Outcome::Invalid(DiscardReason::DuplicateItem)),
            Self::NoEventPayload => Some(Outcome::Invalid(DiscardReason::NoEventPayload)),
            #[cfg(feature = "processing")]
            Self::InvalidNintendoDyingMessage(_) => Some(Outcome::Invalid(DiscardReason::Payload)),
            #[cfg(all(sentry, feature = "processing"))]
            Self::InvalidPlaystationDump(_) => Some(Outcome::Invalid(DiscardReason::Payload)),
            #[cfg(feature = "processing")]
            Self::InvalidUnrealReport(err) if err.kind() == Unreal4ErrorKind::BadCompression => {
                Some(Outcome::Invalid(DiscardReason::InvalidCompression))
            }
            #[cfg(feature = "processing")]
            Self::InvalidUnrealReport(_) => Some(Outcome::Invalid(DiscardReason::ProcessUnreal)),
            Self::SerializeFailed(_) | Self::ProcessingFailed(_) => {
                Some(Outcome::Invalid(DiscardReason::Internal))
            }
            #[cfg(feature = "processing")]
            Self::QuotasFailed(_) => Some(Outcome::Invalid(DiscardReason::Internal)),
            Self::PiiConfigError(_) => Some(Outcome::Invalid(DiscardReason::ProjectStatePii)),
            Self::MissingProjectId => None,
            Self::EventFiltered(_) => None,
            Self::InvalidProcessingGroup(_) => None,
            Self::InvalidReplay(reason) => Some(Outcome::Invalid(*reason)),
            Self::ReplayFiltered(key) => Some(Outcome::Filtered(key.clone())),
            Self::ProcessingGroupMismatch => Some(Outcome::Invalid(DiscardReason::Internal)),
            Self::ProcessingFailure => None,
        }
    }

    fn is_unexpected(&self) -> bool {
        self.to_outcome()
            .is_some_and(|outcome| outcome.is_unexpected())
    }
}

#[cfg(feature = "processing")]
impl From<Unreal4Error> for ProcessingError {
    fn from(err: Unreal4Error) -> Self {
        match err.kind() {
            Unreal4ErrorKind::TooLarge => Self::PayloadTooLarge(ItemType::UnrealReport.into()),
            _ => ProcessingError::InvalidUnrealReport(err),
        }
    }
}

impl From<ExtractMetricsError> for ProcessingError {
    fn from(error: ExtractMetricsError) -> Self {
        match error {
            ExtractMetricsError::MissingTimestamp | ExtractMetricsError::InvalidTimestamp => {
                Self::InvalidTimestamp
            }
        }
    }
}

impl From<InvalidProcessingGroupType> for ProcessingError {
    fn from(value: InvalidProcessingGroupType) -> Self {
        Self::InvalidProcessingGroup(Box::new(value))
    }
}

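/// An event extracted from an envelope item, along with the size in bytes of
/// its original payload.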
type ExtractedEvent = (Annotated<Event>, usize);

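/// Metric buckets extracted during processing, split into buckets for the
/// envelope's own project and buckets attributed to the sampling (trace root)
/// project.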
#[derive(Debug)]
pub struct ProcessingExtractedMetrics {
    metrics: ExtractedMetrics,
}

impl ProcessingExtractedMetrics {
    pub fn new() -> Self {
        Self {
            metrics: ExtractedMetrics::default(),
        }
    }

    pub fn extend(
        &mut self,
        extracted: ExtractedMetrics,
        sampling_decision: Option<SamplingDecision>,
    ) {
        self.extend_project_metrics(extracted.project_metrics, sampling_decision);
        self.extend_sampling_metrics(extracted.sampling_metrics, sampling_decision);
    }

    pub fn extend_project_metrics<I>(
        &mut self,
        buckets: I,
        sampling_decision: Option<SamplingDecision>,
    ) where
        I: IntoIterator<Item = Bucket>,
    {
        self.metrics
            .project_metrics
            .extend(buckets.into_iter().map(|mut bucket| {
                bucket.metadata.extracted_from_indexed =
                    sampling_decision == Some(SamplingDecision::Keep);
                bucket
            }));
    }

    pub fn extend_sampling_metrics<I>(
        &mut self,
        buckets: I,
        sampling_decision: Option<SamplingDecision>,
    ) where
        I: IntoIterator<Item = Bucket>,
    {
        self.metrics
            .sampling_metrics
            .extend(buckets.into_iter().map(|mut bucket| {
                bucket.metadata.extracted_from_indexed =
                    sampling_decision == Some(SamplingDecision::Keep);
                bucket
            }));
    }

    #[cfg(feature = "processing")]
    fn apply_enforcement(&mut self, enforcement: &Enforcement, enforced_consistently: bool) {
        let mut drop_namespaces: SmallVec<[_; 2]> = smallvec![];
        let mut reset_extracted_from_indexed: SmallVec<[_; 2]> = smallvec![];

        for (namespace, limit, indexed) in [
            (
                MetricNamespace::Transactions,
                &enforcement.event,
                &enforcement.event_indexed,
            ),
            (
                MetricNamespace::Spans,
                &enforcement.spans,
                &enforcement.spans_indexed,
            ),
        ] {
            if limit.is_active() {
                drop_namespaces.push(namespace);
            } else if indexed.is_active() && !enforced_consistently {
                reset_extracted_from_indexed.push(namespace);
            }
        }

        if !drop_namespaces.is_empty() || !reset_extracted_from_indexed.is_empty() {
            self.retain_mut(|bucket| {
                let Some(namespace) = bucket.name.try_namespace() else {
                    return true;
                };

                if drop_namespaces.contains(&namespace) {
                    return false;
                }

                if reset_extracted_from_indexed.contains(&namespace) {
                    bucket.metadata.extracted_from_indexed = false;
                }

                true
            });
        }
    }

    #[cfg(feature = "processing")]
    fn retain_mut(&mut self, mut f: impl FnMut(&mut Bucket) -> bool) {
        self.metrics.project_metrics.retain_mut(&mut f);
        self.metrics.sampling_metrics.retain_mut(&mut f);
    }
}

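/// Sends extracted metric buckets to the aggregator, routing sampling metrics
/// to the sampling project's key when one is known and falling back to the
/// envelope's own project key otherwise.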
fn send_metrics(
    metrics: ExtractedMetrics,
    project_key: ProjectKey,
    sampling_key: Option<ProjectKey>,
    aggregator: &Addr<Aggregator>,
) {
    let ExtractedMetrics {
        project_metrics,
        sampling_metrics,
    } = metrics;

    if !project_metrics.is_empty() {
        aggregator.send(MergeBuckets {
            project_key,
            buckets: project_metrics,
        });
    }

    if !sampling_metrics.is_empty() {
        let sampling_project_key = sampling_key.unwrap_or(project_key);
        aggregator.send(MergeBuckets {
            project_key: sampling_project_key,
            buckets: sampling_metrics,
        });
    }
}

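/// Returns `true` if an item guarded by the given feature flag should be
/// filtered out: proxy Relays never filter, while managed Relays filter
/// whenever the project does not have the feature enabled.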
fn should_filter(config: &Config, project_info: &ProjectInfo, feature: Feature) -> bool {
    match config.relay_mode() {
        RelayMode::Proxy => false,
        RelayMode::Managed => !project_info.has_feature(feature),
    }
}

#[derive(Debug)]
#[expect(
    clippy::large_enum_variant,
    reason = "until we have a better solution to combat the excessive growth of Item, see #4819"
)]
enum ProcessingResult {
    Envelope {
        managed_envelope: TypedEnvelope<Processed>,
        extracted_metrics: ProcessingExtractedMetrics,
    },
    Output(Output<Outputs>),
}

impl ProcessingResult {
    fn no_metrics(managed_envelope: TypedEnvelope<Processed>) -> Self {
        Self::Envelope {
            managed_envelope,
            extracted_metrics: ProcessingExtractedMetrics::new(),
        }
    }
}

#[derive(Debug)]
#[expect(
    clippy::large_enum_variant,
    reason = "until we have a better solution to combat the excessive growth of Item, see #4819"
)]
enum Submit<'a> {
    Envelope(TypedEnvelope<Processed>),
    Output {
        output: Outputs,
        ctx: processing::ForwardContext<'a>,
    },
}

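/// A request to process an envelope together with its previously fetched
/// project state, cached rate limits, optional sampling (trace root) project
/// info, and the reservoir counters for dynamic sampling.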
#[derive(Debug)]
pub struct ProcessEnvelope {
    pub envelope: ManagedEnvelope,
    pub project_info: Arc<ProjectInfo>,
    pub rate_limits: Arc<RateLimits>,
    pub sampling_project_info: Option<Arc<ProjectInfo>>,
    pub reservoir_counters: ReservoirCounters,
}

#[derive(Debug)]
struct ProcessEnvelopeGrouped<'a> {
    pub group: ProcessingGroup,
    pub envelope: ManagedEnvelope,
    pub ctx: processing::Context<'a>,
    pub reservoir_counters: &'a ReservoirCounters,
}

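/// A request to parse metric buckets from envelope items or take them as
/// already parsed, and submit them to the aggregator for the given project.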
#[derive(Debug)]
pub struct ProcessMetrics {
    pub data: MetricData,
    pub project_key: ProjectKey,
    pub source: BucketSource,
    pub received_at: DateTime<Utc>,
    pub sent_at: Option<DateTime<Utc>>,
}

#[derive(Debug)]
pub enum MetricData {
    Raw(Vec<Item>),
    Parsed(Vec<Bucket>),
}

impl MetricData {
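    /// Returns the contained buckets, parsing raw statsd and JSON bucket items
    /// on demand; already parsed buckets are returned unchanged.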
    fn into_buckets(self, timestamp: UnixTimestamp) -> Vec<Bucket> {
        let items = match self {
            Self::Parsed(buckets) => return buckets,
            Self::Raw(items) => items,
        };

        let mut buckets = Vec::new();
        for item in items {
            let payload = item.payload();
            if item.ty() == &ItemType::Statsd {
                for bucket_result in Bucket::parse_all(&payload, timestamp) {
                    match bucket_result {
                        Ok(bucket) => buckets.push(bucket),
                        Err(error) => relay_log::debug!(
                            error = &error as &dyn Error,
                            "failed to parse metric bucket from statsd format",
                        ),
                    }
                }
            } else if item.ty() == &ItemType::MetricBuckets {
                match serde_json::from_slice::<Vec<Bucket>>(&payload) {
                    Ok(parsed_buckets) => {
                        if buckets.is_empty() {
                            buckets = parsed_buckets;
                        } else {
                            buckets.extend(parsed_buckets);
                        }
                    }
                    Err(error) => {
                        relay_log::debug!(
                            error = &error as &dyn Error,
                            "failed to parse metric bucket",
                        );
                        metric!(counter(RelayCounters::MetricBucketsParsingFailed) += 1);
                    }
                }
            } else {
                relay_log::error!(
                    "invalid item of type {} passed to ProcessMetrics",
                    item.ty()
                );
            }
        }
        buckets
    }
}

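/// A request to parse a raw batch of metric buckets and submit them to the
/// aggregator.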
#[derive(Debug)]
pub struct ProcessBatchedMetrics {
    pub payload: Bytes,
    pub source: BucketSource,
    pub received_at: DateTime<Utc>,
    pub sent_at: Option<DateTime<Utc>>,
}

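/// The source of metric buckets, inferred from the trust level of the request
/// that carried them.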
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum BucketSource {
    Internal,
    External,
}

impl BucketSource {
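    /// Infers the bucket source from the request's trust level.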
    pub fn from_meta(meta: &RequestMeta) -> Self {
        match meta.request_trust() {
            RequestTrust::Trusted => Self::Internal,
            RequestTrust::Untrusted => Self::External,
        }
    }
}

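/// A request to submit a list of client reports within the given scoping.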
#[derive(Debug)]
pub struct SubmitClientReports {
    pub client_reports: Vec<ClientReport>,
    pub scoping: Scoping,
}

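/// The message interface of the [`EnvelopeProcessorService`].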
#[derive(Debug)]
pub enum EnvelopeProcessor {
    ProcessEnvelope(Box<ProcessEnvelope>),
    ProcessProjectMetrics(Box<ProcessMetrics>),
    ProcessBatchedMetrics(Box<ProcessBatchedMetrics>),
    FlushBuckets(Box<FlushBuckets>),
    SubmitClientReports(Box<SubmitClientReports>),
}

impl EnvelopeProcessor {
    pub fn variant(&self) -> &'static str {
        match self {
            EnvelopeProcessor::ProcessEnvelope(_) => "ProcessEnvelope",
            EnvelopeProcessor::ProcessProjectMetrics(_) => "ProcessProjectMetrics",
            EnvelopeProcessor::ProcessBatchedMetrics(_) => "ProcessBatchedMetrics",
            EnvelopeProcessor::FlushBuckets(_) => "FlushBuckets",
            EnvelopeProcessor::SubmitClientReports(_) => "SubmitClientReports",
        }
    }
}

impl relay_system::Interface for EnvelopeProcessor {}

impl FromMessage<ProcessEnvelope> for EnvelopeProcessor {
    type Response = relay_system::NoResponse;

    fn from_message(message: ProcessEnvelope, _sender: ()) -> Self {
        Self::ProcessEnvelope(Box::new(message))
    }
}

impl FromMessage<ProcessMetrics> for EnvelopeProcessor {
    type Response = NoResponse;

    fn from_message(message: ProcessMetrics, _: ()) -> Self {
        Self::ProcessProjectMetrics(Box::new(message))
    }
}

impl FromMessage<ProcessBatchedMetrics> for EnvelopeProcessor {
    type Response = NoResponse;

    fn from_message(message: ProcessBatchedMetrics, _: ()) -> Self {
        Self::ProcessBatchedMetrics(Box::new(message))
    }
}

impl FromMessage<FlushBuckets> for EnvelopeProcessor {
    type Response = NoResponse;

    fn from_message(message: FlushBuckets, _: ()) -> Self {
        Self::FlushBuckets(Box::new(message))
    }
}

impl FromMessage<SubmitClientReports> for EnvelopeProcessor {
    type Response = NoResponse;

    fn from_message(message: SubmitClientReports, _: ()) -> Self {
        Self::SubmitClientReports(Box::new(message))
    }
}

pub type EnvelopeProcessorServicePool = AsyncPool<BoxFuture<'static, ()>>;

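/// Service implementing the [`EnvelopeProcessor`] interface, handling its
/// messages in parallel on an async pool.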
#[derive(Clone)]
pub struct EnvelopeProcessorService {
    inner: Arc<InnerProcessor>,
}

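/// Addresses of the services that the processor forwards its results to.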
pub struct Addrs {
    pub outcome_aggregator: Addr<TrackOutcome>,
    pub upstream_relay: Addr<UpstreamRelay>,
    #[cfg(feature = "processing")]
    pub store_forwarder: Option<Addr<Store>>,
    pub aggregator: Addr<Aggregator>,
    #[cfg(feature = "processing")]
    pub global_rate_limits: Option<Addr<GlobalRateLimits>>,
}

impl Default for Addrs {
    fn default() -> Self {
        Addrs {
            outcome_aggregator: Addr::dummy(),
            upstream_relay: Addr::dummy(),
            #[cfg(feature = "processing")]
            store_forwarder: None,
            aggregator: Addr::dummy(),
            #[cfg(feature = "processing")]
            global_rate_limits: None,
        }
    }
}

struct InnerProcessor {
    pool: EnvelopeProcessorServicePool,
    config: Arc<Config>,
    global_config: GlobalConfigHandle,
    project_cache: ProjectCacheHandle,
    cogs: Cogs,
    #[cfg(feature = "processing")]
    quotas_client: Option<AsyncRedisClient>,
    addrs: Addrs,
    #[cfg(feature = "processing")]
    rate_limiter: Option<Arc<RedisRateLimiter<GlobalRateLimitsServiceHandle>>>,
    geoip_lookup: GeoIpLookup,
    #[cfg(feature = "processing")]
    cardinality_limiter: Option<CardinalityLimiter>,
    metric_outcomes: MetricOutcomes,
    processing: Processing,
}

struct Processing {
    logs: LogsProcessor,
    trace_metrics: TraceMetricsProcessor,
    spans: SpansProcessor,
    check_ins: CheckInsProcessor,
    sessions: SessionsProcessor,
}

impl EnvelopeProcessorService {
    #[cfg_attr(feature = "processing", expect(clippy::too_many_arguments))]
    pub fn new(
        pool: EnvelopeProcessorServicePool,
        config: Arc<Config>,
        global_config: GlobalConfigHandle,
        project_cache: ProjectCacheHandle,
        cogs: Cogs,
        #[cfg(feature = "processing")] redis: Option<RedisClients>,
        addrs: Addrs,
        metric_outcomes: MetricOutcomes,
    ) -> Self {
        let geoip_lookup = config
            .geoip_path()
            .and_then(
                |p| match GeoIpLookup::open(p).context(ServiceError::GeoIp) {
                    Ok(geoip) => Some(geoip),
                    Err(err) => {
                        relay_log::error!("failed to open GeoIP db {p:?}: {err:?}");
                        None
                    }
                },
            )
            .unwrap_or_else(GeoIpLookup::empty);

        #[cfg(feature = "processing")]
        let (cardinality, quotas) = match redis {
            Some(RedisClients {
                cardinality,
                quotas,
                ..
            }) => (Some(cardinality), Some(quotas)),
            None => (None, None),
        };

        #[cfg(feature = "processing")]
        let global_rate_limits = addrs.global_rate_limits.clone().map(Into::into);

        #[cfg(feature = "processing")]
        let rate_limiter = match (quotas.clone(), global_rate_limits) {
            (Some(redis), Some(global)) => {
                Some(RedisRateLimiter::new(redis, global).max_limit(config.max_rate_limit()))
            }
            _ => None,
        };

        let quota_limiter = Arc::new(QuotaRateLimiter::new(
            #[cfg(feature = "processing")]
            project_cache.clone(),
            #[cfg(feature = "processing")]
            rate_limiter.clone(),
        ));
        #[cfg(feature = "processing")]
        let rate_limiter = rate_limiter.map(Arc::new);

        let inner = InnerProcessor {
            pool,
            global_config,
            project_cache,
            cogs,
            #[cfg(feature = "processing")]
            quotas_client: quotas.clone(),
            #[cfg(feature = "processing")]
            rate_limiter,
            addrs,
            #[cfg(feature = "processing")]
            cardinality_limiter: cardinality
                .map(|cardinality| {
                    RedisSetLimiter::new(
                        RedisSetLimiterOptions {
                            cache_vacuum_interval: config
                                .cardinality_limiter_cache_vacuum_interval(),
                        },
                        cardinality,
                    )
                })
                .map(CardinalityLimiter::new),
            metric_outcomes,
            processing: Processing {
                logs: LogsProcessor::new(Arc::clone(&quota_limiter)),
                trace_metrics: TraceMetricsProcessor::new(Arc::clone(&quota_limiter)),
                spans: SpansProcessor::new(Arc::clone(&quota_limiter), geoip_lookup.clone()),
                check_ins: CheckInsProcessor::new(Arc::clone(&quota_limiter)),
                sessions: SessionsProcessor::new(quota_limiter),
            },
            geoip_lookup,
            config,
        };

        Self {
            inner: Arc::new(inner),
        }
    }

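    /// Enforces quotas on the envelope and its extracted metrics: first
    /// against the cached rate limits and then, on processing Relays with a
    /// Redis rate limiter, consistently against Redis. Newly discovered
    /// limits are merged back into the project's cached rate limits.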
    async fn enforce_quotas<Group>(
        &self,
        managed_envelope: &mut TypedEnvelope<Group>,
        event: Annotated<Event>,
        extracted_metrics: &mut ProcessingExtractedMetrics,
        ctx: processing::Context<'_>,
    ) -> Result<Annotated<Event>, ProcessingError> {
        let cached_result = RateLimiter::Cached
            .enforce(managed_envelope, event, extracted_metrics, ctx)
            .await?;

        if_processing!(self.inner.config, {
            let rate_limiter = match self.inner.rate_limiter.clone() {
                Some(rate_limiter) => rate_limiter,
                None => return Ok(cached_result.event),
            };

            let consistent_result = RateLimiter::Consistent(rate_limiter)
                .enforce(
                    managed_envelope,
                    cached_result.event,
                    extracted_metrics,
                    ctx
                )
                .await?;

            if !consistent_result.rate_limits.is_empty() {
                self.inner
                    .project_cache
                    .get(managed_envelope.scoping().project_key)
                    .rate_limits()
                    .merge(consistent_result.rate_limits);
            }

            Ok(consistent_result.event)
        } else { Ok(cached_result.event) })
    }

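    /// Extracts metrics (and optionally spans) from the transaction event,
    /// unless metrics have already been extracted, e.g. by a previous Relay.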
    #[allow(clippy::too_many_arguments)]
    fn extract_transaction_metrics(
        &self,
        managed_envelope: &mut TypedEnvelope<TransactionGroup>,
        event: &mut Annotated<Event>,
        extracted_metrics: &mut ProcessingExtractedMetrics,
        project_id: ProjectId,
        project_info: &ProjectInfo,
        sampling_decision: SamplingDecision,
        event_metrics_extracted: EventMetricsExtracted,
        spans_extracted: SpansExtracted,
    ) -> Result<EventMetricsExtracted, ProcessingError> {
        if event_metrics_extracted.0 {
            return Ok(event_metrics_extracted);
        }
        let Some(event) = event.value_mut() else {
            return Ok(event_metrics_extracted);
        };

        let global = self.inner.global_config.current();
        let combined_config = {
            let config = match &project_info.config.metric_extraction {
                ErrorBoundary::Ok(config) if config.is_supported() => config,
                _ => return Ok(event_metrics_extracted),
            };
            let global_config = match &global.metric_extraction {
                ErrorBoundary::Ok(global_config) => global_config,
                #[allow(unused_variables)]
                ErrorBoundary::Err(e) => {
                    if_processing!(self.inner.config, {
                        relay_log::error!("Failed to parse global extraction config: {e}");
                        MetricExtractionGroups::EMPTY
                    } else {
                        relay_log::debug!("Failed to parse global extraction config: {e}");
                        return Ok(event_metrics_extracted);
                    })
                }
            };
            CombinedMetricExtractionConfig::new(global_config, config)
        };

        let tx_config = match &project_info.config.transaction_metrics {
            Some(ErrorBoundary::Ok(tx_config)) => tx_config,
            Some(ErrorBoundary::Err(e)) => {
                relay_log::debug!("Failed to parse legacy transaction metrics config: {e}");
                return Ok(event_metrics_extracted);
            }
            None => {
                relay_log::debug!("Legacy transaction metrics config is missing");
                return Ok(event_metrics_extracted);
            }
        };

        if !tx_config.is_enabled() {
            static TX_CONFIG_ERROR: Once = Once::new();
            TX_CONFIG_ERROR.call_once(|| {
                if self.inner.config.processing_enabled() {
                    relay_log::error!(
                        "Processing Relay outdated, received tx config in version {}, which is not supported",
                        tx_config.version
                    );
                }
            });

            return Ok(event_metrics_extracted);
        }

        let extract_spans = !spans_extracted.0
            && utils::sample(global.options.span_extraction_sample_rate.unwrap_or(1.0)).is_keep();

        let metrics = crate::metrics_extraction::event::extract_metrics(
            event,
            crate::metrics_extraction::event::ExtractMetricsConfig {
                config: combined_config,
                sampling_decision,
                target_project_id: project_id,
                max_tag_value_size: self
                    .inner
                    .config
                    .aggregator_config_for(MetricNamespace::Spans)
                    .max_tag_value_length,
                extract_spans,
                transaction_from_dsc: managed_envelope
                    .envelope()
                    .dsc()
                    .and_then(|dsc| dsc.transaction.as_deref()),
            },
        );

        extracted_metrics.extend(metrics, Some(sampling_decision));

        if !project_info.has_feature(Feature::DiscardTransaction) {
            let transaction_from_dsc = managed_envelope
                .envelope()
                .dsc()
                .and_then(|dsc| dsc.transaction.as_deref());

            let extractor = TransactionExtractor {
                config: tx_config,
                generic_config: Some(combined_config),
                transaction_from_dsc,
                sampling_decision,
                target_project_id: project_id,
            };

            extracted_metrics.extend(extractor.extract(event)?, Some(sampling_decision));
        }

        Ok(EventMetricsExtracted(true))
    }

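    /// Processes error events and their attachments: expands platform crash
    /// reports on processing Relays, extracts and normalizes the event,
    /// applies inbound filters, tags the dynamic sampling decision, and
    /// enforces quotas.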
    async fn process_errors(
        &self,
        managed_envelope: &mut TypedEnvelope<ErrorGroup>,
        project_id: ProjectId,
        mut ctx: processing::Context<'_>,
    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
        let mut event_fully_normalized = EventFullyNormalized::new(managed_envelope.envelope());
        let mut metrics = Metrics::default();
        let mut extracted_metrics = ProcessingExtractedMetrics::new();

        report::process_user_reports(managed_envelope);

        if_processing!(self.inner.config, {
            unreal::expand(managed_envelope, &self.inner.config)?;
            #[cfg(sentry)]
            playstation::expand(managed_envelope, &self.inner.config, ctx.project_info)?;
            nnswitch::expand(managed_envelope)?;
        });

        let extraction_result = event::extract(
            managed_envelope,
            &mut metrics,
            event_fully_normalized,
            &self.inner.config,
        )?;
        let mut event = extraction_result.event;

        if_processing!(self.inner.config, {
            if let Some(inner_event_fully_normalized) =
                unreal::process(managed_envelope, &mut event)?
            {
                event_fully_normalized = inner_event_fully_normalized;
            }
            #[cfg(sentry)]
            if let Some(inner_event_fully_normalized) =
                playstation::process(managed_envelope, &mut event, ctx.project_info)?
            {
                event_fully_normalized = inner_event_fully_normalized;
            }
            if let Some(inner_event_fully_normalized) =
                attachment::create_placeholders(managed_envelope, &mut event, &mut metrics)
            {
                event_fully_normalized = inner_event_fully_normalized;
            }
        });

        ctx.sampling_project_info = dynamic_sampling::validate_and_set_dsc(
            managed_envelope,
            &mut event,
            ctx.project_info,
            ctx.sampling_project_info,
        );

        let attachments = managed_envelope
            .envelope()
            .items()
            .filter(|item| item.attachment_type() == Some(&AttachmentType::Attachment));
        processing::utils::event::finalize(
            managed_envelope.envelope().headers(),
            &mut event,
            attachments,
            &mut metrics,
            &self.inner.config,
        )?;
        event_fully_normalized = processing::utils::event::normalize(
            managed_envelope.envelope().headers(),
            &mut event,
            event_fully_normalized,
            &ctx,
            &self.inner.geoip_lookup,
        )?;
        let filter_run = event::filter(
            managed_envelope,
            &mut event,
            ctx.project_info,
            &self.inner.global_config.current(),
        )?;

        if self.inner.config.processing_enabled() || matches!(filter_run, FiltersStatus::Ok) {
            dynamic_sampling::tag_error_with_sampling_decision(
                managed_envelope,
                &mut event,
                ctx.sampling_project_info,
                &self.inner.config,
            )
            .await;
        }

        event = self
            .enforce_quotas(managed_envelope, event, &mut extracted_metrics, ctx)
            .await?;

        if event.value().is_some() {
            event::scrub(&mut event, ctx.project_info)?;
            event::serialize(
                managed_envelope,
                &mut event,
                event_fully_normalized,
                EventMetricsExtracted(false),
                SpansExtracted(false),
            )?;
            event::emit_feedback_metrics(managed_envelope.envelope());
        }

        attachment::scrub(managed_envelope, ctx.project_info);

        if self.inner.config.processing_enabled() && !event_fully_normalized.0 {
            relay_log::error!(
                tags.project = %project_id,
                tags.ty = event_type(&event).map(|e| e.to_string()).unwrap_or("none".to_owned()),
                "ingested event without normalizing"
            );
        }

        Ok(Some(extracted_metrics))
    }

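    /// Processes transaction envelopes: extracts the event, normalizes and
    /// filters it, runs dynamic sampling, extracts metrics and spans, and
    /// enforces quotas.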
1559    #[allow(unused_assignments)]
1561    #[allow(clippy::too_many_arguments)]
1562    async fn process_transactions(
1563        &self,
1564        managed_envelope: &mut TypedEnvelope<TransactionGroup>,
1565        cogs: &mut Token,
1566        project_id: ProjectId,
1567        mut ctx: processing::Context<'_>,
1568        reservoir_counters: &ReservoirCounters,
1569    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
1570        let mut event_fully_normalized = EventFullyNormalized::new(managed_envelope.envelope());
1571        let mut event_metrics_extracted = EventMetricsExtracted(false);
1572        let mut spans_extracted = SpansExtracted(false);
1573        let mut metrics = Metrics::default();
1574        let mut extracted_metrics = ProcessingExtractedMetrics::new();
1575
1576        let extraction_result = event::extract(
1578            managed_envelope,
1579            &mut metrics,
1580            event_fully_normalized,
1581            &self.inner.config,
1582        )?;
1583
1584        if let Some(inner_event_metrics_extracted) = extraction_result.event_metrics_extracted {
1586            event_metrics_extracted = inner_event_metrics_extracted;
1587        }
1588        if let Some(inner_spans_extracted) = extraction_result.spans_extracted {
1589            spans_extracted = inner_spans_extracted;
1590        };
1591
1592        let mut event = extraction_result.event;
1594
1595        let profile_id = profile::filter(
1596            managed_envelope,
1597            &event,
1598            ctx.config,
1599            project_id,
1600            ctx.project_info,
1601        );
1602        profile::transfer_id(&mut event, profile_id);
1603
1604        ctx.sampling_project_info = dynamic_sampling::validate_and_set_dsc(
1605            managed_envelope,
1606            &mut event,
1607            ctx.project_info,
1608            ctx.sampling_project_info,
1609        );
1610
1611        let attachments = managed_envelope
1612            .envelope()
1613            .items()
1614            .filter(|item| item.attachment_type() == Some(&AttachmentType::Attachment));
1615        processing::utils::event::finalize(
1616            managed_envelope.envelope().headers(),
1617            &mut event,
1618            attachments,
1619            &mut metrics,
1620            &self.inner.config,
1621        )?;
1622
1623        event_fully_normalized = processing::utils::event::normalize(
1624            managed_envelope.envelope().headers(),
1625            &mut event,
1626            event_fully_normalized,
1627            &ctx,
1628            &self.inner.geoip_lookup,
1629        )?;
1630
1631        let filter_run = event::filter(
1632            managed_envelope,
1633            &mut event,
1634            ctx.project_info,
1635            ctx.global_config,
1636        )?;
1637
1638        let run_dynamic_sampling =
1641            matches!(filter_run, FiltersStatus::Ok) || self.inner.config.processing_enabled();
1642
1643        let reservoir = self.new_reservoir_evaluator(
1644            managed_envelope.scoping().organization_id,
1645            reservoir_counters,
1646        );
1647
1648        let sampling_result = match run_dynamic_sampling {
1649            true => {
1650                dynamic_sampling::run(
1651                    managed_envelope,
1652                    &mut event,
1653                    ctx.config,
1654                    ctx.project_info,
1655                    ctx.sampling_project_info,
1656                    &reservoir,
1657                )
1658                .await
1659            }
1660            false => SamplingResult::Pending,
1661        };
1662
1663        relay_statsd::metric!(
1664            counter(RelayCounters::SamplingDecision) += 1,
1665            decision = sampling_result.decision().as_str(),
1666            item = "transaction"
1667        );
1668
1669        #[cfg(feature = "processing")]
1670        let server_sample_rate = sampling_result.sample_rate();
1671
1672        if let Some(outcome) = sampling_result.into_dropped_outcome() {
1673            profile::process(
1676                managed_envelope,
1677                &mut event,
1678                ctx.global_config,
1679                ctx.config,
1680                ctx.project_info,
1681            );
1682            event_metrics_extracted = self.extract_transaction_metrics(
1684                managed_envelope,
1685                &mut event,
1686                &mut extracted_metrics,
1687                project_id,
1688                ctx.project_info,
1689                SamplingDecision::Drop,
1690                event_metrics_extracted,
1691                spans_extracted,
1692            )?;
1693
1694            dynamic_sampling::drop_unsampled_items(
1695                managed_envelope,
1696                event,
1697                outcome,
1698                spans_extracted,
1699            );
1700
            event = self
                .enforce_quotas(
                    managed_envelope,
                    Annotated::empty(),
                    &mut extracted_metrics,
                    ctx,
                )
                .await?;

            return Ok(Some(extracted_metrics));
        }

        let _post_ds = cogs.start_category("post_ds");

        event::scrub(&mut event, ctx.project_info)?;

        attachment::scrub(managed_envelope, ctx.project_info);

        if_processing!(self.inner.config, {
            let profile_id = profile::process(
                managed_envelope,
                &mut event,
                ctx.global_config,
                ctx.config,
                ctx.project_info,
            );
            profile::transfer_id(&mut event, profile_id);
            profile::scrub_profiler_id(&mut event);

            event_metrics_extracted = self.extract_transaction_metrics(
                managed_envelope,
                &mut event,
                &mut extracted_metrics,
                project_id,
                ctx.project_info,
                SamplingDecision::Keep,
                event_metrics_extracted,
                spans_extracted,
            )?;

            spans_extracted = span::extract_from_event(
                managed_envelope,
                &event,
                ctx.global_config,
                ctx.config,
                server_sample_rate,
                event_metrics_extracted,
                spans_extracted,
            );
        });

        event = self
            .enforce_quotas(managed_envelope, event, &mut extracted_metrics, ctx)
            .await?;

        if_processing!(self.inner.config, {
            event = span::maybe_discard_transaction(managed_envelope, event, ctx.project_info);
        });

        if event.value().is_some() {
            event::serialize(
                managed_envelope,
                &mut event,
                event_fully_normalized,
                event_metrics_extracted,
                spans_extracted,
            )?;
        }

        if self.inner.config.processing_enabled() && !event_fully_normalized.0 {
            relay_log::error!(
                tags.project = %project_id,
                tags.ty = event_type(&event).map(|e| e.to_string()).unwrap_or("none".to_owned()),
                "ingested event without normalizing"
            );
        }

        Ok(Some(extracted_metrics))
    }

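    /// Processes profile chunks.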
    async fn process_profile_chunks(
        &self,
        managed_envelope: &mut TypedEnvelope<ProfileChunkGroup>,
        ctx: processing::Context<'_>,
    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
        profile_chunk::filter(managed_envelope, ctx.project_info);

        if_processing!(self.inner.config, {
            profile_chunk::process(
                managed_envelope,
                ctx.project_info,
                ctx.global_config,
                ctx.config,
            );
        });

        self.enforce_quotas(
            managed_envelope,
            Annotated::empty(),
            &mut ProcessingExtractedMetrics::new(),
            ctx,
        )
        .await?;

        Ok(None)
    }

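    /// Processes standalone items that do not have an event in the same envelope.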
    async fn process_standalone(
        &self,
        managed_envelope: &mut TypedEnvelope<StandaloneGroup>,
        project_id: ProjectId,
        ctx: processing::Context<'_>,
    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
        let mut extracted_metrics = ProcessingExtractedMetrics::new();

        standalone::process(managed_envelope);

        profile::filter(
            managed_envelope,
            &Annotated::empty(),
            ctx.config,
            project_id,
            ctx.project_info,
        );

        self.enforce_quotas(
            managed_envelope,
            Annotated::empty(),
            &mut extracted_metrics,
            ctx,
        )
        .await?;

        report::process_user_reports(managed_envelope);
        attachment::scrub(managed_envelope, ctx.project_info);

        Ok(Some(extracted_metrics))
    }

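    /// Processes client reports.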
    async fn process_client_reports(
        &self,
        managed_envelope: &mut TypedEnvelope<ClientReportGroup>,
        ctx: processing::Context<'_>,
    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
        let mut extracted_metrics = ProcessingExtractedMetrics::new();

        self.enforce_quotas(
            managed_envelope,
            Annotated::empty(),
            &mut extracted_metrics,
            ctx,
        )
        .await?;

        report::process_client_reports(
            managed_envelope,
            ctx.config,
            ctx.project_info,
            self.inner.addrs.outcome_aggregator.clone(),
        );

        Ok(Some(extracted_metrics))
    }

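    /// Processes replays.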
    async fn process_replays(
        &self,
        managed_envelope: &mut TypedEnvelope<ReplayGroup>,
        ctx: processing::Context<'_>,
    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
        let mut extracted_metrics = ProcessingExtractedMetrics::new();

        replay::process(
            managed_envelope,
            ctx.global_config,
            ctx.config,
            ctx.project_info,
            &self.inner.geoip_lookup,
        )?;

        self.enforce_quotas(
            managed_envelope,
            Annotated::empty(),
            &mut extracted_metrics,
            ctx,
        )
        .await?;

        Ok(Some(extracted_metrics))
    }

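    /// Processes NEL reports by converting them into logs and running the logs pipeline.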
    async fn process_nel(
        &self,
        mut managed_envelope: ManagedEnvelope,
        ctx: processing::Context<'_>,
    ) -> Result<ProcessingResult, ProcessingError> {
        nel::convert_to_logs(&mut managed_envelope);
        self.process_with_processor(&self.inner.processing.logs, managed_envelope, ctx)
            .await
    }

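    /// Processes an envelope with a dedicated [`processing::Processor`].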
    async fn process_with_processor<P: processing::Processor>(
        &self,
        processor: &P,
        mut managed_envelope: ManagedEnvelope,
        ctx: processing::Context<'_>,
    ) -> Result<ProcessingResult, ProcessingError>
    where
        Outputs: From<P::Output>,
    {
        let Some(work) = processor.prepare_envelope(&mut managed_envelope) else {
            debug_assert!(
                false,
                "there must be work for the {} processor",
                std::any::type_name::<P>(),
            );
            return Err(ProcessingError::ProcessingGroupMismatch);
        };

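        // The processor must consume all items it can handle; any items left in the
        // envelope at this point are rejected as invalid.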
        managed_envelope.update();
        match managed_envelope.envelope().is_empty() {
            true => managed_envelope.accept(),
            false => managed_envelope.reject(Outcome::Invalid(DiscardReason::Internal)),
        }

        processor
            .process(work, ctx)
            .await
            .map_err(|err| {
                relay_log::debug!(
                    error = &err as &dyn std::error::Error,
                    "processing pipeline failed"
                );
                ProcessingError::ProcessingFailure
            })
            .map(|o| o.map(Into::into))
            .map(ProcessingResult::Output)
    }

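    /// Processes standalone spans.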
    #[allow(clippy::too_many_arguments)]
    async fn process_standalone_spans(
        &self,
        managed_envelope: &mut TypedEnvelope<SpanGroup>,
        _project_id: ProjectId,
        ctx: processing::Context<'_>,
        _reservoir_counters: &ReservoirCounters,
    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
        let mut extracted_metrics = ProcessingExtractedMetrics::new();

        span::filter(managed_envelope, ctx.config, ctx.project_info);
        span::convert_otel_traces_data(managed_envelope);

        if_processing!(self.inner.config, {
            let reservoir = self.new_reservoir_evaluator(
                managed_envelope.scoping().organization_id,
                _reservoir_counters,
            );

            span::process(
                managed_envelope,
                &mut Annotated::empty(),
                &mut extracted_metrics,
                _project_id,
                ctx,
                &self.inner.geoip_lookup,
                &reservoir,
            )
            .await;
        });

        self.enforce_quotas(
            managed_envelope,
            Annotated::empty(),
            &mut extracted_metrics,
            ctx,
        )
        .await?;

        Ok(Some(extracted_metrics))
    }

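    /// Processes the given envelope based on its processing group.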
    async fn process_envelope(
        &self,
        cogs: &mut Token,
        project_id: ProjectId,
        message: ProcessEnvelopeGrouped<'_>,
    ) -> Result<ProcessingResult, ProcessingError> {
        let ProcessEnvelopeGrouped {
            group,
            envelope: mut managed_envelope,
            ctx,
            reservoir_counters,
        } = message;

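        // Normalize the transaction name in the envelope's DSC using the sampling
        // project's transaction name rules.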
        if let Some(sampling_state) = ctx.sampling_project_info {
            managed_envelope
                .envelope_mut()
                .parametrize_dsc_transaction(&sampling_state.config.tx_name_rules);
        }

        if let Some(retention) = ctx.project_info.config.event_retention {
            managed_envelope.envelope_mut().set_retention(retention);
        }

        if let Some(retention) = ctx.project_info.config.downsampled_event_retention {
            managed_envelope
                .envelope_mut()
                .set_downsampled_retention(retention);
        }

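        // Ensure the envelope carries the project ID resolved for this project key; the
        // two can differ, for example for envelopes sent to a legacy endpoint without a
        // project ID in the URL.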
        managed_envelope
            .envelope_mut()
            .meta_mut()
            .set_project_id(project_id);

        macro_rules! run {
            ($fn_name:ident $(, $args:expr)*) => {
                async {
                    let mut managed_envelope = (managed_envelope, group).try_into()?;
                    match self.$fn_name(&mut managed_envelope, $($args),*).await {
                        Ok(extracted_metrics) => Ok(ProcessingResult::Envelope {
                            managed_envelope: managed_envelope.into_processed(),
                            extracted_metrics: extracted_metrics.unwrap_or_else(ProcessingExtractedMetrics::new),
                        }),
                        Err(error) => {
                            // `fn` is a keyword and cannot be used as a named format
                            // argument; pass the function name positionally instead.
                            relay_log::trace!("Executing {} failed: {error}", stringify!($fn_name));
                            if let Some(outcome) = error.to_outcome() {
                                managed_envelope.reject(outcome);
                            }

                            return Err(error);
                        }
                    }
                }.await
            };
        }

        relay_log::trace!("Processing {group} group", group = group.variant());

        match group {
            ProcessingGroup::Error => run!(process_errors, project_id, ctx),
            ProcessingGroup::Transaction => {
                run!(
                    process_transactions,
                    cogs,
                    project_id,
                    ctx,
                    reservoir_counters
                )
            }
            ProcessingGroup::Session => {
                self.process_with_processor(&self.inner.processing.sessions, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::Standalone => run!(process_standalone, project_id, ctx),
            ProcessingGroup::ClientReport => run!(process_client_reports, ctx),
            ProcessingGroup::Replay => {
                run!(process_replays, ctx)
            }
            ProcessingGroup::CheckIn => {
                self.process_with_processor(&self.inner.processing.check_ins, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::Nel => self.process_nel(managed_envelope, ctx).await,
            ProcessingGroup::Log => {
                self.process_with_processor(&self.inner.processing.logs, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::TraceMetric => {
                self.process_with_processor(
                    &self.inner.processing.trace_metrics,
                    managed_envelope,
                    ctx,
                )
                .await
            }
            ProcessingGroup::SpanV2 => {
                self.process_with_processor(&self.inner.processing.spans, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::Span => run!(
                process_standalone_spans,
                project_id,
                ctx,
                reservoir_counters
            ),
            ProcessingGroup::ProfileChunk => {
                run!(process_profile_chunks, ctx)
            }
            ProcessingGroup::Metrics => {
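                // Metric items are only expected to reach this point in proxy mode,
                // where they are forwarded as-is; log an error otherwise.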
                if self.inner.config.relay_mode() != RelayMode::Proxy {
                    relay_log::error!(
                        tags.project = %project_id,
                        items = ?managed_envelope.envelope().items().next().map(Item::ty),
                        "received metrics in the process_state"
                    );
                }

                Ok(ProcessingResult::no_metrics(
                    managed_envelope.into_processed(),
                ))
            }
            ProcessingGroup::Ungrouped => {
                relay_log::error!(
                    tags.project = %project_id,
                    items = ?managed_envelope.envelope().items().next().map(Item::ty),
                    "could not identify the processing group based on the envelope's items"
                );

                Ok(ProcessingResult::no_metrics(
                    managed_envelope.into_processed(),
                ))
            }
            ProcessingGroup::ForwardUnknown => Ok(ProcessingResult::no_metrics(
                managed_envelope.into_processed(),
            )),
        }
    }

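    /// Processes an envelope and returns what should be submitted upstream, if anything.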
    async fn process<'a>(
        &self,
        cogs: &mut Token,
        mut message: ProcessEnvelopeGrouped<'a>,
    ) -> Result<Option<Submit<'a>>, ProcessingError> {
        let ProcessEnvelopeGrouped {
            ref mut envelope,
            ctx,
            ..
        } = message;

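        // Resolve the project ID from the project state, falling back to the ID on the
        // envelope's request meta; without one the envelope cannot be processed.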
        let Some(project_id) = ctx
            .project_info
            .project_id
            .or_else(|| envelope.envelope().meta().project_id())
        else {
            envelope.reject(Outcome::Invalid(DiscardReason::Internal));
            return Err(ProcessingError::MissingProjectId);
        };

        let client = envelope.envelope().meta().client().map(str::to_owned);
        let user_agent = envelope.envelope().meta().user_agent().map(str::to_owned);
        let project_key = envelope.envelope().meta().public_key();
        let sampling_key = envelope
            .envelope()
            .sampling_key()
            .filter(|_| ctx.sampling_project_info.is_some());

        relay_log::configure_scope(|scope| {
            scope.set_tag("project", project_id);
            if let Some(client) = client {
                scope.set_tag("sdk", client);
            }
            if let Some(user_agent) = user_agent {
                scope.set_extra("user_agent", user_agent.into());
            }
        });

        let result = match self.process_envelope(cogs, project_id, message).await {
            Ok(ProcessingResult::Envelope {
                mut managed_envelope,
                extracted_metrics,
            }) => {
                managed_envelope.update();

                let has_metrics = !extracted_metrics.metrics.project_metrics.is_empty();
                send_metrics(
                    extracted_metrics.metrics,
                    project_key,
                    sampling_key,
                    &self.inner.addrs.aggregator,
                );

                let envelope_response = if managed_envelope.envelope().is_empty() {
                    if !has_metrics {
                        managed_envelope.reject(Outcome::RateLimited(None));
                    } else {
                        managed_envelope.accept();
                    }

                    None
                } else {
                    Some(managed_envelope)
                };

                Ok(envelope_response.map(Submit::Envelope))
            }
            Ok(ProcessingResult::Output(Output { main, metrics })) => {
                if let Some(metrics) = metrics {
                    metrics.accept(|metrics| {
                        send_metrics(
                            metrics,
                            project_key,
                            sampling_key,
                            &self.inner.addrs.aggregator,
                        );
                    });
                }

                let ctx = ctx.to_forward();
                Ok(main.map(|output| Submit::Output { output, ctx }))
            }
            Err(err) => Err(err),
        };

        relay_log::configure_scope(|scope| {
            scope.remove_tag("project");
            scope.remove_tag("sdk");
            // The user agent was stored via `set_extra` above, so it must be removed
            // as an extra, not as a tag.
            scope.remove_extra("user_agent");
        });

        result
    }

    async fn handle_process_envelope(&self, cogs: &mut Token, message: ProcessEnvelope) {
        let project_key = message.envelope.envelope().meta().public_key();
        let wait_time = message.envelope.age();
        metric!(timer(RelayTimers::EnvelopeWaitTime) = wait_time);

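        // Each split group is measured with its own COGS token below, so cancel the
        // generic one.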
        cogs.cancel();

        let scoping = message.envelope.scoping();
        for (group, envelope) in ProcessingGroup::split_envelope(
            *message.envelope.into_envelope(),
            &message.project_info,
        ) {
            let mut cogs = self
                .inner
                .cogs
                .timed(ResourceId::Relay, AppFeature::from(group));

            let mut envelope =
                ManagedEnvelope::new(envelope, self.inner.addrs.outcome_aggregator.clone());
            envelope.scope(scoping);

            let global_config = self.inner.global_config.current();

            let ctx = processing::Context {
                config: &self.inner.config,
                global_config: &global_config,
                project_info: &message.project_info,
                sampling_project_info: message.sampling_project_info.as_deref(),
                rate_limits: &message.rate_limits,
            };

            let message = ProcessEnvelopeGrouped {
                group,
                envelope,
                ctx,
                reservoir_counters: &message.reservoir_counters,
            };

            let result = metric!(
                timer(RelayTimers::EnvelopeProcessingTime),
                group = group.variant(),
                { self.process(&mut cogs, message).await }
            );

            match result {
                Ok(Some(envelope)) => self.submit_upstream(&mut cogs, envelope),
                Ok(None) => {}
                Err(error) if error.is_unexpected() => {
                    relay_log::error!(
                        tags.project_key = %project_key,
                        error = &error as &dyn Error,
                        "error processing envelope"
                    )
                }
                Err(error) => {
                    relay_log::debug!(
                        tags.project_key = %project_key,
                        error = &error as &dyn Error,
                        "error processing envelope"
                    )
                }
            }
        }
    }

    fn handle_process_metrics(&self, cogs: &mut Token, message: ProcessMetrics) {
        let ProcessMetrics {
            data,
            project_key,
            received_at,
            sent_at,
            source,
        } = message;

        let received_timestamp =
            UnixTimestamp::from_datetime(received_at).unwrap_or(UnixTimestamp::now());

        let mut buckets = data.into_buckets(received_timestamp);
        if buckets.is_empty() {
            return;
        }
        cogs.update(relay_metrics::cogs::BySize(&buckets));

        let clock_drift_processor =
            ClockDriftProcessor::new(sent_at, received_at).at_least(MINIMUM_CLOCK_DRIFT);

        buckets.retain_mut(|bucket| {
            if let Err(error) = relay_metrics::normalize_bucket(bucket) {
                relay_log::debug!(error = &error as &dyn Error, "dropping bucket {bucket:?}");
                return false;
            }

            if !self::metrics::is_valid_namespace(bucket) {
                return false;
            }

            clock_drift_processor.process_timestamp(&mut bucket.timestamp);

            if !matches!(source, BucketSource::Internal) {
                bucket.metadata = BucketMetadata::new(received_timestamp);
            }

            true
        });

        let project = self.inner.project_cache.get(project_key);

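        // Only check the buckets against project config and cached rate limits if the
        // project state is already available; otherwise forward them as-is.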
        let buckets = match project.state() {
            ProjectState::Enabled(project_info) => {
                let rate_limits = project.rate_limits().current_limits();
                self.check_buckets(project_key, project_info, &rate_limits, buckets)
            }
            _ => buckets,
        };

        relay_log::trace!("merging metric buckets into the aggregator");
        self.inner
            .addrs
            .aggregator
            .send(MergeBuckets::new(project_key, buckets));
    }

    fn handle_process_batched_metrics(&self, cogs: &mut Token, message: ProcessBatchedMetrics) {
        let ProcessBatchedMetrics {
            payload,
            source,
            received_at,
            sent_at,
        } = message;

        #[derive(serde::Deserialize)]
        struct Wrapper {
            buckets: HashMap<ProjectKey, Vec<Bucket>>,
        }

        let buckets = match serde_json::from_slice(&payload) {
            Ok(Wrapper { buckets }) => buckets,
            Err(error) => {
                relay_log::debug!(
                    error = &error as &dyn Error,
                    "failed to parse batched metrics",
                );
                metric!(counter(RelayCounters::MetricBucketsParsingFailed) += 1);
                return;
            }
        };

        for (project_key, buckets) in buckets {
            self.handle_process_metrics(
                cogs,
                ProcessMetrics {
                    data: MetricData::Parsed(buckets),
                    project_key,
                    source,
                    received_at,
                    sent_at,
                },
            )
        }
    }

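    /// Submits a processing result to the store on processing Relays, or to the upstream otherwise.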
    fn submit_upstream(&self, cogs: &mut Token, submit: Submit<'_>) {
        let _submit = cogs.start_category("submit");

        #[cfg(feature = "processing")]
        if self.inner.config.processing_enabled()
            && let Some(store_forwarder) = &self.inner.addrs.store_forwarder
        {
            match submit {
                Submit::Envelope(envelope) => store_forwarder.send(StoreEnvelope { envelope }),
                Submit::Output { output, ctx } => output
                    .forward_store(store_forwarder, ctx)
                    .unwrap_or_else(|err| err.into_inner()),
            }
            return;
        }

        let mut envelope = match submit {
            Submit::Envelope(envelope) => envelope,
            Submit::Output { output, ctx } => match output.serialize_envelope(ctx) {
                Ok(envelope) => ManagedEnvelope::from(envelope).into_processed(),
                Err(_) => {
                    relay_log::error!("failed to serialize output to an envelope");
                    return;
                }
            },
        };

        if envelope.envelope_mut().is_empty() {
            envelope.accept();
            return;
        }

        envelope.envelope_mut().set_sent_at(Utc::now());

        relay_log::trace!("sending envelope to sentry endpoint");
        let http_encoding = self.inner.config.http_encoding();
        let result = envelope.envelope().to_vec().and_then(|v| {
            encode_payload(&v.into(), http_encoding).map_err(EnvelopeError::PayloadIoFailed)
        });

        match result {
            Ok(body) => {
                self.inner
                    .addrs
                    .upstream_relay
                    .send(SendRequest(SendEnvelope {
                        envelope,
                        body,
                        http_encoding,
                        project_cache: self.inner.project_cache.clone(),
                    }));
            }
            Err(error) => {
                relay_log::error!(
                    error = &error as &dyn Error,
                    tags.project_key = %envelope.scoping().project_key,
                    "failed to serialize envelope payload"
                );

                envelope.reject(Outcome::Invalid(DiscardReason::Internal));
            }
        }
    }

    fn handle_submit_client_reports(&self, cogs: &mut Token, message: SubmitClientReports) {
        let SubmitClientReports {
            client_reports,
            scoping,
        } = message;

        let upstream = self.inner.config.upstream_descriptor();
        let dsn = PartialDsn::outbound(&scoping, upstream);

        let mut envelope = Envelope::from_request(None, RequestMeta::outbound(dsn));
        for client_report in client_reports {
            match client_report.serialize() {
                Ok(payload) => {
                    let mut item = Item::new(ItemType::ClientReport);
                    item.set_payload(ContentType::Json, payload);
                    envelope.add_item(item);
                }
                Err(error) => {
                    relay_log::error!(
                        error = &error as &dyn std::error::Error,
                        "failed to serialize client report"
                    );
                }
            }
        }

        let envelope = ManagedEnvelope::new(envelope, self.inner.addrs.outcome_aggregator.clone());
        self.submit_upstream(cogs, Submit::Envelope(envelope.into_processed()));
    }

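    /// Checks metric buckets against the project configuration and cached rate limits.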
    fn check_buckets(
        &self,
        project_key: ProjectKey,
        project_info: &ProjectInfo,
        rate_limits: &RateLimits,
        buckets: Vec<Bucket>,
    ) -> Vec<Bucket> {
        let Some(scoping) = project_info.scoping(project_key) else {
            relay_log::error!(
                tags.project_key = project_key.as_str(),
                "there is no scoping: dropping {} buckets",
                buckets.len(),
            );
            return Vec::new();
        };

        let mut buckets = self::metrics::apply_project_info(
            buckets,
            &self.inner.metric_outcomes,
            project_info,
            scoping,
        );

        let namespaces: BTreeSet<MetricNamespace> = buckets
            .iter()
            .filter_map(|bucket| bucket.name.try_namespace())
            .collect();

        for namespace in namespaces {
            // Scope the check to the namespace being iterated; a plain
            // `scoping.item(DataCategory::MetricBucket)` would ignore the namespace and
            // check the same quota for every iteration.
            let limits = rate_limits.check_with_quotas(
                project_info.get_quotas(),
                scoping.metric_bucket(namespace),
            );

            if limits.is_limited() {
                let rejected;
                (buckets, rejected) = utils::split_off(buckets, |bucket| {
                    bucket.name.try_namespace() == Some(namespace)
                });

                let reason_code = limits.longest().and_then(|limit| limit.reason_code.clone());
                self.inner.metric_outcomes.track(
                    scoping,
                    &rejected,
                    Outcome::RateLimited(reason_code),
                );
            }
        }

        let quotas = project_info.config.quotas.clone();
        match MetricsLimiter::create(buckets, quotas, scoping) {
            Ok(mut bucket_limiter) => {
                bucket_limiter.enforce_limits(rate_limits, &self.inner.metric_outcomes);
                bucket_limiter.into_buckets()
            }
            Err(buckets) => buckets,
        }
    }

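    /// Applies per-namespace rate limits to metric buckets using the Redis rate limiter.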
    #[cfg(feature = "processing")]
    async fn rate_limit_buckets(
        &self,
        scoping: Scoping,
        project_info: &ProjectInfo,
        mut buckets: Vec<Bucket>,
    ) -> Vec<Bucket> {
        let Some(rate_limiter) = &self.inner.rate_limiter else {
            return buckets;
        };

        let global_config = self.inner.global_config.current();
        let namespaces = buckets
            .iter()
            .filter_map(|bucket| bucket.name.try_namespace())
            .counts();

        let quotas = CombinedQuotas::new(&global_config, project_info.get_quotas());

        for (namespace, quantity) in namespaces {
            let item_scoping = scoping.metric_bucket(namespace);

            let limits = match rate_limiter
                .is_rate_limited(quotas, item_scoping, quantity, false)
                .await
            {
                Ok(limits) => limits,
                Err(err) => {
                    relay_log::error!(
                        error = &err as &dyn std::error::Error,
                        "failed to check redis rate limits"
                    );
                    break;
                }
            };

            if limits.is_limited() {
                let rejected;
                (buckets, rejected) = utils::split_off(buckets, |bucket| {
                    bucket.name.try_namespace() == Some(namespace)
                });

                let reason_code = limits.longest().and_then(|limit| limit.reason_code.clone());
                self.inner.metric_outcomes.track(
                    scoping,
                    &rejected,
                    Outcome::RateLimited(reason_code),
                );

                self.inner
                    .project_cache
                    .get(item_scoping.scoping.project_key)
                    .rate_limits()
                    .merge(limits);
            }
        }

        match MetricsLimiter::create(buckets, project_info.config.quotas.clone(), scoping) {
            Err(buckets) => buckets,
            Ok(bucket_limiter) => self.apply_other_rate_limits(bucket_limiter).await,
        }
    }

    #[cfg(feature = "processing")]
    async fn apply_other_rate_limits(&self, mut bucket_limiter: MetricsLimiter) -> Vec<Bucket> {
        relay_log::trace!("handle_rate_limit_buckets");

        let scoping = *bucket_limiter.scoping();

        if let Some(rate_limiter) = self.inner.rate_limiter.as_ref() {
            let global_config = self.inner.global_config.current();
            let quotas = CombinedQuotas::new(&global_config, bucket_limiter.quotas());

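            // Over-accept once so the limit is actually reached, which lets subsequent
            // checks consistently report the rate limit.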
            let over_accept_once = true;
            let mut rate_limits = RateLimits::new();

            for category in [DataCategory::Transaction, DataCategory::Span] {
                let count = bucket_limiter.count(category);

                let timer = Instant::now();
                let mut is_limited = false;

                if let Some(count) = count {
                    match rate_limiter
                        .is_rate_limited(quotas, scoping.item(category), count, over_accept_once)
                        .await
                    {
                        Ok(limits) => {
                            is_limited = limits.is_limited();
                            rate_limits.merge(limits)
                        }
                        Err(e) => relay_log::error!(error = &e as &dyn Error),
                    }
                }

                relay_statsd::metric!(
                    timer(RelayTimers::RateLimitBucketsDuration) = timer.elapsed(),
                    category = category.name(),
                    limited = if is_limited { "true" } else { "false" },
                    count = match count {
                        None => "none",
                        Some(0) => "0",
                        Some(1) => "1",
                        Some(2..=10) => "10",
                        Some(11..=25) => "25",
                        Some(26..=50) => "50",
                        Some(51..=100) => "100",
                        Some(101..=500) => "500",
                        _ => "> 500",
                    },
                );
            }

            if rate_limits.is_limited() {
                let was_enforced =
                    bucket_limiter.enforce_limits(&rate_limits, &self.inner.metric_outcomes);

                if was_enforced {
                    self.inner
                        .project_cache
                        .get(scoping.project_key)
                        .rate_limits()
                        .merge(rate_limits);
                }
            }
        }

        bucket_limiter.into_buckets()
    }

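    /// Applies cardinality limits to the buckets and returns the accepted ones.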
    #[cfg(feature = "processing")]
    async fn cardinality_limit_buckets(
        &self,
        scoping: Scoping,
        limits: &[CardinalityLimit],
        buckets: Vec<Bucket>,
    ) -> Vec<Bucket> {
        let global_config = self.inner.global_config.current();
        let cardinality_limiter_mode = global_config.options.cardinality_limiter_mode;

        if matches!(cardinality_limiter_mode, CardinalityLimiterMode::Disabled) {
            return buckets;
        }

        let Some(ref limiter) = self.inner.cardinality_limiter else {
            return buckets;
        };

        let scope = relay_cardinality::Scoping {
            organization_id: scoping.organization_id,
            project_id: scoping.project_id,
        };

        let limits = match limiter
            .check_cardinality_limits(scope, limits, buckets)
            .await
        {
            Ok(limits) => limits,
            Err((buckets, error)) => {
                relay_log::error!(
                    error = &error as &dyn std::error::Error,
                    "cardinality limiter failed"
                );
                return buckets;
            }
        };

        let error_sample_rate = global_config.options.cardinality_limiter_error_sample_rate;
        if !limits.exceeded_limits().is_empty() && utils::sample(error_sample_rate).is_keep() {
            for limit in limits.exceeded_limits() {
                relay_log::with_scope(
                    |scope| {
                        scope.set_user(Some(relay_log::sentry::User {
                            id: Some(scoping.organization_id.to_string()),
                            ..Default::default()
                        }));
                    },
                    || {
                        relay_log::error!(
                            tags.organization_id = scoping.organization_id.value(),
                            tags.limit_id = limit.id,
                            tags.passive = limit.passive,
                            "Cardinality Limit"
                        );
                    },
                );
            }
        }

        for (limit, reports) in limits.cardinality_reports() {
            for report in reports {
                self.inner
                    .metric_outcomes
                    .cardinality(scoping, limit, report);
            }
        }

        if matches!(cardinality_limiter_mode, CardinalityLimiterMode::Passive) {
            return limits.into_source();
        }

        let CardinalityLimitsSplit { accepted, rejected } = limits.into_split();

        for (bucket, exceeded) in rejected {
            self.inner.metric_outcomes.track(
                scoping,
                &[bucket],
                Outcome::CardinalityLimited(exceeded.id.clone()),
            );
        }
        accepted
    }

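    /// Rate limits and cardinality limits the buckets, then forwards them to the store.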
    #[cfg(feature = "processing")]
    async fn encode_metrics_processing(
        &self,
        message: FlushBuckets,
        store_forwarder: &Addr<Store>,
    ) {
        use crate::constants::DEFAULT_EVENT_RETENTION;
        use crate::services::store::StoreMetrics;

        for ProjectBuckets {
            buckets,
            scoping,
            project_info,
            ..
        } in message.buckets.into_values()
        {
            let buckets = self
                .rate_limit_buckets(scoping, &project_info, buckets)
                .await;

            let limits = project_info.get_cardinality_limits();
            let buckets = self
                .cardinality_limit_buckets(scoping, limits, buckets)
                .await;

            if buckets.is_empty() {
                continue;
            }

            let retention = project_info
                .config
                .event_retention
                .unwrap_or(DEFAULT_EVENT_RETENTION);

            store_forwarder.send(StoreMetrics {
                buckets,
                scoping,
                retention,
            });
        }
    }

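    /// Serializes metric buckets into batched envelopes and submits them to the upstream.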
    fn encode_metrics_envelope(&self, cogs: &mut Token, message: FlushBuckets) {
        let FlushBuckets {
            partition_key,
            buckets,
        } = message;

        let batch_size = self.inner.config.metrics_max_batch_size_bytes();
        let upstream = self.inner.config.upstream_descriptor();

        for ProjectBuckets {
            buckets, scoping, ..
        } in buckets.values()
        {
            let dsn = PartialDsn::outbound(scoping, upstream);

            relay_statsd::metric!(
                histogram(RelayHistograms::PartitionKeys) = u64::from(partition_key)
            );

            let mut num_batches = 0;
            for batch in BucketsView::from(buckets).by_size(batch_size) {
                let mut envelope = Envelope::from_request(None, RequestMeta::outbound(dsn.clone()));

                let mut item = Item::new(ItemType::MetricBuckets);
                // Serialize the current batch, not the full bucket list; iterating by
                // reference keeps `batch` usable for the histogram below.
                item.set_source_quantities(crate::metrics::extract_quantities(batch.iter()));
                item.set_payload(ContentType::Json, serde_json::to_vec(&batch).unwrap());
                envelope.add_item(item);

                let mut envelope =
                    ManagedEnvelope::new(envelope, self.inner.addrs.outcome_aggregator.clone());
                envelope
                    .set_partition_key(Some(partition_key))
                    .scope(*scoping);

                relay_statsd::metric!(
                    histogram(RelayHistograms::BucketsPerBatch) = batch.len() as u64
                );

                self.submit_upstream(cogs, Submit::Envelope(envelope.into_processed()));
                num_batches += 1;
            }

            relay_statsd::metric!(histogram(RelayHistograms::BatchesPerPartition) = num_batches);
        }
    }

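    /// Flushes the given partition as a metrics request to the upstream, unless it is empty.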
    fn send_global_partition(&self, partition_key: u32, partition: &mut Partition<'_>) {
        if partition.is_empty() {
            return;
        }

        let (unencoded, project_info) = partition.take();
        let http_encoding = self.inner.config.http_encoding();
        let encoded = match encode_payload(&unencoded, http_encoding) {
            Ok(payload) => payload,
            Err(error) => {
                let error = &error as &dyn std::error::Error;
                relay_log::error!(error, "failed to encode metrics payload");
                return;
            }
        };

        let request = SendMetricsRequest {
            partition_key: partition_key.to_string(),
            unencoded,
            encoded,
            project_info,
            http_encoding,
            metric_outcomes: self.inner.metric_outcomes.clone(),
        };

        self.inner.addrs.upstream_relay.send(SendRequest(request));
    }

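    /// Serializes metric buckets into size-bounded partitions and sends each as a metrics
    /// request to the upstream.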
    fn encode_metrics_global(&self, message: FlushBuckets) {
        let FlushBuckets {
            partition_key,
            buckets,
        } = message;

        let batch_size = self.inner.config.metrics_max_batch_size_bytes();
        let mut partition = Partition::new(batch_size);
        let mut partition_splits = 0;

        for ProjectBuckets {
            buckets, scoping, ..
        } in buckets.values()
        {
            for bucket in buckets {
                let mut remaining = Some(BucketView::new(bucket));

                while let Some(bucket) = remaining.take() {
                    if let Some(next) = partition.insert(bucket, *scoping) {
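                        // Part of the bucket did not fit: flush the partition and retry
                        // with the remainder until everything has been inserted.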
                        self.send_global_partition(partition_key, &mut partition);
                        remaining = Some(next);
                        partition_splits += 1;
                    }
                }
            }
        }

        if partition_splits > 0 {
            metric!(histogram(RelayHistograms::PartitionSplits) = partition_splits);
        }

        self.send_global_partition(partition_key, &mut partition);
    }

    async fn handle_flush_buckets(&self, cogs: &mut Token, mut message: FlushBuckets) {
        for (project_key, pb) in message.buckets.iter_mut() {
            let buckets = std::mem::take(&mut pb.buckets);
            pb.buckets =
                self.check_buckets(*project_key, &pb.project_info, &pb.rate_limits, buckets);
        }

        #[cfg(feature = "processing")]
        if self.inner.config.processing_enabled()
            && let Some(ref store_forwarder) = self.inner.addrs.store_forwarder
        {
            return self
                .encode_metrics_processing(message, store_forwarder)
                .await;
        }

        if self.inner.config.http_global_metrics() {
            self.encode_metrics_global(message)
        } else {
            self.encode_metrics_envelope(cogs, message)
        }
    }

    #[cfg(all(test, feature = "processing"))]
    fn redis_rate_limiter_enabled(&self) -> bool {
        self.inner.rate_limiter.is_some()
    }

    async fn handle_message(&self, message: EnvelopeProcessor) {
        let ty = message.variant();
        let feature_weights = self.feature_weights(&message);

        metric!(timer(RelayTimers::ProcessMessageDuration), message = ty, {
            let mut cogs = self.inner.cogs.timed(ResourceId::Relay, feature_weights);

            match message {
                EnvelopeProcessor::ProcessEnvelope(m) => {
                    self.handle_process_envelope(&mut cogs, *m).await
                }
                EnvelopeProcessor::ProcessProjectMetrics(m) => {
                    self.handle_process_metrics(&mut cogs, *m)
                }
                EnvelopeProcessor::ProcessBatchedMetrics(m) => {
                    self.handle_process_batched_metrics(&mut cogs, *m)
                }
                EnvelopeProcessor::FlushBuckets(m) => {
                    self.handle_flush_buckets(&mut cogs, *m).await
                }
                EnvelopeProcessor::SubmitClientReports(m) => {
                    self.handle_submit_client_reports(&mut cogs, *m)
                }
            }
        });
    }

    fn feature_weights(&self, message: &EnvelopeProcessor) -> FeatureWeights {
        match message {
            EnvelopeProcessor::ProcessEnvelope(_) => AppFeature::Unattributed.into(),
            EnvelopeProcessor::ProcessProjectMetrics(_) => AppFeature::Unattributed.into(),
            EnvelopeProcessor::ProcessBatchedMetrics(_) => AppFeature::Unattributed.into(),
            EnvelopeProcessor::FlushBuckets(v) => v
                .buckets
                .values()
                .map(|s| {
                    if self.inner.config.processing_enabled() {
                        relay_metrics::cogs::ByCount(&s.buckets).into()
                    } else {
                        relay_metrics::cogs::BySize(&s.buckets).into()
                    }
                })
                .fold(FeatureWeights::none(), FeatureWeights::merge),
            EnvelopeProcessor::SubmitClientReports(_) => AppFeature::ClientReports.into(),
        }
    }

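    /// Creates a new [`ReservoirEvaluator`], connecting it to Redis on processing Relays.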
    fn new_reservoir_evaluator(
        &self,
        _organization_id: OrganizationId,
        reservoir_counters: &ReservoirCounters,
    ) -> ReservoirEvaluator<'_> {
        #[cfg_attr(not(feature = "processing"), expect(unused_mut))]
        let mut reservoir = ReservoirEvaluator::new(Arc::clone(reservoir_counters));

        #[cfg(feature = "processing")]
        if let Some(quotas_client) = self.inner.quotas_client.as_ref() {
            reservoir.set_redis(_organization_id, quotas_client);
        }

        reservoir
    }
}

impl Service for EnvelopeProcessorService {
    type Interface = EnvelopeProcessor;

    async fn run(self, mut rx: relay_system::Receiver<Self::Interface>) {
        while let Some(message) = rx.recv().await {
            let service = self.clone();
            self.inner
                .pool
                .spawn_async(
                    async move {
                        service.handle_message(message).await;
                    }
                    .boxed(),
                )
                .await;
        }
    }
}

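/// The result of enforcing rate limits on an event: the surviving event and the rate
/// limits that applied.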
struct EnforcementResult {
    event: Annotated<Event>,
    #[cfg_attr(not(feature = "processing"), expect(dead_code))]
    rate_limits: RateLimits,
}

impl EnforcementResult {
    pub fn new(event: Annotated<Event>, rate_limits: RateLimits) -> Self {
        Self { event, rate_limits }
    }
}

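/// A rate limiter for envelopes, backed either by cached rate limits or by consistent
/// enforcement through Redis.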
#[derive(Clone)]
enum RateLimiter {
    Cached,
    #[cfg(feature = "processing")]
    Consistent(Arc<RedisRateLimiter<GlobalRateLimitsServiceHandle>>),
}

impl RateLimiter {
    async fn enforce<Group>(
        &self,
        managed_envelope: &mut TypedEnvelope<Group>,
        event: Annotated<Event>,
        _extracted_metrics: &mut ProcessingExtractedMetrics,
        ctx: processing::Context<'_>,
    ) -> Result<EnforcementResult, ProcessingError> {
        if managed_envelope.envelope().is_empty() && event.value().is_none() {
            return Ok(EnforcementResult::new(event, RateLimits::default()));
        }

        let quotas = CombinedQuotas::new(ctx.global_config, ctx.project_info.get_quotas());
        if quotas.is_empty() {
            return Ok(EnforcementResult::new(event, RateLimits::default()));
        }

        let event_category = event_category(&event);

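        // Clone the rate limiter into the check callback, which the envelope limiter may
        // evaluate asynchronously for every item category.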
        let this = self.clone();
        let mut envelope_limiter =
            EnvelopeLimiter::new(CheckLimits::All, move |item_scope, _quantity| {
                let this = this.clone();

                async move {
                    match this {
                        #[cfg(feature = "processing")]
                        RateLimiter::Consistent(rate_limiter) => Ok::<_, ProcessingError>(
                            rate_limiter
                                .is_rate_limited(quotas, item_scope, _quantity, false)
                                .await?,
                        ),
                        _ => Ok::<_, ProcessingError>(
                            ctx.rate_limits.check_with_quotas(quotas, item_scope),
                        ),
                    }
                }
            });

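        // The event is kept outside of the envelope during processing, so tell the
        // limiter to assume an event with this category is present.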
        if let Some(category) = event_category {
            envelope_limiter.assume_event(category);
        }

        let scoping = managed_envelope.scoping();
        let (enforcement, rate_limits) =
            metric!(timer(RelayTimers::EventProcessingRateLimiting), {
                envelope_limiter
                    .compute(managed_envelope.envelope_mut(), &scoping)
                    .await
            })?;
        let event_active = enforcement.is_event_active();

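        // On processing Relays, also remove already extracted metrics for rate limited items.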
        #[cfg(feature = "processing")]
        _extracted_metrics.apply_enforcement(&enforcement, matches!(self, Self::Consistent(_)));
        enforcement.apply_with_outcomes(managed_envelope);

        if event_active {
            debug_assert!(managed_envelope.envelope().is_empty());
            return Ok(EnforcementResult::new(Annotated::empty(), rate_limits));
        }

        Ok(EnforcementResult::new(event, rate_limits))
    }
}

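/// Applies the configured HTTP content encoding to a request body.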
pub fn encode_payload(body: &Bytes, http_encoding: HttpEncoding) -> Result<Bytes, std::io::Error> {
    let envelope_body: Vec<u8> = match http_encoding {
        HttpEncoding::Identity => return Ok(body.clone()),
        HttpEncoding::Deflate => {
            let mut encoder = ZlibEncoder::new(Vec::new(), Compression::default());
            encoder.write_all(body.as_ref())?;
            encoder.finish()?
        }
        HttpEncoding::Gzip => {
            let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
            encoder.write_all(body.as_ref())?;
            encoder.finish()?
        }
        HttpEncoding::Br => {
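            // Buffer size 0 lets the brotli crate pick a default; quality 5 with a
            // 22-bit window (lgwin) presumably balances speed and compression ratio.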
            let mut encoder = BrotliEncoder::new(Vec::new(), 0, 5, 22);
            encoder.write_all(body.as_ref())?;
            encoder.into_inner()
        }
        HttpEncoding::Zstd => {
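            // Compression level 1 favors throughput over compression ratio.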
            let mut encoder = ZstdEncoder::new(Vec::new(), 1)?;
            encoder.write_all(body.as_ref())?;
            encoder.finish()?
        }
    };

    Ok(envelope_body.into())
}

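/// An upstream request that submits an envelope via HTTP.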
#[derive(Debug)]
pub struct SendEnvelope {
    pub envelope: TypedEnvelope<Processed>,
    pub body: Bytes,
    pub http_encoding: HttpEncoding,
    pub project_cache: ProjectCacheHandle,
}

impl UpstreamRequest for SendEnvelope {
    fn method(&self) -> reqwest::Method {
        reqwest::Method::POST
    }

    fn path(&self) -> Cow<'_, str> {
        format!("/api/{}/envelope/", self.envelope.scoping().project_id).into()
    }

    fn route(&self) -> &'static str {
        "envelope"
    }

    fn build(&mut self, builder: &mut http::RequestBuilder) -> Result<(), http::HttpError> {
        let envelope_body = self.body.clone();
        metric!(histogram(RelayHistograms::UpstreamEnvelopeBodySize) = envelope_body.len() as u64);

        let meta = &self.envelope.meta();
        let shard = self.envelope.partition_key().map(|p| p.to_string());
        builder
            .content_encoding(self.http_encoding)
            .header_opt("Origin", meta.origin().map(|url| url.as_str()))
            .header_opt("User-Agent", meta.user_agent())
            .header("X-Sentry-Auth", meta.auth_header())
            .header("X-Forwarded-For", meta.forwarded_for())
            .header("Content-Type", envelope::CONTENT_TYPE)
            .header_opt("X-Sentry-Relay-Shard", shard)
            .body(envelope_body);

        Ok(())
    }

    fn sign(&mut self) -> Option<Sign> {
        Some(Sign::Optional(SignatureType::RequestSign))
    }

    fn respond(
        self: Box<Self>,
        result: Result<http::Response, UpstreamRequestError>,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> {
        Box::pin(async move {
            let result = match result {
                Ok(mut response) => response.consume().await.map_err(UpstreamRequestError::Http),
                Err(error) => Err(error),
            };

            match result {
                Ok(()) => self.envelope.accept(),
                Err(error) if error.is_received() => {
                    let scoping = self.envelope.scoping();
                    self.envelope.accept();

                    if let UpstreamRequestError::RateLimited(limits) = error {
                        self.project_cache
                            .get(scoping.project_key)
                            .rate_limits()
                            .merge(limits.scope(&scoping));
                    }
                }
                Err(error) => {
                    let mut envelope = self.envelope;
                    // The upstream never received the envelope, so reject it
                    // with an internal outcome and log the error.
                    envelope.reject(Outcome::Invalid(DiscardReason::Internal));
                    relay_log::error!(
                        error = &error as &dyn Error,
                        tags.project_key = %envelope.scoping().project_key,
                        "error sending envelope"
                    );
                }
            }
        })
    }
}

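/// Accumulates metric bucket views for multiple projects up to a maximum
/// payload size.
///
/// A fill-and-flush sketch (`buckets`, `scoping`, and `max_batch_size` are
/// hypothetical inputs, not definitions from this file):
///
/// ```ignore
/// let mut partition = Partition::new(max_batch_size);
/// for bucket in &buckets {
///     let mut view = Some(BucketView::new(bucket));
///     while let Some(current) = view {
///         view = partition.insert(current, scoping);
///         // A returned remainder means the partition is full: flush it.
///         if view.is_some() {
///             let (payload, scopings) = partition.take();
///             // ... submit `payload` upstream for the projects in `scopings` ...
///         }
///     }
/// }
/// ```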
#[derive(Debug)]
struct Partition<'a> {
    max_size: usize,
    remaining: usize,
    views: HashMap<ProjectKey, Vec<BucketView<'a>>>,
    project_info: HashMap<ProjectKey, Scoping>,
}

impl<'a> Partition<'a> {
    /// Creates a new partition with the given maximum size in bytes.
    pub fn new(size: usize) -> Self {
        Self {
            max_size: size,
            remaining: size,
            views: HashMap::new(),
            project_info: HashMap::new(),
        }
    }

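    /// Inserts a bucket view, splitting it to fit the remaining capacity.
    ///
    /// Stores as much of `bucket` as fits and returns the remainder that has
    /// to go into a fresh partition, or `None` if the entire view fit. A
    /// sketch with hypothetical sizes:
    ///
    /// ```ignore
    /// // A view estimated at 100 bytes, a partition with 40 bytes left:
    /// let leftover = partition.insert(view, scoping); // stores a ~40 byte split
    /// assert!(leftover.is_some()); // the rest must be inserted after `take()`
    /// ```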
    pub fn insert(&mut self, bucket: BucketView<'a>, scoping: Scoping) -> Option<BucketView<'a>> {
        let (current, next) = bucket.split(self.remaining, Some(self.max_size));

        if let Some(current) = current {
            self.remaining = self.remaining.saturating_sub(current.estimated_size());
            self.views
                .entry(scoping.project_key)
                .or_default()
                .push(current);

            self.project_info
                .entry(scoping.project_key)
                .or_insert(scoping);
        }

        next
    }

    /// Returns `true` if the partition contains no buckets.
    fn is_empty(&self) -> bool {
        self.views.is_empty()
    }

    /// Serializes the contained buckets to a JSON payload and resets the
    /// partition, returning the payload together with the scoping of every
    /// contained project.
    fn take(&mut self) -> (Bytes, HashMap<ProjectKey, Scoping>) {
        #[derive(serde::Serialize)]
        struct Wrapper<'a> {
            buckets: &'a HashMap<ProjectKey, Vec<BucketView<'a>>>,
        }

        let buckets = &self.views;
        let payload = serde_json::to_vec(&Wrapper { buckets }).unwrap().into();

        let scopings = self.project_info.clone();
        self.project_info.clear();

        self.views.clear();
        self.remaining = self.max_size;

        (payload, scopings)
    }
}

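/// An upstream request that flushes a batch of metric buckets spanning
/// multiple projects.
///
/// The body produced by `Partition::take` has the same shape as the JSON
/// fixture in the tests below:
///
/// ```json
/// {
///     "buckets": {
///         "<project_key>": [
///             {"timestamp": 1615889440, "width": 0, "name": "...", "type": "d", "value": [36.0]}
///         ]
///     }
/// }
/// ```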
#[derive(Debug)]
struct SendMetricsRequest {
    /// Partition key, forwarded in the `X-Sentry-Relay-Shard` header.
    partition_key: String,
    /// Serialized metric buckets without encoding applied, used for signing.
    unencoded: Bytes,
    /// Serialized metric buckets with the configured HTTP encoding applied.
    encoded: Bytes,
    /// Scoping of every contained project, used to attribute outcomes when
    /// the request fails.
    project_info: HashMap<ProjectKey, Scoping>,
    /// Encoding (compression) applied to `encoded`.
    http_encoding: HttpEncoding,
    /// Handle used to emit outcomes for buckets that could not be sent.
    metric_outcomes: MetricOutcomes,
}

impl SendMetricsRequest {
    fn create_error_outcomes(self) {
        #[derive(serde::Deserialize)]
        struct Wrapper {
            buckets: HashMap<ProjectKey, Vec<MinimalTrackableBucket>>,
        }

        let buckets = match serde_json::from_slice(&self.unencoded) {
            Ok(Wrapper { buckets }) => buckets,
            Err(err) => {
                relay_log::error!(
                    error = &err as &dyn std::error::Error,
                    "failed to parse buckets from failed transmission"
                );
                return;
            }
        };

        for (key, buckets) in buckets {
            let Some(&scoping) = self.project_info.get(&key) else {
                relay_log::error!("missing scoping for project key");
                continue;
            };

            self.metric_outcomes.track(
                scoping,
                &buckets,
                Outcome::Invalid(DiscardReason::Internal),
            );
        }
    }
}

impl UpstreamRequest for SendMetricsRequest {
    fn set_relay_id(&self) -> bool {
        true
    }

    fn sign(&mut self) -> Option<Sign> {
        Some(Sign::Required(SignatureType::Body(self.unencoded.clone())))
    }

    fn method(&self) -> reqwest::Method {
        reqwest::Method::POST
    }

    fn path(&self) -> Cow<'_, str> {
        "/api/0/relays/metrics/".into()
    }

    fn route(&self) -> &'static str {
        "global_metrics"
    }

    fn build(&mut self, builder: &mut http::RequestBuilder) -> Result<(), http::HttpError> {
        metric!(histogram(RelayHistograms::UpstreamMetricsBodySize) = self.encoded.len() as u64);

        builder
            .content_encoding(self.http_encoding)
            .header("X-Sentry-Relay-Shard", self.partition_key.as_bytes())
            .header(header::CONTENT_TYPE, b"application/json")
            .body(self.encoded.clone());

        Ok(())
    }

    fn respond(
        self: Box<Self>,
        result: Result<http::Response, UpstreamRequestError>,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> {
        Box::pin(async {
            match result {
                Ok(mut response) => {
                    response.consume().await.ok();
                }
                Err(error) => {
                    relay_log::error!(error = &error as &dyn Error, "Failed to send metrics batch");

                    // If the upstream received the request, the buckets count
                    // as delivered and must not produce error outcomes.
                    if error.is_received() {
                        return;
                    }

                    self.create_error_outcomes()
                }
            }
        })
    }
}

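/// Container combining global quotas from the global config with
/// project-specific quotas.
///
/// Iteration yields the global quotas first, followed by the project quotas,
/// as exercised by `test_dynamic_quotas` below:
///
/// ```ignore
/// let quotas = CombinedQuotas::new(&global_config, &project_quotas);
/// let ids = quotas.into_iter().filter_map(|q| q.id.as_deref());
/// // global quota ids first, then project quota ids
/// ```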
#[derive(Copy, Clone, Debug)]
struct CombinedQuotas<'a> {
    global_quotas: &'a [Quota],
    project_quotas: &'a [Quota],
}

impl<'a> CombinedQuotas<'a> {
    /// Creates a new [`CombinedQuotas`] over the global config's quotas and
    /// the given project quotas.
    pub fn new(global_config: &'a GlobalConfig, project_quotas: &'a [Quota]) -> Self {
        Self {
            global_quotas: &global_config.quotas,
            project_quotas,
        }
    }

    /// Returns `true` if there are no quotas.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Returns the total number of quotas.
    pub fn len(&self) -> usize {
        self.global_quotas.len() + self.project_quotas.len()
    }
}

impl<'a> IntoIterator for CombinedQuotas<'a> {
    type Item = &'a Quota;
    type IntoIter = std::iter::Chain<std::slice::Iter<'a, Quota>, std::slice::Iter<'a, Quota>>;

    fn into_iter(self) -> Self::IntoIter {
        self.global_quotas.iter().chain(self.project_quotas.iter())
    }
}

#[cfg(test)]
mod tests {
    use std::collections::BTreeMap;
    use std::env;

    use insta::assert_debug_snapshot;
    use relay_base_schema::metrics::{DurationUnit, MetricUnit};
    use relay_common::glob2::LazyGlob;
    use relay_dynamic_config::ProjectConfig;
    use relay_event_normalization::{
        MeasurementsConfig, NormalizationConfig, RedactionRule, TransactionNameConfig,
        TransactionNameRule,
    };
    use relay_event_schema::protocol::TransactionSource;
    use relay_pii::DataScrubbingConfig;
    use similar_asserts::assert_eq;

    use crate::metrics_extraction::IntoMetric;
    use crate::metrics_extraction::transactions::types::{
        CommonTags, TransactionMeasurementTags, TransactionMetric,
    };
    use crate::testutils::{create_test_processor, create_test_processor_with_addrs};

    #[cfg(feature = "processing")]
    use {
        relay_metrics::BucketValue,
        relay_quotas::{QuotaScope, ReasonCode},
        relay_test::mock_service,
    };

    use super::*;

    #[cfg(feature = "processing")]
    fn mock_quota(id: &str) -> Quota {
        Quota {
            id: Some(id.into()),
            categories: smallvec::smallvec![DataCategory::MetricBucket],
            scope: QuotaScope::Organization,
            scope_id: None,
            limit: Some(0),
            window: None,
            reason_code: None,
            namespace: None,
        }
    }

    #[cfg(feature = "processing")]
    #[test]
    fn test_dynamic_quotas() {
        let global_config = GlobalConfig {
            quotas: vec![mock_quota("foo"), mock_quota("bar")],
            ..Default::default()
        };

        let project_quotas = vec![mock_quota("baz"), mock_quota("qux")];

        let dynamic_quotas = CombinedQuotas::new(&global_config, &project_quotas);

        assert_eq!(dynamic_quotas.len(), 4);
        assert!(!dynamic_quotas.is_empty());

        let quota_ids = dynamic_quotas.into_iter().filter_map(|q| q.id.as_deref());
        assert!(quota_ids.eq(["foo", "bar", "baz", "qux"]));
    }

    /// Ensures that a batch of buckets is rate limited per organization:
    /// buckets of the rate limited org are dropped, while buckets of the
    /// other org in the same batch still reach the store.
    #[cfg(feature = "processing")]
    #[tokio::test]
    async fn test_ratelimit_per_batch() {
        use relay_base_schema::organization::OrganizationId;
        use relay_protocol::FiniteF64;

        let rate_limited_org = Scoping {
            organization_id: OrganizationId::new(1),
            project_id: ProjectId::new(21),
            project_key: ProjectKey::parse("00000000000000000000000000000000").unwrap(),
            key_id: Some(17),
        };

        let not_rate_limited_org = Scoping {
            organization_id: OrganizationId::new(2),
            project_id: ProjectId::new(21),
            project_key: ProjectKey::parse("11111111111111111111111111111111").unwrap(),
            key_id: Some(17),
        };

        let message = {
            let project_info = {
                let quota = Quota {
                    id: Some("testing".into()),
                    categories: vec![DataCategory::MetricBucket].into(),
                    scope: relay_quotas::QuotaScope::Organization,
                    scope_id: Some(rate_limited_org.organization_id.to_string()),
                    limit: Some(0),
                    window: None,
                    reason_code: Some(ReasonCode::new("test")),
                    namespace: None,
                };

                let mut config = ProjectConfig::default();
                config.quotas.push(quota);

                Arc::new(ProjectInfo {
                    config,
                    ..Default::default()
                })
            };

            let project_metrics = |scoping| ProjectBuckets {
                buckets: vec![Bucket {
                    name: "d:transactions/bar".into(),
                    value: BucketValue::Counter(FiniteF64::new(1.0).unwrap()),
                    timestamp: UnixTimestamp::now(),
                    tags: Default::default(),
                    width: 10,
                    metadata: BucketMetadata::default(),
                }],
                rate_limits: Default::default(),
                project_info: project_info.clone(),
                scoping,
            };

            let buckets = hashbrown::HashMap::from([
                (
                    rate_limited_org.project_key,
                    project_metrics(rate_limited_org),
                ),
                (
                    not_rate_limited_org.project_key,
                    project_metrics(not_rate_limited_org),
                ),
            ]);

            FlushBuckets {
                partition_key: 0,
                buckets,
            }
        };

        assert_eq!(message.buckets.keys().count(), 2);

        let config = {
            let config_json = serde_json::json!({
                "processing": {
                    "enabled": true,
                    "kafka_config": [],
                    "redis": {
                        "server": std::env::var("RELAY_REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_owned()),
                    }
                }
            });
            Config::from_json_value(config_json).unwrap()
        };

        let (store, handle) = {
            let f = |org_ids: &mut Vec<OrganizationId>, msg: Store| {
                let org_id = match msg {
                    Store::Metrics(x) => x.scoping.organization_id,
                    _ => panic!("received envelope when expecting only metrics"),
                };
                org_ids.push(org_id);
            };

            mock_service("store_forwarder", vec![], f)
        };

        let processor = create_test_processor(config).await;
        assert!(processor.redis_rate_limiter_enabled());

        processor.encode_metrics_processing(message, &store).await;

        drop(store);
        let orgs_not_ratelimited = handle.await.unwrap();

        assert_eq!(
            orgs_not_ratelimited,
            vec![not_rate_limited_org.organization_id]
        );
    }

    #[tokio::test]
    async fn test_browser_version_extraction_with_pii_like_data() {
        let processor = create_test_processor(Default::default()).await;
        let outcome_aggregator = Addr::dummy();
        let event_id = EventId::new();

        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
            .parse()
            .unwrap();

        let request_meta = RequestMeta::new(dsn);
        let mut envelope = Envelope::from_request(Some(event_id), request_meta);

        envelope.add_item({
                let mut item = Item::new(ItemType::Event);
                item.set_payload(
                    ContentType::Json,
                    r#"
                    {
                        "request": {
                            "headers": [
                                ["User-Agent", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36"]
                            ]
                        }
                    }
                "#,
                );
                item
            });

        let mut datascrubbing_settings = DataScrubbingConfig::default();
        datascrubbing_settings.scrub_data = true;
        datascrubbing_settings.scrub_defaults = true;
        datascrubbing_settings.scrub_ip_addresses = true;

        // Mask anything that looks like an IP address in the payload.
        let pii_config = serde_json::from_str(r#"{"applications": {"**": ["@ip:mask"]}}"#).unwrap();

        let config = ProjectConfig {
            datascrubbing_settings,
            pii_config: Some(pii_config),
            ..Default::default()
        };

        let project_info = ProjectInfo {
            config,
            ..Default::default()
        };

        let mut envelopes = ProcessingGroup::split_envelope(*envelope, &Default::default());
        assert_eq!(envelopes.len(), 1);

        let (group, envelope) = envelopes.pop().unwrap();
        let envelope = ManagedEnvelope::new(envelope, outcome_aggregator);

        let message = ProcessEnvelopeGrouped {
            group,
            envelope,
            ctx: processing::Context {
                project_info: &project_info,
                ..processing::Context::for_test()
            },
            reservoir_counters: &ReservoirCounters::default(),
        };

        let Ok(Some(Submit::Envelope(mut new_envelope))) =
            processor.process(&mut Token::noop(), message).await
        else {
            panic!();
        };
        let new_envelope = new_envelope.envelope_mut();

        let event_item = new_envelope.items().last().unwrap();
        let annotated_event: Annotated<Event> =
            Annotated::from_json_bytes(&event_item.payload()).unwrap();
        let event = annotated_event.into_value().unwrap();
        let headers = event
            .request
            .into_value()
            .unwrap()
            .headers
            .into_value()
            .unwrap();

        // The IP-like version string in the User-Agent header must be masked.
        assert_eq!(
            Some(
                "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/********* Safari/537.36"
            ),
            headers.get_header("User-Agent")
        );
        // But the browser version extracted into the context must stay intact.
        let contexts = event.contexts.into_value().unwrap();
        let browser = contexts.0.get("browser").unwrap();
        assert_eq!(
            r#"{"browser":"Chrome 103.0.0","name":"Chrome","version":"103.0.0","type":"browser"}"#,
            browser.to_json().unwrap()
        );
    }

    #[tokio::test]
    #[cfg(feature = "processing")]
    async fn test_materialize_dsc() {
        use crate::services::projects::project::PublicKeyConfig;

        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
            .parse()
            .unwrap();
        let request_meta = RequestMeta::new(dsn);
        let mut envelope = Envelope::from_request(None, request_meta);

        let dsc = r#"{
            "trace_id": "00000000-0000-0000-0000-000000000000",
            "public_key": "e12d836b15bb49d7bbf99e64295d995b",
            "sample_rate": "0.2"
        }"#;
        envelope.set_dsc(serde_json::from_str(dsc).unwrap());

        let mut item = Item::new(ItemType::Event);
        item.set_payload(ContentType::Json, r#"{}"#);
        envelope.add_item(item);

        let outcome_aggregator = Addr::dummy();
        let managed_envelope = ManagedEnvelope::new(envelope, outcome_aggregator);

        let mut project_info = ProjectInfo::default();
        project_info.public_keys.push(PublicKeyConfig {
            public_key: ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(),
            numeric_id: Some(1),
        });

        let config = serde_json::json!({
            "processing": {
                "enabled": true,
                "kafka_config": [],
            }
        });

        let message = ProcessEnvelopeGrouped {
            group: ProcessingGroup::Transaction,
            envelope: managed_envelope,
            ctx: processing::Context {
                config: &Config::from_json_value(config.clone()).unwrap(),
                project_info: &project_info,
                sampling_project_info: Some(&project_info),
                ..processing::Context::for_test()
            },
            reservoir_counters: &ReservoirCounters::default(),
        };

        let processor = create_test_processor(Config::from_json_value(config).unwrap()).await;
        let Ok(Some(Submit::Envelope(envelope))) =
            processor.process(&mut Token::noop(), message).await
        else {
            panic!();
        };
        let event = envelope
            .envelope()
            .get_item_by(|item| item.ty() == &ItemType::Event)
            .unwrap();

        let event = Annotated::<Event>::from_json_bytes(&event.payload()).unwrap();
        insta::assert_debug_snapshot!(event.value().unwrap()._dsc, @r###"
        Object(
            {
                "environment": ~,
                "public_key": String(
                    "e12d836b15bb49d7bbf99e64295d995b",
                ),
                "release": ~,
                "replay_id": ~,
                "sample_rate": String(
                    "0.2",
                ),
                "trace_id": String(
                    "00000000000000000000000000000000",
                ),
                "transaction": ~,
            },
        )
        "###);
    }

    fn capture_test_event(transaction_name: &str, source: TransactionSource) -> Vec<String> {
        let mut event = Annotated::<Event>::from_json(
            r#"
            {
                "type": "transaction",
                "transaction": "/foo/",
                "timestamp": 946684810.0,
                "start_timestamp": 946684800.0,
                "contexts": {
                    "trace": {
                        "trace_id": "4c79f60c11214eb38604f4ae0781bfb2",
                        "span_id": "fa90fdead5f74053",
                        "op": "http.server",
                        "type": "trace"
                    }
                },
                "transaction_info": {
                    "source": "url"
                }
            }
            "#,
        )
        .unwrap();
        let e = event.value_mut().as_mut().unwrap();
        e.transaction.set_value(Some(transaction_name.into()));

        e.transaction_info
            .value_mut()
            .as_mut()
            .unwrap()
            .source
            .set_value(Some(source));

        relay_statsd::with_capturing_test_client(|| {
            utils::log_transaction_name_metrics(&mut event, |event| {
                let config = NormalizationConfig {
                    transaction_name_config: TransactionNameConfig {
                        rules: &[TransactionNameRule {
                            pattern: LazyGlob::new("/foo/*/**".to_owned()),
                            expiry: DateTime::<Utc>::MAX_UTC,
                            redaction: RedactionRule::Replace {
                                substitution: "*".to_owned(),
                            },
                        }],
                    },
                    ..Default::default()
                };
                relay_event_normalization::normalize_event(event, &config)
            });
        })
    }

    #[test]
    fn test_log_transaction_metrics_none() {
        let captures = capture_test_event("/nothing", TransactionSource::Url);
        insta::assert_debug_snapshot!(captures, @r###"
        [
            "event.transaction_name_changes:1|c|#source_in:url,changes:none,source_out:sanitized,is_404:false",
        ]
        "###);
    }

    #[test]
    fn test_log_transaction_metrics_rule() {
        let captures = capture_test_event("/foo/john/denver", TransactionSource::Url);
        insta::assert_debug_snapshot!(captures, @r###"
        [
            "event.transaction_name_changes:1|c|#source_in:url,changes:rule,source_out:sanitized,is_404:false",
        ]
        "###);
    }

    #[test]
    fn test_log_transaction_metrics_pattern() {
        let captures = capture_test_event("/something/12345", TransactionSource::Url);
        insta::assert_debug_snapshot!(captures, @r###"
        [
            "event.transaction_name_changes:1|c|#source_in:url,changes:pattern,source_out:sanitized,is_404:false",
        ]
        "###);
    }

    #[test]
    fn test_log_transaction_metrics_both() {
        let captures = capture_test_event("/foo/john/12345", TransactionSource::Url);
        insta::assert_debug_snapshot!(captures, @r###"
        [
            "event.transaction_name_changes:1|c|#source_in:url,changes:both,source_out:sanitized,is_404:false",
        ]
        "###);
    }

    #[test]
    fn test_log_transaction_metrics_no_match() {
        let captures = capture_test_event("/foo/john/12345", TransactionSource::Route);
        insta::assert_debug_snapshot!(captures, @r###"
        [
            "event.transaction_name_changes:1|c|#source_in:route,changes:none,source_out:route,is_404:false",
        ]
        "###);
    }

    /// Ensures that the hardcoded MRI overhead constant matches the overhead
    /// measured on an actually constructed measurement metric name.
    #[test]
    fn test_mri_overhead_constant() {
        let hardcoded_value = MeasurementsConfig::MEASUREMENT_MRI_OVERHEAD;

        let derived_value = {
            let name = "foobar".to_owned();
            let value = 5.into();
            let unit = MetricUnit::Duration(DurationUnit::default());
            let tags = TransactionMeasurementTags {
                measurement_rating: None,
                universal_tags: CommonTags(BTreeMap::new()),
                score_profile_version: None,
            };

            let measurement = TransactionMetric::Measurement {
                name: name.clone(),
                value,
                unit,
                tags,
            };

            let metric: Bucket = measurement.into_metric(UnixTimestamp::now());
            metric.name.len() - unit.to_string().len() - name.len()
        };
        assert_eq!(
            hardcoded_value, derived_value,
            "Update `MEASUREMENT_MRI_OVERHEAD` if the naming scheme changed."
        );
    }

    #[tokio::test]
    async fn test_process_metrics_bucket_metadata() {
        let mut token = Cogs::noop().timed(ResourceId::Relay, AppFeature::Unattributed);
        let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
        let received_at = Utc::now();
        let config = Config::default();

        let (aggregator, mut aggregator_rx) = Addr::custom();
        let processor = create_test_processor_with_addrs(
            config,
            Addrs {
                aggregator,
                ..Default::default()
            },
        )
        .await;

        let mut item = Item::new(ItemType::Statsd);
        item.set_payload(
            ContentType::Text,
            "transactions/foo:3182887624:4267882815|s",
        );
        for (source, expected_received_at) in [
            (
                BucketSource::External,
                Some(UnixTimestamp::from_datetime(received_at).unwrap()),
            ),
            (BucketSource::Internal, None),
        ] {
            let message = ProcessMetrics {
                data: MetricData::Raw(vec![item.clone()]),
                project_key,
                source,
                received_at,
                sent_at: Some(Utc::now()),
            };
            processor.handle_process_metrics(&mut token, message);

            let Aggregator::MergeBuckets(merge_buckets) = aggregator_rx.recv().await.unwrap();
            let buckets = merge_buckets.buckets;
            assert_eq!(buckets.len(), 1);
            assert_eq!(buckets[0].metadata.received_at, expected_received_at);
        }
    }

    #[tokio::test]
    async fn test_process_batched_metrics() {
        let mut token = Cogs::noop().timed(ResourceId::Relay, AppFeature::Unattributed);
        let received_at = Utc::now();
        let config = Config::default();

        let (aggregator, mut aggregator_rx) = Addr::custom();
        let processor = create_test_processor_with_addrs(
            config,
            Addrs {
                aggregator,
                ..Default::default()
            },
        )
        .await;

        let payload = r#"{
    "buckets": {
        "11111111111111111111111111111111": [
            {
                "timestamp": 1615889440,
                "width": 0,
                "name": "d:custom/endpoint.response_time@millisecond",
                "type": "d",
                "value": [
                  68.0
                ],
                "tags": {
                  "route": "user_index"
                }
            }
        ],
        "22222222222222222222222222222222": [
            {
                "timestamp": 1615889440,
                "width": 0,
                "name": "d:custom/endpoint.cache_rate@none",
                "type": "d",
                "value": [
                  36.0
                ]
            }
        ]
    }
}
"#;
        let message = ProcessBatchedMetrics {
            payload: Bytes::from(payload),
            source: BucketSource::Internal,
            received_at,
            sent_at: Some(Utc::now()),
        };
        processor.handle_process_batched_metrics(&mut token, message);

        let Aggregator::MergeBuckets(mb1) = aggregator_rx.recv().await.unwrap();
        let Aggregator::MergeBuckets(mb2) = aggregator_rx.recv().await.unwrap();

        let mut messages = vec![mb1, mb2];
        messages.sort_by_key(|mb| mb.project_key);

        let actual = messages
            .into_iter()
            .map(|mb| (mb.project_key, mb.buckets))
            .collect::<Vec<_>>();

        assert_debug_snapshot!(actual, @r###"
        [
            (
                ProjectKey("11111111111111111111111111111111"),
                [
                    Bucket {
                        timestamp: UnixTimestamp(1615889440),
                        width: 0,
                        name: MetricName(
                            "d:custom/endpoint.response_time@millisecond",
                        ),
                        value: Distribution(
                            [
                                68.0,
                            ],
                        ),
                        tags: {
                            "route": "user_index",
                        },
                        metadata: BucketMetadata {
                            merges: 1,
                            received_at: None,
                            extracted_from_indexed: false,
                        },
                    },
                ],
            ),
            (
                ProjectKey("22222222222222222222222222222222"),
                [
                    Bucket {
                        timestamp: UnixTimestamp(1615889440),
                        width: 0,
                        name: MetricName(
                            "d:custom/endpoint.cache_rate@none",
                        ),
                        value: Distribution(
                            [
                                36.0,
                            ],
                        ),
                        tags: {},
                        metadata: BucketMetadata {
                            merges: 1,
                            received_at: None,
                            extracted_from_indexed: false,
                        },
                    },
                ],
            ),
        ]
        "###);
    }
}