use std::borrow::Cow;
use std::collections::{BTreeSet, HashMap};
use std::error::Error;
use std::fmt::{Debug, Display};
use std::future::Future;
use std::io::Write;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;

use anyhow::Context;
use brotli::CompressorWriter as BrotliEncoder;
use bytes::Bytes;
use chrono::{DateTime, Utc};
use flate2::Compression;
use flate2::write::{GzEncoder, ZlibEncoder};
use futures::FutureExt;
use futures::future::BoxFuture;
use relay_base_schema::project::{ProjectId, ProjectKey};
use relay_cogs::{AppFeature, Cogs, FeatureWeights, ResourceId, Token};
use relay_common::time::UnixTimestamp;
use relay_config::{Config, HttpEncoding, RelayMode};
use relay_dynamic_config::{ErrorBoundary, Feature, GlobalConfig};
use relay_event_normalization::{ClockDriftProcessor, GeoIpLookup};
use relay_event_schema::processor::ProcessingAction;
use relay_event_schema::protocol::{
    ClientReport, Event, EventId, Metrics, NetworkReportError, SpanV2,
};
use relay_filter::FilterStatKey;
use relay_metrics::{Bucket, BucketMetadata, BucketView, BucketsView, MetricNamespace};
use relay_pii::PiiConfigError;
use relay_protocol::Annotated;
use relay_quotas::{DataCategory, Quota, RateLimits, Scoping};
use relay_sampling::evaluation::{ReservoirCounters, ReservoirEvaluator, SamplingDecision};
use relay_statsd::metric;
use relay_system::{Addr, FromMessage, NoResponse, Service};
use reqwest::header;
use smallvec::{SmallVec, smallvec};
use zstd::stream::Encoder as ZstdEncoder;

use crate::envelope::{
    self, AttachmentType, ContentType, Envelope, EnvelopeError, Item, ItemContainer, ItemType,
};
use crate::extractors::{PartialDsn, RequestMeta, RequestTrust};
use crate::integrations::{Integration, SpansIntegration};
use crate::managed::{InvalidProcessingGroupType, ManagedEnvelope, TypedEnvelope};
use crate::metrics::{MetricOutcomes, MetricsLimiter, MinimalTrackableBucket};
use crate::metrics_extraction::transactions::ExtractedMetrics;
use crate::metrics_extraction::transactions::types::ExtractMetricsError;
use crate::processing::check_ins::CheckInsProcessor;
use crate::processing::logs::LogsProcessor;
use crate::processing::profile_chunks::ProfileChunksProcessor;
use crate::processing::sessions::SessionsProcessor;
use crate::processing::spans::SpansProcessor;
use crate::processing::trace_attachments::TraceAttachmentsProcessor;
use crate::processing::trace_metrics::TraceMetricsProcessor;
use crate::processing::transactions::extraction::ExtractMetricsContext;
use crate::processing::utils::event::{
    EventFullyNormalized, EventMetricsExtracted, FiltersStatus, SpansExtracted, event_category,
    event_type,
};
use crate::processing::{Forward as _, Output, Outputs, QuotaRateLimiter};
use crate::service::ServiceError;
use crate::services::global_config::GlobalConfigHandle;
use crate::services::metrics::{Aggregator, FlushBuckets, MergeBuckets, ProjectBuckets};
use crate::services::outcome::{DiscardItemType, DiscardReason, Outcome, TrackOutcome};
use crate::services::projects::cache::ProjectCacheHandle;
use crate::services::projects::project::{ProjectInfo, ProjectState};
use crate::services::upstream::{
    SendRequest, Sign, SignatureType, UpstreamRelay, UpstreamRequest, UpstreamRequestError,
};
use crate::statsd::{RelayCounters, RelayDistributions, RelayTimers};
use crate::utils::{self, CheckLimits, EnvelopeLimiter, SamplingResult};
use crate::{http, processing};
use relay_threading::AsyncPool;
#[cfg(feature = "processing")]
use {
    crate::services::global_rate_limits::{GlobalRateLimits, GlobalRateLimitsServiceHandle},
    crate::services::processor::nnswitch::SwitchProcessingError,
    crate::services::store::{Store, StoreEnvelope},
    crate::services::upload::Upload,
    crate::utils::Enforcement,
    itertools::Itertools,
    relay_cardinality::{
        CardinalityLimit, CardinalityLimiter, CardinalityLimitsSplit, RedisSetLimiter,
        RedisSetLimiterOptions,
    },
    relay_dynamic_config::CardinalityLimiterMode,
    relay_quotas::{RateLimitingError, RedisRateLimiter},
    relay_redis::{AsyncRedisClient, RedisClients},
    std::time::Instant,
    symbolic_unreal::{Unreal4Error, Unreal4ErrorKind},
};

mod attachment;
mod dynamic_sampling;
mod event;
mod metrics;
mod nel;
mod profile;
mod replay;
mod report;
mod span;

#[cfg(all(sentry, feature = "processing"))]
mod playstation;
mod standalone;
#[cfg(feature = "processing")]
mod unreal;

#[cfg(feature = "processing")]
mod nnswitch;
macro_rules! if_processing {
    ($config:expr, $if_true:block) => {
        #[cfg(feature = "processing")] {
            if $config.processing_enabled() $if_true
        }
    };
    ($config:expr, $if_true:block else $if_false:block) => {
        {
            #[cfg(feature = "processing")] {
                if $config.processing_enabled() $if_true else $if_false
            }
            #[cfg(not(feature = "processing"))] {
                $if_false
            }
        }
    };
}
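
// Usage sketch (illustrative): the first form runs the block only in
// processing builds with processing enabled; the second adds a fallback that
// is also used when the `processing` feature is compiled out entirely.
//
// if_processing!(self.inner.config, {
//     // processing-only path
// } else {
//     // fallback path
// });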

/// The minimum clock drift for correction to apply.
pub const MINIMUM_CLOCK_DRIFT: Duration = Duration::from_secs(55 * 60);

#[derive(Debug)]
pub struct GroupTypeError;

impl Display for GroupTypeError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("failed to convert processing group into corresponding type")
    }
}

impl std::error::Error for GroupTypeError {}

macro_rules! processing_group {
    ($ty:ident, $variant:ident$(, $($other:ident),+)?) => {
        #[derive(Clone, Copy, Debug)]
        pub struct $ty;

        impl From<$ty> for ProcessingGroup {
            fn from(_: $ty) -> Self {
                ProcessingGroup::$variant
            }
        }

        impl TryFrom<ProcessingGroup> for $ty {
            type Error = GroupTypeError;

            fn try_from(value: ProcessingGroup) -> Result<Self, Self::Error> {
                if matches!(value, ProcessingGroup::$variant) {
                    return Ok($ty);
                }
                $($(
                    if matches!(value, ProcessingGroup::$other) {
                        return Ok($ty);
                    }
                )+)?
                Err(GroupTypeError)
            }
        }
    };
}
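
// Expansion sketch: `processing_group!(LogGroup, Log, Nel)` generates a marker
// type `LogGroup`, a `From<LogGroup> for ProcessingGroup` returning
// `ProcessingGroup::Log`, and a `TryFrom<ProcessingGroup>` accepting both the
// `Log` and `Nel` variants and failing with `GroupTypeError` otherwise:
//
// assert!(LogGroup::try_from(ProcessingGroup::Nel).is_ok());
// assert!(LogGroup::try_from(ProcessingGroup::Span).is_err());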

pub trait EventProcessing {}

processing_group!(TransactionGroup, Transaction);
impl EventProcessing for TransactionGroup {}

processing_group!(ErrorGroup, Error);
impl EventProcessing for ErrorGroup {}

processing_group!(SessionGroup, Session);
processing_group!(StandaloneGroup, Standalone);
processing_group!(ClientReportGroup, ClientReport);
processing_group!(ReplayGroup, Replay);
processing_group!(CheckInGroup, CheckIn);
processing_group!(LogGroup, Log, Nel);
processing_group!(TraceMetricGroup, TraceMetric);
processing_group!(SpanGroup, Span);

processing_group!(ProfileChunkGroup, ProfileChunk);
processing_group!(MetricsGroup, Metrics);
processing_group!(ForwardUnknownGroup, ForwardUnknown);
processing_group!(Ungrouped, Ungrouped);

/// Marker type for envelopes that have passed through processing.
#[derive(Clone, Copy, Debug)]
pub struct Processed;

/// The processing group of an envelope, determining which pipeline handles its items.
#[derive(Clone, Copy, Debug)]
pub enum ProcessingGroup {
    Transaction,
    Error,
    Session,
    Standalone,
    ClientReport,
    Replay,
    CheckIn,
    Nel,
    Log,
    TraceMetric,
    Span,
    SpanV2,
    Metrics,
    ProfileChunk,
    TraceAttachment,
    ForwardUnknown,
    Ungrouped,
}

impl ProcessingGroup {
    fn split_envelope(
        mut envelope: Envelope,
        project_info: &ProjectInfo,
    ) -> SmallVec<[(Self, Box<Envelope>); 3]> {
        let headers = envelope.headers().clone();
        let mut grouped_envelopes = smallvec![];

        let replay_items = envelope.take_items_by(|item| {
            matches!(
                item.ty(),
                &ItemType::ReplayEvent | &ItemType::ReplayRecording | &ItemType::ReplayVideo
            )
        });
        if !replay_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Replay,
                Envelope::from_parts(headers.clone(), replay_items),
            ))
        }

        let session_items = envelope
            .take_items_by(|item| matches!(item.ty(), &ItemType::Session | &ItemType::Sessions));
        if !session_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Session,
                Envelope::from_parts(headers.clone(), session_items),
            ))
        }

        let span_v2_items = envelope.take_items_by(|item| {
            let exp_feature = project_info.has_feature(Feature::SpanV2ExperimentalProcessing);
            let otlp_feature = project_info.has_feature(Feature::SpanV2OtlpProcessing);
            let is_supported_integration = {
                matches!(
                    item.integration(),
                    Some(Integration::Spans(SpansIntegration::OtelV1 { .. }))
                )
            };
            let is_span = matches!(item.ty(), &ItemType::Span);
            let is_span_attachment = item.is_span_attachment();

            ItemContainer::<SpanV2>::is_container(item)
                || (exp_feature && is_span)
                || ((exp_feature || otlp_feature) && is_supported_integration)
                || (exp_feature && is_span_attachment)
        });

        if !span_v2_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::SpanV2,
                Envelope::from_parts(headers.clone(), span_v2_items),
            ))
        }

        let span_items = envelope.take_items_by(|item| {
            matches!(item.ty(), &ItemType::Span)
                || matches!(item.integration(), Some(Integration::Spans(_)))
        });

        if !span_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Span,
                Envelope::from_parts(headers.clone(), span_items),
            ))
        }

        let logs_items = envelope.take_items_by(|item| {
            matches!(item.ty(), &ItemType::Log)
                || matches!(item.integration(), Some(Integration::Logs(_)))
        });
        if !logs_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Log,
                Envelope::from_parts(headers.clone(), logs_items),
            ))
        }

        let trace_metric_items =
            envelope.take_items_by(|item| matches!(item.ty(), &ItemType::TraceMetric));
        if !trace_metric_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::TraceMetric,
                Envelope::from_parts(headers.clone(), trace_metric_items),
            ))
        }

        let nel_items = envelope.take_items_by(|item| matches!(item.ty(), &ItemType::Nel));
        if !nel_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Nel,
                Envelope::from_parts(headers.clone(), nel_items),
            ))
        }

        let metric_items = envelope.take_items_by(|i| i.ty().is_metrics());
        if !metric_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Metrics,
                Envelope::from_parts(headers.clone(), metric_items),
            ))
        }

        let profile_chunk_items =
            envelope.take_items_by(|item| matches!(item.ty(), &ItemType::ProfileChunk));
        if !profile_chunk_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::ProfileChunk,
                Envelope::from_parts(headers.clone(), profile_chunk_items),
            ))
        }

        let trace_attachment_items = envelope.take_items_by(Item::is_trace_attachment);
        if !trace_attachment_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::TraceAttachment,
                Envelope::from_parts(headers.clone(), trace_attachment_items),
            ))
        }

        if !envelope.items().any(Item::creates_event) {
            let standalone_items = envelope.take_items_by(Item::requires_event);
            if !standalone_items.is_empty() {
                grouped_envelopes.push((
                    ProcessingGroup::Standalone,
                    Envelope::from_parts(headers.clone(), standalone_items),
                ))
            }
        }

        let security_reports_items = envelope
            .take_items_by(|i| matches!(i.ty(), &ItemType::RawSecurity))
            .into_iter()
            .map(|item| {
                let headers = headers.clone();
                let items: SmallVec<[Item; 3]> = smallvec![item.clone()];
                let mut envelope = Envelope::from_parts(headers, items);
                envelope.set_event_id(EventId::new());
                (ProcessingGroup::Error, envelope)
            });
        grouped_envelopes.extend(security_reports_items);

        let require_event_items = envelope.take_items_by(Item::requires_event);
        if !require_event_items.is_empty() {
            let group = if require_event_items
                .iter()
                .any(|item| matches!(item.ty(), &ItemType::Transaction | &ItemType::Profile))
            {
                ProcessingGroup::Transaction
            } else {
                ProcessingGroup::Error
            };

            grouped_envelopes.push((
                group,
                Envelope::from_parts(headers.clone(), require_event_items),
            ))
        }

        let envelopes = envelope.items_mut().map(|item| {
            let headers = headers.clone();
            let items: SmallVec<[Item; 3]> = smallvec![item.clone()];
            let envelope = Envelope::from_parts(headers, items);
            let item_type = item.ty();
            let group = if matches!(item_type, &ItemType::CheckIn) {
                ProcessingGroup::CheckIn
            } else if matches!(item_type, &ItemType::ClientReport) {
                ProcessingGroup::ClientReport
            } else if matches!(item_type, &ItemType::Unknown(_)) {
                ProcessingGroup::ForwardUnknown
            } else {
                ProcessingGroup::Ungrouped
            };

            (group, envelope)
        });
        grouped_envelopes.extend(envelopes);

        grouped_envelopes
    }
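
    // Behavior sketch (illustrative): an envelope carrying a replay recording,
    // a session update, and a check-in splits into three envelopes sharing the
    // original headers, grouped as `Replay`, `Session`, and `CheckIn`. Items
    // that match no dedicated group and neither create nor require an event
    // end up in one-item `Ungrouped` envelopes.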

    pub fn variant(&self) -> &'static str {
        match self {
            ProcessingGroup::Transaction => "transaction",
            ProcessingGroup::Error => "error",
            ProcessingGroup::Session => "session",
            ProcessingGroup::Standalone => "standalone",
            ProcessingGroup::ClientReport => "client_report",
            ProcessingGroup::Replay => "replay",
            ProcessingGroup::CheckIn => "check_in",
            ProcessingGroup::Log => "log",
            ProcessingGroup::TraceMetric => "trace_metric",
            ProcessingGroup::Nel => "nel",
            ProcessingGroup::Span => "span",
            ProcessingGroup::SpanV2 => "span_v2",
            ProcessingGroup::Metrics => "metrics",
            ProcessingGroup::ProfileChunk => "profile_chunk",
            ProcessingGroup::TraceAttachment => "trace_attachment",
            ProcessingGroup::ForwardUnknown => "forward_unknown",
            ProcessingGroup::Ungrouped => "ungrouped",
        }
    }
}

impl From<ProcessingGroup> for AppFeature {
    fn from(value: ProcessingGroup) -> Self {
        match value {
            ProcessingGroup::Transaction => AppFeature::Transactions,
            ProcessingGroup::Error => AppFeature::Errors,
            ProcessingGroup::Session => AppFeature::Sessions,
            ProcessingGroup::Standalone => AppFeature::UnattributedEnvelope,
            ProcessingGroup::ClientReport => AppFeature::ClientReports,
            ProcessingGroup::Replay => AppFeature::Replays,
            ProcessingGroup::CheckIn => AppFeature::CheckIns,
            ProcessingGroup::Log => AppFeature::Logs,
            ProcessingGroup::TraceMetric => AppFeature::TraceMetrics,
            ProcessingGroup::Nel => AppFeature::Logs,
            ProcessingGroup::Span => AppFeature::Spans,
            ProcessingGroup::SpanV2 => AppFeature::Spans,
            ProcessingGroup::Metrics => AppFeature::UnattributedMetrics,
            ProcessingGroup::ProfileChunk => AppFeature::Profiles,
            ProcessingGroup::ForwardUnknown => AppFeature::UnattributedEnvelope,
            ProcessingGroup::Ungrouped => AppFeature::UnattributedEnvelope,
            ProcessingGroup::TraceAttachment => AppFeature::TraceAttachments,
        }
    }
}

#[derive(Debug, thiserror::Error)]
pub enum ProcessingError {
    #[error("invalid json in event")]
    InvalidJson(#[source] serde_json::Error),

    #[error("invalid message pack event payload")]
    InvalidMsgpack(#[from] rmp_serde::decode::Error),

    #[cfg(feature = "processing")]
    #[error("invalid unreal crash report")]
    InvalidUnrealReport(#[source] Unreal4Error),

    #[error("event payload too large")]
    PayloadTooLarge(DiscardItemType),

    #[error("invalid transaction event")]
    InvalidTransaction,

    #[error("envelope processor failed")]
    ProcessingFailed(#[from] ProcessingAction),

    #[error("duplicate {0} in event")]
    DuplicateItem(ItemType),

    #[error("failed to extract event payload")]
    NoEventPayload,

    #[error("missing project id in DSN")]
    MissingProjectId,

    #[error("invalid security report type: {0:?}")]
    InvalidSecurityType(Bytes),

    #[error("unsupported security report type")]
    UnsupportedSecurityType,

    #[error("invalid security report")]
    InvalidSecurityReport(#[source] serde_json::Error),

    #[error("invalid nel report")]
    InvalidNelReport(#[source] NetworkReportError),

    #[error("event filtered with reason: {0:?}")]
    EventFiltered(FilterStatKey),

    #[error("missing or invalid required event timestamp")]
    InvalidTimestamp,

    #[error("could not serialize event payload")]
    SerializeFailed(#[source] serde_json::Error),

    #[cfg(feature = "processing")]
    #[error("failed to apply quotas")]
    QuotasFailed(#[from] RateLimitingError),

    #[error("invalid pii config")]
    PiiConfigError(PiiConfigError),

    #[error("invalid processing group type")]
    InvalidProcessingGroup(Box<InvalidProcessingGroupType>),

    #[error("invalid replay")]
    InvalidReplay(DiscardReason),

    #[error("replay filtered with reason: {0:?}")]
    ReplayFiltered(FilterStatKey),

    #[cfg(feature = "processing")]
    #[error("nintendo switch dying message processing failed {0:?}")]
    InvalidNintendoDyingMessage(#[source] SwitchProcessingError),

    #[cfg(all(sentry, feature = "processing"))]
    #[error("playstation dump processing failed: {0}")]
    InvalidPlaystationDump(String),

    #[error("processing group does not match specific processor")]
    ProcessingGroupMismatch,

    #[error("new processing pipeline failed")]
    ProcessingFailure,
}

impl ProcessingError {
    pub fn to_outcome(&self) -> Option<Outcome> {
        match self {
            Self::PayloadTooLarge(payload_type) => {
                Some(Outcome::Invalid(DiscardReason::TooLarge(*payload_type)))
            }
            Self::InvalidJson(_) => Some(Outcome::Invalid(DiscardReason::InvalidJson)),
            Self::InvalidMsgpack(_) => Some(Outcome::Invalid(DiscardReason::InvalidMsgpack)),
            Self::InvalidSecurityType(_) => {
                Some(Outcome::Invalid(DiscardReason::SecurityReportType))
            }
            Self::InvalidSecurityReport(_) => Some(Outcome::Invalid(DiscardReason::SecurityReport)),
            Self::UnsupportedSecurityType => Some(Outcome::Filtered(FilterStatKey::InvalidCsp)),
            Self::InvalidNelReport(_) => Some(Outcome::Invalid(DiscardReason::InvalidJson)),
            Self::InvalidTransaction => Some(Outcome::Invalid(DiscardReason::InvalidTransaction)),
            Self::InvalidTimestamp => Some(Outcome::Invalid(DiscardReason::Timestamp)),
            Self::DuplicateItem(_) => Some(Outcome::Invalid(DiscardReason::DuplicateItem)),
            Self::NoEventPayload => Some(Outcome::Invalid(DiscardReason::NoEventPayload)),
            #[cfg(feature = "processing")]
            Self::InvalidNintendoDyingMessage(_) => Some(Outcome::Invalid(DiscardReason::Payload)),
            #[cfg(all(sentry, feature = "processing"))]
            Self::InvalidPlaystationDump(_) => Some(Outcome::Invalid(DiscardReason::Payload)),
            #[cfg(feature = "processing")]
            Self::InvalidUnrealReport(err) if err.kind() == Unreal4ErrorKind::BadCompression => {
                Some(Outcome::Invalid(DiscardReason::InvalidCompression))
            }
            #[cfg(feature = "processing")]
            Self::InvalidUnrealReport(_) => Some(Outcome::Invalid(DiscardReason::ProcessUnreal)),
            Self::SerializeFailed(_) | Self::ProcessingFailed(_) => {
                Some(Outcome::Invalid(DiscardReason::Internal))
            }
            #[cfg(feature = "processing")]
            Self::QuotasFailed(_) => Some(Outcome::Invalid(DiscardReason::Internal)),
            Self::PiiConfigError(_) => Some(Outcome::Invalid(DiscardReason::ProjectStatePii)),
            Self::MissingProjectId => None,
            Self::EventFiltered(_) => None,
            Self::InvalidProcessingGroup(_) => None,
            Self::InvalidReplay(reason) => Some(Outcome::Invalid(*reason)),
            Self::ReplayFiltered(key) => Some(Outcome::Filtered(key.clone())),
            Self::ProcessingGroupMismatch => Some(Outcome::Invalid(DiscardReason::Internal)),
            Self::ProcessingFailure => None,
        }
    }

    fn is_unexpected(&self) -> bool {
        self.to_outcome()
            .is_some_and(|outcome| outcome.is_unexpected())
    }
}

#[cfg(feature = "processing")]
impl From<Unreal4Error> for ProcessingError {
    fn from(err: Unreal4Error) -> Self {
        match err.kind() {
            Unreal4ErrorKind::TooLarge => Self::PayloadTooLarge(ItemType::UnrealReport.into()),
            _ => ProcessingError::InvalidUnrealReport(err),
        }
    }
}

impl From<ExtractMetricsError> for ProcessingError {
    fn from(error: ExtractMetricsError) -> Self {
        match error {
            ExtractMetricsError::MissingTimestamp | ExtractMetricsError::InvalidTimestamp => {
                Self::InvalidTimestamp
            }
        }
    }
}

impl From<InvalidProcessingGroupType> for ProcessingError {
    fn from(value: InvalidProcessingGroupType) -> Self {
        Self::InvalidProcessingGroup(Box::new(value))
    }
}

type ExtractedEvent = (Annotated<Event>, usize);

#[derive(Debug)]
pub struct ProcessingExtractedMetrics {
    metrics: ExtractedMetrics,
}

impl ProcessingExtractedMetrics {
    pub fn new() -> Self {
        Self {
            metrics: ExtractedMetrics::default(),
        }
    }

    pub fn into_inner(self) -> ExtractedMetrics {
        self.metrics
    }

    pub fn extend(
        &mut self,
        extracted: ExtractedMetrics,
        sampling_decision: Option<SamplingDecision>,
    ) {
        self.extend_project_metrics(extracted.project_metrics, sampling_decision);
        self.extend_sampling_metrics(extracted.sampling_metrics, sampling_decision);
    }

    pub fn extend_project_metrics<I>(
        &mut self,
        buckets: I,
        sampling_decision: Option<SamplingDecision>,
    ) where
        I: IntoIterator<Item = Bucket>,
    {
        self.metrics
            .project_metrics
            .extend(buckets.into_iter().map(|mut bucket| {
                bucket.metadata.extracted_from_indexed =
                    sampling_decision == Some(SamplingDecision::Keep);
                bucket
            }));
    }

    pub fn extend_sampling_metrics<I>(
        &mut self,
        buckets: I,
        sampling_decision: Option<SamplingDecision>,
    ) where
        I: IntoIterator<Item = Bucket>,
    {
        self.metrics
            .sampling_metrics
            .extend(buckets.into_iter().map(|mut bucket| {
                bucket.metadata.extracted_from_indexed =
                    sampling_decision == Some(SamplingDecision::Keep);
                bucket
            }));
    }

    #[cfg(feature = "processing")]
    fn apply_enforcement(&mut self, enforcement: &Enforcement, enforced_consistently: bool) {
        let mut drop_namespaces: SmallVec<[_; 2]> = smallvec![];
        let mut reset_extracted_from_indexed: SmallVec<[_; 2]> = smallvec![];

        for (namespace, limit, indexed) in [
            (
                MetricNamespace::Transactions,
                &enforcement.event,
                &enforcement.event_indexed,
            ),
            (
                MetricNamespace::Spans,
                &enforcement.spans,
                &enforcement.spans_indexed,
            ),
        ] {
            if limit.is_active() {
                drop_namespaces.push(namespace);
            } else if indexed.is_active() && !enforced_consistently {
                reset_extracted_from_indexed.push(namespace);
            }
        }

        if !drop_namespaces.is_empty() || !reset_extracted_from_indexed.is_empty() {
            self.retain_mut(|bucket| {
                let Some(namespace) = bucket.name.try_namespace() else {
                    return true;
                };

                if drop_namespaces.contains(&namespace) {
                    return false;
                }

                if reset_extracted_from_indexed.contains(&namespace) {
                    bucket.metadata.extracted_from_indexed = false;
                }

                true
            });
        }
    }

    #[cfg(feature = "processing")]
    fn retain_mut(&mut self, mut f: impl FnMut(&mut Bucket) -> bool) {
        self.metrics.project_metrics.retain_mut(&mut f);
        self.metrics.sampling_metrics.retain_mut(&mut f);
    }
}
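
// Usage sketch (`metrics` is a hypothetical `ExtractedMetrics` value):
// extending with a `Keep` sampling decision marks every added bucket as
// extracted from an indexed payload.
//
// let mut extracted = ProcessingExtractedMetrics::new();
// extracted.extend(metrics, Some(SamplingDecision::Keep));
// // every bucket now has `metadata.extracted_from_indexed == true`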

fn send_metrics(
    metrics: ExtractedMetrics,
    project_key: ProjectKey,
    sampling_key: Option<ProjectKey>,
    aggregator: &Addr<Aggregator>,
) {
    let ExtractedMetrics {
        project_metrics,
        sampling_metrics,
    } = metrics;

    if !project_metrics.is_empty() {
        aggregator.send(MergeBuckets {
            project_key,
            buckets: project_metrics,
        });
    }

    if !sampling_metrics.is_empty() {
        // Sampling metrics belong to the project of the trace root; fall back
        // to the current project when no sampling key is available.
        let sampling_project_key = sampling_key.unwrap_or(project_key);
        aggregator.send(MergeBuckets {
            project_key: sampling_project_key,
            buckets: sampling_metrics,
        });
    }
}

fn should_filter(config: &Config, project_info: &ProjectInfo, feature: Feature) -> bool {
    match config.relay_mode() {
        RelayMode::Proxy => false,
        RelayMode::Managed => !project_info.has_feature(feature),
    }
}
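
// Example (values illustrative): in proxy mode nothing is filtered; in managed
// mode an item is filtered exactly when the project lacks the feature flag.
//
// let filter = should_filter(&config, &project_info, Feature::SessionReplay);
// // true in managed mode iff the project does not have `SessionReplay`.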

#[derive(Debug)]
#[expect(
    clippy::large_enum_variant,
    reason = "until we have a better solution to combat the excessive growth of Item, see #4819"
)]
enum ProcessingResult {
    Envelope {
        managed_envelope: TypedEnvelope<Processed>,
        extracted_metrics: ProcessingExtractedMetrics,
    },
    Output(Output<Outputs>),
}

impl ProcessingResult {
    fn no_metrics(managed_envelope: TypedEnvelope<Processed>) -> Self {
        Self::Envelope {
            managed_envelope,
            extracted_metrics: ProcessingExtractedMetrics::new(),
        }
    }
}

#[derive(Debug)]
#[expect(
    clippy::large_enum_variant,
    reason = "until we have a better solution to combat the excessive growth of Item, see #4819"
)]
enum Submit<'a> {
    Envelope(TypedEnvelope<Processed>),
    Output {
        output: Outputs,
        ctx: processing::ForwardContext<'a>,
    },
}

#[derive(Debug)]
pub struct ProcessEnvelope {
    pub envelope: ManagedEnvelope,
    pub project_info: Arc<ProjectInfo>,
    pub rate_limits: Arc<RateLimits>,
    pub sampling_project_info: Option<Arc<ProjectInfo>>,
    pub reservoir_counters: ReservoirCounters,
}

#[derive(Debug)]
struct ProcessEnvelopeGrouped<'a> {
    pub group: ProcessingGroup,
    pub envelope: ManagedEnvelope,
    pub ctx: processing::Context<'a>,
}

#[derive(Debug)]
pub struct ProcessMetrics {
    pub data: MetricData,
    pub project_key: ProjectKey,
    pub source: BucketSource,
    pub received_at: DateTime<Utc>,
    pub sent_at: Option<DateTime<Utc>>,
}

#[derive(Debug)]
pub enum MetricData {
    Raw(Vec<Item>),
    Parsed(Vec<Bucket>),
}

impl MetricData {
    fn into_buckets(self, timestamp: UnixTimestamp) -> Vec<Bucket> {
        let items = match self {
            Self::Parsed(buckets) => return buckets,
            Self::Raw(items) => items,
        };

        let mut buckets = Vec::new();
        for item in items {
            let payload = item.payload();
            if item.ty() == &ItemType::Statsd {
                for bucket_result in Bucket::parse_all(&payload, timestamp) {
                    match bucket_result {
                        Ok(bucket) => buckets.push(bucket),
                        Err(error) => relay_log::debug!(
                            error = &error as &dyn Error,
                            "failed to parse metric bucket from statsd format",
                        ),
                    }
                }
            } else if item.ty() == &ItemType::MetricBuckets {
                match serde_json::from_slice::<Vec<Bucket>>(&payload) {
                    Ok(parsed_buckets) => {
                        if buckets.is_empty() {
                            buckets = parsed_buckets;
                        } else {
                            buckets.extend(parsed_buckets);
                        }
                    }
                    Err(error) => {
                        relay_log::debug!(
                            error = &error as &dyn Error,
                            "failed to parse metric bucket",
                        );
                        metric!(counter(RelayCounters::MetricBucketsParsingFailed) += 1);
                    }
                }
            } else {
                relay_log::error!(
                    "invalid item of type {} passed to ProcessMetrics",
                    item.ty()
                );
            }
        }
        buckets
    }
}
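
// Parsing sketch (payloads illustrative): a raw `Statsd` item with the payload
// `endpoint.hits:4|c` yields one counter bucket at the given timestamp, while
// a `MetricBuckets` item is deserialized from JSON directly into `Bucket`s.
//
// let buckets = MetricData::Raw(items).into_buckets(UnixTimestamp::now());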

#[derive(Debug)]
pub struct ProcessBatchedMetrics {
    pub payload: Bytes,
    pub source: BucketSource,
    pub received_at: DateTime<Utc>,
    pub sent_at: Option<DateTime<Utc>>,
}

#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum BucketSource {
    Internal,
    External,
}

impl BucketSource {
    pub fn from_meta(meta: &RequestMeta) -> Self {
        match meta.request_trust() {
            RequestTrust::Trusted => Self::Internal,
            RequestTrust::Untrusted => Self::External,
        }
    }
}
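
// Sketch: the bucket source mirrors the request's trust level, so buckets
// submitted by trusted (internal) Relays are marked `Internal` and everything
// else `External`:
//
// let source = BucketSource::from_meta(&meta); // `meta: RequestMeta`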

#[derive(Debug)]
pub struct SubmitClientReports {
    pub client_reports: Vec<ClientReport>,
    pub scoping: Scoping,
}

#[derive(Debug)]
pub enum EnvelopeProcessor {
    ProcessEnvelope(Box<ProcessEnvelope>),
    ProcessProjectMetrics(Box<ProcessMetrics>),
    ProcessBatchedMetrics(Box<ProcessBatchedMetrics>),
    FlushBuckets(Box<FlushBuckets>),
    SubmitClientReports(Box<SubmitClientReports>),
}

impl EnvelopeProcessor {
    pub fn variant(&self) -> &'static str {
        match self {
            EnvelopeProcessor::ProcessEnvelope(_) => "ProcessEnvelope",
            EnvelopeProcessor::ProcessProjectMetrics(_) => "ProcessProjectMetrics",
            EnvelopeProcessor::ProcessBatchedMetrics(_) => "ProcessBatchedMetrics",
            EnvelopeProcessor::FlushBuckets(_) => "FlushBuckets",
            EnvelopeProcessor::SubmitClientReports(_) => "SubmitClientReports",
        }
    }
}

impl relay_system::Interface for EnvelopeProcessor {}

impl FromMessage<ProcessEnvelope> for EnvelopeProcessor {
    type Response = relay_system::NoResponse;

    fn from_message(message: ProcessEnvelope, _sender: ()) -> Self {
        Self::ProcessEnvelope(Box::new(message))
    }
}

impl FromMessage<ProcessMetrics> for EnvelopeProcessor {
    type Response = NoResponse;

    fn from_message(message: ProcessMetrics, _: ()) -> Self {
        Self::ProcessProjectMetrics(Box::new(message))
    }
}

impl FromMessage<ProcessBatchedMetrics> for EnvelopeProcessor {
    type Response = NoResponse;

    fn from_message(message: ProcessBatchedMetrics, _: ()) -> Self {
        Self::ProcessBatchedMetrics(Box::new(message))
    }
}

impl FromMessage<FlushBuckets> for EnvelopeProcessor {
    type Response = NoResponse;

    fn from_message(message: FlushBuckets, _: ()) -> Self {
        Self::FlushBuckets(Box::new(message))
    }
}

impl FromMessage<SubmitClientReports> for EnvelopeProcessor {
    type Response = NoResponse;

    fn from_message(message: SubmitClientReports, _: ()) -> Self {
        Self::SubmitClientReports(Box::new(message))
    }
}

pub type EnvelopeProcessorServicePool = AsyncPool<BoxFuture<'static, ()>>;

#[derive(Clone)]
pub struct EnvelopeProcessorService {
    inner: Arc<InnerProcessor>,
}

pub struct Addrs {
    pub outcome_aggregator: Addr<TrackOutcome>,
    pub upstream_relay: Addr<UpstreamRelay>,
    #[cfg(feature = "processing")]
    pub upload: Option<Addr<Upload>>,
    #[cfg(feature = "processing")]
    pub store_forwarder: Option<Addr<Store>>,
    pub aggregator: Addr<Aggregator>,
    #[cfg(feature = "processing")]
    pub global_rate_limits: Option<Addr<GlobalRateLimits>>,
}

impl Default for Addrs {
    fn default() -> Self {
        Addrs {
            outcome_aggregator: Addr::dummy(),
            upstream_relay: Addr::dummy(),
            #[cfg(feature = "processing")]
            upload: None,
            #[cfg(feature = "processing")]
            store_forwarder: None,
            aggregator: Addr::dummy(),
            #[cfg(feature = "processing")]
            global_rate_limits: None,
        }
    }
}

struct InnerProcessor {
    pool: EnvelopeProcessorServicePool,
    config: Arc<Config>,
    global_config: GlobalConfigHandle,
    project_cache: ProjectCacheHandle,
    cogs: Cogs,
    #[cfg(feature = "processing")]
    quotas_client: Option<AsyncRedisClient>,
    addrs: Addrs,
    #[cfg(feature = "processing")]
    rate_limiter: Option<Arc<RedisRateLimiter<GlobalRateLimitsServiceHandle>>>,
    geoip_lookup: GeoIpLookup,
    #[cfg(feature = "processing")]
    cardinality_limiter: Option<CardinalityLimiter>,
    metric_outcomes: MetricOutcomes,
    processing: Processing,
}

struct Processing {
    logs: LogsProcessor,
    trace_metrics: TraceMetricsProcessor,
    spans: SpansProcessor,
    check_ins: CheckInsProcessor,
    sessions: SessionsProcessor,
    profile_chunks: ProfileChunksProcessor,
    trace_attachments: TraceAttachmentsProcessor,
}

impl EnvelopeProcessorService {
    #[cfg_attr(feature = "processing", expect(clippy::too_many_arguments))]
    pub fn new(
        pool: EnvelopeProcessorServicePool,
        config: Arc<Config>,
        global_config: GlobalConfigHandle,
        project_cache: ProjectCacheHandle,
        cogs: Cogs,
        #[cfg(feature = "processing")] redis: Option<RedisClients>,
        addrs: Addrs,
        metric_outcomes: MetricOutcomes,
    ) -> Self {
        let geoip_lookup = config
            .geoip_path()
            .and_then(
                |p| match GeoIpLookup::open(p).context(ServiceError::GeoIp) {
                    Ok(geoip) => Some(geoip),
                    Err(err) => {
                        relay_log::error!("failed to open GeoIP db {p:?}: {err:?}");
                        None
                    }
                },
            )
            .unwrap_or_else(GeoIpLookup::empty);

        #[cfg(feature = "processing")]
        let (cardinality, quotas) = match redis {
            Some(RedisClients {
                cardinality,
                quotas,
                ..
            }) => (Some(cardinality), Some(quotas)),
            None => (None, None),
        };

        #[cfg(feature = "processing")]
        let global_rate_limits = addrs.global_rate_limits.clone().map(Into::into);

        #[cfg(feature = "processing")]
        let rate_limiter = match (quotas.clone(), global_rate_limits) {
            (Some(redis), Some(global)) => Some(
                RedisRateLimiter::new(redis, global)
                    .max_limit(config.max_rate_limit())
                    .cache(config.quota_cache_ratio(), config.quota_cache_max()),
            ),
            _ => None,
        };

        let quota_limiter = Arc::new(QuotaRateLimiter::new(
            #[cfg(feature = "processing")]
            project_cache.clone(),
            #[cfg(feature = "processing")]
            rate_limiter.clone(),
        ));
        #[cfg(feature = "processing")]
        let rate_limiter = rate_limiter.map(Arc::new);

        let inner = InnerProcessor {
            pool,
            global_config,
            project_cache,
            cogs,
            #[cfg(feature = "processing")]
            quotas_client: quotas.clone(),
            #[cfg(feature = "processing")]
            rate_limiter,
            addrs,
            #[cfg(feature = "processing")]
            cardinality_limiter: cardinality
                .map(|cardinality| {
                    RedisSetLimiter::new(
                        RedisSetLimiterOptions {
                            cache_vacuum_interval: config
                                .cardinality_limiter_cache_vacuum_interval(),
                        },
                        cardinality,
                    )
                })
                .map(CardinalityLimiter::new),
            metric_outcomes,
            processing: Processing {
                logs: LogsProcessor::new(Arc::clone(&quota_limiter)),
                trace_metrics: TraceMetricsProcessor::new(Arc::clone(&quota_limiter)),
                spans: SpansProcessor::new(Arc::clone(&quota_limiter), geoip_lookup.clone()),
                check_ins: CheckInsProcessor::new(Arc::clone(&quota_limiter)),
                sessions: SessionsProcessor::new(Arc::clone(&quota_limiter)),
                profile_chunks: ProfileChunksProcessor::new(Arc::clone(&quota_limiter)),
                trace_attachments: TraceAttachmentsProcessor::new(quota_limiter),
            },
            geoip_lookup,
            config,
        };

        Self {
            inner: Arc::new(inner),
        }
    }

    async fn enforce_quotas<Group>(
        &self,
        managed_envelope: &mut TypedEnvelope<Group>,
        event: Annotated<Event>,
        extracted_metrics: &mut ProcessingExtractedMetrics,
        ctx: processing::Context<'_>,
    ) -> Result<Annotated<Event>, ProcessingError> {
        // First enforce the cached rate limits, then (in processing builds)
        // enforce consistently against Redis.
        let cached_result = RateLimiter::Cached
            .enforce(managed_envelope, event, extracted_metrics, ctx)
            .await?;

        if_processing!(self.inner.config, {
            let rate_limiter = match self.inner.rate_limiter.clone() {
                Some(rate_limiter) => rate_limiter,
                None => return Ok(cached_result.event),
            };

            let consistent_result = RateLimiter::Consistent(rate_limiter)
                .enforce(
                    managed_envelope,
                    cached_result.event,
                    extracted_metrics,
                    ctx
                )
                .await?;

            // Update the cached rate limits with the freshly computed ones.
            if !consistent_result.rate_limits.is_empty() {
                self.inner
                    .project_cache
                    .get(managed_envelope.scoping().project_key)
                    .rate_limits()
                    .merge(consistent_result.rate_limits);
            }

            Ok(consistent_result.event)
        } else { Ok(cached_result.event) })
    }

    async fn process_errors(
        &self,
        managed_envelope: &mut TypedEnvelope<ErrorGroup>,
        project_id: ProjectId,
        mut ctx: processing::Context<'_>,
    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
        let mut event_fully_normalized = EventFullyNormalized::new(managed_envelope.envelope());
        let mut metrics = Metrics::default();
        let mut extracted_metrics = ProcessingExtractedMetrics::new();

        report::process_user_reports(managed_envelope);

        if_processing!(self.inner.config, {
            unreal::expand(managed_envelope, &self.inner.config)?;
            #[cfg(sentry)]
            playstation::expand(managed_envelope, &self.inner.config, ctx.project_info)?;
            nnswitch::expand(managed_envelope)?;
        });

        let extraction_result = event::extract(
            managed_envelope,
            &mut metrics,
            event_fully_normalized,
            &self.inner.config,
        )?;
        let mut event = extraction_result.event;

        if_processing!(self.inner.config, {
            if let Some(inner_event_fully_normalized) =
                unreal::process(managed_envelope, &mut event)?
            {
                event_fully_normalized = inner_event_fully_normalized;
            }
            #[cfg(sentry)]
            if let Some(inner_event_fully_normalized) =
                playstation::process(managed_envelope, &mut event, ctx.project_info)?
            {
                event_fully_normalized = inner_event_fully_normalized;
            }
            if let Some(inner_event_fully_normalized) =
                attachment::create_placeholders(managed_envelope, &mut event, &mut metrics)
            {
                event_fully_normalized = inner_event_fully_normalized;
            }
        });

        ctx.sampling_project_info = dynamic_sampling::validate_and_set_dsc(
            managed_envelope,
            &mut event,
            ctx.project_info,
            ctx.sampling_project_info,
        );

        let attachments = managed_envelope
            .envelope()
            .items()
            .filter(|item| item.attachment_type() == Some(&AttachmentType::Attachment));
        processing::utils::event::finalize(
            managed_envelope.envelope().headers(),
            &mut event,
            attachments,
            &mut metrics,
            ctx.config,
        )?;
        event_fully_normalized = processing::utils::event::normalize(
            managed_envelope.envelope().headers(),
            &mut event,
            event_fully_normalized,
            project_id,
            ctx,
            &self.inner.geoip_lookup,
        )?;
        let filter_run =
            processing::utils::event::filter(managed_envelope.envelope().headers(), &event, &ctx)
                .map_err(|err| {
                    managed_envelope.reject(Outcome::Filtered(err.clone()));
                    ProcessingError::EventFiltered(err)
                })?;

        if self.inner.config.processing_enabled() || matches!(filter_run, FiltersStatus::Ok) {
            dynamic_sampling::tag_error_with_sampling_decision(
                managed_envelope,
                &mut event,
                ctx.sampling_project_info,
                &self.inner.config,
            )
            .await;
        }

        event = self
            .enforce_quotas(managed_envelope, event, &mut extracted_metrics, ctx)
            .await?;

        if event.value().is_some() {
            processing::utils::event::scrub(&mut event, ctx.project_info)?;
            event::serialize(
                managed_envelope,
                &mut event,
                event_fully_normalized,
                EventMetricsExtracted(false),
                SpansExtracted(false),
            )?;
            event::emit_feedback_metrics(managed_envelope.envelope());
        }

        let attachments = managed_envelope
            .envelope_mut()
            .items_mut()
            .filter(|i| i.ty() == &ItemType::Attachment);
        processing::utils::attachments::scrub(attachments, ctx.project_info);

        if self.inner.config.processing_enabled() && !event_fully_normalized.0 {
            relay_log::error!(
                tags.project = %project_id,
                tags.ty = event_type(&event).map(|e| e.to_string()).unwrap_or("none".to_owned()),
                "ingested event without normalizing"
            );
        }

        Ok(Some(extracted_metrics))
    }

    #[allow(unused_assignments)]
    async fn process_transactions(
        &self,
        managed_envelope: &mut TypedEnvelope<TransactionGroup>,
        cogs: &mut Token,
        project_id: ProjectId,
        mut ctx: processing::Context<'_>,
    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
        let mut event_fully_normalized = EventFullyNormalized::new(managed_envelope.envelope());
        let mut event_metrics_extracted = EventMetricsExtracted(false);
        let mut spans_extracted = SpansExtracted(false);
        let mut metrics = Metrics::default();
        let mut extracted_metrics = ProcessingExtractedMetrics::new();

        let extraction_result = event::extract(
            managed_envelope,
            &mut metrics,
            event_fully_normalized,
            &self.inner.config,
        )?;

        if let Some(inner_event_metrics_extracted) = extraction_result.event_metrics_extracted {
            event_metrics_extracted = inner_event_metrics_extracted;
        }
        if let Some(inner_spans_extracted) = extraction_result.spans_extracted {
            spans_extracted = inner_spans_extracted;
        }

        let mut event = extraction_result.event;

        let profile_id = profile::filter(
            managed_envelope,
            &event,
            ctx.config,
            project_id,
            ctx.project_info,
        );
        processing::transactions::profile::transfer_id(&mut event, profile_id);
        processing::transactions::profile::remove_context_if_rate_limited(
            &mut event,
            managed_envelope.scoping(),
            ctx,
        );

        ctx.sampling_project_info = dynamic_sampling::validate_and_set_dsc(
            managed_envelope,
            &mut event,
            ctx.project_info,
            ctx.sampling_project_info,
        );

        let attachments = managed_envelope
            .envelope()
            .items()
            .filter(|item| item.attachment_type() == Some(&AttachmentType::Attachment));
        processing::utils::event::finalize(
            managed_envelope.envelope().headers(),
            &mut event,
            attachments,
            &mut metrics,
            &self.inner.config,
        )?;

        event_fully_normalized = processing::utils::event::normalize(
            managed_envelope.envelope().headers(),
            &mut event,
            event_fully_normalized,
            project_id,
            ctx,
            &self.inner.geoip_lookup,
        )?;

        let filter_run =
            processing::utils::event::filter(managed_envelope.envelope().headers(), &event, &ctx)
                .map_err(|err| {
                    managed_envelope.reject(Outcome::Filtered(err.clone()));
                    ProcessingError::EventFiltered(err)
                })?;

        // Run dynamic sampling on processing Relays unconditionally, on other
        // Relays only when filters passed, and in both cases only when
        // transaction metrics extraction is enabled for the project.
        let run_dynamic_sampling = (matches!(filter_run, FiltersStatus::Ok)
            || self.inner.config.processing_enabled())
            && matches!(&ctx.project_info.config.transaction_metrics, Some(ErrorBoundary::Ok(c)) if c.is_enabled());

        let sampling_result = match run_dynamic_sampling {
            true => {
                #[allow(unused_mut)]
                let mut reservoir = ReservoirEvaluator::new(Arc::clone(ctx.reservoir_counters));
                #[cfg(feature = "processing")]
                if let Some(quotas_client) = self.inner.quotas_client.as_ref() {
                    reservoir.set_redis(managed_envelope.scoping().organization_id, quotas_client);
                }
                processing::utils::dynamic_sampling::run(
                    managed_envelope.envelope().headers().dsc(),
                    event.value(),
                    &ctx,
                    Some(&reservoir),
                )
                .await
            }
            false => SamplingResult::Pending,
        };

        relay_statsd::metric!(
            counter(RelayCounters::SamplingDecision) += 1,
            decision = sampling_result.decision().as_str(),
            item = "transaction"
        );

        #[cfg(feature = "processing")]
        let server_sample_rate = sampling_result.sample_rate();

        if let Some(outcome) = sampling_result.into_dropped_outcome() {
            // Process profiles before dropping the transaction, if necessary.
            profile::process(
                managed_envelope,
                &mut event,
                ctx.global_config,
                ctx.config,
                ctx.project_info,
            );
            // Extract metrics now; the event is about to be dropped.
            event_metrics_extracted = processing::transactions::extraction::extract_metrics(
                &mut event,
                &mut extracted_metrics,
                ExtractMetricsContext {
                    dsc: managed_envelope.envelope().dsc(),
                    project_id,
                    ctx,
                    sampling_decision: SamplingDecision::Drop,
                    metrics_extracted: event_metrics_extracted.0,
                    spans_extracted: spans_extracted.0,
                },
            )?;

            dynamic_sampling::drop_unsampled_items(
                managed_envelope,
                event,
                outcome,
                spans_extracted,
            );

            // At this point the envelope contains only the processed profiles,
            // if any; make sure there are enough quotas left for them.
            event = self
                .enforce_quotas(
                    managed_envelope,
                    Annotated::empty(),
                    &mut extracted_metrics,
                    ctx,
                )
                .await?;

            return Ok(Some(extracted_metrics));
        }

        let _post_ds = cogs.start_category("post_ds");

        // The transaction must be scrubbed before spans are extracted from it.
        processing::utils::event::scrub(&mut event, ctx.project_info)?;

        let attachments = managed_envelope
            .envelope_mut()
            .items_mut()
            .filter(|i| i.ty() == &ItemType::Attachment);
        processing::utils::attachments::scrub(attachments, ctx.project_info);

        if_processing!(self.inner.config, {
            let profile_id = profile::process(
                managed_envelope,
                &mut event,
                ctx.global_config,
                ctx.config,
                ctx.project_info,
            );
            processing::transactions::profile::transfer_id(&mut event, profile_id);
            processing::transactions::profile::scrub_profiler_id(&mut event);

            // Always extract metrics on processing Relays for sampled items.
            event_metrics_extracted = processing::transactions::extraction::extract_metrics(
                &mut event,
                &mut extracted_metrics,
                ExtractMetricsContext {
                    dsc: managed_envelope.envelope().dsc(),
                    project_id,
                    ctx,
                    sampling_decision: SamplingDecision::Keep,
                    metrics_extracted: event_metrics_extracted.0,
                    spans_extracted: spans_extracted.0,
                },
            )?;

            if let Some(spans) = processing::transactions::spans::extract_from_event(
                managed_envelope.envelope().dsc(),
                &event,
                ctx.global_config,
                ctx.config,
                server_sample_rate,
                event_metrics_extracted,
                spans_extracted,
            ) {
                spans_extracted = SpansExtracted(true);
                for item in spans {
                    match item {
                        Ok(item) => managed_envelope.envelope_mut().add_item(item),
                        Err(()) => managed_envelope.track_outcome(
                            Outcome::Invalid(DiscardReason::InvalidSpan),
                            DataCategory::SpanIndexed,
                            1,
                        ),
                    }
                }
            }
        });

        event = self
            .enforce_quotas(managed_envelope, event, &mut extracted_metrics, ctx)
            .await?;

        if event.value().is_some() {
            event::serialize(
                managed_envelope,
                &mut event,
                event_fully_normalized,
                event_metrics_extracted,
                spans_extracted,
            )?;
        }

        if self.inner.config.processing_enabled() && !event_fully_normalized.0 {
            relay_log::error!(
                tags.project = %project_id,
                tags.ty = event_type(&event).map(|e| e.to_string()).unwrap_or("none".to_owned()),
                "ingested event without normalizing"
            );
        }

        Ok(Some(extracted_metrics))
    }

    async fn process_standalone(
        &self,
        managed_envelope: &mut TypedEnvelope<StandaloneGroup>,
        project_id: ProjectId,
        ctx: processing::Context<'_>,
    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
        let mut extracted_metrics = ProcessingExtractedMetrics::new();

        standalone::process(managed_envelope);

        profile::filter(
            managed_envelope,
            &Annotated::empty(),
            ctx.config,
            project_id,
            ctx.project_info,
        );

        self.enforce_quotas(
            managed_envelope,
            Annotated::empty(),
            &mut extracted_metrics,
            ctx,
        )
        .await?;

        report::process_user_reports(managed_envelope);
        let attachments = managed_envelope
            .envelope_mut()
            .items_mut()
            .filter(|i| i.ty() == &ItemType::Attachment);
        processing::utils::attachments::scrub(attachments, ctx.project_info);

        Ok(Some(extracted_metrics))
    }

    async fn process_client_reports(
        &self,
        managed_envelope: &mut TypedEnvelope<ClientReportGroup>,
        ctx: processing::Context<'_>,
    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
        let mut extracted_metrics = ProcessingExtractedMetrics::new();

        self.enforce_quotas(
            managed_envelope,
            Annotated::empty(),
            &mut extracted_metrics,
            ctx,
        )
        .await?;

        report::process_client_reports(
            managed_envelope,
            ctx.config,
            ctx.project_info,
            self.inner.addrs.outcome_aggregator.clone(),
        );

        Ok(Some(extracted_metrics))
    }

    async fn process_replays(
        &self,
        managed_envelope: &mut TypedEnvelope<ReplayGroup>,
        ctx: processing::Context<'_>,
    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
        let mut extracted_metrics = ProcessingExtractedMetrics::new();

        replay::process(
            managed_envelope,
            ctx.global_config,
            ctx.config,
            ctx.project_info,
            &self.inner.geoip_lookup,
        )?;

        self.enforce_quotas(
            managed_envelope,
            Annotated::empty(),
            &mut extracted_metrics,
            ctx,
        )
        .await?;

        Ok(Some(extracted_metrics))
    }

    async fn process_nel(
        &self,
        mut managed_envelope: ManagedEnvelope,
        ctx: processing::Context<'_>,
    ) -> Result<ProcessingResult, ProcessingError> {
        nel::convert_to_logs(&mut managed_envelope);
        self.process_with_processor(&self.inner.processing.logs, managed_envelope, ctx)
            .await
    }

    async fn process_with_processor<P: processing::Processor>(
        &self,
        processor: &P,
        mut managed_envelope: ManagedEnvelope,
        ctx: processing::Context<'_>,
    ) -> Result<ProcessingResult, ProcessingError>
    where
        Outputs: From<P::Output>,
    {
        let Some(work) = processor.prepare_envelope(&mut managed_envelope) else {
            debug_assert!(
                false,
                "there must be work for the {} processor",
                std::any::type_name::<P>(),
            );
            return Err(ProcessingError::ProcessingGroupMismatch);
        };

        // The processor must consume the entire envelope: anything left after
        // preparation indicates a grouping bug and is rejected as internal.
        managed_envelope.update();
        match managed_envelope.envelope().is_empty() {
            true => managed_envelope.accept(),
            false => managed_envelope.reject(Outcome::Invalid(DiscardReason::Internal)),
        }

        processor
            .process(work, ctx)
            .await
            .map_err(|err| {
                relay_log::debug!(
                    error = &err as &dyn std::error::Error,
                    "processing pipeline failed"
                );
                ProcessingError::ProcessingFailure
            })
            .map(|o| o.map(Into::into))
            .map(ProcessingResult::Output)
    }

    async fn process_standalone_spans(
        &self,
        managed_envelope: &mut TypedEnvelope<SpanGroup>,
        _project_id: ProjectId,
        ctx: processing::Context<'_>,
    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
        let mut extracted_metrics = ProcessingExtractedMetrics::new();

        span::filter(managed_envelope, ctx.config, ctx.project_info);
        span::convert_otel_traces_data(managed_envelope);

        if_processing!(self.inner.config, {
            span::process(
                managed_envelope,
                &mut Annotated::empty(),
                &mut extracted_metrics,
                _project_id,
                ctx,
                &self.inner.geoip_lookup,
            )
            .await;
        });

        self.enforce_quotas(
            managed_envelope,
            Annotated::empty(),
            &mut extracted_metrics,
            ctx,
        )
        .await?;

        Ok(Some(extracted_metrics))
    }

    async fn process_envelope(
        &self,
        cogs: &mut Token,
        project_id: ProjectId,
        message: ProcessEnvelopeGrouped<'_>,
    ) -> Result<ProcessingResult, ProcessingError> {
        let ProcessEnvelopeGrouped {
            group,
            envelope: mut managed_envelope,
            ctx,
        } = message;

        // Pre-process the envelope headers.
        if let Some(sampling_state) = ctx.sampling_project_info {
            managed_envelope
                .envelope_mut()
                .parametrize_dsc_transaction(&sampling_state.config.tx_name_rules);
        }

        if let Some(retention) = ctx.project_info.config.event_retention {
            managed_envelope.envelope_mut().set_retention(retention);
        }

        if let Some(retention) = ctx.project_info.config.downsampled_event_retention {
            managed_envelope
                .envelope_mut()
                .set_downsampled_retention(retention);
        }

        // Set the project id in the request meta so it is available downstream.
        managed_envelope
            .envelope_mut()
            .meta_mut()
            .set_project_id(project_id);

        macro_rules! run {
            ($fn_name:ident $(, $args:expr)*) => {
                async {
                    let mut managed_envelope = (managed_envelope, group).try_into()?;
                    match self.$fn_name(&mut managed_envelope, $($args),*).await {
                        Ok(extracted_metrics) => Ok(ProcessingResult::Envelope {
                            managed_envelope: managed_envelope.into_processed(),
                            extracted_metrics: extracted_metrics.unwrap_or_else(ProcessingExtractedMetrics::new),
                        }),
                        Err(error) => {
                            relay_log::trace!("Executing {fn} failed: {error}", fn = stringify!($fn_name), error = error);
                            if let Some(outcome) = error.to_outcome() {
                                managed_envelope.reject(outcome);
                            }

                            Err(error)
                        }
                    }
                }.await
            };
        }

        relay_log::trace!("Processing {group} group", group = group.variant());

        match group {
            ProcessingGroup::Error => run!(process_errors, project_id, ctx),
            ProcessingGroup::Transaction => {
                run!(process_transactions, cogs, project_id, ctx)
            }
            ProcessingGroup::Session => {
                self.process_with_processor(&self.inner.processing.sessions, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::Standalone => run!(process_standalone, project_id, ctx),
            ProcessingGroup::ClientReport => run!(process_client_reports, ctx),
            ProcessingGroup::Replay => {
                run!(process_replays, ctx)
            }
            ProcessingGroup::CheckIn => {
                self.process_with_processor(&self.inner.processing.check_ins, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::Nel => self.process_nel(managed_envelope, ctx).await,
            ProcessingGroup::Log => {
                self.process_with_processor(&self.inner.processing.logs, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::TraceMetric => {
                self.process_with_processor(
                    &self.inner.processing.trace_metrics,
                    managed_envelope,
                    ctx,
                )
                .await
            }
            ProcessingGroup::SpanV2 => {
                self.process_with_processor(&self.inner.processing.spans, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::TraceAttachment => {
                self.process_with_processor(
                    &self.inner.processing.trace_attachments,
                    managed_envelope,
                    ctx,
                )
                .await
            }
            ProcessingGroup::Span => run!(process_standalone_spans, project_id, ctx),
            ProcessingGroup::ProfileChunk => {
                self.process_with_processor(
                    &self.inner.processing.profile_chunks,
                    managed_envelope,
                    ctx,
                )
                .await
            }
            ProcessingGroup::Metrics => {
                // In proxy mode, metric items are forwarded as-is; in all
                // other modes they should have been extracted before this point.
                if self.inner.config.relay_mode() != RelayMode::Proxy {
                    relay_log::error!(
                        tags.project = %project_id,
                        items = ?managed_envelope.envelope().items().next().map(Item::ty),
                        "received metrics in the process_state"
                    );
                }

                Ok(ProcessingResult::no_metrics(
                    managed_envelope.into_processed(),
                ))
            }
            ProcessingGroup::Ungrouped => {
                relay_log::error!(
                    tags.project = %project_id,
                    items = ?managed_envelope.envelope().items().next().map(Item::ty),
                    "could not identify the processing group based on the envelope's items"
                );

                Ok(ProcessingResult::no_metrics(
                    managed_envelope.into_processed(),
                ))
            }
            // Unknown items are forwarded upstream without modification.
            ProcessingGroup::ForwardUnknown => Ok(ProcessingResult::no_metrics(
                managed_envelope.into_processed(),
            )),
        }
    }

    async fn process<'a>(
        &self,
        cogs: &mut Token,
        mut message: ProcessEnvelopeGrouped<'a>,
    ) -> Result<Option<Submit<'a>>, ProcessingError> {
        let ProcessEnvelopeGrouped {
            ref mut envelope,
            ctx,
            ..
        } = message;

        // Prefer the project's own id and fall back to the id from the
        // envelope's request meta; without a project id, processing cannot
        // proceed.
        let Some(project_id) = ctx
            .project_info
            .project_id
            .or_else(|| envelope.envelope().meta().project_id())
        else {
            envelope.reject(Outcome::Invalid(DiscardReason::Internal));
            return Err(ProcessingError::MissingProjectId);
        };

        let client = envelope.envelope().meta().client().map(str::to_owned);
        let user_agent = envelope.envelope().meta().user_agent().map(str::to_owned);
        let project_key = envelope.envelope().meta().public_key();
        // The sampling key is only relevant when a sampling project is available.
        let sampling_key = envelope
            .envelope()
            .sampling_key()
            .filter(|_| ctx.sampling_project_info.is_some());

        relay_log::configure_scope(|scope| {
            scope.set_tag("project", project_id);
            if let Some(client) = client {
                scope.set_tag("sdk", client);
            }
            if let Some(user_agent) = user_agent {
                scope.set_extra("user_agent", user_agent.into());
            }
        });

        let result = match self.process_envelope(cogs, project_id, message).await {
            Ok(ProcessingResult::Envelope {
                mut managed_envelope,
                extracted_metrics,
            }) => {
                // The envelope could have been modified or emptied during processing.
                managed_envelope.update();

                let has_metrics = !extracted_metrics.metrics.project_metrics.is_empty();
                send_metrics(
                    extracted_metrics.metrics,
                    project_key,
                    sampling_key,
                    &self.inner.addrs.aggregator,
                );

                let envelope_response = if managed_envelope.envelope().is_empty() {
                    if !has_metrics {
                        managed_envelope.reject(Outcome::RateLimited(None));
                    } else {
                        managed_envelope.accept();
                    }

                    None
                } else {
                    Some(managed_envelope)
                };

                Ok(envelope_response.map(Submit::Envelope))
            }
            Ok(ProcessingResult::Output(Output { main, metrics })) => {
                if let Some(metrics) = metrics {
                    metrics.accept(|metrics| {
                        send_metrics(
                            metrics,
                            project_key,
                            sampling_key,
                            &self.inner.addrs.aggregator,
                        );
                    });
                }

                let ctx = ctx.to_forward();
                Ok(main.map(|output| Submit::Output { output, ctx }))
            }
            Err(err) => Err(err),
        };

2133 relay_log::configure_scope(|scope| {
2134 scope.remove_tag("project");
2135 scope.remove_tag("sdk");
2136 scope.remove_tag("user_agent");
2137 });
2138
2139 result
2140 }
2141
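    /// Handles a [`ProcessEnvelope`] message by splitting the envelope into processing groups
    /// and processing each group independently with its own COGS measurement.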
    async fn handle_process_envelope(&self, cogs: &mut Token, message: ProcessEnvelope) {
        let project_key = message.envelope.envelope().meta().public_key();
        let wait_time = message.envelope.age();
        metric!(timer(RelayTimers::EnvelopeWaitTime) = wait_time);

        // The envelope is split into groups, and each group is measured with its own COGS
        // token below; cancel the token passed in for the entire message.
        cogs.cancel();

        let scoping = message.envelope.scoping();
        for (group, envelope) in ProcessingGroup::split_envelope(
            *message.envelope.into_envelope(),
            &message.project_info,
        ) {
            let mut cogs = self
                .inner
                .cogs
                .timed(ResourceId::Relay, AppFeature::from(group));

            let mut envelope =
                ManagedEnvelope::new(envelope, self.inner.addrs.outcome_aggregator.clone());
            envelope.scope(scoping);

            let global_config = self.inner.global_config.current();

            let ctx = processing::Context {
                config: &self.inner.config,
                global_config: &global_config,
                project_info: &message.project_info,
                sampling_project_info: message.sampling_project_info.as_deref(),
                rate_limits: &message.rate_limits,
                reservoir_counters: &message.reservoir_counters,
            };

            let message = ProcessEnvelopeGrouped {
                group,
                envelope,
                ctx,
            };

            let result = metric!(
                timer(RelayTimers::EnvelopeProcessingTime),
                group = group.variant(),
                { self.process(&mut cogs, message).await }
            );

            match result {
                Ok(Some(envelope)) => self.submit_upstream(&mut cogs, envelope),
                Ok(None) => {}
                Err(error) if error.is_unexpected() => {
                    relay_log::error!(
                        tags.project_key = %project_key,
                        error = &error as &dyn Error,
                        "error processing envelope"
                    )
                }
                Err(error) => {
                    relay_log::debug!(
                        tags.project_key = %project_key,
                        error = &error as &dyn Error,
                        "error processing envelope"
                    )
                }
            }
        }
    }

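    /// Handles a [`ProcessMetrics`] message: parses, normalizes, and clock-drift corrects the
    /// buckets, then merges them into the aggregator.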
    fn handle_process_metrics(&self, cogs: &mut Token, message: ProcessMetrics) {
        let ProcessMetrics {
            data,
            project_key,
            received_at,
            sent_at,
            source,
        } = message;

        let received_timestamp =
            UnixTimestamp::from_datetime(received_at).unwrap_or(UnixTimestamp::now());

        let mut buckets = data.into_buckets(received_timestamp);
        if buckets.is_empty() {
            return;
        }
        cogs.update(relay_metrics::cogs::BySize(&buckets));

        let clock_drift_processor =
            ClockDriftProcessor::new(sent_at, received_at).at_least(MINIMUM_CLOCK_DRIFT);

        buckets.retain_mut(|bucket| {
            if let Err(error) = relay_metrics::normalize_bucket(bucket) {
                relay_log::debug!(error = &error as &dyn Error, "dropping bucket {bucket:?}");
                return false;
            }

            if !self::metrics::is_valid_namespace(bucket) {
                return false;
            }

            clock_drift_processor.process_timestamp(&mut bucket.timestamp);

            if !matches!(source, BucketSource::Internal) {
                bucket.metadata = BucketMetadata::new(received_timestamp);
            }

            true
        });

        let project = self.inner.project_cache.get(project_key);

        // Best-effort filtering and rate limiting, only possible if the project is cached.
        let buckets = match project.state() {
            ProjectState::Enabled(project_info) => {
                let rate_limits = project.rate_limits().current_limits();
                self.check_buckets(project_key, project_info, &rate_limits, buckets)
            }
            _ => buckets,
        };

        relay_log::trace!("merging metric buckets into the aggregator");
        self.inner
            .addrs
            .aggregator
            .send(MergeBuckets::new(project_key, buckets));
    }

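    /// Handles a batched metrics payload by deserializing the per-project buckets and
    /// delegating each project to [`Self::handle_process_metrics`].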
    fn handle_process_batched_metrics(&self, cogs: &mut Token, message: ProcessBatchedMetrics) {
        let ProcessBatchedMetrics {
            payload,
            source,
            received_at,
            sent_at,
        } = message;

        #[derive(serde::Deserialize)]
        struct Wrapper {
            buckets: HashMap<ProjectKey, Vec<Bucket>>,
        }

        let buckets = match serde_json::from_slice(&payload) {
            Ok(Wrapper { buckets }) => buckets,
            Err(error) => {
                relay_log::debug!(
                    error = &error as &dyn Error,
                    "failed to parse batched metrics",
                );
                metric!(counter(RelayCounters::MetricBucketsParsingFailed) += 1);
                return;
            }
        };

        for (project_key, buckets) in buckets {
            self.handle_process_metrics(
                cogs,
                ProcessMetrics {
                    data: MetricData::Parsed(buckets),
                    project_key,
                    source,
                    received_at,
                    sent_at,
                },
            )
        }
    }

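    /// Submits a processed envelope or processing output upstream.
    ///
    /// With processing enabled, data is sent to the store forwarder, or to the upload service
    /// for sampled envelopes with attachments. Otherwise, it is serialized, encoded, and sent
    /// to the upstream Relay over HTTP.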
    fn submit_upstream(&self, cogs: &mut Token, submit: Submit<'_>) {
        let _submit = cogs.start_category("submit");

        #[cfg(feature = "processing")]
        if self.inner.config.processing_enabled()
            && let Some(store_forwarder) = &self.inner.addrs.store_forwarder
        {
            use crate::processing::StoreHandle;

            let upload = self.inner.addrs.upload.as_ref();
            match submit {
                Submit::Envelope(envelope) => {
                    let envelope_has_attachments = envelope
                        .envelope()
                        .items()
                        .any(|item| *item.ty() == ItemType::Attachment);
                    // Gradual rollout of object-store uploads, controlled by a sample rate.
                    let use_objectstore = || {
                        let options = &self.inner.global_config.current().options;
                        utils::sample(options.objectstore_attachments_sample_rate).is_keep()
                    };

                    if let Some(upload) = &self.inner.addrs.upload
                        && envelope_has_attachments
                        && use_objectstore()
                    {
                        upload.send(StoreEnvelope { envelope })
                    } else {
                        store_forwarder.send(StoreEnvelope { envelope })
                    }
                }
                Submit::Output { output, ctx } => output
                    .forward_store(StoreHandle::new(store_forwarder, upload), ctx)
                    .unwrap_or_else(|err| err.into_inner()),
            }
            return;
        }

        let mut envelope = match submit {
            Submit::Envelope(envelope) => envelope,
            Submit::Output { output, ctx } => match output.serialize_envelope(ctx) {
                Ok(envelope) => ManagedEnvelope::from(envelope).into_processed(),
                Err(_) => {
                    relay_log::error!("failed to serialize output to an envelope");
                    return;
                }
            },
        };

        if envelope.envelope_mut().is_empty() {
            envelope.accept();
            return;
        }

        envelope.envelope_mut().set_sent_at(Utc::now());

        relay_log::trace!("sending envelope to sentry endpoint");
        let http_encoding = self.inner.config.http_encoding();
        let result = envelope.envelope().to_vec().and_then(|v| {
            encode_payload(&v.into(), http_encoding).map_err(EnvelopeError::PayloadIoFailed)
        });

        match result {
            Ok(body) => {
                self.inner
                    .addrs
                    .upstream_relay
                    .send(SendRequest(SendEnvelope {
                        envelope,
                        body,
                        http_encoding,
                        project_cache: self.inner.project_cache.clone(),
                    }));
            }
            Err(error) => {
                // Failing to serialize an envelope is an internal error, so the envelope is
                // rejected with an internal discard reason.
                relay_log::error!(
                    error = &error as &dyn Error,
                    tags.project_key = %envelope.scoping().project_key,
                    "failed to serialize envelope payload"
                );

                envelope.reject(Outcome::Invalid(DiscardReason::Internal));
            }
        }
    }

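    /// Serializes the given client reports into a new envelope and submits it upstream.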
    fn handle_submit_client_reports(&self, cogs: &mut Token, message: SubmitClientReports) {
        let SubmitClientReports {
            client_reports,
            scoping,
        } = message;

        let upstream = self.inner.config.upstream_descriptor();
        let dsn = PartialDsn::outbound(&scoping, upstream);

        let mut envelope = Envelope::from_request(None, RequestMeta::outbound(dsn));
        for client_report in client_reports {
            match client_report.serialize() {
                Ok(payload) => {
                    let mut item = Item::new(ItemType::ClientReport);
                    item.set_payload(ContentType::Json, payload);
                    envelope.add_item(item);
                }
                Err(error) => {
                    relay_log::error!(
                        error = &error as &dyn std::error::Error,
                        "failed to serialize client report"
                    );
                }
            }
        }

        let envelope = ManagedEnvelope::new(envelope, self.inner.addrs.outcome_aggregator.clone());
        self.submit_upstream(cogs, Submit::Envelope(envelope.into_processed()));
    }

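    /// Applies project state and cached rate limits to the given buckets.
    ///
    /// Buckets without a valid scoping are dropped entirely.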
    fn check_buckets(
        &self,
        project_key: ProjectKey,
        project_info: &ProjectInfo,
        rate_limits: &RateLimits,
        buckets: Vec<Bucket>,
    ) -> Vec<Bucket> {
        let Some(scoping) = project_info.scoping(project_key) else {
            relay_log::error!(
                tags.project_key = project_key.as_str(),
                "there is no scoping: dropping {} buckets",
                buckets.len(),
            );
            return Vec::new();
        };

        let mut buckets = self::metrics::apply_project_info(
            buckets,
            &self.inner.metric_outcomes,
            project_info,
            scoping,
        );

        let namespaces: BTreeSet<MetricNamespace> = buckets
            .iter()
            .filter_map(|bucket| bucket.name.try_namespace())
            .collect();

        for namespace in namespaces {
            let limits = rate_limits.check_with_quotas(
                project_info.get_quotas(),
                scoping.metric_bucket(namespace),
            );

            if limits.is_limited() {
                let rejected;
                (buckets, rejected) = utils::split_off(buckets, |bucket| {
                    bucket.name.try_namespace() == Some(namespace)
                });

                let reason_code = limits.longest().and_then(|limit| limit.reason_code.clone());
                self.inner.metric_outcomes.track(
                    scoping,
                    &rejected,
                    Outcome::RateLimited(reason_code),
                );
            }
        }

        let quotas = project_info.config.quotas.clone();
        match MetricsLimiter::create(buckets, quotas, scoping) {
            Ok(mut bucket_limiter) => {
                bucket_limiter.enforce_limits(rate_limits, &self.inner.metric_outcomes);
                bucket_limiter.into_buckets()
            }
            Err(buckets) => buckets,
        }
    }

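    /// Checks and applies consistent (Redis) rate limits to metric buckets, per namespace.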
    #[cfg(feature = "processing")]
    async fn rate_limit_buckets(
        &self,
        scoping: Scoping,
        project_info: &ProjectInfo,
        mut buckets: Vec<Bucket>,
    ) -> Vec<Bucket> {
        let Some(rate_limiter) = &self.inner.rate_limiter else {
            return buckets;
        };

        let global_config = self.inner.global_config.current();
        let namespaces = buckets
            .iter()
            .filter_map(|bucket| bucket.name.try_namespace())
            .counts();

        let quotas = CombinedQuotas::new(&global_config, project_info.get_quotas());

        for (namespace, quantity) in namespaces {
            let item_scoping = scoping.metric_bucket(namespace);

            let limits = match rate_limiter
                .is_rate_limited(quotas, item_scoping, quantity, false)
                .await
            {
                Ok(limits) => limits,
                Err(err) => {
                    relay_log::error!(
                        error = &err as &dyn std::error::Error,
                        "failed to check redis rate limits"
                    );
                    break;
                }
            };

            if limits.is_limited() {
                let rejected;
                (buckets, rejected) = utils::split_off(buckets, |bucket| {
                    bucket.name.try_namespace() == Some(namespace)
                });

                let reason_code = limits.longest().and_then(|limit| limit.reason_code.clone());
                self.inner.metric_outcomes.track(
                    scoping,
                    &rejected,
                    Outcome::RateLimited(reason_code),
                );

                self.inner
                    .project_cache
                    .get(item_scoping.scoping.project_key)
                    .rate_limits()
                    .merge(limits);
            }
        }

        match MetricsLimiter::create(buckets, project_info.config.quotas.clone(), scoping) {
            Err(buckets) => buckets,
            Ok(bucket_limiter) => self.apply_other_rate_limits(bucket_limiter).await,
        }
    }

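    /// Enforces transaction and span rate limits on the buckets held by the limiter and
    /// records how long each check took.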
    #[cfg(feature = "processing")]
    async fn apply_other_rate_limits(&self, mut bucket_limiter: MetricsLimiter) -> Vec<Bucket> {
        relay_log::trace!("handle_rate_limit_buckets");

        let scoping = *bucket_limiter.scoping();

        if let Some(rate_limiter) = self.inner.rate_limiter.as_ref() {
            let global_config = self.inner.global_config.current();
            let quotas = CombinedQuotas::new(&global_config, bucket_limiter.quotas());

            // We set over_accept_once such that the limit is actually reached, which allows
            // subsequent calls with quantity=0 to be rate limited.
            let over_accept_once = true;
            let mut rate_limits = RateLimits::new();

            for category in [DataCategory::Transaction, DataCategory::Span] {
                let count = bucket_limiter.count(category);

                let timer = Instant::now();
                let mut is_limited = false;

                if let Some(count) = count {
                    match rate_limiter
                        .is_rate_limited(quotas, scoping.item(category), count, over_accept_once)
                        .await
                    {
                        Ok(limits) => {
                            is_limited = limits.is_limited();
                            rate_limits.merge(limits)
                        }
                        Err(e) => {
                            relay_log::error!(
                                error = &e as &dyn Error,
                                "failed to check rate limits"
                            )
                        }
                    }
                }

                relay_statsd::metric!(
                    timer(RelayTimers::RateLimitBucketsDuration) = timer.elapsed(),
                    category = category.name(),
                    limited = if is_limited { "true" } else { "false" },
                    count = match count {
                        None => "none",
                        Some(0) => "0",
                        Some(1) => "1",
                        Some(2..=10) => "10",
                        Some(11..=25) => "25",
                        Some(26..=50) => "50",
                        Some(51..=100) => "100",
                        Some(101..=500) => "500",
                        _ => "> 500",
                    },
                );
            }

            if rate_limits.is_limited() {
                let was_enforced =
                    bucket_limiter.enforce_limits(&rate_limits, &self.inner.metric_outcomes);

                if was_enforced {
                    // Update the rate limits in the project cache.
                    self.inner
                        .project_cache
                        .get(scoping.project_key)
                        .rate_limits()
                        .merge(rate_limits);
                }
            }
        }

        bucket_limiter.into_buckets()
    }

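    /// Cardinality limits the passed buckets and returns the remaining buckets.
    ///
    /// Rejected buckets are tracked with a cardinality-limited outcome unless the limiter
    /// runs in passive mode.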
    #[cfg(feature = "processing")]
    async fn cardinality_limit_buckets(
        &self,
        scoping: Scoping,
        limits: &[CardinalityLimit],
        buckets: Vec<Bucket>,
    ) -> Vec<Bucket> {
        let global_config = self.inner.global_config.current();
        let cardinality_limiter_mode = global_config.options.cardinality_limiter_mode;

        if matches!(cardinality_limiter_mode, CardinalityLimiterMode::Disabled) {
            return buckets;
        }

        let Some(ref limiter) = self.inner.cardinality_limiter else {
            return buckets;
        };

        let scope = relay_cardinality::Scoping {
            organization_id: scoping.organization_id,
            project_id: scoping.project_id,
        };

        let limits = match limiter
            .check_cardinality_limits(scope, limits, buckets)
            .await
        {
            Ok(limits) => limits,
            Err((buckets, error)) => {
                relay_log::error!(
                    error = &error as &dyn std::error::Error,
                    "cardinality limiter failed"
                );
                return buckets;
            }
        };

        let error_sample_rate = global_config.options.cardinality_limiter_error_sample_rate;
        if !limits.exceeded_limits().is_empty() && utils::sample(error_sample_rate).is_keep() {
            for limit in limits.exceeded_limits() {
                relay_log::with_scope(
                    |scope| {
                        // Set the organization as user so alerts can be aggregated per org.
                        scope.set_user(Some(relay_log::sentry::User {
                            id: Some(scoping.organization_id.to_string()),
                            ..Default::default()
                        }));
                    },
                    || {
                        relay_log::error!(
                            tags.organization_id = scoping.organization_id.value(),
                            tags.limit_id = limit.id,
                            tags.passive = limit.passive,
                            "Cardinality Limit"
                        );
                    },
                );
            }
        }

        for (limit, reports) in limits.cardinality_reports() {
            for report in reports {
                self.inner
                    .metric_outcomes
                    .cardinality(scoping, limit, report);
            }
        }

        if matches!(cardinality_limiter_mode, CardinalityLimiterMode::Passive) {
            return limits.into_source();
        }

        let CardinalityLimitsSplit { accepted, rejected } = limits.into_split();

        for (bucket, exceeded) in rejected {
            self.inner.metric_outcomes.track(
                scoping,
                &[bucket],
                Outcome::CardinalityLimited(exceeded.id.clone()),
            );
        }
        accepted
    }

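    /// Processes metric buckets for storage: applies rate limits and cardinality limits, then
    /// forwards the remaining buckets to the store with the project's retention.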
    #[cfg(feature = "processing")]
    async fn encode_metrics_processing(
        &self,
        message: FlushBuckets,
        store_forwarder: &Addr<Store>,
    ) {
        use crate::constants::DEFAULT_EVENT_RETENTION;
        use crate::services::store::StoreMetrics;

        for ProjectBuckets {
            buckets,
            scoping,
            project_info,
            ..
        } in message.buckets.into_values()
        {
            let buckets = self
                .rate_limit_buckets(scoping, &project_info, buckets)
                .await;

            let limits = project_info.get_cardinality_limits();
            let buckets = self
                .cardinality_limit_buckets(scoping, limits, buckets)
                .await;

            if buckets.is_empty() {
                continue;
            }

            let retention = project_info
                .config
                .event_retention
                .unwrap_or(DEFAULT_EVENT_RETENTION);

            store_forwarder.send(StoreMetrics {
                buckets,
                scoping,
                retention,
            });
        }
    }

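    /// Serializes metric buckets into envelope items, batched by size, and submits them
    /// upstream, one envelope per batch and project.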
    fn encode_metrics_envelope(&self, cogs: &mut Token, message: FlushBuckets) {
        let FlushBuckets {
            partition_key,
            buckets,
        } = message;

        let batch_size = self.inner.config.metrics_max_batch_size_bytes();
        let upstream = self.inner.config.upstream_descriptor();

        for ProjectBuckets {
            buckets, scoping, ..
        } in buckets.values()
        {
            let dsn = PartialDsn::outbound(scoping, upstream);

            relay_statsd::metric!(
                distribution(RelayDistributions::PartitionKeys) = u64::from(partition_key)
            );

            let mut num_batches = 0;
            for batch in BucketsView::from(buckets).by_size(batch_size) {
                let mut envelope = Envelope::from_request(None, RequestMeta::outbound(dsn.clone()));

                let mut item = Item::new(ItemType::MetricBuckets);
                item.set_source_quantities(crate::metrics::extract_quantities(batch));
                item.set_payload(ContentType::Json, serde_json::to_vec(&batch).unwrap());
                envelope.add_item(item);

                let mut envelope =
                    ManagedEnvelope::new(envelope, self.inner.addrs.outcome_aggregator.clone());
                envelope
                    .set_partition_key(Some(partition_key))
                    .scope(*scoping);

                relay_statsd::metric!(
                    distribution(RelayDistributions::BucketsPerBatch) = batch.len() as u64
                );

                self.submit_upstream(cogs, Submit::Envelope(envelope.into_processed()));
                num_batches += 1;
            }

            relay_statsd::metric!(
                distribution(RelayDistributions::BatchesPerPartition) = num_batches
            );
        }
    }

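    /// Encodes and sends a single partition of buckets to the global metrics endpoint, doing
    /// nothing if the partition is empty.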
    fn send_global_partition(&self, partition_key: u32, partition: &mut Partition<'_>) {
        if partition.is_empty() {
            return;
        }

        let (unencoded, project_info) = partition.take();
        let http_encoding = self.inner.config.http_encoding();
        let encoded = match encode_payload(&unencoded, http_encoding) {
            Ok(payload) => payload,
            Err(error) => {
                let error = &error as &dyn std::error::Error;
                relay_log::error!(error, "failed to encode metrics payload");
                return;
            }
        };

        let request = SendMetricsRequest {
            partition_key: partition_key.to_string(),
            unencoded,
            encoded,
            project_info,
            http_encoding,
            metric_outcomes: self.inner.metric_outcomes.clone(),
        };

        self.inner.addrs.upstream_relay.send(SendRequest(request));
    }

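    /// Batches all projects' buckets into fixed-size partitions and sends each partition as a
    /// single request to the global metrics endpoint, splitting oversized buckets as needed.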
    fn encode_metrics_global(&self, message: FlushBuckets) {
        let FlushBuckets {
            partition_key,
            buckets,
        } = message;

        let batch_size = self.inner.config.metrics_max_batch_size_bytes();
        let mut partition = Partition::new(batch_size);
        let mut partition_splits = 0;

        for ProjectBuckets {
            buckets, scoping, ..
        } in buckets.values()
        {
            for bucket in buckets {
                let mut remaining = Some(BucketView::new(bucket));

                while let Some(bucket) = remaining.take() {
                    if let Some(next) = partition.insert(bucket, *scoping) {
                        // A part of the bucket could not be inserted. Take the partition and
                        // submit it immediately. Repeat until the final part was inserted.
                        self.send_global_partition(partition_key, &mut partition);
                        remaining = Some(next);
                        partition_splits += 1;
                    }
                }
            }
        }

        if partition_splits > 0 {
            metric!(distribution(RelayDistributions::PartitionSplits) = partition_splits);
        }

        self.send_global_partition(partition_key, &mut partition);
    }

    async fn handle_flush_buckets(&self, cogs: &mut Token, mut message: FlushBuckets) {
        for (project_key, pb) in message.buckets.iter_mut() {
            let buckets = std::mem::take(&mut pb.buckets);
            pb.buckets =
                self.check_buckets(*project_key, &pb.project_info, &pb.rate_limits, buckets);
        }

        #[cfg(feature = "processing")]
        if self.inner.config.processing_enabled()
            && let Some(ref store_forwarder) = self.inner.addrs.store_forwarder
        {
            return self
                .encode_metrics_processing(message, store_forwarder)
                .await;
        }

        if self.inner.config.http_global_metrics() {
            self.encode_metrics_global(message)
        } else {
            self.encode_metrics_envelope(cogs, message)
        }
    }

    #[cfg(all(test, feature = "processing"))]
    fn redis_rate_limiter_enabled(&self) -> bool {
        self.inner.rate_limiter.is_some()
    }

    async fn handle_message(&self, message: EnvelopeProcessor) {
        let ty = message.variant();
        let feature_weights = self.feature_weights(&message);

        metric!(timer(RelayTimers::ProcessMessageDuration), message = ty, {
            let mut cogs = self.inner.cogs.timed(ResourceId::Relay, feature_weights);

            match message {
                EnvelopeProcessor::ProcessEnvelope(m) => {
                    self.handle_process_envelope(&mut cogs, *m).await
                }
                EnvelopeProcessor::ProcessProjectMetrics(m) => {
                    self.handle_process_metrics(&mut cogs, *m)
                }
                EnvelopeProcessor::ProcessBatchedMetrics(m) => {
                    self.handle_process_batched_metrics(&mut cogs, *m)
                }
                EnvelopeProcessor::FlushBuckets(m) => {
                    self.handle_flush_buckets(&mut cogs, *m).await
                }
                EnvelopeProcessor::SubmitClientReports(m) => {
                    self.handle_submit_client_reports(&mut cogs, *m)
                }
            }
        });
    }

    fn feature_weights(&self, message: &EnvelopeProcessor) -> FeatureWeights {
        match message {
            EnvelopeProcessor::ProcessEnvelope(_) => AppFeature::Unattributed.into(),
            EnvelopeProcessor::ProcessProjectMetrics(_) => AppFeature::Unattributed.into(),
            EnvelopeProcessor::ProcessBatchedMetrics(_) => AppFeature::Unattributed.into(),
            EnvelopeProcessor::FlushBuckets(v) => v
                .buckets
                .values()
                .map(|s| {
                    if self.inner.config.processing_enabled() {
                        // Processing does not encode the metrics but rate and cardinality
                        // limits them, which scales by count and not by size.
                        relay_metrics::cogs::ByCount(&s.buckets).into()
                    } else {
                        relay_metrics::cogs::BySize(&s.buckets).into()
                    }
                })
                .fold(FeatureWeights::none(), FeatureWeights::merge),
            EnvelopeProcessor::SubmitClientReports(_) => AppFeature::ClientReports.into(),
        }
    }
}

impl Service for EnvelopeProcessorService {
    type Interface = EnvelopeProcessor;

    async fn run(self, mut rx: relay_system::Receiver<Self::Interface>) {
        while let Some(message) = rx.recv().await {
            let service = self.clone();
            self.inner
                .pool
                .spawn_async(
                    async move {
                        service.handle_message(message).await;
                    }
                    .boxed(),
                )
                .await;
        }
    }
}

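/// Result of enforcing rate limits on an event and its envelope.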
struct EnforcementResult {
    event: Annotated<Event>,
    #[cfg_attr(not(feature = "processing"), expect(dead_code))]
    rate_limits: RateLimits,
}

impl EnforcementResult {
    /// Creates a new [`EnforcementResult`].
    pub fn new(event: Annotated<Event>, rate_limits: RateLimits) -> Self {
        Self { event, rate_limits }
    }
}

#[derive(Clone)]
enum RateLimiter {
    Cached,
    #[cfg(feature = "processing")]
    Consistent(Arc<RedisRateLimiter<GlobalRateLimitsServiceHandle>>),
}

impl RateLimiter {
    async fn enforce<Group>(
        &self,
        managed_envelope: &mut TypedEnvelope<Group>,
        event: Annotated<Event>,
        _extracted_metrics: &mut ProcessingExtractedMetrics,
        ctx: processing::Context<'_>,
    ) -> Result<EnforcementResult, ProcessingError> {
        if managed_envelope.envelope().is_empty() && event.value().is_none() {
            return Ok(EnforcementResult::new(event, RateLimits::default()));
        }

        let quotas = CombinedQuotas::new(ctx.global_config, ctx.project_info.get_quotas());
        if quotas.is_empty() {
            return Ok(EnforcementResult::new(event, RateLimits::default()));
        }

        let event_category = event_category(&event);

        // The cached rate limiter checks against the project's cached rate limits, while the
        // consistent variant queries Redis for every item scope.
        let this = self.clone();
        let mut envelope_limiter =
            EnvelopeLimiter::new(CheckLimits::All, move |item_scope, _quantity| {
                let this = this.clone();

                async move {
                    match this {
                        #[cfg(feature = "processing")]
                        RateLimiter::Consistent(rate_limiter) => Ok::<_, ProcessingError>(
                            rate_limiter
                                .is_rate_limited(quotas, item_scope, _quantity, false)
                                .await?,
                        ),
                        _ => Ok::<_, ProcessingError>(
                            ctx.rate_limits.check_with_quotas(quotas, item_scope),
                        ),
                    }
                }
            });

        // Tell the envelope limiter about the event, since it has already been removed from the
        // envelope at this stage.
        if let Some(category) = event_category {
            envelope_limiter.assume_event(category);
        }

        let scoping = managed_envelope.scoping();
        let (enforcement, rate_limits) =
            metric!(timer(RelayTimers::EventProcessingRateLimiting), type = self.name(), {
                envelope_limiter
                    .compute(managed_envelope.envelope_mut(), &scoping)
                    .await
            })?;
        let event_active = enforcement.is_event_active();

        // Use the same enforcement for the extracted metrics to keep envelope items and
        // metrics consistently limited.
        #[cfg(feature = "processing")]
        _extracted_metrics.apply_enforcement(&enforcement, matches!(self, Self::Consistent(_)));
        enforcement.apply_with_outcomes(managed_envelope);

        if event_active {
            debug_assert!(managed_envelope.envelope().is_empty());
            return Ok(EnforcementResult::new(Annotated::empty(), rate_limits));
        }

        Ok(EnforcementResult::new(event, rate_limits))
    }

    fn name(&self) -> &'static str {
        match self {
            Self::Cached => "cached",
            #[cfg(feature = "processing")]
            Self::Consistent(_) => "consistent",
        }
    }
}

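/// Encodes the payload with the requested HTTP `Content-Encoding`, returning the body
/// unchanged for [`HttpEncoding::Identity`].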
pub fn encode_payload(body: &Bytes, http_encoding: HttpEncoding) -> Result<Bytes, std::io::Error> {
    let envelope_body: Vec<u8> = match http_encoding {
        HttpEncoding::Identity => return Ok(body.clone()),
        HttpEncoding::Deflate => {
            let mut encoder = ZlibEncoder::new(Vec::new(), Compression::default());
            encoder.write_all(body.as_ref())?;
            encoder.finish()?
        }
        HttpEncoding::Gzip => {
            let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
            encoder.write_all(body.as_ref())?;
            encoder.finish()?
        }
        HttpEncoding::Br => {
            // A buffer size of 0 selects the encoder's default; quality 5 and lgwin 22.
            let mut encoder = BrotliEncoder::new(Vec::new(), 0, 5, 22);
            encoder.write_all(body.as_ref())?;
            encoder.into_inner()
        }
        HttpEncoding::Zstd => {
            // Compression level 1 favors speed over compression ratio.
            let mut encoder = ZstdEncoder::new(Vec::new(), 1)?;
            encoder.write_all(body.as_ref())?;
            encoder.finish()?
        }
    };

    Ok(envelope_body.into())
}

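/// An upstream request that submits an envelope via HTTP.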
3133#[derive(Debug)]
3135pub struct SendEnvelope {
3136 pub envelope: TypedEnvelope<Processed>,
3137 pub body: Bytes,
3138 pub http_encoding: HttpEncoding,
3139 pub project_cache: ProjectCacheHandle,
3140}
3141
3142impl UpstreamRequest for SendEnvelope {
3143 fn method(&self) -> reqwest::Method {
3144 reqwest::Method::POST
3145 }
3146
3147 fn path(&self) -> Cow<'_, str> {
3148 format!("/api/{}/envelope/", self.envelope.scoping().project_id).into()
3149 }
3150
3151 fn route(&self) -> &'static str {
3152 "envelope"
3153 }
3154
3155 fn build(&mut self, builder: &mut http::RequestBuilder) -> Result<(), http::HttpError> {
3156 let envelope_body = self.body.clone();
3157 metric!(
3158 distribution(RelayDistributions::UpstreamEnvelopeBodySize) = envelope_body.len() as u64
3159 );
3160
3161 let meta = &self.envelope.meta();
3162 let shard = self.envelope.partition_key().map(|p| p.to_string());
3163 builder
3164 .content_encoding(self.http_encoding)
3165 .header_opt("Origin", meta.origin().map(|url| url.as_str()))
3166 .header_opt("User-Agent", meta.user_agent())
3167 .header("X-Sentry-Auth", meta.auth_header())
3168 .header("X-Forwarded-For", meta.forwarded_for())
3169 .header("Content-Type", envelope::CONTENT_TYPE)
3170 .header_opt("X-Sentry-Relay-Shard", shard)
3171 .body(envelope_body);
3172
3173 Ok(())
3174 }
3175
3176 fn sign(&mut self) -> Option<Sign> {
3177 Some(Sign::Optional(SignatureType::RequestSign))
3178 }
3179
3180 fn respond(
3181 self: Box<Self>,
3182 result: Result<http::Response, UpstreamRequestError>,
3183 ) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> {
3184 Box::pin(async move {
3185 let result = match result {
3186 Ok(mut response) => response.consume().await.map_err(UpstreamRequestError::Http),
3187 Err(error) => Err(error),
3188 };
3189
3190 match result {
3191 Ok(()) => self.envelope.accept(),
3192 Err(error) if error.is_received() => {
3193 let scoping = self.envelope.scoping();
3194 self.envelope.accept();
3195
3196 if let UpstreamRequestError::RateLimited(limits) = error {
3197 self.project_cache
3198 .get(scoping.project_key)
3199 .rate_limits()
3200 .merge(limits.scope(&scoping));
3201 }
3202 }
3203 Err(error) => {
3204 let mut envelope = self.envelope;
3207 envelope.reject(Outcome::Invalid(DiscardReason::Internal));
3208 relay_log::error!(
3209 error = &error as &dyn Error,
3210 tags.project_key = %envelope.scoping().project_key,
3211 "error sending envelope"
3212 );
3213 }
3214 }
3215 })
3216 }
3217}
3218
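/// A container for metric buckets from multiple projects.
///
/// It keeps the total serialized size of all contained buckets within a configured maximum.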
3219#[derive(Debug)]
3226struct Partition<'a> {
3227 max_size: usize,
3228 remaining: usize,
3229 views: HashMap<ProjectKey, Vec<BucketView<'a>>>,
3230 project_info: HashMap<ProjectKey, Scoping>,
3231}
3232
3233impl<'a> Partition<'a> {
3234 pub fn new(size: usize) -> Self {
3236 Self {
3237 max_size: size,
3238 remaining: size,
3239 views: HashMap::new(),
3240 project_info: HashMap::new(),
3241 }
3242 }
3243
3244 pub fn insert(&mut self, bucket: BucketView<'a>, scoping: Scoping) -> Option<BucketView<'a>> {
3255 let (current, next) = bucket.split(self.remaining, Some(self.max_size));
3256
3257 if let Some(current) = current {
3258 self.remaining = self.remaining.saturating_sub(current.estimated_size());
3259 self.views
3260 .entry(scoping.project_key)
3261 .or_default()
3262 .push(current);
3263
3264 self.project_info
3265 .entry(scoping.project_key)
3266 .or_insert(scoping);
3267 }
3268
3269 next
3270 }
3271
3272 fn is_empty(&self) -> bool {
3274 self.views.is_empty()
3275 }
3276
3277 fn take(&mut self) -> (Bytes, HashMap<ProjectKey, Scoping>) {
3281 #[derive(serde::Serialize)]
3282 struct Wrapper<'a> {
3283 buckets: &'a HashMap<ProjectKey, Vec<BucketView<'a>>>,
3284 }
3285
3286 let buckets = &self.views;
3287 let payload = serde_json::to_vec(&Wrapper { buckets }).unwrap().into();
3288
3289 let scopings = self.project_info.clone();
3290 self.project_info.clear();
3291
3292 self.views.clear();
3293 self.remaining = self.max_size;
3294
3295 (payload, scopings)
3296 }
3297}
3298
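/// An upstream request that sends a partition of metric buckets spanning multiple projects.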
3299#[derive(Debug)]
3303struct SendMetricsRequest {
3304 partition_key: String,
3306 unencoded: Bytes,
3308 encoded: Bytes,
3310 project_info: HashMap<ProjectKey, Scoping>,
3314 http_encoding: HttpEncoding,
3316 metric_outcomes: MetricOutcomes,
3318}
3319
3320impl SendMetricsRequest {
3321 fn create_error_outcomes(self) {
3322 #[derive(serde::Deserialize)]
3323 struct Wrapper {
3324 buckets: HashMap<ProjectKey, Vec<MinimalTrackableBucket>>,
3325 }
3326
3327 let buckets = match serde_json::from_slice(&self.unencoded) {
3328 Ok(Wrapper { buckets }) => buckets,
3329 Err(err) => {
3330 relay_log::error!(
3331 error = &err as &dyn std::error::Error,
3332 "failed to parse buckets from failed transmission"
3333 );
3334 return;
3335 }
3336 };
3337
3338 for (key, buckets) in buckets {
3339 let Some(&scoping) = self.project_info.get(&key) else {
3340 relay_log::error!("missing scoping for project key");
3341 continue;
3342 };
3343
3344 self.metric_outcomes.track(
3345 scoping,
3346 &buckets,
3347 Outcome::Invalid(DiscardReason::Internal),
3348 );
3349 }
3350 }
3351}
3352
3353impl UpstreamRequest for SendMetricsRequest {
3354 fn set_relay_id(&self) -> bool {
3355 true
3356 }
3357
3358 fn sign(&mut self) -> Option<Sign> {
3359 Some(Sign::Required(SignatureType::Body(self.unencoded.clone())))
3360 }
3361
3362 fn method(&self) -> reqwest::Method {
3363 reqwest::Method::POST
3364 }
3365
3366 fn path(&self) -> Cow<'_, str> {
3367 "/api/0/relays/metrics/".into()
3368 }
3369
3370 fn route(&self) -> &'static str {
3371 "global_metrics"
3372 }
3373
3374 fn build(&mut self, builder: &mut http::RequestBuilder) -> Result<(), http::HttpError> {
3375 metric!(
3376 distribution(RelayDistributions::UpstreamMetricsBodySize) = self.encoded.len() as u64
3377 );
3378
3379 builder
3380 .content_encoding(self.http_encoding)
3381 .header("X-Sentry-Relay-Shard", self.partition_key.as_bytes())
3382 .header(header::CONTENT_TYPE, b"application/json")
3383 .body(self.encoded.clone());
3384
3385 Ok(())
3386 }
3387
3388 fn respond(
3389 self: Box<Self>,
3390 result: Result<http::Response, UpstreamRequestError>,
3391 ) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> {
3392 Box::pin(async {
3393 match result {
3394 Ok(mut response) => {
3395 response.consume().await.ok();
3396 }
3397 Err(error) => {
3398 relay_log::error!(error = &error as &dyn Error, "Failed to send metrics batch");
3399
3400 if error.is_received() {
3403 return;
3404 }
3405
3406 self.create_error_outcomes()
3407 }
3408 }
3409 })
3410 }
3411}
3412
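/// Container for global and project level [`Quota`].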
3413#[derive(Copy, Clone, Debug)]
3415struct CombinedQuotas<'a> {
3416 global_quotas: &'a [Quota],
3417 project_quotas: &'a [Quota],
3418}
3419
3420impl<'a> CombinedQuotas<'a> {
3421 pub fn new(global_config: &'a GlobalConfig, project_quotas: &'a [Quota]) -> Self {
3423 Self {
3424 global_quotas: &global_config.quotas,
3425 project_quotas,
3426 }
3427 }
3428
3429 pub fn is_empty(&self) -> bool {
3431 self.len() == 0
3432 }
3433
3434 pub fn len(&self) -> usize {
3436 self.global_quotas.len() + self.project_quotas.len()
3437 }
3438}
3439
3440impl<'a> IntoIterator for CombinedQuotas<'a> {
3441 type Item = &'a Quota;
3442 type IntoIter = std::iter::Chain<std::slice::Iter<'a, Quota>, std::slice::Iter<'a, Quota>>;
3443
3444 fn into_iter(self) -> Self::IntoIter {
3445 self.global_quotas.iter().chain(self.project_quotas.iter())
3446 }
3447}
3448
3449#[cfg(test)]
3450mod tests {
3451 use std::collections::BTreeMap;
3452
3453 use insta::assert_debug_snapshot;
3454 use relay_base_schema::metrics::{DurationUnit, MetricUnit};
3455 use relay_common::glob2::LazyGlob;
3456 use relay_dynamic_config::ProjectConfig;
3457 use relay_event_normalization::{
3458 MeasurementsConfig, NormalizationConfig, RedactionRule, TransactionNameConfig,
3459 TransactionNameRule,
3460 };
3461 use relay_event_schema::protocol::TransactionSource;
3462 use relay_pii::DataScrubbingConfig;
3463 use similar_asserts::assert_eq;
3464
3465 use crate::metrics_extraction::IntoMetric;
3466 use crate::metrics_extraction::transactions::types::{
3467 CommonTags, TransactionMeasurementTags, TransactionMetric,
3468 };
3469 use crate::testutils::{create_test_processor, create_test_processor_with_addrs};
3470
3471 #[cfg(feature = "processing")]
3472 use {
3473 relay_metrics::BucketValue,
3474 relay_quotas::{QuotaScope, ReasonCode},
3475 relay_test::mock_service,
3476 };
3477
3478 use super::*;
3479
3480 #[cfg(feature = "processing")]
3481 fn mock_quota(id: &str) -> Quota {
3482 Quota {
3483 id: Some(id.into()),
3484 categories: [DataCategory::MetricBucket].into(),
3485 scope: QuotaScope::Organization,
3486 scope_id: None,
3487 limit: Some(0),
3488 window: None,
3489 reason_code: None,
3490 namespace: None,
3491 }
3492 }
3493
3494 #[cfg(feature = "processing")]
3495 #[test]
3496 fn test_dynamic_quotas() {
3497 let global_config = GlobalConfig {
3498 quotas: vec![mock_quota("foo"), mock_quota("bar")],
3499 ..Default::default()
3500 };
3501
3502 let project_quotas = vec![mock_quota("baz"), mock_quota("qux")];
3503
3504 let dynamic_quotas = CombinedQuotas::new(&global_config, &project_quotas);
3505
3506 assert_eq!(dynamic_quotas.len(), 4);
3507 assert!(!dynamic_quotas.is_empty());
3508
3509 let quota_ids = dynamic_quotas.into_iter().filter_map(|q| q.id.as_deref());
3510 assert!(quota_ids.eq(["foo", "bar", "baz", "qux"]));
3511 }
3512
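    /// Ensures that a zero-limit quota for one organization rate limits only that
    /// organization's buckets within the same flush batch.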
    #[cfg(feature = "processing")]
    #[tokio::test]
    async fn test_ratelimit_per_batch() {
        use relay_base_schema::organization::OrganizationId;
        use relay_protocol::FiniteF64;

        let rate_limited_org = Scoping {
            organization_id: OrganizationId::new(1),
            project_id: ProjectId::new(21),
            project_key: ProjectKey::parse("00000000000000000000000000000000").unwrap(),
            key_id: Some(17),
        };

        let not_rate_limited_org = Scoping {
            organization_id: OrganizationId::new(2),
            project_id: ProjectId::new(21),
            project_key: ProjectKey::parse("11111111111111111111111111111111").unwrap(),
            key_id: Some(17),
        };

        let message = {
            let project_info = {
                let quota = Quota {
                    id: Some("testing".into()),
                    categories: [DataCategory::MetricBucket].into(),
                    scope: relay_quotas::QuotaScope::Organization,
                    scope_id: Some(rate_limited_org.organization_id.to_string().into()),
                    limit: Some(0),
                    window: None,
                    reason_code: Some(ReasonCode::new("test")),
                    namespace: None,
                };

                let mut config = ProjectConfig::default();
                config.quotas.push(quota);

                Arc::new(ProjectInfo {
                    config,
                    ..Default::default()
                })
            };

            let project_metrics = |scoping| ProjectBuckets {
                buckets: vec![Bucket {
                    name: "d:transactions/bar".into(),
                    value: BucketValue::Counter(FiniteF64::new(1.0).unwrap()),
                    timestamp: UnixTimestamp::now(),
                    tags: Default::default(),
                    width: 10,
                    metadata: BucketMetadata::default(),
                }],
                rate_limits: Default::default(),
                project_info: project_info.clone(),
                scoping,
            };

            let buckets = hashbrown::HashMap::from([
                (
                    rate_limited_org.project_key,
                    project_metrics(rate_limited_org),
                ),
                (
                    not_rate_limited_org.project_key,
                    project_metrics(not_rate_limited_org),
                ),
            ]);

            FlushBuckets {
                partition_key: 0,
                buckets,
            }
        };

        assert_eq!(message.buckets.keys().count(), 2);

        let config = {
            let config_json = serde_json::json!({
                "processing": {
                    "enabled": true,
                    "kafka_config": [],
                    "redis": {
                        "server": std::env::var("RELAY_REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_owned()),
                    }
                }
            });
            Config::from_json_value(config_json).unwrap()
        };

        let (store, handle) = {
            let f = |org_ids: &mut Vec<OrganizationId>, msg: Store| {
                let org_id = match msg {
                    Store::Metrics(x) => x.scoping.organization_id,
                    _ => panic!("received envelope when expecting only metrics"),
                };
                org_ids.push(org_id);
            };

            mock_service("store_forwarder", vec![], f)
        };

        let processor = create_test_processor(config).await;
        assert!(processor.redis_rate_limiter_enabled());

        processor.encode_metrics_processing(message, &store).await;

        drop(store);
        let orgs_not_ratelimited = handle.await.unwrap();

        assert_eq!(
            orgs_not_ratelimited,
            vec![not_rate_limited_org.organization_id]
        );
    }

    #[tokio::test]
    async fn test_browser_version_extraction_with_pii_like_data() {
        let processor = create_test_processor(Default::default()).await;
        let outcome_aggregator = Addr::dummy();
        let event_id = EventId::new();

        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
            .parse()
            .unwrap();

        let request_meta = RequestMeta::new(dsn);
        let mut envelope = Envelope::from_request(Some(event_id), request_meta);

        envelope.add_item({
            let mut item = Item::new(ItemType::Event);
            item.set_payload(
                ContentType::Json,
                r#"
                {
                    "request": {
                        "headers": [
                            ["User-Agent", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36"]
                        ]
                    }
                }
                "#,
            );
            item
        });

        let mut datascrubbing_settings = DataScrubbingConfig::default();
        // Make sure to mask any IP-like looking data.
        datascrubbing_settings.scrub_data = true;
        datascrubbing_settings.scrub_defaults = true;
        datascrubbing_settings.scrub_ip_addresses = true;

        let pii_config = serde_json::from_str(r#"{"applications": {"**": ["@ip:mask"]}}"#).unwrap();

        let config = ProjectConfig {
            datascrubbing_settings,
            pii_config: Some(pii_config),
            ..Default::default()
        };

        let project_info = ProjectInfo {
            config,
            ..Default::default()
        };

        let mut envelopes = ProcessingGroup::split_envelope(*envelope, &Default::default());
        assert_eq!(envelopes.len(), 1);

        let (group, envelope) = envelopes.pop().unwrap();
        let envelope = ManagedEnvelope::new(envelope, outcome_aggregator);

        let message = ProcessEnvelopeGrouped {
            group,
            envelope,
            ctx: processing::Context {
                project_info: &project_info,
                ..processing::Context::for_test()
            },
        };

        let Ok(Some(Submit::Envelope(mut new_envelope))) =
            processor.process(&mut Token::noop(), message).await
        else {
            panic!();
        };
        let new_envelope = new_envelope.envelope_mut();

        let event_item = new_envelope.items().last().unwrap();
        let annotated_event: Annotated<Event> =
            Annotated::from_json_bytes(&event_item.payload()).unwrap();
        let event = annotated_event.into_value().unwrap();
        let headers = event
            .request
            .into_value()
            .unwrap()
            .headers
            .into_value()
            .unwrap();

        // IP-like data in the user agent header must be masked.
        assert_eq!(
            Some(
                "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/********* Safari/537.36"
            ),
            headers.get_header("User-Agent")
        );
        // But the browser version must still be extracted into the contexts.
        let contexts = event.contexts.into_value().unwrap();
        let browser = contexts.0.get("browser").unwrap();
        assert_eq!(
            r#"{"browser":"Chrome 103.0.0","name":"Chrome","version":"103.0.0","type":"browser"}"#,
            browser.to_json().unwrap()
        );
    }

    #[tokio::test]
    #[cfg(feature = "processing")]
    async fn test_materialize_dsc() {
        use crate::services::projects::project::PublicKeyConfig;

        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
            .parse()
            .unwrap();
        let request_meta = RequestMeta::new(dsn);
        let mut envelope = Envelope::from_request(None, request_meta);

        let dsc = r#"{
            "trace_id": "00000000-0000-0000-0000-000000000000",
            "public_key": "e12d836b15bb49d7bbf99e64295d995b",
            "sample_rate": "0.2"
        }"#;
        envelope.set_dsc(serde_json::from_str(dsc).unwrap());

        let mut item = Item::new(ItemType::Event);
        item.set_payload(ContentType::Json, r#"{}"#);
        envelope.add_item(item);

        let outcome_aggregator = Addr::dummy();
        let managed_envelope = ManagedEnvelope::new(envelope, outcome_aggregator);

        let mut project_info = ProjectInfo::default();
        project_info.public_keys.push(PublicKeyConfig {
            public_key: ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(),
            numeric_id: Some(1),
        });

        let config = serde_json::json!({
            "processing": {
                "enabled": true,
                "kafka_config": [],
            }
        });

        let message = ProcessEnvelopeGrouped {
            group: ProcessingGroup::Transaction,
            envelope: managed_envelope,
            ctx: processing::Context {
                config: &Config::from_json_value(config.clone()).unwrap(),
                project_info: &project_info,
                sampling_project_info: Some(&project_info),
                ..processing::Context::for_test()
            },
        };

        let processor = create_test_processor(Config::from_json_value(config).unwrap()).await;
        let Ok(Some(Submit::Envelope(envelope))) =
            processor.process(&mut Token::noop(), message).await
        else {
            panic!();
        };
        let event = envelope
            .envelope()
            .get_item_by(|item| item.ty() == &ItemType::Event)
            .unwrap();

        let event = Annotated::<Event>::from_json_bytes(&event.payload()).unwrap();
        insta::assert_debug_snapshot!(event.value().unwrap()._dsc, @r###"
        Object(
            {
                "environment": ~,
                "public_key": String(
                    "e12d836b15bb49d7bbf99e64295d995b",
                ),
                "release": ~,
                "replay_id": ~,
                "sample_rate": String(
                    "0.2",
                ),
                "trace_id": String(
                    "00000000000000000000000000000000",
                ),
                "transaction": ~,
            },
        )
        "###);
    }

    fn capture_test_event(transaction_name: &str, source: TransactionSource) -> Vec<String> {
        let mut event = Annotated::<Event>::from_json(
            r#"
            {
                "type": "transaction",
                "transaction": "/foo/",
                "timestamp": 946684810.0,
                "start_timestamp": 946684800.0,
                "contexts": {
                    "trace": {
                        "trace_id": "4c79f60c11214eb38604f4ae0781bfb2",
                        "span_id": "fa90fdead5f74053",
                        "op": "http.server",
                        "type": "trace"
                    }
                },
                "transaction_info": {
                    "source": "url"
                }
            }
            "#,
        )
        .unwrap();
        let e = event.value_mut().as_mut().unwrap();
        e.transaction.set_value(Some(transaction_name.into()));

        e.transaction_info
            .value_mut()
            .as_mut()
            .unwrap()
            .source
            .set_value(Some(source));

        relay_statsd::with_capturing_test_client(|| {
            utils::log_transaction_name_metrics(&mut event, |event| {
                let config = NormalizationConfig {
                    transaction_name_config: TransactionNameConfig {
                        rules: &[TransactionNameRule {
                            pattern: LazyGlob::new("/foo/*/**".to_owned()),
                            expiry: DateTime::<Utc>::MAX_UTC,
                            redaction: RedactionRule::Replace {
                                substitution: "*".to_owned(),
                            },
                        }],
                    },
                    ..Default::default()
                };
                relay_event_normalization::normalize_event(event, &config)
            });
        })
    }

    #[test]
    fn test_log_transaction_metrics_none() {
        let captures = capture_test_event("/nothing", TransactionSource::Url);
        insta::assert_debug_snapshot!(captures, @r###"
        [
            "event.transaction_name_changes:1|c|#source_in:url,changes:none,source_out:sanitized,is_404:false",
        ]
        "###);
    }

    #[test]
    fn test_log_transaction_metrics_rule() {
        let captures = capture_test_event("/foo/john/denver", TransactionSource::Url);
        insta::assert_debug_snapshot!(captures, @r###"
        [
            "event.transaction_name_changes:1|c|#source_in:url,changes:rule,source_out:sanitized,is_404:false",
        ]
        "###);
    }

    #[test]
    fn test_log_transaction_metrics_pattern() {
        let captures = capture_test_event("/something/12345", TransactionSource::Url);
        insta::assert_debug_snapshot!(captures, @r###"
        [
            "event.transaction_name_changes:1|c|#source_in:url,changes:pattern,source_out:sanitized,is_404:false",
        ]
        "###);
    }

    #[test]
    fn test_log_transaction_metrics_both() {
        let captures = capture_test_event("/foo/john/12345", TransactionSource::Url);
        insta::assert_debug_snapshot!(captures, @r###"
        [
            "event.transaction_name_changes:1|c|#source_in:url,changes:both,source_out:sanitized,is_404:false",
        ]
        "###);
    }

    #[test]
    fn test_log_transaction_metrics_no_match() {
        let captures = capture_test_event("/foo/john/12345", TransactionSource::Route);
        insta::assert_debug_snapshot!(captures, @r###"
        [
            "event.transaction_name_changes:1|c|#source_in:route,changes:none,source_out:route,is_404:false",
        ]
        "###);
    }

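    /// Confirms that the hardcoded MRI overhead constant matches the value derived from an
    /// actual measurement metric name.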
    #[test]
    fn test_mri_overhead_constant() {
        let hardcoded_value = MeasurementsConfig::MEASUREMENT_MRI_OVERHEAD;

        let derived_value = {
            let name = "foobar".to_owned();
            let value = 5.into(); // Arbitrary value.
            let unit = MetricUnit::Duration(DurationUnit::default());
            let tags = TransactionMeasurementTags {
                measurement_rating: None,
                universal_tags: CommonTags(BTreeMap::new()),
                score_profile_version: None,
            };

            let measurement = TransactionMetric::Measurement {
                name: name.clone(),
                value,
                unit,
                tags,
            };

            let metric: Bucket = measurement.into_metric(UnixTimestamp::now());
            metric.name.len() - unit.to_string().len() - name.len()
        };
        assert_eq!(
            hardcoded_value, derived_value,
            "Update `MEASUREMENT_MRI_OVERHEAD` if the naming scheme changed."
        );
    }

    #[tokio::test]
    async fn test_process_metrics_bucket_metadata() {
        let mut token = Cogs::noop().timed(ResourceId::Relay, AppFeature::Unattributed);
        let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
        let received_at = Utc::now();
        let config = Config::default();

        let (aggregator, mut aggregator_rx) = Addr::custom();
        let processor = create_test_processor_with_addrs(
            config,
            Addrs {
                aggregator,
                ..Default::default()
            },
        )
        .await;

        let mut item = Item::new(ItemType::Statsd);
        item.set_payload(
            ContentType::Text,
            "transactions/foo:3182887624:4267882815|s",
        );
        for (source, expected_received_at) in [
            (
                BucketSource::External,
                Some(UnixTimestamp::from_datetime(received_at).unwrap()),
            ),
            (BucketSource::Internal, None),
        ] {
            let message = ProcessMetrics {
                data: MetricData::Raw(vec![item.clone()]),
                project_key,
                source,
                received_at,
                sent_at: Some(Utc::now()),
            };
            processor.handle_process_metrics(&mut token, message);

            let Aggregator::MergeBuckets(merge_buckets) = aggregator_rx.recv().await.unwrap();
            let buckets = merge_buckets.buckets;
            assert_eq!(buckets.len(), 1);
            assert_eq!(buckets[0].metadata.received_at, expected_received_at);
        }
    }

    #[tokio::test]
    async fn test_process_batched_metrics() {
        let mut token = Cogs::noop().timed(ResourceId::Relay, AppFeature::Unattributed);
        let received_at = Utc::now();
        let config = Config::default();

        let (aggregator, mut aggregator_rx) = Addr::custom();
        let processor = create_test_processor_with_addrs(
            config,
            Addrs {
                aggregator,
                ..Default::default()
            },
        )
        .await;

        let payload = r#"{
    "buckets": {
        "11111111111111111111111111111111": [
            {
                "timestamp": 1615889440,
                "width": 0,
                "name": "d:custom/endpoint.response_time@millisecond",
                "type": "d",
                "value": [
                    68.0
                ],
                "tags": {
                    "route": "user_index"
                }
            }
        ],
        "22222222222222222222222222222222": [
            {
                "timestamp": 1615889440,
                "width": 0,
                "name": "d:custom/endpoint.cache_rate@none",
                "type": "d",
                "value": [
                    36.0
                ]
            }
        ]
    }
}
"#;
        let message = ProcessBatchedMetrics {
            payload: Bytes::from(payload),
            source: BucketSource::Internal,
            received_at,
            sent_at: Some(Utc::now()),
        };
        processor.handle_process_batched_metrics(&mut token, message);

        let Aggregator::MergeBuckets(mb1) = aggregator_rx.recv().await.unwrap();
        let Aggregator::MergeBuckets(mb2) = aggregator_rx.recv().await.unwrap();

        let mut messages = vec![mb1, mb2];
        messages.sort_by_key(|mb| mb.project_key);

        let actual = messages
            .into_iter()
            .map(|mb| (mb.project_key, mb.buckets))
            .collect::<Vec<_>>();

        assert_debug_snapshot!(actual, @r###"
        [
            (
                ProjectKey("11111111111111111111111111111111"),
                [
                    Bucket {
                        timestamp: UnixTimestamp(1615889440),
                        width: 0,
                        name: MetricName(
                            "d:custom/endpoint.response_time@millisecond",
                        ),
                        value: Distribution(
                            [
                                68.0,
                            ],
                        ),
                        tags: {
                            "route": "user_index",
                        },
                        metadata: BucketMetadata {
                            merges: 1,
                            received_at: None,
                            extracted_from_indexed: false,
                        },
                    },
                ],
            ),
            (
                ProjectKey("22222222222222222222222222222222"),
                [
                    Bucket {
                        timestamp: UnixTimestamp(1615889440),
                        width: 0,
                        name: MetricName(
                            "d:custom/endpoint.cache_rate@none",
                        ),
                        value: Distribution(
                            [
                                36.0,
                            ],
                        ),
                        tags: {},
                        metadata: BucketMetadata {
                            merges: 1,
                            received_at: None,
                            extracted_from_indexed: false,
                        },
                    },
                ],
            ),
        ]
        "###);
    }
}