relay_server/managed/
managed.rs

1#[cfg(debug_assertions)]
2use std::collections::BTreeMap;
3use std::convert::Infallible;
4use std::fmt;
5use std::iter::FusedIterator;
6use std::mem::ManuallyDrop;
7use std::net::IpAddr;
8use std::sync::Arc;
9use std::sync::atomic::{AtomicBool, Ordering};
10
11use chrono::{DateTime, Utc};
12use itertools::Either;
13use relay_event_schema::protocol::EventId;
14use relay_quotas::{DataCategory, Scoping};
15use relay_system::Addr;
16use smallvec::SmallVec;
17
18use crate::Envelope;
19use crate::managed::{Counted, ManagedEnvelope, Quantities};
20use crate::services::outcome::{DiscardReason, Outcome, TrackOutcome};
21use crate::services::processor::ProcessingError;
22
23#[cfg(debug_assertions)]
24mod debug;
25#[cfg(test)]
26mod test;
27
28#[cfg(test)]
29pub use self::test::*;
30
31/// An error which can be extracted into an outcome.
32pub trait OutcomeError {
33    /// Produced error, without attached outcome.
34    type Error;
35
36    /// Consumes the error and returns an outcome and [`Self::Error`].
37    ///
38    /// Returning a `None` outcome should discard the item(s) silently.
39    fn consume(self) -> (Option<Outcome>, Self::Error);
40}
41
42impl OutcomeError for Outcome {
43    type Error = ();
44
45    fn consume(self) -> (Option<Outcome>, Self::Error) {
46        (self, ()).consume()
47    }
48}
49
50impl OutcomeError for Option<Outcome> {
51    type Error = ();
52
53    fn consume(self) -> (Option<Outcome>, Self::Error) {
54        (self, ()).consume()
55    }
56}
57
58impl<E> OutcomeError for (Outcome, E) {
59    type Error = E;
60
61    fn consume(self) -> (Option<Outcome>, Self::Error) {
62        (Some(self.0), self.1)
63    }
64}
65
66impl<E> OutcomeError for (Option<Outcome>, E) {
67    type Error = E;
68
69    fn consume(self) -> (Option<Outcome>, Self::Error) {
70        self
71    }
72}
73
74impl OutcomeError for ProcessingError {
75    type Error = Self;
76
77    fn consume(self) -> (Option<Outcome>, Self::Error) {
78        (self.to_outcome(), self)
79    }
80}
81
82impl OutcomeError for Infallible {
83    type Error = Self;
84
85    fn consume(self) -> (Option<Outcome>, Self::Error) {
86        match self {}
87    }
88}
89
/// A wrapper type which guarantees that outcomes have been emitted for the wrapped error.
///
/// [`Managed`] wraps an error in [`Rejected`] once outcomes have been emitted for the managed
/// item.
#[derive(Debug, Clone, Copy)]
#[must_use = "a rejection must be propagated"]
pub struct Rejected<T>(T);

impl<T> Rejected<T> {
    /// Unwraps the rejection and returns the contained error.
    pub fn into_inner(self) -> T {
        let Self(inner) = self;
        inner
    }

    /// Transforms the contained error with `f`, keeping the rejection marker.
    pub fn map<F, S>(self, f: F) -> Rejected<S>
    where
        F: FnOnce(T) -> S,
    {
        let Self(inner) = self;
        Rejected(f(inner))
    }
}

impl<T> std::error::Error for Rejected<T>
where
    T: std::error::Error,
{
    /// Forwards to the source of the wrapped error.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        std::error::Error::source(&self.0)
    }
}

impl<T> std::fmt::Display for Rejected<T>
where
    T: std::fmt::Display,
{
    /// Formats exactly like the wrapped error.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        std::fmt::Display::fmt(&self.0, f)
    }
}
130
131impl<T> axum::response::IntoResponse for Rejected<T>
132where
133    T: axum::response::IntoResponse,
134{
135    fn into_response(self) -> axum::response::Response {
136        self.0.into_response()
137    }
138}
139
140/// The [`Managed`] wrapper ensures outcomes are correctly emitted for the contained item.
141pub struct Managed<T: Counted> {
142    value: T,
143    meta: Arc<Meta>,
144    done: AtomicBool,
145}
146
147impl Managed<Box<Envelope>> {
148    /// Creates a managed instance from an unmanaged envelope.
149    pub fn from_envelope(envelope: Box<Envelope>, outcome_aggregator: Addr<TrackOutcome>) -> Self {
150        let meta = Arc::new(Meta {
151            outcome_aggregator,
152            received_at: envelope.received_at(),
153            scoping: envelope.meta().get_partial_scoping().into_scoping(),
154            event_id: envelope.event_id(),
155            remote_addr: envelope.meta().remote_addr(),
156        });
157
158        Self::from_parts(envelope, meta)
159    }
160}
161
/// Abstraction over containers offering `retain_mut`, used by [`Managed::retain`] to support
/// both `Vec` and `SmallVec`.
pub trait RetainMut<I> {
    /// Keeps only the elements for which `f` returns `true`.
    ///
    /// The predicate receives mutable access to each element.
    fn retain_mut(&mut self, f: impl FnMut(&mut I) -> bool);
}

impl<I> RetainMut<I> for Vec<I> {
    fn retain_mut(&mut self, f: impl FnMut(&mut I) -> bool) {
        // Delegates to the inherent `Vec::retain_mut`, not back into this trait method.
        <Vec<I>>::retain_mut(self, f)
    }
}
173impl<I, const N: usize> RetainMut<I> for SmallVec<[I; N]> {
174    fn retain_mut(&mut self, f: impl FnMut(&mut I) -> bool) {
175        SmallVec::retain_mut(self, f)
176    }
177}
178
179impl<T: Counted> Managed<T> {
180    /// Creates new [`Managed`] instance with the provided `value` and metadata from a [`ManagedEnvelope`].
181    ///
182    /// The [`Managed`] instance, inherits all metadata from the passed [`ManagedEnvelope`],
183    /// like received time or scoping.
184    pub fn with_meta_from(envelope: &ManagedEnvelope, value: T) -> Self {
185        Self::from_parts(
186            value,
187            Arc::new(Meta {
188                outcome_aggregator: envelope.outcome_aggregator().clone(),
189                received_at: envelope.received_at(),
190                scoping: envelope.scoping(),
191                event_id: envelope.envelope().event_id(),
192                remote_addr: envelope.meta().remote_addr(),
193            }),
194        )
195    }
196
197    /// Creates another [`Managed`] instance, with a new value but shared metadata.
198    pub fn wrap<S>(&self, other: S) -> Managed<S>
199    where
200        S: Counted,
201    {
202        Managed::from_parts(other, Arc::clone(&self.meta))
203    }
204
205    /// Boxes the contained value.
206    pub fn boxed(self) -> Managed<Box<T>> {
207        self.map(|value, _| Box::new(value))
208    }
209
210    /// Original received timestamp.
211    pub fn received_at(&self) -> DateTime<Utc> {
212        self.meta.received_at
213    }
214
215    /// Scoping information stored in this context.
216    pub fn scoping(&self) -> Scoping {
217        self.meta.scoping
218    }
219
220    /// Updates the scoping stored in this context.
221    ///
222    /// Special care has to be taken when items contained in the managed instance also store a
223    /// scoping. Such a scoping will **not** be updated.
224    ///
225    /// Conversions between `Managed<Box<Envelope>>` and `ManagedEnvelope` transfer the scoping
226    /// correctly.
227    ///
228    /// See also: [`ManagedEnvelope::scope`].
229    pub fn scope(&mut self, scoping: Scoping) {
230        let meta = Arc::make_mut(&mut self.meta);
231        meta.scoping = scoping;
232    }
233
234    /// Splits [`Self`] into two other [`Managed`] items.
235    ///
236    /// The two resulting managed instances together are expected to have the same outcomes as the original instance..
237    /// Since splitting may introduce a new type of item, which some of the original
238    /// quantities are transferred to, there may be new additional data categories created.
239    pub fn split_once<F, S, U>(self, f: F) -> (Managed<S>, Managed<U>)
240    where
241        F: FnOnce(T, &mut RecordKeeper) -> (S, U),
242        S: Counted,
243        U: Counted,
244    {
245        debug_assert!(!self.is_done());
246
247        let (value, meta) = self.destructure();
248        let quantities = value.quantities();
249
250        let mut records = RecordKeeper::new(&meta, quantities);
251
252        let (a, b) = f(value, &mut records);
253
254        let mut quantities = a.quantities();
255        quantities.extend(b.quantities());
256        records.success(quantities);
257
258        (
259            Managed::from_parts(a, Arc::clone(&meta)),
260            Managed::from_parts(b, meta),
261        )
262    }
263
264    /// Splits [`Self`] into a variable amount if individual items.
265    ///
266    /// Useful when the current instance contains multiple items of the same type
267    /// and must be split into individually managed items.
268    pub fn split<F, I, S>(self, f: F) -> Split<I::IntoIter, I::Item>
269    where
270        F: FnOnce(T) -> I,
271        I: IntoIterator<Item = S>,
272        S: Counted,
273    {
274        self.split_with_context(|value| (f(value), ())).0
275    }
276
277    /// Splits [`Self`] into a variable amount if individual items.
278    ///
279    /// Like [`Self::split`] but also allows returning an untracked context,
280    /// a way of returning additional data when deconstructing the original item.
281    pub fn split_with_context<F, I, S, C>(self, f: F) -> (Split<I::IntoIter, I::Item>, C)
282    where
283        F: FnOnce(T) -> (I, C),
284        I: IntoIterator<Item = S>,
285        S: Counted,
286    {
287        debug_assert!(!self.is_done());
288
289        let (value, meta) = self.destructure();
290        #[cfg(debug_assertions)]
291        let quantities = value.quantities();
292
293        let (items, context) = f(value);
294
295        (
296            Split {
297                #[cfg(debug_assertions)]
298                quantities,
299                items: items.into_iter(),
300                meta,
301                exhausted: false,
302            },
303            context,
304        )
305    }
306
307    /// Filters individual items and emits outcomes for them if they are removed.
308    ///
309    /// This is particularly useful when the managed instance is a container of individual items,
310    /// which need to be processed or filtered on a case by case basis.
311    ///
312    /// # Examples:
313    ///
314    /// ```
315    /// # use relay_server::managed::{Counted, Managed, Quantities};
316    /// # #[derive(Copy, Clone)]
317    /// # struct Context<'a>(&'a u32);
318    /// # struct Item;
319    /// struct Items {
320    ///     items: Vec<Item>,
321    /// }
322    /// # impl Counted for Items {
323    /// #   fn quantities(&self) -> Quantities {
324    /// #       todo!()
325    /// #   }
326    /// # }
327    /// # impl Counted for Item {
328    /// #   fn quantities(&self) -> Quantities {
329    /// #       todo!()
330    /// #   }
331    /// # }
332    /// # type Error = std::convert::Infallible;
333    ///
334    /// fn process_items(items: &mut Managed<Items>, ctx: Context<'_>) {
335    ///     items.retain(|items| &mut items.items, |item, _| process(item, ctx));
336    /// }
337    ///
338    /// fn process(item: &mut Item, ctx: Context<'_>) -> Result<(), Error> {
339    ///     todo!()
340    /// }
341    /// ```
342    pub fn retain<S, I, U, E, V>(&mut self, select: S, mut retain: U)
343    where
344        S: FnOnce(&mut T) -> &mut V,
345        I: Counted,
346        U: FnMut(&mut I, &mut RecordKeeper<'_>) -> Result<(), E>,
347        E: OutcomeError,
348        V: RetainMut<I>,
349    {
350        self.retain_with_context(
351            |inner| (select(inner), &()),
352            |item, _, records| retain(item, records),
353        );
354    }
355
356    /// Filters individual items and emits outcomes for them if they are removed.
357    ///
358    /// Like [`Self::retain`], but it allows for an additional context extracted from the managed
359    /// object passed to the retain function.
360    ///
361    /// # Examples:
362    ///
363    /// ```
364    /// # use relay_server::managed::{Counted, Managed, Quantities};
365    /// # #[derive(Copy, Clone)]
366    /// # struct Context<'a>(&'a u32);
367    /// # struct Item;
368    /// struct Items {
369    ///     ty: String,
370    ///     items: Vec<Item>,
371    /// }
372    /// # impl Counted for Items {
373    /// #   fn quantities(&self) -> Quantities {
374    /// #       todo!()
375    /// #   }
376    /// # }
377    /// # impl Counted for Item {
378    /// #   fn quantities(&self) -> Quantities {
379    /// #       todo!()
380    /// #   }
381    /// # }
382    /// # type Error = std::convert::Infallible;
383    ///
384    /// fn process_items(items: &mut Managed<Items>, ctx: Context<'_>) {
385    ///     items.retain_with_context(|items| (&mut items.items, &items.ty), |item, ty, _| process(item, ty, ctx));
386    /// }
387    ///
388    /// fn process(item: &mut Item, ty: &str, ctx: Context<'_>) -> Result<(), Error> {
389    ///     todo!()
390    /// }
391    /// ```
392    pub fn retain_with_context<S, C, I, U, E, V>(&mut self, select: S, mut retain: U)
393    where
394        // Returning `&'a C` here is not optimal, ideally we return C here and express the correct
395        // bound of `C: 'a` but this is, to my knowledge, currently not possible to express in stable Rust.
396        //
397        // This is unfortunately a bit limiting but for most of our purposes it is enough.
398        for<'a> S: FnOnce(&'a mut T) -> (&'a mut V, &'a C),
399        I: Counted,
400        U: FnMut(&mut I, &C, &mut RecordKeeper<'_>) -> Result<(), E>,
401        E: OutcomeError,
402        V: RetainMut<I>,
403    {
404        self.modify(|inner, records| {
405            let (items, ctx) = select(inner);
406            items.retain_mut(|item| match retain(item, ctx, records) {
407                Ok(()) => true,
408                Err(err) => {
409                    records.reject_err(err, &*item);
410                    false
411                }
412            })
413        });
414    }
415
416    /// Maps a [`Managed<T>`] to [`Managed<S>`] by applying the mapping function `f`.
417    ///
418    /// Like [`Self::try_map`] but not fallible.
419    pub fn map<S, F>(self, f: F) -> Managed<S>
420    where
421        F: FnOnce(T, &mut RecordKeeper) -> S,
422        S: Counted,
423    {
424        self.try_map(move |inner, records| Ok::<_, Infallible>(f(inner, records)))
425            .unwrap_or_else(|e| match e.0 {})
426    }
427
428    /// Maps a [`Managed<T>`] to [`Managed<S>`] by applying the mapping function `f`.
429    ///
430    /// The mapping function gets access to a [`RecordKeeper`], to emit outcomes for partial
431    /// discards.
432    ///
433    /// If the mapping function returns an error, the entire (original) [`Self`] is rejected,
434    /// no partial outcomes are emitted.
435    pub fn try_map<S, F, E>(self, f: F) -> Result<Managed<S>, Rejected<E::Error>>
436    where
437        F: FnOnce(T, &mut RecordKeeper) -> Result<S, E>,
438        S: Counted,
439        E: OutcomeError,
440    {
441        debug_assert!(!self.is_done());
442
443        let (value, meta) = self.destructure();
444        let quantities = value.quantities();
445
446        let mut records = RecordKeeper::new(&meta, quantities);
447
448        match f(value, &mut records) {
449            Ok(value) => {
450                records.success(value.quantities());
451                Ok(Managed::from_parts(value, meta))
452            }
453            Err(err) => Err(records.failure(err)),
454        }
455    }
456
457    /// Gives mutable access to the contained value to modify it.
458    ///
459    /// Like [`Self::try_modify`] but not fallible.
460    pub fn modify<F>(&mut self, f: F)
461    where
462        F: FnOnce(&mut T, &mut RecordKeeper),
463    {
464        self.try_modify(move |inner, records| {
465            f(inner, records);
466            Ok::<_, Infallible>(())
467        })
468        .unwrap_or_else(|e| match e {})
469    }
470
471    /// Gives mutable access to the contained value to modify it.
472    ///
473    /// The modifying function gets access to a [`RecordKeeper`], to emit outcomes for partial
474    /// discards.
475    ///
476    /// If the modifying function returns an error, the entire (original) [`Self`] is rejected,
477    /// no partial outcomes are emitted.
478    pub fn try_modify<F, E>(&mut self, f: F) -> Result<(), Rejected<E::Error>>
479    where
480        F: FnOnce(&mut T, &mut RecordKeeper) -> Result<(), E>,
481        E: OutcomeError,
482    {
483        debug_assert!(!self.is_done());
484
485        let quantities = self.value.quantities();
486        let mut records = RecordKeeper::new(&self.meta, quantities);
487
488        match f(&mut self.value, &mut records) {
489            Ok(()) => {
490                records.success(self.value.quantities());
491                Ok(())
492            }
493            Err(err) => {
494                let err = records.failure(err);
495                self.done.store(true, Ordering::Relaxed);
496                Err(err)
497            }
498        }
499    }
500
501    /// Accepts the item of this managed instance.
502    ///
503    /// This should be called if the item has been or is about to be accepted by the upstream, which means that
504    /// the responsibility for logging outcomes has been moved. This function will not log any
505    /// outcomes.
506    ///
507    /// Like [`Self::try_accept`], but infallible.
508    pub fn accept<F, S>(self, f: F) -> S
509    where
510        F: FnOnce(T) -> S,
511    {
512        self.try_accept(|item| Ok::<_, Infallible>(f(item)))
513            .unwrap_or_else(|err| match err.0 {})
514    }
515
516    /// Accepts the item of this managed instance.
517    ///
518    /// This should be called if the item has been or is about to be accepted by the upstream.
519    ///
520    /// Outcomes are only emitted when the accepting closure returns an error, which means that
521    /// in the success case the responsibility for logging outcomes has been moved to the
522    /// caller/upstream.
523    pub fn try_accept<F, S, E>(self, f: F) -> Result<S, Rejected<E::Error>>
524    where
525        F: FnOnce(T) -> Result<S, E>,
526        E: OutcomeError,
527    {
528        debug_assert!(!self.is_done());
529
530        let (value, meta) = self.destructure();
531        let records = RecordKeeper::new(&meta, value.quantities());
532
533        match f(value) {
534            Ok(value) => {
535                records.accept();
536                Ok(value)
537            }
538            Err(err) => Err(records.failure(err)),
539        }
540    }
541
542    /// Rejects the entire [`Managed`] instance with an internal error.
543    ///
544    /// Internal errors should be reserved for uses where logical invariants are violated.
545    /// Cases which should never happen and always indicate a logical bug.
546    ///
547    /// This function will panic in debug builds, but discard the item
548    /// with an internal discard reason in release builds.
549    #[track_caller]
550    pub fn internal_error(&self, reason: &'static str) -> Rejected<()> {
551        relay_log::error!("internal error: {reason}");
552        debug_assert!(false, "internal error: {reason}");
553        self.reject_err((Outcome::Invalid(DiscardReason::Internal), ()))
554    }
555
556    /// Rejects the entire [`Managed`] instance.
557    pub fn reject_err<E>(&self, error: E) -> Rejected<E::Error>
558    where
559        E: OutcomeError,
560    {
561        debug_assert!(!self.is_done());
562
563        let (outcome, error) = error.consume();
564        self.do_reject(outcome);
565        Rejected(error)
566    }
567
568    fn do_reject(&self, outcome: Option<Outcome>) {
569        // Always set the internal state to `done`, even if there is no outcome to be emitted.
570        // All bookkeeping has been done.
571        let is_done = self.done.fetch_or(true, Ordering::Relaxed);
572
573        // No outcome to emit, we're done.
574        let Some(outcome) = outcome else {
575            return;
576        };
577
578        // Only emit outcomes if we were not yet done.
579        //
580        // Callers should guard against accidentally calling `do_reject` when the `is_done` flag is
581        // already set, but internal uses (like `Drop`) can rely on this double emission
582        // prevention.
583        if !is_done {
584            for (category, quantity) in self.value.quantities() {
585                self.meta.track_outcome(outcome.clone(), category, quantity);
586            }
587        }
588    }
589
590    /// De-structures this managed instance into its own parts.
591    ///
592    /// While de-structured no outcomes will be emitted on drop.
593    ///
594    /// Currently no `Managed`, which already has outcomes emitted, should be de-structured
595    /// as this status is lost.
596    fn destructure(self) -> (T, Arc<Meta>) {
597        // SAFETY: this follows an approach mentioned in the RFC
598        // <https://github.com/rust-lang/rfcs/pull/3466> to move fields out of
599        // a type with a drop implementation.
600        //
601        // The original type is wrapped in a manual drop to prevent running the
602        // drop handler, afterwards all fields are moved out of the type.
603        //
604        // And the original type is forgotten, de-structuring the original type
605        // without running its drop implementation.
606        let this = ManuallyDrop::new(self);
607        let Managed { value, meta, done } = &*this;
608
609        let value = unsafe { std::ptr::read(value) };
610        let meta = unsafe { std::ptr::read(meta) };
611        let done = unsafe { std::ptr::read(done) };
612        // This is a current invariant, if we ever need to change the invariant,
613        // the done status should be preserved and returned instead.
614        debug_assert!(
615            !done.load(Ordering::Relaxed),
616            "a `done` managed should never be destructured"
617        );
618
619        (value, meta)
620    }
621
622    fn from_parts(value: T, meta: Arc<Meta>) -> Self {
623        Self {
624            value,
625            meta,
626            done: AtomicBool::new(false),
627        }
628    }
629
630    fn is_done(&self) -> bool {
631        self.done.load(Ordering::Relaxed)
632    }
633}
634
635impl<T: Counted> Drop for Managed<T> {
636    fn drop(&mut self) {
637        self.do_reject(Some(Outcome::Invalid(DiscardReason::Internal)));
638    }
639}
640
641impl<T: Counted + fmt::Debug> fmt::Debug for Managed<T> {
642    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
643        write!(f, "Managed<{}>[", std::any::type_name::<T>())?;
644        for (i, (category, quantity)) in self.value.quantities().iter().enumerate() {
645            if i > 0 {
646                write!(f, ",")?;
647            }
648            write!(f, "{category}:{quantity}")?;
649        }
650        write!(f, "](")?;
651        self.value.fmt(f)?;
652        write!(f, ")")
653    }
654}
655
656impl<T: Counted> Managed<Option<T>> {
657    /// Turns a managed option into an optional [`Managed`].
658    pub fn transpose(self) -> Option<Managed<T>> {
659        let (o, meta) = self.destructure();
660        o.map(|t| Managed::from_parts(t, meta))
661    }
662}
663
664impl<L: Counted, R: Counted> Managed<Either<L, R>> {
665    /// Turns a managed [`Either`] into an [`Either`] of [`Managed`].
666    pub fn transpose(self) -> Either<Managed<L>, Managed<R>> {
667        let (either, meta) = self.destructure();
668        match either {
669            Either::Left(value) => Either::Left(Managed::from_parts(value, meta)),
670            Either::Right(value) => Either::Right(Managed::from_parts(value, meta)),
671        }
672    }
673}
674
675impl From<Managed<Box<Envelope>>> for ManagedEnvelope {
676    fn from(value: Managed<Box<Envelope>>) -> Self {
677        let (value, meta) = value.destructure();
678        let mut envelope = ManagedEnvelope::new(value, meta.outcome_aggregator.clone());
679        envelope.scope(meta.scoping);
680        envelope
681    }
682}
683
684impl<T: Counted> AsRef<T> for Managed<T> {
685    fn as_ref(&self) -> &T {
686        &self.value
687    }
688}
689
690impl<T: Counted> std::ops::Deref for Managed<T> {
691    type Target = T;
692
693    fn deref(&self) -> &Self::Target {
694        &self.value
695    }
696}
697
698/// Internal metadata attached with a [`Managed`] instance.
699#[derive(Debug, Clone)]
700struct Meta {
701    /// Outcome aggregator service.
702    outcome_aggregator: Addr<TrackOutcome>,
703    /// Received timestamp, when the contained payload/information was received.
704    ///
705    /// See also: [`crate::extractors::RequestMeta::received_at`].
706    received_at: DateTime<Utc>,
707    /// Data scoping information of the contained item.
708    scoping: Scoping,
709    /// Optional event id associated with the contained data.
710    event_id: Option<EventId>,
711    /// Optional remote addr from where the data was received from.
712    remote_addr: Option<IpAddr>,
713}
714
715impl Meta {
716    pub fn track_outcome(&self, outcome: Outcome, category: DataCategory, quantity: usize) {
717        self.outcome_aggregator.send(TrackOutcome {
718            timestamp: self.received_at,
719            scoping: self.scoping,
720            outcome,
721            event_id: self.event_id,
722            remote_addr: self.remote_addr,
723            category,
724            quantity: quantity.try_into().unwrap_or(u32::MAX),
725        });
726    }
727}
728
729/// A record keeper makes sure modifications done on a [`Managed`] item are all accounted for
730/// correctly.
731pub struct RecordKeeper<'a> {
732    meta: &'a Meta,
733    on_drop: Quantities,
734    #[cfg(debug_assertions)]
735    lenient: SmallVec<[DataCategory; 1]>,
736    #[cfg(debug_assertions)]
737    modifications: BTreeMap<DataCategory, isize>,
738    in_flight: SmallVec<[(DataCategory, usize, Option<Outcome>); 2]>,
739}
740
741impl<'a> RecordKeeper<'a> {
742    fn new(meta: &'a Meta, quantities: Quantities) -> Self {
743        Self {
744            meta,
745            on_drop: quantities,
746            #[cfg(debug_assertions)]
747            lenient: Default::default(),
748            #[cfg(debug_assertions)]
749            modifications: Default::default(),
750            in_flight: Default::default(),
751        }
752    }
753
754    /// Marking a data category as lenient exempts this category from outcome quantity validations.
755    ///
756    /// Consider using [`Self::modify_by`] instead.
757    ///
758    /// This can be used in cases where the quantity is knowingly modified, which is quite common
759    /// for data categories which count bytes.
760    pub fn lenient(&mut self, category: DataCategory) {
761        let _category = category;
762        #[cfg(debug_assertions)]
763        self.lenient.push(_category);
764    }
765
766    /// Modifies the expected count for a category.
767    ///
768    /// When extracting payloads category counts may expectedly change, these changes can be
769    /// tracked using this function.
770    ///
771    /// Prefer using [`Self::modify_by`] over [`Self::lenient`] as lenient completely disables
772    /// validation for the entire category.
773    pub fn modify_by(&mut self, category: DataCategory, offset: isize) {
774        let _category = category;
775        let _offset = offset;
776        #[cfg(debug_assertions)]
777        {
778            *self.modifications.entry(_category).or_default() += offset;
779        }
780    }
781
782    /// Finalizes all records and emits the necessary outcomes.
783    ///
784    /// This uses the quantities of the original item.
785    fn failure<E>(mut self, error: E) -> Rejected<E::Error>
786    where
787        E: OutcomeError,
788    {
789        let (outcome, error) = error.consume();
790
791        if let Some(outcome) = outcome {
792            for (category, quantity) in std::mem::take(&mut self.on_drop) {
793                self.meta.track_outcome(outcome.clone(), category, quantity);
794            }
795        }
796
797        Rejected(error)
798    }
799
800    /// Finalizes all records and asserts that no additional outcomes have been tracked.
801    ///
802    /// Unlike [`Self::success`], this method does not allow for intermediate or partial outcomes,
803    /// it also does not verify any outcomes.
804    ///
805    /// This method is useful for using the record keeper to track failure outcomes, either
806    /// explicit failures or panics.
807    fn accept(mut self) {
808        debug_assert!(
809            self.in_flight.is_empty(),
810            "records accepted, but intermediate outcomes tracked"
811        );
812        self.on_drop.clear();
813    }
814
815    /// Finalizes all records and emits the created outcomes.
816    ///
817    /// This only emits the outcomes that have been explicitly registered.
818    /// In a debug build, the function also ensure no outcomes have been missed by comparing
819    /// quantities of the item before and after.
820    fn success(mut self, new: Quantities) {
821        let original = std::mem::take(&mut self.on_drop);
822        self.assert_quantities(original, new);
823
824        self.on_drop.clear();
825        for (category, quantity, outcome) in std::mem::take(&mut self.in_flight) {
826            if let Some(outcome) = outcome {
827                self.meta.track_outcome(outcome, category, quantity);
828            }
829        }
830    }
831
832    /// Asserts that there have been no quantities lost.
833    ///
834    /// The original amount of quantities should match the new amount of quantities + all emitted
835    /// outcomes.
836    #[cfg(debug_assertions)]
837    fn assert_quantities(&self, original: Quantities, new: Quantities) {
838        macro_rules! emit {
839            ($category:expr, $($tt:tt)*) => {{
840                match self.lenient.contains(&$category) {
841                    // Certain categories are known to be not always correct,
842                    // they are logged instead.
843                    true => relay_log::debug!($($tt)*),
844                    false  => {
845                        relay_log::error!("Original: {original:?}");
846                        relay_log::error!("New: {new:?}");
847                        relay_log::error!("Modifications: {:?}", self.modifications);
848                        relay_log::error!("In Flight: {:?}", self.in_flight);
849                        panic!($($tt)*)
850                    }
851                }
852            }};
853        }
854
855        let mut sums = debug::Quantities::from(&original).0;
856        for (category, offset) in &self.modifications {
857            let v = sums.entry(*category).or_default();
858            match v.checked_add_signed(*offset) {
859                Some(result) => *v = result,
860                None => emit!(
861                    category,
862                    "Attempted to modify original quantity {v} into the negative ({offset})"
863                ),
864            }
865        }
866
867        for (category, quantity, outcome) in &self.in_flight {
868            match sums.get_mut(category) {
869                Some(c) if *c >= *quantity => *c -= *quantity,
870                Some(c) => emit!(
871                    category,
872                    "Emitted {quantity} outcomes ({outcome:?}) for {category}, but there were only {c} items in the category originally"
873                ),
874                None => emit!(
875                    category,
876                    "Emitted {quantity} outcomes ({outcome:?}) for {category}, but there never was an item in this category"
877                ),
878            }
879        }
880
881        for (category, quantity) in &new {
882            match sums.get_mut(category) {
883                Some(c) if *c >= *quantity => *c -= *quantity,
884                Some(c) => emit!(
885                    category,
886                    "New item has {quantity} items in category '{category}', but original (after emitted outcomes) only has {c} left"
887                ),
888                None => emit!(
889                    category,
890                    "New item has {quantity} items in category '{category}', but after emitted outcomes there are none left"
891                ),
892            }
893        }
894
895        for (category, quantity) in sums {
896            if quantity > 0 {
897                emit!(
898                    category,
899                    "Missing outcomes or mismatched quantity in category '{category}', off by {quantity}"
900                );
901            }
902        }
903    }
904
905    #[cfg(not(debug_assertions))]
906    fn assert_quantities(&self, _: Quantities, _: Quantities) {}
907}
908
909impl<'a> Drop for RecordKeeper<'a> {
910    fn drop(&mut self) {
911        for (category, quantity) in std::mem::take(&mut self.on_drop) {
912            self.meta.track_outcome(
913                Outcome::Invalid(DiscardReason::Internal),
914                category,
915                quantity,
916            );
917        }
918    }
919}
920
921impl RecordKeeper<'_> {
922    /// Rejects an item if the passed result is an error and returns a default value.
923    ///
924    /// Similar to [`Self::reject_err`], this emits the necessary outcomes for an
925    /// item, if there is an error.
926    pub fn or_default<T, E, Q>(&mut self, r: Result<T, E>, q: Q) -> T
927    where
928        T: Default,
929        E: OutcomeError,
930        Q: Counted,
931    {
932        match r {
933            Ok(result) => result,
934            Err(err) => {
935                self.reject_err(err, q);
936                T::default()
937            }
938        }
939    }
940
941    /// Rejects an item with an error.
942    ///
943    /// Makes sure the correct outcomes are tracked for the item, that is discarded due to an
944    /// error.
945    pub fn reject_err<E, Q>(&mut self, err: E, q: Q) -> E::Error
946    where
947        E: OutcomeError,
948        Q: Counted,
949    {
950        let (outcome, err) = err.consume();
951        for (category, quantity) in q.quantities() {
952            self.in_flight.push((category, quantity, outcome.clone()))
953        }
954        err
955    }
956
957    /// Rejects an item with an internal error.
958    ///
959    /// See also: [`Managed::internal_error`].
960    #[track_caller]
961    pub fn internal_error<E, Q>(&mut self, error: E, q: Q)
962    where
963        E: std::error::Error + 'static,
964        Q: Counted,
965    {
966        relay_log::error!(error = &error as &dyn std::error::Error, "internal error");
967        debug_assert!(false, "internal error: {error}");
968        self.reject_err((Outcome::Invalid(DiscardReason::Internal), ()), q);
969    }
970}
971
972/// Iterator returned by [`Managed::split`].
973pub struct Split<I, S>
974where
975    I: Iterator<Item = S>,
976    S: Counted,
977{
978    #[cfg(debug_assertions)]
979    quantities: Quantities,
980    items: I,
981    meta: Arc<Meta>,
982    exhausted: bool,
983}
984
985impl<I, S> Split<I, S>
986where
987    I: Iterator<Item = S>,
988    S: Counted,
989{
990    /// Subtracts passed quantities from the total quantities to verify total quantity counts are
991    /// matching.
992    #[cfg(debug_assertions)]
993    fn subtract(&mut self, q: Quantities) {
994        for (category, quantities) in q {
995            let Some(orig_quantities) = self
996                .quantities
997                .iter_mut()
998                .find_map(|(c, q)| (*c == category).then_some(q))
999            else {
1000                debug_assert!(
1001                    false,
1002                    "mismatching quantities, item split into category {category}, \
1003                    which originally was not present"
1004                );
1005                continue;
1006            };
1007
1008            if *orig_quantities >= quantities {
1009                *orig_quantities -= quantities;
1010            } else {
1011                debug_assert!(
1012                    false,
1013                    "in total more items produced in category {category} than originally available"
1014                );
1015            }
1016        }
1017    }
1018}
1019
1020impl<I, S> Iterator for Split<I, S>
1021where
1022    I: Iterator<Item = S>,
1023    S: Counted,
1024{
1025    type Item = Managed<S>;
1026
1027    fn next(&mut self) -> Option<Self::Item> {
1028        let next = match self.items.next() {
1029            Some(next) => next,
1030            None => {
1031                self.exhausted = true;
1032                return None;
1033            }
1034        };
1035
1036        #[cfg(debug_assertions)]
1037        self.subtract(next.quantities());
1038
1039        Some(Managed::from_parts(next, Arc::clone(&self.meta)))
1040    }
1041}
1042
1043impl<I, S> Drop for Split<I, S>
1044where
1045    I: Iterator<Item = S>,
1046    S: Counted,
1047{
1048    fn drop(&mut self) {
1049        // If the inner iterator was exhausted, no items should be remaining.
1050        #[cfg(debug_assertions)]
1051        if self.exhausted {
1052            for (category, quantities) in &self.quantities {
1053                debug_assert!(
1054                    *quantities == 0,
1055                    "items split, but still {quantities} remaining in category {category}"
1056                );
1057            }
1058        }
1059
1060        if self.exhausted {
1061            return;
1062        }
1063
1064        // There may be items remaining in the iterator for multiple reasons:
1065        // - there was a panic
1066        // - the iterator was never fully consumed
1067        //
1068        // In any case, outcomes must be emitted for the remaining items.
1069        for item in &mut self.items {
1070            for (category, quantity) in item.quantities() {
1071                self.meta.track_outcome(
1072                    Outcome::Invalid(DiscardReason::Internal),
1073                    category,
1074                    quantity,
1075                );
1076            }
1077        }
1078    }
1079}
1080
1081impl<I, S> FusedIterator for Split<I, S>
1082where
1083    I: Iterator<Item = S> + FusedIterator,
1084    S: Counted,
1085{
1086}
1087
#[cfg(test)]
mod tests {
    use super::*;

    /// Test payload which counts one `Error` item per contained value.
    struct CountedVec(Vec<u32>);

    impl Counted for CountedVec {
        fn quantities(&self) -> Quantities {
            let len = self.0.len();
            smallvec::smallvec![(DataCategory::Error, len)]
        }
    }

    /// Test payload which always counts as a single `Error` item.
    struct CountedValue(u32);

    impl Counted for CountedValue {
        fn quantities(&self) -> Quantities {
            smallvec::smallvec![(DataCategory::Error, 1)]
        }
    }

    #[test]
    fn test_reject_err_no_outcome() {
        let (managed, mut handle) = Managed::for_test(CountedVec((0..6).collect())).build();

        // A rejection without an outcome must stay silent.
        let _ = managed.reject_err((None, ()));
        handle.assert_no_outcomes();

        // Dropping the managed instance afterwards must not produce (internal) outcomes either.
        drop(managed);
        handle.assert_no_outcomes();
    }

    #[test]
    fn test_split_fully_consumed() {
        let (managed, mut handle) = Managed::for_test(CountedVec((0..6).collect())).build();

        // Collecting drives the iterator to its end, so dropping it must not emit anything.
        let parts: Vec<_> = managed
            .split(|value| value.0.into_iter().map(CountedValue))
            .collect();

        handle.assert_no_outcomes();

        for (expected, part) in (0u32..).zip(parts) {
            assert_eq!(part.as_ref().0, expected);

            let outcome = Outcome::Invalid(DiscardReason::Cors);
            let _ = part.reject_err((outcome.clone(), ()));
            handle.assert_outcome(&outcome, DataCategory::Error, 1);
        }
    }

    #[test]
    fn test_split_partially_consumed_emits_remaining() {
        let (managed, mut handle) = Managed::for_test(CountedVec((0..6).collect())).build();

        let mut s = managed.split(|value| value.0.into_iter().map(CountedValue));
        handle.assert_no_outcomes();

        // Each of the first three items is dropped right away and reported individually.
        for _ in 0..3 {
            drop(s.next());
            handle.assert_internal_outcome(DataCategory::Error, 1);
        }
        handle.assert_no_outcomes();

        // Dropping the iterator reports the three items which were never yielded.
        drop(s);

        for _ in 0..3 {
            handle.assert_internal_outcome(DataCategory::Error, 1);
        }
    }

    #[test]
    fn test_split_changing_quantities_should_panic() {
        let (managed, mut handle) = Managed::for_test(CountedVec((0..6).collect())).build();

        // Six items go in, but only a single one comes out of the split.
        let mut s = managed.split(|_| std::iter::once(CountedValue(0)));

        s.next().unwrap().accept(|_| {});
        handle.assert_no_outcomes();

        assert!(s.next().is_none());

        let result = std::panic::catch_unwind(move || drop(s));

        assert!(
            result.is_err(),
            "expected split to panic because of mismatched (not enough) outcomes"
        );
    }

    #[test]
    fn test_split_more_outcomes_than_before_should_panic() {
        let (managed, mut handle) = Managed::for_test(CountedVec(vec![0])).build();

        // A single item goes in, but the split produces two.
        let mut s = managed.split(|_| vec![CountedValue(0), CountedValue(2)].into_iter());

        s.next().unwrap().accept(|_| {});
        handle.assert_no_outcomes();

        let result = std::panic::catch_unwind(move || {
            s.next();
        });

        assert!(
            result.is_err(),
            "expected split to panic because of mismatched (too many) outcomes"
        );
    }

    #[test]
    fn test_split_changing_categories_should_panic() {
        /// Counts towards a category the original item never had.
        struct Special;

        impl Counted for Special {
            fn quantities(&self) -> Quantities {
                smallvec::smallvec![(DataCategory::Error, 1), (DataCategory::Transaction, 1)]
            }
        }

        let (managed, _handle) = Managed::for_test(CountedVec(vec![0])).build();

        let mut s = managed.split(|value| value.0.into_iter().map(|_| Special));

        let result = std::panic::catch_unwind(move || {
            let _ = s.next();
        });

        assert!(
            result.is_err(),
            "expected split to panic because of mismatched outcome categories"
        );
    }

    #[test]
    fn test_split_assert_fused() {
        /// Only compiles when called with a fused iterator.
        fn only_fused<T: FusedIterator>(_: T) {}

        let (managed, mut handle) = Managed::for_test(CountedVec(vec![0])).build();
        only_fused(managed.split(|value| value.0.into_iter().map(CountedValue)));
        handle.assert_internal_outcome(DataCategory::Error, 1);
    }
}