Skip to main content

relay_server/managed/
managed.rs

1#[cfg(debug_assertions)]
2use std::collections::BTreeMap;
3use std::convert::Infallible;
4use std::fmt;
5use std::iter::FusedIterator;
6use std::mem::ManuallyDrop;
7use std::net::IpAddr;
8use std::sync::Arc;
9use std::sync::atomic::{AtomicBool, Ordering};
10
11use chrono::{DateTime, Utc};
12use itertools::Either;
13use relay_event_schema::protocol::EventId;
14use relay_quotas::{DataCategory, Scoping};
15use relay_system::Addr;
16use smallvec::SmallVec;
17
18use crate::Envelope;
19use crate::endpoints::common::BadStoreRequest;
20use crate::extractors::RequestMeta;
21use crate::managed::{Counted, ManagedEnvelope, Quantities};
22use crate::services::outcome::{DiscardReason, Outcome, TrackOutcome};
23use crate::services::processor::ProcessingError;
24
25#[cfg(debug_assertions)]
26mod debug;
27#[cfg(test)]
28mod test;
29
30#[cfg(test)]
31pub use self::test::*;
32
33/// An error which can be extracted into an outcome.
34pub trait OutcomeError {
35    /// Produced error, without attached outcome.
36    type Error;
37
38    /// Consumes the error and returns an outcome and [`Self::Error`].
39    ///
40    /// Returning a `None` outcome should discard the item(s) silently.
41    fn consume(self) -> (Option<Outcome>, Self::Error);
42}
43
44impl OutcomeError for Outcome {
45    type Error = ();
46
47    fn consume(self) -> (Option<Outcome>, Self::Error) {
48        (self, ()).consume()
49    }
50}
51
52impl OutcomeError for Option<Outcome> {
53    type Error = ();
54
55    fn consume(self) -> (Option<Outcome>, Self::Error) {
56        (self, ()).consume()
57    }
58}
59
60impl<E> OutcomeError for (Outcome, E) {
61    type Error = E;
62
63    fn consume(self) -> (Option<Outcome>, Self::Error) {
64        (Some(self.0), self.1)
65    }
66}
67
68impl<E> OutcomeError for (Option<Outcome>, E) {
69    type Error = E;
70
71    fn consume(self) -> (Option<Outcome>, Self::Error) {
72        self
73    }
74}
75
76impl OutcomeError for ProcessingError {
77    type Error = Self;
78
79    fn consume(self) -> (Option<Outcome>, Self::Error) {
80        (self.to_outcome(), self)
81    }
82}
83
84impl OutcomeError for Infallible {
85    type Error = Self;
86
87    fn consume(self) -> (Option<Outcome>, Self::Error) {
88        match self {}
89    }
90}
91
92impl OutcomeError for BadStoreRequest {
93    type Error = Self;
94
95    fn consume(self) -> (Option<Outcome>, Self) {
96        (self.to_outcome(), self)
97    }
98}
99
/// A wrapper type which ensures outcomes have been emitted for an error.
///
/// [`Managed`] wraps an error in [`Rejected`] once outcomes have been emitted for the managed
/// item. Holding a `Rejected` therefore proves that the bookkeeping for the item is complete and
/// only the error itself remains to be propagated.
#[derive(Debug, Clone, Copy)]
#[must_use = "a rejection must be propagated"]
pub struct Rejected<T>(T);
107
108impl<T> Rejected<T> {
109    /// Extracts the underlying error.
110    pub fn into_inner(self) -> T {
111        self.0
112    }
113
114    /// Maps the rejected error to a different error.
115    pub fn map<F, S>(self, f: F) -> Rejected<S>
116    where
117        F: FnOnce(T) -> S,
118    {
119        Rejected(f(self.0))
120    }
121}
122
123impl<T> std::error::Error for Rejected<T>
124where
125    T: std::error::Error,
126{
127    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
128        self.0.source()
129    }
130}
131
132impl<T> std::fmt::Display for Rejected<T>
133where
134    T: std::fmt::Display,
135{
136    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
137        self.0.fmt(f)
138    }
139}
140
141impl<T> axum::response::IntoResponse for Rejected<T>
142where
143    T: axum::response::IntoResponse,
144{
145    fn into_response(self) -> axum::response::Response {
146        self.0.into_response()
147    }
148}
149
150/// The [`Managed`] wrapper ensures outcomes are correctly emitted for the contained item.
151pub struct Managed<T: Counted> {
152    value: T,
153    meta: Arc<Meta>,
154    done: AtomicBool,
155}
156
157impl Managed<Box<Envelope>> {
158    /// Creates a managed instance from an unmanaged envelope.
159    pub fn from_envelope(envelope: Box<Envelope>, outcome_aggregator: Addr<TrackOutcome>) -> Self {
160        let meta = Arc::new(Meta {
161            outcome_aggregator,
162            received_at: envelope.received_at(),
163            scoping: envelope.meta().get_partial_scoping().into_scoping(),
164            event_id: envelope.event_id(),
165            remote_addr: envelope.meta().remote_addr(),
166        });
167
168        Self::from_parts(envelope, meta)
169    }
170}
171
/// In-place filtering over growable sequences, used by [`Managed::retain`].
///
/// Implemented for both `Vec` and `SmallVec` so the same retain logic serves either container.
pub trait RetainMut<I> {
    /// Keeps only the elements for which `f` returns `true`; `f` may mutate each element.
    fn retain_mut(&mut self, f: impl FnMut(&mut I) -> bool);
}
177
178impl<I> RetainMut<I> for Vec<I> {
179    fn retain_mut(&mut self, f: impl FnMut(&mut I) -> bool) {
180        Vec::retain_mut(self, f)
181    }
182}
183impl<I, const N: usize> RetainMut<I> for SmallVec<[I; N]> {
184    fn retain_mut(&mut self, f: impl FnMut(&mut I) -> bool) {
185        SmallVec::retain_mut(self, f)
186    }
187}
188
189impl<T: Counted> Managed<T> {
190    /// Creates new [`Managed`] instance with the provided `value` and metadata from a [`ManagedEnvelope`].
191    ///
192    /// The [`Managed`] instance, inherits all metadata from the passed [`ManagedEnvelope`],
193    /// like received time or scoping.
194    pub fn with_meta_from_managed_envelope(envelope: &ManagedEnvelope, value: T) -> Self {
195        Self::from_parts(
196            value,
197            Arc::new(Meta {
198                outcome_aggregator: envelope.outcome_aggregator().clone(),
199                received_at: envelope.received_at(),
200                scoping: envelope.scoping(),
201                event_id: envelope.envelope().event_id(),
202                remote_addr: envelope.meta().remote_addr(),
203            }),
204        )
205    }
206
207    /// Creates new [`Managed`] instance with the provided `value` and metadata from `request_meta`.
208    pub fn with_meta_from_request_meta(
209        request_meta: &RequestMeta,
210        outcome_aggregator: &Addr<TrackOutcome>,
211        value: T,
212    ) -> Self {
213        Self::from_parts(
214            value,
215            Arc::new(Meta {
216                outcome_aggregator: outcome_aggregator.clone(),
217                received_at: request_meta.received_at(),
218                scoping: request_meta.get_partial_scoping().into_scoping(),
219                event_id: None,
220                remote_addr: request_meta.remote_addr(),
221            }),
222        )
223    }
224
225    /// Creates another [`Managed`] instance, with a new value but shared metadata.
226    pub fn wrap<S>(&self, other: S) -> Managed<S>
227    where
228        S: Counted,
229    {
230        Managed::from_parts(other, Arc::clone(&self.meta))
231    }
232
233    /// Boxes the contained value.
234    pub fn boxed(self) -> Managed<Box<T>> {
235        self.map(|value, _| Box::new(value))
236    }
237
238    /// Original received timestamp.
239    pub fn received_at(&self) -> DateTime<Utc> {
240        self.meta.received_at
241    }
242
243    /// Scoping information stored in this context.
244    pub fn scoping(&self) -> Scoping {
245        self.meta.scoping
246    }
247
248    /// Updates the scoping stored in this context.
249    ///
250    /// Special care has to be taken when items contained in the managed instance also store a
251    /// scoping. Such a scoping will **not** be updated.
252    ///
253    /// Conversions between `Managed<Box<Envelope>>` and `ManagedEnvelope` transfer the scoping
254    /// correctly.
255    ///
256    /// See also: [`ManagedEnvelope::scope`].
257    pub fn scope(&mut self, scoping: Scoping) {
258        let meta = Arc::make_mut(&mut self.meta);
259        meta.scoping = scoping;
260    }
261
262    /// Merge [`Self`] with another [`Managed`] instance using a mapping function.
263    ///
264    /// The caller's closure is expected to merge `other`'s inner value into `self`'s inner value.
265    /// The outcome records of `self` are automatically offset by the records of `other`.
266    pub fn merge_with<S, F>(&mut self, other: Managed<S>, f: F)
267    where
268        S: Counted,
269        F: FnOnce(&mut T, S, &mut RecordKeeper),
270    {
271        self.modify(|s, records| {
272            for (category, quantity) in other.quantities() {
273                records.modify_by(category, quantity as isize);
274            }
275            other.accept(|o| f(s, o, records));
276        })
277    }
278
279    /// Splits [`Self`] into two other [`Managed`] items.
280    ///
281    /// The two resulting managed instances together are expected to have the same outcomes as the original instance..
282    /// Since splitting may introduce a new type of item, which some of the original
283    /// quantities are transferred to, there may be new additional data categories created.
284    pub fn split_once<F, S, U>(self, f: F) -> (Managed<S>, Managed<U>)
285    where
286        F: FnOnce(T, &mut RecordKeeper) -> (S, U),
287        S: Counted,
288        U: Counted,
289    {
290        debug_assert!(!self.is_done());
291
292        let (value, meta) = self.destructure();
293        let quantities = value.quantities();
294
295        let mut records = RecordKeeper::new(&meta, quantities);
296
297        let (a, b) = f(value, &mut records);
298
299        let mut quantities = a.quantities();
300        quantities.extend(b.quantities());
301        records.success(quantities);
302
303        (
304            Managed::from_parts(a, Arc::clone(&meta)),
305            Managed::from_parts(b, meta),
306        )
307    }
308
309    /// Splits [`Self`] into a variable amount if individual items.
310    ///
311    /// Useful when the current instance contains multiple items of the same type
312    /// and must be split into individually managed items.
313    pub fn split<F, I, S>(self, f: F) -> Split<I::IntoIter, I::Item>
314    where
315        F: FnOnce(T) -> I,
316        I: IntoIterator<Item = S>,
317        S: Counted,
318    {
319        self.split_with_context(|value| (f(value), ())).0
320    }
321
322    /// Splits [`Self`] into a variable amount if individual items.
323    ///
324    /// Like [`Self::split`] but also allows returning an untracked context,
325    /// a way of returning additional data when deconstructing the original item.
326    pub fn split_with_context<F, I, S, C>(self, f: F) -> (Split<I::IntoIter, I::Item>, C)
327    where
328        F: FnOnce(T) -> (I, C),
329        I: IntoIterator<Item = S>,
330        S: Counted,
331    {
332        debug_assert!(!self.is_done());
333
334        let (value, meta) = self.destructure();
335        #[cfg(debug_assertions)]
336        let quantities = value.quantities();
337
338        let (items, context) = f(value);
339
340        (
341            Split {
342                #[cfg(debug_assertions)]
343                quantities,
344                items: items.into_iter(),
345                meta,
346                exhausted: false,
347            },
348            context,
349        )
350    }
351
352    /// Filters individual items and emits outcomes for them if they are removed.
353    ///
354    /// This is particularly useful when the managed instance is a container of individual items,
355    /// which need to be processed or filtered on a case by case basis.
356    ///
357    /// # Examples:
358    ///
359    /// ```
360    /// # use relay_server::managed::{Counted, Managed, Quantities};
361    /// # #[derive(Copy, Clone)]
362    /// # struct Context<'a>(&'a u32);
363    /// # struct Item;
364    /// struct Items {
365    ///     items: Vec<Item>,
366    /// }
367    /// # impl Counted for Items {
368    /// #   fn quantities(&self) -> Quantities {
369    /// #       todo!()
370    /// #   }
371    /// # }
372    /// # impl Counted for Item {
373    /// #   fn quantities(&self) -> Quantities {
374    /// #       todo!()
375    /// #   }
376    /// # }
377    /// # type Error = std::convert::Infallible;
378    ///
379    /// fn process_items(items: &mut Managed<Items>, ctx: Context<'_>) {
380    ///     items.retain(|items| &mut items.items, |item, _| process(item, ctx));
381    /// }
382    ///
383    /// fn process(item: &mut Item, ctx: Context<'_>) -> Result<(), Error> {
384    ///     todo!()
385    /// }
386    /// ```
387    pub fn retain<S, I, U, E, V>(&mut self, select: S, mut retain: U)
388    where
389        S: FnOnce(&mut T) -> &mut V,
390        I: Counted,
391        U: FnMut(&mut I, &mut RecordKeeper<'_>) -> Result<(), E>,
392        E: OutcomeError,
393        V: RetainMut<I>,
394    {
395        self.retain_with_context(
396            |inner| (select(inner), &()),
397            |item, _, records| retain(item, records),
398        );
399    }
400
401    /// Filters individual items and emits outcomes for them if they are removed.
402    ///
403    /// Like [`Self::retain`], but it allows for an additional context extracted from the managed
404    /// object passed to the retain function.
405    ///
406    /// # Examples:
407    ///
408    /// ```
409    /// # use relay_server::managed::{Counted, Managed, Quantities};
410    /// # #[derive(Copy, Clone)]
411    /// # struct Context<'a>(&'a u32);
412    /// # struct Item;
413    /// struct Items {
414    ///     ty: String,
415    ///     items: Vec<Item>,
416    /// }
417    /// # impl Counted for Items {
418    /// #   fn quantities(&self) -> Quantities {
419    /// #       todo!()
420    /// #   }
421    /// # }
422    /// # impl Counted for Item {
423    /// #   fn quantities(&self) -> Quantities {
424    /// #       todo!()
425    /// #   }
426    /// # }
427    /// # type Error = std::convert::Infallible;
428    ///
429    /// fn process_items(items: &mut Managed<Items>, ctx: Context<'_>) {
430    ///     items.retain_with_context(|items| (&mut items.items, &items.ty), |item, ty, _| process(item, ty, ctx));
431    /// }
432    ///
433    /// fn process(item: &mut Item, ty: &str, ctx: Context<'_>) -> Result<(), Error> {
434    ///     todo!()
435    /// }
436    /// ```
437    pub fn retain_with_context<S, C, I, U, E, V>(&mut self, select: S, mut retain: U)
438    where
439        // Returning `&'a C` here is not optimal, ideally we return C here and express the correct
440        // bound of `C: 'a` but this is, to my knowledge, currently not possible to express in stable Rust.
441        //
442        // This is unfortunately a bit limiting but for most of our purposes it is enough.
443        for<'a> S: FnOnce(&'a mut T) -> (&'a mut V, &'a C),
444        I: Counted,
445        U: FnMut(&mut I, &C, &mut RecordKeeper<'_>) -> Result<(), E>,
446        E: OutcomeError,
447        V: RetainMut<I>,
448    {
449        self.modify(|inner, records| {
450            let (items, ctx) = select(inner);
451            items.retain_mut(|item| match retain(item, ctx, records) {
452                Ok(()) => true,
453                Err(err) => {
454                    records.reject_err(err, &*item);
455                    false
456                }
457            })
458        });
459    }
460
461    /// Maps a [`Managed<T>`] to [`Managed<S>`] by applying the mapping function `f`.
462    ///
463    /// Like [`Self::try_map`] but not fallible.
464    pub fn map<S, F>(self, f: F) -> Managed<S>
465    where
466        F: FnOnce(T, &mut RecordKeeper) -> S,
467        S: Counted,
468    {
469        self.try_map(move |inner, records| Ok::<_, Infallible>(f(inner, records)))
470            .unwrap_or_else(|e| match e.0 {})
471    }
472
473    /// Maps a [`Managed<T>`] to [`Managed<S>`] by applying the mapping function `f`.
474    ///
475    /// The mapping function gets access to a [`RecordKeeper`], to emit outcomes for partial
476    /// discards.
477    ///
478    /// If the mapping function returns an error, the entire (original) [`Self`] is rejected,
479    /// no partial outcomes are emitted.
480    pub fn try_map<S, F, E>(self, f: F) -> Result<Managed<S>, Rejected<E::Error>>
481    where
482        F: FnOnce(T, &mut RecordKeeper) -> Result<S, E>,
483        S: Counted,
484        E: OutcomeError,
485    {
486        debug_assert!(!self.is_done());
487
488        let (value, meta) = self.destructure();
489        let quantities = value.quantities();
490
491        let mut records = RecordKeeper::new(&meta, quantities);
492
493        match f(value, &mut records) {
494            Ok(value) => {
495                records.success(value.quantities());
496                Ok(Managed::from_parts(value, meta))
497            }
498            Err(err) => Err(records.failure(err)),
499        }
500    }
501
502    /// Gives mutable access to the contained value to modify it.
503    ///
504    /// Like [`Self::try_modify`] but not fallible.
505    pub fn modify<F>(&mut self, f: F)
506    where
507        F: FnOnce(&mut T, &mut RecordKeeper),
508    {
509        self.try_modify(move |inner, records| {
510            f(inner, records);
511            Ok::<_, Infallible>(())
512        })
513        .unwrap_or_else(|e| match e {})
514    }
515
516    /// Gives mutable access to the contained value to modify it.
517    ///
518    /// The modifying function gets access to a [`RecordKeeper`], to emit outcomes for partial
519    /// discards.
520    ///
521    /// If the modifying function returns an error, the entire (original) [`Self`] is rejected,
522    /// no partial outcomes are emitted.
523    pub fn try_modify<F, E>(&mut self, f: F) -> Result<(), Rejected<E::Error>>
524    where
525        F: FnOnce(&mut T, &mut RecordKeeper) -> Result<(), E>,
526        E: OutcomeError,
527    {
528        debug_assert!(!self.is_done());
529
530        let quantities = self.value.quantities();
531        let mut records = RecordKeeper::new(&self.meta, quantities);
532
533        match f(&mut self.value, &mut records) {
534            Ok(()) => {
535                records.success(self.value.quantities());
536                Ok(())
537            }
538            Err(err) => {
539                let err = records.failure(err);
540                self.done.store(true, Ordering::Relaxed);
541                Err(err)
542            }
543        }
544    }
545
546    /// Accepts the item of this managed instance.
547    ///
548    /// This should be called if the item has been or is about to be accepted by the upstream, which means that
549    /// the responsibility for logging outcomes has been moved. This function will not log any
550    /// outcomes.
551    ///
552    /// Like [`Self::try_accept`], but infallible.
553    pub fn accept<F, S>(self, f: F) -> S
554    where
555        F: FnOnce(T) -> S,
556    {
557        self.try_accept(|item| Ok::<_, Infallible>(f(item)))
558            .unwrap_or_else(|err| match err.0 {})
559    }
560
561    /// Accepts the item of this managed instance.
562    ///
563    /// This should be called if the item has been or is about to be accepted by the upstream.
564    ///
565    /// Outcomes are only emitted when the accepting closure returns an error, which means that
566    /// in the success case the responsibility for logging outcomes has been moved to the
567    /// caller/upstream.
568    pub fn try_accept<F, S, E>(self, f: F) -> Result<S, Rejected<E::Error>>
569    where
570        F: FnOnce(T) -> Result<S, E>,
571        E: OutcomeError,
572    {
573        debug_assert!(!self.is_done());
574
575        let (value, meta) = self.destructure();
576        let records = RecordKeeper::new(&meta, value.quantities());
577
578        match f(value) {
579            Ok(value) => {
580                records.accept();
581                Ok(value)
582            }
583            Err(err) => Err(records.failure(err)),
584        }
585    }
586
587    /// Rejects the entire [`Managed`] instance with an internal error.
588    ///
589    /// Internal errors should be reserved for uses where logical invariants are violated.
590    /// Cases which should never happen and always indicate a logical bug.
591    ///
592    /// This function will panic in debug builds, but discard the item
593    /// with an internal discard reason in release builds.
594    #[track_caller]
595    pub fn internal_error(&self, reason: &'static str) -> Rejected<()> {
596        relay_log::error!("internal error: {reason}");
597        debug_assert!(false, "internal error: {reason}");
598        self.reject_err((Outcome::Invalid(DiscardReason::Internal), ()))
599    }
600
601    /// Rejects the entire [`Managed`] instance.
602    pub fn reject_err<E>(&self, error: E) -> Rejected<E::Error>
603    where
604        E: OutcomeError,
605    {
606        debug_assert!(!self.is_done());
607
608        let (outcome, error) = error.consume();
609        self.do_reject(outcome);
610        Rejected(error)
611    }
612
613    fn do_reject(&self, outcome: Option<Outcome>) {
614        // Always set the internal state to `done`, even if there is no outcome to be emitted.
615        // All bookkeeping has been done.
616        let is_done = self.done.fetch_or(true, Ordering::Relaxed);
617
618        // No outcome to emit, we're done.
619        let Some(outcome) = outcome else {
620            return;
621        };
622
623        // Only emit outcomes if we were not yet done.
624        //
625        // Callers should guard against accidentally calling `do_reject` when the `is_done` flag is
626        // already set, but internal uses (like `Drop`) can rely on this double emission
627        // prevention.
628        if !is_done {
629            for (category, quantity) in self.value.quantities() {
630                self.meta.track_outcome(outcome.clone(), category, quantity);
631            }
632        }
633    }
634
635    /// De-structures this managed instance into its own parts.
636    ///
637    /// While de-structured no outcomes will be emitted on drop.
638    ///
639    /// Currently no `Managed`, which already has outcomes emitted, should be de-structured
640    /// as this status is lost.
641    fn destructure(self) -> (T, Arc<Meta>) {
642        // SAFETY: this follows an approach mentioned in the RFC
643        // <https://github.com/rust-lang/rfcs/pull/3466> to move fields out of
644        // a type with a drop implementation.
645        //
646        // The original type is wrapped in a manual drop to prevent running the
647        // drop handler, afterwards all fields are moved out of the type.
648        //
649        // And the original type is forgotten, de-structuring the original type
650        // without running its drop implementation.
651        let this = ManuallyDrop::new(self);
652        let Managed { value, meta, done } = &*this;
653
654        let value = unsafe { std::ptr::read(value) };
655        let meta = unsafe { std::ptr::read(meta) };
656        let done = unsafe { std::ptr::read(done) };
657        // This is a current invariant, if we ever need to change the invariant,
658        // the done status should be preserved and returned instead.
659        debug_assert!(
660            !done.load(Ordering::Relaxed),
661            "a `done` managed should never be destructured"
662        );
663
664        (value, meta)
665    }
666
667    fn from_parts(value: T, meta: Arc<Meta>) -> Self {
668        Self {
669            value,
670            meta,
671            done: AtomicBool::new(false),
672        }
673    }
674
675    fn is_done(&self) -> bool {
676        self.done.load(Ordering::Relaxed)
677    }
678}
679
680impl<T: Counted> Drop for Managed<T> {
681    fn drop(&mut self) {
682        self.do_reject(Some(Outcome::Invalid(DiscardReason::Internal)));
683    }
684}
685
686impl<T: Counted + fmt::Debug> fmt::Debug for Managed<T> {
687    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
688        write!(f, "Managed<{}>[", std::any::type_name::<T>())?;
689        for (i, (category, quantity)) in self.value.quantities().iter().enumerate() {
690            if i > 0 {
691                write!(f, ",")?;
692            }
693            write!(f, "{category}:{quantity}")?;
694        }
695        write!(f, "](")?;
696        self.value.fmt(f)?;
697        write!(f, ")")
698    }
699}
700
701impl<T: Counted> Managed<Option<T>> {
702    /// Turns a managed option into an optional [`Managed`].
703    pub fn transpose(self) -> Option<Managed<T>> {
704        let (o, meta) = self.destructure();
705        o.map(|t| Managed::from_parts(t, meta))
706    }
707}
708
709impl<L: Counted, R: Counted> Managed<Either<L, R>> {
710    /// Turns a managed [`Either`] into an [`Either`] of [`Managed`].
711    pub fn transpose(self) -> Either<Managed<L>, Managed<R>> {
712        let (either, meta) = self.destructure();
713        match either {
714            Either::Left(value) => Either::Left(Managed::from_parts(value, meta)),
715            Either::Right(value) => Either::Right(Managed::from_parts(value, meta)),
716        }
717    }
718}
719
720impl From<Managed<Box<Envelope>>> for ManagedEnvelope {
721    fn from(value: Managed<Box<Envelope>>) -> Self {
722        let (value, meta) = value.destructure();
723        let mut envelope = ManagedEnvelope::new(value, meta.outcome_aggregator.clone());
724        envelope.scope(meta.scoping);
725        envelope
726    }
727}
728
729impl<T: Counted> AsRef<T> for Managed<T> {
730    fn as_ref(&self) -> &T {
731        &self.value
732    }
733}
734
735impl<T: Counted> std::ops::Deref for Managed<T> {
736    type Target = T;
737
738    fn deref(&self) -> &Self::Target {
739        &self.value
740    }
741}
742
743/// Internal metadata attached with a [`Managed`] instance.
744#[derive(Debug, Clone)]
745struct Meta {
746    /// Outcome aggregator service.
747    outcome_aggregator: Addr<TrackOutcome>,
748    /// Received timestamp, when the contained payload/information was received.
749    ///
750    /// See also: [`crate::extractors::RequestMeta::received_at`].
751    received_at: DateTime<Utc>,
752    /// Data scoping information of the contained item.
753    scoping: Scoping,
754    /// Optional event id associated with the contained data.
755    event_id: Option<EventId>,
756    /// Optional remote addr from where the data was received from.
757    remote_addr: Option<IpAddr>,
758}
759
760impl Meta {
761    pub fn track_outcome(&self, outcome: Outcome, category: DataCategory, quantity: usize) {
762        self.outcome_aggregator.send(TrackOutcome {
763            timestamp: self.received_at,
764            scoping: self.scoping,
765            outcome,
766            event_id: self.event_id,
767            remote_addr: self.remote_addr,
768            category,
769            quantity: quantity.try_into().unwrap_or(u32::MAX),
770        });
771    }
772}
773
774/// A record keeper makes sure modifications done on a [`Managed`] item are all accounted for
775/// correctly.
776pub struct RecordKeeper<'a> {
777    meta: &'a Meta,
778    on_drop: Quantities,
779    #[cfg(debug_assertions)]
780    lenient: SmallVec<[DataCategory; 1]>,
781    #[cfg(debug_assertions)]
782    modifications: BTreeMap<DataCategory, isize>,
783    in_flight: SmallVec<[(DataCategory, usize, Option<Outcome>); 2]>,
784}
785
786impl<'a> RecordKeeper<'a> {
787    fn new(meta: &'a Meta, quantities: Quantities) -> Self {
788        Self {
789            meta,
790            on_drop: quantities,
791            #[cfg(debug_assertions)]
792            lenient: Default::default(),
793            #[cfg(debug_assertions)]
794            modifications: Default::default(),
795            in_flight: Default::default(),
796        }
797    }
798
799    /// Marking a data category as lenient exempts this category from outcome quantity validations.
800    ///
801    /// Consider using [`Self::modify_by`] instead.
802    ///
803    /// This can be used in cases where the quantity is knowingly modified, which is quite common
804    /// for data categories which count bytes.
805    pub fn lenient(&mut self, category: DataCategory) {
806        let _category = category;
807        #[cfg(debug_assertions)]
808        self.lenient.push(_category);
809    }
810
811    /// Modifies the expected count for a category.
812    ///
813    /// When extracting payloads category counts may expectedly change, these changes can be
814    /// tracked using this function.
815    ///
816    /// Prefer using [`Self::modify_by`] over [`Self::lenient`] as lenient completely disables
817    /// validation for the entire category.
818    pub fn modify_by(&mut self, category: DataCategory, offset: isize) {
819        let _category = category;
820        let _offset = offset;
821        #[cfg(debug_assertions)]
822        {
823            *self.modifications.entry(_category).or_default() += offset;
824        }
825    }
826
827    /// Finalizes all records and emits the necessary outcomes.
828    ///
829    /// This uses the quantities of the original item.
830    fn failure<E>(mut self, error: E) -> Rejected<E::Error>
831    where
832        E: OutcomeError,
833    {
834        let (outcome, error) = error.consume();
835
836        if let Some(outcome) = outcome {
837            for (category, quantity) in std::mem::take(&mut self.on_drop) {
838                self.meta.track_outcome(outcome.clone(), category, quantity);
839            }
840        }
841
842        Rejected(error)
843    }
844
845    /// Finalizes all records and asserts that no additional outcomes have been tracked.
846    ///
847    /// Unlike [`Self::success`], this method does not allow for intermediate or partial outcomes,
848    /// it also does not verify any outcomes.
849    ///
850    /// This method is useful for using the record keeper to track failure outcomes, either
851    /// explicit failures or panics.
852    fn accept(mut self) {
853        debug_assert!(
854            self.in_flight.is_empty(),
855            "records accepted, but intermediate outcomes tracked"
856        );
857        self.on_drop.clear();
858    }
859
860    /// Finalizes all records and emits the created outcomes.
861    ///
862    /// This only emits the outcomes that have been explicitly registered.
863    /// In a debug build, the function also ensure no outcomes have been missed by comparing
864    /// quantities of the item before and after.
865    fn success(mut self, new: Quantities) {
866        let original = std::mem::take(&mut self.on_drop);
867        self.assert_quantities(original, new);
868
869        self.on_drop.clear();
870        for (category, quantity, outcome) in std::mem::take(&mut self.in_flight) {
871            if let Some(outcome) = outcome {
872                self.meta.track_outcome(outcome, category, quantity);
873            }
874        }
875    }
876
    /// Asserts that there have been no quantities lost.
    ///
    /// The original amount of quantities should match the new amount of quantities + all emitted
    /// outcomes.
    ///
    /// The check keeps a per-category tally seeded from `original` and then:
    /// 1. applies every signed offset recorded in `self.modifications`,
    /// 2. subtracts the quantity of every in-flight outcome,
    /// 3. subtracts the quantities of `new`,
    /// 4. reports every category whose tally is still positive as missing outcomes.
    ///
    /// # Panics
    ///
    /// On the first mismatch in a category not contained in `self.lenient`, the original, new,
    /// modification and in-flight state is logged and the function panics. Mismatches in
    /// lenient categories are only logged at debug level and the check continues.
    #[cfg(debug_assertions)]
    fn assert_quantities(&self, original: Quantities, new: Quantities) {
        // Reports a mismatch for `$category` with the given format arguments: lenient
        // categories are logged at debug level, all other categories log the full state and panic.
        macro_rules! emit {
            ($category:expr, $($tt:tt)*) => {{
                match self.lenient.contains(&$category) {
                    // Certain categories are known to be not always correct,
                    // they are logged instead.
                    true => relay_log::debug!($($tt)*),
                    false  => {
                        relay_log::error!("Original: {original:?}");
                        relay_log::error!("New: {new:?}");
                        relay_log::error!("Modifications: {:?}", self.modifications);
                        relay_log::error!("In Flight: {:?}", self.in_flight);
                        panic!($($tt)*)
                    }
                }
            }};
        }

        // Remaining quantity per category, starting from the original item.
        // NOTE(review): presumably a map keyed by `DataCategory` (it supports `entry`/`get_mut`).
        let mut sums = debug::Quantities::from(&original).0;
        // Apply the signed modifications. Categories that are missing start at the default
        // value; a result below zero is a mismatch and leaves the tally unchanged.
        for (category, offset) in &self.modifications {
            let v = sums.entry(*category).or_default();
            match v.checked_add_signed(*offset) {
                Some(result) => *v = result,
                None => emit!(
                    category,
                    "Attempted to modify original quantity {v} into the negative ({offset})"
                ),
            }
        }

        // Every in-flight outcome must be covered by the (modified) original quantities.
        for (category, quantity, outcome) in &self.in_flight {
            match sums.get_mut(category) {
                Some(c) if *c >= *quantity => *c -= *quantity,
                Some(c) => emit!(
                    category,
                    "Emitted {quantity} outcomes ({outcome:?}) for {category}, but there were only {c} items in the category originally"
                ),
                None => emit!(
                    category,
                    "Emitted {quantity} outcomes ({outcome:?}) for {category}, but there never was an item in this category"
                ),
            }
        }

        // Whatever the new item still carries must also fit into what is left.
        for (category, quantity) in &new {
            match sums.get_mut(category) {
                Some(c) if *c >= *quantity => *c -= *quantity,
                Some(c) => emit!(
                    category,
                    "New item has {quantity} items in category '{category}', but original (after emitted outcomes) only has {c} left"
                ),
                None => emit!(
                    category,
                    "New item has {quantity} items in category '{category}', but after emitted outcomes there are none left"
                ),
            }
        }

        // Any positive remainder was neither kept in the new item nor given an outcome.
        for (category, quantity) in sums {
            if quantity > 0 {
                emit!(
                    category,
                    "Missing outcomes or mismatched quantity in category '{category}', off by {quantity}"
                );
            }
        }
    }
949
950    #[cfg(not(debug_assertions))]
951    fn assert_quantities(&self, _: Quantities, _: Quantities) {}
952}
953
954impl<'a> Drop for RecordKeeper<'a> {
955    fn drop(&mut self) {
956        for (category, quantity) in std::mem::take(&mut self.on_drop) {
957            self.meta.track_outcome(
958                Outcome::Invalid(DiscardReason::Internal),
959                category,
960                quantity,
961            );
962        }
963    }
964}
965
966impl RecordKeeper<'_> {
967    /// Rejects an item if the passed result is an error and returns a default value.
968    ///
969    /// Similar to [`Self::reject_err`], this emits the necessary outcomes for an
970    /// item, if there is an error.
971    pub fn or_default<T, E, Q>(&mut self, r: Result<T, E>, q: Q) -> T
972    where
973        T: Default,
974        E: OutcomeError,
975        Q: Counted,
976    {
977        match r {
978            Ok(result) => result,
979            Err(err) => {
980                self.reject_err(err, q);
981                T::default()
982            }
983        }
984    }
985
986    /// Rejects an item with an error.
987    ///
988    /// Makes sure the correct outcomes are tracked for the item, that is discarded due to an
989    /// error.
990    pub fn reject_err<E, Q>(&mut self, err: E, q: Q) -> E::Error
991    where
992        E: OutcomeError,
993        Q: Counted,
994    {
995        let (outcome, err) = err.consume();
996        for (category, quantity) in q.quantities() {
997            self.in_flight.push((category, quantity, outcome.clone()))
998        }
999        err
1000    }
1001
1002    /// Rejects an item with an internal error.
1003    ///
1004    /// See also: [`Managed::internal_error`].
1005    #[track_caller]
1006    pub fn internal_error<E, Q>(&mut self, error: E, q: Q)
1007    where
1008        E: std::error::Error + 'static,
1009        Q: Counted,
1010    {
1011        relay_log::error!(error = &error as &dyn std::error::Error, "internal error");
1012        debug_assert!(false, "internal error: {error}");
1013        self.reject_err((Outcome::Invalid(DiscardReason::Internal), ()), q);
1014    }
1015}
1016
/// Iterator returned by [`Managed::split`].
///
/// Each item `S` produced by the wrapped iterator `I` is handed out as its own [`Managed`],
/// sharing this iterator's metadata. Items that are never handed out are reported with an
/// internal-error outcome when the `Split` is dropped before the wrapped iterator is exhausted.
pub struct Split<I, S>
where
    I: Iterator<Item = S>,
    S: Counted,
{
    /// Debug-only bookkeeping: quantities not yet accounted for by produced items.
    /// NOTE(review): presumably initialized from the item being split — confirm at the constructor.
    #[cfg(debug_assertions)]
    quantities: Quantities,
    /// The wrapped iterator producing the split items.
    items: I,
    /// Metadata cloned into every produced [`Managed`] and used to emit outcomes on drop.
    meta: Arc<Meta>,
    /// Set once the wrapped iterator has returned `None`.
    exhausted: bool,
}
1029
1030impl<I, S> Split<I, S>
1031where
1032    I: Iterator<Item = S>,
1033    S: Counted,
1034{
1035    /// Subtracts passed quantities from the total quantities to verify total quantity counts are
1036    /// matching.
1037    #[cfg(debug_assertions)]
1038    fn subtract(&mut self, q: Quantities) {
1039        for (category, quantities) in q {
1040            let Some(orig_quantities) = self
1041                .quantities
1042                .iter_mut()
1043                .find_map(|(c, q)| (*c == category).then_some(q))
1044            else {
1045                debug_assert!(
1046                    false,
1047                    "mismatching quantities, item split into category {category}, \
1048                    which originally was not present"
1049                );
1050                continue;
1051            };
1052
1053            if *orig_quantities >= quantities {
1054                *orig_quantities -= quantities;
1055            } else {
1056                debug_assert!(
1057                    false,
1058                    "in total more items produced in category {category} than originally available"
1059                );
1060            }
1061        }
1062    }
1063}
1064
1065impl<I, S> Iterator for Split<I, S>
1066where
1067    I: Iterator<Item = S>,
1068    S: Counted,
1069{
1070    type Item = Managed<S>;
1071
1072    fn next(&mut self) -> Option<Self::Item> {
1073        let next = match self.items.next() {
1074            Some(next) => next,
1075            None => {
1076                self.exhausted = true;
1077                return None;
1078            }
1079        };
1080
1081        #[cfg(debug_assertions)]
1082        self.subtract(next.quantities());
1083
1084        Some(Managed::from_parts(next, Arc::clone(&self.meta)))
1085    }
1086}
1087
1088impl<I, S> Drop for Split<I, S>
1089where
1090    I: Iterator<Item = S>,
1091    S: Counted,
1092{
1093    fn drop(&mut self) {
1094        // If the inner iterator was exhausted, no items should be remaining.
1095        #[cfg(debug_assertions)]
1096        if self.exhausted {
1097            for (category, quantities) in &self.quantities {
1098                debug_assert!(
1099                    *quantities == 0,
1100                    "items split, but still {quantities} remaining in category {category}"
1101                );
1102            }
1103        }
1104
1105        if self.exhausted {
1106            return;
1107        }
1108
1109        // There may be items remaining in the iterator for multiple reasons:
1110        // - there was a panic
1111        // - the iterator was never fully consumed
1112        //
1113        // In any case, outcomes must be emitted for the remaining items.
1114        for item in &mut self.items {
1115            for (category, quantity) in item.quantities() {
1116                self.meta.track_outcome(
1117                    Outcome::Invalid(DiscardReason::Internal),
1118                    category,
1119                    quantity,
1120                );
1121            }
1122        }
1123    }
1124}
1125
1126impl<I, S> FusedIterator for Split<I, S>
1127where
1128    I: Iterator<Item = S> + FusedIterator,
1129    S: Counted,
1130{
1131}
1132
#[cfg(test)]
mod tests {
    use super::*;

    /// Test payload counting one `Error` per contained element.
    struct CountedVec(Vec<u32>);

    impl Counted for CountedVec {
        fn quantities(&self) -> Quantities {
            smallvec::smallvec![(DataCategory::Error, self.0.len())]
        }
    }

    /// Test payload counting as exactly one `Error`.
    struct CountedValue(u32);

    impl Counted for CountedValue {
        fn quantities(&self) -> Quantities {
            smallvec::smallvec![(DataCategory::Error, 1)]
        }
    }

    #[test]
    fn test_reject_err_no_outcome() {
        let value = CountedVec(vec![0, 1, 2, 3, 4, 5]);
        let (managed, mut handle) = Managed::for_test(value).build();

        // Rejecting with no outcome, should not emit any outcomes.
        let _ = managed.reject_err((None, ()));
        handle.assert_no_outcomes();

        // Now dropping the managed instance, should not record any (internal) outcomes either.
        drop(managed);
        handle.assert_no_outcomes();
    }

    // Merging moves `b`'s items into `a`; only `a` reports outcomes when dropped.
    #[test]
    fn test_merge() {
        let (mut a, mut handle_a) = Managed::for_test(CountedVec(vec![1, 2])).build();
        let (b, mut handle_b) = Managed::for_test(CountedVec(vec![3, 4])).build();

        a.merge_with(b, |a, b, _| a.0.extend(b.0));

        assert_eq!(a.0, vec![1, 2, 3, 4]);
        drop(a);
        handle_a.assert_internal_outcome(DataCategory::Error, 4);
        handle_b.assert_no_outcomes();
    }

    // A merge closure that loses `b`'s items must trip the quantity check.
    #[test]
    fn test_merge_mismatched_records_should_panic() {
        let (mut a, mut handle_a) = Managed::for_test(CountedVec(vec![1, 2])).build();
        let (b, _handle_b) = Managed::for_test(CountedVec(vec![3, 4])).build();

        let r = std::panic::catch_unwind(move || {
            a.merge_with(b, |_a, _b, _| {});
        });

        assert!(
            r.is_err(),
            "expected merge to panic because of mismatched outcome records"
        );
        handle_a.assert_internal_outcome(DataCategory::Error, 2);
    }

    // Each split item is independently managed and reports its own outcome.
    #[test]
    fn test_split_fully_consumed() {
        let value = CountedVec(vec![0, 1, 2, 3, 4, 5]);
        let (managed, mut handle) = Managed::for_test(value).build();

        let s = managed
            .split(|value| value.0.into_iter().map(CountedValue))
            // Fully consume the iterator to make sure there aren't any outcomes emitted on drop.
            .collect::<Vec<_>>();

        handle.assert_no_outcomes();

        for (i, s) in s.into_iter().enumerate() {
            assert_eq!(s.as_ref().0, i as u32);
            let outcome = Outcome::Invalid(DiscardReason::Cors);
            let _ = s.reject_err((outcome.clone(), ()));
            handle.assert_outcome(&outcome, DataCategory::Error, 1);
        }
    }

    // Dropping a partially consumed split emits internal outcomes for the items never handed out.
    #[test]
    fn test_split_partially_consumed_emits_remaining() {
        let value = CountedVec(vec![0, 1, 2, 3, 4, 5]);
        let (managed, mut handle) = Managed::for_test(value).build();

        let mut s = managed.split(|value| value.0.into_iter().map(CountedValue));
        handle.assert_no_outcomes();

        drop(s.next());
        handle.assert_internal_outcome(DataCategory::Error, 1);
        drop(s.next());
        handle.assert_internal_outcome(DataCategory::Error, 1);
        drop(s.next());
        handle.assert_internal_outcome(DataCategory::Error, 1);
        handle.assert_no_outcomes();

        drop(s);

        handle.assert_internal_outcome(DataCategory::Error, 1);
        handle.assert_internal_outcome(DataCategory::Error, 1);
        handle.assert_internal_outcome(DataCategory::Error, 1);
    }

    // Splitting six items into a single one leaves quantities unaccounted for once exhausted.
    #[test]
    fn test_split_changing_quantities_should_panic() {
        let value = CountedVec(vec![0, 1, 2, 3, 4, 5]);
        let (managed, mut handle) = Managed::for_test(value).build();

        let mut s = managed.split(|_| std::iter::once(CountedValue(0)));

        s.next().unwrap().accept(|_| {});
        handle.assert_no_outcomes();

        assert!(s.next().is_none());

        let r = std::panic::catch_unwind(move || {
            drop(s);
        });

        assert!(
            r.is_err(),
            "expected split to panic because of mismatched (not enough) outcomes"
        );
    }

    // Producing more items than originally counted fails on the offending `next`.
    #[test]
    fn test_split_more_outcomes_than_before_should_panic() {
        let value = CountedVec(vec![0]);
        let (managed, mut handle) = Managed::for_test(value).build();

        let mut s = managed.split(|_| vec![CountedValue(0), CountedValue(2)].into_iter());

        s.next().unwrap().accept(|_| {});
        handle.assert_no_outcomes();

        let r = std::panic::catch_unwind(move || {
            s.next();
        });

        assert!(
            r.is_err(),
            "expected split to panic because of mismatched (too many) outcomes"
        );
    }

    // Producing a category the original did not contain fails on `next`.
    #[test]
    fn test_split_changing_categories_should_panic() {
        struct Special;
        impl Counted for Special {
            fn quantities(&self) -> Quantities {
                smallvec::smallvec![(DataCategory::Error, 1), (DataCategory::Transaction, 1)]
            }
        }

        let value = CountedVec(vec![0]);
        let (managed, _handle) = Managed::for_test(value).build();

        let mut s = managed.split(|value| value.0.into_iter().map(|_| Special));

        let r = std::panic::catch_unwind(move || {
            let _ = s.next();
        });

        assert!(
            r.is_err(),
            "expected split to panic because of mismatched outcome categories"
        );
    }

    // Compile-time check that `Split` is `FusedIterator` over a fused inner iterator; dropping
    // the unconsumed split emits an internal outcome for its single item.
    #[test]
    fn test_split_assert_fused() {
        fn only_fused<T: FusedIterator>(_: T) {}

        let (managed, mut handle) = Managed::for_test(CountedVec(vec![0])).build();
        only_fused(managed.split(|value| value.0.into_iter().map(CountedValue)));
        handle.assert_internal_outcome(DataCategory::Error, 1);
    }
}