use std::borrow::Cow;
use std::collections::{BTreeSet, HashMap};
use std::error::Error;
use std::fmt::{Debug, Display};
use std::future::Future;
use std::io::Write;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;

use anyhow::Context;
use brotli::CompressorWriter as BrotliEncoder;
use bytes::Bytes;
use chrono::{DateTime, Utc};
use flate2::Compression;
use flate2::write::{GzEncoder, ZlibEncoder};
use futures::FutureExt;
use futures::future::BoxFuture;
use relay_base_schema::project::{ProjectId, ProjectKey};
use relay_cogs::{AppFeature, Cogs, FeatureWeights, ResourceId, Token};
use relay_common::time::UnixTimestamp;
use relay_config::{Config, HttpEncoding, RelayMode};
use relay_dynamic_config::{ErrorBoundary, Feature, GlobalConfig};
use relay_event_normalization::{ClockDriftProcessor, GeoIpLookup};
use relay_event_schema::processor::ProcessingAction;
use relay_event_schema::protocol::{
    ClientReport, Event, EventId, Metrics, NetworkReportError, SpanV2,
};
use relay_filter::FilterStatKey;
use relay_metrics::{Bucket, BucketMetadata, BucketView, BucketsView, MetricNamespace};
use relay_pii::PiiConfigError;
use relay_protocol::Annotated;
use relay_quotas::{DataCategory, Quota, RateLimits, Scoping};
use relay_sampling::evaluation::{ReservoirCounters, ReservoirEvaluator, SamplingDecision};
use relay_statsd::metric;
use relay_system::{Addr, FromMessage, NoResponse, Service};
use relay_threading::AsyncPool;
use reqwest::header;
use smallvec::{SmallVec, smallvec};
use zstd::stream::Encoder as ZstdEncoder;

use crate::envelope::{
    self, AttachmentType, ContentType, Envelope, EnvelopeError, Item, ItemContainer, ItemType,
};
use crate::extractors::{PartialDsn, RequestMeta, RequestTrust};
use crate::integrations::{Integration, SpansIntegration};
use crate::managed::{InvalidProcessingGroupType, ManagedEnvelope, TypedEnvelope};
use crate::metrics::{MetricOutcomes, MetricsLimiter, MinimalTrackableBucket};
use crate::metrics_extraction::transactions::ExtractedMetrics;
use crate::metrics_extraction::transactions::types::ExtractMetricsError;
use crate::processing::check_ins::CheckInsProcessor;
use crate::processing::logs::LogsProcessor;
use crate::processing::sessions::SessionsProcessor;
use crate::processing::spans::SpansProcessor;
use crate::processing::trace_metrics::TraceMetricsProcessor;
use crate::processing::transactions::extraction::ExtractMetricsContext;
use crate::processing::utils::event::{
    EventFullyNormalized, EventMetricsExtracted, FiltersStatus, SpansExtracted, event_category,
    event_type,
};
use crate::processing::{Forward as _, Output, Outputs, QuotaRateLimiter};
use crate::service::ServiceError;
use crate::services::global_config::GlobalConfigHandle;
use crate::services::metrics::{Aggregator, FlushBuckets, MergeBuckets, ProjectBuckets};
use crate::services::outcome::{DiscardItemType, DiscardReason, Outcome, TrackOutcome};
use crate::services::projects::cache::ProjectCacheHandle;
use crate::services::projects::project::{ProjectInfo, ProjectState};
use crate::services::upstream::{
    SendRequest, Sign, SignatureType, UpstreamRelay, UpstreamRequest, UpstreamRequestError,
};
use crate::statsd::{RelayCounters, RelayHistograms, RelayTimers};
use crate::utils::{self, CheckLimits, EnvelopeLimiter, SamplingResult};
use crate::{http, processing};

#[cfg(feature = "processing")]
use {
    crate::services::global_rate_limits::{GlobalRateLimits, GlobalRateLimitsServiceHandle},
    crate::services::processor::nnswitch::SwitchProcessingError,
    crate::services::store::{Store, StoreEnvelope},
    crate::services::upload::UploadAttachments,
    crate::utils::Enforcement,
    itertools::Itertools,
    relay_cardinality::{
        CardinalityLimit, CardinalityLimiter, CardinalityLimitsSplit, RedisSetLimiter,
        RedisSetLimiterOptions,
    },
    relay_dynamic_config::CardinalityLimiterMode,
    relay_quotas::{RateLimitingError, RedisRateLimiter},
    relay_redis::{AsyncRedisClient, RedisClients},
    std::time::Instant,
    symbolic_unreal::{Unreal4Error, Unreal4ErrorKind},
};

mod attachment;
mod dynamic_sampling;
mod event;
mod metrics;
mod nel;
mod profile;
mod profile_chunk;
mod replay;
mod report;
mod span;

#[cfg(all(sentry, feature = "processing"))]
mod playstation;
mod standalone;
#[cfg(feature = "processing")]
mod unreal;

#[cfg(feature = "processing")]
mod nnswitch;

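/// Runs the first block only when Relay is compiled with the `processing`
/// feature *and* processing is enabled in the configuration. The optional
/// `else` block runs in every other case, including builds without the
/// feature.
///
/// A minimal usage sketch, mirroring how the macro is invoked further below:
///
/// ```ignore
/// if_processing!(self.inner.config, {
///     // only reached in processing mode
/// } else {
///     // reached otherwise, and in builds without the feature
/// });
/// ```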
macro_rules! if_processing {
    ($config:expr, $if_true:block) => {
        #[cfg(feature = "processing")] {
            if $config.processing_enabled() $if_true
        }
    };
    ($config:expr, $if_true:block else $if_false:block) => {
        {
            #[cfg(feature = "processing")] {
                if $config.processing_enabled() $if_true else $if_false
            }
            #[cfg(not(feature = "processing"))] {
                $if_false
            }
        }
    };
}

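/// The minimum clock drift for the clock drift correction to apply.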
pub const MINIMUM_CLOCK_DRIFT: Duration = Duration::from_secs(55 * 60);

#[derive(Debug)]
pub struct GroupTypeError;

impl Display for GroupTypeError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("failed to convert processing group into corresponding type")
    }
}

impl std::error::Error for GroupTypeError {}

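/// Defines a zero-sized marker type for a processing group, together with a
/// `From` conversion into [`ProcessingGroup`] and a fallible `TryFrom` back.
///
/// A sketch of what `processing_group!(LogGroup, Log, Nel)` expands to:
///
/// ```ignore
/// pub struct LogGroup;
///
/// impl From<LogGroup> for ProcessingGroup { /* always ProcessingGroup::Log */ }
///
/// impl TryFrom<ProcessingGroup> for LogGroup {
///     // Ok(LogGroup) for ProcessingGroup::Log and ProcessingGroup::Nel,
///     // Err(GroupTypeError) for every other group.
/// }
/// ```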
macro_rules! processing_group {
    ($ty:ident, $variant:ident$(, $($other:ident),+)?) => {
        #[derive(Clone, Copy, Debug)]
        pub struct $ty;

        impl From<$ty> for ProcessingGroup {
            fn from(_: $ty) -> Self {
                ProcessingGroup::$variant
            }
        }

        impl TryFrom<ProcessingGroup> for $ty {
            type Error = GroupTypeError;

            fn try_from(value: ProcessingGroup) -> Result<Self, Self::Error> {
                if matches!(value, ProcessingGroup::$variant) {
                    return Ok($ty);
                }
                $($(
                    if matches!(value, ProcessingGroup::$other) {
                        return Ok($ty);
                    }
                )+)?
                return Err(GroupTypeError);
            }
        }
    };
}

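/// Marker trait implemented by the processing groups that run the event
/// pipeline, currently transactions and errors.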
pub trait EventProcessing {}

processing_group!(TransactionGroup, Transaction);
impl EventProcessing for TransactionGroup {}

processing_group!(ErrorGroup, Error);
impl EventProcessing for ErrorGroup {}

processing_group!(SessionGroup, Session);
processing_group!(StandaloneGroup, Standalone);
processing_group!(ClientReportGroup, ClientReport);
processing_group!(ReplayGroup, Replay);
processing_group!(CheckInGroup, CheckIn);
processing_group!(LogGroup, Log, Nel);
processing_group!(TraceMetricGroup, TraceMetric);
processing_group!(SpanGroup, Span);

processing_group!(ProfileChunkGroup, ProfileChunk);
processing_group!(MetricsGroup, Metrics);
processing_group!(ForwardUnknownGroup, ForwardUnknown);
processing_group!(Ungrouped, Ungrouped);

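/// Marker for envelopes that have passed through processing and may be
/// submitted upstream.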
#[derive(Clone, Copy, Debug)]
pub struct Processed;

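/// The group of an envelope's items, which determines the processing pipeline
/// that handles them.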
#[derive(Clone, Copy, Debug)]
pub enum ProcessingGroup {
    Transaction,
    Error,
    Session,
    Standalone,
    ClientReport,
    Replay,
    CheckIn,
    Nel,
    Log,
    TraceMetric,
    Span,
    SpanV2,
    Metrics,
    ProfileChunk,
    ForwardUnknown,
    Ungrouped,
}

impl ProcessingGroup {
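    /// Splits the envelope into sub-envelopes, one per processing group.
    ///
    /// Items are claimed in a fixed order (replays, sessions, spans, logs, and
    /// so on), so each item belongs to the first group whose predicate
    /// matches; whatever remains is routed item by item at the end.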
    fn split_envelope(
        mut envelope: Envelope,
        project_info: &ProjectInfo,
    ) -> SmallVec<[(Self, Box<Envelope>); 3]> {
        let headers = envelope.headers().clone();
        let mut grouped_envelopes = smallvec![];

        let replay_items = envelope.take_items_by(|item| {
            matches!(
                item.ty(),
                &ItemType::ReplayEvent | &ItemType::ReplayRecording | &ItemType::ReplayVideo
            )
        });
        if !replay_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Replay,
                Envelope::from_parts(headers.clone(), replay_items),
            ))
        }

        let session_items = envelope
            .take_items_by(|item| matches!(item.ty(), &ItemType::Session | &ItemType::Sessions));
        if !session_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Session,
                Envelope::from_parts(headers.clone(), session_items),
            ))
        }

        let span_v2_items = envelope.take_items_by(|item| {
            let exp_feature = project_info.has_feature(Feature::SpanV2ExperimentalProcessing);
            let is_supported_integration = matches!(
                item.integration(),
                Some(Integration::Spans(SpansIntegration::OtelV1 { .. }))
            );
            let is_span = matches!(item.ty(), &ItemType::Span);

            ItemContainer::<SpanV2>::is_container(item)
                || (exp_feature && is_span)
                || (exp_feature && is_supported_integration)
        });

        if !span_v2_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::SpanV2,
                Envelope::from_parts(headers.clone(), span_v2_items),
            ))
        }

        let span_items = envelope.take_items_by(|item| {
            matches!(item.ty(), &ItemType::Span)
                || matches!(item.integration(), Some(Integration::Spans(_)))
        });

        if !span_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Span,
                Envelope::from_parts(headers.clone(), span_items),
            ))
        }

        let logs_items = envelope.take_items_by(|item| {
            matches!(item.ty(), &ItemType::Log)
                || matches!(item.integration(), Some(Integration::Logs(_)))
        });
        if !logs_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Log,
                Envelope::from_parts(headers.clone(), logs_items),
            ))
        }

        let trace_metric_items =
            envelope.take_items_by(|item| matches!(item.ty(), &ItemType::TraceMetric));
        if !trace_metric_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::TraceMetric,
                Envelope::from_parts(headers.clone(), trace_metric_items),
            ))
        }

        let nel_items = envelope.take_items_by(|item| matches!(item.ty(), &ItemType::Nel));
        if !nel_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Nel,
                Envelope::from_parts(headers.clone(), nel_items),
            ))
        }

        let metric_items = envelope.take_items_by(|i| i.ty().is_metrics());
        if !metric_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Metrics,
                Envelope::from_parts(headers.clone(), metric_items),
            ))
        }

        let profile_chunk_items =
            envelope.take_items_by(|item| matches!(item.ty(), &ItemType::ProfileChunk));
        if !profile_chunk_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::ProfileChunk,
                Envelope::from_parts(headers.clone(), profile_chunk_items),
            ))
        }

        if !envelope.items().any(Item::creates_event) {
            let standalone_items = envelope.take_items_by(Item::requires_event);
            if !standalone_items.is_empty() {
                grouped_envelopes.push((
                    ProcessingGroup::Standalone,
                    Envelope::from_parts(headers.clone(), standalone_items),
                ))
            }
        }

        let security_reports_items = envelope
            .take_items_by(|i| matches!(i.ty(), &ItemType::RawSecurity))
            .into_iter()
            .map(|item| {
                let headers = headers.clone();
                let items: SmallVec<[Item; 3]> = smallvec![item.clone()];
                let mut envelope = Envelope::from_parts(headers, items);
                envelope.set_event_id(EventId::new());
                (ProcessingGroup::Error, envelope)
            });
        grouped_envelopes.extend(security_reports_items);

        let require_event_items = envelope.take_items_by(Item::requires_event);
        if !require_event_items.is_empty() {
            let group = if require_event_items
                .iter()
                .any(|item| matches!(item.ty(), &ItemType::Transaction | &ItemType::Profile))
            {
                ProcessingGroup::Transaction
            } else {
                ProcessingGroup::Error
            };

            grouped_envelopes.push((
                group,
                Envelope::from_parts(headers.clone(), require_event_items),
            ))
        }

        let envelopes = envelope.items_mut().map(|item| {
            let headers = headers.clone();
            let items: SmallVec<[Item; 3]> = smallvec![item.clone()];
            let envelope = Envelope::from_parts(headers, items);
            let item_type = item.ty();
            let group = if matches!(item_type, &ItemType::CheckIn) {
                ProcessingGroup::CheckIn
            } else if matches!(item_type, &ItemType::ClientReport) {
                ProcessingGroup::ClientReport
            } else if matches!(item_type, &ItemType::Unknown(_)) {
                ProcessingGroup::ForwardUnknown
            } else {
                ProcessingGroup::Ungrouped
            };

            (group, envelope)
        });
        grouped_envelopes.extend(envelopes);

        grouped_envelopes
    }

    pub fn variant(&self) -> &'static str {
        match self {
            ProcessingGroup::Transaction => "transaction",
            ProcessingGroup::Error => "error",
            ProcessingGroup::Session => "session",
            ProcessingGroup::Standalone => "standalone",
            ProcessingGroup::ClientReport => "client_report",
            ProcessingGroup::Replay => "replay",
            ProcessingGroup::CheckIn => "check_in",
            ProcessingGroup::Log => "log",
            ProcessingGroup::TraceMetric => "trace_metric",
            ProcessingGroup::Nel => "nel",
            ProcessingGroup::Span => "span",
            ProcessingGroup::SpanV2 => "span_v2",
            ProcessingGroup::Metrics => "metrics",
            ProcessingGroup::ProfileChunk => "profile_chunk",
            ProcessingGroup::ForwardUnknown => "forward_unknown",
            ProcessingGroup::Ungrouped => "ungrouped",
        }
    }
}

impl From<ProcessingGroup> for AppFeature {
    fn from(value: ProcessingGroup) -> Self {
        match value {
            ProcessingGroup::Transaction => AppFeature::Transactions,
            ProcessingGroup::Error => AppFeature::Errors,
            ProcessingGroup::Session => AppFeature::Sessions,
            ProcessingGroup::Standalone => AppFeature::UnattributedEnvelope,
            ProcessingGroup::ClientReport => AppFeature::ClientReports,
            ProcessingGroup::Replay => AppFeature::Replays,
            ProcessingGroup::CheckIn => AppFeature::CheckIns,
            ProcessingGroup::Log => AppFeature::Logs,
            ProcessingGroup::TraceMetric => AppFeature::TraceMetrics,
            ProcessingGroup::Nel => AppFeature::Logs,
            ProcessingGroup::Span => AppFeature::Spans,
            ProcessingGroup::SpanV2 => AppFeature::Spans,
            ProcessingGroup::Metrics => AppFeature::UnattributedMetrics,
            ProcessingGroup::ProfileChunk => AppFeature::Profiles,
            ProcessingGroup::ForwardUnknown => AppFeature::UnattributedEnvelope,
            ProcessingGroup::Ungrouped => AppFeature::UnattributedEnvelope,
        }
    }
}

#[derive(Debug, thiserror::Error)]
pub enum ProcessingError {
    #[error("invalid json in event")]
    InvalidJson(#[source] serde_json::Error),

    #[error("invalid message pack event payload")]
    InvalidMsgpack(#[from] rmp_serde::decode::Error),

    #[cfg(feature = "processing")]
    #[error("invalid unreal crash report")]
    InvalidUnrealReport(#[source] Unreal4Error),

    #[error("event payload too large")]
    PayloadTooLarge(DiscardItemType),

    #[error("invalid transaction event")]
    InvalidTransaction,

    #[error("envelope processor failed")]
    ProcessingFailed(#[from] ProcessingAction),

    #[error("duplicate {0} in event")]
    DuplicateItem(ItemType),

    #[error("failed to extract event payload")]
    NoEventPayload,

    #[error("missing project id in DSN")]
    MissingProjectId,

    #[error("invalid security report type: {0:?}")]
    InvalidSecurityType(Bytes),

    #[error("unsupported security report type")]
    UnsupportedSecurityType,

    #[error("invalid security report")]
    InvalidSecurityReport(#[source] serde_json::Error),

    #[error("invalid nel report")]
    InvalidNelReport(#[source] NetworkReportError),

    #[error("event filtered with reason: {0:?}")]
    EventFiltered(FilterStatKey),

    #[error("missing or invalid required event timestamp")]
    InvalidTimestamp,

    #[error("could not serialize event payload")]
    SerializeFailed(#[source] serde_json::Error),

    #[cfg(feature = "processing")]
    #[error("failed to apply quotas")]
    QuotasFailed(#[from] RateLimitingError),

    #[error("invalid pii config")]
    PiiConfigError(PiiConfigError),

    #[error("invalid processing group type")]
    InvalidProcessingGroup(Box<InvalidProcessingGroupType>),

    #[error("invalid replay")]
    InvalidReplay(DiscardReason),

    #[error("replay filtered with reason: {0:?}")]
    ReplayFiltered(FilterStatKey),

    #[cfg(feature = "processing")]
    #[error("nintendo switch dying message processing failed {0:?}")]
    InvalidNintendoDyingMessage(#[source] SwitchProcessingError),

    #[cfg(all(sentry, feature = "processing"))]
    #[error("playstation dump processing failed: {0}")]
    InvalidPlaystationDump(String),

    #[error("processing group does not match specific processor")]
    ProcessingGroupMismatch,

    #[error("new processing pipeline failed")]
    ProcessingFailure,
}

impl ProcessingError {
    pub fn to_outcome(&self) -> Option<Outcome> {
        match self {
            Self::PayloadTooLarge(payload_type) => {
                Some(Outcome::Invalid(DiscardReason::TooLarge(*payload_type)))
            }
            Self::InvalidJson(_) => Some(Outcome::Invalid(DiscardReason::InvalidJson)),
            Self::InvalidMsgpack(_) => Some(Outcome::Invalid(DiscardReason::InvalidMsgpack)),
            Self::InvalidSecurityType(_) => {
                Some(Outcome::Invalid(DiscardReason::SecurityReportType))
            }
            Self::InvalidSecurityReport(_) => Some(Outcome::Invalid(DiscardReason::SecurityReport)),
            Self::UnsupportedSecurityType => Some(Outcome::Filtered(FilterStatKey::InvalidCsp)),
            Self::InvalidNelReport(_) => Some(Outcome::Invalid(DiscardReason::InvalidJson)),
            Self::InvalidTransaction => Some(Outcome::Invalid(DiscardReason::InvalidTransaction)),
            Self::InvalidTimestamp => Some(Outcome::Invalid(DiscardReason::Timestamp)),
            Self::DuplicateItem(_) => Some(Outcome::Invalid(DiscardReason::DuplicateItem)),
            Self::NoEventPayload => Some(Outcome::Invalid(DiscardReason::NoEventPayload)),
            #[cfg(feature = "processing")]
            Self::InvalidNintendoDyingMessage(_) => Some(Outcome::Invalid(DiscardReason::Payload)),
            #[cfg(all(sentry, feature = "processing"))]
            Self::InvalidPlaystationDump(_) => Some(Outcome::Invalid(DiscardReason::Payload)),
            #[cfg(feature = "processing")]
            Self::InvalidUnrealReport(err) if err.kind() == Unreal4ErrorKind::BadCompression => {
                Some(Outcome::Invalid(DiscardReason::InvalidCompression))
            }
            #[cfg(feature = "processing")]
            Self::InvalidUnrealReport(_) => Some(Outcome::Invalid(DiscardReason::ProcessUnreal)),
            Self::SerializeFailed(_) | Self::ProcessingFailed(_) => {
                Some(Outcome::Invalid(DiscardReason::Internal))
            }
            #[cfg(feature = "processing")]
            Self::QuotasFailed(_) => Some(Outcome::Invalid(DiscardReason::Internal)),
            Self::PiiConfigError(_) => Some(Outcome::Invalid(DiscardReason::ProjectStatePii)),
            Self::MissingProjectId => None,
            Self::EventFiltered(_) => None,
            Self::InvalidProcessingGroup(_) => None,
            Self::InvalidReplay(reason) => Some(Outcome::Invalid(*reason)),
            Self::ReplayFiltered(key) => Some(Outcome::Filtered(key.clone())),
            Self::ProcessingGroupMismatch => Some(Outcome::Invalid(DiscardReason::Internal)),
            Self::ProcessingFailure => None,
        }
    }

    fn is_unexpected(&self) -> bool {
        self.to_outcome()
            .is_some_and(|outcome| outcome.is_unexpected())
    }
}

#[cfg(feature = "processing")]
impl From<Unreal4Error> for ProcessingError {
    fn from(err: Unreal4Error) -> Self {
        match err.kind() {
            Unreal4ErrorKind::TooLarge => Self::PayloadTooLarge(ItemType::UnrealReport.into()),
            _ => ProcessingError::InvalidUnrealReport(err),
        }
    }
}

impl From<ExtractMetricsError> for ProcessingError {
    fn from(error: ExtractMetricsError) -> Self {
        match error {
            ExtractMetricsError::MissingTimestamp | ExtractMetricsError::InvalidTimestamp => {
                Self::InvalidTimestamp
            }
        }
    }
}

impl From<InvalidProcessingGroupType> for ProcessingError {
    fn from(value: InvalidProcessingGroupType) -> Self {
        Self::InvalidProcessingGroup(Box::new(value))
    }
}

/// An event extracted from an envelope item, together with the size of the
/// original payload in bytes.
type ExtractedEvent = (Annotated<Event>, usize);

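/// Container for metric buckets extracted during processing.
///
/// Project metrics and sampling metrics are kept separate so they can be
/// attributed to the correct project when they are sent to the aggregator.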
#[derive(Debug)]
pub struct ProcessingExtractedMetrics {
    metrics: ExtractedMetrics,
}

impl ProcessingExtractedMetrics {
    pub fn new() -> Self {
        Self {
            metrics: ExtractedMetrics::default(),
        }
    }

    pub fn into_inner(self) -> ExtractedMetrics {
        self.metrics
    }

    pub fn extend(
        &mut self,
        extracted: ExtractedMetrics,
        sampling_decision: Option<SamplingDecision>,
    ) {
        self.extend_project_metrics(extracted.project_metrics, sampling_decision);
        self.extend_sampling_metrics(extracted.sampling_metrics, sampling_decision);
    }

    pub fn extend_project_metrics<I>(
        &mut self,
        buckets: I,
        sampling_decision: Option<SamplingDecision>,
    ) where
        I: IntoIterator<Item = Bucket>,
    {
        self.metrics
            .project_metrics
            .extend(buckets.into_iter().map(|mut bucket| {
                bucket.metadata.extracted_from_indexed =
                    sampling_decision == Some(SamplingDecision::Keep);
                bucket
            }));
    }

    pub fn extend_sampling_metrics<I>(
        &mut self,
        buckets: I,
        sampling_decision: Option<SamplingDecision>,
    ) where
        I: IntoIterator<Item = Bucket>,
    {
        self.metrics
            .sampling_metrics
            .extend(buckets.into_iter().map(|mut bucket| {
                bucket.metadata.extracted_from_indexed =
                    sampling_decision == Some(SamplingDecision::Keep);
                bucket
            }));
    }

    #[cfg(feature = "processing")]
    fn apply_enforcement(&mut self, enforcement: &Enforcement, enforced_consistently: bool) {
        let mut drop_namespaces: SmallVec<[_; 2]> = smallvec![];
        let mut reset_extracted_from_indexed: SmallVec<[_; 2]> = smallvec![];

        for (namespace, limit, indexed) in [
            (
                MetricNamespace::Transactions,
                &enforcement.event,
                &enforcement.event_indexed,
            ),
            (
                MetricNamespace::Spans,
                &enforcement.spans,
                &enforcement.spans_indexed,
            ),
        ] {
            if limit.is_active() {
                drop_namespaces.push(namespace);
            } else if indexed.is_active() && !enforced_consistently {
                reset_extracted_from_indexed.push(namespace);
            }
        }

        if !drop_namespaces.is_empty() || !reset_extracted_from_indexed.is_empty() {
            self.retain_mut(|bucket| {
                let Some(namespace) = bucket.name.try_namespace() else {
                    return true;
                };

                if drop_namespaces.contains(&namespace) {
                    return false;
                }

                if reset_extracted_from_indexed.contains(&namespace) {
                    bucket.metadata.extracted_from_indexed = false;
                }

                true
            });
        }
    }

    #[cfg(feature = "processing")]
    fn retain_mut(&mut self, mut f: impl FnMut(&mut Bucket) -> bool) {
        self.metrics.project_metrics.retain_mut(&mut f);
        self.metrics.sampling_metrics.retain_mut(&mut f);
    }
}

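/// Sends extracted metric buckets to the aggregator.
///
/// Project metrics are attributed to the envelope's own project; sampling
/// metrics go to the sampling (root trace) project when one is known and fall
/// back to the envelope's project otherwise.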
fn send_metrics(
    metrics: ExtractedMetrics,
    project_key: ProjectKey,
    sampling_key: Option<ProjectKey>,
    aggregator: &Addr<Aggregator>,
) {
    let ExtractedMetrics {
        project_metrics,
        sampling_metrics,
    } = metrics;

    if !project_metrics.is_empty() {
        aggregator.send(MergeBuckets {
            project_key,
            buckets: project_metrics,
        });
    }

    if !sampling_metrics.is_empty() {
        let sampling_project_key = sampling_key.unwrap_or(project_key);
        aggregator.send(MergeBuckets {
            project_key: sampling_project_key,
            buckets: sampling_metrics,
        });
    }
}

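/// Returns `true` if an item covered by the given feature should be filtered.
///
/// Nothing is filtered in proxy mode; in managed mode, items are filtered
/// unless the project has the feature enabled.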
fn should_filter(config: &Config, project_info: &ProjectInfo, feature: Feature) -> bool {
    match config.relay_mode() {
        RelayMode::Proxy => false,
        RelayMode::Managed => !project_info.has_feature(feature),
    }
}

#[derive(Debug)]
#[expect(
    clippy::large_enum_variant,
    reason = "until we have a better solution to combat the excessive growth of Item, see #4819"
)]
enum ProcessingResult {
    Envelope {
        managed_envelope: TypedEnvelope<Processed>,
        extracted_metrics: ProcessingExtractedMetrics,
    },
    Output(Output<Outputs>),
}

impl ProcessingResult {
    fn no_metrics(managed_envelope: TypedEnvelope<Processed>) -> Self {
        Self::Envelope {
            managed_envelope,
            extracted_metrics: ProcessingExtractedMetrics::new(),
        }
    }
}

#[derive(Debug)]
#[expect(
    clippy::large_enum_variant,
    reason = "until we have a better solution to combat the excessive growth of Item, see #4819"
)]
enum Submit<'a> {
    Envelope(TypedEnvelope<Processed>),
    Output {
        output: Outputs,
        ctx: processing::ForwardContext<'a>,
    },
}

#[derive(Debug)]
pub struct ProcessEnvelope {
    pub envelope: ManagedEnvelope,
    pub project_info: Arc<ProjectInfo>,
    pub rate_limits: Arc<RateLimits>,
    pub sampling_project_info: Option<Arc<ProjectInfo>>,
    pub reservoir_counters: ReservoirCounters,
}

#[derive(Debug)]
struct ProcessEnvelopeGrouped<'a> {
    pub group: ProcessingGroup,
    pub envelope: ManagedEnvelope,
    pub ctx: processing::Context<'a>,
}

#[derive(Debug)]
pub struct ProcessMetrics {
    pub data: MetricData,
    pub project_key: ProjectKey,
    pub source: BucketSource,
    pub received_at: DateTime<Utc>,
    pub sent_at: Option<DateTime<Utc>>,
}

#[derive(Debug)]
pub enum MetricData {
    Raw(Vec<Item>),
    Parsed(Vec<Bucket>),
}

impl MetricData {
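    /// Converts the metric data into a list of [`Bucket`]s.
    ///
    /// Already parsed buckets are returned as-is. Raw items are parsed either
    /// from the statsd wire format or from a JSON array of buckets, and
    /// unparsable entries are dropped and logged.
    ///
    /// A rough sketch of the statsd input this accepts; the exact metric
    /// naming syntax is defined by `relay_metrics`:
    ///
    /// ```ignore
    /// // one metric per line: <name>:<value>|<type>
    /// endpoint.hits:1|c
    /// endpoint.response_time:57|d
    /// ```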
    fn into_buckets(self, timestamp: UnixTimestamp) -> Vec<Bucket> {
        let items = match self {
            Self::Parsed(buckets) => return buckets,
            Self::Raw(items) => items,
        };

        let mut buckets = Vec::new();
        for item in items {
            let payload = item.payload();
            if item.ty() == &ItemType::Statsd {
                for bucket_result in Bucket::parse_all(&payload, timestamp) {
                    match bucket_result {
                        Ok(bucket) => buckets.push(bucket),
                        Err(error) => relay_log::debug!(
                            error = &error as &dyn Error,
                            "failed to parse metric bucket from statsd format",
                        ),
                    }
                }
            } else if item.ty() == &ItemType::MetricBuckets {
                match serde_json::from_slice::<Vec<Bucket>>(&payload) {
                    Ok(parsed_buckets) => {
                        if buckets.is_empty() {
                            buckets = parsed_buckets;
                        } else {
                            buckets.extend(parsed_buckets);
                        }
                    }
                    Err(error) => {
                        relay_log::debug!(
                            error = &error as &dyn Error,
                            "failed to parse metric bucket",
                        );
                        metric!(counter(RelayCounters::MetricBucketsParsingFailed) += 1);
                    }
                }
            } else {
                relay_log::error!(
                    "invalid item of type {} passed to ProcessMetrics",
                    item.ty()
                );
            }
        }
        buckets
    }
}

#[derive(Debug)]
pub struct ProcessBatchedMetrics {
    pub payload: Bytes,
    pub source: BucketSource,
    pub received_at: DateTime<Utc>,
    pub sent_at: Option<DateTime<Utc>>,
}

#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum BucketSource {
    Internal,
    External,
}

impl BucketSource {
    pub fn from_meta(meta: &RequestMeta) -> Self {
        match meta.request_trust() {
            RequestTrust::Trusted => Self::Internal,
            RequestTrust::Untrusted => Self::External,
        }
    }
}

#[derive(Debug)]
pub struct SubmitClientReports {
    pub client_reports: Vec<ClientReport>,
    pub scoping: Scoping,
}

#[derive(Debug)]
pub enum EnvelopeProcessor {
    ProcessEnvelope(Box<ProcessEnvelope>),
    ProcessProjectMetrics(Box<ProcessMetrics>),
    ProcessBatchedMetrics(Box<ProcessBatchedMetrics>),
    FlushBuckets(Box<FlushBuckets>),
    SubmitClientReports(Box<SubmitClientReports>),
}

impl EnvelopeProcessor {
    pub fn variant(&self) -> &'static str {
        match self {
            EnvelopeProcessor::ProcessEnvelope(_) => "ProcessEnvelope",
            EnvelopeProcessor::ProcessProjectMetrics(_) => "ProcessProjectMetrics",
            EnvelopeProcessor::ProcessBatchedMetrics(_) => "ProcessBatchedMetrics",
            EnvelopeProcessor::FlushBuckets(_) => "FlushBuckets",
            EnvelopeProcessor::SubmitClientReports(_) => "SubmitClientReports",
        }
    }
}

impl relay_system::Interface for EnvelopeProcessor {}

impl FromMessage<ProcessEnvelope> for EnvelopeProcessor {
    type Response = relay_system::NoResponse;

    fn from_message(message: ProcessEnvelope, _sender: ()) -> Self {
        Self::ProcessEnvelope(Box::new(message))
    }
}

impl FromMessage<ProcessMetrics> for EnvelopeProcessor {
    type Response = NoResponse;

    fn from_message(message: ProcessMetrics, _: ()) -> Self {
        Self::ProcessProjectMetrics(Box::new(message))
    }
}

impl FromMessage<ProcessBatchedMetrics> for EnvelopeProcessor {
    type Response = NoResponse;

    fn from_message(message: ProcessBatchedMetrics, _: ()) -> Self {
        Self::ProcessBatchedMetrics(Box::new(message))
    }
}

impl FromMessage<FlushBuckets> for EnvelopeProcessor {
    type Response = NoResponse;

    fn from_message(message: FlushBuckets, _: ()) -> Self {
        Self::FlushBuckets(Box::new(message))
    }
}

impl FromMessage<SubmitClientReports> for EnvelopeProcessor {
    type Response = NoResponse;

    fn from_message(message: SubmitClientReports, _: ()) -> Self {
        Self::SubmitClientReports(Box::new(message))
    }
}

pub type EnvelopeProcessorServicePool = AsyncPool<BoxFuture<'static, ()>>;

#[derive(Clone)]
pub struct EnvelopeProcessorService {
    inner: Arc<InnerProcessor>,
}

pub struct Addrs {
    pub outcome_aggregator: Addr<TrackOutcome>,
    pub upstream_relay: Addr<UpstreamRelay>,
    #[cfg(feature = "processing")]
    pub upload: Option<Addr<UploadAttachments>>,
    #[cfg(feature = "processing")]
    pub store_forwarder: Option<Addr<Store>>,
    pub aggregator: Addr<Aggregator>,
    #[cfg(feature = "processing")]
    pub global_rate_limits: Option<Addr<GlobalRateLimits>>,
}

impl Default for Addrs {
    fn default() -> Self {
        Addrs {
            outcome_aggregator: Addr::dummy(),
            upstream_relay: Addr::dummy(),
            #[cfg(feature = "processing")]
            upload: None,
            #[cfg(feature = "processing")]
            store_forwarder: None,
            aggregator: Addr::dummy(),
            #[cfg(feature = "processing")]
            global_rate_limits: None,
        }
    }
}

struct InnerProcessor {
    pool: EnvelopeProcessorServicePool,
    config: Arc<Config>,
    global_config: GlobalConfigHandle,
    project_cache: ProjectCacheHandle,
    cogs: Cogs,
    #[cfg(feature = "processing")]
    quotas_client: Option<AsyncRedisClient>,
    addrs: Addrs,
    #[cfg(feature = "processing")]
    rate_limiter: Option<Arc<RedisRateLimiter<GlobalRateLimitsServiceHandle>>>,
    geoip_lookup: GeoIpLookup,
    #[cfg(feature = "processing")]
    cardinality_limiter: Option<CardinalityLimiter>,
    metric_outcomes: MetricOutcomes,
    processing: Processing,
}

struct Processing {
    logs: LogsProcessor,
    trace_metrics: TraceMetricsProcessor,
    spans: SpansProcessor,
    check_ins: CheckInsProcessor,
    sessions: SessionsProcessor,
}

impl EnvelopeProcessorService {
    #[cfg_attr(feature = "processing", expect(clippy::too_many_arguments))]
    pub fn new(
        pool: EnvelopeProcessorServicePool,
        config: Arc<Config>,
        global_config: GlobalConfigHandle,
        project_cache: ProjectCacheHandle,
        cogs: Cogs,
        #[cfg(feature = "processing")] redis: Option<RedisClients>,
        addrs: Addrs,
        metric_outcomes: MetricOutcomes,
    ) -> Self {
        let geoip_lookup = config
            .geoip_path()
            .and_then(
                |p| match GeoIpLookup::open(p).context(ServiceError::GeoIp) {
                    Ok(geoip) => Some(geoip),
                    Err(err) => {
                        relay_log::error!("failed to open GeoIP db {p:?}: {err:?}");
                        None
                    }
                },
            )
            .unwrap_or_else(GeoIpLookup::empty);

        #[cfg(feature = "processing")]
        let (cardinality, quotas) = match redis {
            Some(RedisClients {
                cardinality,
                quotas,
                ..
            }) => (Some(cardinality), Some(quotas)),
            None => (None, None),
        };

        #[cfg(feature = "processing")]
        let global_rate_limits = addrs.global_rate_limits.clone().map(Into::into);

        #[cfg(feature = "processing")]
        let rate_limiter = match (quotas.clone(), global_rate_limits) {
            (Some(redis), Some(global)) => {
                Some(RedisRateLimiter::new(redis, global).max_limit(config.max_rate_limit()))
            }
            _ => None,
        };

        let quota_limiter = Arc::new(QuotaRateLimiter::new(
            #[cfg(feature = "processing")]
            project_cache.clone(),
            #[cfg(feature = "processing")]
            rate_limiter.clone(),
        ));

        #[cfg(feature = "processing")]
        let rate_limiter = rate_limiter.map(Arc::new);

        let inner = InnerProcessor {
            pool,
            global_config,
            project_cache,
            cogs,
            #[cfg(feature = "processing")]
            quotas_client: quotas.clone(),
            #[cfg(feature = "processing")]
            rate_limiter,
            addrs,
            #[cfg(feature = "processing")]
            cardinality_limiter: cardinality
                .map(|cardinality| {
                    RedisSetLimiter::new(
                        RedisSetLimiterOptions {
                            cache_vacuum_interval: config
                                .cardinality_limiter_cache_vacuum_interval(),
                        },
                        cardinality,
                    )
                })
                .map(CardinalityLimiter::new),
            metric_outcomes,
            processing: Processing {
                logs: LogsProcessor::new(Arc::clone(&quota_limiter)),
                trace_metrics: TraceMetricsProcessor::new(Arc::clone(&quota_limiter)),
                spans: SpansProcessor::new(Arc::clone(&quota_limiter), geoip_lookup.clone()),
                check_ins: CheckInsProcessor::new(Arc::clone(&quota_limiter)),
                sessions: SessionsProcessor::new(quota_limiter),
            },
            geoip_lookup,
            config,
        };

        Self {
            inner: Arc::new(inner),
        }
    }

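    /// Enforces rate limits and quotas on the envelope and the extracted
    /// metrics.
    ///
    /// Limits are first enforced from the cached rate limits. In processing
    /// mode, a second consistent pass runs against Redis, and any newly
    /// created limits are merged back into the project cache.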
    async fn enforce_quotas<Group>(
        &self,
        managed_envelope: &mut TypedEnvelope<Group>,
        event: Annotated<Event>,
        extracted_metrics: &mut ProcessingExtractedMetrics,
        ctx: processing::Context<'_>,
    ) -> Result<Annotated<Event>, ProcessingError> {
        let cached_result = RateLimiter::Cached
            .enforce(managed_envelope, event, extracted_metrics, ctx)
            .await?;

        if_processing!(self.inner.config, {
            let rate_limiter = match self.inner.rate_limiter.clone() {
                Some(rate_limiter) => rate_limiter,
                None => return Ok(cached_result.event),
            };

            let consistent_result = RateLimiter::Consistent(rate_limiter)
                .enforce(
                    managed_envelope,
                    cached_result.event,
                    extracted_metrics,
                    ctx
                )
                .await?;

            if !consistent_result.rate_limits.is_empty() {
                self.inner
                    .project_cache
                    .get(managed_envelope.scoping().project_key)
                    .rate_limits()
                    .merge(consistent_result.rate_limits);
            }

            Ok(consistent_result.event)
        } else { Ok(cached_result.event) })
    }

    async fn process_errors(
        &self,
        managed_envelope: &mut TypedEnvelope<ErrorGroup>,
        project_id: ProjectId,
        mut ctx: processing::Context<'_>,
    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
        let mut event_fully_normalized = EventFullyNormalized::new(managed_envelope.envelope());
        let mut metrics = Metrics::default();
        let mut extracted_metrics = ProcessingExtractedMetrics::new();

        report::process_user_reports(managed_envelope);

        if_processing!(self.inner.config, {
            unreal::expand(managed_envelope, &self.inner.config)?;
            #[cfg(sentry)]
            playstation::expand(managed_envelope, &self.inner.config, ctx.project_info)?;
            nnswitch::expand(managed_envelope)?;
        });

        let extraction_result = event::extract(
            managed_envelope,
            &mut metrics,
            event_fully_normalized,
            &self.inner.config,
        )?;
        let mut event = extraction_result.event;

        if_processing!(self.inner.config, {
            if let Some(inner_event_fully_normalized) =
                unreal::process(managed_envelope, &mut event)?
            {
                event_fully_normalized = inner_event_fully_normalized;
            }
            #[cfg(sentry)]
            if let Some(inner_event_fully_normalized) =
                playstation::process(managed_envelope, &mut event, ctx.project_info)?
            {
                event_fully_normalized = inner_event_fully_normalized;
            }
            if let Some(inner_event_fully_normalized) =
                attachment::create_placeholders(managed_envelope, &mut event, &mut metrics)
            {
                event_fully_normalized = inner_event_fully_normalized;
            }
        });

        ctx.sampling_project_info = dynamic_sampling::validate_and_set_dsc(
            managed_envelope,
            &mut event,
            ctx.project_info,
            ctx.sampling_project_info,
        );

        let attachments = managed_envelope
            .envelope()
            .items()
            .filter(|item| item.attachment_type() == Some(&AttachmentType::Attachment));
        processing::utils::event::finalize(
            managed_envelope.envelope().headers(),
            &mut event,
            attachments,
            &mut metrics,
            ctx.config,
        )?;
        event_fully_normalized = processing::utils::event::normalize(
            managed_envelope.envelope().headers(),
            &mut event,
            event_fully_normalized,
            project_id,
            ctx,
            &self.inner.geoip_lookup,
        )?;
        let filter_run =
            processing::utils::event::filter(managed_envelope.envelope().headers(), &event, &ctx)
                .map_err(|err| {
                    managed_envelope.reject(Outcome::Filtered(err.clone()));
                    ProcessingError::EventFiltered(err)
                })?;

        if self.inner.config.processing_enabled() || matches!(filter_run, FiltersStatus::Ok) {
            dynamic_sampling::tag_error_with_sampling_decision(
                managed_envelope,
                &mut event,
                ctx.sampling_project_info,
                &self.inner.config,
            )
            .await;
        }

        event = self
            .enforce_quotas(managed_envelope, event, &mut extracted_metrics, ctx)
            .await?;

        if event.value().is_some() {
            processing::utils::event::scrub(&mut event, ctx.project_info)?;
            event::serialize(
                managed_envelope,
                &mut event,
                event_fully_normalized,
                EventMetricsExtracted(false),
                SpansExtracted(false),
            )?;
            event::emit_feedback_metrics(managed_envelope.envelope());
        }

        let attachments = managed_envelope
            .envelope_mut()
            .items_mut()
            .filter(|i| i.ty() == &ItemType::Attachment);
        processing::utils::attachments::scrub(attachments, ctx.project_info);

        if self.inner.config.processing_enabled() && !event_fully_normalized.0 {
            relay_log::error!(
                tags.project = %project_id,
                tags.ty = event_type(&event).map(|e| e.to_string()).unwrap_or("none".to_owned()),
                "ingested event without normalizing"
            );
        }

        Ok(Some(extracted_metrics))
    }

    #[allow(unused_assignments)]
    async fn process_transactions(
        &self,
        managed_envelope: &mut TypedEnvelope<TransactionGroup>,
        cogs: &mut Token,
        project_id: ProjectId,
        mut ctx: processing::Context<'_>,
    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
        let mut event_fully_normalized = EventFullyNormalized::new(managed_envelope.envelope());
        let mut event_metrics_extracted = EventMetricsExtracted(false);
        let mut spans_extracted = SpansExtracted(false);
        let mut metrics = Metrics::default();
        let mut extracted_metrics = ProcessingExtractedMetrics::new();

        let extraction_result = event::extract(
            managed_envelope,
            &mut metrics,
            event_fully_normalized,
            &self.inner.config,
        )?;

        if let Some(inner_event_metrics_extracted) = extraction_result.event_metrics_extracted {
            event_metrics_extracted = inner_event_metrics_extracted;
        }
        if let Some(inner_spans_extracted) = extraction_result.spans_extracted {
            spans_extracted = inner_spans_extracted;
        }

        let mut event = extraction_result.event;

        let profile_id = profile::filter(
            managed_envelope,
            &event,
            ctx.config,
            project_id,
            ctx.project_info,
        );
        processing::transactions::profile::transfer_id(&mut event, profile_id);
        processing::transactions::profile::remove_context_if_rate_limited(
            &mut event,
            managed_envelope.scoping(),
            ctx,
        );

        ctx.sampling_project_info = dynamic_sampling::validate_and_set_dsc(
            managed_envelope,
            &mut event,
            ctx.project_info,
            ctx.sampling_project_info,
        );

        let attachments = managed_envelope
            .envelope()
            .items()
            .filter(|item| item.attachment_type() == Some(&AttachmentType::Attachment));
        processing::utils::event::finalize(
            managed_envelope.envelope().headers(),
            &mut event,
            attachments,
            &mut metrics,
            &self.inner.config,
        )?;

        event_fully_normalized = processing::utils::event::normalize(
            managed_envelope.envelope().headers(),
            &mut event,
            event_fully_normalized,
            project_id,
            ctx,
            &self.inner.geoip_lookup,
        )?;

        let filter_run =
            processing::utils::event::filter(managed_envelope.envelope().headers(), &event, &ctx)
                .map_err(|err| {
                    managed_envelope.reject(Outcome::Filtered(err.clone()));
                    ProcessingError::EventFiltered(err)
                })?;

        let run_dynamic_sampling = (matches!(filter_run, FiltersStatus::Ok)
            || self.inner.config.processing_enabled())
            && matches!(&ctx.project_info.config.transaction_metrics, Some(ErrorBoundary::Ok(c)) if c.is_enabled());

        let sampling_result = match run_dynamic_sampling {
            true => {
                #[allow(unused_mut)]
                let mut reservoir = ReservoirEvaluator::new(Arc::clone(ctx.reservoir_counters));
                #[cfg(feature = "processing")]
                if let Some(quotas_client) = self.inner.quotas_client.as_ref() {
                    reservoir.set_redis(managed_envelope.scoping().organization_id, quotas_client);
                }
                processing::utils::dynamic_sampling::run(
                    managed_envelope.envelope().headers().dsc(),
                    &event,
                    &ctx,
                    Some(&reservoir),
                )
                .await
            }
            false => SamplingResult::Pending,
        };

        relay_statsd::metric!(
            counter(RelayCounters::SamplingDecision) += 1,
            decision = sampling_result.decision().as_str(),
            item = "transaction"
        );

        #[cfg(feature = "processing")]
        let server_sample_rate = sampling_result.sample_rate();

        if let Some(outcome) = sampling_result.into_dropped_outcome() {
            profile::process(
                managed_envelope,
                &mut event,
                ctx.global_config,
                ctx.config,
                ctx.project_info,
            );
            event_metrics_extracted = processing::transactions::extraction::extract_metrics(
                &mut event,
                &mut extracted_metrics,
                ExtractMetricsContext {
                    dsc: managed_envelope.envelope().dsc(),
                    project_id,
                    ctx,
                    sampling_decision: SamplingDecision::Drop,
                    metrics_extracted: event_metrics_extracted.0,
                    spans_extracted: spans_extracted.0,
                },
            )?;

            dynamic_sampling::drop_unsampled_items(
                managed_envelope,
                event,
                outcome,
                spans_extracted,
            );

            event = self
                .enforce_quotas(
                    managed_envelope,
                    Annotated::empty(),
                    &mut extracted_metrics,
                    ctx,
                )
                .await?;

            return Ok(Some(extracted_metrics));
        }

        let _post_ds = cogs.start_category("post_ds");

        processing::utils::event::scrub(&mut event, ctx.project_info)?;

        let attachments = managed_envelope
            .envelope_mut()
            .items_mut()
            .filter(|i| i.ty() == &ItemType::Attachment);
        processing::utils::attachments::scrub(attachments, ctx.project_info);

        if_processing!(self.inner.config, {
            let profile_id = profile::process(
                managed_envelope,
                &mut event,
                ctx.global_config,
                ctx.config,
                ctx.project_info,
            );
            processing::transactions::profile::transfer_id(&mut event, profile_id);
            processing::transactions::profile::scrub_profiler_id(&mut event);

            event_metrics_extracted = processing::transactions::extraction::extract_metrics(
                &mut event,
                &mut extracted_metrics,
                ExtractMetricsContext {
                    dsc: managed_envelope.envelope().dsc(),
                    project_id,
                    ctx,
                    sampling_decision: SamplingDecision::Keep,
                    metrics_extracted: event_metrics_extracted.0,
                    spans_extracted: spans_extracted.0,
                },
            )?;

            if let Some(spans) = processing::transactions::spans::extract_from_event(
                managed_envelope.envelope().dsc(),
                &event,
                ctx.global_config,
                ctx.config,
                server_sample_rate,
                event_metrics_extracted,
                spans_extracted,
            ) {
                spans_extracted = SpansExtracted(true);
                for item in spans {
                    match item {
                        Ok(item) => managed_envelope.envelope_mut().add_item(item),
                        Err(()) => managed_envelope.track_outcome(
                            Outcome::Invalid(DiscardReason::InvalidSpan),
                            DataCategory::SpanIndexed,
                            1,
                        ),
                    }
                }
            }
        });

        event = self
            .enforce_quotas(managed_envelope, event, &mut extracted_metrics, ctx)
            .await?;

        if event.value().is_some() {
            event::serialize(
                managed_envelope,
                &mut event,
                event_fully_normalized,
                event_metrics_extracted,
                spans_extracted,
            )?;
        }

        if self.inner.config.processing_enabled() && !event_fully_normalized.0 {
            relay_log::error!(
                tags.project = %project_id,
                tags.ty = event_type(&event).map(|e| e.to_string()).unwrap_or("none".to_owned()),
                "ingested event without normalizing"
            );
        }

        Ok(Some(extracted_metrics))
    }

    async fn process_profile_chunks(
        &self,
        managed_envelope: &mut TypedEnvelope<ProfileChunkGroup>,
        ctx: processing::Context<'_>,
    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
        profile_chunk::filter(managed_envelope, ctx.project_info);

        if_processing!(self.inner.config, {
            profile_chunk::process(
                managed_envelope,
                ctx.project_info,
                ctx.global_config,
                ctx.config,
            );
        });

        self.enforce_quotas(
            managed_envelope,
            Annotated::empty(),
            &mut ProcessingExtractedMetrics::new(),
            ctx,
        )
        .await?;

        Ok(None)
    }

    async fn process_standalone(
        &self,
        managed_envelope: &mut TypedEnvelope<StandaloneGroup>,
        project_id: ProjectId,
        ctx: processing::Context<'_>,
    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
        let mut extracted_metrics = ProcessingExtractedMetrics::new();

        standalone::process(managed_envelope);

        profile::filter(
            managed_envelope,
            &Annotated::empty(),
            ctx.config,
            project_id,
            ctx.project_info,
        );

        self.enforce_quotas(
            managed_envelope,
            Annotated::empty(),
            &mut extracted_metrics,
            ctx,
        )
        .await?;

        report::process_user_reports(managed_envelope);
        let attachments = managed_envelope
            .envelope_mut()
            .items_mut()
            .filter(|i| i.ty() == &ItemType::Attachment);
        processing::utils::attachments::scrub(attachments, ctx.project_info);

        Ok(Some(extracted_metrics))
    }

    async fn process_client_reports(
        &self,
        managed_envelope: &mut TypedEnvelope<ClientReportGroup>,
        ctx: processing::Context<'_>,
    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
        let mut extracted_metrics = ProcessingExtractedMetrics::new();

        self.enforce_quotas(
            managed_envelope,
            Annotated::empty(),
            &mut extracted_metrics,
            ctx,
        )
        .await?;

        report::process_client_reports(
            managed_envelope,
            ctx.config,
            ctx.project_info,
            self.inner.addrs.outcome_aggregator.clone(),
        );

        Ok(Some(extracted_metrics))
    }

    async fn process_replays(
        &self,
        managed_envelope: &mut TypedEnvelope<ReplayGroup>,
        ctx: processing::Context<'_>,
    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
        let mut extracted_metrics = ProcessingExtractedMetrics::new();

        replay::process(
            managed_envelope,
            ctx.global_config,
            ctx.config,
            ctx.project_info,
            &self.inner.geoip_lookup,
        )?;

        self.enforce_quotas(
            managed_envelope,
            Annotated::empty(),
            &mut extracted_metrics,
            ctx,
        )
        .await?;

        Ok(Some(extracted_metrics))
    }

    async fn process_nel(
        &self,
        mut managed_envelope: ManagedEnvelope,
        ctx: processing::Context<'_>,
    ) -> Result<ProcessingResult, ProcessingError> {
        nel::convert_to_logs(&mut managed_envelope);
        self.process_with_processor(&self.inner.processing.logs, managed_envelope, ctx)
            .await
    }

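    /// Runs an envelope through one of the typed processors of the new
    /// processing pipeline.
    ///
    /// The processor first claims its items from the envelope; the leftover
    /// envelope is accepted if it is empty and rejected as an internal error
    /// otherwise. Pipeline errors are mapped to
    /// [`ProcessingError::ProcessingFailure`].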
    async fn process_with_processor<P: processing::Processor>(
        &self,
        processor: &P,
        mut managed_envelope: ManagedEnvelope,
        ctx: processing::Context<'_>,
    ) -> Result<ProcessingResult, ProcessingError>
    where
        Outputs: From<P::Output>,
    {
        let Some(work) = processor.prepare_envelope(&mut managed_envelope) else {
            debug_assert!(
                false,
                "there must be work for the {} processor",
                std::any::type_name::<P>(),
            );
            return Err(ProcessingError::ProcessingGroupMismatch);
        };

        managed_envelope.update();
        match managed_envelope.envelope().is_empty() {
            true => managed_envelope.accept(),
            false => managed_envelope.reject(Outcome::Invalid(DiscardReason::Internal)),
        }

        processor
            .process(work, ctx)
            .await
            .map_err(|err| {
                relay_log::debug!(
                    error = &err as &dyn std::error::Error,
                    "processing pipeline failed"
                );
                ProcessingError::ProcessingFailure
            })
            .map(|o| o.map(Into::into))
            .map(ProcessingResult::Output)
    }

    async fn process_standalone_spans(
        &self,
        managed_envelope: &mut TypedEnvelope<SpanGroup>,
        _project_id: ProjectId,
        ctx: processing::Context<'_>,
    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
        let mut extracted_metrics = ProcessingExtractedMetrics::new();

        span::filter(managed_envelope, ctx.config, ctx.project_info);
        span::convert_otel_traces_data(managed_envelope);

        if_processing!(self.inner.config, {
            span::process(
                managed_envelope,
                &mut Annotated::empty(),
                &mut extracted_metrics,
                _project_id,
                ctx,
                &self.inner.geoip_lookup,
            )
            .await;
        });

        self.enforce_quotas(
            managed_envelope,
            Annotated::empty(),
            &mut extracted_metrics,
            ctx,
        )
        .await?;

        Ok(Some(extracted_metrics))
    }

    async fn process_envelope(
        &self,
        cogs: &mut Token,
        project_id: ProjectId,
        message: ProcessEnvelopeGrouped<'_>,
    ) -> Result<ProcessingResult, ProcessingError> {
        let ProcessEnvelopeGrouped {
            group,
            envelope: mut managed_envelope,
            ctx,
        } = message;

        if let Some(sampling_state) = ctx.sampling_project_info {
            managed_envelope
                .envelope_mut()
                .parametrize_dsc_transaction(&sampling_state.config.tx_name_rules);
        }

        if let Some(retention) = ctx.project_info.config.event_retention {
            managed_envelope.envelope_mut().set_retention(retention);
        }

        if let Some(retention) = ctx.project_info.config.downsampled_event_retention {
            managed_envelope
                .envelope_mut()
                .set_downsampled_retention(retention);
        }

        managed_envelope
            .envelope_mut()
            .meta_mut()
            .set_project_id(project_id);

        macro_rules! run {
            ($fn_name:ident $(, $args:expr)*) => {
                async {
                    let mut managed_envelope = (managed_envelope, group).try_into()?;
                    match self.$fn_name(&mut managed_envelope, $($args),*).await {
                        Ok(extracted_metrics) => Ok(ProcessingResult::Envelope {
                            managed_envelope: managed_envelope.into_processed(),
                            extracted_metrics: extracted_metrics.map_or(ProcessingExtractedMetrics::new(), |e| e)
                        }),
                        Err(error) => {
                            // `fn` is a keyword and cannot be used as a named format argument.
                            relay_log::trace!("Executing {fn_name} failed: {error}", fn_name = stringify!($fn_name), error = error);
                            if let Some(outcome) = error.to_outcome() {
                                managed_envelope.reject(outcome);
                            }

                            return Err(error);
                        }
                    }
                }.await
            };
        }

        relay_log::trace!("Processing {group} group", group = group.variant());

        match group {
            ProcessingGroup::Error => run!(process_errors, project_id, ctx),
            ProcessingGroup::Transaction => {
                run!(process_transactions, cogs, project_id, ctx)
            }
            ProcessingGroup::Session => {
                self.process_with_processor(&self.inner.processing.sessions, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::Standalone => run!(process_standalone, project_id, ctx),
            ProcessingGroup::ClientReport => run!(process_client_reports, ctx),
            ProcessingGroup::Replay => {
                run!(process_replays, ctx)
            }
            ProcessingGroup::CheckIn => {
                self.process_with_processor(&self.inner.processing.check_ins, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::Nel => self.process_nel(managed_envelope, ctx).await,
            ProcessingGroup::Log => {
                self.process_with_processor(&self.inner.processing.logs, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::TraceMetric => {
                self.process_with_processor(
                    &self.inner.processing.trace_metrics,
                    managed_envelope,
                    ctx,
                )
                .await
            }
            ProcessingGroup::SpanV2 => {
                self.process_with_processor(&self.inner.processing.spans, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::Span => run!(process_standalone_spans, project_id, ctx),
            ProcessingGroup::ProfileChunk => {
                run!(process_profile_chunks, ctx)
            }
            ProcessingGroup::Metrics => {
                if self.inner.config.relay_mode() != RelayMode::Proxy {
                    relay_log::error!(
                        tags.project = %project_id,
                        items = ?managed_envelope.envelope().items().next().map(Item::ty),
                        "received metrics in the process_state"
                    );
                }

                Ok(ProcessingResult::no_metrics(
                    managed_envelope.into_processed(),
                ))
            }
            ProcessingGroup::Ungrouped => {
                relay_log::error!(
                    tags.project = %project_id,
                    items = ?managed_envelope.envelope().items().next().map(Item::ty),
                    "could not identify the processing group based on the envelope's items"
                );

                Ok(ProcessingResult::no_metrics(
                    managed_envelope.into_processed(),
                ))
            }
            ProcessingGroup::ForwardUnknown => Ok(ProcessingResult::no_metrics(
                managed_envelope.into_processed(),
            )),
        }
    }

    async fn process<'a>(
        &self,
        cogs: &mut Token,
        mut message: ProcessEnvelopeGrouped<'a>,
    ) -> Result<Option<Submit<'a>>, ProcessingError> {
        let ProcessEnvelopeGrouped {
            ref mut envelope,
            ctx,
            ..
        } = message;

        let Some(project_id) = ctx
            .project_info
            .project_id
            .or_else(|| envelope.envelope().meta().project_id())
        else {
            envelope.reject(Outcome::Invalid(DiscardReason::Internal));
            return Err(ProcessingError::MissingProjectId);
        };

        let client = envelope.envelope().meta().client().map(str::to_owned);
        let user_agent = envelope.envelope().meta().user_agent().map(str::to_owned);
        let project_key = envelope.envelope().meta().public_key();
        let sampling_key = envelope
            .envelope()
            .sampling_key()
            .filter(|_| ctx.sampling_project_info.is_some());

        relay_log::configure_scope(|scope| {
            scope.set_tag("project", project_id);
            if let Some(client) = client {
                scope.set_tag("sdk", client);
            }
            if let Some(user_agent) = user_agent {
                scope.set_extra("user_agent", user_agent.into());
            }
        });

        let result = match self.process_envelope(cogs, project_id, message).await {
            Ok(ProcessingResult::Envelope {
                mut managed_envelope,
                extracted_metrics,
            }) => {
                managed_envelope.update();

                let has_metrics = !extracted_metrics.metrics.project_metrics.is_empty();
                send_metrics(
                    extracted_metrics.metrics,
                    project_key,
                    sampling_key,
                    &self.inner.addrs.aggregator,
                );

                let envelope_response = if managed_envelope.envelope().is_empty() {
                    if !has_metrics {
                        managed_envelope.reject(Outcome::RateLimited(None));
                    } else {
                        managed_envelope.accept();
                    }

                    None
                } else {
                    Some(managed_envelope)
                };

                Ok(envelope_response.map(Submit::Envelope))
            }
            Ok(ProcessingResult::Output(Output { main, metrics })) => {
                if let Some(metrics) = metrics {
                    metrics.accept(|metrics| {
                        send_metrics(
                            metrics,
                            project_key,
                            sampling_key,
                            &self.inner.addrs.aggregator,
                        );
                    });
                }

                let ctx = ctx.to_forward();
                Ok(main.map(|output| Submit::Output { output, ctx }))
            }
            Err(err) => Err(err),
        };

        relay_log::configure_scope(|scope| {
            scope.remove_tag("project");
            scope.remove_tag("sdk");
            scope.remove_extra("user_agent");
        });

        result
    }

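    /// Splits the incoming envelope into processing groups and processes each
    /// group with its own COGS token, submitting successful results upstream.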
    async fn handle_process_envelope(&self, cogs: &mut Token, message: ProcessEnvelope) {
        let project_key = message.envelope.envelope().meta().public_key();
        let wait_time = message.envelope.age();
        metric!(timer(RelayTimers::EnvelopeWaitTime) = wait_time);

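        // Cancel the message-level COGS token: cost is measured per split group below.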
        cogs.cancel();

        let scoping = message.envelope.scoping();
        for (group, envelope) in ProcessingGroup::split_envelope(
            *message.envelope.into_envelope(),
            &message.project_info,
        ) {
            let mut cogs = self
                .inner
                .cogs
                .timed(ResourceId::Relay, AppFeature::from(group));

            let mut envelope =
                ManagedEnvelope::new(envelope, self.inner.addrs.outcome_aggregator.clone());
            envelope.scope(scoping);

            let global_config = self.inner.global_config.current();

            let ctx = processing::Context {
                config: &self.inner.config,
                global_config: &global_config,
                project_info: &message.project_info,
                sampling_project_info: message.sampling_project_info.as_deref(),
                rate_limits: &message.rate_limits,
                reservoir_counters: &message.reservoir_counters,
            };

            let message = ProcessEnvelopeGrouped {
                group,
                envelope,
                ctx,
            };

            let result = metric!(
                timer(RelayTimers::EnvelopeProcessingTime),
                group = group.variant(),
                { self.process(&mut cogs, message).await }
            );

            match result {
                Ok(Some(envelope)) => self.submit_upstream(&mut cogs, envelope),
                Ok(None) => {}
                Err(error) if error.is_unexpected() => {
                    relay_log::error!(
                        tags.project_key = %project_key,
                        error = &error as &dyn Error,
                        "error processing envelope"
                    )
                }
                Err(error) => {
                    relay_log::debug!(
                        tags.project_key = %project_key,
                        error = &error as &dyn Error,
                        "error processing envelope"
                    )
                }
            }
        }
    }

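    /// Parses and validates metric buckets, corrects clock drift, and merges
    /// them into the aggregator for the given project.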
    fn handle_process_metrics(&self, cogs: &mut Token, message: ProcessMetrics) {
        let ProcessMetrics {
            data,
            project_key,
            received_at,
            sent_at,
            source,
        } = message;

        let received_timestamp =
            UnixTimestamp::from_datetime(received_at).unwrap_or(UnixTimestamp::now());

        let mut buckets = data.into_buckets(received_timestamp);
        if buckets.is_empty() {
            return;
        }
        cogs.update(relay_metrics::cogs::BySize(&buckets));

        let clock_drift_processor =
            ClockDriftProcessor::new(sent_at, received_at).at_least(MINIMUM_CLOCK_DRIFT);

        buckets.retain_mut(|bucket| {
            if let Err(error) = relay_metrics::normalize_bucket(bucket) {
                relay_log::debug!(error = &error as &dyn Error, "dropping bucket {bucket:?}");
                return false;
            }

            if !self::metrics::is_valid_namespace(bucket) {
                return false;
            }

            clock_drift_processor.process_timestamp(&mut bucket.timestamp);

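            // Overwrite the metadata with Relay's own view of `received_at`, unless
            // the bucket comes from a trusted internal source.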
            if !matches!(source, BucketSource::Internal) {
                bucket.metadata = BucketMetadata::new(received_timestamp);
            }

            true
        });

        let project = self.inner.project_cache.get(project_key);

        let buckets = match project.state() {
            ProjectState::Enabled(project_info) => {
                let rate_limits = project.rate_limits().current_limits();
                self.check_buckets(project_key, project_info, &rate_limits, buckets)
            }
            _ => buckets,
        };

        relay_log::trace!("merging metric buckets into the aggregator");
        self.inner
            .addrs
            .aggregator
            .send(MergeBuckets::new(project_key, buckets));
    }

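    /// Handles a batch of metric buckets for multiple projects.
    ///
    /// The payload is expected to be a JSON object with one bucket list per
    /// project key, for example:
    ///
    /// ```json
    /// {"buckets": {"<project_key>": [/* buckets */]}}
    /// ```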
    fn handle_process_batched_metrics(&self, cogs: &mut Token, message: ProcessBatchedMetrics) {
        let ProcessBatchedMetrics {
            payload,
            source,
            received_at,
            sent_at,
        } = message;

        #[derive(serde::Deserialize)]
        struct Wrapper {
            buckets: HashMap<ProjectKey, Vec<Bucket>>,
        }

        let buckets = match serde_json::from_slice(&payload) {
            Ok(Wrapper { buckets }) => buckets,
            Err(error) => {
                relay_log::debug!(
                    error = &error as &dyn Error,
                    "failed to parse batched metrics",
                );
                metric!(counter(RelayCounters::MetricBucketsParsingFailed) += 1);
                return;
            }
        };

        for (project_key, buckets) in buckets {
            self.handle_process_metrics(
                cogs,
                ProcessMetrics {
                    data: MetricData::Parsed(buckets),
                    project_key,
                    source,
                    received_at,
                    sent_at,
                },
            )
        }
    }

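    /// Submits a processing result either to the store (on processing Relays)
    /// or, serialized as an envelope, to the upstream HTTP endpoint.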
    fn submit_upstream(&self, cogs: &mut Token, submit: Submit<'_>) {
        let _submit = cogs.start_category("submit");

        #[cfg(feature = "processing")]
        if self.inner.config.processing_enabled()
            && let Some(store_forwarder) = &self.inner.addrs.store_forwarder
        {
            match submit {
                Submit::Envelope(envelope) => {
                    let envelope_has_attachments = envelope
                        .envelope()
                        .items()
                        .any(|item| *item.ty() == ItemType::Attachment);
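                    // Gradual rollout: route attachments to object storage based on a
                    // global sample-rate option.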
                    let use_objectstore = || {
                        let options = &self.inner.global_config.current().options;
                        utils::sample(options.objectstore_attachments_sample_rate).is_keep()
                    };

                    if let Some(upload) = &self.inner.addrs.upload
                        && envelope_has_attachments
                        && use_objectstore()
                    {
                        upload.send(UploadAttachments { envelope })
                    } else {
                        store_forwarder.send(StoreEnvelope { envelope })
                    }
                }
                Submit::Output { output, ctx } => output
                    .forward_store(store_forwarder, ctx)
                    .unwrap_or_else(|err| err.into_inner()),
            }
            return;
        }

        let mut envelope = match submit {
            Submit::Envelope(envelope) => envelope,
            Submit::Output { output, ctx } => match output.serialize_envelope(ctx) {
                Ok(envelope) => ManagedEnvelope::from(envelope).into_processed(),
                Err(_) => {
                    relay_log::error!("failed to serialize output to an envelope");
                    return;
                }
            },
        };

        if envelope.envelope_mut().is_empty() {
            envelope.accept();
            return;
        }

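        // Stamp the envelope with the current time, so the next hop can detect
        // clock drift against its own receive time.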
        envelope.envelope_mut().set_sent_at(Utc::now());

        relay_log::trace!("sending envelope to sentry endpoint");
        let http_encoding = self.inner.config.http_encoding();
        let result = envelope.envelope().to_vec().and_then(|v| {
            encode_payload(&v.into(), http_encoding).map_err(EnvelopeError::PayloadIoFailed)
        });

        match result {
            Ok(body) => {
                self.inner
                    .addrs
                    .upstream_relay
                    .send(SendRequest(SendEnvelope {
                        envelope,
                        body,
                        http_encoding,
                        project_cache: self.inner.project_cache.clone(),
                    }));
            }
            Err(error) => {
                relay_log::error!(
                    error = &error as &dyn Error,
                    tags.project_key = %envelope.scoping().project_key,
                    "failed to serialize envelope payload"
                );

                envelope.reject(Outcome::Invalid(DiscardReason::Internal));
            }
        }
    }

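    /// Wraps a batch of client reports into an envelope and submits it upstream.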
    fn handle_submit_client_reports(&self, cogs: &mut Token, message: SubmitClientReports) {
        let SubmitClientReports {
            client_reports,
            scoping,
        } = message;

        let upstream = self.inner.config.upstream_descriptor();
        let dsn = PartialDsn::outbound(&scoping, upstream);

        let mut envelope = Envelope::from_request(None, RequestMeta::outbound(dsn));
        for client_report in client_reports {
            match client_report.serialize() {
                Ok(payload) => {
                    let mut item = Item::new(ItemType::ClientReport);
                    item.set_payload(ContentType::Json, payload);
                    envelope.add_item(item);
                }
                Err(error) => {
                    relay_log::error!(
                        error = &error as &dyn std::error::Error,
                        "failed to serialize client report"
                    );
                }
            }
        }

        let envelope = ManagedEnvelope::new(envelope, self.inner.addrs.outcome_aggregator.clone());
        self.submit_upstream(cogs, Submit::Envelope(envelope.into_processed()));
    }

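    /// Validates and rate limits metric buckets against the cached (non-Redis)
    /// rate limits before they are sent to the aggregator.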
    fn check_buckets(
        &self,
        project_key: ProjectKey,
        project_info: &ProjectInfo,
        rate_limits: &RateLimits,
        buckets: Vec<Bucket>,
    ) -> Vec<Bucket> {
        let Some(scoping) = project_info.scoping(project_key) else {
            relay_log::error!(
                tags.project_key = project_key.as_str(),
                "there is no scoping: dropping {} buckets",
                buckets.len(),
            );
            return Vec::new();
        };

        let mut buckets = self::metrics::apply_project_info(
            buckets,
            &self.inner.metric_outcomes,
            project_info,
            scoping,
        );

        let namespaces: BTreeSet<MetricNamespace> = buckets
            .iter()
            .filter_map(|bucket| bucket.name.try_namespace())
            .collect();

        for namespace in namespaces {
            let limits = rate_limits.check_with_quotas(
                project_info.get_quotas(),
                scoping.metric_bucket(namespace),
            );

            if limits.is_limited() {
                let rejected;
                (buckets, rejected) = utils::split_off(buckets, |bucket| {
                    bucket.name.try_namespace() == Some(namespace)
                });

                let reason_code = limits.longest().and_then(|limit| limit.reason_code.clone());
                self.inner.metric_outcomes.track(
                    scoping,
                    &rejected,
                    Outcome::RateLimited(reason_code),
                );
            }
        }

        let quotas = project_info.config.quotas.clone();
        match MetricsLimiter::create(buckets, quotas, scoping) {
            Ok(mut bucket_limiter) => {
                bucket_limiter.enforce_limits(rate_limits, &self.inner.metric_outcomes);
                bucket_limiter.into_buckets()
            }
            Err(buckets) => buckets,
        }
    }

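    /// Checks metric buckets against Redis-backed quotas per namespace, tracking
    /// outcomes for rejected buckets and caching the resulting limits on the
    /// project.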
    #[cfg(feature = "processing")]
    async fn rate_limit_buckets(
        &self,
        scoping: Scoping,
        project_info: &ProjectInfo,
        mut buckets: Vec<Bucket>,
    ) -> Vec<Bucket> {
        let Some(rate_limiter) = &self.inner.rate_limiter else {
            return buckets;
        };

        let global_config = self.inner.global_config.current();
        let namespaces = buckets
            .iter()
            .filter_map(|bucket| bucket.name.try_namespace())
            .counts();

        let quotas = CombinedQuotas::new(&global_config, project_info.get_quotas());

        for (namespace, quantity) in namespaces {
            let item_scoping = scoping.metric_bucket(namespace);

            let limits = match rate_limiter
                .is_rate_limited(quotas, item_scoping, quantity, false)
                .await
            {
                Ok(limits) => limits,
                Err(err) => {
                    relay_log::error!(
                        error = &err as &dyn std::error::Error,
                        "failed to check redis rate limits"
                    );
                    break;
                }
            };

            if limits.is_limited() {
                let rejected;
                (buckets, rejected) = utils::split_off(buckets, |bucket| {
                    bucket.name.try_namespace() == Some(namespace)
                });

                let reason_code = limits.longest().and_then(|limit| limit.reason_code.clone());
                self.inner.metric_outcomes.track(
                    scoping,
                    &rejected,
                    Outcome::RateLimited(reason_code),
                );

                self.inner
                    .project_cache
                    .get(item_scoping.scoping.project_key)
                    .rate_limits()
                    .merge(limits);
            }
        }

        match MetricsLimiter::create(buckets, project_info.config.quotas.clone(), scoping) {
            Err(buckets) => buckets,
            Ok(bucket_limiter) => self.apply_other_rate_limits(bucket_limiter).await,
        }
    }

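    /// Applies transaction and span rate limits to the remaining buckets,
    /// consulting Redis for the current counts per data category.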
    #[cfg(feature = "processing")]
    async fn apply_other_rate_limits(&self, mut bucket_limiter: MetricsLimiter) -> Vec<Bucket> {
        relay_log::trace!("handle_rate_limit_buckets");

        let scoping = *bucket_limiter.scoping();

        if let Some(rate_limiter) = self.inner.rate_limiter.as_ref() {
            let global_config = self.inner.global_config.current();
            let quotas = CombinedQuotas::new(&global_config, bucket_limiter.quotas());

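            // Over-accept once so the quota is actually marked as exhausted;
            // subsequent checks then report the limit as active.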
            let over_accept_once = true;
            let mut rate_limits = RateLimits::new();

            for category in [DataCategory::Transaction, DataCategory::Span] {
                let count = bucket_limiter.count(category);

                let timer = Instant::now();
                let mut is_limited = false;

                if let Some(count) = count {
                    match rate_limiter
                        .is_rate_limited(quotas, scoping.item(category), count, over_accept_once)
                        .await
                    {
                        Ok(limits) => {
                            is_limited = limits.is_limited();
                            rate_limits.merge(limits)
                        }
                        Err(e) => {
                            relay_log::error!(
                                error = &e as &dyn Error,
                                "failed to check redis rate limits"
                            )
                        }
                    }
                }

                relay_statsd::metric!(
                    timer(RelayTimers::RateLimitBucketsDuration) = timer.elapsed(),
                    category = category.name(),
                    limited = if is_limited { "true" } else { "false" },
                    count = match count {
                        None => "none",
                        Some(0) => "0",
                        Some(1) => "1",
                        Some(2..=10) => "10",
                        Some(11..=25) => "25",
                        Some(26..=50) => "50",
                        Some(51..=100) => "100",
                        Some(101..=500) => "500",
                        _ => "> 500",
                    },
                );
            }

            if rate_limits.is_limited() {
                let was_enforced =
                    bucket_limiter.enforce_limits(&rate_limits, &self.inner.metric_outcomes);

                if was_enforced {
                    self.inner
                        .project_cache
                        .get(scoping.project_key)
                        .rate_limits()
                        .merge(rate_limits);
                }
            }
        }

        bucket_limiter.into_buckets()
    }

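    /// Enforces cardinality limits on metric buckets, tracking outcomes for
    /// rejected buckets unless the limiter runs in passive mode.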
    #[cfg(feature = "processing")]
    async fn cardinality_limit_buckets(
        &self,
        scoping: Scoping,
        limits: &[CardinalityLimit],
        buckets: Vec<Bucket>,
    ) -> Vec<Bucket> {
        let global_config = self.inner.global_config.current();
        let cardinality_limiter_mode = global_config.options.cardinality_limiter_mode;

        if matches!(cardinality_limiter_mode, CardinalityLimiterMode::Disabled) {
            return buckets;
        }

        let Some(ref limiter) = self.inner.cardinality_limiter else {
            return buckets;
        };

        let scope = relay_cardinality::Scoping {
            organization_id: scoping.organization_id,
            project_id: scoping.project_id,
        };

        let limits = match limiter
            .check_cardinality_limits(scope, limits, buckets)
            .await
        {
            Ok(limits) => limits,
            Err((buckets, error)) => {
                relay_log::error!(
                    error = &error as &dyn std::error::Error,
                    "cardinality limiter failed"
                );
                return buckets;
            }
        };

        let error_sample_rate = global_config.options.cardinality_limiter_error_sample_rate;
        if !limits.exceeded_limits().is_empty() && utils::sample(error_sample_rate).is_keep() {
            for limit in limits.exceeded_limits() {
                relay_log::with_scope(
                    |scope| {
                        scope.set_user(Some(relay_log::sentry::User {
                            id: Some(scoping.organization_id.to_string()),
                            ..Default::default()
                        }));
                    },
                    || {
                        relay_log::error!(
                            tags.organization_id = scoping.organization_id.value(),
                            tags.limit_id = limit.id,
                            tags.passive = limit.passive,
                            "Cardinality Limit"
                        );
                    },
                );
            }
        }

        for (limit, reports) in limits.cardinality_reports() {
            for report in reports {
                self.inner
                    .metric_outcomes
                    .cardinality(scoping, limit, report);
            }
        }

        if matches!(cardinality_limiter_mode, CardinalityLimiterMode::Passive) {
            return limits.into_source();
        }

        let CardinalityLimitsSplit { accepted, rejected } = limits.into_split();

        for (bucket, exceeded) in rejected {
            self.inner.metric_outcomes.track(
                scoping,
                &[bucket],
                Outcome::CardinalityLimited(exceeded.id.clone()),
            );
        }
        accepted
    }

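    /// Applies rate and cardinality limits to flushed buckets and forwards the
    /// remainder to the store, one project at a time.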
    #[cfg(feature = "processing")]
    async fn encode_metrics_processing(
        &self,
        message: FlushBuckets,
        store_forwarder: &Addr<Store>,
    ) {
        use crate::constants::DEFAULT_EVENT_RETENTION;
        use crate::services::store::StoreMetrics;

        for ProjectBuckets {
            buckets,
            scoping,
            project_info,
            ..
        } in message.buckets.into_values()
        {
            let buckets = self
                .rate_limit_buckets(scoping, &project_info, buckets)
                .await;

            let limits = project_info.get_cardinality_limits();
            let buckets = self
                .cardinality_limit_buckets(scoping, limits, buckets)
                .await;

            if buckets.is_empty() {
                continue;
            }

            let retention = project_info
                .config
                .event_retention
                .unwrap_or(DEFAULT_EVENT_RETENTION);

            store_forwarder.send(StoreMetrics {
                buckets,
                scoping,
                retention,
            });
        }
    }

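    /// Serializes flushed buckets into size-limited batches and submits each
    /// batch as its own envelope to the upstream, tagged with the partition key.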
    fn encode_metrics_envelope(&self, cogs: &mut Token, message: FlushBuckets) {
        let FlushBuckets {
            partition_key,
            buckets,
        } = message;

        let batch_size = self.inner.config.metrics_max_batch_size_bytes();
        let upstream = self.inner.config.upstream_descriptor();

        for ProjectBuckets {
            buckets, scoping, ..
        } in buckets.values()
        {
            let dsn = PartialDsn::outbound(scoping, upstream);

            relay_statsd::metric!(
                histogram(RelayHistograms::PartitionKeys) = u64::from(partition_key)
            );

            let mut num_batches = 0;
            for batch in BucketsView::from(buckets).by_size(batch_size) {
                let mut envelope = Envelope::from_request(None, RequestMeta::outbound(dsn.clone()));

                let mut item = Item::new(ItemType::MetricBuckets);
                item.set_source_quantities(crate::metrics::extract_quantities(batch));
                item.set_payload(ContentType::Json, serde_json::to_vec(&batch).unwrap());
                envelope.add_item(item);

                let mut envelope =
                    ManagedEnvelope::new(envelope, self.inner.addrs.outcome_aggregator.clone());
                envelope
                    .set_partition_key(Some(partition_key))
                    .scope(*scoping);

                relay_statsd::metric!(
                    histogram(RelayHistograms::BucketsPerBatch) = batch.len() as u64
                );

                self.submit_upstream(cogs, Submit::Envelope(envelope.into_processed()));
                num_batches += 1;
            }

            relay_statsd::metric!(histogram(RelayHistograms::BatchesPerPartition) = num_batches);
        }
    }

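    /// Encodes and sends one partition of buckets as a global metrics request,
    /// skipping empty partitions.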
    fn send_global_partition(&self, partition_key: u32, partition: &mut Partition<'_>) {
        if partition.is_empty() {
            return;
        }

        let (unencoded, project_info) = partition.take();
        let http_encoding = self.inner.config.http_encoding();
        let encoded = match encode_payload(&unencoded, http_encoding) {
            Ok(payload) => payload,
            Err(error) => {
                let error = &error as &dyn std::error::Error;
                relay_log::error!(error, "failed to encode metrics payload");
                return;
            }
        };

        let request = SendMetricsRequest {
            partition_key: partition_key.to_string(),
            unencoded,
            encoded,
            project_info,
            http_encoding,
            metric_outcomes: self.inner.metric_outcomes.clone(),
        };

        self.inner.addrs.upstream_relay.send(SendRequest(request));
    }

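    /// Packs flushed buckets from all projects into size-limited partitions and
    /// sends them as global metrics requests, splitting buckets that overflow a
    /// partition.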
    fn encode_metrics_global(&self, message: FlushBuckets) {
        let FlushBuckets {
            partition_key,
            buckets,
        } = message;

        let batch_size = self.inner.config.metrics_max_batch_size_bytes();
        let mut partition = Partition::new(batch_size);
        let mut partition_splits = 0;

        for ProjectBuckets {
            buckets, scoping, ..
        } in buckets.values()
        {
            for bucket in buckets {
                let mut remaining = Some(BucketView::new(bucket));

                while let Some(bucket) = remaining.take() {
                    if let Some(next) = partition.insert(bucket, *scoping) {
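                        // The bucket did not fit: flush the current partition and
                        // retry with the remainder.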
                        self.send_global_partition(partition_key, &mut partition);
                        remaining = Some(next);
                        partition_splits += 1;
                    }
                }
            }
        }

        if partition_splits > 0 {
            metric!(histogram(RelayHistograms::PartitionSplits) = partition_splits);
        }

        self.send_global_partition(partition_key, &mut partition);
    }

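    /// Applies cached rate limits to flushed buckets and dispatches them to the
    /// store (processing), the global metrics endpoint, or per-project envelopes.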
    async fn handle_flush_buckets(&self, cogs: &mut Token, mut message: FlushBuckets) {
        for (project_key, pb) in message.buckets.iter_mut() {
            let buckets = std::mem::take(&mut pb.buckets);
            pb.buckets =
                self.check_buckets(*project_key, &pb.project_info, &pb.rate_limits, buckets);
        }

        #[cfg(feature = "processing")]
        if self.inner.config.processing_enabled()
            && let Some(ref store_forwarder) = self.inner.addrs.store_forwarder
        {
            return self
                .encode_metrics_processing(message, store_forwarder)
                .await;
        }

        if self.inner.config.http_global_metrics() {
            self.encode_metrics_global(message)
        } else {
            self.encode_metrics_envelope(cogs, message)
        }
    }

    #[cfg(all(test, feature = "processing"))]
    fn redis_rate_limiter_enabled(&self) -> bool {
        self.inner.rate_limiter.is_some()
    }

    async fn handle_message(&self, message: EnvelopeProcessor) {
        let ty = message.variant();
        let feature_weights = self.feature_weights(&message);

        metric!(timer(RelayTimers::ProcessMessageDuration), message = ty, {
            let mut cogs = self.inner.cogs.timed(ResourceId::Relay, feature_weights);

            match message {
                EnvelopeProcessor::ProcessEnvelope(m) => {
                    self.handle_process_envelope(&mut cogs, *m).await
                }
                EnvelopeProcessor::ProcessProjectMetrics(m) => {
                    self.handle_process_metrics(&mut cogs, *m)
                }
                EnvelopeProcessor::ProcessBatchedMetrics(m) => {
                    self.handle_process_batched_metrics(&mut cogs, *m)
                }
                EnvelopeProcessor::FlushBuckets(m) => {
                    self.handle_flush_buckets(&mut cogs, *m).await
                }
                EnvelopeProcessor::SubmitClientReports(m) => {
                    self.handle_submit_client_reports(&mut cogs, *m)
                }
            }
        });
    }

    fn feature_weights(&self, message: &EnvelopeProcessor) -> FeatureWeights {
        match message {
            EnvelopeProcessor::ProcessEnvelope(_) => AppFeature::Unattributed.into(),
            EnvelopeProcessor::ProcessProjectMetrics(_) => AppFeature::Unattributed.into(),
            EnvelopeProcessor::ProcessBatchedMetrics(_) => AppFeature::Unattributed.into(),
            EnvelopeProcessor::FlushBuckets(v) => v
                .buckets
                .values()
                .map(|s| {
                    if self.inner.config.processing_enabled() {
                        relay_metrics::cogs::ByCount(&s.buckets).into()
                    } else {
                        relay_metrics::cogs::BySize(&s.buckets).into()
                    }
                })
                .fold(FeatureWeights::none(), FeatureWeights::merge),
            EnvelopeProcessor::SubmitClientReports(_) => AppFeature::ClientReports.into(),
        }
    }
}

impl Service for EnvelopeProcessorService {
    type Interface = EnvelopeProcessor;

    async fn run(self, mut rx: relay_system::Receiver<Self::Interface>) {
        while let Some(message) = rx.recv().await {
            let service = self.clone();
            self.inner
                .pool
                .spawn_async(
                    async move {
                        service.handle_message(message).await;
                    }
                    .boxed(),
                )
                .await;
        }
    }
}

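/// The result of applying rate limit enforcement to an event.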
struct EnforcementResult {
    event: Annotated<Event>,
    #[cfg_attr(not(feature = "processing"), expect(dead_code))]
    rate_limits: RateLimits,
}

impl EnforcementResult {
    pub fn new(event: Annotated<Event>, rate_limits: RateLimits) -> Self {
        Self { event, rate_limits }
    }
}

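/// Rate limiter backend used during envelope processing.
///
/// `Cached` checks only the rate limits cached on the project, while
/// `Consistent` additionally consults Redis for a globally consistent view.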
#[derive(Clone)]
enum RateLimiter {
    Cached,
    #[cfg(feature = "processing")]
    Consistent(Arc<RedisRateLimiter<GlobalRateLimitsServiceHandle>>),
}

impl RateLimiter {
    async fn enforce<Group>(
        &self,
        managed_envelope: &mut TypedEnvelope<Group>,
        event: Annotated<Event>,
        _extracted_metrics: &mut ProcessingExtractedMetrics,
        ctx: processing::Context<'_>,
    ) -> Result<EnforcementResult, ProcessingError> {
        if managed_envelope.envelope().is_empty() && event.value().is_none() {
            return Ok(EnforcementResult::new(event, RateLimits::default()));
        }

        let quotas = CombinedQuotas::new(ctx.global_config, ctx.project_info.get_quotas());
        if quotas.is_empty() {
            return Ok(EnforcementResult::new(event, RateLimits::default()));
        }

        let event_category = event_category(&event);

        let this = self.clone();
        let mut envelope_limiter =
            EnvelopeLimiter::new(CheckLimits::All, move |item_scope, _quantity| {
                let this = this.clone();

                async move {
                    match this {
                        #[cfg(feature = "processing")]
                        RateLimiter::Consistent(rate_limiter) => Ok::<_, ProcessingError>(
                            rate_limiter
                                .is_rate_limited(quotas, item_scope, _quantity, false)
                                .await?,
                        ),
                        _ => Ok::<_, ProcessingError>(
                            ctx.rate_limits.check_with_quotas(quotas, item_scope),
                        ),
                    }
                }
            });

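        // The event may already have been taken out of the envelope at this stage,
        // so announce its category to the limiter explicitly.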
        if let Some(category) = event_category {
            envelope_limiter.assume_event(category);
        }

        let scoping = managed_envelope.scoping();
        let (enforcement, rate_limits) = metric!(
            timer(RelayTimers::EventProcessingRateLimiting),
            type = self.name(),
            {
                envelope_limiter
                    .compute(managed_envelope.envelope_mut(), &scoping)
                    .await
            }
        )?;
        let event_active = enforcement.is_event_active();

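        // Mirror the enforcement on extracted metrics; the flag marks whether the
        // limits were checked consistently (via Redis).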
        #[cfg(feature = "processing")]
        _extracted_metrics.apply_enforcement(&enforcement, matches!(self, Self::Consistent(_)));
        enforcement.apply_with_outcomes(managed_envelope);

        if event_active {
            debug_assert!(managed_envelope.envelope().is_empty());
            return Ok(EnforcementResult::new(Annotated::empty(), rate_limits));
        }

        Ok(EnforcementResult::new(event, rate_limits))
    }

    fn name(&self) -> &'static str {
        match self {
            Self::Cached => "cached",
            #[cfg(feature = "processing")]
            Self::Consistent(_) => "consistent",
        }
    }
}

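/// Compresses an envelope body with the given HTTP `Content-Encoding`.
///
/// A minimal usage sketch (not compiled as a doc test):
///
/// ```ignore
/// let body = Bytes::from_static(b"{}");
/// let encoded = encode_payload(&body, HttpEncoding::Gzip)?;
/// ```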
pub fn encode_payload(body: &Bytes, http_encoding: HttpEncoding) -> Result<Bytes, std::io::Error> {
    let envelope_body: Vec<u8> = match http_encoding {
        HttpEncoding::Identity => return Ok(body.clone()),
        HttpEncoding::Deflate => {
            let mut encoder = ZlibEncoder::new(Vec::new(), Compression::default());
            encoder.write_all(body.as_ref())?;
            encoder.finish()?
        }
        HttpEncoding::Gzip => {
            let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
            encoder.write_all(body.as_ref())?;
            encoder.finish()?
        }
        HttpEncoding::Br => {
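            // 0 selects the default buffer size; quality 5 trades compression ratio
            // for speed, and 22 is the default window size (lgwin).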
            let mut encoder = BrotliEncoder::new(Vec::new(), 0, 5, 22);
            encoder.write_all(body.as_ref())?;
            encoder.into_inner()
        }
        HttpEncoding::Zstd => {
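            // Compression level 1 favors throughput over compression ratio.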
            let mut encoder = ZstdEncoder::new(Vec::new(), 1)?;
            encoder.write_all(body.as_ref())?;
            encoder.finish()?
        }
    };

    Ok(envelope_body.into())
}

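/// An upstream request that submits an envelope via HTTP.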
#[derive(Debug)]
pub struct SendEnvelope {
    pub envelope: TypedEnvelope<Processed>,
    pub body: Bytes,
    pub http_encoding: HttpEncoding,
    pub project_cache: ProjectCacheHandle,
}

impl UpstreamRequest for SendEnvelope {
    fn method(&self) -> reqwest::Method {
        reqwest::Method::POST
    }

    fn path(&self) -> Cow<'_, str> {
        format!("/api/{}/envelope/", self.envelope.scoping().project_id).into()
    }

    fn route(&self) -> &'static str {
        "envelope"
    }

    fn build(&mut self, builder: &mut http::RequestBuilder) -> Result<(), http::HttpError> {
        let envelope_body = self.body.clone();
        metric!(histogram(RelayHistograms::UpstreamEnvelopeBodySize) = envelope_body.len() as u64);

        let meta = &self.envelope.meta();
        let shard = self.envelope.partition_key().map(|p| p.to_string());
        builder
            .content_encoding(self.http_encoding)
            .header_opt("Origin", meta.origin().map(|url| url.as_str()))
            .header_opt("User-Agent", meta.user_agent())
            .header("X-Sentry-Auth", meta.auth_header())
            .header("X-Forwarded-For", meta.forwarded_for())
            .header("Content-Type", envelope::CONTENT_TYPE)
            .header_opt("X-Sentry-Relay-Shard", shard)
            .body(envelope_body);

        Ok(())
    }

    fn sign(&mut self) -> Option<Sign> {
        Some(Sign::Optional(SignatureType::RequestSign))
    }

    fn respond(
        self: Box<Self>,
        result: Result<http::Response, UpstreamRequestError>,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> {
        Box::pin(async move {
            let result = match result {
                Ok(mut response) => response.consume().await.map_err(UpstreamRequestError::Http),
                Err(error) => Err(error),
            };

            match result {
                Ok(()) => self.envelope.accept(),
                Err(error) if error.is_received() => {
                    let scoping = self.envelope.scoping();
                    self.envelope.accept();

                    if let UpstreamRequestError::RateLimited(limits) = error {
                        self.project_cache
                            .get(scoping.project_key)
                            .rate_limits()
                            .merge(limits.scope(&scoping));
                    }
                }
                Err(error) => {
                    let mut envelope = self.envelope;
                    envelope.reject(Outcome::Invalid(DiscardReason::Internal));
                    relay_log::error!(
                        error = &error as &dyn Error,
                        tags.project_key = %envelope.scoping().project_key,
                        "error sending envelope"
                    );
                }
            }
        })
    }
}

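/// Accumulates metric buckets for one partition until a configured maximum
/// payload size is reached.
///
/// A minimal usage sketch (not compiled as a doc test), with `max_bytes`,
/// `view`, and `scoping` standing in for real values:
///
/// ```ignore
/// let mut partition = Partition::new(max_bytes);
/// if let Some(rest) = partition.insert(view, scoping) {
///     // flush `partition.take()` and insert `rest` into the fresh partition
/// }
/// ```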
#[derive(Debug)]
struct Partition<'a> {
    max_size: usize,
    remaining: usize,
    views: HashMap<ProjectKey, Vec<BucketView<'a>>>,
    project_info: HashMap<ProjectKey, Scoping>,
}

impl<'a> Partition<'a> {
    pub fn new(size: usize) -> Self {
        Self {
            max_size: size,
            remaining: size,
            views: HashMap::new(),
            project_info: HashMap::new(),
        }
    }

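    /// Inserts a bucket into the partition, splitting it at the remaining
    /// capacity if necessary.
    ///
    /// Returns the part of the bucket that did not fit, if any.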
    pub fn insert(&mut self, bucket: BucketView<'a>, scoping: Scoping) -> Option<BucketView<'a>> {
        let (current, next) = bucket.split(self.remaining, Some(self.max_size));

        if let Some(current) = current {
            self.remaining = self.remaining.saturating_sub(current.estimated_size());
            self.views
                .entry(scoping.project_key)
                .or_default()
                .push(current);

            self.project_info
                .entry(scoping.project_key)
                .or_insert(scoping);
        }

        next
    }

    fn is_empty(&self) -> bool {
        self.views.is_empty()
    }

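    /// Serializes the partition's buckets into a JSON payload and returns it
    /// together with the project scopings, resetting the partition.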
    fn take(&mut self) -> (Bytes, HashMap<ProjectKey, Scoping>) {
        #[derive(serde::Serialize)]
        struct Wrapper<'a> {
            buckets: &'a HashMap<ProjectKey, Vec<BucketView<'a>>>,
        }

        let buckets = &self.views;
        let payload = serde_json::to_vec(&Wrapper { buckets }).unwrap().into();

        let scopings = self.project_info.clone();
        self.project_info.clear();

        self.views.clear();
        self.remaining = self.max_size;

        (payload, scopings)
    }
}

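/// An upstream request that submits a partition of metric buckets to the
/// global metrics endpoint.
///
/// The unencoded payload is kept around to sign the request and to generate
/// outcomes if the transmission fails.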
#[derive(Debug)]
struct SendMetricsRequest {
    partition_key: String,
    unencoded: Bytes,
    encoded: Bytes,
    project_info: HashMap<ProjectKey, Scoping>,
    http_encoding: HttpEncoding,
    metric_outcomes: MetricOutcomes,
}

impl SendMetricsRequest {
    fn create_error_outcomes(self) {
        #[derive(serde::Deserialize)]
        struct Wrapper {
            buckets: HashMap<ProjectKey, Vec<MinimalTrackableBucket>>,
        }

        let buckets = match serde_json::from_slice(&self.unencoded) {
            Ok(Wrapper { buckets }) => buckets,
            Err(err) => {
                relay_log::error!(
                    error = &err as &dyn std::error::Error,
                    "failed to parse buckets from failed transmission"
                );
                return;
            }
        };

        for (key, buckets) in buckets {
            let Some(&scoping) = self.project_info.get(&key) else {
                relay_log::error!("missing scoping for project key");
                continue;
            };

            self.metric_outcomes.track(
                scoping,
                &buckets,
                Outcome::Invalid(DiscardReason::Internal),
            );
        }
    }
}

impl UpstreamRequest for SendMetricsRequest {
    fn set_relay_id(&self) -> bool {
        true
    }

    fn sign(&mut self) -> Option<Sign> {
        Some(Sign::Required(SignatureType::Body(self.unencoded.clone())))
    }

    fn method(&self) -> reqwest::Method {
        reqwest::Method::POST
    }

    fn path(&self) -> Cow<'_, str> {
        "/api/0/relays/metrics/".into()
    }

    fn route(&self) -> &'static str {
        "global_metrics"
    }

    fn build(&mut self, builder: &mut http::RequestBuilder) -> Result<(), http::HttpError> {
        metric!(histogram(RelayHistograms::UpstreamMetricsBodySize) = self.encoded.len() as u64);

        builder
            .content_encoding(self.http_encoding)
            .header("X-Sentry-Relay-Shard", self.partition_key.as_bytes())
            .header(header::CONTENT_TYPE, b"application/json")
            .body(self.encoded.clone());

        Ok(())
    }

    fn respond(
        self: Box<Self>,
        result: Result<http::Response, UpstreamRequestError>,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> {
        Box::pin(async {
            match result {
                Ok(mut response) => {
                    response.consume().await.ok();
                }
                Err(error) => {
                    relay_log::error!(
                        error = &error as &dyn Error,
                        "failed to send metrics batch"
                    );

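                    // The upstream received the request and is responsible for outcomes;
                    // only generate outcomes for batches that never arrived.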
                    if error.is_received() {
                        return;
                    }

                    self.create_error_outcomes()
                }
            }
        })
    }
}

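/// Container for global and project-level [`Quota`]s.
///
/// Iteration yields the global quotas first, followed by the project quotas:
///
/// ```ignore
/// let quotas = CombinedQuotas::new(&global_config, project_info.get_quotas());
/// for quota in quotas { /* global quotas, then project quotas */ }
/// ```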
#[derive(Copy, Clone, Debug)]
struct CombinedQuotas<'a> {
    global_quotas: &'a [Quota],
    project_quotas: &'a [Quota],
}

impl<'a> CombinedQuotas<'a> {
    pub fn new(global_config: &'a GlobalConfig, project_quotas: &'a [Quota]) -> Self {
        Self {
            global_quotas: &global_config.quotas,
            project_quotas,
        }
    }

    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    pub fn len(&self) -> usize {
        self.global_quotas.len() + self.project_quotas.len()
    }
}

impl<'a> IntoIterator for CombinedQuotas<'a> {
    type Item = &'a Quota;
    type IntoIter = std::iter::Chain<std::slice::Iter<'a, Quota>, std::slice::Iter<'a, Quota>>;

    fn into_iter(self) -> Self::IntoIter {
        self.global_quotas.iter().chain(self.project_quotas.iter())
    }
}

#[cfg(test)]
mod tests {
    use std::collections::BTreeMap;

    use insta::assert_debug_snapshot;
    use relay_base_schema::metrics::{DurationUnit, MetricUnit};
    use relay_common::glob2::LazyGlob;
    use relay_dynamic_config::ProjectConfig;
    use relay_event_normalization::{
        MeasurementsConfig, NormalizationConfig, RedactionRule, TransactionNameConfig,
        TransactionNameRule,
    };
    use relay_event_schema::protocol::TransactionSource;
    use relay_pii::DataScrubbingConfig;
    use similar_asserts::assert_eq;

    use crate::metrics_extraction::IntoMetric;
    use crate::metrics_extraction::transactions::types::{
        CommonTags, TransactionMeasurementTags, TransactionMetric,
    };
    use crate::testutils::{create_test_processor, create_test_processor_with_addrs};

    #[cfg(feature = "processing")]
    use {
        relay_metrics::BucketValue,
        relay_quotas::{QuotaScope, ReasonCode},
        relay_test::mock_service,
    };

    use super::*;

    #[cfg(feature = "processing")]
    fn mock_quota(id: &str) -> Quota {
        Quota {
            id: Some(id.into()),
            categories: smallvec::smallvec![DataCategory::MetricBucket],
            scope: QuotaScope::Organization,
            scope_id: None,
            limit: Some(0),
            window: None,
            reason_code: None,
            namespace: None,
        }
    }

    #[cfg(feature = "processing")]
    #[test]
    fn test_dynamic_quotas() {
        let global_config = GlobalConfig {
            quotas: vec![mock_quota("foo"), mock_quota("bar")],
            ..Default::default()
        };

        let project_quotas = vec![mock_quota("baz"), mock_quota("qux")];

        let dynamic_quotas = CombinedQuotas::new(&global_config, &project_quotas);

        assert_eq!(dynamic_quotas.len(), 4);
        assert!(!dynamic_quotas.is_empty());

        let quota_ids = dynamic_quotas.into_iter().filter_map(|q| q.id.as_deref());
        assert!(quota_ids.eq(["foo", "bar", "baz", "qux"]));
    }

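    // Ensures that a rate-limited org's buckets are dropped, while buckets of
    // other orgs in the same flush are still forwarded to the store.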
    #[cfg(feature = "processing")]
    #[tokio::test]
    async fn test_ratelimit_per_batch() {
        use relay_base_schema::organization::OrganizationId;
        use relay_protocol::FiniteF64;

        let rate_limited_org = Scoping {
            organization_id: OrganizationId::new(1),
            project_id: ProjectId::new(21),
            project_key: ProjectKey::parse("00000000000000000000000000000000").unwrap(),
            key_id: Some(17),
        };

        let not_rate_limited_org = Scoping {
            organization_id: OrganizationId::new(2),
            project_id: ProjectId::new(21),
            project_key: ProjectKey::parse("11111111111111111111111111111111").unwrap(),
            key_id: Some(17),
        };

        let message = {
            let project_info = {
                let quota = Quota {
                    id: Some("testing".into()),
                    categories: vec![DataCategory::MetricBucket].into(),
                    scope: relay_quotas::QuotaScope::Organization,
                    scope_id: Some(rate_limited_org.organization_id.to_string()),
                    limit: Some(0),
                    window: None,
                    reason_code: Some(ReasonCode::new("test")),
                    namespace: None,
                };

                let mut config = ProjectConfig::default();
                config.quotas.push(quota);

                Arc::new(ProjectInfo {
                    config,
                    ..Default::default()
                })
            };

            let project_metrics = |scoping| ProjectBuckets {
                buckets: vec![Bucket {
                    name: "d:transactions/bar".into(),
                    value: BucketValue::Counter(FiniteF64::new(1.0).unwrap()),
                    timestamp: UnixTimestamp::now(),
                    tags: Default::default(),
                    width: 10,
                    metadata: BucketMetadata::default(),
                }],
                rate_limits: Default::default(),
                project_info: project_info.clone(),
                scoping,
            };

            let buckets = hashbrown::HashMap::from([
                (
                    rate_limited_org.project_key,
                    project_metrics(rate_limited_org),
                ),
                (
                    not_rate_limited_org.project_key,
                    project_metrics(not_rate_limited_org),
                ),
            ]);

            FlushBuckets {
                partition_key: 0,
                buckets,
            }
        };

        assert_eq!(message.buckets.keys().count(), 2);

        let config = {
            let config_json = serde_json::json!({
                "processing": {
                    "enabled": true,
                    "kafka_config": [],
                    "redis": {
                        "server": std::env::var("RELAY_REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_owned()),
                    }
                }
            });
            Config::from_json_value(config_json).unwrap()
        };

        let (store, handle) = {
            let f = |org_ids: &mut Vec<OrganizationId>, msg: Store| {
                let org_id = match msg {
                    Store::Metrics(x) => x.scoping.organization_id,
                    _ => panic!("received envelope when expecting only metrics"),
                };
                org_ids.push(org_id);
            };

            mock_service("store_forwarder", vec![], f)
        };

        let processor = create_test_processor(config).await;
        assert!(processor.redis_rate_limiter_enabled());

        processor.encode_metrics_processing(message, &store).await;

        drop(store);
        let orgs_not_ratelimited = handle.await.unwrap();

        assert_eq!(
            orgs_not_ratelimited,
            vec![not_rate_limited_org.organization_id]
        );
    }

    #[tokio::test]
    async fn test_browser_version_extraction_with_pii_like_data() {
        let processor = create_test_processor(Default::default()).await;
        let outcome_aggregator = Addr::dummy();
        let event_id = EventId::new();

        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
            .parse()
            .unwrap();

        let request_meta = RequestMeta::new(dsn);
        let mut envelope = Envelope::from_request(Some(event_id), request_meta);

        envelope.add_item({
            let mut item = Item::new(ItemType::Event);
            item.set_payload(
                ContentType::Json,
                r#"
                {
                    "request": {
                        "headers": [
                            ["User-Agent", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36"]
                        ]
                    }
                }
                "#,
            );
            item
        });

        let mut datascrubbing_settings = DataScrubbingConfig::default();
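        // Enable all the default scrubbing options.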
        datascrubbing_settings.scrub_data = true;
        datascrubbing_settings.scrub_defaults = true;
        datascrubbing_settings.scrub_ip_addresses = true;

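        // Make sure to mask any IP-like looking data.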
        let pii_config = serde_json::from_str(r#"{"applications": {"**": ["@ip:mask"]}}"#).unwrap();

        let config = ProjectConfig {
            datascrubbing_settings,
            pii_config: Some(pii_config),
            ..Default::default()
        };

        let project_info = ProjectInfo {
            config,
            ..Default::default()
        };

        let mut envelopes = ProcessingGroup::split_envelope(*envelope, &Default::default());
        assert_eq!(envelopes.len(), 1);

        let (group, envelope) = envelopes.pop().unwrap();
        let envelope = ManagedEnvelope::new(envelope, outcome_aggregator);

        let message = ProcessEnvelopeGrouped {
            group,
            envelope,
            ctx: processing::Context {
                project_info: &project_info,
                ..processing::Context::for_test()
            },
        };

        let Ok(Some(Submit::Envelope(mut new_envelope))) =
            processor.process(&mut Token::noop(), message).await
        else {
            panic!();
        };
        let new_envelope = new_envelope.envelope_mut();

        let event_item = new_envelope.items().last().unwrap();
        let annotated_event: Annotated<Event> =
            Annotated::from_json_bytes(&event_item.payload()).unwrap();
        let event = annotated_event.into_value().unwrap();
        let headers = event
            .request
            .into_value()
            .unwrap()
            .headers
            .into_value()
            .unwrap();

        assert_eq!(
            Some(
                "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/********* Safari/537.36"
            ),
            headers.get_header("User-Agent")
        );

        let contexts = event.contexts.into_value().unwrap();
        let browser = contexts.0.get("browser").unwrap();
        assert_eq!(
            r#"{"browser":"Chrome 103.0.0","name":"Chrome","version":"103.0.0","type":"browser"}"#,
            browser.to_json().unwrap()
        );
    }

    #[tokio::test]
    #[cfg(feature = "processing")]
    async fn test_materialize_dsc() {
        use crate::services::projects::project::PublicKeyConfig;

        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
            .parse()
            .unwrap();
        let request_meta = RequestMeta::new(dsn);
        let mut envelope = Envelope::from_request(None, request_meta);

        let dsc = r#"{
            "trace_id": "00000000-0000-0000-0000-000000000000",
            "public_key": "e12d836b15bb49d7bbf99e64295d995b",
            "sample_rate": "0.2"
        }"#;
        envelope.set_dsc(serde_json::from_str(dsc).unwrap());

        let mut item = Item::new(ItemType::Event);
        item.set_payload(ContentType::Json, r#"{}"#);
        envelope.add_item(item);

        let outcome_aggregator = Addr::dummy();
        let managed_envelope = ManagedEnvelope::new(envelope, outcome_aggregator);

        let mut project_info = ProjectInfo::default();
        project_info.public_keys.push(PublicKeyConfig {
            public_key: ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(),
            numeric_id: Some(1),
        });

        let config = serde_json::json!({
            "processing": {
                "enabled": true,
                "kafka_config": [],
            }
        });

        let message = ProcessEnvelopeGrouped {
            group: ProcessingGroup::Transaction,
            envelope: managed_envelope,
            ctx: processing::Context {
                config: &Config::from_json_value(config.clone()).unwrap(),
                project_info: &project_info,
                sampling_project_info: Some(&project_info),
                ..processing::Context::for_test()
            },
        };

        let processor = create_test_processor(Config::from_json_value(config).unwrap()).await;
        let Ok(Some(Submit::Envelope(envelope))) =
            processor.process(&mut Token::noop(), message).await
        else {
            panic!();
        };
        let event = envelope
            .envelope()
            .get_item_by(|item| item.ty() == &ItemType::Event)
            .unwrap();

        let event = Annotated::<Event>::from_json_bytes(&event.payload()).unwrap();
        insta::assert_debug_snapshot!(event.value().unwrap()._dsc, @r###"
        Object(
            {
                "environment": ~,
                "public_key": String(
                    "e12d836b15bb49d7bbf99e64295d995b",
                ),
                "release": ~,
                "replay_id": ~,
                "sample_rate": String(
                    "0.2",
                ),
                "trace_id": String(
                    "00000000000000000000000000000000",
                ),
                "transaction": ~,
            },
        )
        "###);
    }

    fn capture_test_event(transaction_name: &str, source: TransactionSource) -> Vec<String> {
        let mut event = Annotated::<Event>::from_json(
            r#"
            {
                "type": "transaction",
                "transaction": "/foo/",
                "timestamp": 946684810.0,
                "start_timestamp": 946684800.0,
                "contexts": {
                    "trace": {
                        "trace_id": "4c79f60c11214eb38604f4ae0781bfb2",
                        "span_id": "fa90fdead5f74053",
                        "op": "http.server",
                        "type": "trace"
                    }
                },
                "transaction_info": {
                    "source": "url"
                }
            }
            "#,
        )
        .unwrap();
        let e = event.value_mut().as_mut().unwrap();
        e.transaction.set_value(Some(transaction_name.into()));

        e.transaction_info
            .value_mut()
            .as_mut()
            .unwrap()
            .source
            .set_value(Some(source));

        relay_statsd::with_capturing_test_client(|| {
            utils::log_transaction_name_metrics(&mut event, |event| {
                let config = NormalizationConfig {
                    transaction_name_config: TransactionNameConfig {
                        rules: &[TransactionNameRule {
                            pattern: LazyGlob::new("/foo/*/**".to_owned()),
                            expiry: DateTime::<Utc>::MAX_UTC,
                            redaction: RedactionRule::Replace {
                                substitution: "*".to_owned(),
                            },
                        }],
                    },
                    ..Default::default()
                };
                relay_event_normalization::normalize_event(event, &config)
            });
        })
    }

    #[test]
    fn test_log_transaction_metrics_none() {
        let captures = capture_test_event("/nothing", TransactionSource::Url);
        insta::assert_debug_snapshot!(captures, @r###"
        [
            "event.transaction_name_changes:1|c|#source_in:url,changes:none,source_out:sanitized,is_404:false",
        ]
        "###);
    }

    #[test]
    fn test_log_transaction_metrics_rule() {
        let captures = capture_test_event("/foo/john/denver", TransactionSource::Url);
        insta::assert_debug_snapshot!(captures, @r###"
        [
            "event.transaction_name_changes:1|c|#source_in:url,changes:rule,source_out:sanitized,is_404:false",
        ]
        "###);
    }

    #[test]
    fn test_log_transaction_metrics_pattern() {
        let captures = capture_test_event("/something/12345", TransactionSource::Url);
        insta::assert_debug_snapshot!(captures, @r###"
        [
            "event.transaction_name_changes:1|c|#source_in:url,changes:pattern,source_out:sanitized,is_404:false",
        ]
        "###);
    }

    #[test]
    fn test_log_transaction_metrics_both() {
        let captures = capture_test_event("/foo/john/12345", TransactionSource::Url);
        insta::assert_debug_snapshot!(captures, @r###"
        [
            "event.transaction_name_changes:1|c|#source_in:url,changes:both,source_out:sanitized,is_404:false",
        ]
        "###);
    }

    #[test]
    fn test_log_transaction_metrics_no_match() {
        let captures = capture_test_event("/foo/john/12345", TransactionSource::Route);
        insta::assert_debug_snapshot!(captures, @r###"
        [
            "event.transaction_name_changes:1|c|#source_in:route,changes:none,source_out:route,is_404:false",
        ]
        "###);
    }

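    // Verifies that the hard-coded `MEASUREMENT_MRI_OVERHEAD` constant matches the
    // overhead derived from building an actual measurement MRI.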
    #[test]
    fn test_mri_overhead_constant() {
        let hardcoded_value = MeasurementsConfig::MEASUREMENT_MRI_OVERHEAD;

        let derived_value = {
            let name = "foobar".to_owned();
            let value = 5.into();
            let unit = MetricUnit::Duration(DurationUnit::default());
            let tags = TransactionMeasurementTags {
                measurement_rating: None,
                universal_tags: CommonTags(BTreeMap::new()),
                score_profile_version: None,
            };

            let measurement = TransactionMetric::Measurement {
                name: name.clone(),
                value,
                unit,
                tags,
            };

            let metric: Bucket = measurement.into_metric(UnixTimestamp::now());
            metric.name.len() - unit.to_string().len() - name.len()
        };
        assert_eq!(
            hardcoded_value, derived_value,
            "Update `MEASUREMENT_MRI_OVERHEAD` if the naming scheme changed."
        );
    }

    #[tokio::test]
    async fn test_process_metrics_bucket_metadata() {
        let mut token = Cogs::noop().timed(ResourceId::Relay, AppFeature::Unattributed);
        let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
        let received_at = Utc::now();
        let config = Config::default();

        let (aggregator, mut aggregator_rx) = Addr::custom();
        let processor = create_test_processor_with_addrs(
            config,
            Addrs {
                aggregator,
                ..Default::default()
            },
        )
        .await;

        let mut item = Item::new(ItemType::Statsd);
        item.set_payload(
            ContentType::Text,
            "transactions/foo:3182887624:4267882815|s",
        );
        for (source, expected_received_at) in [
            (
                BucketSource::External,
                Some(UnixTimestamp::from_datetime(received_at).unwrap()),
            ),
            (BucketSource::Internal, None),
        ] {
            let message = ProcessMetrics {
                data: MetricData::Raw(vec![item.clone()]),
                project_key,
                source,
                received_at,
                sent_at: Some(Utc::now()),
            };
            processor.handle_process_metrics(&mut token, message);

            let Aggregator::MergeBuckets(merge_buckets) = aggregator_rx.recv().await.unwrap();
            let buckets = merge_buckets.buckets;
            assert_eq!(buckets.len(), 1);
            assert_eq!(buckets[0].metadata.received_at, expected_received_at);
        }
    }

    #[tokio::test]
    async fn test_process_batched_metrics() {
        let mut token = Cogs::noop().timed(ResourceId::Relay, AppFeature::Unattributed);
        let received_at = Utc::now();
        let config = Config::default();

        let (aggregator, mut aggregator_rx) = Addr::custom();
        let processor = create_test_processor_with_addrs(
            config,
            Addrs {
                aggregator,
                ..Default::default()
            },
        )
        .await;

        let payload = r#"{
    "buckets": {
        "11111111111111111111111111111111": [
            {
                "timestamp": 1615889440,
                "width": 0,
                "name": "d:custom/endpoint.response_time@millisecond",
                "type": "d",
                "value": [
                    68.0
                ],
                "tags": {
                    "route": "user_index"
                }
            }
        ],
        "22222222222222222222222222222222": [
            {
                "timestamp": 1615889440,
                "width": 0,
                "name": "d:custom/endpoint.cache_rate@none",
                "type": "d",
                "value": [
                    36.0
                ]
            }
        ]
    }
}
"#;
        let message = ProcessBatchedMetrics {
            payload: Bytes::from(payload),
            source: BucketSource::Internal,
            received_at,
            sent_at: Some(Utc::now()),
        };
        processor.handle_process_batched_metrics(&mut token, message);

        let Aggregator::MergeBuckets(mb1) = aggregator_rx.recv().await.unwrap();
        let Aggregator::MergeBuckets(mb2) = aggregator_rx.recv().await.unwrap();

        let mut messages = vec![mb1, mb2];
        messages.sort_by_key(|mb| mb.project_key);

        let actual = messages
            .into_iter()
            .map(|mb| (mb.project_key, mb.buckets))
            .collect::<Vec<_>>();

        assert_debug_snapshot!(actual, @r###"
        [
            (
                ProjectKey("11111111111111111111111111111111"),
                [
                    Bucket {
                        timestamp: UnixTimestamp(1615889440),
                        width: 0,
                        name: MetricName(
                            "d:custom/endpoint.response_time@millisecond",
                        ),
                        value: Distribution(
                            [
                                68.0,
                            ],
                        ),
                        tags: {
                            "route": "user_index",
                        },
                        metadata: BucketMetadata {
                            merges: 1,
                            received_at: None,
                            extracted_from_indexed: false,
                        },
                    },
                ],
            ),
            (
                ProjectKey("22222222222222222222222222222222"),
                [
                    Bucket {
                        timestamp: UnixTimestamp(1615889440),
                        width: 0,
                        name: MetricName(
                            "d:custom/endpoint.cache_rate@none",
                        ),
                        value: Distribution(
                            [
                                36.0,
                            ],
                        ),
                        tags: {},
                        metadata: BucketMetadata {
                            merges: 1,
                            received_at: None,
                            extracted_from_indexed: false,
                        },
                    },
                ],
            ),
        ]
        "###);
    }
}