relay_server/managed/
envelope.rs

1use std::fmt::{Debug, Display};
2use std::marker::PhantomData;
3use std::mem::size_of;
4use std::ops::{Deref, DerefMut};
5use std::time::Duration;
6
7use chrono::{DateTime, Utc};
8use relay_quotas::{DataCategory, Scoping};
9use relay_system::Addr;
10
11use crate::envelope::{Envelope, Item};
12use crate::extractors::RequestMeta;
13use crate::managed::Counted as _;
14use crate::services::outcome::{DiscardReason, Outcome, TrackOutcome};
15use crate::services::processor::{Processed, ProcessingGroup};
16use crate::statsd::{RelayCounters, RelayTimers};
17use crate::utils::EnvelopeSummary;
18
/// Denotes the success of handling an envelope.
#[derive(Clone, Copy, Debug)]
enum Handling {
    /// The envelope was handled successfully.
    ///
    /// This can be the case even if the envelope was dropped. For example, if a rate limit is in
    /// effect or if the corresponding project is disabled.
    Success,
    /// Handling the envelope failed due to an error or bug.
    Failure,
}
30
31impl Handling {
32    fn from_outcome(outcome: &Outcome) -> Self {
33        if outcome.is_unexpected() {
34            Self::Failure
35        } else {
36            Self::Success
37        }
38    }
39
40    fn as_str(&self) -> &str {
41        match self {
42            Handling::Success => "success",
43            Handling::Failure => "failure",
44        }
45    }
46}
47
/// Represents the decision on whether or not to keep an envelope item.
///
/// Returned by the callback passed to [`ManagedEnvelope::retain_items`].
#[derive(Debug, Clone)]
pub enum ItemAction {
    /// Keep the item.
    Keep,
    /// Drop the item and log an outcome for it.
    ///
    /// The outcome is tracked once per `(category, quantity)` pair reported by the item.
    Drop(Outcome),
    /// Drop the item without logging an outcome.
    DropSilently,
}
58
/// Bookkeeping state that a [`ManagedEnvelope`] keeps alongside its envelope.
#[derive(Debug)]
struct EnvelopeContext {
    /// Cached summary of the envelope, computed on construction and refreshed by
    /// `ManagedEnvelope::update`.
    summary: EnvelopeSummary,
    /// Scoping attached to every tracked outcome.
    scoping: Scoping,
    /// Optional partition key, see `ManagedEnvelope::partition_key`.
    partition_key: Option<u32>,
    /// Set once the envelope is finished; `accept` and `reject` return early afterwards.
    done: bool,
}
66
/// Error emitted when converting a [`ManagedEnvelope`] and a processing group into a [`TypedEnvelope`].
///
/// Carries the envelope and the processing group of the failed conversion, so the caller gets
/// the envelope back.
#[derive(Debug)]
pub struct InvalidProcessingGroupType(pub ManagedEnvelope, pub ProcessingGroup);
70
71impl Display for InvalidProcessingGroupType {
72    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
73        f.write_fmt(format_args!(
74            "failed to convert to the processing group {} based on the provided type",
75            self.1.variant()
76        ))
77    }
78}
79
// Empty impl: the trait's default methods are used, so no underlying source error is exposed.
impl std::error::Error for InvalidProcessingGroupType {}
81
/// A wrapper for [`ManagedEnvelope`] with assigned processing group type.
///
/// The group `G` exists only at the type level (via [`PhantomData`]); no value of `G` is stored.
pub struct TypedEnvelope<G>(ManagedEnvelope, PhantomData<G>);
84
85impl<G> TypedEnvelope<G> {
86    /// Changes the typed of the current envelope to processed.
87    ///
88    /// Once it's marked processed it can be submitted to upstream.
89    pub fn into_processed(self) -> TypedEnvelope<Processed> {
90        TypedEnvelope::new(self.0)
91    }
92
93    /// Accepts the envelope and drops the internal managed envelope with its context.
94    ///
95    /// This should be called if the envelope has been accepted by the upstream, which means that
96    /// the responsibility for logging outcomes has been moved. This function will not log any
97    /// outcomes.
98    pub fn accept(self) {
99        self.0.accept()
100    }
101
102    /// Returns the raw [`ManagedEnvelope`].
103    pub fn into_inner(self) -> ManagedEnvelope {
104        self.0
105    }
106
107    /// Creates a new typed envelope.
108    ///
109    /// Note: this method is private to make sure that only `TryFrom` implementation is used, which
110    /// requires the check for the error if conversion is failing.
111    fn new(managed_envelope: ManagedEnvelope) -> Self {
112        Self(managed_envelope, Default::default())
113    }
114}
115
116impl<G: TryFrom<ProcessingGroup>> TryFrom<(ManagedEnvelope, ProcessingGroup)> for TypedEnvelope<G> {
117    type Error = InvalidProcessingGroupType;
118    fn try_from(
119        (envelope, group): (ManagedEnvelope, ProcessingGroup),
120    ) -> Result<Self, Self::Error> {
121        match <ProcessingGroup as TryInto<G>>::try_into(group) {
122            Ok(_) => Ok(TypedEnvelope::new(envelope)),
123            Err(_) => Err(InvalidProcessingGroupType(envelope, group)),
124        }
125    }
126}
127
128impl<G> Debug for TypedEnvelope<G> {
129    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
130        f.debug_tuple("TypedEnvelope").field(&self.0).finish()
131    }
132}
133
134impl<G> Deref for TypedEnvelope<G> {
135    type Target = ManagedEnvelope;
136
137    fn deref(&self) -> &Self::Target {
138        &self.0
139    }
140}
141
142impl<G> DerefMut for TypedEnvelope<G> {
143    fn deref_mut(&mut self) -> &mut Self::Target {
144        &mut self.0
145    }
146}
147
/// Tracks the lifetime of an [`Envelope`] in Relay.
///
/// The managed envelope accompanies envelopes through the processing pipeline in Relay and ensures
/// that outcomes are recorded when the Envelope is dropped. They can be dropped in one of three
/// ways:
///
///  - By calling [`accept`](Self::accept). Responsibility of the envelope has been transferred to
///    another service, and no further outcomes need to be recorded.
///  - By calling [`reject`](Self::reject). The entire envelope was dropped, and the outcome
///    specifies the reason.
///  - By dropping the managed envelope. This indicates an issue or a bug and raises the
///    `"internal"` outcome. There should be additional error handling to report an error to Sentry.
///
/// NOTE(review): earlier documentation stated that the managed envelope also holds a processing
/// queue permit used for backpressure management. No such field is part of this struct; confirm
/// whether that statement still applies somewhere else.
#[derive(Debug)]
pub struct ManagedEnvelope {
    /// The envelope being tracked.
    envelope: Box<Envelope>,
    /// Cached summary, scoping, partition key and the `done` flag.
    context: EnvelopeContext,
    /// Address that tracked outcomes are sent to.
    outcome_aggregator: Addr<TrackOutcome>,
}
169
170impl ManagedEnvelope {
171    /// Computes a managed envelope from the given envelope and binds it to the processing queue.
172    ///
173    /// To provide additional scoping, use [`ManagedEnvelope::scope`].
174    pub fn new(envelope: Box<Envelope>, outcome_aggregator: Addr<TrackOutcome>) -> Self {
175        let meta = &envelope.meta();
176        let summary = EnvelopeSummary::compute(envelope.as_ref());
177        let scoping = meta.get_partial_scoping().into_scoping();
178
179        Self {
180            envelope,
181            context: EnvelopeContext {
182                summary,
183                scoping,
184                partition_key: None,
185                done: false,
186            },
187            outcome_aggregator,
188        }
189    }
190
191    /// An untracked instance which does not emit outcomes, useful for testing.
192    #[cfg(test)]
193    pub fn untracked(envelope: Box<Envelope>, outcome_aggregator: Addr<TrackOutcome>) -> Self {
194        let mut envelope = Self::new(envelope, outcome_aggregator);
195        envelope.context.done = true;
196        envelope
197    }
198
199    /// Returns a reference to the contained [`Envelope`].
200    pub fn envelope(&self) -> &Envelope {
201        self.envelope.as_ref()
202    }
203
204    /// Returns a mutable reference to the contained [`Envelope`].
205    pub fn envelope_mut(&mut self) -> &mut Envelope {
206        self.envelope.as_mut()
207    }
208
    /// Consumes the managed envelope and returns the contained [`Envelope`].
    ///
    /// The context is marked as done first, so dropping `self` at the end of this function does
    /// not emit any outcomes.
    pub fn into_envelope(mut self) -> Box<Envelope> {
        self.context.done = true;
        self.take_envelope()
    }
214
215    /// Converts current managed envelope into processed envelope.
216    ///
217    /// Once it's marked processed it can be submitted to upstream.
218    pub fn into_processed(self) -> TypedEnvelope<Processed> {
219        TypedEnvelope::new(self)
220    }
221
222    /// Take the envelope out of the context and replace it with a dummy.
223    ///
224    /// Note that after taking out the envelope, the envelope summary is incorrect.
225    pub(crate) fn take_envelope(&mut self) -> Box<Envelope> {
226        Box::new(self.envelope.take_items())
227    }
228
229    /// Update the context with envelope information.
230    ///
231    /// This updates the item summary as well as the event id.
232    pub fn update(&mut self) -> &mut Self {
233        self.context.summary = EnvelopeSummary::compute(self.envelope());
234        self
235    }
236
237    /// Retains or drops items based on the [`ItemAction`].
238    ///
239    ///
240    /// This method operates in place and preserves the order of the retained items.
241    pub fn retain_items<F>(&mut self, mut f: F)
242    where
243        F: FnMut(&mut Item) -> ItemAction,
244    {
245        let mut outcomes = Vec::new();
246        self.envelope.retain_items(|item| match f(item) {
247            ItemAction::Keep => true,
248            ItemAction::DropSilently => false,
249            ItemAction::Drop(outcome) => {
250                for (category, quantity) in item.quantities() {
251                    outcomes.push((outcome.clone(), category, quantity));
252                }
253
254                false
255            }
256        });
257        for (outcome, category, quantity) in outcomes {
258            self.track_outcome(outcome, category, quantity);
259        }
260        // TODO: once `update` is private, it should be called here.
261    }
262
    /// Drops every item in the envelope.
    ///
    /// No outcomes are tracked here and the cached summary is not refreshed.
    pub fn drop_items_silently(&mut self) {
        self.envelope.drop_items_silently();
    }
267
    /// Re-scopes this context to the given scoping.
    ///
    /// Outcomes tracked afterwards carry the new scoping. Returns `self` to allow chaining.
    pub fn scope(&mut self, scoping: Scoping) -> &mut Self {
        self.context.scoping = scoping;
        self
    }
273
274    /// Removes event item(s) and logs an outcome.
275    ///
276    /// Note: This function relies on the envelope summary being correct.
277    pub fn reject_event(&mut self, outcome: Outcome) {
278        if let Some(event_category) = self.event_category() {
279            self.envelope.retain_items(|item| !item.creates_event());
280            if let Some(indexed) = event_category.index_category() {
281                self.track_outcome(outcome.clone(), indexed, 1);
282            }
283            self.track_outcome(outcome, event_category, 1);
284        }
285    }
286
287    /// Records an outcome scoped to this envelope's context.
288    ///
289    /// This managed envelope should be updated using [`update`](Self::update) soon after this
290    /// operation to ensure that subsequent outcomes are consistent.
291    pub fn track_outcome(&self, outcome: Outcome, category: DataCategory, quantity: usize) {
292        self.outcome_aggregator.send(TrackOutcome {
293            timestamp: self.received_at(),
294            scoping: self.context.scoping,
295            outcome,
296            event_id: self.envelope.event_id(),
297            remote_addr: self.meta().remote_addr(),
298            category,
299            // Quantities are usually `usize` which lets us go all the way to 64-bit on our
300            // machines, but the protocol and data store can only do 32-bit.
301            quantity: quantity as u32,
302        });
303    }
304
305    /// Accepts the envelope and drops the context.
306    ///
307    /// This should be called if the envelope has been accepted by the upstream, which means that
308    /// the responsibility for logging outcomes has been moved. This function will not log any
309    /// outcomes.
310    pub fn accept(mut self) {
311        if !self.context.done {
312            self.finish(RelayCounters::EnvelopeAccepted, Handling::Success);
313        }
314    }
315
    /// Returns the data category of the event item in the envelope.
    ///
    /// The value is read from the cached summary, not from the envelope's current contents.
    fn event_category(&self) -> Option<DataCategory> {
        self.context.summary.event_category
    }
320
    /// Records rejection outcomes for all items stored in this context.
    ///
    /// Returns early if the envelope is already done. Otherwise the outcome is tracked once per
    /// `(category, quantity)` pair of the cached summary, the rejected counter and the total-time
    /// timer are emitted, and the context is marked as done so that dropping the envelope
    /// afterwards emits nothing further.
    ///
    /// This does not send outcomes for empty envelopes or request-only contexts.
    pub fn reject(&mut self, outcome: Outcome) {
        if self.context.done {
            return;
        }

        // Errors are only logged for what we consider failed request handling. In other cases, we
        // "expect" errors and log them as debug level.
        let handling = Handling::from_outcome(&outcome);
        match handling {
            Handling::Success => relay_log::debug!("dropped envelope: {outcome}"),
            Handling::Failure => {
                let summary = &self.context.summary;

                // Both the cached and a freshly recomputed summary are attached, which makes a
                // stale cached summary visible in the error report.
                relay_log::error!(
                    tags.project_key = self.scoping().project_key.to_string(),
                    tags.has_attachments = summary.attachment_quantities.bytes() > 0,
                    tags.has_sessions = summary.session_quantity > 0,
                    tags.has_profiles = summary.profile_quantity.total > 0,
                    tags.has_transactions = summary.secondary_transaction_quantity > 0,
                    tags.has_span_metrics = summary.secondary_span_quantity > 0,
                    tags.has_replays = summary.replay_quantity > 0,
                    tags.has_user_reports = summary.user_report_quantity > 0,
                    tags.has_trace_metrics = summary.trace_metric_quantity > 0,
                    tags.has_checkins = summary.monitor_quantity > 0,
                    tags.event_category = ?summary.event_category,
                    cached_summary = ?summary,
                    recomputed_summary = ?EnvelopeSummary::compute(self.envelope()),
                    "dropped envelope: {outcome}"
                );
            }
        }

        // Outcomes are based on the cached summary, not on the envelope's current contents.
        for (category, quantity) in self.context.summary.quantities() {
            self.track_outcome(outcome.clone(), category, quantity);
        }

        self.finish(RelayCounters::EnvelopeRejected, handling);
    }
362
    /// Returns the scoping stored in this context.
    ///
    /// This is the scoping attached to tracked outcomes; it can be replaced with
    /// [`scope`](Self::scope).
    pub fn scoping(&self) -> Scoping {
        self.context.scoping
    }
367
    /// Returns the partition key, which is set on upstream requests in the `X-Sentry-Relay-Shard` header.
    ///
    /// `None` until a key has been assigned with [`set_partition_key`](Self::set_partition_key).
    pub fn partition_key(&self) -> Option<u32> {
        self.context.partition_key
    }
372
    /// Sets a new [`Self::partition_key`].
    ///
    /// Passing `None` clears the key. Returns `self` to allow chaining.
    pub fn set_partition_key(&mut self, partition_key: Option<u32>) -> &mut Self {
        self.context.partition_key = partition_key;
        self
    }
378
379    /// Returns the contained original request meta.
380    pub fn meta(&self) -> &RequestMeta {
381        self.envelope().meta()
382    }
383
384    /// Returns estimated size of this envelope.
385    ///
386    /// This is just an estimated size, which in reality can be somewhat bigger, depending on the
387    /// list of additional attributes allocated on all of the inner types.
388    ///
389    /// NOTE: Current implementation counts in only the size of the items payload and stack
390    /// allocated parts of [`Envelope`] and [`ManagedEnvelope`]. All the heap allocated fields
391    /// within early mentioned types are skipped.
392    pub fn estimated_size(&self) -> usize {
393        // Always round it up to next 1KB.
394        (f64::ceil(
395            (self.context.summary.payload_size + size_of::<Self>() + size_of::<Envelope>()) as f64
396                / 1000.,
397        ) * 1000.) as usize
398    }
399
    /// Returns the time at which the envelope was received at this Relay.
    ///
    /// The value is read from the contained envelope and is used as the timestamp of tracked
    /// outcomes.
    pub fn received_at(&self) -> DateTime<Utc> {
        self.envelope.received_at()
    }
406
    /// Returns the time elapsed since the envelope was received by this Relay.
    ///
    /// In case the elapsed time is negative, it is assumed that no time elapsed.
    /// NOTE(review): the clamping happens in `Envelope::age`, which is not visible here; confirm.
    pub fn age(&self) -> Duration {
        self.envelope.age()
    }
413
    /// Escape hatch for the [`super::Managed`] type, to make it possible to construct
    /// from a managed envelope.
    ///
    /// Returns the address that this envelope sends its tracked outcomes to.
    pub(super) fn outcome_aggregator(&self) -> &Addr<TrackOutcome> {
        &self.outcome_aggregator
    }
419
    /// Emits the final metrics for this envelope and marks the context as done.
    ///
    /// Increments `counter` tagged with the handling result and records the envelope's age as
    /// [`RelayTimers::EnvelopeTotalTime`]. Once `done` is set, later calls to
    /// [`accept`](Self::accept) or [`reject`](Self::reject), including the one issued when the
    /// envelope is dropped, return early without emitting anything.
    fn finish(&mut self, counter: RelayCounters, handling: Handling) {
        relay_statsd::metric!(counter(counter) += 1, handling = handling.as_str());
        relay_statsd::metric!(timer(RelayTimers::EnvelopeTotalTime) = self.age());

        self.context.done = true;
    }
427}
428
impl Drop for ManagedEnvelope {
    /// Rejects the envelope with an internal discard reason if it has not been finished yet.
    ///
    /// `reject` returns early when the context is already done, so envelopes that were accepted,
    /// rejected or converted with `into_envelope` emit nothing here.
    fn drop(&mut self) {
        self.reject(Outcome::Invalid(DiscardReason::Internal));
    }
}
434
435impl<G> From<TypedEnvelope<G>> for ManagedEnvelope {
436    fn from(value: TypedEnvelope<G>) -> Self {
437        value.0
438    }
439}
440
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;

    /// Rejecting an envelope reports both span quantities of the cached summary as outcomes.
    #[test]
    fn span_metrics_are_reported() {
        // The payload consists of a single JSON header line carrying only a DSN.
        let bytes =
            Bytes::from(r#"{"dsn":"https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"}"#);
        let envelope = Envelope::parse_bytes(bytes).unwrap();

        let (outcome_aggregator, mut rx) = Addr::custom();
        let mut env = ManagedEnvelope::new(envelope, outcome_aggregator);
        // Write the span quantities directly into the cached summary that `reject` reads.
        env.context.summary.span_quantity = 123;
        env.context.summary.secondary_span_quantity = 456;

        env.reject(Outcome::Abuse);

        // NOTE(review): presumably closing lets `blocking_recv` yield the already sent messages
        // and then `None` instead of blocking; confirm against the receiver type.
        rx.close();

        // Primary span quantity, reported for the `Span` category...
        let outcome = rx.blocking_recv().unwrap();
        assert_eq!(outcome.category, DataCategory::Span);
        assert_eq!(outcome.quantity, 123);
        assert_eq!(outcome.outcome, Outcome::Abuse);

        // ...and for the `SpanIndexed` category.
        let outcome = rx.blocking_recv().unwrap();
        assert_eq!(outcome.category, DataCategory::SpanIndexed);
        assert_eq!(outcome.quantity, 123);
        assert_eq!(outcome.outcome, Outcome::Abuse);

        // Secondary span quantity, reported for the `Span` category only.
        let outcome = rx.blocking_recv().unwrap();
        assert_eq!(outcome.category, DataCategory::Span);
        assert_eq!(outcome.quantity, 456);
        assert_eq!(outcome.outcome, Outcome::Abuse);

        // No further outcomes are expected.
        assert!(rx.blocking_recv().is_none());
    }
}