use std::borrow::Cow;
use std::collections::{BTreeSet, HashMap};
use std::error::Error;
use std::fmt::{Debug, Display};
use std::future::Future;
use std::io::Write;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;

use anyhow::Context;
use brotli::CompressorWriter as BrotliEncoder;
use bytes::Bytes;
use chrono::{DateTime, Utc};
use flate2::Compression;
use flate2::write::{GzEncoder, ZlibEncoder};
use futures::FutureExt;
use futures::future::BoxFuture;
use relay_base_schema::project::{ProjectId, ProjectKey};
use relay_cogs::{AppFeature, Cogs, FeatureWeights, ResourceId, Token};
use relay_common::time::UnixTimestamp;
use relay_config::{Config, HttpEncoding, RelayMode};
use relay_dynamic_config::{ErrorBoundary, Feature, GlobalConfig};
use relay_event_normalization::{ClockDriftProcessor, GeoIpLookup};
use relay_event_schema::processor::ProcessingAction;
use relay_event_schema::protocol::{
    ClientReport, Event, EventId, Metrics, NetworkReportError, SpanV2,
};
use relay_filter::FilterStatKey;
use relay_metrics::{Bucket, BucketMetadata, BucketView, BucketsView, MetricNamespace};
use relay_pii::PiiConfigError;
use relay_protocol::Annotated;
use relay_quotas::{DataCategory, Quota, RateLimits, Scoping};
use relay_sampling::evaluation::{ReservoirCounters, ReservoirEvaluator, SamplingDecision};
use relay_statsd::metric;
use relay_system::{Addr, FromMessage, NoResponse, Service};
use reqwest::header;
use smallvec::{SmallVec, smallvec};
use zstd::stream::Encoder as ZstdEncoder;

use crate::envelope::{
    self, AttachmentType, ContentType, Envelope, EnvelopeError, Item, ItemContainer, ItemType,
};
use crate::extractors::{PartialDsn, RequestMeta, RequestTrust};
use crate::integrations::{Integration, SpansIntegration};
use crate::managed::{InvalidProcessingGroupType, ManagedEnvelope, TypedEnvelope};
use crate::metrics::{MetricOutcomes, MetricsLimiter, MinimalTrackableBucket};
use crate::metrics_extraction::transactions::ExtractedMetrics;
use crate::metrics_extraction::transactions::types::ExtractMetricsError;
use crate::processing::check_ins::CheckInsProcessor;
use crate::processing::logs::LogsProcessor;
use crate::processing::sessions::SessionsProcessor;
use crate::processing::spans::SpansProcessor;
use crate::processing::trace_attachments::TraceAttachmentsProcessor;
use crate::processing::trace_metrics::TraceMetricsProcessor;
use crate::processing::transactions::extraction::ExtractMetricsContext;
use crate::processing::utils::event::{
    EventFullyNormalized, EventMetricsExtracted, FiltersStatus, SpansExtracted, event_category,
    event_type,
};
use crate::processing::{Forward as _, Output, Outputs, QuotaRateLimiter};
use crate::service::ServiceError;
use crate::services::global_config::GlobalConfigHandle;
use crate::services::metrics::{Aggregator, FlushBuckets, MergeBuckets, ProjectBuckets};
use crate::services::outcome::{DiscardItemType, DiscardReason, Outcome, TrackOutcome};
use crate::services::projects::cache::ProjectCacheHandle;
use crate::services::projects::project::{ProjectInfo, ProjectState};
use crate::services::upstream::{
    SendRequest, Sign, SignatureType, UpstreamRelay, UpstreamRequest, UpstreamRequestError,
};
use crate::statsd::{RelayCounters, RelayDistributions, RelayTimers};
use crate::utils::{self, CheckLimits, EnvelopeLimiter, SamplingResult};
use crate::{http, processing};
use relay_threading::AsyncPool;
#[cfg(feature = "processing")]
use {
    crate::services::global_rate_limits::{GlobalRateLimits, GlobalRateLimitsServiceHandle},
    crate::services::processor::nnswitch::SwitchProcessingError,
    crate::services::store::{Store, StoreEnvelope},
    crate::services::upload::Upload,
    crate::utils::Enforcement,
    itertools::Itertools,
    relay_cardinality::{
        CardinalityLimit, CardinalityLimiter, CardinalityLimitsSplit, RedisSetLimiter,
        RedisSetLimiterOptions,
    },
    relay_dynamic_config::CardinalityLimiterMode,
    relay_quotas::{RateLimitingError, RedisRateLimiter},
    relay_redis::{AsyncRedisClient, RedisClients},
    std::time::Instant,
    symbolic_unreal::{Unreal4Error, Unreal4ErrorKind},
};

mod attachment;
mod dynamic_sampling;
mod event;
mod metrics;
mod nel;
mod profile;
mod profile_chunk;
mod replay;
mod report;
mod span;

#[cfg(all(sentry, feature = "processing"))]
mod playstation;
mod standalone;
#[cfg(feature = "processing")]
mod unreal;

#[cfg(feature = "processing")]
mod nnswitch;

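/// Expands the given block only when the `processing` feature is compiled in and processing is
/// enabled in the configuration; the two-arm form evaluates the `else` block otherwise.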
macro_rules! if_processing {
    ($config:expr, $if_true:block) => {
        #[cfg(feature = "processing")] {
            if $config.processing_enabled() $if_true
        }
    };
    ($config:expr, $if_true:block else $if_false:block) => {
        {
            #[cfg(feature = "processing")] {
                if $config.processing_enabled() $if_true else $if_false
            }
            #[cfg(not(feature = "processing"))] {
                $if_false
            }
        }
    };
}

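/// Minimum clock drift below which no clock drift correction is applied.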
pub const MINIMUM_CLOCK_DRIFT: Duration = Duration::from_secs(55 * 60);

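/// Error returned when a [`ProcessingGroup`] cannot be converted into the requested group
/// marker type.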
#[derive(Debug)]
pub struct GroupTypeError;

impl Display for GroupTypeError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("failed to convert processing group into corresponding type")
    }
}

impl std::error::Error for GroupTypeError {}

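/// Generates a processing group marker type with conversions from and into
/// [`ProcessingGroup`].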
macro_rules! processing_group {
    ($ty:ident, $variant:ident$(, $($other:ident),+)?) => {
        #[derive(Clone, Copy, Debug)]
        pub struct $ty;

        impl From<$ty> for ProcessingGroup {
            fn from(_: $ty) -> Self {
                ProcessingGroup::$variant
            }
        }

        impl TryFrom<ProcessingGroup> for $ty {
            type Error = GroupTypeError;

            fn try_from(value: ProcessingGroup) -> Result<Self, Self::Error> {
                if matches!(value, ProcessingGroup::$variant) {
                    return Ok($ty);
                }
                $($(
                    if matches!(value, ProcessingGroup::$other) {
                        return Ok($ty);
                    }
                )+)?
                return Err(GroupTypeError);
            }
        }
    };
}

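/// Marker trait for processing groups that operate on events.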
pub trait EventProcessing {}

processing_group!(TransactionGroup, Transaction);
impl EventProcessing for TransactionGroup {}

processing_group!(ErrorGroup, Error);
impl EventProcessing for ErrorGroup {}

processing_group!(SessionGroup, Session);
processing_group!(StandaloneGroup, Standalone);
processing_group!(ClientReportGroup, ClientReport);
processing_group!(ReplayGroup, Replay);
processing_group!(CheckInGroup, CheckIn);
processing_group!(LogGroup, Log, Nel);
processing_group!(TraceMetricGroup, TraceMetric);
processing_group!(SpanGroup, Span);

processing_group!(ProfileChunkGroup, ProfileChunk);
processing_group!(MetricsGroup, Metrics);
processing_group!(ForwardUnknownGroup, ForwardUnknown);
processing_group!(Ungrouped, Ungrouped);

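/// Marker for envelopes that have passed through the processing pipeline.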
#[derive(Clone, Copy, Debug)]
pub struct Processed;

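/// The group an envelope's items are processed in, determining which pipeline they take.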
#[derive(Clone, Copy, Debug)]
pub enum ProcessingGroup {
    Transaction,
    Error,
    Session,
    Standalone,
    ClientReport,
    Replay,
    CheckIn,
    Nel,
    Log,
    TraceMetric,
    Span,
    SpanV2,
    Metrics,
    ProfileChunk,
    TraceAttachment,
    ForwardUnknown,
    Ungrouped,
}

impl ProcessingGroup {
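    /// Splits the items of the envelope into one envelope per processing group, cloning the
    /// envelope headers into each of them.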
    fn split_envelope(
        mut envelope: Envelope,
        project_info: &ProjectInfo,
    ) -> SmallVec<[(Self, Box<Envelope>); 3]> {
        let headers = envelope.headers().clone();
        let mut grouped_envelopes = smallvec![];

        let replay_items = envelope.take_items_by(|item| {
            matches!(
                item.ty(),
                &ItemType::ReplayEvent | &ItemType::ReplayRecording | &ItemType::ReplayVideo
            )
        });
        if !replay_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Replay,
                Envelope::from_parts(headers.clone(), replay_items),
            ))
        }

        let session_items = envelope
            .take_items_by(|item| matches!(item.ty(), &ItemType::Session | &ItemType::Sessions));
        if !session_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Session,
                Envelope::from_parts(headers.clone(), session_items),
            ))
        }

        let span_v2_items = envelope.take_items_by(|item| {
            let exp_feature = project_info.has_feature(Feature::SpanV2ExperimentalProcessing);
            let is_supported_integration = {
                matches!(
                    item.integration(),
                    Some(Integration::Spans(SpansIntegration::OtelV1 { .. }))
                )
            };
            let is_span = matches!(item.ty(), &ItemType::Span);
            let is_span_attachment = item.is_span_attachment();

            ItemContainer::<SpanV2>::is_container(item)
                || (exp_feature && is_span)
                || (exp_feature && is_supported_integration)
                || (exp_feature && is_span_attachment)
        });

        if !span_v2_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::SpanV2,
                Envelope::from_parts(headers.clone(), span_v2_items),
            ))
        }

        let span_items = envelope.take_items_by(|item| {
            matches!(item.ty(), &ItemType::Span)
                || matches!(item.integration(), Some(Integration::Spans(_)))
        });

        if !span_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Span,
                Envelope::from_parts(headers.clone(), span_items),
            ))
        }

        let logs_items = envelope.take_items_by(|item| {
            matches!(item.ty(), &ItemType::Log)
                || matches!(item.integration(), Some(Integration::Logs(_)))
        });
        if !logs_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Log,
                Envelope::from_parts(headers.clone(), logs_items),
            ))
        }

        let trace_metric_items =
            envelope.take_items_by(|item| matches!(item.ty(), &ItemType::TraceMetric));
        if !trace_metric_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::TraceMetric,
                Envelope::from_parts(headers.clone(), trace_metric_items),
            ))
        }

        let nel_items = envelope.take_items_by(|item| matches!(item.ty(), &ItemType::Nel));
        if !nel_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Nel,
                Envelope::from_parts(headers.clone(), nel_items),
            ))
        }

        let metric_items = envelope.take_items_by(|i| i.ty().is_metrics());
        if !metric_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Metrics,
                Envelope::from_parts(headers.clone(), metric_items),
            ))
        }

        let profile_chunk_items =
            envelope.take_items_by(|item| matches!(item.ty(), &ItemType::ProfileChunk));
        if !profile_chunk_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::ProfileChunk,
                Envelope::from_parts(headers.clone(), profile_chunk_items),
            ))
        }

        let trace_attachment_items = envelope.take_items_by(Item::is_trace_attachment);
        if !trace_attachment_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::TraceAttachment,
                Envelope::from_parts(headers.clone(), trace_attachment_items),
            ))
        }

        if !envelope.items().any(Item::creates_event) {
            let standalone_items = envelope.take_items_by(Item::requires_event);
            if !standalone_items.is_empty() {
                grouped_envelopes.push((
                    ProcessingGroup::Standalone,
                    Envelope::from_parts(headers.clone(), standalone_items),
                ))
            }
        };

        let security_reports_items = envelope
            .take_items_by(|i| matches!(i.ty(), &ItemType::RawSecurity))
            .into_iter()
            .map(|item| {
                let headers = headers.clone();
                let items: SmallVec<[Item; 3]> = smallvec![item.clone()];
                let mut envelope = Envelope::from_parts(headers, items);
                envelope.set_event_id(EventId::new());
                (ProcessingGroup::Error, envelope)
            });
        grouped_envelopes.extend(security_reports_items);

        let require_event_items = envelope.take_items_by(Item::requires_event);
        if !require_event_items.is_empty() {
            let group = if require_event_items
                .iter()
                .any(|item| matches!(item.ty(), &ItemType::Transaction | &ItemType::Profile))
            {
                ProcessingGroup::Transaction
            } else {
                ProcessingGroup::Error
            };

            grouped_envelopes.push((
                group,
                Envelope::from_parts(headers.clone(), require_event_items),
            ))
        }

        let envelopes = envelope.items_mut().map(|item| {
            let headers = headers.clone();
            let items: SmallVec<[Item; 3]> = smallvec![item.clone()];
            let envelope = Envelope::from_parts(headers, items);
            let item_type = item.ty();
            let group = if matches!(item_type, &ItemType::CheckIn) {
                ProcessingGroup::CheckIn
            } else if matches!(item.ty(), &ItemType::ClientReport) {
                ProcessingGroup::ClientReport
            } else if matches!(item_type, &ItemType::Unknown(_)) {
                ProcessingGroup::ForwardUnknown
            } else {
                ProcessingGroup::Ungrouped
            };

            (group, envelope)
        });
        grouped_envelopes.extend(envelopes);

        grouped_envelopes
    }

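    /// Returns the name of the processing group, e.g. for use as a metric tag.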
    pub fn variant(&self) -> &'static str {
        match self {
            ProcessingGroup::Transaction => "transaction",
            ProcessingGroup::Error => "error",
            ProcessingGroup::Session => "session",
            ProcessingGroup::Standalone => "standalone",
            ProcessingGroup::ClientReport => "client_report",
            ProcessingGroup::Replay => "replay",
            ProcessingGroup::CheckIn => "check_in",
            ProcessingGroup::Log => "log",
            ProcessingGroup::TraceMetric => "trace_metric",
            ProcessingGroup::Nel => "nel",
            ProcessingGroup::Span => "span",
            ProcessingGroup::SpanV2 => "span_v2",
            ProcessingGroup::Metrics => "metrics",
            ProcessingGroup::ProfileChunk => "profile_chunk",
            ProcessingGroup::TraceAttachment => "trace_attachment",
            ProcessingGroup::ForwardUnknown => "forward_unknown",
            ProcessingGroup::Ungrouped => "ungrouped",
        }
    }
}

impl From<ProcessingGroup> for AppFeature {
    fn from(value: ProcessingGroup) -> Self {
        match value {
            ProcessingGroup::Transaction => AppFeature::Transactions,
            ProcessingGroup::Error => AppFeature::Errors,
            ProcessingGroup::Session => AppFeature::Sessions,
            ProcessingGroup::Standalone => AppFeature::UnattributedEnvelope,
            ProcessingGroup::ClientReport => AppFeature::ClientReports,
            ProcessingGroup::Replay => AppFeature::Replays,
            ProcessingGroup::CheckIn => AppFeature::CheckIns,
            ProcessingGroup::Log => AppFeature::Logs,
            ProcessingGroup::TraceMetric => AppFeature::TraceMetrics,
            ProcessingGroup::Nel => AppFeature::Logs,
            ProcessingGroup::Span => AppFeature::Spans,
            ProcessingGroup::SpanV2 => AppFeature::Spans,
            ProcessingGroup::Metrics => AppFeature::UnattributedMetrics,
            ProcessingGroup::ProfileChunk => AppFeature::Profiles,
            ProcessingGroup::ForwardUnknown => AppFeature::UnattributedEnvelope,
            ProcessingGroup::Ungrouped => AppFeature::UnattributedEnvelope,
            ProcessingGroup::TraceAttachment => AppFeature::TraceAttachments,
        }
    }
}

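/// An error returned while processing an envelope.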
#[derive(Debug, thiserror::Error)]
pub enum ProcessingError {
    #[error("invalid json in event")]
    InvalidJson(#[source] serde_json::Error),

    #[error("invalid message pack event payload")]
    InvalidMsgpack(#[from] rmp_serde::decode::Error),

    #[cfg(feature = "processing")]
    #[error("invalid unreal crash report")]
    InvalidUnrealReport(#[source] Unreal4Error),

    #[error("event payload too large")]
    PayloadTooLarge(DiscardItemType),

    #[error("invalid transaction event")]
    InvalidTransaction,

    #[error("envelope processor failed")]
    ProcessingFailed(#[from] ProcessingAction),

    #[error("duplicate {0} in event")]
    DuplicateItem(ItemType),

    #[error("failed to extract event payload")]
    NoEventPayload,

    #[error("missing project id in DSN")]
    MissingProjectId,

    #[error("invalid security report type: {0:?}")]
    InvalidSecurityType(Bytes),

    #[error("unsupported security report type")]
    UnsupportedSecurityType,

    #[error("invalid security report")]
    InvalidSecurityReport(#[source] serde_json::Error),

    #[error("invalid nel report")]
    InvalidNelReport(#[source] NetworkReportError),

    #[error("event filtered with reason: {0:?}")]
    EventFiltered(FilterStatKey),

    #[error("missing or invalid required event timestamp")]
    InvalidTimestamp,

    #[error("could not serialize event payload")]
    SerializeFailed(#[source] serde_json::Error),

    #[cfg(feature = "processing")]
    #[error("failed to apply quotas")]
    QuotasFailed(#[from] RateLimitingError),

    #[error("invalid pii config")]
    PiiConfigError(PiiConfigError),

    #[error("invalid processing group type")]
    InvalidProcessingGroup(Box<InvalidProcessingGroupType>),

    #[error("invalid replay")]
    InvalidReplay(DiscardReason),

    #[error("replay filtered with reason: {0:?}")]
    ReplayFiltered(FilterStatKey),

    #[cfg(feature = "processing")]
    #[error("nintendo switch dying message processing failed {0:?}")]
    InvalidNintendoDyingMessage(#[source] SwitchProcessingError),

    #[cfg(all(sentry, feature = "processing"))]
    #[error("playstation dump processing failed: {0}")]
    InvalidPlaystationDump(String),

    #[error("processing group does not match specific processor")]
    ProcessingGroupMismatch,

    #[error("new processing pipeline failed")]
    ProcessingFailure,
}

impl ProcessingError {
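    /// Returns the outcome to report for this error, or `None` if no outcome should be emitted.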
    pub fn to_outcome(&self) -> Option<Outcome> {
        match self {
            Self::PayloadTooLarge(payload_type) => {
                Some(Outcome::Invalid(DiscardReason::TooLarge(*payload_type)))
            }
            Self::InvalidJson(_) => Some(Outcome::Invalid(DiscardReason::InvalidJson)),
            Self::InvalidMsgpack(_) => Some(Outcome::Invalid(DiscardReason::InvalidMsgpack)),
            Self::InvalidSecurityType(_) => {
                Some(Outcome::Invalid(DiscardReason::SecurityReportType))
            }
            Self::InvalidSecurityReport(_) => Some(Outcome::Invalid(DiscardReason::SecurityReport)),
            Self::UnsupportedSecurityType => Some(Outcome::Filtered(FilterStatKey::InvalidCsp)),
            Self::InvalidNelReport(_) => Some(Outcome::Invalid(DiscardReason::InvalidJson)),
            Self::InvalidTransaction => Some(Outcome::Invalid(DiscardReason::InvalidTransaction)),
            Self::InvalidTimestamp => Some(Outcome::Invalid(DiscardReason::Timestamp)),
            Self::DuplicateItem(_) => Some(Outcome::Invalid(DiscardReason::DuplicateItem)),
            Self::NoEventPayload => Some(Outcome::Invalid(DiscardReason::NoEventPayload)),
            #[cfg(feature = "processing")]
            Self::InvalidNintendoDyingMessage(_) => Some(Outcome::Invalid(DiscardReason::Payload)),
            #[cfg(all(sentry, feature = "processing"))]
            Self::InvalidPlaystationDump(_) => Some(Outcome::Invalid(DiscardReason::Payload)),
            #[cfg(feature = "processing")]
            Self::InvalidUnrealReport(err) if err.kind() == Unreal4ErrorKind::BadCompression => {
                Some(Outcome::Invalid(DiscardReason::InvalidCompression))
            }
            #[cfg(feature = "processing")]
            Self::InvalidUnrealReport(_) => Some(Outcome::Invalid(DiscardReason::ProcessUnreal)),
            Self::SerializeFailed(_) | Self::ProcessingFailed(_) => {
                Some(Outcome::Invalid(DiscardReason::Internal))
            }
            #[cfg(feature = "processing")]
            Self::QuotasFailed(_) => Some(Outcome::Invalid(DiscardReason::Internal)),
            Self::PiiConfigError(_) => Some(Outcome::Invalid(DiscardReason::ProjectStatePii)),
            Self::MissingProjectId => None,
            Self::EventFiltered(_) => None,
            Self::InvalidProcessingGroup(_) => None,
            Self::InvalidReplay(reason) => Some(Outcome::Invalid(*reason)),
            Self::ReplayFiltered(key) => Some(Outcome::Filtered(key.clone())),

            Self::ProcessingGroupMismatch => Some(Outcome::Invalid(DiscardReason::Internal)),
            Self::ProcessingFailure => None,
        }
    }

    fn is_unexpected(&self) -> bool {
        self.to_outcome()
            .is_some_and(|outcome| outcome.is_unexpected())
    }
}

#[cfg(feature = "processing")]
impl From<Unreal4Error> for ProcessingError {
    fn from(err: Unreal4Error) -> Self {
        match err.kind() {
            Unreal4ErrorKind::TooLarge => Self::PayloadTooLarge(ItemType::UnrealReport.into()),
            _ => ProcessingError::InvalidUnrealReport(err),
        }
    }
}

impl From<ExtractMetricsError> for ProcessingError {
    fn from(error: ExtractMetricsError) -> Self {
        match error {
            ExtractMetricsError::MissingTimestamp | ExtractMetricsError::InvalidTimestamp => {
                Self::InvalidTimestamp
            }
        }
    }
}

impl From<InvalidProcessingGroupType> for ProcessingError {
    fn from(value: InvalidProcessingGroupType) -> Self {
        Self::InvalidProcessingGroup(Box::new(value))
    }
}

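/// An extracted event alongside a payload size in bytes.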
type ExtractedEvent = (Annotated<Event>, usize);

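/// Wrapper around [`ExtractedMetrics`] that tracks, per bucket, whether the metrics were
/// extracted from a payload that is also indexed, based on the sampling decision.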
#[derive(Debug)]
pub struct ProcessingExtractedMetrics {
    metrics: ExtractedMetrics,
}

impl ProcessingExtractedMetrics {
    pub fn new() -> Self {
        Self {
            metrics: ExtractedMetrics::default(),
        }
    }

    pub fn into_inner(self) -> ExtractedMetrics {
        self.metrics
    }

    pub fn extend(
        &mut self,
        extracted: ExtractedMetrics,
        sampling_decision: Option<SamplingDecision>,
    ) {
        self.extend_project_metrics(extracted.project_metrics, sampling_decision);
        self.extend_sampling_metrics(extracted.sampling_metrics, sampling_decision);
    }

    pub fn extend_project_metrics<I>(
        &mut self,
        buckets: I,
        sampling_decision: Option<SamplingDecision>,
    ) where
        I: IntoIterator<Item = Bucket>,
    {
        self.metrics
            .project_metrics
            .extend(buckets.into_iter().map(|mut bucket| {
                bucket.metadata.extracted_from_indexed =
                    sampling_decision == Some(SamplingDecision::Keep);
                bucket
            }));
    }

    pub fn extend_sampling_metrics<I>(
        &mut self,
        buckets: I,
        sampling_decision: Option<SamplingDecision>,
    ) where
        I: IntoIterator<Item = Bucket>,
    {
        self.metrics
            .sampling_metrics
            .extend(buckets.into_iter().map(|mut bucket| {
                bucket.metadata.extracted_from_indexed =
                    sampling_decision == Some(SamplingDecision::Keep);
                bucket
            }));
    }

    #[cfg(feature = "processing")]
    fn apply_enforcement(&mut self, enforcement: &Enforcement, enforced_consistently: bool) {
        let mut drop_namespaces: SmallVec<[_; 2]> = smallvec![];
        let mut reset_extracted_from_indexed: SmallVec<[_; 2]> = smallvec![];

        for (namespace, limit, indexed) in [
            (
                MetricNamespace::Transactions,
                &enforcement.event,
                &enforcement.event_indexed,
            ),
            (
                MetricNamespace::Spans,
                &enforcement.spans,
                &enforcement.spans_indexed,
            ),
        ] {
            if limit.is_active() {
                drop_namespaces.push(namespace);
            } else if indexed.is_active() && !enforced_consistently {
                reset_extracted_from_indexed.push(namespace);
            }
        }

        if !drop_namespaces.is_empty() || !reset_extracted_from_indexed.is_empty() {
            self.retain_mut(|bucket| {
                let Some(namespace) = bucket.name.try_namespace() else {
                    return true;
                };

                if drop_namespaces.contains(&namespace) {
                    return false;
                }

                if reset_extracted_from_indexed.contains(&namespace) {
                    bucket.metadata.extracted_from_indexed = false;
                }

                true
            });
        }
    }

    #[cfg(feature = "processing")]
    fn retain_mut(&mut self, mut f: impl FnMut(&mut Bucket) -> bool) {
        self.metrics.project_metrics.retain_mut(&mut f);
        self.metrics.sampling_metrics.retain_mut(&mut f);
    }
}

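/// Sends extracted project and sampling metrics to the aggregator, routing sampling metrics to
/// the sampling project when one is known.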
fn send_metrics(
    metrics: ExtractedMetrics,
    project_key: ProjectKey,
    sampling_key: Option<ProjectKey>,
    aggregator: &Addr<Aggregator>,
) {
    let ExtractedMetrics {
        project_metrics,
        sampling_metrics,
    } = metrics;

    if !project_metrics.is_empty() {
        aggregator.send(MergeBuckets {
            project_key,
            buckets: project_metrics,
        });
    }

    if !sampling_metrics.is_empty() {
        let sampling_project_key = sampling_key.unwrap_or(project_key);
        aggregator.send(MergeBuckets {
            project_key: sampling_project_key,
            buckets: sampling_metrics,
        });
    }
}

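/// Returns `true` if the feature-gated functionality should be filtered out for this project.
/// Proxy Relays never filter.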
fn should_filter(config: &Config, project_info: &ProjectInfo, feature: Feature) -> bool {
    match config.relay_mode() {
        RelayMode::Proxy => false,
        RelayMode::Managed => !project_info.has_feature(feature),
    }
}

#[derive(Debug)]
#[expect(
    clippy::large_enum_variant,
    reason = "until we have a better solution to combat the excessive growth of Item, see #4819"
)]
enum ProcessingResult {
    Envelope {
        managed_envelope: TypedEnvelope<Processed>,
        extracted_metrics: ProcessingExtractedMetrics,
    },
    Output(Output<Outputs>),
}

impl ProcessingResult {
    fn no_metrics(managed_envelope: TypedEnvelope<Processed>) -> Self {
        Self::Envelope {
            managed_envelope,
            extracted_metrics: ProcessingExtractedMetrics::new(),
        }
    }
}

#[derive(Debug)]
#[expect(
    clippy::large_enum_variant,
    reason = "until we have a better solution to combat the excessive growth of Item, see #4819"
)]
enum Submit<'a> {
    Envelope(TypedEnvelope<Processed>),
    Output {
        output: Outputs,
        ctx: processing::ForwardContext<'a>,
    },
}

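/// Message that triggers processing of an envelope together with its project state, rate
/// limits, and reservoir counters.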
#[derive(Debug)]
pub struct ProcessEnvelope {
    pub envelope: ManagedEnvelope,
    pub project_info: Arc<ProjectInfo>,
    pub rate_limits: Arc<RateLimits>,
    pub sampling_project_info: Option<Arc<ProjectInfo>>,
    pub reservoir_counters: ReservoirCounters,
}

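/// An envelope of a single [`ProcessingGroup`] together with its processing context.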
#[derive(Debug)]
struct ProcessEnvelopeGrouped<'a> {
    pub group: ProcessingGroup,
    pub envelope: ManagedEnvelope,
    pub ctx: processing::Context<'a>,
}

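/// Message that parses metric data submitted for a project and forwards the resulting buckets
/// to the aggregator.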
#[derive(Debug)]
pub struct ProcessMetrics {
    pub data: MetricData,
    pub project_key: ProjectKey,
    pub source: BucketSource,
    pub received_at: DateTime<Utc>,
    pub sent_at: Option<DateTime<Utc>>,
}

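/// Raw, unparsed metric items or already parsed metric buckets.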
#[derive(Debug)]
pub enum MetricData {
    Raw(Vec<Item>),
    Parsed(Vec<Bucket>),
}

impl MetricData {
    fn into_buckets(self, timestamp: UnixTimestamp) -> Vec<Bucket> {
        let items = match self {
            Self::Parsed(buckets) => return buckets,
            Self::Raw(items) => items,
        };

        let mut buckets = Vec::new();
        for item in items {
            let payload = item.payload();
            if item.ty() == &ItemType::Statsd {
                for bucket_result in Bucket::parse_all(&payload, timestamp) {
                    match bucket_result {
                        Ok(bucket) => buckets.push(bucket),
                        Err(error) => relay_log::debug!(
                            error = &error as &dyn Error,
                            "failed to parse metric bucket from statsd format",
                        ),
                    }
                }
            } else if item.ty() == &ItemType::MetricBuckets {
                match serde_json::from_slice::<Vec<Bucket>>(&payload) {
                    Ok(parsed_buckets) => {
                        if buckets.is_empty() {
                            buckets = parsed_buckets;
                        } else {
                            buckets.extend(parsed_buckets);
                        }
                    }
                    Err(error) => {
                        relay_log::debug!(
                            error = &error as &dyn Error,
                            "failed to parse metric bucket",
                        );
                        metric!(counter(RelayCounters::MetricBucketsParsingFailed) += 1);
                    }
                }
            } else {
                relay_log::error!(
                    "invalid item of type {} passed to ProcessMetrics",
                    item.ty()
                );
            }
        }
        buckets
    }
}

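/// Message carrying a batched payload of metric buckets.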
#[derive(Debug)]
pub struct ProcessBatchedMetrics {
    pub payload: Bytes,
    pub source: BucketSource,
    pub received_at: DateTime<Utc>,
    pub sent_at: Option<DateTime<Utc>>,
}

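/// The source of metric buckets: a trusted internal Relay or an untrusted external client.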
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum BucketSource {
    Internal,
    External,
}

impl BucketSource {
    pub fn from_meta(meta: &RequestMeta) -> Self {
        match meta.request_trust() {
            RequestTrust::Trusted => Self::Internal,
            RequestTrust::Untrusted => Self::External,
        }
    }
}

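/// Message that submits a list of client reports with the given scoping.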
#[derive(Debug)]
pub struct SubmitClientReports {
    pub client_reports: Vec<ClientReport>,
    pub scoping: Scoping,
}

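/// Service interface of the envelope processor, combining all messages it handles.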
#[derive(Debug)]
pub enum EnvelopeProcessor {
    ProcessEnvelope(Box<ProcessEnvelope>),
    ProcessProjectMetrics(Box<ProcessMetrics>),
    ProcessBatchedMetrics(Box<ProcessBatchedMetrics>),
    FlushBuckets(Box<FlushBuckets>),
    SubmitClientReports(Box<SubmitClientReports>),
}

impl EnvelopeProcessor {
    pub fn variant(&self) -> &'static str {
        match self {
            EnvelopeProcessor::ProcessEnvelope(_) => "ProcessEnvelope",
            EnvelopeProcessor::ProcessProjectMetrics(_) => "ProcessProjectMetrics",
            EnvelopeProcessor::ProcessBatchedMetrics(_) => "ProcessBatchedMetrics",
            EnvelopeProcessor::FlushBuckets(_) => "FlushBuckets",
            EnvelopeProcessor::SubmitClientReports(_) => "SubmitClientReports",
        }
    }
}

impl relay_system::Interface for EnvelopeProcessor {}

impl FromMessage<ProcessEnvelope> for EnvelopeProcessor {
    type Response = relay_system::NoResponse;

    fn from_message(message: ProcessEnvelope, _sender: ()) -> Self {
        Self::ProcessEnvelope(Box::new(message))
    }
}

impl FromMessage<ProcessMetrics> for EnvelopeProcessor {
    type Response = NoResponse;

    fn from_message(message: ProcessMetrics, _: ()) -> Self {
        Self::ProcessProjectMetrics(Box::new(message))
    }
}

impl FromMessage<ProcessBatchedMetrics> for EnvelopeProcessor {
    type Response = NoResponse;

    fn from_message(message: ProcessBatchedMetrics, _: ()) -> Self {
        Self::ProcessBatchedMetrics(Box::new(message))
    }
}

impl FromMessage<FlushBuckets> for EnvelopeProcessor {
    type Response = NoResponse;

    fn from_message(message: FlushBuckets, _: ()) -> Self {
        Self::FlushBuckets(Box::new(message))
    }
}

impl FromMessage<SubmitClientReports> for EnvelopeProcessor {
    type Response = NoResponse;

    fn from_message(message: SubmitClientReports, _: ()) -> Self {
        Self::SubmitClientReports(Box::new(message))
    }
}

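/// The asynchronous thread pool used by the [`EnvelopeProcessorService`].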
pub type EnvelopeProcessorServicePool = AsyncPool<BoxFuture<'static, ()>>;

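/// Service implementing the [`EnvelopeProcessor`] interface.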
#[derive(Clone)]
pub struct EnvelopeProcessorService {
    inner: Arc<InnerProcessor>,
}

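/// Addresses of the services the envelope processor communicates with.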
pub struct Addrs {
    pub outcome_aggregator: Addr<TrackOutcome>,
    pub upstream_relay: Addr<UpstreamRelay>,
    #[cfg(feature = "processing")]
    pub upload: Option<Addr<Upload>>,
    #[cfg(feature = "processing")]
    pub store_forwarder: Option<Addr<Store>>,
    pub aggregator: Addr<Aggregator>,
    #[cfg(feature = "processing")]
    pub global_rate_limits: Option<Addr<GlobalRateLimits>>,
}

impl Default for Addrs {
    fn default() -> Self {
        Addrs {
            outcome_aggregator: Addr::dummy(),
            upstream_relay: Addr::dummy(),
            #[cfg(feature = "processing")]
            upload: None,
            #[cfg(feature = "processing")]
            store_forwarder: None,
            aggregator: Addr::dummy(),
            #[cfg(feature = "processing")]
            global_rate_limits: None,
        }
    }
}

struct InnerProcessor {
    pool: EnvelopeProcessorServicePool,
    config: Arc<Config>,
    global_config: GlobalConfigHandle,
    project_cache: ProjectCacheHandle,
    cogs: Cogs,
    #[cfg(feature = "processing")]
    quotas_client: Option<AsyncRedisClient>,
    addrs: Addrs,
    #[cfg(feature = "processing")]
    rate_limiter: Option<Arc<RedisRateLimiter<GlobalRateLimitsServiceHandle>>>,
    geoip_lookup: GeoIpLookup,
    #[cfg(feature = "processing")]
    cardinality_limiter: Option<CardinalityLimiter>,
    metric_outcomes: MetricOutcomes,
    processing: Processing,
}

struct Processing {
    logs: LogsProcessor,
    trace_metrics: TraceMetricsProcessor,
    spans: SpansProcessor,
    check_ins: CheckInsProcessor,
    sessions: SessionsProcessor,
    trace_attachments: TraceAttachmentsProcessor,
}

impl EnvelopeProcessorService {
    #[cfg_attr(feature = "processing", expect(clippy::too_many_arguments))]
    pub fn new(
        pool: EnvelopeProcessorServicePool,
        config: Arc<Config>,
        global_config: GlobalConfigHandle,
        project_cache: ProjectCacheHandle,
        cogs: Cogs,
        #[cfg(feature = "processing")] redis: Option<RedisClients>,
        addrs: Addrs,
        metric_outcomes: MetricOutcomes,
    ) -> Self {
        let geoip_lookup = config
            .geoip_path()
            .and_then(
                |p| match GeoIpLookup::open(p).context(ServiceError::GeoIp) {
                    Ok(geoip) => Some(geoip),
                    Err(err) => {
                        relay_log::error!("failed to open GeoIP db {p:?}: {err:?}");
                        None
                    }
                },
            )
            .unwrap_or_else(GeoIpLookup::empty);

        #[cfg(feature = "processing")]
        let (cardinality, quotas) = match redis {
            Some(RedisClients {
                cardinality,
                quotas,
                ..
            }) => (Some(cardinality), Some(quotas)),
            None => (None, None),
        };

        #[cfg(feature = "processing")]
        let global_rate_limits = addrs.global_rate_limits.clone().map(Into::into);

        #[cfg(feature = "processing")]
        let rate_limiter = match (quotas.clone(), global_rate_limits) {
            (Some(redis), Some(global)) => Some(
                RedisRateLimiter::new(redis, global)
                    .max_limit(config.max_rate_limit())
                    .cache(config.quota_cache_ratio(), config.quota_cache_max()),
            ),
            _ => None,
        };

        let quota_limiter = Arc::new(QuotaRateLimiter::new(
            #[cfg(feature = "processing")]
            project_cache.clone(),
            #[cfg(feature = "processing")]
            rate_limiter.clone(),
        ));
        #[cfg(feature = "processing")]
        let rate_limiter = rate_limiter.map(Arc::new);

        let inner = InnerProcessor {
            pool,
            global_config,
            project_cache,
            cogs,
            #[cfg(feature = "processing")]
            quotas_client: quotas.clone(),
            #[cfg(feature = "processing")]
            rate_limiter,
            addrs,
            #[cfg(feature = "processing")]
            cardinality_limiter: cardinality
                .map(|cardinality| {
                    RedisSetLimiter::new(
                        RedisSetLimiterOptions {
                            cache_vacuum_interval: config
                                .cardinality_limiter_cache_vacuum_interval(),
                        },
                        cardinality,
                    )
                })
                .map(CardinalityLimiter::new),
            metric_outcomes,
            processing: Processing {
                logs: LogsProcessor::new(Arc::clone(&quota_limiter)),
                trace_metrics: TraceMetricsProcessor::new(Arc::clone(&quota_limiter)),
                spans: SpansProcessor::new(Arc::clone(&quota_limiter), geoip_lookup.clone()),
                check_ins: CheckInsProcessor::new(Arc::clone(&quota_limiter)),
                sessions: SessionsProcessor::new(Arc::clone(&quota_limiter)),
                trace_attachments: TraceAttachmentsProcessor::new(quota_limiter),
            },
            geoip_lookup,
            config,
        };

        Self {
            inner: Arc::new(inner),
        }
    }

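    /// Enforces rate limits, first against cached limits and then, in processing mode,
    /// consistently via Redis; newly created limits are merged back into the project cache.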
    async fn enforce_quotas<Group>(
        &self,
        managed_envelope: &mut TypedEnvelope<Group>,
        event: Annotated<Event>,
        extracted_metrics: &mut ProcessingExtractedMetrics,
        ctx: processing::Context<'_>,
    ) -> Result<Annotated<Event>, ProcessingError> {
        let cached_result = RateLimiter::Cached
            .enforce(managed_envelope, event, extracted_metrics, ctx)
            .await?;

        if_processing!(self.inner.config, {
            let rate_limiter = match self.inner.rate_limiter.clone() {
                Some(rate_limiter) => rate_limiter,
                None => return Ok(cached_result.event),
            };

            let consistent_result = RateLimiter::Consistent(rate_limiter)
                .enforce(
                    managed_envelope,
                    cached_result.event,
                    extracted_metrics,
                    ctx
                )
                .await?;

            if !consistent_result.rate_limits.is_empty() {
                self.inner
                    .project_cache
                    .get(managed_envelope.scoping().project_key)
                    .rate_limits()
                    .merge(consistent_result.rate_limits);
            }

            Ok(consistent_result.event)
        } else { Ok(cached_result.event) })
    }

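    /// Processes error events and their attachments: expands crash reports, extracts and
    /// normalizes the event, applies inbound filters and quotas, and scrubs PII.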
    async fn process_errors(
        &self,
        managed_envelope: &mut TypedEnvelope<ErrorGroup>,
        project_id: ProjectId,
        mut ctx: processing::Context<'_>,
    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
        let mut event_fully_normalized = EventFullyNormalized::new(managed_envelope.envelope());
        let mut metrics = Metrics::default();
        let mut extracted_metrics = ProcessingExtractedMetrics::new();

        report::process_user_reports(managed_envelope);

        if_processing!(self.inner.config, {
            unreal::expand(managed_envelope, &self.inner.config)?;
            #[cfg(sentry)]
            playstation::expand(managed_envelope, &self.inner.config, ctx.project_info)?;
            nnswitch::expand(managed_envelope)?;
        });

        let extraction_result = event::extract(
            managed_envelope,
            &mut metrics,
            event_fully_normalized,
            &self.inner.config,
        )?;
        let mut event = extraction_result.event;

        if_processing!(self.inner.config, {
            if let Some(inner_event_fully_normalized) =
                unreal::process(managed_envelope, &mut event)?
            {
                event_fully_normalized = inner_event_fully_normalized;
            }
            #[cfg(sentry)]
            if let Some(inner_event_fully_normalized) =
                playstation::process(managed_envelope, &mut event, ctx.project_info)?
            {
                event_fully_normalized = inner_event_fully_normalized;
            }
            if let Some(inner_event_fully_normalized) =
                attachment::create_placeholders(managed_envelope, &mut event, &mut metrics)
            {
                event_fully_normalized = inner_event_fully_normalized;
            }
        });

        ctx.sampling_project_info = dynamic_sampling::validate_and_set_dsc(
            managed_envelope,
            &mut event,
            ctx.project_info,
            ctx.sampling_project_info,
        );

        let attachments = managed_envelope
            .envelope()
            .items()
            .filter(|item| item.attachment_type() == Some(&AttachmentType::Attachment));
        processing::utils::event::finalize(
            managed_envelope.envelope().headers(),
            &mut event,
            attachments,
            &mut metrics,
            ctx.config,
        )?;
        event_fully_normalized = processing::utils::event::normalize(
            managed_envelope.envelope().headers(),
            &mut event,
            event_fully_normalized,
            project_id,
            ctx,
            &self.inner.geoip_lookup,
        )?;
        let filter_run =
            processing::utils::event::filter(managed_envelope.envelope().headers(), &event, &ctx)
                .map_err(|err| {
                    managed_envelope.reject(Outcome::Filtered(err.clone()));
                    ProcessingError::EventFiltered(err)
                })?;

        if self.inner.config.processing_enabled() || matches!(filter_run, FiltersStatus::Ok) {
            dynamic_sampling::tag_error_with_sampling_decision(
                managed_envelope,
                &mut event,
                ctx.sampling_project_info,
                &self.inner.config,
            )
            .await;
        }

        event = self
            .enforce_quotas(managed_envelope, event, &mut extracted_metrics, ctx)
            .await?;

        if event.value().is_some() {
            processing::utils::event::scrub(&mut event, ctx.project_info)?;
            event::serialize(
                managed_envelope,
                &mut event,
                event_fully_normalized,
                EventMetricsExtracted(false),
                SpansExtracted(false),
            )?;
            event::emit_feedback_metrics(managed_envelope.envelope());
        }

        let attachments = managed_envelope
            .envelope_mut()
            .items_mut()
            .filter(|i| i.ty() == &ItemType::Attachment);
        processing::utils::attachments::scrub(attachments, ctx.project_info);

        if self.inner.config.processing_enabled() && !event_fully_normalized.0 {
            relay_log::error!(
                tags.project = %project_id,
                tags.ty = event_type(&event).map(|e| e.to_string()).unwrap_or("none".to_owned()),
                "ingested event without normalizing"
            );
        }

        Ok(Some(extracted_metrics))
    }

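    /// Processes transaction events: extracts the event, normalizes and filters it, runs
    /// dynamic sampling, extracts metrics and spans, and enforces quotas.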
    #[allow(unused_assignments)]
    async fn process_transactions(
        &self,
        managed_envelope: &mut TypedEnvelope<TransactionGroup>,
        cogs: &mut Token,
        project_id: ProjectId,
        mut ctx: processing::Context<'_>,
    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
        let mut event_fully_normalized = EventFullyNormalized::new(managed_envelope.envelope());
        let mut event_metrics_extracted = EventMetricsExtracted(false);
        let mut spans_extracted = SpansExtracted(false);
        let mut metrics = Metrics::default();
        let mut extracted_metrics = ProcessingExtractedMetrics::new();

        let extraction_result = event::extract(
            managed_envelope,
            &mut metrics,
            event_fully_normalized,
            &self.inner.config,
        )?;

        if let Some(inner_event_metrics_extracted) = extraction_result.event_metrics_extracted {
            event_metrics_extracted = inner_event_metrics_extracted;
        }
        if let Some(inner_spans_extracted) = extraction_result.spans_extracted {
            spans_extracted = inner_spans_extracted;
        };

        let mut event = extraction_result.event;

        let profile_id = profile::filter(
            managed_envelope,
            &event,
            ctx.config,
            project_id,
            ctx.project_info,
        );
        processing::transactions::profile::transfer_id(&mut event, profile_id);
        processing::transactions::profile::remove_context_if_rate_limited(
            &mut event,
            managed_envelope.scoping(),
            ctx,
        );

        ctx.sampling_project_info = dynamic_sampling::validate_and_set_dsc(
            managed_envelope,
            &mut event,
            ctx.project_info,
            ctx.sampling_project_info,
        );

        let attachments = managed_envelope
            .envelope()
            .items()
            .filter(|item| item.attachment_type() == Some(&AttachmentType::Attachment));
        processing::utils::event::finalize(
            managed_envelope.envelope().headers(),
            &mut event,
            attachments,
            &mut metrics,
            &self.inner.config,
        )?;

        event_fully_normalized = processing::utils::event::normalize(
            managed_envelope.envelope().headers(),
            &mut event,
            event_fully_normalized,
            project_id,
            ctx,
            &self.inner.geoip_lookup,
        )?;

        let filter_run =
            processing::utils::event::filter(managed_envelope.envelope().headers(), &event, &ctx)
                .map_err(|err| {
                    managed_envelope.reject(Outcome::Filtered(err.clone()));
                    ProcessingError::EventFiltered(err)
                })?;

        let run_dynamic_sampling = (matches!(filter_run, FiltersStatus::Ok)
            || self.inner.config.processing_enabled())
            && matches!(&ctx.project_info.config.transaction_metrics, Some(ErrorBoundary::Ok(c)) if c.is_enabled());

        let sampling_result = match run_dynamic_sampling {
            true => {
                #[allow(unused_mut)]
                let mut reservoir = ReservoirEvaluator::new(Arc::clone(ctx.reservoir_counters));
                #[cfg(feature = "processing")]
                if let Some(quotas_client) = self.inner.quotas_client.as_ref() {
                    reservoir.set_redis(managed_envelope.scoping().organization_id, quotas_client);
                }
                processing::utils::dynamic_sampling::run(
                    managed_envelope.envelope().headers().dsc(),
                    event.value(),
                    &ctx,
                    Some(&reservoir),
                )
                .await
            }
            false => SamplingResult::Pending,
        };

        relay_statsd::metric!(
            counter(RelayCounters::SamplingDecision) += 1,
            decision = sampling_result.decision().as_str(),
            item = "transaction"
        );

        #[cfg(feature = "processing")]
        let server_sample_rate = sampling_result.sample_rate();

        if let Some(outcome) = sampling_result.into_dropped_outcome() {
            profile::process(
                managed_envelope,
                &mut event,
                ctx.global_config,
                ctx.config,
                ctx.project_info,
            );
            event_metrics_extracted = processing::transactions::extraction::extract_metrics(
                &mut event,
                &mut extracted_metrics,
                ExtractMetricsContext {
                    dsc: managed_envelope.envelope().dsc(),
                    project_id,
                    ctx,
                    sampling_decision: SamplingDecision::Drop,
                    metrics_extracted: event_metrics_extracted.0,
                    spans_extracted: spans_extracted.0,
                },
            )?;

            dynamic_sampling::drop_unsampled_items(
                managed_envelope,
                event,
                outcome,
                spans_extracted,
            );

            event = self
                .enforce_quotas(
                    managed_envelope,
                    Annotated::empty(),
                    &mut extracted_metrics,
                    ctx,
                )
                .await?;

            return Ok(Some(extracted_metrics));
        }

        let _post_ds = cogs.start_category("post_ds");

        processing::utils::event::scrub(&mut event, ctx.project_info)?;

        let attachments = managed_envelope
            .envelope_mut()
            .items_mut()
            .filter(|i| i.ty() == &ItemType::Attachment);
        processing::utils::attachments::scrub(attachments, ctx.project_info);

        if_processing!(self.inner.config, {
            let profile_id = profile::process(
                managed_envelope,
                &mut event,
                ctx.global_config,
                ctx.config,
                ctx.project_info,
            );
            processing::transactions::profile::transfer_id(&mut event, profile_id);
            processing::transactions::profile::scrub_profiler_id(&mut event);

            event_metrics_extracted = processing::transactions::extraction::extract_metrics(
                &mut event,
                &mut extracted_metrics,
                ExtractMetricsContext {
                    dsc: managed_envelope.envelope().dsc(),
                    project_id,
                    ctx,
                    sampling_decision: SamplingDecision::Keep,
                    metrics_extracted: event_metrics_extracted.0,
                    spans_extracted: spans_extracted.0,
                },
            )?;

            if let Some(spans) = processing::transactions::spans::extract_from_event(
                managed_envelope.envelope().dsc(),
                &event,
                ctx.global_config,
                ctx.config,
                server_sample_rate,
                event_metrics_extracted,
                spans_extracted,
            ) {
                spans_extracted = SpansExtracted(true);
                for item in spans {
                    match item {
                        Ok(item) => managed_envelope.envelope_mut().add_item(item),
                        Err(()) => managed_envelope.track_outcome(
                            Outcome::Invalid(DiscardReason::InvalidSpan),
                            DataCategory::SpanIndexed,
                            1,
                        ),
                    }
                }
            }
        });

        event = self
            .enforce_quotas(managed_envelope, event, &mut extracted_metrics, ctx)
            .await?;

        if event.value().is_some() {
            event::serialize(
                managed_envelope,
                &mut event,
                event_fully_normalized,
                event_metrics_extracted,
                spans_extracted,
            )?;
        }

        if self.inner.config.processing_enabled() && !event_fully_normalized.0 {
            relay_log::error!(
                tags.project = %project_id,
                tags.ty = event_type(&event).map(|e| e.to_string()).unwrap_or("none".to_owned()),
                "ingested event without normalizing"
            );
        };

        Ok(Some(extracted_metrics))
    }

    async fn process_profile_chunks(
        &self,
        managed_envelope: &mut TypedEnvelope<ProfileChunkGroup>,
        ctx: processing::Context<'_>,
    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
        profile_chunk::filter(managed_envelope, ctx.project_info);

        if_processing!(self.inner.config, {
            profile_chunk::process(
                managed_envelope,
                ctx.project_info,
                ctx.global_config,
                ctx.config,
            );
        });

        self.enforce_quotas(
            managed_envelope,
            Annotated::empty(),
            &mut ProcessingExtractedMetrics::new(),
            ctx,
        )
        .await?;

        Ok(None)
    }

    async fn process_standalone(
        &self,
        managed_envelope: &mut TypedEnvelope<StandaloneGroup>,
        project_id: ProjectId,
        ctx: processing::Context<'_>,
    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
        let mut extracted_metrics = ProcessingExtractedMetrics::new();

        standalone::process(managed_envelope);

        profile::filter(
            managed_envelope,
            &Annotated::empty(),
            ctx.config,
            project_id,
            ctx.project_info,
        );

        self.enforce_quotas(
            managed_envelope,
            Annotated::empty(),
            &mut extracted_metrics,
            ctx,
        )
        .await?;

        report::process_user_reports(managed_envelope);
        let attachments = managed_envelope
            .envelope_mut()
            .items_mut()
            .filter(|i| i.ty() == &ItemType::Attachment);
        processing::utils::attachments::scrub(attachments, ctx.project_info);

        Ok(Some(extracted_metrics))
    }

    async fn process_client_reports(
        &self,
        managed_envelope: &mut TypedEnvelope<ClientReportGroup>,
        ctx: processing::Context<'_>,
    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
        let mut extracted_metrics = ProcessingExtractedMetrics::new();

        self.enforce_quotas(
            managed_envelope,
            Annotated::empty(),
            &mut extracted_metrics,
            ctx,
        )
        .await?;

        report::process_client_reports(
            managed_envelope,
            ctx.config,
            ctx.project_info,
            self.inner.addrs.outcome_aggregator.clone(),
        );

        Ok(Some(extracted_metrics))
    }

    async fn process_replays(
        &self,
        managed_envelope: &mut TypedEnvelope<ReplayGroup>,
        ctx: processing::Context<'_>,
    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
        let mut extracted_metrics = ProcessingExtractedMetrics::new();

        replay::process(
            managed_envelope,
            ctx.global_config,
            ctx.config,
            ctx.project_info,
            &self.inner.geoip_lookup,
        )?;

        self.enforce_quotas(
            managed_envelope,
            Annotated::empty(),
            &mut extracted_metrics,
            ctx,
        )
        .await?;

        Ok(Some(extracted_metrics))
    }

    async fn process_nel(
        &self,
        mut managed_envelope: ManagedEnvelope,
        ctx: processing::Context<'_>,
    ) -> Result<ProcessingResult, ProcessingError> {
        nel::convert_to_logs(&mut managed_envelope);
        self.process_with_processor(&self.inner.processing.logs, managed_envelope, ctx)
            .await
    }

    async fn process_with_processor<P: processing::Processor>(
        &self,
        processor: &P,
        mut managed_envelope: ManagedEnvelope,
        ctx: processing::Context<'_>,
    ) -> Result<ProcessingResult, ProcessingError>
    where
        Outputs: From<P::Output>,
    {
        let Some(work) = processor.prepare_envelope(&mut managed_envelope) else {
            debug_assert!(
                false,
                "there must be work for the {} processor",
                std::any::type_name::<P>(),
            );
            return Err(ProcessingError::ProcessingGroupMismatch);
        };

        managed_envelope.update();
        match managed_envelope.envelope().is_empty() {
            true => managed_envelope.accept(),
            false => managed_envelope.reject(Outcome::Invalid(DiscardReason::Internal)),
        }

        processor
            .process(work, ctx)
            .await
            .map_err(|err| {
                relay_log::debug!(
                    error = &err as &dyn std::error::Error,
                    "processing pipeline failed"
                );
                ProcessingError::ProcessingFailure
            })
            .map(|o| o.map(Into::into))
            .map(ProcessingResult::Output)
    }

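    /// Processes standalone spans that are submitted without a transaction event.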
    async fn process_standalone_spans(
        &self,
        managed_envelope: &mut TypedEnvelope<SpanGroup>,
        _project_id: ProjectId,
        ctx: processing::Context<'_>,
    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
        let mut extracted_metrics = ProcessingExtractedMetrics::new();

        span::filter(managed_envelope, ctx.config, ctx.project_info);
        span::convert_otel_traces_data(managed_envelope);

        if_processing!(self.inner.config, {
            span::process(
                managed_envelope,
                &mut Annotated::empty(),
                &mut extracted_metrics,
                _project_id,
                ctx,
                &self.inner.geoip_lookup,
            )
            .await;
        });

        self.enforce_quotas(
            managed_envelope,
            Annotated::empty(),
            &mut extracted_metrics,
            ctx,
        )
        .await?;

        Ok(Some(extracted_metrics))
    }

    async fn process_envelope(
        &self,
        cogs: &mut Token,
        project_id: ProjectId,
        message: ProcessEnvelopeGrouped<'_>,
    ) -> Result<ProcessingResult, ProcessingError> {
        let ProcessEnvelopeGrouped {
            group,
            envelope: mut managed_envelope,
            ctx,
        } = message;

        if let Some(sampling_state) = ctx.sampling_project_info {
            managed_envelope
                .envelope_mut()
                .parametrize_dsc_transaction(&sampling_state.config.tx_name_rules);
        }

        if let Some(retention) = ctx.project_info.config.event_retention {
            managed_envelope.envelope_mut().set_retention(retention);
        }

        if let Some(retention) = ctx.project_info.config.downsampled_event_retention {
            managed_envelope
                .envelope_mut()
                .set_downsampled_retention(retention);
        }

        managed_envelope
            .envelope_mut()
            .meta_mut()
            .set_project_id(project_id);

        macro_rules! run {
            ($fn_name:ident $(, $args:expr)*) => {
                async {
                    let mut managed_envelope = (managed_envelope, group).try_into()?;
                    match self.$fn_name(&mut managed_envelope, $($args),*).await {
                        Ok(extracted_metrics) => Ok(ProcessingResult::Envelope {
                            managed_envelope: managed_envelope.into_processed(),
                            extracted_metrics: extracted_metrics.map_or(ProcessingExtractedMetrics::new(), |e| e)
                        }),
                        Err(error) => {
                            // `fn` is a keyword and cannot be used as a named format argument.
                            relay_log::trace!("Executing {func} failed: {error}", func = stringify!($fn_name), error = error);
                            if let Some(outcome) = error.to_outcome() {
                                managed_envelope.reject(outcome);
                            }

                            return Err(error);
                        }
                    }
                }.await
            };
        }

        relay_log::trace!("Processing {group} group", group = group.variant());

        match group {
            ProcessingGroup::Error => run!(process_errors, project_id, ctx),
            ProcessingGroup::Transaction => {
                run!(process_transactions, cogs, project_id, ctx)
            }
            ProcessingGroup::Session => {
                self.process_with_processor(&self.inner.processing.sessions, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::Standalone => run!(process_standalone, project_id, ctx),
            ProcessingGroup::ClientReport => run!(process_client_reports, ctx),
            ProcessingGroup::Replay => {
                run!(process_replays, ctx)
            }
            ProcessingGroup::CheckIn => {
                self.process_with_processor(&self.inner.processing.check_ins, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::Nel => self.process_nel(managed_envelope, ctx).await,
            ProcessingGroup::Log => {
                self.process_with_processor(&self.inner.processing.logs, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::TraceMetric => {
                self.process_with_processor(
                    &self.inner.processing.trace_metrics,
                    managed_envelope,
                    ctx,
                )
                .await
            }
            ProcessingGroup::SpanV2 => {
                self.process_with_processor(&self.inner.processing.spans, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::TraceAttachment => {
                self.process_with_processor(
                    &self.inner.processing.trace_attachments,
                    managed_envelope,
                    ctx,
                )
                .await
            }
            ProcessingGroup::Span => run!(process_standalone_spans, project_id, ctx),
            ProcessingGroup::ProfileChunk => {
                run!(process_profile_chunks, ctx)
            }
            ProcessingGroup::Metrics => {
                if self.inner.config.relay_mode() != RelayMode::Proxy {
                    relay_log::error!(
                        tags.project = %project_id,
                        items = ?managed_envelope.envelope().items().next().map(Item::ty),
                        "received metrics in the process_state"
                    );
                }

                Ok(ProcessingResult::no_metrics(
                    managed_envelope.into_processed(),
                ))
            }
            ProcessingGroup::Ungrouped => {
                relay_log::error!(
                    tags.project = %project_id,
                    items = ?managed_envelope.envelope().items().next().map(Item::ty),
                    "could not identify the processing group based on the envelope's items"
                );

                Ok(ProcessingResult::no_metrics(
                    managed_envelope.into_processed(),
                ))
            }
            ProcessingGroup::ForwardUnknown => Ok(ProcessingResult::no_metrics(
                managed_envelope.into_processed(),
            )),
        }
    }

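    /// Processes a grouped envelope and returns what should be submitted upstream, if anything,
    /// sending extracted metrics to the aggregator along the way.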
    async fn process<'a>(
        &self,
        cogs: &mut Token,
        mut message: ProcessEnvelopeGrouped<'a>,
    ) -> Result<Option<Submit<'a>>, ProcessingError> {
        let ProcessEnvelopeGrouped {
            ref mut envelope,
            ctx,
            ..
        } = message;

        let Some(project_id) = ctx
            .project_info
            .project_id
            .or_else(|| envelope.envelope().meta().project_id())
        else {
            envelope.reject(Outcome::Invalid(DiscardReason::Internal));
            return Err(ProcessingError::MissingProjectId);
        };

        let client = envelope.envelope().meta().client().map(str::to_owned);
        let user_agent = envelope.envelope().meta().user_agent().map(str::to_owned);
        let project_key = envelope.envelope().meta().public_key();
        let sampling_key = envelope
            .envelope()
            .sampling_key()
            .filter(|_| ctx.sampling_project_info.is_some());

        relay_log::configure_scope(|scope| {
            scope.set_tag("project", project_id);
            if let Some(client) = client {
                scope.set_tag("sdk", client);
            }
            if let Some(user_agent) = user_agent {
                scope.set_extra("user_agent", user_agent.into());
            }
        });

        let result = match self.process_envelope(cogs, project_id, message).await {
            Ok(ProcessingResult::Envelope {
                mut managed_envelope,
                extracted_metrics,
            }) => {
                managed_envelope.update();

                let has_metrics = !extracted_metrics.metrics.project_metrics.is_empty();
                send_metrics(
                    extracted_metrics.metrics,
                    project_key,
                    sampling_key,
                    &self.inner.addrs.aggregator,
                );

                let envelope_response = if managed_envelope.envelope().is_empty() {
                    if !has_metrics {
                        managed_envelope.reject(Outcome::RateLimited(None));
                    } else {
                        managed_envelope.accept();
                    }

                    None
                } else {
                    Some(managed_envelope)
                };

                Ok(envelope_response.map(Submit::Envelope))
            }
            Ok(ProcessingResult::Output(Output { main, metrics })) => {
                if let Some(metrics) = metrics {
                    metrics.accept(|metrics| {
                        send_metrics(
                            metrics,
                            project_key,
                            sampling_key,
                            &self.inner.addrs.aggregator,
                        );
                    });
                }

                let ctx = ctx.to_forward();
                Ok(main.map(|output| Submit::Output { output, ctx }))
            }
            Err(err) => Err(err),
        };

2152 relay_log::configure_scope(|scope| {
2153 scope.remove_tag("project");
2154 scope.remove_tag("sdk");
2155 scope.remove_tag("user_agent");
2156 });
2157
2158 result
2159 }
2160
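    /// Handles a [`ProcessEnvelope`] message: splits the envelope into processing
    /// groups and processes each group on its own, submitting successful results
    /// upstream.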
    async fn handle_process_envelope(&self, cogs: &mut Token, message: ProcessEnvelope) {
        let project_key = message.envelope.envelope().meta().public_key();
        let wait_time = message.envelope.age();
        metric!(timer(RelayTimers::EnvelopeWaitTime) = wait_time);

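        // Cancel the message-level COGS token: accounting continues below with one
        // token per processing group.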
        cogs.cancel();

        let scoping = message.envelope.scoping();
        for (group, envelope) in ProcessingGroup::split_envelope(
            *message.envelope.into_envelope(),
            &message.project_info,
        ) {
            let mut cogs = self
                .inner
                .cogs
                .timed(ResourceId::Relay, AppFeature::from(group));

            let mut envelope =
                ManagedEnvelope::new(envelope, self.inner.addrs.outcome_aggregator.clone());
            envelope.scope(scoping);

            let global_config = self.inner.global_config.current();

            let ctx = processing::Context {
                config: &self.inner.config,
                global_config: &global_config,
                project_info: &message.project_info,
                sampling_project_info: message.sampling_project_info.as_deref(),
                rate_limits: &message.rate_limits,
                reservoir_counters: &message.reservoir_counters,
            };

            let message = ProcessEnvelopeGrouped {
                group,
                envelope,
                ctx,
            };

            let result = metric!(
                timer(RelayTimers::EnvelopeProcessingTime),
                group = group.variant(),
                { self.process(&mut cogs, message).await }
            );

            match result {
                Ok(Some(envelope)) => self.submit_upstream(&mut cogs, envelope),
                Ok(None) => {}
                Err(error) if error.is_unexpected() => {
                    relay_log::error!(
                        tags.project_key = %project_key,
                        error = &error as &dyn Error,
                        "error processing envelope"
                    )
                }
                Err(error) => {
                    relay_log::debug!(
                        tags.project_key = %project_key,
                        error = &error as &dyn Error,
                        "error processing envelope"
                    )
                }
            }
        }
    }

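    /// Handles a [`ProcessMetrics`] message: parses and normalizes buckets, corrects
    /// clock drift, applies project checks, and merges the result into the aggregator.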
    fn handle_process_metrics(&self, cogs: &mut Token, message: ProcessMetrics) {
        let ProcessMetrics {
            data,
            project_key,
            received_at,
            sent_at,
            source,
        } = message;

        let received_timestamp =
            UnixTimestamp::from_datetime(received_at).unwrap_or(UnixTimestamp::now());

        let mut buckets = data.into_buckets(received_timestamp);
        if buckets.is_empty() {
            return;
        }
        cogs.update(relay_metrics::cogs::BySize(&buckets));

        let clock_drift_processor =
            ClockDriftProcessor::new(sent_at, received_at).at_least(MINIMUM_CLOCK_DRIFT);

        buckets.retain_mut(|bucket| {
            if let Err(error) = relay_metrics::normalize_bucket(bucket) {
                relay_log::debug!(error = &error as &dyn Error, "dropping bucket {bucket:?}");
                return false;
            }

            if !self::metrics::is_valid_namespace(bucket) {
                return false;
            }

            clock_drift_processor.process_timestamp(&mut bucket.timestamp);

            if !matches!(source, BucketSource::Internal) {
                bucket.metadata = BucketMetadata::new(received_timestamp);
            }

            true
        });

        let project = self.inner.project_cache.get(project_key);

        let buckets = match project.state() {
            ProjectState::Enabled(project_info) => {
                let rate_limits = project.rate_limits().current_limits();
                self.check_buckets(project_key, project_info, &rate_limits, buckets)
            }
            _ => buckets,
        };

        relay_log::trace!("merging metric buckets into the aggregator");
        self.inner
            .addrs
            .aggregator
            .send(MergeBuckets::new(project_key, buckets));
    }

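    /// Handles a [`ProcessBatchedMetrics`] message by deserializing the payload and
    /// dispatching the buckets per project key to [`Self::handle_process_metrics`].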
    fn handle_process_batched_metrics(&self, cogs: &mut Token, message: ProcessBatchedMetrics) {
        let ProcessBatchedMetrics {
            payload,
            source,
            received_at,
            sent_at,
        } = message;

        #[derive(serde::Deserialize)]
        struct Wrapper {
            buckets: HashMap<ProjectKey, Vec<Bucket>>,
        }

        let buckets = match serde_json::from_slice(&payload) {
            Ok(Wrapper { buckets }) => buckets,
            Err(error) => {
                relay_log::debug!(
                    error = &error as &dyn Error,
                    "failed to parse batched metrics",
                );
                metric!(counter(RelayCounters::MetricBucketsParsingFailed) += 1);
                return;
            }
        };

        for (project_key, buckets) in buckets {
            self.handle_process_metrics(
                cogs,
                ProcessMetrics {
                    data: MetricData::Parsed(buckets),
                    project_key,
                    source,
                    received_at,
                    sent_at,
                },
            )
        }
    }

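    /// Submits a processing result to its destination: the store (and optionally the
    /// object-store upload service) in processing mode, otherwise serialized as an
    /// envelope to the upstream HTTP endpoint.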
    fn submit_upstream(&self, cogs: &mut Token, submit: Submit<'_>) {
        let _submit = cogs.start_category("submit");

        #[cfg(feature = "processing")]
        if self.inner.config.processing_enabled()
            && let Some(store_forwarder) = &self.inner.addrs.store_forwarder
        {
            use crate::processing::StoreHandle;

            let upload = self.inner.addrs.upload.as_ref();
            match submit {
                Submit::Envelope(envelope) => {
                    let envelope_has_attachments = envelope
                        .envelope()
                        .items()
                        .any(|item| *item.ty() == ItemType::Attachment);
                    let use_objectstore = || {
                        let options = &self.inner.global_config.current().options;
                        utils::sample(options.objectstore_attachments_sample_rate).is_keep()
                    };

                    if let Some(upload) = &self.inner.addrs.upload
                        && envelope_has_attachments
                        && use_objectstore()
                    {
                        upload.send(StoreEnvelope { envelope })
                    } else {
                        store_forwarder.send(StoreEnvelope { envelope })
                    }
                }
                Submit::Output { output, ctx } => output
                    .forward_store(StoreHandle::new(store_forwarder, upload), ctx)
                    .unwrap_or_else(|err| err.into_inner()),
            }
            return;
        }

        let mut envelope = match submit {
            Submit::Envelope(envelope) => envelope,
            Submit::Output { output, ctx } => match output.serialize_envelope(ctx) {
                Ok(envelope) => ManagedEnvelope::from(envelope).into_processed(),
                Err(_) => {
                    relay_log::error!("failed to serialize output to an envelope");
                    return;
                }
            },
        };

        if envelope.envelope_mut().is_empty() {
            envelope.accept();
            return;
        }

        envelope.envelope_mut().set_sent_at(Utc::now());

        relay_log::trace!("sending envelope to sentry endpoint");
        let http_encoding = self.inner.config.http_encoding();
        let result = envelope.envelope().to_vec().and_then(|v| {
            encode_payload(&v.into(), http_encoding).map_err(EnvelopeError::PayloadIoFailed)
        });

        match result {
            Ok(body) => {
                self.inner
                    .addrs
                    .upstream_relay
                    .send(SendRequest(SendEnvelope {
                        envelope,
                        body,
                        http_encoding,
                        project_cache: self.inner.project_cache.clone(),
                    }));
            }
            Err(error) => {
                relay_log::error!(
                    error = &error as &dyn Error,
                    tags.project_key = %envelope.scoping().project_key,
                    "failed to serialize envelope payload"
                );

                envelope.reject(Outcome::Invalid(DiscardReason::Internal));
            }
        }
    }

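    /// Serializes client reports into an envelope and submits it upstream.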
    fn handle_submit_client_reports(&self, cogs: &mut Token, message: SubmitClientReports) {
        let SubmitClientReports {
            client_reports,
            scoping,
        } = message;

        let upstream = self.inner.config.upstream_descriptor();
        let dsn = PartialDsn::outbound(&scoping, upstream);

        let mut envelope = Envelope::from_request(None, RequestMeta::outbound(dsn));
        for client_report in client_reports {
            match client_report.serialize() {
                Ok(payload) => {
                    let mut item = Item::new(ItemType::ClientReport);
                    item.set_payload(ContentType::Json, payload);
                    envelope.add_item(item);
                }
                Err(error) => {
                    relay_log::error!(
                        error = &error as &dyn std::error::Error,
                        "failed to serialize client report"
                    );
                }
            }
        }

        let envelope = ManagedEnvelope::new(envelope, self.inner.addrs.outcome_aggregator.clone());
        self.submit_upstream(cogs, Submit::Envelope(envelope.into_processed()));
    }

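    /// Checks incoming buckets against project state and cached rate limits, returning
    /// only the buckets that may be forwarded.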
    fn check_buckets(
        &self,
        project_key: ProjectKey,
        project_info: &ProjectInfo,
        rate_limits: &RateLimits,
        buckets: Vec<Bucket>,
    ) -> Vec<Bucket> {
        let Some(scoping) = project_info.scoping(project_key) else {
            relay_log::error!(
                tags.project_key = project_key.as_str(),
                "there is no scoping: dropping {} buckets",
                buckets.len(),
            );
            return Vec::new();
        };

        let mut buckets = self::metrics::apply_project_info(
            buckets,
            &self.inner.metric_outcomes,
            project_info,
            scoping,
        );

        let namespaces: BTreeSet<MetricNamespace> = buckets
            .iter()
            .filter_map(|bucket| bucket.name.try_namespace())
            .collect();

        for namespace in namespaces {
            let limits = rate_limits.check_with_quotas(
                project_info.get_quotas(),
                scoping.item(DataCategory::MetricBucket),
            );

            if limits.is_limited() {
                let rejected;
                (buckets, rejected) = utils::split_off(buckets, |bucket| {
                    bucket.name.try_namespace() == Some(namespace)
                });

                let reason_code = limits.longest().and_then(|limit| limit.reason_code.clone());
                self.inner.metric_outcomes.track(
                    scoping,
                    &rejected,
                    Outcome::RateLimited(reason_code),
                );
            }
        }

        let quotas = project_info.config.quotas.clone();
        match MetricsLimiter::create(buckets, quotas, scoping) {
            Ok(mut bucket_limiter) => {
                bucket_limiter.enforce_limits(rate_limits, &self.inner.metric_outcomes);
                bucket_limiter.into_buckets()
            }
            Err(buckets) => buckets,
        }
    }

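    /// Checks metric buckets against Redis-backed rate limits per namespace and tracks
    /// outcomes for rejected buckets. Only available in processing mode.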
    #[cfg(feature = "processing")]
    async fn rate_limit_buckets(
        &self,
        scoping: Scoping,
        project_info: &ProjectInfo,
        mut buckets: Vec<Bucket>,
    ) -> Vec<Bucket> {
        let Some(rate_limiter) = &self.inner.rate_limiter else {
            return buckets;
        };

        let global_config = self.inner.global_config.current();
        let namespaces = buckets
            .iter()
            .filter_map(|bucket| bucket.name.try_namespace())
            .counts();

        let quotas = CombinedQuotas::new(&global_config, project_info.get_quotas());

        for (namespace, quantity) in namespaces {
            let item_scoping = scoping.metric_bucket(namespace);

            let limits = match rate_limiter
                .is_rate_limited(quotas, item_scoping, quantity, false)
                .await
            {
                Ok(limits) => limits,
                Err(err) => {
                    relay_log::error!(
                        error = &err as &dyn std::error::Error,
                        "failed to check redis rate limits"
                    );
                    break;
                }
            };

            if limits.is_limited() {
                let rejected;
                (buckets, rejected) = utils::split_off(buckets, |bucket| {
                    bucket.name.try_namespace() == Some(namespace)
                });

                let reason_code = limits.longest().and_then(|limit| limit.reason_code.clone());
                self.inner.metric_outcomes.track(
                    scoping,
                    &rejected,
                    Outcome::RateLimited(reason_code),
                );

                self.inner
                    .project_cache
                    .get(item_scoping.scoping.project_key)
                    .rate_limits()
                    .merge(limits);
            }
        }

        match MetricsLimiter::create(buckets, project_info.config.quotas.clone(), scoping) {
            Err(buckets) => buckets,
            Ok(bucket_limiter) => self.apply_other_rate_limits(bucket_limiter).await,
        }
    }

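    /// Enforces transaction and span rate limits on the metrics in the limiter and
    /// merges newly created limits back into the project cache.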
    #[cfg(feature = "processing")]
    async fn apply_other_rate_limits(&self, mut bucket_limiter: MetricsLimiter) -> Vec<Bucket> {
        relay_log::trace!("handle_rate_limit_buckets");

        let scoping = *bucket_limiter.scoping();

        if let Some(rate_limiter) = self.inner.rate_limiter.as_ref() {
            let global_config = self.inner.global_config.current();
            let quotas = CombinedQuotas::new(&global_config, bucket_limiter.quotas());

            let over_accept_once = true;
            let mut rate_limits = RateLimits::new();

            for category in [DataCategory::Transaction, DataCategory::Span] {
                let count = bucket_limiter.count(category);

                let timer = Instant::now();
                let mut is_limited = false;

                if let Some(count) = count {
                    match rate_limiter
                        .is_rate_limited(quotas, scoping.item(category), count, over_accept_once)
                        .await
                    {
                        Ok(limits) => {
                            is_limited = limits.is_limited();
                            rate_limits.merge(limits)
                        }
                        Err(e) => relay_log::error!(error = &e as &dyn Error),
                    }
                }

                relay_statsd::metric!(
                    timer(RelayTimers::RateLimitBucketsDuration) = timer.elapsed(),
                    category = category.name(),
                    limited = if is_limited { "true" } else { "false" },
                    count = match count {
                        None => "none",
                        Some(0) => "0",
                        Some(1) => "1",
                        Some(2..=10) => "10",
                        Some(11..=25) => "25",
                        Some(26..=50) => "50",
                        Some(51..=100) => "100",
                        Some(101..=500) => "500",
                        _ => "> 500",
                    },
                );
            }

            if rate_limits.is_limited() {
                let was_enforced =
                    bucket_limiter.enforce_limits(&rate_limits, &self.inner.metric_outcomes);

                if was_enforced {
                    self.inner
                        .project_cache
                        .get(scoping.project_key)
                        .rate_limits()
                        .merge(rate_limits);
                }
            }
        }

        bucket_limiter.into_buckets()
    }

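    /// Applies the cardinality limiter to the buckets and tracks outcomes for rejected
    /// ones. In passive mode, exceeded limits are reported but not enforced.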
    #[cfg(feature = "processing")]
    async fn cardinality_limit_buckets(
        &self,
        scoping: Scoping,
        limits: &[CardinalityLimit],
        buckets: Vec<Bucket>,
    ) -> Vec<Bucket> {
        let global_config = self.inner.global_config.current();
        let cardinality_limiter_mode = global_config.options.cardinality_limiter_mode;

        if matches!(cardinality_limiter_mode, CardinalityLimiterMode::Disabled) {
            return buckets;
        }

        let Some(ref limiter) = self.inner.cardinality_limiter else {
            return buckets;
        };

        let scope = relay_cardinality::Scoping {
            organization_id: scoping.organization_id,
            project_id: scoping.project_id,
        };

        let limits = match limiter
            .check_cardinality_limits(scope, limits, buckets)
            .await
        {
            Ok(limits) => limits,
            Err((buckets, error)) => {
                relay_log::error!(
                    error = &error as &dyn std::error::Error,
                    "cardinality limiter failed"
                );
                return buckets;
            }
        };

        let error_sample_rate = global_config.options.cardinality_limiter_error_sample_rate;
        if !limits.exceeded_limits().is_empty() && utils::sample(error_sample_rate).is_keep() {
            for limit in limits.exceeded_limits() {
                relay_log::with_scope(
                    |scope| {
                        scope.set_user(Some(relay_log::sentry::User {
                            id: Some(scoping.organization_id.to_string()),
                            ..Default::default()
                        }));
                    },
                    || {
                        relay_log::error!(
                            tags.organization_id = scoping.organization_id.value(),
                            tags.limit_id = limit.id,
                            tags.passive = limit.passive,
                            "Cardinality Limit"
                        );
                    },
                );
            }
        }

        for (limit, reports) in limits.cardinality_reports() {
            for report in reports {
                self.inner
                    .metric_outcomes
                    .cardinality(scoping, limit, report);
            }
        }

        if matches!(cardinality_limiter_mode, CardinalityLimiterMode::Passive) {
            return limits.into_source();
        }

        let CardinalityLimitsSplit { accepted, rejected } = limits.into_split();

        for (bucket, exceeded) in rejected {
            self.inner.metric_outcomes.track(
                scoping,
                &[bucket],
                Outcome::CardinalityLimited(exceeded.id.clone()),
            );
        }
        accepted
    }

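    /// Applies rate limits and cardinality limits to flushed buckets and forwards the
    /// remainder to the store. Only available in processing mode.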
    #[cfg(feature = "processing")]
    async fn encode_metrics_processing(
        &self,
        message: FlushBuckets,
        store_forwarder: &Addr<Store>,
    ) {
        use crate::constants::DEFAULT_EVENT_RETENTION;
        use crate::services::store::StoreMetrics;

        for ProjectBuckets {
            buckets,
            scoping,
            project_info,
            ..
        } in message.buckets.into_values()
        {
            let buckets = self
                .rate_limit_buckets(scoping, &project_info, buckets)
                .await;

            let limits = project_info.get_cardinality_limits();
            let buckets = self
                .cardinality_limit_buckets(scoping, limits, buckets)
                .await;

            if buckets.is_empty() {
                continue;
            }

            let retention = project_info
                .config
                .event_retention
                .unwrap_or(DEFAULT_EVENT_RETENTION);

            store_forwarder.send(StoreMetrics {
                buckets,
                scoping,
                retention,
            });
        }
    }

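    /// Serializes flushed buckets into envelopes, one size-bounded batch per envelope,
    /// and submits them upstream on behalf of each project.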
    fn encode_metrics_envelope(&self, cogs: &mut Token, message: FlushBuckets) {
        let FlushBuckets {
            partition_key,
            buckets,
        } = message;

        let batch_size = self.inner.config.metrics_max_batch_size_bytes();
        let upstream = self.inner.config.upstream_descriptor();

        for ProjectBuckets {
            buckets, scoping, ..
        } in buckets.values()
        {
            let dsn = PartialDsn::outbound(scoping, upstream);

            relay_statsd::metric!(
                distribution(RelayDistributions::PartitionKeys) = u64::from(partition_key)
            );

            let mut num_batches = 0;
            for batch in BucketsView::from(buckets).by_size(batch_size) {
                let mut envelope = Envelope::from_request(None, RequestMeta::outbound(dsn.clone()));

                let mut item = Item::new(ItemType::MetricBuckets);
                item.set_source_quantities(crate::metrics::extract_quantities(batch));
                item.set_payload(ContentType::Json, serde_json::to_vec(&batch).unwrap());
                envelope.add_item(item);

                let mut envelope =
                    ManagedEnvelope::new(envelope, self.inner.addrs.outcome_aggregator.clone());
                envelope
                    .set_partition_key(Some(partition_key))
                    .scope(*scoping);

                relay_statsd::metric!(
                    distribution(RelayDistributions::BucketsPerBatch) = batch.len() as u64
                );

                self.submit_upstream(cogs, Submit::Envelope(envelope.into_processed()));
                num_batches += 1;
            }

            relay_statsd::metric!(
                distribution(RelayDistributions::BatchesPerPartition) = num_batches
            );
        }
    }

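    /// Encodes a filled partition and sends it to the global metrics endpoint, then
    /// resets the partition for reuse.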
    fn send_global_partition(&self, partition_key: u32, partition: &mut Partition<'_>) {
        if partition.is_empty() {
            return;
        }

        let (unencoded, project_info) = partition.take();
        let http_encoding = self.inner.config.http_encoding();
        let encoded = match encode_payload(&unencoded, http_encoding) {
            Ok(payload) => payload,
            Err(error) => {
                let error = &error as &dyn std::error::Error;
                relay_log::error!(error, "failed to encode metrics payload");
                return;
            }
        };

        let request = SendMetricsRequest {
            partition_key: partition_key.to_string(),
            unencoded,
            encoded,
            project_info,
            http_encoding,
            metric_outcomes: self.inner.metric_outcomes.clone(),
        };

        self.inner.addrs.upstream_relay.send(SendRequest(request));
    }

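    /// Packs flushed buckets of all projects into size-bounded partitions and sends
    /// each partition as a single request to the global metrics endpoint.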
    fn encode_metrics_global(&self, message: FlushBuckets) {
        let FlushBuckets {
            partition_key,
            buckets,
        } = message;

        let batch_size = self.inner.config.metrics_max_batch_size_bytes();
        let mut partition = Partition::new(batch_size);
        let mut partition_splits = 0;

        for ProjectBuckets {
            buckets, scoping, ..
        } in buckets.values()
        {
            for bucket in buckets {
                let mut remaining = Some(BucketView::new(bucket));

                while let Some(bucket) = remaining.take() {
                    if let Some(next) = partition.insert(bucket, *scoping) {
                        self.send_global_partition(partition_key, &mut partition);
                        remaining = Some(next);
                        partition_splits += 1;
                    }
                }
            }
        }

        if partition_splits > 0 {
            metric!(distribution(RelayDistributions::PartitionSplits) = partition_splits);
        }

        self.send_global_partition(partition_key, &mut partition);
    }

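    /// Handles [`FlushBuckets`]: checks every project's buckets, then routes them to
    /// the store in processing mode, otherwise to the global metrics endpoint or into
    /// per-project envelopes.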
    async fn handle_flush_buckets(&self, cogs: &mut Token, mut message: FlushBuckets) {
        for (project_key, pb) in message.buckets.iter_mut() {
            let buckets = std::mem::take(&mut pb.buckets);
            pb.buckets =
                self.check_buckets(*project_key, &pb.project_info, &pb.rate_limits, buckets);
        }

        #[cfg(feature = "processing")]
        if self.inner.config.processing_enabled()
            && let Some(ref store_forwarder) = self.inner.addrs.store_forwarder
        {
            return self
                .encode_metrics_processing(message, store_forwarder)
                .await;
        }

        if self.inner.config.http_global_metrics() {
            self.encode_metrics_global(message)
        } else {
            self.encode_metrics_envelope(cogs, message)
        }
    }

    #[cfg(all(test, feature = "processing"))]
    fn redis_rate_limiter_enabled(&self) -> bool {
        self.inner.rate_limiter.is_some()
    }

    async fn handle_message(&self, message: EnvelopeProcessor) {
        let ty = message.variant();
        let feature_weights = self.feature_weights(&message);

        metric!(timer(RelayTimers::ProcessMessageDuration), message = ty, {
            let mut cogs = self.inner.cogs.timed(ResourceId::Relay, feature_weights);

            match message {
                EnvelopeProcessor::ProcessEnvelope(m) => {
                    self.handle_process_envelope(&mut cogs, *m).await
                }
                EnvelopeProcessor::ProcessProjectMetrics(m) => {
                    self.handle_process_metrics(&mut cogs, *m)
                }
                EnvelopeProcessor::ProcessBatchedMetrics(m) => {
                    self.handle_process_batched_metrics(&mut cogs, *m)
                }
                EnvelopeProcessor::FlushBuckets(m) => {
                    self.handle_flush_buckets(&mut cogs, *m).await
                }
                EnvelopeProcessor::SubmitClientReports(m) => {
                    self.handle_submit_client_reports(&mut cogs, *m)
                }
            }
        });
    }

    fn feature_weights(&self, message: &EnvelopeProcessor) -> FeatureWeights {
        match message {
            EnvelopeProcessor::ProcessEnvelope(_) => AppFeature::Unattributed.into(),
            EnvelopeProcessor::ProcessProjectMetrics(_) => AppFeature::Unattributed.into(),
            EnvelopeProcessor::ProcessBatchedMetrics(_) => AppFeature::Unattributed.into(),
            EnvelopeProcessor::FlushBuckets(v) => v
                .buckets
                .values()
                .map(|s| {
                    if self.inner.config.processing_enabled() {
                        relay_metrics::cogs::ByCount(&s.buckets).into()
                    } else {
                        relay_metrics::cogs::BySize(&s.buckets).into()
                    }
                })
                .fold(FeatureWeights::none(), FeatureWeights::merge),
            EnvelopeProcessor::SubmitClientReports(_) => AppFeature::ClientReports.into(),
        }
    }
}

impl Service for EnvelopeProcessorService {
    type Interface = EnvelopeProcessor;

    async fn run(self, mut rx: relay_system::Receiver<Self::Interface>) {
        while let Some(message) = rx.recv().await {
            let service = self.clone();
            self.inner
                .pool
                .spawn_async(
                    async move {
                        service.handle_message(message).await;
                    }
                    .boxed(),
                )
                .await;
        }
    }
}

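/// Result of applying rate limits to an event: the (possibly emptied) event and the
/// rate limits that were active during enforcement.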
struct EnforcementResult {
    event: Annotated<Event>,
    #[cfg_attr(not(feature = "processing"), expect(dead_code))]
    rate_limits: RateLimits,
}

impl EnforcementResult {
    pub fn new(event: Annotated<Event>, rate_limits: RateLimits) -> Self {
        Self { event, rate_limits }
    }
}

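/// Strategy for checking rate limits: against cached limits only, or consistently
/// through the Redis-backed rate limiter in processing mode.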
#[derive(Clone)]
enum RateLimiter {
    Cached,
    #[cfg(feature = "processing")]
    Consistent(Arc<RedisRateLimiter<GlobalRateLimitsServiceHandle>>),
}

impl RateLimiter {
    async fn enforce<Group>(
        &self,
        managed_envelope: &mut TypedEnvelope<Group>,
        event: Annotated<Event>,
        _extracted_metrics: &mut ProcessingExtractedMetrics,
        ctx: processing::Context<'_>,
    ) -> Result<EnforcementResult, ProcessingError> {
        if managed_envelope.envelope().is_empty() && event.value().is_none() {
            return Ok(EnforcementResult::new(event, RateLimits::default()));
        }

        let quotas = CombinedQuotas::new(ctx.global_config, ctx.project_info.get_quotas());
        if quotas.is_empty() {
            return Ok(EnforcementResult::new(event, RateLimits::default()));
        }

        let event_category = event_category(&event);

        let this = self.clone();
        let mut envelope_limiter =
            EnvelopeLimiter::new(CheckLimits::All, move |item_scope, _quantity| {
                let this = this.clone();

                async move {
                    match this {
                        #[cfg(feature = "processing")]
                        RateLimiter::Consistent(rate_limiter) => Ok::<_, ProcessingError>(
                            rate_limiter
                                .is_rate_limited(quotas, item_scope, _quantity, false)
                                .await?,
                        ),
                        _ => Ok::<_, ProcessingError>(
                            ctx.rate_limits.check_with_quotas(quotas, item_scope),
                        ),
                    }
                }
            });

        if let Some(category) = event_category {
            envelope_limiter.assume_event(category);
        }

        let scoping = managed_envelope.scoping();
        let (enforcement, rate_limits) = metric!(
            timer(RelayTimers::EventProcessingRateLimiting),
            type = self.name(),
            {
                envelope_limiter
                    .compute(managed_envelope.envelope_mut(), &scoping)
                    .await
            }
        )?;
        let event_active = enforcement.is_event_active();

        #[cfg(feature = "processing")]
        _extracted_metrics.apply_enforcement(&enforcement, matches!(self, Self::Consistent(_)));
        enforcement.apply_with_outcomes(managed_envelope);

        if event_active {
            debug_assert!(managed_envelope.envelope().is_empty());
            return Ok(EnforcementResult::new(Annotated::empty(), rate_limits));
        }

        Ok(EnforcementResult::new(event, rate_limits))
    }

    fn name(&self) -> &'static str {
        match self {
            Self::Cached => "cached",
            #[cfg(feature = "processing")]
            Self::Consistent(_) => "consistent",
        }
    }
}

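/// Encodes an envelope payload for the configured upstream HTTP `Content-Encoding`.
///
/// A minimal usage sketch (the payload bytes here are illustrative):
///
/// ```ignore
/// let body = Bytes::from_static(b"{}");
/// let gzipped = encode_payload(&body, HttpEncoding::Gzip)?;
/// ```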
pub fn encode_payload(body: &Bytes, http_encoding: HttpEncoding) -> Result<Bytes, std::io::Error> {
    let envelope_body: Vec<u8> = match http_encoding {
        HttpEncoding::Identity => return Ok(body.clone()),
        HttpEncoding::Deflate => {
            let mut encoder = ZlibEncoder::new(Vec::new(), Compression::default());
            encoder.write_all(body.as_ref())?;
            encoder.finish()?
        }
        HttpEncoding::Gzip => {
            let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
            encoder.write_all(body.as_ref())?;
            encoder.finish()?
        }
        HttpEncoding::Br => {
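            // Buffer size 0 selects the encoder default; quality 5 and lgwin 22 trade
            // compression ratio for speed.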
            let mut encoder = BrotliEncoder::new(Vec::new(), 0, 5, 22);
            encoder.write_all(body.as_ref())?;
            encoder.into_inner()
        }
        HttpEncoding::Zstd => {
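            // Compression level 1 favors throughput over compression ratio.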
            let mut encoder = ZstdEncoder::new(Vec::new(), 1)?;
            encoder.write_all(body.as_ref())?;
            encoder.finish()?
        }
    };

    Ok(envelope_body.into())
}

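/// An upstream request that submits a serialized envelope together with its
/// pre-encoded body to the envelope endpoint.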
#[derive(Debug)]
pub struct SendEnvelope {
    pub envelope: TypedEnvelope<Processed>,
    pub body: Bytes,
    pub http_encoding: HttpEncoding,
    pub project_cache: ProjectCacheHandle,
}

impl UpstreamRequest for SendEnvelope {
    fn method(&self) -> reqwest::Method {
        reqwest::Method::POST
    }

    fn path(&self) -> Cow<'_, str> {
        format!("/api/{}/envelope/", self.envelope.scoping().project_id).into()
    }

    fn route(&self) -> &'static str {
        "envelope"
    }

    fn build(&mut self, builder: &mut http::RequestBuilder) -> Result<(), http::HttpError> {
        let envelope_body = self.body.clone();
        metric!(
            distribution(RelayDistributions::UpstreamEnvelopeBodySize) = envelope_body.len() as u64
        );

        let meta = &self.envelope.meta();
        let shard = self.envelope.partition_key().map(|p| p.to_string());
        builder
            .content_encoding(self.http_encoding)
            .header_opt("Origin", meta.origin().map(|url| url.as_str()))
            .header_opt("User-Agent", meta.user_agent())
            .header("X-Sentry-Auth", meta.auth_header())
            .header("X-Forwarded-For", meta.forwarded_for())
            .header("Content-Type", envelope::CONTENT_TYPE)
            .header_opt("X-Sentry-Relay-Shard", shard)
            .body(envelope_body);

        Ok(())
    }

    fn sign(&mut self) -> Option<Sign> {
        Some(Sign::Optional(SignatureType::RequestSign))
    }

    fn respond(
        self: Box<Self>,
        result: Result<http::Response, UpstreamRequestError>,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> {
        Box::pin(async move {
            let result = match result {
                Ok(mut response) => response.consume().await.map_err(UpstreamRequestError::Http),
                Err(error) => Err(error),
            };

            match result {
                Ok(()) => self.envelope.accept(),
                Err(error) if error.is_received() => {
                    let scoping = self.envelope.scoping();
                    self.envelope.accept();

                    if let UpstreamRequestError::RateLimited(limits) = error {
                        self.project_cache
                            .get(scoping.project_key)
                            .rate_limits()
                            .merge(limits.scope(&scoping));
                    }
                }
                Err(error) => {
                    let mut envelope = self.envelope;
                    envelope.reject(Outcome::Invalid(DiscardReason::Internal));
                    relay_log::error!(
                        error = &error as &dyn Error,
                        tags.project_key = %envelope.scoping().project_key,
                        "error sending envelope"
                    );
                }
            }
        })
    }
}

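/// A size-bounded partition of metric bucket views across projects, assembled for a
/// single request to the global metrics endpoint.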
#[derive(Debug)]
struct Partition<'a> {
    max_size: usize,
    remaining: usize,
    views: HashMap<ProjectKey, Vec<BucketView<'a>>>,
    project_info: HashMap<ProjectKey, Scoping>,
}

impl<'a> Partition<'a> {
    pub fn new(size: usize) -> Self {
        Self {
            max_size: size,
            remaining: size,
            views: HashMap::new(),
            project_info: HashMap::new(),
        }
    }

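    /// Inserts a bucket into the partition, splitting it at the remaining capacity.
    ///
    /// Returns the remainder that did not fit; the caller must insert it into the next
    /// partition.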
    pub fn insert(&mut self, bucket: BucketView<'a>, scoping: Scoping) -> Option<BucketView<'a>> {
        let (current, next) = bucket.split(self.remaining, Some(self.max_size));

        if let Some(current) = current {
            self.remaining = self.remaining.saturating_sub(current.estimated_size());
            self.views
                .entry(scoping.project_key)
                .or_default()
                .push(current);

            self.project_info
                .entry(scoping.project_key)
                .or_insert(scoping);
        }

        next
    }

    fn is_empty(&self) -> bool {
        self.views.is_empty()
    }

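    /// Serializes the current contents into a payload, returns it together with the
    /// collected scopings, and resets the partition for reuse.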
    fn take(&mut self) -> (Bytes, HashMap<ProjectKey, Scoping>) {
        #[derive(serde::Serialize)]
        struct Wrapper<'a> {
            buckets: &'a HashMap<ProjectKey, Vec<BucketView<'a>>>,
        }

        let buckets = &self.views;
        let payload = serde_json::to_vec(&Wrapper { buckets }).unwrap().into();

        let scopings = self.project_info.clone();
        self.project_info.clear();

        self.views.clear();
        self.remaining = self.max_size;

        (payload, scopings)
    }
}

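/// Upstream request that sends a partition of metric buckets to the global metrics
/// endpoint. The unencoded payload is kept next to the encoded request body, since it
/// is needed for signing and for generating error outcomes on failed transmissions.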
#[derive(Debug)]
struct SendMetricsRequest {
    partition_key: String,
    unencoded: Bytes,
    encoded: Bytes,
    project_info: HashMap<ProjectKey, Scoping>,
    http_encoding: HttpEncoding,
    metric_outcomes: MetricOutcomes,
}

impl SendMetricsRequest {
    fn create_error_outcomes(self) {
        #[derive(serde::Deserialize)]
        struct Wrapper {
            buckets: HashMap<ProjectKey, Vec<MinimalTrackableBucket>>,
        }

        let buckets = match serde_json::from_slice(&self.unencoded) {
            Ok(Wrapper { buckets }) => buckets,
            Err(err) => {
                relay_log::error!(
                    error = &err as &dyn std::error::Error,
                    "failed to parse buckets from failed transmission"
                );
                return;
            }
        };

        for (key, buckets) in buckets {
            let Some(&scoping) = self.project_info.get(&key) else {
                relay_log::error!("missing scoping for project key");
                continue;
            };

            self.metric_outcomes.track(
                scoping,
                &buckets,
                Outcome::Invalid(DiscardReason::Internal),
            );
        }
    }
}

impl UpstreamRequest for SendMetricsRequest {
    fn set_relay_id(&self) -> bool {
        true
    }

    fn sign(&mut self) -> Option<Sign> {
        Some(Sign::Required(SignatureType::Body(self.unencoded.clone())))
    }

    fn method(&self) -> reqwest::Method {
        reqwest::Method::POST
    }

    fn path(&self) -> Cow<'_, str> {
        "/api/0/relays/metrics/".into()
    }

    fn route(&self) -> &'static str {
        "global_metrics"
    }

    fn build(&mut self, builder: &mut http::RequestBuilder) -> Result<(), http::HttpError> {
        metric!(
            distribution(RelayDistributions::UpstreamMetricsBodySize) = self.encoded.len() as u64
        );

        builder
            .content_encoding(self.http_encoding)
            .header("X-Sentry-Relay-Shard", self.partition_key.as_bytes())
            .header(header::CONTENT_TYPE, b"application/json")
            .body(self.encoded.clone());

        Ok(())
    }

    fn respond(
        self: Box<Self>,
        result: Result<http::Response, UpstreamRequestError>,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> {
        Box::pin(async {
            match result {
                Ok(mut response) => {
                    response.consume().await.ok();
                }
                Err(error) => {
                    relay_log::error!(error = &error as &dyn Error, "failed to send metrics batch");

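                    // If the upstream received the request, it is responsible for
                    // tracking outcomes; generating them here as well would
                    // double-count the buckets.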
                    if error.is_received() {
                        return;
                    }

                    self.create_error_outcomes()
                }
            }
        })
    }
}

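/// Combines the quotas from the global config with project-specific quotas into a
/// single iterable collection.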
#[derive(Copy, Clone, Debug)]
struct CombinedQuotas<'a> {
    global_quotas: &'a [Quota],
    project_quotas: &'a [Quota],
}

impl<'a> CombinedQuotas<'a> {
    pub fn new(global_config: &'a GlobalConfig, project_quotas: &'a [Quota]) -> Self {
        Self {
            global_quotas: &global_config.quotas,
            project_quotas,
        }
    }

    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    pub fn len(&self) -> usize {
        self.global_quotas.len() + self.project_quotas.len()
    }
}

impl<'a> IntoIterator for CombinedQuotas<'a> {
    type Item = &'a Quota;
    type IntoIter = std::iter::Chain<std::slice::Iter<'a, Quota>, std::slice::Iter<'a, Quota>>;

    fn into_iter(self) -> Self::IntoIter {
        self.global_quotas.iter().chain(self.project_quotas.iter())
    }
}

#[cfg(test)]
mod tests {
    use std::collections::BTreeMap;

    use insta::assert_debug_snapshot;
    use relay_base_schema::metrics::{DurationUnit, MetricUnit};
    use relay_common::glob2::LazyGlob;
    use relay_dynamic_config::ProjectConfig;
    use relay_event_normalization::{
        MeasurementsConfig, NormalizationConfig, RedactionRule, TransactionNameConfig,
        TransactionNameRule,
    };
    use relay_event_schema::protocol::TransactionSource;
    use relay_pii::DataScrubbingConfig;
    use similar_asserts::assert_eq;

    use crate::metrics_extraction::IntoMetric;
    use crate::metrics_extraction::transactions::types::{
        CommonTags, TransactionMeasurementTags, TransactionMetric,
    };
    use crate::testutils::{create_test_processor, create_test_processor_with_addrs};

    #[cfg(feature = "processing")]
    use {
        relay_metrics::BucketValue,
        relay_quotas::{QuotaScope, ReasonCode},
        relay_test::mock_service,
    };

    use super::*;

    #[cfg(feature = "processing")]
    fn mock_quota(id: &str) -> Quota {
        Quota {
            id: Some(id.into()),
            categories: [DataCategory::MetricBucket].into(),
            scope: QuotaScope::Organization,
            scope_id: None,
            limit: Some(0),
            window: None,
            reason_code: None,
            namespace: None,
        }
    }

    #[cfg(feature = "processing")]
    #[test]
    fn test_dynamic_quotas() {
        let global_config = GlobalConfig {
            quotas: vec![mock_quota("foo"), mock_quota("bar")],
            ..Default::default()
        };

        let project_quotas = vec![mock_quota("baz"), mock_quota("qux")];

        let dynamic_quotas = CombinedQuotas::new(&global_config, &project_quotas);

        assert_eq!(dynamic_quotas.len(), 4);
        assert!(!dynamic_quotas.is_empty());

        let quota_ids = dynamic_quotas.into_iter().filter_map(|q| q.id.as_deref());
        assert!(quota_ids.eq(["foo", "bar", "baz", "qux"]));
    }

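    /// Ensures that a metric-bucket quota scoped to one organization rate limits only
    /// that organization's buckets when a flush contains several organizations.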
    #[cfg(feature = "processing")]
    #[tokio::test]
    async fn test_ratelimit_per_batch() {
        use relay_base_schema::organization::OrganizationId;
        use relay_protocol::FiniteF64;

        let rate_limited_org = Scoping {
            organization_id: OrganizationId::new(1),
            project_id: ProjectId::new(21),
            project_key: ProjectKey::parse("00000000000000000000000000000000").unwrap(),
            key_id: Some(17),
        };

        let not_rate_limited_org = Scoping {
            organization_id: OrganizationId::new(2),
            project_id: ProjectId::new(21),
            project_key: ProjectKey::parse("11111111111111111111111111111111").unwrap(),
            key_id: Some(17),
        };

        let message = {
            let project_info = {
                let quota = Quota {
                    id: Some("testing".into()),
                    categories: [DataCategory::MetricBucket].into(),
                    scope: relay_quotas::QuotaScope::Organization,
                    scope_id: Some(rate_limited_org.organization_id.to_string().into()),
                    limit: Some(0),
                    window: None,
                    reason_code: Some(ReasonCode::new("test")),
                    namespace: None,
                };

                let mut config = ProjectConfig::default();
                config.quotas.push(quota);

                Arc::new(ProjectInfo {
                    config,
                    ..Default::default()
                })
            };

            let project_metrics = |scoping| ProjectBuckets {
                buckets: vec![Bucket {
                    name: "d:transactions/bar".into(),
                    value: BucketValue::Counter(FiniteF64::new(1.0).unwrap()),
                    timestamp: UnixTimestamp::now(),
                    tags: Default::default(),
                    width: 10,
                    metadata: BucketMetadata::default(),
                }],
                rate_limits: Default::default(),
                project_info: project_info.clone(),
                scoping,
            };

            let buckets = hashbrown::HashMap::from([
                (
                    rate_limited_org.project_key,
                    project_metrics(rate_limited_org),
                ),
                (
                    not_rate_limited_org.project_key,
                    project_metrics(not_rate_limited_org),
                ),
            ]);

            FlushBuckets {
                partition_key: 0,
                buckets,
            }
        };

        assert_eq!(message.buckets.keys().count(), 2);

        let config = {
            let config_json = serde_json::json!({
                "processing": {
                    "enabled": true,
                    "kafka_config": [],
                    "redis": {
                        "server": std::env::var("RELAY_REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_owned()),
                    }
                }
            });
            Config::from_json_value(config_json).unwrap()
        };

        let (store, handle) = {
            let f = |org_ids: &mut Vec<OrganizationId>, msg: Store| {
                let org_id = match msg {
                    Store::Metrics(x) => x.scoping.organization_id,
                    _ => panic!("received envelope when expecting only metrics"),
                };
                org_ids.push(org_id);
            };

            mock_service("store_forwarder", vec![], f)
        };

        let processor = create_test_processor(config).await;
        assert!(processor.redis_rate_limiter_enabled());

        processor.encode_metrics_processing(message, &store).await;

        drop(store);
        let orgs_not_ratelimited = handle.await.unwrap();

        assert_eq!(
            orgs_not_ratelimited,
            vec![not_rate_limited_org.organization_id]
        );
    }

    #[tokio::test]
    async fn test_browser_version_extraction_with_pii_like_data() {
        let processor = create_test_processor(Default::default()).await;
        let outcome_aggregator = Addr::dummy();
        let event_id = EventId::new();

        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
            .parse()
            .unwrap();

        let request_meta = RequestMeta::new(dsn);
        let mut envelope = Envelope::from_request(Some(event_id), request_meta);

        envelope.add_item({
            let mut item = Item::new(ItemType::Event);
            item.set_payload(
                ContentType::Json,
                r#"
                {
                    "request": {
                        "headers": [
                            ["User-Agent", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36"]
                        ]
                    }
                }
                "#,
            );
            item
        });

        let mut datascrubbing_settings = DataScrubbingConfig::default();
        datascrubbing_settings.scrub_data = true;
        datascrubbing_settings.scrub_defaults = true;
        datascrubbing_settings.scrub_ip_addresses = true;

        let pii_config = serde_json::from_str(r#"{"applications": {"**": ["@ip:mask"]}}"#).unwrap();

        let config = ProjectConfig {
            datascrubbing_settings,
            pii_config: Some(pii_config),
            ..Default::default()
        };

        let project_info = ProjectInfo {
            config,
            ..Default::default()
        };

        let mut envelopes = ProcessingGroup::split_envelope(*envelope, &Default::default());
        assert_eq!(envelopes.len(), 1);

        let (group, envelope) = envelopes.pop().unwrap();
        let envelope = ManagedEnvelope::new(envelope, outcome_aggregator);

        let message = ProcessEnvelopeGrouped {
            group,
            envelope,
            ctx: processing::Context {
                project_info: &project_info,
                ..processing::Context::for_test()
            },
        };

        let Ok(Some(Submit::Envelope(mut new_envelope))) =
            processor.process(&mut Token::noop(), message).await
        else {
            panic!();
        };
        let new_envelope = new_envelope.envelope_mut();

        let event_item = new_envelope.items().last().unwrap();
        let annotated_event: Annotated<Event> =
            Annotated::from_json_bytes(&event_item.payload()).unwrap();
        let event = annotated_event.into_value().unwrap();
        let headers = event
            .request
            .into_value()
            .unwrap()
            .headers
            .into_value()
            .unwrap();

        assert_eq!(
            Some(
                "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/********* Safari/537.36"
            ),
            headers.get_header("User-Agent")
        );

        let contexts = event.contexts.into_value().unwrap();
        let browser = contexts.0.get("browser").unwrap();
        assert_eq!(
            r#"{"browser":"Chrome 103.0.0","name":"Chrome","version":"103.0.0","type":"browser"}"#,
            browser.to_json().unwrap()
        );
    }

    #[tokio::test]
    #[cfg(feature = "processing")]
    async fn test_materialize_dsc() {
        use crate::services::projects::project::PublicKeyConfig;

        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
            .parse()
            .unwrap();
        let request_meta = RequestMeta::new(dsn);
        let mut envelope = Envelope::from_request(None, request_meta);

        let dsc = r#"{
            "trace_id": "00000000-0000-0000-0000-000000000000",
            "public_key": "e12d836b15bb49d7bbf99e64295d995b",
            "sample_rate": "0.2"
        }"#;
        envelope.set_dsc(serde_json::from_str(dsc).unwrap());

        let mut item = Item::new(ItemType::Event);
        item.set_payload(ContentType::Json, r#"{}"#);
        envelope.add_item(item);

        let outcome_aggregator = Addr::dummy();
        let managed_envelope = ManagedEnvelope::new(envelope, outcome_aggregator);

        let mut project_info = ProjectInfo::default();
        project_info.public_keys.push(PublicKeyConfig {
            public_key: ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(),
            numeric_id: Some(1),
        });

        let config = serde_json::json!({
            "processing": {
                "enabled": true,
                "kafka_config": [],
            }
        });

        let message = ProcessEnvelopeGrouped {
            group: ProcessingGroup::Transaction,
            envelope: managed_envelope,
            ctx: processing::Context {
                config: &Config::from_json_value(config.clone()).unwrap(),
                project_info: &project_info,
                sampling_project_info: Some(&project_info),
                ..processing::Context::for_test()
            },
        };

        let processor = create_test_processor(Config::from_json_value(config).unwrap()).await;
        let Ok(Some(Submit::Envelope(envelope))) =
            processor.process(&mut Token::noop(), message).await
        else {
            panic!();
        };
        let event = envelope
            .envelope()
            .get_item_by(|item| item.ty() == &ItemType::Event)
            .unwrap();

        let event = Annotated::<Event>::from_json_bytes(&event.payload()).unwrap();
        insta::assert_debug_snapshot!(event.value().unwrap()._dsc, @r###"
        Object(
            {
                "environment": ~,
                "public_key": String(
                    "e12d836b15bb49d7bbf99e64295d995b",
                ),
                "release": ~,
                "replay_id": ~,
                "sample_rate": String(
                    "0.2",
                ),
                "trace_id": String(
                    "00000000000000000000000000000000",
                ),
                "transaction": ~,
            },
        )
        "###);
    }

    fn capture_test_event(transaction_name: &str, source: TransactionSource) -> Vec<String> {
        let mut event = Annotated::<Event>::from_json(
            r#"
            {
                "type": "transaction",
                "transaction": "/foo/",
                "timestamp": 946684810.0,
                "start_timestamp": 946684800.0,
                "contexts": {
                    "trace": {
                        "trace_id": "4c79f60c11214eb38604f4ae0781bfb2",
                        "span_id": "fa90fdead5f74053",
                        "op": "http.server",
                        "type": "trace"
                    }
                },
                "transaction_info": {
                    "source": "url"
                }
            }
            "#,
        )
        .unwrap();
        let e = event.value_mut().as_mut().unwrap();
        e.transaction.set_value(Some(transaction_name.into()));

        e.transaction_info
            .value_mut()
            .as_mut()
            .unwrap()
            .source
            .set_value(Some(source));

        relay_statsd::with_capturing_test_client(|| {
            utils::log_transaction_name_metrics(&mut event, |event| {
                let config = NormalizationConfig {
                    transaction_name_config: TransactionNameConfig {
                        rules: &[TransactionNameRule {
                            pattern: LazyGlob::new("/foo/*/**".to_owned()),
                            expiry: DateTime::<Utc>::MAX_UTC,
                            redaction: RedactionRule::Replace {
                                substitution: "*".to_owned(),
                            },
                        }],
                    },
                    ..Default::default()
                };
                relay_event_normalization::normalize_event(event, &config)
            });
        })
    }

    #[test]
    fn test_log_transaction_metrics_none() {
        let captures = capture_test_event("/nothing", TransactionSource::Url);
        insta::assert_debug_snapshot!(captures, @r###"
        [
            "event.transaction_name_changes:1|c|#source_in:url,changes:none,source_out:sanitized,is_404:false",
        ]
        "###);
    }

    #[test]
    fn test_log_transaction_metrics_rule() {
        let captures = capture_test_event("/foo/john/denver", TransactionSource::Url);
        insta::assert_debug_snapshot!(captures, @r###"
        [
            "event.transaction_name_changes:1|c|#source_in:url,changes:rule,source_out:sanitized,is_404:false",
        ]
        "###);
    }

    #[test]
    fn test_log_transaction_metrics_pattern() {
        let captures = capture_test_event("/something/12345", TransactionSource::Url);
        insta::assert_debug_snapshot!(captures, @r###"
        [
            "event.transaction_name_changes:1|c|#source_in:url,changes:pattern,source_out:sanitized,is_404:false",
        ]
        "###);
    }

    #[test]
    fn test_log_transaction_metrics_both() {
        let captures = capture_test_event("/foo/john/12345", TransactionSource::Url);
        insta::assert_debug_snapshot!(captures, @r###"
        [
            "event.transaction_name_changes:1|c|#source_in:url,changes:both,source_out:sanitized,is_404:false",
        ]
        "###);
    }

    #[test]
    fn test_log_transaction_metrics_no_match() {
        let captures = capture_test_event("/foo/john/12345", TransactionSource::Route);
        insta::assert_debug_snapshot!(captures, @r###"
        [
            "event.transaction_name_changes:1|c|#source_in:route,changes:none,source_out:route,is_404:false",
        ]
        "###);
    }

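    /// Verifies that the hard-coded `MEASUREMENT_MRI_OVERHEAD` constant matches the
    /// overhead derived from building an actual measurement metric name.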
    #[test]
    fn test_mri_overhead_constant() {
        let hardcoded_value = MeasurementsConfig::MEASUREMENT_MRI_OVERHEAD;

        let derived_value = {
            let name = "foobar".to_owned();
            let value = 5.into();
            let unit = MetricUnit::Duration(DurationUnit::default());
            let tags = TransactionMeasurementTags {
                measurement_rating: None,
                universal_tags: CommonTags(BTreeMap::new()),
                score_profile_version: None,
            };

            let measurement = TransactionMetric::Measurement {
                name: name.clone(),
                value,
                unit,
                tags,
            };

            let metric: Bucket = measurement.into_metric(UnixTimestamp::now());
            metric.name.len() - unit.to_string().len() - name.len()
        };
        assert_eq!(
            hardcoded_value, derived_value,
            "Update `MEASUREMENT_MRI_OVERHEAD` if the naming scheme changed."
        );
    }

    #[tokio::test]
    async fn test_process_metrics_bucket_metadata() {
        let mut token = Cogs::noop().timed(ResourceId::Relay, AppFeature::Unattributed);
        let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
        let received_at = Utc::now();
        let config = Config::default();

        let (aggregator, mut aggregator_rx) = Addr::custom();
        let processor = create_test_processor_with_addrs(
            config,
            Addrs {
                aggregator,
                ..Default::default()
            },
        )
        .await;

        let mut item = Item::new(ItemType::Statsd);
        item.set_payload(
            ContentType::Text,
            "transactions/foo:3182887624:4267882815|s",
        );
        for (source, expected_received_at) in [
            (
                BucketSource::External,
                Some(UnixTimestamp::from_datetime(received_at).unwrap()),
            ),
            (BucketSource::Internal, None),
        ] {
            let message = ProcessMetrics {
                data: MetricData::Raw(vec![item.clone()]),
                project_key,
                source,
                received_at,
                sent_at: Some(Utc::now()),
            };
            processor.handle_process_metrics(&mut token, message);

            let Aggregator::MergeBuckets(merge_buckets) = aggregator_rx.recv().await.unwrap();
            let buckets = merge_buckets.buckets;
            assert_eq!(buckets.len(), 1);
            assert_eq!(buckets[0].metadata.received_at, expected_received_at);
        }
    }

    #[tokio::test]
    async fn test_process_batched_metrics() {
        let mut token = Cogs::noop().timed(ResourceId::Relay, AppFeature::Unattributed);
        let received_at = Utc::now();
        let config = Config::default();

        let (aggregator, mut aggregator_rx) = Addr::custom();
        let processor = create_test_processor_with_addrs(
            config,
            Addrs {
                aggregator,
                ..Default::default()
            },
        )
        .await;

        let payload = r#"{
    "buckets": {
        "11111111111111111111111111111111": [
            {
                "timestamp": 1615889440,
                "width": 0,
                "name": "d:custom/endpoint.response_time@millisecond",
                "type": "d",
                "value": [
                    68.0
                ],
                "tags": {
                    "route": "user_index"
                }
            }
        ],
        "22222222222222222222222222222222": [
            {
                "timestamp": 1615889440,
                "width": 0,
                "name": "d:custom/endpoint.cache_rate@none",
                "type": "d",
                "value": [
                    36.0
                ]
            }
        ]
    }
}
"#;
        let message = ProcessBatchedMetrics {
            payload: Bytes::from(payload),
            source: BucketSource::Internal,
            received_at,
            sent_at: Some(Utc::now()),
        };
        processor.handle_process_batched_metrics(&mut token, message);

        let Aggregator::MergeBuckets(mb1) = aggregator_rx.recv().await.unwrap();
        let Aggregator::MergeBuckets(mb2) = aggregator_rx.recv().await.unwrap();

        let mut messages = vec![mb1, mb2];
        messages.sort_by_key(|mb| mb.project_key);

        let actual = messages
            .into_iter()
            .map(|mb| (mb.project_key, mb.buckets))
            .collect::<Vec<_>>();

        assert_debug_snapshot!(actual, @r###"
        [
            (
                ProjectKey("11111111111111111111111111111111"),
                [
                    Bucket {
                        timestamp: UnixTimestamp(1615889440),
                        width: 0,
                        name: MetricName(
                            "d:custom/endpoint.response_time@millisecond",
                        ),
                        value: Distribution(
                            [
                                68.0,
                            ],
                        ),
                        tags: {
                            "route": "user_index",
                        },
                        metadata: BucketMetadata {
                            merges: 1,
                            received_at: None,
                            extracted_from_indexed: false,
                        },
                    },
                ],
            ),
            (
                ProjectKey("22222222222222222222222222222222"),
                [
                    Bucket {
                        timestamp: UnixTimestamp(1615889440),
                        width: 0,
                        name: MetricName(
                            "d:custom/endpoint.cache_rate@none",
                        ),
                        value: Distribution(
                            [
                                36.0,
                            ],
                        ),
                        tags: {},
                        metadata: BucketMetadata {
                            merges: 1,
                            received_at: None,
                            extracted_from_indexed: false,
                        },
                    },
                ],
            ),
        ]
        "###);
    }
}