use std::borrow::Cow;
use std::collections::{BTreeSet, HashMap};
use std::error::Error;
use std::fmt::{Debug, Display};
use std::future::Future;
use std::io::Write;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;

use anyhow::Context;
use brotli::CompressorWriter as BrotliEncoder;
use bytes::Bytes;
use chrono::{DateTime, Utc};
use flate2::Compression;
use flate2::write::{GzEncoder, ZlibEncoder};
use futures::FutureExt;
use futures::future::BoxFuture;
use relay_base_schema::project::{ProjectId, ProjectKey};
use relay_cogs::{AppFeature, Cogs, FeatureWeights, ResourceId, Token};
use relay_common::time::UnixTimestamp;
use relay_config::{Config, HttpEncoding, RelayMode};
use relay_dynamic_config::{ErrorBoundary, Feature, GlobalConfig};
use relay_event_normalization::{ClockDriftProcessor, GeoIpLookup};
use relay_event_schema::processor::ProcessingAction;
use relay_event_schema::protocol::{
    ClientReport, Event, EventId, Metrics, NetworkReportError, SpanV2,
};
use relay_filter::FilterStatKey;
use relay_metrics::{Bucket, BucketMetadata, BucketView, BucketsView, MetricNamespace};
use relay_pii::PiiConfigError;
use relay_protocol::Annotated;
use relay_quotas::{DataCategory, Quota, RateLimits, Scoping};
use relay_sampling::evaluation::{ReservoirCounters, ReservoirEvaluator, SamplingDecision};
use relay_statsd::metric;
use relay_system::{Addr, FromMessage, NoResponse, Service};
use reqwest::header;
use smallvec::{SmallVec, smallvec};
use zstd::stream::Encoder as ZstdEncoder;

use crate::envelope::{
    self, AttachmentType, ContentType, Envelope, EnvelopeError, Item, ItemContainer, ItemType,
};
use crate::extractors::{PartialDsn, RequestMeta, RequestTrust};
use crate::integrations::{Integration, SpansIntegration};
use crate::managed::{InvalidProcessingGroupType, ManagedEnvelope, TypedEnvelope};
use crate::metrics::{MetricOutcomes, MetricsLimiter, MinimalTrackableBucket};
use crate::metrics_extraction::transactions::ExtractedMetrics;
use crate::metrics_extraction::transactions::types::ExtractMetricsError;
use crate::processing::check_ins::CheckInsProcessor;
use crate::processing::logs::LogsProcessor;
use crate::processing::sessions::SessionsProcessor;
use crate::processing::spans::SpansProcessor;
use crate::processing::trace_metrics::TraceMetricsProcessor;
use crate::processing::utils::event::{
    EventFullyNormalized, EventMetricsExtracted, FiltersStatus, SpansExtracted, event_category,
    event_type,
};
use crate::processing::utils::transaction::ExtractMetricsContext;
use crate::processing::{Forward as _, Output, Outputs, QuotaRateLimiter};
use crate::service::ServiceError;
use crate::services::global_config::GlobalConfigHandle;
use crate::services::metrics::{Aggregator, FlushBuckets, MergeBuckets, ProjectBuckets};
use crate::services::outcome::{DiscardItemType, DiscardReason, Outcome, TrackOutcome};
use crate::services::projects::cache::ProjectCacheHandle;
use crate::services::projects::project::{ProjectInfo, ProjectState};
use crate::services::upstream::{
    SendRequest, Sign, SignatureType, UpstreamRelay, UpstreamRequest, UpstreamRequestError,
};
use crate::statsd::{RelayCounters, RelayHistograms, RelayTimers};
use crate::utils::{self, CheckLimits, EnvelopeLimiter, SamplingResult};
use crate::{http, processing};
use relay_threading::AsyncPool;
#[cfg(feature = "processing")]
use {
    crate::services::global_rate_limits::{GlobalRateLimits, GlobalRateLimitsServiceHandle},
    crate::services::processor::nnswitch::SwitchProcessingError,
    crate::services::store::{Store, StoreEnvelope},
    crate::utils::Enforcement,
    itertools::Itertools,
    relay_cardinality::{
        CardinalityLimit, CardinalityLimiter, CardinalityLimitsSplit, RedisSetLimiter,
        RedisSetLimiterOptions,
    },
    relay_dynamic_config::CardinalityLimiterMode,
    relay_quotas::{RateLimitingError, RedisRateLimiter},
    relay_redis::{AsyncRedisClient, RedisClients},
    std::time::Instant,
    symbolic_unreal::{Unreal4Error, Unreal4ErrorKind},
};

mod attachment;
mod dynamic_sampling;
mod event;
mod metrics;
mod nel;
mod profile;
mod profile_chunk;
mod replay;
mod report;
mod span;

#[cfg(all(sentry, feature = "processing"))]
mod playstation;
mod standalone;
#[cfg(feature = "processing")]
mod unreal;

#[cfg(feature = "processing")]
mod nnswitch;

macro_rules! if_processing {
    ($config:expr, $if_true:block) => {
        #[cfg(feature = "processing")] {
            if $config.processing_enabled() $if_true
        }
    };
    ($config:expr, $if_true:block else $if_false:block) => {
        {
            #[cfg(feature = "processing")] {
                if $config.processing_enabled() $if_true else $if_false
            }
            #[cfg(not(feature = "processing"))] {
                $if_false
            }
        }
    };
}
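
// Illustrative usage of `if_processing!` (this example is not part of the
// original source; `heavy_processing_step` and `fallback` are hypothetical
// helpers): the first arm runs a block only when Relay is compiled with the
// "processing" feature and processing is enabled in the config, the second
// arm additionally supplies an `else` branch for all other cases.
//
// ```ignore
// let result = if_processing!(self.inner.config, {
//     heavy_processing_step()
// } else {
//     fallback()
// });
// ```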

/// The minimum clock drift for correction to apply.
pub const MINIMUM_CLOCK_DRIFT: Duration = Duration::from_secs(55 * 60);

/// An error returned when a [`ProcessingGroup`] could not be converted into
/// its corresponding group type.
#[derive(Debug)]
pub struct GroupTypeError;

impl Display for GroupTypeError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("failed to convert processing group into corresponding type")
    }
}

impl std::error::Error for GroupTypeError {}

macro_rules! processing_group {
    ($ty:ident, $variant:ident$(, $($other:ident),+)?) => {
        #[derive(Clone, Copy, Debug)]
        pub struct $ty;

        impl From<$ty> for ProcessingGroup {
            fn from(_: $ty) -> Self {
                ProcessingGroup::$variant
            }
        }

        impl TryFrom<ProcessingGroup> for $ty {
            type Error = GroupTypeError;

            fn try_from(value: ProcessingGroup) -> Result<Self, Self::Error> {
                if matches!(value, ProcessingGroup::$variant) {
                    return Ok($ty);
                }
                $($(
                    if matches!(value, ProcessingGroup::$other) {
                        return Ok($ty);
                    }
                )+)?
                return Err(GroupTypeError);
            }
        }
    };
}
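
// For illustration only (not part of the original source):
// `processing_group!(LogGroup, Log, Nel)` expands to a unit struct `LogGroup`
// that converts into `ProcessingGroup::Log`, plus a fallible conversion back
// that accepts both the `Log` and `Nel` groups:
//
// ```ignore
// assert!(matches!(ProcessingGroup::from(LogGroup), ProcessingGroup::Log));
// assert!(LogGroup::try_from(ProcessingGroup::Nel).is_ok());
// assert!(LogGroup::try_from(ProcessingGroup::Span).is_err());
// ```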

/// Marker trait for processing groups that process events.
pub trait EventProcessing {}

processing_group!(TransactionGroup, Transaction);
impl EventProcessing for TransactionGroup {}

processing_group!(ErrorGroup, Error);
impl EventProcessing for ErrorGroup {}

processing_group!(SessionGroup, Session);
processing_group!(StandaloneGroup, Standalone);
processing_group!(ClientReportGroup, ClientReport);
processing_group!(ReplayGroup, Replay);
processing_group!(CheckInGroup, CheckIn);
processing_group!(LogGroup, Log, Nel);
processing_group!(TraceMetricGroup, TraceMetric);
processing_group!(SpanGroup, Span);

processing_group!(ProfileChunkGroup, ProfileChunk);
processing_group!(MetricsGroup, Metrics);
processing_group!(ForwardUnknownGroup, ForwardUnknown);
processing_group!(Ungrouped, Ungrouped);

/// Marker for envelopes that have completed processing and may be submitted.
#[derive(Clone, Copy, Debug)]
pub struct Processed;

/// The processing group of an envelope, used to route it through the matching
/// processing pipeline. Envelopes are split so that each one contains only
/// items of a single group.
#[derive(Clone, Copy, Debug)]
pub enum ProcessingGroup {
    Transaction,
    Error,
    Session,
    Standalone,
    ClientReport,
    Replay,
    CheckIn,
    Nel,
    Log,
    TraceMetric,
    Span,
    SpanV2,
    Metrics,
    ProfileChunk,
    ForwardUnknown,
    Ungrouped,
}

impl ProcessingGroup {
    /// Splits the envelope into sub-envelopes, one for each processing group.
    fn split_envelope(
        mut envelope: Envelope,
        project_info: &ProjectInfo,
    ) -> SmallVec<[(Self, Box<Envelope>); 3]> {
        let headers = envelope.headers().clone();
        let mut grouped_envelopes = smallvec![];

        // Extract replay items into their own envelope.
        let replay_items = envelope.take_items_by(|item| {
            matches!(
                item.ty(),
                &ItemType::ReplayEvent | &ItemType::ReplayRecording | &ItemType::ReplayVideo
            )
        });
        if !replay_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Replay,
                Envelope::from_parts(headers.clone(), replay_items),
            ))
        }

        // Keep all session-related items together.
        let session_items = envelope
            .take_items_by(|item| matches!(item.ty(), &ItemType::Session | &ItemType::Sessions));
        if !session_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Session,
                Envelope::from_parts(headers.clone(), session_items),
            ))
        }

        // Span items take the experimental span v2 pipeline when they are already
        // span v2 containers, or when the project has the feature enabled.
        let span_v2_items = envelope.take_items_by(|item| {
            let exp_feature = project_info.has_feature(Feature::SpanV2ExperimentalProcessing);
            let is_supported_integration = matches!(
                item.integration(),
                Some(Integration::Spans(SpansIntegration::OtelV1 { .. }))
            );
            let is_span = matches!(item.ty(), &ItemType::Span);

            ItemContainer::<SpanV2>::is_container(item)
                || (exp_feature && is_span)
                || (exp_feature && is_supported_integration)
        });

        if !span_v2_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::SpanV2,
                Envelope::from_parts(headers.clone(), span_v2_items),
            ))
        }

        let span_items = envelope.take_items_by(|item| {
            matches!(item.ty(), &ItemType::Span)
                || matches!(item.integration(), Some(Integration::Spans(_)))
        });

        if !span_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Span,
                Envelope::from_parts(headers.clone(), span_items),
            ))
        }

        let logs_items = envelope.take_items_by(|item| {
            matches!(item.ty(), &ItemType::Log)
                || matches!(item.integration(), Some(Integration::Logs(_)))
        });
        if !logs_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Log,
                Envelope::from_parts(headers.clone(), logs_items),
            ))
        }

        let trace_metric_items =
            envelope.take_items_by(|item| matches!(item.ty(), &ItemType::TraceMetric));
        if !trace_metric_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::TraceMetric,
                Envelope::from_parts(headers.clone(), trace_metric_items),
            ))
        }

        let nel_items = envelope.take_items_by(|item| matches!(item.ty(), &ItemType::Nel));
        if !nel_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Nel,
                Envelope::from_parts(headers.clone(), nel_items),
            ))
        }

        // Extract all metric items.
        let metric_items = envelope.take_items_by(|i| i.ty().is_metrics());
        if !metric_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Metrics,
                Envelope::from_parts(headers.clone(), metric_items),
            ))
        }

        let profile_chunk_items =
            envelope.take_items_by(|item| matches!(item.ty(), &ItemType::ProfileChunk));
        if !profile_chunk_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::ProfileChunk,
                Envelope::from_parts(headers.clone(), profile_chunk_items),
            ))
        }

        // If no item in the envelope creates an event, items that would normally
        // attach to an event are treated as standalone.
        if !envelope.items().any(Item::creates_event) {
            let standalone_items = envelope.take_items_by(Item::requires_event);
            if !standalone_items.is_empty() {
                grouped_envelopes.push((
                    ProcessingGroup::Standalone,
                    Envelope::from_parts(headers.clone(), standalone_items),
                ))
            }
        };

        // Each security report becomes its own error envelope with a fresh event id.
        let security_reports_items = envelope
            .take_items_by(|i| matches!(i.ty(), &ItemType::RawSecurity))
            .into_iter()
            .map(|item| {
                let headers = headers.clone();
                let items: SmallVec<[Item; 3]> = smallvec![item.clone()];
                let mut envelope = Envelope::from_parts(headers, items);
                envelope.set_event_id(EventId::new());
                (ProcessingGroup::Error, envelope)
            });
        grouped_envelopes.extend(security_reports_items);

        // Extract all remaining items that require an event into a single envelope.
        let require_event_items = envelope.take_items_by(Item::requires_event);
        if !require_event_items.is_empty() {
            let group = if require_event_items
                .iter()
                .any(|item| matches!(item.ty(), &ItemType::Transaction | &ItemType::Profile))
            {
                ProcessingGroup::Transaction
            } else {
                ProcessingGroup::Error
            };

            grouped_envelopes.push((
                group,
                Envelope::from_parts(headers.clone(), require_event_items),
            ))
        }

        // All remaining items are split into individual envelopes.
        let envelopes = envelope.items_mut().map(|item| {
            let headers = headers.clone();
            let items: SmallVec<[Item; 3]> = smallvec![item.clone()];
            let envelope = Envelope::from_parts(headers, items);
            let item_type = item.ty();
            let group = if matches!(item_type, &ItemType::CheckIn) {
                ProcessingGroup::CheckIn
            } else if matches!(item.ty(), &ItemType::ClientReport) {
                ProcessingGroup::ClientReport
            } else if matches!(item_type, &ItemType::Unknown(_)) {
                ProcessingGroup::ForwardUnknown
            } else {
                ProcessingGroup::Ungrouped
            };

            (group, envelope)
        });
        grouped_envelopes.extend(envelopes);

        grouped_envelopes
    }

    /// Returns the name of the processing group as used in metric tags.
    pub fn variant(&self) -> &'static str {
        match self {
            ProcessingGroup::Transaction => "transaction",
            ProcessingGroup::Error => "error",
            ProcessingGroup::Session => "session",
            ProcessingGroup::Standalone => "standalone",
            ProcessingGroup::ClientReport => "client_report",
            ProcessingGroup::Replay => "replay",
            ProcessingGroup::CheckIn => "check_in",
            ProcessingGroup::Log => "log",
            ProcessingGroup::TraceMetric => "trace_metric",
            ProcessingGroup::Nel => "nel",
            ProcessingGroup::Span => "span",
            ProcessingGroup::SpanV2 => "span_v2",
            ProcessingGroup::Metrics => "metrics",
            ProcessingGroup::ProfileChunk => "profile_chunk",
            ProcessingGroup::ForwardUnknown => "forward_unknown",
            ProcessingGroup::Ungrouped => "ungrouped",
        }
    }
}
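
// Illustrative sketch of `split_envelope` (not part of the original source):
// an envelope carrying a session, a transaction, and an attachment is split
// into one envelope per group, each inheriting the original headers:
//
// ```ignore
// let groups = ProcessingGroup::split_envelope(envelope, &project_info);
// // -> [(ProcessingGroup::Session, <session item>),
// //     (ProcessingGroup::Transaction, <transaction + attachment>)]
// ```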

impl From<ProcessingGroup> for AppFeature {
    fn from(value: ProcessingGroup) -> Self {
        match value {
            ProcessingGroup::Transaction => AppFeature::Transactions,
            ProcessingGroup::Error => AppFeature::Errors,
            ProcessingGroup::Session => AppFeature::Sessions,
            ProcessingGroup::Standalone => AppFeature::UnattributedEnvelope,
            ProcessingGroup::ClientReport => AppFeature::ClientReports,
            ProcessingGroup::Replay => AppFeature::Replays,
            ProcessingGroup::CheckIn => AppFeature::CheckIns,
            ProcessingGroup::Log => AppFeature::Logs,
            ProcessingGroup::TraceMetric => AppFeature::TraceMetrics,
            ProcessingGroup::Nel => AppFeature::Logs,
            ProcessingGroup::Span => AppFeature::Spans,
            ProcessingGroup::SpanV2 => AppFeature::Spans,
            ProcessingGroup::Metrics => AppFeature::UnattributedMetrics,
            ProcessingGroup::ProfileChunk => AppFeature::Profiles,
            ProcessingGroup::ForwardUnknown => AppFeature::UnattributedEnvelope,
            ProcessingGroup::Ungrouped => AppFeature::UnattributedEnvelope,
        }
    }
}

/// An error returned while processing an envelope.
#[derive(Debug, thiserror::Error)]
pub enum ProcessingError {
    #[error("invalid json in event")]
    InvalidJson(#[source] serde_json::Error),

    #[error("invalid message pack event payload")]
    InvalidMsgpack(#[from] rmp_serde::decode::Error),

    #[cfg(feature = "processing")]
    #[error("invalid unreal crash report")]
    InvalidUnrealReport(#[source] Unreal4Error),

    #[error("event payload too large")]
    PayloadTooLarge(DiscardItemType),

    #[error("invalid transaction event")]
    InvalidTransaction,

    #[error("envelope processor failed")]
    ProcessingFailed(#[from] ProcessingAction),

    #[error("duplicate {0} in event")]
    DuplicateItem(ItemType),

    #[error("failed to extract event payload")]
    NoEventPayload,

    #[error("missing project id in DSN")]
    MissingProjectId,

    #[error("invalid security report type: {0:?}")]
    InvalidSecurityType(Bytes),

    #[error("unsupported security report type")]
    UnsupportedSecurityType,

    #[error("invalid security report")]
    InvalidSecurityReport(#[source] serde_json::Error),

    #[error("invalid nel report")]
    InvalidNelReport(#[source] NetworkReportError),

    #[error("event filtered with reason: {0:?}")]
    EventFiltered(FilterStatKey),

    #[error("missing or invalid required event timestamp")]
    InvalidTimestamp,

    #[error("could not serialize event payload")]
    SerializeFailed(#[source] serde_json::Error),

    #[cfg(feature = "processing")]
    #[error("failed to apply quotas")]
    QuotasFailed(#[from] RateLimitingError),

    #[error("invalid pii config")]
    PiiConfigError(PiiConfigError),

    #[error("invalid processing group type")]
    InvalidProcessingGroup(Box<InvalidProcessingGroupType>),

    #[error("invalid replay")]
    InvalidReplay(DiscardReason),

    #[error("replay filtered with reason: {0:?}")]
    ReplayFiltered(FilterStatKey),

    #[cfg(feature = "processing")]
    #[error("nintendo switch dying message processing failed {0:?}")]
    InvalidNintendoDyingMessage(#[source] SwitchProcessingError),

    #[cfg(all(sentry, feature = "processing"))]
    #[error("playstation dump processing failed: {0}")]
    InvalidPlaystationDump(String),

    #[error("processing group does not match specific processor")]
    ProcessingGroupMismatch,

    #[error("new processing pipeline failed")]
    ProcessingFailure,
}

impl ProcessingError {
    pub fn to_outcome(&self) -> Option<Outcome> {
        match self {
            Self::PayloadTooLarge(payload_type) => {
                Some(Outcome::Invalid(DiscardReason::TooLarge(*payload_type)))
            }
            Self::InvalidJson(_) => Some(Outcome::Invalid(DiscardReason::InvalidJson)),
            Self::InvalidMsgpack(_) => Some(Outcome::Invalid(DiscardReason::InvalidMsgpack)),
            Self::InvalidSecurityType(_) => {
                Some(Outcome::Invalid(DiscardReason::SecurityReportType))
            }
            Self::InvalidSecurityReport(_) => Some(Outcome::Invalid(DiscardReason::SecurityReport)),
            Self::UnsupportedSecurityType => Some(Outcome::Filtered(FilterStatKey::InvalidCsp)),
            Self::InvalidNelReport(_) => Some(Outcome::Invalid(DiscardReason::InvalidJson)),
            Self::InvalidTransaction => Some(Outcome::Invalid(DiscardReason::InvalidTransaction)),
            Self::InvalidTimestamp => Some(Outcome::Invalid(DiscardReason::Timestamp)),
            Self::DuplicateItem(_) => Some(Outcome::Invalid(DiscardReason::DuplicateItem)),
            Self::NoEventPayload => Some(Outcome::Invalid(DiscardReason::NoEventPayload)),
            #[cfg(feature = "processing")]
            Self::InvalidNintendoDyingMessage(_) => Some(Outcome::Invalid(DiscardReason::Payload)),
            #[cfg(all(sentry, feature = "processing"))]
            Self::InvalidPlaystationDump(_) => Some(Outcome::Invalid(DiscardReason::Payload)),
            #[cfg(feature = "processing")]
            Self::InvalidUnrealReport(err) if err.kind() == Unreal4ErrorKind::BadCompression => {
                Some(Outcome::Invalid(DiscardReason::InvalidCompression))
            }
            #[cfg(feature = "processing")]
            Self::InvalidUnrealReport(_) => Some(Outcome::Invalid(DiscardReason::ProcessUnreal)),
            Self::SerializeFailed(_) | Self::ProcessingFailed(_) => {
                Some(Outcome::Invalid(DiscardReason::Internal))
            }
            #[cfg(feature = "processing")]
            Self::QuotasFailed(_) => Some(Outcome::Invalid(DiscardReason::Internal)),
            Self::PiiConfigError(_) => Some(Outcome::Invalid(DiscardReason::ProjectStatePii)),
            Self::MissingProjectId => None,
            Self::EventFiltered(_) => None,
            Self::InvalidProcessingGroup(_) => None,
            Self::InvalidReplay(reason) => Some(Outcome::Invalid(*reason)),
            Self::ReplayFiltered(key) => Some(Outcome::Filtered(key.clone())),
            Self::ProcessingGroupMismatch => Some(Outcome::Invalid(DiscardReason::Internal)),
            Self::ProcessingFailure => None,
        }
    }

    fn is_unexpected(&self) -> bool {
        self.to_outcome()
            .is_some_and(|outcome| outcome.is_unexpected())
    }
}

#[cfg(feature = "processing")]
impl From<Unreal4Error> for ProcessingError {
    fn from(err: Unreal4Error) -> Self {
        match err.kind() {
            Unreal4ErrorKind::TooLarge => Self::PayloadTooLarge(ItemType::UnrealReport.into()),
            _ => ProcessingError::InvalidUnrealReport(err),
        }
    }
}

impl From<ExtractMetricsError> for ProcessingError {
    fn from(error: ExtractMetricsError) -> Self {
        match error {
            ExtractMetricsError::MissingTimestamp | ExtractMetricsError::InvalidTimestamp => {
                Self::InvalidTimestamp
            }
        }
    }
}

impl From<InvalidProcessingGroupType> for ProcessingError {
    fn from(value: InvalidProcessingGroupType) -> Self {
        Self::InvalidProcessingGroup(Box::new(value))
    }
}

/// An event extracted from an envelope item, together with the size of the
/// original payload in bytes.
type ExtractedEvent = (Annotated<Event>, usize);

/// Wrapper around [`ExtractedMetrics`] that tracks whether the metrics were
/// extracted from an indexed (kept) payload.
#[derive(Debug)]
pub struct ProcessingExtractedMetrics {
    metrics: ExtractedMetrics,
}

impl ProcessingExtractedMetrics {
    pub fn new() -> Self {
        Self {
            metrics: ExtractedMetrics::default(),
        }
    }

    /// Extends the contained metrics with [`ExtractedMetrics`].
    pub fn extend(
        &mut self,
        extracted: ExtractedMetrics,
        sampling_decision: Option<SamplingDecision>,
    ) {
        self.extend_project_metrics(extracted.project_metrics, sampling_decision);
        self.extend_sampling_metrics(extracted.sampling_metrics, sampling_decision);
    }

    /// Extends the contained project metrics.
    pub fn extend_project_metrics<I>(
        &mut self,
        buckets: I,
        sampling_decision: Option<SamplingDecision>,
    ) where
        I: IntoIterator<Item = Bucket>,
    {
        self.metrics
            .project_metrics
            .extend(buckets.into_iter().map(|mut bucket| {
                bucket.metadata.extracted_from_indexed =
                    sampling_decision == Some(SamplingDecision::Keep);
                bucket
            }));
    }

    /// Extends the contained sampling metrics.
    pub fn extend_sampling_metrics<I>(
        &mut self,
        buckets: I,
        sampling_decision: Option<SamplingDecision>,
    ) where
        I: IntoIterator<Item = Bucket>,
    {
        self.metrics
            .sampling_metrics
            .extend(buckets.into_iter().map(|mut bucket| {
                bucket.metadata.extracted_from_indexed =
                    sampling_decision == Some(SamplingDecision::Keep);
                bucket
            }));
    }

    /// Applies rate limits to the contained metrics.
    ///
    /// Namespaces whose total quota is exhausted are dropped entirely. If only
    /// the indexed quota is exhausted and enforcement was not applied
    /// consistently, the `extracted_from_indexed` flag is reset instead.
    #[cfg(feature = "processing")]
    fn apply_enforcement(&mut self, enforcement: &Enforcement, enforced_consistently: bool) {
        let mut drop_namespaces: SmallVec<[_; 2]> = smallvec![];
        let mut reset_extracted_from_indexed: SmallVec<[_; 2]> = smallvec![];

        for (namespace, limit, indexed) in [
            (
                MetricNamespace::Transactions,
                &enforcement.event,
                &enforcement.event_indexed,
            ),
            (
                MetricNamespace::Spans,
                &enforcement.spans,
                &enforcement.spans_indexed,
            ),
        ] {
            if limit.is_active() {
                drop_namespaces.push(namespace);
            } else if indexed.is_active() && !enforced_consistently {
                reset_extracted_from_indexed.push(namespace);
            }
        }

        if !drop_namespaces.is_empty() || !reset_extracted_from_indexed.is_empty() {
            self.retain_mut(|bucket| {
                let Some(namespace) = bucket.name.try_namespace() else {
                    return true;
                };

                if drop_namespaces.contains(&namespace) {
                    return false;
                }

                if reset_extracted_from_indexed.contains(&namespace) {
                    bucket.metadata.extracted_from_indexed = false;
                }

                true
            });
        }
    }

    #[cfg(feature = "processing")]
    fn retain_mut(&mut self, mut f: impl FnMut(&mut Bucket) -> bool) {
        self.metrics.project_metrics.retain_mut(&mut f);
        self.metrics.sampling_metrics.retain_mut(&mut f);
    }
}

fn send_metrics(
    metrics: ExtractedMetrics,
    project_key: ProjectKey,
    sampling_key: Option<ProjectKey>,
    aggregator: &Addr<Aggregator>,
) {
    let ExtractedMetrics {
        project_metrics,
        sampling_metrics,
    } = metrics;

    if !project_metrics.is_empty() {
        aggregator.send(MergeBuckets {
            project_key,
            buckets: project_metrics,
        });
    }

    if !sampling_metrics.is_empty() {
        // If no sampling project key is available, fall back to the envelope's
        // own project key so the sampling metrics are not lost.
        let sampling_project_key = sampling_key.unwrap_or(project_key);
        aggregator.send(MergeBuckets {
            project_key: sampling_project_key,
            buckets: sampling_metrics,
        });
    }
}

/// Returns `true` if the given feature-gated functionality should be filtered
/// because the project does not have the feature enabled.
///
/// In proxy mode, Relay never filters on features.
fn should_filter(config: &Config, project_info: &ProjectInfo, feature: Feature) -> bool {
    match config.relay_mode() {
        RelayMode::Proxy => false,
        RelayMode::Managed => !project_info.has_feature(feature),
    }
}
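
// Illustrative semantics of `should_filter` (this example is not part of the
// original source; `managed_config` and `proxy_config` are hypothetical): a
// managed Relay filters data for features the project lacks, while a proxy
// Relay forwards everything.
//
// ```ignore
// // Managed mode, feature missing from the project -> filter:
// assert!(should_filter(&managed_config, &project_info, Feature::SpanV2ExperimentalProcessing));
// // Proxy mode -> never filter:
// assert!(!should_filter(&proxy_config, &project_info, Feature::SpanV2ExperimentalProcessing));
// ```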

/// The result of processing an envelope: either a processed envelope with its
/// extracted metrics, or the output of one of the dedicated processors.
#[derive(Debug)]
#[expect(
    clippy::large_enum_variant,
    reason = "until we have a better solution to combat the excessive growth of Item, see #4819"
)]
enum ProcessingResult {
    Envelope {
        managed_envelope: TypedEnvelope<Processed>,
        extracted_metrics: ProcessingExtractedMetrics,
    },
    Output(Output<Outputs>),
}

impl ProcessingResult {
    /// Creates a [`ProcessingResult`] without any extracted metrics.
    fn no_metrics(managed_envelope: TypedEnvelope<Processed>) -> Self {
        Self::Envelope {
            managed_envelope,
            extracted_metrics: ProcessingExtractedMetrics::new(),
        }
    }
}

/// A unit of processed data ready for submission.
#[derive(Debug)]
#[expect(
    clippy::large_enum_variant,
    reason = "until we have a better solution to combat the excessive growth of Item, see #4819"
)]
enum Submit<'a> {
    Envelope(TypedEnvelope<Processed>),
    Output {
        output: Outputs,
        ctx: processing::ForwardContext<'a>,
    },
}

/// Applies processing to all contents of the given envelope.
#[derive(Debug)]
pub struct ProcessEnvelope {
    /// The envelope to process.
    pub envelope: ManagedEnvelope,
    /// Project info of the envelope's own project.
    pub project_info: Arc<ProjectInfo>,
    /// Currently cached rate limits for this project.
    pub rate_limits: Arc<RateLimits>,
    /// Project info of the project that started the trace, if available.
    pub sampling_project_info: Option<Arc<ProjectInfo>>,
    /// Reservoir counters for dynamic sampling.
    pub reservoir_counters: ReservoirCounters,
}

/// An envelope of a single processing group together with its context.
#[derive(Debug)]
struct ProcessEnvelopeGrouped<'a> {
    /// The group the envelope belongs to.
    pub group: ProcessingGroup,
    /// The envelope to process.
    pub envelope: ManagedEnvelope,
    /// The processing context.
    pub ctx: processing::Context<'a>,
    /// Reservoir counters for dynamic sampling.
    pub reservoir_counters: &'a ReservoirCounters,
}

/// Parses a list of metrics or metric buckets and pushes them to the aggregator.
#[derive(Debug)]
pub struct ProcessMetrics {
    /// The metric items to process.
    pub data: MetricData,
    /// The target project.
    pub project_key: ProjectKey,
    /// Whether to keep or reset the metric metadata.
    pub source: BucketSource,
    /// The instant at which the request was received.
    pub received_at: DateTime<Utc>,
    /// The value of the envelope's `sent_at` header for clock drift correction.
    pub sent_at: Option<DateTime<Utc>>,
}

/// Raw, unparsed metric items or already parsed metric buckets.
#[derive(Debug)]
pub enum MetricData {
    /// Raw, unparsed envelope items.
    Raw(Vec<Item>),
    /// Already parsed, but not yet processed, buckets.
    Parsed(Vec<Bucket>),
}

impl MetricData {
    /// Consumes the metric data and returns the contained buckets.
    ///
    /// Raw items are parsed from either statsd or JSON bucket format; parsed
    /// buckets are returned as-is.
    fn into_buckets(self, timestamp: UnixTimestamp) -> Vec<Bucket> {
        let items = match self {
            Self::Parsed(buckets) => return buckets,
            Self::Raw(items) => items,
        };

        let mut buckets = Vec::new();
        for item in items {
            let payload = item.payload();
            if item.ty() == &ItemType::Statsd {
                for bucket_result in Bucket::parse_all(&payload, timestamp) {
                    match bucket_result {
                        Ok(bucket) => buckets.push(bucket),
                        Err(error) => relay_log::debug!(
                            error = &error as &dyn Error,
                            "failed to parse metric bucket from statsd format",
                        ),
                    }
                }
            } else if item.ty() == &ItemType::MetricBuckets {
                match serde_json::from_slice::<Vec<Bucket>>(&payload) {
                    Ok(parsed_buckets) => {
                        // Fast path: reuse the parsed vector when these are the
                        // first buckets instead of copying them over.
                        if buckets.is_empty() {
                            buckets = parsed_buckets;
                        } else {
                            buckets.extend(parsed_buckets);
                        }
                    }
                    Err(error) => {
                        relay_log::debug!(
                            error = &error as &dyn Error,
                            "failed to parse metric bucket",
                        );
                        metric!(counter(RelayCounters::MetricBucketsParsingFailed) += 1);
                    }
                }
            } else {
                relay_log::error!(
                    "invalid item of type {} passed to ProcessMetrics",
                    item.ty()
                );
            }
        }
        buckets
    }
}
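
// Hypothetical usage of `MetricData::into_buckets` (not part of the original
// source; `statsd_item` is an assumed envelope item of type `ItemType::Statsd`):
// a raw statsd payload such as `endpoint.hits:1|c` parses into a single counter
// bucket, while `MetricData::Parsed` short-circuits and returns its buckets
// unchanged.
//
// ```ignore
// let data = MetricData::Raw(vec![statsd_item]);
// let buckets = data.into_buckets(UnixTimestamp::now());
// assert_eq!(buckets.len(), 1);
// ```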

/// Parses a batched payload of metric buckets and pushes them to the aggregator.
#[derive(Debug)]
pub struct ProcessBatchedMetrics {
    /// The metrics payload in JSON format.
    pub payload: Bytes,
    /// Whether to keep or reset the metric metadata.
    pub source: BucketSource,
    /// The instant at which the request was received.
    pub received_at: DateTime<Utc>,
    /// The value of the `sent_at` header for clock drift correction.
    pub sent_at: Option<DateTime<Utc>>,
}

/// The source of metric buckets, used to decide whether to trust their metadata.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum BucketSource {
    /// The buckets originate from an internal, trusted Relay.
    Internal,
    /// The buckets originate from an untrusted source; their metadata is reset.
    External,
}

impl BucketSource {
    /// Infers the bucket source from the request's trust level.
    pub fn from_meta(meta: &RequestMeta) -> Self {
        match meta.request_trust() {
            RequestTrust::Trusted => Self::Internal,
            RequestTrust::Untrusted => Self::External,
        }
    }
}

/// Sends a list of client reports to the upstream.
#[derive(Debug)]
pub struct SubmitClientReports {
    /// The client reports to be sent.
    pub client_reports: Vec<ClientReport>,
    /// Scoping information for the client reports.
    pub scoping: Scoping,
}

/// The messages handled by the [`EnvelopeProcessorService`].
#[derive(Debug)]
pub enum EnvelopeProcessor {
    ProcessEnvelope(Box<ProcessEnvelope>),
    ProcessProjectMetrics(Box<ProcessMetrics>),
    ProcessBatchedMetrics(Box<ProcessBatchedMetrics>),
    FlushBuckets(Box<FlushBuckets>),
    SubmitClientReports(Box<SubmitClientReports>),
}

impl EnvelopeProcessor {
    /// Returns the name of the message variant.
    pub fn variant(&self) -> &'static str {
        match self {
            EnvelopeProcessor::ProcessEnvelope(_) => "ProcessEnvelope",
            EnvelopeProcessor::ProcessProjectMetrics(_) => "ProcessProjectMetrics",
            EnvelopeProcessor::ProcessBatchedMetrics(_) => "ProcessBatchedMetrics",
            EnvelopeProcessor::FlushBuckets(_) => "FlushBuckets",
            EnvelopeProcessor::SubmitClientReports(_) => "SubmitClientReports",
        }
    }
}

impl relay_system::Interface for EnvelopeProcessor {}

impl FromMessage<ProcessEnvelope> for EnvelopeProcessor {
    type Response = relay_system::NoResponse;

    fn from_message(message: ProcessEnvelope, _sender: ()) -> Self {
        Self::ProcessEnvelope(Box::new(message))
    }
}

impl FromMessage<ProcessMetrics> for EnvelopeProcessor {
    type Response = NoResponse;

    fn from_message(message: ProcessMetrics, _: ()) -> Self {
        Self::ProcessProjectMetrics(Box::new(message))
    }
}

impl FromMessage<ProcessBatchedMetrics> for EnvelopeProcessor {
    type Response = NoResponse;

    fn from_message(message: ProcessBatchedMetrics, _: ()) -> Self {
        Self::ProcessBatchedMetrics(Box::new(message))
    }
}

impl FromMessage<FlushBuckets> for EnvelopeProcessor {
    type Response = NoResponse;

    fn from_message(message: FlushBuckets, _: ()) -> Self {
        Self::FlushBuckets(Box::new(message))
    }
}

impl FromMessage<SubmitClientReports> for EnvelopeProcessor {
    type Response = NoResponse;

    fn from_message(message: SubmitClientReports, _: ()) -> Self {
        Self::SubmitClientReports(Box::new(message))
    }
}

/// The asynchronous thread pool used for scheduling processing tasks in the processor.
pub type EnvelopeProcessorServicePool = AsyncPool<BoxFuture<'static, ()>>;

/// Service implementing the [`EnvelopeProcessor`] interface.
#[derive(Clone)]
pub struct EnvelopeProcessorService {
    inner: Arc<InnerProcessor>,
}

/// Contains the addresses of services the envelope processor publishes to.
pub struct Addrs {
    pub outcome_aggregator: Addr<TrackOutcome>,
    pub upstream_relay: Addr<UpstreamRelay>,
    #[cfg(feature = "processing")]
    pub store_forwarder: Option<Addr<Store>>,
    pub aggregator: Addr<Aggregator>,
    #[cfg(feature = "processing")]
    pub global_rate_limits: Option<Addr<GlobalRateLimits>>,
}

impl Default for Addrs {
    fn default() -> Self {
        Addrs {
            outcome_aggregator: Addr::dummy(),
            upstream_relay: Addr::dummy(),
            #[cfg(feature = "processing")]
            store_forwarder: None,
            aggregator: Addr::dummy(),
            #[cfg(feature = "processing")]
            global_rate_limits: None,
        }
    }
}

struct InnerProcessor {
    pool: EnvelopeProcessorServicePool,
    config: Arc<Config>,
    global_config: GlobalConfigHandle,
    project_cache: ProjectCacheHandle,
    cogs: Cogs,
    #[cfg(feature = "processing")]
    quotas_client: Option<AsyncRedisClient>,
    addrs: Addrs,
    #[cfg(feature = "processing")]
    rate_limiter: Option<Arc<RedisRateLimiter<GlobalRateLimitsServiceHandle>>>,
    geoip_lookup: GeoIpLookup,
    #[cfg(feature = "processing")]
    cardinality_limiter: Option<CardinalityLimiter>,
    metric_outcomes: MetricOutcomes,
    processing: Processing,
}

/// The dedicated processors of the new processing pipeline.
struct Processing {
    logs: LogsProcessor,
    trace_metrics: TraceMetricsProcessor,
    spans: SpansProcessor,
    check_ins: CheckInsProcessor,
    sessions: SessionsProcessor,
}

impl EnvelopeProcessorService {
    /// Creates a multi-threaded envelope processor.
    #[cfg_attr(feature = "processing", expect(clippy::too_many_arguments))]
    pub fn new(
        pool: EnvelopeProcessorServicePool,
        config: Arc<Config>,
        global_config: GlobalConfigHandle,
        project_cache: ProjectCacheHandle,
        cogs: Cogs,
        #[cfg(feature = "processing")] redis: Option<RedisClients>,
        addrs: Addrs,
        metric_outcomes: MetricOutcomes,
    ) -> Self {
        let geoip_lookup = config
            .geoip_path()
            .and_then(
                |p| match GeoIpLookup::open(p).context(ServiceError::GeoIp) {
                    Ok(geoip) => Some(geoip),
                    Err(err) => {
                        relay_log::error!("failed to open GeoIP db {p:?}: {err:?}");
                        None
                    }
                },
            )
            .unwrap_or_else(GeoIpLookup::empty);

        #[cfg(feature = "processing")]
        let (cardinality, quotas) = match redis {
            Some(RedisClients {
                cardinality,
                quotas,
                ..
            }) => (Some(cardinality), Some(quotas)),
            None => (None, None),
        };

        #[cfg(feature = "processing")]
        let global_rate_limits = addrs.global_rate_limits.clone().map(Into::into);

        #[cfg(feature = "processing")]
        let rate_limiter = match (quotas.clone(), global_rate_limits) {
            (Some(redis), Some(global)) => {
                Some(RedisRateLimiter::new(redis, global).max_limit(config.max_rate_limit()))
            }
            _ => None,
        };

        let quota_limiter = Arc::new(QuotaRateLimiter::new(
            #[cfg(feature = "processing")]
            project_cache.clone(),
            #[cfg(feature = "processing")]
            rate_limiter.clone(),
        ));
        #[cfg(feature = "processing")]
        let rate_limiter = rate_limiter.map(Arc::new);

        let inner = InnerProcessor {
            pool,
            global_config,
            project_cache,
            cogs,
            #[cfg(feature = "processing")]
            quotas_client: quotas.clone(),
            #[cfg(feature = "processing")]
            rate_limiter,
            addrs,
            #[cfg(feature = "processing")]
            cardinality_limiter: cardinality
                .map(|cardinality| {
                    RedisSetLimiter::new(
                        RedisSetLimiterOptions {
                            cache_vacuum_interval: config
                                .cardinality_limiter_cache_vacuum_interval(),
                        },
                        cardinality,
                    )
                })
                .map(CardinalityLimiter::new),
            metric_outcomes,
            processing: Processing {
                logs: LogsProcessor::new(Arc::clone(&quota_limiter)),
                trace_metrics: TraceMetricsProcessor::new(Arc::clone(&quota_limiter)),
                spans: SpansProcessor::new(Arc::clone(&quota_limiter), geoip_lookup.clone()),
                check_ins: CheckInsProcessor::new(Arc::clone(&quota_limiter)),
                sessions: SessionsProcessor::new(quota_limiter),
            },
            geoip_lookup,
            config,
        };

        Self {
            inner: Arc::new(inner),
        }
    }

    /// Enforces quotas and rate limits on the envelope and its extracted metrics.
    async fn enforce_quotas<Group>(
        &self,
        managed_envelope: &mut TypedEnvelope<Group>,
        event: Annotated<Event>,
        extracted_metrics: &mut ProcessingExtractedMetrics,
        ctx: processing::Context<'_>,
    ) -> Result<Annotated<Event>, ProcessingError> {
        // Enforce the cached rate limits first, which is cheap and requires no
        // round-trip to Redis.
        let cached_result = RateLimiter::Cached
            .enforce(managed_envelope, event, extracted_metrics, ctx)
            .await?;

        if_processing!(self.inner.config, {
            let rate_limiter = match self.inner.rate_limiter.clone() {
                Some(rate_limiter) => rate_limiter,
                None => return Ok(cached_result.event),
            };

            // Then enforce consistent rate limits through Redis.
            let consistent_result = RateLimiter::Consistent(rate_limiter)
                .enforce(
                    managed_envelope,
                    cached_result.event,
                    extracted_metrics,
                    ctx
                )
                .await?;

            // Update the cached rate limits with the freshly computed ones.
            if !consistent_result.rate_limits.is_empty() {
                self.inner
                    .project_cache
                    .get(managed_envelope.scoping().project_key)
                    .rate_limits()
                    .merge(consistent_result.rate_limits);
            }

            Ok(consistent_result.event)
        } else { Ok(cached_result.event) })
    }
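
    // Summary of the two-phase enforcement above (illustrative, not part of
    // the original source): cached limits reject obviously rate-limited data
    // without a Redis round-trip; only processing Relays then consult the
    // consistent, Redis-backed limiter and merge any newly observed limits
    // back into the project cache:
    //
    // ```ignore
    // cached enforce -> [processing only] consistent enforce
    //                -> project_cache.rate_limits().merge(new_limits)
    // ```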

    /// Processes error events and the items that create or require an event.
    async fn process_errors(
        &self,
        managed_envelope: &mut TypedEnvelope<ErrorGroup>,
        project_id: ProjectId,
        mut ctx: processing::Context<'_>,
    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
        let mut event_fully_normalized = EventFullyNormalized::new(managed_envelope.envelope());
        let mut metrics = Metrics::default();
        let mut extracted_metrics = ProcessingExtractedMetrics::new();

        report::process_user_reports(managed_envelope);

        // Expand platform-specific crash reports into standard items.
        if_processing!(self.inner.config, {
            unreal::expand(managed_envelope, &self.inner.config)?;
            #[cfg(sentry)]
            playstation::expand(managed_envelope, &self.inner.config, ctx.project_info)?;
            nnswitch::expand(managed_envelope)?;
        });

        let extraction_result = event::extract(
            managed_envelope,
            &mut metrics,
            event_fully_normalized,
            &self.inner.config,
        )?;
        let mut event = extraction_result.event;

        if_processing!(self.inner.config, {
            if let Some(inner_event_fully_normalized) =
                unreal::process(managed_envelope, &mut event)?
            {
                event_fully_normalized = inner_event_fully_normalized;
            }
            #[cfg(sentry)]
            if let Some(inner_event_fully_normalized) =
                playstation::process(managed_envelope, &mut event, ctx.project_info)?
            {
                event_fully_normalized = inner_event_fully_normalized;
            }
            if let Some(inner_event_fully_normalized) =
                attachment::create_placeholders(managed_envelope, &mut event, &mut metrics)
            {
                event_fully_normalized = inner_event_fully_normalized;
            }
        });

        ctx.sampling_project_info = dynamic_sampling::validate_and_set_dsc(
            managed_envelope,
            &mut event,
            ctx.project_info,
            ctx.sampling_project_info,
        );

        let attachments = managed_envelope
            .envelope()
            .items()
            .filter(|item| item.attachment_type() == Some(&AttachmentType::Attachment));
        processing::utils::event::finalize(
            managed_envelope.envelope().headers(),
            &mut event,
            attachments,
            &mut metrics,
            &self.inner.config,
        )?;
        event_fully_normalized = processing::utils::event::normalize(
            managed_envelope.envelope().headers(),
            &mut event,
            event_fully_normalized,
            &ctx,
            &self.inner.geoip_lookup,
        )?;
        let filter_run = processing::utils::event::filter(
            managed_envelope.envelope().headers(),
            &mut event,
            &ctx,
        )
        .map_err(|err| {
            managed_envelope.reject(Outcome::Filtered(err.clone()));
            ProcessingError::EventFiltered(err)
        })?;

        if self.inner.config.processing_enabled() || matches!(filter_run, FiltersStatus::Ok) {
            dynamic_sampling::tag_error_with_sampling_decision(
                managed_envelope,
                &mut event,
                ctx.sampling_project_info,
                &self.inner.config,
            )
            .await;
        }

        event = self
            .enforce_quotas(managed_envelope, event, &mut extracted_metrics, ctx)
            .await?;

        if event.value().is_some() {
            event::scrub(&mut event, ctx.project_info)?;
            event::serialize(
                managed_envelope,
                &mut event,
                event_fully_normalized,
                EventMetricsExtracted(false),
                SpansExtracted(false),
            )?;
            event::emit_feedback_metrics(managed_envelope.envelope());
        }

        attachment::scrub(managed_envelope, ctx.project_info);

        if self.inner.config.processing_enabled() && !event_fully_normalized.0 {
            relay_log::error!(
                tags.project = %project_id,
                tags.ty = event_type(&event).map(|e| e.to_string()).unwrap_or("none".to_owned()),
                "ingested event without normalizing"
            );
        }

        Ok(Some(extracted_metrics))
    }

    /// Processes only transactions and transaction-related items.
    #[allow(unused_assignments)]
    #[allow(clippy::too_many_arguments)]
    async fn process_transactions(
        &self,
        managed_envelope: &mut TypedEnvelope<TransactionGroup>,
        cogs: &mut Token,
        project_id: ProjectId,
        mut ctx: processing::Context<'_>,
        reservoir_counters: &ReservoirCounters,
    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
        let mut event_fully_normalized = EventFullyNormalized::new(managed_envelope.envelope());
        let mut event_metrics_extracted = EventMetricsExtracted(false);
        let mut spans_extracted = SpansExtracted(false);
        let mut metrics = Metrics::default();
        let mut extracted_metrics = ProcessingExtractedMetrics::new();

        let extraction_result = event::extract(
            managed_envelope,
            &mut metrics,
            event_fully_normalized,
            &self.inner.config,
        )?;

        if let Some(inner_event_metrics_extracted) = extraction_result.event_metrics_extracted {
            event_metrics_extracted = inner_event_metrics_extracted;
        }
        if let Some(inner_spans_extracted) = extraction_result.spans_extracted {
            spans_extracted = inner_spans_extracted;
        };

        let mut event = extraction_result.event;

        let profile_id = profile::filter(
            managed_envelope,
            &event,
            ctx.config,
            project_id,
            ctx.project_info,
        );
        profile::transfer_id(&mut event, profile_id);

        ctx.sampling_project_info = dynamic_sampling::validate_and_set_dsc(
            managed_envelope,
            &mut event,
            ctx.project_info,
            ctx.sampling_project_info,
        );

        let attachments = managed_envelope
            .envelope()
            .items()
            .filter(|item| item.attachment_type() == Some(&AttachmentType::Attachment));
        processing::utils::event::finalize(
            managed_envelope.envelope().headers(),
            &mut event,
            attachments,
            &mut metrics,
            &self.inner.config,
        )?;

        event_fully_normalized = processing::utils::event::normalize(
            managed_envelope.envelope().headers(),
            &mut event,
            event_fully_normalized,
            &ctx,
            &self.inner.geoip_lookup,
        )?;

        let filter_run = processing::utils::event::filter(
            managed_envelope.envelope().headers(),
            &mut event,
            &ctx,
        )
        .map_err(|err| {
            managed_envelope.reject(Outcome::Filtered(err.clone()));
            ProcessingError::EventFiltered(err)
        })?;

        // Always run dynamic sampling on processing Relays. On other Relays, run it
        // only if all filters were evaluated and transaction metrics are enabled.
        let run_dynamic_sampling = (matches!(filter_run, FiltersStatus::Ok)
            || self.inner.config.processing_enabled())
            && matches!(&ctx.project_info.config.transaction_metrics, Some(ErrorBoundary::Ok(c)) if c.is_enabled());

        let sampling_result = match run_dynamic_sampling {
            true => {
                #[allow(unused_mut)]
                let mut reservoir = ReservoirEvaluator::new(Arc::clone(reservoir_counters));
                #[cfg(feature = "processing")]
                if let Some(quotas_client) = self.inner.quotas_client.as_ref() {
                    reservoir.set_redis(managed_envelope.scoping().organization_id, quotas_client);
                }
                processing::utils::dynamic_sampling::run(
                    managed_envelope.envelope().headers().dsc(),
                    &mut event,
                    &ctx,
                    Some(&reservoir),
                )
                .await
            }
            false => SamplingResult::Pending,
        };

        relay_statsd::metric!(
            counter(RelayCounters::SamplingDecision) += 1,
            decision = sampling_result.decision().as_str(),
            item = "transaction"
        );

        #[cfg(feature = "processing")]
        let server_sample_rate = sampling_result.sample_rate();

        if let Some(outcome) = sampling_result.into_dropped_outcome() {
            // Process any attached profiles before dropping the transaction, and
            // extract metrics with a `Drop` sampling decision.
            profile::process(
                managed_envelope,
                &mut event,
                ctx.global_config,
                ctx.config,
                ctx.project_info,
            );
            event_metrics_extracted = processing::utils::transaction::extract_metrics(
                &mut event,
                &mut extracted_metrics,
                ExtractMetricsContext {
                    dsc: managed_envelope.envelope().dsc(),
                    project_id,
                    ctx: &ctx,
                    sampling_decision: SamplingDecision::Drop,
                    event_metrics_extracted,
                    spans_extracted,
                },
            )?;

            dynamic_sampling::drop_unsampled_items(
                managed_envelope,
                event,
                outcome,
                spans_extracted,
            );

            // The event was dropped; enforce quotas on the remaining envelope and
            // the extracted metrics without it.
            event = self
                .enforce_quotas(
                    managed_envelope,
                    Annotated::empty(),
                    &mut extracted_metrics,
                    ctx,
                )
                .await?;

            return Ok(Some(extracted_metrics));
        }

        let _post_ds = cogs.start_category("post_ds");

        event::scrub(&mut event, ctx.project_info)?;

        attachment::scrub(managed_envelope, ctx.project_info);

        if_processing!(self.inner.config, {
            let profile_id = profile::process(
                managed_envelope,
                &mut event,
                ctx.global_config,
                ctx.config,
                ctx.project_info,
            );
            profile::transfer_id(&mut event, profile_id);
            profile::scrub_profiler_id(&mut event);

            event_metrics_extracted = processing::utils::transaction::extract_metrics(
                &mut event,
                &mut extracted_metrics,
                ExtractMetricsContext {
                    dsc: managed_envelope.envelope().dsc(),
                    project_id,
                    ctx: &ctx,
                    sampling_decision: SamplingDecision::Keep,
                    event_metrics_extracted,
                    spans_extracted,
                },
            )?;

            spans_extracted = span::extract_from_event(
                managed_envelope,
                &event,
                ctx.global_config,
                ctx.config,
                server_sample_rate,
                event_metrics_extracted,
                spans_extracted,
            );
        });

        event = self
            .enforce_quotas(managed_envelope, event, &mut extracted_metrics, ctx)
            .await?;

        if_processing!(self.inner.config, {
            event = span::maybe_discard_transaction(managed_envelope, event, ctx.project_info);
        });

        if event.value().is_some() {
            event::serialize(
                managed_envelope,
                &mut event,
                event_fully_normalized,
                event_metrics_extracted,
                spans_extracted,
            )?;
        }

        if self.inner.config.processing_enabled() && !event_fully_normalized.0 {
            relay_log::error!(
                tags.project = %project_id,
                tags.ty = event_type(&event).map(|e| e.to_string()).unwrap_or("none".to_owned()),
                "ingested event without normalizing"
            );
        };

        Ok(Some(extracted_metrics))
    }

    async fn process_profile_chunks(
        &self,
        managed_envelope: &mut TypedEnvelope<ProfileChunkGroup>,
        ctx: processing::Context<'_>,
    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
        profile_chunk::filter(managed_envelope, ctx.project_info);

        if_processing!(self.inner.config, {
            profile_chunk::process(
                managed_envelope,
                ctx.project_info,
                ctx.global_config,
                ctx.config,
            );
        });

        self.enforce_quotas(
            managed_envelope,
            Annotated::empty(),
            &mut ProcessingExtractedMetrics::new(),
            ctx,
        )
        .await?;

        Ok(None)
    }

    /// Processes standalone items that require an event but do not create one.
    async fn process_standalone(
        &self,
        managed_envelope: &mut TypedEnvelope<StandaloneGroup>,
        project_id: ProjectId,
        ctx: processing::Context<'_>,
    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
        let mut extracted_metrics = ProcessingExtractedMetrics::new();

        standalone::process(managed_envelope);

        profile::filter(
            managed_envelope,
            &Annotated::empty(),
            ctx.config,
            project_id,
            ctx.project_info,
        );

        self.enforce_quotas(
            managed_envelope,
            Annotated::empty(),
            &mut extracted_metrics,
            ctx,
        )
        .await?;

        report::process_user_reports(managed_envelope);
        attachment::scrub(managed_envelope, ctx.project_info);

        Ok(Some(extracted_metrics))
    }

    /// Processes client reports.
    async fn process_client_reports(
        &self,
        managed_envelope: &mut TypedEnvelope<ClientReportGroup>,
        ctx: processing::Context<'_>,
    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
        let mut extracted_metrics = ProcessingExtractedMetrics::new();

        self.enforce_quotas(
            managed_envelope,
            Annotated::empty(),
            &mut extracted_metrics,
            ctx,
        )
        .await?;

        report::process_client_reports(
            managed_envelope,
            ctx.config,
            ctx.project_info,
            self.inner.addrs.outcome_aggregator.clone(),
        );

        Ok(Some(extracted_metrics))
    }

    /// Processes replays.
    async fn process_replays(
        &self,
        managed_envelope: &mut TypedEnvelope<ReplayGroup>,
        ctx: processing::Context<'_>,
    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
        let mut extracted_metrics = ProcessingExtractedMetrics::new();

        replay::process(
            managed_envelope,
            ctx.global_config,
            ctx.config,
            ctx.project_info,
            &self.inner.geoip_lookup,
        )?;

        self.enforce_quotas(
            managed_envelope,
            Annotated::empty(),
            &mut extracted_metrics,
            ctx,
        )
        .await?;

        Ok(Some(extracted_metrics))
    }

    async fn process_nel(
        &self,
        mut managed_envelope: ManagedEnvelope,
        ctx: processing::Context<'_>,
    ) -> Result<ProcessingResult, ProcessingError> {
        nel::convert_to_logs(&mut managed_envelope);
        self.process_with_processor(&self.inner.processing.logs, managed_envelope, ctx)
            .await
    }

    /// Runs the envelope through one of the dedicated processors of the new pipeline.
    async fn process_with_processor<P: processing::Processor>(
        &self,
        processor: &P,
        mut managed_envelope: ManagedEnvelope,
        ctx: processing::Context<'_>,
    ) -> Result<ProcessingResult, ProcessingError>
    where
        Outputs: From<P::Output>,
    {
        let Some(work) = processor.prepare_envelope(&mut managed_envelope) else {
            debug_assert!(
                false,
                "there must be work for the {} processor",
                std::any::type_name::<P>(),
            );
            return Err(ProcessingError::ProcessingGroupMismatch);
        };

        // All matching items have been moved into the processor's work unit; any
        // items left in the envelope at this point are an internal error.
        managed_envelope.update();
        match managed_envelope.envelope().is_empty() {
            true => managed_envelope.accept(),
            false => managed_envelope.reject(Outcome::Invalid(DiscardReason::Internal)),
        }

        processor
            .process(work, ctx)
            .await
            .map_err(|err| {
                relay_log::debug!(
                    error = &err as &dyn std::error::Error,
                    "processing pipeline failed"
                );
                ProcessingError::ProcessingFailure
            })
            .map(|o| o.map(Into::into))
            .map(ProcessingResult::Output)
    }

    /// Processes standalone spans.
    async fn process_standalone_spans(
        &self,
        managed_envelope: &mut TypedEnvelope<SpanGroup>,
        _project_id: ProjectId,
        ctx: processing::Context<'_>,
    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
        let mut extracted_metrics = ProcessingExtractedMetrics::new();

        span::filter(managed_envelope, ctx.config, ctx.project_info);
        span::convert_otel_traces_data(managed_envelope);

        if_processing!(self.inner.config, {
            span::process(
                managed_envelope,
                &mut Annotated::empty(),
                &mut extracted_metrics,
                _project_id,
                ctx,
                &self.inner.geoip_lookup,
            )
            .await;
        });

        self.enforce_quotas(
            managed_envelope,
            Annotated::empty(),
            &mut extracted_metrics,
            ctx,
        )
        .await?;

        Ok(Some(extracted_metrics))
    }

    async fn process_envelope(
        &self,
        cogs: &mut Token,
        project_id: ProjectId,
        message: ProcessEnvelopeGrouped<'_>,
    ) -> Result<ProcessingResult, ProcessingError> {
        let ProcessEnvelopeGrouped {
            group,
            envelope: mut managed_envelope,
            ctx,
            reservoir_counters,
        } = message;

        // Normalize the transaction name in the dynamic sampling context using
        // the sampling project's transaction name rules.
        if let Some(sampling_state) = ctx.sampling_project_info {
            managed_envelope
                .envelope_mut()
                .parametrize_dsc_transaction(&sampling_state.config.tx_name_rules);
        }

        // Carry the retention settings from the project config onto the envelope.
        if let Some(retention) = ctx.project_info.config.event_retention {
            managed_envelope.envelope_mut().set_retention(retention);
        }

        if let Some(retention) = ctx.project_info.config.downsampled_event_retention {
            managed_envelope
                .envelope_mut()
                .set_downsampled_retention(retention);
        }

        // Ensure the envelope's meta carries the resolved project id.
        managed_envelope
            .envelope_mut()
            .meta_mut()
            .set_project_id(project_id);

        macro_rules! run {
            ($fn_name:ident $(, $args:expr)*) => {
                async {
                    let mut managed_envelope = (managed_envelope, group).try_into()?;
                    match self.$fn_name(&mut managed_envelope, $($args),*).await {
                        Ok(extracted_metrics) => Ok(ProcessingResult::Envelope {
                            managed_envelope: managed_envelope.into_processed(),
                            extracted_metrics: extracted_metrics.map_or(ProcessingExtractedMetrics::new(), |e| e)
                        }),
                        Err(error) => {
                            relay_log::trace!("Executing {fn} failed: {error}", fn = stringify!($fn_name), error = error);
                            if let Some(outcome) = error.to_outcome() {
                                managed_envelope.reject(outcome);
                            }

                            return Err(error);
                        }
                    }
                }.await
            };
        }

        relay_log::trace!("Processing {group} group", group = group.variant());

        match group {
            ProcessingGroup::Error => run!(process_errors, project_id, ctx),
            ProcessingGroup::Transaction => {
                run!(
                    process_transactions,
                    cogs,
                    project_id,
                    ctx,
                    reservoir_counters
                )
            }
            ProcessingGroup::Session => {
                self.process_with_processor(&self.inner.processing.sessions, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::Standalone => run!(process_standalone, project_id, ctx),
            ProcessingGroup::ClientReport => run!(process_client_reports, ctx),
            ProcessingGroup::Replay => {
                run!(process_replays, ctx)
            }
            ProcessingGroup::CheckIn => {
                self.process_with_processor(&self.inner.processing.check_ins, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::Nel => self.process_nel(managed_envelope, ctx).await,
            ProcessingGroup::Log => {
                self.process_with_processor(&self.inner.processing.logs, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::TraceMetric => {
                self.process_with_processor(
                    &self.inner.processing.trace_metrics,
                    managed_envelope,
                    ctx,
                )
                .await
            }
            ProcessingGroup::SpanV2 => {
                self.process_with_processor(&self.inner.processing.spans, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::Span => run!(process_standalone_spans, project_id, ctx),
            ProcessingGroup::ProfileChunk => {
                run!(process_profile_chunks, ctx)
            }
            ProcessingGroup::Metrics => {
                // Metric items should only reach this point in proxy mode, where
                // they are forwarded as-is; anywhere else this is unexpected.
                if self.inner.config.relay_mode() != RelayMode::Proxy {
                    relay_log::error!(
                        tags.project = %project_id,
                        items = ?managed_envelope.envelope().items().next().map(Item::ty),
                        "received metrics in the process_state"
                    );
                }

                Ok(ProcessingResult::no_metrics(
                    managed_envelope.into_processed(),
                ))
            }
            ProcessingGroup::Ungrouped => {
                relay_log::error!(
                    tags.project = %project_id,
                    items = ?managed_envelope.envelope().items().next().map(Item::ty),
                    "could not identify the processing group based on the envelope's items"
                );

                Ok(ProcessingResult::no_metrics(
                    managed_envelope.into_processed(),
                ))
            }
            ProcessingGroup::ForwardUnknown => Ok(ProcessingResult::no_metrics(
                managed_envelope.into_processed(),
            )),
        }
    }

    /// Processes the given grouped envelope and returns what should be
    /// submitted upstream, if anything.
    async fn process<'a>(
        &self,
        cogs: &mut Token,
        mut message: ProcessEnvelopeGrouped<'a>,
    ) -> Result<Option<Submit<'a>>, ProcessingError> {
        let ProcessEnvelopeGrouped {
            ref mut envelope,
            ctx,
            ..
        } = message;

        // Resolve the project id from the project info, falling back to the id
        // carried on the envelope's meta.
        let Some(project_id) = ctx
            .project_info
            .project_id
            .or_else(|| envelope.envelope().meta().project_id())
        else {
            envelope.reject(Outcome::Invalid(DiscardReason::Internal));
            return Err(ProcessingError::MissingProjectId);
        };

        let client = envelope.envelope().meta().client().map(str::to_owned);
        let user_agent = envelope.envelope().meta().user_agent().map(str::to_owned);
        let project_key = envelope.envelope().meta().public_key();
        // The sampling key is only relevant if the sampling project is known.
        let sampling_key = envelope
            .envelope()
            .sampling_key()
            .filter(|_| ctx.sampling_project_info.is_some());

        relay_log::configure_scope(|scope| {
            scope.set_tag("project", project_id);
            if let Some(client) = client {
                scope.set_tag("sdk", client);
            }
            if let Some(user_agent) = user_agent {
                scope.set_extra("user_agent", user_agent.into());
            }
        });

        let result = match self.process_envelope(cogs, project_id, message).await {
            Ok(ProcessingResult::Envelope {
                mut managed_envelope,
                extracted_metrics,
            }) => {
                managed_envelope.update();

                let has_metrics = !extracted_metrics.metrics.project_metrics.is_empty();
                send_metrics(
                    extracted_metrics.metrics,
                    project_key,
                    sampling_key,
                    &self.inner.addrs.aggregator,
                );

                let envelope_response = if managed_envelope.envelope().is_empty() {
                    if !has_metrics {
                        // Empty envelopes without extracted metrics are treated
                        // as rate limited.
                        managed_envelope.reject(Outcome::RateLimited(None));
                    } else {
                        managed_envelope.accept();
                    }

                    None
                } else {
                    Some(managed_envelope)
                };

                Ok(envelope_response.map(Submit::Envelope))
            }
            Ok(ProcessingResult::Output(Output { main, metrics })) => {
                if let Some(metrics) = metrics {
                    metrics.accept(|metrics| {
                        send_metrics(
                            metrics,
                            project_key,
                            sampling_key,
                            &self.inner.addrs.aggregator,
                        );
                    });
                }

                let ctx = ctx.to_forward();
                Ok(main.map(|output| Submit::Output { output, ctx }))
            }
            Err(err) => Err(err),
        };

        relay_log::configure_scope(|scope| {
            scope.remove_tag("project");
            scope.remove_tag("sdk");
            scope.remove_tag("user_agent");
        });

        result
    }

    async fn handle_process_envelope(&self, cogs: &mut Token, message: ProcessEnvelope) {
        let project_key = message.envelope.envelope().meta().public_key();
        let wait_time = message.envelope.age();
        metric!(timer(RelayTimers::EnvelopeWaitTime) = wait_time);

        // Cancel the incoming COGS token; each group gets its own timed token below.
        cogs.cancel();

        let scoping = message.envelope.scoping();
        for (group, envelope) in ProcessingGroup::split_envelope(
            *message.envelope.into_envelope(),
            &message.project_info,
        ) {
            let mut cogs = self
                .inner
                .cogs
                .timed(ResourceId::Relay, AppFeature::from(group));

            let mut envelope =
                ManagedEnvelope::new(envelope, self.inner.addrs.outcome_aggregator.clone());
            envelope.scope(scoping);

            let global_config = self.inner.global_config.current();

            let ctx = processing::Context {
                config: &self.inner.config,
                global_config: &global_config,
                project_info: &message.project_info,
                sampling_project_info: message.sampling_project_info.as_deref(),
                rate_limits: &message.rate_limits,
            };

            let message = ProcessEnvelopeGrouped {
                group,
                envelope,
                ctx,
                reservoir_counters: &message.reservoir_counters,
            };

            let result = metric!(
                timer(RelayTimers::EnvelopeProcessingTime),
                group = group.variant(),
                { self.process(&mut cogs, message).await }
            );

            match result {
                Ok(Some(envelope)) => self.submit_upstream(&mut cogs, envelope),
                Ok(None) => {}
                Err(error) if error.is_unexpected() => {
                    relay_log::error!(
                        tags.project_key = %project_key,
                        error = &error as &dyn Error,
                        "error processing envelope"
                    )
                }
                Err(error) => {
                    relay_log::debug!(
                        tags.project_key = %project_key,
                        error = &error as &dyn Error,
                        "error processing envelope"
                    )
                }
            }
        }
    }
2178
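    /// Parses and normalizes metric buckets, applies clock drift correction and
    /// project rate limits, and merges the result into the aggregator.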
    fn handle_process_metrics(&self, cogs: &mut Token, message: ProcessMetrics) {
        let ProcessMetrics {
            data,
            project_key,
            received_at,
            sent_at,
            source,
        } = message;

        let received_timestamp =
            UnixTimestamp::from_datetime(received_at).unwrap_or(UnixTimestamp::now());

        let mut buckets = data.into_buckets(received_timestamp);
        if buckets.is_empty() {
            return;
        }
        cogs.update(relay_metrics::cogs::BySize(&buckets));

        let clock_drift_processor =
            ClockDriftProcessor::new(sent_at, received_at).at_least(MINIMUM_CLOCK_DRIFT);

        buckets.retain_mut(|bucket| {
            if let Err(error) = relay_metrics::normalize_bucket(bucket) {
                relay_log::debug!(error = &error as &dyn Error, "dropping bucket {bucket:?}");
                return false;
            }

            if !self::metrics::is_valid_namespace(bucket) {
                return false;
            }

            clock_drift_processor.process_timestamp(&mut bucket.timestamp);

            if !matches!(source, BucketSource::Internal) {
                bucket.metadata = BucketMetadata::new(received_timestamp);
            }

            true
        });

        let project = self.inner.project_cache.get(project_key);

        let buckets = match project.state() {
            ProjectState::Enabled(project_info) => {
                let rate_limits = project.rate_limits().current_limits();
                self.check_buckets(project_key, project_info, &rate_limits, buckets)
            }
            _ => buckets,
        };

        relay_log::trace!("merging metric buckets into the aggregator");
        self.inner
            .addrs
            .aggregator
            .send(MergeBuckets::new(project_key, buckets));
    }

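    /// Deserializes a batched metrics payload and forwards the buckets of each
    /// project to [`Self::handle_process_metrics`].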
    fn handle_process_batched_metrics(&self, cogs: &mut Token, message: ProcessBatchedMetrics) {
        let ProcessBatchedMetrics {
            payload,
            source,
            received_at,
            sent_at,
        } = message;

        #[derive(serde::Deserialize)]
        struct Wrapper {
            buckets: HashMap<ProjectKey, Vec<Bucket>>,
        }

        let buckets = match serde_json::from_slice(&payload) {
            Ok(Wrapper { buckets }) => buckets,
            Err(error) => {
                relay_log::debug!(
                    error = &error as &dyn Error,
                    "failed to parse batched metrics",
                );
                metric!(counter(RelayCounters::MetricBucketsParsingFailed) += 1);
                return;
            }
        };

        for (project_key, buckets) in buckets {
            self.handle_process_metrics(
                cogs,
                ProcessMetrics {
                    data: MetricData::Parsed(buckets),
                    project_key,
                    source,
                    received_at,
                    sent_at,
                },
            )
        }
    }

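    /// Submits a processed envelope, either to the store forwarder when processing
    /// is enabled or as an HTTP request to the upstream.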
    fn submit_upstream(&self, cogs: &mut Token, submit: Submit<'_>) {
        let _submit = cogs.start_category("submit");

        #[cfg(feature = "processing")]
        if self.inner.config.processing_enabled()
            && let Some(store_forwarder) = &self.inner.addrs.store_forwarder
        {
            match submit {
                Submit::Envelope(envelope) => store_forwarder.send(StoreEnvelope { envelope }),
                Submit::Output { output, ctx } => output
                    .forward_store(store_forwarder, ctx)
                    .unwrap_or_else(|err| err.into_inner()),
            }
            return;
        }

        let mut envelope = match submit {
            Submit::Envelope(envelope) => envelope,
            Submit::Output { output, ctx } => match output.serialize_envelope(ctx) {
                Ok(envelope) => ManagedEnvelope::from(envelope).into_processed(),
                Err(_) => {
                    relay_log::error!("failed to serialize output to an envelope");
                    return;
                }
            },
        };

        if envelope.envelope_mut().is_empty() {
            envelope.accept();
            return;
        }

        envelope.envelope_mut().set_sent_at(Utc::now());

        relay_log::trace!("sending envelope to sentry endpoint");
        let http_encoding = self.inner.config.http_encoding();
        let result = envelope.envelope().to_vec().and_then(|v| {
            encode_payload(&v.into(), http_encoding).map_err(EnvelopeError::PayloadIoFailed)
        });

        match result {
            Ok(body) => {
                self.inner
                    .addrs
                    .upstream_relay
                    .send(SendRequest(SendEnvelope {
                        envelope,
                        body,
                        http_encoding,
                        project_cache: self.inner.project_cache.clone(),
                    }));
            }
            Err(error) => {
                relay_log::error!(
                    error = &error as &dyn Error,
                    tags.project_key = %envelope.scoping().project_key,
                    "failed to serialize envelope payload"
                );

                envelope.reject(Outcome::Invalid(DiscardReason::Internal));
            }
        }
    }

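    /// Serializes the given client reports into a new envelope and submits it
    /// upstream.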
    fn handle_submit_client_reports(&self, cogs: &mut Token, message: SubmitClientReports) {
        let SubmitClientReports {
            client_reports,
            scoping,
        } = message;

        let upstream = self.inner.config.upstream_descriptor();
        let dsn = PartialDsn::outbound(&scoping, upstream);

        let mut envelope = Envelope::from_request(None, RequestMeta::outbound(dsn));
        for client_report in client_reports {
            match client_report.serialize() {
                Ok(payload) => {
                    let mut item = Item::new(ItemType::ClientReport);
                    item.set_payload(ContentType::Json, payload);
                    envelope.add_item(item);
                }
                Err(error) => {
                    relay_log::error!(
                        error = &error as &dyn std::error::Error,
                        "failed to serialize client report"
                    );
                }
            }
        }

        let envelope = ManagedEnvelope::new(envelope, self.inner.addrs.outcome_aggregator.clone());
        self.submit_upstream(cogs, Submit::Envelope(envelope.into_processed()));
    }

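    /// Validates the scoping of metric buckets and enforces cached rate limits per
    /// metric namespace; buckets without a valid scoping are dropped.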
    fn check_buckets(
        &self,
        project_key: ProjectKey,
        project_info: &ProjectInfo,
        rate_limits: &RateLimits,
        buckets: Vec<Bucket>,
    ) -> Vec<Bucket> {
        let Some(scoping) = project_info.scoping(project_key) else {
            relay_log::error!(
                tags.project_key = project_key.as_str(),
                "there is no scoping: dropping {} buckets",
                buckets.len(),
            );
            return Vec::new();
        };

        let mut buckets = self::metrics::apply_project_info(
            buckets,
            &self.inner.metric_outcomes,
            project_info,
            scoping,
        );

        let namespaces: BTreeSet<MetricNamespace> = buckets
            .iter()
            .filter_map(|bucket| bucket.name.try_namespace())
            .collect();

        for namespace in namespaces {
            // The check must be scoped to the current namespace, otherwise
            // namespaced quotas do not apply and every iteration checks the
            // same item scoping.
            let limits = rate_limits.check_with_quotas(
                project_info.get_quotas(),
                scoping.metric_bucket(namespace),
            );

            if limits.is_limited() {
                let rejected;
                (buckets, rejected) = utils::split_off(buckets, |bucket| {
                    bucket.name.try_namespace() == Some(namespace)
                });

                let reason_code = limits.longest().and_then(|limit| limit.reason_code.clone());
                self.inner.metric_outcomes.track(
                    scoping,
                    &rejected,
                    Outcome::RateLimited(reason_code),
                );
            }
        }

        let quotas = project_info.config.quotas.clone();
        match MetricsLimiter::create(buckets, quotas, scoping) {
            Ok(mut bucket_limiter) => {
                bucket_limiter.enforce_limits(rate_limits, &self.inner.metric_outcomes);
                bucket_limiter.into_buckets()
            }
            Err(buckets) => buckets,
        }
    }

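    /// Enforces Redis-backed rate limits on metric buckets, per namespace, and
    /// merges newly computed limits back into the project cache.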
    #[cfg(feature = "processing")]
    async fn rate_limit_buckets(
        &self,
        scoping: Scoping,
        project_info: &ProjectInfo,
        mut buckets: Vec<Bucket>,
    ) -> Vec<Bucket> {
        let Some(rate_limiter) = &self.inner.rate_limiter else {
            return buckets;
        };

        let global_config = self.inner.global_config.current();
        let namespaces = buckets
            .iter()
            .filter_map(|bucket| bucket.name.try_namespace())
            .counts();

        let quotas = CombinedQuotas::new(&global_config, project_info.get_quotas());

        for (namespace, quantity) in namespaces {
            let item_scoping = scoping.metric_bucket(namespace);

            let limits = match rate_limiter
                .is_rate_limited(quotas, item_scoping, quantity, false)
                .await
            {
                Ok(limits) => limits,
                Err(err) => {
                    relay_log::error!(
                        error = &err as &dyn std::error::Error,
                        "failed to check redis rate limits"
                    );
                    break;
                }
            };

            if limits.is_limited() {
                let rejected;
                (buckets, rejected) = utils::split_off(buckets, |bucket| {
                    bucket.name.try_namespace() == Some(namespace)
                });

                let reason_code = limits.longest().and_then(|limit| limit.reason_code.clone());
                self.inner.metric_outcomes.track(
                    scoping,
                    &rejected,
                    Outcome::RateLimited(reason_code),
                );

                self.inner
                    .project_cache
                    .get(item_scoping.scoping.project_key)
                    .rate_limits()
                    .merge(limits);
            }
        }

        match MetricsLimiter::create(buckets, project_info.config.quotas.clone(), scoping) {
            Err(buckets) => buckets,
            Ok(bucket_limiter) => self.apply_other_rate_limits(bucket_limiter).await,
        }
    }

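    /// Enforces rate limits of the transaction and span categories on metrics
    /// that were extracted from them.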
    #[cfg(feature = "processing")]
    async fn apply_other_rate_limits(&self, mut bucket_limiter: MetricsLimiter) -> Vec<Bucket> {
        relay_log::trace!("handle_rate_limit_buckets");

        let scoping = *bucket_limiter.scoping();

        if let Some(rate_limiter) = self.inner.rate_limiter.as_ref() {
            let global_config = self.inner.global_config.current();
            let quotas = CombinedQuotas::new(&global_config, bucket_limiter.quotas());

            let over_accept_once = true;
            let mut rate_limits = RateLimits::new();

            for category in [DataCategory::Transaction, DataCategory::Span] {
                let count = bucket_limiter.count(category);

                let timer = Instant::now();
                let mut is_limited = false;

                if let Some(count) = count {
                    match rate_limiter
                        .is_rate_limited(quotas, scoping.item(category), count, over_accept_once)
                        .await
                    {
                        Ok(limits) => {
                            is_limited = limits.is_limited();
                            rate_limits.merge(limits)
                        }
                        Err(e) => relay_log::error!(error = &e as &dyn Error),
                    }
                }

                relay_statsd::metric!(
                    timer(RelayTimers::RateLimitBucketsDuration) = timer.elapsed(),
                    category = category.name(),
                    limited = if is_limited { "true" } else { "false" },
                    // Buckets the count into non-overlapping ranges; the original
                    // overlapping `1..=N` patterns made every arm after the first
                    // unreachable for its lower values.
                    count = match count {
                        None => "none",
                        Some(0) => "0",
                        Some(1) => "1",
                        Some(2..=10) => "10",
                        Some(11..=25) => "25",
                        Some(26..=50) => "50",
                        Some(51..=100) => "100",
                        Some(101..=500) => "500",
                        _ => "> 500",
                    },
                );
            }

            if rate_limits.is_limited() {
                let was_enforced =
                    bucket_limiter.enforce_limits(&rate_limits, &self.inner.metric_outcomes);

                if was_enforced {
                    self.inner
                        .project_cache
                        .get(scoping.project_key)
                        .rate_limits()
                        .merge(rate_limits);
                }
            }
        }

        bucket_limiter.into_buckets()
    }

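    /// Checks buckets against configured cardinality limits and tracks outcomes
    /// for rejected buckets, unless the limiter runs in passive mode.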
    #[cfg(feature = "processing")]
    async fn cardinality_limit_buckets(
        &self,
        scoping: Scoping,
        limits: &[CardinalityLimit],
        buckets: Vec<Bucket>,
    ) -> Vec<Bucket> {
        let global_config = self.inner.global_config.current();
        let cardinality_limiter_mode = global_config.options.cardinality_limiter_mode;

        if matches!(cardinality_limiter_mode, CardinalityLimiterMode::Disabled) {
            return buckets;
        }

        let Some(ref limiter) = self.inner.cardinality_limiter else {
            return buckets;
        };

        let scope = relay_cardinality::Scoping {
            organization_id: scoping.organization_id,
            project_id: scoping.project_id,
        };

        let limits = match limiter
            .check_cardinality_limits(scope, limits, buckets)
            .await
        {
            Ok(limits) => limits,
            Err((buckets, error)) => {
                relay_log::error!(
                    error = &error as &dyn std::error::Error,
                    "cardinality limiter failed"
                );
                return buckets;
            }
        };

        let error_sample_rate = global_config.options.cardinality_limiter_error_sample_rate;
        if !limits.exceeded_limits().is_empty() && utils::sample(error_sample_rate).is_keep() {
            for limit in limits.exceeded_limits() {
                relay_log::with_scope(
                    |scope| {
                        scope.set_user(Some(relay_log::sentry::User {
                            id: Some(scoping.organization_id.to_string()),
                            ..Default::default()
                        }));
                    },
                    || {
                        relay_log::error!(
                            tags.organization_id = scoping.organization_id.value(),
                            tags.limit_id = limit.id,
                            tags.passive = limit.passive,
                            "Cardinality Limit"
                        );
                    },
                );
            }
        }

        for (limit, reports) in limits.cardinality_reports() {
            for report in reports {
                self.inner
                    .metric_outcomes
                    .cardinality(scoping, limit, report);
            }
        }

        if matches!(cardinality_limiter_mode, CardinalityLimiterMode::Passive) {
            return limits.into_source();
        }

        let CardinalityLimitsSplit { accepted, rejected } = limits.into_split();

        for (bucket, exceeded) in rejected {
            self.inner.metric_outcomes.track(
                scoping,
                &[bucket],
                Outcome::CardinalityLimited(exceeded.id.clone()),
            );
        }
        accepted
    }

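    /// Applies rate limits and cardinality limits to flushed buckets and forwards
    /// the remainder to the store.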
    #[cfg(feature = "processing")]
    async fn encode_metrics_processing(
        &self,
        message: FlushBuckets,
        store_forwarder: &Addr<Store>,
    ) {
        use crate::constants::DEFAULT_EVENT_RETENTION;
        use crate::services::store::StoreMetrics;

        for ProjectBuckets {
            buckets,
            scoping,
            project_info,
            ..
        } in message.buckets.into_values()
        {
            let buckets = self
                .rate_limit_buckets(scoping, &project_info, buckets)
                .await;

            let limits = project_info.get_cardinality_limits();
            let buckets = self
                .cardinality_limit_buckets(scoping, limits, buckets)
                .await;

            if buckets.is_empty() {
                continue;
            }

            let retention = project_info
                .config
                .event_retention
                .unwrap_or(DEFAULT_EVENT_RETENTION);

            store_forwarder.send(StoreMetrics {
                buckets,
                scoping,
                retention,
            });
        }
    }

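    /// Serializes flushed buckets into size-bounded envelope batches, one envelope
    /// per batch, and submits them upstream.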
    fn encode_metrics_envelope(&self, cogs: &mut Token, message: FlushBuckets) {
        let FlushBuckets {
            partition_key,
            buckets,
        } = message;

        let batch_size = self.inner.config.metrics_max_batch_size_bytes();
        let upstream = self.inner.config.upstream_descriptor();

        for ProjectBuckets {
            buckets, scoping, ..
        } in buckets.values()
        {
            let dsn = PartialDsn::outbound(scoping, upstream);

            relay_statsd::metric!(
                histogram(RelayHistograms::PartitionKeys) = u64::from(partition_key)
            );

            let mut num_batches = 0;
            for batch in BucketsView::from(buckets).by_size(batch_size) {
                let mut envelope = Envelope::from_request(None, RequestMeta::outbound(dsn.clone()));

                let mut item = Item::new(ItemType::MetricBuckets);
                item.set_source_quantities(crate::metrics::extract_quantities(batch));
                // Serialize the current batch; serializing all `buckets` here would
                // send the full project payload in every batch.
                item.set_payload(ContentType::Json, serde_json::to_vec(&batch).unwrap());
                envelope.add_item(item);

                let mut envelope =
                    ManagedEnvelope::new(envelope, self.inner.addrs.outcome_aggregator.clone());
                envelope
                    .set_partition_key(Some(partition_key))
                    .scope(*scoping);

                relay_statsd::metric!(
                    histogram(RelayHistograms::BucketsPerBatch) = batch.len() as u64
                );

                self.submit_upstream(cogs, Submit::Envelope(envelope.into_processed()));
                num_batches += 1;
            }

            relay_statsd::metric!(histogram(RelayHistograms::BatchesPerPartition) = num_batches);
        }
    }

    fn send_global_partition(&self, partition_key: u32, partition: &mut Partition<'_>) {
        if partition.is_empty() {
            return;
        }

        let (unencoded, project_info) = partition.take();
        let http_encoding = self.inner.config.http_encoding();
        let encoded = match encode_payload(&unencoded, http_encoding) {
            Ok(payload) => payload,
            Err(error) => {
                let error = &error as &dyn std::error::Error;
                relay_log::error!(error, "failed to encode metrics payload");
                return;
            }
        };

        let request = SendMetricsRequest {
            partition_key: partition_key.to_string(),
            unencoded,
            encoded,
            project_info,
            http_encoding,
            metric_outcomes: self.inner.metric_outcomes.clone(),
        };

        self.inner.addrs.upstream_relay.send(SendRequest(request));
    }

    fn encode_metrics_global(&self, message: FlushBuckets) {
        let FlushBuckets {
            partition_key,
            buckets,
        } = message;

        let batch_size = self.inner.config.metrics_max_batch_size_bytes();
        let mut partition = Partition::new(batch_size);
        let mut partition_splits = 0;

        for ProjectBuckets {
            buckets, scoping, ..
        } in buckets.values()
        {
            for bucket in buckets {
                let mut remaining = Some(BucketView::new(bucket));

                while let Some(bucket) = remaining.take() {
                    if let Some(next) = partition.insert(bucket, *scoping) {
                        self.send_global_partition(partition_key, &mut partition);
                        remaining = Some(next);
                        partition_splits += 1;
                    }
                }
            }
        }

        if partition_splits > 0 {
            metric!(histogram(RelayHistograms::PartitionSplits) = partition_splits);
        }

        self.send_global_partition(partition_key, &mut partition);
    }

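    /// Checks and routes flushed buckets: to the store in processing mode,
    /// otherwise as global metrics requests or per-project envelopes depending on
    /// configuration.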
    async fn handle_flush_buckets(&self, cogs: &mut Token, mut message: FlushBuckets) {
        for (project_key, pb) in message.buckets.iter_mut() {
            let buckets = std::mem::take(&mut pb.buckets);
            pb.buckets =
                self.check_buckets(*project_key, &pb.project_info, &pb.rate_limits, buckets);
        }

        #[cfg(feature = "processing")]
        if self.inner.config.processing_enabled()
            && let Some(ref store_forwarder) = self.inner.addrs.store_forwarder
        {
            return self
                .encode_metrics_processing(message, store_forwarder)
                .await;
        }

        if self.inner.config.http_global_metrics() {
            self.encode_metrics_global(message)
        } else {
            self.encode_metrics_envelope(cogs, message)
        }
    }

    #[cfg(all(test, feature = "processing"))]
    fn redis_rate_limiter_enabled(&self) -> bool {
        self.inner.rate_limiter.is_some()
    }

    async fn handle_message(&self, message: EnvelopeProcessor) {
        let ty = message.variant();
        let feature_weights = self.feature_weights(&message);

        metric!(timer(RelayTimers::ProcessMessageDuration), message = ty, {
            let mut cogs = self.inner.cogs.timed(ResourceId::Relay, feature_weights);

            match message {
                EnvelopeProcessor::ProcessEnvelope(m) => {
                    self.handle_process_envelope(&mut cogs, *m).await
                }
                EnvelopeProcessor::ProcessProjectMetrics(m) => {
                    self.handle_process_metrics(&mut cogs, *m)
                }
                EnvelopeProcessor::ProcessBatchedMetrics(m) => {
                    self.handle_process_batched_metrics(&mut cogs, *m)
                }
                EnvelopeProcessor::FlushBuckets(m) => {
                    self.handle_flush_buckets(&mut cogs, *m).await
                }
                EnvelopeProcessor::SubmitClientReports(m) => {
                    self.handle_submit_client_reports(&mut cogs, *m)
                }
            }
        });
    }

    fn feature_weights(&self, message: &EnvelopeProcessor) -> FeatureWeights {
        match message {
            EnvelopeProcessor::ProcessEnvelope(_) => AppFeature::Unattributed.into(),
            EnvelopeProcessor::ProcessProjectMetrics(_) => AppFeature::Unattributed.into(),
            EnvelopeProcessor::ProcessBatchedMetrics(_) => AppFeature::Unattributed.into(),
            EnvelopeProcessor::FlushBuckets(v) => v
                .buckets
                .values()
                .map(|s| {
                    if self.inner.config.processing_enabled() {
                        relay_metrics::cogs::ByCount(&s.buckets).into()
                    } else {
                        relay_metrics::cogs::BySize(&s.buckets).into()
                    }
                })
                .fold(FeatureWeights::none(), FeatureWeights::merge),
            EnvelopeProcessor::SubmitClientReports(_) => AppFeature::ClientReports.into(),
        }
    }
}

impl Service for EnvelopeProcessorService {
    type Interface = EnvelopeProcessor;

    async fn run(self, mut rx: relay_system::Receiver<Self::Interface>) {
        while let Some(message) = rx.recv().await {
            let service = self.clone();
            self.inner
                .pool
                .spawn_async(
                    async move {
                        service.handle_message(message).await;
                    }
                    .boxed(),
                )
                .await;
        }
    }
}

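/// Result of applying rate limit enforcement to an event and its envelope.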
struct EnforcementResult {
    event: Annotated<Event>,
    #[cfg_attr(not(feature = "processing"), expect(dead_code))]
    rate_limits: RateLimits,
}

impl EnforcementResult {
    pub fn new(event: Annotated<Event>, rate_limits: RateLimits) -> Self {
        Self { event, rate_limits }
    }
}

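/// Rate limiter backend used when enforcing quotas on an envelope and its event.
///
/// `Cached` only consults rate limits already known to this Relay, while
/// `Consistent` additionally performs consistent checks against Redis
/// (processing mode only).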
#[derive(Clone)]
enum RateLimiter {
    Cached,
    #[cfg(feature = "processing")]
    Consistent(Arc<RedisRateLimiter<GlobalRateLimitsServiceHandle>>),
}

impl RateLimiter {
    async fn enforce<Group>(
        &self,
        managed_envelope: &mut TypedEnvelope<Group>,
        event: Annotated<Event>,
        _extracted_metrics: &mut ProcessingExtractedMetrics,
        ctx: processing::Context<'_>,
    ) -> Result<EnforcementResult, ProcessingError> {
        if managed_envelope.envelope().is_empty() && event.value().is_none() {
            return Ok(EnforcementResult::new(event, RateLimits::default()));
        }

        let quotas = CombinedQuotas::new(ctx.global_config, ctx.project_info.get_quotas());
        if quotas.is_empty() {
            return Ok(EnforcementResult::new(event, RateLimits::default()));
        }

        let event_category = event_category(&event);

        let this = self.clone();
        let mut envelope_limiter =
            EnvelopeLimiter::new(CheckLimits::All, move |item_scope, _quantity| {
                let this = this.clone();

                async move {
                    match this {
                        #[cfg(feature = "processing")]
                        RateLimiter::Consistent(rate_limiter) => Ok::<_, ProcessingError>(
                            rate_limiter
                                .is_rate_limited(quotas, item_scope, _quantity, false)
                                .await?,
                        ),
                        _ => Ok::<_, ProcessingError>(
                            ctx.rate_limits.check_with_quotas(quotas, item_scope),
                        ),
                    }
                }
            });

        if let Some(category) = event_category {
            envelope_limiter.assume_event(category);
        }

        let scoping = managed_envelope.scoping();
        let (enforcement, rate_limits) =
            metric!(timer(RelayTimers::EventProcessingRateLimiting), {
                envelope_limiter
                    .compute(managed_envelope.envelope_mut(), &scoping)
                    .await
            })?;
        let event_active = enforcement.is_event_active();

        #[cfg(feature = "processing")]
        _extracted_metrics.apply_enforcement(&enforcement, matches!(self, Self::Consistent(_)));
        enforcement.apply_with_outcomes(managed_envelope);

        if event_active {
            debug_assert!(managed_envelope.envelope().is_empty());
            return Ok(EnforcementResult::new(Annotated::empty(), rate_limits));
        }

        Ok(EnforcementResult::new(event, rate_limits))
    }
}

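/// Compresses an envelope body for the given HTTP `Content-Encoding`.
///
/// For [`HttpEncoding::Identity`], the body is returned unchanged; all other
/// encodings write into a freshly allocated buffer.
///
/// A minimal usage sketch (the literal body here is illustrative):
///
/// ```ignore
/// let body = Bytes::from_static(b"{}");
/// let encoded = encode_payload(&body, HttpEncoding::Gzip)?;
/// ```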
pub fn encode_payload(body: &Bytes, http_encoding: HttpEncoding) -> Result<Bytes, std::io::Error> {
    let envelope_body: Vec<u8> = match http_encoding {
        HttpEncoding::Identity => return Ok(body.clone()),
        HttpEncoding::Deflate => {
            let mut encoder = ZlibEncoder::new(Vec::new(), Compression::default());
            encoder.write_all(body.as_ref())?;
            encoder.finish()?
        }
        HttpEncoding::Gzip => {
            let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
            encoder.write_all(body.as_ref())?;
            encoder.finish()?
        }
        HttpEncoding::Br => {
            // Brotli parameters: default buffer size (0), quality 5, window size 22.
            let mut encoder = BrotliEncoder::new(Vec::new(), 0, 5, 22);
            encoder.write_all(body.as_ref())?;
            encoder.into_inner()
        }
        HttpEncoding::Zstd => {
            // Level 1 trades compression ratio for speed.
            let mut encoder = ZstdEncoder::new(Vec::new(), 1)?;
            encoder.write_all(body.as_ref())?;
            encoder.finish()?
        }
    };

    Ok(envelope_body.into())
}

#[derive(Debug)]
pub struct SendEnvelope {
    pub envelope: TypedEnvelope<Processed>,
    pub body: Bytes,
    pub http_encoding: HttpEncoding,
    pub project_cache: ProjectCacheHandle,
}

impl UpstreamRequest for SendEnvelope {
    fn method(&self) -> reqwest::Method {
        reqwest::Method::POST
    }

    fn path(&self) -> Cow<'_, str> {
        format!("/api/{}/envelope/", self.envelope.scoping().project_id).into()
    }

    fn route(&self) -> &'static str {
        "envelope"
    }

    fn build(&mut self, builder: &mut http::RequestBuilder) -> Result<(), http::HttpError> {
        let envelope_body = self.body.clone();
        metric!(histogram(RelayHistograms::UpstreamEnvelopeBodySize) = envelope_body.len() as u64);

        let meta = &self.envelope.meta();
        let shard = self.envelope.partition_key().map(|p| p.to_string());
        builder
            .content_encoding(self.http_encoding)
            .header_opt("Origin", meta.origin().map(|url| url.as_str()))
            .header_opt("User-Agent", meta.user_agent())
            .header("X-Sentry-Auth", meta.auth_header())
            .header("X-Forwarded-For", meta.forwarded_for())
            .header("Content-Type", envelope::CONTENT_TYPE)
            .header_opt("X-Sentry-Relay-Shard", shard)
            .body(envelope_body);

        Ok(())
    }

    fn sign(&mut self) -> Option<Sign> {
        Some(Sign::Optional(SignatureType::RequestSign))
    }

    fn respond(
        self: Box<Self>,
        result: Result<http::Response, UpstreamRequestError>,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> {
        Box::pin(async move {
            let result = match result {
                Ok(mut response) => response.consume().await.map_err(UpstreamRequestError::Http),
                Err(error) => Err(error),
            };

            match result {
                Ok(()) => self.envelope.accept(),
                Err(error) if error.is_received() => {
                    let scoping = self.envelope.scoping();
                    self.envelope.accept();

                    if let UpstreamRequestError::RateLimited(limits) = error {
                        self.project_cache
                            .get(scoping.project_key)
                            .rate_limits()
                            .merge(limits.scope(&scoping));
                    }
                }
                Err(error) => {
                    let mut envelope = self.envelope;
                    envelope.reject(Outcome::Invalid(DiscardReason::Internal));
                    relay_log::error!(
                        error = &error as &dyn Error,
                        tags.project_key = %envelope.scoping().project_key,
                        "error sending envelope"
                    );
                }
            }
        })
    }
}

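/// A partition of sliced metric buckets that is filled up to a configured maximum
/// size and then flushed as a single upstream request.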
#[derive(Debug)]
struct Partition<'a> {
    max_size: usize,
    remaining: usize,
    views: HashMap<ProjectKey, Vec<BucketView<'a>>>,
    project_info: HashMap<ProjectKey, Scoping>,
}

impl<'a> Partition<'a> {
    pub fn new(size: usize) -> Self {
        Self {
            max_size: size,
            remaining: size,
            views: HashMap::new(),
            project_info: HashMap::new(),
        }
    }

    /// Inserts a bucket view into the partition, splitting it at the remaining
    /// capacity. Returns the part that did not fit, if any.
    pub fn insert(&mut self, bucket: BucketView<'a>, scoping: Scoping) -> Option<BucketView<'a>> {
        let (current, next) = bucket.split(self.remaining, Some(self.max_size));

        if let Some(current) = current {
            self.remaining = self.remaining.saturating_sub(current.estimated_size());
            self.views
                .entry(scoping.project_key)
                .or_default()
                .push(current);

            self.project_info
                .entry(scoping.project_key)
                .or_insert(scoping);
        }

        next
    }

    fn is_empty(&self) -> bool {
        self.views.is_empty()
    }

    /// Serializes the accumulated buckets and resets the partition to its empty
    /// state, returning the payload and the scopings of all contained projects.
    fn take(&mut self) -> (Bytes, HashMap<ProjectKey, Scoping>) {
        #[derive(serde::Serialize)]
        struct Wrapper<'a> {
            buckets: &'a HashMap<ProjectKey, Vec<BucketView<'a>>>,
        }

        let buckets = &self.views;
        let payload = serde_json::to_vec(&Wrapper { buckets }).unwrap().into();

        let scopings = self.project_info.clone();
        self.project_info.clear();

        self.views.clear();
        self.remaining = self.max_size;

        (payload, scopings)
    }
}

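/// Upstream request that submits a signed, pre-encoded partition of metric
/// buckets to the global metrics endpoint.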
#[derive(Debug)]
struct SendMetricsRequest {
    /// Partition key identifying this batch towards the upstream.
    partition_key: String,
    /// Serialized metric buckets without encoding applied, used for signing.
    unencoded: Bytes,
    /// The payload with the configured HTTP encoding applied.
    encoded: Bytes,
    /// Scopings for all projects contained in the payload, used to generate outcomes.
    project_info: HashMap<ProjectKey, Scoping>,
    /// Encoding used for the `encoded` payload.
    http_encoding: HttpEncoding,
    /// Handle to emit outcomes when the transmission fails.
    metric_outcomes: MetricOutcomes,
}

impl SendMetricsRequest {
    fn create_error_outcomes(self) {
        #[derive(serde::Deserialize)]
        struct Wrapper {
            buckets: HashMap<ProjectKey, Vec<MinimalTrackableBucket>>,
        }

        let buckets = match serde_json::from_slice(&self.unencoded) {
            Ok(Wrapper { buckets }) => buckets,
            Err(err) => {
                relay_log::error!(
                    error = &err as &dyn std::error::Error,
                    "failed to parse buckets from failed transmission"
                );
                return;
            }
        };

        for (key, buckets) in buckets {
            let Some(&scoping) = self.project_info.get(&key) else {
                relay_log::error!("missing scoping for project key");
                continue;
            };

            self.metric_outcomes.track(
                scoping,
                &buckets,
                Outcome::Invalid(DiscardReason::Internal),
            );
        }
    }
}

impl UpstreamRequest for SendMetricsRequest {
    fn set_relay_id(&self) -> bool {
        true
    }

    fn sign(&mut self) -> Option<Sign> {
        Some(Sign::Required(SignatureType::Body(self.unencoded.clone())))
    }

    fn method(&self) -> reqwest::Method {
        reqwest::Method::POST
    }

    fn path(&self) -> Cow<'_, str> {
        "/api/0/relays/metrics/".into()
    }

    fn route(&self) -> &'static str {
        "global_metrics"
    }

    fn build(&mut self, builder: &mut http::RequestBuilder) -> Result<(), http::HttpError> {
        metric!(histogram(RelayHistograms::UpstreamMetricsBodySize) = self.encoded.len() as u64);

        builder
            .content_encoding(self.http_encoding)
            .header("X-Sentry-Relay-Shard", self.partition_key.as_bytes())
            .header(header::CONTENT_TYPE, b"application/json")
            .body(self.encoded.clone());

        Ok(())
    }

    fn respond(
        self: Box<Self>,
        result: Result<http::Response, UpstreamRequestError>,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> {
        Box::pin(async {
            match result {
                Ok(mut response) => {
                    response.consume().await.ok();
                }
                Err(error) => {
                    relay_log::error!(error = &error as &dyn Error, "failed to send metrics batch");

                    if error.is_received() {
                        return;
                    }

                    self.create_error_outcomes()
                }
            }
        })
    }
}

#[derive(Copy, Clone, Debug)]
struct CombinedQuotas<'a> {
    global_quotas: &'a [Quota],
    project_quotas: &'a [Quota],
}

impl<'a> CombinedQuotas<'a> {
    pub fn new(global_config: &'a GlobalConfig, project_quotas: &'a [Quota]) -> Self {
        Self {
            global_quotas: &global_config.quotas,
            project_quotas,
        }
    }

    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    pub fn len(&self) -> usize {
        self.global_quotas.len() + self.project_quotas.len()
    }
}

impl<'a> IntoIterator for CombinedQuotas<'a> {
    type Item = &'a Quota;
    type IntoIter = std::iter::Chain<std::slice::Iter<'a, Quota>, std::slice::Iter<'a, Quota>>;

    fn into_iter(self) -> Self::IntoIter {
        self.global_quotas.iter().chain(self.project_quotas.iter())
    }
}

#[cfg(test)]
mod tests {
    use std::collections::BTreeMap;

    use insta::assert_debug_snapshot;
    use relay_base_schema::metrics::{DurationUnit, MetricUnit};
    use relay_common::glob2::LazyGlob;
    use relay_dynamic_config::ProjectConfig;
    use relay_event_normalization::{
        MeasurementsConfig, NormalizationConfig, RedactionRule, TransactionNameConfig,
        TransactionNameRule,
    };
    use relay_event_schema::protocol::TransactionSource;
    use relay_pii::DataScrubbingConfig;
    use similar_asserts::assert_eq;

    use crate::metrics_extraction::IntoMetric;
    use crate::metrics_extraction::transactions::types::{
        CommonTags, TransactionMeasurementTags, TransactionMetric,
    };
    use crate::testutils::{create_test_processor, create_test_processor_with_addrs};

    #[cfg(feature = "processing")]
    use {
        relay_metrics::BucketValue,
        relay_quotas::{QuotaScope, ReasonCode},
        relay_test::mock_service,
    };

    use super::*;

    #[cfg(feature = "processing")]
    fn mock_quota(id: &str) -> Quota {
        Quota {
            id: Some(id.into()),
            categories: smallvec::smallvec![DataCategory::MetricBucket],
            scope: QuotaScope::Organization,
            scope_id: None,
            limit: Some(0),
            window: None,
            reason_code: None,
            namespace: None,
        }
    }

    #[cfg(feature = "processing")]
    #[test]
    fn test_dynamic_quotas() {
        let global_config = GlobalConfig {
            quotas: vec![mock_quota("foo"), mock_quota("bar")],
            ..Default::default()
        };

        let project_quotas = vec![mock_quota("baz"), mock_quota("qux")];

        let dynamic_quotas = CombinedQuotas::new(&global_config, &project_quotas);

        assert_eq!(dynamic_quotas.len(), 4);
        assert!(!dynamic_quotas.is_empty());

        let quota_ids = dynamic_quotas.into_iter().filter_map(|q| q.id.as_deref());
        assert!(quota_ids.eq(["foo", "bar", "baz", "qux"]));
    }

    #[cfg(feature = "processing")]
    #[tokio::test]
    async fn test_ratelimit_per_batch() {
        use relay_base_schema::organization::OrganizationId;
        use relay_protocol::FiniteF64;

        let rate_limited_org = Scoping {
            organization_id: OrganizationId::new(1),
            project_id: ProjectId::new(21),
            project_key: ProjectKey::parse("00000000000000000000000000000000").unwrap(),
            key_id: Some(17),
        };

        let not_rate_limited_org = Scoping {
            organization_id: OrganizationId::new(2),
            project_id: ProjectId::new(21),
            project_key: ProjectKey::parse("11111111111111111111111111111111").unwrap(),
            key_id: Some(17),
        };

        let message = {
            let project_info = {
                let quota = Quota {
                    id: Some("testing".into()),
                    categories: vec![DataCategory::MetricBucket].into(),
                    scope: relay_quotas::QuotaScope::Organization,
                    scope_id: Some(rate_limited_org.organization_id.to_string()),
                    limit: Some(0),
                    window: None,
                    reason_code: Some(ReasonCode::new("test")),
                    namespace: None,
                };

                let mut config = ProjectConfig::default();
                config.quotas.push(quota);

                Arc::new(ProjectInfo {
                    config,
                    ..Default::default()
                })
            };

            let project_metrics = |scoping| ProjectBuckets {
                buckets: vec![Bucket {
                    name: "d:transactions/bar".into(),
                    value: BucketValue::Counter(FiniteF64::new(1.0).unwrap()),
                    timestamp: UnixTimestamp::now(),
                    tags: Default::default(),
                    width: 10,
                    metadata: BucketMetadata::default(),
                }],
                rate_limits: Default::default(),
                project_info: project_info.clone(),
                scoping,
            };

            let buckets = hashbrown::HashMap::from([
                (
                    rate_limited_org.project_key,
                    project_metrics(rate_limited_org),
                ),
                (
                    not_rate_limited_org.project_key,
                    project_metrics(not_rate_limited_org),
                ),
            ]);

            FlushBuckets {
                partition_key: 0,
                buckets,
            }
        };

        assert_eq!(message.buckets.keys().count(), 2);

        let config = {
            let config_json = serde_json::json!({
                "processing": {
                    "enabled": true,
                    "kafka_config": [],
                    "redis": {
                        "server": std::env::var("RELAY_REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_owned()),
                    }
                }
            });
            Config::from_json_value(config_json).unwrap()
        };

        let (store, handle) = {
            let f = |org_ids: &mut Vec<OrganizationId>, msg: Store| {
                let org_id = match msg {
                    Store::Metrics(x) => x.scoping.organization_id,
                    _ => panic!("received envelope when expecting only metrics"),
                };
                org_ids.push(org_id);
            };

            mock_service("store_forwarder", vec![], f)
        };

        let processor = create_test_processor(config).await;
        assert!(processor.redis_rate_limiter_enabled());

        processor.encode_metrics_processing(message, &store).await;

        drop(store);
        let orgs_not_ratelimited = handle.await.unwrap();

        assert_eq!(
            orgs_not_ratelimited,
            vec![not_rate_limited_org.organization_id]
        );
    }

    #[tokio::test]
    async fn test_browser_version_extraction_with_pii_like_data() {
        let processor = create_test_processor(Default::default()).await;
        let outcome_aggregator = Addr::dummy();
        let event_id = EventId::new();

        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
            .parse()
            .unwrap();

        let request_meta = RequestMeta::new(dsn);
        let mut envelope = Envelope::from_request(Some(event_id), request_meta);

        envelope.add_item({
            let mut item = Item::new(ItemType::Event);
            item.set_payload(
                ContentType::Json,
                r#"
                {
                    "request": {
                        "headers": [
                            ["User-Agent", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36"]
                        ]
                    }
                }
                "#,
            );
            item
        });

        let mut datascrubbing_settings = DataScrubbingConfig::default();
        datascrubbing_settings.scrub_data = true;
        datascrubbing_settings.scrub_defaults = true;
        datascrubbing_settings.scrub_ip_addresses = true;

        let pii_config = serde_json::from_str(r#"{"applications": {"**": ["@ip:mask"]}}"#).unwrap();

        let config = ProjectConfig {
            datascrubbing_settings,
            pii_config: Some(pii_config),
            ..Default::default()
        };

        let project_info = ProjectInfo {
            config,
            ..Default::default()
        };

        let mut envelopes = ProcessingGroup::split_envelope(*envelope, &Default::default());
        assert_eq!(envelopes.len(), 1);

        let (group, envelope) = envelopes.pop().unwrap();
        let envelope = ManagedEnvelope::new(envelope, outcome_aggregator);

        let message = ProcessEnvelopeGrouped {
            group,
            envelope,
            ctx: processing::Context {
                project_info: &project_info,
                ..processing::Context::for_test()
            },
            reservoir_counters: &ReservoirCounters::default(),
        };

        let Ok(Some(Submit::Envelope(mut new_envelope))) =
            processor.process(&mut Token::noop(), message).await
        else {
            panic!();
        };
        let new_envelope = new_envelope.envelope_mut();

        let event_item = new_envelope.items().last().unwrap();
        let annotated_event: Annotated<Event> =
            Annotated::from_json_bytes(&event_item.payload()).unwrap();
        let event = annotated_event.into_value().unwrap();
        let headers = event
            .request
            .into_value()
            .unwrap()
            .headers
            .into_value()
            .unwrap();

        assert_eq!(
            Some(
                "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/********* Safari/537.36"
            ),
            headers.get_header("User-Agent")
        );

        let contexts = event.contexts.into_value().unwrap();
        let browser = contexts.0.get("browser").unwrap();
        assert_eq!(
            r#"{"browser":"Chrome 103.0.0","name":"Chrome","version":"103.0.0","type":"browser"}"#,
            browser.to_json().unwrap()
        );
    }

    #[tokio::test]
    #[cfg(feature = "processing")]
    async fn test_materialize_dsc() {
        use crate::services::projects::project::PublicKeyConfig;

        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
            .parse()
            .unwrap();
        let request_meta = RequestMeta::new(dsn);
        let mut envelope = Envelope::from_request(None, request_meta);

        let dsc = r#"{
            "trace_id": "00000000-0000-0000-0000-000000000000",
            "public_key": "e12d836b15bb49d7bbf99e64295d995b",
            "sample_rate": "0.2"
        }"#;
        envelope.set_dsc(serde_json::from_str(dsc).unwrap());

        let mut item = Item::new(ItemType::Event);
        item.set_payload(ContentType::Json, r#"{}"#);
        envelope.add_item(item);

        let outcome_aggregator = Addr::dummy();
        let managed_envelope = ManagedEnvelope::new(envelope, outcome_aggregator);

        let mut project_info = ProjectInfo::default();
        project_info.public_keys.push(PublicKeyConfig {
            public_key: ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(),
            numeric_id: Some(1),
        });

        let config = serde_json::json!({
            "processing": {
                "enabled": true,
                "kafka_config": [],
            }
        });

        let message = ProcessEnvelopeGrouped {
            group: ProcessingGroup::Transaction,
            envelope: managed_envelope,
            ctx: processing::Context {
                config: &Config::from_json_value(config.clone()).unwrap(),
                project_info: &project_info,
                sampling_project_info: Some(&project_info),
                ..processing::Context::for_test()
            },
            reservoir_counters: &ReservoirCounters::default(),
        };

        let processor = create_test_processor(Config::from_json_value(config).unwrap()).await;
        let Ok(Some(Submit::Envelope(envelope))) =
            processor.process(&mut Token::noop(), message).await
        else {
            panic!();
        };
        let event = envelope
            .envelope()
            .get_item_by(|item| item.ty() == &ItemType::Event)
            .unwrap();

        let event = Annotated::<Event>::from_json_bytes(&event.payload()).unwrap();
        insta::assert_debug_snapshot!(event.value().unwrap()._dsc, @r###"
        Object(
            {
                "environment": ~,
                "public_key": String(
                    "e12d836b15bb49d7bbf99e64295d995b",
                ),
                "release": ~,
                "replay_id": ~,
                "sample_rate": String(
                    "0.2",
                ),
                "trace_id": String(
                    "00000000000000000000000000000000",
                ),
                "transaction": ~,
            },
        )
        "###);
    }

    fn capture_test_event(transaction_name: &str, source: TransactionSource) -> Vec<String> {
        let mut event = Annotated::<Event>::from_json(
            r#"
            {
                "type": "transaction",
                "transaction": "/foo/",
                "timestamp": 946684810.0,
                "start_timestamp": 946684800.0,
                "contexts": {
                    "trace": {
                        "trace_id": "4c79f60c11214eb38604f4ae0781bfb2",
                        "span_id": "fa90fdead5f74053",
                        "op": "http.server",
                        "type": "trace"
                    }
                },
                "transaction_info": {
                    "source": "url"
                }
            }
            "#,
        )
        .unwrap();
        let e = event.value_mut().as_mut().unwrap();
        e.transaction.set_value(Some(transaction_name.into()));

        e.transaction_info
            .value_mut()
            .as_mut()
            .unwrap()
            .source
            .set_value(Some(source));

        relay_statsd::with_capturing_test_client(|| {
            utils::log_transaction_name_metrics(&mut event, |event| {
                let config = NormalizationConfig {
                    transaction_name_config: TransactionNameConfig {
                        rules: &[TransactionNameRule {
                            pattern: LazyGlob::new("/foo/*/**".to_owned()),
                            expiry: DateTime::<Utc>::MAX_UTC,
                            redaction: RedactionRule::Replace {
                                substitution: "*".to_owned(),
                            },
                        }],
                    },
                    ..Default::default()
                };
                relay_event_normalization::normalize_event(event, &config)
            });
        })
    }

    #[test]
    fn test_log_transaction_metrics_none() {
        let captures = capture_test_event("/nothing", TransactionSource::Url);
        insta::assert_debug_snapshot!(captures, @r###"
        [
            "event.transaction_name_changes:1|c|#source_in:url,changes:none,source_out:sanitized,is_404:false",
        ]
        "###);
    }

    #[test]
    fn test_log_transaction_metrics_rule() {
        let captures = capture_test_event("/foo/john/denver", TransactionSource::Url);
        insta::assert_debug_snapshot!(captures, @r###"
        [
            "event.transaction_name_changes:1|c|#source_in:url,changes:rule,source_out:sanitized,is_404:false",
        ]
        "###);
    }

    #[test]
    fn test_log_transaction_metrics_pattern() {
        let captures = capture_test_event("/something/12345", TransactionSource::Url);
        insta::assert_debug_snapshot!(captures, @r###"
        [
            "event.transaction_name_changes:1|c|#source_in:url,changes:pattern,source_out:sanitized,is_404:false",
        ]
        "###);
    }

    #[test]
    fn test_log_transaction_metrics_both() {
        let captures = capture_test_event("/foo/john/12345", TransactionSource::Url);
        insta::assert_debug_snapshot!(captures, @r###"
        [
            "event.transaction_name_changes:1|c|#source_in:url,changes:both,source_out:sanitized,is_404:false",
        ]
        "###);
    }

    #[test]
    fn test_log_transaction_metrics_no_match() {
        let captures = capture_test_event("/foo/john/12345", TransactionSource::Route);
        insta::assert_debug_snapshot!(captures, @r###"
        [
            "event.transaction_name_changes:1|c|#source_in:route,changes:none,source_out:route,is_404:false",
        ]
        "###);
    }

    #[test]
    fn test_mri_overhead_constant() {
        let hardcoded_value = MeasurementsConfig::MEASUREMENT_MRI_OVERHEAD;

        let derived_value = {
            let name = "foobar".to_owned();
            let value = 5.into();
            let unit = MetricUnit::Duration(DurationUnit::default());
            let tags = TransactionMeasurementTags {
                measurement_rating: None,
                universal_tags: CommonTags(BTreeMap::new()),
                score_profile_version: None,
            };

            let measurement = TransactionMetric::Measurement {
                name: name.clone(),
                value,
                unit,
                tags,
            };

            let metric: Bucket = measurement.into_metric(UnixTimestamp::now());
            metric.name.len() - unit.to_string().len() - name.len()
        };

        assert_eq!(
            hardcoded_value, derived_value,
            "Update `MEASUREMENT_MRI_OVERHEAD` if the naming scheme changed."
        );
    }

    #[tokio::test]
    async fn test_process_metrics_bucket_metadata() {
        let mut token = Cogs::noop().timed(ResourceId::Relay, AppFeature::Unattributed);
        let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
        let received_at = Utc::now();
        let config = Config::default();

        let (aggregator, mut aggregator_rx) = Addr::custom();
        let processor = create_test_processor_with_addrs(
            config,
            Addrs {
                aggregator,
                ..Default::default()
            },
        )
        .await;

        let mut item = Item::new(ItemType::Statsd);
        item.set_payload(
            ContentType::Text,
            "transactions/foo:3182887624:4267882815|s",
        );
        for (source, expected_received_at) in [
            (
                BucketSource::External,
                Some(UnixTimestamp::from_datetime(received_at).unwrap()),
            ),
            (BucketSource::Internal, None),
        ] {
            let message = ProcessMetrics {
                data: MetricData::Raw(vec![item.clone()]),
                project_key,
                source,
                received_at,
                sent_at: Some(Utc::now()),
            };
            processor.handle_process_metrics(&mut token, message);

            let Aggregator::MergeBuckets(merge_buckets) = aggregator_rx.recv().await.unwrap();
            let buckets = merge_buckets.buckets;
            assert_eq!(buckets.len(), 1);
            assert_eq!(buckets[0].metadata.received_at, expected_received_at);
        }
    }

    #[tokio::test]
    async fn test_process_batched_metrics() {
        let mut token = Cogs::noop().timed(ResourceId::Relay, AppFeature::Unattributed);
        let received_at = Utc::now();
        let config = Config::default();

        let (aggregator, mut aggregator_rx) = Addr::custom();
        let processor = create_test_processor_with_addrs(
            config,
            Addrs {
                aggregator,
                ..Default::default()
            },
        )
        .await;

        let payload = r#"{
    "buckets": {
        "11111111111111111111111111111111": [
            {
                "timestamp": 1615889440,
                "width": 0,
                "name": "d:custom/endpoint.response_time@millisecond",
                "type": "d",
                "value": [
                    68.0
                ],
                "tags": {
                    "route": "user_index"
                }
            }
        ],
        "22222222222222222222222222222222": [
            {
                "timestamp": 1615889440,
                "width": 0,
                "name": "d:custom/endpoint.cache_rate@none",
                "type": "d",
                "value": [
                    36.0
                ]
            }
        ]
    }
}
"#;
        let message = ProcessBatchedMetrics {
            payload: Bytes::from(payload),
            source: BucketSource::Internal,
            received_at,
            sent_at: Some(Utc::now()),
        };
        processor.handle_process_batched_metrics(&mut token, message);

        let Aggregator::MergeBuckets(mb1) = aggregator_rx.recv().await.unwrap();
        let Aggregator::MergeBuckets(mb2) = aggregator_rx.recv().await.unwrap();

        let mut messages = vec![mb1, mb2];
        messages.sort_by_key(|mb| mb.project_key);

        let actual = messages
            .into_iter()
            .map(|mb| (mb.project_key, mb.buckets))
            .collect::<Vec<_>>();

        assert_debug_snapshot!(actual, @r###"
        [
            (
                ProjectKey("11111111111111111111111111111111"),
                [
                    Bucket {
                        timestamp: UnixTimestamp(1615889440),
                        width: 0,
                        name: MetricName(
                            "d:custom/endpoint.response_time@millisecond",
                        ),
                        value: Distribution(
                            [
                                68.0,
                            ],
                        ),
                        tags: {
                            "route": "user_index",
                        },
                        metadata: BucketMetadata {
                            merges: 1,
                            received_at: None,
                            extracted_from_indexed: false,
                        },
                    },
                ],
            ),
            (
                ProjectKey("22222222222222222222222222222222"),
                [
                    Bucket {
                        timestamp: UnixTimestamp(1615889440),
                        width: 0,
                        name: MetricName(
                            "d:custom/endpoint.cache_rate@none",
                        ),
                        value: Distribution(
                            [
                                36.0,
                            ],
                        ),
                        tags: {},
                        metadata: BucketMetadata {
                            merges: 1,
                            received_at: None,
                            extracted_from_indexed: false,
                        },
                    },
                ],
            ),
        ]
        "###);
    }
}