relay_server/services/
processor.rs

1use std::borrow::Cow;
2use std::collections::{BTreeSet, HashMap};
3use std::error::Error;
4use std::fmt::{Debug, Display};
5use std::future::Future;
6use std::io::Write;
7use std::pin::Pin;
8use std::sync::Arc;
9use std::time::Duration;
10
11use anyhow::Context;
12use brotli::CompressorWriter as BrotliEncoder;
13use bytes::Bytes;
14use chrono::{DateTime, Utc};
15use flate2::Compression;
16use flate2::write::{GzEncoder, ZlibEncoder};
17use futures::FutureExt;
18use futures::future::BoxFuture;
19use relay_base_schema::project::{ProjectId, ProjectKey};
20use relay_cogs::{AppFeature, Cogs, FeatureWeights, ResourceId, Token};
21use relay_common::time::UnixTimestamp;
22use relay_config::{Config, HttpEncoding, RelayMode};
23use relay_dynamic_config::{Feature, GlobalConfig};
24use relay_event_normalization::{ClockDriftProcessor, GeoIpLookup};
25use relay_event_schema::processor::ProcessingAction;
26use relay_event_schema::protocol::{
27    ClientReport, Event, EventId, Metrics, NetworkReportError, SpanV2,
28};
29use relay_filter::FilterStatKey;
30use relay_metrics::{Bucket, BucketMetadata, BucketView, BucketsView, MetricNamespace};
31use relay_protocol::Annotated;
32use relay_quotas::{DataCategory, Quota, RateLimits, Scoping};
33use relay_sampling::evaluation::{ReservoirCounters, SamplingDecision};
34use relay_statsd::metric;
35use relay_system::{Addr, FromMessage, NoResponse, Service};
36use reqwest::header;
37use smallvec::{SmallVec, smallvec};
38use zstd::stream::Encoder as ZstdEncoder;
39
40use crate::envelope::{
41    self, AttachmentType, ContentType, Envelope, EnvelopeError, Item, ItemContainer, ItemType,
42};
43use crate::extractors::{PartialDsn, RequestMeta, RequestTrust};
44use crate::integrations::Integration;
45use crate::managed::{InvalidProcessingGroupType, ManagedEnvelope, TypedEnvelope};
46use crate::metrics::{MetricOutcomes, MetricsLimiter, MinimalTrackableBucket};
47use crate::metrics_extraction::transactions::ExtractedMetrics;
48use crate::metrics_extraction::transactions::types::ExtractMetricsError;
49use crate::processing::attachments::AttachmentProcessor;
50use crate::processing::check_ins::CheckInsProcessor;
51use crate::processing::client_reports::ClientReportsProcessor;
52use crate::processing::errors::{ErrorsProcessor, SwitchProcessingError};
53use crate::processing::logs::LogsProcessor;
54use crate::processing::profile_chunks::ProfileChunksProcessor;
55use crate::processing::profiles::ProfilesProcessor;
56use crate::processing::replays::ReplaysProcessor;
57use crate::processing::sessions::SessionsProcessor;
58use crate::processing::spans::SpansProcessor;
59use crate::processing::trace_attachments::TraceAttachmentsProcessor;
60use crate::processing::trace_metrics::TraceMetricsProcessor;
61use crate::processing::transactions::TransactionProcessor;
62use crate::processing::user_reports::UserReportsProcessor;
63use crate::processing::utils::event::{
64    EventFullyNormalized, EventMetricsExtracted, FiltersStatus, SpansExtracted, event_category,
65    event_type,
66};
67use crate::processing::{Forward as _, Output, Outputs, QuotaRateLimiter};
68use crate::service::ServiceError;
69use crate::services::global_config::GlobalConfigHandle;
70use crate::services::metrics::{Aggregator, FlushBuckets, MergeBuckets, ProjectBuckets};
71use crate::services::outcome::{DiscardItemType, DiscardReason, Outcome, TrackOutcome};
72use crate::services::projects::cache::ProjectCacheHandle;
73use crate::services::projects::project::{ProjectInfo, ProjectState};
74use crate::services::upstream::{
75    SendRequest, Sign, SignatureType, UpstreamRelay, UpstreamRequest, UpstreamRequestError,
76};
77use crate::statsd::{RelayCounters, RelayDistributions, RelayTimers};
78use crate::utils::{self, CheckLimits, EnvelopeLimiter};
79use crate::{http, processing};
80use relay_threading::AsyncPool;
81#[cfg(feature = "processing")]
82use {
83    crate::services::objectstore::Objectstore,
84    crate::services::store::Store,
85    crate::utils::Enforcement,
86    itertools::Itertools,
87    relay_cardinality::{
88        CardinalityLimit, CardinalityLimiter, CardinalityLimitsSplit, RedisSetLimiter,
89        RedisSetLimiterOptions,
90    },
91    relay_dynamic_config::CardinalityLimiterMode,
92    relay_quotas::{RateLimitingError, RedisRateLimiter},
93    relay_redis::RedisClients,
94    std::time::Instant,
95    symbolic_unreal::{Unreal4Error, Unreal4ErrorKind},
96};
97
98mod attachment;
99mod dynamic_sampling;
100pub mod event;
101mod metrics;
102mod nel;
103mod profile;
104mod report;
105mod span;
106
107#[cfg(all(sentry, feature = "processing"))]
108pub mod playstation;
109mod standalone;
110#[cfg(feature = "processing")]
111mod unreal;
112
113#[cfg(feature = "processing")]
114mod nnswitch;
115
/// Creates the block only if used with `processing` feature.
///
/// Provided code block will be executed only if the provided config has `processing_enabled` set.
///
/// The two-branch form also compiles the `else` block when the `processing`
/// feature is disabled entirely, so callers get consistent behavior across
/// both feature configurations.
macro_rules! if_processing {
    ($config:expr, $if_true:block) => {
        // Without the `processing` feature the block is compiled out entirely.
        #[cfg(feature = "processing")] {
            if $config.processing_enabled() $if_true
        }
    };
    ($config:expr, $if_true:block else $if_false:block) => {
        {
            #[cfg(feature = "processing")] {
                if $config.processing_enabled() $if_true else $if_false
            }
            // Feature disabled: the runtime check is moot, always take the else branch.
            #[cfg(not(feature = "processing"))] {
                $if_false
            }
        }
    };
}

/// The minimum clock drift for correction to apply.
// 55 minutes.
pub const MINIMUM_CLOCK_DRIFT: Duration = Duration::from_secs(55 * 60);
139
/// Error returned when a [`ProcessingGroup`] cannot be converted into a
/// specific group marker type.
#[derive(Debug)]
pub struct GroupTypeError;

impl Display for GroupTypeError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "failed to convert processing group into corresponding type")
    }
}

impl std::error::Error for GroupTypeError {}
150
/// Defines a unit marker type for a [`ProcessingGroup`] variant.
///
/// Generates the marker struct `$ty`, an infallible `From<$ty> for
/// ProcessingGroup` conversion (mapping to `$variant`), and the fallible
/// reverse `TryFrom<ProcessingGroup> for $ty`, which accepts `$variant` as
/// well as any of the optional additional `$other` variants and fails with
/// [`GroupTypeError`] otherwise.
macro_rules! processing_group {
    ($ty:ident, $variant:ident$(, $($other:ident),+)?) => {
        #[derive(Clone, Copy, Debug)]
        pub struct $ty;

        impl From<$ty> for ProcessingGroup {
            fn from(_: $ty) -> Self {
                ProcessingGroup::$variant
            }
        }

        impl TryFrom<ProcessingGroup> for $ty {
            type Error = GroupTypeError;

            fn try_from(value: ProcessingGroup) -> Result<Self, Self::Error> {
                // The primary variant always converts.
                if matches!(value, ProcessingGroup::$variant) {
                    return Ok($ty);
                }
                // Each optional extra variant converts as well.
                $($(
                    if matches!(value, ProcessingGroup::$other) {
                        return Ok($ty);
                    }
                )+)?
                return Err(GroupTypeError);
            }
        }
    };
}
179
/// A marker trait.
///
/// Should be used only with groups which are responsible for processing envelopes with events.
pub trait EventProcessing {}

processing_group!(TransactionGroup, Transaction);
impl EventProcessing for TransactionGroup {}

processing_group!(ErrorGroup, Error);
impl EventProcessing for ErrorGroup {}

processing_group!(SessionGroup, Session);
// The standalone marker accepts all standalone `ProcessingGroup` variants.
processing_group!(
    StandaloneGroup,
    Standalone,
    StandaloneAttachments,
    StandaloneUserReports,
    StandaloneProfiles
);
processing_group!(ClientReportGroup, ClientReport);
processing_group!(ReplayGroup, Replay);
processing_group!(CheckInGroup, CheckIn);
// NEL reports are transformed into logs, so `LogGroup` converts from both groups.
processing_group!(LogGroup, Log, Nel);
processing_group!(TraceMetricGroup, TraceMetric);
processing_group!(SpanGroup, Span);

processing_group!(ProfileChunkGroup, ProfileChunk);
processing_group!(MetricsGroup, Metrics);
processing_group!(ForwardUnknownGroup, ForwardUnknown);
processing_group!(Ungrouped, Ungrouped);

/// Processed group type marker.
///
/// Marks the envelopes which passed through the processing pipeline.
#[derive(Clone, Copy, Debug)]
pub struct Processed;
216
/// Describes the groups of the processable items.
#[derive(Clone, Copy, Debug)]
pub enum ProcessingGroup {
    /// All the transaction related items.
    ///
    /// Includes transactions, related attachments, profiles.
    Transaction,
    /// All the items which require (have or create) events.
    ///
    /// This includes: errors, NEL, security reports, user reports, some of the
    /// attachments.
    Error,
    /// Session events.
    Session,
    /// Standalone items which can be sent alone without any event attached to it in the current
    /// envelope e.g. some attachments, user reports.
    Standalone,
    /// Standalone attachments.
    ///
    /// Attachments that are sent without an item that creates an event in the same envelope.
    StandaloneAttachments,
    /// Standalone user reports.
    ///
    /// User reports that are sent without an item that creates an event in the same envelope.
    StandaloneUserReports,
    /// Standalone profiles.
    ///
    /// Profiles which had their transaction sampled.
    StandaloneProfiles,
    /// Outcomes.
    ClientReport,
    /// Replays and ReplayRecordings.
    Replay,
    /// Crons.
    CheckIn,
    /// NEL reports.
    Nel,
    /// Logs.
    Log,
    /// Trace metrics.
    TraceMetric,
    /// Spans.
    Span,
    /// Span V2 spans.
    SpanV2,
    /// Metrics.
    Metrics,
    /// ProfileChunk.
    ProfileChunk,
    /// V2 attachments without span / log association.
    TraceAttachment,
    /// Unknown item types will be forwarded upstream (to processing Relay), where we will
    /// decide what to do with them.
    ForwardUnknown,
    /// All the items in the envelope that could not be grouped.
    Ungrouped,
}
274
impl ProcessingGroup {
    /// Splits provided envelope into list of tuples of groups with associated envelopes.
    ///
    /// Items are moved out of the envelope in a sequence of passes, one pass
    /// per group; each new sub-envelope re-uses a clone of the original
    /// envelope headers. The order of the passes is significant: e.g. span-v2
    /// items are claimed before plain span items, and event-creating items are
    /// checked before standalone extraction.
    fn split_envelope(
        mut envelope: Envelope,
        project_info: &ProjectInfo,
    ) -> SmallVec<[(Self, Box<Envelope>); 3]> {
        let headers = envelope.headers().clone();
        let mut grouped_envelopes = smallvec![];

        // Extract replays.
        let replay_items = envelope.take_items_by(|item| {
            matches!(
                item.ty(),
                &ItemType::ReplayEvent | &ItemType::ReplayRecording | &ItemType::ReplayVideo
            )
        });
        if !replay_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Replay,
                Envelope::from_parts(headers.clone(), replay_items),
            ))
        }

        // Keep all the sessions together in one envelope.
        let session_items = envelope
            .take_items_by(|item| matches!(item.ty(), &ItemType::Session | &ItemType::Sessions));
        if !session_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Session,
                Envelope::from_parts(headers.clone(), session_items),
            ))
        }

        // Extract span-v2 items. With the experimental feature enabled, plain
        // (v1) spans and span attachments are claimed here too, before the v1
        // span pass below.
        let span_v2_items = envelope.take_items_by(|item| {
            let exp_feature = project_info.has_feature(Feature::SpanV2ExperimentalProcessing);

            ItemContainer::<SpanV2>::is_container(item)
                || matches!(item.integration(), Some(Integration::Spans(_)))
                // Process standalone spans (v1) via the v2 pipeline
                || (exp_feature && matches!(item.ty(), &ItemType::Span))
                || (exp_feature && item.is_span_attachment())
        });

        if !span_v2_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::SpanV2,
                Envelope::from_parts(headers.clone(), span_v2_items),
            ))
        }

        // Extract spans.
        let span_items = envelope.take_items_by(|item| matches!(item.ty(), &ItemType::Span));
        if !span_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Span,
                Envelope::from_parts(headers.clone(), span_items),
            ))
        }

        // Extract logs.
        let logs_items = envelope.take_items_by(|item| {
            matches!(item.ty(), &ItemType::Log)
                || matches!(item.integration(), Some(Integration::Logs(_)))
        });
        if !logs_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Log,
                Envelope::from_parts(headers.clone(), logs_items),
            ))
        }

        // Extract trace metrics.
        let trace_metric_items =
            envelope.take_items_by(|item| matches!(item.ty(), &ItemType::TraceMetric));
        if !trace_metric_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::TraceMetric,
                Envelope::from_parts(headers.clone(), trace_metric_items),
            ))
        }

        // NEL items are transformed into logs in their own processing step.
        let nel_items = envelope.take_items_by(|item| matches!(item.ty(), &ItemType::Nel));
        if !nel_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Nel,
                Envelope::from_parts(headers.clone(), nel_items),
            ))
        }

        // Extract all metric items.
        //
        // Note: Should only be relevant in proxy mode. In other modes we send metrics through
        // a separate pipeline.
        let metric_items = envelope.take_items_by(|i| i.ty().is_metrics());
        if !metric_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Metrics,
                Envelope::from_parts(headers.clone(), metric_items),
            ))
        }

        // Extract profile chunks.
        let profile_chunk_items =
            envelope.take_items_by(|item| matches!(item.ty(), &ItemType::ProfileChunk));
        if !profile_chunk_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::ProfileChunk,
                Envelope::from_parts(headers.clone(), profile_chunk_items),
            ))
        }

        // Extract trace attachments (v2 attachments without span/log association).
        let trace_attachment_items = envelope.take_items_by(Item::is_trace_attachment);
        if !trace_attachment_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::TraceAttachment,
                Envelope::from_parts(headers.clone(), trace_attachment_items),
            ))
        }

        // Standalone splitting applies only when nothing in the envelope creates an event;
        // otherwise these items stay with the event they belong to.
        if !envelope.items().any(Item::creates_event) {
            // Extract the standalone attachments
            let standalone_attachments = envelope
                .take_items_by(|i| i.requires_event() && matches!(i.ty(), ItemType::Attachment));
            if !standalone_attachments.is_empty() {
                grouped_envelopes.push((
                    ProcessingGroup::StandaloneAttachments,
                    Envelope::from_parts(headers.clone(), standalone_attachments),
                ))
            }

            // Extract the standalone user reports
            let standalone_user_reports =
                envelope.take_items_by(|i| matches!(i.ty(), ItemType::UserReport));
            if !standalone_user_reports.is_empty() {
                grouped_envelopes.push((
                    ProcessingGroup::StandaloneUserReports,
                    Envelope::from_parts(headers.clone(), standalone_user_reports),
                ));
            }

            // Extract the standalone profiles
            let standalone_profiles =
                envelope.take_items_by(|i| matches!(i.ty(), ItemType::Profile));
            if !standalone_profiles.is_empty() {
                grouped_envelopes.push((
                    ProcessingGroup::StandaloneProfiles,
                    Envelope::from_parts(headers.clone(), standalone_profiles),
                ));
            }
        }

        // Extract all standalone items.
        //
        // Note: only if there are no items in the envelope which can create events, otherwise they
        // will be in the same envelope with all require event items.
        if !envelope.items().any(Item::creates_event) {
            let standalone_items = envelope.take_items_by(Item::requires_event);
            if !standalone_items.is_empty() {
                grouped_envelopes.push((
                    ProcessingGroup::Standalone,
                    Envelope::from_parts(headers.clone(), standalone_items),
                ))
            }
        };

        // Make sure we create separate envelopes for each `RawSecurity` report.
        let security_reports_items = envelope
            .take_items_by(|i| matches!(i.ty(), &ItemType::RawSecurity))
            .into_iter()
            .map(|item| {
                let headers = headers.clone();
                let items: SmallVec<[Item; 3]> = smallvec![item.clone()];
                let mut envelope = Envelope::from_parts(headers, items);
                // Each security report gets its own fresh event id.
                envelope.set_event_id(EventId::new());
                (ProcessingGroup::Error, envelope)
            });
        grouped_envelopes.extend(security_reports_items);

        // Extract all the items which require an event into separate envelope.
        let require_event_items = envelope.take_items_by(Item::requires_event);
        if !require_event_items.is_empty() {
            // A transaction or profile item routes the whole event envelope to the
            // transaction group; everything else goes through the error group.
            let group = if require_event_items
                .iter()
                .any(|item| matches!(item.ty(), &ItemType::Transaction | &ItemType::Profile))
            {
                ProcessingGroup::Transaction
            } else {
                ProcessingGroup::Error
            };

            grouped_envelopes.push((
                group,
                Envelope::from_parts(headers.clone(), require_event_items),
            ))
        }

        // Get the rest of the envelopes, one per item.
        let envelopes = envelope.items_mut().map(|item| {
            let headers = headers.clone();
            let items: SmallVec<[Item; 3]> = smallvec![item.clone()];
            let envelope = Envelope::from_parts(headers, items);
            let item_type = item.ty();
            let group = if matches!(item_type, &ItemType::CheckIn) {
                ProcessingGroup::CheckIn
            } else if matches!(item.ty(), &ItemType::ClientReport) {
                ProcessingGroup::ClientReport
            } else if matches!(item_type, &ItemType::Unknown(_)) {
                ProcessingGroup::ForwardUnknown
            } else {
                // Cannot group this item type.
                ProcessingGroup::Ungrouped
            };

            (group, envelope)
        });
        grouped_envelopes.extend(envelopes);

        grouped_envelopes
    }

    /// Returns the name of the group.
    pub fn variant(&self) -> &'static str {
        match self {
            ProcessingGroup::Transaction => "transaction",
            ProcessingGroup::Error => "error",
            ProcessingGroup::Session => "session",
            ProcessingGroup::Standalone => "standalone",
            ProcessingGroup::StandaloneAttachments => "standalone_attachment",
            ProcessingGroup::StandaloneUserReports => "standalone_user_reports",
            ProcessingGroup::StandaloneProfiles => "standalone_profiles",
            ProcessingGroup::ClientReport => "client_report",
            ProcessingGroup::Replay => "replay",
            ProcessingGroup::CheckIn => "check_in",
            ProcessingGroup::Log => "log",
            ProcessingGroup::TraceMetric => "trace_metric",
            ProcessingGroup::Nel => "nel",
            ProcessingGroup::Span => "span",
            ProcessingGroup::SpanV2 => "span_v2",
            ProcessingGroup::Metrics => "metrics",
            ProcessingGroup::ProfileChunk => "profile_chunk",
            ProcessingGroup::TraceAttachment => "trace_attachment",
            ProcessingGroup::ForwardUnknown => "forward_unknown",
            ProcessingGroup::Ungrouped => "ungrouped",
        }
    }
}
522
523impl From<ProcessingGroup> for AppFeature {
524    fn from(value: ProcessingGroup) -> Self {
525        match value {
526            ProcessingGroup::Transaction => AppFeature::Transactions,
527            ProcessingGroup::Error => AppFeature::Errors,
528            ProcessingGroup::Session => AppFeature::Sessions,
529            ProcessingGroup::Standalone => AppFeature::UnattributedEnvelope,
530            ProcessingGroup::StandaloneAttachments => AppFeature::UnattributedEnvelope,
531            ProcessingGroup::StandaloneUserReports => AppFeature::UserReports,
532            ProcessingGroup::StandaloneProfiles => AppFeature::Profiles,
533            ProcessingGroup::ClientReport => AppFeature::ClientReports,
534            ProcessingGroup::Replay => AppFeature::Replays,
535            ProcessingGroup::CheckIn => AppFeature::CheckIns,
536            ProcessingGroup::Log => AppFeature::Logs,
537            ProcessingGroup::TraceMetric => AppFeature::TraceMetrics,
538            ProcessingGroup::Nel => AppFeature::Logs,
539            ProcessingGroup::Span => AppFeature::Spans,
540            ProcessingGroup::SpanV2 => AppFeature::Spans,
541            ProcessingGroup::Metrics => AppFeature::UnattributedMetrics,
542            ProcessingGroup::ProfileChunk => AppFeature::Profiles,
543            ProcessingGroup::ForwardUnknown => AppFeature::UnattributedEnvelope,
544            ProcessingGroup::Ungrouped => AppFeature::UnattributedEnvelope,
545            ProcessingGroup::TraceAttachment => AppFeature::TraceAttachments,
546        }
547    }
548}
549
/// An error returned when handling [`ProcessEnvelope`].
#[derive(Debug, thiserror::Error)]
pub enum ProcessingError {
    #[error("invalid json in event")]
    InvalidJson(#[source] serde_json::Error),

    #[error("invalid message pack event payload")]
    InvalidMsgpack(#[from] rmp_serde::decode::Error),

    #[cfg(feature = "processing")]
    #[error("invalid unreal crash report")]
    InvalidUnrealReport(#[source] Unreal4Error),

    #[error("event payload too large")]
    PayloadTooLarge(DiscardItemType),

    #[error("invalid transaction event")]
    InvalidTransaction,

    #[error("the item is not allowed/supported in this envelope")]
    UnsupportedItem,

    #[error("envelope processor failed")]
    ProcessingFailed(#[from] ProcessingAction),

    #[error("duplicate {0} in event")]
    DuplicateItem(ItemType),

    #[error("failed to extract event payload")]
    NoEventPayload,

    #[error("missing project id in DSN")]
    MissingProjectId,

    #[error("invalid security report type: {0:?}")]
    InvalidSecurityType(Bytes),

    #[error("unsupported security report type")]
    UnsupportedSecurityType,

    #[error("invalid security report")]
    InvalidSecurityReport(#[source] serde_json::Error),

    #[error("invalid nel report")]
    InvalidNelReport(#[source] NetworkReportError),

    #[error("event filtered with reason: {0:?}")]
    EventFiltered(FilterStatKey),

    #[error("missing or invalid required event timestamp")]
    InvalidTimestamp,

    #[error("could not serialize event payload")]
    SerializeFailed(#[source] serde_json::Error),

    #[cfg(feature = "processing")]
    #[error("failed to apply quotas")]
    QuotasFailed(#[from] RateLimitingError),

    // Boxed to keep the enum small; see `From<InvalidProcessingGroupType>` below.
    #[error("invalid processing group type")]
    InvalidProcessingGroup(Box<InvalidProcessingGroupType>),

    #[error("nintendo switch dying message processing failed {0:?}")]
    InvalidNintendoDyingMessage(#[source] SwitchProcessingError),

    #[cfg(all(sentry, feature = "processing"))]
    #[error("playstation dump processing failed: {0}")]
    InvalidPlaystationDump(String),

    #[error("processing group does not match specific processor")]
    ProcessingGroupMismatch,
    // Outcomes for this failure are emitted by the new processing pipeline itself.
    #[error("new processing pipeline failed")]
    ProcessingFailure,
}
624
impl ProcessingError {
    /// Maps the error to the [`Outcome`] that should be emitted for the dropped data, if any.
    ///
    /// Returns `None` for errors that should not produce an outcome, e.g. when
    /// outcomes have already been emitted elsewhere.
    pub fn to_outcome(&self) -> Option<Outcome> {
        match self {
            Self::PayloadTooLarge(payload_type) => {
                Some(Outcome::Invalid(DiscardReason::TooLarge(*payload_type)))
            }
            Self::InvalidJson(_) => Some(Outcome::Invalid(DiscardReason::InvalidJson)),
            Self::InvalidMsgpack(_) => Some(Outcome::Invalid(DiscardReason::InvalidMsgpack)),
            Self::InvalidSecurityType(_) => {
                Some(Outcome::Invalid(DiscardReason::SecurityReportType))
            }
            Self::UnsupportedItem => Some(Outcome::Invalid(DiscardReason::InvalidEnvelope)),
            Self::InvalidSecurityReport(_) => Some(Outcome::Invalid(DiscardReason::SecurityReport)),
            Self::UnsupportedSecurityType => Some(Outcome::Filtered(FilterStatKey::InvalidCsp)),
            Self::InvalidNelReport(_) => Some(Outcome::Invalid(DiscardReason::InvalidJson)),
            Self::InvalidTransaction => Some(Outcome::Invalid(DiscardReason::InvalidTransaction)),
            Self::InvalidTimestamp => Some(Outcome::Invalid(DiscardReason::Timestamp)),
            Self::DuplicateItem(_) => Some(Outcome::Invalid(DiscardReason::DuplicateItem)),
            Self::NoEventPayload => Some(Outcome::Invalid(DiscardReason::NoEventPayload)),
            Self::InvalidNintendoDyingMessage(_) => Some(Outcome::Invalid(DiscardReason::Payload)),
            #[cfg(all(sentry, feature = "processing"))]
            Self::InvalidPlaystationDump(_) => Some(Outcome::Invalid(DiscardReason::Payload)),
            #[cfg(feature = "processing")]
            // Guard arm: must stay before the generic `InvalidUnrealReport` arm below.
            Self::InvalidUnrealReport(err) if err.kind() == Unreal4ErrorKind::BadCompression => {
                Some(Outcome::Invalid(DiscardReason::InvalidCompression))
            }
            #[cfg(feature = "processing")]
            Self::InvalidUnrealReport(_) => Some(Outcome::Invalid(DiscardReason::ProcessUnreal)),
            Self::SerializeFailed(_) | Self::ProcessingFailed(_) => {
                Some(Outcome::Invalid(DiscardReason::Internal))
            }
            #[cfg(feature = "processing")]
            Self::QuotasFailed(_) => Some(Outcome::Invalid(DiscardReason::Internal)),
            Self::MissingProjectId => None,
            Self::EventFiltered(key) => Some(Outcome::Filtered(key.clone())),
            Self::InvalidProcessingGroup(_) => None,

            Self::ProcessingGroupMismatch => Some(Outcome::Invalid(DiscardReason::Internal)),
            // Outcomes are emitted in the new processing pipeline already.
            Self::ProcessingFailure => None,
        }
    }

    /// Returns `true` if the error maps to an outcome flagged as unexpected.
    fn is_unexpected(&self) -> bool {
        self.to_outcome()
            .is_some_and(|outcome| outcome.is_unexpected())
    }
}
673
#[cfg(feature = "processing")]
impl From<Unreal4Error> for ProcessingError {
    /// Converts Unreal crash report errors, mapping oversized reports to
    /// [`ProcessingError::PayloadTooLarge`].
    fn from(err: Unreal4Error) -> Self {
        if err.kind() == Unreal4ErrorKind::TooLarge {
            Self::PayloadTooLarge(ItemType::UnrealReport.into())
        } else {
            Self::InvalidUnrealReport(err)
        }
    }
}
683
684impl From<ExtractMetricsError> for ProcessingError {
685    fn from(error: ExtractMetricsError) -> Self {
686        match error {
687            ExtractMetricsError::MissingTimestamp | ExtractMetricsError::InvalidTimestamp => {
688                Self::InvalidTimestamp
689            }
690        }
691    }
692}
693
694impl From<InvalidProcessingGroupType> for ProcessingError {
695    fn from(value: InvalidProcessingGroupType) -> Self {
696        Self::InvalidProcessingGroup(Box::new(value))
697    }
698}
699
// An event parsed from an envelope item, paired with a `usize`.
// NOTE(review): the `usize` appears to be the size of the payload the event was
// parsed from — confirm at the extraction sites before relying on this.
type ExtractedEvent = (Annotated<Event>, usize);

/// A container for extracted metrics during processing.
///
/// The container enforces that the extracted metrics are correctly tagged
/// with the dynamic sampling decision.
#[derive(Debug)]
pub struct ProcessingExtractedMetrics {
    // Project- and sampling-scoped buckets collected during processing.
    metrics: ExtractedMetrics,
}
710
711impl ProcessingExtractedMetrics {
712    pub fn new() -> Self {
713        Self {
714            metrics: ExtractedMetrics::default(),
715        }
716    }
717
718    pub fn into_inner(self) -> ExtractedMetrics {
719        self.metrics
720    }
721
722    /// Extends the contained metrics with [`ExtractedMetrics`].
723    pub fn extend(
724        &mut self,
725        extracted: ExtractedMetrics,
726        sampling_decision: Option<SamplingDecision>,
727    ) {
728        self.extend_project_metrics(extracted.project_metrics, sampling_decision);
729        self.extend_sampling_metrics(extracted.sampling_metrics, sampling_decision);
730    }
731
732    /// Extends the contained project metrics.
733    pub fn extend_project_metrics<I>(
734        &mut self,
735        buckets: I,
736        sampling_decision: Option<SamplingDecision>,
737    ) where
738        I: IntoIterator<Item = Bucket>,
739    {
740        self.metrics
741            .project_metrics
742            .extend(buckets.into_iter().map(|mut bucket| {
743                bucket.metadata.extracted_from_indexed =
744                    sampling_decision == Some(SamplingDecision::Keep);
745                bucket
746            }));
747    }
748
749    /// Extends the contained sampling metrics.
750    pub fn extend_sampling_metrics<I>(
751        &mut self,
752        buckets: I,
753        sampling_decision: Option<SamplingDecision>,
754    ) where
755        I: IntoIterator<Item = Bucket>,
756    {
757        self.metrics
758            .sampling_metrics
759            .extend(buckets.into_iter().map(|mut bucket| {
760                bucket.metadata.extracted_from_indexed =
761                    sampling_decision == Some(SamplingDecision::Keep);
762                bucket
763            }));
764    }
765
766    /// Applies rate limits to the contained metrics.
767    ///
768    /// This is used to apply rate limits which have been enforced on sampled items of an envelope
769    /// to also consistently apply to the metrics extracted from these items.
770    #[cfg(feature = "processing")]
771    fn apply_enforcement(&mut self, enforcement: &Enforcement, enforced_consistently: bool) {
772        // Metric namespaces which need to be dropped.
773        let mut drop_namespaces: SmallVec<[_; 2]> = smallvec![];
774        // Metrics belonging to this metric namespace need to have the `extracted_from_indexed`
775        // flag reset to `false`.
776        let mut reset_extracted_from_indexed: SmallVec<[_; 2]> = smallvec![];
777
778        for (namespace, limit, indexed) in [
779            (
780                MetricNamespace::Transactions,
781                &enforcement.event,
782                &enforcement.event_indexed,
783            ),
784            (
785                MetricNamespace::Spans,
786                &enforcement.spans,
787                &enforcement.spans_indexed,
788            ),
789        ] {
790            if limit.is_active() {
791                drop_namespaces.push(namespace);
792            } else if indexed.is_active() && !enforced_consistently {
793                // If the enforcement was not computed by consistently checking the limits,
794                // the quota for the metrics has not yet been incremented.
795                // In this case we have a dropped indexed payload but a metric which still needs to
796                // be accounted for, make sure the metric will still be rate limited.
797                reset_extracted_from_indexed.push(namespace);
798            }
799        }
800
801        if !drop_namespaces.is_empty() || !reset_extracted_from_indexed.is_empty() {
802            self.retain_mut(|bucket| {
803                let Some(namespace) = bucket.name.try_namespace() else {
804                    return true;
805                };
806
807                if drop_namespaces.contains(&namespace) {
808                    return false;
809                }
810
811                if reset_extracted_from_indexed.contains(&namespace) {
812                    bucket.metadata.extracted_from_indexed = false;
813                }
814
815                true
816            });
817        }
818    }
819
820    #[cfg(feature = "processing")]
821    fn retain_mut(&mut self, mut f: impl FnMut(&mut Bucket) -> bool) {
822        self.metrics.project_metrics.retain_mut(&mut f);
823        self.metrics.sampling_metrics.retain_mut(&mut f);
824    }
825}
826
827fn send_metrics(
828    metrics: ExtractedMetrics,
829    project_key: ProjectKey,
830    sampling_key: Option<ProjectKey>,
831    aggregator: &Addr<Aggregator>,
832) {
833    let ExtractedMetrics {
834        project_metrics,
835        sampling_metrics,
836    } = metrics;
837
838    if !project_metrics.is_empty() {
839        aggregator.send(MergeBuckets {
840            project_key,
841            buckets: project_metrics,
842        });
843    }
844
845    if !sampling_metrics.is_empty() {
846        // If no sampling project state is available, we associate the sampling
847        // metrics with the current project.
848        //
849        // project_without_tracing         -> metrics goes to self
850        // dependent_project_with_tracing  -> metrics goes to root
851        // root_project_with_tracing       -> metrics goes to root == self
852        let sampling_project_key = sampling_key.unwrap_or(project_key);
853        aggregator.send(MergeBuckets {
854            project_key: sampling_project_key,
855            buckets: sampling_metrics,
856        });
857    }
858}
859
860/// Function for on-off switches that filter specific item types (profiles, spans)
861/// based on a feature flag.
862///
863/// If the project config did not come from the upstream, we keep the items.
864fn should_filter(config: &Config, project_info: &ProjectInfo, feature: Feature) -> bool {
865    match config.relay_mode() {
866        RelayMode::Proxy => false,
867        RelayMode::Managed => !project_info.has_feature(feature),
868    }
869}
870
/// The result of the envelope processing containing the processed envelope along with the partial
/// result.
#[derive(Debug)]
#[expect(
    clippy::large_enum_variant,
    reason = "until we have a better solution to combat the excessive growth of Item, see #4819"
)]
enum ProcessingResult {
    /// A processed envelope together with the metrics extracted while processing it.
    Envelope {
        /// The processed envelope.
        managed_envelope: TypedEnvelope<Processed>,
        /// Metrics extracted from the envelope's items.
        extracted_metrics: ProcessingExtractedMetrics,
    },
    /// The output of a [`processing::Processor`] based pipeline.
    Output(Output<Outputs>),
}
885
886impl ProcessingResult {
887    /// Creates a [`ProcessingResult`] with no metrics.
888    fn no_metrics(managed_envelope: TypedEnvelope<Processed>) -> Self {
889        Self::Envelope {
890            managed_envelope,
891            extracted_metrics: ProcessingExtractedMetrics::new(),
892        }
893    }
894}
895
/// All items which can be submitted upstream.
#[derive(Debug)]
#[expect(
    clippy::large_enum_variant,
    reason = "until we have a better solution to combat the excessive growth of Item, see #4819"
)]
enum Submit<'a> {
    /// A processed envelope.
    Envelope(TypedEnvelope<Processed>),
    /// The output of a [`processing::Processor`].
    Output {
        /// The items produced by the processor.
        output: Outputs,
        /// Context necessary to forward the output.
        ctx: processing::ForwardContext<'a>,
    },
}
911
/// Applies processing to all contents of the given envelope.
///
/// Depending on the contents of the envelope and Relay's mode, this includes:
///
///  - Basic normalization and validation for all item types.
///  - Clock drift correction if the required `sent_at` header is present.
///  - Expansion of certain item types (e.g. unreal).
///  - Store normalization for event payloads in processing mode.
///  - Rate limiters and inbound filters on events in processing mode.
#[derive(Debug)]
pub struct ProcessEnvelope {
    /// Envelope to process.
    pub envelope: ManagedEnvelope,
    /// The project info of the envelope's own project.
    pub project_info: Arc<ProjectInfo>,
    /// Currently active cached rate limits for this project.
    pub rate_limits: Arc<RateLimits>,
    /// Root sampling project info.
    ///
    /// `None` if no sampling (root) project state is available.
    pub sampling_project_info: Option<Arc<ProjectInfo>>,
    /// Sampling reservoir counters.
    pub reservoir_counters: ReservoirCounters,
}
934
/// Like a [`ProcessEnvelope`], but with an envelope which has been grouped.
#[derive(Debug)]
struct ProcessEnvelopeGrouped<'a> {
    /// The group the envelope belongs to.
    pub group: ProcessingGroup,
    /// Envelope to process.
    pub envelope: ManagedEnvelope,
    /// The processing context (configuration and project state).
    pub ctx: processing::Context<'a>,
}
945
/// Parses a list of metrics or metric buckets and pushes them to the project's aggregator.
///
/// This parses and validates the metrics:
///  - For [`Metrics`](ItemType::Statsd), each metric is parsed separately, and invalid metrics are
///    ignored independently.
///  - For [`MetricBuckets`](ItemType::MetricBuckets), the entire list of buckets is parsed and
///    dropped together on parsing failure.
///  - Other envelope items will be ignored with an error message.
///
/// Additionally, processing applies clock drift correction using the system clock of this Relay, if
/// the Envelope specifies the [`sent_at`](Envelope::sent_at) header.
#[derive(Debug)]
pub struct ProcessMetrics {
    /// A list of metric items.
    pub data: MetricData,
    /// The target project.
    pub project_key: ProjectKey,
    /// The source of the metric buckets, see [`BucketSource`].
    ///
    /// Determines whether to keep or reset the metric metadata.
    pub source: BucketSource,
    /// The wall clock time at which the request was received.
    pub received_at: DateTime<Utc>,
    /// The value of the Envelope's [`sent_at`](Envelope::sent_at) header for clock drift
    /// correction.
    pub sent_at: Option<DateTime<Utc>>,
}
971
/// Raw unparsed metric data.
#[derive(Debug)]
pub enum MetricData {
    /// Raw data, unparsed envelope items.
    ///
    /// Items are expected to be of type [`ItemType::Statsd`] or
    /// [`ItemType::MetricBuckets`]; other item types are rejected during parsing.
    Raw(Vec<Item>),
    /// Already parsed buckets but unprocessed.
    Parsed(Vec<Bucket>),
}
980
981impl MetricData {
982    /// Consumes the metric data and parses the contained buckets.
983    ///
984    /// If the contained data is already parsed the buckets are returned unchanged.
985    /// Raw buckets are parsed and created with the passed `timestamp`.
986    fn into_buckets(self, timestamp: UnixTimestamp) -> Vec<Bucket> {
987        let items = match self {
988            Self::Parsed(buckets) => return buckets,
989            Self::Raw(items) => items,
990        };
991
992        let mut buckets = Vec::new();
993        for item in items {
994            let payload = item.payload();
995            if item.ty() == &ItemType::Statsd {
996                for bucket_result in Bucket::parse_all(&payload, timestamp) {
997                    match bucket_result {
998                        Ok(bucket) => buckets.push(bucket),
999                        Err(error) => relay_log::debug!(
1000                            error = &error as &dyn Error,
1001                            "failed to parse metric bucket from statsd format",
1002                        ),
1003                    }
1004                }
1005            } else if item.ty() == &ItemType::MetricBuckets {
1006                match serde_json::from_slice::<Vec<Bucket>>(&payload) {
1007                    Ok(parsed_buckets) => {
1008                        // Re-use the allocation of `b` if possible.
1009                        if buckets.is_empty() {
1010                            buckets = parsed_buckets;
1011                        } else {
1012                            buckets.extend(parsed_buckets);
1013                        }
1014                    }
1015                    Err(error) => {
1016                        relay_log::debug!(
1017                            error = &error as &dyn Error,
1018                            "failed to parse metric bucket",
1019                        );
1020                        metric!(counter(RelayCounters::MetricBucketsParsingFailed) += 1);
1021                    }
1022                }
1023            } else {
1024                relay_log::error!(
1025                    "invalid item of type {} passed to ProcessMetrics",
1026                    item.ty()
1027                );
1028            }
1029        }
1030        buckets
1031    }
1032}
1033
/// Parses a batched payload of metric buckets and pushes them to the aggregator.
#[derive(Debug)]
pub struct ProcessBatchedMetrics {
    /// Metrics payload in JSON format.
    pub payload: Bytes,
    /// Whether to keep or reset the metric metadata.
    pub source: BucketSource,
    /// The wall clock time at which the request was received.
    pub received_at: DateTime<Utc>,
    /// The value of the `sent_at` header for clock drift correction.
    pub sent_at: Option<DateTime<Utc>>,
}
1045
/// Source information where a metric bucket originates from.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum BucketSource {
    /// The metric bucket originated from an internal Relay use case.
    ///
    /// The metric bucket originates either from within the same Relay
    /// or was accepted coming from another Relay which is registered as
    /// an internal Relay via Relay's configuration.
    Internal,
    /// The metric bucket originated from an untrusted source.
    ///
    /// Managed Relays sending extracted metrics are considered external,
    /// it's a project use case but it comes from an untrusted source.
    External,
}
1061
1062impl BucketSource {
1063    /// Infers the bucket source from [`RequestMeta::request_trust`].
1064    pub fn from_meta(meta: &RequestMeta) -> Self {
1065        match meta.request_trust() {
1066            RequestTrust::Trusted => Self::Internal,
1067            RequestTrust::Untrusted => Self::External,
1068        }
1069    }
1070}
1071
/// Sends client reports to the upstream.
#[derive(Debug)]
pub struct SubmitClientReports {
    /// The client reports to be sent.
    pub client_reports: Vec<ClientReport>,
    /// Scoping information for the client reports.
    pub scoping: Scoping,
}
1080
/// CPU-intensive processing tasks for envelopes.
#[derive(Debug)]
pub enum EnvelopeProcessor {
    /// Processes an envelope, see [`ProcessEnvelope`].
    ProcessEnvelope(Box<ProcessEnvelope>),
    /// Processes metrics of a single project, see [`ProcessMetrics`].
    ProcessProjectMetrics(Box<ProcessMetrics>),
    /// Processes a batched metrics payload, see [`ProcessBatchedMetrics`].
    ProcessBatchedMetrics(Box<ProcessBatchedMetrics>),
    /// Handles flushed metric buckets, see [`FlushBuckets`].
    FlushBuckets(Box<FlushBuckets>),
    /// Submits client reports to the upstream, see [`SubmitClientReports`].
    SubmitClientReports(Box<SubmitClientReports>),
}
1090
1091impl EnvelopeProcessor {
1092    /// Returns the name of the message variant.
1093    pub fn variant(&self) -> &'static str {
1094        match self {
1095            EnvelopeProcessor::ProcessEnvelope(_) => "ProcessEnvelope",
1096            EnvelopeProcessor::ProcessProjectMetrics(_) => "ProcessProjectMetrics",
1097            EnvelopeProcessor::ProcessBatchedMetrics(_) => "ProcessBatchedMetrics",
1098            EnvelopeProcessor::FlushBuckets(_) => "FlushBuckets",
1099            EnvelopeProcessor::SubmitClientReports(_) => "SubmitClientReports",
1100        }
1101    }
1102}
1103
// Marks `EnvelopeProcessor` as the message interface handled by the service.
impl relay_system::Interface for EnvelopeProcessor {}
1105
1106impl FromMessage<ProcessEnvelope> for EnvelopeProcessor {
1107    type Response = relay_system::NoResponse;
1108
1109    fn from_message(message: ProcessEnvelope, _sender: ()) -> Self {
1110        Self::ProcessEnvelope(Box::new(message))
1111    }
1112}
1113
1114impl FromMessage<ProcessMetrics> for EnvelopeProcessor {
1115    type Response = NoResponse;
1116
1117    fn from_message(message: ProcessMetrics, _: ()) -> Self {
1118        Self::ProcessProjectMetrics(Box::new(message))
1119    }
1120}
1121
1122impl FromMessage<ProcessBatchedMetrics> for EnvelopeProcessor {
1123    type Response = NoResponse;
1124
1125    fn from_message(message: ProcessBatchedMetrics, _: ()) -> Self {
1126        Self::ProcessBatchedMetrics(Box::new(message))
1127    }
1128}
1129
1130impl FromMessage<FlushBuckets> for EnvelopeProcessor {
1131    type Response = NoResponse;
1132
1133    fn from_message(message: FlushBuckets, _: ()) -> Self {
1134        Self::FlushBuckets(Box::new(message))
1135    }
1136}
1137
1138impl FromMessage<SubmitClientReports> for EnvelopeProcessor {
1139    type Response = NoResponse;
1140
1141    fn from_message(message: SubmitClientReports, _: ()) -> Self {
1142        Self::SubmitClientReports(Box::new(message))
1143    }
1144}
1145
/// The asynchronous thread pool used for scheduling processing tasks in the processor.
///
/// Tasks are type-erased boxed futures resolving to `()`.
pub type EnvelopeProcessorServicePool = AsyncPool<BoxFuture<'static, ()>>;
1148
/// Service implementing the [`EnvelopeProcessor`] interface.
///
/// This service handles messages in a worker pool with configurable concurrency.
#[derive(Clone)]
pub struct EnvelopeProcessorService {
    // Shared, reference-counted state; cloning the service is cheap.
    inner: Arc<InnerProcessor>,
}
1156
/// Contains the addresses of services that the processor publishes to.
pub struct Addrs {
    /// Address of the service receiving [`TrackOutcome`] messages.
    pub outcome_aggregator: Addr<TrackOutcome>,
    /// Address of the upstream relay service.
    pub upstream_relay: Addr<UpstreamRelay>,
    /// Address of the object store service, if available (processing only).
    #[cfg(feature = "processing")]
    pub objectstore: Option<Addr<Objectstore>>,
    /// Address of the store forwarder service, if available (processing only).
    #[cfg(feature = "processing")]
    pub store_forwarder: Option<Addr<Store>>,
    /// Address of the metrics aggregator service.
    pub aggregator: Addr<Aggregator>,
}
1167
1168impl Default for Addrs {
1169    fn default() -> Self {
1170        Addrs {
1171            outcome_aggregator: Addr::dummy(),
1172            upstream_relay: Addr::dummy(),
1173            #[cfg(feature = "processing")]
1174            objectstore: None,
1175            #[cfg(feature = "processing")]
1176            store_forwarder: None,
1177            aggregator: Addr::dummy(),
1178        }
1179    }
1180}
1181
/// State shared between all clones of the [`EnvelopeProcessorService`].
struct InnerProcessor {
    /// Thread pool executing the processing tasks.
    pool: EnvelopeProcessorServicePool,
    /// Relay's static configuration.
    config: Arc<Config>,
    /// Handle to the global configuration.
    global_config: GlobalConfigHandle,
    /// Handle to the project cache.
    project_cache: ProjectCacheHandle,
    /// COGS measurement sink.
    cogs: Cogs,
    /// Addresses of the services the processor publishes to.
    addrs: Addrs,
    /// Redis-backed rate limiter, if Redis is configured (processing only).
    #[cfg(feature = "processing")]
    rate_limiter: Option<Arc<RedisRateLimiter>>,
    /// GeoIP lookup used during normalization; empty if no database is configured.
    geoip_lookup: GeoIpLookup,
    /// Redis-backed cardinality limiter, if Redis is configured (processing only).
    #[cfg(feature = "processing")]
    cardinality_limiter: Option<CardinalityLimiter>,
    /// Sink for metric outcomes.
    metric_outcomes: MetricOutcomes,
    /// The signal-specific processors, see [`Processing`].
    processing: Processing,
}
1197
/// The per-signal processors used by the processing pipeline.
struct Processing {
    /// Processor for error events.
    errors: ErrorsProcessor,
    /// Processor for log items.
    logs: LogsProcessor,
    /// Processor for trace metrics.
    trace_metrics: TraceMetricsProcessor,
    /// Processor for standalone spans.
    spans: SpansProcessor,
    /// Processor for check-in items.
    check_ins: CheckInsProcessor,
    /// Processor for sessions.
    sessions: SessionsProcessor,
    /// Processor for transactions.
    transactions: TransactionProcessor,
    /// Processor for profile chunks.
    profile_chunks: ProfileChunksProcessor,
    /// Processor for trace attachments.
    trace_attachments: TraceAttachmentsProcessor,
    /// Processor for replays.
    replays: ReplaysProcessor,
    /// Processor for client reports.
    client_reports: ClientReportsProcessor,
    /// Processor for attachments.
    attachments: AttachmentProcessor,
    /// Processor for user reports.
    user_reports: UserReportsProcessor,
    /// Processor for profiles.
    profiles: ProfilesProcessor,
}
1214
1215impl EnvelopeProcessorService {
    /// Creates a multi-threaded envelope processor.
    ///
    /// Opens the configured GeoIP database (falling back to an empty lookup on
    /// failure), wires up the optional Redis-backed rate and cardinality
    /// limiters, and constructs one processor per supported signal type.
    #[cfg_attr(feature = "processing", expect(clippy::too_many_arguments))]
    pub fn new(
        pool: EnvelopeProcessorServicePool,
        config: Arc<Config>,
        global_config: GlobalConfigHandle,
        project_cache: ProjectCacheHandle,
        cogs: Cogs,
        #[cfg(feature = "processing")] redis: Option<RedisClients>,
        addrs: Addrs,
        metric_outcomes: MetricOutcomes,
    ) -> Self {
        // A failure to open the GeoIP database is logged but not fatal:
        // an empty lookup is used instead.
        let geoip_lookup = config
            .geoip_path()
            .and_then(
                |p| match GeoIpLookup::open(p).context(ServiceError::GeoIp) {
                    Ok(geoip) => Some(geoip),
                    Err(err) => {
                        relay_log::error!("failed to open GeoIP db {p:?}: {err:?}");
                        None
                    }
                },
            )
            .unwrap_or_else(GeoIpLookup::empty);

        // Split the Redis clients into the pools used for cardinality limiting
        // and quota enforcement.
        #[cfg(feature = "processing")]
        let (cardinality, quotas) = match redis {
            Some(RedisClients {
                cardinality,
                quotas,
                ..
            }) => (Some(cardinality), Some(quotas)),
            None => (None, None),
        };
        #[cfg(not(feature = "processing"))]
        let quotas = None;

        #[cfg(feature = "processing")]
        let rate_limiter = quotas.clone().map(|redis| {
            RedisRateLimiter::new(redis)
                .max_limit(config.max_rate_limit())
                .cache(config.quota_cache_ratio(), config.quota_cache_max())
        });

        // The quota limiter is shared by all signal-specific processors below.
        let quota_limiter = Arc::new(QuotaRateLimiter::new(
            #[cfg(feature = "processing")]
            project_cache.clone(),
            #[cfg(feature = "processing")]
            rate_limiter.clone(),
        ));
        #[cfg(feature = "processing")]
        let rate_limiter = rate_limiter.map(Arc::new);
        let outcome_aggregator = addrs.outcome_aggregator.clone();
        let inner = InnerProcessor {
            pool,
            global_config,
            project_cache,
            cogs,
            #[cfg(feature = "processing")]
            rate_limiter,
            addrs,
            #[cfg(feature = "processing")]
            cardinality_limiter: cardinality
                .map(|cardinality| {
                    RedisSetLimiter::new(
                        RedisSetLimiterOptions {
                            cache_vacuum_interval: config
                                .cardinality_limiter_cache_vacuum_interval(),
                        },
                        cardinality,
                    )
                })
                .map(CardinalityLimiter::new),
            metric_outcomes,
            processing: Processing {
                errors: ErrorsProcessor::new(Arc::clone(&quota_limiter), geoip_lookup.clone()),
                logs: LogsProcessor::new(Arc::clone(&quota_limiter)),
                trace_metrics: TraceMetricsProcessor::new(Arc::clone(&quota_limiter)),
                spans: SpansProcessor::new(Arc::clone(&quota_limiter), geoip_lookup.clone()),
                check_ins: CheckInsProcessor::new(Arc::clone(&quota_limiter)),
                sessions: SessionsProcessor::new(Arc::clone(&quota_limiter)),
                transactions: TransactionProcessor::new(
                    Arc::clone(&quota_limiter),
                    geoip_lookup.clone(),
                    quotas.clone(),
                ),
                profile_chunks: ProfileChunksProcessor::new(Arc::clone(&quota_limiter)),
                trace_attachments: TraceAttachmentsProcessor::new(Arc::clone(&quota_limiter)),
                replays: ReplaysProcessor::new(Arc::clone(&quota_limiter), geoip_lookup.clone()),
                client_reports: ClientReportsProcessor::new(outcome_aggregator),
                attachments: AttachmentProcessor::new(Arc::clone(&quota_limiter)),
                user_reports: UserReportsProcessor::new(Arc::clone(&quota_limiter)),
                profiles: ProfilesProcessor::new(quota_limiter),
            },
            geoip_lookup,
            config,
        };

        Self {
            inner: Arc::new(inner),
        }
    }
1318
    /// Enforces rate limits on the envelope and the extracted metrics.
    ///
    /// Applies cached rate limits first. In processing mode with a Redis rate
    /// limiter configured, quotas are additionally enforced consistently via
    /// Redis, and the project's cached rate limits are refreshed from the
    /// freshly computed ones.
    ///
    /// Returns the event, which may have been removed by rate limiting.
    async fn enforce_quotas<Group>(
        &self,
        managed_envelope: &mut TypedEnvelope<Group>,
        event: Annotated<Event>,
        extracted_metrics: &mut ProcessingExtractedMetrics,
        ctx: processing::Context<'_>,
    ) -> Result<Annotated<Event>, ProcessingError> {
        // Cached quotas first, they are quick to evaluate and some quotas (indexed) are not
        // applied in the fast path, all cached quotas can be applied here.
        let cached_result = RateLimiter::Cached
            .enforce(managed_envelope, event, extracted_metrics, ctx)
            .await?;

        if_processing!(self.inner.config, {
            // Without a Redis rate limiter, the cached result is final.
            let rate_limiter = match self.inner.rate_limiter.clone() {
                Some(rate_limiter) => rate_limiter,
                None => return Ok(cached_result.event),
            };

            // Enforce all quotas consistently with Redis.
            let consistent_result = RateLimiter::Consistent(rate_limiter)
                .enforce(
                    managed_envelope,
                    cached_result.event,
                    extracted_metrics,
                    ctx
                )
                .await?;

            // Update cached rate limits with the freshly computed ones.
            if !consistent_result.rate_limits.is_empty() {
                self.inner
                    .project_cache
                    .get(managed_envelope.scoping().project_key)
                    .rate_limits()
                    .merge(consistent_result.rate_limits);
            }

            Ok(consistent_result.event)
        } else { Ok(cached_result.event) })
    }
1360
    /// Processes the general errors, and the items which require or create the events.
    ///
    /// Returns the metrics extracted during processing, which the caller is
    /// responsible for submitting.
    async fn process_errors(
        &self,
        managed_envelope: &mut TypedEnvelope<ErrorGroup>,
        project_id: ProjectId,
        mut ctx: processing::Context<'_>,
    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
        let mut event_fully_normalized = EventFullyNormalized::new(managed_envelope.envelope());
        let mut metrics = Metrics::default();
        let mut extracted_metrics = ProcessingExtractedMetrics::new();

        // Events can also contain user reports.
        report::process_user_reports(managed_envelope);

        // In processing mode, expand container payloads (unreal, playstation,
        // nnswitch) into individual envelope items before event extraction.
        if_processing!(self.inner.config, {
            unreal::expand(managed_envelope, &self.inner.config)?;
            #[cfg(sentry)]
            playstation::expand(managed_envelope, &self.inner.config, ctx.project_info)?;
            nnswitch::expand(managed_envelope)?;
        });

        // Extract the event payload from the envelope's items.
        let mut event = event::extract(
            managed_envelope,
            &mut metrics,
            event_fully_normalized,
            &self.inner.config,
        )?;

        // In processing mode, derive event data from the expanded items; each
        // step may report that it fully normalized the event.
        if_processing!(self.inner.config, {
            if let Some(inner_event_fully_normalized) =
                unreal::process(managed_envelope, &mut event)?
            {
                event_fully_normalized = inner_event_fully_normalized;
            }
            #[cfg(sentry)]
            if let Some(inner_event_fully_normalized) =
                playstation::process(managed_envelope, &mut event, ctx.project_info)?
            {
                event_fully_normalized = inner_event_fully_normalized;
            }
            if let Some(inner_event_fully_normalized) =
                attachment::create_placeholders(managed_envelope, &mut event, &mut metrics)
            {
                event_fully_normalized = inner_event_fully_normalized;
            }
        });

        // Validate the dynamic sampling context and update the sampling
        // project info accordingly.
        ctx.sampling_project_info = dynamic_sampling::validate_and_set_dsc(
            managed_envelope,
            &mut event,
            ctx.project_info,
            ctx.sampling_project_info,
        );

        // Finalize and normalize the event payload.
        let attachments = managed_envelope
            .envelope()
            .items()
            .filter(|item| item.attachment_type() == Some(AttachmentType::Attachment));
        processing::utils::event::finalize(
            managed_envelope.envelope().headers(),
            &mut event,
            attachments,
            &mut metrics,
            ctx.config,
        )?;
        event_fully_normalized = processing::utils::event::normalize(
            managed_envelope.envelope().headers(),
            &mut event,
            event_fully_normalized,
            project_id,
            ctx,
            &self.inner.geoip_lookup,
        )?;
        // Run inbound filters; a positive filter match aborts processing.
        let filter_run =
            processing::utils::event::filter(managed_envelope.envelope().headers(), &event, ctx)
                .map_err(ProcessingError::EventFiltered)?;

        if self.inner.config.processing_enabled() || matches!(filter_run, FiltersStatus::Ok) {
            dynamic_sampling::tag_error_with_sampling_decision(
                managed_envelope,
                &mut event,
                ctx.sampling_project_info,
                &self.inner.config,
            )
            .await;
        }

        // Apply cached and, in processing mode, consistent rate limits.
        event = self
            .enforce_quotas(managed_envelope, event, &mut extracted_metrics, ctx)
            .await?;

        // If the event survived rate limiting, scrub it and serialize it back
        // into the envelope.
        if event.value().is_some() {
            processing::utils::event::scrub(&mut event, ctx.project_info)?;
            event::serialize(
                managed_envelope,
                &mut event,
                event_fully_normalized,
                EventMetricsExtracted(false),
                SpansExtracted(false),
            )?;
            event::emit_feedback_metrics(managed_envelope.envelope());
        }

        // Scrub remaining attachment items according to project settings.
        let attachments = managed_envelope
            .envelope_mut()
            .items_mut()
            .filter(|i| i.ty() == &ItemType::Attachment);
        processing::utils::attachments::scrub(attachments, ctx.project_info, None);

        if self.inner.config.processing_enabled() && !event_fully_normalized.0 {
            relay_log::error!(
                tags.project = %project_id,
                tags.ty = event_type(&event).map(|e| e.to_string()).unwrap_or("none".to_owned()),
                "ingested event without normalizing"
            );
        }

        Ok(Some(extracted_metrics))
    }
1480
    /// Processes standalone items that require an event ID, but do not have an event on the same envelope.
    async fn process_standalone(
        &self,
        managed_envelope: &mut TypedEnvelope<StandaloneGroup>,
        ctx: processing::Context<'_>,
    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
        // Count every standalone item by type (and attachment type, if any).
        for item in managed_envelope.envelope().items() {
            let attachment_type_tag = match item.attachment_type() {
                Some(t) => &t.to_string(),
                None => "",
            };
            relay_statsd::metric!(
                counter(RelayCounters::StandaloneItem) += 1,
                processor = "old",
                item_type = item.ty().name(),
                attachment_type = attachment_type_tag,
            );
        }

        let mut extracted_metrics = ProcessingExtractedMetrics::new();

        standalone::process(managed_envelope);

        // Filter profile items based on config and project state.
        profile::filter(managed_envelope, ctx.config, ctx.project_info);

        // Apply rate limits; there is no event on this envelope.
        self.enforce_quotas(
            managed_envelope,
            Annotated::empty(),
            &mut extracted_metrics,
            ctx,
        )
        .await?;

        report::process_user_reports(managed_envelope);

        Ok(Some(extracted_metrics))
    }
1518
1519    async fn process_nel(
1520        &self,
1521        mut managed_envelope: ManagedEnvelope,
1522        ctx: processing::Context<'_>,
1523    ) -> Result<ProcessingResult, ProcessingError> {
1524        nel::convert_to_logs(&mut managed_envelope);
1525        self.process_with_processor(&self.inner.processing.logs, managed_envelope, ctx)
1526            .await
1527    }
1528
1529    async fn process_with_processor<P: processing::Processor>(
1530        &self,
1531        processor: &P,
1532        mut managed_envelope: ManagedEnvelope,
1533        ctx: processing::Context<'_>,
1534    ) -> Result<ProcessingResult, ProcessingError>
1535    where
1536        Outputs: From<P::Output>,
1537    {
1538        let Some(work) = processor.prepare_envelope(&mut managed_envelope) else {
1539            debug_assert!(
1540                false,
1541                "there must be work for the {} processor",
1542                std::any::type_name::<P>(),
1543            );
1544            return Err(ProcessingError::ProcessingGroupMismatch);
1545        };
1546
1547        managed_envelope.update();
1548        match managed_envelope.envelope().is_empty() {
1549            true => managed_envelope.accept(),
1550            false => managed_envelope.reject(Outcome::Invalid(DiscardReason::Internal)),
1551        }
1552
1553        processor
1554            .process(work, ctx)
1555            .await
1556            .map_err(|err| {
1557                relay_log::debug!(
1558                    error = &err as &dyn std::error::Error,
1559                    "processing pipeline failed"
1560                );
1561                ProcessingError::ProcessingFailure
1562            })
1563            .map(|o| o.map(Into::into))
1564            .map(ProcessingResult::Output)
1565    }
1566
    /// Processes standalone spans.
    ///
    /// This function does *not* run for spans extracted from transactions.
    async fn process_standalone_spans(
        &self,
        managed_envelope: &mut TypedEnvelope<SpanGroup>,
        _project_id: ProjectId,
        ctx: processing::Context<'_>,
    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
        let mut extracted_metrics = ProcessingExtractedMetrics::new();

        // Filter span items based on config and project state.
        span::filter(managed_envelope, ctx.config, ctx.project_info);

        // `span::process` only runs in processing mode; `_project_id` is only
        // used there.
        if_processing!(self.inner.config, {
            span::process(
                managed_envelope,
                &mut Annotated::empty(),
                &mut extracted_metrics,
                _project_id,
                ctx,
                &self.inner.geoip_lookup,
            )
            .await;
        });

        // Apply rate limits; there is no event on this envelope.
        self.enforce_quotas(
            managed_envelope,
            Annotated::empty(),
            &mut extracted_metrics,
            ctx,
        )
        .await?;

        Ok(Some(extracted_metrics))
    }
1602
    /// Prepares the envelope headers and dispatches the envelope to the processing routine
    /// matching its [`ProcessingGroup`].
    async fn process_envelope(
        &self,
        project_id: ProjectId,
        message: ProcessEnvelopeGrouped<'_>,
    ) -> Result<ProcessingResult, ProcessingError> {
        let ProcessEnvelopeGrouped {
            group,
            envelope: mut managed_envelope,
            ctx,
        } = message;

        // Pre-process the envelope headers.
        if let Some(sampling_state) = ctx.sampling_project_info {
            // Both transactions and standalone span envelopes need a normalized DSC header
            // to make sampling rules based on the segment/transaction name work correctly.
            managed_envelope
                .envelope_mut()
                .parametrize_dsc_transaction(&sampling_state.config.tx_name_rules);
        }

        // Set the event retention. Effectively, this value will only be available in processing
        // mode when the full project config is queried from the upstream.
        if let Some(retention) = ctx.project_info.config.event_retention {
            managed_envelope.envelope_mut().set_retention(retention);
        }

        // Set the downsampled event retention. Like the regular retention above, this value
        // will only be available in processing mode when the full project config is queried
        // from the upstream.
        if let Some(retention) = ctx.project_info.config.downsampled_event_retention {
            managed_envelope
                .envelope_mut()
                .set_downsampled_retention(retention);
        }

        // Ensure the project ID is updated to the stored instance for this project cache. This can
        // differ in two cases:
        //  1. The envelope was sent to the legacy `/store/` endpoint without a project ID.
        //  2. The DSN was moved and the envelope sent to the old project ID.
        managed_envelope
            .envelope_mut()
            .meta_mut()
            .set_project_id(project_id);

        // Runs one of the legacy `process_*` methods below: converts the envelope into the
        // typed envelope the method expects, and on error emits the matching outcome before
        // propagating the error to the caller.
        macro_rules! run {
            ($fn_name:ident $(, $args:expr)*) => {
                async {
                    let mut managed_envelope = (managed_envelope, group).try_into()?;
                    match self.$fn_name(&mut managed_envelope, $($args),*).await {
                        Ok(extracted_metrics) => Ok(ProcessingResult::Envelope {
                            managed_envelope: managed_envelope.into_processed(),
                            extracted_metrics: extracted_metrics.map_or(ProcessingExtractedMetrics::new(), |e| e)
                        }),
                        Err(error) => {
                            relay_log::trace!("Executing {fn} failed: {error}", fn = stringify!($fn_name), error = error);
                            if let Some(outcome) = error.to_outcome() {
                                managed_envelope.reject(outcome);
                            }

                            return Err(error);
                        }
                    }
                }.await
            };
        }

        relay_log::trace!("Processing {group} group", group = group.variant());

        // Most groups are handled by dedicated processors in `self.inner.processing`; the
        // remaining groups still go through the legacy `run!` pipeline above.
        match group {
            ProcessingGroup::Error => {
                // Errors are migrated to the new pipeline behind a feature flag.
                if ctx.project_info.has_feature(Feature::NewErrorProcessing) {
                    self.process_with_processor(
                        &self.inner.processing.errors,
                        managed_envelope,
                        ctx,
                    )
                    .await
                } else {
                    run!(process_errors, project_id, ctx)
                }
            }
            ProcessingGroup::Transaction => {
                self.process_with_processor(
                    &self.inner.processing.transactions,
                    managed_envelope,
                    ctx,
                )
                .await
            }
            ProcessingGroup::Session => {
                self.process_with_processor(&self.inner.processing.sessions, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::Standalone => run!(process_standalone, ctx),
            ProcessingGroup::StandaloneAttachments => {
                self.process_with_processor(
                    &self.inner.processing.attachments,
                    managed_envelope,
                    ctx,
                )
                .await
            }
            ProcessingGroup::StandaloneUserReports => {
                self.process_with_processor(
                    &self.inner.processing.user_reports,
                    managed_envelope,
                    ctx,
                )
                .await
            }
            ProcessingGroup::StandaloneProfiles => {
                self.process_with_processor(&self.inner.processing.profiles, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::ClientReport => {
                self.process_with_processor(
                    &self.inner.processing.client_reports,
                    managed_envelope,
                    ctx,
                )
                .await
            }
            ProcessingGroup::Replay => {
                self.process_with_processor(&self.inner.processing.replays, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::CheckIn => {
                self.process_with_processor(&self.inner.processing.check_ins, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::Nel => self.process_nel(managed_envelope, ctx).await,
            ProcessingGroup::Log => {
                self.process_with_processor(&self.inner.processing.logs, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::TraceMetric => {
                self.process_with_processor(
                    &self.inner.processing.trace_metrics,
                    managed_envelope,
                    ctx,
                )
                .await
            }
            ProcessingGroup::SpanV2 => {
                self.process_with_processor(&self.inner.processing.spans, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::TraceAttachment => {
                self.process_with_processor(
                    &self.inner.processing.trace_attachments,
                    managed_envelope,
                    ctx,
                )
                .await
            }
            ProcessingGroup::Span => run!(process_standalone_spans, project_id, ctx),
            ProcessingGroup::ProfileChunk => {
                self.process_with_processor(
                    &self.inner.processing.profile_chunks,
                    managed_envelope,
                    ctx,
                )
                .await
            }
            // Currently is not used.
            ProcessingGroup::Metrics => {
                // In proxy mode we simply forward the metrics.
                // This group shouldn't be used outside of proxy mode.
                if self.inner.config.relay_mode() != RelayMode::Proxy {
                    relay_log::error!(
                        tags.project = %project_id,
                        items = ?managed_envelope.envelope().items().next().map(Item::ty),
                        "received metrics in the process_state"
                    );
                }

                Ok(ProcessingResult::no_metrics(
                    managed_envelope.into_processed(),
                ))
            }
            // Fallback to the legacy process_state implementation for Ungrouped events.
            ProcessingGroup::Ungrouped => {
                relay_log::error!(
                    tags.project = %project_id,
                    items = ?managed_envelope.envelope().items().next().map(Item::ty),
                    "could not identify the processing group based on the envelope's items"
                );

                Ok(ProcessingResult::no_metrics(
                    managed_envelope.into_processed(),
                ))
            }
            // Leave this group unchanged.
            //
            // This will later be forwarded to upstream.
            ProcessingGroup::ForwardUnknown => Ok(ProcessingResult::no_metrics(
                managed_envelope.into_processed(),
            )),
        }
    }
1802
1803    /// Processes the envelope and returns the processed envelope back.
1804    ///
1805    /// Returns `Some` if the envelope passed inbound filtering and rate limiting. Invalid items are
1806    /// removed from the envelope. Otherwise, if the envelope is empty or the entire envelope needs
1807    /// to be dropped, this is `None`.
1808    async fn process<'a>(
1809        &self,
1810        mut message: ProcessEnvelopeGrouped<'a>,
1811    ) -> Result<Option<Submit<'a>>, ProcessingError> {
1812        let ProcessEnvelopeGrouped {
1813            ref mut envelope,
1814            ctx,
1815            ..
1816        } = message;
1817
1818        // Prefer the project's project ID, and fall back to the stated project id from the
1819        // envelope. The project ID is available in all modes, other than in proxy mode, where
1820        // envelopes for unknown projects are forwarded blindly.
1821        //
1822        // Neither ID can be available in proxy mode on the /store/ endpoint. This is not supported,
1823        // since we cannot process an envelope without project ID, so drop it.
1824        let Some(project_id) = ctx
1825            .project_info
1826            .project_id
1827            .or_else(|| envelope.envelope().meta().project_id())
1828        else {
1829            envelope.reject(Outcome::Invalid(DiscardReason::Internal));
1830            return Err(ProcessingError::MissingProjectId);
1831        };
1832
1833        let client = envelope.envelope().meta().client().map(str::to_owned);
1834        let user_agent = envelope.envelope().meta().user_agent().map(str::to_owned);
1835        let project_key = envelope.envelope().meta().public_key();
1836        // Only allow sending to the sampling key, if we successfully loaded a sampling project
1837        // info relating to it. This filters out unknown/invalid project keys as well as project
1838        // keys from different organizations.
1839        let sampling_key = envelope
1840            .envelope()
1841            .sampling_key()
1842            .filter(|_| ctx.sampling_project_info.is_some());
1843
1844        // We set additional information on the scope, which will be removed after processing the
1845        // envelope.
1846        relay_log::configure_scope(|scope| {
1847            scope.set_tag("project", project_id);
1848            if let Some(client) = client {
1849                scope.set_tag("sdk", client);
1850            }
1851            if let Some(user_agent) = user_agent {
1852                scope.set_extra("user_agent", user_agent.into());
1853            }
1854        });
1855
1856        let result = match self.process_envelope(project_id, message).await {
1857            Ok(ProcessingResult::Envelope {
1858                mut managed_envelope,
1859                extracted_metrics,
1860            }) => {
1861                // The envelope could be modified or even emptied during processing, which
1862                // requires re-computation of the context.
1863                managed_envelope.update();
1864
1865                let has_metrics = !extracted_metrics.metrics.project_metrics.is_empty();
1866                send_metrics(
1867                    extracted_metrics.metrics,
1868                    project_key,
1869                    sampling_key,
1870                    &self.inner.addrs.aggregator,
1871                );
1872
1873                let envelope_response = if managed_envelope.envelope().is_empty() {
1874                    if !has_metrics {
1875                        // Individual rate limits have already been issued
1876                        managed_envelope.reject(Outcome::RateLimited(None));
1877                    } else {
1878                        managed_envelope.accept();
1879                    }
1880
1881                    None
1882                } else {
1883                    Some(managed_envelope)
1884                };
1885
1886                Ok(envelope_response.map(Submit::Envelope))
1887            }
1888            Ok(ProcessingResult::Output(Output { main, metrics })) => {
1889                if let Some(metrics) = metrics {
1890                    metrics.accept(|metrics| {
1891                        send_metrics(
1892                            metrics,
1893                            project_key,
1894                            sampling_key,
1895                            &self.inner.addrs.aggregator,
1896                        );
1897                    });
1898                }
1899
1900                let ctx = ctx.to_forward();
1901                Ok(main.map(|output| Submit::Output { output, ctx }))
1902            }
1903            Err(err) => Err(err),
1904        };
1905
1906        relay_log::configure_scope(|scope| {
1907            scope.remove_tag("project");
1908            scope.remove_tag("sdk");
1909            scope.remove_tag("user_agent");
1910        });
1911
1912        result
1913    }
1914
    /// Handles a [`ProcessEnvelope`] message: splits the envelope into per-group
    /// sub-envelopes and processes each one individually.
    async fn handle_process_envelope(&self, cogs: &mut Token, message: ProcessEnvelope) {
        let project_key = message.envelope.envelope().meta().public_key();
        // Time the envelope spent queued before processing started.
        let wait_time = message.envelope.age();
        metric!(timer(RelayTimers::EnvelopeWaitTime) = wait_time);

        // This COGS handling may need an overhaul in the future:
        // Cancel the passed in token, to start individual measurements per envelope instead.
        cogs.cancel();

        let scoping = message.envelope.scoping();
        for (group, envelope) in ProcessingGroup::split_envelope(
            *message.envelope.into_envelope(),
            &message.project_info,
        ) {
            // Start a fresh COGS measurement attributed to this group's app feature.
            let mut cogs = self
                .inner
                .cogs
                .timed(ResourceId::Relay, AppFeature::from(group));

            let mut envelope =
                ManagedEnvelope::new(envelope, self.inner.addrs.outcome_aggregator.clone());
            // Carry over the scoping computed before the envelope was split.
            envelope.scope(scoping);

            let global_config = self.inner.global_config.current();

            // Borrow the per-message state into a processing context for this sub-envelope.
            let ctx = processing::Context {
                config: &self.inner.config,
                global_config: &global_config,
                project_info: &message.project_info,
                sampling_project_info: message.sampling_project_info.as_deref(),
                rate_limits: &message.rate_limits,
                reservoir_counters: &message.reservoir_counters,
            };

            let message = ProcessEnvelopeGrouped {
                group,
                envelope,
                ctx,
            };

            let result = metric!(
                timer(RelayTimers::EnvelopeProcessingTime),
                group = group.variant(),
                { self.process(message).await }
            );

            match result {
                Ok(Some(envelope)) => self.submit_upstream(&mut cogs, envelope),
                Ok(None) => {}
                // Unexpected errors indicate bugs or infrastructure problems and are logged
                // at error level; all other errors only at debug level.
                Err(error) if error.is_unexpected() => {
                    relay_log::error!(
                        tags.project_key = %project_key,
                        error = &error as &dyn Error,
                        "error processing envelope"
                    )
                }
                Err(error) => {
                    relay_log::debug!(
                        tags.project_key = %project_key,
                        error = &error as &dyn Error,
                        "error processing envelope"
                    )
                }
            }
        }
    }
1981
    /// Normalizes, validates, and pre-filters incoming metric buckets, then merges them
    /// into the aggregator.
    fn handle_process_metrics(&self, cogs: &mut Token, message: ProcessMetrics) {
        let ProcessMetrics {
            data,
            project_key,
            received_at,
            sent_at,
            source,
        } = message;

        let received_timestamp =
            UnixTimestamp::from_datetime(received_at).unwrap_or(UnixTimestamp::now());

        let mut buckets = data.into_buckets(received_timestamp);
        if buckets.is_empty() {
            return;
        };
        // Attribute COGS cost by the total size of the parsed buckets.
        cogs.update(relay_metrics::cogs::BySize(&buckets));

        let clock_drift_processor =
            ClockDriftProcessor::new(sent_at, received_at).at_least(MINIMUM_CLOCK_DRIFT);

        buckets.retain_mut(|bucket| {
            // Drop buckets that fail normalization.
            if let Err(error) = relay_metrics::normalize_bucket(bucket) {
                relay_log::debug!(error = &error as &dyn Error, "dropping bucket {bucket:?}");
                return false;
            }

            // Drop buckets in namespaces this Relay does not accept.
            if !self::metrics::is_valid_namespace(bucket) {
                return false;
            }

            // Correct the bucket timestamp for client clock drift (derived from `sent_at`).
            clock_drift_processor.process_timestamp(&mut bucket.timestamp);

            // Metadata from non-internal sources is discarded and re-initialized with this
            // Relay's receive timestamp.
            if !matches!(source, BucketSource::Internal) {
                bucket.metadata = BucketMetadata::new(received_timestamp);
            }

            true
        });

        let project = self.inner.project_cache.get(project_key);

        // Best effort check to filter and rate limit buckets, if there is no project state
        // available at the current time, we will check again after flushing.
        let buckets = match project.state() {
            ProjectState::Enabled(project_info) => {
                let rate_limits = project.rate_limits().current_limits();
                self.check_buckets(project_key, project_info, &rate_limits, buckets)
            }
            _ => buckets,
        };

        relay_log::trace!("merging metric buckets into the aggregator");
        self.inner
            .addrs
            .aggregator
            .send(MergeBuckets::new(project_key, buckets));
    }
2040
2041    fn handle_process_batched_metrics(&self, cogs: &mut Token, message: ProcessBatchedMetrics) {
2042        let ProcessBatchedMetrics {
2043            payload,
2044            source,
2045            received_at,
2046            sent_at,
2047        } = message;
2048
2049        #[derive(serde::Deserialize)]
2050        struct Wrapper {
2051            buckets: HashMap<ProjectKey, Vec<Bucket>>,
2052        }
2053
2054        let buckets = match serde_json::from_slice(&payload) {
2055            Ok(Wrapper { buckets }) => buckets,
2056            Err(error) => {
2057                relay_log::debug!(
2058                    error = &error as &dyn Error,
2059                    "failed to parse batched metrics",
2060                );
2061                metric!(counter(RelayCounters::MetricBucketsParsingFailed) += 1);
2062                return;
2063            }
2064        };
2065
2066        for (project_key, buckets) in buckets {
2067            self.handle_process_metrics(
2068                cogs,
2069                ProcessMetrics {
2070                    data: MetricData::Parsed(buckets),
2071                    project_key,
2072                    source,
2073                    received_at,
2074                    sent_at,
2075                },
2076            )
2077        }
2078    }
2079
    /// Submits processed data either directly to the store (in processing mode) or as a
    /// serialized envelope to the upstream HTTP endpoint.
    fn submit_upstream(&self, cogs: &mut Token, submit: Submit<'_>) {
        let _submit = cogs.start_category("submit");

        // In processing mode with a store forwarder configured, produce directly to the
        // store instead of sending upstream.
        #[cfg(feature = "processing")]
        if self.inner.config.processing_enabled()
            && let Some(store_forwarder) = &self.inner.addrs.store_forwarder
        {
            use crate::processing::StoreHandle;

            let objectstore = self.inner.addrs.objectstore.as_ref();
            let global_config = &self.inner.global_config.current();
            let handle = StoreHandle::new(store_forwarder, objectstore, global_config);

            match submit {
                // Once check-ins and errors are fully moved to the new pipeline, this is only
                // used for metrics forwarding.
                //
                // Metrics forwarding will _never_ forward an envelope in processing, making
                // this branch here unused.
                Submit::Envelope(envelope) => handle.send_envelope(envelope.into_inner()),
                Submit::Output { output, ctx } => output
                    .forward_store(handle, ctx)
                    .unwrap_or_else(|err| err.into_inner()),
            }
            return;
        }

        // Non-processing path: everything is serialized into an envelope for the upstream.
        let mut envelope = match submit {
            Submit::Envelope(envelope) => envelope,
            Submit::Output { output, ctx } => match output.serialize_envelope(ctx) {
                Ok(envelope) => ManagedEnvelope::from(envelope).into_processed(),
                Err(_) => {
                    relay_log::error!("failed to serialize output to an envelope");
                    return;
                }
            },
        };

        // Nothing left to send; record the envelope as accepted.
        if envelope.envelope_mut().is_empty() {
            envelope.accept();
            return;
        }

        // Override the `sent_at` timestamp. Since the envelope went through basic
        // normalization, all timestamps have been corrected. We propagate the new
        // `sent_at` to allow the next Relay to double-check this timestamp and
        // potentially apply correction again. This is done as close to sending as
        // possible so that we avoid internal delays.
        envelope.envelope_mut().set_sent_at(Utc::now());

        relay_log::trace!("sending envelope to sentry endpoint");
        let http_encoding = self.inner.config.http_encoding();
        // Serialize the envelope and compress the payload with the configured HTTP encoding.
        let result = envelope.envelope().to_vec().and_then(|v| {
            encode_payload(&v.into(), http_encoding).map_err(EnvelopeError::PayloadIoFailed)
        });

        match result {
            Ok(body) => {
                self.inner
                    .addrs
                    .upstream_relay
                    .send(SendRequest(SendEnvelope {
                        envelope,
                        body,
                        http_encoding,
                        project_cache: self.inner.project_cache.clone(),
                    }));
            }
            Err(error) => {
                // Errors are only logged for what we consider an internal discard reason. These
                // indicate errors in the infrastructure or implementation bugs.
                relay_log::error!(
                    error = &error as &dyn Error,
                    tags.project_key = %envelope.scoping().project_key,
                    "failed to serialize envelope payload"
                );

                envelope.reject(Outcome::Invalid(DiscardReason::Internal));
            }
        }
    }
2161
2162    fn handle_submit_client_reports(&self, cogs: &mut Token, message: SubmitClientReports) {
2163        let SubmitClientReports {
2164            client_reports,
2165            scoping,
2166        } = message;
2167
2168        let upstream = self.inner.config.upstream_descriptor();
2169        let dsn = PartialDsn::outbound(&scoping, upstream);
2170
2171        let mut envelope = Envelope::from_request(None, RequestMeta::outbound(dsn));
2172        for client_report in client_reports {
2173            match client_report.serialize() {
2174                Ok(payload) => {
2175                    let mut item = Item::new(ItemType::ClientReport);
2176                    item.set_payload(ContentType::Json, payload);
2177                    envelope.add_item(item);
2178                }
2179                Err(error) => {
2180                    relay_log::error!(
2181                        error = &error as &dyn std::error::Error,
2182                        "failed to serialize client report"
2183                    );
2184                }
2185            }
2186        }
2187
2188        let envelope = ManagedEnvelope::new(envelope, self.inner.addrs.outcome_aggregator.clone());
2189        self.submit_upstream(cogs, Submit::Envelope(envelope.into_processed()));
2190    }
2191
2192    fn check_buckets(
2193        &self,
2194        project_key: ProjectKey,
2195        project_info: &ProjectInfo,
2196        rate_limits: &RateLimits,
2197        buckets: Vec<Bucket>,
2198    ) -> Vec<Bucket> {
2199        let Some(scoping) = project_info.scoping(project_key) else {
2200            relay_log::error!(
2201                tags.project_key = project_key.as_str(),
2202                "there is no scoping: dropping {} buckets",
2203                buckets.len(),
2204            );
2205            return Vec::new();
2206        };
2207
2208        let mut buckets = self::metrics::apply_project_info(
2209            buckets,
2210            &self.inner.metric_outcomes,
2211            project_info,
2212            scoping,
2213        );
2214
2215        let namespaces: BTreeSet<MetricNamespace> = buckets
2216            .iter()
2217            .filter_map(|bucket| bucket.name.try_namespace())
2218            .collect();
2219
2220        for namespace in namespaces {
2221            let limits = rate_limits.check_with_quotas(
2222                project_info.get_quotas(),
2223                scoping.item(DataCategory::MetricBucket),
2224            );
2225
2226            if limits.is_limited() {
2227                let rejected;
2228                (buckets, rejected) = utils::split_off(buckets, |bucket| {
2229                    bucket.name.try_namespace() == Some(namespace)
2230                });
2231
2232                let reason_code = limits.longest().and_then(|limit| limit.reason_code.clone());
2233                self.inner.metric_outcomes.track(
2234                    scoping,
2235                    &rejected,
2236                    Outcome::RateLimited(reason_code),
2237                );
2238            }
2239        }
2240
2241        let quotas = project_info.config.quotas.clone();
2242        match MetricsLimiter::create(buckets, quotas, scoping) {
2243            Ok(mut bucket_limiter) => {
2244                bucket_limiter.enforce_limits(rate_limits, &self.inner.metric_outcomes);
2245                bucket_limiter.into_buckets()
2246            }
2247            Err(buckets) => buckets,
2248        }
2249    }
2250
    /// Applies namespace-scoped rate limits to metric buckets via the configured rate limiter.
    ///
    /// Rejected buckets are tracked as rate-limited outcomes and newly discovered limits are
    /// merged back into the project cache. Returns the buckets that remain accepted.
    #[cfg(feature = "processing")]
    async fn rate_limit_buckets(
        &self,
        scoping: Scoping,
        project_info: &ProjectInfo,
        mut buckets: Vec<Bucket>,
    ) -> Vec<Bucket> {
        // Without a rate limiter (e.g. Redis not configured), pass everything through.
        let Some(rate_limiter) = &self.inner.rate_limiter else {
            return buckets;
        };

        let global_config = self.inner.global_config.current();
        // Count buckets per namespace so each namespace is checked with its own quantity.
        let namespaces = buckets
            .iter()
            .filter_map(|bucket| bucket.name.try_namespace())
            .counts();

        let quotas = CombinedQuotas::new(&global_config, project_info.get_quotas());

        for (namespace, quantity) in namespaces {
            let item_scoping = scoping.metric_bucket(namespace);

            let limits = match rate_limiter
                .is_rate_limited(quotas, item_scoping, quantity, false)
                .await
            {
                Ok(limits) => limits,
                Err(err) => {
                    // On limiter failure, stop checking and let remaining buckets through
                    // rather than dropping data spuriously.
                    relay_log::error!(
                        error = &err as &dyn std::error::Error,
                        "failed to check redis rate limits"
                    );
                    break;
                }
            };

            if limits.is_limited() {
                // Split off and reject all buckets of the limited namespace.
                let rejected;
                (buckets, rejected) = utils::split_off(buckets, |bucket| {
                    bucket.name.try_namespace() == Some(namespace)
                });

                let reason_code = limits.longest().and_then(|limit| limit.reason_code.clone());
                self.inner.metric_outcomes.track(
                    scoping,
                    &rejected,
                    Outcome::RateLimited(reason_code),
                );

                // Cache the limits so subsequent envelopes are limited without a round trip.
                self.inner
                    .project_cache
                    .get(item_scoping.scoping.project_key)
                    .rate_limits()
                    .merge(limits);
            }
        }

        // Apply transaction/span-based limits to whatever survived the namespace checks.
        match MetricsLimiter::create(buckets, project_info.config.quotas.clone(), scoping) {
            Err(buckets) => buckets,
            Ok(bucket_limiter) => self.apply_other_rate_limits(bucket_limiter).await,
        }
    }
2313
    /// Check and apply rate limits to metrics buckets for transactions and spans.
    ///
    /// Consults the rate limiter for the transaction and span counts carried by the
    /// buckets, enforces any resulting limits, and merges them into the project cache.
    /// Returns the remaining buckets.
    #[cfg(feature = "processing")]
    async fn apply_other_rate_limits(&self, mut bucket_limiter: MetricsLimiter) -> Vec<Bucket> {
        relay_log::trace!("handle_rate_limit_buckets");

        let scoping = *bucket_limiter.scoping();

        if let Some(rate_limiter) = self.inner.rate_limiter.as_ref() {
            let global_config = self.inner.global_config.current();
            let quotas = CombinedQuotas::new(&global_config, bucket_limiter.quotas());

            // We set over_accept_once such that the limit is actually reached, which allows subsequent
            // calls with quantity=0 to be rate limited.
            let over_accept_once = true;
            let mut rate_limits = RateLimits::new();

            for category in [DataCategory::Transaction, DataCategory::Span] {
                // `None` means the buckets carry no count for this category.
                let count = bucket_limiter.count(category);

                let timer = Instant::now();
                let mut is_limited = false;

                if let Some(count) = count {
                    match rate_limiter
                        .is_rate_limited(quotas, scoping.item(category), count, over_accept_once)
                        .await
                    {
                        Ok(limits) => {
                            is_limited = limits.is_limited();
                            rate_limits.merge(limits)
                        }
                        Err(e) => {
                            relay_log::error!(error = &e as &dyn Error, "rate limiting error")
                        }
                    }
                }

                relay_statsd::metric!(
                    timer(RelayTimers::RateLimitBucketsDuration) = timer.elapsed(),
                    category = category.name(),
                    limited = if is_limited { "true" } else { "false" },
                    // Bucketize the count for the metric tag. The range patterns overlap,
                    // but match arms are tried top to bottom, so each count lands in the
                    // first (smallest) bucket that contains it.
                    count = match count {
                        None => "none",
                        Some(0) => "0",
                        Some(1) => "1",
                        Some(1..=10) => "10",
                        Some(1..=25) => "25",
                        Some(1..=50) => "50",
                        Some(51..=100) => "100",
                        Some(101..=500) => "500",
                        _ => "> 500",
                    },
                );
            }

            if rate_limits.is_limited() {
                let was_enforced =
                    bucket_limiter.enforce_limits(&rate_limits, &self.inner.metric_outcomes);

                if was_enforced {
                    // Update the rate limits in the project cache.
                    self.inner
                        .project_cache
                        .get(scoping.project_key)
                        .rate_limits()
                        .merge(rate_limits);
                }
            }
        }

        bucket_limiter.into_buckets()
    }
2386
    /// Cardinality limits the passed buckets and returns a filtered vector of only accepted buckets.
    ///
    /// Depending on the global `cardinality_limiter_mode`:
    ///  - `Disabled`: buckets pass through unmodified.
    ///  - `Passive`: limits are computed and reported, but all buckets are returned.
    ///  - otherwise: rejected buckets are dropped with a `CardinalityLimited` outcome.
    #[cfg(feature = "processing")]
    async fn cardinality_limit_buckets(
        &self,
        scoping: Scoping,
        limits: &[CardinalityLimit],
        buckets: Vec<Bucket>,
    ) -> Vec<Bucket> {
        let global_config = self.inner.global_config.current();
        let cardinality_limiter_mode = global_config.options.cardinality_limiter_mode;

        // Fast path: limiting disabled globally, or no limiter configured on this Relay.
        if matches!(cardinality_limiter_mode, CardinalityLimiterMode::Disabled) {
            return buckets;
        }

        let Some(ref limiter) = self.inner.cardinality_limiter else {
            return buckets;
        };

        let scope = relay_cardinality::Scoping {
            organization_id: scoping.organization_id,
            project_id: scoping.project_id,
        };

        // On limiter failure the original buckets are handed back in the error and
        // pass through unlimited (fail open).
        let limits = match limiter
            .check_cardinality_limits(scope, limits, buckets)
            .await
        {
            Ok(limits) => limits,
            Err((buckets, error)) => {
                relay_log::error!(
                    error = &error as &dyn std::error::Error,
                    "cardinality limiter failed"
                );
                return buckets;
            }
        };

        // Sampled error reporting for exceeded limits, tagged by organization.
        let error_sample_rate = global_config.options.cardinality_limiter_error_sample_rate;
        if !limits.exceeded_limits().is_empty() && utils::sample(error_sample_rate).is_keep() {
            for limit in limits.exceeded_limits() {
                relay_log::with_scope(
                    |scope| {
                        // Set the organization as user so we can alert on distinct org_ids.
                        scope.set_user(Some(relay_log::sentry::User {
                            id: Some(scoping.organization_id.to_string()),
                            ..Default::default()
                        }));
                    },
                    || {
                        relay_log::error!(
                            tags.organization_id = scoping.organization_id.value(),
                            tags.limit_id = limit.id,
                            tags.passive = limit.passive,
                            "Cardinality Limit"
                        );
                    },
                );
            }
        }

        // Emit cardinality reports regardless of mode.
        for (limit, reports) in limits.cardinality_reports() {
            for report in reports {
                self.inner
                    .metric_outcomes
                    .cardinality(scoping, limit, report);
            }
        }

        // In passive mode, limits are only observed — return all source buckets.
        if matches!(cardinality_limiter_mode, CardinalityLimiterMode::Passive) {
            return limits.into_source();
        }

        let CardinalityLimitsSplit { accepted, rejected } = limits.into_split();

        // Track an outcome per rejected bucket, attributed to the exceeded limit.
        for (bucket, exceeded) in rejected {
            self.inner.metric_outcomes.track(
                scoping,
                &[bucket],
                Outcome::CardinalityLimited(exceeded.id.clone()),
            );
        }
        accepted
    }
2471
2472    /// Processes metric buckets and sends them to kafka.
2473    ///
2474    /// This function runs the following steps:
2475    ///  - cardinality limiting
2476    ///  - rate limiting
2477    ///  - submit to `StoreForwarder`
2478    #[cfg(feature = "processing")]
2479    async fn encode_metrics_processing(
2480        &self,
2481        message: FlushBuckets,
2482        store_forwarder: &Addr<Store>,
2483    ) {
2484        use crate::constants::DEFAULT_EVENT_RETENTION;
2485        use crate::services::store::StoreMetrics;
2486
2487        for ProjectBuckets {
2488            buckets,
2489            scoping,
2490            project_info,
2491            ..
2492        } in message.buckets.into_values()
2493        {
2494            let buckets = self
2495                .rate_limit_buckets(scoping, &project_info, buckets)
2496                .await;
2497
2498            let limits = project_info.get_cardinality_limits();
2499            let buckets = self
2500                .cardinality_limit_buckets(scoping, limits, buckets)
2501                .await;
2502
2503            if buckets.is_empty() {
2504                continue;
2505            }
2506
2507            let retention = project_info
2508                .config
2509                .event_retention
2510                .unwrap_or(DEFAULT_EVENT_RETENTION);
2511
2512            // The store forwarder takes care of bucket splitting internally, so we can submit the
2513            // entire list of buckets. There is no batching needed here.
2514            store_forwarder.send(StoreMetrics {
2515                buckets,
2516                scoping,
2517                retention,
2518            });
2519        }
2520    }
2521
2522    /// Serializes metric buckets to JSON and sends them to the upstream.
2523    ///
2524    /// This function runs the following steps:
2525    ///  - partitioning
2526    ///  - batching by configured size limit
2527    ///  - serialize to JSON and pack in an envelope
2528    ///  - submit the envelope to upstream or kafka depending on configuration
2529    ///
2530    /// Cardinality limiting and rate limiting run only in processing Relays as they both require
2531    /// access to the central Redis instance. Cached rate limits are applied in the project cache
2532    /// already.
2533    fn encode_metrics_envelope(&self, cogs: &mut Token, message: FlushBuckets) {
2534        let FlushBuckets {
2535            partition_key,
2536            buckets,
2537        } = message;
2538
2539        let batch_size = self.inner.config.metrics_max_batch_size_bytes();
2540        let upstream = self.inner.config.upstream_descriptor();
2541
2542        for ProjectBuckets {
2543            buckets, scoping, ..
2544        } in buckets.values()
2545        {
2546            let dsn = PartialDsn::outbound(scoping, upstream);
2547
2548            relay_statsd::metric!(
2549                distribution(RelayDistributions::PartitionKeys) = u64::from(partition_key)
2550            );
2551
2552            let mut num_batches = 0;
2553            for batch in BucketsView::from(buckets).by_size(batch_size) {
2554                let mut envelope = Envelope::from_request(None, RequestMeta::outbound(dsn.clone()));
2555
2556                let mut item = Item::new(ItemType::MetricBuckets);
2557                item.set_source_quantities(crate::metrics::extract_quantities(batch));
2558                item.set_payload(ContentType::Json, serde_json::to_vec(&buckets).unwrap());
2559                envelope.add_item(item);
2560
2561                let mut envelope =
2562                    ManagedEnvelope::new(envelope, self.inner.addrs.outcome_aggregator.clone());
2563                envelope
2564                    .set_partition_key(Some(partition_key))
2565                    .scope(*scoping);
2566
2567                relay_statsd::metric!(
2568                    distribution(RelayDistributions::BucketsPerBatch) = batch.len() as u64
2569                );
2570
2571                self.submit_upstream(cogs, Submit::Envelope(envelope.into_processed()));
2572                num_batches += 1;
2573            }
2574
2575            relay_statsd::metric!(
2576                distribution(RelayDistributions::BatchesPerPartition) = num_batches
2577            );
2578        }
2579    }
2580
2581    /// Creates a [`SendMetricsRequest`] and sends it to the upstream relay.
2582    fn send_global_partition(&self, partition_key: u32, partition: &mut Partition<'_>) {
2583        if partition.is_empty() {
2584            return;
2585        }
2586
2587        let (unencoded, project_info) = partition.take();
2588        let http_encoding = self.inner.config.http_encoding();
2589        let encoded = match encode_payload(&unencoded, http_encoding) {
2590            Ok(payload) => payload,
2591            Err(error) => {
2592                let error = &error as &dyn std::error::Error;
2593                relay_log::error!(error, "failed to encode metrics payload");
2594                return;
2595            }
2596        };
2597
2598        let request = SendMetricsRequest {
2599            partition_key: partition_key.to_string(),
2600            unencoded,
2601            encoded,
2602            project_info,
2603            http_encoding,
2604            metric_outcomes: self.inner.metric_outcomes.clone(),
2605        };
2606
2607        self.inner.addrs.upstream_relay.send(SendRequest(request));
2608    }
2609
2610    /// Serializes metric buckets to JSON and sends them to the upstream via the global endpoint.
2611    ///
2612    /// This function is similar to [`Self::encode_metrics_envelope`], but sends a global batched
2613    /// payload directly instead of per-project Envelopes.
2614    ///
2615    /// This function runs the following steps:
2616    ///  - partitioning
2617    ///  - batching by configured size limit
2618    ///  - serialize to JSON
2619    ///  - submit directly to the upstream
2620    ///
2621    /// Cardinality limiting and rate limiting run only in processing Relays as they both require
2622    /// access to the central Redis instance. Cached rate limits are applied in the project cache
2623    /// already.
2624    fn encode_metrics_global(&self, message: FlushBuckets) {
2625        let FlushBuckets {
2626            partition_key,
2627            buckets,
2628        } = message;
2629
2630        let batch_size = self.inner.config.metrics_max_batch_size_bytes();
2631        let mut partition = Partition::new(batch_size);
2632        let mut partition_splits = 0;
2633
2634        for ProjectBuckets {
2635            buckets, scoping, ..
2636        } in buckets.values()
2637        {
2638            for bucket in buckets {
2639                let mut remaining = Some(BucketView::new(bucket));
2640
2641                while let Some(bucket) = remaining.take() {
2642                    if let Some(next) = partition.insert(bucket, *scoping) {
2643                        // A part of the bucket could not be inserted. Take the partition and submit
2644                        // it immediately. Repeat until the final part was inserted. This should
2645                        // always result in a request, otherwise we would enter an endless loop.
2646                        self.send_global_partition(partition_key, &mut partition);
2647                        remaining = Some(next);
2648                        partition_splits += 1;
2649                    }
2650                }
2651            }
2652        }
2653
2654        if partition_splits > 0 {
2655            metric!(distribution(RelayDistributions::PartitionSplits) = partition_splits);
2656        }
2657
2658        self.send_global_partition(partition_key, &mut partition);
2659    }
2660
2661    async fn handle_flush_buckets(&self, cogs: &mut Token, mut message: FlushBuckets) {
2662        for (project_key, pb) in message.buckets.iter_mut() {
2663            let buckets = std::mem::take(&mut pb.buckets);
2664            pb.buckets =
2665                self.check_buckets(*project_key, &pb.project_info, &pb.rate_limits, buckets);
2666        }
2667
2668        #[cfg(feature = "processing")]
2669        if self.inner.config.processing_enabled()
2670            && let Some(ref store_forwarder) = self.inner.addrs.store_forwarder
2671        {
2672            return self
2673                .encode_metrics_processing(message, store_forwarder)
2674                .await;
2675        }
2676
2677        if self.inner.config.http_global_metrics() {
2678            self.encode_metrics_global(message)
2679        } else {
2680            self.encode_metrics_envelope(cogs, message)
2681        }
2682    }
2683
2684    #[cfg(all(test, feature = "processing"))]
2685    fn redis_rate_limiter_enabled(&self) -> bool {
2686        self.inner.rate_limiter.is_some()
2687    }
2688
2689    async fn handle_message(&self, message: EnvelopeProcessor) {
2690        let ty = message.variant();
2691        let feature_weights = self.feature_weights(&message);
2692
2693        metric!(timer(RelayTimers::ProcessMessageDuration), message = ty, {
2694            let mut cogs = self.inner.cogs.timed(ResourceId::Relay, feature_weights);
2695
2696            match message {
2697                EnvelopeProcessor::ProcessEnvelope(m) => {
2698                    self.handle_process_envelope(&mut cogs, *m).await
2699                }
2700                EnvelopeProcessor::ProcessProjectMetrics(m) => {
2701                    self.handle_process_metrics(&mut cogs, *m)
2702                }
2703                EnvelopeProcessor::ProcessBatchedMetrics(m) => {
2704                    self.handle_process_batched_metrics(&mut cogs, *m)
2705                }
2706                EnvelopeProcessor::FlushBuckets(m) => {
2707                    self.handle_flush_buckets(&mut cogs, *m).await
2708                }
2709                EnvelopeProcessor::SubmitClientReports(m) => {
2710                    self.handle_submit_client_reports(&mut cogs, *m)
2711                }
2712            }
2713        });
2714    }
2715
2716    fn feature_weights(&self, message: &EnvelopeProcessor) -> FeatureWeights {
2717        match message {
2718            // Envelope is split later and tokens are attributed then.
2719            EnvelopeProcessor::ProcessEnvelope(_) => AppFeature::Unattributed.into(),
2720            EnvelopeProcessor::ProcessProjectMetrics(_) => AppFeature::Unattributed.into(),
2721            EnvelopeProcessor::ProcessBatchedMetrics(_) => AppFeature::Unattributed.into(),
2722            EnvelopeProcessor::FlushBuckets(v) => v
2723                .buckets
2724                .values()
2725                .map(|s| {
2726                    if self.inner.config.processing_enabled() {
2727                        // Processing does not encode the metrics but instead rate and cardinality
2728                        // limits the metrics, which scales by count and not size.
2729                        relay_metrics::cogs::ByCount(&s.buckets).into()
2730                    } else {
2731                        relay_metrics::cogs::BySize(&s.buckets).into()
2732                    }
2733                })
2734                .fold(FeatureWeights::none(), FeatureWeights::merge),
2735            EnvelopeProcessor::SubmitClientReports(_) => AppFeature::ClientReports.into(),
2736        }
2737    }
2738}
2739
2740impl Service for EnvelopeProcessorService {
2741    type Interface = EnvelopeProcessor;
2742
2743    async fn run(self, mut rx: relay_system::Receiver<Self::Interface>) {
2744        while let Some(message) = rx.recv().await {
2745            let service = self.clone();
2746            self.inner
2747                .pool
2748                .spawn_async(
2749                    async move {
2750                        service.handle_message(message).await;
2751                    }
2752                    .boxed(),
2753                )
2754                .await;
2755        }
2756    }
2757}
2758
/// Result of the enforcement of rate limiting.
///
/// If the event is already `None` or it's rate limited, it will be `None`
/// within the [`Annotated`].
struct EnforcementResult {
    /// The event remaining after enforcement; empty if it was rate limited.
    event: Annotated<Event>,
    /// Rate limits that applied during enforcement; only read with the
    /// `processing` feature enabled.
    #[cfg_attr(not(feature = "processing"), expect(dead_code))]
    rate_limits: RateLimits,
}
2768
2769impl EnforcementResult {
2770    /// Creates a new [`EnforcementResult`].
2771    pub fn new(event: Annotated<Event>, rate_limits: RateLimits) -> Self {
2772        Self { event, rate_limits }
2773    }
2774}
2775
/// Strategy used when enforcing rate limits on an envelope.
#[derive(Clone)]
enum RateLimiter {
    /// Checks only rate limits already cached on this Relay.
    Cached,
    /// Checks limits consistently against the shared Redis instance
    /// (processing Relays only).
    #[cfg(feature = "processing")]
    Consistent(Arc<RedisRateLimiter>),
}
2782
impl RateLimiter {
    /// Enforces rate limits on the envelope and the already-extracted event.
    ///
    /// Computes applicable limits via the `EnvelopeLimiter`: cached limits for
    /// [`RateLimiter::Cached`], or a Redis check for [`RateLimiter::Consistent`]
    /// (processing feature only). Enforcement removes limited items from the
    /// envelope, records outcomes, and drops the event if its category is limited.
    ///
    /// Returns the surviving event together with the rate limits that applied.
    async fn enforce<Group>(
        &self,
        managed_envelope: &mut TypedEnvelope<Group>,
        event: Annotated<Event>,
        _extracted_metrics: &mut ProcessingExtractedMetrics,
        ctx: processing::Context<'_>,
    ) -> Result<EnforcementResult, ProcessingError> {
        // Nothing to limit: neither envelope items nor an event are present.
        if managed_envelope.envelope().is_empty() && event.value().is_none() {
            return Ok(EnforcementResult::new(event, RateLimits::default()));
        }

        // Without any configured quotas there is nothing to enforce.
        let quotas = CombinedQuotas::new(ctx.global_config, ctx.project_info.get_quotas());
        if quotas.is_empty() {
            return Ok(EnforcementResult::new(event, RateLimits::default()));
        }

        let event_category = event_category(&event);

        // We extract the rate limiters, in case we perform consistent rate limiting, since we will
        // need Redis access.
        //
        // When invoking the rate limiter, capture if the event item has been rate limited to also
        // remove it from the processing state eventually.
        let this = self.clone();
        let mut envelope_limiter =
            EnvelopeLimiter::new(CheckLimits::All, move |item_scope, _quantity| {
                let this = this.clone();

                async move {
                    match this {
                        #[cfg(feature = "processing")]
                        RateLimiter::Consistent(rate_limiter) => Ok::<_, ProcessingError>(
                            rate_limiter
                                .is_rate_limited(quotas, item_scope, _quantity, false)
                                .await?,
                        ),
                        // Cached mode: check against limits already known to this Relay.
                        _ => Ok::<_, ProcessingError>(
                            ctx.rate_limits.check_with_quotas(quotas, item_scope),
                        ),
                    }
                }
            });

        // Tell the envelope limiter about the event, since it has been removed from the Envelope at
        // this stage in processing.
        if let Some(category) = event_category {
            envelope_limiter.assume_event(category);
        }

        let scoping = managed_envelope.scoping();
        let (enforcement, rate_limits) = metric!(timer(RelayTimers::EventProcessingRateLimiting), type = self.name(), {
            envelope_limiter
                .compute(managed_envelope.envelope_mut(), &scoping)
                .await
        })?;
        let event_active = enforcement.is_event_active();

        // Use the same rate limits as used for the envelope on the metrics.
        // Those rate limits should not be checked for expiry or similar to ensure a consistent
        // limiting of envelope items and metrics.
        #[cfg(feature = "processing")]
        _extracted_metrics.apply_enforcement(&enforcement, matches!(self, Self::Consistent(_)));
        enforcement.apply_with_outcomes(managed_envelope);

        // If the event itself was limited, it must not be returned to the caller.
        if event_active {
            debug_assert!(managed_envelope.envelope().is_empty());
            return Ok(EnforcementResult::new(Annotated::empty(), rate_limits));
        }

        Ok(EnforcementResult::new(event, rate_limits))
    }

    /// Returns the metric tag value identifying this rate limiting strategy.
    fn name(&self) -> &'static str {
        match self {
            Self::Cached => "cached",
            #[cfg(feature = "processing")]
            Self::Consistent(_) => "consistent",
        }
    }
}
2864
2865pub fn encode_payload(body: &Bytes, http_encoding: HttpEncoding) -> Result<Bytes, std::io::Error> {
2866    let envelope_body: Vec<u8> = match http_encoding {
2867        HttpEncoding::Identity => return Ok(body.clone()),
2868        HttpEncoding::Deflate => {
2869            let mut encoder = ZlibEncoder::new(Vec::new(), Compression::default());
2870            encoder.write_all(body.as_ref())?;
2871            encoder.finish()?
2872        }
2873        HttpEncoding::Gzip => {
2874            let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
2875            encoder.write_all(body.as_ref())?;
2876            encoder.finish()?
2877        }
2878        HttpEncoding::Br => {
2879            // Use default buffer size (via 0), medium quality (5), and the default lgwin (22).
2880            let mut encoder = BrotliEncoder::new(Vec::new(), 0, 5, 22);
2881            encoder.write_all(body.as_ref())?;
2882            encoder.into_inner()
2883        }
2884        HttpEncoding::Zstd => {
2885            // Use the fastest compression level, our main objective here is to get the best
2886            // compression ratio for least amount of time spent.
2887            let mut encoder = ZstdEncoder::new(Vec::new(), 1)?;
2888            encoder.write_all(body.as_ref())?;
2889            encoder.finish()?
2890        }
2891    };
2892
2893    Ok(envelope_body.into())
2894}
2895
/// An upstream request that submits an envelope via HTTP.
#[derive(Debug)]
pub struct SendEnvelope {
    /// The envelope being submitted; provides scoping, request metadata, and
    /// outcome accounting.
    pub envelope: TypedEnvelope<Processed>,
    /// Serialized request body; sent with `http_encoding` declared as the
    /// content encoding.
    pub body: Bytes,
    /// HTTP content encoding declared for `body`.
    pub http_encoding: HttpEncoding,
    /// Handle used to merge rate limits returned by the upstream back into the
    /// project cache.
    pub project_cache: ProjectCacheHandle,
}
2904
impl UpstreamRequest for SendEnvelope {
    fn method(&self) -> reqwest::Method {
        reqwest::Method::POST
    }

    fn path(&self) -> Cow<'_, str> {
        format!("/api/{}/envelope/", self.envelope.scoping().project_id).into()
    }

    fn route(&self) -> &'static str {
        "envelope"
    }

    /// Builds the HTTP request, forwarding the original client metadata as
    /// headers and attaching the serialized envelope body.
    fn build(&mut self, builder: &mut http::RequestBuilder) -> Result<(), http::HttpError> {
        let envelope_body = self.body.clone();
        metric!(
            distribution(RelayDistributions::UpstreamEnvelopeBodySize) = envelope_body.len() as u64
        );

        let meta = &self.envelope.meta();
        // Forward the partition key so the upstream can shard consistently.
        let shard = self.envelope.partition_key().map(|p| p.to_string());
        builder
            .content_encoding(self.http_encoding)
            .header_opt("Origin", meta.origin().map(|url| url.as_str()))
            .header_opt("User-Agent", meta.user_agent())
            .header("X-Sentry-Auth", meta.auth_header())
            .header("X-Forwarded-For", meta.forwarded_for())
            .header("Content-Type", envelope::CONTENT_TYPE)
            .header_opt("X-Sentry-Relay-Shard", shard)
            .body(envelope_body);

        Ok(())
    }

    fn sign(&mut self) -> Option<Sign> {
        Some(Sign::Optional(SignatureType::RequestSign))
    }

    /// Resolves envelope outcomes based on the upstream response.
    ///
    /// The envelope is accepted on success and also on errors where the upstream
    /// received the request (the upstream then owns outcome logging); rate limit
    /// responses are additionally merged into the project cache. All other errors
    /// reject the envelope with an internal discard reason.
    fn respond(
        self: Box<Self>,
        result: Result<http::Response, UpstreamRequestError>,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> {
        Box::pin(async move {
            let result = match result {
                Ok(mut response) => response.consume().await.map_err(UpstreamRequestError::Http),
                Err(error) => Err(error),
            };

            match result {
                Ok(()) => self.envelope.accept(),
                Err(error) if error.is_received() => {
                    let scoping = self.envelope.scoping();
                    self.envelope.accept();

                    if let UpstreamRequestError::RateLimited(limits) = error {
                        self.project_cache
                            .get(scoping.project_key)
                            .rate_limits()
                            .merge(limits.scope(&scoping));
                    }
                }
                Err(error) => {
                    // Errors are only logged for what we consider an internal discard reason. These
                    // indicate errors in the infrastructure or implementation bugs.
                    let mut envelope = self.envelope;
                    envelope.reject(Outcome::Invalid(DiscardReason::Internal));
                    relay_log::error!(
                        error = &error as &dyn Error,
                        tags.project_key = %envelope.scoping().project_key,
                        "error sending envelope"
                    );
                }
            }
        })
    }
}
2981
/// A container for metric buckets from multiple projects.
///
/// This container is used to send metrics to the upstream in global batches as part of the
/// [`FlushBuckets`] message if the `http.global_metrics` option is enabled. The container monitors
/// the size of all metrics and allows to split them into multiple batches. See
/// [`insert`](Self::insert) for more information.
#[derive(Debug)]
struct Partition<'a> {
    /// Maximum payload size of this partition in bytes.
    max_size: usize,
    /// Remaining byte budget before the partition is full.
    remaining: usize,
    /// Bucket views collected so far, grouped by project key.
    views: HashMap<ProjectKey, Vec<BucketView<'a>>>,
    /// Scoping per contained project key, returned alongside the payload by
    /// [`take`](Self::take).
    project_info: HashMap<ProjectKey, Scoping>,
}
2995
2996impl<'a> Partition<'a> {
2997    /// Creates a new partition with the given maximum size in bytes.
2998    pub fn new(size: usize) -> Self {
2999        Self {
3000            max_size: size,
3001            remaining: size,
3002            views: HashMap::new(),
3003            project_info: HashMap::new(),
3004        }
3005    }
3006
3007    /// Inserts a bucket into the partition, splitting it if necessary.
3008    ///
3009    /// This function attempts to add the bucket to this partition. If the bucket does not fit
3010    /// entirely into the partition given its maximum size, the remaining part of the bucket is
3011    /// returned from this function call.
3012    ///
3013    /// If this function returns `Some(_)`, the partition is full and should be submitted to the
3014    /// upstream immediately. Use [`Self::take`] to retrieve the contents of the
3015    /// partition. Afterwards, the caller is responsible to call this function again with the
3016    /// remaining bucket until it is fully inserted.
3017    pub fn insert(&mut self, bucket: BucketView<'a>, scoping: Scoping) -> Option<BucketView<'a>> {
3018        let (current, next) = bucket.split(self.remaining, Some(self.max_size));
3019
3020        if let Some(current) = current {
3021            self.remaining = self.remaining.saturating_sub(current.estimated_size());
3022            self.views
3023                .entry(scoping.project_key)
3024                .or_default()
3025                .push(current);
3026
3027            self.project_info
3028                .entry(scoping.project_key)
3029                .or_insert(scoping);
3030        }
3031
3032        next
3033    }
3034
3035    /// Returns `true` if the partition does not hold any data.
3036    fn is_empty(&self) -> bool {
3037        self.views.is_empty()
3038    }
3039
3040    /// Returns the serialized buckets for this partition.
3041    ///
3042    /// This empties the partition, so that it can be reused.
3043    fn take(&mut self) -> (Bytes, HashMap<ProjectKey, Scoping>) {
3044        #[derive(serde::Serialize)]
3045        struct Wrapper<'a> {
3046            buckets: &'a HashMap<ProjectKey, Vec<BucketView<'a>>>,
3047        }
3048
3049        let buckets = &self.views;
3050        let payload = serde_json::to_vec(&Wrapper { buckets }).unwrap().into();
3051
3052        let scopings = self.project_info.clone();
3053        self.project_info.clear();
3054
3055        self.views.clear();
3056        self.remaining = self.max_size;
3057
3058        (payload, scopings)
3059    }
3060}
3061
/// An upstream request that submits metric buckets via HTTP.
///
/// This request is not awaited. It automatically tracks outcomes if the request is not received.
#[derive(Debug)]
struct SendMetricsRequest {
    /// The partition key, transmitted via the `X-Sentry-Relay-Shard` header.
    partition_key: String,
    /// Serialized metric buckets without encoding applied, used for signing.
    unencoded: Bytes,
    /// Serialized metric buckets with the stated HTTP encoding applied.
    encoded: Bytes,
    /// Mapping of all contained project keys to their scoping and extraction mode.
    ///
    /// Used to track outcomes for transmission failures.
    project_info: HashMap<ProjectKey, Scoping>,
    /// Encoding (compression) of the payload.
    http_encoding: HttpEncoding,
    /// Metric outcomes instance to send outcomes on error.
    metric_outcomes: MetricOutcomes,
}
3082
3083impl SendMetricsRequest {
3084    fn create_error_outcomes(self) {
3085        #[derive(serde::Deserialize)]
3086        struct Wrapper {
3087            buckets: HashMap<ProjectKey, Vec<MinimalTrackableBucket>>,
3088        }
3089
3090        let buckets = match serde_json::from_slice(&self.unencoded) {
3091            Ok(Wrapper { buckets }) => buckets,
3092            Err(err) => {
3093                relay_log::error!(
3094                    error = &err as &dyn std::error::Error,
3095                    "failed to parse buckets from failed transmission"
3096                );
3097                return;
3098            }
3099        };
3100
3101        for (key, buckets) in buckets {
3102            let Some(&scoping) = self.project_info.get(&key) else {
3103                relay_log::error!("missing scoping for project key");
3104                continue;
3105            };
3106
3107            self.metric_outcomes.track(
3108                scoping,
3109                &buckets,
3110                Outcome::Invalid(DiscardReason::Internal),
3111            );
3112        }
3113    }
3114}
3115
impl UpstreamRequest for SendMetricsRequest {
    fn set_relay_id(&self) -> bool {
        true
    }

    /// Signs the unencoded payload, so the upstream can verify the signature
    /// independently of the applied content encoding.
    fn sign(&mut self) -> Option<Sign> {
        Some(Sign::Required(SignatureType::Body(self.unencoded.clone())))
    }

    fn method(&self) -> reqwest::Method {
        reqwest::Method::POST
    }

    fn path(&self) -> Cow<'_, str> {
        "/api/0/relays/metrics/".into()
    }

    fn route(&self) -> &'static str {
        "global_metrics"
    }

    /// Builds the HTTP request with the encoded payload and shard header.
    fn build(&mut self, builder: &mut http::RequestBuilder) -> Result<(), http::HttpError> {
        metric!(
            distribution(RelayDistributions::UpstreamMetricsBodySize) = self.encoded.len() as u64
        );

        builder
            .content_encoding(self.http_encoding)
            .header("X-Sentry-Relay-Shard", self.partition_key.as_bytes())
            .header(header::CONTENT_TYPE, b"application/json")
            .body(self.encoded.clone());

        Ok(())
    }

    /// Consumes the response; if the request never reached the upstream, tracks
    /// error outcomes for all contained buckets via `create_error_outcomes`.
    fn respond(
        self: Box<Self>,
        result: Result<http::Response, UpstreamRequestError>,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> {
        Box::pin(async {
            match result {
                Ok(mut response) => {
                    response.consume().await.ok();
                }
                Err(error) => {
                    relay_log::error!(error = &error as &dyn Error, "Failed to send metrics batch");

                    // If the request did not arrive at the upstream, we are responsible for outcomes.
                    // Otherwise, the upstream is responsible to log outcomes.
                    if error.is_received() {
                        return;
                    }

                    self.create_error_outcomes()
                }
            }
        })
    }
}
3175
/// Container for global and project level [`Quota`].
#[derive(Copy, Clone, Debug)]
struct CombinedQuotas<'a> {
    /// Quotas taken from the [`GlobalConfig`], applying across all projects.
    global_quotas: &'a [Quota],
    /// Quotas configured on the individual project.
    project_quotas: &'a [Quota],
}
3182
3183impl<'a> CombinedQuotas<'a> {
3184    /// Returns a new [`CombinedQuotas`].
3185    pub fn new(global_config: &'a GlobalConfig, project_quotas: &'a [Quota]) -> Self {
3186        Self {
3187            global_quotas: &global_config.quotas,
3188            project_quotas,
3189        }
3190    }
3191
3192    /// Returns `true` if both global quotas and project quotas are empty.
3193    pub fn is_empty(&self) -> bool {
3194        self.len() == 0
3195    }
3196
3197    /// Returns the number of both global and project quotas.
3198    pub fn len(&self) -> usize {
3199        self.global_quotas.len() + self.project_quotas.len()
3200    }
3201}
3202
3203impl<'a> IntoIterator for CombinedQuotas<'a> {
3204    type Item = &'a Quota;
3205    type IntoIter = std::iter::Chain<std::slice::Iter<'a, Quota>, std::slice::Iter<'a, Quota>>;
3206
3207    fn into_iter(self) -> Self::IntoIter {
3208        self.global_quotas.iter().chain(self.project_quotas.iter())
3209    }
3210}
3211
#[cfg(test)]
mod tests {
    use insta::assert_debug_snapshot;
    use relay_common::glob2::LazyGlob;
    use relay_dynamic_config::ProjectConfig;
    use relay_event_normalization::{
        NormalizationConfig, RedactionRule, TransactionNameConfig, TransactionNameRule,
    };
    use relay_event_schema::protocol::TransactionSource;
    use relay_pii::DataScrubbingConfig;
    use similar_asserts::assert_eq;

    use crate::testutils::{create_test_processor, create_test_processor_with_addrs};

    #[cfg(feature = "processing")]
    use {
        relay_metrics::BucketValue,
        relay_quotas::{QuotaScope, ReasonCode},
        relay_test::mock_service,
    };

    use super::*;

    /// Builds an organization-scoped metric-bucket quota with `limit: 0`,
    /// i.e. a quota that rejects everything.
    #[cfg(feature = "processing")]
    fn mock_quota(id: &str) -> Quota {
        Quota {
            id: Some(id.into()),
            categories: [DataCategory::MetricBucket].into(),
            scope: QuotaScope::Organization,
            scope_id: None,
            limit: Some(0),
            window: None,
            reason_code: None,
            namespace: None,
        }
    }

    /// [`CombinedQuotas`] must count both quota sources and iterate global
    /// quotas before project quotas.
    #[cfg(feature = "processing")]
    #[test]
    fn test_dynamic_quotas() {
        let global_config = GlobalConfig {
            quotas: vec![mock_quota("foo"), mock_quota("bar")],
            ..Default::default()
        };

        let project_quotas = vec![mock_quota("baz"), mock_quota("qux")];

        let dynamic_quotas = CombinedQuotas::new(&global_config, &project_quotas);

        assert_eq!(dynamic_quotas.len(), 4);
        assert!(!dynamic_quotas.is_empty());

        // Iteration order: global quotas first, then project quotas.
        let quota_ids = dynamic_quotas.into_iter().filter_map(|q| q.id.as_deref());
        assert!(quota_ids.eq(["foo", "bar", "baz", "qux"]));
    }

    /// Ensures that if we ratelimit one batch of buckets in [`FlushBuckets`] message, it won't
    /// also ratelimit the next batches in the same message automatically.
    #[cfg(feature = "processing")]
    #[tokio::test]
    async fn test_ratelimit_per_batch() {
        use relay_base_schema::organization::OrganizationId;
        use relay_protocol::FiniteF64;

        // Organization 1 gets a zero-limit quota below; organization 2 does not.
        let rate_limited_org = Scoping {
            organization_id: OrganizationId::new(1),
            project_id: ProjectId::new(21),
            project_key: ProjectKey::parse("00000000000000000000000000000000").unwrap(),
            key_id: Some(17),
        };

        let not_rate_limited_org = Scoping {
            organization_id: OrganizationId::new(2),
            project_id: ProjectId::new(21),
            project_key: ProjectKey::parse("11111111111111111111111111111111").unwrap(),
            key_id: Some(17),
        };

        let message = {
            let project_info = {
                // Zero-limit quota scoped to the rate limited organization only.
                let quota = Quota {
                    id: Some("testing".into()),
                    categories: [DataCategory::MetricBucket].into(),
                    scope: relay_quotas::QuotaScope::Organization,
                    scope_id: Some(rate_limited_org.organization_id.to_string().into()),
                    limit: Some(0),
                    window: None,
                    reason_code: Some(ReasonCode::new("test")),
                    namespace: None,
                };

                let mut config = ProjectConfig::default();
                config.quotas.push(quota);

                Arc::new(ProjectInfo {
                    config,
                    ..Default::default()
                })
            };

            // One counter bucket per organization, sharing the same project info.
            let project_metrics = |scoping| ProjectBuckets {
                buckets: vec![Bucket {
                    name: "d:transactions/bar".into(),
                    value: BucketValue::Counter(FiniteF64::new(1.0).unwrap()),
                    timestamp: UnixTimestamp::now(),
                    tags: Default::default(),
                    width: 10,
                    metadata: BucketMetadata::default(),
                }],
                rate_limits: Default::default(),
                project_info: project_info.clone(),
                scoping,
            };

            let buckets = hashbrown::HashMap::from([
                (
                    rate_limited_org.project_key,
                    project_metrics(rate_limited_org),
                ),
                (
                    not_rate_limited_org.project_key,
                    project_metrics(not_rate_limited_org),
                ),
            ]);

            FlushBuckets {
                partition_key: 0,
                buckets,
            }
        };

        // ensure the order of the map while iterating is as expected.
        assert_eq!(message.buckets.keys().count(), 2);

        // Processing-enabled config; requires a reachable Redis instance
        // (RELAY_REDIS_URL or the local default) for the rate limiter.
        let config = {
            let config_json = serde_json::json!({
                "processing": {
                    "enabled": true,
                    "kafka_config": [],
                    "redis": {
                        "server": std::env::var("RELAY_REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_owned()),
                    }
                }
            });
            Config::from_json_value(config_json).unwrap()
        };

        // Mock store forwarder that records the organization id of every
        // metrics message it receives; anything else is a test failure.
        let (store, handle) = {
            let f = |org_ids: &mut Vec<OrganizationId>, msg: Store| {
                let org_id = match msg {
                    Store::Metrics(x) => x.scoping.organization_id,
                    _ => panic!("received envelope when expecting only metrics"),
                };
                org_ids.push(org_id);
            };

            mock_service("store_forwarder", vec![], f)
        };

        let processor = create_test_processor(config).await;
        assert!(processor.redis_rate_limiter_enabled());

        processor.encode_metrics_processing(message, &store).await;

        drop(store);
        let orgs_not_ratelimited = handle.await.unwrap();

        // Only the non-rate-limited organization's metrics reach the store.
        assert_eq!(
            orgs_not_ratelimited,
            vec![not_rate_limited_org.organization_id]
        );
    }

    /// PII scrubbing with an `@ip:mask` rule must mask IP-like substrings in
    /// the User-Agent header while browser name/version extraction from the
    /// original value still succeeds.
    #[tokio::test]
    async fn test_browser_version_extraction_with_pii_like_data() {
        let processor = create_test_processor(Default::default()).await;
        let outcome_aggregator = Addr::dummy();
        let event_id = EventId::new();

        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
            .parse()
            .unwrap();

        let request_meta = RequestMeta::new(dsn);
        let mut envelope = Envelope::from_request(Some(event_id), request_meta);

        // Event with a Chrome User-Agent whose version ("103.0.0.0") looks
        // like an IP address to the scrubber.
        envelope.add_item({
                let mut item = Item::new(ItemType::Event);
                item.set_payload(
                    ContentType::Json,
                    r#"
                    {
                        "request": {
                            "headers": [
                                ["User-Agent", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36"]
                            ]
                        }
                    }
                "#,
                );
                item
            });

        let mut datascrubbing_settings = DataScrubbingConfig::default();
        // enable all the default scrubbing
        datascrubbing_settings.scrub_data = true;
        datascrubbing_settings.scrub_defaults = true;
        datascrubbing_settings.scrub_ip_addresses = true;

        // Make sure to mask any IP-like looking data
        let pii_config = serde_json::from_str(r#"{"applications": {"**": ["@ip:mask"]}}"#).unwrap();

        let config = ProjectConfig {
            datascrubbing_settings,
            pii_config: Some(pii_config),
            ..Default::default()
        };

        let project_info = ProjectInfo {
            config,
            ..Default::default()
        };

        let mut envelopes = ProcessingGroup::split_envelope(*envelope, &Default::default());
        assert_eq!(envelopes.len(), 1);

        let (group, envelope) = envelopes.pop().unwrap();
        let envelope = ManagedEnvelope::new(envelope, outcome_aggregator);

        let message = ProcessEnvelopeGrouped {
            group,
            envelope,
            ctx: processing::Context {
                project_info: &project_info,
                ..processing::Context::for_test()
            },
        };

        let Ok(Some(Submit::Envelope(mut new_envelope))) = processor.process(message).await else {
            panic!();
        };
        let new_envelope = new_envelope.envelope_mut();

        let event_item = new_envelope.items().last().unwrap();
        let annotated_event: Annotated<Event> =
            Annotated::from_json_bytes(&event_item.payload()).unwrap();
        let event = annotated_event.into_value().unwrap();
        let headers = event
            .request
            .into_value()
            .unwrap()
            .headers
            .into_value()
            .unwrap();

        // IP-like data must be masked
        assert_eq!(
            Some(
                "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/********* Safari/537.36"
            ),
            headers.get_header("User-Agent")
        );
        // But we still get correct browser and version number
        let contexts = event.contexts.into_value().unwrap();
        let browser = contexts.0.get("browser").unwrap();
        assert_eq!(
            r#"{"browser":"Chrome 103.0.0","name":"Chrome","version":"103.0.0","type":"browser"}"#,
            browser.to_json().unwrap()
        );
    }

    /// The envelope's dynamic sampling context must be materialized into the
    /// event's `_dsc` field during processing, with the hyphenated trace id
    /// normalized to its compact form.
    #[tokio::test]
    #[cfg(feature = "processing")]
    async fn test_materialize_dsc() {
        use crate::services::projects::project::PublicKeyConfig;

        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
            .parse()
            .unwrap();
        let request_meta = RequestMeta::new(dsn);
        let mut envelope = Envelope::from_request(None, request_meta);

        let dsc = r#"{
            "trace_id": "00000000-0000-0000-0000-000000000000",
            "public_key": "e12d836b15bb49d7bbf99e64295d995b",
            "sample_rate": "0.2"
        }"#;
        envelope.set_dsc(serde_json::from_str(dsc).unwrap());

        let mut item = Item::new(ItemType::Event);
        item.set_payload(ContentType::Json, r#"{}"#);
        envelope.add_item(item);

        let outcome_aggregator = Addr::dummy();
        let managed_envelope = ManagedEnvelope::new(envelope, outcome_aggregator);

        let mut project_info = ProjectInfo::default();
        project_info.public_keys.push(PublicKeyConfig {
            public_key: ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(),
            numeric_id: Some(1),
        });

        let config = serde_json::json!({
            "processing": {
                "enabled": true,
                "kafka_config": [],
            }
        });

        let message = ProcessEnvelopeGrouped {
            group: ProcessingGroup::Error,
            envelope: managed_envelope,
            ctx: processing::Context {
                config: &Config::from_json_value(config.clone()).unwrap(),
                project_info: &project_info,
                sampling_project_info: Some(&project_info),
                ..processing::Context::for_test()
            },
        };

        let processor = create_test_processor(Config::from_json_value(config).unwrap()).await;
        let Ok(Some(Submit::Envelope(envelope))) = processor.process(message).await else {
            panic!();
        };
        let event = envelope
            .envelope()
            .get_item_by(|item| item.ty() == &ItemType::Event)
            .unwrap();

        let event = Annotated::<Event>::from_json_bytes(&event.payload()).unwrap();
        insta::assert_debug_snapshot!(event.value().unwrap()._dsc, @r###"
        Object(
            {
                "environment": ~,
                "public_key": String(
                    "e12d836b15bb49d7bbf99e64295d995b",
                ),
                "release": ~,
                "replay_id": ~,
                "sample_rate": String(
                    "0.2",
                ),
                "trace_id": String(
                    "00000000000000000000000000000000",
                ),
                "transaction": ~,
            },
        )
        "###);
    }

    /// Normalizes a transaction event with the given name and source under a
    /// `/foo/*/**` redaction rule and returns the statsd metrics captured
    /// while logging transaction-name changes.
    fn capture_test_event(transaction_name: &str, source: TransactionSource) -> Vec<String> {
        let mut event = Annotated::<Event>::from_json(
            r#"
            {
                "type": "transaction",
                "transaction": "/foo/",
                "timestamp": 946684810.0,
                "start_timestamp": 946684800.0,
                "contexts": {
                    "trace": {
                        "trace_id": "4c79f60c11214eb38604f4ae0781bfb2",
                        "span_id": "fa90fdead5f74053",
                        "op": "http.server",
                        "type": "trace"
                    }
                },
                "transaction_info": {
                    "source": "url"
                }
            }
            "#,
        )
        .unwrap();
        // Override the fixture's transaction name and source per test case.
        let e = event.value_mut().as_mut().unwrap();
        e.transaction.set_value(Some(transaction_name.into()));

        e.transaction_info
            .value_mut()
            .as_mut()
            .unwrap()
            .source
            .set_value(Some(source));

        relay_statsd::with_capturing_test_client(|| {
            utils::log_transaction_name_metrics(&mut event, |event| {
                let config = NormalizationConfig {
                    transaction_name_config: TransactionNameConfig {
                        rules: &[TransactionNameRule {
                            pattern: LazyGlob::new("/foo/*/**".to_owned()),
                            expiry: DateTime::<Utc>::MAX_UTC,
                            redaction: RedactionRule::Replace {
                                substitution: "*".to_owned(),
                            },
                        }],
                    },
                    ..Default::default()
                };
                relay_event_normalization::normalize_event(event, &config)
            });
        })
    }

    /// URL transaction name matching neither the rule nor a pattern reports
    /// `changes:none`.
    #[test]
    fn test_log_transaction_metrics_none() {
        let captures = capture_test_event("/nothing", TransactionSource::Url);
        insta::assert_debug_snapshot!(captures, @r###"
        [
            "event.transaction_name_changes:1|c|#source_in:url,changes:none,source_out:sanitized,is_404:false",
        ]
        "###);
    }

    /// Name redacted only by the configured `/foo/*/**` rule reports
    /// `changes:rule`.
    #[test]
    fn test_log_transaction_metrics_rule() {
        let captures = capture_test_event("/foo/john/denver", TransactionSource::Url);
        insta::assert_debug_snapshot!(captures, @r###"
        [
            "event.transaction_name_changes:1|c|#source_in:url,changes:rule,source_out:sanitized,is_404:false",
        ]
        "###);
    }

    /// Name redacted only by built-in pattern matching (numeric segment)
    /// reports `changes:pattern`.
    #[test]
    fn test_log_transaction_metrics_pattern() {
        let captures = capture_test_event("/something/12345", TransactionSource::Url);
        insta::assert_debug_snapshot!(captures, @r###"
        [
            "event.transaction_name_changes:1|c|#source_in:url,changes:pattern,source_out:sanitized,is_404:false",
        ]
        "###);
    }

    /// Name affected by both the rule and pattern matching reports
    /// `changes:both`.
    #[test]
    fn test_log_transaction_metrics_both() {
        let captures = capture_test_event("/foo/john/12345", TransactionSource::Url);
        insta::assert_debug_snapshot!(captures, @r###"
        [
            "event.transaction_name_changes:1|c|#source_in:url,changes:both,source_out:sanitized,is_404:false",
        ]
        "###);
    }

    /// Route-sourced names are not sanitized, so no changes are reported even
    /// for a name that would otherwise match.
    #[test]
    fn test_log_transaction_metrics_no_match() {
        let captures = capture_test_event("/foo/john/12345", TransactionSource::Route);
        insta::assert_debug_snapshot!(captures, @r###"
        [
            "event.transaction_name_changes:1|c|#source_in:route,changes:none,source_out:route,is_404:false",
        ]
        "###);
    }

    /// Bucket metadata: externally submitted metrics get `received_at`
    /// stamped from the request time, internally forwarded metrics keep it
    /// unset (trusted upstream already stamped them).
    #[tokio::test]
    async fn test_process_metrics_bucket_metadata() {
        let mut token = Cogs::noop().timed(ResourceId::Relay, AppFeature::Unattributed);
        let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
        let received_at = Utc::now();
        let config = Config::default();

        // Capture buckets sent to the aggregator instead of aggregating them.
        let (aggregator, mut aggregator_rx) = Addr::custom();
        let processor = create_test_processor_with_addrs(
            config,
            Addrs {
                aggregator,
                ..Default::default()
            },
        )
        .await;

        let mut item = Item::new(ItemType::Statsd);
        item.set_payload(
            ContentType::Text,
            "transactions/foo:3182887624:4267882815|s",
        );
        for (source, expected_received_at) in [
            (
                BucketSource::External,
                Some(UnixTimestamp::from_datetime(received_at).unwrap()),
            ),
            (BucketSource::Internal, None),
        ] {
            let message = ProcessMetrics {
                data: MetricData::Raw(vec![item.clone()]),
                project_key,
                source,
                received_at,
                sent_at: Some(Utc::now()),
            };
            processor.handle_process_metrics(&mut token, message);

            let Aggregator::MergeBuckets(merge_buckets) = aggregator_rx.recv().await.unwrap();
            let buckets = merge_buckets.buckets;
            assert_eq!(buckets.len(), 1);
            assert_eq!(buckets[0].metadata.received_at, expected_received_at);
        }
    }

    /// A batched metrics payload containing buckets for two projects must be
    /// split into one `MergeBuckets` message per project key.
    #[tokio::test]
    async fn test_process_batched_metrics() {
        let mut token = Cogs::noop().timed(ResourceId::Relay, AppFeature::Unattributed);
        let received_at = Utc::now();
        let config = Config::default();

        // Capture buckets sent to the aggregator instead of aggregating them.
        let (aggregator, mut aggregator_rx) = Addr::custom();
        let processor = create_test_processor_with_addrs(
            config,
            Addrs {
                aggregator,
                ..Default::default()
            },
        )
        .await;

        let payload = r#"{
    "buckets": {
        "11111111111111111111111111111111": [
            {
                "timestamp": 1615889440,
                "width": 0,
                "name": "d:custom/endpoint.response_time@millisecond",
                "type": "d",
                "value": [
                  68.0
                ],
                "tags": {
                  "route": "user_index"
                }
            }
        ],
        "22222222222222222222222222222222": [
            {
                "timestamp": 1615889440,
                "width": 0,
                "name": "d:custom/endpoint.cache_rate@none",
                "type": "d",
                "value": [
                  36.0
                ]
            }
        ]
    }
}
"#;
        let message = ProcessBatchedMetrics {
            payload: Bytes::from(payload),
            source: BucketSource::Internal,
            received_at,
            sent_at: Some(Utc::now()),
        };
        processor.handle_process_batched_metrics(&mut token, message);

        let Aggregator::MergeBuckets(mb1) = aggregator_rx.recv().await.unwrap();
        let Aggregator::MergeBuckets(mb2) = aggregator_rx.recv().await.unwrap();

        // Delivery order of the two messages is not guaranteed; sort by key
        // for a deterministic snapshot.
        let mut messages = vec![mb1, mb2];
        messages.sort_by_key(|mb| mb.project_key);

        let actual = messages
            .into_iter()
            .map(|mb| (mb.project_key, mb.buckets))
            .collect::<Vec<_>>();

        assert_debug_snapshot!(actual, @r###"
        [
            (
                ProjectKey("11111111111111111111111111111111"),
                [
                    Bucket {
                        timestamp: UnixTimestamp(1615889440),
                        width: 0,
                        name: MetricName(
                            "d:custom/endpoint.response_time@millisecond",
                        ),
                        value: Distribution(
                            [
                                68.0,
                            ],
                        ),
                        tags: {
                            "route": "user_index",
                        },
                        metadata: BucketMetadata {
                            merges: 1,
                            received_at: None,
                            extracted_from_indexed: false,
                        },
                    },
                ],
            ),
            (
                ProjectKey("22222222222222222222222222222222"),
                [
                    Bucket {
                        timestamp: UnixTimestamp(1615889440),
                        width: 0,
                        name: MetricName(
                            "d:custom/endpoint.cache_rate@none",
                        ),
                        value: Distribution(
                            [
                                36.0,
                            ],
                        ),
                        tags: {},
                        metadata: BucketMetadata {
                            merges: 1,
                            received_at: None,
                            extracted_from_indexed: false,
                        },
                    },
                ],
            ),
        ]
        "###);
    }
}