1use std::borrow::Cow;
2use std::collections::{BTreeMap, BTreeSet, HashMap};
3use std::error::Error;
4use std::fmt::Debug;
5use std::future::Future;
6use std::io::Write;
7use std::pin::Pin;
8use std::sync::Arc;
9use std::time::Duration;
10
11use anyhow::Context;
12use brotli::CompressorWriter as BrotliEncoder;
13use bytes::Bytes;
14use chrono::{DateTime, Utc};
15use flate2::Compression;
16use flate2::write::{GzEncoder, ZlibEncoder};
17use futures::future::BoxFuture;
18use relay_base_schema::project::{ProjectId, ProjectKey};
19use relay_cogs::{AppFeature, Cogs, FeatureWeights, ResourceId, Token};
20use relay_common::time::UnixTimestamp;
21use relay_config::{Config, EmitOutcomes, HttpEncoding, UpstreamDescriptor};
22use relay_event_normalization::{ClockDriftProcessor, GeoIpLookup};
23use relay_event_schema::processor::ProcessingAction;
24use relay_event_schema::protocol::ClientReport;
25use relay_filter::FilterStatKey;
26use relay_log::sentry::SentryFutureExt;
27use relay_metrics::{Bucket, BucketMetadata, BucketView, BucketsView, MetricNamespace};
28use relay_quotas::{RateLimits, Scoping};
29use relay_sampling::evaluation::SamplingDecision;
30use relay_statsd::metric;
31use relay_system::{Addr, FromMessage, NoResponse, Service};
32use reqwest::header;
33use zstd::stream::Encoder as ZstdEncoder;
34
35use crate::envelope::{self, ContentType, Envelope, EnvelopeError, Item, ItemType};
36use crate::extractors::{PartialDsn, RequestMeta, RequestTrust};
37use crate::managed::ManagedEnvelope;
38use crate::metrics::{MetricOutcomes, MetricsLimiter, MinimalTrackableBucket};
39use crate::metrics_extraction::ExtractedMetrics;
40use crate::processing::errors::SwitchProcessingError;
41use crate::processing::relay::RelayProcessor;
42use crate::processing::{Forward as _, Output, Outputs, QuotaRateLimiter};
43use crate::service::ServiceError;
44use crate::services::global_config::GlobalConfigHandle;
45use crate::services::metrics::{Aggregator, FlushBuckets, MergeBuckets, ProjectBuckets};
46use crate::services::outcome::{self, DiscardItemType, DiscardReason, Outcome, TrackOutcome};
47use crate::services::projects::cache::ProjectCacheHandle;
48use crate::services::projects::project::{ProjectInfo, ProjectState};
49use crate::services::upstream::{
50 SendRequest, Sign, SignatureType, UpstreamRelay, UpstreamRequest, UpstreamRequestError,
51};
52use crate::statsd::{RelayCounters, RelayDistributions, RelayTimers};
53use crate::utils;
54use crate::{http, processing};
55use relay_threading::AsyncPool;
56use symbolic_unreal::{Unreal4Error, Unreal4ErrorKind};
57#[cfg(feature = "processing")]
58use {
59 crate::services::objectstore::Objectstore,
60 crate::services::store::Store,
61 itertools::Itertools,
62 relay_dynamic_config::GlobalConfig,
63 relay_quotas::{Quota, RateLimitingError, RedisRateLimiter},
64 relay_redis::RedisClients,
65 std::time::Instant,
66};
67
68mod metrics;
69
/// Threshold of 55 minutes passed to `ClockDriftProcessor::at_least` when metric bucket
/// timestamps are corrected for clock drift.
pub const MINIMUM_CLOCK_DRIFT: Duration = Duration::from_secs(60 * 55);
72
/// Errors raised while processing the contents of an envelope.
///
/// [`ProcessingError::to_outcome`] maps each variant to the outcome reported for it.
#[derive(Debug, thiserror::Error)]
pub enum ProcessingError {
    /// An event payload could not be parsed as JSON.
    #[error("invalid json in event")]
    InvalidJson(#[source] serde_json::Error),

    /// An event payload could not be decoded as MessagePack.
    #[error("invalid message pack event payload")]
    InvalidMsgpack(#[from] rmp_serde::decode::Error),

    /// An Unreal crash report could not be processed.
    #[error("invalid unreal crash report")]
    InvalidUnrealReport(#[source] Unreal4Error),

    /// A payload of the given item type exceeded its size limit.
    #[error("event payload too large")]
    PayloadTooLarge(DiscardItemType),

    /// A transaction event was invalid.
    #[error("invalid transaction event")]
    InvalidTransaction,

    /// The envelope contained an item that is not allowed or supported in it.
    #[error("the item is not allowed/supported in this envelope")]
    UnsupportedItem,

    /// An event processor returned a failing [`ProcessingAction`].
    #[error("envelope processor failed")]
    ProcessingFailed(#[from] ProcessingAction),

    /// An item of the given type occurred more than once.
    #[error("duplicate {0} in event")]
    DuplicateItem(ItemType),

    /// No event payload could be extracted.
    #[error("failed to extract event payload")]
    NoEventPayload,

    /// The security report carried a type that could not be interpreted; holds the raw bytes.
    #[error("invalid security report type: {0:?}")]
    InvalidSecurityType(Bytes),

    /// The security report type is recognized as unsupported.
    #[error("unsupported security report type")]
    UnsupportedSecurityType,

    /// A security report could not be parsed as JSON.
    #[error("invalid security report")]
    InvalidSecurityReport(#[source] serde_json::Error),

    /// The event was dropped by a filter identified by the contained key.
    #[error("event filtered with reason: {0:?}")]
    EventFiltered(FilterStatKey),

    /// The event payload could not be serialized back to JSON.
    #[error("could not serialize event payload")]
    SerializeFailed(#[source] serde_json::Error),

    /// Applying quotas failed in the rate limiter (processing builds only).
    #[cfg(feature = "processing")]
    #[error("failed to apply quotas")]
    QuotasFailed(#[from] RateLimitingError),

    /// A Nintendo Switch dying message could not be processed.
    #[error("nintendo switch dying message processing failed {0:?}")]
    InvalidNintendoDyingMessage(#[source] SwitchProcessingError),

    /// A PlayStation dump could not be processed; holds a textual description.
    #[cfg(all(sentry, feature = "processing"))]
    #[error("playstation dump processing failed: {0}")]
    InvalidPlaystationDump(String),

    /// An attachment reference was invalid (processing builds only).
    #[cfg(feature = "processing")]
    #[error("invalid attachment reference")]
    InvalidAttachmentRef,
}
133
134impl ProcessingError {
135 pub fn to_outcome(&self) -> Option<Outcome> {
136 match self {
137 Self::PayloadTooLarge(payload_type) => {
138 Some(Outcome::Invalid(DiscardReason::ItemTooLarge(*payload_type)))
139 }
140 Self::InvalidJson(_) => Some(Outcome::Invalid(DiscardReason::InvalidJson)),
141 Self::InvalidMsgpack(_) => Some(Outcome::Invalid(DiscardReason::InvalidMsgpack)),
142 Self::InvalidSecurityType(_) => {
143 Some(Outcome::Invalid(DiscardReason::SecurityReportType))
144 }
145 Self::UnsupportedItem => Some(Outcome::Invalid(DiscardReason::InvalidEnvelope)),
146 Self::InvalidSecurityReport(_) => Some(Outcome::Invalid(DiscardReason::SecurityReport)),
147 Self::UnsupportedSecurityType => Some(Outcome::Filtered(FilterStatKey::InvalidCsp)),
148 Self::InvalidTransaction => Some(Outcome::Invalid(DiscardReason::InvalidTransaction)),
149 Self::DuplicateItem(_) => Some(Outcome::Invalid(DiscardReason::DuplicateItem)),
150 Self::NoEventPayload => Some(Outcome::Invalid(DiscardReason::NoEventPayload)),
151 Self::InvalidNintendoDyingMessage(_) => Some(Outcome::Invalid(DiscardReason::Payload)),
152 #[cfg(all(sentry, feature = "processing"))]
153 Self::InvalidPlaystationDump(_) => Some(Outcome::Invalid(DiscardReason::Payload)),
154 Self::InvalidUnrealReport(err) if err.kind() == Unreal4ErrorKind::BadCompression => {
155 Some(Outcome::Invalid(DiscardReason::InvalidCompression))
156 }
157 Self::InvalidUnrealReport(_) => Some(Outcome::Invalid(DiscardReason::ProcessUnreal)),
158 Self::SerializeFailed(_) | Self::ProcessingFailed(_) => {
159 Some(Outcome::Invalid(DiscardReason::Internal))
160 }
161 #[cfg(feature = "processing")]
162 Self::QuotasFailed(_) => Some(Outcome::Invalid(DiscardReason::Internal)),
163 Self::EventFiltered(key) => Some(Outcome::Filtered(key.clone())),
164
165 #[cfg(feature = "processing")]
166 Self::InvalidAttachmentRef => {
167 Some(Outcome::Invalid(DiscardReason::InvalidAttachmentRef))
168 }
169 }
170 }
171}
172
173impl From<Unreal4Error> for ProcessingError {
174 fn from(err: Unreal4Error) -> Self {
175 match err.kind() {
176 Unreal4ErrorKind::TooLarge => Self::PayloadTooLarge(ItemType::UnrealReport.into()),
177 _ => ProcessingError::InvalidUnrealReport(err),
178 }
179 }
180}
181
182#[derive(Debug)]
187pub struct ProcessingExtractedMetrics {
188 metrics: ExtractedMetrics,
189}
190
191impl ProcessingExtractedMetrics {
192 pub fn new() -> Self {
193 Self {
194 metrics: ExtractedMetrics::default(),
195 }
196 }
197
198 pub fn into_inner(self) -> ExtractedMetrics {
199 self.metrics
200 }
201
202 pub fn extend(
204 &mut self,
205 extracted: ExtractedMetrics,
206 sampling_decision: Option<SamplingDecision>,
207 ) {
208 self.extend_project_metrics(extracted.project_metrics, sampling_decision);
209 self.extend_sampling_metrics(extracted.sampling_metrics, sampling_decision);
210 }
211
212 pub fn extend_project_metrics<I>(
214 &mut self,
215 buckets: I,
216 sampling_decision: Option<SamplingDecision>,
217 ) where
218 I: IntoIterator<Item = Bucket>,
219 {
220 self.metrics
221 .project_metrics
222 .extend(buckets.into_iter().map(|mut bucket| {
223 bucket.metadata.extracted_from_indexed =
224 sampling_decision == Some(SamplingDecision::Keep);
225 bucket
226 }));
227 }
228
229 pub fn extend_sampling_metrics<I>(
231 &mut self,
232 buckets: I,
233 sampling_decision: Option<SamplingDecision>,
234 ) where
235 I: IntoIterator<Item = Bucket>,
236 {
237 self.metrics
238 .sampling_metrics
239 .extend(buckets.into_iter().map(|mut bucket| {
240 bucket.metadata.extracted_from_indexed =
241 sampling_decision == Some(SamplingDecision::Keep);
242 bucket
243 }));
244 }
245}
246
247fn send_metrics(
248 metrics: ExtractedMetrics,
249 project_key: ProjectKey,
250 sampling_key: Option<ProjectKey>,
251 aggregator: &Addr<Aggregator>,
252) {
253 let ExtractedMetrics {
254 project_metrics,
255 sampling_metrics,
256 } = metrics;
257
258 if !project_metrics.is_empty() {
259 aggregator.send(MergeBuckets {
260 project_key,
261 buckets: project_metrics,
262 });
263 }
264
265 if !sampling_metrics.is_empty() {
266 let sampling_project_key = sampling_key.unwrap_or(project_key);
273 aggregator.send(MergeBuckets {
274 project_key: sampling_project_key,
275 buckets: sampling_metrics,
276 });
277 }
278}
279
/// Message asking the [`EnvelopeProcessor`] to process a single envelope.
#[derive(Debug)]
pub struct ProcessEnvelope {
    /// The envelope to process.
    pub envelope: ManagedEnvelope,
    /// Project info handed to processing as the `project_info` of the processing context.
    pub project_info: Arc<ProjectInfo>,
    /// Rate limits handed to processing as the `rate_limits` of the processing context.
    pub rate_limits: Arc<RateLimits>,
    /// Optional project info of the sampling project. Its public key, when available, is the
    /// key under which extracted sampling metrics are merged.
    pub sampling_project_info: Option<Arc<ProjectInfo>>,
}
300
/// Message asking the [`EnvelopeProcessor`] to parse, validate and merge metrics of one project.
#[derive(Debug)]
pub struct ProcessMetrics {
    /// The metrics, either as raw envelope items or as already parsed buckets.
    pub data: MetricData,
    /// Key of the project under which the resulting buckets are merged into the aggregator.
    pub project_key: ProjectKey,
    /// Where the buckets came from; used for namespace validation and for deciding whether
    /// bucket metadata is reset.
    pub source: BucketSource,
    /// Receive time; used as the parse timestamp and for clock drift correction.
    pub received_at: DateTime<Utc>,
    /// Optional send time reported with the metrics; used for clock drift correction.
    pub sent_at: Option<DateTime<Utc>>,
}
326
327#[derive(Debug)]
329pub enum MetricData {
330 Raw(Vec<Item>),
332 Parsed(Vec<Bucket>),
334}
335
336impl MetricData {
337 fn into_buckets(self, timestamp: UnixTimestamp) -> Vec<Bucket> {
342 let items = match self {
343 Self::Parsed(buckets) => return buckets,
344 Self::Raw(items) => items,
345 };
346
347 let mut buckets = Vec::new();
348 for item in items {
349 let payload = item.payload();
350 if item.ty() == &ItemType::Statsd {
351 for bucket_result in Bucket::parse_all(&payload, timestamp) {
352 match bucket_result {
353 Ok(bucket) => buckets.push(bucket),
354 Err(error) => relay_log::debug!(
355 error = &error as &dyn Error,
356 "failed to parse metric bucket from statsd format",
357 ),
358 }
359 }
360 } else if item.ty() == &ItemType::MetricBuckets {
361 match serde_json::from_slice::<Vec<Bucket>>(&payload) {
362 Ok(parsed_buckets) => {
363 if buckets.is_empty() {
365 buckets = parsed_buckets;
366 } else {
367 buckets.extend(parsed_buckets);
368 }
369 }
370 Err(error) => {
371 relay_log::debug!(
372 error = &error as &dyn Error,
373 "failed to parse metric bucket",
374 );
375 metric!(counter(RelayCounters::MetricBucketsParsingFailed) += 1);
376 }
377 }
378 } else {
379 relay_log::error!(
380 "invalid item of type {} passed to ProcessMetrics",
381 item.ty()
382 );
383 }
384 }
385 buckets
386 }
387}
388
/// Message asking the [`EnvelopeProcessor`] to process a batch of metrics for several projects.
#[derive(Debug)]
pub struct ProcessBatchedMetrics {
    /// JSON payload: an object with a `buckets` field mapping project keys to bucket lists.
    pub payload: Bytes,
    /// Where the buckets came from; applied to every project in the batch.
    pub source: BucketSource,
    /// Receive time; applied to every project in the batch.
    pub received_at: DateTime<Utc>,
    /// Optional send time; applied to every project in the batch.
    pub sent_at: Option<DateTime<Utc>>,
}
400
401#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
403pub enum BucketSource {
404 Internal,
410 External,
415}
416
417impl BucketSource {
418 pub fn from_meta(meta: &RequestMeta) -> Self {
420 match meta.request_trust() {
421 RequestTrust::Trusted => Self::Internal,
422 RequestTrust::Untrusted => Self::External,
423 }
424 }
425}
426
/// Message asking the [`EnvelopeProcessor`] to send client reports upstream in an envelope.
#[derive(Debug)]
pub struct SubmitClientReports {
    /// The client reports to send; each one becomes a client report item in the envelope.
    pub client_reports: Vec<ClientReport>,
    /// Scoping used to build the outbound DSN of the envelope.
    pub scoping: Scoping,
}
435
436#[derive(Debug)]
438pub enum EnvelopeProcessor {
439 ProcessEnvelope(Box<ProcessEnvelope>),
440 ProcessProjectMetrics(Box<ProcessMetrics>),
441 ProcessBatchedMetrics(Box<ProcessBatchedMetrics>),
442 FlushBuckets(Box<FlushBuckets>),
443 SubmitClientReports(Box<SubmitClientReports>),
444}
445
446impl EnvelopeProcessor {
447 pub fn variant(&self) -> &'static str {
449 match self {
450 EnvelopeProcessor::ProcessEnvelope(_) => "ProcessEnvelope",
451 EnvelopeProcessor::ProcessProjectMetrics(_) => "ProcessProjectMetrics",
452 EnvelopeProcessor::ProcessBatchedMetrics(_) => "ProcessBatchedMetrics",
453 EnvelopeProcessor::FlushBuckets(_) => "FlushBuckets",
454 EnvelopeProcessor::SubmitClientReports(_) => "SubmitClientReports",
455 }
456 }
457}
458
// Marks `EnvelopeProcessor` as a service interface; the trait needs no items here.
impl relay_system::Interface for EnvelopeProcessor {}
460
461impl FromMessage<ProcessEnvelope> for EnvelopeProcessor {
462 type Response = relay_system::NoResponse;
463
464 fn from_message(message: ProcessEnvelope, _sender: ()) -> Self {
465 Self::ProcessEnvelope(Box::new(message))
466 }
467}
468
469impl FromMessage<ProcessMetrics> for EnvelopeProcessor {
470 type Response = NoResponse;
471
472 fn from_message(message: ProcessMetrics, _: ()) -> Self {
473 Self::ProcessProjectMetrics(Box::new(message))
474 }
475}
476
477impl FromMessage<ProcessBatchedMetrics> for EnvelopeProcessor {
478 type Response = NoResponse;
479
480 fn from_message(message: ProcessBatchedMetrics, _: ()) -> Self {
481 Self::ProcessBatchedMetrics(Box::new(message))
482 }
483}
484
485impl FromMessage<FlushBuckets> for EnvelopeProcessor {
486 type Response = NoResponse;
487
488 fn from_message(message: FlushBuckets, _: ()) -> Self {
489 Self::FlushBuckets(Box::new(message))
490 }
491}
492
493impl FromMessage<SubmitClientReports> for EnvelopeProcessor {
494 type Response = NoResponse;
495
496 fn from_message(message: SubmitClientReports, _: ()) -> Self {
497 Self::SubmitClientReports(Box::new(message))
498 }
499}
500
/// Pool type used by the envelope processor: an [`AsyncPool`] of boxed `'static` futures
/// that resolve to `()`.
pub type EnvelopeProcessorServicePool = AsyncPool<BoxFuture<'static, ()>>;
503
/// Service that processes envelopes and metrics.
///
/// Cloning only clones the [`Arc`] around the shared inner state.
#[derive(Clone)]
pub struct EnvelopeProcessorService {
    /// Shared state of all clones of this service.
    inner: Arc<InnerProcessor>,
}
511
512pub struct Addrs {
514 pub outcome_aggregator: Addr<TrackOutcome>,
515 pub upstream_relay: Addr<UpstreamRelay>,
516 #[cfg(feature = "processing")]
517 pub objectstore: Option<Addr<Objectstore>>,
518 #[cfg(feature = "processing")]
519 pub store_forwarder: Option<Addr<Store>>,
520 pub aggregator: Addr<Aggregator>,
521}
522
523impl Default for Addrs {
524 fn default() -> Self {
525 Addrs {
526 outcome_aggregator: Addr::dummy(),
527 upstream_relay: Addr::dummy(),
528 #[cfg(feature = "processing")]
529 objectstore: None,
530 #[cfg(feature = "processing")]
531 store_forwarder: None,
532 aggregator: Addr::dummy(),
533 }
534 }
535}
536
/// Shared state behind [`EnvelopeProcessorService`].
struct InnerProcessor {
    /// Pool handed to the service on construction.
    pool: EnvelopeProcessorServicePool,
    /// Static Relay configuration.
    config: Arc<Config>,
    /// Handle used to read the current global configuration.
    global_config: GlobalConfigHandle,
    /// Handle used to look up project state and rate limits by project key.
    project_cache: ProjectCacheHandle,
    /// COGS handle; a clone of it is also given to the `RelayProcessor`.
    cogs: Cogs,
    /// Addresses of the services this processor sends messages to.
    addrs: Addrs,
    /// Redis-backed rate limiter; `None` when no Redis clients were supplied on construction.
    #[cfg(feature = "processing")]
    rate_limiter: Option<Arc<RedisRateLimiter>>,
    /// Tracker for outcomes of metric buckets.
    metric_outcomes: MetricOutcomes,
    /// Processor that runs the actual envelope processing.
    processor: RelayProcessor,
}
549
550impl EnvelopeProcessorService {
551 #[cfg_attr(feature = "processing", expect(clippy::too_many_arguments))]
553 pub fn new(
554 pool: EnvelopeProcessorServicePool,
555 config: Arc<Config>,
556 global_config: GlobalConfigHandle,
557 project_cache: ProjectCacheHandle,
558 cogs: Cogs,
559 #[cfg(feature = "processing")] redis: Option<RedisClients>,
560 addrs: Addrs,
561 metric_outcomes: MetricOutcomes,
562 ) -> Self {
563 let geoip_lookup = config
564 .geoip_path()
565 .and_then(
566 |p| match GeoIpLookup::open(p).context(ServiceError::GeoIp) {
567 Ok(geoip) => Some(geoip),
568 Err(err) => {
569 relay_log::error!("failed to open GeoIP db {p:?}: {err:?}");
570 None
571 }
572 },
573 )
574 .unwrap_or_else(GeoIpLookup::empty);
575
576 if let Some(build_epoch) = geoip_lookup.build_epoch() {
577 relay_log::info!("Loaded GeoIP database (build: {build_epoch})");
578 }
579
580 #[cfg(feature = "processing")]
581 let rate_limiter = redis.map(|redis| {
582 RedisRateLimiter::new(redis.quotas)
583 .max_limit(config.max_rate_limit())
584 .cache(config.quota_cache_ratio(), config.quota_cache_max())
585 });
586
587 let quota_limiter = Arc::new(QuotaRateLimiter::new(
588 #[cfg(feature = "processing")]
589 project_cache.clone(),
590 #[cfg(feature = "processing")]
591 rate_limiter.clone(),
592 ));
593 #[cfg(feature = "processing")]
594 let rate_limiter = rate_limiter.map(Arc::new);
595 let inner = InnerProcessor {
596 pool,
597 global_config,
598 project_cache,
599 #[cfg(feature = "processing")]
600 rate_limiter,
601 processor: RelayProcessor::new(
602 cogs.clone(),
603 "a_limiter,
604 &geoip_lookup,
605 addrs.outcome_aggregator.clone(),
606 ),
607 cogs,
608 addrs,
609 metric_outcomes,
610 config,
611 };
612
613 Self {
614 inner: Arc::new(inner),
615 }
616 }
617
618 async fn process_envelope(
619 &self,
620 project_id: ProjectId,
621 mut envelope: ManagedEnvelope,
622 ctx: processing::Context<'_>,
623 ) -> Vec<Output<Outputs>> {
624 if let Some(sampling_state) = ctx.sampling_project_info {
626 envelope
629 .envelope_mut()
630 .parametrize_dsc_transaction(&sampling_state.config.tx_name_rules);
631 }
632
633 if let Some(retention) = ctx.project_info.config.event_retention {
636 envelope.envelope_mut().set_retention(retention);
637 }
638
639 envelope
644 .envelope_mut()
645 .meta_mut()
646 .set_project_id(project_id);
647
648 self.inner.processor.run(envelope, ctx).await
649 }
650
651 async fn process<'a>(
657 &self,
658 mut envelope: ManagedEnvelope,
659 ctx: processing::Context<'a>,
660 ) -> Vec<Output<Outputs>> {
661 let Some(project_id) = ctx
668 .project_info
669 .project_id
670 .or_else(|| envelope.envelope().meta().project_id())
671 else {
672 relay_log::error!(
673 tags.project_key = %envelope.envelope().meta().public_key(),
674 "project info does not contain project id"
675 );
676 envelope.reject(Outcome::Invalid(DiscardReason::Internal));
677 return Vec::new();
678 };
679
680 relay_log::configure_scope(|scope| {
681 scope.set_tag("project_id", project_id);
682 });
683
684 self.process_envelope(project_id, envelope, ctx).await
685 }
686
687 async fn handle_process_envelope(&self, cogs: &mut Token, message: ProcessEnvelope) {
688 let wait_time = message.envelope.age();
689 metric!(timer(RelayTimers::EnvelopeWaitTime) = wait_time);
690
691 cogs.cancel();
694
695 let global_config = self.inner.global_config.current().unwrap_or_default();
696
697 let ctx = processing::Context {
698 config: &self.inner.config,
699 global_config: &global_config,
700 project_info: &message.project_info,
701 sampling_project_info: message.sampling_project_info.as_deref(),
702 rate_limits: &message.rate_limits,
703 };
704
705 let project_key = message.envelope.meta().public_key();
706 let sampling_key = ctx
710 .sampling_project_info
711 .and_then(|p| p.get_public_key_config())
712 .map(|pkc| pkc.public_key);
713
714 relay_log::configure_scope(|scope| {
715 scope.set_tag("project_key", project_key);
716 if let Some(sampling_key) = sampling_key {
717 scope.set_tag("sampling_key", sampling_key);
718 }
719 let meta = message.envelope.envelope().meta();
720 scope.set_tag("sdk_name", meta.client_name());
721 if let Some(client) = meta.client() {
722 scope.set_tag("sdk", client);
723 }
724 if let Some(user_agent) = meta.user_agent() {
725 scope.set_extra("user_agent", user_agent.into());
726 }
727 });
728
729 let outputs = metric!(timer(RelayTimers::EnvelopeProcessingTime), {
730 self.process(message.envelope, ctx).await
731 });
732
733 let ctx = ctx.to_forward();
734 for Output { main, metrics } in outputs {
735 if let Some(metrics) = metrics {
736 let agg = &self.inner.addrs.aggregator;
737 metrics.accept(|metrics| {
738 send_metrics(metrics, project_key, sampling_key, agg);
739 });
740 }
741
742 if let Some(output) = main {
743 self.submit_upstream(&mut Token::noop(), output, ctx);
745 }
746 }
747 }
748
749 fn handle_process_metrics(&self, cogs: &mut Token, message: ProcessMetrics) {
750 let ProcessMetrics {
751 data,
752 project_key,
753 received_at,
754 sent_at,
755 source,
756 } = message;
757
758 let received_timestamp =
759 UnixTimestamp::from_datetime(received_at).unwrap_or(UnixTimestamp::now());
760
761 let mut buckets = data.into_buckets(received_timestamp);
762 if buckets.is_empty() {
763 return;
764 };
765 cogs.update(relay_metrics::cogs::BySize(&buckets));
766
767 let clock_drift_processor =
768 ClockDriftProcessor::new(sent_at, received_at).at_least(MINIMUM_CLOCK_DRIFT);
769
770 buckets.retain_mut(|bucket| {
771 if let Err(error) = relay_metrics::normalize_bucket(bucket) {
772 relay_log::debug!(error = &error as &dyn Error, "dropping bucket {bucket:?}");
773 return false;
774 }
775
776 if !self::metrics::is_valid_namespace(bucket, source) {
777 relay_log::debug!("dropping bucket in invalid namespace {bucket:?}");
778 return false;
779 }
780
781 clock_drift_processor.process_timestamp(&mut bucket.timestamp);
782
783 if !matches!(source, BucketSource::Internal) {
784 bucket.metadata = BucketMetadata::new(received_timestamp);
785 }
786
787 true
788 });
789
790 let project = self.inner.project_cache.get(project_key);
791
792 let buckets = match project.state() {
795 ProjectState::Enabled(project_info) => {
796 let rate_limits = project.rate_limits().current_limits();
797 self.check_buckets(project_key, project_info, &rate_limits, buckets)
798 }
799 _ => buckets,
800 };
801
802 relay_log::trace!("merging metric buckets into the aggregator");
803 self.inner
804 .addrs
805 .aggregator
806 .send(MergeBuckets::new(project_key, buckets));
807 }
808
809 fn handle_process_batched_metrics(&self, cogs: &mut Token, message: ProcessBatchedMetrics) {
810 let ProcessBatchedMetrics {
811 payload,
812 source,
813 received_at,
814 sent_at,
815 } = message;
816
817 #[derive(serde::Deserialize)]
818 struct Wrapper {
819 buckets: HashMap<ProjectKey, Vec<Bucket>>,
820 }
821
822 let buckets = match serde_json::from_slice(&payload) {
823 Ok(Wrapper { buckets }) => buckets,
824 Err(error) => {
825 relay_log::debug!(
826 error = &error as &dyn Error,
827 "failed to parse batched metrics",
828 );
829 metric!(counter(RelayCounters::MetricBucketsParsingFailed) += 1);
830 return;
831 }
832 };
833
834 for (project_key, buckets) in buckets {
835 self.handle_process_metrics(
836 cogs,
837 ProcessMetrics {
838 data: MetricData::Parsed(buckets),
839 project_key,
840 source,
841 received_at,
842 sent_at,
843 },
844 )
845 }
846 }
847
848 fn submit_upstream(
852 &self,
853 cogs: &mut Token,
854 output: Outputs,
855 ctx: processing::ForwardContext<'_>,
856 ) {
857 let _submit = cogs.start_category("submit");
858
859 #[cfg(feature = "processing")]
860 if ctx.config.processing_enabled()
861 && let Some(store_forwarder) = &self.inner.addrs.store_forwarder
862 {
863 use crate::processing::StoreHandle;
864
865 let objectstore = self.inner.addrs.objectstore.as_ref();
866 let handle = StoreHandle::new(store_forwarder, objectstore, ctx.global_config);
867
868 output
869 .forward_store(handle, ctx)
870 .unwrap_or_else(|err| err.into_inner());
871
872 return;
873 }
874
875 match output.serialize_envelope(ctx) {
876 Ok(envelope) => {
877 let envelope = ManagedEnvelope::from(envelope);
878 self.submit_envelope_upstream(envelope, ctx.project_info.upstream.clone());
879 }
880 Err(_) => relay_log::error!("failed to serialize output to an envelope"),
881 };
882 }
883
884 fn submit_envelope_upstream(
885 &self,
886 mut envelope: ManagedEnvelope,
887 upstream: Option<UpstreamDescriptor>,
890 ) {
891 if envelope.envelope_mut().is_empty() {
892 envelope.accept();
893 return;
894 }
895
896 if self.inner.config.processing_enabled() {
902 relay_log::error!(
903 "attempt to forward envelope to http upstream when processing is enabled"
904 );
905 return;
906 }
907
908 envelope.envelope_mut().set_sent_at(Utc::now());
914
915 relay_log::trace!("sending envelope to sentry endpoint");
916 let http_encoding = self.inner.config.http_encoding();
917 let result = envelope.envelope().to_vec().and_then(|v| {
918 encode_payload(&v.into(), http_encoding).map_err(EnvelopeError::PayloadIoFailed)
919 });
920
921 match result {
922 Ok(body) => {
923 self.inner
924 .addrs
925 .upstream_relay
926 .send(SendRequest(SendEnvelope {
927 upstream,
928 envelope,
929 body,
930 http_encoding,
931 project_cache: self.inner.project_cache.clone(),
932 }));
933 }
934 Err(error) => {
935 relay_log::error!(
938 error = &error as &dyn Error,
939 tags.project_key = %envelope.scoping().project_key,
940 "failed to serialize envelope payload"
941 );
942
943 envelope.reject(Outcome::Invalid(DiscardReason::Internal));
944 }
945 }
946 }
947
948 fn handle_submit_client_reports(&self, message: SubmitClientReports) {
949 let SubmitClientReports {
950 client_reports,
951 scoping,
952 } = message;
953
954 relay_log::trace!(
955 "sending {} client report(s) to project id {}",
956 client_reports.len(),
957 scoping.project_id
958 );
959
960 if client_reports.is_empty() {
961 return;
962 }
963
964 let upstream = self.inner.config.upstream();
965 let dsn = PartialDsn::outbound(&scoping, upstream);
966
967 let mut envelope = Envelope::from_request(None, RequestMeta::outbound(dsn));
968 for client_report in client_reports {
969 match client_report.serialize() {
970 Ok(payload) => {
971 let mut item = Item::new(ItemType::ClientReport);
972 item.set_payload(ContentType::Json, payload);
973 envelope.add_item(item);
974 }
975 Err(error) => {
976 relay_log::error!(
977 error = &error as &dyn std::error::Error,
978 "failed to serialize client report"
979 );
980 }
981 }
982 }
983
984 let envelope = ManagedEnvelope::new(envelope, self.inner.addrs.outcome_aggregator.clone());
985 self.submit_envelope_upstream(envelope, None);
986 }
987
988 fn check_buckets(
989 &self,
990 project_key: ProjectKey,
991 project_info: &ProjectInfo,
992 rate_limits: &RateLimits,
993 buckets: Vec<Bucket>,
994 ) -> Vec<Bucket> {
995 let Some(scoping) = project_info.scoping(project_key) else {
996 relay_log::error!(
997 tags.project_key = project_key.as_str(),
998 "there is no scoping: dropping {} buckets",
999 buckets.len(),
1000 );
1001 return Vec::new();
1002 };
1003
1004 let mut buckets = self::metrics::apply_project_info(
1005 buckets,
1006 &self.inner.metric_outcomes,
1007 project_info,
1008 scoping,
1009 );
1010
1011 let mut namespaces: BTreeSet<MetricNamespace> = buckets
1012 .iter()
1013 .filter_map(|bucket| bucket.name.try_namespace())
1014 .collect();
1015
1016 namespaces.remove(&MetricNamespace::Outcomes);
1018
1019 for namespace in namespaces {
1020 let limits = rate_limits
1021 .check_with_quotas(project_info.get_quotas(), scoping.metric_bucket(namespace));
1022
1023 if limits.is_limited() {
1024 let rejected;
1025 (buckets, rejected) = utils::split_off(buckets, |bucket| {
1026 bucket.name.try_namespace() == Some(namespace)
1027 });
1028
1029 let reason_code = limits.longest().and_then(|limit| limit.reason_code.clone());
1030 self.inner.metric_outcomes.track(
1031 scoping,
1032 &rejected,
1033 Outcome::RateLimited(reason_code),
1034 );
1035 }
1036 }
1037
1038 let quotas = project_info.config.quotas.clone();
1039 match MetricsLimiter::create(buckets, quotas, scoping) {
1040 Ok(mut bucket_limiter) => {
1041 bucket_limiter.enforce_limits(rate_limits, &self.inner.metric_outcomes);
1042 bucket_limiter.into_buckets()
1043 }
1044 Err(buckets) => buckets,
1045 }
1046 }
1047
1048 #[cfg(feature = "processing")]
1049 async fn rate_limit_buckets(
1050 &self,
1051 scoping: Scoping,
1052 project_info: &ProjectInfo,
1053 mut buckets: Vec<Bucket>,
1054 ) -> Vec<Bucket> {
1055 let Some(rate_limiter) = &self.inner.rate_limiter else {
1056 return buckets;
1057 };
1058
1059 let global_config = self.inner.global_config.current().unwrap_or_default();
1060 let mut namespaces = buckets
1061 .iter()
1062 .filter_map(|bucket| bucket.name.try_namespace())
1063 .counts();
1064
1065 namespaces.remove(&MetricNamespace::Outcomes);
1067
1068 let quotas = CombinedQuotas::new(&global_config, project_info.get_quotas());
1069
1070 for (namespace, quantity) in namespaces {
1071 let item_scoping = scoping.metric_bucket(namespace);
1072
1073 let limits = match rate_limiter
1074 .is_rate_limited(quotas, item_scoping, quantity, false)
1075 .await
1076 {
1077 Ok(limits) => limits,
1078 Err(err) => {
1079 relay_log::error!(
1080 error = &err as &dyn std::error::Error,
1081 "failed to check redis rate limits"
1082 );
1083 break;
1084 }
1085 };
1086
1087 if limits.is_limited() {
1088 let rejected;
1089 (buckets, rejected) = utils::split_off(buckets, |bucket| {
1090 bucket.name.try_namespace() == Some(namespace)
1091 });
1092
1093 let reason_code = limits.longest().and_then(|limit| limit.reason_code.clone());
1094 self.inner.metric_outcomes.track(
1095 scoping,
1096 &rejected,
1097 Outcome::RateLimited(reason_code),
1098 );
1099
1100 self.inner
1101 .project_cache
1102 .get(item_scoping.scoping.project_key)
1103 .rate_limits()
1104 .merge(limits);
1105 }
1106 }
1107
1108 match MetricsLimiter::create(buckets, project_info.config.quotas.clone(), scoping) {
1109 Err(buckets) => buckets,
1110 Ok(bucket_limiter) => self.apply_other_rate_limits(bucket_limiter).await,
1111 }
1112 }
1113
1114 #[cfg(feature = "processing")]
1116 async fn apply_other_rate_limits(&self, mut bucket_limiter: MetricsLimiter) -> Vec<Bucket> {
1117 relay_log::trace!("handle_rate_limit_buckets");
1118
1119 let scoping = *bucket_limiter.scoping();
1120
1121 if let Some(rate_limiter) = self.inner.rate_limiter.as_ref() {
1122 let global_config = self.inner.global_config.current().unwrap_or_default();
1123 let quotas = CombinedQuotas::new(&global_config, bucket_limiter.quotas());
1124
1125 let over_accept_once = true;
1128 let mut rate_limits = RateLimits::new();
1129
1130 let (category, count) = bucket_limiter.count();
1131
1132 let timer = Instant::now();
1133 let mut is_limited = false;
1134
1135 if let Some(count) = count {
1136 match rate_limiter
1137 .is_rate_limited(quotas, scoping.item(category), count, over_accept_once)
1138 .await
1139 {
1140 Ok(limits) => {
1141 is_limited = limits.is_limited();
1142 rate_limits.merge(limits)
1143 }
1144 Err(e) => {
1145 relay_log::error!(error = &e as &dyn Error, "rate limiting error")
1146 }
1147 }
1148 }
1149
1150 relay_statsd::metric!(
1151 timer(RelayTimers::RateLimitBucketsDuration) = timer.elapsed(),
1152 category = category.name(),
1153 limited = if is_limited { "true" } else { "false" },
1154 count = match count {
1155 None => "none",
1156 Some(0) => "0",
1157 Some(1) => "1",
1158 Some(1..=10) => "10",
1159 Some(1..=25) => "25",
1160 Some(1..=50) => "50",
1161 Some(51..=100) => "100",
1162 Some(101..=500) => "500",
1163 _ => "> 500",
1164 },
1165 );
1166
1167 if rate_limits.is_limited() {
1168 let was_enforced =
1169 bucket_limiter.enforce_limits(&rate_limits, &self.inner.metric_outcomes);
1170
1171 if was_enforced {
1172 self.inner
1174 .project_cache
1175 .get(scoping.project_key)
1176 .rate_limits()
1177 .merge(rate_limits);
1178 }
1179 }
1180 }
1181
1182 bucket_limiter.into_buckets()
1183 }
1184
1185 #[cfg(feature = "processing")]
1192 async fn encode_metrics_processing(
1193 &self,
1194 message: FlushBuckets,
1195 store_forwarder: &Addr<Store>,
1196 ) {
1197 use crate::constants::DEFAULT_EVENT_RETENTION;
1198 use crate::services::store::StoreMetrics;
1199 use relay_dynamic_config::Feature;
1200
1201 for ProjectBuckets {
1202 buckets,
1203 scoping,
1204 project_info,
1205 ..
1206 } in message.buckets.into_values()
1207 {
1208 let mut buckets = self
1209 .rate_limit_buckets(scoping, &project_info, buckets)
1210 .await;
1211
1212 if buckets.is_empty() {
1213 continue;
1214 }
1215
1216 if project_info
1217 .config
1218 .features
1219 .has(Feature::GenerateBillingOutcome)
1220 {
1221 self.inner
1223 .metric_outcomes
1224 .track_accepted_outcome(scoping, &mut buckets);
1225 }
1226
1227 let retention = project_info
1228 .config
1229 .event_retention
1230 .unwrap_or(DEFAULT_EVENT_RETENTION);
1231
1232 store_forwarder.send(StoreMetrics {
1235 buckets,
1236 scoping,
1237 retention,
1238 });
1239 }
1240 }
1241
1242 fn encode_metrics_envelope(&self, message: FlushBuckets) {
1252 let FlushBuckets {
1253 partition_key,
1254 buckets,
1255 } = message;
1256
1257 let batch_size = self.inner.config.metrics_max_batch_size_bytes();
1258 let upstream = self.inner.config.upstream();
1259
1260 for ProjectBuckets {
1261 buckets,
1262 scoping,
1263 project_info,
1264 ..
1265 } in buckets.values()
1266 {
1267 let dsn = PartialDsn::outbound(scoping, upstream);
1268
1269 relay_statsd::metric!(
1270 distribution(RelayDistributions::PartitionKeys) = u64::from(partition_key)
1271 );
1272
1273 let mut num_batches = 0;
1274 for batch in BucketsView::from(buckets).by_size(batch_size) {
1275 let mut envelope = Envelope::from_request(None, RequestMeta::outbound(dsn.clone()));
1276
1277 let mut item = Item::new(ItemType::MetricBuckets);
1278 item.set_source_quantities(crate::metrics::extract_quantities(batch));
1279 item.set_payload(ContentType::Json, serde_json::to_vec(&buckets).unwrap());
1280 envelope.add_item(item);
1281
1282 let mut envelope =
1283 ManagedEnvelope::new(envelope, self.inner.addrs.outcome_aggregator.clone());
1284 envelope
1285 .set_partition_key(Some(partition_key))
1286 .scope(*scoping);
1287
1288 relay_statsd::metric!(
1289 distribution(RelayDistributions::BucketsPerBatch) = batch.len() as u64
1290 );
1291
1292 self.submit_envelope_upstream(envelope, project_info.upstream.clone());
1293 num_batches += 1;
1294 }
1295
1296 relay_statsd::metric!(
1297 distribution(RelayDistributions::BatchesPerPartition) = num_batches
1298 );
1299 }
1300 }
1301
1302 fn send_global_partition(
1304 &self,
1305 upstream: Option<UpstreamDescriptor>,
1306 partition_key: u32,
1307 partition: &mut Partition<'_>,
1308 ) {
1309 if partition.is_empty() {
1310 return;
1311 }
1312
1313 let (unencoded, project_info) = partition.take();
1314 let http_encoding = self.inner.config.http_encoding();
1315 let encoded = match encode_payload(&unencoded, http_encoding) {
1316 Ok(payload) => payload,
1317 Err(error) => {
1318 let error = &error as &dyn std::error::Error;
1319 relay_log::error!(error, "failed to encode metrics payload");
1320 return;
1321 }
1322 };
1323
1324 let request = SendMetricsRequest {
1325 upstream,
1326 partition_key: partition_key.to_string(),
1327 unencoded,
1328 encoded,
1329 project_info,
1330 http_encoding,
1331 metric_outcomes: self.inner.metric_outcomes.clone(),
1332 };
1333
1334 self.inner.addrs.upstream_relay.send(SendRequest(request));
1335 }
1336
/// Sends the flushed buckets of all projects to the upstream in global metrics requests.
///
/// Bucket views are collected into one `Partition` per upstream descriptor. Whenever a bucket
/// does not fully fit into its partition, the partition is sent and the rest of the bucket is
/// inserted into the now emptied partition. Partitions that still hold views after all
/// projects were visited are sent at the end.
fn encode_metrics_global(&self, message: FlushBuckets) {
    let FlushBuckets {
        partition_key,
        buckets,
    } = message;

    let batch_size = self.inner.config.metrics_max_batch_size_bytes();
    let mut partitions = BTreeMap::new();
    let mut partition_splits = 0;

    for ProjectBuckets {
        buckets,
        scoping,
        project_info,
        ..
    } in buckets.values()
    {
        // Look up first so that the upstream descriptor is only cloned when a new partition
        // has to be created for it.
        let partition = match partitions.get_mut(&project_info.upstream) {
            Some(partition) => partition,
            None => partitions
                .entry(project_info.upstream.clone())
                .or_insert_with(|| Partition::new(batch_size)),
        };

        for bucket in buckets {
            let mut remaining = Some(BucketView::new(bucket));

            while let Some(bucket) = remaining.take() {
                // `insert` hands back whatever it did not add to the partition.
                if let Some(next) = partition.insert(bucket, *scoping) {
                    // Sending drains the partition, so the leftover is retried against an
                    // empty partition on the next iteration.
                    self.send_global_partition(
                        project_info.upstream.clone(),
                        partition_key,
                        partition,
                    );
                    remaining = Some(next);
                    partition_splits += 1;
                }
            }
        }
    }

    // Only report the metric when at least one split happened.
    if partition_splits > 0 {
        metric!(distribution(RelayDistributions::PartitionSplits) = partition_splits);
    }

    // Send whatever is still buffered; empty partitions are skipped by the callee.
    for (upstream, mut partition) in partitions {
        self.send_global_partition(upstream, partition_key, &mut partition);
    }
}
1399
1400 fn encode_metrics_client_reports(&self, mut message: FlushBuckets) -> FlushBuckets {
1404 for ProjectBuckets {
1405 buckets, scoping, ..
1406 } in message.buckets.values_mut()
1407 {
1408 let client_reports = outcome::metric::extract_client_reports(buckets).collect();
1409
1410 self.handle_submit_client_reports(SubmitClientReports {
1411 client_reports,
1412 scoping: *scoping,
1413 });
1414 }
1415
1416 message
1417 }
1418
1419 async fn handle_flush_buckets(&self, mut message: FlushBuckets) {
1420 for (project_key, pb) in message.buckets.iter_mut() {
1421 let buckets = std::mem::take(&mut pb.buckets);
1422 pb.buckets =
1423 self.check_buckets(*project_key, &pb.project_info, &pb.rate_limits, buckets);
1424 }
1425
1426 #[cfg(feature = "processing")]
1427 if self.inner.config.processing_enabled()
1428 && let Some(ref store_forwarder) = self.inner.addrs.store_forwarder
1429 {
1430 return self
1431 .encode_metrics_processing(message, store_forwarder)
1432 .await;
1433 }
1434
1435 if self.inner.config.emit_outcomes() == EmitOutcomes::AsClientReports {
1438 message = self.encode_metrics_client_reports(message);
1441 }
1442
1443 if self.inner.config.http_global_metrics() {
1444 self.encode_metrics_global(message)
1445 } else {
1446 self.encode_metrics_envelope(message)
1447 }
1448 }
1449
/// Reports whether this service holds a rate limiter.
#[cfg(all(test, feature = "processing"))]
fn redis_rate_limiter_enabled(&self) -> bool {
    let rate_limiter = &self.inner.rate_limiter;
    rate_limiter.is_some()
}
1454
1455 async fn handle_message(self, message: EnvelopeProcessor) {
1456 let ty = message.variant();
1457 let feature_weights = self.feature_weights(&message);
1458
1459 metric!(timer(RelayTimers::ProcessMessageDuration), message = ty, {
1460 let mut cogs = self.inner.cogs.timed(ResourceId::Relay, feature_weights);
1461
1462 match message {
1463 EnvelopeProcessor::ProcessEnvelope(m) => {
1464 self.handle_process_envelope(&mut cogs, *m).await
1465 }
1466 EnvelopeProcessor::ProcessProjectMetrics(m) => {
1467 self.handle_process_metrics(&mut cogs, *m)
1468 }
1469 EnvelopeProcessor::ProcessBatchedMetrics(m) => {
1470 self.handle_process_batched_metrics(&mut cogs, *m)
1471 }
1472 EnvelopeProcessor::FlushBuckets(m) => self.handle_flush_buckets(*m).await,
1473 EnvelopeProcessor::SubmitClientReports(m) => self.handle_submit_client_reports(*m),
1474 }
1475 });
1476 }
1477
1478 fn feature_weights(&self, message: &EnvelopeProcessor) -> FeatureWeights {
1479 match message {
1480 EnvelopeProcessor::ProcessEnvelope(_) => AppFeature::Unattributed.into(),
1482 EnvelopeProcessor::ProcessProjectMetrics(_) => AppFeature::Unattributed.into(),
1483 EnvelopeProcessor::ProcessBatchedMetrics(_) => AppFeature::Unattributed.into(),
1484 EnvelopeProcessor::FlushBuckets(v) => v
1485 .buckets
1486 .values()
1487 .map(|s| {
1488 if self.inner.config.processing_enabled() {
1489 relay_metrics::cogs::ByCount(&s.buckets).into()
1492 } else {
1493 relay_metrics::cogs::BySize(&s.buckets).into()
1494 }
1495 })
1496 .fold(FeatureWeights::none(), FeatureWeights::merge),
1497 EnvelopeProcessor::SubmitClientReports(_) => AppFeature::ClientReports.into(),
1498 }
1499 }
1500}
1501
1502impl Service for EnvelopeProcessorService {
1503 type Interface = EnvelopeProcessor;
1504
1505 async fn run(self, mut rx: relay_system::Receiver<Self::Interface>) {
1506 while let Some(message) = rx.recv().await {
1507 let service = self.clone();
1508 let hub = relay_log::Hub::with(|h| relay_log::Hub::new_from_top(h));
1510
1511 self.inner
1512 .pool
1513 .spawn_async(Box::pin(service.handle_message(message).bind_hub(hub)))
1514 .await;
1515 }
1516 }
1517}
1518
1519pub fn encode_payload(body: &Bytes, http_encoding: HttpEncoding) -> Result<Bytes, std::io::Error> {
1520 let envelope_body: Vec<u8> = match http_encoding {
1521 HttpEncoding::Identity => return Ok(body.clone()),
1522 HttpEncoding::Deflate => {
1523 let mut encoder = ZlibEncoder::new(Vec::new(), Compression::default());
1524 encoder.write_all(body.as_ref())?;
1525 encoder.finish()?
1526 }
1527 HttpEncoding::Gzip => {
1528 let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
1529 encoder.write_all(body.as_ref())?;
1530 encoder.finish()?
1531 }
1532 HttpEncoding::Br => {
1533 let mut encoder = BrotliEncoder::new(Vec::new(), 0, 5, 22);
1535 encoder.write_all(body.as_ref())?;
1536 encoder.into_inner()
1537 }
1538 HttpEncoding::Zstd => {
1539 let mut encoder = ZstdEncoder::new(Vec::new(), 1)?;
1542 encoder.write_all(body.as_ref())?;
1543 encoder.finish()?
1544 }
1545 };
1546
1547 Ok(envelope_body.into())
1548}
1549
/// An upstream request that submits an envelope to the envelope endpoint of its project.
#[derive(Debug)]
pub struct SendEnvelope {
    /// The upstream descriptor reported by `UpstreamRequest::upstream`, if any.
    pub upstream: Option<UpstreamDescriptor>,
    /// The envelope being sent. It is accepted or rejected once the upstream has responded.
    pub envelope: ManagedEnvelope,
    /// The request body.
    ///
    /// NOTE(review): presumably already encoded with `http_encoding`; confirm where this
    /// struct is constructed.
    pub body: Bytes,
    /// The content encoding declared on the request.
    pub http_encoding: HttpEncoding,
    /// Handle used to merge rate limits returned by the upstream into the project's limits.
    pub project_cache: ProjectCacheHandle,
}
1559
1560impl UpstreamRequest for SendEnvelope {
1561 fn upstream(&self) -> Option<&UpstreamDescriptor> {
1562 self.upstream.as_ref()
1563 }
1564
1565 fn method(&self) -> reqwest::Method {
1566 reqwest::Method::POST
1567 }
1568
1569 fn path(&self) -> Cow<'_, str> {
1570 format!("/api/{}/envelope/", self.envelope.scoping().project_id).into()
1571 }
1572
1573 fn route(&self) -> &'static str {
1574 "envelope"
1575 }
1576
1577 fn build(&mut self, builder: &mut http::RequestBuilder) -> Result<(), http::HttpError> {
1578 let envelope_body = self.body.clone();
1579 metric!(
1580 distribution(RelayDistributions::UpstreamEnvelopeBodySize) = envelope_body.len() as u64
1581 );
1582
1583 let meta = &self.envelope.meta();
1584 let shard = self.envelope.partition_key().map(|p| p.to_string());
1585 builder
1586 .content_encoding(self.http_encoding)
1587 .header_opt("Origin", meta.origin().map(|url| url.as_str()))
1588 .header_opt("User-Agent", meta.user_agent())
1589 .header("X-Sentry-Auth", meta.auth_header())
1590 .header("X-Forwarded-For", meta.forwarded_for())
1591 .header("Content-Type", envelope::CONTENT_TYPE)
1592 .header_opt("X-Sentry-Relay-Shard", shard)
1593 .body(envelope_body);
1594
1595 Ok(())
1596 }
1597
1598 fn sign(&mut self) -> Option<Sign> {
1599 Some(Sign::Optional(SignatureType::RequestSign))
1600 }
1601
1602 fn respond(
1603 self: Box<Self>,
1604 result: Result<http::Response, UpstreamRequestError>,
1605 ) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> {
1606 Box::pin(async move {
1607 let result = match result {
1608 Ok(mut response) => response.consume().await.map_err(UpstreamRequestError::Http),
1609 Err(error) => Err(error),
1610 };
1611
1612 match result {
1613 Ok(()) => self.envelope.accept(),
1614 Err(error) if error.is_received() => {
1615 let scoping = self.envelope.scoping();
1616 self.envelope.accept();
1617
1618 if let UpstreamRequestError::RateLimited(limits) = error {
1619 self.project_cache
1620 .get(scoping.project_key)
1621 .rate_limits()
1622 .merge(limits.scope(&scoping));
1623 }
1624 }
1625 Err(error) => {
1626 let mut envelope = self.envelope;
1629 envelope.reject(Outcome::Invalid(DiscardReason::Internal));
1630 relay_log::error!(
1631 error = &error as &dyn Error,
1632 tags.project_key = %envelope.scoping().project_key,
1633 "error sending envelope"
1634 );
1635 }
1636 }
1637 })
1638 }
1639}
1640
/// Collects bucket views of multiple projects until a size budget is used up.
#[derive(Debug)]
struct Partition<'a> {
    /// The size budget that `remaining` is reset to when the partition is taken.
    max_size: usize,
    /// The budget still available; reduced by the estimated size of every inserted view.
    remaining: usize,
    /// The inserted bucket views, grouped by project key.
    views: HashMap<ProjectKey, Vec<BucketView<'a>>>,
    /// The scoping of every project that has views in this partition.
    project_info: HashMap<ProjectKey, Scoping>,
}
1654
1655impl<'a> Partition<'a> {
1656 pub fn new(size: usize) -> Self {
1658 Self {
1659 max_size: size,
1660 remaining: size,
1661 views: HashMap::new(),
1662 project_info: HashMap::new(),
1663 }
1664 }
1665
1666 pub fn insert(&mut self, bucket: BucketView<'a>, scoping: Scoping) -> Option<BucketView<'a>> {
1677 let (current, next) = bucket.split(self.remaining, Some(self.max_size));
1678
1679 if let Some(current) = current {
1680 self.remaining = self.remaining.saturating_sub(current.estimated_size());
1681 self.views
1682 .entry(scoping.project_key)
1683 .or_default()
1684 .push(current);
1685
1686 self.project_info
1687 .entry(scoping.project_key)
1688 .or_insert(scoping);
1689 }
1690
1691 next
1692 }
1693
1694 fn is_empty(&self) -> bool {
1696 self.views.is_empty()
1697 }
1698
1699 fn take(&mut self) -> (Bytes, HashMap<ProjectKey, Scoping>) {
1703 #[derive(serde::Serialize)]
1704 struct Wrapper<'a> {
1705 buckets: &'a HashMap<ProjectKey, Vec<BucketView<'a>>>,
1706 }
1707
1708 let buckets = &self.views;
1709 let payload = serde_json::to_vec(&Wrapper { buckets }).unwrap().into();
1710
1711 let scopings = std::mem::take(&mut self.project_info);
1712
1713 self.views.clear();
1714 self.remaining = self.max_size;
1715
1716 (payload, scopings)
1717 }
1718}
1719
/// An upstream request that posts a batch of metric buckets to the global metrics endpoint.
#[derive(Debug)]
struct SendMetricsRequest {
    /// The upstream descriptor reported by `UpstreamRequest::upstream`, if any.
    upstream: Option<UpstreamDescriptor>,
    /// The partition key, sent in the `X-Sentry-Relay-Shard` header.
    partition_key: String,
    /// The payload before content encoding. It is used for the body signature and parsed
    /// again to create outcomes when sending fails.
    unencoded: Bytes,
    /// The payload that is sent as the request body.
    encoded: Bytes,
    /// The scoping per project key, used to attribute error outcomes.
    project_info: HashMap<ProjectKey, Scoping>,
    /// The content encoding declared on the request.
    http_encoding: HttpEncoding,
    /// Tracks outcomes for buckets that could not be sent.
    metric_outcomes: MetricOutcomes,
}
1742
1743impl SendMetricsRequest {
1744 fn create_error_outcomes(self) {
1745 #[derive(serde::Deserialize)]
1746 struct Wrapper {
1747 buckets: HashMap<ProjectKey, Vec<MinimalTrackableBucket>>,
1748 }
1749
1750 let buckets = match serde_json::from_slice(&self.unencoded) {
1751 Ok(Wrapper { buckets }) => buckets,
1752 Err(err) => {
1753 relay_log::error!(
1754 error = &err as &dyn std::error::Error,
1755 "failed to parse buckets from failed transmission"
1756 );
1757 return;
1758 }
1759 };
1760
1761 for (key, buckets) in buckets {
1762 let Some(&scoping) = self.project_info.get(&key) else {
1763 relay_log::error!("missing scoping for project key");
1764 continue;
1765 };
1766
1767 self.metric_outcomes.track(
1768 scoping,
1769 &buckets,
1770 Outcome::Invalid(DiscardReason::Internal),
1771 );
1772 }
1773 }
1774}
1775
1776impl UpstreamRequest for SendMetricsRequest {
1777 fn upstream(&self) -> Option<&UpstreamDescriptor> {
1778 self.upstream.as_ref()
1779 }
1780
1781 fn set_relay_id(&self) -> bool {
1782 true
1783 }
1784
1785 fn sign(&mut self) -> Option<Sign> {
1786 Some(Sign::Required(SignatureType::Body(self.unencoded.clone())))
1787 }
1788
1789 fn method(&self) -> reqwest::Method {
1790 reqwest::Method::POST
1791 }
1792
1793 fn path(&self) -> Cow<'_, str> {
1794 "/api/0/relays/metrics/".into()
1795 }
1796
1797 fn route(&self) -> &'static str {
1798 "global_metrics"
1799 }
1800
1801 fn build(&mut self, builder: &mut http::RequestBuilder) -> Result<(), http::HttpError> {
1802 metric!(
1803 distribution(RelayDistributions::UpstreamMetricsBodySize) = self.encoded.len() as u64
1804 );
1805
1806 builder
1807 .content_encoding(self.http_encoding)
1808 .header("X-Sentry-Relay-Shard", self.partition_key.as_bytes())
1809 .header(header::CONTENT_TYPE, b"application/json")
1810 .body(self.encoded.clone());
1811
1812 Ok(())
1813 }
1814
1815 fn respond(
1816 self: Box<Self>,
1817 result: Result<http::Response, UpstreamRequestError>,
1818 ) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> {
1819 Box::pin(async {
1820 match result {
1821 Ok(mut response) => {
1822 response.consume().await.ok();
1823 }
1824 Err(error) => {
1825 relay_log::error!(error = &error as &dyn Error, "Failed to send metrics batch");
1826
1827 if error.is_received() {
1830 return;
1831 }
1832
1833 self.create_error_outcomes()
1834 }
1835 }
1836 })
1837 }
1838}
1839
/// A borrowed view over global and project quotas that iterates the global quotas first.
#[derive(Copy, Clone, Debug)]
#[cfg(feature = "processing")]
struct CombinedQuotas<'a> {
    /// Quotas taken from the global config.
    global_quotas: &'a [Quota],
    /// Quotas of the project.
    project_quotas: &'a [Quota],
}
1847
#[cfg(feature = "processing")]
impl<'a> CombinedQuotas<'a> {
    /// Combines the quotas of `global_config` with `project_quotas` without copying them.
    pub fn new(global_config: &'a GlobalConfig, project_quotas: &'a [Quota]) -> Self {
        let global_quotas = global_config.quotas.as_slice();

        Self {
            global_quotas,
            project_quotas,
        }
    }
}
1858
#[cfg(feature = "processing")]
impl<'a> IntoIterator for CombinedQuotas<'a> {
    type Item = &'a Quota;
    type IntoIter = std::iter::Chain<std::slice::Iter<'a, Quota>, std::slice::Iter<'a, Quota>>;

    /// Yields all global quotas followed by all project quotas.
    fn into_iter(self) -> Self::IntoIter {
        let Self {
            global_quotas,
            project_quotas,
        } = self;

        global_quotas.iter().chain(project_quotas)
    }
}
1868
1869#[cfg(test)]
1870mod tests {
1871 use insta::assert_debug_snapshot;
1872 use relay_common::glob2::LazyGlob;
1873 use relay_dynamic_config::ProjectConfig;
1874 use relay_event_normalization::{
1875 NormalizationConfig, RedactionRule, TransactionNameConfig, TransactionNameRule,
1876 };
1877 use relay_event_schema::protocol::{Event, EventId, TransactionSource};
1878 use relay_pii::DataScrubbingConfig;
1879 use relay_protocol::Annotated;
1880 #[cfg(feature = "processing")]
1881 use relay_quotas::DataCategory;
1882 use similar_asserts::assert_eq;
1883
1884 use crate::testutils::{create_test_processor, create_test_processor_with_addrs};
1885
1886 #[cfg(feature = "processing")]
1887 use {
1888 relay_metrics::BucketValue,
1889 relay_quotas::{QuotaScope, ReasonCode},
1890 relay_test::mock_service,
1891 };
1892
1893 use super::*;
1894
1895 async fn process_to_single_envelope<'a>(
1896 processor: &EnvelopeProcessorService,
1897 envelope: ManagedEnvelope,
1898 ctx: processing::Context<'a>,
1899 ) -> Box<Envelope> {
1900 let mut outputs = processor.process(envelope, ctx).await;
1901 assert_eq!(outputs.len(), 1);
1902
1903 let Output { main, metrics } = outputs.pop().unwrap();
1904
1905 if let Some(metrics) = metrics {
1906 metrics.accept(drop);
1907 }
1908
1909 main.unwrap()
1910 .serialize_envelope(ctx.to_forward())
1911 .unwrap()
1912 .accept(|envelope| envelope)
1913 }
1914
/// Builds an organization-scoped quota for metric buckets with the given id and a limit of 0.
#[cfg(feature = "processing")]
fn mock_quota(id: &str) -> Quota {
    let categories = [DataCategory::MetricBucket].into();

    Quota {
        id: Some(id.into()),
        categories,
        scope: QuotaScope::Organization,
        scope_id: None,
        limit: Some(0),
        window: None,
        reason_code: None,
        namespace: None,
    }
}
1928
/// Global quotas must be yielded before project quotas, each group in its original order.
#[cfg(feature = "processing")]
#[test]
fn test_dynamic_quotas() {
    let global_config = relay_dynamic_config::GlobalConfig {
        quotas: vec![mock_quota("foo"), mock_quota("bar")],
        ..Default::default()
    };
    let project_quotas = vec![mock_quota("baz"), mock_quota("qux")];

    let combined = CombinedQuotas::new(&global_config, &project_quotas);

    let expected = ["foo", "bar", "baz", "qux"];
    let quota_ids = combined
        .into_iter()
        .filter_map(|quota| quota.id.as_deref());
    assert!(quota_ids.eq(expected));
}
1944
/// Flushes buckets of two projects that share one project config. The config holds a quota
/// with limit 0 that is scoped to the first organization only, so only the buckets of the
/// second organization are expected to reach the store.
///
/// NOTE(review): presumably needs a reachable Redis at `RELAY_REDIS_URL` (default
/// `redis://127.0.0.1:6379`) — confirm.
#[cfg(feature = "processing")]
#[tokio::test]
async fn test_ratelimit_per_batch() {
    use relay_base_schema::organization::OrganizationId;
    use relay_protocol::FiniteF64;

    let rate_limited_org = Scoping {
        organization_id: OrganizationId::new(1),
        project_id: ProjectId::new(21),
        project_key: ProjectKey::parse("00000000000000000000000000000000").unwrap(),
        key_id: Some(17),
    };

    let not_rate_limited_org = Scoping {
        organization_id: OrganizationId::new(2),
        project_id: ProjectId::new(21),
        project_key: ProjectKey::parse("11111111111111111111111111111111").unwrap(),
        key_id: Some(17),
    };

    let message = {
        let project_info = {
            // The quota is scoped to the organization that is expected to be rate limited.
            let quota = Quota {
                id: Some("testing".into()),
                categories: [DataCategory::MetricBucket].into(),
                scope: relay_quotas::QuotaScope::Organization,
                scope_id: Some(rate_limited_org.organization_id.to_string().into()),
                limit: Some(0),
                window: None,
                reason_code: Some(ReasonCode::new("test")),
                namespace: None,
            };

            let mut config = ProjectConfig::default();
            config.quotas.push(quota);

            Arc::new(ProjectInfo {
                config,
                ..Default::default()
            })
        };

        // Both projects get the same single counter bucket and the same project info.
        let project_metrics = |scoping| ProjectBuckets {
            buckets: vec![Bucket {
                name: "d:spans/bar".into(),
                value: BucketValue::Counter(FiniteF64::new(1.0).unwrap()),
                timestamp: UnixTimestamp::now(),
                tags: Default::default(),
                width: 10,
                metadata: BucketMetadata::default(),
            }],
            rate_limits: Default::default(),
            project_info: project_info.clone(),
            scoping,
        };

        let buckets = hashbrown::HashMap::from([
            (
                rate_limited_org.project_key,
                project_metrics(rate_limited_org),
            ),
            (
                not_rate_limited_org.project_key,
                project_metrics(not_rate_limited_org),
            ),
        ]);

        FlushBuckets {
            partition_key: 0,
            buckets,
        }
    };

    // Sanity check: the message holds one entry per project.
    assert_eq!(message.buckets.keys().count(), 2);

    let config = {
        let config_json = serde_json::json!({
            "processing": {
                "enabled": true,
                "kafka_config": [],
                "redis": {
                    "server": std::env::var("RELAY_REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_owned()),
                }
            }
        });
        Config::from_json_value(config_json).unwrap()
    };

    // The mocked store records the organization id of every metrics message it receives.
    let (store, handle) = {
        let f = |org_ids: &mut Vec<OrganizationId>, msg: Store| {
            let org_id = match msg {
                Store::Metrics(x) => x.scoping.organization_id,
                _ => panic!("received envelope when expecting only metrics"),
            };
            org_ids.push(org_id);
        };

        mock_service("store_forwarder", vec![], f)
    };

    let processor = create_test_processor(config).await;
    assert!(processor.redis_rate_limiter_enabled());

    processor.encode_metrics_processing(message, &store).await;

    // Drop the address before awaiting the handle that yields the recorded organization ids.
    drop(store);
    let orgs_not_ratelimited = handle.await.unwrap();

    assert_eq!(
        orgs_not_ratelimited,
        vec![not_rate_limited_org.organization_id]
    );
}
2061
/// Processes an event whose `User-Agent` header contains a version number while data
/// scrubbing and an `@ip:mask` PII rule are active. The header is expected to come out
/// masked, while the browser context is expected to contain the unmasked version.
#[tokio::test]
async fn test_browser_version_extraction_with_pii_like_data() {
    let processor = create_test_processor(Default::default()).await;
    let outcome_aggregator = Addr::dummy();
    let event_id = EventId::new();

    let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
        .parse()
        .unwrap();

    let request_meta = RequestMeta::new(dsn);
    let mut envelope = Envelope::from_request(Some(event_id), request_meta);

    envelope.add_item({
        let mut item = Item::new(ItemType::Event);
        item.set_payload(
            ContentType::Json,
            r#"
            {
                "request": {
                    "headers": [
                        ["User-Agent", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36"]
                    ]
                }
            }
        "#,
        );
        item
    });

    // Turn on data scrubbing, including the defaults and IP address scrubbing.
    let mut datascrubbing_settings = DataScrubbingConfig::default();
    datascrubbing_settings.scrub_data = true;
    datascrubbing_settings.scrub_defaults = true;
    datascrubbing_settings.scrub_ip_addresses = true;

    // Additionally apply the `@ip:mask` rule to all fields.
    let pii_config = serde_json::from_str(r#"{"applications": {"**": ["@ip:mask"]}}"#).unwrap();

    let config = ProjectConfig {
        datascrubbing_settings,
        pii_config: Some(pii_config),
        ..Default::default()
    };

    let project_info = ProjectInfo {
        config,
        ..Default::default()
    };

    let envelope = ManagedEnvelope::new(envelope, outcome_aggregator);

    let ctx = processing::Context {
        project_info: &project_info,
        ..processing::Context::for_test()
    };

    let new_envelope = process_to_single_envelope(&processor, envelope, ctx).await;

    let event_item = new_envelope.items().last().unwrap();
    let annotated_event: Annotated<Event> =
        Annotated::from_json_bytes(&event_item.payload()).unwrap();
    let event = annotated_event.into_value().unwrap();
    let headers = event
        .request
        .into_value()
        .unwrap()
        .headers
        .into_value()
        .unwrap();

    // The version in the header has been replaced by asterisks.
    assert_eq!(
        Some(
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/********* Safari/537.36"
        ),
        headers.get_header("User-Agent")
    );
    // The browser context still carries the version.
    let contexts = event.contexts.into_value().unwrap();
    let browser = contexts.0.get("browser").unwrap();
    assert_eq!(
        r#"{"browser":"Chrome 103.0.0","name":"Chrome","version":"103.0.0","type":"browser"}"#,
        browser.to_json().unwrap()
    );
}
2148
/// Processes an event envelope that carries a dynamic sampling context (DSC) header and
/// expects the DSC to appear on the resulting event's `_dsc` field.
#[tokio::test]
#[cfg(feature = "processing")]
async fn test_materialize_dsc() {
    use crate::services::projects::project::PublicKeyConfig;

    let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
        .parse()
        .unwrap();
    let request_meta = RequestMeta::new(dsn);
    let mut envelope = Envelope::from_request(None, request_meta);

    let dsc = r#"{
        "trace_id": "00000000-0000-0000-0000-000000000001",
        "public_key": "e12d836b15bb49d7bbf99e64295d995b",
        "sample_rate": "0.2"
    }"#;
    envelope.set_dsc(serde_json::from_str(dsc).unwrap());

    let mut item = Item::new(ItemType::Event);
    item.set_payload(ContentType::Json, r#"{}"#);
    envelope.add_item(item);

    let outcome_aggregator = Addr::dummy();
    let managed_envelope = ManagedEnvelope::new(envelope, outcome_aggregator);

    let mut project_info = ProjectInfo::default();
    project_info.public_keys.push(PublicKeyConfig {
        public_key: ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(),
        numeric_id: Some(1),
    });

    let config = serde_json::json!({
        "processing": {
            "enabled": true,
            "kafka_config": [],
        }
    });

    // The same config is used for the processor and for the processing context.
    let processor =
        create_test_processor(Config::from_json_value(config.clone()).unwrap()).await;
    let config = Config::from_json_value(config).unwrap();
    let ctx = processing::Context {
        config: &config,
        project_info: &project_info,
        sampling_project_info: Some(&project_info),
        ..processing::Context::for_test()
    };

    let envelope = process_to_single_envelope(&processor, managed_envelope, ctx).await;
    let event = envelope
        .get_item_by(|item| item.ty() == &ItemType::Event)
        .unwrap();

    let event = Annotated::<Event>::from_json_bytes(&event.payload()).unwrap();
    insta::assert_debug_snapshot!(event.value().unwrap()._dsc, @r###"
    Object(
        {
            "environment": ~,
            "public_key": String(
                "e12d836b15bb49d7bbf99e64295d995b",
            ),
            "release": ~,
            "replay_id": ~,
            "sample_rate": String(
                "0.2",
            ),
            "trace_id": String(
                "00000000000000000000000000000001",
            ),
            "transaction": ~,
        },
    )
    "###);
}
2223
/// Normalizes a transaction event with the given name and source inside
/// `utils::log_transaction_name_metrics` and returns the statsd metrics captured meanwhile.
///
/// Normalization runs with a single transaction name rule that has the pattern `/foo/*/**`
/// and replaces with `*`.
fn capture_test_event(transaction_name: &str, source: TransactionSource) -> Vec<String> {
    let mut event = Annotated::<Event>::from_json(
        r#"
        {
            "type": "transaction",
            "transaction": "/foo/",
            "timestamp": 946684810.0,
            "start_timestamp": 946684800.0,
            "contexts": {
                "trace": {
                    "trace_id": "4c79f60c11214eb38604f4ae0781bfb2",
                    "span_id": "fa90fdead5f74053",
                    "op": "http.server",
                    "type": "trace"
                }
            },
            "transaction_info": {
                "source": "url"
            }
        }
        "#,
    )
    .unwrap();
    // Override the name and source of the fixture with the values under test.
    let e = event.value_mut().as_mut().unwrap();
    e.transaction.set_value(Some(transaction_name.into()));

    e.transaction_info
        .value_mut()
        .as_mut()
        .unwrap()
        .source
        .set_value(Some(source));

    relay_statsd::with_capturing_test_client(|| {
        utils::log_transaction_name_metrics(&mut event, |event| {
            let config = NormalizationConfig {
                transaction_name_config: TransactionNameConfig {
                    rules: &[TransactionNameRule {
                        pattern: LazyGlob::new("/foo/*/**".to_owned()),
                        expiry: DateTime::<Utc>::MAX_UTC,
                        redaction: RedactionRule::Replace {
                            substitution: "*".to_owned(),
                        },
                    }],
                },
                ..Default::default()
            };
            relay_event_normalization::normalize_event(event, &config)
        });
    })
}
2275
/// A URL transaction name of `/nothing` is expected to be reported with `changes:none`.
#[test]
fn test_log_transaction_metrics_none() {
    let captures = capture_test_event("/nothing", TransactionSource::Url);
    insta::assert_debug_snapshot!(captures, @r###"
    [
        "event.transaction_name_changes:1|c|#source_in:url,changes:none,source_out:sanitized,is_404:false",
    ]
    "###);
}
2285
/// A URL transaction name of `/foo/john/denver` is expected to be reported with
/// `changes:rule`.
#[test]
fn test_log_transaction_metrics_rule() {
    let captures = capture_test_event("/foo/john/denver", TransactionSource::Url);
    insta::assert_debug_snapshot!(captures, @r###"
    [
        "event.transaction_name_changes:1|c|#source_in:url,changes:rule,source_out:sanitized,is_404:false",
    ]
    "###);
}
2295
/// A URL transaction name of `/something/12345` is expected to be reported with
/// `changes:pattern`.
#[test]
fn test_log_transaction_metrics_pattern() {
    let captures = capture_test_event("/something/12345", TransactionSource::Url);
    insta::assert_debug_snapshot!(captures, @r###"
    [
        "event.transaction_name_changes:1|c|#source_in:url,changes:pattern,source_out:sanitized,is_404:false",
    ]
    "###);
}
2305
/// A URL transaction name of `/foo/john/12345` is expected to be reported with
/// `changes:both`.
#[test]
fn test_log_transaction_metrics_both() {
    let captures = capture_test_event("/foo/john/12345", TransactionSource::Url);
    insta::assert_debug_snapshot!(captures, @r###"
    [
        "event.transaction_name_changes:1|c|#source_in:url,changes:both,source_out:sanitized,is_404:false",
    ]
    "###);
}
2315
/// With the source `route`, the name `/foo/john/12345` is expected to be reported with
/// `changes:none` and an unchanged source.
#[test]
fn test_log_transaction_metrics_no_match() {
    let captures = capture_test_event("/foo/john/12345", TransactionSource::Route);
    insta::assert_debug_snapshot!(captures, @r###"
    [
        "event.transaction_name_changes:1|c|#source_in:route,changes:none,source_out:route,is_404:false",
    ]
    "###);
}
2325
2326 #[tokio::test]
2327 async fn test_process_metrics_bucket_metadata() {
2328 let mut token = Cogs::noop().timed(ResourceId::Relay, AppFeature::Unattributed);
2329 let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
2330 let received_at = Utc::now();
2331 let config = Config::default();
2332
2333 let (aggregator, mut aggregator_rx) = Addr::custom();
2334 let processor = create_test_processor_with_addrs(
2335 config,
2336 Addrs {
2337 aggregator,
2338 ..Default::default()
2339 },
2340 )
2341 .await;
2342
2343 let mut item = Item::new(ItemType::Statsd);
2344 item.set_payload(ContentType::Text, "spans/foo:3182887624:4267882815|s");
2345 for (source, expected_received_at) in [
2346 (
2347 BucketSource::External,
2348 Some(UnixTimestamp::from_datetime(received_at).unwrap()),
2349 ),
2350 (BucketSource::Internal, None),
2351 ] {
2352 let message = ProcessMetrics {
2353 data: MetricData::Raw(vec![item.clone()]),
2354 project_key,
2355 source,
2356 received_at,
2357 sent_at: Some(Utc::now()),
2358 };
2359 processor.handle_process_metrics(&mut token, message);
2360
2361 let Aggregator::MergeBuckets(merge_buckets) = aggregator_rx.recv().await.unwrap();
2362 let buckets = merge_buckets.buckets;
2363 assert_eq!(buckets.len(), 1);
2364 assert_eq!(buckets[0].metadata.received_at, expected_received_at);
2365 }
2366 }
2367
/// Sends a batched metrics payload with buckets for two project keys and expects one
/// `MergeBuckets` message per project key at the aggregator.
#[tokio::test]
async fn test_process_batched_metrics() {
    let mut token = Cogs::noop().timed(ResourceId::Relay, AppFeature::Unattributed);
    let received_at = Utc::now();
    let config = Config::default();

    let (aggregator, mut aggregator_rx) = Addr::custom();
    let processor = create_test_processor_with_addrs(
        config,
        Addrs {
            aggregator,
            ..Default::default()
        },
    )
    .await;

    let payload = r#"{
    "buckets": {
        "11111111111111111111111111111111": [
            {
                "timestamp": 1615889440,
                "width": 0,
                "name": "d:custom/endpoint.response_time@millisecond",
                "type": "d",
                "value": [
                  68.0
                ],
                "tags": {
                  "route": "user_index"
                }
            }
        ],
        "22222222222222222222222222222222": [
            {
                "timestamp": 1615889440,
                "width": 0,
                "name": "d:custom/endpoint.cache_rate@none",
                "type": "d",
                "value": [
                  36.0
                ]
            }
        ]
    }
}
"#;
    let message = ProcessBatchedMetrics {
        payload: Bytes::from(payload),
        source: BucketSource::Internal,
        received_at,
        sent_at: Some(Utc::now()),
    };
    processor.handle_process_batched_metrics(&mut token, message);

    let Aggregator::MergeBuckets(mb1) = aggregator_rx.recv().await.unwrap();
    let Aggregator::MergeBuckets(mb2) = aggregator_rx.recv().await.unwrap();

    // Sort by project key so that the snapshot does not depend on the order of arrival.
    let mut messages = vec![mb1, mb2];
    messages.sort_by_key(|mb| mb.project_key);

    let actual = messages
        .into_iter()
        .map(|mb| (mb.project_key, mb.buckets))
        .collect::<Vec<_>>();

    assert_debug_snapshot!(actual, @r###"
    [
        (
            ProjectKey("11111111111111111111111111111111"),
            [
                Bucket {
                    timestamp: UnixTimestamp(1615889440),
                    width: 0,
                    name: MetricName(
                        "d:custom/endpoint.response_time@millisecond",
                    ),
                    value: Distribution(
                        [
                            68.0,
                        ],
                    ),
                    tags: {
                        "route": "user_index",
                    },
                    metadata: BucketMetadata {
                        merges: 1,
                        received_at: None,
                        extracted_from_indexed: false,
                    },
                },
            ],
        ),
        (
            ProjectKey("22222222222222222222222222222222"),
            [
                Bucket {
                    timestamp: UnixTimestamp(1615889440),
                    width: 0,
                    name: MetricName(
                        "d:custom/endpoint.cache_rate@none",
                    ),
                    value: Distribution(
                        [
                            36.0,
                        ],
                    ),
                    tags: {},
                    metadata: BucketMetadata {
                        merges: 1,
                        received_at: None,
                        extracted_from_indexed: false,
                    },
                },
            ],
        ),
    ]
    "###);
}
2486}