1use std::borrow::Cow;
2use std::collections::{BTreeMap, BTreeSet, HashMap};
3use std::error::Error;
4use std::fmt::{Debug, Display};
5use std::future::Future;
6use std::io::Write;
7use std::pin::Pin;
8use std::sync::Arc;
9use std::time::Duration;
10
11use anyhow::Context;
12use brotli::CompressorWriter as BrotliEncoder;
13use bytes::Bytes;
14use chrono::{DateTime, Utc};
15use flate2::Compression;
16use flate2::write::{GzEncoder, ZlibEncoder};
17use futures::FutureExt;
18use futures::future::BoxFuture;
19use relay_base_schema::project::{ProjectId, ProjectKey};
20use relay_cogs::{AppFeature, Cogs, FeatureWeights, ResourceId, Token};
21use relay_common::time::UnixTimestamp;
22use relay_config::{Config, HttpEncoding, UpstreamDescriptor};
23use relay_dynamic_config::Feature;
24use relay_event_normalization::{ClockDriftProcessor, GeoIpLookup};
25use relay_event_schema::processor::ProcessingAction;
26use relay_event_schema::protocol::{ClientReport, EventId, SpanV2};
27use relay_filter::FilterStatKey;
28use relay_metrics::{Bucket, BucketMetadata, BucketView, BucketsView, MetricNamespace};
29use relay_quotas::{DataCategory, RateLimits, Scoping};
30use relay_sampling::evaluation::SamplingDecision;
31use relay_statsd::metric;
32use relay_system::{Addr, FromMessage, NoResponse, Service};
33use reqwest::header;
34use smallvec::{SmallVec, smallvec};
35use zstd::stream::Encoder as ZstdEncoder;
36
37use crate::envelope::{self, ContentType, Envelope, EnvelopeError, Item, ItemContainer, ItemType};
38use crate::extractors::{PartialDsn, RequestMeta, RequestTrust};
39use crate::integrations::Integration;
40use crate::managed::ManagedEnvelope;
41use crate::metrics::{MetricOutcomes, MetricsLimiter, MinimalTrackableBucket};
42use crate::metrics_extraction::ExtractedMetrics;
43use crate::processing::attachments::AttachmentProcessor;
44use crate::processing::check_ins::CheckInsProcessor;
45use crate::processing::client_reports::ClientReportsProcessor;
46use crate::processing::errors::{ErrorsProcessor, SwitchProcessingError};
47use crate::processing::forward_unknown::ForwardUnknownProcessor;
48use crate::processing::legacy_spans::LegacySpansProcessor;
49use crate::processing::logs::LogsProcessor;
50use crate::processing::profile_chunks::ProfileChunksProcessor;
51use crate::processing::profiles::ProfilesProcessor;
52use crate::processing::replays::ReplaysProcessor;
53use crate::processing::sessions::SessionsProcessor;
54use crate::processing::spans::SpansProcessor;
55use crate::processing::trace_attachments::TraceAttachmentsProcessor;
56use crate::processing::trace_metrics::TraceMetricsProcessor;
57use crate::processing::transactions::TransactionProcessor;
58use crate::processing::user_reports::UserReportsProcessor;
59use crate::processing::{Forward as _, ForwardContext, Output, Outputs, QuotaRateLimiter};
60use crate::service::ServiceError;
61use crate::services::global_config::GlobalConfigHandle;
62use crate::services::metrics::{Aggregator, FlushBuckets, MergeBuckets, ProjectBuckets};
63use crate::services::outcome::{DiscardItemType, DiscardReason, Outcome, TrackOutcome};
64use crate::services::projects::cache::ProjectCacheHandle;
65use crate::services::projects::project::{ProjectInfo, ProjectState};
66use crate::services::upstream::{
67 SendRequest, Sign, SignatureType, UpstreamRelay, UpstreamRequest, UpstreamRequestError,
68};
69use crate::statsd::{RelayCounters, RelayDistributions, RelayTimers};
70use crate::utils;
71use crate::{http, processing};
72use relay_threading::AsyncPool;
73use symbolic_unreal::{Unreal4Error, Unreal4ErrorKind};
74#[cfg(feature = "processing")]
75use {
76 crate::services::objectstore::Objectstore,
77 crate::services::store::Store,
78 itertools::Itertools,
79 relay_dynamic_config::GlobalConfig,
80 relay_quotas::{Quota, RateLimitingError, RedisRateLimiter},
81 relay_redis::RedisClients,
82 std::time::Instant,
83};
84
85mod metrics;
86
/// Minimum clock drift (55 minutes) that must be exceeded before timestamps are corrected.
// NOTE(review): consumed by the clock-drift handling (see `ClockDriftProcessor` import) —
// confirm the exact threshold semantics at the usage site.
pub const MINIMUM_CLOCK_DRIFT: Duration = Duration::from_secs(55 * 60);
89
/// Error returned when a processing group cannot be converted into the requested
/// marker group type.
#[derive(Debug)]
pub struct GroupTypeError;

impl Display for GroupTypeError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "failed to convert processing group into corresponding type")
    }
}

impl std::error::Error for GroupTypeError {}
100
/// Generates a marker type for a processing group.
///
/// The marker converts into [`ProcessingGroup`] via its primary variant, and
/// converts back fallibly: the primary variant and any additional listed
/// variants succeed, all other groups yield a [`GroupTypeError`].
macro_rules! processing_group {
    ($ty:ident, $variant:ident$(, $($other:ident),+)?) => {
        #[derive(Clone, Copy, Debug)]
        pub struct $ty;

        impl From<$ty> for ProcessingGroup {
            fn from(_: $ty) -> Self {
                ProcessingGroup::$variant
            }
        }

        impl TryFrom<ProcessingGroup> for $ty {
            type Error = GroupTypeError;

            fn try_from(value: ProcessingGroup) -> Result<Self, Self::Error> {
                match value {
                    ProcessingGroup::$variant => Ok($ty),
                    $($(ProcessingGroup::$other => Ok($ty),)+)?
                    _ => Err(GroupTypeError),
                }
            }
        }
    };
}
129
// Marker types for each processing group, each with `From`/`TryFrom` conversions
// to and from `ProcessingGroup` (see the `processing_group!` macro).
processing_group!(TransactionGroup, Transaction);

processing_group!(ErrorGroup, Error);

processing_group!(SessionGroup, Session);
processing_group!(ClientReportGroup, ClientReport);
processing_group!(ReplayGroup, Replay);
processing_group!(CheckInGroup, CheckIn);
processing_group!(TraceMetricGroup, TraceMetric);
processing_group!(SpanGroup, Span);

processing_group!(ProfileChunkGroup, ProfileChunk);
processing_group!(ForwardUnknownGroup, ForwardUnknown);
processing_group!(Ungrouped, Ungrouped);
144
/// Routing key for envelope items: `split_envelope` partitions an envelope into
/// sub-envelopes, each tagged with one group and handled by its dedicated processor.
#[derive(Clone, Copy, Debug)]
pub enum ProcessingGroup {
    /// Event-bound items that include a transaction or profile.
    Transaction,
    /// Event-bound items without a transaction, plus security reports.
    Error,
    /// Session updates (`Session`) and aggregated sessions (`Sessions`).
    Session,
    /// Attachments sent without an accompanying event.
    StandaloneAttachments,
    /// User reports sent without an accompanying event.
    StandaloneUserReports,
    /// Profiles sent without an accompanying event.
    StandaloneProfiles,
    /// SDK client reports.
    ClientReport,
    /// Replay events, recordings, and videos.
    Replay,
    /// Check-in items.
    CheckIn,
    /// Log items, including log integrations.
    Log,
    /// Trace metric items.
    TraceMetric,
    /// Standalone span items handled by the legacy span pipeline.
    Span,
    /// Span V2 containers and span integrations (plus plain spans and span
    /// attachments when the experimental feature is enabled).
    SpanV2,
    /// Continuous profiling chunks.
    ProfileChunk,
    /// Attachments associated with a trace rather than a single event.
    TraceAttachment,
    /// Items of unknown type, handled by the forward-unknown processor.
    ForwardUnknown,
    /// Fallback for items that match no other group.
    Ungrouped,
}
195
impl ProcessingGroup {
    /// Splits the envelope into sub-envelopes, one per processing group.
    ///
    /// Extraction order is significant: for example, span-v2 items are taken
    /// before legacy span items (both can match `ItemType::Span`), and
    /// event-bound items are only treated as "standalone" when nothing in the
    /// envelope creates an event.
    fn split_envelope(
        mut envelope: Envelope,
        project_info: &ProjectInfo,
    ) -> SmallVec<[(Self, Box<Envelope>); 3]> {
        // Every sub-envelope inherits the original envelope headers.
        let headers = envelope.headers().clone();
        let mut grouped_envelopes = smallvec![];

        // Replay event, recording, and video items are processed together.
        let replay_items = envelope.take_items_by(|item| {
            matches!(
                item.ty(),
                &ItemType::ReplayEvent | &ItemType::ReplayRecording | &ItemType::ReplayVideo
            )
        });
        if !replay_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Replay,
                Envelope::from_parts(headers.clone(), replay_items),
            ))
        }

        // Individual session updates and pre-aggregated sessions.
        let session_items = envelope
            .take_items_by(|item| matches!(item.ty(), &ItemType::Session | &ItemType::Sessions));
        if !session_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Session,
                Envelope::from_parts(headers.clone(), session_items),
            ))
        }

        // Span-v2 containers and span integrations; with the experimental feature
        // enabled, plain span items and span attachments are also routed here.
        // Must run before the legacy `Span` extraction below.
        let span_v2_items = envelope.take_items_by(|item| {
            let exp_feature = project_info.has_feature(Feature::SpanV2ExperimentalProcessing);

            ItemContainer::<SpanV2>::is_container(item)
                || matches!(item.integration(), Some(Integration::Spans(_)))
                || (exp_feature && matches!(item.ty(), &ItemType::Span))
                || (exp_feature && item.is_span_attachment())
        });

        if !span_v2_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::SpanV2,
                Envelope::from_parts(headers.clone(), span_v2_items),
            ))
        }

        // Remaining span items go through the legacy span pipeline.
        let span_items = envelope.take_items_by(|item| matches!(item.ty(), &ItemType::Span));
        if !span_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Span,
                Envelope::from_parts(headers.clone(), span_items),
            ))
        }

        // Log items and log integrations.
        let logs_items = envelope.take_items_by(|item| {
            matches!(item.ty(), &ItemType::Log)
                || matches!(item.integration(), Some(Integration::Logs(_)))
        });
        if !logs_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::Log,
                Envelope::from_parts(headers.clone(), logs_items),
            ))
        }

        let trace_metric_items =
            envelope.take_items_by(|item| matches!(item.ty(), &ItemType::TraceMetric));
        if !trace_metric_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::TraceMetric,
                Envelope::from_parts(headers.clone(), trace_metric_items),
            ))
        }

        let profile_chunk_items =
            envelope.take_items_by(|item| matches!(item.ty(), &ItemType::ProfileChunk));
        if !profile_chunk_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::ProfileChunk,
                Envelope::from_parts(headers.clone(), profile_chunk_items),
            ))
        }

        let trace_attachment_items = envelope.take_items_by(Item::is_trace_attachment);
        if !trace_attachment_items.is_empty() {
            grouped_envelopes.push((
                ProcessingGroup::TraceAttachment,
                Envelope::from_parts(headers.clone(), trace_attachment_items),
            ))
        }

        // Event-bound items (attachments, user reports, profiles) are only split
        // off as standalone when no item in the envelope creates an event.
        if !envelope.items().any(Item::creates_event) {
            let standalone_attachments = envelope
                .take_items_by(|i| i.requires_event() && matches!(i.ty(), ItemType::Attachment));
            if !standalone_attachments.is_empty() {
                grouped_envelopes.push((
                    ProcessingGroup::StandaloneAttachments,
                    Envelope::from_parts(headers.clone(), standalone_attachments),
                ))
            }

            let standalone_user_reports =
                envelope.take_items_by(|i| matches!(i.ty(), ItemType::UserReport));
            if !standalone_user_reports.is_empty() {
                grouped_envelopes.push((
                    ProcessingGroup::StandaloneUserReports,
                    Envelope::from_parts(headers.clone(), standalone_user_reports),
                ));
            }

            let standalone_profiles =
                envelope.take_items_by(|i| matches!(i.ty(), ItemType::Profile));
            if !standalone_profiles.is_empty() {
                grouped_envelopes.push((
                    ProcessingGroup::StandaloneProfiles,
                    Envelope::from_parts(headers.clone(), standalone_profiles),
                ));
            }
        }

        // Each security report becomes its own error envelope with a fresh event id.
        let security_reports_items = envelope
            .take_items_by(|i| matches!(i.ty(), &ItemType::RawSecurity))
            .into_iter()
            .map(|item| {
                let headers = headers.clone();
                let items: SmallVec<[Item; 3]> = smallvec![item.clone()];
                let mut envelope = Envelope::from_parts(headers, items);
                envelope.set_event_id(EventId::new());
                (ProcessingGroup::Error, envelope)
            });
        grouped_envelopes.extend(security_reports_items);

        // Remaining event-bound items stay together in one envelope; the presence
        // of a transaction or profile decides between the transaction and error
        // groups.
        let require_event_items = envelope.take_items_by(Item::requires_event);
        if !require_event_items.is_empty() {
            let group = if require_event_items
                .iter()
                .any(|item| matches!(item.ty(), &ItemType::Transaction | &ItemType::Profile))
            {
                ProcessingGroup::Transaction
            } else {
                ProcessingGroup::Error
            };

            grouped_envelopes.push((
                group,
                Envelope::from_parts(headers.clone(), require_event_items),
            ))
        }

        // Everything left is split into one envelope per item: check-ins, client
        // reports, unknown item types to forward, and an ungrouped fallback.
        let envelopes = envelope.items_mut().map(|item| {
            let headers = headers.clone();
            let items: SmallVec<[Item; 3]> = smallvec![item.clone()];
            let envelope = Envelope::from_parts(headers, items);
            let item_type = item.ty();
            let group = if matches!(item_type, &ItemType::CheckIn) {
                ProcessingGroup::CheckIn
            } else if matches!(item.ty(), &ItemType::ClientReport) {
                ProcessingGroup::ClientReport
            } else if matches!(item_type, &ItemType::Unknown(_)) {
                ProcessingGroup::ForwardUnknown
            } else {
                ProcessingGroup::Ungrouped
            };

            (group, envelope)
        });
        grouped_envelopes.extend(envelopes);

        grouped_envelopes
    }

    /// Returns a static snake_case name of the group, used for logging and metrics.
    pub fn variant(&self) -> &'static str {
        match self {
            ProcessingGroup::Transaction => "transaction",
            ProcessingGroup::Error => "error",
            ProcessingGroup::Session => "session",
            ProcessingGroup::StandaloneAttachments => "standalone_attachment",
            ProcessingGroup::StandaloneUserReports => "standalone_user_reports",
            ProcessingGroup::StandaloneProfiles => "standalone_profiles",
            ProcessingGroup::ClientReport => "client_report",
            ProcessingGroup::Replay => "replay",
            ProcessingGroup::CheckIn => "check_in",
            ProcessingGroup::Log => "log",
            ProcessingGroup::TraceMetric => "trace_metric",
            ProcessingGroup::Span => "span",
            ProcessingGroup::SpanV2 => "span_v2",
            ProcessingGroup::ProfileChunk => "profile_chunk",
            ProcessingGroup::TraceAttachment => "trace_attachment",
            ProcessingGroup::ForwardUnknown => "forward_unknown",
            ProcessingGroup::Ungrouped => "ungrouped",
        }
    }
}
405
406impl From<ProcessingGroup> for AppFeature {
407 fn from(value: ProcessingGroup) -> Self {
408 match value {
409 ProcessingGroup::Transaction => AppFeature::Transactions,
410 ProcessingGroup::Error => AppFeature::Errors,
411 ProcessingGroup::Session => AppFeature::Sessions,
412 ProcessingGroup::StandaloneAttachments => AppFeature::UnattributedEnvelope,
413 ProcessingGroup::StandaloneUserReports => AppFeature::UserReports,
414 ProcessingGroup::StandaloneProfiles => AppFeature::Profiles,
415 ProcessingGroup::ClientReport => AppFeature::ClientReports,
416 ProcessingGroup::Replay => AppFeature::Replays,
417 ProcessingGroup::CheckIn => AppFeature::CheckIns,
418 ProcessingGroup::Log => AppFeature::Logs,
419 ProcessingGroup::TraceMetric => AppFeature::TraceMetrics,
420 ProcessingGroup::Span => AppFeature::Spans,
421 ProcessingGroup::SpanV2 => AppFeature::Spans,
422 ProcessingGroup::ProfileChunk => AppFeature::Profiles,
423 ProcessingGroup::ForwardUnknown => AppFeature::UnattributedEnvelope,
424 ProcessingGroup::Ungrouped => AppFeature::UnattributedEnvelope,
425 ProcessingGroup::TraceAttachment => AppFeature::TraceAttachments,
426 }
427 }
428}
429
/// An error returned while processing an envelope.
#[derive(Debug, thiserror::Error)]
pub enum ProcessingError {
    #[error("invalid json in event")]
    InvalidJson(#[source] serde_json::Error),

    #[error("invalid message pack event payload")]
    InvalidMsgpack(#[from] rmp_serde::decode::Error),

    #[error("invalid unreal crash report")]
    InvalidUnrealReport(#[source] Unreal4Error),

    #[error("event payload too large")]
    PayloadTooLarge(DiscardItemType),

    #[error("invalid transaction event")]
    InvalidTransaction,

    #[error("the item is not allowed/supported in this envelope")]
    UnsupportedItem,

    #[error("envelope processor failed")]
    ProcessingFailed(#[from] ProcessingAction),

    #[error("duplicate {0} in event")]
    DuplicateItem(ItemType),

    #[error("failed to extract event payload")]
    NoEventPayload,

    #[error("missing project id in DSN")]
    MissingProjectId,

    #[error("invalid security report type: {0:?}")]
    InvalidSecurityType(Bytes),

    #[error("unsupported security report type")]
    UnsupportedSecurityType,

    #[error("invalid security report")]
    InvalidSecurityReport(#[source] serde_json::Error),

    #[error("event filtered with reason: {0:?}")]
    EventFiltered(FilterStatKey),

    #[error("could not serialize event payload")]
    SerializeFailed(#[source] serde_json::Error),

    #[cfg(feature = "processing")]
    #[error("failed to apply quotas")]
    QuotasFailed(#[from] RateLimitingError),

    #[error("nintendo switch dying message processing failed {0:?}")]
    InvalidNintendoDyingMessage(#[source] SwitchProcessingError),

    #[cfg(all(sentry, feature = "processing"))]
    #[error("playstation dump processing failed: {0}")]
    InvalidPlaystationDump(String),

    /// The envelope was routed to a processor that cannot handle its group.
    #[error("processing group does not match specific processor")]
    ProcessingGroupMismatch,
    /// A failure inside the new (typed) processing pipeline; no outcome is
    /// attached for it (see `to_outcome`).
    #[error("new processing pipeline failed")]
    ProcessingFailure,

    #[cfg(feature = "processing")]
    #[error("invalid attachment reference")]
    InvalidAttachmentRef,

    /// No processing group could be determined for the envelope's items.
    #[error("could not determine processing group for envelope items")]
    NoProcessingGroup,
}
501
impl ProcessingError {
    /// Maps the error to the [`Outcome`] that should be emitted for the envelope, if any.
    ///
    /// Returns `None` when no outcome must be emitted, either because the error is
    /// not attributable ([`Self::MissingProjectId`]) or because it is handled
    /// elsewhere ([`Self::ProcessingFailure`]).
    pub fn to_outcome(&self) -> Option<Outcome> {
        match self {
            Self::PayloadTooLarge(payload_type) => {
                Some(Outcome::Invalid(DiscardReason::ItemTooLarge(*payload_type)))
            }
            Self::InvalidJson(_) => Some(Outcome::Invalid(DiscardReason::InvalidJson)),
            Self::InvalidMsgpack(_) => Some(Outcome::Invalid(DiscardReason::InvalidMsgpack)),
            Self::InvalidSecurityType(_) => {
                Some(Outcome::Invalid(DiscardReason::SecurityReportType))
            }
            Self::UnsupportedItem => Some(Outcome::Invalid(DiscardReason::InvalidEnvelope)),
            Self::InvalidSecurityReport(_) => Some(Outcome::Invalid(DiscardReason::SecurityReport)),
            Self::UnsupportedSecurityType => Some(Outcome::Filtered(FilterStatKey::InvalidCsp)),
            Self::InvalidTransaction => Some(Outcome::Invalid(DiscardReason::InvalidTransaction)),
            Self::DuplicateItem(_) => Some(Outcome::Invalid(DiscardReason::DuplicateItem)),
            Self::NoEventPayload => Some(Outcome::Invalid(DiscardReason::NoEventPayload)),
            Self::InvalidNintendoDyingMessage(_) => Some(Outcome::Invalid(DiscardReason::Payload)),
            #[cfg(all(sentry, feature = "processing"))]
            Self::InvalidPlaystationDump(_) => Some(Outcome::Invalid(DiscardReason::Payload)),
            // Bad compression is reported separately from other unreal failures.
            Self::InvalidUnrealReport(err) if err.kind() == Unreal4ErrorKind::BadCompression => {
                Some(Outcome::Invalid(DiscardReason::InvalidCompression))
            }
            Self::InvalidUnrealReport(_) => Some(Outcome::Invalid(DiscardReason::ProcessUnreal)),
            Self::SerializeFailed(_) | Self::ProcessingFailed(_) => {
                Some(Outcome::Invalid(DiscardReason::Internal))
            }
            #[cfg(feature = "processing")]
            Self::QuotasFailed(_) => Some(Outcome::Invalid(DiscardReason::Internal)),
            Self::MissingProjectId => None,
            Self::EventFiltered(key) => Some(Outcome::Filtered(key.clone())),

            Self::ProcessingGroupMismatch => Some(Outcome::Invalid(DiscardReason::Internal)),
            // The new processing pipeline emits no outcome via this path.
            Self::ProcessingFailure => None,
            #[cfg(feature = "processing")]
            Self::InvalidAttachmentRef => {
                Some(Outcome::Invalid(DiscardReason::InvalidAttachmentRef))
            }
            Self::NoProcessingGroup => Some(Outcome::Invalid(DiscardReason::Internal)),
        }
    }

    /// Returns `true` if the mapped outcome is considered unexpected
    /// (delegates to [`Outcome::is_unexpected`]).
    fn is_unexpected(&self) -> bool {
        self.to_outcome()
            .is_some_and(|outcome| outcome.is_unexpected())
    }
}
550
551impl From<Unreal4Error> for ProcessingError {
552 fn from(err: Unreal4Error) -> Self {
553 match err.kind() {
554 Unreal4ErrorKind::TooLarge => Self::PayloadTooLarge(ItemType::UnrealReport.into()),
555 _ => ProcessingError::InvalidUnrealReport(err),
556 }
557 }
558}
559
/// Container for metrics extracted during processing.
///
/// Buckets added through the `extend_*` methods are tagged with whether they were
/// extracted from an indexed (sampling decision "keep") payload.
#[derive(Debug)]
pub struct ProcessingExtractedMetrics {
    // Accumulated project and sampling metrics.
    metrics: ExtractedMetrics,
}
568
569impl ProcessingExtractedMetrics {
570 pub fn new() -> Self {
571 Self {
572 metrics: ExtractedMetrics::default(),
573 }
574 }
575
576 pub fn into_inner(self) -> ExtractedMetrics {
577 self.metrics
578 }
579
580 pub fn extend(
582 &mut self,
583 extracted: ExtractedMetrics,
584 sampling_decision: Option<SamplingDecision>,
585 ) {
586 self.extend_project_metrics(extracted.project_metrics, sampling_decision);
587 self.extend_sampling_metrics(extracted.sampling_metrics, sampling_decision);
588 }
589
590 pub fn extend_project_metrics<I>(
592 &mut self,
593 buckets: I,
594 sampling_decision: Option<SamplingDecision>,
595 ) where
596 I: IntoIterator<Item = Bucket>,
597 {
598 self.metrics
599 .project_metrics
600 .extend(buckets.into_iter().map(|mut bucket| {
601 bucket.metadata.extracted_from_indexed =
602 sampling_decision == Some(SamplingDecision::Keep);
603 bucket
604 }));
605 }
606
607 pub fn extend_sampling_metrics<I>(
609 &mut self,
610 buckets: I,
611 sampling_decision: Option<SamplingDecision>,
612 ) where
613 I: IntoIterator<Item = Bucket>,
614 {
615 self.metrics
616 .sampling_metrics
617 .extend(buckets.into_iter().map(|mut bucket| {
618 bucket.metadata.extracted_from_indexed =
619 sampling_decision == Some(SamplingDecision::Keep);
620 bucket
621 }));
622 }
623}
624
625fn send_metrics(
626 metrics: ExtractedMetrics,
627 project_key: ProjectKey,
628 sampling_key: Option<ProjectKey>,
629 aggregator: &Addr<Aggregator>,
630) {
631 let ExtractedMetrics {
632 project_metrics,
633 sampling_metrics,
634 } = metrics;
635
636 if !project_metrics.is_empty() {
637 aggregator.send(MergeBuckets {
638 project_key,
639 buckets: project_metrics,
640 });
641 }
642
643 if !sampling_metrics.is_empty() {
644 let sampling_project_key = sampling_key.unwrap_or(project_key);
651 aggregator.send(MergeBuckets {
652 project_key: sampling_project_key,
653 buckets: sampling_metrics,
654 });
655 }
656}
657
/// Message requesting that an envelope be processed.
#[derive(Debug)]
pub struct ProcessEnvelope {
    /// The envelope together with its managed lifecycle state.
    pub envelope: ManagedEnvelope,
    /// Project state of the envelope's own project.
    pub project_info: Arc<ProjectInfo>,
    /// Cached rate limits for the project.
    // NOTE(review): presumably enforced during processing — confirm at the usage site.
    pub rate_limits: Arc<RateLimits>,
    /// Project state of the sampling (trace root) project, if resolved.
    pub sampling_project_info: Option<Arc<ProjectInfo>>,
}
678
/// An envelope that has already been assigned a single processing group.
#[derive(Debug)]
struct ProcessEnvelopeGrouped<'a> {
    /// The group all items in the envelope belong to.
    pub group: ProcessingGroup,
    /// The envelope to process.
    pub envelope: ManagedEnvelope,
    /// Shared processing context (project info, configs, etc.).
    pub ctx: processing::Context<'a>,
}
689
/// Message to parse and ingest metrics for a single project.
#[derive(Debug)]
pub struct ProcessMetrics {
    /// The metric payloads, either raw envelope items or pre-parsed buckets.
    pub data: MetricData,
    /// Public key of the project the metrics belong to.
    pub project_key: ProjectKey,
    /// Whether the metrics come from a trusted (internal) or untrusted source.
    pub source: BucketSource,
    /// Time this Relay received the payload.
    pub received_at: DateTime<Utc>,
    /// Client-reported send time, if available.
    // NOTE(review): presumably used for clock-drift correction — confirm.
    pub sent_at: Option<DateTime<Utc>>,
}
715
/// Metric bucket data in raw or parsed form.
#[derive(Debug)]
pub enum MetricData {
    /// Envelope items carrying statsd or JSON-encoded bucket payloads.
    Raw(Vec<Item>),
    /// Buckets that are already parsed.
    Parsed(Vec<Bucket>),
}
724
725impl MetricData {
726 fn into_buckets(self, timestamp: UnixTimestamp) -> Vec<Bucket> {
731 let items = match self {
732 Self::Parsed(buckets) => return buckets,
733 Self::Raw(items) => items,
734 };
735
736 let mut buckets = Vec::new();
737 for item in items {
738 let payload = item.payload();
739 if item.ty() == &ItemType::Statsd {
740 for bucket_result in Bucket::parse_all(&payload, timestamp) {
741 match bucket_result {
742 Ok(bucket) => buckets.push(bucket),
743 Err(error) => relay_log::debug!(
744 error = &error as &dyn Error,
745 "failed to parse metric bucket from statsd format",
746 ),
747 }
748 }
749 } else if item.ty() == &ItemType::MetricBuckets {
750 match serde_json::from_slice::<Vec<Bucket>>(&payload) {
751 Ok(parsed_buckets) => {
752 if buckets.is_empty() {
754 buckets = parsed_buckets;
755 } else {
756 buckets.extend(parsed_buckets);
757 }
758 }
759 Err(error) => {
760 relay_log::debug!(
761 error = &error as &dyn Error,
762 "failed to parse metric bucket",
763 );
764 metric!(counter(RelayCounters::MetricBucketsParsingFailed) += 1);
765 }
766 }
767 } else {
768 relay_log::error!(
769 "invalid item of type {} passed to ProcessMetrics",
770 item.ty()
771 );
772 }
773 }
774 buckets
775 }
776}
777
/// Message to parse and ingest a batched metrics payload spanning projects.
#[derive(Debug)]
pub struct ProcessBatchedMetrics {
    /// The raw payload.
    // NOTE(review): format assumed to be JSON, parsed by the handler — confirm.
    pub payload: Bytes,
    /// Whether the metrics come from a trusted (internal) or untrusted source.
    pub source: BucketSource,
    /// Time this Relay received the payload.
    pub received_at: DateTime<Utc>,
    /// Client-reported send time, if available.
    pub sent_at: Option<DateTime<Utc>>,
}
789
/// Trust level of the source that submitted metric buckets.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum BucketSource {
    /// Buckets submitted by a trusted (internal) Relay.
    Internal,
    /// Buckets submitted by an untrusted source.
    External,
}
805
806impl BucketSource {
807 pub fn from_meta(meta: &RequestMeta) -> Self {
809 match meta.request_trust() {
810 RequestTrust::Trusted => Self::Internal,
811 RequestTrust::Untrusted => Self::External,
812 }
813 }
814}
815
/// Message to submit a list of client reports.
#[derive(Debug)]
pub struct SubmitClientReports {
    /// The client reports to submit.
    pub client_reports: Vec<ClientReport>,
    /// Scoping (organization/project/key) the reports are attributed to.
    pub scoping: Scoping,
}
824
/// Messages handled by the envelope processor service.
///
/// Payloads are boxed to keep the message enum small.
#[derive(Debug)]
pub enum EnvelopeProcessor {
    /// Processes an envelope against project state.
    ProcessEnvelope(Box<ProcessEnvelope>),
    /// Parses and aggregates metrics of a single project.
    ProcessProjectMetrics(Box<ProcessMetrics>),
    /// Parses a batched metrics payload.
    ProcessBatchedMetrics(Box<ProcessBatchedMetrics>),
    /// Handles buckets flushed from the aggregator.
    FlushBuckets(Box<FlushBuckets>),
    /// Submits client reports.
    SubmitClientReports(Box<SubmitClientReports>),
}
834
835impl EnvelopeProcessor {
836 pub fn variant(&self) -> &'static str {
838 match self {
839 EnvelopeProcessor::ProcessEnvelope(_) => "ProcessEnvelope",
840 EnvelopeProcessor::ProcessProjectMetrics(_) => "ProcessProjectMetrics",
841 EnvelopeProcessor::ProcessBatchedMetrics(_) => "ProcessBatchedMetrics",
842 EnvelopeProcessor::FlushBuckets(_) => "FlushBuckets",
843 EnvelopeProcessor::SubmitClientReports(_) => "SubmitClientReports",
844 }
845 }
846}
847
/// Marks [`EnvelopeProcessor`] as a message interface of the service system.
impl relay_system::Interface for EnvelopeProcessor {}
849
850impl FromMessage<ProcessEnvelope> for EnvelopeProcessor {
851 type Response = relay_system::NoResponse;
852
853 fn from_message(message: ProcessEnvelope, _sender: ()) -> Self {
854 Self::ProcessEnvelope(Box::new(message))
855 }
856}
857
/// Converts [`ProcessMetrics`] into the processor's message enum.
impl FromMessage<ProcessMetrics> for EnvelopeProcessor {
    type Response = NoResponse;

    fn from_message(message: ProcessMetrics, _: ()) -> Self {
        Self::ProcessProjectMetrics(Box::new(message))
    }
}

/// Converts [`ProcessBatchedMetrics`] into the processor's message enum.
impl FromMessage<ProcessBatchedMetrics> for EnvelopeProcessor {
    type Response = NoResponse;

    fn from_message(message: ProcessBatchedMetrics, _: ()) -> Self {
        Self::ProcessBatchedMetrics(Box::new(message))
    }
}

/// Converts [`FlushBuckets`] into the processor's message enum.
impl FromMessage<FlushBuckets> for EnvelopeProcessor {
    type Response = NoResponse;

    fn from_message(message: FlushBuckets, _: ()) -> Self {
        Self::FlushBuckets(Box::new(message))
    }
}

/// Converts [`SubmitClientReports`] into the processor's message enum.
impl FromMessage<SubmitClientReports> for EnvelopeProcessor {
    type Response = NoResponse;

    fn from_message(message: SubmitClientReports, _: ()) -> Self {
        Self::SubmitClientReports(Box::new(message))
    }
}
889
/// Async task pool on which the envelope processor service spawns its work.
pub type EnvelopeProcessorServicePool = AsyncPool<BoxFuture<'static, ()>>;
892
/// Service that handles [`EnvelopeProcessor`] messages.
///
/// Cheap to clone: all state lives behind an [`Arc`].
#[derive(Clone)]
pub struct EnvelopeProcessorService {
    // Shared state of the service.
    inner: Arc<InnerProcessor>,
}
900
/// Addresses of the services the envelope processor communicates with.
pub struct Addrs {
    /// Outcome aggregation service.
    pub outcome_aggregator: Addr<TrackOutcome>,
    /// Service forwarding requests to the upstream.
    pub upstream_relay: Addr<UpstreamRelay>,
    /// Object storage service, if configured (processing builds only).
    #[cfg(feature = "processing")]
    pub objectstore: Option<Addr<Objectstore>>,
    /// Store service, if configured (processing builds only).
    // NOTE(review): presumably produces to the ingestion store — confirm.
    #[cfg(feature = "processing")]
    pub store_forwarder: Option<Addr<Store>>,
    /// Metrics aggregator service.
    pub aggregator: Addr<Aggregator>,
}
911
912impl Default for Addrs {
913 fn default() -> Self {
914 Addrs {
915 outcome_aggregator: Addr::dummy(),
916 upstream_relay: Addr::dummy(),
917 #[cfg(feature = "processing")]
918 objectstore: None,
919 #[cfg(feature = "processing")]
920 store_forwarder: None,
921 aggregator: Addr::dummy(),
922 }
923 }
924}
925
/// Shared state of the envelope processor service.
struct InnerProcessor {
    /// Pool the service runs its processing tasks on.
    pool: EnvelopeProcessorServicePool,
    /// Static Relay configuration.
    config: Arc<Config>,
    /// Handle to the most recent global configuration.
    global_config: GlobalConfigHandle,
    /// Handle to the project cache.
    project_cache: ProjectCacheHandle,
    /// COGS accounting sink.
    cogs: Cogs,
    /// Addresses of peer services.
    addrs: Addrs,
    /// Redis-backed rate limiter, if Redis is configured (processing builds only).
    #[cfg(feature = "processing")]
    rate_limiter: Option<Arc<RedisRateLimiter>>,
    /// Outcome reporting for metrics.
    metric_outcomes: MetricOutcomes,
    /// The specialized per-group processors.
    processing: Processing,
}
938
/// The set of specialized processors, one per processing group.
struct Processing {
    errors: ErrorsProcessor,
    logs: LogsProcessor,
    trace_metrics: TraceMetricsProcessor,
    spans: SpansProcessor,
    legacy_spans: LegacySpansProcessor,
    check_ins: CheckInsProcessor,
    sessions: SessionsProcessor,
    transactions: TransactionProcessor,
    profile_chunks: ProfileChunksProcessor,
    trace_attachments: TraceAttachmentsProcessor,
    replays: ReplaysProcessor,
    client_reports: ClientReportsProcessor,
    attachments: AttachmentProcessor,
    user_reports: UserReportsProcessor,
    profiles: ProfilesProcessor,
    forward_unknown: ForwardUnknownProcessor,
}
957
958impl EnvelopeProcessorService {
959 #[cfg_attr(feature = "processing", expect(clippy::too_many_arguments))]
    /// Creates a new envelope processor service.
    ///
    /// Opens the GeoIP database if one is configured, optionally wires up the
    /// Redis rate limiter (processing builds only), and constructs the
    /// specialized per-group processors, all sharing a single quota rate
    /// limiter and GeoIP lookup.
    pub fn new(
        pool: EnvelopeProcessorServicePool,
        config: Arc<Config>,
        global_config: GlobalConfigHandle,
        project_cache: ProjectCacheHandle,
        cogs: Cogs,
        #[cfg(feature = "processing")] redis: Option<RedisClients>,
        addrs: Addrs,
        metric_outcomes: MetricOutcomes,
    ) -> Self {
        // A failure to open the GeoIP database is logged and an empty lookup is
        // used instead; it is not fatal.
        let geoip_lookup = config
            .geoip_path()
            .and_then(
                |p| match GeoIpLookup::open(p).context(ServiceError::GeoIp) {
                    Ok(geoip) => Some(geoip),
                    Err(err) => {
                        relay_log::error!("failed to open GeoIP db {p:?}: {err:?}");
                        None
                    }
                },
            )
            .unwrap_or_else(GeoIpLookup::empty);

        if let Some(build_epoch) = geoip_lookup.build_epoch() {
            relay_log::info!("Loaded GeoIP database (build: {build_epoch})");
        }

        // Redis-backed rate limiting is only available in processing builds.
        #[cfg(feature = "processing")]
        let rate_limiter = redis.map(|redis| {
            RedisRateLimiter::new(redis.quotas)
                .max_limit(config.max_rate_limit())
                .cache(config.quota_cache_ratio(), config.quota_cache_max())
        });

        // Shared by all processors to enforce quotas.
        let quota_limiter = Arc::new(QuotaRateLimiter::new(
            #[cfg(feature = "processing")]
            project_cache.clone(),
            #[cfg(feature = "processing")]
            rate_limiter.clone(),
        ));
        // A separate Arc'd handle is kept on the processor itself.
        #[cfg(feature = "processing")]
        let rate_limiter = rate_limiter.map(Arc::new);
        let outcome_aggregator = addrs.outcome_aggregator.clone();
        let inner = InnerProcessor {
            pool,
            global_config,
            project_cache,
            cogs,
            #[cfg(feature = "processing")]
            rate_limiter,
            addrs,
            metric_outcomes,
            processing: Processing {
                errors: ErrorsProcessor::new(Arc::clone(&quota_limiter), geoip_lookup.clone()),
                logs: LogsProcessor::new(Arc::clone(&quota_limiter)),
                trace_metrics: TraceMetricsProcessor::new(Arc::clone(&quota_limiter)),
                spans: SpansProcessor::new(Arc::clone(&quota_limiter), geoip_lookup.clone()),
                legacy_spans: LegacySpansProcessor::new(
                    Arc::clone(&quota_limiter),
                    geoip_lookup.clone(),
                ),
                check_ins: CheckInsProcessor::new(Arc::clone(&quota_limiter)),
                sessions: SessionsProcessor::new(Arc::clone(&quota_limiter)),
                transactions: TransactionProcessor::new(
                    Arc::clone(&quota_limiter),
                    geoip_lookup.clone(),
                ),
                profile_chunks: ProfileChunksProcessor::new(Arc::clone(&quota_limiter)),
                trace_attachments: TraceAttachmentsProcessor::new(Arc::clone(&quota_limiter)),
                replays: ReplaysProcessor::new(Arc::clone(&quota_limiter), geoip_lookup),
                client_reports: ClientReportsProcessor::new(outcome_aggregator),
                attachments: AttachmentProcessor::new(Arc::clone(&quota_limiter)),
                user_reports: UserReportsProcessor::new(Arc::clone(&quota_limiter)),
                profiles: ProfilesProcessor::new(quota_limiter),
                forward_unknown: ForwardUnknownProcessor::new(),
            },
            config,
        };

        Self {
            inner: Arc::new(inner),
        }
    }
1044
1045 async fn process_with_processor<P: processing::Processor>(
1046 &self,
1047 processor: &P,
1048 mut managed_envelope: ManagedEnvelope,
1049 ctx: processing::Context<'_>,
1050 ) -> Result<Output<Outputs>, ProcessingError>
1051 where
1052 Outputs: From<P::Output>,
1053 {
1054 let Some(work) = processor.prepare_envelope(&mut managed_envelope) else {
1055 debug_assert!(
1056 false,
1057 "there must be work for the {} processor",
1058 std::any::type_name::<P>(),
1059 );
1060 return Err(ProcessingError::ProcessingGroupMismatch);
1061 };
1062
1063 managed_envelope.update();
1064 match managed_envelope.envelope().is_empty() {
1065 true => managed_envelope.accept(),
1066 false => managed_envelope.reject(Outcome::Invalid(DiscardReason::Internal)),
1067 }
1068
1069 processor
1070 .process(work, ctx)
1071 .await
1072 .map_err(|err| {
1073 relay_log::debug!(
1074 error = &err as &dyn std::error::Error,
1075 "processing pipeline failed"
1076 );
1077 ProcessingError::ProcessingFailure
1078 })
1079 .map(|o| o.map(Into::into))
1080 }
1081
    /// Applies envelope-level pre-processing and dispatches the envelope to the
    /// processor registered for its [`ProcessingGroup`].
    ///
    /// Pre-processing: parametrizes the transaction name in the dynamic sampling
    /// context, stamps the configured retention periods, and records the resolved
    /// `project_id` on the envelope's request meta.
    ///
    /// Returns the processor output, or an error if the group has no processor
    /// (`Ungrouped`).
    async fn process_envelope(
        &self,
        project_id: ProjectId,
        message: ProcessEnvelopeGrouped<'_>,
    ) -> Result<Output<Outputs>, ProcessingError> {
        let ProcessEnvelopeGrouped {
            group,
            envelope: mut managed_envelope,
            ctx,
        } = message;

        // Apply transaction-name rules from the sampling project's config to the
        // dynamic sampling context, if a sampling project is known.
        if let Some(sampling_state) = ctx.sampling_project_info {
            managed_envelope
                .envelope_mut()
                .parametrize_dsc_transaction(&sampling_state.config.tx_name_rules);
        }

        // Stamp configured retention periods on the envelope so downstream
        // consumers do not need project state.
        if let Some(retention) = ctx.project_info.config.event_retention {
            managed_envelope.envelope_mut().set_retention(retention);
        }

        if let Some(retention) = ctx.project_info.config.downsampled_event_retention {
            managed_envelope
                .envelope_mut()
                .set_downsampled_retention(retention);
        }

        // Ensure the project id is always attached; requests may arrive with only
        // a public key.
        managed_envelope
            .envelope_mut()
            .meta_mut()
            .set_project_id(project_id);

        relay_log::trace!("Processing {group} group", group = group.variant());

        // Dispatch to the dedicated processor for each group.
        match group {
            ProcessingGroup::Error => {
                self.process_with_processor(&self.inner.processing.errors, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::Transaction => {
                self.process_with_processor(
                    &self.inner.processing.transactions,
                    managed_envelope,
                    ctx,
                )
                .await
            }
            ProcessingGroup::Session => {
                self.process_with_processor(&self.inner.processing.sessions, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::StandaloneAttachments => {
                self.process_with_processor(
                    &self.inner.processing.attachments,
                    managed_envelope,
                    ctx,
                )
                .await
            }
            ProcessingGroup::StandaloneUserReports => {
                self.process_with_processor(
                    &self.inner.processing.user_reports,
                    managed_envelope,
                    ctx,
                )
                .await
            }
            ProcessingGroup::StandaloneProfiles => {
                self.process_with_processor(&self.inner.processing.profiles, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::ClientReport => {
                self.process_with_processor(
                    &self.inner.processing.client_reports,
                    managed_envelope,
                    ctx,
                )
                .await
            }
            ProcessingGroup::Replay => {
                self.process_with_processor(&self.inner.processing.replays, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::CheckIn => {
                self.process_with_processor(&self.inner.processing.check_ins, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::Log => {
                self.process_with_processor(&self.inner.processing.logs, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::TraceMetric => {
                self.process_with_processor(
                    &self.inner.processing.trace_metrics,
                    managed_envelope,
                    ctx,
                )
                .await
            }
            ProcessingGroup::SpanV2 => {
                self.process_with_processor(&self.inner.processing.spans, managed_envelope, ctx)
                    .await
            }
            ProcessingGroup::TraceAttachment => {
                self.process_with_processor(
                    &self.inner.processing.trace_attachments,
                    managed_envelope,
                    ctx,
                )
                .await
            }
            ProcessingGroup::Span => {
                self.process_with_processor(
                    &self.inner.processing.legacy_spans,
                    managed_envelope,
                    ctx,
                )
                .await
            }
            ProcessingGroup::ProfileChunk => {
                self.process_with_processor(
                    &self.inner.processing.profile_chunks,
                    managed_envelope,
                    ctx,
                )
                .await
            }
            ProcessingGroup::Ungrouped => {
                // Splitting should always assign a group; reaching this branch is
                // a bug in `ProcessingGroup::split_envelope`.
                relay_log::error!(
                    tags.project = %project_id,
                    items = ?managed_envelope.envelope().items().next().map(Item::ty),
                    "could not identify the processing group based on the envelope's items"
                );

                Err(ProcessingError::NoProcessingGroup)
            }
            ProcessingGroup::ForwardUnknown => {
                self.process_with_processor(
                    &self.inner.processing.forward_unknown,
                    managed_envelope,
                    ctx,
                )
                .await
            }
        }
    }
1244
1245 async fn process<'a>(
1251 &self,
1252 mut message: ProcessEnvelopeGrouped<'a>,
1253 ) -> Result<Option<(Outputs, ForwardContext<'a>)>, ProcessingError> {
1254 let ProcessEnvelopeGrouped {
1255 ref mut envelope,
1256 ctx,
1257 ..
1258 } = message;
1259
1260 let Some(project_id) = ctx
1267 .project_info
1268 .project_id
1269 .or_else(|| envelope.envelope().meta().project_id())
1270 else {
1271 envelope.reject(Outcome::Invalid(DiscardReason::Internal));
1272 return Err(ProcessingError::MissingProjectId);
1273 };
1274
1275 let client = envelope.envelope().meta().client().map(str::to_owned);
1276 let user_agent = envelope.envelope().meta().user_agent().map(str::to_owned);
1277 let project_key = envelope.envelope().meta().public_key();
1278 let sampling_key = envelope
1282 .envelope()
1283 .sampling_key()
1284 .filter(|_| ctx.sampling_project_info.is_some());
1285
1286 relay_log::configure_scope(|scope| {
1289 scope.set_tag("project", project_id);
1290 if let Some(client) = client {
1291 scope.set_tag("sdk", client);
1292 }
1293 if let Some(user_agent) = user_agent {
1294 scope.set_extra("user_agent", user_agent.into());
1295 }
1296 });
1297
1298 let result =
1299 self.process_envelope(project_id, message)
1300 .await
1301 .map(|Output { main, metrics }| {
1302 if let Some(metrics) = metrics {
1303 metrics.accept(|metrics| {
1304 send_metrics(
1305 metrics,
1306 project_key,
1307 sampling_key,
1308 &self.inner.addrs.aggregator,
1309 );
1310 });
1311 }
1312
1313 let ctx = ctx.to_forward();
1314 main.map(|output| (output, ctx))
1315 });
1316
1317 relay_log::configure_scope(|scope| {
1318 scope.remove_tag("project");
1319 scope.remove_tag("sdk");
1320 scope.remove_tag("user_agent");
1321 });
1322
1323 result
1324 }
1325
    /// Handles a [`ProcessEnvelope`] message: splits the envelope into
    /// processing groups and processes each group sequentially.
    ///
    /// Each group is timed under its own COGS token; successful outputs are
    /// submitted upstream, failures are logged (at error level only for
    /// unexpected failures).
    async fn handle_process_envelope(&self, cogs: &mut Token, message: ProcessEnvelope) {
        let project_key = message.envelope.envelope().meta().public_key();
        let wait_time = message.envelope.age();
        metric!(timer(RelayTimers::EnvelopeWaitTime) = wait_time);

        // Accounting happens per split group below, so the aggregate token for
        // the whole message is cancelled.
        cogs.cancel();

        let scoping = message.envelope.scoping();
        for (group, envelope) in ProcessingGroup::split_envelope(
            *message.envelope.into_envelope(),
            &message.project_info,
        ) {
            // Per-group COGS measurement, attributed to the group's app feature.
            let mut cogs = self
                .inner
                .cogs
                .timed(ResourceId::Relay, AppFeature::from(group));

            // Re-wrap the split part so outcomes are tracked for it, and carry
            // over the original scoping.
            let mut envelope =
                ManagedEnvelope::new(envelope, self.inner.addrs.outcome_aggregator.clone());
            envelope.scope(scoping);

            let global_config = self.inner.global_config.current();

            let ctx = processing::Context {
                config: &self.inner.config,
                global_config: &global_config,
                project_info: &message.project_info,
                sampling_project_info: message.sampling_project_info.as_deref(),
                rate_limits: &message.rate_limits,
            };

            let message = ProcessEnvelopeGrouped {
                group,
                envelope,
                ctx,
            };

            let result = metric!(
                timer(RelayTimers::EnvelopeProcessingTime),
                group = group.variant(),
                { self.process(message).await }
            );

            match result {
                Ok(Some((output, ctx))) => self.submit_upstream(&mut cogs, output, ctx),
                // Nothing left to forward (e.g. fully dropped or rate limited).
                Ok(None) => {}
                Err(error) if error.is_unexpected() => {
                    relay_log::error!(
                        tags.project_key = %project_key,
                        error = &error as &dyn Error,
                        "error processing envelope"
                    )
                }
                Err(error) => {
                    relay_log::debug!(
                        tags.project_key = %project_key,
                        error = &error as &dyn Error,
                        "error processing envelope"
                    )
                }
            }
        }
    }
1391
    /// Handles a [`ProcessMetrics`] message: parses, normalizes, and
    /// rate-limit-checks metric buckets, then merges them into the aggregator.
    ///
    /// Invalid buckets (failed normalization or disallowed namespace) are
    /// dropped; timestamps are corrected for client clock drift.
    fn handle_process_metrics(&self, cogs: &mut Token, message: ProcessMetrics) {
        let ProcessMetrics {
            data,
            project_key,
            received_at,
            sent_at,
            source,
        } = message;

        let received_timestamp =
            UnixTimestamp::from_datetime(received_at).unwrap_or(UnixTimestamp::now());

        let mut buckets = data.into_buckets(received_timestamp);
        if buckets.is_empty() {
            return;
        };
        // Attribute COGS cost by total payload size of the parsed buckets.
        cogs.update(relay_metrics::cogs::BySize(&buckets));

        let clock_drift_processor =
            ClockDriftProcessor::new(sent_at, received_at).at_least(MINIMUM_CLOCK_DRIFT);

        buckets.retain_mut(|bucket| {
            if let Err(error) = relay_metrics::normalize_bucket(bucket) {
                relay_log::debug!(error = &error as &dyn Error, "dropping bucket {bucket:?}");
                return false;
            }

            if !self::metrics::is_valid_namespace(bucket) {
                return false;
            }

            clock_drift_processor.process_timestamp(&mut bucket.timestamp);

            // Buckets from internal relays keep their original metadata; buckets
            // from external sources get fresh receive-time metadata.
            if !matches!(source, BucketSource::Internal) {
                bucket.metadata = BucketMetadata::new(received_timestamp);
            }

            true
        });

        let project = self.inner.project_cache.get(project_key);

        // Cached rate limits can only be enforced if the project state is
        // available; otherwise buckets pass through unchecked.
        let buckets = match project.state() {
            ProjectState::Enabled(project_info) => {
                let rate_limits = project.rate_limits().current_limits();
                self.check_buckets(project_key, project_info, &rate_limits, buckets)
            }
            _ => buckets,
        };

        relay_log::trace!("merging metric buckets into the aggregator");
        self.inner
            .addrs
            .aggregator
            .send(MergeBuckets::new(project_key, buckets));
    }
1450
1451 fn handle_process_batched_metrics(&self, cogs: &mut Token, message: ProcessBatchedMetrics) {
1452 let ProcessBatchedMetrics {
1453 payload,
1454 source,
1455 received_at,
1456 sent_at,
1457 } = message;
1458
1459 #[derive(serde::Deserialize)]
1460 struct Wrapper {
1461 buckets: HashMap<ProjectKey, Vec<Bucket>>,
1462 }
1463
1464 let buckets = match serde_json::from_slice(&payload) {
1465 Ok(Wrapper { buckets }) => buckets,
1466 Err(error) => {
1467 relay_log::debug!(
1468 error = &error as &dyn Error,
1469 "failed to parse batched metrics",
1470 );
1471 metric!(counter(RelayCounters::MetricBucketsParsingFailed) += 1);
1472 return;
1473 }
1474 };
1475
1476 for (project_key, buckets) in buckets {
1477 self.handle_process_metrics(
1478 cogs,
1479 ProcessMetrics {
1480 data: MetricData::Parsed(buckets),
1481 project_key,
1482 source,
1483 received_at,
1484 sent_at,
1485 },
1486 )
1487 }
1488 }
1489
    /// Submits processed output either to the store (processing relays) or to
    /// the HTTP upstream (all other relays).
    ///
    /// The submission time is recorded under the `submit` COGS category.
    fn submit_upstream(
        &self,
        cogs: &mut Token,
        output: Outputs,
        ctx: processing::ForwardContext<'_>,
    ) {
        let _submit = cogs.start_category("submit");

        // Processing relays forward directly to the store instead of re-sending
        // the envelope over HTTP.
        #[cfg(feature = "processing")]
        if ctx.config.processing_enabled()
            && let Some(store_forwarder) = &self.inner.addrs.store_forwarder
        {
            use crate::processing::StoreHandle;

            let objectstore = self.inner.addrs.objectstore.as_ref();
            let handle = StoreHandle::new(store_forwarder, objectstore, ctx.global_config);

            output
                .forward_store(handle, ctx)
                .unwrap_or_else(|err| err.into_inner());

            return;
        }

        // Non-processing relays serialize the output back into an envelope and
        // send it to the next upstream.
        match output.serialize_envelope(ctx) {
            Ok(envelope) => {
                let envelope = ManagedEnvelope::from(envelope);
                self.submit_envelope_upstream(envelope, ctx.project_info.upstream.clone());
            }
            Err(_) => relay_log::error!("failed to serialize output to an envelope"),
        };
    }
1525
    /// Serializes an envelope and sends it to the HTTP upstream.
    ///
    /// Empty envelopes are accepted without a request. On processing relays this
    /// is a logic error, since they must forward to the store instead.
    /// Serialization failures reject the envelope with an internal outcome.
    fn submit_envelope_upstream(
        &self,
        mut envelope: ManagedEnvelope,
        upstream: Option<UpstreamDescriptor>,
    ) {
        // Nothing to send; count the envelope as accepted.
        if envelope.envelope_mut().is_empty() {
            envelope.accept();
            return;
        }

        // Processing relays must never re-send envelopes over HTTP; reaching
        // this branch indicates a bug in the submission path.
        if self.inner.config.processing_enabled() {
            relay_log::error!(
                "attempt to forward envelope to http upstream when processing is enabled"
            );
            return;
        }

        envelope.envelope_mut().set_sent_at(Utc::now());

        relay_log::trace!("sending envelope to sentry endpoint");
        let http_encoding = self.inner.config.http_encoding();
        // Serialize and compress with the configured upstream encoding.
        let result = envelope.envelope().to_vec().and_then(|v| {
            encode_payload(&v.into(), http_encoding).map_err(EnvelopeError::PayloadIoFailed)
        });

        match result {
            Ok(body) => {
                self.inner
                    .addrs
                    .upstream_relay
                    .send(SendRequest(SendEnvelope {
                        upstream,
                        envelope,
                        body,
                        http_encoding,
                        project_cache: self.inner.project_cache.clone(),
                    }));
            }
            Err(error) => {
                relay_log::error!(
                    error = &error as &dyn Error,
                    tags.project_key = %envelope.scoping().project_key,
                    "failed to serialize envelope payload"
                );

                envelope.reject(Outcome::Invalid(DiscardReason::Internal));
            }
        }
    }
1589
1590 fn handle_submit_client_reports(&self, message: SubmitClientReports) {
1591 let SubmitClientReports {
1592 client_reports,
1593 scoping,
1594 } = message;
1595
1596 let upstream = self.inner.config.upstream();
1597 let dsn = PartialDsn::outbound(&scoping, upstream);
1598
1599 let mut envelope = Envelope::from_request(None, RequestMeta::outbound(dsn));
1600 for client_report in client_reports {
1601 match client_report.serialize() {
1602 Ok(payload) => {
1603 let mut item = Item::new(ItemType::ClientReport);
1604 item.set_payload(ContentType::Json, payload);
1605 envelope.add_item(item);
1606 }
1607 Err(error) => {
1608 relay_log::error!(
1609 error = &error as &dyn std::error::Error,
1610 "failed to serialize client report"
1611 );
1612 }
1613 }
1614 }
1615
1616 let envelope = ManagedEnvelope::new(envelope, self.inner.addrs.outcome_aggregator.clone());
1617 self.submit_envelope_upstream(envelope, None);
1618 }
1619
1620 fn check_buckets(
1621 &self,
1622 project_key: ProjectKey,
1623 project_info: &ProjectInfo,
1624 rate_limits: &RateLimits,
1625 buckets: Vec<Bucket>,
1626 ) -> Vec<Bucket> {
1627 let Some(scoping) = project_info.scoping(project_key) else {
1628 relay_log::error!(
1629 tags.project_key = project_key.as_str(),
1630 "there is no scoping: dropping {} buckets",
1631 buckets.len(),
1632 );
1633 return Vec::new();
1634 };
1635
1636 let mut buckets = self::metrics::apply_project_info(
1637 buckets,
1638 &self.inner.metric_outcomes,
1639 project_info,
1640 scoping,
1641 );
1642
1643 let namespaces: BTreeSet<MetricNamespace> = buckets
1644 .iter()
1645 .filter_map(|bucket| bucket.name.try_namespace())
1646 .collect();
1647
1648 for namespace in namespaces {
1649 let limits = rate_limits.check_with_quotas(
1650 project_info.get_quotas(),
1651 scoping.item(DataCategory::MetricBucket),
1652 );
1653
1654 if limits.is_limited() {
1655 let rejected;
1656 (buckets, rejected) = utils::split_off(buckets, |bucket| {
1657 bucket.name.try_namespace() == Some(namespace)
1658 });
1659
1660 let reason_code = limits.longest().and_then(|limit| limit.reason_code.clone());
1661 self.inner.metric_outcomes.track(
1662 scoping,
1663 &rejected,
1664 Outcome::RateLimited(reason_code),
1665 );
1666 }
1667 }
1668
1669 let quotas = project_info.config.quotas.clone();
1670 match MetricsLimiter::create(buckets, quotas, scoping) {
1671 Ok(mut bucket_limiter) => {
1672 bucket_limiter.enforce_limits(rate_limits, &self.inner.metric_outcomes);
1673 bucket_limiter.into_buckets()
1674 }
1675 Err(buckets) => buckets,
1676 }
1677 }
1678
1679 #[cfg(feature = "processing")]
1680 async fn rate_limit_buckets(
1681 &self,
1682 scoping: Scoping,
1683 project_info: &ProjectInfo,
1684 mut buckets: Vec<Bucket>,
1685 ) -> Vec<Bucket> {
1686 let Some(rate_limiter) = &self.inner.rate_limiter else {
1687 return buckets;
1688 };
1689
1690 let global_config = self.inner.global_config.current();
1691 let namespaces = buckets
1692 .iter()
1693 .filter_map(|bucket| bucket.name.try_namespace())
1694 .counts();
1695
1696 let quotas = CombinedQuotas::new(&global_config, project_info.get_quotas());
1697
1698 for (namespace, quantity) in namespaces {
1699 let item_scoping = scoping.metric_bucket(namespace);
1700
1701 let limits = match rate_limiter
1702 .is_rate_limited(quotas, item_scoping, quantity, false)
1703 .await
1704 {
1705 Ok(limits) => limits,
1706 Err(err) => {
1707 relay_log::error!(
1708 error = &err as &dyn std::error::Error,
1709 "failed to check redis rate limits"
1710 );
1711 break;
1712 }
1713 };
1714
1715 if limits.is_limited() {
1716 let rejected;
1717 (buckets, rejected) = utils::split_off(buckets, |bucket| {
1718 bucket.name.try_namespace() == Some(namespace)
1719 });
1720
1721 let reason_code = limits.longest().and_then(|limit| limit.reason_code.clone());
1722 self.inner.metric_outcomes.track(
1723 scoping,
1724 &rejected,
1725 Outcome::RateLimited(reason_code),
1726 );
1727
1728 self.inner
1729 .project_cache
1730 .get(item_scoping.scoping.project_key)
1731 .rate_limits()
1732 .merge(limits);
1733 }
1734 }
1735
1736 match MetricsLimiter::create(buckets, project_info.config.quotas.clone(), scoping) {
1737 Err(buckets) => buckets,
1738 Ok(bucket_limiter) => self.apply_other_rate_limits(bucket_limiter).await,
1739 }
1740 }
1741
1742 #[cfg(feature = "processing")]
1744 async fn apply_other_rate_limits(&self, mut bucket_limiter: MetricsLimiter) -> Vec<Bucket> {
1745 relay_log::trace!("handle_rate_limit_buckets");
1746
1747 let scoping = *bucket_limiter.scoping();
1748
1749 if let Some(rate_limiter) = self.inner.rate_limiter.as_ref() {
1750 let global_config = self.inner.global_config.current();
1751 let quotas = CombinedQuotas::new(&global_config, bucket_limiter.quotas());
1752
1753 let over_accept_once = true;
1756 let mut rate_limits = RateLimits::new();
1757
1758 let (category, count) = bucket_limiter.count();
1759
1760 let timer = Instant::now();
1761 let mut is_limited = false;
1762
1763 if let Some(count) = count {
1764 match rate_limiter
1765 .is_rate_limited(quotas, scoping.item(category), count, over_accept_once)
1766 .await
1767 {
1768 Ok(limits) => {
1769 is_limited = limits.is_limited();
1770 rate_limits.merge(limits)
1771 }
1772 Err(e) => {
1773 relay_log::error!(error = &e as &dyn Error, "rate limiting error")
1774 }
1775 }
1776 }
1777
1778 relay_statsd::metric!(
1779 timer(RelayTimers::RateLimitBucketsDuration) = timer.elapsed(),
1780 category = category.name(),
1781 limited = if is_limited { "true" } else { "false" },
1782 count = match count {
1783 None => "none",
1784 Some(0) => "0",
1785 Some(1) => "1",
1786 Some(1..=10) => "10",
1787 Some(1..=25) => "25",
1788 Some(1..=50) => "50",
1789 Some(51..=100) => "100",
1790 Some(101..=500) => "500",
1791 _ => "> 500",
1792 },
1793 );
1794
1795 if rate_limits.is_limited() {
1796 let was_enforced =
1797 bucket_limiter.enforce_limits(&rate_limits, &self.inner.metric_outcomes);
1798
1799 if was_enforced {
1800 self.inner
1802 .project_cache
1803 .get(scoping.project_key)
1804 .rate_limits()
1805 .merge(rate_limits);
1806 }
1807 }
1808 }
1809
1810 bucket_limiter.into_buckets()
1811 }
1812
1813 #[cfg(feature = "processing")]
1819 async fn encode_metrics_processing(
1820 &self,
1821 message: FlushBuckets,
1822 store_forwarder: &Addr<Store>,
1823 ) {
1824 use crate::constants::DEFAULT_EVENT_RETENTION;
1825 use crate::services::store::StoreMetrics;
1826
1827 for ProjectBuckets {
1828 buckets,
1829 scoping,
1830 project_info,
1831 ..
1832 } in message.buckets.into_values()
1833 {
1834 let buckets = self
1835 .rate_limit_buckets(scoping, &project_info, buckets)
1836 .await;
1837
1838 if buckets.is_empty() {
1839 continue;
1840 }
1841
1842 let retention = project_info
1843 .config
1844 .event_retention
1845 .unwrap_or(DEFAULT_EVENT_RETENTION);
1846
1847 store_forwarder.send(StoreMetrics {
1850 buckets,
1851 scoping,
1852 retention,
1853 });
1854 }
1855 }
1856
1857 fn encode_metrics_envelope(&self, message: FlushBuckets) {
1867 let FlushBuckets {
1868 partition_key,
1869 buckets,
1870 } = message;
1871
1872 let batch_size = self.inner.config.metrics_max_batch_size_bytes();
1873 let upstream = self.inner.config.upstream();
1874
1875 for ProjectBuckets {
1876 buckets,
1877 scoping,
1878 project_info,
1879 ..
1880 } in buckets.values()
1881 {
1882 let dsn = PartialDsn::outbound(scoping, upstream);
1883
1884 relay_statsd::metric!(
1885 distribution(RelayDistributions::PartitionKeys) = u64::from(partition_key)
1886 );
1887
1888 let mut num_batches = 0;
1889 for batch in BucketsView::from(buckets).by_size(batch_size) {
1890 let mut envelope = Envelope::from_request(None, RequestMeta::outbound(dsn.clone()));
1891
1892 let mut item = Item::new(ItemType::MetricBuckets);
1893 item.set_source_quantities(crate::metrics::extract_quantities(batch));
1894 item.set_payload(ContentType::Json, serde_json::to_vec(&buckets).unwrap());
1895 envelope.add_item(item);
1896
1897 let mut envelope =
1898 ManagedEnvelope::new(envelope, self.inner.addrs.outcome_aggregator.clone());
1899 envelope
1900 .set_partition_key(Some(partition_key))
1901 .scope(*scoping);
1902
1903 relay_statsd::metric!(
1904 distribution(RelayDistributions::BucketsPerBatch) = batch.len() as u64
1905 );
1906
1907 self.submit_envelope_upstream(envelope, project_info.upstream.clone());
1908 num_batches += 1;
1909 }
1910
1911 relay_statsd::metric!(
1912 distribution(RelayDistributions::BatchesPerPartition) = num_batches
1913 );
1914 }
1915 }
1916
    /// Encodes and submits a single global-metrics partition to the upstream.
    ///
    /// No-op for empty partitions. The unencoded payload is kept alongside the
    /// encoded one so signatures and error outcomes can be derived from it.
    fn send_global_partition(
        &self,
        upstream: Option<UpstreamDescriptor>,
        partition_key: u32,
        partition: &mut Partition<'_>,
    ) {
        if partition.is_empty() {
            return;
        }

        // `take` drains the partition and resets it for reuse by the caller.
        let (unencoded, project_info) = partition.take();
        let http_encoding = self.inner.config.http_encoding();
        let encoded = match encode_payload(&unencoded, http_encoding) {
            Ok(payload) => payload,
            Err(error) => {
                let error = &error as &dyn std::error::Error;
                relay_log::error!(error, "failed to encode metrics payload");
                return;
            }
        };

        let request = SendMetricsRequest {
            upstream,
            partition_key: partition_key.to_string(),
            unencoded,
            encoded,
            project_info,
            http_encoding,
            metric_outcomes: self.inner.metric_outcomes.clone(),
        };

        self.inner.addrs.upstream_relay.send(SendRequest(request));
    }
1951
1952 fn encode_metrics_global(&self, message: FlushBuckets) {
1963 let FlushBuckets {
1964 partition_key,
1965 buckets,
1966 } = message;
1967
1968 let batch_size = self.inner.config.metrics_max_batch_size_bytes();
1969 let mut partitions = BTreeMap::new();
1970 let mut partition_splits = 0;
1971
1972 for ProjectBuckets {
1973 buckets,
1974 scoping,
1975 project_info,
1976 ..
1977 } in buckets.values()
1978 {
1979 let partition = match partitions.get_mut(&project_info.upstream) {
1980 Some(partition) => partition,
1981 None => partitions
1982 .entry(project_info.upstream.clone())
1983 .or_insert_with(|| Partition::new(batch_size)),
1984 };
1985
1986 for bucket in buckets {
1987 let mut remaining = Some(BucketView::new(bucket));
1988
1989 while let Some(bucket) = remaining.take() {
1990 if let Some(next) = partition.insert(bucket, *scoping) {
1991 self.send_global_partition(
1995 project_info.upstream.clone(),
1996 partition_key,
1997 partition,
1998 );
1999 remaining = Some(next);
2000 partition_splits += 1;
2001 }
2002 }
2003 }
2004 }
2005
2006 if partition_splits > 0 {
2007 metric!(distribution(RelayDistributions::PartitionSplits) = partition_splits);
2008 }
2009
2010 for (upstream, mut partition) in partitions {
2011 self.send_global_partition(upstream, partition_key, &mut partition);
2012 }
2013 }
2014
    /// Handles a [`FlushBuckets`] message: applies cached rate limits, then
    /// routes the buckets to the store (processing relays), the global metrics
    /// endpoint, or per-project envelopes.
    async fn handle_flush_buckets(&self, mut message: FlushBuckets) {
        // Enforce cached rate limits per project before any encoding.
        for (project_key, pb) in message.buckets.iter_mut() {
            let buckets = std::mem::take(&mut pb.buckets);
            pb.buckets =
                self.check_buckets(*project_key, &pb.project_info, &pb.rate_limits, buckets);
        }

        #[cfg(feature = "processing")]
        if self.inner.config.processing_enabled()
            && let Some(ref store_forwarder) = self.inner.addrs.store_forwarder
        {
            return self
                .encode_metrics_processing(message, store_forwarder)
                .await;
        }

        if self.inner.config.http_global_metrics() {
            self.encode_metrics_global(message)
        } else {
            self.encode_metrics_envelope(message)
        }
    }
2037
2038 #[cfg(all(test, feature = "processing"))]
2039 fn redis_rate_limiter_enabled(&self) -> bool {
2040 self.inner.rate_limiter.is_some()
2041 }
2042
    /// Dispatches an [`EnvelopeProcessor`] service message to its handler,
    /// timing the call and attributing COGS to the message's feature weights.
    async fn handle_message(&self, message: EnvelopeProcessor) {
        let ty = message.variant();
        let feature_weights = self.feature_weights(&message);

        metric!(timer(RelayTimers::ProcessMessageDuration), message = ty, {
            let mut cogs = self.inner.cogs.timed(ResourceId::Relay, feature_weights);

            match message {
                EnvelopeProcessor::ProcessEnvelope(m) => {
                    self.handle_process_envelope(&mut cogs, *m).await
                }
                EnvelopeProcessor::ProcessProjectMetrics(m) => {
                    self.handle_process_metrics(&mut cogs, *m)
                }
                EnvelopeProcessor::ProcessBatchedMetrics(m) => {
                    self.handle_process_batched_metrics(&mut cogs, *m)
                }
                EnvelopeProcessor::FlushBuckets(m) => self.handle_flush_buckets(*m).await,
                EnvelopeProcessor::SubmitClientReports(m) => self.handle_submit_client_reports(*m),
            }
        });
    }
2065
    /// Computes COGS feature weights for a message.
    ///
    /// Envelope and metrics messages are unattributed here (they are accounted
    /// per group/bucket later); bucket flushes are weighted by bucket count on
    /// processing relays and by payload size otherwise.
    fn feature_weights(&self, message: &EnvelopeProcessor) -> FeatureWeights {
        match message {
            EnvelopeProcessor::ProcessEnvelope(_) => AppFeature::Unattributed.into(),
            EnvelopeProcessor::ProcessProjectMetrics(_) => AppFeature::Unattributed.into(),
            EnvelopeProcessor::ProcessBatchedMetrics(_) => AppFeature::Unattributed.into(),
            EnvelopeProcessor::FlushBuckets(v) => v
                .buckets
                .values()
                .map(|s| {
                    if self.inner.config.processing_enabled() {
                        relay_metrics::cogs::ByCount(&s.buckets).into()
                    } else {
                        relay_metrics::cogs::BySize(&s.buckets).into()
                    }
                })
                .fold(FeatureWeights::none(), FeatureWeights::merge),
            EnvelopeProcessor::SubmitClientReports(_) => AppFeature::ClientReports.into(),
        }
    }
2088}
2089
impl Service for EnvelopeProcessorService {
    type Interface = EnvelopeProcessor;

    /// Receives service messages and handles each on the worker pool.
    ///
    /// The loop awaits `spawn_async`, which applies backpressure when the pool
    /// is saturated; the loop exits when the message channel closes.
    async fn run(self, mut rx: relay_system::Receiver<Self::Interface>) {
        while let Some(message) = rx.recv().await {
            let service = self.clone();
            self.inner
                .pool
                .spawn_async(
                    async move {
                        service.handle_message(message).await;
                    }
                    .boxed(),
                )
                .await;
        }
    }
}
2108
2109pub fn encode_payload(body: &Bytes, http_encoding: HttpEncoding) -> Result<Bytes, std::io::Error> {
2110 let envelope_body: Vec<u8> = match http_encoding {
2111 HttpEncoding::Identity => return Ok(body.clone()),
2112 HttpEncoding::Deflate => {
2113 let mut encoder = ZlibEncoder::new(Vec::new(), Compression::default());
2114 encoder.write_all(body.as_ref())?;
2115 encoder.finish()?
2116 }
2117 HttpEncoding::Gzip => {
2118 let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
2119 encoder.write_all(body.as_ref())?;
2120 encoder.finish()?
2121 }
2122 HttpEncoding::Br => {
2123 let mut encoder = BrotliEncoder::new(Vec::new(), 0, 5, 22);
2125 encoder.write_all(body.as_ref())?;
2126 encoder.into_inner()
2127 }
2128 HttpEncoding::Zstd => {
2129 let mut encoder = ZstdEncoder::new(Vec::new(), 1)?;
2132 encoder.write_all(body.as_ref())?;
2133 encoder.finish()?
2134 }
2135 };
2136
2137 Ok(envelope_body.into())
2138}
2139
/// An upstream request that submits a serialized envelope to the next relay or
/// to Sentry.
#[derive(Debug)]
pub struct SendEnvelope {
    /// Upstream to send to; `None` uses the default configured upstream.
    pub upstream: Option<UpstreamDescriptor>,
    /// The managed envelope, kept for outcome tracking on completion.
    pub envelope: ManagedEnvelope,
    /// The pre-serialized, pre-encoded request body.
    pub body: Bytes,
    /// Content encoding applied to `body`.
    pub http_encoding: HttpEncoding,
    /// Used to merge rate limits received in the upstream response.
    pub project_cache: ProjectCacheHandle,
}
2149
impl UpstreamRequest for SendEnvelope {
    fn upstream(&self) -> Option<&UpstreamDescriptor> {
        self.upstream.as_ref()
    }

    fn method(&self) -> reqwest::Method {
        reqwest::Method::POST
    }

    fn path(&self) -> Cow<'_, str> {
        format!("/api/{}/envelope/", self.envelope.scoping().project_id).into()
    }

    fn route(&self) -> &'static str {
        "envelope"
    }

    /// Builds the HTTP request: headers derived from the envelope's request
    /// meta, plus the pre-encoded body.
    fn build(&mut self, builder: &mut http::RequestBuilder) -> Result<(), http::HttpError> {
        let envelope_body = self.body.clone();
        metric!(
            distribution(RelayDistributions::UpstreamEnvelopeBodySize) = envelope_body.len() as u64
        );

        let meta = &self.envelope.meta();
        let shard = self.envelope.partition_key().map(|p| p.to_string());
        builder
            .content_encoding(self.http_encoding)
            .header_opt("Origin", meta.origin().map(|url| url.as_str()))
            .header_opt("User-Agent", meta.user_agent())
            .header("X-Sentry-Auth", meta.auth_header())
            .header("X-Forwarded-For", meta.forwarded_for())
            .header("Content-Type", envelope::CONTENT_TYPE)
            .header_opt("X-Sentry-Relay-Shard", shard)
            .body(envelope_body);

        Ok(())
    }

    fn sign(&mut self) -> Option<Sign> {
        Some(Sign::Optional(SignatureType::RequestSign))
    }

    /// Resolves the envelope's outcome from the upstream response.
    ///
    /// A successful response or a received error (the upstream saw the data)
    /// accepts the envelope; rate-limit responses are merged into the project
    /// cache. Any other error rejects the envelope with an internal outcome.
    fn respond(
        self: Box<Self>,
        result: Result<http::Response, UpstreamRequestError>,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> {
        Box::pin(async move {
            let result = match result {
                Ok(mut response) => response.consume().await.map_err(UpstreamRequestError::Http),
                Err(error) => Err(error),
            };

            match result {
                Ok(()) => self.envelope.accept(),
                Err(error) if error.is_received() => {
                    let scoping = self.envelope.scoping();
                    // The upstream received the envelope; from this relay's
                    // perspective it is accepted even if it was rate limited.
                    self.envelope.accept();

                    if let UpstreamRequestError::RateLimited(limits) = error {
                        self.project_cache
                            .get(scoping.project_key)
                            .rate_limits()
                            .merge(limits.scope(&scoping));
                    }
                }
                Err(error) => {
                    let mut envelope = self.envelope;
                    envelope.reject(Outcome::Invalid(DiscardReason::Internal));
                    relay_log::error!(
                        error = &error as &dyn Error,
                        tags.project_key = %envelope.scoping().project_key,
                        "error sending envelope"
                    );
                }
            }
        })
    }
}
2230
/// A size-limited batch of metric bucket views grouped by project, used to
/// assemble global metrics endpoint payloads.
#[derive(Debug)]
struct Partition<'a> {
    /// Maximum payload size of a partition, in bytes (estimated).
    max_size: usize,
    /// Remaining estimated capacity before the partition is full.
    remaining: usize,
    /// Bucket views collected so far, keyed by project.
    views: HashMap<ProjectKey, Vec<BucketView<'a>>>,
    /// Scoping per project, needed for outcomes on transmission failure.
    project_info: HashMap<ProjectKey, Scoping>,
}
2244
impl<'a> Partition<'a> {
    /// Creates an empty partition with the given maximum size in bytes.
    pub fn new(size: usize) -> Self {
        Self {
            max_size: size,
            remaining: size,
            views: HashMap::new(),
            project_info: HashMap::new(),
        }
    }

    /// Inserts as much of `bucket` as fits into the remaining capacity.
    ///
    /// Returns the remainder that did not fit, or `None` if the whole bucket
    /// was inserted. A `Some` return signals the caller to flush the partition
    /// and retry with the remainder.
    pub fn insert(&mut self, bucket: BucketView<'a>, scoping: Scoping) -> Option<BucketView<'a>> {
        // Split at the remaining capacity; the second bound caps a single
        // split-off piece at the full partition size.
        let (current, next) = bucket.split(self.remaining, Some(self.max_size));

        if let Some(current) = current {
            self.remaining = self.remaining.saturating_sub(current.estimated_size());
            self.views
                .entry(scoping.project_key)
                .or_default()
                .push(current);

            self.project_info
                .entry(scoping.project_key)
                .or_insert(scoping);
        }

        next
    }

    /// Returns `true` if no buckets have been inserted.
    fn is_empty(&self) -> bool {
        self.views.is_empty()
    }

    /// Serializes the collected buckets to a JSON payload and resets the
    /// partition for reuse.
    ///
    /// Returns the payload together with the per-project scopings, which are
    /// required to emit outcomes if the request fails.
    fn take(&mut self) -> (Bytes, HashMap<ProjectKey, Scoping>) {
        // Envelope shape expected by the global metrics endpoint.
        #[derive(serde::Serialize)]
        struct Wrapper<'a> {
            buckets: &'a HashMap<ProjectKey, Vec<BucketView<'a>>>,
        }

        let buckets = &self.views;
        let payload = serde_json::to_vec(&Wrapper { buckets }).unwrap().into();

        let scopings = std::mem::take(&mut self.project_info);

        self.views.clear();
        self.remaining = self.max_size;

        (payload, scopings)
    }
}
2309
/// An upstream request carrying one partition of batched metric buckets for
/// the global metrics endpoint.
#[derive(Debug)]
struct SendMetricsRequest {
    /// Upstream to send to; `None` uses the default configured upstream.
    upstream: Option<UpstreamDescriptor>,
    /// Partition key, transmitted as the `X-Sentry-Relay-Shard` header.
    partition_key: String,
    /// Uncompressed payload; used for signing and for error outcomes.
    unencoded: Bytes,
    /// Compressed payload actually transmitted.
    encoded: Bytes,
    /// Scoping per project, for outcome tracking on failure.
    project_info: HashMap<ProjectKey, Scoping>,
    /// Content encoding applied to `encoded`.
    http_encoding: HttpEncoding,
    /// Sink for metric outcomes emitted on transmission failure.
    metric_outcomes: MetricOutcomes,
}
2332
impl SendMetricsRequest {
    /// Emits `Invalid(Internal)` outcomes for every bucket in the failed
    /// request, reconstructed from the unencoded payload.
    fn create_error_outcomes(self) {
        // Mirrors the serialized wrapper shape; only the minimal fields needed
        // for outcome tracking are deserialized.
        #[derive(serde::Deserialize)]
        struct Wrapper {
            buckets: HashMap<ProjectKey, Vec<MinimalTrackableBucket>>,
        }

        let buckets = match serde_json::from_slice(&self.unencoded) {
            Ok(Wrapper { buckets }) => buckets,
            Err(err) => {
                relay_log::error!(
                    error = &err as &dyn std::error::Error,
                    "failed to parse buckets from failed transmission"
                );
                return;
            }
        };

        for (key, buckets) in buckets {
            // Every serialized project should have a scoping recorded when the
            // partition was built; a miss indicates an internal inconsistency.
            let Some(&scoping) = self.project_info.get(&key) else {
                relay_log::error!("missing scoping for project key");
                continue;
            };

            self.metric_outcomes.track(
                scoping,
                &buckets,
                Outcome::Invalid(DiscardReason::Internal),
            );
        }
    }
}
2365
impl UpstreamRequest for SendMetricsRequest {
    fn upstream(&self) -> Option<&UpstreamDescriptor> {
        self.upstream.as_ref()
    }

    fn set_relay_id(&self) -> bool {
        true
    }

    /// Signs the uncompressed payload so the upstream can verify it after
    /// decoding.
    fn sign(&mut self) -> Option<Sign> {
        Some(Sign::Required(SignatureType::Body(self.unencoded.clone())))
    }

    fn method(&self) -> reqwest::Method {
        reqwest::Method::POST
    }

    fn path(&self) -> Cow<'_, str> {
        "/api/0/relays/metrics/".into()
    }

    fn route(&self) -> &'static str {
        "global_metrics"
    }

    fn build(&mut self, builder: &mut http::RequestBuilder) -> Result<(), http::HttpError> {
        metric!(
            distribution(RelayDistributions::UpstreamMetricsBodySize) = self.encoded.len() as u64
        );

        builder
            .content_encoding(self.http_encoding)
            .header("X-Sentry-Relay-Shard", self.partition_key.as_bytes())
            .header(header::CONTENT_TYPE, b"application/json")
            .body(self.encoded.clone());

        Ok(())
    }

    /// Handles the upstream response: received errors are final (the upstream
    /// saw the data), all other failures produce invalid outcomes per bucket.
    fn respond(
        self: Box<Self>,
        result: Result<http::Response, UpstreamRequestError>,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> {
        Box::pin(async {
            match result {
                Ok(mut response) => {
                    response.consume().await.ok();
                }
                Err(error) => {
                    relay_log::error!(error = &error as &dyn Error, "Failed to send metrics batch");

                    // The upstream received the request; no outcomes to emit.
                    if error.is_received() {
                        return;
                    }

                    self.create_error_outcomes()
                }
            }
        })
    }
}
2429
/// A combined view over quotas from the global config and a project config.
#[cfg(feature = "processing")]
#[derive(Copy, Clone, Debug)]
struct CombinedQuotas<'a> {
    /// Quotas defined in the global configuration.
    global_quotas: &'a [Quota],
    /// Quotas defined in the project configuration.
    project_quotas: &'a [Quota],
}
2437
#[cfg(feature = "processing")]
impl<'a> CombinedQuotas<'a> {
    /// Creates a combined view over the global config's quotas and the given
    /// project quotas.
    pub fn new(global_config: &'a GlobalConfig, project_quotas: &'a [Quota]) -> Self {
        let global_quotas = global_config.quotas.as_slice();
        Self {
            global_quotas,
            project_quotas,
        }
    }
}
2448
#[cfg(feature = "processing")]
impl<'a> IntoIterator for CombinedQuotas<'a> {
    type Item = &'a Quota;
    type IntoIter = std::iter::Chain<std::slice::Iter<'a, Quota>, std::slice::Iter<'a, Quota>>;

    /// Yields all global quotas first, followed by all project quotas.
    fn into_iter(self) -> Self::IntoIter {
        self.global_quotas.iter().chain(self.project_quotas)
    }
}
2458
2459#[cfg(test)]
2460mod tests {
2461 use insta::assert_debug_snapshot;
2462 use relay_common::glob2::LazyGlob;
2463 use relay_dynamic_config::ProjectConfig;
2464 use relay_event_normalization::{
2465 NormalizationConfig, RedactionRule, TransactionNameConfig, TransactionNameRule,
2466 };
2467 use relay_event_schema::protocol::{Event, TransactionSource};
2468 use relay_pii::DataScrubbingConfig;
2469 use relay_protocol::Annotated;
2470 use similar_asserts::assert_eq;
2471
2472 use crate::testutils::{create_test_processor, create_test_processor_with_addrs};
2473
2474 #[cfg(feature = "processing")]
2475 use {
2476 relay_metrics::BucketValue,
2477 relay_quotas::{QuotaScope, ReasonCode},
2478 relay_test::mock_service,
2479 };
2480
2481 use super::*;
2482
2483 #[cfg(feature = "processing")]
2484 fn mock_quota(id: &str) -> Quota {
2485 Quota {
2486 id: Some(id.into()),
2487 categories: [DataCategory::MetricBucket].into(),
2488 scope: QuotaScope::Organization,
2489 scope_id: None,
2490 limit: Some(0),
2491 window: None,
2492 reason_code: None,
2493 namespace: None,
2494 }
2495 }
2496
2497 #[cfg(feature = "processing")]
2498 #[test]
2499 fn test_dynamic_quotas() {
2500 let global_config = relay_dynamic_config::GlobalConfig {
2501 quotas: vec![mock_quota("foo"), mock_quota("bar")],
2502 ..Default::default()
2503 };
2504
2505 let project_quotas = vec![mock_quota("baz"), mock_quota("qux")];
2506
2507 let dynamic_quotas = CombinedQuotas::new(&global_config, &project_quotas);
2508
2509 let quota_ids = dynamic_quotas.into_iter().filter_map(|q| q.id.as_deref());
2510 assert!(quota_ids.eq(["foo", "bar", "baz", "qux"]));
2511 }
2512
    /// Rate limits are applied per organization within a single flush batch:
    /// the org matched by the zero-limit quota is dropped, while the other
    /// org's buckets still reach the store forwarder.
    ///
    /// Requires a reachable Redis instance (`RELAY_REDIS_URL` or localhost).
    #[cfg(feature = "processing")]
    #[tokio::test]
    async fn test_ratelimit_per_batch() {
        use relay_base_schema::organization::OrganizationId;
        use relay_protocol::FiniteF64;

        // Org 1 is targeted by the quota's `scope_id` below.
        let rate_limited_org = Scoping {
            organization_id: OrganizationId::new(1),
            project_id: ProjectId::new(21),
            project_key: ProjectKey::parse("00000000000000000000000000000000").unwrap(),
            key_id: Some(17),
        };

        // Org 2 shares the same project config but is not matched by the quota.
        let not_rate_limited_org = Scoping {
            organization_id: OrganizationId::new(2),
            project_id: ProjectId::new(21),
            project_key: ProjectKey::parse("11111111111111111111111111111111").unwrap(),
            key_id: Some(17),
        };

        let message = {
            let project_info = {
                // Zero-limit quota scoped to org 1 only.
                let quota = Quota {
                    id: Some("testing".into()),
                    categories: [DataCategory::MetricBucket].into(),
                    scope: relay_quotas::QuotaScope::Organization,
                    scope_id: Some(rate_limited_org.organization_id.to_string().into()),
                    limit: Some(0),
                    window: None,
                    reason_code: Some(ReasonCode::new("test")),
                    namespace: None,
                };

                let mut config = ProjectConfig::default();
                config.quotas.push(quota);

                Arc::new(ProjectInfo {
                    config,
                    ..Default::default()
                })
            };

            // One counter bucket per org, both sharing the same project info.
            let project_metrics = |scoping| ProjectBuckets {
                buckets: vec![Bucket {
                    name: "d:spans/bar".into(),
                    value: BucketValue::Counter(FiniteF64::new(1.0).unwrap()),
                    timestamp: UnixTimestamp::now(),
                    tags: Default::default(),
                    width: 10,
                    metadata: BucketMetadata::default(),
                }],
                rate_limits: Default::default(),
                project_info: project_info.clone(),
                scoping,
            };

            let buckets = hashbrown::HashMap::from([
                (
                    rate_limited_org.project_key,
                    project_metrics(rate_limited_org),
                ),
                (
                    not_rate_limited_org.project_key,
                    project_metrics(not_rate_limited_org),
                ),
            ]);

            FlushBuckets {
                partition_key: 0,
                buckets,
            }
        };

        // Sanity check: the batch contains both projects before processing.
        assert_eq!(message.buckets.keys().count(), 2);

        // Processing config with Redis enabled so the Redis rate limiter runs.
        let config = {
            let config_json = serde_json::json!({
                "processing": {
                    "enabled": true,
                    "kafka_config": [],
                    "redis": {
                        "server": std::env::var("RELAY_REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_owned()),
                    }
                }
            });
            Config::from_json_value(config_json).unwrap()
        };

        // Mock store forwarder collecting the org id of every metrics message
        // it receives; anything else fails the test.
        let (store, handle) = {
            let f = |org_ids: &mut Vec<OrganizationId>, msg: Store| {
                let org_id = match msg {
                    Store::Metrics(x) => x.scoping.organization_id,
                    _ => panic!("received envelope when expecting only metrics"),
                };
                org_ids.push(org_id);
            };

            mock_service("store_forwarder", vec![], f)
        };

        let processor = create_test_processor(config).await;
        assert!(processor.redis_rate_limiter_enabled());

        processor.encode_metrics_processing(message, &store).await;

        // Drop the store handle so the mock service finishes and yields the
        // accumulated org ids.
        drop(store);
        let orgs_not_ratelimited = handle.await.unwrap();

        // Only the non-limited org was forwarded.
        assert_eq!(
            orgs_not_ratelimited,
            vec![not_rate_limited_org.organization_id]
        );
    }
2629
    /// The `@ip:mask` scrubbing rule masks the IP-looking version digits in the
    /// raw `User-Agent` header, yet the extracted browser context still carries
    /// the parsed version ("Chrome 103.0.0") — extraction sees the original.
    #[tokio::test]
    async fn test_browser_version_extraction_with_pii_like_data() {
        let processor = create_test_processor(Default::default()).await;
        let outcome_aggregator = Addr::dummy();
        let event_id = EventId::new();

        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
            .parse()
            .unwrap();

        let request_meta = RequestMeta::new(dsn);
        let mut envelope = Envelope::from_request(Some(event_id), request_meta);

        // Event whose only content is a User-Agent header; the version segment
        // "103.0.0.0" is shaped like an IPv4 address.
        envelope.add_item({
            let mut item = Item::new(ItemType::Event);
            item.set_payload(
                ContentType::Json,
                r#"
            {
                "request": {
                    "headers": [
                        ["User-Agent", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36"]
                    ]
                }
            }
        "#,
            );
            item
        });

        // Legacy data scrubbing settings plus an explicit `@ip:mask` PII rule
        // applied to all fields.
        let mut datascrubbing_settings = DataScrubbingConfig::default();
        datascrubbing_settings.scrub_data = true;
        datascrubbing_settings.scrub_defaults = true;
        datascrubbing_settings.scrub_ip_addresses = true;

        let pii_config = serde_json::from_str(r#"{"applications": {"**": ["@ip:mask"]}}"#).unwrap();

        let config = ProjectConfig {
            datascrubbing_settings,
            pii_config: Some(pii_config),
            ..Default::default()
        };

        let project_info = ProjectInfo {
            config,
            ..Default::default()
        };

        let mut envelopes = ProcessingGroup::split_envelope(*envelope, &Default::default());
        assert_eq!(envelopes.len(), 1);

        let (group, envelope) = envelopes.pop().unwrap();
        let envelope = ManagedEnvelope::new(envelope, outcome_aggregator);

        let message = ProcessEnvelopeGrouped {
            group,
            envelope,
            ctx: processing::Context {
                project_info: &project_info,
                ..processing::Context::for_test()
            },
        };

        let Ok(Some((output, ctx))) = processor.process(message).await else {
            panic!();
        };
        let new_envelope = output.serialize_envelope(ctx).unwrap().accept(|f| f);

        let event_item = new_envelope.items().last().unwrap();
        let annotated_event: Annotated<Event> =
            Annotated::from_json_bytes(&event_item.payload()).unwrap();
        let event = annotated_event.into_value().unwrap();
        let headers = event
            .request
            .into_value()
            .unwrap()
            .headers
            .into_value()
            .unwrap();

        // The scrubber masked the version digits in the stored header...
        assert_eq!(
            Some(
                "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/********* Safari/537.36"
            ),
            headers.get_header("User-Agent")
        );
        // ...while the browser context retains the parsed version.
        let contexts = event.contexts.into_value().unwrap();
        let browser = contexts.0.get("browser").unwrap();
        assert_eq!(
            r#"{"browser":"Chrome 103.0.0","name":"Chrome","version":"103.0.0","type":"browser"}"#,
            browser.to_json().unwrap()
        );
    }
2727
    /// The dynamic sampling context (DSC) from the envelope header is
    /// materialized into the event's `_dsc` field, with the dashed trace id
    /// normalized to its compact hex form.
    #[tokio::test]
    #[cfg(feature = "processing")]
    async fn test_materialize_dsc() {
        use crate::services::projects::project::PublicKeyConfig;

        let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
            .parse()
            .unwrap();
        let request_meta = RequestMeta::new(dsn);
        let mut envelope = Envelope::from_request(None, request_meta);

        // DSC header with a dashed trace id; see snapshot below for the
        // normalized output.
        let dsc = r#"{
            "trace_id": "00000000-0000-0000-0000-000000000000",
            "public_key": "e12d836b15bb49d7bbf99e64295d995b",
            "sample_rate": "0.2"
        }"#;
        envelope.set_dsc(serde_json::from_str(dsc).unwrap());

        // Minimal empty event to carry the DSC through processing.
        let mut item = Item::new(ItemType::Event);
        item.set_payload(ContentType::Json, r#"{}"#);
        envelope.add_item(item);

        let outcome_aggregator = Addr::dummy();
        let managed_envelope = ManagedEnvelope::new(envelope, outcome_aggregator);

        // The project must list the DSC's public key.
        let mut project_info = ProjectInfo::default();
        project_info.public_keys.push(PublicKeyConfig {
            public_key: ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(),
            numeric_id: Some(1),
        });

        let config = serde_json::json!({
            "processing": {
                "enabled": true,
                "kafka_config": [],
            }
        });

        let message = ProcessEnvelopeGrouped {
            group: ProcessingGroup::Error,
            envelope: managed_envelope,
            ctx: processing::Context {
                config: &Config::from_json_value(config.clone()).unwrap(),
                project_info: &project_info,
                sampling_project_info: Some(&project_info),
                ..processing::Context::for_test()
            },
        };

        let processor = create_test_processor(Config::from_json_value(config).unwrap()).await;
        let Ok(Some((output, ctx))) = processor.process(message).await else {
            panic!();
        };
        let envelope = output.serialize_envelope(ctx).unwrap();
        let event = envelope
            .get_item_by(|item| item.ty() == &ItemType::Event)
            .unwrap();

        let event = Annotated::<Event>::from_json_bytes(&event.payload()).unwrap();
        insta::assert_debug_snapshot!(event.value().unwrap()._dsc, @r###"
        Object(
            {
                "environment": ~,
                "public_key": String(
                    "e12d836b15bb49d7bbf99e64295d995b",
                ),
                "release": ~,
                "replay_id": ~,
                "sample_rate": String(
                    "0.2",
                ),
                "trace_id": String(
                    "00000000000000000000000000000000",
                ),
                "transaction": ~,
            },
        )
        "###);
    }
2807
    /// Normalizes a transaction event with the given name and source while
    /// capturing the statsd metrics emitted by `log_transaction_name_metrics`.
    ///
    /// The normalization config contains a single replace-rule for
    /// `/foo/*/**`, letting callers distinguish rule-based renaming from
    /// pattern-based sanitization. Returns the captured statsd lines.
    fn capture_test_event(transaction_name: &str, source: TransactionSource) -> Vec<String> {
        let mut event = Annotated::<Event>::from_json(
            r#"
            {
                "type": "transaction",
                "transaction": "/foo/",
                "timestamp": 946684810.0,
                "start_timestamp": 946684800.0,
                "contexts": {
                    "trace": {
                        "trace_id": "4c79f60c11214eb38604f4ae0781bfb2",
                        "span_id": "fa90fdead5f74053",
                        "op": "http.server",
                        "type": "trace"
                    }
                },
                "transaction_info": {
                    "source": "url"
                }
            }
        "#,
        )
        .unwrap();
        // Override the fixture's transaction name and source with the caller's.
        let e = event.value_mut().as_mut().unwrap();
        e.transaction.set_value(Some(transaction_name.into()));

        e.transaction_info
            .value_mut()
            .as_mut()
            .unwrap()
            .source
            .set_value(Some(source));

        relay_statsd::with_capturing_test_client(|| {
            utils::log_transaction_name_metrics(&mut event, |event| {
                let config = NormalizationConfig {
                    transaction_name_config: TransactionNameConfig {
                        rules: &[TransactionNameRule {
                            pattern: LazyGlob::new("/foo/*/**".to_owned()),
                            expiry: DateTime::<Utc>::MAX_UTC,
                            redaction: RedactionRule::Replace {
                                substitution: "*".to_owned(),
                            },
                        }],
                    },
                    ..Default::default()
                };
                relay_event_normalization::normalize_event(event, &config)
            });
        })
    }
2859
2860 #[test]
2861 fn test_log_transaction_metrics_none() {
2862 let captures = capture_test_event("/nothing", TransactionSource::Url);
2863 insta::assert_debug_snapshot!(captures, @r###"
2864 [
2865 "event.transaction_name_changes:1|c|#source_in:url,changes:none,source_out:sanitized,is_404:false",
2866 ]
2867 "###);
2868 }
2869
2870 #[test]
2871 fn test_log_transaction_metrics_rule() {
2872 let captures = capture_test_event("/foo/john/denver", TransactionSource::Url);
2873 insta::assert_debug_snapshot!(captures, @r###"
2874 [
2875 "event.transaction_name_changes:1|c|#source_in:url,changes:rule,source_out:sanitized,is_404:false",
2876 ]
2877 "###);
2878 }
2879
2880 #[test]
2881 fn test_log_transaction_metrics_pattern() {
2882 let captures = capture_test_event("/something/12345", TransactionSource::Url);
2883 insta::assert_debug_snapshot!(captures, @r###"
2884 [
2885 "event.transaction_name_changes:1|c|#source_in:url,changes:pattern,source_out:sanitized,is_404:false",
2886 ]
2887 "###);
2888 }
2889
2890 #[test]
2891 fn test_log_transaction_metrics_both() {
2892 let captures = capture_test_event("/foo/john/12345", TransactionSource::Url);
2893 insta::assert_debug_snapshot!(captures, @r###"
2894 [
2895 "event.transaction_name_changes:1|c|#source_in:url,changes:both,source_out:sanitized,is_404:false",
2896 ]
2897 "###);
2898 }
2899
2900 #[test]
2901 fn test_log_transaction_metrics_no_match() {
2902 let captures = capture_test_event("/foo/john/12345", TransactionSource::Route);
2903 insta::assert_debug_snapshot!(captures, @r###"
2904 [
2905 "event.transaction_name_changes:1|c|#source_in:route,changes:none,source_out:route,is_404:false",
2906 ]
2907 "###);
2908 }
2909
    /// Bucket metadata `received_at` is stamped with the receive time for
    /// externally submitted metrics, and left as `None` for metrics from
    /// internal sources.
    #[tokio::test]
    async fn test_process_metrics_bucket_metadata() {
        let mut token = Cogs::noop().timed(ResourceId::Relay, AppFeature::Unattributed);
        let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
        let received_at = Utc::now();
        let config = Config::default();

        // Capture messages sent to the aggregator instead of aggregating.
        let (aggregator, mut aggregator_rx) = Addr::custom();
        let processor = create_test_processor_with_addrs(
            config,
            Addrs {
                aggregator,
                ..Default::default()
            },
        )
        .await;

        let mut item = Item::new(ItemType::Statsd);
        item.set_payload(ContentType::Text, "spans/foo:3182887624:4267882815|s");
        // External submissions expect a stamped `received_at`; internal ones
        // expect it to stay unset.
        for (source, expected_received_at) in [
            (
                BucketSource::External,
                Some(UnixTimestamp::from_datetime(received_at).unwrap()),
            ),
            (BucketSource::Internal, None),
        ] {
            let message = ProcessMetrics {
                data: MetricData::Raw(vec![item.clone()]),
                project_key,
                source,
                received_at,
                sent_at: Some(Utc::now()),
            };
            processor.handle_process_metrics(&mut token, message);

            let Aggregator::MergeBuckets(merge_buckets) = aggregator_rx.recv().await.unwrap();
            let buckets = merge_buckets.buckets;
            assert_eq!(buckets.len(), 1);
            assert_eq!(buckets[0].metadata.received_at, expected_received_at);
        }
    }
2951
    /// A batched metrics payload containing buckets for two project keys is
    /// split and forwarded to the aggregator as one merge message per project,
    /// with `received_at` left unset for the internal source.
    #[tokio::test]
    async fn test_process_batched_metrics() {
        let mut token = Cogs::noop().timed(ResourceId::Relay, AppFeature::Unattributed);
        let received_at = Utc::now();
        let config = Config::default();

        // Capture messages sent to the aggregator instead of aggregating.
        let (aggregator, mut aggregator_rx) = Addr::custom();
        let processor = create_test_processor_with_addrs(
            config,
            Addrs {
                aggregator,
                ..Default::default()
            },
        )
        .await;

        // Two projects with one distribution bucket each.
        let payload = r#"{
    "buckets": {
        "11111111111111111111111111111111": [
            {
                "timestamp": 1615889440,
                "width": 0,
                "name": "d:custom/endpoint.response_time@millisecond",
                "type": "d",
                "value": [
                    68.0
                ],
                "tags": {
                    "route": "user_index"
                }
            }
        ],
        "22222222222222222222222222222222": [
            {
                "timestamp": 1615889440,
                "width": 0,
                "name": "d:custom/endpoint.cache_rate@none",
                "type": "d",
                "value": [
                    36.0
                ]
            }
        ]
    }
}
"#;
        let message = ProcessBatchedMetrics {
            payload: Bytes::from(payload),
            source: BucketSource::Internal,
            received_at,
            sent_at: Some(Utc::now()),
        };
        processor.handle_process_batched_metrics(&mut token, message);

        // One merge message per project key.
        let Aggregator::MergeBuckets(mb1) = aggregator_rx.recv().await.unwrap();
        let Aggregator::MergeBuckets(mb2) = aggregator_rx.recv().await.unwrap();

        // Sort by project key for a deterministic snapshot order.
        let mut messages = vec![mb1, mb2];
        messages.sort_by_key(|mb| mb.project_key);

        let actual = messages
            .into_iter()
            .map(|mb| (mb.project_key, mb.buckets))
            .collect::<Vec<_>>();

        assert_debug_snapshot!(actual, @r###"
        [
            (
                ProjectKey("11111111111111111111111111111111"),
                [
                    Bucket {
                        timestamp: UnixTimestamp(1615889440),
                        width: 0,
                        name: MetricName(
                            "d:custom/endpoint.response_time@millisecond",
                        ),
                        value: Distribution(
                            [
                                68.0,
                            ],
                        ),
                        tags: {
                            "route": "user_index",
                        },
                        metadata: BucketMetadata {
                            merges: 1,
                            received_at: None,
                            extracted_from_indexed: false,
                        },
                    },
                ],
            ),
            (
                ProjectKey("22222222222222222222222222222222"),
                [
                    Bucket {
                        timestamp: UnixTimestamp(1615889440),
                        width: 0,
                        name: MetricName(
                            "d:custom/endpoint.cache_rate@none",
                        ),
                        value: Distribution(
                            [
                                36.0,
                            ],
                        ),
                        tags: {},
                        metadata: BucketMetadata {
                            merges: 1,
                            received_at: None,
                            extracted_from_indexed: false,
                        },
                    },
                ],
            ),
        ]
        "###);
    }
3070}