1use std::borrow::Cow;
2use std::collections::{BTreeSet, HashMap};
3use std::error::Error;
4use std::fmt::{Debug, Display};
5use std::future::Future;
6use std::io::Write;
7use std::pin::Pin;
8use std::sync::Arc;
9use std::time::Duration;
10
11use anyhow::Context;
12use brotli::CompressorWriter as BrotliEncoder;
13use bytes::Bytes;
14use chrono::{DateTime, Utc};
15use flate2::Compression;
16use flate2::write::{GzEncoder, ZlibEncoder};
17use futures::FutureExt;
18use futures::future::BoxFuture;
19use relay_base_schema::project::{ProjectId, ProjectKey};
20use relay_cogs::{AppFeature, Cogs, FeatureWeights, ResourceId, Token};
21use relay_common::time::UnixTimestamp;
22use relay_config::{Config, HttpEncoding, RelayMode};
23use relay_dynamic_config::{Feature, GlobalConfig};
24use relay_event_normalization::{ClockDriftProcessor, GeoIpLookup};
25use relay_event_schema::processor::ProcessingAction;
26use relay_event_schema::protocol::{
27 ClientReport, Event, EventId, Metrics, NetworkReportError, SpanV2,
28};
29use relay_filter::FilterStatKey;
30use relay_metrics::{Bucket, BucketMetadata, BucketView, BucketsView, MetricNamespace};
31use relay_protocol::Annotated;
32use relay_quotas::{DataCategory, Quota, RateLimits, Scoping};
33use relay_sampling::evaluation::{ReservoirCounters, SamplingDecision};
34use relay_statsd::metric;
35use relay_system::{Addr, FromMessage, NoResponse, Service};
36use reqwest::header;
37use smallvec::{SmallVec, smallvec};
38use zstd::stream::Encoder as ZstdEncoder;
39
40use crate::envelope::{
41 self, AttachmentType, ContentType, Envelope, EnvelopeError, Item, ItemContainer, ItemType,
42};
43use crate::extractors::{PartialDsn, RequestMeta, RequestTrust};
44use crate::integrations::Integration;
45use crate::managed::{InvalidProcessingGroupType, ManagedEnvelope, TypedEnvelope};
46use crate::metrics::{MetricOutcomes, MetricsLimiter, MinimalTrackableBucket};
47use crate::metrics_extraction::transactions::ExtractedMetrics;
48use crate::metrics_extraction::transactions::types::ExtractMetricsError;
49use crate::processing::attachments::AttachmentProcessor;
50use crate::processing::check_ins::CheckInsProcessor;
51use crate::processing::client_reports::ClientReportsProcessor;
52use crate::processing::errors::{ErrorsProcessor, SwitchProcessingError};
53use crate::processing::logs::LogsProcessor;
54use crate::processing::profile_chunks::ProfileChunksProcessor;
55use crate::processing::profiles::ProfilesProcessor;
56use crate::processing::replays::ReplaysProcessor;
57use crate::processing::sessions::SessionsProcessor;
58use crate::processing::spans::SpansProcessor;
59use crate::processing::trace_attachments::TraceAttachmentsProcessor;
60use crate::processing::trace_metrics::TraceMetricsProcessor;
61use crate::processing::transactions::TransactionProcessor;
62use crate::processing::user_reports::UserReportsProcessor;
63use crate::processing::utils::event::{
64 EventFullyNormalized, EventMetricsExtracted, FiltersStatus, SpansExtracted, event_category,
65 event_type,
66};
67use crate::processing::{Forward as _, Output, Outputs, QuotaRateLimiter};
68use crate::service::ServiceError;
69use crate::services::global_config::GlobalConfigHandle;
70use crate::services::metrics::{Aggregator, FlushBuckets, MergeBuckets, ProjectBuckets};
71use crate::services::outcome::{DiscardItemType, DiscardReason, Outcome, TrackOutcome};
72use crate::services::projects::cache::ProjectCacheHandle;
73use crate::services::projects::project::{ProjectInfo, ProjectState};
74use crate::services::upstream::{
75 SendRequest, Sign, SignatureType, UpstreamRelay, UpstreamRequest, UpstreamRequestError,
76};
77use crate::statsd::{RelayCounters, RelayDistributions, RelayTimers};
78use crate::utils::{self, CheckLimits, EnvelopeLimiter};
79use crate::{http, processing};
80use relay_threading::AsyncPool;
81#[cfg(feature = "processing")]
82use {
83 crate::services::objectstore::Objectstore,
84 crate::services::store::Store,
85 crate::utils::Enforcement,
86 itertools::Itertools,
87 relay_cardinality::{
88 CardinalityLimit, CardinalityLimiter, CardinalityLimitsSplit, RedisSetLimiter,
89 RedisSetLimiterOptions,
90 },
91 relay_dynamic_config::CardinalityLimiterMode,
92 relay_quotas::{RateLimitingError, RedisRateLimiter},
93 relay_redis::RedisClients,
94 std::time::Instant,
95 symbolic_unreal::{Unreal4Error, Unreal4ErrorKind},
96};
97
// Submodules implementing individual aspects of envelope processing.
mod attachment;
mod dynamic_sampling;
pub mod event;
mod metrics;
mod nel;
mod profile;
mod report;
mod span;

// Modules only compiled for processing Relays (and, for playstation, only
// in sentry-internal builds).
#[cfg(all(sentry, feature = "processing"))]
pub mod playstation;
mod standalone;
#[cfg(feature = "processing")]
mod unreal;

#[cfg(feature = "processing")]
mod nnswitch;
115
/// Runs `$if_true` only when the `processing` feature is compiled in *and*
/// processing is enabled in `$config`.
///
/// The two-branch form evaluates `$if_false` otherwise; that branch must also
/// compile when the `processing` feature is disabled.
macro_rules! if_processing {
    ($config:expr, $if_true:block) => {
        #[cfg(feature = "processing")] {
            if $config.processing_enabled() $if_true
        }
    };
    ($config:expr, $if_true:block else $if_false:block) => {
        {
            #[cfg(feature = "processing")] {
                if $config.processing_enabled() $if_true else $if_false
            }
            #[cfg(not(feature = "processing"))] {
                $if_false
            }
        }
    };
}
136
/// Minimum clock drift (55 minutes) before drift correction is applied.
pub const MINIMUM_CLOCK_DRIFT: Duration = Duration::from_secs(55 * 60);
139
/// Error returned when a [`ProcessingGroup`] cannot be converted into the
/// requested group marker type.
#[derive(Debug)]
pub struct GroupTypeError;

impl Display for GroupTypeError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "failed to convert processing group into corresponding type"
        )
    }
}

impl std::error::Error for GroupTypeError {}
150
/// Defines a marker type `$ty` for the [`ProcessingGroup`] variant `$variant`.
///
/// Generates the infallible conversion from the marker into the group, and a
/// fallible `TryFrom<ProcessingGroup>` that succeeds for `$variant` as well as
/// any additional `$other` variants listed after it.
macro_rules! processing_group {
    ($ty:ident, $variant:ident$(, $($other:ident),+)?) => {
        #[derive(Clone, Copy, Debug)]
        pub struct $ty;

        impl From<$ty> for ProcessingGroup {
            fn from(_: $ty) -> Self {
                ProcessingGroup::$variant
            }
        }

        impl TryFrom<ProcessingGroup> for $ty {
            type Error = GroupTypeError;

            fn try_from(value: ProcessingGroup) -> Result<Self, Self::Error> {
                // Succeed for the primary variant ...
                if matches!(value, ProcessingGroup::$variant) {
                    return Ok($ty);
                }
                // ... and for every additional accepted variant.
                $($(
                    if matches!(value, ProcessingGroup::$other) {
                        return Ok($ty);
                    }
                )+)?
                return Err(GroupTypeError);
            }
        }
    };
}
179
// Marker trait for processing groups whose envelopes carry an event payload;
// implemented below for transactions and errors only.
pub trait EventProcessing {}

processing_group!(TransactionGroup, Transaction);
impl EventProcessing for TransactionGroup {}

processing_group!(ErrorGroup, Error);
impl EventProcessing for ErrorGroup {}

processing_group!(SessionGroup, Session);
// The generic standalone marker also accepts the specialized standalone
// variants (attachments, user reports, profiles).
processing_group!(
    StandaloneGroup,
    Standalone,
    StandaloneAttachments,
    StandaloneUserReports,
    StandaloneProfiles
);
processing_group!(ClientReportGroup, ClientReport);
processing_group!(ReplayGroup, Replay);
processing_group!(CheckInGroup, CheckIn);
// NEL reports are processed through the log pipeline, hence `Nel` converts
// into `LogGroup` as well.
processing_group!(LogGroup, Log, Nel);
processing_group!(TraceMetricGroup, TraceMetric);
processing_group!(SpanGroup, Span);

processing_group!(ProfileChunkGroup, ProfileChunk);
processing_group!(MetricsGroup, Metrics);
processing_group!(ForwardUnknownGroup, ForwardUnknown);
processing_group!(Ungrouped, Ungrouped);
210
/// Marker for envelopes that have completed processing.
#[derive(Clone, Copy, Debug)]
pub struct Processed;
216
/// The group an envelope's items are processed in.
///
/// Assigned by [`ProcessingGroup::split_envelope`]; each group is handled by
/// a dedicated processing path.
#[derive(Clone, Copy, Debug)]
pub enum ProcessingGroup {
    /// Event-dependent items containing a transaction (or profile).
    Transaction,
    /// All other event-creating items, including security reports.
    Error,
    /// Session updates and session aggregates.
    Session,
    /// Event-dependent items that arrived without an event-creating item.
    Standalone,
    /// Attachments without an associated event.
    StandaloneAttachments,
    /// User reports without an associated event.
    StandaloneUserReports,
    /// Profiles without an associated event.
    StandaloneProfiles,
    /// SDK client reports.
    ClientReport,
    /// Session replay items (event, recording, video).
    Replay,
    /// Cron check-ins.
    CheckIn,
    /// Network Error Logging reports.
    Nel,
    /// Log items (including log integrations).
    Log,
    /// Trace metric items.
    TraceMetric,
    /// Legacy standalone span items.
    Span,
    /// Span V2 items: containers, span integrations, and experimental spans.
    SpanV2,
    /// Metric items (statsd payloads and pre-aggregated buckets).
    Metrics,
    /// Profile chunks.
    ProfileChunk,
    /// Trace-scoped attachments.
    TraceAttachment,
    /// Items of unknown type, forwarded as-is.
    ForwardUnknown,
    /// Anything that did not match another group.
    Ungrouped,
}
274
impl ProcessingGroup {
    /// Splits the envelope into per-group envelopes.
    ///
    /// Items are moved out of `envelope` in a fixed priority order (replays,
    /// sessions, span v2, legacy spans, logs, ...); whatever remains at the
    /// end is wrapped one item per envelope and classified as check-in,
    /// client report, forward-unknown, or ungrouped. All produced envelopes
    /// share a clone of the original envelope headers.
    fn split_envelope(
        mut envelope: Envelope,
        project_info: &ProjectInfo,
    ) -> SmallVec<[(Self, Box<Envelope>); 3]> {
        let headers = envelope.headers().clone();
        let mut grouped_envelopes = smallvec![];

        // Replay items always form their own group.
        let replay_items = envelope.take_items_by(|item| {
            matches!(
                item.ty(),
                &ItemType::ReplayEvent | &ItemType::ReplayRecording | &ItemType::ReplayVideo
            )
        });
        if !replay_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Replay,
                Envelope::from_parts(headers.clone(), replay_items),
            ))
        }

        // Individual sessions and session aggregates.
        let session_items = envelope
            .take_items_by(|item| matches!(item.ty(), &ItemType::Session | &ItemType::Sessions));
        if !session_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Session,
                Envelope::from_parts(headers.clone(), session_items),
            ))
        }

        // Span V2: containers and span integrations always; plain span items
        // and span attachments only when the experimental feature is enabled.
        // Must run before the legacy span takeout below, which would otherwise
        // consume the plain span items.
        let span_v2_items = envelope.take_items_by(|item| {
            let exp_feature = project_info.has_feature(Feature::SpanV2ExperimentalProcessing);

            ItemContainer::<SpanV2>::is_container(item)
                || matches!(item.integration(), Some(Integration::Spans(_)))
                || (exp_feature && matches!(item.ty(), &ItemType::Span))
                || (exp_feature && item.is_span_attachment())
        });

        if !span_v2_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::SpanV2,
                Envelope::from_parts(headers.clone(), span_v2_items),
            ))
        }

        // Remaining (legacy) span items.
        let span_items = envelope.take_items_by(|item| matches!(item.ty(), &ItemType::Span));
        if !span_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Span,
                Envelope::from_parts(headers.clone(), span_items),
            ))
        }

        // Log items and log integrations.
        let logs_items = envelope.take_items_by(|item| {
            matches!(item.ty(), &ItemType::Log)
                || matches!(item.integration(), Some(Integration::Logs(_)))
        });
        if !logs_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Log,
                Envelope::from_parts(headers.clone(), logs_items),
            ))
        }

        let trace_metric_items =
            envelope.take_items_by(|item| matches!(item.ty(), &ItemType::TraceMetric));
        if !trace_metric_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::TraceMetric,
                Envelope::from_parts(headers.clone(), trace_metric_items),
            ))
        }

        // Network Error Logging reports.
        let nel_items = envelope.take_items_by(|item| matches!(item.ty(), &ItemType::Nel));
        if !nel_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Nel,
                Envelope::from_parts(headers.clone(), nel_items),
            ))
        }

        // All metric item types.
        let metric_items = envelope.take_items_by(|i| i.ty().is_metrics());
        if !metric_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Metrics,
                Envelope::from_parts(headers.clone(), metric_items),
            ))
        }

        let profile_chunk_items =
            envelope.take_items_by(|item| matches!(item.ty(), &ItemType::ProfileChunk));
        if !profile_chunk_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::ProfileChunk,
                Envelope::from_parts(headers.clone(), profile_chunk_items),
            ))
        }

        let trace_attachment_items = envelope.take_items_by(Item::is_trace_attachment);
        if !trace_attachment_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::TraceAttachment,
                Envelope::from_parts(headers.clone(), trace_attachment_items),
            ))
        }

        // When the envelope contains no event-creating item, event-dependent
        // items are split into their standalone groups instead.
        if !envelope.items().any(Item::creates_event) {
            let standalone_attachments = envelope
                .take_items_by(|i| i.requires_event() && matches!(i.ty(), ItemType::Attachment));
            if !standalone_attachments.is_empty() {
                grouped_envelopes.push((
                    ProcessingGroup::StandaloneAttachments,
                    Envelope::from_parts(headers.clone(), standalone_attachments),
                ))
            }

            let standalone_user_reports =
                envelope.take_items_by(|i| matches!(i.ty(), ItemType::UserReport));
            if !standalone_user_reports.is_empty() {
                grouped_envelopes.push((
                    ProcessingGroup::StandaloneUserReports,
                    Envelope::from_parts(headers.clone(), standalone_user_reports),
                ));
            }

            let standalone_profiles =
                envelope.take_items_by(|i| matches!(i.ty(), ItemType::Profile));
            if !standalone_profiles.is_empty() {
                grouped_envelopes.push((
                    ProcessingGroup::StandaloneProfiles,
                    Envelope::from_parts(headers.clone(), standalone_profiles),
                ));
            }
        }

        // Any remaining event-dependent items without an event go into the
        // generic standalone group.
        if !envelope.items().any(Item::creates_event) {
            let standalone_items = envelope.take_items_by(Item::requires_event);
            if !standalone_items.is_empty() {
                grouped_envelopes.push((
                    ProcessingGroup::Standalone,
                    Envelope::from_parts(headers.clone(), standalone_items),
                ))
            }
        };

        // Each raw security report becomes its own error envelope with a
        // freshly generated event id.
        let security_reports_items = envelope
            .take_items_by(|i| matches!(i.ty(), &ItemType::RawSecurity))
            .into_iter()
            .map(|item| {
                let headers = headers.clone();
                let items: SmallVec<[Item; 3]> = smallvec![item.clone()];
                let mut envelope = Envelope::from_parts(headers, items);
                envelope.set_event_id(EventId::new());
                (ProcessingGroup::Error, envelope)
            });
        grouped_envelopes.extend(security_reports_items);

        // Event-dependent items: route to the transaction group if any item
        // is a transaction or profile, otherwise to the error group.
        let require_event_items = envelope.take_items_by(Item::requires_event);
        if !require_event_items.is_empty() {
            let group = if require_event_items
                .iter()
                .any(|item| matches!(item.ty(), &ItemType::Transaction | &ItemType::Profile))
            {
                ProcessingGroup::Transaction
            } else {
                ProcessingGroup::Error
            };

            grouped_envelopes.push((
                group,
                Envelope::from_parts(headers.clone(), require_event_items),
            ))
        }

        // Everything left over is split one item per envelope.
        let envelopes = envelope.items_mut().map(|item| {
            let headers = headers.clone();
            let items: SmallVec<[Item; 3]> = smallvec![item.clone()];
            let envelope = Envelope::from_parts(headers, items);
            let item_type = item.ty();
            let group = if matches!(item_type, &ItemType::CheckIn) {
                ProcessingGroup::CheckIn
            } else if matches!(item.ty(), &ItemType::ClientReport) {
                ProcessingGroup::ClientReport
            } else if matches!(item_type, &ItemType::Unknown(_)) {
                ProcessingGroup::ForwardUnknown
            } else {
                ProcessingGroup::Ungrouped
            };

            (group, envelope)
        });
        grouped_envelopes.extend(envelopes);

        grouped_envelopes
    }

    /// Returns a stable, lowercase name of the group for use in metric tags
    /// and logs.
    pub fn variant(&self) -> &'static str {
        match self {
            ProcessingGroup::Transaction => "transaction",
            ProcessingGroup::Error => "error",
            ProcessingGroup::Session => "session",
            ProcessingGroup::Standalone => "standalone",
            ProcessingGroup::StandaloneAttachments => "standalone_attachment",
            ProcessingGroup::StandaloneUserReports => "standalone_user_reports",
            ProcessingGroup::StandaloneProfiles => "standalone_profiles",
            ProcessingGroup::ClientReport => "client_report",
            ProcessingGroup::Replay => "replay",
            ProcessingGroup::CheckIn => "check_in",
            ProcessingGroup::Log => "log",
            ProcessingGroup::TraceMetric => "trace_metric",
            ProcessingGroup::Nel => "nel",
            ProcessingGroup::Span => "span",
            ProcessingGroup::SpanV2 => "span_v2",
            ProcessingGroup::Metrics => "metrics",
            ProcessingGroup::ProfileChunk => "profile_chunk",
            ProcessingGroup::TraceAttachment => "trace_attachment",
            ProcessingGroup::ForwardUnknown => "forward_unknown",
            ProcessingGroup::Ungrouped => "ungrouped",
        }
    }
}
522
523impl From<ProcessingGroup> for AppFeature {
524 fn from(value: ProcessingGroup) -> Self {
525 match value {
526 ProcessingGroup::Transaction => AppFeature::Transactions,
527 ProcessingGroup::Error => AppFeature::Errors,
528 ProcessingGroup::Session => AppFeature::Sessions,
529 ProcessingGroup::Standalone => AppFeature::UnattributedEnvelope,
530 ProcessingGroup::StandaloneAttachments => AppFeature::UnattributedEnvelope,
531 ProcessingGroup::StandaloneUserReports => AppFeature::UserReports,
532 ProcessingGroup::StandaloneProfiles => AppFeature::Profiles,
533 ProcessingGroup::ClientReport => AppFeature::ClientReports,
534 ProcessingGroup::Replay => AppFeature::Replays,
535 ProcessingGroup::CheckIn => AppFeature::CheckIns,
536 ProcessingGroup::Log => AppFeature::Logs,
537 ProcessingGroup::TraceMetric => AppFeature::TraceMetrics,
538 ProcessingGroup::Nel => AppFeature::Logs,
539 ProcessingGroup::Span => AppFeature::Spans,
540 ProcessingGroup::SpanV2 => AppFeature::Spans,
541 ProcessingGroup::Metrics => AppFeature::UnattributedMetrics,
542 ProcessingGroup::ProfileChunk => AppFeature::Profiles,
543 ProcessingGroup::ForwardUnknown => AppFeature::UnattributedEnvelope,
544 ProcessingGroup::Ungrouped => AppFeature::UnattributedEnvelope,
545 ProcessingGroup::TraceAttachment => AppFeature::TraceAttachments,
546 }
547 }
548}
549
/// Errors that can occur while processing an envelope or event.
///
/// Use [`Self::to_outcome`] to map an error to the outcome (if any) that
/// should be recorded for the dropped items.
#[derive(Debug, thiserror::Error)]
pub enum ProcessingError {
    #[error("invalid json in event")]
    InvalidJson(#[source] serde_json::Error),

    #[error("invalid message pack event payload")]
    InvalidMsgpack(#[from] rmp_serde::decode::Error),

    #[cfg(feature = "processing")]
    #[error("invalid unreal crash report")]
    InvalidUnrealReport(#[source] Unreal4Error),

    #[error("event payload too large")]
    PayloadTooLarge(DiscardItemType),

    #[error("invalid transaction event")]
    InvalidTransaction,

    #[error("the item is not allowed/supported in this envelope")]
    UnsupportedItem,

    #[error("envelope processor failed")]
    ProcessingFailed(#[from] ProcessingAction),

    #[error("duplicate {0} in event")]
    DuplicateItem(ItemType),

    #[error("failed to extract event payload")]
    NoEventPayload,

    #[error("missing project id in DSN")]
    MissingProjectId,

    #[error("invalid security report type: {0:?}")]
    InvalidSecurityType(Bytes),

    #[error("unsupported security report type")]
    UnsupportedSecurityType,

    #[error("invalid security report")]
    InvalidSecurityReport(#[source] serde_json::Error),

    #[error("invalid nel report")]
    InvalidNelReport(#[source] NetworkReportError),

    #[error("event filtered with reason: {0:?}")]
    EventFiltered(FilterStatKey),

    #[error("missing or invalid required event timestamp")]
    InvalidTimestamp,

    #[error("could not serialize event payload")]
    SerializeFailed(#[source] serde_json::Error),

    #[cfg(feature = "processing")]
    #[error("failed to apply quotas")]
    QuotasFailed(#[from] RateLimitingError),

    // Boxed to keep this enum small; see `large_enum_variant`.
    #[error("invalid processing group type")]
    InvalidProcessingGroup(Box<InvalidProcessingGroupType>),

    #[error("nintendo switch dying message processing failed {0:?}")]
    InvalidNintendoDyingMessage(#[source] SwitchProcessingError),

    #[cfg(all(sentry, feature = "processing"))]
    #[error("playstation dump processing failed: {0}")]
    InvalidPlaystationDump(String),

    #[error("processing group does not match specific processor")]
    ProcessingGroupMismatch,
    #[error("new processing pipeline failed")]
    ProcessingFailure,
}
624
impl ProcessingError {
    /// Returns the [`Outcome`] to record for items rejected with this error.
    ///
    /// Returns `None` for errors that should not emit an outcome
    /// (`MissingProjectId`, `InvalidProcessingGroup`, `ProcessingFailure`).
    pub fn to_outcome(&self) -> Option<Outcome> {
        match self {
            Self::PayloadTooLarge(payload_type) => {
                Some(Outcome::Invalid(DiscardReason::TooLarge(*payload_type)))
            }
            Self::InvalidJson(_) => Some(Outcome::Invalid(DiscardReason::InvalidJson)),
            Self::InvalidMsgpack(_) => Some(Outcome::Invalid(DiscardReason::InvalidMsgpack)),
            Self::InvalidSecurityType(_) => {
                Some(Outcome::Invalid(DiscardReason::SecurityReportType))
            }
            Self::UnsupportedItem => Some(Outcome::Invalid(DiscardReason::InvalidEnvelope)),
            Self::InvalidSecurityReport(_) => Some(Outcome::Invalid(DiscardReason::SecurityReport)),
            Self::UnsupportedSecurityType => Some(Outcome::Filtered(FilterStatKey::InvalidCsp)),
            Self::InvalidNelReport(_) => Some(Outcome::Invalid(DiscardReason::InvalidJson)),
            Self::InvalidTransaction => Some(Outcome::Invalid(DiscardReason::InvalidTransaction)),
            Self::InvalidTimestamp => Some(Outcome::Invalid(DiscardReason::Timestamp)),
            Self::DuplicateItem(_) => Some(Outcome::Invalid(DiscardReason::DuplicateItem)),
            Self::NoEventPayload => Some(Outcome::Invalid(DiscardReason::NoEventPayload)),
            Self::InvalidNintendoDyingMessage(_) => Some(Outcome::Invalid(DiscardReason::Payload)),
            #[cfg(all(sentry, feature = "processing"))]
            Self::InvalidPlaystationDump(_) => Some(Outcome::Invalid(DiscardReason::Payload)),
            // Bad compression in an unreal report gets its own discard reason.
            #[cfg(feature = "processing")]
            Self::InvalidUnrealReport(err) if err.kind() == Unreal4ErrorKind::BadCompression => {
                Some(Outcome::Invalid(DiscardReason::InvalidCompression))
            }
            #[cfg(feature = "processing")]
            Self::InvalidUnrealReport(_) => Some(Outcome::Invalid(DiscardReason::ProcessUnreal)),
            Self::SerializeFailed(_) | Self::ProcessingFailed(_) => {
                Some(Outcome::Invalid(DiscardReason::Internal))
            }
            #[cfg(feature = "processing")]
            Self::QuotasFailed(_) => Some(Outcome::Invalid(DiscardReason::Internal)),
            Self::MissingProjectId => None,
            Self::EventFiltered(key) => Some(Outcome::Filtered(key.clone())),
            Self::InvalidProcessingGroup(_) => None,

            Self::ProcessingGroupMismatch => Some(Outcome::Invalid(DiscardReason::Internal)),
            Self::ProcessingFailure => None,
        }
    }

    /// Returns `true` if the mapped outcome flags an unexpected (internal)
    /// condition rather than an expected rejection.
    fn is_unexpected(&self) -> bool {
        self.to_outcome()
            .is_some_and(|outcome| outcome.is_unexpected())
    }
}
673
#[cfg(feature = "processing")]
impl From<Unreal4Error> for ProcessingError {
    /// Converts unreal crash report errors, mapping oversized reports to the
    /// dedicated payload-too-large variant.
    fn from(err: Unreal4Error) -> Self {
        if err.kind() == Unreal4ErrorKind::TooLarge {
            Self::PayloadTooLarge(ItemType::UnrealReport.into())
        } else {
            ProcessingError::InvalidUnrealReport(err)
        }
    }
}
683
684impl From<ExtractMetricsError> for ProcessingError {
685 fn from(error: ExtractMetricsError) -> Self {
686 match error {
687 ExtractMetricsError::MissingTimestamp | ExtractMetricsError::InvalidTimestamp => {
688 Self::InvalidTimestamp
689 }
690 }
691 }
692}
693
impl From<InvalidProcessingGroupType> for ProcessingError {
    // Boxed to keep the `ProcessingError` variant small.
    fn from(value: InvalidProcessingGroupType) -> Self {
        Self::InvalidProcessingGroup(Box::new(value))
    }
}
699
// An event parsed from an envelope, paired with a byte count.
// NOTE(review): the usize is presumably the size of the payload the event was
// extracted from — confirm at the extraction sites.
type ExtractedEvent = (Annotated<Event>, usize);
701
/// Wrapper around [`ExtractedMetrics`] that tags buckets with sampling
/// metadata as they are added.
#[derive(Debug)]
pub struct ProcessingExtractedMetrics {
    metrics: ExtractedMetrics,
}
710
711impl ProcessingExtractedMetrics {
712 pub fn new() -> Self {
713 Self {
714 metrics: ExtractedMetrics::default(),
715 }
716 }
717
718 pub fn into_inner(self) -> ExtractedMetrics {
719 self.metrics
720 }
721
722 pub fn extend(
724 &mut self,
725 extracted: ExtractedMetrics,
726 sampling_decision: Option<SamplingDecision>,
727 ) {
728 self.extend_project_metrics(extracted.project_metrics, sampling_decision);
729 self.extend_sampling_metrics(extracted.sampling_metrics, sampling_decision);
730 }
731
732 pub fn extend_project_metrics<I>(
734 &mut self,
735 buckets: I,
736 sampling_decision: Option<SamplingDecision>,
737 ) where
738 I: IntoIterator<Item = Bucket>,
739 {
740 self.metrics
741 .project_metrics
742 .extend(buckets.into_iter().map(|mut bucket| {
743 bucket.metadata.extracted_from_indexed =
744 sampling_decision == Some(SamplingDecision::Keep);
745 bucket
746 }));
747 }
748
749 pub fn extend_sampling_metrics<I>(
751 &mut self,
752 buckets: I,
753 sampling_decision: Option<SamplingDecision>,
754 ) where
755 I: IntoIterator<Item = Bucket>,
756 {
757 self.metrics
758 .sampling_metrics
759 .extend(buckets.into_iter().map(|mut bucket| {
760 bucket.metadata.extracted_from_indexed =
761 sampling_decision == Some(SamplingDecision::Keep);
762 bucket
763 }));
764 }
765
766 #[cfg(feature = "processing")]
771 fn apply_enforcement(&mut self, enforcement: &Enforcement, enforced_consistently: bool) {
772 let mut drop_namespaces: SmallVec<[_; 2]> = smallvec![];
774 let mut reset_extracted_from_indexed: SmallVec<[_; 2]> = smallvec![];
777
778 for (namespace, limit, indexed) in [
779 (
780 MetricNamespace::Transactions,
781 &enforcement.event,
782 &enforcement.event_indexed,
783 ),
784 (
785 MetricNamespace::Spans,
786 &enforcement.spans,
787 &enforcement.spans_indexed,
788 ),
789 ] {
790 if limit.is_active() {
791 drop_namespaces.push(namespace);
792 } else if indexed.is_active() && !enforced_consistently {
793 reset_extracted_from_indexed.push(namespace);
798 }
799 }
800
801 if !drop_namespaces.is_empty() || !reset_extracted_from_indexed.is_empty() {
802 self.retain_mut(|bucket| {
803 let Some(namespace) = bucket.name.try_namespace() else {
804 return true;
805 };
806
807 if drop_namespaces.contains(&namespace) {
808 return false;
809 }
810
811 if reset_extracted_from_indexed.contains(&namespace) {
812 bucket.metadata.extracted_from_indexed = false;
813 }
814
815 true
816 });
817 }
818 }
819
820 #[cfg(feature = "processing")]
821 fn retain_mut(&mut self, mut f: impl FnMut(&mut Bucket) -> bool) {
822 self.metrics.project_metrics.retain_mut(&mut f);
823 self.metrics.sampling_metrics.retain_mut(&mut f);
824 }
825}
826
827fn send_metrics(
828 metrics: ExtractedMetrics,
829 project_key: ProjectKey,
830 sampling_key: Option<ProjectKey>,
831 aggregator: &Addr<Aggregator>,
832) {
833 let ExtractedMetrics {
834 project_metrics,
835 sampling_metrics,
836 } = metrics;
837
838 if !project_metrics.is_empty() {
839 aggregator.send(MergeBuckets {
840 project_key,
841 buckets: project_metrics,
842 });
843 }
844
845 if !sampling_metrics.is_empty() {
846 let sampling_project_key = sampling_key.unwrap_or(project_key);
853 aggregator.send(MergeBuckets {
854 project_key: sampling_project_key,
855 buckets: sampling_metrics,
856 });
857 }
858}
859
860fn should_filter(config: &Config, project_info: &ProjectInfo, feature: Feature) -> bool {
865 match config.relay_mode() {
866 RelayMode::Proxy => false,
867 RelayMode::Managed => !project_info.has_feature(feature),
868 }
869}
870
/// Result of processing an envelope's group.
#[derive(Debug)]
#[expect(
    clippy::large_enum_variant,
    reason = "until we have a better solution to combat the excessive growth of Item, see #4819"
)]
enum ProcessingResult {
    /// A processed envelope together with the metrics extracted from it.
    Envelope {
        managed_envelope: TypedEnvelope<Processed>,
        extracted_metrics: ProcessingExtractedMetrics,
    },
    /// Output produced by one of the dedicated processors.
    Output(Output<Outputs>),
}
885
impl ProcessingResult {
    /// Wraps an envelope into a [`Self::Envelope`] with no extracted metrics.
    fn no_metrics(managed_envelope: TypedEnvelope<Processed>) -> Self {
        Self::Envelope {
            managed_envelope,
            extracted_metrics: ProcessingExtractedMetrics::new(),
        }
    }
}
895
/// Processed data ready for submission (to the upstream or store).
#[derive(Debug)]
#[expect(
    clippy::large_enum_variant,
    reason = "until we have a better solution to combat the excessive growth of Item, see #4819"
)]
enum Submit<'a> {
    /// A fully processed envelope.
    Envelope(TypedEnvelope<Processed>),
    /// Output of a dedicated processor together with its forwarding context.
    Output {
        output: Outputs,
        ctx: processing::ForwardContext<'a>,
    },
}
911
/// Message queuing an envelope for processing with its project state.
#[derive(Debug)]
pub struct ProcessEnvelope {
    /// The envelope to process, with outcome tracking.
    pub envelope: ManagedEnvelope,
    /// State of the project the envelope belongs to.
    pub project_info: Arc<ProjectInfo>,
    /// Currently known rate limits for the project.
    pub rate_limits: Arc<RateLimits>,
    /// State of the project used for dynamic sampling, if known.
    pub sampling_project_info: Option<Arc<ProjectInfo>>,
    /// Shared counters for reservoir sampling rules.
    pub reservoir_counters: ReservoirCounters,
}
934
/// An envelope that has already been split into a single processing group.
#[derive(Debug)]
struct ProcessEnvelopeGrouped<'a> {
    /// The group all items in this envelope belong to.
    pub group: ProcessingGroup,
    /// The envelope to process.
    pub envelope: ManagedEnvelope,
    /// Configuration and project context used during processing.
    pub ctx: processing::Context<'a>,
}
945
/// Message carrying metric buckets of a single project for processing.
#[derive(Debug)]
pub struct ProcessMetrics {
    /// Raw items or already parsed buckets.
    pub data: MetricData,
    /// Public key of the project the metrics belong to.
    pub project_key: ProjectKey,
    /// Trust level of the request the metrics arrived on.
    pub source: BucketSource,
    /// Time the request was received by this Relay.
    pub received_at: DateTime<Utc>,
    /// The client-provided `sent_at` time, if any.
    /// NOTE(review): presumably used for clock drift correction — confirm.
    pub sent_at: Option<DateTime<Utc>>,
}
971
/// Raw or already-parsed metric bucket data.
#[derive(Debug)]
pub enum MetricData {
    /// Envelope items carrying statsd or JSON bucket payloads.
    Raw(Vec<Item>),
    /// Buckets that were parsed elsewhere already.
    Parsed(Vec<Bucket>),
}
980
981impl MetricData {
982 fn into_buckets(self, timestamp: UnixTimestamp) -> Vec<Bucket> {
987 let items = match self {
988 Self::Parsed(buckets) => return buckets,
989 Self::Raw(items) => items,
990 };
991
992 let mut buckets = Vec::new();
993 for item in items {
994 let payload = item.payload();
995 if item.ty() == &ItemType::Statsd {
996 for bucket_result in Bucket::parse_all(&payload, timestamp) {
997 match bucket_result {
998 Ok(bucket) => buckets.push(bucket),
999 Err(error) => relay_log::debug!(
1000 error = &error as &dyn Error,
1001 "failed to parse metric bucket from statsd format",
1002 ),
1003 }
1004 }
1005 } else if item.ty() == &ItemType::MetricBuckets {
1006 match serde_json::from_slice::<Vec<Bucket>>(&payload) {
1007 Ok(parsed_buckets) => {
1008 if buckets.is_empty() {
1010 buckets = parsed_buckets;
1011 } else {
1012 buckets.extend(parsed_buckets);
1013 }
1014 }
1015 Err(error) => {
1016 relay_log::debug!(
1017 error = &error as &dyn Error,
1018 "failed to parse metric bucket",
1019 );
1020 metric!(counter(RelayCounters::MetricBucketsParsingFailed) += 1);
1021 }
1022 }
1023 } else {
1024 relay_log::error!(
1025 "invalid item of type {} passed to ProcessMetrics",
1026 item.ty()
1027 );
1028 }
1029 }
1030 buckets
1031 }
1032}
1033
/// Message carrying an encoded batch of buckets spanning multiple projects.
#[derive(Debug)]
pub struct ProcessBatchedMetrics {
    /// The encoded payload of the batch.
    pub payload: Bytes,
    /// Trust level of the request the batch arrived on.
    pub source: BucketSource,
    /// Time the request was received by this Relay.
    pub received_at: DateTime<Utc>,
    /// The client-provided `sent_at` time, if any.
    pub sent_at: Option<DateTime<Utc>>,
}
1045
/// The source of metric buckets, derived from the trust of the request.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum BucketSource {
    /// Buckets received from a trusted (internal) request.
    Internal,
    /// Buckets received from an untrusted (external) request.
    External,
}
1061
1062impl BucketSource {
1063 pub fn from_meta(meta: &RequestMeta) -> Self {
1065 match meta.request_trust() {
1066 RequestTrust::Trusted => Self::Internal,
1067 RequestTrust::Untrusted => Self::External,
1068 }
1069 }
1070}
1071
/// Message submitting a batch of client reports for the given scoping.
#[derive(Debug)]
pub struct SubmitClientReports {
    /// The client reports to submit.
    pub client_reports: Vec<ClientReport>,
    /// Scoping (organization/project/key) the reports belong to.
    pub scoping: Scoping,
}
1080
/// Messages handled by the [`EnvelopeProcessorService`].
///
/// Variants are boxed to keep the message enum itself small.
#[derive(Debug)]
pub enum EnvelopeProcessor {
    ProcessEnvelope(Box<ProcessEnvelope>),
    ProcessProjectMetrics(Box<ProcessMetrics>),
    ProcessBatchedMetrics(Box<ProcessBatchedMetrics>),
    FlushBuckets(Box<FlushBuckets>),
    SubmitClientReports(Box<SubmitClientReports>),
}
1090
1091impl EnvelopeProcessor {
1092 pub fn variant(&self) -> &'static str {
1094 match self {
1095 EnvelopeProcessor::ProcessEnvelope(_) => "ProcessEnvelope",
1096 EnvelopeProcessor::ProcessProjectMetrics(_) => "ProcessProjectMetrics",
1097 EnvelopeProcessor::ProcessBatchedMetrics(_) => "ProcessBatchedMetrics",
1098 EnvelopeProcessor::FlushBuckets(_) => "FlushBuckets",
1099 EnvelopeProcessor::SubmitClientReports(_) => "SubmitClientReports",
1100 }
1101 }
1102}
1103
// Marks `EnvelopeProcessor` as a service message interface.
impl relay_system::Interface for EnvelopeProcessor {}
1105
1106impl FromMessage<ProcessEnvelope> for EnvelopeProcessor {
1107 type Response = relay_system::NoResponse;
1108
1109 fn from_message(message: ProcessEnvelope, _sender: ()) -> Self {
1110 Self::ProcessEnvelope(Box::new(message))
1111 }
1112}
1113
// Fire-and-forget message: metrics processing produces no response.
impl FromMessage<ProcessMetrics> for EnvelopeProcessor {
    type Response = NoResponse;

    fn from_message(message: ProcessMetrics, _: ()) -> Self {
        Self::ProcessProjectMetrics(Box::new(message))
    }
}
1121
// Fire-and-forget message: batched metrics processing produces no response.
impl FromMessage<ProcessBatchedMetrics> for EnvelopeProcessor {
    type Response = NoResponse;

    fn from_message(message: ProcessBatchedMetrics, _: ()) -> Self {
        Self::ProcessBatchedMetrics(Box::new(message))
    }
}
1129
// Fire-and-forget message: bucket flushing produces no response.
impl FromMessage<FlushBuckets> for EnvelopeProcessor {
    type Response = NoResponse;

    fn from_message(message: FlushBuckets, _: ()) -> Self {
        Self::FlushBuckets(Box::new(message))
    }
}
1137
// Fire-and-forget message: client report submission produces no response.
impl FromMessage<SubmitClientReports> for EnvelopeProcessor {
    type Response = NoResponse;

    fn from_message(message: SubmitClientReports, _: ()) -> Self {
        Self::SubmitClientReports(Box::new(message))
    }
}
1145
/// Async worker pool running the processor's boxed processing futures.
pub type EnvelopeProcessorServicePool = AsyncPool<BoxFuture<'static, ()>>;
1148
/// Service implementing the [`EnvelopeProcessor`] interface.
///
/// Cheap to clone: all state is shared behind an `Arc`.
#[derive(Clone)]
pub struct EnvelopeProcessorService {
    inner: Arc<InnerProcessor>,
}
1156
/// Addresses of the services the processor communicates with.
pub struct Addrs {
    /// Receives outcomes for accepted/dropped items.
    pub outcome_aggregator: Addr<TrackOutcome>,
    /// Used to send requests to the upstream Relay/Sentry.
    pub upstream_relay: Addr<UpstreamRelay>,
    /// Object storage service; only available in processing builds.
    #[cfg(feature = "processing")]
    pub objectstore: Option<Addr<Objectstore>>,
    /// Store (persistence) forwarder; only available in processing builds.
    #[cfg(feature = "processing")]
    pub store_forwarder: Option<Addr<Store>>,
    /// Receives extracted metric buckets for aggregation.
    pub aggregator: Addr<Aggregator>,
}
1167
impl Default for Addrs {
    /// Creates `Addrs` with dummy addresses and without processing services.
    fn default() -> Self {
        Addrs {
            outcome_aggregator: Addr::dummy(),
            upstream_relay: Addr::dummy(),
            #[cfg(feature = "processing")]
            objectstore: None,
            #[cfg(feature = "processing")]
            store_forwarder: None,
            aggregator: Addr::dummy(),
        }
    }
}
1181
/// Shared state of the [`EnvelopeProcessorService`].
struct InnerProcessor {
    /// Worker pool executing the processing futures.
    pool: EnvelopeProcessorServicePool,
    config: Arc<Config>,
    global_config: GlobalConfigHandle,
    project_cache: ProjectCacheHandle,
    /// COGS measurement sink.
    cogs: Cogs,
    addrs: Addrs,
    /// Redis-backed rate limiter; present only with Redis in processing builds.
    #[cfg(feature = "processing")]
    rate_limiter: Option<Arc<RedisRateLimiter>>,
    /// GeoIP database; empty lookup if none is configured.
    geoip_lookup: GeoIpLookup,
    /// Redis-backed cardinality limiter; present only in processing builds.
    #[cfg(feature = "processing")]
    cardinality_limiter: Option<CardinalityLimiter>,
    metric_outcomes: MetricOutcomes,
    /// The dedicated per-item-type processors.
    processing: Processing,
}
1197
/// One dedicated sub-processor per processing group handled by the new
/// processing pipeline.
struct Processing {
    errors: ErrorsProcessor,
    logs: LogsProcessor,
    trace_metrics: TraceMetricsProcessor,
    spans: SpansProcessor,
    check_ins: CheckInsProcessor,
    sessions: SessionsProcessor,
    transactions: TransactionProcessor,
    profile_chunks: ProfileChunksProcessor,
    trace_attachments: TraceAttachmentsProcessor,
    replays: ReplaysProcessor,
    client_reports: ClientReportsProcessor,
    attachments: AttachmentProcessor,
    user_reports: UserReportsProcessor,
    profiles: ProfilesProcessor,
}
1214
1215impl EnvelopeProcessorService {
1216 #[cfg_attr(feature = "processing", expect(clippy::too_many_arguments))]
1218 pub fn new(
1219 pool: EnvelopeProcessorServicePool,
1220 config: Arc<Config>,
1221 global_config: GlobalConfigHandle,
1222 project_cache: ProjectCacheHandle,
1223 cogs: Cogs,
1224 #[cfg(feature = "processing")] redis: Option<RedisClients>,
1225 addrs: Addrs,
1226 metric_outcomes: MetricOutcomes,
1227 ) -> Self {
1228 let geoip_lookup = config
1229 .geoip_path()
1230 .and_then(
1231 |p| match GeoIpLookup::open(p).context(ServiceError::GeoIp) {
1232 Ok(geoip) => Some(geoip),
1233 Err(err) => {
1234 relay_log::error!("failed to open GeoIP db {p:?}: {err:?}");
1235 None
1236 }
1237 },
1238 )
1239 .unwrap_or_else(GeoIpLookup::empty);
1240
1241 #[cfg(feature = "processing")]
1242 let (cardinality, quotas) = match redis {
1243 Some(RedisClients {
1244 cardinality,
1245 quotas,
1246 ..
1247 }) => (Some(cardinality), Some(quotas)),
1248 None => (None, None),
1249 };
1250 #[cfg(not(feature = "processing"))]
1251 let quotas = None;
1252
1253 #[cfg(feature = "processing")]
1254 let rate_limiter = quotas.clone().map(|redis| {
1255 RedisRateLimiter::new(redis)
1256 .max_limit(config.max_rate_limit())
1257 .cache(config.quota_cache_ratio(), config.quota_cache_max())
1258 });
1259
1260 let quota_limiter = Arc::new(QuotaRateLimiter::new(
1261 #[cfg(feature = "processing")]
1262 project_cache.clone(),
1263 #[cfg(feature = "processing")]
1264 rate_limiter.clone(),
1265 ));
1266 #[cfg(feature = "processing")]
1267 let rate_limiter = rate_limiter.map(Arc::new);
1268 let outcome_aggregator = addrs.outcome_aggregator.clone();
1269 let inner = InnerProcessor {
1270 pool,
1271 global_config,
1272 project_cache,
1273 cogs,
1274 #[cfg(feature = "processing")]
1275 rate_limiter,
1276 addrs,
1277 #[cfg(feature = "processing")]
1278 cardinality_limiter: cardinality
1279 .map(|cardinality| {
1280 RedisSetLimiter::new(
1281 RedisSetLimiterOptions {
1282 cache_vacuum_interval: config
1283 .cardinality_limiter_cache_vacuum_interval(),
1284 },
1285 cardinality,
1286 )
1287 })
1288 .map(CardinalityLimiter::new),
1289 metric_outcomes,
1290 processing: Processing {
1291 errors: ErrorsProcessor::new(Arc::clone("a_limiter), geoip_lookup.clone()),
1292 logs: LogsProcessor::new(Arc::clone("a_limiter)),
1293 trace_metrics: TraceMetricsProcessor::new(Arc::clone("a_limiter)),
1294 spans: SpansProcessor::new(Arc::clone("a_limiter), geoip_lookup.clone()),
1295 check_ins: CheckInsProcessor::new(Arc::clone("a_limiter)),
1296 sessions: SessionsProcessor::new(Arc::clone("a_limiter)),
1297 transactions: TransactionProcessor::new(
1298 Arc::clone("a_limiter),
1299 geoip_lookup.clone(),
1300 quotas.clone(),
1301 ),
1302 profile_chunks: ProfileChunksProcessor::new(Arc::clone("a_limiter)),
1303 trace_attachments: TraceAttachmentsProcessor::new(Arc::clone("a_limiter)),
1304 replays: ReplaysProcessor::new(Arc::clone("a_limiter), geoip_lookup.clone()),
1305 client_reports: ClientReportsProcessor::new(outcome_aggregator),
1306 attachments: AttachmentProcessor::new(Arc::clone("a_limiter)),
1307 user_reports: UserReportsProcessor::new(Arc::clone("a_limiter)),
1308 profiles: ProfilesProcessor::new(quota_limiter),
1309 },
1310 geoip_lookup,
1311 config,
1312 };
1313
1314 Self {
1315 inner: Arc::new(inner),
1316 }
1317 }
1318
    /// Enforces rate limits on the envelope, first against cached limits and
    /// then — in processing mode — consistently against Redis.
    ///
    /// Newly discovered consistent limits are merged back into the project's
    /// cached rate limits. Returns the (possibly dropped) event.
    async fn enforce_quotas<Group>(
        &self,
        managed_envelope: &mut TypedEnvelope<Group>,
        event: Annotated<Event>,
        extracted_metrics: &mut ProcessingExtractedMetrics,
        ctx: processing::Context<'_>,
    ) -> Result<Annotated<Event>, ProcessingError> {
        // Cheap first pass against limits already known to this Relay.
        let cached_result = RateLimiter::Cached
            .enforce(managed_envelope, event, extracted_metrics, ctx)
            .await?;

        if_processing!(self.inner.config, {
            let rate_limiter = match self.inner.rate_limiter.clone() {
                Some(rate_limiter) => rate_limiter,
                None => return Ok(cached_result.event),
            };

            // Second, authoritative pass against the shared Redis limiter.
            let consistent_result = RateLimiter::Consistent(rate_limiter)
                .enforce(
                    managed_envelope,
                    cached_result.event,
                    extracted_metrics,
                    ctx
                )
                .await?;

            if !consistent_result.rate_limits.is_empty() {
                // Cache newly discovered limits so subsequent envelopes can be
                // limited without another Redis round-trip.
                self.inner
                    .project_cache
                    .get(managed_envelope.scoping().project_key)
                    .rate_limits()
                    .merge(consistent_result.rate_limits);
            }

            Ok(consistent_result.event)
        } else { Ok(cached_result.event) })
    }
1360
    /// Processes error events through the legacy error pipeline.
    ///
    /// In processing mode, expands bundled crash-report formats (Unreal,
    /// PlayStation, Nintendo Switch) first. Then extracts and normalizes the
    /// event, applies inbound filters, tags the dynamic sampling decision,
    /// enforces quotas, re-serializes the event, and scrubs attachments.
    async fn process_errors(
        &self,
        managed_envelope: &mut TypedEnvelope<ErrorGroup>,
        project_id: ProjectId,
        mut ctx: processing::Context<'_>,
    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
        let mut event_fully_normalized = EventFullyNormalized::new(managed_envelope.envelope());
        let mut metrics = Metrics::default();
        let mut extracted_metrics = ProcessingExtractedMetrics::new();

        report::process_user_reports(managed_envelope);

        if_processing!(self.inner.config, {
            // Expand container crash reports into individual envelope items
            // before event extraction.
            unreal::expand(managed_envelope, &self.inner.config)?;
            #[cfg(sentry)]
            playstation::expand(managed_envelope, &self.inner.config, ctx.project_info)?;
            nnswitch::expand(managed_envelope)?;
        });

        let mut event = event::extract(
            managed_envelope,
            &mut metrics,
            event_fully_normalized,
            &self.inner.config,
        )?;

        if_processing!(self.inner.config, {
            // Each expansion step may mark the event as fully normalized.
            if let Some(inner_event_fully_normalized) =
                unreal::process(managed_envelope, &mut event)?
            {
                event_fully_normalized = inner_event_fully_normalized;
            }
            #[cfg(sentry)]
            if let Some(inner_event_fully_normalized) =
                playstation::process(managed_envelope, &mut event, ctx.project_info)?
            {
                event_fully_normalized = inner_event_fully_normalized;
            }
            if let Some(inner_event_fully_normalized) =
                attachment::create_placeholders(managed_envelope, &mut event, &mut metrics)
            {
                event_fully_normalized = inner_event_fully_normalized;
            }
        });

        ctx.sampling_project_info = dynamic_sampling::validate_and_set_dsc(
            managed_envelope,
            &mut event,
            ctx.project_info,
            ctx.sampling_project_info,
        );

        let attachments = managed_envelope
            .envelope()
            .items()
            .filter(|item| item.attachment_type() == Some(AttachmentType::Attachment));
        processing::utils::event::finalize(
            managed_envelope.envelope().headers(),
            &mut event,
            attachments,
            &mut metrics,
            ctx.config,
        )?;
        event_fully_normalized = processing::utils::event::normalize(
            managed_envelope.envelope().headers(),
            &mut event,
            event_fully_normalized,
            project_id,
            ctx,
            &self.inner.geoip_lookup,
        )?;
        let filter_run =
            processing::utils::event::filter(managed_envelope.envelope().headers(), &event, ctx)
                .map_err(ProcessingError::EventFiltered)?;

        // Only tag the sampling decision when filters definitely passed, or
        // when this Relay performs processing itself.
        if self.inner.config.processing_enabled() || matches!(filter_run, FiltersStatus::Ok) {
            dynamic_sampling::tag_error_with_sampling_decision(
                managed_envelope,
                &mut event,
                ctx.sampling_project_info,
                &self.inner.config,
            )
            .await;
        }

        event = self
            .enforce_quotas(managed_envelope, event, &mut extracted_metrics, ctx)
            .await?;

        // The event may have been dropped by quota enforcement above.
        if event.value().is_some() {
            processing::utils::event::scrub(&mut event, ctx.project_info)?;
            event::serialize(
                managed_envelope,
                &mut event,
                event_fully_normalized,
                EventMetricsExtracted(false),
                SpansExtracted(false),
            )?;
            event::emit_feedback_metrics(managed_envelope.envelope());
        }

        let attachments = managed_envelope
            .envelope_mut()
            .items_mut()
            .filter(|i| i.ty() == &ItemType::Attachment);
        processing::utils::attachments::scrub(attachments, ctx.project_info, None);

        // A processing Relay must never forward un-normalized events.
        if self.inner.config.processing_enabled() && !event_fully_normalized.0 {
            relay_log::error!(
                tags.project = %project_id,
                tags.ty = event_type(&event).map(|e| e.to_string()).unwrap_or("none".to_owned()),
                "ingested event without normalizing"
            );
        }

        Ok(Some(extracted_metrics))
    }
1480
    /// Processes standalone items (attachments, user reports, profiles) that
    /// arrive without an accompanying event.
    async fn process_standalone(
        &self,
        managed_envelope: &mut TypedEnvelope<StandaloneGroup>,
        ctx: processing::Context<'_>,
    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
        for item in managed_envelope.envelope().items() {
            // NOTE(review): `&t.to_string()` relies on temporary lifetime
            // extension through the match to keep the string alive for the
            // metric call below.
            let attachment_type_tag = match item.attachment_type() {
                Some(t) => &t.to_string(),
                None => "",
            };
            relay_statsd::metric!(
                counter(RelayCounters::StandaloneItem) += 1,
                processor = "old",
                item_type = item.ty().name(),
                attachment_type = attachment_type_tag,
            );
        }

        let mut extracted_metrics = ProcessingExtractedMetrics::new();

        standalone::process(managed_envelope);

        profile::filter(managed_envelope, ctx.config, ctx.project_info);

        self.enforce_quotas(
            managed_envelope,
            Annotated::empty(),
            &mut extracted_metrics,
            ctx,
        )
        .await?;

        report::process_user_reports(managed_envelope);

        Ok(Some(extracted_metrics))
    }
1518
1519 async fn process_nel(
1520 &self,
1521 mut managed_envelope: ManagedEnvelope,
1522 ctx: processing::Context<'_>,
1523 ) -> Result<ProcessingResult, ProcessingError> {
1524 nel::convert_to_logs(&mut managed_envelope);
1525 self.process_with_processor(&self.inner.processing.logs, managed_envelope, ctx)
1526 .await
1527 }
1528
1529 async fn process_with_processor<P: processing::Processor>(
1530 &self,
1531 processor: &P,
1532 mut managed_envelope: ManagedEnvelope,
1533 ctx: processing::Context<'_>,
1534 ) -> Result<ProcessingResult, ProcessingError>
1535 where
1536 Outputs: From<P::Output>,
1537 {
1538 let Some(work) = processor.prepare_envelope(&mut managed_envelope) else {
1539 debug_assert!(
1540 false,
1541 "there must be work for the {} processor",
1542 std::any::type_name::<P>(),
1543 );
1544 return Err(ProcessingError::ProcessingGroupMismatch);
1545 };
1546
1547 managed_envelope.update();
1548 match managed_envelope.envelope().is_empty() {
1549 true => managed_envelope.accept(),
1550 false => managed_envelope.reject(Outcome::Invalid(DiscardReason::Internal)),
1551 }
1552
1553 processor
1554 .process(work, ctx)
1555 .await
1556 .map_err(|err| {
1557 relay_log::debug!(
1558 error = &err as &dyn std::error::Error,
1559 "processing pipeline failed"
1560 );
1561 ProcessingError::ProcessingFailure
1562 })
1563 .map(|o| o.map(Into::into))
1564 .map(ProcessingResult::Output)
1565 }
1566
    /// Processes standalone spans through the legacy span pipeline.
    async fn process_standalone_spans(
        &self,
        managed_envelope: &mut TypedEnvelope<SpanGroup>,
        _project_id: ProjectId,
        ctx: processing::Context<'_>,
    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
        let mut extracted_metrics = ProcessingExtractedMetrics::new();

        span::filter(managed_envelope, ctx.config, ctx.project_info);

        if_processing!(self.inner.config, {
            // Full span processing (including metrics extraction) only runs
            // in processing Relays.
            span::process(
                managed_envelope,
                &mut Annotated::empty(),
                &mut extracted_metrics,
                _project_id,
                ctx,
                &self.inner.geoip_lookup,
            )
            .await;
        });

        self.enforce_quotas(
            managed_envelope,
            Annotated::empty(),
            &mut extracted_metrics,
            ctx,
        )
        .await?;

        Ok(Some(extracted_metrics))
    }
1602
    /// Dispatches an envelope to the processing routine for its group.
    ///
    /// Applies envelope-level settings (DSC transaction parametrization,
    /// retention, project id) before dispatching. Groups with a dedicated
    /// processor run through [`Self::process_with_processor`]; legacy groups
    /// run through the `run!` macro, which converts the envelope into the
    /// group's typed form first.
    async fn process_envelope(
        &self,
        project_id: ProjectId,
        message: ProcessEnvelopeGrouped<'_>,
    ) -> Result<ProcessingResult, ProcessingError> {
        let ProcessEnvelopeGrouped {
            group,
            envelope: mut managed_envelope,
            ctx,
        } = message;

        if let Some(sampling_state) = ctx.sampling_project_info {
            // Apply the sampling project's transaction name rules to the DSC.
            managed_envelope
                .envelope_mut()
                .parametrize_dsc_transaction(&sampling_state.config.tx_name_rules);
        }

        if let Some(retention) = ctx.project_info.config.event_retention {
            managed_envelope.envelope_mut().set_retention(retention);
        }

        if let Some(retention) = ctx.project_info.config.downsampled_event_retention {
            managed_envelope
                .envelope_mut()
                .set_downsampled_retention(retention);
        }

        managed_envelope
            .envelope_mut()
            .meta_mut()
            .set_project_id(project_id);

        // Converts the envelope into the typed form expected by `$fn_name`,
        // rejects it with the error's outcome on failure, and wraps extracted
        // metrics into a `ProcessingResult`.
        macro_rules! run {
            ($fn_name:ident $(, $args:expr)*) => {
                async {
                    let mut managed_envelope = (managed_envelope, group).try_into()?;
                    match self.$fn_name(&mut managed_envelope, $($args),*).await {
                        Ok(extracted_metrics) => Ok(ProcessingResult::Envelope {
                            managed_envelope: managed_envelope.into_processed(),
                            extracted_metrics: extracted_metrics.map_or(ProcessingExtractedMetrics::new(), |e| e)
                        }),
                        Err(error) => {
                            relay_log::trace!("Executing {fn} failed: {error}", fn = stringify!($fn_name), error = error);
                            if let Some(outcome) = error.to_outcome() {
                                managed_envelope.reject(outcome);
                            }

                            return Err(error);
                        }
                    }
                }.await
            };
        }

        relay_log::trace!("Processing {group} group", group = group.variant());

        match group {
            ProcessingGroup::Error => {
                // Errors are migrating to the new pipeline behind a feature
                // flag; fall back to the legacy routine otherwise.
                if ctx.project_info.has_feature(Feature::NewErrorProcessing) {
                    self.process_with_processor(
                        &self.inner.processing.errors,
                        managed_envelope,
                        ctx,
                    )
                    .await
                } else {
                    run!(process_errors, project_id, ctx)
                }
            }
            ProcessingGroup::Transaction => {
                self.process_with_processor(
                    &self.inner.processing.transactions,
                    managed_envelope,
                    ctx,
                )
                .await
            }
            ProcessingGroup::Session => {
                self.process_with_processor(&self.inner.processing.sessions, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::Standalone => run!(process_standalone, ctx),
            ProcessingGroup::StandaloneAttachments => {
                self.process_with_processor(
                    &self.inner.processing.attachments,
                    managed_envelope,
                    ctx,
                )
                .await
            }
            ProcessingGroup::StandaloneUserReports => {
                self.process_with_processor(
                    &self.inner.processing.user_reports,
                    managed_envelope,
                    ctx,
                )
                .await
            }
            ProcessingGroup::StandaloneProfiles => {
                self.process_with_processor(&self.inner.processing.profiles, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::ClientReport => {
                self.process_with_processor(
                    &self.inner.processing.client_reports,
                    managed_envelope,
                    ctx,
                )
                .await
            }
            ProcessingGroup::Replay => {
                self.process_with_processor(&self.inner.processing.replays, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::CheckIn => {
                self.process_with_processor(&self.inner.processing.check_ins, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::Nel => self.process_nel(managed_envelope, ctx).await,
            ProcessingGroup::Log => {
                self.process_with_processor(&self.inner.processing.logs, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::TraceMetric => {
                self.process_with_processor(
                    &self.inner.processing.trace_metrics,
                    managed_envelope,
                    ctx,
                )
                .await
            }
            ProcessingGroup::SpanV2 => {
                self.process_with_processor(&self.inner.processing.spans, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::TraceAttachment => {
                self.process_with_processor(
                    &self.inner.processing.trace_attachments,
                    managed_envelope,
                    ctx,
                )
                .await
            }
            ProcessingGroup::Span => run!(process_standalone_spans, project_id, ctx),
            ProcessingGroup::ProfileChunk => {
                self.process_with_processor(
                    &self.inner.processing.profile_chunks,
                    managed_envelope,
                    ctx,
                )
                .await
            }
            ProcessingGroup::Metrics => {
                // Metrics items here are only expected in proxy mode; in any
                // other mode their presence indicates a routing problem.
                if self.inner.config.relay_mode() != RelayMode::Proxy {
                    relay_log::error!(
                        tags.project = %project_id,
                        items = ?managed_envelope.envelope().items().next().map(Item::ty),
                        "received metrics in the process_state"
                    );
                }

                Ok(ProcessingResult::no_metrics(
                    managed_envelope.into_processed(),
                ))
            }
            ProcessingGroup::Ungrouped => {
                relay_log::error!(
                    tags.project = %project_id,
                    items = ?managed_envelope.envelope().items().next().map(Item::ty),
                    "could not identify the processing group based on the envelope's items"
                );

                Ok(ProcessingResult::no_metrics(
                    managed_envelope.into_processed(),
                ))
            }
            ProcessingGroup::ForwardUnknown => Ok(ProcessingResult::no_metrics(
                managed_envelope.into_processed(),
            )),
        }
    }
1802
    /// Processes a grouped envelope and prepares the result for submission.
    ///
    /// Sets up the logging scope, delegates to [`Self::process_envelope`],
    /// forwards extracted metrics to the aggregator, and converts the result
    /// into an optional [`Submit`].
    async fn process<'a>(
        &self,
        mut message: ProcessEnvelopeGrouped<'a>,
    ) -> Result<Option<Submit<'a>>, ProcessingError> {
        let ProcessEnvelopeGrouped {
            ref mut envelope,
            ctx,
            ..
        } = message;

        // Prefer the project id from the project state; fall back to the id
        // on the request. Without any id the envelope cannot be processed.
        let Some(project_id) = ctx
            .project_info
            .project_id
            .or_else(|| envelope.envelope().meta().project_id())
        else {
            envelope.reject(Outcome::Invalid(DiscardReason::Internal));
            return Err(ProcessingError::MissingProjectId);
        };

        let client = envelope.envelope().meta().client().map(str::to_owned);
        let user_agent = envelope.envelope().meta().user_agent().map(str::to_owned);
        let project_key = envelope.envelope().meta().public_key();
        // The sampling key is only meaningful when a sampling project is known.
        let sampling_key = envelope
            .envelope()
            .sampling_key()
            .filter(|_| ctx.sampling_project_info.is_some());

        relay_log::configure_scope(|scope| {
            scope.set_tag("project", project_id);
            if let Some(client) = client {
                scope.set_tag("sdk", client);
            }
            if let Some(user_agent) = user_agent {
                scope.set_extra("user_agent", user_agent.into());
            }
        });

        let result = match self.process_envelope(project_id, message).await {
            Ok(ProcessingResult::Envelope {
                mut managed_envelope,
                extracted_metrics,
            }) => {
                managed_envelope.update();

                let has_metrics = !extracted_metrics.metrics.project_metrics.is_empty();
                send_metrics(
                    extracted_metrics.metrics,
                    project_key,
                    sampling_key,
                    &self.inner.addrs.aggregator,
                );

                let envelope_response = if managed_envelope.envelope().is_empty() {
                    if !has_metrics {
                        managed_envelope.reject(Outcome::RateLimited(None));
                    } else {
                        // Metrics were extracted, so the empty envelope still
                        // counts as accepted.
                        managed_envelope.accept();
                    }

                    None
                } else {
                    Some(managed_envelope)
                };

                Ok(envelope_response.map(Submit::Envelope))
            }
            Ok(ProcessingResult::Output(Output { main, metrics })) => {
                if let Some(metrics) = metrics {
                    metrics.accept(|metrics| {
                        send_metrics(
                            metrics,
                            project_key,
                            sampling_key,
                            &self.inner.addrs.aggregator,
                        );
                    });
                }

                let ctx = ctx.to_forward();
                Ok(main.map(|output| Submit::Output { output, ctx }))
            }
            Err(err) => Err(err),
        };

        // Undo the scope tags set above regardless of the outcome.
        relay_log::configure_scope(|scope| {
            scope.remove_tag("project");
            scope.remove_tag("sdk");
            scope.remove_tag("user_agent");
        });

        result
    }
1914
    /// Splits an incoming envelope into processing groups and processes each
    /// group, submitting results upstream.
    async fn handle_process_envelope(&self, cogs: &mut Token, message: ProcessEnvelope) {
        let project_key = message.envelope.envelope().meta().public_key();
        let wait_time = message.envelope.age();
        metric!(timer(RelayTimers::EnvelopeWaitTime) = wait_time);

        // COGS are measured per split group below instead of once for the
        // whole message.
        cogs.cancel();

        let scoping = message.envelope.scoping();
        for (group, envelope) in ProcessingGroup::split_envelope(
            *message.envelope.into_envelope(),
            &message.project_info,
        ) {
            let mut cogs = self
                .inner
                .cogs
                .timed(ResourceId::Relay, AppFeature::from(group));

            let mut envelope =
                ManagedEnvelope::new(envelope, self.inner.addrs.outcome_aggregator.clone());
            envelope.scope(scoping);

            let global_config = self.inner.global_config.current();

            let ctx = processing::Context {
                config: &self.inner.config,
                global_config: &global_config,
                project_info: &message.project_info,
                sampling_project_info: message.sampling_project_info.as_deref(),
                rate_limits: &message.rate_limits,
                reservoir_counters: &message.reservoir_counters,
            };

            let message = ProcessEnvelopeGrouped {
                group,
                envelope,
                ctx,
            };

            let result = metric!(
                timer(RelayTimers::EnvelopeProcessingTime),
                group = group.variant(),
                { self.process(message).await }
            );

            match result {
                Ok(Some(envelope)) => self.submit_upstream(&mut cogs, envelope),
                Ok(None) => {}
                // Unexpected failures are logged as errors; expected ones only
                // at debug level.
                Err(error) if error.is_unexpected() => {
                    relay_log::error!(
                        tags.project_key = %project_key,
                        error = &error as &dyn Error,
                        "error processing envelope"
                    )
                }
                Err(error) => {
                    relay_log::debug!(
                        tags.project_key = %project_key,
                        error = &error as &dyn Error,
                        "error processing envelope"
                    )
                }
            }
        }
    }
1981
    /// Normalizes and validates incoming metric buckets, then merges them into
    /// the aggregator.
    ///
    /// Invalid buckets and buckets in unknown namespaces are dropped, clock
    /// drift is corrected, and buckets from external sources get fresh
    /// metadata. Rate limits are applied when the project state is available.
    fn handle_process_metrics(&self, cogs: &mut Token, message: ProcessMetrics) {
        let ProcessMetrics {
            data,
            project_key,
            received_at,
            sent_at,
            source,
        } = message;

        let received_timestamp =
            UnixTimestamp::from_datetime(received_at).unwrap_or(UnixTimestamp::now());

        let mut buckets = data.into_buckets(received_timestamp);
        if buckets.is_empty() {
            return;
        };
        cogs.update(relay_metrics::cogs::BySize(&buckets));

        let clock_drift_processor =
            ClockDriftProcessor::new(sent_at, received_at).at_least(MINIMUM_CLOCK_DRIFT);

        buckets.retain_mut(|bucket| {
            if let Err(error) = relay_metrics::normalize_bucket(bucket) {
                relay_log::debug!(error = &error as &dyn Error, "dropping bucket {bucket:?}");
                return false;
            }

            if !self::metrics::is_valid_namespace(bucket) {
                return false;
            }

            clock_drift_processor.process_timestamp(&mut bucket.timestamp);

            // Only metadata from internal (Relay-to-Relay) sources is trusted.
            if !matches!(source, BucketSource::Internal) {
                bucket.metadata = BucketMetadata::new(received_timestamp);
            }

            true
        });

        let project = self.inner.project_cache.get(project_key);

        // Rate limits can only be checked when the project state is loaded.
        let buckets = match project.state() {
            ProjectState::Enabled(project_info) => {
                let rate_limits = project.rate_limits().current_limits();
                self.check_buckets(project_key, project_info, &rate_limits, buckets)
            }
            _ => buckets,
        };

        relay_log::trace!("merging metric buckets into the aggregator");
        self.inner
            .addrs
            .aggregator
            .send(MergeBuckets::new(project_key, buckets));
    }
2040
2041 fn handle_process_batched_metrics(&self, cogs: &mut Token, message: ProcessBatchedMetrics) {
2042 let ProcessBatchedMetrics {
2043 payload,
2044 source,
2045 received_at,
2046 sent_at,
2047 } = message;
2048
2049 #[derive(serde::Deserialize)]
2050 struct Wrapper {
2051 buckets: HashMap<ProjectKey, Vec<Bucket>>,
2052 }
2053
2054 let buckets = match serde_json::from_slice(&payload) {
2055 Ok(Wrapper { buckets }) => buckets,
2056 Err(error) => {
2057 relay_log::debug!(
2058 error = &error as &dyn Error,
2059 "failed to parse batched metrics",
2060 );
2061 metric!(counter(RelayCounters::MetricBucketsParsingFailed) += 1);
2062 return;
2063 }
2064 };
2065
2066 for (project_key, buckets) in buckets {
2067 self.handle_process_metrics(
2068 cogs,
2069 ProcessMetrics {
2070 data: MetricData::Parsed(buckets),
2071 project_key,
2072 source,
2073 received_at,
2074 sent_at,
2075 },
2076 )
2077 }
2078 }
2079
    /// Submits a processing result either to the store (processing mode) or
    /// to the upstream Relay over HTTP.
    fn submit_upstream(&self, cogs: &mut Token, submit: Submit<'_>) {
        let _submit = cogs.start_category("submit");

        #[cfg(feature = "processing")]
        if self.inner.config.processing_enabled()
            && let Some(store_forwarder) = &self.inner.addrs.store_forwarder
        {
            use crate::processing::StoreHandle;

            let objectstore = self.inner.addrs.objectstore.as_ref();
            let global_config = &self.inner.global_config.current();
            let handle = StoreHandle::new(store_forwarder, objectstore, global_config);

            match submit {
                Submit::Envelope(envelope) => handle.send_envelope(envelope.into_inner()),
                Submit::Output { output, ctx } => output
                    .forward_store(handle, ctx)
                    .unwrap_or_else(|err| err.into_inner()),
            }
            return;
        }

        // Not in processing mode: serialize into an envelope and forward it.
        let mut envelope = match submit {
            Submit::Envelope(envelope) => envelope,
            Submit::Output { output, ctx } => match output.serialize_envelope(ctx) {
                Ok(envelope) => ManagedEnvelope::from(envelope).into_processed(),
                Err(_) => {
                    relay_log::error!("failed to serialize output to an envelope");
                    return;
                }
            },
        };

        if envelope.envelope_mut().is_empty() {
            envelope.accept();
            return;
        }

        envelope.envelope_mut().set_sent_at(Utc::now());

        relay_log::trace!("sending envelope to sentry endpoint");
        let http_encoding = self.inner.config.http_encoding();
        let result = envelope.envelope().to_vec().and_then(|v| {
            encode_payload(&v.into(), http_encoding).map_err(EnvelopeError::PayloadIoFailed)
        });

        match result {
            Ok(body) => {
                self.inner
                    .addrs
                    .upstream_relay
                    .send(SendRequest(SendEnvelope {
                        envelope,
                        body,
                        http_encoding,
                        project_cache: self.inner.project_cache.clone(),
                    }));
            }
            Err(error) => {
                relay_log::error!(
                    error = &error as &dyn Error,
                    tags.project_key = %envelope.scoping().project_key,
                    "failed to serialize envelope payload"
                );

                envelope.reject(Outcome::Invalid(DiscardReason::Internal));
            }
        }
    }
2161
2162 fn handle_submit_client_reports(&self, cogs: &mut Token, message: SubmitClientReports) {
2163 let SubmitClientReports {
2164 client_reports,
2165 scoping,
2166 } = message;
2167
2168 let upstream = self.inner.config.upstream_descriptor();
2169 let dsn = PartialDsn::outbound(&scoping, upstream);
2170
2171 let mut envelope = Envelope::from_request(None, RequestMeta::outbound(dsn));
2172 for client_report in client_reports {
2173 match client_report.serialize() {
2174 Ok(payload) => {
2175 let mut item = Item::new(ItemType::ClientReport);
2176 item.set_payload(ContentType::Json, payload);
2177 envelope.add_item(item);
2178 }
2179 Err(error) => {
2180 relay_log::error!(
2181 error = &error as &dyn std::error::Error,
2182 "failed to serialize client report"
2183 );
2184 }
2185 }
2186 }
2187
2188 let envelope = ManagedEnvelope::new(envelope, self.inner.addrs.outcome_aggregator.clone());
2189 self.submit_upstream(cogs, Submit::Envelope(envelope.into_processed()));
2190 }
2191
2192 fn check_buckets(
2193 &self,
2194 project_key: ProjectKey,
2195 project_info: &ProjectInfo,
2196 rate_limits: &RateLimits,
2197 buckets: Vec<Bucket>,
2198 ) -> Vec<Bucket> {
2199 let Some(scoping) = project_info.scoping(project_key) else {
2200 relay_log::error!(
2201 tags.project_key = project_key.as_str(),
2202 "there is no scoping: dropping {} buckets",
2203 buckets.len(),
2204 );
2205 return Vec::new();
2206 };
2207
2208 let mut buckets = self::metrics::apply_project_info(
2209 buckets,
2210 &self.inner.metric_outcomes,
2211 project_info,
2212 scoping,
2213 );
2214
2215 let namespaces: BTreeSet<MetricNamespace> = buckets
2216 .iter()
2217 .filter_map(|bucket| bucket.name.try_namespace())
2218 .collect();
2219
2220 for namespace in namespaces {
2221 let limits = rate_limits.check_with_quotas(
2222 project_info.get_quotas(),
2223 scoping.item(DataCategory::MetricBucket),
2224 );
2225
2226 if limits.is_limited() {
2227 let rejected;
2228 (buckets, rejected) = utils::split_off(buckets, |bucket| {
2229 bucket.name.try_namespace() == Some(namespace)
2230 });
2231
2232 let reason_code = limits.longest().and_then(|limit| limit.reason_code.clone());
2233 self.inner.metric_outcomes.track(
2234 scoping,
2235 &rejected,
2236 Outcome::RateLimited(reason_code),
2237 );
2238 }
2239 }
2240
2241 let quotas = project_info.config.quotas.clone();
2242 match MetricsLimiter::create(buckets, quotas, scoping) {
2243 Ok(mut bucket_limiter) => {
2244 bucket_limiter.enforce_limits(rate_limits, &self.inner.metric_outcomes);
2245 bucket_limiter.into_buckets()
2246 }
2247 Err(buckets) => buckets,
2248 }
2249 }
2250
    /// Enforces per-namespace metric-bucket rate limits via Redis.
    ///
    /// Counts buckets per namespace, consults the Redis rate limiter for
    /// each, drops rejected namespaces (tracking outcomes), and caches new
    /// limits on the project. The remaining buckets are then passed through
    /// the transaction/span based limiter.
    #[cfg(feature = "processing")]
    async fn rate_limit_buckets(
        &self,
        scoping: Scoping,
        project_info: &ProjectInfo,
        mut buckets: Vec<Bucket>,
    ) -> Vec<Bucket> {
        let Some(rate_limiter) = &self.inner.rate_limiter else {
            return buckets;
        };

        let global_config = self.inner.global_config.current();
        let namespaces = buckets
            .iter()
            .filter_map(|bucket| bucket.name.try_namespace())
            .counts();

        let quotas = CombinedQuotas::new(&global_config, project_info.get_quotas());

        for (namespace, quantity) in namespaces {
            let item_scoping = scoping.metric_bucket(namespace);

            let limits = match rate_limiter
                .is_rate_limited(quotas, item_scoping, quantity, false)
                .await
            {
                Ok(limits) => limits,
                Err(err) => {
                    relay_log::error!(
                        error = &err as &dyn std::error::Error,
                        "failed to check redis rate limits"
                    );
                    // On limiter failure, stop checking and let the remaining
                    // buckets through.
                    break;
                }
            };

            if limits.is_limited() {
                let rejected;
                (buckets, rejected) = utils::split_off(buckets, |bucket| {
                    bucket.name.try_namespace() == Some(namespace)
                });

                let reason_code = limits.longest().and_then(|limit| limit.reason_code.clone());
                self.inner.metric_outcomes.track(
                    scoping,
                    &rejected,
                    Outcome::RateLimited(reason_code),
                );

                // Cache the limits so later envelopes skip the Redis check.
                self.inner
                    .project_cache
                    .get(item_scoping.scoping.project_key)
                    .rate_limits()
                    .merge(limits);
            }
        }

        match MetricsLimiter::create(buckets, project_info.config.quotas.clone(), scoping) {
            Err(buckets) => buckets,
            Ok(bucket_limiter) => self.apply_other_rate_limits(bucket_limiter).await,
        }
    }
2313
2314 #[cfg(feature = "processing")]
2316 async fn apply_other_rate_limits(&self, mut bucket_limiter: MetricsLimiter) -> Vec<Bucket> {
2317 relay_log::trace!("handle_rate_limit_buckets");
2318
2319 let scoping = *bucket_limiter.scoping();
2320
2321 if let Some(rate_limiter) = self.inner.rate_limiter.as_ref() {
2322 let global_config = self.inner.global_config.current();
2323 let quotas = CombinedQuotas::new(&global_config, bucket_limiter.quotas());
2324
2325 let over_accept_once = true;
2328 let mut rate_limits = RateLimits::new();
2329
2330 for category in [DataCategory::Transaction, DataCategory::Span] {
2331 let count = bucket_limiter.count(category);
2332
2333 let timer = Instant::now();
2334 let mut is_limited = false;
2335
2336 if let Some(count) = count {
2337 match rate_limiter
2338 .is_rate_limited(quotas, scoping.item(category), count, over_accept_once)
2339 .await
2340 {
2341 Ok(limits) => {
2342 is_limited = limits.is_limited();
2343 rate_limits.merge(limits)
2344 }
2345 Err(e) => {
2346 relay_log::error!(error = &e as &dyn Error, "rate limiting error")
2347 }
2348 }
2349 }
2350
2351 relay_statsd::metric!(
2352 timer(RelayTimers::RateLimitBucketsDuration) = timer.elapsed(),
2353 category = category.name(),
2354 limited = if is_limited { "true" } else { "false" },
2355 count = match count {
2356 None => "none",
2357 Some(0) => "0",
2358 Some(1) => "1",
2359 Some(1..=10) => "10",
2360 Some(1..=25) => "25",
2361 Some(1..=50) => "50",
2362 Some(51..=100) => "100",
2363 Some(101..=500) => "500",
2364 _ => "> 500",
2365 },
2366 );
2367 }
2368
2369 if rate_limits.is_limited() {
2370 let was_enforced =
2371 bucket_limiter.enforce_limits(&rate_limits, &self.inner.metric_outcomes);
2372
2373 if was_enforced {
2374 self.inner
2376 .project_cache
2377 .get(scoping.project_key)
2378 .rate_limits()
2379 .merge(rate_limits);
2380 }
2381 }
2382 }
2383
2384 bucket_limiter.into_buckets()
2385 }
2386
    /// Checks the given buckets against the configured cardinality limits.
    ///
    /// Returns the accepted buckets. In `Passive` mode limits are evaluated
    /// and reported but never enforced; when the limiter is disabled or not
    /// configured, all buckets pass through unchanged.
    #[cfg(feature = "processing")]
    async fn cardinality_limit_buckets(
        &self,
        scoping: Scoping,
        limits: &[CardinalityLimit],
        buckets: Vec<Bucket>,
    ) -> Vec<Bucket> {
        let global_config = self.inner.global_config.current();
        let cardinality_limiter_mode = global_config.options.cardinality_limiter_mode;

        if matches!(cardinality_limiter_mode, CardinalityLimiterMode::Disabled) {
            return buckets;
        }

        let Some(ref limiter) = self.inner.cardinality_limiter else {
            return buckets;
        };

        let scope = relay_cardinality::Scoping {
            organization_id: scoping.organization_id,
            project_id: scoping.project_id,
        };

        let limits = match limiter
            .check_cardinality_limits(scope, limits, buckets)
            .await
        {
            Ok(limits) => limits,
            // On limiter failure, fail open: log and return the original buckets.
            Err((buckets, error)) => {
                relay_log::error!(
                    error = &error as &dyn std::error::Error,
                    "cardinality limiter failed"
                );
                return buckets;
            }
        };

        // Report exceeded limits to Sentry, sampled to avoid log spam.
        let error_sample_rate = global_config.options.cardinality_limiter_error_sample_rate;
        if !limits.exceeded_limits().is_empty() && utils::sample(error_sample_rate).is_keep() {
            for limit in limits.exceeded_limits() {
                relay_log::with_scope(
                    |scope| {
                        scope.set_user(Some(relay_log::sentry::User {
                            id: Some(scoping.organization_id.to_string()),
                            ..Default::default()
                        }));
                    },
                    || {
                        relay_log::error!(
                            tags.organization_id = scoping.organization_id.value(),
                            tags.limit_id = limit.id,
                            tags.passive = limit.passive,
                            "Cardinality Limit"
                        );
                    },
                );
            }
        }

        // Emit cardinality reports regardless of enforcement mode.
        for (limit, reports) in limits.cardinality_reports() {
            for report in reports {
                self.inner
                    .metric_outcomes
                    .cardinality(scoping, limit, report);
            }
        }

        // Passive mode: evaluate and report, but do not drop anything.
        if matches!(cardinality_limiter_mode, CardinalityLimiterMode::Passive) {
            return limits.into_source();
        }

        let CardinalityLimitsSplit { accepted, rejected } = limits.into_split();

        // Track an outcome for every rejected bucket.
        for (bucket, exceeded) in rejected {
            self.inner.metric_outcomes.track(
                scoping,
                &[bucket],
                Outcome::CardinalityLimited(exceeded.id.clone()),
            );
        }
        accepted
    }
2471
    /// Rate-limits and cardinality-limits all buckets and forwards survivors
    /// to the store (processing Relays only).
    ///
    /// Buckets are handled per project; projects whose buckets are fully
    /// dropped are skipped entirely.
    #[cfg(feature = "processing")]
    async fn encode_metrics_processing(
        &self,
        message: FlushBuckets,
        store_forwarder: &Addr<Store>,
    ) {
        use crate::constants::DEFAULT_EVENT_RETENTION;
        use crate::services::store::StoreMetrics;

        for ProjectBuckets {
            buckets,
            scoping,
            project_info,
            ..
        } in message.buckets.into_values()
        {
            let buckets = self
                .rate_limit_buckets(scoping, &project_info, buckets)
                .await;

            let limits = project_info.get_cardinality_limits();
            let buckets = self
                .cardinality_limit_buckets(scoping, limits, buckets)
                .await;

            if buckets.is_empty() {
                continue;
            }

            // Fall back to the default retention when the project does not
            // configure one explicitly.
            let retention = project_info
                .config
                .event_retention
                .unwrap_or(DEFAULT_EVENT_RETENTION);

            store_forwarder.send(StoreMetrics {
                buckets,
                scoping,
                retention,
            });
        }
    }
2521
2522 fn encode_metrics_envelope(&self, cogs: &mut Token, message: FlushBuckets) {
2534 let FlushBuckets {
2535 partition_key,
2536 buckets,
2537 } = message;
2538
2539 let batch_size = self.inner.config.metrics_max_batch_size_bytes();
2540 let upstream = self.inner.config.upstream_descriptor();
2541
2542 for ProjectBuckets {
2543 buckets, scoping, ..
2544 } in buckets.values()
2545 {
2546 let dsn = PartialDsn::outbound(scoping, upstream);
2547
2548 relay_statsd::metric!(
2549 distribution(RelayDistributions::PartitionKeys) = u64::from(partition_key)
2550 );
2551
2552 let mut num_batches = 0;
2553 for batch in BucketsView::from(buckets).by_size(batch_size) {
2554 let mut envelope = Envelope::from_request(None, RequestMeta::outbound(dsn.clone()));
2555
2556 let mut item = Item::new(ItemType::MetricBuckets);
2557 item.set_source_quantities(crate::metrics::extract_quantities(batch));
2558 item.set_payload(ContentType::Json, serde_json::to_vec(&buckets).unwrap());
2559 envelope.add_item(item);
2560
2561 let mut envelope =
2562 ManagedEnvelope::new(envelope, self.inner.addrs.outcome_aggregator.clone());
2563 envelope
2564 .set_partition_key(Some(partition_key))
2565 .scope(*scoping);
2566
2567 relay_statsd::metric!(
2568 distribution(RelayDistributions::BucketsPerBatch) = batch.len() as u64
2569 );
2570
2571 self.submit_upstream(cogs, Submit::Envelope(envelope.into_processed()));
2572 num_batches += 1;
2573 }
2574
2575 relay_statsd::metric!(
2576 distribution(RelayDistributions::BatchesPerPartition) = num_batches
2577 );
2578 }
2579 }
2580
2581 fn send_global_partition(&self, partition_key: u32, partition: &mut Partition<'_>) {
2583 if partition.is_empty() {
2584 return;
2585 }
2586
2587 let (unencoded, project_info) = partition.take();
2588 let http_encoding = self.inner.config.http_encoding();
2589 let encoded = match encode_payload(&unencoded, http_encoding) {
2590 Ok(payload) => payload,
2591 Err(error) => {
2592 let error = &error as &dyn std::error::Error;
2593 relay_log::error!(error, "failed to encode metrics payload");
2594 return;
2595 }
2596 };
2597
2598 let request = SendMetricsRequest {
2599 partition_key: partition_key.to_string(),
2600 unencoded,
2601 encoded,
2602 project_info,
2603 http_encoding,
2604 metric_outcomes: self.inner.metric_outcomes.clone(),
2605 };
2606
2607 self.inner.addrs.upstream_relay.send(SendRequest(request));
2608 }
2609
    /// Packs buckets from all projects into size-bounded partitions and sends
    /// each partition as a single global metrics request.
    ///
    /// When a bucket does not fit into the current partition, the partition is
    /// flushed and the bucket's remainder carries over into the next one.
    fn encode_metrics_global(&self, message: FlushBuckets) {
        let FlushBuckets {
            partition_key,
            buckets,
        } = message;

        let batch_size = self.inner.config.metrics_max_batch_size_bytes();
        let mut partition = Partition::new(batch_size);
        let mut partition_splits = 0;

        for ProjectBuckets {
            buckets, scoping, ..
        } in buckets.values()
        {
            for bucket in buckets {
                let mut remaining = Some(BucketView::new(bucket));

                // `insert` returns the overflow of a split bucket; flush the
                // full partition and retry with the remainder.
                while let Some(bucket) = remaining.take() {
                    if let Some(next) = partition.insert(bucket, *scoping) {
                        self.send_global_partition(partition_key, &mut partition);
                        remaining = Some(next);
                        partition_splits += 1;
                    }
                }
            }
        }

        if partition_splits > 0 {
            metric!(distribution(RelayDistributions::PartitionSplits) = partition_splits);
        }

        // Flush the final, possibly partially filled partition.
        self.send_global_partition(partition_key, &mut partition);
    }
2660
    /// Handles a [`FlushBuckets`] message by routing buckets to the
    /// appropriate sink.
    ///
    /// All buckets are first checked against cached project state. Processing
    /// Relays forward them to the store; other Relays encode them either as a
    /// single global request or as per-project envelopes, depending on config.
    async fn handle_flush_buckets(&self, cogs: &mut Token, mut message: FlushBuckets) {
        for (project_key, pb) in message.buckets.iter_mut() {
            let buckets = std::mem::take(&mut pb.buckets);
            pb.buckets =
                self.check_buckets(*project_key, &pb.project_info, &pb.rate_limits, buckets);
        }

        #[cfg(feature = "processing")]
        if self.inner.config.processing_enabled()
            && let Some(ref store_forwarder) = self.inner.addrs.store_forwarder
        {
            return self
                .encode_metrics_processing(message, store_forwarder)
                .await;
        }

        if self.inner.config.http_global_metrics() {
            self.encode_metrics_global(message)
        } else {
            self.encode_metrics_envelope(cogs, message)
        }
    }
2683
2684 #[cfg(all(test, feature = "processing"))]
2685 fn redis_rate_limiter_enabled(&self) -> bool {
2686 self.inner.rate_limiter.is_some()
2687 }
2688
    /// Dispatches an [`EnvelopeProcessor`] message to its handler.
    ///
    /// Execution time is measured per message variant and attributed to COGS
    /// using the weights computed by [`Self::feature_weights`].
    async fn handle_message(&self, message: EnvelopeProcessor) {
        let ty = message.variant();
        let feature_weights = self.feature_weights(&message);

        metric!(timer(RelayTimers::ProcessMessageDuration), message = ty, {
            let mut cogs = self.inner.cogs.timed(ResourceId::Relay, feature_weights);

            match message {
                EnvelopeProcessor::ProcessEnvelope(m) => {
                    self.handle_process_envelope(&mut cogs, *m).await
                }
                EnvelopeProcessor::ProcessProjectMetrics(m) => {
                    self.handle_process_metrics(&mut cogs, *m)
                }
                EnvelopeProcessor::ProcessBatchedMetrics(m) => {
                    self.handle_process_batched_metrics(&mut cogs, *m)
                }
                EnvelopeProcessor::FlushBuckets(m) => {
                    self.handle_flush_buckets(&mut cogs, *m).await
                }
                EnvelopeProcessor::SubmitClientReports(m) => {
                    self.handle_submit_client_reports(&mut cogs, *m)
                }
            }
        });
    }
2715
    /// Computes COGS feature weights for a message.
    ///
    /// Bucket flushes are attributed per project: by bucket count on
    /// processing Relays and by estimated size otherwise. Other messages are
    /// attributed to a single fixed feature.
    fn feature_weights(&self, message: &EnvelopeProcessor) -> FeatureWeights {
        match message {
            EnvelopeProcessor::ProcessEnvelope(_) => AppFeature::Unattributed.into(),
            EnvelopeProcessor::ProcessProjectMetrics(_) => AppFeature::Unattributed.into(),
            EnvelopeProcessor::ProcessBatchedMetrics(_) => AppFeature::Unattributed.into(),
            EnvelopeProcessor::FlushBuckets(v) => v
                .buckets
                .values()
                .map(|s| {
                    if self.inner.config.processing_enabled() {
                        relay_metrics::cogs::ByCount(&s.buckets).into()
                    } else {
                        relay_metrics::cogs::BySize(&s.buckets).into()
                    }
                })
                .fold(FeatureWeights::none(), FeatureWeights::merge),
            EnvelopeProcessor::SubmitClientReports(_) => AppFeature::ClientReports.into(),
        }
    }
2738}
2739
impl Service for EnvelopeProcessorService {
    type Interface = EnvelopeProcessor;

    /// Receives messages and processes each one on the worker pool.
    ///
    /// `spawn_async` is awaited, so message intake applies backpressure when
    /// the pool is saturated.
    async fn run(self, mut rx: relay_system::Receiver<Self::Interface>) {
        while let Some(message) = rx.recv().await {
            let service = self.clone();
            self.inner
                .pool
                .spawn_async(
                    async move {
                        service.handle_message(message).await;
                    }
                    .boxed(),
                )
                .await;
        }
    }
}
2758
/// Result of enforcing rate limits on an envelope and its event.
struct EnforcementResult {
    /// The event after enforcement; empty if enforcement dropped it.
    event: Annotated<Event>,
    /// The rate limits computed during enforcement; only read in processing
    /// builds.
    #[cfg_attr(not(feature = "processing"), expect(dead_code))]
    rate_limits: RateLimits,
}
2768
impl EnforcementResult {
    /// Creates a new [`EnforcementResult`].
    pub fn new(event: Annotated<Event>, rate_limits: RateLimits) -> Self {
        Self { event, rate_limits }
    }
}
2775
/// Strategy used to check rate limits during envelope processing.
#[derive(Clone)]
enum RateLimiter {
    /// Checks only against locally cached rate limits.
    Cached,
    /// Checks consistently against the Redis rate limiter.
    #[cfg(feature = "processing")]
    Consistent(Arc<RedisRateLimiter>),
}
2782
impl RateLimiter {
    /// Enforces quotas on the envelope and its event.
    ///
    /// Computes applicable limits via [`EnvelopeLimiter`] — consulting Redis
    /// for the `Consistent` variant, cached limits otherwise — applies them
    /// with outcomes, and returns the surviving event together with the
    /// computed limits. Returns early when there is nothing to enforce or no
    /// quotas are configured.
    async fn enforce<Group>(
        &self,
        managed_envelope: &mut TypedEnvelope<Group>,
        event: Annotated<Event>,
        _extracted_metrics: &mut ProcessingExtractedMetrics,
        ctx: processing::Context<'_>,
    ) -> Result<EnforcementResult, ProcessingError> {
        if managed_envelope.envelope().is_empty() && event.value().is_none() {
            return Ok(EnforcementResult::new(event, RateLimits::default()));
        }

        let quotas = CombinedQuotas::new(ctx.global_config, ctx.project_info.get_quotas());
        if quotas.is_empty() {
            return Ok(EnforcementResult::new(event, RateLimits::default()));
        }

        let event_category = event_category(&event);

        // The limiter's check callback is invoked per item scope; clone `self`
        // into it so the future is 'static.
        let this = self.clone();
        let mut envelope_limiter =
            EnvelopeLimiter::new(CheckLimits::All, move |item_scope, _quantity| {
                let this = this.clone();

                async move {
                    match this {
                        #[cfg(feature = "processing")]
                        RateLimiter::Consistent(rate_limiter) => Ok::<_, ProcessingError>(
                            rate_limiter
                                .is_rate_limited(quotas, item_scope, _quantity, false)
                                .await?,
                        ),
                        _ => Ok::<_, ProcessingError>(
                            ctx.rate_limits.check_with_quotas(quotas, item_scope),
                        ),
                    }
                }
            });

        // Tell the limiter about the event's category even if the event is
        // not an envelope item anymore at this point.
        if let Some(category) = event_category {
            envelope_limiter.assume_event(category);
        }

        let scoping = managed_envelope.scoping();
        let (enforcement, rate_limits) = metric!(timer(RelayTimers::EventProcessingRateLimiting), type = self.name(), {
            envelope_limiter
                .compute(managed_envelope.envelope_mut(), &scoping)
                .await
        })?;
        let event_active = enforcement.is_event_active();

        // Also drop extracted metrics affected by the enforcement; only the
        // consistent (Redis) path counted quantities authoritatively.
        #[cfg(feature = "processing")]
        _extracted_metrics.apply_enforcement(&enforcement, matches!(self, Self::Consistent(_)));
        enforcement.apply_with_outcomes(managed_envelope);

        if event_active {
            debug_assert!(managed_envelope.envelope().is_empty());
            return Ok(EnforcementResult::new(Annotated::empty(), rate_limits));
        }

        Ok(EnforcementResult::new(event, rate_limits))
    }

    /// Returns a short name for metrics tagging.
    fn name(&self) -> &'static str {
        match self {
            Self::Cached => "cached",
            #[cfg(feature = "processing")]
            Self::Consistent(_) => "consistent",
        }
    }
}
2864
2865pub fn encode_payload(body: &Bytes, http_encoding: HttpEncoding) -> Result<Bytes, std::io::Error> {
2866 let envelope_body: Vec<u8> = match http_encoding {
2867 HttpEncoding::Identity => return Ok(body.clone()),
2868 HttpEncoding::Deflate => {
2869 let mut encoder = ZlibEncoder::new(Vec::new(), Compression::default());
2870 encoder.write_all(body.as_ref())?;
2871 encoder.finish()?
2872 }
2873 HttpEncoding::Gzip => {
2874 let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
2875 encoder.write_all(body.as_ref())?;
2876 encoder.finish()?
2877 }
2878 HttpEncoding::Br => {
2879 let mut encoder = BrotliEncoder::new(Vec::new(), 0, 5, 22);
2881 encoder.write_all(body.as_ref())?;
2882 encoder.into_inner()
2883 }
2884 HttpEncoding::Zstd => {
2885 let mut encoder = ZstdEncoder::new(Vec::new(), 1)?;
2888 encoder.write_all(body.as_ref())?;
2889 encoder.finish()?
2890 }
2891 };
2892
2893 Ok(envelope_body.into())
2894}
2895
2896#[derive(Debug)]
2898pub struct SendEnvelope {
2899 pub envelope: TypedEnvelope<Processed>,
2900 pub body: Bytes,
2901 pub http_encoding: HttpEncoding,
2902 pub project_cache: ProjectCacheHandle,
2903}
2904
impl UpstreamRequest for SendEnvelope {
    fn method(&self) -> reqwest::Method {
        reqwest::Method::POST
    }

    /// Builds the store endpoint path for the envelope's project.
    fn path(&self) -> Cow<'_, str> {
        format!("/api/{}/envelope/", self.envelope.scoping().project_id).into()
    }

    fn route(&self) -> &'static str {
        "envelope"
    }

    /// Populates the HTTP request with forwarding headers and the encoded body.
    fn build(&mut self, builder: &mut http::RequestBuilder) -> Result<(), http::HttpError> {
        let envelope_body = self.body.clone();
        metric!(
            distribution(RelayDistributions::UpstreamEnvelopeBodySize) = envelope_body.len() as u64
        );

        let meta = &self.envelope.meta();
        let shard = self.envelope.partition_key().map(|p| p.to_string());
        builder
            .content_encoding(self.http_encoding)
            .header_opt("Origin", meta.origin().map(|url| url.as_str()))
            .header_opt("User-Agent", meta.user_agent())
            .header("X-Sentry-Auth", meta.auth_header())
            .header("X-Forwarded-For", meta.forwarded_for())
            .header("Content-Type", envelope::CONTENT_TYPE)
            .header_opt("X-Sentry-Relay-Shard", shard)
            .body(envelope_body);

        Ok(())
    }

    fn sign(&mut self) -> Option<Sign> {
        Some(Sign::Optional(SignatureType::RequestSign))
    }

    /// Finalizes the envelope based on the upstream response.
    ///
    /// Accepts the envelope when the upstream received the request (even on
    /// rate limits, which are merged into the project cache); rejects it with
    /// an internal outcome otherwise.
    fn respond(
        self: Box<Self>,
        result: Result<http::Response, UpstreamRequestError>,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> {
        Box::pin(async move {
            let result = match result {
                Ok(mut response) => response.consume().await.map_err(UpstreamRequestError::Http),
                Err(error) => Err(error),
            };

            match result {
                Ok(()) => self.envelope.accept(),
                Err(error) if error.is_received() => {
                    let scoping = self.envelope.scoping();
                    self.envelope.accept();

                    if let UpstreamRequestError::RateLimited(limits) = error {
                        self.project_cache
                            .get(scoping.project_key)
                            .rate_limits()
                            .merge(limits.scope(&scoping));
                    }
                }
                Err(error) => {
                    let mut envelope = self.envelope;
                    envelope.reject(Outcome::Invalid(DiscardReason::Internal));
                    relay_log::error!(
                        error = &error as &dyn Error,
                        tags.project_key = %envelope.scoping().project_key,
                        "error sending envelope"
                    );
                }
            }
        })
    }
}
2981
/// A size-bounded partition of metric bucket views, grouped by project.
#[derive(Debug)]
struct Partition<'a> {
    /// Maximum payload size in bytes for one partition.
    max_size: usize,
    /// Remaining capacity in bytes before the partition must be flushed.
    remaining: usize,
    /// Bucket views collected so far, keyed by project.
    views: HashMap<ProjectKey, Vec<BucketView<'a>>>,
    /// Scoping per project, used for outcome attribution on send failures.
    project_info: HashMap<ProjectKey, Scoping>,
}
2995
impl<'a> Partition<'a> {
    /// Creates an empty partition with the given maximum size in bytes.
    pub fn new(size: usize) -> Self {
        Self {
            max_size: size,
            remaining: size,
            views: HashMap::new(),
            project_info: HashMap::new(),
        }
    }

    /// Inserts a bucket, or as much of it as fits, into the partition.
    ///
    /// The bucket is split at the remaining capacity; the fitting part is
    /// stored under the bucket's project key. Returns the overflow, if any,
    /// which the caller should re-insert after flushing the partition.
    pub fn insert(&mut self, bucket: BucketView<'a>, scoping: Scoping) -> Option<BucketView<'a>> {
        let (current, next) = bucket.split(self.remaining, Some(self.max_size));

        if let Some(current) = current {
            self.remaining = self.remaining.saturating_sub(current.estimated_size());
            self.views
                .entry(scoping.project_key)
                .or_default()
                .push(current);

            // Remember the scoping so failed sends can be attributed later.
            self.project_info
                .entry(scoping.project_key)
                .or_insert(scoping);
        }

        next
    }

    /// Returns `true` if the partition contains no buckets.
    fn is_empty(&self) -> bool {
        self.views.is_empty()
    }

    /// Serializes the collected buckets to a JSON payload and resets the
    /// partition to full capacity.
    ///
    /// Returns the payload together with the scopings of all contained
    /// projects.
    fn take(&mut self) -> (Bytes, HashMap<ProjectKey, Scoping>) {
        #[derive(serde::Serialize)]
        struct Wrapper<'a> {
            buckets: &'a HashMap<ProjectKey, Vec<BucketView<'a>>>,
        }

        let buckets = &self.views;
        let payload = serde_json::to_vec(&Wrapper { buckets }).unwrap().into();

        let scopings = self.project_info.clone();
        self.project_info.clear();

        self.views.clear();
        self.remaining = self.max_size;

        (payload, scopings)
    }
}
3061
/// An upstream request carrying a partition of global metrics.
#[derive(Debug)]
struct SendMetricsRequest {
    /// Partition key, sent as the `X-Sentry-Relay-Shard` header.
    partition_key: String,
    /// The unencoded payload; used for signing and for recovering buckets to
    /// emit outcomes on failure.
    unencoded: Bytes,
    /// The content-encoded payload that is actually transmitted.
    encoded: Bytes,
    /// Scoping per project, for outcome attribution on failure.
    project_info: HashMap<ProjectKey, Scoping>,
    /// The content encoding applied to `encoded`.
    http_encoding: HttpEncoding,
    /// Sink for bucket outcomes.
    metric_outcomes: MetricOutcomes,
}
3082
impl SendMetricsRequest {
    /// Tracks `Invalid(Internal)` outcomes for all buckets of a failed
    /// transmission.
    ///
    /// The request only keeps the serialized payload, so the buckets are
    /// recovered by re-parsing `unencoded`.
    fn create_error_outcomes(self) {
        #[derive(serde::Deserialize)]
        struct Wrapper {
            buckets: HashMap<ProjectKey, Vec<MinimalTrackableBucket>>,
        }

        let buckets = match serde_json::from_slice(&self.unencoded) {
            Ok(Wrapper { buckets }) => buckets,
            Err(err) => {
                relay_log::error!(
                    error = &err as &dyn std::error::Error,
                    "failed to parse buckets from failed transmission"
                );
                return;
            }
        };

        for (key, buckets) in buckets {
            let Some(&scoping) = self.project_info.get(&key) else {
                relay_log::error!("missing scoping for project key");
                continue;
            };

            self.metric_outcomes.track(
                scoping,
                &buckets,
                Outcome::Invalid(DiscardReason::Internal),
            );
        }
    }
}
3115
impl UpstreamRequest for SendMetricsRequest {
    fn set_relay_id(&self) -> bool {
        true
    }

    /// Signs the unencoded body so the upstream can verify it after decoding.
    fn sign(&mut self) -> Option<Sign> {
        Some(Sign::Required(SignatureType::Body(self.unencoded.clone())))
    }

    fn method(&self) -> reqwest::Method {
        reqwest::Method::POST
    }

    fn path(&self) -> Cow<'_, str> {
        "/api/0/relays/metrics/".into()
    }

    fn route(&self) -> &'static str {
        "global_metrics"
    }

    /// Populates the request with the shard header and the encoded payload.
    fn build(&mut self, builder: &mut http::RequestBuilder) -> Result<(), http::HttpError> {
        metric!(
            distribution(RelayDistributions::UpstreamMetricsBodySize) = self.encoded.len() as u64
        );

        builder
            .content_encoding(self.http_encoding)
            .header("X-Sentry-Relay-Shard", self.partition_key.as_bytes())
            .header(header::CONTENT_TYPE, b"application/json")
            .body(self.encoded.clone());

        Ok(())
    }

    /// Emits error outcomes for the contained buckets when the request never
    /// reached the upstream; received requests are considered delivered.
    fn respond(
        self: Box<Self>,
        result: Result<http::Response, UpstreamRequestError>,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> {
        Box::pin(async {
            match result {
                Ok(mut response) => {
                    response.consume().await.ok();
                }
                Err(error) => {
                    relay_log::error!(error = &error as &dyn Error, "Failed to send metrics batch");

                    // If the upstream received the request, do not emit
                    // outcomes: the metrics may still have been processed.
                    if error.is_received() {
                        return;
                    }

                    self.create_error_outcomes()
                }
            }
        })
    }
}
3175
/// A borrowed, combined view over global and project quotas.
#[derive(Copy, Clone, Debug)]
struct CombinedQuotas<'a> {
    /// Quotas from the global config; iterated first.
    global_quotas: &'a [Quota],
    /// Quotas from the project config.
    project_quotas: &'a [Quota],
}
3182
3183impl<'a> CombinedQuotas<'a> {
3184 pub fn new(global_config: &'a GlobalConfig, project_quotas: &'a [Quota]) -> Self {
3186 Self {
3187 global_quotas: &global_config.quotas,
3188 project_quotas,
3189 }
3190 }
3191
3192 pub fn is_empty(&self) -> bool {
3194 self.len() == 0
3195 }
3196
3197 pub fn len(&self) -> usize {
3199 self.global_quotas.len() + self.project_quotas.len()
3200 }
3201}
3202
3203impl<'a> IntoIterator for CombinedQuotas<'a> {
3204 type Item = &'a Quota;
3205 type IntoIter = std::iter::Chain<std::slice::Iter<'a, Quota>, std::slice::Iter<'a, Quota>>;
3206
3207 fn into_iter(self) -> Self::IntoIter {
3208 self.global_quotas.iter().chain(self.project_quotas.iter())
3209 }
3210}
3211
3212#[cfg(test)]
3213mod tests {
3214 use insta::assert_debug_snapshot;
3215 use relay_common::glob2::LazyGlob;
3216 use relay_dynamic_config::ProjectConfig;
3217 use relay_event_normalization::{
3218 NormalizationConfig, RedactionRule, TransactionNameConfig, TransactionNameRule,
3219 };
3220 use relay_event_schema::protocol::TransactionSource;
3221 use relay_pii::DataScrubbingConfig;
3222 use similar_asserts::assert_eq;
3223
3224 use crate::testutils::{create_test_processor, create_test_processor_with_addrs};
3225
3226 #[cfg(feature = "processing")]
3227 use {
3228 relay_metrics::BucketValue,
3229 relay_quotas::{QuotaScope, ReasonCode},
3230 relay_test::mock_service,
3231 };
3232
3233 use super::*;
3234
3235 #[cfg(feature = "processing")]
3236 fn mock_quota(id: &str) -> Quota {
3237 Quota {
3238 id: Some(id.into()),
3239 categories: [DataCategory::MetricBucket].into(),
3240 scope: QuotaScope::Organization,
3241 scope_id: None,
3242 limit: Some(0),
3243 window: None,
3244 reason_code: None,
3245 namespace: None,
3246 }
3247 }
3248
3249 #[cfg(feature = "processing")]
3250 #[test]
3251 fn test_dynamic_quotas() {
3252 let global_config = GlobalConfig {
3253 quotas: vec![mock_quota("foo"), mock_quota("bar")],
3254 ..Default::default()
3255 };
3256
3257 let project_quotas = vec![mock_quota("baz"), mock_quota("qux")];
3258
3259 let dynamic_quotas = CombinedQuotas::new(&global_config, &project_quotas);
3260
3261 assert_eq!(dynamic_quotas.len(), 4);
3262 assert!(!dynamic_quotas.is_empty());
3263
3264 let quota_ids = dynamic_quotas.into_iter().filter_map(|q| q.id.as_deref());
3265 assert!(quota_ids.eq(["foo", "bar", "baz", "qux"]));
3266 }
3267
    /// Ensures that a quota scoped to one organization drops only that org's
    /// metrics, while other orgs' metrics still reach the store.
    #[cfg(feature = "processing")]
    #[tokio::test]
    async fn test_ratelimit_per_batch() {
        use relay_base_schema::organization::OrganizationId;
        use relay_protocol::FiniteF64;

        let rate_limited_org = Scoping {
            organization_id: OrganizationId::new(1),
            project_id: ProjectId::new(21),
            project_key: ProjectKey::parse("00000000000000000000000000000000").unwrap(),
            key_id: Some(17),
        };

        let not_rate_limited_org = Scoping {
            organization_id: OrganizationId::new(2),
            project_id: ProjectId::new(21),
            project_key: ProjectKey::parse("11111111111111111111111111111111").unwrap(),
            key_id: Some(17),
        };

        let message = {
            let project_info = {
                // Zero-limit quota scoped to the rate-limited org only.
                let quota = Quota {
                    id: Some("testing".into()),
                    categories: [DataCategory::MetricBucket].into(),
                    scope: relay_quotas::QuotaScope::Organization,
                    scope_id: Some(rate_limited_org.organization_id.to_string().into()),
                    limit: Some(0),
                    window: None,
                    reason_code: Some(ReasonCode::new("test")),
                    namespace: None,
                };

                let mut config = ProjectConfig::default();
                config.quotas.push(quota);

                Arc::new(ProjectInfo {
                    config,
                    ..Default::default()
                })
            };

            let project_metrics = |scoping| ProjectBuckets {
                buckets: vec![Bucket {
                    name: "d:transactions/bar".into(),
                    value: BucketValue::Counter(FiniteF64::new(1.0).unwrap()),
                    timestamp: UnixTimestamp::now(),
                    tags: Default::default(),
                    width: 10,
                    metadata: BucketMetadata::default(),
                }],
                rate_limits: Default::default(),
                project_info: project_info.clone(),
                scoping,
            };

            let buckets = hashbrown::HashMap::from([
                (
                    rate_limited_org.project_key,
                    project_metrics(rate_limited_org),
                ),
                (
                    not_rate_limited_org.project_key,
                    project_metrics(not_rate_limited_org),
                ),
            ]);

            FlushBuckets {
                partition_key: 0,
                buckets,
            }
        };

        // Sanity check: both orgs have buckets pending.
        assert_eq!(message.buckets.keys().count(), 2);

        let config = {
            let config_json = serde_json::json!({
                "processing": {
                    "enabled": true,
                    "kafka_config": [],
                    "redis": {
                        "server": std::env::var("RELAY_REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_owned()),
                    }
                }
            });
            Config::from_json_value(config_json).unwrap()
        };

        let (store, handle) = {
            // Record the org id of every metrics message that reaches the
            // mocked store forwarder.
            let f = |org_ids: &mut Vec<OrganizationId>, msg: Store| {
                let org_id = match msg {
                    Store::Metrics(x) => x.scoping.organization_id,
                    _ => panic!("received envelope when expecting only metrics"),
                };
                org_ids.push(org_id);
            };

            mock_service("store_forwarder", vec![], f)
        };

        let processor = create_test_processor(config).await;
        assert!(processor.redis_rate_limiter_enabled());

        processor.encode_metrics_processing(message, &store).await;

        drop(store);
        let orgs_not_ratelimited = handle.await.unwrap();

        // Only the non-limited org's metrics made it to the store.
        assert_eq!(
            orgs_not_ratelimited,
            vec![not_rate_limited_org.organization_id]
        );
    }
3384
    /// Verifies that IP-like data in the User-Agent header is masked by PII
    /// scrubbing, while the browser context is still extracted with the full
    /// version number before scrubbing.
    #[tokio::test]
    async fn test_browser_version_extraction_with_pii_like_data() {
        let processor = create_test_processor(Default::default()).await;
        let outcome_aggregator = Addr::dummy();
        let event_id = EventId::new();

        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
            .parse()
            .unwrap();

        let request_meta = RequestMeta::new(dsn);
        let mut envelope = Envelope::from_request(Some(event_id), request_meta);

        envelope.add_item({
            let mut item = Item::new(ItemType::Event);
            item.set_payload(
                ContentType::Json,
                r#"
                {
                    "request": {
                        "headers": [
                            ["User-Agent", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36"]
                        ]
                    }
                }
            "#,
            );
            item
        });

        let mut datascrubbing_settings = DataScrubbingConfig::default();
        // Enable all default scrubbing options.
        datascrubbing_settings.scrub_data = true;
        datascrubbing_settings.scrub_defaults = true;
        datascrubbing_settings.scrub_ip_addresses = true;

        // Mask anything that looks like an IP address.
        let pii_config = serde_json::from_str(r#"{"applications": {"**": ["@ip:mask"]}}"#).unwrap();

        let config = ProjectConfig {
            datascrubbing_settings,
            pii_config: Some(pii_config),
            ..Default::default()
        };

        let project_info = ProjectInfo {
            config,
            ..Default::default()
        };

        let mut envelopes = ProcessingGroup::split_envelope(*envelope, &Default::default());
        assert_eq!(envelopes.len(), 1);

        let (group, envelope) = envelopes.pop().unwrap();
        let envelope = ManagedEnvelope::new(envelope, outcome_aggregator);

        let message = ProcessEnvelopeGrouped {
            group,
            envelope,
            ctx: processing::Context {
                project_info: &project_info,
                ..processing::Context::for_test()
            },
        };

        let Ok(Some(Submit::Envelope(mut new_envelope))) = processor.process(message).await else {
            panic!();
        };
        let new_envelope = new_envelope.envelope_mut();

        let event_item = new_envelope.items().last().unwrap();
        let annotated_event: Annotated<Event> =
            Annotated::from_json_bytes(&event_item.payload()).unwrap();
        let event = annotated_event.into_value().unwrap();
        let headers = event
            .request
            .into_value()
            .unwrap()
            .headers
            .into_value()
            .unwrap();

        // The IP-like version segment in the header is masked.
        assert_eq!(
            Some(
                "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/********* Safari/537.36"
            ),
            headers.get_header("User-Agent")
        );
        // The browser context still carries the full version number.
        let contexts = event.contexts.into_value().unwrap();
        let browser = contexts.0.get("browser").unwrap();
        assert_eq!(
            r#"{"browser":"Chrome 103.0.0","name":"Chrome","version":"103.0.0","type":"browser"}"#,
            browser.to_json().unwrap()
        );
    }
3482
    /// Verifies that the envelope's dynamic sampling context is materialized
    /// into the event's `_dsc` field, with the trace id normalized.
    #[tokio::test]
    #[cfg(feature = "processing")]
    async fn test_materialize_dsc() {
        use crate::services::projects::project::PublicKeyConfig;

        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
            .parse()
            .unwrap();
        let request_meta = RequestMeta::new(dsn);
        let mut envelope = Envelope::from_request(None, request_meta);

        // DSC with a hyphenated trace id; the materialized form is unhyphenated.
        let dsc = r#"{
            "trace_id": "00000000-0000-0000-0000-000000000000",
            "public_key": "e12d836b15bb49d7bbf99e64295d995b",
            "sample_rate": "0.2"
        }"#;
        envelope.set_dsc(serde_json::from_str(dsc).unwrap());

        let mut item = Item::new(ItemType::Event);
        item.set_payload(ContentType::Json, r#"{}"#);
        envelope.add_item(item);

        let outcome_aggregator = Addr::dummy();
        let managed_envelope = ManagedEnvelope::new(envelope, outcome_aggregator);

        let mut project_info = ProjectInfo::default();
        project_info.public_keys.push(PublicKeyConfig {
            public_key: ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(),
            numeric_id: Some(1),
        });

        let config = serde_json::json!({
            "processing": {
                "enabled": true,
                "kafka_config": [],
            }
        });

        let message = ProcessEnvelopeGrouped {
            group: ProcessingGroup::Error,
            envelope: managed_envelope,
            ctx: processing::Context {
                config: &Config::from_json_value(config.clone()).unwrap(),
                project_info: &project_info,
                sampling_project_info: Some(&project_info),
                ..processing::Context::for_test()
            },
        };

        let processor = create_test_processor(Config::from_json_value(config).unwrap()).await;
        let Ok(Some(Submit::Envelope(envelope))) = processor.process(message).await else {
            panic!();
        };
        let event = envelope
            .envelope()
            .get_item_by(|item| item.ty() == &ItemType::Event)
            .unwrap();

        let event = Annotated::<Event>::from_json_bytes(&event.payload()).unwrap();
        insta::assert_debug_snapshot!(event.value().unwrap()._dsc, @r###"
        Object(
            {
                "environment": ~,
                "public_key": String(
                    "e12d836b15bb49d7bbf99e64295d995b",
                ),
                "release": ~,
                "replay_id": ~,
                "sample_rate": String(
                    "0.2",
                ),
                "trace_id": String(
                    "00000000000000000000000000000000",
                ),
                "transaction": ~,
            },
        )
        "###);
    }
3562
    /// Normalizes a minimal transaction event with the given name and source,
    /// and returns the statsd metrics captured during normalization.
    ///
    /// A single transaction-name rule (`/foo/*/**`) is configured so tests can
    /// exercise rule-based, pattern-based, and no-op sanitization.
    fn capture_test_event(transaction_name: &str, source: TransactionSource) -> Vec<String> {
        let mut event = Annotated::<Event>::from_json(
            r#"
            {
                "type": "transaction",
                "transaction": "/foo/",
                "timestamp": 946684810.0,
                "start_timestamp": 946684800.0,
                "contexts": {
                    "trace": {
                        "trace_id": "4c79f60c11214eb38604f4ae0781bfb2",
                        "span_id": "fa90fdead5f74053",
                        "op": "http.server",
                        "type": "trace"
                    }
                },
                "transaction_info": {
                    "source": "url"
                }
            }
            "#,
        )
        .unwrap();
        let e = event.value_mut().as_mut().unwrap();
        e.transaction.set_value(Some(transaction_name.into()));

        e.transaction_info
            .value_mut()
            .as_mut()
            .unwrap()
            .source
            .set_value(Some(source));

        relay_statsd::with_capturing_test_client(|| {
            utils::log_transaction_name_metrics(&mut event, |event| {
                let config = NormalizationConfig {
                    transaction_name_config: TransactionNameConfig {
                        rules: &[TransactionNameRule {
                            pattern: LazyGlob::new("/foo/*/**".to_owned()),
                            expiry: DateTime::<Utc>::MAX_UTC,
                            redaction: RedactionRule::Replace {
                                substitution: "*".to_owned(),
                            },
                        }],
                    },
                    ..Default::default()
                };
                relay_event_normalization::normalize_event(event, &config)
            });
        })
    }
3614
3615 #[test]
3616 fn test_log_transaction_metrics_none() {
3617 let captures = capture_test_event("/nothing", TransactionSource::Url);
3618 insta::assert_debug_snapshot!(captures, @r###"
3619 [
3620 "event.transaction_name_changes:1|c|#source_in:url,changes:none,source_out:sanitized,is_404:false",
3621 ]
3622 "###);
3623 }
3624
3625 #[test]
3626 fn test_log_transaction_metrics_rule() {
3627 let captures = capture_test_event("/foo/john/denver", TransactionSource::Url);
3628 insta::assert_debug_snapshot!(captures, @r###"
3629 [
3630 "event.transaction_name_changes:1|c|#source_in:url,changes:rule,source_out:sanitized,is_404:false",
3631 ]
3632 "###);
3633 }
3634
3635 #[test]
3636 fn test_log_transaction_metrics_pattern() {
3637 let captures = capture_test_event("/something/12345", TransactionSource::Url);
3638 insta::assert_debug_snapshot!(captures, @r###"
3639 [
3640 "event.transaction_name_changes:1|c|#source_in:url,changes:pattern,source_out:sanitized,is_404:false",
3641 ]
3642 "###);
3643 }
3644
3645 #[test]
3646 fn test_log_transaction_metrics_both() {
3647 let captures = capture_test_event("/foo/john/12345", TransactionSource::Url);
3648 insta::assert_debug_snapshot!(captures, @r###"
3649 [
3650 "event.transaction_name_changes:1|c|#source_in:url,changes:both,source_out:sanitized,is_404:false",
3651 ]
3652 "###);
3653 }
3654
3655 #[test]
3656 fn test_log_transaction_metrics_no_match() {
3657 let captures = capture_test_event("/foo/john/12345", TransactionSource::Route);
3658 insta::assert_debug_snapshot!(captures, @r###"
3659 [
3660 "event.transaction_name_changes:1|c|#source_in:route,changes:none,source_out:route,is_404:false",
3661 ]
3662 "###);
3663 }
3664
3665 #[tokio::test]
3666 async fn test_process_metrics_bucket_metadata() {
3667 let mut token = Cogs::noop().timed(ResourceId::Relay, AppFeature::Unattributed);
3668 let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
3669 let received_at = Utc::now();
3670 let config = Config::default();
3671
3672 let (aggregator, mut aggregator_rx) = Addr::custom();
3673 let processor = create_test_processor_with_addrs(
3674 config,
3675 Addrs {
3676 aggregator,
3677 ..Default::default()
3678 },
3679 )
3680 .await;
3681
3682 let mut item = Item::new(ItemType::Statsd);
3683 item.set_payload(
3684 ContentType::Text,
3685 "transactions/foo:3182887624:4267882815|s",
3686 );
3687 for (source, expected_received_at) in [
3688 (
3689 BucketSource::External,
3690 Some(UnixTimestamp::from_datetime(received_at).unwrap()),
3691 ),
3692 (BucketSource::Internal, None),
3693 ] {
3694 let message = ProcessMetrics {
3695 data: MetricData::Raw(vec![item.clone()]),
3696 project_key,
3697 source,
3698 received_at,
3699 sent_at: Some(Utc::now()),
3700 };
3701 processor.handle_process_metrics(&mut token, message);
3702
3703 let Aggregator::MergeBuckets(merge_buckets) = aggregator_rx.recv().await.unwrap();
3704 let buckets = merge_buckets.buckets;
3705 assert_eq!(buckets.len(), 1);
3706 assert_eq!(buckets[0].metadata.received_at, expected_received_at);
3707 }
3708 }
3709
3710 #[tokio::test]
3711 async fn test_process_batched_metrics() {
3712 let mut token = Cogs::noop().timed(ResourceId::Relay, AppFeature::Unattributed);
3713 let received_at = Utc::now();
3714 let config = Config::default();
3715
3716 let (aggregator, mut aggregator_rx) = Addr::custom();
3717 let processor = create_test_processor_with_addrs(
3718 config,
3719 Addrs {
3720 aggregator,
3721 ..Default::default()
3722 },
3723 )
3724 .await;
3725
3726 let payload = r#"{
3727 "buckets": {
3728 "11111111111111111111111111111111": [
3729 {
3730 "timestamp": 1615889440,
3731 "width": 0,
3732 "name": "d:custom/endpoint.response_time@millisecond",
3733 "type": "d",
3734 "value": [
3735 68.0
3736 ],
3737 "tags": {
3738 "route": "user_index"
3739 }
3740 }
3741 ],
3742 "22222222222222222222222222222222": [
3743 {
3744 "timestamp": 1615889440,
3745 "width": 0,
3746 "name": "d:custom/endpoint.cache_rate@none",
3747 "type": "d",
3748 "value": [
3749 36.0
3750 ]
3751 }
3752 ]
3753 }
3754}
3755"#;
3756 let message = ProcessBatchedMetrics {
3757 payload: Bytes::from(payload),
3758 source: BucketSource::Internal,
3759 received_at,
3760 sent_at: Some(Utc::now()),
3761 };
3762 processor.handle_process_batched_metrics(&mut token, message);
3763
3764 let Aggregator::MergeBuckets(mb1) = aggregator_rx.recv().await.unwrap();
3765 let Aggregator::MergeBuckets(mb2) = aggregator_rx.recv().await.unwrap();
3766
3767 let mut messages = vec![mb1, mb2];
3768 messages.sort_by_key(|mb| mb.project_key);
3769
3770 let actual = messages
3771 .into_iter()
3772 .map(|mb| (mb.project_key, mb.buckets))
3773 .collect::<Vec<_>>();
3774
3775 assert_debug_snapshot!(actual, @r###"
3776 [
3777 (
3778 ProjectKey("11111111111111111111111111111111"),
3779 [
3780 Bucket {
3781 timestamp: UnixTimestamp(1615889440),
3782 width: 0,
3783 name: MetricName(
3784 "d:custom/endpoint.response_time@millisecond",
3785 ),
3786 value: Distribution(
3787 [
3788 68.0,
3789 ],
3790 ),
3791 tags: {
3792 "route": "user_index",
3793 },
3794 metadata: BucketMetadata {
3795 merges: 1,
3796 received_at: None,
3797 extracted_from_indexed: false,
3798 },
3799 },
3800 ],
3801 ),
3802 (
3803 ProjectKey("22222222222222222222222222222222"),
3804 [
3805 Bucket {
3806 timestamp: UnixTimestamp(1615889440),
3807 width: 0,
3808 name: MetricName(
3809 "d:custom/endpoint.cache_rate@none",
3810 ),
3811 value: Distribution(
3812 [
3813 36.0,
3814 ],
3815 ),
3816 tags: {},
3817 metadata: BucketMetadata {
3818 merges: 1,
3819 received_at: None,
3820 extracted_from_indexed: false,
3821 },
3822 },
3823 ],
3824 ),
3825 ]
3826 "###);
3827 }
3828}