1use std::fmt::{Debug, Display};
2use std::marker::PhantomData;
3use std::mem::size_of;
4use std::ops::{Deref, DerefMut};
5use std::time::Duration;
6
7use chrono::{DateTime, Utc};
8use relay_quotas::{DataCategory, Scoping};
9use relay_system::Addr;
10
11use crate::envelope::{Envelope, Item};
12use crate::extractors::RequestMeta;
13use crate::managed::Counted as _;
14use crate::services::outcome::{DiscardReason, Outcome, TrackOutcome};
15use crate::services::processor::{Processed, ProcessingGroup};
16use crate::statsd::{RelayCounters, RelayTimers};
17use crate::utils::EnvelopeSummary;
18
19#[derive(Clone, Copy, Debug)]
21enum Handling {
22 Success,
27 Failure,
29}
30
31impl Handling {
32 fn from_outcome(outcome: &Outcome) -> Self {
33 if outcome.is_unexpected() {
34 Self::Failure
35 } else {
36 Self::Success
37 }
38 }
39
40 fn as_str(&self) -> &str {
41 match self {
42 Handling::Success => "success",
43 Handling::Failure => "failure",
44 }
45 }
46}
47
48#[derive(Debug, Clone)]
50pub enum ItemAction {
51 Keep,
53 Drop(Outcome),
55 DropSilently,
57}
58
59#[derive(Debug)]
60struct EnvelopeContext {
61 summary: EnvelopeSummary,
62 scoping: Scoping,
63 partition_key: Option<u32>,
64 done: bool,
65}
66
67#[derive(Debug)]
69pub struct InvalidProcessingGroupType(pub ManagedEnvelope, pub ProcessingGroup);
70
71impl Display for InvalidProcessingGroupType {
72 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
73 f.write_fmt(format_args!(
74 "failed to convert to the processing group {} based on the provided type",
75 self.1.variant()
76 ))
77 }
78}
79
80impl std::error::Error for InvalidProcessingGroupType {}
81
82pub struct TypedEnvelope<G>(ManagedEnvelope, PhantomData<G>);
84
85impl<G> TypedEnvelope<G> {
86 pub fn into_processed(self) -> TypedEnvelope<Processed> {
90 TypedEnvelope::new(self.0)
91 }
92
93 pub fn accept(self) {
99 self.0.accept()
100 }
101
102 pub fn into_inner(self) -> ManagedEnvelope {
104 self.0
105 }
106
107 fn new(managed_envelope: ManagedEnvelope) -> Self {
112 Self(managed_envelope, Default::default())
113 }
114}
115
116impl<G: TryFrom<ProcessingGroup>> TryFrom<(ManagedEnvelope, ProcessingGroup)> for TypedEnvelope<G> {
117 type Error = InvalidProcessingGroupType;
118 fn try_from(
119 (envelope, group): (ManagedEnvelope, ProcessingGroup),
120 ) -> Result<Self, Self::Error> {
121 match <ProcessingGroup as TryInto<G>>::try_into(group) {
122 Ok(_) => Ok(TypedEnvelope::new(envelope)),
123 Err(_) => Err(InvalidProcessingGroupType(envelope, group)),
124 }
125 }
126}
127
128impl<G> Debug for TypedEnvelope<G> {
129 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
130 f.debug_tuple("TypedEnvelope").field(&self.0).finish()
131 }
132}
133
134impl<G> Deref for TypedEnvelope<G> {
135 type Target = ManagedEnvelope;
136
137 fn deref(&self) -> &Self::Target {
138 &self.0
139 }
140}
141
142impl<G> DerefMut for TypedEnvelope<G> {
143 fn deref_mut(&mut self) -> &mut Self::Target {
144 &mut self.0
145 }
146}
147
148#[derive(Debug)]
164pub struct ManagedEnvelope {
165 envelope: Box<Envelope>,
166 context: EnvelopeContext,
167 outcome_aggregator: Addr<TrackOutcome>,
168}
169
170impl ManagedEnvelope {
171 pub fn new(envelope: Box<Envelope>, outcome_aggregator: Addr<TrackOutcome>) -> Self {
175 let meta = &envelope.meta();
176 let summary = EnvelopeSummary::compute(envelope.as_ref());
177 let scoping = meta.get_partial_scoping().into_scoping();
178
179 Self {
180 envelope,
181 context: EnvelopeContext {
182 summary,
183 scoping,
184 partition_key: None,
185 done: false,
186 },
187 outcome_aggregator,
188 }
189 }
190
191 #[cfg(test)]
193 pub fn untracked(envelope: Box<Envelope>, outcome_aggregator: Addr<TrackOutcome>) -> Self {
194 let mut envelope = Self::new(envelope, outcome_aggregator);
195 envelope.context.done = true;
196 envelope
197 }
198
199 pub fn envelope(&self) -> &Envelope {
201 self.envelope.as_ref()
202 }
203
204 pub fn envelope_mut(&mut self) -> &mut Envelope {
206 self.envelope.as_mut()
207 }
208
209 pub fn into_envelope(mut self) -> Box<Envelope> {
211 self.context.done = true;
212 self.take_envelope()
213 }
214
215 pub fn into_processed(self) -> TypedEnvelope<Processed> {
219 TypedEnvelope::new(self)
220 }
221
222 pub(crate) fn take_envelope(&mut self) -> Box<Envelope> {
226 Box::new(self.envelope.take_items())
227 }
228
229 pub fn update(&mut self) -> &mut Self {
233 self.context.summary = EnvelopeSummary::compute(self.envelope());
234 self
235 }
236
237 pub fn retain_items<F>(&mut self, mut f: F)
242 where
243 F: FnMut(&mut Item) -> ItemAction,
244 {
245 let mut outcomes = Vec::new();
246 self.envelope.retain_items(|item| match f(item) {
247 ItemAction::Keep => true,
248 ItemAction::DropSilently => false,
249 ItemAction::Drop(outcome) => {
250 for (category, quantity) in item.quantities() {
251 outcomes.push((outcome.clone(), category, quantity));
252 }
253
254 false
255 }
256 });
257 for (outcome, category, quantity) in outcomes {
258 self.track_outcome(outcome, category, quantity);
259 }
260 }
262
263 pub fn drop_items_silently(&mut self) {
265 self.envelope.drop_items_silently();
266 }
267
268 pub fn scope(&mut self, scoping: Scoping) -> &mut Self {
270 self.context.scoping = scoping;
271 self
272 }
273
274 pub fn reject_event(&mut self, outcome: Outcome) {
278 if let Some(event_category) = self.event_category() {
279 self.envelope.retain_items(|item| !item.creates_event());
280 if let Some(indexed) = event_category.index_category() {
281 self.track_outcome(outcome.clone(), indexed, 1);
282 }
283 self.track_outcome(outcome, event_category, 1);
284 }
285 }
286
287 pub fn track_outcome(&self, outcome: Outcome, category: DataCategory, quantity: usize) {
292 self.outcome_aggregator.send(TrackOutcome {
293 timestamp: self.received_at(),
294 scoping: self.context.scoping,
295 outcome,
296 event_id: self.envelope.event_id(),
297 remote_addr: self.meta().remote_addr(),
298 category,
299 quantity: quantity as u32,
302 });
303 }
304
305 pub fn accept(mut self) {
311 if !self.context.done {
312 self.finish(RelayCounters::EnvelopeAccepted, Handling::Success);
313 }
314 }
315
316 fn event_category(&self) -> Option<DataCategory> {
318 self.context.summary.event_category
319 }
320
321 pub fn reject(&mut self, outcome: Outcome) {
325 if self.context.done {
326 return;
327 }
328
329 let handling = Handling::from_outcome(&outcome);
332 match handling {
333 Handling::Success => relay_log::debug!("dropped envelope: {outcome}"),
334 Handling::Failure => {
335 let summary = &self.context.summary;
336
337 relay_log::error!(
338 tags.project_key = self.scoping().project_key.to_string(),
339 tags.has_attachments = summary.attachment_quantities.bytes() > 0,
340 tags.has_sessions = summary.session_quantity > 0,
341 tags.has_profiles = summary.profile_quantity.total > 0,
342 tags.has_transactions = summary.secondary_transaction_quantity > 0,
343 tags.has_span_metrics = summary.secondary_span_quantity > 0,
344 tags.has_replays = summary.replay_quantity > 0,
345 tags.has_user_reports = summary.user_report_quantity > 0,
346 tags.has_trace_metrics = summary.trace_metric_quantity > 0,
347 tags.has_checkins = summary.monitor_quantity > 0,
348 tags.event_category = ?summary.event_category,
349 cached_summary = ?summary,
350 recomputed_summary = ?EnvelopeSummary::compute(self.envelope()),
351 "dropped envelope: {outcome}"
352 );
353 }
354 }
355
356 for (category, quantity) in self.context.summary.quantities() {
357 self.track_outcome(outcome.clone(), category, quantity);
358 }
359
360 self.finish(RelayCounters::EnvelopeRejected, handling);
361 }
362
363 pub fn scoping(&self) -> Scoping {
365 self.context.scoping
366 }
367
368 pub fn partition_key(&self) -> Option<u32> {
370 self.context.partition_key
371 }
372
373 pub fn set_partition_key(&mut self, partition_key: Option<u32>) -> &mut Self {
375 self.context.partition_key = partition_key;
376 self
377 }
378
379 pub fn meta(&self) -> &RequestMeta {
381 self.envelope().meta()
382 }
383
384 pub fn estimated_size(&self) -> usize {
393 (f64::ceil(
395 (self.context.summary.payload_size + size_of::<Self>() + size_of::<Envelope>()) as f64
396 / 1000.,
397 ) * 1000.) as usize
398 }
399
400 pub fn received_at(&self) -> DateTime<Utc> {
404 self.envelope.received_at()
405 }
406
407 pub fn age(&self) -> Duration {
411 self.envelope.age()
412 }
413
414 pub(super) fn outcome_aggregator(&self) -> &Addr<TrackOutcome> {
417 &self.outcome_aggregator
418 }
419
420 fn finish(&mut self, counter: RelayCounters, handling: Handling) {
422 relay_statsd::metric!(counter(counter) += 1, handling = handling.as_str());
423 relay_statsd::metric!(timer(RelayTimers::EnvelopeTotalTime) = self.age());
424
425 self.context.done = true;
426 }
427}
428
429impl Drop for ManagedEnvelope {
430 fn drop(&mut self) {
431 self.reject(Outcome::Invalid(DiscardReason::Internal));
432 }
433}
434
435impl<G> From<TypedEnvelope<G>> for ManagedEnvelope {
436 fn from(value: TypedEnvelope<G>) -> Self {
437 value.0
438 }
439}
440
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;

    /// Rejecting an envelope tracks the span, indexed span and secondary span quantities of
    /// the cached summary, in that order, and nothing else.
    #[test]
    fn span_metrics_are_reported() {
        let raw =
            Bytes::from(r#"{"dsn":"https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"}"#);
        let parsed = Envelope::parse_bytes(raw).unwrap();

        let (outcome_aggregator, mut rx) = Addr::custom();
        let mut managed = ManagedEnvelope::new(parsed, outcome_aggregator);
        managed.context.summary.span_quantity = 123;
        managed.context.summary.secondary_span_quantity = 456;

        managed.reject(Outcome::Abuse);

        rx.close();

        let expected = [
            (DataCategory::Span, 123_u32),
            (DataCategory::SpanIndexed, 123_u32),
            (DataCategory::Span, 456_u32),
        ];
        for (category, quantity) in expected {
            let tracked = rx.blocking_recv().unwrap();
            assert_eq!(tracked.category, category);
            assert_eq!(tracked.quantity, quantity);
            assert_eq!(tracked.outcome, Outcome::Abuse);
        }

        assert!(rx.blocking_recv().is_none());
    }
}