1#[cfg(debug_assertions)]
2use std::collections::BTreeMap;
3use std::convert::Infallible;
4use std::fmt;
5use std::iter::FusedIterator;
6use std::mem::ManuallyDrop;
7use std::net::IpAddr;
8use std::sync::Arc;
9use std::sync::atomic::{AtomicBool, Ordering};
10
11use chrono::{DateTime, Utc};
12use relay_event_schema::protocol::EventId;
13use relay_quotas::{DataCategory, Scoping};
14use relay_system::Addr;
15use smallvec::SmallVec;
16
17use crate::Envelope;
18use crate::managed::{Counted, ManagedEnvelope, Quantities};
19use crate::services::outcome::{DiscardReason, Outcome, TrackOutcome};
20use crate::services::processor::ProcessingError;
21
22#[cfg(debug_assertions)]
23mod debug;
24#[cfg(test)]
25mod test;
26
27#[cfg(test)]
28pub use self::test::*;
29
30pub trait OutcomeError {
32 type Error;
34
35 fn consume(self) -> (Option<Outcome>, Self::Error);
39}
40
41impl OutcomeError for Outcome {
42 type Error = ();
43
44 fn consume(self) -> (Option<Outcome>, Self::Error) {
45 (self, ()).consume()
46 }
47}
48
49impl<E> OutcomeError for (Outcome, E) {
50 type Error = E;
51
52 fn consume(self) -> (Option<Outcome>, Self::Error) {
53 (Some(self.0), self.1)
54 }
55}
56
57impl<E> OutcomeError for (Option<Outcome>, E) {
58 type Error = E;
59
60 fn consume(self) -> (Option<Outcome>, Self::Error) {
61 self
62 }
63}
64
65impl OutcomeError for ProcessingError {
66 type Error = Self;
67
68 fn consume(self) -> (Option<Outcome>, Self::Error) {
69 (self.to_outcome(), self)
70 }
71}
72
73impl OutcomeError for Infallible {
74 type Error = Self;
75
76 fn consume(self) -> (Option<Outcome>, Self::Error) {
77 match self {}
78 }
79}
80
81#[derive(Debug, Clone, Copy)]
86#[must_use = "a rejection must be propagated"]
87pub struct Rejected<T>(T);
88
89impl<T> Rejected<T> {
90 pub fn into_inner(self) -> T {
92 self.0
93 }
94
95 pub fn map<F, S>(self, f: F) -> Rejected<S>
97 where
98 F: FnOnce(T) -> S,
99 {
100 Rejected(f(self.0))
101 }
102}
103
104impl<T> std::error::Error for Rejected<T>
105where
106 T: std::error::Error,
107{
108 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
109 self.0.source()
110 }
111}
112
113impl<T> std::fmt::Display for Rejected<T>
114where
115 T: std::fmt::Display,
116{
117 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
118 self.0.fmt(f)
119 }
120}
121
122pub struct Managed<T: Counted> {
124 value: T,
125 meta: Arc<Meta>,
126 done: AtomicBool,
127}
128
129impl<T: Counted> Managed<T> {
130 pub fn from_envelope(envelope: &ManagedEnvelope, value: T) -> Self {
135 Self::from_parts(
136 value,
137 Arc::new(Meta {
138 outcome_aggregator: envelope.outcome_aggregator().clone(),
139 received_at: envelope.received_at(),
140 scoping: envelope.scoping(),
141 event_id: envelope.envelope().event_id(),
142 remote_addr: envelope.meta().remote_addr(),
143 }),
144 )
145 }
146
147 pub fn wrap<S>(&self, other: S) -> Managed<S>
149 where
150 S: Counted,
151 {
152 Managed::from_parts(other, Arc::clone(&self.meta))
153 }
154
155 pub fn received_at(&self) -> DateTime<Utc> {
157 self.meta.received_at
158 }
159
160 pub fn scoping(&self) -> Scoping {
162 self.meta.scoping
163 }
164
165 pub fn split_once<F, S, U>(self, f: F) -> (Managed<S>, Managed<U>)
171 where
172 F: FnOnce(T) -> (S, U),
173 S: Counted,
174 U: Counted,
175 {
176 debug_assert!(!self.is_done());
177
178 let (value, meta) = self.destructure();
179 #[cfg(debug_assertions)]
180 let quantities = value.quantities();
181
182 let (a, b) = f(value);
183
184 #[cfg(debug_assertions)]
185 debug::Quantities::from(&quantities)
186 .assert_only_extra(debug::Quantities::from(&a) + debug::Quantities::from(&b));
191
192 (
193 Managed::from_parts(a, Arc::clone(&meta)),
194 Managed::from_parts(b, meta),
195 )
196 }
197
198 pub fn split<F, I, S>(self, f: F) -> Split<I::IntoIter, I::Item>
203 where
204 F: FnOnce(T) -> I,
205 I: IntoIterator<Item = S>,
206 S: Counted,
207 {
208 self.split_with_context(|value| (f(value), ())).0
209 }
210
211 pub fn split_with_context<F, I, S, C>(self, f: F) -> (Split<I::IntoIter, I::Item>, C)
216 where
217 F: FnOnce(T) -> (I, C),
218 I: IntoIterator<Item = S>,
219 S: Counted,
220 {
221 debug_assert!(!self.is_done());
222
223 let (value, meta) = self.destructure();
224 #[cfg(debug_assertions)]
225 let quantities = value.quantities();
226
227 let (items, context) = f(value);
228
229 (
230 Split {
231 #[cfg(debug_assertions)]
232 quantities,
233 items: items.into_iter(),
234 meta,
235 exhausted: false,
236 },
237 context,
238 )
239 }
240
241 pub fn retain<S, I, U, E>(&mut self, select: S, mut retain: U)
277 where
278 S: FnOnce(&mut T) -> &mut Vec<I>,
279 I: Counted,
280 U: FnMut(&mut I, &mut RecordKeeper<'_>) -> Result<(), E>,
281 E: OutcomeError,
282 {
283 self.retain_with_context(
284 |inner| (select(inner), &()),
285 |item, _, records| retain(item, records),
286 );
287 }
288
289 pub fn retain_with_context<S, C, I, U, E>(&mut self, select: S, mut retain: U)
326 where
327 for<'a> S: FnOnce(&'a mut T) -> (&'a mut Vec<I>, &'a C),
332 I: Counted,
333 U: FnMut(&mut I, &C, &mut RecordKeeper<'_>) -> Result<(), E>,
334 E: OutcomeError,
335 {
336 self.modify(|inner, records| {
337 let (items, ctx) = select(inner);
338 items.retain_mut(|item| match retain(item, ctx, records) {
339 Ok(()) => true,
340 Err(err) => {
341 records.reject_err(err, &*item);
342 false
343 }
344 })
345 });
346 }
347
348 pub fn map<S, F>(self, f: F) -> Managed<S>
352 where
353 F: FnOnce(T, &mut RecordKeeper) -> S,
354 S: Counted,
355 {
356 self.try_map(move |inner, records| Ok::<_, Infallible>(f(inner, records)))
357 .unwrap_or_else(|e| match e.0 {})
358 }
359
360 pub fn try_map<S, F, E>(self, f: F) -> Result<Managed<S>, Rejected<E::Error>>
368 where
369 F: FnOnce(T, &mut RecordKeeper) -> Result<S, E>,
370 S: Counted,
371 E: OutcomeError,
372 {
373 debug_assert!(!self.is_done());
374
375 let (value, meta) = self.destructure();
376 let quantities = value.quantities();
377
378 let mut records = RecordKeeper::new(&meta, quantities);
379
380 match f(value, &mut records) {
381 Ok(value) => {
382 records.success(value.quantities());
383 Ok(Managed::from_parts(value, meta))
384 }
385 Err(err) => Err(records.failure(err)),
386 }
387 }
388
389 pub fn modify<F>(&mut self, f: F)
393 where
394 F: FnOnce(&mut T, &mut RecordKeeper),
395 {
396 self.try_modify(move |inner, records| {
397 f(inner, records);
398 Ok::<_, Infallible>(())
399 })
400 .unwrap_or_else(|e| match e {})
401 }
402
403 pub fn try_modify<F, E>(&mut self, f: F) -> Result<(), Rejected<E::Error>>
411 where
412 F: FnOnce(&mut T, &mut RecordKeeper) -> Result<(), E>,
413 E: OutcomeError,
414 {
415 debug_assert!(!self.is_done());
416
417 let quantities = self.value.quantities();
418 let mut records = RecordKeeper::new(&self.meta, quantities);
419
420 match f(&mut self.value, &mut records) {
421 Ok(()) => {
422 records.success(self.value.quantities());
423 Ok(())
424 }
425 Err(err) => {
426 let err = records.failure(err);
427 self.done.store(true, Ordering::Relaxed);
428 Err(err)
429 }
430 }
431 }
432
433 pub fn accept<F, S>(self, f: F) -> S
441 where
442 F: FnOnce(T) -> S,
443 {
444 self.try_accept(|item| Ok::<_, Infallible>(f(item)))
445 .unwrap_or_else(|err| match err.0 {})
446 }
447
448 pub fn try_accept<F, S, E>(self, f: F) -> Result<S, Rejected<E::Error>>
456 where
457 F: FnOnce(T) -> Result<S, E>,
458 E: OutcomeError,
459 {
460 debug_assert!(!self.is_done());
461
462 let (value, meta) = self.destructure();
463 let records = RecordKeeper::new(&meta, value.quantities());
464
465 match f(value) {
466 Ok(value) => {
467 records.accept();
468 Ok(value)
469 }
470 Err(err) => Err(records.failure(err)),
471 }
472 }
473
474 #[track_caller]
482 pub fn internal_error(&self, reason: &'static str) -> Rejected<()> {
483 relay_log::error!("internal error: {reason}");
484 debug_assert!(false, "internal error: {reason}");
485 self.reject_err((Outcome::Invalid(DiscardReason::Internal), ()))
486 }
487
488 pub fn reject_err<E>(&self, error: E) -> Rejected<E::Error>
490 where
491 E: OutcomeError,
492 {
493 debug_assert!(!self.is_done());
494
495 let (outcome, error) = error.consume();
496 if let Some(outcome) = outcome {
497 self.do_reject(outcome);
498 }
499 Rejected(error)
500 }
501
502 fn do_reject(&self, outcome: Outcome) {
503 if !self.done.fetch_or(true, Ordering::Relaxed) {
504 for (category, quantity) in self.value.quantities() {
505 self.meta.track_outcome(outcome.clone(), category, quantity);
506 }
507 }
508 }
509
510 fn destructure(self) -> (T, Arc<Meta>) {
514 let this = ManuallyDrop::new(self);
524 let value = unsafe { std::ptr::read(&this.value) };
525 let meta = unsafe { std::ptr::read(&this.meta) };
526 (value, meta)
527 }
528
529 fn from_parts(value: T, meta: Arc<Meta>) -> Self {
530 Self {
531 value,
532 meta,
533 done: AtomicBool::new(false),
534 }
535 }
536
537 fn is_done(&self) -> bool {
538 self.done.load(Ordering::Relaxed)
539 }
540}
541
542impl<T: Counted> Drop for Managed<T> {
543 fn drop(&mut self) {
544 self.do_reject(Outcome::Invalid(DiscardReason::Internal));
545 }
546}
547
548impl<T: Counted + fmt::Debug> fmt::Debug for Managed<T> {
549 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
550 write!(f, "Managed<{}>[", std::any::type_name::<T>())?;
551 for (i, (category, quantity)) in self.value.quantities().iter().enumerate() {
552 if i > 0 {
553 write!(f, ",")?;
554 }
555 write!(f, "{category}:{quantity}")?;
556 }
557 write!(f, "](")?;
558 self.value.fmt(f)?;
559 write!(f, ")")
560 }
561}
562
563impl From<Managed<Box<Envelope>>> for ManagedEnvelope {
564 fn from(value: Managed<Box<Envelope>>) -> Self {
565 let (value, meta) = value.destructure();
566 let mut envelope = ManagedEnvelope::new(value, meta.outcome_aggregator.clone());
567 envelope.scope(meta.scoping);
568 envelope
569 }
570}
571
572impl<T: Counted> AsRef<T> for Managed<T> {
573 fn as_ref(&self) -> &T {
574 &self.value
575 }
576}
577
578impl<T: Counted> std::ops::Deref for Managed<T> {
579 type Target = T;
580
581 fn deref(&self) -> &Self::Target {
582 &self.value
583 }
584}
585
586struct Meta {
588 outcome_aggregator: Addr<TrackOutcome>,
590 received_at: DateTime<Utc>,
594 scoping: Scoping,
596 event_id: Option<EventId>,
598 remote_addr: Option<IpAddr>,
600}
601
602impl Meta {
603 pub fn track_outcome(&self, outcome: Outcome, category: DataCategory, quantity: usize) {
604 self.outcome_aggregator.send(TrackOutcome {
605 timestamp: self.received_at,
606 scoping: self.scoping,
607 outcome,
608 event_id: self.event_id,
609 remote_addr: self.remote_addr,
610 category,
611 quantity: quantity.try_into().unwrap_or(u32::MAX),
612 });
613 }
614}
615
616pub struct RecordKeeper<'a> {
619 meta: &'a Meta,
620 on_drop: Quantities,
621 #[cfg(debug_assertions)]
622 lenient: SmallVec<[DataCategory; 1]>,
623 #[cfg(debug_assertions)]
624 modifications: BTreeMap<DataCategory, isize>,
625 in_flight: SmallVec<[(DataCategory, usize, Option<Outcome>); 2]>,
626}
627
628impl<'a> RecordKeeper<'a> {
629 fn new(meta: &'a Meta, quantities: Quantities) -> Self {
630 Self {
631 meta,
632 on_drop: quantities,
633 #[cfg(debug_assertions)]
634 lenient: Default::default(),
635 #[cfg(debug_assertions)]
636 modifications: Default::default(),
637 in_flight: Default::default(),
638 }
639 }
640
641 pub fn lenient(&mut self, category: DataCategory) {
648 let _category = category;
649 #[cfg(debug_assertions)]
650 self.lenient.push(_category);
651 }
652
653 pub fn modify_by(&mut self, category: DataCategory, offset: isize) {
661 let _category = category;
662 let _offset = offset;
663 #[cfg(debug_assertions)]
664 {
665 *self.modifications.entry(_category).or_default() += offset;
666 }
667 }
668
669 fn failure<E>(mut self, error: E) -> Rejected<E::Error>
673 where
674 E: OutcomeError,
675 {
676 let (outcome, error) = error.consume();
677
678 if let Some(outcome) = outcome {
679 for (category, quantity) in std::mem::take(&mut self.on_drop) {
680 self.meta.track_outcome(outcome.clone(), category, quantity);
681 }
682 }
683
684 Rejected(error)
685 }
686
687 fn accept(mut self) {
695 debug_assert!(
696 self.in_flight.is_empty(),
697 "records accepted, but intermediate outcomes tracked"
698 );
699 self.on_drop.clear();
700 }
701
702 fn success(mut self, new: Quantities) {
708 let original = std::mem::take(&mut self.on_drop);
709 self.assert_quantities(original, new);
710
711 self.on_drop.clear();
712 for (category, quantity, outcome) in std::mem::take(&mut self.in_flight) {
713 if let Some(outcome) = outcome {
714 self.meta.track_outcome(outcome, category, quantity);
715 }
716 }
717 }
718
719 #[cfg(debug_assertions)]
724 fn assert_quantities(&self, original: Quantities, new: Quantities) {
725 macro_rules! emit {
726 ($category:expr, $($tt:tt)*) => {{
727 match self.lenient.contains(&$category) {
728 true => relay_log::debug!($($tt)*),
731 false => {
732 relay_log::error!("Original: {original:?}");
733 relay_log::error!("New: {new:?}");
734 relay_log::error!("Modifications: {:?}", self.modifications);
735 relay_log::error!("In Flight: {:?}", self.in_flight);
736 panic!($($tt)*)
737 }
738 }
739 }};
740 }
741
742 let mut sums = debug::Quantities::from(&original).0;
743 for (category, offset) in &self.modifications {
744 let v = sums.entry(*category).or_default();
745 match v.checked_add_signed(*offset) {
746 Some(result) => *v = result,
747 None => emit!(
748 category,
749 "Attempted to modify original quantity {v} into the negative ({offset})"
750 ),
751 }
752 }
753
754 for (category, quantity, outcome) in &self.in_flight {
755 match sums.get_mut(category) {
756 Some(c) if *c >= *quantity => *c -= *quantity,
757 Some(c) => emit!(
758 category,
759 "Emitted {quantity} outcomes ({outcome:?}) for {category}, but there were only {c} items in the category originally"
760 ),
761 None => emit!(
762 category,
763 "Emitted {quantity} outcomes ({outcome:?}) for {category}, but there never was an item in this category"
764 ),
765 }
766 }
767
768 for (category, quantity) in &new {
769 match sums.get_mut(category) {
770 Some(c) if *c >= *quantity => *c -= *quantity,
771 Some(c) => emit!(
772 category,
773 "New item has {quantity} items in category '{category}', but original (after emitted outcomes) only has {c} left"
774 ),
775 None => emit!(
776 category,
777 "New item has {quantity} items in category '{category}', but after emitted outcomes there are none left"
778 ),
779 }
780 }
781
782 for (category, quantity) in sums {
783 if quantity > 0 {
784 emit!(
785 category,
786 "Missing outcomes or mismatched quantity in category '{category}', off by {quantity}"
787 );
788 }
789 }
790 }
791
792 #[cfg(not(debug_assertions))]
793 fn assert_quantities(&self, _: Quantities, _: Quantities) {}
794}
795
796impl<'a> Drop for RecordKeeper<'a> {
797 fn drop(&mut self) {
798 for (category, quantity) in std::mem::take(&mut self.on_drop) {
799 self.meta.track_outcome(
800 Outcome::Invalid(DiscardReason::Internal),
801 category,
802 quantity,
803 );
804 }
805 }
806}
807
808impl RecordKeeper<'_> {
809 pub fn or_default<T, E, Q>(&mut self, r: Result<T, E>, q: Q) -> T
814 where
815 T: Default,
816 E: OutcomeError,
817 Q: Counted,
818 {
819 match r {
820 Ok(result) => result,
821 Err(err) => {
822 self.reject_err(err, q);
823 T::default()
824 }
825 }
826 }
827
828 pub fn reject_err<E, Q>(&mut self, err: E, q: Q) -> E::Error
833 where
834 E: OutcomeError,
835 Q: Counted,
836 {
837 let (outcome, err) = err.consume();
838 for (category, quantity) in q.quantities() {
839 self.in_flight.push((category, quantity, outcome.clone()))
840 }
841 err
842 }
843
844 #[track_caller]
848 pub fn internal_error<E, Q>(&mut self, error: E, q: Q)
849 where
850 E: std::error::Error + 'static,
851 Q: Counted,
852 {
853 relay_log::error!(error = &error as &dyn std::error::Error, "internal error");
854 debug_assert!(false, "internal error: {error}");
855 self.reject_err((Outcome::Invalid(DiscardReason::Internal), ()), q);
856 }
857}
858
859pub struct Split<I, S>
861where
862 I: Iterator<Item = S>,
863 S: Counted,
864{
865 #[cfg(debug_assertions)]
866 quantities: Quantities,
867 items: I,
868 meta: Arc<Meta>,
869 exhausted: bool,
870}
871
872impl<I, S> Split<I, S>
873where
874 I: Iterator<Item = S>,
875 S: Counted,
876{
877 #[cfg(debug_assertions)]
880 fn subtract(&mut self, q: Quantities) {
881 for (category, quantities) in q {
882 let Some(orig_quantities) = self
883 .quantities
884 .iter_mut()
885 .find_map(|(c, q)| (*c == category).then_some(q))
886 else {
887 debug_assert!(
888 false,
889 "mismatching quantities, item split into category {category}, \
890 which originally was not present"
891 );
892 continue;
893 };
894
895 if *orig_quantities >= quantities {
896 *orig_quantities -= quantities;
897 } else {
898 debug_assert!(
899 false,
900 "in total more items produced in category {category} than originally available"
901 );
902 }
903 }
904 }
905}
906
907impl<I, S> Iterator for Split<I, S>
908where
909 I: Iterator<Item = S>,
910 S: Counted,
911{
912 type Item = Managed<S>;
913
914 fn next(&mut self) -> Option<Self::Item> {
915 let next = match self.items.next() {
916 Some(next) => next,
917 None => {
918 self.exhausted = true;
919 return None;
920 }
921 };
922
923 #[cfg(debug_assertions)]
924 self.subtract(next.quantities());
925
926 Some(Managed::from_parts(next, Arc::clone(&self.meta)))
927 }
928}
929
930impl<I, S> Drop for Split<I, S>
931where
932 I: Iterator<Item = S>,
933 S: Counted,
934{
935 fn drop(&mut self) {
936 #[cfg(debug_assertions)]
938 if self.exhausted {
939 for (category, quantities) in &self.quantities {
940 debug_assert!(
941 *quantities == 0,
942 "items split, but still {quantities} remaining in category {category}"
943 );
944 }
945 }
946
947 if self.exhausted {
948 return;
949 }
950
951 for item in &mut self.items {
957 for (category, quantity) in item.quantities() {
958 self.meta.track_outcome(
959 Outcome::Invalid(DiscardReason::Internal),
960 category,
961 quantity,
962 );
963 }
964 }
965 }
966}
967
968impl<I, S> FusedIterator for Split<I, S>
969where
970 I: Iterator<Item = S> + FusedIterator,
971 S: Counted,
972{
973}
974
975#[cfg(test)]
976mod tests {
977 use super::*;
978
979 struct CountedVec(Vec<u32>);
980
981 impl Counted for CountedVec {
982 fn quantities(&self) -> Quantities {
983 smallvec::smallvec![(DataCategory::Error, self.0.len())]
984 }
985 }
986
987 struct CountedValue(u32);
988
989 impl Counted for CountedValue {
990 fn quantities(&self) -> Quantities {
991 smallvec::smallvec![(DataCategory::Error, 1)]
992 }
993 }
994
995 #[test]
996 fn test_split_fully_consumed() {
997 let value = CountedVec(vec![0, 1, 2, 3, 4, 5]);
998 let (managed, mut handle) = Managed::for_test(value).build();
999
1000 let s = managed
1001 .split(|value| value.0.into_iter().map(CountedValue))
1002 .collect::<Vec<_>>();
1004
1005 handle.assert_no_outcomes();
1006
1007 for (i, s) in s.into_iter().enumerate() {
1008 assert_eq!(s.as_ref().0, i as u32);
1009 let outcome = Outcome::Invalid(DiscardReason::Cors);
1010 let _ = s.reject_err((outcome.clone(), ()));
1011 handle.assert_outcome(&outcome, DataCategory::Error, 1);
1012 }
1013 }
1014
1015 #[test]
1016 fn test_split_partially_consumed_emits_remaining() {
1017 let value = CountedVec(vec![0, 1, 2, 3, 4, 5]);
1018 let (managed, mut handle) = Managed::for_test(value).build();
1019
1020 let mut s = managed.split(|value| value.0.into_iter().map(CountedValue));
1021 handle.assert_no_outcomes();
1022
1023 drop(s.next());
1024 handle.assert_internal_outcome(DataCategory::Error, 1);
1025 drop(s.next());
1026 handle.assert_internal_outcome(DataCategory::Error, 1);
1027 drop(s.next());
1028 handle.assert_internal_outcome(DataCategory::Error, 1);
1029 handle.assert_no_outcomes();
1030
1031 drop(s);
1032
1033 handle.assert_internal_outcome(DataCategory::Error, 1);
1034 handle.assert_internal_outcome(DataCategory::Error, 1);
1035 handle.assert_internal_outcome(DataCategory::Error, 1);
1036 }
1037
1038 #[test]
1039 fn test_split_changing_quantities_should_panic() {
1040 let value = CountedVec(vec![0, 1, 2, 3, 4, 5]);
1041 let (managed, mut handle) = Managed::for_test(value).build();
1042
1043 let mut s = managed.split(|_| std::iter::once(CountedValue(0)));
1044
1045 s.next().unwrap().accept(|_| {});
1046 handle.assert_no_outcomes();
1047
1048 assert!(s.next().is_none());
1049
1050 let r = std::panic::catch_unwind(move || {
1051 drop(s);
1052 });
1053
1054 assert!(
1055 r.is_err(),
1056 "expected split to panic because of mismatched (not enough) outcomes"
1057 );
1058 }
1059
1060 #[test]
1061 fn test_split_more_outcomes_than_before_should_panic() {
1062 let value = CountedVec(vec![0]);
1063 let (managed, mut handle) = Managed::for_test(value).build();
1064
1065 let mut s = managed.split(|_| vec![CountedValue(0), CountedValue(2)].into_iter());
1066
1067 s.next().unwrap().accept(|_| {});
1068 handle.assert_no_outcomes();
1069
1070 let r = std::panic::catch_unwind(move || {
1071 s.next();
1072 });
1073
1074 assert!(
1075 r.is_err(),
1076 "expected split to panic because of mismatched (too many) outcomes"
1077 );
1078 }
1079
1080 #[test]
1081 fn test_split_changing_categories_should_panic() {
1082 struct Special;
1083 impl Counted for Special {
1084 fn quantities(&self) -> Quantities {
1085 smallvec::smallvec![(DataCategory::Error, 1), (DataCategory::Transaction, 1)]
1086 }
1087 }
1088
1089 let value = CountedVec(vec![0]);
1090 let (managed, _handle) = Managed::for_test(value).build();
1091
1092 let mut s = managed.split(|value| value.0.into_iter().map(|_| Special));
1093
1094 let r = std::panic::catch_unwind(move || {
1095 let _ = s.next();
1096 });
1097
1098 assert!(
1099 r.is_err(),
1100 "expected split to panic because of mismatched outcome categories"
1101 );
1102 }
1103
1104 #[test]
1105 fn test_split_assert_fused() {
1106 fn only_fused<T: FusedIterator>(_: T) {}
1107
1108 let (managed, mut handle) = Managed::for_test(CountedVec(vec![0])).build();
1109 only_fused(managed.split(|value| value.0.into_iter().map(CountedValue)));
1110 handle.assert_internal_outcome(DataCategory::Error, 1);
1111 }
1112}