1use std::borrow::Cow;
2use std::collections::{BTreeSet, HashMap};
3use std::error::Error;
4use std::fmt::{Debug, Display};
5use std::future::Future;
6use std::io::Write;
7use std::pin::Pin;
8use std::sync::{Arc, Once};
9use std::time::Duration;
10
11use anyhow::Context;
12use brotli::CompressorWriter as BrotliEncoder;
13use bytes::Bytes;
14use chrono::{DateTime, Utc};
15use flate2::Compression;
16use flate2::write::{GzEncoder, ZlibEncoder};
17use futures::FutureExt;
18use futures::future::BoxFuture;
19use relay_base_schema::project::{ProjectId, ProjectKey};
20use relay_cogs::{AppFeature, Cogs, FeatureWeights, ResourceId, Token};
21use relay_common::time::UnixTimestamp;
22use relay_config::{Config, HttpEncoding, NormalizationLevel, RelayMode};
23use relay_dynamic_config::{CombinedMetricExtractionConfig, ErrorBoundary, Feature, GlobalConfig};
24use relay_event_normalization::{
25 ClockDriftProcessor, CombinedMeasurementsConfig, EventValidationConfig, GeoIpLookup,
26 MeasurementsConfig, NormalizationConfig, RawUserAgentInfo, TransactionNameConfig,
27 normalize_event, validate_event,
28};
29use relay_event_schema::processor::ProcessingAction;
30use relay_event_schema::protocol::{
31 ClientReport, Event, EventId, EventType, IpAddr, Metrics, NetworkReportError,
32};
33use relay_filter::FilterStatKey;
34use relay_metrics::{Bucket, BucketMetadata, BucketView, BucketsView, MetricNamespace};
35use relay_pii::PiiConfigError;
36use relay_protocol::{Annotated, Empty};
37use relay_quotas::{DataCategory, Quota, RateLimits, Scoping};
38use relay_sampling::evaluation::{ReservoirCounters, ReservoirEvaluator, SamplingDecision};
39use relay_statsd::metric;
40use relay_system::{Addr, FromMessage, NoResponse, Service};
41use reqwest::header;
42use smallvec::{SmallVec, smallvec};
43use zstd::stream::Encoder as ZstdEncoder;
44
45use crate::constants::DEFAULT_EVENT_RETENTION;
46use crate::envelope::{self, ContentType, Envelope, EnvelopeError, Item, ItemType};
47use crate::extractors::{PartialDsn, RequestMeta};
48use crate::managed::{InvalidProcessingGroupType, ManagedEnvelope, TypedEnvelope};
49use crate::metrics::{MetricOutcomes, MetricsLimiter, MinimalTrackableBucket};
50use crate::metrics_extraction::transactions::types::ExtractMetricsError;
51use crate::metrics_extraction::transactions::{ExtractedMetrics, TransactionExtractor};
52use crate::processing::logs::{LogOutput, LogsProcessor};
53use crate::processing::{Forward as _, Output, Processor as _, QuotaRateLimiter};
54use crate::service::ServiceError;
55use crate::services::global_config::GlobalConfigHandle;
56use crate::services::metrics::{Aggregator, FlushBuckets, MergeBuckets, ProjectBuckets};
57use crate::services::outcome::{DiscardItemType, DiscardReason, Outcome, TrackOutcome};
58use crate::services::processor::event::FiltersStatus;
59use crate::services::projects::cache::ProjectCacheHandle;
60use crate::services::projects::project::{ProjectInfo, ProjectState};
61use crate::services::test_store::{Capture, TestStore};
62use crate::services::upstream::{
63 SendRequest, Sign, SignatureType, UpstreamRelay, UpstreamRequest, UpstreamRequestError,
64};
65use crate::statsd::{RelayCounters, RelayHistograms, RelayTimers};
66use crate::utils::{self, CheckLimits, EnvelopeLimiter, SamplingResult};
67use crate::{http, processing};
68use relay_base_schema::organization::OrganizationId;
69use relay_threading::AsyncPool;
70#[cfg(feature = "processing")]
71use {
72 crate::managed::ItemAction,
73 crate::services::global_rate_limits::{GlobalRateLimits, GlobalRateLimitsServiceHandle},
74 crate::services::processor::nnswitch::SwitchProcessingError,
75 crate::services::store::{Store, StoreEnvelope},
76 crate::utils::Enforcement,
77 itertools::Itertools,
78 relay_cardinality::{
79 CardinalityLimit, CardinalityLimiter, CardinalityLimitsSplit, RedisSetLimiter,
80 RedisSetLimiterOptions,
81 },
82 relay_dynamic_config::{CardinalityLimiterMode, MetricExtractionGroups},
83 relay_quotas::{RateLimitingError, RedisRateLimiter},
84 relay_redis::{AsyncRedisClient, RedisClients},
85 std::time::Instant,
86 symbolic_unreal::{Unreal4Error, Unreal4ErrorKind},
87};
88
89mod attachment;
90mod dynamic_sampling;
91mod event;
92mod metrics;
93mod nel;
94mod profile;
95mod profile_chunk;
96mod replay;
97mod report;
98mod session;
99mod span;
100mod transaction;
101pub use span::extract_transaction_span;
102
103#[cfg(all(sentry, feature = "processing"))]
104mod playstation;
105mod standalone;
106#[cfg(feature = "processing")]
107mod unreal;
108
109#[cfg(feature = "processing")]
110mod nnswitch;
111
112macro_rules! if_processing {
116 ($config:expr, $if_true:block) => {
117 #[cfg(feature = "processing")] {
118 if $config.processing_enabled() $if_true
119 }
120 };
121 ($config:expr, $if_true:block else $if_false:block) => {
122 {
123 #[cfg(feature = "processing")] {
124 if $config.processing_enabled() $if_true else $if_false
125 }
126 #[cfg(not(feature = "processing"))] {
127 $if_false
128 }
129 }
130 };
131}
132
133const MINIMUM_CLOCK_DRIFT: Duration = Duration::from_secs(55 * 60);
135
136#[derive(Debug)]
137pub struct GroupTypeError;
138
139impl Display for GroupTypeError {
140 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
141 f.write_str("failed to convert processing group into corresponding type")
142 }
143}
144
145impl std::error::Error for GroupTypeError {}
146
147macro_rules! processing_group {
148 ($ty:ident, $variant:ident$(, $($other:ident),+)?) => {
149 #[derive(Clone, Copy, Debug)]
150 pub struct $ty;
151
152 impl From<$ty> for ProcessingGroup {
153 fn from(_: $ty) -> Self {
154 ProcessingGroup::$variant
155 }
156 }
157
158 impl TryFrom<ProcessingGroup> for $ty {
159 type Error = GroupTypeError;
160
161 fn try_from(value: ProcessingGroup) -> Result<Self, Self::Error> {
162 if matches!(value, ProcessingGroup::$variant) {
163 return Ok($ty);
164 }
165 $($(
166 if matches!(value, ProcessingGroup::$other) {
167 return Ok($ty);
168 }
169 )+)?
170 return Err(GroupTypeError);
171 }
172 }
173 };
174}
175
176pub trait EventProcessing {}
180
181pub trait Sampling {
183 fn supports_sampling(project_info: &ProjectInfo) -> bool;
185
186 fn supports_reservoir_sampling() -> bool;
188}
189
190processing_group!(TransactionGroup, Transaction);
191impl EventProcessing for TransactionGroup {}
192
193impl Sampling for TransactionGroup {
194 fn supports_sampling(project_info: &ProjectInfo) -> bool {
195 matches!(&project_info.config.transaction_metrics, Some(ErrorBoundary::Ok(c)) if c.is_enabled())
197 }
198
199 fn supports_reservoir_sampling() -> bool {
200 true
201 }
202}
203
204processing_group!(ErrorGroup, Error);
205impl EventProcessing for ErrorGroup {}
206
207processing_group!(SessionGroup, Session);
208processing_group!(StandaloneGroup, Standalone);
209processing_group!(ClientReportGroup, ClientReport);
210processing_group!(ReplayGroup, Replay);
211processing_group!(CheckInGroup, CheckIn);
212processing_group!(LogGroup, Log, Nel);
213processing_group!(SpanGroup, Span);
214
215impl Sampling for SpanGroup {
216 fn supports_sampling(project_info: &ProjectInfo) -> bool {
217 matches!(&project_info.config().metric_extraction, ErrorBoundary::Ok(c) if c.is_supported())
219 }
220
221 fn supports_reservoir_sampling() -> bool {
222 false
223 }
224}
225
226processing_group!(ProfileChunkGroup, ProfileChunk);
227processing_group!(MetricsGroup, Metrics);
228processing_group!(ForwardUnknownGroup, ForwardUnknown);
229processing_group!(Ungrouped, Ungrouped);
230
231#[derive(Clone, Copy, Debug)]
235pub struct Processed;
236
237#[derive(Clone, Copy, Debug)]
239pub enum ProcessingGroup {
240 Transaction,
244 Error,
249 Session,
251 Standalone,
254 ClientReport,
256 Replay,
258 CheckIn,
260 Nel,
262 Log,
264 Span,
266 Metrics,
268 ProfileChunk,
270 ForwardUnknown,
273 Ungrouped,
275}
276
277impl ProcessingGroup {
278 pub fn split_envelope(mut envelope: Envelope) -> SmallVec<[(Self, Box<Envelope>); 3]> {
280 let headers = envelope.headers().clone();
281 let mut grouped_envelopes = smallvec![];
282
283 let replay_items = envelope.take_items_by(|item| {
285 matches!(
286 item.ty(),
287 &ItemType::ReplayEvent | &ItemType::ReplayRecording | &ItemType::ReplayVideo
288 )
289 });
290 if !replay_items.is_empty() {
291 grouped_envelopes.push((
292 ProcessingGroup::Replay,
293 Envelope::from_parts(headers.clone(), replay_items),
294 ))
295 }
296
297 let session_items = envelope
299 .take_items_by(|item| matches!(item.ty(), &ItemType::Session | &ItemType::Sessions));
300 if !session_items.is_empty() {
301 grouped_envelopes.push((
302 ProcessingGroup::Session,
303 Envelope::from_parts(headers.clone(), session_items),
304 ))
305 }
306
307 let span_items = envelope.take_items_by(|item| {
309 matches!(
310 item.ty(),
311 &ItemType::Span | &ItemType::OtelSpan | &ItemType::OtelTracesData
312 )
313 });
314
315 if !span_items.is_empty() {
316 grouped_envelopes.push((
317 ProcessingGroup::Span,
318 Envelope::from_parts(headers.clone(), span_items),
319 ))
320 }
321
322 let logs_items =
324 envelope.take_items_by(|item| matches!(item.ty(), &ItemType::Log | &ItemType::OtelLog));
325
326 if !logs_items.is_empty() {
327 grouped_envelopes.push((
328 ProcessingGroup::Log,
329 Envelope::from_parts(headers.clone(), logs_items),
330 ))
331 }
332
333 let nel_items = envelope.take_items_by(|item| matches!(item.ty(), &ItemType::Nel));
335 if !nel_items.is_empty() {
336 grouped_envelopes.push((
337 ProcessingGroup::Nel,
338 Envelope::from_parts(headers.clone(), nel_items),
339 ))
340 }
341
342 let metric_items = envelope.take_items_by(|i| i.ty().is_metrics());
347 if !metric_items.is_empty() {
348 grouped_envelopes.push((
349 ProcessingGroup::Metrics,
350 Envelope::from_parts(headers.clone(), metric_items),
351 ))
352 }
353
354 let profile_chunk_items =
356 envelope.take_items_by(|item| matches!(item.ty(), &ItemType::ProfileChunk));
357 if !profile_chunk_items.is_empty() {
358 grouped_envelopes.push((
359 ProcessingGroup::ProfileChunk,
360 Envelope::from_parts(headers.clone(), profile_chunk_items),
361 ))
362 }
363
364 if !envelope.items().any(Item::creates_event) {
369 let standalone_items = envelope.take_items_by(Item::requires_event);
370 if !standalone_items.is_empty() {
371 grouped_envelopes.push((
372 ProcessingGroup::Standalone,
373 Envelope::from_parts(headers.clone(), standalone_items),
374 ))
375 }
376 };
377
378 let security_reports_items = envelope
380 .take_items_by(|i| matches!(i.ty(), &ItemType::RawSecurity))
381 .into_iter()
382 .map(|item| {
383 let headers = headers.clone();
384 let items: SmallVec<[Item; 3]> = smallvec![item.clone()];
385 let mut envelope = Envelope::from_parts(headers, items);
386 envelope.set_event_id(EventId::new());
387 (ProcessingGroup::Error, envelope)
388 });
389 grouped_envelopes.extend(security_reports_items);
390
391 let require_event_items = envelope.take_items_by(Item::requires_event);
393 if !require_event_items.is_empty() {
394 let group = if require_event_items
395 .iter()
396 .any(|item| matches!(item.ty(), &ItemType::Transaction | &ItemType::Profile))
397 {
398 ProcessingGroup::Transaction
399 } else {
400 ProcessingGroup::Error
401 };
402
403 grouped_envelopes.push((
404 group,
405 Envelope::from_parts(headers.clone(), require_event_items),
406 ))
407 }
408
409 let envelopes = envelope.items_mut().map(|item| {
411 let headers = headers.clone();
412 let items: SmallVec<[Item; 3]> = smallvec![item.clone()];
413 let envelope = Envelope::from_parts(headers, items);
414 let item_type = item.ty();
415 let group = if matches!(item_type, &ItemType::CheckIn) {
416 ProcessingGroup::CheckIn
417 } else if matches!(item.ty(), &ItemType::ClientReport) {
418 ProcessingGroup::ClientReport
419 } else if matches!(item_type, &ItemType::Unknown(_)) {
420 ProcessingGroup::ForwardUnknown
421 } else {
422 ProcessingGroup::Ungrouped
424 };
425
426 (group, envelope)
427 });
428 grouped_envelopes.extend(envelopes);
429
430 grouped_envelopes
431 }
432
433 pub fn variant(&self) -> &'static str {
435 match self {
436 ProcessingGroup::Transaction => "transaction",
437 ProcessingGroup::Error => "error",
438 ProcessingGroup::Session => "session",
439 ProcessingGroup::Standalone => "standalone",
440 ProcessingGroup::ClientReport => "client_report",
441 ProcessingGroup::Replay => "replay",
442 ProcessingGroup::CheckIn => "check_in",
443 ProcessingGroup::Log => "log",
444 ProcessingGroup::Nel => "nel",
445 ProcessingGroup::Span => "span",
446 ProcessingGroup::Metrics => "metrics",
447 ProcessingGroup::ProfileChunk => "profile_chunk",
448 ProcessingGroup::ForwardUnknown => "forward_unknown",
449 ProcessingGroup::Ungrouped => "ungrouped",
450 }
451 }
452}
453
454impl From<ProcessingGroup> for AppFeature {
455 fn from(value: ProcessingGroup) -> Self {
456 match value {
457 ProcessingGroup::Transaction => AppFeature::Transactions,
458 ProcessingGroup::Error => AppFeature::Errors,
459 ProcessingGroup::Session => AppFeature::Sessions,
460 ProcessingGroup::Standalone => AppFeature::UnattributedEnvelope,
461 ProcessingGroup::ClientReport => AppFeature::ClientReports,
462 ProcessingGroup::Replay => AppFeature::Replays,
463 ProcessingGroup::CheckIn => AppFeature::CheckIns,
464 ProcessingGroup::Log => AppFeature::Logs,
465 ProcessingGroup::Nel => AppFeature::Logs,
466 ProcessingGroup::Span => AppFeature::Spans,
467 ProcessingGroup::Metrics => AppFeature::UnattributedMetrics,
468 ProcessingGroup::ProfileChunk => AppFeature::Profiles,
469 ProcessingGroup::ForwardUnknown => AppFeature::UnattributedEnvelope,
470 ProcessingGroup::Ungrouped => AppFeature::UnattributedEnvelope,
471 }
472 }
473}
474
475#[derive(Debug, thiserror::Error)]
477pub enum ProcessingError {
478 #[error("invalid json in event")]
479 InvalidJson(#[source] serde_json::Error),
480
481 #[error("invalid message pack event payload")]
482 InvalidMsgpack(#[from] rmp_serde::decode::Error),
483
484 #[cfg(feature = "processing")]
485 #[error("invalid unreal crash report")]
486 InvalidUnrealReport(#[source] Unreal4Error),
487
488 #[error("event payload too large")]
489 PayloadTooLarge(DiscardItemType),
490
491 #[error("invalid transaction event")]
492 InvalidTransaction,
493
494 #[error("envelope processor failed")]
495 ProcessingFailed(#[from] ProcessingAction),
496
497 #[error("duplicate {0} in event")]
498 DuplicateItem(ItemType),
499
500 #[error("failed to extract event payload")]
501 NoEventPayload,
502
503 #[error("missing project id in DSN")]
504 MissingProjectId,
505
506 #[error("invalid security report type: {0:?}")]
507 InvalidSecurityType(Bytes),
508
509 #[error("unsupported security report type")]
510 UnsupportedSecurityType,
511
512 #[error("invalid security report")]
513 InvalidSecurityReport(#[source] serde_json::Error),
514
515 #[error("invalid nel report")]
516 InvalidNelReport(#[source] NetworkReportError),
517
518 #[error("event filtered with reason: {0:?}")]
519 EventFiltered(FilterStatKey),
520
521 #[error("missing or invalid required event timestamp")]
522 InvalidTimestamp,
523
524 #[error("could not serialize event payload")]
525 SerializeFailed(#[source] serde_json::Error),
526
527 #[cfg(feature = "processing")]
528 #[error("failed to apply quotas")]
529 QuotasFailed(#[from] RateLimitingError),
530
531 #[error("invalid pii config")]
532 PiiConfigError(PiiConfigError),
533
534 #[error("invalid processing group type")]
535 InvalidProcessingGroup(Box<InvalidProcessingGroupType>),
536
537 #[error("invalid replay")]
538 InvalidReplay(DiscardReason),
539
540 #[error("replay filtered with reason: {0:?}")]
541 ReplayFiltered(FilterStatKey),
542
543 #[cfg(feature = "processing")]
544 #[error("nintendo switch dying message processing failed {0:?}")]
545 InvalidNintendoDyingMessage(#[source] SwitchProcessingError),
546
547 #[cfg(all(sentry, feature = "processing"))]
548 #[error("playstation dump processing failed: {0}")]
549 InvalidPlaystationDump(String),
550
551 #[error("processing group does not match specific processor")]
552 ProcessingGroupMismatch,
553 #[error("new processing pipeline failed")]
554 ProcessingFailure,
555 #[error("failed to serialize processing result to an envelope")]
556 ProcessingEnvelopeSerialization,
557}
558
559impl ProcessingError {
560 pub fn to_outcome(&self) -> Option<Outcome> {
561 match self {
562 Self::PayloadTooLarge(payload_type) => {
563 Some(Outcome::Invalid(DiscardReason::TooLarge(*payload_type)))
564 }
565 Self::InvalidJson(_) => Some(Outcome::Invalid(DiscardReason::InvalidJson)),
566 Self::InvalidMsgpack(_) => Some(Outcome::Invalid(DiscardReason::InvalidMsgpack)),
567 Self::InvalidSecurityType(_) => {
568 Some(Outcome::Invalid(DiscardReason::SecurityReportType))
569 }
570 Self::InvalidSecurityReport(_) => Some(Outcome::Invalid(DiscardReason::SecurityReport)),
571 Self::UnsupportedSecurityType => Some(Outcome::Filtered(FilterStatKey::InvalidCsp)),
572 Self::InvalidNelReport(_) => Some(Outcome::Invalid(DiscardReason::InvalidJson)),
573 Self::InvalidTransaction => Some(Outcome::Invalid(DiscardReason::InvalidTransaction)),
574 Self::InvalidTimestamp => Some(Outcome::Invalid(DiscardReason::Timestamp)),
575 Self::DuplicateItem(_) => Some(Outcome::Invalid(DiscardReason::DuplicateItem)),
576 Self::NoEventPayload => Some(Outcome::Invalid(DiscardReason::NoEventPayload)),
577 #[cfg(feature = "processing")]
578 Self::InvalidNintendoDyingMessage(_) => Some(Outcome::Invalid(DiscardReason::Payload)),
579 #[cfg(all(sentry, feature = "processing"))]
580 Self::InvalidPlaystationDump(_) => Some(Outcome::Invalid(DiscardReason::Payload)),
581 #[cfg(feature = "processing")]
582 Self::InvalidUnrealReport(err) if err.kind() == Unreal4ErrorKind::BadCompression => {
583 Some(Outcome::Invalid(DiscardReason::InvalidCompression))
584 }
585 #[cfg(feature = "processing")]
586 Self::InvalidUnrealReport(_) => Some(Outcome::Invalid(DiscardReason::ProcessUnreal)),
587 Self::SerializeFailed(_) | Self::ProcessingFailed(_) => {
588 Some(Outcome::Invalid(DiscardReason::Internal))
589 }
590 #[cfg(feature = "processing")]
591 Self::QuotasFailed(_) => Some(Outcome::Invalid(DiscardReason::Internal)),
592 Self::PiiConfigError(_) => Some(Outcome::Invalid(DiscardReason::ProjectStatePii)),
593 Self::MissingProjectId => None,
594 Self::EventFiltered(_) => None,
595 Self::InvalidProcessingGroup(_) => None,
596 Self::InvalidReplay(reason) => Some(Outcome::Invalid(*reason)),
597 Self::ReplayFiltered(key) => Some(Outcome::Filtered(key.clone())),
598
599 Self::ProcessingGroupMismatch => Some(Outcome::Invalid(DiscardReason::Internal)),
600 Self::ProcessingFailure => None,
602 Self::ProcessingEnvelopeSerialization => {
603 Some(Outcome::Invalid(DiscardReason::Internal))
604 }
605 }
606 }
607
608 fn is_unexpected(&self) -> bool {
609 self.to_outcome()
610 .is_some_and(|outcome| outcome.is_unexpected())
611 }
612}
613
614#[cfg(feature = "processing")]
615impl From<Unreal4Error> for ProcessingError {
616 fn from(err: Unreal4Error) -> Self {
617 match err.kind() {
618 Unreal4ErrorKind::TooLarge => Self::PayloadTooLarge(ItemType::UnrealReport.into()),
619 _ => ProcessingError::InvalidUnrealReport(err),
620 }
621 }
622}
623
624impl From<ExtractMetricsError> for ProcessingError {
625 fn from(error: ExtractMetricsError) -> Self {
626 match error {
627 ExtractMetricsError::MissingTimestamp | ExtractMetricsError::InvalidTimestamp => {
628 Self::InvalidTimestamp
629 }
630 }
631 }
632}
633
634impl From<InvalidProcessingGroupType> for ProcessingError {
635 fn from(value: InvalidProcessingGroupType) -> Self {
636 Self::InvalidProcessingGroup(Box::new(value))
637 }
638}
639
640type ExtractedEvent = (Annotated<Event>, usize);
641
642#[derive(Debug)]
647pub struct ProcessingExtractedMetrics {
648 metrics: ExtractedMetrics,
649}
650
651impl ProcessingExtractedMetrics {
652 pub fn new() -> Self {
653 Self {
654 metrics: ExtractedMetrics::default(),
655 }
656 }
657
658 pub fn extend(
660 &mut self,
661 extracted: ExtractedMetrics,
662 sampling_decision: Option<SamplingDecision>,
663 ) {
664 self.extend_project_metrics(extracted.project_metrics, sampling_decision);
665 self.extend_sampling_metrics(extracted.sampling_metrics, sampling_decision);
666 }
667
668 pub fn extend_project_metrics<I>(
670 &mut self,
671 buckets: I,
672 sampling_decision: Option<SamplingDecision>,
673 ) where
674 I: IntoIterator<Item = Bucket>,
675 {
676 self.metrics
677 .project_metrics
678 .extend(buckets.into_iter().map(|mut bucket| {
679 bucket.metadata.extracted_from_indexed =
680 sampling_decision == Some(SamplingDecision::Keep);
681 bucket
682 }));
683 }
684
685 pub fn extend_sampling_metrics<I>(
687 &mut self,
688 buckets: I,
689 sampling_decision: Option<SamplingDecision>,
690 ) where
691 I: IntoIterator<Item = Bucket>,
692 {
693 self.metrics
694 .sampling_metrics
695 .extend(buckets.into_iter().map(|mut bucket| {
696 bucket.metadata.extracted_from_indexed =
697 sampling_decision == Some(SamplingDecision::Keep);
698 bucket
699 }));
700 }
701
702 #[cfg(feature = "processing")]
707 fn apply_enforcement(&mut self, enforcement: &Enforcement, enforced_consistently: bool) {
708 let mut drop_namespaces: SmallVec<[_; 2]> = smallvec![];
710 let mut reset_extracted_from_indexed: SmallVec<[_; 2]> = smallvec![];
713
714 for (namespace, limit, indexed) in [
715 (
716 MetricNamespace::Transactions,
717 &enforcement.event,
718 &enforcement.event_indexed,
719 ),
720 (
721 MetricNamespace::Spans,
722 &enforcement.spans,
723 &enforcement.spans_indexed,
724 ),
725 ] {
726 if limit.is_active() {
727 drop_namespaces.push(namespace);
728 } else if indexed.is_active() && !enforced_consistently {
729 reset_extracted_from_indexed.push(namespace);
734 }
735 }
736
737 if !drop_namespaces.is_empty() || !reset_extracted_from_indexed.is_empty() {
738 self.retain_mut(|bucket| {
739 let Some(namespace) = bucket.name.try_namespace() else {
740 return true;
741 };
742
743 if drop_namespaces.contains(&namespace) {
744 return false;
745 }
746
747 if reset_extracted_from_indexed.contains(&namespace) {
748 bucket.metadata.extracted_from_indexed = false;
749 }
750
751 true
752 });
753 }
754 }
755
756 #[cfg(feature = "processing")]
757 fn retain_mut(&mut self, mut f: impl FnMut(&mut Bucket) -> bool) {
758 self.metrics.project_metrics.retain_mut(&mut f);
759 self.metrics.sampling_metrics.retain_mut(&mut f);
760 }
761}
762
763fn send_metrics(
764 metrics: ExtractedMetrics,
765 project_key: ProjectKey,
766 sampling_key: Option<ProjectKey>,
767 aggregator: &Addr<Aggregator>,
768) {
769 let ExtractedMetrics {
770 project_metrics,
771 sampling_metrics,
772 } = metrics;
773
774 if !project_metrics.is_empty() {
775 aggregator.send(MergeBuckets {
776 project_key,
777 buckets: project_metrics,
778 });
779 }
780
781 if !sampling_metrics.is_empty() {
782 let sampling_project_key = sampling_key.unwrap_or(project_key);
789 aggregator.send(MergeBuckets {
790 project_key: sampling_project_key,
791 buckets: sampling_metrics,
792 });
793 }
794}
795
796fn event_category(event: &Annotated<Event>) -> Option<DataCategory> {
801 event_type(event).map(DataCategory::from)
802}
803
804fn event_type(event: &Annotated<Event>) -> Option<EventType> {
809 event
810 .value()
811 .map(|event| event.ty.value().copied().unwrap_or_default())
812}
813
814fn should_filter(config: &Config, project_info: &ProjectInfo, feature: Feature) -> bool {
819 match config.relay_mode() {
820 RelayMode::Proxy | RelayMode::Static | RelayMode::Capture => false,
821 RelayMode::Managed => !project_info.has_feature(feature),
822 }
823}
824
825#[derive(Copy, Clone)]
827struct EventFullyNormalized(bool);
828
829impl EventFullyNormalized {
830 pub fn new(envelope: &Envelope) -> Self {
832 let event_fully_normalized = envelope.meta().is_from_internal_relay()
833 && envelope
834 .items()
835 .any(|item| item.creates_event() && item.fully_normalized());
836
837 Self(event_fully_normalized)
838 }
839}
840
841#[derive(Debug, Copy, Clone)]
843struct EventMetricsExtracted(bool);
844
845#[derive(Debug, Copy, Clone)]
847struct SpansExtracted(bool);
848
849#[expect(
852 clippy::large_enum_variant,
853 reason = "until we have a better solution to combat the excessive growth of Item, see #4819"
854)]
855#[derive(Debug)]
856enum ProcessingResult {
857 Envelope {
858 managed_envelope: TypedEnvelope<Processed>,
859 extracted_metrics: ProcessingExtractedMetrics,
860 },
861 Logs(Output<LogOutput>),
862}
863
864impl ProcessingResult {
865 fn no_metrics(managed_envelope: TypedEnvelope<Processed>) -> Self {
867 Self::Envelope {
868 managed_envelope,
869 extracted_metrics: ProcessingExtractedMetrics::new(),
870 }
871 }
872}
873
874#[expect(
876 clippy::large_enum_variant,
877 reason = "until we have a better solution to combat the excessive growth of Item, see #4819"
878)]
879enum Submit {
880 Envelope(TypedEnvelope<Processed>),
882 Logs(LogOutput),
884}
885
886#[derive(Debug)]
896pub struct ProcessEnvelope {
897 pub envelope: ManagedEnvelope,
899 pub project_info: Arc<ProjectInfo>,
901 pub rate_limits: Arc<RateLimits>,
903 pub sampling_project_info: Option<Arc<ProjectInfo>>,
905 pub reservoir_counters: ReservoirCounters,
907}
908
909#[derive(Debug)]
911struct ProcessEnvelopeGrouped {
912 pub group: ProcessingGroup,
914 pub envelope: ManagedEnvelope,
916 pub project_info: Arc<ProjectInfo>,
918 pub rate_limits: Arc<RateLimits>,
920 pub sampling_project_info: Option<Arc<ProjectInfo>>,
922 pub reservoir_counters: ReservoirCounters,
924}
925
926#[derive(Debug)]
938pub struct ProcessMetrics {
939 pub data: MetricData,
941 pub project_key: ProjectKey,
943 pub source: BucketSource,
945 pub received_at: DateTime<Utc>,
947 pub sent_at: Option<DateTime<Utc>>,
950}
951
952#[derive(Debug)]
954pub enum MetricData {
955 Raw(Vec<Item>),
957 Parsed(Vec<Bucket>),
959}
960
961impl MetricData {
962 fn into_buckets(self, timestamp: UnixTimestamp) -> Vec<Bucket> {
967 let items = match self {
968 Self::Parsed(buckets) => return buckets,
969 Self::Raw(items) => items,
970 };
971
972 let mut buckets = Vec::new();
973 for item in items {
974 let payload = item.payload();
975 if item.ty() == &ItemType::Statsd {
976 for bucket_result in Bucket::parse_all(&payload, timestamp) {
977 match bucket_result {
978 Ok(bucket) => buckets.push(bucket),
979 Err(error) => relay_log::debug!(
980 error = &error as &dyn Error,
981 "failed to parse metric bucket from statsd format",
982 ),
983 }
984 }
985 } else if item.ty() == &ItemType::MetricBuckets {
986 match serde_json::from_slice::<Vec<Bucket>>(&payload) {
987 Ok(parsed_buckets) => {
988 if buckets.is_empty() {
990 buckets = parsed_buckets;
991 } else {
992 buckets.extend(parsed_buckets);
993 }
994 }
995 Err(error) => {
996 relay_log::debug!(
997 error = &error as &dyn Error,
998 "failed to parse metric bucket",
999 );
1000 metric!(counter(RelayCounters::MetricBucketsParsingFailed) += 1);
1001 }
1002 }
1003 } else {
1004 relay_log::error!(
1005 "invalid item of type {} passed to ProcessMetrics",
1006 item.ty()
1007 );
1008 }
1009 }
1010 buckets
1011 }
1012}
1013
1014#[derive(Debug)]
1015pub struct ProcessBatchedMetrics {
1016 pub payload: Bytes,
1018 pub source: BucketSource,
1020 pub received_at: DateTime<Utc>,
1022 pub sent_at: Option<DateTime<Utc>>,
1024}
1025
1026#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
1028pub enum BucketSource {
1029 Internal,
1035 External,
1040}
1041
1042impl BucketSource {
1043 pub fn from_meta(meta: &RequestMeta) -> Self {
1045 match meta.is_from_internal_relay() {
1046 true => Self::Internal,
1047 false => Self::External,
1048 }
1049 }
1050}
1051
1052#[derive(Debug)]
1054pub struct SubmitClientReports {
1055 pub client_reports: Vec<ClientReport>,
1057 pub scoping: Scoping,
1059}
1060
1061#[derive(Debug)]
1063pub enum EnvelopeProcessor {
1064 ProcessEnvelope(Box<ProcessEnvelope>),
1065 ProcessProjectMetrics(Box<ProcessMetrics>),
1066 ProcessBatchedMetrics(Box<ProcessBatchedMetrics>),
1067 FlushBuckets(Box<FlushBuckets>),
1068 SubmitClientReports(Box<SubmitClientReports>),
1069}
1070
1071impl EnvelopeProcessor {
1072 pub fn variant(&self) -> &'static str {
1074 match self {
1075 EnvelopeProcessor::ProcessEnvelope(_) => "ProcessEnvelope",
1076 EnvelopeProcessor::ProcessProjectMetrics(_) => "ProcessProjectMetrics",
1077 EnvelopeProcessor::ProcessBatchedMetrics(_) => "ProcessBatchedMetrics",
1078 EnvelopeProcessor::FlushBuckets(_) => "FlushBuckets",
1079 EnvelopeProcessor::SubmitClientReports(_) => "SubmitClientReports",
1080 }
1081 }
1082}
1083
1084impl relay_system::Interface for EnvelopeProcessor {}
1085
1086impl FromMessage<ProcessEnvelope> for EnvelopeProcessor {
1087 type Response = relay_system::NoResponse;
1088
1089 fn from_message(message: ProcessEnvelope, _sender: ()) -> Self {
1090 Self::ProcessEnvelope(Box::new(message))
1091 }
1092}
1093
1094impl FromMessage<ProcessMetrics> for EnvelopeProcessor {
1095 type Response = NoResponse;
1096
1097 fn from_message(message: ProcessMetrics, _: ()) -> Self {
1098 Self::ProcessProjectMetrics(Box::new(message))
1099 }
1100}
1101
1102impl FromMessage<ProcessBatchedMetrics> for EnvelopeProcessor {
1103 type Response = NoResponse;
1104
1105 fn from_message(message: ProcessBatchedMetrics, _: ()) -> Self {
1106 Self::ProcessBatchedMetrics(Box::new(message))
1107 }
1108}
1109
1110impl FromMessage<FlushBuckets> for EnvelopeProcessor {
1111 type Response = NoResponse;
1112
1113 fn from_message(message: FlushBuckets, _: ()) -> Self {
1114 Self::FlushBuckets(Box::new(message))
1115 }
1116}
1117
1118impl FromMessage<SubmitClientReports> for EnvelopeProcessor {
1119 type Response = NoResponse;
1120
1121 fn from_message(message: SubmitClientReports, _: ()) -> Self {
1122 Self::SubmitClientReports(Box::new(message))
1123 }
1124}
1125
1126pub type EnvelopeProcessorServicePool = AsyncPool<BoxFuture<'static, ()>>;
1128
1129#[derive(Clone)]
1133pub struct EnvelopeProcessorService {
1134 inner: Arc<InnerProcessor>,
1135}
1136
1137pub struct Addrs {
1139 pub outcome_aggregator: Addr<TrackOutcome>,
1140 pub upstream_relay: Addr<UpstreamRelay>,
1141 pub test_store: Addr<TestStore>,
1142 #[cfg(feature = "processing")]
1143 pub store_forwarder: Option<Addr<Store>>,
1144 pub aggregator: Addr<Aggregator>,
1145 #[cfg(feature = "processing")]
1146 pub global_rate_limits: Option<Addr<GlobalRateLimits>>,
1147}
1148
1149impl Default for Addrs {
1150 fn default() -> Self {
1151 Addrs {
1152 outcome_aggregator: Addr::dummy(),
1153 upstream_relay: Addr::dummy(),
1154 test_store: Addr::dummy(),
1155 #[cfg(feature = "processing")]
1156 store_forwarder: None,
1157 aggregator: Addr::dummy(),
1158 #[cfg(feature = "processing")]
1159 global_rate_limits: None,
1160 }
1161 }
1162}
1163
1164struct InnerProcessor {
1165 pool: EnvelopeProcessorServicePool,
1166 config: Arc<Config>,
1167 global_config: GlobalConfigHandle,
1168 project_cache: ProjectCacheHandle,
1169 cogs: Cogs,
1170 #[cfg(feature = "processing")]
1171 quotas_client: Option<AsyncRedisClient>,
1172 addrs: Addrs,
1173 #[cfg(feature = "processing")]
1174 rate_limiter: Option<Arc<RedisRateLimiter<GlobalRateLimitsServiceHandle>>>,
1175 geoip_lookup: Option<GeoIpLookup>,
1176 #[cfg(feature = "processing")]
1177 cardinality_limiter: Option<CardinalityLimiter>,
1178 metric_outcomes: MetricOutcomes,
1179 processing: Processing,
1180}
1181
1182struct Processing {
1183 logs: LogsProcessor,
1184}
1185
1186impl EnvelopeProcessorService {
1187 #[cfg_attr(feature = "processing", expect(clippy::too_many_arguments))]
1189 pub fn new(
1190 pool: EnvelopeProcessorServicePool,
1191 config: Arc<Config>,
1192 global_config: GlobalConfigHandle,
1193 project_cache: ProjectCacheHandle,
1194 cogs: Cogs,
1195 #[cfg(feature = "processing")] redis: Option<RedisClients>,
1196 addrs: Addrs,
1197 metric_outcomes: MetricOutcomes,
1198 ) -> Self {
1199 let geoip_lookup = config.geoip_path().and_then(|p| {
1200 match GeoIpLookup::open(p).context(ServiceError::GeoIp) {
1201 Ok(geoip) => Some(geoip),
1202 Err(err) => {
1203 relay_log::error!("failed to open GeoIP db {p:?}: {err:?}");
1204 None
1205 }
1206 }
1207 });
1208
1209 #[cfg(feature = "processing")]
1210 let (cardinality, quotas) = match redis {
1211 Some(RedisClients {
1212 cardinality,
1213 quotas,
1214 ..
1215 }) => (Some(cardinality), Some(quotas)),
1216 None => (None, None),
1217 };
1218
1219 #[cfg(feature = "processing")]
1220 let global_rate_limits = addrs.global_rate_limits.clone().map(Into::into);
1221
1222 #[cfg(feature = "processing")]
1223 let rate_limiter = match (quotas.clone(), global_rate_limits) {
1224 (Some(redis), Some(global)) => {
1225 Some(RedisRateLimiter::new(redis, global).max_limit(config.max_rate_limit()))
1226 }
1227 _ => None,
1228 };
1229
1230 let quota_limiter = Arc::new(QuotaRateLimiter::new(
1231 #[cfg(feature = "processing")]
1232 project_cache.clone(),
1233 #[cfg(feature = "processing")]
1234 rate_limiter.clone(),
1235 ));
1236 #[cfg(feature = "processing")]
1237 let rate_limiter = rate_limiter.map(Arc::new);
1238
1239 let inner = InnerProcessor {
1240 pool,
1241 global_config,
1242 project_cache,
1243 cogs,
1244 #[cfg(feature = "processing")]
1245 quotas_client: quotas.clone(),
1246 #[cfg(feature = "processing")]
1247 rate_limiter,
1248 addrs,
1249 geoip_lookup,
1250 #[cfg(feature = "processing")]
1251 cardinality_limiter: cardinality
1252 .map(|cardinality| {
1253 RedisSetLimiter::new(
1254 RedisSetLimiterOptions {
1255 cache_vacuum_interval: config
1256 .cardinality_limiter_cache_vacuum_interval(),
1257 },
1258 cardinality,
1259 )
1260 })
1261 .map(CardinalityLimiter::new),
1262 metric_outcomes,
1263 processing: Processing {
1264 logs: LogsProcessor::new(quota_limiter),
1265 },
1266 config,
1267 };
1268
1269 Self {
1270 inner: Arc::new(inner),
1271 }
1272 }
1273
1274 #[cfg(feature = "processing")]
1276 fn normalize_checkins(
1277 &self,
1278 managed_envelope: &mut TypedEnvelope<CheckInGroup>,
1279 project_id: ProjectId,
1280 ) {
1281 managed_envelope.retain_items(|item| {
1282 if item.ty() != &ItemType::CheckIn {
1283 return ItemAction::Keep;
1284 }
1285
1286 match relay_monitors::process_check_in(&item.payload(), project_id) {
1287 Ok(result) => {
1288 item.set_routing_hint(result.routing_hint);
1289 item.set_payload(ContentType::Json, result.payload);
1290 ItemAction::Keep
1291 }
1292 Err(error) => {
1293 relay_log::debug!(
1295 error = &error as &dyn Error,
1296 "dropped invalid monitor check-in"
1297 );
1298 ItemAction::DropSilently
1299 }
1300 }
1301 })
1302 }
1303
1304 async fn enforce_quotas<Group>(
1305 &self,
1306 managed_envelope: &mut TypedEnvelope<Group>,
1307 event: Annotated<Event>,
1308 extracted_metrics: &mut ProcessingExtractedMetrics,
1309 project_info: &ProjectInfo,
1310 rate_limits: &RateLimits,
1311 ) -> Result<Annotated<Event>, ProcessingError> {
1312 let global_config = self.inner.global_config.current();
1313 let cached_result = RateLimiter::Cached
1316 .enforce(
1317 managed_envelope,
1318 event,
1319 extracted_metrics,
1320 &global_config,
1321 project_info,
1322 rate_limits,
1323 )
1324 .await?;
1325
1326 if_processing!(self.inner.config, {
1327 let rate_limiter = match self.inner.rate_limiter.clone() {
1328 Some(rate_limiter) => rate_limiter,
1329 None => return Ok(cached_result.event),
1330 };
1331
1332 let consistent_result = RateLimiter::Consistent(rate_limiter)
1334 .enforce(
1335 managed_envelope,
1336 cached_result.event,
1337 extracted_metrics,
1338 &global_config,
1339 project_info,
1340 rate_limits,
1341 )
1342 .await?;
1343
1344 if !consistent_result.rate_limits.is_empty() {
1346 self.inner
1347 .project_cache
1348 .get(managed_envelope.scoping().project_key)
1349 .rate_limits()
1350 .merge(consistent_result.rate_limits);
1351 }
1352
1353 Ok(consistent_result.event)
1354 } else { Ok(cached_result.event) })
1355 }
1356
1357 #[allow(clippy::too_many_arguments)]
1359 fn extract_transaction_metrics(
1360 &self,
1361 managed_envelope: &mut TypedEnvelope<TransactionGroup>,
1362 event: &mut Annotated<Event>,
1363 extracted_metrics: &mut ProcessingExtractedMetrics,
1364 project_id: ProjectId,
1365 project_info: Arc<ProjectInfo>,
1366 sampling_decision: SamplingDecision,
1367 event_metrics_extracted: EventMetricsExtracted,
1368 spans_extracted: SpansExtracted,
1369 ) -> Result<EventMetricsExtracted, ProcessingError> {
1370 if event_metrics_extracted.0 {
1371 return Ok(event_metrics_extracted);
1372 }
1373 let Some(event) = event.value_mut() else {
1374 return Ok(event_metrics_extracted);
1375 };
1376
1377 let global = self.inner.global_config.current();
1381 let combined_config = {
1382 let config = match &project_info.config.metric_extraction {
1383 ErrorBoundary::Ok(config) if config.is_supported() => config,
1384 _ => return Ok(event_metrics_extracted),
1385 };
1386 let global_config = match &global.metric_extraction {
1387 ErrorBoundary::Ok(global_config) => global_config,
1388 #[allow(unused_variables)]
1389 ErrorBoundary::Err(e) => {
1390 if_processing!(self.inner.config, {
1391 relay_log::error!("Failed to parse global extraction config {e}");
1394 MetricExtractionGroups::EMPTY
1395 } else {
1396 relay_log::debug!("Failed to parse global extraction config: {e}");
1399 return Ok(event_metrics_extracted);
1400 })
1401 }
1402 };
1403 CombinedMetricExtractionConfig::new(global_config, config)
1404 };
1405
1406 let tx_config = match &project_info.config.transaction_metrics {
1408 Some(ErrorBoundary::Ok(tx_config)) => tx_config,
1409 Some(ErrorBoundary::Err(e)) => {
1410 relay_log::debug!("Failed to parse legacy transaction metrics config: {e}");
1411 return Ok(event_metrics_extracted);
1412 }
1413 None => {
1414 relay_log::debug!("Legacy transaction metrics config is missing");
1415 return Ok(event_metrics_extracted);
1416 }
1417 };
1418
1419 if !tx_config.is_enabled() {
1420 static TX_CONFIG_ERROR: Once = Once::new();
1421 TX_CONFIG_ERROR.call_once(|| {
1422 if self.inner.config.processing_enabled() {
1423 relay_log::error!(
1424 "Processing Relay outdated, received tx config in version {}, which is not supported",
1425 tx_config.version
1426 );
1427 }
1428 });
1429
1430 return Ok(event_metrics_extracted);
1431 }
1432
1433 let extract_spans = !spans_extracted.0
1435 && project_info.config.features.produces_spans()
1436 && utils::sample(global.options.span_extraction_sample_rate.unwrap_or(1.0)).is_keep();
1437
1438 let metrics = crate::metrics_extraction::event::extract_metrics(
1439 event,
1440 combined_config,
1441 sampling_decision,
1442 project_id,
1443 self.inner
1444 .config
1445 .aggregator_config_for(MetricNamespace::Spans)
1446 .max_tag_value_length,
1447 extract_spans,
1448 );
1449
1450 extracted_metrics.extend(metrics, Some(sampling_decision));
1451
1452 if !project_info.has_feature(Feature::DiscardTransaction) {
1453 let transaction_from_dsc = managed_envelope
1454 .envelope()
1455 .dsc()
1456 .and_then(|dsc| dsc.transaction.as_deref());
1457
1458 let extractor = TransactionExtractor {
1459 config: tx_config,
1460 generic_config: Some(combined_config),
1461 transaction_from_dsc,
1462 sampling_decision,
1463 target_project_id: project_id,
1464 };
1465
1466 extracted_metrics.extend(extractor.extract(event)?, Some(sampling_decision));
1467 }
1468
1469 Ok(EventMetricsExtracted(true))
1470 }
1471
1472 fn normalize_event<Group: EventProcessing>(
1473 &self,
1474 managed_envelope: &mut TypedEnvelope<Group>,
1475 event: &mut Annotated<Event>,
1476 project_id: ProjectId,
1477 project_info: Arc<ProjectInfo>,
1478 mut event_fully_normalized: EventFullyNormalized,
1479 ) -> Result<EventFullyNormalized, ProcessingError> {
1480 if event.value().is_empty() {
1481 return Ok(event_fully_normalized);
1486 }
1487
1488 let full_normalization = match self.inner.config.normalization_level() {
1489 NormalizationLevel::Full => true,
1490 NormalizationLevel::Default => {
1491 if self.inner.config.processing_enabled() && event_fully_normalized.0 {
1492 return Ok(event_fully_normalized);
1493 }
1494
1495 self.inner.config.processing_enabled()
1496 }
1497 };
1498
1499 let request_meta = managed_envelope.envelope().meta();
1500 let client_ipaddr = request_meta.client_addr().map(IpAddr::from);
1501
1502 let transaction_aggregator_config = self
1503 .inner
1504 .config
1505 .aggregator_config_for(MetricNamespace::Transactions);
1506
1507 let global_config = self.inner.global_config.current();
1508 let ai_model_costs = global_config.ai_model_costs.clone().ok();
1509 let http_span_allowed_hosts = global_config.options.http_span_allowed_hosts.as_slice();
1510
1511 let retention_days: i64 = project_info
1512 .config
1513 .event_retention
1514 .unwrap_or(DEFAULT_EVENT_RETENTION)
1515 .into();
1516
1517 utils::log_transaction_name_metrics(event, |event| {
1518 let event_validation_config = EventValidationConfig {
1519 received_at: Some(managed_envelope.received_at()),
1520 max_secs_in_past: Some(retention_days * 24 * 3600),
1521 max_secs_in_future: Some(self.inner.config.max_secs_in_future()),
1522 transaction_timestamp_range: Some(transaction_aggregator_config.timestamp_range()),
1523 is_validated: false,
1524 };
1525
1526 let key_id = project_info
1527 .get_public_key_config()
1528 .and_then(|key| Some(key.numeric_id?.to_string()));
1529 if full_normalization && key_id.is_none() {
1530 relay_log::error!(
1531 "project state for key {} is missing key id",
1532 managed_envelope.envelope().meta().public_key()
1533 );
1534 }
1535
1536 let normalization_config = NormalizationConfig {
1537 project_id: Some(project_id.value()),
1538 client: request_meta.client().map(str::to_owned),
1539 key_id,
1540 protocol_version: Some(request_meta.version().to_string()),
1541 grouping_config: project_info.config.grouping_config.clone(),
1542 client_ip: client_ipaddr.as_ref(),
1543 infer_ip_address: !project_info
1545 .config
1546 .datascrubbing_settings
1547 .scrub_ip_addresses,
1548 client_sample_rate: managed_envelope
1549 .envelope()
1550 .dsc()
1551 .and_then(|ctx| ctx.sample_rate),
1552 user_agent: RawUserAgentInfo {
1553 user_agent: request_meta.user_agent(),
1554 client_hints: request_meta.client_hints().as_deref(),
1555 },
1556 max_name_and_unit_len: Some(
1557 transaction_aggregator_config
1558 .max_name_length
1559 .saturating_sub(MeasurementsConfig::MEASUREMENT_MRI_OVERHEAD),
1560 ),
1561 breakdowns_config: project_info.config.breakdowns_v2.as_ref(),
1562 performance_score: project_info.config.performance_score.as_ref(),
1563 normalize_user_agent: Some(true),
1564 transaction_name_config: TransactionNameConfig {
1565 rules: &project_info.config.tx_name_rules,
1566 },
1567 device_class_synthesis_config: project_info
1568 .has_feature(Feature::DeviceClassSynthesis),
1569 enrich_spans: project_info.has_feature(Feature::ExtractSpansFromEvent)
1570 || project_info.has_feature(Feature::ExtractCommonSpanMetricsFromEvent),
1571 max_tag_value_length: self
1572 .inner
1573 .config
1574 .aggregator_config_for(MetricNamespace::Spans)
1575 .max_tag_value_length,
1576 is_renormalize: false,
1577 remove_other: full_normalization,
1578 emit_event_errors: full_normalization,
1579 span_description_rules: project_info.config.span_description_rules.as_ref(),
1580 geoip_lookup: self.inner.geoip_lookup.as_ref(),
1581 ai_model_costs: ai_model_costs.as_ref(),
1582 enable_trimming: true,
1583 measurements: Some(CombinedMeasurementsConfig::new(
1584 project_info.config().measurements.as_ref(),
1585 global_config.measurements.as_ref(),
1586 )),
1587 normalize_spans: true,
1588 replay_id: managed_envelope
1589 .envelope()
1590 .dsc()
1591 .and_then(|ctx| ctx.replay_id),
1592 span_allowed_hosts: http_span_allowed_hosts,
1593 span_op_defaults: global_config.span_op_defaults.borrow(),
1594 performance_issues_spans: project_info.has_feature(Feature::PerformanceIssuesSpans),
1595 };
1596
1597 metric!(timer(RelayTimers::EventProcessingNormalization), {
1598 validate_event(event, &event_validation_config)
1599 .map_err(|_| ProcessingError::InvalidTransaction)?;
1600 normalize_event(event, &normalization_config);
1601 if full_normalization && event::has_unprintable_fields(event) {
1602 metric!(counter(RelayCounters::EventCorrupted) += 1);
1603 }
1604 Result::<(), ProcessingError>::Ok(())
1605 })
1606 })?;
1607
1608 event_fully_normalized.0 |= full_normalization;
1609
1610 Ok(event_fully_normalized)
1611 }
1612
1613 async fn process_errors(
1615 &self,
1616 managed_envelope: &mut TypedEnvelope<ErrorGroup>,
1617 project_id: ProjectId,
1618 project_info: Arc<ProjectInfo>,
1619 mut sampling_project_info: Option<Arc<ProjectInfo>>,
1620 rate_limits: Arc<RateLimits>,
1621 ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
1622 let mut event_fully_normalized = EventFullyNormalized::new(managed_envelope.envelope());
1623 let mut metrics = Metrics::default();
1624 let mut extracted_metrics = ProcessingExtractedMetrics::new();
1625
1626 report::process_user_reports(managed_envelope);
1628
1629 if_processing!(self.inner.config, {
1630 unreal::expand(managed_envelope, &self.inner.config)?;
1631 #[cfg(sentry)]
1632 playstation::expand(managed_envelope, &self.inner.config, &project_info)?;
1633 nnswitch::expand(managed_envelope)?;
1634 });
1635
1636 let extraction_result = event::extract(
1637 managed_envelope,
1638 &mut metrics,
1639 event_fully_normalized,
1640 &self.inner.config,
1641 )?;
1642 let mut event = extraction_result.event;
1643
1644 if_processing!(self.inner.config, {
1645 if let Some(inner_event_fully_normalized) =
1646 unreal::process(managed_envelope, &mut event)?
1647 {
1648 event_fully_normalized = inner_event_fully_normalized;
1649 }
1650 #[cfg(sentry)]
1651 if let Some(inner_event_fully_normalized) =
1652 playstation::process(managed_envelope, &mut event, &project_info)?
1653 {
1654 event_fully_normalized = inner_event_fully_normalized;
1655 }
1656 if let Some(inner_event_fully_normalized) =
1657 attachment::create_placeholders(managed_envelope, &mut event, &mut metrics)
1658 {
1659 event_fully_normalized = inner_event_fully_normalized;
1660 }
1661 });
1662
1663 sampling_project_info = dynamic_sampling::validate_and_set_dsc(
1664 managed_envelope,
1665 &mut event,
1666 project_info.clone(),
1667 sampling_project_info,
1668 );
1669 event::finalize(
1670 managed_envelope,
1671 &mut event,
1672 &mut metrics,
1673 &self.inner.config,
1674 )?;
1675 event_fully_normalized = self.normalize_event(
1676 managed_envelope,
1677 &mut event,
1678 project_id,
1679 project_info.clone(),
1680 event_fully_normalized,
1681 )?;
1682 let filter_run = event::filter(
1683 managed_envelope,
1684 &mut event,
1685 project_info.clone(),
1686 &self.inner.global_config.current(),
1687 )?;
1688
1689 if self.inner.config.processing_enabled() || matches!(filter_run, FiltersStatus::Ok) {
1690 dynamic_sampling::tag_error_with_sampling_decision(
1691 managed_envelope,
1692 &mut event,
1693 sampling_project_info,
1694 &self.inner.config,
1695 )
1696 .await;
1697 }
1698
1699 event = self
1700 .enforce_quotas(
1701 managed_envelope,
1702 event,
1703 &mut extracted_metrics,
1704 &project_info,
1705 &rate_limits,
1706 )
1707 .await?;
1708
1709 if event.value().is_some() {
1710 event::scrub(&mut event, project_info.clone())?;
1711 event::serialize(
1712 managed_envelope,
1713 &mut event,
1714 event_fully_normalized,
1715 EventMetricsExtracted(false),
1716 SpansExtracted(false),
1717 )?;
1718 event::emit_feedback_metrics(managed_envelope.envelope());
1719 }
1720
1721 attachment::scrub(managed_envelope, project_info);
1722
1723 if self.inner.config.processing_enabled() && !event_fully_normalized.0 {
1724 relay_log::error!(
1725 tags.project = %project_id,
1726 tags.ty = event_type(&event).map(|e| e.to_string()).unwrap_or("none".to_owned()),
1727 "ingested event without normalizing"
1728 );
1729 }
1730
1731 Ok(Some(extracted_metrics))
1732 }
1733
1734 #[allow(unused_assignments)]
1736 #[allow(clippy::too_many_arguments)]
1737 async fn process_transactions(
1738 &self,
1739 managed_envelope: &mut TypedEnvelope<TransactionGroup>,
1740 cogs: &mut Token,
1741 config: Arc<Config>,
1742 project_id: ProjectId,
1743 project_info: Arc<ProjectInfo>,
1744 mut sampling_project_info: Option<Arc<ProjectInfo>>,
1745 rate_limits: Arc<RateLimits>,
1746 reservoir_counters: ReservoirCounters,
1747 ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
1748 let mut event_fully_normalized = EventFullyNormalized::new(managed_envelope.envelope());
1749 let mut event_metrics_extracted = EventMetricsExtracted(false);
1750 let mut spans_extracted = SpansExtracted(false);
1751 let mut metrics = Metrics::default();
1752 let mut extracted_metrics = ProcessingExtractedMetrics::new();
1753
1754 let global_config = self.inner.global_config.current();
1755
1756 transaction::drop_invalid_items(managed_envelope, &global_config);
1757
1758 relay_cogs::with!(cogs, "event_extract", {
1759 let extraction_result = event::extract(
1761 managed_envelope,
1762 &mut metrics,
1763 event_fully_normalized,
1764 &self.inner.config,
1765 )?;
1766 });
1767
1768 if let Some(inner_event_metrics_extracted) = extraction_result.event_metrics_extracted {
1770 event_metrics_extracted = inner_event_metrics_extracted;
1771 }
1772 if let Some(inner_spans_extracted) = extraction_result.spans_extracted {
1773 spans_extracted = inner_spans_extracted;
1774 };
1775
1776 let mut event = extraction_result.event;
1778
1779 relay_cogs::with!(cogs, "profile_filter", {
1780 let profile_id = profile::filter(
1781 managed_envelope,
1782 &event,
1783 config.clone(),
1784 project_id,
1785 &project_info,
1786 );
1787 profile::transfer_id(&mut event, profile_id);
1788 });
1789
1790 relay_cogs::with!(cogs, "dynamic_sampling_dsc", {
1791 sampling_project_info = dynamic_sampling::validate_and_set_dsc(
1792 managed_envelope,
1793 &mut event,
1794 project_info.clone(),
1795 sampling_project_info,
1796 );
1797 });
1798
1799 relay_cogs::with!(cogs, "event_finalize", {
1800 event::finalize(
1801 managed_envelope,
1802 &mut event,
1803 &mut metrics,
1804 &self.inner.config,
1805 )?;
1806 });
1807
1808 relay_cogs::with!(cogs, "event_normalize", {
1809 event_fully_normalized = self.normalize_event(
1810 managed_envelope,
1811 &mut event,
1812 project_id,
1813 project_info.clone(),
1814 event_fully_normalized,
1815 )?;
1816 });
1817
1818 relay_cogs::with!(cogs, "filter", {
1819 let filter_run = event::filter(
1820 managed_envelope,
1821 &mut event,
1822 project_info.clone(),
1823 &self.inner.global_config.current(),
1824 )?;
1825 });
1826
1827 let run_dynamic_sampling =
1830 matches!(filter_run, FiltersStatus::Ok) || self.inner.config.processing_enabled();
1831
1832 let reservoir = self.new_reservoir_evaluator(
1833 managed_envelope.scoping().organization_id,
1834 reservoir_counters,
1835 );
1836
1837 relay_cogs::with!(cogs, "dynamic_sampling_run", {
1838 let sampling_result = match run_dynamic_sampling {
1839 true => {
1840 dynamic_sampling::run(
1841 managed_envelope,
1842 &mut event,
1843 config.clone(),
1844 project_info.clone(),
1845 sampling_project_info,
1846 &reservoir,
1847 )
1848 .await
1849 }
1850 false => SamplingResult::Pending,
1851 };
1852 });
1853
1854 #[cfg(feature = "processing")]
1855 let server_sample_rate = match sampling_result {
1856 SamplingResult::Match(ref sampling_match) => Some(sampling_match.sample_rate()),
1857 SamplingResult::NoMatch | SamplingResult::Pending => None,
1858 };
1859
1860 if let Some(outcome) = sampling_result.into_dropped_outcome() {
1861 profile::process(
1864 managed_envelope,
1865 &mut event,
1866 &global_config,
1867 config.clone(),
1868 project_info.clone(),
1869 );
1870 event_metrics_extracted = self.extract_transaction_metrics(
1872 managed_envelope,
1873 &mut event,
1874 &mut extracted_metrics,
1875 project_id,
1876 project_info.clone(),
1877 SamplingDecision::Drop,
1878 event_metrics_extracted,
1879 spans_extracted,
1880 )?;
1881
1882 dynamic_sampling::drop_unsampled_items(
1883 managed_envelope,
1884 event,
1885 outcome,
1886 spans_extracted,
1887 );
1888
1889 event = self
1894 .enforce_quotas(
1895 managed_envelope,
1896 Annotated::empty(),
1897 &mut extracted_metrics,
1898 &project_info,
1899 &rate_limits,
1900 )
1901 .await?;
1902
1903 return Ok(Some(extracted_metrics));
1904 }
1905
1906 let _post_ds = cogs.start_category("post_ds");
1907
1908 event::scrub(&mut event, project_info.clone())?;
1912
1913 attachment::scrub(managed_envelope, project_info.clone());
1915
1916 if_processing!(self.inner.config, {
1917 let profile_id = profile::process(
1919 managed_envelope,
1920 &mut event,
1921 &global_config,
1922 config.clone(),
1923 project_info.clone(),
1924 );
1925 profile::transfer_id(&mut event, profile_id);
1926 profile::scrub_profiler_id(&mut event);
1927
1928 event_metrics_extracted = self.extract_transaction_metrics(
1930 managed_envelope,
1931 &mut event,
1932 &mut extracted_metrics,
1933 project_id,
1934 project_info.clone(),
1935 SamplingDecision::Keep,
1936 event_metrics_extracted,
1937 spans_extracted,
1938 )?;
1939
1940 if project_info.has_feature(Feature::ExtractSpansFromEvent) {
1941 spans_extracted = span::extract_from_event(
1942 managed_envelope,
1943 &event,
1944 &global_config,
1945 config,
1946 server_sample_rate,
1947 event_metrics_extracted,
1948 spans_extracted,
1949 );
1950 }
1951 });
1952
1953 event = self
1954 .enforce_quotas(
1955 managed_envelope,
1956 event,
1957 &mut extracted_metrics,
1958 &project_info,
1959 &rate_limits,
1960 )
1961 .await?;
1962
1963 if_processing!(self.inner.config, {
1964 event = span::maybe_discard_transaction(managed_envelope, event, project_info);
1965 });
1966
1967 if event.value().is_some() {
1969 event::serialize(
1970 managed_envelope,
1971 &mut event,
1972 event_fully_normalized,
1973 event_metrics_extracted,
1974 spans_extracted,
1975 )?;
1976 }
1977
1978 if self.inner.config.processing_enabled() && !event_fully_normalized.0 {
1979 relay_log::error!(
1980 tags.project = %project_id,
1981 tags.ty = event_type(&event).map(|e| e.to_string()).unwrap_or("none".to_owned()),
1982 "ingested event without normalizing"
1983 );
1984 };
1985
1986 Ok(Some(extracted_metrics))
1987 }
1988
1989 async fn process_profile_chunks(
1990 &self,
1991 managed_envelope: &mut TypedEnvelope<ProfileChunkGroup>,
1992 project_info: Arc<ProjectInfo>,
1993 rate_limits: Arc<RateLimits>,
1994 ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
1995 profile_chunk::filter(managed_envelope, project_info.clone());
1996
1997 if_processing!(self.inner.config, {
1998 profile_chunk::process(
1999 managed_envelope,
2000 &project_info,
2001 &self.inner.global_config.current(),
2002 &self.inner.config,
2003 );
2004 });
2005
2006 self.enforce_quotas(
2007 managed_envelope,
2008 Annotated::empty(),
2009 &mut ProcessingExtractedMetrics::new(),
2010 &project_info,
2011 &rate_limits,
2012 )
2013 .await?;
2014
2015 Ok(None)
2016 }
2017
2018 async fn process_standalone(
2020 &self,
2021 managed_envelope: &mut TypedEnvelope<StandaloneGroup>,
2022 config: Arc<Config>,
2023 project_id: ProjectId,
2024 project_info: Arc<ProjectInfo>,
2025 rate_limits: Arc<RateLimits>,
2026 ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
2027 let mut extracted_metrics = ProcessingExtractedMetrics::new();
2028
2029 standalone::process(managed_envelope);
2030
2031 profile::filter(
2032 managed_envelope,
2033 &Annotated::empty(),
2034 config,
2035 project_id,
2036 &project_info,
2037 );
2038
2039 self.enforce_quotas(
2040 managed_envelope,
2041 Annotated::empty(),
2042 &mut extracted_metrics,
2043 &project_info,
2044 &rate_limits,
2045 )
2046 .await?;
2047
2048 report::process_user_reports(managed_envelope);
2049 attachment::scrub(managed_envelope, project_info);
2050
2051 Ok(Some(extracted_metrics))
2052 }
2053
2054 async fn process_sessions(
2056 &self,
2057 managed_envelope: &mut TypedEnvelope<SessionGroup>,
2058 config: &Config,
2059 project_info: &ProjectInfo,
2060 rate_limits: &RateLimits,
2061 ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
2062 let mut extracted_metrics = ProcessingExtractedMetrics::new();
2063
2064 session::process(
2065 managed_envelope,
2066 &self.inner.global_config.current(),
2067 config,
2068 &mut extracted_metrics,
2069 project_info,
2070 );
2071
2072 self.enforce_quotas(
2073 managed_envelope,
2074 Annotated::empty(),
2075 &mut extracted_metrics,
2076 project_info,
2077 rate_limits,
2078 )
2079 .await?;
2080
2081 Ok(Some(extracted_metrics))
2082 }
2083
2084 async fn process_client_reports(
2086 &self,
2087 managed_envelope: &mut TypedEnvelope<ClientReportGroup>,
2088 config: Arc<Config>,
2089 project_info: Arc<ProjectInfo>,
2090 rate_limits: Arc<RateLimits>,
2091 ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
2092 let mut extracted_metrics = ProcessingExtractedMetrics::new();
2093
2094 self.enforce_quotas(
2095 managed_envelope,
2096 Annotated::empty(),
2097 &mut extracted_metrics,
2098 &project_info,
2099 &rate_limits,
2100 )
2101 .await?;
2102
2103 report::process_client_reports(
2104 managed_envelope,
2105 &config,
2106 &project_info,
2107 self.inner.addrs.outcome_aggregator.clone(),
2108 );
2109
2110 Ok(Some(extracted_metrics))
2111 }
2112
2113 async fn process_replays(
2115 &self,
2116 managed_envelope: &mut TypedEnvelope<ReplayGroup>,
2117 config: Arc<Config>,
2118 project_info: Arc<ProjectInfo>,
2119 rate_limits: Arc<RateLimits>,
2120 ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
2121 let mut extracted_metrics = ProcessingExtractedMetrics::new();
2122
2123 replay::process(
2124 managed_envelope,
2125 &self.inner.global_config.current(),
2126 &config,
2127 &project_info,
2128 self.inner.geoip_lookup.as_ref(),
2129 )?;
2130
2131 self.enforce_quotas(
2132 managed_envelope,
2133 Annotated::empty(),
2134 &mut extracted_metrics,
2135 &project_info,
2136 &rate_limits,
2137 )
2138 .await?;
2139
2140 Ok(Some(extracted_metrics))
2141 }
2142
2143 async fn process_checkins(
2145 &self,
2146 managed_envelope: &mut TypedEnvelope<CheckInGroup>,
2147 _project_id: ProjectId,
2148 project_info: Arc<ProjectInfo>,
2149 rate_limits: Arc<RateLimits>,
2150 ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
2151 self.enforce_quotas(
2152 managed_envelope,
2153 Annotated::empty(),
2154 &mut ProcessingExtractedMetrics::new(),
2155 &project_info,
2156 &rate_limits,
2157 )
2158 .await?;
2159
2160 if_processing!(self.inner.config, {
2161 self.normalize_checkins(managed_envelope, _project_id);
2162 });
2163
2164 Ok(None)
2165 }
2166
2167 async fn process_nel(
2168 &self,
2169 mut managed_envelope: ManagedEnvelope,
2170 ctx: processing::Context<'_>,
2171 ) -> Result<ProcessingResult, ProcessingError> {
2172 nel::convert_to_logs(&mut managed_envelope);
2173 self.process_logs(managed_envelope, ctx).await
2174 }
2175
2176 async fn process_logs(
2179 &self,
2180 mut managed_envelope: ManagedEnvelope,
2181 ctx: processing::Context<'_>,
2182 ) -> Result<ProcessingResult, ProcessingError> {
2183 let processor = &self.inner.processing.logs;
2184 let Some(logs) = processor.prepare_envelope(&mut managed_envelope) else {
2185 debug_assert!(
2186 false,
2187 "there must be work for the logs processor in the logs processing group"
2188 );
2189 return Err(ProcessingError::ProcessingGroupMismatch);
2190 };
2191
2192 managed_envelope.update();
2193 match managed_envelope.envelope().is_empty() {
2194 true => managed_envelope.accept(),
2195 false => managed_envelope.reject(Outcome::Invalid(DiscardReason::Internal)),
2196 }
2197
2198 processor
2199 .process(logs, ctx)
2200 .await
2201 .map_err(|_| ProcessingError::ProcessingFailure)
2202 .map(ProcessingResult::Logs)
2203 }
2204
2205 #[allow(clippy::too_many_arguments)]
2209 async fn process_standalone_spans(
2210 &self,
2211 managed_envelope: &mut TypedEnvelope<SpanGroup>,
2212 config: Arc<Config>,
2213 _project_id: ProjectId,
2214 project_info: Arc<ProjectInfo>,
2215 _sampling_project_info: Option<Arc<ProjectInfo>>,
2216 rate_limits: Arc<RateLimits>,
2217 _reservoir_counters: ReservoirCounters,
2218 ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
2219 let mut extracted_metrics = ProcessingExtractedMetrics::new();
2220
2221 span::expand_v2_spans(managed_envelope)?;
2222 span::filter(managed_envelope, config.clone(), project_info.clone());
2223 span::convert_otel_traces_data(managed_envelope);
2224
2225 if_processing!(self.inner.config, {
2226 let global_config = self.inner.global_config.current();
2227 let reservoir = self.new_reservoir_evaluator(
2228 managed_envelope.scoping().organization_id,
2229 _reservoir_counters,
2230 );
2231
2232 span::process(
2233 managed_envelope,
2234 &mut Annotated::empty(),
2235 &mut extracted_metrics,
2236 &global_config,
2237 config,
2238 _project_id,
2239 project_info.clone(),
2240 _sampling_project_info,
2241 self.inner.geoip_lookup.as_ref(),
2242 &reservoir,
2243 )
2244 .await;
2245 });
2246
2247 self.enforce_quotas(
2248 managed_envelope,
2249 Annotated::empty(),
2250 &mut extracted_metrics,
2251 &project_info,
2252 &rate_limits,
2253 )
2254 .await?;
2255
2256 Ok(Some(extracted_metrics))
2257 }
2258
2259 async fn process_envelope(
2260 &self,
2261 cogs: &mut Token,
2262 project_id: ProjectId,
2263 message: ProcessEnvelopeGrouped,
2264 ) -> Result<ProcessingResult, ProcessingError> {
2265 let ProcessEnvelopeGrouped {
2266 group,
2267 envelope: mut managed_envelope,
2268 project_info,
2269 rate_limits,
2270 sampling_project_info,
2271 reservoir_counters,
2272 } = message;
2273
2274 if let Some(sampling_state) = sampling_project_info.as_ref() {
2276 managed_envelope
2279 .envelope_mut()
2280 .parametrize_dsc_transaction(&sampling_state.config.tx_name_rules);
2281 }
2282
2283 if let Some(retention) = project_info.config.event_retention {
2286 managed_envelope.envelope_mut().set_retention(retention);
2287 }
2288
2289 managed_envelope
2294 .envelope_mut()
2295 .meta_mut()
2296 .set_project_id(project_id);
2297
2298 macro_rules! run {
2299 ($fn_name:ident $(, $args:expr)*) => {
2300 async {
2301 let mut managed_envelope = (managed_envelope, group).try_into()?;
2302 match self.$fn_name(&mut managed_envelope, $($args),*).await {
2303 Ok(extracted_metrics) => Ok(ProcessingResult::Envelope {
2304 managed_envelope: managed_envelope.into_processed(),
2305 extracted_metrics: extracted_metrics.map_or(ProcessingExtractedMetrics::new(), |e| e)
2306 }),
2307 Err(error) => {
2308 relay_log::trace!("Executing {fn} failed: {error}", fn = stringify!($fn_name), error = error);
2309 if let Some(outcome) = error.to_outcome() {
2310 managed_envelope.reject(outcome);
2311 }
2312
2313 return Err(error);
2314 }
2315 }
2316 }.await
2317 };
2318 }
2319
2320 let global_config = self.inner.global_config.current();
2321 let ctx = processing::Context {
2322 config: &self.inner.config,
2323 global_config: &global_config,
2324 project_info: &project_info,
2325 rate_limits: &rate_limits,
2326 };
2327
2328 relay_log::trace!("Processing {group} group", group = group.variant());
2329
2330 match group {
2331 ProcessingGroup::Error => run!(
2332 process_errors,
2333 project_id,
2334 project_info,
2335 sampling_project_info,
2336 rate_limits
2337 ),
2338 ProcessingGroup::Transaction => {
2339 run!(
2340 process_transactions,
2341 cogs,
2342 self.inner.config.clone(),
2343 project_id,
2344 project_info,
2345 sampling_project_info,
2346 rate_limits,
2347 reservoir_counters
2348 )
2349 }
2350 ProcessingGroup::Session => run!(
2351 process_sessions,
2352 &self.inner.config.clone(),
2353 &project_info,
2354 &rate_limits
2355 ),
2356 ProcessingGroup::Standalone => run!(
2357 process_standalone,
2358 self.inner.config.clone(),
2359 project_id,
2360 project_info,
2361 rate_limits
2362 ),
2363 ProcessingGroup::ClientReport => run!(
2364 process_client_reports,
2365 self.inner.config.clone(),
2366 project_info,
2367 rate_limits
2368 ),
2369 ProcessingGroup::Replay => {
2370 run!(
2371 process_replays,
2372 self.inner.config.clone(),
2373 project_info,
2374 rate_limits
2375 )
2376 }
2377 ProcessingGroup::CheckIn => {
2378 run!(process_checkins, project_id, project_info, rate_limits)
2379 }
2380 ProcessingGroup::Log => self.process_logs(managed_envelope, ctx).await,
2381 ProcessingGroup::Nel => self.process_nel(managed_envelope, ctx).await,
2382 ProcessingGroup::Span => run!(
2383 process_standalone_spans,
2384 self.inner.config.clone(),
2385 project_id,
2386 project_info,
2387 sampling_project_info,
2388 rate_limits,
2389 reservoir_counters
2390 ),
2391 ProcessingGroup::ProfileChunk => {
2392 run!(process_profile_chunks, project_info, rate_limits)
2393 }
2394 ProcessingGroup::Metrics => {
2396 if self.inner.config.relay_mode() != RelayMode::Proxy {
2399 relay_log::error!(
2400 tags.project = %project_id,
2401 items = ?managed_envelope.envelope().items().next().map(Item::ty),
2402 "received metrics in the process_state"
2403 );
2404 }
2405
2406 Ok(ProcessingResult::no_metrics(
2407 managed_envelope.into_processed(),
2408 ))
2409 }
2410 ProcessingGroup::Ungrouped => {
2412 relay_log::error!(
2413 tags.project = %project_id,
2414 items = ?managed_envelope.envelope().items().next().map(Item::ty),
2415 "could not identify the processing group based on the envelope's items"
2416 );
2417
2418 Ok(ProcessingResult::no_metrics(
2419 managed_envelope.into_processed(),
2420 ))
2421 }
2422 ProcessingGroup::ForwardUnknown => Ok(ProcessingResult::no_metrics(
2426 managed_envelope.into_processed(),
2427 )),
2428 }
2429 }
2430
2431 async fn process(
2437 &self,
2438 cogs: &mut Token,
2439 mut message: ProcessEnvelopeGrouped,
2440 ) -> Result<Option<Submit>, ProcessingError> {
2441 let ProcessEnvelopeGrouped {
2442 ref mut envelope,
2443 ref project_info,
2444 ..
2445 } = message;
2446
2447 let Some(project_id) = project_info
2454 .project_id
2455 .or_else(|| envelope.envelope().meta().project_id())
2456 else {
2457 envelope.reject(Outcome::Invalid(DiscardReason::Internal));
2458 return Err(ProcessingError::MissingProjectId);
2459 };
2460
2461 let client = envelope.envelope().meta().client().map(str::to_owned);
2462 let user_agent = envelope.envelope().meta().user_agent().map(str::to_owned);
2463 let project_key = envelope.envelope().meta().public_key();
2464 let sampling_key = envelope.envelope().sampling_key();
2465
2466 let has_ourlogs_new_byte_count =
2467 project_info.has_feature(Feature::OurLogsCalculatedByteCount);
2468
2469 relay_log::configure_scope(|scope| {
2472 scope.set_tag("project", project_id);
2473 if let Some(client) = client {
2474 scope.set_tag("sdk", client);
2475 }
2476 if let Some(user_agent) = user_agent {
2477 scope.set_extra("user_agent", user_agent.into());
2478 }
2479 });
2480
2481 let result = match self.process_envelope(cogs, project_id, message).await {
2482 Ok(ProcessingResult::Envelope {
2483 mut managed_envelope,
2484 extracted_metrics,
2485 }) => {
2486 managed_envelope.update();
2489
2490 let has_metrics = !extracted_metrics.metrics.project_metrics.is_empty();
2491 send_metrics(
2492 extracted_metrics.metrics,
2493 project_key,
2494 sampling_key,
2495 &self.inner.addrs.aggregator,
2496 );
2497
2498 let envelope_response = if managed_envelope.envelope().is_empty() {
2499 if !has_metrics {
2500 managed_envelope.reject(Outcome::RateLimited(None));
2502 } else {
2503 managed_envelope.accept();
2504 }
2505
2506 None
2507 } else {
2508 Some(managed_envelope)
2509 };
2510
2511 Ok(envelope_response.map(Submit::Envelope))
2512 }
2513 Ok(ProcessingResult::Logs(Output { main, metrics })) => {
2514 send_metrics(
2515 metrics.metrics,
2516 project_key,
2517 sampling_key,
2518 &self.inner.addrs.aggregator,
2519 );
2520
2521 if has_ourlogs_new_byte_count {
2522 Ok(Some(Submit::Logs(main)))
2523 } else {
2524 let envelope = main
2525 .serialize_envelope()
2526 .map_err(|_| ProcessingError::ProcessingEnvelopeSerialization)?;
2527 let managed_envelope = ManagedEnvelope::from(envelope);
2528
2529 Ok(Some(Submit::Envelope(managed_envelope.into_processed())))
2530 }
2531 }
2532 Err(err) => Err(err),
2533 };
2534
2535 relay_log::configure_scope(|scope| {
2536 scope.remove_tag("project");
2537 scope.remove_tag("sdk");
2538 scope.remove_tag("user_agent");
2539 });
2540
2541 result
2542 }
2543
2544 async fn handle_process_envelope(&self, cogs: &mut Token, message: ProcessEnvelope) {
2545 let project_key = message.envelope.envelope().meta().public_key();
2546 let wait_time = message.envelope.age();
2547 metric!(timer(RelayTimers::EnvelopeWaitTime) = wait_time);
2548
2549 cogs.cancel();
2552
2553 let scoping = message.envelope.scoping();
2554 for (group, envelope) in ProcessingGroup::split_envelope(*message.envelope.into_envelope())
2555 {
2556 let mut cogs = self
2557 .inner
2558 .cogs
2559 .timed(ResourceId::Relay, AppFeature::from(group));
2560
2561 let mut envelope = ManagedEnvelope::new(
2562 envelope,
2563 self.inner.addrs.outcome_aggregator.clone(),
2564 self.inner.addrs.test_store.clone(),
2565 );
2566 envelope.scope(scoping);
2567
2568 let message = ProcessEnvelopeGrouped {
2569 group,
2570 envelope,
2571 project_info: Arc::clone(&message.project_info),
2572 rate_limits: Arc::clone(&message.rate_limits),
2573 sampling_project_info: message.sampling_project_info.as_ref().map(Arc::clone),
2574 reservoir_counters: Arc::clone(&message.reservoir_counters),
2575 };
2576
2577 let result = metric!(
2578 timer(RelayTimers::EnvelopeProcessingTime),
2579 group = group.variant(),
2580 { self.process(&mut cogs, message).await }
2581 );
2582
2583 match result {
2584 Ok(Some(envelope)) => self.submit_upstream(&mut cogs, envelope),
2585 Ok(None) => {}
2586 Err(error) if error.is_unexpected() => {
2587 relay_log::error!(
2588 tags.project_key = %project_key,
2589 error = &error as &dyn Error,
2590 "error processing envelope"
2591 )
2592 }
2593 Err(error) => {
2594 relay_log::debug!(
2595 tags.project_key = %project_key,
2596 error = &error as &dyn Error,
2597 "error processing envelope"
2598 )
2599 }
2600 }
2601 }
2602 }
2603
2604 fn handle_process_metrics(&self, cogs: &mut Token, message: ProcessMetrics) {
2605 let ProcessMetrics {
2606 data,
2607 project_key,
2608 received_at,
2609 sent_at,
2610 source,
2611 } = message;
2612
2613 let received_timestamp =
2614 UnixTimestamp::from_datetime(received_at).unwrap_or(UnixTimestamp::now());
2615
2616 let mut buckets = data.into_buckets(received_timestamp);
2617 if buckets.is_empty() {
2618 return;
2619 };
2620 cogs.update(relay_metrics::cogs::BySize(&buckets));
2621
2622 let clock_drift_processor =
2623 ClockDriftProcessor::new(sent_at, received_at).at_least(MINIMUM_CLOCK_DRIFT);
2624
2625 buckets.retain_mut(|bucket| {
2626 if let Err(error) = relay_metrics::normalize_bucket(bucket) {
2627 relay_log::debug!(error = &error as &dyn Error, "dropping bucket {bucket:?}");
2628 return false;
2629 }
2630
2631 if !self::metrics::is_valid_namespace(bucket, source) {
2632 return false;
2633 }
2634
2635 clock_drift_processor.process_timestamp(&mut bucket.timestamp);
2636
2637 if !matches!(source, BucketSource::Internal) {
2638 bucket.metadata = BucketMetadata::new(received_timestamp);
2639 }
2640
2641 true
2642 });
2643
2644 let project = self.inner.project_cache.get(project_key);
2645
2646 let buckets = match project.state() {
2649 ProjectState::Enabled(project_info) => {
2650 let rate_limits = project.rate_limits().current_limits();
2651 self.check_buckets(project_key, project_info, &rate_limits, buckets)
2652 }
2653 _ => buckets,
2654 };
2655
2656 relay_log::trace!("merging metric buckets into the aggregator");
2657 self.inner
2658 .addrs
2659 .aggregator
2660 .send(MergeBuckets::new(project_key, buckets));
2661 }
2662
2663 fn handle_process_batched_metrics(&self, cogs: &mut Token, message: ProcessBatchedMetrics) {
2664 let ProcessBatchedMetrics {
2665 payload,
2666 source,
2667 received_at,
2668 sent_at,
2669 } = message;
2670
2671 #[derive(serde::Deserialize)]
2672 struct Wrapper {
2673 buckets: HashMap<ProjectKey, Vec<Bucket>>,
2674 }
2675
2676 let buckets = match serde_json::from_slice(&payload) {
2677 Ok(Wrapper { buckets }) => buckets,
2678 Err(error) => {
2679 relay_log::debug!(
2680 error = &error as &dyn Error,
2681 "failed to parse batched metrics",
2682 );
2683 metric!(counter(RelayCounters::MetricBucketsParsingFailed) += 1);
2684 return;
2685 }
2686 };
2687
2688 for (project_key, buckets) in buckets {
2689 self.handle_process_metrics(
2690 cogs,
2691 ProcessMetrics {
2692 data: MetricData::Parsed(buckets),
2693 project_key,
2694 source,
2695 received_at,
2696 sent_at,
2697 },
2698 )
2699 }
2700 }
2701
2702 fn submit_upstream(&self, cogs: &mut Token, submit: Submit) {
2703 let _submit = cogs.start_category("submit");
2704
2705 #[cfg(feature = "processing")]
2706 if self.inner.config.processing_enabled() {
2707 if let Some(store_forwarder) = &self.inner.addrs.store_forwarder {
2708 match submit {
2709 Submit::Envelope(envelope) => store_forwarder.send(StoreEnvelope { envelope }),
2710 Submit::Logs(output) => output
2711 .forward_store(store_forwarder)
2712 .unwrap_or_else(|err| err.into_inner()),
2713 }
2714 return;
2715 }
2716 }
2717
2718 let mut envelope = match submit {
2719 Submit::Envelope(envelope) => envelope,
2720 Submit::Logs(output) => match output.serialize_envelope() {
2721 Ok(envelope) => ManagedEnvelope::from(envelope).into_processed(),
2722 Err(_) => {
2723 relay_log::error!("failed to serialize output to an envelope");
2724 return;
2725 }
2726 },
2727 };
2728
2729 if Capture::should_capture(&self.inner.config) {
2731 relay_log::trace!("capturing envelope in memory");
2732 self.inner
2733 .addrs
2734 .test_store
2735 .send(Capture::accepted(envelope));
2736 return;
2737 }
2738
2739 envelope.envelope_mut().set_sent_at(Utc::now());
2745
2746 relay_log::trace!("sending envelope to sentry endpoint");
2747 let http_encoding = self.inner.config.http_encoding();
2748 let result = envelope.envelope().to_vec().and_then(|v| {
2749 encode_payload(&v.into(), http_encoding).map_err(EnvelopeError::PayloadIoFailed)
2750 });
2751
2752 match result {
2753 Ok(body) => {
2754 self.inner
2755 .addrs
2756 .upstream_relay
2757 .send(SendRequest(SendEnvelope {
2758 envelope,
2759 body,
2760 http_encoding,
2761 project_cache: self.inner.project_cache.clone(),
2762 }));
2763 }
2764 Err(error) => {
2765 relay_log::error!(
2768 error = &error as &dyn Error,
2769 tags.project_key = %envelope.scoping().project_key,
2770 "failed to serialize envelope payload"
2771 );
2772
2773 envelope.reject(Outcome::Invalid(DiscardReason::Internal));
2774 }
2775 }
2776 }
2777
2778 fn handle_submit_client_reports(&self, cogs: &mut Token, message: SubmitClientReports) {
2779 let SubmitClientReports {
2780 client_reports,
2781 scoping,
2782 } = message;
2783
2784 let upstream = self.inner.config.upstream_descriptor();
2785 let dsn = PartialDsn::outbound(&scoping, upstream);
2786
2787 let mut envelope = Envelope::from_request(None, RequestMeta::outbound(dsn));
2788 for client_report in client_reports {
2789 let mut item = Item::new(ItemType::ClientReport);
2790 item.set_payload(ContentType::Json, client_report.serialize().unwrap()); envelope.add_item(item);
2792 }
2793
2794 let envelope = ManagedEnvelope::new(
2795 envelope,
2796 self.inner.addrs.outcome_aggregator.clone(),
2797 self.inner.addrs.test_store.clone(),
2798 );
2799 self.submit_upstream(cogs, Submit::Envelope(envelope.into_processed()));
2800 }
2801
2802 fn check_buckets(
2803 &self,
2804 project_key: ProjectKey,
2805 project_info: &ProjectInfo,
2806 rate_limits: &RateLimits,
2807 buckets: Vec<Bucket>,
2808 ) -> Vec<Bucket> {
2809 let Some(scoping) = project_info.scoping(project_key) else {
2810 relay_log::error!(
2811 tags.project_key = project_key.as_str(),
2812 "there is no scoping: dropping {} buckets",
2813 buckets.len(),
2814 );
2815 return Vec::new();
2816 };
2817
2818 let mut buckets = self::metrics::apply_project_info(
2819 buckets,
2820 &self.inner.metric_outcomes,
2821 project_info,
2822 scoping,
2823 );
2824
2825 let namespaces: BTreeSet<MetricNamespace> = buckets
2826 .iter()
2827 .filter_map(|bucket| bucket.name.try_namespace())
2828 .collect();
2829
2830 for namespace in namespaces {
2831 let limits = rate_limits.check_with_quotas(
2832 project_info.get_quotas(),
2833 scoping.item(DataCategory::MetricBucket),
2834 );
2835
2836 if limits.is_limited() {
2837 let rejected;
2838 (buckets, rejected) = utils::split_off(buckets, |bucket| {
2839 bucket.name.try_namespace() == Some(namespace)
2840 });
2841
2842 let reason_code = limits.longest().and_then(|limit| limit.reason_code.clone());
2843 self.inner.metric_outcomes.track(
2844 scoping,
2845 &rejected,
2846 Outcome::RateLimited(reason_code),
2847 );
2848 }
2849 }
2850
2851 let quotas = project_info.config.quotas.clone();
2852 match MetricsLimiter::create(buckets, quotas, scoping) {
2853 Ok(mut bucket_limiter) => {
2854 bucket_limiter.enforce_limits(rate_limits, &self.inner.metric_outcomes);
2855 bucket_limiter.into_buckets()
2856 }
2857 Err(buckets) => buckets,
2858 }
2859 }
2860
2861 #[cfg(feature = "processing")]
2862 async fn rate_limit_buckets(
2863 &self,
2864 scoping: Scoping,
2865 project_info: &ProjectInfo,
2866 mut buckets: Vec<Bucket>,
2867 ) -> Vec<Bucket> {
2868 let Some(rate_limiter) = &self.inner.rate_limiter else {
2869 return buckets;
2870 };
2871
2872 let global_config = self.inner.global_config.current();
2873 let namespaces = buckets
2874 .iter()
2875 .filter_map(|bucket| bucket.name.try_namespace())
2876 .counts();
2877
2878 let quotas = CombinedQuotas::new(&global_config, project_info.get_quotas());
2879
2880 for (namespace, quantity) in namespaces {
2881 let item_scoping = scoping.metric_bucket(namespace);
2882
2883 let limits = match rate_limiter
2884 .is_rate_limited(quotas, item_scoping, quantity, false)
2885 .await
2886 {
2887 Ok(limits) => limits,
2888 Err(err) => {
2889 relay_log::error!(
2890 error = &err as &dyn std::error::Error,
2891 "failed to check redis rate limits"
2892 );
2893 break;
2894 }
2895 };
2896
2897 if limits.is_limited() {
2898 let rejected;
2899 (buckets, rejected) = utils::split_off(buckets, |bucket| {
2900 bucket.name.try_namespace() == Some(namespace)
2901 });
2902
2903 let reason_code = limits.longest().and_then(|limit| limit.reason_code.clone());
2904 self.inner.metric_outcomes.track(
2905 scoping,
2906 &rejected,
2907 Outcome::RateLimited(reason_code),
2908 );
2909
2910 self.inner
2911 .project_cache
2912 .get(item_scoping.scoping.project_key)
2913 .rate_limits()
2914 .merge(limits);
2915 }
2916 }
2917
2918 match MetricsLimiter::create(buckets, project_info.config.quotas.clone(), scoping) {
2919 Err(buckets) => buckets,
2920 Ok(bucket_limiter) => self.apply_other_rate_limits(bucket_limiter).await,
2921 }
2922 }
2923
2924 #[cfg(feature = "processing")]
2926 async fn apply_other_rate_limits(&self, mut bucket_limiter: MetricsLimiter) -> Vec<Bucket> {
2927 relay_log::trace!("handle_rate_limit_buckets");
2928
2929 let scoping = *bucket_limiter.scoping();
2930
2931 if let Some(rate_limiter) = self.inner.rate_limiter.as_ref() {
2932 let global_config = self.inner.global_config.current();
2933 let quotas = CombinedQuotas::new(&global_config, bucket_limiter.quotas());
2934
2935 let over_accept_once = true;
2938 let mut rate_limits = RateLimits::new();
2939
2940 for category in [DataCategory::Transaction, DataCategory::Span] {
2941 let count = bucket_limiter.count(category);
2942
2943 let timer = Instant::now();
2944 let mut is_limited = false;
2945
2946 if let Some(count) = count {
2947 match rate_limiter
2948 .is_rate_limited(quotas, scoping.item(category), count, over_accept_once)
2949 .await
2950 {
2951 Ok(limits) => {
2952 is_limited = limits.is_limited();
2953 rate_limits.merge(limits)
2954 }
2955 Err(e) => relay_log::error!(error = &e as &dyn Error),
2956 }
2957 }
2958
2959 relay_statsd::metric!(
2960 timer(RelayTimers::RateLimitBucketsDuration) = timer.elapsed(),
2961 category = category.name(),
2962 limited = if is_limited { "true" } else { "false" },
2963 count = match count {
2964 None => "none",
2965 Some(0) => "0",
2966 Some(1) => "1",
2967 Some(1..=10) => "10",
2968 Some(1..=25) => "25",
2969 Some(1..=50) => "50",
2970 Some(51..=100) => "100",
2971 Some(101..=500) => "500",
2972 _ => "> 500",
2973 },
2974 );
2975 }
2976
2977 if rate_limits.is_limited() {
2978 let was_enforced =
2979 bucket_limiter.enforce_limits(&rate_limits, &self.inner.metric_outcomes);
2980
2981 if was_enforced {
2982 self.inner
2984 .project_cache
2985 .get(scoping.project_key)
2986 .rate_limits()
2987 .merge(rate_limits);
2988 }
2989 }
2990 }
2991
2992 bucket_limiter.into_buckets()
2993 }
2994
2995 #[cfg(feature = "processing")]
2997 async fn cardinality_limit_buckets(
2998 &self,
2999 scoping: Scoping,
3000 limits: &[CardinalityLimit],
3001 buckets: Vec<Bucket>,
3002 ) -> Vec<Bucket> {
3003 let global_config = self.inner.global_config.current();
3004 let cardinality_limiter_mode = global_config.options.cardinality_limiter_mode;
3005
3006 if matches!(cardinality_limiter_mode, CardinalityLimiterMode::Disabled) {
3007 return buckets;
3008 }
3009
3010 let Some(ref limiter) = self.inner.cardinality_limiter else {
3011 return buckets;
3012 };
3013
3014 let scope = relay_cardinality::Scoping {
3015 organization_id: scoping.organization_id,
3016 project_id: scoping.project_id,
3017 };
3018
3019 let limits = match limiter
3020 .check_cardinality_limits(scope, limits, buckets)
3021 .await
3022 {
3023 Ok(limits) => limits,
3024 Err((buckets, error)) => {
3025 relay_log::error!(
3026 error = &error as &dyn std::error::Error,
3027 "cardinality limiter failed"
3028 );
3029 return buckets;
3030 }
3031 };
3032
3033 let error_sample_rate = global_config.options.cardinality_limiter_error_sample_rate;
3034 if !limits.exceeded_limits().is_empty() && utils::sample(error_sample_rate).is_keep() {
3035 for limit in limits.exceeded_limits() {
3036 relay_log::with_scope(
3037 |scope| {
3038 scope.set_user(Some(relay_log::sentry::User {
3040 id: Some(scoping.organization_id.to_string()),
3041 ..Default::default()
3042 }));
3043 },
3044 || {
3045 relay_log::error!(
3046 tags.organization_id = scoping.organization_id.value(),
3047 tags.limit_id = limit.id,
3048 tags.passive = limit.passive,
3049 "Cardinality Limit"
3050 );
3051 },
3052 );
3053 }
3054 }
3055
3056 for (limit, reports) in limits.cardinality_reports() {
3057 for report in reports {
3058 self.inner
3059 .metric_outcomes
3060 .cardinality(scoping, limit, report);
3061 }
3062 }
3063
3064 if matches!(cardinality_limiter_mode, CardinalityLimiterMode::Passive) {
3065 return limits.into_source();
3066 }
3067
3068 let CardinalityLimitsSplit { accepted, rejected } = limits.into_split();
3069
3070 for (bucket, exceeded) in rejected {
3071 self.inner.metric_outcomes.track(
3072 scoping,
3073 &[bucket],
3074 Outcome::CardinalityLimited(exceeded.id.clone()),
3075 );
3076 }
3077 accepted
3078 }
3079
3080 #[cfg(feature = "processing")]
3087 async fn encode_metrics_processing(
3088 &self,
3089 message: FlushBuckets,
3090 store_forwarder: &Addr<Store>,
3091 ) {
3092 use crate::constants::DEFAULT_EVENT_RETENTION;
3093 use crate::services::store::StoreMetrics;
3094
3095 for ProjectBuckets {
3096 buckets,
3097 scoping,
3098 project_info,
3099 ..
3100 } in message.buckets.into_values()
3101 {
3102 let buckets = self
3103 .rate_limit_buckets(scoping, &project_info, buckets)
3104 .await;
3105
3106 let limits = project_info.get_cardinality_limits();
3107 let buckets = self
3108 .cardinality_limit_buckets(scoping, limits, buckets)
3109 .await;
3110
3111 if buckets.is_empty() {
3112 continue;
3113 }
3114
3115 let retention = project_info
3116 .config
3117 .event_retention
3118 .unwrap_or(DEFAULT_EVENT_RETENTION);
3119
3120 store_forwarder.send(StoreMetrics {
3123 buckets,
3124 scoping,
3125 retention,
3126 });
3127 }
3128 }
3129
3130 fn encode_metrics_envelope(&self, cogs: &mut Token, message: FlushBuckets) {
3142 let FlushBuckets {
3143 partition_key,
3144 buckets,
3145 } = message;
3146
3147 let batch_size = self.inner.config.metrics_max_batch_size_bytes();
3148 let upstream = self.inner.config.upstream_descriptor();
3149
3150 for ProjectBuckets {
3151 buckets, scoping, ..
3152 } in buckets.values()
3153 {
3154 let dsn = PartialDsn::outbound(scoping, upstream);
3155
3156 relay_statsd::metric!(
3157 histogram(RelayHistograms::PartitionKeys) = u64::from(partition_key)
3158 );
3159
3160 let mut num_batches = 0;
3161 for batch in BucketsView::from(buckets).by_size(batch_size) {
3162 let mut envelope = Envelope::from_request(None, RequestMeta::outbound(dsn.clone()));
3163
3164 let mut item = Item::new(ItemType::MetricBuckets);
3165 item.set_source_quantities(crate::metrics::extract_quantities(batch));
3166 item.set_payload(ContentType::Json, serde_json::to_vec(&buckets).unwrap());
3167 envelope.add_item(item);
3168
3169 let mut envelope = ManagedEnvelope::new(
3170 envelope,
3171 self.inner.addrs.outcome_aggregator.clone(),
3172 self.inner.addrs.test_store.clone(),
3173 );
3174 envelope
3175 .set_partition_key(Some(partition_key))
3176 .scope(*scoping);
3177
3178 relay_statsd::metric!(
3179 histogram(RelayHistograms::BucketsPerBatch) = batch.len() as u64
3180 );
3181
3182 self.submit_upstream(cogs, Submit::Envelope(envelope.into_processed()));
3183 num_batches += 1;
3184 }
3185
3186 relay_statsd::metric!(histogram(RelayHistograms::BatchesPerPartition) = num_batches);
3187 }
3188 }
3189
3190 fn send_global_partition(&self, partition_key: u32, partition: &mut Partition<'_>) {
3192 if partition.is_empty() {
3193 return;
3194 }
3195
3196 let (unencoded, project_info) = partition.take();
3197 let http_encoding = self.inner.config.http_encoding();
3198 let encoded = match encode_payload(&unencoded, http_encoding) {
3199 Ok(payload) => payload,
3200 Err(error) => {
3201 let error = &error as &dyn std::error::Error;
3202 relay_log::error!(error, "failed to encode metrics payload");
3203 return;
3204 }
3205 };
3206
3207 let request = SendMetricsRequest {
3208 partition_key: partition_key.to_string(),
3209 unencoded,
3210 encoded,
3211 project_info,
3212 http_encoding,
3213 metric_outcomes: self.inner.metric_outcomes.clone(),
3214 };
3215
3216 self.inner.addrs.upstream_relay.send(SendRequest(request));
3217 }
3218
3219 fn encode_metrics_global(&self, message: FlushBuckets) {
3234 let FlushBuckets {
3235 partition_key,
3236 buckets,
3237 } = message;
3238
3239 let batch_size = self.inner.config.metrics_max_batch_size_bytes();
3240 let mut partition = Partition::new(batch_size);
3241 let mut partition_splits = 0;
3242
3243 for ProjectBuckets {
3244 buckets, scoping, ..
3245 } in buckets.values()
3246 {
3247 for bucket in buckets {
3248 let mut remaining = Some(BucketView::new(bucket));
3249
3250 while let Some(bucket) = remaining.take() {
3251 if let Some(next) = partition.insert(bucket, *scoping) {
3252 self.send_global_partition(partition_key, &mut partition);
3256 remaining = Some(next);
3257 partition_splits += 1;
3258 }
3259 }
3260 }
3261 }
3262
3263 if partition_splits > 0 {
3264 metric!(histogram(RelayHistograms::PartitionSplits) = partition_splits);
3265 }
3266
3267 self.send_global_partition(partition_key, &mut partition);
3268 }
3269
3270 async fn handle_flush_buckets(&self, cogs: &mut Token, mut message: FlushBuckets) {
3271 for (project_key, pb) in message.buckets.iter_mut() {
3272 let buckets = std::mem::take(&mut pb.buckets);
3273 pb.buckets =
3274 self.check_buckets(*project_key, &pb.project_info, &pb.rate_limits, buckets);
3275 }
3276
3277 #[cfg(feature = "processing")]
3278 if self.inner.config.processing_enabled() {
3279 if let Some(ref store_forwarder) = self.inner.addrs.store_forwarder {
3280 return self
3281 .encode_metrics_processing(message, store_forwarder)
3282 .await;
3283 }
3284 }
3285
3286 if self.inner.config.http_global_metrics() {
3287 self.encode_metrics_global(message)
3288 } else {
3289 self.encode_metrics_envelope(cogs, message)
3290 }
3291 }
3292
3293 #[cfg(all(test, feature = "processing"))]
3294 fn redis_rate_limiter_enabled(&self) -> bool {
3295 self.inner.rate_limiter.is_some()
3296 }
3297
3298 async fn handle_message(&self, message: EnvelopeProcessor) {
3299 let ty = message.variant();
3300 let feature_weights = self.feature_weights(&message);
3301
3302 metric!(timer(RelayTimers::ProcessMessageDuration), message = ty, {
3303 let mut cogs = self.inner.cogs.timed(ResourceId::Relay, feature_weights);
3304
3305 match message {
3306 EnvelopeProcessor::ProcessEnvelope(m) => {
3307 self.handle_process_envelope(&mut cogs, *m).await
3308 }
3309 EnvelopeProcessor::ProcessProjectMetrics(m) => {
3310 self.handle_process_metrics(&mut cogs, *m)
3311 }
3312 EnvelopeProcessor::ProcessBatchedMetrics(m) => {
3313 self.handle_process_batched_metrics(&mut cogs, *m)
3314 }
3315 EnvelopeProcessor::FlushBuckets(m) => {
3316 self.handle_flush_buckets(&mut cogs, *m).await
3317 }
3318 EnvelopeProcessor::SubmitClientReports(m) => {
3319 self.handle_submit_client_reports(&mut cogs, *m)
3320 }
3321 }
3322 });
3323 }
3324
3325 fn feature_weights(&self, message: &EnvelopeProcessor) -> FeatureWeights {
3326 match message {
3327 EnvelopeProcessor::ProcessEnvelope(_) => AppFeature::Unattributed.into(),
3329 EnvelopeProcessor::ProcessProjectMetrics(_) => AppFeature::Unattributed.into(),
3330 EnvelopeProcessor::ProcessBatchedMetrics(_) => AppFeature::Unattributed.into(),
3331 EnvelopeProcessor::FlushBuckets(v) => v
3332 .buckets
3333 .values()
3334 .map(|s| {
3335 if self.inner.config.processing_enabled() {
3336 relay_metrics::cogs::ByCount(&s.buckets).into()
3339 } else {
3340 relay_metrics::cogs::BySize(&s.buckets).into()
3341 }
3342 })
3343 .fold(FeatureWeights::none(), FeatureWeights::merge),
3344 EnvelopeProcessor::SubmitClientReports(_) => AppFeature::ClientReports.into(),
3345 }
3346 }
3347
3348 fn new_reservoir_evaluator(
3349 &self,
3350 _organization_id: OrganizationId,
3351 reservoir_counters: ReservoirCounters,
3352 ) -> ReservoirEvaluator {
3353 #[cfg_attr(not(feature = "processing"), expect(unused_mut))]
3354 let mut reservoir = ReservoirEvaluator::new(reservoir_counters);
3355
3356 #[cfg(feature = "processing")]
3357 if let Some(quotas_client) = self.inner.quotas_client.as_ref() {
3358 reservoir.set_redis(_organization_id, quotas_client);
3359 }
3360
3361 reservoir
3362 }
3363}
3364
3365impl Service for EnvelopeProcessorService {
3366 type Interface = EnvelopeProcessor;
3367
3368 async fn run(self, mut rx: relay_system::Receiver<Self::Interface>) {
3369 while let Some(message) = rx.recv().await {
3370 let service = self.clone();
3371 self.inner
3372 .pool
3373 .spawn_async(
3374 async move {
3375 service.handle_message(message).await;
3376 }
3377 .boxed(),
3378 )
3379 .await;
3380 }
3381 }
3382}
3383
3384struct EnforcementResult {
3389 event: Annotated<Event>,
3390 #[cfg_attr(not(feature = "processing"), expect(dead_code))]
3391 rate_limits: RateLimits,
3392}
3393
3394impl EnforcementResult {
3395 pub fn new(event: Annotated<Event>, rate_limits: RateLimits) -> Self {
3397 Self { event, rate_limits }
3398 }
3399}
3400
3401#[derive(Clone)]
3402enum RateLimiter {
3403 Cached,
3404 #[cfg(feature = "processing")]
3405 Consistent(Arc<RedisRateLimiter<GlobalRateLimitsServiceHandle>>),
3406}
3407
3408impl RateLimiter {
3409 async fn enforce<Group>(
3410 &self,
3411 managed_envelope: &mut TypedEnvelope<Group>,
3412 event: Annotated<Event>,
3413 _extracted_metrics: &mut ProcessingExtractedMetrics,
3414 global_config: &GlobalConfig,
3415 project_info: &ProjectInfo,
3416 rate_limits: &RateLimits,
3417 ) -> Result<EnforcementResult, ProcessingError> {
3418 if managed_envelope.envelope().is_empty() && event.value().is_none() {
3419 return Ok(EnforcementResult::new(event, RateLimits::default()));
3420 }
3421
3422 let quotas = CombinedQuotas::new(global_config, project_info.get_quotas());
3423 if quotas.is_empty() {
3424 return Ok(EnforcementResult::new(event, RateLimits::default()));
3425 }
3426
3427 let event_category = event_category(&event);
3428
3429 let this = self.clone();
3435 let rate_limits_clone = rate_limits.clone();
3436 let mut envelope_limiter =
3437 EnvelopeLimiter::new(CheckLimits::All, move |item_scope, _quantity| {
3438 let this = this.clone();
3439 let rate_limits_clone = rate_limits_clone.clone();
3440
3441 async move {
3442 match this {
3443 #[cfg(feature = "processing")]
3444 RateLimiter::Consistent(rate_limiter) => Ok::<_, ProcessingError>(
3445 rate_limiter
3446 .is_rate_limited(quotas, item_scope, _quantity, false)
3447 .await?,
3448 ),
3449 _ => Ok::<_, ProcessingError>(
3450 rate_limits_clone.check_with_quotas(quotas, item_scope),
3451 ),
3452 }
3453 }
3454 });
3455
3456 if let Some(category) = event_category {
3459 envelope_limiter.assume_event(category);
3460 }
3461
3462 let scoping = managed_envelope.scoping();
3463 let (enforcement, rate_limits) =
3464 metric!(timer(RelayTimers::EventProcessingRateLimiting), {
3465 envelope_limiter
3466 .compute(managed_envelope.envelope_mut(), &scoping)
3467 .await
3468 })?;
3469 let event_active = enforcement.is_event_active();
3470
3471 #[cfg(feature = "processing")]
3475 _extracted_metrics.apply_enforcement(&enforcement, matches!(self, Self::Consistent(_)));
3476 enforcement.apply_with_outcomes(managed_envelope);
3477
3478 if event_active {
3479 debug_assert!(managed_envelope.envelope().is_empty());
3480 return Ok(EnforcementResult::new(Annotated::empty(), rate_limits));
3481 }
3482
3483 Ok(EnforcementResult::new(event, rate_limits))
3484 }
3485}
3486
3487fn encode_payload(body: &Bytes, http_encoding: HttpEncoding) -> Result<Bytes, std::io::Error> {
3488 let envelope_body: Vec<u8> = match http_encoding {
3489 HttpEncoding::Identity => return Ok(body.clone()),
3490 HttpEncoding::Deflate => {
3491 let mut encoder = ZlibEncoder::new(Vec::new(), Compression::default());
3492 encoder.write_all(body.as_ref())?;
3493 encoder.finish()?
3494 }
3495 HttpEncoding::Gzip => {
3496 let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
3497 encoder.write_all(body.as_ref())?;
3498 encoder.finish()?
3499 }
3500 HttpEncoding::Br => {
3501 let mut encoder = BrotliEncoder::new(Vec::new(), 0, 5, 22);
3503 encoder.write_all(body.as_ref())?;
3504 encoder.into_inner()
3505 }
3506 HttpEncoding::Zstd => {
3507 let mut encoder = ZstdEncoder::new(Vec::new(), 1)?;
3510 encoder.write_all(body.as_ref())?;
3511 encoder.finish()?
3512 }
3513 };
3514
3515 Ok(envelope_body.into())
3516}
3517
3518#[derive(Debug)]
3520pub struct SendEnvelope {
3521 envelope: TypedEnvelope<Processed>,
3522 body: Bytes,
3523 http_encoding: HttpEncoding,
3524 project_cache: ProjectCacheHandle,
3525}
3526
3527impl UpstreamRequest for SendEnvelope {
3528 fn method(&self) -> reqwest::Method {
3529 reqwest::Method::POST
3530 }
3531
3532 fn path(&self) -> Cow<'_, str> {
3533 format!("/api/{}/envelope/", self.envelope.scoping().project_id).into()
3534 }
3535
3536 fn route(&self) -> &'static str {
3537 "envelope"
3538 }
3539
3540 fn build(&mut self, builder: &mut http::RequestBuilder) -> Result<(), http::HttpError> {
3541 let envelope_body = self.body.clone();
3542 metric!(histogram(RelayHistograms::UpstreamEnvelopeBodySize) = envelope_body.len() as u64);
3543
3544 let meta = &self.envelope.meta();
3545 let shard = self.envelope.partition_key().map(|p| p.to_string());
3546 builder
3547 .content_encoding(self.http_encoding)
3548 .header_opt("Origin", meta.origin().map(|url| url.as_str()))
3549 .header_opt("User-Agent", meta.user_agent())
3550 .header("X-Sentry-Auth", meta.auth_header())
3551 .header("X-Forwarded-For", meta.forwarded_for())
3552 .header("Content-Type", envelope::CONTENT_TYPE)
3553 .header_opt("X-Sentry-Relay-Shard", shard)
3554 .body(envelope_body);
3555
3556 Ok(())
3557 }
3558
3559 fn sign(&mut self) -> Option<Sign> {
3560 Some(Sign::Optional(SignatureType::RequestSign))
3561 }
3562
3563 fn respond(
3564 self: Box<Self>,
3565 result: Result<http::Response, UpstreamRequestError>,
3566 ) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> {
3567 Box::pin(async move {
3568 let result = match result {
3569 Ok(mut response) => response.consume().await.map_err(UpstreamRequestError::Http),
3570 Err(error) => Err(error),
3571 };
3572
3573 match result {
3574 Ok(()) => self.envelope.accept(),
3575 Err(error) if error.is_received() => {
3576 let scoping = self.envelope.scoping();
3577 self.envelope.accept();
3578
3579 if let UpstreamRequestError::RateLimited(limits) = error {
3580 self.project_cache
3581 .get(scoping.project_key)
3582 .rate_limits()
3583 .merge(limits.scope(&scoping));
3584 }
3585 }
3586 Err(error) => {
3587 let mut envelope = self.envelope;
3590 envelope.reject(Outcome::Invalid(DiscardReason::Internal));
3591 relay_log::error!(
3592 error = &error as &dyn Error,
3593 tags.project_key = %envelope.scoping().project_key,
3594 "error sending envelope"
3595 );
3596 }
3597 }
3598 })
3599 }
3600}
3601
3602#[derive(Debug)]
3609struct Partition<'a> {
3610 max_size: usize,
3611 remaining: usize,
3612 views: HashMap<ProjectKey, Vec<BucketView<'a>>>,
3613 project_info: HashMap<ProjectKey, Scoping>,
3614}
3615
3616impl<'a> Partition<'a> {
3617 pub fn new(size: usize) -> Self {
3619 Self {
3620 max_size: size,
3621 remaining: size,
3622 views: HashMap::new(),
3623 project_info: HashMap::new(),
3624 }
3625 }
3626
3627 pub fn insert(&mut self, bucket: BucketView<'a>, scoping: Scoping) -> Option<BucketView<'a>> {
3638 let (current, next) = bucket.split(self.remaining, Some(self.max_size));
3639
3640 if let Some(current) = current {
3641 self.remaining = self.remaining.saturating_sub(current.estimated_size());
3642 self.views
3643 .entry(scoping.project_key)
3644 .or_default()
3645 .push(current);
3646
3647 self.project_info
3648 .entry(scoping.project_key)
3649 .or_insert(scoping);
3650 }
3651
3652 next
3653 }
3654
3655 fn is_empty(&self) -> bool {
3657 self.views.is_empty()
3658 }
3659
3660 fn take(&mut self) -> (Bytes, HashMap<ProjectKey, Scoping>) {
3664 #[derive(serde::Serialize)]
3665 struct Wrapper<'a> {
3666 buckets: &'a HashMap<ProjectKey, Vec<BucketView<'a>>>,
3667 }
3668
3669 let buckets = &self.views;
3670 let payload = serde_json::to_vec(&Wrapper { buckets }).unwrap().into();
3671
3672 let scopings = self.project_info.clone();
3673 self.project_info.clear();
3674
3675 self.views.clear();
3676 self.remaining = self.max_size;
3677
3678 (payload, scopings)
3679 }
3680}
3681
3682#[derive(Debug)]
3686struct SendMetricsRequest {
3687 partition_key: String,
3689 unencoded: Bytes,
3691 encoded: Bytes,
3693 project_info: HashMap<ProjectKey, Scoping>,
3697 http_encoding: HttpEncoding,
3699 metric_outcomes: MetricOutcomes,
3701}
3702
3703impl SendMetricsRequest {
3704 fn create_error_outcomes(self) {
3705 #[derive(serde::Deserialize)]
3706 struct Wrapper {
3707 buckets: HashMap<ProjectKey, Vec<MinimalTrackableBucket>>,
3708 }
3709
3710 let buckets = match serde_json::from_slice(&self.unencoded) {
3711 Ok(Wrapper { buckets }) => buckets,
3712 Err(err) => {
3713 relay_log::error!(
3714 error = &err as &dyn std::error::Error,
3715 "failed to parse buckets from failed transmission"
3716 );
3717 return;
3718 }
3719 };
3720
3721 for (key, buckets) in buckets {
3722 let Some(&scoping) = self.project_info.get(&key) else {
3723 relay_log::error!("missing scoping for project key");
3724 continue;
3725 };
3726
3727 self.metric_outcomes.track(
3728 scoping,
3729 &buckets,
3730 Outcome::Invalid(DiscardReason::Internal),
3731 );
3732 }
3733 }
3734}
3735
3736impl UpstreamRequest for SendMetricsRequest {
3737 fn set_relay_id(&self) -> bool {
3738 true
3739 }
3740
3741 fn sign(&mut self) -> Option<Sign> {
3742 Some(Sign::Required(SignatureType::Body(self.unencoded.clone())))
3743 }
3744
3745 fn method(&self) -> reqwest::Method {
3746 reqwest::Method::POST
3747 }
3748
3749 fn path(&self) -> Cow<'_, str> {
3750 "/api/0/relays/metrics/".into()
3751 }
3752
3753 fn route(&self) -> &'static str {
3754 "global_metrics"
3755 }
3756
3757 fn build(&mut self, builder: &mut http::RequestBuilder) -> Result<(), http::HttpError> {
3758 metric!(histogram(RelayHistograms::UpstreamMetricsBodySize) = self.encoded.len() as u64);
3759
3760 builder
3761 .content_encoding(self.http_encoding)
3762 .header("X-Sentry-Relay-Shard", self.partition_key.as_bytes())
3763 .header(header::CONTENT_TYPE, b"application/json")
3764 .body(self.encoded.clone());
3765
3766 Ok(())
3767 }
3768
3769 fn respond(
3770 self: Box<Self>,
3771 result: Result<http::Response, UpstreamRequestError>,
3772 ) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> {
3773 Box::pin(async {
3774 match result {
3775 Ok(mut response) => {
3776 response.consume().await.ok();
3777 }
3778 Err(error) => {
3779 relay_log::error!(error = &error as &dyn Error, "Failed to send metrics batch");
3780
3781 if error.is_received() {
3784 return;
3785 }
3786
3787 self.create_error_outcomes()
3788 }
3789 }
3790 })
3791 }
3792}
3793
3794#[derive(Copy, Clone, Debug)]
3796struct CombinedQuotas<'a> {
3797 global_quotas: &'a [Quota],
3798 project_quotas: &'a [Quota],
3799}
3800
3801impl<'a> CombinedQuotas<'a> {
3802 pub fn new(global_config: &'a GlobalConfig, project_quotas: &'a [Quota]) -> Self {
3804 Self {
3805 global_quotas: &global_config.quotas,
3806 project_quotas,
3807 }
3808 }
3809
3810 pub fn is_empty(&self) -> bool {
3812 self.len() == 0
3813 }
3814
3815 pub fn len(&self) -> usize {
3817 self.global_quotas.len() + self.project_quotas.len()
3818 }
3819}
3820
3821impl<'a> IntoIterator for CombinedQuotas<'a> {
3822 type Item = &'a Quota;
3823 type IntoIter = std::iter::Chain<std::slice::Iter<'a, Quota>, std::slice::Iter<'a, Quota>>;
3824
3825 fn into_iter(self) -> Self::IntoIter {
3826 self.global_quotas.iter().chain(self.project_quotas.iter())
3827 }
3828}
3829
3830#[cfg(test)]
3831mod tests {
3832 use std::collections::BTreeMap;
3833 use std::env;
3834
3835 use insta::assert_debug_snapshot;
3836 use relay_base_schema::metrics::{DurationUnit, MetricUnit};
3837 use relay_common::glob2::LazyGlob;
3838 use relay_dynamic_config::ProjectConfig;
3839 use relay_event_normalization::{RedactionRule, TransactionNameRule};
3840 use relay_event_schema::protocol::TransactionSource;
3841 use relay_pii::DataScrubbingConfig;
3842 use similar_asserts::assert_eq;
3843
3844 use crate::metrics_extraction::IntoMetric;
3845 use crate::metrics_extraction::transactions::types::{
3846 CommonTags, TransactionMeasurementTags, TransactionMetric,
3847 };
3848 use crate::testutils::{self, create_test_processor, create_test_processor_with_addrs};
3849
3850 #[cfg(feature = "processing")]
3851 use {
3852 relay_metrics::BucketValue,
3853 relay_quotas::{QuotaScope, ReasonCode},
3854 relay_test::mock_service,
3855 };
3856
3857 use super::*;
3858
3859 #[cfg(feature = "processing")]
3860 fn mock_quota(id: &str) -> Quota {
3861 Quota {
3862 id: Some(id.into()),
3863 categories: smallvec::smallvec![DataCategory::MetricBucket],
3864 scope: QuotaScope::Organization,
3865 scope_id: None,
3866 limit: Some(0),
3867 window: None,
3868 reason_code: None,
3869 namespace: None,
3870 }
3871 }
3872
3873 #[cfg(feature = "processing")]
3874 #[test]
3875 fn test_dynamic_quotas() {
3876 let global_config = GlobalConfig {
3877 quotas: vec![mock_quota("foo"), mock_quota("bar")],
3878 ..Default::default()
3879 };
3880
3881 let project_quotas = vec![mock_quota("baz"), mock_quota("qux")];
3882
3883 let dynamic_quotas = CombinedQuotas::new(&global_config, &project_quotas);
3884
3885 assert_eq!(dynamic_quotas.len(), 4);
3886 assert!(!dynamic_quotas.is_empty());
3887
3888 let quota_ids = dynamic_quotas.into_iter().filter_map(|q| q.id.as_deref());
3889 assert!(quota_ids.eq(["foo", "bar", "baz", "qux"]));
3890 }
3891
3892 #[cfg(feature = "processing")]
3895 #[tokio::test]
3896 async fn test_ratelimit_per_batch() {
3897 use relay_base_schema::organization::OrganizationId;
3898 use relay_protocol::FiniteF64;
3899
3900 let rate_limited_org = Scoping {
3901 organization_id: OrganizationId::new(1),
3902 project_id: ProjectId::new(21),
3903 project_key: ProjectKey::parse("00000000000000000000000000000000").unwrap(),
3904 key_id: Some(17),
3905 };
3906
3907 let not_rate_limited_org = Scoping {
3908 organization_id: OrganizationId::new(2),
3909 project_id: ProjectId::new(21),
3910 project_key: ProjectKey::parse("11111111111111111111111111111111").unwrap(),
3911 key_id: Some(17),
3912 };
3913
3914 let message = {
3915 let project_info = {
3916 let quota = Quota {
3917 id: Some("testing".into()),
3918 categories: vec![DataCategory::MetricBucket].into(),
3919 scope: relay_quotas::QuotaScope::Organization,
3920 scope_id: Some(rate_limited_org.organization_id.to_string()),
3921 limit: Some(0),
3922 window: None,
3923 reason_code: Some(ReasonCode::new("test")),
3924 namespace: None,
3925 };
3926
3927 let mut config = ProjectConfig::default();
3928 config.quotas.push(quota);
3929
3930 Arc::new(ProjectInfo {
3931 config,
3932 ..Default::default()
3933 })
3934 };
3935
3936 let project_metrics = |scoping| ProjectBuckets {
3937 buckets: vec![Bucket {
3938 name: "d:transactions/bar".into(),
3939 value: BucketValue::Counter(FiniteF64::new(1.0).unwrap()),
3940 timestamp: UnixTimestamp::now(),
3941 tags: Default::default(),
3942 width: 10,
3943 metadata: BucketMetadata::default(),
3944 }],
3945 rate_limits: Default::default(),
3946 project_info: project_info.clone(),
3947 scoping,
3948 };
3949
3950 let buckets = hashbrown::HashMap::from([
3951 (
3952 rate_limited_org.project_key,
3953 project_metrics(rate_limited_org),
3954 ),
3955 (
3956 not_rate_limited_org.project_key,
3957 project_metrics(not_rate_limited_org),
3958 ),
3959 ]);
3960
3961 FlushBuckets {
3962 partition_key: 0,
3963 buckets,
3964 }
3965 };
3966
3967 assert_eq!(message.buckets.keys().count(), 2);
3969
3970 let config = {
3971 let config_json = serde_json::json!({
3972 "processing": {
3973 "enabled": true,
3974 "kafka_config": [],
3975 "redis": {
3976 "server": std::env::var("RELAY_REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_owned()),
3977 }
3978 }
3979 });
3980 Config::from_json_value(config_json).unwrap()
3981 };
3982
3983 let (store, handle) = {
3984 let f = |org_ids: &mut Vec<OrganizationId>, msg: Store| {
3985 let org_id = match msg {
3986 Store::Metrics(x) => x.scoping.organization_id,
3987 _ => panic!("received envelope when expecting only metrics"),
3988 };
3989 org_ids.push(org_id);
3990 };
3991
3992 mock_service("store_forwarder", vec![], f)
3993 };
3994
3995 let processor = create_test_processor(config).await;
3996 assert!(processor.redis_rate_limiter_enabled());
3997
3998 processor.encode_metrics_processing(message, &store).await;
3999
4000 drop(store);
4001 let orgs_not_ratelimited = handle.await.unwrap();
4002
4003 assert_eq!(
4004 orgs_not_ratelimited,
4005 vec![not_rate_limited_org.organization_id]
4006 );
4007 }
4008
4009 #[tokio::test]
4010 async fn test_browser_version_extraction_with_pii_like_data() {
4011 let processor = create_test_processor(Default::default()).await;
4012 let (outcome_aggregator, test_store) = testutils::processor_services();
4013 let event_id = EventId::new();
4014
4015 let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
4016 .parse()
4017 .unwrap();
4018
4019 let request_meta = RequestMeta::new(dsn);
4020 let mut envelope = Envelope::from_request(Some(event_id), request_meta);
4021
4022 envelope.add_item({
4023 let mut item = Item::new(ItemType::Event);
4024 item.set_payload(
4025 ContentType::Json,
4026 r#"
4027 {
4028 "request": {
4029 "headers": [
4030 ["User-Agent", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36"]
4031 ]
4032 }
4033 }
4034 "#,
4035 );
4036 item
4037 });
4038
4039 let mut datascrubbing_settings = DataScrubbingConfig::default();
4040 datascrubbing_settings.scrub_data = true;
4042 datascrubbing_settings.scrub_defaults = true;
4043 datascrubbing_settings.scrub_ip_addresses = true;
4044
4045 let pii_config = serde_json::from_str(r#"{"applications": {"**": ["@ip:mask"]}}"#).unwrap();
4047
4048 let config = ProjectConfig {
4049 datascrubbing_settings,
4050 pii_config: Some(pii_config),
4051 ..Default::default()
4052 };
4053
4054 let project_info = ProjectInfo {
4055 config,
4056 ..Default::default()
4057 };
4058
4059 let mut envelopes = ProcessingGroup::split_envelope(*envelope);
4060 assert_eq!(envelopes.len(), 1);
4061
4062 let (group, envelope) = envelopes.pop().unwrap();
4063 let envelope = ManagedEnvelope::new(envelope, outcome_aggregator, test_store);
4064
4065 let message = ProcessEnvelopeGrouped {
4066 group,
4067 envelope,
4068 project_info: Arc::new(project_info),
4069 rate_limits: Default::default(),
4070 sampling_project_info: None,
4071 reservoir_counters: ReservoirCounters::default(),
4072 };
4073
4074 let Ok(Some(Submit::Envelope(mut new_envelope))) =
4075 processor.process(&mut Token::noop(), message).await
4076 else {
4077 panic!();
4078 };
4079 let new_envelope = new_envelope.envelope_mut();
4080
4081 let event_item = new_envelope.items().last().unwrap();
4082 let annotated_event: Annotated<Event> =
4083 Annotated::from_json_bytes(&event_item.payload()).unwrap();
4084 let event = annotated_event.into_value().unwrap();
4085 let headers = event
4086 .request
4087 .into_value()
4088 .unwrap()
4089 .headers
4090 .into_value()
4091 .unwrap();
4092
4093 assert_eq!(
4095 Some(
4096 "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/********* Safari/537.36"
4097 ),
4098 headers.get_header("User-Agent")
4099 );
4100 let contexts = event.contexts.into_value().unwrap();
4102 let browser = contexts.0.get("browser").unwrap();
4103 assert_eq!(
4104 r#"{"browser":"Chrome 103.0.0","name":"Chrome","version":"103.0.0","type":"browser"}"#,
4105 browser.to_json().unwrap()
4106 );
4107 }
4108
4109 #[tokio::test]
4110 #[cfg(feature = "processing")]
4111 async fn test_materialize_dsc() {
4112 use crate::services::projects::project::PublicKeyConfig;
4113
4114 let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
4115 .parse()
4116 .unwrap();
4117 let request_meta = RequestMeta::new(dsn);
4118 let mut envelope = Envelope::from_request(None, request_meta);
4119
4120 let dsc = r#"{
4121 "trace_id": "00000000-0000-0000-0000-000000000000",
4122 "public_key": "e12d836b15bb49d7bbf99e64295d995b",
4123 "sample_rate": "0.2"
4124 }"#;
4125 envelope.set_dsc(serde_json::from_str(dsc).unwrap());
4126
4127 let mut item = Item::new(ItemType::Event);
4128 item.set_payload(ContentType::Json, r#"{}"#);
4129 envelope.add_item(item);
4130
4131 let (outcome_aggregator, test_store) = testutils::processor_services();
4132 let managed_envelope = ManagedEnvelope::new(envelope, outcome_aggregator, test_store);
4133
4134 let mut project_info = ProjectInfo::default();
4135 project_info.public_keys.push(PublicKeyConfig {
4136 public_key: ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(),
4137 numeric_id: Some(1),
4138 });
4139 let project_info = Arc::new(project_info);
4140
4141 let message = ProcessEnvelopeGrouped {
4142 group: ProcessingGroup::Transaction,
4143 envelope: managed_envelope,
4144 project_info: project_info.clone(),
4145 rate_limits: Default::default(),
4146 sampling_project_info: Some(project_info),
4147 reservoir_counters: ReservoirCounters::default(),
4148 };
4149
4150 let config = Config::from_json_value(serde_json::json!({
4151 "processing": {
4152 "enabled": true,
4153 "kafka_config": [],
4154 }
4155 }))
4156 .unwrap();
4157
4158 let processor = create_test_processor(config).await;
4159 let Ok(Some(Submit::Envelope(envelope))) =
4160 processor.process(&mut Token::noop(), message).await
4161 else {
4162 panic!();
4163 };
4164 let event = envelope
4165 .envelope()
4166 .get_item_by(|item| item.ty() == &ItemType::Event)
4167 .unwrap();
4168
4169 let event = Annotated::<Event>::from_json_bytes(&event.payload()).unwrap();
4170 insta::assert_debug_snapshot!(event.value().unwrap()._dsc, @r#"
4171 Object(
4172 {
4173 "environment": ~,
4174 "public_key": String(
4175 "e12d836b15bb49d7bbf99e64295d995b",
4176 ),
4177 "release": ~,
4178 "replay_id": ~,
4179 "sample_rate": String(
4180 "0.2",
4181 ),
4182 "trace_id": String(
4183 "00000000000000000000000000000000",
4184 ),
4185 "transaction": ~,
4186 },
4187 )
4188 "#);
4189 }
4190
4191 fn capture_test_event(transaction_name: &str, source: TransactionSource) -> Vec<String> {
4192 let mut event = Annotated::<Event>::from_json(
4193 r#"
4194 {
4195 "type": "transaction",
4196 "transaction": "/foo/",
4197 "timestamp": 946684810.0,
4198 "start_timestamp": 946684800.0,
4199 "contexts": {
4200 "trace": {
4201 "trace_id": "4c79f60c11214eb38604f4ae0781bfb2",
4202 "span_id": "fa90fdead5f74053",
4203 "op": "http.server",
4204 "type": "trace"
4205 }
4206 },
4207 "transaction_info": {
4208 "source": "url"
4209 }
4210 }
4211 "#,
4212 )
4213 .unwrap();
4214 let e = event.value_mut().as_mut().unwrap();
4215 e.transaction.set_value(Some(transaction_name.into()));
4216
4217 e.transaction_info
4218 .value_mut()
4219 .as_mut()
4220 .unwrap()
4221 .source
4222 .set_value(Some(source));
4223
4224 relay_statsd::with_capturing_test_client(|| {
4225 utils::log_transaction_name_metrics(&mut event, |event| {
4226 let config = NormalizationConfig {
4227 transaction_name_config: TransactionNameConfig {
4228 rules: &[TransactionNameRule {
4229 pattern: LazyGlob::new("/foo/*/**".to_owned()),
4230 expiry: DateTime::<Utc>::MAX_UTC,
4231 redaction: RedactionRule::Replace {
4232 substitution: "*".to_owned(),
4233 },
4234 }],
4235 },
4236 ..Default::default()
4237 };
4238 normalize_event(event, &config)
4239 });
4240 })
4241 }
4242
4243 #[test]
4244 fn test_log_transaction_metrics_none() {
4245 let captures = capture_test_event("/nothing", TransactionSource::Url);
4246 insta::assert_debug_snapshot!(captures, @r#"
4247 [
4248 "event.transaction_name_changes:1|c|#source_in:url,changes:none,source_out:sanitized,is_404:false",
4249 ]
4250 "#);
4251 }
4252
4253 #[test]
4254 fn test_log_transaction_metrics_rule() {
4255 let captures = capture_test_event("/foo/john/denver", TransactionSource::Url);
4256 insta::assert_debug_snapshot!(captures, @r#"
4257 [
4258 "event.transaction_name_changes:1|c|#source_in:url,changes:rule,source_out:sanitized,is_404:false",
4259 ]
4260 "#);
4261 }
4262
4263 #[test]
4264 fn test_log_transaction_metrics_pattern() {
4265 let captures = capture_test_event("/something/12345", TransactionSource::Url);
4266 insta::assert_debug_snapshot!(captures, @r#"
4267 [
4268 "event.transaction_name_changes:1|c|#source_in:url,changes:pattern,source_out:sanitized,is_404:false",
4269 ]
4270 "#);
4271 }
4272
4273 #[test]
4274 fn test_log_transaction_metrics_both() {
4275 let captures = capture_test_event("/foo/john/12345", TransactionSource::Url);
4276 insta::assert_debug_snapshot!(captures, @r#"
4277 [
4278 "event.transaction_name_changes:1|c|#source_in:url,changes:both,source_out:sanitized,is_404:false",
4279 ]
4280 "#);
4281 }
4282
4283 #[test]
4284 fn test_log_transaction_metrics_no_match() {
4285 let captures = capture_test_event("/foo/john/12345", TransactionSource::Route);
4286 insta::assert_debug_snapshot!(captures, @r#"
4287 [
4288 "event.transaction_name_changes:1|c|#source_in:route,changes:none,source_out:route,is_404:false",
4289 ]
4290 "#);
4291 }
4292
4293 #[test]
4297 fn test_mri_overhead_constant() {
4298 let hardcoded_value = MeasurementsConfig::MEASUREMENT_MRI_OVERHEAD;
4299
4300 let derived_value = {
4301 let name = "foobar".to_owned();
4302 let value = 5.into(); let unit = MetricUnit::Duration(DurationUnit::default());
4304 let tags = TransactionMeasurementTags {
4305 measurement_rating: None,
4306 universal_tags: CommonTags(BTreeMap::new()),
4307 score_profile_version: None,
4308 };
4309
4310 let measurement = TransactionMetric::Measurement {
4311 name: name.clone(),
4312 value,
4313 unit,
4314 tags,
4315 };
4316
4317 let metric: Bucket = measurement.into_metric(UnixTimestamp::now());
4318 metric.name.len() - unit.to_string().len() - name.len()
4319 };
4320 assert_eq!(
4321 hardcoded_value, derived_value,
4322 "Update `MEASUREMENT_MRI_OVERHEAD` if the naming scheme changed."
4323 );
4324 }
4325
4326 #[tokio::test]
4327 async fn test_process_metrics_bucket_metadata() {
4328 let mut token = Cogs::noop().timed(ResourceId::Relay, AppFeature::Unattributed);
4329 let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
4330 let received_at = Utc::now();
4331 let config = Config::default();
4332
4333 let (aggregator, mut aggregator_rx) = Addr::custom();
4334 let processor = create_test_processor_with_addrs(
4335 config,
4336 Addrs {
4337 aggregator,
4338 ..Default::default()
4339 },
4340 )
4341 .await;
4342
4343 let mut item = Item::new(ItemType::Statsd);
4344 item.set_payload(
4345 ContentType::Text,
4346 "transactions/foo:3182887624:4267882815|s",
4347 );
4348 for (source, expected_received_at) in [
4349 (
4350 BucketSource::External,
4351 Some(UnixTimestamp::from_datetime(received_at).unwrap()),
4352 ),
4353 (BucketSource::Internal, None),
4354 ] {
4355 let message = ProcessMetrics {
4356 data: MetricData::Raw(vec![item.clone()]),
4357 project_key,
4358 source,
4359 received_at,
4360 sent_at: Some(Utc::now()),
4361 };
4362 processor.handle_process_metrics(&mut token, message);
4363
4364 let Aggregator::MergeBuckets(merge_buckets) = aggregator_rx.recv().await.unwrap();
4365 let buckets = merge_buckets.buckets;
4366 assert_eq!(buckets.len(), 1);
4367 assert_eq!(buckets[0].metadata.received_at, expected_received_at);
4368 }
4369 }
4370
4371 #[tokio::test]
4372 async fn test_process_batched_metrics() {
4373 let mut token = Cogs::noop().timed(ResourceId::Relay, AppFeature::Unattributed);
4374 let received_at = Utc::now();
4375 let config = Config::default();
4376
4377 let (aggregator, mut aggregator_rx) = Addr::custom();
4378 let processor = create_test_processor_with_addrs(
4379 config,
4380 Addrs {
4381 aggregator,
4382 ..Default::default()
4383 },
4384 )
4385 .await;
4386
4387 let payload = r#"{
4388 "buckets": {
4389 "11111111111111111111111111111111": [
4390 {
4391 "timestamp": 1615889440,
4392 "width": 0,
4393 "name": "d:custom/endpoint.response_time@millisecond",
4394 "type": "d",
4395 "value": [
4396 68.0
4397 ],
4398 "tags": {
4399 "route": "user_index"
4400 }
4401 }
4402 ],
4403 "22222222222222222222222222222222": [
4404 {
4405 "timestamp": 1615889440,
4406 "width": 0,
4407 "name": "d:custom/endpoint.cache_rate@none",
4408 "type": "d",
4409 "value": [
4410 36.0
4411 ]
4412 }
4413 ]
4414 }
4415}
4416"#;
4417 let message = ProcessBatchedMetrics {
4418 payload: Bytes::from(payload),
4419 source: BucketSource::Internal,
4420 received_at,
4421 sent_at: Some(Utc::now()),
4422 };
4423 processor.handle_process_batched_metrics(&mut token, message);
4424
4425 let Aggregator::MergeBuckets(mb1) = aggregator_rx.recv().await.unwrap();
4426 let Aggregator::MergeBuckets(mb2) = aggregator_rx.recv().await.unwrap();
4427
4428 let mut messages = vec![mb1, mb2];
4429 messages.sort_by_key(|mb| mb.project_key);
4430
4431 let actual = messages
4432 .into_iter()
4433 .map(|mb| (mb.project_key, mb.buckets))
4434 .collect::<Vec<_>>();
4435
4436 assert_debug_snapshot!(actual, @r###"
4437 [
4438 (
4439 ProjectKey("11111111111111111111111111111111"),
4440 [
4441 Bucket {
4442 timestamp: UnixTimestamp(1615889440),
4443 width: 0,
4444 name: MetricName(
4445 "d:custom/endpoint.response_time@millisecond",
4446 ),
4447 value: Distribution(
4448 [
4449 68.0,
4450 ],
4451 ),
4452 tags: {
4453 "route": "user_index",
4454 },
4455 metadata: BucketMetadata {
4456 merges: 1,
4457 received_at: None,
4458 extracted_from_indexed: false,
4459 },
4460 },
4461 ],
4462 ),
4463 (
4464 ProjectKey("22222222222222222222222222222222"),
4465 [
4466 Bucket {
4467 timestamp: UnixTimestamp(1615889440),
4468 width: 0,
4469 name: MetricName(
4470 "d:custom/endpoint.cache_rate@none",
4471 ),
4472 value: Distribution(
4473 [
4474 36.0,
4475 ],
4476 ),
4477 tags: {},
4478 metadata: BucketMetadata {
4479 merges: 1,
4480 received_at: None,
4481 extracted_from_indexed: false,
4482 },
4483 },
4484 ],
4485 ),
4486 ]
4487 "###);
4488 }
4489}