relay_server/managed/
envelope.rs

1use std::fmt::{Debug, Display};
2use std::marker::PhantomData;
3use std::mem::size_of;
4use std::ops::{Deref, DerefMut};
5use std::time::Duration;
6
7use chrono::{DateTime, Utc};
8use relay_quotas::{DataCategory, Scoping};
9use relay_system::Addr;
10
11use crate::envelope::{Envelope, Item};
12use crate::extractors::RequestMeta;
13use crate::services::outcome::{DiscardReason, Outcome, TrackOutcome};
14use crate::services::processor::{Processed, ProcessingGroup};
15use crate::statsd::{RelayCounters, RelayTimers};
16use crate::utils::EnvelopeSummary;
17
18/// Denotes the success of handling an envelope.
19#[derive(Clone, Copy, Debug)]
20enum Handling {
21    /// The envelope was handled successfully.
22    ///
23    /// This can be the case even if the envelpoe was dropped. For example, if a rate limit is in
24    /// effect or if the corresponding project is disabled.
25    Success,
26    /// Handling the envelope failed due to an error or bug.
27    Failure,
28}
29
30impl Handling {
31    fn from_outcome(outcome: &Outcome) -> Self {
32        if outcome.is_unexpected() {
33            Self::Failure
34        } else {
35            Self::Success
36        }
37    }
38
39    fn as_str(&self) -> &str {
40        match self {
41            Handling::Success => "success",
42            Handling::Failure => "failure",
43        }
44    }
45}
46
47/// Represents the decision on whether or not to keep an envelope item.
48#[derive(Debug, Clone)]
49pub enum ItemAction {
50    /// Keep the item.
51    Keep,
52    /// Drop the item and log an outcome for it.
53    Drop(Outcome),
54    /// Drop the item without logging an outcome.
55    DropSilently,
56}
57
58#[derive(Debug)]
59struct EnvelopeContext {
60    summary: EnvelopeSummary,
61    scoping: Scoping,
62    partition_key: Option<u32>,
63    done: bool,
64}
65
66/// Error emitted when converting a [`ManagedEnvelope`] and a processing group into a [`TypedEnvelope`].
67#[derive(Debug)]
68pub struct InvalidProcessingGroupType(pub ManagedEnvelope, pub ProcessingGroup);
69
70impl Display for InvalidProcessingGroupType {
71    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
72        f.write_fmt(format_args!(
73            "failed to convert to the processing group {} based on the provided type",
74            self.1.variant()
75        ))
76    }
77}
78
79impl std::error::Error for InvalidProcessingGroupType {}
80
81/// A wrapper for [`ManagedEnvelope`] with assigned processing group type.
82pub struct TypedEnvelope<G>(ManagedEnvelope, PhantomData<G>);
83
84impl<G> TypedEnvelope<G> {
85    /// Changes the typed of the current envelope to processed.
86    ///
87    /// Once it's marked processed it can be submitted to upstream.
88    pub fn into_processed(self) -> TypedEnvelope<Processed> {
89        TypedEnvelope::new(self.0)
90    }
91
92    /// Accepts the envelope and drops the internal managed envelope with its context.
93    ///
94    /// This should be called if the envelope has been accepted by the upstream, which means that
95    /// the responsibility for logging outcomes has been moved. This function will not log any
96    /// outcomes.
97    pub fn accept(self) {
98        self.0.accept()
99    }
100
101    /// Creates a new typed envelope.
102    ///
103    /// Note: this method is private to make sure that only `TryFrom` implementation is used, which
104    /// requires the check for the error if conversion is failing.
105    fn new(managed_envelope: ManagedEnvelope) -> Self {
106        Self(managed_envelope, Default::default())
107    }
108}
109
110impl<G: TryFrom<ProcessingGroup>> TryFrom<(ManagedEnvelope, ProcessingGroup)> for TypedEnvelope<G> {
111    type Error = InvalidProcessingGroupType;
112    fn try_from(
113        (envelope, group): (ManagedEnvelope, ProcessingGroup),
114    ) -> Result<Self, Self::Error> {
115        match <ProcessingGroup as TryInto<G>>::try_into(group) {
116            Ok(_) => Ok(TypedEnvelope::new(envelope)),
117            Err(_) => Err(InvalidProcessingGroupType(envelope, group)),
118        }
119    }
120}
121
122impl<G> Debug for TypedEnvelope<G> {
123    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
124        f.debug_tuple("TypedEnvelope").field(&self.0).finish()
125    }
126}
127
128impl<G> Deref for TypedEnvelope<G> {
129    type Target = ManagedEnvelope;
130
131    fn deref(&self) -> &Self::Target {
132        &self.0
133    }
134}
135
136impl<G> DerefMut for TypedEnvelope<G> {
137    fn deref_mut(&mut self) -> &mut Self::Target {
138        &mut self.0
139    }
140}
141
142/// Tracks the lifetime of an [`Envelope`] in Relay.
143///
144/// The managed envelope accompanies envelopes through the processing pipeline in Relay and ensures
145/// that outcomes are recorded when the Envelope is dropped. They can be dropped in one of three
146/// ways:
147///
148///  - By calling [`accept`](Self::accept). Responsibility of the envelope has been transferred to
149///    another service, and no further outcomes need to be recorded.
150///  - By calling [`reject`](Self::reject). The entire envelope was dropped, and the outcome
151///    specifies the reason.
152///  - By dropping the managed envelope. This indicates an issue or a bug and raises the
153///    `"internal"` outcome. There should be additional error handling to report an error to Sentry.
154///
155/// The managed envelope also holds a processing queue permit which is used for backpressure
156/// management. It is automatically reclaimed when the context is dropped along with the envelope.
157#[derive(Debug)]
158pub struct ManagedEnvelope {
159    envelope: Box<Envelope>,
160    context: EnvelopeContext,
161    outcome_aggregator: Addr<TrackOutcome>,
162}
163
164impl ManagedEnvelope {
165    /// Computes a managed envelope from the given envelope and binds it to the processing queue.
166    ///
167    /// To provide additional scoping, use [`ManagedEnvelope::scope`].
168    pub fn new(envelope: Box<Envelope>, outcome_aggregator: Addr<TrackOutcome>) -> Self {
169        let meta = &envelope.meta();
170        let summary = EnvelopeSummary::compute(envelope.as_ref());
171        let scoping = meta.get_partial_scoping().into_scoping();
172
173        Self {
174            envelope,
175            context: EnvelopeContext {
176                summary,
177                scoping,
178                partition_key: None,
179                done: false,
180            },
181            outcome_aggregator,
182        }
183    }
184
185    /// An untracked instance which does not emit outcomes, useful for testing.
186    #[cfg(test)]
187    pub fn untracked(envelope: Box<Envelope>, outcome_aggregator: Addr<TrackOutcome>) -> Self {
188        let mut envelope = Self::new(envelope, outcome_aggregator);
189        envelope.context.done = true;
190        envelope
191    }
192
193    /// Returns a reference to the contained [`Envelope`].
194    pub fn envelope(&self) -> &Envelope {
195        self.envelope.as_ref()
196    }
197
198    /// Returns a mutable reference to the contained [`Envelope`].
199    pub fn envelope_mut(&mut self) -> &mut Envelope {
200        self.envelope.as_mut()
201    }
202
203    /// Consumes itself returning the managed envelope.
204    pub fn into_envelope(mut self) -> Box<Envelope> {
205        self.context.done = true;
206        self.take_envelope()
207    }
208
209    /// Converts current managed envelope into processed envelope.
210    ///
211    /// Once it's marked processed it can be submitted to upstream.
212    pub fn into_processed(self) -> TypedEnvelope<Processed> {
213        TypedEnvelope::new(self)
214    }
215
216    /// Take the envelope out of the context and replace it with a dummy.
217    ///
218    /// Note that after taking out the envelope, the envelope summary is incorrect.
219    pub(crate) fn take_envelope(&mut self) -> Box<Envelope> {
220        Box::new(self.envelope.take_items())
221    }
222
223    /// Update the context with envelope information.
224    ///
225    /// This updates the item summary as well as the event id.
226    pub fn update(&mut self) -> &mut Self {
227        self.context.summary = EnvelopeSummary::compute(self.envelope());
228        self
229    }
230
231    /// Retains or drops items based on the [`ItemAction`].
232    ///
233    ///
234    /// This method operates in place and preserves the order of the retained items.
235    pub fn retain_items<F>(&mut self, mut f: F)
236    where
237        F: FnMut(&mut Item) -> ItemAction,
238    {
239        let mut outcomes = Vec::new();
240        self.envelope.retain_items(|item| match f(item) {
241            ItemAction::Keep => true,
242            ItemAction::DropSilently => false,
243            ItemAction::Drop(outcome) => {
244                for (category, quantity) in item.quantities() {
245                    outcomes.push((outcome.clone(), category, quantity));
246                }
247
248                false
249            }
250        });
251        for (outcome, category, quantity) in outcomes {
252            self.track_outcome(outcome, category, quantity);
253        }
254        // TODO: once `update` is private, it should be called here.
255    }
256
257    /// Drops every item in the envelope.
258    pub fn drop_items_silently(&mut self) {
259        self.envelope.drop_items_silently();
260    }
261
262    /// Re-scopes this context to the given scoping.
263    pub fn scope(&mut self, scoping: Scoping) -> &mut Self {
264        self.context.scoping = scoping;
265        self
266    }
267
268    /// Removes event item(s) and logs an outcome.
269    ///
270    /// Note: This function relies on the envelope summary being correct.
271    pub fn reject_event(&mut self, outcome: Outcome) {
272        if let Some(event_category) = self.event_category() {
273            self.envelope.retain_items(|item| !item.creates_event());
274            if let Some(indexed) = event_category.index_category() {
275                self.track_outcome(outcome.clone(), indexed, 1);
276            }
277            self.track_outcome(outcome, event_category, 1);
278        }
279    }
280
281    /// Records an outcome scoped to this envelope's context.
282    ///
283    /// This managed envelope should be updated using [`update`](Self::update) soon after this
284    /// operation to ensure that subsequent outcomes are consistent.
285    pub fn track_outcome(&self, outcome: Outcome, category: DataCategory, quantity: usize) {
286        self.outcome_aggregator.send(TrackOutcome {
287            timestamp: self.received_at(),
288            scoping: self.context.scoping,
289            outcome,
290            event_id: self.envelope.event_id(),
291            remote_addr: self.meta().remote_addr(),
292            category,
293            // Quantities are usually `usize` which lets us go all the way to 64-bit on our
294            // machines, but the protocol and data store can only do 32-bit.
295            quantity: quantity as u32,
296        });
297    }
298
299    /// Accepts the envelope and drops the context.
300    ///
301    /// This should be called if the envelope has been accepted by the upstream, which means that
302    /// the responsibility for logging outcomes has been moved. This function will not log any
303    /// outcomes.
304    pub fn accept(mut self) {
305        if !self.context.done {
306            self.finish(RelayCounters::EnvelopeAccepted, Handling::Success);
307        }
308    }
309
310    /// Returns the data category of the event item in the envelope.
311    fn event_category(&self) -> Option<DataCategory> {
312        self.context.summary.event_category
313    }
314
315    /// Records rejection outcomes for all items stored in this context.
316    ///
317    /// This does not send outcomes for empty envelopes or request-only contexts.
318    pub fn reject(&mut self, outcome: Outcome) {
319        if self.context.done {
320            return;
321        }
322
323        // Errors are only logged for what we consider failed request handling. In other cases, we
324        // "expect" errors and log them as debug level.
325        let handling = Handling::from_outcome(&outcome);
326        match handling {
327            Handling::Success => relay_log::debug!("dropped envelope: {outcome}"),
328            Handling::Failure => {
329                let summary = &self.context.summary;
330
331                relay_log::error!(
332                    tags.project_key = self.scoping().project_key.to_string(),
333                    tags.has_attachments = summary.attachment_quantity > 0,
334                    tags.has_sessions = summary.session_quantity > 0,
335                    tags.has_profiles = summary.profile_quantity > 0,
336                    tags.has_transactions = summary.secondary_transaction_quantity > 0,
337                    tags.has_span_metrics = summary.secondary_span_quantity > 0,
338                    tags.has_replays = summary.replay_quantity > 0,
339                    tags.has_user_reports = summary.user_report_quantity > 0,
340                    tags.has_trace_metrics = summary.trace_metric_quantity > 0,
341                    tags.has_checkins = summary.monitor_quantity > 0,
342                    tags.event_category = ?summary.event_category,
343                    cached_summary = ?summary,
344                    recomputed_summary = ?EnvelopeSummary::compute(self.envelope()),
345                    "dropped envelope: {outcome}"
346                );
347            }
348        }
349
350        if let Some(category) = self.event_category() {
351            if let Some(category) = category.index_category() {
352                self.track_outcome(outcome.clone(), category, 1);
353            }
354            self.track_outcome(outcome.clone(), category, 1);
355        }
356
357        if self.context.summary.attachment_quantity > 0 {
358            self.track_outcome(
359                outcome.clone(),
360                DataCategory::Attachment,
361                self.context.summary.attachment_quantity,
362            );
363        }
364
365        if self.context.summary.monitor_quantity > 0 {
366            self.track_outcome(
367                outcome.clone(),
368                DataCategory::Monitor,
369                self.context.summary.monitor_quantity,
370            );
371        }
372
373        if self.context.summary.profile_quantity > 0 {
374            self.track_outcome(
375                outcome.clone(),
376                DataCategory::Profile,
377                self.context.summary.profile_quantity,
378            );
379            self.track_outcome(
380                outcome.clone(),
381                DataCategory::ProfileIndexed,
382                self.context.summary.profile_quantity,
383            );
384        }
385
386        if self.context.summary.span_quantity > 0 {
387            self.track_outcome(
388                outcome.clone(),
389                DataCategory::Span,
390                self.context.summary.span_quantity,
391            );
392            self.track_outcome(
393                outcome.clone(),
394                DataCategory::SpanIndexed,
395                self.context.summary.span_quantity,
396            );
397        }
398
399        if self.context.summary.log_item_quantity > 0 {
400            self.track_outcome(
401                outcome.clone(),
402                DataCategory::LogItem,
403                self.context.summary.log_item_quantity,
404            );
405        }
406        if self.context.summary.log_byte_quantity > 0 {
407            self.track_outcome(
408                outcome.clone(),
409                DataCategory::LogByte,
410                self.context.summary.log_byte_quantity,
411            );
412        }
413
414        // Track outcomes for attached secondary transactions, e.g. extracted from metrics.
415        //
416        // Primary transaction count is already tracked through the event category
417        // (see: `Self::event_category()`).
418        if self.context.summary.secondary_transaction_quantity > 0 {
419            self.track_outcome(
420                outcome.clone(),
421                // Secondary transaction counts are never indexed transactions
422                DataCategory::Transaction,
423                self.context.summary.secondary_transaction_quantity,
424            );
425        }
426
427        // Track outcomes for attached secondary spans, e.g. extracted from metrics.
428        //
429        // Primary span count is already tracked through `SpanIndexed`.
430        if self.context.summary.secondary_span_quantity > 0 {
431            self.track_outcome(
432                outcome.clone(),
433                // Secondary transaction counts are never indexed transactions
434                DataCategory::Span,
435                self.context.summary.secondary_span_quantity,
436            );
437        }
438
439        if self.context.summary.replay_quantity > 0 {
440            self.track_outcome(
441                outcome.clone(),
442                DataCategory::Replay,
443                self.context.summary.replay_quantity,
444            );
445        }
446
447        // Track outcomes for user reports, the legacy item type for user feedback.
448        //
449        // User reports are not events, but count toward UserReportV2 for quotas and outcomes.
450        if self.context.summary.user_report_quantity > 0 {
451            self.track_outcome(
452                outcome.clone(),
453                DataCategory::UserReportV2,
454                self.context.summary.user_report_quantity,
455            );
456        }
457
458        if self.context.summary.profile_chunk_quantity > 0 {
459            self.track_outcome(
460                outcome.clone(),
461                DataCategory::ProfileChunk,
462                self.context.summary.profile_chunk_quantity,
463            );
464        }
465
466        if self.context.summary.profile_chunk_ui_quantity > 0 {
467            self.track_outcome(
468                outcome.clone(),
469                DataCategory::ProfileChunkUi,
470                self.context.summary.profile_chunk_ui_quantity,
471            );
472        }
473
474        if self.context.summary.session_quantity > 0 {
475            self.track_outcome(
476                outcome.clone(),
477                DataCategory::Session,
478                self.context.summary.session_quantity,
479            );
480        }
481
482        self.finish(RelayCounters::EnvelopeRejected, handling);
483    }
484
485    /// Returns scoping stored in this context.
486    pub fn scoping(&self) -> Scoping {
487        self.context.scoping
488    }
489
490    /// Returns the partition key, which is set on upstream requests in the `X-Sentry-Relay-Shard` header.
491    pub fn partition_key(&self) -> Option<u32> {
492        self.context.partition_key
493    }
494
495    /// Sets a new [`Self::partition_key`].
496    pub fn set_partition_key(&mut self, partition_key: Option<u32>) -> &mut Self {
497        self.context.partition_key = partition_key;
498        self
499    }
500
501    /// Returns the contained original request meta.
502    pub fn meta(&self) -> &RequestMeta {
503        self.envelope().meta()
504    }
505
506    /// Returns estimated size of this envelope.
507    ///
508    /// This is just an estimated size, which in reality can be somewhat bigger, depending on the
509    /// list of additional attributes allocated on all of the inner types.
510    ///
511    /// NOTE: Current implementation counts in only the size of the items payload and stack
512    /// allocated parts of [`Envelope`] and [`ManagedEnvelope`]. All the heap allocated fields
513    /// within early mentioned types are skipped.
514    pub fn estimated_size(&self) -> usize {
515        // Always round it up to next 1KB.
516        (f64::ceil(
517            (self.context.summary.payload_size + size_of::<Self>() + size_of::<Envelope>()) as f64
518                / 1000.,
519        ) * 1000.) as usize
520    }
521
522    /// Returns the time at which the envelope was received at this Relay.
523    ///
524    /// This is the date time equivalent to [`start_time`](Self::received_at).
525    pub fn received_at(&self) -> DateTime<Utc> {
526        self.envelope.received_at()
527    }
528
529    /// Returns the time elapsed in seconds since the envelope was received by this Relay.
530    ///
531    /// In case the elapsed time is negative, it is assumed that no time elapsed.
532    pub fn age(&self) -> Duration {
533        self.envelope.age()
534    }
535
536    /// Escape hatch for the [`super::Managed`] type, to make it possible to construct
537    /// from a managed envelope.
538    pub(super) fn outcome_aggregator(&self) -> &Addr<TrackOutcome> {
539        &self.outcome_aggregator
540    }
541
542    /// Resets inner state to ensure there's no more logging.
543    fn finish(&mut self, counter: RelayCounters, handling: Handling) {
544        relay_statsd::metric!(counter(counter) += 1, handling = handling.as_str());
545        relay_statsd::metric!(timer(RelayTimers::EnvelopeTotalTime) = self.age());
546
547        self.context.done = true;
548    }
549}
550
551impl Drop for ManagedEnvelope {
552    fn drop(&mut self) {
553        self.reject(Outcome::Invalid(DiscardReason::Internal));
554    }
555}
556
557impl<G> From<TypedEnvelope<G>> for ManagedEnvelope {
558    fn from(value: TypedEnvelope<G>) -> Self {
559        value.0
560    }
561}
562
563#[cfg(test)]
564mod tests {
565    use super::*;
566    use bytes::Bytes;
567
568    #[test]
569    fn span_metrics_are_reported() {
570        let bytes =
571            Bytes::from(r#"{"dsn":"https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"}"#);
572        let envelope = Envelope::parse_bytes(bytes).unwrap();
573
574        let (outcome_aggregator, mut rx) = Addr::custom();
575        let mut env = ManagedEnvelope::new(envelope, outcome_aggregator);
576        env.context.summary.span_quantity = 123;
577        env.context.summary.secondary_span_quantity = 456;
578
579        env.reject(Outcome::Abuse);
580
581        rx.close();
582
583        let outcome = rx.blocking_recv().unwrap();
584        assert_eq!(outcome.category, DataCategory::Span);
585        assert_eq!(outcome.quantity, 123);
586        assert_eq!(outcome.outcome, Outcome::Abuse);
587
588        let outcome = rx.blocking_recv().unwrap();
589        assert_eq!(outcome.category, DataCategory::SpanIndexed);
590        assert_eq!(outcome.quantity, 123);
591        assert_eq!(outcome.outcome, Outcome::Abuse);
592
593        let outcome = rx.blocking_recv().unwrap();
594        assert_eq!(outcome.category, DataCategory::Span);
595        assert_eq!(outcome.quantity, 456);
596        assert_eq!(outcome.outcome, Outcome::Abuse);
597
598        assert!(rx.blocking_recv().is_none());
599    }
600}