1use std::borrow::Cow;
2use std::collections::{BTreeSet, HashMap};
3use std::error::Error;
4use std::fmt::{Debug, Display};
5use std::future::Future;
6use std::io::Write;
7use std::pin::Pin;
8use std::sync::Arc;
9use std::time::Duration;
10
11use anyhow::Context;
12use brotli::CompressorWriter as BrotliEncoder;
13use bytes::Bytes;
14use chrono::{DateTime, Utc};
15use flate2::Compression;
16use flate2::write::{GzEncoder, ZlibEncoder};
17use futures::FutureExt;
18use futures::future::BoxFuture;
19use relay_base_schema::project::{ProjectId, ProjectKey};
20use relay_cogs::{AppFeature, Cogs, FeatureWeights, ResourceId, Token};
21use relay_common::time::UnixTimestamp;
22use relay_config::{Config, HttpEncoding};
23use relay_dynamic_config::Feature;
24use relay_event_normalization::{ClockDriftProcessor, GeoIpLookup};
25use relay_event_schema::processor::ProcessingAction;
26use relay_event_schema::protocol::{ClientReport, Event, EventId, SpanV2};
27use relay_filter::FilterStatKey;
28use relay_metrics::{Bucket, BucketMetadata, BucketView, BucketsView, MetricNamespace};
29use relay_protocol::Annotated;
30use relay_quotas::{DataCategory, RateLimits, Scoping};
31use relay_sampling::evaluation::{ReservoirCounters, SamplingDecision};
32use relay_statsd::metric;
33use relay_system::{Addr, FromMessage, NoResponse, Service};
34use reqwest::header;
35use smallvec::{SmallVec, smallvec};
36use zstd::stream::Encoder as ZstdEncoder;
37
38use crate::envelope::{self, ContentType, Envelope, EnvelopeError, Item, ItemContainer, ItemType};
39use crate::extractors::{PartialDsn, RequestMeta, RequestTrust};
40use crate::integrations::Integration;
41use crate::managed::ManagedEnvelope;
42use crate::metrics::{MetricOutcomes, MetricsLimiter, MinimalTrackableBucket};
43use crate::metrics_extraction::ExtractedMetrics;
44use crate::processing::attachments::AttachmentProcessor;
45use crate::processing::check_ins::CheckInsProcessor;
46use crate::processing::client_reports::ClientReportsProcessor;
47use crate::processing::errors::{ErrorsProcessor, SwitchProcessingError};
48use crate::processing::forward_unknown::ForwardUnknownProcessor;
49use crate::processing::legacy_spans::LegacySpansProcessor;
50use crate::processing::logs::LogsProcessor;
51use crate::processing::profile_chunks::ProfileChunksProcessor;
52use crate::processing::profiles::ProfilesProcessor;
53use crate::processing::replays::ReplaysProcessor;
54use crate::processing::sessions::SessionsProcessor;
55use crate::processing::spans::SpansProcessor;
56use crate::processing::trace_attachments::TraceAttachmentsProcessor;
57use crate::processing::trace_metrics::TraceMetricsProcessor;
58use crate::processing::transactions::TransactionProcessor;
59use crate::processing::user_reports::UserReportsProcessor;
60use crate::processing::{Forward as _, ForwardContext, Output, Outputs, QuotaRateLimiter};
61use crate::service::ServiceError;
62use crate::services::global_config::GlobalConfigHandle;
63use crate::services::metrics::{Aggregator, FlushBuckets, MergeBuckets, ProjectBuckets};
64use crate::services::outcome::{DiscardItemType, DiscardReason, Outcome, TrackOutcome};
65use crate::services::projects::cache::ProjectCacheHandle;
66use crate::services::projects::project::{ProjectInfo, ProjectState};
67use crate::services::upstream::{
68 SendRequest, Sign, SignatureType, UpstreamRelay, UpstreamRequest, UpstreamRequestError,
69};
70use crate::statsd::{RelayCounters, RelayDistributions, RelayTimers};
71use crate::utils;
72use crate::{http, processing};
73use relay_threading::AsyncPool;
74use symbolic_unreal::{Unreal4Error, Unreal4ErrorKind};
75#[cfg(feature = "processing")]
76use {
77 crate::services::objectstore::Objectstore,
78 crate::services::store::Store,
79 itertools::Itertools,
80 relay_dynamic_config::GlobalConfig,
81 relay_quotas::{Quota, RateLimitingError, RedisRateLimiter},
82 relay_redis::RedisClients,
83 std::time::Instant,
84};
85
// Sub-modules with signal-specific processing logic.
pub mod event;
mod metrics;
pub mod span;

#[cfg(all(sentry, feature = "processing"))]
pub mod playstation;

/// Minimum offset (55 minutes) between client and server clocks that is
/// treated as clock drift; presumably smaller offsets are considered normal
/// transmission delay — confirm against `ClockDriftProcessor` usage.
pub const MINIMUM_CLOCK_DRIFT: Duration = Duration::from_secs(55 * 60);
95
/// Error returned when a [`ProcessingGroup`] cannot be converted into the
/// requested marker group type.
#[derive(Debug)]
pub struct GroupTypeError;

impl Display for GroupTypeError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "failed to convert processing group into corresponding type")
    }
}

impl std::error::Error for GroupTypeError {}
106
/// Declares a zero-sized marker type for a [`ProcessingGroup`] variant.
///
/// Generates the infallible conversion into [`ProcessingGroup`] as well as
/// the fallible reverse conversion, which fails with [`GroupTypeError`] for
/// any non-matching variant. Variants listed after the primary one are also
/// accepted by the reverse conversion.
macro_rules! processing_group {
    ($ty:ident, $variant:ident$(, $($other:ident),+)?) => {
        #[derive(Clone, Copy, Debug)]
        pub struct $ty;

        impl From<$ty> for ProcessingGroup {
            fn from(_: $ty) -> Self {
                ProcessingGroup::$variant
            }
        }

        impl TryFrom<ProcessingGroup> for $ty {
            type Error = GroupTypeError;

            fn try_from(value: ProcessingGroup) -> Result<Self, Self::Error> {
                // Accept the primary variant plus any listed aliases in a
                // single pattern; everything else is a type mismatch.
                match value {
                    ProcessingGroup::$variant $($(| ProcessingGroup::$other)+)? => Ok($ty),
                    _ => Err(GroupTypeError),
                }
            }
        }
    };
}
135
// Marker types for each processing group, generated by `processing_group!`
// above. They allow APIs to be statically tied to a specific group.
processing_group!(TransactionGroup, Transaction);

processing_group!(ErrorGroup, Error);

processing_group!(SessionGroup, Session);
processing_group!(ClientReportGroup, ClientReport);
processing_group!(ReplayGroup, Replay);
processing_group!(CheckInGroup, CheckIn);
processing_group!(TraceMetricGroup, TraceMetric);
processing_group!(SpanGroup, Span);

processing_group!(ProfileChunkGroup, ProfileChunk);
processing_group!(ForwardUnknownGroup, ForwardUnknown);
processing_group!(Ungrouped, Ungrouped);

/// The processing pipeline an envelope (or split sub-envelope) is routed to.
///
/// [`ProcessingGroup::split_envelope`] partitions an envelope's items into
/// these groups; each group is handled by a dedicated processor (see the
/// `Processing` struct below).
#[derive(Clone, Copy, Debug)]
pub enum ProcessingGroup {
    /// Transaction events and their attached items (e.g. profiles).
    Transaction,
    /// Error events and other event-creating items.
    Error,
    /// Session and aggregated-sessions items.
    Session,
    /// Attachments not attached to any event.
    StandaloneAttachments,
    /// User reports without an accompanying event.
    StandaloneUserReports,
    /// Profiles without an accompanying transaction.
    StandaloneProfiles,
    /// SDK client reports.
    ClientReport,
    /// Replay events, recordings, and videos.
    Replay,
    /// Cron check-ins.
    CheckIn,
    /// Log items (including log integrations).
    Log,
    /// Trace metric items.
    TraceMetric,
    /// Legacy standalone span items.
    Span,
    /// Span V2 items, span containers, and span integrations.
    SpanV2,
    /// Continuous-profiling chunks.
    ProfileChunk,
    /// Attachments associated with a trace rather than an event.
    TraceAttachment,
    /// Items of unknown type, forwarded upstream untouched.
    ForwardUnknown,
    /// Catch-all for items that fit no other group.
    Ungrouped,
}
201
impl ProcessingGroup {
    /// Splits the envelope into sub-envelopes, one per processing group.
    ///
    /// Items are moved out of `envelope` in a fixed priority order: replays,
    /// sessions, span-v2, legacy spans, logs, trace metrics, profile chunks,
    /// and trace attachments first; then standalone items (attachments, user
    /// reports, profiles) only when no event-creating item is present; then
    /// raw security reports (one envelope each) and event-bound items; any
    /// remaining item is wrapped into its own envelope.
    fn split_envelope(
        mut envelope: Envelope,
        project_info: &ProjectInfo,
    ) -> SmallVec<[(Self, Box<Envelope>); 3]> {
        // All sub-envelopes share the original envelope's headers.
        let headers = envelope.headers().clone();
        let mut grouped_envelopes = smallvec![];

        // Replay items belong together in one envelope.
        let replay_items = envelope.take_items_by(|item| {
            matches!(
                item.ty(),
                &ItemType::ReplayEvent | &ItemType::ReplayRecording | &ItemType::ReplayVideo
            )
        });
        if !replay_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Replay,
                Envelope::from_parts(headers.clone(), replay_items),
            ))
        }

        // Session updates and aggregated sessions.
        let session_items = envelope
            .take_items_by(|item| matches!(item.ty(), &ItemType::Session | &ItemType::Sessions));
        if !session_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Session,
                Envelope::from_parts(headers.clone(), session_items),
            ))
        }

        // Span V2: containers and integrations always; plain span items and
        // span attachments only when the experimental feature is enabled
        // (otherwise they fall through to the legacy span group below).
        let span_v2_items = envelope.take_items_by(|item| {
            let exp_feature = project_info.has_feature(Feature::SpanV2ExperimentalProcessing);

            ItemContainer::<SpanV2>::is_container(item)
                || matches!(item.integration(), Some(Integration::Spans(_)))
                || (exp_feature && matches!(item.ty(), &ItemType::Span))
                || (exp_feature && item.is_span_attachment())
        });

        if !span_v2_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::SpanV2,
                Envelope::from_parts(headers.clone(), span_v2_items),
            ))
        }

        // Legacy span items (only those not claimed by the V2 group above).
        let span_items = envelope.take_items_by(|item| matches!(item.ty(), &ItemType::Span));
        if !span_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Span,
                Envelope::from_parts(headers.clone(), span_items),
            ))
        }

        // Log items and log integrations.
        let logs_items = envelope.take_items_by(|item| {
            matches!(item.ty(), &ItemType::Log)
                || matches!(item.integration(), Some(Integration::Logs(_)))
        });
        if !logs_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Log,
                Envelope::from_parts(headers.clone(), logs_items),
            ))
        }

        let trace_metric_items =
            envelope.take_items_by(|item| matches!(item.ty(), &ItemType::TraceMetric));
        if !trace_metric_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::TraceMetric,
                Envelope::from_parts(headers.clone(), trace_metric_items),
            ))
        }

        let profile_chunk_items =
            envelope.take_items_by(|item| matches!(item.ty(), &ItemType::ProfileChunk));
        if !profile_chunk_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::ProfileChunk,
                Envelope::from_parts(headers.clone(), profile_chunk_items),
            ))
        }

        let trace_attachment_items = envelope.take_items_by(Item::is_trace_attachment);
        if !trace_attachment_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::TraceAttachment,
                Envelope::from_parts(headers.clone(), trace_attachment_items),
            ))
        }

        // Standalone items only exist when nothing in the envelope creates an
        // event; otherwise they stay attached to the event-bound group below.
        if !envelope.items().any(Item::creates_event) {
            let standalone_attachments = envelope
                .take_items_by(|i| i.requires_event() && matches!(i.ty(), ItemType::Attachment));
            if !standalone_attachments.is_empty() {
                grouped_envelopes.push((
                    ProcessingGroup::StandaloneAttachments,
                    Envelope::from_parts(headers.clone(), standalone_attachments),
                ))
            }

            let standalone_user_reports =
                envelope.take_items_by(|i| matches!(i.ty(), ItemType::UserReport));
            if !standalone_user_reports.is_empty() {
                grouped_envelopes.push((
                    ProcessingGroup::StandaloneUserReports,
                    Envelope::from_parts(headers.clone(), standalone_user_reports),
                ));
            }

            let standalone_profiles =
                envelope.take_items_by(|i| matches!(i.ty(), ItemType::Profile));
            if !standalone_profiles.is_empty() {
                grouped_envelopes.push((
                    ProcessingGroup::StandaloneProfiles,
                    Envelope::from_parts(headers.clone(), standalone_profiles),
                ));
            }
        }

        // Each raw security report becomes its own error envelope with a
        // freshly generated event id.
        let security_reports_items = envelope
            .take_items_by(|i| matches!(i.ty(), &ItemType::RawSecurity))
            .into_iter()
            .map(|item| {
                let headers = headers.clone();
                let items: SmallVec<[Item; 3]> = smallvec![item.clone()];
                let mut envelope = Envelope::from_parts(headers, items);
                envelope.set_event_id(EventId::new());
                (ProcessingGroup::Error, envelope)
            });
        grouped_envelopes.extend(security_reports_items);

        // Items that require an event travel together; the presence of a
        // transaction or profile item decides between the transaction and
        // error pipelines.
        let require_event_items = envelope.take_items_by(Item::requires_event);
        if !require_event_items.is_empty() {
            let group = if require_event_items
                .iter()
                .any(|item| matches!(item.ty(), &ItemType::Transaction | &ItemType::Profile))
            {
                ProcessingGroup::Transaction
            } else {
                ProcessingGroup::Error
            };

            grouped_envelopes.push((
                group,
                Envelope::from_parts(headers.clone(), require_event_items),
            ))
        }

        // Whatever is left gets wrapped one item per envelope and routed by
        // item type; unknown types are forwarded upstream untouched.
        let envelopes = envelope.items_mut().map(|item| {
            let headers = headers.clone();
            let items: SmallVec<[Item; 3]> = smallvec![item.clone()];
            let envelope = Envelope::from_parts(headers, items);
            let item_type = item.ty();
            let group = if matches!(item_type, &ItemType::CheckIn) {
                ProcessingGroup::CheckIn
            } else if matches!(item.ty(), &ItemType::ClientReport) {
                ProcessingGroup::ClientReport
            } else if matches!(item_type, &ItemType::Unknown(_)) {
                ProcessingGroup::ForwardUnknown
            } else {
                ProcessingGroup::Ungrouped
            };

            (group, envelope)
        });
        grouped_envelopes.extend(envelopes);

        grouped_envelopes
    }

    /// Returns the short string name of the group, for logging and metric
    /// tags.
    pub fn variant(&self) -> &'static str {
        match self {
            ProcessingGroup::Transaction => "transaction",
            ProcessingGroup::Error => "error",
            ProcessingGroup::Session => "session",
            ProcessingGroup::StandaloneAttachments => "standalone_attachment",
            ProcessingGroup::StandaloneUserReports => "standalone_user_reports",
            ProcessingGroup::StandaloneProfiles => "standalone_profiles",
            ProcessingGroup::ClientReport => "client_report",
            ProcessingGroup::Replay => "replay",
            ProcessingGroup::CheckIn => "check_in",
            ProcessingGroup::Log => "log",
            ProcessingGroup::TraceMetric => "trace_metric",
            ProcessingGroup::Span => "span",
            ProcessingGroup::SpanV2 => "span_v2",
            ProcessingGroup::ProfileChunk => "profile_chunk",
            ProcessingGroup::TraceAttachment => "trace_attachment",
            ProcessingGroup::ForwardUnknown => "forward_unknown",
            ProcessingGroup::Ungrouped => "ungrouped",
        }
    }
}
411
412impl From<ProcessingGroup> for AppFeature {
413 fn from(value: ProcessingGroup) -> Self {
414 match value {
415 ProcessingGroup::Transaction => AppFeature::Transactions,
416 ProcessingGroup::Error => AppFeature::Errors,
417 ProcessingGroup::Session => AppFeature::Sessions,
418 ProcessingGroup::StandaloneAttachments => AppFeature::UnattributedEnvelope,
419 ProcessingGroup::StandaloneUserReports => AppFeature::UserReports,
420 ProcessingGroup::StandaloneProfiles => AppFeature::Profiles,
421 ProcessingGroup::ClientReport => AppFeature::ClientReports,
422 ProcessingGroup::Replay => AppFeature::Replays,
423 ProcessingGroup::CheckIn => AppFeature::CheckIns,
424 ProcessingGroup::Log => AppFeature::Logs,
425 ProcessingGroup::TraceMetric => AppFeature::TraceMetrics,
426 ProcessingGroup::Span => AppFeature::Spans,
427 ProcessingGroup::SpanV2 => AppFeature::Spans,
428 ProcessingGroup::ProfileChunk => AppFeature::Profiles,
429 ProcessingGroup::ForwardUnknown => AppFeature::UnattributedEnvelope,
430 ProcessingGroup::Ungrouped => AppFeature::UnattributedEnvelope,
431 ProcessingGroup::TraceAttachment => AppFeature::TraceAttachments,
432 }
433 }
434}
435
/// Errors that can occur while processing an envelope or its items.
///
/// [`ProcessingError::to_outcome`] maps each error to the [`Outcome`] that
/// should be reported for the discarded data, where applicable.
#[derive(Debug, thiserror::Error)]
pub enum ProcessingError {
    #[error("invalid json in event")]
    InvalidJson(#[source] serde_json::Error),

    #[error("invalid message pack event payload")]
    InvalidMsgpack(#[from] rmp_serde::decode::Error),

    #[error("invalid unreal crash report")]
    InvalidUnrealReport(#[source] Unreal4Error),

    #[error("event payload too large")]
    PayloadTooLarge(DiscardItemType),

    #[error("invalid transaction event")]
    InvalidTransaction,

    #[error("the item is not allowed/supported in this envelope")]
    UnsupportedItem,

    #[error("envelope processor failed")]
    ProcessingFailed(#[from] ProcessingAction),

    #[error("duplicate {0} in event")]
    DuplicateItem(ItemType),

    #[error("failed to extract event payload")]
    NoEventPayload,

    #[error("missing project id in DSN")]
    MissingProjectId,

    #[error("invalid security report type: {0:?}")]
    InvalidSecurityType(Bytes),

    #[error("unsupported security report type")]
    UnsupportedSecurityType,

    #[error("invalid security report")]
    InvalidSecurityReport(#[source] serde_json::Error),

    #[error("event filtered with reason: {0:?}")]
    EventFiltered(FilterStatKey),

    #[error("could not serialize event payload")]
    SerializeFailed(#[source] serde_json::Error),

    #[cfg(feature = "processing")]
    #[error("failed to apply quotas")]
    QuotasFailed(#[from] RateLimitingError),

    #[error("nintendo switch dying message processing failed {0:?}")]
    InvalidNintendoDyingMessage(#[source] SwitchProcessingError),

    #[cfg(all(sentry, feature = "processing"))]
    #[error("playstation dump processing failed: {0}")]
    InvalidPlaystationDump(String),

    // Routed to a processor that cannot handle this envelope's group.
    #[error("processing group does not match specific processor")]
    ProcessingGroupMismatch,
    // Generic failure of the new processing pipeline; details are logged at
    // the failure site, so no outcome is produced for this variant.
    #[error("new processing pipeline failed")]
    ProcessingFailure,

    #[cfg(feature = "processing")]
    #[error("invalid attachment reference")]
    InvalidAttachmentRef,

    #[error("could not determine processing group for envelope items")]
    NoProcessingGroup,
}
507
impl ProcessingError {
    /// Maps this error to the [`Outcome`] to emit for the dropped data, or
    /// `None` if no outcome should be recorded.
    pub fn to_outcome(&self) -> Option<Outcome> {
        match self {
            Self::PayloadTooLarge(payload_type) => {
                Some(Outcome::Invalid(DiscardReason::TooLarge(*payload_type)))
            }
            Self::InvalidJson(_) => Some(Outcome::Invalid(DiscardReason::InvalidJson)),
            Self::InvalidMsgpack(_) => Some(Outcome::Invalid(DiscardReason::InvalidMsgpack)),
            Self::InvalidSecurityType(_) => {
                Some(Outcome::Invalid(DiscardReason::SecurityReportType))
            }
            Self::UnsupportedItem => Some(Outcome::Invalid(DiscardReason::InvalidEnvelope)),
            Self::InvalidSecurityReport(_) => Some(Outcome::Invalid(DiscardReason::SecurityReport)),
            Self::UnsupportedSecurityType => Some(Outcome::Filtered(FilterStatKey::InvalidCsp)),
            Self::InvalidTransaction => Some(Outcome::Invalid(DiscardReason::InvalidTransaction)),
            Self::DuplicateItem(_) => Some(Outcome::Invalid(DiscardReason::DuplicateItem)),
            Self::NoEventPayload => Some(Outcome::Invalid(DiscardReason::NoEventPayload)),
            Self::InvalidNintendoDyingMessage(_) => Some(Outcome::Invalid(DiscardReason::Payload)),
            #[cfg(all(sentry, feature = "processing"))]
            Self::InvalidPlaystationDump(_) => Some(Outcome::Invalid(DiscardReason::Payload)),
            // Bad compression gets a dedicated reason; the guard must stay
            // before the generic unreal-report arm below.
            Self::InvalidUnrealReport(err) if err.kind() == Unreal4ErrorKind::BadCompression => {
                Some(Outcome::Invalid(DiscardReason::InvalidCompression))
            }
            Self::InvalidUnrealReport(_) => Some(Outcome::Invalid(DiscardReason::ProcessUnreal)),
            Self::SerializeFailed(_) | Self::ProcessingFailed(_) => {
                Some(Outcome::Invalid(DiscardReason::Internal))
            }
            #[cfg(feature = "processing")]
            Self::QuotasFailed(_) => Some(Outcome::Invalid(DiscardReason::Internal)),
            // No outcome: either nothing can be attributed (missing project
            // id) or outcomes are handled at the failure site.
            Self::MissingProjectId => None,
            Self::EventFiltered(key) => Some(Outcome::Filtered(key.clone())),

            Self::ProcessingGroupMismatch => Some(Outcome::Invalid(DiscardReason::Internal)),
            Self::ProcessingFailure => None,
            #[cfg(feature = "processing")]
            Self::InvalidAttachmentRef => {
                Some(Outcome::Invalid(DiscardReason::InvalidAttachmentRef))
            }
            Self::NoProcessingGroup => Some(Outcome::Invalid(DiscardReason::Internal)),
        }
    }

    /// Returns `true` if the mapped outcome reports itself as unexpected
    /// (see `Outcome::is_unexpected`).
    fn is_unexpected(&self) -> bool {
        self.to_outcome()
            .is_some_and(|outcome| outcome.is_unexpected())
    }
}
556
557impl From<Unreal4Error> for ProcessingError {
558 fn from(err: Unreal4Error) -> Self {
559 match err.kind() {
560 Unreal4ErrorKind::TooLarge => Self::PayloadTooLarge(ItemType::UnrealReport.into()),
561 _ => ProcessingError::InvalidUnrealReport(err),
562 }
563 }
564}
565
/// An event extracted from an envelope item, paired with a `usize`
/// (presumably the size of the originating payload — confirm at the
/// extraction site).
type ExtractedEvent = (Annotated<Event>, usize);

/// Wrapper around [`ExtractedMetrics`] that tags each added bucket with
/// whether it was extracted from an indexed (kept) payload.
#[derive(Debug)]
pub struct ProcessingExtractedMetrics {
    // Accumulated project and sampling metrics.
    metrics: ExtractedMetrics,
}
576
577impl ProcessingExtractedMetrics {
578 pub fn new() -> Self {
579 Self {
580 metrics: ExtractedMetrics::default(),
581 }
582 }
583
584 pub fn into_inner(self) -> ExtractedMetrics {
585 self.metrics
586 }
587
588 pub fn extend(
590 &mut self,
591 extracted: ExtractedMetrics,
592 sampling_decision: Option<SamplingDecision>,
593 ) {
594 self.extend_project_metrics(extracted.project_metrics, sampling_decision);
595 self.extend_sampling_metrics(extracted.sampling_metrics, sampling_decision);
596 }
597
598 pub fn extend_project_metrics<I>(
600 &mut self,
601 buckets: I,
602 sampling_decision: Option<SamplingDecision>,
603 ) where
604 I: IntoIterator<Item = Bucket>,
605 {
606 self.metrics
607 .project_metrics
608 .extend(buckets.into_iter().map(|mut bucket| {
609 bucket.metadata.extracted_from_indexed =
610 sampling_decision == Some(SamplingDecision::Keep);
611 bucket
612 }));
613 }
614
615 pub fn extend_sampling_metrics<I>(
617 &mut self,
618 buckets: I,
619 sampling_decision: Option<SamplingDecision>,
620 ) where
621 I: IntoIterator<Item = Bucket>,
622 {
623 self.metrics
624 .sampling_metrics
625 .extend(buckets.into_iter().map(|mut bucket| {
626 bucket.metadata.extracted_from_indexed =
627 sampling_decision == Some(SamplingDecision::Keep);
628 bucket
629 }));
630 }
631}
632
633fn send_metrics(
634 metrics: ExtractedMetrics,
635 project_key: ProjectKey,
636 sampling_key: Option<ProjectKey>,
637 aggregator: &Addr<Aggregator>,
638) {
639 let ExtractedMetrics {
640 project_metrics,
641 sampling_metrics,
642 } = metrics;
643
644 if !project_metrics.is_empty() {
645 aggregator.send(MergeBuckets {
646 project_key,
647 buckets: project_metrics,
648 });
649 }
650
651 if !sampling_metrics.is_empty() {
652 let sampling_project_key = sampling_key.unwrap_or(project_key);
659 aggregator.send(MergeBuckets {
660 project_key: sampling_project_key,
661 buckets: sampling_metrics,
662 });
663 }
664}
665
/// Message requesting that an envelope be processed, together with the
/// project state needed to process it.
#[derive(Debug)]
pub struct ProcessEnvelope {
    /// The envelope to process, with outcome tracking attached.
    pub envelope: ManagedEnvelope,
    /// Cached state of the project the envelope belongs to.
    pub project_info: Arc<ProjectInfo>,
    /// Rate limits currently known for this project.
    pub rate_limits: Arc<RateLimits>,
    /// State of the sampling (root) project, when one applies.
    pub sampling_project_info: Option<Arc<ProjectInfo>>,
    /// Shared counters used for reservoir sampling.
    pub reservoir_counters: ReservoirCounters,
}

/// An envelope that has been assigned a single [`ProcessingGroup`], plus the
/// borrowed processing context.
#[derive(Debug)]
struct ProcessEnvelopeGrouped<'a> {
    /// The group all items in this envelope belong to.
    pub group: ProcessingGroup,
    /// The envelope to process.
    pub envelope: ManagedEnvelope,
    /// Borrowed processing context (configs and project state).
    pub ctx: processing::Context<'a>,
}

/// Message requesting that metric items for a single project be parsed and
/// merged into the aggregator.
#[derive(Debug)]
pub struct ProcessMetrics {
    /// Raw metric items, or buckets already parsed upstream.
    pub data: MetricData,
    /// Public key of the project the metrics belong to.
    pub project_key: ProjectKey,
    /// Trust level of the submitter (internal vs. external).
    pub source: BucketSource,
    /// Instant at which this Relay received the request.
    pub received_at: DateTime<Utc>,
    /// The request's `sent_at` header value, if any — presumably used for
    /// clock-drift correction; confirm at the processing site.
    pub sent_at: Option<DateTime<Utc>>,
}

/// Metric payloads in either raw or already-parsed form.
#[derive(Debug)]
pub enum MetricData {
    /// Envelope items (statsd or metric-buckets payloads) still to be parsed.
    Raw(Vec<Item>),
    /// Buckets that were already parsed.
    Parsed(Vec<Bucket>),
}
734
735impl MetricData {
736 fn into_buckets(self, timestamp: UnixTimestamp) -> Vec<Bucket> {
741 let items = match self {
742 Self::Parsed(buckets) => return buckets,
743 Self::Raw(items) => items,
744 };
745
746 let mut buckets = Vec::new();
747 for item in items {
748 let payload = item.payload();
749 if item.ty() == &ItemType::Statsd {
750 for bucket_result in Bucket::parse_all(&payload, timestamp) {
751 match bucket_result {
752 Ok(bucket) => buckets.push(bucket),
753 Err(error) => relay_log::debug!(
754 error = &error as &dyn Error,
755 "failed to parse metric bucket from statsd format",
756 ),
757 }
758 }
759 } else if item.ty() == &ItemType::MetricBuckets {
760 match serde_json::from_slice::<Vec<Bucket>>(&payload) {
761 Ok(parsed_buckets) => {
762 if buckets.is_empty() {
764 buckets = parsed_buckets;
765 } else {
766 buckets.extend(parsed_buckets);
767 }
768 }
769 Err(error) => {
770 relay_log::debug!(
771 error = &error as &dyn Error,
772 "failed to parse metric bucket",
773 );
774 metric!(counter(RelayCounters::MetricBucketsParsingFailed) += 1);
775 }
776 }
777 } else {
778 relay_log::error!(
779 "invalid item of type {} passed to ProcessMetrics",
780 item.ty()
781 );
782 }
783 }
784 buckets
785 }
786}
787
/// Message carrying a pre-serialized batch of metric buckets spanning
/// multiple projects.
#[derive(Debug)]
pub struct ProcessBatchedMetrics {
    /// Serialized batch payload (format is handled at the parsing site).
    pub payload: Bytes,
    /// Trust level of the submitter.
    pub source: BucketSource,
    /// Instant at which this Relay received the request.
    pub received_at: DateTime<Utc>,
    /// The request's `sent_at` header value, if any.
    pub sent_at: Option<DateTime<Utc>>,
}

/// Origin of metric buckets, derived from the request's trust level (see
/// [`BucketSource::from_meta`]).
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum BucketSource {
    /// Submitted by a trusted source (internal Relay).
    Internal,
    /// Submitted by an untrusted, external source.
    External,
}
815
816impl BucketSource {
817 pub fn from_meta(meta: &RequestMeta) -> Self {
819 match meta.request_trust() {
820 RequestTrust::Trusted => Self::Internal,
821 RequestTrust::Untrusted => Self::External,
822 }
823 }
824}
825
/// Message instructing the processor to submit client reports upstream.
#[derive(Debug)]
pub struct SubmitClientReports {
    /// The client reports to submit.
    pub client_reports: Vec<ClientReport>,
    /// Scoping under which the reports are attributed.
    pub scoping: Scoping,
}

/// The message interface of the envelope processor service.
///
/// One variant per accepted message type; payloads are boxed to keep the
/// enum itself small.
#[derive(Debug)]
pub enum EnvelopeProcessor {
    ProcessEnvelope(Box<ProcessEnvelope>),
    ProcessProjectMetrics(Box<ProcessMetrics>),
    ProcessBatchedMetrics(Box<ProcessBatchedMetrics>),
    FlushBuckets(Box<FlushBuckets>),
    SubmitClientReports(Box<SubmitClientReports>),
}
844
845impl EnvelopeProcessor {
846 pub fn variant(&self) -> &'static str {
848 match self {
849 EnvelopeProcessor::ProcessEnvelope(_) => "ProcessEnvelope",
850 EnvelopeProcessor::ProcessProjectMetrics(_) => "ProcessProjectMetrics",
851 EnvelopeProcessor::ProcessBatchedMetrics(_) => "ProcessBatchedMetrics",
852 EnvelopeProcessor::FlushBuckets(_) => "FlushBuckets",
853 EnvelopeProcessor::SubmitClientReports(_) => "SubmitClientReports",
854 }
855 }
856}
857
858impl relay_system::Interface for EnvelopeProcessor {}
859
860impl FromMessage<ProcessEnvelope> for EnvelopeProcessor {
861 type Response = relay_system::NoResponse;
862
863 fn from_message(message: ProcessEnvelope, _sender: ()) -> Self {
864 Self::ProcessEnvelope(Box::new(message))
865 }
866}
867
// Fire-and-forget conversions: each message type is boxed into its
// `EnvelopeProcessor` variant; none of them produces a response.
impl FromMessage<ProcessMetrics> for EnvelopeProcessor {
    type Response = NoResponse;

    fn from_message(message: ProcessMetrics, _: ()) -> Self {
        Self::ProcessProjectMetrics(Box::new(message))
    }
}

impl FromMessage<ProcessBatchedMetrics> for EnvelopeProcessor {
    type Response = NoResponse;

    fn from_message(message: ProcessBatchedMetrics, _: ()) -> Self {
        Self::ProcessBatchedMetrics(Box::new(message))
    }
}

impl FromMessage<FlushBuckets> for EnvelopeProcessor {
    type Response = NoResponse;

    fn from_message(message: FlushBuckets, _: ()) -> Self {
        Self::FlushBuckets(Box::new(message))
    }
}

impl FromMessage<SubmitClientReports> for EnvelopeProcessor {
    type Response = NoResponse;

    fn from_message(message: SubmitClientReports, _: ()) -> Self {
        Self::SubmitClientReports(Box::new(message))
    }
}
899
/// The async pool on which envelope processing futures run.
pub type EnvelopeProcessorServicePool = AsyncPool<BoxFuture<'static, ()>>;

/// Service implementing the [`EnvelopeProcessor`] interface.
///
/// Cloning is cheap: all state is shared behind an [`Arc`].
#[derive(Clone)]
pub struct EnvelopeProcessorService {
    inner: Arc<InnerProcessor>,
}

/// Addresses of the services the processor sends messages to.
pub struct Addrs {
    /// Receives outcomes for dropped or filtered data.
    pub outcome_aggregator: Addr<TrackOutcome>,
    /// Connection to the upstream Relay or Sentry.
    pub upstream_relay: Addr<UpstreamRelay>,
    /// Object storage service (processing mode only).
    #[cfg(feature = "processing")]
    pub objectstore: Option<Addr<Objectstore>>,
    /// Forwarder to the store service (processing mode only).
    #[cfg(feature = "processing")]
    pub store_forwarder: Option<Addr<Store>>,
    /// Metrics aggregator service.
    pub aggregator: Addr<Aggregator>,
}
921
922impl Default for Addrs {
923 fn default() -> Self {
924 Addrs {
925 outcome_aggregator: Addr::dummy(),
926 upstream_relay: Addr::dummy(),
927 #[cfg(feature = "processing")]
928 objectstore: None,
929 #[cfg(feature = "processing")]
930 store_forwarder: None,
931 aggregator: Addr::dummy(),
932 }
933 }
934}
935
/// Shared state of the processor service, kept behind an [`Arc`] so that
/// service clones are cheap.
struct InnerProcessor {
    // Pool on which processing futures are spawned.
    pool: EnvelopeProcessorServicePool,
    config: Arc<Config>,
    global_config: GlobalConfigHandle,
    project_cache: ProjectCacheHandle,
    // COGS resource accounting.
    cogs: Cogs,
    // Addresses of collaborating services.
    addrs: Addrs,
    // Redis-backed rate limiter, only available in processing mode and when
    // Redis is configured.
    #[cfg(feature = "processing")]
    rate_limiter: Option<Arc<RedisRateLimiter>>,
    metric_outcomes: MetricOutcomes,
    processing: Processing,
}

/// One dedicated processor per signal type. All of them share the same quota
/// rate limiter (see [`EnvelopeProcessorService::new`]).
struct Processing {
    errors: ErrorsProcessor,
    logs: LogsProcessor,
    trace_metrics: TraceMetricsProcessor,
    spans: SpansProcessor,
    legacy_spans: LegacySpansProcessor,
    check_ins: CheckInsProcessor,
    sessions: SessionsProcessor,
    transactions: TransactionProcessor,
    profile_chunks: ProfileChunksProcessor,
    trace_attachments: TraceAttachmentsProcessor,
    replays: ReplaysProcessor,
    client_reports: ClientReportsProcessor,
    attachments: AttachmentProcessor,
    user_reports: UserReportsProcessor,
    profiles: ProfilesProcessor,
    forward_unknown: ForwardUnknownProcessor,
}
967
968impl EnvelopeProcessorService {
    /// Creates a new envelope processor service.
    ///
    /// Opens the GeoIP database when configured, wires up the optional
    /// Redis-backed rate limiter (processing mode only), and constructs one
    /// processor per signal type, all sharing a single quota rate limiter.
    #[cfg_attr(feature = "processing", expect(clippy::too_many_arguments))]
    pub fn new(
        pool: EnvelopeProcessorServicePool,
        config: Arc<Config>,
        global_config: GlobalConfigHandle,
        project_cache: ProjectCacheHandle,
        cogs: Cogs,
        #[cfg(feature = "processing")] redis: Option<RedisClients>,
        addrs: Addrs,
        metric_outcomes: MetricOutcomes,
    ) -> Self {
        // A failure to open the GeoIP database is logged but not fatal;
        // processing continues with an empty lookup.
        let geoip_lookup = config
            .geoip_path()
            .and_then(
                |p| match GeoIpLookup::open(p).context(ServiceError::GeoIp) {
                    Ok(geoip) => Some(geoip),
                    Err(err) => {
                        relay_log::error!("failed to open GeoIP db {p:?}: {err:?}");
                        None
                    }
                },
            )
            .unwrap_or_else(GeoIpLookup::empty);

        // Redis rate limiter only exists in processing mode with Redis
        // configured.
        #[cfg(feature = "processing")]
        let rate_limiter = redis.as_ref().map(|redis| {
            RedisRateLimiter::new(redis.quotas.clone())
                .max_limit(config.max_rate_limit())
                .cache(config.quota_cache_ratio(), config.quota_cache_max())
        });

        // Quota limiter shared by all signal processors below.
        let quota_limiter = Arc::new(QuotaRateLimiter::new(
            #[cfg(feature = "processing")]
            project_cache.clone(),
            #[cfg(feature = "processing")]
            rate_limiter.clone(),
        ));
        #[cfg(feature = "processing")]
        let rate_limiter = rate_limiter.map(Arc::new);
        let outcome_aggregator = addrs.outcome_aggregator.clone();
        let inner = InnerProcessor {
            pool,
            global_config,
            project_cache,
            cogs,
            #[cfg(feature = "processing")]
            rate_limiter,
            addrs,
            metric_outcomes,
            processing: Processing {
                errors: ErrorsProcessor::new(Arc::clone(&quota_limiter), geoip_lookup.clone()),
                logs: LogsProcessor::new(Arc::clone(&quota_limiter)),
                trace_metrics: TraceMetricsProcessor::new(Arc::clone(&quota_limiter)),
                spans: SpansProcessor::new(Arc::clone(&quota_limiter), geoip_lookup.clone()),
                legacy_spans: LegacySpansProcessor::new(
                    Arc::clone(&quota_limiter),
                    geoip_lookup.clone(),
                ),
                check_ins: CheckInsProcessor::new(Arc::clone(&quota_limiter)),
                sessions: SessionsProcessor::new(Arc::clone(&quota_limiter)),
                transactions: TransactionProcessor::new(
                    Arc::clone(&quota_limiter),
                    geoip_lookup.clone(),
                    // Transactions additionally get direct access to the
                    // Redis quota client in processing mode.
                    #[cfg(feature = "processing")]
                    redis.map(|r| r.quotas),
                    #[cfg(not(feature = "processing"))]
                    None,
                ),
                profile_chunks: ProfileChunksProcessor::new(Arc::clone(&quota_limiter)),
                trace_attachments: TraceAttachmentsProcessor::new(Arc::clone(&quota_limiter)),
                replays: ReplaysProcessor::new(Arc::clone(&quota_limiter), geoip_lookup),
                client_reports: ClientReportsProcessor::new(outcome_aggregator),
                attachments: AttachmentProcessor::new(Arc::clone(&quota_limiter)),
                user_reports: UserReportsProcessor::new(Arc::clone(&quota_limiter)),
                profiles: ProfilesProcessor::new(quota_limiter),
                forward_unknown: ForwardUnknownProcessor::new(),
            },
            config,
        };

        Self {
            inner: Arc::new(inner),
        }
    }
1054
    /// Runs `processor` over the envelope and converts its output into the
    /// common [`Outputs`] type.
    ///
    /// The processor first extracts the work it is responsible for from the
    /// envelope. The remaining envelope is then accepted when empty, or
    /// rejected with an internal outcome when items were unexpectedly left
    /// behind. Failures of the processing pipeline are logged and mapped to
    /// [`ProcessingError::ProcessingFailure`].
    async fn process_with_processor<P: processing::Processor>(
        &self,
        processor: &P,
        mut managed_envelope: ManagedEnvelope,
        ctx: processing::Context<'_>,
    ) -> Result<Output<Outputs>, ProcessingError>
    where
        Outputs: From<P::Output>,
    {
        let Some(work) = processor.prepare_envelope(&mut managed_envelope) else {
            // Reaching this point means the envelope was routed to the wrong
            // processor — a bug in the group dispatch, hence the debug
            // assertion.
            debug_assert!(
                false,
                "there must be work for the {} processor",
                std::any::type_name::<P>(),
            );
            return Err(ProcessingError::ProcessingGroupMismatch);
        };

        managed_envelope.update();
        // After `prepare_envelope` extracted its work, nothing should remain;
        // leftovers are dropped with an internal outcome.
        match managed_envelope.envelope().is_empty() {
            true => managed_envelope.accept(),
            false => managed_envelope.reject(Outcome::Invalid(DiscardReason::Internal)),
        }

        processor
            .process(work, ctx)
            .await
            .map_err(|err| {
                relay_log::debug!(
                    error = &err as &dyn std::error::Error,
                    "processing pipeline failed"
                );
                ProcessingError::ProcessingFailure
            })
            .map(|o| o.map(Into::into))
    }
1091
    /// Applies envelope-level pre-processing and dispatches the envelope to
    /// the processor registered for its processing group.
    ///
    /// Pre-processing covers DSC transaction name parametrization (when a
    /// sampling project is available), propagation of the project's retention
    /// settings, and stamping the resolved project ID onto the envelope meta.
    ///
    /// Returns the processor's output, or an error when the envelope has no
    /// processing group or the processor fails.
    async fn process_envelope(
        &self,
        project_id: ProjectId,
        message: ProcessEnvelopeGrouped<'_>,
    ) -> Result<Output<Outputs>, ProcessingError> {
        let ProcessEnvelopeGrouped {
            group,
            envelope: mut managed_envelope,
            ctx,
        } = message;

        // Rewrite the transaction name in the dynamic sampling context using
        // the sampling project's rules, if a sampling project is configured.
        if let Some(sampling_state) = ctx.sampling_project_info {
            managed_envelope
                .envelope_mut()
                .parametrize_dsc_transaction(&sampling_state.config.tx_name_rules);
        }

        // Carry the project's retention settings on the envelope so that
        // downstream consumers do not need to load project state again.
        if let Some(retention) = ctx.project_info.config.event_retention {
            managed_envelope.envelope_mut().set_retention(retention);
        }

        if let Some(retention) = ctx.project_info.config.downsampled_event_retention {
            managed_envelope
                .envelope_mut()
                .set_downsampled_retention(retention);
        }

        // Ensure the envelope meta carries the resolved project ID.
        managed_envelope
            .envelope_mut()
            .meta_mut()
            .set_project_id(project_id);

        relay_log::trace!("Processing {group} group", group = group.variant());

        // Dispatch to the processor dedicated to this group. `Ungrouped`
        // indicates a bug in envelope splitting and is rejected below.
        match group {
            ProcessingGroup::Error => {
                self.process_with_processor(&self.inner.processing.errors, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::Transaction => {
                self.process_with_processor(
                    &self.inner.processing.transactions,
                    managed_envelope,
                    ctx,
                )
                .await
            }
            ProcessingGroup::Session => {
                self.process_with_processor(&self.inner.processing.sessions, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::StandaloneAttachments => {
                self.process_with_processor(
                    &self.inner.processing.attachments,
                    managed_envelope,
                    ctx,
                )
                .await
            }
            ProcessingGroup::StandaloneUserReports => {
                self.process_with_processor(
                    &self.inner.processing.user_reports,
                    managed_envelope,
                    ctx,
                )
                .await
            }
            ProcessingGroup::StandaloneProfiles => {
                self.process_with_processor(&self.inner.processing.profiles, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::ClientReport => {
                self.process_with_processor(
                    &self.inner.processing.client_reports,
                    managed_envelope,
                    ctx,
                )
                .await
            }
            ProcessingGroup::Replay => {
                self.process_with_processor(&self.inner.processing.replays, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::CheckIn => {
                self.process_with_processor(&self.inner.processing.check_ins, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::Log => {
                self.process_with_processor(&self.inner.processing.logs, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::TraceMetric => {
                self.process_with_processor(
                    &self.inner.processing.trace_metrics,
                    managed_envelope,
                    ctx,
                )
                .await
            }
            ProcessingGroup::SpanV2 => {
                self.process_with_processor(&self.inner.processing.spans, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::TraceAttachment => {
                self.process_with_processor(
                    &self.inner.processing.trace_attachments,
                    managed_envelope,
                    ctx,
                )
                .await
            }
            ProcessingGroup::Span => {
                self.process_with_processor(
                    &self.inner.processing.legacy_spans,
                    managed_envelope,
                    ctx,
                )
                .await
            }
            ProcessingGroup::ProfileChunk => {
                self.process_with_processor(
                    &self.inner.processing.profile_chunks,
                    managed_envelope,
                    ctx,
                )
                .await
            }
            ProcessingGroup::Ungrouped => {
                // No processor can handle this combination of items; log the
                // first item type for diagnosis and fail the envelope.
                relay_log::error!(
                    tags.project = %project_id,
                    items = ?managed_envelope.envelope().items().next().map(Item::ty),
                    "could not identify the processing group based on the envelope's items"
                );

                Err(ProcessingError::NoProcessingGroup)
            }
            ProcessingGroup::ForwardUnknown => {
                self.process_with_processor(
                    &self.inner.processing.forward_unknown,
                    managed_envelope,
                    ctx,
                )
                .await
            }
        }
    }
1254
    /// Runs the full processing pipeline for a grouped envelope.
    ///
    /// Resolves the project ID (from project state, falling back to the
    /// envelope meta), configures the logging scope for the duration of
    /// processing, delegates to [`Self::process_envelope`], and forwards any
    /// extracted metrics to the aggregator. Returns the processed output
    /// together with the context needed to forward it, or `None` when there is
    /// nothing to forward.
    async fn process<'a>(
        &self,
        mut message: ProcessEnvelopeGrouped<'a>,
    ) -> Result<Option<(Outputs, ForwardContext<'a>)>, ProcessingError> {
        let ProcessEnvelopeGrouped {
            ref mut envelope,
            ctx,
            ..
        } = message;

        // Prefer the project ID from project state; fall back to the ID
        // carried on the envelope meta.
        let Some(project_id) = ctx
            .project_info
            .project_id
            .or_else(|| envelope.envelope().meta().project_id())
        else {
            envelope.reject(Outcome::Invalid(DiscardReason::Internal));
            return Err(ProcessingError::MissingProjectId);
        };

        // Capture request metadata before `message` is moved into processing.
        let client = envelope.envelope().meta().client().map(str::to_owned);
        let user_agent = envelope.envelope().meta().user_agent().map(str::to_owned);
        let project_key = envelope.envelope().meta().public_key();
        // Only use the sampling key when the sampling project is actually known.
        let sampling_key = envelope
            .envelope()
            .sampling_key()
            .filter(|_| ctx.sampling_project_info.is_some());

        relay_log::configure_scope(|scope| {
            scope.set_tag("project", project_id);
            if let Some(client) = client {
                scope.set_tag("sdk", client);
            }
            if let Some(user_agent) = user_agent {
                scope.set_extra("user_agent", user_agent.into());
            }
        });

        let result =
            self.process_envelope(project_id, message)
                .await
                .map(|Output { main, metrics }| {
                    // Extracted metrics go to the aggregator regardless of
                    // whether there is a main output to forward.
                    if let Some(metrics) = metrics {
                        metrics.accept(|metrics| {
                            send_metrics(
                                metrics,
                                project_key,
                                sampling_key,
                                &self.inner.addrs.aggregator,
                            );
                        });
                    }

                    let ctx = ctx.to_forward();
                    main.map(|output| (output, ctx))
                });

        // Clean up the logging scope again.
        // NOTE(review): "user_agent" is set as an extra above but removed as a
        // tag here — confirm whether the extra should be removed instead.
        relay_log::configure_scope(|scope| {
            scope.remove_tag("project");
            scope.remove_tag("sdk");
            scope.remove_tag("user_agent");
        });

        result
    }
1335
    /// Handles a [`ProcessEnvelope`] message by splitting the envelope into
    /// processing groups and running each group through the pipeline.
    ///
    /// Each group is scoped, processed under its own COGS measurement and, on
    /// success, submitted upstream.
    async fn handle_process_envelope(&self, cogs: &mut Token, message: ProcessEnvelope) {
        let project_key = message.envelope.envelope().meta().public_key();
        let wait_time = message.envelope.age();
        metric!(timer(RelayTimers::EnvelopeWaitTime) = wait_time);

        // Each split group is measured with its own token below, so the
        // aggregate token for the whole message is cancelled.
        cogs.cancel();

        let scoping = message.envelope.scoping();
        for (group, envelope) in ProcessingGroup::split_envelope(
            *message.envelope.into_envelope(),
            &message.project_info,
        ) {
            let mut cogs = self
                .inner
                .cogs
                .timed(ResourceId::Relay, AppFeature::from(group));

            // Re-wrap the split envelope and restore the original scoping.
            let mut envelope =
                ManagedEnvelope::new(envelope, self.inner.addrs.outcome_aggregator.clone());
            envelope.scope(scoping);

            let global_config = self.inner.global_config.current();

            let ctx = processing::Context {
                config: &self.inner.config,
                global_config: &global_config,
                project_info: &message.project_info,
                sampling_project_info: message.sampling_project_info.as_deref(),
                rate_limits: &message.rate_limits,
                reservoir_counters: &message.reservoir_counters,
            };

            let message = ProcessEnvelopeGrouped {
                group,
                envelope,
                ctx,
            };

            let result = metric!(
                timer(RelayTimers::EnvelopeProcessingTime),
                group = group.variant(),
                { self.process(message).await }
            );

            match result {
                Ok(Some((output, ctx))) => self.submit_upstream(&mut cogs, output, ctx),
                Ok(None) => {}
                // Unexpected failures are logged at error level; routine
                // failures only at debug level.
                Err(error) if error.is_unexpected() => {
                    relay_log::error!(
                        tags.project_key = %project_key,
                        error = &error as &dyn Error,
                        "error processing envelope"
                    )
                }
                Err(error) => {
                    relay_log::debug!(
                        tags.project_key = %project_key,
                        error = &error as &dyn Error,
                        "error processing envelope"
                    )
                }
            }
        }
    }
1402
1403 fn handle_process_metrics(&self, cogs: &mut Token, message: ProcessMetrics) {
1404 let ProcessMetrics {
1405 data,
1406 project_key,
1407 received_at,
1408 sent_at,
1409 source,
1410 } = message;
1411
1412 let received_timestamp =
1413 UnixTimestamp::from_datetime(received_at).unwrap_or(UnixTimestamp::now());
1414
1415 let mut buckets = data.into_buckets(received_timestamp);
1416 if buckets.is_empty() {
1417 return;
1418 };
1419 cogs.update(relay_metrics::cogs::BySize(&buckets));
1420
1421 let clock_drift_processor =
1422 ClockDriftProcessor::new(sent_at, received_at).at_least(MINIMUM_CLOCK_DRIFT);
1423
1424 buckets.retain_mut(|bucket| {
1425 if let Err(error) = relay_metrics::normalize_bucket(bucket) {
1426 relay_log::debug!(error = &error as &dyn Error, "dropping bucket {bucket:?}");
1427 return false;
1428 }
1429
1430 if !self::metrics::is_valid_namespace(bucket) {
1431 return false;
1432 }
1433
1434 clock_drift_processor.process_timestamp(&mut bucket.timestamp);
1435
1436 if !matches!(source, BucketSource::Internal) {
1437 bucket.metadata = BucketMetadata::new(received_timestamp);
1438 }
1439
1440 true
1441 });
1442
1443 let project = self.inner.project_cache.get(project_key);
1444
1445 let buckets = match project.state() {
1448 ProjectState::Enabled(project_info) => {
1449 let rate_limits = project.rate_limits().current_limits();
1450 self.check_buckets(project_key, project_info, &rate_limits, buckets)
1451 }
1452 _ => buckets,
1453 };
1454
1455 relay_log::trace!("merging metric buckets into the aggregator");
1456 self.inner
1457 .addrs
1458 .aggregator
1459 .send(MergeBuckets::new(project_key, buckets));
1460 }
1461
1462 fn handle_process_batched_metrics(&self, cogs: &mut Token, message: ProcessBatchedMetrics) {
1463 let ProcessBatchedMetrics {
1464 payload,
1465 source,
1466 received_at,
1467 sent_at,
1468 } = message;
1469
1470 #[derive(serde::Deserialize)]
1471 struct Wrapper {
1472 buckets: HashMap<ProjectKey, Vec<Bucket>>,
1473 }
1474
1475 let buckets = match serde_json::from_slice(&payload) {
1476 Ok(Wrapper { buckets }) => buckets,
1477 Err(error) => {
1478 relay_log::debug!(
1479 error = &error as &dyn Error,
1480 "failed to parse batched metrics",
1481 );
1482 metric!(counter(RelayCounters::MetricBucketsParsingFailed) += 1);
1483 return;
1484 }
1485 };
1486
1487 for (project_key, buckets) in buckets {
1488 self.handle_process_metrics(
1489 cogs,
1490 ProcessMetrics {
1491 data: MetricData::Parsed(buckets),
1492 project_key,
1493 source,
1494 received_at,
1495 sent_at,
1496 },
1497 )
1498 }
1499 }
1500
    /// Submits processed output to the next hop.
    ///
    /// On processing Relays with a configured store forwarder, output is
    /// produced directly to the store. Otherwise it is serialized back into an
    /// envelope and sent to the HTTP upstream.
    fn submit_upstream(
        &self,
        cogs: &mut Token,
        output: Outputs,
        ctx: processing::ForwardContext<'_>,
    ) {
        let _submit = cogs.start_category("submit");

        #[cfg(feature = "processing")]
        if ctx.config.processing_enabled()
            && let Some(store_forwarder) = &self.inner.addrs.store_forwarder
        {
            use crate::processing::StoreHandle;

            let objectstore = self.inner.addrs.objectstore.as_ref();
            let handle = StoreHandle::new(store_forwarder, objectstore, ctx.global_config);

            // Errors are unwrapped into their inner value and discarded here.
            output
                .forward_store(handle, ctx)
                .unwrap_or_else(|err| err.into_inner());

            return;
        }

        match output.serialize_envelope(ctx) {
            Ok(envelope) => {
                let envelope = ManagedEnvelope::from(envelope);
                self.submit_envelope_upstream(envelope);
            }
            Err(_) => relay_log::error!("failed to serialize output to an envelope"),
        };
    }
1536
    /// Serializes and sends an envelope to the upstream HTTP endpoint.
    ///
    /// Empty envelopes are accepted without issuing a request. On processing
    /// Relays this function must not be reached; it logs an error and returns.
    fn submit_envelope_upstream(&self, mut envelope: ManagedEnvelope) {
        if envelope.envelope_mut().is_empty() {
            envelope.accept();
            return;
        }

        if self.inner.config.processing_enabled() {
            // NOTE(review): the envelope is dropped here without emitting an
            // outcome — confirm this is intentional for this error path.
            relay_log::error!(
                "attempt to forward envelope to http upstream when processing is enabled"
            );
            return;
        }

        envelope.envelope_mut().set_sent_at(Utc::now());

        relay_log::trace!("sending envelope to sentry endpoint");
        let http_encoding = self.inner.config.http_encoding();
        // Serialize the envelope and compress it with the configured encoding.
        let result = envelope.envelope().to_vec().and_then(|v| {
            encode_payload(&v.into(), http_encoding).map_err(EnvelopeError::PayloadIoFailed)
        });

        match result {
            Ok(body) => {
                self.inner
                    .addrs
                    .upstream_relay
                    .send(SendRequest(SendEnvelope {
                        envelope,
                        body,
                        http_encoding,
                        project_cache: self.inner.project_cache.clone(),
                    }));
            }
            Err(error) => {
                relay_log::error!(
                    error = &error as &dyn Error,
                    tags.project_key = %envelope.scoping().project_key,
                    "failed to serialize envelope payload"
                );

                envelope.reject(Outcome::Invalid(DiscardReason::Internal));
            }
        }
    }
1593
1594 fn handle_submit_client_reports(&self, message: SubmitClientReports) {
1595 let SubmitClientReports {
1596 client_reports,
1597 scoping,
1598 } = message;
1599
1600 let upstream = self.inner.config.upstream();
1601 let dsn = PartialDsn::outbound(&scoping, upstream);
1602
1603 let mut envelope = Envelope::from_request(None, RequestMeta::outbound(dsn));
1604 for client_report in client_reports {
1605 match client_report.serialize() {
1606 Ok(payload) => {
1607 let mut item = Item::new(ItemType::ClientReport);
1608 item.set_payload(ContentType::Json, payload);
1609 envelope.add_item(item);
1610 }
1611 Err(error) => {
1612 relay_log::error!(
1613 error = &error as &dyn std::error::Error,
1614 "failed to serialize client report"
1615 );
1616 }
1617 }
1618 }
1619
1620 let envelope = ManagedEnvelope::new(envelope, self.inner.addrs.outcome_aggregator.clone());
1621 self.submit_envelope_upstream(envelope);
1622 }
1623
1624 fn check_buckets(
1625 &self,
1626 project_key: ProjectKey,
1627 project_info: &ProjectInfo,
1628 rate_limits: &RateLimits,
1629 buckets: Vec<Bucket>,
1630 ) -> Vec<Bucket> {
1631 let Some(scoping) = project_info.scoping(project_key) else {
1632 relay_log::error!(
1633 tags.project_key = project_key.as_str(),
1634 "there is no scoping: dropping {} buckets",
1635 buckets.len(),
1636 );
1637 return Vec::new();
1638 };
1639
1640 let mut buckets = self::metrics::apply_project_info(
1641 buckets,
1642 &self.inner.metric_outcomes,
1643 project_info,
1644 scoping,
1645 );
1646
1647 let namespaces: BTreeSet<MetricNamespace> = buckets
1648 .iter()
1649 .filter_map(|bucket| bucket.name.try_namespace())
1650 .collect();
1651
1652 for namespace in namespaces {
1653 let limits = rate_limits.check_with_quotas(
1654 project_info.get_quotas(),
1655 scoping.item(DataCategory::MetricBucket),
1656 );
1657
1658 if limits.is_limited() {
1659 let rejected;
1660 (buckets, rejected) = utils::split_off(buckets, |bucket| {
1661 bucket.name.try_namespace() == Some(namespace)
1662 });
1663
1664 let reason_code = limits.longest().and_then(|limit| limit.reason_code.clone());
1665 self.inner.metric_outcomes.track(
1666 scoping,
1667 &rejected,
1668 Outcome::RateLimited(reason_code),
1669 );
1670 }
1671 }
1672
1673 let quotas = project_info.config.quotas.clone();
1674 match MetricsLimiter::create(buckets, quotas, scoping) {
1675 Ok(mut bucket_limiter) => {
1676 bucket_limiter.enforce_limits(rate_limits, &self.inner.metric_outcomes);
1677 bucket_limiter.into_buckets()
1678 }
1679 Err(buckets) => buckets,
1680 }
1681 }
1682
    /// Applies Redis-backed rate limits to metric buckets, per namespace.
    ///
    /// For each namespace present in `buckets`, the combined global and
    /// project quotas are consulted. Limited namespaces have their buckets
    /// split off and tracked as `RateLimited` outcomes, and the new limits are
    /// cached on the project. Remaining buckets go through
    /// [`Self::apply_other_rate_limits`].
    #[cfg(feature = "processing")]
    async fn rate_limit_buckets(
        &self,
        scoping: Scoping,
        project_info: &ProjectInfo,
        mut buckets: Vec<Bucket>,
    ) -> Vec<Bucket> {
        // Without a configured rate limiter there is nothing to enforce.
        let Some(rate_limiter) = &self.inner.rate_limiter else {
            return buckets;
        };

        let global_config = self.inner.global_config.current();
        // Number of buckets per namespace, used as the rate-limit quantity.
        let namespaces = buckets
            .iter()
            .filter_map(|bucket| bucket.name.try_namespace())
            .counts();

        let quotas = CombinedQuotas::new(&global_config, project_info.get_quotas());

        for (namespace, quantity) in namespaces {
            let item_scoping = scoping.metric_bucket(namespace);

            let limits = match rate_limiter
                .is_rate_limited(quotas, item_scoping, quantity, false)
                .await
            {
                Ok(limits) => limits,
                Err(err) => {
                    // A failing limiter must not drop data; stop checking.
                    relay_log::error!(
                        error = &err as &dyn std::error::Error,
                        "failed to check redis rate limits"
                    );
                    break;
                }
            };

            if limits.is_limited() {
                // Remove all buckets of the limited namespace and track them.
                let rejected;
                (buckets, rejected) = utils::split_off(buckets, |bucket| {
                    bucket.name.try_namespace() == Some(namespace)
                });

                let reason_code = limits.longest().and_then(|limit| limit.reason_code.clone());
                self.inner.metric_outcomes.track(
                    scoping,
                    &rejected,
                    Outcome::RateLimited(reason_code),
                );

                // Cache the limits so subsequent data can be limited without
                // consulting Redis again.
                self.inner
                    .project_cache
                    .get(item_scoping.scoping.project_key)
                    .rate_limits()
                    .merge(limits);
            }
        }

        match MetricsLimiter::create(buckets, project_info.config.quotas.clone(), scoping) {
            // `Err` carries the buckets back unchanged when no limiter could
            // be created.
            Err(buckets) => buckets,
            Ok(bucket_limiter) => self.apply_other_rate_limits(bucket_limiter).await,
        }
    }
1745
1746 #[cfg(feature = "processing")]
1748 async fn apply_other_rate_limits(&self, mut bucket_limiter: MetricsLimiter) -> Vec<Bucket> {
1749 relay_log::trace!("handle_rate_limit_buckets");
1750
1751 let scoping = *bucket_limiter.scoping();
1752
1753 if let Some(rate_limiter) = self.inner.rate_limiter.as_ref() {
1754 let global_config = self.inner.global_config.current();
1755 let quotas = CombinedQuotas::new(&global_config, bucket_limiter.quotas());
1756
1757 let over_accept_once = true;
1760 let mut rate_limits = RateLimits::new();
1761
1762 let (category, count) = bucket_limiter.count();
1763
1764 let timer = Instant::now();
1765 let mut is_limited = false;
1766
1767 if let Some(count) = count {
1768 match rate_limiter
1769 .is_rate_limited(quotas, scoping.item(category), count, over_accept_once)
1770 .await
1771 {
1772 Ok(limits) => {
1773 is_limited = limits.is_limited();
1774 rate_limits.merge(limits)
1775 }
1776 Err(e) => {
1777 relay_log::error!(error = &e as &dyn Error, "rate limiting error")
1778 }
1779 }
1780 }
1781
1782 relay_statsd::metric!(
1783 timer(RelayTimers::RateLimitBucketsDuration) = timer.elapsed(),
1784 category = category.name(),
1785 limited = if is_limited { "true" } else { "false" },
1786 count = match count {
1787 None => "none",
1788 Some(0) => "0",
1789 Some(1) => "1",
1790 Some(1..=10) => "10",
1791 Some(1..=25) => "25",
1792 Some(1..=50) => "50",
1793 Some(51..=100) => "100",
1794 Some(101..=500) => "500",
1795 _ => "> 500",
1796 },
1797 );
1798
1799 if rate_limits.is_limited() {
1800 let was_enforced =
1801 bucket_limiter.enforce_limits(&rate_limits, &self.inner.metric_outcomes);
1802
1803 if was_enforced {
1804 self.inner
1806 .project_cache
1807 .get(scoping.project_key)
1808 .rate_limits()
1809 .merge(rate_limits);
1810 }
1811 }
1812 }
1813
1814 bucket_limiter.into_buckets()
1815 }
1816
1817 #[cfg(feature = "processing")]
1823 async fn encode_metrics_processing(
1824 &self,
1825 message: FlushBuckets,
1826 store_forwarder: &Addr<Store>,
1827 ) {
1828 use crate::constants::DEFAULT_EVENT_RETENTION;
1829 use crate::services::store::StoreMetrics;
1830
1831 for ProjectBuckets {
1832 buckets,
1833 scoping,
1834 project_info,
1835 ..
1836 } in message.buckets.into_values()
1837 {
1838 let buckets = self
1839 .rate_limit_buckets(scoping, &project_info, buckets)
1840 .await;
1841
1842 if buckets.is_empty() {
1843 continue;
1844 }
1845
1846 let retention = project_info
1847 .config
1848 .event_retention
1849 .unwrap_or(DEFAULT_EVENT_RETENTION);
1850
1851 store_forwarder.send(StoreMetrics {
1854 buckets,
1855 scoping,
1856 retention,
1857 });
1858 }
1859 }
1860
1861 fn encode_metrics_envelope(&self, message: FlushBuckets) {
1872 let FlushBuckets {
1873 partition_key,
1874 buckets,
1875 } = message;
1876
1877 let batch_size = self.inner.config.metrics_max_batch_size_bytes();
1878 let upstream = self.inner.config.upstream();
1879
1880 for ProjectBuckets {
1881 buckets, scoping, ..
1882 } in buckets.values()
1883 {
1884 let dsn = PartialDsn::outbound(scoping, upstream);
1885
1886 relay_statsd::metric!(
1887 distribution(RelayDistributions::PartitionKeys) = u64::from(partition_key)
1888 );
1889
1890 let mut num_batches = 0;
1891 for batch in BucketsView::from(buckets).by_size(batch_size) {
1892 let mut envelope = Envelope::from_request(None, RequestMeta::outbound(dsn.clone()));
1893
1894 let mut item = Item::new(ItemType::MetricBuckets);
1895 item.set_source_quantities(crate::metrics::extract_quantities(batch));
1896 item.set_payload(ContentType::Json, serde_json::to_vec(&buckets).unwrap());
1897 envelope.add_item(item);
1898
1899 let mut envelope =
1900 ManagedEnvelope::new(envelope, self.inner.addrs.outcome_aggregator.clone());
1901 envelope
1902 .set_partition_key(Some(partition_key))
1903 .scope(*scoping);
1904
1905 relay_statsd::metric!(
1906 distribution(RelayDistributions::BucketsPerBatch) = batch.len() as u64
1907 );
1908
1909 self.submit_envelope_upstream(envelope);
1910 num_batches += 1;
1911 }
1912
1913 relay_statsd::metric!(
1914 distribution(RelayDistributions::BatchesPerPartition) = num_batches
1915 );
1916 }
1917 }
1918
1919 fn send_global_partition(&self, partition_key: u32, partition: &mut Partition<'_>) {
1921 if partition.is_empty() {
1922 return;
1923 }
1924
1925 let (unencoded, project_info) = partition.take();
1926 let http_encoding = self.inner.config.http_encoding();
1927 let encoded = match encode_payload(&unencoded, http_encoding) {
1928 Ok(payload) => payload,
1929 Err(error) => {
1930 let error = &error as &dyn std::error::Error;
1931 relay_log::error!(error, "failed to encode metrics payload");
1932 return;
1933 }
1934 };
1935
1936 let request = SendMetricsRequest {
1937 partition_key: partition_key.to_string(),
1938 unencoded,
1939 encoded,
1940 project_info,
1941 http_encoding,
1942 metric_outcomes: self.inner.metric_outcomes.clone(),
1943 };
1944
1945 self.inner.addrs.upstream_relay.send(SendRequest(request));
1946 }
1947
    /// Encodes all buckets of a flush into globally partitioned payloads.
    ///
    /// Buckets from all projects are packed into a single [`Partition`] of
    /// bounded size. Whenever a bucket does not fit, the full partition is
    /// flushed and the leftover part of the bucket is retried in a fresh one.
    fn encode_metrics_global(&self, message: FlushBuckets) {
        let FlushBuckets {
            partition_key,
            buckets,
        } = message;

        let batch_size = self.inner.config.metrics_max_batch_size_bytes();
        let mut partition = Partition::new(batch_size);
        let mut partition_splits = 0;

        for ProjectBuckets {
            buckets, scoping, ..
        } in buckets.values()
        {
            for bucket in buckets {
                let mut remaining = Some(BucketView::new(bucket));

                // `insert` returns the part of the bucket that did not fit;
                // flush the full partition and keep inserting the remainder.
                while let Some(bucket) = remaining.take() {
                    if let Some(next) = partition.insert(bucket, *scoping) {
                        self.send_global_partition(partition_key, &mut partition);
                        remaining = Some(next);
                        partition_splits += 1;
                    }
                }
            }
        }

        if partition_splits > 0 {
            metric!(distribution(RelayDistributions::PartitionSplits) = partition_splits);
        }

        // Flush whatever is left in the final, partially filled partition.
        self.send_global_partition(partition_key, &mut partition);
    }
1994
    /// Handles a bucket flush by checking limits and encoding for the next hop.
    ///
    /// Buckets are first checked against cached rate limits per project. On
    /// processing Relays they are forwarded to the store; otherwise they are
    /// encoded either globally or per project envelope, depending on the
    /// `http_global_metrics` setting.
    async fn handle_flush_buckets(&self, mut message: FlushBuckets) {
        for (project_key, pb) in message.buckets.iter_mut() {
            // Take the buckets out, run the checks, and put the result back.
            let buckets = std::mem::take(&mut pb.buckets);
            pb.buckets =
                self.check_buckets(*project_key, &pb.project_info, &pb.rate_limits, buckets);
        }

        #[cfg(feature = "processing")]
        if self.inner.config.processing_enabled()
            && let Some(ref store_forwarder) = self.inner.addrs.store_forwarder
        {
            return self
                .encode_metrics_processing(message, store_forwarder)
                .await;
        }

        if self.inner.config.http_global_metrics() {
            self.encode_metrics_global(message)
        } else {
            self.encode_metrics_envelope(message)
        }
    }
2017
2018 #[cfg(all(test, feature = "processing"))]
2019 fn redis_rate_limiter_enabled(&self) -> bool {
2020 self.inner.rate_limiter.is_some()
2021 }
2022
    /// Dispatches an [`EnvelopeProcessor`] message to its handler.
    ///
    /// Each message is timed per variant and measured with a COGS token
    /// derived from [`Self::feature_weights`].
    async fn handle_message(&self, message: EnvelopeProcessor) {
        let ty = message.variant();
        let feature_weights = self.feature_weights(&message);

        metric!(timer(RelayTimers::ProcessMessageDuration), message = ty, {
            let mut cogs = self.inner.cogs.timed(ResourceId::Relay, feature_weights);

            match message {
                EnvelopeProcessor::ProcessEnvelope(m) => {
                    self.handle_process_envelope(&mut cogs, *m).await
                }
                EnvelopeProcessor::ProcessProjectMetrics(m) => {
                    self.handle_process_metrics(&mut cogs, *m)
                }
                EnvelopeProcessor::ProcessBatchedMetrics(m) => {
                    self.handle_process_batched_metrics(&mut cogs, *m)
                }
                EnvelopeProcessor::FlushBuckets(m) => self.handle_flush_buckets(*m).await,
                EnvelopeProcessor::SubmitClientReports(m) => self.handle_submit_client_reports(*m),
            }
        });
    }
2045
2046 fn feature_weights(&self, message: &EnvelopeProcessor) -> FeatureWeights {
2047 match message {
2048 EnvelopeProcessor::ProcessEnvelope(_) => AppFeature::Unattributed.into(),
2050 EnvelopeProcessor::ProcessProjectMetrics(_) => AppFeature::Unattributed.into(),
2051 EnvelopeProcessor::ProcessBatchedMetrics(_) => AppFeature::Unattributed.into(),
2052 EnvelopeProcessor::FlushBuckets(v) => v
2053 .buckets
2054 .values()
2055 .map(|s| {
2056 if self.inner.config.processing_enabled() {
2057 relay_metrics::cogs::ByCount(&s.buckets).into()
2060 } else {
2061 relay_metrics::cogs::BySize(&s.buckets).into()
2062 }
2063 })
2064 .fold(FeatureWeights::none(), FeatureWeights::merge),
2065 EnvelopeProcessor::SubmitClientReports(_) => AppFeature::ClientReports.into(),
2066 }
2067 }
2068}
2069
impl Service for EnvelopeProcessorService {
    type Interface = EnvelopeProcessor;

    /// Receives processor messages and executes each one on the worker pool.
    ///
    /// Awaiting `spawn_async` blocks receiving further messages until the pool
    /// accepts the task, so the pool's admission controls the intake rate.
    async fn run(self, mut rx: relay_system::Receiver<Self::Interface>) {
        while let Some(message) = rx.recv().await {
            let service = self.clone();
            self.inner
                .pool
                .spawn_async(
                    async move {
                        service.handle_message(message).await;
                    }
                    .boxed(),
                )
                .await;
        }
    }
}
2088
2089pub fn encode_payload(body: &Bytes, http_encoding: HttpEncoding) -> Result<Bytes, std::io::Error> {
2090 let envelope_body: Vec<u8> = match http_encoding {
2091 HttpEncoding::Identity => return Ok(body.clone()),
2092 HttpEncoding::Deflate => {
2093 let mut encoder = ZlibEncoder::new(Vec::new(), Compression::default());
2094 encoder.write_all(body.as_ref())?;
2095 encoder.finish()?
2096 }
2097 HttpEncoding::Gzip => {
2098 let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
2099 encoder.write_all(body.as_ref())?;
2100 encoder.finish()?
2101 }
2102 HttpEncoding::Br => {
2103 let mut encoder = BrotliEncoder::new(Vec::new(), 0, 5, 22);
2105 encoder.write_all(body.as_ref())?;
2106 encoder.into_inner()
2107 }
2108 HttpEncoding::Zstd => {
2109 let mut encoder = ZstdEncoder::new(Vec::new(), 1)?;
2112 encoder.write_all(body.as_ref())?;
2113 encoder.finish()?
2114 }
2115 };
2116
2117 Ok(envelope_body.into())
2118}
2119
/// An upstream request that submits a serialized envelope via HTTP.
#[derive(Debug)]
pub struct SendEnvelope {
    /// The envelope being sent; used for outcomes and scoping on completion.
    pub envelope: ManagedEnvelope,
    /// The serialized and already-encoded envelope body.
    pub body: Bytes,
    /// The content encoding applied to `body`.
    pub http_encoding: HttpEncoding,
    /// Handle to the project cache, used to merge upstream rate limits.
    pub project_cache: ProjectCacheHandle,
}
2128
impl UpstreamRequest for SendEnvelope {
    fn method(&self) -> reqwest::Method {
        reqwest::Method::POST
    }

    fn path(&self) -> Cow<'_, str> {
        format!("/api/{}/envelope/", self.envelope.scoping().project_id).into()
    }

    fn route(&self) -> &'static str {
        "envelope"
    }

    /// Builds the HTTP request with envelope headers and the encoded body.
    fn build(&mut self, builder: &mut http::RequestBuilder) -> Result<(), http::HttpError> {
        let envelope_body = self.body.clone();
        metric!(
            distribution(RelayDistributions::UpstreamEnvelopeBodySize) = envelope_body.len() as u64
        );

        let meta = &self.envelope.meta();
        let shard = self.envelope.partition_key().map(|p| p.to_string());
        builder
            .content_encoding(self.http_encoding)
            .header_opt("Origin", meta.origin().map(|url| url.as_str()))
            .header_opt("User-Agent", meta.user_agent())
            .header("X-Sentry-Auth", meta.auth_header())
            .header("X-Forwarded-For", meta.forwarded_for())
            .header("Content-Type", envelope::CONTENT_TYPE)
            .header_opt("X-Sentry-Relay-Shard", shard)
            .body(envelope_body);

        Ok(())
    }

    fn sign(&mut self) -> Option<Sign> {
        Some(Sign::Optional(SignatureType::RequestSign))
    }

    /// Handles the upstream response.
    ///
    /// Responses received by the upstream accept the envelope, and rate
    /// limits are merged into the project cache. All other errors reject the
    /// envelope with an internal outcome.
    fn respond(
        self: Box<Self>,
        result: Result<http::Response, UpstreamRequestError>,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> {
        Box::pin(async move {
            let result = match result {
                Ok(mut response) => response.consume().await.map_err(UpstreamRequestError::Http),
                Err(error) => Err(error),
            };

            match result {
                Ok(()) => self.envelope.accept(),
                // The upstream received the request; treat the envelope as
                // delivered even if the response signals an error.
                Err(error) if error.is_received() => {
                    let scoping = self.envelope.scoping();
                    self.envelope.accept();

                    if let UpstreamRequestError::RateLimited(limits) = error {
                        self.project_cache
                            .get(scoping.project_key)
                            .rate_limits()
                            .merge(limits.scope(&scoping));
                    }
                }
                Err(error) => {
                    let mut envelope = self.envelope;
                    envelope.reject(Outcome::Invalid(DiscardReason::Internal));
                    relay_log::error!(
                        error = &error as &dyn Error,
                        tags.project_key = %envelope.scoping().project_key,
                        "error sending envelope"
                    );
                }
            }
        })
    }
}
2205
/// Accumulates metric bucket views for all projects of a single partition,
/// bounded by a maximum serialized payload size.
#[derive(Debug)]
struct Partition<'a> {
    /// Maximum payload size of a single partition batch.
    max_size: usize,
    /// Remaining byte budget before the partition is considered full.
    remaining: usize,
    /// Collected bucket views, keyed by project.
    views: HashMap<ProjectKey, Vec<BucketView<'a>>>,
    /// Scoping per project, needed to create outcomes on failed sends.
    project_info: HashMap<ProjectKey, Scoping>,
}
2219
2220impl<'a> Partition<'a> {
2221 pub fn new(size: usize) -> Self {
2223 Self {
2224 max_size: size,
2225 remaining: size,
2226 views: HashMap::new(),
2227 project_info: HashMap::new(),
2228 }
2229 }
2230
2231 pub fn insert(&mut self, bucket: BucketView<'a>, scoping: Scoping) -> Option<BucketView<'a>> {
2242 let (current, next) = bucket.split(self.remaining, Some(self.max_size));
2243
2244 if let Some(current) = current {
2245 self.remaining = self.remaining.saturating_sub(current.estimated_size());
2246 self.views
2247 .entry(scoping.project_key)
2248 .or_default()
2249 .push(current);
2250
2251 self.project_info
2252 .entry(scoping.project_key)
2253 .or_insert(scoping);
2254 }
2255
2256 next
2257 }
2258
2259 fn is_empty(&self) -> bool {
2261 self.views.is_empty()
2262 }
2263
2264 fn take(&mut self) -> (Bytes, HashMap<ProjectKey, Scoping>) {
2268 #[derive(serde::Serialize)]
2269 struct Wrapper<'a> {
2270 buckets: &'a HashMap<ProjectKey, Vec<BucketView<'a>>>,
2271 }
2272
2273 let buckets = &self.views;
2274 let payload = serde_json::to_vec(&Wrapper { buckets }).unwrap().into();
2275
2276 let scopings = self.project_info.clone();
2277 self.project_info.clear();
2278
2279 self.views.clear();
2280 self.remaining = self.max_size;
2281
2282 (payload, scopings)
2283 }
2284}
2285
/// An upstream request carrying a partition of globally encoded metrics.
#[derive(Debug)]
struct SendMetricsRequest {
    /// Partition key, sent as the `X-Sentry-Relay-Shard` header.
    partition_key: String,
    /// Uncompressed payload; used to sign the request and to recover buckets
    /// for outcomes when the request fails.
    unencoded: Bytes,
    /// Payload compressed with `http_encoding`, sent as the request body.
    encoded: Bytes,
    /// Scoping per project key, required to track outcomes on failure.
    project_info: HashMap<ProjectKey, Scoping>,
    /// Content encoding applied to `encoded`.
    http_encoding: HttpEncoding,
    /// Sink for metric outcomes of failed transmissions.
    metric_outcomes: MetricOutcomes,
}
2306
2307impl SendMetricsRequest {
2308 fn create_error_outcomes(self) {
2309 #[derive(serde::Deserialize)]
2310 struct Wrapper {
2311 buckets: HashMap<ProjectKey, Vec<MinimalTrackableBucket>>,
2312 }
2313
2314 let buckets = match serde_json::from_slice(&self.unencoded) {
2315 Ok(Wrapper { buckets }) => buckets,
2316 Err(err) => {
2317 relay_log::error!(
2318 error = &err as &dyn std::error::Error,
2319 "failed to parse buckets from failed transmission"
2320 );
2321 return;
2322 }
2323 };
2324
2325 for (key, buckets) in buckets {
2326 let Some(&scoping) = self.project_info.get(&key) else {
2327 relay_log::error!("missing scoping for project key");
2328 continue;
2329 };
2330
2331 self.metric_outcomes.track(
2332 scoping,
2333 &buckets,
2334 Outcome::Invalid(DiscardReason::Internal),
2335 );
2336 }
2337 }
2338}
2339
impl UpstreamRequest for SendMetricsRequest {
    fn set_relay_id(&self) -> bool {
        true
    }

    /// Signs the request over the uncompressed payload.
    fn sign(&mut self) -> Option<Sign> {
        Some(Sign::Required(SignatureType::Body(self.unencoded.clone())))
    }

    fn method(&self) -> reqwest::Method {
        reqwest::Method::POST
    }

    fn path(&self) -> Cow<'_, str> {
        "/api/0/relays/metrics/".into()
    }

    fn route(&self) -> &'static str {
        "global_metrics"
    }

    /// Builds the HTTP request with the shard header and the encoded body.
    fn build(&mut self, builder: &mut http::RequestBuilder) -> Result<(), http::HttpError> {
        metric!(
            distribution(RelayDistributions::UpstreamMetricsBodySize) = self.encoded.len() as u64
        );

        builder
            .content_encoding(self.http_encoding)
            .header("X-Sentry-Relay-Shard", self.partition_key.as_bytes())
            .header(header::CONTENT_TYPE, b"application/json")
            .body(self.encoded.clone());

        Ok(())
    }

    /// Handles the upstream response.
    ///
    /// Failures that were not received by the upstream produce `Internal`
    /// outcomes for all contained buckets.
    fn respond(
        self: Box<Self>,
        result: Result<http::Response, UpstreamRequestError>,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> {
        Box::pin(async {
            match result {
                Ok(mut response) => {
                    response.consume().await.ok();
                }
                Err(error) => {
                    relay_log::error!(error = &error as &dyn Error, "Failed to send metrics batch");

                    // If the upstream received the request, the data counts
                    // as delivered; no outcomes are generated.
                    if error.is_received() {
                        return;
                    }

                    self.create_error_outcomes()
                }
            }
        })
    }
}
2399
/// Borrowed view over global and project quotas, iterable as one sequence.
#[derive(Copy, Clone, Debug)]
#[cfg(feature = "processing")]
struct CombinedQuotas<'a> {
    /// Quotas taken from the global config.
    global_quotas: &'a [Quota],
    /// Quotas taken from the project config.
    project_quotas: &'a [Quota],
}
2407
#[cfg(feature = "processing")]
impl<'a> CombinedQuotas<'a> {
    /// Creates a combined view over the global config's quotas and the given
    /// project quotas.
    pub fn new(global_config: &'a GlobalConfig, project_quotas: &'a [Quota]) -> Self {
        Self {
            global_quotas: &global_config.quotas,
            project_quotas,
        }
    }
}
2418
#[cfg(feature = "processing")]
impl<'a> IntoIterator for CombinedQuotas<'a> {
    type Item = &'a Quota;
    type IntoIter = std::iter::Chain<std::slice::Iter<'a, Quota>, std::slice::Iter<'a, Quota>>;

    /// Iterates all quotas, yielding global quotas before project quotas.
    fn into_iter(self) -> Self::IntoIter {
        let Self {
            global_quotas,
            project_quotas,
        } = self;
        global_quotas.iter().chain(project_quotas.iter())
    }
}
2428
2429#[cfg(test)]
2430mod tests {
2431 use insta::assert_debug_snapshot;
2432 use relay_common::glob2::LazyGlob;
2433 use relay_dynamic_config::ProjectConfig;
2434 use relay_event_normalization::{
2435 NormalizationConfig, RedactionRule, TransactionNameConfig, TransactionNameRule,
2436 };
2437 use relay_event_schema::protocol::TransactionSource;
2438 use relay_pii::DataScrubbingConfig;
2439 use similar_asserts::assert_eq;
2440
2441 use crate::testutils::{create_test_processor, create_test_processor_with_addrs};
2442
2443 #[cfg(feature = "processing")]
2444 use {
2445 relay_metrics::BucketValue,
2446 relay_quotas::{QuotaScope, ReasonCode},
2447 relay_test::mock_service,
2448 };
2449
2450 use super::*;
2451
    /// Creates a deny-all metric bucket quota (limit `0`) with the given id.
    ///
    /// The quota is organization-scoped and not bound to a namespace, so it
    /// matches all metric buckets of the organization in tests.
    #[cfg(feature = "processing")]
    fn mock_quota(id: &str) -> Quota {
        Quota {
            id: Some(id.into()),
            categories: [DataCategory::MetricBucket].into(),
            scope: QuotaScope::Organization,
            scope_id: None,
            limit: Some(0),
            window: None,
            reason_code: None,
            namespace: None,
        }
    }
2465
2466 #[cfg(feature = "processing")]
2467 #[test]
2468 fn test_dynamic_quotas() {
2469 let global_config = relay_dynamic_config::GlobalConfig {
2470 quotas: vec![mock_quota("foo"), mock_quota("bar")],
2471 ..Default::default()
2472 };
2473
2474 let project_quotas = vec![mock_quota("baz"), mock_quota("qux")];
2475
2476 let dynamic_quotas = CombinedQuotas::new(&global_config, &project_quotas);
2477
2478 let quota_ids = dynamic_quotas.into_iter().filter_map(|q| q.id.as_deref());
2479 assert!(quota_ids.eq(["foo", "bar", "baz", "qux"]));
2480 }
2481
    /// Rate limits must be applied per organization within a single flush
    /// batch: buckets of the rate-limited org are dropped while buckets of
    /// other orgs in the same [`FlushBuckets`] message still reach the store.
    ///
    /// Requires a running Redis (`RELAY_REDIS_URL`), since processing-mode
    /// rate limiting is backed by Redis.
    #[cfg(feature = "processing")]
    #[tokio::test]
    async fn test_ratelimit_per_batch() {
        use relay_base_schema::organization::OrganizationId;
        use relay_protocol::FiniteF64;

        // Org 1 is targeted by the quota below; org 2 is not.
        let rate_limited_org = Scoping {
            organization_id: OrganizationId::new(1),
            project_id: ProjectId::new(21),
            project_key: ProjectKey::parse("00000000000000000000000000000000").unwrap(),
            key_id: Some(17),
        };

        let not_rate_limited_org = Scoping {
            organization_id: OrganizationId::new(2),
            project_id: ProjectId::new(21),
            project_key: ProjectKey::parse("11111111111111111111111111111111").unwrap(),
            key_id: Some(17),
        };

        let message = {
            let project_info = {
                // Deny-all quota (limit 0) scoped to the rate-limited org only.
                let quota = Quota {
                    id: Some("testing".into()),
                    categories: [DataCategory::MetricBucket].into(),
                    scope: relay_quotas::QuotaScope::Organization,
                    scope_id: Some(rate_limited_org.organization_id.to_string().into()),
                    limit: Some(0),
                    window: None,
                    reason_code: Some(ReasonCode::new("test")),
                    namespace: None,
                };

                let mut config = ProjectConfig::default();
                config.quotas.push(quota);

                Arc::new(ProjectInfo {
                    config,
                    ..Default::default()
                })
            };

            // One counter bucket per org; both share the same project config
            // (and thus the same quota), but only org 1 matches its scope_id.
            let project_metrics = |scoping| ProjectBuckets {
                buckets: vec![Bucket {
                    name: "d:spans/bar".into(),
                    value: BucketValue::Counter(FiniteF64::new(1.0).unwrap()),
                    timestamp: UnixTimestamp::now(),
                    tags: Default::default(),
                    width: 10,
                    metadata: BucketMetadata::default(),
                }],
                rate_limits: Default::default(),
                project_info: project_info.clone(),
                scoping,
            };

            let buckets = hashbrown::HashMap::from([
                (
                    rate_limited_org.project_key,
                    project_metrics(rate_limited_org),
                ),
                (
                    not_rate_limited_org.project_key,
                    project_metrics(not_rate_limited_org),
                ),
            ]);

            FlushBuckets {
                partition_key: 0,
                buckets,
            }
        };

        // Sanity check: both orgs are present in the batch.
        assert_eq!(message.buckets.keys().count(), 2);

        // Processing mode with Redis enables the Redis-backed rate limiter.
        let config = {
            let config_json = serde_json::json!({
                "processing": {
                    "enabled": true,
                    "kafka_config": [],
                    "redis": {
                        "server": std::env::var("RELAY_REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_owned()),
                    }
                }
            });
            Config::from_json_value(config_json).unwrap()
        };

        // Mock store forwarder collecting the org id of every metrics message
        // it receives; any envelope message fails the test.
        let (store, handle) = {
            let f = |org_ids: &mut Vec<OrganizationId>, msg: Store| {
                let org_id = match msg {
                    Store::Metrics(x) => x.scoping.organization_id,
                    _ => panic!("received envelope when expecting only metrics"),
                };
                org_ids.push(org_id);
            };

            mock_service("store_forwarder", vec![], f)
        };

        let processor = create_test_processor(config).await;
        assert!(processor.redis_rate_limiter_enabled());

        processor.encode_metrics_processing(message, &store).await;

        // Dropping the store address shuts down the mock service so the
        // handle resolves with the collected org ids.
        drop(store);
        let orgs_not_ratelimited = handle.await.unwrap();

        // Only the unlimited org's metrics reached the store.
        assert_eq!(
            orgs_not_ratelimited,
            vec![not_rate_limited_org.organization_id]
        );
    }
2598
    /// Browser name/version must be extracted into the `browser` context from
    /// the User-Agent header *before* PII scrubbing masks the version number
    /// (the `@ip:mask` rule treats the dotted version as IP-like data).
    #[tokio::test]
    async fn test_browser_version_extraction_with_pii_like_data() {
        let processor = create_test_processor(Default::default()).await;
        let outcome_aggregator = Addr::dummy();
        let event_id = EventId::new();

        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
            .parse()
            .unwrap();

        let request_meta = RequestMeta::new(dsn);
        let mut envelope = Envelope::from_request(Some(event_id), request_meta);

        // Event whose only content is a Chrome User-Agent request header.
        envelope.add_item({
            let mut item = Item::new(ItemType::Event);
            item.set_payload(
                ContentType::Json,
                r#"
                {
                    "request": {
                        "headers": [
                            ["User-Agent", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36"]
                        ]
                    }
                }
                "#,
            );
            item
        });

        // Enable default data scrubbing including IP address scrubbing.
        let mut datascrubbing_settings = DataScrubbingConfig::default();
        datascrubbing_settings.scrub_data = true;
        datascrubbing_settings.scrub_defaults = true;
        datascrubbing_settings.scrub_ip_addresses = true;

        // Masks anything IP-like anywhere in the event, which also hits the
        // dotted browser version inside the User-Agent string.
        let pii_config = serde_json::from_str(r#"{"applications": {"**": ["@ip:mask"]}}"#).unwrap();

        let config = ProjectConfig {
            datascrubbing_settings,
            pii_config: Some(pii_config),
            ..Default::default()
        };

        let project_info = ProjectInfo {
            config,
            ..Default::default()
        };

        let mut envelopes = ProcessingGroup::split_envelope(*envelope, &Default::default());
        assert_eq!(envelopes.len(), 1);

        let (group, envelope) = envelopes.pop().unwrap();
        let envelope = ManagedEnvelope::new(envelope, outcome_aggregator);

        let message = ProcessEnvelopeGrouped {
            group,
            envelope,
            ctx: processing::Context {
                project_info: &project_info,
                ..processing::Context::for_test()
            },
        };

        let Ok(Some((output, ctx))) = processor.process(message).await else {
            panic!();
        };
        let new_envelope = output.serialize_envelope(ctx).unwrap().accept(|f| f);

        // Re-parse the processed event from the resulting envelope.
        let event_item = new_envelope.items().last().unwrap();
        let annotated_event: Annotated<Event> =
            Annotated::from_json_bytes(&event_item.payload()).unwrap();
        let event = annotated_event.into_value().unwrap();
        let headers = event
            .request
            .into_value()
            .unwrap()
            .headers
            .into_value()
            .unwrap();

        // The stored header has its version masked by PII scrubbing ...
        assert_eq!(
            Some(
                "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/********* Safari/537.36"
            ),
            headers.get_header("User-Agent")
        );
        // ... but the browser context still carries the real version, proving
        // extraction ran before scrubbing.
        let contexts = event.contexts.into_value().unwrap();
        let browser = contexts.0.get("browser").unwrap();
        assert_eq!(
            r#"{"browser":"Chrome 103.0.0","name":"Chrome","version":"103.0.0","type":"browser"}"#,
            browser.to_json().unwrap()
        );
    }
2696
    /// Processing an error event with a dynamic sampling context (DSC) on the
    /// envelope must materialize the DSC into the event's `_dsc` field, with
    /// the trace id normalized (dashes stripped).
    #[tokio::test]
    #[cfg(feature = "processing")]
    async fn test_materialize_dsc() {
        use crate::services::projects::project::PublicKeyConfig;

        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
            .parse()
            .unwrap();
        let request_meta = RequestMeta::new(dsn);
        let mut envelope = Envelope::from_request(None, request_meta);

        // DSC with a dashed trace id; the snapshot below expects it
        // normalized to the compact form.
        let dsc = r#"{
            "trace_id": "00000000-0000-0000-0000-000000000000",
            "public_key": "e12d836b15bb49d7bbf99e64295d995b",
            "sample_rate": "0.2"
        }"#;
        envelope.set_dsc(serde_json::from_str(dsc).unwrap());

        // Minimal (empty) error event payload.
        let mut item = Item::new(ItemType::Event);
        item.set_payload(ContentType::Json, r#"{}"#);
        envelope.add_item(item);

        let outcome_aggregator = Addr::dummy();
        let managed_envelope = ManagedEnvelope::new(envelope, outcome_aggregator);

        // The project must know the DSC's public key for it to be used.
        let mut project_info = ProjectInfo::default();
        project_info.public_keys.push(PublicKeyConfig {
            public_key: ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(),
            numeric_id: Some(1),
        });

        let config = serde_json::json!({
            "processing": {
                "enabled": true,
                "kafka_config": [],
            }
        });

        let message = ProcessEnvelopeGrouped {
            group: ProcessingGroup::Error,
            envelope: managed_envelope,
            ctx: processing::Context {
                config: &Config::from_json_value(config.clone()).unwrap(),
                project_info: &project_info,
                sampling_project_info: Some(&project_info),
                ..processing::Context::for_test()
            },
        };

        let processor = create_test_processor(Config::from_json_value(config).unwrap()).await;
        let Ok(Some((output, ctx))) = processor.process(message).await else {
            panic!();
        };
        let envelope = output.serialize_envelope(ctx).unwrap();
        let event = envelope
            .get_item_by(|item| item.ty() == &ItemType::Event)
            .unwrap();

        // The DSC is materialized on the event with a normalized trace id.
        let event = Annotated::<Event>::from_json_bytes(&event.payload()).unwrap();
        insta::assert_debug_snapshot!(event.value().unwrap()._dsc, @r###"
        Object(
            {
                "environment": ~,
                "public_key": String(
                    "e12d836b15bb49d7bbf99e64295d995b",
                ),
                "release": ~,
                "replay_id": ~,
                "sample_rate": String(
                    "0.2",
                ),
                "trace_id": String(
                    "00000000000000000000000000000000",
                ),
                "transaction": ~,
            },
        )
        "###);
    }
2776
    /// Normalizes a test transaction event with the given transaction name and
    /// source, and returns the statsd metrics captured during normalization.
    ///
    /// A single rename rule (`/foo/*/**` replaced with `*`) is configured, so
    /// callers can exercise the `changes:{none,rule,pattern,both}` tag of the
    /// `event.transaction_name_changes` counter.
    fn capture_test_event(transaction_name: &str, source: TransactionSource) -> Vec<String> {
        let mut event = Annotated::<Event>::from_json(
            r#"
            {
                "type": "transaction",
                "transaction": "/foo/",
                "timestamp": 946684810.0,
                "start_timestamp": 946684800.0,
                "contexts": {
                    "trace": {
                        "trace_id": "4c79f60c11214eb38604f4ae0781bfb2",
                        "span_id": "fa90fdead5f74053",
                        "op": "http.server",
                        "type": "trace"
                    }
                },
                "transaction_info": {
                    "source": "url"
                }
            }
            "#,
        )
        .unwrap();
        // Override the transaction name and source with the caller's values.
        let e = event.value_mut().as_mut().unwrap();
        e.transaction.set_value(Some(transaction_name.into()));

        e.transaction_info
            .value_mut()
            .as_mut()
            .unwrap()
            .source
            .set_value(Some(source));

        relay_statsd::with_capturing_test_client(|| {
            utils::log_transaction_name_metrics(&mut event, |event| {
                let config = NormalizationConfig {
                    transaction_name_config: TransactionNameConfig {
                        rules: &[TransactionNameRule {
                            pattern: LazyGlob::new("/foo/*/**".to_owned()),
                            // Never-expiring rule so it always applies.
                            expiry: DateTime::<Utc>::MAX_UTC,
                            redaction: RedactionRule::Replace {
                                substitution: "*".to_owned(),
                            },
                        }],
                    },
                    ..Default::default()
                };
                relay_event_normalization::normalize_event(event, &config)
            });
        })
    }
2828
    /// Neither the rename rule nor the sanitization pattern alters the name:
    /// the counter reports `changes:none` with a sanitized source.
    #[test]
    fn test_log_transaction_metrics_none() {
        let captures = capture_test_event("/nothing", TransactionSource::Url);
        insta::assert_debug_snapshot!(captures, @r###"
        [
            "event.transaction_name_changes:1|c|#source_in:url,changes:none,source_out:sanitized,is_404:false",
        ]
        "###);
    }
2838
    /// Only the `/foo/*/**` rename rule matches: the counter reports
    /// `changes:rule`.
    #[test]
    fn test_log_transaction_metrics_rule() {
        let captures = capture_test_event("/foo/john/denver", TransactionSource::Url);
        insta::assert_debug_snapshot!(captures, @r###"
        [
            "event.transaction_name_changes:1|c|#source_in:url,changes:rule,source_out:sanitized,is_404:false",
        ]
        "###);
    }
2848
    /// Only built-in pattern sanitization changes the name (the numeric path
    /// segment): the counter reports `changes:pattern`.
    #[test]
    fn test_log_transaction_metrics_pattern() {
        let captures = capture_test_event("/something/12345", TransactionSource::Url);
        insta::assert_debug_snapshot!(captures, @r###"
        [
            "event.transaction_name_changes:1|c|#source_in:url,changes:pattern,source_out:sanitized,is_404:false",
        ]
        "###);
    }
2858
    /// Both the rename rule and pattern sanitization apply: the counter
    /// reports `changes:both`.
    #[test]
    fn test_log_transaction_metrics_both() {
        let captures = capture_test_event("/foo/john/12345", TransactionSource::Url);
        insta::assert_debug_snapshot!(captures, @r###"
        [
            "event.transaction_name_changes:1|c|#source_in:url,changes:both,source_out:sanitized,is_404:false",
        ]
        "###);
    }
2868
    /// Non-URL sources (here `route`) are not sanitized at all: the counter
    /// reports `changes:none` and the source stays `route`.
    #[test]
    fn test_log_transaction_metrics_no_match() {
        let captures = capture_test_event("/foo/john/12345", TransactionSource::Route);
        insta::assert_debug_snapshot!(captures, @r###"
        [
            "event.transaction_name_changes:1|c|#source_in:route,changes:none,source_out:route,is_404:false",
        ]
        "###);
    }
2878
    /// Buckets received from external sources must get `received_at` stamped
    /// with the request's receive time, while buckets from internal Relays
    /// keep it unset (the originating Relay already stamped them).
    #[tokio::test]
    async fn test_process_metrics_bucket_metadata() {
        let mut token = Cogs::noop().timed(ResourceId::Relay, AppFeature::Unattributed);
        let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
        let received_at = Utc::now();
        let config = Config::default();

        // Capture what the processor forwards to the aggregator.
        let (aggregator, mut aggregator_rx) = Addr::custom();
        let processor = create_test_processor_with_addrs(
            config,
            Addrs {
                aggregator,
                ..Default::default()
            },
        )
        .await;

        let mut item = Item::new(ItemType::Statsd);
        item.set_payload(ContentType::Text, "spans/foo:3182887624:4267882815|s");
        // Same payload, differing only in the bucket source.
        for (source, expected_received_at) in [
            (
                BucketSource::External,
                Some(UnixTimestamp::from_datetime(received_at).unwrap()),
            ),
            (BucketSource::Internal, None),
        ] {
            let message = ProcessMetrics {
                data: MetricData::Raw(vec![item.clone()]),
                project_key,
                source,
                received_at,
                sent_at: Some(Utc::now()),
            };
            processor.handle_process_metrics(&mut token, message);

            let Aggregator::MergeBuckets(merge_buckets) = aggregator_rx.recv().await.unwrap();
            let buckets = merge_buckets.buckets;
            assert_eq!(buckets.len(), 1);
            assert_eq!(buckets[0].metadata.received_at, expected_received_at);
        }
    }
2920
    /// A batched metrics payload containing buckets for multiple project keys
    /// must be split into one `MergeBuckets` message per project and
    /// forwarded to the aggregator.
    #[tokio::test]
    async fn test_process_batched_metrics() {
        let mut token = Cogs::noop().timed(ResourceId::Relay, AppFeature::Unattributed);
        let received_at = Utc::now();
        let config = Config::default();

        // Capture what the processor forwards to the aggregator.
        let (aggregator, mut aggregator_rx) = Addr::custom();
        let processor = create_test_processor_with_addrs(
            config,
            Addrs {
                aggregator,
                ..Default::default()
            },
        )
        .await;

        // Two project keys, one distribution bucket each.
        let payload = r#"{
    "buckets": {
        "11111111111111111111111111111111": [
            {
                "timestamp": 1615889440,
                "width": 0,
                "name": "d:custom/endpoint.response_time@millisecond",
                "type": "d",
                "value": [
                    68.0
                ],
                "tags": {
                    "route": "user_index"
                }
            }
        ],
        "22222222222222222222222222222222": [
            {
                "timestamp": 1615889440,
                "width": 0,
                "name": "d:custom/endpoint.cache_rate@none",
                "type": "d",
                "value": [
                    36.0
                ]
            }
        ]
    }
}
"#;
        let message = ProcessBatchedMetrics {
            payload: Bytes::from(payload),
            source: BucketSource::Internal,
            received_at,
            sent_at: Some(Utc::now()),
        };
        processor.handle_process_batched_metrics(&mut token, message);

        // Expect exactly one MergeBuckets message per project key.
        let Aggregator::MergeBuckets(mb1) = aggregator_rx.recv().await.unwrap();
        let Aggregator::MergeBuckets(mb2) = aggregator_rx.recv().await.unwrap();

        // Message order is not guaranteed; sort for a stable snapshot.
        let mut messages = vec![mb1, mb2];
        messages.sort_by_key(|mb| mb.project_key);

        let actual = messages
            .into_iter()
            .map(|mb| (mb.project_key, mb.buckets))
            .collect::<Vec<_>>();

        // Internal source: received_at stays None (see
        // test_process_metrics_bucket_metadata).
        assert_debug_snapshot!(actual, @r###"
        [
            (
                ProjectKey("11111111111111111111111111111111"),
                [
                    Bucket {
                        timestamp: UnixTimestamp(1615889440),
                        width: 0,
                        name: MetricName(
                            "d:custom/endpoint.response_time@millisecond",
                        ),
                        value: Distribution(
                            [
                                68.0,
                            ],
                        ),
                        tags: {
                            "route": "user_index",
                        },
                        metadata: BucketMetadata {
                            merges: 1,
                            received_at: None,
                            extracted_from_indexed: false,
                        },
                    },
                ],
            ),
            (
                ProjectKey("22222222222222222222222222222222"),
                [
                    Bucket {
                        timestamp: UnixTimestamp(1615889440),
                        width: 0,
                        name: MetricName(
                            "d:custom/endpoint.cache_rate@none",
                        ),
                        value: Distribution(
                            [
                                36.0,
                            ],
                        ),
                        tags: {},
                        metadata: BucketMetadata {
                            merges: 1,
                            received_at: None,
                            extracted_from_indexed: false,
                        },
                    },
                ],
            ),
        ]
        "###);
    }
3039}