// relay_server/services/processor.rs

use std::borrow::Cow;
use std::collections::{BTreeMap, BTreeSet, HashMap};
use std::error::Error;
use std::fmt::{Debug, Display};
use std::future::Future;
use std::io::Write;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;

use anyhow::Context;
use brotli::CompressorWriter as BrotliEncoder;
use bytes::Bytes;
use chrono::{DateTime, Utc};
use flate2::Compression;
use flate2::write::{GzEncoder, ZlibEncoder};
use futures::FutureExt;
use futures::future::BoxFuture;
use relay_base_schema::project::{ProjectId, ProjectKey};
use relay_cogs::{AppFeature, Cogs, FeatureWeights, ResourceId, Token};
use relay_common::time::UnixTimestamp;
use relay_config::{Config, HttpEncoding, UpstreamDescriptor};
use relay_dynamic_config::Feature;
use relay_event_normalization::{ClockDriftProcessor, GeoIpLookup};
use relay_event_schema::processor::ProcessingAction;
use relay_event_schema::protocol::{ClientReport, EventId, SpanV2};
use relay_filter::FilterStatKey;
use relay_metrics::{Bucket, BucketMetadata, BucketView, BucketsView, MetricNamespace};
use relay_quotas::{DataCategory, RateLimits, Scoping};
use relay_sampling::evaluation::{ReservoirCounters, SamplingDecision};
use relay_statsd::metric;
use relay_system::{Addr, FromMessage, NoResponse, Service};
use relay_threading::AsyncPool;
use reqwest::header;
use smallvec::{SmallVec, smallvec};
use symbolic_unreal::{Unreal4Error, Unreal4ErrorKind};
use zstd::stream::Encoder as ZstdEncoder;

use crate::envelope::{self, ContentType, Envelope, EnvelopeError, Item, ItemContainer, ItemType};
use crate::extractors::{PartialDsn, RequestMeta, RequestTrust};
use crate::integrations::Integration;
use crate::managed::ManagedEnvelope;
use crate::metrics::{MetricOutcomes, MetricsLimiter, MinimalTrackableBucket};
use crate::metrics_extraction::ExtractedMetrics;
use crate::processing::attachments::AttachmentProcessor;
use crate::processing::check_ins::CheckInsProcessor;
use crate::processing::client_reports::ClientReportsProcessor;
use crate::processing::errors::{ErrorsProcessor, SwitchProcessingError};
use crate::processing::forward_unknown::ForwardUnknownProcessor;
use crate::processing::legacy_spans::LegacySpansProcessor;
use crate::processing::logs::LogsProcessor;
use crate::processing::profile_chunks::ProfileChunksProcessor;
use crate::processing::profiles::ProfilesProcessor;
use crate::processing::replays::ReplaysProcessor;
use crate::processing::sessions::SessionsProcessor;
use crate::processing::spans::SpansProcessor;
use crate::processing::trace_attachments::TraceAttachmentsProcessor;
use crate::processing::trace_metrics::TraceMetricsProcessor;
use crate::processing::transactions::TransactionProcessor;
use crate::processing::user_reports::UserReportsProcessor;
use crate::processing::{Forward as _, ForwardContext, Output, Outputs, QuotaRateLimiter};
use crate::service::ServiceError;
use crate::services::global_config::GlobalConfigHandle;
use crate::services::metrics::{Aggregator, FlushBuckets, MergeBuckets, ProjectBuckets};
use crate::services::outcome::{DiscardItemType, DiscardReason, Outcome, TrackOutcome};
use crate::services::projects::cache::ProjectCacheHandle;
use crate::services::projects::project::{ProjectInfo, ProjectState};
use crate::services::upstream::{
    SendRequest, Sign, SignatureType, UpstreamRelay, UpstreamRequest, UpstreamRequestError,
};
use crate::statsd::{RelayCounters, RelayDistributions, RelayTimers};
use crate::utils;
use crate::{http, processing};
#[cfg(feature = "processing")]
use {
    crate::services::objectstore::Objectstore,
    crate::services::store::Store,
    itertools::Itertools,
    relay_dynamic_config::GlobalConfig,
    relay_quotas::{Quota, RateLimitingError, RedisRateLimiter},
    relay_redis::RedisClients,
    std::time::Instant,
};

mod metrics;

/// The minimum clock drift for correction to apply.
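///
/// Offsets below this threshold are not corrected.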
pub const MINIMUM_CLOCK_DRIFT: Duration = Duration::from_secs(55 * 60);

/// An error returned when a [`ProcessingGroup`] cannot be converted into a
/// specific marker type.
#[derive(Debug)]
pub struct GroupTypeError;

impl Display for GroupTypeError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("failed to convert processing group into corresponding type")
    }
}

impl std::error::Error for GroupTypeError {}

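/// Declares a marker type for one or more [`ProcessingGroup`] variants and
/// generates the conversions between them.
///
/// A sketch of the generated behavior (illustration only, not a doc test):
///
/// ```ignore
/// let group: ProcessingGroup = TransactionGroup.into();
/// assert!(matches!(group, ProcessingGroup::Transaction));
/// assert!(TransactionGroup::try_from(ProcessingGroup::Error).is_err());
/// ```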
macro_rules! processing_group {
    ($ty:ident, $variant:ident$(, $($other:ident),+)?) => {
        #[derive(Clone, Copy, Debug)]
        pub struct $ty;

        impl From<$ty> for ProcessingGroup {
            fn from(_: $ty) -> Self {
                ProcessingGroup::$variant
            }
        }

        impl TryFrom<ProcessingGroup> for $ty {
            type Error = GroupTypeError;

            fn try_from(value: ProcessingGroup) -> Result<Self, Self::Error> {
                if matches!(value, ProcessingGroup::$variant) {
                    return Ok($ty);
                }
                $($(
                    if matches!(value, ProcessingGroup::$other) {
                        return Ok($ty);
                    }
                )+)?
                Err(GroupTypeError)
            }
        }
    };
}

processing_group!(TransactionGroup, Transaction);
processing_group!(ErrorGroup, Error);
processing_group!(SessionGroup, Session);
processing_group!(ClientReportGroup, ClientReport);
processing_group!(ReplayGroup, Replay);
processing_group!(CheckInGroup, CheckIn);
processing_group!(TraceMetricGroup, TraceMetric);
processing_group!(SpanGroup, Span);
processing_group!(ProfileChunkGroup, ProfileChunk);
processing_group!(ForwardUnknownGroup, ForwardUnknown);
processing_group!(Ungrouped, Ungrouped);

/// Describes the groups of the processable items.
#[derive(Clone, Copy, Debug)]
pub enum ProcessingGroup {
    /// All the transaction related items.
    ///
    /// Includes transactions, related attachments, and profiles.
    Transaction,
    /// All the items which require (have or create) events.
    ///
    /// This includes: errors, NEL, security reports, user reports, and some of the
    /// attachments.
    Error,
    /// Session events.
    Session,
    /// Standalone attachments.
    ///
    /// Attachments that are sent without an item that creates an event in the same envelope.
    StandaloneAttachments,
    /// Standalone user reports.
    ///
    /// User reports that are sent without an item that creates an event in the same envelope.
    StandaloneUserReports,
    /// Standalone profiles.
    ///
    /// Profiles which had their transaction sampled.
    StandaloneProfiles,
    /// Outcomes.
    ClientReport,
    /// Replays and ReplayRecordings.
    Replay,
    /// Crons.
    CheckIn,
    /// Logs.
    Log,
    /// Trace metrics.
    TraceMetric,
    /// Spans.
    Span,
    /// Span V2 spans.
    SpanV2,
    /// Profile chunks.
    ProfileChunk,
    /// V2 attachments without a span / log association.
    TraceAttachment,
    /// Unknown item types will be forwarded upstream (to a processing Relay), which
    /// decides what to do with them.
    ForwardUnknown,
    /// All the items in the envelope that could not be grouped.
    Ungrouped,
}

impl ProcessingGroup {
    /// Splits the provided envelope into a list of `(group, envelope)` pairs.
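    ///
    /// A sketch of the intended use (this mirrors the handling loop in
    /// [`EnvelopeProcessorService`]; envelope construction is elided):
    ///
    /// ```ignore
    /// for (group, envelope) in ProcessingGroup::split_envelope(envelope, &project_info) {
    ///     relay_log::trace!("routing envelope to group {}", group.variant());
    /// }
    /// ```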
    fn split_envelope(
        mut envelope: Envelope,
        project_info: &ProjectInfo,
    ) -> SmallVec<[(Self, Box<Envelope>); 3]> {
        let headers = envelope.headers().clone();
        let mut grouped_envelopes = smallvec![];

        // Extract replays.
        let replay_items = envelope.take_items_by(|item| {
            matches!(
                item.ty(),
                &ItemType::ReplayEvent | &ItemType::ReplayRecording | &ItemType::ReplayVideo
            )
        });
        if !replay_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Replay,
                Envelope::from_parts(headers.clone(), replay_items),
            ))
        }

        // Keep all the sessions together in one envelope.
        let session_items = envelope
            .take_items_by(|item| matches!(item.ty(), &ItemType::Session | &ItemType::Sessions));
        if !session_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Session,
                Envelope::from_parts(headers.clone(), session_items),
            ))
        }

        let span_v2_items = envelope.take_items_by(|item| {
            let exp_feature = project_info.has_feature(Feature::SpanV2ExperimentalProcessing);

            ItemContainer::<SpanV2>::is_container(item)
                || matches!(item.integration(), Some(Integration::Spans(_)))
                // Process standalone spans (v1) via the v2 pipeline.
                || (exp_feature && matches!(item.ty(), &ItemType::Span))
                || (exp_feature && item.is_span_attachment())
        });

        if !span_v2_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::SpanV2,
                Envelope::from_parts(headers.clone(), span_v2_items),
            ))
        }

        // Extract spans.
        let span_items = envelope.take_items_by(|item| matches!(item.ty(), &ItemType::Span));
        if !span_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Span,
                Envelope::from_parts(headers.clone(), span_items),
            ))
        }

        // Extract logs.
        let logs_items = envelope.take_items_by(|item| {
            matches!(item.ty(), &ItemType::Log)
                || matches!(item.integration(), Some(Integration::Logs(_)))
        });
        if !logs_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Log,
                Envelope::from_parts(headers.clone(), logs_items),
            ))
        }

        // Extract trace metrics.
        let trace_metric_items =
            envelope.take_items_by(|item| matches!(item.ty(), &ItemType::TraceMetric));
        if !trace_metric_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::TraceMetric,
                Envelope::from_parts(headers.clone(), trace_metric_items),
            ))
        }

        // Extract profile chunks.
        let profile_chunk_items =
            envelope.take_items_by(|item| matches!(item.ty(), &ItemType::ProfileChunk));
        if !profile_chunk_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::ProfileChunk,
                Envelope::from_parts(headers.clone(), profile_chunk_items),
            ))
        }

        // Extract trace attachments.
        let trace_attachment_items = envelope.take_items_by(Item::is_trace_attachment);
        if !trace_attachment_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::TraceAttachment,
                Envelope::from_parts(headers.clone(), trace_attachment_items),
            ))
        }

        // Attachments, user reports, and profiles only count as standalone if the
        // envelope creates no event.
        if !envelope.items().any(Item::creates_event) {
            // Extract the standalone attachments.
            let standalone_attachments = envelope
                .take_items_by(|i| i.requires_event() && matches!(i.ty(), ItemType::Attachment));
            if !standalone_attachments.is_empty() {
                grouped_envelopes.push((
                    ProcessingGroup::StandaloneAttachments,
                    Envelope::from_parts(headers.clone(), standalone_attachments),
                ))
            }

            // Extract the standalone user reports.
            let standalone_user_reports =
                envelope.take_items_by(|i| matches!(i.ty(), ItemType::UserReport));
            if !standalone_user_reports.is_empty() {
                grouped_envelopes.push((
                    ProcessingGroup::StandaloneUserReports,
                    Envelope::from_parts(headers.clone(), standalone_user_reports),
                ));
            }

            // Extract the standalone profiles.
            let standalone_profiles =
                envelope.take_items_by(|i| matches!(i.ty(), ItemType::Profile));
            if !standalone_profiles.is_empty() {
                grouped_envelopes.push((
                    ProcessingGroup::StandaloneProfiles,
                    Envelope::from_parts(headers.clone(), standalone_profiles),
                ));
            }
        }

        // Make sure we create separate envelopes for each `RawSecurity` report.
        let security_reports_items = envelope
            .take_items_by(|i| matches!(i.ty(), &ItemType::RawSecurity))
            .into_iter()
            .map(|item| {
                let headers = headers.clone();
                let items: SmallVec<[Item; 3]> = smallvec![item.clone()];
                let mut envelope = Envelope::from_parts(headers, items);
                envelope.set_event_id(EventId::new());
                (ProcessingGroup::Error, envelope)
            });
        grouped_envelopes.extend(security_reports_items);

        // Extract all the items which require an event into a separate envelope.
        let require_event_items = envelope.take_items_by(Item::requires_event);
        if !require_event_items.is_empty() {
            let group = if require_event_items
                .iter()
                .any(|item| matches!(item.ty(), &ItemType::Transaction | &ItemType::Profile))
            {
                ProcessingGroup::Transaction
            } else {
                ProcessingGroup::Error
            };

            grouped_envelopes.push((
                group,
                Envelope::from_parts(headers.clone(), require_event_items),
            ))
        }

        // Get the rest of the envelopes, one per item.
        let envelopes = envelope.items_mut().map(|item| {
            let headers = headers.clone();
            let items: SmallVec<[Item; 3]> = smallvec![item.clone()];
            let envelope = Envelope::from_parts(headers, items);
            let item_type = item.ty();
            let group = if matches!(item_type, &ItemType::CheckIn) {
                ProcessingGroup::CheckIn
            } else if matches!(item_type, &ItemType::ClientReport) {
                ProcessingGroup::ClientReport
            } else if matches!(item_type, &ItemType::Unknown(_)) {
                ProcessingGroup::ForwardUnknown
            } else {
                // Cannot group this item type.
                ProcessingGroup::Ungrouped
            };

            (group, envelope)
        });
        grouped_envelopes.extend(envelopes);

        grouped_envelopes
    }

    /// Returns the name of the group.
    pub fn variant(&self) -> &'static str {
        match self {
            ProcessingGroup::Transaction => "transaction",
            ProcessingGroup::Error => "error",
            ProcessingGroup::Session => "session",
            ProcessingGroup::StandaloneAttachments => "standalone_attachment",
            ProcessingGroup::StandaloneUserReports => "standalone_user_reports",
            ProcessingGroup::StandaloneProfiles => "standalone_profiles",
            ProcessingGroup::ClientReport => "client_report",
            ProcessingGroup::Replay => "replay",
            ProcessingGroup::CheckIn => "check_in",
            ProcessingGroup::Log => "log",
            ProcessingGroup::TraceMetric => "trace_metric",
            ProcessingGroup::Span => "span",
            ProcessingGroup::SpanV2 => "span_v2",
            ProcessingGroup::ProfileChunk => "profile_chunk",
            ProcessingGroup::TraceAttachment => "trace_attachment",
            ProcessingGroup::ForwardUnknown => "forward_unknown",
            ProcessingGroup::Ungrouped => "ungrouped",
        }
    }
}

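/// Maps each processing group to the [`AppFeature`] used for COGS cost
/// attribution; `handle_process_envelope` times each group's work via
/// `cogs.timed(ResourceId::Relay, AppFeature::from(group))`.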
impl From<ProcessingGroup> for AppFeature {
    fn from(value: ProcessingGroup) -> Self {
        match value {
            ProcessingGroup::Transaction => AppFeature::Transactions,
            ProcessingGroup::Error => AppFeature::Errors,
            ProcessingGroup::Session => AppFeature::Sessions,
            ProcessingGroup::StandaloneAttachments => AppFeature::UnattributedEnvelope,
            ProcessingGroup::StandaloneUserReports => AppFeature::UserReports,
            ProcessingGroup::StandaloneProfiles => AppFeature::Profiles,
            ProcessingGroup::ClientReport => AppFeature::ClientReports,
            ProcessingGroup::Replay => AppFeature::Replays,
            ProcessingGroup::CheckIn => AppFeature::CheckIns,
            ProcessingGroup::Log => AppFeature::Logs,
            ProcessingGroup::TraceMetric => AppFeature::TraceMetrics,
            ProcessingGroup::Span => AppFeature::Spans,
            ProcessingGroup::SpanV2 => AppFeature::Spans,
            ProcessingGroup::ProfileChunk => AppFeature::Profiles,
            ProcessingGroup::TraceAttachment => AppFeature::TraceAttachments,
            ProcessingGroup::ForwardUnknown => AppFeature::UnattributedEnvelope,
            ProcessingGroup::Ungrouped => AppFeature::UnattributedEnvelope,
        }
    }
}

/// An error returned when handling [`ProcessEnvelope`].
#[derive(Debug, thiserror::Error)]
pub enum ProcessingError {
    #[error("invalid json in event")]
    InvalidJson(#[source] serde_json::Error),

    #[error("invalid message pack event payload")]
    InvalidMsgpack(#[from] rmp_serde::decode::Error),

    #[error("invalid unreal crash report")]
    InvalidUnrealReport(#[source] Unreal4Error),

    #[error("event payload too large")]
    PayloadTooLarge(DiscardItemType),

    #[error("invalid transaction event")]
    InvalidTransaction,

    #[error("the item is not allowed/supported in this envelope")]
    UnsupportedItem,

    #[error("envelope processor failed")]
    ProcessingFailed(#[from] ProcessingAction),

    #[error("duplicate {0} in event")]
    DuplicateItem(ItemType),

    #[error("failed to extract event payload")]
    NoEventPayload,

    #[error("missing project id in DSN")]
    MissingProjectId,

    #[error("invalid security report type: {0:?}")]
    InvalidSecurityType(Bytes),

    #[error("unsupported security report type")]
    UnsupportedSecurityType,

    #[error("invalid security report")]
    InvalidSecurityReport(#[source] serde_json::Error),

    #[error("event filtered with reason: {0:?}")]
    EventFiltered(FilterStatKey),

    #[error("could not serialize event payload")]
    SerializeFailed(#[source] serde_json::Error),

    #[cfg(feature = "processing")]
    #[error("failed to apply quotas")]
    QuotasFailed(#[from] RateLimitingError),

    #[error("nintendo switch dying message processing failed: {0:?}")]
    InvalidNintendoDyingMessage(#[source] SwitchProcessingError),

    #[cfg(all(sentry, feature = "processing"))]
    #[error("playstation dump processing failed: {0}")]
    InvalidPlaystationDump(String),

    #[error("processing group does not match specific processor")]
    ProcessingGroupMismatch,

    #[error("new processing pipeline failed")]
    ProcessingFailure,

    #[cfg(feature = "processing")]
    #[error("invalid attachment reference")]
    InvalidAttachmentRef,

    #[error("could not determine processing group for envelope items")]
    NoProcessingGroup,
}

impl ProcessingError {
    /// Maps the error to an [`Outcome`] to emit, if any.
    pub fn to_outcome(&self) -> Option<Outcome> {
        match self {
            Self::PayloadTooLarge(payload_type) => {
                Some(Outcome::Invalid(DiscardReason::TooLarge(*payload_type)))
            }
            Self::InvalidJson(_) => Some(Outcome::Invalid(DiscardReason::InvalidJson)),
            Self::InvalidMsgpack(_) => Some(Outcome::Invalid(DiscardReason::InvalidMsgpack)),
            Self::InvalidSecurityType(_) => {
                Some(Outcome::Invalid(DiscardReason::SecurityReportType))
            }
            Self::UnsupportedItem => Some(Outcome::Invalid(DiscardReason::InvalidEnvelope)),
            Self::InvalidSecurityReport(_) => Some(Outcome::Invalid(DiscardReason::SecurityReport)),
            Self::UnsupportedSecurityType => Some(Outcome::Filtered(FilterStatKey::InvalidCsp)),
            Self::InvalidTransaction => Some(Outcome::Invalid(DiscardReason::InvalidTransaction)),
            Self::DuplicateItem(_) => Some(Outcome::Invalid(DiscardReason::DuplicateItem)),
            Self::NoEventPayload => Some(Outcome::Invalid(DiscardReason::NoEventPayload)),
            Self::InvalidNintendoDyingMessage(_) => Some(Outcome::Invalid(DiscardReason::Payload)),
            #[cfg(all(sentry, feature = "processing"))]
            Self::InvalidPlaystationDump(_) => Some(Outcome::Invalid(DiscardReason::Payload)),
            Self::InvalidUnrealReport(err) if err.kind() == Unreal4ErrorKind::BadCompression => {
                Some(Outcome::Invalid(DiscardReason::InvalidCompression))
            }
            Self::InvalidUnrealReport(_) => Some(Outcome::Invalid(DiscardReason::ProcessUnreal)),
            Self::SerializeFailed(_) | Self::ProcessingFailed(_) => {
                Some(Outcome::Invalid(DiscardReason::Internal))
            }
            #[cfg(feature = "processing")]
            Self::QuotasFailed(_) => Some(Outcome::Invalid(DiscardReason::Internal)),
            Self::MissingProjectId => None,
            Self::EventFiltered(key) => Some(Outcome::Filtered(key.clone())),
            Self::ProcessingGroupMismatch => Some(Outcome::Invalid(DiscardReason::Internal)),
            // Outcomes are already emitted in the new processing pipeline.
            Self::ProcessingFailure => None,
            #[cfg(feature = "processing")]
            Self::InvalidAttachmentRef => {
                Some(Outcome::Invalid(DiscardReason::InvalidAttachmentRef))
            }
            Self::NoProcessingGroup => Some(Outcome::Invalid(DiscardReason::Internal)),
        }
    }

    /// Returns `true` if the error maps to an unexpected outcome.
    fn is_unexpected(&self) -> bool {
        self.to_outcome()
            .is_some_and(|outcome| outcome.is_unexpected())
    }
}

impl From<Unreal4Error> for ProcessingError {
    fn from(err: Unreal4Error) -> Self {
        match err.kind() {
            Unreal4ErrorKind::TooLarge => Self::PayloadTooLarge(ItemType::UnrealReport.into()),
            _ => ProcessingError::InvalidUnrealReport(err),
        }
    }
}

/// A container for extracted metrics during processing.
///
/// The container enforces that the extracted metrics are correctly tagged
/// with the dynamic sampling decision.
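///
/// A minimal sketch (illustration only, not a doc test):
///
/// ```ignore
/// let mut extracted = ProcessingExtractedMetrics::new();
/// // Tags each bucket with `extracted_from_indexed = true` for kept samples.
/// extracted.extend(metrics, Some(SamplingDecision::Keep));
/// let metrics = extracted.into_inner();
/// ```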
#[derive(Debug)]
pub struct ProcessingExtractedMetrics {
    metrics: ExtractedMetrics,
}

impl ProcessingExtractedMetrics {
    /// Creates an empty container.
    pub fn new() -> Self {
        Self {
            metrics: ExtractedMetrics::default(),
        }
    }

    /// Returns the contained [`ExtractedMetrics`].
    pub fn into_inner(self) -> ExtractedMetrics {
        self.metrics
    }

    /// Extends the contained metrics with [`ExtractedMetrics`].
    pub fn extend(
        &mut self,
        extracted: ExtractedMetrics,
        sampling_decision: Option<SamplingDecision>,
    ) {
        self.extend_project_metrics(extracted.project_metrics, sampling_decision);
        self.extend_sampling_metrics(extracted.sampling_metrics, sampling_decision);
    }

    /// Extends the contained project metrics.
    pub fn extend_project_metrics<I>(
        &mut self,
        buckets: I,
        sampling_decision: Option<SamplingDecision>,
    ) where
        I: IntoIterator<Item = Bucket>,
    {
        self.metrics
            .project_metrics
            .extend(buckets.into_iter().map(|mut bucket| {
                bucket.metadata.extracted_from_indexed =
                    sampling_decision == Some(SamplingDecision::Keep);
                bucket
            }));
    }

    /// Extends the contained sampling metrics.
    pub fn extend_sampling_metrics<I>(
        &mut self,
        buckets: I,
        sampling_decision: Option<SamplingDecision>,
    ) where
        I: IntoIterator<Item = Bucket>,
    {
        self.metrics
            .sampling_metrics
            .extend(buckets.into_iter().map(|mut bucket| {
                bucket.metadata.extracted_from_indexed =
                    sampling_decision == Some(SamplingDecision::Keep);
                bucket
            }));
    }
}

/// Sends extracted metrics to the metrics [`Aggregator`].
fn send_metrics(
    metrics: ExtractedMetrics,
    project_key: ProjectKey,
    sampling_key: Option<ProjectKey>,
    aggregator: &Addr<Aggregator>,
) {
    let ExtractedMetrics {
        project_metrics,
        sampling_metrics,
    } = metrics;

    if !project_metrics.is_empty() {
        aggregator.send(MergeBuckets {
            project_key,
            buckets: project_metrics,
        });
    }

    if !sampling_metrics.is_empty() {
        // If no sampling project state is available, we associate the sampling
        // metrics with the current project.
        //
        // project_without_tracing         -> metrics go to self
        // dependent_project_with_tracing  -> metrics go to root
        // root_project_with_tracing       -> metrics go to root == self
        let sampling_project_key = sampling_key.unwrap_or(project_key);
        aggregator.send(MergeBuckets {
            project_key: sampling_project_key,
            buckets: sampling_metrics,
        });
    }
}

/// Applies processing to all contents of the given envelope.
///
/// Depending on the contents of the envelope and Relay's mode, this includes:
///
///  - Basic normalization and validation for all item types.
///  - Clock drift correction if the required `sent_at` header is present.
///  - Expansion of certain item types (e.g. unreal).
///  - Store normalization for event payloads in processing mode.
///  - Rate limiters and inbound filters on events in processing mode.
#[derive(Debug)]
pub struct ProcessEnvelope {
    /// Envelope to process.
    pub envelope: ManagedEnvelope,
    /// The project info.
    pub project_info: Arc<ProjectInfo>,
    /// Currently active cached rate limits for this project.
    pub rate_limits: Arc<RateLimits>,
    /// Root sampling project info.
    pub sampling_project_info: Option<Arc<ProjectInfo>>,
    /// Sampling reservoir counters.
    pub reservoir_counters: ReservoirCounters,
}

/// Like a [`ProcessEnvelope`], but with an envelope which has been grouped.
#[derive(Debug)]
struct ProcessEnvelopeGrouped<'a> {
    /// The group the envelope belongs to.
    pub group: ProcessingGroup,
    /// Envelope to process.
    pub envelope: ManagedEnvelope,
    /// The processing context.
    pub ctx: processing::Context<'a>,
}

/// Parses a list of metrics or metric buckets and pushes them to the project's aggregator.
///
/// This parses and validates the metrics:
///  - For [`Metrics`](ItemType::Statsd), each metric is parsed separately, and invalid metrics are
///    ignored independently.
///  - For [`MetricBuckets`](ItemType::MetricBuckets), the entire list of buckets is parsed and
///    dropped together on parsing failure.
///  - Other envelope items will be ignored with an error message.
///
/// Additionally, processing applies clock drift correction using the system clock of this Relay, if
/// the Envelope specifies the [`sent_at`](Envelope::sent_at) header.
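///
/// A sketch of constructing this message (illustration only; `items`, `meta`,
/// and `envelope` are assumed to be in scope):
///
/// ```ignore
/// let message = ProcessMetrics {
///     data: MetricData::Raw(items),
///     project_key: meta.public_key(),
///     source: BucketSource::from_meta(&meta),
///     received_at: Utc::now(),
///     sent_at: envelope.sent_at(),
/// };
/// ```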
#[derive(Debug)]
pub struct ProcessMetrics {
    /// A list of metric items.
    pub data: MetricData,
    /// The target project.
    pub project_key: ProjectKey,
    /// Whether to keep or reset the metric metadata.
    pub source: BucketSource,
    /// The wall clock time at which the request was received.
    pub received_at: DateTime<Utc>,
    /// The value of the Envelope's [`sent_at`](Envelope::sent_at) header for clock drift
    /// correction.
    pub sent_at: Option<DateTime<Utc>>,
}

/// Raw unparsed metric data.
#[derive(Debug)]
pub enum MetricData {
    /// Raw data, unparsed envelope items.
    Raw(Vec<Item>),
    /// Already parsed buckets but unprocessed.
    Parsed(Vec<Bucket>),
}

impl MetricData {
    /// Consumes the metric data and parses the contained buckets.
    ///
    /// If the contained data is already parsed, the buckets are returned unchanged.
    /// Raw buckets are parsed and created with the passed `timestamp`.
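    ///
    /// A sketch (illustration only, not a doc test):
    ///
    /// ```ignore
    /// let buckets = MetricData::Raw(items).into_buckets(UnixTimestamp::now());
    /// ```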
    fn into_buckets(self, timestamp: UnixTimestamp) -> Vec<Bucket> {
        let items = match self {
            Self::Parsed(buckets) => return buckets,
            Self::Raw(items) => items,
        };

        let mut buckets = Vec::new();
        for item in items {
            let payload = item.payload();
            if item.ty() == &ItemType::Statsd {
                for bucket_result in Bucket::parse_all(&payload, timestamp) {
                    match bucket_result {
                        Ok(bucket) => buckets.push(bucket),
                        Err(error) => relay_log::debug!(
                            error = &error as &dyn Error,
                            "failed to parse metric bucket from statsd format",
                        ),
                    }
                }
            } else if item.ty() == &ItemType::MetricBuckets {
                match serde_json::from_slice::<Vec<Bucket>>(&payload) {
                    Ok(parsed_buckets) => {
                        // Reuse the allocation of `parsed_buckets` if possible.
                        if buckets.is_empty() {
                            buckets = parsed_buckets;
                        } else {
                            buckets.extend(parsed_buckets);
                        }
                    }
                    Err(error) => {
                        relay_log::debug!(
                            error = &error as &dyn Error,
                            "failed to parse metric bucket",
                        );
                        metric!(counter(RelayCounters::MetricBucketsParsingFailed) += 1);
                    }
                }
            } else {
                relay_log::error!(
                    "invalid item of type {} passed to ProcessMetrics",
                    item.ty()
                );
            }
        }
        buckets
    }
}

/// Parses a batched payload of metric buckets, keyed by project, and pushes
/// them to the corresponding aggregators.
///
/// The payload has the form `{"buckets": {"<project_key>": [...]}}`.
#[derive(Debug)]
pub struct ProcessBatchedMetrics {
    /// Metrics payload in JSON format.
    pub payload: Bytes,
    /// Whether to keep or reset the metric metadata.
    pub source: BucketSource,
    /// The wall clock time at which the request was received.
    pub received_at: DateTime<Utc>,
    /// The value of the `sent_at` header for clock drift correction.
    pub sent_at: Option<DateTime<Utc>>,
}

/// Source information about where a metric bucket originates from.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum BucketSource {
    /// The metric bucket originated from an internal Relay use case.
    ///
    /// The metric bucket originates either from within the same Relay
    /// or was accepted coming from another Relay which is registered as
    /// an internal Relay via Relay's configuration.
    Internal,
    /// The metric bucket originated from an untrusted source.
    ///
    /// Managed Relays sending extracted metrics are considered external;
    /// it is a project use case, but it comes from an untrusted source.
    External,
}

impl BucketSource {
    /// Infers the bucket source from [`RequestMeta::request_trust`].
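    ///
    /// A sketch (illustration only, not a doc test; `meta` is assumed to come
    /// from a trusted internal Relay):
    ///
    /// ```ignore
    /// assert_eq!(BucketSource::from_meta(&meta), BucketSource::Internal);
    /// ```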
    pub fn from_meta(meta: &RequestMeta) -> Self {
        match meta.request_trust() {
            RequestTrust::Trusted => Self::Internal,
            RequestTrust::Untrusted => Self::External,
        }
    }
}

/// Sends client reports to the upstream.
#[derive(Debug)]
pub struct SubmitClientReports {
    /// The client reports to be sent.
    pub client_reports: Vec<ClientReport>,
    /// Scoping information for the client reports.
    pub scoping: Scoping,
}

/// CPU-intensive processing tasks for envelopes.
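///
/// Each message type implements [`FromMessage`], so callers send the concrete
/// message and the conversion into this interface is automatic. A sketch
/// (illustration only; `addr: Addr<EnvelopeProcessor>` is assumed):
///
/// ```ignore
/// addr.send(ProcessEnvelope {
///     envelope,
///     project_info,
///     rate_limits,
///     sampling_project_info,
///     reservoir_counters,
/// });
/// ```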
#[derive(Debug)]
pub enum EnvelopeProcessor {
    ProcessEnvelope(Box<ProcessEnvelope>),
    ProcessProjectMetrics(Box<ProcessMetrics>),
    ProcessBatchedMetrics(Box<ProcessBatchedMetrics>),
    FlushBuckets(Box<FlushBuckets>),
    SubmitClientReports(Box<SubmitClientReports>),
}

impl EnvelopeProcessor {
    /// Returns the name of the message variant.
    pub fn variant(&self) -> &'static str {
        match self {
            EnvelopeProcessor::ProcessEnvelope(_) => "ProcessEnvelope",
            EnvelopeProcessor::ProcessProjectMetrics(_) => "ProcessProjectMetrics",
            EnvelopeProcessor::ProcessBatchedMetrics(_) => "ProcessBatchedMetrics",
            EnvelopeProcessor::FlushBuckets(_) => "FlushBuckets",
            EnvelopeProcessor::SubmitClientReports(_) => "SubmitClientReports",
        }
    }
}

impl relay_system::Interface for EnvelopeProcessor {}

impl FromMessage<ProcessEnvelope> for EnvelopeProcessor {
    type Response = NoResponse;

    fn from_message(message: ProcessEnvelope, _: ()) -> Self {
        Self::ProcessEnvelope(Box::new(message))
    }
}

impl FromMessage<ProcessMetrics> for EnvelopeProcessor {
    type Response = NoResponse;

    fn from_message(message: ProcessMetrics, _: ()) -> Self {
        Self::ProcessProjectMetrics(Box::new(message))
    }
}

impl FromMessage<ProcessBatchedMetrics> for EnvelopeProcessor {
    type Response = NoResponse;

    fn from_message(message: ProcessBatchedMetrics, _: ()) -> Self {
        Self::ProcessBatchedMetrics(Box::new(message))
    }
}

impl FromMessage<FlushBuckets> for EnvelopeProcessor {
    type Response = NoResponse;

    fn from_message(message: FlushBuckets, _: ()) -> Self {
        Self::FlushBuckets(Box::new(message))
    }
}

impl FromMessage<SubmitClientReports> for EnvelopeProcessor {
    type Response = NoResponse;

    fn from_message(message: SubmitClientReports, _: ()) -> Self {
        Self::SubmitClientReports(Box::new(message))
    }
}

/// The asynchronous thread pool used for scheduling processing tasks in the processor.
pub type EnvelopeProcessorServicePool = AsyncPool<BoxFuture<'static, ()>>;

/// Service implementing the [`EnvelopeProcessor`] interface.
///
/// This service handles messages in a worker pool with configurable concurrency.
#[derive(Clone)]
pub struct EnvelopeProcessorService {
    inner: Arc<InnerProcessor>,
}

/// Contains the addresses of services that the processor publishes to.
pub struct Addrs {
    pub outcome_aggregator: Addr<TrackOutcome>,
    pub upstream_relay: Addr<UpstreamRelay>,
    #[cfg(feature = "processing")]
    pub objectstore: Option<Addr<Objectstore>>,
    #[cfg(feature = "processing")]
    pub store_forwarder: Option<Addr<Store>>,
    pub aggregator: Addr<Aggregator>,
}

impl Default for Addrs {
    fn default() -> Self {
        Addrs {
            outcome_aggregator: Addr::dummy(),
            upstream_relay: Addr::dummy(),
            #[cfg(feature = "processing")]
            objectstore: None,
            #[cfg(feature = "processing")]
            store_forwarder: None,
            aggregator: Addr::dummy(),
        }
    }
}

struct InnerProcessor {
    pool: EnvelopeProcessorServicePool,
    config: Arc<Config>,
    global_config: GlobalConfigHandle,
    project_cache: ProjectCacheHandle,
    cogs: Cogs,
    addrs: Addrs,
    #[cfg(feature = "processing")]
    rate_limiter: Option<Arc<RedisRateLimiter>>,
    metric_outcomes: MetricOutcomes,
    processing: Processing,
}

/// The per-type processors of the processing pipeline.
struct Processing {
    errors: ErrorsProcessor,
    logs: LogsProcessor,
    trace_metrics: TraceMetricsProcessor,
    spans: SpansProcessor,
    legacy_spans: LegacySpansProcessor,
    check_ins: CheckInsProcessor,
    sessions: SessionsProcessor,
    transactions: TransactionProcessor,
    profile_chunks: ProfileChunksProcessor,
    trace_attachments: TraceAttachmentsProcessor,
    replays: ReplaysProcessor,
    client_reports: ClientReportsProcessor,
    attachments: AttachmentProcessor,
    user_reports: UserReportsProcessor,
    profiles: ProfilesProcessor,
    forward_unknown: ForwardUnknownProcessor,
}

impl EnvelopeProcessorService {
    /// Creates a multi-threaded envelope processor.
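    ///
    /// A sketch of wiring the service at startup (illustration only; the
    /// arguments are assumed to be constructed elsewhere):
    ///
    /// ```ignore
    /// let processor = EnvelopeProcessorService::new(
    ///     pool,
    ///     config,
    ///     global_config,
    ///     project_cache,
    ///     cogs,
    ///     #[cfg(feature = "processing")]
    ///     redis,
    ///     Addrs::default(),
    ///     metric_outcomes,
    /// );
    /// ```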
    #[cfg_attr(feature = "processing", expect(clippy::too_many_arguments))]
    pub fn new(
        pool: EnvelopeProcessorServicePool,
        config: Arc<Config>,
        global_config: GlobalConfigHandle,
        project_cache: ProjectCacheHandle,
        cogs: Cogs,
        #[cfg(feature = "processing")] redis: Option<RedisClients>,
        addrs: Addrs,
        metric_outcomes: MetricOutcomes,
    ) -> Self {
        let geoip_lookup = config
            .geoip_path()
            .and_then(
                |p| match GeoIpLookup::open(p).context(ServiceError::GeoIp) {
                    Ok(geoip) => Some(geoip),
                    Err(err) => {
                        relay_log::error!("failed to open GeoIP db {p:?}: {err:?}");
                        None
                    }
                },
            )
            .unwrap_or_else(GeoIpLookup::empty);

        if let Some(build_epoch) = geoip_lookup.build_epoch() {
            relay_log::info!("Loaded GeoIP database (build: {build_epoch})");
        }

        #[cfg(feature = "processing")]
        let rate_limiter = redis.as_ref().map(|redis| {
            RedisRateLimiter::new(redis.quotas.clone())
                .max_limit(config.max_rate_limit())
                .cache(config.quota_cache_ratio(), config.quota_cache_max())
        });

        let quota_limiter = Arc::new(QuotaRateLimiter::new(
            #[cfg(feature = "processing")]
            project_cache.clone(),
            #[cfg(feature = "processing")]
            rate_limiter.clone(),
        ));
        #[cfg(feature = "processing")]
        let rate_limiter = rate_limiter.map(Arc::new);
        let outcome_aggregator = addrs.outcome_aggregator.clone();
        let inner = InnerProcessor {
            pool,
            global_config,
            project_cache,
            cogs,
            #[cfg(feature = "processing")]
            rate_limiter,
            addrs,
            metric_outcomes,
            processing: Processing {
                errors: ErrorsProcessor::new(Arc::clone(&quota_limiter), geoip_lookup.clone()),
                logs: LogsProcessor::new(Arc::clone(&quota_limiter)),
                trace_metrics: TraceMetricsProcessor::new(Arc::clone(&quota_limiter)),
                spans: SpansProcessor::new(Arc::clone(&quota_limiter), geoip_lookup.clone()),
                legacy_spans: LegacySpansProcessor::new(
                    Arc::clone(&quota_limiter),
                    geoip_lookup.clone(),
                ),
                check_ins: CheckInsProcessor::new(Arc::clone(&quota_limiter)),
                sessions: SessionsProcessor::new(Arc::clone(&quota_limiter)),
                transactions: TransactionProcessor::new(
                    Arc::clone(&quota_limiter),
                    geoip_lookup.clone(),
                    #[cfg(feature = "processing")]
                    redis.map(|r| r.quotas),
                    #[cfg(not(feature = "processing"))]
                    None,
                ),
                profile_chunks: ProfileChunksProcessor::new(Arc::clone(&quota_limiter)),
                trace_attachments: TraceAttachmentsProcessor::new(Arc::clone(&quota_limiter)),
                replays: ReplaysProcessor::new(Arc::clone(&quota_limiter), geoip_lookup),
                client_reports: ClientReportsProcessor::new(outcome_aggregator),
                attachments: AttachmentProcessor::new(Arc::clone(&quota_limiter)),
                user_reports: UserReportsProcessor::new(Arc::clone(&quota_limiter)),
                profiles: ProfilesProcessor::new(quota_limiter),
                forward_unknown: ForwardUnknownProcessor::new(),
            },
            config,
        };

        Self {
            inner: Arc::new(inner),
        }
    }

    /// Runs a typed [`processing::Processor`] over the envelope.
    async fn process_with_processor<P: processing::Processor>(
        &self,
        processor: &P,
        mut managed_envelope: ManagedEnvelope,
        ctx: processing::Context<'_>,
    ) -> Result<Output<Outputs>, ProcessingError>
    where
        Outputs: From<P::Output>,
    {
        let Some(work) = processor.prepare_envelope(&mut managed_envelope) else {
            debug_assert!(
                false,
                "there must be work for the {} processor",
                std::any::type_name::<P>(),
            );
            return Err(ProcessingError::ProcessingGroupMismatch);
        };

        // The processor took all items it can handle; any items left behind in
        // the envelope are unexpected and get rejected.
        managed_envelope.update();
        match managed_envelope.envelope().is_empty() {
            true => managed_envelope.accept(),
            false => managed_envelope.reject(Outcome::Invalid(DiscardReason::Internal)),
        }

        processor
            .process(work, ctx)
            .await
            .map_err(|err| {
                relay_log::debug!(
                    error = &err as &dyn std::error::Error,
                    "processing pipeline failed"
                );
                ProcessingError::ProcessingFailure
            })
            .map(|o| o.map(Into::into))
    }

    /// Dispatches the envelope to the processor matching its [`ProcessingGroup`].
    async fn process_envelope(
        &self,
        project_id: ProjectId,
        message: ProcessEnvelopeGrouped<'_>,
    ) -> Result<Output<Outputs>, ProcessingError> {
        let ProcessEnvelopeGrouped {
            group,
            envelope: mut managed_envelope,
            ctx,
        } = message;

        // Pre-process the envelope headers.
        if let Some(sampling_state) = ctx.sampling_project_info {
            // Both transactions and standalone span envelopes need a normalized DSC header
            // to make sampling rules based on the segment/transaction name work correctly.
            managed_envelope
                .envelope_mut()
                .parametrize_dsc_transaction(&sampling_state.config.tx_name_rules);
        }

        // Set the event retention. Effectively, this value will only be available in processing
        // mode when the full project config is queried from the upstream.
        if let Some(retention) = ctx.project_info.config.event_retention {
            managed_envelope.envelope_mut().set_retention(retention);
        }

        // Set the downsampled event retention, which is likewise only available in processing
        // mode when the full project config is queried from the upstream.
        if let Some(retention) = ctx.project_info.config.downsampled_event_retention {
            managed_envelope
                .envelope_mut()
                .set_downsampled_retention(retention);
        }

        // Ensure the project ID is updated to the stored instance for this project cache. This can
        // differ in two cases:
        //  1. The envelope was sent to the legacy `/store/` endpoint without a project ID.
        //  2. The DSN was moved and the envelope sent to the old project ID.
        managed_envelope
            .envelope_mut()
            .meta_mut()
            .set_project_id(project_id);

        relay_log::trace!("Processing {group} group", group = group.variant());

        match group {
            ProcessingGroup::Error => {
                self.process_with_processor(&self.inner.processing.errors, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::Transaction => {
                self.process_with_processor(
                    &self.inner.processing.transactions,
                    managed_envelope,
                    ctx,
                )
                .await
            }
            ProcessingGroup::Session => {
                self.process_with_processor(&self.inner.processing.sessions, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::StandaloneAttachments => {
                self.process_with_processor(
                    &self.inner.processing.attachments,
                    managed_envelope,
                    ctx,
                )
                .await
            }
            ProcessingGroup::StandaloneUserReports => {
                self.process_with_processor(
                    &self.inner.processing.user_reports,
                    managed_envelope,
                    ctx,
                )
                .await
            }
            ProcessingGroup::StandaloneProfiles => {
                self.process_with_processor(&self.inner.processing.profiles, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::ClientReport => {
                self.process_with_processor(
                    &self.inner.processing.client_reports,
                    managed_envelope,
                    ctx,
                )
                .await
            }
            ProcessingGroup::Replay => {
                self.process_with_processor(&self.inner.processing.replays, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::CheckIn => {
                self.process_with_processor(&self.inner.processing.check_ins, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::Log => {
                self.process_with_processor(&self.inner.processing.logs, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::TraceMetric => {
                self.process_with_processor(
                    &self.inner.processing.trace_metrics,
                    managed_envelope,
                    ctx,
                )
                .await
            }
            ProcessingGroup::SpanV2 => {
                self.process_with_processor(&self.inner.processing.spans, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::TraceAttachment => {
                self.process_with_processor(
                    &self.inner.processing.trace_attachments,
                    managed_envelope,
                    ctx,
                )
                .await
            }
            ProcessingGroup::Span => {
                self.process_with_processor(
                    &self.inner.processing.legacy_spans,
                    managed_envelope,
                    ctx,
                )
                .await
            }
            ProcessingGroup::ProfileChunk => {
                self.process_with_processor(
                    &self.inner.processing.profile_chunks,
                    managed_envelope,
                    ctx,
                )
                .await
            }
            // Ungrouped items indicate bugs, as all items should have an associated processor,
            // or be handled separately (like metrics).
            ProcessingGroup::Ungrouped => {
                relay_log::error!(
                    tags.project = %project_id,
                    items = ?managed_envelope.envelope().items().next().map(Item::ty),
                    "could not identify the processing group based on the envelope's items"
                );

                Err(ProcessingError::NoProcessingGroup)
            }
            // Leave this group unchanged.
            //
            // This will later be forwarded to upstream.
            ProcessingGroup::ForwardUnknown => {
                self.process_with_processor(
                    &self.inner.processing.forward_unknown,
                    managed_envelope,
                    ctx,
                )
                .await
            }
        }
    }

    /// Processes the envelope and returns the processing outputs.
    ///
    /// Returns `Some` if the envelope passed inbound filtering and rate limiting. Invalid items are
    /// removed from the envelope. Otherwise, if the envelope is empty or the entire envelope needs
    /// to be dropped, this is `None`.
    async fn process<'a>(
        &self,
        mut message: ProcessEnvelopeGrouped<'a>,
    ) -> Result<Option<(Outputs, ForwardContext<'a>)>, ProcessingError> {
        let ProcessEnvelopeGrouped {
            ref mut envelope,
            ctx,
            ..
        } = message;

        // Prefer the project's project ID, and fall back to the stated project ID from the
        // envelope. The project ID is available in all modes, other than in proxy mode, where
        // envelopes for unknown projects are forwarded blindly.
        //
        // Neither ID can be available in proxy mode on the /store/ endpoint. This is not supported,
        // since we cannot process an envelope without a project ID, so drop it.
        let Some(project_id) = ctx
            .project_info
            .project_id
            .or_else(|| envelope.envelope().meta().project_id())
        else {
            envelope.reject(Outcome::Invalid(DiscardReason::Internal));
            return Err(ProcessingError::MissingProjectId);
        };

        let client = envelope.envelope().meta().client().map(str::to_owned);
        let user_agent = envelope.envelope().meta().user_agent().map(str::to_owned);
        let project_key = envelope.envelope().meta().public_key();
        // Only allow sending to the sampling key if we successfully loaded a sampling project
        // info relating to it. This filters out unknown/invalid project keys as well as project
        // keys from different organizations.
        let sampling_key = envelope
            .envelope()
            .sampling_key()
            .filter(|_| ctx.sampling_project_info.is_some());

        // We set additional information on the scope, which will be removed after processing the
        // envelope.
        relay_log::configure_scope(|scope| {
            scope.set_tag("project", project_id);
            if let Some(client) = client {
                scope.set_tag("sdk", client);
            }
            if let Some(user_agent) = user_agent {
                scope.set_extra("user_agent", user_agent.into());
            }
        });

        let result =
            self.process_envelope(project_id, message)
                .await
                .map(|Output { main, metrics }| {
                    if let Some(metrics) = metrics {
                        metrics.accept(|metrics| {
                            send_metrics(
                                metrics,
                                project_key,
                                sampling_key,
                                &self.inner.addrs.aggregator,
                            );
                        });
                    }

                    let ctx = ctx.to_forward();
                    main.map(|output| (output, ctx))
                });

        relay_log::configure_scope(|scope| {
            scope.remove_tag("project");
            scope.remove_tag("sdk");
            // `user_agent` was set as an extra above, so remove it as an extra.
            scope.remove_extra("user_agent");
        });

        result
    }

    /// Splits the incoming envelope into processing groups and processes each group.
    async fn handle_process_envelope(&self, cogs: &mut Token, message: ProcessEnvelope) {
        let project_key = message.envelope.envelope().meta().public_key();
        let wait_time = message.envelope.age();
        metric!(timer(RelayTimers::EnvelopeWaitTime) = wait_time);

        // This COGS handling may need an overhaul in the future:
        // Cancel the passed in token, to start individual measurements per envelope instead.
        cogs.cancel();

        let scoping = message.envelope.scoping();
        for (group, envelope) in ProcessingGroup::split_envelope(
            *message.envelope.into_envelope(),
            &message.project_info,
        ) {
            let mut cogs = self
                .inner
                .cogs
                .timed(ResourceId::Relay, AppFeature::from(group));

            let mut envelope =
                ManagedEnvelope::new(envelope, self.inner.addrs.outcome_aggregator.clone());
            envelope.scope(scoping);

            let global_config = self.inner.global_config.current();

            let ctx = processing::Context {
                config: &self.inner.config,
                global_config: &global_config,
                project_info: &message.project_info,
                sampling_project_info: message.sampling_project_info.as_deref(),
                rate_limits: &message.rate_limits,
                reservoir_counters: &message.reservoir_counters,
            };

            let message = ProcessEnvelopeGrouped {
                group,
                envelope,
                ctx,
            };

            let result = metric!(
                timer(RelayTimers::EnvelopeProcessingTime),
                group = group.variant(),
                { self.process(message).await }
            );

            match result {
                Ok(Some((output, ctx))) => self.submit_upstream(&mut cogs, output, ctx),
                Ok(None) => {}
                Err(error) if error.is_unexpected() => {
                    relay_log::error!(
                        tags.project_key = %project_key,
                        error = &error as &dyn Error,
                        "error processing envelope"
                    )
                }
                Err(error) => {
                    relay_log::debug!(
                        tags.project_key = %project_key,
                        error = &error as &dyn Error,
                        "error processing envelope"
                    )
                }
            }
        }
    }
1398
1399    fn handle_process_metrics(&self, cogs: &mut Token, message: ProcessMetrics) {
1400        let ProcessMetrics {
1401            data,
1402            project_key,
1403            received_at,
1404            sent_at,
1405            source,
1406        } = message;
1407
1408        let received_timestamp =
1409            UnixTimestamp::from_datetime(received_at).unwrap_or(UnixTimestamp::now());
1410
1411        let mut buckets = data.into_buckets(received_timestamp);
1412        if buckets.is_empty() {
1413            return;
1414        };
1415        cogs.update(relay_metrics::cogs::BySize(&buckets));
1416
1417        let clock_drift_processor =
1418            ClockDriftProcessor::new(sent_at, received_at).at_least(MINIMUM_CLOCK_DRIFT);
1419
1420        buckets.retain_mut(|bucket| {
1421            if let Err(error) = relay_metrics::normalize_bucket(bucket) {
1422                relay_log::debug!(error = &error as &dyn Error, "dropping bucket {bucket:?}");
1423                return false;
1424            }
1425
1426            if !self::metrics::is_valid_namespace(bucket) {
1427                return false;
1428            }
1429
1430            clock_drift_processor.process_timestamp(&mut bucket.timestamp);
1431
1432            if !matches!(source, BucketSource::Internal) {
1433                bucket.metadata = BucketMetadata::new(received_timestamp);
1434            }
1435
1436            true
1437        });
1438
1439        let project = self.inner.project_cache.get(project_key);
1440
        // Best-effort check to filter and rate limit buckets; if no project state is
        // available at the current time, we check again after flushing.
        let buckets = match project.state() {
            ProjectState::Enabled(project_info) => {
                let rate_limits = project.rate_limits().current_limits();
                self.check_buckets(project_key, project_info, &rate_limits, buckets)
            }
            _ => buckets,
        };

        relay_log::trace!("merging metric buckets into the aggregator");
        self.inner
            .addrs
            .aggregator
            .send(MergeBuckets::new(project_key, buckets));
    }

    fn handle_process_batched_metrics(&self, cogs: &mut Token, message: ProcessBatchedMetrics) {
        let ProcessBatchedMetrics {
            payload,
            source,
            received_at,
            sent_at,
        } = message;

        #[derive(serde::Deserialize)]
        struct Wrapper {
            buckets: HashMap<ProjectKey, Vec<Bucket>>,
        }
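        // Illustrative payload shape parsed into `Wrapper` above (the project key is
        // a stand-in):
        //
        //     {"buckets": {"a94ae32be2584e0bbd7a4cbb95971fee": [/* buckets */]}}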

        let buckets = match serde_json::from_slice(&payload) {
            Ok(Wrapper { buckets }) => buckets,
            Err(error) => {
                relay_log::debug!(
                    error = &error as &dyn Error,
                    "failed to parse batched metrics",
                );
                metric!(counter(RelayCounters::MetricBucketsParsingFailed) += 1);
                return;
            }
        };

        for (project_key, buckets) in buckets {
            self.handle_process_metrics(
                cogs,
                ProcessMetrics {
                    data: MetricData::Parsed(buckets),
                    project_key,
                    source,
                    received_at,
                    sent_at,
                },
            )
        }
    }

    /// Submits a processor [`Output`] to the appropriate upstream.
    ///
    /// If processing is enabled, the upstream is Kafka; otherwise, the output is
    /// serialized into an envelope and forwarded to the HTTP upstream.
    fn submit_upstream(
        &self,
        cogs: &mut Token,
        output: Outputs,
        ctx: processing::ForwardContext<'_>,
    ) {
        let _submit = cogs.start_category("submit");

        #[cfg(feature = "processing")]
        if ctx.config.processing_enabled()
            && let Some(store_forwarder) = &self.inner.addrs.store_forwarder
        {
            use crate::processing::StoreHandle;

            let objectstore = self.inner.addrs.objectstore.as_ref();
            let handle = StoreHandle::new(store_forwarder, objectstore, ctx.global_config);

            output
                .forward_store(handle, ctx)
                .unwrap_or_else(|err| err.into_inner());

            return;
        }

        match output.serialize_envelope(ctx) {
            Ok(envelope) => {
                let envelope = ManagedEnvelope::from(envelope);
                self.submit_envelope_upstream(envelope, ctx.project_info.upstream.clone());
            }
            Err(_) => relay_log::error!("failed to serialize output to an envelope"),
        };
    }

    fn submit_envelope_upstream(
        &self,
        mut envelope: ManagedEnvelope,
        // Currently optional while code is migrated to respect the upstream override
        // provided by the project config. Eventually, this will be required.
        upstream: Option<UpstreamDescriptor>,
    ) {
        if envelope.envelope_mut().is_empty() {
            envelope.accept();
            return;
        }

        // No code path should hit this.
        //
        // Any item produced by processing is handled in `submit_upstream`, metrics are
        // sent to the store directly, and outcomes must be produced to Kafka instead of
        // being sent onward as client reports.
        if self.inner.config.processing_enabled() {
            relay_log::error!(
                "attempt to forward envelope to http upstream when processing is enabled"
            );
            return;
        }

        // Override the `sent_at` timestamp. Since the envelope went through basic
        // normalization, all timestamps have been corrected. We propagate the new
        // `sent_at` to allow the next Relay to double-check this timestamp and
        // potentially apply correction again. This is done as close to sending as
        // possible so that we avoid internal delays.
        envelope.envelope_mut().set_sent_at(Utc::now());

        relay_log::trace!("sending envelope to sentry endpoint");
        let http_encoding = self.inner.config.http_encoding();
        let result = envelope.envelope().to_vec().and_then(|v| {
            encode_payload(&v.into(), http_encoding).map_err(EnvelopeError::PayloadIoFailed)
        });

        match result {
            Ok(body) => {
                self.inner
                    .addrs
                    .upstream_relay
                    .send(SendRequest(SendEnvelope {
                        upstream,
                        envelope,
                        body,
                        http_encoding,
                        project_cache: self.inner.project_cache.clone(),
                    }));
            }
            Err(error) => {
                // Errors are only logged for what we consider an internal discard reason. These
                // indicate errors in the infrastructure or implementation bugs.
                relay_log::error!(
                    error = &error as &dyn Error,
                    tags.project_key = %envelope.scoping().project_key,
                    "failed to serialize envelope payload"
                );

                envelope.reject(Outcome::Invalid(DiscardReason::Internal));
            }
        }
    }

    fn handle_submit_client_reports(&self, message: SubmitClientReports) {
        let SubmitClientReports {
            client_reports,
            scoping,
        } = message;

        let upstream = self.inner.config.upstream();
        let dsn = PartialDsn::outbound(&scoping, upstream);

        let mut envelope = Envelope::from_request(None, RequestMeta::outbound(dsn));
        for client_report in client_reports {
            match client_report.serialize() {
                Ok(payload) => {
                    let mut item = Item::new(ItemType::ClientReport);
                    item.set_payload(ContentType::Json, payload);
                    envelope.add_item(item);
                }
                Err(error) => {
                    relay_log::error!(
                        error = &error as &dyn std::error::Error,
                        "failed to serialize client report"
                    );
                }
            }
        }

        let envelope = ManagedEnvelope::new(envelope, self.inner.addrs.outcome_aggregator.clone());
        self.submit_envelope_upstream(envelope, None);
    }

    fn check_buckets(
        &self,
        project_key: ProjectKey,
        project_info: &ProjectInfo,
        rate_limits: &RateLimits,
        buckets: Vec<Bucket>,
    ) -> Vec<Bucket> {
        let Some(scoping) = project_info.scoping(project_key) else {
            relay_log::error!(
                tags.project_key = project_key.as_str(),
                "there is no scoping: dropping {} buckets",
                buckets.len(),
            );
            return Vec::new();
        };

        let mut buckets = self::metrics::apply_project_info(
            buckets,
            &self.inner.metric_outcomes,
            project_info,
            scoping,
        );

        let namespaces: BTreeSet<MetricNamespace> = buckets
            .iter()
            .filter_map(|bucket| bucket.name.try_namespace())
            .collect();

        for namespace in namespaces {
            let limits = rate_limits.check_with_quotas(
                project_info.get_quotas(),
                scoping.metric_bucket(namespace),
            );

            if limits.is_limited() {
                // Move all buckets of the limited namespace into `rejected`; the
                // remaining buckets stay in `buckets`.
                let rejected;
                (buckets, rejected) = utils::split_off(buckets, |bucket| {
                    bucket.name.try_namespace() == Some(namespace)
                });

                let reason_code = limits.longest().and_then(|limit| limit.reason_code.clone());
                self.inner.metric_outcomes.track(
                    scoping,
                    &rejected,
                    Outcome::RateLimited(reason_code),
                );
            }
        }

        let quotas = project_info.config.quotas.clone();
        match MetricsLimiter::create(buckets, quotas, scoping) {
            Ok(mut bucket_limiter) => {
                bucket_limiter.enforce_limits(rate_limits, &self.inner.metric_outcomes);
                bucket_limiter.into_buckets()
            }
            Err(buckets) => buckets,
        }
    }

    #[cfg(feature = "processing")]
    async fn rate_limit_buckets(
        &self,
        scoping: Scoping,
        project_info: &ProjectInfo,
        mut buckets: Vec<Bucket>,
    ) -> Vec<Bucket> {
        let Some(rate_limiter) = &self.inner.rate_limiter else {
            return buckets;
        };

        let global_config = self.inner.global_config.current();
        let namespaces = buckets
            .iter()
            .filter_map(|bucket| bucket.name.try_namespace())
            .counts();

        let quotas = CombinedQuotas::new(&global_config, project_info.get_quotas());

        for (namespace, quantity) in namespaces {
            let item_scoping = scoping.metric_bucket(namespace);

            let limits = match rate_limiter
                .is_rate_limited(quotas, item_scoping, quantity, false)
                .await
            {
                Ok(limits) => limits,
                Err(err) => {
                    relay_log::error!(
                        error = &err as &dyn std::error::Error,
                        "failed to check redis rate limits"
                    );
                    break;
                }
            };

            if limits.is_limited() {
                let rejected;
                (buckets, rejected) = utils::split_off(buckets, |bucket| {
                    bucket.name.try_namespace() == Some(namespace)
                });

                let reason_code = limits.longest().and_then(|limit| limit.reason_code.clone());
                self.inner.metric_outcomes.track(
                    scoping,
                    &rejected,
                    Outcome::RateLimited(reason_code),
                );

                self.inner
                    .project_cache
                    .get(item_scoping.scoping.project_key)
                    .rate_limits()
                    .merge(limits);
            }
        }

        match MetricsLimiter::create(buckets, project_info.config.quotas.clone(), scoping) {
            Err(buckets) => buckets,
            Ok(bucket_limiter) => self.apply_other_rate_limits(bucket_limiter).await,
        }
    }

    /// Checks and applies rate limits to metric buckets for transactions and spans.
    #[cfg(feature = "processing")]
    async fn apply_other_rate_limits(&self, mut bucket_limiter: MetricsLimiter) -> Vec<Bucket> {
        relay_log::trace!("handle_rate_limit_buckets");

        let scoping = *bucket_limiter.scoping();

        if let Some(rate_limiter) = self.inner.rate_limiter.as_ref() {
            let global_config = self.inner.global_config.current();
            let quotas = CombinedQuotas::new(&global_config, bucket_limiter.quotas());

            // We set over_accept_once such that the limit is actually reached, which allows subsequent
            // calls with quantity=0 to be rate limited.
            let over_accept_once = true;
            let mut rate_limits = RateLimits::new();

            let (category, count) = bucket_limiter.count();

            let timer = Instant::now();
            let mut is_limited = false;

            if let Some(count) = count {
                match rate_limiter
                    .is_rate_limited(quotas, scoping.item(category), count, over_accept_once)
                    .await
                {
                    Ok(limits) => {
                        is_limited = limits.is_limited();
                        rate_limits.merge(limits)
                    }
                    Err(e) => {
                        relay_log::error!(error = &e as &dyn Error, "rate limiting error")
                    }
                }
            }

            relay_statsd::metric!(
                timer(RelayTimers::RateLimitBucketsDuration) = timer.elapsed(),
                category = category.name(),
                limited = if is_limited { "true" } else { "false" },
                count = match count {
                    None => "none",
                    Some(0) => "0",
                    Some(1) => "1",
                    Some(2..=10) => "10",
                    Some(11..=25) => "25",
                    Some(26..=50) => "50",
                    Some(51..=100) => "100",
                    Some(101..=500) => "500",
                    _ => "> 500",
                },
            );

            if rate_limits.is_limited() {
                let was_enforced =
                    bucket_limiter.enforce_limits(&rate_limits, &self.inner.metric_outcomes);

                if was_enforced {
                    // Update the rate limits in the project cache.
                    self.inner
                        .project_cache
                        .get(scoping.project_key)
                        .rate_limits()
                        .merge(rate_limits);
                }
            }
        }

        bucket_limiter.into_buckets()
    }

    /// Processes metric buckets and sends them to Kafka.
    ///
    /// This function runs the following steps:
    ///  - rate limiting
    ///  - submit to `StoreForwarder`
    #[cfg(feature = "processing")]
    async fn encode_metrics_processing(
        &self,
        message: FlushBuckets,
        store_forwarder: &Addr<Store>,
    ) {
        use crate::constants::DEFAULT_EVENT_RETENTION;
        use crate::services::store::StoreMetrics;

        for ProjectBuckets {
            buckets,
            scoping,
            project_info,
            ..
        } in message.buckets.into_values()
        {
            let buckets = self
                .rate_limit_buckets(scoping, &project_info, buckets)
                .await;

            if buckets.is_empty() {
                continue;
            }

            let retention = project_info
                .config
                .event_retention
                .unwrap_or(DEFAULT_EVENT_RETENTION);

            // The store forwarder takes care of bucket splitting internally, so we can submit the
            // entire list of buckets. There is no batching needed here.
            store_forwarder.send(StoreMetrics {
                buckets,
                scoping,
                retention,
            });
        }
    }

    /// Serializes metric buckets to JSON and sends them to the upstream.
    ///
    /// This function runs the following steps:
    ///  - partitioning
    ///  - batching by configured size limit
    ///  - serialize to JSON and pack in an envelope
    ///
    /// Rate limiting runs only in processing Relays as it requires access to the central Redis instance.
    /// Cached rate limits are applied in the project cache already.
    fn encode_metrics_envelope(&self, message: FlushBuckets) {
        let FlushBuckets {
            partition_key,
            buckets,
        } = message;

        let batch_size = self.inner.config.metrics_max_batch_size_bytes();
        let upstream = self.inner.config.upstream();

        for ProjectBuckets {
            buckets,
            scoping,
            project_info,
            ..
        } in buckets.values()
        {
            let dsn = PartialDsn::outbound(scoping, upstream);

            relay_statsd::metric!(
                distribution(RelayDistributions::PartitionKeys) = u64::from(partition_key)
            );

            let mut num_batches = 0;
            for batch in BucketsView::from(buckets).by_size(batch_size) {
                let mut envelope = Envelope::from_request(None, RequestMeta::outbound(dsn.clone()));

                let mut item = Item::new(ItemType::MetricBuckets);
                item.set_source_quantities(crate::metrics::extract_quantities(batch));
                item.set_payload(ContentType::Json, serde_json::to_vec(&batch).unwrap());
                envelope.add_item(item);

                let mut envelope =
                    ManagedEnvelope::new(envelope, self.inner.addrs.outcome_aggregator.clone());
                envelope
                    .set_partition_key(Some(partition_key))
                    .scope(*scoping);

                relay_statsd::metric!(
                    distribution(RelayDistributions::BucketsPerBatch) = batch.len() as u64
                );

                self.submit_envelope_upstream(envelope, project_info.upstream.clone());
                num_batches += 1;
            }

            relay_statsd::metric!(
                distribution(RelayDistributions::BatchesPerPartition) = num_batches
            );
        }
    }

    /// Creates a [`SendMetricsRequest`] and sends it to the upstream relay.
    fn send_global_partition(
        &self,
        upstream: Option<UpstreamDescriptor>,
        partition_key: u32,
        partition: &mut Partition<'_>,
    ) {
        if partition.is_empty() {
            return;
        }

        let (unencoded, project_info) = partition.take();
        let http_encoding = self.inner.config.http_encoding();
        let encoded = match encode_payload(&unencoded, http_encoding) {
            Ok(payload) => payload,
            Err(error) => {
                let error = &error as &dyn std::error::Error;
                relay_log::error!(error, "failed to encode metrics payload");
                return;
            }
        };

        let request = SendMetricsRequest {
            upstream,
            partition_key: partition_key.to_string(),
            unencoded,
            encoded,
            project_info,
            http_encoding,
            metric_outcomes: self.inner.metric_outcomes.clone(),
        };

        self.inner.addrs.upstream_relay.send(SendRequest(request));
    }

    /// Serializes metric buckets to JSON and sends them to the upstream via the global endpoint.
    ///
    /// This function is similar to [`Self::encode_metrics_envelope`], but sends a global batched
    /// payload directly instead of per-project Envelopes.
    ///
    /// This function runs the following steps:
    ///  - partitioning
    ///  - batching by configured size limit
    ///  - serialize to JSON
    ///  - submit directly to the upstream
    fn encode_metrics_global(&self, message: FlushBuckets) {
        let FlushBuckets {
            partition_key,
            buckets,
        } = message;

        let batch_size = self.inner.config.metrics_max_batch_size_bytes();
        let mut partitions = BTreeMap::new();
        let mut partition_splits = 0;

        for ProjectBuckets {
            buckets,
            scoping,
            project_info,
            ..
        } in buckets.values()
        {
            let partition = partitions
                .entry(project_info.upstream.clone())
                .or_insert_with(|| Partition::new(batch_size));

            for bucket in buckets {
                let mut remaining = Some(BucketView::new(bucket));

                while let Some(bucket) = remaining.take() {
                    if let Some(next) = partition.insert(bucket, *scoping) {
                        // A part of the bucket could not be inserted. Take the partition and submit
                        // it immediately. Repeat until the final part was inserted. This should
                        // always result in a request, otherwise we would enter an endless loop.
                        self.send_global_partition(
                            project_info.upstream.clone(),
                            partition_key,
                            partition,
                        );
                        remaining = Some(next);
                        partition_splits += 1;
                    }
                }
            }
        }

        if partition_splits > 0 {
            metric!(distribution(RelayDistributions::PartitionSplits) = partition_splits);
        }

        for (upstream, mut partition) in partitions {
            self.send_global_partition(upstream, partition_key, &mut partition);
        }
    }

    async fn handle_flush_buckets(&self, mut message: FlushBuckets) {
        for (project_key, pb) in message.buckets.iter_mut() {
            let buckets = std::mem::take(&mut pb.buckets);
            pb.buckets =
                self.check_buckets(*project_key, &pb.project_info, &pb.rate_limits, buckets);
        }

        #[cfg(feature = "processing")]
        if self.inner.config.processing_enabled()
            && let Some(ref store_forwarder) = self.inner.addrs.store_forwarder
        {
            return self
                .encode_metrics_processing(message, store_forwarder)
                .await;
        }

        if self.inner.config.http_global_metrics() {
            self.encode_metrics_global(message)
        } else {
            self.encode_metrics_envelope(message)
        }
    }

    #[cfg(all(test, feature = "processing"))]
    fn redis_rate_limiter_enabled(&self) -> bool {
        self.inner.rate_limiter.is_some()
    }

    async fn handle_message(&self, message: EnvelopeProcessor) {
        let ty = message.variant();
        let feature_weights = self.feature_weights(&message);

        metric!(timer(RelayTimers::ProcessMessageDuration), message = ty, {
            let mut cogs = self.inner.cogs.timed(ResourceId::Relay, feature_weights);

            match message {
                EnvelopeProcessor::ProcessEnvelope(m) => {
                    self.handle_process_envelope(&mut cogs, *m).await
                }
                EnvelopeProcessor::ProcessProjectMetrics(m) => {
                    self.handle_process_metrics(&mut cogs, *m)
                }
                EnvelopeProcessor::ProcessBatchedMetrics(m) => {
                    self.handle_process_batched_metrics(&mut cogs, *m)
                }
                EnvelopeProcessor::FlushBuckets(m) => self.handle_flush_buckets(*m).await,
                EnvelopeProcessor::SubmitClientReports(m) => self.handle_submit_client_reports(*m),
            }
        });
    }

    fn feature_weights(&self, message: &EnvelopeProcessor) -> FeatureWeights {
        match message {
            // Envelope is split later and tokens are attributed then.
            EnvelopeProcessor::ProcessEnvelope(_) => AppFeature::Unattributed.into(),
            EnvelopeProcessor::ProcessProjectMetrics(_) => AppFeature::Unattributed.into(),
            EnvelopeProcessor::ProcessBatchedMetrics(_) => AppFeature::Unattributed.into(),
            EnvelopeProcessor::FlushBuckets(v) => v
                .buckets
                .values()
                .map(|s| {
                    if self.inner.config.processing_enabled() {
                        // Processing does not encode the metrics but instead rate limits
                        // them, which scales by count and not by size.
                        relay_metrics::cogs::ByCount(&s.buckets).into()
                    } else {
                        relay_metrics::cogs::BySize(&s.buckets).into()
                    }
                })
                .fold(FeatureWeights::none(), FeatureWeights::merge),
            EnvelopeProcessor::SubmitClientReports(_) => AppFeature::ClientReports.into(),
        }
    }
}

impl Service for EnvelopeProcessorService {
    type Interface = EnvelopeProcessor;

    async fn run(self, mut rx: relay_system::Receiver<Self::Interface>) {
        while let Some(message) = rx.recv().await {
            let service = self.clone();
            self.inner
                .pool
                .spawn_async(
                    async move {
                        service.handle_message(message).await;
                    }
                    .boxed(),
                )
                .await;
        }
    }
}

pub fn encode_payload(body: &Bytes, http_encoding: HttpEncoding) -> Result<Bytes, std::io::Error> {
    let envelope_body: Vec<u8> = match http_encoding {
        HttpEncoding::Identity => return Ok(body.clone()),
        HttpEncoding::Deflate => {
            let mut encoder = ZlibEncoder::new(Vec::new(), Compression::default());
            encoder.write_all(body.as_ref())?;
            encoder.finish()?
        }
        HttpEncoding::Gzip => {
            let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
            encoder.write_all(body.as_ref())?;
            encoder.finish()?
        }
        HttpEncoding::Br => {
            // Use default buffer size (via 0), medium quality (5), and the default lgwin (22).
            let mut encoder = BrotliEncoder::new(Vec::new(), 0, 5, 22);
            encoder.write_all(body.as_ref())?;
            encoder.into_inner()
        }
        HttpEncoding::Zstd => {
            // Use the fastest compression level; our main objective here is the best
            // compression ratio for the least amount of time spent.
            let mut encoder = ZstdEncoder::new(Vec::new(), 1)?;
            encoder.write_all(body.as_ref())?;
            encoder.finish()?
        }
    };

    Ok(envelope_body.into())
}
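
// A minimal round-trip sketch for `encode_payload` (illustrative only, not part of the
// service logic): identity must pass the payload through unchanged, and a gzip-encoded
// body must decode back to the original bytes via `flate2::read::GzDecoder`.
#[cfg(test)]
mod encode_payload_tests {
    use std::io::Read;

    use super::*;

    #[test]
    fn gzip_round_trip() {
        let body = Bytes::from_static(b"{\"items\": []}");

        // Identity returns the payload unchanged.
        let identity = encode_payload(&body, HttpEncoding::Identity).unwrap();
        assert_eq!(identity, body);

        // Gzip must decode back to the original bytes.
        let encoded = encode_payload(&body, HttpEncoding::Gzip).unwrap();
        let mut decoded = Vec::new();
        flate2::read::GzDecoder::new(encoded.as_ref())
            .read_to_end(&mut decoded)
            .unwrap();
        assert_eq!(decoded.as_slice(), body.as_ref());
    }
}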

/// An upstream request that submits an envelope via HTTP.
#[derive(Debug)]
pub struct SendEnvelope {
    pub upstream: Option<UpstreamDescriptor>,
    pub envelope: ManagedEnvelope,
    pub body: Bytes,
    pub http_encoding: HttpEncoding,
    pub project_cache: ProjectCacheHandle,
}

impl UpstreamRequest for SendEnvelope {
    fn upstream(&self) -> Option<&UpstreamDescriptor> {
        self.upstream.as_ref()
    }

    fn method(&self) -> reqwest::Method {
        reqwest::Method::POST
    }

    fn path(&self) -> Cow<'_, str> {
        format!("/api/{}/envelope/", self.envelope.scoping().project_id).into()
    }

    fn route(&self) -> &'static str {
        "envelope"
    }

    fn build(&mut self, builder: &mut http::RequestBuilder) -> Result<(), http::HttpError> {
        let envelope_body = self.body.clone();
        metric!(
            distribution(RelayDistributions::UpstreamEnvelopeBodySize) = envelope_body.len() as u64
        );

        let meta = self.envelope.meta();
        let shard = self.envelope.partition_key().map(|p| p.to_string());
        builder
            .content_encoding(self.http_encoding)
            .header_opt("Origin", meta.origin().map(|url| url.as_str()))
            .header_opt("User-Agent", meta.user_agent())
            .header("X-Sentry-Auth", meta.auth_header())
            .header("X-Forwarded-For", meta.forwarded_for())
            .header("Content-Type", envelope::CONTENT_TYPE)
            .header_opt("X-Sentry-Relay-Shard", shard)
            .body(envelope_body);

        Ok(())
    }

    fn sign(&mut self) -> Option<Sign> {
        Some(Sign::Optional(SignatureType::RequestSign))
    }

    fn respond(
        self: Box<Self>,
        result: Result<http::Response, UpstreamRequestError>,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> {
        Box::pin(async move {
            let result = match result {
                Ok(mut response) => response.consume().await.map_err(UpstreamRequestError::Http),
                Err(error) => Err(error),
            };

            match result {
                Ok(()) => self.envelope.accept(),
                Err(error) if error.is_received() => {
                    let scoping = self.envelope.scoping();
                    self.envelope.accept();

                    if let UpstreamRequestError::RateLimited(limits) = error {
                        self.project_cache
                            .get(scoping.project_key)
                            .rate_limits()
                            .merge(limits.scope(&scoping));
                    }
                }
                Err(error) => {
                    // Errors are only logged for what we consider an internal discard reason. These
                    // indicate errors in the infrastructure or implementation bugs.
                    let mut envelope = self.envelope;
                    envelope.reject(Outcome::Invalid(DiscardReason::Internal));
                    relay_log::error!(
                        error = &error as &dyn Error,
                        tags.project_key = %envelope.scoping().project_key,
                        "error sending envelope"
                    );
                }
            }
        })
    }
}

/// A container for metric buckets from multiple projects.
///
/// This container is used to send metrics to the upstream in global batches as part of the
/// [`FlushBuckets`] message if the `http.global_metrics` option is enabled. The container monitors
/// the size of all metrics and allows splitting them into multiple batches. See
/// [`insert`](Self::insert) for more information.
#[derive(Debug)]
struct Partition<'a> {
    max_size: usize,
    remaining: usize,
    views: HashMap<ProjectKey, Vec<BucketView<'a>>>,
    project_info: HashMap<ProjectKey, Scoping>,
}

impl<'a> Partition<'a> {
    /// Creates a new partition with the given maximum size in bytes.
    pub fn new(size: usize) -> Self {
        Self {
            max_size: size,
            remaining: size,
            views: HashMap::new(),
            project_info: HashMap::new(),
        }
    }

    /// Inserts a bucket into the partition, splitting it if necessary.
    ///
    /// This function attempts to add the bucket to this partition. If the bucket does not fit
    /// entirely into the partition given its maximum size, the remaining part of the bucket is
    /// returned from this function call.
    ///
    /// If this function returns `Some(_)`, the partition is full and should be submitted to the
    /// upstream immediately. Use [`Self::take`] to retrieve the contents of the
    /// partition. Afterwards, the caller is responsible for calling this function again with the
    /// remaining bucket until it is fully inserted.
    pub fn insert(&mut self, bucket: BucketView<'a>, scoping: Scoping) -> Option<BucketView<'a>> {
        let (current, next) = bucket.split(self.remaining, Some(self.max_size));

        if let Some(current) = current {
            self.remaining = self.remaining.saturating_sub(current.estimated_size());
            self.views
                .entry(scoping.project_key)
                .or_default()
                .push(current);

            self.project_info
                .entry(scoping.project_key)
                .or_insert(scoping);
        }

        next
    }

    /// Returns `true` if the partition does not hold any data.
    fn is_empty(&self) -> bool {
        self.views.is_empty()
    }

    /// Returns the serialized buckets for this partition.
    ///
    /// This empties the partition, so that it can be reused.
    fn take(&mut self) -> (Bytes, HashMap<ProjectKey, Scoping>) {
        #[derive(serde::Serialize)]
        struct Wrapper<'a> {
            buckets: &'a HashMap<ProjectKey, Vec<BucketView<'a>>>,
        }
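        // Note: this serializes the same `{"buckets": {...}}` shape that
        // `handle_process_batched_metrics` and `create_error_outcomes` parse back.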

        let buckets = &self.views;
        let payload = serde_json::to_vec(&Wrapper { buckets }).unwrap().into();

        let scopings = std::mem::take(&mut self.project_info);

        self.views.clear();
        self.remaining = self.max_size;

        (payload, scopings)
    }
}
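
// A minimal sketch of the fill-and-drain protocol documented on `Partition::insert`
// (the bucket contents and the 4 KiB partition size are illustrative assumptions, not
// production values): a small bucket is inserted whole, `take` yields the serialized
// payload plus scopings, and the partition is empty again afterwards.
#[cfg(test)]
mod partition_tests {
    use relay_base_schema::organization::OrganizationId;

    use super::*;

    #[test]
    fn insert_then_take_resets_partition() {
        let bucket = Bucket {
            name: "c:custom/example@none".into(),
            value: relay_metrics::BucketValue::Counter(
                relay_protocol::FiniteF64::new(1.0).unwrap(),
            ),
            timestamp: UnixTimestamp::now(),
            tags: Default::default(),
            width: 10,
            metadata: BucketMetadata::default(),
        };

        let scoping = Scoping {
            organization_id: OrganizationId::new(42),
            project_id: ProjectId::new(21),
            project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
            key_id: None,
        };

        // The small bucket fits in one piece, so no remainder is returned.
        let mut partition = Partition::new(4096);
        assert!(partition.insert(BucketView::new(&bucket), scoping).is_none());
        assert!(!partition.is_empty());

        // Draining the partition yields the payload and scopings and resets it.
        let (payload, scopings) = partition.take();
        assert!(!payload.is_empty());
        assert_eq!(scopings.len(), 1);
        assert!(partition.is_empty());
    }
}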

/// An upstream request that submits metric buckets via HTTP.
///
/// This request is not awaited. It automatically tracks outcomes if the request is not received.
#[derive(Debug)]
struct SendMetricsRequest {
    /// Optional upstream override to which the request is sent.
    upstream: Option<UpstreamDescriptor>,
    /// If the partition key is set, the request is marked with `X-Sentry-Relay-Shard`.
    partition_key: String,
    /// Serialized metric buckets without encoding applied, used for signing.
    unencoded: Bytes,
    /// Serialized metric buckets with the stated HTTP encoding applied.
    encoded: Bytes,
    /// Mapping of all contained project keys to their scoping and extraction mode.
    ///
    /// Used to track outcomes for transmission failures.
    project_info: HashMap<ProjectKey, Scoping>,
    /// Encoding (compression) of the payload.
    http_encoding: HttpEncoding,
    /// Metric outcomes instance to send outcomes on error.
    metric_outcomes: MetricOutcomes,
}

impl SendMetricsRequest {
    fn create_error_outcomes(self) {
        #[derive(serde::Deserialize)]
        struct Wrapper {
            buckets: HashMap<ProjectKey, Vec<MinimalTrackableBucket>>,
        }

        let buckets = match serde_json::from_slice(&self.unencoded) {
            Ok(Wrapper { buckets }) => buckets,
            Err(err) => {
                relay_log::error!(
                    error = &err as &dyn std::error::Error,
                    "failed to parse buckets from failed transmission"
                );
                return;
            }
        };

        for (key, buckets) in buckets {
            let Some(&scoping) = self.project_info.get(&key) else {
                relay_log::error!("missing scoping for project key");
                continue;
            };

            self.metric_outcomes.track(
                scoping,
                &buckets,
                Outcome::Invalid(DiscardReason::Internal),
            );
        }
    }
}

impl UpstreamRequest for SendMetricsRequest {
    fn upstream(&self) -> Option<&UpstreamDescriptor> {
        self.upstream.as_ref()
    }

    fn set_relay_id(&self) -> bool {
        true
    }

    fn sign(&mut self) -> Option<Sign> {
        Some(Sign::Required(SignatureType::Body(self.unencoded.clone())))
    }

    fn method(&self) -> reqwest::Method {
        reqwest::Method::POST
    }

    fn path(&self) -> Cow<'_, str> {
        "/api/0/relays/metrics/".into()
    }

    fn route(&self) -> &'static str {
        "global_metrics"
    }

    fn build(&mut self, builder: &mut http::RequestBuilder) -> Result<(), http::HttpError> {
        metric!(
            distribution(RelayDistributions::UpstreamMetricsBodySize) = self.encoded.len() as u64
        );

        builder
            .content_encoding(self.http_encoding)
            .header("X-Sentry-Relay-Shard", self.partition_key.as_bytes())
            .header(header::CONTENT_TYPE, b"application/json")
            .body(self.encoded.clone());

        Ok(())
    }

    fn respond(
        self: Box<Self>,
        result: Result<http::Response, UpstreamRequestError>,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> {
        Box::pin(async {
            match result {
                Ok(mut response) => {
                    response.consume().await.ok();
                }
                Err(error) => {
                    relay_log::error!(error = &error as &dyn Error, "failed to send metrics batch");

                    // If the request did not arrive at the upstream, we are responsible for
                    // outcomes. Otherwise, the upstream is responsible for logging outcomes.
                    if error.is_received() {
                        return;
                    }

                    self.create_error_outcomes()
                }
            }
        })
    }
}

/// Container for global and project level [`Quota`].
#[derive(Copy, Clone, Debug)]
#[cfg(feature = "processing")]
struct CombinedQuotas<'a> {
    global_quotas: &'a [Quota],
    project_quotas: &'a [Quota],
}

#[cfg(feature = "processing")]
impl<'a> CombinedQuotas<'a> {
    /// Returns a new [`CombinedQuotas`].
    pub fn new(global_config: &'a GlobalConfig, project_quotas: &'a [Quota]) -> Self {
        Self {
            global_quotas: &global_config.quotas,
            project_quotas,
        }
    }
}

#[cfg(feature = "processing")]
impl<'a> IntoIterator for CombinedQuotas<'a> {
    type Item = &'a Quota;
    type IntoIter = std::iter::Chain<std::slice::Iter<'a, Quota>, std::slice::Iter<'a, Quota>>;

    fn into_iter(self) -> Self::IntoIter {
        self.global_quotas.iter().chain(self.project_quotas.iter())
    }
}

#[cfg(test)]
mod tests {
    use insta::assert_debug_snapshot;
    use relay_common::glob2::LazyGlob;
    use relay_dynamic_config::ProjectConfig;
    use relay_event_normalization::{
        NormalizationConfig, RedactionRule, TransactionNameConfig, TransactionNameRule,
    };
    use relay_event_schema::protocol::{Event, TransactionSource};
    use relay_pii::DataScrubbingConfig;
    use relay_protocol::Annotated;
    use similar_asserts::assert_eq;

    use crate::testutils::{create_test_processor, create_test_processor_with_addrs};

    #[cfg(feature = "processing")]
    use {
        relay_metrics::BucketValue,
        relay_quotas::{QuotaScope, ReasonCode},
        relay_test::mock_service,
    };

    use super::*;

    #[cfg(feature = "processing")]
    fn mock_quota(id: &str) -> Quota {
        Quota {
            id: Some(id.into()),
            categories: [DataCategory::MetricBucket].into(),
            scope: QuotaScope::Organization,
            scope_id: None,
            limit: Some(0),
            window: None,
            reason_code: None,
            namespace: None,
        }
    }

    #[cfg(feature = "processing")]
    #[test]
    fn test_dynamic_quotas() {
        let global_config = relay_dynamic_config::GlobalConfig {
            quotas: vec![mock_quota("foo"), mock_quota("bar")],
            ..Default::default()
        };

        let project_quotas = vec![mock_quota("baz"), mock_quota("qux")];

        let dynamic_quotas = CombinedQuotas::new(&global_config, &project_quotas);

        let quota_ids = dynamic_quotas.into_iter().filter_map(|q| q.id.as_deref());
        assert!(quota_ids.eq(["foo", "bar", "baz", "qux"]));
    }

    /// Ensures that if we rate limit one batch of buckets in a [`FlushBuckets`] message, the
    /// next batches in the same message are not automatically rate limited as well.
2522    #[cfg(feature = "processing")]
2523    #[tokio::test]
2524    async fn test_ratelimit_per_batch() {
2525        use relay_base_schema::organization::OrganizationId;
2526        use relay_protocol::FiniteF64;
2527
2528        let rate_limited_org = Scoping {
2529            organization_id: OrganizationId::new(1),
2530            project_id: ProjectId::new(21),
2531            project_key: ProjectKey::parse("00000000000000000000000000000000").unwrap(),
2532            key_id: Some(17),
2533        };
2534
2535        let not_rate_limited_org = Scoping {
2536            organization_id: OrganizationId::new(2),
2537            project_id: ProjectId::new(21),
2538            project_key: ProjectKey::parse("11111111111111111111111111111111").unwrap(),
2539            key_id: Some(17),
2540        };
2541
2542        let message = {
2543            let project_info = {
2544                let quota = Quota {
2545                    id: Some("testing".into()),
2546                    categories: [DataCategory::MetricBucket].into(),
2547                    scope: relay_quotas::QuotaScope::Organization,
2548                    scope_id: Some(rate_limited_org.organization_id.to_string().into()),
2549                    limit: Some(0),
2550                    window: None,
2551                    reason_code: Some(ReasonCode::new("test")),
2552                    namespace: None,
2553                };
2554
2555                let mut config = ProjectConfig::default();
2556                config.quotas.push(quota);
2557
2558                Arc::new(ProjectInfo {
2559                    config,
2560                    ..Default::default()
2561                })
2562            };
2563
2564            let project_metrics = |scoping| ProjectBuckets {
2565                buckets: vec![Bucket {
2566                    name: "d:spans/bar".into(),
2567                    value: BucketValue::Counter(FiniteF64::new(1.0).unwrap()),
2568                    timestamp: UnixTimestamp::now(),
2569                    tags: Default::default(),
2570                    width: 10,
2571                    metadata: BucketMetadata::default(),
2572                }],
2573                rate_limits: Default::default(),
2574                project_info: project_info.clone(),
2575                scoping,
2576            };
2577
2578            let buckets = hashbrown::HashMap::from([
2579                (
2580                    rate_limited_org.project_key,
2581                    project_metrics(rate_limited_org),
2582                ),
2583                (
2584                    not_rate_limited_org.project_key,
2585                    project_metrics(not_rate_limited_org),
2586                ),
2587            ]);
2588
2589            FlushBuckets {
2590                partition_key: 0,
2591                buckets,
2592            }
2593        };
2594
2595        // ensure the order of the map while iterating is as expected.
2596        assert_eq!(message.buckets.keys().count(), 2);
2597
2598        let config = {
2599            let config_json = serde_json::json!({
2600                "processing": {
2601                    "enabled": true,
2602                    "kafka_config": [],
2603                    "redis": {
2604                        "server": std::env::var("RELAY_REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_owned()),
2605                    }
2606                }
2607            });
2608            Config::from_json_value(config_json).unwrap()
2609        };
2610
2611        let (store, handle) = {
2612            let f = |org_ids: &mut Vec<OrganizationId>, msg: Store| {
2613                let org_id = match msg {
2614                    Store::Metrics(x) => x.scoping.organization_id,
2615                    _ => panic!("received envelope when expecting only metrics"),
2616                };
2617                org_ids.push(org_id);
2618            };
2619
2620            mock_service("store_forwarder", vec![], f)
2621        };
2622
2623        let processor = create_test_processor(config).await;
2624        assert!(processor.redis_rate_limiter_enabled());
2625
2626        processor.encode_metrics_processing(message, &store).await;
2627
2628        drop(store);
2629        let orgs_not_ratelimited = handle.await.unwrap();
2630
2631        assert_eq!(
2632            orgs_not_ratelimited,
2633            vec![not_rate_limited_org.organization_id]
2634        );
2635    }
2636
2637    #[tokio::test]
2638    async fn test_browser_version_extraction_with_pii_like_data() {
2639        let processor = create_test_processor(Default::default()).await;
2640        let outcome_aggregator = Addr::dummy();
2641        let event_id = EventId::new();
2642
2643        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
2644            .parse()
2645            .unwrap();
2646
2647        let request_meta = RequestMeta::new(dsn);
2648        let mut envelope = Envelope::from_request(Some(event_id), request_meta);
2649
2650        envelope.add_item({
2651                let mut item = Item::new(ItemType::Event);
2652                item.set_payload(
2653                    ContentType::Json,
2654                    r#"
2655                    {
2656                        "request": {
2657                            "headers": [
2658                                ["User-Agent", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36"]
2659                            ]
2660                        }
2661                    }
2662                "#,
2663                );
2664                item
2665            });
2666
2667        let mut datascrubbing_settings = DataScrubbingConfig::default();
2668        // enable all the default scrubbing
2669        datascrubbing_settings.scrub_data = true;
2670        datascrubbing_settings.scrub_defaults = true;
2671        datascrubbing_settings.scrub_ip_addresses = true;
2672
2673        // Make sure to mask any IP-like looking data
2674        let pii_config = serde_json::from_str(r#"{"applications": {"**": ["@ip:mask"]}}"#).unwrap();
2675
2676        let config = ProjectConfig {
2677            datascrubbing_settings,
2678            pii_config: Some(pii_config),
2679            ..Default::default()
2680        };
2681
2682        let project_info = ProjectInfo {
2683            config,
2684            ..Default::default()
2685        };
2686
2687        let mut envelopes = ProcessingGroup::split_envelope(*envelope, &Default::default());
2688        assert_eq!(envelopes.len(), 1);
2689
2690        let (group, envelope) = envelopes.pop().unwrap();
2691        let envelope = ManagedEnvelope::new(envelope, outcome_aggregator);
2692
2693        let message = ProcessEnvelopeGrouped {
2694            group,
2695            envelope,
2696            ctx: processing::Context {
2697                project_info: &project_info,
2698                ..processing::Context::for_test()
2699            },
2700        };
2701
2702        let Ok(Some((output, ctx))) = processor.process(message).await else {
2703            panic!();
2704        };
2705        let new_envelope = output.serialize_envelope(ctx).unwrap().accept(|f| f);
2706
2707        let event_item = new_envelope.items().last().unwrap();
2708        let annotated_event: Annotated<Event> =
2709            Annotated::from_json_bytes(&event_item.payload()).unwrap();
2710        let event = annotated_event.into_value().unwrap();
2711        let headers = event
2712            .request
2713            .into_value()
2714            .unwrap()
2715            .headers
2716            .into_value()
2717            .unwrap();
2718
2719        // IP-like data must be masked
2720        assert_eq!(
2721            Some(
2722                "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/********* Safari/537.36"
2723            ),
2724            headers.get_header("User-Agent")
2725        );
2726        // But we still get correct browser and version number
2727        let contexts = event.contexts.into_value().unwrap();
2728        let browser = contexts.0.get("browser").unwrap();
2729        assert_eq!(
2730            r#"{"browser":"Chrome 103.0.0","name":"Chrome","version":"103.0.0","type":"browser"}"#,
2731            browser.to_json().unwrap()
2732        );
2733    }
2734
2735    #[tokio::test]
2736    #[cfg(feature = "processing")]
2737    async fn test_materialize_dsc() {
2738        use crate::services::projects::project::PublicKeyConfig;
2739
2740        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
2741            .parse()
2742            .unwrap();
2743        let request_meta = RequestMeta::new(dsn);
2744        let mut envelope = Envelope::from_request(None, request_meta);
2745
2746        let dsc = r#"{
2747            "trace_id": "00000000-0000-0000-0000-000000000000",
2748            "public_key": "e12d836b15bb49d7bbf99e64295d995b",
2749            "sample_rate": "0.2"
2750        }"#;
2751        envelope.set_dsc(serde_json::from_str(dsc).unwrap());
2752
2753        let mut item = Item::new(ItemType::Event);
2754        item.set_payload(ContentType::Json, r#"{}"#);
2755        envelope.add_item(item);
2756
2757        let outcome_aggregator = Addr::dummy();
2758        let managed_envelope = ManagedEnvelope::new(envelope, outcome_aggregator);
2759
2760        let mut project_info = ProjectInfo::default();
2761        project_info.public_keys.push(PublicKeyConfig {
2762            public_key: ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(),
2763            numeric_id: Some(1),
2764        });
2765
2766        let config = serde_json::json!({
2767            "processing": {
2768                "enabled": true,
2769                "kafka_config": [],
2770            }
2771        });
2772
2773        let message = ProcessEnvelopeGrouped {
2774            group: ProcessingGroup::Error,
2775            envelope: managed_envelope,
2776            ctx: processing::Context {
2777                config: &Config::from_json_value(config.clone()).unwrap(),
2778                project_info: &project_info,
2779                sampling_project_info: Some(&project_info),
2780                ..processing::Context::for_test()
2781            },
2782        };
2783
2784        let processor = create_test_processor(Config::from_json_value(config).unwrap()).await;
2785        let Ok(Some((output, ctx))) = processor.process(message).await else {
2786            panic!();
2787        };
2788        let envelope = output.serialize_envelope(ctx).unwrap();
2789        let event = envelope
2790            .get_item_by(|item| item.ty() == &ItemType::Event)
2791            .unwrap();
2792
2793        let event = Annotated::<Event>::from_json_bytes(&event.payload()).unwrap();
2794        insta::assert_debug_snapshot!(event.value().unwrap()._dsc, @r###"
2795        Object(
2796            {
2797                "environment": ~,
2798                "public_key": String(
2799                    "e12d836b15bb49d7bbf99e64295d995b",
2800                ),
2801                "release": ~,
2802                "replay_id": ~,
2803                "sample_rate": String(
2804                    "0.2",
2805                ),
2806                "trace_id": String(
2807                    "00000000000000000000000000000000",
2808                ),
2809                "transaction": ~,
2810            },
2811        )
2812        "###);
2813    }
2814
    fn capture_test_event(transaction_name: &str, source: TransactionSource) -> Vec<String> {
        let mut event = Annotated::<Event>::from_json(
            r#"
            {
                "type": "transaction",
                "transaction": "/foo/",
                "timestamp": 946684810.0,
                "start_timestamp": 946684800.0,
                "contexts": {
                    "trace": {
                        "trace_id": "4c79f60c11214eb38604f4ae0781bfb2",
                        "span_id": "fa90fdead5f74053",
                        "op": "http.server",
                        "type": "trace"
                    }
                },
                "transaction_info": {
                    "source": "url"
                }
            }
            "#,
        )
        .unwrap();
        let e = event.value_mut().as_mut().unwrap();
        e.transaction.set_value(Some(transaction_name.into()));

        e.transaction_info
            .value_mut()
            .as_mut()
            .unwrap()
            .source
            .set_value(Some(source));

        relay_statsd::with_capturing_test_client(|| {
            utils::log_transaction_name_metrics(&mut event, |event| {
                let config = NormalizationConfig {
                    transaction_name_config: TransactionNameConfig {
                        rules: &[TransactionNameRule {
                            pattern: LazyGlob::new("/foo/*/**".to_owned()),
                            expiry: DateTime::<Utc>::MAX_UTC,
                            redaction: RedactionRule::Replace {
                                substitution: "*".to_owned(),
                            },
                        }],
                    },
                    ..Default::default()
                };
                relay_event_normalization::normalize_event(event, &config)
            });
        })
    }

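    /// `/nothing` matches neither the custom rule nor a default pattern, so the
    /// name is left unchanged (`changes:none`).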
    #[test]
    fn test_log_transaction_metrics_none() {
        let captures = capture_test_event("/nothing", TransactionSource::Url);
        insta::assert_debug_snapshot!(captures, @r###"
        [
            "event.transaction_name_changes:1|c|#source_in:url,changes:none,source_out:sanitized,is_404:false",
        ]
        "###);
    }

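    /// `/foo/john/denver` is redacted by the custom rule only (`changes:rule`).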
    #[test]
    fn test_log_transaction_metrics_rule() {
        let captures = capture_test_event("/foo/john/denver", TransactionSource::Url);
        insta::assert_debug_snapshot!(captures, @r###"
        [
            "event.transaction_name_changes:1|c|#source_in:url,changes:rule,source_out:sanitized,is_404:false",
        ]
        "###);
    }

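    /// `/something/12345` is sanitized by default pattern-based sanitization only
    /// (`changes:pattern`).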
    #[test]
    fn test_log_transaction_metrics_pattern() {
        let captures = capture_test_event("/something/12345", TransactionSource::Url);
        insta::assert_debug_snapshot!(captures, @r###"
        [
            "event.transaction_name_changes:1|c|#source_in:url,changes:pattern,source_out:sanitized,is_404:false",
        ]
        "###);
    }

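    /// `/foo/john/12345` triggers both the custom rule and pattern-based
    /// sanitization (`changes:both`).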
    #[test]
    fn test_log_transaction_metrics_both() {
        let captures = capture_test_event("/foo/john/12345", TransactionSource::Url);
        insta::assert_debug_snapshot!(captures, @r###"
        [
            "event.transaction_name_changes:1|c|#source_in:url,changes:both,source_out:sanitized,is_404:false",
        ]
        "###);
    }

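    /// A `Route` source is not URL-sanitized, so name and source pass through
    /// unchanged (`changes:none`, `source_out:route`).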
    #[test]
    fn test_log_transaction_metrics_no_match() {
        let captures = capture_test_event("/foo/john/12345", TransactionSource::Route);
        insta::assert_debug_snapshot!(captures, @r###"
        [
            "event.transaction_name_changes:1|c|#source_in:route,changes:none,source_out:route,is_404:false",
        ]
        "###);
    }

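    /// Buckets received from external sources must have `received_at` stamped into
    /// their metadata, while buckets forwarded from internal Relays keep it unset.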
    #[tokio::test]
    async fn test_process_metrics_bucket_metadata() {
        let mut token = Cogs::noop().timed(ResourceId::Relay, AppFeature::Unattributed);
        let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
        let received_at = Utc::now();
        let config = Config::default();

        let (aggregator, mut aggregator_rx) = Addr::custom();
        let processor = create_test_processor_with_addrs(
            config,
            Addrs {
                aggregator,
                ..Default::default()
            },
        )
        .await;

        let mut item = Item::new(ItemType::Statsd);
        item.set_payload(ContentType::Text, "spans/foo:3182887624:4267882815|s");
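        // Pairs of (bucket source, expected `received_at` in the bucket metadata).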
        for (source, expected_received_at) in [
            (
                BucketSource::External,
                Some(UnixTimestamp::from_datetime(received_at).unwrap()),
            ),
            (BucketSource::Internal, None),
        ] {
            let message = ProcessMetrics {
                data: MetricData::Raw(vec![item.clone()]),
                project_key,
                source,
                received_at,
                sent_at: Some(Utc::now()),
            };
            processor.handle_process_metrics(&mut token, message);

            let Aggregator::MergeBuckets(merge_buckets) = aggregator_rx.recv().await.unwrap();
            let buckets = merge_buckets.buckets;
            assert_eq!(buckets.len(), 1);
            assert_eq!(buckets[0].metadata.received_at, expected_received_at);
        }
    }

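    /// A batched payload is split by project key, and each project's buckets are
    /// merged into the aggregator as a separate message.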
    #[tokio::test]
    async fn test_process_batched_metrics() {
        let mut token = Cogs::noop().timed(ResourceId::Relay, AppFeature::Unattributed);
        let received_at = Utc::now();
        let config = Config::default();

        let (aggregator, mut aggregator_rx) = Addr::custom();
        let processor = create_test_processor_with_addrs(
            config,
            Addrs {
                aggregator,
                ..Default::default()
            },
        )
        .await;

        let payload = r#"{
    "buckets": {
        "11111111111111111111111111111111": [
            {
                "timestamp": 1615889440,
                "width": 0,
                "name": "d:custom/endpoint.response_time@millisecond",
                "type": "d",
                "value": [
                  68.0
                ],
                "tags": {
                  "route": "user_index"
                }
            }
        ],
        "22222222222222222222222222222222": [
            {
                "timestamp": 1615889440,
                "width": 0,
                "name": "d:custom/endpoint.cache_rate@none",
                "type": "d",
                "value": [
                  36.0
                ]
            }
        ]
    }
}
"#;
        let message = ProcessBatchedMetrics {
            payload: Bytes::from(payload),
            source: BucketSource::Internal,
            received_at,
            sent_at: Some(Utc::now()),
        };
        processor.handle_process_batched_metrics(&mut token, message);

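        // One MergeBuckets message is emitted per project key; ordering is not
        // guaranteed, so sort by project key before snapshotting.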
        let Aggregator::MergeBuckets(mb1) = aggregator_rx.recv().await.unwrap();
        let Aggregator::MergeBuckets(mb2) = aggregator_rx.recv().await.unwrap();

        let mut messages = vec![mb1, mb2];
        messages.sort_by_key(|mb| mb.project_key);

        let actual = messages
            .into_iter()
            .map(|mb| (mb.project_key, mb.buckets))
            .collect::<Vec<_>>();

        assert_debug_snapshot!(actual, @r###"
        [
            (
                ProjectKey("11111111111111111111111111111111"),
                [
                    Bucket {
                        timestamp: UnixTimestamp(1615889440),
                        width: 0,
                        name: MetricName(
                            "d:custom/endpoint.response_time@millisecond",
                        ),
                        value: Distribution(
                            [
                                68.0,
                            ],
                        ),
                        tags: {
                            "route": "user_index",
                        },
                        metadata: BucketMetadata {
                            merges: 1,
                            received_at: None,
                            extracted_from_indexed: false,
                        },
                    },
                ],
            ),
            (
                ProjectKey("22222222222222222222222222222222"),
                [
                    Bucket {
                        timestamp: UnixTimestamp(1615889440),
                        width: 0,
                        name: MetricName(
                            "d:custom/endpoint.cache_rate@none",
                        ),
                        value: Distribution(
                            [
                                36.0,
                            ],
                        ),
                        tags: {},
                        metadata: BucketMetadata {
                            merges: 1,
                            received_at: None,
                            extracted_from_indexed: false,
                        },
                    },
                ],
            ),
        ]
        "###);
    }
}