Skip to main content

relay_server/managed/
managed.rs

1#[cfg(debug_assertions)]
2use std::collections::BTreeMap;
3use std::convert::Infallible;
4use std::fmt;
5use std::iter::FusedIterator;
6use std::mem::ManuallyDrop;
7use std::net::IpAddr;
8use std::sync::Arc;
9use std::sync::atomic::{AtomicBool, Ordering};
10
11use chrono::{DateTime, Utc};
12use itertools::Either;
13use relay_event_schema::protocol::EventId;
14use relay_quotas::{DataCategory, Scoping};
15use relay_system::Addr;
16use smallvec::SmallVec;
17
18use crate::Envelope;
19use crate::managed::{Counted, ManagedEnvelope, Quantities};
20use crate::services::outcome::{DiscardReason, Outcome, TrackOutcome};
21use crate::services::processor::ProcessingError;
22
23#[cfg(debug_assertions)]
24mod debug;
25#[cfg(test)]
26mod test;
27
28#[cfg(test)]
29pub use self::test::*;
30
31/// An error which can be extracted into an outcome.
32pub trait OutcomeError {
33    /// Produced error, without attached outcome.
34    type Error;
35
36    /// Consumes the error and returns an outcome and [`Self::Error`].
37    ///
38    /// Returning a `None` outcome should discard the item(s) silently.
39    fn consume(self) -> (Option<Outcome>, Self::Error);
40}
41
42impl OutcomeError for Outcome {
43    type Error = ();
44
45    fn consume(self) -> (Option<Outcome>, Self::Error) {
46        (self, ()).consume()
47    }
48}
49
50impl OutcomeError for Option<Outcome> {
51    type Error = ();
52
53    fn consume(self) -> (Option<Outcome>, Self::Error) {
54        (self, ()).consume()
55    }
56}
57
58impl<E> OutcomeError for (Outcome, E) {
59    type Error = E;
60
61    fn consume(self) -> (Option<Outcome>, Self::Error) {
62        (Some(self.0), self.1)
63    }
64}
65
66impl<E> OutcomeError for (Option<Outcome>, E) {
67    type Error = E;
68
69    fn consume(self) -> (Option<Outcome>, Self::Error) {
70        self
71    }
72}
73
74impl OutcomeError for ProcessingError {
75    type Error = Self;
76
77    fn consume(self) -> (Option<Outcome>, Self::Error) {
78        (self.to_outcome(), self)
79    }
80}
81
82impl OutcomeError for Infallible {
83    type Error = Self;
84
85    fn consume(self) -> (Option<Outcome>, Self::Error) {
86        match self {}
87    }
88}
89
/// A wrapper type which ensures outcomes have been emitted for an error.
///
/// [`Managed`] wraps an error in [`Rejected`] once outcomes have been emitted for the managed
/// item.
#[derive(Debug, Clone, Copy)]
#[must_use = "a rejection must be propagated"]
pub struct Rejected<T>(T);

impl<T> Rejected<T> {
    /// Consumes the rejection and returns the wrapped error.
    pub fn into_inner(self) -> T {
        let Self(inner) = self;
        inner
    }

    /// Converts the wrapped error with `f`, keeping it marked as rejected.
    pub fn map<F, S>(self, f: F) -> Rejected<S>
    where
        F: FnOnce(T) -> S,
    {
        let Self(inner) = self;
        Rejected(f(inner))
    }
}

impl<T: std::error::Error> std::error::Error for Rejected<T> {
    // The wrapper is transparent: report the wrapped error's own source.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        std::error::Error::source(&self.0)
    }
}

impl<T: std::fmt::Display> std::fmt::Display for Rejected<T> {
    // Displays exactly like the wrapped error.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        std::fmt::Display::fmt(&self.0, f)
    }
}
130
131impl<T> axum::response::IntoResponse for Rejected<T>
132where
133    T: axum::response::IntoResponse,
134{
135    fn into_response(self) -> axum::response::Response {
136        self.0.into_response()
137    }
138}
139
140/// The [`Managed`] wrapper ensures outcomes are correctly emitted for the contained item.
141pub struct Managed<T: Counted> {
142    value: T,
143    meta: Arc<Meta>,
144    done: AtomicBool,
145}
146
147impl Managed<Box<Envelope>> {
148    /// Creates a managed instance from an unmanaged envelope.
149    pub fn from_envelope(envelope: Box<Envelope>, outcome_aggregator: Addr<TrackOutcome>) -> Self {
150        let meta = Arc::new(Meta {
151            outcome_aggregator,
152            received_at: envelope.received_at(),
153            scoping: envelope.meta().get_partial_scoping().into_scoping(),
154            event_id: envelope.event_id(),
155            remote_addr: envelope.meta().remote_addr(),
156        });
157
158        Self::from_parts(envelope, meta)
159    }
160}
161
/// Abstraction over `Vec` and `SmallVec` for in-place filtering with mutable element access.
///
/// Used by [`Managed::retain`].
pub trait RetainMut<I> {
    /// Keeps only the elements for which `f` returns `true`.
    ///
    /// `f` receives each element mutably and may modify it before deciding.
    fn retain_mut(&mut self, f: impl FnMut(&mut I) -> bool);
}

impl<I> RetainMut<I> for Vec<I> {
    fn retain_mut(&mut self, f: impl FnMut(&mut I) -> bool) {
        // Qualified path so the call resolves to the inherent method instead of recursing here.
        <Vec<I>>::retain_mut(self, f);
    }
}
173impl<I, const N: usize> RetainMut<I> for SmallVec<[I; N]> {
174    fn retain_mut(&mut self, f: impl FnMut(&mut I) -> bool) {
175        SmallVec::retain_mut(self, f)
176    }
177}
178
179impl<T: Counted> Managed<T> {
180    /// Creates new [`Managed`] instance with the provided `value` and metadata from a [`ManagedEnvelope`].
181    ///
182    /// The [`Managed`] instance, inherits all metadata from the passed [`ManagedEnvelope`],
183    /// like received time or scoping.
184    pub fn with_meta_from(envelope: &ManagedEnvelope, value: T) -> Self {
185        Self::from_parts(
186            value,
187            Arc::new(Meta {
188                outcome_aggregator: envelope.outcome_aggregator().clone(),
189                received_at: envelope.received_at(),
190                scoping: envelope.scoping(),
191                event_id: envelope.envelope().event_id(),
192                remote_addr: envelope.meta().remote_addr(),
193            }),
194        )
195    }
196
197    /// Creates another [`Managed`] instance, with a new value but shared metadata.
198    pub fn wrap<S>(&self, other: S) -> Managed<S>
199    where
200        S: Counted,
201    {
202        Managed::from_parts(other, Arc::clone(&self.meta))
203    }
204
205    /// Boxes the contained value.
206    pub fn boxed(self) -> Managed<Box<T>> {
207        self.map(|value, _| Box::new(value))
208    }
209
210    /// Original received timestamp.
211    pub fn received_at(&self) -> DateTime<Utc> {
212        self.meta.received_at
213    }
214
215    /// Scoping information stored in this context.
216    pub fn scoping(&self) -> Scoping {
217        self.meta.scoping
218    }
219
220    /// Updates the scoping stored in this context.
221    ///
222    /// Special care has to be taken when items contained in the managed instance also store a
223    /// scoping. Such a scoping will **not** be updated.
224    ///
225    /// Conversions between `Managed<Box<Envelope>>` and `ManagedEnvelope` transfer the scoping
226    /// correctly.
227    ///
228    /// See also: [`ManagedEnvelope::scope`].
229    pub fn scope(&mut self, scoping: Scoping) {
230        let meta = Arc::make_mut(&mut self.meta);
231        meta.scoping = scoping;
232    }
233
234    /// Merge [`Self`] with another [`Managed`] instance using a mapping function.
235    ///
236    /// The caller's closure is expected to merge `other`'s inner value into `self`'s inner value.
237    /// The outcome records of `self` are automatically offset by the records of `other`.
238    pub fn merge_with<S, F>(&mut self, other: Managed<S>, f: F)
239    where
240        S: Counted,
241        F: FnOnce(&mut T, S, &mut RecordKeeper),
242    {
243        self.modify(|s, records| {
244            for (category, quantity) in other.quantities() {
245                records.modify_by(category, quantity as isize);
246            }
247            other.accept(|o| f(s, o, records));
248        })
249    }
250
251    /// Splits [`Self`] into two other [`Managed`] items.
252    ///
253    /// The two resulting managed instances together are expected to have the same outcomes as the original instance..
254    /// Since splitting may introduce a new type of item, which some of the original
255    /// quantities are transferred to, there may be new additional data categories created.
256    pub fn split_once<F, S, U>(self, f: F) -> (Managed<S>, Managed<U>)
257    where
258        F: FnOnce(T, &mut RecordKeeper) -> (S, U),
259        S: Counted,
260        U: Counted,
261    {
262        debug_assert!(!self.is_done());
263
264        let (value, meta) = self.destructure();
265        let quantities = value.quantities();
266
267        let mut records = RecordKeeper::new(&meta, quantities);
268
269        let (a, b) = f(value, &mut records);
270
271        let mut quantities = a.quantities();
272        quantities.extend(b.quantities());
273        records.success(quantities);
274
275        (
276            Managed::from_parts(a, Arc::clone(&meta)),
277            Managed::from_parts(b, meta),
278        )
279    }
280
281    /// Splits [`Self`] into a variable amount if individual items.
282    ///
283    /// Useful when the current instance contains multiple items of the same type
284    /// and must be split into individually managed items.
285    pub fn split<F, I, S>(self, f: F) -> Split<I::IntoIter, I::Item>
286    where
287        F: FnOnce(T) -> I,
288        I: IntoIterator<Item = S>,
289        S: Counted,
290    {
291        self.split_with_context(|value| (f(value), ())).0
292    }
293
294    /// Splits [`Self`] into a variable amount if individual items.
295    ///
296    /// Like [`Self::split`] but also allows returning an untracked context,
297    /// a way of returning additional data when deconstructing the original item.
298    pub fn split_with_context<F, I, S, C>(self, f: F) -> (Split<I::IntoIter, I::Item>, C)
299    where
300        F: FnOnce(T) -> (I, C),
301        I: IntoIterator<Item = S>,
302        S: Counted,
303    {
304        debug_assert!(!self.is_done());
305
306        let (value, meta) = self.destructure();
307        #[cfg(debug_assertions)]
308        let quantities = value.quantities();
309
310        let (items, context) = f(value);
311
312        (
313            Split {
314                #[cfg(debug_assertions)]
315                quantities,
316                items: items.into_iter(),
317                meta,
318                exhausted: false,
319            },
320            context,
321        )
322    }
323
324    /// Filters individual items and emits outcomes for them if they are removed.
325    ///
326    /// This is particularly useful when the managed instance is a container of individual items,
327    /// which need to be processed or filtered on a case by case basis.
328    ///
329    /// # Examples:
330    ///
331    /// ```
332    /// # use relay_server::managed::{Counted, Managed, Quantities};
333    /// # #[derive(Copy, Clone)]
334    /// # struct Context<'a>(&'a u32);
335    /// # struct Item;
336    /// struct Items {
337    ///     items: Vec<Item>,
338    /// }
339    /// # impl Counted for Items {
340    /// #   fn quantities(&self) -> Quantities {
341    /// #       todo!()
342    /// #   }
343    /// # }
344    /// # impl Counted for Item {
345    /// #   fn quantities(&self) -> Quantities {
346    /// #       todo!()
347    /// #   }
348    /// # }
349    /// # type Error = std::convert::Infallible;
350    ///
351    /// fn process_items(items: &mut Managed<Items>, ctx: Context<'_>) {
352    ///     items.retain(|items| &mut items.items, |item, _| process(item, ctx));
353    /// }
354    ///
355    /// fn process(item: &mut Item, ctx: Context<'_>) -> Result<(), Error> {
356    ///     todo!()
357    /// }
358    /// ```
359    pub fn retain<S, I, U, E, V>(&mut self, select: S, mut retain: U)
360    where
361        S: FnOnce(&mut T) -> &mut V,
362        I: Counted,
363        U: FnMut(&mut I, &mut RecordKeeper<'_>) -> Result<(), E>,
364        E: OutcomeError,
365        V: RetainMut<I>,
366    {
367        self.retain_with_context(
368            |inner| (select(inner), &()),
369            |item, _, records| retain(item, records),
370        );
371    }
372
373    /// Filters individual items and emits outcomes for them if they are removed.
374    ///
375    /// Like [`Self::retain`], but it allows for an additional context extracted from the managed
376    /// object passed to the retain function.
377    ///
378    /// # Examples:
379    ///
380    /// ```
381    /// # use relay_server::managed::{Counted, Managed, Quantities};
382    /// # #[derive(Copy, Clone)]
383    /// # struct Context<'a>(&'a u32);
384    /// # struct Item;
385    /// struct Items {
386    ///     ty: String,
387    ///     items: Vec<Item>,
388    /// }
389    /// # impl Counted for Items {
390    /// #   fn quantities(&self) -> Quantities {
391    /// #       todo!()
392    /// #   }
393    /// # }
394    /// # impl Counted for Item {
395    /// #   fn quantities(&self) -> Quantities {
396    /// #       todo!()
397    /// #   }
398    /// # }
399    /// # type Error = std::convert::Infallible;
400    ///
401    /// fn process_items(items: &mut Managed<Items>, ctx: Context<'_>) {
402    ///     items.retain_with_context(|items| (&mut items.items, &items.ty), |item, ty, _| process(item, ty, ctx));
403    /// }
404    ///
405    /// fn process(item: &mut Item, ty: &str, ctx: Context<'_>) -> Result<(), Error> {
406    ///     todo!()
407    /// }
408    /// ```
409    pub fn retain_with_context<S, C, I, U, E, V>(&mut self, select: S, mut retain: U)
410    where
411        // Returning `&'a C` here is not optimal, ideally we return C here and express the correct
412        // bound of `C: 'a` but this is, to my knowledge, currently not possible to express in stable Rust.
413        //
414        // This is unfortunately a bit limiting but for most of our purposes it is enough.
415        for<'a> S: FnOnce(&'a mut T) -> (&'a mut V, &'a C),
416        I: Counted,
417        U: FnMut(&mut I, &C, &mut RecordKeeper<'_>) -> Result<(), E>,
418        E: OutcomeError,
419        V: RetainMut<I>,
420    {
421        self.modify(|inner, records| {
422            let (items, ctx) = select(inner);
423            items.retain_mut(|item| match retain(item, ctx, records) {
424                Ok(()) => true,
425                Err(err) => {
426                    records.reject_err(err, &*item);
427                    false
428                }
429            })
430        });
431    }
432
433    /// Maps a [`Managed<T>`] to [`Managed<S>`] by applying the mapping function `f`.
434    ///
435    /// Like [`Self::try_map`] but not fallible.
436    pub fn map<S, F>(self, f: F) -> Managed<S>
437    where
438        F: FnOnce(T, &mut RecordKeeper) -> S,
439        S: Counted,
440    {
441        self.try_map(move |inner, records| Ok::<_, Infallible>(f(inner, records)))
442            .unwrap_or_else(|e| match e.0 {})
443    }
444
445    /// Maps a [`Managed<T>`] to [`Managed<S>`] by applying the mapping function `f`.
446    ///
447    /// The mapping function gets access to a [`RecordKeeper`], to emit outcomes for partial
448    /// discards.
449    ///
450    /// If the mapping function returns an error, the entire (original) [`Self`] is rejected,
451    /// no partial outcomes are emitted.
452    pub fn try_map<S, F, E>(self, f: F) -> Result<Managed<S>, Rejected<E::Error>>
453    where
454        F: FnOnce(T, &mut RecordKeeper) -> Result<S, E>,
455        S: Counted,
456        E: OutcomeError,
457    {
458        debug_assert!(!self.is_done());
459
460        let (value, meta) = self.destructure();
461        let quantities = value.quantities();
462
463        let mut records = RecordKeeper::new(&meta, quantities);
464
465        match f(value, &mut records) {
466            Ok(value) => {
467                records.success(value.quantities());
468                Ok(Managed::from_parts(value, meta))
469            }
470            Err(err) => Err(records.failure(err)),
471        }
472    }
473
474    /// Gives mutable access to the contained value to modify it.
475    ///
476    /// Like [`Self::try_modify`] but not fallible.
477    pub fn modify<F>(&mut self, f: F)
478    where
479        F: FnOnce(&mut T, &mut RecordKeeper),
480    {
481        self.try_modify(move |inner, records| {
482            f(inner, records);
483            Ok::<_, Infallible>(())
484        })
485        .unwrap_or_else(|e| match e {})
486    }
487
488    /// Gives mutable access to the contained value to modify it.
489    ///
490    /// The modifying function gets access to a [`RecordKeeper`], to emit outcomes for partial
491    /// discards.
492    ///
493    /// If the modifying function returns an error, the entire (original) [`Self`] is rejected,
494    /// no partial outcomes are emitted.
495    pub fn try_modify<F, E>(&mut self, f: F) -> Result<(), Rejected<E::Error>>
496    where
497        F: FnOnce(&mut T, &mut RecordKeeper) -> Result<(), E>,
498        E: OutcomeError,
499    {
500        debug_assert!(!self.is_done());
501
502        let quantities = self.value.quantities();
503        let mut records = RecordKeeper::new(&self.meta, quantities);
504
505        match f(&mut self.value, &mut records) {
506            Ok(()) => {
507                records.success(self.value.quantities());
508                Ok(())
509            }
510            Err(err) => {
511                let err = records.failure(err);
512                self.done.store(true, Ordering::Relaxed);
513                Err(err)
514            }
515        }
516    }
517
518    /// Accepts the item of this managed instance.
519    ///
520    /// This should be called if the item has been or is about to be accepted by the upstream, which means that
521    /// the responsibility for logging outcomes has been moved. This function will not log any
522    /// outcomes.
523    ///
524    /// Like [`Self::try_accept`], but infallible.
525    pub fn accept<F, S>(self, f: F) -> S
526    where
527        F: FnOnce(T) -> S,
528    {
529        self.try_accept(|item| Ok::<_, Infallible>(f(item)))
530            .unwrap_or_else(|err| match err.0 {})
531    }
532
533    /// Accepts the item of this managed instance.
534    ///
535    /// This should be called if the item has been or is about to be accepted by the upstream.
536    ///
537    /// Outcomes are only emitted when the accepting closure returns an error, which means that
538    /// in the success case the responsibility for logging outcomes has been moved to the
539    /// caller/upstream.
540    pub fn try_accept<F, S, E>(self, f: F) -> Result<S, Rejected<E::Error>>
541    where
542        F: FnOnce(T) -> Result<S, E>,
543        E: OutcomeError,
544    {
545        debug_assert!(!self.is_done());
546
547        let (value, meta) = self.destructure();
548        let records = RecordKeeper::new(&meta, value.quantities());
549
550        match f(value) {
551            Ok(value) => {
552                records.accept();
553                Ok(value)
554            }
555            Err(err) => Err(records.failure(err)),
556        }
557    }
558
559    /// Rejects the entire [`Managed`] instance with an internal error.
560    ///
561    /// Internal errors should be reserved for uses where logical invariants are violated.
562    /// Cases which should never happen and always indicate a logical bug.
563    ///
564    /// This function will panic in debug builds, but discard the item
565    /// with an internal discard reason in release builds.
566    #[track_caller]
567    pub fn internal_error(&self, reason: &'static str) -> Rejected<()> {
568        relay_log::error!("internal error: {reason}");
569        debug_assert!(false, "internal error: {reason}");
570        self.reject_err((Outcome::Invalid(DiscardReason::Internal), ()))
571    }
572
573    /// Rejects the entire [`Managed`] instance.
574    pub fn reject_err<E>(&self, error: E) -> Rejected<E::Error>
575    where
576        E: OutcomeError,
577    {
578        debug_assert!(!self.is_done());
579
580        let (outcome, error) = error.consume();
581        self.do_reject(outcome);
582        Rejected(error)
583    }
584
585    fn do_reject(&self, outcome: Option<Outcome>) {
586        // Always set the internal state to `done`, even if there is no outcome to be emitted.
587        // All bookkeeping has been done.
588        let is_done = self.done.fetch_or(true, Ordering::Relaxed);
589
590        // No outcome to emit, we're done.
591        let Some(outcome) = outcome else {
592            return;
593        };
594
595        // Only emit outcomes if we were not yet done.
596        //
597        // Callers should guard against accidentally calling `do_reject` when the `is_done` flag is
598        // already set, but internal uses (like `Drop`) can rely on this double emission
599        // prevention.
600        if !is_done {
601            for (category, quantity) in self.value.quantities() {
602                self.meta.track_outcome(outcome.clone(), category, quantity);
603            }
604        }
605    }
606
607    /// De-structures this managed instance into its own parts.
608    ///
609    /// While de-structured no outcomes will be emitted on drop.
610    ///
611    /// Currently no `Managed`, which already has outcomes emitted, should be de-structured
612    /// as this status is lost.
613    fn destructure(self) -> (T, Arc<Meta>) {
614        // SAFETY: this follows an approach mentioned in the RFC
615        // <https://github.com/rust-lang/rfcs/pull/3466> to move fields out of
616        // a type with a drop implementation.
617        //
618        // The original type is wrapped in a manual drop to prevent running the
619        // drop handler, afterwards all fields are moved out of the type.
620        //
621        // And the original type is forgotten, de-structuring the original type
622        // without running its drop implementation.
623        let this = ManuallyDrop::new(self);
624        let Managed { value, meta, done } = &*this;
625
626        let value = unsafe { std::ptr::read(value) };
627        let meta = unsafe { std::ptr::read(meta) };
628        let done = unsafe { std::ptr::read(done) };
629        // This is a current invariant, if we ever need to change the invariant,
630        // the done status should be preserved and returned instead.
631        debug_assert!(
632            !done.load(Ordering::Relaxed),
633            "a `done` managed should never be destructured"
634        );
635
636        (value, meta)
637    }
638
639    fn from_parts(value: T, meta: Arc<Meta>) -> Self {
640        Self {
641            value,
642            meta,
643            done: AtomicBool::new(false),
644        }
645    }
646
647    fn is_done(&self) -> bool {
648        self.done.load(Ordering::Relaxed)
649    }
650}
651
652impl<T: Counted> Drop for Managed<T> {
653    fn drop(&mut self) {
654        self.do_reject(Some(Outcome::Invalid(DiscardReason::Internal)));
655    }
656}
657
658impl<T: Counted + fmt::Debug> fmt::Debug for Managed<T> {
659    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
660        write!(f, "Managed<{}>[", std::any::type_name::<T>())?;
661        for (i, (category, quantity)) in self.value.quantities().iter().enumerate() {
662            if i > 0 {
663                write!(f, ",")?;
664            }
665            write!(f, "{category}:{quantity}")?;
666        }
667        write!(f, "](")?;
668        self.value.fmt(f)?;
669        write!(f, ")")
670    }
671}
672
673impl<T: Counted> Managed<Option<T>> {
674    /// Turns a managed option into an optional [`Managed`].
675    pub fn transpose(self) -> Option<Managed<T>> {
676        let (o, meta) = self.destructure();
677        o.map(|t| Managed::from_parts(t, meta))
678    }
679}
680
681impl<L: Counted, R: Counted> Managed<Either<L, R>> {
682    /// Turns a managed [`Either`] into an [`Either`] of [`Managed`].
683    pub fn transpose(self) -> Either<Managed<L>, Managed<R>> {
684        let (either, meta) = self.destructure();
685        match either {
686            Either::Left(value) => Either::Left(Managed::from_parts(value, meta)),
687            Either::Right(value) => Either::Right(Managed::from_parts(value, meta)),
688        }
689    }
690}
691
692impl From<Managed<Box<Envelope>>> for ManagedEnvelope {
693    fn from(value: Managed<Box<Envelope>>) -> Self {
694        let (value, meta) = value.destructure();
695        let mut envelope = ManagedEnvelope::new(value, meta.outcome_aggregator.clone());
696        envelope.scope(meta.scoping);
697        envelope
698    }
699}
700
701impl<T: Counted> AsRef<T> for Managed<T> {
702    fn as_ref(&self) -> &T {
703        &self.value
704    }
705}
706
707impl<T: Counted> std::ops::Deref for Managed<T> {
708    type Target = T;
709
710    fn deref(&self) -> &Self::Target {
711        &self.value
712    }
713}
714
715/// Internal metadata attached with a [`Managed`] instance.
716#[derive(Debug, Clone)]
717struct Meta {
718    /// Outcome aggregator service.
719    outcome_aggregator: Addr<TrackOutcome>,
720    /// Received timestamp, when the contained payload/information was received.
721    ///
722    /// See also: [`crate::extractors::RequestMeta::received_at`].
723    received_at: DateTime<Utc>,
724    /// Data scoping information of the contained item.
725    scoping: Scoping,
726    /// Optional event id associated with the contained data.
727    event_id: Option<EventId>,
728    /// Optional remote addr from where the data was received from.
729    remote_addr: Option<IpAddr>,
730}
731
732impl Meta {
733    pub fn track_outcome(&self, outcome: Outcome, category: DataCategory, quantity: usize) {
734        self.outcome_aggregator.send(TrackOutcome {
735            timestamp: self.received_at,
736            scoping: self.scoping,
737            outcome,
738            event_id: self.event_id,
739            remote_addr: self.remote_addr,
740            category,
741            quantity: quantity.try_into().unwrap_or(u32::MAX),
742        });
743    }
744}
745
746/// A record keeper makes sure modifications done on a [`Managed`] item are all accounted for
747/// correctly.
748pub struct RecordKeeper<'a> {
749    meta: &'a Meta,
750    on_drop: Quantities,
751    #[cfg(debug_assertions)]
752    lenient: SmallVec<[DataCategory; 1]>,
753    #[cfg(debug_assertions)]
754    modifications: BTreeMap<DataCategory, isize>,
755    in_flight: SmallVec<[(DataCategory, usize, Option<Outcome>); 2]>,
756}
757
758impl<'a> RecordKeeper<'a> {
759    fn new(meta: &'a Meta, quantities: Quantities) -> Self {
760        Self {
761            meta,
762            on_drop: quantities,
763            #[cfg(debug_assertions)]
764            lenient: Default::default(),
765            #[cfg(debug_assertions)]
766            modifications: Default::default(),
767            in_flight: Default::default(),
768        }
769    }
770
771    /// Marking a data category as lenient exempts this category from outcome quantity validations.
772    ///
773    /// Consider using [`Self::modify_by`] instead.
774    ///
775    /// This can be used in cases where the quantity is knowingly modified, which is quite common
776    /// for data categories which count bytes.
777    pub fn lenient(&mut self, category: DataCategory) {
778        let _category = category;
779        #[cfg(debug_assertions)]
780        self.lenient.push(_category);
781    }
782
783    /// Modifies the expected count for a category.
784    ///
785    /// When extracting payloads category counts may expectedly change, these changes can be
786    /// tracked using this function.
787    ///
788    /// Prefer using [`Self::modify_by`] over [`Self::lenient`] as lenient completely disables
789    /// validation for the entire category.
790    pub fn modify_by(&mut self, category: DataCategory, offset: isize) {
791        let _category = category;
792        let _offset = offset;
793        #[cfg(debug_assertions)]
794        {
795            *self.modifications.entry(_category).or_default() += offset;
796        }
797    }
798
799    /// Finalizes all records and emits the necessary outcomes.
800    ///
801    /// This uses the quantities of the original item.
802    fn failure<E>(mut self, error: E) -> Rejected<E::Error>
803    where
804        E: OutcomeError,
805    {
806        let (outcome, error) = error.consume();
807
808        if let Some(outcome) = outcome {
809            for (category, quantity) in std::mem::take(&mut self.on_drop) {
810                self.meta.track_outcome(outcome.clone(), category, quantity);
811            }
812        }
813
814        Rejected(error)
815    }
816
817    /// Finalizes all records and asserts that no additional outcomes have been tracked.
818    ///
819    /// Unlike [`Self::success`], this method does not allow for intermediate or partial outcomes,
820    /// it also does not verify any outcomes.
821    ///
822    /// This method is useful for using the record keeper to track failure outcomes, either
823    /// explicit failures or panics.
824    fn accept(mut self) {
825        debug_assert!(
826            self.in_flight.is_empty(),
827            "records accepted, but intermediate outcomes tracked"
828        );
829        self.on_drop.clear();
830    }
831
832    /// Finalizes all records and emits the created outcomes.
833    ///
834    /// This only emits the outcomes that have been explicitly registered.
835    /// In a debug build, the function also ensure no outcomes have been missed by comparing
836    /// quantities of the item before and after.
837    fn success(mut self, new: Quantities) {
838        let original = std::mem::take(&mut self.on_drop);
839        self.assert_quantities(original, new);
840
841        self.on_drop.clear();
842        for (category, quantity, outcome) in std::mem::take(&mut self.in_flight) {
843            if let Some(outcome) = outcome {
844                self.meta.track_outcome(outcome, category, quantity);
845            }
846        }
847    }
848
849    /// Asserts that there have been no quantities lost.
850    ///
851    /// The original amount of quantities should match the new amount of quantities + all emitted
852    /// outcomes.
    /// Verifies that no quantities were lost or invented between `original` and `new`.
    ///
    /// Starting from the `original` quantities, the recorded `modifications` (signed offsets)
    /// are applied. Then every quantity recorded as in flight (rejected with an outcome) and
    /// finally every quantity of `new` is subtracted. Afterwards every category must be at
    /// exactly zero.
    ///
    /// # Panics
    ///
    /// Panics on any mismatch in a category that is not contained in `self.lenient`, after
    /// logging the full bookkeeping state. Mismatches in lenient categories are only logged at
    /// debug level.
    #[cfg(debug_assertions)]
    fn assert_quantities(&self, original: Quantities, new: Quantities) {
        // Reports a mismatch for `$category`. Lenient categories are only logged; for all other
        // categories the original/new quantities, modifications and in-flight records are logged
        // before panicking with the given message.
        macro_rules! emit {
            ($category:expr, $($tt:tt)*) => {{
                match self.lenient.contains(&$category) {
                    // Certain categories are known to be not always correct,
                    // they are logged instead.
                    true => relay_log::debug!($($tt)*),
                    false  => {
                        relay_log::error!("Original: {original:?}");
                        relay_log::error!("New: {new:?}");
                        relay_log::error!("Modifications: {:?}", self.modifications);
                        relay_log::error!("In Flight: {:?}", self.in_flight);
                        panic!($($tt)*)
                    }
                }
            }};
        }

        // Running balance per category, seeded with the original quantities.
        let mut sums = debug::Quantities::from(&original).0;
        // Apply the explicit modifications; an offset may not drive a balance below zero.
        for (category, offset) in &self.modifications {
            let v = sums.entry(*category).or_default();
            match v.checked_add_signed(*offset) {
                Some(result) => *v = result,
                None => emit!(
                    category,
                    "Attempted to modify original quantity {v} into the negative ({offset})"
                ),
            }
        }

        // Quantities which already received an outcome are no longer part of the item.
        for (category, quantity, outcome) in &self.in_flight {
            match sums.get_mut(category) {
                Some(c) if *c >= *quantity => *c -= *quantity,
                Some(c) => emit!(
                    category,
                    "Emitted {quantity} outcomes ({outcome:?}) for {category}, but there were only {c} items in the category originally"
                ),
                None => emit!(
                    category,
                    "Emitted {quantity} outcomes ({outcome:?}) for {category}, but there never was an item in this category"
                ),
            }
        }

        // Whatever remains must cover the quantities of the new item.
        for (category, quantity) in &new {
            match sums.get_mut(category) {
                Some(c) if *c >= *quantity => *c -= *quantity,
                Some(c) => emit!(
                    category,
                    "New item has {quantity} items in category '{category}', but original (after emitted outcomes) only has {c} left"
                ),
                None => emit!(
                    category,
                    "New item has {quantity} items in category '{category}', but after emitted outcomes there are none left"
                ),
            }
        }

        // Any remainder was neither rejected with an outcome nor carried over into `new`.
        for (category, quantity) in sums {
            if quantity > 0 {
                emit!(
                    category,
                    "Missing outcomes or mismatched quantity in category '{category}', off by {quantity}"
                );
            }
        }
    }
921
922    #[cfg(not(debug_assertions))]
923    fn assert_quantities(&self, _: Quantities, _: Quantities) {}
924}
925
926impl<'a> Drop for RecordKeeper<'a> {
927    fn drop(&mut self) {
928        for (category, quantity) in std::mem::take(&mut self.on_drop) {
929            self.meta.track_outcome(
930                Outcome::Invalid(DiscardReason::Internal),
931                category,
932                quantity,
933            );
934        }
935    }
936}
937
938impl RecordKeeper<'_> {
939    /// Rejects an item if the passed result is an error and returns a default value.
940    ///
941    /// Similar to [`Self::reject_err`], this emits the necessary outcomes for an
942    /// item, if there is an error.
943    pub fn or_default<T, E, Q>(&mut self, r: Result<T, E>, q: Q) -> T
944    where
945        T: Default,
946        E: OutcomeError,
947        Q: Counted,
948    {
949        match r {
950            Ok(result) => result,
951            Err(err) => {
952                self.reject_err(err, q);
953                T::default()
954            }
955        }
956    }
957
958    /// Rejects an item with an error.
959    ///
960    /// Makes sure the correct outcomes are tracked for the item, that is discarded due to an
961    /// error.
962    pub fn reject_err<E, Q>(&mut self, err: E, q: Q) -> E::Error
963    where
964        E: OutcomeError,
965        Q: Counted,
966    {
967        let (outcome, err) = err.consume();
968        for (category, quantity) in q.quantities() {
969            self.in_flight.push((category, quantity, outcome.clone()))
970        }
971        err
972    }
973
974    /// Rejects an item with an internal error.
975    ///
976    /// See also: [`Managed::internal_error`].
977    #[track_caller]
978    pub fn internal_error<E, Q>(&mut self, error: E, q: Q)
979    where
980        E: std::error::Error + 'static,
981        Q: Counted,
982    {
983        relay_log::error!(error = &error as &dyn std::error::Error, "internal error");
984        debug_assert!(false, "internal error: {error}");
985        self.reject_err((Outcome::Invalid(DiscardReason::Internal), ()), q);
986    }
987}
988
/// Iterator returned by [`Managed::split`].
///
/// Every item produced by the inner iterator is wrapped into a [`Managed`] sharing the same
/// metadata. If the split is dropped before the inner iterator is exhausted, the items that were
/// never handed out receive an internal-error outcome.
pub struct Split<I, S>
where
    I: Iterator<Item = S>,
    S: Counted,
{
    /// Original quantities not yet accounted for by produced items (debug builds only).
    #[cfg(debug_assertions)]
    quantities: Quantities,
    /// The inner iterator producing the individual items.
    items: I,
    /// Metadata shared with every produced [`Managed`] item.
    meta: Arc<Meta>,
    /// Set once the inner iterator returned `None`.
    exhausted: bool,
}
1001
1002impl<I, S> Split<I, S>
1003where
1004    I: Iterator<Item = S>,
1005    S: Counted,
1006{
1007    /// Subtracts passed quantities from the total quantities to verify total quantity counts are
1008    /// matching.
1009    #[cfg(debug_assertions)]
1010    fn subtract(&mut self, q: Quantities) {
1011        for (category, quantities) in q {
1012            let Some(orig_quantities) = self
1013                .quantities
1014                .iter_mut()
1015                .find_map(|(c, q)| (*c == category).then_some(q))
1016            else {
1017                debug_assert!(
1018                    false,
1019                    "mismatching quantities, item split into category {category}, \
1020                    which originally was not present"
1021                );
1022                continue;
1023            };
1024
1025            if *orig_quantities >= quantities {
1026                *orig_quantities -= quantities;
1027            } else {
1028                debug_assert!(
1029                    false,
1030                    "in total more items produced in category {category} than originally available"
1031                );
1032            }
1033        }
1034    }
1035}
1036
1037impl<I, S> Iterator for Split<I, S>
1038where
1039    I: Iterator<Item = S>,
1040    S: Counted,
1041{
1042    type Item = Managed<S>;
1043
1044    fn next(&mut self) -> Option<Self::Item> {
1045        let next = match self.items.next() {
1046            Some(next) => next,
1047            None => {
1048                self.exhausted = true;
1049                return None;
1050            }
1051        };
1052
1053        #[cfg(debug_assertions)]
1054        self.subtract(next.quantities());
1055
1056        Some(Managed::from_parts(next, Arc::clone(&self.meta)))
1057    }
1058}
1059
1060impl<I, S> Drop for Split<I, S>
1061where
1062    I: Iterator<Item = S>,
1063    S: Counted,
1064{
1065    fn drop(&mut self) {
1066        // If the inner iterator was exhausted, no items should be remaining.
1067        #[cfg(debug_assertions)]
1068        if self.exhausted {
1069            for (category, quantities) in &self.quantities {
1070                debug_assert!(
1071                    *quantities == 0,
1072                    "items split, but still {quantities} remaining in category {category}"
1073                );
1074            }
1075        }
1076
1077        if self.exhausted {
1078            return;
1079        }
1080
1081        // There may be items remaining in the iterator for multiple reasons:
1082        // - there was a panic
1083        // - the iterator was never fully consumed
1084        //
1085        // In any case, outcomes must be emitted for the remaining items.
1086        for item in &mut self.items {
1087            for (category, quantity) in item.quantities() {
1088                self.meta.track_outcome(
1089                    Outcome::Invalid(DiscardReason::Internal),
1090                    category,
1091                    quantity,
1092                );
1093            }
1094        }
1095    }
1096}
1097
1098impl<I, S> FusedIterator for Split<I, S>
1099where
1100    I: Iterator<Item = S> + FusedIterator,
1101    S: Counted,
1102{
1103}
1104
#[cfg(test)]
mod tests {
    use super::*;

    struct CountedVec(Vec<u32>);

    impl Counted for CountedVec {
        fn quantities(&self) -> Quantities {
            smallvec::smallvec![(DataCategory::Error, self.0.len())]
        }
    }

    struct CountedValue(u32);

    impl Counted for CountedValue {
        fn quantities(&self) -> Quantities {
            smallvec::smallvec![(DataCategory::Error, 1)]
        }
    }

    #[test]
    fn test_reject_err_no_outcome() {
        let value = CountedVec(vec![0, 1, 2, 3, 4, 5]);
        let (managed, mut handle) = Managed::for_test(value).build();

        // Rejecting with no outcome, should not emit any outcomes.
        let _ = managed.reject_err((None, ()));
        handle.assert_no_outcomes();

        // Now dropping the managed instance, should not record any (internal) outcomes either.
        drop(managed);
        handle.assert_no_outcomes();
    }

    #[test]
    fn test_merge() {
        let (mut a, mut handle_a) = Managed::for_test(CountedVec(vec![1, 2])).build();
        let (b, mut handle_b) = Managed::for_test(CountedVec(vec![3, 4])).build();

        a.merge_with(b, |a, b, _| a.0.extend(b.0));

        assert_eq!(a.0, vec![1, 2, 3, 4]);
        drop(a);
        handle_a.assert_internal_outcome(DataCategory::Error, 4);
        handle_b.assert_no_outcomes();
    }

    // The record mismatch is detected by the quantity assertions, which are a no-op in builds
    // without debug assertions (e.g. `cargo test --release`).
    #[cfg(debug_assertions)]
    #[test]
    fn test_merge_mismatched_records_should_panic() {
        let (mut a, mut handle_a) = Managed::for_test(CountedVec(vec![1, 2])).build();
        let (b, _handle_b) = Managed::for_test(CountedVec(vec![3, 4])).build();

        let r = std::panic::catch_unwind(move || {
            a.merge_with(b, |_a, _b, _| {});
        });

        assert!(
            r.is_err(),
            "expected merge to panic because of mismatched outcome records"
        );
        handle_a.assert_internal_outcome(DataCategory::Error, 2);
    }

    #[test]
    fn test_split_fully_consumed() {
        let value = CountedVec(vec![0, 1, 2, 3, 4, 5]);
        let (managed, mut handle) = Managed::for_test(value).build();

        let s = managed
            .split(|value| value.0.into_iter().map(CountedValue))
            // Fully consume the iterator to make sure there aren't any outcomes emitted on drop.
            .collect::<Vec<_>>();

        handle.assert_no_outcomes();

        for (i, s) in s.into_iter().enumerate() {
            assert_eq!(s.as_ref().0, i as u32);
            let outcome = Outcome::Invalid(DiscardReason::Cors);
            let _ = s.reject_err((outcome.clone(), ()));
            handle.assert_outcome(&outcome, DataCategory::Error, 1);
        }
    }

    #[test]
    fn test_split_partially_consumed_emits_remaining() {
        let value = CountedVec(vec![0, 1, 2, 3, 4, 5]);
        let (managed, mut handle) = Managed::for_test(value).build();

        let mut s = managed.split(|value| value.0.into_iter().map(CountedValue));
        handle.assert_no_outcomes();

        drop(s.next());
        handle.assert_internal_outcome(DataCategory::Error, 1);
        drop(s.next());
        handle.assert_internal_outcome(DataCategory::Error, 1);
        drop(s.next());
        handle.assert_internal_outcome(DataCategory::Error, 1);
        handle.assert_no_outcomes();

        drop(s);

        handle.assert_internal_outcome(DataCategory::Error, 1);
        handle.assert_internal_outcome(DataCategory::Error, 1);
        handle.assert_internal_outcome(DataCategory::Error, 1);
    }

    // The remaining-quantity check in `Split`'s drop only exists with debug assertions.
    #[cfg(debug_assertions)]
    #[test]
    fn test_split_changing_quantities_should_panic() {
        let value = CountedVec(vec![0, 1, 2, 3, 4, 5]);
        let (managed, mut handle) = Managed::for_test(value).build();

        let mut s = managed.split(|_| std::iter::once(CountedValue(0)));

        s.next().unwrap().accept(|_| {});
        handle.assert_no_outcomes();

        assert!(s.next().is_none());

        let r = std::panic::catch_unwind(move || {
            drop(s);
        });

        assert!(
            r.is_err(),
            "expected split to panic because of mismatched (not enough) outcomes"
        );
    }

    // `Split::subtract` is only compiled with debug assertions.
    #[cfg(debug_assertions)]
    #[test]
    fn test_split_more_outcomes_than_before_should_panic() {
        let value = CountedVec(vec![0]);
        let (managed, mut handle) = Managed::for_test(value).build();

        let mut s = managed.split(|_| vec![CountedValue(0), CountedValue(2)].into_iter());

        s.next().unwrap().accept(|_| {});
        handle.assert_no_outcomes();

        let r = std::panic::catch_unwind(move || {
            s.next();
        });

        assert!(
            r.is_err(),
            "expected split to panic because of mismatched (too many) outcomes"
        );
    }

    // `Split::subtract` is only compiled with debug assertions.
    #[cfg(debug_assertions)]
    #[test]
    fn test_split_changing_categories_should_panic() {
        struct Special;
        impl Counted for Special {
            fn quantities(&self) -> Quantities {
                smallvec::smallvec![(DataCategory::Error, 1), (DataCategory::Transaction, 1)]
            }
        }

        let value = CountedVec(vec![0]);
        let (managed, _handle) = Managed::for_test(value).build();

        let mut s = managed.split(|value| value.0.into_iter().map(|_| Special));

        let r = std::panic::catch_unwind(move || {
            let _ = s.next();
        });

        assert!(
            r.is_err(),
            "expected split to panic because of mismatched outcome categories"
        );
    }

    #[test]
    fn test_split_assert_fused() {
        fn only_fused<T: FusedIterator>(_: T) {}

        let (managed, mut handle) = Managed::for_test(CountedVec(vec![0])).build();
        only_fused(managed.split(|value| value.0.into_iter().map(CountedValue)));
        handle.assert_internal_outcome(DataCategory::Error, 1);
    }
}