use std::borrow::Cow;
use std::collections::{BTreeSet, HashMap};
use std::error::Error;
use std::fmt::{Debug, Display};
use std::future::Future;
use std::io::Write;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;

use anyhow::Context;
use brotli::CompressorWriter as BrotliEncoder;
use bytes::Bytes;
use chrono::{DateTime, Utc};
use flate2::Compression;
use flate2::write::{GzEncoder, ZlibEncoder};
use futures::FutureExt;
use futures::future::BoxFuture;
use relay_base_schema::project::{ProjectId, ProjectKey};
use relay_cogs::{AppFeature, Cogs, FeatureWeights, ResourceId, Token};
use relay_common::time::UnixTimestamp;
use relay_config::{Config, HttpEncoding, RelayMode};
use relay_dynamic_config::{Feature, GlobalConfig};
use relay_event_normalization::{ClockDriftProcessor, GeoIpLookup};
use relay_event_schema::processor::ProcessingAction;
use relay_event_schema::protocol::{
    ClientReport, Event, EventId, Metrics, NetworkReportError, SpanV2,
};
use relay_filter::FilterStatKey;
use relay_metrics::{Bucket, BucketMetadata, BucketView, BucketsView, MetricNamespace};
use relay_pii::PiiConfigError;
use relay_protocol::Annotated;
use relay_quotas::{DataCategory, Quota, RateLimits, Scoping};
use relay_sampling::evaluation::{ReservoirCounters, SamplingDecision};
use relay_statsd::metric;
use relay_system::{Addr, FromMessage, NoResponse, Service};
use reqwest::header;
use smallvec::{SmallVec, smallvec};
use zstd::stream::Encoder as ZstdEncoder;

use crate::envelope::{
    self, AttachmentType, ContentType, Envelope, EnvelopeError, Item, ItemContainer, ItemType,
};
use crate::extractors::{PartialDsn, RequestMeta, RequestTrust};
use crate::integrations::Integration;
use crate::managed::{InvalidProcessingGroupType, ManagedEnvelope, TypedEnvelope};
use crate::metrics::{MetricOutcomes, MetricsLimiter, MinimalTrackableBucket};
use crate::metrics_extraction::transactions::ExtractedMetrics;
use crate::metrics_extraction::transactions::types::ExtractMetricsError;
use crate::processing::check_ins::CheckInsProcessor;
use crate::processing::logs::LogsProcessor;
use crate::processing::profile_chunks::ProfileChunksProcessor;
use crate::processing::replays::ReplaysProcessor;
use crate::processing::sessions::SessionsProcessor;
use crate::processing::spans::SpansProcessor;
use crate::processing::trace_attachments::TraceAttachmentsProcessor;
use crate::processing::trace_metrics::TraceMetricsProcessor;
use crate::processing::transactions::TransactionProcessor;
use crate::processing::utils::event::{
    EventFullyNormalized, EventMetricsExtracted, FiltersStatus, SpansExtracted, event_category,
    event_type,
};
use crate::processing::{Forward as _, Output, Outputs, QuotaRateLimiter};
use crate::service::ServiceError;
use crate::services::global_config::GlobalConfigHandle;
use crate::services::metrics::{Aggregator, FlushBuckets, MergeBuckets, ProjectBuckets};
use crate::services::outcome::{DiscardItemType, DiscardReason, Outcome, TrackOutcome};
use crate::services::projects::cache::ProjectCacheHandle;
use crate::services::projects::project::{ProjectInfo, ProjectState};
use crate::services::upstream::{
    SendRequest, Sign, SignatureType, UpstreamRelay, UpstreamRequest, UpstreamRequestError,
};
use crate::statsd::{RelayCounters, RelayDistributions, RelayTimers};
use crate::utils::{self, CheckLimits, EnvelopeLimiter};
use crate::{http, processing};
use relay_threading::AsyncPool;
#[cfg(feature = "processing")]
use {
    crate::services::processor::nnswitch::SwitchProcessingError,
    crate::services::store::{Store, StoreEnvelope},
    crate::services::upload::Upload,
    crate::utils::Enforcement,
    itertools::Itertools,
    relay_cardinality::{
        CardinalityLimit, CardinalityLimiter, CardinalityLimitsSplit, RedisSetLimiter,
        RedisSetLimiterOptions,
    },
    relay_dynamic_config::CardinalityLimiterMode,
    relay_quotas::{RateLimitingError, RedisRateLimiter},
    relay_redis::RedisClients,
    std::time::Instant,
    symbolic_unreal::{Unreal4Error, Unreal4ErrorKind},
};

mod attachment;
mod dynamic_sampling;
mod event;
mod metrics;
mod nel;
mod profile;
mod replay;
mod report;
mod span;

#[cfg(all(sentry, feature = "processing"))]
mod playstation;
mod standalone;
#[cfg(feature = "processing")]
mod unreal;

#[cfg(feature = "processing")]
mod nnswitch;

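/// Runs a block of code only when this Relay has processing enabled.
///
/// The first form compiles the block only with the `processing` feature and runs it when
/// `$config.processing_enabled()` is `true`. The second form adds an `else` branch, which also
/// serves as the fallback when the feature is compiled out.
///
/// Illustrative sketch only (assumes a `config: Config` in scope):
///
/// ```ignore
/// if_processing!(config, {
///     // runs on processing Relays only
/// } else {
///     // runs everywhere else
/// });
/// ```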
macro_rules! if_processing {
    ($config:expr, $if_true:block) => {
        #[cfg(feature = "processing")] {
            if $config.processing_enabled() $if_true
        }
    };
    ($config:expr, $if_true:block else $if_false:block) => {
        {
            #[cfg(feature = "processing")] {
                if $config.processing_enabled() $if_true else $if_false
            }
            #[cfg(not(feature = "processing"))] {
                $if_false
            }
        }
    };
}

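/// Minimum clock drift (55 minutes) passed to `ClockDriftProcessor::at_least` when normalizing
/// metric timestamps; smaller drifts are left uncorrected.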
pub const MINIMUM_CLOCK_DRIFT: Duration = Duration::from_secs(55 * 60);

#[derive(Debug)]
pub struct GroupTypeError;

impl Display for GroupTypeError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("failed to convert processing group into corresponding type")
    }
}

impl std::error::Error for GroupTypeError {}

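/// Generates a zero-sized marker type for a processing group along with `From`/`TryFrom`
/// conversions to and from [`ProcessingGroup`]. Extra variants listed after the first are also
/// accepted by the `TryFrom` implementation (e.g. `LogGroup` converts from both `Log` and `Nel`).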
macro_rules! processing_group {
    ($ty:ident, $variant:ident$(, $($other:ident),+)?) => {
        #[derive(Clone, Copy, Debug)]
        pub struct $ty;

        impl From<$ty> for ProcessingGroup {
            fn from(_: $ty) -> Self {
                ProcessingGroup::$variant
            }
        }

        impl TryFrom<ProcessingGroup> for $ty {
            type Error = GroupTypeError;

            fn try_from(value: ProcessingGroup) -> Result<Self, Self::Error> {
                if matches!(value, ProcessingGroup::$variant) {
                    return Ok($ty);
                }
                $($(
                    if matches!(value, ProcessingGroup::$other) {
                        return Ok($ty);
                    }
                )+)?
                return Err(GroupTypeError);
            }
        }
    };
}

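/// Marker trait for processing groups whose envelopes carry an event payload (transactions and
/// errors).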
pub trait EventProcessing {}

processing_group!(TransactionGroup, Transaction);
impl EventProcessing for TransactionGroup {}

processing_group!(ErrorGroup, Error);
impl EventProcessing for ErrorGroup {}

processing_group!(SessionGroup, Session);
processing_group!(StandaloneGroup, Standalone);
processing_group!(ClientReportGroup, ClientReport);
processing_group!(ReplayGroup, Replay);
processing_group!(CheckInGroup, CheckIn);
processing_group!(LogGroup, Log, Nel);
processing_group!(TraceMetricGroup, TraceMetric);
processing_group!(SpanGroup, Span);

processing_group!(ProfileChunkGroup, ProfileChunk);
processing_group!(MetricsGroup, Metrics);
processing_group!(ForwardUnknownGroup, ForwardUnknown);
processing_group!(Ungrouped, Ungrouped);

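/// Marker type used with [`TypedEnvelope`] for envelopes that have finished processing.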
#[derive(Clone, Copy, Debug)]
pub struct Processed;

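/// The classes of envelope items that Relay processes independently of each other.
///
/// [`ProcessingGroup::split_envelope`] splits an incoming envelope into one envelope per group so
/// that each group can be handled by its dedicated code path.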
#[derive(Clone, Copy, Debug)]
pub enum ProcessingGroup {
    Transaction,
    Error,
    Session,
    Standalone,
    ClientReport,
    Replay,
    CheckIn,
    Nel,
    Log,
    TraceMetric,
    Span,
    SpanV2,
    Metrics,
    ProfileChunk,
    TraceAttachment,
    ForwardUnknown,
    Ungrouped,
}

255impl ProcessingGroup {
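    /// Splits the envelope into `(group, envelope)` pairs, one envelope per processing group.
    ///
    /// Items of the same group stay together and the original envelope headers are cloned into
    /// every resulting envelope; items that create or require an event are grouped into
    /// `Transaction` or `Error`, unknown items into `ForwardUnknown`, and anything left over into
    /// `Ungrouped`.
    ///
    /// Sketch of the intended use (illustrative only, `handle` is a hypothetical callback):
    ///
    /// ```ignore
    /// for (group, envelope) in ProcessingGroup::split_envelope(envelope, &project_info) {
    ///     handle(group, envelope);
    /// }
    /// ```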
256 fn split_envelope(
258 mut envelope: Envelope,
259 project_info: &ProjectInfo,
260 ) -> SmallVec<[(Self, Box<Envelope>); 3]> {
261 let headers = envelope.headers().clone();
262 let mut grouped_envelopes = smallvec![];
263
264 let replay_items = envelope.take_items_by(|item| {
266 matches!(
267 item.ty(),
268 &ItemType::ReplayEvent | &ItemType::ReplayRecording | &ItemType::ReplayVideo
269 )
270 });
271 if !replay_items.is_empty() {
272 grouped_envelopes.push((
273 ProcessingGroup::Replay,
274 Envelope::from_parts(headers.clone(), replay_items),
275 ))
276 }
277
278 let session_items = envelope
280 .take_items_by(|item| matches!(item.ty(), &ItemType::Session | &ItemType::Sessions));
281 if !session_items.is_empty() {
282 grouped_envelopes.push((
283 ProcessingGroup::Session,
284 Envelope::from_parts(headers.clone(), session_items),
285 ))
286 }
287
288 let span_v2_items = envelope.take_items_by(|item| {
289 let exp_feature = project_info.has_feature(Feature::SpanV2ExperimentalProcessing);
290
291 ItemContainer::<SpanV2>::is_container(item)
292 || matches!(item.integration(), Some(Integration::Spans(_)))
293 || (exp_feature && matches!(item.ty(), &ItemType::Span))
295 || (exp_feature && item.is_span_attachment())
296 });
297
298 if !span_v2_items.is_empty() {
299 grouped_envelopes.push((
300 ProcessingGroup::SpanV2,
301 Envelope::from_parts(headers.clone(), span_v2_items),
302 ))
303 }
304
305 let span_items = envelope.take_items_by(|item| matches!(item.ty(), &ItemType::Span));
307 if !span_items.is_empty() {
308 grouped_envelopes.push((
309 ProcessingGroup::Span,
310 Envelope::from_parts(headers.clone(), span_items),
311 ))
312 }
313
314 let logs_items = envelope.take_items_by(|item| {
316 matches!(item.ty(), &ItemType::Log)
317 || matches!(item.integration(), Some(Integration::Logs(_)))
318 });
319 if !logs_items.is_empty() {
320 grouped_envelopes.push((
321 ProcessingGroup::Log,
322 Envelope::from_parts(headers.clone(), logs_items),
323 ))
324 }
325
326 let trace_metric_items =
328 envelope.take_items_by(|item| matches!(item.ty(), &ItemType::TraceMetric));
329 if !trace_metric_items.is_empty() {
330 grouped_envelopes.push((
331 ProcessingGroup::TraceMetric,
332 Envelope::from_parts(headers.clone(), trace_metric_items),
333 ))
334 }
335
336 let nel_items = envelope.take_items_by(|item| matches!(item.ty(), &ItemType::Nel));
338 if !nel_items.is_empty() {
339 grouped_envelopes.push((
340 ProcessingGroup::Nel,
341 Envelope::from_parts(headers.clone(), nel_items),
342 ))
343 }
344
345 let metric_items = envelope.take_items_by(|i| i.ty().is_metrics());
350 if !metric_items.is_empty() {
351 grouped_envelopes.push((
352 ProcessingGroup::Metrics,
353 Envelope::from_parts(headers.clone(), metric_items),
354 ))
355 }
356
357 let profile_chunk_items =
359 envelope.take_items_by(|item| matches!(item.ty(), &ItemType::ProfileChunk));
360 if !profile_chunk_items.is_empty() {
361 grouped_envelopes.push((
362 ProcessingGroup::ProfileChunk,
363 Envelope::from_parts(headers.clone(), profile_chunk_items),
364 ))
365 }
366
367 let trace_attachment_items = envelope.take_items_by(Item::is_trace_attachment);
368 if !trace_attachment_items.is_empty() {
369 grouped_envelopes.push((
370 ProcessingGroup::TraceAttachment,
371 Envelope::from_parts(headers.clone(), trace_attachment_items),
372 ))
373 }
374
375 if !envelope.items().any(Item::creates_event) {
380 let standalone_items = envelope.take_items_by(Item::requires_event);
381 if !standalone_items.is_empty() {
382 grouped_envelopes.push((
383 ProcessingGroup::Standalone,
384 Envelope::from_parts(headers.clone(), standalone_items),
385 ))
386 }
387 };
388
389 let security_reports_items = envelope
391 .take_items_by(|i| matches!(i.ty(), &ItemType::RawSecurity))
392 .into_iter()
393 .map(|item| {
394 let headers = headers.clone();
395 let items: SmallVec<[Item; 3]> = smallvec![item.clone()];
396 let mut envelope = Envelope::from_parts(headers, items);
397 envelope.set_event_id(EventId::new());
398 (ProcessingGroup::Error, envelope)
399 });
400 grouped_envelopes.extend(security_reports_items);
401
402 let require_event_items = envelope.take_items_by(Item::requires_event);
404 if !require_event_items.is_empty() {
405 let group = if require_event_items
406 .iter()
407 .any(|item| matches!(item.ty(), &ItemType::Transaction | &ItemType::Profile))
408 {
409 ProcessingGroup::Transaction
410 } else {
411 ProcessingGroup::Error
412 };
413
414 grouped_envelopes.push((
415 group,
416 Envelope::from_parts(headers.clone(), require_event_items),
417 ))
418 }
419
420 let envelopes = envelope.items_mut().map(|item| {
422 let headers = headers.clone();
423 let items: SmallVec<[Item; 3]> = smallvec![item.clone()];
424 let envelope = Envelope::from_parts(headers, items);
425 let item_type = item.ty();
426 let group = if matches!(item_type, &ItemType::CheckIn) {
427 ProcessingGroup::CheckIn
428 } else if matches!(item.ty(), &ItemType::ClientReport) {
429 ProcessingGroup::ClientReport
430 } else if matches!(item_type, &ItemType::Unknown(_)) {
431 ProcessingGroup::ForwardUnknown
432 } else {
433 ProcessingGroup::Ungrouped
435 };
436
437 (group, envelope)
438 });
439 grouped_envelopes.extend(envelopes);
440
441 grouped_envelopes
442 }
443
444 pub fn variant(&self) -> &'static str {
446 match self {
447 ProcessingGroup::Transaction => "transaction",
448 ProcessingGroup::Error => "error",
449 ProcessingGroup::Session => "session",
450 ProcessingGroup::Standalone => "standalone",
451 ProcessingGroup::ClientReport => "client_report",
452 ProcessingGroup::Replay => "replay",
453 ProcessingGroup::CheckIn => "check_in",
454 ProcessingGroup::Log => "log",
455 ProcessingGroup::TraceMetric => "trace_metric",
456 ProcessingGroup::Nel => "nel",
457 ProcessingGroup::Span => "span",
458 ProcessingGroup::SpanV2 => "span_v2",
459 ProcessingGroup::Metrics => "metrics",
460 ProcessingGroup::ProfileChunk => "profile_chunk",
461 ProcessingGroup::TraceAttachment => "trace_attachment",
462 ProcessingGroup::ForwardUnknown => "forward_unknown",
463 ProcessingGroup::Ungrouped => "ungrouped",
464 }
465 }
466}
467
impl From<ProcessingGroup> for AppFeature {
    fn from(value: ProcessingGroup) -> Self {
        match value {
            ProcessingGroup::Transaction => AppFeature::Transactions,
            ProcessingGroup::Error => AppFeature::Errors,
            ProcessingGroup::Session => AppFeature::Sessions,
            ProcessingGroup::Standalone => AppFeature::UnattributedEnvelope,
            ProcessingGroup::ClientReport => AppFeature::ClientReports,
            ProcessingGroup::Replay => AppFeature::Replays,
            ProcessingGroup::CheckIn => AppFeature::CheckIns,
            ProcessingGroup::Log => AppFeature::Logs,
            ProcessingGroup::TraceMetric => AppFeature::TraceMetrics,
            ProcessingGroup::Nel => AppFeature::Logs,
            ProcessingGroup::Span => AppFeature::Spans,
            ProcessingGroup::SpanV2 => AppFeature::Spans,
            ProcessingGroup::Metrics => AppFeature::UnattributedMetrics,
            ProcessingGroup::ProfileChunk => AppFeature::Profiles,
            ProcessingGroup::ForwardUnknown => AppFeature::UnattributedEnvelope,
            ProcessingGroup::Ungrouped => AppFeature::UnattributedEnvelope,
            ProcessingGroup::TraceAttachment => AppFeature::TraceAttachments,
        }
    }
}

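/// Errors that can occur while an envelope is being processed; `to_outcome` maps each variant to
/// the outcome that is reported for the dropped items, if any.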
#[derive(Debug, thiserror::Error)]
pub enum ProcessingError {
    #[error("invalid json in event")]
    InvalidJson(#[source] serde_json::Error),

    #[error("invalid message pack event payload")]
    InvalidMsgpack(#[from] rmp_serde::decode::Error),

    #[cfg(feature = "processing")]
    #[error("invalid unreal crash report")]
    InvalidUnrealReport(#[source] Unreal4Error),

    #[error("event payload too large")]
    PayloadTooLarge(DiscardItemType),

    #[error("invalid transaction event")]
    InvalidTransaction,

    #[error("envelope processor failed")]
    ProcessingFailed(#[from] ProcessingAction),

    #[error("duplicate {0} in event")]
    DuplicateItem(ItemType),

    #[error("failed to extract event payload")]
    NoEventPayload,

    #[error("missing project id in DSN")]
    MissingProjectId,

    #[error("invalid security report type: {0:?}")]
    InvalidSecurityType(Bytes),

    #[error("unsupported security report type")]
    UnsupportedSecurityType,

    #[error("invalid security report")]
    InvalidSecurityReport(#[source] serde_json::Error),

    #[error("invalid nel report")]
    InvalidNelReport(#[source] NetworkReportError),

    #[error("event filtered with reason: {0:?}")]
    EventFiltered(FilterStatKey),

    #[error("missing or invalid required event timestamp")]
    InvalidTimestamp,

    #[error("could not serialize event payload")]
    SerializeFailed(#[source] serde_json::Error),

    #[cfg(feature = "processing")]
    #[error("failed to apply quotas")]
    QuotasFailed(#[from] RateLimitingError),

    #[error("invalid pii config")]
    PiiConfigError(PiiConfigError),

    #[error("invalid processing group type")]
    InvalidProcessingGroup(Box<InvalidProcessingGroupType>),

    #[error("invalid replay")]
    InvalidReplay(DiscardReason),

    #[error("replay filtered with reason: {0:?}")]
    ReplayFiltered(FilterStatKey),

    #[cfg(feature = "processing")]
    #[error("nintendo switch dying message processing failed {0:?}")]
    InvalidNintendoDyingMessage(#[source] SwitchProcessingError),

    #[cfg(all(sentry, feature = "processing"))]
    #[error("playstation dump processing failed: {0}")]
    InvalidPlaystationDump(String),

    #[error("processing group does not match specific processor")]
    ProcessingGroupMismatch,
    #[error("new processing pipeline failed")]
    ProcessingFailure,
}

impl ProcessingError {
    pub fn to_outcome(&self) -> Option<Outcome> {
        match self {
            Self::PayloadTooLarge(payload_type) => {
                Some(Outcome::Invalid(DiscardReason::TooLarge(*payload_type)))
            }
            Self::InvalidJson(_) => Some(Outcome::Invalid(DiscardReason::InvalidJson)),
            Self::InvalidMsgpack(_) => Some(Outcome::Invalid(DiscardReason::InvalidMsgpack)),
            Self::InvalidSecurityType(_) => {
                Some(Outcome::Invalid(DiscardReason::SecurityReportType))
            }
            Self::InvalidSecurityReport(_) => Some(Outcome::Invalid(DiscardReason::SecurityReport)),
            Self::UnsupportedSecurityType => Some(Outcome::Filtered(FilterStatKey::InvalidCsp)),
            Self::InvalidNelReport(_) => Some(Outcome::Invalid(DiscardReason::InvalidJson)),
            Self::InvalidTransaction => Some(Outcome::Invalid(DiscardReason::InvalidTransaction)),
            Self::InvalidTimestamp => Some(Outcome::Invalid(DiscardReason::Timestamp)),
            Self::DuplicateItem(_) => Some(Outcome::Invalid(DiscardReason::DuplicateItem)),
            Self::NoEventPayload => Some(Outcome::Invalid(DiscardReason::NoEventPayload)),
            #[cfg(feature = "processing")]
            Self::InvalidNintendoDyingMessage(_) => Some(Outcome::Invalid(DiscardReason::Payload)),
            #[cfg(all(sentry, feature = "processing"))]
            Self::InvalidPlaystationDump(_) => Some(Outcome::Invalid(DiscardReason::Payload)),
            #[cfg(feature = "processing")]
            Self::InvalidUnrealReport(err) if err.kind() == Unreal4ErrorKind::BadCompression => {
                Some(Outcome::Invalid(DiscardReason::InvalidCompression))
            }
            #[cfg(feature = "processing")]
            Self::InvalidUnrealReport(_) => Some(Outcome::Invalid(DiscardReason::ProcessUnreal)),
            Self::SerializeFailed(_) | Self::ProcessingFailed(_) => {
                Some(Outcome::Invalid(DiscardReason::Internal))
            }
            #[cfg(feature = "processing")]
            Self::QuotasFailed(_) => Some(Outcome::Invalid(DiscardReason::Internal)),
            Self::PiiConfigError(_) => Some(Outcome::Invalid(DiscardReason::ProjectStatePii)),
            Self::MissingProjectId => None,
            Self::EventFiltered(_) => None,
            Self::InvalidReplay(reason) => Some(Outcome::Invalid(*reason)),
            Self::ReplayFiltered(key) => Some(Outcome::Filtered(key.clone())),
            Self::InvalidProcessingGroup(_) => None,

            Self::ProcessingGroupMismatch => Some(Outcome::Invalid(DiscardReason::Internal)),
            Self::ProcessingFailure => None,
        }
    }

    fn is_unexpected(&self) -> bool {
        self.to_outcome()
            .is_some_and(|outcome| outcome.is_unexpected())
    }
}

#[cfg(feature = "processing")]
impl From<Unreal4Error> for ProcessingError {
    fn from(err: Unreal4Error) -> Self {
        match err.kind() {
            Unreal4ErrorKind::TooLarge => Self::PayloadTooLarge(ItemType::UnrealReport.into()),
            _ => ProcessingError::InvalidUnrealReport(err),
        }
    }
}

impl From<ExtractMetricsError> for ProcessingError {
    fn from(error: ExtractMetricsError) -> Self {
        match error {
            ExtractMetricsError::MissingTimestamp | ExtractMetricsError::InvalidTimestamp => {
                Self::InvalidTimestamp
            }
        }
    }
}

impl From<InvalidProcessingGroupType> for ProcessingError {
    fn from(value: InvalidProcessingGroupType) -> Self {
        Self::InvalidProcessingGroup(Box::new(value))
    }
}

type ExtractedEvent = (Annotated<Event>, usize);

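/// Container for metrics extracted during processing that remembers, per bucket, whether the
/// bucket was extracted from an indexed (kept) payload, and that supports dropping or adjusting
/// buckets when rate limits are enforced.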
#[derive(Debug)]
pub struct ProcessingExtractedMetrics {
    metrics: ExtractedMetrics,
}

663impl ProcessingExtractedMetrics {
664 pub fn new() -> Self {
665 Self {
666 metrics: ExtractedMetrics::default(),
667 }
668 }
669
670 pub fn into_inner(self) -> ExtractedMetrics {
671 self.metrics
672 }
673
674 pub fn extend(
676 &mut self,
677 extracted: ExtractedMetrics,
678 sampling_decision: Option<SamplingDecision>,
679 ) {
680 self.extend_project_metrics(extracted.project_metrics, sampling_decision);
681 self.extend_sampling_metrics(extracted.sampling_metrics, sampling_decision);
682 }
683
684 pub fn extend_project_metrics<I>(
686 &mut self,
687 buckets: I,
688 sampling_decision: Option<SamplingDecision>,
689 ) where
690 I: IntoIterator<Item = Bucket>,
691 {
692 self.metrics
693 .project_metrics
694 .extend(buckets.into_iter().map(|mut bucket| {
695 bucket.metadata.extracted_from_indexed =
696 sampling_decision == Some(SamplingDecision::Keep);
697 bucket
698 }));
699 }
700
701 pub fn extend_sampling_metrics<I>(
703 &mut self,
704 buckets: I,
705 sampling_decision: Option<SamplingDecision>,
706 ) where
707 I: IntoIterator<Item = Bucket>,
708 {
709 self.metrics
710 .sampling_metrics
711 .extend(buckets.into_iter().map(|mut bucket| {
712 bucket.metadata.extracted_from_indexed =
713 sampling_decision == Some(SamplingDecision::Keep);
714 bucket
715 }));
716 }
717
718 #[cfg(feature = "processing")]
723 fn apply_enforcement(&mut self, enforcement: &Enforcement, enforced_consistently: bool) {
724 let mut drop_namespaces: SmallVec<[_; 2]> = smallvec![];
726 let mut reset_extracted_from_indexed: SmallVec<[_; 2]> = smallvec![];
729
730 for (namespace, limit, indexed) in [
731 (
732 MetricNamespace::Transactions,
733 &enforcement.event,
734 &enforcement.event_indexed,
735 ),
736 (
737 MetricNamespace::Spans,
738 &enforcement.spans,
739 &enforcement.spans_indexed,
740 ),
741 ] {
742 if limit.is_active() {
743 drop_namespaces.push(namespace);
744 } else if indexed.is_active() && !enforced_consistently {
745 reset_extracted_from_indexed.push(namespace);
750 }
751 }
752
753 if !drop_namespaces.is_empty() || !reset_extracted_from_indexed.is_empty() {
754 self.retain_mut(|bucket| {
755 let Some(namespace) = bucket.name.try_namespace() else {
756 return true;
757 };
758
759 if drop_namespaces.contains(&namespace) {
760 return false;
761 }
762
763 if reset_extracted_from_indexed.contains(&namespace) {
764 bucket.metadata.extracted_from_indexed = false;
765 }
766
767 true
768 });
769 }
770 }
771
772 #[cfg(feature = "processing")]
773 fn retain_mut(&mut self, mut f: impl FnMut(&mut Bucket) -> bool) {
774 self.metrics.project_metrics.retain_mut(&mut f);
775 self.metrics.sampling_metrics.retain_mut(&mut f);
776 }
777}
778
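/// Sends extracted project and sampling metrics to the aggregator; sampling metrics fall back to
/// the project's own key when no sampling key is available.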
fn send_metrics(
    metrics: ExtractedMetrics,
    project_key: ProjectKey,
    sampling_key: Option<ProjectKey>,
    aggregator: &Addr<Aggregator>,
) {
    let ExtractedMetrics {
        project_metrics,
        sampling_metrics,
    } = metrics;

    if !project_metrics.is_empty() {
        aggregator.send(MergeBuckets {
            project_key,
            buckets: project_metrics,
        });
    }

    if !sampling_metrics.is_empty() {
        let sampling_project_key = sampling_key.unwrap_or(project_key);
        aggregator.send(MergeBuckets {
            project_key: sampling_project_key,
            buckets: sampling_metrics,
        });
    }
}

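/// Returns `true` if items gated behind the given feature should be dropped because the project
/// does not have the feature enabled; proxy Relays never filter.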
fn should_filter(config: &Config, project_info: &ProjectInfo, feature: Feature) -> bool {
    match config.relay_mode() {
        RelayMode::Proxy => false,
        RelayMode::Managed => !project_info.has_feature(feature),
    }
}

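/// Result of processing a single envelope group: either a processed envelope with the metrics
/// extracted from it (legacy path), or the output of one of the new processors.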
#[derive(Debug)]
#[expect(
    clippy::large_enum_variant,
    reason = "until we have a better solution to combat the excessive growth of Item, see #4819"
)]
enum ProcessingResult {
    Envelope {
        managed_envelope: TypedEnvelope<Processed>,
        extracted_metrics: ProcessingExtractedMetrics,
    },
    Output(Output<Outputs>),
}

impl ProcessingResult {
    fn no_metrics(managed_envelope: TypedEnvelope<Processed>) -> Self {
        Self::Envelope {
            managed_envelope,
            extracted_metrics: ProcessingExtractedMetrics::new(),
        }
    }
}

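/// Unit of work submitted upstream or to the store: a fully processed envelope, or processor
/// output together with the context needed to forward it.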
#[derive(Debug)]
#[expect(
    clippy::large_enum_variant,
    reason = "until we have a better solution to combat the excessive growth of Item, see #4819"
)]
enum Submit<'a> {
    Envelope(TypedEnvelope<Processed>),
    Output {
        output: Outputs,
        ctx: processing::ForwardContext<'a>,
    },
}

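/// Message that triggers processing of an envelope, bundling the managed envelope with the
/// project state, cached rate limits, optional sampling project, and reservoir counters captured
/// at dispatch time.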
#[derive(Debug)]
pub struct ProcessEnvelope {
    pub envelope: ManagedEnvelope,
    pub project_info: Arc<ProjectInfo>,
    pub rate_limits: Arc<RateLimits>,
    pub sampling_project_info: Option<Arc<ProjectInfo>>,
    pub reservoir_counters: ReservoirCounters,
}

#[derive(Debug)]
struct ProcessEnvelopeGrouped<'a> {
    pub group: ProcessingGroup,
    pub envelope: ManagedEnvelope,
    pub ctx: processing::Context<'a>,
}

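/// Message for processing metric buckets submitted for a single project key ahead of aggregation.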
#[derive(Debug)]
pub struct ProcessMetrics {
    pub data: MetricData,
    pub project_key: ProjectKey,
    pub source: BucketSource,
    pub received_at: DateTime<Utc>,
    pub sent_at: Option<DateTime<Utc>>,
}

#[derive(Debug)]
pub enum MetricData {
    Raw(Vec<Item>),
    Parsed(Vec<Bucket>),
}

933impl MetricData {
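    /// Converts the contained data into metric buckets.
    ///
    /// Already parsed buckets are returned as-is; raw items are parsed either as statsd lines or
    /// as a JSON array of buckets, with `timestamp` used as the default bucket timestamp and
    /// unparsable payloads logged and skipped.
    ///
    /// Minimal sketch (illustrative only, assumes `items: Vec<Item>` containing statsd payloads):
    ///
    /// ```ignore
    /// let buckets = MetricData::Raw(items).into_buckets(UnixTimestamp::now());
    /// ```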
934 fn into_buckets(self, timestamp: UnixTimestamp) -> Vec<Bucket> {
939 let items = match self {
940 Self::Parsed(buckets) => return buckets,
941 Self::Raw(items) => items,
942 };
943
944 let mut buckets = Vec::new();
945 for item in items {
946 let payload = item.payload();
947 if item.ty() == &ItemType::Statsd {
948 for bucket_result in Bucket::parse_all(&payload, timestamp) {
949 match bucket_result {
950 Ok(bucket) => buckets.push(bucket),
951 Err(error) => relay_log::debug!(
952 error = &error as &dyn Error,
953 "failed to parse metric bucket from statsd format",
954 ),
955 }
956 }
957 } else if item.ty() == &ItemType::MetricBuckets {
958 match serde_json::from_slice::<Vec<Bucket>>(&payload) {
959 Ok(parsed_buckets) => {
960 if buckets.is_empty() {
962 buckets = parsed_buckets;
963 } else {
964 buckets.extend(parsed_buckets);
965 }
966 }
967 Err(error) => {
968 relay_log::debug!(
969 error = &error as &dyn Error,
970 "failed to parse metric bucket",
971 );
972 metric!(counter(RelayCounters::MetricBucketsParsingFailed) += 1);
973 }
974 }
975 } else {
976 relay_log::error!(
977 "invalid item of type {} passed to ProcessMetrics",
978 item.ty()
979 );
980 }
981 }
982 buckets
983 }
984}
985
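/// Message carrying a batch of metric buckets for multiple project keys; the payload is a JSON
/// object mapping project keys to arrays of buckets.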
#[derive(Debug)]
pub struct ProcessBatchedMetrics {
    pub payload: Bytes,
    pub source: BucketSource,
    pub received_at: DateTime<Utc>,
    pub sent_at: Option<DateTime<Utc>>,
}

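/// Describes where a batch of metric buckets originated: `Internal` for trusted (internal)
/// Relays, whose bucket metadata is preserved, and `External` for everything else.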
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum BucketSource {
    Internal,
    External,
}

impl BucketSource {
    pub fn from_meta(meta: &RequestMeta) -> Self {
        match meta.request_trust() {
            RequestTrust::Trusted => Self::Internal,
            RequestTrust::Untrusted => Self::External,
        }
    }
}

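/// Message carrying client reports that are serialized into a new envelope and sent upstream for
/// the given scoping.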
#[derive(Debug)]
pub struct SubmitClientReports {
    pub client_reports: Vec<ClientReport>,
    pub scoping: Scoping,
}

1033#[derive(Debug)]
1035pub enum EnvelopeProcessor {
1036 ProcessEnvelope(Box<ProcessEnvelope>),
1037 ProcessProjectMetrics(Box<ProcessMetrics>),
1038 ProcessBatchedMetrics(Box<ProcessBatchedMetrics>),
1039 FlushBuckets(Box<FlushBuckets>),
1040 SubmitClientReports(Box<SubmitClientReports>),
1041}
1042
1043impl EnvelopeProcessor {
1044 pub fn variant(&self) -> &'static str {
1046 match self {
1047 EnvelopeProcessor::ProcessEnvelope(_) => "ProcessEnvelope",
1048 EnvelopeProcessor::ProcessProjectMetrics(_) => "ProcessProjectMetrics",
1049 EnvelopeProcessor::ProcessBatchedMetrics(_) => "ProcessBatchedMetrics",
1050 EnvelopeProcessor::FlushBuckets(_) => "FlushBuckets",
1051 EnvelopeProcessor::SubmitClientReports(_) => "SubmitClientReports",
1052 }
1053 }
1054}
1055
1056impl relay_system::Interface for EnvelopeProcessor {}
1057
1058impl FromMessage<ProcessEnvelope> for EnvelopeProcessor {
1059 type Response = relay_system::NoResponse;
1060
1061 fn from_message(message: ProcessEnvelope, _sender: ()) -> Self {
1062 Self::ProcessEnvelope(Box::new(message))
1063 }
1064}
1065
1066impl FromMessage<ProcessMetrics> for EnvelopeProcessor {
1067 type Response = NoResponse;
1068
1069 fn from_message(message: ProcessMetrics, _: ()) -> Self {
1070 Self::ProcessProjectMetrics(Box::new(message))
1071 }
1072}
1073
1074impl FromMessage<ProcessBatchedMetrics> for EnvelopeProcessor {
1075 type Response = NoResponse;
1076
1077 fn from_message(message: ProcessBatchedMetrics, _: ()) -> Self {
1078 Self::ProcessBatchedMetrics(Box::new(message))
1079 }
1080}
1081
1082impl FromMessage<FlushBuckets> for EnvelopeProcessor {
1083 type Response = NoResponse;
1084
1085 fn from_message(message: FlushBuckets, _: ()) -> Self {
1086 Self::FlushBuckets(Box::new(message))
1087 }
1088}
1089
1090impl FromMessage<SubmitClientReports> for EnvelopeProcessor {
1091 type Response = NoResponse;
1092
1093 fn from_message(message: SubmitClientReports, _: ()) -> Self {
1094 Self::SubmitClientReports(Box::new(message))
1095 }
1096}
1097
1098pub type EnvelopeProcessorServicePool = AsyncPool<BoxFuture<'static, ()>>;
1100
1101#[derive(Clone)]
1105pub struct EnvelopeProcessorService {
1106 inner: Arc<InnerProcessor>,
1107}
1108
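/// Addresses of the services the envelope processor communicates with; the `Default`
/// implementation provides dummy addresses, useful in tests.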
1109pub struct Addrs {
1111 pub outcome_aggregator: Addr<TrackOutcome>,
1112 pub upstream_relay: Addr<UpstreamRelay>,
1113 #[cfg(feature = "processing")]
1114 pub upload: Option<Addr<Upload>>,
1115 #[cfg(feature = "processing")]
1116 pub store_forwarder: Option<Addr<Store>>,
1117 pub aggregator: Addr<Aggregator>,
1118}
1119
1120impl Default for Addrs {
1121 fn default() -> Self {
1122 Addrs {
1123 outcome_aggregator: Addr::dummy(),
1124 upstream_relay: Addr::dummy(),
1125 #[cfg(feature = "processing")]
1126 upload: None,
1127 #[cfg(feature = "processing")]
1128 store_forwarder: None,
1129 aggregator: Addr::dummy(),
1130 }
1131 }
1132}
1133
1134struct InnerProcessor {
1135 pool: EnvelopeProcessorServicePool,
1136 config: Arc<Config>,
1137 global_config: GlobalConfigHandle,
1138 project_cache: ProjectCacheHandle,
1139 cogs: Cogs,
1140 addrs: Addrs,
1141 #[cfg(feature = "processing")]
1142 rate_limiter: Option<Arc<RedisRateLimiter>>,
1143 geoip_lookup: GeoIpLookup,
1144 #[cfg(feature = "processing")]
1145 cardinality_limiter: Option<CardinalityLimiter>,
1146 metric_outcomes: MetricOutcomes,
1147 processing: Processing,
1148}
1149
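/// The processors of the new processing pipeline, one per supported processing group, all sharing
/// the same quota rate limiter.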
1150struct Processing {
1151 logs: LogsProcessor,
1152 trace_metrics: TraceMetricsProcessor,
1153 spans: SpansProcessor,
1154 check_ins: CheckInsProcessor,
1155 sessions: SessionsProcessor,
1156 transactions: TransactionProcessor,
1157 profile_chunks: ProfileChunksProcessor,
1158 trace_attachments: TraceAttachmentsProcessor,
1159 replays: ReplaysProcessor,
1160}
1161
1162impl EnvelopeProcessorService {
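    /// Creates the envelope processor service: opens the configured GeoIP database (if any),
    /// wires up the Redis-backed rate and cardinality limiters when processing is enabled, and
    /// builds the per-group processors around a shared quota rate limiter.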
1163 #[cfg_attr(feature = "processing", expect(clippy::too_many_arguments))]
1165 pub fn new(
1166 pool: EnvelopeProcessorServicePool,
1167 config: Arc<Config>,
1168 global_config: GlobalConfigHandle,
1169 project_cache: ProjectCacheHandle,
1170 cogs: Cogs,
1171 #[cfg(feature = "processing")] redis: Option<RedisClients>,
1172 addrs: Addrs,
1173 metric_outcomes: MetricOutcomes,
1174 ) -> Self {
1175 let geoip_lookup = config
1176 .geoip_path()
1177 .and_then(
1178 |p| match GeoIpLookup::open(p).context(ServiceError::GeoIp) {
1179 Ok(geoip) => Some(geoip),
1180 Err(err) => {
1181 relay_log::error!("failed to open GeoIP db {p:?}: {err:?}");
1182 None
1183 }
1184 },
1185 )
1186 .unwrap_or_else(GeoIpLookup::empty);
1187
1188 #[cfg(feature = "processing")]
1189 let (cardinality, quotas) = match redis {
1190 Some(RedisClients {
1191 cardinality,
1192 quotas,
1193 ..
1194 }) => (Some(cardinality), Some(quotas)),
1195 None => (None, None),
1196 };
1197 #[cfg(not(feature = "processing"))]
1198 let quotas = None;
1199
1200 #[cfg(feature = "processing")]
1201 let rate_limiter = quotas.clone().map(|redis| {
1202 RedisRateLimiter::new(redis)
1203 .max_limit(config.max_rate_limit())
1204 .cache(config.quota_cache_ratio(), config.quota_cache_max())
1205 });
1206
1207 let quota_limiter = Arc::new(QuotaRateLimiter::new(
1208 #[cfg(feature = "processing")]
1209 project_cache.clone(),
1210 #[cfg(feature = "processing")]
1211 rate_limiter.clone(),
1212 ));
1213 #[cfg(feature = "processing")]
1214 let rate_limiter = rate_limiter.map(Arc::new);
1215
1216 let inner = InnerProcessor {
1217 pool,
1218 global_config,
1219 project_cache,
1220 cogs,
1221 #[cfg(feature = "processing")]
1222 rate_limiter,
1223 addrs,
1224 #[cfg(feature = "processing")]
1225 cardinality_limiter: cardinality
1226 .map(|cardinality| {
1227 RedisSetLimiter::new(
1228 RedisSetLimiterOptions {
1229 cache_vacuum_interval: config
1230 .cardinality_limiter_cache_vacuum_interval(),
1231 },
1232 cardinality,
1233 )
1234 })
1235 .map(CardinalityLimiter::new),
1236 metric_outcomes,
1237 processing: Processing {
1238 logs: LogsProcessor::new(Arc::clone("a_limiter)),
1239 trace_metrics: TraceMetricsProcessor::new(Arc::clone("a_limiter)),
1240 spans: SpansProcessor::new(Arc::clone("a_limiter), geoip_lookup.clone()),
1241 check_ins: CheckInsProcessor::new(Arc::clone("a_limiter)),
1242 sessions: SessionsProcessor::new(Arc::clone("a_limiter)),
1243 transactions: TransactionProcessor::new(
1244 Arc::clone("a_limiter),
1245 geoip_lookup.clone(),
1246 quotas.clone(),
1247 ),
1248 profile_chunks: ProfileChunksProcessor::new(Arc::clone("a_limiter)),
1249 trace_attachments: TraceAttachmentsProcessor::new(Arc::clone("a_limiter)),
1250 replays: ReplaysProcessor::new(quota_limiter, geoip_lookup.clone()),
1251 },
1252 geoip_lookup,
1253 config,
1254 };
1255
1256 Self {
1257 inner: Arc::new(inner),
1258 }
1259 }
1260
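    /// Enforces rate limits on the envelope and extracted metrics: first against the cached
    /// limits, then, on processing Relays, consistently against Redis; newly discovered limits
    /// are merged back into the project cache.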
1261 async fn enforce_quotas<Group>(
1262 &self,
1263 managed_envelope: &mut TypedEnvelope<Group>,
1264 event: Annotated<Event>,
1265 extracted_metrics: &mut ProcessingExtractedMetrics,
1266 ctx: processing::Context<'_>,
1267 ) -> Result<Annotated<Event>, ProcessingError> {
1268 let cached_result = RateLimiter::Cached
1271 .enforce(managed_envelope, event, extracted_metrics, ctx)
1272 .await?;
1273
1274 if_processing!(self.inner.config, {
1275 let rate_limiter = match self.inner.rate_limiter.clone() {
1276 Some(rate_limiter) => rate_limiter,
1277 None => return Ok(cached_result.event),
1278 };
1279
1280 let consistent_result = RateLimiter::Consistent(rate_limiter)
1282 .enforce(
1283 managed_envelope,
1284 cached_result.event,
1285 extracted_metrics,
1286 ctx
1287 )
1288 .await?;
1289
1290 if !consistent_result.rate_limits.is_empty() {
1292 self.inner
1293 .project_cache
1294 .get(managed_envelope.scoping().project_key)
1295 .rate_limits()
1296 .merge(consistent_result.rate_limits);
1297 }
1298
1299 Ok(consistent_result.event)
1300 } else { Ok(cached_result.event) })
1301 }
1302
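    /// Processes envelopes in the error group: expands platform-specific crash reports on
    /// processing Relays, extracts and normalizes the event, applies inbound filters, tags the
    /// error with the dynamic sampling decision, enforces quotas, and finally scrubs and
    /// re-serializes the event and its attachments.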
1303 async fn process_errors(
1305 &self,
1306 managed_envelope: &mut TypedEnvelope<ErrorGroup>,
1307 project_id: ProjectId,
1308 mut ctx: processing::Context<'_>,
1309 ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
1310 let mut event_fully_normalized = EventFullyNormalized::new(managed_envelope.envelope());
1311 let mut metrics = Metrics::default();
1312 let mut extracted_metrics = ProcessingExtractedMetrics::new();
1313
1314 report::process_user_reports(managed_envelope);
1316
1317 if_processing!(self.inner.config, {
1318 unreal::expand(managed_envelope, &self.inner.config)?;
1319 #[cfg(sentry)]
1320 playstation::expand(managed_envelope, &self.inner.config, ctx.project_info)?;
1321 nnswitch::expand(managed_envelope)?;
1322 });
1323
1324 let mut event = event::extract(
1325 managed_envelope,
1326 &mut metrics,
1327 event_fully_normalized,
1328 &self.inner.config,
1329 )?;
1330
1331 if_processing!(self.inner.config, {
1332 if let Some(inner_event_fully_normalized) =
1333 unreal::process(managed_envelope, &mut event)?
1334 {
1335 event_fully_normalized = inner_event_fully_normalized;
1336 }
1337 #[cfg(sentry)]
1338 if let Some(inner_event_fully_normalized) =
1339 playstation::process(managed_envelope, &mut event, ctx.project_info)?
1340 {
1341 event_fully_normalized = inner_event_fully_normalized;
1342 }
1343 if let Some(inner_event_fully_normalized) =
1344 attachment::create_placeholders(managed_envelope, &mut event, &mut metrics)
1345 {
1346 event_fully_normalized = inner_event_fully_normalized;
1347 }
1348 });
1349
1350 ctx.sampling_project_info = dynamic_sampling::validate_and_set_dsc(
1351 managed_envelope,
1352 &mut event,
1353 ctx.project_info,
1354 ctx.sampling_project_info,
1355 );
1356
1357 let attachments = managed_envelope
1358 .envelope()
1359 .items()
1360 .filter(|item| item.attachment_type() == Some(&AttachmentType::Attachment));
1361 processing::utils::event::finalize(
1362 managed_envelope.envelope().headers(),
1363 &mut event,
1364 attachments,
1365 &mut metrics,
1366 ctx.config,
1367 )?;
1368 event_fully_normalized = processing::utils::event::normalize(
1369 managed_envelope.envelope().headers(),
1370 &mut event,
1371 event_fully_normalized,
1372 project_id,
1373 ctx,
1374 &self.inner.geoip_lookup,
1375 )?;
1376 let filter_run =
1377 processing::utils::event::filter(managed_envelope.envelope().headers(), &event, &ctx)
1378 .map_err(|err| {
1379 managed_envelope.reject(Outcome::Filtered(err.clone()));
1380 ProcessingError::EventFiltered(err)
1381 })?;
1382
1383 if self.inner.config.processing_enabled() || matches!(filter_run, FiltersStatus::Ok) {
1384 dynamic_sampling::tag_error_with_sampling_decision(
1385 managed_envelope,
1386 &mut event,
1387 ctx.sampling_project_info,
1388 &self.inner.config,
1389 )
1390 .await;
1391 }
1392
1393 event = self
1394 .enforce_quotas(managed_envelope, event, &mut extracted_metrics, ctx)
1395 .await?;
1396
1397 if event.value().is_some() {
1398 processing::utils::event::scrub(&mut event, ctx.project_info)?;
1399 event::serialize(
1400 managed_envelope,
1401 &mut event,
1402 event_fully_normalized,
1403 EventMetricsExtracted(false),
1404 SpansExtracted(false),
1405 )?;
1406 event::emit_feedback_metrics(managed_envelope.envelope());
1407 }
1408
1409 let attachments = managed_envelope
1410 .envelope_mut()
1411 .items_mut()
1412 .filter(|i| i.ty() == &ItemType::Attachment);
1413 processing::utils::attachments::scrub(attachments, ctx.project_info);
1414
1415 if self.inner.config.processing_enabled() && !event_fully_normalized.0 {
1416 relay_log::error!(
1417 tags.project = %project_id,
1418 tags.ty = event_type(&event).map(|e| e.to_string()).unwrap_or("none".to_owned()),
1419 "ingested event without normalizing"
1420 );
1421 }
1422
1423 Ok(Some(extracted_metrics))
1424 }
1425
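    /// Processes standalone items that are not attached to an event (attachments, user reports,
    /// profiles), enforcing quotas and scrubbing attachments.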
1426 async fn process_standalone(
1428 &self,
1429 managed_envelope: &mut TypedEnvelope<StandaloneGroup>,
1430 ctx: processing::Context<'_>,
1431 ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
1432 let mut extracted_metrics = ProcessingExtractedMetrics::new();
1433
1434 standalone::process(managed_envelope);
1435
1436 profile::filter(managed_envelope, ctx.config, ctx.project_info);
1437
1438 self.enforce_quotas(
1439 managed_envelope,
1440 Annotated::empty(),
1441 &mut extracted_metrics,
1442 ctx,
1443 )
1444 .await?;
1445
1446 report::process_user_reports(managed_envelope);
1447 let attachments = managed_envelope
1448 .envelope_mut()
1449 .items_mut()
1450 .filter(|i| i.ty() == &ItemType::Attachment);
1451 processing::utils::attachments::scrub(attachments, ctx.project_info);
1452
1453 Ok(Some(extracted_metrics))
1454 }
1455
1456 async fn process_client_reports(
1458 &self,
1459 managed_envelope: &mut TypedEnvelope<ClientReportGroup>,
1460 ctx: processing::Context<'_>,
1461 ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
1462 let mut extracted_metrics = ProcessingExtractedMetrics::new();
1463
1464 self.enforce_quotas(
1465 managed_envelope,
1466 Annotated::empty(),
1467 &mut extracted_metrics,
1468 ctx,
1469 )
1470 .await?;
1471
1472 report::process_client_reports(
1473 managed_envelope,
1474 ctx.config,
1475 ctx.project_info,
1476 self.inner.addrs.outcome_aggregator.clone(),
1477 );
1478
1479 Ok(Some(extracted_metrics))
1480 }
1481
1482 async fn process_replays(
1484 &self,
1485 managed_envelope: &mut TypedEnvelope<ReplayGroup>,
1486 ctx: processing::Context<'_>,
1487 ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
1488 let mut extracted_metrics = ProcessingExtractedMetrics::new();
1489
1490 replay::process(
1491 managed_envelope,
1492 ctx.global_config,
1493 ctx.config,
1494 ctx.project_info,
1495 &self.inner.geoip_lookup,
1496 )?;
1497
1498 self.enforce_quotas(
1499 managed_envelope,
1500 Annotated::empty(),
1501 &mut extracted_metrics,
1502 ctx,
1503 )
1504 .await?;
1505
1506 Ok(Some(extracted_metrics))
1507 }
1508
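    /// Converts NEL reports into log items and routes them through the logs processor.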
1509 async fn process_nel(
1510 &self,
1511 mut managed_envelope: ManagedEnvelope,
1512 ctx: processing::Context<'_>,
1513 ) -> Result<ProcessingResult, ProcessingError> {
1514 nel::convert_to_logs(&mut managed_envelope);
1515 self.process_with_processor(&self.inner.processing.logs, managed_envelope, ctx)
1516 .await
1517 }
1518
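    /// Runs one of the new-style processors over the envelope: the processor takes ownership of
    /// the items it handles, any unexpected leftovers are rejected as internal errors, and
    /// processor failures are mapped to [`ProcessingError::ProcessingFailure`].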
1519 async fn process_with_processor<P: processing::Processor>(
1520 &self,
1521 processor: &P,
1522 mut managed_envelope: ManagedEnvelope,
1523 ctx: processing::Context<'_>,
1524 ) -> Result<ProcessingResult, ProcessingError>
1525 where
1526 Outputs: From<P::Output>,
1527 {
1528 let Some(work) = processor.prepare_envelope(&mut managed_envelope) else {
1529 debug_assert!(
1530 false,
1531 "there must be work for the {} processor",
1532 std::any::type_name::<P>(),
1533 );
1534 return Err(ProcessingError::ProcessingGroupMismatch);
1535 };
1536
1537 managed_envelope.update();
1538 match managed_envelope.envelope().is_empty() {
1539 true => managed_envelope.accept(),
1540 false => managed_envelope.reject(Outcome::Invalid(DiscardReason::Internal)),
1541 }
1542
1543 processor
1544 .process(work, ctx)
1545 .await
1546 .map_err(|err| {
1547 relay_log::debug!(
1548 error = &err as &dyn std::error::Error,
1549 "processing pipeline failed"
1550 );
1551 ProcessingError::ProcessingFailure
1552 })
1553 .map(|o| o.map(Into::into))
1554 .map(ProcessingResult::Output)
1555 }
1556
1557 async fn process_standalone_spans(
1561 &self,
1562 managed_envelope: &mut TypedEnvelope<SpanGroup>,
1563 _project_id: ProjectId,
1564 ctx: processing::Context<'_>,
1565 ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
1566 let mut extracted_metrics = ProcessingExtractedMetrics::new();
1567
1568 span::filter(managed_envelope, ctx.config, ctx.project_info);
1569
1570 if_processing!(self.inner.config, {
1571 span::process(
1572 managed_envelope,
1573 &mut Annotated::empty(),
1574 &mut extracted_metrics,
1575 _project_id,
1576 ctx,
1577 &self.inner.geoip_lookup,
1578 )
1579 .await;
1580 });
1581
1582 self.enforce_quotas(
1583 managed_envelope,
1584 Annotated::empty(),
1585 &mut extracted_metrics,
1586 ctx,
1587 )
1588 .await?;
1589
1590 Ok(Some(extracted_metrics))
1591 }
1592
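    /// Applies project-level defaults to the envelope (DSC transaction parametrization,
    /// retention, project id) and dispatches it to the handler for its processing group.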
1593 async fn process_envelope(
1594 &self,
1595 project_id: ProjectId,
1596 message: ProcessEnvelopeGrouped<'_>,
1597 ) -> Result<ProcessingResult, ProcessingError> {
1598 let ProcessEnvelopeGrouped {
1599 group,
1600 envelope: mut managed_envelope,
1601 ctx,
1602 } = message;
1603
1604 if let Some(sampling_state) = ctx.sampling_project_info {
1606 managed_envelope
1609 .envelope_mut()
1610 .parametrize_dsc_transaction(&sampling_state.config.tx_name_rules);
1611 }
1612
1613 if let Some(retention) = ctx.project_info.config.event_retention {
1616 managed_envelope.envelope_mut().set_retention(retention);
1617 }
1618
1619 if let Some(retention) = ctx.project_info.config.downsampled_event_retention {
1622 managed_envelope
1623 .envelope_mut()
1624 .set_downsampled_retention(retention);
1625 }
1626
1627 managed_envelope
1632 .envelope_mut()
1633 .meta_mut()
1634 .set_project_id(project_id);
1635
1636 macro_rules! run {
1637 ($fn_name:ident $(, $args:expr)*) => {
1638 async {
1639 let mut managed_envelope = (managed_envelope, group).try_into()?;
1640 match self.$fn_name(&mut managed_envelope, $($args),*).await {
1641 Ok(extracted_metrics) => Ok(ProcessingResult::Envelope {
1642 managed_envelope: managed_envelope.into_processed(),
1643 extracted_metrics: extracted_metrics.map_or(ProcessingExtractedMetrics::new(), |e| e)
1644 }),
1645 Err(error) => {
1646 relay_log::trace!("Executing {fn} failed: {error}", fn = stringify!($fn_name), error = error);
1647 if let Some(outcome) = error.to_outcome() {
1648 managed_envelope.reject(outcome);
1649 }
1650
1651 return Err(error);
1652 }
1653 }
1654 }.await
1655 };
1656 }
1657
1658 relay_log::trace!("Processing {group} group", group = group.variant());
1659
1660 match group {
1661 ProcessingGroup::Error => run!(process_errors, project_id, ctx),
1662 ProcessingGroup::Transaction => {
1663 self.process_with_processor(
1664 &self.inner.processing.transactions,
1665 managed_envelope,
1666 ctx,
1667 )
1668 .await
1669 }
1670 ProcessingGroup::Session => {
1671 self.process_with_processor(&self.inner.processing.sessions, managed_envelope, ctx)
1672 .await
1673 }
1674 ProcessingGroup::Standalone => run!(process_standalone, ctx),
1675 ProcessingGroup::ClientReport => run!(process_client_reports, ctx),
1676 ProcessingGroup::Replay => {
1677 if ctx.project_info.has_feature(Feature::NewReplayProcessing) {
1678 self.process_with_processor(
1679 &self.inner.processing.replays,
1680 managed_envelope,
1681 ctx,
1682 )
1683 .await
1684 } else {
1685 run!(process_replays, ctx)
1686 }
1687 }
1688 ProcessingGroup::CheckIn => {
1689 self.process_with_processor(&self.inner.processing.check_ins, managed_envelope, ctx)
1690 .await
1691 }
1692 ProcessingGroup::Nel => self.process_nel(managed_envelope, ctx).await,
1693 ProcessingGroup::Log => {
1694 self.process_with_processor(&self.inner.processing.logs, managed_envelope, ctx)
1695 .await
1696 }
1697 ProcessingGroup::TraceMetric => {
1698 self.process_with_processor(
1699 &self.inner.processing.trace_metrics,
1700 managed_envelope,
1701 ctx,
1702 )
1703 .await
1704 }
1705 ProcessingGroup::SpanV2 => {
1706 self.process_with_processor(&self.inner.processing.spans, managed_envelope, ctx)
1707 .await
1708 }
1709 ProcessingGroup::TraceAttachment => {
1710 self.process_with_processor(
1711 &self.inner.processing.trace_attachments,
1712 managed_envelope,
1713 ctx,
1714 )
1715 .await
1716 }
1717 ProcessingGroup::Span => run!(process_standalone_spans, project_id, ctx),
1718 ProcessingGroup::ProfileChunk => {
1719 self.process_with_processor(
1720 &self.inner.processing.profile_chunks,
1721 managed_envelope,
1722 ctx,
1723 )
1724 .await
1725 }
1726 ProcessingGroup::Metrics => {
1728 if self.inner.config.relay_mode() != RelayMode::Proxy {
1731 relay_log::error!(
1732 tags.project = %project_id,
1733 items = ?managed_envelope.envelope().items().next().map(Item::ty),
1734 "received metrics in the process_state"
1735 );
1736 }
1737
1738 Ok(ProcessingResult::no_metrics(
1739 managed_envelope.into_processed(),
1740 ))
1741 }
1742 ProcessingGroup::Ungrouped => {
1744 relay_log::error!(
1745 tags.project = %project_id,
1746 items = ?managed_envelope.envelope().items().next().map(Item::ty),
1747 "could not identify the processing group based on the envelope's items"
1748 );
1749
1750 Ok(ProcessingResult::no_metrics(
1751 managed_envelope.into_processed(),
1752 ))
1753 }
1754 ProcessingGroup::ForwardUnknown => Ok(ProcessingResult::no_metrics(
1758 managed_envelope.into_processed(),
1759 )),
1760 }
1761 }
1762
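    /// Entry point for processing a grouped envelope: resolves the project id, runs the group
    /// handler, forwards extracted metrics to the aggregator, and returns what, if anything,
    /// should be submitted upstream.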
1763 async fn process<'a>(
1769 &self,
1770 mut message: ProcessEnvelopeGrouped<'a>,
1771 ) -> Result<Option<Submit<'a>>, ProcessingError> {
1772 let ProcessEnvelopeGrouped {
1773 ref mut envelope,
1774 ctx,
1775 ..
1776 } = message;
1777
1778 let Some(project_id) = ctx
1785 .project_info
1786 .project_id
1787 .or_else(|| envelope.envelope().meta().project_id())
1788 else {
1789 envelope.reject(Outcome::Invalid(DiscardReason::Internal));
1790 return Err(ProcessingError::MissingProjectId);
1791 };
1792
1793 let client = envelope.envelope().meta().client().map(str::to_owned);
1794 let user_agent = envelope.envelope().meta().user_agent().map(str::to_owned);
1795 let project_key = envelope.envelope().meta().public_key();
1796 let sampling_key = envelope
1800 .envelope()
1801 .sampling_key()
1802 .filter(|_| ctx.sampling_project_info.is_some());
1803
1804 relay_log::configure_scope(|scope| {
1807 scope.set_tag("project", project_id);
1808 if let Some(client) = client {
1809 scope.set_tag("sdk", client);
1810 }
1811 if let Some(user_agent) = user_agent {
1812 scope.set_extra("user_agent", user_agent.into());
1813 }
1814 });
1815
1816 let result = match self.process_envelope(project_id, message).await {
1817 Ok(ProcessingResult::Envelope {
1818 mut managed_envelope,
1819 extracted_metrics,
1820 }) => {
1821 managed_envelope.update();
1824
1825 let has_metrics = !extracted_metrics.metrics.project_metrics.is_empty();
1826 send_metrics(
1827 extracted_metrics.metrics,
1828 project_key,
1829 sampling_key,
1830 &self.inner.addrs.aggregator,
1831 );
1832
1833 let envelope_response = if managed_envelope.envelope().is_empty() {
1834 if !has_metrics {
1835 managed_envelope.reject(Outcome::RateLimited(None));
1837 } else {
1838 managed_envelope.accept();
1839 }
1840
1841 None
1842 } else {
1843 Some(managed_envelope)
1844 };
1845
1846 Ok(envelope_response.map(Submit::Envelope))
1847 }
1848 Ok(ProcessingResult::Output(Output { main, metrics })) => {
1849 if let Some(metrics) = metrics {
1850 metrics.accept(|metrics| {
1851 send_metrics(
1852 metrics,
1853 project_key,
1854 sampling_key,
1855 &self.inner.addrs.aggregator,
1856 );
1857 });
1858 }
1859
1860 let ctx = ctx.to_forward();
1861 Ok(main.map(|output| Submit::Output { output, ctx }))
1862 }
1863 Err(err) => Err(err),
1864 };
1865
1866 relay_log::configure_scope(|scope| {
1867 scope.remove_tag("project");
1868 scope.remove_tag("sdk");
1869 scope.remove_tag("user_agent");
1870 });
1871
1872 result
1873 }
1874
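    /// Handles a [`ProcessEnvelope`] message by splitting the envelope into processing groups,
    /// processing each group separately, and submitting the results upstream; failures are
    /// reported as outcomes and logged.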
1875 async fn handle_process_envelope(&self, cogs: &mut Token, message: ProcessEnvelope) {
1876 let project_key = message.envelope.envelope().meta().public_key();
1877 let wait_time = message.envelope.age();
1878 metric!(timer(RelayTimers::EnvelopeWaitTime) = wait_time);
1879
1880 cogs.cancel();
1883
1884 let scoping = message.envelope.scoping();
1885 for (group, envelope) in ProcessingGroup::split_envelope(
1886 *message.envelope.into_envelope(),
1887 &message.project_info,
1888 ) {
1889 let mut cogs = self
1890 .inner
1891 .cogs
1892 .timed(ResourceId::Relay, AppFeature::from(group));
1893
1894 let mut envelope =
1895 ManagedEnvelope::new(envelope, self.inner.addrs.outcome_aggregator.clone());
1896 envelope.scope(scoping);
1897
1898 let global_config = self.inner.global_config.current();
1899
1900 let ctx = processing::Context {
1901 config: &self.inner.config,
1902 global_config: &global_config,
1903 project_info: &message.project_info,
1904 sampling_project_info: message.sampling_project_info.as_deref(),
1905 rate_limits: &message.rate_limits,
1906 reservoir_counters: &message.reservoir_counters,
1907 };
1908
1909 let message = ProcessEnvelopeGrouped {
1910 group,
1911 envelope,
1912 ctx,
1913 };
1914
1915 let result = metric!(
1916 timer(RelayTimers::EnvelopeProcessingTime),
1917 group = group.variant(),
1918 { self.process(message).await }
1919 );
1920
1921 match result {
1922 Ok(Some(envelope)) => self.submit_upstream(&mut cogs, envelope),
1923 Ok(None) => {}
1924 Err(error) if error.is_unexpected() => {
1925 relay_log::error!(
1926 tags.project_key = %project_key,
1927 error = &error as &dyn Error,
1928 "error processing envelope"
1929 )
1930 }
1931 Err(error) => {
1932 relay_log::debug!(
1933 tags.project_key = %project_key,
1934 error = &error as &dyn Error,
1935 "error processing envelope"
1936 )
1937 }
1938 }
1939 }
1940 }
1941
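    /// Handles a [`ProcessMetrics`] message: parses raw items into buckets, normalizes them,
    /// corrects clock drift, resets metadata for untrusted sources, checks them against project
    /// state and rate limits, and merges them into the aggregator.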
1942 fn handle_process_metrics(&self, cogs: &mut Token, message: ProcessMetrics) {
1943 let ProcessMetrics {
1944 data,
1945 project_key,
1946 received_at,
1947 sent_at,
1948 source,
1949 } = message;
1950
1951 let received_timestamp =
1952 UnixTimestamp::from_datetime(received_at).unwrap_or(UnixTimestamp::now());
1953
1954 let mut buckets = data.into_buckets(received_timestamp);
1955 if buckets.is_empty() {
1956 return;
1957 };
1958 cogs.update(relay_metrics::cogs::BySize(&buckets));
1959
1960 let clock_drift_processor =
1961 ClockDriftProcessor::new(sent_at, received_at).at_least(MINIMUM_CLOCK_DRIFT);
1962
1963 buckets.retain_mut(|bucket| {
1964 if let Err(error) = relay_metrics::normalize_bucket(bucket) {
1965 relay_log::debug!(error = &error as &dyn Error, "dropping bucket {bucket:?}");
1966 return false;
1967 }
1968
1969 if !self::metrics::is_valid_namespace(bucket) {
1970 return false;
1971 }
1972
1973 clock_drift_processor.process_timestamp(&mut bucket.timestamp);
1974
1975 if !matches!(source, BucketSource::Internal) {
1976 bucket.metadata = BucketMetadata::new(received_timestamp);
1977 }
1978
1979 true
1980 });
1981
1982 let project = self.inner.project_cache.get(project_key);
1983
1984 let buckets = match project.state() {
1987 ProjectState::Enabled(project_info) => {
1988 let rate_limits = project.rate_limits().current_limits();
1989 self.check_buckets(project_key, project_info, &rate_limits, buckets)
1990 }
1991 _ => buckets,
1992 };
1993
1994 relay_log::trace!("merging metric buckets into the aggregator");
1995 self.inner
1996 .addrs
1997 .aggregator
1998 .send(MergeBuckets::new(project_key, buckets));
1999 }
2000
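    /// Handles a [`ProcessBatchedMetrics`] message by splitting the payload by project key and
    /// delegating each entry to `handle_process_metrics`.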
2001 fn handle_process_batched_metrics(&self, cogs: &mut Token, message: ProcessBatchedMetrics) {
2002 let ProcessBatchedMetrics {
2003 payload,
2004 source,
2005 received_at,
2006 sent_at,
2007 } = message;
2008
2009 #[derive(serde::Deserialize)]
2010 struct Wrapper {
2011 buckets: HashMap<ProjectKey, Vec<Bucket>>,
2012 }
2013
2014 let buckets = match serde_json::from_slice(&payload) {
2015 Ok(Wrapper { buckets }) => buckets,
2016 Err(error) => {
2017 relay_log::debug!(
2018 error = &error as &dyn Error,
2019 "failed to parse batched metrics",
2020 );
2021 metric!(counter(RelayCounters::MetricBucketsParsingFailed) += 1);
2022 return;
2023 }
2024 };
2025
2026 for (project_key, buckets) in buckets {
2027 self.handle_process_metrics(
2028 cogs,
2029 ProcessMetrics {
2030 data: MetricData::Parsed(buckets),
2031 project_key,
2032 source,
2033 received_at,
2034 sent_at,
2035 },
2036 )
2037 }
2038 }
2039
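    /// Submits the processing result either to the store (or to the upload service for sampled
    /// attachment envelopes) on processing Relays, or serializes it and forwards it to the
    /// upstream Relay.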
2040 fn submit_upstream(&self, cogs: &mut Token, submit: Submit<'_>) {
2041 let _submit = cogs.start_category("submit");
2042
2043 #[cfg(feature = "processing")]
2044 if self.inner.config.processing_enabled()
2045 && let Some(store_forwarder) = &self.inner.addrs.store_forwarder
2046 {
2047 use crate::processing::StoreHandle;
2048
2049 let upload = self.inner.addrs.upload.as_ref();
2050 match submit {
2051 Submit::Envelope(envelope) => {
2052 let envelope_has_attachments = envelope
2053 .envelope()
2054 .items()
2055 .any(|item| *item.ty() == ItemType::Attachment);
2056 let use_objectstore = || {
2058 let options = &self.inner.global_config.current().options;
2059 utils::sample(options.objectstore_attachments_sample_rate).is_keep()
2060 };
2061
2062 if let Some(upload) = &self.inner.addrs.upload
2063 && envelope_has_attachments
2064 && use_objectstore()
2065 {
2066 upload.send(StoreEnvelope { envelope })
2068 } else {
2069 store_forwarder.send(StoreEnvelope { envelope })
2070 }
2071 }
2072 Submit::Output { output, ctx } => output
2073 .forward_store(StoreHandle::new(store_forwarder, upload), ctx)
2074 .unwrap_or_else(|err| err.into_inner()),
2075 }
2076 return;
2077 }
2078
2079 let mut envelope = match submit {
2080 Submit::Envelope(envelope) => envelope,
2081 Submit::Output { output, ctx } => match output.serialize_envelope(ctx) {
2082 Ok(envelope) => ManagedEnvelope::from(envelope).into_processed(),
2083 Err(_) => {
2084 relay_log::error!("failed to serialize output to an envelope");
2085 return;
2086 }
2087 },
2088 };
2089
2090 if envelope.envelope_mut().is_empty() {
2091 envelope.accept();
2092 return;
2093 }
2094
2095 envelope.envelope_mut().set_sent_at(Utc::now());
2101
2102 relay_log::trace!("sending envelope to sentry endpoint");
2103 let http_encoding = self.inner.config.http_encoding();
2104 let result = envelope.envelope().to_vec().and_then(|v| {
2105 encode_payload(&v.into(), http_encoding).map_err(EnvelopeError::PayloadIoFailed)
2106 });
2107
2108 match result {
2109 Ok(body) => {
2110 self.inner
2111 .addrs
2112 .upstream_relay
2113 .send(SendRequest(SendEnvelope {
2114 envelope,
2115 body,
2116 http_encoding,
2117 project_cache: self.inner.project_cache.clone(),
2118 }));
2119 }
2120 Err(error) => {
2121 relay_log::error!(
2124 error = &error as &dyn Error,
2125 tags.project_key = %envelope.scoping().project_key,
2126 "failed to serialize envelope payload"
2127 );
2128
2129 envelope.reject(Outcome::Invalid(DiscardReason::Internal));
2130 }
2131 }
2132 }
2133
2134 fn handle_submit_client_reports(&self, cogs: &mut Token, message: SubmitClientReports) {
2135 let SubmitClientReports {
2136 client_reports,
2137 scoping,
2138 } = message;
2139
2140 let upstream = self.inner.config.upstream_descriptor();
2141 let dsn = PartialDsn::outbound(&scoping, upstream);
2142
2143 let mut envelope = Envelope::from_request(None, RequestMeta::outbound(dsn));
2144 for client_report in client_reports {
2145 match client_report.serialize() {
2146 Ok(payload) => {
2147 let mut item = Item::new(ItemType::ClientReport);
2148 item.set_payload(ContentType::Json, payload);
2149 envelope.add_item(item);
2150 }
2151 Err(error) => {
2152 relay_log::error!(
2153 error = &error as &dyn std::error::Error,
2154 "failed to serialize client report"
2155 );
2156 }
2157 }
2158 }
2159
2160 let envelope = ManagedEnvelope::new(envelope, self.inner.addrs.outcome_aggregator.clone());
2161 self.submit_upstream(cogs, Submit::Envelope(envelope.into_processed()));
2162 }
2163
2164 fn check_buckets(
2165 &self,
2166 project_key: ProjectKey,
2167 project_info: &ProjectInfo,
2168 rate_limits: &RateLimits,
2169 buckets: Vec<Bucket>,
2170 ) -> Vec<Bucket> {
2171 let Some(scoping) = project_info.scoping(project_key) else {
2172 relay_log::error!(
2173 tags.project_key = project_key.as_str(),
2174 "there is no scoping: dropping {} buckets",
2175 buckets.len(),
2176 );
2177 return Vec::new();
2178 };
2179
2180 let mut buckets = self::metrics::apply_project_info(
2181 buckets,
2182 &self.inner.metric_outcomes,
2183 project_info,
2184 scoping,
2185 );
2186
2187 let namespaces: BTreeSet<MetricNamespace> = buckets
2188 .iter()
2189 .filter_map(|bucket| bucket.name.try_namespace())
2190 .collect();
2191
2192 for namespace in namespaces {
2193 let limits = rate_limits.check_with_quotas(
2194 project_info.get_quotas(),
2195 scoping.metric_bucket(namespace),
2196 );
2197
2198 if limits.is_limited() {
2199 let rejected;
2200 (buckets, rejected) = utils::split_off(buckets, |bucket| {
2201 bucket.name.try_namespace() == Some(namespace)
2202 });
2203
2204 let reason_code = limits.longest().and_then(|limit| limit.reason_code.clone());
2205 self.inner.metric_outcomes.track(
2206 scoping,
2207 &rejected,
2208 Outcome::RateLimited(reason_code),
2209 );
2210 }
2211 }
2212
2213 let quotas = project_info.config.quotas.clone();
2214 match MetricsLimiter::create(buckets, quotas, scoping) {
2215 Ok(mut bucket_limiter) => {
2216 bucket_limiter.enforce_limits(rate_limits, &self.inner.metric_outcomes);
2217 bucket_limiter.into_buckets()
2218 }
2219 Err(buckets) => buckets,
2220 }
2221 }
2222
2223 #[cfg(feature = "processing")]
2224 async fn rate_limit_buckets(
2225 &self,
2226 scoping: Scoping,
2227 project_info: &ProjectInfo,
2228 mut buckets: Vec<Bucket>,
2229 ) -> Vec<Bucket> {
2230 let Some(rate_limiter) = &self.inner.rate_limiter else {
2231 return buckets;
2232 };
2233
2234 let global_config = self.inner.global_config.current();
2235 let namespaces = buckets
2236 .iter()
2237 .filter_map(|bucket| bucket.name.try_namespace())
2238 .counts();
2239
2240 let quotas = CombinedQuotas::new(&global_config, project_info.get_quotas());
2241
2242 for (namespace, quantity) in namespaces {
2243 let item_scoping = scoping.metric_bucket(namespace);
2244
2245 let limits = match rate_limiter
2246 .is_rate_limited(quotas, item_scoping, quantity, false)
2247 .await
2248 {
2249 Ok(limits) => limits,
2250 Err(err) => {
2251 relay_log::error!(
2252 error = &err as &dyn std::error::Error,
2253 "failed to check redis rate limits"
2254 );
2255 break;
2256 }
2257 };
2258
2259 if limits.is_limited() {
2260 let rejected;
2261 (buckets, rejected) = utils::split_off(buckets, |bucket| {
2262 bucket.name.try_namespace() == Some(namespace)
2263 });
2264
2265 let reason_code = limits.longest().and_then(|limit| limit.reason_code.clone());
2266 self.inner.metric_outcomes.track(
2267 scoping,
2268 &rejected,
2269 Outcome::RateLimited(reason_code),
2270 );
2271
2272 self.inner
2273 .project_cache
2274 .get(item_scoping.scoping.project_key)
2275 .rate_limits()
2276 .merge(limits);
2277 }
2278 }
2279
2280 match MetricsLimiter::create(buckets, project_info.config.quotas.clone(), scoping) {
2281 Err(buckets) => buckets,
2282 Ok(bucket_limiter) => self.apply_other_rate_limits(bucket_limiter).await,
2283 }
2284 }
2285
2286 #[cfg(feature = "processing")]
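/// Checks Redis rate limits for the transaction and span counts carried by the
/// `MetricsLimiter` and enforces any active limits on the contained buckets.
///
/// Newly discovered limits are merged back into the project's cached rate limits.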
2288 async fn apply_other_rate_limits(&self, mut bucket_limiter: MetricsLimiter) -> Vec<Bucket> {
2289 relay_log::trace!("apply_other_rate_limits");
2290
2291 let scoping = *bucket_limiter.scoping();
2292
2293 if let Some(rate_limiter) = self.inner.rate_limiter.as_ref() {
2294 let global_config = self.inner.global_config.current();
2295 let quotas = CombinedQuotas::new(&global_config, bucket_limiter.quotas());
2296
2297 let over_accept_once = true;
2300 let mut rate_limits = RateLimits::new();
2301
2302 for category in [DataCategory::Transaction, DataCategory::Span] {
2303 let count = bucket_limiter.count(category);
2304
2305 let timer = Instant::now();
2306 let mut is_limited = false;
2307
2308 if let Some(count) = count {
2309 match rate_limiter
2310 .is_rate_limited(quotas, scoping.item(category), count, over_accept_once)
2311 .await
2312 {
2313 Ok(limits) => {
2314 is_limited = limits.is_limited();
2315 rate_limits.merge(limits)
2316 }
2317 Err(e) => relay_log::error!(error = &e as &dyn Error, "failed to check redis rate limits"),
2318 }
2319 }
2320
2321 relay_statsd::metric!(
2322 timer(RelayTimers::RateLimitBucketsDuration) = timer.elapsed(),
2323 category = category.name(),
2324 limited = if is_limited { "true" } else { "false" },
2325 count = match count {
2326 None => "none",
2327 Some(0) => "0",
2328 Some(1) => "1",
2329 Some(2..=10) => "10",
2330 Some(11..=25) => "25",
2331 Some(26..=50) => "50",
2332 Some(51..=100) => "100",
2333 Some(101..=500) => "500",
2334 _ => "> 500",
2335 },
2336 );
2337 }
2338
2339 if rate_limits.is_limited() {
2340 let was_enforced =
2341 bucket_limiter.enforce_limits(&rate_limits, &self.inner.metric_outcomes);
2342
2343 if was_enforced {
2344 self.inner
2346 .project_cache
2347 .get(scoping.project_key)
2348 .rate_limits()
2349 .merge(rate_limits);
2350 }
2351 }
2352 }
2353
2354 bucket_limiter.into_buckets()
2355 }
2356
2357 #[cfg(feature = "processing")]
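/// Applies cardinality limits to the given metric buckets.
///
/// Depending on `cardinality_limiter_mode`, limiting is skipped entirely (disabled),
/// only reported (passive), or enforced by dropping exceeding buckets with a
/// `CardinalityLimited` outcome.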
2359 async fn cardinality_limit_buckets(
2360 &self,
2361 scoping: Scoping,
2362 limits: &[CardinalityLimit],
2363 buckets: Vec<Bucket>,
2364 ) -> Vec<Bucket> {
2365 let global_config = self.inner.global_config.current();
2366 let cardinality_limiter_mode = global_config.options.cardinality_limiter_mode;
2367
2368 if matches!(cardinality_limiter_mode, CardinalityLimiterMode::Disabled) {
2369 return buckets;
2370 }
2371
2372 let Some(ref limiter) = self.inner.cardinality_limiter else {
2373 return buckets;
2374 };
2375
2376 let scope = relay_cardinality::Scoping {
2377 organization_id: scoping.organization_id,
2378 project_id: scoping.project_id,
2379 };
2380
2381 let limits = match limiter
2382 .check_cardinality_limits(scope, limits, buckets)
2383 .await
2384 {
2385 Ok(limits) => limits,
2386 Err((buckets, error)) => {
2387 relay_log::error!(
2388 error = &error as &dyn std::error::Error,
2389 "cardinality limiter failed"
2390 );
2391 return buckets;
2392 }
2393 };
2394
2395 let error_sample_rate = global_config.options.cardinality_limiter_error_sample_rate;
2396 if !limits.exceeded_limits().is_empty() && utils::sample(error_sample_rate).is_keep() {
2397 for limit in limits.exceeded_limits() {
2398 relay_log::with_scope(
2399 |scope| {
2400 scope.set_user(Some(relay_log::sentry::User {
2402 id: Some(scoping.organization_id.to_string()),
2403 ..Default::default()
2404 }));
2405 },
2406 || {
2407 relay_log::error!(
2408 tags.organization_id = scoping.organization_id.value(),
2409 tags.limit_id = limit.id,
2410 tags.passive = limit.passive,
2411 "Cardinality Limit"
2412 );
2413 },
2414 );
2415 }
2416 }
2417
2418 for (limit, reports) in limits.cardinality_reports() {
2419 for report in reports {
2420 self.inner
2421 .metric_outcomes
2422 .cardinality(scoping, limit, report);
2423 }
2424 }
2425
2426 if matches!(cardinality_limiter_mode, CardinalityLimiterMode::Passive) {
2427 return limits.into_source();
2428 }
2429
2430 let CardinalityLimitsSplit { accepted, rejected } = limits.into_split();
2431
2432 for (bucket, exceeded) in rejected {
2433 self.inner.metric_outcomes.track(
2434 scoping,
2435 &[bucket],
2436 Outcome::CardinalityLimited(exceeded.id.clone()),
2437 );
2438 }
2439 accepted
2440 }
2441
2442 #[cfg(feature = "processing")]
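/// Rate limits incoming buckets and forwards the remainder to the store.
///
/// For every project, buckets are first checked against Redis rate limits and
/// cardinality limits; whatever survives is sent to the store forwarder together
/// with the project's event retention.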
2449 async fn encode_metrics_processing(
2450 &self,
2451 message: FlushBuckets,
2452 store_forwarder: &Addr<Store>,
2453 ) {
2454 use crate::constants::DEFAULT_EVENT_RETENTION;
2455 use crate::services::store::StoreMetrics;
2456
2457 for ProjectBuckets {
2458 buckets,
2459 scoping,
2460 project_info,
2461 ..
2462 } in message.buckets.into_values()
2463 {
2464 let buckets = self
2465 .rate_limit_buckets(scoping, &project_info, buckets)
2466 .await;
2467
2468 let limits = project_info.get_cardinality_limits();
2469 let buckets = self
2470 .cardinality_limit_buckets(scoping, limits, buckets)
2471 .await;
2472
2473 if buckets.is_empty() {
2474 continue;
2475 }
2476
2477 let retention = project_info
2478 .config
2479 .event_retention
2480 .unwrap_or(DEFAULT_EVENT_RETENTION);
2481
2482 store_forwarder.send(StoreMetrics {
2485 buckets,
2486 scoping,
2487 retention,
2488 });
2489 }
2490 }
2491
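/// Serializes metric buckets into envelope items and submits them upstream.
///
/// Buckets are split into batches of at most `metrics_max_batch_size_bytes`, and
/// every batch is wrapped into its own envelope scoped to the owning project.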
2492 fn encode_metrics_envelope(&self, cogs: &mut Token, message: FlushBuckets) {
2504 let FlushBuckets {
2505 partition_key,
2506 buckets,
2507 } = message;
2508
2509 let batch_size = self.inner.config.metrics_max_batch_size_bytes();
2510 let upstream = self.inner.config.upstream_descriptor();
2511
2512 for ProjectBuckets {
2513 buckets, scoping, ..
2514 } in buckets.values()
2515 {
2516 let dsn = PartialDsn::outbound(scoping, upstream);
2517
2518 relay_statsd::metric!(
2519 distribution(RelayDistributions::PartitionKeys) = u64::from(partition_key)
2520 );
2521
2522 let mut num_batches = 0;
2523 for batch in BucketsView::from(buckets).by_size(batch_size) {
2524 let mut envelope = Envelope::from_request(None, RequestMeta::outbound(dsn.clone()));
2525
2526 let mut item = Item::new(ItemType::MetricBuckets);
2527 item.set_source_quantities(crate::metrics::extract_quantities(batch));
2528 item.set_payload(ContentType::Json, serde_json::to_vec(&buckets).unwrap());
2529 envelope.add_item(item);
2530
2531 let mut envelope =
2532 ManagedEnvelope::new(envelope, self.inner.addrs.outcome_aggregator.clone());
2533 envelope
2534 .set_partition_key(Some(partition_key))
2535 .scope(*scoping);
2536
2537 relay_statsd::metric!(
2538 distribution(RelayDistributions::BucketsPerBatch) = batch.len() as u64
2539 );
2540
2541 self.submit_upstream(cogs, Submit::Envelope(envelope.into_processed()));
2542 num_batches += 1;
2543 }
2544
2545 relay_statsd::metric!(
2546 distribution(RelayDistributions::BatchesPerPartition) = num_batches
2547 );
2548 }
2549 }
2550
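/// Encodes a partition of buckets and sends it to the global metrics endpoint.
///
/// Empty partitions are skipped; if encoding the payload fails, the partition is
/// dropped after logging an error.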
2551 fn send_global_partition(&self, partition_key: u32, partition: &mut Partition<'_>) {
2553 if partition.is_empty() {
2554 return;
2555 }
2556
2557 let (unencoded, project_info) = partition.take();
2558 let http_encoding = self.inner.config.http_encoding();
2559 let encoded = match encode_payload(&unencoded, http_encoding) {
2560 Ok(payload) => payload,
2561 Err(error) => {
2562 let error = &error as &dyn std::error::Error;
2563 relay_log::error!(error, "failed to encode metrics payload");
2564 return;
2565 }
2566 };
2567
2568 let request = SendMetricsRequest {
2569 partition_key: partition_key.to_string(),
2570 unencoded,
2571 encoded,
2572 project_info,
2573 http_encoding,
2574 metric_outcomes: self.inner.metric_outcomes.clone(),
2575 };
2576
2577 self.inner.addrs.upstream_relay.send(SendRequest(request));
2578 }
2579
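/// Sends all buckets to the upstream, batched into size-limited partitions.
///
/// Buckets from all projects are packed into [`Partition`]s of at most
/// `metrics_max_batch_size_bytes`; buckets that do not fit are split, and the
/// overflow is carried over into the next partition.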
2580 fn encode_metrics_global(&self, message: FlushBuckets) {
2595 let FlushBuckets {
2596 partition_key,
2597 buckets,
2598 } = message;
2599
2600 let batch_size = self.inner.config.metrics_max_batch_size_bytes();
2601 let mut partition = Partition::new(batch_size);
2602 let mut partition_splits = 0;
2603
2604 for ProjectBuckets {
2605 buckets, scoping, ..
2606 } in buckets.values()
2607 {
2608 for bucket in buckets {
2609 let mut remaining = Some(BucketView::new(bucket));
2610
2611 while let Some(bucket) = remaining.take() {
2612 if let Some(next) = partition.insert(bucket, *scoping) {
2613 self.send_global_partition(partition_key, &mut partition);
2617 remaining = Some(next);
2618 partition_splits += 1;
2619 }
2620 }
2621 }
2622 }
2623
2624 if partition_splits > 0 {
2625 metric!(distribution(RelayDistributions::PartitionSplits) = partition_splits);
2626 }
2627
2628 self.send_global_partition(partition_key, &mut partition);
2629 }
2630
2631 async fn handle_flush_buckets(&self, cogs: &mut Token, mut message: FlushBuckets) {
2632 for (project_key, pb) in message.buckets.iter_mut() {
2633 let buckets = std::mem::take(&mut pb.buckets);
2634 pb.buckets =
2635 self.check_buckets(*project_key, &pb.project_info, &pb.rate_limits, buckets);
2636 }
2637
2638 #[cfg(feature = "processing")]
2639 if self.inner.config.processing_enabled()
2640 && let Some(ref store_forwarder) = self.inner.addrs.store_forwarder
2641 {
2642 return self
2643 .encode_metrics_processing(message, store_forwarder)
2644 .await;
2645 }
2646
2647 if self.inner.config.http_global_metrics() {
2648 self.encode_metrics_global(message)
2649 } else {
2650 self.encode_metrics_envelope(cogs, message)
2651 }
2652 }
2653
2654 #[cfg(all(test, feature = "processing"))]
2655 fn redis_rate_limiter_enabled(&self) -> bool {
2656 self.inner.rate_limiter.is_some()
2657 }
2658
2659 async fn handle_message(&self, message: EnvelopeProcessor) {
2660 let ty = message.variant();
2661 let feature_weights = self.feature_weights(&message);
2662
2663 metric!(timer(RelayTimers::ProcessMessageDuration), message = ty, {
2664 let mut cogs = self.inner.cogs.timed(ResourceId::Relay, feature_weights);
2665
2666 match message {
2667 EnvelopeProcessor::ProcessEnvelope(m) => {
2668 self.handle_process_envelope(&mut cogs, *m).await
2669 }
2670 EnvelopeProcessor::ProcessProjectMetrics(m) => {
2671 self.handle_process_metrics(&mut cogs, *m)
2672 }
2673 EnvelopeProcessor::ProcessBatchedMetrics(m) => {
2674 self.handle_process_batched_metrics(&mut cogs, *m)
2675 }
2676 EnvelopeProcessor::FlushBuckets(m) => {
2677 self.handle_flush_buckets(&mut cogs, *m).await
2678 }
2679 EnvelopeProcessor::SubmitClientReports(m) => {
2680 self.handle_submit_client_reports(&mut cogs, *m)
2681 }
2682 }
2683 });
2684 }
2685
2686 fn feature_weights(&self, message: &EnvelopeProcessor) -> FeatureWeights {
2687 match message {
2688 EnvelopeProcessor::ProcessEnvelope(_) => AppFeature::Unattributed.into(),
2690 EnvelopeProcessor::ProcessProjectMetrics(_) => AppFeature::Unattributed.into(),
2691 EnvelopeProcessor::ProcessBatchedMetrics(_) => AppFeature::Unattributed.into(),
2692 EnvelopeProcessor::FlushBuckets(v) => v
2693 .buckets
2694 .values()
2695 .map(|s| {
2696 if self.inner.config.processing_enabled() {
2697 relay_metrics::cogs::ByCount(&s.buckets).into()
2700 } else {
2701 relay_metrics::cogs::BySize(&s.buckets).into()
2702 }
2703 })
2704 .fold(FeatureWeights::none(), FeatureWeights::merge),
2705 EnvelopeProcessor::SubmitClientReports(_) => AppFeature::ClientReports.into(),
2706 }
2707 }
2708}
2709
2710impl Service for EnvelopeProcessorService {
2711 type Interface = EnvelopeProcessor;
2712
2713 async fn run(self, mut rx: relay_system::Receiver<Self::Interface>) {
2714 while let Some(message) = rx.recv().await {
2715 let service = self.clone();
2716 self.inner
2717 .pool
2718 .spawn_async(
2719 async move {
2720 service.handle_message(message).await;
2721 }
2722 .boxed(),
2723 )
2724 .await;
2725 }
2726 }
2727}
2728
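/// Outcome of rate limit enforcement on an event and its envelope.
///
/// Holds the event that remains after enforcement (empty if the event itself was
/// rate limited) along with the rate limits that were computed in the process.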
2729struct EnforcementResult {
2734 event: Annotated<Event>,
2735 #[cfg_attr(not(feature = "processing"), expect(dead_code))]
2736 rate_limits: RateLimits,
2737}
2738
2739impl EnforcementResult {
2740 pub fn new(event: Annotated<Event>, rate_limits: RateLimits) -> Self {
2742 Self { event, rate_limits }
2743 }
2744}
2745
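/// Strategy for enforcing rate limits on an envelope.
///
/// `Cached` only evaluates rate limits that are already known locally, while
/// `Consistent` additionally consults the Redis rate limiter and is only
/// available with the `processing` feature.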
2746#[derive(Clone)]
2747enum RateLimiter {
2748 Cached,
2749 #[cfg(feature = "processing")]
2750 Consistent(Arc<RedisRateLimiter>),
2751}
2752
2753impl RateLimiter {
2754 async fn enforce<Group>(
2755 &self,
2756 managed_envelope: &mut TypedEnvelope<Group>,
2757 event: Annotated<Event>,
2758 _extracted_metrics: &mut ProcessingExtractedMetrics,
2759 ctx: processing::Context<'_>,
2760 ) -> Result<EnforcementResult, ProcessingError> {
2761 if managed_envelope.envelope().is_empty() && event.value().is_none() {
2762 return Ok(EnforcementResult::new(event, RateLimits::default()));
2763 }
2764
2765 let quotas = CombinedQuotas::new(ctx.global_config, ctx.project_info.get_quotas());
2766 if quotas.is_empty() {
2767 return Ok(EnforcementResult::new(event, RateLimits::default()));
2768 }
2769
2770 let event_category = event_category(&event);
2771
2772 let this = self.clone();
2778 let mut envelope_limiter =
2779 EnvelopeLimiter::new(CheckLimits::All, move |item_scope, _quantity| {
2780 let this = this.clone();
2781
2782 async move {
2783 match this {
2784 #[cfg(feature = "processing")]
2785 RateLimiter::Consistent(rate_limiter) => Ok::<_, ProcessingError>(
2786 rate_limiter
2787 .is_rate_limited(quotas, item_scope, _quantity, false)
2788 .await?,
2789 ),
2790 _ => Ok::<_, ProcessingError>(
2791 ctx.rate_limits.check_with_quotas(quotas, item_scope),
2792 ),
2793 }
2794 }
2795 });
2796
2797 if let Some(category) = event_category {
2800 envelope_limiter.assume_event(category);
2801 }
2802
2803 let scoping = managed_envelope.scoping();
2804 let (enforcement, rate_limits) = metric!(timer(RelayTimers::EventProcessingRateLimiting), type = self.name(), {
2805 envelope_limiter
2806 .compute(managed_envelope.envelope_mut(), &scoping)
2807 .await
2808 })?;
2809 let event_active = enforcement.is_event_active();
2810
2811 #[cfg(feature = "processing")]
2815 _extracted_metrics.apply_enforcement(&enforcement, matches!(self, Self::Consistent(_)));
2816 enforcement.apply_with_outcomes(managed_envelope);
2817
2818 if event_active {
2819 debug_assert!(managed_envelope.envelope().is_empty());
2820 return Ok(EnforcementResult::new(Annotated::empty(), rate_limits));
2821 }
2822
2823 Ok(EnforcementResult::new(event, rate_limits))
2824 }
2825
2826 fn name(&self) -> &'static str {
2827 match self {
2828 Self::Cached => "cached",
2829 #[cfg(feature = "processing")]
2830 Self::Consistent(_) => "consistent",
2831 }
2832 }
2833}
2834
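/// Compresses a payload with the given HTTP `Content-Encoding`.
///
/// `HttpEncoding::Identity` returns the payload unchanged; all other encodings
/// write into a newly allocated buffer.
///
/// A minimal usage sketch (marked `ignore` because it relies on crate-internal
/// configuration types):
///
/// ```ignore
/// let body = Bytes::from_static(br#"{"type":"client_report"}"#);
/// // Gzip-compress the body; `HttpEncoding::Identity` would hand it back unchanged.
/// let encoded = encode_payload(&body, HttpEncoding::Gzip).expect("compression failed");
/// assert!(!encoded.is_empty());
/// ```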
2835pub fn encode_payload(body: &Bytes, http_encoding: HttpEncoding) -> Result<Bytes, std::io::Error> {
2836 let envelope_body: Vec<u8> = match http_encoding {
2837 HttpEncoding::Identity => return Ok(body.clone()),
2838 HttpEncoding::Deflate => {
2839 let mut encoder = ZlibEncoder::new(Vec::new(), Compression::default());
2840 encoder.write_all(body.as_ref())?;
2841 encoder.finish()?
2842 }
2843 HttpEncoding::Gzip => {
2844 let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
2845 encoder.write_all(body.as_ref())?;
2846 encoder.finish()?
2847 }
2848 HttpEncoding::Br => {
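// Brotli parameters: a buffer size of 0 selects the encoder default, quality 5
// (out of 11) and a 22-bit window trade compression ratio against CPU time.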
2849 let mut encoder = BrotliEncoder::new(Vec::new(), 0, 5, 22);
2851 encoder.write_all(body.as_ref())?;
2852 encoder.into_inner()
2853 }
2854 HttpEncoding::Zstd => {
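// Level 1 is the fastest standard zstd compression level.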
2855 let mut encoder = ZstdEncoder::new(Vec::new(), 1)?;
2858 encoder.write_all(body.as_ref())?;
2859 encoder.finish()?
2860 }
2861 };
2862
2863 Ok(envelope_body.into())
2864}
2865
2866#[derive(Debug)]
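/// Upstream request that submits a serialized envelope to the Sentry `/envelope/` endpoint.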
2868pub struct SendEnvelope {
2869 pub envelope: TypedEnvelope<Processed>,
2870 pub body: Bytes,
2871 pub http_encoding: HttpEncoding,
2872 pub project_cache: ProjectCacheHandle,
2873}
2874
2875impl UpstreamRequest for SendEnvelope {
2876 fn method(&self) -> reqwest::Method {
2877 reqwest::Method::POST
2878 }
2879
2880 fn path(&self) -> Cow<'_, str> {
2881 format!("/api/{}/envelope/", self.envelope.scoping().project_id).into()
2882 }
2883
2884 fn route(&self) -> &'static str {
2885 "envelope"
2886 }
2887
2888 fn build(&mut self, builder: &mut http::RequestBuilder) -> Result<(), http::HttpError> {
2889 let envelope_body = self.body.clone();
2890 metric!(
2891 distribution(RelayDistributions::UpstreamEnvelopeBodySize) = envelope_body.len() as u64
2892 );
2893
2894 let meta = self.envelope.meta();
2895 let shard = self.envelope.partition_key().map(|p| p.to_string());
2896 builder
2897 .content_encoding(self.http_encoding)
2898 .header_opt("Origin", meta.origin().map(|url| url.as_str()))
2899 .header_opt("User-Agent", meta.user_agent())
2900 .header("X-Sentry-Auth", meta.auth_header())
2901 .header("X-Forwarded-For", meta.forwarded_for())
2902 .header("Content-Type", envelope::CONTENT_TYPE)
2903 .header_opt("X-Sentry-Relay-Shard", shard)
2904 .body(envelope_body);
2905
2906 Ok(())
2907 }
2908
2909 fn sign(&mut self) -> Option<Sign> {
2910 Some(Sign::Optional(SignatureType::RequestSign))
2911 }
2912
2913 fn respond(
2914 self: Box<Self>,
2915 result: Result<http::Response, UpstreamRequestError>,
2916 ) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> {
2917 Box::pin(async move {
2918 let result = match result {
2919 Ok(mut response) => response.consume().await.map_err(UpstreamRequestError::Http),
2920 Err(error) => Err(error),
2921 };
2922
2923 match result {
2924 Ok(()) => self.envelope.accept(),
2925 Err(error) if error.is_received() => {
2926 let scoping = self.envelope.scoping();
2927 self.envelope.accept();
2928
2929 if let UpstreamRequestError::RateLimited(limits) = error {
2930 self.project_cache
2931 .get(scoping.project_key)
2932 .rate_limits()
2933 .merge(limits.scope(&scoping));
2934 }
2935 }
2936 Err(error) => {
2937 let mut envelope = self.envelope;
2940 envelope.reject(Outcome::Invalid(DiscardReason::Internal));
2941 relay_log::error!(
2942 error = &error as &dyn Error,
2943 tags.project_key = %envelope.scoping().project_key,
2944 "error sending envelope"
2945 );
2946 }
2947 }
2948 })
2949 }
2950}
2951
2952#[derive(Debug)]
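/// Accumulates metric buckets from multiple projects into a single payload of
/// bounded size for the global metrics endpoint.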
2959struct Partition<'a> {
2960 max_size: usize,
2961 remaining: usize,
2962 views: HashMap<ProjectKey, Vec<BucketView<'a>>>,
2963 project_info: HashMap<ProjectKey, Scoping>,
2964}
2965
2966impl<'a> Partition<'a> {
2967 pub fn new(size: usize) -> Self {
2969 Self {
2970 max_size: size,
2971 remaining: size,
2972 views: HashMap::new(),
2973 project_info: HashMap::new(),
2974 }
2975 }
2976
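/// Inserts a bucket view into the partition, splitting it at the remaining capacity.
///
/// The part that fits is recorded under the bucket's project key; the overflow, if
/// any, is returned so the caller can flush the partition and retry. A sketch of
/// the intended call pattern, mirroring `encode_metrics_global` (`flush`, `bucket`,
/// `scoping`, and `max_batch_size` are placeholders):
///
/// ```ignore
/// let mut partition = Partition::new(max_batch_size);
/// let mut remaining = Some(BucketView::new(&bucket));
/// while let Some(view) = remaining.take() {
///     if let Some(rest) = partition.insert(view, scoping) {
///         // The partition is full: send it out and retry the leftover piece.
///         flush(partition.take());
///         remaining = Some(rest);
///     }
/// }
/// ```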
2977 pub fn insert(&mut self, bucket: BucketView<'a>, scoping: Scoping) -> Option<BucketView<'a>> {
2988 let (current, next) = bucket.split(self.remaining, Some(self.max_size));
2989
2990 if let Some(current) = current {
2991 self.remaining = self.remaining.saturating_sub(current.estimated_size());
2992 self.views
2993 .entry(scoping.project_key)
2994 .or_default()
2995 .push(current);
2996
2997 self.project_info
2998 .entry(scoping.project_key)
2999 .or_insert(scoping);
3000 }
3001
3002 next
3003 }
3004
3005 fn is_empty(&self) -> bool {
3007 self.views.is_empty()
3008 }
3009
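/// Serializes the accumulated buckets into the `{"buckets": {...}}` wire format and
/// resets the partition for reuse.
///
/// Returns the JSON payload together with the scoping of every contained project.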
3010 fn take(&mut self) -> (Bytes, HashMap<ProjectKey, Scoping>) {
3014 #[derive(serde::Serialize)]
3015 struct Wrapper<'a> {
3016 buckets: &'a HashMap<ProjectKey, Vec<BucketView<'a>>>,
3017 }
3018
3019 let buckets = &self.views;
3020 let payload = serde_json::to_vec(&Wrapper { buckets }).unwrap().into();
3021
3022 let scopings = self.project_info.clone();
3023 self.project_info.clear();
3024
3025 self.views.clear();
3026 self.remaining = self.max_size;
3027
3028 (payload, scopings)
3029 }
3030}
3031
3032#[derive(Debug)]
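/// Upstream request carrying one encoded partition of metric buckets.
///
/// The unencoded payload is kept so that, if the request never reaches the
/// upstream, outcomes can still be generated for the contained buckets.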
3036struct SendMetricsRequest {
3037 partition_key: String,
3039 unencoded: Bytes,
3041 encoded: Bytes,
3043 project_info: HashMap<ProjectKey, Scoping>,
3047 http_encoding: HttpEncoding,
3049 metric_outcomes: MetricOutcomes,
3051}
3052
3053impl SendMetricsRequest {
3054 fn create_error_outcomes(self) {
3055 #[derive(serde::Deserialize)]
3056 struct Wrapper {
3057 buckets: HashMap<ProjectKey, Vec<MinimalTrackableBucket>>,
3058 }
3059
3060 let buckets = match serde_json::from_slice(&self.unencoded) {
3061 Ok(Wrapper { buckets }) => buckets,
3062 Err(err) => {
3063 relay_log::error!(
3064 error = &err as &dyn std::error::Error,
3065 "failed to parse buckets from failed transmission"
3066 );
3067 return;
3068 }
3069 };
3070
3071 for (key, buckets) in buckets {
3072 let Some(&scoping) = self.project_info.get(&key) else {
3073 relay_log::error!("missing scoping for project key");
3074 continue;
3075 };
3076
3077 self.metric_outcomes.track(
3078 scoping,
3079 &buckets,
3080 Outcome::Invalid(DiscardReason::Internal),
3081 );
3082 }
3083 }
3084}
3085
3086impl UpstreamRequest for SendMetricsRequest {
3087 fn set_relay_id(&self) -> bool {
3088 true
3089 }
3090
3091 fn sign(&mut self) -> Option<Sign> {
3092 Some(Sign::Required(SignatureType::Body(self.unencoded.clone())))
3093 }
3094
3095 fn method(&self) -> reqwest::Method {
3096 reqwest::Method::POST
3097 }
3098
3099 fn path(&self) -> Cow<'_, str> {
3100 "/api/0/relays/metrics/".into()
3101 }
3102
3103 fn route(&self) -> &'static str {
3104 "global_metrics"
3105 }
3106
3107 fn build(&mut self, builder: &mut http::RequestBuilder) -> Result<(), http::HttpError> {
3108 metric!(
3109 distribution(RelayDistributions::UpstreamMetricsBodySize) = self.encoded.len() as u64
3110 );
3111
3112 builder
3113 .content_encoding(self.http_encoding)
3114 .header("X-Sentry-Relay-Shard", self.partition_key.as_bytes())
3115 .header(header::CONTENT_TYPE, b"application/json")
3116 .body(self.encoded.clone());
3117
3118 Ok(())
3119 }
3120
3121 fn respond(
3122 self: Box<Self>,
3123 result: Result<http::Response, UpstreamRequestError>,
3124 ) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> {
3125 Box::pin(async {
3126 match result {
3127 Ok(mut response) => {
3128 response.consume().await.ok();
3129 }
3130 Err(error) => {
3131 relay_log::error!(error = &error as &dyn Error, "failed to send metrics batch");
3132
3133 if error.is_received() {
3136 return;
3137 }
3138
3139 self.create_error_outcomes()
3140 }
3141 }
3142 })
3143 }
3144}
3145
3146#[derive(Copy, Clone, Debug)]
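/// Global and project quotas, chained so they can be evaluated together.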
3148struct CombinedQuotas<'a> {
3149 global_quotas: &'a [Quota],
3150 project_quotas: &'a [Quota],
3151}
3152
3153impl<'a> CombinedQuotas<'a> {
3154 pub fn new(global_config: &'a GlobalConfig, project_quotas: &'a [Quota]) -> Self {
3156 Self {
3157 global_quotas: &global_config.quotas,
3158 project_quotas,
3159 }
3160 }
3161
3162 pub fn is_empty(&self) -> bool {
3164 self.len() == 0
3165 }
3166
3167 pub fn len(&self) -> usize {
3169 self.global_quotas.len() + self.project_quotas.len()
3170 }
3171}
3172
3173impl<'a> IntoIterator for CombinedQuotas<'a> {
3174 type Item = &'a Quota;
3175 type IntoIter = std::iter::Chain<std::slice::Iter<'a, Quota>, std::slice::Iter<'a, Quota>>;
3176
3177 fn into_iter(self) -> Self::IntoIter {
3178 self.global_quotas.iter().chain(self.project_quotas.iter())
3179 }
3180}
3181
3182#[cfg(test)]
3183mod tests {
3184 use std::collections::BTreeMap;
3185
3186 use insta::assert_debug_snapshot;
3187 use relay_base_schema::metrics::{DurationUnit, MetricUnit};
3188 use relay_common::glob2::LazyGlob;
3189 use relay_dynamic_config::ProjectConfig;
3190 use relay_event_normalization::{
3191 MeasurementsConfig, NormalizationConfig, RedactionRule, TransactionNameConfig,
3192 TransactionNameRule,
3193 };
3194 use relay_event_schema::protocol::TransactionSource;
3195 use relay_pii::DataScrubbingConfig;
3196 use similar_asserts::assert_eq;
3197
3198 use crate::metrics_extraction::IntoMetric;
3199 use crate::metrics_extraction::transactions::types::{
3200 CommonTags, TransactionMeasurementTags, TransactionMetric,
3201 };
3202 use crate::testutils::{create_test_processor, create_test_processor_with_addrs};
3203
3204 #[cfg(feature = "processing")]
3205 use {
3206 relay_metrics::BucketValue,
3207 relay_quotas::{QuotaScope, ReasonCode},
3208 relay_test::mock_service,
3209 };
3210
3211 use super::*;
3212
3213 #[cfg(feature = "processing")]
3214 fn mock_quota(id: &str) -> Quota {
3215 Quota {
3216 id: Some(id.into()),
3217 categories: [DataCategory::MetricBucket].into(),
3218 scope: QuotaScope::Organization,
3219 scope_id: None,
3220 limit: Some(0),
3221 window: None,
3222 reason_code: None,
3223 namespace: None,
3224 }
3225 }
3226
3227 #[cfg(feature = "processing")]
3228 #[test]
3229 fn test_dynamic_quotas() {
3230 let global_config = GlobalConfig {
3231 quotas: vec![mock_quota("foo"), mock_quota("bar")],
3232 ..Default::default()
3233 };
3234
3235 let project_quotas = vec![mock_quota("baz"), mock_quota("qux")];
3236
3237 let dynamic_quotas = CombinedQuotas::new(&global_config, &project_quotas);
3238
3239 assert_eq!(dynamic_quotas.len(), 4);
3240 assert!(!dynamic_quotas.is_empty());
3241
3242 let quota_ids = dynamic_quotas.into_iter().filter_map(|q| q.id.as_deref());
3243 assert!(quota_ids.eq(["foo", "bar", "baz", "qux"]));
3244 }
3245
3246 #[cfg(feature = "processing")]
3249 #[tokio::test]
3250 async fn test_ratelimit_per_batch() {
3251 use relay_base_schema::organization::OrganizationId;
3252 use relay_protocol::FiniteF64;
3253
3254 let rate_limited_org = Scoping {
3255 organization_id: OrganizationId::new(1),
3256 project_id: ProjectId::new(21),
3257 project_key: ProjectKey::parse("00000000000000000000000000000000").unwrap(),
3258 key_id: Some(17),
3259 };
3260
3261 let not_rate_limited_org = Scoping {
3262 organization_id: OrganizationId::new(2),
3263 project_id: ProjectId::new(21),
3264 project_key: ProjectKey::parse("11111111111111111111111111111111").unwrap(),
3265 key_id: Some(17),
3266 };
3267
3268 let message = {
3269 let project_info = {
3270 let quota = Quota {
3271 id: Some("testing".into()),
3272 categories: [DataCategory::MetricBucket].into(),
3273 scope: relay_quotas::QuotaScope::Organization,
3274 scope_id: Some(rate_limited_org.organization_id.to_string().into()),
3275 limit: Some(0),
3276 window: None,
3277 reason_code: Some(ReasonCode::new("test")),
3278 namespace: None,
3279 };
3280
3281 let mut config = ProjectConfig::default();
3282 config.quotas.push(quota);
3283
3284 Arc::new(ProjectInfo {
3285 config,
3286 ..Default::default()
3287 })
3288 };
3289
3290 let project_metrics = |scoping| ProjectBuckets {
3291 buckets: vec![Bucket {
3292 name: "d:transactions/bar".into(),
3293 value: BucketValue::Counter(FiniteF64::new(1.0).unwrap()),
3294 timestamp: UnixTimestamp::now(),
3295 tags: Default::default(),
3296 width: 10,
3297 metadata: BucketMetadata::default(),
3298 }],
3299 rate_limits: Default::default(),
3300 project_info: project_info.clone(),
3301 scoping,
3302 };
3303
3304 let buckets = hashbrown::HashMap::from([
3305 (
3306 rate_limited_org.project_key,
3307 project_metrics(rate_limited_org),
3308 ),
3309 (
3310 not_rate_limited_org.project_key,
3311 project_metrics(not_rate_limited_org),
3312 ),
3313 ]);
3314
3315 FlushBuckets {
3316 partition_key: 0,
3317 buckets,
3318 }
3319 };
3320
3321 assert_eq!(message.buckets.keys().count(), 2);
3323
3324 let config = {
3325 let config_json = serde_json::json!({
3326 "processing": {
3327 "enabled": true,
3328 "kafka_config": [],
3329 "redis": {
3330 "server": std::env::var("RELAY_REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_owned()),
3331 }
3332 }
3333 });
3334 Config::from_json_value(config_json).unwrap()
3335 };
3336
3337 let (store, handle) = {
3338 let f = |org_ids: &mut Vec<OrganizationId>, msg: Store| {
3339 let org_id = match msg {
3340 Store::Metrics(x) => x.scoping.organization_id,
3341 _ => panic!("received envelope when expecting only metrics"),
3342 };
3343 org_ids.push(org_id);
3344 };
3345
3346 mock_service("store_forwarder", vec![], f)
3347 };
3348
3349 let processor = create_test_processor(config).await;
3350 assert!(processor.redis_rate_limiter_enabled());
3351
3352 processor.encode_metrics_processing(message, &store).await;
3353
3354 drop(store);
3355 let orgs_not_ratelimited = handle.await.unwrap();
3356
3357 assert_eq!(
3358 orgs_not_ratelimited,
3359 vec![not_rate_limited_org.organization_id]
3360 );
3361 }
3362
3363 #[tokio::test]
3364 async fn test_browser_version_extraction_with_pii_like_data() {
3365 let processor = create_test_processor(Default::default()).await;
3366 let outcome_aggregator = Addr::dummy();
3367 let event_id = EventId::new();
3368
3369 let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
3370 .parse()
3371 .unwrap();
3372
3373 let request_meta = RequestMeta::new(dsn);
3374 let mut envelope = Envelope::from_request(Some(event_id), request_meta);
3375
3376 envelope.add_item({
3377 let mut item = Item::new(ItemType::Event);
3378 item.set_payload(
3379 ContentType::Json,
3380 r#"
3381 {
3382 "request": {
3383 "headers": [
3384 ["User-Agent", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36"]
3385 ]
3386 }
3387 }
3388 "#,
3389 );
3390 item
3391 });
3392
3393 let mut datascrubbing_settings = DataScrubbingConfig::default();
3394 datascrubbing_settings.scrub_data = true;
3396 datascrubbing_settings.scrub_defaults = true;
3397 datascrubbing_settings.scrub_ip_addresses = true;
3398
3399 let pii_config = serde_json::from_str(r#"{"applications": {"**": ["@ip:mask"]}}"#).unwrap();
3401
3402 let config = ProjectConfig {
3403 datascrubbing_settings,
3404 pii_config: Some(pii_config),
3405 ..Default::default()
3406 };
3407
3408 let project_info = ProjectInfo {
3409 config,
3410 ..Default::default()
3411 };
3412
3413 let mut envelopes = ProcessingGroup::split_envelope(*envelope, &Default::default());
3414 assert_eq!(envelopes.len(), 1);
3415
3416 let (group, envelope) = envelopes.pop().unwrap();
3417 let envelope = ManagedEnvelope::new(envelope, outcome_aggregator);
3418
3419 let message = ProcessEnvelopeGrouped {
3420 group,
3421 envelope,
3422 ctx: processing::Context {
3423 project_info: &project_info,
3424 ..processing::Context::for_test()
3425 },
3426 };
3427
3428 let Ok(Some(Submit::Envelope(mut new_envelope))) = processor.process(message).await else {
3429 panic!();
3430 };
3431 let new_envelope = new_envelope.envelope_mut();
3432
3433 let event_item = new_envelope.items().last().unwrap();
3434 let annotated_event: Annotated<Event> =
3435 Annotated::from_json_bytes(&event_item.payload()).unwrap();
3436 let event = annotated_event.into_value().unwrap();
3437 let headers = event
3438 .request
3439 .into_value()
3440 .unwrap()
3441 .headers
3442 .into_value()
3443 .unwrap();
3444
3445 assert_eq!(
3447 Some(
3448 "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/********* Safari/537.36"
3449 ),
3450 headers.get_header("User-Agent")
3451 );
3452 let contexts = event.contexts.into_value().unwrap();
3454 let browser = contexts.0.get("browser").unwrap();
3455 assert_eq!(
3456 r#"{"browser":"Chrome 103.0.0","name":"Chrome","version":"103.0.0","type":"browser"}"#,
3457 browser.to_json().unwrap()
3458 );
3459 }
3460
3461 #[tokio::test]
3462 #[cfg(feature = "processing")]
3463 async fn test_materialize_dsc() {
3464 use crate::services::projects::project::PublicKeyConfig;
3465
3466 let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
3467 .parse()
3468 .unwrap();
3469 let request_meta = RequestMeta::new(dsn);
3470 let mut envelope = Envelope::from_request(None, request_meta);
3471
3472 let dsc = r#"{
3473 "trace_id": "00000000-0000-0000-0000-000000000000",
3474 "public_key": "e12d836b15bb49d7bbf99e64295d995b",
3475 "sample_rate": "0.2"
3476 }"#;
3477 envelope.set_dsc(serde_json::from_str(dsc).unwrap());
3478
3479 let mut item = Item::new(ItemType::Event);
3480 item.set_payload(ContentType::Json, r#"{}"#);
3481 envelope.add_item(item);
3482
3483 let outcome_aggregator = Addr::dummy();
3484 let managed_envelope = ManagedEnvelope::new(envelope, outcome_aggregator);
3485
3486 let mut project_info = ProjectInfo::default();
3487 project_info.public_keys.push(PublicKeyConfig {
3488 public_key: ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(),
3489 numeric_id: Some(1),
3490 });
3491
3492 let config = serde_json::json!({
3493 "processing": {
3494 "enabled": true,
3495 "kafka_config": [],
3496 }
3497 });
3498
3499 let message = ProcessEnvelopeGrouped {
3500 group: ProcessingGroup::Error,
3501 envelope: managed_envelope,
3502 ctx: processing::Context {
3503 config: &Config::from_json_value(config.clone()).unwrap(),
3504 project_info: &project_info,
3505 sampling_project_info: Some(&project_info),
3506 ..processing::Context::for_test()
3507 },
3508 };
3509
3510 let processor = create_test_processor(Config::from_json_value(config).unwrap()).await;
3511 let Ok(Some(Submit::Envelope(envelope))) = processor.process(message).await else {
3512 panic!();
3513 };
3514 let event = envelope
3515 .envelope()
3516 .get_item_by(|item| item.ty() == &ItemType::Event)
3517 .unwrap();
3518
3519 let event = Annotated::<Event>::from_json_bytes(&event.payload()).unwrap();
3520 insta::assert_debug_snapshot!(event.value().unwrap()._dsc, @r###"
3521 Object(
3522 {
3523 "environment": ~,
3524 "public_key": String(
3525 "e12d836b15bb49d7bbf99e64295d995b",
3526 ),
3527 "release": ~,
3528 "replay_id": ~,
3529 "sample_rate": String(
3530 "0.2",
3531 ),
3532 "trace_id": String(
3533 "00000000000000000000000000000000",
3534 ),
3535 "transaction": ~,
3536 },
3537 )
3538 "###);
3539 }
3540
3541 fn capture_test_event(transaction_name: &str, source: TransactionSource) -> Vec<String> {
3542 let mut event = Annotated::<Event>::from_json(
3543 r#"
3544 {
3545 "type": "transaction",
3546 "transaction": "/foo/",
3547 "timestamp": 946684810.0,
3548 "start_timestamp": 946684800.0,
3549 "contexts": {
3550 "trace": {
3551 "trace_id": "4c79f60c11214eb38604f4ae0781bfb2",
3552 "span_id": "fa90fdead5f74053",
3553 "op": "http.server",
3554 "type": "trace"
3555 }
3556 },
3557 "transaction_info": {
3558 "source": "url"
3559 }
3560 }
3561 "#,
3562 )
3563 .unwrap();
3564 let e = event.value_mut().as_mut().unwrap();
3565 e.transaction.set_value(Some(transaction_name.into()));
3566
3567 e.transaction_info
3568 .value_mut()
3569 .as_mut()
3570 .unwrap()
3571 .source
3572 .set_value(Some(source));
3573
3574 relay_statsd::with_capturing_test_client(|| {
3575 utils::log_transaction_name_metrics(&mut event, |event| {
3576 let config = NormalizationConfig {
3577 transaction_name_config: TransactionNameConfig {
3578 rules: &[TransactionNameRule {
3579 pattern: LazyGlob::new("/foo/*/**".to_owned()),
3580 expiry: DateTime::<Utc>::MAX_UTC,
3581 redaction: RedactionRule::Replace {
3582 substitution: "*".to_owned(),
3583 },
3584 }],
3585 },
3586 ..Default::default()
3587 };
3588 relay_event_normalization::normalize_event(event, &config)
3589 });
3590 })
3591 }
3592
3593 #[test]
3594 fn test_log_transaction_metrics_none() {
3595 let captures = capture_test_event("/nothing", TransactionSource::Url);
3596 insta::assert_debug_snapshot!(captures, @r###"
3597 [
3598 "event.transaction_name_changes:1|c|#source_in:url,changes:none,source_out:sanitized,is_404:false",
3599 ]
3600 "###);
3601 }
3602
3603 #[test]
3604 fn test_log_transaction_metrics_rule() {
3605 let captures = capture_test_event("/foo/john/denver", TransactionSource::Url);
3606 insta::assert_debug_snapshot!(captures, @r###"
3607 [
3608 "event.transaction_name_changes:1|c|#source_in:url,changes:rule,source_out:sanitized,is_404:false",
3609 ]
3610 "###);
3611 }
3612
3613 #[test]
3614 fn test_log_transaction_metrics_pattern() {
3615 let captures = capture_test_event("/something/12345", TransactionSource::Url);
3616 insta::assert_debug_snapshot!(captures, @r###"
3617 [
3618 "event.transaction_name_changes:1|c|#source_in:url,changes:pattern,source_out:sanitized,is_404:false",
3619 ]
3620 "###);
3621 }
3622
3623 #[test]
3624 fn test_log_transaction_metrics_both() {
3625 let captures = capture_test_event("/foo/john/12345", TransactionSource::Url);
3626 insta::assert_debug_snapshot!(captures, @r###"
3627 [
3628 "event.transaction_name_changes:1|c|#source_in:url,changes:both,source_out:sanitized,is_404:false",
3629 ]
3630 "###);
3631 }
3632
3633 #[test]
3634 fn test_log_transaction_metrics_no_match() {
3635 let captures = capture_test_event("/foo/john/12345", TransactionSource::Route);
3636 insta::assert_debug_snapshot!(captures, @r###"
3637 [
3638 "event.transaction_name_changes:1|c|#source_in:route,changes:none,source_out:route,is_404:false",
3639 ]
3640 "###);
3641 }
3642
3643 #[test]
3647 fn test_mri_overhead_constant() {
3648 let hardcoded_value = MeasurementsConfig::MEASUREMENT_MRI_OVERHEAD;
3649
3650 let derived_value = {
3651 let name = "foobar".to_owned();
3652 let value = 5.into();
3653 let unit = MetricUnit::Duration(DurationUnit::default());
3654 let tags = TransactionMeasurementTags {
3655 measurement_rating: None,
3656 universal_tags: CommonTags(BTreeMap::new()),
3657 score_profile_version: None,
3658 };
3659
3660 let measurement = TransactionMetric::Measurement {
3661 name: name.clone(),
3662 value,
3663 unit,
3664 tags,
3665 };
3666
3667 let metric: Bucket = measurement.into_metric(UnixTimestamp::now());
3668 metric.name.len() - unit.to_string().len() - name.len()
3669 };
3670 assert_eq!(
3671 hardcoded_value, derived_value,
3672 "Update `MEASUREMENT_MRI_OVERHEAD` if the naming scheme changed."
3673 );
3674 }
3675
3676 #[tokio::test]
3677 async fn test_process_metrics_bucket_metadata() {
3678 let mut token = Cogs::noop().timed(ResourceId::Relay, AppFeature::Unattributed);
3679 let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
3680 let received_at = Utc::now();
3681 let config = Config::default();
3682
3683 let (aggregator, mut aggregator_rx) = Addr::custom();
3684 let processor = create_test_processor_with_addrs(
3685 config,
3686 Addrs {
3687 aggregator,
3688 ..Default::default()
3689 },
3690 )
3691 .await;
3692
3693 let mut item = Item::new(ItemType::Statsd);
3694 item.set_payload(
3695 ContentType::Text,
3696 "transactions/foo:3182887624:4267882815|s",
3697 );
3698 for (source, expected_received_at) in [
3699 (
3700 BucketSource::External,
3701 Some(UnixTimestamp::from_datetime(received_at).unwrap()),
3702 ),
3703 (BucketSource::Internal, None),
3704 ] {
3705 let message = ProcessMetrics {
3706 data: MetricData::Raw(vec![item.clone()]),
3707 project_key,
3708 source,
3709 received_at,
3710 sent_at: Some(Utc::now()),
3711 };
3712 processor.handle_process_metrics(&mut token, message);
3713
3714 let Aggregator::MergeBuckets(merge_buckets) = aggregator_rx.recv().await.unwrap();
3715 let buckets = merge_buckets.buckets;
3716 assert_eq!(buckets.len(), 1);
3717 assert_eq!(buckets[0].metadata.received_at, expected_received_at);
3718 }
3719 }
3720
3721 #[tokio::test]
3722 async fn test_process_batched_metrics() {
3723 let mut token = Cogs::noop().timed(ResourceId::Relay, AppFeature::Unattributed);
3724 let received_at = Utc::now();
3725 let config = Config::default();
3726
3727 let (aggregator, mut aggregator_rx) = Addr::custom();
3728 let processor = create_test_processor_with_addrs(
3729 config,
3730 Addrs {
3731 aggregator,
3732 ..Default::default()
3733 },
3734 )
3735 .await;
3736
3737 let payload = r#"{
3738 "buckets": {
3739 "11111111111111111111111111111111": [
3740 {
3741 "timestamp": 1615889440,
3742 "width": 0,
3743 "name": "d:custom/endpoint.response_time@millisecond",
3744 "type": "d",
3745 "value": [
3746 68.0
3747 ],
3748 "tags": {
3749 "route": "user_index"
3750 }
3751 }
3752 ],
3753 "22222222222222222222222222222222": [
3754 {
3755 "timestamp": 1615889440,
3756 "width": 0,
3757 "name": "d:custom/endpoint.cache_rate@none",
3758 "type": "d",
3759 "value": [
3760 36.0
3761 ]
3762 }
3763 ]
3764 }
3765}
3766"#;
3767 let message = ProcessBatchedMetrics {
3768 payload: Bytes::from(payload),
3769 source: BucketSource::Internal,
3770 received_at,
3771 sent_at: Some(Utc::now()),
3772 };
3773 processor.handle_process_batched_metrics(&mut token, message);
3774
3775 let Aggregator::MergeBuckets(mb1) = aggregator_rx.recv().await.unwrap();
3776 let Aggregator::MergeBuckets(mb2) = aggregator_rx.recv().await.unwrap();
3777
3778 let mut messages = vec![mb1, mb2];
3779 messages.sort_by_key(|mb| mb.project_key);
3780
3781 let actual = messages
3782 .into_iter()
3783 .map(|mb| (mb.project_key, mb.buckets))
3784 .collect::<Vec<_>>();
3785
3786 assert_debug_snapshot!(actual, @r###"
3787 [
3788 (
3789 ProjectKey("11111111111111111111111111111111"),
3790 [
3791 Bucket {
3792 timestamp: UnixTimestamp(1615889440),
3793 width: 0,
3794 name: MetricName(
3795 "d:custom/endpoint.response_time@millisecond",
3796 ),
3797 value: Distribution(
3798 [
3799 68.0,
3800 ],
3801 ),
3802 tags: {
3803 "route": "user_index",
3804 },
3805 metadata: BucketMetadata {
3806 merges: 1,
3807 received_at: None,
3808 extracted_from_indexed: false,
3809 },
3810 },
3811 ],
3812 ),
3813 (
3814 ProjectKey("22222222222222222222222222222222"),
3815 [
3816 Bucket {
3817 timestamp: UnixTimestamp(1615889440),
3818 width: 0,
3819 name: MetricName(
3820 "d:custom/endpoint.cache_rate@none",
3821 ),
3822 value: Distribution(
3823 [
3824 36.0,
3825 ],
3826 ),
3827 tags: {},
3828 metadata: BucketMetadata {
3829 merges: 1,
3830 received_at: None,
3831 extracted_from_indexed: false,
3832 },
3833 },
3834 ],
3835 ),
3836 ]
3837 "###);
3838 }
3839}