relay_server/managed/
envelope.rs

1use std::fmt::{Debug, Display};
2use std::marker::PhantomData;
3use std::mem::size_of;
4use std::ops::{Deref, DerefMut};
5use std::time::Duration;
6
7use chrono::{DateTime, Utc};
8use relay_quotas::{DataCategory, Scoping};
9use relay_system::Addr;
10
11use crate::envelope::{Envelope, Item};
12use crate::extractors::RequestMeta;
13use crate::services::outcome::{DiscardReason, Outcome, TrackOutcome};
14use crate::services::processor::{Processed, ProcessingGroup};
15use crate::statsd::{RelayCounters, RelayTimers};
16use crate::utils::EnvelopeSummary;
17
18/// Denotes the success of handling an envelope.
19#[derive(Clone, Copy, Debug)]
20enum Handling {
21    /// The envelope was handled successfully.
22    ///
23    /// This can be the case even if the envelpoe was dropped. For example, if a rate limit is in
24    /// effect or if the corresponding project is disabled.
25    Success,
26    /// Handling the envelope failed due to an error or bug.
27    Failure,
28}
29
30impl Handling {
31    fn from_outcome(outcome: &Outcome) -> Self {
32        if outcome.is_unexpected() {
33            Self::Failure
34        } else {
35            Self::Success
36        }
37    }
38
39    fn as_str(&self) -> &str {
40        match self {
41            Handling::Success => "success",
42            Handling::Failure => "failure",
43        }
44    }
45}
46
47/// Represents the decision on whether or not to keep an envelope item.
48#[derive(Debug, Clone)]
49pub enum ItemAction {
50    /// Keep the item.
51    Keep,
52    /// Drop the item and log an outcome for it.
53    Drop(Outcome),
54    /// Drop the item without logging an outcome.
55    DropSilently,
56}
57
58#[derive(Debug)]
59struct EnvelopeContext {
60    summary: EnvelopeSummary,
61    scoping: Scoping,
62    partition_key: Option<u32>,
63    done: bool,
64}
65
66/// Error emitted when converting a [`ManagedEnvelope`] and a processing group into a [`TypedEnvelope`].
67#[derive(Debug)]
68pub struct InvalidProcessingGroupType(pub ManagedEnvelope, pub ProcessingGroup);
69
70impl Display for InvalidProcessingGroupType {
71    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
72        f.write_fmt(format_args!(
73            "failed to convert to the processing group {} based on the provided type",
74            self.1.variant()
75        ))
76    }
77}
78
79impl std::error::Error for InvalidProcessingGroupType {}
80
81/// A wrapper for [`ManagedEnvelope`] with assigned processing group type.
82pub struct TypedEnvelope<G>(ManagedEnvelope, PhantomData<G>);
83
84impl<G> TypedEnvelope<G> {
85    /// Changes the typed of the current envelope to processed.
86    ///
87    /// Once it's marked processed it can be submitted to upstream.
88    pub fn into_processed(self) -> TypedEnvelope<Processed> {
89        TypedEnvelope::new(self.0)
90    }
91
92    /// Accepts the envelope and drops the internal managed envelope with its context.
93    ///
94    /// This should be called if the envelope has been accepted by the upstream, which means that
95    /// the responsibility for logging outcomes has been moved. This function will not log any
96    /// outcomes.
97    pub fn accept(self) {
98        self.0.accept()
99    }
100
101    /// Creates a new typed envelope.
102    ///
103    /// Note: this method is private to make sure that only `TryFrom` implementation is used, which
104    /// requires the check for the error if conversion is failing.
105    fn new(managed_envelope: ManagedEnvelope) -> Self {
106        Self(managed_envelope, Default::default())
107    }
108}
109
110impl<G: TryFrom<ProcessingGroup>> TryFrom<(ManagedEnvelope, ProcessingGroup)> for TypedEnvelope<G> {
111    type Error = InvalidProcessingGroupType;
112    fn try_from(
113        (envelope, group): (ManagedEnvelope, ProcessingGroup),
114    ) -> Result<Self, Self::Error> {
115        match <ProcessingGroup as TryInto<G>>::try_into(group) {
116            Ok(_) => Ok(TypedEnvelope::new(envelope)),
117            Err(_) => Err(InvalidProcessingGroupType(envelope, group)),
118        }
119    }
120}
121
122impl<G> Debug for TypedEnvelope<G> {
123    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
124        f.debug_tuple("TypedEnvelope").field(&self.0).finish()
125    }
126}
127
128impl<G> Deref for TypedEnvelope<G> {
129    type Target = ManagedEnvelope;
130
131    fn deref(&self) -> &Self::Target {
132        &self.0
133    }
134}
135
136impl<G> DerefMut for TypedEnvelope<G> {
137    fn deref_mut(&mut self) -> &mut Self::Target {
138        &mut self.0
139    }
140}
141
142/// Tracks the lifetime of an [`Envelope`] in Relay.
143///
144/// The managed envelope accompanies envelopes through the processing pipeline in Relay and ensures
145/// that outcomes are recorded when the Envelope is dropped. They can be dropped in one of three
146/// ways:
147///
148///  - By calling [`accept`](Self::accept). Responsibility of the envelope has been transferred to
149///    another service, and no further outcomes need to be recorded.
150///  - By calling [`reject`](Self::reject). The entire envelope was dropped, and the outcome
151///    specifies the reason.
152///  - By dropping the managed envelope. This indicates an issue or a bug and raises the
153///    `"internal"` outcome. There should be additional error handling to report an error to Sentry.
154///
155/// The managed envelope also holds a processing queue permit which is used for backpressure
156/// management. It is automatically reclaimed when the context is dropped along with the envelope.
157#[derive(Debug)]
158pub struct ManagedEnvelope {
159    envelope: Box<Envelope>,
160    context: EnvelopeContext,
161    outcome_aggregator: Addr<TrackOutcome>,
162}
163
164impl ManagedEnvelope {
165    /// Computes a managed envelope from the given envelope and binds it to the processing queue.
166    ///
167    /// To provide additional scoping, use [`ManagedEnvelope::scope`].
168    pub fn new(envelope: Box<Envelope>, outcome_aggregator: Addr<TrackOutcome>) -> Self {
169        let meta = &envelope.meta();
170        let summary = EnvelopeSummary::compute(envelope.as_ref());
171        let scoping = meta.get_partial_scoping();
172
173        Self {
174            envelope,
175            context: EnvelopeContext {
176                summary,
177                scoping,
178                partition_key: None,
179                done: false,
180            },
181            outcome_aggregator,
182        }
183    }
184
185    /// An untracked instance which does not emit outcomes, useful for testing.
186    #[cfg(test)]
187    pub fn untracked(envelope: Box<Envelope>, outcome_aggregator: Addr<TrackOutcome>) -> Self {
188        let mut envelope = Self::new(envelope, outcome_aggregator);
189        envelope.context.done = true;
190        envelope
191    }
192
193    /// Returns a reference to the contained [`Envelope`].
194    pub fn envelope(&self) -> &Envelope {
195        self.envelope.as_ref()
196    }
197
198    /// Returns a mutable reference to the contained [`Envelope`].
199    pub fn envelope_mut(&mut self) -> &mut Envelope {
200        self.envelope.as_mut()
201    }
202
203    /// Consumes itself returning the managed envelope.
204    pub fn into_envelope(mut self) -> Box<Envelope> {
205        self.context.done = true;
206        self.take_envelope()
207    }
208
209    /// Converts current managed envelope into processed envelope.
210    ///
211    /// Once it's marked processed it can be submitted to upstream.
212    pub fn into_processed(self) -> TypedEnvelope<Processed> {
213        TypedEnvelope::new(self)
214    }
215
216    /// Take the envelope out of the context and replace it with a dummy.
217    ///
218    /// Note that after taking out the envelope, the envelope summary is incorrect.
219    pub(crate) fn take_envelope(&mut self) -> Box<Envelope> {
220        Box::new(self.envelope.take_items())
221    }
222
223    /// Update the context with envelope information.
224    ///
225    /// This updates the item summary as well as the event id.
226    pub fn update(&mut self) -> &mut Self {
227        self.context.summary = EnvelopeSummary::compute(self.envelope());
228        self
229    }
230
231    /// Retains or drops items based on the [`ItemAction`].
232    ///
233    ///
234    /// This method operates in place and preserves the order of the retained items.
235    pub fn retain_items<F>(&mut self, mut f: F)
236    where
237        F: FnMut(&mut Item) -> ItemAction,
238    {
239        let mut outcomes = Vec::new();
240        self.envelope.retain_items(|item| match f(item) {
241            ItemAction::Keep => true,
242            ItemAction::DropSilently => false,
243            ItemAction::Drop(outcome) => {
244                for (category, quantity) in item.quantities() {
245                    if let Some(indexed) = category.index_category() {
246                        outcomes.push((outcome.clone(), indexed, quantity));
247                    };
248                    outcomes.push((outcome.clone(), category, quantity));
249                }
250
251                false
252            }
253        });
254        for (outcome, category, quantity) in outcomes {
255            self.track_outcome(outcome, category, quantity);
256        }
257        // TODO: once `update` is private, it should be called here.
258    }
259
260    /// Drops every item in the envelope.
261    pub fn drop_items_silently(&mut self) {
262        self.envelope.drop_items_silently();
263    }
264
265    /// Re-scopes this context to the given scoping.
266    pub fn scope(&mut self, scoping: Scoping) -> &mut Self {
267        self.context.scoping = scoping;
268        self
269    }
270
271    /// Removes event item(s) and logs an outcome.
272    ///
273    /// Note: This function relies on the envelope summary being correct.
274    pub fn reject_event(&mut self, outcome: Outcome) {
275        if let Some(event_category) = self.event_category() {
276            self.envelope.retain_items(|item| !item.creates_event());
277            if let Some(indexed) = event_category.index_category() {
278                self.track_outcome(outcome.clone(), indexed, 1);
279            }
280            self.track_outcome(outcome, event_category, 1);
281        }
282    }
283
284    /// Records an outcome scoped to this envelope's context.
285    ///
286    /// This managed envelope should be updated using [`update`](Self::update) soon after this
287    /// operation to ensure that subsequent outcomes are consistent.
288    pub fn track_outcome(&self, outcome: Outcome, category: DataCategory, quantity: usize) {
289        self.outcome_aggregator.send(TrackOutcome {
290            timestamp: self.received_at(),
291            scoping: self.context.scoping,
292            outcome,
293            event_id: self.envelope.event_id(),
294            remote_addr: self.meta().remote_addr(),
295            category,
296            // Quantities are usually `usize` which lets us go all the way to 64-bit on our
297            // machines, but the protocol and data store can only do 32-bit.
298            quantity: quantity as u32,
299        });
300    }
301
302    /// Accepts the envelope and drops the context.
303    ///
304    /// This should be called if the envelope has been accepted by the upstream, which means that
305    /// the responsibility for logging outcomes has been moved. This function will not log any
306    /// outcomes.
307    pub fn accept(mut self) {
308        if !self.context.done {
309            self.finish(RelayCounters::EnvelopeAccepted, Handling::Success);
310        }
311    }
312
313    /// Returns the data category of the event item in the envelope.
314    fn event_category(&self) -> Option<DataCategory> {
315        self.context.summary.event_category
316    }
317
318    /// Records rejection outcomes for all items stored in this context.
319    ///
320    /// This does not send outcomes for empty envelopes or request-only contexts.
321    pub fn reject(&mut self, outcome: Outcome) {
322        if self.context.done {
323            return;
324        }
325
326        // Errors are only logged for what we consider failed request handling. In other cases, we
327        // "expect" errors and log them as debug level.
328        let handling = Handling::from_outcome(&outcome);
329        match handling {
330            Handling::Success => relay_log::debug!("dropped envelope: {outcome}"),
331            Handling::Failure => {
332                let summary = &self.context.summary;
333
334                relay_log::error!(
335                    tags.project_key = self.scoping().project_key.to_string(),
336                    tags.has_attachments = summary.attachment_quantity > 0,
337                    tags.has_sessions = summary.session_quantity > 0,
338                    tags.has_profiles = summary.profile_quantity > 0,
339                    tags.has_transactions = summary.secondary_transaction_quantity > 0,
340                    tags.has_span_metrics = summary.secondary_span_quantity > 0,
341                    tags.has_replays = summary.replay_quantity > 0,
342                    tags.has_user_reports = summary.user_report_quantity > 0,
343                    tags.has_checkins = summary.monitor_quantity > 0,
344                    tags.event_category = ?summary.event_category,
345                    cached_summary = ?summary,
346                    recomputed_summary = ?EnvelopeSummary::compute(self.envelope()),
347                    "dropped envelope: {outcome}"
348                );
349            }
350        }
351
352        if let Some(category) = self.event_category() {
353            if let Some(category) = category.index_category() {
354                self.track_outcome(outcome.clone(), category, 1);
355            }
356            self.track_outcome(outcome.clone(), category, 1);
357        }
358
359        if self.context.summary.attachment_quantity > 0 {
360            self.track_outcome(
361                outcome.clone(),
362                DataCategory::Attachment,
363                self.context.summary.attachment_quantity,
364            );
365        }
366
367        if self.context.summary.monitor_quantity > 0 {
368            self.track_outcome(
369                outcome.clone(),
370                DataCategory::Monitor,
371                self.context.summary.monitor_quantity,
372            );
373        }
374
375        if self.context.summary.profile_quantity > 0 {
376            self.track_outcome(
377                outcome.clone(),
378                DataCategory::Profile,
379                self.context.summary.profile_quantity,
380            );
381            self.track_outcome(
382                outcome.clone(),
383                DataCategory::ProfileIndexed,
384                self.context.summary.profile_quantity,
385            );
386        }
387
388        if self.context.summary.span_quantity > 0 {
389            self.track_outcome(
390                outcome.clone(),
391                DataCategory::Span,
392                self.context.summary.span_quantity,
393            );
394            self.track_outcome(
395                outcome.clone(),
396                DataCategory::SpanIndexed,
397                self.context.summary.span_quantity,
398            );
399        }
400
401        if self.context.summary.log_item_quantity > 0 {
402            self.track_outcome(
403                outcome.clone(),
404                DataCategory::LogItem,
405                self.context.summary.log_item_quantity,
406            );
407        }
408        if self.context.summary.log_byte_quantity > 0 {
409            self.track_outcome(
410                outcome.clone(),
411                DataCategory::LogByte,
412                self.context.summary.log_byte_quantity,
413            );
414        }
415
416        // Track outcomes for attached secondary transactions, e.g. extracted from metrics.
417        //
418        // Primary transaction count is already tracked through the event category
419        // (see: `Self::event_category()`).
420        if self.context.summary.secondary_transaction_quantity > 0 {
421            self.track_outcome(
422                outcome.clone(),
423                // Secondary transaction counts are never indexed transactions
424                DataCategory::Transaction,
425                self.context.summary.secondary_transaction_quantity,
426            );
427        }
428
429        // Track outcomes for attached secondary spans, e.g. extracted from metrics.
430        //
431        // Primary span count is already tracked through `SpanIndexed`.
432        if self.context.summary.secondary_span_quantity > 0 {
433            self.track_outcome(
434                outcome.clone(),
435                // Secondary transaction counts are never indexed transactions
436                DataCategory::Span,
437                self.context.summary.secondary_span_quantity,
438            );
439        }
440
441        if self.context.summary.replay_quantity > 0 {
442            self.track_outcome(
443                outcome.clone(),
444                DataCategory::Replay,
445                self.context.summary.replay_quantity,
446            );
447        }
448
449        // Track outcomes for user reports, the legacy item type for user feedback.
450        //
451        // User reports are not events, but count toward UserReportV2 for quotas and outcomes.
452        if self.context.summary.user_report_quantity > 0 {
453            self.track_outcome(
454                outcome.clone(),
455                DataCategory::UserReportV2,
456                self.context.summary.user_report_quantity,
457            );
458        }
459
460        if self.context.summary.profile_chunk_quantity > 0 {
461            self.track_outcome(
462                outcome.clone(),
463                DataCategory::ProfileChunk,
464                self.context.summary.profile_chunk_quantity,
465            );
466        }
467
468        if self.context.summary.profile_chunk_ui_quantity > 0 {
469            self.track_outcome(
470                outcome.clone(),
471                DataCategory::ProfileChunkUi,
472                self.context.summary.profile_chunk_ui_quantity,
473            );
474        }
475
476        if self.context.summary.session_quantity > 0 {
477            self.track_outcome(
478                outcome.clone(),
479                DataCategory::Session,
480                self.context.summary.session_quantity,
481            );
482        }
483
484        self.finish(RelayCounters::EnvelopeRejected, handling);
485    }
486
487    /// Returns scoping stored in this context.
488    pub fn scoping(&self) -> Scoping {
489        self.context.scoping
490    }
491
492    /// Returns the partition key, which is set on upstream requests in the `X-Sentry-Relay-Shard` header.
493    pub fn partition_key(&self) -> Option<u32> {
494        self.context.partition_key
495    }
496
497    /// Sets a new [`Self::partition_key`].
498    pub fn set_partition_key(&mut self, partition_key: Option<u32>) -> &mut Self {
499        self.context.partition_key = partition_key;
500        self
501    }
502
503    /// Returns the contained original request meta.
504    pub fn meta(&self) -> &RequestMeta {
505        self.envelope().meta()
506    }
507
508    /// Returns estimated size of this envelope.
509    ///
510    /// This is just an estimated size, which in reality can be somewhat bigger, depending on the
511    /// list of additional attributes allocated on all of the inner types.
512    ///
513    /// NOTE: Current implementation counts in only the size of the items payload and stack
514    /// allocated parts of [`Envelope`] and [`ManagedEnvelope`]. All the heap allocated fields
515    /// within early mentioned types are skipped.
516    pub fn estimated_size(&self) -> usize {
517        // Always round it up to next 1KB.
518        (f64::ceil(
519            (self.context.summary.payload_size + size_of::<Self>() + size_of::<Envelope>()) as f64
520                / 1000.,
521        ) * 1000.) as usize
522    }
523
524    /// Returns the time at which the envelope was received at this Relay.
525    ///
526    /// This is the date time equivalent to [`start_time`](Self::received_at).
527    pub fn received_at(&self) -> DateTime<Utc> {
528        self.envelope.received_at()
529    }
530
531    /// Returns the time elapsed in seconds since the envelope was received by this Relay.
532    ///
533    /// In case the elapsed time is negative, it is assumed that no time elapsed.
534    pub fn age(&self) -> Duration {
535        self.envelope.age()
536    }
537
538    /// Escape hatch for the [`super::Managed`] type, to make it possible to construct
539    /// from a managed envelope.
540    pub(super) fn outcome_aggregator(&self) -> &Addr<TrackOutcome> {
541        &self.outcome_aggregator
542    }
543
544    /// Resets inner state to ensure there's no more logging.
545    fn finish(&mut self, counter: RelayCounters, handling: Handling) {
546        relay_statsd::metric!(counter(counter) += 1, handling = handling.as_str());
547        relay_statsd::metric!(timer(RelayTimers::EnvelopeTotalTime) = self.age());
548
549        self.context.done = true;
550    }
551}
552
553impl Drop for ManagedEnvelope {
554    fn drop(&mut self) {
555        self.reject(Outcome::Invalid(DiscardReason::Internal));
556    }
557}
558
559impl<G> From<TypedEnvelope<G>> for ManagedEnvelope {
560    fn from(value: TypedEnvelope<G>) -> Self {
561        value.0
562    }
563}
564
565#[cfg(test)]
566mod tests {
567    use super::*;
568    use bytes::Bytes;
569
570    #[test]
571    fn span_metrics_are_reported() {
572        let bytes =
573            Bytes::from(r#"{"dsn":"https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"}"#);
574        let envelope = Envelope::parse_bytes(bytes).unwrap();
575
576        let (outcome_aggregator, mut rx) = Addr::custom();
577        let mut env = ManagedEnvelope::new(envelope, outcome_aggregator);
578        env.context.summary.span_quantity = 123;
579        env.context.summary.secondary_span_quantity = 456;
580
581        env.reject(Outcome::Abuse);
582
583        rx.close();
584
585        let outcome = rx.blocking_recv().unwrap();
586        assert_eq!(outcome.category, DataCategory::Span);
587        assert_eq!(outcome.quantity, 123);
588        assert_eq!(outcome.outcome, Outcome::Abuse);
589
590        let outcome = rx.blocking_recv().unwrap();
591        assert_eq!(outcome.category, DataCategory::SpanIndexed);
592        assert_eq!(outcome.quantity, 123);
593        assert_eq!(outcome.outcome, Outcome::Abuse);
594
595        let outcome = rx.blocking_recv().unwrap();
596        assert_eq!(outcome.category, DataCategory::Span);
597        assert_eq!(outcome.quantity, 456);
598        assert_eq!(outcome.outcome, Outcome::Abuse);
599
600        assert!(rx.blocking_recv().is_none());
601    }
602}