// relay_server/services/processor.rs

use std::borrow::Cow;
use std::collections::{BTreeSet, HashMap};
use std::error::Error;
use std::fmt::{Debug, Display};
use std::future::Future;
use std::io::Write;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;

use anyhow::Context;
use brotli::CompressorWriter as BrotliEncoder;
use bytes::Bytes;
use chrono::{DateTime, Utc};
use flate2::Compression;
use flate2::write::{GzEncoder, ZlibEncoder};
use futures::FutureExt;
use futures::future::BoxFuture;
use relay_base_schema::project::{ProjectId, ProjectKey};
use relay_cogs::{AppFeature, Cogs, FeatureWeights, ResourceId, Token};
use relay_common::time::UnixTimestamp;
use relay_config::{Config, HttpEncoding};
use relay_dynamic_config::Feature;
use relay_event_normalization::{ClockDriftProcessor, GeoIpLookup};
use relay_event_schema::processor::ProcessingAction;
use relay_event_schema::protocol::{ClientReport, Event, EventId, SpanV2};
use relay_filter::FilterStatKey;
use relay_metrics::{Bucket, BucketMetadata, BucketView, BucketsView, MetricNamespace};
use relay_protocol::Annotated;
use relay_quotas::{DataCategory, RateLimits, Scoping};
use relay_sampling::evaluation::{ReservoirCounters, SamplingDecision};
use relay_statsd::metric;
use relay_system::{Addr, FromMessage, NoResponse, Service};
use reqwest::header;
use smallvec::{SmallVec, smallvec};
use zstd::stream::Encoder as ZstdEncoder;

use crate::envelope::{self, ContentType, Envelope, EnvelopeError, Item, ItemContainer, ItemType};
use crate::extractors::{PartialDsn, RequestMeta, RequestTrust};
use crate::integrations::Integration;
use crate::managed::ManagedEnvelope;
use crate::metrics::{MetricOutcomes, MetricsLimiter, MinimalTrackableBucket};
use crate::metrics_extraction::ExtractedMetrics;
use crate::processing::attachments::AttachmentProcessor;
use crate::processing::check_ins::CheckInsProcessor;
use crate::processing::client_reports::ClientReportsProcessor;
use crate::processing::errors::{ErrorsProcessor, SwitchProcessingError};
use crate::processing::forward_unknown::ForwardUnknownProcessor;
use crate::processing::legacy_spans::LegacySpansProcessor;
use crate::processing::logs::LogsProcessor;
use crate::processing::profile_chunks::ProfileChunksProcessor;
use crate::processing::profiles::ProfilesProcessor;
use crate::processing::replays::ReplaysProcessor;
use crate::processing::sessions::SessionsProcessor;
use crate::processing::spans::SpansProcessor;
use crate::processing::trace_attachments::TraceAttachmentsProcessor;
use crate::processing::trace_metrics::TraceMetricsProcessor;
use crate::processing::transactions::TransactionProcessor;
use crate::processing::user_reports::UserReportsProcessor;
use crate::processing::{Forward as _, ForwardContext, Output, Outputs, QuotaRateLimiter};
use crate::service::ServiceError;
use crate::services::global_config::GlobalConfigHandle;
use crate::services::metrics::{Aggregator, FlushBuckets, MergeBuckets, ProjectBuckets};
use crate::services::outcome::{DiscardItemType, DiscardReason, Outcome, TrackOutcome};
use crate::services::projects::cache::ProjectCacheHandle;
use crate::services::projects::project::{ProjectInfo, ProjectState};
use crate::services::upstream::{
    SendRequest, Sign, SignatureType, UpstreamRelay, UpstreamRequest, UpstreamRequestError,
};
use crate::statsd::{RelayCounters, RelayDistributions, RelayTimers};
use crate::utils;
use crate::{http, processing};
use relay_threading::AsyncPool;
use symbolic_unreal::{Unreal4Error, Unreal4ErrorKind};
#[cfg(feature = "processing")]
use {
    crate::services::objectstore::Objectstore,
    crate::services::store::Store,
    itertools::Itertools,
    relay_dynamic_config::GlobalConfig,
    relay_quotas::{Quota, RateLimitingError, RedisRateLimiter},
    relay_redis::RedisClients,
    std::time::Instant,
};

pub mod event;
mod metrics;
pub mod span;

#[cfg(all(sentry, feature = "processing"))]
pub mod playstation;

/// The minimum clock drift for correction to apply.
pub const MINIMUM_CLOCK_DRIFT: Duration = Duration::from_secs(55 * 60);

#[derive(Debug)]
pub struct GroupTypeError;

impl Display for GroupTypeError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("failed to convert processing group into corresponding type")
    }
}

impl std::error::Error for GroupTypeError {}

macro_rules! processing_group {
    ($ty:ident, $variant:ident$(, $($other:ident),+)?) => {
        #[derive(Clone, Copy, Debug)]
        pub struct $ty;

        impl From<$ty> for ProcessingGroup {
            fn from(_: $ty) -> Self {
                ProcessingGroup::$variant
            }
        }

        impl TryFrom<ProcessingGroup> for $ty {
            type Error = GroupTypeError;

            fn try_from(value: ProcessingGroup) -> Result<Self, Self::Error> {
                if matches!(value, ProcessingGroup::$variant) {
                    return Ok($ty);
                }
                $($(
                    if matches!(value, ProcessingGroup::$other) {
                        return Ok($ty);
                    }
                )+)?
                return Err(GroupTypeError);
            }
        }
    };
}
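
// A rough sketch of what a single invocation below, e.g.
// `processing_group!(TransactionGroup, Transaction)`, expands to:
//
//     #[derive(Clone, Copy, Debug)]
//     pub struct TransactionGroup;
//
//     impl From<TransactionGroup> for ProcessingGroup {
//         fn from(_: TransactionGroup) -> Self {
//             ProcessingGroup::Transaction
//         }
//     }
//
//     impl TryFrom<ProcessingGroup> for TransactionGroup {
//         type Error = GroupTypeError;
//
//         fn try_from(value: ProcessingGroup) -> Result<Self, Self::Error> {
//             if matches!(value, ProcessingGroup::Transaction) {
//                 return Ok(TransactionGroup);
//             }
//             return Err(GroupTypeError);
//         }
//     }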

processing_group!(TransactionGroup, Transaction);

processing_group!(ErrorGroup, Error);

processing_group!(SessionGroup, Session);
processing_group!(ClientReportGroup, ClientReport);
processing_group!(ReplayGroup, Replay);
processing_group!(CheckInGroup, CheckIn);
processing_group!(TraceMetricGroup, TraceMetric);
processing_group!(SpanGroup, Span);

processing_group!(ProfileChunkGroup, ProfileChunk);
processing_group!(ForwardUnknownGroup, ForwardUnknown);
processing_group!(Ungrouped, Ungrouped);

/// Describes the groups of the processable items.
#[derive(Clone, Copy, Debug)]
pub enum ProcessingGroup {
    /// All the transaction related items.
    ///
    /// Includes transactions, related attachments, profiles.
    Transaction,
    /// All the items which require (have or create) events.
    ///
    /// This includes: errors, NEL, security reports, user reports, and some of the
    /// attachments.
    Error,
    /// Session events.
    Session,
    /// Standalone attachments.
    ///
    /// Attachments that are sent without an item that creates an event in the same envelope.
    StandaloneAttachments,
    /// Standalone user reports.
    ///
    /// User reports that are sent without an item that creates an event in the same envelope.
    StandaloneUserReports,
    /// Standalone profiles.
    ///
    /// Profiles which had their transaction sampled.
    StandaloneProfiles,
    /// Outcomes.
    ClientReport,
    /// Replays and ReplayRecordings.
    Replay,
    /// Crons.
    CheckIn,
    /// Logs.
    Log,
    /// Trace metrics.
    TraceMetric,
    /// Spans.
    Span,
    /// Span V2 spans.
    SpanV2,
    /// ProfileChunk.
    ProfileChunk,
    /// V2 attachments without span / log association.
    TraceAttachment,
    /// Unknown item types will be forwarded upstream (to processing Relay), where we will
    /// decide what to do with them.
    ForwardUnknown,
    /// All the items in the envelope that could not be grouped.
    Ungrouped,
}

impl ProcessingGroup {
    /// Splits the provided envelope into a list of `(group, envelope)` tuples.
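    ///
    /// For example, an envelope containing a transaction, an attachment that requires the
    /// event, and a client report (hypothetical contents) is split into
    /// `(Transaction, [transaction, attachment])` and `(ClientReport, [client_report])`.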
    fn split_envelope(
        mut envelope: Envelope,
        project_info: &ProjectInfo,
    ) -> SmallVec<[(Self, Box<Envelope>); 3]> {
        let headers = envelope.headers().clone();
        let mut grouped_envelopes = smallvec![];

        // Extract replays.
        let replay_items = envelope.take_items_by(|item| {
            matches!(
                item.ty(),
                &ItemType::ReplayEvent | &ItemType::ReplayRecording | &ItemType::ReplayVideo
            )
        });
        if !replay_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Replay,
                Envelope::from_parts(headers.clone(), replay_items),
            ))
        }

        // Keep all the sessions together in one envelope.
        let session_items = envelope
            .take_items_by(|item| matches!(item.ty(), &ItemType::Session | &ItemType::Sessions));
        if !session_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Session,
                Envelope::from_parts(headers.clone(), session_items),
            ))
        }

        let span_v2_items = envelope.take_items_by(|item| {
            let exp_feature = project_info.has_feature(Feature::SpanV2ExperimentalProcessing);

            ItemContainer::<SpanV2>::is_container(item)
                || matches!(item.integration(), Some(Integration::Spans(_)))
                // Process standalone spans (v1) via the v2 pipeline
                || (exp_feature && matches!(item.ty(), &ItemType::Span))
                || (exp_feature && item.is_span_attachment())
        });

        if !span_v2_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::SpanV2,
                Envelope::from_parts(headers.clone(), span_v2_items),
            ))
        }

        // Extract spans.
        let span_items = envelope.take_items_by(|item| matches!(item.ty(), &ItemType::Span));
        if !span_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Span,
                Envelope::from_parts(headers.clone(), span_items),
            ))
        }

        // Extract logs.
        let logs_items = envelope.take_items_by(|item| {
            matches!(item.ty(), &ItemType::Log)
                || matches!(item.integration(), Some(Integration::Logs(_)))
        });
        if !logs_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Log,
                Envelope::from_parts(headers.clone(), logs_items),
            ))
        }

        // Extract trace metrics.
        let trace_metric_items =
            envelope.take_items_by(|item| matches!(item.ty(), &ItemType::TraceMetric));
        if !trace_metric_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::TraceMetric,
                Envelope::from_parts(headers.clone(), trace_metric_items),
            ))
        }

        // Extract profile chunks.
        let profile_chunk_items =
            envelope.take_items_by(|item| matches!(item.ty(), &ItemType::ProfileChunk));
        if !profile_chunk_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::ProfileChunk,
                Envelope::from_parts(headers.clone(), profile_chunk_items),
            ))
        }

        let trace_attachment_items = envelope.take_items_by(Item::is_trace_attachment);
        if !trace_attachment_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::TraceAttachment,
                Envelope::from_parts(headers.clone(), trace_attachment_items),
            ))
        }

        if !envelope.items().any(Item::creates_event) {
            // Extract the standalone attachments
            let standalone_attachments = envelope
                .take_items_by(|i| i.requires_event() && matches!(i.ty(), ItemType::Attachment));
            if !standalone_attachments.is_empty() {
                grouped_envelopes.push((
                    ProcessingGroup::StandaloneAttachments,
                    Envelope::from_parts(headers.clone(), standalone_attachments),
                ))
            }

            // Extract the standalone user reports
            let standalone_user_reports =
                envelope.take_items_by(|i| matches!(i.ty(), ItemType::UserReport));
            if !standalone_user_reports.is_empty() {
                grouped_envelopes.push((
                    ProcessingGroup::StandaloneUserReports,
                    Envelope::from_parts(headers.clone(), standalone_user_reports),
                ));
            }

            // Extract the standalone profiles
            let standalone_profiles =
                envelope.take_items_by(|i| matches!(i.ty(), ItemType::Profile));
            if !standalone_profiles.is_empty() {
                grouped_envelopes.push((
                    ProcessingGroup::StandaloneProfiles,
                    Envelope::from_parts(headers.clone(), standalone_profiles),
                ));
            }
        }

        // Make sure we create separate envelopes for each `RawSecurity` report.
        let security_reports_items = envelope
            .take_items_by(|i| matches!(i.ty(), &ItemType::RawSecurity))
            .into_iter()
            .map(|item| {
                let headers = headers.clone();
                let items: SmallVec<[Item; 3]> = smallvec![item.clone()];
                let mut envelope = Envelope::from_parts(headers, items);
                envelope.set_event_id(EventId::new());
                (ProcessingGroup::Error, envelope)
            });
        grouped_envelopes.extend(security_reports_items);

        // Extract all the items which require an event into a separate envelope.
        let require_event_items = envelope.take_items_by(Item::requires_event);
        if !require_event_items.is_empty() {
            let group = if require_event_items
                .iter()
                .any(|item| matches!(item.ty(), &ItemType::Transaction | &ItemType::Profile))
            {
                ProcessingGroup::Transaction
            } else {
                ProcessingGroup::Error
            };

            grouped_envelopes.push((
                group,
                Envelope::from_parts(headers.clone(), require_event_items),
            ))
        }

        // Get the rest of the envelopes, one per item.
        let envelopes = envelope.items_mut().map(|item| {
            let headers = headers.clone();
            let items: SmallVec<[Item; 3]> = smallvec![item.clone()];
            let envelope = Envelope::from_parts(headers, items);
            let item_type = item.ty();
            let group = if matches!(item_type, &ItemType::CheckIn) {
                ProcessingGroup::CheckIn
            } else if matches!(item.ty(), &ItemType::ClientReport) {
                ProcessingGroup::ClientReport
            } else if matches!(item_type, &ItemType::Unknown(_)) {
                ProcessingGroup::ForwardUnknown
            } else {
                // Cannot group this item type.
                ProcessingGroup::Ungrouped
            };

            (group, envelope)
        });
        grouped_envelopes.extend(envelopes);

        grouped_envelopes
    }

    /// Returns the name of the group.
    pub fn variant(&self) -> &'static str {
        match self {
            ProcessingGroup::Transaction => "transaction",
            ProcessingGroup::Error => "error",
            ProcessingGroup::Session => "session",
            ProcessingGroup::StandaloneAttachments => "standalone_attachment",
            ProcessingGroup::StandaloneUserReports => "standalone_user_reports",
            ProcessingGroup::StandaloneProfiles => "standalone_profiles",
            ProcessingGroup::ClientReport => "client_report",
            ProcessingGroup::Replay => "replay",
            ProcessingGroup::CheckIn => "check_in",
            ProcessingGroup::Log => "log",
            ProcessingGroup::TraceMetric => "trace_metric",
            ProcessingGroup::Span => "span",
            ProcessingGroup::SpanV2 => "span_v2",
            ProcessingGroup::ProfileChunk => "profile_chunk",
            ProcessingGroup::TraceAttachment => "trace_attachment",
            ProcessingGroup::ForwardUnknown => "forward_unknown",
            ProcessingGroup::Ungrouped => "ungrouped",
        }
    }
}

impl From<ProcessingGroup> for AppFeature {
    fn from(value: ProcessingGroup) -> Self {
        match value {
            ProcessingGroup::Transaction => AppFeature::Transactions,
            ProcessingGroup::Error => AppFeature::Errors,
            ProcessingGroup::Session => AppFeature::Sessions,
            ProcessingGroup::StandaloneAttachments => AppFeature::UnattributedEnvelope,
            ProcessingGroup::StandaloneUserReports => AppFeature::UserReports,
            ProcessingGroup::StandaloneProfiles => AppFeature::Profiles,
            ProcessingGroup::ClientReport => AppFeature::ClientReports,
            ProcessingGroup::Replay => AppFeature::Replays,
            ProcessingGroup::CheckIn => AppFeature::CheckIns,
            ProcessingGroup::Log => AppFeature::Logs,
            ProcessingGroup::TraceMetric => AppFeature::TraceMetrics,
            ProcessingGroup::Span => AppFeature::Spans,
            ProcessingGroup::SpanV2 => AppFeature::Spans,
            ProcessingGroup::ProfileChunk => AppFeature::Profiles,
            ProcessingGroup::ForwardUnknown => AppFeature::UnattributedEnvelope,
            ProcessingGroup::Ungrouped => AppFeature::UnattributedEnvelope,
            ProcessingGroup::TraceAttachment => AppFeature::TraceAttachments,
        }
    }
}

/// An error returned when handling [`ProcessEnvelope`].
#[derive(Debug, thiserror::Error)]
pub enum ProcessingError {
    #[error("invalid json in event")]
    InvalidJson(#[source] serde_json::Error),

    #[error("invalid message pack event payload")]
    InvalidMsgpack(#[from] rmp_serde::decode::Error),

    #[error("invalid unreal crash report")]
    InvalidUnrealReport(#[source] Unreal4Error),

    #[error("event payload too large")]
    PayloadTooLarge(DiscardItemType),

    #[error("invalid transaction event")]
    InvalidTransaction,

    #[error("the item is not allowed/supported in this envelope")]
    UnsupportedItem,

    #[error("envelope processor failed")]
    ProcessingFailed(#[from] ProcessingAction),

    #[error("duplicate {0} in event")]
    DuplicateItem(ItemType),

    #[error("failed to extract event payload")]
    NoEventPayload,

    #[error("missing project id in DSN")]
    MissingProjectId,

    #[error("invalid security report type: {0:?}")]
    InvalidSecurityType(Bytes),

    #[error("unsupported security report type")]
    UnsupportedSecurityType,

    #[error("invalid security report")]
    InvalidSecurityReport(#[source] serde_json::Error),

    #[error("event filtered with reason: {0:?}")]
    EventFiltered(FilterStatKey),

    #[error("could not serialize event payload")]
    SerializeFailed(#[source] serde_json::Error),

    #[cfg(feature = "processing")]
    #[error("failed to apply quotas")]
    QuotasFailed(#[from] RateLimitingError),

    #[error("nintendo switch dying message processing failed {0:?}")]
    InvalidNintendoDyingMessage(#[source] SwitchProcessingError),

    #[cfg(all(sentry, feature = "processing"))]
    #[error("playstation dump processing failed: {0}")]
    InvalidPlaystationDump(String),

    #[error("processing group does not match specific processor")]
    ProcessingGroupMismatch,
    #[error("new processing pipeline failed")]
    ProcessingFailure,

    #[cfg(feature = "processing")]
    #[error("invalid attachment reference")]
    InvalidAttachmentRef,

    #[error("could not determine processing group for envelope items")]
    NoProcessingGroup,
}

impl ProcessingError {
    pub fn to_outcome(&self) -> Option<Outcome> {
        match self {
            Self::PayloadTooLarge(payload_type) => {
                Some(Outcome::Invalid(DiscardReason::TooLarge(*payload_type)))
            }
            Self::InvalidJson(_) => Some(Outcome::Invalid(DiscardReason::InvalidJson)),
            Self::InvalidMsgpack(_) => Some(Outcome::Invalid(DiscardReason::InvalidMsgpack)),
            Self::InvalidSecurityType(_) => {
                Some(Outcome::Invalid(DiscardReason::SecurityReportType))
            }
            Self::UnsupportedItem => Some(Outcome::Invalid(DiscardReason::InvalidEnvelope)),
            Self::InvalidSecurityReport(_) => Some(Outcome::Invalid(DiscardReason::SecurityReport)),
            Self::UnsupportedSecurityType => Some(Outcome::Filtered(FilterStatKey::InvalidCsp)),
            Self::InvalidTransaction => Some(Outcome::Invalid(DiscardReason::InvalidTransaction)),
            Self::DuplicateItem(_) => Some(Outcome::Invalid(DiscardReason::DuplicateItem)),
            Self::NoEventPayload => Some(Outcome::Invalid(DiscardReason::NoEventPayload)),
            Self::InvalidNintendoDyingMessage(_) => Some(Outcome::Invalid(DiscardReason::Payload)),
            #[cfg(all(sentry, feature = "processing"))]
            Self::InvalidPlaystationDump(_) => Some(Outcome::Invalid(DiscardReason::Payload)),
            Self::InvalidUnrealReport(err) if err.kind() == Unreal4ErrorKind::BadCompression => {
                Some(Outcome::Invalid(DiscardReason::InvalidCompression))
            }
            Self::InvalidUnrealReport(_) => Some(Outcome::Invalid(DiscardReason::ProcessUnreal)),
            Self::SerializeFailed(_) | Self::ProcessingFailed(_) => {
                Some(Outcome::Invalid(DiscardReason::Internal))
            }
            #[cfg(feature = "processing")]
            Self::QuotasFailed(_) => Some(Outcome::Invalid(DiscardReason::Internal)),
            Self::MissingProjectId => None,
            Self::EventFiltered(key) => Some(Outcome::Filtered(key.clone())),

            Self::ProcessingGroupMismatch => Some(Outcome::Invalid(DiscardReason::Internal)),
            // Outcomes are emitted in the new processing pipeline already.
            Self::ProcessingFailure => None,
            #[cfg(feature = "processing")]
            Self::InvalidAttachmentRef => {
                Some(Outcome::Invalid(DiscardReason::InvalidAttachmentRef))
            }
            Self::NoProcessingGroup => Some(Outcome::Invalid(DiscardReason::Internal)),
        }
    }

    fn is_unexpected(&self) -> bool {
        self.to_outcome()
            .is_some_and(|outcome| outcome.is_unexpected())
    }
}

impl From<Unreal4Error> for ProcessingError {
    fn from(err: Unreal4Error) -> Self {
        match err.kind() {
            Unreal4ErrorKind::TooLarge => Self::PayloadTooLarge(ItemType::UnrealReport.into()),
            _ => ProcessingError::InvalidUnrealReport(err),
        }
    }
}

type ExtractedEvent = (Annotated<Event>, usize);

/// A container for extracted metrics during processing.
///
/// The container enforces that the extracted metrics are correctly tagged
/// with the dynamic sampling decision.
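///
/// A minimal usage sketch (assuming `buckets: Vec<Bucket>` was extracted elsewhere):
///
/// ```ignore
/// let mut extracted = ProcessingExtractedMetrics::new();
/// // Buckets extracted from a payload kept by dynamic sampling are flagged as
/// // `extracted_from_indexed`.
/// extracted.extend_project_metrics(buckets, Some(SamplingDecision::Keep));
/// let metrics: ExtractedMetrics = extracted.into_inner();
/// ```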
#[derive(Debug)]
pub struct ProcessingExtractedMetrics {
    metrics: ExtractedMetrics,
}

impl ProcessingExtractedMetrics {
    pub fn new() -> Self {
        Self {
            metrics: ExtractedMetrics::default(),
        }
    }

    pub fn into_inner(self) -> ExtractedMetrics {
        self.metrics
    }

    /// Extends the contained metrics with [`ExtractedMetrics`].
    pub fn extend(
        &mut self,
        extracted: ExtractedMetrics,
        sampling_decision: Option<SamplingDecision>,
    ) {
        self.extend_project_metrics(extracted.project_metrics, sampling_decision);
        self.extend_sampling_metrics(extracted.sampling_metrics, sampling_decision);
    }

    /// Extends the contained project metrics.
    pub fn extend_project_metrics<I>(
        &mut self,
        buckets: I,
        sampling_decision: Option<SamplingDecision>,
    ) where
        I: IntoIterator<Item = Bucket>,
    {
        self.metrics
            .project_metrics
            .extend(buckets.into_iter().map(|mut bucket| {
                bucket.metadata.extracted_from_indexed =
                    sampling_decision == Some(SamplingDecision::Keep);
                bucket
            }));
    }

    /// Extends the contained sampling metrics.
    pub fn extend_sampling_metrics<I>(
        &mut self,
        buckets: I,
        sampling_decision: Option<SamplingDecision>,
    ) where
        I: IntoIterator<Item = Bucket>,
    {
        self.metrics
            .sampling_metrics
            .extend(buckets.into_iter().map(|mut bucket| {
                bucket.metadata.extracted_from_indexed =
                    sampling_decision == Some(SamplingDecision::Keep);
                bucket
            }));
    }
}

fn send_metrics(
    metrics: ExtractedMetrics,
    project_key: ProjectKey,
    sampling_key: Option<ProjectKey>,
    aggregator: &Addr<Aggregator>,
) {
    let ExtractedMetrics {
        project_metrics,
        sampling_metrics,
    } = metrics;

    if !project_metrics.is_empty() {
        aggregator.send(MergeBuckets {
            project_key,
            buckets: project_metrics,
        });
    }

    if !sampling_metrics.is_empty() {
        // If no sampling project state is available, we associate the sampling
        // metrics with the current project.
        //
        // project_without_tracing         -> metrics go to self
        // dependent_project_with_tracing  -> metrics go to root
        // root_project_with_tracing       -> metrics go to root == self
        let sampling_project_key = sampling_key.unwrap_or(project_key);
        aggregator.send(MergeBuckets {
            project_key: sampling_project_key,
            buckets: sampling_metrics,
        });
    }
}

/// Applies processing to all contents of the given envelope.
///
/// Depending on the contents of the envelope and Relay's mode, this includes:
///
///  - Basic normalization and validation for all item types.
///  - Clock drift correction if the required `sent_at` header is present.
///  - Expansion of certain item types (e.g. unreal).
///  - Store normalization for event payloads in processing mode.
///  - Rate limiters and inbound filters on events in processing mode.
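///
/// A minimal dispatch sketch (assuming the service address and all handles are available):
///
/// ```ignore
/// envelope_processor.send(ProcessEnvelope {
///     envelope: managed_envelope,
///     project_info,
///     rate_limits,
///     sampling_project_info: None,
///     reservoir_counters,
/// });
/// ```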
#[derive(Debug)]
pub struct ProcessEnvelope {
    /// Envelope to process.
    pub envelope: ManagedEnvelope,
    /// The project info.
    pub project_info: Arc<ProjectInfo>,
    /// Currently active cached rate limits for this project.
    pub rate_limits: Arc<RateLimits>,
    /// Root sampling project info.
    pub sampling_project_info: Option<Arc<ProjectInfo>>,
    /// Sampling reservoir counters.
    pub reservoir_counters: ReservoirCounters,
}

/// Like a [`ProcessEnvelope`], but with an envelope which has been grouped.
#[derive(Debug)]
struct ProcessEnvelopeGrouped<'a> {
    /// The group the envelope belongs to.
    pub group: ProcessingGroup,
    /// Envelope to process.
    pub envelope: ManagedEnvelope,
    /// The processing context.
    pub ctx: processing::Context<'a>,
}

/// Parses a list of metrics or metric buckets and pushes them to the project's aggregator.
///
/// This parses and validates the metrics:
///  - For [`Metrics`](ItemType::Statsd), each metric is parsed separately, and invalid metrics are
///    ignored independently.
///  - For [`MetricBuckets`](ItemType::MetricBuckets), the entire list of buckets is parsed and
///    dropped together on parsing failure.
///  - Other envelope items will be ignored with an error message.
///
/// Additionally, processing applies clock drift correction using the system clock of this Relay, if
/// the Envelope specifies the [`sent_at`](Envelope::sent_at) header.
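///
/// For illustration, a [`Statsd`](ItemType::Statsd) item carries lines roughly like the
/// following (metric name and tag are hypothetical):
///
/// ```text
/// endpoint.hits:1|c|#route:user_index
/// ```
///
/// whereas a [`MetricBuckets`](ItemType::MetricBuckets) item carries a JSON array of
/// pre-aggregated buckets.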
#[derive(Debug)]
pub struct ProcessMetrics {
    /// A list of metric items.
    pub data: MetricData,
    /// The target project.
    pub project_key: ProjectKey,
    /// Whether to keep or reset the metric metadata.
    pub source: BucketSource,
    /// The wall clock time at which the request was received.
    pub received_at: DateTime<Utc>,
    /// The value of the Envelope's [`sent_at`](Envelope::sent_at) header for clock drift
    /// correction.
    pub sent_at: Option<DateTime<Utc>>,
}

/// Raw unparsed metric data.
#[derive(Debug)]
pub enum MetricData {
    /// Raw data, unparsed envelope items.
    Raw(Vec<Item>),
    /// Already parsed buckets but unprocessed.
    Parsed(Vec<Bucket>),
}

impl MetricData {
    /// Consumes the metric data and parses the contained buckets.
    ///
    /// If the contained data is already parsed, the buckets are returned unchanged.
    /// Raw buckets are parsed and created with the passed `timestamp`.
    fn into_buckets(self, timestamp: UnixTimestamp) -> Vec<Bucket> {
        let items = match self {
            Self::Parsed(buckets) => return buckets,
            Self::Raw(items) => items,
        };

        let mut buckets = Vec::new();
        for item in items {
            let payload = item.payload();
            if item.ty() == &ItemType::Statsd {
                for bucket_result in Bucket::parse_all(&payload, timestamp) {
                    match bucket_result {
                        Ok(bucket) => buckets.push(bucket),
                        Err(error) => relay_log::debug!(
                            error = &error as &dyn Error,
                            "failed to parse metric bucket from statsd format",
                        ),
                    }
                }
            } else if item.ty() == &ItemType::MetricBuckets {
                match serde_json::from_slice::<Vec<Bucket>>(&payload) {
                    Ok(parsed_buckets) => {
                        // Re-use the allocation of `buckets` if possible.
                        if buckets.is_empty() {
                            buckets = parsed_buckets;
                        } else {
                            buckets.extend(parsed_buckets);
                        }
                    }
                    Err(error) => {
                        relay_log::debug!(
                            error = &error as &dyn Error,
                            "failed to parse metric bucket",
                        );
                        metric!(counter(RelayCounters::MetricBucketsParsingFailed) += 1);
                    }
                }
            } else {
                relay_log::error!(
                    "invalid item of type {} passed to ProcessMetrics",
                    item.ty()
                );
            }
        }
        buckets
    }
}

#[derive(Debug)]
pub struct ProcessBatchedMetrics {
    /// Metrics payload in JSON format.
    pub payload: Bytes,
    /// Whether to keep or reset the metric metadata.
    pub source: BucketSource,
    /// The wall clock time at which the request was received.
    pub received_at: DateTime<Utc>,
    /// The value of the Envelope's [`sent_at`](Envelope::sent_at) header for clock drift
    /// correction.
    pub sent_at: Option<DateTime<Utc>>,
}

/// Source information where a metric bucket originates from.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum BucketSource {
    /// The metric bucket originated from an internal Relay use case.
    ///
    /// The metric bucket originates either from within the same Relay
    /// or was accepted coming from another Relay which is registered as
    /// an internal Relay via Relay's configuration.
    Internal,
    /// The metric bucket originated from an untrusted source.
    ///
    /// Managed Relays sending extracted metrics are considered external; it's a project
    /// use case, but it comes from an untrusted source.
    External,
}

impl BucketSource {
    /// Infers the bucket source from [`RequestMeta::request_trust`].
    pub fn from_meta(meta: &RequestMeta) -> Self {
        match meta.request_trust() {
            RequestTrust::Trusted => Self::Internal,
            RequestTrust::Untrusted => Self::External,
        }
    }
}

/// Sends client reports to the upstream.
#[derive(Debug)]
pub struct SubmitClientReports {
    /// The client reports to be sent.
    pub client_reports: Vec<ClientReport>,
    /// Scoping information for the client report.
    pub scoping: Scoping,
}

/// CPU-intensive processing tasks for envelopes.
#[derive(Debug)]
pub enum EnvelopeProcessor {
    ProcessEnvelope(Box<ProcessEnvelope>),
    ProcessProjectMetrics(Box<ProcessMetrics>),
    ProcessBatchedMetrics(Box<ProcessBatchedMetrics>),
    FlushBuckets(Box<FlushBuckets>),
    SubmitClientReports(Box<SubmitClientReports>),
}

impl EnvelopeProcessor {
    /// Returns the name of the message variant.
    pub fn variant(&self) -> &'static str {
        match self {
            EnvelopeProcessor::ProcessEnvelope(_) => "ProcessEnvelope",
            EnvelopeProcessor::ProcessProjectMetrics(_) => "ProcessProjectMetrics",
            EnvelopeProcessor::ProcessBatchedMetrics(_) => "ProcessBatchedMetrics",
            EnvelopeProcessor::FlushBuckets(_) => "FlushBuckets",
            EnvelopeProcessor::SubmitClientReports(_) => "SubmitClientReports",
        }
    }
}

impl relay_system::Interface for EnvelopeProcessor {}

impl FromMessage<ProcessEnvelope> for EnvelopeProcessor {
    type Response = relay_system::NoResponse;

    fn from_message(message: ProcessEnvelope, _sender: ()) -> Self {
        Self::ProcessEnvelope(Box::new(message))
    }
}

impl FromMessage<ProcessMetrics> for EnvelopeProcessor {
    type Response = NoResponse;

    fn from_message(message: ProcessMetrics, _: ()) -> Self {
        Self::ProcessProjectMetrics(Box::new(message))
    }
}

impl FromMessage<ProcessBatchedMetrics> for EnvelopeProcessor {
    type Response = NoResponse;

    fn from_message(message: ProcessBatchedMetrics, _: ()) -> Self {
        Self::ProcessBatchedMetrics(Box::new(message))
    }
}

impl FromMessage<FlushBuckets> for EnvelopeProcessor {
    type Response = NoResponse;

    fn from_message(message: FlushBuckets, _: ()) -> Self {
        Self::FlushBuckets(Box::new(message))
    }
}

impl FromMessage<SubmitClientReports> for EnvelopeProcessor {
    type Response = NoResponse;

    fn from_message(message: SubmitClientReports, _: ()) -> Self {
        Self::SubmitClientReports(Box::new(message))
    }
}

/// The asynchronous thread pool used for scheduling processing tasks in the processor.
pub type EnvelopeProcessorServicePool = AsyncPool<BoxFuture<'static, ()>>;

/// Service implementing the [`EnvelopeProcessor`] interface.
///
/// This service handles messages in a worker pool with configurable concurrency.
#[derive(Clone)]
pub struct EnvelopeProcessorService {
    inner: Arc<InnerProcessor>,
}

/// Contains the addresses of services that the processor publishes to.
pub struct Addrs {
    pub outcome_aggregator: Addr<TrackOutcome>,
    pub upstream_relay: Addr<UpstreamRelay>,
    #[cfg(feature = "processing")]
    pub objectstore: Option<Addr<Objectstore>>,
    #[cfg(feature = "processing")]
    pub store_forwarder: Option<Addr<Store>>,
    pub aggregator: Addr<Aggregator>,
}

impl Default for Addrs {
    fn default() -> Self {
        Addrs {
            outcome_aggregator: Addr::dummy(),
            upstream_relay: Addr::dummy(),
            #[cfg(feature = "processing")]
            objectstore: None,
            #[cfg(feature = "processing")]
            store_forwarder: None,
            aggregator: Addr::dummy(),
        }
    }
}

struct InnerProcessor {
    pool: EnvelopeProcessorServicePool,
    config: Arc<Config>,
    global_config: GlobalConfigHandle,
    project_cache: ProjectCacheHandle,
    cogs: Cogs,
    addrs: Addrs,
    #[cfg(feature = "processing")]
    rate_limiter: Option<Arc<RedisRateLimiter>>,
    metric_outcomes: MetricOutcomes,
    processing: Processing,
}

struct Processing {
    errors: ErrorsProcessor,
    logs: LogsProcessor,
    trace_metrics: TraceMetricsProcessor,
    spans: SpansProcessor,
    legacy_spans: LegacySpansProcessor,
    check_ins: CheckInsProcessor,
    sessions: SessionsProcessor,
    transactions: TransactionProcessor,
    profile_chunks: ProfileChunksProcessor,
    trace_attachments: TraceAttachmentsProcessor,
    replays: ReplaysProcessor,
    client_reports: ClientReportsProcessor,
    attachments: AttachmentProcessor,
    user_reports: UserReportsProcessor,
    profiles: ProfilesProcessor,
    forward_unknown: ForwardUnknownProcessor,
}

impl EnvelopeProcessorService {
    /// Creates a multi-threaded envelope processor.
    #[cfg_attr(feature = "processing", expect(clippy::too_many_arguments))]
    pub fn new(
        pool: EnvelopeProcessorServicePool,
        config: Arc<Config>,
        global_config: GlobalConfigHandle,
        project_cache: ProjectCacheHandle,
        cogs: Cogs,
        #[cfg(feature = "processing")] redis: Option<RedisClients>,
        addrs: Addrs,
        metric_outcomes: MetricOutcomes,
    ) -> Self {
        let geoip_lookup = config
            .geoip_path()
            .and_then(
                |p| match GeoIpLookup::open(p).context(ServiceError::GeoIp) {
                    Ok(geoip) => Some(geoip),
                    Err(err) => {
                        relay_log::error!("failed to open GeoIP db {p:?}: {err:?}");
                        None
                    }
                },
            )
            .unwrap_or_else(GeoIpLookup::empty);

        #[cfg(feature = "processing")]
        let rate_limiter = redis.as_ref().map(|redis| {
            RedisRateLimiter::new(redis.quotas.clone())
                .max_limit(config.max_rate_limit())
                .cache(config.quota_cache_ratio(), config.quota_cache_max())
        });

        let quota_limiter = Arc::new(QuotaRateLimiter::new(
            #[cfg(feature = "processing")]
            project_cache.clone(),
            #[cfg(feature = "processing")]
            rate_limiter.clone(),
        ));
        #[cfg(feature = "processing")]
        let rate_limiter = rate_limiter.map(Arc::new);
        let outcome_aggregator = addrs.outcome_aggregator.clone();
        let inner = InnerProcessor {
            pool,
            global_config,
            project_cache,
            cogs,
            #[cfg(feature = "processing")]
            rate_limiter,
            addrs,
            metric_outcomes,
            processing: Processing {
                errors: ErrorsProcessor::new(Arc::clone(&quota_limiter), geoip_lookup.clone()),
                logs: LogsProcessor::new(Arc::clone(&quota_limiter)),
                trace_metrics: TraceMetricsProcessor::new(Arc::clone(&quota_limiter)),
                spans: SpansProcessor::new(Arc::clone(&quota_limiter), geoip_lookup.clone()),
                legacy_spans: LegacySpansProcessor::new(
                    Arc::clone(&quota_limiter),
                    geoip_lookup.clone(),
                ),
                check_ins: CheckInsProcessor::new(Arc::clone(&quota_limiter)),
                sessions: SessionsProcessor::new(Arc::clone(&quota_limiter)),
                transactions: TransactionProcessor::new(
                    Arc::clone(&quota_limiter),
                    geoip_lookup.clone(),
                    #[cfg(feature = "processing")]
                    redis.map(|r| r.quotas),
                    #[cfg(not(feature = "processing"))]
                    None,
                ),
                profile_chunks: ProfileChunksProcessor::new(Arc::clone(&quota_limiter)),
                trace_attachments: TraceAttachmentsProcessor::new(Arc::clone(&quota_limiter)),
                replays: ReplaysProcessor::new(Arc::clone(&quota_limiter), geoip_lookup),
                client_reports: ClientReportsProcessor::new(outcome_aggregator),
                attachments: AttachmentProcessor::new(Arc::clone(&quota_limiter)),
                user_reports: UserReportsProcessor::new(Arc::clone(&quota_limiter)),
                profiles: ProfilesProcessor::new(quota_limiter),
                forward_unknown: ForwardUnknownProcessor::new(),
            },
            config,
        };

        Self {
            inner: Arc::new(inner),
        }
    }

    async fn process_with_processor<P: processing::Processor>(
        &self,
        processor: &P,
        mut managed_envelope: ManagedEnvelope,
        ctx: processing::Context<'_>,
    ) -> Result<Output<Outputs>, ProcessingError>
    where
        Outputs: From<P::Output>,
    {
        let Some(work) = processor.prepare_envelope(&mut managed_envelope) else {
            debug_assert!(
                false,
                "there must be work for the {} processor",
                std::any::type_name::<P>(),
            );
            return Err(ProcessingError::ProcessingGroupMismatch);
        };

        managed_envelope.update();
        match managed_envelope.envelope().is_empty() {
            true => managed_envelope.accept(),
            false => managed_envelope.reject(Outcome::Invalid(DiscardReason::Internal)),
        }

        processor
            .process(work, ctx)
            .await
            .map_err(|err| {
                relay_log::debug!(
                    error = &err as &dyn std::error::Error,
                    "processing pipeline failed"
                );
                ProcessingError::ProcessingFailure
            })
            .map(|o| o.map(Into::into))
    }

    async fn process_envelope(
        &self,
        project_id: ProjectId,
        message: ProcessEnvelopeGrouped<'_>,
    ) -> Result<Output<Outputs>, ProcessingError> {
        let ProcessEnvelopeGrouped {
            group,
            envelope: mut managed_envelope,
            ctx,
        } = message;

        // Pre-process the envelope headers.
        if let Some(sampling_state) = ctx.sampling_project_info {
            // Both transactions and standalone span envelopes need a normalized DSC header
            // to make sampling rules based on the segment/transaction name work correctly.
            managed_envelope
                .envelope_mut()
                .parametrize_dsc_transaction(&sampling_state.config.tx_name_rules);
        }

        // Set the event retention. Effectively, this value will only be available in processing
        // mode when the full project config is queried from the upstream.
        if let Some(retention) = ctx.project_info.config.event_retention {
            managed_envelope.envelope_mut().set_retention(retention);
        }

        // Set the downsampled event retention. Effectively, this value will only be available
        // in processing mode when the full project config is queried from the upstream.
        if let Some(retention) = ctx.project_info.config.downsampled_event_retention {
            managed_envelope
                .envelope_mut()
                .set_downsampled_retention(retention);
        }

        // Ensure the project ID is updated to the stored instance for this project cache. This can
        // differ in two cases:
        //  1. The envelope was sent to the legacy `/store/` endpoint without a project ID.
        //  2. The DSN was moved and the envelope sent to the old project ID.
        managed_envelope
            .envelope_mut()
            .meta_mut()
            .set_project_id(project_id);

        relay_log::trace!("Processing {group} group", group = group.variant());

        match group {
            ProcessingGroup::Error => {
                self.process_with_processor(&self.inner.processing.errors, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::Transaction => {
                self.process_with_processor(
                    &self.inner.processing.transactions,
                    managed_envelope,
                    ctx,
                )
                .await
            }
            ProcessingGroup::Session => {
                self.process_with_processor(&self.inner.processing.sessions, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::StandaloneAttachments => {
                self.process_with_processor(
                    &self.inner.processing.attachments,
                    managed_envelope,
                    ctx,
                )
                .await
            }
            ProcessingGroup::StandaloneUserReports => {
                self.process_with_processor(
                    &self.inner.processing.user_reports,
                    managed_envelope,
                    ctx,
                )
                .await
            }
            ProcessingGroup::StandaloneProfiles => {
                self.process_with_processor(&self.inner.processing.profiles, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::ClientReport => {
                self.process_with_processor(
                    &self.inner.processing.client_reports,
                    managed_envelope,
                    ctx,
                )
                .await
            }
            ProcessingGroup::Replay => {
                self.process_with_processor(&self.inner.processing.replays, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::CheckIn => {
                self.process_with_processor(&self.inner.processing.check_ins, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::Log => {
                self.process_with_processor(&self.inner.processing.logs, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::TraceMetric => {
                self.process_with_processor(
                    &self.inner.processing.trace_metrics,
                    managed_envelope,
                    ctx,
                )
                .await
            }
            ProcessingGroup::SpanV2 => {
                self.process_with_processor(&self.inner.processing.spans, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::TraceAttachment => {
                self.process_with_processor(
                    &self.inner.processing.trace_attachments,
                    managed_envelope,
                    ctx,
                )
                .await
            }
            ProcessingGroup::Span => {
                self.process_with_processor(
                    &self.inner.processing.legacy_spans,
                    managed_envelope,
                    ctx,
                )
                .await
            }
            ProcessingGroup::ProfileChunk => {
                self.process_with_processor(
                    &self.inner.processing.profile_chunks,
                    managed_envelope,
                    ctx,
                )
                .await
            }
            // Ungrouped items indicate bugs, as all items should have an associated processor,
            // or be handled separately (like metrics).
            ProcessingGroup::Ungrouped => {
                relay_log::error!(
                    tags.project = %project_id,
                    items = ?managed_envelope.envelope().items().next().map(Item::ty),
                    "could not identify the processing group based on the envelope's items"
                );

                Err(ProcessingError::NoProcessingGroup)
            }
            // Leave this group unchanged.
            //
            // This will later be forwarded to upstream.
            ProcessingGroup::ForwardUnknown => {
                self.process_with_processor(
                    &self.inner.processing.forward_unknown,
                    managed_envelope,
                    ctx,
                )
                .await
            }
        }
    }

    /// Processes the envelope and returns the processed envelope back.
    ///
    /// Returns `Some` if the envelope passed inbound filtering and rate limiting. Invalid items are
    /// removed from the envelope. Otherwise, if the envelope is empty or the entire envelope needs
    /// to be dropped, this is `None`.
    async fn process<'a>(
        &self,
        mut message: ProcessEnvelopeGrouped<'a>,
    ) -> Result<Option<(Outputs, ForwardContext<'a>)>, ProcessingError> {
        let ProcessEnvelopeGrouped {
            ref mut envelope,
            ctx,
            ..
        } = message;

        // Prefer the project's project ID, and fall back to the stated project id from the
        // envelope. The project ID is available in all modes, other than in proxy mode, where
        // envelopes for unknown projects are forwarded blindly.
        //
        // Neither ID can be available in proxy mode on the /store/ endpoint. This is not supported,
        // since we cannot process an envelope without project ID, so drop it.
        let Some(project_id) = ctx
            .project_info
            .project_id
            .or_else(|| envelope.envelope().meta().project_id())
        else {
            envelope.reject(Outcome::Invalid(DiscardReason::Internal));
            return Err(ProcessingError::MissingProjectId);
        };

        let client = envelope.envelope().meta().client().map(str::to_owned);
        let user_agent = envelope.envelope().meta().user_agent().map(str::to_owned);
        let project_key = envelope.envelope().meta().public_key();
        // Only allow sending to the sampling key, if we successfully loaded a sampling project
        // info relating to it. This filters out unknown/invalid project keys as well as project
        // keys from different organizations.
        let sampling_key = envelope
            .envelope()
            .sampling_key()
            .filter(|_| ctx.sampling_project_info.is_some());

        // We set additional information on the scope, which will be removed after processing the
        // envelope.
        relay_log::configure_scope(|scope| {
            scope.set_tag("project", project_id);
            if let Some(client) = client {
                scope.set_tag("sdk", client);
            }
            if let Some(user_agent) = user_agent {
                scope.set_extra("user_agent", user_agent.into());
            }
        });

        let result =
            self.process_envelope(project_id, message)
                .await
                .map(|Output { main, metrics }| {
                    if let Some(metrics) = metrics {
                        metrics.accept(|metrics| {
                            send_metrics(
                                metrics,
                                project_key,
                                sampling_key,
                                &self.inner.addrs.aggregator,
                            );
                        });
                    }

                    let ctx = ctx.to_forward();
                    main.map(|output| (output, ctx))
                });

        relay_log::configure_scope(|scope| {
            scope.remove_tag("project");
            scope.remove_tag("sdk");
            scope.remove_tag("user_agent");
        });

        result
    }

    async fn handle_process_envelope(&self, cogs: &mut Token, message: ProcessEnvelope) {
        let project_key = message.envelope.envelope().meta().public_key();
        let wait_time = message.envelope.age();
        metric!(timer(RelayTimers::EnvelopeWaitTime) = wait_time);

        // This COGS handling may need an overhaul in the future:
        // Cancel the passed in token, to start individual measurements per envelope instead.
        cogs.cancel();

        let scoping = message.envelope.scoping();
        for (group, envelope) in ProcessingGroup::split_envelope(
            *message.envelope.into_envelope(),
            &message.project_info,
        ) {
            let mut cogs = self
                .inner
                .cogs
                .timed(ResourceId::Relay, AppFeature::from(group));

            let mut envelope =
                ManagedEnvelope::new(envelope, self.inner.addrs.outcome_aggregator.clone());
            envelope.scope(scoping);

            let global_config = self.inner.global_config.current();

            let ctx = processing::Context {
                config: &self.inner.config,
                global_config: &global_config,
                project_info: &message.project_info,
                sampling_project_info: message.sampling_project_info.as_deref(),
                rate_limits: &message.rate_limits,
                reservoir_counters: &message.reservoir_counters,
            };

            let message = ProcessEnvelopeGrouped {
                group,
                envelope,
                ctx,
            };

            let result = metric!(
                timer(RelayTimers::EnvelopeProcessingTime),
                group = group.variant(),
                { self.process(message).await }
            );

            match result {
                Ok(Some((output, ctx))) => self.submit_upstream(&mut cogs, output, ctx),
                Ok(None) => {}
                Err(error) if error.is_unexpected() => {
                    relay_log::error!(
                        tags.project_key = %project_key,
                        error = &error as &dyn Error,
                        "error processing envelope"
                    )
                }
                Err(error) => {
                    relay_log::debug!(
                        tags.project_key = %project_key,
                        error = &error as &dyn Error,
                        "error processing envelope"
                    )
                }
            }
        }
    }

    fn handle_process_metrics(&self, cogs: &mut Token, message: ProcessMetrics) {
        let ProcessMetrics {
            data,
            project_key,
            received_at,
            sent_at,
            source,
        } = message;

        let received_timestamp =
            UnixTimestamp::from_datetime(received_at).unwrap_or(UnixTimestamp::now());

        let mut buckets = data.into_buckets(received_timestamp);
        if buckets.is_empty() {
            return;
        };
        cogs.update(relay_metrics::cogs::BySize(&buckets));

        let clock_drift_processor =
            ClockDriftProcessor::new(sent_at, received_at).at_least(MINIMUM_CLOCK_DRIFT);

        buckets.retain_mut(|bucket| {
            if let Err(error) = relay_metrics::normalize_bucket(bucket) {
                relay_log::debug!(error = &error as &dyn Error, "dropping bucket {bucket:?}");
                return false;
            }

            if !self::metrics::is_valid_namespace(bucket) {
                return false;
            }

            clock_drift_processor.process_timestamp(&mut bucket.timestamp);

            if !matches!(source, BucketSource::Internal) {
                bucket.metadata = BucketMetadata::new(received_timestamp);
            }

            true
        });

        let project = self.inner.project_cache.get(project_key);

1445        // Best-effort check to filter and rate limit buckets. If there is no project state
1446        // available at the current time, we check again after flushing.
1447        let buckets = match project.state() {
1448            ProjectState::Enabled(project_info) => {
1449                let rate_limits = project.rate_limits().current_limits();
1450                self.check_buckets(project_key, project_info, &rate_limits, buckets)
1451            }
1452            _ => buckets,
1453        };
1454
1455        relay_log::trace!("merging metric buckets into the aggregator");
1456        self.inner
1457            .addrs
1458            .aggregator
1459            .send(MergeBuckets::new(project_key, buckets));
1460    }
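
    // For reference, `MetricData::Raw` items carry statsd-style payloads. The test
    // `test_process_metrics_bucket_metadata` below submits, for example:
    //
    //     spans/foo:3182887624:4267882815|s
    //
    // which reads roughly as `namespace/name:value(s)|type`, here a set metric with two values.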
1461
1462    fn handle_process_batched_metrics(&self, cogs: &mut Token, message: ProcessBatchedMetrics) {
1463        let ProcessBatchedMetrics {
1464            payload,
1465            source,
1466            received_at,
1467            sent_at,
1468        } = message;
1469
1470        #[derive(serde::Deserialize)]
1471        struct Wrapper {
1472            buckets: HashMap<ProjectKey, Vec<Bucket>>,
1473        }
1474
1475        let buckets = match serde_json::from_slice(&payload) {
1476            Ok(Wrapper { buckets }) => buckets,
1477            Err(error) => {
1478                relay_log::debug!(
1479                    error = &error as &dyn Error,
1480                    "failed to parse batched metrics",
1481                );
1482                metric!(counter(RelayCounters::MetricBucketsParsingFailed) += 1);
1483                return;
1484            }
1485        };
1486
1487        for (project_key, buckets) in buckets {
1488            self.handle_process_metrics(
1489                cogs,
1490                ProcessMetrics {
1491                    data: MetricData::Parsed(buckets),
1492                    project_key,
1493                    source,
1494                    received_at,
1495                    sent_at,
1496                },
1497            )
1498        }
1499    }
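
    // For illustration, the batched metrics payload parsed above is expected to look
    // roughly like the following (the example project key is taken from the tests below;
    // the exact bucket fields are an assumption based on `relay_metrics::Bucket`):
    //
    //     {
    //         "buckets": {
    //             "a94ae32be2584e0bbd7a4cbb95971fee": [
    //                 {"timestamp": 1615889440, "width": 10, "name": "c:custom/foo@none", "type": "c", "value": 1.0}
    //             ]
    //         }
    //     }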
1500
1501    /// Submits the processor [`Outputs`] to the appropriate upstream.
1502    ///
1503    /// If processing is enabled, the upstream is Kafka.
1504    fn submit_upstream(
1505        &self,
1506        cogs: &mut Token,
1507        output: Outputs,
1508        ctx: processing::ForwardContext<'_>,
1509    ) {
1510        let _submit = cogs.start_category("submit");
1511
1512        #[cfg(feature = "processing")]
1513        if ctx.config.processing_enabled()
1514            && let Some(store_forwarder) = &self.inner.addrs.store_forwarder
1515        {
1516            use crate::processing::StoreHandle;
1517
1518            let objectstore = self.inner.addrs.objectstore.as_ref();
1519            let handle = StoreHandle::new(store_forwarder, objectstore, ctx.global_config);
1520
1521            output
1522                .forward_store(handle, ctx)
1523                .unwrap_or_else(|err| err.into_inner());
1524
1525            return;
1526        }
1527
1528        match output.serialize_envelope(ctx) {
1529            Ok(envelope) => {
1530                let envelope = ManagedEnvelope::from(envelope);
1531                self.submit_envelope_upstream(envelope);
1532            }
1533            Err(_) => relay_log::error!("failed to serialize output to an envelope"),
1534        };
1535    }
1536
1537    fn submit_envelope_upstream(&self, mut envelope: ManagedEnvelope) {
1538        if envelope.envelope_mut().is_empty() {
1539            envelope.accept();
1540            return;
1541        }
1542
1543        // No code path should hit this.
1544        //
1545        // Any item produced by processing is handled in `submit_upstream`, metrics are
1546        // sent to the store directly, and outcomes must be produced to Kafka instead of
1547        // being sent onward as client reports.
1548        if self.inner.config.processing_enabled() {
1549            relay_log::error!(
1550                "attempt to forward envelope to http upstream when processing is enabled"
1551            );
1552            return;
1553        }
1554
1555        // Override the `sent_at` timestamp. Since the envelope went through basic
1556        // normalization, all timestamps have been corrected. We propagate the new
1557        // `sent_at` to allow the next Relay to double-check this timestamp and
1558        // potentially apply correction again. This is done as close to sending as
1559        // possible so that we avoid internal delays.
1560        envelope.envelope_mut().set_sent_at(Utc::now());
1561
1562        relay_log::trace!("sending envelope to sentry endpoint");
1563        let http_encoding = self.inner.config.http_encoding();
1564        let result = envelope.envelope().to_vec().and_then(|v| {
1565            encode_payload(&v.into(), http_encoding).map_err(EnvelopeError::PayloadIoFailed)
1566        });
1567
1568        match result {
1569            Ok(body) => {
1570                self.inner
1571                    .addrs
1572                    .upstream_relay
1573                    .send(SendRequest(SendEnvelope {
1574                        envelope,
1575                        body,
1576                        http_encoding,
1577                        project_cache: self.inner.project_cache.clone(),
1578                    }));
1579            }
1580            Err(error) => {
1581                // Errors are only logged for what we consider an internal discard reason. These
1582                // indicate errors in the infrastructure or implementation bugs.
1583                relay_log::error!(
1584                    error = &error as &dyn Error,
1585                    tags.project_key = %envelope.scoping().project_key,
1586                    "failed to serialize envelope payload"
1587                );
1588
1589                envelope.reject(Outcome::Invalid(DiscardReason::Internal));
1590            }
1591        }
1592    }
1593
1594    fn handle_submit_client_reports(&self, message: SubmitClientReports) {
1595        let SubmitClientReports {
1596            client_reports,
1597            scoping,
1598        } = message;
1599
1600        let upstream = self.inner.config.upstream();
1601        let dsn = PartialDsn::outbound(&scoping, upstream);
1602
1603        let mut envelope = Envelope::from_request(None, RequestMeta::outbound(dsn));
1604        for client_report in client_reports {
1605            match client_report.serialize() {
1606                Ok(payload) => {
1607                    let mut item = Item::new(ItemType::ClientReport);
1608                    item.set_payload(ContentType::Json, payload);
1609                    envelope.add_item(item);
1610                }
1611                Err(error) => {
1612                    relay_log::error!(
1613                        error = &error as &dyn std::error::Error,
1614                        "failed to serialize client report"
1615                    );
1616                }
1617            }
1618        }
1619
1620        let envelope = ManagedEnvelope::new(envelope, self.inner.addrs.outcome_aggregator.clone());
1621        self.submit_envelope_upstream(envelope);
1622    }
1623
1624    fn check_buckets(
1625        &self,
1626        project_key: ProjectKey,
1627        project_info: &ProjectInfo,
1628        rate_limits: &RateLimits,
1629        buckets: Vec<Bucket>,
1630    ) -> Vec<Bucket> {
1631        let Some(scoping) = project_info.scoping(project_key) else {
1632            relay_log::error!(
1633                tags.project_key = project_key.as_str(),
1634                "there is no scoping: dropping {} buckets",
1635                buckets.len(),
1636            );
1637            return Vec::new();
1638        };
1639
1640        let mut buckets = self::metrics::apply_project_info(
1641            buckets,
1642            &self.inner.metric_outcomes,
1643            project_info,
1644            scoping,
1645        );
1646
1647        let namespaces: BTreeSet<MetricNamespace> = buckets
1648            .iter()
1649            .filter_map(|bucket| bucket.name.try_namespace())
1650            .collect();
1651
1652        for namespace in namespaces {
1653            let limits = rate_limits.check_with_quotas(
1654                project_info.get_quotas(),
1655                scoping.item(DataCategory::MetricBucket),
1656            );
1657
1658            if limits.is_limited() {
1659                let rejected;
1660                (buckets, rejected) = utils::split_off(buckets, |bucket| {
1661                    bucket.name.try_namespace() == Some(namespace)
1662                });
1663
1664                let reason_code = limits.longest().and_then(|limit| limit.reason_code.clone());
1665                self.inner.metric_outcomes.track(
1666                    scoping,
1667                    &rejected,
1668                    Outcome::RateLimited(reason_code),
1669                );
1670            }
1671        }
1672
1673        let quotas = project_info.config.quotas.clone();
1674        match MetricsLimiter::create(buckets, quotas, scoping) {
1675            Ok(mut bucket_limiter) => {
1676                bucket_limiter.enforce_limits(rate_limits, &self.inner.metric_outcomes);
1677                bucket_limiter.into_buckets()
1678            }
1679            Err(buckets) => buckets,
1680        }
1681    }
1682
1683    #[cfg(feature = "processing")]
1684    async fn rate_limit_buckets(
1685        &self,
1686        scoping: Scoping,
1687        project_info: &ProjectInfo,
1688        mut buckets: Vec<Bucket>,
1689    ) -> Vec<Bucket> {
1690        let Some(rate_limiter) = &self.inner.rate_limiter else {
1691            return buckets;
1692        };
1693
1694        let global_config = self.inner.global_config.current();
1695        let namespaces = buckets
1696            .iter()
1697            .filter_map(|bucket| bucket.name.try_namespace())
1698            .counts();
1699
1700        let quotas = CombinedQuotas::new(&global_config, project_info.get_quotas());
1701
1702        for (namespace, quantity) in namespaces {
1703            let item_scoping = scoping.metric_bucket(namespace);
1704
1705            let limits = match rate_limiter
1706                .is_rate_limited(quotas, item_scoping, quantity, false)
1707                .await
1708            {
1709                Ok(limits) => limits,
1710                Err(err) => {
1711                    relay_log::error!(
1712                        error = &err as &dyn std::error::Error,
1713                        "failed to check redis rate limits"
1714                    );
1715                    break;
1716                }
1717            };
1718
1719            if limits.is_limited() {
1720                let rejected;
1721                (buckets, rejected) = utils::split_off(buckets, |bucket| {
1722                    bucket.name.try_namespace() == Some(namespace)
1723                });
1724
1725                let reason_code = limits.longest().and_then(|limit| limit.reason_code.clone());
1726                self.inner.metric_outcomes.track(
1727                    scoping,
1728                    &rejected,
1729                    Outcome::RateLimited(reason_code),
1730                );
1731
1732                self.inner
1733                    .project_cache
1734                    .get(item_scoping.scoping.project_key)
1735                    .rate_limits()
1736                    .merge(limits);
1737            }
1738        }
1739
1740        match MetricsLimiter::create(buckets, project_info.config.quotas.clone(), scoping) {
1741            Err(buckets) => buckets,
1742            Ok(bucket_limiter) => self.apply_other_rate_limits(bucket_limiter).await,
1743        }
1744    }
1745
1746    /// Check and apply rate limits to metrics buckets for transactions and spans.
1747    #[cfg(feature = "processing")]
1748    async fn apply_other_rate_limits(&self, mut bucket_limiter: MetricsLimiter) -> Vec<Bucket> {
1749        relay_log::trace!("apply_other_rate_limits");
1750
1751        let scoping = *bucket_limiter.scoping();
1752
1753        if let Some(rate_limiter) = self.inner.rate_limiter.as_ref() {
1754            let global_config = self.inner.global_config.current();
1755            let quotas = CombinedQuotas::new(&global_config, bucket_limiter.quotas());
1756
1757            // We set over_accept_once such that the limit is actually reached, which allows subsequent
1758            // calls with quantity=0 to be rate limited.
1759            let over_accept_once = true;
1760            let mut rate_limits = RateLimits::new();
1761
1762            let (category, count) = bucket_limiter.count();
1763
1764            let timer = Instant::now();
1765            let mut is_limited = false;
1766
1767            if let Some(count) = count {
1768                match rate_limiter
1769                    .is_rate_limited(quotas, scoping.item(category), count, over_accept_once)
1770                    .await
1771                {
1772                    Ok(limits) => {
1773                        is_limited = limits.is_limited();
1774                        rate_limits.merge(limits)
1775                    }
1776                    Err(e) => {
1777                        relay_log::error!(error = &e as &dyn Error, "rate limiting error")
1778                    }
1779                }
1780            }
1781
1782            relay_statsd::metric!(
1783                timer(RelayTimers::RateLimitBucketsDuration) = timer.elapsed(),
1784                category = category.name(),
1785                limited = if is_limited { "true" } else { "false" },
1786                count = match count {
1787                    None => "none",
1788                    Some(0) => "0",
1789                    Some(1) => "1",
1790                    Some(2..=10) => "10",
1791                    Some(11..=25) => "25",
1792                    Some(26..=50) => "50",
1793                    Some(51..=100) => "100",
1794                    Some(101..=500) => "500",
1795                    _ => "> 500",
1796                },
1797            );
1798
1799            if rate_limits.is_limited() {
1800                let was_enforced =
1801                    bucket_limiter.enforce_limits(&rate_limits, &self.inner.metric_outcomes);
1802
1803                if was_enforced {
1804                    // Update the rate limits in the project cache.
1805                    self.inner
1806                        .project_cache
1807                        .get(scoping.project_key)
1808                        .rate_limits()
1809                        .merge(rate_limits);
1810                }
1811            }
1812        }
1813
1814        bucket_limiter.into_buckets()
1815    }
1816
1817    /// Processes metric buckets and sends them to Kafka.
1818    ///
1819    /// This function runs the following steps:
1820    ///  - rate limiting
1821    ///  - submit to `StoreForwarder`
1822    #[cfg(feature = "processing")]
1823    async fn encode_metrics_processing(
1824        &self,
1825        message: FlushBuckets,
1826        store_forwarder: &Addr<Store>,
1827    ) {
1828        use crate::constants::DEFAULT_EVENT_RETENTION;
1829        use crate::services::store::StoreMetrics;
1830
1831        for ProjectBuckets {
1832            buckets,
1833            scoping,
1834            project_info,
1835            ..
1836        } in message.buckets.into_values()
1837        {
1838            let buckets = self
1839                .rate_limit_buckets(scoping, &project_info, buckets)
1840                .await;
1841
1842            if buckets.is_empty() {
1843                continue;
1844            }
1845
1846            let retention = project_info
1847                .config
1848                .event_retention
1849                .unwrap_or(DEFAULT_EVENT_RETENTION);
1850
1851            // The store forwarder takes care of bucket splitting internally, so we can submit the
1852            // entire list of buckets. There is no batching needed here.
1853            store_forwarder.send(StoreMetrics {
1854                buckets,
1855                scoping,
1856                retention,
1857            });
1858        }
1859    }
1860
1861    /// Serializes metric buckets to JSON and sends them to the upstream.
1862    ///
1863    /// This function runs the following steps:
1864    ///  - partitioning
1865    ///  - batching by configured size limit
1866    ///  - serialize to JSON and pack in an envelope
1867    ///  - submit the envelope to upstream or kafka depending on configuration
1868    ///
1869    /// Rate limiting runs only in processing Relays as it requires access to the central Redis instance.
1870    /// Cached rate limits are applied in the project cache already.
1871    fn encode_metrics_envelope(&self, message: FlushBuckets) {
1872        let FlushBuckets {
1873            partition_key,
1874            buckets,
1875        } = message;
1876
1877        let batch_size = self.inner.config.metrics_max_batch_size_bytes();
1878        let upstream = self.inner.config.upstream();
1879
1880        for ProjectBuckets {
1881            buckets, scoping, ..
1882        } in buckets.values()
1883        {
1884            let dsn = PartialDsn::outbound(scoping, upstream);
1885
1886            relay_statsd::metric!(
1887                distribution(RelayDistributions::PartitionKeys) = u64::from(partition_key)
1888            );
1889
1890            let mut num_batches = 0;
1891            for batch in BucketsView::from(buckets).by_size(batch_size) {
1892                let mut envelope = Envelope::from_request(None, RequestMeta::outbound(dsn.clone()));
1893
1894                let mut item = Item::new(ItemType::MetricBuckets);
1895                item.set_payload(ContentType::Json, serde_json::to_vec(&batch).unwrap());
1896                item.set_source_quantities(crate::metrics::extract_quantities(batch));
1897                envelope.add_item(item);
1898
1899                let mut envelope =
1900                    ManagedEnvelope::new(envelope, self.inner.addrs.outcome_aggregator.clone());
1901                envelope
1902                    .set_partition_key(Some(partition_key))
1903                    .scope(*scoping);
1904
1905                relay_statsd::metric!(
1906                    distribution(RelayDistributions::BucketsPerBatch) = batch.len() as u64
1907                );
1908
1909                self.submit_envelope_upstream(envelope);
1910                num_batches += 1;
1911            }
1912
1913            relay_statsd::metric!(
1914                distribution(RelayDistributions::BatchesPerPartition) = num_batches
1915            );
1916        }
1917    }
1918
1919    /// Creates a [`SendMetricsRequest`] and sends it to the upstream relay.
1920    fn send_global_partition(&self, partition_key: u32, partition: &mut Partition<'_>) {
1921        if partition.is_empty() {
1922            return;
1923        }
1924
1925        let (unencoded, project_info) = partition.take();
1926        let http_encoding = self.inner.config.http_encoding();
1927        let encoded = match encode_payload(&unencoded, http_encoding) {
1928            Ok(payload) => payload,
1929            Err(error) => {
1930                let error = &error as &dyn std::error::Error;
1931                relay_log::error!(error, "failed to encode metrics payload");
1932                return;
1933            }
1934        };
1935
1936        let request = SendMetricsRequest {
1937            partition_key: partition_key.to_string(),
1938            unencoded,
1939            encoded,
1940            project_info,
1941            http_encoding,
1942            metric_outcomes: self.inner.metric_outcomes.clone(),
1943        };
1944
1945        self.inner.addrs.upstream_relay.send(SendRequest(request));
1946    }
1947
1948    /// Serializes metric buckets to JSON and sends them to the upstream via the global endpoint.
1949    ///
1950    /// This function is similar to [`Self::encode_metrics_envelope`], but sends a global batched
1951    /// payload directly instead of per-project Envelopes.
1952    ///
1953    /// This function runs the following steps:
1954    ///  - partitioning
1955    ///  - batching by configured size limit
1956    ///  - serialize to JSON
1957    ///  - submit directly to the upstream
1958    fn encode_metrics_global(&self, message: FlushBuckets) {
1959        let FlushBuckets {
1960            partition_key,
1961            buckets,
1962        } = message;
1963
1964        let batch_size = self.inner.config.metrics_max_batch_size_bytes();
1965        let mut partition = Partition::new(batch_size);
1966        let mut partition_splits = 0;
1967
1968        for ProjectBuckets {
1969            buckets, scoping, ..
1970        } in buckets.values()
1971        {
1972            for bucket in buckets {
1973                let mut remaining = Some(BucketView::new(bucket));
1974
1975                while let Some(bucket) = remaining.take() {
1976                    if let Some(next) = partition.insert(bucket, *scoping) {
1977                        // A part of the bucket could not be inserted. Take the partition and submit
1978                        // it immediately. Repeat until the final part has been inserted. This should
1979                        // always result in a request; otherwise we would enter an endless loop.
1980                        self.send_global_partition(partition_key, &mut partition);
1981                        remaining = Some(next);
1982                        partition_splits += 1;
1983                    }
1984                }
1985            }
1986        }
1987
1988        if partition_splits > 0 {
1989            metric!(distribution(RelayDistributions::PartitionSplits) = partition_splits);
1990        }
1991
1992        self.send_global_partition(partition_key, &mut partition);
1993    }
1994
1995    async fn handle_flush_buckets(&self, mut message: FlushBuckets) {
1996        for (project_key, pb) in message.buckets.iter_mut() {
1997            let buckets = std::mem::take(&mut pb.buckets);
1998            pb.buckets =
1999                self.check_buckets(*project_key, &pb.project_info, &pb.rate_limits, buckets);
2000        }
2001
2002        #[cfg(feature = "processing")]
2003        if self.inner.config.processing_enabled()
2004            && let Some(ref store_forwarder) = self.inner.addrs.store_forwarder
2005        {
2006            return self
2007                .encode_metrics_processing(message, store_forwarder)
2008                .await;
2009        }
2010
2011        if self.inner.config.http_global_metrics() {
2012            self.encode_metrics_global(message)
2013        } else {
2014            self.encode_metrics_envelope(message)
2015        }
2016    }
2017
2018    #[cfg(all(test, feature = "processing"))]
2019    fn redis_rate_limiter_enabled(&self) -> bool {
2020        self.inner.rate_limiter.is_some()
2021    }
2022
2023    async fn handle_message(&self, message: EnvelopeProcessor) {
2024        let ty = message.variant();
2025        let feature_weights = self.feature_weights(&message);
2026
2027        metric!(timer(RelayTimers::ProcessMessageDuration), message = ty, {
2028            let mut cogs = self.inner.cogs.timed(ResourceId::Relay, feature_weights);
2029
2030            match message {
2031                EnvelopeProcessor::ProcessEnvelope(m) => {
2032                    self.handle_process_envelope(&mut cogs, *m).await
2033                }
2034                EnvelopeProcessor::ProcessProjectMetrics(m) => {
2035                    self.handle_process_metrics(&mut cogs, *m)
2036                }
2037                EnvelopeProcessor::ProcessBatchedMetrics(m) => {
2038                    self.handle_process_batched_metrics(&mut cogs, *m)
2039                }
2040                EnvelopeProcessor::FlushBuckets(m) => self.handle_flush_buckets(*m).await,
2041                EnvelopeProcessor::SubmitClientReports(m) => self.handle_submit_client_reports(*m),
2042            }
2043        });
2044    }
2045
2046    fn feature_weights(&self, message: &EnvelopeProcessor) -> FeatureWeights {
2047        match message {
2048            // Envelope is split later and tokens are attributed then.
2049            EnvelopeProcessor::ProcessEnvelope(_) => AppFeature::Unattributed.into(),
2050            EnvelopeProcessor::ProcessProjectMetrics(_) => AppFeature::Unattributed.into(),
2051            EnvelopeProcessor::ProcessBatchedMetrics(_) => AppFeature::Unattributed.into(),
2052            EnvelopeProcessor::FlushBuckets(v) => v
2053                .buckets
2054                .values()
2055                .map(|s| {
2056                    if self.inner.config.processing_enabled() {
2057                        // Processing does not encode the metrics but instead rate limits them,
2058                        // which scales by count rather than size.
2059                        relay_metrics::cogs::ByCount(&s.buckets).into()
2060                    } else {
2061                        relay_metrics::cogs::BySize(&s.buckets).into()
2062                    }
2063                })
2064                .fold(FeatureWeights::none(), FeatureWeights::merge),
2065            EnvelopeProcessor::SubmitClientReports(_) => AppFeature::ClientReports.into(),
2066        }
2067    }
2068}
2069
2070impl Service for EnvelopeProcessorService {
2071    type Interface = EnvelopeProcessor;
2072
2073    async fn run(self, mut rx: relay_system::Receiver<Self::Interface>) {
2074        while let Some(message) = rx.recv().await {
2075            let service = self.clone();
2076            self.inner
2077                .pool
2078                .spawn_async(
2079                    async move {
2080                        service.handle_message(message).await;
2081                    }
2082                    .boxed(),
2083                )
2084                .await;
2085        }
2086    }
2087}
2088
2089pub fn encode_payload(body: &Bytes, http_encoding: HttpEncoding) -> Result<Bytes, std::io::Error> {
2090    let envelope_body: Vec<u8> = match http_encoding {
2091        HttpEncoding::Identity => return Ok(body.clone()),
2092        HttpEncoding::Deflate => {
2093            let mut encoder = ZlibEncoder::new(Vec::new(), Compression::default());
2094            encoder.write_all(body.as_ref())?;
2095            encoder.finish()?
2096        }
2097        HttpEncoding::Gzip => {
2098            let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
2099            encoder.write_all(body.as_ref())?;
2100            encoder.finish()?
2101        }
2102        HttpEncoding::Br => {
2103            // Use default buffer size (via 0), medium quality (5), and the default lgwin (22).
2104            let mut encoder = BrotliEncoder::new(Vec::new(), 0, 5, 22);
2105            encoder.write_all(body.as_ref())?;
2106            encoder.into_inner()
2107        }
2108        HttpEncoding::Zstd => {
2109            // Use the fastest compression level; our main objective here is the best
2110            // compression ratio for the least amount of time spent.
2111            let mut encoder = ZstdEncoder::new(Vec::new(), 1)?;
2112            encoder.write_all(body.as_ref())?;
2113            encoder.finish()?
2114        }
2115    };
2116
2117    Ok(envelope_body.into())
2118}
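
// A minimal usage sketch with hypothetical values: compress an envelope body before
// handing it to the upstream request, as `submit_envelope_upstream` does above.
//
//     let body: Bytes = b"{\"type\":\"client_report\"}".to_vec().into();
//     let encoded = encode_payload(&body, HttpEncoding::Gzip)?;
//
// With `HttpEncoding::Identity`, the payload is returned unchanged.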
2119
2120/// An upstream request that submits an envelope via HTTP.
2121#[derive(Debug)]
2122pub struct SendEnvelope {
2123    pub envelope: ManagedEnvelope,
2124    pub body: Bytes,
2125    pub http_encoding: HttpEncoding,
2126    pub project_cache: ProjectCacheHandle,
2127}
2128
2129impl UpstreamRequest for SendEnvelope {
2130    fn method(&self) -> reqwest::Method {
2131        reqwest::Method::POST
2132    }
2133
2134    fn path(&self) -> Cow<'_, str> {
2135        format!("/api/{}/envelope/", self.envelope.scoping().project_id).into()
2136    }
2137
2138    fn route(&self) -> &'static str {
2139        "envelope"
2140    }
2141
2142    fn build(&mut self, builder: &mut http::RequestBuilder) -> Result<(), http::HttpError> {
2143        let envelope_body = self.body.clone();
2144        metric!(
2145            distribution(RelayDistributions::UpstreamEnvelopeBodySize) = envelope_body.len() as u64
2146        );
2147
2148        let meta = &self.envelope.meta();
2149        let shard = self.envelope.partition_key().map(|p| p.to_string());
2150        builder
2151            .content_encoding(self.http_encoding)
2152            .header_opt("Origin", meta.origin().map(|url| url.as_str()))
2153            .header_opt("User-Agent", meta.user_agent())
2154            .header("X-Sentry-Auth", meta.auth_header())
2155            .header("X-Forwarded-For", meta.forwarded_for())
2156            .header("Content-Type", envelope::CONTENT_TYPE)
2157            .header_opt("X-Sentry-Relay-Shard", shard)
2158            .body(envelope_body);
2159
2160        Ok(())
2161    }
2162
2163    fn sign(&mut self) -> Option<Sign> {
2164        Some(Sign::Optional(SignatureType::RequestSign))
2165    }
2166
2167    fn respond(
2168        self: Box<Self>,
2169        result: Result<http::Response, UpstreamRequestError>,
2170    ) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> {
2171        Box::pin(async move {
2172            let result = match result {
2173                Ok(mut response) => response.consume().await.map_err(UpstreamRequestError::Http),
2174                Err(error) => Err(error),
2175            };
2176
2177            match result {
2178                Ok(()) => self.envelope.accept(),
2179                Err(error) if error.is_received() => {
2180                    let scoping = self.envelope.scoping();
2181                    self.envelope.accept();
2182
2183                    if let UpstreamRequestError::RateLimited(limits) = error {
2184                        self.project_cache
2185                            .get(scoping.project_key)
2186                            .rate_limits()
2187                            .merge(limits.scope(&scoping));
2188                    }
2189                }
2190                Err(error) => {
2191                    // Errors are only logged for what we consider an internal discard reason. These
2192                    // indicate errors in the infrastructure or implementation bugs.
2193                    let mut envelope = self.envelope;
2194                    envelope.reject(Outcome::Invalid(DiscardReason::Internal));
2195                    relay_log::error!(
2196                        error = &error as &dyn Error,
2197                        tags.project_key = %envelope.scoping().project_key,
2198                        "error sending envelope"
2199                    );
2200                }
2201            }
2202        })
2203    }
2204}
2205
2206/// A container for metric buckets from multiple projects.
2207///
2208/// This container is used to send metrics to the upstream in global batches as part of the
2209/// [`FlushBuckets`] message if the `http.global_metrics` option is enabled. The container monitors
2210/// the size of all metrics and allows splitting them into multiple batches. See
2211/// [`insert`](Self::insert) for more information.
2212#[derive(Debug)]
2213struct Partition<'a> {
2214    max_size: usize,
2215    remaining: usize,
2216    views: HashMap<ProjectKey, Vec<BucketView<'a>>>,
2217    project_info: HashMap<ProjectKey, Scoping>,
2218}
2219
2220impl<'a> Partition<'a> {
2221    /// Creates a new partition with the given maximum size in bytes.
2222    pub fn new(size: usize) -> Self {
2223        Self {
2224            max_size: size,
2225            remaining: size,
2226            views: HashMap::new(),
2227            project_info: HashMap::new(),
2228        }
2229    }
2230
2231    /// Inserts a bucket into the partition, splitting it if necessary.
2232    ///
2233    /// This function attempts to add the bucket to this partition. If the bucket does not fit
2234    /// entirely into the partition given its maximum size, the remaining part of the bucket is
2235    /// returned from this function call.
2236    ///
2237    /// If this function returns `Some(_)`, the partition is full and should be submitted to the
2238    /// upstream immediately. Use [`Self::take`] to retrieve the contents of the
2239    /// partition. Afterwards, the caller is responsible to call this function again with the
2240    /// remaining bucket until it is fully inserted.
2241    pub fn insert(&mut self, bucket: BucketView<'a>, scoping: Scoping) -> Option<BucketView<'a>> {
2242        let (current, next) = bucket.split(self.remaining, Some(self.max_size));
2243
2244        if let Some(current) = current {
2245            self.remaining = self.remaining.saturating_sub(current.estimated_size());
2246            self.views
2247                .entry(scoping.project_key)
2248                .or_default()
2249                .push(current);
2250
2251            self.project_info
2252                .entry(scoping.project_key)
2253                .or_insert(scoping);
2254        }
2255
2256        next
2257    }
2258
2259    /// Returns `true` if the partition does not hold any data.
2260    fn is_empty(&self) -> bool {
2261        self.views.is_empty()
2262    }
2263
2264    /// Returns the serialized buckets for this partition.
2265    ///
2266    /// This empties the partition, so that it can be reused.
2267    fn take(&mut self) -> (Bytes, HashMap<ProjectKey, Scoping>) {
2268        #[derive(serde::Serialize)]
2269        struct Wrapper<'a> {
2270            buckets: &'a HashMap<ProjectKey, Vec<BucketView<'a>>>,
2271        }
2272
2273        let buckets = &self.views;
2274        let payload = serde_json::to_vec(&Wrapper { buckets }).unwrap().into();
2275
2276        let scopings = self.project_info.clone();
2277        self.project_info.clear();
2278
2279        self.views.clear();
2280        self.remaining = self.max_size;
2281
2282        (payload, scopings)
2283    }
2284}
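
// Usage sketch (mirroring `encode_metrics_global` above): fill the partition with bucket
// views and flush it whenever `insert` returns a remainder that did not fit.
//
//     let mut partition = Partition::new(batch_size);
//     let mut remaining = Some(BucketView::new(&bucket));
//     while let Some(view) = remaining.take() {
//         if let Some(rest) = partition.insert(view, scoping) {
//             let (payload, _scopings) = partition.take(); // submit `payload` upstream
//             remaining = Some(rest);
//         }
//     }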
2285
2286/// An upstream request that submits metric buckets via HTTP.
2287///
2288/// This request is not awaited. It automatically tracks outcomes if the request is not received.
2289#[derive(Debug)]
2290struct SendMetricsRequest {
2291    /// The partition key, sent upstream in the `X-Sentry-Relay-Shard` header.
2292    partition_key: String,
2293    /// Serialized metric buckets without encoding applied, used for signing.
2294    unencoded: Bytes,
2295    /// Serialized metric buckets with the stated HTTP encoding applied.
2296    encoded: Bytes,
2297    /// Mapping of all contained project keys to their scoping and extraction mode.
2298    ///
2299    /// Used to track outcomes for transmission failures.
2300    project_info: HashMap<ProjectKey, Scoping>,
2301    /// Encoding (compression) of the payload.
2302    http_encoding: HttpEncoding,
2303    /// Metric outcomes instance to send outcomes on error.
2304    metric_outcomes: MetricOutcomes,
2305}
2306
2307impl SendMetricsRequest {
2308    fn create_error_outcomes(self) {
2309        #[derive(serde::Deserialize)]
2310        struct Wrapper {
2311            buckets: HashMap<ProjectKey, Vec<MinimalTrackableBucket>>,
2312        }
2313
2314        let buckets = match serde_json::from_slice(&self.unencoded) {
2315            Ok(Wrapper { buckets }) => buckets,
2316            Err(err) => {
2317                relay_log::error!(
2318                    error = &err as &dyn std::error::Error,
2319                    "failed to parse buckets from failed transmission"
2320                );
2321                return;
2322            }
2323        };
2324
2325        for (key, buckets) in buckets {
2326            let Some(&scoping) = self.project_info.get(&key) else {
2327                relay_log::error!("missing scoping for project key");
2328                continue;
2329            };
2330
2331            self.metric_outcomes.track(
2332                scoping,
2333                &buckets,
2334                Outcome::Invalid(DiscardReason::Internal),
2335            );
2336        }
2337    }
2338}
2339
2340impl UpstreamRequest for SendMetricsRequest {
2341    fn set_relay_id(&self) -> bool {
2342        true
2343    }
2344
2345    fn sign(&mut self) -> Option<Sign> {
2346        Some(Sign::Required(SignatureType::Body(self.unencoded.clone())))
2347    }
2348
2349    fn method(&self) -> reqwest::Method {
2350        reqwest::Method::POST
2351    }
2352
2353    fn path(&self) -> Cow<'_, str> {
2354        "/api/0/relays/metrics/".into()
2355    }
2356
2357    fn route(&self) -> &'static str {
2358        "global_metrics"
2359    }
2360
2361    fn build(&mut self, builder: &mut http::RequestBuilder) -> Result<(), http::HttpError> {
2362        metric!(
2363            distribution(RelayDistributions::UpstreamMetricsBodySize) = self.encoded.len() as u64
2364        );
2365
2366        builder
2367            .content_encoding(self.http_encoding)
2368            .header("X-Sentry-Relay-Shard", self.partition_key.as_bytes())
2369            .header(header::CONTENT_TYPE, b"application/json")
2370            .body(self.encoded.clone());
2371
2372        Ok(())
2373    }
2374
2375    fn respond(
2376        self: Box<Self>,
2377        result: Result<http::Response, UpstreamRequestError>,
2378    ) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> {
2379        Box::pin(async {
2380            match result {
2381                Ok(mut response) => {
2382                    response.consume().await.ok();
2383                }
2384                Err(error) => {
2385                    relay_log::error!(error = &error as &dyn Error, "Failed to send metrics batch");
2386
2387                    // If the request did not arrive at the upstream, we are responsible for outcomes.
2388                    // Otherwise, the upstream is responsible to log outcomes.
2389                    if error.is_received() {
2390                        return;
2391                    }
2392
2393                    self.create_error_outcomes()
2394                }
2395            }
2396        })
2397    }
2398}
2399
2400/// Container for global and project level [`Quota`].
2401#[derive(Copy, Clone, Debug)]
2402#[cfg(feature = "processing")]
2403struct CombinedQuotas<'a> {
2404    global_quotas: &'a [Quota],
2405    project_quotas: &'a [Quota],
2406}
2407
2408#[cfg(feature = "processing")]
2409impl<'a> CombinedQuotas<'a> {
2410    /// Returns a new [`CombinedQuotas`].
2411    pub fn new(global_config: &'a GlobalConfig, project_quotas: &'a [Quota]) -> Self {
2412        Self {
2413            global_quotas: &global_config.quotas,
2414            project_quotas,
2415        }
2416    }
2417}
2418
2419#[cfg(feature = "processing")]
2420impl<'a> IntoIterator for CombinedQuotas<'a> {
2421    type Item = &'a Quota;
2422    type IntoIter = std::iter::Chain<std::slice::Iter<'a, Quota>, std::slice::Iter<'a, Quota>>;
2423
2424    fn into_iter(self) -> Self::IntoIter {
2425        self.global_quotas.iter().chain(self.project_quotas.iter())
2426    }
2427}
2428
2429#[cfg(test)]
2430mod tests {
2431    use insta::assert_debug_snapshot;
2432    use relay_common::glob2::LazyGlob;
2433    use relay_dynamic_config::ProjectConfig;
2434    use relay_event_normalization::{
2435        NormalizationConfig, RedactionRule, TransactionNameConfig, TransactionNameRule,
2436    };
2437    use relay_event_schema::protocol::TransactionSource;
2438    use relay_pii::DataScrubbingConfig;
2439    use similar_asserts::assert_eq;
2440
2441    use crate::testutils::{create_test_processor, create_test_processor_with_addrs};
2442
2443    #[cfg(feature = "processing")]
2444    use {
2445        relay_metrics::BucketValue,
2446        relay_quotas::{QuotaScope, ReasonCode},
2447        relay_test::mock_service,
2448    };
2449
2450    use super::*;
2451
2452    #[cfg(feature = "processing")]
2453    fn mock_quota(id: &str) -> Quota {
2454        Quota {
2455            id: Some(id.into()),
2456            categories: [DataCategory::MetricBucket].into(),
2457            scope: QuotaScope::Organization,
2458            scope_id: None,
2459            limit: Some(0),
2460            window: None,
2461            reason_code: None,
2462            namespace: None,
2463        }
2464    }
2465
2466    #[cfg(feature = "processing")]
2467    #[test]
2468    fn test_dynamic_quotas() {
2469        let global_config = relay_dynamic_config::GlobalConfig {
2470            quotas: vec![mock_quota("foo"), mock_quota("bar")],
2471            ..Default::default()
2472        };
2473
2474        let project_quotas = vec![mock_quota("baz"), mock_quota("qux")];
2475
2476        let dynamic_quotas = CombinedQuotas::new(&global_config, &project_quotas);
2477
2478        let quota_ids = dynamic_quotas.into_iter().filter_map(|q| q.id.as_deref());
2479        assert!(quota_ids.eq(["foo", "bar", "baz", "qux"]));
2480    }
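
    /// Added for illustration: sanity-checks `encode_payload` for the identity and gzip
    /// encodings, using only crates this file already depends on.
    #[test]
    fn test_encode_payload_identity_and_gzip() {
        use std::io::Read;

        let body: bytes::Bytes = b"{\"type\":\"client_report\"}".to_vec().into();

        // Identity must return the payload unchanged.
        let encoded = encode_payload(&body, relay_config::HttpEncoding::Identity).unwrap();
        assert_eq!(encoded, body);

        // Gzip must round-trip back to the original bytes.
        let encoded = encode_payload(&body, relay_config::HttpEncoding::Gzip).unwrap();
        let mut decoder = flate2::read::GzDecoder::new(&encoded[..]);
        let mut decoded = Vec::new();
        decoder.read_to_end(&mut decoded).unwrap();
        assert_eq!(decoded, body.to_vec());
    }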
2481
2482    /// Ensures that if we ratelimit one batch of buckets in [`FlushBuckets`] message, it won't
2483    /// also ratelimit the next batches in the same message automatically.
2484    #[cfg(feature = "processing")]
2485    #[tokio::test]
2486    async fn test_ratelimit_per_batch() {
2487        use relay_base_schema::organization::OrganizationId;
2488        use relay_protocol::FiniteF64;
2489
2490        let rate_limited_org = Scoping {
2491            organization_id: OrganizationId::new(1),
2492            project_id: ProjectId::new(21),
2493            project_key: ProjectKey::parse("00000000000000000000000000000000").unwrap(),
2494            key_id: Some(17),
2495        };
2496
2497        let not_rate_limited_org = Scoping {
2498            organization_id: OrganizationId::new(2),
2499            project_id: ProjectId::new(21),
2500            project_key: ProjectKey::parse("11111111111111111111111111111111").unwrap(),
2501            key_id: Some(17),
2502        };
2503
2504        let message = {
2505            let project_info = {
2506                let quota = Quota {
2507                    id: Some("testing".into()),
2508                    categories: [DataCategory::MetricBucket].into(),
2509                    scope: relay_quotas::QuotaScope::Organization,
2510                    scope_id: Some(rate_limited_org.organization_id.to_string().into()),
2511                    limit: Some(0),
2512                    window: None,
2513                    reason_code: Some(ReasonCode::new("test")),
2514                    namespace: None,
2515                };
2516
2517                let mut config = ProjectConfig::default();
2518                config.quotas.push(quota);
2519
2520                Arc::new(ProjectInfo {
2521                    config,
2522                    ..Default::default()
2523                })
2524            };
2525
2526            let project_metrics = |scoping| ProjectBuckets {
2527                buckets: vec![Bucket {
2528                    name: "d:spans/bar".into(),
2529                    value: BucketValue::Counter(FiniteF64::new(1.0).unwrap()),
2530                    timestamp: UnixTimestamp::now(),
2531                    tags: Default::default(),
2532                    width: 10,
2533                    metadata: BucketMetadata::default(),
2534                }],
2535                rate_limits: Default::default(),
2536                project_info: project_info.clone(),
2537                scoping,
2538            };
2539
2540            let buckets = hashbrown::HashMap::from([
2541                (
2542                    rate_limited_org.project_key,
2543                    project_metrics(rate_limited_org),
2544                ),
2545                (
2546                    not_rate_limited_org.project_key,
2547                    project_metrics(not_rate_limited_org),
2548                ),
2549            ]);
2550
2551            FlushBuckets {
2552                partition_key: 0,
2553                buckets,
2554            }
2555        };
2556
2557        // ensure the order of the map while iterating is as expected.
2558        assert_eq!(message.buckets.keys().count(), 2);
2559
2560        let config = {
2561            let config_json = serde_json::json!({
2562                "processing": {
2563                    "enabled": true,
2564                    "kafka_config": [],
2565                    "redis": {
2566                        "server": std::env::var("RELAY_REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_owned()),
2567                    }
2568                }
2569            });
2570            Config::from_json_value(config_json).unwrap()
2571        };
2572
2573        let (store, handle) = {
2574            let f = |org_ids: &mut Vec<OrganizationId>, msg: Store| {
2575                let org_id = match msg {
2576                    Store::Metrics(x) => x.scoping.organization_id,
2577                    _ => panic!("received envelope when expecting only metrics"),
2578                };
2579                org_ids.push(org_id);
2580            };
2581
2582            mock_service("store_forwarder", vec![], f)
2583        };
2584
2585        let processor = create_test_processor(config).await;
2586        assert!(processor.redis_rate_limiter_enabled());
2587
2588        processor.encode_metrics_processing(message, &store).await;
2589
2590        drop(store);
2591        let orgs_not_ratelimited = handle.await.unwrap();
2592
2593        assert_eq!(
2594            orgs_not_ratelimited,
2595            vec![not_rate_limited_org.organization_id]
2596        );
2597    }
2598
2599    #[tokio::test]
2600    async fn test_browser_version_extraction_with_pii_like_data() {
2601        let processor = create_test_processor(Default::default()).await;
2602        let outcome_aggregator = Addr::dummy();
2603        let event_id = EventId::new();
2604
2605        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
2606            .parse()
2607            .unwrap();
2608
2609        let request_meta = RequestMeta::new(dsn);
2610        let mut envelope = Envelope::from_request(Some(event_id), request_meta);
2611
2612        envelope.add_item({
2613                let mut item = Item::new(ItemType::Event);
2614                item.set_payload(
2615                    ContentType::Json,
2616                    r#"
2617                    {
2618                        "request": {
2619                            "headers": [
2620                                ["User-Agent", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36"]
2621                            ]
2622                        }
2623                    }
2624                "#,
2625                );
2626                item
2627            });
2628
2629        let mut datascrubbing_settings = DataScrubbingConfig::default();
2630        // enable all the default scrubbing
2631        datascrubbing_settings.scrub_data = true;
2632        datascrubbing_settings.scrub_defaults = true;
2633        datascrubbing_settings.scrub_ip_addresses = true;
2634
2635        // Make sure to mask any IP-like looking data
2636        let pii_config = serde_json::from_str(r#"{"applications": {"**": ["@ip:mask"]}}"#).unwrap();
2637
2638        let config = ProjectConfig {
2639            datascrubbing_settings,
2640            pii_config: Some(pii_config),
2641            ..Default::default()
2642        };
2643
2644        let project_info = ProjectInfo {
2645            config,
2646            ..Default::default()
2647        };
2648
2649        let mut envelopes = ProcessingGroup::split_envelope(*envelope, &Default::default());
2650        assert_eq!(envelopes.len(), 1);
2651
2652        let (group, envelope) = envelopes.pop().unwrap();
2653        let envelope = ManagedEnvelope::new(envelope, outcome_aggregator);
2654
2655        let message = ProcessEnvelopeGrouped {
2656            group,
2657            envelope,
2658            ctx: processing::Context {
2659                project_info: &project_info,
2660                ..processing::Context::for_test()
2661            },
2662        };
2663
2664        let Ok(Some((output, ctx))) = processor.process(message).await else {
2665            panic!();
2666        };
2667        let new_envelope = output.serialize_envelope(ctx).unwrap().accept(|f| f);
2668
2669        let event_item = new_envelope.items().last().unwrap();
2670        let annotated_event: Annotated<Event> =
2671            Annotated::from_json_bytes(&event_item.payload()).unwrap();
2672        let event = annotated_event.into_value().unwrap();
2673        let headers = event
2674            .request
2675            .into_value()
2676            .unwrap()
2677            .headers
2678            .into_value()
2679            .unwrap();
2680
2681        // IP-like data must be masked
2682        assert_eq!(
2683            Some(
2684                "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/********* Safari/537.36"
2685            ),
2686            headers.get_header("User-Agent")
2687        );
2688        // But we still get correct browser and version number
2689        let contexts = event.contexts.into_value().unwrap();
2690        let browser = contexts.0.get("browser").unwrap();
2691        assert_eq!(
2692            r#"{"browser":"Chrome 103.0.0","name":"Chrome","version":"103.0.0","type":"browser"}"#,
2693            browser.to_json().unwrap()
2694        );
2695    }
2696
2697    #[tokio::test]
2698    #[cfg(feature = "processing")]
2699    async fn test_materialize_dsc() {
2700        use crate::services::projects::project::PublicKeyConfig;
2701
2702        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
2703            .parse()
2704            .unwrap();
2705        let request_meta = RequestMeta::new(dsn);
2706        let mut envelope = Envelope::from_request(None, request_meta);
2707
2708        let dsc = r#"{
2709            "trace_id": "00000000-0000-0000-0000-000000000000",
2710            "public_key": "e12d836b15bb49d7bbf99e64295d995b",
2711            "sample_rate": "0.2"
2712        }"#;
2713        envelope.set_dsc(serde_json::from_str(dsc).unwrap());
2714
2715        let mut item = Item::new(ItemType::Event);
2716        item.set_payload(ContentType::Json, r#"{}"#);
2717        envelope.add_item(item);
2718
2719        let outcome_aggregator = Addr::dummy();
2720        let managed_envelope = ManagedEnvelope::new(envelope, outcome_aggregator);
2721
2722        let mut project_info = ProjectInfo::default();
2723        project_info.public_keys.push(PublicKeyConfig {
2724            public_key: ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(),
2725            numeric_id: Some(1),
2726        });
2727
2728        let config = serde_json::json!({
2729            "processing": {
2730                "enabled": true,
2731                "kafka_config": [],
2732            }
2733        });
2734
2735        let message = ProcessEnvelopeGrouped {
2736            group: ProcessingGroup::Error,
2737            envelope: managed_envelope,
2738            ctx: processing::Context {
2739                config: &Config::from_json_value(config.clone()).unwrap(),
2740                project_info: &project_info,
2741                sampling_project_info: Some(&project_info),
2742                ..processing::Context::for_test()
2743            },
2744        };
2745
2746        let processor = create_test_processor(Config::from_json_value(config).unwrap()).await;
2747        let Ok(Some((output, ctx))) = processor.process(message).await else {
2748            panic!();
2749        };
2750        let envelope = output.serialize_envelope(ctx).unwrap();
2751        let event = envelope
2752            .get_item_by(|item| item.ty() == &ItemType::Event)
2753            .unwrap();
2754
2755        let event = Annotated::<Event>::from_json_bytes(&event.payload()).unwrap();
2756        insta::assert_debug_snapshot!(event.value().unwrap()._dsc, @r###"
2757        Object(
2758            {
2759                "environment": ~,
2760                "public_key": String(
2761                    "e12d836b15bb49d7bbf99e64295d995b",
2762                ),
2763                "release": ~,
2764                "replay_id": ~,
2765                "sample_rate": String(
2766                    "0.2",
2767                ),
2768                "trace_id": String(
2769                    "00000000000000000000000000000000",
2770                ),
2771                "transaction": ~,
2772            },
2773        )
2774        "###);
2775    }
2776
    fn capture_test_event(transaction_name: &str, source: TransactionSource) -> Vec<String> {
        let mut event = Annotated::<Event>::from_json(
            r#"
            {
                "type": "transaction",
                "transaction": "/foo/",
                "timestamp": 946684810.0,
                "start_timestamp": 946684800.0,
                "contexts": {
                    "trace": {
                        "trace_id": "4c79f60c11214eb38604f4ae0781bfb2",
                        "span_id": "fa90fdead5f74053",
                        "op": "http.server",
                        "type": "trace"
                    }
                },
                "transaction_info": {
                    "source": "url"
                }
            }
            "#,
        )
        .unwrap();
        let e = event.value_mut().as_mut().unwrap();
        e.transaction.set_value(Some(transaction_name.into()));

        e.transaction_info
            .value_mut()
            .as_mut()
            .unwrap()
            .source
            .set_value(Some(source));

        relay_statsd::with_capturing_test_client(|| {
            utils::log_transaction_name_metrics(&mut event, |event| {
                let config = NormalizationConfig {
                    transaction_name_config: TransactionNameConfig {
                        rules: &[TransactionNameRule {
                            pattern: LazyGlob::new("/foo/*/**".to_owned()),
                            expiry: DateTime::<Utc>::MAX_UTC,
                            redaction: RedactionRule::Replace {
                                substitution: "*".to_owned(),
                            },
                        }],
                    },
                    ..Default::default()
                };
                relay_event_normalization::normalize_event(event, &config)
            });
        })
    }

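    // Each test below asserts the statsd tags emitted for one combination of transaction
    // source and applied name change (none, rule, pattern, both).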
    #[test]
    fn test_log_transaction_metrics_none() {
        let captures = capture_test_event("/nothing", TransactionSource::Url);
        insta::assert_debug_snapshot!(captures, @r###"
        [
            "event.transaction_name_changes:1|c|#source_in:url,changes:none,source_out:sanitized,is_404:false",
        ]
        "###);
    }

    #[test]
    fn test_log_transaction_metrics_rule() {
        let captures = capture_test_event("/foo/john/denver", TransactionSource::Url);
        insta::assert_debug_snapshot!(captures, @r###"
        [
            "event.transaction_name_changes:1|c|#source_in:url,changes:rule,source_out:sanitized,is_404:false",
        ]
        "###);
    }

    #[test]
    fn test_log_transaction_metrics_pattern() {
        let captures = capture_test_event("/something/12345", TransactionSource::Url);
        insta::assert_debug_snapshot!(captures, @r###"
        [
            "event.transaction_name_changes:1|c|#source_in:url,changes:pattern,source_out:sanitized,is_404:false",
        ]
        "###);
    }

    #[test]
    fn test_log_transaction_metrics_both() {
        let captures = capture_test_event("/foo/john/12345", TransactionSource::Url);
        insta::assert_debug_snapshot!(captures, @r###"
        [
            "event.transaction_name_changes:1|c|#source_in:url,changes:both,source_out:sanitized,is_404:false",
        ]
        "###);
    }

    #[test]
    fn test_log_transaction_metrics_no_match() {
        let captures = capture_test_event("/foo/john/12345", TransactionSource::Route);
        insta::assert_debug_snapshot!(captures, @r###"
        [
            "event.transaction_name_changes:1|c|#source_in:route,changes:none,source_out:route,is_404:false",
        ]
        "###);
    }

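    // Metric buckets received from external sources get `received_at` stamped with the
    // request's receive time; buckets forwarded by internal Relays are not re-stamped.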
    #[tokio::test]
    async fn test_process_metrics_bucket_metadata() {
        let mut token = Cogs::noop().timed(ResourceId::Relay, AppFeature::Unattributed);
        let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
        let received_at = Utc::now();
        let config = Config::default();

        let (aggregator, mut aggregator_rx) = Addr::custom();
        let processor = create_test_processor_with_addrs(
            config,
            Addrs {
                aggregator,
                ..Default::default()
            },
        )
        .await;

        let mut item = Item::new(ItemType::Statsd);
        item.set_payload(ContentType::Text, "spans/foo:3182887624:4267882815|s");
        for (source, expected_received_at) in [
            (
                BucketSource::External,
                Some(UnixTimestamp::from_datetime(received_at).unwrap()),
            ),
            (BucketSource::Internal, None),
        ] {
            let message = ProcessMetrics {
                data: MetricData::Raw(vec![item.clone()]),
                project_key,
                source,
                received_at,
                sent_at: Some(Utc::now()),
            };
            processor.handle_process_metrics(&mut token, message);

            let Aggregator::MergeBuckets(merge_buckets) = aggregator_rx.recv().await.unwrap();
            let buckets = merge_buckets.buckets;
            assert_eq!(buckets.len(), 1);
            assert_eq!(buckets[0].metadata.received_at, expected_received_at);
        }
    }

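    // A batched metrics payload spanning two project keys is split into one `MergeBuckets`
    // message per project before reaching the aggregator.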
    #[tokio::test]
    async fn test_process_batched_metrics() {
        let mut token = Cogs::noop().timed(ResourceId::Relay, AppFeature::Unattributed);
        let received_at = Utc::now();
        let config = Config::default();

        let (aggregator, mut aggregator_rx) = Addr::custom();
        let processor = create_test_processor_with_addrs(
            config,
            Addrs {
                aggregator,
                ..Default::default()
            },
        )
        .await;

        let payload = r#"{
    "buckets": {
        "11111111111111111111111111111111": [
            {
                "timestamp": 1615889440,
                "width": 0,
                "name": "d:custom/endpoint.response_time@millisecond",
                "type": "d",
                "value": [
                  68.0
                ],
                "tags": {
                  "route": "user_index"
                }
            }
        ],
        "22222222222222222222222222222222": [
            {
                "timestamp": 1615889440,
                "width": 0,
                "name": "d:custom/endpoint.cache_rate@none",
                "type": "d",
                "value": [
                  36.0
                ]
            }
        ]
    }
}
"#;
        let message = ProcessBatchedMetrics {
            payload: Bytes::from(payload),
            source: BucketSource::Internal,
            received_at,
            sent_at: Some(Utc::now()),
        };
        processor.handle_process_batched_metrics(&mut token, message);

        let Aggregator::MergeBuckets(mb1) = aggregator_rx.recv().await.unwrap();
        let Aggregator::MergeBuckets(mb2) = aggregator_rx.recv().await.unwrap();

        let mut messages = vec![mb1, mb2];
        messages.sort_by_key(|mb| mb.project_key);

        let actual = messages
            .into_iter()
            .map(|mb| (mb.project_key, mb.buckets))
            .collect::<Vec<_>>();

        assert_debug_snapshot!(actual, @r###"
        [
            (
                ProjectKey("11111111111111111111111111111111"),
                [
                    Bucket {
                        timestamp: UnixTimestamp(1615889440),
                        width: 0,
                        name: MetricName(
                            "d:custom/endpoint.response_time@millisecond",
                        ),
                        value: Distribution(
                            [
                                68.0,
                            ],
                        ),
                        tags: {
                            "route": "user_index",
                        },
                        metadata: BucketMetadata {
                            merges: 1,
                            received_at: None,
                            extracted_from_indexed: false,
                        },
                    },
                ],
            ),
            (
                ProjectKey("22222222222222222222222222222222"),
                [
                    Bucket {
                        timestamp: UnixTimestamp(1615889440),
                        width: 0,
                        name: MetricName(
                            "d:custom/endpoint.cache_rate@none",
                        ),
                        value: Distribution(
                            [
                                36.0,
                            ],
                        ),
                        tags: {},
                        metadata: BucketMetadata {
                            merges: 1,
                            received_at: None,
                            extracted_from_indexed: false,
                        },
                    },
                ],
            ),
        ]
        "###);
    }
}