relay_server/managed/
managed.rs

1#[cfg(debug_assertions)]
2use std::collections::BTreeMap;
3use std::convert::Infallible;
4use std::fmt;
5use std::iter::FusedIterator;
6use std::mem::ManuallyDrop;
7use std::net::IpAddr;
8use std::sync::Arc;
9use std::sync::atomic::{AtomicBool, Ordering};
10
11use chrono::{DateTime, Utc};
12use relay_event_schema::protocol::EventId;
13use relay_quotas::{DataCategory, Scoping};
14use relay_system::Addr;
15use smallvec::SmallVec;
16
17use crate::Envelope;
18use crate::managed::{Counted, ManagedEnvelope, Quantities};
19use crate::services::outcome::{DiscardReason, Outcome, TrackOutcome};
20use crate::services::processor::ProcessingError;
21
22#[cfg(debug_assertions)]
23mod debug;
24#[cfg(test)]
25mod test;
26
27#[cfg(test)]
28pub use self::test::*;
29
/// An error which can be extracted into an outcome.
///
/// [`Self::consume`] splits the error into an optional [`Outcome`], which is emitted for the
/// affected item(s), and the remaining error value, which is handed back to the caller.
pub trait OutcomeError {
    /// Produced error, without attached outcome.
    type Error;

    /// Consumes the error and returns an outcome and [`Self::Error`].
    ///
    /// Returning a `None` outcome should discard the item(s) silently.
    fn consume(self) -> (Option<Outcome>, Self::Error);
}
40
41impl OutcomeError for Outcome {
42    type Error = ();
43
44    fn consume(self) -> (Option<Outcome>, Self::Error) {
45        (self, ()).consume()
46    }
47}
48
49impl<E> OutcomeError for (Outcome, E) {
50    type Error = E;
51
52    fn consume(self) -> (Option<Outcome>, Self::Error) {
53        (Some(self.0), self.1)
54    }
55}
56
57impl<E> OutcomeError for (Option<Outcome>, E) {
58    type Error = E;
59
60    fn consume(self) -> (Option<Outcome>, Self::Error) {
61        self
62    }
63}
64
65impl OutcomeError for ProcessingError {
66    type Error = Self;
67
68    fn consume(self) -> (Option<Outcome>, Self::Error) {
69        (self.to_outcome(), self)
70    }
71}
72
/// [`Infallible`] has no values, this implementation only exists to satisfy trait bounds of
/// infallible operations.
impl OutcomeError for Infallible {
    type Error = Self;

    fn consume(self) -> (Option<Outcome>, Self::Error) {
        // There is no value of `Infallible`, the empty match proves this is unreachable.
        match self {}
    }
}
80
/// A wrapper type which ensures outcomes have been emitted for an error.
///
/// [`Managed`] wraps an error in [`Rejected`] once outcomes have been emitted for the managed
/// item.
#[derive(Debug, Clone, Copy)]
#[must_use = "a rejection must be propagated"]
pub struct Rejected<T>(T);

impl<T> Rejected<T> {
    /// Unwraps the rejection and returns the contained error.
    pub fn into_inner(self) -> T {
        let Self(inner) = self;
        inner
    }

    /// Transforms the contained error with `f`, the result stays marked as rejected.
    pub fn map<F, S>(self, f: F) -> Rejected<S>
    where
        F: FnOnce(T) -> S,
    {
        let Self(inner) = self;
        Rejected(f(inner))
    }
}

/// The wrapper is transparent, the source is the source of the wrapped error.
impl<T> std::error::Error for Rejected<T>
where
    T: std::error::Error,
{
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        let Self(inner) = self;
        inner.source()
    }
}

/// Displays exactly like the wrapped error.
impl<T> std::fmt::Display for Rejected<T>
where
    T: std::fmt::Display,
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self(inner) = self;
        fmt::Display::fmt(inner, f)
    }
}
121
/// The [`Managed`] wrapper ensures outcomes are correctly emitted for the contained item.
///
/// Dropping an instance which has not been marked as done rejects the contained item
/// with an internal outcome.
pub struct Managed<T: Counted> {
    /// The wrapped item, its quantities determine which outcomes are emitted.
    value: T,
    /// Metadata used to emit outcomes, shared between instances created from each other.
    meta: Arc<Meta>,
    /// Set once the instance has been rejected, prevents emitting outcomes a second time.
    done: AtomicBool,
}
128
129impl<T: Counted> Managed<T> {
130    /// Creates a new managed instance with a `value` from a [`ManagedEnvelope`].
131    ///
132    /// The [`Managed`] instance, inherits all metadata from the passed [`ManagedEnvelope`],
133    /// like received time or scoping.
134    pub fn from_envelope(envelope: &ManagedEnvelope, value: T) -> Self {
135        Self::from_parts(
136            value,
137            Arc::new(Meta {
138                outcome_aggregator: envelope.outcome_aggregator().clone(),
139                received_at: envelope.received_at(),
140                scoping: envelope.scoping(),
141                event_id: envelope.envelope().event_id(),
142                remote_addr: envelope.meta().remote_addr(),
143            }),
144        )
145    }
146
147    /// Creates another [`Managed`] instance, with a new value but shared metadata.
148    pub fn wrap<S>(&self, other: S) -> Managed<S>
149    where
150        S: Counted,
151    {
152        Managed::from_parts(other, Arc::clone(&self.meta))
153    }
154
155    /// Original received timestamp.
156    pub fn received_at(&self) -> DateTime<Utc> {
157        self.meta.received_at
158    }
159
160    /// Scoping information stored in this context.
161    pub fn scoping(&self) -> Scoping {
162        self.meta.scoping
163    }
164
165    /// Splits [`Self`] into two other [`Managed`] items.
166    ///
167    /// The two resulting managed instances together are expected to have the same outcomes as the original instance..
168    /// Since splitting may introduce a new type of item, which some of the original
169    /// quantities are transferred to, there may be new additional data categories created.
170    pub fn split_once<F, S, U>(self, f: F) -> (Managed<S>, Managed<U>)
171    where
172        F: FnOnce(T) -> (S, U),
173        S: Counted,
174        U: Counted,
175    {
176        debug_assert!(!self.is_done());
177
178        let (value, meta) = self.destructure();
179        #[cfg(debug_assertions)]
180        let quantities = value.quantities();
181
182        let (a, b) = f(value);
183
184        #[cfg(debug_assertions)]
185        debug::Quantities::from(&quantities)
186            // Instead of `assert_only_extra`, used for extracted metrics also counting
187            // in the `metric bucket` category, it may make sense to give the mapping function
188            // control over which categories to ignore, similar to the record keeper's lenient
189            // method.
190            .assert_only_extra(debug::Quantities::from(&a) + debug::Quantities::from(&b));
191
192        (
193            Managed::from_parts(a, Arc::clone(&meta)),
194            Managed::from_parts(b, meta),
195        )
196    }
197
198    /// Splits [`Self`] into a variable amount if individual items.
199    ///
200    /// Useful when the current instance contains multiple items of the same type
201    /// and must be split into individually managed items.
202    pub fn split<F, I, S>(self, f: F) -> Split<I::IntoIter, I::Item>
203    where
204        F: FnOnce(T) -> I,
205        I: IntoIterator<Item = S>,
206        S: Counted,
207    {
208        self.split_with_context(|value| (f(value), ())).0
209    }
210
211    /// Splits [`Self`] into a variable amount if individual items.
212    ///
213    /// Like [`Self::split`] but also allows returning an untracked context,
214    /// a way of returning additional data when deconstructing the original item.
215    pub fn split_with_context<F, I, S, C>(self, f: F) -> (Split<I::IntoIter, I::Item>, C)
216    where
217        F: FnOnce(T) -> (I, C),
218        I: IntoIterator<Item = S>,
219        S: Counted,
220    {
221        debug_assert!(!self.is_done());
222
223        let (value, meta) = self.destructure();
224        #[cfg(debug_assertions)]
225        let quantities = value.quantities();
226
227        let (items, context) = f(value);
228
229        (
230            Split {
231                #[cfg(debug_assertions)]
232                quantities,
233                items: items.into_iter(),
234                meta,
235                exhausted: false,
236            },
237            context,
238        )
239    }
240
    /// Filters individual items and emits outcomes for them if they are removed.
    ///
    /// This is particularly useful when the managed instance is a container of individual items,
    /// which need to be processed or filtered on a case by case basis.
    ///
    /// `select` picks the list of items from the managed value, `retain` is invoked for every
    /// item. Items for which `retain` returns an error are removed and rejected with that error.
    ///
    /// # Examples:
    ///
    /// ```
    /// # use relay_server::managed::{Counted, Managed, Quantities};
    /// # #[derive(Copy, Clone)]
    /// # struct Context<'a>(&'a u32);
    /// # struct Item;
    /// struct Items {
    ///     items: Vec<Item>,
    /// }
    /// # impl Counted for Items {
    /// #   fn quantities(&self) -> Quantities {
    /// #       todo!()
    /// #   }
    /// # }
    /// # impl Counted for Item {
    /// #   fn quantities(&self) -> Quantities {
    /// #       todo!()
    /// #   }
    /// # }
    /// # type Error = std::convert::Infallible;
    ///
    /// fn process_items(items: &mut Managed<Items>, ctx: Context<'_>) {
    ///     items.retain(|items| &mut items.items, |item, _| process(item, ctx));
    /// }
    ///
    /// fn process(item: &mut Item, ctx: Context<'_>) -> Result<(), Error> {
    ///     todo!()
    /// }
    /// ```
    pub fn retain<S, I, U, E>(&mut self, select: S, mut retain: U)
    where
        S: FnOnce(&mut T) -> &mut Vec<I>,
        I: Counted,
        U: FnMut(&mut I, &mut RecordKeeper<'_>) -> Result<(), E>,
        E: OutcomeError,
    {
        // No context is needed here, a reference to the unit type is passed as a placeholder.
        self.retain_with_context(
            |inner| (select(inner), &()),
            |item, _, records| retain(item, records),
        );
    }
288
289    /// Filters individual items and emits outcomes for them if they are removed.
290    ///
291    /// Like [`Self::retain`], but it allows for an additional context extracted from the managed
292    /// object passed to the retain function.
293    ///
294    /// # Examples:
295    ///
296    /// ```
297    /// # use relay_server::managed::{Counted, Managed, Quantities};
298    /// # #[derive(Copy, Clone)]
299    /// # struct Context<'a>(&'a u32);
300    /// # struct Item;
301    /// struct Items {
302    ///     ty: String,
303    ///     items: Vec<Item>,
304    /// }
305    /// # impl Counted for Items {
306    /// #   fn quantities(&self) -> Quantities {
307    /// #       todo!()
308    /// #   }
309    /// # }
310    /// # impl Counted for Item {
311    /// #   fn quantities(&self) -> Quantities {
312    /// #       todo!()
313    /// #   }
314    /// # }
315    /// # type Error = std::convert::Infallible;
316    ///
317    /// fn process_items(items: &mut Managed<Items>, ctx: Context<'_>) {
318    ///     items.retain_with_context(|items| (&mut items.items, &items.ty), |item, ty, _| process(item, ty, ctx));
319    /// }
320    ///
321    /// fn process(item: &mut Item, ty: &str, ctx: Context<'_>) -> Result<(), Error> {
322    ///     todo!()
323    /// }
324    /// ```
325    pub fn retain_with_context<S, C, I, U, E>(&mut self, select: S, mut retain: U)
326    where
327        // Returning `&'a C` here is not optimal, ideally we return C here and express the correct
328        // bound of `C: 'a` but this is, to my knowledge, currently not possible to express in stable Rust.
329        //
330        // This is unfortunately a bit limiting but for most of our purposes it is enough.
331        for<'a> S: FnOnce(&'a mut T) -> (&'a mut Vec<I>, &'a C),
332        I: Counted,
333        U: FnMut(&mut I, &C, &mut RecordKeeper<'_>) -> Result<(), E>,
334        E: OutcomeError,
335    {
336        self.modify(|inner, records| {
337            let (items, ctx) = select(inner);
338            items.retain_mut(|item| match retain(item, ctx, records) {
339                Ok(()) => true,
340                Err(err) => {
341                    records.reject_err(err, &*item);
342                    false
343                }
344            })
345        });
346    }
347
348    /// Maps a [`Managed<T>`] to [`Managed<S>`] by applying the mapping function `f`.
349    ///
350    /// Like [`Self::try_map`] but not fallible.
351    pub fn map<S, F>(self, f: F) -> Managed<S>
352    where
353        F: FnOnce(T, &mut RecordKeeper) -> S,
354        S: Counted,
355    {
356        self.try_map(move |inner, records| Ok::<_, Infallible>(f(inner, records)))
357            .unwrap_or_else(|e| match e.0 {})
358    }
359
360    /// Maps a [`Managed<T>`] to [`Managed<S>`] by applying the mapping function `f`.
361    ///
362    /// The mapping function gets access to a [`RecordKeeper`], to emit outcomes for partial
363    /// discards.
364    ///
365    /// If the mapping function returns an error, the entire (original) [`Self`] is rejected,
366    /// no partial outcomes are emitted.
367    pub fn try_map<S, F, E>(self, f: F) -> Result<Managed<S>, Rejected<E::Error>>
368    where
369        F: FnOnce(T, &mut RecordKeeper) -> Result<S, E>,
370        S: Counted,
371        E: OutcomeError,
372    {
373        debug_assert!(!self.is_done());
374
375        let (value, meta) = self.destructure();
376        let quantities = value.quantities();
377
378        let mut records = RecordKeeper::new(&meta, quantities);
379
380        match f(value, &mut records) {
381            Ok(value) => {
382                records.success(value.quantities());
383                Ok(Managed::from_parts(value, meta))
384            }
385            Err(err) => Err(records.failure(err)),
386        }
387    }
388
389    /// Gives mutable access to the contained value to modify it.
390    ///
391    /// Like [`Self::try_modify`] but not fallible.
392    pub fn modify<F>(&mut self, f: F)
393    where
394        F: FnOnce(&mut T, &mut RecordKeeper),
395    {
396        self.try_modify(move |inner, records| {
397            f(inner, records);
398            Ok::<_, Infallible>(())
399        })
400        .unwrap_or_else(|e| match e {})
401    }
402
403    /// Gives mutable access to the contained value to modify it.
404    ///
405    /// The modifying function gets access to a [`RecordKeeper`], to emit outcomes for partial
406    /// discards.
407    ///
408    /// If the modifying function returns an error, the entire (original) [`Self`] is rejected,
409    /// no partial outcomes are emitted.
410    pub fn try_modify<F, E>(&mut self, f: F) -> Result<(), Rejected<E::Error>>
411    where
412        F: FnOnce(&mut T, &mut RecordKeeper) -> Result<(), E>,
413        E: OutcomeError,
414    {
415        debug_assert!(!self.is_done());
416
417        let quantities = self.value.quantities();
418        let mut records = RecordKeeper::new(&self.meta, quantities);
419
420        match f(&mut self.value, &mut records) {
421            Ok(()) => {
422                records.success(self.value.quantities());
423                Ok(())
424            }
425            Err(err) => {
426                let err = records.failure(err);
427                self.done.store(true, Ordering::Relaxed);
428                Err(err)
429            }
430        }
431    }
432
433    /// Accepts the item of this managed instance.
434    ///
435    /// This should be called if the item has been or is about to be accepted by the upstream, which means that
436    /// the responsibility for logging outcomes has been moved. This function will not log any
437    /// outcomes.
438    ///
439    /// Like [`Self::try_accept`], but infallible.
440    pub fn accept<F, S>(self, f: F) -> S
441    where
442        F: FnOnce(T) -> S,
443    {
444        self.try_accept(|item| Ok::<_, Infallible>(f(item)))
445            .unwrap_or_else(|err| match err.0 {})
446    }
447
448    /// Accepts the item of this managed instance.
449    ///
450    /// This should be called if the item has been or is about to be accepted by the upstream.
451    ///
452    /// Outcomes are only emitted when the accepting closure returns an error, which means that
453    /// in the success case the responsibility for logging outcomes has been moved to the
454    /// caller/upstream.
455    pub fn try_accept<F, S, E>(self, f: F) -> Result<S, Rejected<E::Error>>
456    where
457        F: FnOnce(T) -> Result<S, E>,
458        E: OutcomeError,
459    {
460        debug_assert!(!self.is_done());
461
462        let (value, meta) = self.destructure();
463        let records = RecordKeeper::new(&meta, value.quantities());
464
465        match f(value) {
466            Ok(value) => {
467                records.accept();
468                Ok(value)
469            }
470            Err(err) => Err(records.failure(err)),
471        }
472    }
473
474    /// Rejects the entire [`Managed`] instance with an internal error.
475    ///
476    /// Internal errors should be reserved for uses where logical invariants are violated.
477    /// Cases which should never happen and always indicate a logical bug.
478    ///
479    /// This function will panic in debug builds, but discard the item
480    /// with an internal discard reason in release builds.
481    #[track_caller]
482    pub fn internal_error(&self, reason: &'static str) -> Rejected<()> {
483        relay_log::error!("internal error: {reason}");
484        debug_assert!(false, "internal error: {reason}");
485        self.reject_err((Outcome::Invalid(DiscardReason::Internal), ()))
486    }
487
488    /// Rejects the entire [`Managed`] instance.
489    pub fn reject_err<E>(&self, error: E) -> Rejected<E::Error>
490    where
491        E: OutcomeError,
492    {
493        debug_assert!(!self.is_done());
494
495        let (outcome, error) = error.consume();
496        if let Some(outcome) = outcome {
497            self.do_reject(outcome);
498        }
499        Rejected(error)
500    }
501
502    fn do_reject(&self, outcome: Outcome) {
503        if !self.done.fetch_or(true, Ordering::Relaxed) {
504            for (category, quantity) in self.value.quantities() {
505                self.meta.track_outcome(outcome.clone(), category, quantity);
506            }
507        }
508    }
509
    /// De-structures this managed instance into its own parts.
    ///
    /// While de-structured no outcomes will be emitted on drop.
    ///
    /// Returns the contained value and the shared metadata, the `done` flag is not returned.
    fn destructure(self) -> (T, Arc<Meta>) {
        // SAFETY: this follows an approach mentioned in the RFC
        // <https://github.com/rust-lang/rfcs/pull/3466> to move fields out of
        // a type with a drop implementation.
        //
        // The original type is wrapped in a manual drop to prevent running the
        // drop handler, afterwards all fields are moved out of the type.
        //
        // And the original type is forgotten, de-structuring the original type
        // without running its drop implementation.
        let this = ManuallyDrop::new(self);
        // Each field is read exactly once and `this` is never dropped, no value is duplicated.
        let value = unsafe { std::ptr::read(&this.value) };
        let meta = unsafe { std::ptr::read(&this.meta) };
        (value, meta)
    }
528
529    fn from_parts(value: T, meta: Arc<Meta>) -> Self {
530        Self {
531            value,
532            meta,
533            done: AtomicBool::new(false),
534        }
535    }
536
537    fn is_done(&self) -> bool {
538        self.done.load(Ordering::Relaxed)
539    }
540}
541
542impl<T: Counted> Drop for Managed<T> {
543    fn drop(&mut self) {
544        self.do_reject(Outcome::Invalid(DiscardReason::Internal));
545    }
546}
547
548impl<T: Counted + fmt::Debug> fmt::Debug for Managed<T> {
549    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
550        write!(f, "Managed<{}>[", std::any::type_name::<T>())?;
551        for (i, (category, quantity)) in self.value.quantities().iter().enumerate() {
552            if i > 0 {
553                write!(f, ",")?;
554            }
555            write!(f, "{category}:{quantity}")?;
556        }
557        write!(f, "](")?;
558        self.value.fmt(f)?;
559        write!(f, ")")
560    }
561}
562
563impl From<Managed<Box<Envelope>>> for ManagedEnvelope {
564    fn from(value: Managed<Box<Envelope>>) -> Self {
565        let (value, meta) = value.destructure();
566        let mut envelope = ManagedEnvelope::new(value, meta.outcome_aggregator.clone());
567        envelope.scope(meta.scoping);
568        envelope
569    }
570}
571
572impl<T: Counted> AsRef<T> for Managed<T> {
573    fn as_ref(&self) -> &T {
574        &self.value
575    }
576}
577
578impl<T: Counted> std::ops::Deref for Managed<T> {
579    type Target = T;
580
581    fn deref(&self) -> &Self::Target {
582        &self.value
583    }
584}
585
/// Internal metadata attached with a [`Managed`] instance.
///
/// Contains everything which is attached to an emitted outcome besides the outcome itself,
/// its category and its quantity.
struct Meta {
    /// Outcome aggregator service.
    outcome_aggregator: Addr<TrackOutcome>,
    /// Received timestamp, when the contained payload/information was received.
    ///
    /// See also: [`crate::extractors::RequestMeta::received_at`].
    received_at: DateTime<Utc>,
    /// Data scoping information of the contained item.
    scoping: Scoping,
    /// Optional event id associated with the contained data.
    event_id: Option<EventId>,
    /// Optional remote addr from where the data was received from.
    remote_addr: Option<IpAddr>,
}
601
602impl Meta {
603    pub fn track_outcome(&self, outcome: Outcome, category: DataCategory, quantity: usize) {
604        self.outcome_aggregator.send(TrackOutcome {
605            timestamp: self.received_at,
606            scoping: self.scoping,
607            outcome,
608            event_id: self.event_id,
609            remote_addr: self.remote_addr,
610            category,
611            quantity: quantity.try_into().unwrap_or(u32::MAX),
612        });
613    }
614}
615
/// A record keeper makes sure modifications done on a [`Managed`] item are all accounted for
/// correctly.
pub struct RecordKeeper<'a> {
    /// Metadata used to emit outcomes.
    meta: &'a Meta,
    /// Quantities for which an internal outcome is emitted, if the record keeper is dropped
    /// without being finalized.
    on_drop: Quantities,
    /// Data categories exempt from quantity validation, only tracked in debug builds.
    #[cfg(debug_assertions)]
    lenient: SmallVec<[DataCategory; 1]>,
    /// Expected offsets to the original quantities, only tracked in debug builds.
    #[cfg(debug_assertions)]
    modifications: BTreeMap<DataCategory, isize>,
    /// Registered rejections, entries with an outcome are emitted on success.
    in_flight: SmallVec<[(DataCategory, usize, Option<Outcome>); 2]>,
}
627
628impl<'a> RecordKeeper<'a> {
629    fn new(meta: &'a Meta, quantities: Quantities) -> Self {
630        Self {
631            meta,
632            on_drop: quantities,
633            #[cfg(debug_assertions)]
634            lenient: Default::default(),
635            #[cfg(debug_assertions)]
636            modifications: Default::default(),
637            in_flight: Default::default(),
638        }
639    }
640
641    /// Marking a data category as lenient exempts this category from outcome quantity validations.
642    ///
643    /// Consider using [`Self::modify_by`] instead.
644    ///
645    /// This can be used in cases where the quantity is knowingly modified, which is quite common
646    /// for data categories which count bytes.
647    pub fn lenient(&mut self, category: DataCategory) {
648        let _category = category;
649        #[cfg(debug_assertions)]
650        self.lenient.push(_category);
651    }
652
653    /// Modifies the expected count for a category.
654    ///
655    /// When extracting payloads category counts may expectedly change, these changes can be
656    /// tracked using this function.
657    ///
658    /// Prefer using [`Self::modify_by`] over [`Self::lenient`] as lenient completely disables
659    /// validation for the entire category.
660    pub fn modify_by(&mut self, category: DataCategory, offset: isize) {
661        let _category = category;
662        let _offset = offset;
663        #[cfg(debug_assertions)]
664        {
665            *self.modifications.entry(_category).or_default() += offset;
666        }
667    }
668
669    /// Finalizes all records and emits the necessary outcomes.
670    ///
671    /// This uses the quantities of the original item.
672    fn failure<E>(mut self, error: E) -> Rejected<E::Error>
673    where
674        E: OutcomeError,
675    {
676        let (outcome, error) = error.consume();
677
678        if let Some(outcome) = outcome {
679            for (category, quantity) in std::mem::take(&mut self.on_drop) {
680                self.meta.track_outcome(outcome.clone(), category, quantity);
681            }
682        }
683
684        Rejected(error)
685    }
686
687    /// Finalizes all records and asserts that no additional outcomes have been tracked.
688    ///
689    /// Unlike [`Self::success`], this method does not allow for intermediate or partial outcomes,
690    /// it also does not verify any outcomes.
691    ///
692    /// This method is useful for using the record keeper to track failure outcomes, either
693    /// explicit failures or panics.
694    fn accept(mut self) {
695        debug_assert!(
696            self.in_flight.is_empty(),
697            "records accepted, but intermediate outcomes tracked"
698        );
699        self.on_drop.clear();
700    }
701
702    /// Finalizes all records and emits the created outcomes.
703    ///
704    /// This only emits the outcomes that have been explicitly registered.
705    /// In a debug build, the function also ensure no outcomes have been missed by comparing
706    /// quantities of the item before and after.
707    fn success(mut self, new: Quantities) {
708        let original = std::mem::take(&mut self.on_drop);
709        self.assert_quantities(original, new);
710
711        self.on_drop.clear();
712        for (category, quantity, outcome) in std::mem::take(&mut self.in_flight) {
713            if let Some(outcome) = outcome {
714                self.meta.track_outcome(outcome, category, quantity);
715            }
716        }
717    }
718
    /// Asserts that there have been no quantities lost.
    ///
    /// The original amount of quantities should match the new amount of quantities + all emitted
    /// outcomes.
    ///
    /// Violations in categories marked as lenient are logged, all other violations panic.
    #[cfg(debug_assertions)]
    fn assert_quantities(&self, original: Quantities, new: Quantities) {
        // Reports a violation for a category, either by logging or by panicking.
        macro_rules! emit {
            ($category:expr, $($tt:tt)*) => {{
                match self.lenient.contains(&$category) {
                    // Certain categories are known to be not always correct,
                    // they are logged instead.
                    true => relay_log::debug!($($tt)*),
                    false  => {
                        relay_log::error!("Original: {original:?}");
                        relay_log::error!("New: {new:?}");
                        relay_log::error!("Modifications: {:?}", self.modifications);
                        relay_log::error!("In Flight: {:?}", self.in_flight);
                        panic!($($tt)*)
                    }
                }
            }};
        }

        // Start with the original quantities and apply all expected modifications.
        let mut sums = debug::Quantities::from(&original).0;
        for (category, offset) in &self.modifications {
            let v = sums.entry(*category).or_default();
            match v.checked_add_signed(*offset) {
                Some(result) => *v = result,
                None => emit!(
                    category,
                    "Attempted to modify original quantity {v} into the negative ({offset})"
                ),
            }
        }

        // Subtract everything which has been registered as rejected.
        for (category, quantity, outcome) in &self.in_flight {
            match sums.get_mut(category) {
                Some(c) if *c >= *quantity => *c -= *quantity,
                Some(c) => emit!(
                    category,
                    "Emitted {quantity} outcomes ({outcome:?}) for {category}, but there were only {c} items in the category originally"
                ),
                None => emit!(
                    category,
                    "Emitted {quantity} outcomes ({outcome:?}) for {category}, but there never was an item in this category"
                ),
            }
        }

        // Subtract everything which is still contained in the new item.
        for (category, quantity) in &new {
            match sums.get_mut(category) {
                Some(c) if *c >= *quantity => *c -= *quantity,
                Some(c) => emit!(
                    category,
                    "New item has {quantity} items in category '{category}', but original (after emitted outcomes) only has {c} left"
                ),
                None => emit!(
                    category,
                    "New item has {quantity} items in category '{category}', but after emitted outcomes there are none left"
                ),
            }
        }

        // Anything remaining has neither been rejected nor is it part of the new item.
        for (category, quantity) in sums {
            if quantity > 0 {
                emit!(
                    category,
                    "Missing outcomes or mismatched quantity in category '{category}', off by {quantity}"
                );
            }
        }
    }

    /// Quantities are not validated outside of debug builds.
    #[cfg(not(debug_assertions))]
    fn assert_quantities(&self, _: Quantities, _: Quantities) {}
794}
795
796impl<'a> Drop for RecordKeeper<'a> {
797    fn drop(&mut self) {
798        for (category, quantity) in std::mem::take(&mut self.on_drop) {
799            self.meta.track_outcome(
800                Outcome::Invalid(DiscardReason::Internal),
801                category,
802                quantity,
803            );
804        }
805    }
806}
807
808impl RecordKeeper<'_> {
809    /// Rejects an item if the passed result is an error and returns a default value.
810    ///
811    /// Similar to [`Self::reject_err`], this emits the necessary outcomes for an
812    /// item, if there is an error.
813    pub fn or_default<T, E, Q>(&mut self, r: Result<T, E>, q: Q) -> T
814    where
815        T: Default,
816        E: OutcomeError,
817        Q: Counted,
818    {
819        match r {
820            Ok(result) => result,
821            Err(err) => {
822                self.reject_err(err, q);
823                T::default()
824            }
825        }
826    }
827
828    /// Rejects an item with an error.
829    ///
830    /// Makes sure the correct outcomes are tracked for the item, that is discarded due to an
831    /// error.
832    pub fn reject_err<E, Q>(&mut self, err: E, q: Q) -> E::Error
833    where
834        E: OutcomeError,
835        Q: Counted,
836    {
837        let (outcome, err) = err.consume();
838        for (category, quantity) in q.quantities() {
839            self.in_flight.push((category, quantity, outcome.clone()))
840        }
841        err
842    }
843
844    /// Rejects an item with an internal error.
845    ///
846    /// See also: [`Managed::internal_error`].
847    #[track_caller]
848    pub fn internal_error<E, Q>(&mut self, error: E, q: Q)
849    where
850        E: std::error::Error + 'static,
851        Q: Counted,
852    {
853        relay_log::error!(error = &error as &dyn std::error::Error, "internal error");
854        debug_assert!(false, "internal error: {error}");
855        self.reject_err((Outcome::Invalid(DiscardReason::Internal), ()), q);
856    }
857}
858
/// Iterator returned by [`Managed::split`].
///
/// Yields every item of the wrapped iterator as an individually managed item.
pub struct Split<I, S>
where
    I: Iterator<Item = S>,
    S: Counted,
{
    /// Quantities of the original item which have not yet been yielded, only tracked in
    /// debug builds.
    #[cfg(debug_assertions)]
    quantities: Quantities,
    /// The wrapped iterator producing the individual items.
    items: I,
    /// Metadata shared with all yielded items.
    meta: Arc<Meta>,
    /// Set once the wrapped iterator has returned `None`.
    exhausted: bool,
}
871
872impl<I, S> Split<I, S>
873where
874    I: Iterator<Item = S>,
875    S: Counted,
876{
877    /// Subtracts passed quantities from the total quantities to verify total quantity counts are
878    /// matching.
879    #[cfg(debug_assertions)]
880    fn subtract(&mut self, q: Quantities) {
881        for (category, quantities) in q {
882            let Some(orig_quantities) = self
883                .quantities
884                .iter_mut()
885                .find_map(|(c, q)| (*c == category).then_some(q))
886            else {
887                debug_assert!(
888                    false,
889                    "mismatching quantities, item split into category {category}, \
890                    which originally was not present"
891                );
892                continue;
893            };
894
895            if *orig_quantities >= quantities {
896                *orig_quantities -= quantities;
897            } else {
898                debug_assert!(
899                    false,
900                    "in total more items produced in category {category} than originally available"
901                );
902            }
903        }
904    }
905}
906
907impl<I, S> Iterator for Split<I, S>
908where
909    I: Iterator<Item = S>,
910    S: Counted,
911{
912    type Item = Managed<S>;
913
914    fn next(&mut self) -> Option<Self::Item> {
915        let next = match self.items.next() {
916            Some(next) => next,
917            None => {
918                self.exhausted = true;
919                return None;
920            }
921        };
922
923        #[cfg(debug_assertions)]
924        self.subtract(next.quantities());
925
926        Some(Managed::from_parts(next, Arc::clone(&self.meta)))
927    }
928}
929
930impl<I, S> Drop for Split<I, S>
931where
932    I: Iterator<Item = S>,
933    S: Counted,
934{
935    fn drop(&mut self) {
936        // If the inner iterator was exhausted, no items should be remaining.
937        #[cfg(debug_assertions)]
938        if self.exhausted {
939            for (category, quantities) in &self.quantities {
940                debug_assert!(
941                    *quantities == 0,
942                    "items split, but still {quantities} remaining in category {category}"
943                );
944            }
945        }
946
947        if self.exhausted {
948            return;
949        }
950
951        // There may be items remaining in the iterator for multiple reasons:
952        // - there was a panic
953        // - the iterator was never fully consumed
954        //
955        // In any case, outcomes must be emitted for the remaining items.
956        for item in &mut self.items {
957            for (category, quantity) in item.quantities() {
958                self.meta.track_outcome(
959                    Outcome::Invalid(DiscardReason::Internal),
960                    category,
961                    quantity,
962                );
963            }
964        }
965    }
966}
967
968impl<I, S> FusedIterator for Split<I, S>
969where
970    I: Iterator<Item = S> + FusedIterator,
971    S: Counted,
972{
973}
974
#[cfg(test)]
mod tests {
    use super::*;

    /// Test item which counts one error per contained element.
    struct CountedVec(Vec<u32>);

    impl Counted for CountedVec {
        fn quantities(&self) -> Quantities {
            smallvec::smallvec![(DataCategory::Error, self.0.len())]
        }
    }

    /// Test item which always counts as exactly one error.
    struct CountedValue(u32);

    impl Counted for CountedValue {
        fn quantities(&self) -> Quantities {
            smallvec::smallvec![(DataCategory::Error, 1)]
        }
    }

    /// A fully consumed split emits no outcomes by itself, each item is tracked individually.
    #[test]
    fn test_split_fully_consumed() {
        let value = CountedVec(vec![0, 1, 2, 3, 4, 5]);
        let (managed, mut handle) = Managed::for_test(value).build();

        let s = managed
            .split(|value| value.0.into_iter().map(CountedValue))
            // Fully consume the iterator to make sure there aren't any outcomes emitted on drop.
            .collect::<Vec<_>>();

        handle.assert_no_outcomes();

        for (i, s) in s.into_iter().enumerate() {
            assert_eq!(s.as_ref().0, i as u32);
            let outcome = Outcome::Invalid(DiscardReason::Cors);
            let _ = s.reject_err((outcome.clone(), ()));
            handle.assert_outcome(&outcome, DataCategory::Error, 1);
        }
    }

    /// Dropping a partially consumed split emits outcomes for exactly the remaining items.
    #[test]
    fn test_split_partially_consumed_emits_remaining() {
        let value = CountedVec(vec![0, 1, 2, 3, 4, 5]);
        let (managed, mut handle) = Managed::for_test(value).build();

        let mut s = managed.split(|value| value.0.into_iter().map(CountedValue));
        handle.assert_no_outcomes();

        // Each yielded item is dropped unhandled, which is reported as an internal outcome.
        drop(s.next());
        handle.assert_internal_outcome(DataCategory::Error, 1);
        drop(s.next());
        handle.assert_internal_outcome(DataCategory::Error, 1);
        drop(s.next());
        handle.assert_internal_outcome(DataCategory::Error, 1);
        handle.assert_no_outcomes();

        drop(s);

        // Three of the six items were never yielded, the drop must account for each of them.
        handle.assert_internal_outcome(DataCategory::Error, 1);
        handle.assert_internal_outcome(DataCategory::Error, 1);
        handle.assert_internal_outcome(DataCategory::Error, 1);
        // And nothing beyond the remaining items must have been emitted.
        handle.assert_no_outcomes();
    }

    /// Producing fewer quantities than originally present is detected when the split is dropped.
    ///
    /// The check is a debug assertion, the test can only pass with debug assertions enabled.
    #[cfg(debug_assertions)]
    #[test]
    fn test_split_changing_quantities_should_panic() {
        let value = CountedVec(vec![0, 1, 2, 3, 4, 5]);
        let (managed, mut handle) = Managed::for_test(value).build();

        let mut s = managed.split(|_| std::iter::once(CountedValue(0)));

        s.next().unwrap().accept(|_| {});
        handle.assert_no_outcomes();

        assert!(s.next().is_none());

        let r = std::panic::catch_unwind(move || {
            drop(s);
        });

        assert!(
            r.is_err(),
            "expected split to panic because of mismatched (not enough) outcomes"
        );
    }

    /// Producing more quantities than originally present is detected while iterating.
    ///
    /// The check is a debug assertion, the test can only pass with debug assertions enabled.
    #[cfg(debug_assertions)]
    #[test]
    fn test_split_more_outcomes_than_before_should_panic() {
        let value = CountedVec(vec![0]);
        let (managed, mut handle) = Managed::for_test(value).build();

        let mut s = managed.split(|_| vec![CountedValue(0), CountedValue(2)].into_iter());

        s.next().unwrap().accept(|_| {});
        handle.assert_no_outcomes();

        let r = std::panic::catch_unwind(move || {
            s.next();
        });

        assert!(
            r.is_err(),
            "expected split to panic because of mismatched (too many) outcomes"
        );
    }

    /// Producing a category which was not originally present is detected while iterating.
    ///
    /// The check is a debug assertion, the test can only pass with debug assertions enabled.
    #[cfg(debug_assertions)]
    #[test]
    fn test_split_changing_categories_should_panic() {
        struct Special;
        impl Counted for Special {
            fn quantities(&self) -> Quantities {
                smallvec::smallvec![(DataCategory::Error, 1), (DataCategory::Transaction, 1)]
            }
        }

        let value = CountedVec(vec![0]);
        let (managed, _handle) = Managed::for_test(value).build();

        let mut s = managed.split(|value| value.0.into_iter().map(|_| Special));

        let r = std::panic::catch_unwind(move || {
            let _ = s.next();
        });

        assert!(
            r.is_err(),
            "expected split to panic because of mismatched outcome categories"
        );
    }

    /// A split over a fused iterator must itself be usable as a fused iterator.
    #[test]
    fn test_split_assert_fused() {
        fn only_fused<T: FusedIterator>(_: T) {}

        let (managed, mut handle) = Managed::for_test(CountedVec(vec![0])).build();
        only_fused(managed.split(|value| value.0.into_iter().map(CountedValue)));
        // The split is dropped unconsumed, its single item is reported as internal outcome.
        handle.assert_internal_outcome(DataCategory::Error, 1);
    }
}