1#[cfg(debug_assertions)]
2use std::collections::BTreeMap;
3use std::convert::Infallible;
4use std::fmt;
5use std::iter::FusedIterator;
6use std::mem::ManuallyDrop;
7use std::net::IpAddr;
8use std::sync::Arc;
9use std::sync::atomic::{AtomicBool, Ordering};
10
11use chrono::{DateTime, Utc};
12use relay_event_schema::protocol::EventId;
13use relay_quotas::{DataCategory, Scoping};
14use relay_system::Addr;
15use smallvec::SmallVec;
16
17use crate::Envelope;
18use crate::managed::{Counted, ManagedEnvelope, Quantities};
19use crate::services::outcome::{DiscardReason, Outcome, TrackOutcome};
20use crate::services::processor::ProcessingError;
21
22#[cfg(debug_assertions)]
23mod debug;
24#[cfg(test)]
25mod test;
26
27#[cfg(test)]
28pub use self::test::*;
29
/// An error which can be split into an optional [`Outcome`] and a remaining error value.
///
/// Implementations decide which outcome, if any, is associated with the error and which
/// value is handed back to the caller afterwards.
pub trait OutcomeError {
    /// The error value which remains after the outcome has been extracted.
    type Error;

    /// Consumes the error and returns its optional [`Outcome`] together with the
    /// remaining [`Self::Error`].
    fn consume(self) -> (Option<Outcome>, Self::Error);
}
40
41impl OutcomeError for Outcome {
42 type Error = ();
43
44 fn consume(self) -> (Option<Outcome>, Self::Error) {
45 (self, ()).consume()
46 }
47}
48
49impl<E> OutcomeError for (Outcome, E) {
50 type Error = E;
51
52 fn consume(self) -> (Option<Outcome>, Self::Error) {
53 (Some(self.0), self.1)
54 }
55}
56
57impl<E> OutcomeError for (Option<Outcome>, E) {
58 type Error = E;
59
60 fn consume(self) -> (Option<Outcome>, Self::Error) {
61 self
62 }
63}
64
65impl OutcomeError for ProcessingError {
66 type Error = Self;
67
68 fn consume(self) -> (Option<Outcome>, Self::Error) {
69 (self.to_outcome(), self)
70 }
71}
72
/// [`Infallible`] has no values, so this implementation can never actually be invoked.
///
/// It exists so infallible closures can be passed to the fallible `try_*` methods.
impl OutcomeError for Infallible {
    type Error = Self;

    fn consume(self) -> (Option<Outcome>, Self::Error) {
        // An empty match is exhaustive for an uninhabited type.
        match self {}
    }
}
80
/// Wrapper which marks a value as the result of a rejection.
///
/// The `#[must_use]` attribute makes the compiler warn when a rejection is silently
/// discarded instead of being propagated.
#[derive(Debug, Clone, Copy)]
#[must_use = "a rejection must be propagated"]
pub struct Rejected<T>(T);

impl<T> Rejected<T> {
    /// Unwraps the rejection and returns the contained value.
    pub fn into_inner(self) -> T {
        let Self(inner) = self;
        inner
    }

    /// Transforms the contained value with `f`, keeping the result marked as rejected.
    pub fn map<F, S>(self, f: F) -> Rejected<S>
    where
        F: FnOnce(T) -> S,
    {
        let Self(inner) = self;
        Rejected(f(inner))
    }
}

/// The wrapper is transparent for error chains: it reports the source of the inner error.
impl<T> std::error::Error for Rejected<T>
where
    T: std::error::Error,
{
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        std::error::Error::source(&self.0)
    }
}

/// Formats exactly like the contained value, forwarding all formatter options.
impl<T> std::fmt::Display for Rejected<T>
where
    T: std::fmt::Display,
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt::Display::fmt(&self.0, f)
    }
}
121
/// A value of type `T` bundled with shared metadata and a completion flag.
///
/// Dropping a `Managed` which has not been marked as done emits an internal discard
/// outcome for each of the value's quantities (see the `Drop` implementation).
pub struct Managed<T: Counted> {
    /// The wrapped value, whose quantities are reported in emitted outcomes.
    value: T,
    /// Metadata used when emitting outcomes, shared between values derived from each other.
    meta: Arc<Meta>,
    /// Set once the value has been rejected, so no further outcomes are emitted for it.
    done: AtomicBool,
}
128
129impl<T: Counted> Managed<T> {
130 pub fn from_envelope(envelope: &ManagedEnvelope, value: T) -> Self {
135 Self::from_parts(
136 value,
137 Arc::new(Meta {
138 outcome_aggregator: envelope.outcome_aggregator().clone(),
139 received_at: envelope.received_at(),
140 scoping: envelope.scoping(),
141 event_id: envelope.envelope().event_id(),
142 remote_addr: envelope.meta().remote_addr(),
143 }),
144 )
145 }
146
147 pub fn wrap<S>(&self, other: S) -> Managed<S>
149 where
150 S: Counted,
151 {
152 Managed::from_parts(other, Arc::clone(&self.meta))
153 }
154
155 pub fn received_at(&self) -> DateTime<Utc> {
157 self.meta.received_at
158 }
159
160 pub fn scoping(&self) -> Scoping {
162 self.meta.scoping
163 }
164
165 pub fn split_once<F, S, U>(self, f: F) -> (Managed<S>, Managed<U>)
171 where
172 F: FnOnce(T) -> (S, U),
173 S: Counted,
174 U: Counted,
175 {
176 debug_assert!(!self.is_done());
177
178 let (value, meta) = self.destructure();
179 #[cfg(debug_assertions)]
180 let quantities = value.quantities();
181
182 let (a, b) = f(value);
183
184 #[cfg(debug_assertions)]
185 debug::Quantities::from(&quantities)
186 .assert_only_extra(debug::Quantities::from(&a) + debug::Quantities::from(&b));
191
192 (
193 Managed::from_parts(a, Arc::clone(&meta)),
194 Managed::from_parts(b, meta),
195 )
196 }
197
198 pub fn split<F, I, S>(self, f: F) -> Split<I::IntoIter, I::Item>
203 where
204 F: FnOnce(T) -> I,
205 I: IntoIterator<Item = S>,
206 S: Counted,
207 {
208 self.split_with_context(|value| (f(value), ())).0
209 }
210
211 pub fn split_with_context<F, I, S, C>(self, f: F) -> (Split<I::IntoIter, I::Item>, C)
216 where
217 F: FnOnce(T) -> (I, C),
218 I: IntoIterator<Item = S>,
219 S: Counted,
220 {
221 debug_assert!(!self.is_done());
222
223 let (value, meta) = self.destructure();
224 #[cfg(debug_assertions)]
225 let quantities = value.quantities();
226
227 let (items, context) = f(value);
228
229 (
230 Split {
231 #[cfg(debug_assertions)]
232 quantities,
233 items: items.into_iter(),
234 meta,
235 exhausted: false,
236 },
237 context,
238 )
239 }
240
241 pub fn retain<S, I, U, E>(&mut self, select: S, mut retain: U)
277 where
278 S: FnOnce(&mut T) -> &mut Vec<I>,
279 I: Counted,
280 U: FnMut(&mut I, &mut RecordKeeper<'_>) -> Result<(), E>,
281 E: OutcomeError,
282 {
283 self.retain_with_context(
284 |inner| (select(inner), &()),
285 |item, _, records| retain(item, records),
286 );
287 }
288
289 pub fn retain_with_context<S, C, I, U, E>(&mut self, select: S, mut retain: U)
326 where
327 for<'a> S: FnOnce(&'a mut T) -> (&'a mut Vec<I>, &'a C),
332 I: Counted,
333 U: FnMut(&mut I, &C, &mut RecordKeeper<'_>) -> Result<(), E>,
334 E: OutcomeError,
335 {
336 self.modify(|inner, records| {
337 let (items, ctx) = select(inner);
338 items.retain_mut(|item| match retain(item, ctx, records) {
339 Ok(()) => true,
340 Err(err) => {
341 records.reject_err(err, &*item);
342 false
343 }
344 })
345 });
346 }
347
348 pub fn map<S, F>(self, f: F) -> Managed<S>
352 where
353 F: FnOnce(T, &mut RecordKeeper) -> S,
354 S: Counted,
355 {
356 self.try_map(move |inner, records| Ok::<_, Infallible>(f(inner, records)))
357 .unwrap_or_else(|e| match e.0 {})
358 }
359
360 pub fn try_map<S, F, E>(self, f: F) -> Result<Managed<S>, Rejected<E::Error>>
368 where
369 F: FnOnce(T, &mut RecordKeeper) -> Result<S, E>,
370 S: Counted,
371 E: OutcomeError,
372 {
373 debug_assert!(!self.is_done());
374
375 let (value, meta) = self.destructure();
376 let quantities = value.quantities();
377
378 let mut records = RecordKeeper::new(&meta, quantities);
379
380 match f(value, &mut records) {
381 Ok(value) => {
382 records.success(value.quantities());
383 Ok(Managed::from_parts(value, meta))
384 }
385 Err(err) => Err(records.failure(err)),
386 }
387 }
388
389 pub fn modify<F>(&mut self, f: F)
393 where
394 F: FnOnce(&mut T, &mut RecordKeeper),
395 {
396 self.try_modify(move |inner, records| {
397 f(inner, records);
398 Ok::<_, Infallible>(())
399 })
400 .unwrap_or_else(|e| match e {})
401 }
402
403 pub fn try_modify<F, E>(&mut self, f: F) -> Result<(), Rejected<E::Error>>
411 where
412 F: FnOnce(&mut T, &mut RecordKeeper) -> Result<(), E>,
413 E: OutcomeError,
414 {
415 debug_assert!(!self.is_done());
416
417 let quantities = self.value.quantities();
418 let mut records = RecordKeeper::new(&self.meta, quantities);
419
420 match f(&mut self.value, &mut records) {
421 Ok(()) => {
422 records.success(self.value.quantities());
423 Ok(())
424 }
425 Err(err) => {
426 let err = records.failure(err);
427 self.done.store(true, Ordering::Relaxed);
428 Err(err)
429 }
430 }
431 }
432
433 pub fn accept<F, S>(self, f: F) -> S
441 where
442 F: FnOnce(T) -> S,
443 {
444 self.try_accept(|item| Ok::<_, Infallible>(f(item)))
445 .unwrap_or_else(|err| match err.0 {})
446 }
447
448 pub fn try_accept<F, S, E>(self, f: F) -> Result<S, Rejected<E::Error>>
456 where
457 F: FnOnce(T) -> Result<S, E>,
458 E: OutcomeError,
459 {
460 debug_assert!(!self.is_done());
461
462 let (value, meta) = self.destructure();
463 let records = RecordKeeper::new(&meta, value.quantities());
464
465 match f(value) {
466 Ok(value) => {
467 records.accept();
468 Ok(value)
469 }
470 Err(err) => Err(records.failure(err)),
471 }
472 }
473
474 #[track_caller]
482 pub fn internal_error(&self, reason: &'static str) -> Rejected<()> {
483 relay_log::error!("internal error: {reason}");
484 debug_assert!(false, "internal error: {reason}");
485 self.reject_err((Outcome::Invalid(DiscardReason::Internal), ()))
486 }
487
488 pub fn reject_err<E>(&self, error: E) -> Rejected<E::Error>
490 where
491 E: OutcomeError,
492 {
493 debug_assert!(!self.is_done());
494
495 let (outcome, error) = error.consume();
496 self.do_reject(outcome);
497 Rejected(error)
498 }
499
500 fn do_reject(&self, outcome: Option<Outcome>) {
501 let is_done = self.done.fetch_or(true, Ordering::Relaxed);
504
505 let Some(outcome) = outcome else {
507 return;
508 };
509
510 if !is_done {
516 for (category, quantity) in self.value.quantities() {
517 self.meta.track_outcome(outcome.clone(), category, quantity);
518 }
519 }
520 }
521
    /// Takes the managed value apart into the raw value and its metadata.
    ///
    /// The `Drop` implementation of `Managed` is bypassed, so no outcome is emitted here.
    /// The caller becomes responsible for the returned value.
    fn destructure(self) -> (T, Arc<Meta>) {
        // `ManuallyDrop` prevents `Drop for Managed` from running, which would otherwise
        // reject the value with an internal outcome.
        let this = ManuallyDrop::new(self);
        // Exhaustive pattern: adding a field to `Managed` forces this function to be revisited.
        let Managed { value, meta, done } = &*this;

        // SAFETY: `value` is a valid reference into `this`. `this` is wrapped in
        // `ManuallyDrop` and never used again, so the field is read exactly once and
        // never dropped a second time.
        let value = unsafe { std::ptr::read(value) };
        // SAFETY: same reasoning as for `value`; ownership of the `Arc` moves to the caller.
        let meta = unsafe { std::ptr::read(meta) };
        // SAFETY: same reasoning as for `value`; the flag is only inspected and then dropped.
        let done = unsafe { std::ptr::read(done) };
        debug_assert!(
            !done.load(Ordering::Relaxed),
            "a `done` managed should never be destructured"
        );

        (value, meta)
    }
553
554 fn from_parts(value: T, meta: Arc<Meta>) -> Self {
555 Self {
556 value,
557 meta,
558 done: AtomicBool::new(false),
559 }
560 }
561
562 fn is_done(&self) -> bool {
563 self.done.load(Ordering::Relaxed)
564 }
565}
566
567impl<T: Counted> Drop for Managed<T> {
568 fn drop(&mut self) {
569 self.do_reject(Some(Outcome::Invalid(DiscardReason::Internal)));
570 }
571}
572
573impl<T: Counted + fmt::Debug> fmt::Debug for Managed<T> {
574 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
575 write!(f, "Managed<{}>[", std::any::type_name::<T>())?;
576 for (i, (category, quantity)) in self.value.quantities().iter().enumerate() {
577 if i > 0 {
578 write!(f, ",")?;
579 }
580 write!(f, "{category}:{quantity}")?;
581 }
582 write!(f, "](")?;
583 self.value.fmt(f)?;
584 write!(f, ")")
585 }
586}
587
588impl<T: Counted> Managed<Option<T>> {
589 pub fn transpose(self) -> Option<Managed<T>> {
591 let (o, meta) = self.destructure();
592 o.map(|t| Managed::from_parts(t, meta))
593 }
594}
595
596impl From<Managed<Box<Envelope>>> for ManagedEnvelope {
597 fn from(value: Managed<Box<Envelope>>) -> Self {
598 let (value, meta) = value.destructure();
599 let mut envelope = ManagedEnvelope::new(value, meta.outcome_aggregator.clone());
600 envelope.scope(meta.scoping);
601 envelope
602 }
603}
604
605impl<T: Counted> AsRef<T> for Managed<T> {
606 fn as_ref(&self) -> &T {
607 &self.value
608 }
609}
610
611impl<T: Counted> std::ops::Deref for Managed<T> {
612 type Target = T;
613
614 fn deref(&self) -> &Self::Target {
615 &self.value
616 }
617}
618
/// Metadata attached to every emitted outcome, shared between related managed values.
struct Meta {
    /// Address of the service which receives the tracked outcomes.
    outcome_aggregator: Addr<TrackOutcome>,
    /// Receive time, used as the timestamp of emitted outcomes.
    received_at: DateTime<Utc>,
    /// Scoping attached to emitted outcomes.
    scoping: Scoping,
    /// Optional event id attached to emitted outcomes.
    event_id: Option<EventId>,
    /// Optional remote address attached to emitted outcomes.
    remote_addr: Option<IpAddr>,
}
634
635impl Meta {
636 pub fn track_outcome(&self, outcome: Outcome, category: DataCategory, quantity: usize) {
637 self.outcome_aggregator.send(TrackOutcome {
638 timestamp: self.received_at,
639 scoping: self.scoping,
640 outcome,
641 event_id: self.event_id,
642 remote_addr: self.remote_addr,
643 category,
644 quantity: quantity.try_into().unwrap_or(u32::MAX),
645 });
646 }
647}
648
/// Book-keeping for outcomes while a managed value is being modified.
///
/// Outcomes recorded here are held back and only emitted once the operation succeeds.
/// If the keeper is dropped without being finalized, an internal discard outcome is
/// emitted for the quantities stored in `on_drop`.
pub struct RecordKeeper<'a> {
    /// Metadata used to emit outcomes.
    meta: &'a Meta,
    /// Quantities for which an internal outcome is emitted if the keeper is simply dropped.
    on_drop: Quantities,
    /// Categories for which quantity mismatches are only logged instead of panicking.
    #[cfg(debug_assertions)]
    lenient: SmallVec<[DataCategory; 1]>,
    /// Signed per-category offsets applied to the original quantities before validation.
    #[cfg(debug_assertions)]
    modifications: BTreeMap<DataCategory, isize>,
    /// Recorded `(category, quantity, outcome)` entries, emitted when the operation succeeds.
    in_flight: SmallVec<[(DataCategory, usize, Option<Outcome>); 2]>,
}
660
661impl<'a> RecordKeeper<'a> {
662 fn new(meta: &'a Meta, quantities: Quantities) -> Self {
663 Self {
664 meta,
665 on_drop: quantities,
666 #[cfg(debug_assertions)]
667 lenient: Default::default(),
668 #[cfg(debug_assertions)]
669 modifications: Default::default(),
670 in_flight: Default::default(),
671 }
672 }
673
674 pub fn lenient(&mut self, category: DataCategory) {
681 let _category = category;
682 #[cfg(debug_assertions)]
683 self.lenient.push(_category);
684 }
685
686 pub fn modify_by(&mut self, category: DataCategory, offset: isize) {
694 let _category = category;
695 let _offset = offset;
696 #[cfg(debug_assertions)]
697 {
698 *self.modifications.entry(_category).or_default() += offset;
699 }
700 }
701
702 fn failure<E>(mut self, error: E) -> Rejected<E::Error>
706 where
707 E: OutcomeError,
708 {
709 let (outcome, error) = error.consume();
710
711 if let Some(outcome) = outcome {
712 for (category, quantity) in std::mem::take(&mut self.on_drop) {
713 self.meta.track_outcome(outcome.clone(), category, quantity);
714 }
715 }
716
717 Rejected(error)
718 }
719
720 fn accept(mut self) {
728 debug_assert!(
729 self.in_flight.is_empty(),
730 "records accepted, but intermediate outcomes tracked"
731 );
732 self.on_drop.clear();
733 }
734
735 fn success(mut self, new: Quantities) {
741 let original = std::mem::take(&mut self.on_drop);
742 self.assert_quantities(original, new);
743
744 self.on_drop.clear();
745 for (category, quantity, outcome) in std::mem::take(&mut self.in_flight) {
746 if let Some(outcome) = outcome {
747 self.meta.track_outcome(outcome, category, quantity);
748 }
749 }
750 }
751
    /// Validates that an operation neither lost nor invented quantities (debug builds).
    ///
    /// Starting from the `original` quantities, the registered modifications are applied.
    /// Then everything accounted for by recorded outcomes and by the `new` quantities is
    /// subtracted. Any mismatch panics, unless the affected category was marked lenient,
    /// in which case the message is only logged at debug level.
    #[cfg(debug_assertions)]
    fn assert_quantities(&self, original: Quantities, new: Quantities) {
        // Reports a violation for `$category`: a debug log for lenient categories,
        // otherwise the full state is logged and the function panics.
        macro_rules! emit {
            ($category:expr, $($tt:tt)*) => {{
                match self.lenient.contains(&$category) {
                    true => relay_log::debug!($($tt)*),
                    false => {
                        relay_log::error!("Original: {original:?}");
                        relay_log::error!("New: {new:?}");
                        relay_log::error!("Modifications: {:?}", self.modifications);
                        relay_log::error!("In Flight: {:?}", self.in_flight);
                        panic!($($tt)*)
                    }
                }
            }};
        }

        // Step 1: apply the explicitly registered offsets to the original quantities.
        let mut sums = debug::Quantities::from(&original).0;
        for (category, offset) in &self.modifications {
            let v = sums.entry(*category).or_default();
            match v.checked_add_signed(*offset) {
                Some(result) => *v = result,
                None => emit!(
                    category,
                    "Attempted to modify original quantity {v} into the negative ({offset})"
                ),
            }
        }

        // Step 2: subtract everything accounted for by recorded outcomes.
        for (category, quantity, outcome) in &self.in_flight {
            match sums.get_mut(category) {
                Some(c) if *c >= *quantity => *c -= *quantity,
                Some(c) => emit!(
                    category,
                    "Emitted {quantity} outcomes ({outcome:?}) for {category}, but there were only {c} items in the category originally"
                ),
                None => emit!(
                    category,
                    "Emitted {quantity} outcomes ({outcome:?}) for {category}, but there never was an item in this category"
                ),
            }
        }

        // Step 3: subtract everything which is still present in the new value.
        for (category, quantity) in &new {
            match sums.get_mut(category) {
                Some(c) if *c >= *quantity => *c -= *quantity,
                Some(c) => emit!(
                    category,
                    "New item has {quantity} items in category '{category}', but original (after emitted outcomes) only has {c} left"
                ),
                None => emit!(
                    category,
                    "New item has {quantity} items in category '{category}', but after emitted outcomes there are none left"
                ),
            }
        }

        // Step 4: anything left over was neither emitted as outcome nor kept in the value.
        for (category, quantity) in sums {
            if quantity > 0 {
                emit!(
                    category,
                    "Missing outcomes or mismatched quantity in category '{category}', off by {quantity}"
                );
            }
        }
    }

    /// Builds without debug assertions skip the quantity validation entirely.
    #[cfg(not(debug_assertions))]
    fn assert_quantities(&self, _: Quantities, _: Quantities) {}
827}
828
829impl<'a> Drop for RecordKeeper<'a> {
830 fn drop(&mut self) {
831 for (category, quantity) in std::mem::take(&mut self.on_drop) {
832 self.meta.track_outcome(
833 Outcome::Invalid(DiscardReason::Internal),
834 category,
835 quantity,
836 );
837 }
838 }
839}
840
841impl RecordKeeper<'_> {
842 pub fn or_default<T, E, Q>(&mut self, r: Result<T, E>, q: Q) -> T
847 where
848 T: Default,
849 E: OutcomeError,
850 Q: Counted,
851 {
852 match r {
853 Ok(result) => result,
854 Err(err) => {
855 self.reject_err(err, q);
856 T::default()
857 }
858 }
859 }
860
861 pub fn reject_err<E, Q>(&mut self, err: E, q: Q) -> E::Error
866 where
867 E: OutcomeError,
868 Q: Counted,
869 {
870 let (outcome, err) = err.consume();
871 for (category, quantity) in q.quantities() {
872 self.in_flight.push((category, quantity, outcome.clone()))
873 }
874 err
875 }
876
877 #[track_caller]
881 pub fn internal_error<E, Q>(&mut self, error: E, q: Q)
882 where
883 E: std::error::Error + 'static,
884 Q: Counted,
885 {
886 relay_log::error!(error = &error as &dyn std::error::Error, "internal error");
887 debug_assert!(false, "internal error: {error}");
888 self.reject_err((Outcome::Invalid(DiscardReason::Internal), ()), q);
889 }
890}
891
/// Iterator which lazily wraps the items of a split managed value into [`Managed`] values.
///
/// Items which are never yielded are accounted for when the iterator is dropped
/// (see the `Drop` implementation).
pub struct Split<I, S>
where
    I: Iterator<Item = S>,
    S: Counted,
{
    /// Quantities of the original value which have not been handed out yet (debug only).
    #[cfg(debug_assertions)]
    quantities: Quantities,
    /// The not yet wrapped items.
    items: I,
    /// Metadata shared with every yielded item.
    meta: Arc<Meta>,
    /// Set once `items` returned `None`.
    exhausted: bool,
}
904
905impl<I, S> Split<I, S>
906where
907 I: Iterator<Item = S>,
908 S: Counted,
909{
910 #[cfg(debug_assertions)]
913 fn subtract(&mut self, q: Quantities) {
914 for (category, quantities) in q {
915 let Some(orig_quantities) = self
916 .quantities
917 .iter_mut()
918 .find_map(|(c, q)| (*c == category).then_some(q))
919 else {
920 debug_assert!(
921 false,
922 "mismatching quantities, item split into category {category}, \
923 which originally was not present"
924 );
925 continue;
926 };
927
928 if *orig_quantities >= quantities {
929 *orig_quantities -= quantities;
930 } else {
931 debug_assert!(
932 false,
933 "in total more items produced in category {category} than originally available"
934 );
935 }
936 }
937 }
938}
939
940impl<I, S> Iterator for Split<I, S>
941where
942 I: Iterator<Item = S>,
943 S: Counted,
944{
945 type Item = Managed<S>;
946
947 fn next(&mut self) -> Option<Self::Item> {
948 let next = match self.items.next() {
949 Some(next) => next,
950 None => {
951 self.exhausted = true;
952 return None;
953 }
954 };
955
956 #[cfg(debug_assertions)]
957 self.subtract(next.quantities());
958
959 Some(Managed::from_parts(next, Arc::clone(&self.meta)))
960 }
961}
962
963impl<I, S> Drop for Split<I, S>
964where
965 I: Iterator<Item = S>,
966 S: Counted,
967{
968 fn drop(&mut self) {
969 #[cfg(debug_assertions)]
971 if self.exhausted {
972 for (category, quantities) in &self.quantities {
973 debug_assert!(
974 *quantities == 0,
975 "items split, but still {quantities} remaining in category {category}"
976 );
977 }
978 }
979
980 if self.exhausted {
981 return;
982 }
983
984 for item in &mut self.items {
990 for (category, quantity) in item.quantities() {
991 self.meta.track_outcome(
992 Outcome::Invalid(DiscardReason::Internal),
993 category,
994 quantity,
995 );
996 }
997 }
998 }
999}
1000
/// `Split` is fused whenever the inner iterator is: after the inner iterator returned
/// `None`, `next` keeps delegating to it and therefore keeps returning `None`.
impl<I, S> FusedIterator for Split<I, S>
where
    I: Iterator<Item = S> + FusedIterator,
    S: Counted,
{
}
1007
1008#[cfg(test)]
1009mod tests {
1010 use super::*;
1011
    /// Test helper: a vector which counts one error item per element.
    struct CountedVec(Vec<u32>);

    impl Counted for CountedVec {
        fn quantities(&self) -> Quantities {
            smallvec::smallvec![(DataCategory::Error, self.0.len())]
        }
    }
1019
    /// Test helper: a single value which always counts as exactly one error item.
    struct CountedValue(u32);

    impl Counted for CountedValue {
        fn quantities(&self) -> Quantities {
            smallvec::smallvec![(DataCategory::Error, 1)]
        }
    }
1027
1028 #[test]
1029 fn test_reject_err_no_outcome() {
1030 let value = CountedVec(vec![0, 1, 2, 3, 4, 5]);
1031 let (managed, mut handle) = Managed::for_test(value).build();
1032
1033 let _ = managed.reject_err((None, ()));
1035 handle.assert_no_outcomes();
1036
1037 drop(managed);
1039 handle.assert_no_outcomes();
1040 }
1041
1042 #[test]
1043 fn test_split_fully_consumed() {
1044 let value = CountedVec(vec![0, 1, 2, 3, 4, 5]);
1045 let (managed, mut handle) = Managed::for_test(value).build();
1046
1047 let s = managed
1048 .split(|value| value.0.into_iter().map(CountedValue))
1049 .collect::<Vec<_>>();
1051
1052 handle.assert_no_outcomes();
1053
1054 for (i, s) in s.into_iter().enumerate() {
1055 assert_eq!(s.as_ref().0, i as u32);
1056 let outcome = Outcome::Invalid(DiscardReason::Cors);
1057 let _ = s.reject_err((outcome.clone(), ()));
1058 handle.assert_outcome(&outcome, DataCategory::Error, 1);
1059 }
1060 }
1061
1062 #[test]
1063 fn test_split_partially_consumed_emits_remaining() {
1064 let value = CountedVec(vec![0, 1, 2, 3, 4, 5]);
1065 let (managed, mut handle) = Managed::for_test(value).build();
1066
1067 let mut s = managed.split(|value| value.0.into_iter().map(CountedValue));
1068 handle.assert_no_outcomes();
1069
1070 drop(s.next());
1071 handle.assert_internal_outcome(DataCategory::Error, 1);
1072 drop(s.next());
1073 handle.assert_internal_outcome(DataCategory::Error, 1);
1074 drop(s.next());
1075 handle.assert_internal_outcome(DataCategory::Error, 1);
1076 handle.assert_no_outcomes();
1077
1078 drop(s);
1079
1080 handle.assert_internal_outcome(DataCategory::Error, 1);
1081 handle.assert_internal_outcome(DataCategory::Error, 1);
1082 handle.assert_internal_outcome(DataCategory::Error, 1);
1083 }
1084
1085 #[test]
1086 fn test_split_changing_quantities_should_panic() {
1087 let value = CountedVec(vec![0, 1, 2, 3, 4, 5]);
1088 let (managed, mut handle) = Managed::for_test(value).build();
1089
1090 let mut s = managed.split(|_| std::iter::once(CountedValue(0)));
1091
1092 s.next().unwrap().accept(|_| {});
1093 handle.assert_no_outcomes();
1094
1095 assert!(s.next().is_none());
1096
1097 let r = std::panic::catch_unwind(move || {
1098 drop(s);
1099 });
1100
1101 assert!(
1102 r.is_err(),
1103 "expected split to panic because of mismatched (not enough) outcomes"
1104 );
1105 }
1106
1107 #[test]
1108 fn test_split_more_outcomes_than_before_should_panic() {
1109 let value = CountedVec(vec![0]);
1110 let (managed, mut handle) = Managed::for_test(value).build();
1111
1112 let mut s = managed.split(|_| vec![CountedValue(0), CountedValue(2)].into_iter());
1113
1114 s.next().unwrap().accept(|_| {});
1115 handle.assert_no_outcomes();
1116
1117 let r = std::panic::catch_unwind(move || {
1118 s.next();
1119 });
1120
1121 assert!(
1122 r.is_err(),
1123 "expected split to panic because of mismatched (too many) outcomes"
1124 );
1125 }
1126
1127 #[test]
1128 fn test_split_changing_categories_should_panic() {
1129 struct Special;
1130 impl Counted for Special {
1131 fn quantities(&self) -> Quantities {
1132 smallvec::smallvec![(DataCategory::Error, 1), (DataCategory::Transaction, 1)]
1133 }
1134 }
1135
1136 let value = CountedVec(vec![0]);
1137 let (managed, _handle) = Managed::for_test(value).build();
1138
1139 let mut s = managed.split(|value| value.0.into_iter().map(|_| Special));
1140
1141 let r = std::panic::catch_unwind(move || {
1142 let _ = s.next();
1143 });
1144
1145 assert!(
1146 r.is_err(),
1147 "expected split to panic because of mismatched outcome categories"
1148 );
1149 }
1150
1151 #[test]
1152 fn test_split_assert_fused() {
1153 fn only_fused<T: FusedIterator>(_: T) {}
1154
1155 let (managed, mut handle) = Managed::for_test(CountedVec(vec![0])).build();
1156 only_fused(managed.split(|value| value.0.into_iter().map(CountedValue)));
1157 handle.assert_internal_outcome(DataCategory::Error, 1);
1158 }
1159}