1use std::fmt::{Debug, Display};
2use std::marker::PhantomData;
3use std::mem::size_of;
4use std::ops::{Deref, DerefMut};
5use std::time::Duration;
6
7use chrono::{DateTime, Utc};
8use relay_quotas::{DataCategory, Scoping};
9use relay_system::Addr;
10
11use crate::envelope::{Envelope, Item};
12use crate::extractors::RequestMeta;
13use crate::services::outcome::{DiscardReason, Outcome, TrackOutcome};
14use crate::services::processor::{Processed, ProcessingGroup};
15use crate::statsd::{RelayCounters, RelayTimers};
16use crate::utils::EnvelopeSummary;
17
/// Whether the handling of an envelope counts as a success or as a failure.
///
/// The value is attached as the `handling` tag to the envelope counters emitted when an
/// envelope is finished.
#[derive(Debug, Clone, Copy)]
enum Handling {
    /// The envelope was handled as expected; rendered as `"success"`.
    Success,
    /// The envelope was handled in an unexpected way; rendered as `"failure"`.
    Failure,
}
29
30impl Handling {
31 fn from_outcome(outcome: &Outcome) -> Self {
32 if outcome.is_unexpected() {
33 Self::Failure
34 } else {
35 Self::Success
36 }
37 }
38
39 fn as_str(&self) -> &str {
40 match self {
41 Handling::Success => "success",
42 Handling::Failure => "failure",
43 }
44 }
45}
46
47#[derive(Debug, Clone)]
49pub enum ItemAction {
50 Keep,
52 Drop(Outcome),
54 DropSilently,
56}
57
/// Bookkeeping state that a `ManagedEnvelope` keeps next to its envelope.
#[derive(Debug)]
struct EnvelopeContext {
    /// Cached summary of the envelope; recomputed by `ManagedEnvelope::update`.
    summary: EnvelopeSummary,
    /// Scoping attached to every outcome tracked for the envelope.
    scoping: Scoping,
    /// Optional partition key, stored and returned unchanged by the accessors.
    partition_key: Option<u32>,
    /// Set once the envelope has been finished or handed out, which turns later
    /// accept/reject calls (including the one made on drop) into no-ops.
    done: bool,
}
65
66#[derive(Debug)]
68pub struct InvalidProcessingGroupType(pub ManagedEnvelope, pub ProcessingGroup);
69
70impl Display for InvalidProcessingGroupType {
71 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
72 f.write_fmt(format_args!(
73 "failed to convert to the processing group {} based on the provided type",
74 self.1.variant()
75 ))
76 }
77}
78
79impl std::error::Error for InvalidProcessingGroupType {}
80
81pub struct TypedEnvelope<G>(ManagedEnvelope, PhantomData<G>);
83
84impl<G> TypedEnvelope<G> {
85 pub fn into_processed(self) -> TypedEnvelope<Processed> {
89 TypedEnvelope::new(self.0)
90 }
91
92 pub fn accept(self) {
98 self.0.accept()
99 }
100
101 fn new(managed_envelope: ManagedEnvelope) -> Self {
106 Self(managed_envelope, Default::default())
107 }
108}
109
110impl<G: TryFrom<ProcessingGroup>> TryFrom<(ManagedEnvelope, ProcessingGroup)> for TypedEnvelope<G> {
111 type Error = InvalidProcessingGroupType;
112 fn try_from(
113 (envelope, group): (ManagedEnvelope, ProcessingGroup),
114 ) -> Result<Self, Self::Error> {
115 match <ProcessingGroup as TryInto<G>>::try_into(group) {
116 Ok(_) => Ok(TypedEnvelope::new(envelope)),
117 Err(_) => Err(InvalidProcessingGroupType(envelope, group)),
118 }
119 }
120}
121
122impl<G> Debug for TypedEnvelope<G> {
123 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
124 f.debug_tuple("TypedEnvelope").field(&self.0).finish()
125 }
126}
127
128impl<G> Deref for TypedEnvelope<G> {
129 type Target = ManagedEnvelope;
130
131 fn deref(&self) -> &Self::Target {
132 &self.0
133 }
134}
135
136impl<G> DerefMut for TypedEnvelope<G> {
137 fn deref_mut(&mut self) -> &mut Self::Target {
138 &mut self.0
139 }
140}
141
142#[derive(Debug)]
158pub struct ManagedEnvelope {
159 envelope: Box<Envelope>,
160 context: EnvelopeContext,
161 outcome_aggregator: Addr<TrackOutcome>,
162}
163
164impl ManagedEnvelope {
165 pub fn new(envelope: Box<Envelope>, outcome_aggregator: Addr<TrackOutcome>) -> Self {
169 let meta = &envelope.meta();
170 let summary = EnvelopeSummary::compute(envelope.as_ref());
171 let scoping = meta.get_partial_scoping();
172
173 Self {
174 envelope,
175 context: EnvelopeContext {
176 summary,
177 scoping,
178 partition_key: None,
179 done: false,
180 },
181 outcome_aggregator,
182 }
183 }
184
185 #[cfg(test)]
187 pub fn untracked(envelope: Box<Envelope>, outcome_aggregator: Addr<TrackOutcome>) -> Self {
188 let mut envelope = Self::new(envelope, outcome_aggregator);
189 envelope.context.done = true;
190 envelope
191 }
192
193 pub fn envelope(&self) -> &Envelope {
195 self.envelope.as_ref()
196 }
197
198 pub fn envelope_mut(&mut self) -> &mut Envelope {
200 self.envelope.as_mut()
201 }
202
203 pub fn into_envelope(mut self) -> Box<Envelope> {
205 self.context.done = true;
206 self.take_envelope()
207 }
208
209 pub fn into_processed(self) -> TypedEnvelope<Processed> {
213 TypedEnvelope::new(self)
214 }
215
216 pub(crate) fn take_envelope(&mut self) -> Box<Envelope> {
220 Box::new(self.envelope.take_items())
221 }
222
223 pub fn update(&mut self) -> &mut Self {
227 self.context.summary = EnvelopeSummary::compute(self.envelope());
228 self
229 }
230
231 pub fn retain_items<F>(&mut self, mut f: F)
236 where
237 F: FnMut(&mut Item) -> ItemAction,
238 {
239 let mut outcomes = Vec::new();
240 self.envelope.retain_items(|item| match f(item) {
241 ItemAction::Keep => true,
242 ItemAction::DropSilently => false,
243 ItemAction::Drop(outcome) => {
244 for (category, quantity) in item.quantities() {
245 if let Some(indexed) = category.index_category() {
246 outcomes.push((outcome.clone(), indexed, quantity));
247 };
248 outcomes.push((outcome.clone(), category, quantity));
249 }
250
251 false
252 }
253 });
254 for (outcome, category, quantity) in outcomes {
255 self.track_outcome(outcome, category, quantity);
256 }
257 }
259
260 pub fn drop_items_silently(&mut self) {
262 self.envelope.drop_items_silently();
263 }
264
265 pub fn scope(&mut self, scoping: Scoping) -> &mut Self {
267 self.context.scoping = scoping;
268 self
269 }
270
271 pub fn reject_event(&mut self, outcome: Outcome) {
275 if let Some(event_category) = self.event_category() {
276 self.envelope.retain_items(|item| !item.creates_event());
277 if let Some(indexed) = event_category.index_category() {
278 self.track_outcome(outcome.clone(), indexed, 1);
279 }
280 self.track_outcome(outcome, event_category, 1);
281 }
282 }
283
284 pub fn track_outcome(&self, outcome: Outcome, category: DataCategory, quantity: usize) {
289 self.outcome_aggregator.send(TrackOutcome {
290 timestamp: self.received_at(),
291 scoping: self.context.scoping,
292 outcome,
293 event_id: self.envelope.event_id(),
294 remote_addr: self.meta().remote_addr(),
295 category,
296 quantity: quantity as u32,
299 });
300 }
301
302 pub fn accept(mut self) {
308 if !self.context.done {
309 self.finish(RelayCounters::EnvelopeAccepted, Handling::Success);
310 }
311 }
312
313 fn event_category(&self) -> Option<DataCategory> {
315 self.context.summary.event_category
316 }
317
318 pub fn reject(&mut self, outcome: Outcome) {
322 if self.context.done {
323 return;
324 }
325
326 let handling = Handling::from_outcome(&outcome);
329 match handling {
330 Handling::Success => relay_log::debug!("dropped envelope: {outcome}"),
331 Handling::Failure => {
332 let summary = &self.context.summary;
333
334 relay_log::error!(
335 tags.project_key = self.scoping().project_key.to_string(),
336 tags.has_attachments = summary.attachment_quantity > 0,
337 tags.has_sessions = summary.session_quantity > 0,
338 tags.has_profiles = summary.profile_quantity > 0,
339 tags.has_transactions = summary.secondary_transaction_quantity > 0,
340 tags.has_span_metrics = summary.secondary_span_quantity > 0,
341 tags.has_replays = summary.replay_quantity > 0,
342 tags.has_user_reports = summary.user_report_quantity > 0,
343 tags.has_checkins = summary.monitor_quantity > 0,
344 tags.event_category = ?summary.event_category,
345 cached_summary = ?summary,
346 recomputed_summary = ?EnvelopeSummary::compute(self.envelope()),
347 "dropped envelope: {outcome}"
348 );
349 }
350 }
351
352 if let Some(category) = self.event_category() {
353 if let Some(category) = category.index_category() {
354 self.track_outcome(outcome.clone(), category, 1);
355 }
356 self.track_outcome(outcome.clone(), category, 1);
357 }
358
359 if self.context.summary.attachment_quantity > 0 {
360 self.track_outcome(
361 outcome.clone(),
362 DataCategory::Attachment,
363 self.context.summary.attachment_quantity,
364 );
365 }
366
367 if self.context.summary.monitor_quantity > 0 {
368 self.track_outcome(
369 outcome.clone(),
370 DataCategory::Monitor,
371 self.context.summary.monitor_quantity,
372 );
373 }
374
375 if self.context.summary.profile_quantity > 0 {
376 self.track_outcome(
377 outcome.clone(),
378 DataCategory::Profile,
379 self.context.summary.profile_quantity,
380 );
381 self.track_outcome(
382 outcome.clone(),
383 DataCategory::ProfileIndexed,
384 self.context.summary.profile_quantity,
385 );
386 }
387
388 if self.context.summary.span_quantity > 0 {
389 self.track_outcome(
390 outcome.clone(),
391 DataCategory::Span,
392 self.context.summary.span_quantity,
393 );
394 self.track_outcome(
395 outcome.clone(),
396 DataCategory::SpanIndexed,
397 self.context.summary.span_quantity,
398 );
399 }
400
401 if self.context.summary.log_item_quantity > 0 {
402 self.track_outcome(
403 outcome.clone(),
404 DataCategory::LogItem,
405 self.context.summary.log_item_quantity,
406 );
407 }
408 if self.context.summary.log_byte_quantity > 0 {
409 self.track_outcome(
410 outcome.clone(),
411 DataCategory::LogByte,
412 self.context.summary.log_byte_quantity,
413 );
414 }
415
416 if self.context.summary.secondary_transaction_quantity > 0 {
421 self.track_outcome(
422 outcome.clone(),
423 DataCategory::Transaction,
425 self.context.summary.secondary_transaction_quantity,
426 );
427 }
428
429 if self.context.summary.secondary_span_quantity > 0 {
433 self.track_outcome(
434 outcome.clone(),
435 DataCategory::Span,
437 self.context.summary.secondary_span_quantity,
438 );
439 }
440
441 if self.context.summary.replay_quantity > 0 {
442 self.track_outcome(
443 outcome.clone(),
444 DataCategory::Replay,
445 self.context.summary.replay_quantity,
446 );
447 }
448
449 if self.context.summary.user_report_quantity > 0 {
453 self.track_outcome(
454 outcome.clone(),
455 DataCategory::UserReportV2,
456 self.context.summary.user_report_quantity,
457 );
458 }
459
460 if self.context.summary.profile_chunk_quantity > 0 {
461 self.track_outcome(
462 outcome.clone(),
463 DataCategory::ProfileChunk,
464 self.context.summary.profile_chunk_quantity,
465 );
466 }
467
468 if self.context.summary.profile_chunk_ui_quantity > 0 {
469 self.track_outcome(
470 outcome.clone(),
471 DataCategory::ProfileChunkUi,
472 self.context.summary.profile_chunk_ui_quantity,
473 );
474 }
475
476 if self.context.summary.session_quantity > 0 {
477 self.track_outcome(
478 outcome.clone(),
479 DataCategory::Session,
480 self.context.summary.session_quantity,
481 );
482 }
483
484 self.finish(RelayCounters::EnvelopeRejected, handling);
485 }
486
487 pub fn scoping(&self) -> Scoping {
489 self.context.scoping
490 }
491
492 pub fn partition_key(&self) -> Option<u32> {
494 self.context.partition_key
495 }
496
497 pub fn set_partition_key(&mut self, partition_key: Option<u32>) -> &mut Self {
499 self.context.partition_key = partition_key;
500 self
501 }
502
503 pub fn meta(&self) -> &RequestMeta {
505 self.envelope().meta()
506 }
507
508 pub fn estimated_size(&self) -> usize {
517 (f64::ceil(
519 (self.context.summary.payload_size + size_of::<Self>() + size_of::<Envelope>()) as f64
520 / 1000.,
521 ) * 1000.) as usize
522 }
523
524 pub fn received_at(&self) -> DateTime<Utc> {
528 self.envelope.received_at()
529 }
530
531 pub fn age(&self) -> Duration {
535 self.envelope.age()
536 }
537
538 pub(super) fn outcome_aggregator(&self) -> &Addr<TrackOutcome> {
541 &self.outcome_aggregator
542 }
543
544 fn finish(&mut self, counter: RelayCounters, handling: Handling) {
546 relay_statsd::metric!(counter(counter) += 1, handling = handling.as_str());
547 relay_statsd::metric!(timer(RelayTimers::EnvelopeTotalTime) = self.age());
548
549 self.context.done = true;
550 }
551}
552
553impl Drop for ManagedEnvelope {
554 fn drop(&mut self) {
555 self.reject(Outcome::Invalid(DiscardReason::Internal));
556 }
557}
558
559impl<G> From<TypedEnvelope<G>> for ManagedEnvelope {
560 fn from(value: TypedEnvelope<G>) -> Self {
561 value.0
562 }
563}
564
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;

    /// Rejecting an envelope reports span outcomes for both the primary span quantity
    /// (plain and indexed) and the secondary span quantity (plain only), in that order.
    #[test]
    fn span_metrics_are_reported() {
        let raw =
            Bytes::from(r#"{"dsn":"https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"}"#);
        let envelope = Envelope::parse_bytes(raw).unwrap();

        let (outcome_aggregator, mut rx) = Addr::custom();
        let mut env = ManagedEnvelope::new(envelope, outcome_aggregator);
        env.context.summary.span_quantity = 123;
        env.context.summary.secondary_span_quantity = 456;

        env.reject(Outcome::Abuse);

        // Close the receiver so that it yields `None` once the queued messages are drained.
        rx.close();

        let expected = [
            (DataCategory::Span, 123),
            (DataCategory::SpanIndexed, 123),
            (DataCategory::Span, 456),
        ];

        for (category, quantity) in expected {
            let tracked = rx.blocking_recv().unwrap();
            assert_eq!(tracked.category, category);
            assert_eq!(tracked.quantity, quantity);
            assert_eq!(tracked.outcome, Outcome::Abuse);
        }

        assert!(rx.blocking_recv().is_none());
    }
}