1use std::fmt::Debug;
2use std::mem::size_of;
3use std::time::Duration;
4
5use chrono::{DateTime, Utc};
6use relay_quotas::{DataCategory, Scoping};
7use relay_system::Addr;
8
9use crate::envelope::{Envelope, Item};
10use crate::extractors::RequestMeta;
11use crate::managed::Counted as _;
12use crate::services::outcome::{DiscardReason, Outcome, TrackOutcome};
13use crate::statsd::{RelayCounters, RelayTimers};
14use crate::utils::EnvelopeSummary;
15
16#[derive(Clone, Copy, Debug)]
18enum Handling {
19 Success,
24 Failure,
26}
27
28impl Handling {
29 fn from_outcome(outcome: &Outcome) -> Self {
30 if outcome.is_unexpected() {
31 Self::Failure
32 } else {
33 Self::Success
34 }
35 }
36
37 fn as_str(&self) -> &str {
38 match self {
39 Handling::Success => "success",
40 Handling::Failure => "failure",
41 }
42 }
43}
44
45#[derive(Debug, Clone)]
47pub enum ItemAction {
48 Keep,
50 Drop(Outcome),
52 DropSilently,
54}
55
56#[derive(Debug)]
57struct EnvelopeContext {
58 summary: EnvelopeSummary,
59 scoping: Scoping,
60 partition_key: Option<u32>,
61 done: bool,
62}
63
64#[derive(Debug)]
80pub struct ManagedEnvelope {
81 envelope: Box<Envelope>,
82 context: EnvelopeContext,
83 outcome_aggregator: Addr<TrackOutcome>,
84}
85
86impl ManagedEnvelope {
87 pub fn new(envelope: Box<Envelope>, outcome_aggregator: Addr<TrackOutcome>) -> Self {
91 let meta = &envelope.meta();
92 let summary = EnvelopeSummary::compute(envelope.as_ref());
93 let scoping = meta.get_partial_scoping().into_scoping();
94
95 Self {
96 envelope,
97 context: EnvelopeContext {
98 summary,
99 scoping,
100 partition_key: None,
101 done: false,
102 },
103 outcome_aggregator,
104 }
105 }
106
107 #[cfg(test)]
109 pub fn untracked(envelope: Box<Envelope>, outcome_aggregator: Addr<TrackOutcome>) -> Self {
110 let mut envelope = Self::new(envelope, outcome_aggregator);
111 envelope.context.done = true;
112 envelope
113 }
114
115 pub fn envelope(&self) -> &Envelope {
117 self.envelope.as_ref()
118 }
119
120 pub fn envelope_mut(&mut self) -> &mut Envelope {
122 self.envelope.as_mut()
123 }
124
125 pub fn into_envelope(mut self) -> Box<Envelope> {
127 self.context.done = true;
128 self.take_envelope()
129 }
130
131 pub(crate) fn take_envelope(&mut self) -> Box<Envelope> {
135 Box::new(self.envelope.take_items())
136 }
137
138 pub fn update(&mut self) -> &mut Self {
142 self.context.summary = EnvelopeSummary::compute(self.envelope());
143 self
144 }
145
146 pub fn retain_items<F>(&mut self, mut f: F)
151 where
152 F: FnMut(&mut Item) -> ItemAction,
153 {
154 let mut outcomes = Vec::new();
155 self.envelope.retain_items(|item| match f(item) {
156 ItemAction::Keep => true,
157 ItemAction::DropSilently => false,
158 ItemAction::Drop(outcome) => {
159 for (category, quantity) in item.quantities() {
160 outcomes.push((outcome.clone(), category, quantity));
161 }
162
163 false
164 }
165 });
166 for (outcome, category, quantity) in outcomes {
167 self.track_outcome(outcome, category, quantity);
168 }
169 }
171
172 pub fn drop_items_silently(&mut self) {
174 self.envelope.drop_items_silently();
175 }
176
177 pub fn scope(&mut self, scoping: Scoping) -> &mut Self {
179 self.context.scoping = scoping;
180 self
181 }
182
183 pub fn reject_event(&mut self, outcome: Outcome) {
187 if let Some(event_category) = self.event_category() {
188 self.envelope.retain_items(|item| !item.creates_event());
189 if let Some(indexed) = event_category.index_category() {
190 self.track_outcome(outcome.clone(), indexed, 1);
191 }
192 self.track_outcome(outcome, event_category, 1);
193 }
194 }
195
196 pub fn track_outcome(&self, outcome: Outcome, category: DataCategory, quantity: usize) {
201 self.outcome_aggregator.send(TrackOutcome {
202 timestamp: self.received_at(),
203 scoping: self.context.scoping,
204 outcome,
205 event_id: self.envelope.event_id(),
206 remote_addr: self.meta().remote_addr(),
207 category,
208 quantity: quantity as u32,
211 });
212 }
213
214 pub fn accept(mut self) {
220 if !self.context.done {
221 self.finish(RelayCounters::EnvelopeAccepted, Handling::Success);
222 }
223 }
224
225 fn event_category(&self) -> Option<DataCategory> {
227 self.context.summary.event_category
228 }
229
230 pub fn reject(&mut self, outcome: Outcome) {
234 if self.context.done {
235 return;
236 }
237
238 let handling = Handling::from_outcome(&outcome);
241 match handling {
242 Handling::Success => relay_log::debug!("dropped envelope: {outcome}"),
243 Handling::Failure => {
244 let summary = &self.context.summary;
245
246 relay_log::error!(
247 tags.project_key = self.scoping().project_key.to_string(),
248 tags.has_attachments = summary.attachment_quantities.bytes() > 0,
249 tags.has_sessions = summary.session_quantity > 0,
250 tags.has_profiles = summary.profile_quantity.total > 0,
251 tags.has_transactions = summary.secondary_transaction_quantity > 0,
252 tags.has_span_metrics = summary.secondary_span_quantity > 0,
253 tags.has_replays = summary.replay_quantity > 0,
254 tags.has_user_reports = summary.user_report_quantity > 0,
255 tags.has_trace_metrics = summary.trace_metric_quantity > 0,
256 tags.has_checkins = summary.monitor_quantity > 0,
257 tags.event_category = ?summary.event_category,
258 cached_summary = ?summary,
259 recomputed_summary = ?EnvelopeSummary::compute(self.envelope()),
260 "dropped envelope: {outcome}"
261 );
262 }
263 }
264
265 for (category, quantity) in self.context.summary.quantities() {
266 self.track_outcome(outcome.clone(), category, quantity);
267 }
268
269 self.finish(RelayCounters::EnvelopeRejected, handling);
270 }
271
272 pub fn scoping(&self) -> Scoping {
274 self.context.scoping
275 }
276
277 pub fn partition_key(&self) -> Option<u32> {
279 self.context.partition_key
280 }
281
282 pub fn set_partition_key(&mut self, partition_key: Option<u32>) -> &mut Self {
284 self.context.partition_key = partition_key;
285 self
286 }
287
288 pub fn meta(&self) -> &RequestMeta {
290 self.envelope().meta()
291 }
292
293 pub fn estimated_size(&self) -> usize {
302 (f64::ceil(
304 (self.context.summary.payload_size + size_of::<Self>() + size_of::<Envelope>()) as f64
305 / 1000.,
306 ) * 1000.) as usize
307 }
308
309 pub fn received_at(&self) -> DateTime<Utc> {
313 self.envelope.received_at()
314 }
315
316 pub fn age(&self) -> Duration {
320 self.envelope.age()
321 }
322
323 pub(super) fn outcome_aggregator(&self) -> &Addr<TrackOutcome> {
326 &self.outcome_aggregator
327 }
328
329 fn finish(&mut self, counter: RelayCounters, handling: Handling) {
331 relay_statsd::metric!(counter(counter) += 1, handling = handling.as_str());
332 relay_statsd::metric!(timer(RelayTimers::EnvelopeTotalTime) = self.age());
333
334 self.context.done = true;
335 }
336}
337
338impl Drop for ManagedEnvelope {
339 fn drop(&mut self) {
340 self.reject(Outcome::Invalid(DiscardReason::Internal));
341 }
342}
343
344#[cfg(test)]
345mod tests {
346 use super::*;
347 use bytes::Bytes;
348
349 #[test]
350 fn span_metrics_are_reported() {
351 let bytes =
352 Bytes::from(r#"{"dsn":"https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"}"#);
353 let envelope = Envelope::parse_bytes(bytes).unwrap();
354
355 let (outcome_aggregator, mut rx) = Addr::custom();
356 let mut env = ManagedEnvelope::new(envelope, outcome_aggregator);
357 env.context.summary.span_quantity = 123;
358 env.context.summary.secondary_span_quantity = 456;
359
360 env.reject(Outcome::Abuse);
361
362 rx.close();
363
364 let outcome = rx.blocking_recv().unwrap();
365 assert_eq!(outcome.category, DataCategory::Span);
366 assert_eq!(outcome.quantity, 123);
367 assert_eq!(outcome.outcome, Outcome::Abuse);
368
369 let outcome = rx.blocking_recv().unwrap();
370 assert_eq!(outcome.category, DataCategory::SpanIndexed);
371 assert_eq!(outcome.quantity, 123);
372 assert_eq!(outcome.outcome, Outcome::Abuse);
373
374 let outcome = rx.blocking_recv().unwrap();
375 assert_eq!(outcome.category, DataCategory::Span);
376 assert_eq!(outcome.quantity, 456);
377 assert_eq!(outcome.outcome, Outcome::Abuse);
378
379 assert!(rx.blocking_recv().is_none());
380 }
381}