relay_server/managed/
managed.rs

1#[cfg(debug_assertions)]
2use std::collections::BTreeMap;
3use std::convert::Infallible;
4use std::fmt;
5use std::iter::FusedIterator;
6use std::mem::ManuallyDrop;
7use std::net::IpAddr;
8use std::sync::Arc;
9use std::sync::atomic::{AtomicBool, Ordering};
10
11use chrono::{DateTime, Utc};
12use relay_event_schema::protocol::EventId;
13use relay_quotas::{DataCategory, Scoping};
14use relay_system::Addr;
15use smallvec::SmallVec;
16
17use crate::Envelope;
18use crate::managed::{Counted, ManagedEnvelope, Quantities};
19use crate::services::outcome::{DiscardReason, Outcome, TrackOutcome};
20use crate::services::processor::ProcessingError;
21
22#[cfg(debug_assertions)]
23mod debug;
24#[cfg(test)]
25mod test;
26
27#[cfg(test)]
28pub use self::test::*;
29
/// An error which can be extracted into an outcome.
///
/// Implementors split themselves into an optional [`Outcome`] (what should be recorded for the
/// affected data) and a residual error value that is propagated to the caller.
pub trait OutcomeError {
    /// Produced error, without attached outcome.
    type Error;

    /// Consumes the error and returns an outcome and [`Self::Error`].
    ///
    /// Returning a `None` outcome should discard the item(s) silently.
    fn consume(self) -> (Option<Outcome>, Self::Error);
}
40
41impl OutcomeError for Outcome {
42    type Error = ();
43
44    fn consume(self) -> (Option<Outcome>, Self::Error) {
45        (self, ()).consume()
46    }
47}
48
49impl<E> OutcomeError for (Outcome, E) {
50    type Error = E;
51
52    fn consume(self) -> (Option<Outcome>, Self::Error) {
53        (Some(self.0), self.1)
54    }
55}
56
57impl<E> OutcomeError for (Option<Outcome>, E) {
58    type Error = E;
59
60    fn consume(self) -> (Option<Outcome>, Self::Error) {
61        self
62    }
63}
64
65impl OutcomeError for ProcessingError {
66    type Error = Self;
67
68    fn consume(self) -> (Option<Outcome>, Self::Error) {
69        (self.to_outcome(), self)
70    }
71}
72
/// Lets infallible closures (`Ok::<_, Infallible>(…)`) be used wherever an [`OutcomeError`]
/// is expected.
impl OutcomeError for Infallible {
    type Error = Self;

    fn consume(self) -> (Option<Outcome>, Self::Error) {
        // `Infallible` has no values, so this can never be called; the empty match proves it.
        match self {}
    }
}
80
/// A wrapper type which ensures outcomes have been emitted for an error.
///
/// [`Managed`] wraps an error in [`Rejected`] once the outcomes for the managed item have been
/// emitted, or once the item has been discarded silently because the error carried no outcome.
/// Holding a `Rejected` therefore means the bookkeeping for the rejected item is complete.
#[derive(Debug, Clone, Copy)]
#[must_use = "a rejection must be propagated"]
pub struct Rejected<T>(T);

impl<T> Rejected<T> {
    /// Extracts the underlying error.
    pub fn into_inner(self) -> T {
        self.0
    }

    /// Maps the rejected error to a different error.
    ///
    /// The result is still a [`Rejected`], so the guarantee that outcomes were already handled
    /// carries over to the new error type.
    pub fn map<F, S>(self, f: F) -> Rejected<S>
    where
        F: FnOnce(T) -> S,
    {
        Rejected(f(self.0))
    }
}
103
/// The [`Managed`] wrapper ensures outcomes are correctly emitted for the contained item.
///
/// If an instance is dropped without having been accepted, transformed or rejected, outcomes
/// with [`DiscardReason::Internal`] are emitted for all of its quantities.
pub struct Managed<T: Counted> {
    /// The wrapped item whose quantities are tracked.
    value: T,
    /// Metadata shared with instances derived from this one, used when emitting outcomes.
    meta: Arc<Meta>,
    /// Set once the instance has been rejected; guards against emitting outcomes twice.
    done: AtomicBool,
}
110
111impl<T: Counted> Managed<T> {
112    /// Creates a new managed instance with a `value` from a [`ManagedEnvelope`].
113    ///
114    /// The [`Managed`] instance, inherits all metadata from the passed [`ManagedEnvelope`],
115    /// like received time or scoping.
116    pub fn from_envelope(envelope: &ManagedEnvelope, value: T) -> Self {
117        Self::from_parts(
118            value,
119            Arc::new(Meta {
120                outcome_aggregator: envelope.outcome_aggregator().clone(),
121                received_at: envelope.received_at(),
122                scoping: envelope.scoping(),
123                event_id: envelope.envelope().event_id(),
124                remote_addr: envelope.meta().remote_addr(),
125            }),
126        )
127    }
128
129    /// Creates another [`Managed`] instance, with a new value but shared metadata.
130    pub fn wrap<S>(&self, other: S) -> Managed<S>
131    where
132        S: Counted,
133    {
134        Managed::from_parts(other, Arc::clone(&self.meta))
135    }
136
137    /// Original received timestamp.
138    pub fn received_at(&self) -> DateTime<Utc> {
139        self.meta.received_at
140    }
141
142    /// Scoping information stored in this context.
143    pub fn scoping(&self) -> Scoping {
144        self.meta.scoping
145    }
146
147    /// Splits [`Self`] into two other [`Managed`] items.
148    ///
149    /// The two resulting managed instances together are expected to have the same outcomes as the original instance..
150    /// Since splitting may introduce a new type of item, which some of the original
151    /// quantities are transferred to, there may be new additional data categories created.
152    pub fn split_once<F, S, U>(self, f: F) -> (Managed<S>, Managed<U>)
153    where
154        F: FnOnce(T) -> (S, U),
155        S: Counted,
156        U: Counted,
157    {
158        debug_assert!(!self.is_done());
159
160        let (value, meta) = self.destructure();
161        #[cfg(debug_assertions)]
162        let quantities = value.quantities();
163
164        let (a, b) = f(value);
165
166        #[cfg(debug_assertions)]
167        debug::Quantities::from(&quantities)
168            // Instead of `assert_only_extra`, used for extracted metrics also counting
169            // in the `metric bucket` category, it may make sense to give the mapping function
170            // control over which categories to ignore, similar to the record keeper's lenient
171            // method.
172            .assert_only_extra(debug::Quantities::from(&a) + debug::Quantities::from(&b));
173
174        (
175            Managed::from_parts(a, Arc::clone(&meta)),
176            Managed::from_parts(b, meta),
177        )
178    }
179
180    /// Splits [`Self`] into a variable amount if individual items.
181    ///
182    /// Useful when the current instance contains multiple items of the same type
183    /// and must be split into individually managed items.
184    pub fn split<F, I, S>(self, f: F) -> Split<I::IntoIter, I::Item>
185    where
186        F: FnOnce(T) -> I,
187        I: IntoIterator<Item = S>,
188        S: Counted,
189    {
190        self.split_with_context(|value| (f(value), ())).0
191    }
192
193    /// Splits [`Self`] into a variable amount if individual items.
194    ///
195    /// Like [`Self::split`] but also allows returning an untracked context,
196    /// a way of returning additional data when deconstructing the original item.
197    pub fn split_with_context<F, I, S, C>(self, f: F) -> (Split<I::IntoIter, I::Item>, C)
198    where
199        F: FnOnce(T) -> (I, C),
200        I: IntoIterator<Item = S>,
201        S: Counted,
202    {
203        debug_assert!(!self.is_done());
204
205        let (value, meta) = self.destructure();
206        #[cfg(debug_assertions)]
207        let quantities = value.quantities();
208
209        let (items, context) = f(value);
210
211        (
212            Split {
213                #[cfg(debug_assertions)]
214                quantities,
215                items: items.into_iter(),
216                meta,
217                exhausted: false,
218            },
219            context,
220        )
221    }
222
223    /// Filters individual items and emits outcomes for them if they are removed.
224    ///
225    /// This is particularly useful when the managed instance is a container of individual items,
226    /// which need to be processed or filtered on a case by case basis.
227    ///
228    /// # Examples:
229    ///
230    /// ```
231    /// # use relay_server::managed::{Counted, Managed, Quantities};
232    /// # #[derive(Copy, Clone)]
233    /// # struct Context<'a>(&'a u32);
234    /// # struct Item;
235    /// struct Items {
236    ///     items: Vec<Item>,
237    /// }
238    /// # impl Counted for Items {
239    /// #   fn quantities(&self) -> Quantities {
240    /// #       todo!()
241    /// #   }
242    /// # }
243    /// # impl Counted for Item {
244    /// #   fn quantities(&self) -> Quantities {
245    /// #       todo!()
246    /// #   }
247    /// # }
248    /// # type Error = std::convert::Infallible;
249    ///
250    /// fn process_items(items: &mut Managed<Items>, ctx: Context<'_>) {
251    ///     items.retain(|items| &mut items.items, |item, _| process(item, ctx));
252    /// }
253    ///
254    /// fn process(item: &mut Item, ctx: Context<'_>) -> Result<(), Error> {
255    ///     todo!()
256    /// }
257    /// ```
258    pub fn retain<S, I, U, E>(&mut self, select: S, mut retain: U)
259    where
260        S: FnOnce(&mut T) -> &mut Vec<I>,
261        I: Counted,
262        U: FnMut(&mut I, &mut RecordKeeper<'_>) -> Result<(), E>,
263        E: OutcomeError,
264    {
265        self.retain_with_context(
266            |inner| (select(inner), &()),
267            |item, _, records| retain(item, records),
268        );
269    }
270
271    /// Filters individual items and emits outcomes for them if they are removed.
272    ///
273    /// Like [`Self::retain`], but it allows for an additional context extracted from the managed
274    /// object passed to the retain function.
275    ///
276    /// # Examples:
277    ///
278    /// ```
279    /// # use relay_server::managed::{Counted, Managed, Quantities};
280    /// # #[derive(Copy, Clone)]
281    /// # struct Context<'a>(&'a u32);
282    /// # struct Item;
283    /// struct Items {
284    ///     ty: String,
285    ///     items: Vec<Item>,
286    /// }
287    /// # impl Counted for Items {
288    /// #   fn quantities(&self) -> Quantities {
289    /// #       todo!()
290    /// #   }
291    /// # }
292    /// # impl Counted for Item {
293    /// #   fn quantities(&self) -> Quantities {
294    /// #       todo!()
295    /// #   }
296    /// # }
297    /// # type Error = std::convert::Infallible;
298    ///
299    /// fn process_items(items: &mut Managed<Items>, ctx: Context<'_>) {
300    ///     items.retain_with_context(|items| (&mut items.items, &items.ty), |item, ty, _| process(item, ty, ctx));
301    /// }
302    ///
303    /// fn process(item: &mut Item, ty: &str, ctx: Context<'_>) -> Result<(), Error> {
304    ///     todo!()
305    /// }
306    /// ```
307    pub fn retain_with_context<S, C, I, U, E>(&mut self, select: S, mut retain: U)
308    where
309        // Returning `&'a C` here is not optimal, ideally we return C here and express the correct
310        // bound of `C: 'a` but this is, to my knowledge, currently not possible to express in stable Rust.
311        //
312        // This is unfortunately a bit limiting but for most of our purposes it is enough.
313        for<'a> S: FnOnce(&'a mut T) -> (&'a mut Vec<I>, &'a C),
314        I: Counted,
315        U: FnMut(&mut I, &C, &mut RecordKeeper<'_>) -> Result<(), E>,
316        E: OutcomeError,
317    {
318        self.modify(|inner, records| {
319            let (items, ctx) = select(inner);
320            items.retain_mut(|item| match retain(item, ctx, records) {
321                Ok(()) => true,
322                Err(err) => {
323                    records.reject_err(err, &*item);
324                    false
325                }
326            })
327        });
328    }
329
330    /// Maps a [`Managed<T>`] to [`Managed<S>`] by applying the mapping function `f`.
331    ///
332    /// Like [`Self::try_map`] but not fallible.
333    pub fn map<S, F>(self, f: F) -> Managed<S>
334    where
335        F: FnOnce(T, &mut RecordKeeper) -> S,
336        S: Counted,
337    {
338        self.try_map(move |inner, records| Ok::<_, Infallible>(f(inner, records)))
339            .unwrap_or_else(|e| match e.0 {})
340    }
341
342    /// Maps a [`Managed<T>`] to [`Managed<S>`] by applying the mapping function `f`.
343    ///
344    /// The mapping function gets access to a [`RecordKeeper`], to emit outcomes for partial
345    /// discards.
346    ///
347    /// If the mapping function returns an error, the entire (original) [`Self`] is rejected,
348    /// no partial outcomes are emitted.
349    pub fn try_map<S, F, E>(self, f: F) -> Result<Managed<S>, Rejected<E::Error>>
350    where
351        F: FnOnce(T, &mut RecordKeeper) -> Result<S, E>,
352        S: Counted,
353        E: OutcomeError,
354    {
355        debug_assert!(!self.is_done());
356
357        let (value, meta) = self.destructure();
358        let quantities = value.quantities();
359
360        let mut records = RecordKeeper::new(&meta, quantities);
361
362        match f(value, &mut records) {
363            Ok(value) => {
364                records.success(value.quantities());
365                Ok(Managed::from_parts(value, meta))
366            }
367            Err(err) => Err(records.failure(err)),
368        }
369    }
370
371    /// Gives mutable access to the contained value to modify it.
372    ///
373    /// Like [`Self::try_modify`] but not fallible.
374    pub fn modify<F>(&mut self, f: F)
375    where
376        F: FnOnce(&mut T, &mut RecordKeeper),
377    {
378        self.try_modify(move |inner, records| {
379            f(inner, records);
380            Ok::<_, Infallible>(())
381        })
382        .unwrap_or_else(|e| match e {})
383    }
384
385    /// Gives mutable access to the contained value to modify it.
386    ///
387    /// The modifying function gets access to a [`RecordKeeper`], to emit outcomes for partial
388    /// discards.
389    ///
390    /// If the modifying function returns an error, the entire (original) [`Self`] is rejected,
391    /// no partial outcomes are emitted.
392    pub fn try_modify<F, E>(&mut self, f: F) -> Result<(), Rejected<E::Error>>
393    where
394        F: FnOnce(&mut T, &mut RecordKeeper) -> Result<(), E>,
395        E: OutcomeError,
396    {
397        debug_assert!(!self.is_done());
398
399        let quantities = self.value.quantities();
400        let mut records = RecordKeeper::new(&self.meta, quantities);
401
402        match f(&mut self.value, &mut records) {
403            Ok(()) => {
404                records.success(self.value.quantities());
405                Ok(())
406            }
407            Err(err) => {
408                let err = records.failure(err);
409                self.done.store(true, Ordering::Relaxed);
410                Err(err)
411            }
412        }
413    }
414
415    /// Accepts the item of this managed instance.
416    ///
417    /// This should be called if the item has been or is about to be accepted by the upstream, which means that
418    /// the responsibility for logging outcomes has been moved. This function will not log any
419    /// outcomes.
420    ///
421    /// Like [`Self::try_accept`], but infallible.
422    pub fn accept<F, S>(self, f: F) -> S
423    where
424        F: FnOnce(T) -> S,
425    {
426        self.try_accept(|item| Ok::<_, Infallible>(f(item)))
427            .unwrap_or_else(|err| match err.0 {})
428    }
429
430    /// Accepts the item of this managed instance.
431    ///
432    /// This should be called if the item has been or is about to be accepted by the upstream.
433    ///
434    /// Outcomes are only emitted when the accepting closure returns an error, which means that
435    /// in the success case the responsibility for logging outcomes has been moved to the
436    /// caller/upstream.
437    pub fn try_accept<F, S, E>(self, f: F) -> Result<S, Rejected<E::Error>>
438    where
439        F: FnOnce(T) -> Result<S, E>,
440        E: OutcomeError,
441    {
442        debug_assert!(!self.is_done());
443
444        let (value, meta) = self.destructure();
445        let records = RecordKeeper::new(&meta, value.quantities());
446
447        match f(value) {
448            Ok(value) => {
449                records.accept();
450                Ok(value)
451            }
452            Err(err) => Err(records.failure(err)),
453        }
454    }
455
456    /// Rejects the entire [`Managed`] instance with an internal error.
457    ///
458    /// Internal errors should be reserved for uses where logical invariants are violated.
459    /// Cases which should never happen and always indicate a logical bug.
460    ///
461    /// This function will panic in debug builds, but discard the item
462    /// with an internal discard reason in release builds.
463    #[track_caller]
464    pub fn internal_error(&self, reason: &'static str) -> Rejected<()> {
465        relay_log::error!("internal error: {reason}");
466        debug_assert!(false, "internal error: {reason}");
467        self.reject_err((Outcome::Invalid(DiscardReason::Internal), ()))
468    }
469
470    /// Rejects the entire [`Managed`] instance.
471    pub fn reject_err<E>(&self, error: E) -> Rejected<E::Error>
472    where
473        E: OutcomeError,
474    {
475        debug_assert!(!self.is_done());
476
477        let (outcome, error) = error.consume();
478        if let Some(outcome) = outcome {
479            self.do_reject(outcome);
480        }
481        Rejected(error)
482    }
483
484    fn do_reject(&self, outcome: Outcome) {
485        if !self.done.fetch_or(true, Ordering::Relaxed) {
486            for (category, quantity) in self.value.quantities() {
487                self.meta.track_outcome(outcome.clone(), category, quantity);
488            }
489        }
490    }
491
492    /// De-structures this managed instance into its own parts.
493    ///
494    /// While de-structured no outcomes will be emitted on drop.
495    fn destructure(self) -> (T, Arc<Meta>) {
496        // SAFETY: this follows an approach mentioned in the RFC
497        // <https://github.com/rust-lang/rfcs/pull/3466> to move fields out of
498        // a type with a drop implementation.
499        //
500        // The original type is wrapped in a manual drop to prevent running the
501        // drop handler, afterwards all fields are moved out of the type.
502        //
503        // And the original type is forgotten, de-structuring the original type
504        // without running its drop implementation.
505        let this = ManuallyDrop::new(self);
506        let value = unsafe { std::ptr::read(&this.value) };
507        let meta = unsafe { std::ptr::read(&this.meta) };
508        (value, meta)
509    }
510
511    fn from_parts(value: T, meta: Arc<Meta>) -> Self {
512        Self {
513            value,
514            meta,
515            done: AtomicBool::new(false),
516        }
517    }
518
519    fn is_done(&self) -> bool {
520        self.done.load(Ordering::Relaxed)
521    }
522}
523
524impl<T: Counted> Drop for Managed<T> {
525    fn drop(&mut self) {
526        self.do_reject(Outcome::Invalid(DiscardReason::Internal));
527    }
528}
529
530impl<T: Counted + fmt::Debug> fmt::Debug for Managed<T> {
531    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
532        write!(f, "Managed<{}>[", std::any::type_name::<T>())?;
533        for (i, (category, quantity)) in self.value.quantities().iter().enumerate() {
534            if i > 0 {
535                write!(f, ",")?;
536            }
537            write!(f, "{category}:{quantity}")?;
538        }
539        write!(f, "](")?;
540        self.value.fmt(f)?;
541        write!(f, ")")
542    }
543}
544
545impl From<Managed<Box<Envelope>>> for ManagedEnvelope {
546    fn from(value: Managed<Box<Envelope>>) -> Self {
547        let (value, meta) = value.destructure();
548        let mut envelope = ManagedEnvelope::new(value, meta.outcome_aggregator.clone());
549        envelope.scope(meta.scoping);
550        envelope
551    }
552}
553
554impl<T: Counted> AsRef<T> for Managed<T> {
555    fn as_ref(&self) -> &T {
556        &self.value
557    }
558}
559
560impl<T: Counted> std::ops::Deref for Managed<T> {
561    type Target = T;
562
563    fn deref(&self) -> &Self::Target {
564        &self.value
565    }
566}
567
/// Internal metadata attached with a [`Managed`] instance.
///
/// Held behind an [`Arc`], so instances derived through wrapping or splitting share one copy.
struct Meta {
    /// Outcome aggregator service.
    outcome_aggregator: Addr<TrackOutcome>,
    /// Received timestamp, when the contained payload/information was received.
    ///
    /// See also: [`crate::extractors::RequestMeta::received_at`].
    received_at: DateTime<Utc>,
    /// Data scoping information of the contained item.
    scoping: Scoping,
    /// Optional event id associated with the contained data.
    event_id: Option<EventId>,
    /// Optional remote addr from where the data was received from.
    remote_addr: Option<IpAddr>,
}
583
584impl Meta {
585    pub fn track_outcome(&self, outcome: Outcome, category: DataCategory, quantity: usize) {
586        self.outcome_aggregator.send(TrackOutcome {
587            timestamp: self.received_at,
588            scoping: self.scoping,
589            outcome,
590            event_id: self.event_id,
591            remote_addr: self.remote_addr,
592            category,
593            quantity: quantity.try_into().unwrap_or(u32::MAX),
594        });
595    }
596}
597
/// A record keeper makes sure modifications done on a [`Managed`] item are all accounted for
/// correctly.
pub struct RecordKeeper<'a> {
    /// Metadata used to emit outcomes.
    meta: &'a Meta,
    /// Quantities of the original item. Whatever is still stored here when the keeper is
    /// dropped is emitted as internal-error outcomes.
    on_drop: Quantities,
    /// Categories exempt from quantity validation (debug builds only).
    #[cfg(debug_assertions)]
    lenient: SmallVec<[DataCategory; 1]>,
    /// Expected per-category count changes registered via `modify_by` (debug builds only).
    #[cfg(debug_assertions)]
    modifications: BTreeMap<DataCategory, isize>,
    /// Partial rejections recorded via `reject_err`. Entries carrying an outcome are emitted
    /// when the keeper finalizes successfully.
    in_flight: SmallVec<[(DataCategory, usize, Option<Outcome>); 2]>,
}
609
610impl<'a> RecordKeeper<'a> {
611    fn new(meta: &'a Meta, quantities: Quantities) -> Self {
612        Self {
613            meta,
614            on_drop: quantities,
615            #[cfg(debug_assertions)]
616            lenient: Default::default(),
617            #[cfg(debug_assertions)]
618            modifications: Default::default(),
619            in_flight: Default::default(),
620        }
621    }
622
623    /// Marking a data category as lenient exempts this category from outcome quantity validations.
624    ///
625    /// Consider using [`Self::modify_by`] instead.
626    ///
627    /// This can be used in cases where the quantity is knowingly modified, which is quite common
628    /// for data categories which count bytes.
629    pub fn lenient(&mut self, category: DataCategory) {
630        let _category = category;
631        #[cfg(debug_assertions)]
632        self.lenient.push(_category);
633    }
634
635    /// Modifies the expected count for a category.
636    ///
637    /// When extracting payloads category counts may expectedly change, these changes can be
638    /// tracked using this function.
639    ///
640    /// Prefer using [`Self::modify_by`] over [`Self::lenient`] as lenient completely disables
641    /// validation for the entire category.
642    pub fn modify_by(&mut self, category: DataCategory, offset: isize) {
643        let _category = category;
644        let _offset = offset;
645        #[cfg(debug_assertions)]
646        {
647            *self.modifications.entry(_category).or_default() += offset;
648        }
649    }
650
651    /// Finalizes all records and emits the necessary outcomes.
652    ///
653    /// This uses the quantities of the original item.
654    fn failure<E>(mut self, error: E) -> Rejected<E::Error>
655    where
656        E: OutcomeError,
657    {
658        let (outcome, error) = error.consume();
659
660        if let Some(outcome) = outcome {
661            for (category, quantity) in std::mem::take(&mut self.on_drop) {
662                self.meta.track_outcome(outcome.clone(), category, quantity);
663            }
664        }
665
666        Rejected(error)
667    }
668
669    /// Finalizes all records and asserts that no additional outcomes have been tracked.
670    ///
671    /// Unlike [`Self::success`], this method does not allow for intermediate or partial outcomes,
672    /// it also does not verify any outcomes.
673    ///
674    /// This method is useful for using the record keeper to track failure outcomes, either
675    /// explicit failures or panics.
676    fn accept(mut self) {
677        debug_assert!(
678            self.in_flight.is_empty(),
679            "records accepted, but intermediate outcomes tracked"
680        );
681        self.on_drop.clear();
682    }
683
684    /// Finalizes all records and emits the created outcomes.
685    ///
686    /// This only emits the outcomes that have been explicitly registered.
687    /// In a debug build, the function also ensure no outcomes have been missed by comparing
688    /// quantities of the item before and after.
689    fn success(mut self, new: Quantities) {
690        let original = std::mem::take(&mut self.on_drop);
691        self.assert_quantities(original, new);
692
693        self.on_drop.clear();
694        for (category, quantity, outcome) in std::mem::take(&mut self.in_flight) {
695            if let Some(outcome) = outcome {
696                self.meta.track_outcome(outcome, category, quantity);
697            }
698        }
699    }
700
701    /// Asserts that there have been no quantities lost.
702    ///
703    /// The original amount of quantities should match the new amount of quantities + all emitted
704    /// outcomes.
705    #[cfg(debug_assertions)]
706    fn assert_quantities(&self, original: Quantities, new: Quantities) {
707        macro_rules! emit {
708            ($category:expr, $($tt:tt)*) => {{
709                match self.lenient.contains(&$category) {
710                    // Certain categories are known to be not always correct,
711                    // they are logged instead.
712                    true => relay_log::debug!($($tt)*),
713                    false  => {
714                        relay_log::error!("Original: {original:?}");
715                        relay_log::error!("New: {new:?}");
716                        relay_log::error!("Modifications: {:?}", self.modifications);
717                        relay_log::error!("In Flight: {:?}", self.in_flight);
718                        panic!($($tt)*)
719                    }
720                }
721            }};
722        }
723
724        let mut sums = debug::Quantities::from(&original).0;
725        for (category, offset) in &self.modifications {
726            let v = sums.entry(*category).or_default();
727            match v.checked_add_signed(*offset) {
728                Some(result) => *v = result,
729                None => emit!(
730                    category,
731                    "Attempted to modify original quantity {v} into the negative ({offset})"
732                ),
733            }
734        }
735
736        for (category, quantity, outcome) in &self.in_flight {
737            match sums.get_mut(category) {
738                Some(c) if *c >= *quantity => *c -= *quantity,
739                Some(c) => emit!(
740                    category,
741                    "Emitted {quantity} outcomes ({outcome:?}) for {category}, but there were only {c} items in the category originally"
742                ),
743                None => emit!(
744                    category,
745                    "Emitted {quantity} outcomes ({outcome:?}) for {category}, but there never was an item in this category"
746                ),
747            }
748        }
749
750        for (category, quantity) in &new {
751            match sums.get_mut(category) {
752                Some(c) if *c >= *quantity => *c -= *quantity,
753                Some(c) => emit!(
754                    category,
755                    "New item has {quantity} items in category '{category}', but original (after emitted outcomes) only has {c} left"
756                ),
757                None => emit!(
758                    category,
759                    "New item has {quantity} items in category '{category}', but after emitted outcomes there are none left"
760                ),
761            }
762        }
763
764        for (category, quantity) in sums {
765            if quantity > 0 {
766                emit!(
767                    category,
768                    "Missing outcomes or mismatched quantity in category '{category}', off by {quantity}"
769                );
770            }
771        }
772    }
773
774    #[cfg(not(debug_assertions))]
775    fn assert_quantities(&self, _: Quantities, _: Quantities) {}
776}
777
778impl<'a> Drop for RecordKeeper<'a> {
779    fn drop(&mut self) {
780        for (category, quantity) in std::mem::take(&mut self.on_drop) {
781            self.meta.track_outcome(
782                Outcome::Invalid(DiscardReason::Internal),
783                category,
784                quantity,
785            );
786        }
787    }
788}
789
790impl RecordKeeper<'_> {
791    /// Rejects an item if the passed result is an error and returns a default value.
792    ///
793    /// Similar to [`Self::reject_err`], this emits the necessary outcomes for an
794    /// item, if there is an error.
795    pub fn or_default<T, E, Q>(&mut self, r: Result<T, E>, q: Q) -> T
796    where
797        T: Default,
798        E: OutcomeError,
799        Q: Counted,
800    {
801        match r {
802            Ok(result) => result,
803            Err(err) => {
804                self.reject_err(err, q);
805                T::default()
806            }
807        }
808    }
809
810    /// Rejects an item with an error.
811    ///
812    /// Makes sure the correct outcomes are tracked for the item, that is discarded due to an
813    /// error.
814    pub fn reject_err<E, Q>(&mut self, err: E, q: Q) -> E::Error
815    where
816        E: OutcomeError,
817        Q: Counted,
818    {
819        let (outcome, err) = err.consume();
820        for (category, quantity) in q.quantities() {
821            self.in_flight.push((category, quantity, outcome.clone()))
822        }
823        err
824    }
825
826    /// Rejects an item with an internal error.
827    ///
828    /// See also: [`Managed::internal_error`].
829    #[track_caller]
830    pub fn internal_error<E, Q>(&mut self, error: E, q: Q)
831    where
832        E: std::error::Error + 'static,
833        Q: Counted,
834    {
835        relay_log::error!(error = &error as &dyn std::error::Error, "internal error");
836        debug_assert!(false, "internal error: {error}");
837        self.reject_err((Outcome::Invalid(DiscardReason::Internal), ()), q);
838    }
839}
840
/// Iterator returned by [`Managed::split`].
///
/// Yields every produced item wrapped in its own [`Managed`] instance that shares the original
/// metadata. Items that are never yielded emit internal-error outcomes when the iterator is
/// dropped.
pub struct Split<I, S>
where
    I: Iterator<Item = S>,
    S: Counted,
{
    /// Original quantities not yet accounted for by yielded items (debug builds only).
    #[cfg(debug_assertions)]
    quantities: Quantities,
    /// The underlying iterator over the split items.
    items: I,
    /// Metadata shared by every yielded [`Managed`] instance.
    meta: Arc<Meta>,
    /// Set once `items` has returned `None`.
    exhausted: bool,
}
853
854impl<I, S> Split<I, S>
855where
856    I: Iterator<Item = S>,
857    S: Counted,
858{
859    /// Subtracts passed quantities from the total quantities to verify total quantity counts are
860    /// matching.
861    #[cfg(debug_assertions)]
862    fn subtract(&mut self, q: Quantities) {
863        for (category, quantities) in q {
864            let Some(orig_quantities) = self
865                .quantities
866                .iter_mut()
867                .find_map(|(c, q)| (*c == category).then_some(q))
868            else {
869                debug_assert!(
870                    false,
871                    "mismatching quantities, item split into category {category}, \
872                    which originally was not present"
873                );
874                continue;
875            };
876
877            if *orig_quantities >= quantities {
878                *orig_quantities -= quantities;
879            } else {
880                debug_assert!(
881                    false,
882                    "in total more items produced in category {category} than originally available"
883                );
884            }
885        }
886    }
887}
888
889impl<I, S> Iterator for Split<I, S>
890where
891    I: Iterator<Item = S>,
892    S: Counted,
893{
894    type Item = Managed<S>;
895
896    fn next(&mut self) -> Option<Self::Item> {
897        let next = match self.items.next() {
898            Some(next) => next,
899            None => {
900                self.exhausted = true;
901                return None;
902            }
903        };
904
905        #[cfg(debug_assertions)]
906        self.subtract(next.quantities());
907
908        Some(Managed::from_parts(next, Arc::clone(&self.meta)))
909    }
910}
911
912impl<I, S> Drop for Split<I, S>
913where
914    I: Iterator<Item = S>,
915    S: Counted,
916{
917    fn drop(&mut self) {
918        // If the inner iterator was exhausted, no items should be remaining.
919        #[cfg(debug_assertions)]
920        if self.exhausted {
921            for (category, quantities) in &self.quantities {
922                debug_assert!(
923                    *quantities == 0,
924                    "items split, but still {quantities} remaining in category {category}"
925                );
926            }
927        }
928
929        if self.exhausted {
930            return;
931        }
932
933        // There may be items remaining in the iterator for multiple reasons:
934        // - there was a panic
935        // - the iterator was never fully consumed
936        //
937        // In any case, outcomes must be emitted for the remaining items.
938        for item in &mut self.items {
939            for (category, quantity) in item.quantities() {
940                self.meta.track_outcome(
941                    Outcome::Invalid(DiscardReason::Internal),
942                    category,
943                    quantity,
944                );
945            }
946        }
947    }
948}
949
950impl<I, S> FusedIterator for Split<I, S>
951where
952    I: Iterator<Item = S> + FusedIterator,
953    S: Counted,
954{
955}
956
#[cfg(test)]
mod tests {
    use super::*;

    /// Test value counting one error per contained element.
    struct CountedVec(Vec<u32>);

    impl Counted for CountedVec {
        fn quantities(&self) -> Quantities {
            smallvec::smallvec![(DataCategory::Error, self.0.len())]
        }
    }

    /// Test value counting as a single error.
    struct CountedValue(u32);

    impl Counted for CountedValue {
        fn quantities(&self) -> Quantities {
            smallvec::smallvec![(DataCategory::Error, 1)]
        }
    }

    #[test]
    fn test_split_fully_consumed() {
        let value = CountedVec(vec![0, 1, 2, 3, 4, 5]);
        let (managed, mut handle) = Managed::for_test(value).build();

        let s = managed
            .split(|value| value.0.into_iter().map(CountedValue))
            // Fully consume the iterator to make sure there aren't any outcomes emitted on drop.
            .collect::<Vec<_>>();

        handle.assert_no_outcomes();

        for (i, s) in s.into_iter().enumerate() {
            assert_eq!(s.as_ref().0, i as u32);
            let outcome = Outcome::Invalid(DiscardReason::Cors);
            let _ = s.reject_err((outcome.clone(), ()));
            handle.assert_outcome(&outcome, DataCategory::Error, 1);
        }
    }

    #[test]
    fn test_split_partially_consumed_emits_remaining() {
        let value = CountedVec(vec![0, 1, 2, 3, 4, 5]);
        let (managed, mut handle) = Managed::for_test(value).build();

        let mut s = managed.split(|value| value.0.into_iter().map(CountedValue));
        handle.assert_no_outcomes();

        drop(s.next());
        handle.assert_internal_outcome(DataCategory::Error, 1);
        drop(s.next());
        handle.assert_internal_outcome(DataCategory::Error, 1);
        drop(s.next());
        handle.assert_internal_outcome(DataCategory::Error, 1);
        handle.assert_no_outcomes();

        drop(s);

        handle.assert_internal_outcome(DataCategory::Error, 1);
        handle.assert_internal_outcome(DataCategory::Error, 1);
        handle.assert_internal_outcome(DataCategory::Error, 1);
    }

    // The quantity verification in `Split` is implemented with `debug_assert!`, so the expected
    // panic only happens in builds with debug assertions enabled.
    #[cfg(debug_assertions)]
    #[test]
    fn test_split_changing_quantities_should_panic() {
        let value = CountedVec(vec![0, 1, 2, 3, 4, 5]);
        let (managed, mut handle) = Managed::for_test(value).build();

        let mut s = managed.split(|_| std::iter::once(CountedValue(0)));

        s.next().unwrap().accept(|_| {});
        handle.assert_no_outcomes();

        assert!(s.next().is_none());

        let r = std::panic::catch_unwind(move || {
            drop(s);
        });

        assert!(
            r.is_err(),
            "expected split to panic because of mismatched (not enough) outcomes"
        );
    }

    // Relies on `debug_assert!` in `Split`, see above.
    #[cfg(debug_assertions)]
    #[test]
    fn test_split_more_outcomes_than_before_should_panic() {
        let value = CountedVec(vec![0]);
        let (managed, mut handle) = Managed::for_test(value).build();

        let mut s = managed.split(|_| vec![CountedValue(0), CountedValue(2)].into_iter());

        s.next().unwrap().accept(|_| {});
        handle.assert_no_outcomes();

        let r = std::panic::catch_unwind(move || {
            s.next();
        });

        assert!(
            r.is_err(),
            "expected split to panic because of mismatched (too many) outcomes"
        );
    }

    // Relies on `debug_assert!` in `Split`, see above.
    #[cfg(debug_assertions)]
    #[test]
    fn test_split_changing_categories_should_panic() {
        struct Special;
        impl Counted for Special {
            fn quantities(&self) -> Quantities {
                smallvec::smallvec![(DataCategory::Error, 1), (DataCategory::Transaction, 1)]
            }
        }

        let value = CountedVec(vec![0]);
        let (managed, _handle) = Managed::for_test(value).build();

        let mut s = managed.split(|value| value.0.into_iter().map(|_| Special));

        let r = std::panic::catch_unwind(move || {
            let _ = s.next();
        });

        assert!(
            r.is_err(),
            "expected split to panic because of mismatched outcome categories"
        );
    }

    #[test]
    fn test_split_assert_fused() {
        fn only_fused<T: FusedIterator>(_: T) {}

        let (managed, mut handle) = Managed::for_test(CountedVec(vec![0])).build();
        only_fused(managed.split(|value| value.0.into_iter().map(CountedValue)));
        handle.assert_internal_outcome(DataCategory::Error, 1);
    }
}