1#[cfg(debug_assertions)]
2use std::collections::BTreeMap;
3use std::convert::Infallible;
4use std::fmt;
5use std::iter::FusedIterator;
6use std::mem::ManuallyDrop;
7use std::net::IpAddr;
8use std::sync::Arc;
9use std::sync::atomic::{AtomicBool, Ordering};
10
11use chrono::{DateTime, Utc};
12use relay_event_schema::protocol::EventId;
13use relay_quotas::{DataCategory, Scoping};
14use relay_system::Addr;
15use smallvec::SmallVec;
16
17use crate::Envelope;
18use crate::managed::{Counted, ManagedEnvelope, Quantities};
19use crate::services::outcome::{DiscardReason, Outcome, TrackOutcome};
20use crate::services::processor::ProcessingError;
21
22#[cfg(debug_assertions)]
23mod debug;
24#[cfg(test)]
25mod test;
26
27#[cfg(test)]
28pub use self::test::*;
29
30pub trait OutcomeError {
32 type Error;
34
35 fn consume(self) -> (Option<Outcome>, Self::Error);
39}
40
41impl OutcomeError for Outcome {
42 type Error = ();
43
44 fn consume(self) -> (Option<Outcome>, Self::Error) {
45 (self, ()).consume()
46 }
47}
48
49impl<E> OutcomeError for (Outcome, E) {
50 type Error = E;
51
52 fn consume(self) -> (Option<Outcome>, Self::Error) {
53 (Some(self.0), self.1)
54 }
55}
56
57impl<E> OutcomeError for (Option<Outcome>, E) {
58 type Error = E;
59
60 fn consume(self) -> (Option<Outcome>, Self::Error) {
61 self
62 }
63}
64
65impl OutcomeError for ProcessingError {
66 type Error = Self;
67
68 fn consume(self) -> (Option<Outcome>, Self::Error) {
69 (self.to_outcome(), self)
70 }
71}
72
73impl OutcomeError for Infallible {
74 type Error = Self;
75
76 fn consume(self) -> (Option<Outcome>, Self::Error) {
77 match self {}
78 }
79}
80
81#[derive(Debug, Clone, Copy)]
86#[must_use = "a rejection must be propagated"]
87pub struct Rejected<T>(T);
88
89impl<T> Rejected<T> {
90 pub fn into_inner(self) -> T {
92 self.0
93 }
94
95 pub fn map<F, S>(self, f: F) -> Rejected<S>
97 where
98 F: FnOnce(T) -> S,
99 {
100 Rejected(f(self.0))
101 }
102}
103
104pub struct Managed<T: Counted> {
106 value: T,
107 meta: Arc<Meta>,
108 done: AtomicBool,
109}
110
111impl<T: Counted> Managed<T> {
112 pub fn from_envelope(envelope: &ManagedEnvelope, value: T) -> Self {
117 Self::from_parts(
118 value,
119 Arc::new(Meta {
120 outcome_aggregator: envelope.outcome_aggregator().clone(),
121 received_at: envelope.received_at(),
122 scoping: envelope.scoping(),
123 event_id: envelope.envelope().event_id(),
124 remote_addr: envelope.meta().remote_addr(),
125 }),
126 )
127 }
128
129 pub fn wrap<S>(&self, other: S) -> Managed<S>
131 where
132 S: Counted,
133 {
134 Managed::from_parts(other, Arc::clone(&self.meta))
135 }
136
137 pub fn received_at(&self) -> DateTime<Utc> {
139 self.meta.received_at
140 }
141
142 pub fn scoping(&self) -> Scoping {
144 self.meta.scoping
145 }
146
147 pub fn split_once<F, S, U>(self, f: F) -> (Managed<S>, Managed<U>)
153 where
154 F: FnOnce(T) -> (S, U),
155 S: Counted,
156 U: Counted,
157 {
158 debug_assert!(!self.is_done());
159
160 let (value, meta) = self.destructure();
161 #[cfg(debug_assertions)]
162 let quantities = value.quantities();
163
164 let (a, b) = f(value);
165
166 #[cfg(debug_assertions)]
167 debug::Quantities::from(&quantities)
168 .assert_only_extra(debug::Quantities::from(&a) + debug::Quantities::from(&b));
173
174 (
175 Managed::from_parts(a, Arc::clone(&meta)),
176 Managed::from_parts(b, meta),
177 )
178 }
179
180 pub fn split<F, I, S>(self, f: F) -> Split<I::IntoIter, I::Item>
185 where
186 F: FnOnce(T) -> I,
187 I: IntoIterator<Item = S>,
188 S: Counted,
189 {
190 self.split_with_context(|value| (f(value), ())).0
191 }
192
193 pub fn split_with_context<F, I, S, C>(self, f: F) -> (Split<I::IntoIter, I::Item>, C)
198 where
199 F: FnOnce(T) -> (I, C),
200 I: IntoIterator<Item = S>,
201 S: Counted,
202 {
203 debug_assert!(!self.is_done());
204
205 let (value, meta) = self.destructure();
206 #[cfg(debug_assertions)]
207 let quantities = value.quantities();
208
209 let (items, context) = f(value);
210
211 (
212 Split {
213 #[cfg(debug_assertions)]
214 quantities,
215 items: items.into_iter(),
216 meta,
217 exhausted: false,
218 },
219 context,
220 )
221 }
222
223 pub fn retain<S, I, U, E>(&mut self, select: S, mut retain: U)
259 where
260 S: FnOnce(&mut T) -> &mut Vec<I>,
261 I: Counted,
262 U: FnMut(&mut I, &mut RecordKeeper<'_>) -> Result<(), E>,
263 E: OutcomeError,
264 {
265 self.retain_with_context(
266 |inner| (select(inner), &()),
267 |item, _, records| retain(item, records),
268 );
269 }
270
271 pub fn retain_with_context<S, C, I, U, E>(&mut self, select: S, mut retain: U)
308 where
309 for<'a> S: FnOnce(&'a mut T) -> (&'a mut Vec<I>, &'a C),
314 I: Counted,
315 U: FnMut(&mut I, &C, &mut RecordKeeper<'_>) -> Result<(), E>,
316 E: OutcomeError,
317 {
318 self.modify(|inner, records| {
319 let (items, ctx) = select(inner);
320 items.retain_mut(|item| match retain(item, ctx, records) {
321 Ok(()) => true,
322 Err(err) => {
323 records.reject_err(err, &*item);
324 false
325 }
326 })
327 });
328 }
329
330 pub fn map<S, F>(self, f: F) -> Managed<S>
334 where
335 F: FnOnce(T, &mut RecordKeeper) -> S,
336 S: Counted,
337 {
338 self.try_map(move |inner, records| Ok::<_, Infallible>(f(inner, records)))
339 .unwrap_or_else(|e| match e.0 {})
340 }
341
342 pub fn try_map<S, F, E>(self, f: F) -> Result<Managed<S>, Rejected<E::Error>>
350 where
351 F: FnOnce(T, &mut RecordKeeper) -> Result<S, E>,
352 S: Counted,
353 E: OutcomeError,
354 {
355 debug_assert!(!self.is_done());
356
357 let (value, meta) = self.destructure();
358 let quantities = value.quantities();
359
360 let mut records = RecordKeeper::new(&meta, quantities);
361
362 match f(value, &mut records) {
363 Ok(value) => {
364 records.success(value.quantities());
365 Ok(Managed::from_parts(value, meta))
366 }
367 Err(err) => Err(records.failure(err)),
368 }
369 }
370
371 pub fn modify<F>(&mut self, f: F)
375 where
376 F: FnOnce(&mut T, &mut RecordKeeper),
377 {
378 self.try_modify(move |inner, records| {
379 f(inner, records);
380 Ok::<_, Infallible>(())
381 })
382 .unwrap_or_else(|e| match e {})
383 }
384
385 pub fn try_modify<F, E>(&mut self, f: F) -> Result<(), Rejected<E::Error>>
393 where
394 F: FnOnce(&mut T, &mut RecordKeeper) -> Result<(), E>,
395 E: OutcomeError,
396 {
397 debug_assert!(!self.is_done());
398
399 let quantities = self.value.quantities();
400 let mut records = RecordKeeper::new(&self.meta, quantities);
401
402 match f(&mut self.value, &mut records) {
403 Ok(()) => {
404 records.success(self.value.quantities());
405 Ok(())
406 }
407 Err(err) => {
408 let err = records.failure(err);
409 self.done.store(true, Ordering::Relaxed);
410 Err(err)
411 }
412 }
413 }
414
415 pub fn accept<F, S>(self, f: F) -> S
423 where
424 F: FnOnce(T) -> S,
425 {
426 self.try_accept(|item| Ok::<_, Infallible>(f(item)))
427 .unwrap_or_else(|err| match err.0 {})
428 }
429
430 pub fn try_accept<F, S, E>(self, f: F) -> Result<S, Rejected<E::Error>>
438 where
439 F: FnOnce(T) -> Result<S, E>,
440 E: OutcomeError,
441 {
442 debug_assert!(!self.is_done());
443
444 let (value, meta) = self.destructure();
445 let records = RecordKeeper::new(&meta, value.quantities());
446
447 match f(value) {
448 Ok(value) => {
449 records.accept();
450 Ok(value)
451 }
452 Err(err) => Err(records.failure(err)),
453 }
454 }
455
456 #[track_caller]
464 pub fn internal_error(&self, reason: &'static str) -> Rejected<()> {
465 relay_log::error!("internal error: {reason}");
466 debug_assert!(false, "internal error: {reason}");
467 self.reject_err((Outcome::Invalid(DiscardReason::Internal), ()))
468 }
469
470 pub fn reject_err<E>(&self, error: E) -> Rejected<E::Error>
472 where
473 E: OutcomeError,
474 {
475 debug_assert!(!self.is_done());
476
477 let (outcome, error) = error.consume();
478 if let Some(outcome) = outcome {
479 self.do_reject(outcome);
480 }
481 Rejected(error)
482 }
483
484 fn do_reject(&self, outcome: Outcome) {
485 if !self.done.fetch_or(true, Ordering::Relaxed) {
486 for (category, quantity) in self.value.quantities() {
487 self.meta.track_outcome(outcome.clone(), category, quantity);
488 }
489 }
490 }
491
492 fn destructure(self) -> (T, Arc<Meta>) {
496 let this = ManuallyDrop::new(self);
506 let value = unsafe { std::ptr::read(&this.value) };
507 let meta = unsafe { std::ptr::read(&this.meta) };
508 (value, meta)
509 }
510
511 fn from_parts(value: T, meta: Arc<Meta>) -> Self {
512 Self {
513 value,
514 meta,
515 done: AtomicBool::new(false),
516 }
517 }
518
519 fn is_done(&self) -> bool {
520 self.done.load(Ordering::Relaxed)
521 }
522}
523
524impl<T: Counted> Drop for Managed<T> {
525 fn drop(&mut self) {
526 self.do_reject(Outcome::Invalid(DiscardReason::Internal));
527 }
528}
529
530impl<T: Counted + fmt::Debug> fmt::Debug for Managed<T> {
531 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
532 write!(f, "Managed<{}>[", std::any::type_name::<T>())?;
533 for (i, (category, quantity)) in self.value.quantities().iter().enumerate() {
534 if i > 0 {
535 write!(f, ",")?;
536 }
537 write!(f, "{category}:{quantity}")?;
538 }
539 write!(f, "](")?;
540 self.value.fmt(f)?;
541 write!(f, ")")
542 }
543}
544
545impl From<Managed<Box<Envelope>>> for ManagedEnvelope {
546 fn from(value: Managed<Box<Envelope>>) -> Self {
547 let (value, meta) = value.destructure();
548 let mut envelope = ManagedEnvelope::new(value, meta.outcome_aggregator.clone());
549 envelope.scope(meta.scoping);
550 envelope
551 }
552}
553
554impl<T: Counted> AsRef<T> for Managed<T> {
555 fn as_ref(&self) -> &T {
556 &self.value
557 }
558}
559
560impl<T: Counted> std::ops::Deref for Managed<T> {
561 type Target = T;
562
563 fn deref(&self) -> &Self::Target {
564 &self.value
565 }
566}
567
568struct Meta {
570 outcome_aggregator: Addr<TrackOutcome>,
572 received_at: DateTime<Utc>,
576 scoping: Scoping,
578 event_id: Option<EventId>,
580 remote_addr: Option<IpAddr>,
582}
583
584impl Meta {
585 pub fn track_outcome(&self, outcome: Outcome, category: DataCategory, quantity: usize) {
586 self.outcome_aggregator.send(TrackOutcome {
587 timestamp: self.received_at,
588 scoping: self.scoping,
589 outcome,
590 event_id: self.event_id,
591 remote_addr: self.remote_addr,
592 category,
593 quantity: quantity.try_into().unwrap_or(u32::MAX),
594 });
595 }
596}
597
598pub struct RecordKeeper<'a> {
601 meta: &'a Meta,
602 on_drop: Quantities,
603 #[cfg(debug_assertions)]
604 lenient: SmallVec<[DataCategory; 1]>,
605 #[cfg(debug_assertions)]
606 modifications: BTreeMap<DataCategory, isize>,
607 in_flight: SmallVec<[(DataCategory, usize, Option<Outcome>); 2]>,
608}
609
610impl<'a> RecordKeeper<'a> {
611 fn new(meta: &'a Meta, quantities: Quantities) -> Self {
612 Self {
613 meta,
614 on_drop: quantities,
615 #[cfg(debug_assertions)]
616 lenient: Default::default(),
617 #[cfg(debug_assertions)]
618 modifications: Default::default(),
619 in_flight: Default::default(),
620 }
621 }
622
623 pub fn lenient(&mut self, category: DataCategory) {
630 let _category = category;
631 #[cfg(debug_assertions)]
632 self.lenient.push(_category);
633 }
634
635 pub fn modify_by(&mut self, category: DataCategory, offset: isize) {
643 let _category = category;
644 let _offset = offset;
645 #[cfg(debug_assertions)]
646 {
647 *self.modifications.entry(_category).or_default() += offset;
648 }
649 }
650
651 fn failure<E>(mut self, error: E) -> Rejected<E::Error>
655 where
656 E: OutcomeError,
657 {
658 let (outcome, error) = error.consume();
659
660 if let Some(outcome) = outcome {
661 for (category, quantity) in std::mem::take(&mut self.on_drop) {
662 self.meta.track_outcome(outcome.clone(), category, quantity);
663 }
664 }
665
666 Rejected(error)
667 }
668
669 fn accept(mut self) {
677 debug_assert!(
678 self.in_flight.is_empty(),
679 "records accepted, but intermediate outcomes tracked"
680 );
681 self.on_drop.clear();
682 }
683
684 fn success(mut self, new: Quantities) {
690 let original = std::mem::take(&mut self.on_drop);
691 self.assert_quantities(original, new);
692
693 self.on_drop.clear();
694 for (category, quantity, outcome) in std::mem::take(&mut self.in_flight) {
695 if let Some(outcome) = outcome {
696 self.meta.track_outcome(outcome, category, quantity);
697 }
698 }
699 }
700
701 #[cfg(debug_assertions)]
706 fn assert_quantities(&self, original: Quantities, new: Quantities) {
707 macro_rules! emit {
708 ($category:expr, $($tt:tt)*) => {{
709 match self.lenient.contains(&$category) {
710 true => relay_log::debug!($($tt)*),
713 false => {
714 relay_log::error!("Original: {original:?}");
715 relay_log::error!("New: {new:?}");
716 relay_log::error!("Modifications: {:?}", self.modifications);
717 relay_log::error!("In Flight: {:?}", self.in_flight);
718 panic!($($tt)*)
719 }
720 }
721 }};
722 }
723
724 let mut sums = debug::Quantities::from(&original).0;
725 for (category, offset) in &self.modifications {
726 let v = sums.entry(*category).or_default();
727 match v.checked_add_signed(*offset) {
728 Some(result) => *v = result,
729 None => emit!(
730 category,
731 "Attempted to modify original quantity {v} into the negative ({offset})"
732 ),
733 }
734 }
735
736 for (category, quantity, outcome) in &self.in_flight {
737 match sums.get_mut(category) {
738 Some(c) if *c >= *quantity => *c -= *quantity,
739 Some(c) => emit!(
740 category,
741 "Emitted {quantity} outcomes ({outcome:?}) for {category}, but there were only {c} items in the category originally"
742 ),
743 None => emit!(
744 category,
745 "Emitted {quantity} outcomes ({outcome:?}) for {category}, but there never was an item in this category"
746 ),
747 }
748 }
749
750 for (category, quantity) in &new {
751 match sums.get_mut(category) {
752 Some(c) if *c >= *quantity => *c -= *quantity,
753 Some(c) => emit!(
754 category,
755 "New item has {quantity} items in category '{category}', but original (after emitted outcomes) only has {c} left"
756 ),
757 None => emit!(
758 category,
759 "New item has {quantity} items in category '{category}', but after emitted outcomes there are none left"
760 ),
761 }
762 }
763
764 for (category, quantity) in sums {
765 if quantity > 0 {
766 emit!(
767 category,
768 "Missing outcomes or mismatched quantity in category '{category}', off by {quantity}"
769 );
770 }
771 }
772 }
773
774 #[cfg(not(debug_assertions))]
775 fn assert_quantities(&self, _: Quantities, _: Quantities) {}
776}
777
778impl<'a> Drop for RecordKeeper<'a> {
779 fn drop(&mut self) {
780 for (category, quantity) in std::mem::take(&mut self.on_drop) {
781 self.meta.track_outcome(
782 Outcome::Invalid(DiscardReason::Internal),
783 category,
784 quantity,
785 );
786 }
787 }
788}
789
790impl RecordKeeper<'_> {
791 pub fn or_default<T, E, Q>(&mut self, r: Result<T, E>, q: Q) -> T
796 where
797 T: Default,
798 E: OutcomeError,
799 Q: Counted,
800 {
801 match r {
802 Ok(result) => result,
803 Err(err) => {
804 self.reject_err(err, q);
805 T::default()
806 }
807 }
808 }
809
810 pub fn reject_err<E, Q>(&mut self, err: E, q: Q) -> E::Error
815 where
816 E: OutcomeError,
817 Q: Counted,
818 {
819 let (outcome, err) = err.consume();
820 for (category, quantity) in q.quantities() {
821 self.in_flight.push((category, quantity, outcome.clone()))
822 }
823 err
824 }
825
826 #[track_caller]
830 pub fn internal_error<E, Q>(&mut self, error: E, q: Q)
831 where
832 E: std::error::Error + 'static,
833 Q: Counted,
834 {
835 relay_log::error!(error = &error as &dyn std::error::Error, "internal error");
836 debug_assert!(false, "internal error: {error}");
837 self.reject_err((Outcome::Invalid(DiscardReason::Internal), ()), q);
838 }
839}
840
841pub struct Split<I, S>
843where
844 I: Iterator<Item = S>,
845 S: Counted,
846{
847 #[cfg(debug_assertions)]
848 quantities: Quantities,
849 items: I,
850 meta: Arc<Meta>,
851 exhausted: bool,
852}
853
854impl<I, S> Split<I, S>
855where
856 I: Iterator<Item = S>,
857 S: Counted,
858{
859 #[cfg(debug_assertions)]
862 fn subtract(&mut self, q: Quantities) {
863 for (category, quantities) in q {
864 let Some(orig_quantities) = self
865 .quantities
866 .iter_mut()
867 .find_map(|(c, q)| (*c == category).then_some(q))
868 else {
869 debug_assert!(
870 false,
871 "mismatching quantities, item split into category {category}, \
872 which originally was not present"
873 );
874 continue;
875 };
876
877 if *orig_quantities >= quantities {
878 *orig_quantities -= quantities;
879 } else {
880 debug_assert!(
881 false,
882 "in total more items produced in category {category} than originally available"
883 );
884 }
885 }
886 }
887}
888
889impl<I, S> Iterator for Split<I, S>
890where
891 I: Iterator<Item = S>,
892 S: Counted,
893{
894 type Item = Managed<S>;
895
896 fn next(&mut self) -> Option<Self::Item> {
897 let next = match self.items.next() {
898 Some(next) => next,
899 None => {
900 self.exhausted = true;
901 return None;
902 }
903 };
904
905 #[cfg(debug_assertions)]
906 self.subtract(next.quantities());
907
908 Some(Managed::from_parts(next, Arc::clone(&self.meta)))
909 }
910}
911
912impl<I, S> Drop for Split<I, S>
913where
914 I: Iterator<Item = S>,
915 S: Counted,
916{
917 fn drop(&mut self) {
918 #[cfg(debug_assertions)]
920 if self.exhausted {
921 for (category, quantities) in &self.quantities {
922 debug_assert!(
923 *quantities == 0,
924 "items split, but still {quantities} remaining in category {category}"
925 );
926 }
927 }
928
929 if self.exhausted {
930 return;
931 }
932
933 for item in &mut self.items {
939 for (category, quantity) in item.quantities() {
940 self.meta.track_outcome(
941 Outcome::Invalid(DiscardReason::Internal),
942 category,
943 quantity,
944 );
945 }
946 }
947 }
948}
949
950impl<I, S> FusedIterator for Split<I, S>
951where
952 I: Iterator<Item = S> + FusedIterator,
953 S: Counted,
954{
955}
956
957#[cfg(test)]
958mod tests {
959 use super::*;
960
961 struct CountedVec(Vec<u32>);
962
963 impl Counted for CountedVec {
964 fn quantities(&self) -> Quantities {
965 smallvec::smallvec![(DataCategory::Error, self.0.len())]
966 }
967 }
968
969 struct CountedValue(u32);
970
971 impl Counted for CountedValue {
972 fn quantities(&self) -> Quantities {
973 smallvec::smallvec![(DataCategory::Error, 1)]
974 }
975 }
976
977 #[test]
978 fn test_split_fully_consumed() {
979 let value = CountedVec(vec![0, 1, 2, 3, 4, 5]);
980 let (managed, mut handle) = Managed::for_test(value).build();
981
982 let s = managed
983 .split(|value| value.0.into_iter().map(CountedValue))
984 .collect::<Vec<_>>();
986
987 handle.assert_no_outcomes();
988
989 for (i, s) in s.into_iter().enumerate() {
990 assert_eq!(s.as_ref().0, i as u32);
991 let outcome = Outcome::Invalid(DiscardReason::Cors);
992 let _ = s.reject_err((outcome.clone(), ()));
993 handle.assert_outcome(&outcome, DataCategory::Error, 1);
994 }
995 }
996
997 #[test]
998 fn test_split_partially_consumed_emits_remaining() {
999 let value = CountedVec(vec![0, 1, 2, 3, 4, 5]);
1000 let (managed, mut handle) = Managed::for_test(value).build();
1001
1002 let mut s = managed.split(|value| value.0.into_iter().map(CountedValue));
1003 handle.assert_no_outcomes();
1004
1005 drop(s.next());
1006 handle.assert_internal_outcome(DataCategory::Error, 1);
1007 drop(s.next());
1008 handle.assert_internal_outcome(DataCategory::Error, 1);
1009 drop(s.next());
1010 handle.assert_internal_outcome(DataCategory::Error, 1);
1011 handle.assert_no_outcomes();
1012
1013 drop(s);
1014
1015 handle.assert_internal_outcome(DataCategory::Error, 1);
1016 handle.assert_internal_outcome(DataCategory::Error, 1);
1017 handle.assert_internal_outcome(DataCategory::Error, 1);
1018 }
1019
1020 #[test]
1021 fn test_split_changing_quantities_should_panic() {
1022 let value = CountedVec(vec![0, 1, 2, 3, 4, 5]);
1023 let (managed, mut handle) = Managed::for_test(value).build();
1024
1025 let mut s = managed.split(|_| std::iter::once(CountedValue(0)));
1026
1027 s.next().unwrap().accept(|_| {});
1028 handle.assert_no_outcomes();
1029
1030 assert!(s.next().is_none());
1031
1032 let r = std::panic::catch_unwind(move || {
1033 drop(s);
1034 });
1035
1036 assert!(
1037 r.is_err(),
1038 "expected split to panic because of mismatched (not enough) outcomes"
1039 );
1040 }
1041
1042 #[test]
1043 fn test_split_more_outcomes_than_before_should_panic() {
1044 let value = CountedVec(vec![0]);
1045 let (managed, mut handle) = Managed::for_test(value).build();
1046
1047 let mut s = managed.split(|_| vec![CountedValue(0), CountedValue(2)].into_iter());
1048
1049 s.next().unwrap().accept(|_| {});
1050 handle.assert_no_outcomes();
1051
1052 let r = std::panic::catch_unwind(move || {
1053 s.next();
1054 });
1055
1056 assert!(
1057 r.is_err(),
1058 "expected split to panic because of mismatched (too many) outcomes"
1059 );
1060 }
1061
1062 #[test]
1063 fn test_split_changing_categories_should_panic() {
1064 struct Special;
1065 impl Counted for Special {
1066 fn quantities(&self) -> Quantities {
1067 smallvec::smallvec![(DataCategory::Error, 1), (DataCategory::Transaction, 1)]
1068 }
1069 }
1070
1071 let value = CountedVec(vec![0]);
1072 let (managed, _handle) = Managed::for_test(value).build();
1073
1074 let mut s = managed.split(|value| value.0.into_iter().map(|_| Special));
1075
1076 let r = std::panic::catch_unwind(move || {
1077 let _ = s.next();
1078 });
1079
1080 assert!(
1081 r.is_err(),
1082 "expected split to panic because of mismatched outcome categories"
1083 );
1084 }
1085
1086 #[test]
1087 fn test_split_assert_fused() {
1088 fn only_fused<T: FusedIterator>(_: T) {}
1089
1090 let (managed, mut handle) = Managed::for_test(CountedVec(vec![0])).build();
1091 only_fused(managed.split(|value| value.0.into_iter().map(CountedValue)));
1092 handle.assert_internal_outcome(DataCategory::Error, 1);
1093 }
1094}