// relay_server/services/processor.rs

1use std::borrow::Cow;
2use std::collections::{BTreeSet, HashMap};
3use std::error::Error;
4use std::fmt::{Debug, Display};
5use std::future::Future;
6use std::io::Write;
7use std::pin::Pin;
8use std::sync::{Arc, Once};
9use std::time::Duration;
10
11use anyhow::Context;
12use brotli::CompressorWriter as BrotliEncoder;
13use bytes::Bytes;
14use chrono::{DateTime, Utc};
15use flate2::Compression;
16use flate2::write::{GzEncoder, ZlibEncoder};
17use futures::FutureExt;
18use futures::future::BoxFuture;
19use relay_base_schema::project::{ProjectId, ProjectKey};
20use relay_cogs::{AppFeature, Cogs, FeatureWeights, ResourceId, Token};
21use relay_common::time::UnixTimestamp;
22use relay_config::{Config, HttpEncoding, RelayMode};
23use relay_dynamic_config::{CombinedMetricExtractionConfig, ErrorBoundary, Feature, GlobalConfig};
24use relay_event_normalization::{ClockDriftProcessor, GeoIpLookup};
25use relay_event_schema::processor::ProcessingAction;
26use relay_event_schema::protocol::{
27    ClientReport, Event, EventId, Metrics, NetworkReportError, SpanV2,
28};
29use relay_filter::FilterStatKey;
30use relay_metrics::{Bucket, BucketMetadata, BucketView, BucketsView, MetricNamespace};
31use relay_pii::PiiConfigError;
32use relay_protocol::Annotated;
33use relay_quotas::{DataCategory, Quota, RateLimits, Scoping};
34use relay_sampling::evaluation::{ReservoirCounters, ReservoirEvaluator, SamplingDecision};
35use relay_statsd::metric;
36use relay_system::{Addr, FromMessage, NoResponse, Service};
37use reqwest::header;
38use smallvec::{SmallVec, smallvec};
39use zstd::stream::Encoder as ZstdEncoder;
40
41use crate::envelope::{
42    self, AttachmentType, ContentType, Envelope, EnvelopeError, Item, ItemContainer, ItemType,
43};
44use crate::extractors::{PartialDsn, RequestMeta, RequestTrust};
45use crate::integrations::{Integration, SpansIntegration};
46use crate::managed::{InvalidProcessingGroupType, ManagedEnvelope, TypedEnvelope};
47use crate::metrics::{MetricOutcomes, MetricsLimiter, MinimalTrackableBucket};
48use crate::metrics_extraction::transactions::types::ExtractMetricsError;
49use crate::metrics_extraction::transactions::{ExtractedMetrics, TransactionExtractor};
50use crate::processing::check_ins::CheckInsProcessor;
51use crate::processing::logs::LogsProcessor;
52use crate::processing::sessions::SessionsProcessor;
53use crate::processing::spans::SpansProcessor;
54use crate::processing::trace_metrics::TraceMetricsProcessor;
55use crate::processing::utils::event::{
56    EventFullyNormalized, EventMetricsExtracted, SpansExtracted, event_category, event_type,
57};
58use crate::processing::{Forward as _, Output, Outputs, QuotaRateLimiter};
59use crate::service::ServiceError;
60use crate::services::global_config::GlobalConfigHandle;
61use crate::services::metrics::{Aggregator, FlushBuckets, MergeBuckets, ProjectBuckets};
62use crate::services::outcome::{DiscardItemType, DiscardReason, Outcome, TrackOutcome};
63use crate::services::processor::event::FiltersStatus;
64use crate::services::projects::cache::ProjectCacheHandle;
65use crate::services::projects::project::{ProjectInfo, ProjectState};
66use crate::services::upstream::{
67    SendRequest, Sign, SignatureType, UpstreamRelay, UpstreamRequest, UpstreamRequestError,
68};
69use crate::statsd::{RelayCounters, RelayHistograms, RelayTimers};
70use crate::utils::{self, CheckLimits, EnvelopeLimiter, SamplingResult};
71use crate::{http, processing};
72use relay_base_schema::organization::OrganizationId;
73use relay_threading::AsyncPool;
74#[cfg(feature = "processing")]
75use {
76    crate::services::global_rate_limits::{GlobalRateLimits, GlobalRateLimitsServiceHandle},
77    crate::services::processor::nnswitch::SwitchProcessingError,
78    crate::services::store::{Store, StoreEnvelope},
79    crate::utils::Enforcement,
80    itertools::Itertools,
81    relay_cardinality::{
82        CardinalityLimit, CardinalityLimiter, CardinalityLimitsSplit, RedisSetLimiter,
83        RedisSetLimiterOptions,
84    },
85    relay_dynamic_config::{CardinalityLimiterMode, MetricExtractionGroups},
86    relay_quotas::{RateLimitingError, RedisRateLimiter},
87    relay_redis::{AsyncRedisClient, RedisClients},
88    std::time::Instant,
89    symbolic_unreal::{Unreal4Error, Unreal4ErrorKind},
90};
91
92mod attachment;
93mod dynamic_sampling;
94mod event;
95mod metrics;
96mod nel;
97mod profile;
98mod profile_chunk;
99mod replay;
100mod report;
101mod span;
102
103#[cfg(all(sentry, feature = "processing"))]
104mod playstation;
105mod standalone;
106#[cfg(feature = "processing")]
107mod unreal;
108
109#[cfg(feature = "processing")]
110mod nnswitch;
111
/// Creates the block only if used with `processing` feature.
///
/// Provided code block will be executed only if the provided config has `processing_enabled` set.
///
/// The two-arm form additionally takes an `else` block, which runs both when the
/// `processing` feature is compiled out entirely and when it is compiled in but
/// disabled in the given config.
macro_rules! if_processing {
    ($config:expr, $if_true:block) => {
        // Without the `processing` feature the entire block is compiled away.
        #[cfg(feature = "processing")] {
            if $config.processing_enabled() $if_true
        }
    };
    ($config:expr, $if_true:block else $if_false:block) => {
        {
            #[cfg(feature = "processing")] {
                if $config.processing_enabled() $if_true else $if_false
            }
            #[cfg(not(feature = "processing"))] {
                $if_false
            }
        }
    };
}
132
/// The minimum clock drift for correction to apply.
///
/// Drift below this threshold (55 minutes) is left uncorrected.
pub const MINIMUM_CLOCK_DRIFT: Duration = Duration::from_secs(55 * 60);
135
/// Error returned when a [`ProcessingGroup`] cannot be converted into the
/// requested group marker type.
#[derive(Debug)]
pub struct GroupTypeError;

impl Display for GroupTypeError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "failed to convert processing group into corresponding type")
    }
}

impl std::error::Error for GroupTypeError {}
146
/// Generates a zero-sized marker type for a [`ProcessingGroup`] variant.
///
/// For `$ty` this emits:
///  - an infallible `From<$ty> for ProcessingGroup` yielding `$variant`, and
///  - a `TryFrom<ProcessingGroup> for $ty` which succeeds for `$variant` (and
///    for any additional `$other` variants listed) and fails with
///    [`GroupTypeError`] otherwise.
macro_rules! processing_group {
    ($ty:ident, $variant:ident$(, $($other:ident),+)?) => {
        #[derive(Clone, Copy, Debug)]
        pub struct $ty;

        impl From<$ty> for ProcessingGroup {
            fn from(_: $ty) -> Self {
                ProcessingGroup::$variant
            }
        }

        impl TryFrom<ProcessingGroup> for $ty {
            type Error = GroupTypeError;

            fn try_from(value: ProcessingGroup) -> Result<Self, Self::Error> {
                if matches!(value, ProcessingGroup::$variant) {
                    return Ok($ty);
                }
                // Optional extra variants that also map to this marker type.
                $($(
                    if matches!(value, ProcessingGroup::$other) {
                        return Ok($ty);
                    }
                )+)?
                return Err(GroupTypeError);
            }
        }
    };
}
175
/// A marker trait.
///
/// Should be used only with groups which are responsible for processing envelopes with events,
/// such as transactions and errors.
pub trait EventProcessing {}
180
/// A trait for processing groups that can be dynamically sampled.
///
/// Implementors decide per project whether sampling runs at all and whether
/// reservoir sampling is applicable for their data type.
pub trait Sampling {
    /// Whether dynamic sampling should run under the given project's conditions.
    fn supports_sampling(project_info: &ProjectInfo) -> bool;

    /// Whether reservoir sampling applies to this processing group (a.k.a. data type).
    fn supports_reservoir_sampling() -> bool;
}
189
// Transactions carry full event payloads and therefore take part in event processing.
processing_group!(TransactionGroup, Transaction);
impl EventProcessing for TransactionGroup {}
192
193impl Sampling for TransactionGroup {
194    fn supports_sampling(project_info: &ProjectInfo) -> bool {
195        // For transactions, we require transaction metrics to be enabled before sampling.
196        matches!(&project_info.config.transaction_metrics, Some(ErrorBoundary::Ok(c)) if c.is_enabled())
197    }
198
199    fn supports_reservoir_sampling() -> bool {
200        true
201    }
202}
203
processing_group!(ErrorGroup, Error);
impl EventProcessing for ErrorGroup {}

processing_group!(SessionGroup, Session);
processing_group!(StandaloneGroup, Standalone);
processing_group!(ClientReportGroup, ClientReport);
processing_group!(ReplayGroup, Replay);
processing_group!(CheckInGroup, CheckIn);
// NEL reports are converted into logs, so both variants map to the log group.
processing_group!(LogGroup, Log, Nel);
processing_group!(TraceMetricGroup, TraceMetric);
processing_group!(SpanGroup, Span);
215
216impl Sampling for SpanGroup {
217    fn supports_sampling(project_info: &ProjectInfo) -> bool {
218        // If no metrics could be extracted, do not sample anything.
219        matches!(&project_info.config().metric_extraction, ErrorBoundary::Ok(c) if c.is_supported())
220    }
221
222    fn supports_reservoir_sampling() -> bool {
223        false
224    }
225}
226
processing_group!(ProfileChunkGroup, ProfileChunk);
// Metric items are only grouped like this in proxy mode; see `split_envelope`.
processing_group!(MetricsGroup, Metrics);
processing_group!(ForwardUnknownGroup, ForwardUnknown);
processing_group!(Ungrouped, Ungrouped);
231
/// Processed group type marker.
///
/// Marks the envelopes which passed through the processing pipeline.
/// Used as the type-state parameter of [`TypedEnvelope`].
#[derive(Clone, Copy, Debug)]
pub struct Processed;
237
/// Describes the groups of the processable items.
///
/// Each variant corresponds to a dedicated processing path; envelopes are
/// partitioned into these groups by [`ProcessingGroup::split_envelope`].
#[derive(Clone, Copy, Debug)]
pub enum ProcessingGroup {
    /// All the transaction related items.
    ///
    /// Includes transactions, related attachments, profiles.
    Transaction,
    /// All the items which require (have or create) events.
    ///
    /// This includes: errors, NEL, security reports, user reports, some of the
    /// attachments.
    Error,
    /// Session events.
    Session,
    /// Standalone items which can be sent alone without any event attached to it in the current
    /// envelope e.g. some attachments, user reports.
    Standalone,
    /// Outcomes.
    ClientReport,
    /// Replays and ReplayRecordings.
    Replay,
    /// Crons.
    CheckIn,
    /// NEL reports.
    Nel,
    /// Logs.
    Log,
    /// Trace metrics.
    TraceMetric,
    /// Spans.
    Span,
    /// Span V2 spans.
    SpanV2,
    /// Metrics.
    Metrics,
    /// ProfileChunk.
    ProfileChunk,
    /// Unknown item types will be forwarded upstream (to processing Relay), where we will
    /// decide what to do with them.
    ForwardUnknown,
    /// All the items in the envelope that could not be grouped.
    Ungrouped,
}
281
282impl ProcessingGroup {
283    /// Splits provided envelope into list of tuples of groups with associated envelopes.
284    fn split_envelope(
285        mut envelope: Envelope,
286        project_info: &ProjectInfo,
287    ) -> SmallVec<[(Self, Box<Envelope>); 3]> {
288        let headers = envelope.headers().clone();
289        let mut grouped_envelopes = smallvec![];
290
291        // Extract replays.
292        let replay_items = envelope.take_items_by(|item| {
293            matches!(
294                item.ty(),
295                &ItemType::ReplayEvent | &ItemType::ReplayRecording | &ItemType::ReplayVideo
296            )
297        });
298        if !replay_items.is_empty() {
299            grouped_envelopes.push((
300                ProcessingGroup::Replay,
301                Envelope::from_parts(headers.clone(), replay_items),
302            ))
303        }
304
305        // Keep all the sessions together in one envelope.
306        let session_items = envelope
307            .take_items_by(|item| matches!(item.ty(), &ItemType::Session | &ItemType::Sessions));
308        if !session_items.is_empty() {
309            grouped_envelopes.push((
310                ProcessingGroup::Session,
311                Envelope::from_parts(headers.clone(), session_items),
312            ))
313        }
314
315        let span_v2_items = envelope.take_items_by(|item| {
316            let exp_feature = project_info.has_feature(Feature::SpanV2ExperimentalProcessing);
317            let is_supported_integration = matches!(
318                item.integration(),
319                Some(Integration::Spans(SpansIntegration::OtelV1 { .. }))
320            );
321            let is_span = matches!(item.ty(), &ItemType::Span);
322
323            ItemContainer::<SpanV2>::is_container(item)
324                || (exp_feature && is_span)
325                || (exp_feature && is_supported_integration)
326        });
327
328        if !span_v2_items.is_empty() {
329            grouped_envelopes.push((
330                ProcessingGroup::SpanV2,
331                Envelope::from_parts(headers.clone(), span_v2_items),
332            ))
333        }
334
335        // Extract spans.
336        let span_items = envelope.take_items_by(|item| {
337            matches!(item.ty(), &ItemType::Span)
338                || matches!(item.integration(), Some(Integration::Spans(_)))
339        });
340
341        if !span_items.is_empty() {
342            grouped_envelopes.push((
343                ProcessingGroup::Span,
344                Envelope::from_parts(headers.clone(), span_items),
345            ))
346        }
347
348        // Extract logs.
349        let logs_items = envelope.take_items_by(|item| {
350            matches!(item.ty(), &ItemType::Log)
351                || matches!(item.integration(), Some(Integration::Logs(_)))
352        });
353        if !logs_items.is_empty() {
354            grouped_envelopes.push((
355                ProcessingGroup::Log,
356                Envelope::from_parts(headers.clone(), logs_items),
357            ))
358        }
359
360        // Extract trace metrics.
361        let trace_metric_items =
362            envelope.take_items_by(|item| matches!(item.ty(), &ItemType::TraceMetric));
363        if !trace_metric_items.is_empty() {
364            grouped_envelopes.push((
365                ProcessingGroup::TraceMetric,
366                Envelope::from_parts(headers.clone(), trace_metric_items),
367            ))
368        }
369
370        // NEL items are transformed into logs in their own processing step.
371        let nel_items = envelope.take_items_by(|item| matches!(item.ty(), &ItemType::Nel));
372        if !nel_items.is_empty() {
373            grouped_envelopes.push((
374                ProcessingGroup::Nel,
375                Envelope::from_parts(headers.clone(), nel_items),
376            ))
377        }
378
379        // Extract all metric items.
380        //
381        // Note: Should only be relevant in proxy mode. In other modes we send metrics through
382        // a separate pipeline.
383        let metric_items = envelope.take_items_by(|i| i.ty().is_metrics());
384        if !metric_items.is_empty() {
385            grouped_envelopes.push((
386                ProcessingGroup::Metrics,
387                Envelope::from_parts(headers.clone(), metric_items),
388            ))
389        }
390
391        // Extract profile chunks.
392        let profile_chunk_items =
393            envelope.take_items_by(|item| matches!(item.ty(), &ItemType::ProfileChunk));
394        if !profile_chunk_items.is_empty() {
395            grouped_envelopes.push((
396                ProcessingGroup::ProfileChunk,
397                Envelope::from_parts(headers.clone(), profile_chunk_items),
398            ))
399        }
400
401        // Extract all standalone items.
402        //
403        // Note: only if there are no items in the envelope which can create events, otherwise they
404        // will be in the same envelope with all require event items.
405        if !envelope.items().any(Item::creates_event) {
406            let standalone_items = envelope.take_items_by(Item::requires_event);
407            if !standalone_items.is_empty() {
408                grouped_envelopes.push((
409                    ProcessingGroup::Standalone,
410                    Envelope::from_parts(headers.clone(), standalone_items),
411                ))
412            }
413        };
414
415        // Make sure we create separate envelopes for each `RawSecurity` report.
416        let security_reports_items = envelope
417            .take_items_by(|i| matches!(i.ty(), &ItemType::RawSecurity))
418            .into_iter()
419            .map(|item| {
420                let headers = headers.clone();
421                let items: SmallVec<[Item; 3]> = smallvec![item.clone()];
422                let mut envelope = Envelope::from_parts(headers, items);
423                envelope.set_event_id(EventId::new());
424                (ProcessingGroup::Error, envelope)
425            });
426        grouped_envelopes.extend(security_reports_items);
427
428        // Extract all the items which require an event into separate envelope.
429        let require_event_items = envelope.take_items_by(Item::requires_event);
430        if !require_event_items.is_empty() {
431            let group = if require_event_items
432                .iter()
433                .any(|item| matches!(item.ty(), &ItemType::Transaction | &ItemType::Profile))
434            {
435                ProcessingGroup::Transaction
436            } else {
437                ProcessingGroup::Error
438            };
439
440            grouped_envelopes.push((
441                group,
442                Envelope::from_parts(headers.clone(), require_event_items),
443            ))
444        }
445
446        // Get the rest of the envelopes, one per item.
447        let envelopes = envelope.items_mut().map(|item| {
448            let headers = headers.clone();
449            let items: SmallVec<[Item; 3]> = smallvec![item.clone()];
450            let envelope = Envelope::from_parts(headers, items);
451            let item_type = item.ty();
452            let group = if matches!(item_type, &ItemType::CheckIn) {
453                ProcessingGroup::CheckIn
454            } else if matches!(item.ty(), &ItemType::ClientReport) {
455                ProcessingGroup::ClientReport
456            } else if matches!(item_type, &ItemType::Unknown(_)) {
457                ProcessingGroup::ForwardUnknown
458            } else {
459                // Cannot group this item type.
460                ProcessingGroup::Ungrouped
461            };
462
463            (group, envelope)
464        });
465        grouped_envelopes.extend(envelopes);
466
467        grouped_envelopes
468    }
469
470    /// Returns the name of the group.
471    pub fn variant(&self) -> &'static str {
472        match self {
473            ProcessingGroup::Transaction => "transaction",
474            ProcessingGroup::Error => "error",
475            ProcessingGroup::Session => "session",
476            ProcessingGroup::Standalone => "standalone",
477            ProcessingGroup::ClientReport => "client_report",
478            ProcessingGroup::Replay => "replay",
479            ProcessingGroup::CheckIn => "check_in",
480            ProcessingGroup::Log => "log",
481            ProcessingGroup::TraceMetric => "trace_metric",
482            ProcessingGroup::Nel => "nel",
483            ProcessingGroup::Span => "span",
484            ProcessingGroup::SpanV2 => "span_v2",
485            ProcessingGroup::Metrics => "metrics",
486            ProcessingGroup::ProfileChunk => "profile_chunk",
487            ProcessingGroup::ForwardUnknown => "forward_unknown",
488            ProcessingGroup::Ungrouped => "ungrouped",
489        }
490    }
491}
492
493impl From<ProcessingGroup> for AppFeature {
494    fn from(value: ProcessingGroup) -> Self {
495        match value {
496            ProcessingGroup::Transaction => AppFeature::Transactions,
497            ProcessingGroup::Error => AppFeature::Errors,
498            ProcessingGroup::Session => AppFeature::Sessions,
499            ProcessingGroup::Standalone => AppFeature::UnattributedEnvelope,
500            ProcessingGroup::ClientReport => AppFeature::ClientReports,
501            ProcessingGroup::Replay => AppFeature::Replays,
502            ProcessingGroup::CheckIn => AppFeature::CheckIns,
503            ProcessingGroup::Log => AppFeature::Logs,
504            ProcessingGroup::TraceMetric => AppFeature::TraceMetrics,
505            ProcessingGroup::Nel => AppFeature::Logs,
506            ProcessingGroup::Span => AppFeature::Spans,
507            ProcessingGroup::SpanV2 => AppFeature::Spans,
508            ProcessingGroup::Metrics => AppFeature::UnattributedMetrics,
509            ProcessingGroup::ProfileChunk => AppFeature::Profiles,
510            ProcessingGroup::ForwardUnknown => AppFeature::UnattributedEnvelope,
511            ProcessingGroup::Ungrouped => AppFeature::UnattributedEnvelope,
512        }
513    }
514}
515
/// An error returned when handling [`ProcessEnvelope`].
///
/// Variants are mapped to outcomes via [`ProcessingError::to_outcome`].
#[derive(Debug, thiserror::Error)]
pub enum ProcessingError {
    #[error("invalid json in event")]
    InvalidJson(#[source] serde_json::Error),

    #[error("invalid message pack event payload")]
    InvalidMsgpack(#[from] rmp_serde::decode::Error),

    #[cfg(feature = "processing")]
    #[error("invalid unreal crash report")]
    InvalidUnrealReport(#[source] Unreal4Error),

    #[error("event payload too large")]
    PayloadTooLarge(DiscardItemType),

    #[error("invalid transaction event")]
    InvalidTransaction,

    #[error("envelope processor failed")]
    ProcessingFailed(#[from] ProcessingAction),

    #[error("duplicate {0} in event")]
    DuplicateItem(ItemType),

    #[error("failed to extract event payload")]
    NoEventPayload,

    #[error("missing project id in DSN")]
    MissingProjectId,

    #[error("invalid security report type: {0:?}")]
    InvalidSecurityType(Bytes),

    #[error("unsupported security report type")]
    UnsupportedSecurityType,

    #[error("invalid security report")]
    InvalidSecurityReport(#[source] serde_json::Error),

    #[error("invalid nel report")]
    InvalidNelReport(#[source] NetworkReportError),

    #[error("event filtered with reason: {0:?}")]
    EventFiltered(FilterStatKey),

    #[error("missing or invalid required event timestamp")]
    InvalidTimestamp,

    #[error("could not serialize event payload")]
    SerializeFailed(#[source] serde_json::Error),

    #[cfg(feature = "processing")]
    #[error("failed to apply quotas")]
    QuotasFailed(#[from] RateLimitingError),

    #[error("invalid pii config")]
    PiiConfigError(PiiConfigError),

    #[error("invalid processing group type")]
    InvalidProcessingGroup(Box<InvalidProcessingGroupType>),

    #[error("invalid replay")]
    InvalidReplay(DiscardReason),

    #[error("replay filtered with reason: {0:?}")]
    ReplayFiltered(FilterStatKey),

    #[cfg(feature = "processing")]
    #[error("nintendo switch dying message processing failed {0:?}")]
    InvalidNintendoDyingMessage(#[source] SwitchProcessingError),

    #[cfg(all(sentry, feature = "processing"))]
    #[error("playstation dump processing failed: {0}")]
    InvalidPlaystationDump(String),

    #[error("processing group does not match specific processor")]
    ProcessingGroupMismatch,
    /// Failures from the new processing pipeline; outcomes are emitted there
    /// already, so [`Self::to_outcome`] returns `None` for this variant.
    #[error("new processing pipeline failed")]
    ProcessingFailure,
}
597
impl ProcessingError {
    /// Maps the error to the [`Outcome`] which should be emitted for the dropped data.
    ///
    /// Returns `None` for errors which must not produce an outcome here, either
    /// because nothing is dropped or because an outcome is emitted elsewhere
    /// (see the individual arms below).
    pub fn to_outcome(&self) -> Option<Outcome> {
        match self {
            Self::PayloadTooLarge(payload_type) => {
                Some(Outcome::Invalid(DiscardReason::TooLarge(*payload_type)))
            }
            Self::InvalidJson(_) => Some(Outcome::Invalid(DiscardReason::InvalidJson)),
            Self::InvalidMsgpack(_) => Some(Outcome::Invalid(DiscardReason::InvalidMsgpack)),
            Self::InvalidSecurityType(_) => {
                Some(Outcome::Invalid(DiscardReason::SecurityReportType))
            }
            Self::InvalidSecurityReport(_) => Some(Outcome::Invalid(DiscardReason::SecurityReport)),
            Self::UnsupportedSecurityType => Some(Outcome::Filtered(FilterStatKey::InvalidCsp)),
            Self::InvalidNelReport(_) => Some(Outcome::Invalid(DiscardReason::InvalidJson)),
            Self::InvalidTransaction => Some(Outcome::Invalid(DiscardReason::InvalidTransaction)),
            Self::InvalidTimestamp => Some(Outcome::Invalid(DiscardReason::Timestamp)),
            Self::DuplicateItem(_) => Some(Outcome::Invalid(DiscardReason::DuplicateItem)),
            Self::NoEventPayload => Some(Outcome::Invalid(DiscardReason::NoEventPayload)),
            #[cfg(feature = "processing")]
            Self::InvalidNintendoDyingMessage(_) => Some(Outcome::Invalid(DiscardReason::Payload)),
            #[cfg(all(sentry, feature = "processing"))]
            Self::InvalidPlaystationDump(_) => Some(Outcome::Invalid(DiscardReason::Payload)),
            // Bad compression gets a dedicated discard reason; any other unreal
            // failure is a generic processing failure.
            #[cfg(feature = "processing")]
            Self::InvalidUnrealReport(err) if err.kind() == Unreal4ErrorKind::BadCompression => {
                Some(Outcome::Invalid(DiscardReason::InvalidCompression))
            }
            #[cfg(feature = "processing")]
            Self::InvalidUnrealReport(_) => Some(Outcome::Invalid(DiscardReason::ProcessUnreal)),
            Self::SerializeFailed(_) | Self::ProcessingFailed(_) => {
                Some(Outcome::Invalid(DiscardReason::Internal))
            }
            #[cfg(feature = "processing")]
            Self::QuotasFailed(_) => Some(Outcome::Invalid(DiscardReason::Internal)),
            Self::PiiConfigError(_) => Some(Outcome::Invalid(DiscardReason::ProjectStatePii)),
            Self::MissingProjectId => None,
            Self::EventFiltered(_) => None,
            Self::InvalidProcessingGroup(_) => None,
            Self::InvalidReplay(reason) => Some(Outcome::Invalid(*reason)),
            Self::ReplayFiltered(key) => Some(Outcome::Filtered(key.clone())),

            Self::ProcessingGroupMismatch => Some(Outcome::Invalid(DiscardReason::Internal)),
            // Outcomes are emitted in the new processing pipeline already.
            Self::ProcessingFailure => None,
        }
    }

    /// Returns whether the derived outcome is flagged as unexpected by
    /// [`Outcome::is_unexpected`]; `false` when there is no outcome at all.
    fn is_unexpected(&self) -> bool {
        self.to_outcome()
            .is_some_and(|outcome| outcome.is_unexpected())
    }
}
649
#[cfg(feature = "processing")]
impl From<Unreal4Error> for ProcessingError {
    fn from(err: Unreal4Error) -> Self {
        // Oversized reports map onto the generic payload-too-large error; all
        // other failures are surfaced as unreal-specific report errors.
        if let Unreal4ErrorKind::TooLarge = err.kind() {
            Self::PayloadTooLarge(ItemType::UnrealReport.into())
        } else {
            Self::InvalidUnrealReport(err)
        }
    }
}
659
660impl From<ExtractMetricsError> for ProcessingError {
661    fn from(error: ExtractMetricsError) -> Self {
662        match error {
663            ExtractMetricsError::MissingTimestamp | ExtractMetricsError::InvalidTimestamp => {
664                Self::InvalidTimestamp
665            }
666        }
667    }
668}
669
impl From<InvalidProcessingGroupType> for ProcessingError {
    // The value is boxed here, matching the `InvalidProcessingGroup(Box<_>)` variant.
    fn from(value: InvalidProcessingGroupType) -> Self {
        Self::InvalidProcessingGroup(Box::new(value))
    }
}
675
/// An [`Event`] extracted from an envelope item, paired with a `usize`
/// (presumably the size of the payload it was extracted from — confirm at the
/// extraction call sites).
type ExtractedEvent = (Annotated<Event>, usize);
677
/// A container for extracted metrics during processing.
///
/// The container enforces that the extracted metrics are correctly tagged
/// with the dynamic sampling decision.
#[derive(Debug)]
pub struct ProcessingExtractedMetrics {
    // Holds both project-scoped and sampling-scoped buckets.
    metrics: ExtractedMetrics,
}
686
687impl ProcessingExtractedMetrics {
688    pub fn new() -> Self {
689        Self {
690            metrics: ExtractedMetrics::default(),
691        }
692    }
693
694    /// Extends the contained metrics with [`ExtractedMetrics`].
695    pub fn extend(
696        &mut self,
697        extracted: ExtractedMetrics,
698        sampling_decision: Option<SamplingDecision>,
699    ) {
700        self.extend_project_metrics(extracted.project_metrics, sampling_decision);
701        self.extend_sampling_metrics(extracted.sampling_metrics, sampling_decision);
702    }
703
704    /// Extends the contained project metrics.
705    pub fn extend_project_metrics<I>(
706        &mut self,
707        buckets: I,
708        sampling_decision: Option<SamplingDecision>,
709    ) where
710        I: IntoIterator<Item = Bucket>,
711    {
712        self.metrics
713            .project_metrics
714            .extend(buckets.into_iter().map(|mut bucket| {
715                bucket.metadata.extracted_from_indexed =
716                    sampling_decision == Some(SamplingDecision::Keep);
717                bucket
718            }));
719    }
720
721    /// Extends the contained sampling metrics.
722    pub fn extend_sampling_metrics<I>(
723        &mut self,
724        buckets: I,
725        sampling_decision: Option<SamplingDecision>,
726    ) where
727        I: IntoIterator<Item = Bucket>,
728    {
729        self.metrics
730            .sampling_metrics
731            .extend(buckets.into_iter().map(|mut bucket| {
732                bucket.metadata.extracted_from_indexed =
733                    sampling_decision == Some(SamplingDecision::Keep);
734                bucket
735            }));
736    }
737
738    /// Applies rate limits to the contained metrics.
739    ///
740    /// This is used to apply rate limits which have been enforced on sampled items of an envelope
741    /// to also consistently apply to the metrics extracted from these items.
742    #[cfg(feature = "processing")]
743    fn apply_enforcement(&mut self, enforcement: &Enforcement, enforced_consistently: bool) {
744        // Metric namespaces which need to be dropped.
745        let mut drop_namespaces: SmallVec<[_; 2]> = smallvec![];
746        // Metrics belonging to this metric namespace need to have the `extracted_from_indexed`
747        // flag reset to `false`.
748        let mut reset_extracted_from_indexed: SmallVec<[_; 2]> = smallvec![];
749
750        for (namespace, limit, indexed) in [
751            (
752                MetricNamespace::Transactions,
753                &enforcement.event,
754                &enforcement.event_indexed,
755            ),
756            (
757                MetricNamespace::Spans,
758                &enforcement.spans,
759                &enforcement.spans_indexed,
760            ),
761        ] {
762            if limit.is_active() {
763                drop_namespaces.push(namespace);
764            } else if indexed.is_active() && !enforced_consistently {
765                // If the enforcement was not computed by consistently checking the limits,
766                // the quota for the metrics has not yet been incremented.
767                // In this case we have a dropped indexed payload but a metric which still needs to
768                // be accounted for, make sure the metric will still be rate limited.
769                reset_extracted_from_indexed.push(namespace);
770            }
771        }
772
773        if !drop_namespaces.is_empty() || !reset_extracted_from_indexed.is_empty() {
774            self.retain_mut(|bucket| {
775                let Some(namespace) = bucket.name.try_namespace() else {
776                    return true;
777                };
778
779                if drop_namespaces.contains(&namespace) {
780                    return false;
781                }
782
783                if reset_extracted_from_indexed.contains(&namespace) {
784                    bucket.metadata.extracted_from_indexed = false;
785                }
786
787                true
788            });
789        }
790    }
791
792    #[cfg(feature = "processing")]
793    fn retain_mut(&mut self, mut f: impl FnMut(&mut Bucket) -> bool) {
794        self.metrics.project_metrics.retain_mut(&mut f);
795        self.metrics.sampling_metrics.retain_mut(&mut f);
796    }
797}
798
799fn send_metrics(
800    metrics: ExtractedMetrics,
801    project_key: ProjectKey,
802    sampling_key: Option<ProjectKey>,
803    aggregator: &Addr<Aggregator>,
804) {
805    let ExtractedMetrics {
806        project_metrics,
807        sampling_metrics,
808    } = metrics;
809
810    if !project_metrics.is_empty() {
811        aggregator.send(MergeBuckets {
812            project_key,
813            buckets: project_metrics,
814        });
815    }
816
817    if !sampling_metrics.is_empty() {
818        // If no sampling project state is available, we associate the sampling
819        // metrics with the current project.
820        //
821        // project_without_tracing         -> metrics goes to self
822        // dependent_project_with_tracing  -> metrics goes to root
823        // root_project_with_tracing       -> metrics goes to root == self
824        let sampling_project_key = sampling_key.unwrap_or(project_key);
825        aggregator.send(MergeBuckets {
826            project_key: sampling_project_key,
827            buckets: sampling_metrics,
828        });
829    }
830}
831
832/// Function for on-off switches that filter specific item types (profiles, spans)
833/// based on a feature flag.
834///
835/// If the project config did not come from the upstream, we keep the items.
836fn should_filter(config: &Config, project_info: &ProjectInfo, feature: Feature) -> bool {
837    match config.relay_mode() {
838        RelayMode::Proxy => false,
839        RelayMode::Managed => !project_info.has_feature(feature),
840    }
841}
842
/// The result of the envelope processing containing the processed envelope along with the partial
/// result.
#[derive(Debug)]
#[expect(
    clippy::large_enum_variant,
    reason = "until we have a better solution to combat the excessive growth of Item, see #4819"
)]
enum ProcessingResult {
    /// A processed envelope together with the metrics extracted during processing.
    Envelope {
        managed_envelope: TypedEnvelope<Processed>,
        extracted_metrics: ProcessingExtractedMetrics,
    },
    /// The output of a [`processing::Processor`].
    Output(Output<Outputs>),
}
857
858impl ProcessingResult {
859    /// Creates a [`ProcessingResult`] with no metrics.
860    fn no_metrics(managed_envelope: TypedEnvelope<Processed>) -> Self {
861        Self::Envelope {
862            managed_envelope,
863            extracted_metrics: ProcessingExtractedMetrics::new(),
864        }
865    }
866}
867
/// All items which can be submitted upstream.
#[derive(Debug)]
#[expect(
    clippy::large_enum_variant,
    reason = "until we have a better solution to combat the excessive growth of Item, see #4819"
)]
enum Submit<'a> {
    /// A processed envelope.
    Envelope(TypedEnvelope<Processed>),
    /// The output of a [`processing::Processor`].
    Output {
        /// The produced output itself.
        output: Outputs,
        /// Context required to forward the output upstream.
        ctx: processing::ForwardContext<'a>,
    },
}
883
/// Applies processing to all contents of the given envelope.
///
/// Depending on the contents of the envelope and Relay's mode, this includes:
///
///  - Basic normalization and validation for all item types.
///  - Clock drift correction if the required `sent_at` header is present.
///  - Expansion of certain item types (e.g. unreal).
///  - Store normalization for event payloads in processing mode.
///  - Rate limiters and inbound filters on events in processing mode.
#[derive(Debug)]
pub struct ProcessEnvelope {
    /// The envelope to process.
    pub envelope: ManagedEnvelope,
    /// Project info of the envelope's own project.
    pub project_info: Arc<ProjectInfo>,
    /// Currently active cached rate limits for this project.
    pub rate_limits: Arc<RateLimits>,
    /// Root sampling project info, if available.
    pub sampling_project_info: Option<Arc<ProjectInfo>>,
    /// Sampling reservoir counters.
    pub reservoir_counters: ReservoirCounters,
}
906
/// Like a [`ProcessEnvelope`], but with an envelope which has been grouped.
#[derive(Debug)]
struct ProcessEnvelopeGrouped<'a> {
    /// The processing group the envelope belongs to.
    pub group: ProcessingGroup,
    /// The envelope to process.
    pub envelope: ManagedEnvelope,
    /// The processing context, see [`processing::Context`].
    pub ctx: processing::Context<'a>,
    /// Sampling reservoir counters.
    pub reservoir_counters: &'a ReservoirCounters,
}
919
/// Parses a list of metrics or metric buckets and pushes them to the project's aggregator.
///
/// This parses and validates the metrics:
///  - For [`Metrics`](ItemType::Statsd), each metric is parsed separately, and invalid metrics are
///    ignored independently.
///  - For [`MetricBuckets`](ItemType::MetricBuckets), the entire list of buckets is parsed and
///    dropped together on parsing failure.
///  - Other envelope items will be ignored with an error message.
///
/// Additionally, processing applies clock drift correction using the system clock of this Relay, if
/// the Envelope specifies the [`sent_at`](Envelope::sent_at) header.
#[derive(Debug)]
pub struct ProcessMetrics {
    /// The list of metric items to process.
    pub data: MetricData,
    /// The target project the metrics belong to.
    pub project_key: ProjectKey,
    /// Whether to keep or reset the metric metadata, based on the trust of the source.
    pub source: BucketSource,
    /// The wall clock time at which the request was received.
    pub received_at: DateTime<Utc>,
    /// The value of the Envelope's [`sent_at`](Envelope::sent_at) header for clock drift
    /// correction.
    pub sent_at: Option<DateTime<Utc>>,
}
945
/// Raw unparsed metric data.
#[derive(Debug)]
pub enum MetricData {
    /// Raw data, unparsed envelope items (statsd payloads or metric buckets).
    Raw(Vec<Item>),
    /// Already parsed buckets but unprocessed.
    Parsed(Vec<Bucket>),
}
954
955impl MetricData {
956    /// Consumes the metric data and parses the contained buckets.
957    ///
958    /// If the contained data is already parsed the buckets are returned unchanged.
959    /// Raw buckets are parsed and created with the passed `timestamp`.
960    fn into_buckets(self, timestamp: UnixTimestamp) -> Vec<Bucket> {
961        let items = match self {
962            Self::Parsed(buckets) => return buckets,
963            Self::Raw(items) => items,
964        };
965
966        let mut buckets = Vec::new();
967        for item in items {
968            let payload = item.payload();
969            if item.ty() == &ItemType::Statsd {
970                for bucket_result in Bucket::parse_all(&payload, timestamp) {
971                    match bucket_result {
972                        Ok(bucket) => buckets.push(bucket),
973                        Err(error) => relay_log::debug!(
974                            error = &error as &dyn Error,
975                            "failed to parse metric bucket from statsd format",
976                        ),
977                    }
978                }
979            } else if item.ty() == &ItemType::MetricBuckets {
980                match serde_json::from_slice::<Vec<Bucket>>(&payload) {
981                    Ok(parsed_buckets) => {
982                        // Re-use the allocation of `b` if possible.
983                        if buckets.is_empty() {
984                            buckets = parsed_buckets;
985                        } else {
986                            buckets.extend(parsed_buckets);
987                        }
988                    }
989                    Err(error) => {
990                        relay_log::debug!(
991                            error = &error as &dyn Error,
992                            "failed to parse metric bucket",
993                        );
994                        metric!(counter(RelayCounters::MetricBucketsParsingFailed) += 1);
995                    }
996                }
997            } else {
998                relay_log::error!(
999                    "invalid item of type {} passed to ProcessMetrics",
1000                    item.ty()
1001                );
1002            }
1003        }
1004        buckets
1005    }
1006}
1007
/// A batched metrics payload, serialized as JSON, to be parsed and processed.
#[derive(Debug)]
pub struct ProcessBatchedMetrics {
    /// Metrics payload in JSON format.
    pub payload: Bytes,
    /// Whether to keep or reset the metric metadata.
    pub source: BucketSource,
    /// The wall clock time at which the request was received.
    pub received_at: DateTime<Utc>,
    /// The value of the Envelope's [`sent_at`](Envelope::sent_at) header for clock drift
    /// correction.
    pub sent_at: Option<DateTime<Utc>>,
}
1019
/// Source information where a metric bucket originates from.
///
/// Derived from the request trust, see [`BucketSource::from_meta`].
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum BucketSource {
    /// The metric bucket originated from an internal Relay use case.
    ///
    /// The metric bucket originates either from within the same Relay
    /// or was accepted coming from another Relay which is registered as
    /// an internal Relay via Relay's configuration.
    Internal,
    /// The bucket source originated from an untrusted source.
    ///
    /// Managed Relays sending extracted metrics are considered external,
    /// it's a project use case but it comes from an untrusted source.
    External,
}
1035
1036impl BucketSource {
1037    /// Infers the bucket source from [`RequestMeta::request_trust`].
1038    pub fn from_meta(meta: &RequestMeta) -> Self {
1039        match meta.request_trust() {
1040            RequestTrust::Trusted => Self::Internal,
1041            RequestTrust::Untrusted => Self::External,
1042        }
1043    }
1044}
1045
/// Sends client reports to the upstream.
#[derive(Debug)]
pub struct SubmitClientReports {
    /// The client reports to be sent.
    pub client_reports: Vec<ClientReport>,
    /// Scoping information for the client reports.
    pub scoping: Scoping,
}
1054
/// CPU-intensive processing tasks for envelopes.
#[derive(Debug)]
pub enum EnvelopeProcessor {
    /// Processes an envelope, see [`ProcessEnvelope`].
    ProcessEnvelope(Box<ProcessEnvelope>),
    /// Processes metric items for a single project, see [`ProcessMetrics`].
    ProcessProjectMetrics(Box<ProcessMetrics>),
    /// Processes a batched metrics payload, see [`ProcessBatchedMetrics`].
    ProcessBatchedMetrics(Box<ProcessBatchedMetrics>),
    /// Handles buckets flushed by the aggregator, see [`FlushBuckets`].
    FlushBuckets(Box<FlushBuckets>),
    /// Sends client reports to the upstream, see [`SubmitClientReports`].
    SubmitClientReports(Box<SubmitClientReports>),
}
1064
1065impl EnvelopeProcessor {
1066    /// Returns the name of the message variant.
1067    pub fn variant(&self) -> &'static str {
1068        match self {
1069            EnvelopeProcessor::ProcessEnvelope(_) => "ProcessEnvelope",
1070            EnvelopeProcessor::ProcessProjectMetrics(_) => "ProcessProjectMetrics",
1071            EnvelopeProcessor::ProcessBatchedMetrics(_) => "ProcessBatchedMetrics",
1072            EnvelopeProcessor::FlushBuckets(_) => "FlushBuckets",
1073            EnvelopeProcessor::SubmitClientReports(_) => "SubmitClientReports",
1074        }
1075    }
1076}
1077
1078impl relay_system::Interface for EnvelopeProcessor {}
1079
1080impl FromMessage<ProcessEnvelope> for EnvelopeProcessor {
1081    type Response = relay_system::NoResponse;
1082
1083    fn from_message(message: ProcessEnvelope, _sender: ()) -> Self {
1084        Self::ProcessEnvelope(Box::new(message))
1085    }
1086}
1087
impl FromMessage<ProcessMetrics> for EnvelopeProcessor {
    // Fire-and-forget: no response is sent back to the caller.
    type Response = NoResponse;

    fn from_message(message: ProcessMetrics, _: ()) -> Self {
        Self::ProcessProjectMetrics(Box::new(message))
    }
}
1095
impl FromMessage<ProcessBatchedMetrics> for EnvelopeProcessor {
    // Fire-and-forget: no response is sent back to the caller.
    type Response = NoResponse;

    fn from_message(message: ProcessBatchedMetrics, _: ()) -> Self {
        Self::ProcessBatchedMetrics(Box::new(message))
    }
}
1103
impl FromMessage<FlushBuckets> for EnvelopeProcessor {
    // Fire-and-forget: no response is sent back to the caller.
    type Response = NoResponse;

    fn from_message(message: FlushBuckets, _: ()) -> Self {
        Self::FlushBuckets(Box::new(message))
    }
}
1111
impl FromMessage<SubmitClientReports> for EnvelopeProcessor {
    // Fire-and-forget: no response is sent back to the caller.
    type Response = NoResponse;

    fn from_message(message: SubmitClientReports, _: ()) -> Self {
        Self::SubmitClientReports(Box::new(message))
    }
}
1119
/// The asynchronous thread pool used for scheduling processing tasks in the processor.
///
/// Work items are boxed `'static` futures.
pub type EnvelopeProcessorServicePool = AsyncPool<BoxFuture<'static, ()>>;
1122
/// Service implementing the [`EnvelopeProcessor`] interface.
///
/// This service handles messages in a worker pool with configurable concurrency.
///
/// Cloning is cheap: all clones share the same inner state.
#[derive(Clone)]
pub struct EnvelopeProcessorService {
    // Shared behind an `Arc` so clones of the service are cheap.
    inner: Arc<InnerProcessor>,
}
1130
/// Contains the addresses of services that the processor publishes to.
pub struct Addrs {
    /// Address for [`TrackOutcome`] messages.
    pub outcome_aggregator: Addr<TrackOutcome>,
    /// Address of the upstream relay service.
    pub upstream_relay: Addr<UpstreamRelay>,
    /// Optional address of the store service (processing only).
    #[cfg(feature = "processing")]
    pub store_forwarder: Option<Addr<Store>>,
    /// Address of the metrics aggregator service.
    pub aggregator: Addr<Aggregator>,
    /// Optional address of the global rate limits service (processing only).
    #[cfg(feature = "processing")]
    pub global_rate_limits: Option<Addr<GlobalRateLimits>>,
}
1141
1142impl Default for Addrs {
1143    fn default() -> Self {
1144        Addrs {
1145            outcome_aggregator: Addr::dummy(),
1146            upstream_relay: Addr::dummy(),
1147            #[cfg(feature = "processing")]
1148            store_forwarder: None,
1149            aggregator: Addr::dummy(),
1150            #[cfg(feature = "processing")]
1151            global_rate_limits: None,
1152        }
1153    }
1154}
1155
/// State shared by all clones of the [`EnvelopeProcessorService`].
struct InnerProcessor {
    /// Pool executing the processing tasks.
    pool: EnvelopeProcessorServicePool,
    /// Relay configuration.
    config: Arc<Config>,
    /// Handle to the current global configuration.
    global_config: GlobalConfigHandle,
    /// Handle to the project cache, used e.g. to update cached rate limits.
    project_cache: ProjectCacheHandle,
    /// COGS accounting handle.
    cogs: Cogs,
    /// Redis client used for quota enforcement (processing only).
    #[cfg(feature = "processing")]
    quotas_client: Option<AsyncRedisClient>,
    /// Addresses of downstream services the processor publishes to.
    addrs: Addrs,
    /// Redis-backed rate limiter for consistent quota enforcement (processing only).
    #[cfg(feature = "processing")]
    rate_limiter: Option<Arc<RedisRateLimiter<GlobalRateLimitsServiceHandle>>>,
    /// GeoIP lookup used during event normalization.
    geoip_lookup: GeoIpLookup,
    /// Redis-backed metric cardinality limiter (processing only).
    #[cfg(feature = "processing")]
    cardinality_limiter: Option<CardinalityLimiter>,
    /// Handle for recording metric outcomes.
    metric_outcomes: MetricOutcomes,
    /// Signal-specific processors, see [`Processing`].
    processing: Processing,
}
1173
/// Bundles the per-signal processors used by the service.
struct Processing {
    /// Processor for log items.
    logs: LogsProcessor,
    /// Processor for trace metric items.
    trace_metrics: TraceMetricsProcessor,
    /// Processor for span items.
    spans: SpansProcessor,
    /// Processor for check-in items.
    check_ins: CheckInsProcessor,
    /// Processor for session items.
    sessions: SessionsProcessor,
}
1181
1182impl EnvelopeProcessorService {
    /// Creates a multi-threaded envelope processor.
    #[cfg_attr(feature = "processing", expect(clippy::too_many_arguments))]
    pub fn new(
        pool: EnvelopeProcessorServicePool,
        config: Arc<Config>,
        global_config: GlobalConfigHandle,
        project_cache: ProjectCacheHandle,
        cogs: Cogs,
        #[cfg(feature = "processing")] redis: Option<RedisClients>,
        addrs: Addrs,
        metric_outcomes: MetricOutcomes,
    ) -> Self {
        // Open the GeoIP database if configured. Failure to open is logged but
        // non-fatal: geo lookups are simply disabled via an empty lookup.
        let geoip_lookup = config
            .geoip_path()
            .and_then(
                |p| match GeoIpLookup::open(p).context(ServiceError::GeoIp) {
                    Ok(geoip) => Some(geoip),
                    Err(err) => {
                        relay_log::error!("failed to open GeoIP db {p:?}: {err:?}");
                        None
                    }
                },
            )
            .unwrap_or_else(GeoIpLookup::empty);

        // Split the Redis clients into the ones used for cardinality limiting
        // and quota enforcement, respectively.
        #[cfg(feature = "processing")]
        let (cardinality, quotas) = match redis {
            Some(RedisClients {
                cardinality,
                quotas,
                ..
            }) => (Some(cardinality), Some(quotas)),
            None => (None, None),
        };

        #[cfg(feature = "processing")]
        let global_rate_limits = addrs.global_rate_limits.clone().map(Into::into);

        // Consistent (Redis-backed) rate limiting requires both a quotas Redis
        // client and the global rate limits service.
        #[cfg(feature = "processing")]
        let rate_limiter = match (quotas.clone(), global_rate_limits) {
            (Some(redis), Some(global)) => {
                Some(RedisRateLimiter::new(redis, global).max_limit(config.max_rate_limit()))
            }
            _ => None,
        };

        // The quota limiter is shared by all signal-specific processors below.
        let quota_limiter = Arc::new(QuotaRateLimiter::new(
            #[cfg(feature = "processing")]
            project_cache.clone(),
            #[cfg(feature = "processing")]
            rate_limiter.clone(),
        ));
        #[cfg(feature = "processing")]
        let rate_limiter = rate_limiter.map(Arc::new);

        let inner = InnerProcessor {
            pool,
            global_config,
            project_cache,
            cogs,
            #[cfg(feature = "processing")]
            quotas_client: quotas.clone(),
            #[cfg(feature = "processing")]
            rate_limiter,
            addrs,
            #[cfg(feature = "processing")]
            cardinality_limiter: cardinality
                .map(|cardinality| {
                    RedisSetLimiter::new(
                        RedisSetLimiterOptions {
                            cache_vacuum_interval: config
                                .cardinality_limiter_cache_vacuum_interval(),
                        },
                        cardinality,
                    )
                })
                .map(CardinalityLimiter::new),
            metric_outcomes,
            processing: Processing {
                logs: LogsProcessor::new(Arc::clone(&quota_limiter)),
                trace_metrics: TraceMetricsProcessor::new(Arc::clone(&quota_limiter)),
                spans: SpansProcessor::new(Arc::clone(&quota_limiter), geoip_lookup.clone()),
                check_ins: CheckInsProcessor::new(Arc::clone(&quota_limiter)),
                sessions: SessionsProcessor::new(quota_limiter),
            },
            geoip_lookup,
            config,
        };

        Self {
            inner: Arc::new(inner),
        }
    }
1276
    /// Enforces rate-limiting quotas on the envelope and the extracted metrics.
    ///
    /// Cached quotas are always applied first. On processing Relays with a
    /// configured Redis rate limiter, quotas are additionally enforced
    /// consistently, and the freshly computed limits are merged back into the
    /// project's cached rate limits.
    ///
    /// Returns the event after enforcement, which may have been removed.
    async fn enforce_quotas<Group>(
        &self,
        managed_envelope: &mut TypedEnvelope<Group>,
        event: Annotated<Event>,
        extracted_metrics: &mut ProcessingExtractedMetrics,
        ctx: processing::Context<'_>,
    ) -> Result<Annotated<Event>, ProcessingError> {
        // Cached quotas first, they are quick to evaluate and some quotas (indexed) are not
        // applied in the fast path, all cached quotas can be applied here.
        let cached_result = RateLimiter::Cached
            .enforce(managed_envelope, event, extracted_metrics, ctx)
            .await?;

        if_processing!(self.inner.config, {
            let rate_limiter = match self.inner.rate_limiter.clone() {
                Some(rate_limiter) => rate_limiter,
                // Without a Redis rate limiter, cached enforcement is all we can do.
                None => return Ok(cached_result.event),
            };

            // Enforce all quotas consistently with Redis.
            let consistent_result = RateLimiter::Consistent(rate_limiter)
                .enforce(
                    managed_envelope,
                    cached_result.event,
                    extracted_metrics,
                    ctx
                )
                .await?;

            // Update cached rate limits with the freshly computed ones.
            if !consistent_result.rate_limits.is_empty() {
                self.inner
                    .project_cache
                    .get(managed_envelope.scoping().project_key)
                    .rate_limits()
                    .merge(consistent_result.rate_limits);
            }

            Ok(consistent_result.event)
        } else { Ok(cached_result.event) })
    }
1318
    /// Extract transaction metrics.
    ///
    /// Metrics are extracted at most once per event: if `event_metrics_extracted`
    /// already indicates extraction, or the event has no value, this is a no-op.
    /// Extraction further requires a supported `metric_extraction` config and an
    /// enabled transaction metrics config in the project config; otherwise the
    /// event is left untouched.
    ///
    /// Returns whether event metrics have been extracted after this call.
    #[allow(clippy::too_many_arguments)]
    fn extract_transaction_metrics(
        &self,
        managed_envelope: &mut TypedEnvelope<TransactionGroup>,
        event: &mut Annotated<Event>,
        extracted_metrics: &mut ProcessingExtractedMetrics,
        project_id: ProjectId,
        project_info: &ProjectInfo,
        sampling_decision: SamplingDecision,
        event_metrics_extracted: EventMetricsExtracted,
        spans_extracted: SpansExtracted,
    ) -> Result<EventMetricsExtracted, ProcessingError> {
        if event_metrics_extracted.0 {
            return Ok(event_metrics_extracted);
        }
        let Some(event) = event.value_mut() else {
            return Ok(event_metrics_extracted);
        };

        // NOTE: This function requires a `metric_extraction` in the project config. Legacy configs
        // will upsert this configuration from transaction and conditional tagging fields, even if
        // it is not present in the actual project config payload.
        let global = self.inner.global_config.current();
        let combined_config = {
            let config = match &project_info.config.metric_extraction {
                ErrorBoundary::Ok(config) if config.is_supported() => config,
                _ => return Ok(event_metrics_extracted),
            };
            let global_config = match &global.metric_extraction {
                ErrorBoundary::Ok(global_config) => global_config,
                #[allow(unused_variables)]
                ErrorBoundary::Err(e) => {
                    if_processing!(self.inner.config, {
                        // Config is invalid, but we will try to extract what we can with just the
                        // project config.
                        relay_log::error!("Failed to parse global extraction config {e}");
                        MetricExtractionGroups::EMPTY
                    } else {
                        // If there's an error with global metrics extraction, it is safe to assume that this
                        // Relay instance is not up-to-date, and we should skip extraction.
                        relay_log::debug!("Failed to parse global extraction config: {e}");
                        return Ok(event_metrics_extracted);
                    })
                }
            };
            CombinedMetricExtractionConfig::new(global_config, config)
        };

        // Require a valid transaction metrics config.
        let tx_config = match &project_info.config.transaction_metrics {
            Some(ErrorBoundary::Ok(tx_config)) => tx_config,
            Some(ErrorBoundary::Err(e)) => {
                relay_log::debug!("Failed to parse legacy transaction metrics config: {e}");
                return Ok(event_metrics_extracted);
            }
            None => {
                relay_log::debug!("Legacy transaction metrics config is missing");
                return Ok(event_metrics_extracted);
            }
        };

        if !tx_config.is_enabled() {
            // Log the unsupported-version error at most once per process.
            static TX_CONFIG_ERROR: Once = Once::new();
            TX_CONFIG_ERROR.call_once(|| {
                if self.inner.config.processing_enabled() {
                    relay_log::error!(
                        "Processing Relay outdated, received tx config in version {}, which is not supported",
                        tx_config.version
                    );
                }
            });

            return Ok(event_metrics_extracted);
        }

        // If spans were already extracted for an event, we rely on span processing to extract metrics.
        let extract_spans = !spans_extracted.0
            && utils::sample(global.options.span_extraction_sample_rate.unwrap_or(1.0)).is_keep();

        let metrics = crate::metrics_extraction::event::extract_metrics(
            event,
            crate::metrics_extraction::event::ExtractMetricsConfig {
                config: combined_config,
                sampling_decision,
                target_project_id: project_id,
                max_tag_value_size: self
                    .inner
                    .config
                    .aggregator_config_for(MetricNamespace::Spans)
                    .max_tag_value_length,
                extract_spans,
                transaction_from_dsc: managed_envelope
                    .envelope()
                    .dsc()
                    .and_then(|dsc| dsc.transaction.as_deref()),
            },
        );

        extracted_metrics.extend(metrics, Some(sampling_decision));

        // Extract the transaction metrics themselves, unless the project
        // discards transactions entirely.
        if !project_info.has_feature(Feature::DiscardTransaction) {
            let transaction_from_dsc = managed_envelope
                .envelope()
                .dsc()
                .and_then(|dsc| dsc.transaction.as_deref());

            let extractor = TransactionExtractor {
                config: tx_config,
                generic_config: Some(combined_config),
                transaction_from_dsc,
                sampling_decision,
                target_project_id: project_id,
            };

            extracted_metrics.extend(extractor.extract(event)?, Some(sampling_decision));
        }

        Ok(EventMetricsExtracted(true))
    }
1439
    /// Processes the general errors, and the items which require or create the events.
    ///
    /// Expands platform-specific crash report items (processing only), extracts
    /// the event, normalizes and filters it, enforces quotas, and finally scrubs
    /// and re-serializes the event into the envelope. Returns the metrics
    /// accumulated while processing the envelope.
    async fn process_errors(
        &self,
        managed_envelope: &mut TypedEnvelope<ErrorGroup>,
        project_id: ProjectId,
        mut ctx: processing::Context<'_>,
    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
        let mut event_fully_normalized = EventFullyNormalized::new(managed_envelope.envelope());
        let mut metrics = Metrics::default();
        let mut extracted_metrics = ProcessingExtractedMetrics::new();

        // Events can also contain user reports.
        report::process_user_reports(managed_envelope);

        // Expand item formats that may carry events (Unreal, PlayStation,
        // Nintendo Switch) before event extraction.
        if_processing!(self.inner.config, {
            unreal::expand(managed_envelope, &self.inner.config)?;
            #[cfg(sentry)]
            playstation::expand(managed_envelope, &self.inner.config, ctx.project_info)?;
            nnswitch::expand(managed_envelope)?;
        });

        let extraction_result = event::extract(
            managed_envelope,
            &mut metrics,
            event_fully_normalized,
            &self.inner.config,
        )?;
        let mut event = extraction_result.event;

        if_processing!(self.inner.config, {
            if let Some(inner_event_fully_normalized) =
                unreal::process(managed_envelope, &mut event)?
            {
                event_fully_normalized = inner_event_fully_normalized;
            }
            #[cfg(sentry)]
            if let Some(inner_event_fully_normalized) =
                playstation::process(managed_envelope, &mut event, ctx.project_info)?
            {
                event_fully_normalized = inner_event_fully_normalized;
            }
            if let Some(inner_event_fully_normalized) =
                attachment::create_placeholders(managed_envelope, &mut event, &mut metrics)
            {
                event_fully_normalized = inner_event_fully_normalized;
            }
        });

        ctx.sampling_project_info = dynamic_sampling::validate_and_set_dsc(
            managed_envelope,
            &mut event,
            ctx.project_info,
            ctx.sampling_project_info,
        );

        // Finalize and normalize the event using the envelope headers and
        // plain attachments, then run inbound filters.
        let attachments = managed_envelope
            .envelope()
            .items()
            .filter(|item| item.attachment_type() == Some(&AttachmentType::Attachment));
        processing::utils::event::finalize(
            managed_envelope.envelope().headers(),
            &mut event,
            attachments,
            &mut metrics,
            &self.inner.config,
        )?;
        event_fully_normalized = processing::utils::event::normalize(
            managed_envelope.envelope().headers(),
            &mut event,
            event_fully_normalized,
            &ctx,
            &self.inner.geoip_lookup,
        )?;
        let filter_run = event::filter(
            managed_envelope,
            &mut event,
            ctx.project_info,
            &self.inner.global_config.current(),
        )?;

        if self.inner.config.processing_enabled() || matches!(filter_run, FiltersStatus::Ok) {
            dynamic_sampling::tag_error_with_sampling_decision(
                managed_envelope,
                &mut event,
                ctx.sampling_project_info,
                &self.inner.config,
            )
            .await;
        }

        // Enforce rate limiting quotas; this may remove the event.
        event = self
            .enforce_quotas(managed_envelope, event, &mut extracted_metrics, ctx)
            .await?;

        if event.value().is_some() {
            event::scrub(&mut event, ctx.project_info)?;
            event::serialize(
                managed_envelope,
                &mut event,
                event_fully_normalized,
                EventMetricsExtracted(false),
                SpansExtracted(false),
            )?;
            event::emit_feedback_metrics(managed_envelope.envelope());
        }

        attachment::scrub(managed_envelope, ctx.project_info);

        if self.inner.config.processing_enabled() && !event_fully_normalized.0 {
            relay_log::error!(
                tags.project = %project_id,
                tags.ty = event_type(&event).map(|e| e.to_string()).unwrap_or("none".to_owned()),
                "ingested event without normalizing"
            );
        }

        Ok(Some(extracted_metrics))
    }
1558
1559    /// Processes only transactions and transaction-related items.
1560    #[allow(unused_assignments)]
1561    #[allow(clippy::too_many_arguments)]
1562    async fn process_transactions(
1563        &self,
1564        managed_envelope: &mut TypedEnvelope<TransactionGroup>,
1565        cogs: &mut Token,
1566        project_id: ProjectId,
1567        mut ctx: processing::Context<'_>,
1568        reservoir_counters: &ReservoirCounters,
1569    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
1570        let mut event_fully_normalized = EventFullyNormalized::new(managed_envelope.envelope());
1571        let mut event_metrics_extracted = EventMetricsExtracted(false);
1572        let mut spans_extracted = SpansExtracted(false);
1573        let mut metrics = Metrics::default();
1574        let mut extracted_metrics = ProcessingExtractedMetrics::new();
1575
1576        // We extract the main event from the envelope.
1577        let extraction_result = event::extract(
1578            managed_envelope,
1579            &mut metrics,
1580            event_fully_normalized,
1581            &self.inner.config,
1582        )?;
1583
1584        // If metrics were extracted we mark that.
1585        if let Some(inner_event_metrics_extracted) = extraction_result.event_metrics_extracted {
1586            event_metrics_extracted = inner_event_metrics_extracted;
1587        }
1588        if let Some(inner_spans_extracted) = extraction_result.spans_extracted {
1589            spans_extracted = inner_spans_extracted;
1590        };
1591
1592        // We take the main event out of the result.
1593        let mut event = extraction_result.event;
1594
1595        let profile_id = profile::filter(
1596            managed_envelope,
1597            &event,
1598            ctx.config,
1599            project_id,
1600            ctx.project_info,
1601        );
1602        profile::transfer_id(&mut event, profile_id);
1603
1604        ctx.sampling_project_info = dynamic_sampling::validate_and_set_dsc(
1605            managed_envelope,
1606            &mut event,
1607            ctx.project_info,
1608            ctx.sampling_project_info,
1609        );
1610
1611        let attachments = managed_envelope
1612            .envelope()
1613            .items()
1614            .filter(|item| item.attachment_type() == Some(&AttachmentType::Attachment));
1615        processing::utils::event::finalize(
1616            managed_envelope.envelope().headers(),
1617            &mut event,
1618            attachments,
1619            &mut metrics,
1620            &self.inner.config,
1621        )?;
1622
1623        event_fully_normalized = processing::utils::event::normalize(
1624            managed_envelope.envelope().headers(),
1625            &mut event,
1626            event_fully_normalized,
1627            &ctx,
1628            &self.inner.geoip_lookup,
1629        )?;
1630
1631        let filter_run = event::filter(
1632            managed_envelope,
1633            &mut event,
1634            ctx.project_info,
1635            ctx.global_config,
1636        )?;
1637
1638        // Always run dynamic sampling on processing Relays,
1639        // but delay decision until inbound filters have been fully processed.
1640        let run_dynamic_sampling =
1641            matches!(filter_run, FiltersStatus::Ok) || self.inner.config.processing_enabled();
1642
1643        let reservoir = self.new_reservoir_evaluator(
1644            managed_envelope.scoping().organization_id,
1645            reservoir_counters,
1646        );
1647
1648        let sampling_result = match run_dynamic_sampling {
1649            true => {
1650                dynamic_sampling::run(
1651                    managed_envelope,
1652                    &mut event,
1653                    ctx.config,
1654                    ctx.project_info,
1655                    ctx.sampling_project_info,
1656                    &reservoir,
1657                )
1658                .await
1659            }
1660            false => SamplingResult::Pending,
1661        };
1662
1663        relay_statsd::metric!(
1664            counter(RelayCounters::SamplingDecision) += 1,
1665            decision = sampling_result.decision().as_str(),
1666            item = "transaction"
1667        );
1668
1669        #[cfg(feature = "processing")]
1670        let server_sample_rate = sampling_result.sample_rate();
1671
1672        if let Some(outcome) = sampling_result.into_dropped_outcome() {
1673            // Process profiles before dropping the transaction, if necessary.
1674            // Before metric extraction to make sure the profile count is reflected correctly.
1675            profile::process(
1676                managed_envelope,
1677                &mut event,
1678                ctx.global_config,
1679                ctx.config,
1680                ctx.project_info,
1681            );
1682            // Extract metrics here, we're about to drop the event/transaction.
1683            event_metrics_extracted = self.extract_transaction_metrics(
1684                managed_envelope,
1685                &mut event,
1686                &mut extracted_metrics,
1687                project_id,
1688                ctx.project_info,
1689                SamplingDecision::Drop,
1690                event_metrics_extracted,
1691                spans_extracted,
1692            )?;
1693
1694            dynamic_sampling::drop_unsampled_items(
1695                managed_envelope,
1696                event,
1697                outcome,
1698                spans_extracted,
1699            );
1700
1701            // At this point we have:
1702            //  - An empty envelope.
1703            //  - An envelope containing only processed profiles.
1704            // We need to make sure there are enough quotas for these profiles.
1705            event = self
1706                .enforce_quotas(
1707                    managed_envelope,
1708                    Annotated::empty(),
1709                    &mut extracted_metrics,
1710                    ctx,
1711                )
1712                .await?;
1713
1714            return Ok(Some(extracted_metrics));
1715        }
1716
1717        let _post_ds = cogs.start_category("post_ds");
1718
1719        // Need to scrub the transaction before extracting spans.
1720        //
1721        // Unconditionally scrub to make sure PII is removed as early as possible.
1722        event::scrub(&mut event, ctx.project_info)?;
1723
1724        // TODO: remove once `relay.drop-transaction-attachments` has graduated.
1725        attachment::scrub(managed_envelope, ctx.project_info);
1726
1727        if_processing!(self.inner.config, {
1728            // Process profiles before extracting metrics, to make sure they are removed if they are invalid.
1729            let profile_id = profile::process(
1730                managed_envelope,
1731                &mut event,
1732                ctx.global_config,
1733                ctx.config,
1734                ctx.project_info,
1735            );
1736            profile::transfer_id(&mut event, profile_id);
1737            profile::scrub_profiler_id(&mut event);
1738
1739            // Always extract metrics in processing Relays for sampled items.
1740            event_metrics_extracted = self.extract_transaction_metrics(
1741                managed_envelope,
1742                &mut event,
1743                &mut extracted_metrics,
1744                project_id,
1745                ctx.project_info,
1746                SamplingDecision::Keep,
1747                event_metrics_extracted,
1748                spans_extracted,
1749            )?;
1750
1751            spans_extracted = span::extract_from_event(
1752                managed_envelope,
1753                &event,
1754                ctx.global_config,
1755                ctx.config,
1756                server_sample_rate,
1757                event_metrics_extracted,
1758                spans_extracted,
1759            );
1760        });
1761
1762        event = self
1763            .enforce_quotas(managed_envelope, event, &mut extracted_metrics, ctx)
1764            .await?;
1765
1766        if_processing!(self.inner.config, {
1767            event = span::maybe_discard_transaction(managed_envelope, event, ctx.project_info);
1768        });
1769
1770        // Event may have been dropped because of a quota and the envelope can be empty.
1771        if event.value().is_some() {
1772            event::serialize(
1773                managed_envelope,
1774                &mut event,
1775                event_fully_normalized,
1776                event_metrics_extracted,
1777                spans_extracted,
1778            )?;
1779        }
1780
1781        if self.inner.config.processing_enabled() && !event_fully_normalized.0 {
1782            relay_log::error!(
1783                tags.project = %project_id,
1784                tags.ty = event_type(&event).map(|e| e.to_string()).unwrap_or("none".to_owned()),
1785                "ingested event without normalizing"
1786            );
1787        };
1788
1789        Ok(Some(extracted_metrics))
1790    }
1791
    /// Processes profile chunks.
    ///
    /// Filters chunks against the project configuration, runs the full
    /// processing step on processing Relays only, and finally enforces quotas
    /// on the remaining envelope items.
    ///
    /// Never extracts metrics, hence always returns `Ok(None)`.
    async fn process_profile_chunks(
        &self,
        managed_envelope: &mut TypedEnvelope<ProfileChunkGroup>,
        ctx: processing::Context<'_>,
    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
        profile_chunk::filter(managed_envelope, ctx.project_info);

        // Full chunk processing only runs on processing Relays.
        if_processing!(self.inner.config, {
            profile_chunk::process(
                managed_envelope,
                ctx.project_info,
                ctx.global_config,
                ctx.config,
            );
        });

        // No event and no extracted metrics here: quotas are enforced against
        // the envelope items alone, with a throwaway metrics container.
        self.enforce_quotas(
            managed_envelope,
            Annotated::empty(),
            &mut ProcessingExtractedMetrics::new(),
            ctx,
        )
        .await?;

        Ok(None)
    }
1818
    /// Processes standalone items that require an event ID, but do not have an event on the same envelope.
    ///
    /// Enforces quotas before user reports are processed and attachments are
    /// scrubbed, so rate-limited items never reach those steps.
    async fn process_standalone(
        &self,
        managed_envelope: &mut TypedEnvelope<StandaloneGroup>,
        project_id: ProjectId,
        ctx: processing::Context<'_>,
    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
        let mut extracted_metrics = ProcessingExtractedMetrics::new();

        standalone::process(managed_envelope);

        // There is no event on this envelope, so profiles are filtered against
        // an empty event.
        profile::filter(
            managed_envelope,
            &Annotated::empty(),
            ctx.config,
            project_id,
            ctx.project_info,
        );

        self.enforce_quotas(
            managed_envelope,
            Annotated::empty(),
            &mut extracted_metrics,
            ctx,
        )
        .await?;

        report::process_user_reports(managed_envelope);
        attachment::scrub(managed_envelope, ctx.project_info);

        Ok(Some(extracted_metrics))
    }
1851
    /// Processes user and client reports.
    ///
    /// Quotas are enforced first; the remaining client reports are then
    /// processed, with outcomes forwarded to the outcome aggregator.
    async fn process_client_reports(
        &self,
        managed_envelope: &mut TypedEnvelope<ClientReportGroup>,
        ctx: processing::Context<'_>,
    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
        let mut extracted_metrics = ProcessingExtractedMetrics::new();

        // No event in this group; quotas apply to the envelope items only.
        self.enforce_quotas(
            managed_envelope,
            Annotated::empty(),
            &mut extracted_metrics,
            ctx,
        )
        .await?;

        report::process_client_reports(
            managed_envelope,
            ctx.config,
            ctx.project_info,
            self.inner.addrs.outcome_aggregator.clone(),
        );

        Ok(Some(extracted_metrics))
    }
1877
    /// Processes replays.
    ///
    /// Runs the replay processing step (which has access to the GeoIP lookup)
    /// and then enforces quotas on the remaining envelope items.
    async fn process_replays(
        &self,
        managed_envelope: &mut TypedEnvelope<ReplayGroup>,
        ctx: processing::Context<'_>,
    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
        let mut extracted_metrics = ProcessingExtractedMetrics::new();

        replay::process(
            managed_envelope,
            ctx.global_config,
            ctx.config,
            ctx.project_info,
            &self.inner.geoip_lookup,
        )?;

        // No event in this group; quotas apply to the envelope items only.
        self.enforce_quotas(
            managed_envelope,
            Annotated::empty(),
            &mut extracted_metrics,
            ctx,
        )
        .await?;

        Ok(Some(extracted_metrics))
    }
1904
1905    async fn process_nel(
1906        &self,
1907        mut managed_envelope: ManagedEnvelope,
1908        ctx: processing::Context<'_>,
1909    ) -> Result<ProcessingResult, ProcessingError> {
1910        nel::convert_to_logs(&mut managed_envelope);
1911        self.process_with_processor(&self.inner.processing.logs, managed_envelope, ctx)
1912            .await
1913    }
1914
1915    async fn process_with_processor<P: processing::Processor>(
1916        &self,
1917        processor: &P,
1918        mut managed_envelope: ManagedEnvelope,
1919        ctx: processing::Context<'_>,
1920    ) -> Result<ProcessingResult, ProcessingError>
1921    where
1922        Outputs: From<P::Output>,
1923    {
1924        let Some(work) = processor.prepare_envelope(&mut managed_envelope) else {
1925            debug_assert!(
1926                false,
1927                "there must be work for the {} processor",
1928                std::any::type_name::<P>(),
1929            );
1930            return Err(ProcessingError::ProcessingGroupMismatch);
1931        };
1932
1933        managed_envelope.update();
1934        match managed_envelope.envelope().is_empty() {
1935            true => managed_envelope.accept(),
1936            false => managed_envelope.reject(Outcome::Invalid(DiscardReason::Internal)),
1937        }
1938
1939        processor
1940            .process(work, ctx)
1941            .await
1942            .map_err(|err| {
1943                relay_log::debug!(
1944                    error = &err as &dyn std::error::Error,
1945                    "processing pipeline failed"
1946                );
1947                ProcessingError::ProcessingFailure
1948            })
1949            .map(|o| o.map(Into::into))
1950            .map(ProcessingResult::Output)
1951    }
1952
    /// Processes standalone spans.
    ///
    /// This function does *not* run for spans extracted from transactions.
    ///
    /// The `_`-prefixed parameters are only read inside the `if_processing!`
    /// block below, i.e. when the `processing` feature is compiled in.
    #[allow(clippy::too_many_arguments)]
    async fn process_standalone_spans(
        &self,
        managed_envelope: &mut TypedEnvelope<SpanGroup>,
        _project_id: ProjectId,
        ctx: processing::Context<'_>,
        _reservoir_counters: &ReservoirCounters,
    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
        let mut extracted_metrics = ProcessingExtractedMetrics::new();

        span::filter(managed_envelope, ctx.config, ctx.project_info);
        span::convert_otel_traces_data(managed_envelope);

        // Full span processing (including dynamic sampling via the reservoir)
        // only runs on processing Relays.
        if_processing!(self.inner.config, {
            let reservoir = self.new_reservoir_evaluator(
                managed_envelope.scoping().organization_id,
                _reservoir_counters,
            );

            // Standalone spans have no enclosing event, hence the empty
            // `Annotated`.
            span::process(
                managed_envelope,
                &mut Annotated::empty(),
                &mut extracted_metrics,
                _project_id,
                ctx,
                &self.inner.geoip_lookup,
                &reservoir,
            )
            .await;
        });

        self.enforce_quotas(
            managed_envelope,
            Annotated::empty(),
            &mut extracted_metrics,
            ctx,
        )
        .await?;

        Ok(Some(extracted_metrics))
    }
1997
1998    async fn process_envelope(
1999        &self,
2000        cogs: &mut Token,
2001        project_id: ProjectId,
2002        message: ProcessEnvelopeGrouped<'_>,
2003    ) -> Result<ProcessingResult, ProcessingError> {
2004        let ProcessEnvelopeGrouped {
2005            group,
2006            envelope: mut managed_envelope,
2007            ctx,
2008            reservoir_counters,
2009        } = message;
2010
2011        // Pre-process the envelope headers.
2012        if let Some(sampling_state) = ctx.sampling_project_info {
2013            // Both transactions and standalone span envelopes need a normalized DSC header
2014            // to make sampling rules based on the segment/transaction name work correctly.
2015            managed_envelope
2016                .envelope_mut()
2017                .parametrize_dsc_transaction(&sampling_state.config.tx_name_rules);
2018        }
2019
2020        // Set the event retention. Effectively, this value will only be available in processing
2021        // mode when the full project config is queried from the upstream.
2022        if let Some(retention) = ctx.project_info.config.event_retention {
2023            managed_envelope.envelope_mut().set_retention(retention);
2024        }
2025
2026        // Set the event retention. Effectively, this value will only be available in processing
2027        // mode when the full project config is queried from the upstream.
2028        if let Some(retention) = ctx.project_info.config.downsampled_event_retention {
2029            managed_envelope
2030                .envelope_mut()
2031                .set_downsampled_retention(retention);
2032        }
2033
2034        // Ensure the project ID is updated to the stored instance for this project cache. This can
2035        // differ in two cases:
2036        //  1. The envelope was sent to the legacy `/store/` endpoint without a project ID.
2037        //  2. The DSN was moved and the envelope sent to the old project ID.
2038        managed_envelope
2039            .envelope_mut()
2040            .meta_mut()
2041            .set_project_id(project_id);
2042
2043        macro_rules! run {
2044            ($fn_name:ident $(, $args:expr)*) => {
2045                async {
2046                    let mut managed_envelope = (managed_envelope, group).try_into()?;
2047                    match self.$fn_name(&mut managed_envelope, $($args),*).await {
2048                        Ok(extracted_metrics) => Ok(ProcessingResult::Envelope {
2049                            managed_envelope: managed_envelope.into_processed(),
2050                            extracted_metrics: extracted_metrics.map_or(ProcessingExtractedMetrics::new(), |e| e)
2051                        }),
2052                        Err(error) => {
2053                            relay_log::trace!("Executing {fn} failed: {error}", fn = stringify!($fn_name), error = error);
2054                            if let Some(outcome) = error.to_outcome() {
2055                                managed_envelope.reject(outcome);
2056                            }
2057
2058                            return Err(error);
2059                        }
2060                    }
2061                }.await
2062            };
2063        }
2064
2065        relay_log::trace!("Processing {group} group", group = group.variant());
2066
2067        match group {
2068            ProcessingGroup::Error => run!(process_errors, project_id, ctx),
2069            ProcessingGroup::Transaction => {
2070                run!(
2071                    process_transactions,
2072                    cogs,
2073                    project_id,
2074                    ctx,
2075                    reservoir_counters
2076                )
2077            }
2078            ProcessingGroup::Session => {
2079                self.process_with_processor(&self.inner.processing.sessions, managed_envelope, ctx)
2080                    .await
2081            }
2082            ProcessingGroup::Standalone => run!(process_standalone, project_id, ctx),
2083            ProcessingGroup::ClientReport => run!(process_client_reports, ctx),
2084            ProcessingGroup::Replay => {
2085                run!(process_replays, ctx)
2086            }
2087            ProcessingGroup::CheckIn => {
2088                self.process_with_processor(&self.inner.processing.check_ins, managed_envelope, ctx)
2089                    .await
2090            }
2091            ProcessingGroup::Nel => self.process_nel(managed_envelope, ctx).await,
2092            ProcessingGroup::Log => {
2093                self.process_with_processor(&self.inner.processing.logs, managed_envelope, ctx)
2094                    .await
2095            }
2096            ProcessingGroup::TraceMetric => {
2097                self.process_with_processor(
2098                    &self.inner.processing.trace_metrics,
2099                    managed_envelope,
2100                    ctx,
2101                )
2102                .await
2103            }
2104            ProcessingGroup::SpanV2 => {
2105                self.process_with_processor(&self.inner.processing.spans, managed_envelope, ctx)
2106                    .await
2107            }
2108            ProcessingGroup::Span => run!(
2109                process_standalone_spans,
2110                project_id,
2111                ctx,
2112                reservoir_counters
2113            ),
2114            ProcessingGroup::ProfileChunk => {
2115                run!(process_profile_chunks, ctx)
2116            }
2117            // Currently is not used.
2118            ProcessingGroup::Metrics => {
2119                // In proxy mode we simply forward the metrics.
2120                // This group shouldn't be used outside of proxy mode.
2121                if self.inner.config.relay_mode() != RelayMode::Proxy {
2122                    relay_log::error!(
2123                        tags.project = %project_id,
2124                        items = ?managed_envelope.envelope().items().next().map(Item::ty),
2125                        "received metrics in the process_state"
2126                    );
2127                }
2128
2129                Ok(ProcessingResult::no_metrics(
2130                    managed_envelope.into_processed(),
2131                ))
2132            }
2133            // Fallback to the legacy process_state implementation for Ungrouped events.
2134            ProcessingGroup::Ungrouped => {
2135                relay_log::error!(
2136                    tags.project = %project_id,
2137                    items = ?managed_envelope.envelope().items().next().map(Item::ty),
2138                    "could not identify the processing group based on the envelope's items"
2139                );
2140
2141                Ok(ProcessingResult::no_metrics(
2142                    managed_envelope.into_processed(),
2143                ))
2144            }
2145            // Leave this group unchanged.
2146            //
2147            // This will later be forwarded to upstream.
2148            ProcessingGroup::ForwardUnknown => Ok(ProcessingResult::no_metrics(
2149                managed_envelope.into_processed(),
2150            )),
2151        }
2152    }
2153
2154    /// Processes the envelope and returns the processed envelope back.
2155    ///
2156    /// Returns `Some` if the envelope passed inbound filtering and rate limiting. Invalid items are
2157    /// removed from the envelope. Otherwise, if the envelope is empty or the entire envelope needs
2158    /// to be dropped, this is `None`.
2159    async fn process<'a>(
2160        &self,
2161        cogs: &mut Token,
2162        mut message: ProcessEnvelopeGrouped<'a>,
2163    ) -> Result<Option<Submit<'a>>, ProcessingError> {
2164        let ProcessEnvelopeGrouped {
2165            ref mut envelope,
2166            ctx,
2167            ..
2168        } = message;
2169
2170        // Prefer the project's project ID, and fall back to the stated project id from the
2171        // envelope. The project ID is available in all modes, other than in proxy mode, where
2172        // envelopes for unknown projects are forwarded blindly.
2173        //
2174        // Neither ID can be available in proxy mode on the /store/ endpoint. This is not supported,
2175        // since we cannot process an envelope without project ID, so drop it.
2176        let Some(project_id) = ctx
2177            .project_info
2178            .project_id
2179            .or_else(|| envelope.envelope().meta().project_id())
2180        else {
2181            envelope.reject(Outcome::Invalid(DiscardReason::Internal));
2182            return Err(ProcessingError::MissingProjectId);
2183        };
2184
2185        let client = envelope.envelope().meta().client().map(str::to_owned);
2186        let user_agent = envelope.envelope().meta().user_agent().map(str::to_owned);
2187        let project_key = envelope.envelope().meta().public_key();
2188        // Only allow sending to the sampling key, if we successfully loaded a sampling project
2189        // info relating to it. This filters out unknown/invalid project keys as well as project
2190        // keys from different organizations.
2191        let sampling_key = envelope
2192            .envelope()
2193            .sampling_key()
2194            .filter(|_| ctx.sampling_project_info.is_some());
2195
2196        // We set additional information on the scope, which will be removed after processing the
2197        // envelope.
2198        relay_log::configure_scope(|scope| {
2199            scope.set_tag("project", project_id);
2200            if let Some(client) = client {
2201                scope.set_tag("sdk", client);
2202            }
2203            if let Some(user_agent) = user_agent {
2204                scope.set_extra("user_agent", user_agent.into());
2205            }
2206        });
2207
2208        let result = match self.process_envelope(cogs, project_id, message).await {
2209            Ok(ProcessingResult::Envelope {
2210                mut managed_envelope,
2211                extracted_metrics,
2212            }) => {
2213                // The envelope could be modified or even emptied during processing, which
2214                // requires re-computation of the context.
2215                managed_envelope.update();
2216
2217                let has_metrics = !extracted_metrics.metrics.project_metrics.is_empty();
2218                send_metrics(
2219                    extracted_metrics.metrics,
2220                    project_key,
2221                    sampling_key,
2222                    &self.inner.addrs.aggregator,
2223                );
2224
2225                let envelope_response = if managed_envelope.envelope().is_empty() {
2226                    if !has_metrics {
2227                        // Individual rate limits have already been issued
2228                        managed_envelope.reject(Outcome::RateLimited(None));
2229                    } else {
2230                        managed_envelope.accept();
2231                    }
2232
2233                    None
2234                } else {
2235                    Some(managed_envelope)
2236                };
2237
2238                Ok(envelope_response.map(Submit::Envelope))
2239            }
2240            Ok(ProcessingResult::Output(Output { main, metrics })) => {
2241                if let Some(metrics) = metrics {
2242                    metrics.accept(|metrics| {
2243                        send_metrics(
2244                            metrics,
2245                            project_key,
2246                            sampling_key,
2247                            &self.inner.addrs.aggregator,
2248                        );
2249                    });
2250                }
2251
2252                let ctx = ctx.to_forward();
2253                Ok(main.map(|output| Submit::Output { output, ctx }))
2254            }
2255            Err(err) => Err(err),
2256        };
2257
2258        relay_log::configure_scope(|scope| {
2259            scope.remove_tag("project");
2260            scope.remove_tag("sdk");
2261            scope.remove_tag("user_agent");
2262        });
2263
2264        result
2265    }
2266
    /// Splits an incoming envelope by processing group and processes each
    /// sub-envelope independently.
    ///
    /// Each group gets its own COGS measurement and its own processing
    /// context. Successful results are submitted upstream; failures are
    /// logged, at error level only when the error is unexpected.
    async fn handle_process_envelope(&self, cogs: &mut Token, message: ProcessEnvelope) {
        let project_key = message.envelope.envelope().meta().public_key();
        let wait_time = message.envelope.age();
        metric!(timer(RelayTimers::EnvelopeWaitTime) = wait_time);

        // This COGS handling may need an overhaul in the future:
        // Cancel the passed in token, to start individual measurements per envelope instead.
        cogs.cancel();

        // Remember the scoping before the envelope is consumed by the split,
        // so it can be re-applied to every sub-envelope.
        let scoping = message.envelope.scoping();
        for (group, envelope) in ProcessingGroup::split_envelope(
            *message.envelope.into_envelope(),
            &message.project_info,
        ) {
            // Per-group COGS measurement, replacing the cancelled token above.
            let mut cogs = self
                .inner
                .cogs
                .timed(ResourceId::Relay, AppFeature::from(group));

            let mut envelope =
                ManagedEnvelope::new(envelope, self.inner.addrs.outcome_aggregator.clone());
            envelope.scope(scoping);

            let global_config = self.inner.global_config.current();

            let ctx = processing::Context {
                config: &self.inner.config,
                global_config: &global_config,
                project_info: &message.project_info,
                sampling_project_info: message.sampling_project_info.as_deref(),
                rate_limits: &message.rate_limits,
            };

            let message = ProcessEnvelopeGrouped {
                group,
                envelope,
                ctx,
                reservoir_counters: &message.reservoir_counters,
            };

            // Time the actual processing, tagged by group.
            let result = metric!(
                timer(RelayTimers::EnvelopeProcessingTime),
                group = group.variant(),
                { self.process(&mut cogs, message).await }
            );

            match result {
                Ok(Some(envelope)) => self.submit_upstream(&mut cogs, envelope),
                Ok(None) => {}
                // Unexpected errors are surfaced at error level, everything
                // else is only visible in debug logs.
                Err(error) if error.is_unexpected() => {
                    relay_log::error!(
                        tags.project_key = %project_key,
                        error = &error as &dyn Error,
                        "error processing envelope"
                    )
                }
                Err(error) => {
                    relay_log::debug!(
                        tags.project_key = %project_key,
                        error = &error as &dyn Error,
                        "error processing envelope"
                    )
                }
            }
        }
    }
2333
    /// Normalizes, filters, and clock-drift-corrects metric buckets, then
    /// merges them into the aggregator.
    ///
    /// Buckets that fail normalization or have an invalid namespace are
    /// dropped. If the project state is already cached, buckets are also
    /// checked against filters and rate limits before aggregation.
    fn handle_process_metrics(&self, cogs: &mut Token, message: ProcessMetrics) {
        let ProcessMetrics {
            data,
            project_key,
            received_at,
            sent_at,
            source,
        } = message;

        // Fall back to "now" if the received timestamp cannot be represented
        // as a unix timestamp.
        let received_timestamp =
            UnixTimestamp::from_datetime(received_at).unwrap_or(UnixTimestamp::now());

        let mut buckets = data.into_buckets(received_timestamp);
        if buckets.is_empty() {
            return;
        };
        // Attribute COGS cost by the total size of the parsed buckets.
        cogs.update(relay_metrics::cogs::BySize(&buckets));

        let clock_drift_processor =
            ClockDriftProcessor::new(sent_at, received_at).at_least(MINIMUM_CLOCK_DRIFT);

        // Validate and fix up each bucket in place; returning `false` drops it.
        buckets.retain_mut(|bucket| {
            if let Err(error) = relay_metrics::normalize_bucket(bucket) {
                relay_log::debug!(error = &error as &dyn Error, "dropping bucket {bucket:?}");
                return false;
            }

            if !self::metrics::is_valid_namespace(bucket) {
                return false;
            }

            clock_drift_processor.process_timestamp(&mut bucket.timestamp);

            // Buckets from non-internal sources get fresh metadata stamped
            // with the receive time.
            if !matches!(source, BucketSource::Internal) {
                bucket.metadata = BucketMetadata::new(received_timestamp);
            }

            true
        });

        let project = self.inner.project_cache.get(project_key);

        // Best effort check to filter and rate limit buckets, if there is no project state
        // available at the current time, we will check again after flushing.
        let buckets = match project.state() {
            ProjectState::Enabled(project_info) => {
                let rate_limits = project.rate_limits().current_limits();
                self.check_buckets(project_key, project_info, &rate_limits, buckets)
            }
            _ => buckets,
        };

        relay_log::trace!("merging metric buckets into the aggregator");
        self.inner
            .addrs
            .aggregator
            .send(MergeBuckets::new(project_key, buckets));
    }
2392
2393    fn handle_process_batched_metrics(&self, cogs: &mut Token, message: ProcessBatchedMetrics) {
2394        let ProcessBatchedMetrics {
2395            payload,
2396            source,
2397            received_at,
2398            sent_at,
2399        } = message;
2400
2401        #[derive(serde::Deserialize)]
2402        struct Wrapper {
2403            buckets: HashMap<ProjectKey, Vec<Bucket>>,
2404        }
2405
2406        let buckets = match serde_json::from_slice(&payload) {
2407            Ok(Wrapper { buckets }) => buckets,
2408            Err(error) => {
2409                relay_log::debug!(
2410                    error = &error as &dyn Error,
2411                    "failed to parse batched metrics",
2412                );
2413                metric!(counter(RelayCounters::MetricBucketsParsingFailed) += 1);
2414                return;
2415            }
2416        };
2417
2418        for (project_key, buckets) in buckets {
2419            self.handle_process_metrics(
2420                cogs,
2421                ProcessMetrics {
2422                    data: MetricData::Parsed(buckets),
2423                    project_key,
2424                    source,
2425                    received_at,
2426                    sent_at,
2427                },
2428            )
2429        }
2430    }
2431
    /// Submits the given payload to the next stage.
    ///
    /// In processing Relays with a configured store forwarder, the payload is
    /// handed to the store service directly. Otherwise, it is serialized into
    /// an envelope, encoded with the configured HTTP encoding, and sent to the
    /// upstream via the upstream relay service. Empty envelopes are accepted
    /// without being sent.
    fn submit_upstream(&self, cogs: &mut Token, submit: Submit<'_>) {
        // Attribute time spent from here on to the "submit" COGS category.
        let _submit = cogs.start_category("submit");

        #[cfg(feature = "processing")]
        if self.inner.config.processing_enabled()
            && let Some(store_forwarder) = &self.inner.addrs.store_forwarder
        {
            match submit {
                Submit::Envelope(envelope) => store_forwarder.send(StoreEnvelope { envelope }),
                Submit::Output { output, ctx } => output
                    .forward_store(store_forwarder, ctx)
                    .unwrap_or_else(|err| err.into_inner()),
            }
            return;
        }

        let mut envelope = match submit {
            Submit::Envelope(envelope) => envelope,
            Submit::Output { output, ctx } => match output.serialize_envelope(ctx) {
                Ok(envelope) => ManagedEnvelope::from(envelope).into_processed(),
                Err(_) => {
                    relay_log::error!("failed to serialize output to an envelope");
                    return;
                }
            },
        };

        // Nothing left to send: mark the envelope as accepted and stop.
        if envelope.envelope_mut().is_empty() {
            envelope.accept();
            return;
        }

        // Override the `sent_at` timestamp. Since the envelope went through basic
        // normalization, all timestamps have been corrected. We propagate the new
        // `sent_at` to allow the next Relay to double-check this timestamp and
        // potentially apply correction again. This is done as close to sending as
        // possible so that we avoid internal delays.
        envelope.envelope_mut().set_sent_at(Utc::now());

        relay_log::trace!("sending envelope to sentry endpoint");
        let http_encoding = self.inner.config.http_encoding();
        let result = envelope.envelope().to_vec().and_then(|v| {
            encode_payload(&v.into(), http_encoding).map_err(EnvelopeError::PayloadIoFailed)
        });

        match result {
            Ok(body) => {
                self.inner
                    .addrs
                    .upstream_relay
                    .send(SendRequest(SendEnvelope {
                        envelope,
                        body,
                        http_encoding,
                        project_cache: self.inner.project_cache.clone(),
                    }));
            }
            Err(error) => {
                // Errors are only logged for what we consider an internal discard reason. These
                // indicate errors in the infrastructure or implementation bugs.
                relay_log::error!(
                    error = &error as &dyn Error,
                    tags.project_key = %envelope.scoping().project_key,
                    "failed to serialize envelope payload"
                );

                envelope.reject(Outcome::Invalid(DiscardReason::Internal));
            }
        }
    }
2502
2503    fn handle_submit_client_reports(&self, cogs: &mut Token, message: SubmitClientReports) {
2504        let SubmitClientReports {
2505            client_reports,
2506            scoping,
2507        } = message;
2508
2509        let upstream = self.inner.config.upstream_descriptor();
2510        let dsn = PartialDsn::outbound(&scoping, upstream);
2511
2512        let mut envelope = Envelope::from_request(None, RequestMeta::outbound(dsn));
2513        for client_report in client_reports {
2514            match client_report.serialize() {
2515                Ok(payload) => {
2516                    let mut item = Item::new(ItemType::ClientReport);
2517                    item.set_payload(ContentType::Json, payload);
2518                    envelope.add_item(item);
2519                }
2520                Err(error) => {
2521                    relay_log::error!(
2522                        error = &error as &dyn std::error::Error,
2523                        "failed to serialize client report"
2524                    );
2525                }
2526            }
2527        }
2528
2529        let envelope = ManagedEnvelope::new(envelope, self.inner.addrs.outcome_aggregator.clone());
2530        self.submit_upstream(cogs, Submit::Envelope(envelope.into_processed()));
2531    }
2532
2533    fn check_buckets(
2534        &self,
2535        project_key: ProjectKey,
2536        project_info: &ProjectInfo,
2537        rate_limits: &RateLimits,
2538        buckets: Vec<Bucket>,
2539    ) -> Vec<Bucket> {
2540        let Some(scoping) = project_info.scoping(project_key) else {
2541            relay_log::error!(
2542                tags.project_key = project_key.as_str(),
2543                "there is no scoping: dropping {} buckets",
2544                buckets.len(),
2545            );
2546            return Vec::new();
2547        };
2548
2549        let mut buckets = self::metrics::apply_project_info(
2550            buckets,
2551            &self.inner.metric_outcomes,
2552            project_info,
2553            scoping,
2554        );
2555
2556        let namespaces: BTreeSet<MetricNamespace> = buckets
2557            .iter()
2558            .filter_map(|bucket| bucket.name.try_namespace())
2559            .collect();
2560
2561        for namespace in namespaces {
2562            let limits = rate_limits.check_with_quotas(
2563                project_info.get_quotas(),
2564                scoping.item(DataCategory::MetricBucket),
2565            );
2566
2567            if limits.is_limited() {
2568                let rejected;
2569                (buckets, rejected) = utils::split_off(buckets, |bucket| {
2570                    bucket.name.try_namespace() == Some(namespace)
2571                });
2572
2573                let reason_code = limits.longest().and_then(|limit| limit.reason_code.clone());
2574                self.inner.metric_outcomes.track(
2575                    scoping,
2576                    &rejected,
2577                    Outcome::RateLimited(reason_code),
2578                );
2579            }
2580        }
2581
2582        let quotas = project_info.config.quotas.clone();
2583        match MetricsLimiter::create(buckets, quotas, scoping) {
2584            Ok(mut bucket_limiter) => {
2585                bucket_limiter.enforce_limits(rate_limits, &self.inner.metric_outcomes);
2586                bucket_limiter.into_buckets()
2587            }
2588            Err(buckets) => buckets,
2589        }
2590    }
2591
    /// Applies Redis-backed rate limits to the given metric buckets.
    ///
    /// Buckets are counted per metric namespace and each namespace is checked
    /// against the combined global and project quotas. Buckets of limited
    /// namespaces are dropped and tracked as rate-limited outcomes, and the new
    /// limits are merged into the project's cached rate limits. Remaining
    /// buckets are then passed through [`Self::apply_other_rate_limits`].
    #[cfg(feature = "processing")]
    async fn rate_limit_buckets(
        &self,
        scoping: Scoping,
        project_info: &ProjectInfo,
        mut buckets: Vec<Bucket>,
    ) -> Vec<Bucket> {
        // Without a configured rate limiter there is nothing to enforce.
        let Some(rate_limiter) = &self.inner.rate_limiter else {
            return buckets;
        };

        let global_config = self.inner.global_config.current();
        // Count buckets per namespace so each namespace is checked once with
        // its total quantity.
        let namespaces = buckets
            .iter()
            .filter_map(|bucket| bucket.name.try_namespace())
            .counts();

        let quotas = CombinedQuotas::new(&global_config, project_info.get_quotas());

        for (namespace, quantity) in namespaces {
            let item_scoping = scoping.metric_bucket(namespace);

            let limits = match rate_limiter
                .is_rate_limited(quotas, item_scoping, quantity, false)
                .await
            {
                Ok(limits) => limits,
                Err(err) => {
                    // On Redis errors stop checking entirely; rate limiting
                    // here is best-effort and the remaining namespaces pass.
                    relay_log::error!(
                        error = &err as &dyn std::error::Error,
                        "failed to check redis rate limits"
                    );
                    break;
                }
            };

            if limits.is_limited() {
                // Split off all buckets of the limited namespace and track
                // them as rate-limited outcomes.
                let rejected;
                (buckets, rejected) = utils::split_off(buckets, |bucket| {
                    bucket.name.try_namespace() == Some(namespace)
                });

                let reason_code = limits.longest().and_then(|limit| limit.reason_code.clone());
                self.inner.metric_outcomes.track(
                    scoping,
                    &rejected,
                    Outcome::RateLimited(reason_code),
                );

                // Cache the new limits so subsequent payloads are limited
                // without another Redis roundtrip.
                self.inner
                    .project_cache
                    .get(item_scoping.scoping.project_key)
                    .rate_limits()
                    .merge(limits);
            }
        }

        match MetricsLimiter::create(buckets, project_info.config.quotas.clone(), scoping) {
            Err(buckets) => buckets,
            Ok(bucket_limiter) => self.apply_other_rate_limits(bucket_limiter).await,
        }
    }
2654
2655    /// Check and apply rate limits to metrics buckets for transactions and spans.
2656    #[cfg(feature = "processing")]
2657    async fn apply_other_rate_limits(&self, mut bucket_limiter: MetricsLimiter) -> Vec<Bucket> {
2658        relay_log::trace!("handle_rate_limit_buckets");
2659
2660        let scoping = *bucket_limiter.scoping();
2661
2662        if let Some(rate_limiter) = self.inner.rate_limiter.as_ref() {
2663            let global_config = self.inner.global_config.current();
2664            let quotas = CombinedQuotas::new(&global_config, bucket_limiter.quotas());
2665
2666            // We set over_accept_once such that the limit is actually reached, which allows subsequent
2667            // calls with quantity=0 to be rate limited.
2668            let over_accept_once = true;
2669            let mut rate_limits = RateLimits::new();
2670
2671            for category in [DataCategory::Transaction, DataCategory::Span] {
2672                let count = bucket_limiter.count(category);
2673
2674                let timer = Instant::now();
2675                let mut is_limited = false;
2676
2677                if let Some(count) = count {
2678                    match rate_limiter
2679                        .is_rate_limited(quotas, scoping.item(category), count, over_accept_once)
2680                        .await
2681                    {
2682                        Ok(limits) => {
2683                            is_limited = limits.is_limited();
2684                            rate_limits.merge(limits)
2685                        }
2686                        Err(e) => relay_log::error!(error = &e as &dyn Error),
2687                    }
2688                }
2689
2690                relay_statsd::metric!(
2691                    timer(RelayTimers::RateLimitBucketsDuration) = timer.elapsed(),
2692                    category = category.name(),
2693                    limited = if is_limited { "true" } else { "false" },
2694                    count = match count {
2695                        None => "none",
2696                        Some(0) => "0",
2697                        Some(1) => "1",
2698                        Some(1..=10) => "10",
2699                        Some(1..=25) => "25",
2700                        Some(1..=50) => "50",
2701                        Some(51..=100) => "100",
2702                        Some(101..=500) => "500",
2703                        _ => "> 500",
2704                    },
2705                );
2706            }
2707
2708            if rate_limits.is_limited() {
2709                let was_enforced =
2710                    bucket_limiter.enforce_limits(&rate_limits, &self.inner.metric_outcomes);
2711
2712                if was_enforced {
2713                    // Update the rate limits in the project cache.
2714                    self.inner
2715                        .project_cache
2716                        .get(scoping.project_key)
2717                        .rate_limits()
2718                        .merge(rate_limits);
2719                }
2720            }
2721        }
2722
2723        bucket_limiter.into_buckets()
2724    }
2725
2726    /// Cardinality limits the passed buckets and returns a filtered vector of only accepted buckets.
2727    #[cfg(feature = "processing")]
2728    async fn cardinality_limit_buckets(
2729        &self,
2730        scoping: Scoping,
2731        limits: &[CardinalityLimit],
2732        buckets: Vec<Bucket>,
2733    ) -> Vec<Bucket> {
2734        let global_config = self.inner.global_config.current();
2735        let cardinality_limiter_mode = global_config.options.cardinality_limiter_mode;
2736
2737        if matches!(cardinality_limiter_mode, CardinalityLimiterMode::Disabled) {
2738            return buckets;
2739        }
2740
2741        let Some(ref limiter) = self.inner.cardinality_limiter else {
2742            return buckets;
2743        };
2744
2745        let scope = relay_cardinality::Scoping {
2746            organization_id: scoping.organization_id,
2747            project_id: scoping.project_id,
2748        };
2749
2750        let limits = match limiter
2751            .check_cardinality_limits(scope, limits, buckets)
2752            .await
2753        {
2754            Ok(limits) => limits,
2755            Err((buckets, error)) => {
2756                relay_log::error!(
2757                    error = &error as &dyn std::error::Error,
2758                    "cardinality limiter failed"
2759                );
2760                return buckets;
2761            }
2762        };
2763
2764        let error_sample_rate = global_config.options.cardinality_limiter_error_sample_rate;
2765        if !limits.exceeded_limits().is_empty() && utils::sample(error_sample_rate).is_keep() {
2766            for limit in limits.exceeded_limits() {
2767                relay_log::with_scope(
2768                    |scope| {
2769                        // Set the organization as user so we can alert on distinct org_ids.
2770                        scope.set_user(Some(relay_log::sentry::User {
2771                            id: Some(scoping.organization_id.to_string()),
2772                            ..Default::default()
2773                        }));
2774                    },
2775                    || {
2776                        relay_log::error!(
2777                            tags.organization_id = scoping.organization_id.value(),
2778                            tags.limit_id = limit.id,
2779                            tags.passive = limit.passive,
2780                            "Cardinality Limit"
2781                        );
2782                    },
2783                );
2784            }
2785        }
2786
2787        for (limit, reports) in limits.cardinality_reports() {
2788            for report in reports {
2789                self.inner
2790                    .metric_outcomes
2791                    .cardinality(scoping, limit, report);
2792            }
2793        }
2794
2795        if matches!(cardinality_limiter_mode, CardinalityLimiterMode::Passive) {
2796            return limits.into_source();
2797        }
2798
2799        let CardinalityLimitsSplit { accepted, rejected } = limits.into_split();
2800
2801        for (bucket, exceeded) in rejected {
2802            self.inner.metric_outcomes.track(
2803                scoping,
2804                &[bucket],
2805                Outcome::CardinalityLimited(exceeded.id.clone()),
2806            );
2807        }
2808        accepted
2809    }
2810
2811    /// Processes metric buckets and sends them to kafka.
2812    ///
2813    /// This function runs the following steps:
2814    ///  - cardinality limiting
2815    ///  - rate limiting
2816    ///  - submit to `StoreForwarder`
2817    #[cfg(feature = "processing")]
2818    async fn encode_metrics_processing(
2819        &self,
2820        message: FlushBuckets,
2821        store_forwarder: &Addr<Store>,
2822    ) {
2823        use crate::constants::DEFAULT_EVENT_RETENTION;
2824        use crate::services::store::StoreMetrics;
2825
2826        for ProjectBuckets {
2827            buckets,
2828            scoping,
2829            project_info,
2830            ..
2831        } in message.buckets.into_values()
2832        {
2833            let buckets = self
2834                .rate_limit_buckets(scoping, &project_info, buckets)
2835                .await;
2836
2837            let limits = project_info.get_cardinality_limits();
2838            let buckets = self
2839                .cardinality_limit_buckets(scoping, limits, buckets)
2840                .await;
2841
2842            if buckets.is_empty() {
2843                continue;
2844            }
2845
2846            let retention = project_info
2847                .config
2848                .event_retention
2849                .unwrap_or(DEFAULT_EVENT_RETENTION);
2850
2851            // The store forwarder takes care of bucket splitting internally, so we can submit the
2852            // entire list of buckets. There is no batching needed here.
2853            store_forwarder.send(StoreMetrics {
2854                buckets,
2855                scoping,
2856                retention,
2857            });
2858        }
2859    }
2860
2861    /// Serializes metric buckets to JSON and sends them to the upstream.
2862    ///
2863    /// This function runs the following steps:
2864    ///  - partitioning
2865    ///  - batching by configured size limit
2866    ///  - serialize to JSON and pack in an envelope
2867    ///  - submit the envelope to upstream or kafka depending on configuration
2868    ///
2869    /// Cardinality limiting and rate limiting run only in processing Relays as they both require
2870    /// access to the central Redis instance. Cached rate limits are applied in the project cache
2871    /// already.
2872    fn encode_metrics_envelope(&self, cogs: &mut Token, message: FlushBuckets) {
2873        let FlushBuckets {
2874            partition_key,
2875            buckets,
2876        } = message;
2877
2878        let batch_size = self.inner.config.metrics_max_batch_size_bytes();
2879        let upstream = self.inner.config.upstream_descriptor();
2880
2881        for ProjectBuckets {
2882            buckets, scoping, ..
2883        } in buckets.values()
2884        {
2885            let dsn = PartialDsn::outbound(scoping, upstream);
2886
2887            relay_statsd::metric!(
2888                histogram(RelayHistograms::PartitionKeys) = u64::from(partition_key)
2889            );
2890
2891            let mut num_batches = 0;
2892            for batch in BucketsView::from(buckets).by_size(batch_size) {
2893                let mut envelope = Envelope::from_request(None, RequestMeta::outbound(dsn.clone()));
2894
2895                let mut item = Item::new(ItemType::MetricBuckets);
2896                item.set_source_quantities(crate::metrics::extract_quantities(batch));
2897                item.set_payload(ContentType::Json, serde_json::to_vec(&buckets).unwrap());
2898                envelope.add_item(item);
2899
2900                let mut envelope =
2901                    ManagedEnvelope::new(envelope, self.inner.addrs.outcome_aggregator.clone());
2902                envelope
2903                    .set_partition_key(Some(partition_key))
2904                    .scope(*scoping);
2905
2906                relay_statsd::metric!(
2907                    histogram(RelayHistograms::BucketsPerBatch) = batch.len() as u64
2908                );
2909
2910                self.submit_upstream(cogs, Submit::Envelope(envelope.into_processed()));
2911                num_batches += 1;
2912            }
2913
2914            relay_statsd::metric!(histogram(RelayHistograms::BatchesPerPartition) = num_batches);
2915        }
2916    }
2917
2918    /// Creates a [`SendMetricsRequest`] and sends it to the upstream relay.
2919    fn send_global_partition(&self, partition_key: u32, partition: &mut Partition<'_>) {
2920        if partition.is_empty() {
2921            return;
2922        }
2923
2924        let (unencoded, project_info) = partition.take();
2925        let http_encoding = self.inner.config.http_encoding();
2926        let encoded = match encode_payload(&unencoded, http_encoding) {
2927            Ok(payload) => payload,
2928            Err(error) => {
2929                let error = &error as &dyn std::error::Error;
2930                relay_log::error!(error, "failed to encode metrics payload");
2931                return;
2932            }
2933        };
2934
2935        let request = SendMetricsRequest {
2936            partition_key: partition_key.to_string(),
2937            unencoded,
2938            encoded,
2939            project_info,
2940            http_encoding,
2941            metric_outcomes: self.inner.metric_outcomes.clone(),
2942        };
2943
2944        self.inner.addrs.upstream_relay.send(SendRequest(request));
2945    }
2946
2947    /// Serializes metric buckets to JSON and sends them to the upstream via the global endpoint.
2948    ///
2949    /// This function is similar to [`Self::encode_metrics_envelope`], but sends a global batched
2950    /// payload directly instead of per-project Envelopes.
2951    ///
2952    /// This function runs the following steps:
2953    ///  - partitioning
2954    ///  - batching by configured size limit
2955    ///  - serialize to JSON
2956    ///  - submit directly to the upstream
2957    ///
2958    /// Cardinality limiting and rate limiting run only in processing Relays as they both require
2959    /// access to the central Redis instance. Cached rate limits are applied in the project cache
2960    /// already.
2961    fn encode_metrics_global(&self, message: FlushBuckets) {
2962        let FlushBuckets {
2963            partition_key,
2964            buckets,
2965        } = message;
2966
2967        let batch_size = self.inner.config.metrics_max_batch_size_bytes();
2968        let mut partition = Partition::new(batch_size);
2969        let mut partition_splits = 0;
2970
2971        for ProjectBuckets {
2972            buckets, scoping, ..
2973        } in buckets.values()
2974        {
2975            for bucket in buckets {
2976                let mut remaining = Some(BucketView::new(bucket));
2977
2978                while let Some(bucket) = remaining.take() {
2979                    if let Some(next) = partition.insert(bucket, *scoping) {
2980                        // A part of the bucket could not be inserted. Take the partition and submit
2981                        // it immediately. Repeat until the final part was inserted. This should
2982                        // always result in a request, otherwise we would enter an endless loop.
2983                        self.send_global_partition(partition_key, &mut partition);
2984                        remaining = Some(next);
2985                        partition_splits += 1;
2986                    }
2987                }
2988            }
2989        }
2990
2991        if partition_splits > 0 {
2992            metric!(histogram(RelayHistograms::PartitionSplits) = partition_splits);
2993        }
2994
2995        self.send_global_partition(partition_key, &mut partition);
2996    }
2997
2998    async fn handle_flush_buckets(&self, cogs: &mut Token, mut message: FlushBuckets) {
2999        for (project_key, pb) in message.buckets.iter_mut() {
3000            let buckets = std::mem::take(&mut pb.buckets);
3001            pb.buckets =
3002                self.check_buckets(*project_key, &pb.project_info, &pb.rate_limits, buckets);
3003        }
3004
3005        #[cfg(feature = "processing")]
3006        if self.inner.config.processing_enabled()
3007            && let Some(ref store_forwarder) = self.inner.addrs.store_forwarder
3008        {
3009            return self
3010                .encode_metrics_processing(message, store_forwarder)
3011                .await;
3012        }
3013
3014        if self.inner.config.http_global_metrics() {
3015            self.encode_metrics_global(message)
3016        } else {
3017            self.encode_metrics_envelope(cogs, message)
3018        }
3019    }
3020
3021    #[cfg(all(test, feature = "processing"))]
3022    fn redis_rate_limiter_enabled(&self) -> bool {
3023        self.inner.rate_limiter.is_some()
3024    }
3025
    /// Dispatches an [`EnvelopeProcessor`] message to its handler.
    ///
    /// Wraps the handler invocation in a `ProcessMessageDuration` timer tagged
    /// with the message variant, and attributes COGS measurements using the
    /// weights computed by [`Self::feature_weights`].
    async fn handle_message(&self, message: EnvelopeProcessor) {
        let ty = message.variant();
        let feature_weights = self.feature_weights(&message);

        metric!(timer(RelayTimers::ProcessMessageDuration), message = ty, {
            let mut cogs = self.inner.cogs.timed(ResourceId::Relay, feature_weights);

            match message {
                EnvelopeProcessor::ProcessEnvelope(m) => {
                    self.handle_process_envelope(&mut cogs, *m).await
                }
                EnvelopeProcessor::ProcessProjectMetrics(m) => {
                    self.handle_process_metrics(&mut cogs, *m)
                }
                EnvelopeProcessor::ProcessBatchedMetrics(m) => {
                    self.handle_process_batched_metrics(&mut cogs, *m)
                }
                EnvelopeProcessor::FlushBuckets(m) => {
                    self.handle_flush_buckets(&mut cogs, *m).await
                }
                EnvelopeProcessor::SubmitClientReports(m) => {
                    self.handle_submit_client_reports(&mut cogs, *m)
                }
            }
        });
    }
3052
3053    fn feature_weights(&self, message: &EnvelopeProcessor) -> FeatureWeights {
3054        match message {
3055            // Envelope is split later and tokens are attributed then.
3056            EnvelopeProcessor::ProcessEnvelope(_) => AppFeature::Unattributed.into(),
3057            EnvelopeProcessor::ProcessProjectMetrics(_) => AppFeature::Unattributed.into(),
3058            EnvelopeProcessor::ProcessBatchedMetrics(_) => AppFeature::Unattributed.into(),
3059            EnvelopeProcessor::FlushBuckets(v) => v
3060                .buckets
3061                .values()
3062                .map(|s| {
3063                    if self.inner.config.processing_enabled() {
3064                        // Processing does not encode the metrics but instead rate and cardinality
3065                        // limits the metrics, which scales by count and not size.
3066                        relay_metrics::cogs::ByCount(&s.buckets).into()
3067                    } else {
3068                        relay_metrics::cogs::BySize(&s.buckets).into()
3069                    }
3070                })
3071                .fold(FeatureWeights::none(), FeatureWeights::merge),
3072            EnvelopeProcessor::SubmitClientReports(_) => AppFeature::ClientReports.into(),
3073        }
3074    }
3075
    /// Creates a [`ReservoirEvaluator`] over the given reservoir counters.
    ///
    /// With the `processing` feature and a configured quotas client, the
    /// evaluator is additionally connected to Redis (keyed by organization);
    /// otherwise only the local counters are used.
    fn new_reservoir_evaluator(
        &self,
        _organization_id: OrganizationId,
        reservoir_counters: &ReservoirCounters,
    ) -> ReservoirEvaluator<'_> {
        // `mut` is only needed for the `set_redis` call below, which is gated
        // on the `processing` feature.
        #[cfg_attr(not(feature = "processing"), expect(unused_mut))]
        let mut reservoir = ReservoirEvaluator::new(Arc::clone(reservoir_counters));

        #[cfg(feature = "processing")]
        if let Some(quotas_client) = self.inner.quotas_client.as_ref() {
            reservoir.set_redis(_organization_id, quotas_client);
        }

        reservoir
    }
3091}
3092
3093impl Service for EnvelopeProcessorService {
3094    type Interface = EnvelopeProcessor;
3095
3096    async fn run(self, mut rx: relay_system::Receiver<Self::Interface>) {
3097        while let Some(message) = rx.recv().await {
3098            let service = self.clone();
3099            self.inner
3100                .pool
3101                .spawn_async(
3102                    async move {
3103                        service.handle_message(message).await;
3104                    }
3105                    .boxed(),
3106                )
3107                .await;
3108        }
3109    }
3110}
3111
/// Result of the enforcement of rate limiting.
///
/// If the event is already `None` or it's rate limited, it will be `None`
/// within the [`Annotated`].
struct EnforcementResult {
    /// The event remaining after enforcement (possibly empty).
    event: Annotated<Event>,
    /// Rate limits computed during enforcement; only read with the
    /// `processing` feature enabled.
    #[cfg_attr(not(feature = "processing"), expect(dead_code))]
    rate_limits: RateLimits,
}

impl EnforcementResult {
    /// Creates a new [`EnforcementResult`] from an event and its rate limits.
    pub fn new(event: Annotated<Event>, rate_limits: RateLimits) -> Self {
        Self { event, rate_limits }
    }
}
3128
/// Strategy for checking rate limits during envelope processing.
#[derive(Clone)]
enum RateLimiter {
    /// Checks only the cached rate limits from the processing context.
    Cached,
    /// Checks limits consistently against the shared Redis-backed limiter.
    #[cfg(feature = "processing")]
    Consistent(Arc<RedisRateLimiter<GlobalRateLimitsServiceHandle>>),
}
3135
impl RateLimiter {
    /// Enforces rate limits on the envelope and the extracted event.
    ///
    /// Computes applicable limits from the combined global and project quotas
    /// and applies them to the envelope items as well as the event, which at
    /// this stage has already been removed from the envelope. Outcomes for
    /// rejected items are recorded on the managed envelope.
    ///
    /// Returns the remaining event (empty if the event itself was limited) and
    /// the rate limits that applied. With the `Consistent` variant, limits are
    /// checked against Redis; otherwise only cached limits from `ctx` are used.
    async fn enforce<Group>(
        &self,
        managed_envelope: &mut TypedEnvelope<Group>,
        event: Annotated<Event>,
        _extracted_metrics: &mut ProcessingExtractedMetrics,
        ctx: processing::Context<'_>,
    ) -> Result<EnforcementResult, ProcessingError> {
        // Nothing to limit: neither envelope items nor an event are present.
        if managed_envelope.envelope().is_empty() && event.value().is_none() {
            return Ok(EnforcementResult::new(event, RateLimits::default()));
        }

        let quotas = CombinedQuotas::new(ctx.global_config, ctx.project_info.get_quotas());
        if quotas.is_empty() {
            return Ok(EnforcementResult::new(event, RateLimits::default()));
        }

        let event_category = event_category(&event);

        // We extract the rate limiters, in case we perform consistent rate limiting, since we will
        // need Redis access.
        //
        // When invoking the rate limiter, capture if the event item has been rate limited to also
        // remove it from the processing state eventually.
        let this = self.clone();
        let mut envelope_limiter =
            EnvelopeLimiter::new(CheckLimits::All, move |item_scope, _quantity| {
                let this = this.clone();

                async move {
                    match this {
                        #[cfg(feature = "processing")]
                        RateLimiter::Consistent(rate_limiter) => Ok::<_, ProcessingError>(
                            rate_limiter
                                .is_rate_limited(quotas, item_scope, _quantity, false)
                                .await?,
                        ),
                        _ => Ok::<_, ProcessingError>(
                            ctx.rate_limits.check_with_quotas(quotas, item_scope),
                        ),
                    }
                }
            });

        // Tell the envelope limiter about the event, since it has been removed from the Envelope at
        // this stage in processing.
        if let Some(category) = event_category {
            envelope_limiter.assume_event(category);
        }

        let scoping = managed_envelope.scoping();
        let (enforcement, rate_limits) =
            metric!(timer(RelayTimers::EventProcessingRateLimiting), {
                envelope_limiter
                    .compute(managed_envelope.envelope_mut(), &scoping)
                    .await
            })?;
        let event_active = enforcement.is_event_active();

        // Use the same rate limits as used for the envelope on the metrics.
        // Those rate limits should not be checked for expiry or similar to ensure a consistent
        // limiting of envelope items and metrics.
        #[cfg(feature = "processing")]
        _extracted_metrics.apply_enforcement(&enforcement, matches!(self, Self::Consistent(_)));
        enforcement.apply_with_outcomes(managed_envelope);

        if event_active {
            debug_assert!(managed_envelope.envelope().is_empty());
            return Ok(EnforcementResult::new(Annotated::empty(), rate_limits));
        }

        Ok(EnforcementResult::new(event, rate_limits))
    }
}
3210
3211pub fn encode_payload(body: &Bytes, http_encoding: HttpEncoding) -> Result<Bytes, std::io::Error> {
3212    let envelope_body: Vec<u8> = match http_encoding {
3213        HttpEncoding::Identity => return Ok(body.clone()),
3214        HttpEncoding::Deflate => {
3215            let mut encoder = ZlibEncoder::new(Vec::new(), Compression::default());
3216            encoder.write_all(body.as_ref())?;
3217            encoder.finish()?
3218        }
3219        HttpEncoding::Gzip => {
3220            let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
3221            encoder.write_all(body.as_ref())?;
3222            encoder.finish()?
3223        }
3224        HttpEncoding::Br => {
3225            // Use default buffer size (via 0), medium quality (5), and the default lgwin (22).
3226            let mut encoder = BrotliEncoder::new(Vec::new(), 0, 5, 22);
3227            encoder.write_all(body.as_ref())?;
3228            encoder.into_inner()
3229        }
3230        HttpEncoding::Zstd => {
3231            // Use the fastest compression level, our main objective here is to get the best
3232            // compression ratio for least amount of time spent.
3233            let mut encoder = ZstdEncoder::new(Vec::new(), 1)?;
3234            encoder.write_all(body.as_ref())?;
3235            encoder.finish()?
3236        }
3237    };
3238
3239    Ok(envelope_body.into())
3240}
3241
/// An upstream request that submits an envelope via HTTP.
#[derive(Debug)]
pub struct SendEnvelope {
    /// The fully processed envelope to submit.
    pub envelope: TypedEnvelope<Processed>,
    /// The serialized payload sent as the HTTP request body.
    pub body: Bytes,
    /// Content encoding declared for (and applied to) the request body.
    pub http_encoding: HttpEncoding,
    /// Project cache handle, used to merge rate limits from 429 responses.
    pub project_cache: ProjectCacheHandle,
}
3250
impl UpstreamRequest for SendEnvelope {
    fn method(&self) -> reqwest::Method {
        reqwest::Method::POST
    }

    /// Returns the project-scoped envelope store endpoint.
    fn path(&self) -> Cow<'_, str> {
        format!("/api/{}/envelope/", self.envelope.scoping().project_id).into()
    }

    fn route(&self) -> &'static str {
        "envelope"
    }

    /// Builds the HTTP request, forwarding the original request metadata as headers.
    fn build(&mut self, builder: &mut http::RequestBuilder) -> Result<(), http::HttpError> {
        let envelope_body = self.body.clone();
        metric!(histogram(RelayHistograms::UpstreamEnvelopeBodySize) = envelope_body.len() as u64);

        let meta = &self.envelope.meta();
        let shard = self.envelope.partition_key().map(|p| p.to_string());
        builder
            .content_encoding(self.http_encoding)
            .header_opt("Origin", meta.origin().map(|url| url.as_str()))
            .header_opt("User-Agent", meta.user_agent())
            .header("X-Sentry-Auth", meta.auth_header())
            .header("X-Forwarded-For", meta.forwarded_for())
            .header("Content-Type", envelope::CONTENT_TYPE)
            .header_opt("X-Sentry-Relay-Shard", shard)
            .body(envelope_body);

        Ok(())
    }

    fn sign(&mut self) -> Option<Sign> {
        Some(Sign::Optional(SignatureType::RequestSign))
    }

    /// Handles the upstream response and resolves the contained envelope.
    ///
    /// The envelope is accepted when the upstream received the request, even on error
    /// responses; rate limits from a 429 are merged into the project cache. If the
    /// request never reached the upstream, the envelope is rejected with an internal
    /// discard reason.
    fn respond(
        self: Box<Self>,
        result: Result<http::Response, UpstreamRequestError>,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> {
        Box::pin(async move {
            let result = match result {
                Ok(mut response) => response.consume().await.map_err(UpstreamRequestError::Http),
                Err(error) => Err(error),
            };

            match result {
                Ok(()) => self.envelope.accept(),
                Err(error) if error.is_received() => {
                    // The upstream received the request; outcomes are its responsibility.
                    let scoping = self.envelope.scoping();
                    self.envelope.accept();

                    if let UpstreamRequestError::RateLimited(limits) = error {
                        self.project_cache
                            .get(scoping.project_key)
                            .rate_limits()
                            .merge(limits.scope(&scoping));
                    }
                }
                Err(error) => {
                    // Errors are only logged for what we consider an internal discard reason. These
                    // indicate errors in the infrastructure or implementation bugs.
                    let mut envelope = self.envelope;
                    envelope.reject(Outcome::Invalid(DiscardReason::Internal));
                    relay_log::error!(
                        error = &error as &dyn Error,
                        tags.project_key = %envelope.scoping().project_key,
                        "error sending envelope"
                    );
                }
            }
        })
    }
}
3325
/// A container for metric buckets from multiple projects.
///
/// This container is used to send metrics to the upstream in global batches as part of the
/// [`FlushBuckets`] message if the `http.global_metrics` option is enabled. The container monitors
/// the size of all metrics and allows to split them into multiple batches. See
/// [`insert`](Self::insert) for more information.
#[derive(Debug)]
struct Partition<'a> {
    /// Maximum estimated size of this partition in bytes.
    max_size: usize,
    /// Remaining capacity in bytes before the partition is full.
    remaining: usize,
    /// Bucket views grouped by their project key.
    views: HashMap<ProjectKey, Vec<BucketView<'a>>>,
    /// Scoping of every project contained in `views`, used for outcome tracking.
    project_info: HashMap<ProjectKey, Scoping>,
}
3339
3340impl<'a> Partition<'a> {
3341    /// Creates a new partition with the given maximum size in bytes.
3342    pub fn new(size: usize) -> Self {
3343        Self {
3344            max_size: size,
3345            remaining: size,
3346            views: HashMap::new(),
3347            project_info: HashMap::new(),
3348        }
3349    }
3350
3351    /// Inserts a bucket into the partition, splitting it if necessary.
3352    ///
3353    /// This function attempts to add the bucket to this partition. If the bucket does not fit
3354    /// entirely into the partition given its maximum size, the remaining part of the bucket is
3355    /// returned from this function call.
3356    ///
3357    /// If this function returns `Some(_)`, the partition is full and should be submitted to the
3358    /// upstream immediately. Use [`Self::take`] to retrieve the contents of the
3359    /// partition. Afterwards, the caller is responsible to call this function again with the
3360    /// remaining bucket until it is fully inserted.
3361    pub fn insert(&mut self, bucket: BucketView<'a>, scoping: Scoping) -> Option<BucketView<'a>> {
3362        let (current, next) = bucket.split(self.remaining, Some(self.max_size));
3363
3364        if let Some(current) = current {
3365            self.remaining = self.remaining.saturating_sub(current.estimated_size());
3366            self.views
3367                .entry(scoping.project_key)
3368                .or_default()
3369                .push(current);
3370
3371            self.project_info
3372                .entry(scoping.project_key)
3373                .or_insert(scoping);
3374        }
3375
3376        next
3377    }
3378
3379    /// Returns `true` if the partition does not hold any data.
3380    fn is_empty(&self) -> bool {
3381        self.views.is_empty()
3382    }
3383
3384    /// Returns the serialized buckets for this partition.
3385    ///
3386    /// This empties the partition, so that it can be reused.
3387    fn take(&mut self) -> (Bytes, HashMap<ProjectKey, Scoping>) {
3388        #[derive(serde::Serialize)]
3389        struct Wrapper<'a> {
3390            buckets: &'a HashMap<ProjectKey, Vec<BucketView<'a>>>,
3391        }
3392
3393        let buckets = &self.views;
3394        let payload = serde_json::to_vec(&Wrapper { buckets }).unwrap().into();
3395
3396        let scopings = self.project_info.clone();
3397        self.project_info.clear();
3398
3399        self.views.clear();
3400        self.remaining = self.max_size;
3401
3402        (payload, scopings)
3403    }
3404}
3405
/// An upstream request that submits metric buckets via HTTP.
///
/// This request is not awaited. It automatically tracks outcomes if the request is not received.
#[derive(Debug)]
struct SendMetricsRequest {
    /// Partition key, sent as the `X-Sentry-Relay-Shard` header.
    partition_key: String,
    /// Serialized metric buckets without encoding applied, used for signing.
    unencoded: Bytes,
    /// Serialized metric buckets with the stated HTTP encoding applied.
    encoded: Bytes,
    /// Mapping of all contained project keys to their scoping and extraction mode.
    ///
    /// Used to track outcomes for transmission failures.
    project_info: HashMap<ProjectKey, Scoping>,
    /// Encoding (compression) of the payload.
    http_encoding: HttpEncoding,
    /// Metric outcomes instance to send outcomes on error.
    metric_outcomes: MetricOutcomes,
}
3426
3427impl SendMetricsRequest {
3428    fn create_error_outcomes(self) {
3429        #[derive(serde::Deserialize)]
3430        struct Wrapper {
3431            buckets: HashMap<ProjectKey, Vec<MinimalTrackableBucket>>,
3432        }
3433
3434        let buckets = match serde_json::from_slice(&self.unencoded) {
3435            Ok(Wrapper { buckets }) => buckets,
3436            Err(err) => {
3437                relay_log::error!(
3438                    error = &err as &dyn std::error::Error,
3439                    "failed to parse buckets from failed transmission"
3440                );
3441                return;
3442            }
3443        };
3444
3445        for (key, buckets) in buckets {
3446            let Some(&scoping) = self.project_info.get(&key) else {
3447                relay_log::error!("missing scoping for project key");
3448                continue;
3449            };
3450
3451            self.metric_outcomes.track(
3452                scoping,
3453                &buckets,
3454                Outcome::Invalid(DiscardReason::Internal),
3455            );
3456        }
3457    }
3458}
3459
impl UpstreamRequest for SendMetricsRequest {
    fn set_relay_id(&self) -> bool {
        true
    }

    /// Signs the request over the unencoded payload.
    fn sign(&mut self) -> Option<Sign> {
        Some(Sign::Required(SignatureType::Body(self.unencoded.clone())))
    }

    fn method(&self) -> reqwest::Method {
        reqwest::Method::POST
    }

    fn path(&self) -> Cow<'_, str> {
        "/api/0/relays/metrics/".into()
    }

    fn route(&self) -> &'static str {
        "global_metrics"
    }

    /// Builds the HTTP request with the encoded payload and shard header.
    fn build(&mut self, builder: &mut http::RequestBuilder) -> Result<(), http::HttpError> {
        metric!(histogram(RelayHistograms::UpstreamMetricsBodySize) = self.encoded.len() as u64);

        builder
            .content_encoding(self.http_encoding)
            .header("X-Sentry-Relay-Shard", self.partition_key.as_bytes())
            .header(header::CONTENT_TYPE, b"application/json")
            .body(self.encoded.clone());

        Ok(())
    }

    /// Handles the upstream response, tracking outcomes on transmission failure.
    fn respond(
        self: Box<Self>,
        result: Result<http::Response, UpstreamRequestError>,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> {
        Box::pin(async {
            match result {
                Ok(mut response) => {
                    response.consume().await.ok();
                }
                Err(error) => {
                    relay_log::error!(error = &error as &dyn Error, "Failed to send metrics batch");

                    // If the request did not arrive at the upstream, we are responsible for outcomes.
                    // Otherwise, the upstream is responsible to log outcomes.
                    if error.is_received() {
                        return;
                    }

                    self.create_error_outcomes()
                }
            }
        })
    }
}
3517
/// Container for global and project level [`Quota`].
#[derive(Copy, Clone, Debug)]
struct CombinedQuotas<'a> {
    /// Quotas from the global config, applying to all projects.
    global_quotas: &'a [Quota],
    /// Quotas configured for the individual project.
    project_quotas: &'a [Quota],
}
3524
3525impl<'a> CombinedQuotas<'a> {
3526    /// Returns a new [`CombinedQuotas`].
3527    pub fn new(global_config: &'a GlobalConfig, project_quotas: &'a [Quota]) -> Self {
3528        Self {
3529            global_quotas: &global_config.quotas,
3530            project_quotas,
3531        }
3532    }
3533
3534    /// Returns `true` if both global quotas and project quotas are empty.
3535    pub fn is_empty(&self) -> bool {
3536        self.len() == 0
3537    }
3538
3539    /// Returns the number of both global and project quotas.
3540    pub fn len(&self) -> usize {
3541        self.global_quotas.len() + self.project_quotas.len()
3542    }
3543}
3544
impl<'a> IntoIterator for CombinedQuotas<'a> {
    type Item = &'a Quota;
    type IntoIter = std::iter::Chain<std::slice::Iter<'a, Quota>, std::slice::Iter<'a, Quota>>;

    /// Iterates all quotas, yielding global quotas before project quotas.
    fn into_iter(self) -> Self::IntoIter {
        self.global_quotas.iter().chain(self.project_quotas.iter())
    }
}
3553
3554#[cfg(test)]
3555mod tests {
3556    use std::collections::BTreeMap;
3557    use std::env;
3558
3559    use insta::assert_debug_snapshot;
3560    use relay_base_schema::metrics::{DurationUnit, MetricUnit};
3561    use relay_common::glob2::LazyGlob;
3562    use relay_dynamic_config::ProjectConfig;
3563    use relay_event_normalization::{
3564        MeasurementsConfig, NormalizationConfig, RedactionRule, TransactionNameConfig,
3565        TransactionNameRule,
3566    };
3567    use relay_event_schema::protocol::TransactionSource;
3568    use relay_pii::DataScrubbingConfig;
3569    use similar_asserts::assert_eq;
3570
3571    use crate::metrics_extraction::IntoMetric;
3572    use crate::metrics_extraction::transactions::types::{
3573        CommonTags, TransactionMeasurementTags, TransactionMetric,
3574    };
3575    use crate::testutils::{create_test_processor, create_test_processor_with_addrs};
3576
3577    #[cfg(feature = "processing")]
3578    use {
3579        relay_metrics::BucketValue,
3580        relay_quotas::{QuotaScope, ReasonCode},
3581        relay_test::mock_service,
3582    };
3583
3584    use super::*;
3585
    /// Creates a reject-all quota (limit 0) for metric buckets on the organization scope.
    #[cfg(feature = "processing")]
    fn mock_quota(id: &str) -> Quota {
        Quota {
            id: Some(id.into()),
            categories: smallvec::smallvec![DataCategory::MetricBucket],
            scope: QuotaScope::Organization,
            scope_id: None,
            limit: Some(0),
            window: None,
            reason_code: None,
            namespace: None,
        }
    }
3599
    /// Verifies that [`CombinedQuotas`] counts both quota sources and iterates global
    /// quotas before project quotas.
    #[cfg(feature = "processing")]
    #[test]
    fn test_dynamic_quotas() {
        let global_config = GlobalConfig {
            quotas: vec![mock_quota("foo"), mock_quota("bar")],
            ..Default::default()
        };

        let project_quotas = vec![mock_quota("baz"), mock_quota("qux")];

        let dynamic_quotas = CombinedQuotas::new(&global_config, &project_quotas);

        assert_eq!(dynamic_quotas.len(), 4);
        assert!(!dynamic_quotas.is_empty());

        // Global quotas must come first in iteration order.
        let quota_ids = dynamic_quotas.into_iter().filter_map(|q| q.id.as_deref());
        assert!(quota_ids.eq(["foo", "bar", "baz", "qux"]));
    }
3618
    /// Ensures that if we ratelimit one batch of buckets in [`FlushBuckets`] message, it won't
    /// also ratelimit the next batches in the same message automatically.
    #[cfg(feature = "processing")]
    #[tokio::test]
    async fn test_ratelimit_per_batch() {
        use relay_base_schema::organization::OrganizationId;
        use relay_protocol::FiniteF64;

        let rate_limited_org = Scoping {
            organization_id: OrganizationId::new(1),
            project_id: ProjectId::new(21),
            project_key: ProjectKey::parse("00000000000000000000000000000000").unwrap(),
            key_id: Some(17),
        };

        let not_rate_limited_org = Scoping {
            organization_id: OrganizationId::new(2),
            project_id: ProjectId::new(21),
            project_key: ProjectKey::parse("11111111111111111111111111111111").unwrap(),
            key_id: Some(17),
        };

        let message = {
            let project_info = {
                // Quota that rejects all metric buckets of the rate limited org.
                let quota = Quota {
                    id: Some("testing".into()),
                    categories: vec![DataCategory::MetricBucket].into(),
                    scope: relay_quotas::QuotaScope::Organization,
                    scope_id: Some(rate_limited_org.organization_id.to_string()),
                    limit: Some(0),
                    window: None,
                    reason_code: Some(ReasonCode::new("test")),
                    namespace: None,
                };

                let mut config = ProjectConfig::default();
                config.quotas.push(quota);

                Arc::new(ProjectInfo {
                    config,
                    ..Default::default()
                })
            };

            // One counter bucket per org, sharing the same project config.
            let project_metrics = |scoping| ProjectBuckets {
                buckets: vec![Bucket {
                    name: "d:transactions/bar".into(),
                    value: BucketValue::Counter(FiniteF64::new(1.0).unwrap()),
                    timestamp: UnixTimestamp::now(),
                    tags: Default::default(),
                    width: 10,
                    metadata: BucketMetadata::default(),
                }],
                rate_limits: Default::default(),
                project_info: project_info.clone(),
                scoping,
            };

            let buckets = hashbrown::HashMap::from([
                (
                    rate_limited_org.project_key,
                    project_metrics(rate_limited_org),
                ),
                (
                    not_rate_limited_org.project_key,
                    project_metrics(not_rate_limited_org),
                ),
            ]);

            FlushBuckets {
                partition_key: 0,
                buckets,
            }
        };

        // ensure the order of the map while iterating is as expected.
        assert_eq!(message.buckets.keys().count(), 2);

        // Processing config with Redis, required for the consistent rate limiter.
        let config = {
            let config_json = serde_json::json!({
                "processing": {
                    "enabled": true,
                    "kafka_config": [],
                    "redis": {
                        "server": std::env::var("RELAY_REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_owned()),
                    }
                }
            });
            Config::from_json_value(config_json).unwrap()
        };

        // Mock store that records the organization id of every metrics message it receives.
        let (store, handle) = {
            let f = |org_ids: &mut Vec<OrganizationId>, msg: Store| {
                let org_id = match msg {
                    Store::Metrics(x) => x.scoping.organization_id,
                    _ => panic!("received envelope when expecting only metrics"),
                };
                org_ids.push(org_id);
            };

            mock_service("store_forwarder", vec![], f)
        };

        let processor = create_test_processor(config).await;
        assert!(processor.redis_rate_limiter_enabled());

        processor.encode_metrics_processing(message, &store).await;

        // Only metrics of the org that is not rate limited may reach the store.
        drop(store);
        let orgs_not_ratelimited = handle.await.unwrap();

        assert_eq!(
            orgs_not_ratelimited,
            vec![not_rate_limited_org.organization_id]
        );
    }
3735
    /// Ensures PII scrubbing masks IP-like data in the `User-Agent` header while browser
    /// name and version are still extracted into the browser context.
    #[tokio::test]
    async fn test_browser_version_extraction_with_pii_like_data() {
        let processor = create_test_processor(Default::default()).await;
        let outcome_aggregator = Addr::dummy();
        let event_id = EventId::new();

        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
            .parse()
            .unwrap();

        let request_meta = RequestMeta::new(dsn);
        let mut envelope = Envelope::from_request(Some(event_id), request_meta);

        // The Chrome version "103.0.0.0" looks like an IP address to the scrubber.
        envelope.add_item({
                let mut item = Item::new(ItemType::Event);
                item.set_payload(
                    ContentType::Json,
                    r#"
                    {
                        "request": {
                            "headers": [
                                ["User-Agent", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36"]
                            ]
                        }
                    }
                "#,
                );
                item
            });

        let mut datascrubbing_settings = DataScrubbingConfig::default();
        // enable all the default scrubbing
        datascrubbing_settings.scrub_data = true;
        datascrubbing_settings.scrub_defaults = true;
        datascrubbing_settings.scrub_ip_addresses = true;

        // Make sure to mask any IP-like looking data
        let pii_config = serde_json::from_str(r#"{"applications": {"**": ["@ip:mask"]}}"#).unwrap();

        let config = ProjectConfig {
            datascrubbing_settings,
            pii_config: Some(pii_config),
            ..Default::default()
        };

        let project_info = ProjectInfo {
            config,
            ..Default::default()
        };

        let mut envelopes = ProcessingGroup::split_envelope(*envelope, &Default::default());
        assert_eq!(envelopes.len(), 1);

        let (group, envelope) = envelopes.pop().unwrap();
        let envelope = ManagedEnvelope::new(envelope, outcome_aggregator);

        let message = ProcessEnvelopeGrouped {
            group,
            envelope,
            ctx: processing::Context {
                project_info: &project_info,
                ..processing::Context::for_test()
            },
            reservoir_counters: &ReservoirCounters::default(),
        };

        let Ok(Some(Submit::Envelope(mut new_envelope))) =
            processor.process(&mut Token::noop(), message).await
        else {
            panic!();
        };
        let new_envelope = new_envelope.envelope_mut();

        let event_item = new_envelope.items().last().unwrap();
        let annotated_event: Annotated<Event> =
            Annotated::from_json_bytes(&event_item.payload()).unwrap();
        let event = annotated_event.into_value().unwrap();
        let headers = event
            .request
            .into_value()
            .unwrap()
            .headers
            .into_value()
            .unwrap();

        // IP-like data must be masked
        assert_eq!(
            Some(
                "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/********* Safari/537.36"
            ),
            headers.get_header("User-Agent")
        );
        // But we still get correct browser and version number
        let contexts = event.contexts.into_value().unwrap();
        let browser = contexts.0.get("browser").unwrap();
        assert_eq!(
            r#"{"browser":"Chrome 103.0.0","name":"Chrome","version":"103.0.0","type":"browser"}"#,
            browser.to_json().unwrap()
        );
    }
3836
    /// Ensures the envelope's dynamic sampling context is materialized into the event's
    /// `_dsc` field during processing.
    #[tokio::test]
    #[cfg(feature = "processing")]
    async fn test_materialize_dsc() {
        use crate::services::projects::project::PublicKeyConfig;

        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
            .parse()
            .unwrap();
        let request_meta = RequestMeta::new(dsn);
        let mut envelope = Envelope::from_request(None, request_meta);

        let dsc = r#"{
            "trace_id": "00000000-0000-0000-0000-000000000000",
            "public_key": "e12d836b15bb49d7bbf99e64295d995b",
            "sample_rate": "0.2"
        }"#;
        envelope.set_dsc(serde_json::from_str(dsc).unwrap());

        let mut item = Item::new(ItemType::Event);
        item.set_payload(ContentType::Json, r#"{}"#);
        envelope.add_item(item);

        let outcome_aggregator = Addr::dummy();
        let managed_envelope = ManagedEnvelope::new(envelope, outcome_aggregator);

        let mut project_info = ProjectInfo::default();
        project_info.public_keys.push(PublicKeyConfig {
            public_key: ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(),
            numeric_id: Some(1),
        });

        let config = serde_json::json!({
            "processing": {
                "enabled": true,
                "kafka_config": [],
            }
        });

        let message = ProcessEnvelopeGrouped {
            group: ProcessingGroup::Transaction,
            envelope: managed_envelope,
            ctx: processing::Context {
                config: &Config::from_json_value(config.clone()).unwrap(),
                project_info: &project_info,
                sampling_project_info: Some(&project_info),
                ..processing::Context::for_test()
            },
            reservoir_counters: &ReservoirCounters::default(),
        };

        let processor = create_test_processor(Config::from_json_value(config).unwrap()).await;
        let Ok(Some(Submit::Envelope(envelope))) =
            processor.process(&mut Token::noop(), message).await
        else {
            panic!();
        };
        let event = envelope
            .envelope()
            .get_item_by(|item| item.ty() == &ItemType::Event)
            .unwrap();

        let event = Annotated::<Event>::from_json_bytes(&event.payload()).unwrap();
        insta::assert_debug_snapshot!(event.value().unwrap()._dsc, @r###"
        Object(
            {
                "environment": ~,
                "public_key": String(
                    "e12d836b15bb49d7bbf99e64295d995b",
                ),
                "release": ~,
                "replay_id": ~,
                "sample_rate": String(
                    "0.2",
                ),
                "trace_id": String(
                    "00000000000000000000000000000000",
                ),
                "transaction": ~,
            },
        )
        "###);
    }
3919
    /// Normalizes a test transaction with the given name and source, returning the statsd
    /// metrics captured during transaction name normalization.
    fn capture_test_event(transaction_name: &str, source: TransactionSource) -> Vec<String> {
        let mut event = Annotated::<Event>::from_json(
            r#"
            {
                "type": "transaction",
                "transaction": "/foo/",
                "timestamp": 946684810.0,
                "start_timestamp": 946684800.0,
                "contexts": {
                    "trace": {
                        "trace_id": "4c79f60c11214eb38604f4ae0781bfb2",
                        "span_id": "fa90fdead5f74053",
                        "op": "http.server",
                        "type": "trace"
                    }
                },
                "transaction_info": {
                    "source": "url"
                }
            }
            "#,
        )
        .unwrap();
        // Override transaction name and source with the test inputs.
        let e = event.value_mut().as_mut().unwrap();
        e.transaction.set_value(Some(transaction_name.into()));

        e.transaction_info
            .value_mut()
            .as_mut()
            .unwrap()
            .source
            .set_value(Some(source));

        relay_statsd::with_capturing_test_client(|| {
            utils::log_transaction_name_metrics(&mut event, |event| {
                // Single redaction rule matching "/foo/*/**".
                let config = NormalizationConfig {
                    transaction_name_config: TransactionNameConfig {
                        rules: &[TransactionNameRule {
                            pattern: LazyGlob::new("/foo/*/**".to_owned()),
                            expiry: DateTime::<Utc>::MAX_UTC,
                            redaction: RedactionRule::Replace {
                                substitution: "*".to_owned(),
                            },
                        }],
                    },
                    ..Default::default()
                };
                relay_event_normalization::normalize_event(event, &config)
            });
        })
    }
3971
    /// URL transaction with no matching rule or pattern: change is recorded as `none`.
    #[test]
    fn test_log_transaction_metrics_none() {
        let captures = capture_test_event("/nothing", TransactionSource::Url);
        insta::assert_debug_snapshot!(captures, @r###"
        [
            "event.transaction_name_changes:1|c|#source_in:url,changes:none,source_out:sanitized,is_404:false",
        ]
        "###);
    }
3981
    /// URL transaction matching the configured rule: change is recorded as `rule`.
    #[test]
    fn test_log_transaction_metrics_rule() {
        let captures = capture_test_event("/foo/john/denver", TransactionSource::Url);
        insta::assert_debug_snapshot!(captures, @r###"
        [
            "event.transaction_name_changes:1|c|#source_in:url,changes:rule,source_out:sanitized,is_404:false",
        ]
        "###);
    }
3991
    /// URL transaction sanitized by the built-in pattern: change is recorded as `pattern`.
    #[test]
    fn test_log_transaction_metrics_pattern() {
        let captures = capture_test_event("/something/12345", TransactionSource::Url);
        insta::assert_debug_snapshot!(captures, @r###"
        [
            "event.transaction_name_changes:1|c|#source_in:url,changes:pattern,source_out:sanitized,is_404:false",
        ]
        "###);
    }
4001
4002    #[test]
4003    fn test_log_transaction_metrics_both() {
4004        let captures = capture_test_event("/foo/john/12345", TransactionSource::Url);
4005        insta::assert_debug_snapshot!(captures, @r###"
4006        [
4007            "event.transaction_name_changes:1|c|#source_in:url,changes:both,source_out:sanitized,is_404:false",
4008        ]
4009        "###);
4010    }
4011
4012    #[test]
4013    fn test_log_transaction_metrics_no_match() {
4014        let captures = capture_test_event("/foo/john/12345", TransactionSource::Route);
4015        insta::assert_debug_snapshot!(captures, @r###"
4016        [
4017            "event.transaction_name_changes:1|c|#source_in:route,changes:none,source_out:route,is_404:false",
4018        ]
4019        "###);
4020    }
4021
4022    /// Confirms that the hardcoded value we use for the fixed length of the measurement MRI is
4023    /// correct. Unit test is placed here because it has dependencies to relay-server and therefore
4024    /// cannot be called from relay-metrics.
4025    #[test]
4026    fn test_mri_overhead_constant() {
4027        let hardcoded_value = MeasurementsConfig::MEASUREMENT_MRI_OVERHEAD;
4028
4029        let derived_value = {
4030            let name = "foobar".to_owned();
4031            let value = 5.into(); // Arbitrary value.
4032            let unit = MetricUnit::Duration(DurationUnit::default());
4033            let tags = TransactionMeasurementTags {
4034                measurement_rating: None,
4035                universal_tags: CommonTags(BTreeMap::new()),
4036                score_profile_version: None,
4037            };
4038
4039            let measurement = TransactionMetric::Measurement {
4040                name: name.clone(),
4041                value,
4042                unit,
4043                tags,
4044            };
4045
4046            let metric: Bucket = measurement.into_metric(UnixTimestamp::now());
4047            metric.name.len() - unit.to_string().len() - name.len()
4048        };
4049        assert_eq!(
4050            hardcoded_value, derived_value,
4051            "Update `MEASUREMENT_MRI_OVERHEAD` if the naming scheme changed."
4052        );
4053    }
4054
4055    #[tokio::test]
4056    async fn test_process_metrics_bucket_metadata() {
4057        let mut token = Cogs::noop().timed(ResourceId::Relay, AppFeature::Unattributed);
4058        let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
4059        let received_at = Utc::now();
4060        let config = Config::default();
4061
4062        let (aggregator, mut aggregator_rx) = Addr::custom();
4063        let processor = create_test_processor_with_addrs(
4064            config,
4065            Addrs {
4066                aggregator,
4067                ..Default::default()
4068            },
4069        )
4070        .await;
4071
4072        let mut item = Item::new(ItemType::Statsd);
4073        item.set_payload(
4074            ContentType::Text,
4075            "transactions/foo:3182887624:4267882815|s",
4076        );
4077        for (source, expected_received_at) in [
4078            (
4079                BucketSource::External,
4080                Some(UnixTimestamp::from_datetime(received_at).unwrap()),
4081            ),
4082            (BucketSource::Internal, None),
4083        ] {
4084            let message = ProcessMetrics {
4085                data: MetricData::Raw(vec![item.clone()]),
4086                project_key,
4087                source,
4088                received_at,
4089                sent_at: Some(Utc::now()),
4090            };
4091            processor.handle_process_metrics(&mut token, message);
4092
4093            let Aggregator::MergeBuckets(merge_buckets) = aggregator_rx.recv().await.unwrap();
4094            let buckets = merge_buckets.buckets;
4095            assert_eq!(buckets.len(), 1);
4096            assert_eq!(buckets[0].metadata.received_at, expected_received_at);
4097        }
4098    }
4099
4100    #[tokio::test]
4101    async fn test_process_batched_metrics() {
4102        let mut token = Cogs::noop().timed(ResourceId::Relay, AppFeature::Unattributed);
4103        let received_at = Utc::now();
4104        let config = Config::default();
4105
4106        let (aggregator, mut aggregator_rx) = Addr::custom();
4107        let processor = create_test_processor_with_addrs(
4108            config,
4109            Addrs {
4110                aggregator,
4111                ..Default::default()
4112            },
4113        )
4114        .await;
4115
4116        let payload = r#"{
4117    "buckets": {
4118        "11111111111111111111111111111111": [
4119            {
4120                "timestamp": 1615889440,
4121                "width": 0,
4122                "name": "d:custom/endpoint.response_time@millisecond",
4123                "type": "d",
4124                "value": [
4125                  68.0
4126                ],
4127                "tags": {
4128                  "route": "user_index"
4129                }
4130            }
4131        ],
4132        "22222222222222222222222222222222": [
4133            {
4134                "timestamp": 1615889440,
4135                "width": 0,
4136                "name": "d:custom/endpoint.cache_rate@none",
4137                "type": "d",
4138                "value": [
4139                  36.0
4140                ]
4141            }
4142        ]
4143    }
4144}
4145"#;
4146        let message = ProcessBatchedMetrics {
4147            payload: Bytes::from(payload),
4148            source: BucketSource::Internal,
4149            received_at,
4150            sent_at: Some(Utc::now()),
4151        };
4152        processor.handle_process_batched_metrics(&mut token, message);
4153
4154        let Aggregator::MergeBuckets(mb1) = aggregator_rx.recv().await.unwrap();
4155        let Aggregator::MergeBuckets(mb2) = aggregator_rx.recv().await.unwrap();
4156
4157        let mut messages = vec![mb1, mb2];
4158        messages.sort_by_key(|mb| mb.project_key);
4159
4160        let actual = messages
4161            .into_iter()
4162            .map(|mb| (mb.project_key, mb.buckets))
4163            .collect::<Vec<_>>();
4164
4165        assert_debug_snapshot!(actual, @r###"
4166        [
4167            (
4168                ProjectKey("11111111111111111111111111111111"),
4169                [
4170                    Bucket {
4171                        timestamp: UnixTimestamp(1615889440),
4172                        width: 0,
4173                        name: MetricName(
4174                            "d:custom/endpoint.response_time@millisecond",
4175                        ),
4176                        value: Distribution(
4177                            [
4178                                68.0,
4179                            ],
4180                        ),
4181                        tags: {
4182                            "route": "user_index",
4183                        },
4184                        metadata: BucketMetadata {
4185                            merges: 1,
4186                            received_at: None,
4187                            extracted_from_indexed: false,
4188                        },
4189                    },
4190                ],
4191            ),
4192            (
4193                ProjectKey("22222222222222222222222222222222"),
4194                [
4195                    Bucket {
4196                        timestamp: UnixTimestamp(1615889440),
4197                        width: 0,
4198                        name: MetricName(
4199                            "d:custom/endpoint.cache_rate@none",
4200                        ),
4201                        value: Distribution(
4202                            [
4203                                36.0,
4204                            ],
4205                        ),
4206                        tags: {},
4207                        metadata: BucketMetadata {
4208                            merges: 1,
4209                            received_at: None,
4210                            extracted_from_indexed: false,
4211                        },
4212                    },
4213                ],
4214            ),
4215        ]
4216        "###);
4217    }
4218}