1use std::borrow::Cow;
2use std::collections::{BTreeSet, HashMap};
3use std::error::Error;
4use std::fmt::{Debug, Display};
5use std::future::Future;
6use std::io::Write;
7use std::pin::Pin;
8use std::sync::Arc;
9use std::time::Duration;
10
11use anyhow::Context;
12use brotli::CompressorWriter as BrotliEncoder;
13use bytes::Bytes;
14use chrono::{DateTime, Utc};
15use flate2::Compression;
16use flate2::write::{GzEncoder, ZlibEncoder};
17use futures::FutureExt;
18use futures::future::BoxFuture;
19use relay_base_schema::project::{ProjectId, ProjectKey};
20use relay_cogs::{AppFeature, Cogs, FeatureWeights, ResourceId, Token};
21use relay_common::time::UnixTimestamp;
22use relay_config::{Config, HttpEncoding, RelayMode};
23use relay_dynamic_config::{Feature, GlobalConfig};
24use relay_event_normalization::{ClockDriftProcessor, GeoIpLookup};
25use relay_event_schema::processor::ProcessingAction;
26use relay_event_schema::protocol::{
27 ClientReport, Event, EventId, Metrics, NetworkReportError, SpanV2,
28};
29use relay_filter::FilterStatKey;
30use relay_metrics::{Bucket, BucketMetadata, BucketView, BucketsView, MetricNamespace};
31use relay_protocol::Annotated;
32use relay_quotas::{DataCategory, Quota, RateLimits, Scoping};
33use relay_sampling::evaluation::{ReservoirCounters, SamplingDecision};
34use relay_statsd::metric;
35use relay_system::{Addr, FromMessage, NoResponse, Service};
36use reqwest::header;
37use smallvec::{SmallVec, smallvec};
38use zstd::stream::Encoder as ZstdEncoder;
39
40use crate::envelope::{
41 self, AttachmentType, ContentType, Envelope, EnvelopeError, Item, ItemContainer, ItemType,
42};
43use crate::extractors::{PartialDsn, RequestMeta, RequestTrust};
44use crate::integrations::Integration;
45use crate::managed::{InvalidProcessingGroupType, ManagedEnvelope, TypedEnvelope};
46use crate::metrics::{MetricOutcomes, MetricsLimiter, MinimalTrackableBucket};
47use crate::metrics_extraction::transactions::ExtractedMetrics;
48use crate::metrics_extraction::transactions::types::ExtractMetricsError;
49use crate::processing::attachments::AttachmentProcessor;
50use crate::processing::check_ins::CheckInsProcessor;
51use crate::processing::client_reports::ClientReportsProcessor;
52use crate::processing::errors::{ErrorsProcessor, SwitchProcessingError};
53use crate::processing::logs::LogsProcessor;
54use crate::processing::profile_chunks::ProfileChunksProcessor;
55use crate::processing::replays::ReplaysProcessor;
56use crate::processing::sessions::SessionsProcessor;
57use crate::processing::spans::SpansProcessor;
58use crate::processing::trace_attachments::TraceAttachmentsProcessor;
59use crate::processing::trace_metrics::TraceMetricsProcessor;
60use crate::processing::transactions::TransactionProcessor;
61use crate::processing::user_reports::UserReportsProcessor;
62use crate::processing::utils::event::{
63 EventFullyNormalized, EventMetricsExtracted, FiltersStatus, SpansExtracted, event_category,
64 event_type,
65};
66use crate::processing::{Forward as _, Output, Outputs, QuotaRateLimiter};
67use crate::service::ServiceError;
68use crate::services::global_config::GlobalConfigHandle;
69use crate::services::metrics::{Aggregator, FlushBuckets, MergeBuckets, ProjectBuckets};
70use crate::services::outcome::{DiscardItemType, DiscardReason, Outcome, TrackOutcome};
71use crate::services::projects::cache::ProjectCacheHandle;
72use crate::services::projects::project::{ProjectInfo, ProjectState};
73use crate::services::upstream::{
74 SendRequest, Sign, SignatureType, UpstreamRelay, UpstreamRequest, UpstreamRequestError,
75};
76use crate::statsd::{RelayCounters, RelayDistributions, RelayTimers};
77use crate::utils::{self, CheckLimits, EnvelopeLimiter};
78use crate::{http, processing};
79use relay_threading::AsyncPool;
80#[cfg(feature = "processing")]
81use {
82 crate::services::objectstore::Objectstore,
83 crate::services::store::Store,
84 crate::utils::Enforcement,
85 itertools::Itertools,
86 relay_cardinality::{
87 CardinalityLimit, CardinalityLimiter, CardinalityLimitsSplit, RedisSetLimiter,
88 RedisSetLimiterOptions,
89 },
90 relay_dynamic_config::CardinalityLimiterMode,
91 relay_quotas::{RateLimitingError, RedisRateLimiter},
92 relay_redis::RedisClients,
93 std::time::Instant,
94 symbolic_unreal::{Unreal4Error, Unreal4ErrorKind},
95};
96
// Submodules used by the envelope processor.
mod attachment;
mod dynamic_sampling;
pub mod event;
mod metrics;
mod nel;
mod profile;
mod report;
mod span;

// Only compiled in sentry-internal builds with the `processing` feature.
#[cfg(all(sentry, feature = "processing"))]
pub mod playstation;
mod standalone;
#[cfg(feature = "processing")]
mod unreal;

#[cfg(feature = "processing")]
mod nnswitch;
114
/// Runs `$if_true` only when the `processing` feature is compiled in *and*
/// `$config.processing_enabled()` returns `true`.
///
/// The two-arm form runs `$if_false` otherwise, including in builds without
/// the `processing` feature.
macro_rules! if_processing {
    // One-armed form: a no-op in builds without the `processing` feature.
    ($config:expr, $if_true:block) => {
        #[cfg(feature = "processing")] {
            if $config.processing_enabled() $if_true
        }
    };
    // Two-armed form: `$if_false` is the fallback in both the runtime-disabled
    // and the compile-time-disabled case.
    ($config:expr, $if_true:block else $if_false:block) => {
        {
            #[cfg(feature = "processing")] {
                if $config.processing_enabled() $if_true else $if_false
            }
            #[cfg(not(feature = "processing"))] {
                $if_false
            }
        }
    };
}
135
/// Minimum clock drift threshold of 55 minutes.
///
/// NOTE(review): presumably drifts below this value are not corrected — confirm
/// against the `ClockDriftProcessor` call sites.
pub const MINIMUM_CLOCK_DRIFT: Duration = Duration::from_secs(55 * 60);
138
/// Error returned when a [`ProcessingGroup`] cannot be converted into the
/// requested marker group type.
#[derive(Debug)]
pub struct GroupTypeError;

impl std::fmt::Display for GroupTypeError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "failed to convert processing group into corresponding type")
    }
}

impl std::error::Error for GroupTypeError {}
149
/// Defines a marker type for a processing group and wires up conversions to
/// and from [`ProcessingGroup`].
///
/// `From<$ty>` always yields `ProcessingGroup::$variant`; `TryFrom` succeeds
/// for `$variant` and for any additional `$other` variants listed, and fails
/// with [`GroupTypeError`] otherwise.
macro_rules! processing_group {
    ($ty:ident, $variant:ident$(, $($other:ident),+)?) => {
        #[derive(Clone, Copy, Debug)]
        pub struct $ty;

        impl From<$ty> for ProcessingGroup {
            fn from(_: $ty) -> Self {
                ProcessingGroup::$variant
            }
        }

        impl TryFrom<ProcessingGroup> for $ty {
            type Error = GroupTypeError;

            fn try_from(value: ProcessingGroup) -> Result<Self, Self::Error> {
                if matches!(value, ProcessingGroup::$variant) {
                    return Ok($ty);
                }
                $($(
                    if matches!(value, ProcessingGroup::$other) {
                        return Ok($ty);
                    }
                )+)?
                return Err(GroupTypeError);
            }
        }
    };
}
178
/// Marker trait for processing groups that operate on an event.
///
/// Implemented only for [`TransactionGroup`] and [`ErrorGroup`] below.
pub trait EventProcessing {}

processing_group!(TransactionGroup, Transaction);
impl EventProcessing for TransactionGroup {}

processing_group!(ErrorGroup, Error);
impl EventProcessing for ErrorGroup {}

processing_group!(SessionGroup, Session);
// `StandaloneGroup` also accepts conversion from the attachment and
// user-report standalone groups.
processing_group!(
    StandaloneGroup,
    Standalone,
    StandaloneAttachments,
    StandaloneUserReports
);
processing_group!(ClientReportGroup, ClientReport);
processing_group!(ReplayGroup, Replay);
processing_group!(CheckInGroup, CheckIn);
// NEL reports convert into the log group as well.
processing_group!(LogGroup, Log, Nel);
processing_group!(TraceMetricGroup, TraceMetric);
processing_group!(SpanGroup, Span);

processing_group!(ProfileChunkGroup, ProfileChunk);
processing_group!(MetricsGroup, Metrics);
processing_group!(ForwardUnknownGroup, ForwardUnknown);
processing_group!(Ungrouped, Ungrouped);
208
/// Type-state marker used with [`TypedEnvelope`] for envelopes that have gone
/// through processing.
#[derive(Clone, Copy, Debug)]
pub struct Processed;
214
/// The processing group an envelope (or part of it) is assigned to.
///
/// [`ProcessingGroup::split_envelope`] splits an incoming envelope into one
/// envelope per group, each handled by a dedicated pipeline.
#[derive(Clone, Copy, Debug)]
pub enum ProcessingGroup {
    /// Event-requiring items that include a transaction or profile.
    Transaction,
    /// Event-requiring items that are not transactions; also raw security reports.
    Error,
    /// Session and aggregated-sessions items.
    Session,
    /// Remaining event-requiring items when nothing in the envelope creates an event.
    Standalone,
    /// Attachments sent without an accompanying event.
    StandaloneAttachments,
    /// User reports sent without an accompanying event.
    StandaloneUserReports,
    /// Client report items.
    ClientReport,
    /// Replay events, recordings and videos.
    Replay,
    /// Check-in items.
    CheckIn,
    /// Network Error Logging report items.
    Nel,
    /// Log items, including logs delivered via integrations.
    Log,
    /// Trace metric items.
    TraceMetric,
    /// Standalone span items.
    Span,
    /// Span V2 containers, span integrations, and (behind a feature flag)
    /// plain spans and span attachments.
    SpanV2,
    /// Metric items (statsd and metric buckets).
    Metrics,
    /// Profile chunk items.
    ProfileChunk,
    /// Trace attachment items.
    TraceAttachment,
    /// Items of unknown type.
    ForwardUnknown,
    /// Catch-all for items that match no other group.
    Ungrouped,
}
268
impl ProcessingGroup {
    /// Splits the envelope into per-group envelopes.
    ///
    /// Each `take_items_by` call removes matching items from `envelope`, so
    /// the order of the extractions matters: earlier, more specific groups
    /// claim their items before the generic fallbacks at the end. All
    /// produced envelopes share a clone of the original headers.
    fn split_envelope(
        mut envelope: Envelope,
        project_info: &ProjectInfo,
    ) -> SmallVec<[(Self, Box<Envelope>); 3]> {
        let headers = envelope.headers().clone();
        let mut grouped_envelopes = smallvec![];

        // Replay items (event, recording, video) are processed together.
        let replay_items = envelope.take_items_by(|item| {
            matches!(
                item.ty(),
                &ItemType::ReplayEvent | &ItemType::ReplayRecording | &ItemType::ReplayVideo
            )
        });
        if !replay_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Replay,
                Envelope::from_parts(headers.clone(), replay_items),
            ))
        }

        // Session updates and aggregated sessions.
        let session_items = envelope
            .take_items_by(|item| matches!(item.ty(), &ItemType::Session | &ItemType::Sessions));
        if !session_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Session,
                Envelope::from_parts(headers.clone(), session_items),
            ))
        }

        // Span V2: containers and span integrations always; plain spans and
        // span attachments only with the experimental feature enabled. Must
        // run before the plain `Span` extraction below so it can claim them.
        let span_v2_items = envelope.take_items_by(|item| {
            let exp_feature = project_info.has_feature(Feature::SpanV2ExperimentalProcessing);

            ItemContainer::<SpanV2>::is_container(item)
                || matches!(item.integration(), Some(Integration::Spans(_)))
                || (exp_feature && matches!(item.ty(), &ItemType::Span))
                || (exp_feature && item.is_span_attachment())
        });

        if !span_v2_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::SpanV2,
                Envelope::from_parts(headers.clone(), span_v2_items),
            ))
        }

        let span_items = envelope.take_items_by(|item| matches!(item.ty(), &ItemType::Span));
        if !span_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Span,
                Envelope::from_parts(headers.clone(), span_items),
            ))
        }

        // Log items, including logs delivered via integrations.
        let logs_items = envelope.take_items_by(|item| {
            matches!(item.ty(), &ItemType::Log)
                || matches!(item.integration(), Some(Integration::Logs(_)))
        });
        if !logs_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Log,
                Envelope::from_parts(headers.clone(), logs_items),
            ))
        }

        let trace_metric_items =
            envelope.take_items_by(|item| matches!(item.ty(), &ItemType::TraceMetric));
        if !trace_metric_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::TraceMetric,
                Envelope::from_parts(headers.clone(), trace_metric_items),
            ))
        }

        let nel_items = envelope.take_items_by(|item| matches!(item.ty(), &ItemType::Nel));
        if !nel_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Nel,
                Envelope::from_parts(headers.clone(), nel_items),
            ))
        }

        // All metric item types (statsd, metric buckets, ...).
        let metric_items = envelope.take_items_by(|i| i.ty().is_metrics());
        if !metric_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Metrics,
                Envelope::from_parts(headers.clone(), metric_items),
            ))
        }

        let profile_chunk_items =
            envelope.take_items_by(|item| matches!(item.ty(), &ItemType::ProfileChunk));
        if !profile_chunk_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::ProfileChunk,
                Envelope::from_parts(headers.clone(), profile_chunk_items),
            ))
        }

        let trace_attachment_items = envelope.take_items_by(Item::is_trace_attachment);
        if !trace_attachment_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::TraceAttachment,
                Envelope::from_parts(headers.clone(), trace_attachment_items),
            ))
        }

        // Attachments and user reports are only "standalone" when nothing in
        // the envelope creates an event; otherwise they stay with the event.
        if !envelope.items().any(Item::creates_event) {
            let standalone_attachments = envelope
                .take_items_by(|i| i.requires_event() && matches!(i.ty(), ItemType::Attachment));
            if !standalone_attachments.is_empty() {
                grouped_envelopes.push((
                    ProcessingGroup::StandaloneAttachments,
                    Envelope::from_parts(headers.clone(), standalone_attachments),
                ))
            }

            let standalone_user_reports =
                envelope.take_items_by(|i| matches!(i.ty(), ItemType::UserReport));
            if !standalone_user_reports.is_empty() {
                grouped_envelopes.push((
                    ProcessingGroup::StandaloneUserReports,
                    Envelope::from_parts(headers.clone(), standalone_user_reports),
                ));
            }
        }

        // Any other event-requiring items without an event go to `Standalone`.
        if !envelope.items().any(Item::creates_event) {
            let standalone_items = envelope.take_items_by(Item::requires_event);
            if !standalone_items.is_empty() {
                grouped_envelopes.push((
                    ProcessingGroup::Standalone,
                    Envelope::from_parts(headers.clone(), standalone_items),
                ))
            }
        };

        // Each raw security report becomes its own error envelope with a
        // freshly generated event id.
        let security_reports_items = envelope
            .take_items_by(|i| matches!(i.ty(), &ItemType::RawSecurity))
            .into_iter()
            .map(|item| {
                let headers = headers.clone();
                let items: SmallVec<[Item; 3]> = smallvec![item.clone()];
                let mut envelope = Envelope::from_parts(headers, items);
                envelope.set_event_id(EventId::new());
                (ProcessingGroup::Error, envelope)
            });
        grouped_envelopes.extend(security_reports_items);

        // Remaining event-requiring items: transactions/profiles go to the
        // transaction group, everything else to the error group.
        let require_event_items = envelope.take_items_by(Item::requires_event);
        if !require_event_items.is_empty() {
            let group = if require_event_items
                .iter()
                .any(|item| matches!(item.ty(), &ItemType::Transaction | &ItemType::Profile))
            {
                ProcessingGroup::Transaction
            } else {
                ProcessingGroup::Error
            };

            grouped_envelopes.push((
                group,
                Envelope::from_parts(headers.clone(), require_event_items),
            ))
        }

        // Whatever is left gets one envelope per item.
        let envelopes = envelope.items_mut().map(|item| {
            let headers = headers.clone();
            let items: SmallVec<[Item; 3]> = smallvec![item.clone()];
            let envelope = Envelope::from_parts(headers, items);
            let item_type = item.ty();
            let group = if matches!(item_type, &ItemType::CheckIn) {
                ProcessingGroup::CheckIn
            } else if matches!(item.ty(), &ItemType::ClientReport) {
                ProcessingGroup::ClientReport
            } else if matches!(item_type, &ItemType::Unknown(_)) {
                ProcessingGroup::ForwardUnknown
            } else {
                ProcessingGroup::Ungrouped
            };

            (group, envelope)
        });
        grouped_envelopes.extend(envelopes);

        grouped_envelopes
    }

    /// Returns a stable string name for the group.
    pub fn variant(&self) -> &'static str {
        match self {
            ProcessingGroup::Transaction => "transaction",
            ProcessingGroup::Error => "error",
            ProcessingGroup::Session => "session",
            ProcessingGroup::Standalone => "standalone",
            ProcessingGroup::StandaloneAttachments => "standalone_attachment",
            ProcessingGroup::StandaloneUserReports => "standalone_user_reports",
            ProcessingGroup::ClientReport => "client_report",
            ProcessingGroup::Replay => "replay",
            ProcessingGroup::CheckIn => "check_in",
            ProcessingGroup::Log => "log",
            ProcessingGroup::TraceMetric => "trace_metric",
            ProcessingGroup::Nel => "nel",
            ProcessingGroup::Span => "span",
            ProcessingGroup::SpanV2 => "span_v2",
            ProcessingGroup::Metrics => "metrics",
            ProcessingGroup::ProfileChunk => "profile_chunk",
            ProcessingGroup::TraceAttachment => "trace_attachment",
            ProcessingGroup::ForwardUnknown => "forward_unknown",
            ProcessingGroup::Ungrouped => "ungrouped",
        }
    }
}
505
506impl From<ProcessingGroup> for AppFeature {
507 fn from(value: ProcessingGroup) -> Self {
508 match value {
509 ProcessingGroup::Transaction => AppFeature::Transactions,
510 ProcessingGroup::Error => AppFeature::Errors,
511 ProcessingGroup::Session => AppFeature::Sessions,
512 ProcessingGroup::Standalone => AppFeature::UnattributedEnvelope,
513 ProcessingGroup::StandaloneAttachments => AppFeature::UnattributedEnvelope,
514 ProcessingGroup::StandaloneUserReports => AppFeature::UserReports,
515 ProcessingGroup::ClientReport => AppFeature::ClientReports,
516 ProcessingGroup::Replay => AppFeature::Replays,
517 ProcessingGroup::CheckIn => AppFeature::CheckIns,
518 ProcessingGroup::Log => AppFeature::Logs,
519 ProcessingGroup::TraceMetric => AppFeature::TraceMetrics,
520 ProcessingGroup::Nel => AppFeature::Logs,
521 ProcessingGroup::Span => AppFeature::Spans,
522 ProcessingGroup::SpanV2 => AppFeature::Spans,
523 ProcessingGroup::Metrics => AppFeature::UnattributedMetrics,
524 ProcessingGroup::ProfileChunk => AppFeature::Profiles,
525 ProcessingGroup::ForwardUnknown => AppFeature::UnattributedEnvelope,
526 ProcessingGroup::Ungrouped => AppFeature::UnattributedEnvelope,
527 ProcessingGroup::TraceAttachment => AppFeature::TraceAttachments,
528 }
529 }
530}
531
/// An error that can occur while processing an envelope.
#[derive(Debug, thiserror::Error)]
pub enum ProcessingError {
    #[error("invalid json in event")]
    InvalidJson(#[source] serde_json::Error),

    #[error("invalid message pack event payload")]
    InvalidMsgpack(#[from] rmp_serde::decode::Error),

    #[cfg(feature = "processing")]
    #[error("invalid unreal crash report")]
    InvalidUnrealReport(#[source] Unreal4Error),

    #[error("event payload too large")]
    PayloadTooLarge(DiscardItemType),

    #[error("invalid transaction event")]
    InvalidTransaction,

    #[error("the item is not allowed/supported in this envelope")]
    UnsupportedItem,

    #[error("envelope processor failed")]
    ProcessingFailed(#[from] ProcessingAction),

    #[error("duplicate {0} in event")]
    DuplicateItem(ItemType),

    #[error("failed to extract event payload")]
    NoEventPayload,

    #[error("missing project id in DSN")]
    MissingProjectId,

    #[error("invalid security report type: {0:?}")]
    InvalidSecurityType(Bytes),

    #[error("unsupported security report type")]
    UnsupportedSecurityType,

    #[error("invalid security report")]
    InvalidSecurityReport(#[source] serde_json::Error),

    #[error("invalid nel report")]
    InvalidNelReport(#[source] NetworkReportError),

    #[error("event filtered with reason: {0:?}")]
    EventFiltered(FilterStatKey),

    #[error("missing or invalid required event timestamp")]
    InvalidTimestamp,

    #[error("could not serialize event payload")]
    SerializeFailed(#[source] serde_json::Error),

    #[cfg(feature = "processing")]
    #[error("failed to apply quotas")]
    QuotasFailed(#[from] RateLimitingError),

    // Boxed to keep the error enum small.
    #[error("invalid processing group type")]
    InvalidProcessingGroup(Box<InvalidProcessingGroupType>),

    #[error("nintendo switch dying message processing failed {0:?}")]
    InvalidNintendoDyingMessage(#[source] SwitchProcessingError),

    #[cfg(all(sentry, feature = "processing"))]
    #[error("playstation dump processing failed: {0}")]
    InvalidPlaystationDump(String),

    /// The envelope was routed to a processor for a different group.
    #[error("processing group does not match specific processor")]
    ProcessingGroupMismatch,
    /// A failure from one of the dedicated processors in [`crate::processing`].
    #[error("new processing pipeline failed")]
    ProcessingFailure,
}
606
impl ProcessingError {
    /// Maps the error to the [`Outcome`] that should be emitted for the
    /// dropped data, if any.
    ///
    /// Returns `None` for errors that must not produce an outcome here
    /// (e.g. `MissingProjectId`, `InvalidProcessingGroup`, `ProcessingFailure`).
    pub fn to_outcome(&self) -> Option<Outcome> {
        match self {
            Self::PayloadTooLarge(payload_type) => {
                Some(Outcome::Invalid(DiscardReason::TooLarge(*payload_type)))
            }
            Self::InvalidJson(_) => Some(Outcome::Invalid(DiscardReason::InvalidJson)),
            Self::InvalidMsgpack(_) => Some(Outcome::Invalid(DiscardReason::InvalidMsgpack)),
            Self::InvalidSecurityType(_) => {
                Some(Outcome::Invalid(DiscardReason::SecurityReportType))
            }
            Self::UnsupportedItem => Some(Outcome::Invalid(DiscardReason::InvalidEnvelope)),
            Self::InvalidSecurityReport(_) => Some(Outcome::Invalid(DiscardReason::SecurityReport)),
            Self::UnsupportedSecurityType => Some(Outcome::Filtered(FilterStatKey::InvalidCsp)),
            Self::InvalidNelReport(_) => Some(Outcome::Invalid(DiscardReason::InvalidJson)),
            Self::InvalidTransaction => Some(Outcome::Invalid(DiscardReason::InvalidTransaction)),
            Self::InvalidTimestamp => Some(Outcome::Invalid(DiscardReason::Timestamp)),
            Self::DuplicateItem(_) => Some(Outcome::Invalid(DiscardReason::DuplicateItem)),
            Self::NoEventPayload => Some(Outcome::Invalid(DiscardReason::NoEventPayload)),
            Self::InvalidNintendoDyingMessage(_) => Some(Outcome::Invalid(DiscardReason::Payload)),
            #[cfg(all(sentry, feature = "processing"))]
            Self::InvalidPlaystationDump(_) => Some(Outcome::Invalid(DiscardReason::Payload)),
            // Bad compression is reported distinctly; the guard must stay
            // before the generic `InvalidUnrealReport` arm below.
            #[cfg(feature = "processing")]
            Self::InvalidUnrealReport(err) if err.kind() == Unreal4ErrorKind::BadCompression => {
                Some(Outcome::Invalid(DiscardReason::InvalidCompression))
            }
            #[cfg(feature = "processing")]
            Self::InvalidUnrealReport(_) => Some(Outcome::Invalid(DiscardReason::ProcessUnreal)),
            Self::SerializeFailed(_) | Self::ProcessingFailed(_) => {
                Some(Outcome::Invalid(DiscardReason::Internal))
            }
            #[cfg(feature = "processing")]
            Self::QuotasFailed(_) => Some(Outcome::Invalid(DiscardReason::Internal)),
            Self::MissingProjectId => None,
            Self::EventFiltered(key) => Some(Outcome::Filtered(key.clone())),
            Self::InvalidProcessingGroup(_) => None,

            Self::ProcessingGroupMismatch => Some(Outcome::Invalid(DiscardReason::Internal)),
            Self::ProcessingFailure => None,
        }
    }

    /// Returns `true` if the error maps to an outcome that is considered
    /// unexpected (see [`Outcome::is_unexpected`]).
    fn is_unexpected(&self) -> bool {
        self.to_outcome()
            .is_some_and(|outcome| outcome.is_unexpected())
    }
}
655
#[cfg(feature = "processing")]
impl From<Unreal4Error> for ProcessingError {
    /// Converts unreal crash report errors, mapping oversized payloads to the
    /// generic too-large error and everything else to an invalid report.
    fn from(err: Unreal4Error) -> Self {
        if err.kind() == Unreal4ErrorKind::TooLarge {
            Self::PayloadTooLarge(ItemType::UnrealReport.into())
        } else {
            Self::InvalidUnrealReport(err)
        }
    }
}
665
666impl From<ExtractMetricsError> for ProcessingError {
667 fn from(error: ExtractMetricsError) -> Self {
668 match error {
669 ExtractMetricsError::MissingTimestamp | ExtractMetricsError::InvalidTimestamp => {
670 Self::InvalidTimestamp
671 }
672 }
673 }
674}
675
676impl From<InvalidProcessingGroupType> for ProcessingError {
677 fn from(value: InvalidProcessingGroupType) -> Self {
678 Self::InvalidProcessingGroup(Box::new(value))
679 }
680}
681
/// An event parsed from an envelope item, together with a size — presumably
/// the length of the original payload in bytes; confirm at call sites.
type ExtractedEvent = (Annotated<Event>, usize);
683
/// Wrapper around [`ExtractedMetrics`] collected during envelope processing.
///
/// The `extend_*` methods stamp the `extracted_from_indexed` metadata flag on
/// every added bucket based on the sampling decision.
#[derive(Debug)]
pub struct ProcessingExtractedMetrics {
    metrics: ExtractedMetrics,
}
692
impl ProcessingExtractedMetrics {
    /// Creates an empty set of extracted metrics.
    pub fn new() -> Self {
        Self {
            metrics: ExtractedMetrics::default(),
        }
    }

    /// Consumes the wrapper and returns the collected metrics.
    pub fn into_inner(self) -> ExtractedMetrics {
        self.metrics
    }

    /// Extends both project and sampling metrics from `extracted`.
    ///
    /// See [`Self::extend_project_metrics`] for how `sampling_decision`
    /// affects bucket metadata.
    pub fn extend(
        &mut self,
        extracted: ExtractedMetrics,
        sampling_decision: Option<SamplingDecision>,
    ) {
        self.extend_project_metrics(extracted.project_metrics, sampling_decision);
        self.extend_sampling_metrics(extracted.sampling_metrics, sampling_decision);
    }

    /// Extends the project metrics.
    ///
    /// Each bucket's `extracted_from_indexed` flag is set iff the sampling
    /// decision is `Keep`.
    pub fn extend_project_metrics<I>(
        &mut self,
        buckets: I,
        sampling_decision: Option<SamplingDecision>,
    ) where
        I: IntoIterator<Item = Bucket>,
    {
        self.metrics
            .project_metrics
            .extend(buckets.into_iter().map(|mut bucket| {
                bucket.metadata.extracted_from_indexed =
                    sampling_decision == Some(SamplingDecision::Keep);
                bucket
            }));
    }

    /// Extends the sampling project metrics; same metadata handling as
    /// [`Self::extend_project_metrics`].
    pub fn extend_sampling_metrics<I>(
        &mut self,
        buckets: I,
        sampling_decision: Option<SamplingDecision>,
    ) where
        I: IntoIterator<Item = Bucket>,
    {
        self.metrics
            .sampling_metrics
            .extend(buckets.into_iter().map(|mut bucket| {
                bucket.metadata.extracted_from_indexed =
                    sampling_decision == Some(SamplingDecision::Keep);
                bucket
            }));
    }

    /// Applies rate limit enforcement to the extracted metrics.
    ///
    /// When the total (event/span) limit is active, all buckets of the
    /// corresponding namespace are dropped. When only the indexed limit is
    /// active and enforcement was not applied consistently, the
    /// `extracted_from_indexed` flag is reset on those buckets instead.
    #[cfg(feature = "processing")]
    fn apply_enforcement(&mut self, enforcement: &Enforcement, enforced_consistently: bool) {
        let mut drop_namespaces: SmallVec<[_; 2]> = smallvec![];
        let mut reset_extracted_from_indexed: SmallVec<[_; 2]> = smallvec![];

        // Only the transaction and span namespaces are subject to enforcement.
        for (namespace, limit, indexed) in [
            (
                MetricNamespace::Transactions,
                &enforcement.event,
                &enforcement.event_indexed,
            ),
            (
                MetricNamespace::Spans,
                &enforcement.spans,
                &enforcement.spans_indexed,
            ),
        ] {
            if limit.is_active() {
                drop_namespaces.push(namespace);
            } else if indexed.is_active() && !enforced_consistently {
                reset_extracted_from_indexed.push(namespace);
            }
        }

        if !drop_namespaces.is_empty() || !reset_extracted_from_indexed.is_empty() {
            self.retain_mut(|bucket| {
                // Buckets without a recognized namespace are kept untouched.
                let Some(namespace) = bucket.name.try_namespace() else {
                    return true;
                };

                if drop_namespaces.contains(&namespace) {
                    return false;
                }

                if reset_extracted_from_indexed.contains(&namespace) {
                    bucket.metadata.extracted_from_indexed = false;
                }

                true
            });
        }
    }

    /// Applies `f` to every project and sampling bucket, dropping buckets for
    /// which it returns `false`.
    #[cfg(feature = "processing")]
    fn retain_mut(&mut self, mut f: impl FnMut(&mut Bucket) -> bool) {
        self.metrics.project_metrics.retain_mut(&mut f);
        self.metrics.sampling_metrics.retain_mut(&mut f);
    }
}
808
809fn send_metrics(
810 metrics: ExtractedMetrics,
811 project_key: ProjectKey,
812 sampling_key: Option<ProjectKey>,
813 aggregator: &Addr<Aggregator>,
814) {
815 let ExtractedMetrics {
816 project_metrics,
817 sampling_metrics,
818 } = metrics;
819
820 if !project_metrics.is_empty() {
821 aggregator.send(MergeBuckets {
822 project_key,
823 buckets: project_metrics,
824 });
825 }
826
827 if !sampling_metrics.is_empty() {
828 let sampling_project_key = sampling_key.unwrap_or(project_key);
835 aggregator.send(MergeBuckets {
836 project_key: sampling_project_key,
837 buckets: sampling_metrics,
838 });
839 }
840}
841
842fn should_filter(config: &Config, project_info: &ProjectInfo, feature: Feature) -> bool {
847 match config.relay_mode() {
848 RelayMode::Proxy => false,
849 RelayMode::Managed => !project_info.has_feature(feature),
850 }
851}
852
/// Result of processing an envelope group.
#[derive(Debug)]
#[expect(
    clippy::large_enum_variant,
    reason = "until we have a better solution to combat the excessive growth of Item, see #4819"
)]
enum ProcessingResult {
    /// A processed envelope together with the metrics extracted from it.
    Envelope {
        managed_envelope: TypedEnvelope<Processed>,
        extracted_metrics: ProcessingExtractedMetrics,
    },
    /// Output produced by one of the dedicated processors in [`crate::processing`].
    Output(Output<Outputs>),
}
867
868impl ProcessingResult {
869 fn no_metrics(managed_envelope: TypedEnvelope<Processed>) -> Self {
871 Self::Envelope {
872 managed_envelope,
873 extracted_metrics: ProcessingExtractedMetrics::new(),
874 }
875 }
876}
877
/// Unit of work handed to the submission/forwarding stage.
#[derive(Debug)]
#[expect(
    clippy::large_enum_variant,
    reason = "until we have a better solution to combat the excessive growth of Item, see #4819"
)]
enum Submit<'a> {
    /// A fully processed envelope.
    Envelope(TypedEnvelope<Processed>),
    /// Output of a dedicated processor together with its forwarding context.
    Output {
        output: Outputs,
        ctx: processing::ForwardContext<'a>,
    },
}
893
/// Message that triggers processing of a single envelope.
#[derive(Debug)]
pub struct ProcessEnvelope {
    /// The envelope to process, with lifecycle/outcome tracking.
    pub envelope: ManagedEnvelope,
    /// State of the project the envelope belongs to.
    pub project_info: Arc<ProjectInfo>,
    /// Currently active rate limits for the project.
    pub rate_limits: Arc<RateLimits>,
    /// State of the project used for dynamic sampling, if available —
    /// presumably the trace's root project; confirm at the call site.
    pub sampling_project_info: Option<Arc<ProjectInfo>>,
    /// Shared counters used for reservoir sampling evaluation.
    pub reservoir_counters: ReservoirCounters,
}
916
/// An envelope restricted to a single [`ProcessingGroup`], with its context.
#[derive(Debug)]
struct ProcessEnvelopeGrouped<'a> {
    /// The group all items in the envelope belong to.
    pub group: ProcessingGroup,
    /// The envelope to process.
    pub envelope: ManagedEnvelope,
    /// Shared processing context (see [`processing::Context`]).
    pub ctx: processing::Context<'a>,
}
927
/// Message that triggers parsing and aggregation of metrics for a project.
#[derive(Debug)]
pub struct ProcessMetrics {
    /// Raw envelope items or already-parsed buckets.
    pub data: MetricData,
    /// Public key of the project the metrics belong to.
    pub project_key: ProjectKey,
    /// Whether the metrics come from a trusted (internal) or untrusted source.
    pub source: BucketSource,
    /// Time the payload was received by this Relay.
    pub received_at: DateTime<Utc>,
    /// Client-reported send time, if available — presumably used for clock
    /// drift correction; confirm in the handler.
    pub sent_at: Option<DateTime<Utc>>,
}
953
/// Metric payloads in raw or already-parsed form.
#[derive(Debug)]
pub enum MetricData {
    /// Envelope items still to be parsed (statsd lines or JSON bucket arrays).
    Raw(Vec<Item>),
    /// Buckets that have already been parsed.
    Parsed(Vec<Bucket>),
}
962
963impl MetricData {
964 fn into_buckets(self, timestamp: UnixTimestamp) -> Vec<Bucket> {
969 let items = match self {
970 Self::Parsed(buckets) => return buckets,
971 Self::Raw(items) => items,
972 };
973
974 let mut buckets = Vec::new();
975 for item in items {
976 let payload = item.payload();
977 if item.ty() == &ItemType::Statsd {
978 for bucket_result in Bucket::parse_all(&payload, timestamp) {
979 match bucket_result {
980 Ok(bucket) => buckets.push(bucket),
981 Err(error) => relay_log::debug!(
982 error = &error as &dyn Error,
983 "failed to parse metric bucket from statsd format",
984 ),
985 }
986 }
987 } else if item.ty() == &ItemType::MetricBuckets {
988 match serde_json::from_slice::<Vec<Bucket>>(&payload) {
989 Ok(parsed_buckets) => {
990 if buckets.is_empty() {
992 buckets = parsed_buckets;
993 } else {
994 buckets.extend(parsed_buckets);
995 }
996 }
997 Err(error) => {
998 relay_log::debug!(
999 error = &error as &dyn Error,
1000 "failed to parse metric bucket",
1001 );
1002 metric!(counter(RelayCounters::MetricBucketsParsingFailed) += 1);
1003 }
1004 }
1005 } else {
1006 relay_log::error!(
1007 "invalid item of type {} passed to ProcessMetrics",
1008 item.ty()
1009 );
1010 }
1011 }
1012 buckets
1013 }
1014}
1015
/// Message that triggers processing of a batched metrics payload.
#[derive(Debug)]
pub struct ProcessBatchedMetrics {
    /// The serialized payload — schema is handled by the message handler;
    /// confirm the expected format there.
    pub payload: Bytes,
    /// Whether the metrics come from a trusted (internal) or untrusted source.
    pub source: BucketSource,
    /// Time the payload was received by this Relay.
    pub received_at: DateTime<Utc>,
    /// Client-reported send time, if available.
    pub sent_at: Option<DateTime<Utc>>,
}
1027
/// Origin of a metric bucket, derived from the trust level of the request.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum BucketSource {
    /// The bucket arrived on a trusted (internal) request.
    Internal,
    /// The bucket arrived on an untrusted (external) request.
    External,
}
1043
1044impl BucketSource {
1045 pub fn from_meta(meta: &RequestMeta) -> Self {
1047 match meta.request_trust() {
1048 RequestTrust::Trusted => Self::Internal,
1049 RequestTrust::Untrusted => Self::External,
1050 }
1051 }
1052}
1053
/// Message carrying client reports to be submitted for a project.
#[derive(Debug)]
pub struct SubmitClientReports {
    /// The client reports to submit.
    pub client_reports: Vec<ClientReport>,
    /// Scoping (organization/project/key) the reports belong to.
    pub scoping: Scoping,
}
1062
/// Messages handled by [`EnvelopeProcessorService`].
///
/// All payloads are boxed.
#[derive(Debug)]
pub enum EnvelopeProcessor {
    ProcessEnvelope(Box<ProcessEnvelope>),
    ProcessProjectMetrics(Box<ProcessMetrics>),
    ProcessBatchedMetrics(Box<ProcessBatchedMetrics>),
    FlushBuckets(Box<FlushBuckets>),
    SubmitClientReports(Box<SubmitClientReports>),
}
1072
1073impl EnvelopeProcessor {
1074 pub fn variant(&self) -> &'static str {
1076 match self {
1077 EnvelopeProcessor::ProcessEnvelope(_) => "ProcessEnvelope",
1078 EnvelopeProcessor::ProcessProjectMetrics(_) => "ProcessProjectMetrics",
1079 EnvelopeProcessor::ProcessBatchedMetrics(_) => "ProcessBatchedMetrics",
1080 EnvelopeProcessor::FlushBuckets(_) => "FlushBuckets",
1081 EnvelopeProcessor::SubmitClientReports(_) => "SubmitClientReports",
1082 }
1083 }
1084}
1085
// `EnvelopeProcessor` is the message interface of the processor service.
impl relay_system::Interface for EnvelopeProcessor {}
1087
1088impl FromMessage<ProcessEnvelope> for EnvelopeProcessor {
1089 type Response = relay_system::NoResponse;
1090
1091 fn from_message(message: ProcessEnvelope, _sender: ()) -> Self {
1092 Self::ProcessEnvelope(Box::new(message))
1093 }
1094}
1095
1096impl FromMessage<ProcessMetrics> for EnvelopeProcessor {
1097 type Response = NoResponse;
1098
1099 fn from_message(message: ProcessMetrics, _: ()) -> Self {
1100 Self::ProcessProjectMetrics(Box::new(message))
1101 }
1102}
1103
1104impl FromMessage<ProcessBatchedMetrics> for EnvelopeProcessor {
1105 type Response = NoResponse;
1106
1107 fn from_message(message: ProcessBatchedMetrics, _: ()) -> Self {
1108 Self::ProcessBatchedMetrics(Box::new(message))
1109 }
1110}
1111
1112impl FromMessage<FlushBuckets> for EnvelopeProcessor {
1113 type Response = NoResponse;
1114
1115 fn from_message(message: FlushBuckets, _: ()) -> Self {
1116 Self::FlushBuckets(Box::new(message))
1117 }
1118}
1119
1120impl FromMessage<SubmitClientReports> for EnvelopeProcessor {
1121 type Response = NoResponse;
1122
1123 fn from_message(message: SubmitClientReports, _: ()) -> Self {
1124 Self::SubmitClientReports(Box::new(message))
1125 }
1126}
1127
/// Async pool on which the envelope processor runs its processing futures.
pub type EnvelopeProcessorServicePool = AsyncPool<BoxFuture<'static, ()>>;
1130
/// Service implementing the [`EnvelopeProcessor`] interface.
///
/// Cloning is cheap: all state is shared behind an `Arc`.
#[derive(Clone)]
pub struct EnvelopeProcessorService {
    inner: Arc<InnerProcessor>,
}
1138
/// Addresses of the services the envelope processor sends messages to.
pub struct Addrs {
    /// Aggregator for outcome tracking.
    pub outcome_aggregator: Addr<TrackOutcome>,
    /// Upstream relay used to send requests.
    pub upstream_relay: Addr<UpstreamRelay>,
    /// Object store service; `None` if not configured — confirm setup code.
    #[cfg(feature = "processing")]
    pub objectstore: Option<Addr<Objectstore>>,
    /// Store forwarder service; `None` if not configured — confirm setup code.
    #[cfg(feature = "processing")]
    pub store_forwarder: Option<Addr<Store>>,
    /// Metrics aggregator.
    pub aggregator: Addr<Aggregator>,
}
1149
1150impl Default for Addrs {
1151 fn default() -> Self {
1152 Addrs {
1153 outcome_aggregator: Addr::dummy(),
1154 upstream_relay: Addr::dummy(),
1155 #[cfg(feature = "processing")]
1156 objectstore: None,
1157 #[cfg(feature = "processing")]
1158 store_forwarder: None,
1159 aggregator: Addr::dummy(),
1160 }
1161 }
1162}
1163
/// Shared state of the envelope processor service.
struct InnerProcessor {
    /// Pool running the processing futures.
    pool: EnvelopeProcessorServicePool,
    config: Arc<Config>,
    global_config: GlobalConfigHandle,
    project_cache: ProjectCacheHandle,
    cogs: Cogs,
    addrs: Addrs,
    /// Redis-backed rate limiter; only available in processing builds.
    #[cfg(feature = "processing")]
    rate_limiter: Option<Arc<RedisRateLimiter>>,
    geoip_lookup: GeoIpLookup,
    /// Cardinality limiter; only available in processing builds.
    #[cfg(feature = "processing")]
    cardinality_limiter: Option<CardinalityLimiter>,
    metric_outcomes: MetricOutcomes,
    /// The dedicated per-signal processors.
    processing: Processing,
}
1179
/// The set of dedicated processors, one per supported signal type.
struct Processing {
    errors: ErrorsProcessor,
    logs: LogsProcessor,
    trace_metrics: TraceMetricsProcessor,
    spans: SpansProcessor,
    check_ins: CheckInsProcessor,
    sessions: SessionsProcessor,
    transactions: TransactionProcessor,
    profile_chunks: ProfileChunksProcessor,
    trace_attachments: TraceAttachmentsProcessor,
    replays: ReplaysProcessor,
    client_reports: ClientReportsProcessor,
    attachments: AttachmentProcessor,
    user_reports: UserReportsProcessor,
}
1195
1196impl EnvelopeProcessorService {
1197 #[cfg_attr(feature = "processing", expect(clippy::too_many_arguments))]
1199 pub fn new(
1200 pool: EnvelopeProcessorServicePool,
1201 config: Arc<Config>,
1202 global_config: GlobalConfigHandle,
1203 project_cache: ProjectCacheHandle,
1204 cogs: Cogs,
1205 #[cfg(feature = "processing")] redis: Option<RedisClients>,
1206 addrs: Addrs,
1207 metric_outcomes: MetricOutcomes,
1208 ) -> Self {
1209 let geoip_lookup = config
1210 .geoip_path()
1211 .and_then(
1212 |p| match GeoIpLookup::open(p).context(ServiceError::GeoIp) {
1213 Ok(geoip) => Some(geoip),
1214 Err(err) => {
1215 relay_log::error!("failed to open GeoIP db {p:?}: {err:?}");
1216 None
1217 }
1218 },
1219 )
1220 .unwrap_or_else(GeoIpLookup::empty);
1221
1222 #[cfg(feature = "processing")]
1223 let (cardinality, quotas) = match redis {
1224 Some(RedisClients {
1225 cardinality,
1226 quotas,
1227 ..
1228 }) => (Some(cardinality), Some(quotas)),
1229 None => (None, None),
1230 };
1231 #[cfg(not(feature = "processing"))]
1232 let quotas = None;
1233
1234 #[cfg(feature = "processing")]
1235 let rate_limiter = quotas.clone().map(|redis| {
1236 RedisRateLimiter::new(redis)
1237 .max_limit(config.max_rate_limit())
1238 .cache(config.quota_cache_ratio(), config.quota_cache_max())
1239 });
1240
1241 let quota_limiter = Arc::new(QuotaRateLimiter::new(
1242 #[cfg(feature = "processing")]
1243 project_cache.clone(),
1244 #[cfg(feature = "processing")]
1245 rate_limiter.clone(),
1246 ));
1247 #[cfg(feature = "processing")]
1248 let rate_limiter = rate_limiter.map(Arc::new);
1249 let outcome_aggregator = addrs.outcome_aggregator.clone();
1250 let inner = InnerProcessor {
1251 pool,
1252 global_config,
1253 project_cache,
1254 cogs,
1255 #[cfg(feature = "processing")]
1256 rate_limiter,
1257 addrs,
1258 #[cfg(feature = "processing")]
1259 cardinality_limiter: cardinality
1260 .map(|cardinality| {
1261 RedisSetLimiter::new(
1262 RedisSetLimiterOptions {
1263 cache_vacuum_interval: config
1264 .cardinality_limiter_cache_vacuum_interval(),
1265 },
1266 cardinality,
1267 )
1268 })
1269 .map(CardinalityLimiter::new),
1270 metric_outcomes,
1271 processing: Processing {
1272 errors: ErrorsProcessor::new(Arc::clone("a_limiter), geoip_lookup.clone()),
1273 logs: LogsProcessor::new(Arc::clone("a_limiter)),
1274 trace_metrics: TraceMetricsProcessor::new(Arc::clone("a_limiter)),
1275 spans: SpansProcessor::new(Arc::clone("a_limiter), geoip_lookup.clone()),
1276 check_ins: CheckInsProcessor::new(Arc::clone("a_limiter)),
1277 sessions: SessionsProcessor::new(Arc::clone("a_limiter)),
1278 transactions: TransactionProcessor::new(
1279 Arc::clone("a_limiter),
1280 geoip_lookup.clone(),
1281 quotas.clone(),
1282 ),
1283 profile_chunks: ProfileChunksProcessor::new(Arc::clone("a_limiter)),
1284 trace_attachments: TraceAttachmentsProcessor::new(Arc::clone("a_limiter)),
1285 replays: ReplaysProcessor::new(Arc::clone("a_limiter), geoip_lookup.clone()),
1286 client_reports: ClientReportsProcessor::new(outcome_aggregator),
1287 attachments: AttachmentProcessor::new(Arc::clone("a_limiter)),
1288 user_reports: UserReportsProcessor::new(quota_limiter),
1289 },
1290 geoip_lookup,
1291 config,
1292 };
1293
1294 Self {
1295 inner: Arc::new(inner),
1296 }
1297 }
1298
    /// Enforces quotas and rate limits on the envelope and its extracted metrics.
    ///
    /// Always applies the locally cached rate limits first. When this Relay is
    /// built with the `processing` feature and a Redis rate limiter is
    /// configured, a consistent (Redis-backed) enforcement runs afterwards and
    /// any newly created limits are merged back into the project's cached
    /// limits. Returns the (possibly emptied) event on success.
    async fn enforce_quotas<Group>(
        &self,
        managed_envelope: &mut TypedEnvelope<Group>,
        event: Annotated<Event>,
        extracted_metrics: &mut ProcessingExtractedMetrics,
        ctx: processing::Context<'_>,
    ) -> Result<Annotated<Event>, ProcessingError> {
        let cached_result = RateLimiter::Cached
            .enforce(managed_envelope, event, extracted_metrics, ctx)
            .await?;

        if_processing!(self.inner.config, {
            let rate_limiter = match self.inner.rate_limiter.clone() {
                Some(rate_limiter) => rate_limiter,
                // Without Redis, the cached enforcement is all we can do.
                None => return Ok(cached_result.event),
            };

            let consistent_result = RateLimiter::Consistent(rate_limiter)
                .enforce(
                    managed_envelope,
                    cached_result.event,
                    extracted_metrics,
                    ctx
                )
                .await?;

            // Cache new limits on the project so subsequent envelopes are
            // limited without another consistent check.
            if !consistent_result.rate_limits.is_empty() {
                self.inner
                    .project_cache
                    .get(managed_envelope.scoping().project_key)
                    .rate_limits()
                    .merge(consistent_result.rate_limits);
            }

            Ok(consistent_result.event)
        } else { Ok(cached_result.event) })
    }
1340
    /// Processes error events and the items accompanying them (user reports,
    /// attachments, crash reports).
    ///
    /// Stages, in order: user reports, crash-report expansion (processing
    /// only), event extraction, crash-report processing and attachment
    /// placeholders (processing only), DSC validation, finalization,
    /// normalization, inbound filtering, dynamic-sampling tagging, quota
    /// enforcement, scrubbing, and serialization back into the envelope.
    async fn process_errors(
        &self,
        managed_envelope: &mut TypedEnvelope<ErrorGroup>,
        project_id: ProjectId,
        mut ctx: processing::Context<'_>,
    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
        let mut event_fully_normalized = EventFullyNormalized::new(managed_envelope.envelope());
        let mut metrics = Metrics::default();
        let mut extracted_metrics = ProcessingExtractedMetrics::new();

        report::process_user_reports(managed_envelope);

        // Expand platform crash reports (Unreal, PlayStation, Nintendo Switch)
        // into individual items before the event is extracted.
        if_processing!(self.inner.config, {
            unreal::expand(managed_envelope, &self.inner.config)?;
            #[cfg(sentry)]
            playstation::expand(managed_envelope, &self.inner.config, ctx.project_info)?;
            nnswitch::expand(managed_envelope)?;
        });

        let mut event = event::extract(
            managed_envelope,
            &mut metrics,
            event_fully_normalized,
            &self.inner.config,
        )?;

        if_processing!(self.inner.config, {
            if let Some(inner_event_fully_normalized) =
                unreal::process(managed_envelope, &mut event)?
            {
                event_fully_normalized = inner_event_fully_normalized;
            }
            #[cfg(sentry)]
            if let Some(inner_event_fully_normalized) =
                playstation::process(managed_envelope, &mut event, ctx.project_info)?
            {
                event_fully_normalized = inner_event_fully_normalized;
            }
            if let Some(inner_event_fully_normalized) =
                attachment::create_placeholders(managed_envelope, &mut event, &mut metrics)
            {
                event_fully_normalized = inner_event_fully_normalized;
            }
        });

        ctx.sampling_project_info = dynamic_sampling::validate_and_set_dsc(
            managed_envelope,
            &mut event,
            ctx.project_info,
            ctx.sampling_project_info,
        );

        let attachments = managed_envelope
            .envelope()
            .items()
            .filter(|item| item.attachment_type() == Some(AttachmentType::Attachment));
        processing::utils::event::finalize(
            managed_envelope.envelope().headers(),
            &mut event,
            attachments,
            &mut metrics,
            ctx.config,
        )?;
        event_fully_normalized = processing::utils::event::normalize(
            managed_envelope.envelope().headers(),
            &mut event,
            event_fully_normalized,
            project_id,
            ctx,
            &self.inner.geoip_lookup,
        )?;
        let filter_run =
            processing::utils::event::filter(managed_envelope.envelope().headers(), &event, ctx)
                .map_err(ProcessingError::EventFiltered)?;

        // Only tag the sampling decision when filters conclusively passed, or
        // on processing Relays (which have authoritative filter config).
        if self.inner.config.processing_enabled() || matches!(filter_run, FiltersStatus::Ok) {
            dynamic_sampling::tag_error_with_sampling_decision(
                managed_envelope,
                &mut event,
                ctx.sampling_project_info,
                &self.inner.config,
            )
            .await;
        }

        event = self
            .enforce_quotas(managed_envelope, event, &mut extracted_metrics, ctx)
            .await?;

        // The event may have been dropped by quota enforcement above; only
        // scrub and serialize it back if it still exists.
        if event.value().is_some() {
            processing::utils::event::scrub(&mut event, ctx.project_info)?;
            event::serialize(
                managed_envelope,
                &mut event,
                event_fully_normalized,
                EventMetricsExtracted(false),
                SpansExtracted(false),
            )?;
            event::emit_feedback_metrics(managed_envelope.envelope());
        }

        let attachments = managed_envelope
            .envelope_mut()
            .items_mut()
            .filter(|i| i.ty() == &ItemType::Attachment);
        processing::utils::attachments::scrub(attachments, ctx.project_info, None);

        // A processing Relay must never forward unnormalized events.
        if self.inner.config.processing_enabled() && !event_fully_normalized.0 {
            relay_log::error!(
                tags.project = %project_id,
                tags.ty = event_type(&event).map(|e| e.to_string()).unwrap_or("none".to_owned()),
                "ingested event without normalizing"
            );
        }

        Ok(Some(extracted_metrics))
    }
1460
1461 async fn process_standalone(
1463 &self,
1464 managed_envelope: &mut TypedEnvelope<StandaloneGroup>,
1465 ctx: processing::Context<'_>,
1466 ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
1467 for item in managed_envelope.envelope().items() {
1468 let attachment_type_tag = match item.attachment_type() {
1469 Some(t) => &t.to_string(),
1470 None => "",
1471 };
1472 relay_statsd::metric!(
1473 counter(RelayCounters::StandaloneItem) += 1,
1474 processor = "old",
1475 item_type = item.ty().name(),
1476 attachment_type = attachment_type_tag,
1477 );
1478 }
1479
1480 let mut extracted_metrics = ProcessingExtractedMetrics::new();
1481
1482 standalone::process(managed_envelope);
1483
1484 profile::filter(managed_envelope, ctx.config, ctx.project_info);
1485
1486 self.enforce_quotas(
1487 managed_envelope,
1488 Annotated::empty(),
1489 &mut extracted_metrics,
1490 ctx,
1491 )
1492 .await?;
1493
1494 report::process_user_reports(managed_envelope);
1495
1496 Ok(Some(extracted_metrics))
1497 }
1498
1499 async fn process_nel(
1500 &self,
1501 mut managed_envelope: ManagedEnvelope,
1502 ctx: processing::Context<'_>,
1503 ) -> Result<ProcessingResult, ProcessingError> {
1504 nel::convert_to_logs(&mut managed_envelope);
1505 self.process_with_processor(&self.inner.processing.logs, managed_envelope, ctx)
1506 .await
1507 }
1508
    /// Runs the envelope through one of the processors of the new processing
    /// pipeline.
    ///
    /// The processor extracts the work it is responsible for from the
    /// envelope. If it extracts nothing, the envelope was routed to the wrong
    /// processor, which is a bug in the grouping logic. After extraction, an
    /// empty remainder is accepted; any leftover items are rejected as an
    /// internal error.
    async fn process_with_processor<P: processing::Processor>(
        &self,
        processor: &P,
        mut managed_envelope: ManagedEnvelope,
        ctx: processing::Context<'_>,
    ) -> Result<ProcessingResult, ProcessingError>
    where
        Outputs: From<P::Output>,
    {
        let Some(work) = processor.prepare_envelope(&mut managed_envelope) else {
            // In debug builds this is a hard failure to surface routing bugs.
            debug_assert!(
                false,
                "there must be work for the {} processor",
                std::any::type_name::<P>(),
            );
            return Err(ProcessingError::ProcessingGroupMismatch);
        };

        managed_envelope.update();
        match managed_envelope.envelope().is_empty() {
            true => managed_envelope.accept(),
            // Items the processor did not claim indicate a grouping mismatch.
            false => managed_envelope.reject(Outcome::Invalid(DiscardReason::Internal)),
        }

        processor
            .process(work, ctx)
            .await
            .map_err(|err| {
                relay_log::debug!(
                    error = &err as &dyn std::error::Error,
                    "processing pipeline failed"
                );
                ProcessingError::ProcessingFailure
            })
            .map(|o| o.map(Into::into))
            .map(ProcessingResult::Output)
    }
1546
    /// Processes standalone spans (the legacy span processing group).
    ///
    /// Filters spans, runs full span processing on processing Relays only,
    /// and enforces quotas on the result.
    async fn process_standalone_spans(
        &self,
        managed_envelope: &mut TypedEnvelope<SpanGroup>,
        _project_id: ProjectId,
        ctx: processing::Context<'_>,
    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
        let mut extracted_metrics = ProcessingExtractedMetrics::new();

        span::filter(managed_envelope, ctx.config, ctx.project_info);

        if_processing!(self.inner.config, {
            span::process(
                managed_envelope,
                // There is no associated event for standalone spans.
                &mut Annotated::empty(),
                &mut extracted_metrics,
                _project_id,
                ctx,
                &self.inner.geoip_lookup,
            )
            .await;
        });

        self.enforce_quotas(
            managed_envelope,
            Annotated::empty(),
            &mut extracted_metrics,
            ctx,
        )
        .await?;

        Ok(Some(extracted_metrics))
    }
1582
    /// Dispatches an envelope of a single processing group to the matching
    /// processing routine.
    ///
    /// Before dispatching, the dynamic sampling context is parametrized with
    /// the sampling project's transaction name rules, retention settings from
    /// the project config are stamped onto the envelope, and the project id
    /// is written into the envelope meta.
    async fn process_envelope(
        &self,
        project_id: ProjectId,
        message: ProcessEnvelopeGrouped<'_>,
    ) -> Result<ProcessingResult, ProcessingError> {
        let ProcessEnvelopeGrouped {
            group,
            envelope: mut managed_envelope,
            ctx,
        } = message;

        // Apply the root (sampling) project's transaction name rules to the
        // DSC so sampling decisions use the parametrized name.
        if let Some(sampling_state) = ctx.sampling_project_info {
            managed_envelope
                .envelope_mut()
                .parametrize_dsc_transaction(&sampling_state.config.tx_name_rules);
        }

        if let Some(retention) = ctx.project_info.config.event_retention {
            managed_envelope.envelope_mut().set_retention(retention);
        }

        if let Some(retention) = ctx.project_info.config.downsampled_event_retention {
            managed_envelope
                .envelope_mut()
                .set_downsampled_retention(retention);
        }

        managed_envelope
            .envelope_mut()
            .meta_mut()
            .set_project_id(project_id);

        // Invokes one of the legacy `process_*` methods: converts the envelope
        // to its typed group, runs the method, and on failure rejects the
        // envelope with the error's outcome (when one exists).
        macro_rules! run {
            ($fn_name:ident $(, $args:expr)*) => {
                async {
                    let mut managed_envelope = (managed_envelope, group).try_into()?;
                    match self.$fn_name(&mut managed_envelope, $($args),*).await {
                        Ok(extracted_metrics) => Ok(ProcessingResult::Envelope {
                            managed_envelope: managed_envelope.into_processed(),
                            extracted_metrics: extracted_metrics.map_or(ProcessingExtractedMetrics::new(), |e| e)
                        }),
                        Err(error) => {
                            relay_log::trace!("Executing {fn} failed: {error}", fn = stringify!($fn_name), error = error);
                            if let Some(outcome) = error.to_outcome() {
                                managed_envelope.reject(outcome);
                            }

                            return Err(error);
                        }
                    }
                }.await
            };
        }

        relay_log::trace!("Processing {group} group", group = group.variant());

        match group {
            ProcessingGroup::Error => {
                // Errors are migrating to the new pipeline behind a feature flag.
                if ctx.project_info.has_feature(Feature::NewErrorProcessing) {
                    self.process_with_processor(
                        &self.inner.processing.errors,
                        managed_envelope,
                        ctx,
                    )
                    .await
                } else {
                    run!(process_errors, project_id, ctx)
                }
            }
            ProcessingGroup::Transaction => {
                self.process_with_processor(
                    &self.inner.processing.transactions,
                    managed_envelope,
                    ctx,
                )
                .await
            }
            ProcessingGroup::Session => {
                self.process_with_processor(&self.inner.processing.sessions, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::Standalone => run!(process_standalone, ctx),
            ProcessingGroup::StandaloneAttachments => {
                self.process_with_processor(
                    &self.inner.processing.attachments,
                    managed_envelope,
                    ctx,
                )
                .await
            }
            ProcessingGroup::StandaloneUserReports => {
                self.process_with_processor(
                    &self.inner.processing.user_reports,
                    managed_envelope,
                    ctx,
                )
                .await
            }
            ProcessingGroup::ClientReport => {
                self.process_with_processor(
                    &self.inner.processing.client_reports,
                    managed_envelope,
                    ctx,
                )
                .await
            }
            ProcessingGroup::Replay => {
                self.process_with_processor(&self.inner.processing.replays, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::CheckIn => {
                self.process_with_processor(&self.inner.processing.check_ins, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::Nel => self.process_nel(managed_envelope, ctx).await,
            ProcessingGroup::Log => {
                self.process_with_processor(&self.inner.processing.logs, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::TraceMetric => {
                self.process_with_processor(
                    &self.inner.processing.trace_metrics,
                    managed_envelope,
                    ctx,
                )
                .await
            }
            ProcessingGroup::SpanV2 => {
                self.process_with_processor(&self.inner.processing.spans, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::TraceAttachment => {
                self.process_with_processor(
                    &self.inner.processing.trace_attachments,
                    managed_envelope,
                    ctx,
                )
                .await
            }
            ProcessingGroup::Span => run!(process_standalone_spans, project_id, ctx),
            ProcessingGroup::ProfileChunk => {
                self.process_with_processor(
                    &self.inner.processing.profile_chunks,
                    managed_envelope,
                    ctx,
                )
                .await
            }
            // Metric items are only forwarded here in proxy mode; everywhere
            // else they take a dedicated path, so log if they show up.
            ProcessingGroup::Metrics => {
                if self.inner.config.relay_mode() != RelayMode::Proxy {
                    relay_log::error!(
                        tags.project = %project_id,
                        items = ?managed_envelope.envelope().items().next().map(Item::ty),
                        "received metrics in the process_state"
                    );
                }

                Ok(ProcessingResult::no_metrics(
                    managed_envelope.into_processed(),
                ))
            }
            ProcessingGroup::Ungrouped => {
                relay_log::error!(
                    tags.project = %project_id,
                    items = ?managed_envelope.envelope().items().next().map(Item::ty),
                    "could not identify the processing group based on the envelope's items"
                );

                Ok(ProcessingResult::no_metrics(
                    managed_envelope.into_processed(),
                ))
            }
            // Unknown items are forwarded upstream untouched.
            ProcessingGroup::ForwardUnknown => Ok(ProcessingResult::no_metrics(
                managed_envelope.into_processed(),
            )),
        }
    }
1778
    /// Processes a grouped envelope and converts the result into an upstream
    /// submission.
    ///
    /// Resolves the project id, decorates the logging scope, runs
    /// [`Self::process_envelope`], sends extracted metrics to the aggregator,
    /// and decides whether anything remains to be submitted upstream.
    async fn process<'a>(
        &self,
        mut message: ProcessEnvelopeGrouped<'a>,
    ) -> Result<Option<Submit<'a>>, ProcessingError> {
        let ProcessEnvelopeGrouped {
            ref mut envelope,
            ctx,
            ..
        } = message;

        // The project id must come either from the project config or from the
        // envelope meta; without it the envelope cannot be processed.
        let Some(project_id) = ctx
            .project_info
            .project_id
            .or_else(|| envelope.envelope().meta().project_id())
        else {
            envelope.reject(Outcome::Invalid(DiscardReason::Internal));
            return Err(ProcessingError::MissingProjectId);
        };

        let client = envelope.envelope().meta().client().map(str::to_owned);
        let user_agent = envelope.envelope().meta().user_agent().map(str::to_owned);
        let project_key = envelope.envelope().meta().public_key();
        // The sampling key is only relevant when a sampling project is known.
        let sampling_key = envelope
            .envelope()
            .sampling_key()
            .filter(|_| ctx.sampling_project_info.is_some());

        relay_log::configure_scope(|scope| {
            scope.set_tag("project", project_id);
            if let Some(client) = client {
                scope.set_tag("sdk", client);
            }
            if let Some(user_agent) = user_agent {
                scope.set_extra("user_agent", user_agent.into());
            }
        });

        let result = match self.process_envelope(project_id, message).await {
            Ok(ProcessingResult::Envelope {
                mut managed_envelope,
                extracted_metrics,
            }) => {
                managed_envelope.update();

                let has_metrics = !extracted_metrics.metrics.project_metrics.is_empty();
                send_metrics(
                    extracted_metrics.metrics,
                    project_key,
                    sampling_key,
                    &self.inner.addrs.aggregator,
                );

                let envelope_response = if managed_envelope.envelope().is_empty() {
                    // An emptied envelope with no extracted metrics means all
                    // items were dropped by rate limiting.
                    if !has_metrics {
                        managed_envelope.reject(Outcome::RateLimited(None));
                    } else {
                        managed_envelope.accept();
                    }

                    None
                } else {
                    Some(managed_envelope)
                };

                Ok(envelope_response.map(Submit::Envelope))
            }
            Ok(ProcessingResult::Output(Output { main, metrics })) => {
                if let Some(metrics) = metrics {
                    metrics.accept(|metrics| {
                        send_metrics(
                            metrics,
                            project_key,
                            sampling_key,
                            &self.inner.addrs.aggregator,
                        );
                    });
                }

                let ctx = ctx.to_forward();
                Ok(main.map(|output| Submit::Output { output, ctx }))
            }
            Err(err) => Err(err),
        };

        // NOTE(review): `user_agent` is recorded with `set_extra` above but
        // removed here with `remove_tag`, so the extra is likely never
        // cleared — confirm whether `remove_extra` was intended.
        relay_log::configure_scope(|scope| {
            scope.remove_tag("project");
            scope.remove_tag("sdk");
            scope.remove_tag("user_agent");
        });

        result
    }
1890
    /// Handles a [`ProcessEnvelope`] message.
    ///
    /// Splits the envelope into processing groups, processes each group
    /// separately, and submits the results upstream. Errors are logged but do
    /// not abort processing of the remaining groups.
    async fn handle_process_envelope(&self, cogs: &mut Token, message: ProcessEnvelope) {
        let project_key = message.envelope.envelope().meta().public_key();
        let wait_time = message.envelope.age();
        metric!(timer(RelayTimers::EnvelopeWaitTime) = wait_time);

        // The caller's token is cancelled; each split group is measured below
        // with its own per-feature token instead.
        cogs.cancel();

        let scoping = message.envelope.scoping();
        for (group, envelope) in ProcessingGroup::split_envelope(
            *message.envelope.into_envelope(),
            &message.project_info,
        ) {
            let mut cogs = self
                .inner
                .cogs
                .timed(ResourceId::Relay, AppFeature::from(group));

            let mut envelope =
                ManagedEnvelope::new(envelope, self.inner.addrs.outcome_aggregator.clone());
            envelope.scope(scoping);

            let global_config = self.inner.global_config.current();

            let ctx = processing::Context {
                config: &self.inner.config,
                global_config: &global_config,
                project_info: &message.project_info,
                sampling_project_info: message.sampling_project_info.as_deref(),
                rate_limits: &message.rate_limits,
                reservoir_counters: &message.reservoir_counters,
            };

            let message = ProcessEnvelopeGrouped {
                group,
                envelope,
                ctx,
            };

            let result = metric!(
                timer(RelayTimers::EnvelopeProcessingTime),
                group = group.variant(),
                { self.process(message).await }
            );

            match result {
                Ok(Some(envelope)) => self.submit_upstream(&mut cogs, envelope),
                Ok(None) => {}
                // Unexpected errors are logged at error level; expected
                // failures (e.g. rate limits) only at debug level.
                Err(error) if error.is_unexpected() => {
                    relay_log::error!(
                        tags.project_key = %project_key,
                        error = &error as &dyn Error,
                        "error processing envelope"
                    )
                }
                Err(error) => {
                    relay_log::debug!(
                        tags.project_key = %project_key,
                        error = &error as &dyn Error,
                        "error processing envelope"
                    )
                }
            }
        }
    }
1957
    /// Handles a [`ProcessMetrics`] message.
    ///
    /// Parses and normalizes the buckets, corrects clock drift, applies
    /// cached project limits when the project state is available, and merges
    /// the surviving buckets into the aggregator.
    fn handle_process_metrics(&self, cogs: &mut Token, message: ProcessMetrics) {
        let ProcessMetrics {
            data,
            project_key,
            received_at,
            sent_at,
            source,
        } = message;

        let received_timestamp =
            UnixTimestamp::from_datetime(received_at).unwrap_or(UnixTimestamp::now());

        let mut buckets = data.into_buckets(received_timestamp);
        if buckets.is_empty() {
            return;
        };
        cogs.update(relay_metrics::cogs::BySize(&buckets));

        let clock_drift_processor =
            ClockDriftProcessor::new(sent_at, received_at).at_least(MINIMUM_CLOCK_DRIFT);

        buckets.retain_mut(|bucket| {
            // Drop buckets that fail normalization or use an invalid namespace.
            if let Err(error) = relay_metrics::normalize_bucket(bucket) {
                relay_log::debug!(error = &error as &dyn Error, "dropping bucket {bucket:?}");
                return false;
            }

            if !self::metrics::is_valid_namespace(bucket) {
                return false;
            }

            clock_drift_processor.process_timestamp(&mut bucket.timestamp);

            // Metadata from non-internal sources is discarded and re-stamped
            // with the receive time.
            if !matches!(source, BucketSource::Internal) {
                bucket.metadata = BucketMetadata::new(received_timestamp);
            }

            true
        });

        let project = self.inner.project_cache.get(project_key);

        // Check rate limits if the project state is cached; otherwise the
        // buckets are forwarded unchecked.
        let buckets = match project.state() {
            ProjectState::Enabled(project_info) => {
                let rate_limits = project.rate_limits().current_limits();
                self.check_buckets(project_key, project_info, &rate_limits, buckets)
            }
            _ => buckets,
        };

        relay_log::trace!("merging metric buckets into the aggregator");
        self.inner
            .addrs
            .aggregator
            .send(MergeBuckets::new(project_key, buckets));
    }
2016
2017 fn handle_process_batched_metrics(&self, cogs: &mut Token, message: ProcessBatchedMetrics) {
2018 let ProcessBatchedMetrics {
2019 payload,
2020 source,
2021 received_at,
2022 sent_at,
2023 } = message;
2024
2025 #[derive(serde::Deserialize)]
2026 struct Wrapper {
2027 buckets: HashMap<ProjectKey, Vec<Bucket>>,
2028 }
2029
2030 let buckets = match serde_json::from_slice(&payload) {
2031 Ok(Wrapper { buckets }) => buckets,
2032 Err(error) => {
2033 relay_log::debug!(
2034 error = &error as &dyn Error,
2035 "failed to parse batched metrics",
2036 );
2037 metric!(counter(RelayCounters::MetricBucketsParsingFailed) += 1);
2038 return;
2039 }
2040 };
2041
2042 for (project_key, buckets) in buckets {
2043 self.handle_process_metrics(
2044 cogs,
2045 ProcessMetrics {
2046 data: MetricData::Parsed(buckets),
2047 project_key,
2048 source,
2049 received_at,
2050 sent_at,
2051 },
2052 )
2053 }
2054 }
2055
    /// Submits a processing result to its destination.
    ///
    /// On processing Relays with a configured store forwarder, results go
    /// directly to the store. Otherwise the result is serialized into an
    /// envelope, compressed with the configured HTTP encoding, and sent to
    /// the upstream Relay.
    fn submit_upstream(&self, cogs: &mut Token, submit: Submit<'_>) {
        let _submit = cogs.start_category("submit");

        #[cfg(feature = "processing")]
        if self.inner.config.processing_enabled()
            && let Some(store_forwarder) = &self.inner.addrs.store_forwarder
        {
            use crate::processing::StoreHandle;

            let objectstore = self.inner.addrs.objectstore.as_ref();
            let global_config = &self.inner.global_config.current();
            let handle = StoreHandle::new(store_forwarder, objectstore, global_config);

            match submit {
                Submit::Envelope(envelope) => handle.send_envelope(envelope.into_inner()),
                Submit::Output { output, ctx } => output
                    .forward_store(handle, ctx)
                    .unwrap_or_else(|err| err.into_inner()),
            }
            return;
        }

        // Upstream path: everything must be serialized into an envelope first.
        let mut envelope = match submit {
            Submit::Envelope(envelope) => envelope,
            Submit::Output { output, ctx } => match output.serialize_envelope(ctx) {
                Ok(envelope) => ManagedEnvelope::from(envelope).into_processed(),
                Err(_) => {
                    relay_log::error!("failed to serialize output to an envelope");
                    return;
                }
            },
        };

        if envelope.envelope_mut().is_empty() {
            envelope.accept();
            return;
        }

        envelope.envelope_mut().set_sent_at(Utc::now());

        relay_log::trace!("sending envelope to sentry endpoint");
        let http_encoding = self.inner.config.http_encoding();
        let result = envelope.envelope().to_vec().and_then(|v| {
            encode_payload(&v.into(), http_encoding).map_err(EnvelopeError::PayloadIoFailed)
        });

        match result {
            Ok(body) => {
                self.inner
                    .addrs
                    .upstream_relay
                    .send(SendRequest(SendEnvelope {
                        envelope,
                        body,
                        http_encoding,
                        project_cache: self.inner.project_cache.clone(),
                    }));
            }
            Err(error) => {
                relay_log::error!(
                    error = &error as &dyn Error,
                    tags.project_key = %envelope.scoping().project_key,
                    "failed to serialize envelope payload"
                );

                envelope.reject(Outcome::Invalid(DiscardReason::Internal));
            }
        }
    }
2137
2138 fn handle_submit_client_reports(&self, cogs: &mut Token, message: SubmitClientReports) {
2139 let SubmitClientReports {
2140 client_reports,
2141 scoping,
2142 } = message;
2143
2144 let upstream = self.inner.config.upstream_descriptor();
2145 let dsn = PartialDsn::outbound(&scoping, upstream);
2146
2147 let mut envelope = Envelope::from_request(None, RequestMeta::outbound(dsn));
2148 for client_report in client_reports {
2149 match client_report.serialize() {
2150 Ok(payload) => {
2151 let mut item = Item::new(ItemType::ClientReport);
2152 item.set_payload(ContentType::Json, payload);
2153 envelope.add_item(item);
2154 }
2155 Err(error) => {
2156 relay_log::error!(
2157 error = &error as &dyn std::error::Error,
2158 "failed to serialize client report"
2159 );
2160 }
2161 }
2162 }
2163
2164 let envelope = ManagedEnvelope::new(envelope, self.inner.addrs.outcome_aggregator.clone());
2165 self.submit_upstream(cogs, Submit::Envelope(envelope.into_processed()));
2166 }
2167
2168 fn check_buckets(
2169 &self,
2170 project_key: ProjectKey,
2171 project_info: &ProjectInfo,
2172 rate_limits: &RateLimits,
2173 buckets: Vec<Bucket>,
2174 ) -> Vec<Bucket> {
2175 let Some(scoping) = project_info.scoping(project_key) else {
2176 relay_log::error!(
2177 tags.project_key = project_key.as_str(),
2178 "there is no scoping: dropping {} buckets",
2179 buckets.len(),
2180 );
2181 return Vec::new();
2182 };
2183
2184 let mut buckets = self::metrics::apply_project_info(
2185 buckets,
2186 &self.inner.metric_outcomes,
2187 project_info,
2188 scoping,
2189 );
2190
2191 let namespaces: BTreeSet<MetricNamespace> = buckets
2192 .iter()
2193 .filter_map(|bucket| bucket.name.try_namespace())
2194 .collect();
2195
2196 for namespace in namespaces {
2197 let limits = rate_limits.check_with_quotas(
2198 project_info.get_quotas(),
2199 scoping.item(DataCategory::MetricBucket),
2200 );
2201
2202 if limits.is_limited() {
2203 let rejected;
2204 (buckets, rejected) = utils::split_off(buckets, |bucket| {
2205 bucket.name.try_namespace() == Some(namespace)
2206 });
2207
2208 let reason_code = limits.longest().and_then(|limit| limit.reason_code.clone());
2209 self.inner.metric_outcomes.track(
2210 scoping,
2211 &rejected,
2212 Outcome::RateLimited(reason_code),
2213 );
2214 }
2215 }
2216
2217 let quotas = project_info.config.quotas.clone();
2218 match MetricsLimiter::create(buckets, quotas, scoping) {
2219 Ok(mut bucket_limiter) => {
2220 bucket_limiter.enforce_limits(rate_limits, &self.inner.metric_outcomes);
2221 bucket_limiter.into_buckets()
2222 }
2223 Err(buckets) => buckets,
2224 }
2225 }
2226
    /// Consistently rate limits metric buckets against Redis.
    ///
    /// For every namespace in the batch, the combined global and project
    /// quotas are checked; buckets of limited namespaces are dropped and the
    /// resulting limits are cached on the project. Afterwards, category-based
    /// limits are applied via [`Self::apply_other_rate_limits`].
    #[cfg(feature = "processing")]
    async fn rate_limit_buckets(
        &self,
        scoping: Scoping,
        project_info: &ProjectInfo,
        mut buckets: Vec<Bucket>,
    ) -> Vec<Bucket> {
        let Some(rate_limiter) = &self.inner.rate_limiter else {
            return buckets;
        };

        let global_config = self.inner.global_config.current();
        let namespaces = buckets
            .iter()
            .filter_map(|bucket| bucket.name.try_namespace())
            .counts();

        let quotas = CombinedQuotas::new(&global_config, project_info.get_quotas());

        for (namespace, quantity) in namespaces {
            let item_scoping = scoping.metric_bucket(namespace);

            let limits = match rate_limiter
                .is_rate_limited(quotas, item_scoping, quantity, false)
                .await
            {
                Ok(limits) => limits,
                // On Redis failure, stop checking and pass buckets through.
                Err(err) => {
                    relay_log::error!(
                        error = &err as &dyn std::error::Error,
                        "failed to check redis rate limits"
                    );
                    break;
                }
            };

            if limits.is_limited() {
                let rejected;
                (buckets, rejected) = utils::split_off(buckets, |bucket| {
                    bucket.name.try_namespace() == Some(namespace)
                });

                let reason_code = limits.longest().and_then(|limit| limit.reason_code.clone());
                self.inner.metric_outcomes.track(
                    scoping,
                    &rejected,
                    Outcome::RateLimited(reason_code),
                );

                // Cache the limits so future checks short-circuit locally.
                self.inner
                    .project_cache
                    .get(item_scoping.scoping.project_key)
                    .rate_limits()
                    .merge(limits);
            }
        }

        match MetricsLimiter::create(buckets, project_info.config.quotas.clone(), scoping) {
            Err(buckets) => buckets,
            Ok(bucket_limiter) => self.apply_other_rate_limits(bucket_limiter).await,
        }
    }
2289
2290 #[cfg(feature = "processing")]
2292 async fn apply_other_rate_limits(&self, mut bucket_limiter: MetricsLimiter) -> Vec<Bucket> {
2293 relay_log::trace!("handle_rate_limit_buckets");
2294
2295 let scoping = *bucket_limiter.scoping();
2296
2297 if let Some(rate_limiter) = self.inner.rate_limiter.as_ref() {
2298 let global_config = self.inner.global_config.current();
2299 let quotas = CombinedQuotas::new(&global_config, bucket_limiter.quotas());
2300
2301 let over_accept_once = true;
2304 let mut rate_limits = RateLimits::new();
2305
2306 for category in [DataCategory::Transaction, DataCategory::Span] {
2307 let count = bucket_limiter.count(category);
2308
2309 let timer = Instant::now();
2310 let mut is_limited = false;
2311
2312 if let Some(count) = count {
2313 match rate_limiter
2314 .is_rate_limited(quotas, scoping.item(category), count, over_accept_once)
2315 .await
2316 {
2317 Ok(limits) => {
2318 is_limited = limits.is_limited();
2319 rate_limits.merge(limits)
2320 }
2321 Err(e) => {
2322 relay_log::error!(error = &e as &dyn Error, "rate limiting error")
2323 }
2324 }
2325 }
2326
2327 relay_statsd::metric!(
2328 timer(RelayTimers::RateLimitBucketsDuration) = timer.elapsed(),
2329 category = category.name(),
2330 limited = if is_limited { "true" } else { "false" },
2331 count = match count {
2332 None => "none",
2333 Some(0) => "0",
2334 Some(1) => "1",
2335 Some(1..=10) => "10",
2336 Some(1..=25) => "25",
2337 Some(1..=50) => "50",
2338 Some(51..=100) => "100",
2339 Some(101..=500) => "500",
2340 _ => "> 500",
2341 },
2342 );
2343 }
2344
2345 if rate_limits.is_limited() {
2346 let was_enforced =
2347 bucket_limiter.enforce_limits(&rate_limits, &self.inner.metric_outcomes);
2348
2349 if was_enforced {
2350 self.inner
2352 .project_cache
2353 .get(scoping.project_key)
2354 .rate_limits()
2355 .merge(rate_limits);
2356 }
2357 }
2358 }
2359
2360 bucket_limiter.into_buckets()
2361 }
2362
    /// Checks `buckets` against the configured cardinality limits.
    ///
    /// Returns the buckets that passed the limits. Depending on the global
    /// `cardinality_limiter_mode` option, limits are fully enforced, only
    /// reported (passive), or skipped entirely.
    #[cfg(feature = "processing")]
    async fn cardinality_limit_buckets(
        &self,
        scoping: Scoping,
        limits: &[CardinalityLimit],
        buckets: Vec<Bucket>,
    ) -> Vec<Bucket> {
        let global_config = self.inner.global_config.current();
        let cardinality_limiter_mode = global_config.options.cardinality_limiter_mode;

        // Limiter disabled via global config: pass all buckets through untouched.
        if matches!(cardinality_limiter_mode, CardinalityLimiterMode::Disabled) {
            return buckets;
        }

        // No limiter configured for this deployment: nothing to enforce.
        let Some(ref limiter) = self.inner.cardinality_limiter else {
            return buckets;
        };

        let scope = relay_cardinality::Scoping {
            organization_id: scoping.organization_id,
            project_id: scoping.project_id,
        };

        let limits = match limiter
            .check_cardinality_limits(scope, limits, buckets)
            .await
        {
            Ok(limits) => limits,
            // Fail open on limiter errors: log and forward the original buckets.
            Err((buckets, error)) => {
                relay_log::error!(
                    error = &error as &dyn std::error::Error,
                    "cardinality limiter failed"
                );
                return buckets;
            }
        };

        // Sampled internal error reporting for exceeded limits.
        let error_sample_rate = global_config.options.cardinality_limiter_error_sample_rate;
        if !limits.exceeded_limits().is_empty() && utils::sample(error_sample_rate).is_keep() {
            for limit in limits.exceeded_limits() {
                relay_log::with_scope(
                    |scope| {
                        // Use the organization id as the user so reports can be
                        // grouped/filtered per organization.
                        scope.set_user(Some(relay_log::sentry::User {
                            id: Some(scoping.organization_id.to_string()),
                            ..Default::default()
                        }));
                    },
                    || {
                        relay_log::error!(
                            tags.organization_id = scoping.organization_id.value(),
                            tags.limit_id = limit.id,
                            tags.passive = limit.passive,
                            "Cardinality Limit"
                        );
                    },
                );
            }
        }

        // Emit cardinality outcome reports regardless of the limiter mode.
        for (limit, reports) in limits.cardinality_reports() {
            for report in reports {
                self.inner
                    .metric_outcomes
                    .cardinality(scoping, limit, report);
            }
        }

        // Passive mode: outcomes were tracked above, but no buckets are dropped.
        if matches!(cardinality_limiter_mode, CardinalityLimiterMode::Passive) {
            return limits.into_source();
        }

        let CardinalityLimitsSplit { accepted, rejected } = limits.into_split();

        // Track an outcome for each rejected bucket with the limit it exceeded.
        for (bucket, exceeded) in rejected {
            self.inner.metric_outcomes.track(
                scoping,
                &[bucket],
                Outcome::CardinalityLimited(exceeded.id.clone()),
            );
        }
        accepted
    }
2447
2448 #[cfg(feature = "processing")]
2455 async fn encode_metrics_processing(
2456 &self,
2457 message: FlushBuckets,
2458 store_forwarder: &Addr<Store>,
2459 ) {
2460 use crate::constants::DEFAULT_EVENT_RETENTION;
2461 use crate::services::store::StoreMetrics;
2462
2463 for ProjectBuckets {
2464 buckets,
2465 scoping,
2466 project_info,
2467 ..
2468 } in message.buckets.into_values()
2469 {
2470 let buckets = self
2471 .rate_limit_buckets(scoping, &project_info, buckets)
2472 .await;
2473
2474 let limits = project_info.get_cardinality_limits();
2475 let buckets = self
2476 .cardinality_limit_buckets(scoping, limits, buckets)
2477 .await;
2478
2479 if buckets.is_empty() {
2480 continue;
2481 }
2482
2483 let retention = project_info
2484 .config
2485 .event_retention
2486 .unwrap_or(DEFAULT_EVENT_RETENTION);
2487
2488 store_forwarder.send(StoreMetrics {
2491 buckets,
2492 scoping,
2493 retention,
2494 });
2495 }
2496 }
2497
2498 fn encode_metrics_envelope(&self, cogs: &mut Token, message: FlushBuckets) {
2510 let FlushBuckets {
2511 partition_key,
2512 buckets,
2513 } = message;
2514
2515 let batch_size = self.inner.config.metrics_max_batch_size_bytes();
2516 let upstream = self.inner.config.upstream_descriptor();
2517
2518 for ProjectBuckets {
2519 buckets, scoping, ..
2520 } in buckets.values()
2521 {
2522 let dsn = PartialDsn::outbound(scoping, upstream);
2523
2524 relay_statsd::metric!(
2525 distribution(RelayDistributions::PartitionKeys) = u64::from(partition_key)
2526 );
2527
2528 let mut num_batches = 0;
2529 for batch in BucketsView::from(buckets).by_size(batch_size) {
2530 let mut envelope = Envelope::from_request(None, RequestMeta::outbound(dsn.clone()));
2531
2532 let mut item = Item::new(ItemType::MetricBuckets);
2533 item.set_source_quantities(crate::metrics::extract_quantities(batch));
2534 item.set_payload(ContentType::Json, serde_json::to_vec(&buckets).unwrap());
2535 envelope.add_item(item);
2536
2537 let mut envelope =
2538 ManagedEnvelope::new(envelope, self.inner.addrs.outcome_aggregator.clone());
2539 envelope
2540 .set_partition_key(Some(partition_key))
2541 .scope(*scoping);
2542
2543 relay_statsd::metric!(
2544 distribution(RelayDistributions::BucketsPerBatch) = batch.len() as u64
2545 );
2546
2547 self.submit_upstream(cogs, Submit::Envelope(envelope.into_processed()));
2548 num_batches += 1;
2549 }
2550
2551 relay_statsd::metric!(
2552 distribution(RelayDistributions::BatchesPerPartition) = num_batches
2553 );
2554 }
2555 }
2556
2557 fn send_global_partition(&self, partition_key: u32, partition: &mut Partition<'_>) {
2559 if partition.is_empty() {
2560 return;
2561 }
2562
2563 let (unencoded, project_info) = partition.take();
2564 let http_encoding = self.inner.config.http_encoding();
2565 let encoded = match encode_payload(&unencoded, http_encoding) {
2566 Ok(payload) => payload,
2567 Err(error) => {
2568 let error = &error as &dyn std::error::Error;
2569 relay_log::error!(error, "failed to encode metrics payload");
2570 return;
2571 }
2572 };
2573
2574 let request = SendMetricsRequest {
2575 partition_key: partition_key.to_string(),
2576 unencoded,
2577 encoded,
2578 project_info,
2579 http_encoding,
2580 metric_outcomes: self.inner.metric_outcomes.clone(),
2581 };
2582
2583 self.inner.addrs.upstream_relay.send(SendRequest(request));
2584 }
2585
2586 fn encode_metrics_global(&self, message: FlushBuckets) {
2601 let FlushBuckets {
2602 partition_key,
2603 buckets,
2604 } = message;
2605
2606 let batch_size = self.inner.config.metrics_max_batch_size_bytes();
2607 let mut partition = Partition::new(batch_size);
2608 let mut partition_splits = 0;
2609
2610 for ProjectBuckets {
2611 buckets, scoping, ..
2612 } in buckets.values()
2613 {
2614 for bucket in buckets {
2615 let mut remaining = Some(BucketView::new(bucket));
2616
2617 while let Some(bucket) = remaining.take() {
2618 if let Some(next) = partition.insert(bucket, *scoping) {
2619 self.send_global_partition(partition_key, &mut partition);
2623 remaining = Some(next);
2624 partition_splits += 1;
2625 }
2626 }
2627 }
2628 }
2629
2630 if partition_splits > 0 {
2631 metric!(distribution(RelayDistributions::PartitionSplits) = partition_splits);
2632 }
2633
2634 self.send_global_partition(partition_key, &mut partition);
2635 }
2636
    /// Handles a [`FlushBuckets`] message.
    ///
    /// All buckets are first checked against the projects' cached rate limits.
    /// Processing Relays with a store forwarder persist the buckets directly;
    /// otherwise the buckets are forwarded upstream, either as a global metrics
    /// request or as per-project envelopes depending on configuration.
    async fn handle_flush_buckets(&self, cogs: &mut Token, mut message: FlushBuckets) {
        for (project_key, pb) in message.buckets.iter_mut() {
            // Move the buckets out to avoid a clone, then store the checked ones back.
            let buckets = std::mem::take(&mut pb.buckets);
            pb.buckets =
                self.check_buckets(*project_key, &pb.project_info, &pb.rate_limits, buckets);
        }

        #[cfg(feature = "processing")]
        if self.inner.config.processing_enabled()
            && let Some(ref store_forwarder) = self.inner.addrs.store_forwarder
        {
            return self
                .encode_metrics_processing(message, store_forwarder)
                .await;
        }

        if self.inner.config.http_global_metrics() {
            self.encode_metrics_global(message)
        } else {
            self.encode_metrics_envelope(cogs, message)
        }
    }
2659
    /// Returns `true` if this processor was built with a Redis rate limiter.
    ///
    /// Test-only helper, available with the `processing` feature.
    #[cfg(all(test, feature = "processing"))]
    fn redis_rate_limiter_enabled(&self) -> bool {
        self.inner.rate_limiter.is_some()
    }
2664
    /// Dispatches an [`EnvelopeProcessor`] message to the matching handler.
    ///
    /// Each message is timed (`ProcessMessageDuration`, tagged with the message
    /// variant) and its cost is attributed to COGS using [`Self::feature_weights`].
    async fn handle_message(&self, message: EnvelopeProcessor) {
        let ty = message.variant();
        let feature_weights = self.feature_weights(&message);

        metric!(timer(RelayTimers::ProcessMessageDuration), message = ty, {
            let mut cogs = self.inner.cogs.timed(ResourceId::Relay, feature_weights);

            match message {
                EnvelopeProcessor::ProcessEnvelope(m) => {
                    self.handle_process_envelope(&mut cogs, *m).await
                }
                EnvelopeProcessor::ProcessProjectMetrics(m) => {
                    self.handle_process_metrics(&mut cogs, *m)
                }
                EnvelopeProcessor::ProcessBatchedMetrics(m) => {
                    self.handle_process_batched_metrics(&mut cogs, *m)
                }
                EnvelopeProcessor::FlushBuckets(m) => {
                    self.handle_flush_buckets(&mut cogs, *m).await
                }
                EnvelopeProcessor::SubmitClientReports(m) => {
                    self.handle_submit_client_reports(&mut cogs, *m)
                }
            }
        });
    }
2691
2692 fn feature_weights(&self, message: &EnvelopeProcessor) -> FeatureWeights {
2693 match message {
2694 EnvelopeProcessor::ProcessEnvelope(_) => AppFeature::Unattributed.into(),
2696 EnvelopeProcessor::ProcessProjectMetrics(_) => AppFeature::Unattributed.into(),
2697 EnvelopeProcessor::ProcessBatchedMetrics(_) => AppFeature::Unattributed.into(),
2698 EnvelopeProcessor::FlushBuckets(v) => v
2699 .buckets
2700 .values()
2701 .map(|s| {
2702 if self.inner.config.processing_enabled() {
2703 relay_metrics::cogs::ByCount(&s.buckets).into()
2706 } else {
2707 relay_metrics::cogs::BySize(&s.buckets).into()
2708 }
2709 })
2710 .fold(FeatureWeights::none(), FeatureWeights::merge),
2711 EnvelopeProcessor::SubmitClientReports(_) => AppFeature::ClientReports.into(),
2712 }
2713 }
2714}
2715
impl Service for EnvelopeProcessorService {
    type Interface = EnvelopeProcessor;

    async fn run(self, mut rx: relay_system::Receiver<Self::Interface>) {
        // Receive messages until all senders are dropped, handling each one on
        // the processor's pool. Awaiting `spawn_async` makes the receive loop
        // yield until the pool accepts the task — presumably to apply
        // backpressure when the pool is saturated (confirm with the pool docs).
        while let Some(message) = rx.recv().await {
            let service = self.clone();
            self.inner
                .pool
                .spawn_async(
                    async move {
                        service.handle_message(message).await;
                    }
                    .boxed(),
                )
                .await;
        }
    }
}
2734
/// Result of applying rate-limit enforcement to an event and its envelope.
struct EnforcementResult {
    /// The event remaining after enforcement; empty if the event was rate limited.
    event: Annotated<Event>,
    /// The rate limits that applied; only read in processing builds.
    #[cfg_attr(not(feature = "processing"), expect(dead_code))]
    rate_limits: RateLimits,
}
2744
impl EnforcementResult {
    /// Creates a new [`EnforcementResult`] from the remaining event and the
    /// rate limits that were applied.
    pub fn new(event: Annotated<Event>, rate_limits: RateLimits) -> Self {
        Self { event, rate_limits }
    }
}
2751
/// Strategy used to enforce rate limits on an envelope.
#[derive(Clone)]
enum RateLimiter {
    /// Enforces only rate limits already cached locally.
    Cached,
    /// Enforces limits consistently against the shared Redis rate limiter.
    #[cfg(feature = "processing")]
    Consistent(Arc<RedisRateLimiter>),
}
2758
impl RateLimiter {
    /// Enforces rate limits on the envelope and the event extracted from it.
    ///
    /// Returns the surviving event (empty if the event itself was limited)
    /// together with the rate limits that applied. Short-circuits when there is
    /// nothing to enforce or no quotas are configured.
    async fn enforce<Group>(
        &self,
        managed_envelope: &mut TypedEnvelope<Group>,
        event: Annotated<Event>,
        _extracted_metrics: &mut ProcessingExtractedMetrics,
        ctx: processing::Context<'_>,
    ) -> Result<EnforcementResult, ProcessingError> {
        // Nothing to enforce: empty envelope and no event.
        if managed_envelope.envelope().is_empty() && event.value().is_none() {
            return Ok(EnforcementResult::new(event, RateLimits::default()));
        }

        let quotas = CombinedQuotas::new(ctx.global_config, ctx.project_info.get_quotas());
        if quotas.is_empty() {
            return Ok(EnforcementResult::new(event, RateLimits::default()));
        }

        let event_category = event_category(&event);

        // The limit check callback consults Redis in the `Consistent` variant
        // and the locally cached limits otherwise.
        let this = self.clone();
        let mut envelope_limiter =
            EnvelopeLimiter::new(CheckLimits::All, move |item_scope, _quantity| {
                let this = this.clone();

                async move {
                    match this {
                        #[cfg(feature = "processing")]
                        RateLimiter::Consistent(rate_limiter) => Ok::<_, ProcessingError>(
                            rate_limiter
                                .is_rate_limited(quotas, item_scope, _quantity, false)
                                .await?,
                        ),
                        _ => Ok::<_, ProcessingError>(
                            ctx.rate_limits.check_with_quotas(quotas, item_scope),
                        ),
                    }
                }
            });

        // Register the event's category with the limiter, if an event is present.
        if let Some(category) = event_category {
            envelope_limiter.assume_event(category);
        }

        let scoping = managed_envelope.scoping();
        let (enforcement, rate_limits) = metric!(timer(RelayTimers::EventProcessingRateLimiting), type = self.name(), {
            envelope_limiter
                .compute(managed_envelope.envelope_mut(), &scoping)
                .await
        })?;
        let event_active = enforcement.is_event_active();

        // NOTE(review): the second argument flags consistent enforcement —
        // presumably so quotas already counted in Redis are not double-counted
        // for extracted metrics; confirm against `apply_enforcement`.
        #[cfg(feature = "processing")]
        _extracted_metrics.apply_enforcement(&enforcement, matches!(self, Self::Consistent(_)));
        enforcement.apply_with_outcomes(managed_envelope);

        if event_active {
            // An enforced event implies the envelope was fully emptied.
            debug_assert!(managed_envelope.envelope().is_empty());
            return Ok(EnforcementResult::new(Annotated::empty(), rate_limits));
        }

        Ok(EnforcementResult::new(event, rate_limits))
    }

    /// Returns the variant name used as a metric tag.
    fn name(&self) -> &'static str {
        match self {
            Self::Cached => "cached",
            #[cfg(feature = "processing")]
            Self::Consistent(_) => "consistent",
        }
    }
}
2840
2841pub fn encode_payload(body: &Bytes, http_encoding: HttpEncoding) -> Result<Bytes, std::io::Error> {
2842 let envelope_body: Vec<u8> = match http_encoding {
2843 HttpEncoding::Identity => return Ok(body.clone()),
2844 HttpEncoding::Deflate => {
2845 let mut encoder = ZlibEncoder::new(Vec::new(), Compression::default());
2846 encoder.write_all(body.as_ref())?;
2847 encoder.finish()?
2848 }
2849 HttpEncoding::Gzip => {
2850 let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
2851 encoder.write_all(body.as_ref())?;
2852 encoder.finish()?
2853 }
2854 HttpEncoding::Br => {
2855 let mut encoder = BrotliEncoder::new(Vec::new(), 0, 5, 22);
2857 encoder.write_all(body.as_ref())?;
2858 encoder.into_inner()
2859 }
2860 HttpEncoding::Zstd => {
2861 let mut encoder = ZstdEncoder::new(Vec::new(), 1)?;
2864 encoder.write_all(body.as_ref())?;
2865 encoder.finish()?
2866 }
2867 };
2868
2869 Ok(envelope_body.into())
2870}
2871
/// An upstream request that sends a processed envelope to the next Relay or Sentry.
#[derive(Debug)]
pub struct SendEnvelope {
    /// The envelope being sent; resolved to an outcome in `respond`.
    pub envelope: TypedEnvelope<Processed>,
    /// The pre-serialized envelope body.
    pub body: Bytes,
    /// Content encoding applied to `body`.
    pub http_encoding: HttpEncoding,
    /// Handle used to cache rate limits returned by the upstream.
    pub project_cache: ProjectCacheHandle,
}
2880
impl UpstreamRequest for SendEnvelope {
    fn method(&self) -> reqwest::Method {
        reqwest::Method::POST
    }

    fn path(&self) -> Cow<'_, str> {
        format!("/api/{}/envelope/", self.envelope.scoping().project_id).into()
    }

    fn route(&self) -> &'static str {
        "envelope"
    }

    /// Builds the HTTP request from the pre-serialized envelope body and the
    /// envelope's request metadata.
    fn build(&mut self, builder: &mut http::RequestBuilder) -> Result<(), http::HttpError> {
        let envelope_body = self.body.clone();
        metric!(
            distribution(RelayDistributions::UpstreamEnvelopeBodySize) = envelope_body.len() as u64
        );

        let meta = &self.envelope.meta();
        // Forward the partition key as a shard hint, if one was assigned.
        let shard = self.envelope.partition_key().map(|p| p.to_string());
        builder
            .content_encoding(self.http_encoding)
            .header_opt("Origin", meta.origin().map(|url| url.as_str()))
            .header_opt("User-Agent", meta.user_agent())
            .header("X-Sentry-Auth", meta.auth_header())
            .header("X-Forwarded-For", meta.forwarded_for())
            .header("Content-Type", envelope::CONTENT_TYPE)
            .header_opt("X-Sentry-Relay-Shard", shard)
            .body(envelope_body);

        Ok(())
    }

    fn sign(&mut self) -> Option<Sign> {
        Some(Sign::Optional(SignatureType::RequestSign))
    }

    /// Resolves the envelope's outcome based on the upstream response.
    ///
    /// Successful sends and errors that reached the upstream accept the
    /// envelope (the upstream owns it from here); returned rate limits are
    /// cached for the project. Any other error rejects the envelope with an
    /// internal outcome.
    fn respond(
        self: Box<Self>,
        result: Result<http::Response, UpstreamRequestError>,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> {
        Box::pin(async move {
            let result = match result {
                Ok(mut response) => response.consume().await.map_err(UpstreamRequestError::Http),
                Err(error) => Err(error),
            };

            match result {
                Ok(()) => self.envelope.accept(),
                Err(error) if error.is_received() => {
                    let scoping = self.envelope.scoping();
                    self.envelope.accept();

                    // Cache rate limits reported by the upstream for this project.
                    if let UpstreamRequestError::RateLimited(limits) = error {
                        self.project_cache
                            .get(scoping.project_key)
                            .rate_limits()
                            .merge(limits.scope(&scoping));
                    }
                }
                Err(error) => {
                    let mut envelope = self.envelope;
                    envelope.reject(Outcome::Invalid(DiscardReason::Internal));
                    relay_log::error!(
                        error = &error as &dyn Error,
                        tags.project_key = %envelope.scoping().project_key,
                        "error sending envelope"
                    );
                }
            }
        })
    }
}
2957
/// Accumulates metric bucket views for one upstream request, bounded by an
/// estimated serialized size.
#[derive(Debug)]
struct Partition<'a> {
    /// Maximum estimated size of the partition in bytes.
    max_size: usize,
    /// Estimated remaining capacity in bytes.
    remaining: usize,
    /// Collected bucket views, grouped by project key.
    views: HashMap<ProjectKey, Vec<BucketView<'a>>>,
    /// Scoping per project key, kept to attribute outcomes on send failure.
    project_info: HashMap<ProjectKey, Scoping>,
}
2971
impl<'a> Partition<'a> {
    /// Creates an empty partition with the given maximum size in bytes.
    pub fn new(size: usize) -> Self {
        Self {
            max_size: size,
            remaining: size,
            views: HashMap::new(),
            project_info: HashMap::new(),
        }
    }

    /// Inserts a bucket view, splitting it if it does not fully fit.
    ///
    /// Returns the remainder that did not fit, if any. The caller is expected
    /// to flush the partition and re-insert the remainder.
    pub fn insert(&mut self, bucket: BucketView<'a>, scoping: Scoping) -> Option<BucketView<'a>> {
        // Split off what still fits into the remaining capacity; the second
        // argument caps chunk size at `max_size` — see `BucketView::split`.
        let (current, next) = bucket.split(self.remaining, Some(self.max_size));

        if let Some(current) = current {
            self.remaining = self.remaining.saturating_sub(current.estimated_size());
            self.views
                .entry(scoping.project_key)
                .or_default()
                .push(current);

            // Remember the scoping for this project the first time we see it.
            self.project_info
                .entry(scoping.project_key)
                .or_insert(scoping);
        }

        next
    }

    /// Returns `true` if no bucket views are currently stored.
    fn is_empty(&self) -> bool {
        self.views.is_empty()
    }

    /// Serializes the accumulated buckets to JSON and resets the partition.
    ///
    /// Returns the payload along with the scoping of every contained project.
    fn take(&mut self) -> (Bytes, HashMap<ProjectKey, Scoping>) {
        #[derive(serde::Serialize)]
        struct Wrapper<'a> {
            buckets: &'a HashMap<ProjectKey, Vec<BucketView<'a>>>,
        }

        let buckets = &self.views;
        let payload = serde_json::to_vec(&Wrapper { buckets }).unwrap().into();

        let scopings = self.project_info.clone();
        self.project_info.clear();

        // Reset for the next partition.
        self.views.clear();
        self.remaining = self.max_size;

        (payload, scopings)
    }
}
3037
/// An upstream request carrying a batch of global metric buckets.
#[derive(Debug)]
struct SendMetricsRequest {
    /// Partition key, forwarded in the `X-Sentry-Relay-Shard` header.
    partition_key: String,
    /// Uncompressed payload; used for signing and for error outcomes.
    unencoded: Bytes,
    /// Payload compressed with `http_encoding`; this is the request body.
    encoded: Bytes,
    /// Scoping per project key, used to attribute outcomes for failed sends.
    project_info: HashMap<ProjectKey, Scoping>,
    /// Content encoding applied to `encoded`.
    http_encoding: HttpEncoding,
    /// Sink for tracking metric outcomes on transmission failure.
    metric_outcomes: MetricOutcomes,
}
3058
3059impl SendMetricsRequest {
3060 fn create_error_outcomes(self) {
3061 #[derive(serde::Deserialize)]
3062 struct Wrapper {
3063 buckets: HashMap<ProjectKey, Vec<MinimalTrackableBucket>>,
3064 }
3065
3066 let buckets = match serde_json::from_slice(&self.unencoded) {
3067 Ok(Wrapper { buckets }) => buckets,
3068 Err(err) => {
3069 relay_log::error!(
3070 error = &err as &dyn std::error::Error,
3071 "failed to parse buckets from failed transmission"
3072 );
3073 return;
3074 }
3075 };
3076
3077 for (key, buckets) in buckets {
3078 let Some(&scoping) = self.project_info.get(&key) else {
3079 relay_log::error!("missing scoping for project key");
3080 continue;
3081 };
3082
3083 self.metric_outcomes.track(
3084 scoping,
3085 &buckets,
3086 Outcome::Invalid(DiscardReason::Internal),
3087 );
3088 }
3089 }
3090}
3091
impl UpstreamRequest for SendMetricsRequest {
    fn set_relay_id(&self) -> bool {
        true
    }

    /// Signs the uncompressed payload, not the encoded request body.
    fn sign(&mut self) -> Option<Sign> {
        Some(Sign::Required(SignatureType::Body(self.unencoded.clone())))
    }

    fn method(&self) -> reqwest::Method {
        reqwest::Method::POST
    }

    fn path(&self) -> Cow<'_, str> {
        "/api/0/relays/metrics/".into()
    }

    fn route(&self) -> &'static str {
        "global_metrics"
    }

    fn build(&mut self, builder: &mut http::RequestBuilder) -> Result<(), http::HttpError> {
        metric!(
            distribution(RelayDistributions::UpstreamMetricsBodySize) = self.encoded.len() as u64
        );

        builder
            .content_encoding(self.http_encoding)
            .header("X-Sentry-Relay-Shard", self.partition_key.as_bytes())
            .header(header::CONTENT_TYPE, b"application/json")
            .body(self.encoded.clone());

        Ok(())
    }

    /// Logs failures and emits error outcomes for sends that never reached the
    /// upstream; responses that were received are consumed without outcomes.
    fn respond(
        self: Box<Self>,
        result: Result<http::Response, UpstreamRequestError>,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> {
        Box::pin(async {
            match result {
                Ok(mut response) => {
                    response.consume().await.ok();
                }
                Err(error) => {
                    relay_log::error!(error = &error as &dyn Error, "Failed to send metrics batch");

                    // The upstream received the request; it is responsible for
                    // the data from here, so no outcomes are emitted.
                    if error.is_received() {
                        return;
                    }

                    self.create_error_outcomes()
                }
            }
        })
    }
}
3151
/// A view over the global quotas combined with a project's quotas.
#[derive(Copy, Clone, Debug)]
struct CombinedQuotas<'a> {
    /// Quotas from the global configuration.
    global_quotas: &'a [Quota],
    /// Quotas from the project configuration.
    project_quotas: &'a [Quota],
}
3158
3159impl<'a> CombinedQuotas<'a> {
3160 pub fn new(global_config: &'a GlobalConfig, project_quotas: &'a [Quota]) -> Self {
3162 Self {
3163 global_quotas: &global_config.quotas,
3164 project_quotas,
3165 }
3166 }
3167
3168 pub fn is_empty(&self) -> bool {
3170 self.len() == 0
3171 }
3172
3173 pub fn len(&self) -> usize {
3175 self.global_quotas.len() + self.project_quotas.len()
3176 }
3177}
3178
impl<'a> IntoIterator for CombinedQuotas<'a> {
    type Item = &'a Quota;
    type IntoIter = std::iter::Chain<std::slice::Iter<'a, Quota>, std::slice::Iter<'a, Quota>>;

    /// Iterates over the global quotas first, then the project quotas.
    fn into_iter(self) -> Self::IntoIter {
        self.global_quotas.iter().chain(self.project_quotas.iter())
    }
}
3187
3188#[cfg(test)]
3189mod tests {
3190 use insta::assert_debug_snapshot;
3191 use relay_common::glob2::LazyGlob;
3192 use relay_dynamic_config::ProjectConfig;
3193 use relay_event_normalization::{
3194 NormalizationConfig, RedactionRule, TransactionNameConfig, TransactionNameRule,
3195 };
3196 use relay_event_schema::protocol::TransactionSource;
3197 use relay_pii::DataScrubbingConfig;
3198 use similar_asserts::assert_eq;
3199
3200 use crate::testutils::{create_test_processor, create_test_processor_with_addrs};
3201
3202 #[cfg(feature = "processing")]
3203 use {
3204 relay_metrics::BucketValue,
3205 relay_quotas::{QuotaScope, ReasonCode},
3206 relay_test::mock_service,
3207 };
3208
3209 use super::*;
3210
    /// Builds a zero-limit metric-bucket quota at organization scope for tests.
    #[cfg(feature = "processing")]
    fn mock_quota(id: &str) -> Quota {
        Quota {
            id: Some(id.into()),
            categories: [DataCategory::MetricBucket].into(),
            scope: QuotaScope::Organization,
            scope_id: None,
            // A limit of 0 rejects everything matched by this quota.
            limit: Some(0),
            window: None,
            reason_code: None,
            namespace: None,
        }
    }
3224
    /// Global quotas must precede project quotas when iterating a [`CombinedQuotas`].
    #[cfg(feature = "processing")]
    #[test]
    fn test_dynamic_quotas() {
        let global_config = GlobalConfig {
            quotas: vec![mock_quota("foo"), mock_quota("bar")],
            ..Default::default()
        };

        let project_quotas = vec![mock_quota("baz"), mock_quota("qux")];

        let dynamic_quotas = CombinedQuotas::new(&global_config, &project_quotas);

        assert_eq!(dynamic_quotas.len(), 4);
        assert!(!dynamic_quotas.is_empty());

        // Iteration order: global quotas first, then project quotas.
        let quota_ids = dynamic_quotas.into_iter().filter_map(|q| q.id.as_deref());
        assert!(quota_ids.eq(["foo", "bar", "baz", "qux"]));
    }
3243
    /// Only buckets of the non-rate-limited organization must reach the store.
    ///
    /// Requires a running Redis (`RELAY_REDIS_URL` or localhost:6379).
    #[cfg(feature = "processing")]
    #[tokio::test]
    async fn test_ratelimit_per_batch() {
        use relay_base_schema::organization::OrganizationId;
        use relay_protocol::FiniteF64;

        let rate_limited_org = Scoping {
            organization_id: OrganizationId::new(1),
            project_id: ProjectId::new(21),
            project_key: ProjectKey::parse("00000000000000000000000000000000").unwrap(),
            key_id: Some(17),
        };

        let not_rate_limited_org = Scoping {
            organization_id: OrganizationId::new(2),
            project_id: ProjectId::new(21),
            project_key: ProjectKey::parse("11111111111111111111111111111111").unwrap(),
            key_id: Some(17),
        };

        let message = {
            // Quota with limit 0, scoped to the rate-limited organization only.
            let project_info = {
                let quota = Quota {
                    id: Some("testing".into()),
                    categories: [DataCategory::MetricBucket].into(),
                    scope: relay_quotas::QuotaScope::Organization,
                    scope_id: Some(rate_limited_org.organization_id.to_string().into()),
                    limit: Some(0),
                    window: None,
                    reason_code: Some(ReasonCode::new("test")),
                    namespace: None,
                };

                let mut config = ProjectConfig::default();
                config.quotas.push(quota);

                Arc::new(ProjectInfo {
                    config,
                    ..Default::default()
                })
            };

            // One identical counter bucket per organization.
            let project_metrics = |scoping| ProjectBuckets {
                buckets: vec![Bucket {
                    name: "d:transactions/bar".into(),
                    value: BucketValue::Counter(FiniteF64::new(1.0).unwrap()),
                    timestamp: UnixTimestamp::now(),
                    tags: Default::default(),
                    width: 10,
                    metadata: BucketMetadata::default(),
                }],
                rate_limits: Default::default(),
                project_info: project_info.clone(),
                scoping,
            };

            let buckets = hashbrown::HashMap::from([
                (
                    rate_limited_org.project_key,
                    project_metrics(rate_limited_org),
                ),
                (
                    not_rate_limited_org.project_key,
                    project_metrics(not_rate_limited_org),
                ),
            ]);

            FlushBuckets {
                partition_key: 0,
                buckets,
            }
        };

        assert_eq!(message.buckets.keys().count(), 2);

        let config = {
            let config_json = serde_json::json!({
                "processing": {
                    "enabled": true,
                    "kafka_config": [],
                    "redis": {
                        "server": std::env::var("RELAY_REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_owned()),
                    }
                }
            });
            Config::from_json_value(config_json).unwrap()
        };

        // Mock store that records the organization id of every metrics message
        // and panics on anything else.
        let (store, handle) = {
            let f = |org_ids: &mut Vec<OrganizationId>, msg: Store| {
                let org_id = match msg {
                    Store::Metrics(x) => x.scoping.organization_id,
                    _ => panic!("received envelope when expecting only metrics"),
                };
                org_ids.push(org_id);
            };

            mock_service("store_forwarder", vec![], f)
        };

        let processor = create_test_processor(config).await;
        assert!(processor.redis_rate_limiter_enabled());

        processor.encode_metrics_processing(message, &store).await;

        // Dropping the sender ends the mock service so its state can be collected.
        drop(store);
        let orgs_not_ratelimited = handle.await.unwrap();

        assert_eq!(
            orgs_not_ratelimited,
            vec![not_rate_limited_org.organization_id]
        );
    }
3360
    /// Scrubbing with an `@ip:mask` rule must not destroy the browser version
    /// extracted into the `browser` context from the User-Agent header.
    #[tokio::test]
    async fn test_browser_version_extraction_with_pii_like_data() {
        let processor = create_test_processor(Default::default()).await;
        let outcome_aggregator = Addr::dummy();
        let event_id = EventId::new();

        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
            .parse()
            .unwrap();

        let request_meta = RequestMeta::new(dsn);
        let mut envelope = Envelope::from_request(Some(event_id), request_meta);

        // Event whose only content is a Chrome User-Agent header; the version
        // number looks like an IP address to the scrubber.
        envelope.add_item({
            let mut item = Item::new(ItemType::Event);
            item.set_payload(
                ContentType::Json,
                r#"
            {
                "request": {
                    "headers": [
                        ["User-Agent", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36"]
                    ]
                }
            }
            "#,
            );
            item
        });

        let mut datascrubbing_settings = DataScrubbingConfig::default();
        // Enable every legacy scrubbing option.
        datascrubbing_settings.scrub_data = true;
        datascrubbing_settings.scrub_defaults = true;
        datascrubbing_settings.scrub_ip_addresses = true;

        // Also mask anything that looks like an IP address anywhere in the event.
        let pii_config = serde_json::from_str(r#"{"applications": {"**": ["@ip:mask"]}}"#).unwrap();

        let config = ProjectConfig {
            datascrubbing_settings,
            pii_config: Some(pii_config),
            ..Default::default()
        };

        let project_info = ProjectInfo {
            config,
            ..Default::default()
        };

        let mut envelopes = ProcessingGroup::split_envelope(*envelope, &Default::default());
        assert_eq!(envelopes.len(), 1);

        let (group, envelope) = envelopes.pop().unwrap();
        let envelope = ManagedEnvelope::new(envelope, outcome_aggregator);

        let message = ProcessEnvelopeGrouped {
            group,
            envelope,
            ctx: processing::Context {
                project_info: &project_info,
                ..processing::Context::for_test()
            },
        };

        let Ok(Some(Submit::Envelope(mut new_envelope))) = processor.process(message).await else {
            panic!();
        };
        let new_envelope = new_envelope.envelope_mut();

        let event_item = new_envelope.items().last().unwrap();
        let annotated_event: Annotated<Event> =
            Annotated::from_json_bytes(&event_item.payload()).unwrap();
        let event = annotated_event.into_value().unwrap();
        let headers = event
            .request
            .into_value()
            .unwrap()
            .headers
            .into_value()
            .unwrap();

        // The IP-like version in the raw header is masked by the PII rule...
        assert_eq!(
            Some(
                "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/********* Safari/537.36"
            ),
            headers.get_header("User-Agent")
        );
        // ...but the browser context extracted during normalization keeps it.
        let contexts = event.contexts.into_value().unwrap();
        let browser = contexts.0.get("browser").unwrap();
        assert_eq!(
            r#"{"browser":"Chrome 103.0.0","name":"Chrome","version":"103.0.0","type":"browser"}"#,
            browser.to_json().unwrap()
        );
    }
3458
    /// The envelope's dynamic sampling context must be materialized into the
    /// event's `_dsc` field, with the trace id normalized (dashes stripped).
    #[tokio::test]
    #[cfg(feature = "processing")]
    async fn test_materialize_dsc() {
        use crate::services::projects::project::PublicKeyConfig;

        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
            .parse()
            .unwrap();
        let request_meta = RequestMeta::new(dsn);
        let mut envelope = Envelope::from_request(None, request_meta);

        // DSC with a dashed trace id; materialization should normalize it.
        let dsc = r#"{
            "trace_id": "00000000-0000-0000-0000-000000000000",
            "public_key": "e12d836b15bb49d7bbf99e64295d995b",
            "sample_rate": "0.2"
        }"#;
        envelope.set_dsc(serde_json::from_str(dsc).unwrap());

        let mut item = Item::new(ItemType::Event);
        item.set_payload(ContentType::Json, r#"{}"#);
        envelope.add_item(item);

        let outcome_aggregator = Addr::dummy();
        let managed_envelope = ManagedEnvelope::new(envelope, outcome_aggregator);

        let mut project_info = ProjectInfo::default();
        project_info.public_keys.push(PublicKeyConfig {
            public_key: ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(),
            numeric_id: Some(1),
        });

        let config = serde_json::json!({
            "processing": {
                "enabled": true,
                "kafka_config": [],
            }
        });

        let message = ProcessEnvelopeGrouped {
            group: ProcessingGroup::Error,
            envelope: managed_envelope,
            ctx: processing::Context {
                config: &Config::from_json_value(config.clone()).unwrap(),
                project_info: &project_info,
                sampling_project_info: Some(&project_info),
                ..processing::Context::for_test()
            },
        };

        let processor = create_test_processor(Config::from_json_value(config).unwrap()).await;
        let Ok(Some(Submit::Envelope(envelope))) = processor.process(message).await else {
            panic!();
        };
        let event = envelope
            .envelope()
            .get_item_by(|item| item.ty() == &ItemType::Event)
            .unwrap();

        let event = Annotated::<Event>::from_json_bytes(&event.payload()).unwrap();
        insta::assert_debug_snapshot!(event.value().unwrap()._dsc, @r###"
        Object(
            {
                "environment": ~,
                "public_key": String(
                    "e12d836b15bb49d7bbf99e64295d995b",
                ),
                "release": ~,
                "replay_id": ~,
                "sample_rate": String(
                    "0.2",
                ),
                "trace_id": String(
                    "00000000000000000000000000000000",
                ),
                "transaction": ~,
            },
        )
        "###);
    }
3538
    /// Normalizes a transaction event with the given name and source, capturing
    /// the statsd metrics emitted while logging transaction-name changes.
    ///
    /// The normalization config carries a single `/foo/*/**` redaction rule so
    /// tests can distinguish rule-based from pattern-based sanitization.
    fn capture_test_event(transaction_name: &str, source: TransactionSource) -> Vec<String> {
        let mut event = Annotated::<Event>::from_json(
            r#"
            {
                "type": "transaction",
                "transaction": "/foo/",
                "timestamp": 946684810.0,
                "start_timestamp": 946684800.0,
                "contexts": {
                    "trace": {
                        "trace_id": "4c79f60c11214eb38604f4ae0781bfb2",
                        "span_id": "fa90fdead5f74053",
                        "op": "http.server",
                        "type": "trace"
                    }
                },
                "transaction_info": {
                    "source": "url"
                }
            }
            "#,
        )
        .unwrap();
        // Override the transaction name and source from the test parameters.
        let e = event.value_mut().as_mut().unwrap();
        e.transaction.set_value(Some(transaction_name.into()));

        e.transaction_info
            .value_mut()
            .as_mut()
            .unwrap()
            .source
            .set_value(Some(source));

        relay_statsd::with_capturing_test_client(|| {
            utils::log_transaction_name_metrics(&mut event, |event| {
                let config = NormalizationConfig {
                    transaction_name_config: TransactionNameConfig {
                        rules: &[TransactionNameRule {
                            pattern: LazyGlob::new("/foo/*/**".to_owned()),
                            expiry: DateTime::<Utc>::MAX_UTC,
                            redaction: RedactionRule::Replace {
                                substitution: "*".to_owned(),
                            },
                        }],
                    },
                    ..Default::default()
                };
                relay_event_normalization::normalize_event(event, &config)
            });
        })
    }
3590
    #[test]
    fn test_log_transaction_metrics_none() {
        // A URL transaction that matches neither the `/foo/*/**` rename rule
        // nor an identifier pattern: the counter reports `changes:none` while
        // the source is still marked as sanitized.
        let captures = capture_test_event("/nothing", TransactionSource::Url);
        insta::assert_debug_snapshot!(captures, @r###"
        [
            "event.transaction_name_changes:1|c|#source_in:url,changes:none,source_out:sanitized,is_404:false",
        ]
        "###);
    }
3600
    #[test]
    fn test_log_transaction_metrics_rule() {
        // "/foo/john/denver" is rewritten only by the `/foo/*/**` rename rule,
        // so the counter reports `changes:rule`.
        let captures = capture_test_event("/foo/john/denver", TransactionSource::Url);
        insta::assert_debug_snapshot!(captures, @r###"
        [
            "event.transaction_name_changes:1|c|#source_in:url,changes:rule,source_out:sanitized,is_404:false",
        ]
        "###);
    }
3610
    #[test]
    fn test_log_transaction_metrics_pattern() {
        // "/something/12345" does not match the rename rule, but the numeric
        // segment is scrubbed by pattern-based sanitization: `changes:pattern`.
        let captures = capture_test_event("/something/12345", TransactionSource::Url);
        insta::assert_debug_snapshot!(captures, @r###"
        [
            "event.transaction_name_changes:1|c|#source_in:url,changes:pattern,source_out:sanitized,is_404:false",
        ]
        "###);
    }
3620
    #[test]
    fn test_log_transaction_metrics_both() {
        // "/foo/john/12345" triggers both the rename rule and the numeric
        // pattern scrubbing: `changes:both`.
        let captures = capture_test_event("/foo/john/12345", TransactionSource::Url);
        insta::assert_debug_snapshot!(captures, @r###"
        [
            "event.transaction_name_changes:1|c|#source_in:url,changes:both,source_out:sanitized,is_404:false",
        ]
        "###);
    }
3630
    #[test]
    fn test_log_transaction_metrics_no_match() {
        // With source `route` the transaction name is not sanitized at all:
        // no changes are reported and the source is passed through unchanged.
        let captures = capture_test_event("/foo/john/12345", TransactionSource::Route);
        insta::assert_debug_snapshot!(captures, @r###"
        [
            "event.transaction_name_changes:1|c|#source_in:route,changes:none,source_out:route,is_404:false",
        ]
        "###);
    }
3640
3641 #[tokio::test]
3642 async fn test_process_metrics_bucket_metadata() {
3643 let mut token = Cogs::noop().timed(ResourceId::Relay, AppFeature::Unattributed);
3644 let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
3645 let received_at = Utc::now();
3646 let config = Config::default();
3647
3648 let (aggregator, mut aggregator_rx) = Addr::custom();
3649 let processor = create_test_processor_with_addrs(
3650 config,
3651 Addrs {
3652 aggregator,
3653 ..Default::default()
3654 },
3655 )
3656 .await;
3657
3658 let mut item = Item::new(ItemType::Statsd);
3659 item.set_payload(
3660 ContentType::Text,
3661 "transactions/foo:3182887624:4267882815|s",
3662 );
3663 for (source, expected_received_at) in [
3664 (
3665 BucketSource::External,
3666 Some(UnixTimestamp::from_datetime(received_at).unwrap()),
3667 ),
3668 (BucketSource::Internal, None),
3669 ] {
3670 let message = ProcessMetrics {
3671 data: MetricData::Raw(vec![item.clone()]),
3672 project_key,
3673 source,
3674 received_at,
3675 sent_at: Some(Utc::now()),
3676 };
3677 processor.handle_process_metrics(&mut token, message);
3678
3679 let Aggregator::MergeBuckets(merge_buckets) = aggregator_rx.recv().await.unwrap();
3680 let buckets = merge_buckets.buckets;
3681 assert_eq!(buckets.len(), 1);
3682 assert_eq!(buckets[0].metadata.received_at, expected_received_at);
3683 }
3684 }
3685
    #[tokio::test]
    async fn test_process_batched_metrics() {
        // A batched metrics payload carries buckets for several projects at
        // once; the processor must split it into one MergeBuckets message per
        // project key.
        let mut token = Cogs::noop().timed(ResourceId::Relay, AppFeature::Unattributed);
        let received_at = Utc::now();
        let config = Config::default();

        // Intercept the aggregator channel to observe the emitted messages.
        let (aggregator, mut aggregator_rx) = Addr::custom();
        let processor = create_test_processor_with_addrs(
            config,
            Addrs {
                aggregator,
                ..Default::default()
            },
        )
        .await;

        // Two projects, one distribution bucket each.
        let payload = r#"{
    "buckets": {
        "11111111111111111111111111111111": [
            {
                "timestamp": 1615889440,
                "width": 0,
                "name": "d:custom/endpoint.response_time@millisecond",
                "type": "d",
                "value": [
                    68.0
                ],
                "tags": {
                    "route": "user_index"
                }
            }
        ],
        "22222222222222222222222222222222": [
            {
                "timestamp": 1615889440,
                "width": 0,
                "name": "d:custom/endpoint.cache_rate@none",
                "type": "d",
                "value": [
                    36.0
                ]
            }
        ]
    }
}
"#;
        let message = ProcessBatchedMetrics {
            payload: Bytes::from(payload),
            source: BucketSource::Internal,
            received_at,
            sent_at: Some(Utc::now()),
        };
        processor.handle_process_batched_metrics(&mut token, message);

        // Expect exactly one MergeBuckets per project in the payload.
        let Aggregator::MergeBuckets(mb1) = aggregator_rx.recv().await.unwrap();
        let Aggregator::MergeBuckets(mb2) = aggregator_rx.recv().await.unwrap();

        // Delivery order is not guaranteed; sort by project key so the
        // snapshot below is stable.
        let mut messages = vec![mb1, mb2];
        messages.sort_by_key(|mb| mb.project_key);

        let actual = messages
            .into_iter()
            .map(|mb| (mb.project_key, mb.buckets))
            .collect::<Vec<_>>();

        // `received_at: None` in the metadata matches the Internal source
        // (see test_process_metrics_bucket_metadata).
        assert_debug_snapshot!(actual, @r###"
        [
            (
                ProjectKey("11111111111111111111111111111111"),
                [
                    Bucket {
                        timestamp: UnixTimestamp(1615889440),
                        width: 0,
                        name: MetricName(
                            "d:custom/endpoint.response_time@millisecond",
                        ),
                        value: Distribution(
                            [
                                68.0,
                            ],
                        ),
                        tags: {
                            "route": "user_index",
                        },
                        metadata: BucketMetadata {
                            merges: 1,
                            received_at: None,
                            extracted_from_indexed: false,
                        },
                    },
                ],
            ),
            (
                ProjectKey("22222222222222222222222222222222"),
                [
                    Bucket {
                        timestamp: UnixTimestamp(1615889440),
                        width: 0,
                        name: MetricName(
                            "d:custom/endpoint.cache_rate@none",
                        ),
                        value: Distribution(
                            [
                                36.0,
                            ],
                        ),
                        tags: {},
                        metadata: BucketMetadata {
                            merges: 1,
                            received_at: None,
                            extracted_from_indexed: false,
                        },
                    },
                ],
            ),
        ]
        "###);
    }
3804}