Skip to main content

relay_server/services/
processor.rs

1use std::borrow::Cow;
2use std::collections::{BTreeMap, BTreeSet, HashMap};
3use std::error::Error;
4use std::fmt::Debug;
5use std::future::Future;
6use std::io::Write;
7use std::pin::Pin;
8use std::sync::Arc;
9use std::time::Duration;
10
11use anyhow::Context;
12use brotli::CompressorWriter as BrotliEncoder;
13use bytes::Bytes;
14use chrono::{DateTime, Utc};
15use flate2::Compression;
16use flate2::write::{GzEncoder, ZlibEncoder};
17use futures::future::BoxFuture;
18use relay_base_schema::project::{ProjectId, ProjectKey};
19use relay_cogs::{AppFeature, Cogs, FeatureWeights, ResourceId, Token};
20use relay_common::time::UnixTimestamp;
21use relay_config::{Config, EmitOutcomes, HttpEncoding, UpstreamDescriptor};
22use relay_event_normalization::{ClockDriftProcessor, GeoIpLookup};
23use relay_event_schema::processor::ProcessingAction;
24use relay_event_schema::protocol::ClientReport;
25use relay_filter::FilterStatKey;
26use relay_log::sentry::SentryFutureExt;
27use relay_metrics::{Bucket, BucketMetadata, BucketView, BucketsView, MetricNamespace};
28use relay_quotas::{RateLimits, Scoping};
29use relay_sampling::evaluation::SamplingDecision;
30use relay_statsd::metric;
31use relay_system::{Addr, FromMessage, NoResponse, Service};
32use reqwest::header;
33use zstd::stream::Encoder as ZstdEncoder;
34
35use crate::envelope::{self, ContentType, Envelope, EnvelopeError, Item, ItemType};
36use crate::extractors::{PartialDsn, RequestMeta, RequestTrust};
37use crate::managed::ManagedEnvelope;
38use crate::metrics::{MetricOutcomes, MetricsLimiter, MinimalTrackableBucket};
39use crate::metrics_extraction::ExtractedMetrics;
40use crate::processing::errors::SwitchProcessingError;
41use crate::processing::relay::RelayProcessor;
42use crate::processing::{Forward as _, Output, Outputs, QuotaRateLimiter};
43use crate::service::ServiceError;
44use crate::services::global_config::GlobalConfigHandle;
45use crate::services::metrics::{Aggregator, FlushBuckets, MergeBuckets, ProjectBuckets};
46use crate::services::outcome::{self, DiscardItemType, DiscardReason, Outcome, TrackOutcome};
47use crate::services::projects::cache::ProjectCacheHandle;
48use crate::services::projects::project::{ProjectInfo, ProjectState};
49use crate::services::upstream::{
50    SendRequest, Sign, SignatureType, UpstreamRelay, UpstreamRequest, UpstreamRequestError,
51};
52use crate::statsd::{RelayCounters, RelayDistributions, RelayTimers};
53use crate::utils;
54use crate::{http, processing};
55use relay_threading::AsyncPool;
56use symbolic_unreal::{Unreal4Error, Unreal4ErrorKind};
57#[cfg(feature = "processing")]
58use {
59    crate::services::objectstore::Objectstore,
60    crate::services::store::Store,
61    itertools::Itertools,
62    relay_dynamic_config::GlobalConfig,
63    relay_quotas::{Quota, RateLimitingError, RedisRateLimiter},
64    relay_redis::RedisClients,
65    std::time::Instant,
66};
67
68mod metrics;
69
/// Smallest clock drift (55 minutes) for which clock drift correction is applied.
pub const MINIMUM_CLOCK_DRIFT: Duration = Duration::from_secs(60 * 55);
72
73/// An error returned when handling [`ProcessEnvelope`].
74#[derive(Debug, thiserror::Error)]
75pub enum ProcessingError {
76    #[error("invalid json in event")]
77    InvalidJson(#[source] serde_json::Error),
78
79    #[error("invalid message pack event payload")]
80    InvalidMsgpack(#[from] rmp_serde::decode::Error),
81
82    #[error("invalid unreal crash report")]
83    InvalidUnrealReport(#[source] Unreal4Error),
84
85    #[error("event payload too large")]
86    PayloadTooLarge(DiscardItemType),
87
88    #[error("invalid transaction event")]
89    InvalidTransaction,
90
91    #[error("the item is not allowed/supported in this envelope")]
92    UnsupportedItem,
93
94    #[error("envelope processor failed")]
95    ProcessingFailed(#[from] ProcessingAction),
96
97    #[error("duplicate {0} in event")]
98    DuplicateItem(ItemType),
99
100    #[error("failed to extract event payload")]
101    NoEventPayload,
102
103    #[error("invalid security report type: {0:?}")]
104    InvalidSecurityType(Bytes),
105
106    #[error("unsupported security report type")]
107    UnsupportedSecurityType,
108
109    #[error("invalid security report")]
110    InvalidSecurityReport(#[source] serde_json::Error),
111
112    #[error("event filtered with reason: {0:?}")]
113    EventFiltered(FilterStatKey),
114
115    #[error("could not serialize event payload")]
116    SerializeFailed(#[source] serde_json::Error),
117
118    #[cfg(feature = "processing")]
119    #[error("failed to apply quotas")]
120    QuotasFailed(#[from] RateLimitingError),
121
122    #[error("nintendo switch dying message processing failed {0:?}")]
123    InvalidNintendoDyingMessage(#[source] SwitchProcessingError),
124
125    #[cfg(all(sentry, feature = "processing"))]
126    #[error("playstation dump processing failed: {0}")]
127    InvalidPlaystationDump(String),
128
129    #[cfg(feature = "processing")]
130    #[error("invalid attachment reference")]
131    InvalidAttachmentRef,
132}
133
134impl ProcessingError {
135    pub fn to_outcome(&self) -> Option<Outcome> {
136        match self {
137            Self::PayloadTooLarge(payload_type) => {
138                Some(Outcome::Invalid(DiscardReason::ItemTooLarge(*payload_type)))
139            }
140            Self::InvalidJson(_) => Some(Outcome::Invalid(DiscardReason::InvalidJson)),
141            Self::InvalidMsgpack(_) => Some(Outcome::Invalid(DiscardReason::InvalidMsgpack)),
142            Self::InvalidSecurityType(_) => {
143                Some(Outcome::Invalid(DiscardReason::SecurityReportType))
144            }
145            Self::UnsupportedItem => Some(Outcome::Invalid(DiscardReason::InvalidEnvelope)),
146            Self::InvalidSecurityReport(_) => Some(Outcome::Invalid(DiscardReason::SecurityReport)),
147            Self::UnsupportedSecurityType => Some(Outcome::Filtered(FilterStatKey::InvalidCsp)),
148            Self::InvalidTransaction => Some(Outcome::Invalid(DiscardReason::InvalidTransaction)),
149            Self::DuplicateItem(_) => Some(Outcome::Invalid(DiscardReason::DuplicateItem)),
150            Self::NoEventPayload => Some(Outcome::Invalid(DiscardReason::NoEventPayload)),
151            Self::InvalidNintendoDyingMessage(_) => Some(Outcome::Invalid(DiscardReason::Payload)),
152            #[cfg(all(sentry, feature = "processing"))]
153            Self::InvalidPlaystationDump(_) => Some(Outcome::Invalid(DiscardReason::Payload)),
154            Self::InvalidUnrealReport(err) if err.kind() == Unreal4ErrorKind::BadCompression => {
155                Some(Outcome::Invalid(DiscardReason::InvalidCompression))
156            }
157            Self::InvalidUnrealReport(_) => Some(Outcome::Invalid(DiscardReason::ProcessUnreal)),
158            Self::SerializeFailed(_) | Self::ProcessingFailed(_) => {
159                Some(Outcome::Invalid(DiscardReason::Internal))
160            }
161            #[cfg(feature = "processing")]
162            Self::QuotasFailed(_) => Some(Outcome::Invalid(DiscardReason::Internal)),
163            Self::EventFiltered(key) => Some(Outcome::Filtered(key.clone())),
164
165            #[cfg(feature = "processing")]
166            Self::InvalidAttachmentRef => {
167                Some(Outcome::Invalid(DiscardReason::InvalidAttachmentRef))
168            }
169        }
170    }
171}
172
173impl From<Unreal4Error> for ProcessingError {
174    fn from(err: Unreal4Error) -> Self {
175        match err.kind() {
176            Unreal4ErrorKind::TooLarge => Self::PayloadTooLarge(ItemType::UnrealReport.into()),
177            _ => ProcessingError::InvalidUnrealReport(err),
178        }
179    }
180}
181
182/// A container for extracted metrics during processing.
183///
184/// The container enforces that the extracted metrics are correctly tagged
185/// with the dynamic sampling decision.
186#[derive(Debug)]
187pub struct ProcessingExtractedMetrics {
188    metrics: ExtractedMetrics,
189}
190
191impl ProcessingExtractedMetrics {
192    pub fn new() -> Self {
193        Self {
194            metrics: ExtractedMetrics::default(),
195        }
196    }
197
198    pub fn into_inner(self) -> ExtractedMetrics {
199        self.metrics
200    }
201
202    /// Extends the contained metrics with [`ExtractedMetrics`].
203    pub fn extend(
204        &mut self,
205        extracted: ExtractedMetrics,
206        sampling_decision: Option<SamplingDecision>,
207    ) {
208        self.extend_project_metrics(extracted.project_metrics, sampling_decision);
209        self.extend_sampling_metrics(extracted.sampling_metrics, sampling_decision);
210    }
211
212    /// Extends the contained project metrics.
213    pub fn extend_project_metrics<I>(
214        &mut self,
215        buckets: I,
216        sampling_decision: Option<SamplingDecision>,
217    ) where
218        I: IntoIterator<Item = Bucket>,
219    {
220        self.metrics
221            .project_metrics
222            .extend(buckets.into_iter().map(|mut bucket| {
223                bucket.metadata.extracted_from_indexed =
224                    sampling_decision == Some(SamplingDecision::Keep);
225                bucket
226            }));
227    }
228
229    /// Extends the contained sampling metrics.
230    pub fn extend_sampling_metrics<I>(
231        &mut self,
232        buckets: I,
233        sampling_decision: Option<SamplingDecision>,
234    ) where
235        I: IntoIterator<Item = Bucket>,
236    {
237        self.metrics
238            .sampling_metrics
239            .extend(buckets.into_iter().map(|mut bucket| {
240                bucket.metadata.extracted_from_indexed =
241                    sampling_decision == Some(SamplingDecision::Keep);
242                bucket
243            }));
244    }
245}
246
247fn send_metrics(
248    metrics: ExtractedMetrics,
249    project_key: ProjectKey,
250    sampling_key: Option<ProjectKey>,
251    aggregator: &Addr<Aggregator>,
252) {
253    let ExtractedMetrics {
254        project_metrics,
255        sampling_metrics,
256    } = metrics;
257
258    if !project_metrics.is_empty() {
259        aggregator.send(MergeBuckets {
260            project_key,
261            buckets: project_metrics,
262        });
263    }
264
265    if !sampling_metrics.is_empty() {
266        // If no sampling project state is available, we associate the sampling
267        // metrics with the current project.
268        //
269        // project_without_tracing         -> metrics goes to self
270        // dependent_project_with_tracing  -> metrics goes to root
271        // root_project_with_tracing       -> metrics goes to root == self
272        let sampling_project_key = sampling_key.unwrap_or(project_key);
273        aggregator.send(MergeBuckets {
274            project_key: sampling_project_key,
275            buckets: sampling_metrics,
276        });
277    }
278}
279
280/// Applies processing to all contents of the given envelope.
281///
282/// Depending on the contents of the envelope and Relay's mode, this includes:
283///
284///  - Basic normalization and validation for all item types.
285///  - Clock drift correction if the required `sent_at` header is present.
286///  - Expansion of certain item types (e.g. unreal).
287///  - Store normalization for event payloads in processing mode.
288///  - Rate limiters and inbound filters on events in processing mode.
289#[derive(Debug)]
290pub struct ProcessEnvelope {
291    /// Envelope to process.
292    pub envelope: ManagedEnvelope,
293    /// The project info.
294    pub project_info: Arc<ProjectInfo>,
295    /// Currently active cached rate limits for this project.
296    pub rate_limits: Arc<RateLimits>,
297    /// Root sampling project info.
298    pub sampling_project_info: Option<Arc<ProjectInfo>>,
299}
300
301/// Parses a list of metrics or metric buckets and pushes them to the project's aggregator.
302///
303/// This parses and validates the metrics:
304///  - For [`Metrics`](ItemType::Statsd), each metric is parsed separately, and invalid metrics are
305///    ignored independently.
306///  - For [`MetricBuckets`](ItemType::MetricBuckets), the entire list of buckets is parsed and
307///    dropped together on parsing failure.
308///  - Other envelope items will be ignored with an error message.
309///
310/// Additionally, processing applies clock drift correction using the system clock of this Relay, if
311/// the Envelope specifies the [`sent_at`](Envelope::sent_at) header.
312#[derive(Debug)]
313pub struct ProcessMetrics {
314    /// A list of metric items.
315    pub data: MetricData,
316    /// The target project.
317    pub project_key: ProjectKey,
318    /// Whether to keep or reset the metric metadata.
319    pub source: BucketSource,
320    /// The wall clock time at which the request was received.
321    pub received_at: DateTime<Utc>,
322    /// The value of the Envelope's [`sent_at`](Envelope::sent_at) header for clock drift
323    /// correction.
324    pub sent_at: Option<DateTime<Utc>>,
325}
326
327/// Raw unparsed metric data.
328#[derive(Debug)]
329pub enum MetricData {
330    /// Raw data, unparsed envelope items.
331    Raw(Vec<Item>),
332    /// Already parsed buckets but unprocessed.
333    Parsed(Vec<Bucket>),
334}
335
336impl MetricData {
337    /// Consumes the metric data and parses the contained buckets.
338    ///
339    /// If the contained data is already parsed the buckets are returned unchanged.
340    /// Raw buckets are parsed and created with the passed `timestamp`.
341    fn into_buckets(self, timestamp: UnixTimestamp) -> Vec<Bucket> {
342        let items = match self {
343            Self::Parsed(buckets) => return buckets,
344            Self::Raw(items) => items,
345        };
346
347        let mut buckets = Vec::new();
348        for item in items {
349            let payload = item.payload();
350            if item.ty() == &ItemType::Statsd {
351                for bucket_result in Bucket::parse_all(&payload, timestamp) {
352                    match bucket_result {
353                        Ok(bucket) => buckets.push(bucket),
354                        Err(error) => relay_log::debug!(
355                            error = &error as &dyn Error,
356                            "failed to parse metric bucket from statsd format",
357                        ),
358                    }
359                }
360            } else if item.ty() == &ItemType::MetricBuckets {
361                match serde_json::from_slice::<Vec<Bucket>>(&payload) {
362                    Ok(parsed_buckets) => {
363                        // Re-use the allocation of `b` if possible.
364                        if buckets.is_empty() {
365                            buckets = parsed_buckets;
366                        } else {
367                            buckets.extend(parsed_buckets);
368                        }
369                    }
370                    Err(error) => {
371                        relay_log::debug!(
372                            error = &error as &dyn Error,
373                            "failed to parse metric bucket",
374                        );
375                        metric!(counter(RelayCounters::MetricBucketsParsingFailed) += 1);
376                    }
377                }
378            } else {
379                relay_log::error!(
380                    "invalid item of type {} passed to ProcessMetrics",
381                    item.ty()
382                );
383            }
384        }
385        buckets
386    }
387}
388
389#[derive(Debug)]
390pub struct ProcessBatchedMetrics {
391    /// Metrics payload in JSON format.
392    pub payload: Bytes,
393    /// Whether to keep or reset the metric metadata.
394    pub source: BucketSource,
395    /// The wall clock time at which the request was received.
396    pub received_at: DateTime<Utc>,
397    /// The wall clock time at which the request was received.
398    pub sent_at: Option<DateTime<Utc>>,
399}
400
/// Describes where a metric bucket comes from.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum BucketSource {
    /// The bucket comes from an internal Relay use case.
    ///
    /// This covers buckets created within this Relay as well as buckets accepted
    /// from another Relay that is registered as internal in Relay's configuration.
    Internal,
    /// The bucket comes from an untrusted source.
    ///
    /// Extracted metrics sent by managed Relays fall into this category: they belong
    /// to a project use case but arrive from an untrusted source.
    External,
}
416
417impl BucketSource {
418    /// Infers the bucket source from [`RequestMeta::request_trust`].
419    pub fn from_meta(meta: &RequestMeta) -> Self {
420        match meta.request_trust() {
421            RequestTrust::Trusted => Self::Internal,
422            RequestTrust::Untrusted => Self::External,
423        }
424    }
425}
426
427/// Sends a client report to the upstream.
428#[derive(Debug)]
429pub struct SubmitClientReports {
430    /// The client report to be sent.
431    pub client_reports: Vec<ClientReport>,
432    /// Scoping information for the client report.
433    pub scoping: Scoping,
434}
435
436/// CPU-intensive processing tasks for envelopes.
437#[derive(Debug)]
438pub enum EnvelopeProcessor {
439    ProcessEnvelope(Box<ProcessEnvelope>),
440    ProcessProjectMetrics(Box<ProcessMetrics>),
441    ProcessBatchedMetrics(Box<ProcessBatchedMetrics>),
442    FlushBuckets(Box<FlushBuckets>),
443    SubmitClientReports(Box<SubmitClientReports>),
444}
445
446impl EnvelopeProcessor {
447    /// Returns the name of the message variant.
448    pub fn variant(&self) -> &'static str {
449        match self {
450            EnvelopeProcessor::ProcessEnvelope(_) => "ProcessEnvelope",
451            EnvelopeProcessor::ProcessProjectMetrics(_) => "ProcessProjectMetrics",
452            EnvelopeProcessor::ProcessBatchedMetrics(_) => "ProcessBatchedMetrics",
453            EnvelopeProcessor::FlushBuckets(_) => "FlushBuckets",
454            EnvelopeProcessor::SubmitClientReports(_) => "SubmitClientReports",
455        }
456    }
457}
458
459impl relay_system::Interface for EnvelopeProcessor {}
460
461impl FromMessage<ProcessEnvelope> for EnvelopeProcessor {
462    type Response = relay_system::NoResponse;
463
464    fn from_message(message: ProcessEnvelope, _sender: ()) -> Self {
465        Self::ProcessEnvelope(Box::new(message))
466    }
467}
468
469impl FromMessage<ProcessMetrics> for EnvelopeProcessor {
470    type Response = NoResponse;
471
472    fn from_message(message: ProcessMetrics, _: ()) -> Self {
473        Self::ProcessProjectMetrics(Box::new(message))
474    }
475}
476
477impl FromMessage<ProcessBatchedMetrics> for EnvelopeProcessor {
478    type Response = NoResponse;
479
480    fn from_message(message: ProcessBatchedMetrics, _: ()) -> Self {
481        Self::ProcessBatchedMetrics(Box::new(message))
482    }
483}
484
485impl FromMessage<FlushBuckets> for EnvelopeProcessor {
486    type Response = NoResponse;
487
488    fn from_message(message: FlushBuckets, _: ()) -> Self {
489        Self::FlushBuckets(Box::new(message))
490    }
491}
492
493impl FromMessage<SubmitClientReports> for EnvelopeProcessor {
494    type Response = NoResponse;
495
496    fn from_message(message: SubmitClientReports, _: ()) -> Self {
497        Self::SubmitClientReports(Box::new(message))
498    }
499}
500
501/// The asynchronous thread pool used for scheduling processing tasks in the processor.
502pub type EnvelopeProcessorServicePool = AsyncPool<BoxFuture<'static, ()>>;
503
504/// Service implementing the [`EnvelopeProcessor`] interface.
505///
506/// This service handles messages in a worker pool with configurable concurrency.
507#[derive(Clone)]
508pub struct EnvelopeProcessorService {
509    inner: Arc<InnerProcessor>,
510}
511
512/// Contains the addresses of services that the processor publishes to.
513pub struct Addrs {
514    pub outcome_aggregator: Addr<TrackOutcome>,
515    pub upstream_relay: Addr<UpstreamRelay>,
516    #[cfg(feature = "processing")]
517    pub objectstore: Option<Addr<Objectstore>>,
518    #[cfg(feature = "processing")]
519    pub store_forwarder: Option<Addr<Store>>,
520    pub aggregator: Addr<Aggregator>,
521}
522
523impl Default for Addrs {
524    fn default() -> Self {
525        Addrs {
526            outcome_aggregator: Addr::dummy(),
527            upstream_relay: Addr::dummy(),
528            #[cfg(feature = "processing")]
529            objectstore: None,
530            #[cfg(feature = "processing")]
531            store_forwarder: None,
532            aggregator: Addr::dummy(),
533        }
534    }
535}
536
537struct InnerProcessor {
538    pool: EnvelopeProcessorServicePool,
539    config: Arc<Config>,
540    global_config: GlobalConfigHandle,
541    project_cache: ProjectCacheHandle,
542    cogs: Cogs,
543    addrs: Addrs,
544    #[cfg(feature = "processing")]
545    rate_limiter: Option<Arc<RedisRateLimiter>>,
546    metric_outcomes: MetricOutcomes,
547    processor: RelayProcessor,
548}
549
550impl EnvelopeProcessorService {
551    /// Creates a multi-threaded envelope processor.
552    #[cfg_attr(feature = "processing", expect(clippy::too_many_arguments))]
553    pub fn new(
554        pool: EnvelopeProcessorServicePool,
555        config: Arc<Config>,
556        global_config: GlobalConfigHandle,
557        project_cache: ProjectCacheHandle,
558        cogs: Cogs,
559        #[cfg(feature = "processing")] redis: Option<RedisClients>,
560        addrs: Addrs,
561        metric_outcomes: MetricOutcomes,
562    ) -> Self {
563        let geoip_lookup = config
564            .geoip_path()
565            .and_then(
566                |p| match GeoIpLookup::open(p).context(ServiceError::GeoIp) {
567                    Ok(geoip) => Some(geoip),
568                    Err(err) => {
569                        relay_log::error!("failed to open GeoIP db {p:?}: {err:?}");
570                        None
571                    }
572                },
573            )
574            .unwrap_or_else(GeoIpLookup::empty);
575
576        if let Some(build_epoch) = geoip_lookup.build_epoch() {
577            relay_log::info!("Loaded GeoIP database (build: {build_epoch})");
578        }
579
580        #[cfg(feature = "processing")]
581        let rate_limiter = redis.map(|redis| {
582            RedisRateLimiter::new(redis.quotas)
583                .max_limit(config.max_rate_limit())
584                .cache(config.quota_cache_ratio(), config.quota_cache_max())
585        });
586
587        let quota_limiter = Arc::new(QuotaRateLimiter::new(
588            #[cfg(feature = "processing")]
589            project_cache.clone(),
590            #[cfg(feature = "processing")]
591            rate_limiter.clone(),
592        ));
593        #[cfg(feature = "processing")]
594        let rate_limiter = rate_limiter.map(Arc::new);
595        let inner = InnerProcessor {
596            pool,
597            global_config,
598            project_cache,
599            #[cfg(feature = "processing")]
600            rate_limiter,
601            processor: RelayProcessor::new(
602                cogs.clone(),
603                &quota_limiter,
604                &geoip_lookup,
605                addrs.outcome_aggregator.clone(),
606            ),
607            cogs,
608            addrs,
609            metric_outcomes,
610            config,
611        };
612
613        Self {
614            inner: Arc::new(inner),
615        }
616    }
617
618    async fn process_envelope(
619        &self,
620        project_id: ProjectId,
621        mut envelope: ManagedEnvelope,
622        ctx: processing::Context<'_>,
623    ) -> Vec<Output<Outputs>> {
624        // Pre-process the envelope headers.
625        if let Some(sampling_state) = ctx.sampling_project_info {
626            // Both transactions and standalone span envelopes need a normalized DSC header
627            // to make sampling rules based on the segment/transaction name work correctly.
628            envelope
629                .envelope_mut()
630                .parametrize_dsc_transaction(&sampling_state.config.tx_name_rules);
631        }
632
633        // Set the event retention. Effectively, this value will only be available in processing
634        // mode when the full project config is queried from the upstream.
635        if let Some(retention) = ctx.project_info.config.event_retention {
636            envelope.envelope_mut().set_retention(retention);
637        }
638
639        // Ensure the project ID is updated to the stored instance for this project cache. This can
640        // differ in two cases:
641        //  1. The envelope was sent to the legacy `/store/` endpoint without a project ID.
642        //  2. The DSN was moved and the envelope sent to the old project ID.
643        envelope
644            .envelope_mut()
645            .meta_mut()
646            .set_project_id(project_id);
647
648        self.inner.processor.run(envelope, ctx).await
649    }
650
651    /// Processes the envelope and returns the processed envelope back.
652    ///
653    /// Returns `Some` if the envelope passed inbound filtering and rate limiting. Invalid items are
654    /// removed from the envelope. Otherwise, if the envelope is empty or the entire envelope needs
655    /// to be dropped, this is `None`.
656    async fn process<'a>(
657        &self,
658        mut envelope: ManagedEnvelope,
659        ctx: processing::Context<'a>,
660    ) -> Vec<Output<Outputs>> {
661        // Prefer the project's project ID, and fall back to the stated project id from the
662        // envelope. The project ID is available in all modes, other than in proxy mode, where
663        // envelopes for unknown projects are forwarded blindly.
664        //
665        // Neither ID can be available in proxy mode on the /store/ endpoint. This is not supported,
666        // since we cannot process an envelope without project ID, so drop it.
667        let Some(project_id) = ctx
668            .project_info
669            .project_id
670            .or_else(|| envelope.envelope().meta().project_id())
671        else {
672            relay_log::error!(
673                tags.project_key = %envelope.envelope().meta().public_key(),
674                "project info does not contain project id"
675            );
676            envelope.reject(Outcome::Invalid(DiscardReason::Internal));
677            return Vec::new();
678        };
679
680        relay_log::configure_scope(|scope| {
681            scope.set_tag("project_id", project_id);
682        });
683
684        self.process_envelope(project_id, envelope, ctx).await
685    }
686
687    async fn handle_process_envelope(&self, cogs: &mut Token, message: ProcessEnvelope) {
688        let wait_time = message.envelope.age();
689        metric!(timer(RelayTimers::EnvelopeWaitTime) = wait_time);
690
691        // This COGS handling may need an overhaul in the future:
692        // Cancel the passed in token, to start individual measurements per processor instead.
693        cogs.cancel();
694
695        let global_config = self.inner.global_config.current().unwrap_or_default();
696
697        let ctx = processing::Context {
698            config: &self.inner.config,
699            global_config: &global_config,
700            project_info: &message.project_info,
701            sampling_project_info: message.sampling_project_info.as_deref(),
702            rate_limits: &message.rate_limits,
703        };
704
705        let project_key = message.envelope.meta().public_key();
706        // Only allow sending to the sampling key, if we successfully loaded a sampling project
707        // info relating to it. This filters out unknown/invalid project keys as well as project
708        // keys from different organizations.
709        let sampling_key = ctx
710            .sampling_project_info
711            .and_then(|p| p.get_public_key_config())
712            .map(|pkc| pkc.public_key);
713
714        relay_log::configure_scope(|scope| {
715            scope.set_tag("project_key", project_key);
716            if let Some(sampling_key) = sampling_key {
717                scope.set_tag("sampling_key", sampling_key);
718            }
719            let meta = message.envelope.envelope().meta();
720            scope.set_tag("sdk_name", meta.client_name());
721            if let Some(client) = meta.client() {
722                scope.set_tag("sdk", client);
723            }
724            if let Some(user_agent) = meta.user_agent() {
725                scope.set_extra("user_agent", user_agent.into());
726            }
727        });
728
729        let outputs = metric!(timer(RelayTimers::EnvelopeProcessingTime), {
730            self.process(message.envelope, ctx).await
731        });
732
733        let ctx = ctx.to_forward();
734        for Output { main, metrics } in outputs {
735            if let Some(metrics) = metrics {
736                let agg = &self.inner.addrs.aggregator;
737                metrics.accept(|metrics| {
738                    send_metrics(metrics, project_key, sampling_key, agg);
739                });
740            }
741
742            if let Some(output) = main {
743                // Only counting processing time for COGS at the moment.
744                self.submit_upstream(&mut Token::noop(), output, ctx);
745            }
746        }
747    }
748
749    fn handle_process_metrics(&self, cogs: &mut Token, message: ProcessMetrics) {
750        let ProcessMetrics {
751            data,
752            project_key,
753            received_at,
754            sent_at,
755            source,
756        } = message;
757
758        let received_timestamp =
759            UnixTimestamp::from_datetime(received_at).unwrap_or(UnixTimestamp::now());
760
761        let mut buckets = data.into_buckets(received_timestamp);
762        if buckets.is_empty() {
763            return;
764        };
765        cogs.update(relay_metrics::cogs::BySize(&buckets));
766
767        let clock_drift_processor =
768            ClockDriftProcessor::new(sent_at, received_at).at_least(MINIMUM_CLOCK_DRIFT);
769
770        buckets.retain_mut(|bucket| {
771            if let Err(error) = relay_metrics::normalize_bucket(bucket) {
772                relay_log::debug!(error = &error as &dyn Error, "dropping bucket {bucket:?}");
773                return false;
774            }
775
776            if !self::metrics::is_valid_namespace(bucket, source) {
777                relay_log::debug!("dropping bucket in invalid namespace {bucket:?}");
778                return false;
779            }
780
781            clock_drift_processor.process_timestamp(&mut bucket.timestamp);
782
783            if !matches!(source, BucketSource::Internal) {
784                bucket.metadata = BucketMetadata::new(received_timestamp);
785            }
786
787            true
788        });
789
790        let project = self.inner.project_cache.get(project_key);
791
792        // Best effort check to filter and rate limit buckets, if there is no project state
793        // available at the current time, we will check again after flushing.
794        let buckets = match project.state() {
795            ProjectState::Enabled(project_info) => {
796                let rate_limits = project.rate_limits().current_limits();
797                self.check_buckets(project_key, project_info, &rate_limits, buckets)
798            }
799            _ => buckets,
800        };
801
802        relay_log::trace!("merging metric buckets into the aggregator");
803        self.inner
804            .addrs
805            .aggregator
806            .send(MergeBuckets::new(project_key, buckets));
807    }
808
809    fn handle_process_batched_metrics(&self, cogs: &mut Token, message: ProcessBatchedMetrics) {
810        let ProcessBatchedMetrics {
811            payload,
812            source,
813            received_at,
814            sent_at,
815        } = message;
816
817        #[derive(serde::Deserialize)]
818        struct Wrapper {
819            buckets: HashMap<ProjectKey, Vec<Bucket>>,
820        }
821
822        let buckets = match serde_json::from_slice(&payload) {
823            Ok(Wrapper { buckets }) => buckets,
824            Err(error) => {
825                relay_log::debug!(
826                    error = &error as &dyn Error,
827                    "failed to parse batched metrics",
828                );
829                metric!(counter(RelayCounters::MetricBucketsParsingFailed) += 1);
830                return;
831            }
832        };
833
834        for (project_key, buckets) in buckets {
835            self.handle_process_metrics(
836                cogs,
837                ProcessMetrics {
838                    data: MetricData::Parsed(buckets),
839                    project_key,
840                    source,
841                    received_at,
842                    sent_at,
843                },
844            )
845        }
846    }
847
848    /// Submits a processor [`Output`] to the appropriate upstream.
849    ///
850    /// If processing is enabled, the upstream is Kafka.
851    fn submit_upstream(
852        &self,
853        cogs: &mut Token,
854        output: Outputs,
855        ctx: processing::ForwardContext<'_>,
856    ) {
857        let _submit = cogs.start_category("submit");
858
859        #[cfg(feature = "processing")]
860        if ctx.config.processing_enabled()
861            && let Some(store_forwarder) = &self.inner.addrs.store_forwarder
862        {
863            use crate::processing::StoreHandle;
864
865            let objectstore = self.inner.addrs.objectstore.as_ref();
866            let handle = StoreHandle::new(store_forwarder, objectstore, ctx.global_config);
867
868            output
869                .forward_store(handle, ctx)
870                .unwrap_or_else(|err| err.into_inner());
871
872            return;
873        }
874
875        match output.serialize_envelope(ctx) {
876            Ok(envelope) => {
877                let envelope = ManagedEnvelope::from(envelope);
878                self.submit_envelope_upstream(envelope, ctx.project_info.upstream.clone());
879            }
880            Err(_) => relay_log::error!("failed to serialize output to an envelope"),
881        };
882    }
883
884    fn submit_envelope_upstream(
885        &self,
886        mut envelope: ManagedEnvelope,
887        // Currently allowed to be optional as code is migrated to respect the upstream override
888        // provided from the project config. Eventually must be available and is required.
889        upstream: Option<UpstreamDescriptor>,
890    ) {
891        if envelope.envelope_mut().is_empty() {
892            envelope.accept();
893            return;
894        }
895
896        // No code path should hit this.
897        //
898        // Any item which is produced by processing is handled in `submit_upstream`,
899        // metrics are sent to the store directly and outcomes must be produced to Kafka
900        // instead of being sent onward as client report.
901        if self.inner.config.processing_enabled() {
902            relay_log::error!(
903                "attempt to forward envelope to http upstream when processing is enabled"
904            );
905            return;
906        }
907
908        // Override the `sent_at` timestamp. Since the envelope went through basic
909        // normalization, all timestamps have been corrected. We propagate the new
910        // `sent_at` to allow the next Relay to double-check this timestamp and
911        // potentially apply correction again. This is done as close to sending as
912        // possible so that we avoid internal delays.
913        envelope.envelope_mut().set_sent_at(Utc::now());
914
915        relay_log::trace!("sending envelope to sentry endpoint");
916        let http_encoding = self.inner.config.http_encoding();
917        let result = envelope.envelope().to_vec().and_then(|v| {
918            encode_payload(&v.into(), http_encoding).map_err(EnvelopeError::PayloadIoFailed)
919        });
920
921        match result {
922            Ok(body) => {
923                self.inner
924                    .addrs
925                    .upstream_relay
926                    .send(SendRequest(SendEnvelope {
927                        upstream,
928                        envelope,
929                        body,
930                        http_encoding,
931                        project_cache: self.inner.project_cache.clone(),
932                    }));
933            }
934            Err(error) => {
935                // Errors are only logged for what we consider an internal discard reason. These
936                // indicate errors in the infrastructure or implementation bugs.
937                relay_log::error!(
938                    error = &error as &dyn Error,
939                    tags.project_key = %envelope.scoping().project_key,
940                    "failed to serialize envelope payload"
941                );
942
943                envelope.reject(Outcome::Invalid(DiscardReason::Internal));
944            }
945        }
946    }
947
948    fn handle_submit_client_reports(&self, message: SubmitClientReports) {
949        let SubmitClientReports {
950            client_reports,
951            scoping,
952        } = message;
953
954        relay_log::trace!(
955            "sending {} client report(s) to project id {}",
956            client_reports.len(),
957            scoping.project_id
958        );
959
960        if client_reports.is_empty() {
961            return;
962        }
963
964        let upstream = self.inner.config.upstream();
965        let dsn = PartialDsn::outbound(&scoping, upstream);
966
967        let mut envelope = Envelope::from_request(None, RequestMeta::outbound(dsn));
968        for client_report in client_reports {
969            match client_report.serialize() {
970                Ok(payload) => {
971                    let mut item = Item::new(ItemType::ClientReport);
972                    item.set_payload(ContentType::Json, payload);
973                    envelope.add_item(item);
974                }
975                Err(error) => {
976                    relay_log::error!(
977                        error = &error as &dyn std::error::Error,
978                        "failed to serialize client report"
979                    );
980                }
981            }
982        }
983
984        let envelope = ManagedEnvelope::new(envelope, self.inner.addrs.outcome_aggregator.clone());
985        self.submit_envelope_upstream(envelope, None);
986    }
987
988    fn check_buckets(
989        &self,
990        project_key: ProjectKey,
991        project_info: &ProjectInfo,
992        rate_limits: &RateLimits,
993        buckets: Vec<Bucket>,
994    ) -> Vec<Bucket> {
995        let Some(scoping) = project_info.scoping(project_key) else {
996            relay_log::error!(
997                tags.project_key = project_key.as_str(),
998                "there is no scoping: dropping {} buckets",
999                buckets.len(),
1000            );
1001            return Vec::new();
1002        };
1003
1004        let mut buckets = self::metrics::apply_project_info(
1005            buckets,
1006            &self.inner.metric_outcomes,
1007            project_info,
1008            scoping,
1009        );
1010
1011        let mut namespaces: BTreeSet<MetricNamespace> = buckets
1012            .iter()
1013            .filter_map(|bucket| bucket.name.try_namespace())
1014            .collect();
1015
1016        // Never rate limit outcomes.
1017        namespaces.remove(&MetricNamespace::Outcomes);
1018
1019        for namespace in namespaces {
1020            let limits = rate_limits
1021                .check_with_quotas(project_info.get_quotas(), scoping.metric_bucket(namespace));
1022
1023            if limits.is_limited() {
1024                let rejected;
1025                (buckets, rejected) = utils::split_off(buckets, |bucket| {
1026                    bucket.name.try_namespace() == Some(namespace)
1027                });
1028
1029                let reason_code = limits.longest().and_then(|limit| limit.reason_code.clone());
1030                self.inner.metric_outcomes.track(
1031                    scoping,
1032                    &rejected,
1033                    Outcome::RateLimited(reason_code),
1034                );
1035            }
1036        }
1037
1038        let quotas = project_info.config.quotas.clone();
1039        match MetricsLimiter::create(buckets, quotas, scoping) {
1040            Ok(mut bucket_limiter) => {
1041                bucket_limiter.enforce_limits(rate_limits, &self.inner.metric_outcomes);
1042                bucket_limiter.into_buckets()
1043            }
1044            Err(buckets) => buckets,
1045        }
1046    }
1047
1048    #[cfg(feature = "processing")]
1049    async fn rate_limit_buckets(
1050        &self,
1051        scoping: Scoping,
1052        project_info: &ProjectInfo,
1053        mut buckets: Vec<Bucket>,
1054    ) -> Vec<Bucket> {
1055        let Some(rate_limiter) = &self.inner.rate_limiter else {
1056            return buckets;
1057        };
1058
1059        let global_config = self.inner.global_config.current().unwrap_or_default();
1060        let mut namespaces = buckets
1061            .iter()
1062            .filter_map(|bucket| bucket.name.try_namespace())
1063            .counts();
1064
1065        // Never rate limit outcomes.
1066        namespaces.remove(&MetricNamespace::Outcomes);
1067
1068        let quotas = CombinedQuotas::new(&global_config, project_info.get_quotas());
1069
1070        for (namespace, quantity) in namespaces {
1071            let item_scoping = scoping.metric_bucket(namespace);
1072
1073            let limits = match rate_limiter
1074                .is_rate_limited(quotas, item_scoping, quantity, false)
1075                .await
1076            {
1077                Ok(limits) => limits,
1078                Err(err) => {
1079                    relay_log::error!(
1080                        error = &err as &dyn std::error::Error,
1081                        "failed to check redis rate limits"
1082                    );
1083                    break;
1084                }
1085            };
1086
1087            if limits.is_limited() {
1088                let rejected;
1089                (buckets, rejected) = utils::split_off(buckets, |bucket| {
1090                    bucket.name.try_namespace() == Some(namespace)
1091                });
1092
1093                let reason_code = limits.longest().and_then(|limit| limit.reason_code.clone());
1094                self.inner.metric_outcomes.track(
1095                    scoping,
1096                    &rejected,
1097                    Outcome::RateLimited(reason_code),
1098                );
1099
1100                self.inner
1101                    .project_cache
1102                    .get(item_scoping.scoping.project_key)
1103                    .rate_limits()
1104                    .merge(limits);
1105            }
1106        }
1107
1108        match MetricsLimiter::create(buckets, project_info.config.quotas.clone(), scoping) {
1109            Err(buckets) => buckets,
1110            Ok(bucket_limiter) => self.apply_other_rate_limits(bucket_limiter).await,
1111        }
1112    }
1113
1114    /// Check and apply rate limits to metrics buckets for transactions and spans.
1115    #[cfg(feature = "processing")]
1116    async fn apply_other_rate_limits(&self, mut bucket_limiter: MetricsLimiter) -> Vec<Bucket> {
1117        relay_log::trace!("handle_rate_limit_buckets");
1118
1119        let scoping = *bucket_limiter.scoping();
1120
1121        if let Some(rate_limiter) = self.inner.rate_limiter.as_ref() {
1122            let global_config = self.inner.global_config.current().unwrap_or_default();
1123            let quotas = CombinedQuotas::new(&global_config, bucket_limiter.quotas());
1124
1125            // We set over_accept_once such that the limit is actually reached, which allows subsequent
1126            // calls with quantity=0 to be rate limited.
1127            let over_accept_once = true;
1128            let mut rate_limits = RateLimits::new();
1129
1130            let (category, count) = bucket_limiter.count();
1131
1132            let timer = Instant::now();
1133            let mut is_limited = false;
1134
1135            if let Some(count) = count {
1136                match rate_limiter
1137                    .is_rate_limited(quotas, scoping.item(category), count, over_accept_once)
1138                    .await
1139                {
1140                    Ok(limits) => {
1141                        is_limited = limits.is_limited();
1142                        rate_limits.merge(limits)
1143                    }
1144                    Err(e) => {
1145                        relay_log::error!(error = &e as &dyn Error, "rate limiting error")
1146                    }
1147                }
1148            }
1149
1150            relay_statsd::metric!(
1151                timer(RelayTimers::RateLimitBucketsDuration) = timer.elapsed(),
1152                category = category.name(),
1153                limited = if is_limited { "true" } else { "false" },
1154                count = match count {
1155                    None => "none",
1156                    Some(0) => "0",
1157                    Some(1) => "1",
1158                    Some(1..=10) => "10",
1159                    Some(1..=25) => "25",
1160                    Some(1..=50) => "50",
1161                    Some(51..=100) => "100",
1162                    Some(101..=500) => "500",
1163                    _ => "> 500",
1164                },
1165            );
1166
1167            if rate_limits.is_limited() {
1168                let was_enforced =
1169                    bucket_limiter.enforce_limits(&rate_limits, &self.inner.metric_outcomes);
1170
1171                if was_enforced {
1172                    // Update the rate limits in the project cache.
1173                    self.inner
1174                        .project_cache
1175                        .get(scoping.project_key)
1176                        .rate_limits()
1177                        .merge(rate_limits);
1178                }
1179            }
1180        }
1181
1182        bucket_limiter.into_buckets()
1183    }
1184
1185    /// Processes metric buckets and sends them to Kafka.
1186    ///
1187    /// This function runs the following steps:
1188    ///  - rate limiting
1189    ///  - emit billing outcomes
1190    ///  - submit to `StoreForwarder`
1191    #[cfg(feature = "processing")]
1192    async fn encode_metrics_processing(
1193        &self,
1194        message: FlushBuckets,
1195        store_forwarder: &Addr<Store>,
1196    ) {
1197        use crate::constants::DEFAULT_EVENT_RETENTION;
1198        use crate::services::store::StoreMetrics;
1199        use relay_dynamic_config::Feature;
1200
1201        for ProjectBuckets {
1202            buckets,
1203            scoping,
1204            project_info,
1205            ..
1206        } in message.buckets.into_values()
1207        {
1208            let mut buckets = self
1209                .rate_limit_buckets(scoping, &project_info, buckets)
1210                .await;
1211
1212            if buckets.is_empty() {
1213                continue;
1214            }
1215
1216            if project_info
1217                .config
1218                .features
1219                .has(Feature::GenerateBillingOutcome)
1220            {
1221                // Emit metric billing outcomes.
1222                self.inner
1223                    .metric_outcomes
1224                    .track_accepted_outcome(scoping, &mut buckets);
1225            }
1226
1227            let retention = project_info
1228                .config
1229                .event_retention
1230                .unwrap_or(DEFAULT_EVENT_RETENTION);
1231
1232            // The store forwarder takes care of bucket splitting internally, so we can submit the
1233            // entire list of buckets. There is no batching needed here.
1234            store_forwarder.send(StoreMetrics {
1235                buckets,
1236                scoping,
1237                retention,
1238            });
1239        }
1240    }
1241
1242    /// Serializes metric buckets to JSON and sends them to the upstream.
1243    ///
1244    /// This function runs the following steps:
1245    ///  - partitioning
1246    ///  - batching by configured size limit
1247    ///  - serialize to JSON and pack in an envelope
1248    ///
1249    /// Rate limiting runs only in processing Relays as it requires access to the central Redis instance.
1250    /// Cached rate limits are applied in the project cache already.
1251    fn encode_metrics_envelope(&self, message: FlushBuckets) {
1252        let FlushBuckets {
1253            partition_key,
1254            buckets,
1255        } = message;
1256
1257        let batch_size = self.inner.config.metrics_max_batch_size_bytes();
1258        let upstream = self.inner.config.upstream();
1259
1260        for ProjectBuckets {
1261            buckets,
1262            scoping,
1263            project_info,
1264            ..
1265        } in buckets.values()
1266        {
1267            let dsn = PartialDsn::outbound(scoping, upstream);
1268
1269            relay_statsd::metric!(
1270                distribution(RelayDistributions::PartitionKeys) = u64::from(partition_key)
1271            );
1272
1273            let mut num_batches = 0;
1274            for batch in BucketsView::from(buckets).by_size(batch_size) {
1275                let mut envelope = Envelope::from_request(None, RequestMeta::outbound(dsn.clone()));
1276
1277                let mut item = Item::new(ItemType::MetricBuckets);
1278                item.set_source_quantities(crate::metrics::extract_quantities(batch));
1279                item.set_payload(ContentType::Json, serde_json::to_vec(&buckets).unwrap());
1280                envelope.add_item(item);
1281
1282                let mut envelope =
1283                    ManagedEnvelope::new(envelope, self.inner.addrs.outcome_aggregator.clone());
1284                envelope
1285                    .set_partition_key(Some(partition_key))
1286                    .scope(*scoping);
1287
1288                relay_statsd::metric!(
1289                    distribution(RelayDistributions::BucketsPerBatch) = batch.len() as u64
1290                );
1291
1292                self.submit_envelope_upstream(envelope, project_info.upstream.clone());
1293                num_batches += 1;
1294            }
1295
1296            relay_statsd::metric!(
1297                distribution(RelayDistributions::BatchesPerPartition) = num_batches
1298            );
1299        }
1300    }
1301
1302    /// Creates a [`SendMetricsRequest`] and sends it to the upstream relay.
1303    fn send_global_partition(
1304        &self,
1305        upstream: Option<UpstreamDescriptor>,
1306        partition_key: u32,
1307        partition: &mut Partition<'_>,
1308    ) {
1309        if partition.is_empty() {
1310            return;
1311        }
1312
1313        let (unencoded, project_info) = partition.take();
1314        let http_encoding = self.inner.config.http_encoding();
1315        let encoded = match encode_payload(&unencoded, http_encoding) {
1316            Ok(payload) => payload,
1317            Err(error) => {
1318                let error = &error as &dyn std::error::Error;
1319                relay_log::error!(error, "failed to encode metrics payload");
1320                return;
1321            }
1322        };
1323
1324        let request = SendMetricsRequest {
1325            upstream,
1326            partition_key: partition_key.to_string(),
1327            unencoded,
1328            encoded,
1329            project_info,
1330            http_encoding,
1331            metric_outcomes: self.inner.metric_outcomes.clone(),
1332        };
1333
1334        self.inner.addrs.upstream_relay.send(SendRequest(request));
1335    }
1336
1337    /// Serializes metric buckets to JSON and sends them to the upstream via the global endpoint.
1338    ///
1339    /// This function is similar to [`Self::encode_metrics_envelope`], but sends a global batched
1340    /// payload directly instead of per-project Envelopes.
1341    ///
1342    /// This function runs the following steps:
1343    ///  - partitioning
1344    ///  - batching by configured size limit
1345    ///  - serialize to JSON
1346    ///  - submit directly to the upstream
1347    fn encode_metrics_global(&self, message: FlushBuckets) {
1348        let FlushBuckets {
1349            partition_key,
1350            buckets,
1351        } = message;
1352
1353        let batch_size = self.inner.config.metrics_max_batch_size_bytes();
1354        let mut partitions = BTreeMap::new();
1355        let mut partition_splits = 0;
1356
1357        for ProjectBuckets {
1358            buckets,
1359            scoping,
1360            project_info,
1361            ..
1362        } in buckets.values()
1363        {
1364            let partition = match partitions.get_mut(&project_info.upstream) {
1365                Some(partition) => partition,
1366                None => partitions
1367                    .entry(project_info.upstream.clone())
1368                    .or_insert_with(|| Partition::new(batch_size)),
1369            };
1370
1371            for bucket in buckets {
1372                let mut remaining = Some(BucketView::new(bucket));
1373
1374                while let Some(bucket) = remaining.take() {
1375                    if let Some(next) = partition.insert(bucket, *scoping) {
1376                        // A part of the bucket could not be inserted. Take the partition and submit
1377                        // it immediately. Repeat until the final part was inserted. This should
1378                        // always result in a request, otherwise we would enter an endless loop.
1379                        self.send_global_partition(
1380                            project_info.upstream.clone(),
1381                            partition_key,
1382                            partition,
1383                        );
1384                        remaining = Some(next);
1385                        partition_splits += 1;
1386                    }
1387                }
1388            }
1389        }
1390
1391        if partition_splits > 0 {
1392            metric!(distribution(RelayDistributions::PartitionSplits) = partition_splits);
1393        }
1394
1395        for (upstream, mut partition) in partitions {
1396            self.send_global_partition(upstream, partition_key, &mut partition);
1397        }
1398    }
1399
1400    /// Removes all outcome metrics from `message` and sends them as client reports.
1401    ///
1402    /// Returns a new [`FlushBuckets`] message, without any outcome metrics remaining.
1403    fn encode_metrics_client_reports(&self, mut message: FlushBuckets) -> FlushBuckets {
1404        for ProjectBuckets {
1405            buckets, scoping, ..
1406        } in message.buckets.values_mut()
1407        {
1408            let client_reports = outcome::metric::extract_client_reports(buckets).collect();
1409
1410            self.handle_submit_client_reports(SubmitClientReports {
1411                client_reports,
1412                scoping: *scoping,
1413            });
1414        }
1415
1416        message
1417    }
1418
1419    async fn handle_flush_buckets(&self, mut message: FlushBuckets) {
1420        for (project_key, pb) in message.buckets.iter_mut() {
1421            let buckets = std::mem::take(&mut pb.buckets);
1422            pb.buckets =
1423                self.check_buckets(*project_key, &pb.project_info, &pb.rate_limits, buckets);
1424        }
1425
1426        #[cfg(feature = "processing")]
1427        if self.inner.config.processing_enabled()
1428            && let Some(ref store_forwarder) = self.inner.addrs.store_forwarder
1429        {
1430            return self
1431                .encode_metrics_processing(message, store_forwarder)
1432                .await;
1433        }
1434
1435        // Processing Relays never send outcomes as client reports, which is why this check is after
1436        // the processing check.
1437        if self.inner.config.emit_outcomes() == EmitOutcomes::AsClientReports {
1438            // Remove client reports from metrics to be sent, if configured as client reports
1439            // and send them separately.
1440            message = self.encode_metrics_client_reports(message);
1441        }
1442
1443        if self.inner.config.http_global_metrics() {
1444            self.encode_metrics_global(message)
1445        } else {
1446            self.encode_metrics_envelope(message)
1447        }
1448    }
1449
1450    #[cfg(all(test, feature = "processing"))]
1451    fn redis_rate_limiter_enabled(&self) -> bool {
1452        self.inner.rate_limiter.is_some()
1453    }
1454
1455    async fn handle_message(self, message: EnvelopeProcessor) {
1456        let ty = message.variant();
1457        let feature_weights = self.feature_weights(&message);
1458
1459        metric!(timer(RelayTimers::ProcessMessageDuration), message = ty, {
1460            let mut cogs = self.inner.cogs.timed(ResourceId::Relay, feature_weights);
1461
1462            match message {
1463                EnvelopeProcessor::ProcessEnvelope(m) => {
1464                    self.handle_process_envelope(&mut cogs, *m).await
1465                }
1466                EnvelopeProcessor::ProcessProjectMetrics(m) => {
1467                    self.handle_process_metrics(&mut cogs, *m)
1468                }
1469                EnvelopeProcessor::ProcessBatchedMetrics(m) => {
1470                    self.handle_process_batched_metrics(&mut cogs, *m)
1471                }
1472                EnvelopeProcessor::FlushBuckets(m) => self.handle_flush_buckets(*m).await,
1473                EnvelopeProcessor::SubmitClientReports(m) => self.handle_submit_client_reports(*m),
1474            }
1475        });
1476    }
1477
1478    fn feature_weights(&self, message: &EnvelopeProcessor) -> FeatureWeights {
1479        match message {
1480            // Envelope is split later and tokens are attributed then.
1481            EnvelopeProcessor::ProcessEnvelope(_) => AppFeature::Unattributed.into(),
1482            EnvelopeProcessor::ProcessProjectMetrics(_) => AppFeature::Unattributed.into(),
1483            EnvelopeProcessor::ProcessBatchedMetrics(_) => AppFeature::Unattributed.into(),
1484            EnvelopeProcessor::FlushBuckets(v) => v
1485                .buckets
1486                .values()
1487                .map(|s| {
1488                    if self.inner.config.processing_enabled() {
1489                        // Processing does not encode the metrics but instead rate limit the metrics,
1490                        // which scales by count and not size.
1491                        relay_metrics::cogs::ByCount(&s.buckets).into()
1492                    } else {
1493                        relay_metrics::cogs::BySize(&s.buckets).into()
1494                    }
1495                })
1496                .fold(FeatureWeights::none(), FeatureWeights::merge),
1497            EnvelopeProcessor::SubmitClientReports(_) => AppFeature::ClientReports.into(),
1498        }
1499    }
1500}
1501
1502impl Service for EnvelopeProcessorService {
1503    type Interface = EnvelopeProcessor;
1504
1505    async fn run(self, mut rx: relay_system::Receiver<Self::Interface>) {
1506        while let Some(message) = rx.recv().await {
1507            let service = self.clone();
1508            // Create a new hub to prevent sentry scopes from bleeding to other tasks.
1509            let hub = relay_log::Hub::with(|h| relay_log::Hub::new_from_top(h));
1510
1511            self.inner
1512                .pool
1513                .spawn_async(Box::pin(service.handle_message(message).bind_hub(hub)))
1514                .await;
1515        }
1516    }
1517}
1518
1519pub fn encode_payload(body: &Bytes, http_encoding: HttpEncoding) -> Result<Bytes, std::io::Error> {
1520    let envelope_body: Vec<u8> = match http_encoding {
1521        HttpEncoding::Identity => return Ok(body.clone()),
1522        HttpEncoding::Deflate => {
1523            let mut encoder = ZlibEncoder::new(Vec::new(), Compression::default());
1524            encoder.write_all(body.as_ref())?;
1525            encoder.finish()?
1526        }
1527        HttpEncoding::Gzip => {
1528            let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
1529            encoder.write_all(body.as_ref())?;
1530            encoder.finish()?
1531        }
1532        HttpEncoding::Br => {
1533            // Use default buffer size (via 0), medium quality (5), and the default lgwin (22).
1534            let mut encoder = BrotliEncoder::new(Vec::new(), 0, 5, 22);
1535            encoder.write_all(body.as_ref())?;
1536            encoder.into_inner()
1537        }
1538        HttpEncoding::Zstd => {
1539            // Use the fastest compression level, our main objective here is to get the best
1540            // compression ratio for least amount of time spent.
1541            let mut encoder = ZstdEncoder::new(Vec::new(), 1)?;
1542            encoder.write_all(body.as_ref())?;
1543            encoder.finish()?
1544        }
1545    };
1546
1547    Ok(envelope_body.into())
1548}
1549
1550/// An upstream request that submits an envelope via HTTP.
1551#[derive(Debug)]
1552pub struct SendEnvelope {
1553    pub upstream: Option<UpstreamDescriptor>,
1554    pub envelope: ManagedEnvelope,
1555    pub body: Bytes,
1556    pub http_encoding: HttpEncoding,
1557    pub project_cache: ProjectCacheHandle,
1558}
1559
1560impl UpstreamRequest for SendEnvelope {
1561    fn upstream(&self) -> Option<&UpstreamDescriptor> {
1562        self.upstream.as_ref()
1563    }
1564
1565    fn method(&self) -> reqwest::Method {
1566        reqwest::Method::POST
1567    }
1568
1569    fn path(&self) -> Cow<'_, str> {
1570        format!("/api/{}/envelope/", self.envelope.scoping().project_id).into()
1571    }
1572
1573    fn route(&self) -> &'static str {
1574        "envelope"
1575    }
1576
1577    fn build(&mut self, builder: &mut http::RequestBuilder) -> Result<(), http::HttpError> {
1578        let envelope_body = self.body.clone();
1579        metric!(
1580            distribution(RelayDistributions::UpstreamEnvelopeBodySize) = envelope_body.len() as u64
1581        );
1582
1583        let meta = &self.envelope.meta();
1584        let shard = self.envelope.partition_key().map(|p| p.to_string());
1585        builder
1586            .content_encoding(self.http_encoding)
1587            .header_opt("Origin", meta.origin().map(|url| url.as_str()))
1588            .header_opt("User-Agent", meta.user_agent())
1589            .header("X-Sentry-Auth", meta.auth_header())
1590            .header("X-Forwarded-For", meta.forwarded_for())
1591            .header("Content-Type", envelope::CONTENT_TYPE)
1592            .header_opt("X-Sentry-Relay-Shard", shard)
1593            .body(envelope_body);
1594
1595        Ok(())
1596    }
1597
1598    fn sign(&mut self) -> Option<Sign> {
1599        Some(Sign::Optional(SignatureType::RequestSign))
1600    }
1601
1602    fn respond(
1603        self: Box<Self>,
1604        result: Result<http::Response, UpstreamRequestError>,
1605    ) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> {
1606        Box::pin(async move {
1607            let result = match result {
1608                Ok(mut response) => response.consume().await.map_err(UpstreamRequestError::Http),
1609                Err(error) => Err(error),
1610            };
1611
1612            match result {
1613                Ok(()) => self.envelope.accept(),
1614                Err(error) if error.is_received() => {
1615                    let scoping = self.envelope.scoping();
1616                    self.envelope.accept();
1617
1618                    if let UpstreamRequestError::RateLimited(limits) = error {
1619                        self.project_cache
1620                            .get(scoping.project_key)
1621                            .rate_limits()
1622                            .merge(limits.scope(&scoping));
1623                    }
1624                }
1625                Err(error) => {
1626                    // Errors are only logged for what we consider an internal discard reason. These
1627                    // indicate errors in the infrastructure or implementation bugs.
1628                    let mut envelope = self.envelope;
1629                    envelope.reject(Outcome::Invalid(DiscardReason::Internal));
1630                    relay_log::error!(
1631                        error = &error as &dyn Error,
1632                        tags.project_key = %envelope.scoping().project_key,
1633                        "error sending envelope"
1634                    );
1635                }
1636            }
1637        })
1638    }
1639}
1640
1641/// A container for metric buckets from multiple projects.
1642///
1643/// This container is used to send metrics to the upstream in global batches as part of the
1644/// [`FlushBuckets`] message if the `http.global_metrics` option is enabled. The container monitors
1645/// the size of all metrics and allows to split them into multiple batches. See
1646/// [`insert`](Self::insert) for more information.
1647#[derive(Debug)]
1648struct Partition<'a> {
1649    max_size: usize,
1650    remaining: usize,
1651    views: HashMap<ProjectKey, Vec<BucketView<'a>>>,
1652    project_info: HashMap<ProjectKey, Scoping>,
1653}
1654
1655impl<'a> Partition<'a> {
1656    /// Creates a new partition with the given maximum size in bytes.
1657    pub fn new(size: usize) -> Self {
1658        Self {
1659            max_size: size,
1660            remaining: size,
1661            views: HashMap::new(),
1662            project_info: HashMap::new(),
1663        }
1664    }
1665
1666    /// Inserts a bucket into the partition, splitting it if necessary.
1667    ///
1668    /// This function attempts to add the bucket to this partition. If the bucket does not fit
1669    /// entirely into the partition given its maximum size, the remaining part of the bucket is
1670    /// returned from this function call.
1671    ///
1672    /// If this function returns `Some(_)`, the partition is full and should be submitted to the
1673    /// upstream immediately. Use [`Self::take`] to retrieve the contents of the
1674    /// partition. Afterwards, the caller is responsible to call this function again with the
1675    /// remaining bucket until it is fully inserted.
1676    pub fn insert(&mut self, bucket: BucketView<'a>, scoping: Scoping) -> Option<BucketView<'a>> {
1677        let (current, next) = bucket.split(self.remaining, Some(self.max_size));
1678
1679        if let Some(current) = current {
1680            self.remaining = self.remaining.saturating_sub(current.estimated_size());
1681            self.views
1682                .entry(scoping.project_key)
1683                .or_default()
1684                .push(current);
1685
1686            self.project_info
1687                .entry(scoping.project_key)
1688                .or_insert(scoping);
1689        }
1690
1691        next
1692    }
1693
1694    /// Returns `true` if the partition does not hold any data.
1695    fn is_empty(&self) -> bool {
1696        self.views.is_empty()
1697    }
1698
1699    /// Returns the serialized buckets for this partition.
1700    ///
1701    /// This empties the partition, so that it can be reused.
1702    fn take(&mut self) -> (Bytes, HashMap<ProjectKey, Scoping>) {
1703        #[derive(serde::Serialize)]
1704        struct Wrapper<'a> {
1705            buckets: &'a HashMap<ProjectKey, Vec<BucketView<'a>>>,
1706        }
1707
1708        let buckets = &self.views;
1709        let payload = serde_json::to_vec(&Wrapper { buckets }).unwrap().into();
1710
1711        let scopings = std::mem::take(&mut self.project_info);
1712
1713        self.views.clear();
1714        self.remaining = self.max_size;
1715
1716        (payload, scopings)
1717    }
1718}
1719
1720/// An upstream request that submits metric buckets via HTTP.
1721///
1722/// This request is not awaited. It automatically tracks outcomes if the request is not received.
1723#[derive(Debug)]
1724struct SendMetricsRequest {
1725    /// Optional upstream override where the request will be sent to.
1726    upstream: Option<UpstreamDescriptor>,
1727    /// If the partition key is set, the request is marked with `X-Sentry-Relay-Shard`.
1728    partition_key: String,
1729    /// Serialized metric buckets without encoding applied, used for signing.
1730    unencoded: Bytes,
1731    /// Serialized metric buckets with the stated HTTP encoding applied.
1732    encoded: Bytes,
1733    /// Mapping of all contained project keys to their scoping and extraction mode.
1734    ///
1735    /// Used to track outcomes for transmission failures.
1736    project_info: HashMap<ProjectKey, Scoping>,
1737    /// Encoding (compression) of the payload.
1738    http_encoding: HttpEncoding,
1739    /// Metric outcomes instance to send outcomes on error.
1740    metric_outcomes: MetricOutcomes,
1741}
1742
1743impl SendMetricsRequest {
1744    fn create_error_outcomes(self) {
1745        #[derive(serde::Deserialize)]
1746        struct Wrapper {
1747            buckets: HashMap<ProjectKey, Vec<MinimalTrackableBucket>>,
1748        }
1749
1750        let buckets = match serde_json::from_slice(&self.unencoded) {
1751            Ok(Wrapper { buckets }) => buckets,
1752            Err(err) => {
1753                relay_log::error!(
1754                    error = &err as &dyn std::error::Error,
1755                    "failed to parse buckets from failed transmission"
1756                );
1757                return;
1758            }
1759        };
1760
1761        for (key, buckets) in buckets {
1762            let Some(&scoping) = self.project_info.get(&key) else {
1763                relay_log::error!("missing scoping for project key");
1764                continue;
1765            };
1766
1767            self.metric_outcomes.track(
1768                scoping,
1769                &buckets,
1770                Outcome::Invalid(DiscardReason::Internal),
1771            );
1772        }
1773    }
1774}
1775
1776impl UpstreamRequest for SendMetricsRequest {
1777    fn upstream(&self) -> Option<&UpstreamDescriptor> {
1778        self.upstream.as_ref()
1779    }
1780
1781    fn set_relay_id(&self) -> bool {
1782        true
1783    }
1784
1785    fn sign(&mut self) -> Option<Sign> {
1786        Some(Sign::Required(SignatureType::Body(self.unencoded.clone())))
1787    }
1788
1789    fn method(&self) -> reqwest::Method {
1790        reqwest::Method::POST
1791    }
1792
1793    fn path(&self) -> Cow<'_, str> {
1794        "/api/0/relays/metrics/".into()
1795    }
1796
1797    fn route(&self) -> &'static str {
1798        "global_metrics"
1799    }
1800
1801    fn build(&mut self, builder: &mut http::RequestBuilder) -> Result<(), http::HttpError> {
1802        metric!(
1803            distribution(RelayDistributions::UpstreamMetricsBodySize) = self.encoded.len() as u64
1804        );
1805
1806        builder
1807            .content_encoding(self.http_encoding)
1808            .header("X-Sentry-Relay-Shard", self.partition_key.as_bytes())
1809            .header(header::CONTENT_TYPE, b"application/json")
1810            .body(self.encoded.clone());
1811
1812        Ok(())
1813    }
1814
1815    fn respond(
1816        self: Box<Self>,
1817        result: Result<http::Response, UpstreamRequestError>,
1818    ) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> {
1819        Box::pin(async {
1820            match result {
1821                Ok(mut response) => {
1822                    response.consume().await.ok();
1823                }
1824                Err(error) => {
1825                    relay_log::error!(error = &error as &dyn Error, "Failed to send metrics batch");
1826
1827                    // If the request did not arrive at the upstream, we are responsible for outcomes.
1828                    // Otherwise, the upstream is responsible to log outcomes.
1829                    if error.is_received() {
1830                        return;
1831                    }
1832
1833                    self.create_error_outcomes()
1834                }
1835            }
1836        })
1837    }
1838}
1839
/// Borrowed view over two [`Quota`] lists: the global ones and the project level ones.
#[derive(Copy, Clone, Debug)]
#[cfg(feature = "processing")]
struct CombinedQuotas<'a> {
    /// Quotas taken from the global config.
    global_quotas: &'a [Quota],
    /// Quotas passed in for the project.
    project_quotas: &'a [Quota],
}

#[cfg(feature = "processing")]
impl<'a> CombinedQuotas<'a> {
    /// Returns a new [`CombinedQuotas`] borrowing the quotas of `global_config` and the given
    /// `project_quotas`.
    pub fn new(global_config: &'a GlobalConfig, project_quotas: &'a [Quota]) -> Self {
        let global_quotas = global_config.quotas.as_slice();

        Self {
            global_quotas,
            project_quotas,
        }
    }
}
1858
#[cfg(feature = "processing")]
impl<'a> IntoIterator for CombinedQuotas<'a> {
    type Item = &'a Quota;
    type IntoIter = std::iter::Chain<std::slice::Iter<'a, Quota>, std::slice::Iter<'a, Quota>>;

    /// Iterates over all global quotas first, followed by all project quotas.
    fn into_iter(self) -> Self::IntoIter {
        let Self {
            global_quotas,
            project_quotas,
        } = self;

        global_quotas.iter().chain(project_quotas)
    }
}
1868
1869#[cfg(test)]
1870mod tests {
1871    use insta::assert_debug_snapshot;
1872    use relay_common::glob2::LazyGlob;
1873    use relay_dynamic_config::ProjectConfig;
1874    use relay_event_normalization::{
1875        NormalizationConfig, RedactionRule, TransactionNameConfig, TransactionNameRule,
1876    };
1877    use relay_event_schema::protocol::{Event, EventId, TransactionSource};
1878    use relay_pii::DataScrubbingConfig;
1879    use relay_protocol::Annotated;
1880    #[cfg(feature = "processing")]
1881    use relay_quotas::DataCategory;
1882    use similar_asserts::assert_eq;
1883
1884    use crate::testutils::{create_test_processor, create_test_processor_with_addrs};
1885
1886    #[cfg(feature = "processing")]
1887    use {
1888        relay_metrics::BucketValue,
1889        relay_quotas::{QuotaScope, ReasonCode},
1890        relay_test::mock_service,
1891    };
1892
1893    use super::*;
1894
1895    async fn process_to_single_envelope<'a>(
1896        processor: &EnvelopeProcessorService,
1897        envelope: ManagedEnvelope,
1898        ctx: processing::Context<'a>,
1899    ) -> Box<Envelope> {
1900        let mut outputs = processor.process(envelope, ctx).await;
1901        assert_eq!(outputs.len(), 1);
1902
1903        let Output { main, metrics } = outputs.pop().unwrap();
1904
1905        if let Some(metrics) = metrics {
1906            metrics.accept(drop);
1907        }
1908
1909        main.unwrap()
1910            .serialize_envelope(ctx.to_forward())
1911            .unwrap()
1912            .accept(|envelope| envelope)
1913    }
1914
1915    #[cfg(feature = "processing")]
1916    fn mock_quota(id: &str) -> Quota {
1917        Quota {
1918            id: Some(id.into()),
1919            categories: [DataCategory::MetricBucket].into(),
1920            scope: QuotaScope::Organization,
1921            scope_id: None,
1922            limit: Some(0),
1923            window: None,
1924            reason_code: None,
1925            namespace: None,
1926        }
1927    }
1928
1929    #[cfg(feature = "processing")]
1930    #[test]
1931    fn test_dynamic_quotas() {
1932        let global_config = relay_dynamic_config::GlobalConfig {
1933            quotas: vec![mock_quota("foo"), mock_quota("bar")],
1934            ..Default::default()
1935        };
1936
1937        let project_quotas = vec![mock_quota("baz"), mock_quota("qux")];
1938
1939        let dynamic_quotas = CombinedQuotas::new(&global_config, &project_quotas);
1940
1941        let quota_ids = dynamic_quotas.into_iter().filter_map(|q| q.id.as_deref());
1942        assert!(quota_ids.eq(["foo", "bar", "baz", "qux"]));
1943    }
1944
1945    /// Ensures that if we ratelimit one batch of buckets in [`FlushBuckets`] message, it won't
1946    /// also ratelimit the next batches in the same message automatically.
1947    #[cfg(feature = "processing")]
1948    #[tokio::test]
1949    async fn test_ratelimit_per_batch() {
1950        use relay_base_schema::organization::OrganizationId;
1951        use relay_protocol::FiniteF64;
1952
1953        let rate_limited_org = Scoping {
1954            organization_id: OrganizationId::new(1),
1955            project_id: ProjectId::new(21),
1956            project_key: ProjectKey::parse("00000000000000000000000000000000").unwrap(),
1957            key_id: Some(17),
1958        };
1959
1960        let not_rate_limited_org = Scoping {
1961            organization_id: OrganizationId::new(2),
1962            project_id: ProjectId::new(21),
1963            project_key: ProjectKey::parse("11111111111111111111111111111111").unwrap(),
1964            key_id: Some(17),
1965        };
1966
1967        let message = {
1968            let project_info = {
1969                let quota = Quota {
1970                    id: Some("testing".into()),
1971                    categories: [DataCategory::MetricBucket].into(),
1972                    scope: relay_quotas::QuotaScope::Organization,
1973                    scope_id: Some(rate_limited_org.organization_id.to_string().into()),
1974                    limit: Some(0),
1975                    window: None,
1976                    reason_code: Some(ReasonCode::new("test")),
1977                    namespace: None,
1978                };
1979
1980                let mut config = ProjectConfig::default();
1981                config.quotas.push(quota);
1982
1983                Arc::new(ProjectInfo {
1984                    config,
1985                    ..Default::default()
1986                })
1987            };
1988
1989            let project_metrics = |scoping| ProjectBuckets {
1990                buckets: vec![Bucket {
1991                    name: "d:spans/bar".into(),
1992                    value: BucketValue::Counter(FiniteF64::new(1.0).unwrap()),
1993                    timestamp: UnixTimestamp::now(),
1994                    tags: Default::default(),
1995                    width: 10,
1996                    metadata: BucketMetadata::default(),
1997                }],
1998                rate_limits: Default::default(),
1999                project_info: project_info.clone(),
2000                scoping,
2001            };
2002
2003            let buckets = hashbrown::HashMap::from([
2004                (
2005                    rate_limited_org.project_key,
2006                    project_metrics(rate_limited_org),
2007                ),
2008                (
2009                    not_rate_limited_org.project_key,
2010                    project_metrics(not_rate_limited_org),
2011                ),
2012            ]);
2013
2014            FlushBuckets {
2015                partition_key: 0,
2016                buckets,
2017            }
2018        };
2019
2020        // ensure the order of the map while iterating is as expected.
2021        assert_eq!(message.buckets.keys().count(), 2);
2022
2023        let config = {
2024            let config_json = serde_json::json!({
2025                "processing": {
2026                    "enabled": true,
2027                    "kafka_config": [],
2028                    "redis": {
2029                        "server": std::env::var("RELAY_REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_owned()),
2030                    }
2031                }
2032            });
2033            Config::from_json_value(config_json).unwrap()
2034        };
2035
2036        let (store, handle) = {
2037            let f = |org_ids: &mut Vec<OrganizationId>, msg: Store| {
2038                let org_id = match msg {
2039                    Store::Metrics(x) => x.scoping.organization_id,
2040                    _ => panic!("received envelope when expecting only metrics"),
2041                };
2042                org_ids.push(org_id);
2043            };
2044
2045            mock_service("store_forwarder", vec![], f)
2046        };
2047
2048        let processor = create_test_processor(config).await;
2049        assert!(processor.redis_rate_limiter_enabled());
2050
2051        processor.encode_metrics_processing(message, &store).await;
2052
2053        drop(store);
2054        let orgs_not_ratelimited = handle.await.unwrap();
2055
2056        assert_eq!(
2057            orgs_not_ratelimited,
2058            vec![not_rate_limited_org.organization_id]
2059        );
2060    }
2061
2062    #[tokio::test]
2063    async fn test_browser_version_extraction_with_pii_like_data() {
2064        let processor = create_test_processor(Default::default()).await;
2065        let outcome_aggregator = Addr::dummy();
2066        let event_id = EventId::new();
2067
2068        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
2069            .parse()
2070            .unwrap();
2071
2072        let request_meta = RequestMeta::new(dsn);
2073        let mut envelope = Envelope::from_request(Some(event_id), request_meta);
2074
2075        envelope.add_item({
2076                let mut item = Item::new(ItemType::Event);
2077                item.set_payload(
2078                    ContentType::Json,
2079                    r#"
2080                    {
2081                        "request": {
2082                            "headers": [
2083                                ["User-Agent", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36"]
2084                            ]
2085                        }
2086                    }
2087                "#,
2088                );
2089                item
2090            });
2091
2092        let mut datascrubbing_settings = DataScrubbingConfig::default();
2093        // enable all the default scrubbing
2094        datascrubbing_settings.scrub_data = true;
2095        datascrubbing_settings.scrub_defaults = true;
2096        datascrubbing_settings.scrub_ip_addresses = true;
2097
2098        // Make sure to mask any IP-like looking data
2099        let pii_config = serde_json::from_str(r#"{"applications": {"**": ["@ip:mask"]}}"#).unwrap();
2100
2101        let config = ProjectConfig {
2102            datascrubbing_settings,
2103            pii_config: Some(pii_config),
2104            ..Default::default()
2105        };
2106
2107        let project_info = ProjectInfo {
2108            config,
2109            ..Default::default()
2110        };
2111
2112        let envelope = ManagedEnvelope::new(envelope, outcome_aggregator);
2113
2114        let ctx = processing::Context {
2115            project_info: &project_info,
2116            ..processing::Context::for_test()
2117        };
2118
2119        let new_envelope = process_to_single_envelope(&processor, envelope, ctx).await;
2120
2121        let event_item = new_envelope.items().last().unwrap();
2122        let annotated_event: Annotated<Event> =
2123            Annotated::from_json_bytes(&event_item.payload()).unwrap();
2124        let event = annotated_event.into_value().unwrap();
2125        let headers = event
2126            .request
2127            .into_value()
2128            .unwrap()
2129            .headers
2130            .into_value()
2131            .unwrap();
2132
2133        // IP-like data must be masked
2134        assert_eq!(
2135            Some(
2136                "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/********* Safari/537.36"
2137            ),
2138            headers.get_header("User-Agent")
2139        );
2140        // But we still get correct browser and version number
2141        let contexts = event.contexts.into_value().unwrap();
2142        let browser = contexts.0.get("browser").unwrap();
2143        assert_eq!(
2144            r#"{"browser":"Chrome 103.0.0","name":"Chrome","version":"103.0.0","type":"browser"}"#,
2145            browser.to_json().unwrap()
2146        );
2147    }
2148
2149    #[tokio::test]
2150    #[cfg(feature = "processing")]
2151    async fn test_materialize_dsc() {
2152        use crate::services::projects::project::PublicKeyConfig;
2153
2154        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
2155            .parse()
2156            .unwrap();
2157        let request_meta = RequestMeta::new(dsn);
2158        let mut envelope = Envelope::from_request(None, request_meta);
2159
2160        let dsc = r#"{
2161            "trace_id": "00000000-0000-0000-0000-000000000001",
2162            "public_key": "e12d836b15bb49d7bbf99e64295d995b",
2163            "sample_rate": "0.2"
2164        }"#;
2165        envelope.set_dsc(serde_json::from_str(dsc).unwrap());
2166
2167        let mut item = Item::new(ItemType::Event);
2168        item.set_payload(ContentType::Json, r#"{}"#);
2169        envelope.add_item(item);
2170
2171        let outcome_aggregator = Addr::dummy();
2172        let managed_envelope = ManagedEnvelope::new(envelope, outcome_aggregator);
2173
2174        let mut project_info = ProjectInfo::default();
2175        project_info.public_keys.push(PublicKeyConfig {
2176            public_key: ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(),
2177            numeric_id: Some(1),
2178        });
2179
2180        let config = serde_json::json!({
2181            "processing": {
2182                "enabled": true,
2183                "kafka_config": [],
2184            }
2185        });
2186
2187        let processor =
2188            create_test_processor(Config::from_json_value(config.clone()).unwrap()).await;
2189        let config = Config::from_json_value(config).unwrap();
2190        let ctx = processing::Context {
2191            config: &config,
2192            project_info: &project_info,
2193            sampling_project_info: Some(&project_info),
2194            ..processing::Context::for_test()
2195        };
2196
2197        let envelope = process_to_single_envelope(&processor, managed_envelope, ctx).await;
2198        let event = envelope
2199            .get_item_by(|item| item.ty() == &ItemType::Event)
2200            .unwrap();
2201
2202        let event = Annotated::<Event>::from_json_bytes(&event.payload()).unwrap();
2203        insta::assert_debug_snapshot!(event.value().unwrap()._dsc, @r###"
2204        Object(
2205            {
2206                "environment": ~,
2207                "public_key": String(
2208                    "e12d836b15bb49d7bbf99e64295d995b",
2209                ),
2210                "release": ~,
2211                "replay_id": ~,
2212                "sample_rate": String(
2213                    "0.2",
2214                ),
2215                "trace_id": String(
2216                    "00000000000000000000000000000001",
2217                ),
2218                "transaction": ~,
2219            },
2220        )
2221        "###);
2222    }
2223
2224    fn capture_test_event(transaction_name: &str, source: TransactionSource) -> Vec<String> {
2225        let mut event = Annotated::<Event>::from_json(
2226            r#"
2227            {
2228                "type": "transaction",
2229                "transaction": "/foo/",
2230                "timestamp": 946684810.0,
2231                "start_timestamp": 946684800.0,
2232                "contexts": {
2233                    "trace": {
2234                        "trace_id": "4c79f60c11214eb38604f4ae0781bfb2",
2235                        "span_id": "fa90fdead5f74053",
2236                        "op": "http.server",
2237                        "type": "trace"
2238                    }
2239                },
2240                "transaction_info": {
2241                    "source": "url"
2242                }
2243            }
2244            "#,
2245        )
2246        .unwrap();
2247        let e = event.value_mut().as_mut().unwrap();
2248        e.transaction.set_value(Some(transaction_name.into()));
2249
2250        e.transaction_info
2251            .value_mut()
2252            .as_mut()
2253            .unwrap()
2254            .source
2255            .set_value(Some(source));
2256
2257        relay_statsd::with_capturing_test_client(|| {
2258            utils::log_transaction_name_metrics(&mut event, |event| {
2259                let config = NormalizationConfig {
2260                    transaction_name_config: TransactionNameConfig {
2261                        rules: &[TransactionNameRule {
2262                            pattern: LazyGlob::new("/foo/*/**".to_owned()),
2263                            expiry: DateTime::<Utc>::MAX_UTC,
2264                            redaction: RedactionRule::Replace {
2265                                substitution: "*".to_owned(),
2266                            },
2267                        }],
2268                    },
2269                    ..Default::default()
2270                };
2271                relay_event_normalization::normalize_event(event, &config)
2272            });
2273        })
2274    }
2275
2276    #[test]
2277    fn test_log_transaction_metrics_none() {
2278        let captures = capture_test_event("/nothing", TransactionSource::Url);
2279        insta::assert_debug_snapshot!(captures, @r###"
2280        [
2281            "event.transaction_name_changes:1|c|#source_in:url,changes:none,source_out:sanitized,is_404:false",
2282        ]
2283        "###);
2284    }
2285
2286    #[test]
2287    fn test_log_transaction_metrics_rule() {
2288        let captures = capture_test_event("/foo/john/denver", TransactionSource::Url);
2289        insta::assert_debug_snapshot!(captures, @r###"
2290        [
2291            "event.transaction_name_changes:1|c|#source_in:url,changes:rule,source_out:sanitized,is_404:false",
2292        ]
2293        "###);
2294    }
2295
2296    #[test]
2297    fn test_log_transaction_metrics_pattern() {
2298        let captures = capture_test_event("/something/12345", TransactionSource::Url);
2299        insta::assert_debug_snapshot!(captures, @r###"
2300        [
2301            "event.transaction_name_changes:1|c|#source_in:url,changes:pattern,source_out:sanitized,is_404:false",
2302        ]
2303        "###);
2304    }
2305
2306    #[test]
2307    fn test_log_transaction_metrics_both() {
2308        let captures = capture_test_event("/foo/john/12345", TransactionSource::Url);
2309        insta::assert_debug_snapshot!(captures, @r###"
2310        [
2311            "event.transaction_name_changes:1|c|#source_in:url,changes:both,source_out:sanitized,is_404:false",
2312        ]
2313        "###);
2314    }
2315
2316    #[test]
2317    fn test_log_transaction_metrics_no_match() {
2318        let captures = capture_test_event("/foo/john/12345", TransactionSource::Route);
2319        insta::assert_debug_snapshot!(captures, @r###"
2320        [
2321            "event.transaction_name_changes:1|c|#source_in:route,changes:none,source_out:route,is_404:false",
2322        ]
2323        "###);
2324    }
2325
2326    #[tokio::test]
2327    async fn test_process_metrics_bucket_metadata() {
2328        let mut token = Cogs::noop().timed(ResourceId::Relay, AppFeature::Unattributed);
2329        let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
2330        let received_at = Utc::now();
2331        let config = Config::default();
2332
2333        let (aggregator, mut aggregator_rx) = Addr::custom();
2334        let processor = create_test_processor_with_addrs(
2335            config,
2336            Addrs {
2337                aggregator,
2338                ..Default::default()
2339            },
2340        )
2341        .await;
2342
2343        let mut item = Item::new(ItemType::Statsd);
2344        item.set_payload(ContentType::Text, "spans/foo:3182887624:4267882815|s");
2345        for (source, expected_received_at) in [
2346            (
2347                BucketSource::External,
2348                Some(UnixTimestamp::from_datetime(received_at).unwrap()),
2349            ),
2350            (BucketSource::Internal, None),
2351        ] {
2352            let message = ProcessMetrics {
2353                data: MetricData::Raw(vec![item.clone()]),
2354                project_key,
2355                source,
2356                received_at,
2357                sent_at: Some(Utc::now()),
2358            };
2359            processor.handle_process_metrics(&mut token, message);
2360
2361            let Aggregator::MergeBuckets(merge_buckets) = aggregator_rx.recv().await.unwrap();
2362            let buckets = merge_buckets.buckets;
2363            assert_eq!(buckets.len(), 1);
2364            assert_eq!(buckets[0].metadata.received_at, expected_received_at);
2365        }
2366    }
2367
2368    #[tokio::test]
2369    async fn test_process_batched_metrics() {
2370        let mut token = Cogs::noop().timed(ResourceId::Relay, AppFeature::Unattributed);
2371        let received_at = Utc::now();
2372        let config = Config::default();
2373
2374        let (aggregator, mut aggregator_rx) = Addr::custom();
2375        let processor = create_test_processor_with_addrs(
2376            config,
2377            Addrs {
2378                aggregator,
2379                ..Default::default()
2380            },
2381        )
2382        .await;
2383
2384        let payload = r#"{
2385    "buckets": {
2386        "11111111111111111111111111111111": [
2387            {
2388                "timestamp": 1615889440,
2389                "width": 0,
2390                "name": "d:custom/endpoint.response_time@millisecond",
2391                "type": "d",
2392                "value": [
2393                  68.0
2394                ],
2395                "tags": {
2396                  "route": "user_index"
2397                }
2398            }
2399        ],
2400        "22222222222222222222222222222222": [
2401            {
2402                "timestamp": 1615889440,
2403                "width": 0,
2404                "name": "d:custom/endpoint.cache_rate@none",
2405                "type": "d",
2406                "value": [
2407                  36.0
2408                ]
2409            }
2410        ]
2411    }
2412}
2413"#;
2414        let message = ProcessBatchedMetrics {
2415            payload: Bytes::from(payload),
2416            source: BucketSource::Internal,
2417            received_at,
2418            sent_at: Some(Utc::now()),
2419        };
2420        processor.handle_process_batched_metrics(&mut token, message);
2421
2422        let Aggregator::MergeBuckets(mb1) = aggregator_rx.recv().await.unwrap();
2423        let Aggregator::MergeBuckets(mb2) = aggregator_rx.recv().await.unwrap();
2424
2425        let mut messages = vec![mb1, mb2];
2426        messages.sort_by_key(|mb| mb.project_key);
2427
2428        let actual = messages
2429            .into_iter()
2430            .map(|mb| (mb.project_key, mb.buckets))
2431            .collect::<Vec<_>>();
2432
2433        assert_debug_snapshot!(actual, @r###"
2434        [
2435            (
2436                ProjectKey("11111111111111111111111111111111"),
2437                [
2438                    Bucket {
2439                        timestamp: UnixTimestamp(1615889440),
2440                        width: 0,
2441                        name: MetricName(
2442                            "d:custom/endpoint.response_time@millisecond",
2443                        ),
2444                        value: Distribution(
2445                            [
2446                                68.0,
2447                            ],
2448                        ),
2449                        tags: {
2450                            "route": "user_index",
2451                        },
2452                        metadata: BucketMetadata {
2453                            merges: 1,
2454                            received_at: None,
2455                            extracted_from_indexed: false,
2456                        },
2457                    },
2458                ],
2459            ),
2460            (
2461                ProjectKey("22222222222222222222222222222222"),
2462                [
2463                    Bucket {
2464                        timestamp: UnixTimestamp(1615889440),
2465                        width: 0,
2466                        name: MetricName(
2467                            "d:custom/endpoint.cache_rate@none",
2468                        ),
2469                        value: Distribution(
2470                            [
2471                                36.0,
2472                            ],
2473                        ),
2474                        tags: {},
2475                        metadata: BucketMetadata {
2476                            merges: 1,
2477                            received_at: None,
2478                            extracted_from_indexed: false,
2479                        },
2480                    },
2481                ],
2482            ),
2483        ]
2484        "###);
2485    }
2486}