1use std::borrow::Cow;
2use std::collections::{BTreeSet, HashMap};
3use std::error::Error;
4use std::fmt::{Debug, Display};
5use std::future::Future;
6use std::io::Write;
7use std::pin::Pin;
8use std::sync::Arc;
9use std::time::Duration;
10
11use anyhow::Context;
12use brotli::CompressorWriter as BrotliEncoder;
13use bytes::Bytes;
14use chrono::{DateTime, Utc};
15use flate2::Compression;
16use flate2::write::{GzEncoder, ZlibEncoder};
17use futures::FutureExt;
18use futures::future::BoxFuture;
19use relay_base_schema::project::{ProjectId, ProjectKey};
20use relay_cogs::{AppFeature, Cogs, FeatureWeights, ResourceId, Token};
21use relay_common::time::UnixTimestamp;
22use relay_config::{Config, HttpEncoding, RelayMode};
23use relay_dynamic_config::{Feature, GlobalConfig};
24use relay_event_normalization::{ClockDriftProcessor, GeoIpLookup};
25use relay_event_schema::processor::ProcessingAction;
26use relay_event_schema::protocol::{ClientReport, Event, EventId, NetworkReportError, SpanV2};
27use relay_filter::FilterStatKey;
28use relay_metrics::{Bucket, BucketMetadata, BucketView, BucketsView, MetricNamespace};
29use relay_protocol::Annotated;
30use relay_quotas::{DataCategory, Quota, RateLimits, Scoping};
31use relay_sampling::evaluation::{ReservoirCounters, SamplingDecision};
32use relay_statsd::metric;
33use relay_system::{Addr, FromMessage, NoResponse, Service};
34use reqwest::header;
35use smallvec::{SmallVec, smallvec};
36use zstd::stream::Encoder as ZstdEncoder;
37
38use crate::envelope::{self, ContentType, Envelope, EnvelopeError, Item, ItemContainer, ItemType};
39use crate::extractors::{PartialDsn, RequestMeta, RequestTrust};
40use crate::integrations::Integration;
41use crate::managed::{InvalidProcessingGroupType, ManagedEnvelope, TypedEnvelope};
42use crate::metrics::{MetricOutcomes, MetricsLimiter, MinimalTrackableBucket};
43use crate::metrics_extraction::ExtractedMetrics;
44use crate::processing::attachments::AttachmentProcessor;
45use crate::processing::check_ins::CheckInsProcessor;
46use crate::processing::client_reports::ClientReportsProcessor;
47use crate::processing::errors::{ErrorsProcessor, SwitchProcessingError};
48use crate::processing::logs::LogsProcessor;
49use crate::processing::profile_chunks::ProfileChunksProcessor;
50use crate::processing::profiles::ProfilesProcessor;
51use crate::processing::replays::ReplaysProcessor;
52use crate::processing::sessions::SessionsProcessor;
53use crate::processing::spans::SpansProcessor;
54use crate::processing::trace_attachments::TraceAttachmentsProcessor;
55use crate::processing::trace_metrics::TraceMetricsProcessor;
56use crate::processing::transactions::TransactionProcessor;
57use crate::processing::user_reports::UserReportsProcessor;
58use crate::processing::utils::event::event_category;
59use crate::processing::{Forward as _, Output, Outputs, QuotaRateLimiter};
60use crate::service::ServiceError;
61use crate::services::global_config::GlobalConfigHandle;
62use crate::services::metrics::{Aggregator, FlushBuckets, MergeBuckets, ProjectBuckets};
63use crate::services::outcome::{DiscardItemType, DiscardReason, Outcome, TrackOutcome};
64use crate::services::projects::cache::ProjectCacheHandle;
65use crate::services::projects::project::{ProjectInfo, ProjectState};
66use crate::services::upstream::{
67 SendRequest, Sign, SignatureType, UpstreamRelay, UpstreamRequest, UpstreamRequestError,
68};
69use crate::statsd::{RelayCounters, RelayDistributions, RelayTimers};
70use crate::utils::{self, CheckLimits, EnvelopeLimiter};
71use crate::{http, processing};
72use relay_threading::AsyncPool;
73#[cfg(feature = "processing")]
74use {
75 crate::services::objectstore::Objectstore,
76 crate::services::store::Store,
77 crate::utils::Enforcement,
78 itertools::Itertools,
79 relay_quotas::{RateLimitingError, RedisRateLimiter},
80 relay_redis::RedisClients,
81 std::time::Instant,
82 symbolic_unreal::{Unreal4Error, Unreal4ErrorKind},
83};
84
85pub mod event;
86mod metrics;
87mod nel;
88#[cfg(feature = "processing")]
89mod span;
90
91#[cfg(all(sentry, feature = "processing"))]
92pub mod playstation;
93
/// Runs the first block only when the `processing` feature is compiled in
/// *and* `$config.processing_enabled()` returns `true` at runtime.
///
/// The second form additionally takes an `else` block, which runs when
/// processing is disabled at runtime or compiled out entirely.
macro_rules! if_processing {
    ($config:expr, $if_true:block) => {
        // Without the feature the whole expression compiles away.
        #[cfg(feature = "processing")] {
            if $config.processing_enabled() $if_true
        }
    };
    ($config:expr, $if_true:block else $if_false:block) => {
        {
            #[cfg(feature = "processing")] {
                if $config.processing_enabled() $if_true else $if_false
            }
            #[cfg(not(feature = "processing"))] {
                $if_false
            }
        }
    };
}
114
115pub const MINIMUM_CLOCK_DRIFT: Duration = Duration::from_secs(55 * 60);
117
/// Error returned when a [`ProcessingGroup`] cannot be converted into the
/// requested marker type.
#[derive(Debug)]
pub struct GroupTypeError;

impl Display for GroupTypeError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "failed to convert processing group into corresponding type")
    }
}

impl std::error::Error for GroupTypeError {}
128
/// Defines a zero-sized marker type `$ty` tied to the [`ProcessingGroup`]
/// variant `$variant`.
///
/// Generates:
///  - `From<$ty> for ProcessingGroup`, mapping the marker back to `$variant`.
///  - `TryFrom<ProcessingGroup> for $ty`, which succeeds for `$variant` and
///    for every additional variant listed after it, and fails with
///    [`GroupTypeError`] otherwise.
macro_rules! processing_group {
    ($ty:ident, $variant:ident$(, $($other:ident),+)?) => {
        #[derive(Clone, Copy, Debug)]
        pub struct $ty;

        impl From<$ty> for ProcessingGroup {
            fn from(_: $ty) -> Self {
                ProcessingGroup::$variant
            }
        }

        impl TryFrom<ProcessingGroup> for $ty {
            type Error = GroupTypeError;

            fn try_from(value: ProcessingGroup) -> Result<Self, Self::Error> {
                if matches!(value, ProcessingGroup::$variant) {
                    return Ok($ty);
                }
                // Optional extra variants that also map onto this marker type.
                $($(
                    if matches!(value, ProcessingGroup::$other) {
                        return Ok($ty);
                    }
                )+)?
                return Err(GroupTypeError);
            }
        }
    };
}
157
// Marker types for envelopes pinned to a specific processing group.
processing_group!(TransactionGroup, Transaction);

processing_group!(ErrorGroup, Error);

processing_group!(SessionGroup, Session);
processing_group!(ClientReportGroup, ClientReport);
processing_group!(ReplayGroup, Replay);
processing_group!(CheckInGroup, CheckIn);
// NEL envelopes are also convertible into the log marker type.
processing_group!(LogGroup, Log, Nel);
processing_group!(TraceMetricGroup, TraceMetric);
processing_group!(SpanGroup, Span);

processing_group!(ProfileChunkGroup, ProfileChunk);
processing_group!(MetricsGroup, Metrics);
processing_group!(ForwardUnknownGroup, ForwardUnknown);
processing_group!(Ungrouped, Ungrouped);
174
/// Marker type used as the type-state of a [`TypedEnvelope`] once it has gone
/// through processing.
#[derive(Clone, Copy, Debug)]
pub struct Processed;
180
/// The pipeline a set of envelope items is processed in.
///
/// [`ProcessingGroup::split_envelope`] partitions a mixed envelope into one
/// envelope per group.
#[derive(Clone, Copy, Debug)]
pub enum ProcessingGroup {
    /// Transaction events and items that belong to them (e.g. profiles).
    Transaction,
    /// Error events, including security reports.
    Error,
    /// Session and session-aggregate items.
    Session,
    /// Attachments in envelopes that contain no event-creating item.
    StandaloneAttachments,
    /// User reports in envelopes that contain no event-creating item.
    StandaloneUserReports,
    /// Profiles in envelopes that contain no event-creating item.
    StandaloneProfiles,
    /// Client report items.
    ClientReport,
    /// Replay events, recordings and videos.
    Replay,
    /// Cron check-in items.
    CheckIn,
    /// Network Error Logging reports.
    Nel,
    /// Log items and log integrations.
    Log,
    /// Trace metric items.
    TraceMetric,
    /// Legacy standalone span items.
    Span,
    /// Span V2 containers, span integrations, and — behind the experimental
    /// feature flag — plain span items and span attachments.
    SpanV2,
    /// Metric bucket / statsd items.
    Metrics,
    /// Profile chunk items.
    ProfileChunk,
    /// Trace attachment items.
    TraceAttachment,
    /// Items of unknown type, forwarded as-is.
    ForwardUnknown,
    /// Fallback for items that match no other group.
    Ungrouped,
}
235
impl ProcessingGroup {
    /// Splits the items of `envelope` into one envelope per processing group.
    ///
    /// Items are *removed* from `envelope` in the order below, so earlier
    /// matchers take precedence when an item could belong to several groups
    /// (e.g. span items are claimed for [`ProcessingGroup::SpanV2`] before
    /// the legacy [`ProcessingGroup::Span`] group). Every split-off envelope
    /// receives a clone of the original envelope headers.
    fn split_envelope(
        mut envelope: Envelope,
        project_info: &ProjectInfo,
    ) -> SmallVec<[(Self, Box<Envelope>); 3]> {
        let headers = envelope.headers().clone();
        let mut grouped_envelopes = smallvec![];

        // Replay event, recording, and video items are processed together.
        let replay_items = envelope.take_items_by(|item| {
            matches!(
                item.ty(),
                &ItemType::ReplayEvent | &ItemType::ReplayRecording | &ItemType::ReplayVideo
            )
        });
        if !replay_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Replay,
                Envelope::from_parts(headers.clone(), replay_items),
            ))
        }

        // Individual sessions and session aggregates.
        let session_items = envelope
            .take_items_by(|item| matches!(item.ty(), &ItemType::Session | &ItemType::Sessions));
        if !session_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Session,
                Envelope::from_parts(headers.clone(), session_items),
            ))
        }

        // Span V2 containers and span integrations; with the experimental
        // feature enabled, plain span items and span attachments are routed
        // here as well (before the legacy `Span` group below can claim them).
        let span_v2_items = envelope.take_items_by(|item| {
            let exp_feature = project_info.has_feature(Feature::SpanV2ExperimentalProcessing);

            ItemContainer::<SpanV2>::is_container(item)
                || matches!(item.integration(), Some(Integration::Spans(_)))
                || (exp_feature && matches!(item.ty(), &ItemType::Span))
                || (exp_feature && item.is_span_attachment())
        });

        if !span_v2_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::SpanV2,
                Envelope::from_parts(headers.clone(), span_v2_items),
            ))
        }

        // Remaining (legacy) span items.
        let span_items = envelope.take_items_by(|item| matches!(item.ty(), &ItemType::Span));
        if !span_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Span,
                Envelope::from_parts(headers.clone(), span_items),
            ))
        }

        // Log items and log integrations.
        let logs_items = envelope.take_items_by(|item| {
            matches!(item.ty(), &ItemType::Log)
                || matches!(item.integration(), Some(Integration::Logs(_)))
        });
        if !logs_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Log,
                Envelope::from_parts(headers.clone(), logs_items),
            ))
        }

        let trace_metric_items =
            envelope.take_items_by(|item| matches!(item.ty(), &ItemType::TraceMetric));
        if !trace_metric_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::TraceMetric,
                Envelope::from_parts(headers.clone(), trace_metric_items),
            ))
        }

        // Network Error Logging reports.
        let nel_items = envelope.take_items_by(|item| matches!(item.ty(), &ItemType::Nel));
        if !nel_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Nel,
                Envelope::from_parts(headers.clone(), nel_items),
            ))
        }

        // All metric item types (statsd and bucket payloads).
        let metric_items = envelope.take_items_by(|i| i.ty().is_metrics());
        if !metric_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Metrics,
                Envelope::from_parts(headers.clone(), metric_items),
            ))
        }

        let profile_chunk_items =
            envelope.take_items_by(|item| matches!(item.ty(), &ItemType::ProfileChunk));
        if !profile_chunk_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::ProfileChunk,
                Envelope::from_parts(headers.clone(), profile_chunk_items),
            ))
        }

        let trace_attachment_items = envelope.take_items_by(Item::is_trace_attachment);
        if !trace_attachment_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::TraceAttachment,
                Envelope::from_parts(headers.clone(), trace_attachment_items),
            ))
        }

        // Only when no item in the envelope creates an event are attachments,
        // user reports and profiles treated as "standalone"; otherwise they
        // stay with the event item (handled via `requires_event` below).
        if !envelope.items().any(Item::creates_event) {
            let standalone_attachments = envelope
                .take_items_by(|i| i.requires_event() && matches!(i.ty(), ItemType::Attachment));
            if !standalone_attachments.is_empty() {
                grouped_envelopes.push((
                    ProcessingGroup::StandaloneAttachments,
                    Envelope::from_parts(headers.clone(), standalone_attachments),
                ))
            }

            let standalone_user_reports =
                envelope.take_items_by(|i| matches!(i.ty(), ItemType::UserReport));
            if !standalone_user_reports.is_empty() {
                grouped_envelopes.push((
                    ProcessingGroup::StandaloneUserReports,
                    Envelope::from_parts(headers.clone(), standalone_user_reports),
                ));
            }

            let standalone_profiles =
                envelope.take_items_by(|i| matches!(i.ty(), ItemType::Profile));
            if !standalone_profiles.is_empty() {
                grouped_envelopes.push((
                    ProcessingGroup::StandaloneProfiles,
                    Envelope::from_parts(headers.clone(), standalone_profiles),
                ));
            }
        }

        // Each raw security report becomes its own error envelope with a
        // freshly generated event id.
        let security_reports_items = envelope
            .take_items_by(|i| matches!(i.ty(), &ItemType::RawSecurity))
            .into_iter()
            .map(|item| {
                let headers = headers.clone();
                let items: SmallVec<[Item; 3]> = smallvec![item.clone()];
                let mut envelope = Envelope::from_parts(headers, items);
                envelope.set_event_id(EventId::new());
                (ProcessingGroup::Error, envelope)
            });
        grouped_envelopes.extend(security_reports_items);

        // Items tied to an event: transaction/profile items make the whole
        // set a transaction envelope, everything else is an error envelope.
        let require_event_items = envelope.take_items_by(Item::requires_event);
        if !require_event_items.is_empty() {
            let group = if require_event_items
                .iter()
                .any(|item| matches!(item.ty(), &ItemType::Transaction | &ItemType::Profile))
            {
                ProcessingGroup::Transaction
            } else {
                ProcessingGroup::Error
            };

            grouped_envelopes.push((
                group,
                Envelope::from_parts(headers.clone(), require_event_items),
            ))
        }

        // Every remaining item is wrapped into its own single-item envelope.
        let envelopes = envelope.items_mut().map(|item| {
            let headers = headers.clone();
            let items: SmallVec<[Item; 3]> = smallvec![item.clone()];
            let envelope = Envelope::from_parts(headers, items);
            let item_type = item.ty();
            let group = if matches!(item_type, &ItemType::CheckIn) {
                ProcessingGroup::CheckIn
            } else if matches!(item.ty(), &ItemType::ClientReport) {
                ProcessingGroup::ClientReport
            } else if matches!(item_type, &ItemType::Unknown(_)) {
                ProcessingGroup::ForwardUnknown
            } else {
                ProcessingGroup::Ungrouped
            };

            (group, envelope)
        });
        grouped_envelopes.extend(envelopes);

        grouped_envelopes
    }

    /// Returns a short, stable name for the group, used in logs and metric
    /// tags.
    pub fn variant(&self) -> &'static str {
        match self {
            ProcessingGroup::Transaction => "transaction",
            ProcessingGroup::Error => "error",
            ProcessingGroup::Session => "session",
            ProcessingGroup::StandaloneAttachments => "standalone_attachment",
            ProcessingGroup::StandaloneUserReports => "standalone_user_reports",
            ProcessingGroup::StandaloneProfiles => "standalone_profiles",
            ProcessingGroup::ClientReport => "client_report",
            ProcessingGroup::Replay => "replay",
            ProcessingGroup::CheckIn => "check_in",
            ProcessingGroup::Log => "log",
            ProcessingGroup::TraceMetric => "trace_metric",
            ProcessingGroup::Nel => "nel",
            ProcessingGroup::Span => "span",
            ProcessingGroup::SpanV2 => "span_v2",
            ProcessingGroup::Metrics => "metrics",
            ProcessingGroup::ProfileChunk => "profile_chunk",
            ProcessingGroup::TraceAttachment => "trace_attachment",
            ProcessingGroup::ForwardUnknown => "forward_unknown",
            ProcessingGroup::Ungrouped => "ungrouped",
        }
    }
}
468
469impl From<ProcessingGroup> for AppFeature {
470 fn from(value: ProcessingGroup) -> Self {
471 match value {
472 ProcessingGroup::Transaction => AppFeature::Transactions,
473 ProcessingGroup::Error => AppFeature::Errors,
474 ProcessingGroup::Session => AppFeature::Sessions,
475 ProcessingGroup::StandaloneAttachments => AppFeature::UnattributedEnvelope,
476 ProcessingGroup::StandaloneUserReports => AppFeature::UserReports,
477 ProcessingGroup::StandaloneProfiles => AppFeature::Profiles,
478 ProcessingGroup::ClientReport => AppFeature::ClientReports,
479 ProcessingGroup::Replay => AppFeature::Replays,
480 ProcessingGroup::CheckIn => AppFeature::CheckIns,
481 ProcessingGroup::Log => AppFeature::Logs,
482 ProcessingGroup::TraceMetric => AppFeature::TraceMetrics,
483 ProcessingGroup::Nel => AppFeature::Logs,
484 ProcessingGroup::Span => AppFeature::Spans,
485 ProcessingGroup::SpanV2 => AppFeature::Spans,
486 ProcessingGroup::Metrics => AppFeature::UnattributedMetrics,
487 ProcessingGroup::ProfileChunk => AppFeature::Profiles,
488 ProcessingGroup::ForwardUnknown => AppFeature::UnattributedEnvelope,
489 ProcessingGroup::Ungrouped => AppFeature::UnattributedEnvelope,
490 ProcessingGroup::TraceAttachment => AppFeature::TraceAttachments,
491 }
492 }
493}
494
/// Errors that can occur while processing an envelope.
///
/// [`ProcessingError::to_outcome`] maps each error to the outcome reported
/// for the dropped items, if any.
#[derive(Debug, thiserror::Error)]
pub enum ProcessingError {
    #[error("invalid json in event")]
    InvalidJson(#[source] serde_json::Error),

    #[error("invalid message pack event payload")]
    InvalidMsgpack(#[from] rmp_serde::decode::Error),

    #[cfg(feature = "processing")]
    #[error("invalid unreal crash report")]
    InvalidUnrealReport(#[source] Unreal4Error),

    #[error("event payload too large")]
    PayloadTooLarge(DiscardItemType),

    #[error("invalid transaction event")]
    InvalidTransaction,

    #[error("the item is not allowed/supported in this envelope")]
    UnsupportedItem,

    #[error("envelope processor failed")]
    ProcessingFailed(#[from] ProcessingAction),

    #[error("duplicate {0} in event")]
    DuplicateItem(ItemType),

    #[error("failed to extract event payload")]
    NoEventPayload,

    #[error("missing project id in DSN")]
    MissingProjectId,

    #[error("invalid security report type: {0:?}")]
    InvalidSecurityType(Bytes),

    #[error("unsupported security report type")]
    UnsupportedSecurityType,

    #[error("invalid security report")]
    InvalidSecurityReport(#[source] serde_json::Error),

    #[error("invalid nel report")]
    InvalidNelReport(#[source] NetworkReportError),

    #[error("event filtered with reason: {0:?}")]
    EventFiltered(FilterStatKey),

    #[error("could not serialize event payload")]
    SerializeFailed(#[source] serde_json::Error),

    #[cfg(feature = "processing")]
    #[error("failed to apply quotas")]
    QuotasFailed(#[from] RateLimitingError),

    // Boxed to keep the error type small; see `From<InvalidProcessingGroupType>`.
    #[error("invalid processing group type")]
    InvalidProcessingGroup(Box<InvalidProcessingGroupType>),

    #[error("nintendo switch dying message processing failed {0:?}")]
    InvalidNintendoDyingMessage(#[source] SwitchProcessingError),

    #[cfg(all(sentry, feature = "processing"))]
    #[error("playstation dump processing failed: {0}")]
    InvalidPlaystationDump(String),

    #[error("processing group does not match specific processor")]
    ProcessingGroupMismatch,
    #[error("new processing pipeline failed")]
    ProcessingFailure,

    #[cfg(feature = "processing")]
    #[error("invalid attachment reference")]
    InvalidAttachmentRef,

    #[error("could not determine processing group for envelope items")]
    NoProcessingGroup,
}
573
impl ProcessingError {
    /// Maps the error to the [`Outcome`] that should be emitted for the
    /// affected items, or `None` when no outcome must be recorded.
    pub fn to_outcome(&self) -> Option<Outcome> {
        match self {
            Self::PayloadTooLarge(payload_type) => {
                Some(Outcome::Invalid(DiscardReason::TooLarge(*payload_type)))
            }
            Self::InvalidJson(_) => Some(Outcome::Invalid(DiscardReason::InvalidJson)),
            Self::InvalidMsgpack(_) => Some(Outcome::Invalid(DiscardReason::InvalidMsgpack)),
            Self::InvalidSecurityType(_) => {
                Some(Outcome::Invalid(DiscardReason::SecurityReportType))
            }
            Self::UnsupportedItem => Some(Outcome::Invalid(DiscardReason::InvalidEnvelope)),
            Self::InvalidSecurityReport(_) => Some(Outcome::Invalid(DiscardReason::SecurityReport)),
            // Unsupported security reports are filtered, not marked invalid.
            Self::UnsupportedSecurityType => Some(Outcome::Filtered(FilterStatKey::InvalidCsp)),
            Self::InvalidNelReport(_) => Some(Outcome::Invalid(DiscardReason::InvalidJson)),
            Self::InvalidTransaction => Some(Outcome::Invalid(DiscardReason::InvalidTransaction)),
            Self::DuplicateItem(_) => Some(Outcome::Invalid(DiscardReason::DuplicateItem)),
            Self::NoEventPayload => Some(Outcome::Invalid(DiscardReason::NoEventPayload)),
            Self::InvalidNintendoDyingMessage(_) => Some(Outcome::Invalid(DiscardReason::Payload)),
            #[cfg(all(sentry, feature = "processing"))]
            Self::InvalidPlaystationDump(_) => Some(Outcome::Invalid(DiscardReason::Payload)),
            // Bad compression in Unreal reports is reported separately from
            // other Unreal processing failures.
            #[cfg(feature = "processing")]
            Self::InvalidUnrealReport(err) if err.kind() == Unreal4ErrorKind::BadCompression => {
                Some(Outcome::Invalid(DiscardReason::InvalidCompression))
            }
            #[cfg(feature = "processing")]
            Self::InvalidUnrealReport(_) => Some(Outcome::Invalid(DiscardReason::ProcessUnreal)),
            Self::SerializeFailed(_) | Self::ProcessingFailed(_) => {
                Some(Outcome::Invalid(DiscardReason::Internal))
            }
            #[cfg(feature = "processing")]
            Self::QuotasFailed(_) => Some(Outcome::Invalid(DiscardReason::Internal)),
            // No outcome: these cases are handled elsewhere or intentionally
            // produce no outcome here.
            Self::MissingProjectId => None,
            Self::EventFiltered(key) => Some(Outcome::Filtered(key.clone())),
            Self::InvalidProcessingGroup(_) => None,

            Self::ProcessingGroupMismatch => Some(Outcome::Invalid(DiscardReason::Internal)),
            Self::ProcessingFailure => None,
            #[cfg(feature = "processing")]
            Self::InvalidAttachmentRef => {
                Some(Outcome::Invalid(DiscardReason::InvalidAttachmentRef))
            }
            Self::NoProcessingGroup => Some(Outcome::Invalid(DiscardReason::Internal)),
        }
    }

    /// Returns `true` when the mapped outcome reports itself as unexpected.
    fn is_unexpected(&self) -> bool {
        self.to_outcome()
            .is_some_and(|outcome| outcome.is_unexpected())
    }
}
626
#[cfg(feature = "processing")]
impl From<Unreal4Error> for ProcessingError {
    // Oversized crash reports map to `PayloadTooLarge` so they are tracked as
    // a size violation; every other Unreal error becomes `InvalidUnrealReport`.
    fn from(err: Unreal4Error) -> Self {
        match err.kind() {
            Unreal4ErrorKind::TooLarge => Self::PayloadTooLarge(ItemType::UnrealReport.into()),
            _ => ProcessingError::InvalidUnrealReport(err),
        }
    }
}
636
637impl From<InvalidProcessingGroupType> for ProcessingError {
638 fn from(value: InvalidProcessingGroupType) -> Self {
639 Self::InvalidProcessingGroup(Box::new(value))
640 }
641}
642
643type ExtractedEvent = (Annotated<Event>, usize);
644
/// Metric buckets extracted during envelope processing.
///
/// Wraps [`ExtractedMetrics`] so that bucket metadata (e.g. the
/// `extracted_from_indexed` flag) is maintained consistently as buckets are
/// added.
#[derive(Debug)]
pub struct ProcessingExtractedMetrics {
    metrics: ExtractedMetrics,
}
653
impl ProcessingExtractedMetrics {
    /// Creates an empty collection of extracted metrics.
    pub fn new() -> Self {
        Self {
            metrics: ExtractedMetrics::default(),
        }
    }

    /// Consumes the wrapper and returns the collected metrics.
    pub fn into_inner(self) -> ExtractedMetrics {
        self.metrics
    }

    /// Extends both project and sampling metrics from `extracted`.
    ///
    /// See [`Self::extend_project_metrics`] for the effect of
    /// `sampling_decision` on bucket metadata.
    pub fn extend(
        &mut self,
        extracted: ExtractedMetrics,
        sampling_decision: Option<SamplingDecision>,
    ) {
        self.extend_project_metrics(extracted.project_metrics, sampling_decision);
        self.extend_sampling_metrics(extracted.sampling_metrics, sampling_decision);
    }

    /// Adds buckets to the project metrics.
    ///
    /// Each bucket's `extracted_from_indexed` metadata flag is set to whether
    /// the sampling decision was [`SamplingDecision::Keep`].
    pub fn extend_project_metrics<I>(
        &mut self,
        buckets: I,
        sampling_decision: Option<SamplingDecision>,
    ) where
        I: IntoIterator<Item = Bucket>,
    {
        self.metrics
            .project_metrics
            .extend(buckets.into_iter().map(|mut bucket| {
                bucket.metadata.extracted_from_indexed =
                    sampling_decision == Some(SamplingDecision::Keep);
                bucket
            }));
    }

    /// Adds buckets to the sampling metrics.
    ///
    /// Same metadata handling as [`Self::extend_project_metrics`].
    pub fn extend_sampling_metrics<I>(
        &mut self,
        buckets: I,
        sampling_decision: Option<SamplingDecision>,
    ) where
        I: IntoIterator<Item = Bucket>,
    {
        self.metrics
            .sampling_metrics
            .extend(buckets.into_iter().map(|mut bucket| {
                bucket.metadata.extracted_from_indexed =
                    sampling_decision == Some(SamplingDecision::Keep);
                bucket
            }));
    }

    /// Applies rate-limit enforcement to the extracted metrics.
    ///
    /// Buckets in a namespace whose (total) limit is active are dropped
    /// entirely. If only the indexed limit is active and
    /// `enforced_consistently` is `false`, the `extracted_from_indexed` flag
    /// is reset on the affected buckets instead of dropping them.
    #[cfg(feature = "processing")]
    fn apply_enforcement(&mut self, enforcement: &Enforcement, enforced_consistently: bool) {
        let mut drop_namespaces: SmallVec<[_; 2]> = smallvec![];
        let mut reset_extracted_from_indexed: SmallVec<[_; 2]> = smallvec![];

        // Currently only the span namespace is subject to this enforcement.
        for (namespace, limit, indexed) in [(
            MetricNamespace::Spans,
            &enforcement.spans,
            &enforcement.spans_indexed,
        )] {
            if limit.is_active() {
                drop_namespaces.push(namespace);
            } else if indexed.is_active() && !enforced_consistently {
                reset_extracted_from_indexed.push(namespace);
            }
        }

        // Only traverse the buckets when there is anything to change.
        if !drop_namespaces.is_empty() || !reset_extracted_from_indexed.is_empty() {
            self.retain_mut(|bucket| {
                let Some(namespace) = bucket.name.try_namespace() else {
                    return true;
                };

                if drop_namespaces.contains(&namespace) {
                    return false;
                }

                if reset_extracted_from_indexed.contains(&namespace) {
                    bucket.metadata.extracted_from_indexed = false;
                }

                true
            });
        }
    }

    /// Retains only the buckets for which `f` returns `true`, across both
    /// project and sampling metrics. `f` may mutate the buckets it visits.
    #[cfg(feature = "processing")]
    fn retain_mut(&mut self, mut f: impl FnMut(&mut Bucket) -> bool) {
        self.metrics.project_metrics.retain_mut(&mut f);
        self.metrics.sampling_metrics.retain_mut(&mut f);
    }
}
762
763fn send_metrics(
764 metrics: ExtractedMetrics,
765 project_key: ProjectKey,
766 sampling_key: Option<ProjectKey>,
767 aggregator: &Addr<Aggregator>,
768) {
769 let ExtractedMetrics {
770 project_metrics,
771 sampling_metrics,
772 } = metrics;
773
774 if !project_metrics.is_empty() {
775 aggregator.send(MergeBuckets {
776 project_key,
777 buckets: project_metrics,
778 });
779 }
780
781 if !sampling_metrics.is_empty() {
782 let sampling_project_key = sampling_key.unwrap_or(project_key);
789 aggregator.send(MergeBuckets {
790 project_key: sampling_project_key,
791 buckets: sampling_metrics,
792 });
793 }
794}
795
/// Result of processing an envelope.
#[derive(Debug)]
#[expect(
    clippy::large_enum_variant,
    reason = "until we have a better solution to combat the excessive growth of Item, see #4819"
)]
enum ProcessingResult {
    /// A processed envelope together with the metrics extracted from it.
    Envelope {
        managed_envelope: TypedEnvelope<Processed>,
        extracted_metrics: ProcessingExtractedMetrics,
    },
    /// Output produced by one of the `processing` pipelines.
    Output(Output<Outputs>),
}
810
811impl ProcessingResult {
812 fn no_metrics(managed_envelope: TypedEnvelope<Processed>) -> Self {
814 Self::Envelope {
815 managed_envelope,
816 extracted_metrics: ProcessingExtractedMetrics::new(),
817 }
818 }
819}
820
/// Work items that are submitted to the next stage after processing.
#[derive(Debug)]
#[expect(
    clippy::large_enum_variant,
    reason = "until we have a better solution to combat the excessive growth of Item, see #4819"
)]
enum Submit<'a> {
    /// A fully processed envelope.
    Envelope(TypedEnvelope<Processed>),
    /// Output of a `processing` pipeline, together with the context needed to
    /// forward it.
    Output {
        output: Outputs,
        ctx: processing::ForwardContext<'a>,
    },
}
836
/// Message asking the processor to process a single envelope.
#[derive(Debug)]
pub struct ProcessEnvelope {
    /// The envelope to process, with its tracking context.
    pub envelope: ManagedEnvelope,
    /// State of the project the envelope belongs to.
    pub project_info: Arc<ProjectInfo>,
    /// Currently active rate limits.
    pub rate_limits: Arc<RateLimits>,
    /// Project info used for sampling decisions, when different from the
    /// envelope's own project. NOTE(review): presumably the trace-root
    /// project — confirm at the call site.
    pub sampling_project_info: Option<Arc<ProjectInfo>>,
    /// Shared counters for reservoir sampling.
    pub reservoir_counters: ReservoirCounters,
}
859
/// Internal message: process an envelope whose items all belong to a single
/// [`ProcessingGroup`].
#[derive(Debug)]
struct ProcessEnvelopeGrouped<'a> {
    /// The processing group of all items in the envelope.
    pub group: ProcessingGroup,
    /// The envelope to process.
    pub envelope: ManagedEnvelope,
    /// Contextual state for processing.
    pub ctx: processing::Context<'a>,
}
870
/// Message asking the processor to parse and aggregate metric buckets for a
/// single project.
#[derive(Debug)]
pub struct ProcessMetrics {
    /// Raw items or already-parsed buckets.
    pub data: MetricData,
    /// The project the buckets belong to.
    pub project_key: ProjectKey,
    /// Whether the sender of the buckets is trusted.
    pub source: BucketSource,
    /// The time this Relay received the buckets.
    pub received_at: DateTime<Utc>,
    /// The send time claimed by the client, if provided.
    pub sent_at: Option<DateTime<Utc>>,
}
896
/// Metric bucket data in raw or parsed form.
#[derive(Debug)]
pub enum MetricData {
    /// Envelope items carrying serialized buckets (statsd or JSON bucket
    /// payloads).
    Raw(Vec<Item>),
    /// Buckets that have already been parsed.
    Parsed(Vec<Bucket>),
}
905
906impl MetricData {
907 fn into_buckets(self, timestamp: UnixTimestamp) -> Vec<Bucket> {
912 let items = match self {
913 Self::Parsed(buckets) => return buckets,
914 Self::Raw(items) => items,
915 };
916
917 let mut buckets = Vec::new();
918 for item in items {
919 let payload = item.payload();
920 if item.ty() == &ItemType::Statsd {
921 for bucket_result in Bucket::parse_all(&payload, timestamp) {
922 match bucket_result {
923 Ok(bucket) => buckets.push(bucket),
924 Err(error) => relay_log::debug!(
925 error = &error as &dyn Error,
926 "failed to parse metric bucket from statsd format",
927 ),
928 }
929 }
930 } else if item.ty() == &ItemType::MetricBuckets {
931 match serde_json::from_slice::<Vec<Bucket>>(&payload) {
932 Ok(parsed_buckets) => {
933 if buckets.is_empty() {
935 buckets = parsed_buckets;
936 } else {
937 buckets.extend(parsed_buckets);
938 }
939 }
940 Err(error) => {
941 relay_log::debug!(
942 error = &error as &dyn Error,
943 "failed to parse metric bucket",
944 );
945 metric!(counter(RelayCounters::MetricBucketsParsingFailed) += 1);
946 }
947 }
948 } else {
949 relay_log::error!(
950 "invalid item of type {} passed to ProcessMetrics",
951 item.ty()
952 );
953 }
954 }
955 buckets
956 }
957}
958
/// Message asking the processor to handle a batch of serialized metric
/// buckets.
#[derive(Debug)]
pub struct ProcessBatchedMetrics {
    /// The serialized payload of the batch.
    pub payload: Bytes,
    /// Whether the sender of the batch is trusted.
    pub source: BucketSource,
    /// The time this Relay received the batch.
    pub received_at: DateTime<Utc>,
    /// The send time claimed by the client, if provided.
    pub sent_at: Option<DateTime<Utc>>,
}
970
/// The kind of sender that submitted a set of metric buckets.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum BucketSource {
    /// Buckets submitted by a trusted sender (see
    /// [`BucketSource::from_meta`]).
    Internal,
    /// Buckets submitted by an untrusted sender.
    External,
}
986
987impl BucketSource {
988 pub fn from_meta(meta: &RequestMeta) -> Self {
990 match meta.request_trust() {
991 RequestTrust::Trusted => Self::Internal,
992 RequestTrust::Untrusted => Self::External,
993 }
994 }
995}
996
/// Message asking the processor to submit a list of client reports.
#[derive(Debug)]
pub struct SubmitClientReports {
    /// The client reports to submit.
    pub client_reports: Vec<ClientReport>,
    /// The scoping the reports are attributed to.
    pub scoping: Scoping,
}
1005
/// Messages accepted by the envelope processor service.
///
/// All payloads are boxed to keep the message enum itself small.
#[derive(Debug)]
pub enum EnvelopeProcessor {
    ProcessEnvelope(Box<ProcessEnvelope>),
    ProcessProjectMetrics(Box<ProcessMetrics>),
    ProcessBatchedMetrics(Box<ProcessBatchedMetrics>),
    FlushBuckets(Box<FlushBuckets>),
    SubmitClientReports(Box<SubmitClientReports>),
}
1015
1016impl EnvelopeProcessor {
1017 pub fn variant(&self) -> &'static str {
1019 match self {
1020 EnvelopeProcessor::ProcessEnvelope(_) => "ProcessEnvelope",
1021 EnvelopeProcessor::ProcessProjectMetrics(_) => "ProcessProjectMetrics",
1022 EnvelopeProcessor::ProcessBatchedMetrics(_) => "ProcessBatchedMetrics",
1023 EnvelopeProcessor::FlushBuckets(_) => "FlushBuckets",
1024 EnvelopeProcessor::SubmitClientReports(_) => "SubmitClientReports",
1025 }
1026 }
1027}
1028
1029impl relay_system::Interface for EnvelopeProcessor {}
1030
1031impl FromMessage<ProcessEnvelope> for EnvelopeProcessor {
1032 type Response = relay_system::NoResponse;
1033
1034 fn from_message(message: ProcessEnvelope, _sender: ()) -> Self {
1035 Self::ProcessEnvelope(Box::new(message))
1036 }
1037}
1038
1039impl FromMessage<ProcessMetrics> for EnvelopeProcessor {
1040 type Response = NoResponse;
1041
1042 fn from_message(message: ProcessMetrics, _: ()) -> Self {
1043 Self::ProcessProjectMetrics(Box::new(message))
1044 }
1045}
1046
1047impl FromMessage<ProcessBatchedMetrics> for EnvelopeProcessor {
1048 type Response = NoResponse;
1049
1050 fn from_message(message: ProcessBatchedMetrics, _: ()) -> Self {
1051 Self::ProcessBatchedMetrics(Box::new(message))
1052 }
1053}
1054
1055impl FromMessage<FlushBuckets> for EnvelopeProcessor {
1056 type Response = NoResponse;
1057
1058 fn from_message(message: FlushBuckets, _: ()) -> Self {
1059 Self::FlushBuckets(Box::new(message))
1060 }
1061}
1062
1063impl FromMessage<SubmitClientReports> for EnvelopeProcessor {
1064 type Response = NoResponse;
1065
1066 fn from_message(message: SubmitClientReports, _: ()) -> Self {
1067 Self::SubmitClientReports(Box::new(message))
1068 }
1069}
1070
1071pub type EnvelopeProcessorServicePool = AsyncPool<BoxFuture<'static, ()>>;
1073
/// Service implementing the [`EnvelopeProcessor`] interface.
///
/// Cheap to clone: all state lives behind an `Arc`.
#[derive(Clone)]
pub struct EnvelopeProcessorService {
    inner: Arc<InnerProcessor>,
}
1081
/// Addresses of the services the envelope processor sends messages to.
pub struct Addrs {
    /// Sink for outcome tracking.
    pub outcome_aggregator: Addr<TrackOutcome>,
    /// Upstream Relay used for forwarding.
    pub upstream_relay: Addr<UpstreamRelay>,
    /// Object storage service, when processing is enabled and configured.
    #[cfg(feature = "processing")]
    pub objectstore: Option<Addr<Objectstore>>,
    /// Store (Kafka) forwarder, when processing is enabled and configured.
    #[cfg(feature = "processing")]
    pub store_forwarder: Option<Addr<Store>>,
    /// Metrics aggregator service.
    pub aggregator: Addr<Aggregator>,
}
1092
1093impl Default for Addrs {
1094 fn default() -> Self {
1095 Addrs {
1096 outcome_aggregator: Addr::dummy(),
1097 upstream_relay: Addr::dummy(),
1098 #[cfg(feature = "processing")]
1099 objectstore: None,
1100 #[cfg(feature = "processing")]
1101 store_forwarder: None,
1102 aggregator: Addr::dummy(),
1103 }
1104 }
1105}
1106
/// State shared by all clones of [`EnvelopeProcessorService`].
struct InnerProcessor {
    /// Pool the processor runs its work on.
    pool: EnvelopeProcessorServicePool,
    /// Static Relay configuration.
    config: Arc<Config>,
    /// Handle to the current global configuration.
    global_config: GlobalConfigHandle,
    /// Handle to cached project states.
    project_cache: ProjectCacheHandle,
    /// COGS measurement sink.
    cogs: Cogs,
    /// Addresses of downstream services.
    addrs: Addrs,
    /// Redis-backed rate limiter, present when Redis is configured.
    #[cfg(feature = "processing")]
    rate_limiter: Option<Arc<RedisRateLimiter>>,
    /// GeoIP lookup; empty when no database is configured.
    #[cfg(feature = "processing")]
    geoip_lookup: GeoIpLookup,
    /// Sink for metric outcome tracking.
    metric_outcomes: MetricOutcomes,
    /// The per-signal processors.
    processing: Processing,
}
1121
/// Per-signal processors from the `processing` module, one per supported
/// item kind.
struct Processing {
    errors: ErrorsProcessor,
    logs: LogsProcessor,
    trace_metrics: TraceMetricsProcessor,
    spans: SpansProcessor,
    check_ins: CheckInsProcessor,
    sessions: SessionsProcessor,
    transactions: TransactionProcessor,
    profile_chunks: ProfileChunksProcessor,
    trace_attachments: TraceAttachmentsProcessor,
    replays: ReplaysProcessor,
    client_reports: ClientReportsProcessor,
    attachments: AttachmentProcessor,
    user_reports: UserReportsProcessor,
    profiles: ProfilesProcessor,
}
1138
1139impl EnvelopeProcessorService {
1140 #[cfg_attr(feature = "processing", expect(clippy::too_many_arguments))]
1142 pub fn new(
1143 pool: EnvelopeProcessorServicePool,
1144 config: Arc<Config>,
1145 global_config: GlobalConfigHandle,
1146 project_cache: ProjectCacheHandle,
1147 cogs: Cogs,
1148 #[cfg(feature = "processing")] redis: Option<RedisClients>,
1149 addrs: Addrs,
1150 metric_outcomes: MetricOutcomes,
1151 ) -> Self {
1152 let geoip_lookup = config
1153 .geoip_path()
1154 .and_then(
1155 |p| match GeoIpLookup::open(p).context(ServiceError::GeoIp) {
1156 Ok(geoip) => Some(geoip),
1157 Err(err) => {
1158 relay_log::error!("failed to open GeoIP db {p:?}: {err:?}");
1159 None
1160 }
1161 },
1162 )
1163 .unwrap_or_else(GeoIpLookup::empty);
1164
1165 #[cfg(feature = "processing")]
1166 let rate_limiter = redis.as_ref().map(|redis| {
1167 RedisRateLimiter::new(redis.quotas.clone())
1168 .max_limit(config.max_rate_limit())
1169 .cache(config.quota_cache_ratio(), config.quota_cache_max())
1170 });
1171
1172 let quota_limiter = Arc::new(QuotaRateLimiter::new(
1173 #[cfg(feature = "processing")]
1174 project_cache.clone(),
1175 #[cfg(feature = "processing")]
1176 rate_limiter.clone(),
1177 ));
1178 #[cfg(feature = "processing")]
1179 let rate_limiter = rate_limiter.map(Arc::new);
1180 let outcome_aggregator = addrs.outcome_aggregator.clone();
1181 let inner = InnerProcessor {
1182 pool,
1183 global_config,
1184 project_cache,
1185 cogs,
1186 #[cfg(feature = "processing")]
1187 rate_limiter,
1188 addrs,
1189 metric_outcomes,
1190 #[cfg(feature = "processing")]
1191 geoip_lookup: geoip_lookup.clone(),
1192 processing: Processing {
1193 errors: ErrorsProcessor::new(Arc::clone("a_limiter), geoip_lookup.clone()),
1194 logs: LogsProcessor::new(Arc::clone("a_limiter)),
1195 trace_metrics: TraceMetricsProcessor::new(Arc::clone("a_limiter)),
1196 spans: SpansProcessor::new(Arc::clone("a_limiter), geoip_lookup.clone()),
1197 check_ins: CheckInsProcessor::new(Arc::clone("a_limiter)),
1198 sessions: SessionsProcessor::new(Arc::clone("a_limiter)),
1199 transactions: TransactionProcessor::new(
1200 Arc::clone("a_limiter),
1201 geoip_lookup.clone(),
1202 #[cfg(feature = "processing")]
1203 redis.map(|r| r.quotas),
1204 #[cfg(not(feature = "processing"))]
1205 None,
1206 ),
1207 profile_chunks: ProfileChunksProcessor::new(Arc::clone("a_limiter)),
1208 trace_attachments: TraceAttachmentsProcessor::new(Arc::clone("a_limiter)),
1209 replays: ReplaysProcessor::new(Arc::clone("a_limiter), geoip_lookup),
1210 client_reports: ClientReportsProcessor::new(outcome_aggregator),
1211 attachments: AttachmentProcessor::new(Arc::clone("a_limiter)),
1212 user_reports: UserReportsProcessor::new(Arc::clone("a_limiter)),
1213 profiles: ProfilesProcessor::new(quota_limiter),
1214 },
1215 config,
1216 };
1217
1218 Self {
1219 inner: Arc::new(inner),
1220 }
1221 }
1222
    /// Enforces rate limits on the envelope and the optional event.
    ///
    /// Always runs enforcement against the locally cached rate limits first.
    /// With the `processing` feature and a configured Redis rate limiter, a
    /// second, consistent enforcement pass runs against Redis, and any newly
    /// triggered limits are merged back into the project cache.
    ///
    /// Returns the (possibly emptied) event after enforcement.
    async fn enforce_quotas<Group>(
        &self,
        managed_envelope: &mut TypedEnvelope<Group>,
        event: Annotated<Event>,
        extracted_metrics: &mut ProcessingExtractedMetrics,
        ctx: processing::Context<'_>,
    ) -> Result<Annotated<Event>, ProcessingError> {
        // First pass: cheap check against cached limits only.
        let cached_result = RateLimiter::Cached
            .enforce(managed_envelope, event, extracted_metrics, ctx)
            .await?;

        if_processing!(self.inner.config, {
            // Without a Redis rate limiter there is nothing consistent to
            // enforce; return the cached result as-is.
            let rate_limiter = match self.inner.rate_limiter.clone() {
                Some(rate_limiter) => rate_limiter,
                None => return Ok(cached_result.event),
            };

            // Second pass: consistent enforcement against Redis.
            let consistent_result = RateLimiter::Consistent(rate_limiter)
                .enforce(
                    managed_envelope,
                    cached_result.event,
                    extracted_metrics,
                    ctx
                )
                .await?;

            // Cache newly observed limits so subsequent envelopes are limited
            // without hitting Redis again.
            if !consistent_result.rate_limits.is_empty() {
                self.inner
                    .project_cache
                    .get(managed_envelope.scoping().project_key)
                    .rate_limits()
                    .merge(consistent_result.rate_limits);
            }

            Ok(consistent_result.event)
        } else { Ok(cached_result.event) })
    }
1264
1265 async fn process_nel(
1266 &self,
1267 mut managed_envelope: ManagedEnvelope,
1268 ctx: processing::Context<'_>,
1269 ) -> Result<ProcessingResult, ProcessingError> {
1270 nel::convert_to_logs(&mut managed_envelope);
1271 self.process_with_processor(&self.inner.processing.logs, managed_envelope, ctx)
1272 .await
1273 }
1274
    /// Runs an envelope through one of the typed [`processing::Processor`]s.
    ///
    /// The processor first extracts the items it can handle from the envelope
    /// (`prepare_envelope`); any leftover items are rejected as internal
    /// errors, since a group's envelope must be fully consumed. The extracted
    /// work is then processed asynchronously and the output converted into
    /// [`ProcessingResult::Output`].
    async fn process_with_processor<P: processing::Processor>(
        &self,
        processor: &P,
        mut managed_envelope: ManagedEnvelope,
        ctx: processing::Context<'_>,
    ) -> Result<ProcessingResult, ProcessingError>
    where
        Outputs: From<P::Output>,
    {
        // Routing guarantees the envelope matches this processor; hitting the
        // `else` branch indicates a routing bug (debug-asserted).
        let Some(work) = processor.prepare_envelope(&mut managed_envelope) else {
            debug_assert!(
                false,
                "there must be work for the {} processor",
                std::any::type_name::<P>(),
            );
            return Err(ProcessingError::ProcessingGroupMismatch);
        };

        // Everything the processor did not take is unexpected leftover:
        // accept the now-empty envelope or reject the remaining items.
        managed_envelope.update();
        match managed_envelope.envelope().is_empty() {
            true => managed_envelope.accept(),
            false => managed_envelope.reject(Outcome::Invalid(DiscardReason::Internal)),
        }

        processor
            .process(work, ctx)
            .await
            .map_err(|err| {
                relay_log::debug!(
                    error = &err as &dyn std::error::Error,
                    "processing pipeline failed"
                );
                ProcessingError::ProcessingFailure
            })
            .map(|o| o.map(Into::into))
            .map(ProcessingResult::Output)
    }
1312
    /// Processes standalone spans (the legacy span processing group).
    ///
    /// With the `processing` feature, runs full span processing (which may
    /// extract metrics); in all builds, enforces quotas afterwards. Returns
    /// the metrics extracted during processing.
    async fn process_standalone_spans(
        &self,
        managed_envelope: &mut TypedEnvelope<SpanGroup>,
        _project_id: ProjectId,
        ctx: processing::Context<'_>,
    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
        let mut extracted_metrics = ProcessingExtractedMetrics::new();

        if_processing!(self.inner.config, {
            // Standalone spans have no enclosing event, hence the empty
            // `Annotated` passed in.
            span::process(
                managed_envelope,
                &mut Annotated::empty(),
                &mut extracted_metrics,
                _project_id,
                ctx,
                &self.inner.geoip_lookup,
            )
            .await;
        });

        self.enforce_quotas(
            managed_envelope,
            Annotated::empty(),
            &mut extracted_metrics,
            ctx,
        )
        .await?;

        Ok(Some(extracted_metrics))
    }
1346
    /// Applies per-envelope preparation and dispatches to the processor for
    /// the envelope's [`ProcessingGroup`].
    ///
    /// Preparation covers: transaction-name parametrization of the dynamic
    /// sampling context, retention settings from the project config, and
    /// stamping the project ID onto the envelope meta.
    async fn process_envelope(
        &self,
        project_id: ProjectId,
        message: ProcessEnvelopeGrouped<'_>,
    ) -> Result<ProcessingResult, ProcessingError> {
        let ProcessEnvelopeGrouped {
            group,
            envelope: mut managed_envelope,
            ctx,
        } = message;

        // Apply transaction name rules of the sampling (root) project to the
        // dynamic sampling context, if available.
        if let Some(sampling_state) = ctx.sampling_project_info {
            managed_envelope
                .envelope_mut()
                .parametrize_dsc_transaction(&sampling_state.config.tx_name_rules);
        }

        if let Some(retention) = ctx.project_info.config.event_retention {
            managed_envelope.envelope_mut().set_retention(retention);
        }

        if let Some(retention) = ctx.project_info.config.downsampled_event_retention {
            managed_envelope
                .envelope_mut()
                .set_downsampled_retention(retention);
        }

        managed_envelope
            .envelope_mut()
            .meta_mut()
            .set_project_id(project_id);

        // Legacy invocation helper: converts the envelope into the typed
        // group envelope, runs the handler, and wraps metrics/errors into a
        // `ProcessingResult::Envelope`. Rejects the envelope with the error's
        // outcome on failure.
        macro_rules! run {
            ($fn_name:ident $(, $args:expr)*) => {
                async {
                    let mut managed_envelope = (managed_envelope, group).try_into()?;
                    match self.$fn_name(&mut managed_envelope, $($args),*).await {
                        Ok(extracted_metrics) => Ok(ProcessingResult::Envelope {
                            managed_envelope: managed_envelope.into_processed(),
                            extracted_metrics: extracted_metrics.map_or(ProcessingExtractedMetrics::new(), |e| e)
                        }),
                        Err(error) => {
                            relay_log::trace!("Executing {fn} failed: {error}", fn = stringify!($fn_name), error = error);
                            if let Some(outcome) = error.to_outcome() {
                                managed_envelope.reject(outcome);
                            }

                            return Err(error);
                        }
                    }
                }.await
            };
        }

        relay_log::trace!("Processing {group} group", group = group.variant());

        // Dispatch to the matching typed processor; only the `Span` group
        // still uses the legacy `run!` path.
        match group {
            ProcessingGroup::Error => {
                self.process_with_processor(&self.inner.processing.errors, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::Transaction => {
                self.process_with_processor(
                    &self.inner.processing.transactions,
                    managed_envelope,
                    ctx,
                )
                .await
            }
            ProcessingGroup::Session => {
                self.process_with_processor(&self.inner.processing.sessions, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::StandaloneAttachments => {
                self.process_with_processor(
                    &self.inner.processing.attachments,
                    managed_envelope,
                    ctx,
                )
                .await
            }
            ProcessingGroup::StandaloneUserReports => {
                self.process_with_processor(
                    &self.inner.processing.user_reports,
                    managed_envelope,
                    ctx,
                )
                .await
            }
            ProcessingGroup::StandaloneProfiles => {
                self.process_with_processor(&self.inner.processing.profiles, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::ClientReport => {
                self.process_with_processor(
                    &self.inner.processing.client_reports,
                    managed_envelope,
                    ctx,
                )
                .await
            }
            ProcessingGroup::Replay => {
                self.process_with_processor(&self.inner.processing.replays, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::CheckIn => {
                self.process_with_processor(&self.inner.processing.check_ins, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::Nel => self.process_nel(managed_envelope, ctx).await,
            ProcessingGroup::Log => {
                self.process_with_processor(&self.inner.processing.logs, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::TraceMetric => {
                self.process_with_processor(
                    &self.inner.processing.trace_metrics,
                    managed_envelope,
                    ctx,
                )
                .await
            }
            ProcessingGroup::SpanV2 => {
                self.process_with_processor(&self.inner.processing.spans, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::TraceAttachment => {
                self.process_with_processor(
                    &self.inner.processing.trace_attachments,
                    managed_envelope,
                    ctx,
                )
                .await
            }
            ProcessingGroup::Span => run!(process_standalone_spans, project_id, ctx),
            ProcessingGroup::ProfileChunk => {
                self.process_with_processor(
                    &self.inner.processing.profile_chunks,
                    managed_envelope,
                    ctx,
                )
                .await
            }
            ProcessingGroup::Metrics => {
                // In proxy mode envelopes are not touched, so metric items may
                // legitimately show up here; anywhere else this is a bug.
                if self.inner.config.relay_mode() != RelayMode::Proxy {
                    relay_log::error!(
                        tags.project = %project_id,
                        items = ?managed_envelope.envelope().items().next().map(Item::ty),
                        "received metrics in the process_state"
                    );
                }

                Ok(ProcessingResult::no_metrics(
                    managed_envelope.into_processed(),
                ))
            }
            ProcessingGroup::Ungrouped => {
                relay_log::error!(
                    tags.project = %project_id,
                    items = ?managed_envelope.envelope().items().next().map(Item::ty),
                    "could not identify the processing group based on the envelope's items"
                );

                Err(ProcessingError::NoProcessingGroup)
            }
            // Unknown items are forwarded upstream untouched.
            ProcessingGroup::ForwardUnknown => Ok(ProcessingResult::no_metrics(
                managed_envelope.into_processed(),
            )),
        }
    }
1535
    /// Top-level processing entry for a grouped envelope.
    ///
    /// Resolves the project ID, decorates the logging scope, delegates to
    /// [`Self::process_envelope`], then flushes extracted metrics to the
    /// aggregator and decides whether anything is left to submit upstream.
    async fn process<'a>(
        &self,
        mut message: ProcessEnvelopeGrouped<'a>,
    ) -> Result<Option<Submit<'a>>, ProcessingError> {
        let ProcessEnvelopeGrouped {
            ref mut envelope,
            ctx,
            ..
        } = message;

        // Prefer the project ID from the project state; fall back to the one
        // on the envelope meta. Without either, processing cannot proceed.
        let Some(project_id) = ctx
            .project_info
            .project_id
            .or_else(|| envelope.envelope().meta().project_id())
        else {
            envelope.reject(Outcome::Invalid(DiscardReason::Internal));
            return Err(ProcessingError::MissingProjectId);
        };

        let client = envelope.envelope().meta().client().map(str::to_owned);
        let user_agent = envelope.envelope().meta().user_agent().map(str::to_owned);
        let project_key = envelope.envelope().meta().public_key();
        // The sampling key is only meaningful when a sampling project state
        // is available.
        let sampling_key = envelope
            .envelope()
            .sampling_key()
            .filter(|_| ctx.sampling_project_info.is_some());

        relay_log::configure_scope(|scope| {
            scope.set_tag("project", project_id);
            if let Some(client) = client {
                scope.set_tag("sdk", client);
            }
            if let Some(user_agent) = user_agent {
                scope.set_extra("user_agent", user_agent.into());
            }
        });

        let result = match self.process_envelope(project_id, message).await {
            Ok(ProcessingResult::Envelope {
                mut managed_envelope,
                extracted_metrics,
            }) => {
                managed_envelope.update();

                let has_metrics = !extracted_metrics.metrics.project_metrics.is_empty();
                send_metrics(
                    extracted_metrics.metrics,
                    project_key,
                    sampling_key,
                    &self.inner.addrs.aggregator,
                );

                // An envelope emptied by rate limiting without extracted
                // metrics counts as rate-limited; with metrics it is accepted
                // (the data lives on as metrics).
                let envelope_response = if managed_envelope.envelope().is_empty() {
                    if !has_metrics {
                        managed_envelope.reject(Outcome::RateLimited(None));
                    } else {
                        managed_envelope.accept();
                    }

                    None
                } else {
                    Some(managed_envelope)
                };

                Ok(envelope_response.map(Submit::Envelope))
            }
            Ok(ProcessingResult::Output(Output { main, metrics })) => {
                if let Some(metrics) = metrics {
                    metrics.accept(|metrics| {
                        send_metrics(
                            metrics,
                            project_key,
                            sampling_key,
                            &self.inner.addrs.aggregator,
                        );
                    });
                }

                let ctx = ctx.to_forward();
                Ok(main.map(|output| Submit::Output { output, ctx }))
            }
            Err(err) => Err(err),
        };

        // NOTE(review): "user_agent" is attached via `set_extra` above but
        // removed here with `remove_tag` — verify the extra is cleaned up as
        // intended.
        relay_log::configure_scope(|scope| {
            scope.remove_tag("project");
            scope.remove_tag("sdk");
            scope.remove_tag("user_agent");
        });

        result
    }
1647
    /// Handles the [`ProcessEnvelope`] message.
    ///
    /// Splits the incoming envelope into per-group envelopes and processes
    /// each one independently with its own COGS token and processing context.
    /// Successful results are submitted upstream; failures are logged.
    async fn handle_process_envelope(&self, cogs: &mut Token, message: ProcessEnvelope) {
        let project_key = message.envelope.envelope().meta().public_key();
        let wait_time = message.envelope.age();
        metric!(timer(RelayTimers::EnvelopeWaitTime) = wait_time);

        // The passed-in token is replaced by per-group tokens created below.
        cogs.cancel();

        let scoping = message.envelope.scoping();
        for (group, envelope) in ProcessingGroup::split_envelope(
            *message.envelope.into_envelope(),
            &message.project_info,
        ) {
            // Attribute each group's work to its own COGS feature.
            let mut cogs = self
                .inner
                .cogs
                .timed(ResourceId::Relay, AppFeature::from(group));

            let mut envelope =
                ManagedEnvelope::new(envelope, self.inner.addrs.outcome_aggregator.clone());
            envelope.scope(scoping);

            let global_config = self.inner.global_config.current();

            let ctx = processing::Context {
                config: &self.inner.config,
                global_config: &global_config,
                project_info: &message.project_info,
                sampling_project_info: message.sampling_project_info.as_deref(),
                rate_limits: &message.rate_limits,
                reservoir_counters: &message.reservoir_counters,
            };

            let message = ProcessEnvelopeGrouped {
                group,
                envelope,
                ctx,
            };

            let result = metric!(
                timer(RelayTimers::EnvelopeProcessingTime),
                group = group.variant(),
                { self.process(message).await }
            );

            match result {
                Ok(Some(envelope)) => self.submit_upstream(&mut cogs, envelope),
                Ok(None) => {}
                // Unexpected errors are surfaced at error level; expected ones
                // only at debug.
                Err(error) if error.is_unexpected() => {
                    relay_log::error!(
                        tags.project_key = %project_key,
                        error = &error as &dyn Error,
                        "error processing envelope"
                    )
                }
                Err(error) => {
                    relay_log::debug!(
                        tags.project_key = %project_key,
                        error = &error as &dyn Error,
                        "error processing envelope"
                    )
                }
            }
        }
    }
1714
    /// Handles the [`ProcessMetrics`] message.
    ///
    /// Parses/normalizes incoming buckets, corrects clock drift, applies
    /// cached project checks when the project state is available, and merges
    /// the surviving buckets into the aggregator.
    fn handle_process_metrics(&self, cogs: &mut Token, message: ProcessMetrics) {
        let ProcessMetrics {
            data,
            project_key,
            received_at,
            sent_at,
            source,
        } = message;

        let received_timestamp =
            UnixTimestamp::from_datetime(received_at).unwrap_or(UnixTimestamp::now());

        let mut buckets = data.into_buckets(received_timestamp);
        if buckets.is_empty() {
            return;
        };
        cogs.update(relay_metrics::cogs::BySize(&buckets));

        let clock_drift_processor =
            ClockDriftProcessor::new(sent_at, received_at).at_least(MINIMUM_CLOCK_DRIFT);

        buckets.retain_mut(|bucket| {
            // Drop buckets that fail normalization or target an invalid
            // namespace.
            if let Err(error) = relay_metrics::normalize_bucket(bucket) {
                relay_log::debug!(error = &error as &dyn Error, "dropping bucket {bucket:?}");
                return false;
            }

            if !self::metrics::is_valid_namespace(bucket) {
                return false;
            }

            clock_drift_processor.process_timestamp(&mut bucket.timestamp);

            // Metadata from external sources is untrusted; reset it based on
            // our own receive time.
            if !matches!(source, BucketSource::Internal) {
                bucket.metadata = BucketMetadata::new(received_timestamp);
            }

            true
        });

        let project = self.inner.project_cache.get(project_key);

        // Best-effort project checks: only possible when the project state is
        // already cached and enabled.
        let buckets = match project.state() {
            ProjectState::Enabled(project_info) => {
                let rate_limits = project.rate_limits().current_limits();
                self.check_buckets(project_key, project_info, &rate_limits, buckets)
            }
            _ => buckets,
        };

        relay_log::trace!("merging metric buckets into the aggregator");
        self.inner
            .addrs
            .aggregator
            .send(MergeBuckets::new(project_key, buckets));
    }
1773
1774 fn handle_process_batched_metrics(&self, cogs: &mut Token, message: ProcessBatchedMetrics) {
1775 let ProcessBatchedMetrics {
1776 payload,
1777 source,
1778 received_at,
1779 sent_at,
1780 } = message;
1781
1782 #[derive(serde::Deserialize)]
1783 struct Wrapper {
1784 buckets: HashMap<ProjectKey, Vec<Bucket>>,
1785 }
1786
1787 let buckets = match serde_json::from_slice(&payload) {
1788 Ok(Wrapper { buckets }) => buckets,
1789 Err(error) => {
1790 relay_log::debug!(
1791 error = &error as &dyn Error,
1792 "failed to parse batched metrics",
1793 );
1794 metric!(counter(RelayCounters::MetricBucketsParsingFailed) += 1);
1795 return;
1796 }
1797 };
1798
1799 for (project_key, buckets) in buckets {
1800 self.handle_process_metrics(
1801 cogs,
1802 ProcessMetrics {
1803 data: MetricData::Parsed(buckets),
1804 project_key,
1805 source,
1806 received_at,
1807 sent_at,
1808 },
1809 )
1810 }
1811 }
1812
    /// Submits a processed result either to the store (processing Relays) or
    /// to the upstream HTTP endpoint.
    fn submit_upstream(&self, cogs: &mut Token, submit: Submit<'_>) {
        let _submit = cogs.start_category("submit");

        // Processing Relays with a store forwarder write directly to the
        // store instead of forwarding over HTTP.
        #[cfg(feature = "processing")]
        if self.inner.config.processing_enabled()
            && let Some(store_forwarder) = &self.inner.addrs.store_forwarder
        {
            use crate::processing::StoreHandle;

            let objectstore = self.inner.addrs.objectstore.as_ref();
            let global_config = &self.inner.global_config.current();
            let handle = StoreHandle::new(store_forwarder, objectstore, global_config);

            match submit {
                Submit::Envelope(envelope) => handle.send_envelope(envelope.into_inner()),
                Submit::Output { output, ctx } => output
                    .forward_store(handle, ctx)
                    .unwrap_or_else(|err| err.into_inner()),
            }
            return;
        }

        // HTTP path: everything must be an envelope, so serialize processor
        // outputs back into one.
        let mut envelope = match submit {
            Submit::Envelope(envelope) => envelope,
            Submit::Output { output, ctx } => match output.serialize_envelope(ctx) {
                Ok(envelope) => ManagedEnvelope::from(envelope).into_processed(),
                Err(_) => {
                    relay_log::error!("failed to serialize output to an envelope");
                    return;
                }
            },
        };

        if envelope.envelope_mut().is_empty() {
            envelope.accept();
            return;
        }

        envelope.envelope_mut().set_sent_at(Utc::now());

        relay_log::trace!("sending envelope to sentry endpoint");
        let http_encoding = self.inner.config.http_encoding();
        // Serialize and apply the configured HTTP transfer encoding.
        let result = envelope.envelope().to_vec().and_then(|v| {
            encode_payload(&v.into(), http_encoding).map_err(EnvelopeError::PayloadIoFailed)
        });

        match result {
            Ok(body) => {
                self.inner
                    .addrs
                    .upstream_relay
                    .send(SendRequest(SendEnvelope {
                        envelope,
                        body,
                        http_encoding,
                        project_cache: self.inner.project_cache.clone(),
                    }));
            }
            Err(error) => {
                relay_log::error!(
                    error = &error as &dyn Error,
                    tags.project_key = %envelope.scoping().project_key,
                    "failed to serialize envelope payload"
                );

                envelope.reject(Outcome::Invalid(DiscardReason::Internal));
            }
        }
    }
1894
1895 fn handle_submit_client_reports(&self, cogs: &mut Token, message: SubmitClientReports) {
1896 let SubmitClientReports {
1897 client_reports,
1898 scoping,
1899 } = message;
1900
1901 let upstream = self.inner.config.upstream_descriptor();
1902 let dsn = PartialDsn::outbound(&scoping, upstream);
1903
1904 let mut envelope = Envelope::from_request(None, RequestMeta::outbound(dsn));
1905 for client_report in client_reports {
1906 match client_report.serialize() {
1907 Ok(payload) => {
1908 let mut item = Item::new(ItemType::ClientReport);
1909 item.set_payload(ContentType::Json, payload);
1910 envelope.add_item(item);
1911 }
1912 Err(error) => {
1913 relay_log::error!(
1914 error = &error as &dyn std::error::Error,
1915 "failed to serialize client report"
1916 );
1917 }
1918 }
1919 }
1920
1921 let envelope = ManagedEnvelope::new(envelope, self.inner.addrs.outcome_aggregator.clone());
1922 self.submit_upstream(cogs, Submit::Envelope(envelope.into_processed()));
1923 }
1924
1925 fn check_buckets(
1926 &self,
1927 project_key: ProjectKey,
1928 project_info: &ProjectInfo,
1929 rate_limits: &RateLimits,
1930 buckets: Vec<Bucket>,
1931 ) -> Vec<Bucket> {
1932 let Some(scoping) = project_info.scoping(project_key) else {
1933 relay_log::error!(
1934 tags.project_key = project_key.as_str(),
1935 "there is no scoping: dropping {} buckets",
1936 buckets.len(),
1937 );
1938 return Vec::new();
1939 };
1940
1941 let mut buckets = self::metrics::apply_project_info(
1942 buckets,
1943 &self.inner.metric_outcomes,
1944 project_info,
1945 scoping,
1946 );
1947
1948 let namespaces: BTreeSet<MetricNamespace> = buckets
1949 .iter()
1950 .filter_map(|bucket| bucket.name.try_namespace())
1951 .collect();
1952
1953 for namespace in namespaces {
1954 let limits = rate_limits.check_with_quotas(
1955 project_info.get_quotas(),
1956 scoping.item(DataCategory::MetricBucket),
1957 );
1958
1959 if limits.is_limited() {
1960 let rejected;
1961 (buckets, rejected) = utils::split_off(buckets, |bucket| {
1962 bucket.name.try_namespace() == Some(namespace)
1963 });
1964
1965 let reason_code = limits.longest().and_then(|limit| limit.reason_code.clone());
1966 self.inner.metric_outcomes.track(
1967 scoping,
1968 &rejected,
1969 Outcome::RateLimited(reason_code),
1970 );
1971 }
1972 }
1973
1974 let quotas = project_info.config.quotas.clone();
1975 match MetricsLimiter::create(buckets, quotas, scoping) {
1976 Ok(mut bucket_limiter) => {
1977 bucket_limiter.enforce_limits(rate_limits, &self.inner.metric_outcomes);
1978 bucket_limiter.into_buckets()
1979 }
1980 Err(buckets) => buckets,
1981 }
1982 }
1983
    /// Consistently rate-limits metric buckets against Redis.
    ///
    /// For every namespace present in the buckets, checks the combined global
    /// and project quotas; rejected buckets are tracked as outcomes and the
    /// triggered limits are cached in the project cache. Errors from Redis
    /// abort further checks (fail open). Finally applies quota-based limits
    /// via [`Self::apply_other_rate_limits`].
    #[cfg(feature = "processing")]
    async fn rate_limit_buckets(
        &self,
        scoping: Scoping,
        project_info: &ProjectInfo,
        mut buckets: Vec<Bucket>,
    ) -> Vec<Bucket> {
        let Some(rate_limiter) = &self.inner.rate_limiter else {
            return buckets;
        };

        let global_config = self.inner.global_config.current();
        // Count buckets per namespace to report accurate quantities.
        let namespaces = buckets
            .iter()
            .filter_map(|bucket| bucket.name.try_namespace())
            .counts();

        let quotas = CombinedQuotas::new(&global_config, project_info.get_quotas());

        for (namespace, quantity) in namespaces {
            let item_scoping = scoping.metric_bucket(namespace);

            let limits = match rate_limiter
                .is_rate_limited(quotas, item_scoping, quantity, false)
                .await
            {
                Ok(limits) => limits,
                Err(err) => {
                    // Fail open: without Redis we cannot check, keep buckets.
                    relay_log::error!(
                        error = &err as &dyn std::error::Error,
                        "failed to check redis rate limits"
                    );
                    break;
                }
            };

            if limits.is_limited() {
                let rejected;
                (buckets, rejected) = utils::split_off(buckets, |bucket| {
                    bucket.name.try_namespace() == Some(namespace)
                });

                let reason_code = limits.longest().and_then(|limit| limit.reason_code.clone());
                self.inner.metric_outcomes.track(
                    scoping,
                    &rejected,
                    Outcome::RateLimited(reason_code),
                );

                // Cache the limits so future checks can short-circuit.
                self.inner
                    .project_cache
                    .get(item_scoping.scoping.project_key)
                    .rate_limits()
                    .merge(limits);
            }
        }

        match MetricsLimiter::create(buckets, project_info.config.quotas.clone(), scoping) {
            Err(buckets) => buckets,
            Ok(bucket_limiter) => self.apply_other_rate_limits(bucket_limiter).await,
        }
    }
2046
2047 #[cfg(feature = "processing")]
2049 async fn apply_other_rate_limits(&self, mut bucket_limiter: MetricsLimiter) -> Vec<Bucket> {
2050 relay_log::trace!("handle_rate_limit_buckets");
2051
2052 let scoping = *bucket_limiter.scoping();
2053
2054 if let Some(rate_limiter) = self.inner.rate_limiter.as_ref() {
2055 let global_config = self.inner.global_config.current();
2056 let quotas = CombinedQuotas::new(&global_config, bucket_limiter.quotas());
2057
2058 let over_accept_once = true;
2061 let mut rate_limits = RateLimits::new();
2062
2063 let (category, count) = bucket_limiter.count();
2064
2065 let timer = Instant::now();
2066 let mut is_limited = false;
2067
2068 if let Some(count) = count {
2069 match rate_limiter
2070 .is_rate_limited(quotas, scoping.item(category), count, over_accept_once)
2071 .await
2072 {
2073 Ok(limits) => {
2074 is_limited = limits.is_limited();
2075 rate_limits.merge(limits)
2076 }
2077 Err(e) => {
2078 relay_log::error!(error = &e as &dyn Error, "rate limiting error")
2079 }
2080 }
2081 }
2082
2083 relay_statsd::metric!(
2084 timer(RelayTimers::RateLimitBucketsDuration) = timer.elapsed(),
2085 category = category.name(),
2086 limited = if is_limited { "true" } else { "false" },
2087 count = match count {
2088 None => "none",
2089 Some(0) => "0",
2090 Some(1) => "1",
2091 Some(1..=10) => "10",
2092 Some(1..=25) => "25",
2093 Some(1..=50) => "50",
2094 Some(51..=100) => "100",
2095 Some(101..=500) => "500",
2096 _ => "> 500",
2097 },
2098 );
2099
2100 if rate_limits.is_limited() {
2101 let was_enforced =
2102 bucket_limiter.enforce_limits(&rate_limits, &self.inner.metric_outcomes);
2103
2104 if was_enforced {
2105 self.inner
2107 .project_cache
2108 .get(scoping.project_key)
2109 .rate_limits()
2110 .merge(rate_limits);
2111 }
2112 }
2113 }
2114
2115 bucket_limiter.into_buckets()
2116 }
2117
    /// Sends metric buckets of all projects directly to the store (processing
    /// Relays only).
    ///
    /// Buckets are rate-limited against Redis first; surviving buckets are
    /// forwarded with the project's event retention (or the default).
    #[cfg(feature = "processing")]
    async fn encode_metrics_processing(
        &self,
        message: FlushBuckets,
        store_forwarder: &Addr<Store>,
    ) {
        use crate::constants::DEFAULT_EVENT_RETENTION;
        use crate::services::store::StoreMetrics;

        for ProjectBuckets {
            buckets,
            scoping,
            project_info,
            ..
        } in message.buckets.into_values()
        {
            let buckets = self
                .rate_limit_buckets(scoping, &project_info, buckets)
                .await;

            if buckets.is_empty() {
                continue;
            }

            let retention = project_info
                .config
                .event_retention
                .unwrap_or(DEFAULT_EVENT_RETENTION);

            store_forwarder.send(StoreMetrics {
                buckets,
                scoping,
                retention,
            });
        }
    }
2161
2162 fn encode_metrics_envelope(&self, cogs: &mut Token, message: FlushBuckets) {
2173 let FlushBuckets {
2174 partition_key,
2175 buckets,
2176 } = message;
2177
2178 let batch_size = self.inner.config.metrics_max_batch_size_bytes();
2179 let upstream = self.inner.config.upstream_descriptor();
2180
2181 for ProjectBuckets {
2182 buckets, scoping, ..
2183 } in buckets.values()
2184 {
2185 let dsn = PartialDsn::outbound(scoping, upstream);
2186
2187 relay_statsd::metric!(
2188 distribution(RelayDistributions::PartitionKeys) = u64::from(partition_key)
2189 );
2190
2191 let mut num_batches = 0;
2192 for batch in BucketsView::from(buckets).by_size(batch_size) {
2193 let mut envelope = Envelope::from_request(None, RequestMeta::outbound(dsn.clone()));
2194
2195 let mut item = Item::new(ItemType::MetricBuckets);
2196 item.set_source_quantities(crate::metrics::extract_quantities(batch));
2197 item.set_payload(ContentType::Json, serde_json::to_vec(&buckets).unwrap());
2198 envelope.add_item(item);
2199
2200 let mut envelope =
2201 ManagedEnvelope::new(envelope, self.inner.addrs.outcome_aggregator.clone());
2202 envelope
2203 .set_partition_key(Some(partition_key))
2204 .scope(*scoping);
2205
2206 relay_statsd::metric!(
2207 distribution(RelayDistributions::BucketsPerBatch) = batch.len() as u64
2208 );
2209
2210 self.submit_upstream(cogs, Submit::Envelope(envelope.into_processed()));
2211 num_batches += 1;
2212 }
2213
2214 relay_statsd::metric!(
2215 distribution(RelayDistributions::BatchesPerPartition) = num_batches
2216 );
2217 }
2218 }
2219
2220 fn send_global_partition(&self, partition_key: u32, partition: &mut Partition<'_>) {
2222 if partition.is_empty() {
2223 return;
2224 }
2225
2226 let (unencoded, project_info) = partition.take();
2227 let http_encoding = self.inner.config.http_encoding();
2228 let encoded = match encode_payload(&unencoded, http_encoding) {
2229 Ok(payload) => payload,
2230 Err(error) => {
2231 let error = &error as &dyn std::error::Error;
2232 relay_log::error!(error, "failed to encode metrics payload");
2233 return;
2234 }
2235 };
2236
2237 let request = SendMetricsRequest {
2238 partition_key: partition_key.to_string(),
2239 unencoded,
2240 encoded,
2241 project_info,
2242 http_encoding,
2243 metric_outcomes: self.inner.metric_outcomes.clone(),
2244 };
2245
2246 self.inner.addrs.upstream_relay.send(SendRequest(request));
2247 }
2248
    /// Encodes metric buckets of all projects into a single global partition
    /// and submits full partitions to the upstream.
    ///
    /// When a bucket does not fit into the current partition, the partition is
    /// flushed and the leftover portion of the bucket is retried against the
    /// fresh partition (`insert` returns the remainder).
    fn encode_metrics_global(&self, message: FlushBuckets) {
        let FlushBuckets {
            partition_key,
            buckets,
        } = message;

        let batch_size = self.inner.config.metrics_max_batch_size_bytes();
        let mut partition = Partition::new(batch_size);
        let mut partition_splits = 0;

        for ProjectBuckets {
            buckets, scoping, ..
        } in buckets.values()
        {
            for bucket in buckets {
                let mut remaining = Some(BucketView::new(bucket));

                // Keep inserting the remainder until the bucket fully fits.
                while let Some(bucket) = remaining.take() {
                    if let Some(next) = partition.insert(bucket, *scoping) {
                        self.send_global_partition(partition_key, &mut partition);
                        remaining = Some(next);
                        partition_splits += 1;
                    }
                }
            }
        }

        if partition_splits > 0 {
            metric!(distribution(RelayDistributions::PartitionSplits) = partition_splits);
        }

        // Flush whatever is left in the final, partially filled partition.
        self.send_global_partition(partition_key, &mut partition);
    }
2295
    /// Handles the [`FlushBuckets`] message.
    ///
    /// Applies cached checks to every project's buckets, then routes them to
    /// the store (processing Relays), a single global partition, or
    /// per-project envelopes, depending on configuration.
    async fn handle_flush_buckets(&self, cogs: &mut Token, mut message: FlushBuckets) {
        for (project_key, pb) in message.buckets.iter_mut() {
            let buckets = std::mem::take(&mut pb.buckets);
            pb.buckets =
                self.check_buckets(*project_key, &pb.project_info, &pb.rate_limits, buckets);
        }

        // Processing Relays bypass the HTTP encoding paths entirely.
        #[cfg(feature = "processing")]
        if self.inner.config.processing_enabled()
            && let Some(ref store_forwarder) = self.inner.addrs.store_forwarder
        {
            return self
                .encode_metrics_processing(message, store_forwarder)
                .await;
        }

        if self.inner.config.http_global_metrics() {
            self.encode_metrics_global(message)
        } else {
            self.encode_metrics_envelope(cogs, message)
        }
    }
2318
    /// Returns `true` if a Redis rate limiter is configured (test helper).
    #[cfg(all(test, feature = "processing"))]
    fn redis_rate_limiter_enabled(&self) -> bool {
        self.inner.rate_limiter.is_some()
    }
2323
    /// Dispatches an [`EnvelopeProcessor`] message to its handler, timing the
    /// work and attributing COGS to the message's feature weights.
    async fn handle_message(&self, message: EnvelopeProcessor) {
        let ty = message.variant();
        let feature_weights = self.feature_weights(&message);

        metric!(timer(RelayTimers::ProcessMessageDuration), message = ty, {
            let mut cogs = self.inner.cogs.timed(ResourceId::Relay, feature_weights);

            match message {
                EnvelopeProcessor::ProcessEnvelope(m) => {
                    self.handle_process_envelope(&mut cogs, *m).await
                }
                EnvelopeProcessor::ProcessProjectMetrics(m) => {
                    self.handle_process_metrics(&mut cogs, *m)
                }
                EnvelopeProcessor::ProcessBatchedMetrics(m) => {
                    self.handle_process_batched_metrics(&mut cogs, *m)
                }
                EnvelopeProcessor::FlushBuckets(m) => {
                    self.handle_flush_buckets(&mut cogs, *m).await
                }
                EnvelopeProcessor::SubmitClientReports(m) => {
                    self.handle_submit_client_reports(&mut cogs, *m)
                }
            }
        });
    }
2350
    /// Computes COGS feature weights for a message.
    ///
    /// Envelope and metrics messages are attributed later (per group /
    /// per size), so they start unattributed here. Flush messages are weighted
    /// by bucket count (processing) or serialized size (non-processing).
    fn feature_weights(&self, message: &EnvelopeProcessor) -> FeatureWeights {
        match message {
            EnvelopeProcessor::ProcessEnvelope(_) => AppFeature::Unattributed.into(),
            EnvelopeProcessor::ProcessProjectMetrics(_) => AppFeature::Unattributed.into(),
            EnvelopeProcessor::ProcessBatchedMetrics(_) => AppFeature::Unattributed.into(),
            EnvelopeProcessor::FlushBuckets(v) => v
                .buckets
                .values()
                .map(|s| {
                    if self.inner.config.processing_enabled() {
                        relay_metrics::cogs::ByCount(&s.buckets).into()
                    } else {
                        relay_metrics::cogs::BySize(&s.buckets).into()
                    }
                })
                .fold(FeatureWeights::none(), FeatureWeights::merge),
            EnvelopeProcessor::SubmitClientReports(_) => AppFeature::ClientReports.into(),
        }
    }
2373}
2374
impl Service for EnvelopeProcessorService {
    type Interface = EnvelopeProcessor;

    /// Receives processor messages and runs them on the worker pool.
    async fn run(self, mut rx: relay_system::Receiver<Self::Interface>) {
        while let Some(message) = rx.recv().await {
            let service = self.clone();
            // Awaiting `spawn_async` blocks until the pool accepts the task,
            // which applies backpressure to the message receiver.
            self.inner
                .pool
                .spawn_async(
                    async move {
                        service.handle_message(message).await;
                    }
                    .boxed(),
                )
                .await;
        }
    }
}
2393
/// Result of rate-limit enforcement over an envelope and its event.
struct EnforcementResult {
    /// The remaining event; empty if the event itself was rate limited.
    event: Annotated<Event>,
    /// Rate limits collected during enforcement.
    ///
    /// Only read by processing builds; `expect(dead_code)` silences the
    /// warning otherwise.
    #[cfg_attr(not(feature = "processing"), expect(dead_code))]
    rate_limits: RateLimits,
}
2403
2404impl EnforcementResult {
2405 pub fn new(event: Annotated<Event>, rate_limits: RateLimits) -> Self {
2407 Self { event, rate_limits }
2408 }
2409}
2410
/// Strategy used to enforce rate limits on an envelope.
#[derive(Clone)]
enum RateLimiter {
    /// Enforces only already-cached rate limits.
    Cached,
    /// Enforces rate limits consistently against Redis.
    #[cfg(feature = "processing")]
    Consistent(Arc<RedisRateLimiter>),
}
2417
impl RateLimiter {
    /// Enforces quotas on the envelope and the extracted event.
    ///
    /// Builds an [`EnvelopeLimiter`] whose check callback either consults
    /// Redis (consistent variant, processing builds) or the cached rate
    /// limits. Outcomes for dropped items are recorded via
    /// `apply_with_outcomes`. Returns the remaining event together with the
    /// computed rate limits; if the event itself was limited, the returned
    /// event is empty.
    async fn enforce<Group>(
        &self,
        managed_envelope: &mut TypedEnvelope<Group>,
        event: Annotated<Event>,
        _extracted_metrics: &mut ProcessingExtractedMetrics,
        ctx: processing::Context<'_>,
    ) -> Result<EnforcementResult, ProcessingError> {
        // Nothing to enforce on an empty envelope without an event.
        if managed_envelope.envelope().is_empty() && event.value().is_none() {
            return Ok(EnforcementResult::new(event, RateLimits::default()));
        }

        let quotas = CombinedQuotas::new(ctx.global_config, ctx.project_info.get_quotas());
        if quotas.is_empty() {
            return Ok(EnforcementResult::new(event, RateLimits::default()));
        }

        let event_category = event_category(&event);

        // The callback is cloned into the async check closure; `_quantity` is
        // only used by the Redis-backed variant.
        let this = self.clone();
        let mut envelope_limiter =
            EnvelopeLimiter::new(CheckLimits::All, move |item_scope, _quantity| {
                let this = this.clone();

                async move {
                    match this {
                        #[cfg(feature = "processing")]
                        RateLimiter::Consistent(rate_limiter) => Ok::<_, ProcessingError>(
                            rate_limiter
                                .is_rate_limited(quotas, item_scope, _quantity, false)
                                .await?,
                        ),
                        _ => Ok::<_, ProcessingError>(
                            ctx.rate_limits.check_with_quotas(quotas, item_scope),
                        ),
                    }
                }
            });

        // The event is not an envelope item; register it explicitly so its
        // category is accounted for.
        if let Some(category) = event_category {
            envelope_limiter.assume_event(category);
        }

        let scoping = managed_envelope.scoping();
        let (enforcement, rate_limits) = metric!(timer(RelayTimers::EventProcessingRateLimiting), type = self.name(), {
            envelope_limiter
                .compute(managed_envelope.envelope_mut(), &scoping)
                .await
        })?;
        let event_active = enforcement.is_event_active();

        // Processing builds propagate the enforcement to extracted metrics.
        #[cfg(feature = "processing")]
        _extracted_metrics.apply_enforcement(&enforcement, matches!(self, Self::Consistent(_)));
        enforcement.apply_with_outcomes(managed_envelope);

        if event_active {
            debug_assert!(managed_envelope.envelope().is_empty());
            return Ok(EnforcementResult::new(Annotated::empty(), rate_limits));
        }

        Ok(EnforcementResult::new(event, rate_limits))
    }

    /// Short name of the limiter variant, used as a metric tag.
    fn name(&self) -> &'static str {
        match self {
            Self::Cached => "cached",
            #[cfg(feature = "processing")]
            Self::Consistent(_) => "consistent",
        }
    }
}
2499
2500pub fn encode_payload(body: &Bytes, http_encoding: HttpEncoding) -> Result<Bytes, std::io::Error> {
2501 let envelope_body: Vec<u8> = match http_encoding {
2502 HttpEncoding::Identity => return Ok(body.clone()),
2503 HttpEncoding::Deflate => {
2504 let mut encoder = ZlibEncoder::new(Vec::new(), Compression::default());
2505 encoder.write_all(body.as_ref())?;
2506 encoder.finish()?
2507 }
2508 HttpEncoding::Gzip => {
2509 let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
2510 encoder.write_all(body.as_ref())?;
2511 encoder.finish()?
2512 }
2513 HttpEncoding::Br => {
2514 let mut encoder = BrotliEncoder::new(Vec::new(), 0, 5, 22);
2516 encoder.write_all(body.as_ref())?;
2517 encoder.into_inner()
2518 }
2519 HttpEncoding::Zstd => {
2520 let mut encoder = ZstdEncoder::new(Vec::new(), 1)?;
2523 encoder.write_all(body.as_ref())?;
2524 encoder.finish()?
2525 }
2526 };
2527
2528 Ok(envelope_body.into())
2529}
2530
/// An upstream request that submits a serialized envelope.
#[derive(Debug)]
pub struct SendEnvelope {
    /// The processed envelope; used for scoping, outcomes and acceptance.
    pub envelope: TypedEnvelope<Processed>,
    /// Pre-serialized (and possibly compressed) envelope payload.
    pub body: Bytes,
    /// Content encoding applied to `body`.
    pub http_encoding: HttpEncoding,
    /// Handle to merge upstream rate limits back into the project cache.
    pub project_cache: ProjectCacheHandle,
}
2539
impl UpstreamRequest for SendEnvelope {
    fn method(&self) -> reqwest::Method {
        reqwest::Method::POST
    }

    fn path(&self) -> Cow<'_, str> {
        format!("/api/{}/envelope/", self.envelope.scoping().project_id).into()
    }

    fn route(&self) -> &'static str {
        "envelope"
    }

    /// Builds the HTTP request: forwards client metadata headers and attaches
    /// the pre-serialized body.
    fn build(&mut self, builder: &mut http::RequestBuilder) -> Result<(), http::HttpError> {
        let envelope_body = self.body.clone();
        metric!(
            distribution(RelayDistributions::UpstreamEnvelopeBodySize) = envelope_body.len() as u64
        );

        let meta = &self.envelope.meta();
        let shard = self.envelope.partition_key().map(|p| p.to_string());
        builder
            .content_encoding(self.http_encoding)
            .header_opt("Origin", meta.origin().map(|url| url.as_str()))
            .header_opt("User-Agent", meta.user_agent())
            .header("X-Sentry-Auth", meta.auth_header())
            .header("X-Forwarded-For", meta.forwarded_for())
            .header("Content-Type", envelope::CONTENT_TYPE)
            .header_opt("X-Sentry-Relay-Shard", shard)
            .body(envelope_body);

        Ok(())
    }

    fn sign(&mut self) -> Option<Sign> {
        Some(Sign::Optional(SignatureType::RequestSign))
    }

    /// Handles the upstream response.
    ///
    /// The envelope is accepted on success and also when the upstream has
    /// received the request (rate-limit responses additionally merge the
    /// returned limits into the project cache). Only requests that never
    /// reached the upstream are rejected with an internal outcome.
    fn respond(
        self: Box<Self>,
        result: Result<http::Response, UpstreamRequestError>,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> {
        Box::pin(async move {
            let result = match result {
                Ok(mut response) => response.consume().await.map_err(UpstreamRequestError::Http),
                Err(error) => Err(error),
            };

            match result {
                Ok(()) => self.envelope.accept(),
                Err(error) if error.is_received() => {
                    let scoping = self.envelope.scoping();
                    self.envelope.accept();

                    if let UpstreamRequestError::RateLimited(limits) = error {
                        self.project_cache
                            .get(scoping.project_key)
                            .rate_limits()
                            .merge(limits.scope(&scoping));
                    }
                }
                Err(error) => {
                    let mut envelope = self.envelope;
                    envelope.reject(Outcome::Invalid(DiscardReason::Internal));
                    relay_log::error!(
                        error = &error as &dyn Error,
                        tags.project_key = %envelope.scoping().project_key,
                        "error sending envelope"
                    );
                }
            }
        })
    }
}
2616
/// Accumulates metric bucket views for one size-bounded upstream request.
///
/// Buckets are split so that the estimated serialized size stays within
/// `max_size`.
#[derive(Debug)]
struct Partition<'a> {
    /// Maximum estimated size of one partition, in bytes.
    max_size: usize,
    /// Remaining byte budget of the current partition.
    remaining: usize,
    /// Collected bucket views, keyed by project.
    views: HashMap<ProjectKey, Vec<BucketView<'a>>>,
    /// Scoping for every project present in `views`.
    project_info: HashMap<ProjectKey, Scoping>,
}
2630
2631impl<'a> Partition<'a> {
2632 pub fn new(size: usize) -> Self {
2634 Self {
2635 max_size: size,
2636 remaining: size,
2637 views: HashMap::new(),
2638 project_info: HashMap::new(),
2639 }
2640 }
2641
2642 pub fn insert(&mut self, bucket: BucketView<'a>, scoping: Scoping) -> Option<BucketView<'a>> {
2653 let (current, next) = bucket.split(self.remaining, Some(self.max_size));
2654
2655 if let Some(current) = current {
2656 self.remaining = self.remaining.saturating_sub(current.estimated_size());
2657 self.views
2658 .entry(scoping.project_key)
2659 .or_default()
2660 .push(current);
2661
2662 self.project_info
2663 .entry(scoping.project_key)
2664 .or_insert(scoping);
2665 }
2666
2667 next
2668 }
2669
2670 fn is_empty(&self) -> bool {
2672 self.views.is_empty()
2673 }
2674
2675 fn take(&mut self) -> (Bytes, HashMap<ProjectKey, Scoping>) {
2679 #[derive(serde::Serialize)]
2680 struct Wrapper<'a> {
2681 buckets: &'a HashMap<ProjectKey, Vec<BucketView<'a>>>,
2682 }
2683
2684 let buckets = &self.views;
2685 let payload = serde_json::to_vec(&Wrapper { buckets }).unwrap().into();
2686
2687 let scopings = self.project_info.clone();
2688 self.project_info.clear();
2689
2690 self.views.clear();
2691 self.remaining = self.max_size;
2692
2693 (payload, scopings)
2694 }
2695}
2696
/// An upstream request that submits one partition of metric buckets.
#[derive(Debug)]
struct SendMetricsRequest {
    /// Partition key, sent as the shard routing header.
    partition_key: String,
    /// Uncompressed serialized payload; used for signing and to reconstruct
    /// buckets when outcomes must be emitted on failure.
    unencoded: Bytes,
    /// Payload after applying `http_encoding`; this becomes the request body.
    encoded: Bytes,
    /// Scoping per contained project, required for outcome emission.
    project_info: HashMap<ProjectKey, Scoping>,
    /// Content encoding applied to `encoded`.
    http_encoding: HttpEncoding,
    /// Sink for metric outcomes emitted when the request fails.
    metric_outcomes: MetricOutcomes,
}
2717
2718impl SendMetricsRequest {
2719 fn create_error_outcomes(self) {
2720 #[derive(serde::Deserialize)]
2721 struct Wrapper {
2722 buckets: HashMap<ProjectKey, Vec<MinimalTrackableBucket>>,
2723 }
2724
2725 let buckets = match serde_json::from_slice(&self.unencoded) {
2726 Ok(Wrapper { buckets }) => buckets,
2727 Err(err) => {
2728 relay_log::error!(
2729 error = &err as &dyn std::error::Error,
2730 "failed to parse buckets from failed transmission"
2731 );
2732 return;
2733 }
2734 };
2735
2736 for (key, buckets) in buckets {
2737 let Some(&scoping) = self.project_info.get(&key) else {
2738 relay_log::error!("missing scoping for project key");
2739 continue;
2740 };
2741
2742 self.metric_outcomes.track(
2743 scoping,
2744 &buckets,
2745 Outcome::Invalid(DiscardReason::Internal),
2746 );
2747 }
2748 }
2749}
2750
impl UpstreamRequest for SendMetricsRequest {
    fn set_relay_id(&self) -> bool {
        true
    }

    /// Signs the *unencoded* payload so the upstream can verify the signature
    /// after decompressing the body.
    fn sign(&mut self) -> Option<Sign> {
        Some(Sign::Required(SignatureType::Body(self.unencoded.clone())))
    }

    fn method(&self) -> reqwest::Method {
        reqwest::Method::POST
    }

    fn path(&self) -> Cow<'_, str> {
        "/api/0/relays/metrics/".into()
    }

    fn route(&self) -> &'static str {
        "global_metrics"
    }

    fn build(&mut self, builder: &mut http::RequestBuilder) -> Result<(), http::HttpError> {
        metric!(
            distribution(RelayDistributions::UpstreamMetricsBodySize) = self.encoded.len() as u64
        );

        builder
            .content_encoding(self.http_encoding)
            .header("X-Sentry-Relay-Shard", self.partition_key.as_bytes())
            .header(header::CONTENT_TYPE, b"application/json")
            .body(self.encoded.clone());

        Ok(())
    }

    /// Handles the upstream response.
    ///
    /// Error outcomes are only generated when the request never reached the
    /// upstream; once received, outcome generation is the upstream's
    /// responsibility.
    fn respond(
        self: Box<Self>,
        result: Result<http::Response, UpstreamRequestError>,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> {
        Box::pin(async {
            match result {
                Ok(mut response) => {
                    response.consume().await.ok();
                }
                Err(error) => {
                    relay_log::error!(error = &error as &dyn Error, "Failed to send metrics batch");

                    // The upstream received the batch; do not double-count.
                    if error.is_received() {
                        return;
                    }

                    self.create_error_outcomes()
                }
            }
        })
    }
}
2810
/// Borrowed combination of global and project quotas.
///
/// Iteration yields global quotas first, then project quotas.
#[derive(Copy, Clone, Debug)]
struct CombinedQuotas<'a> {
    /// Quotas defined in the global config.
    global_quotas: &'a [Quota],
    /// Quotas defined in the project configuration.
    project_quotas: &'a [Quota],
}
2817
2818impl<'a> CombinedQuotas<'a> {
2819 pub fn new(global_config: &'a GlobalConfig, project_quotas: &'a [Quota]) -> Self {
2821 Self {
2822 global_quotas: &global_config.quotas,
2823 project_quotas,
2824 }
2825 }
2826
2827 pub fn is_empty(&self) -> bool {
2829 self.len() == 0
2830 }
2831
2832 pub fn len(&self) -> usize {
2834 self.global_quotas.len() + self.project_quotas.len()
2835 }
2836}
2837
impl<'a> IntoIterator for CombinedQuotas<'a> {
    type Item = &'a Quota;
    type IntoIter = std::iter::Chain<std::slice::Iter<'a, Quota>, std::slice::Iter<'a, Quota>>;

    /// Yields global quotas first, followed by project quotas.
    fn into_iter(self) -> Self::IntoIter {
        self.global_quotas.iter().chain(self.project_quotas.iter())
    }
}
2846
2847#[cfg(test)]
2848mod tests {
2849 use insta::assert_debug_snapshot;
2850 use relay_common::glob2::LazyGlob;
2851 use relay_dynamic_config::ProjectConfig;
2852 use relay_event_normalization::{
2853 NormalizationConfig, RedactionRule, TransactionNameConfig, TransactionNameRule,
2854 };
2855 use relay_event_schema::protocol::TransactionSource;
2856 use relay_pii::DataScrubbingConfig;
2857 use similar_asserts::assert_eq;
2858
2859 use crate::testutils::{create_test_processor, create_test_processor_with_addrs};
2860
2861 #[cfg(feature = "processing")]
2862 use {
2863 relay_metrics::BucketValue,
2864 relay_quotas::{QuotaScope, ReasonCode},
2865 relay_test::mock_service,
2866 };
2867
2868 use super::*;
2869
    /// Builds a zero-limit organization quota on the `MetricBucket` category.
    #[cfg(feature = "processing")]
    fn mock_quota(id: &str) -> Quota {
        Quota {
            id: Some(id.into()),
            categories: [DataCategory::MetricBucket].into(),
            scope: QuotaScope::Organization,
            scope_id: None,
            limit: Some(0),
            window: None,
            reason_code: None,
            namespace: None,
        }
    }
2883
    /// `CombinedQuotas` reports the correct length and yields global quotas
    /// before project quotas.
    #[cfg(feature = "processing")]
    #[test]
    fn test_dynamic_quotas() {
        let global_config = GlobalConfig {
            quotas: vec![mock_quota("foo"), mock_quota("bar")],
            ..Default::default()
        };

        let project_quotas = vec![mock_quota("baz"), mock_quota("qux")];

        let dynamic_quotas = CombinedQuotas::new(&global_config, &project_quotas);

        assert_eq!(dynamic_quotas.len(), 4);
        assert!(!dynamic_quotas.is_empty());

        // Global quotas must precede project quotas in iteration order.
        let quota_ids = dynamic_quotas.into_iter().filter_map(|q| q.id.as_deref());
        assert!(quota_ids.eq(["foo", "bar", "baz", "qux"]));
    }
2902
    /// When flushing batched metrics, rate limits are enforced per
    /// organization: buckets of the rate-limited org are dropped while buckets
    /// of the other org reach the store forwarder.
    #[cfg(feature = "processing")]
    #[tokio::test]
    async fn test_ratelimit_per_batch() {
        use relay_base_schema::organization::OrganizationId;
        use relay_protocol::FiniteF64;

        let rate_limited_org = Scoping {
            organization_id: OrganizationId::new(1),
            project_id: ProjectId::new(21),
            project_key: ProjectKey::parse("00000000000000000000000000000000").unwrap(),
            key_id: Some(17),
        };

        let not_rate_limited_org = Scoping {
            organization_id: OrganizationId::new(2),
            project_id: ProjectId::new(21),
            project_key: ProjectKey::parse("11111111111111111111111111111111").unwrap(),
            key_id: Some(17),
        };

        let message = {
            // Zero-limit quota scoped to the first organization only.
            let project_info = {
                let quota = Quota {
                    id: Some("testing".into()),
                    categories: [DataCategory::MetricBucket].into(),
                    scope: relay_quotas::QuotaScope::Organization,
                    scope_id: Some(rate_limited_org.organization_id.to_string().into()),
                    limit: Some(0),
                    window: None,
                    reason_code: Some(ReasonCode::new("test")),
                    namespace: None,
                };

                let mut config = ProjectConfig::default();
                config.quotas.push(quota);

                Arc::new(ProjectInfo {
                    config,
                    ..Default::default()
                })
            };

            // One identical counter bucket per organization.
            let project_metrics = |scoping| ProjectBuckets {
                buckets: vec![Bucket {
                    name: "d:spans/bar".into(),
                    value: BucketValue::Counter(FiniteF64::new(1.0).unwrap()),
                    timestamp: UnixTimestamp::now(),
                    tags: Default::default(),
                    width: 10,
                    metadata: BucketMetadata::default(),
                }],
                rate_limits: Default::default(),
                project_info: project_info.clone(),
                scoping,
            };

            let buckets = hashbrown::HashMap::from([
                (
                    rate_limited_org.project_key,
                    project_metrics(rate_limited_org),
                ),
                (
                    not_rate_limited_org.project_key,
                    project_metrics(not_rate_limited_org),
                ),
            ]);

            FlushBuckets {
                partition_key: 0,
                buckets,
            }
        };

        assert_eq!(message.buckets.keys().count(), 2);

        let config = {
            let config_json = serde_json::json!({
                "processing": {
                    "enabled": true,
                    "kafka_config": [],
                    "redis": {
                        "server": std::env::var("RELAY_REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_owned()),
                    }
                }
            });
            Config::from_json_value(config_json).unwrap()
        };

        // Mock store forwarder that records the organization of every metrics
        // message it receives and panics on anything else.
        let (store, handle) = {
            let f = |org_ids: &mut Vec<OrganizationId>, msg: Store| {
                let org_id = match msg {
                    Store::Metrics(x) => x.scoping.organization_id,
                    _ => panic!("received envelope when expecting only metrics"),
                };
                org_ids.push(org_id);
            };

            mock_service("store_forwarder", vec![], f)
        };

        let processor = create_test_processor(config).await;
        assert!(processor.redis_rate_limiter_enabled());

        processor.encode_metrics_processing(message, &store).await;

        // Dropping the store address ends the mock service so its handle
        // resolves with the collected organization ids.
        drop(store);
        let orgs_not_ratelimited = handle.await.unwrap();

        assert_eq!(
            orgs_not_ratelimited,
            vec![not_rate_limited_org.organization_id]
        );
    }
3019
    /// PII scrubbing masks version-like numbers in the `User-Agent` header,
    /// but the browser context extracted during normalization must keep the
    /// parsed browser version intact.
    #[tokio::test]
    async fn test_browser_version_extraction_with_pii_like_data() {
        let processor = create_test_processor(Default::default()).await;
        let outcome_aggregator = Addr::dummy();
        let event_id = EventId::new();

        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
            .parse()
            .unwrap();

        let request_meta = RequestMeta::new(dsn);
        let mut envelope = Envelope::from_request(Some(event_id), request_meta);

        envelope.add_item({
            let mut item = Item::new(ItemType::Event);
            item.set_payload(
                ContentType::Json,
                r#"
                {
                    "request": {
                        "headers": [
                            ["User-Agent", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36"]
                        ]
                    }
                }
                "#,
            );
            item
        });

        // Enable aggressive scrubbing, including IP masking which also
        // matches version-like dotted numbers.
        let mut datascrubbing_settings = DataScrubbingConfig::default();
        datascrubbing_settings.scrub_data = true;
        datascrubbing_settings.scrub_defaults = true;
        datascrubbing_settings.scrub_ip_addresses = true;

        let pii_config = serde_json::from_str(r#"{"applications": {"**": ["@ip:mask"]}}"#).unwrap();

        let config = ProjectConfig {
            datascrubbing_settings,
            pii_config: Some(pii_config),
            ..Default::default()
        };

        let project_info = ProjectInfo {
            config,
            ..Default::default()
        };

        let mut envelopes = ProcessingGroup::split_envelope(*envelope, &Default::default());
        assert_eq!(envelopes.len(), 1);

        let (group, envelope) = envelopes.pop().unwrap();
        let envelope = ManagedEnvelope::new(envelope, outcome_aggregator);

        let message = ProcessEnvelopeGrouped {
            group,
            envelope,
            ctx: processing::Context {
                project_info: &project_info,
                ..processing::Context::for_test()
            },
        };

        let Ok(Some(Submit::Output { output, ctx })) = processor.process(message).await else {
            panic!();
        };
        let new_envelope = output.serialize_envelope(ctx).unwrap().accept(|f| f);

        let event_item = new_envelope.items().last().unwrap();
        let annotated_event: Annotated<Event> =
            Annotated::from_json_bytes(&event_item.payload()).unwrap();
        let event = annotated_event.into_value().unwrap();
        let headers = event
            .request
            .into_value()
            .unwrap()
            .headers
            .into_value()
            .unwrap();

        // The header value itself is scrubbed...
        assert_eq!(
            Some(
                "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/********* Safari/537.36"
            ),
            headers.get_header("User-Agent")
        );

        // ...while the browser context keeps the extracted version.
        let contexts = event.contexts.into_value().unwrap();
        let browser = contexts.0.get("browser").unwrap();
        assert_eq!(
            r#"{"browser":"Chrome 103.0.0","name":"Chrome","version":"103.0.0","type":"browser"}"#,
            browser.to_json().unwrap()
        );
    }
3117
    /// The dynamic sampling context from the envelope header is materialized
    /// into the event's `_dsc` field, with the trace id normalized to its
    /// unhyphenated form.
    #[tokio::test]
    #[cfg(feature = "processing")]
    async fn test_materialize_dsc() {
        use crate::services::projects::project::PublicKeyConfig;

        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
            .parse()
            .unwrap();
        let request_meta = RequestMeta::new(dsn);
        let mut envelope = Envelope::from_request(None, request_meta);

        let dsc = r#"{
            "trace_id": "00000000-0000-0000-0000-000000000000",
            "public_key": "e12d836b15bb49d7bbf99e64295d995b",
            "sample_rate": "0.2"
        }"#;
        envelope.set_dsc(serde_json::from_str(dsc).unwrap());

        let mut item = Item::new(ItemType::Event);
        item.set_payload(ContentType::Json, r#"{}"#);
        envelope.add_item(item);

        let outcome_aggregator = Addr::dummy();
        let managed_envelope = ManagedEnvelope::new(envelope, outcome_aggregator);

        let mut project_info = ProjectInfo::default();
        project_info.public_keys.push(PublicKeyConfig {
            public_key: ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(),
            numeric_id: Some(1),
        });

        let config = serde_json::json!({
            "processing": {
                "enabled": true,
                "kafka_config": [],
            }
        });

        let message = ProcessEnvelopeGrouped {
            group: ProcessingGroup::Error,
            envelope: managed_envelope,
            ctx: processing::Context {
                config: &Config::from_json_value(config.clone()).unwrap(),
                project_info: &project_info,
                sampling_project_info: Some(&project_info),
                ..processing::Context::for_test()
            },
        };

        let processor = create_test_processor(Config::from_json_value(config).unwrap()).await;
        let Ok(Some(Submit::Output { output, ctx })) = processor.process(message).await else {
            panic!();
        };
        let envelope = output.serialize_envelope(ctx).unwrap();
        let event = envelope
            .get_item_by(|item| item.ty() == &ItemType::Event)
            .unwrap();

        let event = Annotated::<Event>::from_json_bytes(&event.payload()).unwrap();
        insta::assert_debug_snapshot!(event.value().unwrap()._dsc, @r###"
        Object(
            {
                "environment": ~,
                "public_key": String(
                    "e12d836b15bb49d7bbf99e64295d995b",
                ),
                "release": ~,
                "replay_id": ~,
                "sample_rate": String(
                    "0.2",
                ),
                "trace_id": String(
                    "00000000000000000000000000000000",
                ),
                "transaction": ~,
            },
        )
        "###);
    }
3197
    /// Normalizes a transaction event with a fixed `/foo/*/**` redaction rule
    /// and returns the statsd metrics captured while logging transaction name
    /// changes.
    fn capture_test_event(transaction_name: &str, source: TransactionSource) -> Vec<String> {
        let mut event = Annotated::<Event>::from_json(
            r#"
            {
                "type": "transaction",
                "transaction": "/foo/",
                "timestamp": 946684810.0,
                "start_timestamp": 946684800.0,
                "contexts": {
                    "trace": {
                        "trace_id": "4c79f60c11214eb38604f4ae0781bfb2",
                        "span_id": "fa90fdead5f74053",
                        "op": "http.server",
                        "type": "trace"
                    }
                },
                "transaction_info": {
                    "source": "url"
                }
            }
            "#,
        )
        .unwrap();
        // Override the transaction name and source from the parameters.
        let e = event.value_mut().as_mut().unwrap();
        e.transaction.set_value(Some(transaction_name.into()));

        e.transaction_info
            .value_mut()
            .as_mut()
            .unwrap()
            .source
            .set_value(Some(source));

        relay_statsd::with_capturing_test_client(|| {
            utils::log_transaction_name_metrics(&mut event, |event| {
                let config = NormalizationConfig {
                    transaction_name_config: TransactionNameConfig {
                        rules: &[TransactionNameRule {
                            pattern: LazyGlob::new("/foo/*/**".to_owned()),
                            expiry: DateTime::<Utc>::MAX_UTC,
                            redaction: RedactionRule::Replace {
                                substitution: "*".to_owned(),
                            },
                        }],
                    },
                    ..Default::default()
                };
                relay_event_normalization::normalize_event(event, &config)
            });
        })
    }
3249
    /// A URL transaction matching neither rule nor pattern reports no change.
    #[test]
    fn test_log_transaction_metrics_none() {
        let captures = capture_test_event("/nothing", TransactionSource::Url);
        insta::assert_debug_snapshot!(captures, @r###"
        [
            "event.transaction_name_changes:1|c|#source_in:url,changes:none,source_out:sanitized,is_404:false",
        ]
        "###);
    }
3259
    /// A transaction redacted only by the configured rule reports `rule`.
    #[test]
    fn test_log_transaction_metrics_rule() {
        let captures = capture_test_event("/foo/john/denver", TransactionSource::Url);
        insta::assert_debug_snapshot!(captures, @r###"
        [
            "event.transaction_name_changes:1|c|#source_in:url,changes:rule,source_out:sanitized,is_404:false",
        ]
        "###);
    }
3269
    /// A transaction sanitized only by built-in patterns reports `pattern`.
    #[test]
    fn test_log_transaction_metrics_pattern() {
        let captures = capture_test_event("/something/12345", TransactionSource::Url);
        insta::assert_debug_snapshot!(captures, @r###"
        [
            "event.transaction_name_changes:1|c|#source_in:url,changes:pattern,source_out:sanitized,is_404:false",
        ]
        "###);
    }
3279
    /// A transaction changed by both rule and pattern reports `both`.
    #[test]
    fn test_log_transaction_metrics_both() {
        let captures = capture_test_event("/foo/john/12345", TransactionSource::Url);
        insta::assert_debug_snapshot!(captures, @r###"
        [
            "event.transaction_name_changes:1|c|#source_in:url,changes:both,source_out:sanitized,is_404:false",
        ]
        "###);
    }
3289
    /// Route-sourced transactions are not sanitized, so no change is logged.
    #[test]
    fn test_log_transaction_metrics_no_match() {
        let captures = capture_test_event("/foo/john/12345", TransactionSource::Route);
        insta::assert_debug_snapshot!(captures, @r###"
        [
            "event.transaction_name_changes:1|c|#source_in:route,changes:none,source_out:route,is_404:false",
        ]
        "###);
    }
3299
    /// `received_at` bucket metadata is populated for externally submitted
    /// metrics and left unset for internally relayed ones.
    #[tokio::test]
    async fn test_process_metrics_bucket_metadata() {
        let mut token = Cogs::noop().timed(ResourceId::Relay, AppFeature::Unattributed);
        let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
        let received_at = Utc::now();
        let config = Config::default();

        let (aggregator, mut aggregator_rx) = Addr::custom();
        let processor = create_test_processor_with_addrs(
            config,
            Addrs {
                aggregator,
                ..Default::default()
            },
        )
        .await;

        let mut item = Item::new(ItemType::Statsd);
        item.set_payload(ContentType::Text, "spans/foo:3182887624:4267882815|s");
        for (source, expected_received_at) in [
            (
                BucketSource::External,
                Some(UnixTimestamp::from_datetime(received_at).unwrap()),
            ),
            (BucketSource::Internal, None),
        ] {
            let message = ProcessMetrics {
                data: MetricData::Raw(vec![item.clone()]),
                project_key,
                source,
                received_at,
                sent_at: Some(Utc::now()),
            };
            processor.handle_process_metrics(&mut token, message);

            let Aggregator::MergeBuckets(merge_buckets) = aggregator_rx.recv().await.unwrap();
            let buckets = merge_buckets.buckets;
            assert_eq!(buckets.len(), 1);
            assert_eq!(buckets[0].metadata.received_at, expected_received_at);
        }
    }
3341
    /// A batched payload containing buckets for two projects is split into one
    /// `MergeBuckets` message per project key.
    #[tokio::test]
    async fn test_process_batched_metrics() {
        let mut token = Cogs::noop().timed(ResourceId::Relay, AppFeature::Unattributed);
        let received_at = Utc::now();
        let config = Config::default();

        let (aggregator, mut aggregator_rx) = Addr::custom();
        let processor = create_test_processor_with_addrs(
            config,
            Addrs {
                aggregator,
                ..Default::default()
            },
        )
        .await;

        let payload = r#"{
    "buckets": {
        "11111111111111111111111111111111": [
            {
                "timestamp": 1615889440,
                "width": 0,
                "name": "d:custom/endpoint.response_time@millisecond",
                "type": "d",
                "value": [
                    68.0
                ],
                "tags": {
                    "route": "user_index"
                }
            }
        ],
        "22222222222222222222222222222222": [
            {
                "timestamp": 1615889440,
                "width": 0,
                "name": "d:custom/endpoint.cache_rate@none",
                "type": "d",
                "value": [
                    36.0
                ]
            }
        ]
    }
}
"#;
        let message = ProcessBatchedMetrics {
            payload: Bytes::from(payload),
            source: BucketSource::Internal,
            received_at,
            sent_at: Some(Utc::now()),
        };
        processor.handle_process_batched_metrics(&mut token, message);

        let Aggregator::MergeBuckets(mb1) = aggregator_rx.recv().await.unwrap();
        let Aggregator::MergeBuckets(mb2) = aggregator_rx.recv().await.unwrap();

        // The delivery order of the two messages is not fixed; sort by
        // project key for a stable snapshot.
        let mut messages = vec![mb1, mb2];
        messages.sort_by_key(|mb| mb.project_key);

        let actual = messages
            .into_iter()
            .map(|mb| (mb.project_key, mb.buckets))
            .collect::<Vec<_>>();

        assert_debug_snapshot!(actual, @r###"
        [
            (
                ProjectKey("11111111111111111111111111111111"),
                [
                    Bucket {
                        timestamp: UnixTimestamp(1615889440),
                        width: 0,
                        name: MetricName(
                            "d:custom/endpoint.response_time@millisecond",
                        ),
                        value: Distribution(
                            [
                                68.0,
                            ],
                        ),
                        tags: {
                            "route": "user_index",
                        },
                        metadata: BucketMetadata {
                            merges: 1,
                            received_at: None,
                            extracted_from_indexed: false,
                        },
                    },
                ],
            ),
            (
                ProjectKey("22222222222222222222222222222222"),
                [
                    Bucket {
                        timestamp: UnixTimestamp(1615889440),
                        width: 0,
                        name: MetricName(
                            "d:custom/endpoint.cache_rate@none",
                        ),
                        value: Distribution(
                            [
                                36.0,
                            ],
                        ),
                        tags: {},
                        metadata: BucketMetadata {
                            merges: 1,
                            received_at: None,
                            extracted_from_indexed: false,
                        },
                    },
                ],
            ),
        ]
        "###);
    }
3460}