1#[cfg(debug_assertions)]
2use std::collections::BTreeMap;
3use std::convert::Infallible;
4use std::fmt;
5use std::iter::FusedIterator;
6use std::mem::ManuallyDrop;
7use std::net::IpAddr;
8use std::sync::Arc;
9use std::sync::atomic::{AtomicBool, Ordering};
10
11use chrono::{DateTime, Utc};
12use itertools::Either;
13use relay_event_schema::protocol::EventId;
14use relay_quotas::{DataCategory, Scoping};
15use relay_system::Addr;
16use smallvec::SmallVec;
17
18use crate::Envelope;
19use crate::endpoints::common::BadStoreRequest;
20use crate::extractors::RequestMeta;
21use crate::managed::{Counted, ManagedEnvelope, Quantities};
22use crate::services::outcome::{DiscardReason, Outcome, TrackOutcome};
23use crate::services::processor::ProcessingError;
24
25#[cfg(debug_assertions)]
26mod debug;
27#[cfg(test)]
28mod test;
29
30#[cfg(test)]
31pub use self::test::*;
32
33pub trait OutcomeError {
35 type Error;
37
38 fn consume(self) -> (Option<Outcome>, Self::Error);
42}
43
44impl OutcomeError for Outcome {
45 type Error = ();
46
47 fn consume(self) -> (Option<Outcome>, Self::Error) {
48 (self, ()).consume()
49 }
50}
51
52impl OutcomeError for Option<Outcome> {
53 type Error = ();
54
55 fn consume(self) -> (Option<Outcome>, Self::Error) {
56 (self, ()).consume()
57 }
58}
59
60impl<E> OutcomeError for (Outcome, E) {
61 type Error = E;
62
63 fn consume(self) -> (Option<Outcome>, Self::Error) {
64 (Some(self.0), self.1)
65 }
66}
67
68impl<E> OutcomeError for (Option<Outcome>, E) {
69 type Error = E;
70
71 fn consume(self) -> (Option<Outcome>, Self::Error) {
72 self
73 }
74}
75
76impl OutcomeError for ProcessingError {
77 type Error = Self;
78
79 fn consume(self) -> (Option<Outcome>, Self::Error) {
80 (self.to_outcome(), self)
81 }
82}
83
84impl OutcomeError for Infallible {
85 type Error = Self;
86
87 fn consume(self) -> (Option<Outcome>, Self::Error) {
88 match self {}
89 }
90}
91
92impl OutcomeError for BadStoreRequest {
93 type Error = Self;
94
95 fn consume(self) -> (Option<Outcome>, Self) {
96 (self.to_outcome(), self)
97 }
98}
99
100#[derive(Debug, Clone, Copy)]
105#[must_use = "a rejection must be propagated"]
106pub struct Rejected<T>(T);
107
108impl<T> Rejected<T> {
109 pub fn into_inner(self) -> T {
111 self.0
112 }
113
114 pub fn map<F, S>(self, f: F) -> Rejected<S>
116 where
117 F: FnOnce(T) -> S,
118 {
119 Rejected(f(self.0))
120 }
121}
122
123impl<T> std::error::Error for Rejected<T>
124where
125 T: std::error::Error,
126{
127 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
128 self.0.source()
129 }
130}
131
132impl<T> std::fmt::Display for Rejected<T>
133where
134 T: std::fmt::Display,
135{
136 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
137 self.0.fmt(f)
138 }
139}
140
141impl<T> axum::response::IntoResponse for Rejected<T>
142where
143 T: axum::response::IntoResponse,
144{
145 fn into_response(self) -> axum::response::Response {
146 self.0.into_response()
147 }
148}
149
150pub struct Managed<T: Counted> {
152 value: T,
153 meta: Arc<Meta>,
154 done: AtomicBool,
155}
156
157impl Managed<Box<Envelope>> {
158 pub fn from_envelope(envelope: Box<Envelope>, outcome_aggregator: Addr<TrackOutcome>) -> Self {
160 let meta = Arc::new(Meta {
161 outcome_aggregator,
162 received_at: envelope.received_at(),
163 scoping: envelope.meta().get_partial_scoping().into_scoping(),
164 event_id: envelope.event_id(),
165 remote_addr: envelope.meta().remote_addr(),
166 });
167
168 Self::from_parts(envelope, meta)
169 }
170}
171
172pub trait RetainMut<I> {
174 fn retain_mut(&mut self, f: impl FnMut(&mut I) -> bool);
176}
177
178impl<I> RetainMut<I> for Vec<I> {
179 fn retain_mut(&mut self, f: impl FnMut(&mut I) -> bool) {
180 Vec::retain_mut(self, f)
181 }
182}
183impl<I, const N: usize> RetainMut<I> for SmallVec<[I; N]> {
184 fn retain_mut(&mut self, f: impl FnMut(&mut I) -> bool) {
185 SmallVec::retain_mut(self, f)
186 }
187}
188
189impl<T: Counted> Managed<T> {
190 pub fn with_meta_from_managed_envelope(envelope: &ManagedEnvelope, value: T) -> Self {
195 Self::from_parts(
196 value,
197 Arc::new(Meta {
198 outcome_aggregator: envelope.outcome_aggregator().clone(),
199 received_at: envelope.received_at(),
200 scoping: envelope.scoping(),
201 event_id: envelope.envelope().event_id(),
202 remote_addr: envelope.meta().remote_addr(),
203 }),
204 )
205 }
206
207 pub fn with_meta_from_request_meta(
209 request_meta: &RequestMeta,
210 outcome_aggregator: &Addr<TrackOutcome>,
211 value: T,
212 ) -> Self {
213 Self::from_parts(
214 value,
215 Arc::new(Meta {
216 outcome_aggregator: outcome_aggregator.clone(),
217 received_at: request_meta.received_at(),
218 scoping: request_meta.get_partial_scoping().into_scoping(),
219 event_id: None,
220 remote_addr: request_meta.remote_addr(),
221 }),
222 )
223 }
224
225 pub fn wrap<S>(&self, other: S) -> Managed<S>
227 where
228 S: Counted,
229 {
230 Managed::from_parts(other, Arc::clone(&self.meta))
231 }
232
233 pub fn boxed(self) -> Managed<Box<T>> {
235 self.map(|value, _| Box::new(value))
236 }
237
238 pub fn received_at(&self) -> DateTime<Utc> {
240 self.meta.received_at
241 }
242
243 pub fn scoping(&self) -> Scoping {
245 self.meta.scoping
246 }
247
248 pub fn scope(&mut self, scoping: Scoping) {
258 let meta = Arc::make_mut(&mut self.meta);
259 meta.scoping = scoping;
260 }
261
262 pub fn merge_with<S, F>(&mut self, other: Managed<S>, f: F)
267 where
268 S: Counted,
269 F: FnOnce(&mut T, S, &mut RecordKeeper),
270 {
271 self.modify(|s, records| {
272 for (category, quantity) in other.quantities() {
273 records.modify_by(category, quantity as isize);
274 }
275 other.accept(|o| f(s, o, records));
276 })
277 }
278
279 pub fn split_once<F, S, U>(self, f: F) -> (Managed<S>, Managed<U>)
285 where
286 F: FnOnce(T, &mut RecordKeeper) -> (S, U),
287 S: Counted,
288 U: Counted,
289 {
290 debug_assert!(!self.is_done());
291
292 let (value, meta) = self.destructure();
293 let quantities = value.quantities();
294
295 let mut records = RecordKeeper::new(&meta, quantities);
296
297 let (a, b) = f(value, &mut records);
298
299 let mut quantities = a.quantities();
300 quantities.extend(b.quantities());
301 records.success(quantities);
302
303 (
304 Managed::from_parts(a, Arc::clone(&meta)),
305 Managed::from_parts(b, meta),
306 )
307 }
308
309 pub fn split<F, I, S>(self, f: F) -> Split<I::IntoIter, I::Item>
314 where
315 F: FnOnce(T) -> I,
316 I: IntoIterator<Item = S>,
317 S: Counted,
318 {
319 self.split_with_context(|value| (f(value), ())).0
320 }
321
322 pub fn split_with_context<F, I, S, C>(self, f: F) -> (Split<I::IntoIter, I::Item>, C)
327 where
328 F: FnOnce(T) -> (I, C),
329 I: IntoIterator<Item = S>,
330 S: Counted,
331 {
332 debug_assert!(!self.is_done());
333
334 let (value, meta) = self.destructure();
335 #[cfg(debug_assertions)]
336 let quantities = value.quantities();
337
338 let (items, context) = f(value);
339
340 (
341 Split {
342 #[cfg(debug_assertions)]
343 quantities,
344 items: items.into_iter(),
345 meta,
346 exhausted: false,
347 },
348 context,
349 )
350 }
351
352 pub fn retain<S, I, U, E, V>(&mut self, select: S, mut retain: U)
388 where
389 S: FnOnce(&mut T) -> &mut V,
390 I: Counted,
391 U: FnMut(&mut I, &mut RecordKeeper<'_>) -> Result<(), E>,
392 E: OutcomeError,
393 V: RetainMut<I>,
394 {
395 self.retain_with_context(
396 |inner| (select(inner), &()),
397 |item, _, records| retain(item, records),
398 );
399 }
400
401 pub fn retain_with_context<S, C, I, U, E, V>(&mut self, select: S, mut retain: U)
438 where
439 for<'a> S: FnOnce(&'a mut T) -> (&'a mut V, &'a C),
444 I: Counted,
445 U: FnMut(&mut I, &C, &mut RecordKeeper<'_>) -> Result<(), E>,
446 E: OutcomeError,
447 V: RetainMut<I>,
448 {
449 self.modify(|inner, records| {
450 let (items, ctx) = select(inner);
451 items.retain_mut(|item| match retain(item, ctx, records) {
452 Ok(()) => true,
453 Err(err) => {
454 records.reject_err(err, &*item);
455 false
456 }
457 })
458 });
459 }
460
461 pub fn map<S, F>(self, f: F) -> Managed<S>
465 where
466 F: FnOnce(T, &mut RecordKeeper) -> S,
467 S: Counted,
468 {
469 self.try_map(move |inner, records| Ok::<_, Infallible>(f(inner, records)))
470 .unwrap_or_else(|e| match e.0 {})
471 }
472
473 pub fn try_map<S, F, E>(self, f: F) -> Result<Managed<S>, Rejected<E::Error>>
481 where
482 F: FnOnce(T, &mut RecordKeeper) -> Result<S, E>,
483 S: Counted,
484 E: OutcomeError,
485 {
486 debug_assert!(!self.is_done());
487
488 let (value, meta) = self.destructure();
489 let quantities = value.quantities();
490
491 let mut records = RecordKeeper::new(&meta, quantities);
492
493 match f(value, &mut records) {
494 Ok(value) => {
495 records.success(value.quantities());
496 Ok(Managed::from_parts(value, meta))
497 }
498 Err(err) => Err(records.failure(err)),
499 }
500 }
501
502 pub fn modify<F>(&mut self, f: F)
506 where
507 F: FnOnce(&mut T, &mut RecordKeeper),
508 {
509 self.try_modify(move |inner, records| {
510 f(inner, records);
511 Ok::<_, Infallible>(())
512 })
513 .unwrap_or_else(|e| match e {})
514 }
515
516 pub fn try_modify<F, E>(&mut self, f: F) -> Result<(), Rejected<E::Error>>
524 where
525 F: FnOnce(&mut T, &mut RecordKeeper) -> Result<(), E>,
526 E: OutcomeError,
527 {
528 debug_assert!(!self.is_done());
529
530 let quantities = self.value.quantities();
531 let mut records = RecordKeeper::new(&self.meta, quantities);
532
533 match f(&mut self.value, &mut records) {
534 Ok(()) => {
535 records.success(self.value.quantities());
536 Ok(())
537 }
538 Err(err) => {
539 let err = records.failure(err);
540 self.done.store(true, Ordering::Relaxed);
541 Err(err)
542 }
543 }
544 }
545
546 pub fn accept<F, S>(self, f: F) -> S
554 where
555 F: FnOnce(T) -> S,
556 {
557 self.try_accept(|item| Ok::<_, Infallible>(f(item)))
558 .unwrap_or_else(|err| match err.0 {})
559 }
560
561 pub fn try_accept<F, S, E>(self, f: F) -> Result<S, Rejected<E::Error>>
569 where
570 F: FnOnce(T) -> Result<S, E>,
571 E: OutcomeError,
572 {
573 debug_assert!(!self.is_done());
574
575 let (value, meta) = self.destructure();
576 let records = RecordKeeper::new(&meta, value.quantities());
577
578 match f(value) {
579 Ok(value) => {
580 records.accept();
581 Ok(value)
582 }
583 Err(err) => Err(records.failure(err)),
584 }
585 }
586
587 #[track_caller]
595 pub fn internal_error(&self, reason: &'static str) -> Rejected<()> {
596 relay_log::error!("internal error: {reason}");
597 debug_assert!(false, "internal error: {reason}");
598 self.reject_err((Outcome::Invalid(DiscardReason::Internal), ()))
599 }
600
601 pub fn reject_err<E>(&self, error: E) -> Rejected<E::Error>
603 where
604 E: OutcomeError,
605 {
606 debug_assert!(!self.is_done());
607
608 let (outcome, error) = error.consume();
609 self.do_reject(outcome);
610 Rejected(error)
611 }
612
613 fn do_reject(&self, outcome: Option<Outcome>) {
614 let is_done = self.done.fetch_or(true, Ordering::Relaxed);
617
618 let Some(outcome) = outcome else {
620 return;
621 };
622
623 if !is_done {
629 for (category, quantity) in self.value.quantities() {
630 self.meta.track_outcome(outcome.clone(), category, quantity);
631 }
632 }
633 }
634
635 fn destructure(self) -> (T, Arc<Meta>) {
642 let this = ManuallyDrop::new(self);
652 let Managed { value, meta, done } = &*this;
653
654 let value = unsafe { std::ptr::read(value) };
655 let meta = unsafe { std::ptr::read(meta) };
656 let done = unsafe { std::ptr::read(done) };
657 debug_assert!(
660 !done.load(Ordering::Relaxed),
661 "a `done` managed should never be destructured"
662 );
663
664 (value, meta)
665 }
666
667 fn from_parts(value: T, meta: Arc<Meta>) -> Self {
668 Self {
669 value,
670 meta,
671 done: AtomicBool::new(false),
672 }
673 }
674
675 fn is_done(&self) -> bool {
676 self.done.load(Ordering::Relaxed)
677 }
678}
679
680impl<T: Counted> Drop for Managed<T> {
681 fn drop(&mut self) {
682 self.do_reject(Some(Outcome::Invalid(DiscardReason::Internal)));
683 }
684}
685
686impl<T: Counted + fmt::Debug> fmt::Debug for Managed<T> {
687 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
688 write!(f, "Managed<{}>[", std::any::type_name::<T>())?;
689 for (i, (category, quantity)) in self.value.quantities().iter().enumerate() {
690 if i > 0 {
691 write!(f, ",")?;
692 }
693 write!(f, "{category}:{quantity}")?;
694 }
695 write!(f, "](")?;
696 self.value.fmt(f)?;
697 write!(f, ")")
698 }
699}
700
701impl<T: Counted> Managed<Option<T>> {
702 pub fn transpose(self) -> Option<Managed<T>> {
704 let (o, meta) = self.destructure();
705 o.map(|t| Managed::from_parts(t, meta))
706 }
707}
708
709impl<L: Counted, R: Counted> Managed<Either<L, R>> {
710 pub fn transpose(self) -> Either<Managed<L>, Managed<R>> {
712 let (either, meta) = self.destructure();
713 match either {
714 Either::Left(value) => Either::Left(Managed::from_parts(value, meta)),
715 Either::Right(value) => Either::Right(Managed::from_parts(value, meta)),
716 }
717 }
718}
719
720impl From<Managed<Box<Envelope>>> for ManagedEnvelope {
721 fn from(value: Managed<Box<Envelope>>) -> Self {
722 let (value, meta) = value.destructure();
723 let mut envelope = ManagedEnvelope::new(value, meta.outcome_aggregator.clone());
724 envelope.scope(meta.scoping);
725 envelope
726 }
727}
728
729impl<T: Counted> AsRef<T> for Managed<T> {
730 fn as_ref(&self) -> &T {
731 &self.value
732 }
733}
734
735impl<T: Counted> std::ops::Deref for Managed<T> {
736 type Target = T;
737
738 fn deref(&self) -> &Self::Target {
739 &self.value
740 }
741}
742
743#[derive(Debug, Clone)]
745struct Meta {
746 outcome_aggregator: Addr<TrackOutcome>,
748 received_at: DateTime<Utc>,
752 scoping: Scoping,
754 event_id: Option<EventId>,
756 remote_addr: Option<IpAddr>,
758}
759
760impl Meta {
761 pub fn track_outcome(&self, outcome: Outcome, category: DataCategory, quantity: usize) {
762 self.outcome_aggregator.send(TrackOutcome {
763 timestamp: self.received_at,
764 scoping: self.scoping,
765 outcome,
766 event_id: self.event_id,
767 remote_addr: self.remote_addr,
768 category,
769 quantity: quantity.try_into().unwrap_or(u32::MAX),
770 });
771 }
772}
773
774pub struct RecordKeeper<'a> {
777 meta: &'a Meta,
778 on_drop: Quantities,
779 #[cfg(debug_assertions)]
780 lenient: SmallVec<[DataCategory; 1]>,
781 #[cfg(debug_assertions)]
782 modifications: BTreeMap<DataCategory, isize>,
783 in_flight: SmallVec<[(DataCategory, usize, Option<Outcome>); 2]>,
784}
785
786impl<'a> RecordKeeper<'a> {
787 fn new(meta: &'a Meta, quantities: Quantities) -> Self {
788 Self {
789 meta,
790 on_drop: quantities,
791 #[cfg(debug_assertions)]
792 lenient: Default::default(),
793 #[cfg(debug_assertions)]
794 modifications: Default::default(),
795 in_flight: Default::default(),
796 }
797 }
798
799 pub fn lenient(&mut self, category: DataCategory) {
806 let _category = category;
807 #[cfg(debug_assertions)]
808 self.lenient.push(_category);
809 }
810
811 pub fn modify_by(&mut self, category: DataCategory, offset: isize) {
819 let _category = category;
820 let _offset = offset;
821 #[cfg(debug_assertions)]
822 {
823 *self.modifications.entry(_category).or_default() += offset;
824 }
825 }
826
827 fn failure<E>(mut self, error: E) -> Rejected<E::Error>
831 where
832 E: OutcomeError,
833 {
834 let (outcome, error) = error.consume();
835
836 if let Some(outcome) = outcome {
837 for (category, quantity) in std::mem::take(&mut self.on_drop) {
838 self.meta.track_outcome(outcome.clone(), category, quantity);
839 }
840 }
841
842 Rejected(error)
843 }
844
845 fn accept(mut self) {
853 debug_assert!(
854 self.in_flight.is_empty(),
855 "records accepted, but intermediate outcomes tracked"
856 );
857 self.on_drop.clear();
858 }
859
860 fn success(mut self, new: Quantities) {
866 let original = std::mem::take(&mut self.on_drop);
867 self.assert_quantities(original, new);
868
869 self.on_drop.clear();
870 for (category, quantity, outcome) in std::mem::take(&mut self.in_flight) {
871 if let Some(outcome) = outcome {
872 self.meta.track_outcome(outcome, category, quantity);
873 }
874 }
875 }
876
877 #[cfg(debug_assertions)]
882 fn assert_quantities(&self, original: Quantities, new: Quantities) {
883 macro_rules! emit {
884 ($category:expr, $($tt:tt)*) => {{
885 match self.lenient.contains(&$category) {
886 true => relay_log::debug!($($tt)*),
889 false => {
890 relay_log::error!("Original: {original:?}");
891 relay_log::error!("New: {new:?}");
892 relay_log::error!("Modifications: {:?}", self.modifications);
893 relay_log::error!("In Flight: {:?}", self.in_flight);
894 panic!($($tt)*)
895 }
896 }
897 }};
898 }
899
900 let mut sums = debug::Quantities::from(&original).0;
901 for (category, offset) in &self.modifications {
902 let v = sums.entry(*category).or_default();
903 match v.checked_add_signed(*offset) {
904 Some(result) => *v = result,
905 None => emit!(
906 category,
907 "Attempted to modify original quantity {v} into the negative ({offset})"
908 ),
909 }
910 }
911
912 for (category, quantity, outcome) in &self.in_flight {
913 match sums.get_mut(category) {
914 Some(c) if *c >= *quantity => *c -= *quantity,
915 Some(c) => emit!(
916 category,
917 "Emitted {quantity} outcomes ({outcome:?}) for {category}, but there were only {c} items in the category originally"
918 ),
919 None => emit!(
920 category,
921 "Emitted {quantity} outcomes ({outcome:?}) for {category}, but there never was an item in this category"
922 ),
923 }
924 }
925
926 for (category, quantity) in &new {
927 match sums.get_mut(category) {
928 Some(c) if *c >= *quantity => *c -= *quantity,
929 Some(c) => emit!(
930 category,
931 "New item has {quantity} items in category '{category}', but original (after emitted outcomes) only has {c} left"
932 ),
933 None => emit!(
934 category,
935 "New item has {quantity} items in category '{category}', but after emitted outcomes there are none left"
936 ),
937 }
938 }
939
940 for (category, quantity) in sums {
941 if quantity > 0 {
942 emit!(
943 category,
944 "Missing outcomes or mismatched quantity in category '{category}', off by {quantity}"
945 );
946 }
947 }
948 }
949
950 #[cfg(not(debug_assertions))]
951 fn assert_quantities(&self, _: Quantities, _: Quantities) {}
952}
953
954impl<'a> Drop for RecordKeeper<'a> {
955 fn drop(&mut self) {
956 for (category, quantity) in std::mem::take(&mut self.on_drop) {
957 self.meta.track_outcome(
958 Outcome::Invalid(DiscardReason::Internal),
959 category,
960 quantity,
961 );
962 }
963 }
964}
965
966impl RecordKeeper<'_> {
967 pub fn or_default<T, E, Q>(&mut self, r: Result<T, E>, q: Q) -> T
972 where
973 T: Default,
974 E: OutcomeError,
975 Q: Counted,
976 {
977 match r {
978 Ok(result) => result,
979 Err(err) => {
980 self.reject_err(err, q);
981 T::default()
982 }
983 }
984 }
985
986 pub fn reject_err<E, Q>(&mut self, err: E, q: Q) -> E::Error
991 where
992 E: OutcomeError,
993 Q: Counted,
994 {
995 let (outcome, err) = err.consume();
996 for (category, quantity) in q.quantities() {
997 self.in_flight.push((category, quantity, outcome.clone()))
998 }
999 err
1000 }
1001
1002 #[track_caller]
1006 pub fn internal_error<E, Q>(&mut self, error: E, q: Q)
1007 where
1008 E: std::error::Error + 'static,
1009 Q: Counted,
1010 {
1011 relay_log::error!(error = &error as &dyn std::error::Error, "internal error");
1012 debug_assert!(false, "internal error: {error}");
1013 self.reject_err((Outcome::Invalid(DiscardReason::Internal), ()), q);
1014 }
1015}
1016
1017pub struct Split<I, S>
1019where
1020 I: Iterator<Item = S>,
1021 S: Counted,
1022{
1023 #[cfg(debug_assertions)]
1024 quantities: Quantities,
1025 items: I,
1026 meta: Arc<Meta>,
1027 exhausted: bool,
1028}
1029
1030impl<I, S> Split<I, S>
1031where
1032 I: Iterator<Item = S>,
1033 S: Counted,
1034{
1035 #[cfg(debug_assertions)]
1038 fn subtract(&mut self, q: Quantities) {
1039 for (category, quantities) in q {
1040 let Some(orig_quantities) = self
1041 .quantities
1042 .iter_mut()
1043 .find_map(|(c, q)| (*c == category).then_some(q))
1044 else {
1045 debug_assert!(
1046 false,
1047 "mismatching quantities, item split into category {category}, \
1048 which originally was not present"
1049 );
1050 continue;
1051 };
1052
1053 if *orig_quantities >= quantities {
1054 *orig_quantities -= quantities;
1055 } else {
1056 debug_assert!(
1057 false,
1058 "in total more items produced in category {category} than originally available"
1059 );
1060 }
1061 }
1062 }
1063}
1064
1065impl<I, S> Iterator for Split<I, S>
1066where
1067 I: Iterator<Item = S>,
1068 S: Counted,
1069{
1070 type Item = Managed<S>;
1071
1072 fn next(&mut self) -> Option<Self::Item> {
1073 let next = match self.items.next() {
1074 Some(next) => next,
1075 None => {
1076 self.exhausted = true;
1077 return None;
1078 }
1079 };
1080
1081 #[cfg(debug_assertions)]
1082 self.subtract(next.quantities());
1083
1084 Some(Managed::from_parts(next, Arc::clone(&self.meta)))
1085 }
1086}
1087
1088impl<I, S> Drop for Split<I, S>
1089where
1090 I: Iterator<Item = S>,
1091 S: Counted,
1092{
1093 fn drop(&mut self) {
1094 #[cfg(debug_assertions)]
1096 if self.exhausted {
1097 for (category, quantities) in &self.quantities {
1098 debug_assert!(
1099 *quantities == 0,
1100 "items split, but still {quantities} remaining in category {category}"
1101 );
1102 }
1103 }
1104
1105 if self.exhausted {
1106 return;
1107 }
1108
1109 for item in &mut self.items {
1115 for (category, quantity) in item.quantities() {
1116 self.meta.track_outcome(
1117 Outcome::Invalid(DiscardReason::Internal),
1118 category,
1119 quantity,
1120 );
1121 }
1122 }
1123 }
1124}
1125
1126impl<I, S> FusedIterator for Split<I, S>
1127where
1128 I: Iterator<Item = S> + FusedIterator,
1129 S: Counted,
1130{
1131}
1132
1133#[cfg(test)]
1134mod tests {
1135 use super::*;
1136
1137 struct CountedVec(Vec<u32>);
1138
1139 impl Counted for CountedVec {
1140 fn quantities(&self) -> Quantities {
1141 smallvec::smallvec![(DataCategory::Error, self.0.len())]
1142 }
1143 }
1144
1145 struct CountedValue(u32);
1146
1147 impl Counted for CountedValue {
1148 fn quantities(&self) -> Quantities {
1149 smallvec::smallvec![(DataCategory::Error, 1)]
1150 }
1151 }
1152
1153 #[test]
1154 fn test_reject_err_no_outcome() {
1155 let value = CountedVec(vec![0, 1, 2, 3, 4, 5]);
1156 let (managed, mut handle) = Managed::for_test(value).build();
1157
1158 let _ = managed.reject_err((None, ()));
1160 handle.assert_no_outcomes();
1161
1162 drop(managed);
1164 handle.assert_no_outcomes();
1165 }
1166
1167 #[test]
1168 fn test_merge() {
1169 let (mut a, mut handle_a) = Managed::for_test(CountedVec(vec![1, 2])).build();
1170 let (b, mut handle_b) = Managed::for_test(CountedVec(vec![3, 4])).build();
1171
1172 a.merge_with(b, |a, b, _| a.0.extend(b.0));
1173
1174 assert_eq!(a.0, vec![1, 2, 3, 4]);
1175 drop(a);
1176 handle_a.assert_internal_outcome(DataCategory::Error, 4);
1177 handle_b.assert_no_outcomes();
1178 }
1179
1180 #[test]
1181 fn test_merge_mismatched_records_should_panic() {
1182 let (mut a, mut handle_a) = Managed::for_test(CountedVec(vec![1, 2])).build();
1183 let (b, _handle_b) = Managed::for_test(CountedVec(vec![3, 4])).build();
1184
1185 let r = std::panic::catch_unwind(move || {
1186 a.merge_with(b, |_a, _b, _| {});
1187 });
1188
1189 assert!(
1190 r.is_err(),
1191 "expected merge to panic because of mismatched outcome records"
1192 );
1193 handle_a.assert_internal_outcome(DataCategory::Error, 2);
1194 }
1195
1196 #[test]
1197 fn test_split_fully_consumed() {
1198 let value = CountedVec(vec![0, 1, 2, 3, 4, 5]);
1199 let (managed, mut handle) = Managed::for_test(value).build();
1200
1201 let s = managed
1202 .split(|value| value.0.into_iter().map(CountedValue))
1203 .collect::<Vec<_>>();
1205
1206 handle.assert_no_outcomes();
1207
1208 for (i, s) in s.into_iter().enumerate() {
1209 assert_eq!(s.as_ref().0, i as u32);
1210 let outcome = Outcome::Invalid(DiscardReason::Cors);
1211 let _ = s.reject_err((outcome.clone(), ()));
1212 handle.assert_outcome(&outcome, DataCategory::Error, 1);
1213 }
1214 }
1215
1216 #[test]
1217 fn test_split_partially_consumed_emits_remaining() {
1218 let value = CountedVec(vec![0, 1, 2, 3, 4, 5]);
1219 let (managed, mut handle) = Managed::for_test(value).build();
1220
1221 let mut s = managed.split(|value| value.0.into_iter().map(CountedValue));
1222 handle.assert_no_outcomes();
1223
1224 drop(s.next());
1225 handle.assert_internal_outcome(DataCategory::Error, 1);
1226 drop(s.next());
1227 handle.assert_internal_outcome(DataCategory::Error, 1);
1228 drop(s.next());
1229 handle.assert_internal_outcome(DataCategory::Error, 1);
1230 handle.assert_no_outcomes();
1231
1232 drop(s);
1233
1234 handle.assert_internal_outcome(DataCategory::Error, 1);
1235 handle.assert_internal_outcome(DataCategory::Error, 1);
1236 handle.assert_internal_outcome(DataCategory::Error, 1);
1237 }
1238
1239 #[test]
1240 fn test_split_changing_quantities_should_panic() {
1241 let value = CountedVec(vec![0, 1, 2, 3, 4, 5]);
1242 let (managed, mut handle) = Managed::for_test(value).build();
1243
1244 let mut s = managed.split(|_| std::iter::once(CountedValue(0)));
1245
1246 s.next().unwrap().accept(|_| {});
1247 handle.assert_no_outcomes();
1248
1249 assert!(s.next().is_none());
1250
1251 let r = std::panic::catch_unwind(move || {
1252 drop(s);
1253 });
1254
1255 assert!(
1256 r.is_err(),
1257 "expected split to panic because of mismatched (not enough) outcomes"
1258 );
1259 }
1260
1261 #[test]
1262 fn test_split_more_outcomes_than_before_should_panic() {
1263 let value = CountedVec(vec![0]);
1264 let (managed, mut handle) = Managed::for_test(value).build();
1265
1266 let mut s = managed.split(|_| vec![CountedValue(0), CountedValue(2)].into_iter());
1267
1268 s.next().unwrap().accept(|_| {});
1269 handle.assert_no_outcomes();
1270
1271 let r = std::panic::catch_unwind(move || {
1272 s.next();
1273 });
1274
1275 assert!(
1276 r.is_err(),
1277 "expected split to panic because of mismatched (too many) outcomes"
1278 );
1279 }
1280
1281 #[test]
1282 fn test_split_changing_categories_should_panic() {
1283 struct Special;
1284 impl Counted for Special {
1285 fn quantities(&self) -> Quantities {
1286 smallvec::smallvec![(DataCategory::Error, 1), (DataCategory::Transaction, 1)]
1287 }
1288 }
1289
1290 let value = CountedVec(vec![0]);
1291 let (managed, _handle) = Managed::for_test(value).build();
1292
1293 let mut s = managed.split(|value| value.0.into_iter().map(|_| Special));
1294
1295 let r = std::panic::catch_unwind(move || {
1296 let _ = s.next();
1297 });
1298
1299 assert!(
1300 r.is_err(),
1301 "expected split to panic because of mismatched outcome categories"
1302 );
1303 }
1304
1305 #[test]
1306 fn test_split_assert_fused() {
1307 fn only_fused<T: FusedIterator>(_: T) {}
1308
1309 let (managed, mut handle) = Managed::for_test(CountedVec(vec![0])).build();
1310 only_fused(managed.split(|value| value.0.into_iter().map(CountedValue)));
1311 handle.assert_internal_outcome(DataCategory::Error, 1);
1312 }
1313}