Skip to main content

relay_server/managed/
envelope.rs

1use std::fmt::Debug;
2use std::mem::size_of;
3use std::time::Duration;
4
5use chrono::{DateTime, Utc};
6use relay_quotas::{DataCategory, Scoping};
7use relay_system::Addr;
8
9use crate::envelope::{Envelope, Item};
10use crate::extractors::RequestMeta;
11use crate::managed::Counted as _;
12use crate::services::outcome::{DiscardReason, Outcome, TrackOutcome};
13use crate::statsd::{RelayCounters, RelayTimers};
14use crate::utils::EnvelopeSummary;
15
/// Denotes the success of handling an envelope.
///
/// The value is derived from an [`Outcome`] and is attached as the `handling` tag to the
/// counters emitted when an envelope is accepted or rejected.
#[derive(Clone, Copy, Debug)]
enum Handling {
    /// The envelope was handled successfully.
    ///
    /// This can be the case even if the envelope was dropped. For example, if a rate limit is in
    /// effect or if the corresponding project is disabled.
    Success,
    /// Handling the envelope failed due to an error or bug.
    Failure,
}
27
28impl Handling {
29    fn from_outcome(outcome: &Outcome) -> Self {
30        if outcome.is_unexpected() {
31            Self::Failure
32        } else {
33            Self::Success
34        }
35    }
36
37    fn as_str(&self) -> &str {
38        match self {
39            Handling::Success => "success",
40            Handling::Failure => "failure",
41        }
42    }
43}
44
/// Represents the decision on whether or not to keep an envelope item.
///
/// This is the return type of the closure passed to [`ManagedEnvelope::retain_items`], which is
/// invoked once per item.
#[derive(Debug, Clone)]
pub enum ItemAction {
    /// Keep the item.
    Keep,
    /// Drop the item and log an outcome for it.
    ///
    /// The outcome is tracked once for every `(category, quantity)` pair the item reports.
    Drop(Outcome),
    /// Drop the item without logging an outcome.
    DropSilently,
}
55
/// Bookkeeping state that a [`ManagedEnvelope`] keeps alongside its envelope.
#[derive(Debug)]
struct EnvelopeContext {
    /// Cached summary of the envelope contents.
    ///
    /// Computed when the managed envelope is created and recomputed only by
    /// [`ManagedEnvelope::update`]; it is not refreshed automatically when items change.
    summary: EnvelopeSummary,
    /// Scoping attached to every tracked outcome.
    ///
    /// Starts out as the partial scoping from the request meta and can be replaced through
    /// [`ManagedEnvelope::scope`].
    scoping: Scoping,
    /// Optional partition key; `None` until set via [`ManagedEnvelope::set_partition_key`].
    partition_key: Option<u32>,
    /// Whether the envelope has been finalized.
    ///
    /// Once `true`, `accept` and `reject` (including the `reject` issued on drop) return without
    /// emitting outcomes or metrics.
    done: bool,
}
63
/// Tracks the lifetime of an [`Envelope`] in Relay.
///
/// The managed envelope accompanies envelopes through the processing pipeline in Relay and ensures
/// that outcomes are recorded when the Envelope is dropped. They can be dropped in one of three
/// ways:
///
///  - By calling [`accept`](Self::accept). Responsibility of the envelope has been transferred to
///    another service, and no further outcomes need to be recorded.
///  - By calling [`reject`](Self::reject). The entire envelope was dropped, and the outcome
///    specifies the reason.
///  - By dropping the managed envelope. This indicates an issue or a bug and raises the
///    `"internal"` outcome. There should be additional error handling to report an error to Sentry.
///
/// The managed envelope also holds a processing queue permit which is used for backpressure
/// management. It is automatically reclaimed when the context is dropped along with the envelope.
///
/// NOTE(review): no permit field is visible on this struct, so the paragraph above may be
/// outdated — confirm before relying on it.
#[derive(Debug)]
pub struct ManagedEnvelope {
    /// The envelope being tracked.
    envelope: Box<Envelope>,
    /// Cached summary, scoping, partition key and the `done` flag for this envelope.
    context: EnvelopeContext,
    /// Address that receives the [`TrackOutcome`] messages sent by `track_outcome`.
    outcome_aggregator: Addr<TrackOutcome>,
}
85
86impl ManagedEnvelope {
87    /// Computes a managed envelope from the given envelope and binds it to the processing queue.
88    ///
89    /// To provide additional scoping, use [`ManagedEnvelope::scope`].
90    pub fn new(envelope: Box<Envelope>, outcome_aggregator: Addr<TrackOutcome>) -> Self {
91        let meta = &envelope.meta();
92        let summary = EnvelopeSummary::compute(envelope.as_ref());
93        let scoping = meta.get_partial_scoping().into_scoping();
94
95        Self {
96            envelope,
97            context: EnvelopeContext {
98                summary,
99                scoping,
100                partition_key: None,
101                done: false,
102            },
103            outcome_aggregator,
104        }
105    }
106
107    /// An untracked instance which does not emit outcomes, useful for testing.
108    #[cfg(test)]
109    pub fn untracked(envelope: Box<Envelope>, outcome_aggregator: Addr<TrackOutcome>) -> Self {
110        let mut envelope = Self::new(envelope, outcome_aggregator);
111        envelope.context.done = true;
112        envelope
113    }
114
115    /// Returns a reference to the contained [`Envelope`].
116    pub fn envelope(&self) -> &Envelope {
117        self.envelope.as_ref()
118    }
119
120    /// Returns a mutable reference to the contained [`Envelope`].
121    pub fn envelope_mut(&mut self) -> &mut Envelope {
122        self.envelope.as_mut()
123    }
124
125    /// Consumes itself returning the managed envelope.
126    pub fn into_envelope(mut self) -> Box<Envelope> {
127        self.context.done = true;
128        self.take_envelope()
129    }
130
131    /// Take the envelope out of the context and replace it with a dummy.
132    ///
133    /// Note that after taking out the envelope, the envelope summary is incorrect.
134    pub(crate) fn take_envelope(&mut self) -> Box<Envelope> {
135        Box::new(self.envelope.take_items())
136    }
137
138    /// Update the context with envelope information.
139    ///
140    /// This updates the item summary as well as the event id.
141    pub fn update(&mut self) -> &mut Self {
142        self.context.summary = EnvelopeSummary::compute(self.envelope());
143        self
144    }
145
146    /// Retains or drops items based on the [`ItemAction`].
147    ///
148    ///
149    /// This method operates in place and preserves the order of the retained items.
150    pub fn retain_items<F>(&mut self, mut f: F)
151    where
152        F: FnMut(&mut Item) -> ItemAction,
153    {
154        let mut outcomes = Vec::new();
155        self.envelope.retain_items(|item| match f(item) {
156            ItemAction::Keep => true,
157            ItemAction::DropSilently => false,
158            ItemAction::Drop(outcome) => {
159                for (category, quantity) in item.quantities() {
160                    outcomes.push((outcome.clone(), category, quantity));
161                }
162
163                false
164            }
165        });
166        for (outcome, category, quantity) in outcomes {
167            self.track_outcome(outcome, category, quantity);
168        }
169        // TODO: once `update` is private, it should be called here.
170    }
171
    /// Drops every item in the envelope.
    ///
    /// This delegates to the envelope; the method itself tracks no outcomes and does not
    /// recompute the cached summary.
    pub fn drop_items_silently(&mut self) {
        self.envelope.drop_items_silently();
    }
176
    /// Re-scopes this context to the given scoping.
    ///
    /// Outcomes tracked after this call carry the new scoping. Returns `self` to allow chaining.
    pub fn scope(&mut self, scoping: Scoping) -> &mut Self {
        self.context.scoping = scoping;
        self
    }
182
183    /// Removes event item(s) and logs an outcome.
184    ///
185    /// Note: This function relies on the envelope summary being correct.
186    pub fn reject_event(&mut self, outcome: Outcome) {
187        if let Some(event_category) = self.event_category() {
188            self.envelope.retain_items(|item| !item.creates_event());
189            if let Some(indexed) = event_category.index_category() {
190                self.track_outcome(outcome.clone(), indexed, 1);
191            }
192            self.track_outcome(outcome, event_category, 1);
193        }
194    }
195
196    /// Records an outcome scoped to this envelope's context.
197    ///
198    /// This managed envelope should be updated using [`update`](Self::update) soon after this
199    /// operation to ensure that subsequent outcomes are consistent.
200    pub fn track_outcome(&self, outcome: Outcome, category: DataCategory, quantity: usize) {
201        self.outcome_aggregator.send(TrackOutcome {
202            timestamp: self.received_at(),
203            scoping: self.context.scoping,
204            outcome,
205            event_id: self.envelope.event_id(),
206            remote_addr: self.meta().remote_addr(),
207            category,
208            // Quantities are usually `usize` which lets us go all the way to 64-bit on our
209            // machines, but the protocol and data store can only do 32-bit.
210            quantity: quantity as u32,
211        });
212    }
213
214    /// Accepts the envelope and drops the context.
215    ///
216    /// This should be called if the envelope has been accepted by the upstream, which means that
217    /// the responsibility for logging outcomes has been moved. This function will not log any
218    /// outcomes.
219    pub fn accept(mut self) {
220        if !self.context.done {
221            self.finish(RelayCounters::EnvelopeAccepted, Handling::Success);
222        }
223    }
224
    /// Returns the data category of the event item in the envelope.
    ///
    /// The value is read from the cached summary rather than from the envelope itself, so it
    /// reflects the state at the time the summary was last computed.
    fn event_category(&self) -> Option<DataCategory> {
        self.context.summary.event_category
    }
229
    /// Records rejection outcomes for all items stored in this context.
    ///
    /// This does not send outcomes for empty envelopes or request-only contexts.
    ///
    /// Does nothing if the envelope is already done. Otherwise the rejection is logged, `outcome`
    /// is tracked for every `(category, quantity)` pair of the cached summary, and the envelope is
    /// marked as done, so that a later `reject` (including the one issued on drop) is a no-op.
    pub fn reject(&mut self, outcome: Outcome) {
        if self.context.done {
            return;
        }

        // Errors are only logged for what we consider failed request handling. In other cases, we
        // "expect" errors and log them as debug level.
        let handling = Handling::from_outcome(&outcome);
        match handling {
            Handling::Success => relay_log::debug!("dropped envelope: {outcome}"),
            Handling::Failure => {
                let summary = &self.context.summary;

                // Both the cached and a freshly computed summary are logged, which makes it
                // visible when the cached summary has drifted from the envelope contents.
                relay_log::error!(
                    tags.project_key = self.scoping().project_key.to_string(),
                    tags.has_attachments = summary.attachment_quantities.bytes() > 0,
                    tags.has_sessions = summary.session_quantity > 0,
                    tags.has_profiles = summary.profile_quantity.total > 0,
                    tags.has_transactions = summary.secondary_transaction_quantity > 0,
                    tags.has_span_metrics = summary.secondary_span_quantity > 0,
                    tags.has_replays = summary.replay_quantity > 0,
                    tags.has_user_reports = summary.user_report_quantity > 0,
                    tags.has_trace_metrics = summary.trace_metric_quantity > 0,
                    tags.has_checkins = summary.monitor_quantity > 0,
                    tags.event_category = ?summary.event_category,
                    cached_summary = ?summary,
                    recomputed_summary = ?EnvelopeSummary::compute(self.envelope()),
                    "dropped envelope: {outcome}"
                );
            }
        }

        // Outcomes are based on the cached summary, not on the current envelope contents.
        for (category, quantity) in self.context.summary.quantities() {
            self.track_outcome(outcome.clone(), category, quantity);
        }

        // Must come last: this sets `done`, which disables any further outcome logging.
        self.finish(RelayCounters::EnvelopeRejected, handling);
    }
271
    /// Returns scoping stored in this context.
    ///
    /// This is the scoping attached to outcomes tracked through this managed envelope.
    pub fn scoping(&self) -> Scoping {
        self.context.scoping
    }
276
    /// Returns the partition key, which is set on upstream requests in the `X-Sentry-Relay-Shard` header.
    ///
    /// This is `None` for a newly created managed envelope until a key is assigned with
    /// [`set_partition_key`](Self::set_partition_key).
    pub fn partition_key(&self) -> Option<u32> {
        self.context.partition_key
    }
281
    /// Sets a new [`Self::partition_key`].
    ///
    /// Passing `None` clears a previously set key. Returns `self` to allow chaining.
    pub fn set_partition_key(&mut self, partition_key: Option<u32>) -> &mut Self {
        self.context.partition_key = partition_key;
        self
    }
287
288    /// Returns the contained original request meta.
289    pub fn meta(&self) -> &RequestMeta {
290        self.envelope().meta()
291    }
292
293    /// Returns estimated size of this envelope.
294    ///
295    /// This is just an estimated size, which in reality can be somewhat bigger, depending on the
296    /// list of additional attributes allocated on all of the inner types.
297    ///
298    /// NOTE: Current implementation counts in only the size of the items payload and stack
299    /// allocated parts of [`Envelope`] and [`ManagedEnvelope`]. All the heap allocated fields
300    /// within early mentioned types are skipped.
301    pub fn estimated_size(&self) -> usize {
302        // Always round it up to next 1KB.
303        (f64::ceil(
304            (self.context.summary.payload_size + size_of::<Self>() + size_of::<Envelope>()) as f64
305                / 1000.,
306        ) * 1000.) as usize
307    }
308
309    /// Returns the time at which the envelope was received at this Relay.
310    ///
311    /// This is the date time equivalent to [`start_time`](Self::received_at).
312    pub fn received_at(&self) -> DateTime<Utc> {
313        self.envelope.received_at()
314    }
315
316    /// Returns the time elapsed in seconds since the envelope was received by this Relay.
317    ///
318    /// In case the elapsed time is negative, it is assumed that no time elapsed.
319    pub fn age(&self) -> Duration {
320        self.envelope.age()
321    }
322
    /// Escape hatch for the [`super::Managed`] type, to make it possible to construct
    /// from a managed envelope.
    ///
    /// Returns a reference to the address that `track_outcome` sends its messages to.
    pub(super) fn outcome_aggregator(&self) -> &Addr<TrackOutcome> {
        &self.outcome_aggregator
    }
328
    /// Emits the final metrics for this envelope and marks it as done.
    ///
    /// Increments `counter` with the `handling` tag and records the envelope's age under
    /// [`RelayTimers::EnvelopeTotalTime`]. Setting `done` ensures there's no more logging: later
    /// calls to `accept` or `reject`, including the `reject` issued on drop, return early.
    fn finish(&mut self, counter: RelayCounters, handling: Handling) {
        relay_statsd::metric!(counter(counter) += 1, handling = handling.as_str());
        relay_statsd::metric!(timer(RelayTimers::EnvelopeTotalTime) = self.age());

        self.context.done = true;
    }
336}
337
338impl Drop for ManagedEnvelope {
339    fn drop(&mut self) {
340        self.reject(Outcome::Invalid(DiscardReason::Internal));
341    }
342}
343
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;

    /// Rejecting an envelope reports the span quantities stored in its summary, in order:
    /// spans, indexed spans, and finally the secondary span quantity.
    #[test]
    fn span_metrics_are_reported() {
        let raw =
            Bytes::from(r#"{"dsn":"https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"}"#);
        let envelope = Envelope::parse_bytes(raw).unwrap();

        let (outcome_aggregator, mut rx) = Addr::custom();
        let mut managed = ManagedEnvelope::new(envelope, outcome_aggregator);
        managed.context.summary.span_quantity = 123;
        managed.context.summary.secondary_span_quantity = 456;

        managed.reject(Outcome::Abuse);

        rx.close();

        let expected = [
            (DataCategory::Span, 123),
            (DataCategory::SpanIndexed, 123),
            (DataCategory::Span, 456),
        ];
        for (category, quantity) in expected {
            let tracked = rx.blocking_recv().unwrap();
            assert_eq!(tracked.category, category);
            assert_eq!(tracked.quantity, quantity);
            assert_eq!(tracked.outcome, Outcome::Abuse);
        }

        // No further outcomes must be pending after the expected ones.
        assert!(rx.blocking_recv().is_none());
    }
}