relay_server/managed/
managed.rs

1#[cfg(debug_assertions)]
2use std::collections::BTreeMap;
3use std::convert::Infallible;
4use std::fmt;
5use std::iter::FusedIterator;
6use std::mem::ManuallyDrop;
7use std::net::IpAddr;
8use std::sync::Arc;
9use std::sync::atomic::{AtomicBool, Ordering};
10
11use chrono::{DateTime, Utc};
12use relay_event_schema::protocol::EventId;
13use relay_quotas::{DataCategory, Scoping};
14use relay_system::Addr;
15use smallvec::SmallVec;
16
17use crate::Envelope;
18use crate::managed::{Counted, ManagedEnvelope, Quantities};
19use crate::services::outcome::{DiscardReason, Outcome, TrackOutcome};
20use crate::services::processor::ProcessingError;
21
22#[cfg(debug_assertions)]
23mod debug;
24#[cfg(test)]
25mod test;
26
27#[cfg(test)]
28pub use self::test::*;
29
30/// An error which can be extracted into an outcome.
31pub trait OutcomeError {
32    /// Produced error, without attached outcome.
33    type Error;
34
35    /// Consumes the error and returns an outcome and [`Self::Error`].
36    ///
37    /// Returning a `None` outcome should discard the item(s) silently.
38    fn consume(self) -> (Option<Outcome>, Self::Error);
39}
40
41impl OutcomeError for Outcome {
42    type Error = ();
43
44    fn consume(self) -> (Option<Outcome>, Self::Error) {
45        (self, ()).consume()
46    }
47}
48
49impl<E> OutcomeError for (Outcome, E) {
50    type Error = E;
51
52    fn consume(self) -> (Option<Outcome>, Self::Error) {
53        (Some(self.0), self.1)
54    }
55}
56
57impl<E> OutcomeError for (Option<Outcome>, E) {
58    type Error = E;
59
60    fn consume(self) -> (Option<Outcome>, Self::Error) {
61        self
62    }
63}
64
65impl OutcomeError for ProcessingError {
66    type Error = Self;
67
68    fn consume(self) -> (Option<Outcome>, Self::Error) {
69        (self.to_outcome(), self)
70    }
71}
72
73impl OutcomeError for Infallible {
74    type Error = Self;
75
76    fn consume(self) -> (Option<Outcome>, Self::Error) {
77        match self {}
78    }
79}
80
/// A wrapper type which ensures outcomes have been emitted for an error.
///
/// [`Managed`] wraps an error in [`Rejected`] once outcomes have been emitted for the managed
/// item.
#[derive(Debug, Clone, Copy)]
#[must_use = "a rejection must be propagated"]
pub struct Rejected<T>(T);
88
89impl<T> Rejected<T> {
90    /// Extracts the underlying error.
91    pub fn into_inner(self) -> T {
92        self.0
93    }
94
95    /// Maps the rejected error to a different error.
96    pub fn map<F, S>(self, f: F) -> Rejected<S>
97    where
98        F: FnOnce(T) -> S,
99    {
100        Rejected(f(self.0))
101    }
102}
103
104impl<T> std::error::Error for Rejected<T>
105where
106    T: std::error::Error,
107{
108    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
109        self.0.source()
110    }
111}
112
113impl<T> std::fmt::Display for Rejected<T>
114where
115    T: std::fmt::Display,
116{
117    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
118        self.0.fmt(f)
119    }
120}
121
122/// The [`Managed`] wrapper ensures outcomes are correctly emitted for the contained item.
123pub struct Managed<T: Counted> {
124    value: T,
125    meta: Arc<Meta>,
126    done: AtomicBool,
127}
128
129impl<T: Counted> Managed<T> {
130    /// Creates a new managed instance with a `value` from a [`ManagedEnvelope`].
131    ///
132    /// The [`Managed`] instance, inherits all metadata from the passed [`ManagedEnvelope`],
133    /// like received time or scoping.
134    pub fn from_envelope(envelope: &ManagedEnvelope, value: T) -> Self {
135        Self::from_parts(
136            value,
137            Arc::new(Meta {
138                outcome_aggregator: envelope.outcome_aggregator().clone(),
139                received_at: envelope.received_at(),
140                scoping: envelope.scoping(),
141                event_id: envelope.envelope().event_id(),
142                remote_addr: envelope.meta().remote_addr(),
143            }),
144        )
145    }
146
147    /// Creates another [`Managed`] instance, with a new value but shared metadata.
148    pub fn wrap<S>(&self, other: S) -> Managed<S>
149    where
150        S: Counted,
151    {
152        Managed::from_parts(other, Arc::clone(&self.meta))
153    }
154
155    /// Original received timestamp.
156    pub fn received_at(&self) -> DateTime<Utc> {
157        self.meta.received_at
158    }
159
160    /// Scoping information stored in this context.
161    pub fn scoping(&self) -> Scoping {
162        self.meta.scoping
163    }
164
165    /// Splits [`Self`] into two other [`Managed`] items.
166    ///
167    /// The two resulting managed instances together are expected to have the same outcomes as the original instance..
168    /// Since splitting may introduce a new type of item, which some of the original
169    /// quantities are transferred to, there may be new additional data categories created.
170    pub fn split_once<F, S, U>(self, f: F) -> (Managed<S>, Managed<U>)
171    where
172        F: FnOnce(T) -> (S, U),
173        S: Counted,
174        U: Counted,
175    {
176        debug_assert!(!self.is_done());
177
178        let (value, meta) = self.destructure();
179        #[cfg(debug_assertions)]
180        let quantities = value.quantities();
181
182        let (a, b) = f(value);
183
184        #[cfg(debug_assertions)]
185        debug::Quantities::from(&quantities)
186            // Instead of `assert_only_extra`, used for extracted metrics also counting
187            // in the `metric bucket` category, it may make sense to give the mapping function
188            // control over which categories to ignore, similar to the record keeper's lenient
189            // method.
190            .assert_only_extra(debug::Quantities::from(&a) + debug::Quantities::from(&b));
191
192        (
193            Managed::from_parts(a, Arc::clone(&meta)),
194            Managed::from_parts(b, meta),
195        )
196    }
197
198    /// Splits [`Self`] into a variable amount if individual items.
199    ///
200    /// Useful when the current instance contains multiple items of the same type
201    /// and must be split into individually managed items.
202    pub fn split<F, I, S>(self, f: F) -> Split<I::IntoIter, I::Item>
203    where
204        F: FnOnce(T) -> I,
205        I: IntoIterator<Item = S>,
206        S: Counted,
207    {
208        self.split_with_context(|value| (f(value), ())).0
209    }
210
211    /// Splits [`Self`] into a variable amount if individual items.
212    ///
213    /// Like [`Self::split`] but also allows returning an untracked context,
214    /// a way of returning additional data when deconstructing the original item.
215    pub fn split_with_context<F, I, S, C>(self, f: F) -> (Split<I::IntoIter, I::Item>, C)
216    where
217        F: FnOnce(T) -> (I, C),
218        I: IntoIterator<Item = S>,
219        S: Counted,
220    {
221        debug_assert!(!self.is_done());
222
223        let (value, meta) = self.destructure();
224        #[cfg(debug_assertions)]
225        let quantities = value.quantities();
226
227        let (items, context) = f(value);
228
229        (
230            Split {
231                #[cfg(debug_assertions)]
232                quantities,
233                items: items.into_iter(),
234                meta,
235                exhausted: false,
236            },
237            context,
238        )
239    }
240
241    /// Filters individual items and emits outcomes for them if they are removed.
242    ///
243    /// This is particularly useful when the managed instance is a container of individual items,
244    /// which need to be processed or filtered on a case by case basis.
245    ///
246    /// # Examples:
247    ///
248    /// ```
249    /// # use relay_server::managed::{Counted, Managed, Quantities};
250    /// # #[derive(Copy, Clone)]
251    /// # struct Context<'a>(&'a u32);
252    /// # struct Item;
253    /// struct Items {
254    ///     items: Vec<Item>,
255    /// }
256    /// # impl Counted for Items {
257    /// #   fn quantities(&self) -> Quantities {
258    /// #       todo!()
259    /// #   }
260    /// # }
261    /// # impl Counted for Item {
262    /// #   fn quantities(&self) -> Quantities {
263    /// #       todo!()
264    /// #   }
265    /// # }
266    /// # type Error = std::convert::Infallible;
267    ///
268    /// fn process_items(items: &mut Managed<Items>, ctx: Context<'_>) {
269    ///     items.retain(|items| &mut items.items, |item, _| process(item, ctx));
270    /// }
271    ///
272    /// fn process(item: &mut Item, ctx: Context<'_>) -> Result<(), Error> {
273    ///     todo!()
274    /// }
275    /// ```
276    pub fn retain<S, I, U, E>(&mut self, select: S, mut retain: U)
277    where
278        S: FnOnce(&mut T) -> &mut Vec<I>,
279        I: Counted,
280        U: FnMut(&mut I, &mut RecordKeeper<'_>) -> Result<(), E>,
281        E: OutcomeError,
282    {
283        self.retain_with_context(
284            |inner| (select(inner), &()),
285            |item, _, records| retain(item, records),
286        );
287    }
288
289    /// Filters individual items and emits outcomes for them if they are removed.
290    ///
291    /// Like [`Self::retain`], but it allows for an additional context extracted from the managed
292    /// object passed to the retain function.
293    ///
294    /// # Examples:
295    ///
296    /// ```
297    /// # use relay_server::managed::{Counted, Managed, Quantities};
298    /// # #[derive(Copy, Clone)]
299    /// # struct Context<'a>(&'a u32);
300    /// # struct Item;
301    /// struct Items {
302    ///     ty: String,
303    ///     items: Vec<Item>,
304    /// }
305    /// # impl Counted for Items {
306    /// #   fn quantities(&self) -> Quantities {
307    /// #       todo!()
308    /// #   }
309    /// # }
310    /// # impl Counted for Item {
311    /// #   fn quantities(&self) -> Quantities {
312    /// #       todo!()
313    /// #   }
314    /// # }
315    /// # type Error = std::convert::Infallible;
316    ///
317    /// fn process_items(items: &mut Managed<Items>, ctx: Context<'_>) {
318    ///     items.retain_with_context(|items| (&mut items.items, &items.ty), |item, ty, _| process(item, ty, ctx));
319    /// }
320    ///
321    /// fn process(item: &mut Item, ty: &str, ctx: Context<'_>) -> Result<(), Error> {
322    ///     todo!()
323    /// }
324    /// ```
325    pub fn retain_with_context<S, C, I, U, E>(&mut self, select: S, mut retain: U)
326    where
327        // Returning `&'a C` here is not optimal, ideally we return C here and express the correct
328        // bound of `C: 'a` but this is, to my knowledge, currently not possible to express in stable Rust.
329        //
330        // This is unfortunately a bit limiting but for most of our purposes it is enough.
331        for<'a> S: FnOnce(&'a mut T) -> (&'a mut Vec<I>, &'a C),
332        I: Counted,
333        U: FnMut(&mut I, &C, &mut RecordKeeper<'_>) -> Result<(), E>,
334        E: OutcomeError,
335    {
336        self.modify(|inner, records| {
337            let (items, ctx) = select(inner);
338            items.retain_mut(|item| match retain(item, ctx, records) {
339                Ok(()) => true,
340                Err(err) => {
341                    records.reject_err(err, &*item);
342                    false
343                }
344            })
345        });
346    }
347
348    /// Maps a [`Managed<T>`] to [`Managed<S>`] by applying the mapping function `f`.
349    ///
350    /// Like [`Self::try_map`] but not fallible.
351    pub fn map<S, F>(self, f: F) -> Managed<S>
352    where
353        F: FnOnce(T, &mut RecordKeeper) -> S,
354        S: Counted,
355    {
356        self.try_map(move |inner, records| Ok::<_, Infallible>(f(inner, records)))
357            .unwrap_or_else(|e| match e.0 {})
358    }
359
360    /// Maps a [`Managed<T>`] to [`Managed<S>`] by applying the mapping function `f`.
361    ///
362    /// The mapping function gets access to a [`RecordKeeper`], to emit outcomes for partial
363    /// discards.
364    ///
365    /// If the mapping function returns an error, the entire (original) [`Self`] is rejected,
366    /// no partial outcomes are emitted.
367    pub fn try_map<S, F, E>(self, f: F) -> Result<Managed<S>, Rejected<E::Error>>
368    where
369        F: FnOnce(T, &mut RecordKeeper) -> Result<S, E>,
370        S: Counted,
371        E: OutcomeError,
372    {
373        debug_assert!(!self.is_done());
374
375        let (value, meta) = self.destructure();
376        let quantities = value.quantities();
377
378        let mut records = RecordKeeper::new(&meta, quantities);
379
380        match f(value, &mut records) {
381            Ok(value) => {
382                records.success(value.quantities());
383                Ok(Managed::from_parts(value, meta))
384            }
385            Err(err) => Err(records.failure(err)),
386        }
387    }
388
389    /// Gives mutable access to the contained value to modify it.
390    ///
391    /// Like [`Self::try_modify`] but not fallible.
392    pub fn modify<F>(&mut self, f: F)
393    where
394        F: FnOnce(&mut T, &mut RecordKeeper),
395    {
396        self.try_modify(move |inner, records| {
397            f(inner, records);
398            Ok::<_, Infallible>(())
399        })
400        .unwrap_or_else(|e| match e {})
401    }
402
403    /// Gives mutable access to the contained value to modify it.
404    ///
405    /// The modifying function gets access to a [`RecordKeeper`], to emit outcomes for partial
406    /// discards.
407    ///
408    /// If the modifying function returns an error, the entire (original) [`Self`] is rejected,
409    /// no partial outcomes are emitted.
410    pub fn try_modify<F, E>(&mut self, f: F) -> Result<(), Rejected<E::Error>>
411    where
412        F: FnOnce(&mut T, &mut RecordKeeper) -> Result<(), E>,
413        E: OutcomeError,
414    {
415        debug_assert!(!self.is_done());
416
417        let quantities = self.value.quantities();
418        let mut records = RecordKeeper::new(&self.meta, quantities);
419
420        match f(&mut self.value, &mut records) {
421            Ok(()) => {
422                records.success(self.value.quantities());
423                Ok(())
424            }
425            Err(err) => {
426                let err = records.failure(err);
427                self.done.store(true, Ordering::Relaxed);
428                Err(err)
429            }
430        }
431    }
432
433    /// Accepts the item of this managed instance.
434    ///
435    /// This should be called if the item has been or is about to be accepted by the upstream, which means that
436    /// the responsibility for logging outcomes has been moved. This function will not log any
437    /// outcomes.
438    ///
439    /// Like [`Self::try_accept`], but infallible.
440    pub fn accept<F, S>(self, f: F) -> S
441    where
442        F: FnOnce(T) -> S,
443    {
444        self.try_accept(|item| Ok::<_, Infallible>(f(item)))
445            .unwrap_or_else(|err| match err.0 {})
446    }
447
448    /// Accepts the item of this managed instance.
449    ///
450    /// This should be called if the item has been or is about to be accepted by the upstream.
451    ///
452    /// Outcomes are only emitted when the accepting closure returns an error, which means that
453    /// in the success case the responsibility for logging outcomes has been moved to the
454    /// caller/upstream.
455    pub fn try_accept<F, S, E>(self, f: F) -> Result<S, Rejected<E::Error>>
456    where
457        F: FnOnce(T) -> Result<S, E>,
458        E: OutcomeError,
459    {
460        debug_assert!(!self.is_done());
461
462        let (value, meta) = self.destructure();
463        let records = RecordKeeper::new(&meta, value.quantities());
464
465        match f(value) {
466            Ok(value) => {
467                records.accept();
468                Ok(value)
469            }
470            Err(err) => Err(records.failure(err)),
471        }
472    }
473
474    /// Rejects the entire [`Managed`] instance with an internal error.
475    ///
476    /// Internal errors should be reserved for uses where logical invariants are violated.
477    /// Cases which should never happen and always indicate a logical bug.
478    ///
479    /// This function will panic in debug builds, but discard the item
480    /// with an internal discard reason in release builds.
481    #[track_caller]
482    pub fn internal_error(&self, reason: &'static str) -> Rejected<()> {
483        relay_log::error!("internal error: {reason}");
484        debug_assert!(false, "internal error: {reason}");
485        self.reject_err((Outcome::Invalid(DiscardReason::Internal), ()))
486    }
487
488    /// Rejects the entire [`Managed`] instance.
489    pub fn reject_err<E>(&self, error: E) -> Rejected<E::Error>
490    where
491        E: OutcomeError,
492    {
493        debug_assert!(!self.is_done());
494
495        let (outcome, error) = error.consume();
496        self.do_reject(outcome);
497        Rejected(error)
498    }
499
500    fn do_reject(&self, outcome: Option<Outcome>) {
501        // Always set the internal state to `done`, even if there is no outcome to be emitted.
502        // All bookkeeping has been done.
503        let is_done = self.done.fetch_or(true, Ordering::Relaxed);
504
505        // No outcome to emit, we're done.
506        let Some(outcome) = outcome else {
507            return;
508        };
509
510        // Only emit outcomes if we were not yet done.
511        //
512        // Callers should guard against accidentally calling `do_reject` when the `is_done` flag is
513        // already set, but internal uses (like `Drop`) can rely on this double emission
514        // prevention.
515        if !is_done {
516            for (category, quantity) in self.value.quantities() {
517                self.meta.track_outcome(outcome.clone(), category, quantity);
518            }
519        }
520    }
521
522    /// De-structures this managed instance into its own parts.
523    ///
524    /// While de-structured no outcomes will be emitted on drop.
525    ///
526    /// Currently no `Managed`, which already has outcomes emitted, should be de-structured
527    /// as this status is lost.
528    fn destructure(self) -> (T, Arc<Meta>) {
529        // SAFETY: this follows an approach mentioned in the RFC
530        // <https://github.com/rust-lang/rfcs/pull/3466> to move fields out of
531        // a type with a drop implementation.
532        //
533        // The original type is wrapped in a manual drop to prevent running the
534        // drop handler, afterwards all fields are moved out of the type.
535        //
536        // And the original type is forgotten, de-structuring the original type
537        // without running its drop implementation.
538        let this = ManuallyDrop::new(self);
539        let Managed { value, meta, done } = &*this;
540
541        let value = unsafe { std::ptr::read(value) };
542        let meta = unsafe { std::ptr::read(meta) };
543        let done = unsafe { std::ptr::read(done) };
544        // This is a current invariant, if we ever need to change the invariant,
545        // the done status should be preserved and returned instead.
546        debug_assert!(
547            !done.load(Ordering::Relaxed),
548            "a `done` managed should never be destructured"
549        );
550
551        (value, meta)
552    }
553
554    fn from_parts(value: T, meta: Arc<Meta>) -> Self {
555        Self {
556            value,
557            meta,
558            done: AtomicBool::new(false),
559        }
560    }
561
562    fn is_done(&self) -> bool {
563        self.done.load(Ordering::Relaxed)
564    }
565}
566
567impl<T: Counted> Drop for Managed<T> {
568    fn drop(&mut self) {
569        self.do_reject(Some(Outcome::Invalid(DiscardReason::Internal)));
570    }
571}
572
573impl<T: Counted + fmt::Debug> fmt::Debug for Managed<T> {
574    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
575        write!(f, "Managed<{}>[", std::any::type_name::<T>())?;
576        for (i, (category, quantity)) in self.value.quantities().iter().enumerate() {
577            if i > 0 {
578                write!(f, ",")?;
579            }
580            write!(f, "{category}:{quantity}")?;
581        }
582        write!(f, "](")?;
583        self.value.fmt(f)?;
584        write!(f, ")")
585    }
586}
587
588impl<T: Counted> Managed<Option<T>> {
589    /// Turns a managed option into an optional [`Managed`].
590    pub fn transpose(self) -> Option<Managed<T>> {
591        let (o, meta) = self.destructure();
592        o.map(|t| Managed::from_parts(t, meta))
593    }
594}
595
596impl From<Managed<Box<Envelope>>> for ManagedEnvelope {
597    fn from(value: Managed<Box<Envelope>>) -> Self {
598        let (value, meta) = value.destructure();
599        let mut envelope = ManagedEnvelope::new(value, meta.outcome_aggregator.clone());
600        envelope.scope(meta.scoping);
601        envelope
602    }
603}
604
605impl<T: Counted> AsRef<T> for Managed<T> {
606    fn as_ref(&self) -> &T {
607        &self.value
608    }
609}
610
611impl<T: Counted> std::ops::Deref for Managed<T> {
612    type Target = T;
613
614    fn deref(&self) -> &Self::Target {
615        &self.value
616    }
617}
618
619/// Internal metadata attached with a [`Managed`] instance.
620struct Meta {
621    /// Outcome aggregator service.
622    outcome_aggregator: Addr<TrackOutcome>,
623    /// Received timestamp, when the contained payload/information was received.
624    ///
625    /// See also: [`crate::extractors::RequestMeta::received_at`].
626    received_at: DateTime<Utc>,
627    /// Data scoping information of the contained item.
628    scoping: Scoping,
629    /// Optional event id associated with the contained data.
630    event_id: Option<EventId>,
631    /// Optional remote addr from where the data was received from.
632    remote_addr: Option<IpAddr>,
633}
634
635impl Meta {
636    pub fn track_outcome(&self, outcome: Outcome, category: DataCategory, quantity: usize) {
637        self.outcome_aggregator.send(TrackOutcome {
638            timestamp: self.received_at,
639            scoping: self.scoping,
640            outcome,
641            event_id: self.event_id,
642            remote_addr: self.remote_addr,
643            category,
644            quantity: quantity.try_into().unwrap_or(u32::MAX),
645        });
646    }
647}
648
649/// A record keeper makes sure modifications done on a [`Managed`] item are all accounted for
650/// correctly.
651pub struct RecordKeeper<'a> {
652    meta: &'a Meta,
653    on_drop: Quantities,
654    #[cfg(debug_assertions)]
655    lenient: SmallVec<[DataCategory; 1]>,
656    #[cfg(debug_assertions)]
657    modifications: BTreeMap<DataCategory, isize>,
658    in_flight: SmallVec<[(DataCategory, usize, Option<Outcome>); 2]>,
659}
660
661impl<'a> RecordKeeper<'a> {
662    fn new(meta: &'a Meta, quantities: Quantities) -> Self {
663        Self {
664            meta,
665            on_drop: quantities,
666            #[cfg(debug_assertions)]
667            lenient: Default::default(),
668            #[cfg(debug_assertions)]
669            modifications: Default::default(),
670            in_flight: Default::default(),
671        }
672    }
673
674    /// Marking a data category as lenient exempts this category from outcome quantity validations.
675    ///
676    /// Consider using [`Self::modify_by`] instead.
677    ///
678    /// This can be used in cases where the quantity is knowingly modified, which is quite common
679    /// for data categories which count bytes.
680    pub fn lenient(&mut self, category: DataCategory) {
681        let _category = category;
682        #[cfg(debug_assertions)]
683        self.lenient.push(_category);
684    }
685
686    /// Modifies the expected count for a category.
687    ///
688    /// When extracting payloads category counts may expectedly change, these changes can be
689    /// tracked using this function.
690    ///
691    /// Prefer using [`Self::modify_by`] over [`Self::lenient`] as lenient completely disables
692    /// validation for the entire category.
693    pub fn modify_by(&mut self, category: DataCategory, offset: isize) {
694        let _category = category;
695        let _offset = offset;
696        #[cfg(debug_assertions)]
697        {
698            *self.modifications.entry(_category).or_default() += offset;
699        }
700    }
701
702    /// Finalizes all records and emits the necessary outcomes.
703    ///
704    /// This uses the quantities of the original item.
705    fn failure<E>(mut self, error: E) -> Rejected<E::Error>
706    where
707        E: OutcomeError,
708    {
709        let (outcome, error) = error.consume();
710
711        if let Some(outcome) = outcome {
712            for (category, quantity) in std::mem::take(&mut self.on_drop) {
713                self.meta.track_outcome(outcome.clone(), category, quantity);
714            }
715        }
716
717        Rejected(error)
718    }
719
720    /// Finalizes all records and asserts that no additional outcomes have been tracked.
721    ///
722    /// Unlike [`Self::success`], this method does not allow for intermediate or partial outcomes,
723    /// it also does not verify any outcomes.
724    ///
725    /// This method is useful for using the record keeper to track failure outcomes, either
726    /// explicit failures or panics.
727    fn accept(mut self) {
728        debug_assert!(
729            self.in_flight.is_empty(),
730            "records accepted, but intermediate outcomes tracked"
731        );
732        self.on_drop.clear();
733    }
734
735    /// Finalizes all records and emits the created outcomes.
736    ///
737    /// This only emits the outcomes that have been explicitly registered.
738    /// In a debug build, the function also ensure no outcomes have been missed by comparing
739    /// quantities of the item before and after.
740    fn success(mut self, new: Quantities) {
741        let original = std::mem::take(&mut self.on_drop);
742        self.assert_quantities(original, new);
743
744        self.on_drop.clear();
745        for (category, quantity, outcome) in std::mem::take(&mut self.in_flight) {
746            if let Some(outcome) = outcome {
747                self.meta.track_outcome(outcome, category, quantity);
748            }
749        }
750    }
751
752    /// Asserts that there have been no quantities lost.
753    ///
754    /// The original amount of quantities should match the new amount of quantities + all emitted
755    /// outcomes.
756    #[cfg(debug_assertions)]
757    fn assert_quantities(&self, original: Quantities, new: Quantities) {
758        macro_rules! emit {
759            ($category:expr, $($tt:tt)*) => {{
760                match self.lenient.contains(&$category) {
761                    // Certain categories are known to be not always correct,
762                    // they are logged instead.
763                    true => relay_log::debug!($($tt)*),
764                    false  => {
765                        relay_log::error!("Original: {original:?}");
766                        relay_log::error!("New: {new:?}");
767                        relay_log::error!("Modifications: {:?}", self.modifications);
768                        relay_log::error!("In Flight: {:?}", self.in_flight);
769                        panic!($($tt)*)
770                    }
771                }
772            }};
773        }
774
775        let mut sums = debug::Quantities::from(&original).0;
776        for (category, offset) in &self.modifications {
777            let v = sums.entry(*category).or_default();
778            match v.checked_add_signed(*offset) {
779                Some(result) => *v = result,
780                None => emit!(
781                    category,
782                    "Attempted to modify original quantity {v} into the negative ({offset})"
783                ),
784            }
785        }
786
787        for (category, quantity, outcome) in &self.in_flight {
788            match sums.get_mut(category) {
789                Some(c) if *c >= *quantity => *c -= *quantity,
790                Some(c) => emit!(
791                    category,
792                    "Emitted {quantity} outcomes ({outcome:?}) for {category}, but there were only {c} items in the category originally"
793                ),
794                None => emit!(
795                    category,
796                    "Emitted {quantity} outcomes ({outcome:?}) for {category}, but there never was an item in this category"
797                ),
798            }
799        }
800
801        for (category, quantity) in &new {
802            match sums.get_mut(category) {
803                Some(c) if *c >= *quantity => *c -= *quantity,
804                Some(c) => emit!(
805                    category,
806                    "New item has {quantity} items in category '{category}', but original (after emitted outcomes) only has {c} left"
807                ),
808                None => emit!(
809                    category,
810                    "New item has {quantity} items in category '{category}', but after emitted outcomes there are none left"
811                ),
812            }
813        }
814
815        for (category, quantity) in sums {
816            if quantity > 0 {
817                emit!(
818                    category,
819                    "Missing outcomes or mismatched quantity in category '{category}', off by {quantity}"
820                );
821            }
822        }
823    }
824
825    #[cfg(not(debug_assertions))]
826    fn assert_quantities(&self, _: Quantities, _: Quantities) {}
827}
828
829impl<'a> Drop for RecordKeeper<'a> {
830    fn drop(&mut self) {
831        for (category, quantity) in std::mem::take(&mut self.on_drop) {
832            self.meta.track_outcome(
833                Outcome::Invalid(DiscardReason::Internal),
834                category,
835                quantity,
836            );
837        }
838    }
839}
840
841impl RecordKeeper<'_> {
842    /// Rejects an item if the passed result is an error and returns a default value.
843    ///
844    /// Similar to [`Self::reject_err`], this emits the necessary outcomes for an
845    /// item, if there is an error.
846    pub fn or_default<T, E, Q>(&mut self, r: Result<T, E>, q: Q) -> T
847    where
848        T: Default,
849        E: OutcomeError,
850        Q: Counted,
851    {
852        match r {
853            Ok(result) => result,
854            Err(err) => {
855                self.reject_err(err, q);
856                T::default()
857            }
858        }
859    }
860
861    /// Rejects an item with an error.
862    ///
863    /// Makes sure the correct outcomes are tracked for the item, that is discarded due to an
864    /// error.
865    pub fn reject_err<E, Q>(&mut self, err: E, q: Q) -> E::Error
866    where
867        E: OutcomeError,
868        Q: Counted,
869    {
870        let (outcome, err) = err.consume();
871        for (category, quantity) in q.quantities() {
872            self.in_flight.push((category, quantity, outcome.clone()))
873        }
874        err
875    }
876
877    /// Rejects an item with an internal error.
878    ///
879    /// See also: [`Managed::internal_error`].
880    #[track_caller]
881    pub fn internal_error<E, Q>(&mut self, error: E, q: Q)
882    where
883        E: std::error::Error + 'static,
884        Q: Counted,
885    {
886        relay_log::error!(error = &error as &dyn std::error::Error, "internal error");
887        debug_assert!(false, "internal error: {error}");
888        self.reject_err((Outcome::Invalid(DiscardReason::Internal), ()), q);
889    }
890}
891
892/// Iterator returned by [`Managed::split`].
893pub struct Split<I, S>
894where
895    I: Iterator<Item = S>,
896    S: Counted,
897{
898    #[cfg(debug_assertions)]
899    quantities: Quantities,
900    items: I,
901    meta: Arc<Meta>,
902    exhausted: bool,
903}
904
905impl<I, S> Split<I, S>
906where
907    I: Iterator<Item = S>,
908    S: Counted,
909{
910    /// Subtracts passed quantities from the total quantities to verify total quantity counts are
911    /// matching.
912    #[cfg(debug_assertions)]
913    fn subtract(&mut self, q: Quantities) {
914        for (category, quantities) in q {
915            let Some(orig_quantities) = self
916                .quantities
917                .iter_mut()
918                .find_map(|(c, q)| (*c == category).then_some(q))
919            else {
920                debug_assert!(
921                    false,
922                    "mismatching quantities, item split into category {category}, \
923                    which originally was not present"
924                );
925                continue;
926            };
927
928            if *orig_quantities >= quantities {
929                *orig_quantities -= quantities;
930            } else {
931                debug_assert!(
932                    false,
933                    "in total more items produced in category {category} than originally available"
934                );
935            }
936        }
937    }
938}
939
940impl<I, S> Iterator for Split<I, S>
941where
942    I: Iterator<Item = S>,
943    S: Counted,
944{
945    type Item = Managed<S>;
946
947    fn next(&mut self) -> Option<Self::Item> {
948        let next = match self.items.next() {
949            Some(next) => next,
950            None => {
951                self.exhausted = true;
952                return None;
953            }
954        };
955
956        #[cfg(debug_assertions)]
957        self.subtract(next.quantities());
958
959        Some(Managed::from_parts(next, Arc::clone(&self.meta)))
960    }
961}
962
963impl<I, S> Drop for Split<I, S>
964where
965    I: Iterator<Item = S>,
966    S: Counted,
967{
968    fn drop(&mut self) {
969        // If the inner iterator was exhausted, no items should be remaining.
970        #[cfg(debug_assertions)]
971        if self.exhausted {
972            for (category, quantities) in &self.quantities {
973                debug_assert!(
974                    *quantities == 0,
975                    "items split, but still {quantities} remaining in category {category}"
976                );
977            }
978        }
979
980        if self.exhausted {
981            return;
982        }
983
984        // There may be items remaining in the iterator for multiple reasons:
985        // - there was a panic
986        // - the iterator was never fully consumed
987        //
988        // In any case, outcomes must be emitted for the remaining items.
989        for item in &mut self.items {
990            for (category, quantity) in item.quantities() {
991                self.meta.track_outcome(
992                    Outcome::Invalid(DiscardReason::Internal),
993                    category,
994                    quantity,
995                );
996            }
997        }
998    }
999}
1000
1001impl<I, S> FusedIterator for Split<I, S>
1002where
1003    I: Iterator<Item = S> + FusedIterator,
1004    S: Counted,
1005{
1006}
1007
#[cfg(test)]
mod tests {
    use super::*;

    /// Test payload counting one `Error` per contained value.
    struct CountedVec(Vec<u32>);

    impl Counted for CountedVec {
        fn quantities(&self) -> Quantities {
            smallvec::smallvec![(DataCategory::Error, self.0.len())]
        }
    }

    /// Test payload counting as exactly one `Error`.
    struct CountedValue(u32);

    impl Counted for CountedValue {
        fn quantities(&self) -> Quantities {
            smallvec::smallvec![(DataCategory::Error, 1)]
        }
    }

    #[test]
    fn test_reject_err_no_outcome() {
        let value = CountedVec(vec![0, 1, 2, 3, 4, 5]);
        let (managed, mut handle) = Managed::for_test(value).build();

        // Rejecting with no outcome, should not emit any outcomes.
        let _ = managed.reject_err((None, ()));
        handle.assert_no_outcomes();

        // Now dropping the managed instance, should not record any (internal) outcomes either.
        drop(managed);
        handle.assert_no_outcomes();
    }

    #[test]
    fn test_split_fully_consumed() {
        let value = CountedVec(vec![0, 1, 2, 3, 4, 5]);
        let (managed, mut handle) = Managed::for_test(value).build();

        let s = managed
            .split(|value| value.0.into_iter().map(CountedValue))
            // Fully consume the iterator to make sure there aren't any outcomes emitted on drop.
            .collect::<Vec<_>>();

        handle.assert_no_outcomes();

        for (i, s) in s.into_iter().enumerate() {
            assert_eq!(s.as_ref().0, i as u32);
            let outcome = Outcome::Invalid(DiscardReason::Cors);
            let _ = s.reject_err((outcome.clone(), ()));
            handle.assert_outcome(&outcome, DataCategory::Error, 1);
        }
    }

    #[test]
    fn test_split_partially_consumed_emits_remaining() {
        let value = CountedVec(vec![0, 1, 2, 3, 4, 5]);
        let (managed, mut handle) = Managed::for_test(value).build();

        let mut s = managed.split(|value| value.0.into_iter().map(CountedValue));
        handle.assert_no_outcomes();

        drop(s.next());
        handle.assert_internal_outcome(DataCategory::Error, 1);
        drop(s.next());
        handle.assert_internal_outcome(DataCategory::Error, 1);
        drop(s.next());
        handle.assert_internal_outcome(DataCategory::Error, 1);
        handle.assert_no_outcomes();

        drop(s);

        handle.assert_internal_outcome(DataCategory::Error, 1);
        handle.assert_internal_outcome(DataCategory::Error, 1);
        handle.assert_internal_outcome(DataCategory::Error, 1);
    }

    // The quantity checks in `Split` are `debug_assert!`s (and the tracked quantities only exist
    // with `debug_assertions`), so the following tests can only observe a panic in debug builds.

    #[test]
    #[cfg(debug_assertions)]
    fn test_split_changing_quantities_should_panic() {
        let value = CountedVec(vec![0, 1, 2, 3, 4, 5]);
        let (managed, mut handle) = Managed::for_test(value).build();

        let mut s = managed.split(|_| std::iter::once(CountedValue(0)));

        s.next().unwrap().accept(|_| {});
        handle.assert_no_outcomes();

        assert!(s.next().is_none());

        let r = std::panic::catch_unwind(move || {
            drop(s);
        });

        assert!(
            r.is_err(),
            "expected split to panic because of mismatched (not enough) outcomes"
        );
    }

    #[test]
    #[cfg(debug_assertions)]
    fn test_split_more_outcomes_than_before_should_panic() {
        let value = CountedVec(vec![0]);
        let (managed, mut handle) = Managed::for_test(value).build();

        let mut s = managed.split(|_| vec![CountedValue(0), CountedValue(2)].into_iter());

        s.next().unwrap().accept(|_| {});
        handle.assert_no_outcomes();

        let r = std::panic::catch_unwind(move || {
            s.next();
        });

        assert!(
            r.is_err(),
            "expected split to panic because of mismatched (too many) outcomes"
        );
    }

    #[test]
    #[cfg(debug_assertions)]
    fn test_split_changing_categories_should_panic() {
        struct Special;
        impl Counted for Special {
            fn quantities(&self) -> Quantities {
                smallvec::smallvec![(DataCategory::Error, 1), (DataCategory::Transaction, 1)]
            }
        }

        let value = CountedVec(vec![0]);
        let (managed, _handle) = Managed::for_test(value).build();

        let mut s = managed.split(|value| value.0.into_iter().map(|_| Special));

        let r = std::panic::catch_unwind(move || {
            let _ = s.next();
        });

        assert!(
            r.is_err(),
            "expected split to panic because of mismatched outcome categories"
        );
    }

    #[test]
    fn test_split_assert_fused() {
        fn only_fused<T: FusedIterator>(_: T) {}

        let (managed, mut handle) = Managed::for_test(CountedVec(vec![0])).build();
        only_fused(managed.split(|value| value.0.into_iter().map(CountedValue)));
        handle.assert_internal_outcome(DataCategory::Error, 1);
    }
}