relay_server/services/
processor.rs

1use std::borrow::Cow;
2use std::collections::{BTreeSet, HashMap};
3use std::error::Error;
4use std::fmt::{Debug, Display};
5use std::future::Future;
6use std::io::Write;
7use std::pin::Pin;
8use std::sync::Arc;
9use std::time::Duration;
10
11use anyhow::Context;
12use brotli::CompressorWriter as BrotliEncoder;
13use bytes::Bytes;
14use chrono::{DateTime, Utc};
15use flate2::Compression;
16use flate2::write::{GzEncoder, ZlibEncoder};
17use futures::FutureExt;
18use futures::future::BoxFuture;
19use relay_base_schema::project::{ProjectId, ProjectKey};
20use relay_cogs::{AppFeature, Cogs, FeatureWeights, ResourceId, Token};
21use relay_common::time::UnixTimestamp;
22use relay_config::{Config, HttpEncoding, RelayMode};
23use relay_dynamic_config::{Feature, GlobalConfig};
24use relay_event_normalization::{ClockDriftProcessor, GeoIpLookup};
25use relay_event_schema::processor::ProcessingAction;
26use relay_event_schema::protocol::{
27    ClientReport, Event, EventId, Metrics, NetworkReportError, SpanV2,
28};
29use relay_filter::FilterStatKey;
30use relay_metrics::{Bucket, BucketMetadata, BucketView, BucketsView, MetricNamespace};
31use relay_protocol::Annotated;
32use relay_quotas::{DataCategory, Quota, RateLimits, Scoping};
33use relay_sampling::evaluation::{ReservoirCounters, SamplingDecision};
34use relay_statsd::metric;
35use relay_system::{Addr, FromMessage, NoResponse, Service};
36use reqwest::header;
37use smallvec::{SmallVec, smallvec};
38use zstd::stream::Encoder as ZstdEncoder;
39
40use crate::envelope::{
41    self, AttachmentType, ContentType, Envelope, EnvelopeError, Item, ItemContainer, ItemType,
42};
43use crate::extractors::{PartialDsn, RequestMeta, RequestTrust};
44use crate::integrations::Integration;
45use crate::managed::{InvalidProcessingGroupType, ManagedEnvelope, TypedEnvelope};
46use crate::metrics::{MetricOutcomes, MetricsLimiter, MinimalTrackableBucket};
47use crate::metrics_extraction::transactions::ExtractedMetrics;
48use crate::metrics_extraction::transactions::types::ExtractMetricsError;
49use crate::processing::attachments::AttachmentProcessor;
50use crate::processing::check_ins::CheckInsProcessor;
51use crate::processing::client_reports::ClientReportsProcessor;
52use crate::processing::errors::{ErrorsProcessor, SwitchProcessingError};
53use crate::processing::logs::LogsProcessor;
54use crate::processing::profile_chunks::ProfileChunksProcessor;
55use crate::processing::replays::ReplaysProcessor;
56use crate::processing::sessions::SessionsProcessor;
57use crate::processing::spans::SpansProcessor;
58use crate::processing::trace_attachments::TraceAttachmentsProcessor;
59use crate::processing::trace_metrics::TraceMetricsProcessor;
60use crate::processing::transactions::TransactionProcessor;
61use crate::processing::user_reports::UserReportsProcessor;
62use crate::processing::utils::event::{
63    EventFullyNormalized, EventMetricsExtracted, FiltersStatus, SpansExtracted, event_category,
64    event_type,
65};
66use crate::processing::{Forward as _, Output, Outputs, QuotaRateLimiter};
67use crate::service::ServiceError;
68use crate::services::global_config::GlobalConfigHandle;
69use crate::services::metrics::{Aggregator, FlushBuckets, MergeBuckets, ProjectBuckets};
70use crate::services::outcome::{DiscardItemType, DiscardReason, Outcome, TrackOutcome};
71use crate::services::projects::cache::ProjectCacheHandle;
72use crate::services::projects::project::{ProjectInfo, ProjectState};
73use crate::services::upstream::{
74    SendRequest, Sign, SignatureType, UpstreamRelay, UpstreamRequest, UpstreamRequestError,
75};
76use crate::statsd::{RelayCounters, RelayDistributions, RelayTimers};
77use crate::utils::{self, CheckLimits, EnvelopeLimiter};
78use crate::{http, processing};
79use relay_threading::AsyncPool;
80#[cfg(feature = "processing")]
81use {
82    crate::services::objectstore::Objectstore,
83    crate::services::store::Store,
84    crate::utils::Enforcement,
85    itertools::Itertools,
86    relay_cardinality::{
87        CardinalityLimit, CardinalityLimiter, CardinalityLimitsSplit, RedisSetLimiter,
88        RedisSetLimiterOptions,
89    },
90    relay_dynamic_config::CardinalityLimiterMode,
91    relay_quotas::{RateLimitingError, RedisRateLimiter},
92    relay_redis::RedisClients,
93    std::time::Instant,
94    symbolic_unreal::{Unreal4Error, Unreal4ErrorKind},
95};
96
// Envelope-processor submodules, roughly one per item family handled here.
mod attachment;
mod dynamic_sampling;
pub mod event;
mod metrics;
mod nel;
mod profile;
mod report;
mod span;

// PlayStation crash handling is only compiled in sentry-internal processing builds.
#[cfg(all(sentry, feature = "processing"))]
pub mod playstation;
mod standalone;
// Unreal Engine crash-report handling requires the `processing` feature.
#[cfg(feature = "processing")]
mod unreal;

// Nintendo Switch dying-message handling requires the `processing` feature.
#[cfg(feature = "processing")]
mod nnswitch;
114
/// Executes the given block only when compiled with the `processing` feature.
///
/// Provided code block will be executed only if the provided config has `processing_enabled` set.
/// The two-armed form additionally takes an `else` block, which runs when the
/// feature is compiled out entirely, or when it is compiled in but processing
/// is disabled in the config.
macro_rules! if_processing {
    ($config:expr, $if_true:block) => {
        #[cfg(feature = "processing")] {
            if $config.processing_enabled() $if_true
        }
    };
    ($config:expr, $if_true:block else $if_false:block) => {
        {
            #[cfg(feature = "processing")] {
                if $config.processing_enabled() $if_true else $if_false
            }
            #[cfg(not(feature = "processing"))] {
                $if_false
            }
        }
    };
}
135
/// The minimum clock drift for correction to apply.
///
/// NOTE(review): presumably consumed by the [`ClockDriftProcessor`] so that
/// client/server offsets below 55 minutes are left uncorrected — confirm at
/// the usage sites.
pub const MINIMUM_CLOCK_DRIFT: Duration = Duration::from_secs(55 * 60);
138
/// Error returned when a [`ProcessingGroup`] cannot be converted into the
/// requested group marker type.
#[derive(Debug)]
pub struct GroupTypeError;

impl Display for GroupTypeError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "failed to convert processing group into corresponding type")
    }
}

impl std::error::Error for GroupTypeError {}
149
/// Generates a zero-sized marker type for a [`ProcessingGroup`] variant.
///
/// The generated type converts infallibly into [`ProcessingGroup`], and
/// fallibly back via `TryFrom`, failing with [`GroupTypeError`] for any other
/// variant. Optional extra variants listed after the first are also accepted
/// by the `TryFrom` conversion (e.g. `LogGroup` matches both `Log` and `Nel`).
macro_rules! processing_group {
    ($ty:ident, $variant:ident$(, $($other:ident),+)?) => {
        #[derive(Clone, Copy, Debug)]
        pub struct $ty;

        impl From<$ty> for ProcessingGroup {
            fn from(_: $ty) -> Self {
                ProcessingGroup::$variant
            }
        }

        impl TryFrom<ProcessingGroup> for $ty {
            type Error = GroupTypeError;

            fn try_from(value: ProcessingGroup) -> Result<Self, Self::Error> {
                if matches!(value, ProcessingGroup::$variant) {
                    return Ok($ty);
                }
                $($(
                    if matches!(value, ProcessingGroup::$other) {
                        return Ok($ty);
                    }
                )+)?
                return Err(GroupTypeError);
            }
        }
    };
}
178
/// A marker trait.
///
/// Should be used only with groups which are responsible for processing envelopes with events.
/// Currently implemented by [`TransactionGroup`] and [`ErrorGroup`].
pub trait EventProcessing {}
183
// Event-carrying groups: both implement `EventProcessing` and share the
// event pipeline.
processing_group!(TransactionGroup, Transaction);
impl EventProcessing for TransactionGroup {}

processing_group!(ErrorGroup, Error);
impl EventProcessing for ErrorGroup {}

// Non-event groups. Note that `StandaloneGroup` also accepts the
// `StandaloneAttachments` and `StandaloneUserReports` variants, and
// `LogGroup` also accepts `Nel` (NEL reports become logs).
processing_group!(SessionGroup, Session);
processing_group!(
    StandaloneGroup,
    Standalone,
    StandaloneAttachments,
    StandaloneUserReports
);
processing_group!(ClientReportGroup, ClientReport);
processing_group!(ReplayGroup, Replay);
processing_group!(CheckInGroup, CheckIn);
processing_group!(LogGroup, Log, Nel);
processing_group!(TraceMetricGroup, TraceMetric);
processing_group!(SpanGroup, Span);

processing_group!(ProfileChunkGroup, ProfileChunk);
processing_group!(MetricsGroup, Metrics);
processing_group!(ForwardUnknownGroup, ForwardUnknown);
processing_group!(Ungrouped, Ungrouped);
208
/// Processed group type marker.
///
/// Marks the envelopes which passed through the processing pipeline.
/// Unlike the groups above, this type intentionally has no conversion from
/// [`ProcessingGroup`].
#[derive(Clone, Copy, Debug)]
pub struct Processed;
214
/// Describes the groups of the processable items.
#[derive(Clone, Copy, Debug)]
pub enum ProcessingGroup {
    /// All the transaction related items.
    ///
    /// Includes transactions, related attachments, profiles.
    Transaction,
    /// All the items which require (have or create) events.
    ///
    /// This includes: errors, NEL, security reports, user reports, some of the
    /// attachments.
    Error,
    /// Session events.
    Session,
    /// Standalone items which can be sent alone without any event attached to it in the current
    /// envelope e.g. some attachments, user reports.
    Standalone,
    /// Standalone attachments.
    ///
    /// Attachments that are sent without an item that creates an event in the same envelope.
    StandaloneAttachments,
    /// Standalone user reports.
    ///
    /// User reports that are sent without an item that creates an event in the same envelope.
    StandaloneUserReports,
    /// Outcomes.
    ClientReport,
    /// Replays and ReplayRecordings.
    Replay,
    /// Crons.
    CheckIn,
    /// NEL reports.
    Nel,
    /// Logs.
    Log,
    /// Trace metrics.
    TraceMetric,
    /// Spans.
    Span,
    /// Span V2 spans.
    SpanV2,
    /// Metrics.
    Metrics,
    /// ProfileChunk.
    ProfileChunk,
    /// V2 attachments without span / log association.
    TraceAttachment,
    /// Unknown item types will be forwarded upstream (to processing Relay), where we will
    /// decide what to do with them.
    ForwardUnknown,
    /// All the items in the envelope that could not be grouped.
    Ungrouped,
}
268
impl ProcessingGroup {
    /// Splits provided envelope into list of tuples of groups with associated envelopes.
    ///
    /// Items are moved out of `envelope` group by group, so the order of the
    /// extraction steps below determines precedence when an item could match
    /// more than one predicate (e.g. v2 spans are claimed before v1 spans).
    fn split_envelope(
        mut envelope: Envelope,
        project_info: &ProjectInfo,
    ) -> SmallVec<[(Self, Box<Envelope>); 3]> {
        // Every per-group envelope inherits a copy of the original headers.
        let headers = envelope.headers().clone();
        let mut grouped_envelopes = smallvec![];

        // Extract replays.
        let replay_items = envelope.take_items_by(|item| {
            matches!(
                item.ty(),
                &ItemType::ReplayEvent | &ItemType::ReplayRecording | &ItemType::ReplayVideo
            )
        });
        if !replay_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Replay,
                Envelope::from_parts(headers.clone(), replay_items),
            ))
        }

        // Keep all the sessions together in one envelope.
        let session_items = envelope
            .take_items_by(|item| matches!(item.ty(), &ItemType::Session | &ItemType::Sessions));
        if !session_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Session,
                Envelope::from_parts(headers.clone(), session_items),
            ))
        }

        // Extract items for the v2 span pipeline. When the experimental flag
        // is set for the project, standalone v1 spans and span attachments are
        // routed here as well, which is why this runs before the v1 span
        // extraction below.
        let span_v2_items = envelope.take_items_by(|item| {
            let exp_feature = project_info.has_feature(Feature::SpanV2ExperimentalProcessing);

            ItemContainer::<SpanV2>::is_container(item)
                || matches!(item.integration(), Some(Integration::Spans(_)))
                // Process standalone spans (v1) via the v2 pipeline
                || (exp_feature && matches!(item.ty(), &ItemType::Span))
                || (exp_feature && item.is_span_attachment())
        });

        if !span_v2_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::SpanV2,
                Envelope::from_parts(headers.clone(), span_v2_items),
            ))
        }

        // Extract spans.
        let span_items = envelope.take_items_by(|item| matches!(item.ty(), &ItemType::Span));
        if !span_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Span,
                Envelope::from_parts(headers.clone(), span_items),
            ))
        }

        // Extract logs.
        let logs_items = envelope.take_items_by(|item| {
            matches!(item.ty(), &ItemType::Log)
                || matches!(item.integration(), Some(Integration::Logs(_)))
        });
        if !logs_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Log,
                Envelope::from_parts(headers.clone(), logs_items),
            ))
        }

        // Extract trace metrics.
        let trace_metric_items =
            envelope.take_items_by(|item| matches!(item.ty(), &ItemType::TraceMetric));
        if !trace_metric_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::TraceMetric,
                Envelope::from_parts(headers.clone(), trace_metric_items),
            ))
        }

        // NEL items are transformed into logs in their own processing step.
        let nel_items = envelope.take_items_by(|item| matches!(item.ty(), &ItemType::Nel));
        if !nel_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Nel,
                Envelope::from_parts(headers.clone(), nel_items),
            ))
        }

        // Extract all metric items.
        //
        // Note: Should only be relevant in proxy mode. In other modes we send metrics through
        // a separate pipeline.
        let metric_items = envelope.take_items_by(|i| i.ty().is_metrics());
        if !metric_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Metrics,
                Envelope::from_parts(headers.clone(), metric_items),
            ))
        }

        // Extract profile chunks.
        let profile_chunk_items =
            envelope.take_items_by(|item| matches!(item.ty(), &ItemType::ProfileChunk));
        if !profile_chunk_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::ProfileChunk,
                Envelope::from_parts(headers.clone(), profile_chunk_items),
            ))
        }

        // Extract v2 trace attachments (no span/log association).
        let trace_attachment_items = envelope.take_items_by(Item::is_trace_attachment);
        if !trace_attachment_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::TraceAttachment,
                Envelope::from_parts(headers.clone(), trace_attachment_items),
            ))
        }

        // Attachments and user reports only count as "standalone" when nothing
        // left in the envelope creates an event for them to attach to.
        if !envelope.items().any(Item::creates_event) {
            // Extract the standalone attachments
            let standalone_attachments = envelope
                .take_items_by(|i| i.requires_event() && matches!(i.ty(), ItemType::Attachment));
            if !standalone_attachments.is_empty() {
                grouped_envelopes.push((
                    ProcessingGroup::StandaloneAttachments,
                    Envelope::from_parts(headers.clone(), standalone_attachments),
                ))
            }

            // Extract the standalone user reports
            let standalone_user_reports =
                envelope.take_items_by(|i| matches!(i.ty(), ItemType::UserReport));
            if !standalone_user_reports.is_empty() {
                grouped_envelopes.push((
                    ProcessingGroup::StandaloneUserReports,
                    Envelope::from_parts(headers.clone(), standalone_user_reports),
                ));
            }
        }

        // Extract all standalone items.
        //
        // Note: only if there are no items in the envelope which can create events, otherwise they
        // will be in the same envelope with all require event items.
        if !envelope.items().any(Item::creates_event) {
            let standalone_items = envelope.take_items_by(Item::requires_event);
            if !standalone_items.is_empty() {
                grouped_envelopes.push((
                    ProcessingGroup::Standalone,
                    Envelope::from_parts(headers.clone(), standalone_items),
                ))
            }
        };

        // Make sure we create separate envelopes for each `RawSecurity` report.
        // Each one gets a freshly generated event id.
        let security_reports_items = envelope
            .take_items_by(|i| matches!(i.ty(), &ItemType::RawSecurity))
            .into_iter()
            .map(|item| {
                let headers = headers.clone();
                let items: SmallVec<[Item; 3]> = smallvec![item.clone()];
                let mut envelope = Envelope::from_parts(headers, items);
                envelope.set_event_id(EventId::new());
                (ProcessingGroup::Error, envelope)
            });
        grouped_envelopes.extend(security_reports_items);

        // Extract all the items which require an event into separate envelope.
        // A transaction or profile anywhere in the batch routes the whole
        // batch through the transaction pipeline; otherwise it is an error.
        let require_event_items = envelope.take_items_by(Item::requires_event);
        if !require_event_items.is_empty() {
            let group = if require_event_items
                .iter()
                .any(|item| matches!(item.ty(), &ItemType::Transaction | &ItemType::Profile))
            {
                ProcessingGroup::Transaction
            } else {
                ProcessingGroup::Error
            };

            grouped_envelopes.push((
                group,
                Envelope::from_parts(headers.clone(), require_event_items),
            ))
        }

        // Get the rest of the envelopes, one per item.
        let envelopes = envelope.items_mut().map(|item| {
            let headers = headers.clone();
            let items: SmallVec<[Item; 3]> = smallvec![item.clone()];
            let envelope = Envelope::from_parts(headers, items);
            let item_type = item.ty();
            let group = if matches!(item_type, &ItemType::CheckIn) {
                ProcessingGroup::CheckIn
            // NOTE(review): `item.ty()` here is equivalent to the `item_type`
            // binding used by the surrounding arms — consider unifying.
            } else if matches!(item.ty(), &ItemType::ClientReport) {
                ProcessingGroup::ClientReport
            } else if matches!(item_type, &ItemType::Unknown(_)) {
                ProcessingGroup::ForwardUnknown
            } else {
                // Cannot group this item type.
                ProcessingGroup::Ungrouped
            };

            (group, envelope)
        });
        grouped_envelopes.extend(envelopes);

        grouped_envelopes
    }

    /// Returns the name of the group.
    ///
    /// NOTE(review): "standalone_attachment" is singular while
    /// "standalone_user_reports" is plural; changing either would alter
    /// emitted tag values, so this is only flagged here.
    pub fn variant(&self) -> &'static str {
        match self {
            ProcessingGroup::Transaction => "transaction",
            ProcessingGroup::Error => "error",
            ProcessingGroup::Session => "session",
            ProcessingGroup::Standalone => "standalone",
            ProcessingGroup::StandaloneAttachments => "standalone_attachment",
            ProcessingGroup::StandaloneUserReports => "standalone_user_reports",
            ProcessingGroup::ClientReport => "client_report",
            ProcessingGroup::Replay => "replay",
            ProcessingGroup::CheckIn => "check_in",
            ProcessingGroup::Log => "log",
            ProcessingGroup::TraceMetric => "trace_metric",
            ProcessingGroup::Nel => "nel",
            ProcessingGroup::Span => "span",
            ProcessingGroup::SpanV2 => "span_v2",
            ProcessingGroup::Metrics => "metrics",
            ProcessingGroup::ProfileChunk => "profile_chunk",
            ProcessingGroup::TraceAttachment => "trace_attachment",
            ProcessingGroup::ForwardUnknown => "forward_unknown",
            ProcessingGroup::Ungrouped => "ungrouped",
        }
    }
}
505
506impl From<ProcessingGroup> for AppFeature {
507    fn from(value: ProcessingGroup) -> Self {
508        match value {
509            ProcessingGroup::Transaction => AppFeature::Transactions,
510            ProcessingGroup::Error => AppFeature::Errors,
511            ProcessingGroup::Session => AppFeature::Sessions,
512            ProcessingGroup::Standalone => AppFeature::UnattributedEnvelope,
513            ProcessingGroup::StandaloneAttachments => AppFeature::UnattributedEnvelope,
514            ProcessingGroup::StandaloneUserReports => AppFeature::UserReports,
515            ProcessingGroup::ClientReport => AppFeature::ClientReports,
516            ProcessingGroup::Replay => AppFeature::Replays,
517            ProcessingGroup::CheckIn => AppFeature::CheckIns,
518            ProcessingGroup::Log => AppFeature::Logs,
519            ProcessingGroup::TraceMetric => AppFeature::TraceMetrics,
520            ProcessingGroup::Nel => AppFeature::Logs,
521            ProcessingGroup::Span => AppFeature::Spans,
522            ProcessingGroup::SpanV2 => AppFeature::Spans,
523            ProcessingGroup::Metrics => AppFeature::UnattributedMetrics,
524            ProcessingGroup::ProfileChunk => AppFeature::Profiles,
525            ProcessingGroup::ForwardUnknown => AppFeature::UnattributedEnvelope,
526            ProcessingGroup::Ungrouped => AppFeature::UnattributedEnvelope,
527            ProcessingGroup::TraceAttachment => AppFeature::TraceAttachments,
528        }
529    }
530}
531
532/// An error returned when handling [`ProcessEnvelope`].
#[derive(Debug, thiserror::Error)]
pub enum ProcessingError {
    // --- Payload decoding failures ---
    #[error("invalid json in event")]
    InvalidJson(#[source] serde_json::Error),

    #[error("invalid message pack event payload")]
    InvalidMsgpack(#[from] rmp_serde::decode::Error),

    #[cfg(feature = "processing")]
    #[error("invalid unreal crash report")]
    InvalidUnrealReport(#[source] Unreal4Error),

    // --- Envelope / event validation failures ---
    #[error("event payload too large")]
    PayloadTooLarge(DiscardItemType),

    #[error("invalid transaction event")]
    InvalidTransaction,

    #[error("the item is not allowed/supported in this envelope")]
    UnsupportedItem,

    #[error("envelope processor failed")]
    ProcessingFailed(#[from] ProcessingAction),

    #[error("duplicate {0} in event")]
    DuplicateItem(ItemType),

    #[error("failed to extract event payload")]
    NoEventPayload,

    #[error("missing project id in DSN")]
    MissingProjectId,

    // --- Security / NEL report failures ---
    #[error("invalid security report type: {0:?}")]
    InvalidSecurityType(Bytes),

    #[error("unsupported security report type")]
    UnsupportedSecurityType,

    #[error("invalid security report")]
    InvalidSecurityReport(#[source] serde_json::Error),

    #[error("invalid nel report")]
    InvalidNelReport(#[source] NetworkReportError),

    // --- Filtering and normalization ---
    #[error("event filtered with reason: {0:?}")]
    EventFiltered(FilterStatKey),

    #[error("missing or invalid required event timestamp")]
    InvalidTimestamp,

    #[error("could not serialize event payload")]
    SerializeFailed(#[source] serde_json::Error),

    #[cfg(feature = "processing")]
    #[error("failed to apply quotas")]
    QuotasFailed(#[from] RateLimitingError),

    // Boxed to keep the enum small; see `From<InvalidProcessingGroupType>`.
    #[error("invalid processing group type")]
    InvalidProcessingGroup(Box<InvalidProcessingGroupType>),

    // NOTE(review): the inner error appears both in the message (`{0:?}`) and
    // as `#[source]`, which may duplicate it in rendered error chains.
    #[error("nintendo switch dying message processing failed {0:?}")]
    InvalidNintendoDyingMessage(#[source] SwitchProcessingError),

    #[cfg(all(sentry, feature = "processing"))]
    #[error("playstation dump processing failed: {0}")]
    InvalidPlaystationDump(String),

    // --- New processing pipeline ---
    #[error("processing group does not match specific processor")]
    ProcessingGroupMismatch,
    #[error("new processing pipeline failed")]
    ProcessingFailure,
}
606
impl ProcessingError {
    /// Maps this error to the [`Outcome`] that should be emitted for the
    /// affected items.
    ///
    /// Returns `None` for errors that must not produce an outcome, either
    /// because no item data was dropped or because the outcome has already
    /// been emitted elsewhere (e.g. [`Self::ProcessingFailure`]).
    pub fn to_outcome(&self) -> Option<Outcome> {
        match self {
            Self::PayloadTooLarge(payload_type) => {
                Some(Outcome::Invalid(DiscardReason::TooLarge(*payload_type)))
            }
            Self::InvalidJson(_) => Some(Outcome::Invalid(DiscardReason::InvalidJson)),
            Self::InvalidMsgpack(_) => Some(Outcome::Invalid(DiscardReason::InvalidMsgpack)),
            Self::InvalidSecurityType(_) => {
                Some(Outcome::Invalid(DiscardReason::SecurityReportType))
            }
            Self::UnsupportedItem => Some(Outcome::Invalid(DiscardReason::InvalidEnvelope)),
            Self::InvalidSecurityReport(_) => Some(Outcome::Invalid(DiscardReason::SecurityReport)),
            Self::UnsupportedSecurityType => Some(Outcome::Filtered(FilterStatKey::InvalidCsp)),
            Self::InvalidNelReport(_) => Some(Outcome::Invalid(DiscardReason::InvalidJson)),
            Self::InvalidTransaction => Some(Outcome::Invalid(DiscardReason::InvalidTransaction)),
            Self::InvalidTimestamp => Some(Outcome::Invalid(DiscardReason::Timestamp)),
            Self::DuplicateItem(_) => Some(Outcome::Invalid(DiscardReason::DuplicateItem)),
            Self::NoEventPayload => Some(Outcome::Invalid(DiscardReason::NoEventPayload)),
            Self::InvalidNintendoDyingMessage(_) => Some(Outcome::Invalid(DiscardReason::Payload)),
            #[cfg(all(sentry, feature = "processing"))]
            Self::InvalidPlaystationDump(_) => Some(Outcome::Invalid(DiscardReason::Payload)),
            // Bad compression is distinguished from other unreal failures so
            // it gets its own discard reason.
            #[cfg(feature = "processing")]
            Self::InvalidUnrealReport(err) if err.kind() == Unreal4ErrorKind::BadCompression => {
                Some(Outcome::Invalid(DiscardReason::InvalidCompression))
            }
            #[cfg(feature = "processing")]
            Self::InvalidUnrealReport(_) => Some(Outcome::Invalid(DiscardReason::ProcessUnreal)),
            Self::SerializeFailed(_) | Self::ProcessingFailed(_) => {
                Some(Outcome::Invalid(DiscardReason::Internal))
            }
            #[cfg(feature = "processing")]
            Self::QuotasFailed(_) => Some(Outcome::Invalid(DiscardReason::Internal)),
            Self::MissingProjectId => None,
            Self::EventFiltered(key) => Some(Outcome::Filtered(key.clone())),
            Self::InvalidProcessingGroup(_) => None,

            Self::ProcessingGroupMismatch => Some(Outcome::Invalid(DiscardReason::Internal)),
            // Outcomes are emitted in the new processing pipeline already.
            Self::ProcessingFailure => None,
        }
    }

    /// Returns `true` if the error maps to an outcome that counts as
    /// unexpected (delegates to `Outcome::is_unexpected`).
    fn is_unexpected(&self) -> bool {
        self.to_outcome()
            .is_some_and(|outcome| outcome.is_unexpected())
    }
}
655
#[cfg(feature = "processing")]
impl From<Unreal4Error> for ProcessingError {
    /// Oversized reports map to [`ProcessingError::PayloadTooLarge`]; all other
    /// failures are wrapped as invalid unreal reports.
    fn from(err: Unreal4Error) -> Self {
        if err.kind() == Unreal4ErrorKind::TooLarge {
            Self::PayloadTooLarge(ItemType::UnrealReport.into())
        } else {
            Self::InvalidUnrealReport(err)
        }
    }
}
665
666impl From<ExtractMetricsError> for ProcessingError {
667    fn from(error: ExtractMetricsError) -> Self {
668        match error {
669            ExtractMetricsError::MissingTimestamp | ExtractMetricsError::InvalidTimestamp => {
670                Self::InvalidTimestamp
671            }
672        }
673    }
674}
675
676impl From<InvalidProcessingGroupType> for ProcessingError {
677    fn from(value: InvalidProcessingGroupType) -> Self {
678        Self::InvalidProcessingGroup(Box::new(value))
679    }
680}
681
/// An event extracted from an envelope item, paired with a length.
/// NOTE(review): presumably the size in bytes of the originating payload —
/// confirm at the extraction sites.
type ExtractedEvent = (Annotated<Event>, usize);
683
/// A container for extracted metrics during processing.
///
/// The container enforces that the extracted metrics are correctly tagged
/// with the dynamic sampling decision.
#[derive(Debug)]
pub struct ProcessingExtractedMetrics {
    // Accumulated buckets, split into project- and sampling-scoped metrics.
    metrics: ExtractedMetrics,
}
692
693impl ProcessingExtractedMetrics {
694    pub fn new() -> Self {
695        Self {
696            metrics: ExtractedMetrics::default(),
697        }
698    }
699
700    pub fn into_inner(self) -> ExtractedMetrics {
701        self.metrics
702    }
703
704    /// Extends the contained metrics with [`ExtractedMetrics`].
705    pub fn extend(
706        &mut self,
707        extracted: ExtractedMetrics,
708        sampling_decision: Option<SamplingDecision>,
709    ) {
710        self.extend_project_metrics(extracted.project_metrics, sampling_decision);
711        self.extend_sampling_metrics(extracted.sampling_metrics, sampling_decision);
712    }
713
714    /// Extends the contained project metrics.
715    pub fn extend_project_metrics<I>(
716        &mut self,
717        buckets: I,
718        sampling_decision: Option<SamplingDecision>,
719    ) where
720        I: IntoIterator<Item = Bucket>,
721    {
722        self.metrics
723            .project_metrics
724            .extend(buckets.into_iter().map(|mut bucket| {
725                bucket.metadata.extracted_from_indexed =
726                    sampling_decision == Some(SamplingDecision::Keep);
727                bucket
728            }));
729    }
730
731    /// Extends the contained sampling metrics.
732    pub fn extend_sampling_metrics<I>(
733        &mut self,
734        buckets: I,
735        sampling_decision: Option<SamplingDecision>,
736    ) where
737        I: IntoIterator<Item = Bucket>,
738    {
739        self.metrics
740            .sampling_metrics
741            .extend(buckets.into_iter().map(|mut bucket| {
742                bucket.metadata.extracted_from_indexed =
743                    sampling_decision == Some(SamplingDecision::Keep);
744                bucket
745            }));
746    }
747
748    /// Applies rate limits to the contained metrics.
749    ///
750    /// This is used to apply rate limits which have been enforced on sampled items of an envelope
751    /// to also consistently apply to the metrics extracted from these items.
752    #[cfg(feature = "processing")]
753    fn apply_enforcement(&mut self, enforcement: &Enforcement, enforced_consistently: bool) {
754        // Metric namespaces which need to be dropped.
755        let mut drop_namespaces: SmallVec<[_; 2]> = smallvec![];
756        // Metrics belonging to this metric namespace need to have the `extracted_from_indexed`
757        // flag reset to `false`.
758        let mut reset_extracted_from_indexed: SmallVec<[_; 2]> = smallvec![];
759
760        for (namespace, limit, indexed) in [
761            (
762                MetricNamespace::Transactions,
763                &enforcement.event,
764                &enforcement.event_indexed,
765            ),
766            (
767                MetricNamespace::Spans,
768                &enforcement.spans,
769                &enforcement.spans_indexed,
770            ),
771        ] {
772            if limit.is_active() {
773                drop_namespaces.push(namespace);
774            } else if indexed.is_active() && !enforced_consistently {
775                // If the enforcement was not computed by consistently checking the limits,
776                // the quota for the metrics has not yet been incremented.
777                // In this case we have a dropped indexed payload but a metric which still needs to
778                // be accounted for, make sure the metric will still be rate limited.
779                reset_extracted_from_indexed.push(namespace);
780            }
781        }
782
783        if !drop_namespaces.is_empty() || !reset_extracted_from_indexed.is_empty() {
784            self.retain_mut(|bucket| {
785                let Some(namespace) = bucket.name.try_namespace() else {
786                    return true;
787                };
788
789                if drop_namespaces.contains(&namespace) {
790                    return false;
791                }
792
793                if reset_extracted_from_indexed.contains(&namespace) {
794                    bucket.metadata.extracted_from_indexed = false;
795                }
796
797                true
798            });
799        }
800    }
801
802    #[cfg(feature = "processing")]
803    fn retain_mut(&mut self, mut f: impl FnMut(&mut Bucket) -> bool) {
804        self.metrics.project_metrics.retain_mut(&mut f);
805        self.metrics.sampling_metrics.retain_mut(&mut f);
806    }
807}
808
809fn send_metrics(
810    metrics: ExtractedMetrics,
811    project_key: ProjectKey,
812    sampling_key: Option<ProjectKey>,
813    aggregator: &Addr<Aggregator>,
814) {
815    let ExtractedMetrics {
816        project_metrics,
817        sampling_metrics,
818    } = metrics;
819
820    if !project_metrics.is_empty() {
821        aggregator.send(MergeBuckets {
822            project_key,
823            buckets: project_metrics,
824        });
825    }
826
827    if !sampling_metrics.is_empty() {
828        // If no sampling project state is available, we associate the sampling
829        // metrics with the current project.
830        //
831        // project_without_tracing         -> metrics goes to self
832        // dependent_project_with_tracing  -> metrics goes to root
833        // root_project_with_tracing       -> metrics goes to root == self
834        let sampling_project_key = sampling_key.unwrap_or(project_key);
835        aggregator.send(MergeBuckets {
836            project_key: sampling_project_key,
837            buckets: sampling_metrics,
838        });
839    }
840}
841
842/// Function for on-off switches that filter specific item types (profiles, spans)
843/// based on a feature flag.
844///
845/// If the project config did not come from the upstream, we keep the items.
846fn should_filter(config: &Config, project_info: &ProjectInfo, feature: Feature) -> bool {
847    match config.relay_mode() {
848        RelayMode::Proxy => false,
849        RelayMode::Managed => !project_info.has_feature(feature),
850    }
851}
852
/// The result of the envelope processing containing the processed envelope along with the partial
/// result.
#[derive(Debug)]
#[expect(
    clippy::large_enum_variant,
    reason = "until we have a better solution to combat the excessive growth of Item, see #4819"
)]
enum ProcessingResult {
    /// A processed envelope together with the metrics extracted while processing it.
    Envelope {
        managed_envelope: TypedEnvelope<Processed>,
        extracted_metrics: ProcessingExtractedMetrics,
    },
    /// The output of a [`processing::Processor`] based pipeline.
    Output(Output<Outputs>),
}
867
868impl ProcessingResult {
869    /// Creates a [`ProcessingResult`] with no metrics.
870    fn no_metrics(managed_envelope: TypedEnvelope<Processed>) -> Self {
871        Self::Envelope {
872            managed_envelope,
873            extracted_metrics: ProcessingExtractedMetrics::new(),
874        }
875    }
876}
877
/// All items which can be submitted upstream.
#[derive(Debug)]
#[expect(
    clippy::large_enum_variant,
    reason = "until we have a better solution to combat the excessive growth of Item, see #4819"
)]
enum Submit<'a> {
    /// A processed envelope.
    Envelope(TypedEnvelope<Processed>),
    /// The output of a [`processing::Processor`].
    Output {
        /// The produced output itself.
        output: Outputs,
        /// Context needed to forward the output.
        ctx: processing::ForwardContext<'a>,
    },
}
893
/// Applies processing to all contents of the given envelope.
///
/// Depending on the contents of the envelope and Relay's mode, this includes:
///
///  - Basic normalization and validation for all item types.
///  - Clock drift correction if the required `sent_at` header is present.
///  - Expansion of certain item types (e.g. unreal).
///  - Store normalization for event payloads in processing mode.
///  - Rate limiters and inbound filters on events in processing mode.
#[derive(Debug)]
pub struct ProcessEnvelope {
    /// Envelope to process.
    pub envelope: ManagedEnvelope,
    /// The project info of the envelope's own project.
    pub project_info: Arc<ProjectInfo>,
    /// Currently active cached rate limits for this project.
    pub rate_limits: Arc<RateLimits>,
    /// Root sampling project info, if there is one for this trace.
    pub sampling_project_info: Option<Arc<ProjectInfo>>,
    /// Sampling reservoir counters.
    pub reservoir_counters: ReservoirCounters,
}
916
/// Like a [`ProcessEnvelope`], but with an envelope which has been grouped.
#[derive(Debug)]
struct ProcessEnvelopeGrouped<'a> {
    /// The group the envelope belongs to.
    pub group: ProcessingGroup,
    /// Envelope to process.
    ///
    /// All items in the envelope are expected to belong to `group`.
    pub envelope: ManagedEnvelope,
    /// The processing context.
    pub ctx: processing::Context<'a>,
}
927
/// Parses a list of metrics or metric buckets and pushes them to the project's aggregator.
///
/// This parses and validates the metrics:
///  - For [`Metrics`](ItemType::Statsd), each metric is parsed separately, and invalid metrics are
///    ignored independently.
///  - For [`MetricBuckets`](ItemType::MetricBuckets), the entire list of buckets is parsed and
///    dropped together on parsing failure.
///  - Other envelope items will be ignored with an error message.
///
/// Additionally, processing applies clock drift correction using the system clock of this Relay, if
/// the Envelope specifies the [`sent_at`](Envelope::sent_at) header.
#[derive(Debug)]
pub struct ProcessMetrics {
    /// A list of metric items.
    pub data: MetricData,
    /// The target project.
    pub project_key: ProjectKey,
    /// Whether to keep or reset the metric metadata, see [`BucketSource`].
    pub source: BucketSource,
    /// The wall clock time at which the request was received.
    pub received_at: DateTime<Utc>,
    /// The value of the Envelope's [`sent_at`](Envelope::sent_at) header for clock drift
    /// correction.
    pub sent_at: Option<DateTime<Utc>>,
}
953
/// Raw unparsed metric data.
#[derive(Debug)]
pub enum MetricData {
    /// Raw data, unparsed envelope items.
    ///
    /// Items should be of type [`ItemType::Statsd`] or [`ItemType::MetricBuckets`]; any other
    /// item type is skipped with an error log during parsing.
    Raw(Vec<Item>),
    /// Already parsed buckets but unprocessed.
    Parsed(Vec<Bucket>),
}
962
963impl MetricData {
964    /// Consumes the metric data and parses the contained buckets.
965    ///
966    /// If the contained data is already parsed the buckets are returned unchanged.
967    /// Raw buckets are parsed and created with the passed `timestamp`.
968    fn into_buckets(self, timestamp: UnixTimestamp) -> Vec<Bucket> {
969        let items = match self {
970            Self::Parsed(buckets) => return buckets,
971            Self::Raw(items) => items,
972        };
973
974        let mut buckets = Vec::new();
975        for item in items {
976            let payload = item.payload();
977            if item.ty() == &ItemType::Statsd {
978                for bucket_result in Bucket::parse_all(&payload, timestamp) {
979                    match bucket_result {
980                        Ok(bucket) => buckets.push(bucket),
981                        Err(error) => relay_log::debug!(
982                            error = &error as &dyn Error,
983                            "failed to parse metric bucket from statsd format",
984                        ),
985                    }
986                }
987            } else if item.ty() == &ItemType::MetricBuckets {
988                match serde_json::from_slice::<Vec<Bucket>>(&payload) {
989                    Ok(parsed_buckets) => {
990                        // Re-use the allocation of `b` if possible.
991                        if buckets.is_empty() {
992                            buckets = parsed_buckets;
993                        } else {
994                            buckets.extend(parsed_buckets);
995                        }
996                    }
997                    Err(error) => {
998                        relay_log::debug!(
999                            error = &error as &dyn Error,
1000                            "failed to parse metric bucket",
1001                        );
1002                        metric!(counter(RelayCounters::MetricBucketsParsingFailed) += 1);
1003                    }
1004                }
1005            } else {
1006                relay_log::error!(
1007                    "invalid item of type {} passed to ProcessMetrics",
1008                    item.ty()
1009                );
1010            }
1011        }
1012        buckets
1013    }
1014}
1015
/// A batch of metric buckets serialized as JSON, to be parsed and aggregated.
#[derive(Debug)]
pub struct ProcessBatchedMetrics {
    /// Metrics payload in JSON format.
    pub payload: Bytes,
    /// Whether to keep or reset the metric metadata, see [`BucketSource`].
    pub source: BucketSource,
    /// The wall clock time at which the request was received.
    pub received_at: DateTime<Utc>,
    /// The time at which the request was sent, for clock drift correction.
    pub sent_at: Option<DateTime<Utc>>,
}
1027
/// Source information where a metric bucket originates from.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum BucketSource {
    /// The metric bucket originated from an internal Relay use case.
    ///
    /// The metric bucket originates either from within the same Relay
    /// or was accepted coming from another Relay which is registered as
    /// an internal Relay via Relay's configuration.
    Internal,
    /// The bucket source originated from an untrusted source.
    ///
    /// Managed Relays sending extracted metrics are considered external,
    /// it's a project use case but it comes from an untrusted source.
    External,
}
1043
1044impl BucketSource {
1045    /// Infers the bucket source from [`RequestMeta::request_trust`].
1046    pub fn from_meta(meta: &RequestMeta) -> Self {
1047        match meta.request_trust() {
1048            RequestTrust::Trusted => Self::Internal,
1049            RequestTrust::Untrusted => Self::External,
1050        }
1051    }
1052}
1053
/// Sends client reports to the upstream.
#[derive(Debug)]
pub struct SubmitClientReports {
    /// The client reports to be sent.
    pub client_reports: Vec<ClientReport>,
    /// Scoping information for the client reports.
    pub scoping: Scoping,
}
1062
/// CPU-intensive processing tasks for envelopes.
///
/// Each variant wraps its message in a [`Box`] to keep the enum itself small.
#[derive(Debug)]
pub enum EnvelopeProcessor {
    /// See [`ProcessEnvelope`].
    ProcessEnvelope(Box<ProcessEnvelope>),
    /// See [`ProcessMetrics`].
    ProcessProjectMetrics(Box<ProcessMetrics>),
    /// See [`ProcessBatchedMetrics`].
    ProcessBatchedMetrics(Box<ProcessBatchedMetrics>),
    /// See [`FlushBuckets`].
    FlushBuckets(Box<FlushBuckets>),
    /// See [`SubmitClientReports`].
    SubmitClientReports(Box<SubmitClientReports>),
}
1072
1073impl EnvelopeProcessor {
1074    /// Returns the name of the message variant.
1075    pub fn variant(&self) -> &'static str {
1076        match self {
1077            EnvelopeProcessor::ProcessEnvelope(_) => "ProcessEnvelope",
1078            EnvelopeProcessor::ProcessProjectMetrics(_) => "ProcessProjectMetrics",
1079            EnvelopeProcessor::ProcessBatchedMetrics(_) => "ProcessBatchedMetrics",
1080            EnvelopeProcessor::FlushBuckets(_) => "FlushBuckets",
1081            EnvelopeProcessor::SubmitClientReports(_) => "SubmitClientReports",
1082        }
1083    }
1084}
1085
/// Marks [`EnvelopeProcessor`] as the message interface of this service.
impl relay_system::Interface for EnvelopeProcessor {}
1087
1088impl FromMessage<ProcessEnvelope> for EnvelopeProcessor {
1089    type Response = relay_system::NoResponse;
1090
1091    fn from_message(message: ProcessEnvelope, _sender: ()) -> Self {
1092        Self::ProcessEnvelope(Box::new(message))
1093    }
1094}
1095
1096impl FromMessage<ProcessMetrics> for EnvelopeProcessor {
1097    type Response = NoResponse;
1098
1099    fn from_message(message: ProcessMetrics, _: ()) -> Self {
1100        Self::ProcessProjectMetrics(Box::new(message))
1101    }
1102}
1103
1104impl FromMessage<ProcessBatchedMetrics> for EnvelopeProcessor {
1105    type Response = NoResponse;
1106
1107    fn from_message(message: ProcessBatchedMetrics, _: ()) -> Self {
1108        Self::ProcessBatchedMetrics(Box::new(message))
1109    }
1110}
1111
1112impl FromMessage<FlushBuckets> for EnvelopeProcessor {
1113    type Response = NoResponse;
1114
1115    fn from_message(message: FlushBuckets, _: ()) -> Self {
1116        Self::FlushBuckets(Box::new(message))
1117    }
1118}
1119
1120impl FromMessage<SubmitClientReports> for EnvelopeProcessor {
1121    type Response = NoResponse;
1122
1123    fn from_message(message: SubmitClientReports, _: ()) -> Self {
1124        Self::SubmitClientReports(Box::new(message))
1125    }
1126}
1127
/// The asynchronous thread pool used for scheduling processing tasks in the processor.
///
/// Tasks are submitted as boxed `'static` futures and executed on the pool.
pub type EnvelopeProcessorServicePool = AsyncPool<BoxFuture<'static, ()>>;
1130
/// Service implementing the [`EnvelopeProcessor`] interface.
///
/// This service handles messages in a worker pool with configurable concurrency.
#[derive(Clone)]
pub struct EnvelopeProcessorService {
    // Shared state; the `Arc` makes cloning the service handle cheap.
    inner: Arc<InnerProcessor>,
}
1138
/// Contains the addresses of services that the processor publishes to.
pub struct Addrs {
    /// Address of the outcome aggregator service.
    pub outcome_aggregator: Addr<TrackOutcome>,
    /// Address of the upstream relay service.
    pub upstream_relay: Addr<UpstreamRelay>,
    /// Address of the object store service, if available (processing mode only).
    #[cfg(feature = "processing")]
    pub objectstore: Option<Addr<Objectstore>>,
    /// Address of the store forwarder service, if available (processing mode only).
    #[cfg(feature = "processing")]
    pub store_forwarder: Option<Addr<Store>>,
    /// Address of the metrics aggregator service.
    pub aggregator: Addr<Aggregator>,
}
1149
1150impl Default for Addrs {
1151    fn default() -> Self {
1152        Addrs {
1153            outcome_aggregator: Addr::dummy(),
1154            upstream_relay: Addr::dummy(),
1155            #[cfg(feature = "processing")]
1156            objectstore: None,
1157            #[cfg(feature = "processing")]
1158            store_forwarder: None,
1159            aggregator: Addr::dummy(),
1160        }
1161    }
1162}
1163
/// Shared state of the [`EnvelopeProcessorService`].
struct InnerProcessor {
    /// Worker pool on which processing tasks are executed.
    pool: EnvelopeProcessorServicePool,
    /// Static Relay configuration.
    config: Arc<Config>,
    /// Handle to the current global configuration.
    global_config: GlobalConfigHandle,
    /// Handle to the project cache, used to read and update cached project state.
    project_cache: ProjectCacheHandle,
    /// COGS accounting sink.
    cogs: Cogs,
    /// Addresses of downstream services the processor publishes to.
    addrs: Addrs,
    /// Redis-backed rate limiter; only set in processing mode when Redis is configured.
    #[cfg(feature = "processing")]
    rate_limiter: Option<Arc<RedisRateLimiter>>,
    /// GeoIP lookup used during event normalization; empty if no database is configured.
    geoip_lookup: GeoIpLookup,
    /// Redis-backed cardinality limiter; only set in processing mode when Redis is configured.
    #[cfg(feature = "processing")]
    cardinality_limiter: Option<CardinalityLimiter>,
    /// Outcome tracking for metric buckets.
    metric_outcomes: MetricOutcomes,
    /// The per-item-type processors of the processing pipeline.
    processing: Processing,
}
1179
/// Collection of [`processing::Processor`] implementations, one per supported item type.
struct Processing {
    errors: ErrorsProcessor,
    logs: LogsProcessor,
    trace_metrics: TraceMetricsProcessor,
    spans: SpansProcessor,
    check_ins: CheckInsProcessor,
    sessions: SessionsProcessor,
    transactions: TransactionProcessor,
    profile_chunks: ProfileChunksProcessor,
    trace_attachments: TraceAttachmentsProcessor,
    replays: ReplaysProcessor,
    client_reports: ClientReportsProcessor,
    attachments: AttachmentProcessor,
    user_reports: UserReportsProcessor,
}
1195
1196impl EnvelopeProcessorService {
    /// Creates a multi-threaded envelope processor.
    ///
    /// The `redis` clients are only available in processing mode; when present they enable
    /// Redis-backed rate limiting and cardinality limiting.
    #[cfg_attr(feature = "processing", expect(clippy::too_many_arguments))]
    pub fn new(
        pool: EnvelopeProcessorServicePool,
        config: Arc<Config>,
        global_config: GlobalConfigHandle,
        project_cache: ProjectCacheHandle,
        cogs: Cogs,
        #[cfg(feature = "processing")] redis: Option<RedisClients>,
        addrs: Addrs,
        metric_outcomes: MetricOutcomes,
    ) -> Self {
        // Failure to open the GeoIP database is not fatal: log the error and degrade to an
        // empty lookup.
        let geoip_lookup = config
            .geoip_path()
            .and_then(
                |p| match GeoIpLookup::open(p).context(ServiceError::GeoIp) {
                    Ok(geoip) => Some(geoip),
                    Err(err) => {
                        relay_log::error!("failed to open GeoIP db {p:?}: {err:?}");
                        None
                    }
                },
            )
            .unwrap_or_else(GeoIpLookup::empty);

        // Split the Redis clients into the pools used for cardinality limiting and quotas.
        #[cfg(feature = "processing")]
        let (cardinality, quotas) = match redis {
            Some(RedisClients {
                cardinality,
                quotas,
                ..
            }) => (Some(cardinality), Some(quotas)),
            None => (None, None),
        };
        #[cfg(not(feature = "processing"))]
        let quotas = None;

        #[cfg(feature = "processing")]
        let rate_limiter = quotas.clone().map(|redis| {
            RedisRateLimiter::new(redis)
                .max_limit(config.max_rate_limit())
                .cache(config.quota_cache_ratio(), config.quota_cache_max())
        });

        let quota_limiter = Arc::new(QuotaRateLimiter::new(
            #[cfg(feature = "processing")]
            project_cache.clone(),
            #[cfg(feature = "processing")]
            rate_limiter.clone(),
        ));
        #[cfg(feature = "processing")]
        let rate_limiter = rate_limiter.map(Arc::new);
        let outcome_aggregator = addrs.outcome_aggregator.clone();
        let inner = InnerProcessor {
            pool,
            global_config,
            project_cache,
            cogs,
            #[cfg(feature = "processing")]
            rate_limiter,
            addrs,
            #[cfg(feature = "processing")]
            cardinality_limiter: cardinality
                .map(|cardinality| {
                    RedisSetLimiter::new(
                        RedisSetLimiterOptions {
                            cache_vacuum_interval: config
                                .cardinality_limiter_cache_vacuum_interval(),
                        },
                        cardinality,
                    )
                })
                .map(CardinalityLimiter::new),
            metric_outcomes,
            // All per-item-type processors share the same quota limiter.
            processing: Processing {
                errors: ErrorsProcessor::new(Arc::clone(&quota_limiter), geoip_lookup.clone()),
                logs: LogsProcessor::new(Arc::clone(&quota_limiter)),
                trace_metrics: TraceMetricsProcessor::new(Arc::clone(&quota_limiter)),
                spans: SpansProcessor::new(Arc::clone(&quota_limiter), geoip_lookup.clone()),
                check_ins: CheckInsProcessor::new(Arc::clone(&quota_limiter)),
                sessions: SessionsProcessor::new(Arc::clone(&quota_limiter)),
                transactions: TransactionProcessor::new(
                    Arc::clone(&quota_limiter),
                    geoip_lookup.clone(),
                    quotas.clone(),
                ),
                profile_chunks: ProfileChunksProcessor::new(Arc::clone(&quota_limiter)),
                trace_attachments: TraceAttachmentsProcessor::new(Arc::clone(&quota_limiter)),
                replays: ReplaysProcessor::new(Arc::clone(&quota_limiter), geoip_lookup.clone()),
                client_reports: ClientReportsProcessor::new(outcome_aggregator),
                attachments: AttachmentProcessor::new(Arc::clone(&quota_limiter)),
                user_reports: UserReportsProcessor::new(quota_limiter),
            },
            geoip_lookup,
            config,
        };

        Self {
            inner: Arc::new(inner),
        }
    }
1298
    /// Enforces rate limits on the envelope, the event, and the extracted metrics.
    ///
    /// Cached rate limits are applied first. In processing mode, if a Redis rate limiter is
    /// configured, all quotas are additionally enforced consistently against Redis, and the
    /// resulting rate limits are merged back into the cached limits of the project.
    ///
    /// Returns the (possibly rate limited) event on success.
    async fn enforce_quotas<Group>(
        &self,
        managed_envelope: &mut TypedEnvelope<Group>,
        event: Annotated<Event>,
        extracted_metrics: &mut ProcessingExtractedMetrics,
        ctx: processing::Context<'_>,
    ) -> Result<Annotated<Event>, ProcessingError> {
        // Cached quotas first, they are quick to evaluate and some quotas (indexed) are not
        // applied in the fast path, all cached quotas can be applied here.
        let cached_result = RateLimiter::Cached
            .enforce(managed_envelope, event, extracted_metrics, ctx)
            .await?;

        if_processing!(self.inner.config, {
            let rate_limiter = match self.inner.rate_limiter.clone() {
                Some(rate_limiter) => rate_limiter,
                // Without Redis there is nothing further to enforce.
                None => return Ok(cached_result.event),
            };

            // Enforce all quotas consistently with Redis.
            let consistent_result = RateLimiter::Consistent(rate_limiter)
                .enforce(
                    managed_envelope,
                    cached_result.event,
                    extracted_metrics,
                    ctx
                )
                .await?;

            // Update cached rate limits with the freshly computed ones.
            if !consistent_result.rate_limits.is_empty() {
                self.inner
                    .project_cache
                    .get(managed_envelope.scoping().project_key)
                    .rate_limits()
                    .merge(consistent_result.rate_limits);
            }

            Ok(consistent_result.event)
        } else { Ok(cached_result.event) })
    }
1340
    /// Processes the general errors, and the items which require or create the events.
    ///
    /// Returns the metrics extracted during processing. Processing steps are order-sensitive:
    /// expansion must happen before event extraction, and quota enforcement before
    /// serialization.
    async fn process_errors(
        &self,
        managed_envelope: &mut TypedEnvelope<ErrorGroup>,
        project_id: ProjectId,
        mut ctx: processing::Context<'_>,
    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
        let mut event_fully_normalized = EventFullyNormalized::new(managed_envelope.envelope());
        let mut metrics = Metrics::default();
        let mut extracted_metrics = ProcessingExtractedMetrics::new();

        // Events can also contain user reports.
        report::process_user_reports(managed_envelope);

        // In processing mode, expand bundled crash report formats into individual items
        // before the event is extracted.
        if_processing!(self.inner.config, {
            unreal::expand(managed_envelope, &self.inner.config)?;
            #[cfg(sentry)]
            playstation::expand(managed_envelope, &self.inner.config, ctx.project_info)?;
            nnswitch::expand(managed_envelope)?;
        });

        let mut event = event::extract(
            managed_envelope,
            &mut metrics,
            event_fully_normalized,
            &self.inner.config,
        )?;

        // Crash-report processing may replace the event, which resets the
        // fully-normalized flag.
        if_processing!(self.inner.config, {
            if let Some(inner_event_fully_normalized) =
                unreal::process(managed_envelope, &mut event)?
            {
                event_fully_normalized = inner_event_fully_normalized;
            }
            #[cfg(sentry)]
            if let Some(inner_event_fully_normalized) =
                playstation::process(managed_envelope, &mut event, ctx.project_info)?
            {
                event_fully_normalized = inner_event_fully_normalized;
            }
            if let Some(inner_event_fully_normalized) =
                attachment::create_placeholders(managed_envelope, &mut event, &mut metrics)
            {
                event_fully_normalized = inner_event_fully_normalized;
            }
        });

        // Validate the dynamic sampling context and update the sampling project info
        // accordingly.
        ctx.sampling_project_info = dynamic_sampling::validate_and_set_dsc(
            managed_envelope,
            &mut event,
            ctx.project_info,
            ctx.sampling_project_info,
        );

        let attachments = managed_envelope
            .envelope()
            .items()
            .filter(|item| item.attachment_type() == Some(AttachmentType::Attachment));
        processing::utils::event::finalize(
            managed_envelope.envelope().headers(),
            &mut event,
            attachments,
            &mut metrics,
            ctx.config,
        )?;
        event_fully_normalized = processing::utils::event::normalize(
            managed_envelope.envelope().headers(),
            &mut event,
            event_fully_normalized,
            project_id,
            ctx,
            &self.inner.geoip_lookup,
        )?;
        // Filtered events abort processing with an `EventFiltered` error.
        let filter_run =
            processing::utils::event::filter(managed_envelope.envelope().headers(), &event, ctx)
                .map_err(ProcessingError::EventFiltered)?;

        // Tag a sampling decision only if this Relay does processing or all filters passed.
        if self.inner.config.processing_enabled() || matches!(filter_run, FiltersStatus::Ok) {
            dynamic_sampling::tag_error_with_sampling_decision(
                managed_envelope,
                &mut event,
                ctx.sampling_project_info,
                &self.inner.config,
            )
            .await;
        }

        event = self
            .enforce_quotas(managed_envelope, event, &mut extracted_metrics, ctx)
            .await?;

        // The event may have been dropped by quota enforcement; only scrub and serialize it
        // if it is still present.
        if event.value().is_some() {
            processing::utils::event::scrub(&mut event, ctx.project_info)?;
            event::serialize(
                managed_envelope,
                &mut event,
                event_fully_normalized,
                EventMetricsExtracted(false),
                SpansExtracted(false),
            )?;
            event::emit_feedback_metrics(managed_envelope.envelope());
        }

        let attachments = managed_envelope
            .envelope_mut()
            .items_mut()
            .filter(|i| i.ty() == &ItemType::Attachment);
        processing::utils::attachments::scrub(attachments, ctx.project_info, None);

        // In processing mode, every event must be fully normalized by this point; log a
        // violation of that invariant.
        if self.inner.config.processing_enabled() && !event_fully_normalized.0 {
            relay_log::error!(
                tags.project = %project_id,
                tags.ty = event_type(&event).map(|e| e.to_string()).unwrap_or("none".to_owned()),
                "ingested event without normalizing"
            );
        }

        Ok(Some(extracted_metrics))
    }
1460
1461    /// Processes standalone items that require an event ID, but do not have an event on the same envelope.
1462    async fn process_standalone(
1463        &self,
1464        managed_envelope: &mut TypedEnvelope<StandaloneGroup>,
1465        ctx: processing::Context<'_>,
1466    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
1467        for item in managed_envelope.envelope().items() {
1468            let attachment_type_tag = match item.attachment_type() {
1469                Some(t) => &t.to_string(),
1470                None => "",
1471            };
1472            relay_statsd::metric!(
1473                counter(RelayCounters::StandaloneItem) += 1,
1474                processor = "old",
1475                item_type = item.ty().name(),
1476                attachment_type = attachment_type_tag,
1477            );
1478        }
1479
1480        let mut extracted_metrics = ProcessingExtractedMetrics::new();
1481
1482        standalone::process(managed_envelope);
1483
1484        profile::filter(managed_envelope, ctx.config, ctx.project_info);
1485
1486        self.enforce_quotas(
1487            managed_envelope,
1488            Annotated::empty(),
1489            &mut extracted_metrics,
1490            ctx,
1491        )
1492        .await?;
1493
1494        report::process_user_reports(managed_envelope);
1495
1496        Ok(Some(extracted_metrics))
1497    }
1498
1499    async fn process_nel(
1500        &self,
1501        mut managed_envelope: ManagedEnvelope,
1502        ctx: processing::Context<'_>,
1503    ) -> Result<ProcessingResult, ProcessingError> {
1504        nel::convert_to_logs(&mut managed_envelope);
1505        self.process_with_processor(&self.inner.processing.logs, managed_envelope, ctx)
1506            .await
1507    }
1508
1509    async fn process_with_processor<P: processing::Processor>(
1510        &self,
1511        processor: &P,
1512        mut managed_envelope: ManagedEnvelope,
1513        ctx: processing::Context<'_>,
1514    ) -> Result<ProcessingResult, ProcessingError>
1515    where
1516        Outputs: From<P::Output>,
1517    {
1518        let Some(work) = processor.prepare_envelope(&mut managed_envelope) else {
1519            debug_assert!(
1520                false,
1521                "there must be work for the {} processor",
1522                std::any::type_name::<P>(),
1523            );
1524            return Err(ProcessingError::ProcessingGroupMismatch);
1525        };
1526
1527        managed_envelope.update();
1528        match managed_envelope.envelope().is_empty() {
1529            true => managed_envelope.accept(),
1530            false => managed_envelope.reject(Outcome::Invalid(DiscardReason::Internal)),
1531        }
1532
1533        processor
1534            .process(work, ctx)
1535            .await
1536            .map_err(|err| {
1537                relay_log::debug!(
1538                    error = &err as &dyn std::error::Error,
1539                    "processing pipeline failed"
1540                );
1541                ProcessingError::ProcessingFailure
1542            })
1543            .map(|o| o.map(Into::into))
1544            .map(ProcessingResult::Output)
1545    }
1546
    /// Processes standalone spans.
    ///
    /// This function does *not* run for spans extracted from transactions.
    async fn process_standalone_spans(
        &self,
        managed_envelope: &mut TypedEnvelope<SpanGroup>,
        // Only read in processing builds; the underscore keeps non-processing builds
        // warning-free.
        _project_id: ProjectId,
        ctx: processing::Context<'_>,
    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
        let mut extracted_metrics = ProcessingExtractedMetrics::new();

        // Filter span items, see `span::filter`.
        span::filter(managed_envelope, ctx.config, ctx.project_info);

        if_processing!(self.inner.config, {
            span::process(
                managed_envelope,
                &mut Annotated::empty(),
                &mut extracted_metrics,
                _project_id,
                ctx,
                &self.inner.geoip_lookup,
            )
            .await;
        });

        self.enforce_quotas(
            managed_envelope,
            Annotated::empty(),
            &mut extracted_metrics,
            ctx,
        )
        .await?;

        Ok(Some(extracted_metrics))
    }
1582
    /// Routes an envelope group to its dedicated processing pipeline and runs it.
    ///
    /// First normalizes envelope headers (DSC transaction parametrization, retention,
    /// project ID), then dispatches on [`ProcessingGroup`]: most groups are handled by
    /// the typed processors in `self.inner.processing`, while a few (errors without the
    /// feature flag, standalone items, NEL, standalone spans) still go through the
    /// legacy `run!` path.
    async fn process_envelope(
        &self,
        project_id: ProjectId,
        message: ProcessEnvelopeGrouped<'_>,
    ) -> Result<ProcessingResult, ProcessingError> {
        let ProcessEnvelopeGrouped {
            group,
            envelope: mut managed_envelope,
            ctx,
        } = message;

        // Pre-process the envelope headers.
        if let Some(sampling_state) = ctx.sampling_project_info {
            // Both transactions and standalone span envelopes need a normalized DSC header
            // to make sampling rules based on the segment/transaction name work correctly.
            managed_envelope
                .envelope_mut()
                .parametrize_dsc_transaction(&sampling_state.config.tx_name_rules);
        }

        // Set the event retention. Effectively, this value will only be available in processing
        // mode when the full project config is queried from the upstream.
        if let Some(retention) = ctx.project_info.config.event_retention {
            managed_envelope.envelope_mut().set_retention(retention);
        }

        // Set the downsampled event retention. Like the regular retention above, this is
        // effectively only available in processing mode when the full project config is
        // queried from the upstream.
        if let Some(retention) = ctx.project_info.config.downsampled_event_retention {
            managed_envelope
                .envelope_mut()
                .set_downsampled_retention(retention);
        }

        // Ensure the project ID is updated to the stored instance for this project cache. This can
        // differ in two cases:
        //  1. The envelope was sent to the legacy `/store/` endpoint without a project ID.
        //  2. The DSN was moved and the envelope sent to the old project ID.
        managed_envelope
            .envelope_mut()
            .meta_mut()
            .set_project_id(project_id);

        // Legacy dispatch helper: converts the envelope into the typed envelope required by
        // `group`, invokes the named processing method, and wraps its extracted metrics into
        // a `ProcessingResult`. On error, emits the matching outcome before propagating.
        macro_rules! run {
            ($fn_name:ident $(, $args:expr)*) => {
                async {
                    let mut managed_envelope = (managed_envelope, group).try_into()?;
                    match self.$fn_name(&mut managed_envelope, $($args),*).await {
                        Ok(extracted_metrics) => Ok(ProcessingResult::Envelope {
                            managed_envelope: managed_envelope.into_processed(),
                            extracted_metrics: extracted_metrics.map_or(ProcessingExtractedMetrics::new(), |e| e)
                        }),
                        Err(error) => {
                            relay_log::trace!("Executing {fn} failed: {error}", fn = stringify!($fn_name), error = error);
                            if let Some(outcome) = error.to_outcome() {
                                managed_envelope.reject(outcome);
                            }

                            return Err(error);
                        }
                    }
                }.await
            };
        }

        relay_log::trace!("Processing {group} group", group = group.variant());

        match group {
            ProcessingGroup::Error => {
                // Errors are being migrated to the new pipeline behind a feature flag.
                if ctx.project_info.has_feature(Feature::NewErrorProcessing) {
                    self.process_with_processor(
                        &self.inner.processing.errors,
                        managed_envelope,
                        ctx,
                    )
                    .await
                } else {
                    run!(process_errors, project_id, ctx)
                }
            }
            ProcessingGroup::Transaction => {
                self.process_with_processor(
                    &self.inner.processing.transactions,
                    managed_envelope,
                    ctx,
                )
                .await
            }
            ProcessingGroup::Session => {
                self.process_with_processor(&self.inner.processing.sessions, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::Standalone => run!(process_standalone, ctx),
            ProcessingGroup::StandaloneAttachments => {
                self.process_with_processor(
                    &self.inner.processing.attachments,
                    managed_envelope,
                    ctx,
                )
                .await
            }
            ProcessingGroup::StandaloneUserReports => {
                self.process_with_processor(
                    &self.inner.processing.user_reports,
                    managed_envelope,
                    ctx,
                )
                .await
            }
            ProcessingGroup::ClientReport => {
                self.process_with_processor(
                    &self.inner.processing.client_reports,
                    managed_envelope,
                    ctx,
                )
                .await
            }
            ProcessingGroup::Replay => {
                self.process_with_processor(&self.inner.processing.replays, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::CheckIn => {
                self.process_with_processor(&self.inner.processing.check_ins, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::Nel => self.process_nel(managed_envelope, ctx).await,
            ProcessingGroup::Log => {
                self.process_with_processor(&self.inner.processing.logs, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::TraceMetric => {
                self.process_with_processor(
                    &self.inner.processing.trace_metrics,
                    managed_envelope,
                    ctx,
                )
                .await
            }
            ProcessingGroup::SpanV2 => {
                self.process_with_processor(&self.inner.processing.spans, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::TraceAttachment => {
                self.process_with_processor(
                    &self.inner.processing.trace_attachments,
                    managed_envelope,
                    ctx,
                )
                .await
            }
            ProcessingGroup::Span => run!(process_standalone_spans, project_id, ctx),
            ProcessingGroup::ProfileChunk => {
                self.process_with_processor(
                    &self.inner.processing.profile_chunks,
                    managed_envelope,
                    ctx,
                )
                .await
            }
            // Currently is not used.
            ProcessingGroup::Metrics => {
                // In proxy mode we simply forward the metrics.
                // This group shouldn't be used outside of proxy mode.
                if self.inner.config.relay_mode() != RelayMode::Proxy {
                    relay_log::error!(
                        tags.project = %project_id,
                        items = ?managed_envelope.envelope().items().next().map(Item::ty),
                        "received metrics in the process_state"
                    );
                }

                Ok(ProcessingResult::no_metrics(
                    managed_envelope.into_processed(),
                ))
            }
            // Fallback to the legacy process_state implementation for Ungrouped events.
            ProcessingGroup::Ungrouped => {
                relay_log::error!(
                    tags.project = %project_id,
                    items = ?managed_envelope.envelope().items().next().map(Item::ty),
                    "could not identify the processing group based on the envelope's items"
                );

                Ok(ProcessingResult::no_metrics(
                    managed_envelope.into_processed(),
                ))
            }
            // Leave this group unchanged.
            //
            // This will later be forwarded to upstream.
            ProcessingGroup::ForwardUnknown => Ok(ProcessingResult::no_metrics(
                managed_envelope.into_processed(),
            )),
        }
    }
1778
    /// Processes the envelope and returns the processed envelope back.
    ///
    /// Returns `Some` if the envelope passed inbound filtering and rate limiting. Invalid items are
    /// removed from the envelope. Otherwise, if the envelope is empty or the entire envelope needs
    /// to be dropped, this is `None`.
    ///
    /// Extracted metrics are always forwarded to the aggregator, even when the envelope
    /// itself ends up empty or rejected.
    async fn process<'a>(
        &self,
        mut message: ProcessEnvelopeGrouped<'a>,
    ) -> Result<Option<Submit<'a>>, ProcessingError> {
        let ProcessEnvelopeGrouped {
            ref mut envelope,
            ctx,
            ..
        } = message;

        // Prefer the project's project ID, and fall back to the stated project id from the
        // envelope. The project ID is available in all modes, other than in proxy mode, where
        // envelopes for unknown projects are forwarded blindly.
        //
        // Neither ID can be available in proxy mode on the /store/ endpoint. This is not supported,
        // since we cannot process an envelope without project ID, so drop it.
        let Some(project_id) = ctx
            .project_info
            .project_id
            .or_else(|| envelope.envelope().meta().project_id())
        else {
            envelope.reject(Outcome::Invalid(DiscardReason::Internal));
            return Err(ProcessingError::MissingProjectId);
        };

        // Capture request metadata up front; the envelope may be consumed during processing.
        let client = envelope.envelope().meta().client().map(str::to_owned);
        let user_agent = envelope.envelope().meta().user_agent().map(str::to_owned);
        let project_key = envelope.envelope().meta().public_key();
        // Only allow sending to the sampling key, if we successfully loaded a sampling project
        // info relating to it. This filters out unknown/invalid project keys as well as project
        // keys from different organizations.
        let sampling_key = envelope
            .envelope()
            .sampling_key()
            .filter(|_| ctx.sampling_project_info.is_some());

        // We set additional information on the scope, which will be removed after processing the
        // envelope.
        relay_log::configure_scope(|scope| {
            scope.set_tag("project", project_id);
            if let Some(client) = client {
                scope.set_tag("sdk", client);
            }
            if let Some(user_agent) = user_agent {
                scope.set_extra("user_agent", user_agent.into());
            }
        });

        let result = match self.process_envelope(project_id, message).await {
            Ok(ProcessingResult::Envelope {
                mut managed_envelope,
                extracted_metrics,
            }) => {
                // The envelope could be modified or even emptied during processing, which
                // requires re-computation of the context.
                managed_envelope.update();

                // Send metrics before inspecting the envelope; they are kept even if the
                // envelope itself is dropped below.
                let has_metrics = !extracted_metrics.metrics.project_metrics.is_empty();
                send_metrics(
                    extracted_metrics.metrics,
                    project_key,
                    sampling_key,
                    &self.inner.addrs.aggregator,
                );

                let envelope_response = if managed_envelope.envelope().is_empty() {
                    if !has_metrics {
                        // Individual rate limits have already been issued
                        managed_envelope.reject(Outcome::RateLimited(None));
                    } else {
                        managed_envelope.accept();
                    }

                    None
                } else {
                    Some(managed_envelope)
                };

                Ok(envelope_response.map(Submit::Envelope))
            }
            Ok(ProcessingResult::Output(Output { main, metrics })) => {
                // New-pipeline result: metrics and the main output are submitted separately.
                if let Some(metrics) = metrics {
                    metrics.accept(|metrics| {
                        send_metrics(
                            metrics,
                            project_key,
                            sampling_key,
                            &self.inner.addrs.aggregator,
                        );
                    });
                }

                let ctx = ctx.to_forward();
                Ok(main.map(|output| Submit::Output { output, ctx }))
            }
            Err(err) => Err(err),
        };

        // Tear down the per-envelope scope decorations set above.
        //
        // NOTE(review): "user_agent" is set via `set_extra` above but removed here with
        // `remove_tag`; the extra may outlive this envelope — confirm whether
        // `remove_extra("user_agent")` was intended.
        relay_log::configure_scope(|scope| {
            scope.remove_tag("project");
            scope.remove_tag("sdk");
            scope.remove_tag("user_agent");
        });

        result
    }
1890
    /// Entry point for [`ProcessEnvelope`] messages.
    ///
    /// Splits the incoming envelope into processing groups, builds a [`processing::Context`]
    /// per group, runs [`Self::process`] on each, and submits successful results upstream.
    /// Processing errors are logged per group — at error level only when unexpected.
    async fn handle_process_envelope(&self, cogs: &mut Token, message: ProcessEnvelope) {
        let project_key = message.envelope.envelope().meta().public_key();
        let wait_time = message.envelope.age();
        metric!(timer(RelayTimers::EnvelopeWaitTime) = wait_time);

        // This COGS handling may need an overhaul in the future:
        // Cancel the passed in token, to start individual measurements per envelope instead.
        cogs.cancel();

        let scoping = message.envelope.scoping();
        for (group, envelope) in ProcessingGroup::split_envelope(
            *message.envelope.into_envelope(),
            &message.project_info,
        ) {
            // Per-group COGS measurement, replacing the cancelled outer token.
            let mut cogs = self
                .inner
                .cogs
                .timed(ResourceId::Relay, AppFeature::from(group));

            let mut envelope =
                ManagedEnvelope::new(envelope, self.inner.addrs.outcome_aggregator.clone());
            envelope.scope(scoping);

            let global_config = self.inner.global_config.current();

            let ctx = processing::Context {
                config: &self.inner.config,
                global_config: &global_config,
                project_info: &message.project_info,
                sampling_project_info: message.sampling_project_info.as_deref(),
                rate_limits: &message.rate_limits,
                reservoir_counters: &message.reservoir_counters,
            };

            let message = ProcessEnvelopeGrouped {
                group,
                envelope,
                ctx,
            };

            let result = metric!(
                timer(RelayTimers::EnvelopeProcessingTime),
                group = group.variant(),
                { self.process(message).await }
            );

            match result {
                Ok(Some(envelope)) => self.submit_upstream(&mut cogs, envelope),
                Ok(None) => {}
                // Unexpected errors indicate a bug or infrastructure issue; log loudly.
                Err(error) if error.is_unexpected() => {
                    relay_log::error!(
                        tags.project_key = %project_key,
                        error = &error as &dyn Error,
                        "error processing envelope"
                    )
                }
                // Expected errors (e.g. invalid payloads) are only interesting for debugging.
                Err(error) => {
                    relay_log::debug!(
                        tags.project_key = %project_key,
                        error = &error as &dyn Error,
                        "error processing envelope"
                    )
                }
            }
        }
    }
1957
1958    fn handle_process_metrics(&self, cogs: &mut Token, message: ProcessMetrics) {
1959        let ProcessMetrics {
1960            data,
1961            project_key,
1962            received_at,
1963            sent_at,
1964            source,
1965        } = message;
1966
1967        let received_timestamp =
1968            UnixTimestamp::from_datetime(received_at).unwrap_or(UnixTimestamp::now());
1969
1970        let mut buckets = data.into_buckets(received_timestamp);
1971        if buckets.is_empty() {
1972            return;
1973        };
1974        cogs.update(relay_metrics::cogs::BySize(&buckets));
1975
1976        let clock_drift_processor =
1977            ClockDriftProcessor::new(sent_at, received_at).at_least(MINIMUM_CLOCK_DRIFT);
1978
1979        buckets.retain_mut(|bucket| {
1980            if let Err(error) = relay_metrics::normalize_bucket(bucket) {
1981                relay_log::debug!(error = &error as &dyn Error, "dropping bucket {bucket:?}");
1982                return false;
1983            }
1984
1985            if !self::metrics::is_valid_namespace(bucket) {
1986                return false;
1987            }
1988
1989            clock_drift_processor.process_timestamp(&mut bucket.timestamp);
1990
1991            if !matches!(source, BucketSource::Internal) {
1992                bucket.metadata = BucketMetadata::new(received_timestamp);
1993            }
1994
1995            true
1996        });
1997
1998        let project = self.inner.project_cache.get(project_key);
1999
2000        // Best effort check to filter and rate limit buckets, if there is no project state
2001        // available at the current time, we will check again after flushing.
2002        let buckets = match project.state() {
2003            ProjectState::Enabled(project_info) => {
2004                let rate_limits = project.rate_limits().current_limits();
2005                self.check_buckets(project_key, project_info, &rate_limits, buckets)
2006            }
2007            _ => buckets,
2008        };
2009
2010        relay_log::trace!("merging metric buckets into the aggregator");
2011        self.inner
2012            .addrs
2013            .aggregator
2014            .send(MergeBuckets::new(project_key, buckets));
2015    }
2016
2017    fn handle_process_batched_metrics(&self, cogs: &mut Token, message: ProcessBatchedMetrics) {
2018        let ProcessBatchedMetrics {
2019            payload,
2020            source,
2021            received_at,
2022            sent_at,
2023        } = message;
2024
2025        #[derive(serde::Deserialize)]
2026        struct Wrapper {
2027            buckets: HashMap<ProjectKey, Vec<Bucket>>,
2028        }
2029
2030        let buckets = match serde_json::from_slice(&payload) {
2031            Ok(Wrapper { buckets }) => buckets,
2032            Err(error) => {
2033                relay_log::debug!(
2034                    error = &error as &dyn Error,
2035                    "failed to parse batched metrics",
2036                );
2037                metric!(counter(RelayCounters::MetricBucketsParsingFailed) += 1);
2038                return;
2039            }
2040        };
2041
2042        for (project_key, buckets) in buckets {
2043            self.handle_process_metrics(
2044                cogs,
2045                ProcessMetrics {
2046                    data: MetricData::Parsed(buckets),
2047                    project_key,
2048                    source,
2049                    received_at,
2050                    sent_at,
2051                },
2052            )
2053        }
2054    }
2055
    /// Submits a processing result upstream: to the store in processing mode, or to the
    /// next Relay over HTTP otherwise.
    ///
    /// In processing mode with a configured store forwarder, outputs are forwarded
    /// directly to the store. Otherwise the result is serialized into an envelope,
    /// encoded with the configured HTTP encoding, and handed to the upstream relay
    /// service. Serialization failures reject the envelope with an internal outcome.
    fn submit_upstream(&self, cogs: &mut Token, submit: Submit<'_>) {
        let _submit = cogs.start_category("submit");

        #[cfg(feature = "processing")]
        if self.inner.config.processing_enabled()
            && let Some(store_forwarder) = &self.inner.addrs.store_forwarder
        {
            use crate::processing::StoreHandle;

            let objectstore = self.inner.addrs.objectstore.as_ref();
            let global_config = &self.inner.global_config.current();
            let handle = StoreHandle::new(store_forwarder, objectstore, global_config);

            match submit {
                // Once check-ins and errors are fully moved to the new pipeline, this is only
                // used for metrics forwarding.
                //
                // Metrics forwarding will _never_ forward an envelope in processing, making
                // this branch here unused.
                Submit::Envelope(envelope) => handle.send_envelope(envelope.into_inner()),
                Submit::Output { output, ctx } => output
                    .forward_store(handle, ctx)
                    .unwrap_or_else(|err| err.into_inner()),
            }
            return;
        }

        // Not in processing mode: everything is forwarded as an envelope over HTTP.
        let mut envelope = match submit {
            Submit::Envelope(envelope) => envelope,
            Submit::Output { output, ctx } => match output.serialize_envelope(ctx) {
                Ok(envelope) => ManagedEnvelope::from(envelope).into_processed(),
                Err(_) => {
                    relay_log::error!("failed to serialize output to an envelope");
                    return;
                }
            },
        };

        // An empty envelope has nothing to send; accept it to finalize its outcomes.
        if envelope.envelope_mut().is_empty() {
            envelope.accept();
            return;
        }

        // Override the `sent_at` timestamp. Since the envelope went through basic
        // normalization, all timestamps have been corrected. We propagate the new
        // `sent_at` to allow the next Relay to double-check this timestamp and
        // potentially apply correction again. This is done as close to sending as
        // possible so that we avoid internal delays.
        envelope.envelope_mut().set_sent_at(Utc::now());

        relay_log::trace!("sending envelope to sentry endpoint");
        let http_encoding = self.inner.config.http_encoding();
        let result = envelope.envelope().to_vec().and_then(|v| {
            encode_payload(&v.into(), http_encoding).map_err(EnvelopeError::PayloadIoFailed)
        });

        match result {
            Ok(body) => {
                self.inner
                    .addrs
                    .upstream_relay
                    .send(SendRequest(SendEnvelope {
                        envelope,
                        body,
                        http_encoding,
                        project_cache: self.inner.project_cache.clone(),
                    }));
            }
            Err(error) => {
                // Errors are only logged for what we consider an internal discard reason. These
                // indicate errors in the infrastructure or implementation bugs.
                relay_log::error!(
                    error = &error as &dyn Error,
                    tags.project_key = %envelope.scoping().project_key,
                    "failed to serialize envelope payload"
                );

                envelope.reject(Outcome::Invalid(DiscardReason::Internal));
            }
        }
    }
2137
2138    fn handle_submit_client_reports(&self, cogs: &mut Token, message: SubmitClientReports) {
2139        let SubmitClientReports {
2140            client_reports,
2141            scoping,
2142        } = message;
2143
2144        let upstream = self.inner.config.upstream_descriptor();
2145        let dsn = PartialDsn::outbound(&scoping, upstream);
2146
2147        let mut envelope = Envelope::from_request(None, RequestMeta::outbound(dsn));
2148        for client_report in client_reports {
2149            match client_report.serialize() {
2150                Ok(payload) => {
2151                    let mut item = Item::new(ItemType::ClientReport);
2152                    item.set_payload(ContentType::Json, payload);
2153                    envelope.add_item(item);
2154                }
2155                Err(error) => {
2156                    relay_log::error!(
2157                        error = &error as &dyn std::error::Error,
2158                        "failed to serialize client report"
2159                    );
2160                }
2161            }
2162        }
2163
2164        let envelope = ManagedEnvelope::new(envelope, self.inner.addrs.outcome_aggregator.clone());
2165        self.submit_upstream(cogs, Submit::Envelope(envelope.into_processed()));
2166    }
2167
2168    fn check_buckets(
2169        &self,
2170        project_key: ProjectKey,
2171        project_info: &ProjectInfo,
2172        rate_limits: &RateLimits,
2173        buckets: Vec<Bucket>,
2174    ) -> Vec<Bucket> {
2175        let Some(scoping) = project_info.scoping(project_key) else {
2176            relay_log::error!(
2177                tags.project_key = project_key.as_str(),
2178                "there is no scoping: dropping {} buckets",
2179                buckets.len(),
2180            );
2181            return Vec::new();
2182        };
2183
2184        let mut buckets = self::metrics::apply_project_info(
2185            buckets,
2186            &self.inner.metric_outcomes,
2187            project_info,
2188            scoping,
2189        );
2190
2191        let namespaces: BTreeSet<MetricNamespace> = buckets
2192            .iter()
2193            .filter_map(|bucket| bucket.name.try_namespace())
2194            .collect();
2195
2196        for namespace in namespaces {
2197            let limits = rate_limits.check_with_quotas(
2198                project_info.get_quotas(),
2199                scoping.item(DataCategory::MetricBucket),
2200            );
2201
2202            if limits.is_limited() {
2203                let rejected;
2204                (buckets, rejected) = utils::split_off(buckets, |bucket| {
2205                    bucket.name.try_namespace() == Some(namespace)
2206                });
2207
2208                let reason_code = limits.longest().and_then(|limit| limit.reason_code.clone());
2209                self.inner.metric_outcomes.track(
2210                    scoping,
2211                    &rejected,
2212                    Outcome::RateLimited(reason_code),
2213                );
2214            }
2215        }
2216
2217        let quotas = project_info.config.quotas.clone();
2218        match MetricsLimiter::create(buckets, quotas, scoping) {
2219            Ok(mut bucket_limiter) => {
2220                bucket_limiter.enforce_limits(rate_limits, &self.inner.metric_outcomes);
2221                bucket_limiter.into_buckets()
2222            }
2223            Err(buckets) => buckets,
2224        }
2225    }
2226
    /// Applies backend rate limits to metric buckets per namespace (processing mode only).
    ///
    /// For every metric namespace present in `buckets`, checks the combined global and
    /// project quotas against the rate limiter. Buckets of limited namespaces are removed
    /// and tracked as rate-limited outcomes, and the resulting limits are cached on the
    /// project. Finally, delegates transaction/span quota enforcement to
    /// [`Self::apply_other_rate_limits`].
    #[cfg(feature = "processing")]
    async fn rate_limit_buckets(
        &self,
        scoping: Scoping,
        project_info: &ProjectInfo,
        mut buckets: Vec<Bucket>,
    ) -> Vec<Bucket> {
        // Without a configured rate limiter, pass all buckets through unchanged.
        let Some(rate_limiter) = &self.inner.rate_limiter else {
            return buckets;
        };

        let global_config = self.inner.global_config.current();
        // Count buckets per namespace; the count is the quantity reported to the limiter.
        let namespaces = buckets
            .iter()
            .filter_map(|bucket| bucket.name.try_namespace())
            .counts();

        let quotas = CombinedQuotas::new(&global_config, project_info.get_quotas());

        for (namespace, quantity) in namespaces {
            let item_scoping = scoping.metric_bucket(namespace);

            let limits = match rate_limiter
                .is_rate_limited(quotas, item_scoping, quantity, false)
                .await
            {
                Ok(limits) => limits,
                Err(err) => {
                    // On limiter failure, stop checking further namespaces and keep the
                    // remaining buckets.
                    relay_log::error!(
                        error = &err as &dyn std::error::Error,
                        "failed to check redis rate limits"
                    );
                    break;
                }
            };

            if limits.is_limited() {
                // Remove all buckets of the limited namespace and track them as rejected.
                let rejected;
                (buckets, rejected) = utils::split_off(buckets, |bucket| {
                    bucket.name.try_namespace() == Some(namespace)
                });

                let reason_code = limits.longest().and_then(|limit| limit.reason_code.clone());
                self.inner.metric_outcomes.track(
                    scoping,
                    &rejected,
                    Outcome::RateLimited(reason_code),
                );

                // Cache the limits on the project so subsequent checks hit the cache first.
                self.inner
                    .project_cache
                    .get(item_scoping.scoping.project_key)
                    .rate_limits()
                    .merge(limits);
            }
        }

        // `create` returns `Err` with the untouched buckets when no applicable quotas exist.
        match MetricsLimiter::create(buckets, project_info.config.quotas.clone(), scoping) {
            Err(buckets) => buckets,
            Ok(bucket_limiter) => self.apply_other_rate_limits(bucket_limiter).await,
        }
    }
2289
2290    /// Check and apply rate limits to metrics buckets for transactions and spans.
2291    #[cfg(feature = "processing")]
2292    async fn apply_other_rate_limits(&self, mut bucket_limiter: MetricsLimiter) -> Vec<Bucket> {
2293        relay_log::trace!("handle_rate_limit_buckets");
2294
2295        let scoping = *bucket_limiter.scoping();
2296
2297        if let Some(rate_limiter) = self.inner.rate_limiter.as_ref() {
2298            let global_config = self.inner.global_config.current();
2299            let quotas = CombinedQuotas::new(&global_config, bucket_limiter.quotas());
2300
2301            // We set over_accept_once such that the limit is actually reached, which allows subsequent
2302            // calls with quantity=0 to be rate limited.
2303            let over_accept_once = true;
2304            let mut rate_limits = RateLimits::new();
2305
2306            for category in [DataCategory::Transaction, DataCategory::Span] {
2307                let count = bucket_limiter.count(category);
2308
2309                let timer = Instant::now();
2310                let mut is_limited = false;
2311
2312                if let Some(count) = count {
2313                    match rate_limiter
2314                        .is_rate_limited(quotas, scoping.item(category), count, over_accept_once)
2315                        .await
2316                    {
2317                        Ok(limits) => {
2318                            is_limited = limits.is_limited();
2319                            rate_limits.merge(limits)
2320                        }
2321                        Err(e) => {
2322                            relay_log::error!(error = &e as &dyn Error, "rate limiting error")
2323                        }
2324                    }
2325                }
2326
2327                relay_statsd::metric!(
2328                    timer(RelayTimers::RateLimitBucketsDuration) = timer.elapsed(),
2329                    category = category.name(),
2330                    limited = if is_limited { "true" } else { "false" },
2331                    count = match count {
2332                        None => "none",
2333                        Some(0) => "0",
2334                        Some(1) => "1",
2335                        Some(1..=10) => "10",
2336                        Some(1..=25) => "25",
2337                        Some(1..=50) => "50",
2338                        Some(51..=100) => "100",
2339                        Some(101..=500) => "500",
2340                        _ => "> 500",
2341                    },
2342                );
2343            }
2344
2345            if rate_limits.is_limited() {
2346                let was_enforced =
2347                    bucket_limiter.enforce_limits(&rate_limits, &self.inner.metric_outcomes);
2348
2349                if was_enforced {
2350                    // Update the rate limits in the project cache.
2351                    self.inner
2352                        .project_cache
2353                        .get(scoping.project_key)
2354                        .rate_limits()
2355                        .merge(rate_limits);
2356                }
2357            }
2358        }
2359
2360        bucket_limiter.into_buckets()
2361    }
2362
    /// Cardinality limits the passed buckets and returns a filtered vector of only accepted buckets.
    ///
    /// Behavior depends on the globally configured `cardinality_limiter_mode`:
    ///  - `Disabled`: all buckets pass through unchanged.
    ///  - `Passive`: limits are evaluated and reported, but no buckets are dropped.
    ///  - otherwise: buckets exceeding a limit are dropped and tracked as
    ///    `CardinalityLimited` outcomes.
    ///
    /// On limiter errors this fails open and returns the original buckets.
    #[cfg(feature = "processing")]
    async fn cardinality_limit_buckets(
        &self,
        scoping: Scoping,
        limits: &[CardinalityLimit],
        buckets: Vec<Bucket>,
    ) -> Vec<Bucket> {
        let global_config = self.inner.global_config.current();
        let cardinality_limiter_mode = global_config.options.cardinality_limiter_mode;

        if matches!(cardinality_limiter_mode, CardinalityLimiterMode::Disabled) {
            return buckets;
        }

        // Without a configured limiter (e.g. no Redis), pass everything through.
        let Some(ref limiter) = self.inner.cardinality_limiter else {
            return buckets;
        };

        let scope = relay_cardinality::Scoping {
            organization_id: scoping.organization_id,
            project_id: scoping.project_id,
        };

        let limits = match limiter
            .check_cardinality_limits(scope, limits, buckets)
            .await
        {
            Ok(limits) => limits,
            // Fail open: the error variant returns ownership of the buckets so
            // none of them are lost when the limiter is unavailable.
            Err((buckets, error)) => {
                relay_log::error!(
                    error = &error as &dyn std::error::Error,
                    "cardinality limiter failed"
                );
                return buckets;
            }
        };

        // Log exceeded limits only for a sampled fraction of events to bound
        // the error volume on large organizations.
        let error_sample_rate = global_config.options.cardinality_limiter_error_sample_rate;
        if !limits.exceeded_limits().is_empty() && utils::sample(error_sample_rate).is_keep() {
            for limit in limits.exceeded_limits() {
                relay_log::with_scope(
                    |scope| {
                        // Set the organization as user so we can alert on distinct org_ids.
                        scope.set_user(Some(relay_log::sentry::User {
                            id: Some(scoping.organization_id.to_string()),
                            ..Default::default()
                        }));
                    },
                    || {
                        relay_log::error!(
                            tags.organization_id = scoping.organization_id.value(),
                            tags.limit_id = limit.id,
                            tags.passive = limit.passive,
                            "Cardinality Limit"
                        );
                    },
                );
            }
        }

        // Cardinality reports are emitted regardless of mode, including passive.
        for (limit, reports) in limits.cardinality_reports() {
            for report in reports {
                self.inner
                    .metric_outcomes
                    .cardinality(scoping, limit, report);
            }
        }

        // In passive mode limits are only observed, never enforced.
        if matches!(cardinality_limiter_mode, CardinalityLimiterMode::Passive) {
            return limits.into_source();
        }

        let CardinalityLimitsSplit { accepted, rejected } = limits.into_split();

        // Track an outcome for every dropped bucket with the limit that rejected it.
        for (bucket, exceeded) in rejected {
            self.inner.metric_outcomes.track(
                scoping,
                &[bucket],
                Outcome::CardinalityLimited(exceeded.id.clone()),
            );
        }
        accepted
    }
2447
2448    /// Processes metric buckets and sends them to kafka.
2449    ///
2450    /// This function runs the following steps:
2451    ///  - cardinality limiting
2452    ///  - rate limiting
2453    ///  - submit to `StoreForwarder`
2454    #[cfg(feature = "processing")]
2455    async fn encode_metrics_processing(
2456        &self,
2457        message: FlushBuckets,
2458        store_forwarder: &Addr<Store>,
2459    ) {
2460        use crate::constants::DEFAULT_EVENT_RETENTION;
2461        use crate::services::store::StoreMetrics;
2462
2463        for ProjectBuckets {
2464            buckets,
2465            scoping,
2466            project_info,
2467            ..
2468        } in message.buckets.into_values()
2469        {
2470            let buckets = self
2471                .rate_limit_buckets(scoping, &project_info, buckets)
2472                .await;
2473
2474            let limits = project_info.get_cardinality_limits();
2475            let buckets = self
2476                .cardinality_limit_buckets(scoping, limits, buckets)
2477                .await;
2478
2479            if buckets.is_empty() {
2480                continue;
2481            }
2482
2483            let retention = project_info
2484                .config
2485                .event_retention
2486                .unwrap_or(DEFAULT_EVENT_RETENTION);
2487
2488            // The store forwarder takes care of bucket splitting internally, so we can submit the
2489            // entire list of buckets. There is no batching needed here.
2490            store_forwarder.send(StoreMetrics {
2491                buckets,
2492                scoping,
2493                retention,
2494            });
2495        }
2496    }
2497
2498    /// Serializes metric buckets to JSON and sends them to the upstream.
2499    ///
2500    /// This function runs the following steps:
2501    ///  - partitioning
2502    ///  - batching by configured size limit
2503    ///  - serialize to JSON and pack in an envelope
2504    ///  - submit the envelope to upstream or kafka depending on configuration
2505    ///
2506    /// Cardinality limiting and rate limiting run only in processing Relays as they both require
2507    /// access to the central Redis instance. Cached rate limits are applied in the project cache
2508    /// already.
2509    fn encode_metrics_envelope(&self, cogs: &mut Token, message: FlushBuckets) {
2510        let FlushBuckets {
2511            partition_key,
2512            buckets,
2513        } = message;
2514
2515        let batch_size = self.inner.config.metrics_max_batch_size_bytes();
2516        let upstream = self.inner.config.upstream_descriptor();
2517
2518        for ProjectBuckets {
2519            buckets, scoping, ..
2520        } in buckets.values()
2521        {
2522            let dsn = PartialDsn::outbound(scoping, upstream);
2523
2524            relay_statsd::metric!(
2525                distribution(RelayDistributions::PartitionKeys) = u64::from(partition_key)
2526            );
2527
2528            let mut num_batches = 0;
2529            for batch in BucketsView::from(buckets).by_size(batch_size) {
2530                let mut envelope = Envelope::from_request(None, RequestMeta::outbound(dsn.clone()));
2531
2532                let mut item = Item::new(ItemType::MetricBuckets);
2533                item.set_source_quantities(crate::metrics::extract_quantities(batch));
2534                item.set_payload(ContentType::Json, serde_json::to_vec(&buckets).unwrap());
2535                envelope.add_item(item);
2536
2537                let mut envelope =
2538                    ManagedEnvelope::new(envelope, self.inner.addrs.outcome_aggregator.clone());
2539                envelope
2540                    .set_partition_key(Some(partition_key))
2541                    .scope(*scoping);
2542
2543                relay_statsd::metric!(
2544                    distribution(RelayDistributions::BucketsPerBatch) = batch.len() as u64
2545                );
2546
2547                self.submit_upstream(cogs, Submit::Envelope(envelope.into_processed()));
2548                num_batches += 1;
2549            }
2550
2551            relay_statsd::metric!(
2552                distribution(RelayDistributions::BatchesPerPartition) = num_batches
2553            );
2554        }
2555    }
2556
2557    /// Creates a [`SendMetricsRequest`] and sends it to the upstream relay.
2558    fn send_global_partition(&self, partition_key: u32, partition: &mut Partition<'_>) {
2559        if partition.is_empty() {
2560            return;
2561        }
2562
2563        let (unencoded, project_info) = partition.take();
2564        let http_encoding = self.inner.config.http_encoding();
2565        let encoded = match encode_payload(&unencoded, http_encoding) {
2566            Ok(payload) => payload,
2567            Err(error) => {
2568                let error = &error as &dyn std::error::Error;
2569                relay_log::error!(error, "failed to encode metrics payload");
2570                return;
2571            }
2572        };
2573
2574        let request = SendMetricsRequest {
2575            partition_key: partition_key.to_string(),
2576            unencoded,
2577            encoded,
2578            project_info,
2579            http_encoding,
2580            metric_outcomes: self.inner.metric_outcomes.clone(),
2581        };
2582
2583        self.inner.addrs.upstream_relay.send(SendRequest(request));
2584    }
2585
2586    /// Serializes metric buckets to JSON and sends them to the upstream via the global endpoint.
2587    ///
2588    /// This function is similar to [`Self::encode_metrics_envelope`], but sends a global batched
2589    /// payload directly instead of per-project Envelopes.
2590    ///
2591    /// This function runs the following steps:
2592    ///  - partitioning
2593    ///  - batching by configured size limit
2594    ///  - serialize to JSON
2595    ///  - submit directly to the upstream
2596    ///
2597    /// Cardinality limiting and rate limiting run only in processing Relays as they both require
2598    /// access to the central Redis instance. Cached rate limits are applied in the project cache
2599    /// already.
2600    fn encode_metrics_global(&self, message: FlushBuckets) {
2601        let FlushBuckets {
2602            partition_key,
2603            buckets,
2604        } = message;
2605
2606        let batch_size = self.inner.config.metrics_max_batch_size_bytes();
2607        let mut partition = Partition::new(batch_size);
2608        let mut partition_splits = 0;
2609
2610        for ProjectBuckets {
2611            buckets, scoping, ..
2612        } in buckets.values()
2613        {
2614            for bucket in buckets {
2615                let mut remaining = Some(BucketView::new(bucket));
2616
2617                while let Some(bucket) = remaining.take() {
2618                    if let Some(next) = partition.insert(bucket, *scoping) {
2619                        // A part of the bucket could not be inserted. Take the partition and submit
2620                        // it immediately. Repeat until the final part was inserted. This should
2621                        // always result in a request, otherwise we would enter an endless loop.
2622                        self.send_global_partition(partition_key, &mut partition);
2623                        remaining = Some(next);
2624                        partition_splits += 1;
2625                    }
2626                }
2627            }
2628        }
2629
2630        if partition_splits > 0 {
2631            metric!(distribution(RelayDistributions::PartitionSplits) = partition_splits);
2632        }
2633
2634        self.send_global_partition(partition_key, &mut partition);
2635    }
2636
2637    async fn handle_flush_buckets(&self, cogs: &mut Token, mut message: FlushBuckets) {
2638        for (project_key, pb) in message.buckets.iter_mut() {
2639            let buckets = std::mem::take(&mut pb.buckets);
2640            pb.buckets =
2641                self.check_buckets(*project_key, &pb.project_info, &pb.rate_limits, buckets);
2642        }
2643
2644        #[cfg(feature = "processing")]
2645        if self.inner.config.processing_enabled()
2646            && let Some(ref store_forwarder) = self.inner.addrs.store_forwarder
2647        {
2648            return self
2649                .encode_metrics_processing(message, store_forwarder)
2650                .await;
2651        }
2652
2653        if self.inner.config.http_global_metrics() {
2654            self.encode_metrics_global(message)
2655        } else {
2656            self.encode_metrics_envelope(cogs, message)
2657        }
2658    }
2659
    /// Test-only helper: returns whether this processor was configured with a
    /// Redis-backed rate limiter.
    #[cfg(all(test, feature = "processing"))]
    fn redis_rate_limiter_enabled(&self) -> bool {
        self.inner.rate_limiter.is_some()
    }
2664
2665    async fn handle_message(&self, message: EnvelopeProcessor) {
2666        let ty = message.variant();
2667        let feature_weights = self.feature_weights(&message);
2668
2669        metric!(timer(RelayTimers::ProcessMessageDuration), message = ty, {
2670            let mut cogs = self.inner.cogs.timed(ResourceId::Relay, feature_weights);
2671
2672            match message {
2673                EnvelopeProcessor::ProcessEnvelope(m) => {
2674                    self.handle_process_envelope(&mut cogs, *m).await
2675                }
2676                EnvelopeProcessor::ProcessProjectMetrics(m) => {
2677                    self.handle_process_metrics(&mut cogs, *m)
2678                }
2679                EnvelopeProcessor::ProcessBatchedMetrics(m) => {
2680                    self.handle_process_batched_metrics(&mut cogs, *m)
2681                }
2682                EnvelopeProcessor::FlushBuckets(m) => {
2683                    self.handle_flush_buckets(&mut cogs, *m).await
2684                }
2685                EnvelopeProcessor::SubmitClientReports(m) => {
2686                    self.handle_submit_client_reports(&mut cogs, *m)
2687                }
2688            }
2689        });
2690    }
2691
2692    fn feature_weights(&self, message: &EnvelopeProcessor) -> FeatureWeights {
2693        match message {
2694            // Envelope is split later and tokens are attributed then.
2695            EnvelopeProcessor::ProcessEnvelope(_) => AppFeature::Unattributed.into(),
2696            EnvelopeProcessor::ProcessProjectMetrics(_) => AppFeature::Unattributed.into(),
2697            EnvelopeProcessor::ProcessBatchedMetrics(_) => AppFeature::Unattributed.into(),
2698            EnvelopeProcessor::FlushBuckets(v) => v
2699                .buckets
2700                .values()
2701                .map(|s| {
2702                    if self.inner.config.processing_enabled() {
2703                        // Processing does not encode the metrics but instead rate and cardinality
2704                        // limits the metrics, which scales by count and not size.
2705                        relay_metrics::cogs::ByCount(&s.buckets).into()
2706                    } else {
2707                        relay_metrics::cogs::BySize(&s.buckets).into()
2708                    }
2709                })
2710                .fold(FeatureWeights::none(), FeatureWeights::merge),
2711            EnvelopeProcessor::SubmitClientReports(_) => AppFeature::ClientReports.into(),
2712        }
2713    }
2714}
2715
2716impl Service for EnvelopeProcessorService {
2717    type Interface = EnvelopeProcessor;
2718
2719    async fn run(self, mut rx: relay_system::Receiver<Self::Interface>) {
2720        while let Some(message) = rx.recv().await {
2721            let service = self.clone();
2722            self.inner
2723                .pool
2724                .spawn_async(
2725                    async move {
2726                        service.handle_message(message).await;
2727                    }
2728                    .boxed(),
2729                )
2730                .await;
2731        }
2732    }
2733}
2734
/// Result of the enforcement of rate limiting.
///
/// If the event is already `None` or it's rate limited, it will be `None`
/// within the [`Annotated`].
struct EnforcementResult {
    /// The event after enforcement, emptied if the event itself was rate limited.
    event: Annotated<Event>,
    /// Rate limits that applied during enforcement.
    ///
    /// Only read in processing builds; allowed to be dead code otherwise.
    #[cfg_attr(not(feature = "processing"), expect(dead_code))]
    rate_limits: RateLimits,
}

impl EnforcementResult {
    /// Creates a new [`EnforcementResult`].
    pub fn new(event: Annotated<Event>, rate_limits: RateLimits) -> Self {
        Self { event, rate_limits }
    }
}
2751
/// Strategy for enforcing rate limits on an envelope.
#[derive(Clone)]
enum RateLimiter {
    /// Checks only the locally cached rate limits.
    Cached,
    /// Checks limits consistently against the Redis rate limiter; only
    /// available in processing builds.
    #[cfg(feature = "processing")]
    Consistent(Arc<RedisRateLimiter>),
}
2758
impl RateLimiter {
    /// Enforces rate limits on the envelope and the already-extracted event.
    ///
    /// Returns the event (emptied if it was rate limited itself) together with
    /// the rate limits that applied. Items dropped by enforcement get outcomes
    /// recorded on the managed envelope. Short-circuits with default (empty)
    /// limits when there is nothing to limit or no quotas are configured.
    async fn enforce<Group>(
        &self,
        managed_envelope: &mut TypedEnvelope<Group>,
        event: Annotated<Event>,
        _extracted_metrics: &mut ProcessingExtractedMetrics,
        ctx: processing::Context<'_>,
    ) -> Result<EnforcementResult, ProcessingError> {
        if managed_envelope.envelope().is_empty() && event.value().is_none() {
            return Ok(EnforcementResult::new(event, RateLimits::default()));
        }

        let quotas = CombinedQuotas::new(ctx.global_config, ctx.project_info.get_quotas());
        if quotas.is_empty() {
            return Ok(EnforcementResult::new(event, RateLimits::default()));
        }

        let event_category = event_category(&event);

        // We extract the rate limiters, in case we perform consistent rate limiting, since we will
        // need Redis access.
        //
        // When invoking the rate limiter, capture if the event item has been rate limited to also
        // remove it from the processing state eventually.
        let this = self.clone();
        let mut envelope_limiter =
            EnvelopeLimiter::new(CheckLimits::All, move |item_scope, _quantity| {
                let this = this.clone();

                async move {
                    match this {
                        // Consistent limiting consults Redis per item scope.
                        #[cfg(feature = "processing")]
                        RateLimiter::Consistent(rate_limiter) => Ok::<_, ProcessingError>(
                            rate_limiter
                                .is_rate_limited(quotas, item_scope, _quantity, false)
                                .await?,
                        ),
                        // Cached limiting only checks limits already known locally.
                        _ => Ok::<_, ProcessingError>(
                            ctx.rate_limits.check_with_quotas(quotas, item_scope),
                        ),
                    }
                }
            });

        // Tell the envelope limiter about the event, since it has been removed from the Envelope at
        // this stage in processing.
        if let Some(category) = event_category {
            envelope_limiter.assume_event(category);
        }

        let scoping = managed_envelope.scoping();
        let (enforcement, rate_limits) = metric!(timer(RelayTimers::EventProcessingRateLimiting), type = self.name(), {
            envelope_limiter
                .compute(managed_envelope.envelope_mut(), &scoping)
                .await
        })?;
        let event_active = enforcement.is_event_active();

        // Use the same rate limits as used for the envelope on the metrics.
        // Those rate limits should not be checked for expiry or similar to ensure a consistent
        // limiting of envelope items and metrics.
        #[cfg(feature = "processing")]
        _extracted_metrics.apply_enforcement(&enforcement, matches!(self, Self::Consistent(_)));
        enforcement.apply_with_outcomes(managed_envelope);

        if event_active {
            // Rate-limiting the event implies the whole envelope was dropped.
            debug_assert!(managed_envelope.envelope().is_empty());
            return Ok(EnforcementResult::new(Annotated::empty(), rate_limits));
        }

        Ok(EnforcementResult::new(event, rate_limits))
    }

    /// Low-cardinality name used as the `type` tag on the rate-limiting timer metric.
    fn name(&self) -> &'static str {
        match self {
            Self::Cached => "cached",
            #[cfg(feature = "processing")]
            Self::Consistent(_) => "consistent",
        }
    }
}
2840
2841pub fn encode_payload(body: &Bytes, http_encoding: HttpEncoding) -> Result<Bytes, std::io::Error> {
2842    let envelope_body: Vec<u8> = match http_encoding {
2843        HttpEncoding::Identity => return Ok(body.clone()),
2844        HttpEncoding::Deflate => {
2845            let mut encoder = ZlibEncoder::new(Vec::new(), Compression::default());
2846            encoder.write_all(body.as_ref())?;
2847            encoder.finish()?
2848        }
2849        HttpEncoding::Gzip => {
2850            let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
2851            encoder.write_all(body.as_ref())?;
2852            encoder.finish()?
2853        }
2854        HttpEncoding::Br => {
2855            // Use default buffer size (via 0), medium quality (5), and the default lgwin (22).
2856            let mut encoder = BrotliEncoder::new(Vec::new(), 0, 5, 22);
2857            encoder.write_all(body.as_ref())?;
2858            encoder.into_inner()
2859        }
2860        HttpEncoding::Zstd => {
2861            // Use the fastest compression level, our main objective here is to get the best
2862            // compression ratio for least amount of time spent.
2863            let mut encoder = ZstdEncoder::new(Vec::new(), 1)?;
2864            encoder.write_all(body.as_ref())?;
2865            encoder.finish()?
2866        }
2867    };
2868
2869    Ok(envelope_body.into())
2870}
2871
/// An upstream request that submits an envelope via HTTP.
#[derive(Debug)]
pub struct SendEnvelope {
    /// The envelope being submitted; accepted or rejected based on the response.
    pub envelope: TypedEnvelope<Processed>,
    /// Serialized request body. Assumed to already match `http_encoding` —
    /// `build` only sets the header, it does not re-encode (TODO confirm with callers).
    pub body: Bytes,
    /// Content encoding declared for `body`.
    pub http_encoding: HttpEncoding,
    /// Handle used to merge upstream rate limits back into the project cache.
    pub project_cache: ProjectCacheHandle,
}
2880
impl UpstreamRequest for SendEnvelope {
    fn method(&self) -> reqwest::Method {
        reqwest::Method::POST
    }

    /// Project-scoped envelope ingestion endpoint.
    fn path(&self) -> Cow<'_, str> {
        format!("/api/{}/envelope/", self.envelope.scoping().project_id).into()
    }

    fn route(&self) -> &'static str {
        "envelope"
    }

    /// Builds the request, forwarding origin/user-agent/auth headers from the
    /// original request metadata.
    fn build(&mut self, builder: &mut http::RequestBuilder) -> Result<(), http::HttpError> {
        let envelope_body = self.body.clone();
        metric!(
            distribution(RelayDistributions::UpstreamEnvelopeBodySize) = envelope_body.len() as u64
        );

        let meta = &self.envelope.meta();
        let shard = self.envelope.partition_key().map(|p| p.to_string());
        builder
            .content_encoding(self.http_encoding)
            .header_opt("Origin", meta.origin().map(|url| url.as_str()))
            .header_opt("User-Agent", meta.user_agent())
            .header("X-Sentry-Auth", meta.auth_header())
            .header("X-Forwarded-For", meta.forwarded_for())
            .header("Content-Type", envelope::CONTENT_TYPE)
            .header_opt("X-Sentry-Relay-Shard", shard)
            .body(envelope_body);

        Ok(())
    }

    fn sign(&mut self) -> Option<Sign> {
        Some(Sign::Optional(SignatureType::RequestSign))
    }

    /// Settles the envelope based on the upstream response:
    /// accepted when the upstream received it (even on rate limits, which are
    /// merged into the project cache), rejected as internal error otherwise.
    fn respond(
        self: Box<Self>,
        result: Result<http::Response, UpstreamRequestError>,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> {
        Box::pin(async move {
            let result = match result {
                Ok(mut response) => response.consume().await.map_err(UpstreamRequestError::Http),
                Err(error) => Err(error),
            };

            match result {
                Ok(()) => self.envelope.accept(),
                // The upstream received the request, so it owns the outcomes;
                // accept locally and only propagate any rate limits.
                Err(error) if error.is_received() => {
                    let scoping = self.envelope.scoping();
                    self.envelope.accept();

                    if let UpstreamRequestError::RateLimited(limits) = error {
                        self.project_cache
                            .get(scoping.project_key)
                            .rate_limits()
                            .merge(limits.scope(&scoping));
                    }
                }
                Err(error) => {
                    // Errors are only logged for what we consider an internal discard reason. These
                    // indicate errors in the infrastructure or implementation bugs.
                    let mut envelope = self.envelope;
                    envelope.reject(Outcome::Invalid(DiscardReason::Internal));
                    relay_log::error!(
                        error = &error as &dyn Error,
                        tags.project_key = %envelope.scoping().project_key,
                        "error sending envelope"
                    );
                }
            }
        })
    }
}
2957
/// A container for metric buckets from multiple projects.
///
/// This container is used to send metrics to the upstream in global batches as part of the
/// [`FlushBuckets`] message if the `http.global_metrics` option is enabled. The container monitors
/// the size of all metrics and allows to split them into multiple batches. See
/// [`insert`](Self::insert) for more information.
#[derive(Debug)]
struct Partition<'a> {
    /// Maximum estimated payload size of one partition in bytes.
    max_size: usize,
    /// Remaining byte budget before this partition is considered full.
    remaining: usize,
    /// Bucket views collected so far, keyed by project.
    views: HashMap<ProjectKey, Vec<BucketView<'a>>>,
    /// Scoping per contained project, needed to track outcomes on failure.
    project_info: HashMap<ProjectKey, Scoping>,
}
2971
2972impl<'a> Partition<'a> {
2973    /// Creates a new partition with the given maximum size in bytes.
2974    pub fn new(size: usize) -> Self {
2975        Self {
2976            max_size: size,
2977            remaining: size,
2978            views: HashMap::new(),
2979            project_info: HashMap::new(),
2980        }
2981    }
2982
2983    /// Inserts a bucket into the partition, splitting it if necessary.
2984    ///
2985    /// This function attempts to add the bucket to this partition. If the bucket does not fit
2986    /// entirely into the partition given its maximum size, the remaining part of the bucket is
2987    /// returned from this function call.
2988    ///
2989    /// If this function returns `Some(_)`, the partition is full and should be submitted to the
2990    /// upstream immediately. Use [`Self::take`] to retrieve the contents of the
2991    /// partition. Afterwards, the caller is responsible to call this function again with the
2992    /// remaining bucket until it is fully inserted.
2993    pub fn insert(&mut self, bucket: BucketView<'a>, scoping: Scoping) -> Option<BucketView<'a>> {
2994        let (current, next) = bucket.split(self.remaining, Some(self.max_size));
2995
2996        if let Some(current) = current {
2997            self.remaining = self.remaining.saturating_sub(current.estimated_size());
2998            self.views
2999                .entry(scoping.project_key)
3000                .or_default()
3001                .push(current);
3002
3003            self.project_info
3004                .entry(scoping.project_key)
3005                .or_insert(scoping);
3006        }
3007
3008        next
3009    }
3010
3011    /// Returns `true` if the partition does not hold any data.
3012    fn is_empty(&self) -> bool {
3013        self.views.is_empty()
3014    }
3015
3016    /// Returns the serialized buckets for this partition.
3017    ///
3018    /// This empties the partition, so that it can be reused.
3019    fn take(&mut self) -> (Bytes, HashMap<ProjectKey, Scoping>) {
3020        #[derive(serde::Serialize)]
3021        struct Wrapper<'a> {
3022            buckets: &'a HashMap<ProjectKey, Vec<BucketView<'a>>>,
3023        }
3024
3025        let buckets = &self.views;
3026        let payload = serde_json::to_vec(&Wrapper { buckets }).unwrap().into();
3027
3028        let scopings = self.project_info.clone();
3029        self.project_info.clear();
3030
3031        self.views.clear();
3032        self.remaining = self.max_size;
3033
3034        (payload, scopings)
3035    }
3036}
3037
/// An upstream request that submits metric buckets via HTTP.
///
/// This request is not awaited. It automatically tracks outcomes if the request is not received.
#[derive(Debug)]
struct SendMetricsRequest {
    /// The partition key, sent to the upstream in the `X-Sentry-Relay-Shard` header.
    partition_key: String,
    /// Serialized metric buckets without encoding applied, used for signing.
    unencoded: Bytes,
    /// Serialized metric buckets with the stated HTTP encoding applied.
    encoded: Bytes,
    /// Mapping of all contained project keys to their scoping and extraction mode.
    ///
    /// Used to track outcomes for transmission failures.
    project_info: HashMap<ProjectKey, Scoping>,
    /// Encoding (compression) of the payload.
    http_encoding: HttpEncoding,
    /// Metric outcomes instance to send outcomes on error.
    metric_outcomes: MetricOutcomes,
}
3058
3059impl SendMetricsRequest {
3060    fn create_error_outcomes(self) {
3061        #[derive(serde::Deserialize)]
3062        struct Wrapper {
3063            buckets: HashMap<ProjectKey, Vec<MinimalTrackableBucket>>,
3064        }
3065
3066        let buckets = match serde_json::from_slice(&self.unencoded) {
3067            Ok(Wrapper { buckets }) => buckets,
3068            Err(err) => {
3069                relay_log::error!(
3070                    error = &err as &dyn std::error::Error,
3071                    "failed to parse buckets from failed transmission"
3072                );
3073                return;
3074            }
3075        };
3076
3077        for (key, buckets) in buckets {
3078            let Some(&scoping) = self.project_info.get(&key) else {
3079                relay_log::error!("missing scoping for project key");
3080                continue;
3081            };
3082
3083            self.metric_outcomes.track(
3084                scoping,
3085                &buckets,
3086                Outcome::Invalid(DiscardReason::Internal),
3087            );
3088        }
3089    }
3090}
3091
impl UpstreamRequest for SendMetricsRequest {
    fn set_relay_id(&self) -> bool {
        true
    }

    /// The signature covers the unencoded payload, not the compressed body.
    fn sign(&mut self) -> Option<Sign> {
        Some(Sign::Required(SignatureType::Body(self.unencoded.clone())))
    }

    fn method(&self) -> reqwest::Method {
        reqwest::Method::POST
    }

    /// Global (relay-level) metrics endpoint; not project-scoped.
    fn path(&self) -> Cow<'_, str> {
        "/api/0/relays/metrics/".into()
    }

    fn route(&self) -> &'static str {
        "global_metrics"
    }

    fn build(&mut self, builder: &mut http::RequestBuilder) -> Result<(), http::HttpError> {
        metric!(
            distribution(RelayDistributions::UpstreamMetricsBodySize) = self.encoded.len() as u64
        );

        builder
            .content_encoding(self.http_encoding)
            .header("X-Sentry-Relay-Shard", self.partition_key.as_bytes())
            .header(header::CONTENT_TYPE, b"application/json")
            .body(self.encoded.clone());

        Ok(())
    }

    fn respond(
        self: Box<Self>,
        result: Result<http::Response, UpstreamRequestError>,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> {
        Box::pin(async {
            match result {
                Ok(mut response) => {
                    // Drain the body; a successful transmission needs no outcomes.
                    response.consume().await.ok();
                }
                Err(error) => {
                    relay_log::error!(error = &error as &dyn Error, "Failed to send metrics batch");

                    // If the request did not arrive at the upstream, we are responsible for outcomes.
                    // Otherwise, the upstream is responsible to log outcomes.
                    if error.is_received() {
                        return;
                    }

                    self.create_error_outcomes()
                }
            }
        })
    }
}
3151
/// Container for global and project level [`Quota`].
#[derive(Copy, Clone, Debug)]
struct CombinedQuotas<'a> {
    // Quotas taken from the global config (`GlobalConfig::quotas`); apply to
    // all projects.
    global_quotas: &'a [Quota],
    // Quotas from the individual project's configuration.
    project_quotas: &'a [Quota],
}
3158
3159impl<'a> CombinedQuotas<'a> {
3160    /// Returns a new [`CombinedQuotas`].
3161    pub fn new(global_config: &'a GlobalConfig, project_quotas: &'a [Quota]) -> Self {
3162        Self {
3163            global_quotas: &global_config.quotas,
3164            project_quotas,
3165        }
3166    }
3167
3168    /// Returns `true` if both global quotas and project quotas are empty.
3169    pub fn is_empty(&self) -> bool {
3170        self.len() == 0
3171    }
3172
3173    /// Returns the number of both global and project quotas.
3174    pub fn len(&self) -> usize {
3175        self.global_quotas.len() + self.project_quotas.len()
3176    }
3177}
3178
// Yields all quotas: global quotas first, then project quotas.
impl<'a> IntoIterator for CombinedQuotas<'a> {
    type Item = &'a Quota;
    type IntoIter = std::iter::Chain<std::slice::Iter<'a, Quota>, std::slice::Iter<'a, Quota>>;

    fn into_iter(self) -> Self::IntoIter {
        self.global_quotas.iter().chain(self.project_quotas.iter())
    }
}
3187
3188#[cfg(test)]
3189mod tests {
3190    use insta::assert_debug_snapshot;
3191    use relay_common::glob2::LazyGlob;
3192    use relay_dynamic_config::ProjectConfig;
3193    use relay_event_normalization::{
3194        NormalizationConfig, RedactionRule, TransactionNameConfig, TransactionNameRule,
3195    };
3196    use relay_event_schema::protocol::TransactionSource;
3197    use relay_pii::DataScrubbingConfig;
3198    use similar_asserts::assert_eq;
3199
3200    use crate::testutils::{create_test_processor, create_test_processor_with_addrs};
3201
3202    #[cfg(feature = "processing")]
3203    use {
3204        relay_metrics::BucketValue,
3205        relay_quotas::{QuotaScope, ReasonCode},
3206        relay_test::mock_service,
3207    };
3208
3209    use super::*;
3210
    #[cfg(test)]
    // Builds a reject-all quota (`limit: Some(0)`) for the `MetricBucket`
    // category at organization scope, with the given id.
    #[cfg(feature = "processing")]
    fn mock_quota(id: &str) -> Quota {
        Quota {
            id: Some(id.into()),
            categories: [DataCategory::MetricBucket].into(),
            scope: QuotaScope::Organization,
            scope_id: None,
            limit: Some(0),
            window: None,
            reason_code: None,
            namespace: None,
        }
    }
3224
3225    #[cfg(feature = "processing")]
3226    #[test]
3227    fn test_dynamic_quotas() {
3228        let global_config = GlobalConfig {
3229            quotas: vec![mock_quota("foo"), mock_quota("bar")],
3230            ..Default::default()
3231        };
3232
3233        let project_quotas = vec![mock_quota("baz"), mock_quota("qux")];
3234
3235        let dynamic_quotas = CombinedQuotas::new(&global_config, &project_quotas);
3236
3237        assert_eq!(dynamic_quotas.len(), 4);
3238        assert!(!dynamic_quotas.is_empty());
3239
3240        let quota_ids = dynamic_quotas.into_iter().filter_map(|q| q.id.as_deref());
3241        assert!(quota_ids.eq(["foo", "bar", "baz", "qux"]));
3242    }
3243
    /// Ensures that if we ratelimit one batch of buckets in [`FlushBuckets`] message, it won't
    /// also ratelimit the next batches in the same message automatically.
    #[cfg(feature = "processing")]
    #[tokio::test]
    async fn test_ratelimit_per_batch() {
        use relay_base_schema::organization::OrganizationId;
        use relay_protocol::FiniteF64;

        // Org 1 is targeted by the reject-all quota built below.
        let rate_limited_org = Scoping {
            organization_id: OrganizationId::new(1),
            project_id: ProjectId::new(21),
            project_key: ProjectKey::parse("00000000000000000000000000000000").unwrap(),
            key_id: Some(17),
        };

        // Org 2 shares the same project config but is not matched by the
        // quota's `scope_id`, so its buckets must pass through.
        let not_rate_limited_org = Scoping {
            organization_id: OrganizationId::new(2),
            project_id: ProjectId::new(21),
            project_key: ProjectKey::parse("11111111111111111111111111111111").unwrap(),
            key_id: Some(17),
        };

        let message = {
            let project_info = {
                // Reject-all quota (`limit: Some(0)`) scoped to org 1 only.
                let quota = Quota {
                    id: Some("testing".into()),
                    categories: [DataCategory::MetricBucket].into(),
                    scope: relay_quotas::QuotaScope::Organization,
                    scope_id: Some(rate_limited_org.organization_id.to_string().into()),
                    limit: Some(0),
                    window: None,
                    reason_code: Some(ReasonCode::new("test")),
                    namespace: None,
                };

                let mut config = ProjectConfig::default();
                config.quotas.push(quota);

                Arc::new(ProjectInfo {
                    config,
                    ..Default::default()
                })
            };

            // One counter bucket per org, both referencing the same project info.
            let project_metrics = |scoping| ProjectBuckets {
                buckets: vec![Bucket {
                    name: "d:transactions/bar".into(),
                    value: BucketValue::Counter(FiniteF64::new(1.0).unwrap()),
                    timestamp: UnixTimestamp::now(),
                    tags: Default::default(),
                    width: 10,
                    metadata: BucketMetadata::default(),
                }],
                rate_limits: Default::default(),
                project_info: project_info.clone(),
                scoping,
            };

            let buckets = hashbrown::HashMap::from([
                (
                    rate_limited_org.project_key,
                    project_metrics(rate_limited_org),
                ),
                (
                    not_rate_limited_org.project_key,
                    project_metrics(not_rate_limited_org),
                ),
            ]);

            FlushBuckets {
                partition_key: 0,
                buckets,
            }
        };

        // ensure the order of the map while iterating is as expected.
        assert_eq!(message.buckets.keys().count(), 2);

        // Processing mode with Redis enabled so the real rate limiter runs.
        let config = {
            let config_json = serde_json::json!({
                "processing": {
                    "enabled": true,
                    "kafka_config": [],
                    "redis": {
                        "server": std::env::var("RELAY_REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_owned()),
                    }
                }
            });
            Config::from_json_value(config_json).unwrap()
        };

        // Mock store forwarder that records the org id of every metrics
        // message it receives; anything else is a test failure.
        let (store, handle) = {
            let f = |org_ids: &mut Vec<OrganizationId>, msg: Store| {
                let org_id = match msg {
                    Store::Metrics(x) => x.scoping.organization_id,
                    _ => panic!("received envelope when expecting only metrics"),
                };
                org_ids.push(org_id);
            };

            mock_service("store_forwarder", vec![], f)
        };

        let processor = create_test_processor(config).await;
        assert!(processor.redis_rate_limiter_enabled());

        processor.encode_metrics_processing(message, &store).await;

        // Dropping the store closes the channel so the mock service finishes.
        drop(store);
        let orgs_not_ratelimited = handle.await.unwrap();

        // Only the non-rate-limited org's buckets reach the store; the rate
        // limit on org 1 must not leak into org 2's batch.
        assert_eq!(
            orgs_not_ratelimited,
            vec![not_rate_limited_org.organization_id]
        );
    }
3360
    // Verifies that PII scrubbing masks IP-like version fragments in the
    // User-Agent header, while browser name/version are still extracted into
    // the browser context before scrubbing destroys them.
    #[tokio::test]
    async fn test_browser_version_extraction_with_pii_like_data() {
        let processor = create_test_processor(Default::default()).await;
        let outcome_aggregator = Addr::dummy();
        let event_id = EventId::new();

        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
            .parse()
            .unwrap();

        let request_meta = RequestMeta::new(dsn);
        let mut envelope = Envelope::from_request(Some(event_id), request_meta);

        // Event whose only content is a Chrome User-Agent; "103.0.0.0" looks
        // like an IPv4 address to the `@ip` scrubber.
        envelope.add_item({
                let mut item = Item::new(ItemType::Event);
                item.set_payload(
                    ContentType::Json,
                    r#"
                    {
                        "request": {
                            "headers": [
                                ["User-Agent", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36"]
                            ]
                        }
                    }
                "#,
                );
                item
            });

        let mut datascrubbing_settings = DataScrubbingConfig::default();
        // enable all the default scrubbing
        datascrubbing_settings.scrub_data = true;
        datascrubbing_settings.scrub_defaults = true;
        datascrubbing_settings.scrub_ip_addresses = true;

        // Make sure to mask any IP-like looking data
        let pii_config = serde_json::from_str(r#"{"applications": {"**": ["@ip:mask"]}}"#).unwrap();

        let config = ProjectConfig {
            datascrubbing_settings,
            pii_config: Some(pii_config),
            ..Default::default()
        };

        let project_info = ProjectInfo {
            config,
            ..Default::default()
        };

        // A plain error event splits into exactly one processing group.
        let mut envelopes = ProcessingGroup::split_envelope(*envelope, &Default::default());
        assert_eq!(envelopes.len(), 1);

        let (group, envelope) = envelopes.pop().unwrap();
        let envelope = ManagedEnvelope::new(envelope, outcome_aggregator);

        let message = ProcessEnvelopeGrouped {
            group,
            envelope,
            ctx: processing::Context {
                project_info: &project_info,
                ..processing::Context::for_test()
            },
        };

        let Ok(Some(Submit::Envelope(mut new_envelope))) = processor.process(message).await else {
            panic!();
        };
        let new_envelope = new_envelope.envelope_mut();

        let event_item = new_envelope.items().last().unwrap();
        let annotated_event: Annotated<Event> =
            Annotated::from_json_bytes(&event_item.payload()).unwrap();
        let event = annotated_event.into_value().unwrap();
        let headers = event
            .request
            .into_value()
            .unwrap()
            .headers
            .into_value()
            .unwrap();

        // IP-like data must be masked
        assert_eq!(
            Some(
                "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/********* Safari/537.36"
            ),
            headers.get_header("User-Agent")
        );
        // But we still get correct browser and version number
        let contexts = event.contexts.into_value().unwrap();
        let browser = contexts.0.get("browser").unwrap();
        assert_eq!(
            r#"{"browser":"Chrome 103.0.0","name":"Chrome","version":"103.0.0","type":"browser"}"#,
            browser.to_json().unwrap()
        );
    }
3458
    // Verifies that the envelope's dynamic sampling context (DSC) is
    // materialized into the event's `_dsc` field during processing, with the
    // trace id normalized (dashes stripped in the snapshot below).
    #[tokio::test]
    #[cfg(feature = "processing")]
    async fn test_materialize_dsc() {
        use crate::services::projects::project::PublicKeyConfig;

        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
            .parse()
            .unwrap();
        let request_meta = RequestMeta::new(dsn);
        let mut envelope = Envelope::from_request(None, request_meta);

        // DSC attached to the envelope headers; note the dashed trace id.
        let dsc = r#"{
            "trace_id": "00000000-0000-0000-0000-000000000000",
            "public_key": "e12d836b15bb49d7bbf99e64295d995b",
            "sample_rate": "0.2"
        }"#;
        envelope.set_dsc(serde_json::from_str(dsc).unwrap());

        let mut item = Item::new(ItemType::Event);
        item.set_payload(ContentType::Json, r#"{}"#);
        envelope.add_item(item);

        let outcome_aggregator = Addr::dummy();
        let managed_envelope = ManagedEnvelope::new(envelope, outcome_aggregator);

        // The project must know the DSC's public key for it to be resolved.
        let mut project_info = ProjectInfo::default();
        project_info.public_keys.push(PublicKeyConfig {
            public_key: ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(),
            numeric_id: Some(1),
        });

        let config = serde_json::json!({
            "processing": {
                "enabled": true,
                "kafka_config": [],
            }
        });

        let message = ProcessEnvelopeGrouped {
            group: ProcessingGroup::Error,
            envelope: managed_envelope,
            ctx: processing::Context {
                config: &Config::from_json_value(config.clone()).unwrap(),
                project_info: &project_info,
                sampling_project_info: Some(&project_info),
                ..processing::Context::for_test()
            },
        };

        let processor = create_test_processor(Config::from_json_value(config).unwrap()).await;
        let Ok(Some(Submit::Envelope(envelope))) = processor.process(message).await else {
            panic!();
        };
        let event = envelope
            .envelope()
            .get_item_by(|item| item.ty() == &ItemType::Event)
            .unwrap();

        let event = Annotated::<Event>::from_json_bytes(&event.payload()).unwrap();
        insta::assert_debug_snapshot!(event.value().unwrap()._dsc, @r###"
        Object(
            {
                "environment": ~,
                "public_key": String(
                    "e12d836b15bb49d7bbf99e64295d995b",
                ),
                "release": ~,
                "replay_id": ~,
                "sample_rate": String(
                    "0.2",
                ),
                "trace_id": String(
                    "00000000000000000000000000000000",
                ),
                "transaction": ~,
            },
        )
        "###);
    }
3538
    /// Normalizes a transaction event with the given name and source under a
    /// `/foo/*/**` replacement rule, returning the statsd metrics captured
    /// during `log_transaction_name_metrics`.
    fn capture_test_event(transaction_name: &str, source: TransactionSource) -> Vec<String> {
        let mut event = Annotated::<Event>::from_json(
            r#"
            {
                "type": "transaction",
                "transaction": "/foo/",
                "timestamp": 946684810.0,
                "start_timestamp": 946684800.0,
                "contexts": {
                    "trace": {
                        "trace_id": "4c79f60c11214eb38604f4ae0781bfb2",
                        "span_id": "fa90fdead5f74053",
                        "op": "http.server",
                        "type": "trace"
                    }
                },
                "transaction_info": {
                    "source": "url"
                }
            }
            "#,
        )
        .unwrap();
        // Override the fixture's transaction name and source with the
        // caller-provided values.
        let e = event.value_mut().as_mut().unwrap();
        e.transaction.set_value(Some(transaction_name.into()));

        e.transaction_info
            .value_mut()
            .as_mut()
            .unwrap()
            .source
            .set_value(Some(source));

        relay_statsd::with_capturing_test_client(|| {
            utils::log_transaction_name_metrics(&mut event, |event| {
                // Single rule: replace path segments matching `/foo/*/**`.
                let config = NormalizationConfig {
                    transaction_name_config: TransactionNameConfig {
                        rules: &[TransactionNameRule {
                            pattern: LazyGlob::new("/foo/*/**".to_owned()),
                            expiry: DateTime::<Utc>::MAX_UTC,
                            redaction: RedactionRule::Replace {
                                substitution: "*".to_owned(),
                            },
                        }],
                    },
                    ..Default::default()
                };
                relay_event_normalization::normalize_event(event, &config)
            });
        })
    }
3590
    #[test]
    fn test_log_transaction_metrics_none() {
        // URL source, neither the rule nor the pattern matches: `changes:none`.
        let captures = capture_test_event("/nothing", TransactionSource::Url);
        insta::assert_debug_snapshot!(captures, @r###"
        [
            "event.transaction_name_changes:1|c|#source_in:url,changes:none,source_out:sanitized,is_404:false",
        ]
        "###);
    }
3600
    #[test]
    fn test_log_transaction_metrics_rule() {
        // Only the `/foo/*/**` replacement rule applies: `changes:rule`.
        let captures = capture_test_event("/foo/john/denver", TransactionSource::Url);
        insta::assert_debug_snapshot!(captures, @r###"
        [
            "event.transaction_name_changes:1|c|#source_in:url,changes:rule,source_out:sanitized,is_404:false",
        ]
        "###);
    }
3610
    #[test]
    fn test_log_transaction_metrics_pattern() {
        // Only pattern-based sanitization (numeric segment) applies:
        // `changes:pattern`.
        let captures = capture_test_event("/something/12345", TransactionSource::Url);
        insta::assert_debug_snapshot!(captures, @r###"
        [
            "event.transaction_name_changes:1|c|#source_in:url,changes:pattern,source_out:sanitized,is_404:false",
        ]
        "###);
    }
3620
    #[test]
    fn test_log_transaction_metrics_both() {
        // Both the rule and pattern-based sanitization apply: `changes:both`.
        let captures = capture_test_event("/foo/john/12345", TransactionSource::Url);
        insta::assert_debug_snapshot!(captures, @r###"
        [
            "event.transaction_name_changes:1|c|#source_in:url,changes:both,source_out:sanitized,is_404:false",
        ]
        "###);
    }
3630
    #[test]
    fn test_log_transaction_metrics_no_match() {
        // `Route` source is not sanitized at all, so the name stays untouched.
        let captures = capture_test_event("/foo/john/12345", TransactionSource::Route);
        insta::assert_debug_snapshot!(captures, @r###"
        [
            "event.transaction_name_changes:1|c|#source_in:route,changes:none,source_out:route,is_404:false",
        ]
        "###);
    }
3640
    // Verifies that bucket metadata's `received_at` is stamped with the
    // receive time for external sources but left `None` for internal ones.
    #[tokio::test]
    async fn test_process_metrics_bucket_metadata() {
        let mut token = Cogs::noop().timed(ResourceId::Relay, AppFeature::Unattributed);
        let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
        let received_at = Utc::now();
        let config = Config::default();

        // Capture buckets forwarded to the aggregator instead of running it.
        let (aggregator, mut aggregator_rx) = Addr::custom();
        let processor = create_test_processor_with_addrs(
            config,
            Addrs {
                aggregator,
                ..Default::default()
            },
        )
        .await;

        // A single statsd set metric with two members.
        let mut item = Item::new(ItemType::Statsd);
        item.set_payload(
            ContentType::Text,
            "transactions/foo:3182887624:4267882815|s",
        );
        for (source, expected_received_at) in [
            (
                BucketSource::External,
                Some(UnixTimestamp::from_datetime(received_at).unwrap()),
            ),
            (BucketSource::Internal, None),
        ] {
            let message = ProcessMetrics {
                data: MetricData::Raw(vec![item.clone()]),
                project_key,
                source,
                received_at,
                sent_at: Some(Utc::now()),
            };
            processor.handle_process_metrics(&mut token, message);

            let Aggregator::MergeBuckets(merge_buckets) = aggregator_rx.recv().await.unwrap();
            let buckets = merge_buckets.buckets;
            assert_eq!(buckets.len(), 1);
            assert_eq!(buckets[0].metadata.received_at, expected_received_at);
        }
    }
3685
    // Verifies that a batched metrics payload containing buckets for two
    // projects is split into one `MergeBuckets` message per project key.
    #[tokio::test]
    async fn test_process_batched_metrics() {
        let mut token = Cogs::noop().timed(ResourceId::Relay, AppFeature::Unattributed);
        let received_at = Utc::now();
        let config = Config::default();

        // Capture aggregator messages instead of running a real aggregator.
        let (aggregator, mut aggregator_rx) = Addr::custom();
        let processor = create_test_processor_with_addrs(
            config,
            Addrs {
                aggregator,
                ..Default::default()
            },
        )
        .await;

        // One distribution bucket for each of two project keys.
        let payload = r#"{
    "buckets": {
        "11111111111111111111111111111111": [
            {
                "timestamp": 1615889440,
                "width": 0,
                "name": "d:custom/endpoint.response_time@millisecond",
                "type": "d",
                "value": [
                  68.0
                ],
                "tags": {
                  "route": "user_index"
                }
            }
        ],
        "22222222222222222222222222222222": [
            {
                "timestamp": 1615889440,
                "width": 0,
                "name": "d:custom/endpoint.cache_rate@none",
                "type": "d",
                "value": [
                  36.0
                ]
            }
        ]
    }
}
"#;
        let message = ProcessBatchedMetrics {
            payload: Bytes::from(payload),
            source: BucketSource::Internal,
            received_at,
            sent_at: Some(Utc::now()),
        };
        processor.handle_process_batched_metrics(&mut token, message);

        // Expect exactly one MergeBuckets per project key.
        let Aggregator::MergeBuckets(mb1) = aggregator_rx.recv().await.unwrap();
        let Aggregator::MergeBuckets(mb2) = aggregator_rx.recv().await.unwrap();

        // Sort by project key so the snapshot below is deterministic.
        let mut messages = vec![mb1, mb2];
        messages.sort_by_key(|mb| mb.project_key);

        let actual = messages
            .into_iter()
            .map(|mb| (mb.project_key, mb.buckets))
            .collect::<Vec<_>>();

        assert_debug_snapshot!(actual, @r###"
        [
            (
                ProjectKey("11111111111111111111111111111111"),
                [
                    Bucket {
                        timestamp: UnixTimestamp(1615889440),
                        width: 0,
                        name: MetricName(
                            "d:custom/endpoint.response_time@millisecond",
                        ),
                        value: Distribution(
                            [
                                68.0,
                            ],
                        ),
                        tags: {
                            "route": "user_index",
                        },
                        metadata: BucketMetadata {
                            merges: 1,
                            received_at: None,
                            extracted_from_indexed: false,
                        },
                    },
                ],
            ),
            (
                ProjectKey("22222222222222222222222222222222"),
                [
                    Bucket {
                        timestamp: UnixTimestamp(1615889440),
                        width: 0,
                        name: MetricName(
                            "d:custom/endpoint.cache_rate@none",
                        ),
                        value: Distribution(
                            [
                                36.0,
                            ],
                        ),
                        tags: {},
                        metadata: BucketMetadata {
                            merges: 1,
                            received_at: None,
                            extracted_from_indexed: false,
                        },
                    },
                ],
            ),
        ]
        "###);
    }
3804}