relay_server/utils/
managed_envelope.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
//! Envelope context type and helpers to ensure outcomes.

use std::fmt::{Debug, Display};
use std::marker::PhantomData;
use std::mem::size_of;
use std::ops::{Deref, DerefMut};
use std::time::Duration;

use chrono::{DateTime, Utc};
use relay_quotas::{DataCategory, Scoping};
use relay_system::Addr;

use crate::envelope::{CountFor, Envelope, Item};
use crate::extractors::RequestMeta;
use crate::services::outcome::{DiscardReason, Outcome, TrackOutcome};
use crate::services::processor::{Processed, ProcessingGroup};
use crate::services::test_store::{Capture, TestStore};
use crate::statsd::{RelayCounters, RelayTimers};
use crate::utils::EnvelopeSummary;

/// Denotes the success of handling an envelope.
#[derive(Clone, Copy, Debug)]
enum Handling {
    /// The envelope was handled successfully.
    ///
    /// This can be the case even if the envelope was dropped. For example, if a rate limit is in
    /// effect or if the corresponding project is disabled.
    Success,
    /// Handling the envelope failed due to an error or bug.
    Failure,
}

impl Handling {
    fn from_outcome(outcome: &Outcome) -> Self {
        if outcome.is_unexpected() {
            Self::Failure
        } else {
            Self::Success
        }
    }

    fn as_str(&self) -> &str {
        match self {
            Handling::Success => "success",
            Handling::Failure => "failure",
        }
    }
}

/// Represents the decision on whether or not to keep an envelope item.
///
/// Returned by the callback passed to [`ManagedEnvelope::retain_items`].
pub enum ItemAction {
    /// Keep the item.
    Keep,
    /// Drop the item and log the given outcome for it.
    Drop(Outcome),
    /// Drop the item without logging an outcome.
    DropSilently,
}

/// Bookkeeping state that a [`ManagedEnvelope`] keeps next to its envelope.
#[derive(Debug)]
struct EnvelopeContext {
    /// Cached summary of the envelope contents; recomputed by [`ManagedEnvelope::update`].
    summary: EnvelopeSummary,
    /// Scoping attached to tracked outcomes; replaced by [`ManagedEnvelope::scope`].
    scoping: Scoping,
    /// Optional partition key; `None` until set via [`ManagedEnvelope::set_partition_key`].
    partition_key: Option<u32>,
    /// Set once the envelope is finished; `accept` and `reject` do nothing when this is `true`.
    done: bool,
    /// Processing group assigned when the managed envelope was created.
    group: ProcessingGroup,
}

/// Error returned when a [`ManagedEnvelope`] cannot be converted into a [`TypedEnvelope`].
///
/// The envelope that failed to convert is carried inside the error.
#[derive(Debug)]
pub struct InvalidProcessingGroupType(pub ManagedEnvelope);

impl Display for InvalidProcessingGroupType {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_fmt(format_args!(
            "failed to convert to the processing group {} based on the provided type",
            self.0.group().variant()
        ))
    }
}

// Marker impl: no trait methods are overridden, so the defaults of `std::error::Error` apply.
impl std::error::Error for InvalidProcessingGroupType {}

/// A wrapper for [`ManagedEnvelope`] with assigned processing group type.
///
/// The group type `G` exists only at the type level (via [`PhantomData`]); no value of `G` is
/// stored.
pub struct TypedEnvelope<G>(ManagedEnvelope, PhantomData<G>);

impl<G> TypedEnvelope<G> {
    /// Changes the typed of the current envelope to processed.
    ///
    /// Once it's marked processed it can be submitted to upstream.
    pub fn into_processed(self) -> TypedEnvelope<Processed> {
        TypedEnvelope::new(self.0, Processed)
    }

    /// Accepts the envelope and drops the internal managed envelope with its context.
    ///
    /// This should be called if the envelope has been accepted by the upstream, which means that
    /// the responsibility for logging outcomes has been moved. This function will not log any
    /// outcomes.
    pub fn accept(self) {
        self.0.accept()
    }

    /// Creates a new typed envelope.
    ///
    /// Note: this method is private to make sure that only `TryFrom` implementation is used, which
    /// requires the check for the error if conversion is failing.
    fn new(managed_envelope: ManagedEnvelope, _ty: G) -> Self {
        Self(managed_envelope, PhantomData::<G> {})
    }
}

impl<G: TryFrom<ProcessingGroup>> TryFrom<ManagedEnvelope> for TypedEnvelope<G> {
    type Error = InvalidProcessingGroupType;

    /// Tags the envelope with `G` if its processing group converts into `G`.
    ///
    /// On failure the envelope is handed back inside the error.
    fn try_from(value: ManagedEnvelope) -> Result<Self, Self::Error> {
        let converted: Result<G, _> = G::try_from(value.group());
        if let Ok(group) = converted {
            return Ok(Self::new(value, group));
        }
        Err(InvalidProcessingGroupType(value))
    }
}

impl<G> Debug for TypedEnvelope<G> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_tuple("TypedEnvelope").field(&self.0).finish()
    }
}

impl<G> Deref for TypedEnvelope<G> {
    type Target = ManagedEnvelope;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl<G> DerefMut for TypedEnvelope<G> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.0
    }
}

/// Tracks the lifetime of an [`Envelope`] in Relay.
///
/// The managed envelope accompanies envelopes through the processing pipeline in Relay and ensures
/// that outcomes are recorded when the Envelope is dropped. They can be dropped in one of three
/// ways:
///
///  - By calling [`accept`](Self::accept). Responsibility of the envelope has been transferred to
///    another service, and no further outcomes need to be recorded.
///  - By calling [`reject`](Self::reject). The entire envelope was dropped, and the outcome
///    specifies the reason.
///  - By dropping the managed envelope. This indicates an issue or a bug and raises the
///    `"internal"` outcome. There should be additional error handling to report an error to Sentry.
///
/// The managed envelope also holds a processing queue permit which is used for backpressure
/// management. It is automatically reclaimed when the context is dropped along with the envelope.
// NOTE(review): no permit field is declared on this struct; the paragraph above may be outdated.
#[derive(Debug)]
pub struct ManagedEnvelope {
    /// The envelope being tracked.
    envelope: Box<Envelope>,
    /// Cached summary, scoping and completion state for the envelope.
    context: EnvelopeContext,
    /// Recipient of the [`TrackOutcome`] messages emitted for this envelope.
    outcome_aggregator: Addr<TrackOutcome>,
    /// Recipient of the [`Capture`] message sent when the envelope is rejected.
    test_store: Addr<TestStore>,
}

impl ManagedEnvelope {
    /// Wraps `envelope` in a managed envelope assigned to the given processing `group`.
    ///
    /// The summary is computed from the envelope, and the initial scoping is the partial scoping
    /// of the envelope's request meta. To provide additional scoping, use
    /// [`ManagedEnvelope::scope`].
    pub fn new(
        envelope: Box<Envelope>,
        outcome_aggregator: Addr<TrackOutcome>,
        test_store: Addr<TestStore>,
        group: ProcessingGroup,
    ) -> Self {
        let summary = EnvelopeSummary::compute(envelope.as_ref());
        let scoping = envelope.meta().get_partial_scoping();

        // A fresh context has no partition key and is not done yet.
        let context = EnvelopeContext {
            summary,
            scoping,
            partition_key: None,
            done: false,
            group,
        };

        Self {
            envelope,
            context,
            outcome_aggregator,
            test_store,
        }
    }

    /// Creates an ungrouped managed envelope that is already marked as done.
    ///
    /// Because the `done` flag is set, [`accept`](Self::accept), [`reject`](Self::reject) and
    /// dropping the value do not track anything. Only available in tests.
    #[cfg(test)]
    pub fn untracked(
        envelope: Box<Envelope>,
        outcome_aggregator: Addr<TrackOutcome>,
        test_store: Addr<TestStore>,
    ) -> Self {
        let group = ProcessingGroup::Ungrouped;
        let mut managed = Self::new(envelope, outcome_aggregator, test_store, group);
        managed.context.done = true;
        managed
    }

    /// Borrows the wrapped [`Envelope`].
    pub fn envelope(&self) -> &Envelope {
        &self.envelope
    }

    /// Returns the [`ProcessingGroup`] this envelope belongs to.
    pub fn group(&self) -> ProcessingGroup {
        self.context.group
    }

    /// Mutably borrows the wrapped [`Envelope`].
    ///
    /// This does not refresh the cached summary; see [`update`](Self::update).
    pub fn envelope_mut(&mut self) -> &mut Envelope {
        &mut self.envelope
    }

    /// Consumes the managed envelope and returns the inner [`Envelope`].
    ///
    /// The context is marked as done first, so dropping `self` afterwards tracks no outcomes.
    pub fn into_envelope(mut self) -> Box<Envelope> {
        self.context.done = true;
        Box::new(self.envelope.take_items())
    }

    /// Wraps this managed envelope in a [`TypedEnvelope`] tagged as [`Processed`].
    ///
    /// A processed envelope can be submitted to the upstream.
    pub fn into_processed(self) -> TypedEnvelope<Processed> {
        TypedEnvelope::<Processed>::new(self, Processed)
    }

    /// Takes the envelope out of the context, leaving a dummy behind.
    ///
    /// Note that the cached summary is not recomputed here, so it is incorrect afterwards.
    pub(crate) fn take_envelope(&mut self) -> Box<Envelope> {
        let taken = self.envelope.take_items();
        Box::new(taken)
    }

    /// Refreshes the cached summary from the current contents of the envelope.
    ///
    /// Returns `self` to allow chaining.
    pub fn update(&mut self) -> &mut Self {
        let summary = EnvelopeSummary::compute(self.envelope.as_ref());
        self.context.summary = summary;
        self
    }

    /// Keeps or removes each item according to the [`ItemAction`] returned by `f`.
    ///
    /// For items dropped with [`ItemAction::Drop`], one outcome is tracked per quantity reported
    /// by the item, plus one for the indexed counterpart of the category when there is one.
    /// Outcomes are collected during the pass and tracked once it has finished.
    ///
    /// This method operates in place and preserves the order of the retained items.
    pub fn retain_items<F>(&mut self, mut f: F)
    where
        F: FnMut(&mut Item) -> ItemAction,
    {
        let mut dropped = Vec::new();

        self.envelope.retain_items(|item| {
            let outcome = match f(item) {
                ItemAction::Keep => return true,
                ItemAction::DropSilently => return false,
                ItemAction::Drop(outcome) => outcome,
            };

            for (category, quantity) in item.quantities(CountFor::Outcomes) {
                // The indexed counterpart is recorded before the category itself.
                if let Some(indexed) = category.index_category() {
                    dropped.push((outcome.clone(), indexed, quantity));
                }
                dropped.push((outcome.clone(), category, quantity));
            }

            false
        });

        for (outcome, category, quantity) in dropped {
            self.track_outcome(outcome, category, quantity);
        }
        // TODO: once `update` is private, it should be called here.
    }

    /// Drops every item in the envelope.
    ///
    /// This method does not track any outcomes and does not recompute the cached summary.
    pub fn drop_items_silently(&mut self) {
        self.envelope.drop_items_silently();
    }

    /// Re-scopes this context to the given scoping.
    ///
    /// Outcomes tracked afterwards carry the new scoping. Returns `self` to allow chaining.
    pub fn scope(&mut self, scoping: Scoping) -> &mut Self {
        self.context.scoping = scoping;
        self
    }

    /// Removes the event-creating item(s) and tracks `outcome` for the event.
    ///
    /// Nothing happens if the cached summary has no event category. Otherwise one outcome is
    /// tracked for the indexed counterpart of the category (if there is one), followed by one for
    /// the category itself.
    ///
    /// Note: This function relies on the envelope summary being correct.
    pub fn reject_event(&mut self, outcome: Outcome) {
        let event_category = match self.event_category() {
            Some(category) => category,
            None => return,
        };

        self.envelope.retain_items(|item| !item.creates_event());

        if let Some(indexed) = event_category.index_category() {
            self.track_outcome(outcome.clone(), indexed, 1);
        }
        self.track_outcome(outcome, event_category, 1);
    }

    /// Records an outcome scoped to this envelope's context.
    ///
    /// This managed envelope should be updated using [`update`](Self::update) soon after this
    /// operation to ensure that subsequent outcomes are consistent.
    pub fn track_outcome(&self, outcome: Outcome, category: DataCategory, quantity: usize) {
        self.outcome_aggregator.send(TrackOutcome {
            timestamp: self.received_at(),
            scoping: self.context.scoping,
            outcome,
            event_id: self.envelope.event_id(),
            remote_addr: self.meta().remote_addr(),
            category,
            // Quantities are usually `usize` which lets us go all the way to 64-bit on our
            // machines, but the protocol and data store can only do 32-bit.
            quantity: quantity as u32,
        });
    }

    /// Accepts the envelope and drops the context.
    ///
    /// Call this once the upstream has accepted the envelope and the responsibility for logging
    /// outcomes has moved on. No outcomes are logged by this function. If the envelope is
    /// already done, nothing is recorded at all.
    pub fn accept(mut self) {
        if self.context.done {
            return;
        }

        self.finish(RelayCounters::EnvelopeAccepted, Handling::Success);
    }

    /// Returns the data category of the event item in the envelope.
    ///
    /// The value is read from the cached summary, so it reflects the envelope as of the last
    /// time the summary was computed.
    fn event_category(&self) -> Option<DataCategory> {
        self.context.summary.event_category
    }

    /// Rejects the whole envelope and tracks `outcome` for everything in the cached summary.
    ///
    /// Does nothing if the envelope is already done. Otherwise this logs the rejection, notifies
    /// the test store, tracks one outcome per non-empty category and finally marks the envelope
    /// as done. Categories with a quantity of zero produce no outcome, so nothing is tracked for
    /// empty envelopes or request-only contexts.
    pub fn reject(&mut self, outcome: Outcome) {
        if self.context.done {
            return;
        }

        // Errors are only logged for what we consider failed request handling. All other
        // outcomes are "expected" and logged at debug level.
        let handling = Handling::from_outcome(&outcome);
        match handling {
            Handling::Failure => {
                let summary = &self.context.summary;

                relay_log::error!(
                    tags.project_key = self.scoping().project_key.to_string(),
                    tags.has_attachments = summary.attachment_quantity > 0,
                    tags.has_sessions = summary.session_quantity > 0,
                    tags.has_profiles = summary.profile_quantity > 0,
                    tags.has_transactions = summary.secondary_transaction_quantity > 0,
                    tags.has_span_metrics = summary.secondary_span_quantity > 0,
                    tags.has_replays = summary.replay_quantity > 0,
                    tags.has_checkins = summary.monitor_quantity > 0,
                    tags.event_category = ?summary.event_category,
                    cached_summary = ?summary,
                    recomputed_summary = ?EnvelopeSummary::compute(self.envelope()),
                    "dropped envelope: {outcome}"
                );
            }
            Handling::Success => relay_log::debug!("dropped envelope: {outcome}"),
        }

        // TODO: This could be optimized with Capture::should_capture
        let event_id = self.envelope.event_id();
        self.test_store.send(Capture::rejected(event_id, &outcome));

        // The event counts once in its own category and, if there is one, once in the indexed
        // counterpart. The indexed outcome is tracked first.
        if let Some(event_category) = self.event_category() {
            if let Some(indexed) = event_category.index_category() {
                self.track_outcome(outcome.clone(), indexed, 1);
            }
            self.track_outcome(outcome.clone(), event_category, 1);
        }

        // Remaining categories, listed in the order in which their outcomes are tracked.
        let summary = &self.context.summary;
        let quantities = [
            (DataCategory::Attachment, summary.attachment_quantity),
            // Profiles and primary spans count in both the plain and the indexed category.
            (DataCategory::Profile, summary.profile_quantity),
            (DataCategory::ProfileIndexed, summary.profile_quantity),
            (DataCategory::Span, summary.span_quantity),
            (DataCategory::SpanIndexed, summary.span_quantity),
            // Secondary transactions, e.g. extracted from metrics, are never indexed
            // transactions. The primary transaction is already covered by the event category.
            (
                DataCategory::Transaction,
                summary.secondary_transaction_quantity,
            ),
            // Secondary spans, e.g. extracted from metrics, are never indexed spans.
            (DataCategory::Span, summary.secondary_span_quantity),
            (DataCategory::Replay, summary.replay_quantity),
            (DataCategory::ProfileChunk, summary.profile_chunk_quantity),
        ];

        for (category, quantity) in quantities {
            if quantity > 0 {
                self.track_outcome(outcome.clone(), category, quantity);
            }
        }

        self.finish(RelayCounters::EnvelopeRejected, handling);
    }

    /// Returns the scoping currently stored in this context.
    pub fn scoping(&self) -> Scoping {
        self.context.scoping
    }

    /// Returns the partition key stored in this context, or `None` if none has been set.
    pub fn partition_key(&self) -> Option<u32> {
        self.context.partition_key
    }

    /// Stores the given partition key in this context, replacing any previous value.
    ///
    /// Returns `self` to allow chaining.
    pub fn set_partition_key(&mut self, partition_key: Option<u32>) -> &mut Self {
        self.context.partition_key = partition_key;
        self
    }

    /// Returns the request meta of the wrapped envelope.
    pub fn meta(&self) -> &RequestMeta {
        self.envelope().meta()
    }

    /// Returns the estimated size of this envelope in bytes, rounded up to a multiple of 1000.
    ///
    /// This is just an estimated size, which in reality can be somewhat bigger, depending on the
    /// list of additional attributes allocated on all of the inner types.
    ///
    /// NOTE: The current implementation only counts the payload size from the cached summary and
    /// the stack allocated parts of [`Envelope`] and [`ManagedEnvelope`]. All heap allocated
    /// fields within those types are skipped.
    pub fn estimated_size(&self) -> usize {
        // Granularity of the estimate in bytes.
        const ROUND_TO: usize = 1000;

        let raw = self.context.summary.payload_size + size_of::<Self>() + size_of::<Envelope>();

        // Always round up to the next 1KB. Integer arithmetic keeps the result exact, where a
        // round-trip through `f64` needs lossy casts and loses precision for very large sizes.
        match raw % ROUND_TO {
            0 => raw,
            remainder => raw + (ROUND_TO - remainder),
        }
    }

    /// Returns the time at which the envelope was received at this Relay.
    ///
    /// The value is read from the wrapped envelope. It is also used as the timestamp of tracked
    /// outcomes.
    pub fn received_at(&self) -> DateTime<Utc> {
        self.envelope.received_at()
    }

    /// Returns the time elapsed since the envelope was received by this Relay.
    ///
    /// The value is a [`Duration`] obtained from the wrapped envelope. In case the elapsed time
    /// is negative, it is assumed that no time elapsed.
    pub fn age(&self) -> Duration {
        self.envelope.age()
    }

    /// Emits the final metrics for this envelope and marks the context as done.
    ///
    /// Increments `counter` tagged with the `handling` label, records the envelope's age as
    /// [`RelayTimers::EnvelopeTotalTime`], and sets the `done` flag so that later calls to
    /// `accept` or `reject` (including the one made on drop) do nothing.
    fn finish(&mut self, counter: RelayCounters, handling: Handling) {
        relay_statsd::metric!(counter(counter) += 1, handling = handling.as_str());
        relay_statsd::metric!(timer(RelayTimers::EnvelopeTotalTime) = self.age());

        self.context.done = true;
    }
}

impl Drop for ManagedEnvelope {
    /// Rejects the envelope with an internal outcome unless it has already been finished.
    ///
    /// `reject` returns early when the context is marked as done, so envelopes that were
    /// accepted, rejected or converted with `into_envelope` track nothing here.
    fn drop(&mut self) {
        self.reject(Outcome::Invalid(DiscardReason::Internal));
    }
}

impl<G> From<TypedEnvelope<G>> for ManagedEnvelope {
    fn from(value: TypedEnvelope<G>) -> Self {
        value.0
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;

    /// Rejecting an envelope reports primary spans in both span categories, followed by the
    /// secondary spans in the plain span category.
    #[test]
    fn span_metrics_are_reported() {
        let raw = r#"{"dsn":"https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"}"#;
        let envelope = Envelope::parse_bytes(Bytes::from(raw)).unwrap();

        let (test_store, _) = Addr::custom();
        let (outcome_aggregator, mut rx) = Addr::custom();
        let mut managed = ManagedEnvelope::new(
            envelope,
            outcome_aggregator,
            test_store,
            ProcessingGroup::Ungrouped,
        );
        managed.context.summary.span_quantity = 123;
        managed.context.summary.secondary_span_quantity = 456;

        managed.reject(Outcome::Abuse);

        rx.close();

        // Outcomes are expected in exactly this order.
        let expected = [
            (DataCategory::Span, 123),
            (DataCategory::SpanIndexed, 123),
            (DataCategory::Span, 456),
        ];
        for (category, quantity) in expected {
            let tracked = rx.blocking_recv().unwrap();
            assert_eq!(tracked.category, category);
            assert_eq!(tracked.quantity, quantity);
            assert_eq!(tracked.outcome, Outcome::Abuse);
        }

        assert!(rx.blocking_recv().is_none());
    }
}