1use std::borrow::Cow;
2use std::collections::{BTreeSet, HashMap};
3use std::error::Error;
4use std::fmt::{Debug, Display};
5use std::future::Future;
6use std::io::Write;
7use std::pin::Pin;
8use std::sync::Arc;
9use std::time::Duration;
10
11use anyhow::Context;
12use brotli::CompressorWriter as BrotliEncoder;
13use bytes::Bytes;
14use chrono::{DateTime, Utc};
15use flate2::Compression;
16use flate2::write::{GzEncoder, ZlibEncoder};
17use futures::FutureExt;
18use futures::future::BoxFuture;
19use relay_base_schema::project::{ProjectId, ProjectKey};
20use relay_cogs::{AppFeature, Cogs, FeatureWeights, ResourceId, Token};
21use relay_common::time::UnixTimestamp;
22use relay_config::{Config, HttpEncoding, RelayMode};
23use relay_dynamic_config::{ErrorBoundary, Feature, GlobalConfig};
24use relay_event_normalization::{ClockDriftProcessor, GeoIpLookup};
25use relay_event_schema::processor::ProcessingAction;
26use relay_event_schema::protocol::{
27 ClientReport, Event, EventId, Metrics, NetworkReportError, SpanV2,
28};
29use relay_filter::FilterStatKey;
30use relay_metrics::{Bucket, BucketMetadata, BucketView, BucketsView, MetricNamespace};
31use relay_pii::PiiConfigError;
32use relay_protocol::Annotated;
33use relay_quotas::{DataCategory, Quota, RateLimits, Scoping};
34use relay_sampling::evaluation::{ReservoirCounters, ReservoirEvaluator, SamplingDecision};
35use relay_statsd::metric;
36use relay_system::{Addr, FromMessage, NoResponse, Service};
37use reqwest::header;
38use smallvec::{SmallVec, smallvec};
39use zstd::stream::Encoder as ZstdEncoder;
40
41use crate::envelope::{
42 self, AttachmentType, ContentType, Envelope, EnvelopeError, Item, ItemContainer, ItemType,
43};
44use crate::extractors::{PartialDsn, RequestMeta, RequestTrust};
45use crate::integrations::{Integration, SpansIntegration};
46use crate::managed::{InvalidProcessingGroupType, ManagedEnvelope, TypedEnvelope};
47use crate::metrics::{MetricOutcomes, MetricsLimiter, MinimalTrackableBucket};
48use crate::metrics_extraction::transactions::ExtractedMetrics;
49use crate::metrics_extraction::transactions::types::ExtractMetricsError;
50use crate::processing::check_ins::CheckInsProcessor;
51use crate::processing::logs::LogsProcessor;
52use crate::processing::sessions::SessionsProcessor;
53use crate::processing::spans::SpansProcessor;
54use crate::processing::trace_metrics::TraceMetricsProcessor;
55use crate::processing::transactions::extraction::ExtractMetricsContext;
56use crate::processing::utils::event::{
57 EventFullyNormalized, EventMetricsExtracted, FiltersStatus, SpansExtracted, event_category,
58 event_type,
59};
60use crate::processing::{Forward as _, Output, Outputs, QuotaRateLimiter};
61use crate::service::ServiceError;
62use crate::services::global_config::GlobalConfigHandle;
63use crate::services::metrics::{Aggregator, FlushBuckets, MergeBuckets, ProjectBuckets};
64use crate::services::outcome::{DiscardItemType, DiscardReason, Outcome, TrackOutcome};
65use crate::services::projects::cache::ProjectCacheHandle;
66use crate::services::projects::project::{ProjectInfo, ProjectState};
67use crate::services::upstream::{
68 SendRequest, Sign, SignatureType, UpstreamRelay, UpstreamRequest, UpstreamRequestError,
69};
70use crate::statsd::{RelayCounters, RelayDistributions, RelayTimers};
71use crate::utils::{self, CheckLimits, EnvelopeLimiter, SamplingResult};
72use crate::{http, processing};
73use relay_threading::AsyncPool;
74#[cfg(feature = "processing")]
75use {
76 crate::services::global_rate_limits::{GlobalRateLimits, GlobalRateLimitsServiceHandle},
77 crate::services::processor::nnswitch::SwitchProcessingError,
78 crate::services::store::{Store, StoreEnvelope},
79 crate::services::upload::Upload,
80 crate::utils::Enforcement,
81 itertools::Itertools,
82 relay_cardinality::{
83 CardinalityLimit, CardinalityLimiter, CardinalityLimitsSplit, RedisSetLimiter,
84 RedisSetLimiterOptions,
85 },
86 relay_dynamic_config::CardinalityLimiterMode,
87 relay_quotas::{RateLimitingError, RedisRateLimiter},
88 relay_redis::{AsyncRedisClient, RedisClients},
89 std::time::Instant,
90 symbolic_unreal::{Unreal4Error, Unreal4ErrorKind},
91};
92
93mod attachment;
94mod dynamic_sampling;
95mod event;
96mod metrics;
97mod nel;
98mod profile;
99mod profile_chunk;
100mod replay;
101mod report;
102mod span;
103
104#[cfg(all(sentry, feature = "processing"))]
105mod playstation;
106mod standalone;
107#[cfg(feature = "processing")]
108mod unreal;
109
110#[cfg(feature = "processing")]
111mod nnswitch;
112
113macro_rules! if_processing {
117 ($config:expr, $if_true:block) => {
118 #[cfg(feature = "processing")] {
119 if $config.processing_enabled() $if_true
120 }
121 };
122 ($config:expr, $if_true:block else $if_false:block) => {
123 {
124 #[cfg(feature = "processing")] {
125 if $config.processing_enabled() $if_true else $if_false
126 }
127 #[cfg(not(feature = "processing"))] {
128 $if_false
129 }
130 }
131 };
132}
133
134pub const MINIMUM_CLOCK_DRIFT: Duration = Duration::from_secs(55 * 60);
136
137#[derive(Debug)]
138pub struct GroupTypeError;
139
140impl Display for GroupTypeError {
141 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
142 f.write_str("failed to convert processing group into corresponding type")
143 }
144}
145
146impl std::error::Error for GroupTypeError {}
147
148macro_rules! processing_group {
149 ($ty:ident, $variant:ident$(, $($other:ident),+)?) => {
150 #[derive(Clone, Copy, Debug)]
151 pub struct $ty;
152
153 impl From<$ty> for ProcessingGroup {
154 fn from(_: $ty) -> Self {
155 ProcessingGroup::$variant
156 }
157 }
158
159 impl TryFrom<ProcessingGroup> for $ty {
160 type Error = GroupTypeError;
161
162 fn try_from(value: ProcessingGroup) -> Result<Self, Self::Error> {
163 if matches!(value, ProcessingGroup::$variant) {
164 return Ok($ty);
165 }
166 $($(
167 if matches!(value, ProcessingGroup::$other) {
168 return Ok($ty);
169 }
170 )+)?
171 return Err(GroupTypeError);
172 }
173 }
174 };
175}
176
177pub trait EventProcessing {}
181
182processing_group!(TransactionGroup, Transaction);
183impl EventProcessing for TransactionGroup {}
184
185processing_group!(ErrorGroup, Error);
186impl EventProcessing for ErrorGroup {}
187
188processing_group!(SessionGroup, Session);
189processing_group!(StandaloneGroup, Standalone);
190processing_group!(ClientReportGroup, ClientReport);
191processing_group!(ReplayGroup, Replay);
192processing_group!(CheckInGroup, CheckIn);
193processing_group!(LogGroup, Log, Nel);
194processing_group!(TraceMetricGroup, TraceMetric);
195processing_group!(SpanGroup, Span);
196
197processing_group!(ProfileChunkGroup, ProfileChunk);
198processing_group!(MetricsGroup, Metrics);
199processing_group!(ForwardUnknownGroup, ForwardUnknown);
200processing_group!(Ungrouped, Ungrouped);
201
202#[derive(Clone, Copy, Debug)]
206pub struct Processed;
207
208#[derive(Clone, Copy, Debug)]
210pub enum ProcessingGroup {
211 Transaction,
215 Error,
220 Session,
222 Standalone,
225 ClientReport,
227 Replay,
229 CheckIn,
231 Nel,
233 Log,
235 TraceMetric,
237 Span,
239 SpanV2,
241 Metrics,
243 ProfileChunk,
245 ForwardUnknown,
248 Ungrouped,
250}
251
252impl ProcessingGroup {
253 fn split_envelope(
255 mut envelope: Envelope,
256 project_info: &ProjectInfo,
257 ) -> SmallVec<[(Self, Box<Envelope>); 3]> {
258 let headers = envelope.headers().clone();
259 let mut grouped_envelopes = smallvec![];
260
261 let replay_items = envelope.take_items_by(|item| {
263 matches!(
264 item.ty(),
265 &ItemType::ReplayEvent | &ItemType::ReplayRecording | &ItemType::ReplayVideo
266 )
267 });
268 if !replay_items.is_empty() {
269 grouped_envelopes.push((
270 ProcessingGroup::Replay,
271 Envelope::from_parts(headers.clone(), replay_items),
272 ))
273 }
274
275 let session_items = envelope
277 .take_items_by(|item| matches!(item.ty(), &ItemType::Session | &ItemType::Sessions));
278 if !session_items.is_empty() {
279 grouped_envelopes.push((
280 ProcessingGroup::Session,
281 Envelope::from_parts(headers.clone(), session_items),
282 ))
283 }
284
285 let span_v2_items = envelope.take_items_by(|item| {
286 let exp_feature = project_info.has_feature(Feature::SpanV2ExperimentalProcessing);
287 let is_supported_integration = matches!(
288 item.integration(),
289 Some(Integration::Spans(SpansIntegration::OtelV1 { .. }))
290 );
291 let is_span = matches!(item.ty(), &ItemType::Span);
292 let is_span_attachment = item.is_attachment_v2();
293
294 ItemContainer::<SpanV2>::is_container(item)
295 || (exp_feature && is_span)
296 || (exp_feature && is_supported_integration)
297 || (exp_feature && is_span_attachment)
298 });
299
300 if !span_v2_items.is_empty() {
301 grouped_envelopes.push((
302 ProcessingGroup::SpanV2,
303 Envelope::from_parts(headers.clone(), span_v2_items),
304 ))
305 }
306
307 let span_items = envelope.take_items_by(|item| {
309 matches!(item.ty(), &ItemType::Span)
310 || matches!(item.integration(), Some(Integration::Spans(_)))
311 });
312
313 if !span_items.is_empty() {
314 grouped_envelopes.push((
315 ProcessingGroup::Span,
316 Envelope::from_parts(headers.clone(), span_items),
317 ))
318 }
319
320 let logs_items = envelope.take_items_by(|item| {
322 matches!(item.ty(), &ItemType::Log)
323 || matches!(item.integration(), Some(Integration::Logs(_)))
324 });
325 if !logs_items.is_empty() {
326 grouped_envelopes.push((
327 ProcessingGroup::Log,
328 Envelope::from_parts(headers.clone(), logs_items),
329 ))
330 }
331
332 let trace_metric_items =
334 envelope.take_items_by(|item| matches!(item.ty(), &ItemType::TraceMetric));
335 if !trace_metric_items.is_empty() {
336 grouped_envelopes.push((
337 ProcessingGroup::TraceMetric,
338 Envelope::from_parts(headers.clone(), trace_metric_items),
339 ))
340 }
341
342 let nel_items = envelope.take_items_by(|item| matches!(item.ty(), &ItemType::Nel));
344 if !nel_items.is_empty() {
345 grouped_envelopes.push((
346 ProcessingGroup::Nel,
347 Envelope::from_parts(headers.clone(), nel_items),
348 ))
349 }
350
351 let metric_items = envelope.take_items_by(|i| i.ty().is_metrics());
356 if !metric_items.is_empty() {
357 grouped_envelopes.push((
358 ProcessingGroup::Metrics,
359 Envelope::from_parts(headers.clone(), metric_items),
360 ))
361 }
362
363 let profile_chunk_items =
365 envelope.take_items_by(|item| matches!(item.ty(), &ItemType::ProfileChunk));
366 if !profile_chunk_items.is_empty() {
367 grouped_envelopes.push((
368 ProcessingGroup::ProfileChunk,
369 Envelope::from_parts(headers.clone(), profile_chunk_items),
370 ))
371 }
372
373 if !envelope.items().any(Item::creates_event) {
378 let standalone_items = envelope.take_items_by(Item::requires_event);
379 if !standalone_items.is_empty() {
380 grouped_envelopes.push((
381 ProcessingGroup::Standalone,
382 Envelope::from_parts(headers.clone(), standalone_items),
383 ))
384 }
385 };
386
387 let security_reports_items = envelope
389 .take_items_by(|i| matches!(i.ty(), &ItemType::RawSecurity))
390 .into_iter()
391 .map(|item| {
392 let headers = headers.clone();
393 let items: SmallVec<[Item; 3]> = smallvec![item.clone()];
394 let mut envelope = Envelope::from_parts(headers, items);
395 envelope.set_event_id(EventId::new());
396 (ProcessingGroup::Error, envelope)
397 });
398 grouped_envelopes.extend(security_reports_items);
399
400 let require_event_items = envelope.take_items_by(Item::requires_event);
402 if !require_event_items.is_empty() {
403 let group = if require_event_items
404 .iter()
405 .any(|item| matches!(item.ty(), &ItemType::Transaction | &ItemType::Profile))
406 {
407 ProcessingGroup::Transaction
408 } else {
409 ProcessingGroup::Error
410 };
411
412 grouped_envelopes.push((
413 group,
414 Envelope::from_parts(headers.clone(), require_event_items),
415 ))
416 }
417
418 let envelopes = envelope.items_mut().map(|item| {
420 let headers = headers.clone();
421 let items: SmallVec<[Item; 3]> = smallvec![item.clone()];
422 let envelope = Envelope::from_parts(headers, items);
423 let item_type = item.ty();
424 let group = if matches!(item_type, &ItemType::CheckIn) {
425 ProcessingGroup::CheckIn
426 } else if matches!(item.ty(), &ItemType::ClientReport) {
427 ProcessingGroup::ClientReport
428 } else if matches!(item_type, &ItemType::Unknown(_)) {
429 ProcessingGroup::ForwardUnknown
430 } else {
431 ProcessingGroup::Ungrouped
433 };
434
435 (group, envelope)
436 });
437 grouped_envelopes.extend(envelopes);
438
439 grouped_envelopes
440 }
441
442 pub fn variant(&self) -> &'static str {
444 match self {
445 ProcessingGroup::Transaction => "transaction",
446 ProcessingGroup::Error => "error",
447 ProcessingGroup::Session => "session",
448 ProcessingGroup::Standalone => "standalone",
449 ProcessingGroup::ClientReport => "client_report",
450 ProcessingGroup::Replay => "replay",
451 ProcessingGroup::CheckIn => "check_in",
452 ProcessingGroup::Log => "log",
453 ProcessingGroup::TraceMetric => "trace_metric",
454 ProcessingGroup::Nel => "nel",
455 ProcessingGroup::Span => "span",
456 ProcessingGroup::SpanV2 => "span_v2",
457 ProcessingGroup::Metrics => "metrics",
458 ProcessingGroup::ProfileChunk => "profile_chunk",
459 ProcessingGroup::ForwardUnknown => "forward_unknown",
460 ProcessingGroup::Ungrouped => "ungrouped",
461 }
462 }
463}
464
465impl From<ProcessingGroup> for AppFeature {
466 fn from(value: ProcessingGroup) -> Self {
467 match value {
468 ProcessingGroup::Transaction => AppFeature::Transactions,
469 ProcessingGroup::Error => AppFeature::Errors,
470 ProcessingGroup::Session => AppFeature::Sessions,
471 ProcessingGroup::Standalone => AppFeature::UnattributedEnvelope,
472 ProcessingGroup::ClientReport => AppFeature::ClientReports,
473 ProcessingGroup::Replay => AppFeature::Replays,
474 ProcessingGroup::CheckIn => AppFeature::CheckIns,
475 ProcessingGroup::Log => AppFeature::Logs,
476 ProcessingGroup::TraceMetric => AppFeature::TraceMetrics,
477 ProcessingGroup::Nel => AppFeature::Logs,
478 ProcessingGroup::Span => AppFeature::Spans,
479 ProcessingGroup::SpanV2 => AppFeature::Spans,
480 ProcessingGroup::Metrics => AppFeature::UnattributedMetrics,
481 ProcessingGroup::ProfileChunk => AppFeature::Profiles,
482 ProcessingGroup::ForwardUnknown => AppFeature::UnattributedEnvelope,
483 ProcessingGroup::Ungrouped => AppFeature::UnattributedEnvelope,
484 }
485 }
486}
487
488#[derive(Debug, thiserror::Error)]
490pub enum ProcessingError {
491 #[error("invalid json in event")]
492 InvalidJson(#[source] serde_json::Error),
493
494 #[error("invalid message pack event payload")]
495 InvalidMsgpack(#[from] rmp_serde::decode::Error),
496
497 #[cfg(feature = "processing")]
498 #[error("invalid unreal crash report")]
499 InvalidUnrealReport(#[source] Unreal4Error),
500
501 #[error("event payload too large")]
502 PayloadTooLarge(DiscardItemType),
503
504 #[error("invalid transaction event")]
505 InvalidTransaction,
506
507 #[error("envelope processor failed")]
508 ProcessingFailed(#[from] ProcessingAction),
509
510 #[error("duplicate {0} in event")]
511 DuplicateItem(ItemType),
512
513 #[error("failed to extract event payload")]
514 NoEventPayload,
515
516 #[error("missing project id in DSN")]
517 MissingProjectId,
518
519 #[error("invalid security report type: {0:?}")]
520 InvalidSecurityType(Bytes),
521
522 #[error("unsupported security report type")]
523 UnsupportedSecurityType,
524
525 #[error("invalid security report")]
526 InvalidSecurityReport(#[source] serde_json::Error),
527
528 #[error("invalid nel report")]
529 InvalidNelReport(#[source] NetworkReportError),
530
531 #[error("event filtered with reason: {0:?}")]
532 EventFiltered(FilterStatKey),
533
534 #[error("missing or invalid required event timestamp")]
535 InvalidTimestamp,
536
537 #[error("could not serialize event payload")]
538 SerializeFailed(#[source] serde_json::Error),
539
540 #[cfg(feature = "processing")]
541 #[error("failed to apply quotas")]
542 QuotasFailed(#[from] RateLimitingError),
543
544 #[error("invalid pii config")]
545 PiiConfigError(PiiConfigError),
546
547 #[error("invalid processing group type")]
548 InvalidProcessingGroup(Box<InvalidProcessingGroupType>),
549
550 #[error("invalid replay")]
551 InvalidReplay(DiscardReason),
552
553 #[error("replay filtered with reason: {0:?}")]
554 ReplayFiltered(FilterStatKey),
555
556 #[cfg(feature = "processing")]
557 #[error("nintendo switch dying message processing failed {0:?}")]
558 InvalidNintendoDyingMessage(#[source] SwitchProcessingError),
559
560 #[cfg(all(sentry, feature = "processing"))]
561 #[error("playstation dump processing failed: {0}")]
562 InvalidPlaystationDump(String),
563
564 #[error("processing group does not match specific processor")]
565 ProcessingGroupMismatch,
566 #[error("new processing pipeline failed")]
567 ProcessingFailure,
568}
569
570impl ProcessingError {
571 pub fn to_outcome(&self) -> Option<Outcome> {
572 match self {
573 Self::PayloadTooLarge(payload_type) => {
574 Some(Outcome::Invalid(DiscardReason::TooLarge(*payload_type)))
575 }
576 Self::InvalidJson(_) => Some(Outcome::Invalid(DiscardReason::InvalidJson)),
577 Self::InvalidMsgpack(_) => Some(Outcome::Invalid(DiscardReason::InvalidMsgpack)),
578 Self::InvalidSecurityType(_) => {
579 Some(Outcome::Invalid(DiscardReason::SecurityReportType))
580 }
581 Self::InvalidSecurityReport(_) => Some(Outcome::Invalid(DiscardReason::SecurityReport)),
582 Self::UnsupportedSecurityType => Some(Outcome::Filtered(FilterStatKey::InvalidCsp)),
583 Self::InvalidNelReport(_) => Some(Outcome::Invalid(DiscardReason::InvalidJson)),
584 Self::InvalidTransaction => Some(Outcome::Invalid(DiscardReason::InvalidTransaction)),
585 Self::InvalidTimestamp => Some(Outcome::Invalid(DiscardReason::Timestamp)),
586 Self::DuplicateItem(_) => Some(Outcome::Invalid(DiscardReason::DuplicateItem)),
587 Self::NoEventPayload => Some(Outcome::Invalid(DiscardReason::NoEventPayload)),
588 #[cfg(feature = "processing")]
589 Self::InvalidNintendoDyingMessage(_) => Some(Outcome::Invalid(DiscardReason::Payload)),
590 #[cfg(all(sentry, feature = "processing"))]
591 Self::InvalidPlaystationDump(_) => Some(Outcome::Invalid(DiscardReason::Payload)),
592 #[cfg(feature = "processing")]
593 Self::InvalidUnrealReport(err) if err.kind() == Unreal4ErrorKind::BadCompression => {
594 Some(Outcome::Invalid(DiscardReason::InvalidCompression))
595 }
596 #[cfg(feature = "processing")]
597 Self::InvalidUnrealReport(_) => Some(Outcome::Invalid(DiscardReason::ProcessUnreal)),
598 Self::SerializeFailed(_) | Self::ProcessingFailed(_) => {
599 Some(Outcome::Invalid(DiscardReason::Internal))
600 }
601 #[cfg(feature = "processing")]
602 Self::QuotasFailed(_) => Some(Outcome::Invalid(DiscardReason::Internal)),
603 Self::PiiConfigError(_) => Some(Outcome::Invalid(DiscardReason::ProjectStatePii)),
604 Self::MissingProjectId => None,
605 Self::EventFiltered(_) => None,
606 Self::InvalidProcessingGroup(_) => None,
607 Self::InvalidReplay(reason) => Some(Outcome::Invalid(*reason)),
608 Self::ReplayFiltered(key) => Some(Outcome::Filtered(key.clone())),
609
610 Self::ProcessingGroupMismatch => Some(Outcome::Invalid(DiscardReason::Internal)),
611 Self::ProcessingFailure => None,
613 }
614 }
615
616 fn is_unexpected(&self) -> bool {
617 self.to_outcome()
618 .is_some_and(|outcome| outcome.is_unexpected())
619 }
620}
621
622#[cfg(feature = "processing")]
623impl From<Unreal4Error> for ProcessingError {
624 fn from(err: Unreal4Error) -> Self {
625 match err.kind() {
626 Unreal4ErrorKind::TooLarge => Self::PayloadTooLarge(ItemType::UnrealReport.into()),
627 _ => ProcessingError::InvalidUnrealReport(err),
628 }
629 }
630}
631
632impl From<ExtractMetricsError> for ProcessingError {
633 fn from(error: ExtractMetricsError) -> Self {
634 match error {
635 ExtractMetricsError::MissingTimestamp | ExtractMetricsError::InvalidTimestamp => {
636 Self::InvalidTimestamp
637 }
638 }
639 }
640}
641
642impl From<InvalidProcessingGroupType> for ProcessingError {
643 fn from(value: InvalidProcessingGroupType) -> Self {
644 Self::InvalidProcessingGroup(Box::new(value))
645 }
646}
647
648type ExtractedEvent = (Annotated<Event>, usize);
649
650#[derive(Debug)]
655pub struct ProcessingExtractedMetrics {
656 metrics: ExtractedMetrics,
657}
658
659impl ProcessingExtractedMetrics {
660 pub fn new() -> Self {
661 Self {
662 metrics: ExtractedMetrics::default(),
663 }
664 }
665
666 pub fn into_inner(self) -> ExtractedMetrics {
667 self.metrics
668 }
669
670 pub fn extend(
672 &mut self,
673 extracted: ExtractedMetrics,
674 sampling_decision: Option<SamplingDecision>,
675 ) {
676 self.extend_project_metrics(extracted.project_metrics, sampling_decision);
677 self.extend_sampling_metrics(extracted.sampling_metrics, sampling_decision);
678 }
679
680 pub fn extend_project_metrics<I>(
682 &mut self,
683 buckets: I,
684 sampling_decision: Option<SamplingDecision>,
685 ) where
686 I: IntoIterator<Item = Bucket>,
687 {
688 self.metrics
689 .project_metrics
690 .extend(buckets.into_iter().map(|mut bucket| {
691 bucket.metadata.extracted_from_indexed =
692 sampling_decision == Some(SamplingDecision::Keep);
693 bucket
694 }));
695 }
696
697 pub fn extend_sampling_metrics<I>(
699 &mut self,
700 buckets: I,
701 sampling_decision: Option<SamplingDecision>,
702 ) where
703 I: IntoIterator<Item = Bucket>,
704 {
705 self.metrics
706 .sampling_metrics
707 .extend(buckets.into_iter().map(|mut bucket| {
708 bucket.metadata.extracted_from_indexed =
709 sampling_decision == Some(SamplingDecision::Keep);
710 bucket
711 }));
712 }
713
714 #[cfg(feature = "processing")]
719 fn apply_enforcement(&mut self, enforcement: &Enforcement, enforced_consistently: bool) {
720 let mut drop_namespaces: SmallVec<[_; 2]> = smallvec![];
722 let mut reset_extracted_from_indexed: SmallVec<[_; 2]> = smallvec![];
725
726 for (namespace, limit, indexed) in [
727 (
728 MetricNamespace::Transactions,
729 &enforcement.event,
730 &enforcement.event_indexed,
731 ),
732 (
733 MetricNamespace::Spans,
734 &enforcement.spans,
735 &enforcement.spans_indexed,
736 ),
737 ] {
738 if limit.is_active() {
739 drop_namespaces.push(namespace);
740 } else if indexed.is_active() && !enforced_consistently {
741 reset_extracted_from_indexed.push(namespace);
746 }
747 }
748
749 if !drop_namespaces.is_empty() || !reset_extracted_from_indexed.is_empty() {
750 self.retain_mut(|bucket| {
751 let Some(namespace) = bucket.name.try_namespace() else {
752 return true;
753 };
754
755 if drop_namespaces.contains(&namespace) {
756 return false;
757 }
758
759 if reset_extracted_from_indexed.contains(&namespace) {
760 bucket.metadata.extracted_from_indexed = false;
761 }
762
763 true
764 });
765 }
766 }
767
768 #[cfg(feature = "processing")]
769 fn retain_mut(&mut self, mut f: impl FnMut(&mut Bucket) -> bool) {
770 self.metrics.project_metrics.retain_mut(&mut f);
771 self.metrics.sampling_metrics.retain_mut(&mut f);
772 }
773}
774
775fn send_metrics(
776 metrics: ExtractedMetrics,
777 project_key: ProjectKey,
778 sampling_key: Option<ProjectKey>,
779 aggregator: &Addr<Aggregator>,
780) {
781 let ExtractedMetrics {
782 project_metrics,
783 sampling_metrics,
784 } = metrics;
785
786 if !project_metrics.is_empty() {
787 aggregator.send(MergeBuckets {
788 project_key,
789 buckets: project_metrics,
790 });
791 }
792
793 if !sampling_metrics.is_empty() {
794 let sampling_project_key = sampling_key.unwrap_or(project_key);
801 aggregator.send(MergeBuckets {
802 project_key: sampling_project_key,
803 buckets: sampling_metrics,
804 });
805 }
806}
807
808fn should_filter(config: &Config, project_info: &ProjectInfo, feature: Feature) -> bool {
813 match config.relay_mode() {
814 RelayMode::Proxy => false,
815 RelayMode::Managed => !project_info.has_feature(feature),
816 }
817}
818
819#[derive(Debug)]
822#[expect(
823 clippy::large_enum_variant,
824 reason = "until we have a better solution to combat the excessive growth of Item, see #4819"
825)]
826enum ProcessingResult {
827 Envelope {
828 managed_envelope: TypedEnvelope<Processed>,
829 extracted_metrics: ProcessingExtractedMetrics,
830 },
831 Output(Output<Outputs>),
832}
833
834impl ProcessingResult {
835 fn no_metrics(managed_envelope: TypedEnvelope<Processed>) -> Self {
837 Self::Envelope {
838 managed_envelope,
839 extracted_metrics: ProcessingExtractedMetrics::new(),
840 }
841 }
842}
843
844#[derive(Debug)]
846#[expect(
847 clippy::large_enum_variant,
848 reason = "until we have a better solution to combat the excessive growth of Item, see #4819"
849)]
850enum Submit<'a> {
851 Envelope(TypedEnvelope<Processed>),
853 Output {
855 output: Outputs,
856 ctx: processing::ForwardContext<'a>,
857 },
858}
859
860#[derive(Debug)]
870pub struct ProcessEnvelope {
871 pub envelope: ManagedEnvelope,
873 pub project_info: Arc<ProjectInfo>,
875 pub rate_limits: Arc<RateLimits>,
877 pub sampling_project_info: Option<Arc<ProjectInfo>>,
879 pub reservoir_counters: ReservoirCounters,
881}
882
883#[derive(Debug)]
885struct ProcessEnvelopeGrouped<'a> {
886 pub group: ProcessingGroup,
888 pub envelope: ManagedEnvelope,
890 pub ctx: processing::Context<'a>,
892}
893
894#[derive(Debug)]
906pub struct ProcessMetrics {
907 pub data: MetricData,
909 pub project_key: ProjectKey,
911 pub source: BucketSource,
913 pub received_at: DateTime<Utc>,
915 pub sent_at: Option<DateTime<Utc>>,
918}
919
920#[derive(Debug)]
922pub enum MetricData {
923 Raw(Vec<Item>),
925 Parsed(Vec<Bucket>),
927}
928
929impl MetricData {
930 fn into_buckets(self, timestamp: UnixTimestamp) -> Vec<Bucket> {
935 let items = match self {
936 Self::Parsed(buckets) => return buckets,
937 Self::Raw(items) => items,
938 };
939
940 let mut buckets = Vec::new();
941 for item in items {
942 let payload = item.payload();
943 if item.ty() == &ItemType::Statsd {
944 for bucket_result in Bucket::parse_all(&payload, timestamp) {
945 match bucket_result {
946 Ok(bucket) => buckets.push(bucket),
947 Err(error) => relay_log::debug!(
948 error = &error as &dyn Error,
949 "failed to parse metric bucket from statsd format",
950 ),
951 }
952 }
953 } else if item.ty() == &ItemType::MetricBuckets {
954 match serde_json::from_slice::<Vec<Bucket>>(&payload) {
955 Ok(parsed_buckets) => {
956 if buckets.is_empty() {
958 buckets = parsed_buckets;
959 } else {
960 buckets.extend(parsed_buckets);
961 }
962 }
963 Err(error) => {
964 relay_log::debug!(
965 error = &error as &dyn Error,
966 "failed to parse metric bucket",
967 );
968 metric!(counter(RelayCounters::MetricBucketsParsingFailed) += 1);
969 }
970 }
971 } else {
972 relay_log::error!(
973 "invalid item of type {} passed to ProcessMetrics",
974 item.ty()
975 );
976 }
977 }
978 buckets
979 }
980}
981
982#[derive(Debug)]
983pub struct ProcessBatchedMetrics {
984 pub payload: Bytes,
986 pub source: BucketSource,
988 pub received_at: DateTime<Utc>,
990 pub sent_at: Option<DateTime<Utc>>,
992}
993
994#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
996pub enum BucketSource {
997 Internal,
1003 External,
1008}
1009
1010impl BucketSource {
1011 pub fn from_meta(meta: &RequestMeta) -> Self {
1013 match meta.request_trust() {
1014 RequestTrust::Trusted => Self::Internal,
1015 RequestTrust::Untrusted => Self::External,
1016 }
1017 }
1018}
1019
1020#[derive(Debug)]
1022pub struct SubmitClientReports {
1023 pub client_reports: Vec<ClientReport>,
1025 pub scoping: Scoping,
1027}
1028
1029#[derive(Debug)]
1031pub enum EnvelopeProcessor {
1032 ProcessEnvelope(Box<ProcessEnvelope>),
1033 ProcessProjectMetrics(Box<ProcessMetrics>),
1034 ProcessBatchedMetrics(Box<ProcessBatchedMetrics>),
1035 FlushBuckets(Box<FlushBuckets>),
1036 SubmitClientReports(Box<SubmitClientReports>),
1037}
1038
1039impl EnvelopeProcessor {
1040 pub fn variant(&self) -> &'static str {
1042 match self {
1043 EnvelopeProcessor::ProcessEnvelope(_) => "ProcessEnvelope",
1044 EnvelopeProcessor::ProcessProjectMetrics(_) => "ProcessProjectMetrics",
1045 EnvelopeProcessor::ProcessBatchedMetrics(_) => "ProcessBatchedMetrics",
1046 EnvelopeProcessor::FlushBuckets(_) => "FlushBuckets",
1047 EnvelopeProcessor::SubmitClientReports(_) => "SubmitClientReports",
1048 }
1049 }
1050}
1051
1052impl relay_system::Interface for EnvelopeProcessor {}
1053
1054impl FromMessage<ProcessEnvelope> for EnvelopeProcessor {
1055 type Response = relay_system::NoResponse;
1056
1057 fn from_message(message: ProcessEnvelope, _sender: ()) -> Self {
1058 Self::ProcessEnvelope(Box::new(message))
1059 }
1060}
1061
1062impl FromMessage<ProcessMetrics> for EnvelopeProcessor {
1063 type Response = NoResponse;
1064
1065 fn from_message(message: ProcessMetrics, _: ()) -> Self {
1066 Self::ProcessProjectMetrics(Box::new(message))
1067 }
1068}
1069
1070impl FromMessage<ProcessBatchedMetrics> for EnvelopeProcessor {
1071 type Response = NoResponse;
1072
1073 fn from_message(message: ProcessBatchedMetrics, _: ()) -> Self {
1074 Self::ProcessBatchedMetrics(Box::new(message))
1075 }
1076}
1077
1078impl FromMessage<FlushBuckets> for EnvelopeProcessor {
1079 type Response = NoResponse;
1080
1081 fn from_message(message: FlushBuckets, _: ()) -> Self {
1082 Self::FlushBuckets(Box::new(message))
1083 }
1084}
1085
1086impl FromMessage<SubmitClientReports> for EnvelopeProcessor {
1087 type Response = NoResponse;
1088
1089 fn from_message(message: SubmitClientReports, _: ()) -> Self {
1090 Self::SubmitClientReports(Box::new(message))
1091 }
1092}
1093
1094pub type EnvelopeProcessorServicePool = AsyncPool<BoxFuture<'static, ()>>;
1096
1097#[derive(Clone)]
1101pub struct EnvelopeProcessorService {
1102 inner: Arc<InnerProcessor>,
1103}
1104
1105pub struct Addrs {
1107 pub outcome_aggregator: Addr<TrackOutcome>,
1108 pub upstream_relay: Addr<UpstreamRelay>,
1109 #[cfg(feature = "processing")]
1110 pub upload: Option<Addr<Upload>>,
1111 #[cfg(feature = "processing")]
1112 pub store_forwarder: Option<Addr<Store>>,
1113 pub aggregator: Addr<Aggregator>,
1114 #[cfg(feature = "processing")]
1115 pub global_rate_limits: Option<Addr<GlobalRateLimits>>,
1116}
1117
1118impl Default for Addrs {
1119 fn default() -> Self {
1120 Addrs {
1121 outcome_aggregator: Addr::dummy(),
1122 upstream_relay: Addr::dummy(),
1123 #[cfg(feature = "processing")]
1124 upload: None,
1125 #[cfg(feature = "processing")]
1126 store_forwarder: None,
1127 aggregator: Addr::dummy(),
1128 #[cfg(feature = "processing")]
1129 global_rate_limits: None,
1130 }
1131 }
1132}
1133
1134struct InnerProcessor {
1135 pool: EnvelopeProcessorServicePool,
1136 config: Arc<Config>,
1137 global_config: GlobalConfigHandle,
1138 project_cache: ProjectCacheHandle,
1139 cogs: Cogs,
1140 #[cfg(feature = "processing")]
1141 quotas_client: Option<AsyncRedisClient>,
1142 addrs: Addrs,
1143 #[cfg(feature = "processing")]
1144 rate_limiter: Option<Arc<RedisRateLimiter<GlobalRateLimitsServiceHandle>>>,
1145 geoip_lookup: GeoIpLookup,
1146 #[cfg(feature = "processing")]
1147 cardinality_limiter: Option<CardinalityLimiter>,
1148 metric_outcomes: MetricOutcomes,
1149 processing: Processing,
1150}
1151
1152struct Processing {
1153 logs: LogsProcessor,
1154 trace_metrics: TraceMetricsProcessor,
1155 spans: SpansProcessor,
1156 check_ins: CheckInsProcessor,
1157 sessions: SessionsProcessor,
1158}
1159
1160impl EnvelopeProcessorService {
1161 #[cfg_attr(feature = "processing", expect(clippy::too_many_arguments))]
1163 pub fn new(
1164 pool: EnvelopeProcessorServicePool,
1165 config: Arc<Config>,
1166 global_config: GlobalConfigHandle,
1167 project_cache: ProjectCacheHandle,
1168 cogs: Cogs,
1169 #[cfg(feature = "processing")] redis: Option<RedisClients>,
1170 addrs: Addrs,
1171 metric_outcomes: MetricOutcomes,
1172 ) -> Self {
1173 let geoip_lookup = config
1174 .geoip_path()
1175 .and_then(
1176 |p| match GeoIpLookup::open(p).context(ServiceError::GeoIp) {
1177 Ok(geoip) => Some(geoip),
1178 Err(err) => {
1179 relay_log::error!("failed to open GeoIP db {p:?}: {err:?}");
1180 None
1181 }
1182 },
1183 )
1184 .unwrap_or_else(GeoIpLookup::empty);
1185
1186 #[cfg(feature = "processing")]
1187 let (cardinality, quotas) = match redis {
1188 Some(RedisClients {
1189 cardinality,
1190 quotas,
1191 ..
1192 }) => (Some(cardinality), Some(quotas)),
1193 None => (None, None),
1194 };
1195
1196 #[cfg(feature = "processing")]
1197 let global_rate_limits = addrs.global_rate_limits.clone().map(Into::into);
1198
1199 #[cfg(feature = "processing")]
1200 let rate_limiter = match (quotas.clone(), global_rate_limits) {
1201 (Some(redis), Some(global)) => Some(
1202 RedisRateLimiter::new(redis, global)
1203 .max_limit(config.max_rate_limit())
1204 .cache(config.quota_cache_ratio(), config.quota_cache_max()),
1205 ),
1206 _ => None,
1207 };
1208
1209 let quota_limiter = Arc::new(QuotaRateLimiter::new(
1210 #[cfg(feature = "processing")]
1211 project_cache.clone(),
1212 #[cfg(feature = "processing")]
1213 rate_limiter.clone(),
1214 ));
1215 #[cfg(feature = "processing")]
1216 let rate_limiter = rate_limiter.map(Arc::new);
1217
1218 let inner = InnerProcessor {
1219 pool,
1220 global_config,
1221 project_cache,
1222 cogs,
1223 #[cfg(feature = "processing")]
1224 quotas_client: quotas.clone(),
1225 #[cfg(feature = "processing")]
1226 rate_limiter,
1227 addrs,
1228 #[cfg(feature = "processing")]
1229 cardinality_limiter: cardinality
1230 .map(|cardinality| {
1231 RedisSetLimiter::new(
1232 RedisSetLimiterOptions {
1233 cache_vacuum_interval: config
1234 .cardinality_limiter_cache_vacuum_interval(),
1235 },
1236 cardinality,
1237 )
1238 })
1239 .map(CardinalityLimiter::new),
1240 metric_outcomes,
1241 processing: Processing {
1242 logs: LogsProcessor::new(Arc::clone("a_limiter)),
1243 trace_metrics: TraceMetricsProcessor::new(Arc::clone("a_limiter)),
1244 spans: SpansProcessor::new(Arc::clone("a_limiter), geoip_lookup.clone()),
1245 check_ins: CheckInsProcessor::new(Arc::clone("a_limiter)),
1246 sessions: SessionsProcessor::new(quota_limiter),
1247 },
1248 geoip_lookup,
1249 config,
1250 };
1251
1252 Self {
1253 inner: Arc::new(inner),
1254 }
1255 }
1256
1257 async fn enforce_quotas<Group>(
1258 &self,
1259 managed_envelope: &mut TypedEnvelope<Group>,
1260 event: Annotated<Event>,
1261 extracted_metrics: &mut ProcessingExtractedMetrics,
1262 ctx: processing::Context<'_>,
1263 ) -> Result<Annotated<Event>, ProcessingError> {
1264 let cached_result = RateLimiter::Cached
1267 .enforce(managed_envelope, event, extracted_metrics, ctx)
1268 .await?;
1269
1270 if_processing!(self.inner.config, {
1271 let rate_limiter = match self.inner.rate_limiter.clone() {
1272 Some(rate_limiter) => rate_limiter,
1273 None => return Ok(cached_result.event),
1274 };
1275
1276 let consistent_result = RateLimiter::Consistent(rate_limiter)
1278 .enforce(
1279 managed_envelope,
1280 cached_result.event,
1281 extracted_metrics,
1282 ctx
1283 )
1284 .await?;
1285
1286 if !consistent_result.rate_limits.is_empty() {
1288 self.inner
1289 .project_cache
1290 .get(managed_envelope.scoping().project_key)
1291 .rate_limits()
1292 .merge(consistent_result.rate_limits);
1293 }
1294
1295 Ok(consistent_result.event)
1296 } else { Ok(cached_result.event) })
1297 }
1298
1299 async fn process_errors(
1301 &self,
1302 managed_envelope: &mut TypedEnvelope<ErrorGroup>,
1303 project_id: ProjectId,
1304 mut ctx: processing::Context<'_>,
1305 ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
1306 let mut event_fully_normalized = EventFullyNormalized::new(managed_envelope.envelope());
1307 let mut metrics = Metrics::default();
1308 let mut extracted_metrics = ProcessingExtractedMetrics::new();
1309
1310 report::process_user_reports(managed_envelope);
1312
1313 if_processing!(self.inner.config, {
1314 unreal::expand(managed_envelope, &self.inner.config)?;
1315 #[cfg(sentry)]
1316 playstation::expand(managed_envelope, &self.inner.config, ctx.project_info)?;
1317 nnswitch::expand(managed_envelope)?;
1318 });
1319
1320 let extraction_result = event::extract(
1321 managed_envelope,
1322 &mut metrics,
1323 event_fully_normalized,
1324 &self.inner.config,
1325 )?;
1326 let mut event = extraction_result.event;
1327
1328 if_processing!(self.inner.config, {
1329 if let Some(inner_event_fully_normalized) =
1330 unreal::process(managed_envelope, &mut event)?
1331 {
1332 event_fully_normalized = inner_event_fully_normalized;
1333 }
1334 #[cfg(sentry)]
1335 if let Some(inner_event_fully_normalized) =
1336 playstation::process(managed_envelope, &mut event, ctx.project_info)?
1337 {
1338 event_fully_normalized = inner_event_fully_normalized;
1339 }
1340 if let Some(inner_event_fully_normalized) =
1341 attachment::create_placeholders(managed_envelope, &mut event, &mut metrics)
1342 {
1343 event_fully_normalized = inner_event_fully_normalized;
1344 }
1345 });
1346
1347 ctx.sampling_project_info = dynamic_sampling::validate_and_set_dsc(
1348 managed_envelope,
1349 &mut event,
1350 ctx.project_info,
1351 ctx.sampling_project_info,
1352 );
1353
1354 let attachments = managed_envelope
1355 .envelope()
1356 .items()
1357 .filter(|item| item.attachment_type() == Some(&AttachmentType::Attachment));
1358 processing::utils::event::finalize(
1359 managed_envelope.envelope().headers(),
1360 &mut event,
1361 attachments,
1362 &mut metrics,
1363 ctx.config,
1364 )?;
1365 event_fully_normalized = processing::utils::event::normalize(
1366 managed_envelope.envelope().headers(),
1367 &mut event,
1368 event_fully_normalized,
1369 project_id,
1370 ctx,
1371 &self.inner.geoip_lookup,
1372 )?;
1373 let filter_run =
1374 processing::utils::event::filter(managed_envelope.envelope().headers(), &event, &ctx)
1375 .map_err(|err| {
1376 managed_envelope.reject(Outcome::Filtered(err.clone()));
1377 ProcessingError::EventFiltered(err)
1378 })?;
1379
1380 if self.inner.config.processing_enabled() || matches!(filter_run, FiltersStatus::Ok) {
1381 dynamic_sampling::tag_error_with_sampling_decision(
1382 managed_envelope,
1383 &mut event,
1384 ctx.sampling_project_info,
1385 &self.inner.config,
1386 )
1387 .await;
1388 }
1389
1390 event = self
1391 .enforce_quotas(managed_envelope, event, &mut extracted_metrics, ctx)
1392 .await?;
1393
1394 if event.value().is_some() {
1395 processing::utils::event::scrub(&mut event, ctx.project_info)?;
1396 event::serialize(
1397 managed_envelope,
1398 &mut event,
1399 event_fully_normalized,
1400 EventMetricsExtracted(false),
1401 SpansExtracted(false),
1402 )?;
1403 event::emit_feedback_metrics(managed_envelope.envelope());
1404 }
1405
1406 let attachments = managed_envelope
1407 .envelope_mut()
1408 .items_mut()
1409 .filter(|i| i.ty() == &ItemType::Attachment);
1410 processing::utils::attachments::scrub(attachments, ctx.project_info);
1411
1412 if self.inner.config.processing_enabled() && !event_fully_normalized.0 {
1413 relay_log::error!(
1414 tags.project = %project_id,
1415 tags.ty = event_type(&event).map(|e| e.to_string()).unwrap_or("none".to_owned()),
1416 "ingested event without normalizing"
1417 );
1418 }
1419
1420 Ok(Some(extracted_metrics))
1421 }
1422
1423 #[allow(unused_assignments)]
1425 async fn process_transactions(
1426 &self,
1427 managed_envelope: &mut TypedEnvelope<TransactionGroup>,
1428 cogs: &mut Token,
1429 project_id: ProjectId,
1430 mut ctx: processing::Context<'_>,
1431 ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
1432 let mut event_fully_normalized = EventFullyNormalized::new(managed_envelope.envelope());
1433 let mut event_metrics_extracted = EventMetricsExtracted(false);
1434 let mut spans_extracted = SpansExtracted(false);
1435 let mut metrics = Metrics::default();
1436 let mut extracted_metrics = ProcessingExtractedMetrics::new();
1437
1438 let extraction_result = event::extract(
1440 managed_envelope,
1441 &mut metrics,
1442 event_fully_normalized,
1443 &self.inner.config,
1444 )?;
1445
1446 if let Some(inner_event_metrics_extracted) = extraction_result.event_metrics_extracted {
1448 event_metrics_extracted = inner_event_metrics_extracted;
1449 }
1450 if let Some(inner_spans_extracted) = extraction_result.spans_extracted {
1451 spans_extracted = inner_spans_extracted;
1452 };
1453
1454 let mut event = extraction_result.event;
1456
1457 let profile_id = profile::filter(
1458 managed_envelope,
1459 &event,
1460 ctx.config,
1461 project_id,
1462 ctx.project_info,
1463 );
1464 processing::transactions::profile::transfer_id(&mut event, profile_id);
1465 processing::transactions::profile::remove_context_if_rate_limited(
1466 &mut event,
1467 managed_envelope.scoping(),
1468 ctx,
1469 );
1470
1471 ctx.sampling_project_info = dynamic_sampling::validate_and_set_dsc(
1472 managed_envelope,
1473 &mut event,
1474 ctx.project_info,
1475 ctx.sampling_project_info,
1476 );
1477
1478 let attachments = managed_envelope
1479 .envelope()
1480 .items()
1481 .filter(|item| item.attachment_type() == Some(&AttachmentType::Attachment));
1482 processing::utils::event::finalize(
1483 managed_envelope.envelope().headers(),
1484 &mut event,
1485 attachments,
1486 &mut metrics,
1487 &self.inner.config,
1488 )?;
1489
1490 event_fully_normalized = processing::utils::event::normalize(
1491 managed_envelope.envelope().headers(),
1492 &mut event,
1493 event_fully_normalized,
1494 project_id,
1495 ctx,
1496 &self.inner.geoip_lookup,
1497 )?;
1498
1499 let filter_run =
1500 processing::utils::event::filter(managed_envelope.envelope().headers(), &event, &ctx)
1501 .map_err(|err| {
1502 managed_envelope.reject(Outcome::Filtered(err.clone()));
1503 ProcessingError::EventFiltered(err)
1504 })?;
1505
1506 let run_dynamic_sampling = (matches!(filter_run, FiltersStatus::Ok)
1510 || self.inner.config.processing_enabled())
1511 && matches!(&ctx.project_info.config.transaction_metrics, Some(ErrorBoundary::Ok(c)) if c.is_enabled());
1512
1513 let sampling_result = match run_dynamic_sampling {
1514 true => {
1515 #[allow(unused_mut)]
1516 let mut reservoir = ReservoirEvaluator::new(Arc::clone(ctx.reservoir_counters));
1517 #[cfg(feature = "processing")]
1518 if let Some(quotas_client) = self.inner.quotas_client.as_ref() {
1519 reservoir.set_redis(managed_envelope.scoping().organization_id, quotas_client);
1520 }
1521 processing::utils::dynamic_sampling::run(
1522 managed_envelope.envelope().headers().dsc(),
1523 &event,
1524 &ctx,
1525 Some(&reservoir),
1526 )
1527 .await
1528 }
1529 false => SamplingResult::Pending,
1530 };
1531
1532 relay_statsd::metric!(
1533 counter(RelayCounters::SamplingDecision) += 1,
1534 decision = sampling_result.decision().as_str(),
1535 item = "transaction"
1536 );
1537
1538 #[cfg(feature = "processing")]
1539 let server_sample_rate = sampling_result.sample_rate();
1540
1541 if let Some(outcome) = sampling_result.into_dropped_outcome() {
1542 profile::process(
1545 managed_envelope,
1546 &mut event,
1547 ctx.global_config,
1548 ctx.config,
1549 ctx.project_info,
1550 );
1551 event_metrics_extracted = processing::transactions::extraction::extract_metrics(
1553 &mut event,
1554 &mut extracted_metrics,
1555 ExtractMetricsContext {
1556 dsc: managed_envelope.envelope().dsc(),
1557 project_id,
1558 ctx,
1559 sampling_decision: SamplingDecision::Drop,
1560 metrics_extracted: event_metrics_extracted.0,
1561 spans_extracted: spans_extracted.0,
1562 },
1563 )?;
1564
1565 dynamic_sampling::drop_unsampled_items(
1566 managed_envelope,
1567 event,
1568 outcome,
1569 spans_extracted,
1570 );
1571
1572 event = self
1577 .enforce_quotas(
1578 managed_envelope,
1579 Annotated::empty(),
1580 &mut extracted_metrics,
1581 ctx,
1582 )
1583 .await?;
1584
1585 return Ok(Some(extracted_metrics));
1586 }
1587
1588 let _post_ds = cogs.start_category("post_ds");
1589
1590 processing::utils::event::scrub(&mut event, ctx.project_info)?;
1594
1595 let attachments = managed_envelope
1596 .envelope_mut()
1597 .items_mut()
1598 .filter(|i| i.ty() == &ItemType::Attachment);
1599 processing::utils::attachments::scrub(attachments, ctx.project_info);
1600
1601 if_processing!(self.inner.config, {
1602 let profile_id = profile::process(
1604 managed_envelope,
1605 &mut event,
1606 ctx.global_config,
1607 ctx.config,
1608 ctx.project_info,
1609 );
1610 processing::transactions::profile::transfer_id(&mut event, profile_id);
1611 processing::transactions::profile::scrub_profiler_id(&mut event);
1612
1613 event_metrics_extracted = processing::transactions::extraction::extract_metrics(
1615 &mut event,
1616 &mut extracted_metrics,
1617 ExtractMetricsContext {
1618 dsc: managed_envelope.envelope().dsc(),
1619 project_id,
1620 ctx,
1621 sampling_decision: SamplingDecision::Keep,
1622 metrics_extracted: event_metrics_extracted.0,
1623 spans_extracted: spans_extracted.0,
1624 },
1625 )?;
1626
1627 if let Some(spans) = processing::transactions::spans::extract_from_event(
1628 managed_envelope.envelope().dsc(),
1629 &event,
1630 ctx.global_config,
1631 ctx.config,
1632 server_sample_rate,
1633 event_metrics_extracted,
1634 spans_extracted,
1635 ) {
1636 spans_extracted = SpansExtracted(true);
1637 for item in spans {
1638 match item {
1639 Ok(item) => managed_envelope.envelope_mut().add_item(item),
1640 Err(()) => managed_envelope.track_outcome(
1641 Outcome::Invalid(DiscardReason::InvalidSpan),
1642 DataCategory::SpanIndexed,
1643 1,
1644 ),
1645 }
1647 }
1648 }
1649 });
1650
1651 event = self
1652 .enforce_quotas(managed_envelope, event, &mut extracted_metrics, ctx)
1653 .await?;
1654
1655 if event.value().is_some() {
1657 event::serialize(
1658 managed_envelope,
1659 &mut event,
1660 event_fully_normalized,
1661 event_metrics_extracted,
1662 spans_extracted,
1663 )?;
1664 }
1665
1666 if self.inner.config.processing_enabled() && !event_fully_normalized.0 {
1667 relay_log::error!(
1668 tags.project = %project_id,
1669 tags.ty = event_type(&event).map(|e| e.to_string()).unwrap_or("none".to_owned()),
1670 "ingested event without normalizing"
1671 );
1672 };
1673
1674 Ok(Some(extracted_metrics))
1675 }
1676
1677 async fn process_profile_chunks(
1678 &self,
1679 managed_envelope: &mut TypedEnvelope<ProfileChunkGroup>,
1680 ctx: processing::Context<'_>,
1681 ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
1682 profile_chunk::filter(managed_envelope, ctx.project_info);
1683
1684 if_processing!(self.inner.config, {
1685 profile_chunk::process(
1686 managed_envelope,
1687 ctx.project_info,
1688 ctx.global_config,
1689 ctx.config,
1690 );
1691 });
1692
1693 self.enforce_quotas(
1694 managed_envelope,
1695 Annotated::empty(),
1696 &mut ProcessingExtractedMetrics::new(),
1697 ctx,
1698 )
1699 .await?;
1700
1701 Ok(None)
1702 }
1703
1704 async fn process_standalone(
1706 &self,
1707 managed_envelope: &mut TypedEnvelope<StandaloneGroup>,
1708 project_id: ProjectId,
1709 ctx: processing::Context<'_>,
1710 ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
1711 let mut extracted_metrics = ProcessingExtractedMetrics::new();
1712
1713 standalone::process(managed_envelope);
1714
1715 profile::filter(
1716 managed_envelope,
1717 &Annotated::empty(),
1718 ctx.config,
1719 project_id,
1720 ctx.project_info,
1721 );
1722
1723 self.enforce_quotas(
1724 managed_envelope,
1725 Annotated::empty(),
1726 &mut extracted_metrics,
1727 ctx,
1728 )
1729 .await?;
1730
1731 report::process_user_reports(managed_envelope);
1732 let attachments = managed_envelope
1733 .envelope_mut()
1734 .items_mut()
1735 .filter(|i| i.ty() == &ItemType::Attachment);
1736 processing::utils::attachments::scrub(attachments, ctx.project_info);
1737
1738 Ok(Some(extracted_metrics))
1739 }
1740
1741 async fn process_client_reports(
1743 &self,
1744 managed_envelope: &mut TypedEnvelope<ClientReportGroup>,
1745 ctx: processing::Context<'_>,
1746 ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
1747 let mut extracted_metrics = ProcessingExtractedMetrics::new();
1748
1749 self.enforce_quotas(
1750 managed_envelope,
1751 Annotated::empty(),
1752 &mut extracted_metrics,
1753 ctx,
1754 )
1755 .await?;
1756
1757 report::process_client_reports(
1758 managed_envelope,
1759 ctx.config,
1760 ctx.project_info,
1761 self.inner.addrs.outcome_aggregator.clone(),
1762 );
1763
1764 Ok(Some(extracted_metrics))
1765 }
1766
1767 async fn process_replays(
1769 &self,
1770 managed_envelope: &mut TypedEnvelope<ReplayGroup>,
1771 ctx: processing::Context<'_>,
1772 ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
1773 let mut extracted_metrics = ProcessingExtractedMetrics::new();
1774
1775 replay::process(
1776 managed_envelope,
1777 ctx.global_config,
1778 ctx.config,
1779 ctx.project_info,
1780 &self.inner.geoip_lookup,
1781 )?;
1782
1783 self.enforce_quotas(
1784 managed_envelope,
1785 Annotated::empty(),
1786 &mut extracted_metrics,
1787 ctx,
1788 )
1789 .await?;
1790
1791 Ok(Some(extracted_metrics))
1792 }
1793
1794 async fn process_nel(
1795 &self,
1796 mut managed_envelope: ManagedEnvelope,
1797 ctx: processing::Context<'_>,
1798 ) -> Result<ProcessingResult, ProcessingError> {
1799 nel::convert_to_logs(&mut managed_envelope);
1800 self.process_with_processor(&self.inner.processing.logs, managed_envelope, ctx)
1801 .await
1802 }
1803
1804 async fn process_with_processor<P: processing::Processor>(
1805 &self,
1806 processor: &P,
1807 mut managed_envelope: ManagedEnvelope,
1808 ctx: processing::Context<'_>,
1809 ) -> Result<ProcessingResult, ProcessingError>
1810 where
1811 Outputs: From<P::Output>,
1812 {
1813 let Some(work) = processor.prepare_envelope(&mut managed_envelope) else {
1814 debug_assert!(
1815 false,
1816 "there must be work for the {} processor",
1817 std::any::type_name::<P>(),
1818 );
1819 return Err(ProcessingError::ProcessingGroupMismatch);
1820 };
1821
1822 managed_envelope.update();
1823 match managed_envelope.envelope().is_empty() {
1824 true => managed_envelope.accept(),
1825 false => managed_envelope.reject(Outcome::Invalid(DiscardReason::Internal)),
1826 }
1827
1828 processor
1829 .process(work, ctx)
1830 .await
1831 .map_err(|err| {
1832 relay_log::debug!(
1833 error = &err as &dyn std::error::Error,
1834 "processing pipeline failed"
1835 );
1836 ProcessingError::ProcessingFailure
1837 })
1838 .map(|o| o.map(Into::into))
1839 .map(ProcessingResult::Output)
1840 }
1841
1842 async fn process_standalone_spans(
1846 &self,
1847 managed_envelope: &mut TypedEnvelope<SpanGroup>,
1848 _project_id: ProjectId,
1849 ctx: processing::Context<'_>,
1850 ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
1851 let mut extracted_metrics = ProcessingExtractedMetrics::new();
1852
1853 span::filter(managed_envelope, ctx.config, ctx.project_info);
1854 span::convert_otel_traces_data(managed_envelope);
1855
1856 if_processing!(self.inner.config, {
1857 span::process(
1858 managed_envelope,
1859 &mut Annotated::empty(),
1860 &mut extracted_metrics,
1861 _project_id,
1862 ctx,
1863 &self.inner.geoip_lookup,
1864 )
1865 .await;
1866 });
1867
1868 self.enforce_quotas(
1869 managed_envelope,
1870 Annotated::empty(),
1871 &mut extracted_metrics,
1872 ctx,
1873 )
1874 .await?;
1875
1876 Ok(Some(extracted_metrics))
1877 }
1878
1879 async fn process_envelope(
1880 &self,
1881 cogs: &mut Token,
1882 project_id: ProjectId,
1883 message: ProcessEnvelopeGrouped<'_>,
1884 ) -> Result<ProcessingResult, ProcessingError> {
1885 let ProcessEnvelopeGrouped {
1886 group,
1887 envelope: mut managed_envelope,
1888 ctx,
1889 } = message;
1890
1891 if let Some(sampling_state) = ctx.sampling_project_info {
1893 managed_envelope
1896 .envelope_mut()
1897 .parametrize_dsc_transaction(&sampling_state.config.tx_name_rules);
1898 }
1899
1900 if let Some(retention) = ctx.project_info.config.event_retention {
1903 managed_envelope.envelope_mut().set_retention(retention);
1904 }
1905
1906 if let Some(retention) = ctx.project_info.config.downsampled_event_retention {
1909 managed_envelope
1910 .envelope_mut()
1911 .set_downsampled_retention(retention);
1912 }
1913
1914 managed_envelope
1919 .envelope_mut()
1920 .meta_mut()
1921 .set_project_id(project_id);
1922
1923 macro_rules! run {
1924 ($fn_name:ident $(, $args:expr)*) => {
1925 async {
1926 let mut managed_envelope = (managed_envelope, group).try_into()?;
1927 match self.$fn_name(&mut managed_envelope, $($args),*).await {
1928 Ok(extracted_metrics) => Ok(ProcessingResult::Envelope {
1929 managed_envelope: managed_envelope.into_processed(),
1930 extracted_metrics: extracted_metrics.map_or(ProcessingExtractedMetrics::new(), |e| e)
1931 }),
1932 Err(error) => {
1933 relay_log::trace!("Executing {fn} failed: {error}", fn = stringify!($fn_name), error = error);
1934 if let Some(outcome) = error.to_outcome() {
1935 managed_envelope.reject(outcome);
1936 }
1937
1938 return Err(error);
1939 }
1940 }
1941 }.await
1942 };
1943 }
1944
1945 relay_log::trace!("Processing {group} group", group = group.variant());
1946
1947 match group {
1948 ProcessingGroup::Error => run!(process_errors, project_id, ctx),
1949 ProcessingGroup::Transaction => {
1950 run!(process_transactions, cogs, project_id, ctx)
1951 }
1952 ProcessingGroup::Session => {
1953 self.process_with_processor(&self.inner.processing.sessions, managed_envelope, ctx)
1954 .await
1955 }
1956 ProcessingGroup::Standalone => run!(process_standalone, project_id, ctx),
1957 ProcessingGroup::ClientReport => run!(process_client_reports, ctx),
1958 ProcessingGroup::Replay => {
1959 run!(process_replays, ctx)
1960 }
1961 ProcessingGroup::CheckIn => {
1962 self.process_with_processor(&self.inner.processing.check_ins, managed_envelope, ctx)
1963 .await
1964 }
1965 ProcessingGroup::Nel => self.process_nel(managed_envelope, ctx).await,
1966 ProcessingGroup::Log => {
1967 self.process_with_processor(&self.inner.processing.logs, managed_envelope, ctx)
1968 .await
1969 }
1970 ProcessingGroup::TraceMetric => {
1971 self.process_with_processor(
1972 &self.inner.processing.trace_metrics,
1973 managed_envelope,
1974 ctx,
1975 )
1976 .await
1977 }
1978 ProcessingGroup::SpanV2 => {
1979 self.process_with_processor(&self.inner.processing.spans, managed_envelope, ctx)
1980 .await
1981 }
1982 ProcessingGroup::Span => run!(process_standalone_spans, project_id, ctx),
1983 ProcessingGroup::ProfileChunk => {
1984 run!(process_profile_chunks, ctx)
1985 }
1986 ProcessingGroup::Metrics => {
1988 if self.inner.config.relay_mode() != RelayMode::Proxy {
1991 relay_log::error!(
1992 tags.project = %project_id,
1993 items = ?managed_envelope.envelope().items().next().map(Item::ty),
1994 "received metrics in the process_state"
1995 );
1996 }
1997
1998 Ok(ProcessingResult::no_metrics(
1999 managed_envelope.into_processed(),
2000 ))
2001 }
2002 ProcessingGroup::Ungrouped => {
2004 relay_log::error!(
2005 tags.project = %project_id,
2006 items = ?managed_envelope.envelope().items().next().map(Item::ty),
2007 "could not identify the processing group based on the envelope's items"
2008 );
2009
2010 Ok(ProcessingResult::no_metrics(
2011 managed_envelope.into_processed(),
2012 ))
2013 }
2014 ProcessingGroup::ForwardUnknown => Ok(ProcessingResult::no_metrics(
2018 managed_envelope.into_processed(),
2019 )),
2020 }
2021 }
2022
2023 async fn process<'a>(
2029 &self,
2030 cogs: &mut Token,
2031 mut message: ProcessEnvelopeGrouped<'a>,
2032 ) -> Result<Option<Submit<'a>>, ProcessingError> {
2033 let ProcessEnvelopeGrouped {
2034 ref mut envelope,
2035 ctx,
2036 ..
2037 } = message;
2038
2039 let Some(project_id) = ctx
2046 .project_info
2047 .project_id
2048 .or_else(|| envelope.envelope().meta().project_id())
2049 else {
2050 envelope.reject(Outcome::Invalid(DiscardReason::Internal));
2051 return Err(ProcessingError::MissingProjectId);
2052 };
2053
2054 let client = envelope.envelope().meta().client().map(str::to_owned);
2055 let user_agent = envelope.envelope().meta().user_agent().map(str::to_owned);
2056 let project_key = envelope.envelope().meta().public_key();
2057 let sampling_key = envelope
2061 .envelope()
2062 .sampling_key()
2063 .filter(|_| ctx.sampling_project_info.is_some());
2064
2065 relay_log::configure_scope(|scope| {
2068 scope.set_tag("project", project_id);
2069 if let Some(client) = client {
2070 scope.set_tag("sdk", client);
2071 }
2072 if let Some(user_agent) = user_agent {
2073 scope.set_extra("user_agent", user_agent.into());
2074 }
2075 });
2076
2077 let result = match self.process_envelope(cogs, project_id, message).await {
2078 Ok(ProcessingResult::Envelope {
2079 mut managed_envelope,
2080 extracted_metrics,
2081 }) => {
2082 managed_envelope.update();
2085
2086 let has_metrics = !extracted_metrics.metrics.project_metrics.is_empty();
2087 send_metrics(
2088 extracted_metrics.metrics,
2089 project_key,
2090 sampling_key,
2091 &self.inner.addrs.aggregator,
2092 );
2093
2094 let envelope_response = if managed_envelope.envelope().is_empty() {
2095 if !has_metrics {
2096 managed_envelope.reject(Outcome::RateLimited(None));
2098 } else {
2099 managed_envelope.accept();
2100 }
2101
2102 None
2103 } else {
2104 Some(managed_envelope)
2105 };
2106
2107 Ok(envelope_response.map(Submit::Envelope))
2108 }
2109 Ok(ProcessingResult::Output(Output { main, metrics })) => {
2110 if let Some(metrics) = metrics {
2111 metrics.accept(|metrics| {
2112 send_metrics(
2113 metrics,
2114 project_key,
2115 sampling_key,
2116 &self.inner.addrs.aggregator,
2117 );
2118 });
2119 }
2120
2121 let ctx = ctx.to_forward();
2122 Ok(main.map(|output| Submit::Output { output, ctx }))
2123 }
2124 Err(err) => Err(err),
2125 };
2126
2127 relay_log::configure_scope(|scope| {
2128 scope.remove_tag("project");
2129 scope.remove_tag("sdk");
2130 scope.remove_tag("user_agent");
2131 });
2132
2133 result
2134 }
2135
2136 async fn handle_process_envelope(&self, cogs: &mut Token, message: ProcessEnvelope) {
2137 let project_key = message.envelope.envelope().meta().public_key();
2138 let wait_time = message.envelope.age();
2139 metric!(timer(RelayTimers::EnvelopeWaitTime) = wait_time);
2140
2141 cogs.cancel();
2144
2145 let scoping = message.envelope.scoping();
2146 for (group, envelope) in ProcessingGroup::split_envelope(
2147 *message.envelope.into_envelope(),
2148 &message.project_info,
2149 ) {
2150 let mut cogs = self
2151 .inner
2152 .cogs
2153 .timed(ResourceId::Relay, AppFeature::from(group));
2154
2155 let mut envelope =
2156 ManagedEnvelope::new(envelope, self.inner.addrs.outcome_aggregator.clone());
2157 envelope.scope(scoping);
2158
2159 let global_config = self.inner.global_config.current();
2160
2161 let ctx = processing::Context {
2162 config: &self.inner.config,
2163 global_config: &global_config,
2164 project_info: &message.project_info,
2165 sampling_project_info: message.sampling_project_info.as_deref(),
2166 rate_limits: &message.rate_limits,
2167 reservoir_counters: &message.reservoir_counters,
2168 };
2169
2170 let message = ProcessEnvelopeGrouped {
2171 group,
2172 envelope,
2173 ctx,
2174 };
2175
2176 let result = metric!(
2177 timer(RelayTimers::EnvelopeProcessingTime),
2178 group = group.variant(),
2179 { self.process(&mut cogs, message).await }
2180 );
2181
2182 match result {
2183 Ok(Some(envelope)) => self.submit_upstream(&mut cogs, envelope),
2184 Ok(None) => {}
2185 Err(error) if error.is_unexpected() => {
2186 relay_log::error!(
2187 tags.project_key = %project_key,
2188 error = &error as &dyn Error,
2189 "error processing envelope"
2190 )
2191 }
2192 Err(error) => {
2193 relay_log::debug!(
2194 tags.project_key = %project_key,
2195 error = &error as &dyn Error,
2196 "error processing envelope"
2197 )
2198 }
2199 }
2200 }
2201 }
2202
2203 fn handle_process_metrics(&self, cogs: &mut Token, message: ProcessMetrics) {
2204 let ProcessMetrics {
2205 data,
2206 project_key,
2207 received_at,
2208 sent_at,
2209 source,
2210 } = message;
2211
2212 let received_timestamp =
2213 UnixTimestamp::from_datetime(received_at).unwrap_or(UnixTimestamp::now());
2214
2215 let mut buckets = data.into_buckets(received_timestamp);
2216 if buckets.is_empty() {
2217 return;
2218 };
2219 cogs.update(relay_metrics::cogs::BySize(&buckets));
2220
2221 let clock_drift_processor =
2222 ClockDriftProcessor::new(sent_at, received_at).at_least(MINIMUM_CLOCK_DRIFT);
2223
2224 buckets.retain_mut(|bucket| {
2225 if let Err(error) = relay_metrics::normalize_bucket(bucket) {
2226 relay_log::debug!(error = &error as &dyn Error, "dropping bucket {bucket:?}");
2227 return false;
2228 }
2229
2230 if !self::metrics::is_valid_namespace(bucket) {
2231 return false;
2232 }
2233
2234 clock_drift_processor.process_timestamp(&mut bucket.timestamp);
2235
2236 if !matches!(source, BucketSource::Internal) {
2237 bucket.metadata = BucketMetadata::new(received_timestamp);
2238 }
2239
2240 true
2241 });
2242
2243 let project = self.inner.project_cache.get(project_key);
2244
2245 let buckets = match project.state() {
2248 ProjectState::Enabled(project_info) => {
2249 let rate_limits = project.rate_limits().current_limits();
2250 self.check_buckets(project_key, project_info, &rate_limits, buckets)
2251 }
2252 _ => buckets,
2253 };
2254
2255 relay_log::trace!("merging metric buckets into the aggregator");
2256 self.inner
2257 .addrs
2258 .aggregator
2259 .send(MergeBuckets::new(project_key, buckets));
2260 }
2261
2262 fn handle_process_batched_metrics(&self, cogs: &mut Token, message: ProcessBatchedMetrics) {
2263 let ProcessBatchedMetrics {
2264 payload,
2265 source,
2266 received_at,
2267 sent_at,
2268 } = message;
2269
2270 #[derive(serde::Deserialize)]
2271 struct Wrapper {
2272 buckets: HashMap<ProjectKey, Vec<Bucket>>,
2273 }
2274
2275 let buckets = match serde_json::from_slice(&payload) {
2276 Ok(Wrapper { buckets }) => buckets,
2277 Err(error) => {
2278 relay_log::debug!(
2279 error = &error as &dyn Error,
2280 "failed to parse batched metrics",
2281 );
2282 metric!(counter(RelayCounters::MetricBucketsParsingFailed) += 1);
2283 return;
2284 }
2285 };
2286
2287 for (project_key, buckets) in buckets {
2288 self.handle_process_metrics(
2289 cogs,
2290 ProcessMetrics {
2291 data: MetricData::Parsed(buckets),
2292 project_key,
2293 source,
2294 received_at,
2295 sent_at,
2296 },
2297 )
2298 }
2299 }
2300
2301 fn submit_upstream(&self, cogs: &mut Token, submit: Submit<'_>) {
2302 let _submit = cogs.start_category("submit");
2303
2304 #[cfg(feature = "processing")]
2305 if self.inner.config.processing_enabled()
2306 && let Some(store_forwarder) = &self.inner.addrs.store_forwarder
2307 {
2308 use crate::processing::StoreHandle;
2309
2310 let upload = self.inner.addrs.upload.as_ref();
2311 match submit {
2312 Submit::Envelope(envelope) => {
2313 let envelope_has_attachments = envelope
2314 .envelope()
2315 .items()
2316 .any(|item| *item.ty() == ItemType::Attachment);
2317 let use_objectstore = || {
2319 let options = &self.inner.global_config.current().options;
2320 utils::sample(options.objectstore_attachments_sample_rate).is_keep()
2321 };
2322
2323 if let Some(upload) = &self.inner.addrs.upload
2324 && envelope_has_attachments
2325 && use_objectstore()
2326 {
2327 upload.send(StoreEnvelope { envelope })
2329 } else {
2330 store_forwarder.send(StoreEnvelope { envelope })
2331 }
2332 }
2333 Submit::Output { output, ctx } => output
2334 .forward_store(StoreHandle::new(store_forwarder, upload), ctx)
2335 .unwrap_or_else(|err| err.into_inner()),
2336 }
2337 return;
2338 }
2339
2340 let mut envelope = match submit {
2341 Submit::Envelope(envelope) => envelope,
2342 Submit::Output { output, ctx } => match output.serialize_envelope(ctx) {
2343 Ok(envelope) => ManagedEnvelope::from(envelope).into_processed(),
2344 Err(_) => {
2345 relay_log::error!("failed to serialize output to an envelope");
2346 return;
2347 }
2348 },
2349 };
2350
2351 if envelope.envelope_mut().is_empty() {
2352 envelope.accept();
2353 return;
2354 }
2355
2356 envelope.envelope_mut().set_sent_at(Utc::now());
2362
2363 relay_log::trace!("sending envelope to sentry endpoint");
2364 let http_encoding = self.inner.config.http_encoding();
2365 let result = envelope.envelope().to_vec().and_then(|v| {
2366 encode_payload(&v.into(), http_encoding).map_err(EnvelopeError::PayloadIoFailed)
2367 });
2368
2369 match result {
2370 Ok(body) => {
2371 self.inner
2372 .addrs
2373 .upstream_relay
2374 .send(SendRequest(SendEnvelope {
2375 envelope,
2376 body,
2377 http_encoding,
2378 project_cache: self.inner.project_cache.clone(),
2379 }));
2380 }
2381 Err(error) => {
2382 relay_log::error!(
2385 error = &error as &dyn Error,
2386 tags.project_key = %envelope.scoping().project_key,
2387 "failed to serialize envelope payload"
2388 );
2389
2390 envelope.reject(Outcome::Invalid(DiscardReason::Internal));
2391 }
2392 }
2393 }
2394
2395 fn handle_submit_client_reports(&self, cogs: &mut Token, message: SubmitClientReports) {
2396 let SubmitClientReports {
2397 client_reports,
2398 scoping,
2399 } = message;
2400
2401 let upstream = self.inner.config.upstream_descriptor();
2402 let dsn = PartialDsn::outbound(&scoping, upstream);
2403
2404 let mut envelope = Envelope::from_request(None, RequestMeta::outbound(dsn));
2405 for client_report in client_reports {
2406 match client_report.serialize() {
2407 Ok(payload) => {
2408 let mut item = Item::new(ItemType::ClientReport);
2409 item.set_payload(ContentType::Json, payload);
2410 envelope.add_item(item);
2411 }
2412 Err(error) => {
2413 relay_log::error!(
2414 error = &error as &dyn std::error::Error,
2415 "failed to serialize client report"
2416 );
2417 }
2418 }
2419 }
2420
2421 let envelope = ManagedEnvelope::new(envelope, self.inner.addrs.outcome_aggregator.clone());
2422 self.submit_upstream(cogs, Submit::Envelope(envelope.into_processed()));
2423 }
2424
2425 fn check_buckets(
2426 &self,
2427 project_key: ProjectKey,
2428 project_info: &ProjectInfo,
2429 rate_limits: &RateLimits,
2430 buckets: Vec<Bucket>,
2431 ) -> Vec<Bucket> {
2432 let Some(scoping) = project_info.scoping(project_key) else {
2433 relay_log::error!(
2434 tags.project_key = project_key.as_str(),
2435 "there is no scoping: dropping {} buckets",
2436 buckets.len(),
2437 );
2438 return Vec::new();
2439 };
2440
2441 let mut buckets = self::metrics::apply_project_info(
2442 buckets,
2443 &self.inner.metric_outcomes,
2444 project_info,
2445 scoping,
2446 );
2447
2448 let namespaces: BTreeSet<MetricNamespace> = buckets
2449 .iter()
2450 .filter_map(|bucket| bucket.name.try_namespace())
2451 .collect();
2452
2453 for namespace in namespaces {
2454 let limits = rate_limits.check_with_quotas(
2455 project_info.get_quotas(),
2456 scoping.item(DataCategory::MetricBucket),
2457 );
2458
2459 if limits.is_limited() {
2460 let rejected;
2461 (buckets, rejected) = utils::split_off(buckets, |bucket| {
2462 bucket.name.try_namespace() == Some(namespace)
2463 });
2464
2465 let reason_code = limits.longest().and_then(|limit| limit.reason_code.clone());
2466 self.inner.metric_outcomes.track(
2467 scoping,
2468 &rejected,
2469 Outcome::RateLimited(reason_code),
2470 );
2471 }
2472 }
2473
2474 let quotas = project_info.config.quotas.clone();
2475 match MetricsLimiter::create(buckets, quotas, scoping) {
2476 Ok(mut bucket_limiter) => {
2477 bucket_limiter.enforce_limits(rate_limits, &self.inner.metric_outcomes);
2478 bucket_limiter.into_buckets()
2479 }
2480 Err(buckets) => buckets,
2481 }
2482 }
2483
2484 #[cfg(feature = "processing")]
2485 async fn rate_limit_buckets(
2486 &self,
2487 scoping: Scoping,
2488 project_info: &ProjectInfo,
2489 mut buckets: Vec<Bucket>,
2490 ) -> Vec<Bucket> {
2491 let Some(rate_limiter) = &self.inner.rate_limiter else {
2492 return buckets;
2493 };
2494
2495 let global_config = self.inner.global_config.current();
2496 let namespaces = buckets
2497 .iter()
2498 .filter_map(|bucket| bucket.name.try_namespace())
2499 .counts();
2500
2501 let quotas = CombinedQuotas::new(&global_config, project_info.get_quotas());
2502
2503 for (namespace, quantity) in namespaces {
2504 let item_scoping = scoping.metric_bucket(namespace);
2505
2506 let limits = match rate_limiter
2507 .is_rate_limited(quotas, item_scoping, quantity, false)
2508 .await
2509 {
2510 Ok(limits) => limits,
2511 Err(err) => {
2512 relay_log::error!(
2513 error = &err as &dyn std::error::Error,
2514 "failed to check redis rate limits"
2515 );
2516 break;
2517 }
2518 };
2519
2520 if limits.is_limited() {
2521 let rejected;
2522 (buckets, rejected) = utils::split_off(buckets, |bucket| {
2523 bucket.name.try_namespace() == Some(namespace)
2524 });
2525
2526 let reason_code = limits.longest().and_then(|limit| limit.reason_code.clone());
2527 self.inner.metric_outcomes.track(
2528 scoping,
2529 &rejected,
2530 Outcome::RateLimited(reason_code),
2531 );
2532
2533 self.inner
2534 .project_cache
2535 .get(item_scoping.scoping.project_key)
2536 .rate_limits()
2537 .merge(limits);
2538 }
2539 }
2540
2541 match MetricsLimiter::create(buckets, project_info.config.quotas.clone(), scoping) {
2542 Err(buckets) => buckets,
2543 Ok(bucket_limiter) => self.apply_other_rate_limits(bucket_limiter).await,
2544 }
2545 }
2546
2547 #[cfg(feature = "processing")]
2549 async fn apply_other_rate_limits(&self, mut bucket_limiter: MetricsLimiter) -> Vec<Bucket> {
2550 relay_log::trace!("handle_rate_limit_buckets");
2551
2552 let scoping = *bucket_limiter.scoping();
2553
2554 if let Some(rate_limiter) = self.inner.rate_limiter.as_ref() {
2555 let global_config = self.inner.global_config.current();
2556 let quotas = CombinedQuotas::new(&global_config, bucket_limiter.quotas());
2557
2558 let over_accept_once = true;
2561 let mut rate_limits = RateLimits::new();
2562
2563 for category in [DataCategory::Transaction, DataCategory::Span] {
2564 let count = bucket_limiter.count(category);
2565
2566 let timer = Instant::now();
2567 let mut is_limited = false;
2568
2569 if let Some(count) = count {
2570 match rate_limiter
2571 .is_rate_limited(quotas, scoping.item(category), count, over_accept_once)
2572 .await
2573 {
2574 Ok(limits) => {
2575 is_limited = limits.is_limited();
2576 rate_limits.merge(limits)
2577 }
2578 Err(e) => relay_log::error!(error = &e as &dyn Error),
2579 }
2580 }
2581
2582 relay_statsd::metric!(
2583 timer(RelayTimers::RateLimitBucketsDuration) = timer.elapsed(),
2584 category = category.name(),
2585 limited = if is_limited { "true" } else { "false" },
2586 count = match count {
2587 None => "none",
2588 Some(0) => "0",
2589 Some(1) => "1",
2590 Some(1..=10) => "10",
2591 Some(1..=25) => "25",
2592 Some(1..=50) => "50",
2593 Some(51..=100) => "100",
2594 Some(101..=500) => "500",
2595 _ => "> 500",
2596 },
2597 );
2598 }
2599
2600 if rate_limits.is_limited() {
2601 let was_enforced =
2602 bucket_limiter.enforce_limits(&rate_limits, &self.inner.metric_outcomes);
2603
2604 if was_enforced {
2605 self.inner
2607 .project_cache
2608 .get(scoping.project_key)
2609 .rate_limits()
2610 .merge(rate_limits);
2611 }
2612 }
2613 }
2614
2615 bucket_limiter.into_buckets()
2616 }
2617
2618 #[cfg(feature = "processing")]
2620 async fn cardinality_limit_buckets(
2621 &self,
2622 scoping: Scoping,
2623 limits: &[CardinalityLimit],
2624 buckets: Vec<Bucket>,
2625 ) -> Vec<Bucket> {
2626 let global_config = self.inner.global_config.current();
2627 let cardinality_limiter_mode = global_config.options.cardinality_limiter_mode;
2628
2629 if matches!(cardinality_limiter_mode, CardinalityLimiterMode::Disabled) {
2630 return buckets;
2631 }
2632
2633 let Some(ref limiter) = self.inner.cardinality_limiter else {
2634 return buckets;
2635 };
2636
2637 let scope = relay_cardinality::Scoping {
2638 organization_id: scoping.organization_id,
2639 project_id: scoping.project_id,
2640 };
2641
2642 let limits = match limiter
2643 .check_cardinality_limits(scope, limits, buckets)
2644 .await
2645 {
2646 Ok(limits) => limits,
2647 Err((buckets, error)) => {
2648 relay_log::error!(
2649 error = &error as &dyn std::error::Error,
2650 "cardinality limiter failed"
2651 );
2652 return buckets;
2653 }
2654 };
2655
2656 let error_sample_rate = global_config.options.cardinality_limiter_error_sample_rate;
2657 if !limits.exceeded_limits().is_empty() && utils::sample(error_sample_rate).is_keep() {
2658 for limit in limits.exceeded_limits() {
2659 relay_log::with_scope(
2660 |scope| {
2661 scope.set_user(Some(relay_log::sentry::User {
2663 id: Some(scoping.organization_id.to_string()),
2664 ..Default::default()
2665 }));
2666 },
2667 || {
2668 relay_log::error!(
2669 tags.organization_id = scoping.organization_id.value(),
2670 tags.limit_id = limit.id,
2671 tags.passive = limit.passive,
2672 "Cardinality Limit"
2673 );
2674 },
2675 );
2676 }
2677 }
2678
2679 for (limit, reports) in limits.cardinality_reports() {
2680 for report in reports {
2681 self.inner
2682 .metric_outcomes
2683 .cardinality(scoping, limit, report);
2684 }
2685 }
2686
2687 if matches!(cardinality_limiter_mode, CardinalityLimiterMode::Passive) {
2688 return limits.into_source();
2689 }
2690
2691 let CardinalityLimitsSplit { accepted, rejected } = limits.into_split();
2692
2693 for (bucket, exceeded) in rejected {
2694 self.inner.metric_outcomes.track(
2695 scoping,
2696 &[bucket],
2697 Outcome::CardinalityLimited(exceeded.id.clone()),
2698 );
2699 }
2700 accepted
2701 }
2702
2703 #[cfg(feature = "processing")]
2710 async fn encode_metrics_processing(
2711 &self,
2712 message: FlushBuckets,
2713 store_forwarder: &Addr<Store>,
2714 ) {
2715 use crate::constants::DEFAULT_EVENT_RETENTION;
2716 use crate::services::store::StoreMetrics;
2717
2718 for ProjectBuckets {
2719 buckets,
2720 scoping,
2721 project_info,
2722 ..
2723 } in message.buckets.into_values()
2724 {
2725 let buckets = self
2726 .rate_limit_buckets(scoping, &project_info, buckets)
2727 .await;
2728
2729 let limits = project_info.get_cardinality_limits();
2730 let buckets = self
2731 .cardinality_limit_buckets(scoping, limits, buckets)
2732 .await;
2733
2734 if buckets.is_empty() {
2735 continue;
2736 }
2737
2738 let retention = project_info
2739 .config
2740 .event_retention
2741 .unwrap_or(DEFAULT_EVENT_RETENTION);
2742
2743 store_forwarder.send(StoreMetrics {
2746 buckets,
2747 scoping,
2748 retention,
2749 });
2750 }
2751 }
2752
2753 fn encode_metrics_envelope(&self, cogs: &mut Token, message: FlushBuckets) {
2765 let FlushBuckets {
2766 partition_key,
2767 buckets,
2768 } = message;
2769
2770 let batch_size = self.inner.config.metrics_max_batch_size_bytes();
2771 let upstream = self.inner.config.upstream_descriptor();
2772
2773 for ProjectBuckets {
2774 buckets, scoping, ..
2775 } in buckets.values()
2776 {
2777 let dsn = PartialDsn::outbound(scoping, upstream);
2778
2779 relay_statsd::metric!(
2780 distribution(RelayDistributions::PartitionKeys) = u64::from(partition_key)
2781 );
2782
2783 let mut num_batches = 0;
2784 for batch in BucketsView::from(buckets).by_size(batch_size) {
2785 let mut envelope = Envelope::from_request(None, RequestMeta::outbound(dsn.clone()));
2786
2787 let mut item = Item::new(ItemType::MetricBuckets);
2788 item.set_source_quantities(crate::metrics::extract_quantities(batch));
2789 item.set_payload(ContentType::Json, serde_json::to_vec(&buckets).unwrap());
2790 envelope.add_item(item);
2791
2792 let mut envelope =
2793 ManagedEnvelope::new(envelope, self.inner.addrs.outcome_aggregator.clone());
2794 envelope
2795 .set_partition_key(Some(partition_key))
2796 .scope(*scoping);
2797
2798 relay_statsd::metric!(
2799 distribution(RelayDistributions::BucketsPerBatch) = batch.len() as u64
2800 );
2801
2802 self.submit_upstream(cogs, Submit::Envelope(envelope.into_processed()));
2803 num_batches += 1;
2804 }
2805
2806 relay_statsd::metric!(
2807 distribution(RelayDistributions::BatchesPerPartition) = num_batches
2808 );
2809 }
2810 }
2811
2812 fn send_global_partition(&self, partition_key: u32, partition: &mut Partition<'_>) {
2814 if partition.is_empty() {
2815 return;
2816 }
2817
2818 let (unencoded, project_info) = partition.take();
2819 let http_encoding = self.inner.config.http_encoding();
2820 let encoded = match encode_payload(&unencoded, http_encoding) {
2821 Ok(payload) => payload,
2822 Err(error) => {
2823 let error = &error as &dyn std::error::Error;
2824 relay_log::error!(error, "failed to encode metrics payload");
2825 return;
2826 }
2827 };
2828
2829 let request = SendMetricsRequest {
2830 partition_key: partition_key.to_string(),
2831 unencoded,
2832 encoded,
2833 project_info,
2834 http_encoding,
2835 metric_outcomes: self.inner.metric_outcomes.clone(),
2836 };
2837
2838 self.inner.addrs.upstream_relay.send(SendRequest(request));
2839 }
2840
2841 fn encode_metrics_global(&self, message: FlushBuckets) {
2856 let FlushBuckets {
2857 partition_key,
2858 buckets,
2859 } = message;
2860
2861 let batch_size = self.inner.config.metrics_max_batch_size_bytes();
2862 let mut partition = Partition::new(batch_size);
2863 let mut partition_splits = 0;
2864
2865 for ProjectBuckets {
2866 buckets, scoping, ..
2867 } in buckets.values()
2868 {
2869 for bucket in buckets {
2870 let mut remaining = Some(BucketView::new(bucket));
2871
2872 while let Some(bucket) = remaining.take() {
2873 if let Some(next) = partition.insert(bucket, *scoping) {
2874 self.send_global_partition(partition_key, &mut partition);
2878 remaining = Some(next);
2879 partition_splits += 1;
2880 }
2881 }
2882 }
2883 }
2884
2885 if partition_splits > 0 {
2886 metric!(distribution(RelayDistributions::PartitionSplits) = partition_splits);
2887 }
2888
2889 self.send_global_partition(partition_key, &mut partition);
2890 }
2891
2892 async fn handle_flush_buckets(&self, cogs: &mut Token, mut message: FlushBuckets) {
2893 for (project_key, pb) in message.buckets.iter_mut() {
2894 let buckets = std::mem::take(&mut pb.buckets);
2895 pb.buckets =
2896 self.check_buckets(*project_key, &pb.project_info, &pb.rate_limits, buckets);
2897 }
2898
2899 #[cfg(feature = "processing")]
2900 if self.inner.config.processing_enabled()
2901 && let Some(ref store_forwarder) = self.inner.addrs.store_forwarder
2902 {
2903 return self
2904 .encode_metrics_processing(message, store_forwarder)
2905 .await;
2906 }
2907
2908 if self.inner.config.http_global_metrics() {
2909 self.encode_metrics_global(message)
2910 } else {
2911 self.encode_metrics_envelope(cogs, message)
2912 }
2913 }
2914
2915 #[cfg(all(test, feature = "processing"))]
2916 fn redis_rate_limiter_enabled(&self) -> bool {
2917 self.inner.rate_limiter.is_some()
2918 }
2919
2920 async fn handle_message(&self, message: EnvelopeProcessor) {
2921 let ty = message.variant();
2922 let feature_weights = self.feature_weights(&message);
2923
2924 metric!(timer(RelayTimers::ProcessMessageDuration), message = ty, {
2925 let mut cogs = self.inner.cogs.timed(ResourceId::Relay, feature_weights);
2926
2927 match message {
2928 EnvelopeProcessor::ProcessEnvelope(m) => {
2929 self.handle_process_envelope(&mut cogs, *m).await
2930 }
2931 EnvelopeProcessor::ProcessProjectMetrics(m) => {
2932 self.handle_process_metrics(&mut cogs, *m)
2933 }
2934 EnvelopeProcessor::ProcessBatchedMetrics(m) => {
2935 self.handle_process_batched_metrics(&mut cogs, *m)
2936 }
2937 EnvelopeProcessor::FlushBuckets(m) => {
2938 self.handle_flush_buckets(&mut cogs, *m).await
2939 }
2940 EnvelopeProcessor::SubmitClientReports(m) => {
2941 self.handle_submit_client_reports(&mut cogs, *m)
2942 }
2943 }
2944 });
2945 }
2946
2947 fn feature_weights(&self, message: &EnvelopeProcessor) -> FeatureWeights {
2948 match message {
2949 EnvelopeProcessor::ProcessEnvelope(_) => AppFeature::Unattributed.into(),
2951 EnvelopeProcessor::ProcessProjectMetrics(_) => AppFeature::Unattributed.into(),
2952 EnvelopeProcessor::ProcessBatchedMetrics(_) => AppFeature::Unattributed.into(),
2953 EnvelopeProcessor::FlushBuckets(v) => v
2954 .buckets
2955 .values()
2956 .map(|s| {
2957 if self.inner.config.processing_enabled() {
2958 relay_metrics::cogs::ByCount(&s.buckets).into()
2961 } else {
2962 relay_metrics::cogs::BySize(&s.buckets).into()
2963 }
2964 })
2965 .fold(FeatureWeights::none(), FeatureWeights::merge),
2966 EnvelopeProcessor::SubmitClientReports(_) => AppFeature::ClientReports.into(),
2967 }
2968 }
2969}
2970
2971impl Service for EnvelopeProcessorService {
2972 type Interface = EnvelopeProcessor;
2973
2974 async fn run(self, mut rx: relay_system::Receiver<Self::Interface>) {
2975 while let Some(message) = rx.recv().await {
2976 let service = self.clone();
2977 self.inner
2978 .pool
2979 .spawn_async(
2980 async move {
2981 service.handle_message(message).await;
2982 }
2983 .boxed(),
2984 )
2985 .await;
2986 }
2987 }
2988}
2989
2990struct EnforcementResult {
2995 event: Annotated<Event>,
2996 #[cfg_attr(not(feature = "processing"), expect(dead_code))]
2997 rate_limits: RateLimits,
2998}
2999
3000impl EnforcementResult {
3001 pub fn new(event: Annotated<Event>, rate_limits: RateLimits) -> Self {
3003 Self { event, rate_limits }
3004 }
3005}
3006
3007#[derive(Clone)]
3008enum RateLimiter {
3009 Cached,
3010 #[cfg(feature = "processing")]
3011 Consistent(Arc<RedisRateLimiter<GlobalRateLimitsServiceHandle>>),
3012}
3013
3014impl RateLimiter {
3015 async fn enforce<Group>(
3016 &self,
3017 managed_envelope: &mut TypedEnvelope<Group>,
3018 event: Annotated<Event>,
3019 _extracted_metrics: &mut ProcessingExtractedMetrics,
3020 ctx: processing::Context<'_>,
3021 ) -> Result<EnforcementResult, ProcessingError> {
3022 if managed_envelope.envelope().is_empty() && event.value().is_none() {
3023 return Ok(EnforcementResult::new(event, RateLimits::default()));
3024 }
3025
3026 let quotas = CombinedQuotas::new(ctx.global_config, ctx.project_info.get_quotas());
3027 if quotas.is_empty() {
3028 return Ok(EnforcementResult::new(event, RateLimits::default()));
3029 }
3030
3031 let event_category = event_category(&event);
3032
3033 let this = self.clone();
3039 let mut envelope_limiter =
3040 EnvelopeLimiter::new(CheckLimits::All, move |item_scope, _quantity| {
3041 let this = this.clone();
3042
3043 async move {
3044 match this {
3045 #[cfg(feature = "processing")]
3046 RateLimiter::Consistent(rate_limiter) => Ok::<_, ProcessingError>(
3047 rate_limiter
3048 .is_rate_limited(quotas, item_scope, _quantity, false)
3049 .await?,
3050 ),
3051 _ => Ok::<_, ProcessingError>(
3052 ctx.rate_limits.check_with_quotas(quotas, item_scope),
3053 ),
3054 }
3055 }
3056 });
3057
3058 if let Some(category) = event_category {
3061 envelope_limiter.assume_event(category);
3062 }
3063
3064 let scoping = managed_envelope.scoping();
3065 let (enforcement, rate_limits) = metric!(timer(RelayTimers::EventProcessingRateLimiting), type = self.name(), {
3066 envelope_limiter
3067 .compute(managed_envelope.envelope_mut(), &scoping)
3068 .await
3069 })?;
3070 let event_active = enforcement.is_event_active();
3071
3072 #[cfg(feature = "processing")]
3076 _extracted_metrics.apply_enforcement(&enforcement, matches!(self, Self::Consistent(_)));
3077 enforcement.apply_with_outcomes(managed_envelope);
3078
3079 if event_active {
3080 debug_assert!(managed_envelope.envelope().is_empty());
3081 return Ok(EnforcementResult::new(Annotated::empty(), rate_limits));
3082 }
3083
3084 Ok(EnforcementResult::new(event, rate_limits))
3085 }
3086
3087 fn name(&self) -> &'static str {
3088 match self {
3089 Self::Cached => "cached",
3090 #[cfg(feature = "processing")]
3091 Self::Consistent(_) => "consistent",
3092 }
3093 }
3094}
3095
3096pub fn encode_payload(body: &Bytes, http_encoding: HttpEncoding) -> Result<Bytes, std::io::Error> {
3097 let envelope_body: Vec<u8> = match http_encoding {
3098 HttpEncoding::Identity => return Ok(body.clone()),
3099 HttpEncoding::Deflate => {
3100 let mut encoder = ZlibEncoder::new(Vec::new(), Compression::default());
3101 encoder.write_all(body.as_ref())?;
3102 encoder.finish()?
3103 }
3104 HttpEncoding::Gzip => {
3105 let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
3106 encoder.write_all(body.as_ref())?;
3107 encoder.finish()?
3108 }
3109 HttpEncoding::Br => {
3110 let mut encoder = BrotliEncoder::new(Vec::new(), 0, 5, 22);
3112 encoder.write_all(body.as_ref())?;
3113 encoder.into_inner()
3114 }
3115 HttpEncoding::Zstd => {
3116 let mut encoder = ZstdEncoder::new(Vec::new(), 1)?;
3119 encoder.write_all(body.as_ref())?;
3120 encoder.finish()?
3121 }
3122 };
3123
3124 Ok(envelope_body.into())
3125}
3126
3127#[derive(Debug)]
3129pub struct SendEnvelope {
3130 pub envelope: TypedEnvelope<Processed>,
3131 pub body: Bytes,
3132 pub http_encoding: HttpEncoding,
3133 pub project_cache: ProjectCacheHandle,
3134}
3135
3136impl UpstreamRequest for SendEnvelope {
3137 fn method(&self) -> reqwest::Method {
3138 reqwest::Method::POST
3139 }
3140
3141 fn path(&self) -> Cow<'_, str> {
3142 format!("/api/{}/envelope/", self.envelope.scoping().project_id).into()
3143 }
3144
3145 fn route(&self) -> &'static str {
3146 "envelope"
3147 }
3148
3149 fn build(&mut self, builder: &mut http::RequestBuilder) -> Result<(), http::HttpError> {
3150 let envelope_body = self.body.clone();
3151 metric!(
3152 distribution(RelayDistributions::UpstreamEnvelopeBodySize) = envelope_body.len() as u64
3153 );
3154
3155 let meta = &self.envelope.meta();
3156 let shard = self.envelope.partition_key().map(|p| p.to_string());
3157 builder
3158 .content_encoding(self.http_encoding)
3159 .header_opt("Origin", meta.origin().map(|url| url.as_str()))
3160 .header_opt("User-Agent", meta.user_agent())
3161 .header("X-Sentry-Auth", meta.auth_header())
3162 .header("X-Forwarded-For", meta.forwarded_for())
3163 .header("Content-Type", envelope::CONTENT_TYPE)
3164 .header_opt("X-Sentry-Relay-Shard", shard)
3165 .body(envelope_body);
3166
3167 Ok(())
3168 }
3169
3170 fn sign(&mut self) -> Option<Sign> {
3171 Some(Sign::Optional(SignatureType::RequestSign))
3172 }
3173
3174 fn respond(
3175 self: Box<Self>,
3176 result: Result<http::Response, UpstreamRequestError>,
3177 ) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> {
3178 Box::pin(async move {
3179 let result = match result {
3180 Ok(mut response) => response.consume().await.map_err(UpstreamRequestError::Http),
3181 Err(error) => Err(error),
3182 };
3183
3184 match result {
3185 Ok(()) => self.envelope.accept(),
3186 Err(error) if error.is_received() => {
3187 let scoping = self.envelope.scoping();
3188 self.envelope.accept();
3189
3190 if let UpstreamRequestError::RateLimited(limits) = error {
3191 self.project_cache
3192 .get(scoping.project_key)
3193 .rate_limits()
3194 .merge(limits.scope(&scoping));
3195 }
3196 }
3197 Err(error) => {
3198 let mut envelope = self.envelope;
3201 envelope.reject(Outcome::Invalid(DiscardReason::Internal));
3202 relay_log::error!(
3203 error = &error as &dyn Error,
3204 tags.project_key = %envelope.scoping().project_key,
3205 "error sending envelope"
3206 );
3207 }
3208 }
3209 })
3210 }
3211}
3212
3213#[derive(Debug)]
3220struct Partition<'a> {
3221 max_size: usize,
3222 remaining: usize,
3223 views: HashMap<ProjectKey, Vec<BucketView<'a>>>,
3224 project_info: HashMap<ProjectKey, Scoping>,
3225}
3226
3227impl<'a> Partition<'a> {
3228 pub fn new(size: usize) -> Self {
3230 Self {
3231 max_size: size,
3232 remaining: size,
3233 views: HashMap::new(),
3234 project_info: HashMap::new(),
3235 }
3236 }
3237
3238 pub fn insert(&mut self, bucket: BucketView<'a>, scoping: Scoping) -> Option<BucketView<'a>> {
3249 let (current, next) = bucket.split(self.remaining, Some(self.max_size));
3250
3251 if let Some(current) = current {
3252 self.remaining = self.remaining.saturating_sub(current.estimated_size());
3253 self.views
3254 .entry(scoping.project_key)
3255 .or_default()
3256 .push(current);
3257
3258 self.project_info
3259 .entry(scoping.project_key)
3260 .or_insert(scoping);
3261 }
3262
3263 next
3264 }
3265
3266 fn is_empty(&self) -> bool {
3268 self.views.is_empty()
3269 }
3270
3271 fn take(&mut self) -> (Bytes, HashMap<ProjectKey, Scoping>) {
3275 #[derive(serde::Serialize)]
3276 struct Wrapper<'a> {
3277 buckets: &'a HashMap<ProjectKey, Vec<BucketView<'a>>>,
3278 }
3279
3280 let buckets = &self.views;
3281 let payload = serde_json::to_vec(&Wrapper { buckets }).unwrap().into();
3282
3283 let scopings = self.project_info.clone();
3284 self.project_info.clear();
3285
3286 self.views.clear();
3287 self.remaining = self.max_size;
3288
3289 (payload, scopings)
3290 }
3291}
3292
3293#[derive(Debug)]
3297struct SendMetricsRequest {
3298 partition_key: String,
3300 unencoded: Bytes,
3302 encoded: Bytes,
3304 project_info: HashMap<ProjectKey, Scoping>,
3308 http_encoding: HttpEncoding,
3310 metric_outcomes: MetricOutcomes,
3312}
3313
3314impl SendMetricsRequest {
3315 fn create_error_outcomes(self) {
3316 #[derive(serde::Deserialize)]
3317 struct Wrapper {
3318 buckets: HashMap<ProjectKey, Vec<MinimalTrackableBucket>>,
3319 }
3320
3321 let buckets = match serde_json::from_slice(&self.unencoded) {
3322 Ok(Wrapper { buckets }) => buckets,
3323 Err(err) => {
3324 relay_log::error!(
3325 error = &err as &dyn std::error::Error,
3326 "failed to parse buckets from failed transmission"
3327 );
3328 return;
3329 }
3330 };
3331
3332 for (key, buckets) in buckets {
3333 let Some(&scoping) = self.project_info.get(&key) else {
3334 relay_log::error!("missing scoping for project key");
3335 continue;
3336 };
3337
3338 self.metric_outcomes.track(
3339 scoping,
3340 &buckets,
3341 Outcome::Invalid(DiscardReason::Internal),
3342 );
3343 }
3344 }
3345}
3346
3347impl UpstreamRequest for SendMetricsRequest {
3348 fn set_relay_id(&self) -> bool {
3349 true
3350 }
3351
3352 fn sign(&mut self) -> Option<Sign> {
3353 Some(Sign::Required(SignatureType::Body(self.unencoded.clone())))
3354 }
3355
3356 fn method(&self) -> reqwest::Method {
3357 reqwest::Method::POST
3358 }
3359
3360 fn path(&self) -> Cow<'_, str> {
3361 "/api/0/relays/metrics/".into()
3362 }
3363
3364 fn route(&self) -> &'static str {
3365 "global_metrics"
3366 }
3367
3368 fn build(&mut self, builder: &mut http::RequestBuilder) -> Result<(), http::HttpError> {
3369 metric!(
3370 distribution(RelayDistributions::UpstreamMetricsBodySize) = self.encoded.len() as u64
3371 );
3372
3373 builder
3374 .content_encoding(self.http_encoding)
3375 .header("X-Sentry-Relay-Shard", self.partition_key.as_bytes())
3376 .header(header::CONTENT_TYPE, b"application/json")
3377 .body(self.encoded.clone());
3378
3379 Ok(())
3380 }
3381
3382 fn respond(
3383 self: Box<Self>,
3384 result: Result<http::Response, UpstreamRequestError>,
3385 ) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> {
3386 Box::pin(async {
3387 match result {
3388 Ok(mut response) => {
3389 response.consume().await.ok();
3390 }
3391 Err(error) => {
3392 relay_log::error!(error = &error as &dyn Error, "Failed to send metrics batch");
3393
3394 if error.is_received() {
3397 return;
3398 }
3399
3400 self.create_error_outcomes()
3401 }
3402 }
3403 })
3404 }
3405}
3406
3407#[derive(Copy, Clone, Debug)]
3409struct CombinedQuotas<'a> {
3410 global_quotas: &'a [Quota],
3411 project_quotas: &'a [Quota],
3412}
3413
3414impl<'a> CombinedQuotas<'a> {
3415 pub fn new(global_config: &'a GlobalConfig, project_quotas: &'a [Quota]) -> Self {
3417 Self {
3418 global_quotas: &global_config.quotas,
3419 project_quotas,
3420 }
3421 }
3422
3423 pub fn is_empty(&self) -> bool {
3425 self.len() == 0
3426 }
3427
3428 pub fn len(&self) -> usize {
3430 self.global_quotas.len() + self.project_quotas.len()
3431 }
3432}
3433
3434impl<'a> IntoIterator for CombinedQuotas<'a> {
3435 type Item = &'a Quota;
3436 type IntoIter = std::iter::Chain<std::slice::Iter<'a, Quota>, std::slice::Iter<'a, Quota>>;
3437
3438 fn into_iter(self) -> Self::IntoIter {
3439 self.global_quotas.iter().chain(self.project_quotas.iter())
3440 }
3441}
3442
3443#[cfg(test)]
3444mod tests {
3445 use std::collections::BTreeMap;
3446
3447 use insta::assert_debug_snapshot;
3448 use relay_base_schema::metrics::{DurationUnit, MetricUnit};
3449 use relay_common::glob2::LazyGlob;
3450 use relay_dynamic_config::ProjectConfig;
3451 use relay_event_normalization::{
3452 MeasurementsConfig, NormalizationConfig, RedactionRule, TransactionNameConfig,
3453 TransactionNameRule,
3454 };
3455 use relay_event_schema::protocol::TransactionSource;
3456 use relay_pii::DataScrubbingConfig;
3457 use similar_asserts::assert_eq;
3458
3459 use crate::metrics_extraction::IntoMetric;
3460 use crate::metrics_extraction::transactions::types::{
3461 CommonTags, TransactionMeasurementTags, TransactionMetric,
3462 };
3463 use crate::testutils::{create_test_processor, create_test_processor_with_addrs};
3464
3465 #[cfg(feature = "processing")]
3466 use {
3467 relay_metrics::BucketValue,
3468 relay_quotas::{QuotaScope, ReasonCode},
3469 relay_test::mock_service,
3470 };
3471
3472 use super::*;
3473
3474 #[cfg(feature = "processing")]
3475 fn mock_quota(id: &str) -> Quota {
3476 Quota {
3477 id: Some(id.into()),
3478 categories: [DataCategory::MetricBucket].into(),
3479 scope: QuotaScope::Organization,
3480 scope_id: None,
3481 limit: Some(0),
3482 window: None,
3483 reason_code: None,
3484 namespace: None,
3485 }
3486 }
3487
3488 #[cfg(feature = "processing")]
3489 #[test]
3490 fn test_dynamic_quotas() {
3491 let global_config = GlobalConfig {
3492 quotas: vec![mock_quota("foo"), mock_quota("bar")],
3493 ..Default::default()
3494 };
3495
3496 let project_quotas = vec![mock_quota("baz"), mock_quota("qux")];
3497
3498 let dynamic_quotas = CombinedQuotas::new(&global_config, &project_quotas);
3499
3500 assert_eq!(dynamic_quotas.len(), 4);
3501 assert!(!dynamic_quotas.is_empty());
3502
3503 let quota_ids = dynamic_quotas.into_iter().filter_map(|q| q.id.as_deref());
3504 assert!(quota_ids.eq(["foo", "bar", "baz", "qux"]));
3505 }
3506
3507 #[cfg(feature = "processing")]
3510 #[tokio::test]
3511 async fn test_ratelimit_per_batch() {
3512 use relay_base_schema::organization::OrganizationId;
3513 use relay_protocol::FiniteF64;
3514
3515 let rate_limited_org = Scoping {
3516 organization_id: OrganizationId::new(1),
3517 project_id: ProjectId::new(21),
3518 project_key: ProjectKey::parse("00000000000000000000000000000000").unwrap(),
3519 key_id: Some(17),
3520 };
3521
3522 let not_rate_limited_org = Scoping {
3523 organization_id: OrganizationId::new(2),
3524 project_id: ProjectId::new(21),
3525 project_key: ProjectKey::parse("11111111111111111111111111111111").unwrap(),
3526 key_id: Some(17),
3527 };
3528
3529 let message = {
3530 let project_info = {
3531 let quota = Quota {
3532 id: Some("testing".into()),
3533 categories: [DataCategory::MetricBucket].into(),
3534 scope: relay_quotas::QuotaScope::Organization,
3535 scope_id: Some(rate_limited_org.organization_id.to_string().into()),
3536 limit: Some(0),
3537 window: None,
3538 reason_code: Some(ReasonCode::new("test")),
3539 namespace: None,
3540 };
3541
3542 let mut config = ProjectConfig::default();
3543 config.quotas.push(quota);
3544
3545 Arc::new(ProjectInfo {
3546 config,
3547 ..Default::default()
3548 })
3549 };
3550
3551 let project_metrics = |scoping| ProjectBuckets {
3552 buckets: vec![Bucket {
3553 name: "d:transactions/bar".into(),
3554 value: BucketValue::Counter(FiniteF64::new(1.0).unwrap()),
3555 timestamp: UnixTimestamp::now(),
3556 tags: Default::default(),
3557 width: 10,
3558 metadata: BucketMetadata::default(),
3559 }],
3560 rate_limits: Default::default(),
3561 project_info: project_info.clone(),
3562 scoping,
3563 };
3564
3565 let buckets = hashbrown::HashMap::from([
3566 (
3567 rate_limited_org.project_key,
3568 project_metrics(rate_limited_org),
3569 ),
3570 (
3571 not_rate_limited_org.project_key,
3572 project_metrics(not_rate_limited_org),
3573 ),
3574 ]);
3575
3576 FlushBuckets {
3577 partition_key: 0,
3578 buckets,
3579 }
3580 };
3581
3582 assert_eq!(message.buckets.keys().count(), 2);
3584
3585 let config = {
3586 let config_json = serde_json::json!({
3587 "processing": {
3588 "enabled": true,
3589 "kafka_config": [],
3590 "redis": {
3591 "server": std::env::var("RELAY_REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_owned()),
3592 }
3593 }
3594 });
3595 Config::from_json_value(config_json).unwrap()
3596 };
3597
3598 let (store, handle) = {
3599 let f = |org_ids: &mut Vec<OrganizationId>, msg: Store| {
3600 let org_id = match msg {
3601 Store::Metrics(x) => x.scoping.organization_id,
3602 _ => panic!("received envelope when expecting only metrics"),
3603 };
3604 org_ids.push(org_id);
3605 };
3606
3607 mock_service("store_forwarder", vec![], f)
3608 };
3609
3610 let processor = create_test_processor(config).await;
3611 assert!(processor.redis_rate_limiter_enabled());
3612
3613 processor.encode_metrics_processing(message, &store).await;
3614
3615 drop(store);
3616 let orgs_not_ratelimited = handle.await.unwrap();
3617
3618 assert_eq!(
3619 orgs_not_ratelimited,
3620 vec![not_rate_limited_org.organization_id]
3621 );
3622 }
3623
3624 #[tokio::test]
3625 async fn test_browser_version_extraction_with_pii_like_data() {
3626 let processor = create_test_processor(Default::default()).await;
3627 let outcome_aggregator = Addr::dummy();
3628 let event_id = EventId::new();
3629
3630 let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
3631 .parse()
3632 .unwrap();
3633
3634 let request_meta = RequestMeta::new(dsn);
3635 let mut envelope = Envelope::from_request(Some(event_id), request_meta);
3636
3637 envelope.add_item({
3638 let mut item = Item::new(ItemType::Event);
3639 item.set_payload(
3640 ContentType::Json,
3641 r#"
3642 {
3643 "request": {
3644 "headers": [
3645 ["User-Agent", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36"]
3646 ]
3647 }
3648 }
3649 "#,
3650 );
3651 item
3652 });
3653
3654 let mut datascrubbing_settings = DataScrubbingConfig::default();
3655 datascrubbing_settings.scrub_data = true;
3657 datascrubbing_settings.scrub_defaults = true;
3658 datascrubbing_settings.scrub_ip_addresses = true;
3659
3660 let pii_config = serde_json::from_str(r#"{"applications": {"**": ["@ip:mask"]}}"#).unwrap();
3662
3663 let config = ProjectConfig {
3664 datascrubbing_settings,
3665 pii_config: Some(pii_config),
3666 ..Default::default()
3667 };
3668
3669 let project_info = ProjectInfo {
3670 config,
3671 ..Default::default()
3672 };
3673
3674 let mut envelopes = ProcessingGroup::split_envelope(*envelope, &Default::default());
3675 assert_eq!(envelopes.len(), 1);
3676
3677 let (group, envelope) = envelopes.pop().unwrap();
3678 let envelope = ManagedEnvelope::new(envelope, outcome_aggregator);
3679
3680 let message = ProcessEnvelopeGrouped {
3681 group,
3682 envelope,
3683 ctx: processing::Context {
3684 project_info: &project_info,
3685 ..processing::Context::for_test()
3686 },
3687 };
3688
3689 let Ok(Some(Submit::Envelope(mut new_envelope))) =
3690 processor.process(&mut Token::noop(), message).await
3691 else {
3692 panic!();
3693 };
3694 let new_envelope = new_envelope.envelope_mut();
3695
3696 let event_item = new_envelope.items().last().unwrap();
3697 let annotated_event: Annotated<Event> =
3698 Annotated::from_json_bytes(&event_item.payload()).unwrap();
3699 let event = annotated_event.into_value().unwrap();
3700 let headers = event
3701 .request
3702 .into_value()
3703 .unwrap()
3704 .headers
3705 .into_value()
3706 .unwrap();
3707
3708 assert_eq!(
3710 Some(
3711 "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/********* Safari/537.36"
3712 ),
3713 headers.get_header("User-Agent")
3714 );
3715 let contexts = event.contexts.into_value().unwrap();
3717 let browser = contexts.0.get("browser").unwrap();
3718 assert_eq!(
3719 r#"{"browser":"Chrome 103.0.0","name":"Chrome","version":"103.0.0","type":"browser"}"#,
3720 browser.to_json().unwrap()
3721 );
3722 }
3723
3724 #[tokio::test]
3725 #[cfg(feature = "processing")]
3726 async fn test_materialize_dsc() {
3727 use crate::services::projects::project::PublicKeyConfig;
3728
3729 let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
3730 .parse()
3731 .unwrap();
3732 let request_meta = RequestMeta::new(dsn);
3733 let mut envelope = Envelope::from_request(None, request_meta);
3734
3735 let dsc = r#"{
3736 "trace_id": "00000000-0000-0000-0000-000000000000",
3737 "public_key": "e12d836b15bb49d7bbf99e64295d995b",
3738 "sample_rate": "0.2"
3739 }"#;
3740 envelope.set_dsc(serde_json::from_str(dsc).unwrap());
3741
3742 let mut item = Item::new(ItemType::Event);
3743 item.set_payload(ContentType::Json, r#"{}"#);
3744 envelope.add_item(item);
3745
3746 let outcome_aggregator = Addr::dummy();
3747 let managed_envelope = ManagedEnvelope::new(envelope, outcome_aggregator);
3748
3749 let mut project_info = ProjectInfo::default();
3750 project_info.public_keys.push(PublicKeyConfig {
3751 public_key: ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(),
3752 numeric_id: Some(1),
3753 });
3754
3755 let config = serde_json::json!({
3756 "processing": {
3757 "enabled": true,
3758 "kafka_config": [],
3759 }
3760 });
3761
3762 let message = ProcessEnvelopeGrouped {
3763 group: ProcessingGroup::Transaction,
3764 envelope: managed_envelope,
3765 ctx: processing::Context {
3766 config: &Config::from_json_value(config.clone()).unwrap(),
3767 project_info: &project_info,
3768 sampling_project_info: Some(&project_info),
3769 ..processing::Context::for_test()
3770 },
3771 };
3772
3773 let processor = create_test_processor(Config::from_json_value(config).unwrap()).await;
3774 let Ok(Some(Submit::Envelope(envelope))) =
3775 processor.process(&mut Token::noop(), message).await
3776 else {
3777 panic!();
3778 };
3779 let event = envelope
3780 .envelope()
3781 .get_item_by(|item| item.ty() == &ItemType::Event)
3782 .unwrap();
3783
3784 let event = Annotated::<Event>::from_json_bytes(&event.payload()).unwrap();
3785 insta::assert_debug_snapshot!(event.value().unwrap()._dsc, @r###"
3786 Object(
3787 {
3788 "environment": ~,
3789 "public_key": String(
3790 "e12d836b15bb49d7bbf99e64295d995b",
3791 ),
3792 "release": ~,
3793 "replay_id": ~,
3794 "sample_rate": String(
3795 "0.2",
3796 ),
3797 "trace_id": String(
3798 "00000000000000000000000000000000",
3799 ),
3800 "transaction": ~,
3801 },
3802 )
3803 "###);
3804 }
3805
3806 fn capture_test_event(transaction_name: &str, source: TransactionSource) -> Vec<String> {
3807 let mut event = Annotated::<Event>::from_json(
3808 r#"
3809 {
3810 "type": "transaction",
3811 "transaction": "/foo/",
3812 "timestamp": 946684810.0,
3813 "start_timestamp": 946684800.0,
3814 "contexts": {
3815 "trace": {
3816 "trace_id": "4c79f60c11214eb38604f4ae0781bfb2",
3817 "span_id": "fa90fdead5f74053",
3818 "op": "http.server",
3819 "type": "trace"
3820 }
3821 },
3822 "transaction_info": {
3823 "source": "url"
3824 }
3825 }
3826 "#,
3827 )
3828 .unwrap();
3829 let e = event.value_mut().as_mut().unwrap();
3830 e.transaction.set_value(Some(transaction_name.into()));
3831
3832 e.transaction_info
3833 .value_mut()
3834 .as_mut()
3835 .unwrap()
3836 .source
3837 .set_value(Some(source));
3838
3839 relay_statsd::with_capturing_test_client(|| {
3840 utils::log_transaction_name_metrics(&mut event, |event| {
3841 let config = NormalizationConfig {
3842 transaction_name_config: TransactionNameConfig {
3843 rules: &[TransactionNameRule {
3844 pattern: LazyGlob::new("/foo/*/**".to_owned()),
3845 expiry: DateTime::<Utc>::MAX_UTC,
3846 redaction: RedactionRule::Replace {
3847 substitution: "*".to_owned(),
3848 },
3849 }],
3850 },
3851 ..Default::default()
3852 };
3853 relay_event_normalization::normalize_event(event, &config)
3854 });
3855 })
3856 }
3857
3858 #[test]
3859 fn test_log_transaction_metrics_none() {
3860 let captures = capture_test_event("/nothing", TransactionSource::Url);
3861 insta::assert_debug_snapshot!(captures, @r###"
3862 [
3863 "event.transaction_name_changes:1|c|#source_in:url,changes:none,source_out:sanitized,is_404:false",
3864 ]
3865 "###);
3866 }
3867
3868 #[test]
3869 fn test_log_transaction_metrics_rule() {
3870 let captures = capture_test_event("/foo/john/denver", TransactionSource::Url);
3871 insta::assert_debug_snapshot!(captures, @r###"
3872 [
3873 "event.transaction_name_changes:1|c|#source_in:url,changes:rule,source_out:sanitized,is_404:false",
3874 ]
3875 "###);
3876 }
3877
3878 #[test]
3879 fn test_log_transaction_metrics_pattern() {
3880 let captures = capture_test_event("/something/12345", TransactionSource::Url);
3881 insta::assert_debug_snapshot!(captures, @r###"
3882 [
3883 "event.transaction_name_changes:1|c|#source_in:url,changes:pattern,source_out:sanitized,is_404:false",
3884 ]
3885 "###);
3886 }
3887
3888 #[test]
3889 fn test_log_transaction_metrics_both() {
3890 let captures = capture_test_event("/foo/john/12345", TransactionSource::Url);
3891 insta::assert_debug_snapshot!(captures, @r###"
3892 [
3893 "event.transaction_name_changes:1|c|#source_in:url,changes:both,source_out:sanitized,is_404:false",
3894 ]
3895 "###);
3896 }
3897
3898 #[test]
3899 fn test_log_transaction_metrics_no_match() {
3900 let captures = capture_test_event("/foo/john/12345", TransactionSource::Route);
3901 insta::assert_debug_snapshot!(captures, @r###"
3902 [
3903 "event.transaction_name_changes:1|c|#source_in:route,changes:none,source_out:route,is_404:false",
3904 ]
3905 "###);
3906 }
3907
3908 #[test]
3912 fn test_mri_overhead_constant() {
3913 let hardcoded_value = MeasurementsConfig::MEASUREMENT_MRI_OVERHEAD;
3914
3915 let derived_value = {
3916 let name = "foobar".to_owned();
3917 let value = 5.into(); let unit = MetricUnit::Duration(DurationUnit::default());
3919 let tags = TransactionMeasurementTags {
3920 measurement_rating: None,
3921 universal_tags: CommonTags(BTreeMap::new()),
3922 score_profile_version: None,
3923 };
3924
3925 let measurement = TransactionMetric::Measurement {
3926 name: name.clone(),
3927 value,
3928 unit,
3929 tags,
3930 };
3931
3932 let metric: Bucket = measurement.into_metric(UnixTimestamp::now());
3933 metric.name.len() - unit.to_string().len() - name.len()
3934 };
3935 assert_eq!(
3936 hardcoded_value, derived_value,
3937 "Update `MEASUREMENT_MRI_OVERHEAD` if the naming scheme changed."
3938 );
3939 }
3940
3941 #[tokio::test]
3942 async fn test_process_metrics_bucket_metadata() {
3943 let mut token = Cogs::noop().timed(ResourceId::Relay, AppFeature::Unattributed);
3944 let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
3945 let received_at = Utc::now();
3946 let config = Config::default();
3947
3948 let (aggregator, mut aggregator_rx) = Addr::custom();
3949 let processor = create_test_processor_with_addrs(
3950 config,
3951 Addrs {
3952 aggregator,
3953 ..Default::default()
3954 },
3955 )
3956 .await;
3957
3958 let mut item = Item::new(ItemType::Statsd);
3959 item.set_payload(
3960 ContentType::Text,
3961 "transactions/foo:3182887624:4267882815|s",
3962 );
3963 for (source, expected_received_at) in [
3964 (
3965 BucketSource::External,
3966 Some(UnixTimestamp::from_datetime(received_at).unwrap()),
3967 ),
3968 (BucketSource::Internal, None),
3969 ] {
3970 let message = ProcessMetrics {
3971 data: MetricData::Raw(vec![item.clone()]),
3972 project_key,
3973 source,
3974 received_at,
3975 sent_at: Some(Utc::now()),
3976 };
3977 processor.handle_process_metrics(&mut token, message);
3978
3979 let Aggregator::MergeBuckets(merge_buckets) = aggregator_rx.recv().await.unwrap();
3980 let buckets = merge_buckets.buckets;
3981 assert_eq!(buckets.len(), 1);
3982 assert_eq!(buckets[0].metadata.received_at, expected_received_at);
3983 }
3984 }
3985
3986 #[tokio::test]
3987 async fn test_process_batched_metrics() {
3988 let mut token = Cogs::noop().timed(ResourceId::Relay, AppFeature::Unattributed);
3989 let received_at = Utc::now();
3990 let config = Config::default();
3991
3992 let (aggregator, mut aggregator_rx) = Addr::custom();
3993 let processor = create_test_processor_with_addrs(
3994 config,
3995 Addrs {
3996 aggregator,
3997 ..Default::default()
3998 },
3999 )
4000 .await;
4001
4002 let payload = r#"{
4003 "buckets": {
4004 "11111111111111111111111111111111": [
4005 {
4006 "timestamp": 1615889440,
4007 "width": 0,
4008 "name": "d:custom/endpoint.response_time@millisecond",
4009 "type": "d",
4010 "value": [
4011 68.0
4012 ],
4013 "tags": {
4014 "route": "user_index"
4015 }
4016 }
4017 ],
4018 "22222222222222222222222222222222": [
4019 {
4020 "timestamp": 1615889440,
4021 "width": 0,
4022 "name": "d:custom/endpoint.cache_rate@none",
4023 "type": "d",
4024 "value": [
4025 36.0
4026 ]
4027 }
4028 ]
4029 }
4030}
4031"#;
4032 let message = ProcessBatchedMetrics {
4033 payload: Bytes::from(payload),
4034 source: BucketSource::Internal,
4035 received_at,
4036 sent_at: Some(Utc::now()),
4037 };
4038 processor.handle_process_batched_metrics(&mut token, message);
4039
4040 let Aggregator::MergeBuckets(mb1) = aggregator_rx.recv().await.unwrap();
4041 let Aggregator::MergeBuckets(mb2) = aggregator_rx.recv().await.unwrap();
4042
4043 let mut messages = vec![mb1, mb2];
4044 messages.sort_by_key(|mb| mb.project_key);
4045
4046 let actual = messages
4047 .into_iter()
4048 .map(|mb| (mb.project_key, mb.buckets))
4049 .collect::<Vec<_>>();
4050
4051 assert_debug_snapshot!(actual, @r###"
4052 [
4053 (
4054 ProjectKey("11111111111111111111111111111111"),
4055 [
4056 Bucket {
4057 timestamp: UnixTimestamp(1615889440),
4058 width: 0,
4059 name: MetricName(
4060 "d:custom/endpoint.response_time@millisecond",
4061 ),
4062 value: Distribution(
4063 [
4064 68.0,
4065 ],
4066 ),
4067 tags: {
4068 "route": "user_index",
4069 },
4070 metadata: BucketMetadata {
4071 merges: 1,
4072 received_at: None,
4073 extracted_from_indexed: false,
4074 },
4075 },
4076 ],
4077 ),
4078 (
4079 ProjectKey("22222222222222222222222222222222"),
4080 [
4081 Bucket {
4082 timestamp: UnixTimestamp(1615889440),
4083 width: 0,
4084 name: MetricName(
4085 "d:custom/endpoint.cache_rate@none",
4086 ),
4087 value: Distribution(
4088 [
4089 36.0,
4090 ],
4091 ),
4092 tags: {},
4093 metadata: BucketMetadata {
4094 merges: 1,
4095 received_at: None,
4096 extracted_from_indexed: false,
4097 },
4098 },
4099 ],
4100 ),
4101 ]
4102 "###);
4103 }
4104}