use std::borrow::Cow;
use std::collections::{BTreeSet, HashMap};
use std::error::Error;
use std::fmt::{Debug, Display};
use std::future::Future;
use std::io::Write;
use std::pin::Pin;
use std::sync::{Arc, Once};
use std::time::Duration;

use anyhow::Context;
use brotli::CompressorWriter as BrotliEncoder;
use bytes::Bytes;
use chrono::{DateTime, Utc};
use flate2::Compression;
use flate2::write::{GzEncoder, ZlibEncoder};
use futures::FutureExt;
use futures::future::BoxFuture;
use relay_base_schema::project::{ProjectId, ProjectKey};
use relay_cogs::{AppFeature, Cogs, FeatureWeights, ResourceId, Token};
use relay_common::time::UnixTimestamp;
use relay_config::{Config, HttpEncoding, NormalizationLevel, RelayMode};
use relay_dynamic_config::{CombinedMetricExtractionConfig, ErrorBoundary, Feature, GlobalConfig};
use relay_event_normalization::{
    ClockDriftProcessor, CombinedMeasurementsConfig, EventValidationConfig, GeoIpLookup,
    MeasurementsConfig, NormalizationConfig, RawUserAgentInfo, TransactionNameConfig,
    normalize_event, validate_event,
};
use relay_event_schema::processor::ProcessingAction;
use relay_event_schema::protocol::{
    ClientReport, Event, EventId, EventType, IpAddr, Metrics, NetworkReportError, SpanV2,
};
use relay_filter::FilterStatKey;
use relay_metrics::{Bucket, BucketMetadata, BucketView, BucketsView, MetricNamespace};
use relay_pii::PiiConfigError;
use relay_protocol::{Annotated, Empty};
use relay_quotas::{DataCategory, Quota, RateLimits, Scoping};
use relay_sampling::evaluation::{ReservoirCounters, ReservoirEvaluator, SamplingDecision};
use relay_statsd::metric;
use relay_system::{Addr, FromMessage, NoResponse, Service};
use reqwest::header;
use smallvec::{SmallVec, smallvec};
use zstd::stream::Encoder as ZstdEncoder;

use crate::constants::DEFAULT_EVENT_RETENTION;
use crate::envelope::{self, ContentType, Envelope, EnvelopeError, Item, ItemContainer, ItemType};
use crate::extractors::{PartialDsn, RequestMeta, RequestTrust};
use crate::managed::{InvalidProcessingGroupType, ManagedEnvelope, TypedEnvelope};
use crate::metrics::{MetricOutcomes, MetricsLimiter, MinimalTrackableBucket};
use crate::metrics_extraction::transactions::types::ExtractMetricsError;
use crate::metrics_extraction::transactions::{ExtractedMetrics, TransactionExtractor};
use crate::processing::logs::LogsProcessor;
use crate::processing::spans::SpansProcessor;
use crate::processing::{Forward as _, Output, Outputs, QuotaRateLimiter};
use crate::service::ServiceError;
use crate::services::global_config::GlobalConfigHandle;
use crate::services::metrics::{Aggregator, FlushBuckets, MergeBuckets, ProjectBuckets};
use crate::services::outcome::{DiscardItemType, DiscardReason, Outcome, TrackOutcome};
use crate::services::processor::event::FiltersStatus;
use crate::services::projects::cache::ProjectCacheHandle;
use crate::services::projects::project::{ProjectInfo, ProjectState};
use crate::services::upstream::{
    SendRequest, Sign, SignatureType, UpstreamRelay, UpstreamRequest, UpstreamRequestError,
};
use crate::statsd::{RelayCounters, RelayHistograms, RelayTimers};
use crate::utils::{self, CheckLimits, EnvelopeLimiter, SamplingResult};
use crate::{http, processing};
use relay_base_schema::organization::OrganizationId;
use relay_threading::AsyncPool;
#[cfg(feature = "processing")]
use {
    crate::managed::ItemAction,
    crate::services::global_rate_limits::{GlobalRateLimits, GlobalRateLimitsServiceHandle},
    crate::services::processor::nnswitch::SwitchProcessingError,
    crate::services::store::{Store, StoreEnvelope},
    crate::utils::Enforcement,
    itertools::Itertools,
    relay_cardinality::{
        CardinalityLimit, CardinalityLimiter, CardinalityLimitsSplit, RedisSetLimiter,
        RedisSetLimiterOptions,
    },
    relay_dynamic_config::{CardinalityLimiterMode, MetricExtractionGroups},
    relay_quotas::{RateLimitingError, RedisRateLimiter},
    relay_redis::{AsyncRedisClient, RedisClients},
    std::time::Instant,
    symbolic_unreal::{Unreal4Error, Unreal4ErrorKind},
};

mod attachment;
mod dynamic_sampling;
mod event;
mod metrics;
mod nel;
mod profile;
mod profile_chunk;
mod replay;
mod report;
mod session;
mod span;
pub use span::extract_transaction_span;

#[cfg(all(sentry, feature = "processing"))]
mod playstation;
mod standalone;
#[cfg(feature = "processing")]
mod unreal;

#[cfg(feature = "processing")]
mod nnswitch;

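/// Runs the first block only if the `processing` feature is compiled in and
/// processing is enabled in the configuration, falling back to the optional
/// `else` block otherwise. For example, `if_processing!(config, { a() } else { b() })`
/// expands to a processing-gated `if`/`else`.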
macro_rules! if_processing {
    ($config:expr, $if_true:block) => {
        #[cfg(feature = "processing")] {
            if $config.processing_enabled() $if_true
        }
    };
    ($config:expr, $if_true:block else $if_false:block) => {
        {
            #[cfg(feature = "processing")] {
                if $config.processing_enabled() $if_true else $if_false
            }
            #[cfg(not(feature = "processing"))] {
                $if_false
            }
        }
    };
}

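/// The minimum clock drift for correction to apply.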
const MINIMUM_CLOCK_DRIFT: Duration = Duration::from_secs(55 * 60);

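/// An error returned when a [`ProcessingGroup`] could not be converted into the
/// requested typed group.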
#[derive(Debug)]
pub struct GroupTypeError;

impl Display for GroupTypeError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("failed to convert processing group into corresponding type")
    }
}

impl std::error::Error for GroupTypeError {}

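/// Generates a zero-sized marker type for a [`ProcessingGroup`] variant, along
/// with the conversions between the marker type and the enum.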
macro_rules! processing_group {
    ($ty:ident, $variant:ident$(, $($other:ident),+)?) => {
        #[derive(Clone, Copy, Debug)]
        pub struct $ty;

        impl From<$ty> for ProcessingGroup {
            fn from(_: $ty) -> Self {
                ProcessingGroup::$variant
            }
        }

        impl TryFrom<ProcessingGroup> for $ty {
            type Error = GroupTypeError;

            fn try_from(value: ProcessingGroup) -> Result<Self, Self::Error> {
                if matches!(value, ProcessingGroup::$variant) {
                    return Ok($ty);
                }
                $($(
                    if matches!(value, ProcessingGroup::$other) {
                        return Ok($ty);
                    }
                )+)?
                return Err(GroupTypeError);
            }
        }
    };
}

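/// Marker trait for processing groups that can contain event payloads.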
pub trait EventProcessing {}
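
/// A trait for processing groups that support dynamic sampling.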
179
180pub trait Sampling {
182 fn supports_sampling(project_info: &ProjectInfo) -> bool;
184
185 fn supports_reservoir_sampling() -> bool;
187}
188
processing_group!(TransactionGroup, Transaction);
impl EventProcessing for TransactionGroup {}

impl Sampling for TransactionGroup {
    fn supports_sampling(project_info: &ProjectInfo) -> bool {
        matches!(&project_info.config.transaction_metrics, Some(ErrorBoundary::Ok(c)) if c.is_enabled())
    }

    fn supports_reservoir_sampling() -> bool {
        true
    }
}

processing_group!(ErrorGroup, Error);
impl EventProcessing for ErrorGroup {}

processing_group!(SessionGroup, Session);
processing_group!(StandaloneGroup, Standalone);
processing_group!(ClientReportGroup, ClientReport);
processing_group!(ReplayGroup, Replay);
processing_group!(CheckInGroup, CheckIn);
processing_group!(LogGroup, Log, Nel);
processing_group!(SpanGroup, Span);

impl Sampling for SpanGroup {
    fn supports_sampling(project_info: &ProjectInfo) -> bool {
        matches!(&project_info.config().metric_extraction, ErrorBoundary::Ok(c) if c.is_supported())
    }

    fn supports_reservoir_sampling() -> bool {
        false
    }
}

processing_group!(ProfileChunkGroup, ProfileChunk);
processing_group!(MetricsGroup, Metrics);
processing_group!(ForwardUnknownGroup, ForwardUnknown);
processing_group!(Ungrouped, Ungrouped);

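/// Marker for envelopes that have passed through processing.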
#[derive(Clone, Copy, Debug)]
pub struct Processed;

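/// The group of an envelope's items, used to split an envelope into parts that
/// can be processed independently of each other.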
#[derive(Clone, Copy, Debug)]
pub enum ProcessingGroup {
    Transaction,
    Error,
    Session,
    Standalone,
    ClientReport,
    Replay,
    CheckIn,
    Nel,
    Log,
    Span,
    SpanV2,
    Metrics,
    ProfileChunk,
    ForwardUnknown,
    Ungrouped,
}

impl ProcessingGroup {
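    /// Splits the envelope by item type into grouped sub-envelopes, one per
    /// processing group, so each group can be processed independently.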
    fn split_envelope(
        mut envelope: Envelope,
        project_info: &ProjectInfo,
    ) -> SmallVec<[(Self, Box<Envelope>); 3]> {
        let headers = envelope.headers().clone();
        let mut grouped_envelopes = smallvec![];

        let replay_items = envelope.take_items_by(|item| {
            matches!(
                item.ty(),
                &ItemType::ReplayEvent | &ItemType::ReplayRecording | &ItemType::ReplayVideo
            )
        });
        if !replay_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Replay,
                Envelope::from_parts(headers.clone(), replay_items),
            ))
        }

        let session_items = envelope
            .take_items_by(|item| matches!(item.ty(), &ItemType::Session | &ItemType::Sessions));
        if !session_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Session,
                Envelope::from_parts(headers.clone(), session_items),
            ))
        }

        if project_info.has_feature(Feature::SpanV2ExperimentalProcessing) {
            let span_v2_items = envelope.take_items_by(ItemContainer::<SpanV2>::is_container);

            if !span_v2_items.is_empty() {
                grouped_envelopes.push((
                    ProcessingGroup::SpanV2,
                    Envelope::from_parts(headers.clone(), span_v2_items),
                ))
            }
        }

        let span_items = envelope.take_items_by(|item| {
            matches!(
                item.ty(),
                &ItemType::Span | &ItemType::OtelSpan | &ItemType::OtelTracesData
            )
        });

        if !span_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Span,
                Envelope::from_parts(headers.clone(), span_items),
            ))
        }

        let logs_items = envelope
            .take_items_by(|item| matches!(item.ty(), &ItemType::Log | &ItemType::OtelLogsData));
        if !logs_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Log,
                Envelope::from_parts(headers.clone(), logs_items),
            ))
        }

        let nel_items = envelope.take_items_by(|item| matches!(item.ty(), &ItemType::Nel));
        if !nel_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Nel,
                Envelope::from_parts(headers.clone(), nel_items),
            ))
        }

        let metric_items = envelope.take_items_by(|i| i.ty().is_metrics());
        if !metric_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Metrics,
                Envelope::from_parts(headers.clone(), metric_items),
            ))
        }

        let profile_chunk_items =
            envelope.take_items_by(|item| matches!(item.ty(), &ItemType::ProfileChunk));
        if !profile_chunk_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::ProfileChunk,
                Envelope::from_parts(headers.clone(), profile_chunk_items),
            ))
        }

        if !envelope.items().any(Item::creates_event) {
            let standalone_items = envelope.take_items_by(Item::requires_event);
            if !standalone_items.is_empty() {
                grouped_envelopes.push((
                    ProcessingGroup::Standalone,
                    Envelope::from_parts(headers.clone(), standalone_items),
                ))
            }
        };

        let security_reports_items = envelope
            .take_items_by(|i| matches!(i.ty(), &ItemType::RawSecurity))
            .into_iter()
            .map(|item| {
                let headers = headers.clone();
                let items: SmallVec<[Item; 3]> = smallvec![item.clone()];
                let mut envelope = Envelope::from_parts(headers, items);
                envelope.set_event_id(EventId::new());
                (ProcessingGroup::Error, envelope)
            });
        grouped_envelopes.extend(security_reports_items);

        let require_event_items = envelope.take_items_by(Item::requires_event);
        if !require_event_items.is_empty() {
            let group = if require_event_items
                .iter()
                .any(|item| matches!(item.ty(), &ItemType::Transaction | &ItemType::Profile))
            {
                ProcessingGroup::Transaction
            } else {
                ProcessingGroup::Error
            };

            grouped_envelopes.push((
                group,
                Envelope::from_parts(headers.clone(), require_event_items),
            ))
        }

        let envelopes = envelope.items_mut().map(|item| {
            let headers = headers.clone();
            let items: SmallVec<[Item; 3]> = smallvec![item.clone()];
            let envelope = Envelope::from_parts(headers, items);
            let item_type = item.ty();
            let group = if matches!(item_type, &ItemType::CheckIn) {
                ProcessingGroup::CheckIn
            } else if matches!(item.ty(), &ItemType::ClientReport) {
                ProcessingGroup::ClientReport
            } else if matches!(item_type, &ItemType::Unknown(_)) {
                ProcessingGroup::ForwardUnknown
            } else {
                ProcessingGroup::Ungrouped
            };

            (group, envelope)
        });
        grouped_envelopes.extend(envelopes);

        grouped_envelopes
    }

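    /// Returns the name of the group as a string.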
    pub fn variant(&self) -> &'static str {
        match self {
            ProcessingGroup::Transaction => "transaction",
            ProcessingGroup::Error => "error",
            ProcessingGroup::Session => "session",
            ProcessingGroup::Standalone => "standalone",
            ProcessingGroup::ClientReport => "client_report",
            ProcessingGroup::Replay => "replay",
            ProcessingGroup::CheckIn => "check_in",
            ProcessingGroup::Log => "log",
            ProcessingGroup::Nel => "nel",
            ProcessingGroup::Span => "span",
            ProcessingGroup::SpanV2 => "span_v2",
            ProcessingGroup::Metrics => "metrics",
            ProcessingGroup::ProfileChunk => "profile_chunk",
            ProcessingGroup::ForwardUnknown => "forward_unknown",
            ProcessingGroup::Ungrouped => "ungrouped",
        }
    }
}

impl From<ProcessingGroup> for AppFeature {
    fn from(value: ProcessingGroup) -> Self {
        match value {
            ProcessingGroup::Transaction => AppFeature::Transactions,
            ProcessingGroup::Error => AppFeature::Errors,
            ProcessingGroup::Session => AppFeature::Sessions,
            ProcessingGroup::Standalone => AppFeature::UnattributedEnvelope,
            ProcessingGroup::ClientReport => AppFeature::ClientReports,
            ProcessingGroup::Replay => AppFeature::Replays,
            ProcessingGroup::CheckIn => AppFeature::CheckIns,
            ProcessingGroup::Log => AppFeature::Logs,
            ProcessingGroup::Nel => AppFeature::Logs,
            ProcessingGroup::Span => AppFeature::Spans,
            ProcessingGroup::SpanV2 => AppFeature::Spans,
            ProcessingGroup::Metrics => AppFeature::UnattributedMetrics,
            ProcessingGroup::ProfileChunk => AppFeature::Profiles,
            ProcessingGroup::ForwardUnknown => AppFeature::UnattributedEnvelope,
            ProcessingGroup::Ungrouped => AppFeature::UnattributedEnvelope,
        }
    }
}

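/// An error returned while processing an envelope.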
#[derive(Debug, thiserror::Error)]
pub enum ProcessingError {
    #[error("invalid json in event")]
    InvalidJson(#[source] serde_json::Error),

    #[error("invalid message pack event payload")]
    InvalidMsgpack(#[from] rmp_serde::decode::Error),

    #[cfg(feature = "processing")]
    #[error("invalid unreal crash report")]
    InvalidUnrealReport(#[source] Unreal4Error),

    #[error("event payload too large")]
    PayloadTooLarge(DiscardItemType),

    #[error("invalid transaction event")]
    InvalidTransaction,

    #[error("envelope processor failed")]
    ProcessingFailed(#[from] ProcessingAction),

    #[error("duplicate {0} in event")]
    DuplicateItem(ItemType),

    #[error("failed to extract event payload")]
    NoEventPayload,

    #[error("missing project id in DSN")]
    MissingProjectId,

    #[error("invalid security report type: {0:?}")]
    InvalidSecurityType(Bytes),

    #[error("unsupported security report type")]
    UnsupportedSecurityType,

    #[error("invalid security report")]
    InvalidSecurityReport(#[source] serde_json::Error),

    #[error("invalid nel report")]
    InvalidNelReport(#[source] NetworkReportError),

    #[error("event filtered with reason: {0:?}")]
    EventFiltered(FilterStatKey),

    #[error("missing or invalid required event timestamp")]
    InvalidTimestamp,

    #[error("could not serialize event payload")]
    SerializeFailed(#[source] serde_json::Error),

    #[cfg(feature = "processing")]
    #[error("failed to apply quotas")]
    QuotasFailed(#[from] RateLimitingError),

    #[error("invalid pii config")]
    PiiConfigError(PiiConfigError),

    #[error("invalid processing group type")]
    InvalidProcessingGroup(Box<InvalidProcessingGroupType>),

    #[error("invalid replay")]
    InvalidReplay(DiscardReason),

    #[error("replay filtered with reason: {0:?}")]
    ReplayFiltered(FilterStatKey),

    #[cfg(feature = "processing")]
    #[error("nintendo switch dying message processing failed {0:?}")]
    InvalidNintendoDyingMessage(#[source] SwitchProcessingError),

    #[cfg(all(sentry, feature = "processing"))]
    #[error("playstation dump processing failed: {0}")]
    InvalidPlaystationDump(String),

    #[error("processing group does not match specific processor")]
    ProcessingGroupMismatch,

    #[error("new processing pipeline failed")]
    ProcessingFailure,
}

impl ProcessingError {
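    /// Returns the outcome to report for this error, if any.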
    pub fn to_outcome(&self) -> Option<Outcome> {
        match self {
            Self::PayloadTooLarge(payload_type) => {
                Some(Outcome::Invalid(DiscardReason::TooLarge(*payload_type)))
            }
            Self::InvalidJson(_) => Some(Outcome::Invalid(DiscardReason::InvalidJson)),
            Self::InvalidMsgpack(_) => Some(Outcome::Invalid(DiscardReason::InvalidMsgpack)),
            Self::InvalidSecurityType(_) => {
                Some(Outcome::Invalid(DiscardReason::SecurityReportType))
            }
            Self::InvalidSecurityReport(_) => Some(Outcome::Invalid(DiscardReason::SecurityReport)),
            Self::UnsupportedSecurityType => Some(Outcome::Filtered(FilterStatKey::InvalidCsp)),
            Self::InvalidNelReport(_) => Some(Outcome::Invalid(DiscardReason::InvalidJson)),
            Self::InvalidTransaction => Some(Outcome::Invalid(DiscardReason::InvalidTransaction)),
            Self::InvalidTimestamp => Some(Outcome::Invalid(DiscardReason::Timestamp)),
            Self::DuplicateItem(_) => Some(Outcome::Invalid(DiscardReason::DuplicateItem)),
            Self::NoEventPayload => Some(Outcome::Invalid(DiscardReason::NoEventPayload)),
            #[cfg(feature = "processing")]
            Self::InvalidNintendoDyingMessage(_) => Some(Outcome::Invalid(DiscardReason::Payload)),
            #[cfg(all(sentry, feature = "processing"))]
            Self::InvalidPlaystationDump(_) => Some(Outcome::Invalid(DiscardReason::Payload)),
            #[cfg(feature = "processing")]
            Self::InvalidUnrealReport(err) if err.kind() == Unreal4ErrorKind::BadCompression => {
                Some(Outcome::Invalid(DiscardReason::InvalidCompression))
            }
            #[cfg(feature = "processing")]
            Self::InvalidUnrealReport(_) => Some(Outcome::Invalid(DiscardReason::ProcessUnreal)),
            Self::SerializeFailed(_) | Self::ProcessingFailed(_) => {
                Some(Outcome::Invalid(DiscardReason::Internal))
            }
            #[cfg(feature = "processing")]
            Self::QuotasFailed(_) => Some(Outcome::Invalid(DiscardReason::Internal)),
            Self::PiiConfigError(_) => Some(Outcome::Invalid(DiscardReason::ProjectStatePii)),
            Self::MissingProjectId => None,
            Self::EventFiltered(_) => None,
            Self::InvalidProcessingGroup(_) => None,
            Self::InvalidReplay(reason) => Some(Outcome::Invalid(*reason)),
            Self::ReplayFiltered(key) => Some(Outcome::Filtered(key.clone())),

            Self::ProcessingGroupMismatch => Some(Outcome::Invalid(DiscardReason::Internal)),
            Self::ProcessingFailure => None,
        }
    }

    fn is_unexpected(&self) -> bool {
        self.to_outcome()
            .is_some_and(|outcome| outcome.is_unexpected())
    }
}

#[cfg(feature = "processing")]
impl From<Unreal4Error> for ProcessingError {
    fn from(err: Unreal4Error) -> Self {
        match err.kind() {
            Unreal4ErrorKind::TooLarge => Self::PayloadTooLarge(ItemType::UnrealReport.into()),
            _ => ProcessingError::InvalidUnrealReport(err),
        }
    }
}

impl From<ExtractMetricsError> for ProcessingError {
    fn from(error: ExtractMetricsError) -> Self {
        match error {
            ExtractMetricsError::MissingTimestamp | ExtractMetricsError::InvalidTimestamp => {
                Self::InvalidTimestamp
            }
        }
    }
}

impl From<InvalidProcessingGroupType> for ProcessingError {
    fn from(value: InvalidProcessingGroupType) -> Self {
        Self::InvalidProcessingGroup(Box::new(value))
    }
}

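/// An event extracted from an envelope item, together with its payload size.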
type ExtractedEvent = (Annotated<Event>, usize);

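/// Container for metrics extracted during envelope processing, with
/// bookkeeping used for rate limit enforcement.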
#[derive(Debug)]
pub struct ProcessingExtractedMetrics {
    metrics: ExtractedMetrics,
}

impl ProcessingExtractedMetrics {
    pub fn new() -> Self {
        Self {
            metrics: ExtractedMetrics::default(),
        }
    }

    pub fn extend(
        &mut self,
        extracted: ExtractedMetrics,
        sampling_decision: Option<SamplingDecision>,
    ) {
        self.extend_project_metrics(extracted.project_metrics, sampling_decision);
        self.extend_sampling_metrics(extracted.sampling_metrics, sampling_decision);
    }

    pub fn extend_project_metrics<I>(
        &mut self,
        buckets: I,
        sampling_decision: Option<SamplingDecision>,
    ) where
        I: IntoIterator<Item = Bucket>,
    {
        self.metrics
            .project_metrics
            .extend(buckets.into_iter().map(|mut bucket| {
                bucket.metadata.extracted_from_indexed =
                    sampling_decision == Some(SamplingDecision::Keep);
                bucket
            }));
    }

    pub fn extend_sampling_metrics<I>(
        &mut self,
        buckets: I,
        sampling_decision: Option<SamplingDecision>,
    ) where
        I: IntoIterator<Item = Bucket>,
    {
        self.metrics
            .sampling_metrics
            .extend(buckets.into_iter().map(|mut bucket| {
                bucket.metadata.extracted_from_indexed =
                    sampling_decision == Some(SamplingDecision::Keep);
                bucket
            }));
    }

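    /// Applies rate limit enforcement to the extracted metrics: buckets in
    /// namespaces whose total quota is exhausted are dropped, and the
    /// `extracted_from_indexed` flag is reset where only the indexed category
    /// was limited and enforcement was not applied consistently.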
    #[cfg(feature = "processing")]
    fn apply_enforcement(&mut self, enforcement: &Enforcement, enforced_consistently: bool) {
        let mut drop_namespaces: SmallVec<[_; 2]> = smallvec![];
        let mut reset_extracted_from_indexed: SmallVec<[_; 2]> = smallvec![];

        for (namespace, limit, indexed) in [
            (
                MetricNamespace::Transactions,
                &enforcement.event,
                &enforcement.event_indexed,
            ),
            (
                MetricNamespace::Spans,
                &enforcement.spans,
                &enforcement.spans_indexed,
            ),
        ] {
            if limit.is_active() {
                drop_namespaces.push(namespace);
            } else if indexed.is_active() && !enforced_consistently {
                reset_extracted_from_indexed.push(namespace);
            }
        }

        if !drop_namespaces.is_empty() || !reset_extracted_from_indexed.is_empty() {
            self.retain_mut(|bucket| {
                let Some(namespace) = bucket.name.try_namespace() else {
                    return true;
                };

                if drop_namespaces.contains(&namespace) {
                    return false;
                }

                if reset_extracted_from_indexed.contains(&namespace) {
                    bucket.metadata.extracted_from_indexed = false;
                }

                true
            });
        }
    }

    #[cfg(feature = "processing")]
    fn retain_mut(&mut self, mut f: impl FnMut(&mut Bucket) -> bool) {
        self.metrics.project_metrics.retain_mut(&mut f);
        self.metrics.sampling_metrics.retain_mut(&mut f);
    }
}

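/// Sends extracted metrics to the aggregator, routing sampling metrics to the
/// sampling project when one is known.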
fn send_metrics(
    metrics: ExtractedMetrics,
    project_key: ProjectKey,
    sampling_key: Option<ProjectKey>,
    aggregator: &Addr<Aggregator>,
) {
    let ExtractedMetrics {
        project_metrics,
        sampling_metrics,
    } = metrics;

    if !project_metrics.is_empty() {
        aggregator.send(MergeBuckets {
            project_key,
            buckets: project_metrics,
        });
    }

    if !sampling_metrics.is_empty() {
        let sampling_project_key = sampling_key.unwrap_or(project_key);
        aggregator.send(MergeBuckets {
            project_key: sampling_project_key,
            buckets: sampling_metrics,
        });
    }
}

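/// Returns the data category of the event, if the event has a type.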
fn event_category(event: &Annotated<Event>) -> Option<DataCategory> {
    event_type(event).map(DataCategory::from)
}

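/// Returns the event type, falling back to the default type for events that
/// carry no explicit type.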
fn event_type(event: &Annotated<Event>) -> Option<EventType> {
    event
        .value()
        .map(|event| event.ty.value().copied().unwrap_or_default())
}

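/// Returns `true` if a feature-gated item should be dropped: managed Relays
/// filter unless the project has the feature enabled, proxy Relays never
/// filter.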
fn should_filter(config: &Config, project_info: &ProjectInfo, feature: Feature) -> bool {
    match config.relay_mode() {
        RelayMode::Proxy => false,
        RelayMode::Managed => !project_info.has_feature(feature),
    }
}

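/// New type representing whether the event in an envelope has been fully
/// normalized.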
#[derive(Copy, Clone)]
struct EventFullyNormalized(bool);

impl EventFullyNormalized {
    pub fn new(envelope: &Envelope) -> Self {
        let event_fully_normalized = envelope.meta().request_trust().is_trusted()
            && envelope
                .items()
                .any(|item| item.creates_event() && item.fully_normalized());

        Self(event_fully_normalized)
    }
}

#[derive(Debug, Copy, Clone)]
struct EventMetricsExtracted(bool);

#[derive(Debug, Copy, Clone)]
struct SpansExtracted(bool);

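/// The result of processing an envelope: either a processed envelope with its
/// extracted metrics, or the output of one of the new processing pipelines.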
#[derive(Debug)]
#[expect(
    clippy::large_enum_variant,
    reason = "until we have a better solution to combat the excessive growth of Item, see #4819"
)]
enum ProcessingResult {
    Envelope {
        managed_envelope: TypedEnvelope<Processed>,
        extracted_metrics: ProcessingExtractedMetrics,
    },
    Output(Output<Outputs>),
}

impl ProcessingResult {
    fn no_metrics(managed_envelope: TypedEnvelope<Processed>) -> Self {
        Self::Envelope {
            managed_envelope,
            extracted_metrics: ProcessingExtractedMetrics::new(),
        }
    }
}

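/// All items which can be submitted upstream.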
#[derive(Debug)]
#[expect(
    clippy::large_enum_variant,
    reason = "until we have a better solution to combat the excessive growth of Item, see #4819"
)]
enum Submit {
    Envelope(TypedEnvelope<Processed>),
    Output(Outputs),
}

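/// Applies processing to all contents of the given envelope.
///
/// Depending on the contents of the envelope and Relay's mode, this includes
/// normalization and validation, inbound filtering, dynamic sampling, rate
/// limiting, and metric extraction.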
#[derive(Debug)]
pub struct ProcessEnvelope {
    pub envelope: ManagedEnvelope,
    pub project_info: Arc<ProjectInfo>,
    pub rate_limits: Arc<RateLimits>,
    pub sampling_project_info: Option<Arc<ProjectInfo>>,
    pub reservoir_counters: ReservoirCounters,
}

#[derive(Debug)]
struct ProcessEnvelopeGrouped {
    pub group: ProcessingGroup,
    pub envelope: ManagedEnvelope,
    pub project_info: Arc<ProjectInfo>,
    pub rate_limits: Arc<RateLimits>,
    pub sampling_project_info: Option<Arc<ProjectInfo>>,
    pub reservoir_counters: ReservoirCounters,
}

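/// Parses incoming metric items and merges the resulting buckets into the
/// aggregator for the given project key.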
#[derive(Debug)]
pub struct ProcessMetrics {
    pub data: MetricData,
    pub project_key: ProjectKey,
    pub source: BucketSource,
    pub received_at: DateTime<Utc>,
    pub sent_at: Option<DateTime<Utc>>,
}

#[derive(Debug)]
pub enum MetricData {
    Raw(Vec<Item>),
    Parsed(Vec<Bucket>),
}

impl MetricData {
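    /// Consumes the metric data and returns the parsed buckets, dropping items
    /// that fail to parse.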
    fn into_buckets(self, timestamp: UnixTimestamp) -> Vec<Bucket> {
        let items = match self {
            Self::Parsed(buckets) => return buckets,
            Self::Raw(items) => items,
        };

        let mut buckets = Vec::new();
        for item in items {
            let payload = item.payload();
            if item.ty() == &ItemType::Statsd {
                for bucket_result in Bucket::parse_all(&payload, timestamp) {
                    match bucket_result {
                        Ok(bucket) => buckets.push(bucket),
                        Err(error) => relay_log::debug!(
                            error = &error as &dyn Error,
                            "failed to parse metric bucket from statsd format",
                        ),
                    }
                }
            } else if item.ty() == &ItemType::MetricBuckets {
                match serde_json::from_slice::<Vec<Bucket>>(&payload) {
                    Ok(parsed_buckets) => {
                        if buckets.is_empty() {
                            buckets = parsed_buckets;
                        } else {
                            buckets.extend(parsed_buckets);
                        }
                    }
                    Err(error) => {
                        relay_log::debug!(
                            error = &error as &dyn Error,
                            "failed to parse metric bucket",
                        );
                        metric!(counter(RelayCounters::MetricBucketsParsingFailed) += 1);
                    }
                }
            } else {
                relay_log::error!(
                    "invalid item of type {} passed to ProcessMetrics",
                    item.ty()
                );
            }
        }
        buckets
    }
}

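/// Handles a batched payload of metric buckets that may belong to multiple
/// projects.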
#[derive(Debug)]
pub struct ProcessBatchedMetrics {
    pub payload: Bytes,
    pub source: BucketSource,
    pub received_at: DateTime<Utc>,
    pub sent_at: Option<DateTime<Utc>>,
}

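/// The source of a metric bucket, which determines how much the bucket's
/// metadata can be trusted.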
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum BucketSource {
    Internal,
    External,
}

impl BucketSource {
    pub fn from_meta(meta: &RequestMeta) -> Self {
        match meta.request_trust() {
            RequestTrust::Trusted => Self::Internal,
            RequestTrust::Untrusted => Self::External,
        }
    }
}

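/// Submits a batch of pre-aggregated client reports, attributed to the given
/// scoping.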
#[derive(Debug)]
pub struct SubmitClientReports {
    pub client_reports: Vec<ClientReport>,
    pub scoping: Scoping,
}

#[derive(Debug)]
pub enum EnvelopeProcessor {
    ProcessEnvelope(Box<ProcessEnvelope>),
    ProcessProjectMetrics(Box<ProcessMetrics>),
    ProcessBatchedMetrics(Box<ProcessBatchedMetrics>),
    FlushBuckets(Box<FlushBuckets>),
    SubmitClientReports(Box<SubmitClientReports>),
}

impl EnvelopeProcessor {
    pub fn variant(&self) -> &'static str {
        match self {
            EnvelopeProcessor::ProcessEnvelope(_) => "ProcessEnvelope",
            EnvelopeProcessor::ProcessProjectMetrics(_) => "ProcessProjectMetrics",
            EnvelopeProcessor::ProcessBatchedMetrics(_) => "ProcessBatchedMetrics",
            EnvelopeProcessor::FlushBuckets(_) => "FlushBuckets",
            EnvelopeProcessor::SubmitClientReports(_) => "SubmitClientReports",
        }
    }
}

impl relay_system::Interface for EnvelopeProcessor {}

impl FromMessage<ProcessEnvelope> for EnvelopeProcessor {
    type Response = relay_system::NoResponse;

    fn from_message(message: ProcessEnvelope, _sender: ()) -> Self {
        Self::ProcessEnvelope(Box::new(message))
    }
}

impl FromMessage<ProcessMetrics> for EnvelopeProcessor {
    type Response = NoResponse;

    fn from_message(message: ProcessMetrics, _: ()) -> Self {
        Self::ProcessProjectMetrics(Box::new(message))
    }
}

impl FromMessage<ProcessBatchedMetrics> for EnvelopeProcessor {
    type Response = NoResponse;

    fn from_message(message: ProcessBatchedMetrics, _: ()) -> Self {
        Self::ProcessBatchedMetrics(Box::new(message))
    }
}

impl FromMessage<FlushBuckets> for EnvelopeProcessor {
    type Response = NoResponse;

    fn from_message(message: FlushBuckets, _: ()) -> Self {
        Self::FlushBuckets(Box::new(message))
    }
}

impl FromMessage<SubmitClientReports> for EnvelopeProcessor {
    type Response = NoResponse;

    fn from_message(message: SubmitClientReports, _: ()) -> Self {
        Self::SubmitClientReports(Box::new(message))
    }
}

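/// The asynchronous pool used by the envelope processor to spawn its work.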
pub type EnvelopeProcessorServicePool = AsyncPool<BoxFuture<'static, ()>>;

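/// Service implementing the [`EnvelopeProcessor`] interface.
///
/// This service handles each message on a worker pool.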
#[derive(Clone)]
pub struct EnvelopeProcessorService {
    inner: Arc<InnerProcessor>,
}

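/// Contains the addresses of services that the processor publishes to.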
pub struct Addrs {
    pub outcome_aggregator: Addr<TrackOutcome>,
    pub upstream_relay: Addr<UpstreamRelay>,
    #[cfg(feature = "processing")]
    pub store_forwarder: Option<Addr<Store>>,
    pub aggregator: Addr<Aggregator>,
    #[cfg(feature = "processing")]
    pub global_rate_limits: Option<Addr<GlobalRateLimits>>,
}

impl Default for Addrs {
    fn default() -> Self {
        Addrs {
            outcome_aggregator: Addr::dummy(),
            upstream_relay: Addr::dummy(),
            #[cfg(feature = "processing")]
            store_forwarder: None,
            aggregator: Addr::dummy(),
            #[cfg(feature = "processing")]
            global_rate_limits: None,
        }
    }
}

struct InnerProcessor {
    pool: EnvelopeProcessorServicePool,
    config: Arc<Config>,
    global_config: GlobalConfigHandle,
    project_cache: ProjectCacheHandle,
    cogs: Cogs,
    #[cfg(feature = "processing")]
    quotas_client: Option<AsyncRedisClient>,
    addrs: Addrs,
    #[cfg(feature = "processing")]
    rate_limiter: Option<Arc<RedisRateLimiter<GlobalRateLimitsServiceHandle>>>,
    geoip_lookup: GeoIpLookup,
    #[cfg(feature = "processing")]
    cardinality_limiter: Option<CardinalityLimiter>,
    metric_outcomes: MetricOutcomes,
    processing: Processing,
}

struct Processing {
    logs: LogsProcessor,
    spans: SpansProcessor,
}

impl EnvelopeProcessorService {
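    /// Creates a multi-threaded envelope processor.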
    #[cfg_attr(feature = "processing", expect(clippy::too_many_arguments))]
    pub fn new(
        pool: EnvelopeProcessorServicePool,
        config: Arc<Config>,
        global_config: GlobalConfigHandle,
        project_cache: ProjectCacheHandle,
        cogs: Cogs,
        #[cfg(feature = "processing")] redis: Option<RedisClients>,
        addrs: Addrs,
        metric_outcomes: MetricOutcomes,
    ) -> Self {
        let geoip_lookup = config
            .geoip_path()
            .and_then(
                |p| match GeoIpLookup::open(p).context(ServiceError::GeoIp) {
                    Ok(geoip) => Some(geoip),
                    Err(err) => {
                        relay_log::error!("failed to open GeoIP db {p:?}: {err:?}");
                        None
                    }
                },
            )
            .unwrap_or_else(GeoIpLookup::empty);

        #[cfg(feature = "processing")]
        let (cardinality, quotas) = match redis {
            Some(RedisClients {
                cardinality,
                quotas,
                ..
            }) => (Some(cardinality), Some(quotas)),
            None => (None, None),
        };

        #[cfg(feature = "processing")]
        let global_rate_limits = addrs.global_rate_limits.clone().map(Into::into);

        #[cfg(feature = "processing")]
        let rate_limiter = match (quotas.clone(), global_rate_limits) {
            (Some(redis), Some(global)) => {
                Some(RedisRateLimiter::new(redis, global).max_limit(config.max_rate_limit()))
            }
            _ => None,
        };

        let quota_limiter = Arc::new(QuotaRateLimiter::new(
            #[cfg(feature = "processing")]
            project_cache.clone(),
            #[cfg(feature = "processing")]
            rate_limiter.clone(),
        ));
        #[cfg(feature = "processing")]
        let rate_limiter = rate_limiter.map(Arc::new);

        let inner = InnerProcessor {
            pool,
            global_config,
            project_cache,
            cogs,
            #[cfg(feature = "processing")]
            quotas_client: quotas.clone(),
            #[cfg(feature = "processing")]
            rate_limiter,
            addrs,
            #[cfg(feature = "processing")]
            cardinality_limiter: cardinality
                .map(|cardinality| {
                    RedisSetLimiter::new(
                        RedisSetLimiterOptions {
                            cache_vacuum_interval: config
                                .cardinality_limiter_cache_vacuum_interval(),
                        },
                        cardinality,
                    )
                })
                .map(CardinalityLimiter::new),
            metric_outcomes,
            processing: Processing {
                logs: LogsProcessor::new(Arc::clone(&quota_limiter)),
                spans: SpansProcessor::new(quota_limiter, geoip_lookup.clone()),
            },
            geoip_lookup,
            config,
        };

        Self {
            inner: Arc::new(inner),
        }
    }

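    /// Normalizes monitor check-ins and removes invalid ones.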
    #[cfg(feature = "processing")]
    fn normalize_checkins(
        &self,
        managed_envelope: &mut TypedEnvelope<CheckInGroup>,
        project_id: ProjectId,
    ) {
        managed_envelope.retain_items(|item| {
            if item.ty() != &ItemType::CheckIn {
                return ItemAction::Keep;
            }

            match relay_monitors::process_check_in(&item.payload(), project_id) {
                Ok(result) => {
                    item.set_routing_hint(result.routing_hint);
                    item.set_payload(ContentType::Json, result.payload);
                    ItemAction::Keep
                }
                Err(error) => {
                    relay_log::debug!(
                        error = &error as &dyn Error,
                        "dropped invalid monitor check-in"
                    );
                    ItemAction::DropSilently
                }
            }
        })
    }

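    /// Enforces rate limits and quotas on the envelope and its extracted metrics.
    ///
    /// Cached rate limits are enforced first; in processing Relays, quotas are
    /// then checked consistently against Redis and any newly created rate limits
    /// are merged back into the project cache.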
    async fn enforce_quotas<Group>(
        &self,
        managed_envelope: &mut TypedEnvelope<Group>,
        event: Annotated<Event>,
        extracted_metrics: &mut ProcessingExtractedMetrics,
        project_info: &ProjectInfo,
        rate_limits: &RateLimits,
    ) -> Result<Annotated<Event>, ProcessingError> {
        let global_config = self.inner.global_config.current();
        let cached_result = RateLimiter::Cached
            .enforce(
                managed_envelope,
                event,
                extracted_metrics,
                &global_config,
                project_info,
                rate_limits,
            )
            .await?;

        if_processing!(self.inner.config, {
            let rate_limiter = match self.inner.rate_limiter.clone() {
                Some(rate_limiter) => rate_limiter,
                None => return Ok(cached_result.event),
            };

            let consistent_result = RateLimiter::Consistent(rate_limiter)
                .enforce(
                    managed_envelope,
                    cached_result.event,
                    extracted_metrics,
                    &global_config,
                    project_info,
                    rate_limits,
                )
                .await?;

            if !consistent_result.rate_limits.is_empty() {
                self.inner
                    .project_cache
                    .get(managed_envelope.scoping().project_key)
                    .rate_limits()
                    .merge(consistent_result.rate_limits);
            }

            Ok(consistent_result.event)
        } else { Ok(cached_result.event) })
    }

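    /// Extracts metrics from the transaction event and, when enabled, its spans.
    ///
    /// This is a no-op if metrics were already extracted by an upstream Relay.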
    #[allow(clippy::too_many_arguments)]
    fn extract_transaction_metrics(
        &self,
        managed_envelope: &mut TypedEnvelope<TransactionGroup>,
        event: &mut Annotated<Event>,
        extracted_metrics: &mut ProcessingExtractedMetrics,
        project_id: ProjectId,
        project_info: Arc<ProjectInfo>,
        sampling_decision: SamplingDecision,
        event_metrics_extracted: EventMetricsExtracted,
        spans_extracted: SpansExtracted,
    ) -> Result<EventMetricsExtracted, ProcessingError> {
        if event_metrics_extracted.0 {
            return Ok(event_metrics_extracted);
        }
        let Some(event) = event.value_mut() else {
            return Ok(event_metrics_extracted);
        };

        let global = self.inner.global_config.current();
        let combined_config = {
            let config = match &project_info.config.metric_extraction {
                ErrorBoundary::Ok(config) if config.is_supported() => config,
                _ => return Ok(event_metrics_extracted),
            };
            let global_config = match &global.metric_extraction {
                ErrorBoundary::Ok(global_config) => global_config,
                #[allow(unused_variables)]
                ErrorBoundary::Err(e) => {
                    if_processing!(self.inner.config, {
                        relay_log::error!("Failed to parse global extraction config {e}");
                        MetricExtractionGroups::EMPTY
                    } else {
                        relay_log::debug!("Failed to parse global extraction config: {e}");
                        return Ok(event_metrics_extracted);
                    })
                }
            };
            CombinedMetricExtractionConfig::new(global_config, config)
        };

        let tx_config = match &project_info.config.transaction_metrics {
            Some(ErrorBoundary::Ok(tx_config)) => tx_config,
            Some(ErrorBoundary::Err(e)) => {
                relay_log::debug!("Failed to parse legacy transaction metrics config: {e}");
                return Ok(event_metrics_extracted);
            }
            None => {
                relay_log::debug!("Legacy transaction metrics config is missing");
                return Ok(event_metrics_extracted);
            }
        };

        if !tx_config.is_enabled() {
            static TX_CONFIG_ERROR: Once = Once::new();
            TX_CONFIG_ERROR.call_once(|| {
                if self.inner.config.processing_enabled() {
                    relay_log::error!(
                        "Processing Relay outdated, received tx config in version {}, which is not supported",
                        tx_config.version
                    );
                }
            });

            return Ok(event_metrics_extracted);
        }

        let extract_spans = !spans_extracted.0
            && utils::sample(global.options.span_extraction_sample_rate.unwrap_or(1.0)).is_keep();

        let metrics = crate::metrics_extraction::event::extract_metrics(
            event,
            combined_config,
            sampling_decision,
            project_id,
            self.inner
                .config
                .aggregator_config_for(MetricNamespace::Spans)
                .max_tag_value_length,
            extract_spans,
        );

        extracted_metrics.extend(metrics, Some(sampling_decision));

        if !project_info.has_feature(Feature::DiscardTransaction) {
            let transaction_from_dsc = managed_envelope
                .envelope()
                .dsc()
                .and_then(|dsc| dsc.transaction.as_deref());

            let extractor = TransactionExtractor {
                config: tx_config,
                generic_config: Some(combined_config),
                transaction_from_dsc,
                sampling_decision,
                target_project_id: project_id,
            };

            extracted_metrics.extend(extractor.extract(event)?, Some(sampling_decision));
        }

        Ok(EventMetricsExtracted(true))
    }

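    /// Validates and normalizes the event, depending on this Relay's configured
    /// normalization level.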
    fn normalize_event<Group: EventProcessing>(
        &self,
        managed_envelope: &mut TypedEnvelope<Group>,
        event: &mut Annotated<Event>,
        project_id: ProjectId,
        project_info: Arc<ProjectInfo>,
        mut event_fully_normalized: EventFullyNormalized,
    ) -> Result<EventFullyNormalized, ProcessingError> {
        if event.value().is_empty() {
            return Ok(event_fully_normalized);
        }

        let full_normalization = match self.inner.config.normalization_level() {
            NormalizationLevel::Full => true,
            NormalizationLevel::Default => {
                if self.inner.config.processing_enabled() && event_fully_normalized.0 {
                    return Ok(event_fully_normalized);
                }

                self.inner.config.processing_enabled()
            }
        };

        let request_meta = managed_envelope.envelope().meta();
        let client_ipaddr = request_meta.client_addr().map(IpAddr::from);

        let transaction_aggregator_config = self
            .inner
            .config
            .aggregator_config_for(MetricNamespace::Transactions);

        let global_config = self.inner.global_config.current();
        let ai_model_costs = global_config.ai_model_costs.as_ref().ok();
        let ai_operation_type_map = global_config.ai_operation_type_map.as_ref().ok();
        let http_span_allowed_hosts = global_config.options.http_span_allowed_hosts.as_slice();

        let retention_days: i64 = project_info
            .config
            .event_retention
            .unwrap_or(DEFAULT_EVENT_RETENTION)
            .into();

        utils::log_transaction_name_metrics(event, |event| {
            let event_validation_config = EventValidationConfig {
                received_at: Some(managed_envelope.received_at()),
                max_secs_in_past: Some(retention_days * 24 * 3600),
                max_secs_in_future: Some(self.inner.config.max_secs_in_future()),
                transaction_timestamp_range: Some(transaction_aggregator_config.timestamp_range()),
                is_validated: false,
            };

            let key_id = project_info
                .get_public_key_config()
                .and_then(|key| Some(key.numeric_id?.to_string()));
            if full_normalization && key_id.is_none() {
                relay_log::error!(
                    "project state for key {} is missing key id",
                    managed_envelope.envelope().meta().public_key()
                );
            }

            let normalization_config = NormalizationConfig {
                project_id: Some(project_id.value()),
                client: request_meta.client().map(str::to_owned),
                key_id,
                protocol_version: Some(request_meta.version().to_string()),
                grouping_config: project_info.config.grouping_config.clone(),
                client_ip: client_ipaddr.as_ref(),
                infer_ip_address: !project_info
                    .config
                    .datascrubbing_settings
                    .scrub_ip_addresses,
                client_sample_rate: managed_envelope
                    .envelope()
                    .dsc()
                    .and_then(|ctx| ctx.sample_rate),
                user_agent: RawUserAgentInfo {
                    user_agent: request_meta.user_agent(),
                    client_hints: request_meta.client_hints(),
                },
                max_name_and_unit_len: Some(
                    transaction_aggregator_config
                        .max_name_length
                        .saturating_sub(MeasurementsConfig::MEASUREMENT_MRI_OVERHEAD),
                ),
                breakdowns_config: project_info.config.breakdowns_v2.as_ref(),
                performance_score: project_info.config.performance_score.as_ref(),
                normalize_user_agent: Some(true),
                transaction_name_config: TransactionNameConfig {
                    rules: &project_info.config.tx_name_rules,
                },
                device_class_synthesis_config: project_info
                    .has_feature(Feature::DeviceClassSynthesis),
                enrich_spans: project_info.has_feature(Feature::ExtractSpansFromEvent),
                max_tag_value_length: self
                    .inner
                    .config
                    .aggregator_config_for(MetricNamespace::Spans)
                    .max_tag_value_length,
                is_renormalize: false,
                remove_other: full_normalization,
                emit_event_errors: full_normalization,
                span_description_rules: project_info.config.span_description_rules.as_ref(),
                geoip_lookup: Some(&self.inner.geoip_lookup),
                ai_model_costs,
                ai_operation_type_map,
                enable_trimming: true,
                measurements: Some(CombinedMeasurementsConfig::new(
                    project_info.config().measurements.as_ref(),
                    global_config.measurements.as_ref(),
                )),
                normalize_spans: true,
                replay_id: managed_envelope
                    .envelope()
                    .dsc()
                    .and_then(|ctx| ctx.replay_id),
                span_allowed_hosts: http_span_allowed_hosts,
                span_op_defaults: global_config.span_op_defaults.borrow(),
                performance_issues_spans: project_info.has_feature(Feature::PerformanceIssuesSpans),
            };

            metric!(timer(RelayTimers::EventProcessingNormalization), {
                validate_event(event, &event_validation_config)
                    .map_err(|_| ProcessingError::InvalidTransaction)?;
                normalize_event(event, &normalization_config);
                if full_normalization && event::has_unprintable_fields(event) {
                    metric!(counter(RelayCounters::EventCorrupted) += 1);
                }
                Result::<(), ProcessingError>::Ok(())
            })
        })?;

        event_fully_normalized.0 |= full_normalization;

        Ok(event_fully_normalized)
    }

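    /// Processes error events and the items that require or create them.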
    async fn process_errors(
        &self,
        managed_envelope: &mut TypedEnvelope<ErrorGroup>,
        project_id: ProjectId,
        project_info: Arc<ProjectInfo>,
        mut sampling_project_info: Option<Arc<ProjectInfo>>,
        rate_limits: Arc<RateLimits>,
    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
        let mut event_fully_normalized = EventFullyNormalized::new(managed_envelope.envelope());
        let mut metrics = Metrics::default();
        let mut extracted_metrics = ProcessingExtractedMetrics::new();

        report::process_user_reports(managed_envelope);

        if_processing!(self.inner.config, {
            unreal::expand(managed_envelope, &self.inner.config)?;
            #[cfg(sentry)]
            playstation::expand(managed_envelope, &self.inner.config, &project_info)?;
            nnswitch::expand(managed_envelope)?;
        });

        let extraction_result = event::extract(
            managed_envelope,
            &mut metrics,
            event_fully_normalized,
            &self.inner.config,
        )?;
        let mut event = extraction_result.event;

        if_processing!(self.inner.config, {
            if let Some(inner_event_fully_normalized) =
                unreal::process(managed_envelope, &mut event)?
            {
                event_fully_normalized = inner_event_fully_normalized;
            }
            #[cfg(sentry)]
            if let Some(inner_event_fully_normalized) =
                playstation::process(managed_envelope, &mut event, &project_info)?
            {
                event_fully_normalized = inner_event_fully_normalized;
            }
            if let Some(inner_event_fully_normalized) =
                attachment::create_placeholders(managed_envelope, &mut event, &mut metrics)
            {
                event_fully_normalized = inner_event_fully_normalized;
            }
        });

        sampling_project_info = dynamic_sampling::validate_and_set_dsc(
            managed_envelope,
            &mut event,
            project_info.clone(),
            sampling_project_info,
        );
        event::finalize(
            managed_envelope,
            &mut event,
            &mut metrics,
            &self.inner.config,
        )?;
        event_fully_normalized = self.normalize_event(
            managed_envelope,
            &mut event,
            project_id,
            project_info.clone(),
            event_fully_normalized,
        )?;
        let filter_run = event::filter(
            managed_envelope,
            &mut event,
            project_info.clone(),
            &self.inner.global_config.current(),
        )?;

        if self.inner.config.processing_enabled() || matches!(filter_run, FiltersStatus::Ok) {
            dynamic_sampling::tag_error_with_sampling_decision(
                managed_envelope,
                &mut event,
                sampling_project_info,
                &self.inner.config,
            )
            .await;
        }

        event = self
            .enforce_quotas(
                managed_envelope,
                event,
                &mut extracted_metrics,
                &project_info,
                &rate_limits,
            )
            .await?;

        if event.value().is_some() {
            event::scrub(&mut event, project_info.clone())?;
            event::serialize(
                managed_envelope,
                &mut event,
                event_fully_normalized,
                EventMetricsExtracted(false),
                SpansExtracted(false),
            )?;
            event::emit_feedback_metrics(managed_envelope.envelope());
        }

        attachment::scrub(managed_envelope, project_info);

        if self.inner.config.processing_enabled() && !event_fully_normalized.0 {
            relay_log::error!(
                tags.project = %project_id,
                tags.ty = event_type(&event).map(|e| e.to_string()).unwrap_or("none".to_owned()),
                "ingested event without normalizing"
            );
        }

        Ok(Some(extracted_metrics))
    }

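    /// Processes transactions and the items attached to them, such as profiles
    /// and extracted spans.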
    #[allow(unused_assignments)]
    #[allow(clippy::too_many_arguments)]
    async fn process_transactions(
        &self,
        managed_envelope: &mut TypedEnvelope<TransactionGroup>,
        cogs: &mut Token,
        config: Arc<Config>,
        project_id: ProjectId,
        project_info: Arc<ProjectInfo>,
        mut sampling_project_info: Option<Arc<ProjectInfo>>,
        rate_limits: Arc<RateLimits>,
        reservoir_counters: ReservoirCounters,
    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
        let mut event_fully_normalized = EventFullyNormalized::new(managed_envelope.envelope());
        let mut event_metrics_extracted = EventMetricsExtracted(false);
        let mut spans_extracted = SpansExtracted(false);
        let mut metrics = Metrics::default();
        let mut extracted_metrics = ProcessingExtractedMetrics::new();

        let global_config = self.inner.global_config.current();

        let extraction_result = event::extract(
            managed_envelope,
            &mut metrics,
            event_fully_normalized,
            &self.inner.config,
        )?;

        if let Some(inner_event_metrics_extracted) = extraction_result.event_metrics_extracted {
            event_metrics_extracted = inner_event_metrics_extracted;
        }
        if let Some(inner_spans_extracted) = extraction_result.spans_extracted {
            spans_extracted = inner_spans_extracted;
        };

        let mut event = extraction_result.event;

        let profile_id = profile::filter(
            managed_envelope,
            &event,
            config.clone(),
            project_id,
            &project_info,
        );
        profile::transfer_id(&mut event, profile_id);

        sampling_project_info = dynamic_sampling::validate_and_set_dsc(
            managed_envelope,
            &mut event,
            project_info.clone(),
            sampling_project_info,
        );

        event::finalize(
            managed_envelope,
            &mut event,
            &mut metrics,
            &self.inner.config,
        )?;

        event_fully_normalized = self.normalize_event(
            managed_envelope,
            &mut event,
            project_id,
            project_info.clone(),
            event_fully_normalized,
        )?;

        let filter_run = event::filter(
            managed_envelope,
            &mut event,
            project_info.clone(),
            &self.inner.global_config.current(),
        )?;

        let run_dynamic_sampling =
            matches!(filter_run, FiltersStatus::Ok) || self.inner.config.processing_enabled();

        let reservoir = self.new_reservoir_evaluator(
            managed_envelope.scoping().organization_id,
            reservoir_counters,
        );

        let sampling_result = match run_dynamic_sampling {
            true => {
                dynamic_sampling::run(
                    managed_envelope,
                    &mut event,
                    config.clone(),
                    project_info.clone(),
                    sampling_project_info,
                    &reservoir,
                )
                .await
            }
            false => SamplingResult::Pending,
        };

        #[cfg(feature = "processing")]
        let server_sample_rate = sampling_result.sample_rate();

        if let Some(outcome) = sampling_result.into_dropped_outcome() {
            profile::process(
                managed_envelope,
                &mut event,
                &global_config,
                config.clone(),
                project_info.clone(),
            );
            event_metrics_extracted = self.extract_transaction_metrics(
                managed_envelope,
                &mut event,
                &mut extracted_metrics,
                project_id,
                project_info.clone(),
                SamplingDecision::Drop,
                event_metrics_extracted,
                spans_extracted,
            )?;

            dynamic_sampling::drop_unsampled_items(
                managed_envelope,
                event,
                outcome,
                spans_extracted,
            );

            event = self
                .enforce_quotas(
                    managed_envelope,
                    Annotated::empty(),
                    &mut extracted_metrics,
                    &project_info,
                    &rate_limits,
                )
                .await?;

            return Ok(Some(extracted_metrics));
        }

        let _post_ds = cogs.start_category("post_ds");

        event::scrub(&mut event, project_info.clone())?;

        attachment::scrub(managed_envelope, project_info.clone());

        if_processing!(self.inner.config, {
            let profile_id = profile::process(
                managed_envelope,
                &mut event,
                &global_config,
                config.clone(),
                project_info.clone(),
            );
            profile::transfer_id(&mut event, profile_id);
            profile::scrub_profiler_id(&mut event);

            event_metrics_extracted = self.extract_transaction_metrics(
                managed_envelope,
                &mut event,
                &mut extracted_metrics,
                project_id,
                project_info.clone(),
                SamplingDecision::Keep,
                event_metrics_extracted,
                spans_extracted,
            )?;

            if project_info.has_feature(Feature::ExtractSpansFromEvent) {
                spans_extracted = span::extract_from_event(
                    managed_envelope,
                    &event,
                    &global_config,
                    config,
                    server_sample_rate,
                    event_metrics_extracted,
                    spans_extracted,
                );
            }
        });

        event = self
            .enforce_quotas(
                managed_envelope,
                event,
                &mut extracted_metrics,
                &project_info,
                &rate_limits,
            )
            .await?;

        if_processing!(self.inner.config, {
            event = span::maybe_discard_transaction(managed_envelope, event, project_info);
        });

        if event.value().is_some() {
            event::serialize(
                managed_envelope,
                &mut event,
                event_fully_normalized,
                event_metrics_extracted,
                spans_extracted,
            )?;
        }

        if self.inner.config.processing_enabled() && !event_fully_normalized.0 {
            relay_log::error!(
                tags.project = %project_id,
                tags.ty = event_type(&event).map(|e| e.to_string()).unwrap_or("none".to_owned()),
                "ingested event without normalizing"
            );
        };

        Ok(Some(extracted_metrics))
    }

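    /// Processes standalone profile chunks.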
    async fn process_profile_chunks(
        &self,
        managed_envelope: &mut TypedEnvelope<ProfileChunkGroup>,
        project_info: Arc<ProjectInfo>,
        rate_limits: Arc<RateLimits>,
    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
        profile_chunk::filter(managed_envelope, project_info.clone());

        if_processing!(self.inner.config, {
            profile_chunk::process(
                managed_envelope,
                &project_info,
                &self.inner.global_config.current(),
                &self.inner.config,
            );
        });

        self.enforce_quotas(
            managed_envelope,
            Annotated::empty(),
            &mut ProcessingExtractedMetrics::new(),
            &project_info,
            &rate_limits,
        )
        .await?;

        Ok(None)
    }

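    /// Processes standalone items that require an event but are sent without
    /// one, such as attachments and user reports.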
    async fn process_standalone(
        &self,
        managed_envelope: &mut TypedEnvelope<StandaloneGroup>,
        config: Arc<Config>,
        project_id: ProjectId,
        project_info: Arc<ProjectInfo>,
        rate_limits: Arc<RateLimits>,
    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
        let mut extracted_metrics = ProcessingExtractedMetrics::new();

        standalone::process(managed_envelope);

        profile::filter(
            managed_envelope,
            &Annotated::empty(),
            config,
            project_id,
            &project_info,
        );

        self.enforce_quotas(
            managed_envelope,
            Annotated::empty(),
            &mut extracted_metrics,
            &project_info,
            &rate_limits,
        )
        .await?;

        report::process_user_reports(managed_envelope);
        attachment::scrub(managed_envelope, project_info);

        Ok(Some(extracted_metrics))
    }

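    /// Processes session updates.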
    async fn process_sessions(
        &self,
        managed_envelope: &mut TypedEnvelope<SessionGroup>,
        config: &Config,
        project_info: &ProjectInfo,
        rate_limits: &RateLimits,
    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
        let mut extracted_metrics = ProcessingExtractedMetrics::new();

        session::process(
            managed_envelope,
            &self.inner.global_config.current(),
            config,
            &mut extracted_metrics,
            project_info,
        );

        self.enforce_quotas(
            managed_envelope,
            Annotated::empty(),
            &mut extracted_metrics,
            project_info,
            rate_limits,
        )
        .await?;

        Ok(Some(extracted_metrics))
    }

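    /// Processes client reports.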
    async fn process_client_reports(
        &self,
        managed_envelope: &mut TypedEnvelope<ClientReportGroup>,
        config: Arc<Config>,
        project_info: Arc<ProjectInfo>,
        rate_limits: Arc<RateLimits>,
    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
        let mut extracted_metrics = ProcessingExtractedMetrics::new();

        self.enforce_quotas(
            managed_envelope,
            Annotated::empty(),
            &mut extracted_metrics,
            &project_info,
            &rate_limits,
        )
        .await?;

        report::process_client_reports(
            managed_envelope,
            &config,
            &project_info,
            self.inner.addrs.outcome_aggregator.clone(),
        );

        Ok(Some(extracted_metrics))
    }

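    /// Processes replay events, recordings, and videos.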
    async fn process_replays(
        &self,
        managed_envelope: &mut TypedEnvelope<ReplayGroup>,
        config: Arc<Config>,
        project_info: Arc<ProjectInfo>,
        rate_limits: Arc<RateLimits>,
    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
        let mut extracted_metrics = ProcessingExtractedMetrics::new();

        replay::process(
            managed_envelope,
            &self.inner.global_config.current(),
            &config,
            &project_info,
            &self.inner.geoip_lookup,
        )?;

        self.enforce_quotas(
            managed_envelope,
            Annotated::empty(),
            &mut extracted_metrics,
            &project_info,
            &rate_limits,
        )
        .await?;

        Ok(Some(extracted_metrics))
    }

2139 async fn process_checkins(
2141 &self,
2142 managed_envelope: &mut TypedEnvelope<CheckInGroup>,
2143 _project_id: ProjectId,
2144 project_info: Arc<ProjectInfo>,
2145 rate_limits: Arc<RateLimits>,
2146 ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
2147 self.enforce_quotas(
2148 managed_envelope,
2149 Annotated::empty(),
2150 &mut ProcessingExtractedMetrics::new(),
2151 &project_info,
2152 &rate_limits,
2153 )
2154 .await?;
2155
2156 if_processing!(self.inner.config, {
2157 self.normalize_checkins(managed_envelope, _project_id);
2158 });
2159
2160 Ok(None)
2161 }
2162
2163 async fn process_nel(
2164 &self,
2165 mut managed_envelope: ManagedEnvelope,
2166 ctx: processing::Context<'_>,
2167 ) -> Result<ProcessingResult, ProcessingError> {
2168 nel::convert_to_logs(&mut managed_envelope);
2169 self.process_with_processor(&self.inner.processing.logs, managed_envelope, ctx)
2170 .await
2171 }
2172
2173 async fn process_with_processor<P: processing::Processor>(
2174 &self,
2175 processor: &P,
2176 mut managed_envelope: ManagedEnvelope,
2177 ctx: processing::Context<'_>,
2178 ) -> Result<ProcessingResult, ProcessingError>
2179 where
2180 Outputs: From<P::Output>,
2181 {
2182 let Some(work) = processor.prepare_envelope(&mut managed_envelope) else {
2183 debug_assert!(
2184 false,
2185 "there must be work for the {} processor",
2186 std::any::type_name::<P>(),
2187 );
2188 return Err(ProcessingError::ProcessingGroupMismatch);
2189 };
2190
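// At this point everything the processor handles has been moved into `work`;
// the envelope should be empty. Accept it if so, otherwise reject the
// unexpected leftovers with an internal outcome.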
2191 managed_envelope.update();
2192 match managed_envelope.envelope().is_empty() {
2193 true => managed_envelope.accept(),
2194 false => managed_envelope.reject(Outcome::Invalid(DiscardReason::Internal)),
2195 }
2196
2197 processor
2198 .process(work, ctx)
2199 .await
2200 .map_err(|_| ProcessingError::ProcessingFailure)
2201 .map(|o| o.map(Into::into))
2202 .map(ProcessingResult::Output)
2203 }
2204
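/// Processes standalone spans: expands V2 spans, applies span filters,
/// converts OTel traces data, and, in processing mode, fully processes spans
/// and extracts metrics before quotas are enforced.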
2205 #[allow(clippy::too_many_arguments)]
2209 async fn process_standalone_spans(
2210 &self,
2211 managed_envelope: &mut TypedEnvelope<SpanGroup>,
2212 config: Arc<Config>,
2213 _project_id: ProjectId,
2214 project_info: Arc<ProjectInfo>,
2215 _sampling_project_info: Option<Arc<ProjectInfo>>,
2216 rate_limits: Arc<RateLimits>,
2217 _reservoir_counters: ReservoirCounters,
2218 ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
2219 let mut extracted_metrics = ProcessingExtractedMetrics::new();
2220
2221 span::expand_v2_spans(managed_envelope)?;
2222 span::filter(managed_envelope, config.clone(), project_info.clone());
2223 span::convert_otel_traces_data(managed_envelope);
2224
2225 if_processing!(self.inner.config, {
2226 let global_config = self.inner.global_config.current();
2227 let reservoir = self.new_reservoir_evaluator(
2228 managed_envelope.scoping().organization_id,
2229 _reservoir_counters,
2230 );
2231
2232 span::process(
2233 managed_envelope,
2234 &mut Annotated::empty(),
2235 &mut extracted_metrics,
2236 &global_config,
2237 config,
2238 _project_id,
2239 project_info.clone(),
2240 _sampling_project_info,
2241 &self.inner.geoip_lookup,
2242 &reservoir,
2243 )
2244 .await;
2245 });
2246
2247 self.enforce_quotas(
2248 managed_envelope,
2249 Annotated::empty(),
2250 &mut extracted_metrics,
2251 &project_info,
2252 &rate_limits,
2253 )
2254 .await?;
2255
2256 Ok(Some(extracted_metrics))
2257 }
2258
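/// Applies envelope-level mutations (DSC transaction parametrization,
/// retention, project ID) and dispatches the envelope to the handler for its
/// processing group.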
2259 async fn process_envelope(
2260 &self,
2261 cogs: &mut Token,
2262 project_id: ProjectId,
2263 message: ProcessEnvelopeGrouped,
2264 ) -> Result<ProcessingResult, ProcessingError> {
2265 let ProcessEnvelopeGrouped {
2266 group,
2267 envelope: mut managed_envelope,
2268 project_info,
2269 rate_limits,
2270 sampling_project_info,
2271 reservoir_counters,
2272 } = message;
2273
2274 if let Some(sampling_state) = sampling_project_info.as_ref() {
2276 managed_envelope
2279 .envelope_mut()
2280 .parametrize_dsc_transaction(&sampling_state.config.tx_name_rules);
2281 }
2282
2283 if let Some(retention) = project_info.config.event_retention {
2286 managed_envelope.envelope_mut().set_retention(retention);
2287 }
2288
2289 if let Some(retention) = project_info.config.downsampled_event_retention {
2292 managed_envelope
2293 .envelope_mut()
2294 .set_downsampled_retention(retention);
2295 }
2296
2297 managed_envelope
2302 .envelope_mut()
2303 .meta_mut()
2304 .set_project_id(project_id);
2305
2306 macro_rules! run {
2307 ($fn_name:ident $(, $args:expr)*) => {
2308 async {
2309 let mut managed_envelope = (managed_envelope, group).try_into()?;
2310 match self.$fn_name(&mut managed_envelope, $($args),*).await {
2311 Ok(extracted_metrics) => Ok(ProcessingResult::Envelope {
2312 managed_envelope: managed_envelope.into_processed(),
2313 extracted_metrics: extracted_metrics.unwrap_or_else(ProcessingExtractedMetrics::new)
2314 }),
2315 Err(error) => {
2316 relay_log::trace!("Executing {} failed: {error}", stringify!($fn_name));
2317 if let Some(outcome) = error.to_outcome() {
2318 managed_envelope.reject(outcome);
2319 }
2320
2321 return Err(error);
2322 }
2323 }
2324 }.await
2325 };
2326 }
2327
2328 let global_config = self.inner.global_config.current();
2329 let ctx = processing::Context {
2330 config: &self.inner.config,
2331 global_config: &global_config,
2332 project_info: &project_info,
2333 sampling_project_info: sampling_project_info.as_deref(),
2334 rate_limits: &rate_limits,
2335 };
2336
2337 relay_log::trace!("Processing {group} group", group = group.variant());
2338
2339 match group {
2340 ProcessingGroup::Error => run!(
2341 process_errors,
2342 project_id,
2343 project_info,
2344 sampling_project_info,
2345 rate_limits
2346 ),
2347 ProcessingGroup::Transaction => {
2348 run!(
2349 process_transactions,
2350 cogs,
2351 self.inner.config.clone(),
2352 project_id,
2353 project_info,
2354 sampling_project_info,
2355 rate_limits,
2356 reservoir_counters
2357 )
2358 }
2359 ProcessingGroup::Session => run!(
2360 process_sessions,
2361 &self.inner.config,
2362 &project_info,
2363 &rate_limits
2364 ),
2365 ProcessingGroup::Standalone => run!(
2366 process_standalone,
2367 self.inner.config.clone(),
2368 project_id,
2369 project_info,
2370 rate_limits
2371 ),
2372 ProcessingGroup::ClientReport => run!(
2373 process_client_reports,
2374 self.inner.config.clone(),
2375 project_info,
2376 rate_limits
2377 ),
2378 ProcessingGroup::Replay => {
2379 run!(
2380 process_replays,
2381 self.inner.config.clone(),
2382 project_info,
2383 rate_limits
2384 )
2385 }
2386 ProcessingGroup::CheckIn => {
2387 run!(process_checkins, project_id, project_info, rate_limits)
2388 }
2389 ProcessingGroup::Nel => self.process_nel(managed_envelope, ctx).await,
2390 ProcessingGroup::Log => {
2391 self.process_with_processor(&self.inner.processing.logs, managed_envelope, ctx)
2392 .await
2393 }
2394 ProcessingGroup::SpanV2 => {
2395 self.process_with_processor(&self.inner.processing.spans, managed_envelope, ctx)
2396 .await
2397 }
2398 ProcessingGroup::Span => run!(
2399 process_standalone_spans,
2400 self.inner.config.clone(),
2401 project_id,
2402 project_info,
2403 sampling_project_info,
2404 rate_limits,
2405 reservoir_counters
2406 ),
2407 ProcessingGroup::ProfileChunk => {
2408 run!(process_profile_chunks, project_info, rate_limits)
2409 }
2410 ProcessingGroup::Metrics => {
2412 if self.inner.config.relay_mode() != RelayMode::Proxy {
2415 relay_log::error!(
2416 tags.project = %project_id,
2417 items = ?managed_envelope.envelope().items().next().map(Item::ty),
2418 "received metrics in the process_state"
2419 );
2420 }
2421
2422 Ok(ProcessingResult::no_metrics(
2423 managed_envelope.into_processed(),
2424 ))
2425 }
2426 ProcessingGroup::Ungrouped => {
2428 relay_log::error!(
2429 tags.project = %project_id,
2430 items = ?managed_envelope.envelope().items().next().map(Item::ty),
2431 "could not identify the processing group based on the envelope's items"
2432 );
2433
2434 Ok(ProcessingResult::no_metrics(
2435 managed_envelope.into_processed(),
2436 ))
2437 }
2438 ProcessingGroup::ForwardUnknown => Ok(ProcessingResult::no_metrics(
2442 managed_envelope.into_processed(),
2443 )),
2444 }
2445 }
2446
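/// Top-level processing entry point: processes the grouped envelope, sends
/// any extracted metrics to the aggregator, and returns what should be
/// submitted upstream, if anything.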
2447 async fn process(
2453 &self,
2454 cogs: &mut Token,
2455 mut message: ProcessEnvelopeGrouped,
2456 ) -> Result<Option<Submit>, ProcessingError> {
2457 let ProcessEnvelopeGrouped {
2458 ref mut envelope,
2459 ref project_info,
2460 ref sampling_project_info,
2461 ..
2462 } = message;
2463
2464 let Some(project_id) = project_info
2471 .project_id
2472 .or_else(|| envelope.envelope().meta().project_id())
2473 else {
2474 envelope.reject(Outcome::Invalid(DiscardReason::Internal));
2475 return Err(ProcessingError::MissingProjectId);
2476 };
2477
2478 let client = envelope.envelope().meta().client().map(str::to_owned);
2479 let user_agent = envelope.envelope().meta().user_agent().map(str::to_owned);
2480 let project_key = envelope.envelope().meta().public_key();
2481 let sampling_key = envelope
2485 .envelope()
2486 .sampling_key()
2487 .filter(|_| sampling_project_info.is_some());
2488
2489 relay_log::configure_scope(|scope| {
2492 scope.set_tag("project", project_id);
2493 if let Some(client) = client {
2494 scope.set_tag("sdk", client);
2495 }
2496 if let Some(user_agent) = user_agent {
2497 scope.set_extra("user_agent", user_agent.into());
2498 }
2499 });
2500
2501 let result = match self.process_envelope(cogs, project_id, message).await {
2502 Ok(ProcessingResult::Envelope {
2503 mut managed_envelope,
2504 extracted_metrics,
2505 }) => {
2506 managed_envelope.update();
2509
2510 let has_metrics = !extracted_metrics.metrics.project_metrics.is_empty();
2511 send_metrics(
2512 extracted_metrics.metrics,
2513 project_key,
2514 sampling_key,
2515 &self.inner.addrs.aggregator,
2516 );
2517
2518 let envelope_response = if managed_envelope.envelope().is_empty() {
2519 if !has_metrics {
2520 managed_envelope.reject(Outcome::RateLimited(None));
2522 } else {
2523 managed_envelope.accept();
2524 }
2525
2526 None
2527 } else {
2528 Some(managed_envelope)
2529 };
2530
2531 Ok(envelope_response.map(Submit::Envelope))
2532 }
2533 Ok(ProcessingResult::Output(Output { main, metrics })) => {
2534 if let Some(metrics) = metrics {
2535 metrics.accept(|metrics| {
2536 send_metrics(
2537 metrics,
2538 project_key,
2539 sampling_key,
2540 &self.inner.addrs.aggregator,
2541 );
2542 });
2543 }
2544
2545 Ok(main.map(Submit::Output))
2546 }
2547 Err(err) => Err(err),
2548 };
2549
2550 relay_log::configure_scope(|scope| {
2551 scope.remove_tag("project");
2552 scope.remove_tag("sdk");
2553 scope.remove_tag("user_agent");
2554 });
2555
2556 result
2557 }
2558
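/// Splits the envelope into processing groups, processes each group with its
/// own COGS token, and submits the results upstream.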
2559 async fn handle_process_envelope(&self, cogs: &mut Token, message: ProcessEnvelope) {
2560 let project_key = message.envelope.envelope().meta().public_key();
2561 let wait_time = message.envelope.age();
2562 metric!(timer(RelayTimers::EnvelopeWaitTime) = wait_time);
2563
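// The envelope is split into multiple groups below; the message-level token
// is cancelled in favor of a per-group token measured around each group.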
2564 cogs.cancel();
2567
2568 let scoping = message.envelope.scoping();
2569 for (group, envelope) in ProcessingGroup::split_envelope(
2570 *message.envelope.into_envelope(),
2571 &message.project_info,
2572 ) {
2573 let mut cogs = self
2574 .inner
2575 .cogs
2576 .timed(ResourceId::Relay, AppFeature::from(group));
2577
2578 let mut envelope =
2579 ManagedEnvelope::new(envelope, self.inner.addrs.outcome_aggregator.clone());
2580 envelope.scope(scoping);
2581
2582 let message = ProcessEnvelopeGrouped {
2583 group,
2584 envelope,
2585 project_info: Arc::clone(&message.project_info),
2586 rate_limits: Arc::clone(&message.rate_limits),
2587 sampling_project_info: message.sampling_project_info.as_ref().map(Arc::clone),
2588 reservoir_counters: Arc::clone(&message.reservoir_counters),
2589 };
2590
2591 let result = metric!(
2592 timer(RelayTimers::EnvelopeProcessingTime),
2593 group = group.variant(),
2594 { self.process(&mut cogs, message).await }
2595 );
2596
2597 match result {
2598 Ok(Some(envelope)) => self.submit_upstream(&mut cogs, envelope),
2599 Ok(None) => {}
2600 Err(error) if error.is_unexpected() => {
2601 relay_log::error!(
2602 tags.project_key = %project_key,
2603 error = &error as &dyn Error,
2604 "error processing envelope"
2605 )
2606 }
2607 Err(error) => {
2608 relay_log::debug!(
2609 tags.project_key = %project_key,
2610 error = &error as &dyn Error,
2611 "error processing envelope"
2612 )
2613 }
2614 }
2615 }
2616 }
2617
2618 fn handle_process_metrics(&self, cogs: &mut Token, message: ProcessMetrics) {
2619 let ProcessMetrics {
2620 data,
2621 project_key,
2622 received_at,
2623 sent_at,
2624 source,
2625 } = message;
2626
2627 let received_timestamp =
2628 UnixTimestamp::from_datetime(received_at).unwrap_or_else(UnixTimestamp::now);
2629
2630 let mut buckets = data.into_buckets(received_timestamp);
2631 if buckets.is_empty() {
2632 return;
2633 }
2634 cogs.update(relay_metrics::cogs::BySize(&buckets));
2635
2636 let clock_drift_processor =
2637 ClockDriftProcessor::new(sent_at, received_at).at_least(MINIMUM_CLOCK_DRIFT);
2638
2639 buckets.retain_mut(|bucket| {
2640 if let Err(error) = relay_metrics::normalize_bucket(bucket) {
2641 relay_log::debug!(error = &error as &dyn Error, "dropping bucket {bucket:?}");
2642 return false;
2643 }
2644
2645 if !self::metrics::is_valid_namespace(bucket) {
2646 return false;
2647 }
2648
2649 clock_drift_processor.process_timestamp(&mut bucket.timestamp);
2650
2651 if !matches!(source, BucketSource::Internal) {
2652 bucket.metadata = BucketMetadata::new(received_timestamp);
2653 }
2654
2655 true
2656 });
2657
2658 let project = self.inner.project_cache.get(project_key);
2659
2660 let buckets = match project.state() {
2663 ProjectState::Enabled(project_info) => {
2664 let rate_limits = project.rate_limits().current_limits();
2665 self.check_buckets(project_key, project_info, &rate_limits, buckets)
2666 }
2667 _ => buckets,
2668 };
2669
2670 relay_log::trace!("merging metric buckets into the aggregator");
2671 self.inner
2672 .addrs
2673 .aggregator
2674 .send(MergeBuckets::new(project_key, buckets));
2675 }
2676
2677 fn handle_process_batched_metrics(&self, cogs: &mut Token, message: ProcessBatchedMetrics) {
2678 let ProcessBatchedMetrics {
2679 payload,
2680 source,
2681 received_at,
2682 sent_at,
2683 } = message;
2684
2685 #[derive(serde::Deserialize)]
2686 struct Wrapper {
2687 buckets: HashMap<ProjectKey, Vec<Bucket>>,
2688 }
2689
2690 let buckets = match serde_json::from_slice(&payload) {
2691 Ok(Wrapper { buckets }) => buckets,
2692 Err(error) => {
2693 relay_log::debug!(
2694 error = &error as &dyn Error,
2695 "failed to parse batched metrics",
2696 );
2697 metric!(counter(RelayCounters::MetricBucketsParsingFailed) += 1);
2698 return;
2699 }
2700 };
2701
2702 for (project_key, buckets) in buckets {
2703 self.handle_process_metrics(
2704 cogs,
2705 ProcessMetrics {
2706 data: MetricData::Parsed(buckets),
2707 project_key,
2708 source,
2709 received_at,
2710 sent_at,
2711 },
2712 )
2713 }
2714 }
2715
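/// Submits a processed result: in processing mode it is forwarded to the
/// store, otherwise it is serialized, compressed and sent to the upstream.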
2716 fn submit_upstream(&self, cogs: &mut Token, submit: Submit) {
2717 let _submit = cogs.start_category("submit");
2718
2719 #[cfg(feature = "processing")]
2720 if self.inner.config.processing_enabled()
2721 && let Some(store_forwarder) = &self.inner.addrs.store_forwarder
2722 {
2723 match submit {
2724 Submit::Envelope(envelope) => store_forwarder.send(StoreEnvelope { envelope }),
2725 Submit::Output(output) => output
2726 .forward_store(store_forwarder)
2727 .unwrap_or_else(|err| err.into_inner()),
2728 }
2729 return;
2730 }
2731
2732 let mut envelope = match submit {
2733 Submit::Envelope(envelope) => envelope,
2734 Submit::Output(output) => match output.serialize_envelope() {
2735 Ok(envelope) => ManagedEnvelope::from(envelope).into_processed(),
2736 Err(_) => {
2737 relay_log::error!("failed to serialize output to an envelope");
2738 return;
2739 }
2740 },
2741 };
2742
2743 if envelope.envelope_mut().is_empty() {
2744 envelope.accept();
2745 return;
2746 }
2747
2748 envelope.envelope_mut().set_sent_at(Utc::now());
2754
2755 relay_log::trace!("sending envelope to sentry endpoint");
2756 let http_encoding = self.inner.config.http_encoding();
2757 let result = envelope.envelope().to_vec().and_then(|v| {
2758 encode_payload(&v.into(), http_encoding).map_err(EnvelopeError::PayloadIoFailed)
2759 });
2760
2761 match result {
2762 Ok(body) => {
2763 self.inner
2764 .addrs
2765 .upstream_relay
2766 .send(SendRequest(SendEnvelope {
2767 envelope,
2768 body,
2769 http_encoding,
2770 project_cache: self.inner.project_cache.clone(),
2771 }));
2772 }
2773 Err(error) => {
2774 relay_log::error!(
2777 error = &error as &dyn Error,
2778 tags.project_key = %envelope.scoping().project_key,
2779 "failed to serialize envelope payload"
2780 );
2781
2782 envelope.reject(Outcome::Invalid(DiscardReason::Internal));
2783 }
2784 }
2785 }
2786
2787 fn handle_submit_client_reports(&self, cogs: &mut Token, message: SubmitClientReports) {
2788 let SubmitClientReports {
2789 client_reports,
2790 scoping,
2791 } = message;
2792
2793 let upstream = self.inner.config.upstream_descriptor();
2794 let dsn = PartialDsn::outbound(&scoping, upstream);
2795
2796 let mut envelope = Envelope::from_request(None, RequestMeta::outbound(dsn));
2797 for client_report in client_reports {
2798 let mut item = Item::new(ItemType::ClientReport);
2799 item.set_payload(ContentType::Json, client_report.serialize().unwrap());
2800 envelope.add_item(item);
2801 }
2802
2803 let envelope = ManagedEnvelope::new(envelope, self.inner.addrs.outcome_aggregator.clone());
2804 self.submit_upstream(cogs, Submit::Envelope(envelope.into_processed()));
2805 }
2806
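/// Checks metric buckets against project state and cached rate limits,
/// dropping invalid or rate-limited buckets and tracking outcomes for them.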
2807 fn check_buckets(
2808 &self,
2809 project_key: ProjectKey,
2810 project_info: &ProjectInfo,
2811 rate_limits: &RateLimits,
2812 buckets: Vec<Bucket>,
2813 ) -> Vec<Bucket> {
2814 let Some(scoping) = project_info.scoping(project_key) else {
2815 relay_log::error!(
2816 tags.project_key = project_key.as_str(),
2817 "there is no scoping: dropping {} buckets",
2818 buckets.len(),
2819 );
2820 return Vec::new();
2821 };
2822
2823 let mut buckets = self::metrics::apply_project_info(
2824 buckets,
2825 &self.inner.metric_outcomes,
2826 project_info,
2827 scoping,
2828 );
2829
2830 let namespaces: BTreeSet<MetricNamespace> = buckets
2831 .iter()
2832 .filter_map(|bucket| bucket.name.try_namespace())
2833 .collect();
2834
2835 for namespace in namespaces {
2836 let limits = rate_limits.check_with_quotas(
2837 project_info.get_quotas(),
2838 scoping.metric_bucket(namespace),
2839 );
2840
2841 if limits.is_limited() {
2842 let rejected;
2843 (buckets, rejected) = utils::split_off(buckets, |bucket| {
2844 bucket.name.try_namespace() == Some(namespace)
2845 });
2846
2847 let reason_code = limits.longest().and_then(|limit| limit.reason_code.clone());
2848 self.inner.metric_outcomes.track(
2849 scoping,
2850 &rejected,
2851 Outcome::RateLimited(reason_code),
2852 );
2853 }
2854 }
2855
2856 let quotas = project_info.config.quotas.clone();
2857 match MetricsLimiter::create(buckets, quotas, scoping) {
2858 Ok(mut bucket_limiter) => {
2859 bucket_limiter.enforce_limits(rate_limits, &self.inner.metric_outcomes);
2860 bucket_limiter.into_buckets()
2861 }
2862 Err(buckets) => buckets,
2863 }
2864 }
2865
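/// Checks Redis-backed quotas per metric namespace and drops buckets in
/// rate-limited namespaces, merging new limits into the project cache.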
2866 #[cfg(feature = "processing")]
2867 async fn rate_limit_buckets(
2868 &self,
2869 scoping: Scoping,
2870 project_info: &ProjectInfo,
2871 mut buckets: Vec<Bucket>,
2872 ) -> Vec<Bucket> {
2873 let Some(rate_limiter) = &self.inner.rate_limiter else {
2874 return buckets;
2875 };
2876
2877 let global_config = self.inner.global_config.current();
2878 let namespaces = buckets
2879 .iter()
2880 .filter_map(|bucket| bucket.name.try_namespace())
2881 .counts();
2882
2883 let quotas = CombinedQuotas::new(&global_config, project_info.get_quotas());
2884
2885 for (namespace, quantity) in namespaces {
2886 let item_scoping = scoping.metric_bucket(namespace);
2887
2888 let limits = match rate_limiter
2889 .is_rate_limited(quotas, item_scoping, quantity, false)
2890 .await
2891 {
2892 Ok(limits) => limits,
2893 Err(err) => {
2894 relay_log::error!(
2895 error = &err as &dyn std::error::Error,
2896 "failed to check redis rate limits"
2897 );
2898 break;
2899 }
2900 };
2901
2902 if limits.is_limited() {
2903 let rejected;
2904 (buckets, rejected) = utils::split_off(buckets, |bucket| {
2905 bucket.name.try_namespace() == Some(namespace)
2906 });
2907
2908 let reason_code = limits.longest().and_then(|limit| limit.reason_code.clone());
2909 self.inner.metric_outcomes.track(
2910 scoping,
2911 &rejected,
2912 Outcome::RateLimited(reason_code),
2913 );
2914
2915 self.inner
2916 .project_cache
2917 .get(item_scoping.scoping.project_key)
2918 .rate_limits()
2919 .merge(limits);
2920 }
2921 }
2922
2923 match MetricsLimiter::create(buckets, project_info.config.quotas.clone(), scoping) {
2924 Err(buckets) => buckets,
2925 Ok(bucket_limiter) => self.apply_other_rate_limits(bucket_limiter).await,
2926 }
2927 }
2928
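/// Enforces transaction and span rate limits on the remaining buckets and
/// merges newly created limits into the project cache.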
2929 #[cfg(feature = "processing")]
2931 async fn apply_other_rate_limits(&self, mut bucket_limiter: MetricsLimiter) -> Vec<Bucket> {
2932 relay_log::trace!("handle_rate_limit_buckets");
2933
2934 let scoping = *bucket_limiter.scoping();
2935
2936 if let Some(rate_limiter) = self.inner.rate_limiter.as_ref() {
2937 let global_config = self.inner.global_config.current();
2938 let quotas = CombinedQuotas::new(&global_config, bucket_limiter.quotas());
2939
2940 let over_accept_once = true;
2943 let mut rate_limits = RateLimits::new();
2944
2945 for category in [DataCategory::Transaction, DataCategory::Span] {
2946 let count = bucket_limiter.count(category);
2947
2948 let timer = Instant::now();
2949 let mut is_limited = false;
2950
2951 if let Some(count) = count {
2952 match rate_limiter
2953 .is_rate_limited(quotas, scoping.item(category), count, over_accept_once)
2954 .await
2955 {
2956 Ok(limits) => {
2957 is_limited = limits.is_limited();
2958 rate_limits.merge(limits)
2959 }
2960 Err(e) => relay_log::error!(error = &e as &dyn Error),
2961 }
2962 }
2963
2964 relay_statsd::metric!(
2965 timer(RelayTimers::RateLimitBucketsDuration) = timer.elapsed(),
2966 category = category.name(),
2967 limited = if is_limited { "true" } else { "false" },
2968 count = match count {
2969 None => "none",
2970 Some(0) => "0",
2971 Some(1) => "1",
2972 Some(2..=10) => "10",
2973 Some(11..=25) => "25",
2974 Some(26..=50) => "50",
2975 Some(51..=100) => "100",
2976 Some(101..=500) => "500",
2977 _ => "> 500",
2978 },
2979 );
2980 }
2981
2982 if rate_limits.is_limited() {
2983 let was_enforced =
2984 bucket_limiter.enforce_limits(&rate_limits, &self.inner.metric_outcomes);
2985
2986 if was_enforced {
2987 self.inner
2989 .project_cache
2990 .get(scoping.project_key)
2991 .rate_limits()
2992 .merge(rate_limits);
2993 }
2994 }
2995 }
2996
2997 bucket_limiter.into_buckets()
2998 }
2999
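/// Applies cardinality limits to the buckets, tracking outcomes for rejected
/// buckets; behavior depends on the configured cardinality limiter mode.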
3000 #[cfg(feature = "processing")]
3002 async fn cardinality_limit_buckets(
3003 &self,
3004 scoping: Scoping,
3005 limits: &[CardinalityLimit],
3006 buckets: Vec<Bucket>,
3007 ) -> Vec<Bucket> {
3008 let global_config = self.inner.global_config.current();
3009 let cardinality_limiter_mode = global_config.options.cardinality_limiter_mode;
3010
3011 if matches!(cardinality_limiter_mode, CardinalityLimiterMode::Disabled) {
3012 return buckets;
3013 }
3014
3015 let Some(ref limiter) = self.inner.cardinality_limiter else {
3016 return buckets;
3017 };
3018
3019 let scope = relay_cardinality::Scoping {
3020 organization_id: scoping.organization_id,
3021 project_id: scoping.project_id,
3022 };
3023
3024 let limits = match limiter
3025 .check_cardinality_limits(scope, limits, buckets)
3026 .await
3027 {
3028 Ok(limits) => limits,
3029 Err((buckets, error)) => {
3030 relay_log::error!(
3031 error = &error as &dyn std::error::Error,
3032 "cardinality limiter failed"
3033 );
3034 return buckets;
3035 }
3036 };
3037
3038 let error_sample_rate = global_config.options.cardinality_limiter_error_sample_rate;
3039 if !limits.exceeded_limits().is_empty() && utils::sample(error_sample_rate).is_keep() {
3040 for limit in limits.exceeded_limits() {
3041 relay_log::with_scope(
3042 |scope| {
3043 scope.set_user(Some(relay_log::sentry::User {
3045 id: Some(scoping.organization_id.to_string()),
3046 ..Default::default()
3047 }));
3048 },
3049 || {
3050 relay_log::error!(
3051 tags.organization_id = scoping.organization_id.value(),
3052 tags.limit_id = limit.id,
3053 tags.passive = limit.passive,
3054 "Cardinality Limit"
3055 );
3056 },
3057 );
3058 }
3059 }
3060
3061 for (limit, reports) in limits.cardinality_reports() {
3062 for report in reports {
3063 self.inner
3064 .metric_outcomes
3065 .cardinality(scoping, limit, report);
3066 }
3067 }
3068
3069 if matches!(cardinality_limiter_mode, CardinalityLimiterMode::Passive) {
3070 return limits.into_source();
3071 }
3072
3073 let CardinalityLimitsSplit { accepted, rejected } = limits.into_split();
3074
3075 for (bucket, exceeded) in rejected {
3076 self.inner.metric_outcomes.track(
3077 scoping,
3078 &[bucket],
3079 Outcome::CardinalityLimited(exceeded.id.clone()),
3080 );
3081 }
3082 accepted
3083 }
3084
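/// Flush path for processing Relays: rate limits and cardinality limits each
/// project's buckets, then forwards the remainder to the store.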
3085 #[cfg(feature = "processing")]
3092 async fn encode_metrics_processing(
3093 &self,
3094 message: FlushBuckets,
3095 store_forwarder: &Addr<Store>,
3096 ) {
3097 use crate::constants::DEFAULT_EVENT_RETENTION;
3098 use crate::services::store::StoreMetrics;
3099
3100 for ProjectBuckets {
3101 buckets,
3102 scoping,
3103 project_info,
3104 ..
3105 } in message.buckets.into_values()
3106 {
3107 let buckets = self
3108 .rate_limit_buckets(scoping, &project_info, buckets)
3109 .await;
3110
3111 let limits = project_info.get_cardinality_limits();
3112 let buckets = self
3113 .cardinality_limit_buckets(scoping, limits, buckets)
3114 .await;
3115
3116 if buckets.is_empty() {
3117 continue;
3118 }
3119
3120 let retention = project_info
3121 .config
3122 .event_retention
3123 .unwrap_or(DEFAULT_EVENT_RETENTION);
3124
3125 store_forwarder.send(StoreMetrics {
3128 buckets,
3129 scoping,
3130 retention,
3131 });
3132 }
3133 }
3134
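/// Flush path for non-processing Relays: batches each project's buckets into
/// metric envelopes and submits them upstream.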
3135 fn encode_metrics_envelope(&self, cogs: &mut Token, message: FlushBuckets) {
3147 let FlushBuckets {
3148 partition_key,
3149 buckets,
3150 } = message;
3151
3152 let batch_size = self.inner.config.metrics_max_batch_size_bytes();
3153 let upstream = self.inner.config.upstream_descriptor();
3154
3155 for ProjectBuckets {
3156 buckets, scoping, ..
3157 } in buckets.values()
3158 {
3159 let dsn = PartialDsn::outbound(scoping, upstream);
3160
3161 relay_statsd::metric!(
3162 histogram(RelayHistograms::PartitionKeys) = u64::from(partition_key)
3163 );
3164
3165 let mut num_batches = 0;
3166 for batch in BucketsView::from(buckets).by_size(batch_size) {
3167 let mut envelope = Envelope::from_request(None, RequestMeta::outbound(dsn.clone()));
3168
3169 let mut item = Item::new(ItemType::MetricBuckets);
3170 item.set_source_quantities(crate::metrics::extract_quantities(batch.clone()));
3171 item.set_payload(ContentType::Json, serde_json::to_vec(&batch).unwrap());
3172 envelope.add_item(item);
3173
3174 let mut envelope =
3175 ManagedEnvelope::new(envelope, self.inner.addrs.outcome_aggregator.clone());
3176 envelope
3177 .set_partition_key(Some(partition_key))
3178 .scope(*scoping);
3179
3180 relay_statsd::metric!(
3181 histogram(RelayHistograms::BucketsPerBatch) = batch.len() as u64
3182 );
3183
3184 self.submit_upstream(cogs, Submit::Envelope(envelope.into_processed()));
3185 num_batches += 1;
3186 }
3187
3188 relay_statsd::metric!(histogram(RelayHistograms::BatchesPerPartition) = num_batches);
3189 }
3190 }
3191
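/// Encodes and sends a single partition to the global metrics endpoint,
/// skipping empty partitions.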
3192 fn send_global_partition(&self, partition_key: u32, partition: &mut Partition<'_>) {
3194 if partition.is_empty() {
3195 return;
3196 }
3197
3198 let (unencoded, project_info) = partition.take();
3199 let http_encoding = self.inner.config.http_encoding();
3200 let encoded = match encode_payload(&unencoded, http_encoding) {
3201 Ok(payload) => payload,
3202 Err(error) => {
3203 let error = &error as &dyn std::error::Error;
3204 relay_log::error!(error, "failed to encode metrics payload");
3205 return;
3206 }
3207 };
3208
3209 let request = SendMetricsRequest {
3210 partition_key: partition_key.to_string(),
3211 unencoded,
3212 encoded,
3213 project_info,
3214 http_encoding,
3215 metric_outcomes: self.inner.metric_outcomes.clone(),
3216 };
3217
3218 self.inner.addrs.upstream_relay.send(SendRequest(request));
3219 }
3220
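/// Flush path for the global metrics endpoint: packs buckets from all
/// projects into fixed-size partitions, splitting buckets that do not fit.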
3221 fn encode_metrics_global(&self, message: FlushBuckets) {
3236 let FlushBuckets {
3237 partition_key,
3238 buckets,
3239 } = message;
3240
3241 let batch_size = self.inner.config.metrics_max_batch_size_bytes();
3242 let mut partition = Partition::new(batch_size);
3243 let mut partition_splits = 0;
3244
3245 for ProjectBuckets {
3246 buckets, scoping, ..
3247 } in buckets.values()
3248 {
3249 for bucket in buckets {
3250 let mut remaining = Some(BucketView::new(bucket));
3251
3252 while let Some(bucket) = remaining.take() {
3253 if let Some(next) = partition.insert(bucket, *scoping) {
3254 self.send_global_partition(partition_key, &mut partition);
3258 remaining = Some(next);
3259 partition_splits += 1;
3260 }
3261 }
3262 }
3263 }
3264
3265 if partition_splits > 0 {
3266 metric!(histogram(RelayHistograms::PartitionSplits) = partition_splits);
3267 }
3268
3269 self.send_global_partition(partition_key, &mut partition);
3270 }
3271
3272 async fn handle_flush_buckets(&self, cogs: &mut Token, mut message: FlushBuckets) {
3273 for (project_key, pb) in message.buckets.iter_mut() {
3274 let buckets = std::mem::take(&mut pb.buckets);
3275 pb.buckets =
3276 self.check_buckets(*project_key, &pb.project_info, &pb.rate_limits, buckets);
3277 }
3278
3279 #[cfg(feature = "processing")]
3280 if self.inner.config.processing_enabled()
3281 && let Some(ref store_forwarder) = self.inner.addrs.store_forwarder
3282 {
3283 return self
3284 .encode_metrics_processing(message, store_forwarder)
3285 .await;
3286 }
3287
3288 if self.inner.config.http_global_metrics() {
3289 self.encode_metrics_global(message)
3290 } else {
3291 self.encode_metrics_envelope(cogs, message)
3292 }
3293 }
3294
3295 #[cfg(all(test, feature = "processing"))]
3296 fn redis_rate_limiter_enabled(&self) -> bool {
3297 self.inner.rate_limiter.is_some()
3298 }
3299
3300 async fn handle_message(&self, message: EnvelopeProcessor) {
3301 let ty = message.variant();
3302 let feature_weights = self.feature_weights(&message);
3303
3304 metric!(timer(RelayTimers::ProcessMessageDuration), message = ty, {
3305 let mut cogs = self.inner.cogs.timed(ResourceId::Relay, feature_weights);
3306
3307 match message {
3308 EnvelopeProcessor::ProcessEnvelope(m) => {
3309 self.handle_process_envelope(&mut cogs, *m).await
3310 }
3311 EnvelopeProcessor::ProcessProjectMetrics(m) => {
3312 self.handle_process_metrics(&mut cogs, *m)
3313 }
3314 EnvelopeProcessor::ProcessBatchedMetrics(m) => {
3315 self.handle_process_batched_metrics(&mut cogs, *m)
3316 }
3317 EnvelopeProcessor::FlushBuckets(m) => {
3318 self.handle_flush_buckets(&mut cogs, *m).await
3319 }
3320 EnvelopeProcessor::SubmitClientReports(m) => {
3321 self.handle_submit_client_reports(&mut cogs, *m)
3322 }
3323 }
3324 });
3325 }
3326
3327 fn feature_weights(&self, message: &EnvelopeProcessor) -> FeatureWeights {
3328 match message {
3329 EnvelopeProcessor::ProcessEnvelope(_) => AppFeature::Unattributed.into(),
3331 EnvelopeProcessor::ProcessProjectMetrics(_) => AppFeature::Unattributed.into(),
3332 EnvelopeProcessor::ProcessBatchedMetrics(_) => AppFeature::Unattributed.into(),
3333 EnvelopeProcessor::FlushBuckets(v) => v
3334 .buckets
3335 .values()
3336 .map(|s| {
3337 if self.inner.config.processing_enabled() {
3338 relay_metrics::cogs::ByCount(&s.buckets).into()
3341 } else {
3342 relay_metrics::cogs::BySize(&s.buckets).into()
3343 }
3344 })
3345 .fold(FeatureWeights::none(), FeatureWeights::merge),
3346 EnvelopeProcessor::SubmitClientReports(_) => AppFeature::ClientReports.into(),
3347 }
3348 }
3349
3350 fn new_reservoir_evaluator(
3351 &self,
3352 _organization_id: OrganizationId,
3353 reservoir_counters: ReservoirCounters,
3354 ) -> ReservoirEvaluator<'_> {
3355 #[cfg_attr(not(feature = "processing"), expect(unused_mut))]
3356 let mut reservoir = ReservoirEvaluator::new(reservoir_counters);
3357
3358 #[cfg(feature = "processing")]
3359 if let Some(quotas_client) = self.inner.quotas_client.as_ref() {
3360 reservoir.set_redis(_organization_id, quotas_client);
3361 }
3362
3363 reservoir
3364 }
3365}
3366
3367impl Service for EnvelopeProcessorService {
3368 type Interface = EnvelopeProcessor;
3369
3370 async fn run(self, mut rx: relay_system::Receiver<Self::Interface>) {
3371 while let Some(message) = rx.recv().await {
3372 let service = self.clone();
3373 self.inner
3374 .pool
3375 .spawn_async(
3376 async move {
3377 service.handle_message(message).await;
3378 }
3379 .boxed(),
3380 )
3381 .await;
3382 }
3383 }
3384}
3385
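/// Result of enforcing rate limits on an envelope and its event.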
3386struct EnforcementResult {
3391 event: Annotated<Event>,
3392 #[cfg_attr(not(feature = "processing"), expect(dead_code))]
3393 rate_limits: RateLimits,
3394}
3395
3396impl EnforcementResult {
3397 pub fn new(event: Annotated<Event>, rate_limits: RateLimits) -> Self {
3399 Self { event, rate_limits }
3400 }
3401}
3402
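/// Rate limiting strategy: `Cached` checks previously fetched limits only,
/// while `Consistent` also consults the Redis-backed global rate limiter.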
3403#[derive(Clone)]
3404enum RateLimiter {
3405 Cached,
3406 #[cfg(feature = "processing")]
3407 Consistent(Arc<RedisRateLimiter<GlobalRateLimitsServiceHandle>>),
3408}
3409
3410impl RateLimiter {
3411 async fn enforce<Group>(
3412 &self,
3413 managed_envelope: &mut TypedEnvelope<Group>,
3414 event: Annotated<Event>,
3415 _extracted_metrics: &mut ProcessingExtractedMetrics,
3416 global_config: &GlobalConfig,
3417 project_info: &ProjectInfo,
3418 rate_limits: &RateLimits,
3419 ) -> Result<EnforcementResult, ProcessingError> {
3420 if managed_envelope.envelope().is_empty() && event.value().is_none() {
3421 return Ok(EnforcementResult::new(event, RateLimits::default()));
3422 }
3423
3424 let quotas = CombinedQuotas::new(global_config, project_info.get_quotas());
3425 if quotas.is_empty() {
3426 return Ok(EnforcementResult::new(event, RateLimits::default()));
3427 }
3428
3429 let event_category = event_category(&event);
3430
3431 let this = self.clone();
3437 let rate_limits_clone = rate_limits.clone();
3438 let mut envelope_limiter =
3439 EnvelopeLimiter::new(CheckLimits::All, move |item_scope, _quantity| {
3440 let this = this.clone();
3441 let rate_limits_clone = rate_limits_clone.clone();
3442
3443 async move {
3444 match this {
3445 #[cfg(feature = "processing")]
3446 RateLimiter::Consistent(rate_limiter) => Ok::<_, ProcessingError>(
3447 rate_limiter
3448 .is_rate_limited(quotas, item_scope, _quantity, false)
3449 .await?,
3450 ),
3451 _ => Ok::<_, ProcessingError>(
3452 rate_limits_clone.check_with_quotas(quotas, item_scope),
3453 ),
3454 }
3455 }
3456 });
3457
3458 if let Some(category) = event_category {
3461 envelope_limiter.assume_event(category);
3462 }
3463
3464 let scoping = managed_envelope.scoping();
3465 let (enforcement, rate_limits) =
3466 metric!(timer(RelayTimers::EventProcessingRateLimiting), {
3467 envelope_limiter
3468 .compute(managed_envelope.envelope_mut(), &scoping)
3469 .await
3470 })?;
3471 let event_active = enforcement.is_event_active();
3472
3473 #[cfg(feature = "processing")]
3477 _extracted_metrics.apply_enforcement(&enforcement, matches!(self, Self::Consistent(_)));
3478 enforcement.apply_with_outcomes(managed_envelope);
3479
3480 if event_active {
3481 debug_assert!(managed_envelope.envelope().is_empty());
3482 return Ok(EnforcementResult::new(Annotated::empty(), rate_limits));
3483 }
3484
3485 Ok(EnforcementResult::new(event, rate_limits))
3486 }
3487}
3488
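/// Compresses a serialized envelope body with the given HTTP content
/// encoding; `Identity` returns the body unchanged.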
3489fn encode_payload(body: &Bytes, http_encoding: HttpEncoding) -> Result<Bytes, std::io::Error> {
3490 let envelope_body: Vec<u8> = match http_encoding {
3491 HttpEncoding::Identity => return Ok(body.clone()),
3492 HttpEncoding::Deflate => {
3493 let mut encoder = ZlibEncoder::new(Vec::new(), Compression::default());
3494 encoder.write_all(body.as_ref())?;
3495 encoder.finish()?
3496 }
3497 HttpEncoding::Gzip => {
3498 let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
3499 encoder.write_all(body.as_ref())?;
3500 encoder.finish()?
3501 }
3502 HttpEncoding::Br => {
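// Quality 5 and the default window size (lgwin 22) trade compression ratio
// for speed; a buffer size of 0 lets the encoder pick its default.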
3503 let mut encoder = BrotliEncoder::new(Vec::new(), 0, 5, 22);
3505 encoder.write_all(body.as_ref())?;
3506 encoder.into_inner()
3507 }
3508 HttpEncoding::Zstd => {
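// Compression level 1 favors encoding speed over compression ratio.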
3509 let mut encoder = ZstdEncoder::new(Vec::new(), 1)?;
3512 encoder.write_all(body.as_ref())?;
3513 encoder.finish()?
3514 }
3515 };
3516
3517 Ok(envelope_body.into())
3518}
3519
3520#[derive(Debug)]
3522pub struct SendEnvelope {
3523 envelope: TypedEnvelope<Processed>,
3524 body: Bytes,
3525 http_encoding: HttpEncoding,
3526 project_cache: ProjectCacheHandle,
3527}
3528
3529impl UpstreamRequest for SendEnvelope {
3530 fn method(&self) -> reqwest::Method {
3531 reqwest::Method::POST
3532 }
3533
3534 fn path(&self) -> Cow<'_, str> {
3535 format!("/api/{}/envelope/", self.envelope.scoping().project_id).into()
3536 }
3537
3538 fn route(&self) -> &'static str {
3539 "envelope"
3540 }
3541
3542 fn build(&mut self, builder: &mut http::RequestBuilder) -> Result<(), http::HttpError> {
3543 let envelope_body = self.body.clone();
3544 metric!(histogram(RelayHistograms::UpstreamEnvelopeBodySize) = envelope_body.len() as u64);
3545
3546 let meta = self.envelope.meta();
3547 let shard = self.envelope.partition_key().map(|p| p.to_string());
3548 builder
3549 .content_encoding(self.http_encoding)
3550 .header_opt("Origin", meta.origin().map(|url| url.as_str()))
3551 .header_opt("User-Agent", meta.user_agent())
3552 .header("X-Sentry-Auth", meta.auth_header())
3553 .header("X-Forwarded-For", meta.forwarded_for())
3554 .header("Content-Type", envelope::CONTENT_TYPE)
3555 .header_opt("X-Sentry-Relay-Shard", shard)
3556 .body(envelope_body);
3557
3558 Ok(())
3559 }
3560
3561 fn sign(&mut self) -> Option<Sign> {
3562 Some(Sign::Optional(SignatureType::RequestSign))
3563 }
3564
3565 fn respond(
3566 self: Box<Self>,
3567 result: Result<http::Response, UpstreamRequestError>,
3568 ) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> {
3569 Box::pin(async move {
3570 let result = match result {
3571 Ok(mut response) => response.consume().await.map_err(UpstreamRequestError::Http),
3572 Err(error) => Err(error),
3573 };
3574
3575 match result {
3576 Ok(()) => self.envelope.accept(),
3577 Err(error) if error.is_received() => {
3578 let scoping = self.envelope.scoping();
3579 self.envelope.accept();
3580
3581 if let UpstreamRequestError::RateLimited(limits) = error {
3582 self.project_cache
3583 .get(scoping.project_key)
3584 .rate_limits()
3585 .merge(limits.scope(&scoping));
3586 }
3587 }
3588 Err(error) => {
3589 let mut envelope = self.envelope;
3592 envelope.reject(Outcome::Invalid(DiscardReason::Internal));
3593 relay_log::error!(
3594 error = &error as &dyn Error,
3595 tags.project_key = %envelope.scoping().project_key,
3596 "error sending envelope"
3597 );
3598 }
3599 }
3600 })
3601 }
3602}
3603
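/// Accumulates metric bucket views from multiple projects into one payload of
/// bounded size for the global metrics endpoint.
///
/// Illustrative use, mirroring `encode_metrics_global` above; `flush` stands
/// in here for `send_global_partition`:
///
/// ```ignore
/// let mut partition = Partition::new(batch_size);
/// let mut remaining = Some(BucketView::new(&bucket));
/// while let Some(view) = remaining.take() {
///     if let Some(next) = partition.insert(view, scoping) {
///         // The partition is full: flush it and retry with the remainder.
///         flush(&mut partition);
///         remaining = Some(next);
///     }
/// }
/// flush(&mut partition);
/// ```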
3604#[derive(Debug)]
3611struct Partition<'a> {
3612 max_size: usize,
3613 remaining: usize,
3614 views: HashMap<ProjectKey, Vec<BucketView<'a>>>,
3615 project_info: HashMap<ProjectKey, Scoping>,
3616}
3617
3618impl<'a> Partition<'a> {
3619 pub fn new(size: usize) -> Self {
3621 Self {
3622 max_size: size,
3623 remaining: size,
3624 views: HashMap::new(),
3625 project_info: HashMap::new(),
3626 }
3627 }
3628
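/// Inserts a bucket view, splitting it at the remaining capacity of this
/// partition. Returns the remainder that did not fit, if any; the caller is
/// expected to flush the partition and retry with the remainder.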
3629 pub fn insert(&mut self, bucket: BucketView<'a>, scoping: Scoping) -> Option<BucketView<'a>> {
3640 let (current, next) = bucket.split(self.remaining, Some(self.max_size));
3641
3642 if let Some(current) = current {
3643 self.remaining = self.remaining.saturating_sub(current.estimated_size());
3644 self.views
3645 .entry(scoping.project_key)
3646 .or_default()
3647 .push(current);
3648
3649 self.project_info
3650 .entry(scoping.project_key)
3651 .or_insert(scoping);
3652 }
3653
3654 next
3655 }
3656
3657 fn is_empty(&self) -> bool {
3659 self.views.is_empty()
3660 }
3661
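/// Serializes all accumulated buckets into a JSON payload, returns it along
/// with the scoping per project key, and resets the partition.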
3662 fn take(&mut self) -> (Bytes, HashMap<ProjectKey, Scoping>) {
3666 #[derive(serde::Serialize)]
3667 struct Wrapper<'a> {
3668 buckets: &'a HashMap<ProjectKey, Vec<BucketView<'a>>>,
3669 }
3670
3671 let buckets = &self.views;
3672 let payload = serde_json::to_vec(&Wrapper { buckets }).unwrap().into();
3673
3674 let scopings = self.project_info.clone();
3675 self.project_info.clear();
3676
3677 self.views.clear();
3678 self.remaining = self.max_size;
3679
3680 (payload, scopings)
3681 }
3682}
3683
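/// Upstream request carrying an encoded partition of metric buckets for the
/// global metrics endpoint; sends that never reach the upstream produce
/// internal outcomes for the contained buckets.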
3684#[derive(Debug)]
3688struct SendMetricsRequest {
3689 partition_key: String,
3691 unencoded: Bytes,
3693 encoded: Bytes,
3695 project_info: HashMap<ProjectKey, Scoping>,
3699 http_encoding: HttpEncoding,
3701 metric_outcomes: MetricOutcomes,
3703}
3704
3705impl SendMetricsRequest {
3706 fn create_error_outcomes(self) {
3707 #[derive(serde::Deserialize)]
3708 struct Wrapper {
3709 buckets: HashMap<ProjectKey, Vec<MinimalTrackableBucket>>,
3710 }
3711
3712 let buckets = match serde_json::from_slice(&self.unencoded) {
3713 Ok(Wrapper { buckets }) => buckets,
3714 Err(err) => {
3715 relay_log::error!(
3716 error = &err as &dyn std::error::Error,
3717 "failed to parse buckets from failed transmission"
3718 );
3719 return;
3720 }
3721 };
3722
3723 for (key, buckets) in buckets {
3724 let Some(&scoping) = self.project_info.get(&key) else {
3725 relay_log::error!("missing scoping for project key");
3726 continue;
3727 };
3728
3729 self.metric_outcomes.track(
3730 scoping,
3731 &buckets,
3732 Outcome::Invalid(DiscardReason::Internal),
3733 );
3734 }
3735 }
3736}
3737
3738impl UpstreamRequest for SendMetricsRequest {
3739 fn set_relay_id(&self) -> bool {
3740 true
3741 }
3742
3743 fn sign(&mut self) -> Option<Sign> {
3744 Some(Sign::Required(SignatureType::Body(self.unencoded.clone())))
3745 }
3746
3747 fn method(&self) -> reqwest::Method {
3748 reqwest::Method::POST
3749 }
3750
3751 fn path(&self) -> Cow<'_, str> {
3752 "/api/0/relays/metrics/".into()
3753 }
3754
3755 fn route(&self) -> &'static str {
3756 "global_metrics"
3757 }
3758
3759 fn build(&mut self, builder: &mut http::RequestBuilder) -> Result<(), http::HttpError> {
3760 metric!(histogram(RelayHistograms::UpstreamMetricsBodySize) = self.encoded.len() as u64);
3761
3762 builder
3763 .content_encoding(self.http_encoding)
3764 .header("X-Sentry-Relay-Shard", self.partition_key.as_bytes())
3765 .header(header::CONTENT_TYPE, b"application/json")
3766 .body(self.encoded.clone());
3767
3768 Ok(())
3769 }
3770
3771 fn respond(
3772 self: Box<Self>,
3773 result: Result<http::Response, UpstreamRequestError>,
3774 ) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> {
3775 Box::pin(async {
3776 match result {
3777 Ok(mut response) => {
3778 response.consume().await.ok();
3779 }
3780 Err(error) => {
3781 relay_log::error!(error = &error as &dyn Error, "failed to send metrics batch");
3782
3783 if error.is_received() {
3786 return;
3787 }
3788
3789 self.create_error_outcomes()
3790 }
3791 }
3792 })
3793 }
3794}
3795
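/// Combined view over global and project quotas that can be iterated as a
/// single list.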
3796#[derive(Copy, Clone, Debug)]
3798struct CombinedQuotas<'a> {
3799 global_quotas: &'a [Quota],
3800 project_quotas: &'a [Quota],
3801}
3802
3803impl<'a> CombinedQuotas<'a> {
3804 pub fn new(global_config: &'a GlobalConfig, project_quotas: &'a [Quota]) -> Self {
3806 Self {
3807 global_quotas: &global_config.quotas,
3808 project_quotas,
3809 }
3810 }
3811
3812 pub fn is_empty(&self) -> bool {
3814 self.len() == 0
3815 }
3816
3817 pub fn len(&self) -> usize {
3819 self.global_quotas.len() + self.project_quotas.len()
3820 }
3821}
3822
3823impl<'a> IntoIterator for CombinedQuotas<'a> {
3824 type Item = &'a Quota;
3825 type IntoIter = std::iter::Chain<std::slice::Iter<'a, Quota>, std::slice::Iter<'a, Quota>>;
3826
3827 fn into_iter(self) -> Self::IntoIter {
3828 self.global_quotas.iter().chain(self.project_quotas.iter())
3829 }
3830}
3831
3832#[cfg(test)]
3833mod tests {
3834 use std::collections::BTreeMap;
3835 use std::env;
3836
3837 use insta::assert_debug_snapshot;
3838 use relay_base_schema::metrics::{DurationUnit, MetricUnit};
3839 use relay_common::glob2::LazyGlob;
3840 use relay_dynamic_config::ProjectConfig;
3841 use relay_event_normalization::{RedactionRule, TransactionNameRule};
3842 use relay_event_schema::protocol::TransactionSource;
3843 use relay_pii::DataScrubbingConfig;
3844 use similar_asserts::assert_eq;
3845
3846 use crate::metrics_extraction::IntoMetric;
3847 use crate::metrics_extraction::transactions::types::{
3848 CommonTags, TransactionMeasurementTags, TransactionMetric,
3849 };
3850 use crate::testutils::{create_test_processor, create_test_processor_with_addrs};
3851
3852 #[cfg(feature = "processing")]
3853 use {
3854 relay_metrics::BucketValue,
3855 relay_quotas::{QuotaScope, ReasonCode},
3856 relay_test::mock_service,
3857 };
3858
3859 use super::*;
3860
3861 #[cfg(feature = "processing")]
3862 fn mock_quota(id: &str) -> Quota {
3863 Quota {
3864 id: Some(id.into()),
3865 categories: smallvec::smallvec![DataCategory::MetricBucket],
3866 scope: QuotaScope::Organization,
3867 scope_id: None,
3868 limit: Some(0),
3869 window: None,
3870 reason_code: None,
3871 namespace: None,
3872 }
3873 }
3874
3875 #[cfg(feature = "processing")]
3876 #[test]
3877 fn test_dynamic_quotas() {
3878 let global_config = GlobalConfig {
3879 quotas: vec![mock_quota("foo"), mock_quota("bar")],
3880 ..Default::default()
3881 };
3882
3883 let project_quotas = vec![mock_quota("baz"), mock_quota("qux")];
3884
3885 let dynamic_quotas = CombinedQuotas::new(&global_config, &project_quotas);
3886
3887 assert_eq!(dynamic_quotas.len(), 4);
3888 assert!(!dynamic_quotas.is_empty());
3889
3890 let quota_ids = dynamic_quotas.into_iter().filter_map(|q| q.id.as_deref());
3891 assert!(quota_ids.eq(["foo", "bar", "baz", "qux"]));
3892 }
3893
3894 #[cfg(feature = "processing")]
3897 #[tokio::test]
3898 async fn test_ratelimit_per_batch() {
3899 use relay_base_schema::organization::OrganizationId;
3900 use relay_protocol::FiniteF64;
3901
3902 let rate_limited_org = Scoping {
3903 organization_id: OrganizationId::new(1),
3904 project_id: ProjectId::new(21),
3905 project_key: ProjectKey::parse("00000000000000000000000000000000").unwrap(),
3906 key_id: Some(17),
3907 };
3908
3909 let not_rate_limited_org = Scoping {
3910 organization_id: OrganizationId::new(2),
3911 project_id: ProjectId::new(21),
3912 project_key: ProjectKey::parse("11111111111111111111111111111111").unwrap(),
3913 key_id: Some(17),
3914 };
3915
3916 let message = {
3917 let project_info = {
3918 let quota = Quota {
3919 id: Some("testing".into()),
3920 categories: vec![DataCategory::MetricBucket].into(),
3921 scope: relay_quotas::QuotaScope::Organization,
3922 scope_id: Some(rate_limited_org.organization_id.to_string()),
3923 limit: Some(0),
3924 window: None,
3925 reason_code: Some(ReasonCode::new("test")),
3926 namespace: None,
3927 };
3928
3929 let mut config = ProjectConfig::default();
3930 config.quotas.push(quota);
3931
3932 Arc::new(ProjectInfo {
3933 config,
3934 ..Default::default()
3935 })
3936 };
3937
3938 let project_metrics = |scoping| ProjectBuckets {
3939 buckets: vec![Bucket {
3940 name: "d:transactions/bar".into(),
3941 value: BucketValue::Counter(FiniteF64::new(1.0).unwrap()),
3942 timestamp: UnixTimestamp::now(),
3943 tags: Default::default(),
3944 width: 10,
3945 metadata: BucketMetadata::default(),
3946 }],
3947 rate_limits: Default::default(),
3948 project_info: project_info.clone(),
3949 scoping,
3950 };
3951
3952 let buckets = hashbrown::HashMap::from([
3953 (
3954 rate_limited_org.project_key,
3955 project_metrics(rate_limited_org),
3956 ),
3957 (
3958 not_rate_limited_org.project_key,
3959 project_metrics(not_rate_limited_org),
3960 ),
3961 ]);
3962
3963 FlushBuckets {
3964 partition_key: 0,
3965 buckets,
3966 }
3967 };
3968
3969 assert_eq!(message.buckets.keys().count(), 2);
3971
3972 let config = {
3973 let config_json = serde_json::json!({
3974 "processing": {
3975 "enabled": true,
3976 "kafka_config": [],
3977 "redis": {
3978 "server": std::env::var("RELAY_REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_owned()),
3979 }
3980 }
3981 });
3982 Config::from_json_value(config_json).unwrap()
3983 };
3984
3985 let (store, handle) = {
3986 let f = |org_ids: &mut Vec<OrganizationId>, msg: Store| {
3987 let org_id = match msg {
3988 Store::Metrics(x) => x.scoping.organization_id,
3989 _ => panic!("received envelope when expecting only metrics"),
3990 };
3991 org_ids.push(org_id);
3992 };
3993
3994 mock_service("store_forwarder", vec![], f)
3995 };
3996
3997 let processor = create_test_processor(config).await;
3998 assert!(processor.redis_rate_limiter_enabled());
3999
4000 processor.encode_metrics_processing(message, &store).await;
4001
4002 drop(store);
4003 let orgs_not_ratelimited = handle.await.unwrap();
4004
4005 assert_eq!(
4006 orgs_not_ratelimited,
4007 vec![not_rate_limited_org.organization_id]
4008 );
4009 }
4010
4011 #[tokio::test]
4012 async fn test_browser_version_extraction_with_pii_like_data() {
4013 let processor = create_test_processor(Default::default()).await;
4014 let outcome_aggregator = Addr::dummy();
4015 let event_id = EventId::new();
4016
4017 let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
4018 .parse()
4019 .unwrap();
4020
4021 let request_meta = RequestMeta::new(dsn);
4022 let mut envelope = Envelope::from_request(Some(event_id), request_meta);
4023
4024 envelope.add_item({
4025 let mut item = Item::new(ItemType::Event);
4026 item.set_payload(
4027 ContentType::Json,
4028 r#"
4029 {
4030 "request": {
4031 "headers": [
4032 ["User-Agent", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36"]
4033 ]
4034 }
4035 }
4036 "#,
4037 );
4038 item
4039 });
4040
4041 let mut datascrubbing_settings = DataScrubbingConfig::default();
4042 datascrubbing_settings.scrub_data = true;
4044 datascrubbing_settings.scrub_defaults = true;
4045 datascrubbing_settings.scrub_ip_addresses = true;
4046
4047 let pii_config = serde_json::from_str(r#"{"applications": {"**": ["@ip:mask"]}}"#).unwrap();
4049
4050 let config = ProjectConfig {
4051 datascrubbing_settings,
4052 pii_config: Some(pii_config),
4053 ..Default::default()
4054 };
4055
4056 let project_info = ProjectInfo {
4057 config,
4058 ..Default::default()
4059 };
4060
4061 let mut envelopes = ProcessingGroup::split_envelope(*envelope, &Default::default());
4062 assert_eq!(envelopes.len(), 1);
4063
4064 let (group, envelope) = envelopes.pop().unwrap();
4065 let envelope = ManagedEnvelope::new(envelope, outcome_aggregator);
4066
4067 let message = ProcessEnvelopeGrouped {
4068 group,
4069 envelope,
4070 project_info: Arc::new(project_info),
4071 rate_limits: Default::default(),
4072 sampling_project_info: None,
4073 reservoir_counters: ReservoirCounters::default(),
4074 };
4075
4076 let Ok(Some(Submit::Envelope(mut new_envelope))) =
4077 processor.process(&mut Token::noop(), message).await
4078 else {
4079 panic!();
4080 };
4081 let new_envelope = new_envelope.envelope_mut();
4082
4083 let event_item = new_envelope.items().last().unwrap();
4084 let annotated_event: Annotated<Event> =
4085 Annotated::from_json_bytes(&event_item.payload()).unwrap();
4086 let event = annotated_event.into_value().unwrap();
4087 let headers = event
4088 .request
4089 .into_value()
4090 .unwrap()
4091 .headers
4092 .into_value()
4093 .unwrap();
4094
4095 assert_eq!(
4097 Some(
4098 "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/********* Safari/537.36"
4099 ),
4100 headers.get_header("User-Agent")
4101 );
4102 let contexts = event.contexts.into_value().unwrap();
4104 let browser = contexts.0.get("browser").unwrap();
4105 assert_eq!(
4106 r#"{"browser":"Chrome 103.0.0","name":"Chrome","version":"103.0.0","type":"browser"}"#,
4107 browser.to_json().unwrap()
4108 );
4109 }
4110
4111 #[tokio::test]
4112 #[cfg(feature = "processing")]
4113 async fn test_materialize_dsc() {
4114 use crate::services::projects::project::PublicKeyConfig;
4115
4116 let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
4117 .parse()
4118 .unwrap();
4119 let request_meta = RequestMeta::new(dsn);
4120 let mut envelope = Envelope::from_request(None, request_meta);
4121
4122 let dsc = r#"{
4123 "trace_id": "00000000-0000-0000-0000-000000000000",
4124 "public_key": "e12d836b15bb49d7bbf99e64295d995b",
4125 "sample_rate": "0.2"
4126 }"#;
4127 envelope.set_dsc(serde_json::from_str(dsc).unwrap());
4128
4129 let mut item = Item::new(ItemType::Event);
4130 item.set_payload(ContentType::Json, r#"{}"#);
4131 envelope.add_item(item);
4132
4133 let outcome_aggregator = Addr::dummy();
4134 let managed_envelope = ManagedEnvelope::new(envelope, outcome_aggregator);
4135
4136 let mut project_info = ProjectInfo::default();
4137 project_info.public_keys.push(PublicKeyConfig {
4138 public_key: ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(),
4139 numeric_id: Some(1),
4140 });
4141 let project_info = Arc::new(project_info);
4142
4143 let message = ProcessEnvelopeGrouped {
4144 group: ProcessingGroup::Transaction,
4145 envelope: managed_envelope,
4146 project_info: project_info.clone(),
4147 rate_limits: Default::default(),
4148 sampling_project_info: Some(project_info),
4149 reservoir_counters: ReservoirCounters::default(),
4150 };
4151
4152 let config = Config::from_json_value(serde_json::json!({
4153 "processing": {
4154 "enabled": true,
4155 "kafka_config": [],
4156 }
4157 }))
4158 .unwrap();
4159
4160 let processor = create_test_processor(config).await;
4161 let Ok(Some(Submit::Envelope(envelope))) =
4162 processor.process(&mut Token::noop(), message).await
4163 else {
4164 panic!();
4165 };
4166 let event = envelope
4167 .envelope()
4168 .get_item_by(|item| item.ty() == &ItemType::Event)
4169 .unwrap();
4170
4171 let event = Annotated::<Event>::from_json_bytes(&event.payload()).unwrap();
4172 insta::assert_debug_snapshot!(event.value().unwrap()._dsc, @r###"
4173 Object(
4174 {
4175 "environment": ~,
4176 "public_key": String(
4177 "e12d836b15bb49d7bbf99e64295d995b",
4178 ),
4179 "release": ~,
4180 "replay_id": ~,
4181 "sample_rate": String(
4182 "0.2",
4183 ),
4184 "trace_id": String(
4185 "00000000000000000000000000000000",
4186 ),
4187 "transaction": ~,
4188 },
4189 )
4190 "###);
4191 }
4192
4193 fn capture_test_event(transaction_name: &str, source: TransactionSource) -> Vec<String> {
4194 let mut event = Annotated::<Event>::from_json(
4195 r#"
4196 {
4197 "type": "transaction",
4198 "transaction": "/foo/",
4199 "timestamp": 946684810.0,
4200 "start_timestamp": 946684800.0,
4201 "contexts": {
4202 "trace": {
4203 "trace_id": "4c79f60c11214eb38604f4ae0781bfb2",
4204 "span_id": "fa90fdead5f74053",
4205 "op": "http.server",
4206 "type": "trace"
4207 }
4208 },
4209 "transaction_info": {
4210 "source": "url"
4211 }
4212 }
4213 "#,
4214 )
4215 .unwrap();
4216 let e = event.value_mut().as_mut().unwrap();
4217 e.transaction.set_value(Some(transaction_name.into()));
4218
4219 e.transaction_info
4220 .value_mut()
4221 .as_mut()
4222 .unwrap()
4223 .source
4224 .set_value(Some(source));
4225
4226 relay_statsd::with_capturing_test_client(|| {
4227 utils::log_transaction_name_metrics(&mut event, |event| {
4228 let config = NormalizationConfig {
4229 transaction_name_config: TransactionNameConfig {
4230 rules: &[TransactionNameRule {
4231 pattern: LazyGlob::new("/foo/*/**".to_owned()),
4232 expiry: DateTime::<Utc>::MAX_UTC,
4233 redaction: RedactionRule::Replace {
4234 substitution: "*".to_owned(),
4235 },
4236 }],
4237 },
4238 ..Default::default()
4239 };
4240 normalize_event(event, &config)
4241 });
4242 })
4243 }
4244
4245 #[test]
4246 fn test_log_transaction_metrics_none() {
4247 let captures = capture_test_event("/nothing", TransactionSource::Url);
4248 insta::assert_debug_snapshot!(captures, @r###"
4249 [
4250 "event.transaction_name_changes:1|c|#source_in:url,changes:none,source_out:sanitized,is_404:false",
4251 ]
4252 "###);
4253 }
4254
4255 #[test]
4256 fn test_log_transaction_metrics_rule() {
4257 let captures = capture_test_event("/foo/john/denver", TransactionSource::Url);
4258 insta::assert_debug_snapshot!(captures, @r###"
4259 [
4260 "event.transaction_name_changes:1|c|#source_in:url,changes:rule,source_out:sanitized,is_404:false",
4261 ]
4262 "###);
4263 }
4264
    #[test]
    fn test_log_transaction_metrics_pattern() {
        let captures = capture_test_event("/something/12345", TransactionSource::Url);
        insta::assert_debug_snapshot!(captures, @r###"
        [
            "event.transaction_name_changes:1|c|#source_in:url,changes:pattern,source_out:sanitized,is_404:false",
        ]
        "###);
    }

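    /// A name hitting both the custom rule and the built-in patterns is
    /// reported as `changes:both`.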
    #[test]
    fn test_log_transaction_metrics_both() {
        let captures = capture_test_event("/foo/john/12345", TransactionSource::Url);
        insta::assert_debug_snapshot!(captures, @r###"
        [
            "event.transaction_name_changes:1|c|#source_in:url,changes:both,source_out:sanitized,is_404:false",
        ]
        "###);
    }

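    /// Route-sourced transaction names are not sanitized, so both the name and
    /// the source pass through unchanged.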
    #[test]
    fn test_log_transaction_metrics_no_match() {
        let captures = capture_test_event("/foo/john/12345", TransactionSource::Route);
        insta::assert_debug_snapshot!(captures, @r###"
        [
            "event.transaction_name_changes:1|c|#source_in:route,changes:none,source_out:route,is_404:false",
        ]
        "###);
    }

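    /// Derives the overhead of a measurement MRI (everything surrounding the
    /// bare measurement name and unit) from a real bucket and checks that it
    /// matches the hardcoded `MEASUREMENT_MRI_OVERHEAD` constant.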
    #[test]
    fn test_mri_overhead_constant() {
        let hardcoded_value = MeasurementsConfig::MEASUREMENT_MRI_OVERHEAD;

        let derived_value = {
            let name = "foobar".to_owned();
            let value = 5.into();
            let unit = MetricUnit::Duration(DurationUnit::default());
            let tags = TransactionMeasurementTags {
                measurement_rating: None,
                universal_tags: CommonTags(BTreeMap::new()),
                score_profile_version: None,
            };

            let measurement = TransactionMetric::Measurement {
                name: name.clone(),
                value,
                unit,
                tags,
            };

            let metric: Bucket = measurement.into_metric(UnixTimestamp::now());
            metric.name.len() - unit.to_string().len() - name.len()
        };
        assert_eq!(
            hardcoded_value, derived_value,
            "Update `MEASUREMENT_MRI_OVERHEAD` if the naming scheme changed."
        );
    }

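    /// Statsd buckets received from external clients get the request's receive
    /// time stamped into `metadata.received_at`; buckets relayed internally
    /// keep it as `None`.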
    #[tokio::test]
    async fn test_process_metrics_bucket_metadata() {
        let mut token = Cogs::noop().timed(ResourceId::Relay, AppFeature::Unattributed);
        let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
        let received_at = Utc::now();
        let config = Config::default();

        let (aggregator, mut aggregator_rx) = Addr::custom();
        let processor = create_test_processor_with_addrs(
            config,
            Addrs {
                aggregator,
                ..Default::default()
            },
        )
        .await;

        let mut item = Item::new(ItemType::Statsd);
        item.set_payload(
            ContentType::Text,
            "transactions/foo:3182887624:4267882815|s",
        );
        for (source, expected_received_at) in [
            (
                BucketSource::External,
                Some(UnixTimestamp::from_datetime(received_at).unwrap()),
            ),
            (BucketSource::Internal, None),
        ] {
            let message = ProcessMetrics {
                data: MetricData::Raw(vec![item.clone()]),
                project_key,
                source,
                received_at,
                sent_at: Some(Utc::now()),
            };
            processor.handle_process_metrics(&mut token, message);

            let Aggregator::MergeBuckets(merge_buckets) = aggregator_rx.recv().await.unwrap();
            let buckets = merge_buckets.buckets;
            assert_eq!(buckets.len(), 1);
            assert_eq!(buckets[0].metadata.received_at, expected_received_at);
        }
    }

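    /// A batched metrics payload containing buckets for two projects is split
    /// into one `MergeBuckets` message per project key.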
    #[tokio::test]
    async fn test_process_batched_metrics() {
        let mut token = Cogs::noop().timed(ResourceId::Relay, AppFeature::Unattributed);
        let received_at = Utc::now();
        let config = Config::default();

        let (aggregator, mut aggregator_rx) = Addr::custom();
        let processor = create_test_processor_with_addrs(
            config,
            Addrs {
                aggregator,
                ..Default::default()
            },
        )
        .await;

        let payload = r#"{
    "buckets": {
        "11111111111111111111111111111111": [
            {
                "timestamp": 1615889440,
                "width": 0,
                "name": "d:custom/endpoint.response_time@millisecond",
                "type": "d",
                "value": [
                    68.0
                ],
                "tags": {
                    "route": "user_index"
                }
            }
        ],
        "22222222222222222222222222222222": [
            {
                "timestamp": 1615889440,
                "width": 0,
                "name": "d:custom/endpoint.cache_rate@none",
                "type": "d",
                "value": [
                    36.0
                ]
            }
        ]
    }
}
"#;
        let message = ProcessBatchedMetrics {
            payload: Bytes::from(payload),
            source: BucketSource::Internal,
            received_at,
            sent_at: Some(Utc::now()),
        };
        processor.handle_process_batched_metrics(&mut token, message);

        let Aggregator::MergeBuckets(mb1) = aggregator_rx.recv().await.unwrap();
        let Aggregator::MergeBuckets(mb2) = aggregator_rx.recv().await.unwrap();

        let mut messages = vec![mb1, mb2];
        messages.sort_by_key(|mb| mb.project_key);

        let actual = messages
            .into_iter()
            .map(|mb| (mb.project_key, mb.buckets))
            .collect::<Vec<_>>();

        insta::assert_debug_snapshot!(actual, @r###"
        [
            (
                ProjectKey("11111111111111111111111111111111"),
                [
                    Bucket {
                        timestamp: UnixTimestamp(1615889440),
                        width: 0,
                        name: MetricName(
                            "d:custom/endpoint.response_time@millisecond",
                        ),
                        value: Distribution(
                            [
                                68.0,
                            ],
                        ),
                        tags: {
                            "route": "user_index",
                        },
                        metadata: BucketMetadata {
                            merges: 1,
                            received_at: None,
                            extracted_from_indexed: false,
                        },
                    },
                ],
            ),
            (
                ProjectKey("22222222222222222222222222222222"),
                [
                    Bucket {
                        timestamp: UnixTimestamp(1615889440),
                        width: 0,
                        name: MetricName(
                            "d:custom/endpoint.cache_rate@none",
                        ),
                        value: Distribution(
                            [
                                36.0,
                            ],
                        ),
                        tags: {},
                        metadata: BucketMetadata {
                            merges: 1,
                            received_at: None,
                            extracted_from_indexed: false,
                        },
                    },
                ],
            ),
        ]
        "###);
    }
}