relay_server/services/
processor.rs

1use std::borrow::Cow;
2use std::collections::{BTreeSet, HashMap};
3use std::error::Error;
4use std::fmt::{Debug, Display};
5use std::future::Future;
6use std::io::Write;
7use std::pin::Pin;
8use std::sync::Arc;
9use std::time::Duration;
10
11use anyhow::Context;
12use brotli::CompressorWriter as BrotliEncoder;
13use bytes::Bytes;
14use chrono::{DateTime, Utc};
15use flate2::Compression;
16use flate2::write::{GzEncoder, ZlibEncoder};
17use futures::FutureExt;
18use futures::future::BoxFuture;
19use relay_base_schema::project::{ProjectId, ProjectKey};
20use relay_cogs::{AppFeature, Cogs, FeatureWeights, ResourceId, Token};
21use relay_common::time::UnixTimestamp;
22use relay_config::{Config, HttpEncoding, RelayMode};
23use relay_dynamic_config::{Feature, GlobalConfig};
24use relay_event_normalization::{ClockDriftProcessor, GeoIpLookup};
25use relay_event_schema::processor::ProcessingAction;
26use relay_event_schema::protocol::{ClientReport, Event, EventId, NetworkReportError, SpanV2};
27use relay_filter::FilterStatKey;
28use relay_metrics::{Bucket, BucketMetadata, BucketView, BucketsView, MetricNamespace};
29use relay_protocol::Annotated;
30use relay_quotas::{DataCategory, Quota, RateLimits, Scoping};
31use relay_sampling::evaluation::{ReservoirCounters, SamplingDecision};
32use relay_statsd::metric;
33use relay_system::{Addr, FromMessage, NoResponse, Service};
34use reqwest::header;
35use smallvec::{SmallVec, smallvec};
36use zstd::stream::Encoder as ZstdEncoder;
37
38use crate::envelope::{self, ContentType, Envelope, EnvelopeError, Item, ItemContainer, ItemType};
39use crate::extractors::{PartialDsn, RequestMeta, RequestTrust};
40use crate::integrations::Integration;
41use crate::managed::{InvalidProcessingGroupType, ManagedEnvelope, TypedEnvelope};
42use crate::metrics::{MetricOutcomes, MetricsLimiter, MinimalTrackableBucket};
43use crate::metrics_extraction::ExtractedMetrics;
44use crate::processing::attachments::AttachmentProcessor;
45use crate::processing::check_ins::CheckInsProcessor;
46use crate::processing::client_reports::ClientReportsProcessor;
47use crate::processing::errors::{ErrorsProcessor, SwitchProcessingError};
48use crate::processing::logs::LogsProcessor;
49use crate::processing::profile_chunks::ProfileChunksProcessor;
50use crate::processing::profiles::ProfilesProcessor;
51use crate::processing::replays::ReplaysProcessor;
52use crate::processing::sessions::SessionsProcessor;
53use crate::processing::spans::SpansProcessor;
54use crate::processing::trace_attachments::TraceAttachmentsProcessor;
55use crate::processing::trace_metrics::TraceMetricsProcessor;
56use crate::processing::transactions::TransactionProcessor;
57use crate::processing::user_reports::UserReportsProcessor;
58use crate::processing::utils::event::event_category;
59use crate::processing::{Forward as _, Output, Outputs, QuotaRateLimiter};
60use crate::service::ServiceError;
61use crate::services::global_config::GlobalConfigHandle;
62use crate::services::metrics::{Aggregator, FlushBuckets, MergeBuckets, ProjectBuckets};
63use crate::services::outcome::{DiscardItemType, DiscardReason, Outcome, TrackOutcome};
64use crate::services::projects::cache::ProjectCacheHandle;
65use crate::services::projects::project::{ProjectInfo, ProjectState};
66use crate::services::upstream::{
67    SendRequest, Sign, SignatureType, UpstreamRelay, UpstreamRequest, UpstreamRequestError,
68};
69use crate::statsd::{RelayCounters, RelayDistributions, RelayTimers};
70use crate::utils::{self, CheckLimits, EnvelopeLimiter};
71use crate::{http, processing};
72use relay_threading::AsyncPool;
73#[cfg(feature = "processing")]
74use {
75    crate::services::objectstore::Objectstore,
76    crate::services::store::Store,
77    crate::utils::Enforcement,
78    itertools::Itertools,
79    relay_quotas::{RateLimitingError, RedisRateLimiter},
80    relay_redis::RedisClients,
81    std::time::Instant,
82    symbolic_unreal::{Unreal4Error, Unreal4ErrorKind},
83};
84
pub mod event;
mod metrics;
mod nel;
// Only compiled for Relays built with the `processing` feature.
#[cfg(feature = "processing")]
mod span;

// PlayStation support is restricted to internal (`sentry`) processing builds.
#[cfg(all(sentry, feature = "processing"))]
pub mod playstation;
93
/// Executes the given block only when compiled with the `processing` feature.
///
/// The block additionally runs only if the provided config has `processing_enabled` set.
/// The second form takes an `else` block, which runs when processing is disabled either
/// at compile time (feature off) or at runtime (config flag off).
macro_rules! if_processing {
    ($config:expr, $if_true:block) => {
        #[cfg(feature = "processing")] {
            if $config.processing_enabled() $if_true
        }
    };
    ($config:expr, $if_true:block else $if_false:block) => {
        {
            #[cfg(feature = "processing")] {
                if $config.processing_enabled() $if_true else $if_false
            }
            #[cfg(not(feature = "processing"))] {
                $if_false
            }
        }
    };
}
114
/// The minimum clock drift for correction to apply.
///
/// Drift below this threshold (55 minutes) is left uncorrected.
pub const MINIMUM_CLOCK_DRIFT: Duration = Duration::from_secs(55 * 60);
117
/// Error returned when a [`ProcessingGroup`] cannot be converted into the
/// requested group marker type.
#[derive(Debug)]
pub struct GroupTypeError;

impl Display for GroupTypeError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "failed to convert processing group into corresponding type")
    }
}

impl std::error::Error for GroupTypeError {}
128
/// Defines a marker type for a processing group and wires up conversions
/// between it and [`ProcessingGroup`].
///
/// `$ty` becomes a unit struct that converts into `ProcessingGroup::$variant`.
/// The optional trailing `$other` variants are additional [`ProcessingGroup`]
/// variants that also convert into `$ty` (e.g. `Nel` is handled by the log
/// group, see the invocations below).
macro_rules! processing_group {
    ($ty:ident, $variant:ident$(, $($other:ident),+)?) => {
        #[derive(Clone, Copy, Debug)]
        pub struct $ty;

        impl From<$ty> for ProcessingGroup {
            fn from(_: $ty) -> Self {
                ProcessingGroup::$variant
            }
        }

        impl TryFrom<ProcessingGroup> for $ty {
            type Error = GroupTypeError;

            fn try_from(value: ProcessingGroup) -> Result<Self, Self::Error> {
                if matches!(value, ProcessingGroup::$variant) {
                    return Ok($ty);
                }
                $($(
                    // Also accept the listed additional variants.
                    if matches!(value, ProcessingGroup::$other) {
                        return Ok($ty);
                    }
                )+)?
                return Err(GroupTypeError);
            }
        }
    };
}
157
processing_group!(TransactionGroup, Transaction);

processing_group!(ErrorGroup, Error);

processing_group!(SessionGroup, Session);
processing_group!(ClientReportGroup, ClientReport);
processing_group!(ReplayGroup, Replay);
processing_group!(CheckInGroup, CheckIn);
// NEL reports are processed as logs, so both variants map to `LogGroup`.
processing_group!(LogGroup, Log, Nel);
processing_group!(TraceMetricGroup, TraceMetric);
processing_group!(SpanGroup, Span);

processing_group!(ProfileChunkGroup, ProfileChunk);
processing_group!(MetricsGroup, Metrics);
processing_group!(ForwardUnknownGroup, ForwardUnknown);
processing_group!(Ungrouped, Ungrouped);
174
/// Processed group type marker.
///
/// Marks the envelopes which passed through the processing pipeline.
/// Used to parameterize [`TypedEnvelope`] for envelopes that already went
/// through processing.
#[derive(Clone, Copy, Debug)]
pub struct Processed;
180
/// Describes the groups of the processable items.
#[derive(Clone, Copy, Debug)]
pub enum ProcessingGroup {
    /// All the transaction related items.
    ///
    /// Includes transactions, related attachments, profiles.
    Transaction,
    /// All the items which require (have or create) events.
    ///
    /// This includes: errors, NEL, security reports, user reports, some of the
    /// attachments.
    Error,
    /// Session events.
    Session,
    /// Standalone attachments.
    ///
    /// Attachments that are sent without an item that creates an event in the same envelope.
    StandaloneAttachments,
    /// Standalone user reports.
    ///
    /// User reports that are sent without an item that creates an event in the same envelope.
    StandaloneUserReports,
    /// Standalone profiles.
    ///
    /// Profiles which had their transaction sampled.
    StandaloneProfiles,
    /// Outcomes.
    ClientReport,
    /// Replays and ReplayRecordings.
    Replay,
    /// Crons.
    CheckIn,
    /// NEL reports.
    Nel,
    /// Logs.
    Log,
    /// Trace metrics.
    TraceMetric,
    /// Spans.
    Span,
    /// Span V2 spans.
    SpanV2,
    /// Metrics.
    Metrics,
    /// ProfileChunk.
    ProfileChunk,
    /// V2 attachments without span / log association.
    TraceAttachment,
    /// Unknown item types will be forwarded upstream (to processing Relay), where we will
    /// decide what to do with them.
    ForwardUnknown,
    /// All the items in the envelope that could not be grouped.
    Ungrouped,
}
235
impl ProcessingGroup {
    /// Splits provided envelope into list of tuples of groups with associated envelopes.
    ///
    /// Items are moved out of `envelope` step by step via `take_items_by`, so
    /// the order of the extractions below is significant: an item is claimed
    /// by the first matching group and is no longer visible to later
    /// extractions.
    fn split_envelope(
        mut envelope: Envelope,
        project_info: &ProjectInfo,
    ) -> SmallVec<[(Self, Box<Envelope>); 3]> {
        // Each per-group envelope is rebuilt from a copy of the original headers.
        let headers = envelope.headers().clone();
        let mut grouped_envelopes = smallvec![];

        // Extract replays.
        let replay_items = envelope.take_items_by(|item| {
            matches!(
                item.ty(),
                &ItemType::ReplayEvent | &ItemType::ReplayRecording | &ItemType::ReplayVideo
            )
        });
        if !replay_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Replay,
                Envelope::from_parts(headers.clone(), replay_items),
            ))
        }

        // Keep all the sessions together in one envelope.
        let session_items = envelope
            .take_items_by(|item| matches!(item.ty(), &ItemType::Session | &ItemType::Sessions));
        if !session_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Session,
                Envelope::from_parts(headers.clone(), session_items),
            ))
        }

        // Extract span v2 items. Must run before the v1 span extraction below:
        // with the experimental feature enabled, v1 spans are claimed here and
        // routed through the v2 pipeline instead.
        let span_v2_items = envelope.take_items_by(|item| {
            let exp_feature = project_info.has_feature(Feature::SpanV2ExperimentalProcessing);

            ItemContainer::<SpanV2>::is_container(item)
                || matches!(item.integration(), Some(Integration::Spans(_)))
                // Process standalone spans (v1) via the v2 pipeline
                || (exp_feature && matches!(item.ty(), &ItemType::Span))
                || (exp_feature && item.is_span_attachment())
        });

        if !span_v2_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::SpanV2,
                Envelope::from_parts(headers.clone(), span_v2_items),
            ))
        }

        // Extract spans.
        let span_items = envelope.take_items_by(|item| matches!(item.ty(), &ItemType::Span));
        if !span_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Span,
                Envelope::from_parts(headers.clone(), span_items),
            ))
        }

        // Extract logs.
        let logs_items = envelope.take_items_by(|item| {
            matches!(item.ty(), &ItemType::Log)
                || matches!(item.integration(), Some(Integration::Logs(_)))
        });
        if !logs_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Log,
                Envelope::from_parts(headers.clone(), logs_items),
            ))
        }

        // Extract trace metrics.
        let trace_metric_items =
            envelope.take_items_by(|item| matches!(item.ty(), &ItemType::TraceMetric));
        if !trace_metric_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::TraceMetric,
                Envelope::from_parts(headers.clone(), trace_metric_items),
            ))
        }

        // NEL items are transformed into logs in their own processing step.
        let nel_items = envelope.take_items_by(|item| matches!(item.ty(), &ItemType::Nel));
        if !nel_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Nel,
                Envelope::from_parts(headers.clone(), nel_items),
            ))
        }

        // Extract all metric items.
        //
        // Note: Should only be relevant in proxy mode. In other modes we send metrics through
        // a separate pipeline.
        let metric_items = envelope.take_items_by(|i| i.ty().is_metrics());
        if !metric_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Metrics,
                Envelope::from_parts(headers.clone(), metric_items),
            ))
        }

        // Extract profile chunks.
        let profile_chunk_items =
            envelope.take_items_by(|item| matches!(item.ty(), &ItemType::ProfileChunk));
        if !profile_chunk_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::ProfileChunk,
                Envelope::from_parts(headers.clone(), profile_chunk_items),
            ))
        }

        // Extract trace-associated (V2) attachments.
        let trace_attachment_items = envelope.take_items_by(Item::is_trace_attachment);
        if !trace_attachment_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::TraceAttachment,
                Envelope::from_parts(headers.clone(), trace_attachment_items),
            ))
        }

        // Standalone groups only apply when no remaining item creates an event;
        // otherwise these items stay in the envelope and are grouped with the
        // event below.
        if !envelope.items().any(Item::creates_event) {
            // Extract the standalone attachments
            let standalone_attachments = envelope
                .take_items_by(|i| i.requires_event() && matches!(i.ty(), ItemType::Attachment));
            if !standalone_attachments.is_empty() {
                grouped_envelopes.push((
                    ProcessingGroup::StandaloneAttachments,
                    Envelope::from_parts(headers.clone(), standalone_attachments),
                ))
            }

            // Extract the standalone user reports
            let standalone_user_reports =
                envelope.take_items_by(|i| matches!(i.ty(), ItemType::UserReport));
            if !standalone_user_reports.is_empty() {
                grouped_envelopes.push((
                    ProcessingGroup::StandaloneUserReports,
                    Envelope::from_parts(headers.clone(), standalone_user_reports),
                ));
            }

            // Extract the standalone profiles
            let standalone_profiles =
                envelope.take_items_by(|i| matches!(i.ty(), ItemType::Profile));
            if !standalone_profiles.is_empty() {
                grouped_envelopes.push((
                    ProcessingGroup::StandaloneProfiles,
                    Envelope::from_parts(headers.clone(), standalone_profiles),
                ));
            }
        }

        // Make sure we create separate envelopes for each `RawSecurity` report.
        // Each one gets a fresh event id, since each report creates its own event.
        let security_reports_items = envelope
            .take_items_by(|i| matches!(i.ty(), &ItemType::RawSecurity))
            .into_iter()
            .map(|item| {
                let headers = headers.clone();
                let items: SmallVec<[Item; 3]> = smallvec![item.clone()];
                let mut envelope = Envelope::from_parts(headers, items);
                envelope.set_event_id(EventId::new());
                (ProcessingGroup::Error, envelope)
            });
        grouped_envelopes.extend(security_reports_items);

        // Extract all the items which require an event into separate envelope.
        let require_event_items = envelope.take_items_by(Item::requires_event);
        if !require_event_items.is_empty() {
            // A transaction or profile anywhere in the batch routes the whole
            // batch through the transaction group.
            let group = if require_event_items
                .iter()
                .any(|item| matches!(item.ty(), &ItemType::Transaction | &ItemType::Profile))
            {
                ProcessingGroup::Transaction
            } else {
                ProcessingGroup::Error
            };

            grouped_envelopes.push((
                group,
                Envelope::from_parts(headers.clone(), require_event_items),
            ))
        }

        // Get the rest of the envelopes, one per item.
        let envelopes = envelope.items_mut().map(|item| {
            let headers = headers.clone();
            let items: SmallVec<[Item; 3]> = smallvec![item.clone()];
            let envelope = Envelope::from_parts(headers, items);
            let item_type = item.ty();
            let group = if matches!(item_type, &ItemType::CheckIn) {
                ProcessingGroup::CheckIn
            } else if matches!(item.ty(), &ItemType::ClientReport) {
                ProcessingGroup::ClientReport
            } else if matches!(item_type, &ItemType::Unknown(_)) {
                ProcessingGroup::ForwardUnknown
            } else {
                // Cannot group this item type.
                ProcessingGroup::Ungrouped
            };

            (group, envelope)
        });
        grouped_envelopes.extend(envelopes);

        grouped_envelopes
    }

    /// Returns the name of the group.
    pub fn variant(&self) -> &'static str {
        match self {
            ProcessingGroup::Transaction => "transaction",
            ProcessingGroup::Error => "error",
            ProcessingGroup::Session => "session",
            ProcessingGroup::StandaloneAttachments => "standalone_attachment",
            ProcessingGroup::StandaloneUserReports => "standalone_user_reports",
            ProcessingGroup::StandaloneProfiles => "standalone_profiles",
            ProcessingGroup::ClientReport => "client_report",
            ProcessingGroup::Replay => "replay",
            ProcessingGroup::CheckIn => "check_in",
            ProcessingGroup::Log => "log",
            ProcessingGroup::TraceMetric => "trace_metric",
            ProcessingGroup::Nel => "nel",
            ProcessingGroup::Span => "span",
            ProcessingGroup::SpanV2 => "span_v2",
            ProcessingGroup::Metrics => "metrics",
            ProcessingGroup::ProfileChunk => "profile_chunk",
            ProcessingGroup::TraceAttachment => "trace_attachment",
            ProcessingGroup::ForwardUnknown => "forward_unknown",
            ProcessingGroup::Ungrouped => "ungrouped",
        }
    }
}
468
469impl From<ProcessingGroup> for AppFeature {
470    fn from(value: ProcessingGroup) -> Self {
471        match value {
472            ProcessingGroup::Transaction => AppFeature::Transactions,
473            ProcessingGroup::Error => AppFeature::Errors,
474            ProcessingGroup::Session => AppFeature::Sessions,
475            ProcessingGroup::StandaloneAttachments => AppFeature::UnattributedEnvelope,
476            ProcessingGroup::StandaloneUserReports => AppFeature::UserReports,
477            ProcessingGroup::StandaloneProfiles => AppFeature::Profiles,
478            ProcessingGroup::ClientReport => AppFeature::ClientReports,
479            ProcessingGroup::Replay => AppFeature::Replays,
480            ProcessingGroup::CheckIn => AppFeature::CheckIns,
481            ProcessingGroup::Log => AppFeature::Logs,
482            ProcessingGroup::TraceMetric => AppFeature::TraceMetrics,
483            ProcessingGroup::Nel => AppFeature::Logs,
484            ProcessingGroup::Span => AppFeature::Spans,
485            ProcessingGroup::SpanV2 => AppFeature::Spans,
486            ProcessingGroup::Metrics => AppFeature::UnattributedMetrics,
487            ProcessingGroup::ProfileChunk => AppFeature::Profiles,
488            ProcessingGroup::ForwardUnknown => AppFeature::UnattributedEnvelope,
489            ProcessingGroup::Ungrouped => AppFeature::UnattributedEnvelope,
490            ProcessingGroup::TraceAttachment => AppFeature::TraceAttachments,
491        }
492    }
493}
494
/// An error returned when handling [`ProcessEnvelope`].
///
/// See [`to_outcome`](Self::to_outcome) for the outcome, if any, that each
/// variant produces.
#[derive(Debug, thiserror::Error)]
pub enum ProcessingError {
    #[error("invalid json in event")]
    InvalidJson(#[source] serde_json::Error),

    #[error("invalid message pack event payload")]
    InvalidMsgpack(#[from] rmp_serde::decode::Error),

    #[cfg(feature = "processing")]
    #[error("invalid unreal crash report")]
    InvalidUnrealReport(#[source] Unreal4Error),

    #[error("event payload too large")]
    PayloadTooLarge(DiscardItemType),

    #[error("invalid transaction event")]
    InvalidTransaction,

    #[error("the item is not allowed/supported in this envelope")]
    UnsupportedItem,

    #[error("envelope processor failed")]
    ProcessingFailed(#[from] ProcessingAction),

    #[error("duplicate {0} in event")]
    DuplicateItem(ItemType),

    #[error("failed to extract event payload")]
    NoEventPayload,

    #[error("missing project id in DSN")]
    MissingProjectId,

    #[error("invalid security report type: {0:?}")]
    InvalidSecurityType(Bytes),

    #[error("unsupported security report type")]
    UnsupportedSecurityType,

    #[error("invalid security report")]
    InvalidSecurityReport(#[source] serde_json::Error),

    #[error("invalid nel report")]
    InvalidNelReport(#[source] NetworkReportError),

    #[error("event filtered with reason: {0:?}")]
    EventFiltered(FilterStatKey),

    #[error("could not serialize event payload")]
    SerializeFailed(#[source] serde_json::Error),

    #[cfg(feature = "processing")]
    #[error("failed to apply quotas")]
    QuotasFailed(#[from] RateLimitingError),

    #[error("invalid processing group type")]
    InvalidProcessingGroup(Box<InvalidProcessingGroupType>),

    #[error("nintendo switch dying message processing failed {0:?}")]
    InvalidNintendoDyingMessage(#[source] SwitchProcessingError),

    #[cfg(all(sentry, feature = "processing"))]
    #[error("playstation dump processing failed: {0}")]
    InvalidPlaystationDump(String),

    /// An item was routed to a processor for a different processing group.
    #[error("processing group does not match specific processor")]
    ProcessingGroupMismatch,

    /// Failure in the new processing pipeline; outcomes are emitted there
    /// already, so [`Self::to_outcome`] maps this to `None`.
    #[error("new processing pipeline failed")]
    ProcessingFailure,

    #[cfg(feature = "processing")]
    #[error("invalid attachment reference")]
    InvalidAttachmentRef,

    #[error("could not determine processing group for envelope items")]
    NoProcessingGroup,
}
573
impl ProcessingError {
    /// Returns the [`Outcome`] to emit for this error, if any.
    ///
    /// `None` means no outcome should be emitted for this error — either none
    /// applies, or (for [`Self::ProcessingFailure`]) outcomes were already
    /// emitted elsewhere.
    pub fn to_outcome(&self) -> Option<Outcome> {
        match self {
            Self::PayloadTooLarge(payload_type) => {
                Some(Outcome::Invalid(DiscardReason::TooLarge(*payload_type)))
            }
            Self::InvalidJson(_) => Some(Outcome::Invalid(DiscardReason::InvalidJson)),
            Self::InvalidMsgpack(_) => Some(Outcome::Invalid(DiscardReason::InvalidMsgpack)),
            Self::InvalidSecurityType(_) => {
                Some(Outcome::Invalid(DiscardReason::SecurityReportType))
            }
            Self::UnsupportedItem => Some(Outcome::Invalid(DiscardReason::InvalidEnvelope)),
            Self::InvalidSecurityReport(_) => Some(Outcome::Invalid(DiscardReason::SecurityReport)),
            Self::UnsupportedSecurityType => Some(Outcome::Filtered(FilterStatKey::InvalidCsp)),
            Self::InvalidNelReport(_) => Some(Outcome::Invalid(DiscardReason::InvalidJson)),
            Self::InvalidTransaction => Some(Outcome::Invalid(DiscardReason::InvalidTransaction)),
            Self::DuplicateItem(_) => Some(Outcome::Invalid(DiscardReason::DuplicateItem)),
            Self::NoEventPayload => Some(Outcome::Invalid(DiscardReason::NoEventPayload)),
            Self::InvalidNintendoDyingMessage(_) => Some(Outcome::Invalid(DiscardReason::Payload)),
            #[cfg(all(sentry, feature = "processing"))]
            Self::InvalidPlaystationDump(_) => Some(Outcome::Invalid(DiscardReason::Payload)),
            // The guarded arm must stay before the catch-all unreal arm below.
            #[cfg(feature = "processing")]
            Self::InvalidUnrealReport(err) if err.kind() == Unreal4ErrorKind::BadCompression => {
                Some(Outcome::Invalid(DiscardReason::InvalidCompression))
            }
            #[cfg(feature = "processing")]
            Self::InvalidUnrealReport(_) => Some(Outcome::Invalid(DiscardReason::ProcessUnreal)),
            Self::SerializeFailed(_) | Self::ProcessingFailed(_) => {
                Some(Outcome::Invalid(DiscardReason::Internal))
            }
            #[cfg(feature = "processing")]
            Self::QuotasFailed(_) => Some(Outcome::Invalid(DiscardReason::Internal)),
            Self::MissingProjectId => None,
            Self::EventFiltered(key) => Some(Outcome::Filtered(key.clone())),
            Self::InvalidProcessingGroup(_) => None,

            Self::ProcessingGroupMismatch => Some(Outcome::Invalid(DiscardReason::Internal)),
            // Outcomes are emitted in the new processing pipeline already.
            Self::ProcessingFailure => None,
            #[cfg(feature = "processing")]
            Self::InvalidAttachmentRef => {
                Some(Outcome::Invalid(DiscardReason::InvalidAttachmentRef))
            }
            Self::NoProcessingGroup => Some(Outcome::Invalid(DiscardReason::Internal)),
        }
    }

    /// Returns `true` if the outcome mapped from this error is unexpected.
    fn is_unexpected(&self) -> bool {
        self.to_outcome()
            .is_some_and(|outcome| outcome.is_unexpected())
    }
}
626
#[cfg(feature = "processing")]
impl From<Unreal4Error> for ProcessingError {
    /// Oversized unreal reports are classified as too-large payloads; every
    /// other kind is treated as an invalid unreal report.
    fn from(err: Unreal4Error) -> Self {
        if matches!(err.kind(), Unreal4ErrorKind::TooLarge) {
            Self::PayloadTooLarge(ItemType::UnrealReport.into())
        } else {
            Self::InvalidUnrealReport(err)
        }
    }
}
636
637impl From<InvalidProcessingGroupType> for ProcessingError {
638    fn from(value: InvalidProcessingGroupType) -> Self {
639        Self::InvalidProcessingGroup(Box::new(value))
640    }
641}
642
/// An event extracted from an envelope, paired with a `usize`.
///
/// NOTE(review): the `usize` is presumably the size in bytes of the payload
/// the event was parsed from — confirm at the call sites.
type ExtractedEvent = (Annotated<Event>, usize);
644
/// A container for extracted metrics during processing.
///
/// The container enforces that the extracted metrics are correctly tagged
/// with the dynamic sampling decision.
#[derive(Debug)]
pub struct ProcessingExtractedMetrics {
    // Holds both the project-scoped and the sampling-scoped metric buckets.
    metrics: ExtractedMetrics,
}
653
654impl ProcessingExtractedMetrics {
655    pub fn new() -> Self {
656        Self {
657            metrics: ExtractedMetrics::default(),
658        }
659    }
660
661    pub fn into_inner(self) -> ExtractedMetrics {
662        self.metrics
663    }
664
665    /// Extends the contained metrics with [`ExtractedMetrics`].
666    pub fn extend(
667        &mut self,
668        extracted: ExtractedMetrics,
669        sampling_decision: Option<SamplingDecision>,
670    ) {
671        self.extend_project_metrics(extracted.project_metrics, sampling_decision);
672        self.extend_sampling_metrics(extracted.sampling_metrics, sampling_decision);
673    }
674
675    /// Extends the contained project metrics.
676    pub fn extend_project_metrics<I>(
677        &mut self,
678        buckets: I,
679        sampling_decision: Option<SamplingDecision>,
680    ) where
681        I: IntoIterator<Item = Bucket>,
682    {
683        self.metrics
684            .project_metrics
685            .extend(buckets.into_iter().map(|mut bucket| {
686                bucket.metadata.extracted_from_indexed =
687                    sampling_decision == Some(SamplingDecision::Keep);
688                bucket
689            }));
690    }
691
692    /// Extends the contained sampling metrics.
693    pub fn extend_sampling_metrics<I>(
694        &mut self,
695        buckets: I,
696        sampling_decision: Option<SamplingDecision>,
697    ) where
698        I: IntoIterator<Item = Bucket>,
699    {
700        self.metrics
701            .sampling_metrics
702            .extend(buckets.into_iter().map(|mut bucket| {
703                bucket.metadata.extracted_from_indexed =
704                    sampling_decision == Some(SamplingDecision::Keep);
705                bucket
706            }));
707    }
708
709    /// Applies rate limits to the contained metrics.
710    ///
711    /// This is used to apply rate limits which have been enforced on sampled items of an envelope
712    /// to also consistently apply to the metrics extracted from these items.
713    #[cfg(feature = "processing")]
714    fn apply_enforcement(&mut self, enforcement: &Enforcement, enforced_consistently: bool) {
715        // Metric namespaces which need to be dropped.
716        let mut drop_namespaces: SmallVec<[_; 2]> = smallvec![];
717        // Metrics belonging to this metric namespace need to have the `extracted_from_indexed`
718        // flag reset to `false`.
719        let mut reset_extracted_from_indexed: SmallVec<[_; 2]> = smallvec![];
720
721        for (namespace, limit, indexed) in [(
722            MetricNamespace::Spans,
723            &enforcement.spans,
724            &enforcement.spans_indexed,
725        )] {
726            if limit.is_active() {
727                drop_namespaces.push(namespace);
728            } else if indexed.is_active() && !enforced_consistently {
729                // If the enforcement was not computed by consistently checking the limits,
730                // the quota for the metrics has not yet been incremented.
731                // In this case we have a dropped indexed payload but a metric which still needs to
732                // be accounted for, make sure the metric will still be rate limited.
733                reset_extracted_from_indexed.push(namespace);
734            }
735        }
736
737        if !drop_namespaces.is_empty() || !reset_extracted_from_indexed.is_empty() {
738            self.retain_mut(|bucket| {
739                let Some(namespace) = bucket.name.try_namespace() else {
740                    return true;
741                };
742
743                if drop_namespaces.contains(&namespace) {
744                    return false;
745                }
746
747                if reset_extracted_from_indexed.contains(&namespace) {
748                    bucket.metadata.extracted_from_indexed = false;
749                }
750
751                true
752            });
753        }
754    }
755
756    #[cfg(feature = "processing")]
757    fn retain_mut(&mut self, mut f: impl FnMut(&mut Bucket) -> bool) {
758        self.metrics.project_metrics.retain_mut(&mut f);
759        self.metrics.sampling_metrics.retain_mut(&mut f);
760    }
761}
762
763fn send_metrics(
764    metrics: ExtractedMetrics,
765    project_key: ProjectKey,
766    sampling_key: Option<ProjectKey>,
767    aggregator: &Addr<Aggregator>,
768) {
769    let ExtractedMetrics {
770        project_metrics,
771        sampling_metrics,
772    } = metrics;
773
774    if !project_metrics.is_empty() {
775        aggregator.send(MergeBuckets {
776            project_key,
777            buckets: project_metrics,
778        });
779    }
780
781    if !sampling_metrics.is_empty() {
782        // If no sampling project state is available, we associate the sampling
783        // metrics with the current project.
784        //
785        // project_without_tracing         -> metrics goes to self
786        // dependent_project_with_tracing  -> metrics goes to root
787        // root_project_with_tracing       -> metrics goes to root == self
788        let sampling_project_key = sampling_key.unwrap_or(project_key);
789        aggregator.send(MergeBuckets {
790            project_key: sampling_project_key,
791            buckets: sampling_metrics,
792        });
793    }
794}
795
/// The result of the envelope processing containing the processed envelope along with the partial
/// result.
#[derive(Debug)]
#[expect(
    clippy::large_enum_variant,
    reason = "until we have a better solution to combat the excessive growth of Item, see #4819"
)]
enum ProcessingResult {
    /// A processed envelope, together with the metrics extracted from it during processing.
    Envelope {
        managed_envelope: TypedEnvelope<Processed>,
        extracted_metrics: ProcessingExtractedMetrics,
    },
    /// The output of a [`processing::Processor`].
    Output(Output<Outputs>),
}
810
811impl ProcessingResult {
812    /// Creates a [`ProcessingResult`] with no metrics.
813    fn no_metrics(managed_envelope: TypedEnvelope<Processed>) -> Self {
814        Self::Envelope {
815            managed_envelope,
816            extracted_metrics: ProcessingExtractedMetrics::new(),
817        }
818    }
819}
820
/// All items which can be submitted upstream.
#[derive(Debug)]
#[expect(
    clippy::large_enum_variant,
    reason = "until we have a better solution to combat the excessive growth of Item, see #4819"
)]
enum Submit<'a> {
    /// A processed envelope.
    Envelope(TypedEnvelope<Processed>),
    /// The output of a [`processing::Processor`].
    Output {
        /// The produced output items.
        output: Outputs,
        /// Context required to forward the output upstream.
        ctx: processing::ForwardContext<'a>,
    },
}
836
/// Applies processing to all contents of the given envelope.
///
/// Depending on the contents of the envelope and Relay's mode, this includes:
///
///  - Basic normalization and validation for all item types.
///  - Clock drift correction if the required `sent_at` header is present.
///  - Expansion of certain item types (e.g. unreal).
///  - Store normalization for event payloads in processing mode.
///  - Rate limiters and inbound filters on events in processing mode.
///
/// This is the primary message handled by the [`EnvelopeProcessorService`].
#[derive(Debug)]
pub struct ProcessEnvelope {
    /// Envelope to process.
    pub envelope: ManagedEnvelope,
    /// The project info.
    pub project_info: Arc<ProjectInfo>,
    /// Currently active cached rate limits for this project.
    pub rate_limits: Arc<RateLimits>,
    /// Root sampling project info.
    pub sampling_project_info: Option<Arc<ProjectInfo>>,
    /// Sampling reservoir counters.
    pub reservoir_counters: ReservoirCounters,
}
859
/// Like a [`ProcessEnvelope`], but with an envelope which has been grouped.
///
/// The lifetime `'a` ties the borrowed processing context to the current
/// processing run.
#[derive(Debug)]
struct ProcessEnvelopeGrouped<'a> {
    /// The group the envelope belongs to.
    pub group: ProcessingGroup,
    /// Envelope to process.
    pub envelope: ManagedEnvelope,
    /// The processing context.
    pub ctx: processing::Context<'a>,
}
870
/// Parses a list of metrics or metric buckets and pushes them to the project's aggregator.
///
/// This parses and validates the metrics:
///  - For [`Metrics`](ItemType::Statsd), each metric is parsed separately, and invalid metrics are
///    ignored independently.
///  - For [`MetricBuckets`](ItemType::MetricBuckets), the entire list of buckets is parsed and
///    dropped together on parsing failure.
///  - Other envelope items will be ignored with an error message.
///
/// Additionally, processing applies clock drift correction using the system clock of this Relay, if
/// the Envelope specifies the [`sent_at`](Envelope::sent_at) header.
#[derive(Debug)]
pub struct ProcessMetrics {
    /// A list of metric items.
    pub data: MetricData,
    /// The target project.
    pub project_key: ProjectKey,
    /// Whether to keep or reset the metric metadata, see [`BucketSource`].
    pub source: BucketSource,
    /// The wall clock time at which the request was received.
    pub received_at: DateTime<Utc>,
    /// The value of the Envelope's [`sent_at`](Envelope::sent_at) header for clock drift
    /// correction.
    pub sent_at: Option<DateTime<Utc>>,
}
896
/// Raw unparsed metric data.
///
/// Use [`MetricData::into_buckets`] to obtain parsed buckets for either variant.
#[derive(Debug)]
pub enum MetricData {
    /// Raw data, unparsed envelope items.
    Raw(Vec<Item>),
    /// Already parsed buckets but unprocessed.
    Parsed(Vec<Bucket>),
}
905
906impl MetricData {
907    /// Consumes the metric data and parses the contained buckets.
908    ///
909    /// If the contained data is already parsed the buckets are returned unchanged.
910    /// Raw buckets are parsed and created with the passed `timestamp`.
911    fn into_buckets(self, timestamp: UnixTimestamp) -> Vec<Bucket> {
912        let items = match self {
913            Self::Parsed(buckets) => return buckets,
914            Self::Raw(items) => items,
915        };
916
917        let mut buckets = Vec::new();
918        for item in items {
919            let payload = item.payload();
920            if item.ty() == &ItemType::Statsd {
921                for bucket_result in Bucket::parse_all(&payload, timestamp) {
922                    match bucket_result {
923                        Ok(bucket) => buckets.push(bucket),
924                        Err(error) => relay_log::debug!(
925                            error = &error as &dyn Error,
926                            "failed to parse metric bucket from statsd format",
927                        ),
928                    }
929                }
930            } else if item.ty() == &ItemType::MetricBuckets {
931                match serde_json::from_slice::<Vec<Bucket>>(&payload) {
932                    Ok(parsed_buckets) => {
933                        // Re-use the allocation of `b` if possible.
934                        if buckets.is_empty() {
935                            buckets = parsed_buckets;
936                        } else {
937                            buckets.extend(parsed_buckets);
938                        }
939                    }
940                    Err(error) => {
941                        relay_log::debug!(
942                            error = &error as &dyn Error,
943                            "failed to parse metric bucket",
944                        );
945                        metric!(counter(RelayCounters::MetricBucketsParsingFailed) += 1);
946                    }
947                }
948            } else {
949                relay_log::error!(
950                    "invalid item of type {} passed to ProcessMetrics",
951                    item.ty()
952                );
953            }
954        }
955        buckets
956    }
957}
958
/// Parses a batched payload of metric buckets and pushes them to the aggregator.
#[derive(Debug)]
pub struct ProcessBatchedMetrics {
    /// Metrics payload in JSON format.
    pub payload: Bytes,
    /// Whether to keep or reset the metric metadata.
    pub source: BucketSource,
    /// The wall clock time at which the request was received.
    pub received_at: DateTime<Utc>,
    /// The value of the `sent_at` header for clock drift correction.
    pub sent_at: Option<DateTime<Utc>>,
}
970
/// Source information where a metric bucket originates from.
///
/// Use [`BucketSource::from_meta`] to infer the source from request metadata.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum BucketSource {
    /// The metric bucket originated from an internal Relay use case.
    ///
    /// The metric bucket originates either from within the same Relay
    /// or was accepted coming from another Relay which is registered as
    /// an internal Relay via Relay's configuration.
    Internal,
    /// The bucket source originated from an untrusted source.
    ///
    /// Managed Relays sending extracted metrics are considered external,
    /// it's a project use case but it comes from an untrusted source.
    External,
}
986
987impl BucketSource {
988    /// Infers the bucket source from [`RequestMeta::request_trust`].
989    pub fn from_meta(meta: &RequestMeta) -> Self {
990        match meta.request_trust() {
991            RequestTrust::Trusted => Self::Internal,
992            RequestTrust::Untrusted => Self::External,
993        }
994    }
995}
996
/// Sends a client report to the upstream.
#[derive(Debug)]
pub struct SubmitClientReports {
    /// The client reports to be sent.
    pub client_reports: Vec<ClientReport>,
    /// Scoping information for the client reports.
    pub scoping: Scoping,
}
1005
/// CPU-intensive processing tasks for envelopes.
///
/// Each variant wraps the corresponding message type handled by the
/// [`EnvelopeProcessorService`]; boxing keeps the enum itself small.
#[derive(Debug)]
pub enum EnvelopeProcessor {
    /// Processes an envelope, see [`ProcessEnvelope`].
    ProcessEnvelope(Box<ProcessEnvelope>),
    /// Processes project metrics, see [`ProcessMetrics`].
    ProcessProjectMetrics(Box<ProcessMetrics>),
    /// Processes batched metrics, see [`ProcessBatchedMetrics`].
    ProcessBatchedMetrics(Box<ProcessBatchedMetrics>),
    /// Flushes aggregated metric buckets, see [`FlushBuckets`].
    FlushBuckets(Box<FlushBuckets>),
    /// Submits client reports, see [`SubmitClientReports`].
    SubmitClientReports(Box<SubmitClientReports>),
}
1015
1016impl EnvelopeProcessor {
1017    /// Returns the name of the message variant.
1018    pub fn variant(&self) -> &'static str {
1019        match self {
1020            EnvelopeProcessor::ProcessEnvelope(_) => "ProcessEnvelope",
1021            EnvelopeProcessor::ProcessProjectMetrics(_) => "ProcessProjectMetrics",
1022            EnvelopeProcessor::ProcessBatchedMetrics(_) => "ProcessBatchedMetrics",
1023            EnvelopeProcessor::FlushBuckets(_) => "FlushBuckets",
1024            EnvelopeProcessor::SubmitClientReports(_) => "SubmitClientReports",
1025        }
1026    }
1027}
1028
// Marker implementation registering [`EnvelopeProcessor`] as a service interface.
impl relay_system::Interface for EnvelopeProcessor {}
1030
1031impl FromMessage<ProcessEnvelope> for EnvelopeProcessor {
1032    type Response = relay_system::NoResponse;
1033
1034    fn from_message(message: ProcessEnvelope, _sender: ()) -> Self {
1035        Self::ProcessEnvelope(Box::new(message))
1036    }
1037}
1038
1039impl FromMessage<ProcessMetrics> for EnvelopeProcessor {
1040    type Response = NoResponse;
1041
1042    fn from_message(message: ProcessMetrics, _: ()) -> Self {
1043        Self::ProcessProjectMetrics(Box::new(message))
1044    }
1045}
1046
1047impl FromMessage<ProcessBatchedMetrics> for EnvelopeProcessor {
1048    type Response = NoResponse;
1049
1050    fn from_message(message: ProcessBatchedMetrics, _: ()) -> Self {
1051        Self::ProcessBatchedMetrics(Box::new(message))
1052    }
1053}
1054
1055impl FromMessage<FlushBuckets> for EnvelopeProcessor {
1056    type Response = NoResponse;
1057
1058    fn from_message(message: FlushBuckets, _: ()) -> Self {
1059        Self::FlushBuckets(Box::new(message))
1060    }
1061}
1062
1063impl FromMessage<SubmitClientReports> for EnvelopeProcessor {
1064    type Response = NoResponse;
1065
1066    fn from_message(message: SubmitClientReports, _: ()) -> Self {
1067        Self::SubmitClientReports(Box::new(message))
1068    }
1069}
1070
/// The asynchronous thread pool used for scheduling processing tasks in the processor.
///
/// Tasks are submitted as boxed `'static` futures.
pub type EnvelopeProcessorServicePool = AsyncPool<BoxFuture<'static, ()>>;
1073
/// Service implementing the [`EnvelopeProcessor`] interface.
///
/// This service handles messages in a worker pool with configurable concurrency.
///
/// Cloning is cheap: all state is shared behind an [`Arc`].
#[derive(Clone)]
pub struct EnvelopeProcessorService {
    inner: Arc<InnerProcessor>,
}
1081
/// Contains the addresses of services that the processor publishes to.
pub struct Addrs {
    /// Aggregator for tracked outcomes.
    pub outcome_aggregator: Addr<TrackOutcome>,
    /// The upstream relay service.
    pub upstream_relay: Addr<UpstreamRelay>,
    /// Object storage service, only available in processing mode.
    #[cfg(feature = "processing")]
    pub objectstore: Option<Addr<Objectstore>>,
    /// Store forwarder, only available in processing mode.
    #[cfg(feature = "processing")]
    pub store_forwarder: Option<Addr<Store>>,
    /// The metrics aggregator service.
    pub aggregator: Addr<Aggregator>,
}
1092
1093impl Default for Addrs {
1094    fn default() -> Self {
1095        Addrs {
1096            outcome_aggregator: Addr::dummy(),
1097            upstream_relay: Addr::dummy(),
1098            #[cfg(feature = "processing")]
1099            objectstore: None,
1100            #[cfg(feature = "processing")]
1101            store_forwarder: None,
1102            aggregator: Addr::dummy(),
1103        }
1104    }
1105}
1106
/// Shared state of the [`EnvelopeProcessorService`], held behind an [`Arc`].
struct InnerProcessor {
    // Worker pool on which processing tasks are spawned.
    pool: EnvelopeProcessorServicePool,
    config: Arc<Config>,
    global_config: GlobalConfigHandle,
    project_cache: ProjectCacheHandle,
    cogs: Cogs,
    addrs: Addrs,
    // Redis-backed rate limiter; `None` when Redis is not configured.
    #[cfg(feature = "processing")]
    rate_limiter: Option<Arc<RedisRateLimiter>>,
    #[cfg(feature = "processing")]
    geoip_lookup: GeoIpLookup,
    metric_outcomes: MetricOutcomes,
    // The per-signal processors, see [`Processing`].
    processing: Processing,
}
1121
/// The set of per-item-type processors dispatched to by
/// [`EnvelopeProcessorService::process_envelope`].
struct Processing {
    errors: ErrorsProcessor,
    logs: LogsProcessor,
    trace_metrics: TraceMetricsProcessor,
    spans: SpansProcessor,
    check_ins: CheckInsProcessor,
    sessions: SessionsProcessor,
    transactions: TransactionProcessor,
    profile_chunks: ProfileChunksProcessor,
    trace_attachments: TraceAttachmentsProcessor,
    replays: ReplaysProcessor,
    client_reports: ClientReportsProcessor,
    attachments: AttachmentProcessor,
    user_reports: UserReportsProcessor,
    profiles: ProfilesProcessor,
}
1138
1139impl EnvelopeProcessorService {
    /// Creates a multi-threaded envelope processor.
    ///
    /// `redis` is only available in processing mode and enables the Redis-backed
    /// rate limiter when provided.
    #[cfg_attr(feature = "processing", expect(clippy::too_many_arguments))]
    pub fn new(
        pool: EnvelopeProcessorServicePool,
        config: Arc<Config>,
        global_config: GlobalConfigHandle,
        project_cache: ProjectCacheHandle,
        cogs: Cogs,
        #[cfg(feature = "processing")] redis: Option<RedisClients>,
        addrs: Addrs,
        metric_outcomes: MetricOutcomes,
    ) -> Self {
        // Failing to open the GeoIP database is not fatal: the error is logged
        // and processing continues with an empty lookup.
        let geoip_lookup = config
            .geoip_path()
            .and_then(
                |p| match GeoIpLookup::open(p).context(ServiceError::GeoIp) {
                    Ok(geoip) => Some(geoip),
                    Err(err) => {
                        relay_log::error!("failed to open GeoIP db {p:?}: {err:?}");
                        None
                    }
                },
            )
            .unwrap_or_else(GeoIpLookup::empty);

        #[cfg(feature = "processing")]
        let rate_limiter = redis.as_ref().map(|redis| {
            RedisRateLimiter::new(redis.quotas.clone())
                .max_limit(config.max_rate_limit())
                .cache(config.quota_cache_ratio(), config.quota_cache_max())
        });

        // The quota limiter is shared by all per-signal processors below.
        let quota_limiter = Arc::new(QuotaRateLimiter::new(
            #[cfg(feature = "processing")]
            project_cache.clone(),
            #[cfg(feature = "processing")]
            rate_limiter.clone(),
        ));
        #[cfg(feature = "processing")]
        let rate_limiter = rate_limiter.map(Arc::new);
        let outcome_aggregator = addrs.outcome_aggregator.clone();
        let inner = InnerProcessor {
            pool,
            global_config,
            project_cache,
            cogs,
            #[cfg(feature = "processing")]
            rate_limiter,
            addrs,
            metric_outcomes,
            #[cfg(feature = "processing")]
            geoip_lookup: geoip_lookup.clone(),
            processing: Processing {
                errors: ErrorsProcessor::new(Arc::clone(&quota_limiter), geoip_lookup.clone()),
                logs: LogsProcessor::new(Arc::clone(&quota_limiter)),
                trace_metrics: TraceMetricsProcessor::new(Arc::clone(&quota_limiter)),
                spans: SpansProcessor::new(Arc::clone(&quota_limiter), geoip_lookup.clone()),
                check_ins: CheckInsProcessor::new(Arc::clone(&quota_limiter)),
                sessions: SessionsProcessor::new(Arc::clone(&quota_limiter)),
                transactions: TransactionProcessor::new(
                    Arc::clone(&quota_limiter),
                    geoip_lookup.clone(),
                    #[cfg(feature = "processing")]
                    redis.map(|r| r.quotas),
                    #[cfg(not(feature = "processing"))]
                    None,
                ),
                profile_chunks: ProfileChunksProcessor::new(Arc::clone(&quota_limiter)),
                trace_attachments: TraceAttachmentsProcessor::new(Arc::clone(&quota_limiter)),
                replays: ReplaysProcessor::new(Arc::clone(&quota_limiter), geoip_lookup),
                client_reports: ClientReportsProcessor::new(outcome_aggregator),
                attachments: AttachmentProcessor::new(Arc::clone(&quota_limiter)),
                user_reports: UserReportsProcessor::new(Arc::clone(&quota_limiter)),
                profiles: ProfilesProcessor::new(quota_limiter),
            },
            config,
        };

        Self {
            inner: Arc::new(inner),
        }
    }
1222
    /// Enforces rate limits on the envelope and its extracted metrics.
    ///
    /// First applies cached rate limits; in processing mode (and when a Redis
    /// rate limiter is configured) additionally enforces quotas consistently via
    /// Redis and merges the resulting limits back into the project cache.
    ///
    /// Returns the (possibly modified) event on success.
    async fn enforce_quotas<Group>(
        &self,
        managed_envelope: &mut TypedEnvelope<Group>,
        event: Annotated<Event>,
        extracted_metrics: &mut ProcessingExtractedMetrics,
        ctx: processing::Context<'_>,
    ) -> Result<Annotated<Event>, ProcessingError> {
        // Cached quotas first, they are quick to evaluate and some quotas (indexed) are not
        // applied in the fast path, all cached quotas can be applied here.
        let cached_result = RateLimiter::Cached
            .enforce(managed_envelope, event, extracted_metrics, ctx)
            .await?;

        if_processing!(self.inner.config, {
            // Without a configured Redis rate limiter, the cached result is final.
            let rate_limiter = match self.inner.rate_limiter.clone() {
                Some(rate_limiter) => rate_limiter,
                None => return Ok(cached_result.event),
            };

            // Enforce all quotas consistently with Redis.
            let consistent_result = RateLimiter::Consistent(rate_limiter)
                .enforce(
                    managed_envelope,
                    cached_result.event,
                    extracted_metrics,
                    ctx
                )
                .await?;

            // Update cached rate limits with the freshly computed ones.
            if !consistent_result.rate_limits.is_empty() {
                self.inner
                    .project_cache
                    .get(managed_envelope.scoping().project_key)
                    .rate_limits()
                    .merge(consistent_result.rate_limits);
            }

            Ok(consistent_result.event)
        } else { Ok(cached_result.event) })
    }
1264
1265    async fn process_nel(
1266        &self,
1267        mut managed_envelope: ManagedEnvelope,
1268        ctx: processing::Context<'_>,
1269    ) -> Result<ProcessingResult, ProcessingError> {
1270        nel::convert_to_logs(&mut managed_envelope);
1271        self.process_with_processor(&self.inner.processing.logs, managed_envelope, ctx)
1272            .await
1273    }
1274
1275    async fn process_with_processor<P: processing::Processor>(
1276        &self,
1277        processor: &P,
1278        mut managed_envelope: ManagedEnvelope,
1279        ctx: processing::Context<'_>,
1280    ) -> Result<ProcessingResult, ProcessingError>
1281    where
1282        Outputs: From<P::Output>,
1283    {
1284        let Some(work) = processor.prepare_envelope(&mut managed_envelope) else {
1285            debug_assert!(
1286                false,
1287                "there must be work for the {} processor",
1288                std::any::type_name::<P>(),
1289            );
1290            return Err(ProcessingError::ProcessingGroupMismatch);
1291        };
1292
1293        managed_envelope.update();
1294        match managed_envelope.envelope().is_empty() {
1295            true => managed_envelope.accept(),
1296            false => managed_envelope.reject(Outcome::Invalid(DiscardReason::Internal)),
1297        }
1298
1299        processor
1300            .process(work, ctx)
1301            .await
1302            .map_err(|err| {
1303                relay_log::debug!(
1304                    error = &err as &dyn std::error::Error,
1305                    "processing pipeline failed"
1306                );
1307                ProcessingError::ProcessingFailure
1308            })
1309            .map(|o| o.map(Into::into))
1310            .map(ProcessingResult::Output)
1311    }
1312
    /// Processes standalone spans.
    ///
    /// This function does *not* run for spans extracted from transactions.
    ///
    /// In processing mode this normalizes the spans and extracts metrics;
    /// quotas are enforced in all modes. Returns the metrics extracted during
    /// processing.
    async fn process_standalone_spans(
        &self,
        managed_envelope: &mut TypedEnvelope<SpanGroup>,
        _project_id: ProjectId,
        ctx: processing::Context<'_>,
    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
        let mut extracted_metrics = ProcessingExtractedMetrics::new();

        // Span processing itself only happens in processing mode.
        if_processing!(self.inner.config, {
            span::process(
                managed_envelope,
                &mut Annotated::empty(),
                &mut extracted_metrics,
                _project_id,
                ctx,
                &self.inner.geoip_lookup,
            )
            .await;
        });

        // There is no event for standalone spans, hence the empty `Annotated`.
        self.enforce_quotas(
            managed_envelope,
            Annotated::empty(),
            &mut extracted_metrics,
            ctx,
        )
        .await?;

        Ok(Some(extracted_metrics))
    }
1346
    /// Dispatches the envelope to the processor matching its [`ProcessingGroup`].
    ///
    /// Performs shared header pre-processing (DSC parametrization, retention,
    /// project ID normalization) before handing off to the group-specific
    /// processor.
    async fn process_envelope(
        &self,
        project_id: ProjectId,
        message: ProcessEnvelopeGrouped<'_>,
    ) -> Result<ProcessingResult, ProcessingError> {
        let ProcessEnvelopeGrouped {
            group,
            envelope: mut managed_envelope,
            ctx,
        } = message;

        // Pre-process the envelope headers.
        if let Some(sampling_state) = ctx.sampling_project_info {
            // Both transactions and standalone span envelopes need a normalized DSC header
            // to make sampling rules based on the segment/transaction name work correctly.
            managed_envelope
                .envelope_mut()
                .parametrize_dsc_transaction(&sampling_state.config.tx_name_rules);
        }

        // Set the event retention. Effectively, this value will only be available in processing
        // mode when the full project config is queried from the upstream.
        if let Some(retention) = ctx.project_info.config.event_retention {
            managed_envelope.envelope_mut().set_retention(retention);
        }

        // Set the downsampled event retention. Effectively, this value will only be available in
        // processing mode when the full project config is queried from the upstream.
        if let Some(retention) = ctx.project_info.config.downsampled_event_retention {
            managed_envelope
                .envelope_mut()
                .set_downsampled_retention(retention);
        }

        // Ensure the project ID is updated to the stored instance for this project cache. This can
        // differ in two cases:
        //  1. The envelope was sent to the legacy `/store/` endpoint without a project ID.
        //  2. The DSN was moved and the envelope sent to the old project ID.
        managed_envelope
            .envelope_mut()
            .meta_mut()
            .set_project_id(project_id);

        // Legacy dispatch helper: converts the envelope into the typed envelope of the
        // current group, invokes the given method, and rejects the envelope with the
        // error's outcome on failure.
        macro_rules! run {
            ($fn_name:ident $(, $args:expr)*) => {
                async {
                    let mut managed_envelope = (managed_envelope, group).try_into()?;
                    match self.$fn_name(&mut managed_envelope, $($args),*).await {
                        Ok(extracted_metrics) => Ok(ProcessingResult::Envelope {
                            managed_envelope: managed_envelope.into_processed(),
                            extracted_metrics: extracted_metrics.map_or(ProcessingExtractedMetrics::new(), |e| e)
                        }),
                        Err(error) => {
                            relay_log::trace!("Executing {fn} failed: {error}", fn = stringify!($fn_name), error = error);
                            if let Some(outcome) = error.to_outcome() {
                                managed_envelope.reject(outcome);
                            }

                            return Err(error);
                        }
                    }
                }.await
            };
        }

        relay_log::trace!("Processing {group} group", group = group.variant());

        match group {
            ProcessingGroup::Error => {
                self.process_with_processor(&self.inner.processing.errors, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::Transaction => {
                self.process_with_processor(
                    &self.inner.processing.transactions,
                    managed_envelope,
                    ctx,
                )
                .await
            }
            ProcessingGroup::Session => {
                self.process_with_processor(&self.inner.processing.sessions, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::StandaloneAttachments => {
                self.process_with_processor(
                    &self.inner.processing.attachments,
                    managed_envelope,
                    ctx,
                )
                .await
            }
            ProcessingGroup::StandaloneUserReports => {
                self.process_with_processor(
                    &self.inner.processing.user_reports,
                    managed_envelope,
                    ctx,
                )
                .await
            }
            ProcessingGroup::StandaloneProfiles => {
                self.process_with_processor(&self.inner.processing.profiles, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::ClientReport => {
                self.process_with_processor(
                    &self.inner.processing.client_reports,
                    managed_envelope,
                    ctx,
                )
                .await
            }
            ProcessingGroup::Replay => {
                self.process_with_processor(&self.inner.processing.replays, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::CheckIn => {
                self.process_with_processor(&self.inner.processing.check_ins, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::Nel => self.process_nel(managed_envelope, ctx).await,
            ProcessingGroup::Log => {
                self.process_with_processor(&self.inner.processing.logs, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::TraceMetric => {
                self.process_with_processor(
                    &self.inner.processing.trace_metrics,
                    managed_envelope,
                    ctx,
                )
                .await
            }
            ProcessingGroup::SpanV2 => {
                self.process_with_processor(&self.inner.processing.spans, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::TraceAttachment => {
                self.process_with_processor(
                    &self.inner.processing.trace_attachments,
                    managed_envelope,
                    ctx,
                )
                .await
            }
            ProcessingGroup::Span => run!(process_standalone_spans, project_id, ctx),
            ProcessingGroup::ProfileChunk => {
                self.process_with_processor(
                    &self.inner.processing.profile_chunks,
                    managed_envelope,
                    ctx,
                )
                .await
            }
            // Currently is not used.
            ProcessingGroup::Metrics => {
                // In proxy mode we simply forward the metrics.
                // This group shouldn't be used outside of proxy mode.
                if self.inner.config.relay_mode() != RelayMode::Proxy {
                    relay_log::error!(
                        tags.project = %project_id,
                        items = ?managed_envelope.envelope().items().next().map(Item::ty),
                        "received metrics in the process_state"
                    );
                }

                Ok(ProcessingResult::no_metrics(
                    managed_envelope.into_processed(),
                ))
            }
            // Fallback to the legacy process_state implementation for Ungrouped events.
            ProcessingGroup::Ungrouped => {
                relay_log::error!(
                    tags.project = %project_id,
                    items = ?managed_envelope.envelope().items().next().map(Item::ty),
                    "could not identify the processing group based on the envelope's items"
                );

                Err(ProcessingError::NoProcessingGroup)
            }
            // Leave this group unchanged.
            //
            // This will later be forwarded to upstream.
            ProcessingGroup::ForwardUnknown => Ok(ProcessingResult::no_metrics(
                managed_envelope.into_processed(),
            )),
        }
    }
1535
    /// Processes the envelope and returns the processed envelope back.
    ///
    /// Returns `Some` if the envelope passed inbound filtering and rate limiting. Invalid items are
    /// removed from the envelope. Otherwise, if the envelope is empty or the entire envelope needs
    /// to be dropped, this is `None`.
    ///
    /// Extracted metrics are sent to the aggregator as a side effect, regardless of whether the
    /// envelope itself is kept. Logging scope fields (`project`, `sdk`, `user_agent`) are set for
    /// the duration of processing and removed afterwards, including on the error path.
    async fn process<'a>(
        &self,
        mut message: ProcessEnvelopeGrouped<'a>,
    ) -> Result<Option<Submit<'a>>, ProcessingError> {
        let ProcessEnvelopeGrouped {
            ref mut envelope,
            ctx,
            ..
        } = message;

        // Prefer the project's project ID, and fall back to the stated project id from the
        // envelope. The project ID is available in all modes, other than in proxy mode, where
        // envelopes for unknown projects are forwarded blindly.
        //
        // Neither ID can be available in proxy mode on the /store/ endpoint. This is not supported,
        // since we cannot process an envelope without project ID, so drop it.
        let Some(project_id) = ctx
            .project_info
            .project_id
            .or_else(|| envelope.envelope().meta().project_id())
        else {
            envelope.reject(Outcome::Invalid(DiscardReason::Internal));
            return Err(ProcessingError::MissingProjectId);
        };

        let client = envelope.envelope().meta().client().map(str::to_owned);
        let user_agent = envelope.envelope().meta().user_agent().map(str::to_owned);
        let project_key = envelope.envelope().meta().public_key();
        // Only allow sending to the sampling key, if we successfully loaded a sampling project
        // info relating to it. This filters out unknown/invalid project keys as well as project
        // keys from different organizations.
        let sampling_key = envelope
            .envelope()
            .sampling_key()
            .filter(|_| ctx.sampling_project_info.is_some());

        // We set additional information on the scope, which will be removed after processing the
        // envelope.
        relay_log::configure_scope(|scope| {
            scope.set_tag("project", project_id);
            if let Some(client) = client {
                scope.set_tag("sdk", client);
            }
            if let Some(user_agent) = user_agent {
                // NOTE(review): stored as a scope *extra* here, but cleaned up below with
                // `remove_tag("user_agent")` — confirm whether `remove_extra` was intended.
                scope.set_extra("user_agent", user_agent.into());
            }
        });

        let result = match self.process_envelope(project_id, message).await {
            Ok(ProcessingResult::Envelope {
                mut managed_envelope,
                extracted_metrics,
            }) => {
                // The envelope could be modified or even emptied during processing, which
                // requires re-computation of the context.
                managed_envelope.update();

                // Metrics extracted during processing are forwarded even when the envelope
                // itself ends up empty or rejected.
                let has_metrics = !extracted_metrics.metrics.project_metrics.is_empty();
                send_metrics(
                    extracted_metrics.metrics,
                    project_key,
                    sampling_key,
                    &self.inner.addrs.aggregator,
                );

                let envelope_response = if managed_envelope.envelope().is_empty() {
                    if !has_metrics {
                        // Individual rate limits have already been issued
                        managed_envelope.reject(Outcome::RateLimited(None));
                    } else {
                        managed_envelope.accept();
                    }

                    None
                } else {
                    Some(managed_envelope)
                };

                Ok(envelope_response.map(Submit::Envelope))
            }
            Ok(ProcessingResult::Output(Output { main, metrics })) => {
                if let Some(metrics) = metrics {
                    metrics.accept(|metrics| {
                        send_metrics(
                            metrics,
                            project_key,
                            sampling_key,
                            &self.inner.addrs.aggregator,
                        );
                    });
                }

                let ctx = ctx.to_forward();
                Ok(main.map(|output| Submit::Output { output, ctx }))
            }
            Err(err) => Err(err),
        };

        // Clean up the logging scope configured above before returning either way.
        relay_log::configure_scope(|scope| {
            scope.remove_tag("project");
            scope.remove_tag("sdk");
            scope.remove_tag("user_agent");
        });

        result
    }
1647
    /// Splits an incoming envelope into processing groups and processes each group.
    ///
    /// Each group receives its own COGS measurement and its own [`ManagedEnvelope`]. Successful
    /// results are submitted upstream; processing errors are logged, at error level only when the
    /// error is unexpected.
    async fn handle_process_envelope(&self, cogs: &mut Token, message: ProcessEnvelope) {
        let project_key = message.envelope.envelope().meta().public_key();
        // Time the envelope spent queued before processing started.
        let wait_time = message.envelope.age();
        metric!(timer(RelayTimers::EnvelopeWaitTime) = wait_time);

        // This COGS handling may need an overhaul in the future:
        // Cancel the passed in token, to start individual measurements per envelope instead.
        cogs.cancel();

        let scoping = message.envelope.scoping();
        for (group, envelope) in ProcessingGroup::split_envelope(
            *message.envelope.into_envelope(),
            &message.project_info,
        ) {
            // Attribute COGS per processing group instead of for the whole envelope.
            let mut cogs = self
                .inner
                .cogs
                .timed(ResourceId::Relay, AppFeature::from(group));

            let mut envelope =
                ManagedEnvelope::new(envelope, self.inner.addrs.outcome_aggregator.clone());
            envelope.scope(scoping);

            let global_config = self.inner.global_config.current();

            let ctx = processing::Context {
                config: &self.inner.config,
                global_config: &global_config,
                project_info: &message.project_info,
                sampling_project_info: message.sampling_project_info.as_deref(),
                rate_limits: &message.rate_limits,
                reservoir_counters: &message.reservoir_counters,
            };

            let message = ProcessEnvelopeGrouped {
                group,
                envelope,
                ctx,
            };

            let result = metric!(
                timer(RelayTimers::EnvelopeProcessingTime),
                group = group.variant(),
                { self.process(message).await }
            );

            match result {
                Ok(Some(envelope)) => self.submit_upstream(&mut cogs, envelope),
                // Envelope was fully handled (accepted or rejected) during processing.
                Ok(None) => {}
                Err(error) if error.is_unexpected() => {
                    relay_log::error!(
                        tags.project_key = %project_key,
                        error = &error as &dyn Error,
                        "error processing envelope"
                    )
                }
                // Expected failures (e.g. invalid payloads) only warrant debug logging.
                Err(error) => {
                    relay_log::debug!(
                        tags.project_key = %project_key,
                        error = &error as &dyn Error,
                        "error processing envelope"
                    )
                }
            }
        }
    }
1714
    /// Parses, normalizes, and forwards metric buckets to the aggregator.
    ///
    /// Buckets are normalized, filtered by namespace validity, and corrected for client clock
    /// drift. If the project state is already available, cached filters and rate limits are
    /// applied immediately; otherwise that check is deferred until flush time.
    fn handle_process_metrics(&self, cogs: &mut Token, message: ProcessMetrics) {
        let ProcessMetrics {
            data,
            project_key,
            received_at,
            sent_at,
            source,
        } = message;

        let received_timestamp =
            UnixTimestamp::from_datetime(received_at).unwrap_or(UnixTimestamp::now());

        let mut buckets = data.into_buckets(received_timestamp);
        if buckets.is_empty() {
            return;
        };
        // Attribute COGS by the size of the parsed buckets.
        cogs.update(relay_metrics::cogs::BySize(&buckets));

        let clock_drift_processor =
            ClockDriftProcessor::new(sent_at, received_at).at_least(MINIMUM_CLOCK_DRIFT);

        // Drop buckets that fail normalization or namespace validation; fix up timestamps
        // and metadata on the rest in place.
        buckets.retain_mut(|bucket| {
            if let Err(error) = relay_metrics::normalize_bucket(bucket) {
                relay_log::debug!(error = &error as &dyn Error, "dropping bucket {bucket:?}");
                return false;
            }

            if !self::metrics::is_valid_namespace(bucket) {
                return false;
            }

            clock_drift_processor.process_timestamp(&mut bucket.timestamp);

            // Only trust bucket metadata from internal (Relay-to-Relay) sources; reset it
            // for everything else.
            if !matches!(source, BucketSource::Internal) {
                bucket.metadata = BucketMetadata::new(received_timestamp);
            }

            true
        });

        let project = self.inner.project_cache.get(project_key);

        // Best effort check to filter and rate limit buckets, if there is no project state
        // available at the current time, we will check again after flushing.
        let buckets = match project.state() {
            ProjectState::Enabled(project_info) => {
                let rate_limits = project.rate_limits().current_limits();
                self.check_buckets(project_key, project_info, &rate_limits, buckets)
            }
            _ => buckets,
        };

        relay_log::trace!("merging metric buckets into the aggregator");
        self.inner
            .addrs
            .aggregator
            .send(MergeBuckets::new(project_key, buckets));
    }
1773
1774    fn handle_process_batched_metrics(&self, cogs: &mut Token, message: ProcessBatchedMetrics) {
1775        let ProcessBatchedMetrics {
1776            payload,
1777            source,
1778            received_at,
1779            sent_at,
1780        } = message;
1781
1782        #[derive(serde::Deserialize)]
1783        struct Wrapper {
1784            buckets: HashMap<ProjectKey, Vec<Bucket>>,
1785        }
1786
1787        let buckets = match serde_json::from_slice(&payload) {
1788            Ok(Wrapper { buckets }) => buckets,
1789            Err(error) => {
1790                relay_log::debug!(
1791                    error = &error as &dyn Error,
1792                    "failed to parse batched metrics",
1793                );
1794                metric!(counter(RelayCounters::MetricBucketsParsingFailed) += 1);
1795                return;
1796            }
1797        };
1798
1799        for (project_key, buckets) in buckets {
1800            self.handle_process_metrics(
1801                cogs,
1802                ProcessMetrics {
1803                    data: MetricData::Parsed(buckets),
1804                    project_key,
1805                    source,
1806                    received_at,
1807                    sent_at,
1808                },
1809            )
1810        }
1811    }
1812
    /// Submits a processed envelope or processing output to its destination.
    ///
    /// In processing Relays with a configured store forwarder, payloads are handed to the store
    /// (Kafka) path. Otherwise, the submission is serialized into an envelope, encoded with the
    /// configured HTTP encoding, and sent to the upstream relay. Empty envelopes are accepted
    /// without submission; serialization failures reject the envelope with an internal outcome.
    fn submit_upstream(&self, cogs: &mut Token, submit: Submit<'_>) {
        let _submit = cogs.start_category("submit");

        #[cfg(feature = "processing")]
        if self.inner.config.processing_enabled()
            && let Some(store_forwarder) = &self.inner.addrs.store_forwarder
        {
            use crate::processing::StoreHandle;

            let objectstore = self.inner.addrs.objectstore.as_ref();
            let global_config = &self.inner.global_config.current();
            let handle = StoreHandle::new(store_forwarder, objectstore, global_config);

            match submit {
                // Once check-ins and errors are fully moved to the new pipeline, this is only
                // used for metrics forwarding.
                //
                // Metrics forwarding will _never_ forward an envelope in processing, making
                // this branch here unused.
                Submit::Envelope(envelope) => handle.send_envelope(envelope.into_inner()),
                Submit::Output { output, ctx } => output
                    .forward_store(handle, ctx)
                    .unwrap_or_else(|err| err.into_inner()),
            }
            return;
        }

        // Non-processing path: everything is sent to the upstream as an envelope.
        let mut envelope = match submit {
            Submit::Envelope(envelope) => envelope,
            Submit::Output { output, ctx } => match output.serialize_envelope(ctx) {
                Ok(envelope) => ManagedEnvelope::from(envelope).into_processed(),
                Err(_) => {
                    relay_log::error!("failed to serialize output to an envelope");
                    return;
                }
            },
        };

        if envelope.envelope_mut().is_empty() {
            envelope.accept();
            return;
        }

        // Override the `sent_at` timestamp. Since the envelope went through basic
        // normalization, all timestamps have been corrected. We propagate the new
        // `sent_at` to allow the next Relay to double-check this timestamp and
        // potentially apply correction again. This is done as close to sending as
        // possible so that we avoid internal delays.
        envelope.envelope_mut().set_sent_at(Utc::now());

        relay_log::trace!("sending envelope to sentry endpoint");
        let http_encoding = self.inner.config.http_encoding();
        let result = envelope.envelope().to_vec().and_then(|v| {
            encode_payload(&v.into(), http_encoding).map_err(EnvelopeError::PayloadIoFailed)
        });

        match result {
            Ok(body) => {
                self.inner
                    .addrs
                    .upstream_relay
                    .send(SendRequest(SendEnvelope {
                        envelope,
                        body,
                        http_encoding,
                        project_cache: self.inner.project_cache.clone(),
                    }));
            }
            Err(error) => {
                // Errors are only logged for what we consider an internal discard reason. These
                // indicate errors in the infrastructure or implementation bugs.
                relay_log::error!(
                    error = &error as &dyn Error,
                    tags.project_key = %envelope.scoping().project_key,
                    "failed to serialize envelope payload"
                );

                envelope.reject(Outcome::Invalid(DiscardReason::Internal));
            }
        }
    }
1894
1895    fn handle_submit_client_reports(&self, cogs: &mut Token, message: SubmitClientReports) {
1896        let SubmitClientReports {
1897            client_reports,
1898            scoping,
1899        } = message;
1900
1901        let upstream = self.inner.config.upstream_descriptor();
1902        let dsn = PartialDsn::outbound(&scoping, upstream);
1903
1904        let mut envelope = Envelope::from_request(None, RequestMeta::outbound(dsn));
1905        for client_report in client_reports {
1906            match client_report.serialize() {
1907                Ok(payload) => {
1908                    let mut item = Item::new(ItemType::ClientReport);
1909                    item.set_payload(ContentType::Json, payload);
1910                    envelope.add_item(item);
1911                }
1912                Err(error) => {
1913                    relay_log::error!(
1914                        error = &error as &dyn std::error::Error,
1915                        "failed to serialize client report"
1916                    );
1917                }
1918            }
1919        }
1920
1921        let envelope = ManagedEnvelope::new(envelope, self.inner.addrs.outcome_aggregator.clone());
1922        self.submit_upstream(cogs, Submit::Envelope(envelope.into_processed()));
1923    }
1924
1925    fn check_buckets(
1926        &self,
1927        project_key: ProjectKey,
1928        project_info: &ProjectInfo,
1929        rate_limits: &RateLimits,
1930        buckets: Vec<Bucket>,
1931    ) -> Vec<Bucket> {
1932        let Some(scoping) = project_info.scoping(project_key) else {
1933            relay_log::error!(
1934                tags.project_key = project_key.as_str(),
1935                "there is no scoping: dropping {} buckets",
1936                buckets.len(),
1937            );
1938            return Vec::new();
1939        };
1940
1941        let mut buckets = self::metrics::apply_project_info(
1942            buckets,
1943            &self.inner.metric_outcomes,
1944            project_info,
1945            scoping,
1946        );
1947
1948        let namespaces: BTreeSet<MetricNamespace> = buckets
1949            .iter()
1950            .filter_map(|bucket| bucket.name.try_namespace())
1951            .collect();
1952
1953        for namespace in namespaces {
1954            let limits = rate_limits.check_with_quotas(
1955                project_info.get_quotas(),
1956                scoping.item(DataCategory::MetricBucket),
1957            );
1958
1959            if limits.is_limited() {
1960                let rejected;
1961                (buckets, rejected) = utils::split_off(buckets, |bucket| {
1962                    bucket.name.try_namespace() == Some(namespace)
1963                });
1964
1965                let reason_code = limits.longest().and_then(|limit| limit.reason_code.clone());
1966                self.inner.metric_outcomes.track(
1967                    scoping,
1968                    &rejected,
1969                    Outcome::RateLimited(reason_code),
1970                );
1971            }
1972        }
1973
1974        let quotas = project_info.config.quotas.clone();
1975        match MetricsLimiter::create(buckets, quotas, scoping) {
1976            Ok(mut bucket_limiter) => {
1977                bucket_limiter.enforce_limits(rate_limits, &self.inner.metric_outcomes);
1978                bucket_limiter.into_buckets()
1979            }
1980            Err(buckets) => buckets,
1981        }
1982    }
1983
    /// Applies Redis-backed rate limits to metric buckets, per namespace.
    ///
    /// For every metric namespace present in `buckets`, the combined global and project quotas
    /// are checked against the number of buckets in that namespace. Buckets of limited namespaces
    /// are removed and tracked as rate-limited outcomes, and the resulting limits are cached in
    /// the project cache. Remaining buckets are then passed through
    /// [`Self::apply_other_rate_limits`] for count-based categories.
    ///
    /// Returns the buckets unchanged when no Redis rate limiter is configured.
    #[cfg(feature = "processing")]
    async fn rate_limit_buckets(
        &self,
        scoping: Scoping,
        project_info: &ProjectInfo,
        mut buckets: Vec<Bucket>,
    ) -> Vec<Bucket> {
        let Some(rate_limiter) = &self.inner.rate_limiter else {
            return buckets;
        };

        let global_config = self.inner.global_config.current();
        // Bucket count per namespace; this count is the quantity checked against quotas.
        let namespaces = buckets
            .iter()
            .filter_map(|bucket| bucket.name.try_namespace())
            .counts();

        let quotas = CombinedQuotas::new(&global_config, project_info.get_quotas());

        for (namespace, quantity) in namespaces {
            let item_scoping = scoping.metric_bucket(namespace);

            let limits = match rate_limiter
                .is_rate_limited(quotas, item_scoping, quantity, false)
                .await
            {
                Ok(limits) => limits,
                Err(err) => {
                    // Fail open: on Redis errors, stop checking and keep remaining buckets.
                    relay_log::error!(
                        error = &err as &dyn std::error::Error,
                        "failed to check redis rate limits"
                    );
                    break;
                }
            };

            if limits.is_limited() {
                let rejected;
                (buckets, rejected) = utils::split_off(buckets, |bucket| {
                    bucket.name.try_namespace() == Some(namespace)
                });

                let reason_code = limits.longest().and_then(|limit| limit.reason_code.clone());
                self.inner.metric_outcomes.track(
                    scoping,
                    &rejected,
                    Outcome::RateLimited(reason_code),
                );

                // Cache the limits so subsequent data is limited without hitting Redis again.
                self.inner
                    .project_cache
                    .get(item_scoping.scoping.project_key)
                    .rate_limits()
                    .merge(limits);
            }
        }

        match MetricsLimiter::create(buckets, project_info.config.quotas.clone(), scoping) {
            Err(buckets) => buckets,
            Ok(bucket_limiter) => self.apply_other_rate_limits(bucket_limiter).await,
        }
    }
2046
    /// Check and apply rate limits to metrics buckets for transactions and spans.
    ///
    /// Uses the [`MetricsLimiter`]'s count-based category and count to query the combined global
    /// and project quotas in Redis, enforces any resulting limits on the buckets, and caches
    /// newly enforced limits in the project cache. Also emits a timing metric for the Redis
    /// check, tagged with category, limited-status, and a bucketized count.
    #[cfg(feature = "processing")]
    async fn apply_other_rate_limits(&self, mut bucket_limiter: MetricsLimiter) -> Vec<Bucket> {
        relay_log::trace!("handle_rate_limit_buckets");

        let scoping = *bucket_limiter.scoping();

        if let Some(rate_limiter) = self.inner.rate_limiter.as_ref() {
            let global_config = self.inner.global_config.current();
            let quotas = CombinedQuotas::new(&global_config, bucket_limiter.quotas());

            // We set over_accept_once such that the limit is actually reached, which allows subsequent
            // calls with quantity=0 to be rate limited.
            let over_accept_once = true;
            let mut rate_limits = RateLimits::new();

            let (category, count) = bucket_limiter.count();

            let timer = Instant::now();
            let mut is_limited = false;

            if let Some(count) = count {
                match rate_limiter
                    .is_rate_limited(quotas, scoping.item(category), count, over_accept_once)
                    .await
                {
                    Ok(limits) => {
                        is_limited = limits.is_limited();
                        rate_limits.merge(limits)
                    }
                    // Fail open: on Redis errors the buckets pass through unlimited.
                    Err(e) => {
                        relay_log::error!(error = &e as &dyn Error, "rate limiting error")
                    }
                }
            }

            relay_statsd::metric!(
                timer(RelayTimers::RateLimitBucketsDuration) = timer.elapsed(),
                category = category.name(),
                limited = if is_limited { "true" } else { "false" },
                // Bucketize the count into a low-cardinality tag value. Arms are matched in
                // order, so the overlapping ranges resolve to the first (smallest) match.
                count = match count {
                    None => "none",
                    Some(0) => "0",
                    Some(1) => "1",
                    Some(1..=10) => "10",
                    Some(1..=25) => "25",
                    Some(1..=50) => "50",
                    Some(51..=100) => "100",
                    Some(101..=500) => "500",
                    _ => "> 500",
                },
            );

            if rate_limits.is_limited() {
                let was_enforced =
                    bucket_limiter.enforce_limits(&rate_limits, &self.inner.metric_outcomes);

                if was_enforced {
                    // Update the rate limits in the project cache.
                    self.inner
                        .project_cache
                        .get(scoping.project_key)
                        .rate_limits()
                        .merge(rate_limits);
                }
            }
        }

        bucket_limiter.into_buckets()
    }
2117
2118    /// Processes metric buckets and sends them to Kafka.
2119    ///
2120    /// This function runs the following steps:
2121    ///  - rate limiting
2122    ///  - submit to `StoreForwarder`
2123    #[cfg(feature = "processing")]
2124    async fn encode_metrics_processing(
2125        &self,
2126        message: FlushBuckets,
2127        store_forwarder: &Addr<Store>,
2128    ) {
2129        use crate::constants::DEFAULT_EVENT_RETENTION;
2130        use crate::services::store::StoreMetrics;
2131
2132        for ProjectBuckets {
2133            buckets,
2134            scoping,
2135            project_info,
2136            ..
2137        } in message.buckets.into_values()
2138        {
2139            let buckets = self
2140                .rate_limit_buckets(scoping, &project_info, buckets)
2141                .await;
2142
2143            if buckets.is_empty() {
2144                continue;
2145            }
2146
2147            let retention = project_info
2148                .config
2149                .event_retention
2150                .unwrap_or(DEFAULT_EVENT_RETENTION);
2151
2152            // The store forwarder takes care of bucket splitting internally, so we can submit the
2153            // entire list of buckets. There is no batching needed here.
2154            store_forwarder.send(StoreMetrics {
2155                buckets,
2156                scoping,
2157                retention,
2158            });
2159        }
2160    }
2161
2162    /// Serializes metric buckets to JSON and sends them to the upstream.
2163    ///
2164    /// This function runs the following steps:
2165    ///  - partitioning
2166    ///  - batching by configured size limit
2167    ///  - serialize to JSON and pack in an envelope
2168    ///  - submit the envelope to upstream or kafka depending on configuration
2169    ///
2170    /// Rate limiting runs only in processing Relays as it requires access to the central Redis instance.
2171    /// Cached rate limits are applied in the project cache already.
2172    fn encode_metrics_envelope(&self, cogs: &mut Token, message: FlushBuckets) {
2173        let FlushBuckets {
2174            partition_key,
2175            buckets,
2176        } = message;
2177
2178        let batch_size = self.inner.config.metrics_max_batch_size_bytes();
2179        let upstream = self.inner.config.upstream_descriptor();
2180
2181        for ProjectBuckets {
2182            buckets, scoping, ..
2183        } in buckets.values()
2184        {
2185            let dsn = PartialDsn::outbound(scoping, upstream);
2186
2187            relay_statsd::metric!(
2188                distribution(RelayDistributions::PartitionKeys) = u64::from(partition_key)
2189            );
2190
2191            let mut num_batches = 0;
2192            for batch in BucketsView::from(buckets).by_size(batch_size) {
2193                let mut envelope = Envelope::from_request(None, RequestMeta::outbound(dsn.clone()));
2194
2195                let mut item = Item::new(ItemType::MetricBuckets);
2196                item.set_source_quantities(crate::metrics::extract_quantities(batch));
2197                item.set_payload(ContentType::Json, serde_json::to_vec(&buckets).unwrap());
2198                envelope.add_item(item);
2199
2200                let mut envelope =
2201                    ManagedEnvelope::new(envelope, self.inner.addrs.outcome_aggregator.clone());
2202                envelope
2203                    .set_partition_key(Some(partition_key))
2204                    .scope(*scoping);
2205
2206                relay_statsd::metric!(
2207                    distribution(RelayDistributions::BucketsPerBatch) = batch.len() as u64
2208                );
2209
2210                self.submit_upstream(cogs, Submit::Envelope(envelope.into_processed()));
2211                num_batches += 1;
2212            }
2213
2214            relay_statsd::metric!(
2215                distribution(RelayDistributions::BatchesPerPartition) = num_batches
2216            );
2217        }
2218    }
2219
2220    /// Creates a [`SendMetricsRequest`] and sends it to the upstream relay.
2221    fn send_global_partition(&self, partition_key: u32, partition: &mut Partition<'_>) {
2222        if partition.is_empty() {
2223            return;
2224        }
2225
2226        let (unencoded, project_info) = partition.take();
2227        let http_encoding = self.inner.config.http_encoding();
2228        let encoded = match encode_payload(&unencoded, http_encoding) {
2229            Ok(payload) => payload,
2230            Err(error) => {
2231                let error = &error as &dyn std::error::Error;
2232                relay_log::error!(error, "failed to encode metrics payload");
2233                return;
2234            }
2235        };
2236
2237        let request = SendMetricsRequest {
2238            partition_key: partition_key.to_string(),
2239            unencoded,
2240            encoded,
2241            project_info,
2242            http_encoding,
2243            metric_outcomes: self.inner.metric_outcomes.clone(),
2244        };
2245
2246        self.inner.addrs.upstream_relay.send(SendRequest(request));
2247    }
2248
    /// Serializes metric buckets to JSON and sends them to the upstream via the global endpoint.
    ///
    /// This function is similar to [`Self::encode_metrics_envelope`], but sends a global batched
    /// payload directly instead of per-project Envelopes.
    ///
    /// This function runs the following steps:
    ///  - partitioning
    ///  - batching by configured size limit
    ///  - serialize to JSON
    ///  - submit directly to the upstream
    fn encode_metrics_global(&self, message: FlushBuckets) {
        let FlushBuckets {
            partition_key,
            buckets,
        } = message;

        let batch_size = self.inner.config.metrics_max_batch_size_bytes();
        let mut partition = Partition::new(batch_size);
        // Number of times a bucket had to be split because the partition reached its size limit.
        let mut partition_splits = 0;

        for ProjectBuckets {
            buckets, scoping, ..
        } in buckets.values()
        {
            for bucket in buckets {
                let mut remaining = Some(BucketView::new(bucket));

                while let Some(bucket) = remaining.take() {
                    if let Some(next) = partition.insert(bucket, *scoping) {
                        // A part of the bucket could not be inserted. Take the partition and submit
                        // it immediately. Repeat until the final part was inserted. This should
                        // always result in a request, otherwise we would enter an endless loop.
                        self.send_global_partition(partition_key, &mut partition);
                        remaining = Some(next);
                        partition_splits += 1;
                    }
                }
            }
        }

        if partition_splits > 0 {
            metric!(distribution(RelayDistributions::PartitionSplits) = partition_splits);
        }

        // Flush whatever remains in the final, partially filled partition.
        self.send_global_partition(partition_key, &mut partition);
    }
2295
    /// Flushes aggregated metric buckets to their destination.
    ///
    /// All buckets are first checked against cached project filters and rate limits. Processing
    /// Relays with a store forwarder then apply Redis rate limits and produce to Kafka; all other
    /// Relays encode the buckets for the upstream, either as one global batched request or as
    /// per-project envelopes, depending on configuration.
    async fn handle_flush_buckets(&self, cogs: &mut Token, mut message: FlushBuckets) {
        for (project_key, pb) in message.buckets.iter_mut() {
            // Take the buckets out, check them, and write the survivors back in place.
            let buckets = std::mem::take(&mut pb.buckets);
            pb.buckets =
                self.check_buckets(*project_key, &pb.project_info, &pb.rate_limits, buckets);
        }

        #[cfg(feature = "processing")]
        if self.inner.config.processing_enabled()
            && let Some(ref store_forwarder) = self.inner.addrs.store_forwarder
        {
            return self
                .encode_metrics_processing(message, store_forwarder)
                .await;
        }

        if self.inner.config.http_global_metrics() {
            self.encode_metrics_global(message)
        } else {
            self.encode_metrics_envelope(cogs, message)
        }
    }
2318
    /// Test-only helper: returns whether a Redis-backed rate limiter is configured.
    #[cfg(all(test, feature = "processing"))]
    fn redis_rate_limiter_enabled(&self) -> bool {
        self.inner.rate_limiter.is_some()
    }
2323
    /// Dispatches an [`EnvelopeProcessor`] message to the matching handler.
    ///
    /// Wraps the handler invocation in a per-message timing metric and a COGS measurement derived
    /// from [`Self::feature_weights`].
    async fn handle_message(&self, message: EnvelopeProcessor) {
        let ty = message.variant();
        let feature_weights = self.feature_weights(&message);

        metric!(timer(RelayTimers::ProcessMessageDuration), message = ty, {
            let mut cogs = self.inner.cogs.timed(ResourceId::Relay, feature_weights);

            match message {
                EnvelopeProcessor::ProcessEnvelope(m) => {
                    self.handle_process_envelope(&mut cogs, *m).await
                }
                EnvelopeProcessor::ProcessProjectMetrics(m) => {
                    self.handle_process_metrics(&mut cogs, *m)
                }
                EnvelopeProcessor::ProcessBatchedMetrics(m) => {
                    self.handle_process_batched_metrics(&mut cogs, *m)
                }
                EnvelopeProcessor::FlushBuckets(m) => {
                    self.handle_flush_buckets(&mut cogs, *m).await
                }
                EnvelopeProcessor::SubmitClientReports(m) => {
                    self.handle_submit_client_reports(&mut cogs, *m)
                }
            }
        });
    }
2350
2351    fn feature_weights(&self, message: &EnvelopeProcessor) -> FeatureWeights {
2352        match message {
2353            // Envelope is split later and tokens are attributed then.
2354            EnvelopeProcessor::ProcessEnvelope(_) => AppFeature::Unattributed.into(),
2355            EnvelopeProcessor::ProcessProjectMetrics(_) => AppFeature::Unattributed.into(),
2356            EnvelopeProcessor::ProcessBatchedMetrics(_) => AppFeature::Unattributed.into(),
2357            EnvelopeProcessor::FlushBuckets(v) => v
2358                .buckets
2359                .values()
2360                .map(|s| {
2361                    if self.inner.config.processing_enabled() {
2362                        // Processing does not encode the metrics but instead rate limit the metrics,
2363                        // which scales by count and not size.
2364                        relay_metrics::cogs::ByCount(&s.buckets).into()
2365                    } else {
2366                        relay_metrics::cogs::BySize(&s.buckets).into()
2367                    }
2368                })
2369                .fold(FeatureWeights::none(), FeatureWeights::merge),
2370            EnvelopeProcessor::SubmitClientReports(_) => AppFeature::ClientReports.into(),
2371        }
2372    }
2373}
2374
2375impl Service for EnvelopeProcessorService {
2376    type Interface = EnvelopeProcessor;
2377
2378    async fn run(self, mut rx: relay_system::Receiver<Self::Interface>) {
2379        while let Some(message) = rx.recv().await {
2380            let service = self.clone();
2381            self.inner
2382                .pool
2383                .spawn_async(
2384                    async move {
2385                        service.handle_message(message).await;
2386                    }
2387                    .boxed(),
2388                )
2389                .await;
2390        }
2391    }
2392}
2393
/// Result of the enforcement of rate limiting.
///
/// If the event is already `None` or it's rate limited, it will be `None`
/// within the [`Annotated`].
struct EnforcementResult {
    /// The event after enforcement; empty if the event was rate limited.
    event: Annotated<Event>,
    /// Rate limits that applied during enforcement.
    ///
    /// Only read when the `processing` feature is enabled.
    #[cfg_attr(not(feature = "processing"), expect(dead_code))]
    rate_limits: RateLimits,
}
2403
2404impl EnforcementResult {
2405    /// Creates a new [`EnforcementResult`].
2406    pub fn new(event: Annotated<Event>, rate_limits: RateLimits) -> Self {
2407        Self { event, rate_limits }
2408    }
2409}
2410
/// Strategy used to enforce rate limits on an envelope.
#[derive(Clone)]
enum RateLimiter {
    /// Checks only cached rate limits, without consulting Redis.
    Cached,
    /// Checks rate limits consistently against the shared Redis rate limiter.
    #[cfg(feature = "processing")]
    Consistent(Arc<RedisRateLimiter>),
}
2417
impl RateLimiter {
    /// Enforces quotas on the envelope and the extracted event.
    ///
    /// Short-circuits with empty rate limits when both the envelope and the
    /// event are empty, or when neither global nor project quotas are
    /// configured. If the event itself is rate limited, the returned
    /// [`EnforcementResult`] carries an empty event; outcomes for removed
    /// items are emitted on the managed envelope.
    async fn enforce<Group>(
        &self,
        managed_envelope: &mut TypedEnvelope<Group>,
        event: Annotated<Event>,
        _extracted_metrics: &mut ProcessingExtractedMetrics,
        ctx: processing::Context<'_>,
    ) -> Result<EnforcementResult, ProcessingError> {
        if managed_envelope.envelope().is_empty() && event.value().is_none() {
            return Ok(EnforcementResult::new(event, RateLimits::default()));
        }

        let quotas = CombinedQuotas::new(ctx.global_config, ctx.project_info.get_quotas());
        if quotas.is_empty() {
            return Ok(EnforcementResult::new(event, RateLimits::default()));
        }

        let event_category = event_category(&event);

        // We extract the rate limiters, in case we perform consistent rate limiting, since we will
        // need Redis access.
        //
        // When invoking the rate limiter, capture if the event item has been rate limited to also
        // remove it from the processing state eventually.
        let this = self.clone();
        let mut envelope_limiter =
            EnvelopeLimiter::new(CheckLimits::All, move |item_scope, _quantity| {
                let this = this.clone();

                async move {
                    match this {
                        #[cfg(feature = "processing")]
                        RateLimiter::Consistent(rate_limiter) => Ok::<_, ProcessingError>(
                            rate_limiter
                                .is_rate_limited(quotas, item_scope, _quantity, false)
                                .await?,
                        ),
                        // Cached mode: only consult the in-memory rate limits.
                        _ => Ok::<_, ProcessingError>(
                            ctx.rate_limits.check_with_quotas(quotas, item_scope),
                        ),
                    }
                }
            });

        // Tell the envelope limiter about the event, since it has been removed from the Envelope at
        // this stage in processing.
        if let Some(category) = event_category {
            envelope_limiter.assume_event(category);
        }

        let scoping = managed_envelope.scoping();
        let (enforcement, rate_limits) = metric!(timer(RelayTimers::EventProcessingRateLimiting), type = self.name(), {
            envelope_limiter
                .compute(managed_envelope.envelope_mut(), &scoping)
                .await
        })?;
        let event_active = enforcement.is_event_active();

        // Use the same rate limits as used for the envelope on the metrics.
        // Those rate limits should not be checked for expiry or similar to ensure a consistent
        // limiting of envelope items and metrics.
        #[cfg(feature = "processing")]
        _extracted_metrics.apply_enforcement(&enforcement, matches!(self, Self::Consistent(_)));
        enforcement.apply_with_outcomes(managed_envelope);

        if event_active {
            debug_assert!(managed_envelope.envelope().is_empty());
            return Ok(EnforcementResult::new(Annotated::empty(), rate_limits));
        }

        Ok(EnforcementResult::new(event, rate_limits))
    }

    /// Returns the strategy name, used as a tag on the rate limiting timer metric.
    fn name(&self) -> &'static str {
        match self {
            Self::Cached => "cached",
            #[cfg(feature = "processing")]
            Self::Consistent(_) => "consistent",
        }
    }
}
2499
2500pub fn encode_payload(body: &Bytes, http_encoding: HttpEncoding) -> Result<Bytes, std::io::Error> {
2501    let envelope_body: Vec<u8> = match http_encoding {
2502        HttpEncoding::Identity => return Ok(body.clone()),
2503        HttpEncoding::Deflate => {
2504            let mut encoder = ZlibEncoder::new(Vec::new(), Compression::default());
2505            encoder.write_all(body.as_ref())?;
2506            encoder.finish()?
2507        }
2508        HttpEncoding::Gzip => {
2509            let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
2510            encoder.write_all(body.as_ref())?;
2511            encoder.finish()?
2512        }
2513        HttpEncoding::Br => {
2514            // Use default buffer size (via 0), medium quality (5), and the default lgwin (22).
2515            let mut encoder = BrotliEncoder::new(Vec::new(), 0, 5, 22);
2516            encoder.write_all(body.as_ref())?;
2517            encoder.into_inner()
2518        }
2519        HttpEncoding::Zstd => {
2520            // Use the fastest compression level, our main objective here is to get the best
2521            // compression ratio for least amount of time spent.
2522            let mut encoder = ZstdEncoder::new(Vec::new(), 1)?;
2523            encoder.write_all(body.as_ref())?;
2524            encoder.finish()?
2525        }
2526    };
2527
2528    Ok(envelope_body.into())
2529}
2530
/// An upstream request that submits an envelope via HTTP.
#[derive(Debug)]
pub struct SendEnvelope {
    /// The envelope to submit; provides scoping, request metadata, and outcome tracking.
    pub envelope: TypedEnvelope<Processed>,
    /// The serialized envelope body to send.
    pub body: Bytes,
    /// Encoding (compression) applied to the body.
    pub http_encoding: HttpEncoding,
    /// Handle to the project cache, used to merge rate limits returned by the upstream.
    pub project_cache: ProjectCacheHandle,
}
2539
impl UpstreamRequest for SendEnvelope {
    fn method(&self) -> reqwest::Method {
        reqwest::Method::POST
    }

    /// Targets the store endpoint of the envelope's project.
    fn path(&self) -> Cow<'_, str> {
        format!("/api/{}/envelope/", self.envelope.scoping().project_id).into()
    }

    fn route(&self) -> &'static str {
        "envelope"
    }

    /// Builds the HTTP request, forwarding the original request metadata as headers.
    fn build(&mut self, builder: &mut http::RequestBuilder) -> Result<(), http::HttpError> {
        let envelope_body = self.body.clone();
        metric!(
            distribution(RelayDistributions::UpstreamEnvelopeBodySize) = envelope_body.len() as u64
        );

        let meta = &self.envelope.meta();
        let shard = self.envelope.partition_key().map(|p| p.to_string());
        builder
            .content_encoding(self.http_encoding)
            .header_opt("Origin", meta.origin().map(|url| url.as_str()))
            .header_opt("User-Agent", meta.user_agent())
            .header("X-Sentry-Auth", meta.auth_header())
            .header("X-Forwarded-For", meta.forwarded_for())
            .header("Content-Type", envelope::CONTENT_TYPE)
            .header_opt("X-Sentry-Relay-Shard", shard)
            .body(envelope_body);

        Ok(())
    }

    fn sign(&mut self) -> Option<Sign> {
        Some(Sign::Optional(SignatureType::RequestSign))
    }

    /// Resolves the envelope's outcome based on the upstream response.
    ///
    /// The envelope is accepted if the upstream received it (even on error
    /// responses such as rate limits); otherwise it is rejected with an
    /// internal discard reason.
    fn respond(
        self: Box<Self>,
        result: Result<http::Response, UpstreamRequestError>,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> {
        Box::pin(async move {
            let result = match result {
                Ok(mut response) => response.consume().await.map_err(UpstreamRequestError::Http),
                Err(error) => Err(error),
            };

            match result {
                Ok(()) => self.envelope.accept(),
                Err(error) if error.is_received() => {
                    let scoping = self.envelope.scoping();
                    self.envelope.accept();

                    // Propagate rate limits returned by the upstream into the
                    // local project cache.
                    if let UpstreamRequestError::RateLimited(limits) = error {
                        self.project_cache
                            .get(scoping.project_key)
                            .rate_limits()
                            .merge(limits.scope(&scoping));
                    }
                }
                Err(error) => {
                    // Errors are only logged for what we consider an internal discard reason. These
                    // indicate errors in the infrastructure or implementation bugs.
                    let mut envelope = self.envelope;
                    envelope.reject(Outcome::Invalid(DiscardReason::Internal));
                    relay_log::error!(
                        error = &error as &dyn Error,
                        tags.project_key = %envelope.scoping().project_key,
                        "error sending envelope"
                    );
                }
            }
        })
    }
}
2616
/// A container for metric buckets from multiple projects.
///
/// This container is used to send metrics to the upstream in global batches as part of the
/// [`FlushBuckets`] message if the `http.global_metrics` option is enabled. The container monitors
/// the size of all metrics and allows to split them into multiple batches. See
/// [`insert`](Self::insert) for more information.
#[derive(Debug)]
struct Partition<'a> {
    /// Maximum payload size of this partition in bytes.
    max_size: usize,
    /// Remaining byte budget before the partition is full.
    remaining: usize,
    /// Bucket views grouped by project key.
    views: HashMap<ProjectKey, Vec<BucketView<'a>>>,
    /// Scoping for each contained project key, needed for outcome tracking.
    project_info: HashMap<ProjectKey, Scoping>,
}
2630
2631impl<'a> Partition<'a> {
2632    /// Creates a new partition with the given maximum size in bytes.
2633    pub fn new(size: usize) -> Self {
2634        Self {
2635            max_size: size,
2636            remaining: size,
2637            views: HashMap::new(),
2638            project_info: HashMap::new(),
2639        }
2640    }
2641
2642    /// Inserts a bucket into the partition, splitting it if necessary.
2643    ///
2644    /// This function attempts to add the bucket to this partition. If the bucket does not fit
2645    /// entirely into the partition given its maximum size, the remaining part of the bucket is
2646    /// returned from this function call.
2647    ///
2648    /// If this function returns `Some(_)`, the partition is full and should be submitted to the
2649    /// upstream immediately. Use [`Self::take`] to retrieve the contents of the
2650    /// partition. Afterwards, the caller is responsible to call this function again with the
2651    /// remaining bucket until it is fully inserted.
2652    pub fn insert(&mut self, bucket: BucketView<'a>, scoping: Scoping) -> Option<BucketView<'a>> {
2653        let (current, next) = bucket.split(self.remaining, Some(self.max_size));
2654
2655        if let Some(current) = current {
2656            self.remaining = self.remaining.saturating_sub(current.estimated_size());
2657            self.views
2658                .entry(scoping.project_key)
2659                .or_default()
2660                .push(current);
2661
2662            self.project_info
2663                .entry(scoping.project_key)
2664                .or_insert(scoping);
2665        }
2666
2667        next
2668    }
2669
2670    /// Returns `true` if the partition does not hold any data.
2671    fn is_empty(&self) -> bool {
2672        self.views.is_empty()
2673    }
2674
2675    /// Returns the serialized buckets for this partition.
2676    ///
2677    /// This empties the partition, so that it can be reused.
2678    fn take(&mut self) -> (Bytes, HashMap<ProjectKey, Scoping>) {
2679        #[derive(serde::Serialize)]
2680        struct Wrapper<'a> {
2681            buckets: &'a HashMap<ProjectKey, Vec<BucketView<'a>>>,
2682        }
2683
2684        let buckets = &self.views;
2685        let payload = serde_json::to_vec(&Wrapper { buckets }).unwrap().into();
2686
2687        let scopings = self.project_info.clone();
2688        self.project_info.clear();
2689
2690        self.views.clear();
2691        self.remaining = self.max_size;
2692
2693        (payload, scopings)
2694    }
2695}
2696
/// An upstream request that submits metric buckets via HTTP.
///
/// This request is not awaited. It automatically tracks outcomes if the request is not received.
#[derive(Debug)]
struct SendMetricsRequest {
    /// Partition key of this batch, sent in the `X-Sentry-Relay-Shard` header.
    partition_key: String,
    /// Serialized metric buckets without encoding applied, used for signing.
    unencoded: Bytes,
    /// Serialized metric buckets with the stated HTTP encoding applied.
    encoded: Bytes,
    /// Mapping of all contained project keys to their scoping and extraction mode.
    ///
    /// Used to track outcomes for transmission failures.
    project_info: HashMap<ProjectKey, Scoping>,
    /// Encoding (compression) of the payload.
    http_encoding: HttpEncoding,
    /// Metric outcomes instance to send outcomes on error.
    metric_outcomes: MetricOutcomes,
}
2717
2718impl SendMetricsRequest {
2719    fn create_error_outcomes(self) {
2720        #[derive(serde::Deserialize)]
2721        struct Wrapper {
2722            buckets: HashMap<ProjectKey, Vec<MinimalTrackableBucket>>,
2723        }
2724
2725        let buckets = match serde_json::from_slice(&self.unencoded) {
2726            Ok(Wrapper { buckets }) => buckets,
2727            Err(err) => {
2728                relay_log::error!(
2729                    error = &err as &dyn std::error::Error,
2730                    "failed to parse buckets from failed transmission"
2731                );
2732                return;
2733            }
2734        };
2735
2736        for (key, buckets) in buckets {
2737            let Some(&scoping) = self.project_info.get(&key) else {
2738                relay_log::error!("missing scoping for project key");
2739                continue;
2740            };
2741
2742            self.metric_outcomes.track(
2743                scoping,
2744                &buckets,
2745                Outcome::Invalid(DiscardReason::Internal),
2746            );
2747        }
2748    }
2749}
2750
2751impl UpstreamRequest for SendMetricsRequest {
2752    fn set_relay_id(&self) -> bool {
2753        true
2754    }
2755
2756    fn sign(&mut self) -> Option<Sign> {
2757        Some(Sign::Required(SignatureType::Body(self.unencoded.clone())))
2758    }
2759
2760    fn method(&self) -> reqwest::Method {
2761        reqwest::Method::POST
2762    }
2763
2764    fn path(&self) -> Cow<'_, str> {
2765        "/api/0/relays/metrics/".into()
2766    }
2767
2768    fn route(&self) -> &'static str {
2769        "global_metrics"
2770    }
2771
2772    fn build(&mut self, builder: &mut http::RequestBuilder) -> Result<(), http::HttpError> {
2773        metric!(
2774            distribution(RelayDistributions::UpstreamMetricsBodySize) = self.encoded.len() as u64
2775        );
2776
2777        builder
2778            .content_encoding(self.http_encoding)
2779            .header("X-Sentry-Relay-Shard", self.partition_key.as_bytes())
2780            .header(header::CONTENT_TYPE, b"application/json")
2781            .body(self.encoded.clone());
2782
2783        Ok(())
2784    }
2785
2786    fn respond(
2787        self: Box<Self>,
2788        result: Result<http::Response, UpstreamRequestError>,
2789    ) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> {
2790        Box::pin(async {
2791            match result {
2792                Ok(mut response) => {
2793                    response.consume().await.ok();
2794                }
2795                Err(error) => {
2796                    relay_log::error!(error = &error as &dyn Error, "Failed to send metrics batch");
2797
2798                    // If the request did not arrive at the upstream, we are responsible for outcomes.
2799                    // Otherwise, the upstream is responsible to log outcomes.
2800                    if error.is_received() {
2801                        return;
2802                    }
2803
2804                    self.create_error_outcomes()
2805                }
2806            }
2807        })
2808    }
2809}
2810
/// Container for global and project level [`Quota`].
#[derive(Copy, Clone, Debug)]
struct CombinedQuotas<'a> {
    /// Quotas from the global configuration.
    global_quotas: &'a [Quota],
    /// Quotas from the project configuration.
    project_quotas: &'a [Quota],
}
2817
2818impl<'a> CombinedQuotas<'a> {
2819    /// Returns a new [`CombinedQuotas`].
2820    pub fn new(global_config: &'a GlobalConfig, project_quotas: &'a [Quota]) -> Self {
2821        Self {
2822            global_quotas: &global_config.quotas,
2823            project_quotas,
2824        }
2825    }
2826
2827    /// Returns `true` if both global quotas and project quotas are empty.
2828    pub fn is_empty(&self) -> bool {
2829        self.len() == 0
2830    }
2831
2832    /// Returns the number of both global and project quotas.
2833    pub fn len(&self) -> usize {
2834        self.global_quotas.len() + self.project_quotas.len()
2835    }
2836}
2837
2838impl<'a> IntoIterator for CombinedQuotas<'a> {
2839    type Item = &'a Quota;
2840    type IntoIter = std::iter::Chain<std::slice::Iter<'a, Quota>, std::slice::Iter<'a, Quota>>;
2841
2842    fn into_iter(self) -> Self::IntoIter {
2843        self.global_quotas.iter().chain(self.project_quotas.iter())
2844    }
2845}
2846
2847#[cfg(test)]
2848mod tests {
2849    use insta::assert_debug_snapshot;
2850    use relay_common::glob2::LazyGlob;
2851    use relay_dynamic_config::ProjectConfig;
2852    use relay_event_normalization::{
2853        NormalizationConfig, RedactionRule, TransactionNameConfig, TransactionNameRule,
2854    };
2855    use relay_event_schema::protocol::TransactionSource;
2856    use relay_pii::DataScrubbingConfig;
2857    use similar_asserts::assert_eq;
2858
2859    use crate::testutils::{create_test_processor, create_test_processor_with_addrs};
2860
2861    #[cfg(feature = "processing")]
2862    use {
2863        relay_metrics::BucketValue,
2864        relay_quotas::{QuotaScope, ReasonCode},
2865        relay_test::mock_service,
2866    };
2867
2868    use super::*;
2869
    /// Builds an organization-scoped metric-bucket quota for tests, identified by `id`.
    #[cfg(feature = "processing")]
    fn mock_quota(id: &str) -> Quota {
        Quota {
            id: Some(id.into()),
            categories: [DataCategory::MetricBucket].into(),
            scope: QuotaScope::Organization,
            scope_id: None,
            // A limit of 0 rejects everything matched by this quota.
            limit: Some(0),
            window: None,
            reason_code: None,
            namespace: None,
        }
    }
2883
2884    #[cfg(feature = "processing")]
2885    #[test]
2886    fn test_dynamic_quotas() {
2887        let global_config = GlobalConfig {
2888            quotas: vec![mock_quota("foo"), mock_quota("bar")],
2889            ..Default::default()
2890        };
2891
2892        let project_quotas = vec![mock_quota("baz"), mock_quota("qux")];
2893
2894        let dynamic_quotas = CombinedQuotas::new(&global_config, &project_quotas);
2895
2896        assert_eq!(dynamic_quotas.len(), 4);
2897        assert!(!dynamic_quotas.is_empty());
2898
2899        let quota_ids = dynamic_quotas.into_iter().filter_map(|q| q.id.as_deref());
2900        assert!(quota_ids.eq(["foo", "bar", "baz", "qux"]));
2901    }
2902
    /// Ensures that if we ratelimit one batch of buckets in [`FlushBuckets`] message, it won't
    /// also ratelimit the next batches in the same message automatically.
    #[cfg(feature = "processing")]
    #[tokio::test]
    async fn test_ratelimit_per_batch() {
        use relay_base_schema::organization::OrganizationId;
        use relay_protocol::FiniteF64;

        // Org 1 is targeted by the zero-limit quota configured below.
        let rate_limited_org = Scoping {
            organization_id: OrganizationId::new(1),
            project_id: ProjectId::new(21),
            project_key: ProjectKey::parse("00000000000000000000000000000000").unwrap(),
            key_id: Some(17),
        };

        // Org 2 shares the same project config but is not matched by the quota.
        let not_rate_limited_org = Scoping {
            organization_id: OrganizationId::new(2),
            project_id: ProjectId::new(21),
            project_key: ProjectKey::parse("11111111111111111111111111111111").unwrap(),
            key_id: Some(17),
        };

        let message = {
            let project_info = {
                // Zero-limit quota scoped to the rate limited organization only.
                let quota = Quota {
                    id: Some("testing".into()),
                    categories: [DataCategory::MetricBucket].into(),
                    scope: relay_quotas::QuotaScope::Organization,
                    scope_id: Some(rate_limited_org.organization_id.to_string().into()),
                    limit: Some(0),
                    window: None,
                    reason_code: Some(ReasonCode::new("test")),
                    namespace: None,
                };

                let mut config = ProjectConfig::default();
                config.quotas.push(quota);

                Arc::new(ProjectInfo {
                    config,
                    ..Default::default()
                })
            };

            // One counter bucket per organization, sharing the same project info.
            let project_metrics = |scoping| ProjectBuckets {
                buckets: vec![Bucket {
                    name: "d:spans/bar".into(),
                    value: BucketValue::Counter(FiniteF64::new(1.0).unwrap()),
                    timestamp: UnixTimestamp::now(),
                    tags: Default::default(),
                    width: 10,
                    metadata: BucketMetadata::default(),
                }],
                rate_limits: Default::default(),
                project_info: project_info.clone(),
                scoping,
            };

            let buckets = hashbrown::HashMap::from([
                (
                    rate_limited_org.project_key,
                    project_metrics(rate_limited_org),
                ),
                (
                    not_rate_limited_org.project_key,
                    project_metrics(not_rate_limited_org),
                ),
            ]);

            FlushBuckets {
                partition_key: 0,
                buckets,
            }
        };

        // ensure the order of the map while iterating is as expected.
        assert_eq!(message.buckets.keys().count(), 2);

        let config = {
            let config_json = serde_json::json!({
                "processing": {
                    "enabled": true,
                    "kafka_config": [],
                    "redis": {
                        "server": std::env::var("RELAY_REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_owned()),
                    }
                }
            });
            Config::from_json_value(config_json).unwrap()
        };

        // Mock store forwarder that records the organization id of every
        // metrics message it receives; rate limited buckets never reach it.
        let (store, handle) = {
            let f = |org_ids: &mut Vec<OrganizationId>, msg: Store| {
                let org_id = match msg {
                    Store::Metrics(x) => x.scoping.organization_id,
                    _ => panic!("received envelope when expecting only metrics"),
                };
                org_ids.push(org_id);
            };

            mock_service("store_forwarder", vec![], f)
        };

        let processor = create_test_processor(config).await;
        assert!(processor.redis_rate_limiter_enabled());

        processor.encode_metrics_processing(message, &store).await;

        drop(store);
        let orgs_not_ratelimited = handle.await.unwrap();

        // Only the non-limited org's metrics must have been forwarded.
        assert_eq!(
            orgs_not_ratelimited,
            vec![not_rate_limited_org.organization_id]
        );
    }
3019
    /// Browser name/version must be extracted from the User-Agent header before
    /// PII scrubbing masks the IP-like version string in the header itself.
    #[tokio::test]
    async fn test_browser_version_extraction_with_pii_like_data() {
        let processor = create_test_processor(Default::default()).await;
        let outcome_aggregator = Addr::dummy();
        let event_id = EventId::new();

        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
            .parse()
            .unwrap();

        let request_meta = RequestMeta::new(dsn);
        let mut envelope = Envelope::from_request(Some(event_id), request_meta);

        // Event with a Chrome User-Agent whose version looks like an IP address.
        envelope.add_item({
                let mut item = Item::new(ItemType::Event);
                item.set_payload(
                    ContentType::Json,
                    r#"
                    {
                        "request": {
                            "headers": [
                                ["User-Agent", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36"]
                            ]
                        }
                    }
                "#,
                );
                item
            });

        let mut datascrubbing_settings = DataScrubbingConfig::default();
        // enable all the default scrubbing
        datascrubbing_settings.scrub_data = true;
        datascrubbing_settings.scrub_defaults = true;
        datascrubbing_settings.scrub_ip_addresses = true;

        // Make sure to mask any IP-like looking data
        let pii_config = serde_json::from_str(r#"{"applications": {"**": ["@ip:mask"]}}"#).unwrap();

        let config = ProjectConfig {
            datascrubbing_settings,
            pii_config: Some(pii_config),
            ..Default::default()
        };

        let project_info = ProjectInfo {
            config,
            ..Default::default()
        };

        let mut envelopes = ProcessingGroup::split_envelope(*envelope, &Default::default());
        assert_eq!(envelopes.len(), 1);

        let (group, envelope) = envelopes.pop().unwrap();
        let envelope = ManagedEnvelope::new(envelope, outcome_aggregator);

        let message = ProcessEnvelopeGrouped {
            group,
            envelope,
            ctx: processing::Context {
                project_info: &project_info,
                ..processing::Context::for_test()
            },
        };

        // Run the full processing pipeline and re-serialize the resulting envelope.
        let Ok(Some(Submit::Output { output, ctx })) = processor.process(message).await else {
            panic!();
        };
        let new_envelope = output.serialize_envelope(ctx).unwrap().accept(|f| f);

        let event_item = new_envelope.items().last().unwrap();
        let annotated_event: Annotated<Event> =
            Annotated::from_json_bytes(&event_item.payload()).unwrap();
        let event = annotated_event.into_value().unwrap();
        let headers = event
            .request
            .into_value()
            .unwrap()
            .headers
            .into_value()
            .unwrap();

        // IP-like data must be masked
        assert_eq!(
            Some(
                "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/********* Safari/537.36"
            ),
            headers.get_header("User-Agent")
        );
        // But we still get correct browser and version number
        let contexts = event.contexts.into_value().unwrap();
        let browser = contexts.0.get("browser").unwrap();
        assert_eq!(
            r#"{"browser":"Chrome 103.0.0","name":"Chrome","version":"103.0.0","type":"browser"}"#,
            browser.to_json().unwrap()
        );
    }
3117
    /// The envelope's dynamic sampling context must be materialized into the
    /// event's `_dsc` field during processing, with the trace id normalized.
    #[tokio::test]
    #[cfg(feature = "processing")]
    async fn test_materialize_dsc() {
        use crate::services::projects::project::PublicKeyConfig;

        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
            .parse()
            .unwrap();
        let request_meta = RequestMeta::new(dsn);
        let mut envelope = Envelope::from_request(None, request_meta);

        // DSC with a dashed trace id; the materialized form drops the dashes.
        let dsc = r#"{
            "trace_id": "00000000-0000-0000-0000-000000000000",
            "public_key": "e12d836b15bb49d7bbf99e64295d995b",
            "sample_rate": "0.2"
        }"#;
        envelope.set_dsc(serde_json::from_str(dsc).unwrap());

        let mut item = Item::new(ItemType::Event);
        item.set_payload(ContentType::Json, r#"{}"#);
        envelope.add_item(item);

        let outcome_aggregator = Addr::dummy();
        let managed_envelope = ManagedEnvelope::new(envelope, outcome_aggregator);

        // The project must know the DSC's public key for sampling info resolution.
        let mut project_info = ProjectInfo::default();
        project_info.public_keys.push(PublicKeyConfig {
            public_key: ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(),
            numeric_id: Some(1),
        });

        let config = serde_json::json!({
            "processing": {
                "enabled": true,
                "kafka_config": [],
            }
        });

        let message = ProcessEnvelopeGrouped {
            group: ProcessingGroup::Error,
            envelope: managed_envelope,
            ctx: processing::Context {
                config: &Config::from_json_value(config.clone()).unwrap(),
                project_info: &project_info,
                sampling_project_info: Some(&project_info),
                ..processing::Context::for_test()
            },
        };

        let processor = create_test_processor(Config::from_json_value(config).unwrap()).await;
        let Ok(Some(Submit::Output { output, ctx })) = processor.process(message).await else {
            panic!();
        };
        let envelope = output.serialize_envelope(ctx).unwrap();
        let event = envelope
            .get_item_by(|item| item.ty() == &ItemType::Event)
            .unwrap();

        let event = Annotated::<Event>::from_json_bytes(&event.payload()).unwrap();
        insta::assert_debug_snapshot!(event.value().unwrap()._dsc, @r###"
        Object(
            {
                "environment": ~,
                "public_key": String(
                    "e12d836b15bb49d7bbf99e64295d995b",
                ),
                "release": ~,
                "replay_id": ~,
                "sample_rate": String(
                    "0.2",
                ),
                "trace_id": String(
                    "00000000000000000000000000000000",
                ),
                "transaction": ~,
            },
        )
        "###);
    }
3197
3198    fn capture_test_event(transaction_name: &str, source: TransactionSource) -> Vec<String> {
3199        let mut event = Annotated::<Event>::from_json(
3200            r#"
3201            {
3202                "type": "transaction",
3203                "transaction": "/foo/",
3204                "timestamp": 946684810.0,
3205                "start_timestamp": 946684800.0,
3206                "contexts": {
3207                    "trace": {
3208                        "trace_id": "4c79f60c11214eb38604f4ae0781bfb2",
3209                        "span_id": "fa90fdead5f74053",
3210                        "op": "http.server",
3211                        "type": "trace"
3212                    }
3213                },
3214                "transaction_info": {
3215                    "source": "url"
3216                }
3217            }
3218            "#,
3219        )
3220        .unwrap();
3221        let e = event.value_mut().as_mut().unwrap();
3222        e.transaction.set_value(Some(transaction_name.into()));
3223
3224        e.transaction_info
3225            .value_mut()
3226            .as_mut()
3227            .unwrap()
3228            .source
3229            .set_value(Some(source));
3230
3231        relay_statsd::with_capturing_test_client(|| {
3232            utils::log_transaction_name_metrics(&mut event, |event| {
3233                let config = NormalizationConfig {
3234                    transaction_name_config: TransactionNameConfig {
3235                        rules: &[TransactionNameRule {
3236                            pattern: LazyGlob::new("/foo/*/**".to_owned()),
3237                            expiry: DateTime::<Utc>::MAX_UTC,
3238                            redaction: RedactionRule::Replace {
3239                                substitution: "*".to_owned(),
3240                            },
3241                        }],
3242                    },
3243                    ..Default::default()
3244                };
3245                relay_event_normalization::normalize_event(event, &config)
3246            });
3247        })
3248    }
3249
3250    #[test]
3251    fn test_log_transaction_metrics_none() {
3252        let captures = capture_test_event("/nothing", TransactionSource::Url);
3253        insta::assert_debug_snapshot!(captures, @r###"
3254        [
3255            "event.transaction_name_changes:1|c|#source_in:url,changes:none,source_out:sanitized,is_404:false",
3256        ]
3257        "###);
3258    }
3259
3260    #[test]
3261    fn test_log_transaction_metrics_rule() {
3262        let captures = capture_test_event("/foo/john/denver", TransactionSource::Url);
3263        insta::assert_debug_snapshot!(captures, @r###"
3264        [
3265            "event.transaction_name_changes:1|c|#source_in:url,changes:rule,source_out:sanitized,is_404:false",
3266        ]
3267        "###);
3268    }
3269
3270    #[test]
3271    fn test_log_transaction_metrics_pattern() {
3272        let captures = capture_test_event("/something/12345", TransactionSource::Url);
3273        insta::assert_debug_snapshot!(captures, @r###"
3274        [
3275            "event.transaction_name_changes:1|c|#source_in:url,changes:pattern,source_out:sanitized,is_404:false",
3276        ]
3277        "###);
3278    }
3279
3280    #[test]
3281    fn test_log_transaction_metrics_both() {
3282        let captures = capture_test_event("/foo/john/12345", TransactionSource::Url);
3283        insta::assert_debug_snapshot!(captures, @r###"
3284        [
3285            "event.transaction_name_changes:1|c|#source_in:url,changes:both,source_out:sanitized,is_404:false",
3286        ]
3287        "###);
3288    }
3289
3290    #[test]
3291    fn test_log_transaction_metrics_no_match() {
3292        let captures = capture_test_event("/foo/john/12345", TransactionSource::Route);
3293        insta::assert_debug_snapshot!(captures, @r###"
3294        [
3295            "event.transaction_name_changes:1|c|#source_in:route,changes:none,source_out:route,is_404:false",
3296        ]
3297        "###);
3298    }
3299
3300    #[tokio::test]
3301    async fn test_process_metrics_bucket_metadata() {
3302        let mut token = Cogs::noop().timed(ResourceId::Relay, AppFeature::Unattributed);
3303        let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
3304        let received_at = Utc::now();
3305        let config = Config::default();
3306
3307        let (aggregator, mut aggregator_rx) = Addr::custom();
3308        let processor = create_test_processor_with_addrs(
3309            config,
3310            Addrs {
3311                aggregator,
3312                ..Default::default()
3313            },
3314        )
3315        .await;
3316
3317        let mut item = Item::new(ItemType::Statsd);
3318        item.set_payload(ContentType::Text, "spans/foo:3182887624:4267882815|s");
3319        for (source, expected_received_at) in [
3320            (
3321                BucketSource::External,
3322                Some(UnixTimestamp::from_datetime(received_at).unwrap()),
3323            ),
3324            (BucketSource::Internal, None),
3325        ] {
3326            let message = ProcessMetrics {
3327                data: MetricData::Raw(vec![item.clone()]),
3328                project_key,
3329                source,
3330                received_at,
3331                sent_at: Some(Utc::now()),
3332            };
3333            processor.handle_process_metrics(&mut token, message);
3334
3335            let Aggregator::MergeBuckets(merge_buckets) = aggregator_rx.recv().await.unwrap();
3336            let buckets = merge_buckets.buckets;
3337            assert_eq!(buckets.len(), 1);
3338            assert_eq!(buckets[0].metadata.received_at, expected_received_at);
3339        }
3340    }
3341
    // A batched metrics payload containing buckets for two project keys must
    // fan out into one `MergeBuckets` message per project.
    #[tokio::test]
    async fn test_process_batched_metrics() {
        let mut token = Cogs::noop().timed(ResourceId::Relay, AppFeature::Unattributed);
        let received_at = Utc::now();
        let config = Config::default();

        // Intercept aggregator messages so the test can inspect what is merged.
        let (aggregator, mut aggregator_rx) = Addr::custom();
        let processor = create_test_processor_with_addrs(
            config,
            Addrs {
                aggregator,
                ..Default::default()
            },
        )
        .await;

        // Raw batched payload: buckets keyed by two different project keys.
        let payload = r#"{
    "buckets": {
        "11111111111111111111111111111111": [
            {
                "timestamp": 1615889440,
                "width": 0,
                "name": "d:custom/endpoint.response_time@millisecond",
                "type": "d",
                "value": [
                  68.0
                ],
                "tags": {
                  "route": "user_index"
                }
            }
        ],
        "22222222222222222222222222222222": [
            {
                "timestamp": 1615889440,
                "width": 0,
                "name": "d:custom/endpoint.cache_rate@none",
                "type": "d",
                "value": [
                  36.0
                ]
            }
        ]
    }
}
"#;
        let message = ProcessBatchedMetrics {
            payload: Bytes::from(payload),
            // Internal source: the snapshot below expects `received_at: None`.
            source: BucketSource::Internal,
            received_at,
            sent_at: Some(Utc::now()),
        };
        processor.handle_process_batched_metrics(&mut token, message);

        // Expect exactly two `MergeBuckets` messages, one per project key.
        let Aggregator::MergeBuckets(mb1) = aggregator_rx.recv().await.unwrap();
        let Aggregator::MergeBuckets(mb2) = aggregator_rx.recv().await.unwrap();

        // Sort by project key so the snapshot is deterministic.
        let mut messages = vec![mb1, mb2];
        messages.sort_by_key(|mb| mb.project_key);

        let actual = messages
            .into_iter()
            .map(|mb| (mb.project_key, mb.buckets))
            .collect::<Vec<_>>();

        assert_debug_snapshot!(actual, @r###"
        [
            (
                ProjectKey("11111111111111111111111111111111"),
                [
                    Bucket {
                        timestamp: UnixTimestamp(1615889440),
                        width: 0,
                        name: MetricName(
                            "d:custom/endpoint.response_time@millisecond",
                        ),
                        value: Distribution(
                            [
                                68.0,
                            ],
                        ),
                        tags: {
                            "route": "user_index",
                        },
                        metadata: BucketMetadata {
                            merges: 1,
                            received_at: None,
                            extracted_from_indexed: false,
                        },
                    },
                ],
            ),
            (
                ProjectKey("22222222222222222222222222222222"),
                [
                    Bucket {
                        timestamp: UnixTimestamp(1615889440),
                        width: 0,
                        name: MetricName(
                            "d:custom/endpoint.cache_rate@none",
                        ),
                        value: Distribution(
                            [
                                36.0,
                            ],
                        ),
                        tags: {},
                        metadata: BucketMetadata {
                            merges: 1,
                            received_at: None,
                            extracted_from_indexed: false,
                        },
                    },
                ],
            ),
        ]
        "###);
    }
3460}