1use std::borrow::Cow;
2use std::collections::{BTreeSet, HashMap};
3use std::error::Error;
4use std::fmt::{Debug, Display};
5use std::future::Future;
6use std::io::Write;
7use std::pin::Pin;
8use std::sync::Arc;
9use std::time::Duration;
10
11use anyhow::Context;
12use brotli::CompressorWriter as BrotliEncoder;
13use bytes::Bytes;
14use chrono::{DateTime, Utc};
15use flate2::Compression;
16use flate2::write::{GzEncoder, ZlibEncoder};
17use futures::FutureExt;
18use futures::future::BoxFuture;
19use relay_base_schema::project::{ProjectId, ProjectKey};
20use relay_cogs::{AppFeature, Cogs, FeatureWeights, ResourceId, Token};
21use relay_common::time::UnixTimestamp;
22use relay_config::{Config, HttpEncoding, RelayMode};
23use relay_dynamic_config::Feature;
24use relay_event_normalization::{ClockDriftProcessor, GeoIpLookup};
25use relay_event_schema::processor::ProcessingAction;
26use relay_event_schema::protocol::{ClientReport, Event, EventId, NetworkReportError, SpanV2};
27use relay_filter::FilterStatKey;
28use relay_metrics::{Bucket, BucketMetadata, BucketView, BucketsView, MetricNamespace};
29use relay_protocol::Annotated;
30use relay_quotas::{DataCategory, RateLimits, Scoping};
31use relay_sampling::evaluation::{ReservoirCounters, SamplingDecision};
32use relay_statsd::metric;
33use relay_system::{Addr, FromMessage, NoResponse, Service};
34use reqwest::header;
35use smallvec::{SmallVec, smallvec};
36use zstd::stream::Encoder as ZstdEncoder;
37
38use crate::envelope::{self, ContentType, Envelope, EnvelopeError, Item, ItemContainer, ItemType};
39use crate::extractors::{PartialDsn, RequestMeta, RequestTrust};
40use crate::integrations::Integration;
41use crate::managed::{InvalidProcessingGroupType, ManagedEnvelope, TypedEnvelope};
42use crate::metrics::{MetricOutcomes, MetricsLimiter, MinimalTrackableBucket};
43use crate::metrics_extraction::ExtractedMetrics;
44use crate::processing::attachments::AttachmentProcessor;
45use crate::processing::check_ins::CheckInsProcessor;
46use crate::processing::client_reports::ClientReportsProcessor;
47use crate::processing::errors::{ErrorsProcessor, SwitchProcessingError};
48use crate::processing::legacy_spans::LegacySpansProcessor;
49use crate::processing::logs::LogsProcessor;
50use crate::processing::profile_chunks::ProfileChunksProcessor;
51use crate::processing::profiles::ProfilesProcessor;
52use crate::processing::replays::ReplaysProcessor;
53use crate::processing::sessions::SessionsProcessor;
54use crate::processing::spans::SpansProcessor;
55use crate::processing::trace_attachments::TraceAttachmentsProcessor;
56use crate::processing::trace_metrics::TraceMetricsProcessor;
57use crate::processing::transactions::TransactionProcessor;
58use crate::processing::user_reports::UserReportsProcessor;
59use crate::processing::{Forward as _, Output, Outputs, QuotaRateLimiter};
60use crate::service::ServiceError;
61use crate::services::global_config::GlobalConfigHandle;
62use crate::services::metrics::{Aggregator, FlushBuckets, MergeBuckets, ProjectBuckets};
63use crate::services::outcome::{DiscardItemType, DiscardReason, Outcome, TrackOutcome};
64use crate::services::projects::cache::ProjectCacheHandle;
65use crate::services::projects::project::{ProjectInfo, ProjectState};
66use crate::services::upstream::{
67 SendRequest, Sign, SignatureType, UpstreamRelay, UpstreamRequest, UpstreamRequestError,
68};
69use crate::statsd::{RelayCounters, RelayDistributions, RelayTimers};
70use crate::utils;
71use crate::{http, processing};
72use relay_threading::AsyncPool;
73use symbolic_unreal::{Unreal4Error, Unreal4ErrorKind};
74#[cfg(feature = "processing")]
75use {
76 crate::services::objectstore::Objectstore,
77 crate::services::store::Store,
78 itertools::Itertools,
79 relay_dynamic_config::GlobalConfig,
80 relay_quotas::{Quota, RateLimitingError, RedisRateLimiter},
81 relay_redis::RedisClients,
82 std::time::Instant,
83};
84
pub mod event;
mod metrics;
mod nel;
pub mod span;

#[cfg(all(sentry, feature = "processing"))]
pub mod playstation;

/// 55 minutes, presumably the minimum drift for which clock-drift correction
/// is applied — TODO confirm against `ClockDriftProcessor` usage.
pub const MINIMUM_CLOCK_DRIFT: Duration = Duration::from_secs(55 * 60);
95
/// Error returned when a [`ProcessingGroup`] cannot be converted into the
/// requested marker type (see the `processing_group!` macro).
#[derive(Debug)]
pub struct GroupTypeError;

impl Display for GroupTypeError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "failed to convert processing group into corresponding type"
        )
    }
}

impl std::error::Error for GroupTypeError {}
106
/// Defines a zero-sized marker type for a processing group and wires up the
/// conversions between the marker and [`ProcessingGroup`].
///
/// `From<$ty>` infallibly produces `ProcessingGroup::$variant`. The reverse
/// `TryFrom<ProcessingGroup>` succeeds for `$variant` and for every optional
/// `$other` variant listed (e.g. `LogGroup` also accepts `Nel`), and fails
/// with [`GroupTypeError`] for all remaining variants.
macro_rules! processing_group {
    ($ty:ident, $variant:ident$(, $($other:ident),+)?) => {
        #[derive(Clone, Copy, Debug)]
        pub struct $ty;

        impl From<$ty> for ProcessingGroup {
            fn from(_: $ty) -> Self {
                ProcessingGroup::$variant
            }
        }

        impl TryFrom<ProcessingGroup> for $ty {
            type Error = GroupTypeError;

            fn try_from(value: ProcessingGroup) -> Result<Self, Self::Error> {
                if matches!(value, ProcessingGroup::$variant) {
                    return Ok($ty);
                }
                // Additional variants that also map onto this marker type.
                $($(
                    if matches!(value, ProcessingGroup::$other) {
                        return Ok($ty);
                    }
                )+)?
                return Err(GroupTypeError);
            }
        }
    };
}
135
processing_group!(TransactionGroup, Transaction);

processing_group!(ErrorGroup, Error);

processing_group!(SessionGroup, Session);
processing_group!(ClientReportGroup, ClientReport);
processing_group!(ReplayGroup, Replay);
processing_group!(CheckInGroup, CheckIn);
// NEL envelopes are converted into logs (see `process_nel`), so the log
// marker type also accepts the `Nel` group.
processing_group!(LogGroup, Log, Nel);
processing_group!(TraceMetricGroup, TraceMetric);
processing_group!(SpanGroup, Span);

processing_group!(ProfileChunkGroup, ProfileChunk);
processing_group!(MetricsGroup, Metrics);
processing_group!(ForwardUnknownGroup, ForwardUnknown);
processing_group!(Ungrouped, Ungrouped);
152
/// Marker type used in [`TypedEnvelope`] for envelopes that have completed
/// processing.
#[derive(Clone, Copy, Debug)]
pub struct Processed;
158
/// The group under which an envelope's items are processed.
///
/// [`ProcessingGroup::split_envelope`] partitions an incoming envelope into
/// one envelope per group so each group can be handled by its own processor.
#[derive(Clone, Copy, Debug)]
pub enum ProcessingGroup {
    /// Transaction events and items that belong to them (e.g. profiles).
    Transaction,
    /// Error events and event-dependent items, including security reports.
    Error,
    /// Session updates (individual and aggregated).
    Session,
    /// Attachments arriving without any event-creating item in the envelope.
    StandaloneAttachments,
    /// User reports arriving without any event-creating item in the envelope.
    StandaloneUserReports,
    /// Profiles arriving without any event-creating item in the envelope.
    StandaloneProfiles,
    /// Client reports.
    ClientReport,
    /// Replay events, recordings and videos.
    Replay,
    /// Check-in items.
    CheckIn,
    /// Network Error Logging reports.
    Nel,
    /// Log items (including log integrations).
    Log,
    /// Trace metric items.
    TraceMetric,
    /// Standalone span items (`ItemType::Span`).
    Span,
    /// Span V2 containers, span integrations, and — behind an experimental
    /// feature flag — plain spans and span attachments.
    SpanV2,
    /// Metric items (statsd payloads and metric buckets).
    Metrics,
    /// Profile chunk items.
    ProfileChunk,
    /// Trace attachments.
    TraceAttachment,
    /// Items of unknown type, forwarded as-is.
    ForwardUnknown,
    /// Fallback for items that match no other group.
    Ungrouped,
}
213
impl ProcessingGroup {
    /// Splits the items of an envelope into one envelope per [`ProcessingGroup`].
    ///
    /// The original envelope headers are cloned into every resulting
    /// envelope. Extraction happens in a fixed order, so an item is claimed
    /// by the first matching group below.
    fn split_envelope(
        mut envelope: Envelope,
        project_info: &ProjectInfo,
    ) -> SmallVec<[(Self, Box<Envelope>); 3]> {
        let headers = envelope.headers().clone();
        let mut grouped_envelopes = smallvec![];

        // All replay-related items go into a single `Replay` envelope.
        let replay_items = envelope.take_items_by(|item| {
            matches!(
                item.ty(),
                &ItemType::ReplayEvent | &ItemType::ReplayRecording | &ItemType::ReplayVideo
            )
        });
        if !replay_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Replay,
                Envelope::from_parts(headers.clone(), replay_items),
            ))
        }

        // Individual and aggregated session updates.
        let session_items = envelope
            .take_items_by(|item| matches!(item.ty(), &ItemType::Session | &ItemType::Sessions));
        if !session_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Session,
                Envelope::from_parts(headers.clone(), session_items),
            ))
        }

        // Span V2 containers and span integrations; when the experimental
        // feature is enabled, plain span items and span attachments as well.
        // This runs before the legacy `Span` extraction below, so the
        // experimental pipeline takes precedence.
        let span_v2_items = envelope.take_items_by(|item| {
            let exp_feature = project_info.has_feature(Feature::SpanV2ExperimentalProcessing);

            ItemContainer::<SpanV2>::is_container(item)
                || matches!(item.integration(), Some(Integration::Spans(_)))
                || (exp_feature && matches!(item.ty(), &ItemType::Span))
                || (exp_feature && item.is_span_attachment())
        });

        if !span_v2_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::SpanV2,
                Envelope::from_parts(headers.clone(), span_v2_items),
            ))
        }

        // Remaining (legacy) standalone span items.
        let span_items = envelope.take_items_by(|item| matches!(item.ty(), &ItemType::Span));
        if !span_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Span,
                Envelope::from_parts(headers.clone(), span_items),
            ))
        }

        // Log items and log integrations.
        let logs_items = envelope.take_items_by(|item| {
            matches!(item.ty(), &ItemType::Log)
                || matches!(item.integration(), Some(Integration::Logs(_)))
        });
        if !logs_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Log,
                Envelope::from_parts(headers.clone(), logs_items),
            ))
        }

        let trace_metric_items =
            envelope.take_items_by(|item| matches!(item.ty(), &ItemType::TraceMetric));
        if !trace_metric_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::TraceMetric,
                Envelope::from_parts(headers.clone(), trace_metric_items),
            ))
        }

        // Network Error Logging reports.
        let nel_items = envelope.take_items_by(|item| matches!(item.ty(), &ItemType::Nel));
        if !nel_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Nel,
                Envelope::from_parts(headers.clone(), nel_items),
            ))
        }

        // All metric item types.
        let metric_items = envelope.take_items_by(|i| i.ty().is_metrics());
        if !metric_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Metrics,
                Envelope::from_parts(headers.clone(), metric_items),
            ))
        }

        let profile_chunk_items =
            envelope.take_items_by(|item| matches!(item.ty(), &ItemType::ProfileChunk));
        if !profile_chunk_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::ProfileChunk,
                Envelope::from_parts(headers.clone(), profile_chunk_items),
            ))
        }

        let trace_attachment_items = envelope.take_items_by(Item::is_trace_attachment);
        if !trace_attachment_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::TraceAttachment,
                Envelope::from_parts(headers.clone(), trace_attachment_items),
            ))
        }

        // If no item in the envelope creates an event, event-dependent items
        // are split out into their standalone groups instead.
        if !envelope.items().any(Item::creates_event) {
            let standalone_attachments = envelope
                .take_items_by(|i| i.requires_event() && matches!(i.ty(), ItemType::Attachment));
            if !standalone_attachments.is_empty() {
                grouped_envelopes.push((
                    ProcessingGroup::StandaloneAttachments,
                    Envelope::from_parts(headers.clone(), standalone_attachments),
                ))
            }

            let standalone_user_reports =
                envelope.take_items_by(|i| matches!(i.ty(), ItemType::UserReport));
            if !standalone_user_reports.is_empty() {
                grouped_envelopes.push((
                    ProcessingGroup::StandaloneUserReports,
                    Envelope::from_parts(headers.clone(), standalone_user_reports),
                ));
            }

            let standalone_profiles =
                envelope.take_items_by(|i| matches!(i.ty(), ItemType::Profile));
            if !standalone_profiles.is_empty() {
                grouped_envelopes.push((
                    ProcessingGroup::StandaloneProfiles,
                    Envelope::from_parts(headers.clone(), standalone_profiles),
                ));
            }
        }

        // Every raw security report becomes its own `Error` envelope with a
        // freshly generated event id.
        let security_reports_items = envelope
            .take_items_by(|i| matches!(i.ty(), &ItemType::RawSecurity))
            .into_iter()
            .map(|item| {
                let headers = headers.clone();
                let items: SmallVec<[Item; 3]> = smallvec![item.clone()];
                let mut envelope = Envelope::from_parts(headers, items);
                envelope.set_event_id(EventId::new());
                (ProcessingGroup::Error, envelope)
            });
        grouped_envelopes.extend(security_reports_items);

        // Remaining event-dependent items: any transaction or profile item
        // makes the whole group `Transaction`, otherwise it is `Error`.
        let require_event_items = envelope.take_items_by(Item::requires_event);
        if !require_event_items.is_empty() {
            let group = if require_event_items
                .iter()
                .any(|item| matches!(item.ty(), &ItemType::Transaction | &ItemType::Profile))
            {
                ProcessingGroup::Transaction
            } else {
                ProcessingGroup::Error
            };

            grouped_envelopes.push((
                group,
                Envelope::from_parts(headers.clone(), require_event_items),
            ))
        }

        // Everything left over is wrapped into a one-item envelope per item.
        let envelopes = envelope.items_mut().map(|item| {
            let headers = headers.clone();
            let items: SmallVec<[Item; 3]> = smallvec![item.clone()];
            let envelope = Envelope::from_parts(headers, items);
            let item_type = item.ty();
            let group = if matches!(item_type, &ItemType::CheckIn) {
                ProcessingGroup::CheckIn
            } else if matches!(item.ty(), &ItemType::ClientReport) {
                ProcessingGroup::ClientReport
            } else if matches!(item_type, &ItemType::Unknown(_)) {
                ProcessingGroup::ForwardUnknown
            } else {
                ProcessingGroup::Ungrouped
            };

            (group, envelope)
        });
        grouped_envelopes.extend(envelopes);

        grouped_envelopes
    }

    /// Returns the static string name of this group.
    pub fn variant(&self) -> &'static str {
        match self {
            ProcessingGroup::Transaction => "transaction",
            ProcessingGroup::Error => "error",
            ProcessingGroup::Session => "session",
            ProcessingGroup::StandaloneAttachments => "standalone_attachment",
            ProcessingGroup::StandaloneUserReports => "standalone_user_reports",
            ProcessingGroup::StandaloneProfiles => "standalone_profiles",
            ProcessingGroup::ClientReport => "client_report",
            ProcessingGroup::Replay => "replay",
            ProcessingGroup::CheckIn => "check_in",
            ProcessingGroup::Log => "log",
            ProcessingGroup::TraceMetric => "trace_metric",
            ProcessingGroup::Nel => "nel",
            ProcessingGroup::Span => "span",
            ProcessingGroup::SpanV2 => "span_v2",
            ProcessingGroup::Metrics => "metrics",
            ProcessingGroup::ProfileChunk => "profile_chunk",
            ProcessingGroup::TraceAttachment => "trace_attachment",
            ProcessingGroup::ForwardUnknown => "forward_unknown",
            ProcessingGroup::Ungrouped => "ungrouped",
        }
    }
}
446
447impl From<ProcessingGroup> for AppFeature {
448 fn from(value: ProcessingGroup) -> Self {
449 match value {
450 ProcessingGroup::Transaction => AppFeature::Transactions,
451 ProcessingGroup::Error => AppFeature::Errors,
452 ProcessingGroup::Session => AppFeature::Sessions,
453 ProcessingGroup::StandaloneAttachments => AppFeature::UnattributedEnvelope,
454 ProcessingGroup::StandaloneUserReports => AppFeature::UserReports,
455 ProcessingGroup::StandaloneProfiles => AppFeature::Profiles,
456 ProcessingGroup::ClientReport => AppFeature::ClientReports,
457 ProcessingGroup::Replay => AppFeature::Replays,
458 ProcessingGroup::CheckIn => AppFeature::CheckIns,
459 ProcessingGroup::Log => AppFeature::Logs,
460 ProcessingGroup::TraceMetric => AppFeature::TraceMetrics,
461 ProcessingGroup::Nel => AppFeature::Logs,
462 ProcessingGroup::Span => AppFeature::Spans,
463 ProcessingGroup::SpanV2 => AppFeature::Spans,
464 ProcessingGroup::Metrics => AppFeature::UnattributedMetrics,
465 ProcessingGroup::ProfileChunk => AppFeature::Profiles,
466 ProcessingGroup::ForwardUnknown => AppFeature::UnattributedEnvelope,
467 ProcessingGroup::Ungrouped => AppFeature::UnattributedEnvelope,
468 ProcessingGroup::TraceAttachment => AppFeature::TraceAttachments,
469 }
470 }
471}
472
/// Errors that can occur while processing an envelope.
///
/// The `#[error]` attributes double as the user-facing description of each
/// variant; [`ProcessingError::to_outcome`] maps variants to outcomes.
#[derive(Debug, thiserror::Error)]
pub enum ProcessingError {
    #[error("invalid json in event")]
    InvalidJson(#[source] serde_json::Error),

    #[error("invalid message pack event payload")]
    InvalidMsgpack(#[from] rmp_serde::decode::Error),

    #[error("invalid unreal crash report")]
    InvalidUnrealReport(#[source] Unreal4Error),

    #[error("event payload too large")]
    PayloadTooLarge(DiscardItemType),

    #[error("invalid transaction event")]
    InvalidTransaction,

    #[error("the item is not allowed/supported in this envelope")]
    UnsupportedItem,

    #[error("envelope processor failed")]
    ProcessingFailed(#[from] ProcessingAction),

    #[error("duplicate {0} in event")]
    DuplicateItem(ItemType),

    #[error("failed to extract event payload")]
    NoEventPayload,

    #[error("missing project id in DSN")]
    MissingProjectId,

    #[error("invalid security report type: {0:?}")]
    InvalidSecurityType(Bytes),

    #[error("unsupported security report type")]
    UnsupportedSecurityType,

    #[error("invalid security report")]
    InvalidSecurityReport(#[source] serde_json::Error),

    #[error("invalid nel report")]
    InvalidNelReport(#[source] NetworkReportError),

    #[error("event filtered with reason: {0:?}")]
    EventFiltered(FilterStatKey),

    #[error("could not serialize event payload")]
    SerializeFailed(#[source] serde_json::Error),

    #[cfg(feature = "processing")]
    #[error("failed to apply quotas")]
    QuotasFailed(#[from] RateLimitingError),

    // Boxed since the payload type is defined elsewhere and presumably large;
    // keeps `ProcessingError` itself small.
    #[error("invalid processing group type")]
    InvalidProcessingGroup(Box<InvalidProcessingGroupType>),

    #[error("nintendo switch dying message processing failed {0:?}")]
    InvalidNintendoDyingMessage(#[source] SwitchProcessingError),

    #[cfg(all(sentry, feature = "processing"))]
    #[error("playstation dump processing failed: {0}")]
    InvalidPlaystationDump(String),

    #[error("processing group does not match specific processor")]
    ProcessingGroupMismatch,
    #[error("new processing pipeline failed")]
    ProcessingFailure,

    #[cfg(feature = "processing")]
    #[error("invalid attachment reference")]
    InvalidAttachmentRef,

    #[error("could not determine processing group for envelope items")]
    NoProcessingGroup,
}
550
impl ProcessingError {
    /// Maps the error to the [`Outcome`] that should be emitted for the
    /// affected data, or `None` when no outcome should be recorded.
    pub fn to_outcome(&self) -> Option<Outcome> {
        match self {
            Self::PayloadTooLarge(payload_type) => {
                Some(Outcome::Invalid(DiscardReason::TooLarge(*payload_type)))
            }
            Self::InvalidJson(_) => Some(Outcome::Invalid(DiscardReason::InvalidJson)),
            Self::InvalidMsgpack(_) => Some(Outcome::Invalid(DiscardReason::InvalidMsgpack)),
            Self::InvalidSecurityType(_) => {
                Some(Outcome::Invalid(DiscardReason::SecurityReportType))
            }
            Self::UnsupportedItem => Some(Outcome::Invalid(DiscardReason::InvalidEnvelope)),
            Self::InvalidSecurityReport(_) => Some(Outcome::Invalid(DiscardReason::SecurityReport)),
            Self::UnsupportedSecurityType => Some(Outcome::Filtered(FilterStatKey::InvalidCsp)),
            Self::InvalidNelReport(_) => Some(Outcome::Invalid(DiscardReason::InvalidJson)),
            Self::InvalidTransaction => Some(Outcome::Invalid(DiscardReason::InvalidTransaction)),
            Self::DuplicateItem(_) => Some(Outcome::Invalid(DiscardReason::DuplicateItem)),
            Self::NoEventPayload => Some(Outcome::Invalid(DiscardReason::NoEventPayload)),
            Self::InvalidNintendoDyingMessage(_) => Some(Outcome::Invalid(DiscardReason::Payload)),
            #[cfg(all(sentry, feature = "processing"))]
            Self::InvalidPlaystationDump(_) => Some(Outcome::Invalid(DiscardReason::Payload)),
            // Bad compression gets its own discard reason; all other unreal
            // failures map to a generic unreal processing error.
            Self::InvalidUnrealReport(err) if err.kind() == Unreal4ErrorKind::BadCompression => {
                Some(Outcome::Invalid(DiscardReason::InvalidCompression))
            }
            Self::InvalidUnrealReport(_) => Some(Outcome::Invalid(DiscardReason::ProcessUnreal)),
            Self::SerializeFailed(_) | Self::ProcessingFailed(_) => {
                Some(Outcome::Invalid(DiscardReason::Internal))
            }
            #[cfg(feature = "processing")]
            Self::QuotasFailed(_) => Some(Outcome::Invalid(DiscardReason::Internal)),
            Self::MissingProjectId => None,
            Self::EventFiltered(key) => Some(Outcome::Filtered(key.clone())),
            Self::InvalidProcessingGroup(_) => None,

            Self::ProcessingGroupMismatch => Some(Outcome::Invalid(DiscardReason::Internal)),
            // NOTE(review): `None` here presumably means the new processing
            // pipeline emits its own outcomes — confirm before relying on it.
            Self::ProcessingFailure => None,
            #[cfg(feature = "processing")]
            Self::InvalidAttachmentRef => {
                Some(Outcome::Invalid(DiscardReason::InvalidAttachmentRef))
            }
            Self::NoProcessingGroup => Some(Outcome::Invalid(DiscardReason::Internal)),
        }
    }

    /// Returns `true` if the mapped outcome reports as unexpected.
    fn is_unexpected(&self) -> bool {
        self.to_outcome()
            .is_some_and(|outcome| outcome.is_unexpected())
    }
}
601
602impl From<Unreal4Error> for ProcessingError {
603 fn from(err: Unreal4Error) -> Self {
604 match err.kind() {
605 Unreal4ErrorKind::TooLarge => Self::PayloadTooLarge(ItemType::UnrealReport.into()),
606 _ => ProcessingError::InvalidUnrealReport(err),
607 }
608 }
609}
610
impl From<InvalidProcessingGroupType> for ProcessingError {
    fn from(value: InvalidProcessingGroupType) -> Self {
        // Boxed to keep the size of `ProcessingError` small.
        Self::InvalidProcessingGroup(Box::new(value))
    }
}
616
/// An event extracted from an envelope, paired with a `usize` (presumably the
/// size of the original payload in bytes — confirm at the extraction sites).
type ExtractedEvent = (Annotated<Event>, usize);
618
/// Wrapper around [`ExtractedMetrics`] that tags bucket metadata as it is
/// collected during processing.
#[derive(Debug)]
pub struct ProcessingExtractedMetrics {
    metrics: ExtractedMetrics,
}
627
628impl ProcessingExtractedMetrics {
629 pub fn new() -> Self {
630 Self {
631 metrics: ExtractedMetrics::default(),
632 }
633 }
634
635 pub fn into_inner(self) -> ExtractedMetrics {
636 self.metrics
637 }
638
639 pub fn extend(
641 &mut self,
642 extracted: ExtractedMetrics,
643 sampling_decision: Option<SamplingDecision>,
644 ) {
645 self.extend_project_metrics(extracted.project_metrics, sampling_decision);
646 self.extend_sampling_metrics(extracted.sampling_metrics, sampling_decision);
647 }
648
649 pub fn extend_project_metrics<I>(
651 &mut self,
652 buckets: I,
653 sampling_decision: Option<SamplingDecision>,
654 ) where
655 I: IntoIterator<Item = Bucket>,
656 {
657 self.metrics
658 .project_metrics
659 .extend(buckets.into_iter().map(|mut bucket| {
660 bucket.metadata.extracted_from_indexed =
661 sampling_decision == Some(SamplingDecision::Keep);
662 bucket
663 }));
664 }
665
666 pub fn extend_sampling_metrics<I>(
668 &mut self,
669 buckets: I,
670 sampling_decision: Option<SamplingDecision>,
671 ) where
672 I: IntoIterator<Item = Bucket>,
673 {
674 self.metrics
675 .sampling_metrics
676 .extend(buckets.into_iter().map(|mut bucket| {
677 bucket.metadata.extracted_from_indexed =
678 sampling_decision == Some(SamplingDecision::Keep);
679 bucket
680 }));
681 }
682}
683
684fn send_metrics(
685 metrics: ExtractedMetrics,
686 project_key: ProjectKey,
687 sampling_key: Option<ProjectKey>,
688 aggregator: &Addr<Aggregator>,
689) {
690 let ExtractedMetrics {
691 project_metrics,
692 sampling_metrics,
693 } = metrics;
694
695 if !project_metrics.is_empty() {
696 aggregator.send(MergeBuckets {
697 project_key,
698 buckets: project_metrics,
699 });
700 }
701
702 if !sampling_metrics.is_empty() {
703 let sampling_project_key = sampling_key.unwrap_or(project_key);
710 aggregator.send(MergeBuckets {
711 project_key: sampling_project_key,
712 buckets: sampling_metrics,
713 });
714 }
715}
716
/// The result of processing an envelope group.
#[derive(Debug)]
#[expect(
    clippy::large_enum_variant,
    reason = "until we have a better solution to combat the excessive growth of Item, see #4819"
)]
enum ProcessingResult {
    /// Result of the envelope-based processing path: the processed envelope
    /// plus any metrics extracted along the way.
    Envelope {
        managed_envelope: TypedEnvelope<Processed>,
        extracted_metrics: ProcessingExtractedMetrics,
    },
    /// Result of the new processing pipeline (see `process_with_processor`).
    Output(Output<Outputs>),
}
731
impl ProcessingResult {
    /// Wraps a processed envelope into an [`ProcessingResult::Envelope`]
    /// result with no extracted metrics.
    fn no_metrics(managed_envelope: TypedEnvelope<Processed>) -> Self {
        Self::Envelope {
            managed_envelope,
            extracted_metrics: ProcessingExtractedMetrics::new(),
        }
    }
}
741
/// Processed data ready to be submitted onwards.
#[derive(Debug)]
#[expect(
    clippy::large_enum_variant,
    reason = "until we have a better solution to combat the excessive growth of Item, see #4819"
)]
enum Submit<'a> {
    /// A fully processed envelope.
    Envelope(TypedEnvelope<Processed>),
    /// Output of the new processing pipeline with the context required to
    /// forward it.
    Output {
        output: Outputs,
        ctx: processing::ForwardContext<'a>,
    },
}
757
/// Message requesting that an envelope be processed.
#[derive(Debug)]
pub struct ProcessEnvelope {
    /// The envelope to process.
    pub envelope: ManagedEnvelope,
    /// State of the project the envelope belongs to.
    pub project_info: Arc<ProjectInfo>,
    /// Rate limits for the project.
    pub rate_limits: Arc<RateLimits>,
    /// Project state used for sampling decisions, if available.
    pub sampling_project_info: Option<Arc<ProjectInfo>>,
    /// Reservoir counters used during sampling evaluation.
    pub reservoir_counters: ReservoirCounters,
}
780
/// An envelope that has already been assigned to a [`ProcessingGroup`],
/// together with the processing context.
#[derive(Debug)]
struct ProcessEnvelopeGrouped<'a> {
    /// The group the envelope's items belong to.
    pub group: ProcessingGroup,
    /// The envelope to process.
    pub envelope: ManagedEnvelope,
    /// Context needed by the processing pipeline.
    pub ctx: processing::Context<'a>,
}
791
/// Message requesting that metric payloads be parsed and aggregated.
#[derive(Debug)]
pub struct ProcessMetrics {
    /// The metric payloads: raw envelope items or already parsed buckets.
    pub data: MetricData,
    /// The project key the metrics belong to.
    pub project_key: ProjectKey,
    /// Whether the metrics came from a trusted or untrusted request.
    pub source: BucketSource,
    /// The time the payload was received by this Relay.
    pub received_at: DateTime<Utc>,
    /// The client-reported send time, if available.
    pub sent_at: Option<DateTime<Utc>>,
}
817
/// Metric payloads in either raw or parsed form.
#[derive(Debug)]
pub enum MetricData {
    /// Raw envelope items (statsd or metric-bucket payloads), parsed lazily
    /// by [`MetricData::into_buckets`].
    Raw(Vec<Item>),
    /// Buckets that have already been parsed.
    Parsed(Vec<Bucket>),
}
826
impl MetricData {
    /// Converts the contained payloads into a flat list of [`Bucket`]s.
    ///
    /// Already-parsed buckets are returned as-is. Raw items are parsed
    /// according to their type: `Statsd` items via [`Bucket::parse_all`]
    /// (with `timestamp` supplied to the parser), `MetricBuckets` items as
    /// JSON. Payloads that fail to parse are logged and skipped; items of any
    /// other type are reported as an error and ignored.
    fn into_buckets(self, timestamp: UnixTimestamp) -> Vec<Bucket> {
        let items = match self {
            Self::Parsed(buckets) => return buckets,
            Self::Raw(items) => items,
        };

        let mut buckets = Vec::new();
        for item in items {
            let payload = item.payload();
            if item.ty() == &ItemType::Statsd {
                for bucket_result in Bucket::parse_all(&payload, timestamp) {
                    match bucket_result {
                        Ok(bucket) => buckets.push(bucket),
                        Err(error) => relay_log::debug!(
                            error = &error as &dyn Error,
                            "failed to parse metric bucket from statsd format",
                        ),
                    }
                }
            } else if item.ty() == &ItemType::MetricBuckets {
                match serde_json::from_slice::<Vec<Bucket>>(&payload) {
                    Ok(parsed_buckets) => {
                        // Reuse the parsed vector directly when nothing has
                        // been collected yet, avoiding a copy.
                        if buckets.is_empty() {
                            buckets = parsed_buckets;
                        } else {
                            buckets.extend(parsed_buckets);
                        }
                    }
                    Err(error) => {
                        relay_log::debug!(
                            error = &error as &dyn Error,
                            "failed to parse metric bucket",
                        );
                        metric!(counter(RelayCounters::MetricBucketsParsingFailed) += 1);
                    }
                }
            } else {
                relay_log::error!(
                    "invalid item of type {} passed to ProcessMetrics",
                    item.ty()
                );
            }
        }
        buckets
    }
}
879
/// Message requesting that a batched metrics payload be parsed and aggregated.
#[derive(Debug)]
pub struct ProcessBatchedMetrics {
    /// The raw payload containing the batched buckets.
    pub payload: Bytes,
    /// Whether the metrics came from a trusted or untrusted request.
    pub source: BucketSource,
    /// The time the payload was received by this Relay.
    pub received_at: DateTime<Utc>,
    /// The client-reported send time, if available.
    pub sent_at: Option<DateTime<Utc>>,
}
891
/// The trust level of the source a metric bucket was received from.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum BucketSource {
    /// Received from a trusted request (see [`RequestTrust::Trusted`]).
    Internal,
    /// Received from an untrusted request (see [`RequestTrust::Untrusted`]).
    External,
}
907
908impl BucketSource {
909 pub fn from_meta(meta: &RequestMeta) -> Self {
911 match meta.request_trust() {
912 RequestTrust::Trusted => Self::Internal,
913 RequestTrust::Untrusted => Self::External,
914 }
915 }
916}
917
/// Message requesting submission of the given client reports.
#[derive(Debug)]
pub struct SubmitClientReports {
    /// The client reports to submit.
    pub client_reports: Vec<ClientReport>,
    /// The scoping the reports belong to.
    pub scoping: Scoping,
}
926
/// The messages handled by [`EnvelopeProcessorService`].
///
/// All payloads are boxed to keep the message enum itself small.
#[derive(Debug)]
pub enum EnvelopeProcessor {
    ProcessEnvelope(Box<ProcessEnvelope>),
    ProcessProjectMetrics(Box<ProcessMetrics>),
    ProcessBatchedMetrics(Box<ProcessBatchedMetrics>),
    FlushBuckets(Box<FlushBuckets>),
    SubmitClientReports(Box<SubmitClientReports>),
}
936
937impl EnvelopeProcessor {
938 pub fn variant(&self) -> &'static str {
940 match self {
941 EnvelopeProcessor::ProcessEnvelope(_) => "ProcessEnvelope",
942 EnvelopeProcessor::ProcessProjectMetrics(_) => "ProcessProjectMetrics",
943 EnvelopeProcessor::ProcessBatchedMetrics(_) => "ProcessBatchedMetrics",
944 EnvelopeProcessor::FlushBuckets(_) => "FlushBuckets",
945 EnvelopeProcessor::SubmitClientReports(_) => "SubmitClientReports",
946 }
947 }
948}
949
// Marks `EnvelopeProcessor` as a service message interface.
impl relay_system::Interface for EnvelopeProcessor {}
951
952impl FromMessage<ProcessEnvelope> for EnvelopeProcessor {
953 type Response = relay_system::NoResponse;
954
955 fn from_message(message: ProcessEnvelope, _sender: ()) -> Self {
956 Self::ProcessEnvelope(Box::new(message))
957 }
958}
959
// Boxes `ProcessMetrics` messages into the processor interface.
impl FromMessage<ProcessMetrics> for EnvelopeProcessor {
    type Response = NoResponse;

    fn from_message(message: ProcessMetrics, _: ()) -> Self {
        Self::ProcessProjectMetrics(Box::new(message))
    }
}
967
// Boxes `ProcessBatchedMetrics` messages into the processor interface.
impl FromMessage<ProcessBatchedMetrics> for EnvelopeProcessor {
    type Response = NoResponse;

    fn from_message(message: ProcessBatchedMetrics, _: ()) -> Self {
        Self::ProcessBatchedMetrics(Box::new(message))
    }
}
975
// Boxes `FlushBuckets` messages into the processor interface.
impl FromMessage<FlushBuckets> for EnvelopeProcessor {
    type Response = NoResponse;

    fn from_message(message: FlushBuckets, _: ()) -> Self {
        Self::FlushBuckets(Box::new(message))
    }
}
983
// Boxes `SubmitClientReports` messages into the processor interface.
impl FromMessage<SubmitClientReports> for EnvelopeProcessor {
    type Response = NoResponse;

    fn from_message(message: SubmitClientReports, _: ()) -> Self {
        Self::SubmitClientReports(Box::new(message))
    }
}
991
/// The async pool of boxed futures used by the [`EnvelopeProcessorService`].
pub type EnvelopeProcessorServicePool = AsyncPool<BoxFuture<'static, ()>>;
994
/// Service implementing the [`EnvelopeProcessor`] interface.
///
/// Cloning is cheap: all state is shared behind an [`Arc`].
#[derive(Clone)]
pub struct EnvelopeProcessorService {
    inner: Arc<InnerProcessor>,
}
1002
/// Addresses of services the envelope processor communicates with.
pub struct Addrs {
    /// Aggregator for tracking data outcomes.
    pub outcome_aggregator: Addr<TrackOutcome>,
    /// Upstream Relay for forwarding requests.
    pub upstream_relay: Addr<UpstreamRelay>,
    /// Object storage service (processing mode only).
    #[cfg(feature = "processing")]
    pub objectstore: Option<Addr<Objectstore>>,
    /// Store forwarder (processing mode only).
    #[cfg(feature = "processing")]
    pub store_forwarder: Option<Addr<Store>>,
    /// Metrics aggregator.
    pub aggregator: Addr<Aggregator>,
}
1013
1014impl Default for Addrs {
1015 fn default() -> Self {
1016 Addrs {
1017 outcome_aggregator: Addr::dummy(),
1018 upstream_relay: Addr::dummy(),
1019 #[cfg(feature = "processing")]
1020 objectstore: None,
1021 #[cfg(feature = "processing")]
1022 store_forwarder: None,
1023 aggregator: Addr::dummy(),
1024 }
1025 }
1026}
1027
/// State shared by all clones of [`EnvelopeProcessorService`].
struct InnerProcessor {
    /// Pool on which processing futures are spawned.
    pool: EnvelopeProcessorServicePool,
    /// Relay configuration.
    config: Arc<Config>,
    /// Handle to the current global config.
    global_config: GlobalConfigHandle,
    /// Handle to cached project states.
    project_cache: ProjectCacheHandle,
    /// COGS measurement sink.
    cogs: Cogs,
    /// Addresses of dependent services.
    addrs: Addrs,
    /// Redis-backed rate limiter (processing mode only).
    #[cfg(feature = "processing")]
    rate_limiter: Option<Arc<RedisRateLimiter>>,
    /// Outcome tracking for metrics.
    metric_outcomes: MetricOutcomes,
    /// The per-signal processors of the new processing pipeline.
    processing: Processing,
}
1040
/// One processor per item kind handled by the new processing pipeline.
struct Processing {
    errors: ErrorsProcessor,
    logs: LogsProcessor,
    trace_metrics: TraceMetricsProcessor,
    spans: SpansProcessor,
    legacy_spans: LegacySpansProcessor,
    check_ins: CheckInsProcessor,
    sessions: SessionsProcessor,
    transactions: TransactionProcessor,
    profile_chunks: ProfileChunksProcessor,
    trace_attachments: TraceAttachmentsProcessor,
    replays: ReplaysProcessor,
    client_reports: ClientReportsProcessor,
    attachments: AttachmentProcessor,
    user_reports: UserReportsProcessor,
    profiles: ProfilesProcessor,
}
1058
1059impl EnvelopeProcessorService {
    /// Creates the envelope processor service and all per-signal processors.
    ///
    /// GeoIP lookup failures are logged and degrade to an empty lookup rather
    /// than failing startup. In processing mode, `redis` enables the
    /// Redis-backed rate limiter; without it, rate limiting is disabled.
    #[cfg_attr(feature = "processing", expect(clippy::too_many_arguments))]
    pub fn new(
        pool: EnvelopeProcessorServicePool,
        config: Arc<Config>,
        global_config: GlobalConfigHandle,
        project_cache: ProjectCacheHandle,
        cogs: Cogs,
        #[cfg(feature = "processing")] redis: Option<RedisClients>,
        addrs: Addrs,
        metric_outcomes: MetricOutcomes,
    ) -> Self {
        // Optional GeoIP database: errors disable geo enrichment only.
        let geoip_lookup = config
            .geoip_path()
            .and_then(
                |p| match GeoIpLookup::open(p).context(ServiceError::GeoIp) {
                    Ok(geoip) => Some(geoip),
                    Err(err) => {
                        relay_log::error!("failed to open GeoIP db {p:?}: {err:?}");
                        None
                    }
                },
            )
            .unwrap_or_else(GeoIpLookup::empty);

        #[cfg(feature = "processing")]
        let rate_limiter = redis.as_ref().map(|redis| {
            RedisRateLimiter::new(redis.quotas.clone())
                .max_limit(config.max_rate_limit())
                .cache(config.quota_cache_ratio(), config.quota_cache_max())
        });

        let quota_limiter = Arc::new(QuotaRateLimiter::new(
            #[cfg(feature = "processing")]
            project_cache.clone(),
            #[cfg(feature = "processing")]
            rate_limiter.clone(),
        ));
        #[cfg(feature = "processing")]
        let rate_limiter = rate_limiter.map(Arc::new);
        let outcome_aggregator = addrs.outcome_aggregator.clone();
        let inner = InnerProcessor {
            pool,
            global_config,
            project_cache,
            cogs,
            #[cfg(feature = "processing")]
            rate_limiter,
            addrs,
            metric_outcomes,
            // All processors share the same quota limiter; geo-aware
            // processors additionally get a clone of the GeoIP lookup.
            processing: Processing {
                errors: ErrorsProcessor::new(Arc::clone(&quota_limiter), geoip_lookup.clone()),
                logs: LogsProcessor::new(Arc::clone(&quota_limiter)),
                trace_metrics: TraceMetricsProcessor::new(Arc::clone(&quota_limiter)),
                spans: SpansProcessor::new(Arc::clone(&quota_limiter), geoip_lookup.clone()),
                legacy_spans: LegacySpansProcessor::new(
                    Arc::clone(&quota_limiter),
                    geoip_lookup.clone(),
                ),
                check_ins: CheckInsProcessor::new(Arc::clone(&quota_limiter)),
                sessions: SessionsProcessor::new(Arc::clone(&quota_limiter)),
                transactions: TransactionProcessor::new(
                    Arc::clone(&quota_limiter),
                    geoip_lookup.clone(),
                    #[cfg(feature = "processing")]
                    redis.map(|r| r.quotas),
                    #[cfg(not(feature = "processing"))]
                    None,
                ),
                profile_chunks: ProfileChunksProcessor::new(Arc::clone(&quota_limiter)),
                trace_attachments: TraceAttachmentsProcessor::new(Arc::clone(&quota_limiter)),
                replays: ReplaysProcessor::new(Arc::clone(&quota_limiter), geoip_lookup),
                client_reports: ClientReportsProcessor::new(outcome_aggregator),
                attachments: AttachmentProcessor::new(Arc::clone(&quota_limiter)),
                user_reports: UserReportsProcessor::new(Arc::clone(&quota_limiter)),
                profiles: ProfilesProcessor::new(quota_limiter),
            },
            config,
        };

        Self {
            inner: Arc::new(inner),
        }
    }
1144
1145 async fn process_nel(
1146 &self,
1147 mut managed_envelope: ManagedEnvelope,
1148 ctx: processing::Context<'_>,
1149 ) -> Result<ProcessingResult, ProcessingError> {
1150 nel::convert_to_logs(&mut managed_envelope);
1151 self.process_with_processor(&self.inner.processing.logs, managed_envelope, ctx)
1152 .await
1153 }
1154
    /// Runs a single processing pipeline (`processor`) over `managed_envelope`.
    ///
    /// The processor first extracts the items it is responsible for from the envelope
    /// via `prepare_envelope`. Any items remaining afterwards do not belong to this
    /// processing group: an empty remainder is accepted, a non-empty remainder is
    /// rejected with an internal discard reason.
    ///
    /// Returns the processor's output wrapped in [`ProcessingResult::Output`], or
    /// [`ProcessingError::ProcessingFailure`] if the pipeline fails.
    async fn process_with_processor<P: processing::Processor>(
        &self,
        processor: &P,
        mut managed_envelope: ManagedEnvelope,
        ctx: processing::Context<'_>,
    ) -> Result<ProcessingResult, ProcessingError>
    where
        Outputs: From<P::Output>,
    {
        // No extracted work means the envelope was routed to the wrong processor;
        // that is a bug in group assignment, hence the debug assertion.
        let Some(work) = processor.prepare_envelope(&mut managed_envelope) else {
            debug_assert!(
                false,
                "there must be work for the {} processor",
                std::any::type_name::<P>(),
            );
            return Err(ProcessingError::ProcessingGroupMismatch);
        };

        // All relevant items were moved into `work`; dispose of the leftover envelope.
        managed_envelope.update();
        match managed_envelope.envelope().is_empty() {
            true => managed_envelope.accept(),
            false => managed_envelope.reject(Outcome::Invalid(DiscardReason::Internal)),
        }

        processor
            .process(work, ctx)
            .await
            .map_err(|err| {
                relay_log::debug!(
                    error = &err as &dyn std::error::Error,
                    "processing pipeline failed"
                );
                ProcessingError::ProcessingFailure
            })
            .map(|o| o.map(Into::into))
            .map(ProcessingResult::Output)
    }
1192
    /// Routes a grouped envelope to the processor matching its [`ProcessingGroup`].
    ///
    /// Before dispatching, the dynamic sampling context's transaction name is
    /// parametrized (when sampling rules are available), retention settings from the
    /// project configuration are applied, and the resolved project id is stamped onto
    /// the request metadata.
    async fn process_envelope(
        &self,
        project_id: ProjectId,
        message: ProcessEnvelopeGrouped<'_>,
    ) -> Result<ProcessingResult, ProcessingError> {
        let ProcessEnvelopeGrouped {
            group,
            envelope: mut managed_envelope,
            ctx,
        } = message;

        // Apply transaction-name parametrization rules to the DSC.
        if let Some(sampling_state) = ctx.sampling_project_info {
            managed_envelope
                .envelope_mut()
                .parametrize_dsc_transaction(&sampling_state.config.tx_name_rules);
        }

        if let Some(retention) = ctx.project_info.config.event_retention {
            managed_envelope.envelope_mut().set_retention(retention);
        }

        if let Some(retention) = ctx.project_info.config.downsampled_event_retention {
            managed_envelope
                .envelope_mut()
                .set_downsampled_retention(retention);
        }

        managed_envelope
            .envelope_mut()
            .meta_mut()
            .set_project_id(project_id);

        relay_log::trace!("Processing {group} group", group = group.variant());

        // Dispatch to the dedicated processor for the group.
        match group {
            ProcessingGroup::Error => {
                self.process_with_processor(&self.inner.processing.errors, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::Transaction => {
                self.process_with_processor(
                    &self.inner.processing.transactions,
                    managed_envelope,
                    ctx,
                )
                .await
            }
            ProcessingGroup::Session => {
                self.process_with_processor(&self.inner.processing.sessions, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::StandaloneAttachments => {
                self.process_with_processor(
                    &self.inner.processing.attachments,
                    managed_envelope,
                    ctx,
                )
                .await
            }
            ProcessingGroup::StandaloneUserReports => {
                self.process_with_processor(
                    &self.inner.processing.user_reports,
                    managed_envelope,
                    ctx,
                )
                .await
            }
            ProcessingGroup::StandaloneProfiles => {
                self.process_with_processor(&self.inner.processing.profiles, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::ClientReport => {
                self.process_with_processor(
                    &self.inner.processing.client_reports,
                    managed_envelope,
                    ctx,
                )
                .await
            }
            ProcessingGroup::Replay => {
                self.process_with_processor(&self.inner.processing.replays, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::CheckIn => {
                self.process_with_processor(&self.inner.processing.check_ins, managed_envelope, ctx)
                    .await
            }
            // NEL reports are converted to logs before entering the logs pipeline.
            ProcessingGroup::Nel => self.process_nel(managed_envelope, ctx).await,
            ProcessingGroup::Log => {
                self.process_with_processor(&self.inner.processing.logs, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::TraceMetric => {
                self.process_with_processor(
                    &self.inner.processing.trace_metrics,
                    managed_envelope,
                    ctx,
                )
                .await
            }
            ProcessingGroup::SpanV2 => {
                self.process_with_processor(&self.inner.processing.spans, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::TraceAttachment => {
                self.process_with_processor(
                    &self.inner.processing.trace_attachments,
                    managed_envelope,
                    ctx,
                )
                .await
            }
            ProcessingGroup::Span => {
                self.process_with_processor(
                    &self.inner.processing.legacy_spans,
                    managed_envelope,
                    ctx,
                )
                .await
            }
            ProcessingGroup::ProfileChunk => {
                self.process_with_processor(
                    &self.inner.processing.profile_chunks,
                    managed_envelope,
                    ctx,
                )
                .await
            }
            // Metric items are not expected here outside of proxy mode; log, then
            // pass the envelope through untouched.
            ProcessingGroup::Metrics => {
                if self.inner.config.relay_mode() != RelayMode::Proxy {
                    relay_log::error!(
                        tags.project = %project_id,
                        items = ?managed_envelope.envelope().items().next().map(Item::ty),
                        "received metrics in the process_state"
                    );
                }

                Ok(ProcessingResult::no_metrics(
                    managed_envelope.into_processed(),
                ))
            }
            ProcessingGroup::Ungrouped => {
                relay_log::error!(
                    tags.project = %project_id,
                    items = ?managed_envelope.envelope().items().next().map(Item::ty),
                    "could not identify the processing group based on the envelope's items"
                );

                Err(ProcessingError::NoProcessingGroup)
            }
            // Unknown items are forwarded upstream without processing.
            ProcessingGroup::ForwardUnknown => Ok(ProcessingResult::no_metrics(
                managed_envelope.into_processed(),
            )),
        }
    }
1366
1367 async fn process<'a>(
1373 &self,
1374 mut message: ProcessEnvelopeGrouped<'a>,
1375 ) -> Result<Option<Submit<'a>>, ProcessingError> {
1376 let ProcessEnvelopeGrouped {
1377 ref mut envelope,
1378 ctx,
1379 ..
1380 } = message;
1381
1382 let Some(project_id) = ctx
1389 .project_info
1390 .project_id
1391 .or_else(|| envelope.envelope().meta().project_id())
1392 else {
1393 envelope.reject(Outcome::Invalid(DiscardReason::Internal));
1394 return Err(ProcessingError::MissingProjectId);
1395 };
1396
1397 let client = envelope.envelope().meta().client().map(str::to_owned);
1398 let user_agent = envelope.envelope().meta().user_agent().map(str::to_owned);
1399 let project_key = envelope.envelope().meta().public_key();
1400 let sampling_key = envelope
1404 .envelope()
1405 .sampling_key()
1406 .filter(|_| ctx.sampling_project_info.is_some());
1407
1408 relay_log::configure_scope(|scope| {
1411 scope.set_tag("project", project_id);
1412 if let Some(client) = client {
1413 scope.set_tag("sdk", client);
1414 }
1415 if let Some(user_agent) = user_agent {
1416 scope.set_extra("user_agent", user_agent.into());
1417 }
1418 });
1419
1420 let result = match self.process_envelope(project_id, message).await {
1421 Ok(ProcessingResult::Envelope {
1422 mut managed_envelope,
1423 extracted_metrics,
1424 }) => {
1425 managed_envelope.update();
1428
1429 let has_metrics = !extracted_metrics.metrics.project_metrics.is_empty();
1430 send_metrics(
1431 extracted_metrics.metrics,
1432 project_key,
1433 sampling_key,
1434 &self.inner.addrs.aggregator,
1435 );
1436
1437 let envelope_response = if managed_envelope.envelope().is_empty() {
1438 if !has_metrics {
1439 managed_envelope.reject(Outcome::RateLimited(None));
1441 } else {
1442 managed_envelope.accept();
1443 }
1444
1445 None
1446 } else {
1447 Some(managed_envelope)
1448 };
1449
1450 Ok(envelope_response.map(Submit::Envelope))
1451 }
1452 Ok(ProcessingResult::Output(Output { main, metrics })) => {
1453 if let Some(metrics) = metrics {
1454 metrics.accept(|metrics| {
1455 send_metrics(
1456 metrics,
1457 project_key,
1458 sampling_key,
1459 &self.inner.addrs.aggregator,
1460 );
1461 });
1462 }
1463
1464 let ctx = ctx.to_forward();
1465 Ok(main.map(|output| Submit::Output { output, ctx }))
1466 }
1467 Err(err) => Err(err),
1468 };
1469
1470 relay_log::configure_scope(|scope| {
1471 scope.remove_tag("project");
1472 scope.remove_tag("sdk");
1473 scope.remove_tag("user_agent");
1474 });
1475
1476 result
1477 }
1478
    /// Splits an incoming envelope by processing group and processes each part.
    ///
    /// Each split is processed under its own per-group COGS token and, on success,
    /// submitted upstream. Errors are logged; unexpected errors at error level.
    async fn handle_process_envelope(&self, cogs: &mut Token, message: ProcessEnvelope) {
        let project_key = message.envelope.envelope().meta().public_key();
        let wait_time = message.envelope.age();
        metric!(timer(RelayTimers::EnvelopeWaitTime) = wait_time);

        // The passed-in token is replaced by per-group timed tokens below.
        cogs.cancel();

        let scoping = message.envelope.scoping();
        for (group, envelope) in ProcessingGroup::split_envelope(
            *message.envelope.into_envelope(),
            &message.project_info,
        ) {
            // Attribute processing time of this split to its group's app feature.
            let mut cogs = self
                .inner
                .cogs
                .timed(ResourceId::Relay, AppFeature::from(group));

            let mut envelope =
                ManagedEnvelope::new(envelope, self.inner.addrs.outcome_aggregator.clone());
            envelope.scope(scoping);

            let global_config = self.inner.global_config.current();

            let ctx = processing::Context {
                config: &self.inner.config,
                global_config: &global_config,
                project_info: &message.project_info,
                sampling_project_info: message.sampling_project_info.as_deref(),
                rate_limits: &message.rate_limits,
                reservoir_counters: &message.reservoir_counters,
            };

            let message = ProcessEnvelopeGrouped {
                group,
                envelope,
                ctx,
            };

            let result = metric!(
                timer(RelayTimers::EnvelopeProcessingTime),
                group = group.variant(),
                { self.process(message).await }
            );

            match result {
                Ok(Some(envelope)) => self.submit_upstream(&mut cogs, envelope),
                Ok(None) => {}
                // Unexpected failures indicate bugs and are logged at error level.
                Err(error) if error.is_unexpected() => {
                    relay_log::error!(
                        tags.project_key = %project_key,
                        error = &error as &dyn Error,
                        "error processing envelope"
                    )
                }
                Err(error) => {
                    relay_log::debug!(
                        tags.project_key = %project_key,
                        error = &error as &dyn Error,
                        "error processing envelope"
                    )
                }
            }
        }
    }
1545
1546 fn handle_process_metrics(&self, cogs: &mut Token, message: ProcessMetrics) {
1547 let ProcessMetrics {
1548 data,
1549 project_key,
1550 received_at,
1551 sent_at,
1552 source,
1553 } = message;
1554
1555 let received_timestamp =
1556 UnixTimestamp::from_datetime(received_at).unwrap_or(UnixTimestamp::now());
1557
1558 let mut buckets = data.into_buckets(received_timestamp);
1559 if buckets.is_empty() {
1560 return;
1561 };
1562 cogs.update(relay_metrics::cogs::BySize(&buckets));
1563
1564 let clock_drift_processor =
1565 ClockDriftProcessor::new(sent_at, received_at).at_least(MINIMUM_CLOCK_DRIFT);
1566
1567 buckets.retain_mut(|bucket| {
1568 if let Err(error) = relay_metrics::normalize_bucket(bucket) {
1569 relay_log::debug!(error = &error as &dyn Error, "dropping bucket {bucket:?}");
1570 return false;
1571 }
1572
1573 if !self::metrics::is_valid_namespace(bucket) {
1574 return false;
1575 }
1576
1577 clock_drift_processor.process_timestamp(&mut bucket.timestamp);
1578
1579 if !matches!(source, BucketSource::Internal) {
1580 bucket.metadata = BucketMetadata::new(received_timestamp);
1581 }
1582
1583 true
1584 });
1585
1586 let project = self.inner.project_cache.get(project_key);
1587
1588 let buckets = match project.state() {
1591 ProjectState::Enabled(project_info) => {
1592 let rate_limits = project.rate_limits().current_limits();
1593 self.check_buckets(project_key, project_info, &rate_limits, buckets)
1594 }
1595 _ => buckets,
1596 };
1597
1598 relay_log::trace!("merging metric buckets into the aggregator");
1599 self.inner
1600 .addrs
1601 .aggregator
1602 .send(MergeBuckets::new(project_key, buckets));
1603 }
1604
1605 fn handle_process_batched_metrics(&self, cogs: &mut Token, message: ProcessBatchedMetrics) {
1606 let ProcessBatchedMetrics {
1607 payload,
1608 source,
1609 received_at,
1610 sent_at,
1611 } = message;
1612
1613 #[derive(serde::Deserialize)]
1614 struct Wrapper {
1615 buckets: HashMap<ProjectKey, Vec<Bucket>>,
1616 }
1617
1618 let buckets = match serde_json::from_slice(&payload) {
1619 Ok(Wrapper { buckets }) => buckets,
1620 Err(error) => {
1621 relay_log::debug!(
1622 error = &error as &dyn Error,
1623 "failed to parse batched metrics",
1624 );
1625 metric!(counter(RelayCounters::MetricBucketsParsingFailed) += 1);
1626 return;
1627 }
1628 };
1629
1630 for (project_key, buckets) in buckets {
1631 self.handle_process_metrics(
1632 cogs,
1633 ProcessMetrics {
1634 data: MetricData::Parsed(buckets),
1635 project_key,
1636 source,
1637 received_at,
1638 sent_at,
1639 },
1640 )
1641 }
1642 }
1643
    /// Submits processed data either to the store (processing mode) or upstream.
    ///
    /// With the `processing` feature enabled and a store forwarder configured, data is
    /// handed to the store directly. Otherwise it is serialized into an envelope,
    /// encoded with the configured HTTP encoding, and sent to the upstream relay.
    fn submit_upstream(&self, cogs: &mut Token, submit: Submit<'_>) {
        let _submit = cogs.start_category("submit");

        #[cfg(feature = "processing")]
        if self.inner.config.processing_enabled()
            && let Some(store_forwarder) = &self.inner.addrs.store_forwarder
        {
            use crate::processing::StoreHandle;

            let objectstore = self.inner.addrs.objectstore.as_ref();
            let global_config = &self.inner.global_config.current();
            let handle = StoreHandle::new(store_forwarder, objectstore, global_config);

            match submit {
                Submit::Envelope(envelope) => handle.send_envelope(envelope.into_inner()),
                Submit::Output { output, ctx } => output
                    .forward_store(handle, ctx)
                    .unwrap_or_else(|err| err.into_inner()),
            }
            return;
        }

        // Non-processing path: everything is sent upstream as an envelope.
        let mut envelope = match submit {
            Submit::Envelope(envelope) => envelope,
            Submit::Output { output, ctx } => match output.serialize_envelope(ctx) {
                Ok(envelope) => ManagedEnvelope::from(envelope).into_processed(),
                Err(_) => {
                    relay_log::error!("failed to serialize output to an envelope");
                    return;
                }
            },
        };

        if envelope.envelope_mut().is_empty() {
            envelope.accept();
            return;
        }

        envelope.envelope_mut().set_sent_at(Utc::now());

        relay_log::trace!("sending envelope to sentry endpoint");
        let http_encoding = self.inner.config.http_encoding();
        let result = envelope.envelope().to_vec().and_then(|v| {
            encode_payload(&v.into(), http_encoding).map_err(EnvelopeError::PayloadIoFailed)
        });

        match result {
            Ok(body) => {
                self.inner
                    .addrs
                    .upstream_relay
                    .send(SendRequest(SendEnvelope {
                        envelope,
                        body,
                        http_encoding,
                        project_cache: self.inner.project_cache.clone(),
                    }));
            }
            Err(error) => {
                relay_log::error!(
                    error = &error as &dyn Error,
                    tags.project_key = %envelope.scoping().project_key,
                    "failed to serialize envelope payload"
                );

                envelope.reject(Outcome::Invalid(DiscardReason::Internal));
            }
        }
    }
1725
1726 fn handle_submit_client_reports(&self, cogs: &mut Token, message: SubmitClientReports) {
1727 let SubmitClientReports {
1728 client_reports,
1729 scoping,
1730 } = message;
1731
1732 let upstream = self.inner.config.upstream();
1733 let dsn = PartialDsn::outbound(&scoping, upstream);
1734
1735 let mut envelope = Envelope::from_request(None, RequestMeta::outbound(dsn));
1736 for client_report in client_reports {
1737 match client_report.serialize() {
1738 Ok(payload) => {
1739 let mut item = Item::new(ItemType::ClientReport);
1740 item.set_payload(ContentType::Json, payload);
1741 envelope.add_item(item);
1742 }
1743 Err(error) => {
1744 relay_log::error!(
1745 error = &error as &dyn std::error::Error,
1746 "failed to serialize client report"
1747 );
1748 }
1749 }
1750 }
1751
1752 let envelope = ManagedEnvelope::new(envelope, self.inner.addrs.outcome_aggregator.clone());
1753 self.submit_upstream(cogs, Submit::Envelope(envelope.into_processed()));
1754 }
1755
1756 fn check_buckets(
1757 &self,
1758 project_key: ProjectKey,
1759 project_info: &ProjectInfo,
1760 rate_limits: &RateLimits,
1761 buckets: Vec<Bucket>,
1762 ) -> Vec<Bucket> {
1763 let Some(scoping) = project_info.scoping(project_key) else {
1764 relay_log::error!(
1765 tags.project_key = project_key.as_str(),
1766 "there is no scoping: dropping {} buckets",
1767 buckets.len(),
1768 );
1769 return Vec::new();
1770 };
1771
1772 let mut buckets = self::metrics::apply_project_info(
1773 buckets,
1774 &self.inner.metric_outcomes,
1775 project_info,
1776 scoping,
1777 );
1778
1779 let namespaces: BTreeSet<MetricNamespace> = buckets
1780 .iter()
1781 .filter_map(|bucket| bucket.name.try_namespace())
1782 .collect();
1783
1784 for namespace in namespaces {
1785 let limits = rate_limits.check_with_quotas(
1786 project_info.get_quotas(),
1787 scoping.item(DataCategory::MetricBucket),
1788 );
1789
1790 if limits.is_limited() {
1791 let rejected;
1792 (buckets, rejected) = utils::split_off(buckets, |bucket| {
1793 bucket.name.try_namespace() == Some(namespace)
1794 });
1795
1796 let reason_code = limits.longest().and_then(|limit| limit.reason_code.clone());
1797 self.inner.metric_outcomes.track(
1798 scoping,
1799 &rejected,
1800 Outcome::RateLimited(reason_code),
1801 );
1802 }
1803 }
1804
1805 let quotas = project_info.config.quotas.clone();
1806 match MetricsLimiter::create(buckets, quotas, scoping) {
1807 Ok(mut bucket_limiter) => {
1808 bucket_limiter.enforce_limits(rate_limits, &self.inner.metric_outcomes);
1809 bucket_limiter.into_buckets()
1810 }
1811 Err(buckets) => buckets,
1812 }
1813 }
1814
    /// Applies Redis-backed rate limits to metric buckets, per namespace.
    ///
    /// Without a configured rate limiter the buckets pass through unchanged. Limited
    /// namespaces are split off, tracked as rate-limited outcomes, and the limits are
    /// cached on the project. Remaining buckets then go through the generic
    /// [`MetricsLimiter`] enforcement in [`Self::apply_other_rate_limits`].
    #[cfg(feature = "processing")]
    async fn rate_limit_buckets(
        &self,
        scoping: Scoping,
        project_info: &ProjectInfo,
        mut buckets: Vec<Bucket>,
    ) -> Vec<Bucket> {
        let Some(rate_limiter) = &self.inner.rate_limiter else {
            return buckets;
        };

        let global_config = self.inner.global_config.current();
        // Total bucket quantity per namespace.
        let namespaces = buckets
            .iter()
            .filter_map(|bucket| bucket.name.try_namespace())
            .counts();

        let quotas = CombinedQuotas::new(&global_config, project_info.get_quotas());

        for (namespace, quantity) in namespaces {
            let item_scoping = scoping.metric_bucket(namespace);

            let limits = match rate_limiter
                .is_rate_limited(quotas, item_scoping, quantity, false)
                .await
            {
                Ok(limits) => limits,
                Err(err) => {
                    // On limiter errors, stop checking rather than risk dropping
                    // data based on incomplete information.
                    relay_log::error!(
                        error = &err as &dyn std::error::Error,
                        "failed to check redis rate limits"
                    );
                    break;
                }
            };

            if limits.is_limited() {
                // Drop all buckets of the limited namespace and track outcomes.
                let rejected;
                (buckets, rejected) = utils::split_off(buckets, |bucket| {
                    bucket.name.try_namespace() == Some(namespace)
                });

                let reason_code = limits.longest().and_then(|limit| limit.reason_code.clone());
                self.inner.metric_outcomes.track(
                    scoping,
                    &rejected,
                    Outcome::RateLimited(reason_code),
                );

                // Cache the limits so later submissions are limited up front.
                self.inner
                    .project_cache
                    .get(item_scoping.scoping.project_key)
                    .rate_limits()
                    .merge(limits);
            }
        }

        match MetricsLimiter::create(buckets, project_info.config.quotas.clone(), scoping) {
            Err(buckets) => buckets,
            Ok(bucket_limiter) => self.apply_other_rate_limits(bucket_limiter).await,
        }
    }
1877
1878 #[cfg(feature = "processing")]
1880 async fn apply_other_rate_limits(&self, mut bucket_limiter: MetricsLimiter) -> Vec<Bucket> {
1881 relay_log::trace!("handle_rate_limit_buckets");
1882
1883 let scoping = *bucket_limiter.scoping();
1884
1885 if let Some(rate_limiter) = self.inner.rate_limiter.as_ref() {
1886 let global_config = self.inner.global_config.current();
1887 let quotas = CombinedQuotas::new(&global_config, bucket_limiter.quotas());
1888
1889 let over_accept_once = true;
1892 let mut rate_limits = RateLimits::new();
1893
1894 let (category, count) = bucket_limiter.count();
1895
1896 let timer = Instant::now();
1897 let mut is_limited = false;
1898
1899 if let Some(count) = count {
1900 match rate_limiter
1901 .is_rate_limited(quotas, scoping.item(category), count, over_accept_once)
1902 .await
1903 {
1904 Ok(limits) => {
1905 is_limited = limits.is_limited();
1906 rate_limits.merge(limits)
1907 }
1908 Err(e) => {
1909 relay_log::error!(error = &e as &dyn Error, "rate limiting error")
1910 }
1911 }
1912 }
1913
1914 relay_statsd::metric!(
1915 timer(RelayTimers::RateLimitBucketsDuration) = timer.elapsed(),
1916 category = category.name(),
1917 limited = if is_limited { "true" } else { "false" },
1918 count = match count {
1919 None => "none",
1920 Some(0) => "0",
1921 Some(1) => "1",
1922 Some(1..=10) => "10",
1923 Some(1..=25) => "25",
1924 Some(1..=50) => "50",
1925 Some(51..=100) => "100",
1926 Some(101..=500) => "500",
1927 _ => "> 500",
1928 },
1929 );
1930
1931 if rate_limits.is_limited() {
1932 let was_enforced =
1933 bucket_limiter.enforce_limits(&rate_limits, &self.inner.metric_outcomes);
1934
1935 if was_enforced {
1936 self.inner
1938 .project_cache
1939 .get(scoping.project_key)
1940 .rate_limits()
1941 .merge(rate_limits);
1942 }
1943 }
1944 }
1945
1946 bucket_limiter.into_buckets()
1947 }
1948
    /// Rate-limits flushed metric buckets and forwards them to the store.
    ///
    /// Processing-mode counterpart to the envelope/global encoders: buckets go
    /// through Redis rate limiting first and are then sent to the store forwarder
    /// with the project's event retention.
    #[cfg(feature = "processing")]
    async fn encode_metrics_processing(
        &self,
        message: FlushBuckets,
        store_forwarder: &Addr<Store>,
    ) {
        use crate::constants::DEFAULT_EVENT_RETENTION;
        use crate::services::store::StoreMetrics;

        for ProjectBuckets {
            buckets,
            scoping,
            project_info,
            ..
        } in message.buckets.into_values()
        {
            let buckets = self
                .rate_limit_buckets(scoping, &project_info, buckets)
                .await;

            if buckets.is_empty() {
                continue;
            }

            // Fall back to the default retention if the project configures none.
            let retention = project_info
                .config
                .event_retention
                .unwrap_or(DEFAULT_EVENT_RETENTION);

            store_forwarder.send(StoreMetrics {
                buckets,
                scoping,
                retention,
            });
        }
    }
1992
1993 fn encode_metrics_envelope(&self, cogs: &mut Token, message: FlushBuckets) {
2004 let FlushBuckets {
2005 partition_key,
2006 buckets,
2007 } = message;
2008
2009 let batch_size = self.inner.config.metrics_max_batch_size_bytes();
2010 let upstream = self.inner.config.upstream();
2011
2012 for ProjectBuckets {
2013 buckets, scoping, ..
2014 } in buckets.values()
2015 {
2016 let dsn = PartialDsn::outbound(scoping, upstream);
2017
2018 relay_statsd::metric!(
2019 distribution(RelayDistributions::PartitionKeys) = u64::from(partition_key)
2020 );
2021
2022 let mut num_batches = 0;
2023 for batch in BucketsView::from(buckets).by_size(batch_size) {
2024 let mut envelope = Envelope::from_request(None, RequestMeta::outbound(dsn.clone()));
2025
2026 let mut item = Item::new(ItemType::MetricBuckets);
2027 item.set_source_quantities(crate::metrics::extract_quantities(batch));
2028 item.set_payload(ContentType::Json, serde_json::to_vec(&buckets).unwrap());
2029 envelope.add_item(item);
2030
2031 let mut envelope =
2032 ManagedEnvelope::new(envelope, self.inner.addrs.outcome_aggregator.clone());
2033 envelope
2034 .set_partition_key(Some(partition_key))
2035 .scope(*scoping);
2036
2037 relay_statsd::metric!(
2038 distribution(RelayDistributions::BucketsPerBatch) = batch.len() as u64
2039 );
2040
2041 self.submit_upstream(cogs, Submit::Envelope(envelope.into_processed()));
2042 num_batches += 1;
2043 }
2044
2045 relay_statsd::metric!(
2046 distribution(RelayDistributions::BatchesPerPartition) = num_batches
2047 );
2048 }
2049 }
2050
2051 fn send_global_partition(&self, partition_key: u32, partition: &mut Partition<'_>) {
2053 if partition.is_empty() {
2054 return;
2055 }
2056
2057 let (unencoded, project_info) = partition.take();
2058 let http_encoding = self.inner.config.http_encoding();
2059 let encoded = match encode_payload(&unencoded, http_encoding) {
2060 Ok(payload) => payload,
2061 Err(error) => {
2062 let error = &error as &dyn std::error::Error;
2063 relay_log::error!(error, "failed to encode metrics payload");
2064 return;
2065 }
2066 };
2067
2068 let request = SendMetricsRequest {
2069 partition_key: partition_key.to_string(),
2070 unencoded,
2071 encoded,
2072 project_info,
2073 http_encoding,
2074 metric_outcomes: self.inner.metric_outcomes.clone(),
2075 };
2076
2077 self.inner.addrs.upstream_relay.send(SendRequest(request));
2078 }
2079
2080 fn encode_metrics_global(&self, message: FlushBuckets) {
2091 let FlushBuckets {
2092 partition_key,
2093 buckets,
2094 } = message;
2095
2096 let batch_size = self.inner.config.metrics_max_batch_size_bytes();
2097 let mut partition = Partition::new(batch_size);
2098 let mut partition_splits = 0;
2099
2100 for ProjectBuckets {
2101 buckets, scoping, ..
2102 } in buckets.values()
2103 {
2104 for bucket in buckets {
2105 let mut remaining = Some(BucketView::new(bucket));
2106
2107 while let Some(bucket) = remaining.take() {
2108 if let Some(next) = partition.insert(bucket, *scoping) {
2109 self.send_global_partition(partition_key, &mut partition);
2113 remaining = Some(next);
2114 partition_splits += 1;
2115 }
2116 }
2117 }
2118 }
2119
2120 if partition_splits > 0 {
2121 metric!(distribution(RelayDistributions::PartitionSplits) = partition_splits);
2122 }
2123
2124 self.send_global_partition(partition_key, &mut partition);
2125 }
2126
    /// Handles flushed aggregator buckets and routes them to the proper encoder.
    ///
    /// Buckets are first checked against cached rate limits and project config per
    /// project. In processing mode they go to the store; otherwise they are encoded
    /// either as one global request or as per-project envelopes.
    async fn handle_flush_buckets(&self, cogs: &mut Token, mut message: FlushBuckets) {
        for (project_key, pb) in message.buckets.iter_mut() {
            // Take the buckets out to avoid cloning; the checked set is put back.
            let buckets = std::mem::take(&mut pb.buckets);
            pb.buckets =
                self.check_buckets(*project_key, &pb.project_info, &pb.rate_limits, buckets);
        }

        #[cfg(feature = "processing")]
        if self.inner.config.processing_enabled()
            && let Some(ref store_forwarder) = self.inner.addrs.store_forwarder
        {
            return self
                .encode_metrics_processing(message, store_forwarder)
                .await;
        }

        if self.inner.config.http_global_metrics() {
            self.encode_metrics_global(message)
        } else {
            self.encode_metrics_envelope(cogs, message)
        }
    }
2149
    /// Returns whether a Redis-backed rate limiter is configured (test-only helper).
    #[cfg(all(test, feature = "processing"))]
    fn redis_rate_limiter_enabled(&self) -> bool {
        self.inner.rate_limiter.is_some()
    }
2154
    /// Dispatches a processor message to its handler, measuring duration and COGS.
    async fn handle_message(&self, message: EnvelopeProcessor) {
        let ty = message.variant();
        // Compute COGS attribution before the message is consumed below.
        let feature_weights = self.feature_weights(&message);

        metric!(timer(RelayTimers::ProcessMessageDuration), message = ty, {
            let mut cogs = self.inner.cogs.timed(ResourceId::Relay, feature_weights);

            match message {
                EnvelopeProcessor::ProcessEnvelope(m) => {
                    self.handle_process_envelope(&mut cogs, *m).await
                }
                EnvelopeProcessor::ProcessProjectMetrics(m) => {
                    self.handle_process_metrics(&mut cogs, *m)
                }
                EnvelopeProcessor::ProcessBatchedMetrics(m) => {
                    self.handle_process_batched_metrics(&mut cogs, *m)
                }
                EnvelopeProcessor::FlushBuckets(m) => {
                    self.handle_flush_buckets(&mut cogs, *m).await
                }
                EnvelopeProcessor::SubmitClientReports(m) => {
                    self.handle_submit_client_reports(&mut cogs, *m)
                }
            }
        });
    }
2181
2182 fn feature_weights(&self, message: &EnvelopeProcessor) -> FeatureWeights {
2183 match message {
2184 EnvelopeProcessor::ProcessEnvelope(_) => AppFeature::Unattributed.into(),
2186 EnvelopeProcessor::ProcessProjectMetrics(_) => AppFeature::Unattributed.into(),
2187 EnvelopeProcessor::ProcessBatchedMetrics(_) => AppFeature::Unattributed.into(),
2188 EnvelopeProcessor::FlushBuckets(v) => v
2189 .buckets
2190 .values()
2191 .map(|s| {
2192 if self.inner.config.processing_enabled() {
2193 relay_metrics::cogs::ByCount(&s.buckets).into()
2196 } else {
2197 relay_metrics::cogs::BySize(&s.buckets).into()
2198 }
2199 })
2200 .fold(FeatureWeights::none(), FeatureWeights::merge),
2201 EnvelopeProcessor::SubmitClientReports(_) => AppFeature::ClientReports.into(),
2202 }
2203 }
2204}
2205
impl Service for EnvelopeProcessorService {
    type Interface = EnvelopeProcessor;

    async fn run(self, mut rx: relay_system::Receiver<Self::Interface>) {
        while let Some(message) = rx.recv().await {
            // Each message is handled on the worker pool; a clone of the service is
            // moved into the task. NOTE(review): awaiting `spawn_async` presumably
            // blocks until the pool accepts the task (backpressure) — confirm with
            // the pool implementation.
            let service = self.clone();
            self.inner
                .pool
                .spawn_async(
                    async move {
                        service.handle_message(message).await;
                    }
                    .boxed(),
                )
                .await;
        }
    }
}
2224
/// Compresses an envelope body with the given HTTP content encoding.
///
/// For [`HttpEncoding::Identity`] the body is returned unchanged (`Bytes` clone is
/// cheap, no copy of the underlying buffer).
///
/// # Errors
///
/// Returns any I/O error raised by the underlying encoder.
pub fn encode_payload(body: &Bytes, http_encoding: HttpEncoding) -> Result<Bytes, std::io::Error> {
    let envelope_body: Vec<u8> = match http_encoding {
        HttpEncoding::Identity => return Ok(body.clone()),
        HttpEncoding::Deflate => {
            let mut encoder = ZlibEncoder::new(Vec::new(), Compression::default());
            encoder.write_all(body.as_ref())?;
            encoder.finish()?
        }
        HttpEncoding::Gzip => {
            let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
            encoder.write_all(body.as_ref())?;
            encoder.finish()?
        }
        HttpEncoding::Br => {
            // NOTE(review): arguments assumed per the brotli crate API to be
            // (sink, buffer size, quality, window size) — confirm before changing.
            let mut encoder = BrotliEncoder::new(Vec::new(), 0, 5, 22);
            encoder.write_all(body.as_ref())?;
            encoder.into_inner()
        }
        HttpEncoding::Zstd => {
            // Level 1 is zstd's fastest compression level.
            let mut encoder = ZstdEncoder::new(Vec::new(), 1)?;
            encoder.write_all(body.as_ref())?;
            encoder.finish()?
        }
    };

    Ok(envelope_body.into())
}
2255
/// An upstream request submitting a processed envelope via HTTP.
#[derive(Debug)]
pub struct SendEnvelope {
    /// The envelope; used for outcome tracking once the request resolves.
    pub envelope: TypedEnvelope<Processed>,
    /// The serialized, already-encoded request body.
    pub body: Bytes,
    /// The content encoding that was applied to `body`.
    pub http_encoding: HttpEncoding,
    /// Used to merge rate limits returned by the upstream back into the project.
    pub project_cache: ProjectCacheHandle,
}
2264
impl UpstreamRequest for SendEnvelope {
    fn method(&self) -> reqwest::Method {
        reqwest::Method::POST
    }

    fn path(&self) -> Cow<'_, str> {
        format!("/api/{}/envelope/", self.envelope.scoping().project_id).into()
    }

    fn route(&self) -> &'static str {
        "envelope"
    }

    /// Builds the HTTP request, forwarding the original request metadata as headers.
    fn build(&mut self, builder: &mut http::RequestBuilder) -> Result<(), http::HttpError> {
        let envelope_body = self.body.clone();
        metric!(
            distribution(RelayDistributions::UpstreamEnvelopeBodySize) = envelope_body.len() as u64
        );

        let meta = &self.envelope.meta();
        let shard = self.envelope.partition_key().map(|p| p.to_string());
        builder
            .content_encoding(self.http_encoding)
            .header_opt("Origin", meta.origin().map(|url| url.as_str()))
            .header_opt("User-Agent", meta.user_agent())
            .header("X-Sentry-Auth", meta.auth_header())
            .header("X-Forwarded-For", meta.forwarded_for())
            .header("Content-Type", envelope::CONTENT_TYPE)
            .header_opt("X-Sentry-Relay-Shard", shard)
            .body(envelope_body);

        Ok(())
    }

    fn sign(&mut self) -> Option<Sign> {
        Some(Sign::Optional(SignatureType::RequestSign))
    }

    /// Resolves the envelope's outcome based on the upstream response.
    fn respond(
        self: Box<Self>,
        result: Result<http::Response, UpstreamRequestError>,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> {
        Box::pin(async move {
            let result = match result {
                Ok(mut response) => response.consume().await.map_err(UpstreamRequestError::Http),
                Err(error) => Err(error),
            };

            match result {
                Ok(()) => self.envelope.accept(),
                // Errors that were received from the upstream (e.g. rate limits)
                // count as delivered: the envelope is accepted, and any rate limits
                // are merged back into the project's cached limits.
                Err(error) if error.is_received() => {
                    let scoping = self.envelope.scoping();
                    self.envelope.accept();

                    if let UpstreamRequestError::RateLimited(limits) = error {
                        self.project_cache
                            .get(scoping.project_key)
                            .rate_limits()
                            .merge(limits.scope(&scoping));
                    }
                }
                // Transport-level failures reject the envelope with an internal
                // discard reason.
                Err(error) => {
                    let mut envelope = self.envelope;
                    envelope.reject(Outcome::Invalid(DiscardReason::Internal));
                    relay_log::error!(
                        error = &error as &dyn Error,
                        tags.project_key = %envelope.scoping().project_key,
                        "error sending envelope"
                    );
                }
            }
        })
    }
}
2341
/// A size-limited batch of metric bucket views for one global metrics request.
#[derive(Debug)]
struct Partition<'a> {
    /// Maximum serialized size of the partition in bytes.
    max_size: usize,
    /// Remaining byte budget before the partition is considered full.
    remaining: usize,
    /// Collected bucket views, grouped by project.
    views: HashMap<ProjectKey, Vec<BucketView<'a>>>,
    /// Scoping per project, retained for outcome tracking on failed sends.
    project_info: HashMap<ProjectKey, Scoping>,
}
2355
impl<'a> Partition<'a> {
    /// Creates an empty partition with the given maximum size in bytes.
    pub fn new(size: usize) -> Self {
        Self {
            max_size: size,
            remaining: size,
            views: HashMap::new(),
            project_info: HashMap::new(),
        }
    }

    /// Inserts a bucket view, splitting it if it does not fit entirely.
    ///
    /// The part that fits the remaining budget is recorded under the project's key.
    /// If there is spill-over, it is returned and must be inserted again after the
    /// full partition has been flushed.
    pub fn insert(&mut self, bucket: BucketView<'a>, scoping: Scoping) -> Option<BucketView<'a>> {
        let (current, next) = bucket.split(self.remaining, Some(self.max_size));

        if let Some(current) = current {
            self.remaining = self.remaining.saturating_sub(current.estimated_size());
            self.views
                .entry(scoping.project_key)
                .or_default()
                .push(current);

            self.project_info
                .entry(scoping.project_key)
                .or_insert(scoping);
        }

        next
    }

    /// Returns `true` if no bucket views have been inserted since the last `take`.
    fn is_empty(&self) -> bool {
        self.views.is_empty()
    }

    /// Serializes the partition into a JSON payload and resets it for reuse.
    ///
    /// Returns the payload together with the scoping of every contained project.
    fn take(&mut self) -> (Bytes, HashMap<ProjectKey, Scoping>) {
        #[derive(serde::Serialize)]
        struct Wrapper<'a> {
            buckets: &'a HashMap<ProjectKey, Vec<BucketView<'a>>>,
        }

        let buckets = &self.views;
        let payload = serde_json::to_vec(&Wrapper { buckets }).unwrap().into();

        let scopings = self.project_info.clone();
        self.project_info.clear();

        self.views.clear();
        self.remaining = self.max_size;

        (payload, scopings)
    }
}
2421
/// An upstream request that submits a batch of metric buckets.
#[derive(Debug)]
struct SendMetricsRequest {
    /// Partition key, sent as the `X-Sentry-Relay-Shard` header.
    partition_key: String,
    /// The uncompressed JSON payload; used for signing and for reconstructing
    /// buckets when outcomes must be emitted after a failed send.
    unencoded: Bytes,
    /// The payload after content encoding; used as the request body.
    encoded: Bytes,
    /// Scoping per project key, required to attribute outcomes on failure.
    project_info: HashMap<ProjectKey, Scoping>,
    /// Content encoding applied to `encoded`.
    http_encoding: HttpEncoding,
    /// Sink for tracking metric outcomes of failed transmissions.
    metric_outcomes: MetricOutcomes,
}
2442
2443impl SendMetricsRequest {
2444 fn create_error_outcomes(self) {
2445 #[derive(serde::Deserialize)]
2446 struct Wrapper {
2447 buckets: HashMap<ProjectKey, Vec<MinimalTrackableBucket>>,
2448 }
2449
2450 let buckets = match serde_json::from_slice(&self.unencoded) {
2451 Ok(Wrapper { buckets }) => buckets,
2452 Err(err) => {
2453 relay_log::error!(
2454 error = &err as &dyn std::error::Error,
2455 "failed to parse buckets from failed transmission"
2456 );
2457 return;
2458 }
2459 };
2460
2461 for (key, buckets) in buckets {
2462 let Some(&scoping) = self.project_info.get(&key) else {
2463 relay_log::error!("missing scoping for project key");
2464 continue;
2465 };
2466
2467 self.metric_outcomes.track(
2468 scoping,
2469 &buckets,
2470 Outcome::Invalid(DiscardReason::Internal),
2471 );
2472 }
2473 }
2474}
2475
2476impl UpstreamRequest for SendMetricsRequest {
2477 fn set_relay_id(&self) -> bool {
2478 true
2479 }
2480
2481 fn sign(&mut self) -> Option<Sign> {
2482 Some(Sign::Required(SignatureType::Body(self.unencoded.clone())))
2483 }
2484
2485 fn method(&self) -> reqwest::Method {
2486 reqwest::Method::POST
2487 }
2488
2489 fn path(&self) -> Cow<'_, str> {
2490 "/api/0/relays/metrics/".into()
2491 }
2492
2493 fn route(&self) -> &'static str {
2494 "global_metrics"
2495 }
2496
2497 fn build(&mut self, builder: &mut http::RequestBuilder) -> Result<(), http::HttpError> {
2498 metric!(
2499 distribution(RelayDistributions::UpstreamMetricsBodySize) = self.encoded.len() as u64
2500 );
2501
2502 builder
2503 .content_encoding(self.http_encoding)
2504 .header("X-Sentry-Relay-Shard", self.partition_key.as_bytes())
2505 .header(header::CONTENT_TYPE, b"application/json")
2506 .body(self.encoded.clone());
2507
2508 Ok(())
2509 }
2510
2511 fn respond(
2512 self: Box<Self>,
2513 result: Result<http::Response, UpstreamRequestError>,
2514 ) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> {
2515 Box::pin(async {
2516 match result {
2517 Ok(mut response) => {
2518 response.consume().await.ok();
2519 }
2520 Err(error) => {
2521 relay_log::error!(error = &error as &dyn Error, "Failed to send metrics batch");
2522
2523 if error.is_received() {
2526 return;
2527 }
2528
2529 self.create_error_outcomes()
2530 }
2531 }
2532 })
2533 }
2534}
2535
/// A borrowed view combining global and project-level quotas.
#[derive(Copy, Clone, Debug)]
#[cfg(feature = "processing")]
struct CombinedQuotas<'a> {
    /// Quotas taken from the global configuration.
    global_quotas: &'a [Quota],
    /// Quotas from the project configuration.
    project_quotas: &'a [Quota],
}
2543
#[cfg(feature = "processing")]
impl<'a> CombinedQuotas<'a> {
    /// Creates a combined view over the global config's quotas and the given
    /// project quotas.
    pub fn new(global_config: &'a GlobalConfig, project_quotas: &'a [Quota]) -> Self {
        let global_quotas = global_config.quotas.as_slice();
        Self {
            global_quotas,
            project_quotas,
        }
    }
}
2554
#[cfg(feature = "processing")]
impl<'a> IntoIterator for CombinedQuotas<'a> {
    type Item = &'a Quota;
    type IntoIter = std::iter::Chain<std::slice::Iter<'a, Quota>, std::slice::Iter<'a, Quota>>;

    /// Yields all global quotas first, followed by all project quotas.
    fn into_iter(self) -> Self::IntoIter {
        let globals = self.global_quotas.iter();
        let projects = self.project_quotas.iter();
        globals.chain(projects)
    }
}
2564
2565#[cfg(test)]
2566mod tests {
2567 use insta::assert_debug_snapshot;
2568 use relay_common::glob2::LazyGlob;
2569 use relay_dynamic_config::ProjectConfig;
2570 use relay_event_normalization::{
2571 NormalizationConfig, RedactionRule, TransactionNameConfig, TransactionNameRule,
2572 };
2573 use relay_event_schema::protocol::TransactionSource;
2574 use relay_pii::DataScrubbingConfig;
2575 use similar_asserts::assert_eq;
2576
2577 use crate::testutils::{create_test_processor, create_test_processor_with_addrs};
2578
2579 #[cfg(feature = "processing")]
2580 use {
2581 relay_metrics::BucketValue,
2582 relay_quotas::{QuotaScope, ReasonCode},
2583 relay_test::mock_service,
2584 };
2585
2586 use super::*;
2587
    /// Creates an organization-scoped quota with the given id that rejects
    /// every metric bucket (`limit: Some(0)`).
    #[cfg(feature = "processing")]
    fn mock_quota(id: &str) -> Quota {
        Quota {
            id: Some(id.into()),
            categories: [DataCategory::MetricBucket].into(),
            scope: QuotaScope::Organization,
            scope_id: None,
            limit: Some(0), // a limit of zero rejects everything in the category
            window: None,
            reason_code: None,
            namespace: None,
        }
    }
2601
2602 #[cfg(feature = "processing")]
2603 #[test]
2604 fn test_dynamic_quotas() {
2605 let global_config = relay_dynamic_config::GlobalConfig {
2606 quotas: vec![mock_quota("foo"), mock_quota("bar")],
2607 ..Default::default()
2608 };
2609
2610 let project_quotas = vec![mock_quota("baz"), mock_quota("qux")];
2611
2612 let dynamic_quotas = CombinedQuotas::new(&global_config, &project_quotas);
2613
2614 let quota_ids = dynamic_quotas.into_iter().filter_map(|q| q.id.as_deref());
2615 assert!(quota_ids.eq(["foo", "bar", "baz", "qux"]));
2616 }
2617
    /// Verifies that rate limits are applied per scoping within a single
    /// flushed batch: the organization matched by the quota is dropped while
    /// the other organization's buckets still reach the store forwarder.
    ///
    /// NOTE(review): requires a reachable Redis (`RELAY_REDIS_URL`, falling
    /// back to localhost) since processing mode with Redis is enabled.
    #[cfg(feature = "processing")]
    #[tokio::test]
    async fn test_ratelimit_per_batch() {
        use relay_base_schema::organization::OrganizationId;
        use relay_protocol::FiniteF64;

        // Two scopings in different organizations; only org 1 is targeted by
        // the quota defined below.
        let rate_limited_org = Scoping {
            organization_id: OrganizationId::new(1),
            project_id: ProjectId::new(21),
            project_key: ProjectKey::parse("00000000000000000000000000000000").unwrap(),
            key_id: Some(17),
        };

        let not_rate_limited_org = Scoping {
            organization_id: OrganizationId::new(2),
            project_id: ProjectId::new(21),
            project_key: ProjectKey::parse("11111111111111111111111111111111").unwrap(),
            key_id: Some(17),
        };

        let message = {
            let project_info = {
                // Quota with `limit: Some(0)` scoped to org 1's id: rejects
                // all metric buckets for that organization only.
                let quota = Quota {
                    id: Some("testing".into()),
                    categories: [DataCategory::MetricBucket].into(),
                    scope: relay_quotas::QuotaScope::Organization,
                    scope_id: Some(rate_limited_org.organization_id.to_string().into()),
                    limit: Some(0),
                    window: None,
                    reason_code: Some(ReasonCode::new("test")),
                    namespace: None,
                };

                let mut config = ProjectConfig::default();
                config.quotas.push(quota);

                Arc::new(ProjectInfo {
                    config,
                    ..Default::default()
                })
            };

            // One counter bucket per scoping; both orgs share the same
            // project config and therefore the same quota list.
            let project_metrics = |scoping| ProjectBuckets {
                buckets: vec![Bucket {
                    name: "d:spans/bar".into(),
                    value: BucketValue::Counter(FiniteF64::new(1.0).unwrap()),
                    timestamp: UnixTimestamp::now(),
                    tags: Default::default(),
                    width: 10,
                    metadata: BucketMetadata::default(),
                }],
                rate_limits: Default::default(),
                project_info: project_info.clone(),
                scoping,
            };

            let buckets = hashbrown::HashMap::from([
                (
                    rate_limited_org.project_key,
                    project_metrics(rate_limited_org),
                ),
                (
                    not_rate_limited_org.project_key,
                    project_metrics(not_rate_limited_org),
                ),
            ]);

            FlushBuckets {
                partition_key: 0,
                buckets,
            }
        };

        // Sanity check: both projects are present before rate limiting.
        assert_eq!(message.buckets.keys().count(), 2);

        // Processing config with Redis so the Redis rate limiter is active.
        let config = {
            let config_json = serde_json::json!({
                "processing": {
                    "enabled": true,
                    "kafka_config": [],
                    "redis": {
                        "server": std::env::var("RELAY_REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_owned()),
                    }
                }
            });
            Config::from_json_value(config_json).unwrap()
        };

        // Mock store forwarder recording the organization id of every
        // metrics message it receives; anything else panics.
        let (store, handle) = {
            let f = |org_ids: &mut Vec<OrganizationId>, msg: Store| {
                let org_id = match msg {
                    Store::Metrics(x) => x.scoping.organization_id,
                    _ => panic!("received envelope when expecting only metrics"),
                };
                org_ids.push(org_id);
            };

            mock_service("store_forwarder", vec![], f)
        };

        let processor = create_test_processor(config).await;
        assert!(processor.redis_rate_limiter_enabled());

        processor.encode_metrics_processing(message, &store).await;

        // Dropping the address shuts the mock service down, so `handle`
        // resolves with the collected organization ids.
        drop(store);
        let orgs_not_ratelimited = handle.await.unwrap();

        // Only the organization without a matching quota reached the store.
        assert_eq!(
            orgs_not_ratelimited,
            vec![not_rate_limited_org.organization_id]
        );
    }
2734
    /// Ensures that the `@ip:mask` PII rule masks the version-like substring
    /// in the `User-Agent` header (103.0.0.0 looks like an IP address), while
    /// the browser context extracted during normalization still contains the
    /// real browser version.
    #[tokio::test]
    async fn test_browser_version_extraction_with_pii_like_data() {
        let processor = create_test_processor(Default::default()).await;
        let outcome_aggregator = Addr::dummy();
        let event_id = EventId::new();

        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
            .parse()
            .unwrap();

        let request_meta = RequestMeta::new(dsn);
        let mut envelope = Envelope::from_request(Some(event_id), request_meta);

        // Event whose User-Agent carries a Chrome version that matches the
        // shape of an IPv4 address.
        envelope.add_item({
            let mut item = Item::new(ItemType::Event);
            item.set_payload(
                ContentType::Json,
                r#"
            {
                "request": {
                    "headers": [
                        ["User-Agent", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36"]
                    ]
                }
            }
            "#,
            );
            item
        });

        // Legacy data scrubbing plus an explicit PII config masking anything
        // that looks like an IP address anywhere in the event.
        let mut datascrubbing_settings = DataScrubbingConfig::default();
        datascrubbing_settings.scrub_data = true;
        datascrubbing_settings.scrub_defaults = true;
        datascrubbing_settings.scrub_ip_addresses = true;

        let pii_config = serde_json::from_str(r#"{"applications": {"**": ["@ip:mask"]}}"#).unwrap();

        let config = ProjectConfig {
            datascrubbing_settings,
            pii_config: Some(pii_config),
            ..Default::default()
        };

        let project_info = ProjectInfo {
            config,
            ..Default::default()
        };

        let mut envelopes = ProcessingGroup::split_envelope(*envelope, &Default::default());
        assert_eq!(envelopes.len(), 1);

        let (group, envelope) = envelopes.pop().unwrap();
        let envelope = ManagedEnvelope::new(envelope, outcome_aggregator);

        let message = ProcessEnvelopeGrouped {
            group,
            envelope,
            ctx: processing::Context {
                project_info: &project_info,
                ..processing::Context::for_test()
            },
        };

        let Ok(Some(Submit::Output { output, ctx })) = processor.process(message).await else {
            panic!();
        };
        let new_envelope = output.serialize_envelope(ctx).unwrap().accept(|f| f);

        let event_item = new_envelope.items().last().unwrap();
        let annotated_event: Annotated<Event> =
            Annotated::from_json_bytes(&event_item.payload()).unwrap();
        let event = annotated_event.into_value().unwrap();
        let headers = event
            .request
            .into_value()
            .unwrap()
            .headers
            .into_value()
            .unwrap();

        // The stored header has its version masked by the @ip rule...
        assert_eq!(
            Some(
                "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/********* Safari/537.36"
            ),
            headers.get_header("User-Agent")
        );

        // ...but the browser context still carries the parsed version.
        let contexts = event.contexts.into_value().unwrap();
        let browser = contexts.0.get("browser").unwrap();
        assert_eq!(
            r#"{"browser":"Chrome 103.0.0","name":"Chrome","version":"103.0.0","type":"browser"}"#,
            browser.to_json().unwrap()
        );
    }
2832
    /// Checks that the envelope's dynamic sampling context (DSC) is
    /// materialized into the event's `_dsc` field during processing, with the
    /// hyphenated trace id normalized to its compact form (see snapshot).
    #[tokio::test]
    #[cfg(feature = "processing")]
    async fn test_materialize_dsc() {
        use crate::services::projects::project::PublicKeyConfig;

        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
            .parse()
            .unwrap();
        let request_meta = RequestMeta::new(dsn);
        let mut envelope = Envelope::from_request(None, request_meta);

        // DSC attached to the envelope; public key matches the project key
        // registered below.
        let dsc = r#"{
            "trace_id": "00000000-0000-0000-0000-000000000000",
            "public_key": "e12d836b15bb49d7bbf99e64295d995b",
            "sample_rate": "0.2"
        }"#;
        envelope.set_dsc(serde_json::from_str(dsc).unwrap());

        let mut item = Item::new(ItemType::Event);
        item.set_payload(ContentType::Json, r#"{}"#);
        envelope.add_item(item);

        let outcome_aggregator = Addr::dummy();
        let managed_envelope = ManagedEnvelope::new(envelope, outcome_aggregator);

        let mut project_info = ProjectInfo::default();
        project_info.public_keys.push(PublicKeyConfig {
            public_key: ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(),
            numeric_id: Some(1),
        });

        let config = serde_json::json!({
            "processing": {
                "enabled": true,
                "kafka_config": [],
            }
        });

        let message = ProcessEnvelopeGrouped {
            group: ProcessingGroup::Error,
            envelope: managed_envelope,
            ctx: processing::Context {
                config: &Config::from_json_value(config.clone()).unwrap(),
                project_info: &project_info,
                sampling_project_info: Some(&project_info),
                ..processing::Context::for_test()
            },
        };

        let processor = create_test_processor(Config::from_json_value(config).unwrap()).await;
        let Ok(Some(Submit::Output { output, ctx })) = processor.process(message).await else {
            panic!();
        };
        let envelope = output.serialize_envelope(ctx).unwrap();
        let event = envelope
            .get_item_by(|item| item.ty() == &ItemType::Event)
            .unwrap();

        // The DSC fields are present on the processed event's `_dsc`.
        let event = Annotated::<Event>::from_json_bytes(&event.payload()).unwrap();
        insta::assert_debug_snapshot!(event.value().unwrap()._dsc, @r###"
        Object(
            {
                "environment": ~,
                "public_key": String(
                    "e12d836b15bb49d7bbf99e64295d995b",
                ),
                "release": ~,
                "replay_id": ~,
                "sample_rate": String(
                    "0.2",
                ),
                "trace_id": String(
                    "00000000000000000000000000000000",
                ),
                "transaction": ~,
            },
        )
        "###);
    }
2912
    /// Normalizes a test transaction event with the given name and source and
    /// returns the statsd metrics captured during normalization.
    ///
    /// The normalization config contains a single transaction-name rule that
    /// replaces the segment after `/foo/` with `*`.
    fn capture_test_event(transaction_name: &str, source: TransactionSource) -> Vec<String> {
        let mut event = Annotated::<Event>::from_json(
            r#"
            {
                "type": "transaction",
                "transaction": "/foo/",
                "timestamp": 946684810.0,
                "start_timestamp": 946684800.0,
                "contexts": {
                    "trace": {
                        "trace_id": "4c79f60c11214eb38604f4ae0781bfb2",
                        "span_id": "fa90fdead5f74053",
                        "op": "http.server",
                        "type": "trace"
                    }
                },
                "transaction_info": {
                    "source": "url"
                }
            }
            "#,
        )
        .unwrap();
        // Override the template's transaction name and source with the
        // caller-provided values.
        let e = event.value_mut().as_mut().unwrap();
        e.transaction.set_value(Some(transaction_name.into()));

        e.transaction_info
            .value_mut()
            .as_mut()
            .unwrap()
            .source
            .set_value(Some(source));

        // Capture statsd metrics emitted while logging transaction name
        // changes and running normalization.
        relay_statsd::with_capturing_test_client(|| {
            utils::log_transaction_name_metrics(&mut event, |event| {
                let config = NormalizationConfig {
                    transaction_name_config: TransactionNameConfig {
                        rules: &[TransactionNameRule {
                            pattern: LazyGlob::new("/foo/*/**".to_owned()),
                            expiry: DateTime::<Utc>::MAX_UTC,
                            redaction: RedactionRule::Replace {
                                substitution: "*".to_owned(),
                            },
                        }],
                    },
                    ..Default::default()
                };
                relay_event_normalization::normalize_event(event, &config)
            });
        })
    }
2964
2965 #[test]
2966 fn test_log_transaction_metrics_none() {
2967 let captures = capture_test_event("/nothing", TransactionSource::Url);
2968 insta::assert_debug_snapshot!(captures, @r###"
2969 [
2970 "event.transaction_name_changes:1|c|#source_in:url,changes:none,source_out:sanitized,is_404:false",
2971 ]
2972 "###);
2973 }
2974
2975 #[test]
2976 fn test_log_transaction_metrics_rule() {
2977 let captures = capture_test_event("/foo/john/denver", TransactionSource::Url);
2978 insta::assert_debug_snapshot!(captures, @r###"
2979 [
2980 "event.transaction_name_changes:1|c|#source_in:url,changes:rule,source_out:sanitized,is_404:false",
2981 ]
2982 "###);
2983 }
2984
2985 #[test]
2986 fn test_log_transaction_metrics_pattern() {
2987 let captures = capture_test_event("/something/12345", TransactionSource::Url);
2988 insta::assert_debug_snapshot!(captures, @r###"
2989 [
2990 "event.transaction_name_changes:1|c|#source_in:url,changes:pattern,source_out:sanitized,is_404:false",
2991 ]
2992 "###);
2993 }
2994
2995 #[test]
2996 fn test_log_transaction_metrics_both() {
2997 let captures = capture_test_event("/foo/john/12345", TransactionSource::Url);
2998 insta::assert_debug_snapshot!(captures, @r###"
2999 [
3000 "event.transaction_name_changes:1|c|#source_in:url,changes:both,source_out:sanitized,is_404:false",
3001 ]
3002 "###);
3003 }
3004
3005 #[test]
3006 fn test_log_transaction_metrics_no_match() {
3007 let captures = capture_test_event("/foo/john/12345", TransactionSource::Route);
3008 insta::assert_debug_snapshot!(captures, @r###"
3009 [
3010 "event.transaction_name_changes:1|c|#source_in:route,changes:none,source_out:route,is_404:false",
3011 ]
3012 "###);
3013 }
3014
    /// Checks that `received_at` bucket metadata is stamped for metrics from
    /// external sources but left unset (`None`) for internal sources.
    #[tokio::test]
    async fn test_process_metrics_bucket_metadata() {
        let mut token = Cogs::noop().timed(ResourceId::Relay, AppFeature::Unattributed);
        let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
        let received_at = Utc::now();
        let config = Config::default();

        // Custom aggregator address so the merged buckets can be inspected.
        let (aggregator, mut aggregator_rx) = Addr::custom();
        let processor = create_test_processor_with_addrs(
            config,
            Addrs {
                aggregator,
                ..Default::default()
            },
        )
        .await;

        let mut item = Item::new(ItemType::Statsd);
        item.set_payload(ContentType::Text, "spans/foo:3182887624:4267882815|s");
        // External sources get `received_at` set to the request time;
        // internal sources keep it unset.
        for (source, expected_received_at) in [
            (
                BucketSource::External,
                Some(UnixTimestamp::from_datetime(received_at).unwrap()),
            ),
            (BucketSource::Internal, None),
        ] {
            let message = ProcessMetrics {
                data: MetricData::Raw(vec![item.clone()]),
                project_key,
                source,
                received_at,
                sent_at: Some(Utc::now()),
            };
            processor.handle_process_metrics(&mut token, message);

            let Aggregator::MergeBuckets(merge_buckets) = aggregator_rx.recv().await.unwrap();
            let buckets = merge_buckets.buckets;
            assert_eq!(buckets.len(), 1);
            assert_eq!(buckets[0].metadata.received_at, expected_received_at);
        }
    }
3056
    /// Checks that a batched metrics payload containing buckets for two
    /// projects is split into one `MergeBuckets` message per project, with
    /// the bucket contents parsed as expected (see snapshot).
    #[tokio::test]
    async fn test_process_batched_metrics() {
        let mut token = Cogs::noop().timed(ResourceId::Relay, AppFeature::Unattributed);
        let received_at = Utc::now();
        let config = Config::default();

        // Custom aggregator address so the merged buckets can be inspected.
        let (aggregator, mut aggregator_rx) = Addr::custom();
        let processor = create_test_processor_with_addrs(
            config,
            Addrs {
                aggregator,
                ..Default::default()
            },
        )
        .await;

        // Two projects with one distribution bucket each.
        let payload = r#"{
    "buckets": {
        "11111111111111111111111111111111": [
            {
                "timestamp": 1615889440,
                "width": 0,
                "name": "d:custom/endpoint.response_time@millisecond",
                "type": "d",
                "value": [
                    68.0
                ],
                "tags": {
                    "route": "user_index"
                }
            }
        ],
        "22222222222222222222222222222222": [
            {
                "timestamp": 1615889440,
                "width": 0,
                "name": "d:custom/endpoint.cache_rate@none",
                "type": "d",
                "value": [
                    36.0
                ]
            }
        ]
    }
}
"#;
        let message = ProcessBatchedMetrics {
            payload: Bytes::from(payload),
            source: BucketSource::Internal,
            received_at,
            sent_at: Some(Utc::now()),
        };
        processor.handle_process_batched_metrics(&mut token, message);

        // One MergeBuckets message per project key.
        let Aggregator::MergeBuckets(mb1) = aggregator_rx.recv().await.unwrap();
        let Aggregator::MergeBuckets(mb2) = aggregator_rx.recv().await.unwrap();

        // Sort by project key for a deterministic snapshot order.
        let mut messages = vec![mb1, mb2];
        messages.sort_by_key(|mb| mb.project_key);

        let actual = messages
            .into_iter()
            .map(|mb| (mb.project_key, mb.buckets))
            .collect::<Vec<_>>();

        assert_debug_snapshot!(actual, @r###"
        [
            (
                ProjectKey("11111111111111111111111111111111"),
                [
                    Bucket {
                        timestamp: UnixTimestamp(1615889440),
                        width: 0,
                        name: MetricName(
                            "d:custom/endpoint.response_time@millisecond",
                        ),
                        value: Distribution(
                            [
                                68.0,
                            ],
                        ),
                        tags: {
                            "route": "user_index",
                        },
                        metadata: BucketMetadata {
                            merges: 1,
                            received_at: None,
                            extracted_from_indexed: false,
                        },
                    },
                ],
            ),
            (
                ProjectKey("22222222222222222222222222222222"),
                [
                    Bucket {
                        timestamp: UnixTimestamp(1615889440),
                        width: 0,
                        name: MetricName(
                            "d:custom/endpoint.cache_rate@none",
                        ),
                        value: Distribution(
                            [
                                36.0,
                            ],
                        ),
                        tags: {},
                        metadata: BucketMetadata {
                            merges: 1,
                            received_at: None,
                            extracted_from_indexed: false,
                        },
                    },
                ],
            ),
        ]
        "###);
    }
3175}