1use std::borrow::Cow;
2use std::collections::{BTreeSet, HashMap};
3use std::error::Error;
4use std::fmt::{Debug, Display};
5use std::future::Future;
6use std::io::Write;
7use std::pin::Pin;
8use std::sync::Arc;
9use std::time::Duration;
10
11use anyhow::Context;
12use brotli::CompressorWriter as BrotliEncoder;
13use bytes::Bytes;
14use chrono::{DateTime, Utc};
15use flate2::Compression;
16use flate2::write::{GzEncoder, ZlibEncoder};
17use futures::FutureExt;
18use futures::future::BoxFuture;
19use relay_base_schema::project::{ProjectId, ProjectKey};
20use relay_cogs::{AppFeature, Cogs, FeatureWeights, ResourceId, Token};
21use relay_common::time::UnixTimestamp;
22use relay_config::{Config, HttpEncoding, RelayMode};
23use relay_dynamic_config::{ErrorBoundary, Feature, GlobalConfig};
24use relay_event_normalization::{ClockDriftProcessor, GeoIpLookup};
25use relay_event_schema::processor::ProcessingAction;
26use relay_event_schema::protocol::{
27 ClientReport, Event, EventId, Metrics, NetworkReportError, SpanV2,
28};
29use relay_filter::FilterStatKey;
30use relay_metrics::{Bucket, BucketMetadata, BucketView, BucketsView, MetricNamespace};
31use relay_pii::PiiConfigError;
32use relay_protocol::Annotated;
33use relay_quotas::{DataCategory, Quota, RateLimits, Scoping};
34use relay_sampling::evaluation::{ReservoirCounters, ReservoirEvaluator, SamplingDecision};
35use relay_statsd::metric;
36use relay_system::{Addr, FromMessage, NoResponse, Service};
37use reqwest::header;
38use smallvec::{SmallVec, smallvec};
39use zstd::stream::Encoder as ZstdEncoder;
40
41use crate::envelope::{
42 self, AttachmentType, ContentType, Envelope, EnvelopeError, Item, ItemContainer, ItemType,
43};
44use crate::extractors::{PartialDsn, RequestMeta, RequestTrust};
45use crate::integrations::{Integration, SpansIntegration};
46use crate::managed::{InvalidProcessingGroupType, ManagedEnvelope, TypedEnvelope};
47use crate::metrics::{MetricOutcomes, MetricsLimiter, MinimalTrackableBucket};
48use crate::metrics_extraction::transactions::ExtractedMetrics;
49use crate::metrics_extraction::transactions::types::ExtractMetricsError;
50use crate::processing::check_ins::CheckInsProcessor;
51use crate::processing::logs::LogsProcessor;
52use crate::processing::profile_chunks::ProfileChunksProcessor;
53use crate::processing::sessions::SessionsProcessor;
54use crate::processing::spans::SpansProcessor;
55use crate::processing::trace_attachments::TraceAttachmentsProcessor;
56use crate::processing::trace_metrics::TraceMetricsProcessor;
57use crate::processing::transactions::extraction::ExtractMetricsContext;
58use crate::processing::utils::event::{
59 EventFullyNormalized, EventMetricsExtracted, FiltersStatus, SpansExtracted, event_category,
60 event_type,
61};
62use crate::processing::{Forward as _, Output, Outputs, QuotaRateLimiter};
63use crate::service::ServiceError;
64use crate::services::global_config::GlobalConfigHandle;
65use crate::services::metrics::{Aggregator, FlushBuckets, MergeBuckets, ProjectBuckets};
66use crate::services::outcome::{DiscardItemType, DiscardReason, Outcome, TrackOutcome};
67use crate::services::projects::cache::ProjectCacheHandle;
68use crate::services::projects::project::{ProjectInfo, ProjectState};
69use crate::services::upstream::{
70 SendRequest, Sign, SignatureType, UpstreamRelay, UpstreamRequest, UpstreamRequestError,
71};
72use crate::statsd::{RelayCounters, RelayDistributions, RelayTimers};
73use crate::utils::{self, CheckLimits, EnvelopeLimiter, SamplingResult};
74use crate::{http, processing};
75use relay_threading::AsyncPool;
76#[cfg(feature = "processing")]
77use {
78 crate::services::processor::nnswitch::SwitchProcessingError,
79 crate::services::store::{Store, StoreEnvelope},
80 crate::services::upload::Upload,
81 crate::utils::Enforcement,
82 itertools::Itertools,
83 relay_cardinality::{
84 CardinalityLimit, CardinalityLimiter, CardinalityLimitsSplit, RedisSetLimiter,
85 RedisSetLimiterOptions,
86 },
87 relay_dynamic_config::CardinalityLimiterMode,
88 relay_quotas::{RateLimitingError, RedisRateLimiter},
89 relay_redis::{AsyncRedisClient, RedisClients},
90 std::time::Instant,
91 symbolic_unreal::{Unreal4Error, Unreal4ErrorKind},
92};
93
94mod attachment;
95mod dynamic_sampling;
96mod event;
97mod metrics;
98mod nel;
99mod profile;
100mod replay;
101mod report;
102mod span;
103
104#[cfg(all(sentry, feature = "processing"))]
105mod playstation;
106mod standalone;
107#[cfg(feature = "processing")]
108mod unreal;
109
110#[cfg(feature = "processing")]
111mod nnswitch;
112
113macro_rules! if_processing {
117 ($config:expr, $if_true:block) => {
118 #[cfg(feature = "processing")] {
119 if $config.processing_enabled() $if_true
120 }
121 };
122 ($config:expr, $if_true:block else $if_false:block) => {
123 {
124 #[cfg(feature = "processing")] {
125 if $config.processing_enabled() $if_true else $if_false
126 }
127 #[cfg(not(feature = "processing"))] {
128 $if_false
129 }
130 }
131 };
132}
133
134pub const MINIMUM_CLOCK_DRIFT: Duration = Duration::from_secs(55 * 60);
136
137#[derive(Debug)]
138pub struct GroupTypeError;
139
140impl Display for GroupTypeError {
141 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
142 f.write_str("failed to convert processing group into corresponding type")
143 }
144}
145
146impl std::error::Error for GroupTypeError {}
147
148macro_rules! processing_group {
149 ($ty:ident, $variant:ident$(, $($other:ident),+)?) => {
150 #[derive(Clone, Copy, Debug)]
151 pub struct $ty;
152
153 impl From<$ty> for ProcessingGroup {
154 fn from(_: $ty) -> Self {
155 ProcessingGroup::$variant
156 }
157 }
158
159 impl TryFrom<ProcessingGroup> for $ty {
160 type Error = GroupTypeError;
161
162 fn try_from(value: ProcessingGroup) -> Result<Self, Self::Error> {
163 if matches!(value, ProcessingGroup::$variant) {
164 return Ok($ty);
165 }
166 $($(
167 if matches!(value, ProcessingGroup::$other) {
168 return Ok($ty);
169 }
170 )+)?
171 return Err(GroupTypeError);
172 }
173 }
174 };
175}
176
177pub trait EventProcessing {}
181
182processing_group!(TransactionGroup, Transaction);
183impl EventProcessing for TransactionGroup {}
184
185processing_group!(ErrorGroup, Error);
186impl EventProcessing for ErrorGroup {}
187
188processing_group!(SessionGroup, Session);
189processing_group!(StandaloneGroup, Standalone);
190processing_group!(ClientReportGroup, ClientReport);
191processing_group!(ReplayGroup, Replay);
192processing_group!(CheckInGroup, CheckIn);
193processing_group!(LogGroup, Log, Nel);
194processing_group!(TraceMetricGroup, TraceMetric);
195processing_group!(SpanGroup, Span);
196
197processing_group!(ProfileChunkGroup, ProfileChunk);
198processing_group!(MetricsGroup, Metrics);
199processing_group!(ForwardUnknownGroup, ForwardUnknown);
200processing_group!(Ungrouped, Ungrouped);
201
202#[derive(Clone, Copy, Debug)]
206pub struct Processed;
207
208#[derive(Clone, Copy, Debug)]
210pub enum ProcessingGroup {
211 Transaction,
215 Error,
220 Session,
222 Standalone,
225 ClientReport,
227 Replay,
229 CheckIn,
231 Nel,
233 Log,
235 TraceMetric,
237 Span,
239 SpanV2,
241 Metrics,
243 ProfileChunk,
245 TraceAttachment,
247 ForwardUnknown,
250 Ungrouped,
252}
253
254impl ProcessingGroup {
255 fn split_envelope(
257 mut envelope: Envelope,
258 project_info: &ProjectInfo,
259 ) -> SmallVec<[(Self, Box<Envelope>); 3]> {
260 let headers = envelope.headers().clone();
261 let mut grouped_envelopes = smallvec![];
262
263 let replay_items = envelope.take_items_by(|item| {
265 matches!(
266 item.ty(),
267 &ItemType::ReplayEvent | &ItemType::ReplayRecording | &ItemType::ReplayVideo
268 )
269 });
270 if !replay_items.is_empty() {
271 grouped_envelopes.push((
272 ProcessingGroup::Replay,
273 Envelope::from_parts(headers.clone(), replay_items),
274 ))
275 }
276
277 let session_items = envelope
279 .take_items_by(|item| matches!(item.ty(), &ItemType::Session | &ItemType::Sessions));
280 if !session_items.is_empty() {
281 grouped_envelopes.push((
282 ProcessingGroup::Session,
283 Envelope::from_parts(headers.clone(), session_items),
284 ))
285 }
286
287 let span_v2_items = envelope.take_items_by(|item| {
288 let exp_feature = project_info.has_feature(Feature::SpanV2ExperimentalProcessing);
289 let otlp_feature = project_info.has_feature(Feature::SpanV2OtlpProcessing);
290 let is_supported_integration = {
291 matches!(
292 item.integration(),
293 Some(Integration::Spans(SpansIntegration::OtelV1 { .. }))
294 )
295 };
296 let is_span = matches!(item.ty(), &ItemType::Span);
297 let is_span_attachment = item.is_span_attachment();
298
299 ItemContainer::<SpanV2>::is_container(item)
300 || (exp_feature && is_span)
301 || ((exp_feature || otlp_feature) && is_supported_integration)
302 || (exp_feature && is_span_attachment)
303 });
304
305 if !span_v2_items.is_empty() {
306 grouped_envelopes.push((
307 ProcessingGroup::SpanV2,
308 Envelope::from_parts(headers.clone(), span_v2_items),
309 ))
310 }
311
312 let span_items = envelope.take_items_by(|item| {
314 matches!(item.ty(), &ItemType::Span)
315 || matches!(item.integration(), Some(Integration::Spans(_)))
316 });
317
318 if !span_items.is_empty() {
319 grouped_envelopes.push((
320 ProcessingGroup::Span,
321 Envelope::from_parts(headers.clone(), span_items),
322 ))
323 }
324
325 let logs_items = envelope.take_items_by(|item| {
327 matches!(item.ty(), &ItemType::Log)
328 || matches!(item.integration(), Some(Integration::Logs(_)))
329 });
330 if !logs_items.is_empty() {
331 grouped_envelopes.push((
332 ProcessingGroup::Log,
333 Envelope::from_parts(headers.clone(), logs_items),
334 ))
335 }
336
337 let trace_metric_items =
339 envelope.take_items_by(|item| matches!(item.ty(), &ItemType::TraceMetric));
340 if !trace_metric_items.is_empty() {
341 grouped_envelopes.push((
342 ProcessingGroup::TraceMetric,
343 Envelope::from_parts(headers.clone(), trace_metric_items),
344 ))
345 }
346
347 let nel_items = envelope.take_items_by(|item| matches!(item.ty(), &ItemType::Nel));
349 if !nel_items.is_empty() {
350 grouped_envelopes.push((
351 ProcessingGroup::Nel,
352 Envelope::from_parts(headers.clone(), nel_items),
353 ))
354 }
355
356 let metric_items = envelope.take_items_by(|i| i.ty().is_metrics());
361 if !metric_items.is_empty() {
362 grouped_envelopes.push((
363 ProcessingGroup::Metrics,
364 Envelope::from_parts(headers.clone(), metric_items),
365 ))
366 }
367
368 let profile_chunk_items =
370 envelope.take_items_by(|item| matches!(item.ty(), &ItemType::ProfileChunk));
371 if !profile_chunk_items.is_empty() {
372 grouped_envelopes.push((
373 ProcessingGroup::ProfileChunk,
374 Envelope::from_parts(headers.clone(), profile_chunk_items),
375 ))
376 }
377
378 let trace_attachment_items = envelope.take_items_by(Item::is_trace_attachment);
379 if !trace_attachment_items.is_empty() {
380 grouped_envelopes.push((
381 ProcessingGroup::TraceAttachment,
382 Envelope::from_parts(headers.clone(), trace_attachment_items),
383 ))
384 }
385
386 if !envelope.items().any(Item::creates_event) {
391 let standalone_items = envelope.take_items_by(Item::requires_event);
392 if !standalone_items.is_empty() {
393 grouped_envelopes.push((
394 ProcessingGroup::Standalone,
395 Envelope::from_parts(headers.clone(), standalone_items),
396 ))
397 }
398 };
399
400 let security_reports_items = envelope
402 .take_items_by(|i| matches!(i.ty(), &ItemType::RawSecurity))
403 .into_iter()
404 .map(|item| {
405 let headers = headers.clone();
406 let items: SmallVec<[Item; 3]> = smallvec![item.clone()];
407 let mut envelope = Envelope::from_parts(headers, items);
408 envelope.set_event_id(EventId::new());
409 (ProcessingGroup::Error, envelope)
410 });
411 grouped_envelopes.extend(security_reports_items);
412
413 let require_event_items = envelope.take_items_by(Item::requires_event);
415 if !require_event_items.is_empty() {
416 let group = if require_event_items
417 .iter()
418 .any(|item| matches!(item.ty(), &ItemType::Transaction | &ItemType::Profile))
419 {
420 ProcessingGroup::Transaction
421 } else {
422 ProcessingGroup::Error
423 };
424
425 grouped_envelopes.push((
426 group,
427 Envelope::from_parts(headers.clone(), require_event_items),
428 ))
429 }
430
431 let envelopes = envelope.items_mut().map(|item| {
433 let headers = headers.clone();
434 let items: SmallVec<[Item; 3]> = smallvec![item.clone()];
435 let envelope = Envelope::from_parts(headers, items);
436 let item_type = item.ty();
437 let group = if matches!(item_type, &ItemType::CheckIn) {
438 ProcessingGroup::CheckIn
439 } else if matches!(item.ty(), &ItemType::ClientReport) {
440 ProcessingGroup::ClientReport
441 } else if matches!(item_type, &ItemType::Unknown(_)) {
442 ProcessingGroup::ForwardUnknown
443 } else {
444 ProcessingGroup::Ungrouped
446 };
447
448 (group, envelope)
449 });
450 grouped_envelopes.extend(envelopes);
451
452 grouped_envelopes
453 }
454
455 pub fn variant(&self) -> &'static str {
457 match self {
458 ProcessingGroup::Transaction => "transaction",
459 ProcessingGroup::Error => "error",
460 ProcessingGroup::Session => "session",
461 ProcessingGroup::Standalone => "standalone",
462 ProcessingGroup::ClientReport => "client_report",
463 ProcessingGroup::Replay => "replay",
464 ProcessingGroup::CheckIn => "check_in",
465 ProcessingGroup::Log => "log",
466 ProcessingGroup::TraceMetric => "trace_metric",
467 ProcessingGroup::Nel => "nel",
468 ProcessingGroup::Span => "span",
469 ProcessingGroup::SpanV2 => "span_v2",
470 ProcessingGroup::Metrics => "metrics",
471 ProcessingGroup::ProfileChunk => "profile_chunk",
472 ProcessingGroup::TraceAttachment => "trace_attachment",
473 ProcessingGroup::ForwardUnknown => "forward_unknown",
474 ProcessingGroup::Ungrouped => "ungrouped",
475 }
476 }
477}
478
479impl From<ProcessingGroup> for AppFeature {
480 fn from(value: ProcessingGroup) -> Self {
481 match value {
482 ProcessingGroup::Transaction => AppFeature::Transactions,
483 ProcessingGroup::Error => AppFeature::Errors,
484 ProcessingGroup::Session => AppFeature::Sessions,
485 ProcessingGroup::Standalone => AppFeature::UnattributedEnvelope,
486 ProcessingGroup::ClientReport => AppFeature::ClientReports,
487 ProcessingGroup::Replay => AppFeature::Replays,
488 ProcessingGroup::CheckIn => AppFeature::CheckIns,
489 ProcessingGroup::Log => AppFeature::Logs,
490 ProcessingGroup::TraceMetric => AppFeature::TraceMetrics,
491 ProcessingGroup::Nel => AppFeature::Logs,
492 ProcessingGroup::Span => AppFeature::Spans,
493 ProcessingGroup::SpanV2 => AppFeature::Spans,
494 ProcessingGroup::Metrics => AppFeature::UnattributedMetrics,
495 ProcessingGroup::ProfileChunk => AppFeature::Profiles,
496 ProcessingGroup::ForwardUnknown => AppFeature::UnattributedEnvelope,
497 ProcessingGroup::Ungrouped => AppFeature::UnattributedEnvelope,
498 ProcessingGroup::TraceAttachment => AppFeature::TraceAttachments,
499 }
500 }
501}
502
503#[derive(Debug, thiserror::Error)]
505pub enum ProcessingError {
506 #[error("invalid json in event")]
507 InvalidJson(#[source] serde_json::Error),
508
509 #[error("invalid message pack event payload")]
510 InvalidMsgpack(#[from] rmp_serde::decode::Error),
511
512 #[cfg(feature = "processing")]
513 #[error("invalid unreal crash report")]
514 InvalidUnrealReport(#[source] Unreal4Error),
515
516 #[error("event payload too large")]
517 PayloadTooLarge(DiscardItemType),
518
519 #[error("invalid transaction event")]
520 InvalidTransaction,
521
522 #[error("envelope processor failed")]
523 ProcessingFailed(#[from] ProcessingAction),
524
525 #[error("duplicate {0} in event")]
526 DuplicateItem(ItemType),
527
528 #[error("failed to extract event payload")]
529 NoEventPayload,
530
531 #[error("missing project id in DSN")]
532 MissingProjectId,
533
534 #[error("invalid security report type: {0:?}")]
535 InvalidSecurityType(Bytes),
536
537 #[error("unsupported security report type")]
538 UnsupportedSecurityType,
539
540 #[error("invalid security report")]
541 InvalidSecurityReport(#[source] serde_json::Error),
542
543 #[error("invalid nel report")]
544 InvalidNelReport(#[source] NetworkReportError),
545
546 #[error("event filtered with reason: {0:?}")]
547 EventFiltered(FilterStatKey),
548
549 #[error("missing or invalid required event timestamp")]
550 InvalidTimestamp,
551
552 #[error("could not serialize event payload")]
553 SerializeFailed(#[source] serde_json::Error),
554
555 #[cfg(feature = "processing")]
556 #[error("failed to apply quotas")]
557 QuotasFailed(#[from] RateLimitingError),
558
559 #[error("invalid pii config")]
560 PiiConfigError(PiiConfigError),
561
562 #[error("invalid processing group type")]
563 InvalidProcessingGroup(Box<InvalidProcessingGroupType>),
564
565 #[error("invalid replay")]
566 InvalidReplay(DiscardReason),
567
568 #[error("replay filtered with reason: {0:?}")]
569 ReplayFiltered(FilterStatKey),
570
571 #[cfg(feature = "processing")]
572 #[error("nintendo switch dying message processing failed {0:?}")]
573 InvalidNintendoDyingMessage(#[source] SwitchProcessingError),
574
575 #[cfg(all(sentry, feature = "processing"))]
576 #[error("playstation dump processing failed: {0}")]
577 InvalidPlaystationDump(String),
578
579 #[error("processing group does not match specific processor")]
580 ProcessingGroupMismatch,
581 #[error("new processing pipeline failed")]
582 ProcessingFailure,
583}
584
585impl ProcessingError {
586 pub fn to_outcome(&self) -> Option<Outcome> {
587 match self {
588 Self::PayloadTooLarge(payload_type) => {
589 Some(Outcome::Invalid(DiscardReason::TooLarge(*payload_type)))
590 }
591 Self::InvalidJson(_) => Some(Outcome::Invalid(DiscardReason::InvalidJson)),
592 Self::InvalidMsgpack(_) => Some(Outcome::Invalid(DiscardReason::InvalidMsgpack)),
593 Self::InvalidSecurityType(_) => {
594 Some(Outcome::Invalid(DiscardReason::SecurityReportType))
595 }
596 Self::InvalidSecurityReport(_) => Some(Outcome::Invalid(DiscardReason::SecurityReport)),
597 Self::UnsupportedSecurityType => Some(Outcome::Filtered(FilterStatKey::InvalidCsp)),
598 Self::InvalidNelReport(_) => Some(Outcome::Invalid(DiscardReason::InvalidJson)),
599 Self::InvalidTransaction => Some(Outcome::Invalid(DiscardReason::InvalidTransaction)),
600 Self::InvalidTimestamp => Some(Outcome::Invalid(DiscardReason::Timestamp)),
601 Self::DuplicateItem(_) => Some(Outcome::Invalid(DiscardReason::DuplicateItem)),
602 Self::NoEventPayload => Some(Outcome::Invalid(DiscardReason::NoEventPayload)),
603 #[cfg(feature = "processing")]
604 Self::InvalidNintendoDyingMessage(_) => Some(Outcome::Invalid(DiscardReason::Payload)),
605 #[cfg(all(sentry, feature = "processing"))]
606 Self::InvalidPlaystationDump(_) => Some(Outcome::Invalid(DiscardReason::Payload)),
607 #[cfg(feature = "processing")]
608 Self::InvalidUnrealReport(err) if err.kind() == Unreal4ErrorKind::BadCompression => {
609 Some(Outcome::Invalid(DiscardReason::InvalidCompression))
610 }
611 #[cfg(feature = "processing")]
612 Self::InvalidUnrealReport(_) => Some(Outcome::Invalid(DiscardReason::ProcessUnreal)),
613 Self::SerializeFailed(_) | Self::ProcessingFailed(_) => {
614 Some(Outcome::Invalid(DiscardReason::Internal))
615 }
616 #[cfg(feature = "processing")]
617 Self::QuotasFailed(_) => Some(Outcome::Invalid(DiscardReason::Internal)),
618 Self::PiiConfigError(_) => Some(Outcome::Invalid(DiscardReason::ProjectStatePii)),
619 Self::MissingProjectId => None,
620 Self::EventFiltered(_) => None,
621 Self::InvalidProcessingGroup(_) => None,
622 Self::InvalidReplay(reason) => Some(Outcome::Invalid(*reason)),
623 Self::ReplayFiltered(key) => Some(Outcome::Filtered(key.clone())),
624
625 Self::ProcessingGroupMismatch => Some(Outcome::Invalid(DiscardReason::Internal)),
626 Self::ProcessingFailure => None,
628 }
629 }
630
631 fn is_unexpected(&self) -> bool {
632 self.to_outcome()
633 .is_some_and(|outcome| outcome.is_unexpected())
634 }
635}
636
637#[cfg(feature = "processing")]
638impl From<Unreal4Error> for ProcessingError {
639 fn from(err: Unreal4Error) -> Self {
640 match err.kind() {
641 Unreal4ErrorKind::TooLarge => Self::PayloadTooLarge(ItemType::UnrealReport.into()),
642 _ => ProcessingError::InvalidUnrealReport(err),
643 }
644 }
645}
646
647impl From<ExtractMetricsError> for ProcessingError {
648 fn from(error: ExtractMetricsError) -> Self {
649 match error {
650 ExtractMetricsError::MissingTimestamp | ExtractMetricsError::InvalidTimestamp => {
651 Self::InvalidTimestamp
652 }
653 }
654 }
655}
656
657impl From<InvalidProcessingGroupType> for ProcessingError {
658 fn from(value: InvalidProcessingGroupType) -> Self {
659 Self::InvalidProcessingGroup(Box::new(value))
660 }
661}
662
663type ExtractedEvent = (Annotated<Event>, usize);
664
665#[derive(Debug)]
670pub struct ProcessingExtractedMetrics {
671 metrics: ExtractedMetrics,
672}
673
674impl ProcessingExtractedMetrics {
675 pub fn new() -> Self {
676 Self {
677 metrics: ExtractedMetrics::default(),
678 }
679 }
680
681 pub fn into_inner(self) -> ExtractedMetrics {
682 self.metrics
683 }
684
685 pub fn extend(
687 &mut self,
688 extracted: ExtractedMetrics,
689 sampling_decision: Option<SamplingDecision>,
690 ) {
691 self.extend_project_metrics(extracted.project_metrics, sampling_decision);
692 self.extend_sampling_metrics(extracted.sampling_metrics, sampling_decision);
693 }
694
695 pub fn extend_project_metrics<I>(
697 &mut self,
698 buckets: I,
699 sampling_decision: Option<SamplingDecision>,
700 ) where
701 I: IntoIterator<Item = Bucket>,
702 {
703 self.metrics
704 .project_metrics
705 .extend(buckets.into_iter().map(|mut bucket| {
706 bucket.metadata.extracted_from_indexed =
707 sampling_decision == Some(SamplingDecision::Keep);
708 bucket
709 }));
710 }
711
712 pub fn extend_sampling_metrics<I>(
714 &mut self,
715 buckets: I,
716 sampling_decision: Option<SamplingDecision>,
717 ) where
718 I: IntoIterator<Item = Bucket>,
719 {
720 self.metrics
721 .sampling_metrics
722 .extend(buckets.into_iter().map(|mut bucket| {
723 bucket.metadata.extracted_from_indexed =
724 sampling_decision == Some(SamplingDecision::Keep);
725 bucket
726 }));
727 }
728
729 #[cfg(feature = "processing")]
734 fn apply_enforcement(&mut self, enforcement: &Enforcement, enforced_consistently: bool) {
735 let mut drop_namespaces: SmallVec<[_; 2]> = smallvec![];
737 let mut reset_extracted_from_indexed: SmallVec<[_; 2]> = smallvec![];
740
741 for (namespace, limit, indexed) in [
742 (
743 MetricNamespace::Transactions,
744 &enforcement.event,
745 &enforcement.event_indexed,
746 ),
747 (
748 MetricNamespace::Spans,
749 &enforcement.spans,
750 &enforcement.spans_indexed,
751 ),
752 ] {
753 if limit.is_active() {
754 drop_namespaces.push(namespace);
755 } else if indexed.is_active() && !enforced_consistently {
756 reset_extracted_from_indexed.push(namespace);
761 }
762 }
763
764 if !drop_namespaces.is_empty() || !reset_extracted_from_indexed.is_empty() {
765 self.retain_mut(|bucket| {
766 let Some(namespace) = bucket.name.try_namespace() else {
767 return true;
768 };
769
770 if drop_namespaces.contains(&namespace) {
771 return false;
772 }
773
774 if reset_extracted_from_indexed.contains(&namespace) {
775 bucket.metadata.extracted_from_indexed = false;
776 }
777
778 true
779 });
780 }
781 }
782
783 #[cfg(feature = "processing")]
784 fn retain_mut(&mut self, mut f: impl FnMut(&mut Bucket) -> bool) {
785 self.metrics.project_metrics.retain_mut(&mut f);
786 self.metrics.sampling_metrics.retain_mut(&mut f);
787 }
788}
789
790fn send_metrics(
791 metrics: ExtractedMetrics,
792 project_key: ProjectKey,
793 sampling_key: Option<ProjectKey>,
794 aggregator: &Addr<Aggregator>,
795) {
796 let ExtractedMetrics {
797 project_metrics,
798 sampling_metrics,
799 } = metrics;
800
801 if !project_metrics.is_empty() {
802 aggregator.send(MergeBuckets {
803 project_key,
804 buckets: project_metrics,
805 });
806 }
807
808 if !sampling_metrics.is_empty() {
809 let sampling_project_key = sampling_key.unwrap_or(project_key);
816 aggregator.send(MergeBuckets {
817 project_key: sampling_project_key,
818 buckets: sampling_metrics,
819 });
820 }
821}
822
823fn should_filter(config: &Config, project_info: &ProjectInfo, feature: Feature) -> bool {
828 match config.relay_mode() {
829 RelayMode::Proxy => false,
830 RelayMode::Managed => !project_info.has_feature(feature),
831 }
832}
833
834#[derive(Debug)]
837#[expect(
838 clippy::large_enum_variant,
839 reason = "until we have a better solution to combat the excessive growth of Item, see #4819"
840)]
841enum ProcessingResult {
842 Envelope {
843 managed_envelope: TypedEnvelope<Processed>,
844 extracted_metrics: ProcessingExtractedMetrics,
845 },
846 Output(Output<Outputs>),
847}
848
849impl ProcessingResult {
850 fn no_metrics(managed_envelope: TypedEnvelope<Processed>) -> Self {
852 Self::Envelope {
853 managed_envelope,
854 extracted_metrics: ProcessingExtractedMetrics::new(),
855 }
856 }
857}
858
859#[derive(Debug)]
861#[expect(
862 clippy::large_enum_variant,
863 reason = "until we have a better solution to combat the excessive growth of Item, see #4819"
864)]
865enum Submit<'a> {
866 Envelope(TypedEnvelope<Processed>),
868 Output {
870 output: Outputs,
871 ctx: processing::ForwardContext<'a>,
872 },
873}
874
875#[derive(Debug)]
885pub struct ProcessEnvelope {
886 pub envelope: ManagedEnvelope,
888 pub project_info: Arc<ProjectInfo>,
890 pub rate_limits: Arc<RateLimits>,
892 pub sampling_project_info: Option<Arc<ProjectInfo>>,
894 pub reservoir_counters: ReservoirCounters,
896}
897
898#[derive(Debug)]
900struct ProcessEnvelopeGrouped<'a> {
901 pub group: ProcessingGroup,
903 pub envelope: ManagedEnvelope,
905 pub ctx: processing::Context<'a>,
907}
908
909#[derive(Debug)]
921pub struct ProcessMetrics {
922 pub data: MetricData,
924 pub project_key: ProjectKey,
926 pub source: BucketSource,
928 pub received_at: DateTime<Utc>,
930 pub sent_at: Option<DateTime<Utc>>,
933}
934
935#[derive(Debug)]
937pub enum MetricData {
938 Raw(Vec<Item>),
940 Parsed(Vec<Bucket>),
942}
943
944impl MetricData {
945 fn into_buckets(self, timestamp: UnixTimestamp) -> Vec<Bucket> {
950 let items = match self {
951 Self::Parsed(buckets) => return buckets,
952 Self::Raw(items) => items,
953 };
954
955 let mut buckets = Vec::new();
956 for item in items {
957 let payload = item.payload();
958 if item.ty() == &ItemType::Statsd {
959 for bucket_result in Bucket::parse_all(&payload, timestamp) {
960 match bucket_result {
961 Ok(bucket) => buckets.push(bucket),
962 Err(error) => relay_log::debug!(
963 error = &error as &dyn Error,
964 "failed to parse metric bucket from statsd format",
965 ),
966 }
967 }
968 } else if item.ty() == &ItemType::MetricBuckets {
969 match serde_json::from_slice::<Vec<Bucket>>(&payload) {
970 Ok(parsed_buckets) => {
971 if buckets.is_empty() {
973 buckets = parsed_buckets;
974 } else {
975 buckets.extend(parsed_buckets);
976 }
977 }
978 Err(error) => {
979 relay_log::debug!(
980 error = &error as &dyn Error,
981 "failed to parse metric bucket",
982 );
983 metric!(counter(RelayCounters::MetricBucketsParsingFailed) += 1);
984 }
985 }
986 } else {
987 relay_log::error!(
988 "invalid item of type {} passed to ProcessMetrics",
989 item.ty()
990 );
991 }
992 }
993 buckets
994 }
995}
996
997#[derive(Debug)]
998pub struct ProcessBatchedMetrics {
999 pub payload: Bytes,
1001 pub source: BucketSource,
1003 pub received_at: DateTime<Utc>,
1005 pub sent_at: Option<DateTime<Utc>>,
1007}
1008
1009#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
1011pub enum BucketSource {
1012 Internal,
1018 External,
1023}
1024
1025impl BucketSource {
1026 pub fn from_meta(meta: &RequestMeta) -> Self {
1028 match meta.request_trust() {
1029 RequestTrust::Trusted => Self::Internal,
1030 RequestTrust::Untrusted => Self::External,
1031 }
1032 }
1033}
1034
1035#[derive(Debug)]
1037pub struct SubmitClientReports {
1038 pub client_reports: Vec<ClientReport>,
1040 pub scoping: Scoping,
1042}
1043
1044#[derive(Debug)]
1046pub enum EnvelopeProcessor {
1047 ProcessEnvelope(Box<ProcessEnvelope>),
1048 ProcessProjectMetrics(Box<ProcessMetrics>),
1049 ProcessBatchedMetrics(Box<ProcessBatchedMetrics>),
1050 FlushBuckets(Box<FlushBuckets>),
1051 SubmitClientReports(Box<SubmitClientReports>),
1052}
1053
1054impl EnvelopeProcessor {
1055 pub fn variant(&self) -> &'static str {
1057 match self {
1058 EnvelopeProcessor::ProcessEnvelope(_) => "ProcessEnvelope",
1059 EnvelopeProcessor::ProcessProjectMetrics(_) => "ProcessProjectMetrics",
1060 EnvelopeProcessor::ProcessBatchedMetrics(_) => "ProcessBatchedMetrics",
1061 EnvelopeProcessor::FlushBuckets(_) => "FlushBuckets",
1062 EnvelopeProcessor::SubmitClientReports(_) => "SubmitClientReports",
1063 }
1064 }
1065}
1066
1067impl relay_system::Interface for EnvelopeProcessor {}
1068
1069impl FromMessage<ProcessEnvelope> for EnvelopeProcessor {
1070 type Response = relay_system::NoResponse;
1071
1072 fn from_message(message: ProcessEnvelope, _sender: ()) -> Self {
1073 Self::ProcessEnvelope(Box::new(message))
1074 }
1075}
1076
1077impl FromMessage<ProcessMetrics> for EnvelopeProcessor {
1078 type Response = NoResponse;
1079
1080 fn from_message(message: ProcessMetrics, _: ()) -> Self {
1081 Self::ProcessProjectMetrics(Box::new(message))
1082 }
1083}
1084
1085impl FromMessage<ProcessBatchedMetrics> for EnvelopeProcessor {
1086 type Response = NoResponse;
1087
1088 fn from_message(message: ProcessBatchedMetrics, _: ()) -> Self {
1089 Self::ProcessBatchedMetrics(Box::new(message))
1090 }
1091}
1092
1093impl FromMessage<FlushBuckets> for EnvelopeProcessor {
1094 type Response = NoResponse;
1095
1096 fn from_message(message: FlushBuckets, _: ()) -> Self {
1097 Self::FlushBuckets(Box::new(message))
1098 }
1099}
1100
1101impl FromMessage<SubmitClientReports> for EnvelopeProcessor {
1102 type Response = NoResponse;
1103
1104 fn from_message(message: SubmitClientReports, _: ()) -> Self {
1105 Self::SubmitClientReports(Box::new(message))
1106 }
1107}
1108
1109pub type EnvelopeProcessorServicePool = AsyncPool<BoxFuture<'static, ()>>;
1111
1112#[derive(Clone)]
1116pub struct EnvelopeProcessorService {
1117 inner: Arc<InnerProcessor>,
1118}
1119
1120pub struct Addrs {
1122 pub outcome_aggregator: Addr<TrackOutcome>,
1123 pub upstream_relay: Addr<UpstreamRelay>,
1124 #[cfg(feature = "processing")]
1125 pub upload: Option<Addr<Upload>>,
1126 #[cfg(feature = "processing")]
1127 pub store_forwarder: Option<Addr<Store>>,
1128 pub aggregator: Addr<Aggregator>,
1129}
1130
1131impl Default for Addrs {
1132 fn default() -> Self {
1133 Addrs {
1134 outcome_aggregator: Addr::dummy(),
1135 upstream_relay: Addr::dummy(),
1136 #[cfg(feature = "processing")]
1137 upload: None,
1138 #[cfg(feature = "processing")]
1139 store_forwarder: None,
1140 aggregator: Addr::dummy(),
1141 }
1142 }
1143}
1144
1145struct InnerProcessor {
1146 pool: EnvelopeProcessorServicePool,
1147 config: Arc<Config>,
1148 global_config: GlobalConfigHandle,
1149 project_cache: ProjectCacheHandle,
1150 cogs: Cogs,
1151 #[cfg(feature = "processing")]
1152 quotas_client: Option<AsyncRedisClient>,
1153 addrs: Addrs,
1154 #[cfg(feature = "processing")]
1155 rate_limiter: Option<Arc<RedisRateLimiter>>,
1156 geoip_lookup: GeoIpLookup,
1157 #[cfg(feature = "processing")]
1158 cardinality_limiter: Option<CardinalityLimiter>,
1159 metric_outcomes: MetricOutcomes,
1160 processing: Processing,
1161}
1162
1163struct Processing {
1164 logs: LogsProcessor,
1165 trace_metrics: TraceMetricsProcessor,
1166 spans: SpansProcessor,
1167 check_ins: CheckInsProcessor,
1168 sessions: SessionsProcessor,
1169 profile_chunks: ProfileChunksProcessor,
1170 trace_attachments: TraceAttachmentsProcessor,
1171}
1172
1173impl EnvelopeProcessorService {
1174 #[cfg_attr(feature = "processing", expect(clippy::too_many_arguments))]
1176 pub fn new(
1177 pool: EnvelopeProcessorServicePool,
1178 config: Arc<Config>,
1179 global_config: GlobalConfigHandle,
1180 project_cache: ProjectCacheHandle,
1181 cogs: Cogs,
1182 #[cfg(feature = "processing")] redis: Option<RedisClients>,
1183 addrs: Addrs,
1184 metric_outcomes: MetricOutcomes,
1185 ) -> Self {
1186 let geoip_lookup = config
1187 .geoip_path()
1188 .and_then(
1189 |p| match GeoIpLookup::open(p).context(ServiceError::GeoIp) {
1190 Ok(geoip) => Some(geoip),
1191 Err(err) => {
1192 relay_log::error!("failed to open GeoIP db {p:?}: {err:?}");
1193 None
1194 }
1195 },
1196 )
1197 .unwrap_or_else(GeoIpLookup::empty);
1198
1199 #[cfg(feature = "processing")]
1200 let (cardinality, quotas) = match redis {
1201 Some(RedisClients {
1202 cardinality,
1203 quotas,
1204 ..
1205 }) => (Some(cardinality), Some(quotas)),
1206 None => (None, None),
1207 };
1208
1209 #[cfg(feature = "processing")]
1210 let rate_limiter = quotas.clone().map(|redis| {
1211 RedisRateLimiter::new(redis)
1212 .max_limit(config.max_rate_limit())
1213 .cache(config.quota_cache_ratio(), config.quota_cache_max())
1214 });
1215
1216 let quota_limiter = Arc::new(QuotaRateLimiter::new(
1217 #[cfg(feature = "processing")]
1218 project_cache.clone(),
1219 #[cfg(feature = "processing")]
1220 rate_limiter.clone(),
1221 ));
1222 #[cfg(feature = "processing")]
1223 let rate_limiter = rate_limiter.map(Arc::new);
1224
1225 let inner = InnerProcessor {
1226 pool,
1227 global_config,
1228 project_cache,
1229 cogs,
1230 #[cfg(feature = "processing")]
1231 quotas_client: quotas.clone(),
1232 #[cfg(feature = "processing")]
1233 rate_limiter,
1234 addrs,
1235 #[cfg(feature = "processing")]
1236 cardinality_limiter: cardinality
1237 .map(|cardinality| {
1238 RedisSetLimiter::new(
1239 RedisSetLimiterOptions {
1240 cache_vacuum_interval: config
1241 .cardinality_limiter_cache_vacuum_interval(),
1242 },
1243 cardinality,
1244 )
1245 })
1246 .map(CardinalityLimiter::new),
1247 metric_outcomes,
1248 processing: Processing {
1249 logs: LogsProcessor::new(Arc::clone("a_limiter)),
1250 trace_metrics: TraceMetricsProcessor::new(Arc::clone("a_limiter)),
1251 spans: SpansProcessor::new(Arc::clone("a_limiter), geoip_lookup.clone()),
1252 check_ins: CheckInsProcessor::new(Arc::clone("a_limiter)),
1253 sessions: SessionsProcessor::new(Arc::clone("a_limiter)),
1254 profile_chunks: ProfileChunksProcessor::new(Arc::clone("a_limiter)),
1255 trace_attachments: TraceAttachmentsProcessor::new(quota_limiter),
1256 },
1257 geoip_lookup,
1258 config,
1259 };
1260
1261 Self {
1262 inner: Arc::new(inner),
1263 }
1264 }
1265
1266 async fn enforce_quotas<Group>(
1267 &self,
1268 managed_envelope: &mut TypedEnvelope<Group>,
1269 event: Annotated<Event>,
1270 extracted_metrics: &mut ProcessingExtractedMetrics,
1271 ctx: processing::Context<'_>,
1272 ) -> Result<Annotated<Event>, ProcessingError> {
1273 let cached_result = RateLimiter::Cached
1276 .enforce(managed_envelope, event, extracted_metrics, ctx)
1277 .await?;
1278
1279 if_processing!(self.inner.config, {
1280 let rate_limiter = match self.inner.rate_limiter.clone() {
1281 Some(rate_limiter) => rate_limiter,
1282 None => return Ok(cached_result.event),
1283 };
1284
1285 let consistent_result = RateLimiter::Consistent(rate_limiter)
1287 .enforce(
1288 managed_envelope,
1289 cached_result.event,
1290 extracted_metrics,
1291 ctx
1292 )
1293 .await?;
1294
1295 if !consistent_result.rate_limits.is_empty() {
1297 self.inner
1298 .project_cache
1299 .get(managed_envelope.scoping().project_key)
1300 .rate_limits()
1301 .merge(consistent_result.rate_limits);
1302 }
1303
1304 Ok(consistent_result.event)
1305 } else { Ok(cached_result.event) })
1306 }
1307
1308 async fn process_errors(
1310 &self,
1311 managed_envelope: &mut TypedEnvelope<ErrorGroup>,
1312 project_id: ProjectId,
1313 mut ctx: processing::Context<'_>,
1314 ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
1315 let mut event_fully_normalized = EventFullyNormalized::new(managed_envelope.envelope());
1316 let mut metrics = Metrics::default();
1317 let mut extracted_metrics = ProcessingExtractedMetrics::new();
1318
1319 report::process_user_reports(managed_envelope);
1321
1322 if_processing!(self.inner.config, {
1323 unreal::expand(managed_envelope, &self.inner.config)?;
1324 #[cfg(sentry)]
1325 playstation::expand(managed_envelope, &self.inner.config, ctx.project_info)?;
1326 nnswitch::expand(managed_envelope)?;
1327 });
1328
1329 let extraction_result = event::extract(
1330 managed_envelope,
1331 &mut metrics,
1332 event_fully_normalized,
1333 &self.inner.config,
1334 )?;
1335 let mut event = extraction_result.event;
1336
1337 if_processing!(self.inner.config, {
1338 if let Some(inner_event_fully_normalized) =
1339 unreal::process(managed_envelope, &mut event)?
1340 {
1341 event_fully_normalized = inner_event_fully_normalized;
1342 }
1343 #[cfg(sentry)]
1344 if let Some(inner_event_fully_normalized) =
1345 playstation::process(managed_envelope, &mut event, ctx.project_info)?
1346 {
1347 event_fully_normalized = inner_event_fully_normalized;
1348 }
1349 if let Some(inner_event_fully_normalized) =
1350 attachment::create_placeholders(managed_envelope, &mut event, &mut metrics)
1351 {
1352 event_fully_normalized = inner_event_fully_normalized;
1353 }
1354 });
1355
1356 ctx.sampling_project_info = dynamic_sampling::validate_and_set_dsc(
1357 managed_envelope,
1358 &mut event,
1359 ctx.project_info,
1360 ctx.sampling_project_info,
1361 );
1362
1363 let attachments = managed_envelope
1364 .envelope()
1365 .items()
1366 .filter(|item| item.attachment_type() == Some(&AttachmentType::Attachment));
1367 processing::utils::event::finalize(
1368 managed_envelope.envelope().headers(),
1369 &mut event,
1370 attachments,
1371 &mut metrics,
1372 ctx.config,
1373 )?;
1374 event_fully_normalized = processing::utils::event::normalize(
1375 managed_envelope.envelope().headers(),
1376 &mut event,
1377 event_fully_normalized,
1378 project_id,
1379 ctx,
1380 &self.inner.geoip_lookup,
1381 )?;
1382 let filter_run =
1383 processing::utils::event::filter(managed_envelope.envelope().headers(), &event, &ctx)
1384 .map_err(|err| {
1385 managed_envelope.reject(Outcome::Filtered(err.clone()));
1386 ProcessingError::EventFiltered(err)
1387 })?;
1388
1389 if self.inner.config.processing_enabled() || matches!(filter_run, FiltersStatus::Ok) {
1390 dynamic_sampling::tag_error_with_sampling_decision(
1391 managed_envelope,
1392 &mut event,
1393 ctx.sampling_project_info,
1394 &self.inner.config,
1395 )
1396 .await;
1397 }
1398
1399 event = self
1400 .enforce_quotas(managed_envelope, event, &mut extracted_metrics, ctx)
1401 .await?;
1402
1403 if event.value().is_some() {
1404 processing::utils::event::scrub(&mut event, ctx.project_info)?;
1405 event::serialize(
1406 managed_envelope,
1407 &mut event,
1408 event_fully_normalized,
1409 EventMetricsExtracted(false),
1410 SpansExtracted(false),
1411 )?;
1412 event::emit_feedback_metrics(managed_envelope.envelope());
1413 }
1414
1415 let attachments = managed_envelope
1416 .envelope_mut()
1417 .items_mut()
1418 .filter(|i| i.ty() == &ItemType::Attachment);
1419 processing::utils::attachments::scrub(attachments, ctx.project_info);
1420
1421 if self.inner.config.processing_enabled() && !event_fully_normalized.0 {
1422 relay_log::error!(
1423 tags.project = %project_id,
1424 tags.ty = event_type(&event).map(|e| e.to_string()).unwrap_or("none".to_owned()),
1425 "ingested event without normalizing"
1426 );
1427 }
1428
1429 Ok(Some(extracted_metrics))
1430 }
1431
1432 #[allow(unused_assignments)]
1434 async fn process_transactions(
1435 &self,
1436 managed_envelope: &mut TypedEnvelope<TransactionGroup>,
1437 cogs: &mut Token,
1438 project_id: ProjectId,
1439 mut ctx: processing::Context<'_>,
1440 ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
1441 let mut event_fully_normalized = EventFullyNormalized::new(managed_envelope.envelope());
1442 let mut event_metrics_extracted = EventMetricsExtracted(false);
1443 let mut spans_extracted = SpansExtracted(false);
1444 let mut metrics = Metrics::default();
1445 let mut extracted_metrics = ProcessingExtractedMetrics::new();
1446
1447 let extraction_result = event::extract(
1449 managed_envelope,
1450 &mut metrics,
1451 event_fully_normalized,
1452 &self.inner.config,
1453 )?;
1454
1455 if let Some(inner_event_metrics_extracted) = extraction_result.event_metrics_extracted {
1457 event_metrics_extracted = inner_event_metrics_extracted;
1458 }
1459 if let Some(inner_spans_extracted) = extraction_result.spans_extracted {
1460 spans_extracted = inner_spans_extracted;
1461 };
1462
1463 let mut event = extraction_result.event;
1465
1466 let profile_id = profile::filter(
1467 managed_envelope,
1468 &event,
1469 ctx.config,
1470 project_id,
1471 ctx.project_info,
1472 );
1473 processing::transactions::profile::transfer_id(&mut event, profile_id);
1474 processing::transactions::profile::remove_context_if_rate_limited(
1475 &mut event,
1476 managed_envelope.scoping(),
1477 ctx,
1478 );
1479
1480 ctx.sampling_project_info = dynamic_sampling::validate_and_set_dsc(
1481 managed_envelope,
1482 &mut event,
1483 ctx.project_info,
1484 ctx.sampling_project_info,
1485 );
1486
1487 let attachments = managed_envelope
1488 .envelope()
1489 .items()
1490 .filter(|item| item.attachment_type() == Some(&AttachmentType::Attachment));
1491 processing::utils::event::finalize(
1492 managed_envelope.envelope().headers(),
1493 &mut event,
1494 attachments,
1495 &mut metrics,
1496 &self.inner.config,
1497 )?;
1498
1499 event_fully_normalized = processing::utils::event::normalize(
1500 managed_envelope.envelope().headers(),
1501 &mut event,
1502 event_fully_normalized,
1503 project_id,
1504 ctx,
1505 &self.inner.geoip_lookup,
1506 )?;
1507
1508 let filter_run =
1509 processing::utils::event::filter(managed_envelope.envelope().headers(), &event, &ctx)
1510 .map_err(|err| {
1511 managed_envelope.reject(Outcome::Filtered(err.clone()));
1512 ProcessingError::EventFiltered(err)
1513 })?;
1514
1515 let run_dynamic_sampling = (matches!(filter_run, FiltersStatus::Ok)
1519 || self.inner.config.processing_enabled())
1520 && matches!(&ctx.project_info.config.transaction_metrics, Some(ErrorBoundary::Ok(c)) if c.is_enabled());
1521
1522 let sampling_result = match run_dynamic_sampling {
1523 true => {
1524 #[allow(unused_mut)]
1525 let mut reservoir = ReservoirEvaluator::new(Arc::clone(ctx.reservoir_counters));
1526 #[cfg(feature = "processing")]
1527 if let Some(quotas_client) = self.inner.quotas_client.as_ref() {
1528 reservoir.set_redis(managed_envelope.scoping().organization_id, quotas_client);
1529 }
1530 processing::utils::dynamic_sampling::run(
1531 managed_envelope.envelope().headers().dsc(),
1532 event.value(),
1533 &ctx,
1534 Some(&reservoir),
1535 )
1536 .await
1537 }
1538 false => SamplingResult::Pending,
1539 };
1540
1541 relay_statsd::metric!(
1542 counter(RelayCounters::SamplingDecision) += 1,
1543 decision = sampling_result.decision().as_str(),
1544 item = "transaction"
1545 );
1546
1547 #[cfg(feature = "processing")]
1548 let server_sample_rate = sampling_result.sample_rate();
1549
1550 if let Some(outcome) = sampling_result.into_dropped_outcome() {
1551 profile::process(
1554 managed_envelope,
1555 &mut event,
1556 ctx.global_config,
1557 ctx.config,
1558 ctx.project_info,
1559 );
1560 event_metrics_extracted = processing::transactions::extraction::extract_metrics(
1562 &mut event,
1563 &mut extracted_metrics,
1564 ExtractMetricsContext {
1565 dsc: managed_envelope.envelope().dsc(),
1566 project_id,
1567 ctx,
1568 sampling_decision: SamplingDecision::Drop,
1569 metrics_extracted: event_metrics_extracted.0,
1570 spans_extracted: spans_extracted.0,
1571 },
1572 )?;
1573
1574 dynamic_sampling::drop_unsampled_items(
1575 managed_envelope,
1576 event,
1577 outcome,
1578 spans_extracted,
1579 );
1580
1581 event = self
1586 .enforce_quotas(
1587 managed_envelope,
1588 Annotated::empty(),
1589 &mut extracted_metrics,
1590 ctx,
1591 )
1592 .await?;
1593
1594 return Ok(Some(extracted_metrics));
1595 }
1596
1597 let _post_ds = cogs.start_category("post_ds");
1598
1599 processing::utils::event::scrub(&mut event, ctx.project_info)?;
1603
1604 let attachments = managed_envelope
1605 .envelope_mut()
1606 .items_mut()
1607 .filter(|i| i.ty() == &ItemType::Attachment);
1608 processing::utils::attachments::scrub(attachments, ctx.project_info);
1609
1610 if_processing!(self.inner.config, {
1611 let profile_id = profile::process(
1613 managed_envelope,
1614 &mut event,
1615 ctx.global_config,
1616 ctx.config,
1617 ctx.project_info,
1618 );
1619 processing::transactions::profile::transfer_id(&mut event, profile_id);
1620 processing::transactions::profile::scrub_profiler_id(&mut event);
1621
1622 event_metrics_extracted = processing::transactions::extraction::extract_metrics(
1624 &mut event,
1625 &mut extracted_metrics,
1626 ExtractMetricsContext {
1627 dsc: managed_envelope.envelope().dsc(),
1628 project_id,
1629 ctx,
1630 sampling_decision: SamplingDecision::Keep,
1631 metrics_extracted: event_metrics_extracted.0,
1632 spans_extracted: spans_extracted.0,
1633 },
1634 )?;
1635
1636 if let Some(spans) = processing::transactions::spans::extract_from_event(
1637 managed_envelope.envelope().dsc(),
1638 &event,
1639 ctx.global_config,
1640 ctx.config,
1641 server_sample_rate,
1642 event_metrics_extracted,
1643 spans_extracted,
1644 ) {
1645 spans_extracted = SpansExtracted(true);
1646 for item in spans {
1647 match item {
1648 Ok(item) => managed_envelope.envelope_mut().add_item(item),
1649 Err(()) => managed_envelope.track_outcome(
1650 Outcome::Invalid(DiscardReason::InvalidSpan),
1651 DataCategory::SpanIndexed,
1652 1,
1653 ),
1654 }
1656 }
1657 }
1658 });
1659
1660 event = self
1661 .enforce_quotas(managed_envelope, event, &mut extracted_metrics, ctx)
1662 .await?;
1663
1664 if event.value().is_some() {
1666 event::serialize(
1667 managed_envelope,
1668 &mut event,
1669 event_fully_normalized,
1670 event_metrics_extracted,
1671 spans_extracted,
1672 )?;
1673 }
1674
1675 if self.inner.config.processing_enabled() && !event_fully_normalized.0 {
1676 relay_log::error!(
1677 tags.project = %project_id,
1678 tags.ty = event_type(&event).map(|e| e.to_string()).unwrap_or("none".to_owned()),
1679 "ingested event without normalizing"
1680 );
1681 };
1682
1683 Ok(Some(extracted_metrics))
1684 }
1685
1686 async fn process_standalone(
1688 &self,
1689 managed_envelope: &mut TypedEnvelope<StandaloneGroup>,
1690 project_id: ProjectId,
1691 ctx: processing::Context<'_>,
1692 ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
1693 let mut extracted_metrics = ProcessingExtractedMetrics::new();
1694
1695 standalone::process(managed_envelope);
1696
1697 profile::filter(
1698 managed_envelope,
1699 &Annotated::empty(),
1700 ctx.config,
1701 project_id,
1702 ctx.project_info,
1703 );
1704
1705 self.enforce_quotas(
1706 managed_envelope,
1707 Annotated::empty(),
1708 &mut extracted_metrics,
1709 ctx,
1710 )
1711 .await?;
1712
1713 report::process_user_reports(managed_envelope);
1714 let attachments = managed_envelope
1715 .envelope_mut()
1716 .items_mut()
1717 .filter(|i| i.ty() == &ItemType::Attachment);
1718 processing::utils::attachments::scrub(attachments, ctx.project_info);
1719
1720 Ok(Some(extracted_metrics))
1721 }
1722
1723 async fn process_client_reports(
1725 &self,
1726 managed_envelope: &mut TypedEnvelope<ClientReportGroup>,
1727 ctx: processing::Context<'_>,
1728 ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
1729 let mut extracted_metrics = ProcessingExtractedMetrics::new();
1730
1731 self.enforce_quotas(
1732 managed_envelope,
1733 Annotated::empty(),
1734 &mut extracted_metrics,
1735 ctx,
1736 )
1737 .await?;
1738
1739 report::process_client_reports(
1740 managed_envelope,
1741 ctx.config,
1742 ctx.project_info,
1743 self.inner.addrs.outcome_aggregator.clone(),
1744 );
1745
1746 Ok(Some(extracted_metrics))
1747 }
1748
1749 async fn process_replays(
1751 &self,
1752 managed_envelope: &mut TypedEnvelope<ReplayGroup>,
1753 ctx: processing::Context<'_>,
1754 ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
1755 let mut extracted_metrics = ProcessingExtractedMetrics::new();
1756
1757 replay::process(
1758 managed_envelope,
1759 ctx.global_config,
1760 ctx.config,
1761 ctx.project_info,
1762 &self.inner.geoip_lookup,
1763 )?;
1764
1765 self.enforce_quotas(
1766 managed_envelope,
1767 Annotated::empty(),
1768 &mut extracted_metrics,
1769 ctx,
1770 )
1771 .await?;
1772
1773 Ok(Some(extracted_metrics))
1774 }
1775
1776 async fn process_nel(
1777 &self,
1778 mut managed_envelope: ManagedEnvelope,
1779 ctx: processing::Context<'_>,
1780 ) -> Result<ProcessingResult, ProcessingError> {
1781 nel::convert_to_logs(&mut managed_envelope);
1782 self.process_with_processor(&self.inner.processing.logs, managed_envelope, ctx)
1783 .await
1784 }
1785
1786 async fn process_with_processor<P: processing::Processor>(
1787 &self,
1788 processor: &P,
1789 mut managed_envelope: ManagedEnvelope,
1790 ctx: processing::Context<'_>,
1791 ) -> Result<ProcessingResult, ProcessingError>
1792 where
1793 Outputs: From<P::Output>,
1794 {
1795 let Some(work) = processor.prepare_envelope(&mut managed_envelope) else {
1796 debug_assert!(
1797 false,
1798 "there must be work for the {} processor",
1799 std::any::type_name::<P>(),
1800 );
1801 return Err(ProcessingError::ProcessingGroupMismatch);
1802 };
1803
1804 managed_envelope.update();
1805 match managed_envelope.envelope().is_empty() {
1806 true => managed_envelope.accept(),
1807 false => managed_envelope.reject(Outcome::Invalid(DiscardReason::Internal)),
1808 }
1809
1810 processor
1811 .process(work, ctx)
1812 .await
1813 .map_err(|err| {
1814 relay_log::debug!(
1815 error = &err as &dyn std::error::Error,
1816 "processing pipeline failed"
1817 );
1818 ProcessingError::ProcessingFailure
1819 })
1820 .map(|o| o.map(Into::into))
1821 .map(ProcessingResult::Output)
1822 }
1823
1824 async fn process_standalone_spans(
1828 &self,
1829 managed_envelope: &mut TypedEnvelope<SpanGroup>,
1830 _project_id: ProjectId,
1831 ctx: processing::Context<'_>,
1832 ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
1833 let mut extracted_metrics = ProcessingExtractedMetrics::new();
1834
1835 span::filter(managed_envelope, ctx.config, ctx.project_info);
1836 span::convert_otel_traces_data(managed_envelope);
1837
1838 if_processing!(self.inner.config, {
1839 span::process(
1840 managed_envelope,
1841 &mut Annotated::empty(),
1842 &mut extracted_metrics,
1843 _project_id,
1844 ctx,
1845 &self.inner.geoip_lookup,
1846 )
1847 .await;
1848 });
1849
1850 self.enforce_quotas(
1851 managed_envelope,
1852 Annotated::empty(),
1853 &mut extracted_metrics,
1854 ctx,
1855 )
1856 .await?;
1857
1858 Ok(Some(extracted_metrics))
1859 }
1860
1861 async fn process_envelope(
1862 &self,
1863 cogs: &mut Token,
1864 project_id: ProjectId,
1865 message: ProcessEnvelopeGrouped<'_>,
1866 ) -> Result<ProcessingResult, ProcessingError> {
1867 let ProcessEnvelopeGrouped {
1868 group,
1869 envelope: mut managed_envelope,
1870 ctx,
1871 } = message;
1872
1873 if let Some(sampling_state) = ctx.sampling_project_info {
1875 managed_envelope
1878 .envelope_mut()
1879 .parametrize_dsc_transaction(&sampling_state.config.tx_name_rules);
1880 }
1881
1882 if let Some(retention) = ctx.project_info.config.event_retention {
1885 managed_envelope.envelope_mut().set_retention(retention);
1886 }
1887
1888 if let Some(retention) = ctx.project_info.config.downsampled_event_retention {
1891 managed_envelope
1892 .envelope_mut()
1893 .set_downsampled_retention(retention);
1894 }
1895
1896 managed_envelope
1901 .envelope_mut()
1902 .meta_mut()
1903 .set_project_id(project_id);
1904
1905 macro_rules! run {
1906 ($fn_name:ident $(, $args:expr)*) => {
1907 async {
1908 let mut managed_envelope = (managed_envelope, group).try_into()?;
1909 match self.$fn_name(&mut managed_envelope, $($args),*).await {
1910 Ok(extracted_metrics) => Ok(ProcessingResult::Envelope {
1911 managed_envelope: managed_envelope.into_processed(),
1912 extracted_metrics: extracted_metrics.map_or(ProcessingExtractedMetrics::new(), |e| e)
1913 }),
1914 Err(error) => {
1915 relay_log::trace!("Executing {fn} failed: {error}", fn = stringify!($fn_name), error = error);
1916 if let Some(outcome) = error.to_outcome() {
1917 managed_envelope.reject(outcome);
1918 }
1919
1920 return Err(error);
1921 }
1922 }
1923 }.await
1924 };
1925 }
1926
1927 relay_log::trace!("Processing {group} group", group = group.variant());
1928
1929 match group {
1930 ProcessingGroup::Error => run!(process_errors, project_id, ctx),
1931 ProcessingGroup::Transaction => {
1932 run!(process_transactions, cogs, project_id, ctx)
1933 }
1934 ProcessingGroup::Session => {
1935 self.process_with_processor(&self.inner.processing.sessions, managed_envelope, ctx)
1936 .await
1937 }
1938 ProcessingGroup::Standalone => run!(process_standalone, project_id, ctx),
1939 ProcessingGroup::ClientReport => run!(process_client_reports, ctx),
1940 ProcessingGroup::Replay => {
1941 run!(process_replays, ctx)
1942 }
1943 ProcessingGroup::CheckIn => {
1944 self.process_with_processor(&self.inner.processing.check_ins, managed_envelope, ctx)
1945 .await
1946 }
1947 ProcessingGroup::Nel => self.process_nel(managed_envelope, ctx).await,
1948 ProcessingGroup::Log => {
1949 self.process_with_processor(&self.inner.processing.logs, managed_envelope, ctx)
1950 .await
1951 }
1952 ProcessingGroup::TraceMetric => {
1953 self.process_with_processor(
1954 &self.inner.processing.trace_metrics,
1955 managed_envelope,
1956 ctx,
1957 )
1958 .await
1959 }
1960 ProcessingGroup::SpanV2 => {
1961 self.process_with_processor(&self.inner.processing.spans, managed_envelope, ctx)
1962 .await
1963 }
1964 ProcessingGroup::TraceAttachment => {
1965 self.process_with_processor(
1966 &self.inner.processing.trace_attachments,
1967 managed_envelope,
1968 ctx,
1969 )
1970 .await
1971 }
1972 ProcessingGroup::Span => run!(process_standalone_spans, project_id, ctx),
1973 ProcessingGroup::ProfileChunk => {
1974 self.process_with_processor(
1975 &self.inner.processing.profile_chunks,
1976 managed_envelope,
1977 ctx,
1978 )
1979 .await
1980 }
1981 ProcessingGroup::Metrics => {
1983 if self.inner.config.relay_mode() != RelayMode::Proxy {
1986 relay_log::error!(
1987 tags.project = %project_id,
1988 items = ?managed_envelope.envelope().items().next().map(Item::ty),
1989 "received metrics in the process_state"
1990 );
1991 }
1992
1993 Ok(ProcessingResult::no_metrics(
1994 managed_envelope.into_processed(),
1995 ))
1996 }
1997 ProcessingGroup::Ungrouped => {
1999 relay_log::error!(
2000 tags.project = %project_id,
2001 items = ?managed_envelope.envelope().items().next().map(Item::ty),
2002 "could not identify the processing group based on the envelope's items"
2003 );
2004
2005 Ok(ProcessingResult::no_metrics(
2006 managed_envelope.into_processed(),
2007 ))
2008 }
2009 ProcessingGroup::ForwardUnknown => Ok(ProcessingResult::no_metrics(
2013 managed_envelope.into_processed(),
2014 )),
2015 }
2016 }
2017
2018 async fn process<'a>(
2024 &self,
2025 cogs: &mut Token,
2026 mut message: ProcessEnvelopeGrouped<'a>,
2027 ) -> Result<Option<Submit<'a>>, ProcessingError> {
2028 let ProcessEnvelopeGrouped {
2029 ref mut envelope,
2030 ctx,
2031 ..
2032 } = message;
2033
2034 let Some(project_id) = ctx
2041 .project_info
2042 .project_id
2043 .or_else(|| envelope.envelope().meta().project_id())
2044 else {
2045 envelope.reject(Outcome::Invalid(DiscardReason::Internal));
2046 return Err(ProcessingError::MissingProjectId);
2047 };
2048
2049 let client = envelope.envelope().meta().client().map(str::to_owned);
2050 let user_agent = envelope.envelope().meta().user_agent().map(str::to_owned);
2051 let project_key = envelope.envelope().meta().public_key();
2052 let sampling_key = envelope
2056 .envelope()
2057 .sampling_key()
2058 .filter(|_| ctx.sampling_project_info.is_some());
2059
2060 relay_log::configure_scope(|scope| {
2063 scope.set_tag("project", project_id);
2064 if let Some(client) = client {
2065 scope.set_tag("sdk", client);
2066 }
2067 if let Some(user_agent) = user_agent {
2068 scope.set_extra("user_agent", user_agent.into());
2069 }
2070 });
2071
2072 let result = match self.process_envelope(cogs, project_id, message).await {
2073 Ok(ProcessingResult::Envelope {
2074 mut managed_envelope,
2075 extracted_metrics,
2076 }) => {
2077 managed_envelope.update();
2080
2081 let has_metrics = !extracted_metrics.metrics.project_metrics.is_empty();
2082 send_metrics(
2083 extracted_metrics.metrics,
2084 project_key,
2085 sampling_key,
2086 &self.inner.addrs.aggregator,
2087 );
2088
2089 let envelope_response = if managed_envelope.envelope().is_empty() {
2090 if !has_metrics {
2091 managed_envelope.reject(Outcome::RateLimited(None));
2093 } else {
2094 managed_envelope.accept();
2095 }
2096
2097 None
2098 } else {
2099 Some(managed_envelope)
2100 };
2101
2102 Ok(envelope_response.map(Submit::Envelope))
2103 }
2104 Ok(ProcessingResult::Output(Output { main, metrics })) => {
2105 if let Some(metrics) = metrics {
2106 metrics.accept(|metrics| {
2107 send_metrics(
2108 metrics,
2109 project_key,
2110 sampling_key,
2111 &self.inner.addrs.aggregator,
2112 );
2113 });
2114 }
2115
2116 let ctx = ctx.to_forward();
2117 Ok(main.map(|output| Submit::Output { output, ctx }))
2118 }
2119 Err(err) => Err(err),
2120 };
2121
2122 relay_log::configure_scope(|scope| {
2123 scope.remove_tag("project");
2124 scope.remove_tag("sdk");
2125 scope.remove_tag("user_agent");
2126 });
2127
2128 result
2129 }
2130
2131 async fn handle_process_envelope(&self, cogs: &mut Token, message: ProcessEnvelope) {
2132 let project_key = message.envelope.envelope().meta().public_key();
2133 let wait_time = message.envelope.age();
2134 metric!(timer(RelayTimers::EnvelopeWaitTime) = wait_time);
2135
2136 cogs.cancel();
2139
2140 let scoping = message.envelope.scoping();
2141 for (group, envelope) in ProcessingGroup::split_envelope(
2142 *message.envelope.into_envelope(),
2143 &message.project_info,
2144 ) {
2145 let mut cogs = self
2146 .inner
2147 .cogs
2148 .timed(ResourceId::Relay, AppFeature::from(group));
2149
2150 let mut envelope =
2151 ManagedEnvelope::new(envelope, self.inner.addrs.outcome_aggregator.clone());
2152 envelope.scope(scoping);
2153
2154 let global_config = self.inner.global_config.current();
2155
2156 let ctx = processing::Context {
2157 config: &self.inner.config,
2158 global_config: &global_config,
2159 project_info: &message.project_info,
2160 sampling_project_info: message.sampling_project_info.as_deref(),
2161 rate_limits: &message.rate_limits,
2162 reservoir_counters: &message.reservoir_counters,
2163 };
2164
2165 let message = ProcessEnvelopeGrouped {
2166 group,
2167 envelope,
2168 ctx,
2169 };
2170
2171 let result = metric!(
2172 timer(RelayTimers::EnvelopeProcessingTime),
2173 group = group.variant(),
2174 { self.process(&mut cogs, message).await }
2175 );
2176
2177 match result {
2178 Ok(Some(envelope)) => self.submit_upstream(&mut cogs, envelope),
2179 Ok(None) => {}
2180 Err(error) if error.is_unexpected() => {
2181 relay_log::error!(
2182 tags.project_key = %project_key,
2183 error = &error as &dyn Error,
2184 "error processing envelope"
2185 )
2186 }
2187 Err(error) => {
2188 relay_log::debug!(
2189 tags.project_key = %project_key,
2190 error = &error as &dyn Error,
2191 "error processing envelope"
2192 )
2193 }
2194 }
2195 }
2196 }
2197
2198 fn handle_process_metrics(&self, cogs: &mut Token, message: ProcessMetrics) {
2199 let ProcessMetrics {
2200 data,
2201 project_key,
2202 received_at,
2203 sent_at,
2204 source,
2205 } = message;
2206
2207 let received_timestamp =
2208 UnixTimestamp::from_datetime(received_at).unwrap_or(UnixTimestamp::now());
2209
2210 let mut buckets = data.into_buckets(received_timestamp);
2211 if buckets.is_empty() {
2212 return;
2213 };
2214 cogs.update(relay_metrics::cogs::BySize(&buckets));
2215
2216 let clock_drift_processor =
2217 ClockDriftProcessor::new(sent_at, received_at).at_least(MINIMUM_CLOCK_DRIFT);
2218
2219 buckets.retain_mut(|bucket| {
2220 if let Err(error) = relay_metrics::normalize_bucket(bucket) {
2221 relay_log::debug!(error = &error as &dyn Error, "dropping bucket {bucket:?}");
2222 return false;
2223 }
2224
2225 if !self::metrics::is_valid_namespace(bucket) {
2226 return false;
2227 }
2228
2229 clock_drift_processor.process_timestamp(&mut bucket.timestamp);
2230
2231 if !matches!(source, BucketSource::Internal) {
2232 bucket.metadata = BucketMetadata::new(received_timestamp);
2233 }
2234
2235 true
2236 });
2237
2238 let project = self.inner.project_cache.get(project_key);
2239
2240 let buckets = match project.state() {
2243 ProjectState::Enabled(project_info) => {
2244 let rate_limits = project.rate_limits().current_limits();
2245 self.check_buckets(project_key, project_info, &rate_limits, buckets)
2246 }
2247 _ => buckets,
2248 };
2249
2250 relay_log::trace!("merging metric buckets into the aggregator");
2251 self.inner
2252 .addrs
2253 .aggregator
2254 .send(MergeBuckets::new(project_key, buckets));
2255 }
2256
2257 fn handle_process_batched_metrics(&self, cogs: &mut Token, message: ProcessBatchedMetrics) {
2258 let ProcessBatchedMetrics {
2259 payload,
2260 source,
2261 received_at,
2262 sent_at,
2263 } = message;
2264
2265 #[derive(serde::Deserialize)]
2266 struct Wrapper {
2267 buckets: HashMap<ProjectKey, Vec<Bucket>>,
2268 }
2269
2270 let buckets = match serde_json::from_slice(&payload) {
2271 Ok(Wrapper { buckets }) => buckets,
2272 Err(error) => {
2273 relay_log::debug!(
2274 error = &error as &dyn Error,
2275 "failed to parse batched metrics",
2276 );
2277 metric!(counter(RelayCounters::MetricBucketsParsingFailed) += 1);
2278 return;
2279 }
2280 };
2281
2282 for (project_key, buckets) in buckets {
2283 self.handle_process_metrics(
2284 cogs,
2285 ProcessMetrics {
2286 data: MetricData::Parsed(buckets),
2287 project_key,
2288 source,
2289 received_at,
2290 sent_at,
2291 },
2292 )
2293 }
2294 }
2295
2296 fn submit_upstream(&self, cogs: &mut Token, submit: Submit<'_>) {
2297 let _submit = cogs.start_category("submit");
2298
2299 #[cfg(feature = "processing")]
2300 if self.inner.config.processing_enabled()
2301 && let Some(store_forwarder) = &self.inner.addrs.store_forwarder
2302 {
2303 use crate::processing::StoreHandle;
2304
2305 let upload = self.inner.addrs.upload.as_ref();
2306 match submit {
2307 Submit::Envelope(envelope) => {
2308 let envelope_has_attachments = envelope
2309 .envelope()
2310 .items()
2311 .any(|item| *item.ty() == ItemType::Attachment);
2312 let use_objectstore = || {
2314 let options = &self.inner.global_config.current().options;
2315 utils::sample(options.objectstore_attachments_sample_rate).is_keep()
2316 };
2317
2318 if let Some(upload) = &self.inner.addrs.upload
2319 && envelope_has_attachments
2320 && use_objectstore()
2321 {
2322 upload.send(StoreEnvelope { envelope })
2324 } else {
2325 store_forwarder.send(StoreEnvelope { envelope })
2326 }
2327 }
2328 Submit::Output { output, ctx } => output
2329 .forward_store(StoreHandle::new(store_forwarder, upload), ctx)
2330 .unwrap_or_else(|err| err.into_inner()),
2331 }
2332 return;
2333 }
2334
2335 let mut envelope = match submit {
2336 Submit::Envelope(envelope) => envelope,
2337 Submit::Output { output, ctx } => match output.serialize_envelope(ctx) {
2338 Ok(envelope) => ManagedEnvelope::from(envelope).into_processed(),
2339 Err(_) => {
2340 relay_log::error!("failed to serialize output to an envelope");
2341 return;
2342 }
2343 },
2344 };
2345
2346 if envelope.envelope_mut().is_empty() {
2347 envelope.accept();
2348 return;
2349 }
2350
2351 envelope.envelope_mut().set_sent_at(Utc::now());
2357
2358 relay_log::trace!("sending envelope to sentry endpoint");
2359 let http_encoding = self.inner.config.http_encoding();
2360 let result = envelope.envelope().to_vec().and_then(|v| {
2361 encode_payload(&v.into(), http_encoding).map_err(EnvelopeError::PayloadIoFailed)
2362 });
2363
2364 match result {
2365 Ok(body) => {
2366 self.inner
2367 .addrs
2368 .upstream_relay
2369 .send(SendRequest(SendEnvelope {
2370 envelope,
2371 body,
2372 http_encoding,
2373 project_cache: self.inner.project_cache.clone(),
2374 }));
2375 }
2376 Err(error) => {
2377 relay_log::error!(
2380 error = &error as &dyn Error,
2381 tags.project_key = %envelope.scoping().project_key,
2382 "failed to serialize envelope payload"
2383 );
2384
2385 envelope.reject(Outcome::Invalid(DiscardReason::Internal));
2386 }
2387 }
2388 }
2389
2390 fn handle_submit_client_reports(&self, cogs: &mut Token, message: SubmitClientReports) {
2391 let SubmitClientReports {
2392 client_reports,
2393 scoping,
2394 } = message;
2395
2396 let upstream = self.inner.config.upstream_descriptor();
2397 let dsn = PartialDsn::outbound(&scoping, upstream);
2398
2399 let mut envelope = Envelope::from_request(None, RequestMeta::outbound(dsn));
2400 for client_report in client_reports {
2401 match client_report.serialize() {
2402 Ok(payload) => {
2403 let mut item = Item::new(ItemType::ClientReport);
2404 item.set_payload(ContentType::Json, payload);
2405 envelope.add_item(item);
2406 }
2407 Err(error) => {
2408 relay_log::error!(
2409 error = &error as &dyn std::error::Error,
2410 "failed to serialize client report"
2411 );
2412 }
2413 }
2414 }
2415
2416 let envelope = ManagedEnvelope::new(envelope, self.inner.addrs.outcome_aggregator.clone());
2417 self.submit_upstream(cogs, Submit::Envelope(envelope.into_processed()));
2418 }
2419
2420 fn check_buckets(
2421 &self,
2422 project_key: ProjectKey,
2423 project_info: &ProjectInfo,
2424 rate_limits: &RateLimits,
2425 buckets: Vec<Bucket>,
2426 ) -> Vec<Bucket> {
2427 let Some(scoping) = project_info.scoping(project_key) else {
2428 relay_log::error!(
2429 tags.project_key = project_key.as_str(),
2430 "there is no scoping: dropping {} buckets",
2431 buckets.len(),
2432 );
2433 return Vec::new();
2434 };
2435
2436 let mut buckets = self::metrics::apply_project_info(
2437 buckets,
2438 &self.inner.metric_outcomes,
2439 project_info,
2440 scoping,
2441 );
2442
2443 let namespaces: BTreeSet<MetricNamespace> = buckets
2444 .iter()
2445 .filter_map(|bucket| bucket.name.try_namespace())
2446 .collect();
2447
2448 for namespace in namespaces {
2449 let limits = rate_limits.check_with_quotas(
2450 project_info.get_quotas(),
2451 scoping.item(DataCategory::MetricBucket),
2452 );
2453
2454 if limits.is_limited() {
2455 let rejected;
2456 (buckets, rejected) = utils::split_off(buckets, |bucket| {
2457 bucket.name.try_namespace() == Some(namespace)
2458 });
2459
2460 let reason_code = limits.longest().and_then(|limit| limit.reason_code.clone());
2461 self.inner.metric_outcomes.track(
2462 scoping,
2463 &rejected,
2464 Outcome::RateLimited(reason_code),
2465 );
2466 }
2467 }
2468
2469 let quotas = project_info.config.quotas.clone();
2470 match MetricsLimiter::create(buckets, quotas, scoping) {
2471 Ok(mut bucket_limiter) => {
2472 bucket_limiter.enforce_limits(rate_limits, &self.inner.metric_outcomes);
2473 bucket_limiter.into_buckets()
2474 }
2475 Err(buckets) => buckets,
2476 }
2477 }
2478
2479 #[cfg(feature = "processing")]
2480 async fn rate_limit_buckets(
2481 &self,
2482 scoping: Scoping,
2483 project_info: &ProjectInfo,
2484 mut buckets: Vec<Bucket>,
2485 ) -> Vec<Bucket> {
2486 let Some(rate_limiter) = &self.inner.rate_limiter else {
2487 return buckets;
2488 };
2489
2490 let global_config = self.inner.global_config.current();
2491 let namespaces = buckets
2492 .iter()
2493 .filter_map(|bucket| bucket.name.try_namespace())
2494 .counts();
2495
2496 let quotas = CombinedQuotas::new(&global_config, project_info.get_quotas());
2497
2498 for (namespace, quantity) in namespaces {
2499 let item_scoping = scoping.metric_bucket(namespace);
2500
2501 let limits = match rate_limiter
2502 .is_rate_limited(quotas, item_scoping, quantity, false)
2503 .await
2504 {
2505 Ok(limits) => limits,
2506 Err(err) => {
2507 relay_log::error!(
2508 error = &err as &dyn std::error::Error,
2509 "failed to check redis rate limits"
2510 );
2511 break;
2512 }
2513 };
2514
2515 if limits.is_limited() {
2516 let rejected;
2517 (buckets, rejected) = utils::split_off(buckets, |bucket| {
2518 bucket.name.try_namespace() == Some(namespace)
2519 });
2520
2521 let reason_code = limits.longest().and_then(|limit| limit.reason_code.clone());
2522 self.inner.metric_outcomes.track(
2523 scoping,
2524 &rejected,
2525 Outcome::RateLimited(reason_code),
2526 );
2527
2528 self.inner
2529 .project_cache
2530 .get(item_scoping.scoping.project_key)
2531 .rate_limits()
2532 .merge(limits);
2533 }
2534 }
2535
2536 match MetricsLimiter::create(buckets, project_info.config.quotas.clone(), scoping) {
2537 Err(buckets) => buckets,
2538 Ok(bucket_limiter) => self.apply_other_rate_limits(bucket_limiter).await,
2539 }
2540 }
2541
2542 #[cfg(feature = "processing")]
2544 async fn apply_other_rate_limits(&self, mut bucket_limiter: MetricsLimiter) -> Vec<Bucket> {
2545 relay_log::trace!("handle_rate_limit_buckets");
2546
2547 let scoping = *bucket_limiter.scoping();
2548
2549 if let Some(rate_limiter) = self.inner.rate_limiter.as_ref() {
2550 let global_config = self.inner.global_config.current();
2551 let quotas = CombinedQuotas::new(&global_config, bucket_limiter.quotas());
2552
2553 let over_accept_once = true;
2556 let mut rate_limits = RateLimits::new();
2557
2558 for category in [DataCategory::Transaction, DataCategory::Span] {
2559 let count = bucket_limiter.count(category);
2560
2561 let timer = Instant::now();
2562 let mut is_limited = false;
2563
2564 if let Some(count) = count {
2565 match rate_limiter
2566 .is_rate_limited(quotas, scoping.item(category), count, over_accept_once)
2567 .await
2568 {
2569 Ok(limits) => {
2570 is_limited = limits.is_limited();
2571 rate_limits.merge(limits)
2572 }
2573 Err(e) => relay_log::error!(error = &e as &dyn Error),
2574 }
2575 }
2576
2577 relay_statsd::metric!(
2578 timer(RelayTimers::RateLimitBucketsDuration) = timer.elapsed(),
2579 category = category.name(),
2580 limited = if is_limited { "true" } else { "false" },
2581 count = match count {
2582 None => "none",
2583 Some(0) => "0",
2584 Some(1) => "1",
2585 Some(1..=10) => "10",
2586 Some(1..=25) => "25",
2587 Some(1..=50) => "50",
2588 Some(51..=100) => "100",
2589 Some(101..=500) => "500",
2590 _ => "> 500",
2591 },
2592 );
2593 }
2594
2595 if rate_limits.is_limited() {
2596 let was_enforced =
2597 bucket_limiter.enforce_limits(&rate_limits, &self.inner.metric_outcomes);
2598
2599 if was_enforced {
2600 self.inner
2602 .project_cache
2603 .get(scoping.project_key)
2604 .rate_limits()
2605 .merge(rate_limits);
2606 }
2607 }
2608 }
2609
2610 bucket_limiter.into_buckets()
2611 }
2612
2613 #[cfg(feature = "processing")]
2615 async fn cardinality_limit_buckets(
2616 &self,
2617 scoping: Scoping,
2618 limits: &[CardinalityLimit],
2619 buckets: Vec<Bucket>,
2620 ) -> Vec<Bucket> {
2621 let global_config = self.inner.global_config.current();
2622 let cardinality_limiter_mode = global_config.options.cardinality_limiter_mode;
2623
2624 if matches!(cardinality_limiter_mode, CardinalityLimiterMode::Disabled) {
2625 return buckets;
2626 }
2627
2628 let Some(ref limiter) = self.inner.cardinality_limiter else {
2629 return buckets;
2630 };
2631
2632 let scope = relay_cardinality::Scoping {
2633 organization_id: scoping.organization_id,
2634 project_id: scoping.project_id,
2635 };
2636
2637 let limits = match limiter
2638 .check_cardinality_limits(scope, limits, buckets)
2639 .await
2640 {
2641 Ok(limits) => limits,
2642 Err((buckets, error)) => {
2643 relay_log::error!(
2644 error = &error as &dyn std::error::Error,
2645 "cardinality limiter failed"
2646 );
2647 return buckets;
2648 }
2649 };
2650
2651 let error_sample_rate = global_config.options.cardinality_limiter_error_sample_rate;
2652 if !limits.exceeded_limits().is_empty() && utils::sample(error_sample_rate).is_keep() {
2653 for limit in limits.exceeded_limits() {
2654 relay_log::with_scope(
2655 |scope| {
2656 scope.set_user(Some(relay_log::sentry::User {
2658 id: Some(scoping.organization_id.to_string()),
2659 ..Default::default()
2660 }));
2661 },
2662 || {
2663 relay_log::error!(
2664 tags.organization_id = scoping.organization_id.value(),
2665 tags.limit_id = limit.id,
2666 tags.passive = limit.passive,
2667 "Cardinality Limit"
2668 );
2669 },
2670 );
2671 }
2672 }
2673
2674 for (limit, reports) in limits.cardinality_reports() {
2675 for report in reports {
2676 self.inner
2677 .metric_outcomes
2678 .cardinality(scoping, limit, report);
2679 }
2680 }
2681
2682 if matches!(cardinality_limiter_mode, CardinalityLimiterMode::Passive) {
2683 return limits.into_source();
2684 }
2685
2686 let CardinalityLimitsSplit { accepted, rejected } = limits.into_split();
2687
2688 for (bucket, exceeded) in rejected {
2689 self.inner.metric_outcomes.track(
2690 scoping,
2691 &[bucket],
2692 Outcome::CardinalityLimited(exceeded.id.clone()),
2693 );
2694 }
2695 accepted
2696 }
2697
2698 #[cfg(feature = "processing")]
2705 async fn encode_metrics_processing(
2706 &self,
2707 message: FlushBuckets,
2708 store_forwarder: &Addr<Store>,
2709 ) {
2710 use crate::constants::DEFAULT_EVENT_RETENTION;
2711 use crate::services::store::StoreMetrics;
2712
2713 for ProjectBuckets {
2714 buckets,
2715 scoping,
2716 project_info,
2717 ..
2718 } in message.buckets.into_values()
2719 {
2720 let buckets = self
2721 .rate_limit_buckets(scoping, &project_info, buckets)
2722 .await;
2723
2724 let limits = project_info.get_cardinality_limits();
2725 let buckets = self
2726 .cardinality_limit_buckets(scoping, limits, buckets)
2727 .await;
2728
2729 if buckets.is_empty() {
2730 continue;
2731 }
2732
2733 let retention = project_info
2734 .config
2735 .event_retention
2736 .unwrap_or(DEFAULT_EVENT_RETENTION);
2737
2738 store_forwarder.send(StoreMetrics {
2741 buckets,
2742 scoping,
2743 retention,
2744 });
2745 }
2746 }
2747
2748 fn encode_metrics_envelope(&self, cogs: &mut Token, message: FlushBuckets) {
2760 let FlushBuckets {
2761 partition_key,
2762 buckets,
2763 } = message;
2764
2765 let batch_size = self.inner.config.metrics_max_batch_size_bytes();
2766 let upstream = self.inner.config.upstream_descriptor();
2767
2768 for ProjectBuckets {
2769 buckets, scoping, ..
2770 } in buckets.values()
2771 {
2772 let dsn = PartialDsn::outbound(scoping, upstream);
2773
2774 relay_statsd::metric!(
2775 distribution(RelayDistributions::PartitionKeys) = u64::from(partition_key)
2776 );
2777
2778 let mut num_batches = 0;
2779 for batch in BucketsView::from(buckets).by_size(batch_size) {
2780 let mut envelope = Envelope::from_request(None, RequestMeta::outbound(dsn.clone()));
2781
2782 let mut item = Item::new(ItemType::MetricBuckets);
2783 item.set_source_quantities(crate::metrics::extract_quantities(batch));
2784 item.set_payload(ContentType::Json, serde_json::to_vec(&buckets).unwrap());
2785 envelope.add_item(item);
2786
2787 let mut envelope =
2788 ManagedEnvelope::new(envelope, self.inner.addrs.outcome_aggregator.clone());
2789 envelope
2790 .set_partition_key(Some(partition_key))
2791 .scope(*scoping);
2792
2793 relay_statsd::metric!(
2794 distribution(RelayDistributions::BucketsPerBatch) = batch.len() as u64
2795 );
2796
2797 self.submit_upstream(cogs, Submit::Envelope(envelope.into_processed()));
2798 num_batches += 1;
2799 }
2800
2801 relay_statsd::metric!(
2802 distribution(RelayDistributions::BatchesPerPartition) = num_batches
2803 );
2804 }
2805 }
2806
2807 fn send_global_partition(&self, partition_key: u32, partition: &mut Partition<'_>) {
2809 if partition.is_empty() {
2810 return;
2811 }
2812
2813 let (unencoded, project_info) = partition.take();
2814 let http_encoding = self.inner.config.http_encoding();
2815 let encoded = match encode_payload(&unencoded, http_encoding) {
2816 Ok(payload) => payload,
2817 Err(error) => {
2818 let error = &error as &dyn std::error::Error;
2819 relay_log::error!(error, "failed to encode metrics payload");
2820 return;
2821 }
2822 };
2823
2824 let request = SendMetricsRequest {
2825 partition_key: partition_key.to_string(),
2826 unencoded,
2827 encoded,
2828 project_info,
2829 http_encoding,
2830 metric_outcomes: self.inner.metric_outcomes.clone(),
2831 };
2832
2833 self.inner.addrs.upstream_relay.send(SendRequest(request));
2834 }
2835
2836 fn encode_metrics_global(&self, message: FlushBuckets) {
2851 let FlushBuckets {
2852 partition_key,
2853 buckets,
2854 } = message;
2855
2856 let batch_size = self.inner.config.metrics_max_batch_size_bytes();
2857 let mut partition = Partition::new(batch_size);
2858 let mut partition_splits = 0;
2859
2860 for ProjectBuckets {
2861 buckets, scoping, ..
2862 } in buckets.values()
2863 {
2864 for bucket in buckets {
2865 let mut remaining = Some(BucketView::new(bucket));
2866
2867 while let Some(bucket) = remaining.take() {
2868 if let Some(next) = partition.insert(bucket, *scoping) {
2869 self.send_global_partition(partition_key, &mut partition);
2873 remaining = Some(next);
2874 partition_splits += 1;
2875 }
2876 }
2877 }
2878 }
2879
2880 if partition_splits > 0 {
2881 metric!(distribution(RelayDistributions::PartitionSplits) = partition_splits);
2882 }
2883
2884 self.send_global_partition(partition_key, &mut partition);
2885 }
2886
2887 async fn handle_flush_buckets(&self, cogs: &mut Token, mut message: FlushBuckets) {
2888 for (project_key, pb) in message.buckets.iter_mut() {
2889 let buckets = std::mem::take(&mut pb.buckets);
2890 pb.buckets =
2891 self.check_buckets(*project_key, &pb.project_info, &pb.rate_limits, buckets);
2892 }
2893
2894 #[cfg(feature = "processing")]
2895 if self.inner.config.processing_enabled()
2896 && let Some(ref store_forwarder) = self.inner.addrs.store_forwarder
2897 {
2898 return self
2899 .encode_metrics_processing(message, store_forwarder)
2900 .await;
2901 }
2902
2903 if self.inner.config.http_global_metrics() {
2904 self.encode_metrics_global(message)
2905 } else {
2906 self.encode_metrics_envelope(cogs, message)
2907 }
2908 }
2909
2910 #[cfg(all(test, feature = "processing"))]
2911 fn redis_rate_limiter_enabled(&self) -> bool {
2912 self.inner.rate_limiter.is_some()
2913 }
2914
2915 async fn handle_message(&self, message: EnvelopeProcessor) {
2916 let ty = message.variant();
2917 let feature_weights = self.feature_weights(&message);
2918
2919 metric!(timer(RelayTimers::ProcessMessageDuration), message = ty, {
2920 let mut cogs = self.inner.cogs.timed(ResourceId::Relay, feature_weights);
2921
2922 match message {
2923 EnvelopeProcessor::ProcessEnvelope(m) => {
2924 self.handle_process_envelope(&mut cogs, *m).await
2925 }
2926 EnvelopeProcessor::ProcessProjectMetrics(m) => {
2927 self.handle_process_metrics(&mut cogs, *m)
2928 }
2929 EnvelopeProcessor::ProcessBatchedMetrics(m) => {
2930 self.handle_process_batched_metrics(&mut cogs, *m)
2931 }
2932 EnvelopeProcessor::FlushBuckets(m) => {
2933 self.handle_flush_buckets(&mut cogs, *m).await
2934 }
2935 EnvelopeProcessor::SubmitClientReports(m) => {
2936 self.handle_submit_client_reports(&mut cogs, *m)
2937 }
2938 }
2939 });
2940 }
2941
2942 fn feature_weights(&self, message: &EnvelopeProcessor) -> FeatureWeights {
2943 match message {
2944 EnvelopeProcessor::ProcessEnvelope(_) => AppFeature::Unattributed.into(),
2946 EnvelopeProcessor::ProcessProjectMetrics(_) => AppFeature::Unattributed.into(),
2947 EnvelopeProcessor::ProcessBatchedMetrics(_) => AppFeature::Unattributed.into(),
2948 EnvelopeProcessor::FlushBuckets(v) => v
2949 .buckets
2950 .values()
2951 .map(|s| {
2952 if self.inner.config.processing_enabled() {
2953 relay_metrics::cogs::ByCount(&s.buckets).into()
2956 } else {
2957 relay_metrics::cogs::BySize(&s.buckets).into()
2958 }
2959 })
2960 .fold(FeatureWeights::none(), FeatureWeights::merge),
2961 EnvelopeProcessor::SubmitClientReports(_) => AppFeature::ClientReports.into(),
2962 }
2963 }
2964}
2965
2966impl Service for EnvelopeProcessorService {
2967 type Interface = EnvelopeProcessor;
2968
2969 async fn run(self, mut rx: relay_system::Receiver<Self::Interface>) {
2970 while let Some(message) = rx.recv().await {
2971 let service = self.clone();
2972 self.inner
2973 .pool
2974 .spawn_async(
2975 async move {
2976 service.handle_message(message).await;
2977 }
2978 .boxed(),
2979 )
2980 .await;
2981 }
2982 }
2983}
2984
2985struct EnforcementResult {
2990 event: Annotated<Event>,
2991 #[cfg_attr(not(feature = "processing"), expect(dead_code))]
2992 rate_limits: RateLimits,
2993}
2994
2995impl EnforcementResult {
2996 pub fn new(event: Annotated<Event>, rate_limits: RateLimits) -> Self {
2998 Self { event, rate_limits }
2999 }
3000}
3001
3002#[derive(Clone)]
3003enum RateLimiter {
3004 Cached,
3005 #[cfg(feature = "processing")]
3006 Consistent(Arc<RedisRateLimiter>),
3007}
3008
3009impl RateLimiter {
3010 async fn enforce<Group>(
3011 &self,
3012 managed_envelope: &mut TypedEnvelope<Group>,
3013 event: Annotated<Event>,
3014 _extracted_metrics: &mut ProcessingExtractedMetrics,
3015 ctx: processing::Context<'_>,
3016 ) -> Result<EnforcementResult, ProcessingError> {
3017 if managed_envelope.envelope().is_empty() && event.value().is_none() {
3018 return Ok(EnforcementResult::new(event, RateLimits::default()));
3019 }
3020
3021 let quotas = CombinedQuotas::new(ctx.global_config, ctx.project_info.get_quotas());
3022 if quotas.is_empty() {
3023 return Ok(EnforcementResult::new(event, RateLimits::default()));
3024 }
3025
3026 let event_category = event_category(&event);
3027
3028 let this = self.clone();
3034 let mut envelope_limiter =
3035 EnvelopeLimiter::new(CheckLimits::All, move |item_scope, _quantity| {
3036 let this = this.clone();
3037
3038 async move {
3039 match this {
3040 #[cfg(feature = "processing")]
3041 RateLimiter::Consistent(rate_limiter) => Ok::<_, ProcessingError>(
3042 rate_limiter
3043 .is_rate_limited(quotas, item_scope, _quantity, false)
3044 .await?,
3045 ),
3046 _ => Ok::<_, ProcessingError>(
3047 ctx.rate_limits.check_with_quotas(quotas, item_scope),
3048 ),
3049 }
3050 }
3051 });
3052
3053 if let Some(category) = event_category {
3056 envelope_limiter.assume_event(category);
3057 }
3058
3059 let scoping = managed_envelope.scoping();
3060 let (enforcement, rate_limits) = metric!(timer(RelayTimers::EventProcessingRateLimiting), type = self.name(), {
3061 envelope_limiter
3062 .compute(managed_envelope.envelope_mut(), &scoping)
3063 .await
3064 })?;
3065 let event_active = enforcement.is_event_active();
3066
3067 #[cfg(feature = "processing")]
3071 _extracted_metrics.apply_enforcement(&enforcement, matches!(self, Self::Consistent(_)));
3072 enforcement.apply_with_outcomes(managed_envelope);
3073
3074 if event_active {
3075 debug_assert!(managed_envelope.envelope().is_empty());
3076 return Ok(EnforcementResult::new(Annotated::empty(), rate_limits));
3077 }
3078
3079 Ok(EnforcementResult::new(event, rate_limits))
3080 }
3081
3082 fn name(&self) -> &'static str {
3083 match self {
3084 Self::Cached => "cached",
3085 #[cfg(feature = "processing")]
3086 Self::Consistent(_) => "consistent",
3087 }
3088 }
3089}
3090
3091pub fn encode_payload(body: &Bytes, http_encoding: HttpEncoding) -> Result<Bytes, std::io::Error> {
3092 let envelope_body: Vec<u8> = match http_encoding {
3093 HttpEncoding::Identity => return Ok(body.clone()),
3094 HttpEncoding::Deflate => {
3095 let mut encoder = ZlibEncoder::new(Vec::new(), Compression::default());
3096 encoder.write_all(body.as_ref())?;
3097 encoder.finish()?
3098 }
3099 HttpEncoding::Gzip => {
3100 let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
3101 encoder.write_all(body.as_ref())?;
3102 encoder.finish()?
3103 }
3104 HttpEncoding::Br => {
3105 let mut encoder = BrotliEncoder::new(Vec::new(), 0, 5, 22);
3107 encoder.write_all(body.as_ref())?;
3108 encoder.into_inner()
3109 }
3110 HttpEncoding::Zstd => {
3111 let mut encoder = ZstdEncoder::new(Vec::new(), 1)?;
3114 encoder.write_all(body.as_ref())?;
3115 encoder.finish()?
3116 }
3117 };
3118
3119 Ok(envelope_body.into())
3120}
3121
3122#[derive(Debug)]
3124pub struct SendEnvelope {
3125 pub envelope: TypedEnvelope<Processed>,
3126 pub body: Bytes,
3127 pub http_encoding: HttpEncoding,
3128 pub project_cache: ProjectCacheHandle,
3129}
3130
3131impl UpstreamRequest for SendEnvelope {
3132 fn method(&self) -> reqwest::Method {
3133 reqwest::Method::POST
3134 }
3135
3136 fn path(&self) -> Cow<'_, str> {
3137 format!("/api/{}/envelope/", self.envelope.scoping().project_id).into()
3138 }
3139
3140 fn route(&self) -> &'static str {
3141 "envelope"
3142 }
3143
3144 fn build(&mut self, builder: &mut http::RequestBuilder) -> Result<(), http::HttpError> {
3145 let envelope_body = self.body.clone();
3146 metric!(
3147 distribution(RelayDistributions::UpstreamEnvelopeBodySize) = envelope_body.len() as u64
3148 );
3149
3150 let meta = &self.envelope.meta();
3151 let shard = self.envelope.partition_key().map(|p| p.to_string());
3152 builder
3153 .content_encoding(self.http_encoding)
3154 .header_opt("Origin", meta.origin().map(|url| url.as_str()))
3155 .header_opt("User-Agent", meta.user_agent())
3156 .header("X-Sentry-Auth", meta.auth_header())
3157 .header("X-Forwarded-For", meta.forwarded_for())
3158 .header("Content-Type", envelope::CONTENT_TYPE)
3159 .header_opt("X-Sentry-Relay-Shard", shard)
3160 .body(envelope_body);
3161
3162 Ok(())
3163 }
3164
3165 fn sign(&mut self) -> Option<Sign> {
3166 Some(Sign::Optional(SignatureType::RequestSign))
3167 }
3168
3169 fn respond(
3170 self: Box<Self>,
3171 result: Result<http::Response, UpstreamRequestError>,
3172 ) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> {
3173 Box::pin(async move {
3174 let result = match result {
3175 Ok(mut response) => response.consume().await.map_err(UpstreamRequestError::Http),
3176 Err(error) => Err(error),
3177 };
3178
3179 match result {
3180 Ok(()) => self.envelope.accept(),
3181 Err(error) if error.is_received() => {
3182 let scoping = self.envelope.scoping();
3183 self.envelope.accept();
3184
3185 if let UpstreamRequestError::RateLimited(limits) = error {
3186 self.project_cache
3187 .get(scoping.project_key)
3188 .rate_limits()
3189 .merge(limits.scope(&scoping));
3190 }
3191 }
3192 Err(error) => {
3193 let mut envelope = self.envelope;
3196 envelope.reject(Outcome::Invalid(DiscardReason::Internal));
3197 relay_log::error!(
3198 error = &error as &dyn Error,
3199 tags.project_key = %envelope.scoping().project_key,
3200 "error sending envelope"
3201 );
3202 }
3203 }
3204 })
3205 }
3206}
3207
3208#[derive(Debug)]
3215struct Partition<'a> {
3216 max_size: usize,
3217 remaining: usize,
3218 views: HashMap<ProjectKey, Vec<BucketView<'a>>>,
3219 project_info: HashMap<ProjectKey, Scoping>,
3220}
3221
3222impl<'a> Partition<'a> {
3223 pub fn new(size: usize) -> Self {
3225 Self {
3226 max_size: size,
3227 remaining: size,
3228 views: HashMap::new(),
3229 project_info: HashMap::new(),
3230 }
3231 }
3232
3233 pub fn insert(&mut self, bucket: BucketView<'a>, scoping: Scoping) -> Option<BucketView<'a>> {
3244 let (current, next) = bucket.split(self.remaining, Some(self.max_size));
3245
3246 if let Some(current) = current {
3247 self.remaining = self.remaining.saturating_sub(current.estimated_size());
3248 self.views
3249 .entry(scoping.project_key)
3250 .or_default()
3251 .push(current);
3252
3253 self.project_info
3254 .entry(scoping.project_key)
3255 .or_insert(scoping);
3256 }
3257
3258 next
3259 }
3260
3261 fn is_empty(&self) -> bool {
3263 self.views.is_empty()
3264 }
3265
3266 fn take(&mut self) -> (Bytes, HashMap<ProjectKey, Scoping>) {
3270 #[derive(serde::Serialize)]
3271 struct Wrapper<'a> {
3272 buckets: &'a HashMap<ProjectKey, Vec<BucketView<'a>>>,
3273 }
3274
3275 let buckets = &self.views;
3276 let payload = serde_json::to_vec(&Wrapper { buckets }).unwrap().into();
3277
3278 let scopings = self.project_info.clone();
3279 self.project_info.clear();
3280
3281 self.views.clear();
3282 self.remaining = self.max_size;
3283
3284 (payload, scopings)
3285 }
3286}
3287
3288#[derive(Debug)]
3292struct SendMetricsRequest {
3293 partition_key: String,
3295 unencoded: Bytes,
3297 encoded: Bytes,
3299 project_info: HashMap<ProjectKey, Scoping>,
3303 http_encoding: HttpEncoding,
3305 metric_outcomes: MetricOutcomes,
3307}
3308
3309impl SendMetricsRequest {
3310 fn create_error_outcomes(self) {
3311 #[derive(serde::Deserialize)]
3312 struct Wrapper {
3313 buckets: HashMap<ProjectKey, Vec<MinimalTrackableBucket>>,
3314 }
3315
3316 let buckets = match serde_json::from_slice(&self.unencoded) {
3317 Ok(Wrapper { buckets }) => buckets,
3318 Err(err) => {
3319 relay_log::error!(
3320 error = &err as &dyn std::error::Error,
3321 "failed to parse buckets from failed transmission"
3322 );
3323 return;
3324 }
3325 };
3326
3327 for (key, buckets) in buckets {
3328 let Some(&scoping) = self.project_info.get(&key) else {
3329 relay_log::error!("missing scoping for project key");
3330 continue;
3331 };
3332
3333 self.metric_outcomes.track(
3334 scoping,
3335 &buckets,
3336 Outcome::Invalid(DiscardReason::Internal),
3337 );
3338 }
3339 }
3340}
3341
3342impl UpstreamRequest for SendMetricsRequest {
3343 fn set_relay_id(&self) -> bool {
3344 true
3345 }
3346
3347 fn sign(&mut self) -> Option<Sign> {
3348 Some(Sign::Required(SignatureType::Body(self.unencoded.clone())))
3349 }
3350
3351 fn method(&self) -> reqwest::Method {
3352 reqwest::Method::POST
3353 }
3354
3355 fn path(&self) -> Cow<'_, str> {
3356 "/api/0/relays/metrics/".into()
3357 }
3358
3359 fn route(&self) -> &'static str {
3360 "global_metrics"
3361 }
3362
3363 fn build(&mut self, builder: &mut http::RequestBuilder) -> Result<(), http::HttpError> {
3364 metric!(
3365 distribution(RelayDistributions::UpstreamMetricsBodySize) = self.encoded.len() as u64
3366 );
3367
3368 builder
3369 .content_encoding(self.http_encoding)
3370 .header("X-Sentry-Relay-Shard", self.partition_key.as_bytes())
3371 .header(header::CONTENT_TYPE, b"application/json")
3372 .body(self.encoded.clone());
3373
3374 Ok(())
3375 }
3376
3377 fn respond(
3378 self: Box<Self>,
3379 result: Result<http::Response, UpstreamRequestError>,
3380 ) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> {
3381 Box::pin(async {
3382 match result {
3383 Ok(mut response) => {
3384 response.consume().await.ok();
3385 }
3386 Err(error) => {
3387 relay_log::error!(error = &error as &dyn Error, "Failed to send metrics batch");
3388
3389 if error.is_received() {
3392 return;
3393 }
3394
3395 self.create_error_outcomes()
3396 }
3397 }
3398 })
3399 }
3400}
3401
3402#[derive(Copy, Clone, Debug)]
3404struct CombinedQuotas<'a> {
3405 global_quotas: &'a [Quota],
3406 project_quotas: &'a [Quota],
3407}
3408
3409impl<'a> CombinedQuotas<'a> {
3410 pub fn new(global_config: &'a GlobalConfig, project_quotas: &'a [Quota]) -> Self {
3412 Self {
3413 global_quotas: &global_config.quotas,
3414 project_quotas,
3415 }
3416 }
3417
3418 pub fn is_empty(&self) -> bool {
3420 self.len() == 0
3421 }
3422
3423 pub fn len(&self) -> usize {
3425 self.global_quotas.len() + self.project_quotas.len()
3426 }
3427}
3428
3429impl<'a> IntoIterator for CombinedQuotas<'a> {
3430 type Item = &'a Quota;
3431 type IntoIter = std::iter::Chain<std::slice::Iter<'a, Quota>, std::slice::Iter<'a, Quota>>;
3432
3433 fn into_iter(self) -> Self::IntoIter {
3434 self.global_quotas.iter().chain(self.project_quotas.iter())
3435 }
3436}
3437
3438#[cfg(test)]
3439mod tests {
3440 use std::collections::BTreeMap;
3441
3442 use insta::assert_debug_snapshot;
3443 use relay_base_schema::metrics::{DurationUnit, MetricUnit};
3444 use relay_common::glob2::LazyGlob;
3445 use relay_dynamic_config::ProjectConfig;
3446 use relay_event_normalization::{
3447 MeasurementsConfig, NormalizationConfig, RedactionRule, TransactionNameConfig,
3448 TransactionNameRule,
3449 };
3450 use relay_event_schema::protocol::TransactionSource;
3451 use relay_pii::DataScrubbingConfig;
3452 use similar_asserts::assert_eq;
3453
3454 use crate::metrics_extraction::IntoMetric;
3455 use crate::metrics_extraction::transactions::types::{
3456 CommonTags, TransactionMeasurementTags, TransactionMetric,
3457 };
3458 use crate::testutils::{create_test_processor, create_test_processor_with_addrs};
3459
3460 #[cfg(feature = "processing")]
3461 use {
3462 relay_metrics::BucketValue,
3463 relay_quotas::{QuotaScope, ReasonCode},
3464 relay_test::mock_service,
3465 };
3466
3467 use super::*;
3468
3469 #[cfg(feature = "processing")]
3470 fn mock_quota(id: &str) -> Quota {
3471 Quota {
3472 id: Some(id.into()),
3473 categories: [DataCategory::MetricBucket].into(),
3474 scope: QuotaScope::Organization,
3475 scope_id: None,
3476 limit: Some(0),
3477 window: None,
3478 reason_code: None,
3479 namespace: None,
3480 }
3481 }
3482
3483 #[cfg(feature = "processing")]
3484 #[test]
3485 fn test_dynamic_quotas() {
3486 let global_config = GlobalConfig {
3487 quotas: vec![mock_quota("foo"), mock_quota("bar")],
3488 ..Default::default()
3489 };
3490
3491 let project_quotas = vec![mock_quota("baz"), mock_quota("qux")];
3492
3493 let dynamic_quotas = CombinedQuotas::new(&global_config, &project_quotas);
3494
3495 assert_eq!(dynamic_quotas.len(), 4);
3496 assert!(!dynamic_quotas.is_empty());
3497
3498 let quota_ids = dynamic_quotas.into_iter().filter_map(|q| q.id.as_deref());
3499 assert!(quota_ids.eq(["foo", "bar", "baz", "qux"]));
3500 }
3501
3502 #[cfg(feature = "processing")]
3505 #[tokio::test]
3506 async fn test_ratelimit_per_batch() {
3507 use relay_base_schema::organization::OrganizationId;
3508 use relay_protocol::FiniteF64;
3509
3510 let rate_limited_org = Scoping {
3511 organization_id: OrganizationId::new(1),
3512 project_id: ProjectId::new(21),
3513 project_key: ProjectKey::parse("00000000000000000000000000000000").unwrap(),
3514 key_id: Some(17),
3515 };
3516
3517 let not_rate_limited_org = Scoping {
3518 organization_id: OrganizationId::new(2),
3519 project_id: ProjectId::new(21),
3520 project_key: ProjectKey::parse("11111111111111111111111111111111").unwrap(),
3521 key_id: Some(17),
3522 };
3523
3524 let message = {
3525 let project_info = {
3526 let quota = Quota {
3527 id: Some("testing".into()),
3528 categories: [DataCategory::MetricBucket].into(),
3529 scope: relay_quotas::QuotaScope::Organization,
3530 scope_id: Some(rate_limited_org.organization_id.to_string().into()),
3531 limit: Some(0),
3532 window: None,
3533 reason_code: Some(ReasonCode::new("test")),
3534 namespace: None,
3535 };
3536
3537 let mut config = ProjectConfig::default();
3538 config.quotas.push(quota);
3539
3540 Arc::new(ProjectInfo {
3541 config,
3542 ..Default::default()
3543 })
3544 };
3545
3546 let project_metrics = |scoping| ProjectBuckets {
3547 buckets: vec![Bucket {
3548 name: "d:transactions/bar".into(),
3549 value: BucketValue::Counter(FiniteF64::new(1.0).unwrap()),
3550 timestamp: UnixTimestamp::now(),
3551 tags: Default::default(),
3552 width: 10,
3553 metadata: BucketMetadata::default(),
3554 }],
3555 rate_limits: Default::default(),
3556 project_info: project_info.clone(),
3557 scoping,
3558 };
3559
3560 let buckets = hashbrown::HashMap::from([
3561 (
3562 rate_limited_org.project_key,
3563 project_metrics(rate_limited_org),
3564 ),
3565 (
3566 not_rate_limited_org.project_key,
3567 project_metrics(not_rate_limited_org),
3568 ),
3569 ]);
3570
3571 FlushBuckets {
3572 partition_key: 0,
3573 buckets,
3574 }
3575 };
3576
3577 assert_eq!(message.buckets.keys().count(), 2);
3579
3580 let config = {
3581 let config_json = serde_json::json!({
3582 "processing": {
3583 "enabled": true,
3584 "kafka_config": [],
3585 "redis": {
3586 "server": std::env::var("RELAY_REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_owned()),
3587 }
3588 }
3589 });
3590 Config::from_json_value(config_json).unwrap()
3591 };
3592
3593 let (store, handle) = {
3594 let f = |org_ids: &mut Vec<OrganizationId>, msg: Store| {
3595 let org_id = match msg {
3596 Store::Metrics(x) => x.scoping.organization_id,
3597 _ => panic!("received envelope when expecting only metrics"),
3598 };
3599 org_ids.push(org_id);
3600 };
3601
3602 mock_service("store_forwarder", vec![], f)
3603 };
3604
3605 let processor = create_test_processor(config).await;
3606 assert!(processor.redis_rate_limiter_enabled());
3607
3608 processor.encode_metrics_processing(message, &store).await;
3609
3610 drop(store);
3611 let orgs_not_ratelimited = handle.await.unwrap();
3612
3613 assert_eq!(
3614 orgs_not_ratelimited,
3615 vec![not_rate_limited_org.organization_id]
3616 );
3617 }
3618
3619 #[tokio::test]
3620 async fn test_browser_version_extraction_with_pii_like_data() {
3621 let processor = create_test_processor(Default::default()).await;
3622 let outcome_aggregator = Addr::dummy();
3623 let event_id = EventId::new();
3624
3625 let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
3626 .parse()
3627 .unwrap();
3628
3629 let request_meta = RequestMeta::new(dsn);
3630 let mut envelope = Envelope::from_request(Some(event_id), request_meta);
3631
3632 envelope.add_item({
3633 let mut item = Item::new(ItemType::Event);
3634 item.set_payload(
3635 ContentType::Json,
3636 r#"
3637 {
3638 "request": {
3639 "headers": [
3640 ["User-Agent", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36"]
3641 ]
3642 }
3643 }
3644 "#,
3645 );
3646 item
3647 });
3648
3649 let mut datascrubbing_settings = DataScrubbingConfig::default();
3650 datascrubbing_settings.scrub_data = true;
3652 datascrubbing_settings.scrub_defaults = true;
3653 datascrubbing_settings.scrub_ip_addresses = true;
3654
3655 let pii_config = serde_json::from_str(r#"{"applications": {"**": ["@ip:mask"]}}"#).unwrap();
3657
3658 let config = ProjectConfig {
3659 datascrubbing_settings,
3660 pii_config: Some(pii_config),
3661 ..Default::default()
3662 };
3663
3664 let project_info = ProjectInfo {
3665 config,
3666 ..Default::default()
3667 };
3668
3669 let mut envelopes = ProcessingGroup::split_envelope(*envelope, &Default::default());
3670 assert_eq!(envelopes.len(), 1);
3671
3672 let (group, envelope) = envelopes.pop().unwrap();
3673 let envelope = ManagedEnvelope::new(envelope, outcome_aggregator);
3674
3675 let message = ProcessEnvelopeGrouped {
3676 group,
3677 envelope,
3678 ctx: processing::Context {
3679 project_info: &project_info,
3680 ..processing::Context::for_test()
3681 },
3682 };
3683
3684 let Ok(Some(Submit::Envelope(mut new_envelope))) =
3685 processor.process(&mut Token::noop(), message).await
3686 else {
3687 panic!();
3688 };
3689 let new_envelope = new_envelope.envelope_mut();
3690
3691 let event_item = new_envelope.items().last().unwrap();
3692 let annotated_event: Annotated<Event> =
3693 Annotated::from_json_bytes(&event_item.payload()).unwrap();
3694 let event = annotated_event.into_value().unwrap();
3695 let headers = event
3696 .request
3697 .into_value()
3698 .unwrap()
3699 .headers
3700 .into_value()
3701 .unwrap();
3702
3703 assert_eq!(
3705 Some(
3706 "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/********* Safari/537.36"
3707 ),
3708 headers.get_header("User-Agent")
3709 );
3710 let contexts = event.contexts.into_value().unwrap();
3712 let browser = contexts.0.get("browser").unwrap();
3713 assert_eq!(
3714 r#"{"browser":"Chrome 103.0.0","name":"Chrome","version":"103.0.0","type":"browser"}"#,
3715 browser.to_json().unwrap()
3716 );
3717 }
3718
3719 #[tokio::test]
3720 #[cfg(feature = "processing")]
3721 async fn test_materialize_dsc() {
3722 use crate::services::projects::project::PublicKeyConfig;
3723
3724 let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
3725 .parse()
3726 .unwrap();
3727 let request_meta = RequestMeta::new(dsn);
3728 let mut envelope = Envelope::from_request(None, request_meta);
3729
3730 let dsc = r#"{
3731 "trace_id": "00000000-0000-0000-0000-000000000000",
3732 "public_key": "e12d836b15bb49d7bbf99e64295d995b",
3733 "sample_rate": "0.2"
3734 }"#;
3735 envelope.set_dsc(serde_json::from_str(dsc).unwrap());
3736
3737 let mut item = Item::new(ItemType::Event);
3738 item.set_payload(ContentType::Json, r#"{}"#);
3739 envelope.add_item(item);
3740
3741 let outcome_aggregator = Addr::dummy();
3742 let managed_envelope = ManagedEnvelope::new(envelope, outcome_aggregator);
3743
3744 let mut project_info = ProjectInfo::default();
3745 project_info.public_keys.push(PublicKeyConfig {
3746 public_key: ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(),
3747 numeric_id: Some(1),
3748 });
3749
3750 let config = serde_json::json!({
3751 "processing": {
3752 "enabled": true,
3753 "kafka_config": [],
3754 }
3755 });
3756
3757 let message = ProcessEnvelopeGrouped {
3758 group: ProcessingGroup::Transaction,
3759 envelope: managed_envelope,
3760 ctx: processing::Context {
3761 config: &Config::from_json_value(config.clone()).unwrap(),
3762 project_info: &project_info,
3763 sampling_project_info: Some(&project_info),
3764 ..processing::Context::for_test()
3765 },
3766 };
3767
3768 let processor = create_test_processor(Config::from_json_value(config).unwrap()).await;
3769 let Ok(Some(Submit::Envelope(envelope))) =
3770 processor.process(&mut Token::noop(), message).await
3771 else {
3772 panic!();
3773 };
3774 let event = envelope
3775 .envelope()
3776 .get_item_by(|item| item.ty() == &ItemType::Event)
3777 .unwrap();
3778
3779 let event = Annotated::<Event>::from_json_bytes(&event.payload()).unwrap();
3780 insta::assert_debug_snapshot!(event.value().unwrap()._dsc, @r###"
3781 Object(
3782 {
3783 "environment": ~,
3784 "public_key": String(
3785 "e12d836b15bb49d7bbf99e64295d995b",
3786 ),
3787 "release": ~,
3788 "replay_id": ~,
3789 "sample_rate": String(
3790 "0.2",
3791 ),
3792 "trace_id": String(
3793 "00000000000000000000000000000000",
3794 ),
3795 "transaction": ~,
3796 },
3797 )
3798 "###);
3799 }
3800
3801 fn capture_test_event(transaction_name: &str, source: TransactionSource) -> Vec<String> {
3802 let mut event = Annotated::<Event>::from_json(
3803 r#"
3804 {
3805 "type": "transaction",
3806 "transaction": "/foo/",
3807 "timestamp": 946684810.0,
3808 "start_timestamp": 946684800.0,
3809 "contexts": {
3810 "trace": {
3811 "trace_id": "4c79f60c11214eb38604f4ae0781bfb2",
3812 "span_id": "fa90fdead5f74053",
3813 "op": "http.server",
3814 "type": "trace"
3815 }
3816 },
3817 "transaction_info": {
3818 "source": "url"
3819 }
3820 }
3821 "#,
3822 )
3823 .unwrap();
3824 let e = event.value_mut().as_mut().unwrap();
3825 e.transaction.set_value(Some(transaction_name.into()));
3826
3827 e.transaction_info
3828 .value_mut()
3829 .as_mut()
3830 .unwrap()
3831 .source
3832 .set_value(Some(source));
3833
3834 relay_statsd::with_capturing_test_client(|| {
3835 utils::log_transaction_name_metrics(&mut event, |event| {
3836 let config = NormalizationConfig {
3837 transaction_name_config: TransactionNameConfig {
3838 rules: &[TransactionNameRule {
3839 pattern: LazyGlob::new("/foo/*/**".to_owned()),
3840 expiry: DateTime::<Utc>::MAX_UTC,
3841 redaction: RedactionRule::Replace {
3842 substitution: "*".to_owned(),
3843 },
3844 }],
3845 },
3846 ..Default::default()
3847 };
3848 relay_event_normalization::normalize_event(event, &config)
3849 });
3850 })
3851 }
3852
3853 #[test]
3854 fn test_log_transaction_metrics_none() {
3855 let captures = capture_test_event("/nothing", TransactionSource::Url);
3856 insta::assert_debug_snapshot!(captures, @r###"
3857 [
3858 "event.transaction_name_changes:1|c|#source_in:url,changes:none,source_out:sanitized,is_404:false",
3859 ]
3860 "###);
3861 }
3862
3863 #[test]
3864 fn test_log_transaction_metrics_rule() {
3865 let captures = capture_test_event("/foo/john/denver", TransactionSource::Url);
3866 insta::assert_debug_snapshot!(captures, @r###"
3867 [
3868 "event.transaction_name_changes:1|c|#source_in:url,changes:rule,source_out:sanitized,is_404:false",
3869 ]
3870 "###);
3871 }
3872
3873 #[test]
3874 fn test_log_transaction_metrics_pattern() {
3875 let captures = capture_test_event("/something/12345", TransactionSource::Url);
3876 insta::assert_debug_snapshot!(captures, @r###"
3877 [
3878 "event.transaction_name_changes:1|c|#source_in:url,changes:pattern,source_out:sanitized,is_404:false",
3879 ]
3880 "###);
3881 }
3882
3883 #[test]
3884 fn test_log_transaction_metrics_both() {
3885 let captures = capture_test_event("/foo/john/12345", TransactionSource::Url);
3886 insta::assert_debug_snapshot!(captures, @r###"
3887 [
3888 "event.transaction_name_changes:1|c|#source_in:url,changes:both,source_out:sanitized,is_404:false",
3889 ]
3890 "###);
3891 }
3892
3893 #[test]
3894 fn test_log_transaction_metrics_no_match() {
3895 let captures = capture_test_event("/foo/john/12345", TransactionSource::Route);
3896 insta::assert_debug_snapshot!(captures, @r###"
3897 [
3898 "event.transaction_name_changes:1|c|#source_in:route,changes:none,source_out:route,is_404:false",
3899 ]
3900 "###);
3901 }
3902
3903 #[test]
3907 fn test_mri_overhead_constant() {
3908 let hardcoded_value = MeasurementsConfig::MEASUREMENT_MRI_OVERHEAD;
3909
3910 let derived_value = {
3911 let name = "foobar".to_owned();
3912 let value = 5.into(); let unit = MetricUnit::Duration(DurationUnit::default());
3914 let tags = TransactionMeasurementTags {
3915 measurement_rating: None,
3916 universal_tags: CommonTags(BTreeMap::new()),
3917 score_profile_version: None,
3918 };
3919
3920 let measurement = TransactionMetric::Measurement {
3921 name: name.clone(),
3922 value,
3923 unit,
3924 tags,
3925 };
3926
3927 let metric: Bucket = measurement.into_metric(UnixTimestamp::now());
3928 metric.name.len() - unit.to_string().len() - name.len()
3929 };
3930 assert_eq!(
3931 hardcoded_value, derived_value,
3932 "Update `MEASUREMENT_MRI_OVERHEAD` if the naming scheme changed."
3933 );
3934 }
3935
3936 #[tokio::test]
3937 async fn test_process_metrics_bucket_metadata() {
3938 let mut token = Cogs::noop().timed(ResourceId::Relay, AppFeature::Unattributed);
3939 let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
3940 let received_at = Utc::now();
3941 let config = Config::default();
3942
3943 let (aggregator, mut aggregator_rx) = Addr::custom();
3944 let processor = create_test_processor_with_addrs(
3945 config,
3946 Addrs {
3947 aggregator,
3948 ..Default::default()
3949 },
3950 )
3951 .await;
3952
3953 let mut item = Item::new(ItemType::Statsd);
3954 item.set_payload(
3955 ContentType::Text,
3956 "transactions/foo:3182887624:4267882815|s",
3957 );
3958 for (source, expected_received_at) in [
3959 (
3960 BucketSource::External,
3961 Some(UnixTimestamp::from_datetime(received_at).unwrap()),
3962 ),
3963 (BucketSource::Internal, None),
3964 ] {
3965 let message = ProcessMetrics {
3966 data: MetricData::Raw(vec![item.clone()]),
3967 project_key,
3968 source,
3969 received_at,
3970 sent_at: Some(Utc::now()),
3971 };
3972 processor.handle_process_metrics(&mut token, message);
3973
3974 let Aggregator::MergeBuckets(merge_buckets) = aggregator_rx.recv().await.unwrap();
3975 let buckets = merge_buckets.buckets;
3976 assert_eq!(buckets.len(), 1);
3977 assert_eq!(buckets[0].metadata.received_at, expected_received_at);
3978 }
3979 }
3980
3981 #[tokio::test]
3982 async fn test_process_batched_metrics() {
3983 let mut token = Cogs::noop().timed(ResourceId::Relay, AppFeature::Unattributed);
3984 let received_at = Utc::now();
3985 let config = Config::default();
3986
3987 let (aggregator, mut aggregator_rx) = Addr::custom();
3988 let processor = create_test_processor_with_addrs(
3989 config,
3990 Addrs {
3991 aggregator,
3992 ..Default::default()
3993 },
3994 )
3995 .await;
3996
3997 let payload = r#"{
3998 "buckets": {
3999 "11111111111111111111111111111111": [
4000 {
4001 "timestamp": 1615889440,
4002 "width": 0,
4003 "name": "d:custom/endpoint.response_time@millisecond",
4004 "type": "d",
4005 "value": [
4006 68.0
4007 ],
4008 "tags": {
4009 "route": "user_index"
4010 }
4011 }
4012 ],
4013 "22222222222222222222222222222222": [
4014 {
4015 "timestamp": 1615889440,
4016 "width": 0,
4017 "name": "d:custom/endpoint.cache_rate@none",
4018 "type": "d",
4019 "value": [
4020 36.0
4021 ]
4022 }
4023 ]
4024 }
4025}
4026"#;
4027 let message = ProcessBatchedMetrics {
4028 payload: Bytes::from(payload),
4029 source: BucketSource::Internal,
4030 received_at,
4031 sent_at: Some(Utc::now()),
4032 };
4033 processor.handle_process_batched_metrics(&mut token, message);
4034
4035 let Aggregator::MergeBuckets(mb1) = aggregator_rx.recv().await.unwrap();
4036 let Aggregator::MergeBuckets(mb2) = aggregator_rx.recv().await.unwrap();
4037
4038 let mut messages = vec![mb1, mb2];
4039 messages.sort_by_key(|mb| mb.project_key);
4040
4041 let actual = messages
4042 .into_iter()
4043 .map(|mb| (mb.project_key, mb.buckets))
4044 .collect::<Vec<_>>();
4045
4046 assert_debug_snapshot!(actual, @r###"
4047 [
4048 (
4049 ProjectKey("11111111111111111111111111111111"),
4050 [
4051 Bucket {
4052 timestamp: UnixTimestamp(1615889440),
4053 width: 0,
4054 name: MetricName(
4055 "d:custom/endpoint.response_time@millisecond",
4056 ),
4057 value: Distribution(
4058 [
4059 68.0,
4060 ],
4061 ),
4062 tags: {
4063 "route": "user_index",
4064 },
4065 metadata: BucketMetadata {
4066 merges: 1,
4067 received_at: None,
4068 extracted_from_indexed: false,
4069 },
4070 },
4071 ],
4072 ),
4073 (
4074 ProjectKey("22222222222222222222222222222222"),
4075 [
4076 Bucket {
4077 timestamp: UnixTimestamp(1615889440),
4078 width: 0,
4079 name: MetricName(
4080 "d:custom/endpoint.cache_rate@none",
4081 ),
4082 value: Distribution(
4083 [
4084 36.0,
4085 ],
4086 ),
4087 tags: {},
4088 metadata: BucketMetadata {
4089 merges: 1,
4090 received_at: None,
4091 extracted_from_indexed: false,
4092 },
4093 },
4094 ],
4095 ),
4096 ]
4097 "###);
4098 }
4099}