relay_server/managed/
managed.rs

1#[cfg(debug_assertions)]
2use std::collections::BTreeMap;
3use std::convert::Infallible;
4use std::fmt;
5use std::iter::FusedIterator;
6use std::mem::ManuallyDrop;
7use std::net::IpAddr;
8use std::sync::Arc;
9use std::sync::atomic::{AtomicBool, Ordering};
10
11use chrono::{DateTime, Utc};
12use itertools::Either;
13use relay_event_schema::protocol::EventId;
14use relay_quotas::{DataCategory, Scoping};
15use relay_system::Addr;
16use smallvec::SmallVec;
17
18use crate::Envelope;
19use crate::managed::{Counted, ManagedEnvelope, Quantities};
20use crate::services::outcome::{DiscardReason, Outcome, TrackOutcome};
21use crate::services::processor::ProcessingError;
22
23#[cfg(debug_assertions)]
24mod debug;
25#[cfg(test)]
26mod test;
27
28#[cfg(test)]
29pub use self::test::*;
30
/// An error which can be extracted into an outcome.
///
/// Implementors are split into two parts by [`Self::consume`]: the outcome to report for the
/// affected item(s), if any, and the remaining error value handed back to the caller.
pub trait OutcomeError {
    /// Produced error, without attached outcome.
    type Error;

    /// Consumes the error and returns an outcome and [`Self::Error`].
    ///
    /// Returning a `None` outcome should discard the item(s) silently.
    fn consume(self) -> (Option<Outcome>, Self::Error);
}
41
42impl OutcomeError for Outcome {
43    type Error = ();
44
45    fn consume(self) -> (Option<Outcome>, Self::Error) {
46        (self, ()).consume()
47    }
48}
49
50impl OutcomeError for Option<Outcome> {
51    type Error = ();
52
53    fn consume(self) -> (Option<Outcome>, Self::Error) {
54        (self, ()).consume()
55    }
56}
57
58impl<E> OutcomeError for (Outcome, E) {
59    type Error = E;
60
61    fn consume(self) -> (Option<Outcome>, Self::Error) {
62        (Some(self.0), self.1)
63    }
64}
65
66impl<E> OutcomeError for (Option<Outcome>, E) {
67    type Error = E;
68
69    fn consume(self) -> (Option<Outcome>, Self::Error) {
70        self
71    }
72}
73
74impl OutcomeError for ProcessingError {
75    type Error = Self;
76
77    fn consume(self) -> (Option<Outcome>, Self::Error) {
78        (self.to_outcome(), self)
79    }
80}
81
impl OutcomeError for Infallible {
    type Error = Self;

    /// Can never be called, as no value of [`Infallible`] exists.
    fn consume(self) -> (Option<Outcome>, Self::Error) {
        // `Infallible` has no variants, the empty match proves this code is unreachable.
        match self {}
    }
}
89
/// Marker wrapper which guarantees that outcomes were already emitted for the wrapped error.
///
/// [`Managed`] wraps an error in [`Rejected`] once outcomes have been emitted for the managed
/// item.
#[derive(Debug, Clone, Copy)]
#[must_use = "a rejection must be propagated"]
pub struct Rejected<T>(T);

impl<T> Rejected<T> {
    /// Unwraps the rejection and hands back the contained error.
    pub fn into_inner(self) -> T {
        let Self(inner) = self;
        inner
    }

    /// Transforms the contained error with `f`, keeping the rejection marker in place.
    pub fn map<F, S>(self, f: F) -> Rejected<S>
    where
        F: FnOnce(T) -> S,
    {
        let mapped = f(self.into_inner());
        Rejected(mapped)
    }
}
112
113impl<T> std::error::Error for Rejected<T>
114where
115    T: std::error::Error,
116{
117    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
118        self.0.source()
119    }
120}
121
122impl<T> std::fmt::Display for Rejected<T>
123where
124    T: std::fmt::Display,
125{
126    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
127        self.0.fmt(f)
128    }
129}
130
131impl<T> axum::response::IntoResponse for Rejected<T>
132where
133    T: axum::response::IntoResponse,
134{
135    fn into_response(self) -> axum::response::Response {
136        self.0.into_response()
137    }
138}
139
/// The [`Managed`] wrapper ensures outcomes are correctly emitted for the contained item.
///
/// Dropping an instance which has not been marked as done emits outcomes with an internal
/// discard reason for all quantities of the contained value.
pub struct Managed<T: Counted> {
    /// The wrapped item whose quantities are tracked.
    value: T,
    /// Metadata used when emitting outcomes, shared with instances created from this one.
    meta: Arc<Meta>,
    /// Set once all bookkeeping for this instance is finished, which prevents outcomes from
    /// being emitted a second time (for example on drop).
    done: AtomicBool,
}
146
147impl Managed<Box<Envelope>> {
148    /// Creates a managed instance from an unmanaged envelope.
149    pub fn from_envelope(envelope: Box<Envelope>, outcome_aggregator: Addr<TrackOutcome>) -> Self {
150        let meta = Arc::new(Meta {
151            outcome_aggregator,
152            received_at: envelope.received_at(),
153            scoping: envelope.meta().get_partial_scoping().into_scoping(),
154            event_id: envelope.event_id(),
155            remote_addr: envelope.meta().remote_addr(),
156        });
157
158        Self::from_parts(envelope, meta)
159    }
160}
161
162impl<T: Counted> Managed<T> {
163    /// Creates new [`Managed`] instance with the provided `value` and metadata from a [`ManagedEnvelope`].
164    ///
165    /// The [`Managed`] instance, inherits all metadata from the passed [`ManagedEnvelope`],
166    /// like received time or scoping.
167    pub fn with_meta_from(envelope: &ManagedEnvelope, value: T) -> Self {
168        Self::from_parts(
169            value,
170            Arc::new(Meta {
171                outcome_aggregator: envelope.outcome_aggregator().clone(),
172                received_at: envelope.received_at(),
173                scoping: envelope.scoping(),
174                event_id: envelope.envelope().event_id(),
175                remote_addr: envelope.meta().remote_addr(),
176            }),
177        )
178    }
179
180    /// Creates another [`Managed`] instance, with a new value but shared metadata.
181    pub fn wrap<S>(&self, other: S) -> Managed<S>
182    where
183        S: Counted,
184    {
185        Managed::from_parts(other, Arc::clone(&self.meta))
186    }
187
188    /// Original received timestamp.
189    pub fn received_at(&self) -> DateTime<Utc> {
190        self.meta.received_at
191    }
192
193    /// Scoping information stored in this context.
194    pub fn scoping(&self) -> Scoping {
195        self.meta.scoping
196    }
197
198    /// Updates the scoping stored in this context.
199    ///
200    /// Special care has to be taken when items contained in the managed instance also store a
201    /// scoping. Such a scoping will **not** be updated.
202    ///
203    /// Conversions between `Managed<Box<Envelope>>` and `ManagedEnvelope` transfer the scoping
204    /// correctly.
205    ///
206    /// See also: [`ManagedEnvelope::scope`].
207    pub fn scope(&mut self, scoping: Scoping) {
208        let meta = Arc::make_mut(&mut self.meta);
209        meta.scoping = scoping;
210    }
211
212    /// Splits [`Self`] into two other [`Managed`] items.
213    ///
214    /// The two resulting managed instances together are expected to have the same outcomes as the original instance..
215    /// Since splitting may introduce a new type of item, which some of the original
216    /// quantities are transferred to, there may be new additional data categories created.
217    pub fn split_once<F, S, U>(self, f: F) -> (Managed<S>, Managed<U>)
218    where
219        F: FnOnce(T) -> (S, U),
220        S: Counted,
221        U: Counted,
222    {
223        debug_assert!(!self.is_done());
224
225        let (value, meta) = self.destructure();
226        #[cfg(debug_assertions)]
227        let quantities = value.quantities();
228
229        let (a, b) = f(value);
230
231        #[cfg(debug_assertions)]
232        debug::Quantities::from(&quantities)
233            // Instead of `assert_only_extra`, used for extracted metrics also counting
234            // in the `metric bucket` category, it may make sense to give the mapping function
235            // control over which categories to ignore, similar to the record keeper's lenient
236            // method.
237            .assert_only_extra(debug::Quantities::from(&a) + debug::Quantities::from(&b));
238
239        (
240            Managed::from_parts(a, Arc::clone(&meta)),
241            Managed::from_parts(b, meta),
242        )
243    }
244
245    /// Splits [`Self`] into a variable amount if individual items.
246    ///
247    /// Useful when the current instance contains multiple items of the same type
248    /// and must be split into individually managed items.
249    pub fn split<F, I, S>(self, f: F) -> Split<I::IntoIter, I::Item>
250    where
251        F: FnOnce(T) -> I,
252        I: IntoIterator<Item = S>,
253        S: Counted,
254    {
255        self.split_with_context(|value| (f(value), ())).0
256    }
257
258    /// Splits [`Self`] into a variable amount if individual items.
259    ///
260    /// Like [`Self::split`] but also allows returning an untracked context,
261    /// a way of returning additional data when deconstructing the original item.
262    pub fn split_with_context<F, I, S, C>(self, f: F) -> (Split<I::IntoIter, I::Item>, C)
263    where
264        F: FnOnce(T) -> (I, C),
265        I: IntoIterator<Item = S>,
266        S: Counted,
267    {
268        debug_assert!(!self.is_done());
269
270        let (value, meta) = self.destructure();
271        #[cfg(debug_assertions)]
272        let quantities = value.quantities();
273
274        let (items, context) = f(value);
275
276        (
277            Split {
278                #[cfg(debug_assertions)]
279                quantities,
280                items: items.into_iter(),
281                meta,
282                exhausted: false,
283            },
284            context,
285        )
286    }
287
288    /// Filters individual items and emits outcomes for them if they are removed.
289    ///
290    /// This is particularly useful when the managed instance is a container of individual items,
291    /// which need to be processed or filtered on a case by case basis.
292    ///
293    /// # Examples:
294    ///
295    /// ```
296    /// # use relay_server::managed::{Counted, Managed, Quantities};
297    /// # #[derive(Copy, Clone)]
298    /// # struct Context<'a>(&'a u32);
299    /// # struct Item;
300    /// struct Items {
301    ///     items: Vec<Item>,
302    /// }
303    /// # impl Counted for Items {
304    /// #   fn quantities(&self) -> Quantities {
305    /// #       todo!()
306    /// #   }
307    /// # }
308    /// # impl Counted for Item {
309    /// #   fn quantities(&self) -> Quantities {
310    /// #       todo!()
311    /// #   }
312    /// # }
313    /// # type Error = std::convert::Infallible;
314    ///
315    /// fn process_items(items: &mut Managed<Items>, ctx: Context<'_>) {
316    ///     items.retain(|items| &mut items.items, |item, _| process(item, ctx));
317    /// }
318    ///
319    /// fn process(item: &mut Item, ctx: Context<'_>) -> Result<(), Error> {
320    ///     todo!()
321    /// }
322    /// ```
323    pub fn retain<S, I, U, E>(&mut self, select: S, mut retain: U)
324    where
325        S: FnOnce(&mut T) -> &mut Vec<I>,
326        I: Counted,
327        U: FnMut(&mut I, &mut RecordKeeper<'_>) -> Result<(), E>,
328        E: OutcomeError,
329    {
330        self.retain_with_context(
331            |inner| (select(inner), &()),
332            |item, _, records| retain(item, records),
333        );
334    }
335
336    /// Filters individual items and emits outcomes for them if they are removed.
337    ///
338    /// Like [`Self::retain`], but it allows for an additional context extracted from the managed
339    /// object passed to the retain function.
340    ///
341    /// # Examples:
342    ///
343    /// ```
344    /// # use relay_server::managed::{Counted, Managed, Quantities};
345    /// # #[derive(Copy, Clone)]
346    /// # struct Context<'a>(&'a u32);
347    /// # struct Item;
348    /// struct Items {
349    ///     ty: String,
350    ///     items: Vec<Item>,
351    /// }
352    /// # impl Counted for Items {
353    /// #   fn quantities(&self) -> Quantities {
354    /// #       todo!()
355    /// #   }
356    /// # }
357    /// # impl Counted for Item {
358    /// #   fn quantities(&self) -> Quantities {
359    /// #       todo!()
360    /// #   }
361    /// # }
362    /// # type Error = std::convert::Infallible;
363    ///
364    /// fn process_items(items: &mut Managed<Items>, ctx: Context<'_>) {
365    ///     items.retain_with_context(|items| (&mut items.items, &items.ty), |item, ty, _| process(item, ty, ctx));
366    /// }
367    ///
368    /// fn process(item: &mut Item, ty: &str, ctx: Context<'_>) -> Result<(), Error> {
369    ///     todo!()
370    /// }
371    /// ```
372    pub fn retain_with_context<S, C, I, U, E>(&mut self, select: S, mut retain: U)
373    where
374        // Returning `&'a C` here is not optimal, ideally we return C here and express the correct
375        // bound of `C: 'a` but this is, to my knowledge, currently not possible to express in stable Rust.
376        //
377        // This is unfortunately a bit limiting but for most of our purposes it is enough.
378        for<'a> S: FnOnce(&'a mut T) -> (&'a mut Vec<I>, &'a C),
379        I: Counted,
380        U: FnMut(&mut I, &C, &mut RecordKeeper<'_>) -> Result<(), E>,
381        E: OutcomeError,
382    {
383        self.modify(|inner, records| {
384            let (items, ctx) = select(inner);
385            items.retain_mut(|item| match retain(item, ctx, records) {
386                Ok(()) => true,
387                Err(err) => {
388                    records.reject_err(err, &*item);
389                    false
390                }
391            })
392        });
393    }
394
395    /// Maps a [`Managed<T>`] to [`Managed<S>`] by applying the mapping function `f`.
396    ///
397    /// Like [`Self::try_map`] but not fallible.
398    pub fn map<S, F>(self, f: F) -> Managed<S>
399    where
400        F: FnOnce(T, &mut RecordKeeper) -> S,
401        S: Counted,
402    {
403        self.try_map(move |inner, records| Ok::<_, Infallible>(f(inner, records)))
404            .unwrap_or_else(|e| match e.0 {})
405    }
406
407    /// Maps a [`Managed<T>`] to [`Managed<S>`] by applying the mapping function `f`.
408    ///
409    /// The mapping function gets access to a [`RecordKeeper`], to emit outcomes for partial
410    /// discards.
411    ///
412    /// If the mapping function returns an error, the entire (original) [`Self`] is rejected,
413    /// no partial outcomes are emitted.
414    pub fn try_map<S, F, E>(self, f: F) -> Result<Managed<S>, Rejected<E::Error>>
415    where
416        F: FnOnce(T, &mut RecordKeeper) -> Result<S, E>,
417        S: Counted,
418        E: OutcomeError,
419    {
420        debug_assert!(!self.is_done());
421
422        let (value, meta) = self.destructure();
423        let quantities = value.quantities();
424
425        let mut records = RecordKeeper::new(&meta, quantities);
426
427        match f(value, &mut records) {
428            Ok(value) => {
429                records.success(value.quantities());
430                Ok(Managed::from_parts(value, meta))
431            }
432            Err(err) => Err(records.failure(err)),
433        }
434    }
435
436    /// Gives mutable access to the contained value to modify it.
437    ///
438    /// Like [`Self::try_modify`] but not fallible.
439    pub fn modify<F>(&mut self, f: F)
440    where
441        F: FnOnce(&mut T, &mut RecordKeeper),
442    {
443        self.try_modify(move |inner, records| {
444            f(inner, records);
445            Ok::<_, Infallible>(())
446        })
447        .unwrap_or_else(|e| match e {})
448    }
449
450    /// Gives mutable access to the contained value to modify it.
451    ///
452    /// The modifying function gets access to a [`RecordKeeper`], to emit outcomes for partial
453    /// discards.
454    ///
455    /// If the modifying function returns an error, the entire (original) [`Self`] is rejected,
456    /// no partial outcomes are emitted.
457    pub fn try_modify<F, E>(&mut self, f: F) -> Result<(), Rejected<E::Error>>
458    where
459        F: FnOnce(&mut T, &mut RecordKeeper) -> Result<(), E>,
460        E: OutcomeError,
461    {
462        debug_assert!(!self.is_done());
463
464        let quantities = self.value.quantities();
465        let mut records = RecordKeeper::new(&self.meta, quantities);
466
467        match f(&mut self.value, &mut records) {
468            Ok(()) => {
469                records.success(self.value.quantities());
470                Ok(())
471            }
472            Err(err) => {
473                let err = records.failure(err);
474                self.done.store(true, Ordering::Relaxed);
475                Err(err)
476            }
477        }
478    }
479
480    /// Accepts the item of this managed instance.
481    ///
482    /// This should be called if the item has been or is about to be accepted by the upstream, which means that
483    /// the responsibility for logging outcomes has been moved. This function will not log any
484    /// outcomes.
485    ///
486    /// Like [`Self::try_accept`], but infallible.
487    pub fn accept<F, S>(self, f: F) -> S
488    where
489        F: FnOnce(T) -> S,
490    {
491        self.try_accept(|item| Ok::<_, Infallible>(f(item)))
492            .unwrap_or_else(|err| match err.0 {})
493    }
494
495    /// Accepts the item of this managed instance.
496    ///
497    /// This should be called if the item has been or is about to be accepted by the upstream.
498    ///
499    /// Outcomes are only emitted when the accepting closure returns an error, which means that
500    /// in the success case the responsibility for logging outcomes has been moved to the
501    /// caller/upstream.
502    pub fn try_accept<F, S, E>(self, f: F) -> Result<S, Rejected<E::Error>>
503    where
504        F: FnOnce(T) -> Result<S, E>,
505        E: OutcomeError,
506    {
507        debug_assert!(!self.is_done());
508
509        let (value, meta) = self.destructure();
510        let records = RecordKeeper::new(&meta, value.quantities());
511
512        match f(value) {
513            Ok(value) => {
514                records.accept();
515                Ok(value)
516            }
517            Err(err) => Err(records.failure(err)),
518        }
519    }
520
521    /// Rejects the entire [`Managed`] instance with an internal error.
522    ///
523    /// Internal errors should be reserved for uses where logical invariants are violated.
524    /// Cases which should never happen and always indicate a logical bug.
525    ///
526    /// This function will panic in debug builds, but discard the item
527    /// with an internal discard reason in release builds.
528    #[track_caller]
529    pub fn internal_error(&self, reason: &'static str) -> Rejected<()> {
530        relay_log::error!("internal error: {reason}");
531        debug_assert!(false, "internal error: {reason}");
532        self.reject_err((Outcome::Invalid(DiscardReason::Internal), ()))
533    }
534
535    /// Rejects the entire [`Managed`] instance.
536    pub fn reject_err<E>(&self, error: E) -> Rejected<E::Error>
537    where
538        E: OutcomeError,
539    {
540        debug_assert!(!self.is_done());
541
542        let (outcome, error) = error.consume();
543        self.do_reject(outcome);
544        Rejected(error)
545    }
546
547    fn do_reject(&self, outcome: Option<Outcome>) {
548        // Always set the internal state to `done`, even if there is no outcome to be emitted.
549        // All bookkeeping has been done.
550        let is_done = self.done.fetch_or(true, Ordering::Relaxed);
551
552        // No outcome to emit, we're done.
553        let Some(outcome) = outcome else {
554            return;
555        };
556
557        // Only emit outcomes if we were not yet done.
558        //
559        // Callers should guard against accidentally calling `do_reject` when the `is_done` flag is
560        // already set, but internal uses (like `Drop`) can rely on this double emission
561        // prevention.
562        if !is_done {
563            for (category, quantity) in self.value.quantities() {
564                self.meta.track_outcome(outcome.clone(), category, quantity);
565            }
566        }
567    }
568
569    /// De-structures this managed instance into its own parts.
570    ///
571    /// While de-structured no outcomes will be emitted on drop.
572    ///
573    /// Currently no `Managed`, which already has outcomes emitted, should be de-structured
574    /// as this status is lost.
575    fn destructure(self) -> (T, Arc<Meta>) {
576        // SAFETY: this follows an approach mentioned in the RFC
577        // <https://github.com/rust-lang/rfcs/pull/3466> to move fields out of
578        // a type with a drop implementation.
579        //
580        // The original type is wrapped in a manual drop to prevent running the
581        // drop handler, afterwards all fields are moved out of the type.
582        //
583        // And the original type is forgotten, de-structuring the original type
584        // without running its drop implementation.
585        let this = ManuallyDrop::new(self);
586        let Managed { value, meta, done } = &*this;
587
588        let value = unsafe { std::ptr::read(value) };
589        let meta = unsafe { std::ptr::read(meta) };
590        let done = unsafe { std::ptr::read(done) };
591        // This is a current invariant, if we ever need to change the invariant,
592        // the done status should be preserved and returned instead.
593        debug_assert!(
594            !done.load(Ordering::Relaxed),
595            "a `done` managed should never be destructured"
596        );
597
598        (value, meta)
599    }
600
601    fn from_parts(value: T, meta: Arc<Meta>) -> Self {
602        Self {
603            value,
604            meta,
605            done: AtomicBool::new(false),
606        }
607    }
608
609    fn is_done(&self) -> bool {
610        self.done.load(Ordering::Relaxed)
611    }
612}
613
614impl<T: Counted> Drop for Managed<T> {
615    fn drop(&mut self) {
616        self.do_reject(Some(Outcome::Invalid(DiscardReason::Internal)));
617    }
618}
619
620impl<T: Counted + fmt::Debug> fmt::Debug for Managed<T> {
621    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
622        write!(f, "Managed<{}>[", std::any::type_name::<T>())?;
623        for (i, (category, quantity)) in self.value.quantities().iter().enumerate() {
624            if i > 0 {
625                write!(f, ",")?;
626            }
627            write!(f, "{category}:{quantity}")?;
628        }
629        write!(f, "](")?;
630        self.value.fmt(f)?;
631        write!(f, ")")
632    }
633}
634
635impl<T: Counted> Managed<Option<T>> {
636    /// Turns a managed option into an optional [`Managed`].
637    pub fn transpose(self) -> Option<Managed<T>> {
638        let (o, meta) = self.destructure();
639        o.map(|t| Managed::from_parts(t, meta))
640    }
641}
642
643impl<L: Counted, R: Counted> Managed<Either<L, R>> {
644    /// Turns a managed [`Either`] into an [`Either`] of [`Managed`].
645    pub fn transpose(self) -> Either<Managed<L>, Managed<R>> {
646        let (either, meta) = self.destructure();
647        match either {
648            Either::Left(value) => Either::Left(Managed::from_parts(value, meta)),
649            Either::Right(value) => Either::Right(Managed::from_parts(value, meta)),
650        }
651    }
652}
653
654impl From<Managed<Box<Envelope>>> for ManagedEnvelope {
655    fn from(value: Managed<Box<Envelope>>) -> Self {
656        let (value, meta) = value.destructure();
657        let mut envelope = ManagedEnvelope::new(value, meta.outcome_aggregator.clone());
658        envelope.scope(meta.scoping);
659        envelope
660    }
661}
662
663impl<T: Counted> AsRef<T> for Managed<T> {
664    fn as_ref(&self) -> &T {
665        &self.value
666    }
667}
668
669impl<T: Counted> std::ops::Deref for Managed<T> {
670    type Target = T;
671
672    fn deref(&self) -> &Self::Target {
673        &self.value
674    }
675}
676
/// Internal metadata attached with a [`Managed`] instance.
///
/// The metadata provides all information, besides outcome, category and quantity, which is
/// required to emit a [`TrackOutcome`] message.
#[derive(Debug, Clone)]
struct Meta {
    /// Outcome aggregator service.
    outcome_aggregator: Addr<TrackOutcome>,
    /// Received timestamp, when the contained payload/information was received.
    ///
    /// See also: [`crate::extractors::RequestMeta::received_at`].
    received_at: DateTime<Utc>,
    /// Data scoping information of the contained item.
    scoping: Scoping,
    /// Optional event id associated with the contained data.
    event_id: Option<EventId>,
    /// Optional remote addr from where the data was received from.
    remote_addr: Option<IpAddr>,
}
693
694impl Meta {
695    pub fn track_outcome(&self, outcome: Outcome, category: DataCategory, quantity: usize) {
696        self.outcome_aggregator.send(TrackOutcome {
697            timestamp: self.received_at,
698            scoping: self.scoping,
699            outcome,
700            event_id: self.event_id,
701            remote_addr: self.remote_addr,
702            category,
703            quantity: quantity.try_into().unwrap_or(u32::MAX),
704        });
705    }
706}
707
/// A record keeper makes sure modifications done on a [`Managed`] item are all accounted for
/// correctly.
///
/// A record keeper which is dropped without being finalized emits outcomes with an internal
/// discard reason for all quantities it still guards.
pub struct RecordKeeper<'a> {
    /// Metadata of the managed item, used to emit outcomes.
    meta: &'a Meta,
    /// Quantities of the original item, emitted as internal outcomes when dropped.
    on_drop: Quantities,
    /// Categories which are exempt from the quantity validation (debug builds only).
    #[cfg(debug_assertions)]
    lenient: SmallVec<[DataCategory; 1]>,
    /// Expected changes to the original quantities per category (debug builds only).
    #[cfg(debug_assertions)]
    modifications: BTreeMap<DataCategory, isize>,
    /// Outcomes registered for partially rejected items, emitted only on success.
    in_flight: SmallVec<[(DataCategory, usize, Option<Outcome>); 2]>,
}
719
720impl<'a> RecordKeeper<'a> {
721    fn new(meta: &'a Meta, quantities: Quantities) -> Self {
722        Self {
723            meta,
724            on_drop: quantities,
725            #[cfg(debug_assertions)]
726            lenient: Default::default(),
727            #[cfg(debug_assertions)]
728            modifications: Default::default(),
729            in_flight: Default::default(),
730        }
731    }
732
733    /// Marking a data category as lenient exempts this category from outcome quantity validations.
734    ///
735    /// Consider using [`Self::modify_by`] instead.
736    ///
737    /// This can be used in cases where the quantity is knowingly modified, which is quite common
738    /// for data categories which count bytes.
739    pub fn lenient(&mut self, category: DataCategory) {
740        let _category = category;
741        #[cfg(debug_assertions)]
742        self.lenient.push(_category);
743    }
744
745    /// Modifies the expected count for a category.
746    ///
747    /// When extracting payloads category counts may expectedly change, these changes can be
748    /// tracked using this function.
749    ///
750    /// Prefer using [`Self::modify_by`] over [`Self::lenient`] as lenient completely disables
751    /// validation for the entire category.
752    pub fn modify_by(&mut self, category: DataCategory, offset: isize) {
753        let _category = category;
754        let _offset = offset;
755        #[cfg(debug_assertions)]
756        {
757            *self.modifications.entry(_category).or_default() += offset;
758        }
759    }
760
761    /// Finalizes all records and emits the necessary outcomes.
762    ///
763    /// This uses the quantities of the original item.
764    fn failure<E>(mut self, error: E) -> Rejected<E::Error>
765    where
766        E: OutcomeError,
767    {
768        let (outcome, error) = error.consume();
769
770        if let Some(outcome) = outcome {
771            for (category, quantity) in std::mem::take(&mut self.on_drop) {
772                self.meta.track_outcome(outcome.clone(), category, quantity);
773            }
774        }
775
776        Rejected(error)
777    }
778
779    /// Finalizes all records and asserts that no additional outcomes have been tracked.
780    ///
781    /// Unlike [`Self::success`], this method does not allow for intermediate or partial outcomes,
782    /// it also does not verify any outcomes.
783    ///
784    /// This method is useful for using the record keeper to track failure outcomes, either
785    /// explicit failures or panics.
786    fn accept(mut self) {
787        debug_assert!(
788            self.in_flight.is_empty(),
789            "records accepted, but intermediate outcomes tracked"
790        );
791        self.on_drop.clear();
792    }
793
794    /// Finalizes all records and emits the created outcomes.
795    ///
796    /// This only emits the outcomes that have been explicitly registered.
797    /// In a debug build, the function also ensure no outcomes have been missed by comparing
798    /// quantities of the item before and after.
799    fn success(mut self, new: Quantities) {
800        let original = std::mem::take(&mut self.on_drop);
801        self.assert_quantities(original, new);
802
803        self.on_drop.clear();
804        for (category, quantity, outcome) in std::mem::take(&mut self.in_flight) {
805            if let Some(outcome) = outcome {
806                self.meta.track_outcome(outcome, category, quantity);
807            }
808        }
809    }
810
    /// Asserts that there have been no quantities lost.
    ///
    /// The original amount of quantities should match the new amount of quantities + all emitted
    /// outcomes.
    ///
    /// Violations in categories marked as lenient are only logged, all other violations panic.
    #[cfg(debug_assertions)]
    fn assert_quantities(&self, original: Quantities, new: Quantities) {
        // Reports a violation for a category: logs for lenient categories, panics otherwise.
        macro_rules! emit {
            ($category:expr, $($tt:tt)*) => {{
                match self.lenient.contains(&$category) {
                    // Certain categories are known to be not always correct,
                    // they are logged instead.
                    true => relay_log::debug!($($tt)*),
                    false  => {
                        relay_log::error!("Original: {original:?}");
                        relay_log::error!("New: {new:?}");
                        relay_log::error!("Modifications: {:?}", self.modifications);
                        relay_log::error!("In Flight: {:?}", self.in_flight);
                        panic!($($tt)*)
                    }
                }
            }};
        }

        // Start with the original quantities and apply all announced modifications,
        // categories which only appear in the modifications start at the default value.
        let mut sums = debug::Quantities::from(&original).0;
        for (category, offset) in &self.modifications {
            let v = sums.entry(*category).or_default();
            match v.checked_add_signed(*offset) {
                Some(result) => *v = result,
                None => emit!(
                    category,
                    "Attempted to modify original quantity {v} into the negative ({offset})"
                ),
            }
        }

        // Subtract everything for which an outcome has been registered.
        for (category, quantity, outcome) in &self.in_flight {
            match sums.get_mut(category) {
                Some(c) if *c >= *quantity => *c -= *quantity,
                Some(c) => emit!(
                    category,
                    "Emitted {quantity} outcomes ({outcome:?}) for {category}, but there were only {c} items in the category originally"
                ),
                None => emit!(
                    category,
                    "Emitted {quantity} outcomes ({outcome:?}) for {category}, but there never was an item in this category"
                ),
            }
        }

        // Subtract everything which is still contained in the new item.
        for (category, quantity) in &new {
            match sums.get_mut(category) {
                Some(c) if *c >= *quantity => *c -= *quantity,
                Some(c) => emit!(
                    category,
                    "New item has {quantity} items in category '{category}', but original (after emitted outcomes) only has {c} left"
                ),
                None => emit!(
                    category,
                    "New item has {quantity} items in category '{category}', but after emitted outcomes there are none left"
                ),
            }
        }

        // Anything left over is neither part of the new item nor covered by an outcome.
        for (category, quantity) in sums {
            if quantity > 0 {
                emit!(
                    category,
                    "Missing outcomes or mismatched quantity in category '{category}', off by {quantity}"
                );
            }
        }
    }

    /// Quantities are not validated in builds without debug assertions.
    #[cfg(not(debug_assertions))]
    fn assert_quantities(&self, _: Quantities, _: Quantities) {}
886}
887
888impl<'a> Drop for RecordKeeper<'a> {
889    fn drop(&mut self) {
890        for (category, quantity) in std::mem::take(&mut self.on_drop) {
891            self.meta.track_outcome(
892                Outcome::Invalid(DiscardReason::Internal),
893                category,
894                quantity,
895            );
896        }
897    }
898}
899
900impl RecordKeeper<'_> {
901    /// Rejects an item if the passed result is an error and returns a default value.
902    ///
903    /// Similar to [`Self::reject_err`], this emits the necessary outcomes for an
904    /// item, if there is an error.
905    pub fn or_default<T, E, Q>(&mut self, r: Result<T, E>, q: Q) -> T
906    where
907        T: Default,
908        E: OutcomeError,
909        Q: Counted,
910    {
911        match r {
912            Ok(result) => result,
913            Err(err) => {
914                self.reject_err(err, q);
915                T::default()
916            }
917        }
918    }
919
920    /// Rejects an item with an error.
921    ///
922    /// Makes sure the correct outcomes are tracked for the item, that is discarded due to an
923    /// error.
924    pub fn reject_err<E, Q>(&mut self, err: E, q: Q) -> E::Error
925    where
926        E: OutcomeError,
927        Q: Counted,
928    {
929        let (outcome, err) = err.consume();
930        for (category, quantity) in q.quantities() {
931            self.in_flight.push((category, quantity, outcome.clone()))
932        }
933        err
934    }
935
936    /// Rejects an item with an internal error.
937    ///
938    /// See also: [`Managed::internal_error`].
939    #[track_caller]
940    pub fn internal_error<E, Q>(&mut self, error: E, q: Q)
941    where
942        E: std::error::Error + 'static,
943        Q: Counted,
944    {
945        relay_log::error!(error = &error as &dyn std::error::Error, "internal error");
946        debug_assert!(false, "internal error: {error}");
947        self.reject_err((Outcome::Invalid(DiscardReason::Internal), ()), q);
948    }
949}
950
951/// Iterator returned by [`Managed::split`].
952pub struct Split<I, S>
953where
954    I: Iterator<Item = S>,
955    S: Counted,
956{
957    #[cfg(debug_assertions)]
958    quantities: Quantities,
959    items: I,
960    meta: Arc<Meta>,
961    exhausted: bool,
962}
963
964impl<I, S> Split<I, S>
965where
966    I: Iterator<Item = S>,
967    S: Counted,
968{
969    /// Subtracts passed quantities from the total quantities to verify total quantity counts are
970    /// matching.
971    #[cfg(debug_assertions)]
972    fn subtract(&mut self, q: Quantities) {
973        for (category, quantities) in q {
974            let Some(orig_quantities) = self
975                .quantities
976                .iter_mut()
977                .find_map(|(c, q)| (*c == category).then_some(q))
978            else {
979                debug_assert!(
980                    false,
981                    "mismatching quantities, item split into category {category}, \
982                    which originally was not present"
983                );
984                continue;
985            };
986
987            if *orig_quantities >= quantities {
988                *orig_quantities -= quantities;
989            } else {
990                debug_assert!(
991                    false,
992                    "in total more items produced in category {category} than originally available"
993                );
994            }
995        }
996    }
997}
998
999impl<I, S> Iterator for Split<I, S>
1000where
1001    I: Iterator<Item = S>,
1002    S: Counted,
1003{
1004    type Item = Managed<S>;
1005
1006    fn next(&mut self) -> Option<Self::Item> {
1007        let next = match self.items.next() {
1008            Some(next) => next,
1009            None => {
1010                self.exhausted = true;
1011                return None;
1012            }
1013        };
1014
1015        #[cfg(debug_assertions)]
1016        self.subtract(next.quantities());
1017
1018        Some(Managed::from_parts(next, Arc::clone(&self.meta)))
1019    }
1020}
1021
1022impl<I, S> Drop for Split<I, S>
1023where
1024    I: Iterator<Item = S>,
1025    S: Counted,
1026{
1027    fn drop(&mut self) {
1028        // If the inner iterator was exhausted, no items should be remaining.
1029        #[cfg(debug_assertions)]
1030        if self.exhausted {
1031            for (category, quantities) in &self.quantities {
1032                debug_assert!(
1033                    *quantities == 0,
1034                    "items split, but still {quantities} remaining in category {category}"
1035                );
1036            }
1037        }
1038
1039        if self.exhausted {
1040            return;
1041        }
1042
1043        // There may be items remaining in the iterator for multiple reasons:
1044        // - there was a panic
1045        // - the iterator was never fully consumed
1046        //
1047        // In any case, outcomes must be emitted for the remaining items.
1048        for item in &mut self.items {
1049            for (category, quantity) in item.quantities() {
1050                self.meta.track_outcome(
1051                    Outcome::Invalid(DiscardReason::Internal),
1052                    category,
1053                    quantity,
1054                );
1055            }
1056        }
1057    }
1058}
1059
1060impl<I, S> FusedIterator for Split<I, S>
1061where
1062    I: Iterator<Item = S> + FusedIterator,
1063    S: Counted,
1064{
1065}
1066
#[cfg(test)]
mod tests {
    use super::*;

    /// Test item which counts one [`DataCategory::Error`] per contained element.
    struct CountedVec(Vec<u32>);

    impl Counted for CountedVec {
        fn quantities(&self) -> Quantities {
            let len = self.0.len();
            smallvec::smallvec![(DataCategory::Error, len)]
        }
    }

    /// Test item which always counts as a single [`DataCategory::Error`].
    struct CountedValue(u32);

    impl Counted for CountedValue {
        fn quantities(&self) -> Quantities {
            smallvec::smallvec![(DataCategory::Error, 1)]
        }
    }

    /// Builds the six element fixture (`0..=5`) shared by several tests.
    fn six_items() -> CountedVec {
        CountedVec((0..6).collect())
    }

    #[test]
    fn test_reject_err_no_outcome() {
        let (managed, mut handle) = Managed::for_test(six_items()).build();

        // A rejection which carries no outcome must not emit anything.
        let _ = managed.reject_err((None, ()));
        handle.assert_no_outcomes();

        // Dropping the managed instance afterwards must not record any (internal) outcomes
        // either.
        drop(managed);
        handle.assert_no_outcomes();
    }

    #[test]
    fn test_split_fully_consumed() {
        let (managed, mut handle) = Managed::for_test(six_items()).build();

        // Collecting fully consumes the iterator, so its drop must not emit any outcomes.
        let parts: Vec<_> = managed
            .split(|value| value.0.into_iter().map(CountedValue))
            .collect();

        handle.assert_no_outcomes();

        for (expected, part) in (0u32..).zip(parts) {
            assert_eq!(part.as_ref().0, expected);
            let outcome = Outcome::Invalid(DiscardReason::Cors);
            let _ = part.reject_err((outcome.clone(), ()));
            handle.assert_outcome(&outcome, DataCategory::Error, 1);
        }
    }

    #[test]
    fn test_split_partially_consumed_emits_remaining() {
        let (managed, mut handle) = Managed::for_test(six_items()).build();

        let mut split = managed.split(|value| value.0.into_iter().map(CountedValue));
        handle.assert_no_outcomes();

        // The first three items are yielded and dropped right away, one internal outcome each.
        for _ in 0..3 {
            drop(split.next());
            handle.assert_internal_outcome(DataCategory::Error, 1);
        }
        handle.assert_no_outcomes();

        // Dropping the iterator must account for the three items which were never yielded.
        drop(split);

        for _ in 0..3 {
            handle.assert_internal_outcome(DataCategory::Error, 1);
        }
    }

    #[test]
    fn test_split_changing_quantities_should_panic() {
        let (managed, mut handle) = Managed::for_test(six_items()).build();

        // Six items go in, but only a single one comes out of the split.
        let mut split = managed.split(|_| std::iter::once(CountedValue(0)));

        let first = split.next().unwrap();
        first.accept(|_| {});
        handle.assert_no_outcomes();

        assert!(split.next().is_none());

        let result = std::panic::catch_unwind(move || drop(split));

        assert!(
            result.is_err(),
            "expected split to panic because of mismatched (not enough) outcomes"
        );
    }

    #[test]
    fn test_split_more_outcomes_than_before_should_panic() {
        let (managed, mut handle) = Managed::for_test(CountedVec(vec![0])).build();

        // One item goes in, but the split produces two.
        let mut split = managed.split(|_| vec![CountedValue(0), CountedValue(2)].into_iter());

        let first = split.next().unwrap();
        first.accept(|_| {});
        handle.assert_no_outcomes();

        let result = std::panic::catch_unwind(move || {
            split.next();
        });

        assert!(
            result.is_err(),
            "expected split to panic because of mismatched (too many) outcomes"
        );
    }

    #[test]
    fn test_split_changing_categories_should_panic() {
        /// Item which reports a category the original item does not have.
        struct Special;

        impl Counted for Special {
            fn quantities(&self) -> Quantities {
                smallvec::smallvec![(DataCategory::Error, 1), (DataCategory::Transaction, 1)]
            }
        }

        let (managed, _handle) = Managed::for_test(CountedVec(vec![0])).build();

        let mut split = managed.split(|value| value.0.into_iter().map(|_| Special));

        let result = std::panic::catch_unwind(move || {
            let _ = split.next();
        });

        assert!(
            result.is_err(),
            "expected split to panic because of mismatched outcome categories"
        );
    }

    #[test]
    fn test_split_assert_fused() {
        fn only_fused<T: FusedIterator>(_: T) {}

        let (managed, mut handle) = Managed::for_test(CountedVec(vec![0])).build();

        // The split is dropped unconsumed inside `only_fused`, which emits the internal outcome.
        only_fused(managed.split(|value| value.0.into_iter().map(CountedValue)));
        handle.assert_internal_outcome(DataCategory::Error, 1);
    }
}