1use std::borrow::Cow;
2use std::collections::{BTreeMap, BTreeSet, HashMap};
3use std::error::Error;
4use std::fmt::{Debug, Display};
5use std::future::Future;
6use std::io::Write;
7use std::pin::Pin;
8use std::sync::Arc;
9use std::time::Duration;
10
11use anyhow::Context;
12use brotli::CompressorWriter as BrotliEncoder;
13use bytes::Bytes;
14use chrono::{DateTime, Utc};
15use flate2::Compression;
16use flate2::write::{GzEncoder, ZlibEncoder};
17use futures::FutureExt;
18use futures::future::BoxFuture;
19use relay_base_schema::project::{ProjectId, ProjectKey};
20use relay_cogs::{AppFeature, Cogs, FeatureWeights, ResourceId, Token};
21use relay_common::time::UnixTimestamp;
22use relay_config::{Config, HttpEncoding, UpstreamDescriptor};
23use relay_dynamic_config::Feature;
24use relay_event_normalization::{ClockDriftProcessor, GeoIpLookup};
25use relay_event_schema::processor::ProcessingAction;
26use relay_event_schema::protocol::{ClientReport, EventId, SpanV2};
27use relay_filter::FilterStatKey;
28use relay_metrics::{Bucket, BucketMetadata, BucketView, BucketsView, MetricNamespace};
29use relay_quotas::{DataCategory, RateLimits, Scoping};
30use relay_sampling::evaluation::{ReservoirCounters, SamplingDecision};
31use relay_statsd::metric;
32use relay_system::{Addr, FromMessage, NoResponse, Service};
33use reqwest::header;
34use smallvec::{SmallVec, smallvec};
35use zstd::stream::Encoder as ZstdEncoder;
36
37use crate::envelope::{self, ContentType, Envelope, EnvelopeError, Item, ItemContainer, ItemType};
38use crate::extractors::{PartialDsn, RequestMeta, RequestTrust};
39use crate::integrations::Integration;
40use crate::managed::ManagedEnvelope;
41use crate::metrics::{MetricOutcomes, MetricsLimiter, MinimalTrackableBucket};
42use crate::metrics_extraction::ExtractedMetrics;
43use crate::processing::attachments::AttachmentProcessor;
44use crate::processing::check_ins::CheckInsProcessor;
45use crate::processing::client_reports::ClientReportsProcessor;
46use crate::processing::errors::{ErrorsProcessor, SwitchProcessingError};
47use crate::processing::forward_unknown::ForwardUnknownProcessor;
48use crate::processing::legacy_spans::LegacySpansProcessor;
49use crate::processing::logs::LogsProcessor;
50use crate::processing::profile_chunks::ProfileChunksProcessor;
51use crate::processing::profiles::ProfilesProcessor;
52use crate::processing::replays::ReplaysProcessor;
53use crate::processing::sessions::SessionsProcessor;
54use crate::processing::spans::SpansProcessor;
55use crate::processing::trace_attachments::TraceAttachmentsProcessor;
56use crate::processing::trace_metrics::TraceMetricsProcessor;
57use crate::processing::transactions::TransactionProcessor;
58use crate::processing::user_reports::UserReportsProcessor;
59use crate::processing::{Forward as _, ForwardContext, Output, Outputs, QuotaRateLimiter};
60use crate::service::ServiceError;
61use crate::services::global_config::GlobalConfigHandle;
62use crate::services::metrics::{Aggregator, FlushBuckets, MergeBuckets, ProjectBuckets};
63use crate::services::outcome::{DiscardItemType, DiscardReason, Outcome, TrackOutcome};
64use crate::services::projects::cache::ProjectCacheHandle;
65use crate::services::projects::project::{ProjectInfo, ProjectState};
66use crate::services::upstream::{
67 SendRequest, Sign, SignatureType, UpstreamRelay, UpstreamRequest, UpstreamRequestError,
68};
69use crate::statsd::{RelayCounters, RelayDistributions, RelayTimers};
70use crate::utils;
71use crate::{http, processing};
72use relay_threading::AsyncPool;
73use symbolic_unreal::{Unreal4Error, Unreal4ErrorKind};
74#[cfg(feature = "processing")]
75use {
76 crate::services::objectstore::Objectstore,
77 crate::services::store::Store,
78 itertools::Itertools,
79 relay_dynamic_config::GlobalConfig,
80 relay_quotas::{Quota, RateLimitingError, RedisRateLimiter},
81 relay_redis::RedisClients,
82 std::time::Instant,
83};
84
mod metrics;

/// Minimum clock drift of 55 minutes.
///
/// NOTE(review): presumably the threshold used together with
/// [`ClockDriftProcessor`] below which drift correction is skipped — confirm
/// at the use site, which is not visible in this chunk.
pub const MINIMUM_CLOCK_DRIFT: Duration = Duration::from_secs(55 * 60);
89
/// Error returned when a [`ProcessingGroup`] cannot be converted into the
/// requested group marker type.
#[derive(Debug)]
pub struct GroupTypeError;

impl Display for GroupTypeError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "failed to convert processing group into corresponding type")
    }
}

impl std::error::Error for GroupTypeError {}
100
/// Defines a unit marker type for a processing group and wires up conversions
/// to and from [`ProcessingGroup`].
///
/// `From<$ty> for ProcessingGroup` always yields the `$variant` variant.
/// `TryFrom<ProcessingGroup> for $ty` succeeds for `$variant` and any extra
/// `$other` variants listed; every other group yields [`GroupTypeError`].
macro_rules! processing_group {
    ($ty:ident, $variant:ident$(, $($other:ident),+)?) => {
        #[derive(Clone, Copy, Debug)]
        pub struct $ty;

        impl From<$ty> for ProcessingGroup {
            fn from(_: $ty) -> Self {
                ProcessingGroup::$variant
            }
        }

        impl TryFrom<ProcessingGroup> for $ty {
            type Error = GroupTypeError;

            fn try_from(value: ProcessingGroup) -> Result<Self, Self::Error> {
                if matches!(value, ProcessingGroup::$variant) {
                    return Ok($ty);
                }
                // Optional additional variants that also map to this marker.
                $($(
                    if matches!(value, ProcessingGroup::$other) {
                        return Ok($ty);
                    }
                )+)?
                return Err(GroupTypeError);
            }
        }
    };
}
129
// Marker types for each processing group. These serve as type-level evidence
// that an envelope was routed to a specific group (see `processing_group!`).
processing_group!(TransactionGroup, Transaction);

processing_group!(ErrorGroup, Error);

processing_group!(SessionGroup, Session);
processing_group!(ClientReportGroup, ClientReport);
processing_group!(ReplayGroup, Replay);
processing_group!(CheckInGroup, CheckIn);
processing_group!(TraceMetricGroup, TraceMetric);
processing_group!(SpanGroup, Span);

processing_group!(ProfileChunkGroup, ProfileChunk);
processing_group!(ForwardUnknownGroup, ForwardUnknown);
processing_group!(Ungrouped, Ungrouped);
144
/// The group an envelope's items are processed in.
///
/// Envelopes are split into one envelope per group by
/// [`ProcessingGroup::split_envelope`]; each group is handled by a dedicated
/// processor.
#[derive(Clone, Copy, Debug)]
pub enum ProcessingGroup {
    /// Event-creating items where a transaction or profile item is present.
    Transaction,
    /// Other event-creating items, including raw security reports.
    Error,
    /// Session and aggregated session items.
    Session,
    /// Attachments sent without an accompanying event item.
    StandaloneAttachments,
    /// User reports sent without an accompanying event item.
    StandaloneUserReports,
    /// Profiles sent without an accompanying event item.
    StandaloneProfiles,
    /// Client report items.
    ClientReport,
    /// Replay events, recordings, and videos.
    Replay,
    /// Check-in (cron monitor) items.
    CheckIn,
    /// Log items and log integrations.
    Log,
    /// Trace metric items.
    TraceMetric,
    /// Legacy span items.
    Span,
    /// Span V2 containers and span integrations.
    SpanV2,
    /// Profile chunk items.
    ProfileChunk,
    /// Trace attachments.
    TraceAttachment,
    /// Items of unknown type, forwarded as-is.
    ForwardUnknown,
    /// Fallback for items that match no other group.
    Ungrouped,
}
195
impl ProcessingGroup {
    /// Splits the envelope into one envelope per [`ProcessingGroup`].
    ///
    /// Items are peeled off in a fixed order; every non-empty group becomes
    /// its own envelope sharing the original envelope's headers. Whatever is
    /// left at the end is split into one envelope per item (check-ins, client
    /// reports, unknown, or ungrouped).
    fn split_envelope(
        mut envelope: Envelope,
        project_info: &ProjectInfo,
    ) -> SmallVec<[(Self, Box<Envelope>); 3]> {
        let headers = envelope.headers().clone();
        let mut grouped_envelopes = smallvec![];

        // Replay events, recordings, and videos are processed together.
        let replay_items = envelope.take_items_by(|item| {
            matches!(
                item.ty(),
                &ItemType::ReplayEvent | &ItemType::ReplayRecording | &ItemType::ReplayVideo
            )
        });
        if !replay_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Replay,
                Envelope::from_parts(headers.clone(), replay_items),
            ))
        }

        // Individual and aggregated session items.
        let session_items = envelope
            .take_items_by(|item| matches!(item.ty(), &ItemType::Session | &ItemType::Sessions));
        if !session_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Session,
                Envelope::from_parts(headers.clone(), session_items),
            ))
        }

        // Span V2: span containers, span integrations, and — only with the
        // experimental feature enabled — legacy span items and span
        // attachments. Must run before the legacy `Span` extraction below.
        let span_v2_items = envelope.take_items_by(|item| {
            let exp_feature = project_info.has_feature(Feature::SpanV2ExperimentalProcessing);

            ItemContainer::<SpanV2>::is_container(item)
                || matches!(item.integration(), Some(Integration::Spans(_)))
                || (exp_feature && matches!(item.ty(), &ItemType::Span))
                || (exp_feature && item.is_span_attachment())
        });

        if !span_v2_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::SpanV2,
                Envelope::from_parts(headers.clone(), span_v2_items),
            ))
        }

        // Remaining span items go through the legacy span pipeline.
        let span_items = envelope.take_items_by(|item| matches!(item.ty(), &ItemType::Span));
        if !span_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Span,
                Envelope::from_parts(headers.clone(), span_items),
            ))
        }

        // Log items and log integrations.
        let logs_items = envelope.take_items_by(|item| {
            matches!(item.ty(), &ItemType::Log)
                || matches!(item.integration(), Some(Integration::Logs(_)))
        });
        if !logs_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Log,
                Envelope::from_parts(headers.clone(), logs_items),
            ))
        }

        let trace_metric_items =
            envelope.take_items_by(|item| matches!(item.ty(), &ItemType::TraceMetric));
        if !trace_metric_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::TraceMetric,
                Envelope::from_parts(headers.clone(), trace_metric_items),
            ))
        }

        let profile_chunk_items =
            envelope.take_items_by(|item| matches!(item.ty(), &ItemType::ProfileChunk));
        if !profile_chunk_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::ProfileChunk,
                Envelope::from_parts(headers.clone(), profile_chunk_items),
            ))
        }

        let trace_attachment_items = envelope.take_items_by(Item::is_trace_attachment);
        if !trace_attachment_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::TraceAttachment,
                Envelope::from_parts(headers.clone(), trace_attachment_items),
            ))
        }

        // Event-dependent items only count as "standalone" when nothing in the
        // envelope creates an event.
        if !envelope.items().any(Item::creates_event) {
            let standalone_attachments = envelope
                .take_items_by(|i| i.requires_event() && matches!(i.ty(), ItemType::Attachment));
            if !standalone_attachments.is_empty() {
                grouped_envelopes.push((
                    ProcessingGroup::StandaloneAttachments,
                    Envelope::from_parts(headers.clone(), standalone_attachments),
                ))
            }

            let standalone_user_reports =
                envelope.take_items_by(|i| matches!(i.ty(), ItemType::UserReport));
            if !standalone_user_reports.is_empty() {
                grouped_envelopes.push((
                    ProcessingGroup::StandaloneUserReports,
                    Envelope::from_parts(headers.clone(), standalone_user_reports),
                ));
            }

            let standalone_profiles =
                envelope.take_items_by(|i| matches!(i.ty(), ItemType::Profile));
            if !standalone_profiles.is_empty() {
                grouped_envelopes.push((
                    ProcessingGroup::StandaloneProfiles,
                    Envelope::from_parts(headers.clone(), standalone_profiles),
                ));
            }
        }

        // Each raw security report becomes its own error envelope with a
        // freshly generated event id.
        let security_reports_items = envelope
            .take_items_by(|i| matches!(i.ty(), &ItemType::RawSecurity))
            .into_iter()
            .map(|item| {
                let headers = headers.clone();
                let items: SmallVec<[Item; 3]> = smallvec![item.clone()];
                let mut envelope = Envelope::from_parts(headers, items);
                envelope.set_event_id(EventId::new());
                (ProcessingGroup::Error, envelope)
            });
        grouped_envelopes.extend(security_reports_items);

        // Remaining event-dependent items form a transaction group if any of
        // them is a transaction or profile, otherwise an error group.
        let require_event_items = envelope.take_items_by(Item::requires_event);
        if !require_event_items.is_empty() {
            let group = if require_event_items
                .iter()
                .any(|item| matches!(item.ty(), &ItemType::Transaction | &ItemType::Profile))
            {
                ProcessingGroup::Transaction
            } else {
                ProcessingGroup::Error
            };

            grouped_envelopes.push((
                group,
                Envelope::from_parts(headers.clone(), require_event_items),
            ))
        }

        // Everything left over is split into one envelope per item.
        let envelopes = envelope.items_mut().map(|item| {
            let headers = headers.clone();
            let items: SmallVec<[Item; 3]> = smallvec![item.clone()];
            let envelope = Envelope::from_parts(headers, items);
            let item_type = item.ty();
            let group = if matches!(item_type, &ItemType::CheckIn) {
                ProcessingGroup::CheckIn
            } else if matches!(item.ty(), &ItemType::ClientReport) {
                ProcessingGroup::ClientReport
            } else if matches!(item_type, &ItemType::Unknown(_)) {
                ProcessingGroup::ForwardUnknown
            } else {
                ProcessingGroup::Ungrouped
            };

            (group, envelope)
        });
        grouped_envelopes.extend(envelopes);

        grouped_envelopes
    }

    /// Returns the stable string name of the group, used e.g. in trace logs.
    pub fn variant(&self) -> &'static str {
        match self {
            ProcessingGroup::Transaction => "transaction",
            ProcessingGroup::Error => "error",
            ProcessingGroup::Session => "session",
            ProcessingGroup::StandaloneAttachments => "standalone_attachment",
            ProcessingGroup::StandaloneUserReports => "standalone_user_reports",
            ProcessingGroup::StandaloneProfiles => "standalone_profiles",
            ProcessingGroup::ClientReport => "client_report",
            ProcessingGroup::Replay => "replay",
            ProcessingGroup::CheckIn => "check_in",
            ProcessingGroup::Log => "log",
            ProcessingGroup::TraceMetric => "trace_metric",
            ProcessingGroup::Span => "span",
            ProcessingGroup::SpanV2 => "span_v2",
            ProcessingGroup::ProfileChunk => "profile_chunk",
            ProcessingGroup::TraceAttachment => "trace_attachment",
            ProcessingGroup::ForwardUnknown => "forward_unknown",
            ProcessingGroup::Ungrouped => "ungrouped",
        }
    }
}
405
406impl From<ProcessingGroup> for AppFeature {
407 fn from(value: ProcessingGroup) -> Self {
408 match value {
409 ProcessingGroup::Transaction => AppFeature::Transactions,
410 ProcessingGroup::Error => AppFeature::Errors,
411 ProcessingGroup::Session => AppFeature::Sessions,
412 ProcessingGroup::StandaloneAttachments => AppFeature::UnattributedEnvelope,
413 ProcessingGroup::StandaloneUserReports => AppFeature::UserReports,
414 ProcessingGroup::StandaloneProfiles => AppFeature::Profiles,
415 ProcessingGroup::ClientReport => AppFeature::ClientReports,
416 ProcessingGroup::Replay => AppFeature::Replays,
417 ProcessingGroup::CheckIn => AppFeature::CheckIns,
418 ProcessingGroup::Log => AppFeature::Logs,
419 ProcessingGroup::TraceMetric => AppFeature::TraceMetrics,
420 ProcessingGroup::Span => AppFeature::Spans,
421 ProcessingGroup::SpanV2 => AppFeature::Spans,
422 ProcessingGroup::ProfileChunk => AppFeature::Profiles,
423 ProcessingGroup::ForwardUnknown => AppFeature::UnattributedEnvelope,
424 ProcessingGroup::Ungrouped => AppFeature::UnattributedEnvelope,
425 ProcessingGroup::TraceAttachment => AppFeature::TraceAttachments,
426 }
427 }
428}
429
/// Errors that can occur while processing an envelope.
///
/// Use [`ProcessingError::to_outcome`] to map an error to the outcome that
/// should be emitted for the dropped items, if any.
#[derive(Debug, thiserror::Error)]
pub enum ProcessingError {
    #[error("invalid json in event")]
    InvalidJson(#[source] serde_json::Error),

    #[error("invalid message pack event payload")]
    InvalidMsgpack(#[from] rmp_serde::decode::Error),

    #[error("invalid unreal crash report")]
    InvalidUnrealReport(#[source] Unreal4Error),

    #[error("event payload too large")]
    PayloadTooLarge(DiscardItemType),

    #[error("invalid transaction event")]
    InvalidTransaction,

    #[error("the item is not allowed/supported in this envelope")]
    UnsupportedItem,

    #[error("envelope processor failed")]
    ProcessingFailed(#[from] ProcessingAction),

    #[error("duplicate {0} in event")]
    DuplicateItem(ItemType),

    #[error("failed to extract event payload")]
    NoEventPayload,

    #[error("missing project id in DSN")]
    MissingProjectId,

    #[error("invalid security report type: {0:?}")]
    InvalidSecurityType(Bytes),

    #[error("unsupported security report type")]
    UnsupportedSecurityType,

    #[error("invalid security report")]
    InvalidSecurityReport(#[source] serde_json::Error),

    #[error("event filtered with reason: {0:?}")]
    EventFiltered(FilterStatKey),

    #[error("could not serialize event payload")]
    SerializeFailed(#[source] serde_json::Error),

    #[cfg(feature = "processing")]
    #[error("failed to apply quotas")]
    QuotasFailed(#[from] RateLimitingError),

    #[error("nintendo switch dying message processing failed {0:?}")]
    InvalidNintendoDyingMessage(#[source] SwitchProcessingError),

    #[cfg(all(sentry, feature = "processing"))]
    #[error("playstation dump processing failed: {0}")]
    InvalidPlaystationDump(String),

    /// The envelope was dispatched to a processor of a different group.
    #[error("processing group does not match specific processor")]
    ProcessingGroupMismatch,
    /// The new processing pipeline failed; outcomes are assumed to have been
    /// emitted by the pipeline itself (see `to_outcome`, which returns `None`).
    #[error("new processing pipeline failed")]
    ProcessingFailure,

    #[cfg(feature = "processing")]
    #[error("invalid attachment reference")]
    InvalidAttachmentRef,

    #[error("could not determine processing group for envelope items")]
    NoProcessingGroup,
}
501
502impl ProcessingError {
503 pub fn to_outcome(&self) -> Option<Outcome> {
504 match self {
505 Self::PayloadTooLarge(payload_type) => {
506 Some(Outcome::Invalid(DiscardReason::TooLarge(*payload_type)))
507 }
508 Self::InvalidJson(_) => Some(Outcome::Invalid(DiscardReason::InvalidJson)),
509 Self::InvalidMsgpack(_) => Some(Outcome::Invalid(DiscardReason::InvalidMsgpack)),
510 Self::InvalidSecurityType(_) => {
511 Some(Outcome::Invalid(DiscardReason::SecurityReportType))
512 }
513 Self::UnsupportedItem => Some(Outcome::Invalid(DiscardReason::InvalidEnvelope)),
514 Self::InvalidSecurityReport(_) => Some(Outcome::Invalid(DiscardReason::SecurityReport)),
515 Self::UnsupportedSecurityType => Some(Outcome::Filtered(FilterStatKey::InvalidCsp)),
516 Self::InvalidTransaction => Some(Outcome::Invalid(DiscardReason::InvalidTransaction)),
517 Self::DuplicateItem(_) => Some(Outcome::Invalid(DiscardReason::DuplicateItem)),
518 Self::NoEventPayload => Some(Outcome::Invalid(DiscardReason::NoEventPayload)),
519 Self::InvalidNintendoDyingMessage(_) => Some(Outcome::Invalid(DiscardReason::Payload)),
520 #[cfg(all(sentry, feature = "processing"))]
521 Self::InvalidPlaystationDump(_) => Some(Outcome::Invalid(DiscardReason::Payload)),
522 Self::InvalidUnrealReport(err) if err.kind() == Unreal4ErrorKind::BadCompression => {
523 Some(Outcome::Invalid(DiscardReason::InvalidCompression))
524 }
525 Self::InvalidUnrealReport(_) => Some(Outcome::Invalid(DiscardReason::ProcessUnreal)),
526 Self::SerializeFailed(_) | Self::ProcessingFailed(_) => {
527 Some(Outcome::Invalid(DiscardReason::Internal))
528 }
529 #[cfg(feature = "processing")]
530 Self::QuotasFailed(_) => Some(Outcome::Invalid(DiscardReason::Internal)),
531 Self::MissingProjectId => None,
532 Self::EventFiltered(key) => Some(Outcome::Filtered(key.clone())),
533
534 Self::ProcessingGroupMismatch => Some(Outcome::Invalid(DiscardReason::Internal)),
535 Self::ProcessingFailure => None,
537 #[cfg(feature = "processing")]
538 Self::InvalidAttachmentRef => {
539 Some(Outcome::Invalid(DiscardReason::InvalidAttachmentRef))
540 }
541 Self::NoProcessingGroup => Some(Outcome::Invalid(DiscardReason::Internal)),
542 }
543 }
544
545 fn is_unexpected(&self) -> bool {
546 self.to_outcome()
547 .is_some_and(|outcome| outcome.is_unexpected())
548 }
549}
550
551impl From<Unreal4Error> for ProcessingError {
552 fn from(err: Unreal4Error) -> Self {
553 match err.kind() {
554 Unreal4ErrorKind::TooLarge => Self::PayloadTooLarge(ItemType::UnrealReport.into()),
555 _ => ProcessingError::InvalidUnrealReport(err),
556 }
557 }
558}
559
/// Wrapper around [`ExtractedMetrics`] that tags buckets with whether they
/// were extracted from an indexed payload as they are added.
#[derive(Debug)]
pub struct ProcessingExtractedMetrics {
    // The accumulated project and sampling metrics.
    metrics: ExtractedMetrics,
}
568
569impl ProcessingExtractedMetrics {
570 pub fn new() -> Self {
571 Self {
572 metrics: ExtractedMetrics::default(),
573 }
574 }
575
576 pub fn into_inner(self) -> ExtractedMetrics {
577 self.metrics
578 }
579
580 pub fn extend(
582 &mut self,
583 extracted: ExtractedMetrics,
584 sampling_decision: Option<SamplingDecision>,
585 ) {
586 self.extend_project_metrics(extracted.project_metrics, sampling_decision);
587 self.extend_sampling_metrics(extracted.sampling_metrics, sampling_decision);
588 }
589
590 pub fn extend_project_metrics<I>(
592 &mut self,
593 buckets: I,
594 sampling_decision: Option<SamplingDecision>,
595 ) where
596 I: IntoIterator<Item = Bucket>,
597 {
598 self.metrics
599 .project_metrics
600 .extend(buckets.into_iter().map(|mut bucket| {
601 bucket.metadata.extracted_from_indexed =
602 sampling_decision == Some(SamplingDecision::Keep);
603 bucket
604 }));
605 }
606
607 pub fn extend_sampling_metrics<I>(
609 &mut self,
610 buckets: I,
611 sampling_decision: Option<SamplingDecision>,
612 ) where
613 I: IntoIterator<Item = Bucket>,
614 {
615 self.metrics
616 .sampling_metrics
617 .extend(buckets.into_iter().map(|mut bucket| {
618 bucket.metadata.extracted_from_indexed =
619 sampling_decision == Some(SamplingDecision::Keep);
620 bucket
621 }));
622 }
623}
624
625fn send_metrics(
626 metrics: ExtractedMetrics,
627 project_key: ProjectKey,
628 sampling_key: Option<ProjectKey>,
629 aggregator: &Addr<Aggregator>,
630) {
631 let ExtractedMetrics {
632 project_metrics,
633 sampling_metrics,
634 } = metrics;
635
636 if !project_metrics.is_empty() {
637 aggregator.send(MergeBuckets {
638 project_key,
639 buckets: project_metrics,
640 });
641 }
642
643 if !sampling_metrics.is_empty() {
644 let sampling_project_key = sampling_key.unwrap_or(project_key);
651 aggregator.send(MergeBuckets {
652 project_key: sampling_project_key,
653 buckets: sampling_metrics,
654 });
655 }
656}
657
/// Message requesting that an envelope be processed.
#[derive(Debug)]
pub struct ProcessEnvelope {
    /// The envelope to process, with outcome tracking attached.
    pub envelope: ManagedEnvelope,
    /// State of the project the envelope belongs to.
    pub project_info: Arc<ProjectInfo>,
    /// Currently active rate limits for the project.
    pub rate_limits: Arc<RateLimits>,
    /// Project state used for dynamic sampling (e.g. DSC transaction
    /// parametrization in `process_envelope`), if available.
    pub sampling_project_info: Option<Arc<ProjectInfo>>,
    /// Shared counters for reservoir sampling evaluation.
    pub reservoir_counters: ReservoirCounters,
}
680
/// An envelope that has been split into a single [`ProcessingGroup`].
#[derive(Debug)]
struct ProcessEnvelopeGrouped<'a> {
    /// The group all items in this envelope belong to.
    pub group: ProcessingGroup,
    /// The envelope to process.
    pub envelope: ManagedEnvelope,
    /// Processing context; carries at least the project state and the
    /// sampling project state (see `process_envelope`).
    pub ctx: processing::Context<'a>,
}
691
/// Message requesting that metric items be parsed and processed for a project.
#[derive(Debug)]
pub struct ProcessMetrics {
    /// The metric data, either raw envelope items or parsed buckets.
    pub data: MetricData,
    /// The public key of the project the metrics belong to.
    pub project_key: ProjectKey,
    /// Whether the metrics come from a trusted or untrusted request.
    pub source: BucketSource,
    /// The time the metrics were received by this Relay.
    pub received_at: DateTime<Utc>,
    /// The client-reported send time, if available.
    pub sent_at: Option<DateTime<Utc>>,
}
717
/// Raw or already-parsed metric bucket data.
#[derive(Debug)]
pub enum MetricData {
    /// Envelope items carrying statsd or JSON-encoded metric buckets
    /// (parsed lazily by [`MetricData::into_buckets`]).
    Raw(Vec<Item>),
    /// Metric buckets that were already parsed.
    Parsed(Vec<Bucket>),
}
726
727impl MetricData {
728 fn into_buckets(self, timestamp: UnixTimestamp) -> Vec<Bucket> {
733 let items = match self {
734 Self::Parsed(buckets) => return buckets,
735 Self::Raw(items) => items,
736 };
737
738 let mut buckets = Vec::new();
739 for item in items {
740 let payload = item.payload();
741 if item.ty() == &ItemType::Statsd {
742 for bucket_result in Bucket::parse_all(&payload, timestamp) {
743 match bucket_result {
744 Ok(bucket) => buckets.push(bucket),
745 Err(error) => relay_log::debug!(
746 error = &error as &dyn Error,
747 "failed to parse metric bucket from statsd format",
748 ),
749 }
750 }
751 } else if item.ty() == &ItemType::MetricBuckets {
752 match serde_json::from_slice::<Vec<Bucket>>(&payload) {
753 Ok(parsed_buckets) => {
754 if buckets.is_empty() {
756 buckets = parsed_buckets;
757 } else {
758 buckets.extend(parsed_buckets);
759 }
760 }
761 Err(error) => {
762 relay_log::debug!(
763 error = &error as &dyn Error,
764 "failed to parse metric bucket",
765 );
766 metric!(counter(RelayCounters::MetricBucketsParsingFailed) += 1);
767 }
768 }
769 } else {
770 relay_log::error!(
771 "invalid item of type {} passed to ProcessMetrics",
772 item.ty()
773 );
774 }
775 }
776 buckets
777 }
778}
779
/// Message requesting that a batched metrics payload be processed.
#[derive(Debug)]
pub struct ProcessBatchedMetrics {
    /// The raw payload.
    // NOTE(review): the payload format is not visible in this chunk —
    // presumably JSON buckets keyed by project; confirm in the handler.
    pub payload: Bytes,
    /// Whether the metrics come from a trusted or untrusted request.
    pub source: BucketSource,
    /// The time the payload was received by this Relay.
    pub received_at: DateTime<Utc>,
    /// The client-reported send time, if available.
    pub sent_at: Option<DateTime<Utc>>,
}
791
/// The source of metric buckets, derived from the request's trust level
/// (see [`BucketSource::from_meta`]).
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum BucketSource {
    /// Buckets from a trusted (internal) request.
    Internal,
    /// Buckets from an untrusted (external) request.
    External,
}
807
808impl BucketSource {
809 pub fn from_meta(meta: &RequestMeta) -> Self {
811 match meta.request_trust() {
812 RequestTrust::Trusted => Self::Internal,
813 RequestTrust::Untrusted => Self::External,
814 }
815 }
816}
817
/// Message carrying client reports to be submitted.
#[derive(Debug)]
pub struct SubmitClientReports {
    /// The client reports to submit.
    pub client_reports: Vec<ClientReport>,
    /// The scoping (organization/project/key) the reports apply to.
    pub scoping: Scoping,
}
826
/// The messages accepted by the [`EnvelopeProcessorService`].
///
/// Each variant boxes its payload, keeping the enum itself small.
#[derive(Debug)]
pub enum EnvelopeProcessor {
    ProcessEnvelope(Box<ProcessEnvelope>),
    ProcessProjectMetrics(Box<ProcessMetrics>),
    ProcessBatchedMetrics(Box<ProcessBatchedMetrics>),
    FlushBuckets(Box<FlushBuckets>),
    SubmitClientReports(Box<SubmitClientReports>),
}
836
837impl EnvelopeProcessor {
838 pub fn variant(&self) -> &'static str {
840 match self {
841 EnvelopeProcessor::ProcessEnvelope(_) => "ProcessEnvelope",
842 EnvelopeProcessor::ProcessProjectMetrics(_) => "ProcessProjectMetrics",
843 EnvelopeProcessor::ProcessBatchedMetrics(_) => "ProcessBatchedMetrics",
844 EnvelopeProcessor::FlushBuckets(_) => "FlushBuckets",
845 EnvelopeProcessor::SubmitClientReports(_) => "SubmitClientReports",
846 }
847 }
848}
849
/// Marks [`EnvelopeProcessor`] as a service message interface.
impl relay_system::Interface for EnvelopeProcessor {}
851
852impl FromMessage<ProcessEnvelope> for EnvelopeProcessor {
853 type Response = relay_system::NoResponse;
854
855 fn from_message(message: ProcessEnvelope, _sender: ()) -> Self {
856 Self::ProcessEnvelope(Box::new(message))
857 }
858}
859
860impl FromMessage<ProcessMetrics> for EnvelopeProcessor {
861 type Response = NoResponse;
862
863 fn from_message(message: ProcessMetrics, _: ()) -> Self {
864 Self::ProcessProjectMetrics(Box::new(message))
865 }
866}
867
868impl FromMessage<ProcessBatchedMetrics> for EnvelopeProcessor {
869 type Response = NoResponse;
870
871 fn from_message(message: ProcessBatchedMetrics, _: ()) -> Self {
872 Self::ProcessBatchedMetrics(Box::new(message))
873 }
874}
875
876impl FromMessage<FlushBuckets> for EnvelopeProcessor {
877 type Response = NoResponse;
878
879 fn from_message(message: FlushBuckets, _: ()) -> Self {
880 Self::FlushBuckets(Box::new(message))
881 }
882}
883
884impl FromMessage<SubmitClientReports> for EnvelopeProcessor {
885 type Response = NoResponse;
886
887 fn from_message(message: SubmitClientReports, _: ()) -> Self {
888 Self::SubmitClientReports(Box::new(message))
889 }
890}
891
/// The async pool used by the [`EnvelopeProcessorService`] to run processing
/// futures.
pub type EnvelopeProcessorServicePool = AsyncPool<BoxFuture<'static, ()>>;
894
/// Service implementing the [`EnvelopeProcessor`] interface.
///
/// Cheap to clone: all state is shared behind an [`Arc`].
#[derive(Clone)]
pub struct EnvelopeProcessorService {
    inner: Arc<InnerProcessor>,
}
902
/// Addresses of the services the envelope processor communicates with.
pub struct Addrs {
    /// Receives [`TrackOutcome`] messages for dropped items.
    pub outcome_aggregator: Addr<TrackOutcome>,
    /// The upstream relay service.
    pub upstream_relay: Addr<UpstreamRelay>,
    /// Object storage service; only available in processing mode.
    #[cfg(feature = "processing")]
    pub objectstore: Option<Addr<Objectstore>>,
    /// Store forwarder service; only available in processing mode.
    #[cfg(feature = "processing")]
    pub store_forwarder: Option<Addr<Store>>,
    /// The metrics aggregator service.
    pub aggregator: Addr<Aggregator>,
}
913
914impl Default for Addrs {
915 fn default() -> Self {
916 Addrs {
917 outcome_aggregator: Addr::dummy(),
918 upstream_relay: Addr::dummy(),
919 #[cfg(feature = "processing")]
920 objectstore: None,
921 #[cfg(feature = "processing")]
922 store_forwarder: None,
923 aggregator: Addr::dummy(),
924 }
925 }
926}
927
/// Shared state of the [`EnvelopeProcessorService`].
struct InnerProcessor {
    /// Pool on which processing futures run.
    pool: EnvelopeProcessorServicePool,
    /// Static Relay configuration.
    config: Arc<Config>,
    /// Handle to the current global configuration.
    global_config: GlobalConfigHandle,
    /// Handle to the cached project states.
    project_cache: ProjectCacheHandle,
    /// COGS accounting handle.
    cogs: Cogs,
    /// Addresses of dependent services.
    addrs: Addrs,
    /// Redis-backed rate limiter, present when Redis is configured.
    #[cfg(feature = "processing")]
    rate_limiter: Option<Arc<RedisRateLimiter>>,
    /// Outcome tracking for metric buckets.
    metric_outcomes: MetricOutcomes,
    /// The per-group processors of the processing pipeline.
    processing: Processing,
}
940
/// One processor per [`ProcessingGroup`], dispatched to in
/// `process_envelope`.
struct Processing {
    errors: ErrorsProcessor,
    logs: LogsProcessor,
    trace_metrics: TraceMetricsProcessor,
    spans: SpansProcessor,
    legacy_spans: LegacySpansProcessor,
    check_ins: CheckInsProcessor,
    sessions: SessionsProcessor,
    transactions: TransactionProcessor,
    profile_chunks: ProfileChunksProcessor,
    trace_attachments: TraceAttachmentsProcessor,
    replays: ReplaysProcessor,
    client_reports: ClientReportsProcessor,
    attachments: AttachmentProcessor,
    user_reports: UserReportsProcessor,
    profiles: ProfilesProcessor,
    forward_unknown: ForwardUnknownProcessor,
}
959
960impl EnvelopeProcessorService {
    /// Creates a new [`EnvelopeProcessorService`].
    ///
    /// Opens the GeoIP database if configured (falling back to an empty
    /// lookup on failure), sets up the Redis-backed rate limiter in
    /// processing mode, and constructs one processor per processing group,
    /// all sharing a single [`QuotaRateLimiter`].
    #[cfg_attr(feature = "processing", expect(clippy::too_many_arguments))]
    pub fn new(
        pool: EnvelopeProcessorServicePool,
        config: Arc<Config>,
        global_config: GlobalConfigHandle,
        project_cache: ProjectCacheHandle,
        cogs: Cogs,
        #[cfg(feature = "processing")] redis: Option<RedisClients>,
        addrs: Addrs,
        metric_outcomes: MetricOutcomes,
    ) -> Self {
        // A failure to open the GeoIP database is logged but not fatal; an
        // empty lookup is used instead.
        let geoip_lookup = config
            .geoip_path()
            .and_then(
                |p| match GeoIpLookup::open(p).context(ServiceError::GeoIp) {
                    Ok(geoip) => Some(geoip),
                    Err(err) => {
                        relay_log::error!("failed to open GeoIP db {p:?}: {err:?}");
                        None
                    }
                },
            )
            .unwrap_or_else(GeoIpLookup::empty);

        if let Some(build_epoch) = geoip_lookup.build_epoch() {
            relay_log::info!("Loaded GeoIP database (build: {build_epoch})");
        }

        // The Redis-backed rate limiter only exists in processing mode and
        // only when Redis is configured.
        #[cfg(feature = "processing")]
        let rate_limiter = redis.as_ref().map(|redis| {
            RedisRateLimiter::new(redis.quotas.clone())
                .max_limit(config.max_rate_limit())
                .cache(config.quota_cache_ratio(), config.quota_cache_max())
        });

        let quota_limiter = Arc::new(QuotaRateLimiter::new(
            #[cfg(feature = "processing")]
            project_cache.clone(),
            #[cfg(feature = "processing")]
            rate_limiter.clone(),
        ));
        // Wrap in `Arc` for cheap sharing with the inner processor.
        #[cfg(feature = "processing")]
        let rate_limiter = rate_limiter.map(Arc::new);
        let outcome_aggregator = addrs.outcome_aggregator.clone();
        let inner = InnerProcessor {
            pool,
            global_config,
            project_cache,
            cogs,
            #[cfg(feature = "processing")]
            rate_limiter,
            addrs,
            metric_outcomes,
            // All processors share the same quota limiter; several also share
            // the GeoIP lookup.
            processing: Processing {
                errors: ErrorsProcessor::new(Arc::clone(&quota_limiter), geoip_lookup.clone()),
                logs: LogsProcessor::new(Arc::clone(&quota_limiter)),
                trace_metrics: TraceMetricsProcessor::new(Arc::clone(&quota_limiter)),
                spans: SpansProcessor::new(Arc::clone(&quota_limiter), geoip_lookup.clone()),
                legacy_spans: LegacySpansProcessor::new(
                    Arc::clone(&quota_limiter),
                    geoip_lookup.clone(),
                ),
                check_ins: CheckInsProcessor::new(Arc::clone(&quota_limiter)),
                sessions: SessionsProcessor::new(Arc::clone(&quota_limiter)),
                transactions: TransactionProcessor::new(
                    Arc::clone(&quota_limiter),
                    geoip_lookup.clone(),
                    #[cfg(feature = "processing")]
                    redis.map(|r| r.quotas),
                    #[cfg(not(feature = "processing"))]
                    None,
                ),
                profile_chunks: ProfileChunksProcessor::new(Arc::clone(&quota_limiter)),
                trace_attachments: TraceAttachmentsProcessor::new(Arc::clone(&quota_limiter)),
                replays: ReplaysProcessor::new(Arc::clone(&quota_limiter), geoip_lookup),
                client_reports: ClientReportsProcessor::new(outcome_aggregator),
                attachments: AttachmentProcessor::new(Arc::clone(&quota_limiter)),
                user_reports: UserReportsProcessor::new(Arc::clone(&quota_limiter)),
                profiles: ProfilesProcessor::new(quota_limiter),
                forward_unknown: ForwardUnknownProcessor::new(),
            },
            config,
        };

        Self {
            inner: Arc::new(inner),
        }
    }
1050
    /// Extracts the processor's work from the envelope and runs the
    /// processing pipeline on it.
    ///
    /// After `prepare_envelope` takes all items the processor handles, the
    /// remaining envelope is accepted if empty; any leftover items are
    /// rejected with an internal outcome. Pipeline failures are logged and
    /// mapped to [`ProcessingError::ProcessingFailure`].
    async fn process_with_processor<P: processing::Processor>(
        &self,
        processor: &P,
        mut managed_envelope: ManagedEnvelope,
        ctx: processing::Context<'_>,
    ) -> Result<Output<Outputs>, ProcessingError>
    where
        Outputs: From<P::Output>,
    {
        let Some(work) = processor.prepare_envelope(&mut managed_envelope) else {
            // Reaching this point means the envelope was dispatched to the
            // wrong processor — a bug in the caller's group routing.
            debug_assert!(
                false,
                "there must be work for the {} processor",
                std::any::type_name::<P>(),
            );
            return Err(ProcessingError::ProcessingGroupMismatch);
        };

        // Everything the processor works on has been taken out of the
        // envelope; the rest is either empty (accept) or unexpected leftovers
        // (reject with an internal outcome).
        managed_envelope.update();
        match managed_envelope.envelope().is_empty() {
            true => managed_envelope.accept(),
            false => managed_envelope.reject(Outcome::Invalid(DiscardReason::Internal)),
        }

        processor
            .process(work, ctx)
            .await
            .map_err(|err| {
                relay_log::debug!(
                    error = &err as &dyn std::error::Error,
                    "processing pipeline failed"
                );
                ProcessingError::ProcessingFailure
            })
            .map(|o| o.map(Into::into))
    }
1087
    /// Dispatches an envelope to the processor registered for its [`ProcessingGroup`].
    ///
    /// Before dispatching, group-independent metadata is stamped onto the envelope:
    ///  - DSC transaction parametrization, when a sampling project is available,
    ///  - standard and downsampled event retention from the project configuration,
    ///  - the resolved `project_id` on the request metadata.
    ///
    /// Envelopes whose group could not be determined (`Ungrouped`) are rejected with
    /// [`ProcessingError::NoProcessingGroup`].
    async fn process_envelope(
        &self,
        project_id: ProjectId,
        message: ProcessEnvelopeGrouped<'_>,
    ) -> Result<Output<Outputs>, ProcessingError> {
        let ProcessEnvelopeGrouped {
            group,
            envelope: mut managed_envelope,
            ctx,
        } = message;

        // Rewrite transaction names in the dynamic sampling context according to the
        // sampling project's rules, so sampling decisions match the sampling project.
        if let Some(sampling_state) = ctx.sampling_project_info {
            managed_envelope
                .envelope_mut()
                .parametrize_dsc_transaction(&sampling_state.config.tx_name_rules);
        }

        if let Some(retention) = ctx.project_info.config.event_retention {
            managed_envelope.envelope_mut().set_retention(retention);
        }

        if let Some(retention) = ctx.project_info.config.downsampled_event_retention {
            managed_envelope
                .envelope_mut()
                .set_downsampled_retention(retention);
        }

        // Ensure the resolved project id is always available to downstream processors,
        // even if the incoming request did not carry it.
        managed_envelope
            .envelope_mut()
            .meta_mut()
            .set_project_id(project_id);

        relay_log::trace!("Processing {group} group", group = group.variant());

        // One processor per group; all arms delegate to the same generic driver.
        match group {
            ProcessingGroup::Error => {
                self.process_with_processor(&self.inner.processing.errors, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::Transaction => {
                self.process_with_processor(
                    &self.inner.processing.transactions,
                    managed_envelope,
                    ctx,
                )
                .await
            }
            ProcessingGroup::Session => {
                self.process_with_processor(&self.inner.processing.sessions, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::StandaloneAttachments => {
                self.process_with_processor(
                    &self.inner.processing.attachments,
                    managed_envelope,
                    ctx,
                )
                .await
            }
            ProcessingGroup::StandaloneUserReports => {
                self.process_with_processor(
                    &self.inner.processing.user_reports,
                    managed_envelope,
                    ctx,
                )
                .await
            }
            ProcessingGroup::StandaloneProfiles => {
                self.process_with_processor(&self.inner.processing.profiles, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::ClientReport => {
                self.process_with_processor(
                    &self.inner.processing.client_reports,
                    managed_envelope,
                    ctx,
                )
                .await
            }
            ProcessingGroup::Replay => {
                self.process_with_processor(&self.inner.processing.replays, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::CheckIn => {
                self.process_with_processor(&self.inner.processing.check_ins, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::Log => {
                self.process_with_processor(&self.inner.processing.logs, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::TraceMetric => {
                self.process_with_processor(
                    &self.inner.processing.trace_metrics,
                    managed_envelope,
                    ctx,
                )
                .await
            }
            ProcessingGroup::SpanV2 => {
                self.process_with_processor(&self.inner.processing.spans, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::TraceAttachment => {
                self.process_with_processor(
                    &self.inner.processing.trace_attachments,
                    managed_envelope,
                    ctx,
                )
                .await
            }
            ProcessingGroup::Span => {
                self.process_with_processor(
                    &self.inner.processing.legacy_spans,
                    managed_envelope,
                    ctx,
                )
                .await
            }
            ProcessingGroup::ProfileChunk => {
                self.process_with_processor(
                    &self.inner.processing.profile_chunks,
                    managed_envelope,
                    ctx,
                )
                .await
            }
            ProcessingGroup::Ungrouped => {
                // NOTE(review): only the first item type is logged here; presumably
                // enough to identify the culprit — confirm if full item listing is wanted.
                relay_log::error!(
                    tags.project = %project_id,
                    items = ?managed_envelope.envelope().items().next().map(Item::ty),
                    "could not identify the processing group based on the envelope's items"
                );

                Err(ProcessingError::NoProcessingGroup)
            }
            ProcessingGroup::ForwardUnknown => {
                self.process_with_processor(
                    &self.inner.processing.forward_unknown,
                    managed_envelope,
                    ctx,
                )
                .await
            }
        }
    }
1250
1251 async fn process<'a>(
1257 &self,
1258 mut message: ProcessEnvelopeGrouped<'a>,
1259 ) -> Result<Option<(Outputs, ForwardContext<'a>)>, ProcessingError> {
1260 let ProcessEnvelopeGrouped {
1261 ref mut envelope,
1262 ctx,
1263 ..
1264 } = message;
1265
1266 let Some(project_id) = ctx
1273 .project_info
1274 .project_id
1275 .or_else(|| envelope.envelope().meta().project_id())
1276 else {
1277 envelope.reject(Outcome::Invalid(DiscardReason::Internal));
1278 return Err(ProcessingError::MissingProjectId);
1279 };
1280
1281 let client = envelope.envelope().meta().client().map(str::to_owned);
1282 let user_agent = envelope.envelope().meta().user_agent().map(str::to_owned);
1283 let project_key = envelope.envelope().meta().public_key();
1284 let sampling_key = envelope
1288 .envelope()
1289 .sampling_key()
1290 .filter(|_| ctx.sampling_project_info.is_some());
1291
1292 relay_log::configure_scope(|scope| {
1295 scope.set_tag("project", project_id);
1296 if let Some(client) = client {
1297 scope.set_tag("sdk", client);
1298 }
1299 if let Some(user_agent) = user_agent {
1300 scope.set_extra("user_agent", user_agent.into());
1301 }
1302 });
1303
1304 let result =
1305 self.process_envelope(project_id, message)
1306 .await
1307 .map(|Output { main, metrics }| {
1308 if let Some(metrics) = metrics {
1309 metrics.accept(|metrics| {
1310 send_metrics(
1311 metrics,
1312 project_key,
1313 sampling_key,
1314 &self.inner.addrs.aggregator,
1315 );
1316 });
1317 }
1318
1319 let ctx = ctx.to_forward();
1320 main.map(|output| (output, ctx))
1321 });
1322
1323 relay_log::configure_scope(|scope| {
1324 scope.remove_tag("project");
1325 scope.remove_tag("sdk");
1326 scope.remove_tag("user_agent");
1327 });
1328
1329 result
1330 }
1331
    /// Handles the `ProcessEnvelope` message: splits and processes the envelope.
    ///
    /// The incoming envelope is split into per-group sub-envelopes; each one is scoped,
    /// processed under its own per-group COGS measurement, and either submitted upstream
    /// or logged on failure. The original ambient `cogs` token is cancelled because
    /// attribution happens per group instead.
    async fn handle_process_envelope(&self, cogs: &mut Token, message: ProcessEnvelope) {
        let project_key = message.envelope.envelope().meta().public_key();
        let wait_time = message.envelope.age();
        metric!(timer(RelayTimers::EnvelopeWaitTime) = wait_time);

        // Per-group tokens created below replace this ambient measurement.
        cogs.cancel();

        let scoping = message.envelope.scoping();
        for (group, envelope) in ProcessingGroup::split_envelope(
            *message.envelope.into_envelope(),
            &message.project_info,
        ) {
            // Attribute work to the app feature derived from the processing group.
            let mut cogs = self
                .inner
                .cogs
                .timed(ResourceId::Relay, AppFeature::from(group));

            let mut envelope =
                ManagedEnvelope::new(envelope, self.inner.addrs.outcome_aggregator.clone());
            envelope.scope(scoping);

            // Snapshot the global config per sub-envelope so `ctx` borrows stay local.
            let global_config = self.inner.global_config.current();

            let ctx = processing::Context {
                config: &self.inner.config,
                global_config: &global_config,
                project_info: &message.project_info,
                sampling_project_info: message.sampling_project_info.as_deref(),
                rate_limits: &message.rate_limits,
                reservoir_counters: &message.reservoir_counters,
            };

            let message = ProcessEnvelopeGrouped {
                group,
                envelope,
                ctx,
            };

            let result = metric!(
                timer(RelayTimers::EnvelopeProcessingTime),
                group = group.variant(),
                { self.process(message).await }
            );

            match result {
                Ok(Some((output, ctx))) => self.submit_upstream(&mut cogs, output, ctx),
                Ok(None) => {}
                // Unexpected failures are logged at error level; expected ones at debug.
                Err(error) if error.is_unexpected() => {
                    relay_log::error!(
                        tags.project_key = %project_key,
                        error = &error as &dyn Error,
                        "error processing envelope"
                    )
                }
                Err(error) => {
                    relay_log::debug!(
                        tags.project_key = %project_key,
                        error = &error as &dyn Error,
                        "error processing envelope"
                    )
                }
            }
        }
    }
1398
    /// Handles the `ProcessMetrics` message: normalizes, validates and aggregates buckets.
    ///
    /// Buckets are normalized, filtered by namespace validity, corrected for client
    /// clock drift, rate-limited against the project (when its state is enabled), and
    /// finally merged into the aggregator.
    fn handle_process_metrics(&self, cogs: &mut Token, message: ProcessMetrics) {
        let ProcessMetrics {
            data,
            project_key,
            received_at,
            sent_at,
            source,
        } = message;

        let received_timestamp =
            UnixTimestamp::from_datetime(received_at).unwrap_or(UnixTimestamp::now());

        let mut buckets = data.into_buckets(received_timestamp);
        if buckets.is_empty() {
            return;
        };
        cogs.update(relay_metrics::cogs::BySize(&buckets));

        let clock_drift_processor =
            ClockDriftProcessor::new(sent_at, received_at).at_least(MINIMUM_CLOCK_DRIFT);

        // Drop invalid buckets in place; surviving buckets get drift-corrected timestamps.
        buckets.retain_mut(|bucket| {
            if let Err(error) = relay_metrics::normalize_bucket(bucket) {
                relay_log::debug!(error = &error as &dyn Error, "dropping bucket {bucket:?}");
                return false;
            }

            if !self::metrics::is_valid_namespace(bucket) {
                return false;
            }

            clock_drift_processor.process_timestamp(&mut bucket.timestamp);

            // Metadata from external sources is untrusted and reset to local receive time.
            if !matches!(source, BucketSource::Internal) {
                bucket.metadata = BucketMetadata::new(received_timestamp);
            }

            true
        });

        let project = self.inner.project_cache.get(project_key);

        // Rate limits can only be checked when the project state is available & enabled;
        // otherwise buckets pass through unchecked.
        let buckets = match project.state() {
            ProjectState::Enabled(project_info) => {
                let rate_limits = project.rate_limits().current_limits();
                self.check_buckets(project_key, project_info, &rate_limits, buckets)
            }
            _ => buckets,
        };

        relay_log::trace!("merging metric buckets into the aggregator");
        self.inner
            .addrs
            .aggregator
            .send(MergeBuckets::new(project_key, buckets));
    }
1457
1458 fn handle_process_batched_metrics(&self, cogs: &mut Token, message: ProcessBatchedMetrics) {
1459 let ProcessBatchedMetrics {
1460 payload,
1461 source,
1462 received_at,
1463 sent_at,
1464 } = message;
1465
1466 #[derive(serde::Deserialize)]
1467 struct Wrapper {
1468 buckets: HashMap<ProjectKey, Vec<Bucket>>,
1469 }
1470
1471 let buckets = match serde_json::from_slice(&payload) {
1472 Ok(Wrapper { buckets }) => buckets,
1473 Err(error) => {
1474 relay_log::debug!(
1475 error = &error as &dyn Error,
1476 "failed to parse batched metrics",
1477 );
1478 metric!(counter(RelayCounters::MetricBucketsParsingFailed) += 1);
1479 return;
1480 }
1481 };
1482
1483 for (project_key, buckets) in buckets {
1484 self.handle_process_metrics(
1485 cogs,
1486 ProcessMetrics {
1487 data: MetricData::Parsed(buckets),
1488 project_key,
1489 source,
1490 received_at,
1491 sent_at,
1492 },
1493 )
1494 }
1495 }
1496
    /// Submits processed output either to the store (processing Relays) or upstream.
    ///
    /// When processing is enabled and a store forwarder is configured, the output is
    /// forwarded directly to the store. Otherwise it is serialized back into an
    /// envelope and sent to the HTTP upstream.
    fn submit_upstream(
        &self,
        cogs: &mut Token,
        output: Outputs,
        ctx: processing::ForwardContext<'_>,
    ) {
        // Attribute the submission phase separately in COGS.
        let _submit = cogs.start_category("submit");

        #[cfg(feature = "processing")]
        if ctx.config.processing_enabled()
            && let Some(store_forwarder) = &self.inner.addrs.store_forwarder
        {
            use crate::processing::StoreHandle;

            let objectstore = self.inner.addrs.objectstore.as_ref();
            let handle = StoreHandle::new(store_forwarder, objectstore, ctx.global_config);

            // On error, `into_inner` consumes the failure after outcomes were emitted.
            output
                .forward_store(handle, ctx)
                .unwrap_or_else(|err| err.into_inner());

            return;
        }

        match output.serialize_envelope(ctx) {
            Ok(envelope) => {
                let envelope = ManagedEnvelope::from(envelope);
                self.submit_envelope_upstream(envelope, ctx.project_info.upstream.clone());
            }
            Err(_) => relay_log::error!("failed to serialize output to an envelope"),
        };
    }
1532
    /// Serializes an envelope and sends it to the (optional) HTTP upstream.
    ///
    /// Empty envelopes are accepted without a request. Processing Relays must never
    /// reach this path: they forward to the store instead, so hitting this with
    /// processing enabled is logged as an error and the envelope is dropped silently.
    fn submit_envelope_upstream(
        &self,
        mut envelope: ManagedEnvelope,
        upstream: Option<UpstreamDescriptor>,
    ) {
        if envelope.envelope_mut().is_empty() {
            envelope.accept();
            return;
        }

        if self.inner.config.processing_enabled() {
            relay_log::error!(
                "attempt to forward envelope to http upstream when processing is enabled"
            );
            return;
        }

        // Stamp the send time so the upstream can compute clock drift.
        envelope.envelope_mut().set_sent_at(Utc::now());

        relay_log::trace!("sending envelope to sentry endpoint");
        let http_encoding = self.inner.config.http_encoding();
        let result = envelope.envelope().to_vec().and_then(|v| {
            encode_payload(&v.into(), http_encoding).map_err(EnvelopeError::PayloadIoFailed)
        });

        match result {
            Ok(body) => {
                self.inner
                    .addrs
                    .upstream_relay
                    .send(SendRequest(SendEnvelope {
                        upstream,
                        envelope,
                        body,
                        http_encoding,
                        project_cache: self.inner.project_cache.clone(),
                    }));
            }
            Err(error) => {
                relay_log::error!(
                    error = &error as &dyn Error,
                    tags.project_key = %envelope.scoping().project_key,
                    "failed to serialize envelope payload"
                );

                envelope.reject(Outcome::Invalid(DiscardReason::Internal));
            }
        }
    }
1596
1597 fn handle_submit_client_reports(&self, message: SubmitClientReports) {
1598 let SubmitClientReports {
1599 client_reports,
1600 scoping,
1601 } = message;
1602
1603 let upstream = self.inner.config.upstream();
1604 let dsn = PartialDsn::outbound(&scoping, upstream);
1605
1606 let mut envelope = Envelope::from_request(None, RequestMeta::outbound(dsn));
1607 for client_report in client_reports {
1608 match client_report.serialize() {
1609 Ok(payload) => {
1610 let mut item = Item::new(ItemType::ClientReport);
1611 item.set_payload(ContentType::Json, payload);
1612 envelope.add_item(item);
1613 }
1614 Err(error) => {
1615 relay_log::error!(
1616 error = &error as &dyn std::error::Error,
1617 "failed to serialize client report"
1618 );
1619 }
1620 }
1621 }
1622
1623 let envelope = ManagedEnvelope::new(envelope, self.inner.addrs.outcome_aggregator.clone());
1624 self.submit_envelope_upstream(envelope, None);
1625 }
1626
1627 fn check_buckets(
1628 &self,
1629 project_key: ProjectKey,
1630 project_info: &ProjectInfo,
1631 rate_limits: &RateLimits,
1632 buckets: Vec<Bucket>,
1633 ) -> Vec<Bucket> {
1634 let Some(scoping) = project_info.scoping(project_key) else {
1635 relay_log::error!(
1636 tags.project_key = project_key.as_str(),
1637 "there is no scoping: dropping {} buckets",
1638 buckets.len(),
1639 );
1640 return Vec::new();
1641 };
1642
1643 let mut buckets = self::metrics::apply_project_info(
1644 buckets,
1645 &self.inner.metric_outcomes,
1646 project_info,
1647 scoping,
1648 );
1649
1650 let namespaces: BTreeSet<MetricNamespace> = buckets
1651 .iter()
1652 .filter_map(|bucket| bucket.name.try_namespace())
1653 .collect();
1654
1655 for namespace in namespaces {
1656 let limits = rate_limits.check_with_quotas(
1657 project_info.get_quotas(),
1658 scoping.item(DataCategory::MetricBucket),
1659 );
1660
1661 if limits.is_limited() {
1662 let rejected;
1663 (buckets, rejected) = utils::split_off(buckets, |bucket| {
1664 bucket.name.try_namespace() == Some(namespace)
1665 });
1666
1667 let reason_code = limits.longest().and_then(|limit| limit.reason_code.clone());
1668 self.inner.metric_outcomes.track(
1669 scoping,
1670 &rejected,
1671 Outcome::RateLimited(reason_code),
1672 );
1673 }
1674 }
1675
1676 let quotas = project_info.config.quotas.clone();
1677 match MetricsLimiter::create(buckets, quotas, scoping) {
1678 Ok(mut bucket_limiter) => {
1679 bucket_limiter.enforce_limits(rate_limits, &self.inner.metric_outcomes);
1680 bucket_limiter.into_buckets()
1681 }
1682 Err(buckets) => buckets,
1683 }
1684 }
1685
1686 #[cfg(feature = "processing")]
1687 async fn rate_limit_buckets(
1688 &self,
1689 scoping: Scoping,
1690 project_info: &ProjectInfo,
1691 mut buckets: Vec<Bucket>,
1692 ) -> Vec<Bucket> {
1693 let Some(rate_limiter) = &self.inner.rate_limiter else {
1694 return buckets;
1695 };
1696
1697 let global_config = self.inner.global_config.current();
1698 let namespaces = buckets
1699 .iter()
1700 .filter_map(|bucket| bucket.name.try_namespace())
1701 .counts();
1702
1703 let quotas = CombinedQuotas::new(&global_config, project_info.get_quotas());
1704
1705 for (namespace, quantity) in namespaces {
1706 let item_scoping = scoping.metric_bucket(namespace);
1707
1708 let limits = match rate_limiter
1709 .is_rate_limited(quotas, item_scoping, quantity, false)
1710 .await
1711 {
1712 Ok(limits) => limits,
1713 Err(err) => {
1714 relay_log::error!(
1715 error = &err as &dyn std::error::Error,
1716 "failed to check redis rate limits"
1717 );
1718 break;
1719 }
1720 };
1721
1722 if limits.is_limited() {
1723 let rejected;
1724 (buckets, rejected) = utils::split_off(buckets, |bucket| {
1725 bucket.name.try_namespace() == Some(namespace)
1726 });
1727
1728 let reason_code = limits.longest().and_then(|limit| limit.reason_code.clone());
1729 self.inner.metric_outcomes.track(
1730 scoping,
1731 &rejected,
1732 Outcome::RateLimited(reason_code),
1733 );
1734
1735 self.inner
1736 .project_cache
1737 .get(item_scoping.scoping.project_key)
1738 .rate_limits()
1739 .merge(limits);
1740 }
1741 }
1742
1743 match MetricsLimiter::create(buckets, project_info.config.quotas.clone(), scoping) {
1744 Err(buckets) => buckets,
1745 Ok(bucket_limiter) => self.apply_other_rate_limits(bucket_limiter).await,
1746 }
1747 }
1748
1749 #[cfg(feature = "processing")]
1751 async fn apply_other_rate_limits(&self, mut bucket_limiter: MetricsLimiter) -> Vec<Bucket> {
1752 relay_log::trace!("handle_rate_limit_buckets");
1753
1754 let scoping = *bucket_limiter.scoping();
1755
1756 if let Some(rate_limiter) = self.inner.rate_limiter.as_ref() {
1757 let global_config = self.inner.global_config.current();
1758 let quotas = CombinedQuotas::new(&global_config, bucket_limiter.quotas());
1759
1760 let over_accept_once = true;
1763 let mut rate_limits = RateLimits::new();
1764
1765 let (category, count) = bucket_limiter.count();
1766
1767 let timer = Instant::now();
1768 let mut is_limited = false;
1769
1770 if let Some(count) = count {
1771 match rate_limiter
1772 .is_rate_limited(quotas, scoping.item(category), count, over_accept_once)
1773 .await
1774 {
1775 Ok(limits) => {
1776 is_limited = limits.is_limited();
1777 rate_limits.merge(limits)
1778 }
1779 Err(e) => {
1780 relay_log::error!(error = &e as &dyn Error, "rate limiting error")
1781 }
1782 }
1783 }
1784
1785 relay_statsd::metric!(
1786 timer(RelayTimers::RateLimitBucketsDuration) = timer.elapsed(),
1787 category = category.name(),
1788 limited = if is_limited { "true" } else { "false" },
1789 count = match count {
1790 None => "none",
1791 Some(0) => "0",
1792 Some(1) => "1",
1793 Some(1..=10) => "10",
1794 Some(1..=25) => "25",
1795 Some(1..=50) => "50",
1796 Some(51..=100) => "100",
1797 Some(101..=500) => "500",
1798 _ => "> 500",
1799 },
1800 );
1801
1802 if rate_limits.is_limited() {
1803 let was_enforced =
1804 bucket_limiter.enforce_limits(&rate_limits, &self.inner.metric_outcomes);
1805
1806 if was_enforced {
1807 self.inner
1809 .project_cache
1810 .get(scoping.project_key)
1811 .rate_limits()
1812 .merge(rate_limits);
1813 }
1814 }
1815 }
1816
1817 bucket_limiter.into_buckets()
1818 }
1819
1820 #[cfg(feature = "processing")]
1826 async fn encode_metrics_processing(
1827 &self,
1828 message: FlushBuckets,
1829 store_forwarder: &Addr<Store>,
1830 ) {
1831 use crate::constants::DEFAULT_EVENT_RETENTION;
1832 use crate::services::store::StoreMetrics;
1833
1834 for ProjectBuckets {
1835 buckets,
1836 scoping,
1837 project_info,
1838 ..
1839 } in message.buckets.into_values()
1840 {
1841 let buckets = self
1842 .rate_limit_buckets(scoping, &project_info, buckets)
1843 .await;
1844
1845 if buckets.is_empty() {
1846 continue;
1847 }
1848
1849 let retention = project_info
1850 .config
1851 .event_retention
1852 .unwrap_or(DEFAULT_EVENT_RETENTION);
1853
1854 store_forwarder.send(StoreMetrics {
1857 buckets,
1858 scoping,
1859 retention,
1860 });
1861 }
1862 }
1863
1864 fn encode_metrics_envelope(&self, message: FlushBuckets) {
1874 let FlushBuckets {
1875 partition_key,
1876 buckets,
1877 } = message;
1878
1879 let batch_size = self.inner.config.metrics_max_batch_size_bytes();
1880 let upstream = self.inner.config.upstream();
1881
1882 for ProjectBuckets {
1883 buckets,
1884 scoping,
1885 project_info,
1886 ..
1887 } in buckets.values()
1888 {
1889 let dsn = PartialDsn::outbound(scoping, upstream);
1890
1891 relay_statsd::metric!(
1892 distribution(RelayDistributions::PartitionKeys) = u64::from(partition_key)
1893 );
1894
1895 let mut num_batches = 0;
1896 for batch in BucketsView::from(buckets).by_size(batch_size) {
1897 let mut envelope = Envelope::from_request(None, RequestMeta::outbound(dsn.clone()));
1898
1899 let mut item = Item::new(ItemType::MetricBuckets);
1900 item.set_source_quantities(crate::metrics::extract_quantities(batch));
1901 item.set_payload(ContentType::Json, serde_json::to_vec(&buckets).unwrap());
1902 envelope.add_item(item);
1903
1904 let mut envelope =
1905 ManagedEnvelope::new(envelope, self.inner.addrs.outcome_aggregator.clone());
1906 envelope
1907 .set_partition_key(Some(partition_key))
1908 .scope(*scoping);
1909
1910 relay_statsd::metric!(
1911 distribution(RelayDistributions::BucketsPerBatch) = batch.len() as u64
1912 );
1913
1914 self.submit_envelope_upstream(envelope, project_info.upstream.clone());
1915 num_batches += 1;
1916 }
1917
1918 relay_statsd::metric!(
1919 distribution(RelayDistributions::BatchesPerPartition) = num_batches
1920 );
1921 }
1922 }
1923
1924 fn send_global_partition(
1926 &self,
1927 upstream: Option<UpstreamDescriptor>,
1928 partition_key: u32,
1929 partition: &mut Partition<'_>,
1930 ) {
1931 if partition.is_empty() {
1932 return;
1933 }
1934
1935 let (unencoded, project_info) = partition.take();
1936 let http_encoding = self.inner.config.http_encoding();
1937 let encoded = match encode_payload(&unencoded, http_encoding) {
1938 Ok(payload) => payload,
1939 Err(error) => {
1940 let error = &error as &dyn std::error::Error;
1941 relay_log::error!(error, "failed to encode metrics payload");
1942 return;
1943 }
1944 };
1945
1946 let request = SendMetricsRequest {
1947 upstream,
1948 partition_key: partition_key.to_string(),
1949 unencoded,
1950 encoded,
1951 project_info,
1952 http_encoding,
1953 metric_outcomes: self.inner.metric_outcomes.clone(),
1954 };
1955
1956 self.inner.addrs.upstream_relay.send(SendRequest(request));
1957 }
1958
1959 fn encode_metrics_global(&self, message: FlushBuckets) {
1970 let FlushBuckets {
1971 partition_key,
1972 buckets,
1973 } = message;
1974
1975 let batch_size = self.inner.config.metrics_max_batch_size_bytes();
1976 let mut partitions = BTreeMap::new();
1977 let mut partition_splits = 0;
1978
1979 for ProjectBuckets {
1980 buckets,
1981 scoping,
1982 project_info,
1983 ..
1984 } in buckets.values()
1985 {
1986 let partition = match partitions.get_mut(&project_info.upstream) {
1987 Some(partition) => partition,
1988 None => partitions
1989 .entry(project_info.upstream.clone())
1990 .or_insert_with(|| Partition::new(batch_size)),
1991 };
1992
1993 for bucket in buckets {
1994 let mut remaining = Some(BucketView::new(bucket));
1995
1996 while let Some(bucket) = remaining.take() {
1997 if let Some(next) = partition.insert(bucket, *scoping) {
1998 self.send_global_partition(
2002 project_info.upstream.clone(),
2003 partition_key,
2004 partition,
2005 );
2006 remaining = Some(next);
2007 partition_splits += 1;
2008 }
2009 }
2010 }
2011 }
2012
2013 if partition_splits > 0 {
2014 metric!(distribution(RelayDistributions::PartitionSplits) = partition_splits);
2015 }
2016
2017 for (upstream, mut partition) in partitions {
2018 self.send_global_partition(upstream, partition_key, &mut partition);
2019 }
2020 }
2021
    /// Handles the `FlushBuckets` message from the aggregator.
    ///
    /// First applies cached rate limits per project, then encodes the surviving
    /// buckets for the store (processing Relays), the global-metrics endpoint, or
    /// per-project envelopes, depending on configuration.
    async fn handle_flush_buckets(&self, mut message: FlushBuckets) {
        for (project_key, pb) in message.buckets.iter_mut() {
            // `take` moves the buckets out so `check_buckets` can consume them.
            let buckets = std::mem::take(&mut pb.buckets);
            pb.buckets =
                self.check_buckets(*project_key, &pb.project_info, &pb.rate_limits, buckets);
        }

        #[cfg(feature = "processing")]
        if self.inner.config.processing_enabled()
            && let Some(ref store_forwarder) = self.inner.addrs.store_forwarder
        {
            return self
                .encode_metrics_processing(message, store_forwarder)
                .await;
        }

        if self.inner.config.http_global_metrics() {
            self.encode_metrics_global(message)
        } else {
            self.encode_metrics_envelope(message)
        }
    }
2044
2045 #[cfg(all(test, feature = "processing"))]
2046 fn redis_rate_limiter_enabled(&self) -> bool {
2047 self.inner.rate_limiter.is_some()
2048 }
2049
    /// Dispatches an [`EnvelopeProcessor`] message to its handler.
    ///
    /// Wraps the handler in a timing metric tagged by message variant and a COGS
    /// measurement weighted by [`Self::feature_weights`].
    async fn handle_message(&self, message: EnvelopeProcessor) {
        let ty = message.variant();
        // Weights must be computed before `message` is moved into the match below.
        let feature_weights = self.feature_weights(&message);

        metric!(timer(RelayTimers::ProcessMessageDuration), message = ty, {
            let mut cogs = self.inner.cogs.timed(ResourceId::Relay, feature_weights);

            match message {
                EnvelopeProcessor::ProcessEnvelope(m) => {
                    self.handle_process_envelope(&mut cogs, *m).await
                }
                EnvelopeProcessor::ProcessProjectMetrics(m) => {
                    self.handle_process_metrics(&mut cogs, *m)
                }
                EnvelopeProcessor::ProcessBatchedMetrics(m) => {
                    self.handle_process_batched_metrics(&mut cogs, *m)
                }
                EnvelopeProcessor::FlushBuckets(m) => self.handle_flush_buckets(*m).await,
                EnvelopeProcessor::SubmitClientReports(m) => self.handle_submit_client_reports(*m),
            }
        });
    }
2072
2073 fn feature_weights(&self, message: &EnvelopeProcessor) -> FeatureWeights {
2074 match message {
2075 EnvelopeProcessor::ProcessEnvelope(_) => AppFeature::Unattributed.into(),
2077 EnvelopeProcessor::ProcessProjectMetrics(_) => AppFeature::Unattributed.into(),
2078 EnvelopeProcessor::ProcessBatchedMetrics(_) => AppFeature::Unattributed.into(),
2079 EnvelopeProcessor::FlushBuckets(v) => v
2080 .buckets
2081 .values()
2082 .map(|s| {
2083 if self.inner.config.processing_enabled() {
2084 relay_metrics::cogs::ByCount(&s.buckets).into()
2087 } else {
2088 relay_metrics::cogs::BySize(&s.buckets).into()
2089 }
2090 })
2091 .fold(FeatureWeights::none(), FeatureWeights::merge),
2092 EnvelopeProcessor::SubmitClientReports(_) => AppFeature::ClientReports.into(),
2093 }
2094 }
2095}
2096
impl Service for EnvelopeProcessorService {
    type Interface = EnvelopeProcessor;

    /// Runs the processor service loop.
    ///
    /// Each incoming message is handled on the worker pool with a cloned service
    /// handle. `spawn_async` is awaited, which applies backpressure when the pool
    /// is saturated instead of queueing unboundedly.
    async fn run(self, mut rx: relay_system::Receiver<Self::Interface>) {
        while let Some(message) = rx.recv().await {
            let service = self.clone();
            self.inner
                .pool
                .spawn_async(
                    async move {
                        service.handle_message(message).await;
                    }
                    .boxed(),
                )
                .await;
        }
    }
}
2115
2116pub fn encode_payload(body: &Bytes, http_encoding: HttpEncoding) -> Result<Bytes, std::io::Error> {
2117 let envelope_body: Vec<u8> = match http_encoding {
2118 HttpEncoding::Identity => return Ok(body.clone()),
2119 HttpEncoding::Deflate => {
2120 let mut encoder = ZlibEncoder::new(Vec::new(), Compression::default());
2121 encoder.write_all(body.as_ref())?;
2122 encoder.finish()?
2123 }
2124 HttpEncoding::Gzip => {
2125 let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
2126 encoder.write_all(body.as_ref())?;
2127 encoder.finish()?
2128 }
2129 HttpEncoding::Br => {
2130 let mut encoder = BrotliEncoder::new(Vec::new(), 0, 5, 22);
2132 encoder.write_all(body.as_ref())?;
2133 encoder.into_inner()
2134 }
2135 HttpEncoding::Zstd => {
2136 let mut encoder = ZstdEncoder::new(Vec::new(), 1)?;
2139 encoder.write_all(body.as_ref())?;
2140 encoder.finish()?
2141 }
2142 };
2143
2144 Ok(envelope_body.into())
2145}
2146
/// An upstream request that sends a serialized envelope to the envelope endpoint.
#[derive(Debug)]
pub struct SendEnvelope {
    // Target upstream; `None` falls back to the default configured upstream.
    pub upstream: Option<UpstreamDescriptor>,
    // The managed envelope, used for outcomes and rate-limit scoping on response.
    pub envelope: ManagedEnvelope,
    // The pre-encoded request body.
    pub body: Bytes,
    // Content encoding that `body` was compressed with.
    pub http_encoding: HttpEncoding,
    // Used to merge rate limits received from the upstream back into the project.
    pub project_cache: ProjectCacheHandle,
}
2156
2157impl UpstreamRequest for SendEnvelope {
2158 fn upstream(&self) -> Option<&UpstreamDescriptor> {
2159 self.upstream.as_ref()
2160 }
2161
2162 fn method(&self) -> reqwest::Method {
2163 reqwest::Method::POST
2164 }
2165
2166 fn path(&self) -> Cow<'_, str> {
2167 format!("/api/{}/envelope/", self.envelope.scoping().project_id).into()
2168 }
2169
2170 fn route(&self) -> &'static str {
2171 "envelope"
2172 }
2173
2174 fn build(&mut self, builder: &mut http::RequestBuilder) -> Result<(), http::HttpError> {
2175 let envelope_body = self.body.clone();
2176 metric!(
2177 distribution(RelayDistributions::UpstreamEnvelopeBodySize) = envelope_body.len() as u64
2178 );
2179
2180 let meta = &self.envelope.meta();
2181 let shard = self.envelope.partition_key().map(|p| p.to_string());
2182 builder
2183 .content_encoding(self.http_encoding)
2184 .header_opt("Origin", meta.origin().map(|url| url.as_str()))
2185 .header_opt("User-Agent", meta.user_agent())
2186 .header("X-Sentry-Auth", meta.auth_header())
2187 .header("X-Forwarded-For", meta.forwarded_for())
2188 .header("Content-Type", envelope::CONTENT_TYPE)
2189 .header_opt("X-Sentry-Relay-Shard", shard)
2190 .body(envelope_body);
2191
2192 Ok(())
2193 }
2194
2195 fn sign(&mut self) -> Option<Sign> {
2196 Some(Sign::Optional(SignatureType::RequestSign))
2197 }
2198
2199 fn respond(
2200 self: Box<Self>,
2201 result: Result<http::Response, UpstreamRequestError>,
2202 ) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> {
2203 Box::pin(async move {
2204 let result = match result {
2205 Ok(mut response) => response.consume().await.map_err(UpstreamRequestError::Http),
2206 Err(error) => Err(error),
2207 };
2208
2209 match result {
2210 Ok(()) => self.envelope.accept(),
2211 Err(error) if error.is_received() => {
2212 let scoping = self.envelope.scoping();
2213 self.envelope.accept();
2214
2215 if let UpstreamRequestError::RateLimited(limits) = error {
2216 self.project_cache
2217 .get(scoping.project_key)
2218 .rate_limits()
2219 .merge(limits.scope(&scoping));
2220 }
2221 }
2222 Err(error) => {
2223 let mut envelope = self.envelope;
2226 envelope.reject(Outcome::Invalid(DiscardReason::Internal));
2227 relay_log::error!(
2228 error = &error as &dyn Error,
2229 tags.project_key = %envelope.scoping().project_key,
2230 "error sending envelope"
2231 );
2232 }
2233 }
2234 })
2235 }
2236}
2237
/// A size-bounded batch of metric bucket views grouped by project.
#[derive(Debug)]
struct Partition<'a> {
    // Maximum serialized size budget of one partition in bytes.
    max_size: usize,
    // Remaining byte budget; decremented as buckets are inserted.
    remaining: usize,
    // Bucket views collected per project key.
    views: HashMap<ProjectKey, Vec<BucketView<'a>>>,
    // Scoping per project key, needed for outcomes when the send fails.
    project_info: HashMap<ProjectKey, Scoping>,
}
2251
impl<'a> Partition<'a> {
    /// Creates an empty partition with the given byte-size budget.
    pub fn new(size: usize) -> Self {
        Self {
            max_size: size,
            remaining: size,
            views: HashMap::new(),
            project_info: HashMap::new(),
        }
    }

    /// Inserts a bucket view, splitting it at the remaining byte budget.
    ///
    /// The part that fits is stored; the overflow — if any — is returned so the
    /// caller can flush this partition and retry the remainder against a fresh one.
    /// Returns `None` when the bucket fit entirely.
    pub fn insert(&mut self, bucket: BucketView<'a>, scoping: Scoping) -> Option<BucketView<'a>> {
        // `split` caps the second part at `max_size` so it can fit an empty partition.
        let (current, next) = bucket.split(self.remaining, Some(self.max_size));

        if let Some(current) = current {
            self.remaining = self.remaining.saturating_sub(current.estimated_size());
            self.views
                .entry(scoping.project_key)
                .or_default()
                .push(current);

            self.project_info
                .entry(scoping.project_key)
                .or_insert(scoping);
        }

        next
    }

    /// Returns `true` if no buckets have been inserted since the last `take`.
    fn is_empty(&self) -> bool {
        self.views.is_empty()
    }

    /// Serializes the partition to JSON and resets it for reuse.
    ///
    /// Returns the payload together with the per-project scopings, which the sender
    /// needs to emit outcomes if the request fails.
    fn take(&mut self) -> (Bytes, HashMap<ProjectKey, Scoping>) {
        #[derive(serde::Serialize)]
        struct Wrapper<'a> {
            buckets: &'a HashMap<ProjectKey, Vec<BucketView<'a>>>,
        }

        let buckets = &self.views;
        // Serializing borrowed views into the known wrapper shape cannot fail.
        let payload = serde_json::to_vec(&Wrapper { buckets }).unwrap().into();

        let scopings = std::mem::take(&mut self.project_info);

        self.views.clear();
        self.remaining = self.max_size;

        (payload, scopings)
    }
}
2316
/// An upstream request carrying one encoded partition of global metrics.
#[derive(Debug)]
struct SendMetricsRequest {
    // Target upstream; `None` falls back to the default configured upstream.
    upstream: Option<UpstreamDescriptor>,
    // Partition key, forwarded as the shard header.
    partition_key: String,
    // Uncompressed payload, used for signing and for error outcomes on failure.
    unencoded: Bytes,
    // Compressed payload actually sent as the request body.
    encoded: Bytes,
    // Scoping per project key, required to track outcomes for failed sends.
    project_info: HashMap<ProjectKey, Scoping>,
    // Content encoding that `encoded` was compressed with.
    http_encoding: HttpEncoding,
    // Sink for metric outcomes.
    metric_outcomes: MetricOutcomes,
}
2339
2340impl SendMetricsRequest {
2341 fn create_error_outcomes(self) {
2342 #[derive(serde::Deserialize)]
2343 struct Wrapper {
2344 buckets: HashMap<ProjectKey, Vec<MinimalTrackableBucket>>,
2345 }
2346
2347 let buckets = match serde_json::from_slice(&self.unencoded) {
2348 Ok(Wrapper { buckets }) => buckets,
2349 Err(err) => {
2350 relay_log::error!(
2351 error = &err as &dyn std::error::Error,
2352 "failed to parse buckets from failed transmission"
2353 );
2354 return;
2355 }
2356 };
2357
2358 for (key, buckets) in buckets {
2359 let Some(&scoping) = self.project_info.get(&key) else {
2360 relay_log::error!("missing scoping for project key");
2361 continue;
2362 };
2363
2364 self.metric_outcomes.track(
2365 scoping,
2366 &buckets,
2367 Outcome::Invalid(DiscardReason::Internal),
2368 );
2369 }
2370 }
2371}
2372
impl UpstreamRequest for SendMetricsRequest {
    fn upstream(&self) -> Option<&UpstreamDescriptor> {
        self.upstream.as_ref()
    }

    fn set_relay_id(&self) -> bool {
        true
    }

    /// Signing covers the uncompressed payload, not the encoded body.
    fn sign(&mut self) -> Option<Sign> {
        Some(Sign::Required(SignatureType::Body(self.unencoded.clone())))
    }

    fn method(&self) -> reqwest::Method {
        reqwest::Method::POST
    }

    fn path(&self) -> Cow<'_, str> {
        "/api/0/relays/metrics/".into()
    }

    fn route(&self) -> &'static str {
        "global_metrics"
    }

    fn build(&mut self, builder: &mut http::RequestBuilder) -> Result<(), http::HttpError> {
        metric!(
            distribution(RelayDistributions::UpstreamMetricsBodySize) = self.encoded.len() as u64
        );

        builder
            .content_encoding(self.http_encoding)
            .header("X-Sentry-Relay-Shard", self.partition_key.as_bytes())
            .header(header::CONTENT_TYPE, b"application/json")
            .body(self.encoded.clone());

        Ok(())
    }

    /// Emits error outcomes only when the upstream never received the request;
    /// once received, outcome reporting is the upstream's responsibility.
    fn respond(
        self: Box<Self>,
        result: Result<http::Response, UpstreamRequestError>,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> {
        Box::pin(async {
            match result {
                Ok(mut response) => {
                    response.consume().await.ok();
                }
                Err(error) => {
                    relay_log::error!(error = &error as &dyn Error, "Failed to send metrics batch");

                    if error.is_received() {
                        return;
                    }

                    self.create_error_outcomes()
                }
            }
        })
    }
}
2436
/// A view over global and project-level quotas, evaluated together.
#[derive(Copy, Clone, Debug)]
#[cfg(feature = "processing")]
struct CombinedQuotas<'a> {
    // Quotas taken from the global config; iterated first.
    global_quotas: &'a [Quota],
    // Quotas defined on the individual project; iterated after the global ones.
    project_quotas: &'a [Quota],
}
2444
#[cfg(feature = "processing")]
impl<'a> CombinedQuotas<'a> {
    /// Creates a new [`CombinedQuotas`] from the quotas in the global config
    /// and the given project quotas.
    pub fn new(global_config: &'a GlobalConfig, project_quotas: &'a [Quota]) -> Self {
        let global_quotas = global_config.quotas.as_slice();
        Self {
            global_quotas,
            project_quotas,
        }
    }
}
2455
#[cfg(feature = "processing")]
impl<'a> IntoIterator for CombinedQuotas<'a> {
    type Item = &'a Quota;
    type IntoIter = std::iter::Chain<std::slice::Iter<'a, Quota>, std::slice::Iter<'a, Quota>>;

    /// Yields all global quotas first, followed by all project quotas.
    fn into_iter(self) -> Self::IntoIter {
        let global = self.global_quotas.iter();
        global.chain(self.project_quotas)
    }
}
2465
#[cfg(test)]
mod tests {
    use insta::assert_debug_snapshot;
    use relay_common::glob2::LazyGlob;
    use relay_dynamic_config::ProjectConfig;
    use relay_event_normalization::{
        NormalizationConfig, RedactionRule, TransactionNameConfig, TransactionNameRule,
    };
    use relay_event_schema::protocol::{Event, TransactionSource};
    use relay_pii::DataScrubbingConfig;
    use relay_protocol::Annotated;
    use similar_asserts::assert_eq;

    use crate::testutils::{create_test_processor, create_test_processor_with_addrs};

    #[cfg(feature = "processing")]
    use {
        relay_metrics::BucketValue,
        relay_quotas::{QuotaScope, ReasonCode},
        relay_test::mock_service,
    };

    use super::*;

    /// Creates a reject-all (limit 0) organization-scoped quota for metric buckets
    /// with the given id.
    #[cfg(feature = "processing")]
    fn mock_quota(id: &str) -> Quota {
        Quota {
            id: Some(id.into()),
            categories: [DataCategory::MetricBucket].into(),
            scope: QuotaScope::Organization,
            scope_id: None,
            limit: Some(0),
            window: None,
            reason_code: None,
            namespace: None,
        }
    }

    /// `CombinedQuotas` must yield global quotas before project quotas, in order.
    #[cfg(feature = "processing")]
    #[test]
    fn test_dynamic_quotas() {
        let global_config = relay_dynamic_config::GlobalConfig {
            quotas: vec![mock_quota("foo"), mock_quota("bar")],
            ..Default::default()
        };

        let project_quotas = vec![mock_quota("baz"), mock_quota("qux")];

        let dynamic_quotas = CombinedQuotas::new(&global_config, &project_quotas);

        let quota_ids = dynamic_quotas.into_iter().filter_map(|q| q.id.as_deref());
        assert!(quota_ids.eq(["foo", "bar", "baz", "qux"]));
    }

    /// Rate limits must be enforced per organization within a single flush batch:
    /// only the metrics of the organization without a reject-all quota may reach
    /// the store. Requires a running Redis (see `RELAY_REDIS_URL`).
    #[cfg(feature = "processing")]
    #[tokio::test]
    async fn test_ratelimit_per_batch() {
        use relay_base_schema::organization::OrganizationId;
        use relay_protocol::FiniteF64;

        let rate_limited_org = Scoping {
            organization_id: OrganizationId::new(1),
            project_id: ProjectId::new(21),
            project_key: ProjectKey::parse("00000000000000000000000000000000").unwrap(),
            key_id: Some(17),
        };

        let not_rate_limited_org = Scoping {
            organization_id: OrganizationId::new(2),
            project_id: ProjectId::new(21),
            project_key: ProjectKey::parse("11111111111111111111111111111111").unwrap(),
            key_id: Some(17),
        };

        let message = {
            let project_info = {
                // Reject-all quota scoped to the first organization only.
                let quota = Quota {
                    id: Some("testing".into()),
                    categories: [DataCategory::MetricBucket].into(),
                    scope: relay_quotas::QuotaScope::Organization,
                    scope_id: Some(rate_limited_org.organization_id.to_string().into()),
                    limit: Some(0),
                    window: None,
                    reason_code: Some(ReasonCode::new("test")),
                    namespace: None,
                };

                let mut config = ProjectConfig::default();
                config.quotas.push(quota);

                Arc::new(ProjectInfo {
                    config,
                    ..Default::default()
                })
            };

            // Both organizations flush an identical counter bucket; both share the
            // same project config, but the quota's scope_id only matches org 1.
            let project_metrics = |scoping| ProjectBuckets {
                buckets: vec![Bucket {
                    name: "d:spans/bar".into(),
                    value: BucketValue::Counter(FiniteF64::new(1.0).unwrap()),
                    timestamp: UnixTimestamp::now(),
                    tags: Default::default(),
                    width: 10,
                    metadata: BucketMetadata::default(),
                }],
                rate_limits: Default::default(),
                project_info: project_info.clone(),
                scoping,
            };

            let buckets = hashbrown::HashMap::from([
                (
                    rate_limited_org.project_key,
                    project_metrics(rate_limited_org),
                ),
                (
                    not_rate_limited_org.project_key,
                    project_metrics(not_rate_limited_org),
                ),
            ]);

            FlushBuckets {
                partition_key: 0,
                buckets,
            }
        };

        assert_eq!(message.buckets.keys().count(), 2);

        let config = {
            let config_json = serde_json::json!({
                "processing": {
                    "enabled": true,
                    "kafka_config": [],
                    "redis": {
                        "server": std::env::var("RELAY_REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_owned()),
                    }
                }
            });
            Config::from_json_value(config_json).unwrap()
        };

        let (store, handle) = {
            // Mock store forwarder: records the organization id of every metrics
            // message that reaches the store; anything else is a test failure.
            let f = |org_ids: &mut Vec<OrganizationId>, msg: Store| {
                let org_id = match msg {
                    Store::Metrics(x) => x.scoping.organization_id,
                    _ => panic!("received envelope when expecting only metrics"),
                };
                org_ids.push(org_id);
            };

            mock_service("store_forwarder", vec![], f)
        };

        let processor = create_test_processor(config).await;
        assert!(processor.redis_rate_limiter_enabled());

        processor.encode_metrics_processing(message, &store).await;

        // Dropping the address shuts down the mock service and resolves its handle
        // with the accumulated state.
        drop(store);
        let orgs_not_ratelimited = handle.await.unwrap();

        assert_eq!(
            orgs_not_ratelimited,
            vec![not_rate_limited_org.organization_id]
        );
    }

    /// PII scrubbing with an `@ip:mask` rule must mask the Chrome version in the
    /// User-Agent header (it resembles an IP address), while browser context
    /// extraction still sees the original, unscrubbed version.
    #[tokio::test]
    async fn test_browser_version_extraction_with_pii_like_data() {
        let processor = create_test_processor(Default::default()).await;
        let outcome_aggregator = Addr::dummy();
        let event_id = EventId::new();

        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
            .parse()
            .unwrap();

        let request_meta = RequestMeta::new(dsn);
        let mut envelope = Envelope::from_request(Some(event_id), request_meta);

        envelope.add_item({
            let mut item = Item::new(ItemType::Event);
            item.set_payload(
                ContentType::Json,
                r#"
                {
                    "request": {
                        "headers": [
                            ["User-Agent", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36"]
                        ]
                    }
                }
                "#,
            );
            item
        });

        let mut datascrubbing_settings = DataScrubbingConfig::default();
        // Enable all the default scrubbing.
        datascrubbing_settings.scrub_data = true;
        datascrubbing_settings.scrub_defaults = true;
        datascrubbing_settings.scrub_ip_addresses = true;

        // Make sure to mask any IP-like looking data.
        let pii_config = serde_json::from_str(r#"{"applications": {"**": ["@ip:mask"]}}"#).unwrap();

        let config = ProjectConfig {
            datascrubbing_settings,
            pii_config: Some(pii_config),
            ..Default::default()
        };

        let project_info = ProjectInfo {
            config,
            ..Default::default()
        };

        let mut envelopes = ProcessingGroup::split_envelope(*envelope, &Default::default());
        assert_eq!(envelopes.len(), 1);

        let (group, envelope) = envelopes.pop().unwrap();
        let envelope = ManagedEnvelope::new(envelope, outcome_aggregator);

        let message = ProcessEnvelopeGrouped {
            group,
            envelope,
            ctx: processing::Context {
                project_info: &project_info,
                ..processing::Context::for_test()
            },
        };

        let Ok(Some((output, ctx))) = processor.process(message).await else {
            panic!();
        };
        let new_envelope = output.serialize_envelope(ctx).unwrap().accept(|f| f);

        let event_item = new_envelope.items().last().unwrap();
        let annotated_event: Annotated<Event> =
            Annotated::from_json_bytes(&event_item.payload()).unwrap();
        let event = annotated_event.into_value().unwrap();
        let headers = event
            .request
            .into_value()
            .unwrap()
            .headers
            .into_value()
            .unwrap();

        // IP-like data must be masked in the stored header value.
        assert_eq!(
            Some(
                "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/********* Safari/537.36"
            ),
            headers.get_header("User-Agent")
        );
        // But the browser context must be extracted before scrubbing.
        let contexts = event.contexts.into_value().unwrap();
        let browser = contexts.0.get("browser").unwrap();
        assert_eq!(
            r#"{"browser":"Chrome 103.0.0","name":"Chrome","version":"103.0.0","type":"browser"}"#,
            browser.to_json().unwrap()
        );
    }

    /// The dynamic sampling context from the envelope must be materialized into
    /// the event's `_dsc` field during processing.
    #[tokio::test]
    #[cfg(feature = "processing")]
    async fn test_materialize_dsc() {
        use crate::services::projects::project::PublicKeyConfig;

        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
            .parse()
            .unwrap();
        let request_meta = RequestMeta::new(dsn);
        let mut envelope = Envelope::from_request(None, request_meta);

        let dsc = r#"{
            "trace_id": "00000000-0000-0000-0000-000000000000",
            "public_key": "e12d836b15bb49d7bbf99e64295d995b",
            "sample_rate": "0.2"
        }"#;
        envelope.set_dsc(serde_json::from_str(dsc).unwrap());

        let mut item = Item::new(ItemType::Event);
        item.set_payload(ContentType::Json, r#"{}"#);
        envelope.add_item(item);

        let outcome_aggregator = Addr::dummy();
        let managed_envelope = ManagedEnvelope::new(envelope, outcome_aggregator);

        let mut project_info = ProjectInfo::default();
        project_info.public_keys.push(PublicKeyConfig {
            public_key: ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(),
            numeric_id: Some(1),
        });

        let config = serde_json::json!({
            "processing": {
                "enabled": true,
                "kafka_config": [],
            }
        });

        let message = ProcessEnvelopeGrouped {
            group: ProcessingGroup::Error,
            envelope: managed_envelope,
            ctx: processing::Context {
                config: &Config::from_json_value(config.clone()).unwrap(),
                project_info: &project_info,
                sampling_project_info: Some(&project_info),
                ..processing::Context::for_test()
            },
        };

        let processor = create_test_processor(Config::from_json_value(config).unwrap()).await;
        let Ok(Some((output, ctx))) = processor.process(message).await else {
            panic!();
        };
        let envelope = output.serialize_envelope(ctx).unwrap();
        let event = envelope
            .get_item_by(|item| item.ty() == &ItemType::Event)
            .unwrap();

        // The hyphenated trace id from the DSC is normalized to its compact form.
        let event = Annotated::<Event>::from_json_bytes(&event.payload()).unwrap();
        insta::assert_debug_snapshot!(event.value().unwrap()._dsc, @r###"
        Object(
            {
                "environment": ~,
                "public_key": String(
                    "e12d836b15bb49d7bbf99e64295d995b",
                ),
                "release": ~,
                "replay_id": ~,
                "sample_rate": String(
                    "0.2",
                ),
                "trace_id": String(
                    "00000000000000000000000000000000",
                ),
                "transaction": ~,
            },
        )
        "###);
    }

    /// Normalizes a transaction event with the given name and source under a
    /// `/foo/*/**` redaction rule, and returns the statsd metrics captured while
    /// logging transaction name changes.
    fn capture_test_event(transaction_name: &str, source: TransactionSource) -> Vec<String> {
        let mut event = Annotated::<Event>::from_json(
            r#"
            {
                "type": "transaction",
                "transaction": "/foo/",
                "timestamp": 946684810.0,
                "start_timestamp": 946684800.0,
                "contexts": {
                    "trace": {
                        "trace_id": "4c79f60c11214eb38604f4ae0781bfb2",
                        "span_id": "fa90fdead5f74053",
                        "op": "http.server",
                        "type": "trace"
                    }
                },
                "transaction_info": {
                    "source": "url"
                }
            }
            "#,
        )
        .unwrap();
        let e = event.value_mut().as_mut().unwrap();
        e.transaction.set_value(Some(transaction_name.into()));

        e.transaction_info
            .value_mut()
            .as_mut()
            .unwrap()
            .source
            .set_value(Some(source));

        relay_statsd::with_capturing_test_client(|| {
            utils::log_transaction_name_metrics(&mut event, |event| {
                let config = NormalizationConfig {
                    transaction_name_config: TransactionNameConfig {
                        rules: &[TransactionNameRule {
                            pattern: LazyGlob::new("/foo/*/**".to_owned()),
                            expiry: DateTime::<Utc>::MAX_UTC,
                            redaction: RedactionRule::Replace {
                                substitution: "*".to_owned(),
                            },
                        }],
                    },
                    ..Default::default()
                };
                relay_event_normalization::normalize_event(event, &config)
            });
        })
    }

    /// URL transaction name that matches neither the rule nor a pattern: no change.
    #[test]
    fn test_log_transaction_metrics_none() {
        let captures = capture_test_event("/nothing", TransactionSource::Url);
        insta::assert_debug_snapshot!(captures, @r###"
        [
            "event.transaction_name_changes:1|c|#source_in:url,changes:none,source_out:sanitized,is_404:false",
        ]
        "###);
    }

    /// Name matches the `/foo/*/**` rule only: change attributed to `rule`.
    #[test]
    fn test_log_transaction_metrics_rule() {
        let captures = capture_test_event("/foo/john/denver", TransactionSource::Url);
        insta::assert_debug_snapshot!(captures, @r###"
        [
            "event.transaction_name_changes:1|c|#source_in:url,changes:rule,source_out:sanitized,is_404:false",
        ]
        "###);
    }

    /// Name contains a numeric segment only: change attributed to `pattern`.
    #[test]
    fn test_log_transaction_metrics_pattern() {
        let captures = capture_test_event("/something/12345", TransactionSource::Url);
        insta::assert_debug_snapshot!(captures, @r###"
        [
            "event.transaction_name_changes:1|c|#source_in:url,changes:pattern,source_out:sanitized,is_404:false",
        ]
        "###);
    }

    /// Name matches the rule and contains a numeric segment: attributed to `both`.
    #[test]
    fn test_log_transaction_metrics_both() {
        let captures = capture_test_event("/foo/john/12345", TransactionSource::Url);
        insta::assert_debug_snapshot!(captures, @r###"
        [
            "event.transaction_name_changes:1|c|#source_in:url,changes:both,source_out:sanitized,is_404:false",
        ]
        "###);
    }

    /// `Route` sources are not sanitized, so no change is recorded.
    #[test]
    fn test_log_transaction_metrics_no_match() {
        let captures = capture_test_event("/foo/john/12345", TransactionSource::Route);
        insta::assert_debug_snapshot!(captures, @r###"
        [
            "event.transaction_name_changes:1|c|#source_in:route,changes:none,source_out:route,is_404:false",
        ]
        "###);
    }

    /// `received_at` bucket metadata is only stamped for externally submitted
    /// metrics; internal submissions keep it unset.
    #[tokio::test]
    async fn test_process_metrics_bucket_metadata() {
        let mut token = Cogs::noop().timed(ResourceId::Relay, AppFeature::Unattributed);
        let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
        let received_at = Utc::now();
        let config = Config::default();

        let (aggregator, mut aggregator_rx) = Addr::custom();
        let processor = create_test_processor_with_addrs(
            config,
            Addrs {
                aggregator,
                ..Default::default()
            },
        )
        .await;

        let mut item = Item::new(ItemType::Statsd);
        item.set_payload(ContentType::Text, "spans/foo:3182887624:4267882815|s");
        for (source, expected_received_at) in [
            (
                BucketSource::External,
                Some(UnixTimestamp::from_datetime(received_at).unwrap()),
            ),
            (BucketSource::Internal, None),
        ] {
            let message = ProcessMetrics {
                data: MetricData::Raw(vec![item.clone()]),
                project_key,
                source,
                received_at,
                sent_at: Some(Utc::now()),
            };
            processor.handle_process_metrics(&mut token, message);

            let Aggregator::MergeBuckets(merge_buckets) = aggregator_rx.recv().await.unwrap();
            let buckets = merge_buckets.buckets;
            assert_eq!(buckets.len(), 1);
            assert_eq!(buckets[0].metadata.received_at, expected_received_at);
        }
    }

    /// A batched metrics payload with buckets for two projects must be split into
    /// one `MergeBuckets` message per project key.
    #[tokio::test]
    async fn test_process_batched_metrics() {
        let mut token = Cogs::noop().timed(ResourceId::Relay, AppFeature::Unattributed);
        let received_at = Utc::now();
        let config = Config::default();

        let (aggregator, mut aggregator_rx) = Addr::custom();
        let processor = create_test_processor_with_addrs(
            config,
            Addrs {
                aggregator,
                ..Default::default()
            },
        )
        .await;

        let payload = r#"{
    "buckets": {
        "11111111111111111111111111111111": [
            {
                "timestamp": 1615889440,
                "width": 0,
                "name": "d:custom/endpoint.response_time@millisecond",
                "type": "d",
                "value": [
                    68.0
                ],
                "tags": {
                    "route": "user_index"
                }
            }
        ],
        "22222222222222222222222222222222": [
            {
                "timestamp": 1615889440,
                "width": 0,
                "name": "d:custom/endpoint.cache_rate@none",
                "type": "d",
                "value": [
                    36.0
                ]
            }
        ]
    }
}
"#;
        let message = ProcessBatchedMetrics {
            payload: Bytes::from(payload),
            source: BucketSource::Internal,
            received_at,
            sent_at: Some(Utc::now()),
        };
        processor.handle_process_batched_metrics(&mut token, message);

        let Aggregator::MergeBuckets(mb1) = aggregator_rx.recv().await.unwrap();
        let Aggregator::MergeBuckets(mb2) = aggregator_rx.recv().await.unwrap();

        // Sort by project key for a deterministic snapshot.
        let mut messages = vec![mb1, mb2];
        messages.sort_by_key(|mb| mb.project_key);

        let actual = messages
            .into_iter()
            .map(|mb| (mb.project_key, mb.buckets))
            .collect::<Vec<_>>();

        assert_debug_snapshot!(actual, @r###"
        [
            (
                ProjectKey("11111111111111111111111111111111"),
                [
                    Bucket {
                        timestamp: UnixTimestamp(1615889440),
                        width: 0,
                        name: MetricName(
                            "d:custom/endpoint.response_time@millisecond",
                        ),
                        value: Distribution(
                            [
                                68.0,
                            ],
                        ),
                        tags: {
                            "route": "user_index",
                        },
                        metadata: BucketMetadata {
                            merges: 1,
                            received_at: None,
                            extracted_from_indexed: false,
                        },
                    },
                ],
            ),
            (
                ProjectKey("22222222222222222222222222222222"),
                [
                    Bucket {
                        timestamp: UnixTimestamp(1615889440),
                        width: 0,
                        name: MetricName(
                            "d:custom/endpoint.cache_rate@none",
                        ),
                        value: Distribution(
                            [
                                36.0,
                            ],
                        ),
                        tags: {},
                        metadata: BucketMetadata {
                            merges: 1,
                            received_at: None,
                            extracted_from_indexed: false,
                        },
                    },
                ],
            ),
        ]
        "###);
    }
}