1#[cfg(debug_assertions)]
2use std::collections::BTreeMap;
3use std::convert::Infallible;
4use std::fmt;
5use std::iter::FusedIterator;
6use std::mem::ManuallyDrop;
7use std::net::IpAddr;
8use std::sync::Arc;
9use std::sync::atomic::{AtomicBool, Ordering};
10
11use chrono::{DateTime, Utc};
12use itertools::Either;
13use relay_event_schema::protocol::EventId;
14use relay_quotas::{DataCategory, Scoping};
15use relay_system::Addr;
16use smallvec::SmallVec;
17
18use crate::Envelope;
19use crate::managed::{Counted, ManagedEnvelope, Quantities};
20use crate::services::outcome::{DiscardReason, Outcome, TrackOutcome};
21use crate::services::processor::ProcessingError;
22
23#[cfg(debug_assertions)]
24mod debug;
25#[cfg(test)]
26mod test;
27
28#[cfg(test)]
29pub use self::test::*;
30
31pub trait OutcomeError {
33 type Error;
35
36 fn consume(self) -> (Option<Outcome>, Self::Error);
40}
41
42impl OutcomeError for Outcome {
43 type Error = ();
44
45 fn consume(self) -> (Option<Outcome>, Self::Error) {
46 (self, ()).consume()
47 }
48}
49
50impl OutcomeError for Option<Outcome> {
51 type Error = ();
52
53 fn consume(self) -> (Option<Outcome>, Self::Error) {
54 (self, ()).consume()
55 }
56}
57
58impl<E> OutcomeError for (Outcome, E) {
59 type Error = E;
60
61 fn consume(self) -> (Option<Outcome>, Self::Error) {
62 (Some(self.0), self.1)
63 }
64}
65
66impl<E> OutcomeError for (Option<Outcome>, E) {
67 type Error = E;
68
69 fn consume(self) -> (Option<Outcome>, Self::Error) {
70 self
71 }
72}
73
74impl OutcomeError for ProcessingError {
75 type Error = Self;
76
77 fn consume(self) -> (Option<Outcome>, Self::Error) {
78 (self.to_outcome(), self)
79 }
80}
81
82impl OutcomeError for Infallible {
83 type Error = Self;
84
85 fn consume(self) -> (Option<Outcome>, Self::Error) {
86 match self {}
87 }
88}
89
90#[derive(Debug, Clone, Copy)]
95#[must_use = "a rejection must be propagated"]
96pub struct Rejected<T>(T);
97
98impl<T> Rejected<T> {
99 pub fn into_inner(self) -> T {
101 self.0
102 }
103
104 pub fn map<F, S>(self, f: F) -> Rejected<S>
106 where
107 F: FnOnce(T) -> S,
108 {
109 Rejected(f(self.0))
110 }
111}
112
113impl<T> std::error::Error for Rejected<T>
114where
115 T: std::error::Error,
116{
117 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
118 self.0.source()
119 }
120}
121
122impl<T> std::fmt::Display for Rejected<T>
123where
124 T: std::fmt::Display,
125{
126 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
127 self.0.fmt(f)
128 }
129}
130
131impl<T> axum::response::IntoResponse for Rejected<T>
132where
133 T: axum::response::IntoResponse,
134{
135 fn into_response(self) -> axum::response::Response {
136 self.0.into_response()
137 }
138}
139
140pub struct Managed<T: Counted> {
142 value: T,
143 meta: Arc<Meta>,
144 done: AtomicBool,
145}
146
147impl Managed<Box<Envelope>> {
148 pub fn from_envelope(envelope: Box<Envelope>, outcome_aggregator: Addr<TrackOutcome>) -> Self {
150 let meta = Arc::new(Meta {
151 outcome_aggregator,
152 received_at: envelope.received_at(),
153 scoping: envelope.meta().get_partial_scoping().into_scoping(),
154 event_id: envelope.event_id(),
155 remote_addr: envelope.meta().remote_addr(),
156 });
157
158 Self::from_parts(envelope, meta)
159 }
160}
161
162pub trait RetainMut<I> {
164 fn retain_mut(&mut self, f: impl FnMut(&mut I) -> bool);
166}
167
168impl<I> RetainMut<I> for Vec<I> {
169 fn retain_mut(&mut self, f: impl FnMut(&mut I) -> bool) {
170 Vec::retain_mut(self, f)
171 }
172}
173impl<I, const N: usize> RetainMut<I> for SmallVec<[I; N]> {
174 fn retain_mut(&mut self, f: impl FnMut(&mut I) -> bool) {
175 SmallVec::retain_mut(self, f)
176 }
177}
178
179impl<T: Counted> Managed<T> {
180 pub fn with_meta_from(envelope: &ManagedEnvelope, value: T) -> Self {
185 Self::from_parts(
186 value,
187 Arc::new(Meta {
188 outcome_aggregator: envelope.outcome_aggregator().clone(),
189 received_at: envelope.received_at(),
190 scoping: envelope.scoping(),
191 event_id: envelope.envelope().event_id(),
192 remote_addr: envelope.meta().remote_addr(),
193 }),
194 )
195 }
196
197 pub fn wrap<S>(&self, other: S) -> Managed<S>
199 where
200 S: Counted,
201 {
202 Managed::from_parts(other, Arc::clone(&self.meta))
203 }
204
205 pub fn boxed(self) -> Managed<Box<T>> {
207 self.map(|value, _| Box::new(value))
208 }
209
210 pub fn received_at(&self) -> DateTime<Utc> {
212 self.meta.received_at
213 }
214
215 pub fn scoping(&self) -> Scoping {
217 self.meta.scoping
218 }
219
220 pub fn scope(&mut self, scoping: Scoping) {
230 let meta = Arc::make_mut(&mut self.meta);
231 meta.scoping = scoping;
232 }
233
234 pub fn merge_with<S, F>(&mut self, other: Managed<S>, f: F)
239 where
240 S: Counted,
241 F: FnOnce(&mut T, S, &mut RecordKeeper),
242 {
243 self.modify(|s, records| {
244 for (category, quantity) in other.quantities() {
245 records.modify_by(category, quantity as isize);
246 }
247 other.accept(|o| f(s, o, records));
248 })
249 }
250
251 pub fn split_once<F, S, U>(self, f: F) -> (Managed<S>, Managed<U>)
257 where
258 F: FnOnce(T, &mut RecordKeeper) -> (S, U),
259 S: Counted,
260 U: Counted,
261 {
262 debug_assert!(!self.is_done());
263
264 let (value, meta) = self.destructure();
265 let quantities = value.quantities();
266
267 let mut records = RecordKeeper::new(&meta, quantities);
268
269 let (a, b) = f(value, &mut records);
270
271 let mut quantities = a.quantities();
272 quantities.extend(b.quantities());
273 records.success(quantities);
274
275 (
276 Managed::from_parts(a, Arc::clone(&meta)),
277 Managed::from_parts(b, meta),
278 )
279 }
280
281 pub fn split<F, I, S>(self, f: F) -> Split<I::IntoIter, I::Item>
286 where
287 F: FnOnce(T) -> I,
288 I: IntoIterator<Item = S>,
289 S: Counted,
290 {
291 self.split_with_context(|value| (f(value), ())).0
292 }
293
294 pub fn split_with_context<F, I, S, C>(self, f: F) -> (Split<I::IntoIter, I::Item>, C)
299 where
300 F: FnOnce(T) -> (I, C),
301 I: IntoIterator<Item = S>,
302 S: Counted,
303 {
304 debug_assert!(!self.is_done());
305
306 let (value, meta) = self.destructure();
307 #[cfg(debug_assertions)]
308 let quantities = value.quantities();
309
310 let (items, context) = f(value);
311
312 (
313 Split {
314 #[cfg(debug_assertions)]
315 quantities,
316 items: items.into_iter(),
317 meta,
318 exhausted: false,
319 },
320 context,
321 )
322 }
323
324 pub fn retain<S, I, U, E, V>(&mut self, select: S, mut retain: U)
360 where
361 S: FnOnce(&mut T) -> &mut V,
362 I: Counted,
363 U: FnMut(&mut I, &mut RecordKeeper<'_>) -> Result<(), E>,
364 E: OutcomeError,
365 V: RetainMut<I>,
366 {
367 self.retain_with_context(
368 |inner| (select(inner), &()),
369 |item, _, records| retain(item, records),
370 );
371 }
372
373 pub fn retain_with_context<S, C, I, U, E, V>(&mut self, select: S, mut retain: U)
410 where
411 for<'a> S: FnOnce(&'a mut T) -> (&'a mut V, &'a C),
416 I: Counted,
417 U: FnMut(&mut I, &C, &mut RecordKeeper<'_>) -> Result<(), E>,
418 E: OutcomeError,
419 V: RetainMut<I>,
420 {
421 self.modify(|inner, records| {
422 let (items, ctx) = select(inner);
423 items.retain_mut(|item| match retain(item, ctx, records) {
424 Ok(()) => true,
425 Err(err) => {
426 records.reject_err(err, &*item);
427 false
428 }
429 })
430 });
431 }
432
433 pub fn map<S, F>(self, f: F) -> Managed<S>
437 where
438 F: FnOnce(T, &mut RecordKeeper) -> S,
439 S: Counted,
440 {
441 self.try_map(move |inner, records| Ok::<_, Infallible>(f(inner, records)))
442 .unwrap_or_else(|e| match e.0 {})
443 }
444
445 pub fn try_map<S, F, E>(self, f: F) -> Result<Managed<S>, Rejected<E::Error>>
453 where
454 F: FnOnce(T, &mut RecordKeeper) -> Result<S, E>,
455 S: Counted,
456 E: OutcomeError,
457 {
458 debug_assert!(!self.is_done());
459
460 let (value, meta) = self.destructure();
461 let quantities = value.quantities();
462
463 let mut records = RecordKeeper::new(&meta, quantities);
464
465 match f(value, &mut records) {
466 Ok(value) => {
467 records.success(value.quantities());
468 Ok(Managed::from_parts(value, meta))
469 }
470 Err(err) => Err(records.failure(err)),
471 }
472 }
473
474 pub fn modify<F>(&mut self, f: F)
478 where
479 F: FnOnce(&mut T, &mut RecordKeeper),
480 {
481 self.try_modify(move |inner, records| {
482 f(inner, records);
483 Ok::<_, Infallible>(())
484 })
485 .unwrap_or_else(|e| match e {})
486 }
487
488 pub fn try_modify<F, E>(&mut self, f: F) -> Result<(), Rejected<E::Error>>
496 where
497 F: FnOnce(&mut T, &mut RecordKeeper) -> Result<(), E>,
498 E: OutcomeError,
499 {
500 debug_assert!(!self.is_done());
501
502 let quantities = self.value.quantities();
503 let mut records = RecordKeeper::new(&self.meta, quantities);
504
505 match f(&mut self.value, &mut records) {
506 Ok(()) => {
507 records.success(self.value.quantities());
508 Ok(())
509 }
510 Err(err) => {
511 let err = records.failure(err);
512 self.done.store(true, Ordering::Relaxed);
513 Err(err)
514 }
515 }
516 }
517
518 pub fn accept<F, S>(self, f: F) -> S
526 where
527 F: FnOnce(T) -> S,
528 {
529 self.try_accept(|item| Ok::<_, Infallible>(f(item)))
530 .unwrap_or_else(|err| match err.0 {})
531 }
532
533 pub fn try_accept<F, S, E>(self, f: F) -> Result<S, Rejected<E::Error>>
541 where
542 F: FnOnce(T) -> Result<S, E>,
543 E: OutcomeError,
544 {
545 debug_assert!(!self.is_done());
546
547 let (value, meta) = self.destructure();
548 let records = RecordKeeper::new(&meta, value.quantities());
549
550 match f(value) {
551 Ok(value) => {
552 records.accept();
553 Ok(value)
554 }
555 Err(err) => Err(records.failure(err)),
556 }
557 }
558
559 #[track_caller]
567 pub fn internal_error(&self, reason: &'static str) -> Rejected<()> {
568 relay_log::error!("internal error: {reason}");
569 debug_assert!(false, "internal error: {reason}");
570 self.reject_err((Outcome::Invalid(DiscardReason::Internal), ()))
571 }
572
573 pub fn reject_err<E>(&self, error: E) -> Rejected<E::Error>
575 where
576 E: OutcomeError,
577 {
578 debug_assert!(!self.is_done());
579
580 let (outcome, error) = error.consume();
581 self.do_reject(outcome);
582 Rejected(error)
583 }
584
585 fn do_reject(&self, outcome: Option<Outcome>) {
586 let is_done = self.done.fetch_or(true, Ordering::Relaxed);
589
590 let Some(outcome) = outcome else {
592 return;
593 };
594
595 if !is_done {
601 for (category, quantity) in self.value.quantities() {
602 self.meta.track_outcome(outcome.clone(), category, quantity);
603 }
604 }
605 }
606
607 fn destructure(self) -> (T, Arc<Meta>) {
614 let this = ManuallyDrop::new(self);
624 let Managed { value, meta, done } = &*this;
625
626 let value = unsafe { std::ptr::read(value) };
627 let meta = unsafe { std::ptr::read(meta) };
628 let done = unsafe { std::ptr::read(done) };
629 debug_assert!(
632 !done.load(Ordering::Relaxed),
633 "a `done` managed should never be destructured"
634 );
635
636 (value, meta)
637 }
638
639 fn from_parts(value: T, meta: Arc<Meta>) -> Self {
640 Self {
641 value,
642 meta,
643 done: AtomicBool::new(false),
644 }
645 }
646
647 fn is_done(&self) -> bool {
648 self.done.load(Ordering::Relaxed)
649 }
650}
651
652impl<T: Counted> Drop for Managed<T> {
653 fn drop(&mut self) {
654 self.do_reject(Some(Outcome::Invalid(DiscardReason::Internal)));
655 }
656}
657
658impl<T: Counted + fmt::Debug> fmt::Debug for Managed<T> {
659 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
660 write!(f, "Managed<{}>[", std::any::type_name::<T>())?;
661 for (i, (category, quantity)) in self.value.quantities().iter().enumerate() {
662 if i > 0 {
663 write!(f, ",")?;
664 }
665 write!(f, "{category}:{quantity}")?;
666 }
667 write!(f, "](")?;
668 self.value.fmt(f)?;
669 write!(f, ")")
670 }
671}
672
673impl<T: Counted> Managed<Option<T>> {
674 pub fn transpose(self) -> Option<Managed<T>> {
676 let (o, meta) = self.destructure();
677 o.map(|t| Managed::from_parts(t, meta))
678 }
679}
680
681impl<L: Counted, R: Counted> Managed<Either<L, R>> {
682 pub fn transpose(self) -> Either<Managed<L>, Managed<R>> {
684 let (either, meta) = self.destructure();
685 match either {
686 Either::Left(value) => Either::Left(Managed::from_parts(value, meta)),
687 Either::Right(value) => Either::Right(Managed::from_parts(value, meta)),
688 }
689 }
690}
691
692impl From<Managed<Box<Envelope>>> for ManagedEnvelope {
693 fn from(value: Managed<Box<Envelope>>) -> Self {
694 let (value, meta) = value.destructure();
695 let mut envelope = ManagedEnvelope::new(value, meta.outcome_aggregator.clone());
696 envelope.scope(meta.scoping);
697 envelope
698 }
699}
700
701impl<T: Counted> AsRef<T> for Managed<T> {
702 fn as_ref(&self) -> &T {
703 &self.value
704 }
705}
706
707impl<T: Counted> std::ops::Deref for Managed<T> {
708 type Target = T;
709
710 fn deref(&self) -> &Self::Target {
711 &self.value
712 }
713}
714
715#[derive(Debug, Clone)]
717struct Meta {
718 outcome_aggregator: Addr<TrackOutcome>,
720 received_at: DateTime<Utc>,
724 scoping: Scoping,
726 event_id: Option<EventId>,
728 remote_addr: Option<IpAddr>,
730}
731
732impl Meta {
733 pub fn track_outcome(&self, outcome: Outcome, category: DataCategory, quantity: usize) {
734 self.outcome_aggregator.send(TrackOutcome {
735 timestamp: self.received_at,
736 scoping: self.scoping,
737 outcome,
738 event_id: self.event_id,
739 remote_addr: self.remote_addr,
740 category,
741 quantity: quantity.try_into().unwrap_or(u32::MAX),
742 });
743 }
744}
745
746pub struct RecordKeeper<'a> {
749 meta: &'a Meta,
750 on_drop: Quantities,
751 #[cfg(debug_assertions)]
752 lenient: SmallVec<[DataCategory; 1]>,
753 #[cfg(debug_assertions)]
754 modifications: BTreeMap<DataCategory, isize>,
755 in_flight: SmallVec<[(DataCategory, usize, Option<Outcome>); 2]>,
756}
757
758impl<'a> RecordKeeper<'a> {
759 fn new(meta: &'a Meta, quantities: Quantities) -> Self {
760 Self {
761 meta,
762 on_drop: quantities,
763 #[cfg(debug_assertions)]
764 lenient: Default::default(),
765 #[cfg(debug_assertions)]
766 modifications: Default::default(),
767 in_flight: Default::default(),
768 }
769 }
770
771 pub fn lenient(&mut self, category: DataCategory) {
778 let _category = category;
779 #[cfg(debug_assertions)]
780 self.lenient.push(_category);
781 }
782
783 pub fn modify_by(&mut self, category: DataCategory, offset: isize) {
791 let _category = category;
792 let _offset = offset;
793 #[cfg(debug_assertions)]
794 {
795 *self.modifications.entry(_category).or_default() += offset;
796 }
797 }
798
799 fn failure<E>(mut self, error: E) -> Rejected<E::Error>
803 where
804 E: OutcomeError,
805 {
806 let (outcome, error) = error.consume();
807
808 if let Some(outcome) = outcome {
809 for (category, quantity) in std::mem::take(&mut self.on_drop) {
810 self.meta.track_outcome(outcome.clone(), category, quantity);
811 }
812 }
813
814 Rejected(error)
815 }
816
817 fn accept(mut self) {
825 debug_assert!(
826 self.in_flight.is_empty(),
827 "records accepted, but intermediate outcomes tracked"
828 );
829 self.on_drop.clear();
830 }
831
832 fn success(mut self, new: Quantities) {
838 let original = std::mem::take(&mut self.on_drop);
839 self.assert_quantities(original, new);
840
841 self.on_drop.clear();
842 for (category, quantity, outcome) in std::mem::take(&mut self.in_flight) {
843 if let Some(outcome) = outcome {
844 self.meta.track_outcome(outcome, category, quantity);
845 }
846 }
847 }
848
849 #[cfg(debug_assertions)]
854 fn assert_quantities(&self, original: Quantities, new: Quantities) {
855 macro_rules! emit {
856 ($category:expr, $($tt:tt)*) => {{
857 match self.lenient.contains(&$category) {
858 true => relay_log::debug!($($tt)*),
861 false => {
862 relay_log::error!("Original: {original:?}");
863 relay_log::error!("New: {new:?}");
864 relay_log::error!("Modifications: {:?}", self.modifications);
865 relay_log::error!("In Flight: {:?}", self.in_flight);
866 panic!($($tt)*)
867 }
868 }
869 }};
870 }
871
872 let mut sums = debug::Quantities::from(&original).0;
873 for (category, offset) in &self.modifications {
874 let v = sums.entry(*category).or_default();
875 match v.checked_add_signed(*offset) {
876 Some(result) => *v = result,
877 None => emit!(
878 category,
879 "Attempted to modify original quantity {v} into the negative ({offset})"
880 ),
881 }
882 }
883
884 for (category, quantity, outcome) in &self.in_flight {
885 match sums.get_mut(category) {
886 Some(c) if *c >= *quantity => *c -= *quantity,
887 Some(c) => emit!(
888 category,
889 "Emitted {quantity} outcomes ({outcome:?}) for {category}, but there were only {c} items in the category originally"
890 ),
891 None => emit!(
892 category,
893 "Emitted {quantity} outcomes ({outcome:?}) for {category}, but there never was an item in this category"
894 ),
895 }
896 }
897
898 for (category, quantity) in &new {
899 match sums.get_mut(category) {
900 Some(c) if *c >= *quantity => *c -= *quantity,
901 Some(c) => emit!(
902 category,
903 "New item has {quantity} items in category '{category}', but original (after emitted outcomes) only has {c} left"
904 ),
905 None => emit!(
906 category,
907 "New item has {quantity} items in category '{category}', but after emitted outcomes there are none left"
908 ),
909 }
910 }
911
912 for (category, quantity) in sums {
913 if quantity > 0 {
914 emit!(
915 category,
916 "Missing outcomes or mismatched quantity in category '{category}', off by {quantity}"
917 );
918 }
919 }
920 }
921
922 #[cfg(not(debug_assertions))]
923 fn assert_quantities(&self, _: Quantities, _: Quantities) {}
924}
925
926impl<'a> Drop for RecordKeeper<'a> {
927 fn drop(&mut self) {
928 for (category, quantity) in std::mem::take(&mut self.on_drop) {
929 self.meta.track_outcome(
930 Outcome::Invalid(DiscardReason::Internal),
931 category,
932 quantity,
933 );
934 }
935 }
936}
937
938impl RecordKeeper<'_> {
939 pub fn or_default<T, E, Q>(&mut self, r: Result<T, E>, q: Q) -> T
944 where
945 T: Default,
946 E: OutcomeError,
947 Q: Counted,
948 {
949 match r {
950 Ok(result) => result,
951 Err(err) => {
952 self.reject_err(err, q);
953 T::default()
954 }
955 }
956 }
957
958 pub fn reject_err<E, Q>(&mut self, err: E, q: Q) -> E::Error
963 where
964 E: OutcomeError,
965 Q: Counted,
966 {
967 let (outcome, err) = err.consume();
968 for (category, quantity) in q.quantities() {
969 self.in_flight.push((category, quantity, outcome.clone()))
970 }
971 err
972 }
973
974 #[track_caller]
978 pub fn internal_error<E, Q>(&mut self, error: E, q: Q)
979 where
980 E: std::error::Error + 'static,
981 Q: Counted,
982 {
983 relay_log::error!(error = &error as &dyn std::error::Error, "internal error");
984 debug_assert!(false, "internal error: {error}");
985 self.reject_err((Outcome::Invalid(DiscardReason::Internal), ()), q);
986 }
987}
988
989pub struct Split<I, S>
991where
992 I: Iterator<Item = S>,
993 S: Counted,
994{
995 #[cfg(debug_assertions)]
996 quantities: Quantities,
997 items: I,
998 meta: Arc<Meta>,
999 exhausted: bool,
1000}
1001
1002impl<I, S> Split<I, S>
1003where
1004 I: Iterator<Item = S>,
1005 S: Counted,
1006{
1007 #[cfg(debug_assertions)]
1010 fn subtract(&mut self, q: Quantities) {
1011 for (category, quantities) in q {
1012 let Some(orig_quantities) = self
1013 .quantities
1014 .iter_mut()
1015 .find_map(|(c, q)| (*c == category).then_some(q))
1016 else {
1017 debug_assert!(
1018 false,
1019 "mismatching quantities, item split into category {category}, \
1020 which originally was not present"
1021 );
1022 continue;
1023 };
1024
1025 if *orig_quantities >= quantities {
1026 *orig_quantities -= quantities;
1027 } else {
1028 debug_assert!(
1029 false,
1030 "in total more items produced in category {category} than originally available"
1031 );
1032 }
1033 }
1034 }
1035}
1036
1037impl<I, S> Iterator for Split<I, S>
1038where
1039 I: Iterator<Item = S>,
1040 S: Counted,
1041{
1042 type Item = Managed<S>;
1043
1044 fn next(&mut self) -> Option<Self::Item> {
1045 let next = match self.items.next() {
1046 Some(next) => next,
1047 None => {
1048 self.exhausted = true;
1049 return None;
1050 }
1051 };
1052
1053 #[cfg(debug_assertions)]
1054 self.subtract(next.quantities());
1055
1056 Some(Managed::from_parts(next, Arc::clone(&self.meta)))
1057 }
1058}
1059
1060impl<I, S> Drop for Split<I, S>
1061where
1062 I: Iterator<Item = S>,
1063 S: Counted,
1064{
1065 fn drop(&mut self) {
1066 #[cfg(debug_assertions)]
1068 if self.exhausted {
1069 for (category, quantities) in &self.quantities {
1070 debug_assert!(
1071 *quantities == 0,
1072 "items split, but still {quantities} remaining in category {category}"
1073 );
1074 }
1075 }
1076
1077 if self.exhausted {
1078 return;
1079 }
1080
1081 for item in &mut self.items {
1087 for (category, quantity) in item.quantities() {
1088 self.meta.track_outcome(
1089 Outcome::Invalid(DiscardReason::Internal),
1090 category,
1091 quantity,
1092 );
1093 }
1094 }
1095 }
1096}
1097
1098impl<I, S> FusedIterator for Split<I, S>
1099where
1100 I: Iterator<Item = S> + FusedIterator,
1101 S: Counted,
1102{
1103}
1104
1105#[cfg(test)]
1106mod tests {
1107
1108 use super::*;
1109
1110 struct CountedVec(Vec<u32>);
1111
1112 impl Counted for CountedVec {
1113 fn quantities(&self) -> Quantities {
1114 smallvec::smallvec![(DataCategory::Error, self.0.len())]
1115 }
1116 }
1117
1118 struct CountedValue(u32);
1119
1120 impl Counted for CountedValue {
1121 fn quantities(&self) -> Quantities {
1122 smallvec::smallvec![(DataCategory::Error, 1)]
1123 }
1124 }
1125
1126 #[test]
1127 fn test_reject_err_no_outcome() {
1128 let value = CountedVec(vec![0, 1, 2, 3, 4, 5]);
1129 let (managed, mut handle) = Managed::for_test(value).build();
1130
1131 let _ = managed.reject_err((None, ()));
1133 handle.assert_no_outcomes();
1134
1135 drop(managed);
1137 handle.assert_no_outcomes();
1138 }
1139
1140 #[test]
1141 fn test_merge() {
1142 let (mut a, mut handle_a) = Managed::for_test(CountedVec(vec![1, 2])).build();
1143 let (b, mut handle_b) = Managed::for_test(CountedVec(vec![3, 4])).build();
1144
1145 a.merge_with(b, |a, b, _| a.0.extend(b.0));
1146
1147 assert_eq!(a.0, vec![1, 2, 3, 4]);
1148 drop(a);
1149 handle_a.assert_internal_outcome(DataCategory::Error, 4);
1150 handle_b.assert_no_outcomes();
1151 }
1152
1153 #[test]
1154 fn test_merge_mismatched_records_should_panic() {
1155 let (mut a, mut handle_a) = Managed::for_test(CountedVec(vec![1, 2])).build();
1156 let (b, _handle_b) = Managed::for_test(CountedVec(vec![3, 4])).build();
1157
1158 let r = std::panic::catch_unwind(move || {
1159 a.merge_with(b, |_a, _b, _| {});
1160 });
1161
1162 assert!(
1163 r.is_err(),
1164 "expected merge to panic because of mismatched outcome records"
1165 );
1166 handle_a.assert_internal_outcome(DataCategory::Error, 2);
1167 }
1168
1169 #[test]
1170 fn test_split_fully_consumed() {
1171 let value = CountedVec(vec![0, 1, 2, 3, 4, 5]);
1172 let (managed, mut handle) = Managed::for_test(value).build();
1173
1174 let s = managed
1175 .split(|value| value.0.into_iter().map(CountedValue))
1176 .collect::<Vec<_>>();
1178
1179 handle.assert_no_outcomes();
1180
1181 for (i, s) in s.into_iter().enumerate() {
1182 assert_eq!(s.as_ref().0, i as u32);
1183 let outcome = Outcome::Invalid(DiscardReason::Cors);
1184 let _ = s.reject_err((outcome.clone(), ()));
1185 handle.assert_outcome(&outcome, DataCategory::Error, 1);
1186 }
1187 }
1188
1189 #[test]
1190 fn test_split_partially_consumed_emits_remaining() {
1191 let value = CountedVec(vec![0, 1, 2, 3, 4, 5]);
1192 let (managed, mut handle) = Managed::for_test(value).build();
1193
1194 let mut s = managed.split(|value| value.0.into_iter().map(CountedValue));
1195 handle.assert_no_outcomes();
1196
1197 drop(s.next());
1198 handle.assert_internal_outcome(DataCategory::Error, 1);
1199 drop(s.next());
1200 handle.assert_internal_outcome(DataCategory::Error, 1);
1201 drop(s.next());
1202 handle.assert_internal_outcome(DataCategory::Error, 1);
1203 handle.assert_no_outcomes();
1204
1205 drop(s);
1206
1207 handle.assert_internal_outcome(DataCategory::Error, 1);
1208 handle.assert_internal_outcome(DataCategory::Error, 1);
1209 handle.assert_internal_outcome(DataCategory::Error, 1);
1210 }
1211
1212 #[test]
1213 fn test_split_changing_quantities_should_panic() {
1214 let value = CountedVec(vec![0, 1, 2, 3, 4, 5]);
1215 let (managed, mut handle) = Managed::for_test(value).build();
1216
1217 let mut s = managed.split(|_| std::iter::once(CountedValue(0)));
1218
1219 s.next().unwrap().accept(|_| {});
1220 handle.assert_no_outcomes();
1221
1222 assert!(s.next().is_none());
1223
1224 let r = std::panic::catch_unwind(move || {
1225 drop(s);
1226 });
1227
1228 assert!(
1229 r.is_err(),
1230 "expected split to panic because of mismatched (not enough) outcomes"
1231 );
1232 }
1233
1234 #[test]
1235 fn test_split_more_outcomes_than_before_should_panic() {
1236 let value = CountedVec(vec![0]);
1237 let (managed, mut handle) = Managed::for_test(value).build();
1238
1239 let mut s = managed.split(|_| vec![CountedValue(0), CountedValue(2)].into_iter());
1240
1241 s.next().unwrap().accept(|_| {});
1242 handle.assert_no_outcomes();
1243
1244 let r = std::panic::catch_unwind(move || {
1245 s.next();
1246 });
1247
1248 assert!(
1249 r.is_err(),
1250 "expected split to panic because of mismatched (too many) outcomes"
1251 );
1252 }
1253
1254 #[test]
1255 fn test_split_changing_categories_should_panic() {
1256 struct Special;
1257 impl Counted for Special {
1258 fn quantities(&self) -> Quantities {
1259 smallvec::smallvec![(DataCategory::Error, 1), (DataCategory::Transaction, 1)]
1260 }
1261 }
1262
1263 let value = CountedVec(vec![0]);
1264 let (managed, _handle) = Managed::for_test(value).build();
1265
1266 let mut s = managed.split(|value| value.0.into_iter().map(|_| Special));
1267
1268 let r = std::panic::catch_unwind(move || {
1269 let _ = s.next();
1270 });
1271
1272 assert!(
1273 r.is_err(),
1274 "expected split to panic because of mismatched outcome categories"
1275 );
1276 }
1277
1278 #[test]
1279 fn test_split_assert_fused() {
1280 fn only_fused<T: FusedIterator>(_: T) {}
1281
1282 let (managed, mut handle) = Managed::for_test(CountedVec(vec![0])).build();
1283 only_fused(managed.split(|value| value.0.into_iter().map(CountedValue)));
1284 handle.assert_internal_outcome(DataCategory::Error, 1);
1285 }
1286}