// relay_server/services/processor.rs
1use std::borrow::Cow;
2use std::collections::{BTreeSet, HashMap};
3use std::error::Error;
4use std::fmt::{Debug, Display};
5use std::future::Future;
6use std::io::Write;
7use std::pin::Pin;
8use std::sync::Arc;
9use std::time::Duration;
10
11use anyhow::Context;
12use brotli::CompressorWriter as BrotliEncoder;
13use bytes::Bytes;
14use chrono::{DateTime, Utc};
15use flate2::Compression;
16use flate2::write::{GzEncoder, ZlibEncoder};
17use futures::FutureExt;
18use futures::future::BoxFuture;
19use relay_base_schema::project::{ProjectId, ProjectKey};
20use relay_cogs::{AppFeature, Cogs, FeatureWeights, ResourceId, Token};
21use relay_common::time::UnixTimestamp;
22use relay_config::{Config, HttpEncoding, RelayMode};
23use relay_dynamic_config::Feature;
24use relay_event_normalization::{ClockDriftProcessor, GeoIpLookup};
25use relay_event_schema::processor::ProcessingAction;
26use relay_event_schema::protocol::{ClientReport, Event, EventId, NetworkReportError, SpanV2};
27use relay_filter::FilterStatKey;
28use relay_metrics::{Bucket, BucketMetadata, BucketView, BucketsView, MetricNamespace};
29use relay_protocol::Annotated;
30use relay_quotas::{DataCategory, RateLimits, Scoping};
31use relay_sampling::evaluation::{ReservoirCounters, SamplingDecision};
32use relay_statsd::metric;
33use relay_system::{Addr, FromMessage, NoResponse, Service};
34use reqwest::header;
35use smallvec::{SmallVec, smallvec};
36use zstd::stream::Encoder as ZstdEncoder;
37
38use crate::envelope::{self, ContentType, Envelope, EnvelopeError, Item, ItemContainer, ItemType};
39use crate::extractors::{PartialDsn, RequestMeta, RequestTrust};
40use crate::integrations::Integration;
41use crate::managed::{InvalidProcessingGroupType, ManagedEnvelope, TypedEnvelope};
42use crate::metrics::{MetricOutcomes, MetricsLimiter, MinimalTrackableBucket};
43use crate::metrics_extraction::ExtractedMetrics;
44use crate::processing::attachments::AttachmentProcessor;
45use crate::processing::check_ins::CheckInsProcessor;
46use crate::processing::client_reports::ClientReportsProcessor;
47use crate::processing::errors::{ErrorsProcessor, SwitchProcessingError};
48use crate::processing::legacy_spans::LegacySpansProcessor;
49use crate::processing::logs::LogsProcessor;
50use crate::processing::profile_chunks::ProfileChunksProcessor;
51use crate::processing::profiles::ProfilesProcessor;
52use crate::processing::replays::ReplaysProcessor;
53use crate::processing::sessions::SessionsProcessor;
54use crate::processing::spans::SpansProcessor;
55use crate::processing::trace_attachments::TraceAttachmentsProcessor;
56use crate::processing::trace_metrics::TraceMetricsProcessor;
57use crate::processing::transactions::TransactionProcessor;
58use crate::processing::user_reports::UserReportsProcessor;
59use crate::processing::{Forward as _, Output, Outputs, QuotaRateLimiter};
60use crate::service::ServiceError;
61use crate::services::global_config::GlobalConfigHandle;
62use crate::services::metrics::{Aggregator, FlushBuckets, MergeBuckets, ProjectBuckets};
63use crate::services::outcome::{DiscardItemType, DiscardReason, Outcome, TrackOutcome};
64use crate::services::projects::cache::ProjectCacheHandle;
65use crate::services::projects::project::{ProjectInfo, ProjectState};
66use crate::services::upstream::{
67    SendRequest, Sign, SignatureType, UpstreamRelay, UpstreamRequest, UpstreamRequestError,
68};
69use crate::statsd::{RelayCounters, RelayDistributions, RelayTimers};
70use crate::utils;
71use crate::{http, processing};
72use relay_threading::AsyncPool;
73use symbolic_unreal::{Unreal4Error, Unreal4ErrorKind};
74#[cfg(feature = "processing")]
75use {
76    crate::services::objectstore::Objectstore,
77    crate::services::store::Store,
78    itertools::Itertools,
79    relay_dynamic_config::GlobalConfig,
80    relay_quotas::{Quota, RateLimitingError, RedisRateLimiter},
81    relay_redis::RedisClients,
82    std::time::Instant,
83};
84
85pub mod event;
86mod metrics;
87mod nel;
88pub mod span;
89
90#[cfg(all(sentry, feature = "processing"))]
91pub mod playstation;
92
/// The minimum clock drift for correction to apply.
///
/// 55 minutes. NOTE(review): presumably drifts smaller than this are left
/// uncorrected — confirm at the `ClockDriftProcessor` call sites.
pub const MINIMUM_CLOCK_DRIFT: Duration = Duration::from_secs(55 * 60);
95
/// Error returned when a [`ProcessingGroup`] cannot be converted into the
/// requested group marker type.
#[derive(Debug)]
pub struct GroupTypeError;

impl std::fmt::Display for GroupTypeError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "failed to convert processing group into corresponding type")
    }
}

impl std::error::Error for GroupTypeError {}
106
/// Generates a zero-sized marker type for a [`ProcessingGroup`] variant.
///
/// `processing_group!(Ty, Variant)` expands to `pub struct Ty` together with:
///  - `From<Ty> for ProcessingGroup`, mapping the marker to `Variant`,
///  - `TryFrom<ProcessingGroup> for Ty`, which succeeds for `Variant` (and for
///    any additional variants listed after it) and fails with
///    [`GroupTypeError`] otherwise.
macro_rules! processing_group {
    ($ty:ident, $variant:ident$(, $($other:ident),+)?) => {
        #[derive(Clone, Copy, Debug)]
        pub struct $ty;

        impl From<$ty> for ProcessingGroup {
            fn from(_: $ty) -> Self {
                ProcessingGroup::$variant
            }
        }

        impl TryFrom<ProcessingGroup> for $ty {
            type Error = GroupTypeError;

            fn try_from(value: ProcessingGroup) -> Result<Self, Self::Error> {
                if matches!(value, ProcessingGroup::$variant) {
                    return Ok($ty);
                }
                // Optional extra variants that also convert into this marker.
                $($(
                    if matches!(value, ProcessingGroup::$other) {
                        return Ok($ty);
                    }
                )+)?
                return Err(GroupTypeError);
            }
        }
    };
}
135
// Marker types for each processing group. `LogGroup` additionally accepts
// `Nel` envelopes, since NEL reports are converted into logs during
// processing (see `ProcessingGroup::Nel`).
processing_group!(TransactionGroup, Transaction);

processing_group!(ErrorGroup, Error);

processing_group!(SessionGroup, Session);
processing_group!(ClientReportGroup, ClientReport);
processing_group!(ReplayGroup, Replay);
processing_group!(CheckInGroup, CheckIn);
processing_group!(LogGroup, Log, Nel);
processing_group!(TraceMetricGroup, TraceMetric);
processing_group!(SpanGroup, Span);

processing_group!(ProfileChunkGroup, ProfileChunk);
processing_group!(MetricsGroup, Metrics);
processing_group!(ForwardUnknownGroup, ForwardUnknown);
processing_group!(Ungrouped, Ungrouped);
152
/// Processed group type marker.
///
/// Marks the envelopes which passed through the processing pipeline.
/// Used as the type-state parameter of [`TypedEnvelope`] after processing.
#[derive(Clone, Copy, Debug)]
pub struct Processed;
158
/// Describes the groups of the processable items.
#[derive(Clone, Copy, Debug)]
pub enum ProcessingGroup {
    /// All the transaction related items.
    ///
    /// Includes transactions, related attachments, profiles.
    Transaction,
    /// All the items which require (have or create) events.
    ///
    /// This includes: errors, NEL, security reports, user reports, some of the
    /// attachments.
    Error,
    /// Session events.
    Session,
    /// Standalone attachments
    ///
    /// Attachments that are sent without an item that creates an event in the same envelope.
    StandaloneAttachments,
    /// Standalone user reports
    ///
    /// User reports that are sent without an item that creates an event in the same envelope.
    StandaloneUserReports,
    /// Standalone profiles
    ///
    /// Profiles which had their transaction sampled.
    StandaloneProfiles,
    /// Outcomes.
    ClientReport,
    /// Replays and ReplayRecordings.
    Replay,
    /// Crons.
    CheckIn,
    /// NEL reports.
    Nel,
    /// Logs.
    Log,
    /// Trace metrics.
    TraceMetric,
    /// Spans.
    Span,
    /// Span V2 spans.
    SpanV2,
    /// Metrics.
    Metrics,
    /// ProfileChunk.
    ProfileChunk,
    /// V2 attachments without span / log association.
    TraceAttachment,
    /// Unknown item types will be forwarded upstream (to processing Relay), where we will
    /// decide what to do with them.
    ForwardUnknown,
    /// All the items in the envelope that could not be grouped.
    Ungrouped,
}
213
214impl ProcessingGroup {
215    /// Splits provided envelope into list of tuples of groups with associated envelopes.
216    fn split_envelope(
217        mut envelope: Envelope,
218        project_info: &ProjectInfo,
219    ) -> SmallVec<[(Self, Box<Envelope>); 3]> {
220        let headers = envelope.headers().clone();
221        let mut grouped_envelopes = smallvec![];
222
223        // Extract replays.
224        let replay_items = envelope.take_items_by(|item| {
225            matches!(
226                item.ty(),
227                &ItemType::ReplayEvent | &ItemType::ReplayRecording | &ItemType::ReplayVideo
228            )
229        });
230        if !replay_items.is_empty() {
231            grouped_envelopes.push((
232                ProcessingGroup::Replay,
233                Envelope::from_parts(headers.clone(), replay_items),
234            ))
235        }
236
237        // Keep all the sessions together in one envelope.
238        let session_items = envelope
239            .take_items_by(|item| matches!(item.ty(), &ItemType::Session | &ItemType::Sessions));
240        if !session_items.is_empty() {
241            grouped_envelopes.push((
242                ProcessingGroup::Session,
243                Envelope::from_parts(headers.clone(), session_items),
244            ))
245        }
246
247        let span_v2_items = envelope.take_items_by(|item| {
248            let exp_feature = project_info.has_feature(Feature::SpanV2ExperimentalProcessing);
249
250            ItemContainer::<SpanV2>::is_container(item)
251                || matches!(item.integration(), Some(Integration::Spans(_)))
252                // Process standalone spans (v1) via the v2 pipeline
253                || (exp_feature && matches!(item.ty(), &ItemType::Span))
254                || (exp_feature && item.is_span_attachment())
255        });
256
257        if !span_v2_items.is_empty() {
258            grouped_envelopes.push((
259                ProcessingGroup::SpanV2,
260                Envelope::from_parts(headers.clone(), span_v2_items),
261            ))
262        }
263
264        // Extract spans.
265        let span_items = envelope.take_items_by(|item| matches!(item.ty(), &ItemType::Span));
266        if !span_items.is_empty() {
267            grouped_envelopes.push((
268                ProcessingGroup::Span,
269                Envelope::from_parts(headers.clone(), span_items),
270            ))
271        }
272
273        // Extract logs.
274        let logs_items = envelope.take_items_by(|item| {
275            matches!(item.ty(), &ItemType::Log)
276                || matches!(item.integration(), Some(Integration::Logs(_)))
277        });
278        if !logs_items.is_empty() {
279            grouped_envelopes.push((
280                ProcessingGroup::Log,
281                Envelope::from_parts(headers.clone(), logs_items),
282            ))
283        }
284
285        // Extract trace metrics.
286        let trace_metric_items =
287            envelope.take_items_by(|item| matches!(item.ty(), &ItemType::TraceMetric));
288        if !trace_metric_items.is_empty() {
289            grouped_envelopes.push((
290                ProcessingGroup::TraceMetric,
291                Envelope::from_parts(headers.clone(), trace_metric_items),
292            ))
293        }
294
295        // NEL items are transformed into logs in their own processing step.
296        let nel_items = envelope.take_items_by(|item| matches!(item.ty(), &ItemType::Nel));
297        if !nel_items.is_empty() {
298            grouped_envelopes.push((
299                ProcessingGroup::Nel,
300                Envelope::from_parts(headers.clone(), nel_items),
301            ))
302        }
303
304        // Extract all metric items.
305        //
306        // Note: Should only be relevant in proxy mode. In other modes we send metrics through
307        // a separate pipeline.
308        let metric_items = envelope.take_items_by(|i| i.ty().is_metrics());
309        if !metric_items.is_empty() {
310            grouped_envelopes.push((
311                ProcessingGroup::Metrics,
312                Envelope::from_parts(headers.clone(), metric_items),
313            ))
314        }
315
316        // Extract profile chunks.
317        let profile_chunk_items =
318            envelope.take_items_by(|item| matches!(item.ty(), &ItemType::ProfileChunk));
319        if !profile_chunk_items.is_empty() {
320            grouped_envelopes.push((
321                ProcessingGroup::ProfileChunk,
322                Envelope::from_parts(headers.clone(), profile_chunk_items),
323            ))
324        }
325
326        let trace_attachment_items = envelope.take_items_by(Item::is_trace_attachment);
327        if !trace_attachment_items.is_empty() {
328            grouped_envelopes.push((
329                ProcessingGroup::TraceAttachment,
330                Envelope::from_parts(headers.clone(), trace_attachment_items),
331            ))
332        }
333
334        if !envelope.items().any(Item::creates_event) {
335            // Extract the standalone attachments
336            let standalone_attachments = envelope
337                .take_items_by(|i| i.requires_event() && matches!(i.ty(), ItemType::Attachment));
338            if !standalone_attachments.is_empty() {
339                grouped_envelopes.push((
340                    ProcessingGroup::StandaloneAttachments,
341                    Envelope::from_parts(headers.clone(), standalone_attachments),
342                ))
343            }
344
345            // Extract the standalone user reports
346            let standalone_user_reports =
347                envelope.take_items_by(|i| matches!(i.ty(), ItemType::UserReport));
348            if !standalone_user_reports.is_empty() {
349                grouped_envelopes.push((
350                    ProcessingGroup::StandaloneUserReports,
351                    Envelope::from_parts(headers.clone(), standalone_user_reports),
352                ));
353            }
354
355            // Extract the standalone profiles
356            let standalone_profiles =
357                envelope.take_items_by(|i| matches!(i.ty(), ItemType::Profile));
358            if !standalone_profiles.is_empty() {
359                grouped_envelopes.push((
360                    ProcessingGroup::StandaloneProfiles,
361                    Envelope::from_parts(headers.clone(), standalone_profiles),
362                ));
363            }
364        }
365
366        // Make sure we create separate envelopes for each `RawSecurity` report.
367        let security_reports_items = envelope
368            .take_items_by(|i| matches!(i.ty(), &ItemType::RawSecurity))
369            .into_iter()
370            .map(|item| {
371                let headers = headers.clone();
372                let items: SmallVec<[Item; 3]> = smallvec![item.clone()];
373                let mut envelope = Envelope::from_parts(headers, items);
374                envelope.set_event_id(EventId::new());
375                (ProcessingGroup::Error, envelope)
376            });
377        grouped_envelopes.extend(security_reports_items);
378
379        // Extract all the items which require an event into separate envelope.
380        let require_event_items = envelope.take_items_by(Item::requires_event);
381        if !require_event_items.is_empty() {
382            let group = if require_event_items
383                .iter()
384                .any(|item| matches!(item.ty(), &ItemType::Transaction | &ItemType::Profile))
385            {
386                ProcessingGroup::Transaction
387            } else {
388                ProcessingGroup::Error
389            };
390
391            grouped_envelopes.push((
392                group,
393                Envelope::from_parts(headers.clone(), require_event_items),
394            ))
395        }
396
397        // Get the rest of the envelopes, one per item.
398        let envelopes = envelope.items_mut().map(|item| {
399            let headers = headers.clone();
400            let items: SmallVec<[Item; 3]> = smallvec![item.clone()];
401            let envelope = Envelope::from_parts(headers, items);
402            let item_type = item.ty();
403            let group = if matches!(item_type, &ItemType::CheckIn) {
404                ProcessingGroup::CheckIn
405            } else if matches!(item.ty(), &ItemType::ClientReport) {
406                ProcessingGroup::ClientReport
407            } else if matches!(item_type, &ItemType::Unknown(_)) {
408                ProcessingGroup::ForwardUnknown
409            } else {
410                // Cannot group this item type.
411                ProcessingGroup::Ungrouped
412            };
413
414            (group, envelope)
415        });
416        grouped_envelopes.extend(envelopes);
417
418        grouped_envelopes
419    }
420
421    /// Returns the name of the group.
422    pub fn variant(&self) -> &'static str {
423        match self {
424            ProcessingGroup::Transaction => "transaction",
425            ProcessingGroup::Error => "error",
426            ProcessingGroup::Session => "session",
427            ProcessingGroup::StandaloneAttachments => "standalone_attachment",
428            ProcessingGroup::StandaloneUserReports => "standalone_user_reports",
429            ProcessingGroup::StandaloneProfiles => "standalone_profiles",
430            ProcessingGroup::ClientReport => "client_report",
431            ProcessingGroup::Replay => "replay",
432            ProcessingGroup::CheckIn => "check_in",
433            ProcessingGroup::Log => "log",
434            ProcessingGroup::TraceMetric => "trace_metric",
435            ProcessingGroup::Nel => "nel",
436            ProcessingGroup::Span => "span",
437            ProcessingGroup::SpanV2 => "span_v2",
438            ProcessingGroup::Metrics => "metrics",
439            ProcessingGroup::ProfileChunk => "profile_chunk",
440            ProcessingGroup::TraceAttachment => "trace_attachment",
441            ProcessingGroup::ForwardUnknown => "forward_unknown",
442            ProcessingGroup::Ungrouped => "ungrouped",
443        }
444    }
445}
446
447impl From<ProcessingGroup> for AppFeature {
448    fn from(value: ProcessingGroup) -> Self {
449        match value {
450            ProcessingGroup::Transaction => AppFeature::Transactions,
451            ProcessingGroup::Error => AppFeature::Errors,
452            ProcessingGroup::Session => AppFeature::Sessions,
453            ProcessingGroup::StandaloneAttachments => AppFeature::UnattributedEnvelope,
454            ProcessingGroup::StandaloneUserReports => AppFeature::UserReports,
455            ProcessingGroup::StandaloneProfiles => AppFeature::Profiles,
456            ProcessingGroup::ClientReport => AppFeature::ClientReports,
457            ProcessingGroup::Replay => AppFeature::Replays,
458            ProcessingGroup::CheckIn => AppFeature::CheckIns,
459            ProcessingGroup::Log => AppFeature::Logs,
460            ProcessingGroup::TraceMetric => AppFeature::TraceMetrics,
461            ProcessingGroup::Nel => AppFeature::Logs,
462            ProcessingGroup::Span => AppFeature::Spans,
463            ProcessingGroup::SpanV2 => AppFeature::Spans,
464            ProcessingGroup::Metrics => AppFeature::UnattributedMetrics,
465            ProcessingGroup::ProfileChunk => AppFeature::Profiles,
466            ProcessingGroup::ForwardUnknown => AppFeature::UnattributedEnvelope,
467            ProcessingGroup::Ungrouped => AppFeature::UnattributedEnvelope,
468            ProcessingGroup::TraceAttachment => AppFeature::TraceAttachments,
469        }
470    }
471}
472
/// An error returned when handling [`ProcessEnvelope`].
#[derive(Debug, thiserror::Error)]
pub enum ProcessingError {
    #[error("invalid json in event")]
    InvalidJson(#[source] serde_json::Error),

    #[error("invalid message pack event payload")]
    InvalidMsgpack(#[from] rmp_serde::decode::Error),

    #[error("invalid unreal crash report")]
    InvalidUnrealReport(#[source] Unreal4Error),

    #[error("event payload too large")]
    PayloadTooLarge(DiscardItemType),

    #[error("invalid transaction event")]
    InvalidTransaction,

    #[error("the item is not allowed/supported in this envelope")]
    UnsupportedItem,

    #[error("envelope processor failed")]
    ProcessingFailed(#[from] ProcessingAction),

    #[error("duplicate {0} in event")]
    DuplicateItem(ItemType),

    #[error("failed to extract event payload")]
    NoEventPayload,

    #[error("missing project id in DSN")]
    MissingProjectId,

    #[error("invalid security report type: {0:?}")]
    InvalidSecurityType(Bytes),

    #[error("unsupported security report type")]
    UnsupportedSecurityType,

    #[error("invalid security report")]
    InvalidSecurityReport(#[source] serde_json::Error),

    #[error("invalid nel report")]
    InvalidNelReport(#[source] NetworkReportError),

    /// The event was dropped by an inbound filter; carries the matched filter.
    #[error("event filtered with reason: {0:?}")]
    EventFiltered(FilterStatKey),

    #[error("could not serialize event payload")]
    SerializeFailed(#[source] serde_json::Error),

    #[cfg(feature = "processing")]
    #[error("failed to apply quotas")]
    QuotasFailed(#[from] RateLimitingError),

    /// Boxed to keep the error enum small; see `clippy::large_enum_variant`.
    #[error("invalid processing group type")]
    InvalidProcessingGroup(Box<InvalidProcessingGroupType>),

    #[error("nintendo switch dying message processing failed {0:?}")]
    InvalidNintendoDyingMessage(#[source] SwitchProcessingError),

    #[cfg(all(sentry, feature = "processing"))]
    #[error("playstation dump processing failed: {0}")]
    InvalidPlaystationDump(String),

    /// An envelope was routed to a processor that handles a different group.
    #[error("processing group does not match specific processor")]
    ProcessingGroupMismatch,
    /// Failure in the new processing pipeline; outcomes are emitted there.
    #[error("new processing pipeline failed")]
    ProcessingFailure,

    #[cfg(feature = "processing")]
    #[error("invalid attachment reference")]
    InvalidAttachmentRef,

    #[error("could not determine processing group for envelope items")]
    NoProcessingGroup,
}
550
impl ProcessingError {
    /// Maps this error to the [`Outcome`] to emit for the dropped data.
    ///
    /// Returns `None` for errors that must not produce an outcome here, either
    /// because no data is being discarded (e.g. `MissingProjectId`) or because
    /// outcomes have already been emitted elsewhere (e.g. `ProcessingFailure`).
    pub fn to_outcome(&self) -> Option<Outcome> {
        match self {
            Self::PayloadTooLarge(payload_type) => {
                Some(Outcome::Invalid(DiscardReason::TooLarge(*payload_type)))
            }
            Self::InvalidJson(_) => Some(Outcome::Invalid(DiscardReason::InvalidJson)),
            Self::InvalidMsgpack(_) => Some(Outcome::Invalid(DiscardReason::InvalidMsgpack)),
            Self::InvalidSecurityType(_) => {
                Some(Outcome::Invalid(DiscardReason::SecurityReportType))
            }
            Self::UnsupportedItem => Some(Outcome::Invalid(DiscardReason::InvalidEnvelope)),
            Self::InvalidSecurityReport(_) => Some(Outcome::Invalid(DiscardReason::SecurityReport)),
            Self::UnsupportedSecurityType => Some(Outcome::Filtered(FilterStatKey::InvalidCsp)),
            Self::InvalidNelReport(_) => Some(Outcome::Invalid(DiscardReason::InvalidJson)),
            Self::InvalidTransaction => Some(Outcome::Invalid(DiscardReason::InvalidTransaction)),
            Self::DuplicateItem(_) => Some(Outcome::Invalid(DiscardReason::DuplicateItem)),
            Self::NoEventPayload => Some(Outcome::Invalid(DiscardReason::NoEventPayload)),
            Self::InvalidNintendoDyingMessage(_) => Some(Outcome::Invalid(DiscardReason::Payload)),
            #[cfg(all(sentry, feature = "processing"))]
            Self::InvalidPlaystationDump(_) => Some(Outcome::Invalid(DiscardReason::Payload)),
            // Bad compression gets its own discard reason; other unreal
            // failures are reported as generic unreal processing errors.
            Self::InvalidUnrealReport(err) if err.kind() == Unreal4ErrorKind::BadCompression => {
                Some(Outcome::Invalid(DiscardReason::InvalidCompression))
            }
            Self::InvalidUnrealReport(_) => Some(Outcome::Invalid(DiscardReason::ProcessUnreal)),
            Self::SerializeFailed(_) | Self::ProcessingFailed(_) => {
                Some(Outcome::Invalid(DiscardReason::Internal))
            }
            #[cfg(feature = "processing")]
            Self::QuotasFailed(_) => Some(Outcome::Invalid(DiscardReason::Internal)),
            Self::MissingProjectId => None,
            Self::EventFiltered(key) => Some(Outcome::Filtered(key.clone())),
            Self::InvalidProcessingGroup(_) => None,

            Self::ProcessingGroupMismatch => Some(Outcome::Invalid(DiscardReason::Internal)),
            // Outcomes are emitted in the new processing pipeline already.
            Self::ProcessingFailure => None,
            #[cfg(feature = "processing")]
            Self::InvalidAttachmentRef => {
                Some(Outcome::Invalid(DiscardReason::InvalidAttachmentRef))
            }
            Self::NoProcessingGroup => Some(Outcome::Invalid(DiscardReason::Internal)),
        }
    }

    /// Returns `true` when this error maps to an outcome that is itself
    /// flagged as unexpected (see `Outcome::is_unexpected`).
    fn is_unexpected(&self) -> bool {
        self.to_outcome()
            .is_some_and(|outcome| outcome.is_unexpected())
    }
}
601
602impl From<Unreal4Error> for ProcessingError {
603    fn from(err: Unreal4Error) -> Self {
604        match err.kind() {
605            Unreal4ErrorKind::TooLarge => Self::PayloadTooLarge(ItemType::UnrealReport.into()),
606            _ => ProcessingError::InvalidUnrealReport(err),
607        }
608    }
609}
610
611impl From<InvalidProcessingGroupType> for ProcessingError {
612    fn from(value: InvalidProcessingGroupType) -> Self {
613        Self::InvalidProcessingGroup(Box::new(value))
614    }
615}
616
617type ExtractedEvent = (Annotated<Event>, usize);
618
/// A container for extracted metrics during processing.
///
/// The container enforces that the extracted metrics are correctly tagged
/// with the dynamic sampling decision.
#[derive(Debug)]
pub struct ProcessingExtractedMetrics {
    // Only mutated through this type's `extend*` methods, which tag every
    // bucket's `extracted_from_indexed` flag with the sampling decision.
    metrics: ExtractedMetrics,
}
627
628impl ProcessingExtractedMetrics {
629    pub fn new() -> Self {
630        Self {
631            metrics: ExtractedMetrics::default(),
632        }
633    }
634
635    pub fn into_inner(self) -> ExtractedMetrics {
636        self.metrics
637    }
638
639    /// Extends the contained metrics with [`ExtractedMetrics`].
640    pub fn extend(
641        &mut self,
642        extracted: ExtractedMetrics,
643        sampling_decision: Option<SamplingDecision>,
644    ) {
645        self.extend_project_metrics(extracted.project_metrics, sampling_decision);
646        self.extend_sampling_metrics(extracted.sampling_metrics, sampling_decision);
647    }
648
649    /// Extends the contained project metrics.
650    pub fn extend_project_metrics<I>(
651        &mut self,
652        buckets: I,
653        sampling_decision: Option<SamplingDecision>,
654    ) where
655        I: IntoIterator<Item = Bucket>,
656    {
657        self.metrics
658            .project_metrics
659            .extend(buckets.into_iter().map(|mut bucket| {
660                bucket.metadata.extracted_from_indexed =
661                    sampling_decision == Some(SamplingDecision::Keep);
662                bucket
663            }));
664    }
665
666    /// Extends the contained sampling metrics.
667    pub fn extend_sampling_metrics<I>(
668        &mut self,
669        buckets: I,
670        sampling_decision: Option<SamplingDecision>,
671    ) where
672        I: IntoIterator<Item = Bucket>,
673    {
674        self.metrics
675            .sampling_metrics
676            .extend(buckets.into_iter().map(|mut bucket| {
677                bucket.metadata.extracted_from_indexed =
678                    sampling_decision == Some(SamplingDecision::Keep);
679                bucket
680            }));
681    }
682}
683
684fn send_metrics(
685    metrics: ExtractedMetrics,
686    project_key: ProjectKey,
687    sampling_key: Option<ProjectKey>,
688    aggregator: &Addr<Aggregator>,
689) {
690    let ExtractedMetrics {
691        project_metrics,
692        sampling_metrics,
693    } = metrics;
694
695    if !project_metrics.is_empty() {
696        aggregator.send(MergeBuckets {
697            project_key,
698            buckets: project_metrics,
699        });
700    }
701
702    if !sampling_metrics.is_empty() {
703        // If no sampling project state is available, we associate the sampling
704        // metrics with the current project.
705        //
706        // project_without_tracing         -> metrics goes to self
707        // dependent_project_with_tracing  -> metrics goes to root
708        // root_project_with_tracing       -> metrics goes to root == self
709        let sampling_project_key = sampling_key.unwrap_or(project_key);
710        aggregator.send(MergeBuckets {
711            project_key: sampling_project_key,
712            buckets: sampling_metrics,
713        });
714    }
715}
716
/// The result of the envelope processing containing the processed envelope along with the partial
/// result.
#[derive(Debug)]
#[expect(
    clippy::large_enum_variant,
    reason = "until we have a better solution to combat the excessive growth of Item, see #4819"
)]
enum ProcessingResult {
    /// A processed envelope together with the metrics extracted from it.
    Envelope {
        managed_envelope: TypedEnvelope<Processed>,
        extracted_metrics: ProcessingExtractedMetrics,
    },
    /// The output of the new processing pipeline.
    Output(Output<Outputs>),
}
731
732impl ProcessingResult {
733    /// Creates a [`ProcessingResult`] with no metrics.
734    fn no_metrics(managed_envelope: TypedEnvelope<Processed>) -> Self {
735        Self::Envelope {
736            managed_envelope,
737            extracted_metrics: ProcessingExtractedMetrics::new(),
738        }
739    }
740}
741
/// All items which can be submitted upstream.
#[derive(Debug)]
#[expect(
    clippy::large_enum_variant,
    reason = "until we have a better solution to combat the excessive growth of Item, see #4819"
)]
enum Submit<'a> {
    /// A processed envelope.
    Envelope(TypedEnvelope<Processed>),
    /// The output of a [`processing::Processor`].
    Output {
        /// The items produced by the processor.
        output: Outputs,
        /// Context needed to forward the output.
        ctx: processing::ForwardContext<'a>,
    },
}
757
/// Applies processing to all contents of the given envelope.
///
/// Depending on the contents of the envelope and Relay's mode, this includes:
///
///  - Basic normalization and validation for all item types.
///  - Clock drift correction if the required `sent_at` header is present.
///  - Expansion of certain item types (e.g. unreal).
///  - Store normalization for event payloads in processing mode.
///  - Rate limiters and inbound filters on events in processing mode.
#[derive(Debug)]
pub struct ProcessEnvelope {
    /// Envelope to process.
    pub envelope: ManagedEnvelope,
    /// The project info.
    pub project_info: Arc<ProjectInfo>,
    /// Currently active cached rate limits for this project.
    pub rate_limits: Arc<RateLimits>,
    /// Root sampling project info.
    ///
    /// `None` when the envelope has no known root (tracing) project.
    pub sampling_project_info: Option<Arc<ProjectInfo>>,
    /// Sampling reservoir counters.
    ///
    /// NOTE(review): presumably shared so reservoir sampling state persists
    /// across envelopes — confirm with the project cache.
    pub reservoir_counters: ReservoirCounters,
}
780
/// Like a [`ProcessEnvelope`], but with an envelope which has been grouped.
///
/// See [`ProcessingGroup::split_envelope`] for how envelopes are grouped.
#[derive(Debug)]
struct ProcessEnvelopeGrouped<'a> {
    /// The group the envelope belongs to.
    pub group: ProcessingGroup,
    /// Envelope to process.
    pub envelope: ManagedEnvelope,
    /// The processing context.
    pub ctx: processing::Context<'a>,
}
791
/// Parses a list of metrics or metric buckets and pushes them to the project's aggregator.
///
/// This parses and validates the metrics:
///  - For [`Metrics`](ItemType::Statsd), each metric is parsed separately, and invalid metrics are
///    ignored independently.
///  - For [`MetricBuckets`](ItemType::MetricBuckets), the entire list of buckets is parsed and
///    dropped together on parsing failure.
///  - Other envelope items will be ignored with an error message.
///
/// Additionally, processing applies clock drift correction using the system clock of this Relay, if
/// the Envelope specifies the [`sent_at`](Envelope::sent_at) header.
#[derive(Debug)]
pub struct ProcessMetrics {
    /// A list of metric items.
    pub data: MetricData,
    /// The target project.
    pub project_key: ProjectKey,
    /// Whether to keep or reset the metric metadata (see [`BucketSource`]).
    pub source: BucketSource,
    /// The wall clock time at which the request was received.
    pub received_at: DateTime<Utc>,
    /// The value of the Envelope's [`sent_at`](Envelope::sent_at) header for clock drift
    /// correction.
    pub sent_at: Option<DateTime<Utc>>,
}
817
/// Raw unparsed metric data.
///
/// Unified input type for [`ProcessMetrics`]: either unparsed envelope items or
/// buckets that were already parsed elsewhere but not yet processed.
#[derive(Debug)]
pub enum MetricData {
    /// Raw data, unparsed envelope items (statsd or JSON bucket payloads).
    Raw(Vec<Item>),
    /// Already parsed buckets but unprocessed.
    Parsed(Vec<Bucket>),
}
826
827impl MetricData {
828    /// Consumes the metric data and parses the contained buckets.
829    ///
830    /// If the contained data is already parsed the buckets are returned unchanged.
831    /// Raw buckets are parsed and created with the passed `timestamp`.
832    fn into_buckets(self, timestamp: UnixTimestamp) -> Vec<Bucket> {
833        let items = match self {
834            Self::Parsed(buckets) => return buckets,
835            Self::Raw(items) => items,
836        };
837
838        let mut buckets = Vec::new();
839        for item in items {
840            let payload = item.payload();
841            if item.ty() == &ItemType::Statsd {
842                for bucket_result in Bucket::parse_all(&payload, timestamp) {
843                    match bucket_result {
844                        Ok(bucket) => buckets.push(bucket),
845                        Err(error) => relay_log::debug!(
846                            error = &error as &dyn Error,
847                            "failed to parse metric bucket from statsd format",
848                        ),
849                    }
850                }
851            } else if item.ty() == &ItemType::MetricBuckets {
852                match serde_json::from_slice::<Vec<Bucket>>(&payload) {
853                    Ok(parsed_buckets) => {
854                        // Re-use the allocation of `b` if possible.
855                        if buckets.is_empty() {
856                            buckets = parsed_buckets;
857                        } else {
858                            buckets.extend(parsed_buckets);
859                        }
860                    }
861                    Err(error) => {
862                        relay_log::debug!(
863                            error = &error as &dyn Error,
864                            "failed to parse metric bucket",
865                        );
866                        metric!(counter(RelayCounters::MetricBucketsParsingFailed) += 1);
867                    }
868                }
869            } else {
870                relay_log::error!(
871                    "invalid item of type {} passed to ProcessMetrics",
872                    item.ty()
873                );
874            }
875        }
876        buckets
877    }
878}
879
/// A JSON-encoded batch of metric buckets, keyed by project, to be parsed and processed.
#[derive(Debug)]
pub struct ProcessBatchedMetrics {
    /// Metrics payload in JSON format.
    pub payload: Bytes,
    /// Whether to keep or reset the metric metadata (see [`BucketSource`]).
    pub source: BucketSource,
    /// The wall clock time at which the request was received.
    pub received_at: DateTime<Utc>,
    /// The value of the `sent_at` header, if present, for clock drift correction.
    pub sent_at: Option<DateTime<Utc>>,
}
891
/// Source information where a metric bucket originates from.
///
/// Determines whether the bucket's metadata can be trusted and kept, or has to
/// be reset on ingestion.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum BucketSource {
    /// The metric bucket originated from an internal Relay use case.
    ///
    /// The metric bucket originates either from within the same Relay
    /// or was accepted coming from another Relay which is registered as
    /// an internal Relay via Relay's configuration.
    Internal,
    /// The bucket source originated from an untrusted source.
    ///
    /// Managed Relays sending extracted metrics are considered external,
    /// it's a project use case but it comes from an untrusted source.
    External,
}
907
908impl BucketSource {
909    /// Infers the bucket source from [`RequestMeta::request_trust`].
910    pub fn from_meta(meta: &RequestMeta) -> Self {
911        match meta.request_trust() {
912            RequestTrust::Trusted => Self::Internal,
913            RequestTrust::Untrusted => Self::External,
914        }
915    }
916}
917
/// Sends a batch of client reports to the upstream.
#[derive(Debug)]
pub struct SubmitClientReports {
    /// The client reports to be sent.
    pub client_reports: Vec<ClientReport>,
    /// Scoping information for the client reports.
    pub scoping: Scoping,
}
926
/// CPU-intensive processing tasks for envelopes.
///
/// All payloads are boxed to keep the size of this message enum small.
#[derive(Debug)]
pub enum EnvelopeProcessor {
    /// Handles a [`ProcessEnvelope`] message.
    ProcessEnvelope(Box<ProcessEnvelope>),
    /// Handles a [`ProcessMetrics`] message.
    ProcessProjectMetrics(Box<ProcessMetrics>),
    /// Handles a [`ProcessBatchedMetrics`] message.
    ProcessBatchedMetrics(Box<ProcessBatchedMetrics>),
    /// Handles a [`FlushBuckets`] message.
    FlushBuckets(Box<FlushBuckets>),
    /// Handles a [`SubmitClientReports`] message.
    SubmitClientReports(Box<SubmitClientReports>),
}
936
937impl EnvelopeProcessor {
938    /// Returns the name of the message variant.
939    pub fn variant(&self) -> &'static str {
940        match self {
941            EnvelopeProcessor::ProcessEnvelope(_) => "ProcessEnvelope",
942            EnvelopeProcessor::ProcessProjectMetrics(_) => "ProcessProjectMetrics",
943            EnvelopeProcessor::ProcessBatchedMetrics(_) => "ProcessBatchedMetrics",
944            EnvelopeProcessor::FlushBuckets(_) => "FlushBuckets",
945            EnvelopeProcessor::SubmitClientReports(_) => "SubmitClientReports",
946        }
947    }
948}
949
// Marker impl: allows `EnvelopeProcessor` to be used as the message interface of a service.
impl relay_system::Interface for EnvelopeProcessor {}
951
952impl FromMessage<ProcessEnvelope> for EnvelopeProcessor {
953    type Response = relay_system::NoResponse;
954
955    fn from_message(message: ProcessEnvelope, _sender: ()) -> Self {
956        Self::ProcessEnvelope(Box::new(message))
957    }
958}
959
960impl FromMessage<ProcessMetrics> for EnvelopeProcessor {
961    type Response = NoResponse;
962
963    fn from_message(message: ProcessMetrics, _: ()) -> Self {
964        Self::ProcessProjectMetrics(Box::new(message))
965    }
966}
967
968impl FromMessage<ProcessBatchedMetrics> for EnvelopeProcessor {
969    type Response = NoResponse;
970
971    fn from_message(message: ProcessBatchedMetrics, _: ()) -> Self {
972        Self::ProcessBatchedMetrics(Box::new(message))
973    }
974}
975
976impl FromMessage<FlushBuckets> for EnvelopeProcessor {
977    type Response = NoResponse;
978
979    fn from_message(message: FlushBuckets, _: ()) -> Self {
980        Self::FlushBuckets(Box::new(message))
981    }
982}
983
984impl FromMessage<SubmitClientReports> for EnvelopeProcessor {
985    type Response = NoResponse;
986
987    fn from_message(message: SubmitClientReports, _: ()) -> Self {
988        Self::SubmitClientReports(Box::new(message))
989    }
990}
991
/// The asynchronous thread pool used for scheduling processing tasks in the processor.
///
/// Tasks are `'static` boxed futures that are driven to completion on the pool.
pub type EnvelopeProcessorServicePool = AsyncPool<BoxFuture<'static, ()>>;
994
/// Service implementing the [`EnvelopeProcessor`] interface.
///
/// This service handles messages in a worker pool with configurable concurrency.
///
/// Cloning is cheap: all state lives behind a shared [`Arc`].
#[derive(Clone)]
pub struct EnvelopeProcessorService {
    // Shared inner state: configuration, service handles, and per-signal processors.
    inner: Arc<InnerProcessor>,
}
1002
/// Contains the addresses of services that the processor publishes to.
pub struct Addrs {
    /// Address of the service receiving [`TrackOutcome`] messages.
    pub outcome_aggregator: Addr<TrackOutcome>,
    /// Address of the upstream relay service.
    pub upstream_relay: Addr<UpstreamRelay>,
    /// Address of the object store service; only built with the `processing` feature.
    #[cfg(feature = "processing")]
    pub objectstore: Option<Addr<Objectstore>>,
    /// Address of the store forwarder; only built with the `processing` feature.
    #[cfg(feature = "processing")]
    pub store_forwarder: Option<Addr<Store>>,
    /// Address of the metrics aggregator, receiving metric buckets.
    pub aggregator: Addr<Aggregator>,
}
1013
1014impl Default for Addrs {
1015    fn default() -> Self {
1016        Addrs {
1017            outcome_aggregator: Addr::dummy(),
1018            upstream_relay: Addr::dummy(),
1019            #[cfg(feature = "processing")]
1020            objectstore: None,
1021            #[cfg(feature = "processing")]
1022            store_forwarder: None,
1023            aggregator: Addr::dummy(),
1024        }
1025    }
1026}
1027
/// Shared, reference-counted state of the [`EnvelopeProcessorService`].
struct InnerProcessor {
    // Worker pool processing futures are spawned on.
    pool: EnvelopeProcessorServicePool,
    // Static Relay configuration.
    config: Arc<Config>,
    // Handle to the current global configuration.
    global_config: GlobalConfigHandle,
    // Handle to cached project states and rate limits.
    project_cache: ProjectCacheHandle,
    cogs: Cogs,
    // Addresses of the services the processor publishes to.
    addrs: Addrs,
    // Redis-backed rate limiter, present only when Redis is configured.
    #[cfg(feature = "processing")]
    rate_limiter: Option<Arc<RedisRateLimiter>>,
    metric_outcomes: MetricOutcomes,
    // The per-signal processors, see [`Processing`].
    processing: Processing,
}
1040
/// The individual item processors, one per supported processing group.
///
/// All of them are constructed in [`EnvelopeProcessorService::new`] with a shared
/// quota rate limiter.
struct Processing {
    errors: ErrorsProcessor,
    logs: LogsProcessor,
    trace_metrics: TraceMetricsProcessor,
    spans: SpansProcessor,
    // Handles the older span format (`ProcessingGroup::Span`), as opposed to `spans`
    // which handles `ProcessingGroup::SpanV2`.
    legacy_spans: LegacySpansProcessor,
    check_ins: CheckInsProcessor,
    sessions: SessionsProcessor,
    transactions: TransactionProcessor,
    profile_chunks: ProfileChunksProcessor,
    trace_attachments: TraceAttachmentsProcessor,
    replays: ReplaysProcessor,
    client_reports: ClientReportsProcessor,
    attachments: AttachmentProcessor,
    user_reports: UserReportsProcessor,
    profiles: ProfilesProcessor,
}
1058
1059impl EnvelopeProcessorService {
    /// Creates a multi-threaded envelope processor.
    ///
    /// Wires up all per-signal processors with a shared quota rate limiter and,
    /// where required, a GeoIP lookup. With the `processing` feature, a Redis-backed
    /// rate limiter is created from the provided `redis` clients.
    #[cfg_attr(feature = "processing", expect(clippy::too_many_arguments))]
    pub fn new(
        pool: EnvelopeProcessorServicePool,
        config: Arc<Config>,
        global_config: GlobalConfigHandle,
        project_cache: ProjectCacheHandle,
        cogs: Cogs,
        #[cfg(feature = "processing")] redis: Option<RedisClients>,
        addrs: Addrs,
        metric_outcomes: MetricOutcomes,
    ) -> Self {
        // Open the configured GeoIP database. Failure to open is logged but not
        // fatal: processing continues with an empty lookup.
        let geoip_lookup = config
            .geoip_path()
            .and_then(
                |p| match GeoIpLookup::open(p).context(ServiceError::GeoIp) {
                    Ok(geoip) => Some(geoip),
                    Err(err) => {
                        relay_log::error!("failed to open GeoIP db {p:?}: {err:?}");
                        None
                    }
                },
            )
            .unwrap_or_else(GeoIpLookup::empty);

        // Redis-backed rate limiter, only created when Redis clients are configured.
        #[cfg(feature = "processing")]
        let rate_limiter = redis.as_ref().map(|redis| {
            RedisRateLimiter::new(redis.quotas.clone())
                .max_limit(config.max_rate_limit())
                .cache(config.quota_cache_ratio(), config.quota_cache_max())
        });

        // The quota limiter is shared by every item processor constructed below.
        let quota_limiter = Arc::new(QuotaRateLimiter::new(
            #[cfg(feature = "processing")]
            project_cache.clone(),
            #[cfg(feature = "processing")]
            rate_limiter.clone(),
        ));
        // Wrap into `Arc` only after the quota limiter took its own clone above.
        #[cfg(feature = "processing")]
        let rate_limiter = rate_limiter.map(Arc::new);
        let outcome_aggregator = addrs.outcome_aggregator.clone();
        let inner = InnerProcessor {
            pool,
            global_config,
            project_cache,
            cogs,
            #[cfg(feature = "processing")]
            rate_limiter,
            addrs,
            metric_outcomes,
            processing: Processing {
                errors: ErrorsProcessor::new(Arc::clone(&quota_limiter), geoip_lookup.clone()),
                logs: LogsProcessor::new(Arc::clone(&quota_limiter)),
                trace_metrics: TraceMetricsProcessor::new(Arc::clone(&quota_limiter)),
                spans: SpansProcessor::new(Arc::clone(&quota_limiter), geoip_lookup.clone()),
                legacy_spans: LegacySpansProcessor::new(
                    Arc::clone(&quota_limiter),
                    geoip_lookup.clone(),
                ),
                check_ins: CheckInsProcessor::new(Arc::clone(&quota_limiter)),
                sessions: SessionsProcessor::new(Arc::clone(&quota_limiter)),
                transactions: TransactionProcessor::new(
                    Arc::clone(&quota_limiter),
                    geoip_lookup.clone(),
                    #[cfg(feature = "processing")]
                    redis.map(|r| r.quotas),
                    #[cfg(not(feature = "processing"))]
                    None,
                ),
                profile_chunks: ProfileChunksProcessor::new(Arc::clone(&quota_limiter)),
                trace_attachments: TraceAttachmentsProcessor::new(Arc::clone(&quota_limiter)),
                replays: ReplaysProcessor::new(Arc::clone(&quota_limiter), geoip_lookup),
                client_reports: ClientReportsProcessor::new(outcome_aggregator),
                attachments: AttachmentProcessor::new(Arc::clone(&quota_limiter)),
                user_reports: UserReportsProcessor::new(Arc::clone(&quota_limiter)),
                profiles: ProfilesProcessor::new(quota_limiter),
            },
            config,
        };

        Self {
            inner: Arc::new(inner),
        }
    }
1144
    /// Processes an envelope of NEL (Network Error Logging) reports.
    ///
    /// The reports are first converted into log items and then run through the
    /// regular logs processor.
    async fn process_nel(
        &self,
        mut managed_envelope: ManagedEnvelope,
        ctx: processing::Context<'_>,
    ) -> Result<ProcessingResult, ProcessingError> {
        nel::convert_to_logs(&mut managed_envelope);
        self.process_with_processor(&self.inner.processing.logs, managed_envelope, ctx)
            .await
    }
1154
1155    async fn process_with_processor<P: processing::Processor>(
1156        &self,
1157        processor: &P,
1158        mut managed_envelope: ManagedEnvelope,
1159        ctx: processing::Context<'_>,
1160    ) -> Result<ProcessingResult, ProcessingError>
1161    where
1162        Outputs: From<P::Output>,
1163    {
1164        let Some(work) = processor.prepare_envelope(&mut managed_envelope) else {
1165            debug_assert!(
1166                false,
1167                "there must be work for the {} processor",
1168                std::any::type_name::<P>(),
1169            );
1170            return Err(ProcessingError::ProcessingGroupMismatch);
1171        };
1172
1173        managed_envelope.update();
1174        match managed_envelope.envelope().is_empty() {
1175            true => managed_envelope.accept(),
1176            false => managed_envelope.reject(Outcome::Invalid(DiscardReason::Internal)),
1177        }
1178
1179        processor
1180            .process(work, ctx)
1181            .await
1182            .map_err(|err| {
1183                relay_log::debug!(
1184                    error = &err as &dyn std::error::Error,
1185                    "processing pipeline failed"
1186                );
1187                ProcessingError::ProcessingFailure
1188            })
1189            .map(|o| o.map(Into::into))
1190            .map(ProcessingResult::Output)
1191    }
1192
1193    async fn process_envelope(
1194        &self,
1195        project_id: ProjectId,
1196        message: ProcessEnvelopeGrouped<'_>,
1197    ) -> Result<ProcessingResult, ProcessingError> {
1198        let ProcessEnvelopeGrouped {
1199            group,
1200            envelope: mut managed_envelope,
1201            ctx,
1202        } = message;
1203
1204        // Pre-process the envelope headers.
1205        if let Some(sampling_state) = ctx.sampling_project_info {
1206            // Both transactions and standalone span envelopes need a normalized DSC header
1207            // to make sampling rules based on the segment/transaction name work correctly.
1208            managed_envelope
1209                .envelope_mut()
1210                .parametrize_dsc_transaction(&sampling_state.config.tx_name_rules);
1211        }
1212
1213        // Set the event retention. Effectively, this value will only be available in processing
1214        // mode when the full project config is queried from the upstream.
1215        if let Some(retention) = ctx.project_info.config.event_retention {
1216            managed_envelope.envelope_mut().set_retention(retention);
1217        }
1218
1219        // Set the event retention. Effectively, this value will only be available in processing
1220        // mode when the full project config is queried from the upstream.
1221        if let Some(retention) = ctx.project_info.config.downsampled_event_retention {
1222            managed_envelope
1223                .envelope_mut()
1224                .set_downsampled_retention(retention);
1225        }
1226
1227        // Ensure the project ID is updated to the stored instance for this project cache. This can
1228        // differ in two cases:
1229        //  1. The envelope was sent to the legacy `/store/` endpoint without a project ID.
1230        //  2. The DSN was moved and the envelope sent to the old project ID.
1231        managed_envelope
1232            .envelope_mut()
1233            .meta_mut()
1234            .set_project_id(project_id);
1235
1236        relay_log::trace!("Processing {group} group", group = group.variant());
1237
1238        match group {
1239            ProcessingGroup::Error => {
1240                self.process_with_processor(&self.inner.processing.errors, managed_envelope, ctx)
1241                    .await
1242            }
1243            ProcessingGroup::Transaction => {
1244                self.process_with_processor(
1245                    &self.inner.processing.transactions,
1246                    managed_envelope,
1247                    ctx,
1248                )
1249                .await
1250            }
1251            ProcessingGroup::Session => {
1252                self.process_with_processor(&self.inner.processing.sessions, managed_envelope, ctx)
1253                    .await
1254            }
1255            ProcessingGroup::StandaloneAttachments => {
1256                self.process_with_processor(
1257                    &self.inner.processing.attachments,
1258                    managed_envelope,
1259                    ctx,
1260                )
1261                .await
1262            }
1263            ProcessingGroup::StandaloneUserReports => {
1264                self.process_with_processor(
1265                    &self.inner.processing.user_reports,
1266                    managed_envelope,
1267                    ctx,
1268                )
1269                .await
1270            }
1271            ProcessingGroup::StandaloneProfiles => {
1272                self.process_with_processor(&self.inner.processing.profiles, managed_envelope, ctx)
1273                    .await
1274            }
1275            ProcessingGroup::ClientReport => {
1276                self.process_with_processor(
1277                    &self.inner.processing.client_reports,
1278                    managed_envelope,
1279                    ctx,
1280                )
1281                .await
1282            }
1283            ProcessingGroup::Replay => {
1284                self.process_with_processor(&self.inner.processing.replays, managed_envelope, ctx)
1285                    .await
1286            }
1287            ProcessingGroup::CheckIn => {
1288                self.process_with_processor(&self.inner.processing.check_ins, managed_envelope, ctx)
1289                    .await
1290            }
1291            ProcessingGroup::Nel => self.process_nel(managed_envelope, ctx).await,
1292            ProcessingGroup::Log => {
1293                self.process_with_processor(&self.inner.processing.logs, managed_envelope, ctx)
1294                    .await
1295            }
1296            ProcessingGroup::TraceMetric => {
1297                self.process_with_processor(
1298                    &self.inner.processing.trace_metrics,
1299                    managed_envelope,
1300                    ctx,
1301                )
1302                .await
1303            }
1304            ProcessingGroup::SpanV2 => {
1305                self.process_with_processor(&self.inner.processing.spans, managed_envelope, ctx)
1306                    .await
1307            }
1308            ProcessingGroup::TraceAttachment => {
1309                self.process_with_processor(
1310                    &self.inner.processing.trace_attachments,
1311                    managed_envelope,
1312                    ctx,
1313                )
1314                .await
1315            }
1316            ProcessingGroup::Span => {
1317                self.process_with_processor(
1318                    &self.inner.processing.legacy_spans,
1319                    managed_envelope,
1320                    ctx,
1321                )
1322                .await
1323            }
1324            ProcessingGroup::ProfileChunk => {
1325                self.process_with_processor(
1326                    &self.inner.processing.profile_chunks,
1327                    managed_envelope,
1328                    ctx,
1329                )
1330                .await
1331            }
1332            // Currently is not used.
1333            ProcessingGroup::Metrics => {
1334                // In proxy mode we simply forward the metrics.
1335                // This group shouldn't be used outside of proxy mode.
1336                if self.inner.config.relay_mode() != RelayMode::Proxy {
1337                    relay_log::error!(
1338                        tags.project = %project_id,
1339                        items = ?managed_envelope.envelope().items().next().map(Item::ty),
1340                        "received metrics in the process_state"
1341                    );
1342                }
1343
1344                Ok(ProcessingResult::no_metrics(
1345                    managed_envelope.into_processed(),
1346                ))
1347            }
1348            // Fallback to the legacy process_state implementation for Ungrouped events.
1349            ProcessingGroup::Ungrouped => {
1350                relay_log::error!(
1351                    tags.project = %project_id,
1352                    items = ?managed_envelope.envelope().items().next().map(Item::ty),
1353                    "could not identify the processing group based on the envelope's items"
1354                );
1355
1356                Err(ProcessingError::NoProcessingGroup)
1357            }
1358            // Leave this group unchanged.
1359            //
1360            // This will later be forwarded to upstream.
1361            ProcessingGroup::ForwardUnknown => Ok(ProcessingResult::no_metrics(
1362                managed_envelope.into_processed(),
1363            )),
1364        }
1365    }
1366
1367    /// Processes the envelope and returns the processed envelope back.
1368    ///
1369    /// Returns `Some` if the envelope passed inbound filtering and rate limiting. Invalid items are
1370    /// removed from the envelope. Otherwise, if the envelope is empty or the entire envelope needs
1371    /// to be dropped, this is `None`.
1372    async fn process<'a>(
1373        &self,
1374        mut message: ProcessEnvelopeGrouped<'a>,
1375    ) -> Result<Option<Submit<'a>>, ProcessingError> {
1376        let ProcessEnvelopeGrouped {
1377            ref mut envelope,
1378            ctx,
1379            ..
1380        } = message;
1381
1382        // Prefer the project's project ID, and fall back to the stated project id from the
1383        // envelope. The project ID is available in all modes, other than in proxy mode, where
1384        // envelopes for unknown projects are forwarded blindly.
1385        //
1386        // Neither ID can be available in proxy mode on the /store/ endpoint. This is not supported,
1387        // since we cannot process an envelope without project ID, so drop it.
1388        let Some(project_id) = ctx
1389            .project_info
1390            .project_id
1391            .or_else(|| envelope.envelope().meta().project_id())
1392        else {
1393            envelope.reject(Outcome::Invalid(DiscardReason::Internal));
1394            return Err(ProcessingError::MissingProjectId);
1395        };
1396
1397        let client = envelope.envelope().meta().client().map(str::to_owned);
1398        let user_agent = envelope.envelope().meta().user_agent().map(str::to_owned);
1399        let project_key = envelope.envelope().meta().public_key();
1400        // Only allow sending to the sampling key, if we successfully loaded a sampling project
1401        // info relating to it. This filters out unknown/invalid project keys as well as project
1402        // keys from different organizations.
1403        let sampling_key = envelope
1404            .envelope()
1405            .sampling_key()
1406            .filter(|_| ctx.sampling_project_info.is_some());
1407
1408        // We set additional information on the scope, which will be removed after processing the
1409        // envelope.
1410        relay_log::configure_scope(|scope| {
1411            scope.set_tag("project", project_id);
1412            if let Some(client) = client {
1413                scope.set_tag("sdk", client);
1414            }
1415            if let Some(user_agent) = user_agent {
1416                scope.set_extra("user_agent", user_agent.into());
1417            }
1418        });
1419
1420        let result = match self.process_envelope(project_id, message).await {
1421            Ok(ProcessingResult::Envelope {
1422                mut managed_envelope,
1423                extracted_metrics,
1424            }) => {
1425                // The envelope could be modified or even emptied during processing, which
1426                // requires re-computation of the context.
1427                managed_envelope.update();
1428
1429                let has_metrics = !extracted_metrics.metrics.project_metrics.is_empty();
1430                send_metrics(
1431                    extracted_metrics.metrics,
1432                    project_key,
1433                    sampling_key,
1434                    &self.inner.addrs.aggregator,
1435                );
1436
1437                let envelope_response = if managed_envelope.envelope().is_empty() {
1438                    if !has_metrics {
1439                        // Individual rate limits have already been issued
1440                        managed_envelope.reject(Outcome::RateLimited(None));
1441                    } else {
1442                        managed_envelope.accept();
1443                    }
1444
1445                    None
1446                } else {
1447                    Some(managed_envelope)
1448                };
1449
1450                Ok(envelope_response.map(Submit::Envelope))
1451            }
1452            Ok(ProcessingResult::Output(Output { main, metrics })) => {
1453                if let Some(metrics) = metrics {
1454                    metrics.accept(|metrics| {
1455                        send_metrics(
1456                            metrics,
1457                            project_key,
1458                            sampling_key,
1459                            &self.inner.addrs.aggregator,
1460                        );
1461                    });
1462                }
1463
1464                let ctx = ctx.to_forward();
1465                Ok(main.map(|output| Submit::Output { output, ctx }))
1466            }
1467            Err(err) => Err(err),
1468        };
1469
1470        relay_log::configure_scope(|scope| {
1471            scope.remove_tag("project");
1472            scope.remove_tag("sdk");
1473            scope.remove_tag("user_agent");
1474        });
1475
1476        result
1477    }
1478
    /// Splits the incoming envelope by processing group and processes each part.
    ///
    /// Each group gets its own COGS measurement and is submitted upstream (or
    /// dropped with an outcome) independently; errors are logged but not propagated.
    async fn handle_process_envelope(&self, cogs: &mut Token, message: ProcessEnvelope) {
        let project_key = message.envelope.envelope().meta().public_key();
        let wait_time = message.envelope.age();
        metric!(timer(RelayTimers::EnvelopeWaitTime) = wait_time);

        // This COGS handling may need an overhaul in the future:
        // Cancel the passed in token, to start individual measurements per envelope instead.
        cogs.cancel();

        let scoping = message.envelope.scoping();
        for (group, envelope) in ProcessingGroup::split_envelope(
            *message.envelope.into_envelope(),
            &message.project_info,
        ) {
            // Per-group COGS measurement, replacing the cancelled outer token.
            let mut cogs = self
                .inner
                .cogs
                .timed(ResourceId::Relay, AppFeature::from(group));

            // Re-wrap the split-off envelope and restore the original scoping.
            let mut envelope =
                ManagedEnvelope::new(envelope, self.inner.addrs.outcome_aggregator.clone());
            envelope.scope(scoping);

            let global_config = self.inner.global_config.current();

            let ctx = processing::Context {
                config: &self.inner.config,
                global_config: &global_config,
                project_info: &message.project_info,
                sampling_project_info: message.sampling_project_info.as_deref(),
                rate_limits: &message.rate_limits,
                reservoir_counters: &message.reservoir_counters,
            };

            let message = ProcessEnvelopeGrouped {
                group,
                envelope,
                ctx,
            };

            let result = metric!(
                timer(RelayTimers::EnvelopeProcessingTime),
                group = group.variant(),
                { self.process(message).await }
            );

            match result {
                Ok(Some(envelope)) => self.submit_upstream(&mut cogs, envelope),
                Ok(None) => {}
                // Unexpected failures are surfaced at error level, expected ones at debug.
                Err(error) if error.is_unexpected() => {
                    relay_log::error!(
                        tags.project_key = %project_key,
                        error = &error as &dyn Error,
                        "error processing envelope"
                    )
                }
                Err(error) => {
                    relay_log::debug!(
                        tags.project_key = %project_key,
                        error = &error as &dyn Error,
                        "error processing envelope"
                    )
                }
            }
        }
    }
1545
1546    fn handle_process_metrics(&self, cogs: &mut Token, message: ProcessMetrics) {
1547        let ProcessMetrics {
1548            data,
1549            project_key,
1550            received_at,
1551            sent_at,
1552            source,
1553        } = message;
1554
1555        let received_timestamp =
1556            UnixTimestamp::from_datetime(received_at).unwrap_or(UnixTimestamp::now());
1557
1558        let mut buckets = data.into_buckets(received_timestamp);
1559        if buckets.is_empty() {
1560            return;
1561        };
1562        cogs.update(relay_metrics::cogs::BySize(&buckets));
1563
1564        let clock_drift_processor =
1565            ClockDriftProcessor::new(sent_at, received_at).at_least(MINIMUM_CLOCK_DRIFT);
1566
1567        buckets.retain_mut(|bucket| {
1568            if let Err(error) = relay_metrics::normalize_bucket(bucket) {
1569                relay_log::debug!(error = &error as &dyn Error, "dropping bucket {bucket:?}");
1570                return false;
1571            }
1572
1573            if !self::metrics::is_valid_namespace(bucket) {
1574                return false;
1575            }
1576
1577            clock_drift_processor.process_timestamp(&mut bucket.timestamp);
1578
1579            if !matches!(source, BucketSource::Internal) {
1580                bucket.metadata = BucketMetadata::new(received_timestamp);
1581            }
1582
1583            true
1584        });
1585
1586        let project = self.inner.project_cache.get(project_key);
1587
1588        // Best effort check to filter and rate limit buckets, if there is no project state
1589        // available at the current time, we will check again after flushing.
1590        let buckets = match project.state() {
1591            ProjectState::Enabled(project_info) => {
1592                let rate_limits = project.rate_limits().current_limits();
1593                self.check_buckets(project_key, project_info, &rate_limits, buckets)
1594            }
1595            _ => buckets,
1596        };
1597
1598        relay_log::trace!("merging metric buckets into the aggregator");
1599        self.inner
1600            .addrs
1601            .aggregator
1602            .send(MergeBuckets::new(project_key, buckets));
1603    }
1604
1605    fn handle_process_batched_metrics(&self, cogs: &mut Token, message: ProcessBatchedMetrics) {
1606        let ProcessBatchedMetrics {
1607            payload,
1608            source,
1609            received_at,
1610            sent_at,
1611        } = message;
1612
1613        #[derive(serde::Deserialize)]
1614        struct Wrapper {
1615            buckets: HashMap<ProjectKey, Vec<Bucket>>,
1616        }
1617
1618        let buckets = match serde_json::from_slice(&payload) {
1619            Ok(Wrapper { buckets }) => buckets,
1620            Err(error) => {
1621                relay_log::debug!(
1622                    error = &error as &dyn Error,
1623                    "failed to parse batched metrics",
1624                );
1625                metric!(counter(RelayCounters::MetricBucketsParsingFailed) += 1);
1626                return;
1627            }
1628        };
1629
1630        for (project_key, buckets) in buckets {
1631            self.handle_process_metrics(
1632                cogs,
1633                ProcessMetrics {
1634                    data: MetricData::Parsed(buckets),
1635                    project_key,
1636                    source,
1637                    received_at,
1638                    sent_at,
1639                },
1640            )
1641        }
1642    }
1643
    /// Submits a processing result to its next destination.
    ///
    /// In processing Relays with a configured store forwarder, the result is handed to the
    /// store (via [`StoreHandle`](crate::processing::StoreHandle)). Otherwise the result is
    /// serialized into an envelope and sent to the upstream HTTP endpoint; empty envelopes are
    /// accepted without sending.
    fn submit_upstream(&self, cogs: &mut Token, submit: Submit<'_>) {
        let _submit = cogs.start_category("submit");

        #[cfg(feature = "processing")]
        if self.inner.config.processing_enabled()
            && let Some(store_forwarder) = &self.inner.addrs.store_forwarder
        {
            use crate::processing::StoreHandle;

            let objectstore = self.inner.addrs.objectstore.as_ref();
            let global_config = &self.inner.global_config.current();
            let handle = StoreHandle::new(store_forwarder, objectstore, global_config);

            match submit {
                // Once check-ins and errors are fully moved to the new pipeline, this is only
                // used for metrics forwarding.
                //
                // Metrics forwarding will _never_ forward an envelope in processing, making
                // this branch here unused.
                Submit::Envelope(envelope) => handle.send_envelope(envelope.into_inner()),
                Submit::Output { output, ctx } => output
                    .forward_store(handle, ctx)
                    .unwrap_or_else(|err| err.into_inner()),
            }
            return;
        }

        // Non-processing path: everything is forwarded upstream as an envelope.
        let mut envelope = match submit {
            Submit::Envelope(envelope) => envelope,
            Submit::Output { output, ctx } => match output.serialize_envelope(ctx) {
                Ok(envelope) => ManagedEnvelope::from(envelope).into_processed(),
                Err(_) => {
                    relay_log::error!("failed to serialize output to an envelope");
                    return;
                }
            },
        };

        // Nothing left to send: count the envelope as accepted.
        if envelope.envelope_mut().is_empty() {
            envelope.accept();
            return;
        }

        // Override the `sent_at` timestamp. Since the envelope went through basic
        // normalization, all timestamps have been corrected. We propagate the new
        // `sent_at` to allow the next Relay to double-check this timestamp and
        // potentially apply correction again. This is done as close to sending as
        // possible so that we avoid internal delays.
        envelope.envelope_mut().set_sent_at(Utc::now());

        relay_log::trace!("sending envelope to sentry endpoint");
        let http_encoding = self.inner.config.http_encoding();
        // Serialize, then compress according to the configured HTTP encoding.
        let result = envelope.envelope().to_vec().and_then(|v| {
            encode_payload(&v.into(), http_encoding).map_err(EnvelopeError::PayloadIoFailed)
        });

        match result {
            Ok(body) => {
                self.inner
                    .addrs
                    .upstream_relay
                    .send(SendRequest(SendEnvelope {
                        envelope,
                        body,
                        http_encoding,
                        project_cache: self.inner.project_cache.clone(),
                    }));
            }
            Err(error) => {
                // Errors are only logged for what we consider an internal discard reason. These
                // indicate errors in the infrastructure or implementation bugs.
                relay_log::error!(
                    error = &error as &dyn Error,
                    tags.project_key = %envelope.scoping().project_key,
                    "failed to serialize envelope payload"
                );

                envelope.reject(Outcome::Invalid(DiscardReason::Internal));
            }
        }
    }
1725
1726    fn handle_submit_client_reports(&self, cogs: &mut Token, message: SubmitClientReports) {
1727        let SubmitClientReports {
1728            client_reports,
1729            scoping,
1730        } = message;
1731
1732        let upstream = self.inner.config.upstream();
1733        let dsn = PartialDsn::outbound(&scoping, upstream);
1734
1735        let mut envelope = Envelope::from_request(None, RequestMeta::outbound(dsn));
1736        for client_report in client_reports {
1737            match client_report.serialize() {
1738                Ok(payload) => {
1739                    let mut item = Item::new(ItemType::ClientReport);
1740                    item.set_payload(ContentType::Json, payload);
1741                    envelope.add_item(item);
1742                }
1743                Err(error) => {
1744                    relay_log::error!(
1745                        error = &error as &dyn std::error::Error,
1746                        "failed to serialize client report"
1747                    );
1748                }
1749            }
1750        }
1751
1752        let envelope = ManagedEnvelope::new(envelope, self.inner.addrs.outcome_aggregator.clone());
1753        self.submit_upstream(cogs, Submit::Envelope(envelope.into_processed()));
1754    }
1755
1756    fn check_buckets(
1757        &self,
1758        project_key: ProjectKey,
1759        project_info: &ProjectInfo,
1760        rate_limits: &RateLimits,
1761        buckets: Vec<Bucket>,
1762    ) -> Vec<Bucket> {
1763        let Some(scoping) = project_info.scoping(project_key) else {
1764            relay_log::error!(
1765                tags.project_key = project_key.as_str(),
1766                "there is no scoping: dropping {} buckets",
1767                buckets.len(),
1768            );
1769            return Vec::new();
1770        };
1771
1772        let mut buckets = self::metrics::apply_project_info(
1773            buckets,
1774            &self.inner.metric_outcomes,
1775            project_info,
1776            scoping,
1777        );
1778
1779        let namespaces: BTreeSet<MetricNamespace> = buckets
1780            .iter()
1781            .filter_map(|bucket| bucket.name.try_namespace())
1782            .collect();
1783
1784        for namespace in namespaces {
1785            let limits = rate_limits.check_with_quotas(
1786                project_info.get_quotas(),
1787                scoping.item(DataCategory::MetricBucket),
1788            );
1789
1790            if limits.is_limited() {
1791                let rejected;
1792                (buckets, rejected) = utils::split_off(buckets, |bucket| {
1793                    bucket.name.try_namespace() == Some(namespace)
1794                });
1795
1796                let reason_code = limits.longest().and_then(|limit| limit.reason_code.clone());
1797                self.inner.metric_outcomes.track(
1798                    scoping,
1799                    &rejected,
1800                    Outcome::RateLimited(reason_code),
1801                );
1802            }
1803        }
1804
1805        let quotas = project_info.config.quotas.clone();
1806        match MetricsLimiter::create(buckets, quotas, scoping) {
1807            Ok(mut bucket_limiter) => {
1808                bucket_limiter.enforce_limits(rate_limits, &self.inner.metric_outcomes);
1809                bucket_limiter.into_buckets()
1810            }
1811            Err(buckets) => buckets,
1812        }
1813    }
1814
    /// Applies Redis-backed rate limits to metric buckets, per namespace.
    ///
    /// Without a configured rate limiter, the buckets are returned unchanged. Namespaces that
    /// exceed their quota are split off, tracked as rate-limited outcomes, and the resulting
    /// limits are merged back into the project cache. Finally, remaining buckets are run
    /// through [`Self::apply_other_rate_limits`] for transaction/span based quotas.
    #[cfg(feature = "processing")]
    async fn rate_limit_buckets(
        &self,
        scoping: Scoping,
        project_info: &ProjectInfo,
        mut buckets: Vec<Bucket>,
    ) -> Vec<Bucket> {
        let Some(rate_limiter) = &self.inner.rate_limiter else {
            return buckets;
        };

        let global_config = self.inner.global_config.current();
        // Number of buckets per namespace; the quantity reported to the rate limiter.
        let namespaces = buckets
            .iter()
            .filter_map(|bucket| bucket.name.try_namespace())
            .counts();

        let quotas = CombinedQuotas::new(&global_config, project_info.get_quotas());

        for (namespace, quantity) in namespaces {
            let item_scoping = scoping.metric_bucket(namespace);

            let limits = match rate_limiter
                .is_rate_limited(quotas, item_scoping, quantity, false)
                .await
            {
                Ok(limits) => limits,
                Err(err) => {
                    // On Redis errors, stop limiting entirely rather than dropping data.
                    relay_log::error!(
                        error = &err as &dyn std::error::Error,
                        "failed to check redis rate limits"
                    );
                    break;
                }
            };

            if limits.is_limited() {
                let rejected;
                (buckets, rejected) = utils::split_off(buckets, |bucket| {
                    bucket.name.try_namespace() == Some(namespace)
                });

                let reason_code = limits.longest().and_then(|limit| limit.reason_code.clone());
                self.inner.metric_outcomes.track(
                    scoping,
                    &rejected,
                    Outcome::RateLimited(reason_code),
                );

                // Cache the limit so subsequent envelopes are limited without Redis.
                self.inner
                    .project_cache
                    .get(item_scoping.scoping.project_key)
                    .rate_limits()
                    .merge(limits);
            }
        }

        match MetricsLimiter::create(buckets, project_info.config.quotas.clone(), scoping) {
            Err(buckets) => buckets,
            Ok(bucket_limiter) => self.apply_other_rate_limits(bucket_limiter).await,
        }
    }
1877
1878    /// Check and apply rate limits to metrics buckets for transactions and spans.
1879    #[cfg(feature = "processing")]
1880    async fn apply_other_rate_limits(&self, mut bucket_limiter: MetricsLimiter) -> Vec<Bucket> {
1881        relay_log::trace!("handle_rate_limit_buckets");
1882
1883        let scoping = *bucket_limiter.scoping();
1884
1885        if let Some(rate_limiter) = self.inner.rate_limiter.as_ref() {
1886            let global_config = self.inner.global_config.current();
1887            let quotas = CombinedQuotas::new(&global_config, bucket_limiter.quotas());
1888
1889            // We set over_accept_once such that the limit is actually reached, which allows subsequent
1890            // calls with quantity=0 to be rate limited.
1891            let over_accept_once = true;
1892            let mut rate_limits = RateLimits::new();
1893
1894            let (category, count) = bucket_limiter.count();
1895
1896            let timer = Instant::now();
1897            let mut is_limited = false;
1898
1899            if let Some(count) = count {
1900                match rate_limiter
1901                    .is_rate_limited(quotas, scoping.item(category), count, over_accept_once)
1902                    .await
1903                {
1904                    Ok(limits) => {
1905                        is_limited = limits.is_limited();
1906                        rate_limits.merge(limits)
1907                    }
1908                    Err(e) => {
1909                        relay_log::error!(error = &e as &dyn Error, "rate limiting error")
1910                    }
1911                }
1912            }
1913
1914            relay_statsd::metric!(
1915                timer(RelayTimers::RateLimitBucketsDuration) = timer.elapsed(),
1916                category = category.name(),
1917                limited = if is_limited { "true" } else { "false" },
1918                count = match count {
1919                    None => "none",
1920                    Some(0) => "0",
1921                    Some(1) => "1",
1922                    Some(1..=10) => "10",
1923                    Some(1..=25) => "25",
1924                    Some(1..=50) => "50",
1925                    Some(51..=100) => "100",
1926                    Some(101..=500) => "500",
1927                    _ => "> 500",
1928                },
1929            );
1930
1931            if rate_limits.is_limited() {
1932                let was_enforced =
1933                    bucket_limiter.enforce_limits(&rate_limits, &self.inner.metric_outcomes);
1934
1935                if was_enforced {
1936                    // Update the rate limits in the project cache.
1937                    self.inner
1938                        .project_cache
1939                        .get(scoping.project_key)
1940                        .rate_limits()
1941                        .merge(rate_limits);
1942                }
1943            }
1944        }
1945
1946        bucket_limiter.into_buckets()
1947    }
1948
1949    /// Processes metric buckets and sends them to Kafka.
1950    ///
1951    /// This function runs the following steps:
1952    ///  - rate limiting
1953    ///  - submit to `StoreForwarder`
1954    #[cfg(feature = "processing")]
1955    async fn encode_metrics_processing(
1956        &self,
1957        message: FlushBuckets,
1958        store_forwarder: &Addr<Store>,
1959    ) {
1960        use crate::constants::DEFAULT_EVENT_RETENTION;
1961        use crate::services::store::StoreMetrics;
1962
1963        for ProjectBuckets {
1964            buckets,
1965            scoping,
1966            project_info,
1967            ..
1968        } in message.buckets.into_values()
1969        {
1970            let buckets = self
1971                .rate_limit_buckets(scoping, &project_info, buckets)
1972                .await;
1973
1974            if buckets.is_empty() {
1975                continue;
1976            }
1977
1978            let retention = project_info
1979                .config
1980                .event_retention
1981                .unwrap_or(DEFAULT_EVENT_RETENTION);
1982
1983            // The store forwarder takes care of bucket splitting internally, so we can submit the
1984            // entire list of buckets. There is no batching needed here.
1985            store_forwarder.send(StoreMetrics {
1986                buckets,
1987                scoping,
1988                retention,
1989            });
1990        }
1991    }
1992
1993    /// Serializes metric buckets to JSON and sends them to the upstream.
1994    ///
1995    /// This function runs the following steps:
1996    ///  - partitioning
1997    ///  - batching by configured size limit
1998    ///  - serialize to JSON and pack in an envelope
1999    ///  - submit the envelope to upstream or kafka depending on configuration
2000    ///
2001    /// Rate limiting runs only in processing Relays as it requires access to the central Redis instance.
2002    /// Cached rate limits are applied in the project cache already.
2003    fn encode_metrics_envelope(&self, cogs: &mut Token, message: FlushBuckets) {
2004        let FlushBuckets {
2005            partition_key,
2006            buckets,
2007        } = message;
2008
2009        let batch_size = self.inner.config.metrics_max_batch_size_bytes();
2010        let upstream = self.inner.config.upstream();
2011
2012        for ProjectBuckets {
2013            buckets, scoping, ..
2014        } in buckets.values()
2015        {
2016            let dsn = PartialDsn::outbound(scoping, upstream);
2017
2018            relay_statsd::metric!(
2019                distribution(RelayDistributions::PartitionKeys) = u64::from(partition_key)
2020            );
2021
2022            let mut num_batches = 0;
2023            for batch in BucketsView::from(buckets).by_size(batch_size) {
2024                let mut envelope = Envelope::from_request(None, RequestMeta::outbound(dsn.clone()));
2025
2026                let mut item = Item::new(ItemType::MetricBuckets);
2027                item.set_source_quantities(crate::metrics::extract_quantities(batch));
2028                item.set_payload(ContentType::Json, serde_json::to_vec(&buckets).unwrap());
2029                envelope.add_item(item);
2030
2031                let mut envelope =
2032                    ManagedEnvelope::new(envelope, self.inner.addrs.outcome_aggregator.clone());
2033                envelope
2034                    .set_partition_key(Some(partition_key))
2035                    .scope(*scoping);
2036
2037                relay_statsd::metric!(
2038                    distribution(RelayDistributions::BucketsPerBatch) = batch.len() as u64
2039                );
2040
2041                self.submit_upstream(cogs, Submit::Envelope(envelope.into_processed()));
2042                num_batches += 1;
2043            }
2044
2045            relay_statsd::metric!(
2046                distribution(RelayDistributions::BatchesPerPartition) = num_batches
2047            );
2048        }
2049    }
2050
2051    /// Creates a [`SendMetricsRequest`] and sends it to the upstream relay.
2052    fn send_global_partition(&self, partition_key: u32, partition: &mut Partition<'_>) {
2053        if partition.is_empty() {
2054            return;
2055        }
2056
2057        let (unencoded, project_info) = partition.take();
2058        let http_encoding = self.inner.config.http_encoding();
2059        let encoded = match encode_payload(&unencoded, http_encoding) {
2060            Ok(payload) => payload,
2061            Err(error) => {
2062                let error = &error as &dyn std::error::Error;
2063                relay_log::error!(error, "failed to encode metrics payload");
2064                return;
2065            }
2066        };
2067
2068        let request = SendMetricsRequest {
2069            partition_key: partition_key.to_string(),
2070            unencoded,
2071            encoded,
2072            project_info,
2073            http_encoding,
2074            metric_outcomes: self.inner.metric_outcomes.clone(),
2075        };
2076
2077        self.inner.addrs.upstream_relay.send(SendRequest(request));
2078    }
2079
2080    /// Serializes metric buckets to JSON and sends them to the upstream via the global endpoint.
2081    ///
2082    /// This function is similar to [`Self::encode_metrics_envelope`], but sends a global batched
2083    /// payload directly instead of per-project Envelopes.
2084    ///
2085    /// This function runs the following steps:
2086    ///  - partitioning
2087    ///  - batching by configured size limit
2088    ///  - serialize to JSON
2089    ///  - submit directly to the upstream
2090    fn encode_metrics_global(&self, message: FlushBuckets) {
2091        let FlushBuckets {
2092            partition_key,
2093            buckets,
2094        } = message;
2095
2096        let batch_size = self.inner.config.metrics_max_batch_size_bytes();
2097        let mut partition = Partition::new(batch_size);
2098        let mut partition_splits = 0;
2099
2100        for ProjectBuckets {
2101            buckets, scoping, ..
2102        } in buckets.values()
2103        {
2104            for bucket in buckets {
2105                let mut remaining = Some(BucketView::new(bucket));
2106
2107                while let Some(bucket) = remaining.take() {
2108                    if let Some(next) = partition.insert(bucket, *scoping) {
2109                        // A part of the bucket could not be inserted. Take the partition and submit
2110                        // it immediately. Repeat until the final part was inserted. This should
2111                        // always result in a request, otherwise we would enter an endless loop.
2112                        self.send_global_partition(partition_key, &mut partition);
2113                        remaining = Some(next);
2114                        partition_splits += 1;
2115                    }
2116                }
2117            }
2118        }
2119
2120        if partition_splits > 0 {
2121            metric!(distribution(RelayDistributions::PartitionSplits) = partition_splits);
2122        }
2123
2124        self.send_global_partition(partition_key, &mut partition);
2125    }
2126
2127    async fn handle_flush_buckets(&self, cogs: &mut Token, mut message: FlushBuckets) {
2128        for (project_key, pb) in message.buckets.iter_mut() {
2129            let buckets = std::mem::take(&mut pb.buckets);
2130            pb.buckets =
2131                self.check_buckets(*project_key, &pb.project_info, &pb.rate_limits, buckets);
2132        }
2133
2134        #[cfg(feature = "processing")]
2135        if self.inner.config.processing_enabled()
2136            && let Some(ref store_forwarder) = self.inner.addrs.store_forwarder
2137        {
2138            return self
2139                .encode_metrics_processing(message, store_forwarder)
2140                .await;
2141        }
2142
2143        if self.inner.config.http_global_metrics() {
2144            self.encode_metrics_global(message)
2145        } else {
2146            self.encode_metrics_envelope(cogs, message)
2147        }
2148    }
2149
    /// Returns `true` if a Redis rate limiter is configured.
    ///
    /// Test-only helper, compiled only with the `processing` feature.
    #[cfg(all(test, feature = "processing"))]
    fn redis_rate_limiter_enabled(&self) -> bool {
        self.inner.rate_limiter.is_some()
    }
2154
2155    async fn handle_message(&self, message: EnvelopeProcessor) {
2156        let ty = message.variant();
2157        let feature_weights = self.feature_weights(&message);
2158
2159        metric!(timer(RelayTimers::ProcessMessageDuration), message = ty, {
2160            let mut cogs = self.inner.cogs.timed(ResourceId::Relay, feature_weights);
2161
2162            match message {
2163                EnvelopeProcessor::ProcessEnvelope(m) => {
2164                    self.handle_process_envelope(&mut cogs, *m).await
2165                }
2166                EnvelopeProcessor::ProcessProjectMetrics(m) => {
2167                    self.handle_process_metrics(&mut cogs, *m)
2168                }
2169                EnvelopeProcessor::ProcessBatchedMetrics(m) => {
2170                    self.handle_process_batched_metrics(&mut cogs, *m)
2171                }
2172                EnvelopeProcessor::FlushBuckets(m) => {
2173                    self.handle_flush_buckets(&mut cogs, *m).await
2174                }
2175                EnvelopeProcessor::SubmitClientReports(m) => {
2176                    self.handle_submit_client_reports(&mut cogs, *m)
2177                }
2178            }
2179        });
2180    }
2181
2182    fn feature_weights(&self, message: &EnvelopeProcessor) -> FeatureWeights {
2183        match message {
2184            // Envelope is split later and tokens are attributed then.
2185            EnvelopeProcessor::ProcessEnvelope(_) => AppFeature::Unattributed.into(),
2186            EnvelopeProcessor::ProcessProjectMetrics(_) => AppFeature::Unattributed.into(),
2187            EnvelopeProcessor::ProcessBatchedMetrics(_) => AppFeature::Unattributed.into(),
2188            EnvelopeProcessor::FlushBuckets(v) => v
2189                .buckets
2190                .values()
2191                .map(|s| {
2192                    if self.inner.config.processing_enabled() {
2193                        // Processing does not encode the metrics but instead rate limit the metrics,
2194                        // which scales by count and not size.
2195                        relay_metrics::cogs::ByCount(&s.buckets).into()
2196                    } else {
2197                        relay_metrics::cogs::BySize(&s.buckets).into()
2198                    }
2199                })
2200                .fold(FeatureWeights::none(), FeatureWeights::merge),
2201            EnvelopeProcessor::SubmitClientReports(_) => AppFeature::ClientReports.into(),
2202        }
2203    }
2204}
2205
2206impl Service for EnvelopeProcessorService {
2207    type Interface = EnvelopeProcessor;
2208
2209    async fn run(self, mut rx: relay_system::Receiver<Self::Interface>) {
2210        while let Some(message) = rx.recv().await {
2211            let service = self.clone();
2212            self.inner
2213                .pool
2214                .spawn_async(
2215                    async move {
2216                        service.handle_message(message).await;
2217                    }
2218                    .boxed(),
2219                )
2220                .await;
2221        }
2222    }
2223}
2224
2225pub fn encode_payload(body: &Bytes, http_encoding: HttpEncoding) -> Result<Bytes, std::io::Error> {
2226    let envelope_body: Vec<u8> = match http_encoding {
2227        HttpEncoding::Identity => return Ok(body.clone()),
2228        HttpEncoding::Deflate => {
2229            let mut encoder = ZlibEncoder::new(Vec::new(), Compression::default());
2230            encoder.write_all(body.as_ref())?;
2231            encoder.finish()?
2232        }
2233        HttpEncoding::Gzip => {
2234            let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
2235            encoder.write_all(body.as_ref())?;
2236            encoder.finish()?
2237        }
2238        HttpEncoding::Br => {
2239            // Use default buffer size (via 0), medium quality (5), and the default lgwin (22).
2240            let mut encoder = BrotliEncoder::new(Vec::new(), 0, 5, 22);
2241            encoder.write_all(body.as_ref())?;
2242            encoder.into_inner()
2243        }
2244        HttpEncoding::Zstd => {
2245            // Use the fastest compression level, our main objective here is to get the best
2246            // compression ratio for least amount of time spent.
2247            let mut encoder = ZstdEncoder::new(Vec::new(), 1)?;
2248            encoder.write_all(body.as_ref())?;
2249            encoder.finish()?
2250        }
2251    };
2252
2253    Ok(envelope_body.into())
2254}
2255
/// An upstream request that submits an envelope via HTTP.
#[derive(Debug)]
pub struct SendEnvelope {
    /// The envelope being sent; provides scoping and records the final outcome.
    pub envelope: TypedEnvelope<Processed>,
    /// The serialized and already content-encoded envelope payload.
    pub body: Bytes,
    /// The content encoding that was applied to `body`.
    pub http_encoding: HttpEncoding,
    /// Handle to the project cache, used to merge rate limits returned by the upstream.
    pub project_cache: ProjectCacheHandle,
}
2264
2265impl UpstreamRequest for SendEnvelope {
2266    fn method(&self) -> reqwest::Method {
2267        reqwest::Method::POST
2268    }
2269
2270    fn path(&self) -> Cow<'_, str> {
2271        format!("/api/{}/envelope/", self.envelope.scoping().project_id).into()
2272    }
2273
2274    fn route(&self) -> &'static str {
2275        "envelope"
2276    }
2277
2278    fn build(&mut self, builder: &mut http::RequestBuilder) -> Result<(), http::HttpError> {
2279        let envelope_body = self.body.clone();
2280        metric!(
2281            distribution(RelayDistributions::UpstreamEnvelopeBodySize) = envelope_body.len() as u64
2282        );
2283
2284        let meta = &self.envelope.meta();
2285        let shard = self.envelope.partition_key().map(|p| p.to_string());
2286        builder
2287            .content_encoding(self.http_encoding)
2288            .header_opt("Origin", meta.origin().map(|url| url.as_str()))
2289            .header_opt("User-Agent", meta.user_agent())
2290            .header("X-Sentry-Auth", meta.auth_header())
2291            .header("X-Forwarded-For", meta.forwarded_for())
2292            .header("Content-Type", envelope::CONTENT_TYPE)
2293            .header_opt("X-Sentry-Relay-Shard", shard)
2294            .body(envelope_body);
2295
2296        Ok(())
2297    }
2298
2299    fn sign(&mut self) -> Option<Sign> {
2300        Some(Sign::Optional(SignatureType::RequestSign))
2301    }
2302
2303    fn respond(
2304        self: Box<Self>,
2305        result: Result<http::Response, UpstreamRequestError>,
2306    ) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> {
2307        Box::pin(async move {
2308            let result = match result {
2309                Ok(mut response) => response.consume().await.map_err(UpstreamRequestError::Http),
2310                Err(error) => Err(error),
2311            };
2312
2313            match result {
2314                Ok(()) => self.envelope.accept(),
2315                Err(error) if error.is_received() => {
2316                    let scoping = self.envelope.scoping();
2317                    self.envelope.accept();
2318
2319                    if let UpstreamRequestError::RateLimited(limits) = error {
2320                        self.project_cache
2321                            .get(scoping.project_key)
2322                            .rate_limits()
2323                            .merge(limits.scope(&scoping));
2324                    }
2325                }
2326                Err(error) => {
2327                    // Errors are only logged for what we consider an internal discard reason. These
2328                    // indicate errors in the infrastructure or implementation bugs.
2329                    let mut envelope = self.envelope;
2330                    envelope.reject(Outcome::Invalid(DiscardReason::Internal));
2331                    relay_log::error!(
2332                        error = &error as &dyn Error,
2333                        tags.project_key = %envelope.scoping().project_key,
2334                        "error sending envelope"
2335                    );
2336                }
2337            }
2338        })
2339    }
2340}
2341
/// A container for metric buckets from multiple projects.
///
/// This container is used to send metrics to the upstream in global batches as part of the
/// [`FlushBuckets`] message if the `http.global_metrics` option is enabled. The container monitors
/// the size of all metrics and allows to split them into multiple batches. See
/// [`insert`](Self::insert) for more information.
#[derive(Debug)]
struct Partition<'a> {
    /// Maximum payload size of a single partition in bytes.
    max_size: usize,
    /// Remaining byte budget before this partition is full.
    remaining: usize,
    /// Bucket views collected so far, grouped by project.
    views: HashMap<ProjectKey, Vec<BucketView<'a>>>,
    /// Scoping per project, captured on first insert.
    project_info: HashMap<ProjectKey, Scoping>,
}
2355
2356impl<'a> Partition<'a> {
2357    /// Creates a new partition with the given maximum size in bytes.
2358    pub fn new(size: usize) -> Self {
2359        Self {
2360            max_size: size,
2361            remaining: size,
2362            views: HashMap::new(),
2363            project_info: HashMap::new(),
2364        }
2365    }
2366
2367    /// Inserts a bucket into the partition, splitting it if necessary.
2368    ///
2369    /// This function attempts to add the bucket to this partition. If the bucket does not fit
2370    /// entirely into the partition given its maximum size, the remaining part of the bucket is
2371    /// returned from this function call.
2372    ///
2373    /// If this function returns `Some(_)`, the partition is full and should be submitted to the
2374    /// upstream immediately. Use [`Self::take`] to retrieve the contents of the
2375    /// partition. Afterwards, the caller is responsible to call this function again with the
2376    /// remaining bucket until it is fully inserted.
2377    pub fn insert(&mut self, bucket: BucketView<'a>, scoping: Scoping) -> Option<BucketView<'a>> {
2378        let (current, next) = bucket.split(self.remaining, Some(self.max_size));
2379
2380        if let Some(current) = current {
2381            self.remaining = self.remaining.saturating_sub(current.estimated_size());
2382            self.views
2383                .entry(scoping.project_key)
2384                .or_default()
2385                .push(current);
2386
2387            self.project_info
2388                .entry(scoping.project_key)
2389                .or_insert(scoping);
2390        }
2391
2392        next
2393    }
2394
2395    /// Returns `true` if the partition does not hold any data.
2396    fn is_empty(&self) -> bool {
2397        self.views.is_empty()
2398    }
2399
2400    /// Returns the serialized buckets for this partition.
2401    ///
2402    /// This empties the partition, so that it can be reused.
2403    fn take(&mut self) -> (Bytes, HashMap<ProjectKey, Scoping>) {
2404        #[derive(serde::Serialize)]
2405        struct Wrapper<'a> {
2406            buckets: &'a HashMap<ProjectKey, Vec<BucketView<'a>>>,
2407        }
2408
2409        let buckets = &self.views;
2410        let payload = serde_json::to_vec(&Wrapper { buckets }).unwrap().into();
2411
2412        let scopings = self.project_info.clone();
2413        self.project_info.clear();
2414
2415        self.views.clear();
2416        self.remaining = self.max_size;
2417
2418        (payload, scopings)
2419    }
2420}
2421
/// An upstream request that submits metric buckets via HTTP.
///
/// This request is not awaited. It automatically tracks outcomes if the request is not received.
#[derive(Debug)]
struct SendMetricsRequest {
    /// If the partition key is set, the request is marked with `X-Sentry-Relay-Shard`.
    partition_key: String,
    /// Serialized metric buckets without encoding applied, used for signing.
    ///
    /// Also re-parsed to reconstruct buckets when outcomes must be emitted for a
    /// failed transmission.
    unencoded: Bytes,
    /// Serialized metric buckets with the stated HTTP encoding applied.
    encoded: Bytes,
    /// Mapping of all contained project keys to their scoping and extraction mode.
    ///
    /// Used to track outcomes for transmission failures.
    project_info: HashMap<ProjectKey, Scoping>,
    /// Encoding (compression) of the payload.
    http_encoding: HttpEncoding,
    /// Metric outcomes instance to send outcomes on error.
    metric_outcomes: MetricOutcomes,
}
2442
2443impl SendMetricsRequest {
2444    fn create_error_outcomes(self) {
2445        #[derive(serde::Deserialize)]
2446        struct Wrapper {
2447            buckets: HashMap<ProjectKey, Vec<MinimalTrackableBucket>>,
2448        }
2449
2450        let buckets = match serde_json::from_slice(&self.unencoded) {
2451            Ok(Wrapper { buckets }) => buckets,
2452            Err(err) => {
2453                relay_log::error!(
2454                    error = &err as &dyn std::error::Error,
2455                    "failed to parse buckets from failed transmission"
2456                );
2457                return;
2458            }
2459        };
2460
2461        for (key, buckets) in buckets {
2462            let Some(&scoping) = self.project_info.get(&key) else {
2463                relay_log::error!("missing scoping for project key");
2464                continue;
2465            };
2466
2467            self.metric_outcomes.track(
2468                scoping,
2469                &buckets,
2470                Outcome::Invalid(DiscardReason::Internal),
2471            );
2472        }
2473    }
2474}
2475
2476impl UpstreamRequest for SendMetricsRequest {
2477    fn set_relay_id(&self) -> bool {
2478        true
2479    }
2480
2481    fn sign(&mut self) -> Option<Sign> {
2482        Some(Sign::Required(SignatureType::Body(self.unencoded.clone())))
2483    }
2484
2485    fn method(&self) -> reqwest::Method {
2486        reqwest::Method::POST
2487    }
2488
2489    fn path(&self) -> Cow<'_, str> {
2490        "/api/0/relays/metrics/".into()
2491    }
2492
2493    fn route(&self) -> &'static str {
2494        "global_metrics"
2495    }
2496
2497    fn build(&mut self, builder: &mut http::RequestBuilder) -> Result<(), http::HttpError> {
2498        metric!(
2499            distribution(RelayDistributions::UpstreamMetricsBodySize) = self.encoded.len() as u64
2500        );
2501
2502        builder
2503            .content_encoding(self.http_encoding)
2504            .header("X-Sentry-Relay-Shard", self.partition_key.as_bytes())
2505            .header(header::CONTENT_TYPE, b"application/json")
2506            .body(self.encoded.clone());
2507
2508        Ok(())
2509    }
2510
2511    fn respond(
2512        self: Box<Self>,
2513        result: Result<http::Response, UpstreamRequestError>,
2514    ) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> {
2515        Box::pin(async {
2516            match result {
2517                Ok(mut response) => {
2518                    response.consume().await.ok();
2519                }
2520                Err(error) => {
2521                    relay_log::error!(error = &error as &dyn Error, "Failed to send metrics batch");
2522
2523                    // If the request did not arrive at the upstream, we are responsible for outcomes.
2524                    // Otherwise, the upstream is responsible to log outcomes.
2525                    if error.is_received() {
2526                        return;
2527                    }
2528
2529                    self.create_error_outcomes()
2530                }
2531            }
2532        })
2533    }
2534}
2535
/// Container for global and project level [`Quota`].
#[derive(Copy, Clone, Debug)]
#[cfg(feature = "processing")]
struct CombinedQuotas<'a> {
    /// Quotas taken from the global configuration.
    global_quotas: &'a [Quota],
    /// Quotas from the project configuration.
    project_quotas: &'a [Quota],
}
2543
#[cfg(feature = "processing")]
impl<'a> CombinedQuotas<'a> {
    /// Builds a [`CombinedQuotas`] from the global config's quotas and the
    /// given project quotas.
    pub fn new(global_config: &'a GlobalConfig, project_quotas: &'a [Quota]) -> Self {
        let global_quotas = &global_config.quotas;
        Self {
            global_quotas,
            project_quotas,
        }
    }
}
2554
#[cfg(feature = "processing")]
impl<'a> IntoIterator for CombinedQuotas<'a> {
    type Item = &'a Quota;
    type IntoIter = std::iter::Chain<std::slice::Iter<'a, Quota>, std::slice::Iter<'a, Quota>>;

    /// Yields all global quotas first, followed by all project quotas.
    fn into_iter(self) -> Self::IntoIter {
        // `chain` takes any `IntoIterator`, so the second slice can be chained directly.
        self.global_quotas.iter().chain(self.project_quotas)
    }
}
2564
#[cfg(test)]
mod tests {
    use insta::assert_debug_snapshot;
    use relay_common::glob2::LazyGlob;
    use relay_dynamic_config::ProjectConfig;
    use relay_event_normalization::{
        NormalizationConfig, RedactionRule, TransactionNameConfig, TransactionNameRule,
    };
    use relay_event_schema::protocol::TransactionSource;
    use relay_pii::DataScrubbingConfig;
    use similar_asserts::assert_eq;

    use crate::testutils::{create_test_processor, create_test_processor_with_addrs};

    #[cfg(feature = "processing")]
    use {
        relay_metrics::BucketValue,
        relay_quotas::{QuotaScope, ReasonCode},
        relay_test::mock_service,
    };

    use super::*;

    /// Creates an organization-scoped metric-bucket quota with a limit of zero,
    /// i.e. one that rejects everything it applies to.
    #[cfg(feature = "processing")]
    fn mock_quota(id: &str) -> Quota {
        Quota {
            id: Some(id.into()),
            categories: [DataCategory::MetricBucket].into(),
            scope: QuotaScope::Organization,
            scope_id: None,
            limit: Some(0),
            window: None,
            reason_code: None,
            namespace: None,
        }
    }

    /// Global quotas must be iterated before project quotas in [`CombinedQuotas`].
    #[cfg(feature = "processing")]
    #[test]
    fn test_dynamic_quotas() {
        let global_config = relay_dynamic_config::GlobalConfig {
            quotas: vec![mock_quota("foo"), mock_quota("bar")],
            ..Default::default()
        };

        let project_quotas = vec![mock_quota("baz"), mock_quota("qux")];

        let dynamic_quotas = CombinedQuotas::new(&global_config, &project_quotas);

        let quota_ids = dynamic_quotas.into_iter().filter_map(|q| q.id.as_deref());
        assert!(quota_ids.eq(["foo", "bar", "baz", "qux"]));
    }

    /// Ensures that if we ratelimit one batch of buckets in [`FlushBuckets`] message, it won't
    /// also ratelimit the next batches in the same message automatically.
    #[cfg(feature = "processing")]
    #[tokio::test]
    async fn test_ratelimit_per_batch() {
        use relay_base_schema::organization::OrganizationId;
        use relay_protocol::FiniteF64;

        let rate_limited_org = Scoping {
            organization_id: OrganizationId::new(1),
            project_id: ProjectId::new(21),
            project_key: ProjectKey::parse("00000000000000000000000000000000").unwrap(),
            key_id: Some(17),
        };

        let not_rate_limited_org = Scoping {
            organization_id: OrganizationId::new(2),
            project_id: ProjectId::new(21),
            project_key: ProjectKey::parse("11111111111111111111111111111111").unwrap(),
            key_id: Some(17),
        };

        let message = {
            let project_info = {
                // Quota with limit 0 scoped to the first organization only.
                let quota = Quota {
                    id: Some("testing".into()),
                    categories: [DataCategory::MetricBucket].into(),
                    scope: relay_quotas::QuotaScope::Organization,
                    scope_id: Some(rate_limited_org.organization_id.to_string().into()),
                    limit: Some(0),
                    window: None,
                    reason_code: Some(ReasonCode::new("test")),
                    namespace: None,
                };

                let mut config = ProjectConfig::default();
                config.quotas.push(quota);

                Arc::new(ProjectInfo {
                    config,
                    ..Default::default()
                })
            };

            let project_metrics = |scoping| ProjectBuckets {
                buckets: vec![Bucket {
                    name: "d:spans/bar".into(),
                    value: BucketValue::Counter(FiniteF64::new(1.0).unwrap()),
                    timestamp: UnixTimestamp::now(),
                    tags: Default::default(),
                    width: 10,
                    metadata: BucketMetadata::default(),
                }],
                rate_limits: Default::default(),
                project_info: project_info.clone(),
                scoping,
            };

            let buckets = hashbrown::HashMap::from([
                (
                    rate_limited_org.project_key,
                    project_metrics(rate_limited_org),
                ),
                (
                    not_rate_limited_org.project_key,
                    project_metrics(not_rate_limited_org),
                ),
            ]);

            FlushBuckets {
                partition_key: 0,
                buckets,
            }
        };

        // ensure the order of the map while iterating is as expected.
        assert_eq!(message.buckets.keys().count(), 2);

        let config = {
            let config_json = serde_json::json!({
                "processing": {
                    "enabled": true,
                    "kafka_config": [],
                    "redis": {
                        "server": std::env::var("RELAY_REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_owned()),
                    }
                }
            });
            Config::from_json_value(config_json).unwrap()
        };

        let (store, handle) = {
            // Collect only the organization ids of metrics that reach the store.
            let f = |org_ids: &mut Vec<OrganizationId>, msg: Store| {
                let org_id = match msg {
                    Store::Metrics(x) => x.scoping.organization_id,
                    _ => panic!("received envelope when expecting only metrics"),
                };
                org_ids.push(org_id);
            };

            mock_service("store_forwarder", vec![], f)
        };

        let processor = create_test_processor(config).await;
        assert!(processor.redis_rate_limiter_enabled());

        processor.encode_metrics_processing(message, &store).await;

        drop(store);
        let orgs_not_ratelimited = handle.await.unwrap();

        assert_eq!(
            orgs_not_ratelimited,
            vec![not_rate_limited_org.organization_id]
        );
    }

    /// PII scrubbing must mask IP-like data in the `User-Agent` header while the
    /// browser context still receives the correct browser name and version.
    #[tokio::test]
    async fn test_browser_version_extraction_with_pii_like_data() {
        let processor = create_test_processor(Default::default()).await;
        let outcome_aggregator = Addr::dummy();
        let event_id = EventId::new();

        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
            .parse()
            .unwrap();

        let request_meta = RequestMeta::new(dsn);
        let mut envelope = Envelope::from_request(Some(event_id), request_meta);

        envelope.add_item({
                let mut item = Item::new(ItemType::Event);
                item.set_payload(
                    ContentType::Json,
                    r#"
                    {
                        "request": {
                            "headers": [
                                ["User-Agent", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36"]
                            ]
                        }
                    }
                "#,
                );
                item
            });

        let mut datascrubbing_settings = DataScrubbingConfig::default();
        // enable all the default scrubbing
        datascrubbing_settings.scrub_data = true;
        datascrubbing_settings.scrub_defaults = true;
        datascrubbing_settings.scrub_ip_addresses = true;

        // Make sure to mask any IP-like looking data
        let pii_config = serde_json::from_str(r#"{"applications": {"**": ["@ip:mask"]}}"#).unwrap();

        let config = ProjectConfig {
            datascrubbing_settings,
            pii_config: Some(pii_config),
            ..Default::default()
        };

        let project_info = ProjectInfo {
            config,
            ..Default::default()
        };

        let mut envelopes = ProcessingGroup::split_envelope(*envelope, &Default::default());
        assert_eq!(envelopes.len(), 1);

        let (group, envelope) = envelopes.pop().unwrap();
        let envelope = ManagedEnvelope::new(envelope, outcome_aggregator);

        let message = ProcessEnvelopeGrouped {
            group,
            envelope,
            ctx: processing::Context {
                project_info: &project_info,
                ..processing::Context::for_test()
            },
        };

        let Ok(Some(Submit::Output { output, ctx })) = processor.process(message).await else {
            panic!();
        };
        let new_envelope = output.serialize_envelope(ctx).unwrap().accept(|f| f);

        let event_item = new_envelope.items().last().unwrap();
        let annotated_event: Annotated<Event> =
            Annotated::from_json_bytes(&event_item.payload()).unwrap();
        let event = annotated_event.into_value().unwrap();
        let headers = event
            .request
            .into_value()
            .unwrap()
            .headers
            .into_value()
            .unwrap();

        // IP-like data must be masked
        assert_eq!(
            Some(
                "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/********* Safari/537.36"
            ),
            headers.get_header("User-Agent")
        );
        // But we still get correct browser and version number
        let contexts = event.contexts.into_value().unwrap();
        let browser = contexts.0.get("browser").unwrap();
        assert_eq!(
            r#"{"browser":"Chrome 103.0.0","name":"Chrome","version":"103.0.0","type":"browser"}"#,
            browser.to_json().unwrap()
        );
    }

    /// The dynamic sampling context set on the envelope must be materialized
    /// into the event's `_dsc` field during processing.
    #[tokio::test]
    #[cfg(feature = "processing")]
    async fn test_materialize_dsc() {
        use crate::services::projects::project::PublicKeyConfig;

        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
            .parse()
            .unwrap();
        let request_meta = RequestMeta::new(dsn);
        let mut envelope = Envelope::from_request(None, request_meta);

        let dsc = r#"{
            "trace_id": "00000000-0000-0000-0000-000000000000",
            "public_key": "e12d836b15bb49d7bbf99e64295d995b",
            "sample_rate": "0.2"
        }"#;
        envelope.set_dsc(serde_json::from_str(dsc).unwrap());

        let mut item = Item::new(ItemType::Event);
        item.set_payload(ContentType::Json, r#"{}"#);
        envelope.add_item(item);

        let outcome_aggregator = Addr::dummy();
        let managed_envelope = ManagedEnvelope::new(envelope, outcome_aggregator);

        let mut project_info = ProjectInfo::default();
        project_info.public_keys.push(PublicKeyConfig {
            public_key: ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(),
            numeric_id: Some(1),
        });

        let config = serde_json::json!({
            "processing": {
                "enabled": true,
                "kafka_config": [],
            }
        });

        let message = ProcessEnvelopeGrouped {
            group: ProcessingGroup::Error,
            envelope: managed_envelope,
            ctx: processing::Context {
                config: &Config::from_json_value(config.clone()).unwrap(),
                project_info: &project_info,
                sampling_project_info: Some(&project_info),
                ..processing::Context::for_test()
            },
        };

        let processor = create_test_processor(Config::from_json_value(config).unwrap()).await;
        let Ok(Some(Submit::Output { output, ctx })) = processor.process(message).await else {
            panic!();
        };
        let envelope = output.serialize_envelope(ctx).unwrap();
        let event = envelope
            .get_item_by(|item| item.ty() == &ItemType::Event)
            .unwrap();

        let event = Annotated::<Event>::from_json_bytes(&event.payload()).unwrap();
        insta::assert_debug_snapshot!(event.value().unwrap()._dsc, @r###"
        Object(
            {
                "environment": ~,
                "public_key": String(
                    "e12d836b15bb49d7bbf99e64295d995b",
                ),
                "release": ~,
                "replay_id": ~,
                "sample_rate": String(
                    "0.2",
                ),
                "trace_id": String(
                    "00000000000000000000000000000000",
                ),
                "transaction": ~,
            },
        )
        "###);
    }

    /// Runs transaction-name normalization over a fixed transaction event with
    /// the given name and source, returning the statsd metrics captured during
    /// normalization.
    fn capture_test_event(transaction_name: &str, source: TransactionSource) -> Vec<String> {
        let mut event = Annotated::<Event>::from_json(
            r#"
            {
                "type": "transaction",
                "transaction": "/foo/",
                "timestamp": 946684810.0,
                "start_timestamp": 946684800.0,
                "contexts": {
                    "trace": {
                        "trace_id": "4c79f60c11214eb38604f4ae0781bfb2",
                        "span_id": "fa90fdead5f74053",
                        "op": "http.server",
                        "type": "trace"
                    }
                },
                "transaction_info": {
                    "source": "url"
                }
            }
            "#,
        )
        .unwrap();
        let e = event.value_mut().as_mut().unwrap();
        e.transaction.set_value(Some(transaction_name.into()));

        e.transaction_info
            .value_mut()
            .as_mut()
            .unwrap()
            .source
            .set_value(Some(source));

        relay_statsd::with_capturing_test_client(|| {
            utils::log_transaction_name_metrics(&mut event, |event| {
                let config = NormalizationConfig {
                    transaction_name_config: TransactionNameConfig {
                        rules: &[TransactionNameRule {
                            pattern: LazyGlob::new("/foo/*/**".to_owned()),
                            expiry: DateTime::<Utc>::MAX_UTC,
                            redaction: RedactionRule::Replace {
                                substitution: "*".to_owned(),
                            },
                        }],
                    },
                    ..Default::default()
                };
                relay_event_normalization::normalize_event(event, &config)
            });
        })
    }

    /// A URL transaction name matching neither rule nor pattern reports `changes:none`.
    #[test]
    fn test_log_transaction_metrics_none() {
        let captures = capture_test_event("/nothing", TransactionSource::Url);
        insta::assert_debug_snapshot!(captures, @r###"
        [
            "event.transaction_name_changes:1|c|#source_in:url,changes:none,source_out:sanitized,is_404:false",
        ]
        "###);
    }

    /// A name redacted only by the configured rule reports `changes:rule`.
    #[test]
    fn test_log_transaction_metrics_rule() {
        let captures = capture_test_event("/foo/john/denver", TransactionSource::Url);
        insta::assert_debug_snapshot!(captures, @r###"
        [
            "event.transaction_name_changes:1|c|#source_in:url,changes:rule,source_out:sanitized,is_404:false",
        ]
        "###);
    }

    /// A name redacted only by pattern matching reports `changes:pattern`.
    #[test]
    fn test_log_transaction_metrics_pattern() {
        let captures = capture_test_event("/something/12345", TransactionSource::Url);
        insta::assert_debug_snapshot!(captures, @r###"
        [
            "event.transaction_name_changes:1|c|#source_in:url,changes:pattern,source_out:sanitized,is_404:false",
        ]
        "###);
    }

    /// A name redacted by both rule and pattern reports `changes:both`.
    #[test]
    fn test_log_transaction_metrics_both() {
        let captures = capture_test_event("/foo/john/12345", TransactionSource::Url);
        insta::assert_debug_snapshot!(captures, @r###"
        [
            "event.transaction_name_changes:1|c|#source_in:url,changes:both,source_out:sanitized,is_404:false",
        ]
        "###);
    }

    /// A `route` source is not sanitized, so no changes are reported.
    #[test]
    fn test_log_transaction_metrics_no_match() {
        let captures = capture_test_event("/foo/john/12345", TransactionSource::Route);
        insta::assert_debug_snapshot!(captures, @r###"
        [
            "event.transaction_name_changes:1|c|#source_in:route,changes:none,source_out:route,is_404:false",
        ]
        "###);
    }

    /// Bucket metadata must record `received_at` for external buckets and leave
    /// it unset for internal ones.
    #[tokio::test]
    async fn test_process_metrics_bucket_metadata() {
        let mut token = Cogs::noop().timed(ResourceId::Relay, AppFeature::Unattributed);
        let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
        let received_at = Utc::now();
        let config = Config::default();

        let (aggregator, mut aggregator_rx) = Addr::custom();
        let processor = create_test_processor_with_addrs(
            config,
            Addrs {
                aggregator,
                ..Default::default()
            },
        )
        .await;

        let mut item = Item::new(ItemType::Statsd);
        item.set_payload(ContentType::Text, "spans/foo:3182887624:4267882815|s");
        for (source, expected_received_at) in [
            (
                BucketSource::External,
                Some(UnixTimestamp::from_datetime(received_at).unwrap()),
            ),
            (BucketSource::Internal, None),
        ] {
            let message = ProcessMetrics {
                data: MetricData::Raw(vec![item.clone()]),
                project_key,
                source,
                received_at,
                sent_at: Some(Utc::now()),
            };
            processor.handle_process_metrics(&mut token, message);

            let Aggregator::MergeBuckets(merge_buckets) = aggregator_rx.recv().await.unwrap();
            let buckets = merge_buckets.buckets;
            assert_eq!(buckets.len(), 1);
            assert_eq!(buckets[0].metadata.received_at, expected_received_at);
        }
    }

    /// A batched metrics payload containing two project keys must be split into
    /// one merge-buckets message per project key.
    #[tokio::test]
    async fn test_process_batched_metrics() {
        let mut token = Cogs::noop().timed(ResourceId::Relay, AppFeature::Unattributed);
        let received_at = Utc::now();
        let config = Config::default();

        let (aggregator, mut aggregator_rx) = Addr::custom();
        let processor = create_test_processor_with_addrs(
            config,
            Addrs {
                aggregator,
                ..Default::default()
            },
        )
        .await;

        let payload = r#"{
    "buckets": {
        "11111111111111111111111111111111": [
            {
                "timestamp": 1615889440,
                "width": 0,
                "name": "d:custom/endpoint.response_time@millisecond",
                "type": "d",
                "value": [
                  68.0
                ],
                "tags": {
                  "route": "user_index"
                }
            }
        ],
        "22222222222222222222222222222222": [
            {
                "timestamp": 1615889440,
                "width": 0,
                "name": "d:custom/endpoint.cache_rate@none",
                "type": "d",
                "value": [
                  36.0
                ]
            }
        ]
    }
}
"#;
        let message = ProcessBatchedMetrics {
            payload: Bytes::from(payload),
            source: BucketSource::Internal,
            received_at,
            sent_at: Some(Utc::now()),
        };
        processor.handle_process_batched_metrics(&mut token, message);

        let Aggregator::MergeBuckets(mb1) = aggregator_rx.recv().await.unwrap();
        let Aggregator::MergeBuckets(mb2) = aggregator_rx.recv().await.unwrap();

        let mut messages = vec![mb1, mb2];
        messages.sort_by_key(|mb| mb.project_key);

        let actual = messages
            .into_iter()
            .map(|mb| (mb.project_key, mb.buckets))
            .collect::<Vec<_>>();

        assert_debug_snapshot!(actual, @r###"
        [
            (
                ProjectKey("11111111111111111111111111111111"),
                [
                    Bucket {
                        timestamp: UnixTimestamp(1615889440),
                        width: 0,
                        name: MetricName(
                            "d:custom/endpoint.response_time@millisecond",
                        ),
                        value: Distribution(
                            [
                                68.0,
                            ],
                        ),
                        tags: {
                            "route": "user_index",
                        },
                        metadata: BucketMetadata {
                            merges: 1,
                            received_at: None,
                            extracted_from_indexed: false,
                        },
                    },
                ],
            ),
            (
                ProjectKey("22222222222222222222222222222222"),
                [
                    Bucket {
                        timestamp: UnixTimestamp(1615889440),
                        width: 0,
                        name: MetricName(
                            "d:custom/endpoint.cache_rate@none",
                        ),
                        value: Distribution(
                            [
                                36.0,
                            ],
                        ),
                        tags: {},
                        metadata: BucketMetadata {
                            merges: 1,
                            received_at: None,
                            extracted_from_indexed: false,
                        },
                    },
                ],
            ),
        ]
        "###);
    }
}