relay_server/managed/
managed.rs

1#[cfg(debug_assertions)]
2use std::collections::BTreeMap;
3use std::convert::Infallible;
4use std::fmt;
5use std::iter::FusedIterator;
6use std::mem::ManuallyDrop;
7use std::net::IpAddr;
8use std::sync::Arc;
9use std::sync::atomic::{AtomicBool, Ordering};
10
11use chrono::{DateTime, Utc};
12use itertools::Either;
13use relay_event_schema::protocol::EventId;
14use relay_quotas::{DataCategory, Scoping};
15use relay_system::Addr;
16use smallvec::SmallVec;
17
18use crate::Envelope;
19use crate::managed::{Counted, ManagedEnvelope, Quantities};
20use crate::services::outcome::{DiscardReason, Outcome, TrackOutcome};
21use crate::services::processor::ProcessingError;
22
23#[cfg(debug_assertions)]
24mod debug;
25#[cfg(test)]
26mod test;
27
28#[cfg(test)]
29pub use self::test::*;
30
31/// An error which can be extracted into an outcome.
32pub trait OutcomeError {
33    /// Produced error, without attached outcome.
34    type Error;
35
36    /// Consumes the error and returns an outcome and [`Self::Error`].
37    ///
38    /// Returning a `None` outcome should discard the item(s) silently.
39    fn consume(self) -> (Option<Outcome>, Self::Error);
40}
41
42impl OutcomeError for Outcome {
43    type Error = ();
44
45    fn consume(self) -> (Option<Outcome>, Self::Error) {
46        (self, ()).consume()
47    }
48}
49
50impl OutcomeError for Option<Outcome> {
51    type Error = ();
52
53    fn consume(self) -> (Option<Outcome>, Self::Error) {
54        (self, ()).consume()
55    }
56}
57
58impl<E> OutcomeError for (Outcome, E) {
59    type Error = E;
60
61    fn consume(self) -> (Option<Outcome>, Self::Error) {
62        (Some(self.0), self.1)
63    }
64}
65
66impl<E> OutcomeError for (Option<Outcome>, E) {
67    type Error = E;
68
69    fn consume(self) -> (Option<Outcome>, Self::Error) {
70        self
71    }
72}
73
74impl OutcomeError for ProcessingError {
75    type Error = Self;
76
77    fn consume(self) -> (Option<Outcome>, Self::Error) {
78        (self.to_outcome(), self)
79    }
80}
81
82impl OutcomeError for Infallible {
83    type Error = Self;
84
85    fn consume(self) -> (Option<Outcome>, Self::Error) {
86        match self {}
87    }
88}
89
/// A wrapper type which ensures outcomes have been emitted for an error.
///
/// [`Managed`] wraps an error in [`Rejected`] once outcomes have been emitted for the managed
/// item.
#[derive(Debug, Clone, Copy)]
#[must_use = "a rejection must be propagated"]
pub struct Rejected<T>(T);

impl<T> Rejected<T> {
    /// Extracts the underlying error.
    pub fn into_inner(self) -> T {
        self.0
    }

    /// Maps the rejected error to a different error.
    ///
    /// The result is still a [`Rejected`]: outcomes have already been emitted for the original
    /// item, only the error value changes.
    pub fn map<F, S>(self, f: F) -> Rejected<S>
    where
        F: FnOnce(T) -> S,
    {
        Rejected(f(self.0))
    }
}
112
113impl<T> std::error::Error for Rejected<T>
114where
115    T: std::error::Error,
116{
117    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
118        self.0.source()
119    }
120}
121
122impl<T> std::fmt::Display for Rejected<T>
123where
124    T: std::fmt::Display,
125{
126    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
127        self.0.fmt(f)
128    }
129}
130
131impl<T> axum::response::IntoResponse for Rejected<T>
132where
133    T: axum::response::IntoResponse,
134{
135    fn into_response(self) -> axum::response::Response {
136        self.0.into_response()
137    }
138}
139
140/// The [`Managed`] wrapper ensures outcomes are correctly emitted for the contained item.
141pub struct Managed<T: Counted> {
142    value: T,
143    meta: Arc<Meta>,
144    done: AtomicBool,
145}
146
147impl Managed<Box<Envelope>> {
148    /// Creates a managed instance from an unmanaged envelope.
149    pub fn from_envelope(envelope: Box<Envelope>, outcome_aggregator: Addr<TrackOutcome>) -> Self {
150        let meta = Arc::new(Meta {
151            outcome_aggregator,
152            received_at: envelope.received_at(),
153            scoping: envelope.meta().get_partial_scoping().into_scoping(),
154            event_id: envelope.event_id(),
155            remote_addr: envelope.meta().remote_addr(),
156        });
157
158        Self::from_parts(envelope, meta)
159    }
160}
161
162impl<T: Counted> Managed<T> {
163    /// Creates new [`Managed`] instance with the provided `value` and metadata from a [`ManagedEnvelope`].
164    ///
165    /// The [`Managed`] instance, inherits all metadata from the passed [`ManagedEnvelope`],
166    /// like received time or scoping.
167    pub fn with_meta_from(envelope: &ManagedEnvelope, value: T) -> Self {
168        Self::from_parts(
169            value,
170            Arc::new(Meta {
171                outcome_aggregator: envelope.outcome_aggregator().clone(),
172                received_at: envelope.received_at(),
173                scoping: envelope.scoping(),
174                event_id: envelope.envelope().event_id(),
175                remote_addr: envelope.meta().remote_addr(),
176            }),
177        )
178    }
179
180    /// Creates another [`Managed`] instance, with a new value but shared metadata.
181    pub fn wrap<S>(&self, other: S) -> Managed<S>
182    where
183        S: Counted,
184    {
185        Managed::from_parts(other, Arc::clone(&self.meta))
186    }
187
188    /// Boxes the contained value.
189    pub fn boxed(self) -> Managed<Box<T>> {
190        self.map(|value, _| Box::new(value))
191    }
192
193    /// Original received timestamp.
194    pub fn received_at(&self) -> DateTime<Utc> {
195        self.meta.received_at
196    }
197
198    /// Scoping information stored in this context.
199    pub fn scoping(&self) -> Scoping {
200        self.meta.scoping
201    }
202
203    /// Updates the scoping stored in this context.
204    ///
205    /// Special care has to be taken when items contained in the managed instance also store a
206    /// scoping. Such a scoping will **not** be updated.
207    ///
208    /// Conversions between `Managed<Box<Envelope>>` and `ManagedEnvelope` transfer the scoping
209    /// correctly.
210    ///
211    /// See also: [`ManagedEnvelope::scope`].
212    pub fn scope(&mut self, scoping: Scoping) {
213        let meta = Arc::make_mut(&mut self.meta);
214        meta.scoping = scoping;
215    }
216
217    /// Splits [`Self`] into two other [`Managed`] items.
218    ///
219    /// The two resulting managed instances together are expected to have the same outcomes as the original instance..
220    /// Since splitting may introduce a new type of item, which some of the original
221    /// quantities are transferred to, there may be new additional data categories created.
222    pub fn split_once<F, S, U>(self, f: F) -> (Managed<S>, Managed<U>)
223    where
224        F: FnOnce(T, &mut RecordKeeper) -> (S, U),
225        S: Counted,
226        U: Counted,
227    {
228        debug_assert!(!self.is_done());
229
230        let (value, meta) = self.destructure();
231        let quantities = value.quantities();
232
233        let mut records = RecordKeeper::new(&meta, quantities);
234
235        let (a, b) = f(value, &mut records);
236
237        let mut quantities = a.quantities();
238        quantities.extend(b.quantities());
239        records.success(quantities);
240
241        (
242            Managed::from_parts(a, Arc::clone(&meta)),
243            Managed::from_parts(b, meta),
244        )
245    }
246
247    /// Splits [`Self`] into a variable amount if individual items.
248    ///
249    /// Useful when the current instance contains multiple items of the same type
250    /// and must be split into individually managed items.
251    pub fn split<F, I, S>(self, f: F) -> Split<I::IntoIter, I::Item>
252    where
253        F: FnOnce(T) -> I,
254        I: IntoIterator<Item = S>,
255        S: Counted,
256    {
257        self.split_with_context(|value| (f(value), ())).0
258    }
259
260    /// Splits [`Self`] into a variable amount if individual items.
261    ///
262    /// Like [`Self::split`] but also allows returning an untracked context,
263    /// a way of returning additional data when deconstructing the original item.
264    pub fn split_with_context<F, I, S, C>(self, f: F) -> (Split<I::IntoIter, I::Item>, C)
265    where
266        F: FnOnce(T) -> (I, C),
267        I: IntoIterator<Item = S>,
268        S: Counted,
269    {
270        debug_assert!(!self.is_done());
271
272        let (value, meta) = self.destructure();
273        #[cfg(debug_assertions)]
274        let quantities = value.quantities();
275
276        let (items, context) = f(value);
277
278        (
279            Split {
280                #[cfg(debug_assertions)]
281                quantities,
282                items: items.into_iter(),
283                meta,
284                exhausted: false,
285            },
286            context,
287        )
288    }
289
290    /// Filters individual items and emits outcomes for them if they are removed.
291    ///
292    /// This is particularly useful when the managed instance is a container of individual items,
293    /// which need to be processed or filtered on a case by case basis.
294    ///
295    /// # Examples:
296    ///
297    /// ```
298    /// # use relay_server::managed::{Counted, Managed, Quantities};
299    /// # #[derive(Copy, Clone)]
300    /// # struct Context<'a>(&'a u32);
301    /// # struct Item;
302    /// struct Items {
303    ///     items: Vec<Item>,
304    /// }
305    /// # impl Counted for Items {
306    /// #   fn quantities(&self) -> Quantities {
307    /// #       todo!()
308    /// #   }
309    /// # }
310    /// # impl Counted for Item {
311    /// #   fn quantities(&self) -> Quantities {
312    /// #       todo!()
313    /// #   }
314    /// # }
315    /// # type Error = std::convert::Infallible;
316    ///
317    /// fn process_items(items: &mut Managed<Items>, ctx: Context<'_>) {
318    ///     items.retain(|items| &mut items.items, |item, _| process(item, ctx));
319    /// }
320    ///
321    /// fn process(item: &mut Item, ctx: Context<'_>) -> Result<(), Error> {
322    ///     todo!()
323    /// }
324    /// ```
325    pub fn retain<S, I, U, E>(&mut self, select: S, mut retain: U)
326    where
327        S: FnOnce(&mut T) -> &mut Vec<I>,
328        I: Counted,
329        U: FnMut(&mut I, &mut RecordKeeper<'_>) -> Result<(), E>,
330        E: OutcomeError,
331    {
332        self.retain_with_context(
333            |inner| (select(inner), &()),
334            |item, _, records| retain(item, records),
335        );
336    }
337
338    /// Filters individual items and emits outcomes for them if they are removed.
339    ///
340    /// Like [`Self::retain`], but it allows for an additional context extracted from the managed
341    /// object passed to the retain function.
342    ///
343    /// # Examples:
344    ///
345    /// ```
346    /// # use relay_server::managed::{Counted, Managed, Quantities};
347    /// # #[derive(Copy, Clone)]
348    /// # struct Context<'a>(&'a u32);
349    /// # struct Item;
350    /// struct Items {
351    ///     ty: String,
352    ///     items: Vec<Item>,
353    /// }
354    /// # impl Counted for Items {
355    /// #   fn quantities(&self) -> Quantities {
356    /// #       todo!()
357    /// #   }
358    /// # }
359    /// # impl Counted for Item {
360    /// #   fn quantities(&self) -> Quantities {
361    /// #       todo!()
362    /// #   }
363    /// # }
364    /// # type Error = std::convert::Infallible;
365    ///
366    /// fn process_items(items: &mut Managed<Items>, ctx: Context<'_>) {
367    ///     items.retain_with_context(|items| (&mut items.items, &items.ty), |item, ty, _| process(item, ty, ctx));
368    /// }
369    ///
370    /// fn process(item: &mut Item, ty: &str, ctx: Context<'_>) -> Result<(), Error> {
371    ///     todo!()
372    /// }
373    /// ```
374    pub fn retain_with_context<S, C, I, U, E>(&mut self, select: S, mut retain: U)
375    where
376        // Returning `&'a C` here is not optimal, ideally we return C here and express the correct
377        // bound of `C: 'a` but this is, to my knowledge, currently not possible to express in stable Rust.
378        //
379        // This is unfortunately a bit limiting but for most of our purposes it is enough.
380        for<'a> S: FnOnce(&'a mut T) -> (&'a mut Vec<I>, &'a C),
381        I: Counted,
382        U: FnMut(&mut I, &C, &mut RecordKeeper<'_>) -> Result<(), E>,
383        E: OutcomeError,
384    {
385        self.modify(|inner, records| {
386            let (items, ctx) = select(inner);
387            items.retain_mut(|item| match retain(item, ctx, records) {
388                Ok(()) => true,
389                Err(err) => {
390                    records.reject_err(err, &*item);
391                    false
392                }
393            })
394        });
395    }
396
397    /// Maps a [`Managed<T>`] to [`Managed<S>`] by applying the mapping function `f`.
398    ///
399    /// Like [`Self::try_map`] but not fallible.
400    pub fn map<S, F>(self, f: F) -> Managed<S>
401    where
402        F: FnOnce(T, &mut RecordKeeper) -> S,
403        S: Counted,
404    {
405        self.try_map(move |inner, records| Ok::<_, Infallible>(f(inner, records)))
406            .unwrap_or_else(|e| match e.0 {})
407    }
408
409    /// Maps a [`Managed<T>`] to [`Managed<S>`] by applying the mapping function `f`.
410    ///
411    /// The mapping function gets access to a [`RecordKeeper`], to emit outcomes for partial
412    /// discards.
413    ///
414    /// If the mapping function returns an error, the entire (original) [`Self`] is rejected,
415    /// no partial outcomes are emitted.
416    pub fn try_map<S, F, E>(self, f: F) -> Result<Managed<S>, Rejected<E::Error>>
417    where
418        F: FnOnce(T, &mut RecordKeeper) -> Result<S, E>,
419        S: Counted,
420        E: OutcomeError,
421    {
422        debug_assert!(!self.is_done());
423
424        let (value, meta) = self.destructure();
425        let quantities = value.quantities();
426
427        let mut records = RecordKeeper::new(&meta, quantities);
428
429        match f(value, &mut records) {
430            Ok(value) => {
431                records.success(value.quantities());
432                Ok(Managed::from_parts(value, meta))
433            }
434            Err(err) => Err(records.failure(err)),
435        }
436    }
437
438    /// Gives mutable access to the contained value to modify it.
439    ///
440    /// Like [`Self::try_modify`] but not fallible.
441    pub fn modify<F>(&mut self, f: F)
442    where
443        F: FnOnce(&mut T, &mut RecordKeeper),
444    {
445        self.try_modify(move |inner, records| {
446            f(inner, records);
447            Ok::<_, Infallible>(())
448        })
449        .unwrap_or_else(|e| match e {})
450    }
451
452    /// Gives mutable access to the contained value to modify it.
453    ///
454    /// The modifying function gets access to a [`RecordKeeper`], to emit outcomes for partial
455    /// discards.
456    ///
457    /// If the modifying function returns an error, the entire (original) [`Self`] is rejected,
458    /// no partial outcomes are emitted.
459    pub fn try_modify<F, E>(&mut self, f: F) -> Result<(), Rejected<E::Error>>
460    where
461        F: FnOnce(&mut T, &mut RecordKeeper) -> Result<(), E>,
462        E: OutcomeError,
463    {
464        debug_assert!(!self.is_done());
465
466        let quantities = self.value.quantities();
467        let mut records = RecordKeeper::new(&self.meta, quantities);
468
469        match f(&mut self.value, &mut records) {
470            Ok(()) => {
471                records.success(self.value.quantities());
472                Ok(())
473            }
474            Err(err) => {
475                let err = records.failure(err);
476                self.done.store(true, Ordering::Relaxed);
477                Err(err)
478            }
479        }
480    }
481
482    /// Accepts the item of this managed instance.
483    ///
484    /// This should be called if the item has been or is about to be accepted by the upstream, which means that
485    /// the responsibility for logging outcomes has been moved. This function will not log any
486    /// outcomes.
487    ///
488    /// Like [`Self::try_accept`], but infallible.
489    pub fn accept<F, S>(self, f: F) -> S
490    where
491        F: FnOnce(T) -> S,
492    {
493        self.try_accept(|item| Ok::<_, Infallible>(f(item)))
494            .unwrap_or_else(|err| match err.0 {})
495    }
496
497    /// Accepts the item of this managed instance.
498    ///
499    /// This should be called if the item has been or is about to be accepted by the upstream.
500    ///
501    /// Outcomes are only emitted when the accepting closure returns an error, which means that
502    /// in the success case the responsibility for logging outcomes has been moved to the
503    /// caller/upstream.
504    pub fn try_accept<F, S, E>(self, f: F) -> Result<S, Rejected<E::Error>>
505    where
506        F: FnOnce(T) -> Result<S, E>,
507        E: OutcomeError,
508    {
509        debug_assert!(!self.is_done());
510
511        let (value, meta) = self.destructure();
512        let records = RecordKeeper::new(&meta, value.quantities());
513
514        match f(value) {
515            Ok(value) => {
516                records.accept();
517                Ok(value)
518            }
519            Err(err) => Err(records.failure(err)),
520        }
521    }
522
523    /// Rejects the entire [`Managed`] instance with an internal error.
524    ///
525    /// Internal errors should be reserved for uses where logical invariants are violated.
526    /// Cases which should never happen and always indicate a logical bug.
527    ///
528    /// This function will panic in debug builds, but discard the item
529    /// with an internal discard reason in release builds.
530    #[track_caller]
531    pub fn internal_error(&self, reason: &'static str) -> Rejected<()> {
532        relay_log::error!("internal error: {reason}");
533        debug_assert!(false, "internal error: {reason}");
534        self.reject_err((Outcome::Invalid(DiscardReason::Internal), ()))
535    }
536
537    /// Rejects the entire [`Managed`] instance.
538    pub fn reject_err<E>(&self, error: E) -> Rejected<E::Error>
539    where
540        E: OutcomeError,
541    {
542        debug_assert!(!self.is_done());
543
544        let (outcome, error) = error.consume();
545        self.do_reject(outcome);
546        Rejected(error)
547    }
548
549    fn do_reject(&self, outcome: Option<Outcome>) {
550        // Always set the internal state to `done`, even if there is no outcome to be emitted.
551        // All bookkeeping has been done.
552        let is_done = self.done.fetch_or(true, Ordering::Relaxed);
553
554        // No outcome to emit, we're done.
555        let Some(outcome) = outcome else {
556            return;
557        };
558
559        // Only emit outcomes if we were not yet done.
560        //
561        // Callers should guard against accidentally calling `do_reject` when the `is_done` flag is
562        // already set, but internal uses (like `Drop`) can rely on this double emission
563        // prevention.
564        if !is_done {
565            for (category, quantity) in self.value.quantities() {
566                self.meta.track_outcome(outcome.clone(), category, quantity);
567            }
568        }
569    }
570
571    /// De-structures this managed instance into its own parts.
572    ///
573    /// While de-structured no outcomes will be emitted on drop.
574    ///
575    /// Currently no `Managed`, which already has outcomes emitted, should be de-structured
576    /// as this status is lost.
577    fn destructure(self) -> (T, Arc<Meta>) {
578        // SAFETY: this follows an approach mentioned in the RFC
579        // <https://github.com/rust-lang/rfcs/pull/3466> to move fields out of
580        // a type with a drop implementation.
581        //
582        // The original type is wrapped in a manual drop to prevent running the
583        // drop handler, afterwards all fields are moved out of the type.
584        //
585        // And the original type is forgotten, de-structuring the original type
586        // without running its drop implementation.
587        let this = ManuallyDrop::new(self);
588        let Managed { value, meta, done } = &*this;
589
590        let value = unsafe { std::ptr::read(value) };
591        let meta = unsafe { std::ptr::read(meta) };
592        let done = unsafe { std::ptr::read(done) };
593        // This is a current invariant, if we ever need to change the invariant,
594        // the done status should be preserved and returned instead.
595        debug_assert!(
596            !done.load(Ordering::Relaxed),
597            "a `done` managed should never be destructured"
598        );
599
600        (value, meta)
601    }
602
603    fn from_parts(value: T, meta: Arc<Meta>) -> Self {
604        Self {
605            value,
606            meta,
607            done: AtomicBool::new(false),
608        }
609    }
610
611    fn is_done(&self) -> bool {
612        self.done.load(Ordering::Relaxed)
613    }
614}
615
616impl<T: Counted> Drop for Managed<T> {
617    fn drop(&mut self) {
618        self.do_reject(Some(Outcome::Invalid(DiscardReason::Internal)));
619    }
620}
621
622impl<T: Counted + fmt::Debug> fmt::Debug for Managed<T> {
623    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
624        write!(f, "Managed<{}>[", std::any::type_name::<T>())?;
625        for (i, (category, quantity)) in self.value.quantities().iter().enumerate() {
626            if i > 0 {
627                write!(f, ",")?;
628            }
629            write!(f, "{category}:{quantity}")?;
630        }
631        write!(f, "](")?;
632        self.value.fmt(f)?;
633        write!(f, ")")
634    }
635}
636
637impl<T: Counted> Managed<Option<T>> {
638    /// Turns a managed option into an optional [`Managed`].
639    pub fn transpose(self) -> Option<Managed<T>> {
640        let (o, meta) = self.destructure();
641        o.map(|t| Managed::from_parts(t, meta))
642    }
643}
644
645impl<L: Counted, R: Counted> Managed<Either<L, R>> {
646    /// Turns a managed [`Either`] into an [`Either`] of [`Managed`].
647    pub fn transpose(self) -> Either<Managed<L>, Managed<R>> {
648        let (either, meta) = self.destructure();
649        match either {
650            Either::Left(value) => Either::Left(Managed::from_parts(value, meta)),
651            Either::Right(value) => Either::Right(Managed::from_parts(value, meta)),
652        }
653    }
654}
655
656impl From<Managed<Box<Envelope>>> for ManagedEnvelope {
657    fn from(value: Managed<Box<Envelope>>) -> Self {
658        let (value, meta) = value.destructure();
659        let mut envelope = ManagedEnvelope::new(value, meta.outcome_aggregator.clone());
660        envelope.scope(meta.scoping);
661        envelope
662    }
663}
664
665impl<T: Counted> AsRef<T> for Managed<T> {
666    fn as_ref(&self) -> &T {
667        &self.value
668    }
669}
670
671impl<T: Counted> std::ops::Deref for Managed<T> {
672    type Target = T;
673
674    fn deref(&self) -> &Self::Target {
675        &self.value
676    }
677}
678
679/// Internal metadata attached with a [`Managed`] instance.
680#[derive(Debug, Clone)]
681struct Meta {
682    /// Outcome aggregator service.
683    outcome_aggregator: Addr<TrackOutcome>,
684    /// Received timestamp, when the contained payload/information was received.
685    ///
686    /// See also: [`crate::extractors::RequestMeta::received_at`].
687    received_at: DateTime<Utc>,
688    /// Data scoping information of the contained item.
689    scoping: Scoping,
690    /// Optional event id associated with the contained data.
691    event_id: Option<EventId>,
692    /// Optional remote addr from where the data was received from.
693    remote_addr: Option<IpAddr>,
694}
695
696impl Meta {
697    pub fn track_outcome(&self, outcome: Outcome, category: DataCategory, quantity: usize) {
698        self.outcome_aggregator.send(TrackOutcome {
699            timestamp: self.received_at,
700            scoping: self.scoping,
701            outcome,
702            event_id: self.event_id,
703            remote_addr: self.remote_addr,
704            category,
705            quantity: quantity.try_into().unwrap_or(u32::MAX),
706        });
707    }
708}
709
710/// A record keeper makes sure modifications done on a [`Managed`] item are all accounted for
711/// correctly.
712pub struct RecordKeeper<'a> {
713    meta: &'a Meta,
714    on_drop: Quantities,
715    #[cfg(debug_assertions)]
716    lenient: SmallVec<[DataCategory; 1]>,
717    #[cfg(debug_assertions)]
718    modifications: BTreeMap<DataCategory, isize>,
719    in_flight: SmallVec<[(DataCategory, usize, Option<Outcome>); 2]>,
720}
721
722impl<'a> RecordKeeper<'a> {
723    fn new(meta: &'a Meta, quantities: Quantities) -> Self {
724        Self {
725            meta,
726            on_drop: quantities,
727            #[cfg(debug_assertions)]
728            lenient: Default::default(),
729            #[cfg(debug_assertions)]
730            modifications: Default::default(),
731            in_flight: Default::default(),
732        }
733    }
734
735    /// Marking a data category as lenient exempts this category from outcome quantity validations.
736    ///
737    /// Consider using [`Self::modify_by`] instead.
738    ///
739    /// This can be used in cases where the quantity is knowingly modified, which is quite common
740    /// for data categories which count bytes.
741    pub fn lenient(&mut self, category: DataCategory) {
742        let _category = category;
743        #[cfg(debug_assertions)]
744        self.lenient.push(_category);
745    }
746
747    /// Modifies the expected count for a category.
748    ///
749    /// When extracting payloads category counts may expectedly change, these changes can be
750    /// tracked using this function.
751    ///
752    /// Prefer using [`Self::modify_by`] over [`Self::lenient`] as lenient completely disables
753    /// validation for the entire category.
754    pub fn modify_by(&mut self, category: DataCategory, offset: isize) {
755        let _category = category;
756        let _offset = offset;
757        #[cfg(debug_assertions)]
758        {
759            *self.modifications.entry(_category).or_default() += offset;
760        }
761    }
762
763    /// Finalizes all records and emits the necessary outcomes.
764    ///
765    /// This uses the quantities of the original item.
766    fn failure<E>(mut self, error: E) -> Rejected<E::Error>
767    where
768        E: OutcomeError,
769    {
770        let (outcome, error) = error.consume();
771
772        if let Some(outcome) = outcome {
773            for (category, quantity) in std::mem::take(&mut self.on_drop) {
774                self.meta.track_outcome(outcome.clone(), category, quantity);
775            }
776        }
777
778        Rejected(error)
779    }
780
781    /// Finalizes all records and asserts that no additional outcomes have been tracked.
782    ///
783    /// Unlike [`Self::success`], this method does not allow for intermediate or partial outcomes,
784    /// it also does not verify any outcomes.
785    ///
786    /// This method is useful for using the record keeper to track failure outcomes, either
787    /// explicit failures or panics.
788    fn accept(mut self) {
789        debug_assert!(
790            self.in_flight.is_empty(),
791            "records accepted, but intermediate outcomes tracked"
792        );
793        self.on_drop.clear();
794    }
795
796    /// Finalizes all records and emits the created outcomes.
797    ///
798    /// This only emits the outcomes that have been explicitly registered.
799    /// In a debug build, the function also ensure no outcomes have been missed by comparing
800    /// quantities of the item before and after.
801    fn success(mut self, new: Quantities) {
802        let original = std::mem::take(&mut self.on_drop);
803        self.assert_quantities(original, new);
804
805        self.on_drop.clear();
806        for (category, quantity, outcome) in std::mem::take(&mut self.in_flight) {
807            if let Some(outcome) = outcome {
808                self.meta.track_outcome(outcome, category, quantity);
809            }
810        }
811    }
812
813    /// Asserts that there have been no quantities lost.
814    ///
815    /// The original amount of quantities should match the new amount of quantities + all emitted
816    /// outcomes.
817    #[cfg(debug_assertions)]
818    fn assert_quantities(&self, original: Quantities, new: Quantities) {
819        macro_rules! emit {
820            ($category:expr, $($tt:tt)*) => {{
821                match self.lenient.contains(&$category) {
822                    // Certain categories are known to be not always correct,
823                    // they are logged instead.
824                    true => relay_log::debug!($($tt)*),
825                    false  => {
826                        relay_log::error!("Original: {original:?}");
827                        relay_log::error!("New: {new:?}");
828                        relay_log::error!("Modifications: {:?}", self.modifications);
829                        relay_log::error!("In Flight: {:?}", self.in_flight);
830                        panic!($($tt)*)
831                    }
832                }
833            }};
834        }
835
836        let mut sums = debug::Quantities::from(&original).0;
837        for (category, offset) in &self.modifications {
838            let v = sums.entry(*category).or_default();
839            match v.checked_add_signed(*offset) {
840                Some(result) => *v = result,
841                None => emit!(
842                    category,
843                    "Attempted to modify original quantity {v} into the negative ({offset})"
844                ),
845            }
846        }
847
848        for (category, quantity, outcome) in &self.in_flight {
849            match sums.get_mut(category) {
850                Some(c) if *c >= *quantity => *c -= *quantity,
851                Some(c) => emit!(
852                    category,
853                    "Emitted {quantity} outcomes ({outcome:?}) for {category}, but there were only {c} items in the category originally"
854                ),
855                None => emit!(
856                    category,
857                    "Emitted {quantity} outcomes ({outcome:?}) for {category}, but there never was an item in this category"
858                ),
859            }
860        }
861
862        for (category, quantity) in &new {
863            match sums.get_mut(category) {
864                Some(c) if *c >= *quantity => *c -= *quantity,
865                Some(c) => emit!(
866                    category,
867                    "New item has {quantity} items in category '{category}', but original (after emitted outcomes) only has {c} left"
868                ),
869                None => emit!(
870                    category,
871                    "New item has {quantity} items in category '{category}', but after emitted outcomes there are none left"
872                ),
873            }
874        }
875
876        for (category, quantity) in sums {
877            if quantity > 0 {
878                emit!(
879                    category,
880                    "Missing outcomes or mismatched quantity in category '{category}', off by {quantity}"
881                );
882            }
883        }
884    }
885
886    #[cfg(not(debug_assertions))]
887    fn assert_quantities(&self, _: Quantities, _: Quantities) {}
888}
889
890impl<'a> Drop for RecordKeeper<'a> {
891    fn drop(&mut self) {
892        for (category, quantity) in std::mem::take(&mut self.on_drop) {
893            self.meta.track_outcome(
894                Outcome::Invalid(DiscardReason::Internal),
895                category,
896                quantity,
897            );
898        }
899    }
900}
901
902impl RecordKeeper<'_> {
903    /// Rejects an item if the passed result is an error and returns a default value.
904    ///
905    /// Similar to [`Self::reject_err`], this emits the necessary outcomes for an
906    /// item, if there is an error.
907    pub fn or_default<T, E, Q>(&mut self, r: Result<T, E>, q: Q) -> T
908    where
909        T: Default,
910        E: OutcomeError,
911        Q: Counted,
912    {
913        match r {
914            Ok(result) => result,
915            Err(err) => {
916                self.reject_err(err, q);
917                T::default()
918            }
919        }
920    }
921
922    /// Rejects an item with an error.
923    ///
924    /// Makes sure the correct outcomes are tracked for the item, that is discarded due to an
925    /// error.
926    pub fn reject_err<E, Q>(&mut self, err: E, q: Q) -> E::Error
927    where
928        E: OutcomeError,
929        Q: Counted,
930    {
931        let (outcome, err) = err.consume();
932        for (category, quantity) in q.quantities() {
933            self.in_flight.push((category, quantity, outcome.clone()))
934        }
935        err
936    }
937
938    /// Rejects an item with an internal error.
939    ///
940    /// See also: [`Managed::internal_error`].
941    #[track_caller]
942    pub fn internal_error<E, Q>(&mut self, error: E, q: Q)
943    where
944        E: std::error::Error + 'static,
945        Q: Counted,
946    {
947        relay_log::error!(error = &error as &dyn std::error::Error, "internal error");
948        debug_assert!(false, "internal error: {error}");
949        self.reject_err((Outcome::Invalid(DiscardReason::Internal), ()), q);
950    }
951}
952
/// Iterator returned by [`Managed::split`].
///
/// Wraps every item produced by the inner iterator into a [`Managed`] that shares this
/// iterator's metadata. If the iterator is dropped before the inner iterator was exhausted,
/// all items still remaining in it are reported with an internal-error outcome (see the
/// [`Drop`] implementation).
// The trait bounds have to live on the struct itself: the `Drop` impl relies on them, and Rust
// requires a `Drop` impl to use exactly the bounds of the type it is implemented for (E0367).
pub struct Split<I, S>
where
    I: Iterator<Item = S>,
    S: Counted,
{
    /// Quantities of the original item not yet accounted for by yielded items.
    ///
    /// Only tracked in debug builds to verify that splitting preserves the original totals.
    #[cfg(debug_assertions)]
    quantities: Quantities,
    /// Inner iterator producing the split-off items.
    items: I,
    /// Metadata shared with every yielded [`Managed`], also used to track outcomes on drop.
    meta: Arc<Meta>,
    /// Set once the inner iterator has returned `None`.
    exhausted: bool,
}
965
966impl<I, S> Split<I, S>
967where
968    I: Iterator<Item = S>,
969    S: Counted,
970{
971    /// Subtracts passed quantities from the total quantities to verify total quantity counts are
972    /// matching.
973    #[cfg(debug_assertions)]
974    fn subtract(&mut self, q: Quantities) {
975        for (category, quantities) in q {
976            let Some(orig_quantities) = self
977                .quantities
978                .iter_mut()
979                .find_map(|(c, q)| (*c == category).then_some(q))
980            else {
981                debug_assert!(
982                    false,
983                    "mismatching quantities, item split into category {category}, \
984                    which originally was not present"
985                );
986                continue;
987            };
988
989            if *orig_quantities >= quantities {
990                *orig_quantities -= quantities;
991            } else {
992                debug_assert!(
993                    false,
994                    "in total more items produced in category {category} than originally available"
995                );
996            }
997        }
998    }
999}
1000
1001impl<I, S> Iterator for Split<I, S>
1002where
1003    I: Iterator<Item = S>,
1004    S: Counted,
1005{
1006    type Item = Managed<S>;
1007
1008    fn next(&mut self) -> Option<Self::Item> {
1009        let next = match self.items.next() {
1010            Some(next) => next,
1011            None => {
1012                self.exhausted = true;
1013                return None;
1014            }
1015        };
1016
1017        #[cfg(debug_assertions)]
1018        self.subtract(next.quantities());
1019
1020        Some(Managed::from_parts(next, Arc::clone(&self.meta)))
1021    }
1022}
1023
1024impl<I, S> Drop for Split<I, S>
1025where
1026    I: Iterator<Item = S>,
1027    S: Counted,
1028{
1029    fn drop(&mut self) {
1030        // If the inner iterator was exhausted, no items should be remaining.
1031        #[cfg(debug_assertions)]
1032        if self.exhausted {
1033            for (category, quantities) in &self.quantities {
1034                debug_assert!(
1035                    *quantities == 0,
1036                    "items split, but still {quantities} remaining in category {category}"
1037                );
1038            }
1039        }
1040
1041        if self.exhausted {
1042            return;
1043        }
1044
1045        // There may be items remaining in the iterator for multiple reasons:
1046        // - there was a panic
1047        // - the iterator was never fully consumed
1048        //
1049        // In any case, outcomes must be emitted for the remaining items.
1050        for item in &mut self.items {
1051            for (category, quantity) in item.quantities() {
1052                self.meta.track_outcome(
1053                    Outcome::Invalid(DiscardReason::Internal),
1054                    category,
1055                    quantity,
1056                );
1057            }
1058        }
1059    }
1060}
1061
// `Split::next` forwards every call straight to the inner iterator (it does not consult
// `exhausted`), so `Split` is only guaranteed to keep returning `None` if the inner iterator is
// itself fused.
impl<I, S> FusedIterator for Split<I, S>
where
    I: Iterator<Item = S> + FusedIterator,
    S: Counted,
{
}
1068
#[cfg(test)]
mod tests {
    use super::*;

    /// Test item counting one [`DataCategory::Error`] per contained value.
    struct CountedVec(Vec<u32>);

    impl Counted for CountedVec {
        fn quantities(&self) -> Quantities {
            smallvec::smallvec![(DataCategory::Error, self.0.len())]
        }
    }

    /// Test item which always counts as exactly one [`DataCategory::Error`].
    struct CountedValue(u32);

    impl Counted for CountedValue {
        fn quantities(&self) -> Quantities {
            smallvec::smallvec![(DataCategory::Error, 1)]
        }
    }

    /// Rejecting without an outcome emits nothing, neither immediately nor when the instance
    /// is dropped afterwards.
    #[test]
    fn test_reject_err_no_outcome() {
        let value = CountedVec(vec![0, 1, 2, 3, 4, 5]);
        let (managed, mut handle) = Managed::for_test(value).build();

        // Rejecting with no outcome, should not emit any outcomes.
        let _ = managed.reject_err((None, ()));
        handle.assert_no_outcomes();

        // Now dropping the managed instance, should not record any (internal) outcomes either.
        drop(managed);
        handle.assert_no_outcomes();
    }

    /// A fully consumed split emits no outcomes itself; each yielded item then carries its
    /// own outcome when rejected.
    #[test]
    fn test_split_fully_consumed() {
        let value = CountedVec(vec![0, 1, 2, 3, 4, 5]);
        let (managed, mut handle) = Managed::for_test(value).build();

        let s = managed
            .split(|value| value.0.into_iter().map(CountedValue))
            // Fully consume the iterator to make sure there aren't any outcomes emitted on drop.
            .collect::<Vec<_>>();

        handle.assert_no_outcomes();

        for (i, s) in s.into_iter().enumerate() {
            assert_eq!(s.as_ref().0, i as u32);
            let outcome = Outcome::Invalid(DiscardReason::Cors);
            let _ = s.reject_err((outcome.clone(), ()));
            handle.assert_outcome(&outcome, DataCategory::Error, 1);
        }
    }

    /// Dropping yielded items unhandled emits an internal outcome for each of them. Dropping
    /// the partially consumed split emits internal outcomes for all items never yielded.
    #[test]
    fn test_split_partially_consumed_emits_remaining() {
        let value = CountedVec(vec![0, 1, 2, 3, 4, 5]);
        let (managed, mut handle) = Managed::for_test(value).build();

        let mut s = managed.split(|value| value.0.into_iter().map(CountedValue));
        handle.assert_no_outcomes();

        drop(s.next());
        handle.assert_internal_outcome(DataCategory::Error, 1);
        drop(s.next());
        handle.assert_internal_outcome(DataCategory::Error, 1);
        drop(s.next());
        handle.assert_internal_outcome(DataCategory::Error, 1);
        handle.assert_no_outcomes();

        drop(s);

        handle.assert_internal_outcome(DataCategory::Error, 1);
        handle.assert_internal_outcome(DataCategory::Error, 1);
        handle.assert_internal_outcome(DataCategory::Error, 1);
    }

    /// The split yields fewer items (1) than the original contained (6). Dropping the
    /// exhausted split must panic, which relies on the debug-only consistency check.
    #[test]
    fn test_split_changing_quantities_should_panic() {
        let value = CountedVec(vec![0, 1, 2, 3, 4, 5]);
        let (managed, mut handle) = Managed::for_test(value).build();

        let mut s = managed.split(|_| std::iter::once(CountedValue(0)));

        s.next().unwrap().accept(|_| {});
        handle.assert_no_outcomes();

        assert!(s.next().is_none());

        let r = std::panic::catch_unwind(move || {
            drop(s);
        });

        assert!(
            r.is_err(),
            "expected split to panic because of mismatched (not enough) outcomes"
        );
    }

    /// The split yields more items (2) than the original contained (1). Pulling the second
    /// item must panic, which relies on the debug-only consistency check.
    #[test]
    fn test_split_more_outcomes_than_before_should_panic() {
        let value = CountedVec(vec![0]);
        let (managed, mut handle) = Managed::for_test(value).build();

        let mut s = managed.split(|_| vec![CountedValue(0), CountedValue(2)].into_iter());

        s.next().unwrap().accept(|_| {});
        handle.assert_no_outcomes();

        let r = std::panic::catch_unwind(move || {
            s.next();
        });

        assert!(
            r.is_err(),
            "expected split to panic because of mismatched (too many) outcomes"
        );
    }

    /// A yielded item introduces a category (`Transaction`) absent from the original. Pulling
    /// it must panic, which relies on the debug-only consistency check.
    #[test]
    fn test_split_changing_categories_should_panic() {
        struct Special;
        impl Counted for Special {
            fn quantities(&self) -> Quantities {
                smallvec::smallvec![(DataCategory::Error, 1), (DataCategory::Transaction, 1)]
            }
        }

        let value = CountedVec(vec![0]);
        let (managed, _handle) = Managed::for_test(value).build();

        let mut s = managed.split(|value| value.0.into_iter().map(|_| Special));

        let r = std::panic::catch_unwind(move || {
            let _ = s.next();
        });

        assert!(
            r.is_err(),
            "expected split to panic because of mismatched outcome categories"
        );
    }

    /// Checks at compile time that a split over a fused iterator is `FusedIterator`. Dropping
    /// the unconsumed split inside `only_fused` emits an internal outcome for its only item.
    #[test]
    fn test_split_assert_fused() {
        fn only_fused<T: FusedIterator>(_: T) {}

        let (managed, mut handle) = Managed::for_test(CountedVec(vec![0])).build();
        only_fused(managed.split(|value| value.0.into_iter().map(CountedValue)));
        handle.assert_internal_outcome(DataCategory::Error, 1);
    }
}