1#[cfg(debug_assertions)]
2use std::collections::BTreeMap;
3use std::convert::Infallible;
4use std::fmt;
5use std::iter::FusedIterator;
6use std::mem::ManuallyDrop;
7use std::net::IpAddr;
8use std::sync::Arc;
9use std::sync::atomic::{AtomicBool, Ordering};
10
11use chrono::{DateTime, Utc};
12use itertools::Either;
13use relay_event_schema::protocol::EventId;
14use relay_quotas::{DataCategory, Scoping};
15use relay_system::Addr;
16use smallvec::SmallVec;
17
18use crate::Envelope;
19use crate::managed::{Counted, ManagedEnvelope, Quantities};
20use crate::services::outcome::{DiscardReason, Outcome, TrackOutcome};
21use crate::services::processor::ProcessingError;
22
23#[cfg(debug_assertions)]
24mod debug;
25#[cfg(test)]
26mod test;
27
28#[cfg(test)]
29pub use self::test::*;
30
/// An error that can be split into an optional [`Outcome`] and a residual error value.
///
/// Implementations in this file cover bare outcomes, `(outcome, error)` tuples,
/// [`ProcessingError`] and [`Infallible`].
pub trait OutcomeError {
    /// The error value that remains once the outcome has been split off.
    type Error;

    /// Consumes the error and returns the outcome to track (if any) together with the
    /// remaining error value.
    fn consume(self) -> (Option<Outcome>, Self::Error);
}
41
42impl OutcomeError for Outcome {
43 type Error = ();
44
45 fn consume(self) -> (Option<Outcome>, Self::Error) {
46 (self, ()).consume()
47 }
48}
49
50impl OutcomeError for Option<Outcome> {
51 type Error = ();
52
53 fn consume(self) -> (Option<Outcome>, Self::Error) {
54 (self, ()).consume()
55 }
56}
57
58impl<E> OutcomeError for (Outcome, E) {
59 type Error = E;
60
61 fn consume(self) -> (Option<Outcome>, Self::Error) {
62 (Some(self.0), self.1)
63 }
64}
65
66impl<E> OutcomeError for (Option<Outcome>, E) {
67 type Error = E;
68
69 fn consume(self) -> (Option<Outcome>, Self::Error) {
70 self
71 }
72}
73
74impl OutcomeError for ProcessingError {
75 type Error = Self;
76
77 fn consume(self) -> (Option<Outcome>, Self::Error) {
78 (self.to_outcome(), self)
79 }
80}
81
/// [`Infallible`] has no values, so this implementation can never actually be invoked.
///
/// It exists so that infallible closures can be routed through the fallible code paths.
impl OutcomeError for Infallible {
    type Error = Self;

    fn consume(self) -> (Option<Outcome>, Self::Error) {
        // An empty match on an uninhabited type is exhaustive.
        match self {}
    }
}
89
/// Wrapper around the residual error of a rejected item.
///
/// Within this file it is only constructed after the error's outcome has been split off via
/// [`OutcomeError::consume`]. The `must_use` attribute makes ignoring a rejection a warning.
#[derive(Debug, Clone, Copy)]
#[must_use = "a rejection must be propagated"]
pub struct Rejected<T>(T);
97
98impl<T> Rejected<T> {
99 pub fn into_inner(self) -> T {
101 self.0
102 }
103
104 pub fn map<F, S>(self, f: F) -> Rejected<S>
106 where
107 F: FnOnce(T) -> S,
108 {
109 Rejected(f(self.0))
110 }
111}
112
113impl<T> std::error::Error for Rejected<T>
114where
115 T: std::error::Error,
116{
117 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
118 self.0.source()
119 }
120}
121
122impl<T> std::fmt::Display for Rejected<T>
123where
124 T: std::fmt::Display,
125{
126 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
127 self.0.fmt(f)
128 }
129}
130
131impl<T> axum::response::IntoResponse for Rejected<T>
132where
133 T: axum::response::IntoResponse,
134{
135 fn into_response(self) -> axum::response::Response {
136 self.0.into_response()
137 }
138}
139
/// A [`Counted`] value paired with the metadata required to track outcomes for it.
///
/// Dropping a `Managed` that has not been marked as done tracks an
/// `Outcome::Invalid(DiscardReason::Internal)` for each of its quantities (see the `Drop`
/// implementation).
pub struct Managed<T: Counted> {
    /// The wrapped value.
    value: T,
    /// Metadata used when tracking outcomes; shared with values created via `wrap`/`split`.
    meta: Arc<Meta>,
    /// Set once the value has been rejected, which keeps `do_reject` from emitting twice.
    done: AtomicBool,
}
146
147impl Managed<Box<Envelope>> {
148 pub fn from_envelope(envelope: Box<Envelope>, outcome_aggregator: Addr<TrackOutcome>) -> Self {
150 let meta = Arc::new(Meta {
151 outcome_aggregator,
152 received_at: envelope.received_at(),
153 scoping: envelope.meta().get_partial_scoping().into_scoping(),
154 event_id: envelope.event_id(),
155 remote_addr: envelope.meta().remote_addr(),
156 });
157
158 Self::from_parts(envelope, meta)
159 }
160}
161
162impl<T: Counted> Managed<T> {
163 pub fn with_meta_from(envelope: &ManagedEnvelope, value: T) -> Self {
168 Self::from_parts(
169 value,
170 Arc::new(Meta {
171 outcome_aggregator: envelope.outcome_aggregator().clone(),
172 received_at: envelope.received_at(),
173 scoping: envelope.scoping(),
174 event_id: envelope.envelope().event_id(),
175 remote_addr: envelope.meta().remote_addr(),
176 }),
177 )
178 }
179
180 pub fn wrap<S>(&self, other: S) -> Managed<S>
182 where
183 S: Counted,
184 {
185 Managed::from_parts(other, Arc::clone(&self.meta))
186 }
187
188 pub fn boxed(self) -> Managed<Box<T>> {
190 self.map(|value, _| Box::new(value))
191 }
192
    /// Returns the `received_at` timestamp stored in the metadata.
    ///
    /// This timestamp is also used as the timestamp of tracked outcomes.
    pub fn received_at(&self) -> DateTime<Utc> {
        self.meta.received_at
    }
197
    /// Returns the scoping stored in the metadata, which is attached to tracked outcomes.
    pub fn scoping(&self) -> Scoping {
        self.meta.scoping
    }
202
203 pub fn scope(&mut self, scoping: Scoping) {
213 let meta = Arc::make_mut(&mut self.meta);
214 meta.scoping = scoping;
215 }
216
217 pub fn split_once<F, S, U>(self, f: F) -> (Managed<S>, Managed<U>)
223 where
224 F: FnOnce(T, &mut RecordKeeper) -> (S, U),
225 S: Counted,
226 U: Counted,
227 {
228 debug_assert!(!self.is_done());
229
230 let (value, meta) = self.destructure();
231 let quantities = value.quantities();
232
233 let mut records = RecordKeeper::new(&meta, quantities);
234
235 let (a, b) = f(value, &mut records);
236
237 let mut quantities = a.quantities();
238 quantities.extend(b.quantities());
239 records.success(quantities);
240
241 (
242 Managed::from_parts(a, Arc::clone(&meta)),
243 Managed::from_parts(b, meta),
244 )
245 }
246
247 pub fn split<F, I, S>(self, f: F) -> Split<I::IntoIter, I::Item>
252 where
253 F: FnOnce(T) -> I,
254 I: IntoIterator<Item = S>,
255 S: Counted,
256 {
257 self.split_with_context(|value| (f(value), ())).0
258 }
259
260 pub fn split_with_context<F, I, S, C>(self, f: F) -> (Split<I::IntoIter, I::Item>, C)
265 where
266 F: FnOnce(T) -> (I, C),
267 I: IntoIterator<Item = S>,
268 S: Counted,
269 {
270 debug_assert!(!self.is_done());
271
272 let (value, meta) = self.destructure();
273 #[cfg(debug_assertions)]
274 let quantities = value.quantities();
275
276 let (items, context) = f(value);
277
278 (
279 Split {
280 #[cfg(debug_assertions)]
281 quantities,
282 items: items.into_iter(),
283 meta,
284 exhausted: false,
285 },
286 context,
287 )
288 }
289
    /// Filters a `Vec` inside the managed value, rejecting every item for which `retain`
    /// returns an error.
    ///
    /// `select` picks the vector to filter. `retain` is called once per item; on `Err` the
    /// item is removed and the error's outcome is recorded for the item's quantities.
    ///
    /// Delegates to [`Self::retain_with_context`] with a unit context.
    pub fn retain<S, I, U, E>(&mut self, select: S, mut retain: U)
    where
        S: FnOnce(&mut T) -> &mut Vec<I>,
        I: Counted,
        U: FnMut(&mut I, &mut RecordKeeper<'_>) -> Result<(), E>,
        E: OutcomeError,
    {
        self.retain_with_context(
            |inner| (select(inner), &()),
            |item, _, records| retain(item, records),
        );
    }
337
338 pub fn retain_with_context<S, C, I, U, E>(&mut self, select: S, mut retain: U)
375 where
376 for<'a> S: FnOnce(&'a mut T) -> (&'a mut Vec<I>, &'a C),
381 I: Counted,
382 U: FnMut(&mut I, &C, &mut RecordKeeper<'_>) -> Result<(), E>,
383 E: OutcomeError,
384 {
385 self.modify(|inner, records| {
386 let (items, ctx) = select(inner);
387 items.retain_mut(|item| match retain(item, ctx, records) {
388 Ok(()) => true,
389 Err(err) => {
390 records.reject_err(err, &*item);
391 false
392 }
393 })
394 });
395 }
396
397 pub fn map<S, F>(self, f: F) -> Managed<S>
401 where
402 F: FnOnce(T, &mut RecordKeeper) -> S,
403 S: Counted,
404 {
405 self.try_map(move |inner, records| Ok::<_, Infallible>(f(inner, records)))
406 .unwrap_or_else(|e| match e.0 {})
407 }
408
409 pub fn try_map<S, F, E>(self, f: F) -> Result<Managed<S>, Rejected<E::Error>>
417 where
418 F: FnOnce(T, &mut RecordKeeper) -> Result<S, E>,
419 S: Counted,
420 E: OutcomeError,
421 {
422 debug_assert!(!self.is_done());
423
424 let (value, meta) = self.destructure();
425 let quantities = value.quantities();
426
427 let mut records = RecordKeeper::new(&meta, quantities);
428
429 match f(value, &mut records) {
430 Ok(value) => {
431 records.success(value.quantities());
432 Ok(Managed::from_parts(value, meta))
433 }
434 Err(err) => Err(records.failure(err)),
435 }
436 }
437
438 pub fn modify<F>(&mut self, f: F)
442 where
443 F: FnOnce(&mut T, &mut RecordKeeper),
444 {
445 self.try_modify(move |inner, records| {
446 f(inner, records);
447 Ok::<_, Infallible>(())
448 })
449 .unwrap_or_else(|e| match e {})
450 }
451
452 pub fn try_modify<F, E>(&mut self, f: F) -> Result<(), Rejected<E::Error>>
460 where
461 F: FnOnce(&mut T, &mut RecordKeeper) -> Result<(), E>,
462 E: OutcomeError,
463 {
464 debug_assert!(!self.is_done());
465
466 let quantities = self.value.quantities();
467 let mut records = RecordKeeper::new(&self.meta, quantities);
468
469 match f(&mut self.value, &mut records) {
470 Ok(()) => {
471 records.success(self.value.quantities());
472 Ok(())
473 }
474 Err(err) => {
475 let err = records.failure(err);
476 self.done.store(true, Ordering::Relaxed);
477 Err(err)
478 }
479 }
480 }
481
482 pub fn accept<F, S>(self, f: F) -> S
490 where
491 F: FnOnce(T) -> S,
492 {
493 self.try_accept(|item| Ok::<_, Infallible>(f(item)))
494 .unwrap_or_else(|err| match err.0 {})
495 }
496
497 pub fn try_accept<F, S, E>(self, f: F) -> Result<S, Rejected<E::Error>>
505 where
506 F: FnOnce(T) -> Result<S, E>,
507 E: OutcomeError,
508 {
509 debug_assert!(!self.is_done());
510
511 let (value, meta) = self.destructure();
512 let records = RecordKeeper::new(&meta, value.quantities());
513
514 match f(value) {
515 Ok(value) => {
516 records.accept();
517 Ok(value)
518 }
519 Err(err) => Err(records.failure(err)),
520 }
521 }
522
523 #[track_caller]
531 pub fn internal_error(&self, reason: &'static str) -> Rejected<()> {
532 relay_log::error!("internal error: {reason}");
533 debug_assert!(false, "internal error: {reason}");
534 self.reject_err((Outcome::Invalid(DiscardReason::Internal), ()))
535 }
536
537 pub fn reject_err<E>(&self, error: E) -> Rejected<E::Error>
539 where
540 E: OutcomeError,
541 {
542 debug_assert!(!self.is_done());
543
544 let (outcome, error) = error.consume();
545 self.do_reject(outcome);
546 Rejected(error)
547 }
548
549 fn do_reject(&self, outcome: Option<Outcome>) {
550 let is_done = self.done.fetch_or(true, Ordering::Relaxed);
553
554 let Some(outcome) = outcome else {
556 return;
557 };
558
559 if !is_done {
565 for (category, quantity) in self.value.quantities() {
566 self.meta.track_outcome(outcome.clone(), category, quantity);
567 }
568 }
569 }
570
    /// Moves the value and metadata out of the instance without running its `Drop`
    /// implementation, so no outcome is tracked for the consumed instance.
    fn destructure(self) -> (T, Arc<Meta>) {
        // `Managed` implements `Drop`, so its fields cannot be moved out directly. Wrapping it
        // in `ManuallyDrop` suppresses the drop handler; the fields are then read out below.
        let this = ManuallyDrop::new(self);
        let Managed { value, meta, done } = &*this;

        // SAFETY: the references point to valid, initialized fields of `this`. `this` is
        // wrapped in `ManuallyDrop` and never accessed again, and each field is read exactly
        // once, so ownership moves to the new bindings without creating duplicates.
        let value = unsafe { std::ptr::read(value) };
        // SAFETY: see the read of `value` above.
        let meta = unsafe { std::ptr::read(meta) };
        // SAFETY: see the read of `value` above.
        let done = unsafe { std::ptr::read(done) };
        // The `done` state is not returned to the caller, so destructuring an instance which
        // is already done would lose that information.
        debug_assert!(
            !done.load(Ordering::Relaxed),
            "a `done` managed should never be destructured"
        );

        (value, meta)
    }
602
603 fn from_parts(value: T, meta: Arc<Meta>) -> Self {
604 Self {
605 value,
606 meta,
607 done: AtomicBool::new(false),
608 }
609 }
610
    /// Returns `true` once the instance has been rejected (the `done` flag is set).
    fn is_done(&self) -> bool {
        self.done.load(Ordering::Relaxed)
    }
614}
615
616impl<T: Counted> Drop for Managed<T> {
617 fn drop(&mut self) {
618 self.do_reject(Some(Outcome::Invalid(DiscardReason::Internal)));
619 }
620}
621
622impl<T: Counted + fmt::Debug> fmt::Debug for Managed<T> {
623 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
624 write!(f, "Managed<{}>[", std::any::type_name::<T>())?;
625 for (i, (category, quantity)) in self.value.quantities().iter().enumerate() {
626 if i > 0 {
627 write!(f, ",")?;
628 }
629 write!(f, "{category}:{quantity}")?;
630 }
631 write!(f, "](")?;
632 self.value.fmt(f)?;
633 write!(f, ")")
634 }
635}
636
637impl<T: Counted> Managed<Option<T>> {
638 pub fn transpose(self) -> Option<Managed<T>> {
640 let (o, meta) = self.destructure();
641 o.map(|t| Managed::from_parts(t, meta))
642 }
643}
644
impl<L: Counted, R: Counted> Managed<Either<L, R>> {
    /// Turns a managed [`Either`] into an [`Either`] of managed values.
    ///
    /// The metadata of the original instance is moved into whichever side is present.
    pub fn transpose(self) -> Either<Managed<L>, Managed<R>> {
        let (either, meta) = self.destructure();
        match either {
            Either::Left(value) => Either::Left(Managed::from_parts(value, meta)),
            Either::Right(value) => Either::Right(Managed::from_parts(value, meta)),
        }
    }
}
655
656impl From<Managed<Box<Envelope>>> for ManagedEnvelope {
657 fn from(value: Managed<Box<Envelope>>) -> Self {
658 let (value, meta) = value.destructure();
659 let mut envelope = ManagedEnvelope::new(value, meta.outcome_aggregator.clone());
660 envelope.scope(meta.scoping);
661 envelope
662 }
663}
664
/// Grants shared access to the wrapped value.
impl<T: Counted> AsRef<T> for Managed<T> {
    fn as_ref(&self) -> &T {
        &self.value
    }
}
670
/// Read-only access to the wrapped value.
///
/// There is no `DerefMut` implementation in this file; mutation goes through `modify` and
/// `try_modify`, which keep the outcome bookkeeping intact.
impl<T: Counted> std::ops::Deref for Managed<T> {
    type Target = T;

    fn deref(&self) -> &Self::Target {
        &self.value
    }
}
678
/// Metadata attached to every outcome tracked for a managed value.
#[derive(Debug, Clone)]
struct Meta {
    /// Recipient of the [`TrackOutcome`] messages.
    outcome_aggregator: Addr<TrackOutcome>,
    /// Used as the `timestamp` of tracked outcomes.
    received_at: DateTime<Utc>,
    /// Scoping copied into tracked outcomes.
    scoping: Scoping,
    /// Optional event id copied into tracked outcomes.
    event_id: Option<EventId>,
    /// Optional remote address copied into tracked outcomes.
    remote_addr: Option<IpAddr>,
}
695
696impl Meta {
697 pub fn track_outcome(&self, outcome: Outcome, category: DataCategory, quantity: usize) {
698 self.outcome_aggregator.send(TrackOutcome {
699 timestamp: self.received_at,
700 scoping: self.scoping,
701 outcome,
702 event_id: self.event_id,
703 remote_addr: self.remote_addr,
704 category,
705 quantity: quantity.try_into().unwrap_or(u32::MAX),
706 });
707 }
708}
709
/// Bookkeeping helper handed to closures which transform a managed value.
///
/// It collects outcomes for partially rejected items and acts as a drop guard: if it is
/// dropped without being finalized, an internal error outcome is tracked for all original
/// quantities (see the `Drop` implementation).
pub struct RecordKeeper<'a> {
    /// Metadata used to track outcomes.
    meta: &'a Meta,
    /// Quantities for which an internal error outcome is tracked on drop.
    on_drop: Quantities,
    /// Debug builds only: categories for which quantity mismatches are logged instead of
    /// causing a panic.
    #[cfg(debug_assertions)]
    lenient: SmallVec<[DataCategory; 1]>,
    /// Debug builds only: per-category quantity adjustments registered via `modify_by`.
    #[cfg(debug_assertions)]
    modifications: BTreeMap<DataCategory, isize>,
    /// Outcomes recorded via `reject_err`; they are only tracked by `success`.
    in_flight: SmallVec<[(DataCategory, usize, Option<Outcome>); 2]>,
}
721
722impl<'a> RecordKeeper<'a> {
723 fn new(meta: &'a Meta, quantities: Quantities) -> Self {
724 Self {
725 meta,
726 on_drop: quantities,
727 #[cfg(debug_assertions)]
728 lenient: Default::default(),
729 #[cfg(debug_assertions)]
730 modifications: Default::default(),
731 in_flight: Default::default(),
732 }
733 }
734
735 pub fn lenient(&mut self, category: DataCategory) {
742 let _category = category;
743 #[cfg(debug_assertions)]
744 self.lenient.push(_category);
745 }
746
747 pub fn modify_by(&mut self, category: DataCategory, offset: isize) {
755 let _category = category;
756 let _offset = offset;
757 #[cfg(debug_assertions)]
758 {
759 *self.modifications.entry(_category).or_default() += offset;
760 }
761 }
762
763 fn failure<E>(mut self, error: E) -> Rejected<E::Error>
767 where
768 E: OutcomeError,
769 {
770 let (outcome, error) = error.consume();
771
772 if let Some(outcome) = outcome {
773 for (category, quantity) in std::mem::take(&mut self.on_drop) {
774 self.meta.track_outcome(outcome.clone(), category, quantity);
775 }
776 }
777
778 Rejected(error)
779 }
780
781 fn accept(mut self) {
789 debug_assert!(
790 self.in_flight.is_empty(),
791 "records accepted, but intermediate outcomes tracked"
792 );
793 self.on_drop.clear();
794 }
795
796 fn success(mut self, new: Quantities) {
802 let original = std::mem::take(&mut self.on_drop);
803 self.assert_quantities(original, new);
804
805 self.on_drop.clear();
806 for (category, quantity, outcome) in std::mem::take(&mut self.in_flight) {
807 if let Some(outcome) = outcome {
808 self.meta.track_outcome(outcome, category, quantity);
809 }
810 }
811 }
812
    /// Debug-only validation that no quantities were lost or invented.
    ///
    /// Starting from the `original` quantities, the registered `modifications` are applied,
    /// then the recorded (`in_flight`) outcomes and the `new` quantities are subtracted.
    /// Every category must end up at exactly zero.
    ///
    /// # Panics
    ///
    /// Panics on any mismatch, unless the affected category was marked as lenient, in which
    /// case the mismatch is only logged at debug level.
    #[cfg(debug_assertions)]
    fn assert_quantities(&self, original: Quantities, new: Quantities) {
        // Reports a mismatch for a category: a debug log for lenient categories, otherwise
        // error logs with the full state followed by a panic.
        macro_rules! emit {
            ($category:expr, $($tt:tt)*) => {{
                match self.lenient.contains(&$category) {
                    true => relay_log::debug!($($tt)*),
                    false => {
                        relay_log::error!("Original: {original:?}");
                        relay_log::error!("New: {new:?}");
                        relay_log::error!("Modifications: {:?}", self.modifications);
                        relay_log::error!("In Flight: {:?}", self.in_flight);
                        panic!($($tt)*)
                    }
                }
            }};
        }

        // Per-category totals of the original value, adjusted by the expected modifications.
        let mut sums = debug::Quantities::from(&original).0;
        for (category, offset) in &self.modifications {
            let v = sums.entry(*category).or_default();
            match v.checked_add_signed(*offset) {
                Some(result) => *v = result,
                None => emit!(
                    category,
                    "Attempted to modify original quantity {v} into the negative ({offset})"
                ),
            }
        }

        // Subtract everything that was rejected with a recorded outcome.
        for (category, quantity, outcome) in &self.in_flight {
            match sums.get_mut(category) {
                Some(c) if *c >= *quantity => *c -= *quantity,
                Some(c) => emit!(
                    category,
                    "Emitted {quantity} outcomes ({outcome:?}) for {category}, but there were only {c} items in the category originally"
                ),
                None => emit!(
                    category,
                    "Emitted {quantity} outcomes ({outcome:?}) for {category}, but there never was an item in this category"
                ),
            }
        }

        // Subtract everything that is still contained in the new value.
        for (category, quantity) in &new {
            match sums.get_mut(category) {
                Some(c) if *c >= *quantity => *c -= *quantity,
                Some(c) => emit!(
                    category,
                    "New item has {quantity} items in category '{category}', but original (after emitted outcomes) only has {c} left"
                ),
                None => emit!(
                    category,
                    "New item has {quantity} items in category '{category}', but after emitted outcomes there are none left"
                ),
            }
        }

        // Whatever is left was neither rejected nor kept.
        for (category, quantity) in sums {
            if quantity > 0 {
                emit!(
                    category,
                    "Missing outcomes or mismatched quantity in category '{category}', off by {quantity}"
                );
            }
        }
    }
885
    /// Quantity validation is compiled out in builds without debug assertions.
    #[cfg(not(debug_assertions))]
    fn assert_quantities(&self, _: Quantities, _: Quantities) {}
888}
889
890impl<'a> Drop for RecordKeeper<'a> {
891 fn drop(&mut self) {
892 for (category, quantity) in std::mem::take(&mut self.on_drop) {
893 self.meta.track_outcome(
894 Outcome::Invalid(DiscardReason::Internal),
895 category,
896 quantity,
897 );
898 }
899 }
900}
901
902impl RecordKeeper<'_> {
903 pub fn or_default<T, E, Q>(&mut self, r: Result<T, E>, q: Q) -> T
908 where
909 T: Default,
910 E: OutcomeError,
911 Q: Counted,
912 {
913 match r {
914 Ok(result) => result,
915 Err(err) => {
916 self.reject_err(err, q);
917 T::default()
918 }
919 }
920 }
921
922 pub fn reject_err<E, Q>(&mut self, err: E, q: Q) -> E::Error
927 where
928 E: OutcomeError,
929 Q: Counted,
930 {
931 let (outcome, err) = err.consume();
932 for (category, quantity) in q.quantities() {
933 self.in_flight.push((category, quantity, outcome.clone()))
934 }
935 err
936 }
937
938 #[track_caller]
942 pub fn internal_error<E, Q>(&mut self, error: E, q: Q)
943 where
944 E: std::error::Error + 'static,
945 Q: Counted,
946 {
947 relay_log::error!(error = &error as &dyn std::error::Error, "internal error");
948 debug_assert!(false, "internal error: {error}");
949 self.reject_err((Outcome::Invalid(DiscardReason::Internal), ()), q);
950 }
951}
952
/// Iterator created by `Managed::split` and `Managed::split_with_context`.
///
/// Wraps every item of the underlying iterator in its own [`Managed`] sharing the original
/// metadata. Items which were never yielded are tracked as internal errors when the iterator
/// is dropped before it was exhausted.
pub struct Split<I, S>
where
    I: Iterator<Item = S>,
    S: Counted,
{
    /// Debug builds only: original quantities which have not yet been yielded.
    #[cfg(debug_assertions)]
    quantities: Quantities,
    /// The underlying iterator of unmanaged items.
    items: I,
    /// Metadata shared with every yielded item.
    meta: Arc<Meta>,
    /// Set once `items` has returned `None`.
    exhausted: bool,
}
965
966impl<I, S> Split<I, S>
967where
968 I: Iterator<Item = S>,
969 S: Counted,
970{
971 #[cfg(debug_assertions)]
974 fn subtract(&mut self, q: Quantities) {
975 for (category, quantities) in q {
976 let Some(orig_quantities) = self
977 .quantities
978 .iter_mut()
979 .find_map(|(c, q)| (*c == category).then_some(q))
980 else {
981 debug_assert!(
982 false,
983 "mismatching quantities, item split into category {category}, \
984 which originally was not present"
985 );
986 continue;
987 };
988
989 if *orig_quantities >= quantities {
990 *orig_quantities -= quantities;
991 } else {
992 debug_assert!(
993 false,
994 "in total more items produced in category {category} than originally available"
995 );
996 }
997 }
998 }
999}
1000
1001impl<I, S> Iterator for Split<I, S>
1002where
1003 I: Iterator<Item = S>,
1004 S: Counted,
1005{
1006 type Item = Managed<S>;
1007
1008 fn next(&mut self) -> Option<Self::Item> {
1009 let next = match self.items.next() {
1010 Some(next) => next,
1011 None => {
1012 self.exhausted = true;
1013 return None;
1014 }
1015 };
1016
1017 #[cfg(debug_assertions)]
1018 self.subtract(next.quantities());
1019
1020 Some(Managed::from_parts(next, Arc::clone(&self.meta)))
1021 }
1022}
1023
1024impl<I, S> Drop for Split<I, S>
1025where
1026 I: Iterator<Item = S>,
1027 S: Counted,
1028{
1029 fn drop(&mut self) {
1030 #[cfg(debug_assertions)]
1032 if self.exhausted {
1033 for (category, quantities) in &self.quantities {
1034 debug_assert!(
1035 *quantities == 0,
1036 "items split, but still {quantities} remaining in category {category}"
1037 );
1038 }
1039 }
1040
1041 if self.exhausted {
1042 return;
1043 }
1044
1045 for item in &mut self.items {
1051 for (category, quantity) in item.quantities() {
1052 self.meta.track_outcome(
1053 Outcome::Invalid(DiscardReason::Internal),
1054 category,
1055 quantity,
1056 );
1057 }
1058 }
1059 }
1060}
1061
/// `Split` is fused whenever the underlying iterator is, since `next` only forwards to it.
impl<I, S> FusedIterator for Split<I, S>
where
    I: Iterator<Item = S> + FusedIterator,
    S: Counted,
{
}
1068
/// Tests for rejecting and splitting managed values.
///
/// NOTE(review): `Managed::for_test` and the outcome `handle` are not defined in this file;
/// presumably they come from the sibling `test` module, confirm there.
#[cfg(test)]
mod tests {
    use super::*;

    /// Counts one `Error` item per element of the vector.
    struct CountedVec(Vec<u32>);

    impl Counted for CountedVec {
        fn quantities(&self) -> Quantities {
            smallvec::smallvec![(DataCategory::Error, self.0.len())]
        }
    }

    /// Always counts as exactly one `Error` item.
    struct CountedValue(u32);

    impl Counted for CountedValue {
        fn quantities(&self) -> Quantities {
            smallvec::smallvec![(DataCategory::Error, 1)]
        }
    }

    /// Rejecting without an outcome must not track anything, not even on drop.
    #[test]
    fn test_reject_err_no_outcome() {
        let value = CountedVec(vec![0, 1, 2, 3, 4, 5]);
        let (managed, mut handle) = Managed::for_test(value).build();

        // Rejecting with `None` marks the value as done without emitting an outcome.
        let _ = managed.reject_err((None, ()));
        handle.assert_no_outcomes();

        // The value is already done, dropping it must stay silent as well.
        drop(managed);
        handle.assert_no_outcomes();
    }

    /// A fully consumed split emits nothing itself; each item tracks its own outcome.
    #[test]
    fn test_split_fully_consumed() {
        let value = CountedVec(vec![0, 1, 2, 3, 4, 5]);
        let (managed, mut handle) = Managed::for_test(value).build();

        let s = managed
            .split(|value| value.0.into_iter().map(CountedValue))
            .collect::<Vec<_>>();

        handle.assert_no_outcomes();

        for (i, s) in s.into_iter().enumerate() {
            assert_eq!(s.as_ref().0, i as u32);
            let outcome = Outcome::Invalid(DiscardReason::Cors);
            let _ = s.reject_err((outcome.clone(), ()));
            handle.assert_outcome(&outcome, DataCategory::Error, 1);
        }
    }

    /// Dropping a split early tracks internal outcomes for all items not yet yielded.
    #[test]
    fn test_split_partially_consumed_emits_remaining() {
        let value = CountedVec(vec![0, 1, 2, 3, 4, 5]);
        let (managed, mut handle) = Managed::for_test(value).build();

        let mut s = managed.split(|value| value.0.into_iter().map(CountedValue));
        handle.assert_no_outcomes();

        // Each yielded item is dropped immediately and tracks its own internal outcome.
        drop(s.next());
        handle.assert_internal_outcome(DataCategory::Error, 1);
        drop(s.next());
        handle.assert_internal_outcome(DataCategory::Error, 1);
        drop(s.next());
        handle.assert_internal_outcome(DataCategory::Error, 1);
        handle.assert_no_outcomes();

        // Three items remain in the iterator when the split is dropped.
        drop(s);

        handle.assert_internal_outcome(DataCategory::Error, 1);
        handle.assert_internal_outcome(DataCategory::Error, 1);
        handle.assert_internal_outcome(DataCategory::Error, 1);
    }

    /// Producing fewer items than the original contained is detected when the split drops.
    #[test]
    fn test_split_changing_quantities_should_panic() {
        let value = CountedVec(vec![0, 1, 2, 3, 4, 5]);
        let (managed, mut handle) = Managed::for_test(value).build();

        // Six original items, but the split only produces a single one.
        let mut s = managed.split(|_| std::iter::once(CountedValue(0)));

        s.next().unwrap().accept(|_| {});
        handle.assert_no_outcomes();

        assert!(s.next().is_none());

        let r = std::panic::catch_unwind(move || {
            drop(s);
        });

        assert!(
            r.is_err(),
            "expected split to panic because of mismatched (not enough) outcomes"
        );
    }

    /// Producing more items than the original contained is detected while iterating.
    #[test]
    fn test_split_more_outcomes_than_before_should_panic() {
        let value = CountedVec(vec![0]);
        let (managed, mut handle) = Managed::for_test(value).build();

        // One original item, but the split produces two.
        let mut s = managed.split(|_| vec![CountedValue(0), CountedValue(2)].into_iter());

        s.next().unwrap().accept(|_| {});
        handle.assert_no_outcomes();

        let r = std::panic::catch_unwind(move || {
            s.next();
        });

        assert!(
            r.is_err(),
            "expected split to panic because of mismatched (too many) outcomes"
        );
    }

    /// Producing items in a category the original did not have is detected while iterating.
    #[test]
    fn test_split_changing_categories_should_panic() {
        struct Special;
        impl Counted for Special {
            fn quantities(&self) -> Quantities {
                smallvec::smallvec![(DataCategory::Error, 1), (DataCategory::Transaction, 1)]
            }
        }

        let value = CountedVec(vec![0]);
        let (managed, _handle) = Managed::for_test(value).build();

        let mut s = managed.split(|value| value.0.into_iter().map(|_| Special));

        let r = std::panic::catch_unwind(move || {
            let _ = s.next();
        });

        assert!(
            r.is_err(),
            "expected split to panic because of mismatched outcome categories"
        );
    }

    /// `Split` must implement `FusedIterator` when the underlying iterator does.
    #[test]
    fn test_split_assert_fused() {
        fn only_fused<T: FusedIterator>(_: T) {}

        let (managed, mut handle) = Managed::for_test(CountedVec(vec![0])).build();
        // The split is dropped unconsumed, so its single item is tracked as internal error.
        only_fused(managed.split(|value| value.0.into_iter().map(CountedValue)));
        handle.assert_internal_outcome(DataCategory::Error, 1);
    }
}