1#[cfg(debug_assertions)]
2use std::collections::BTreeMap;
3use std::convert::Infallible;
4use std::fmt;
5use std::iter::FusedIterator;
6use std::mem::ManuallyDrop;
7use std::net::IpAddr;
8use std::sync::Arc;
9use std::sync::atomic::{AtomicBool, Ordering};
10
11use chrono::{DateTime, Utc};
12use itertools::Either;
13use relay_event_schema::protocol::EventId;
14use relay_quotas::{DataCategory, Scoping};
15use relay_system::Addr;
16use smallvec::SmallVec;
17
18use crate::Envelope;
19use crate::managed::{Counted, ManagedEnvelope, Quantities};
20use crate::services::outcome::{DiscardReason, Outcome, TrackOutcome};
21use crate::services::processor::ProcessingError;
22
23#[cfg(debug_assertions)]
24mod debug;
25#[cfg(test)]
26mod test;
27
28#[cfg(test)]
29pub use self::test::*;
30
/// An error which can be split into an optional [`Outcome`] and a remaining error value.
pub trait OutcomeError {
    /// The error value which remains after the outcome has been extracted.
    type Error;

    /// Consumes `self` and returns the optional outcome together with the remaining error.
    fn consume(self) -> (Option<Outcome>, Self::Error);
}
41
42impl OutcomeError for Outcome {
43 type Error = ();
44
45 fn consume(self) -> (Option<Outcome>, Self::Error) {
46 (self, ()).consume()
47 }
48}
49
50impl OutcomeError for Option<Outcome> {
51 type Error = ();
52
53 fn consume(self) -> (Option<Outcome>, Self::Error) {
54 (self, ()).consume()
55 }
56}
57
58impl<E> OutcomeError for (Outcome, E) {
59 type Error = E;
60
61 fn consume(self) -> (Option<Outcome>, Self::Error) {
62 (Some(self.0), self.1)
63 }
64}
65
66impl<E> OutcomeError for (Option<Outcome>, E) {
67 type Error = E;
68
69 fn consume(self) -> (Option<Outcome>, Self::Error) {
70 self
71 }
72}
73
74impl OutcomeError for ProcessingError {
75 type Error = Self;
76
77 fn consume(self) -> (Option<Outcome>, Self::Error) {
78 (self.to_outcome(), self)
79 }
80}
81
/// [`Infallible`] has no values, this implementation can never be invoked.
///
/// It allows infallible closures to be passed where an [`OutcomeError`] is required.
impl OutcomeError for Infallible {
    type Error = Self;

    fn consume(self) -> (Option<Outcome>, Self::Error) {
        // `Infallible` is uninhabited, the empty match proves this code is unreachable.
        match self {}
    }
}
89
/// Wrapper around a value `T` which marks it as the result of a rejection.
///
/// The type is `#[must_use]`, a rejection is expected to be propagated by the caller.
#[derive(Debug, Clone, Copy)]
#[must_use = "a rejection must be propagated"]
pub struct Rejected<T>(T);

impl<T> Rejected<T> {
    /// Unwraps the rejection and returns the contained value.
    pub fn into_inner(self) -> T {
        let Self(inner) = self;
        inner
    }

    /// Transforms the contained value with `f`, keeping it wrapped in a [`Rejected`].
    pub fn map<F, S>(self, f: F) -> Rejected<S>
    where
        F: FnOnce(T) -> S,
    {
        let Self(inner) = self;
        Rejected(f(inner))
    }
}
112
113impl<T> std::error::Error for Rejected<T>
114where
115 T: std::error::Error,
116{
117 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
118 self.0.source()
119 }
120}
121
122impl<T> std::fmt::Display for Rejected<T>
123where
124 T: std::fmt::Display,
125{
126 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
127 self.0.fmt(f)
128 }
129}
130
131impl<T> axum::response::IntoResponse for Rejected<T>
132where
133 T: axum::response::IntoResponse,
134{
135 fn into_response(self) -> axum::response::Response {
136 self.0.into_response()
137 }
138}
139
/// A [`Counted`] value bundled with the metadata required to emit outcomes for it.
///
/// When a `Managed` is dropped without having been marked as done, its [`Drop`]
/// implementation emits an `Invalid(Internal)` outcome for all of its quantities.
pub struct Managed<T: Counted> {
    /// The wrapped value.
    value: T,
    /// Metadata used when tracking outcomes, shared between instances derived from each other.
    meta: Arc<Meta>,
    /// Set once the value has been rejected, prevents outcomes from being emitted again.
    done: AtomicBool,
}
146
147impl Managed<Box<Envelope>> {
148 pub fn from_envelope(envelope: Box<Envelope>, outcome_aggregator: Addr<TrackOutcome>) -> Self {
150 let meta = Arc::new(Meta {
151 outcome_aggregator,
152 received_at: envelope.received_at(),
153 scoping: envelope.meta().get_partial_scoping().into_scoping(),
154 event_id: envelope.event_id(),
155 remote_addr: envelope.meta().remote_addr(),
156 });
157
158 Self::from_parts(envelope, meta)
159 }
160}
161
162impl<T: Counted> Managed<T> {
163 pub fn with_meta_from(envelope: &ManagedEnvelope, value: T) -> Self {
168 Self::from_parts(
169 value,
170 Arc::new(Meta {
171 outcome_aggregator: envelope.outcome_aggregator().clone(),
172 received_at: envelope.received_at(),
173 scoping: envelope.scoping(),
174 event_id: envelope.envelope().event_id(),
175 remote_addr: envelope.meta().remote_addr(),
176 }),
177 )
178 }
179
180 pub fn wrap<S>(&self, other: S) -> Managed<S>
182 where
183 S: Counted,
184 {
185 Managed::from_parts(other, Arc::clone(&self.meta))
186 }
187
    /// Returns the receive timestamp stored in the metadata.
    ///
    /// The same timestamp is used as the timestamp of all emitted outcomes.
    pub fn received_at(&self) -> DateTime<Utc> {
        self.meta.received_at
    }
192
    /// Returns the scoping stored in the metadata, which is attached to all emitted outcomes.
    pub fn scoping(&self) -> Scoping {
        self.meta.scoping
    }
197
198 pub fn scope(&mut self, scoping: Scoping) {
208 let meta = Arc::make_mut(&mut self.meta);
209 meta.scoping = scoping;
210 }
211
212 pub fn split_once<F, S, U>(self, f: F) -> (Managed<S>, Managed<U>)
218 where
219 F: FnOnce(T) -> (S, U),
220 S: Counted,
221 U: Counted,
222 {
223 debug_assert!(!self.is_done());
224
225 let (value, meta) = self.destructure();
226 #[cfg(debug_assertions)]
227 let quantities = value.quantities();
228
229 let (a, b) = f(value);
230
231 #[cfg(debug_assertions)]
232 debug::Quantities::from(&quantities)
233 .assert_only_extra(debug::Quantities::from(&a) + debug::Quantities::from(&b));
238
239 (
240 Managed::from_parts(a, Arc::clone(&meta)),
241 Managed::from_parts(b, meta),
242 )
243 }
244
245 pub fn split<F, I, S>(self, f: F) -> Split<I::IntoIter, I::Item>
250 where
251 F: FnOnce(T) -> I,
252 I: IntoIterator<Item = S>,
253 S: Counted,
254 {
255 self.split_with_context(|value| (f(value), ())).0
256 }
257
258 pub fn split_with_context<F, I, S, C>(self, f: F) -> (Split<I::IntoIter, I::Item>, C)
263 where
264 F: FnOnce(T) -> (I, C),
265 I: IntoIterator<Item = S>,
266 S: Counted,
267 {
268 debug_assert!(!self.is_done());
269
270 let (value, meta) = self.destructure();
271 #[cfg(debug_assertions)]
272 let quantities = value.quantities();
273
274 let (items, context) = f(value);
275
276 (
277 Split {
278 #[cfg(debug_assertions)]
279 quantities,
280 items: items.into_iter(),
281 meta,
282 exhausted: false,
283 },
284 context,
285 )
286 }
287
    /// Filters the items of a `Vec` contained in the managed value.
    ///
    /// `select` picks the vector from the value, `retain` is invoked for every item.
    /// Items for which `retain` returns an error are removed and the error is recorded
    /// for the removed item.
    ///
    /// This is [`Self::retain_with_context`] without an additional context.
    pub fn retain<S, I, U, E>(&mut self, select: S, mut retain: U)
    where
        S: FnOnce(&mut T) -> &mut Vec<I>,
        I: Counted,
        U: FnMut(&mut I, &mut RecordKeeper<'_>) -> Result<(), E>,
        E: OutcomeError,
    {
        self.retain_with_context(
            // No context is required, a reference to unit is passed along instead.
            |inner| (select(inner), &()),
            |item, _, records| retain(item, records),
        );
    }
335
336 pub fn retain_with_context<S, C, I, U, E>(&mut self, select: S, mut retain: U)
373 where
374 for<'a> S: FnOnce(&'a mut T) -> (&'a mut Vec<I>, &'a C),
379 I: Counted,
380 U: FnMut(&mut I, &C, &mut RecordKeeper<'_>) -> Result<(), E>,
381 E: OutcomeError,
382 {
383 self.modify(|inner, records| {
384 let (items, ctx) = select(inner);
385 items.retain_mut(|item| match retain(item, ctx, records) {
386 Ok(()) => true,
387 Err(err) => {
388 records.reject_err(err, &*item);
389 false
390 }
391 })
392 });
393 }
394
395 pub fn map<S, F>(self, f: F) -> Managed<S>
399 where
400 F: FnOnce(T, &mut RecordKeeper) -> S,
401 S: Counted,
402 {
403 self.try_map(move |inner, records| Ok::<_, Infallible>(f(inner, records)))
404 .unwrap_or_else(|e| match e.0 {})
405 }
406
407 pub fn try_map<S, F, E>(self, f: F) -> Result<Managed<S>, Rejected<E::Error>>
415 where
416 F: FnOnce(T, &mut RecordKeeper) -> Result<S, E>,
417 S: Counted,
418 E: OutcomeError,
419 {
420 debug_assert!(!self.is_done());
421
422 let (value, meta) = self.destructure();
423 let quantities = value.quantities();
424
425 let mut records = RecordKeeper::new(&meta, quantities);
426
427 match f(value, &mut records) {
428 Ok(value) => {
429 records.success(value.quantities());
430 Ok(Managed::from_parts(value, meta))
431 }
432 Err(err) => Err(records.failure(err)),
433 }
434 }
435
436 pub fn modify<F>(&mut self, f: F)
440 where
441 F: FnOnce(&mut T, &mut RecordKeeper),
442 {
443 self.try_modify(move |inner, records| {
444 f(inner, records);
445 Ok::<_, Infallible>(())
446 })
447 .unwrap_or_else(|e| match e {})
448 }
449
450 pub fn try_modify<F, E>(&mut self, f: F) -> Result<(), Rejected<E::Error>>
458 where
459 F: FnOnce(&mut T, &mut RecordKeeper) -> Result<(), E>,
460 E: OutcomeError,
461 {
462 debug_assert!(!self.is_done());
463
464 let quantities = self.value.quantities();
465 let mut records = RecordKeeper::new(&self.meta, quantities);
466
467 match f(&mut self.value, &mut records) {
468 Ok(()) => {
469 records.success(self.value.quantities());
470 Ok(())
471 }
472 Err(err) => {
473 let err = records.failure(err);
474 self.done.store(true, Ordering::Relaxed);
475 Err(err)
476 }
477 }
478 }
479
480 pub fn accept<F, S>(self, f: F) -> S
488 where
489 F: FnOnce(T) -> S,
490 {
491 self.try_accept(|item| Ok::<_, Infallible>(f(item)))
492 .unwrap_or_else(|err| match err.0 {})
493 }
494
495 pub fn try_accept<F, S, E>(self, f: F) -> Result<S, Rejected<E::Error>>
503 where
504 F: FnOnce(T) -> Result<S, E>,
505 E: OutcomeError,
506 {
507 debug_assert!(!self.is_done());
508
509 let (value, meta) = self.destructure();
510 let records = RecordKeeper::new(&meta, value.quantities());
511
512 match f(value) {
513 Ok(value) => {
514 records.accept();
515 Ok(value)
516 }
517 Err(err) => Err(records.failure(err)),
518 }
519 }
520
521 #[track_caller]
529 pub fn internal_error(&self, reason: &'static str) -> Rejected<()> {
530 relay_log::error!("internal error: {reason}");
531 debug_assert!(false, "internal error: {reason}");
532 self.reject_err((Outcome::Invalid(DiscardReason::Internal), ()))
533 }
534
535 pub fn reject_err<E>(&self, error: E) -> Rejected<E::Error>
537 where
538 E: OutcomeError,
539 {
540 debug_assert!(!self.is_done());
541
542 let (outcome, error) = error.consume();
543 self.do_reject(outcome);
544 Rejected(error)
545 }
546
547 fn do_reject(&self, outcome: Option<Outcome>) {
548 let is_done = self.done.fetch_or(true, Ordering::Relaxed);
551
552 let Some(outcome) = outcome else {
554 return;
555 };
556
557 if !is_done {
563 for (category, quantity) in self.value.quantities() {
564 self.meta.track_outcome(outcome.clone(), category, quantity);
565 }
566 }
567 }
568
569 fn destructure(self) -> (T, Arc<Meta>) {
576 let this = ManuallyDrop::new(self);
586 let Managed { value, meta, done } = &*this;
587
588 let value = unsafe { std::ptr::read(value) };
589 let meta = unsafe { std::ptr::read(meta) };
590 let done = unsafe { std::ptr::read(done) };
591 debug_assert!(
594 !done.load(Ordering::Relaxed),
595 "a `done` managed should never be destructured"
596 );
597
598 (value, meta)
599 }
600
601 fn from_parts(value: T, meta: Arc<Meta>) -> Self {
602 Self {
603 value,
604 meta,
605 done: AtomicBool::new(false),
606 }
607 }
608
    /// Returns `true` if the done flag is set.
    ///
    /// The flag is set when the value is rejected or when a fallible modification fails.
    fn is_done(&self) -> bool {
        self.done.load(Ordering::Relaxed)
    }
612}
613
614impl<T: Counted> Drop for Managed<T> {
615 fn drop(&mut self) {
616 self.do_reject(Some(Outcome::Invalid(DiscardReason::Internal)));
617 }
618}
619
620impl<T: Counted + fmt::Debug> fmt::Debug for Managed<T> {
621 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
622 write!(f, "Managed<{}>[", std::any::type_name::<T>())?;
623 for (i, (category, quantity)) in self.value.quantities().iter().enumerate() {
624 if i > 0 {
625 write!(f, ",")?;
626 }
627 write!(f, "{category}:{quantity}")?;
628 }
629 write!(f, "](")?;
630 self.value.fmt(f)?;
631 write!(f, ")")
632 }
633}
634
635impl<T: Counted> Managed<Option<T>> {
636 pub fn transpose(self) -> Option<Managed<T>> {
638 let (o, meta) = self.destructure();
639 o.map(|t| Managed::from_parts(t, meta))
640 }
641}
642
643impl<L: Counted, R: Counted> Managed<Either<L, R>> {
644 pub fn transpose(self) -> Either<Managed<L>, Managed<R>> {
646 let (either, meta) = self.destructure();
647 match either {
648 Either::Left(value) => Either::Left(Managed::from_parts(value, meta)),
649 Either::Right(value) => Either::Right(Managed::from_parts(value, meta)),
650 }
651 }
652}
653
654impl From<Managed<Box<Envelope>>> for ManagedEnvelope {
655 fn from(value: Managed<Box<Envelope>>) -> Self {
656 let (value, meta) = value.destructure();
657 let mut envelope = ManagedEnvelope::new(value, meta.outcome_aggregator.clone());
658 envelope.scope(meta.scoping);
659 envelope
660 }
661}
662
/// Gives shared access to the wrapped value.
impl<T: Counted> AsRef<T> for Managed<T> {
    fn as_ref(&self) -> &T {
        &self.value
    }
}
668
/// Dereferences to the wrapped value.
///
/// Only shared access is provided by this implementation; in this file mutation goes
/// through methods which receive a [`RecordKeeper`].
impl<T: Counted> std::ops::Deref for Managed<T> {
    type Target = T;

    fn deref(&self) -> &Self::Target {
        &self.value
    }
}
676
/// Metadata attached to a [`Managed`] which is required to emit outcomes.
#[derive(Debug, Clone)]
struct Meta {
    /// Address of the service all outcomes are sent to.
    outcome_aggregator: Addr<TrackOutcome>,
    /// Receive timestamp, used as the timestamp of emitted outcomes.
    received_at: DateTime<Utc>,
    /// Scoping attached to emitted outcomes.
    scoping: Scoping,
    /// Optional event id attached to emitted outcomes.
    event_id: Option<EventId>,
    /// Optional remote address attached to emitted outcomes.
    remote_addr: Option<IpAddr>,
}
693
694impl Meta {
695 pub fn track_outcome(&self, outcome: Outcome, category: DataCategory, quantity: usize) {
696 self.outcome_aggregator.send(TrackOutcome {
697 timestamp: self.received_at,
698 scoping: self.scoping,
699 outcome,
700 event_id: self.event_id,
701 remote_addr: self.remote_addr,
702 category,
703 quantity: quantity.try_into().unwrap_or(u32::MAX),
704 });
705 }
706}
707
/// Keeps track of outcomes which are recorded while a [`Managed`] value is modified.
///
/// If the record keeper is dropped without being finalized, an `Invalid(Internal)`
/// outcome is emitted for all quantities still stored in `on_drop`.
pub struct RecordKeeper<'a> {
    /// Metadata used to emit outcomes.
    meta: &'a Meta,
    /// Quantities for which an outcome is emitted when the record keeper is dropped.
    on_drop: Quantities,
    /// Categories for which quantity mismatches are only logged (debug builds only).
    #[cfg(debug_assertions)]
    lenient: SmallVec<[DataCategory; 1]>,
    /// Expected changes to the original quantities, per category (debug builds only).
    #[cfg(debug_assertions)]
    modifications: BTreeMap<DataCategory, isize>,
    /// Recorded outcomes which are only emitted when the operation succeeds.
    in_flight: SmallVec<[(DataCategory, usize, Option<Outcome>); 2]>,
}
719
720impl<'a> RecordKeeper<'a> {
721 fn new(meta: &'a Meta, quantities: Quantities) -> Self {
722 Self {
723 meta,
724 on_drop: quantities,
725 #[cfg(debug_assertions)]
726 lenient: Default::default(),
727 #[cfg(debug_assertions)]
728 modifications: Default::default(),
729 in_flight: Default::default(),
730 }
731 }
732
733 pub fn lenient(&mut self, category: DataCategory) {
740 let _category = category;
741 #[cfg(debug_assertions)]
742 self.lenient.push(_category);
743 }
744
745 pub fn modify_by(&mut self, category: DataCategory, offset: isize) {
753 let _category = category;
754 let _offset = offset;
755 #[cfg(debug_assertions)]
756 {
757 *self.modifications.entry(_category).or_default() += offset;
758 }
759 }
760
761 fn failure<E>(mut self, error: E) -> Rejected<E::Error>
765 where
766 E: OutcomeError,
767 {
768 let (outcome, error) = error.consume();
769
770 if let Some(outcome) = outcome {
771 for (category, quantity) in std::mem::take(&mut self.on_drop) {
772 self.meta.track_outcome(outcome.clone(), category, quantity);
773 }
774 }
775
776 Rejected(error)
777 }
778
779 fn accept(mut self) {
787 debug_assert!(
788 self.in_flight.is_empty(),
789 "records accepted, but intermediate outcomes tracked"
790 );
791 self.on_drop.clear();
792 }
793
794 fn success(mut self, new: Quantities) {
800 let original = std::mem::take(&mut self.on_drop);
801 self.assert_quantities(original, new);
802
803 self.on_drop.clear();
804 for (category, quantity, outcome) in std::mem::take(&mut self.in_flight) {
805 if let Some(outcome) = outcome {
806 self.meta.track_outcome(outcome, category, quantity);
807 }
808 }
809 }
810
    /// Verifies that the quantities of a value are consistent after a modification.
    ///
    /// Starting from the `original` quantities, the registered modifications are applied,
    /// then the quantities of all recorded outcomes and the `new` quantities are
    /// subtracted. Every category is expected to end up at exactly zero.
    ///
    /// Violations in categories marked as lenient are logged at debug level, all other
    /// violations log the current state and panic.
    #[cfg(debug_assertions)]
    fn assert_quantities(&self, original: Quantities, new: Quantities) {
        // Reports a violation for `$category`, the remaining arguments are a format string
        // with its arguments.
        macro_rules! emit {
            ($category:expr, $($tt:tt)*) => {{
                match self.lenient.contains(&$category) {
                    // Lenient categories only log the violation.
                    true => relay_log::debug!($($tt)*),
                    // All other categories log the state for debugging and panic.
                    false => {
                        relay_log::error!("Original: {original:?}");
                        relay_log::error!("New: {new:?}");
                        relay_log::error!("Modifications: {:?}", self.modifications);
                        relay_log::error!("In Flight: {:?}", self.in_flight);
                        panic!($($tt)*)
                    }
                }
            }};
        }

        // Step 1: apply the registered modifications to the original quantities.
        let mut sums = debug::Quantities::from(&original).0;
        for (category, offset) in &self.modifications {
            let v = sums.entry(*category).or_default();
            match v.checked_add_signed(*offset) {
                Some(result) => *v = result,
                None => emit!(
                    category,
                    "Attempted to modify original quantity {v} into the negative ({offset})"
                ),
            }
        }

        // Step 2: subtract the quantities of all recorded outcomes.
        for (category, quantity, outcome) in &self.in_flight {
            match sums.get_mut(category) {
                Some(c) if *c >= *quantity => *c -= *quantity,
                Some(c) => emit!(
                    category,
                    "Emitted {quantity} outcomes ({outcome:?}) for {category}, but there were only {c} items in the category originally"
                ),
                None => emit!(
                    category,
                    "Emitted {quantity} outcomes ({outcome:?}) for {category}, but there never was an item in this category"
                ),
            }
        }

        // Step 3: subtract the quantities of the new value.
        for (category, quantity) in &new {
            match sums.get_mut(category) {
                Some(c) if *c >= *quantity => *c -= *quantity,
                Some(c) => emit!(
                    category,
                    "New item has {quantity} items in category '{category}', but original (after emitted outcomes) only has {c} left"
                ),
                None => emit!(
                    category,
                    "New item has {quantity} items in category '{category}', but after emitted outcomes there are none left"
                ),
            }
        }

        // Step 4: anything left over has neither an outcome nor is it part of the new value.
        for (category, quantity) in sums {
            if quantity > 0 {
                emit!(
                    category,
                    "Missing outcomes or mismatched quantity in category '{category}', off by {quantity}"
                );
            }
        }
    }

    /// Quantities are not verified without debug assertions.
    #[cfg(not(debug_assertions))]
    fn assert_quantities(&self, _: Quantities, _: Quantities) {}
886}
887
888impl<'a> Drop for RecordKeeper<'a> {
889 fn drop(&mut self) {
890 for (category, quantity) in std::mem::take(&mut self.on_drop) {
891 self.meta.track_outcome(
892 Outcome::Invalid(DiscardReason::Internal),
893 category,
894 quantity,
895 );
896 }
897 }
898}
899
900impl RecordKeeper<'_> {
901 pub fn or_default<T, E, Q>(&mut self, r: Result<T, E>, q: Q) -> T
906 where
907 T: Default,
908 E: OutcomeError,
909 Q: Counted,
910 {
911 match r {
912 Ok(result) => result,
913 Err(err) => {
914 self.reject_err(err, q);
915 T::default()
916 }
917 }
918 }
919
920 pub fn reject_err<E, Q>(&mut self, err: E, q: Q) -> E::Error
925 where
926 E: OutcomeError,
927 Q: Counted,
928 {
929 let (outcome, err) = err.consume();
930 for (category, quantity) in q.quantities() {
931 self.in_flight.push((category, quantity, outcome.clone()))
932 }
933 err
934 }
935
936 #[track_caller]
940 pub fn internal_error<E, Q>(&mut self, error: E, q: Q)
941 where
942 E: std::error::Error + 'static,
943 Q: Counted,
944 {
945 relay_log::error!(error = &error as &dyn std::error::Error, "internal error");
946 debug_assert!(false, "internal error: {error}");
947 self.reject_err((Outcome::Invalid(DiscardReason::Internal), ()), q);
948 }
949}
950
/// Iterator over the managed items a [`Managed`] value was split into.
///
/// Items which have not been yielded when the iterator is dropped are rejected with an
/// `Invalid(Internal)` outcome.
pub struct Split<I, S>
where
    I: Iterator<Item = S>,
    S: Counted,
{
    /// Quantities of the original value not yet accounted for by yielded items
    /// (debug builds only).
    #[cfg(debug_assertions)]
    quantities: Quantities,
    /// The underlying iterator of unmanaged items.
    items: I,
    /// Metadata shared with every yielded item.
    meta: Arc<Meta>,
    /// Set once `items` returned `None`.
    exhausted: bool,
}
963
964impl<I, S> Split<I, S>
965where
966 I: Iterator<Item = S>,
967 S: Counted,
968{
969 #[cfg(debug_assertions)]
972 fn subtract(&mut self, q: Quantities) {
973 for (category, quantities) in q {
974 let Some(orig_quantities) = self
975 .quantities
976 .iter_mut()
977 .find_map(|(c, q)| (*c == category).then_some(q))
978 else {
979 debug_assert!(
980 false,
981 "mismatching quantities, item split into category {category}, \
982 which originally was not present"
983 );
984 continue;
985 };
986
987 if *orig_quantities >= quantities {
988 *orig_quantities -= quantities;
989 } else {
990 debug_assert!(
991 false,
992 "in total more items produced in category {category} than originally available"
993 );
994 }
995 }
996 }
997}
998
999impl<I, S> Iterator for Split<I, S>
1000where
1001 I: Iterator<Item = S>,
1002 S: Counted,
1003{
1004 type Item = Managed<S>;
1005
1006 fn next(&mut self) -> Option<Self::Item> {
1007 let next = match self.items.next() {
1008 Some(next) => next,
1009 None => {
1010 self.exhausted = true;
1011 return None;
1012 }
1013 };
1014
1015 #[cfg(debug_assertions)]
1016 self.subtract(next.quantities());
1017
1018 Some(Managed::from_parts(next, Arc::clone(&self.meta)))
1019 }
1020}
1021
1022impl<I, S> Drop for Split<I, S>
1023where
1024 I: Iterator<Item = S>,
1025 S: Counted,
1026{
1027 fn drop(&mut self) {
1028 #[cfg(debug_assertions)]
1030 if self.exhausted {
1031 for (category, quantities) in &self.quantities {
1032 debug_assert!(
1033 *quantities == 0,
1034 "items split, but still {quantities} remaining in category {category}"
1035 );
1036 }
1037 }
1038
1039 if self.exhausted {
1040 return;
1041 }
1042
1043 for item in &mut self.items {
1049 for (category, quantity) in item.quantities() {
1050 self.meta.track_outcome(
1051 Outcome::Invalid(DiscardReason::Internal),
1052 category,
1053 quantity,
1054 );
1055 }
1056 }
1057 }
1058}
1059
/// [`Split`] is fused whenever the underlying iterator is fused.
///
/// `next` forwards every call to the underlying iterator, it keeps returning `None`
/// once that iterator is exhausted and fused.
impl<I, S> FusedIterator for Split<I, S>
where
    I: Iterator<Item = S> + FusedIterator,
    S: Counted,
{
}
1066
#[cfg(test)]
mod tests {
    use super::*;

    /// Test value which counts one `Error` item per contained element.
    struct CountedVec(Vec<u32>);

    impl Counted for CountedVec {
        fn quantities(&self) -> Quantities {
            smallvec::smallvec![(DataCategory::Error, self.0.len())]
        }
    }

    /// Test value which always counts as a single `Error` item.
    struct CountedValue(u32);

    impl Counted for CountedValue {
        fn quantities(&self) -> Quantities {
            smallvec::smallvec![(DataCategory::Error, 1)]
        }
    }

    /// Rejecting without an outcome emits nothing, neither immediately nor on drop.
    #[test]
    fn test_reject_err_no_outcome() {
        let value = CountedVec(vec![0, 1, 2, 3, 4, 5]);
        let (managed, mut handle) = Managed::for_test(value).build();

        // Rejecting with `None` marks the value as done, without emitting an outcome.
        let _ = managed.reject_err((None, ()));
        handle.assert_no_outcomes();

        // The value is already done, dropping it must not emit an outcome either.
        drop(managed);
        handle.assert_no_outcomes();
    }

    /// A fully consumed split emits no outcomes, each item can be rejected individually.
    #[test]
    fn test_split_fully_consumed() {
        let value = CountedVec(vec![0, 1, 2, 3, 4, 5]);
        let (managed, mut handle) = Managed::for_test(value).build();

        let s = managed
            .split(|value| value.0.into_iter().map(CountedValue))
            .collect::<Vec<_>>();

        // Splitting alone does not emit outcomes.
        handle.assert_no_outcomes();

        for (i, s) in s.into_iter().enumerate() {
            assert_eq!(s.as_ref().0, i as u32);
            let outcome = Outcome::Invalid(DiscardReason::Cors);
            let _ = s.reject_err((outcome.clone(), ()));
            handle.assert_outcome(&outcome, DataCategory::Error, 1);
        }
    }

    /// Dropping a partially consumed split rejects the remaining items.
    #[test]
    fn test_split_partially_consumed_emits_remaining() {
        let value = CountedVec(vec![0, 1, 2, 3, 4, 5]);
        let (managed, mut handle) = Managed::for_test(value).build();

        let mut s = managed.split(|value| value.0.into_iter().map(CountedValue));
        handle.assert_no_outcomes();

        // Every yielded item is dropped immediately, which emits an internal outcome.
        drop(s.next());
        handle.assert_internal_outcome(DataCategory::Error, 1);
        drop(s.next());
        handle.assert_internal_outcome(DataCategory::Error, 1);
        drop(s.next());
        handle.assert_internal_outcome(DataCategory::Error, 1);
        handle.assert_no_outcomes();

        // Three items have not been yielded, dropping the split rejects them.
        drop(s);

        handle.assert_internal_outcome(DataCategory::Error, 1);
        handle.assert_internal_outcome(DataCategory::Error, 1);
        handle.assert_internal_outcome(DataCategory::Error, 1);
    }

    /// Producing fewer quantities than the original value had is detected on drop.
    #[test]
    fn test_split_changing_quantities_should_panic() {
        let value = CountedVec(vec![0, 1, 2, 3, 4, 5]);
        let (managed, mut handle) = Managed::for_test(value).build();

        // The original value has 6 items, the split only produces a single one.
        let mut s = managed.split(|_| std::iter::once(CountedValue(0)));

        s.next().unwrap().accept(|_| {});
        handle.assert_no_outcomes();

        // Exhaust the iterator, the remaining quantities are only verified in that case.
        assert!(s.next().is_none());

        let r = std::panic::catch_unwind(move || {
            drop(s);
        });

        assert!(
            r.is_err(),
            "expected split to panic because of mismatched (not enough) outcomes"
        );
    }

    /// Producing more quantities than the original value had is detected when yielding.
    #[test]
    fn test_split_more_outcomes_than_before_should_panic() {
        let value = CountedVec(vec![0]);
        let (managed, mut handle) = Managed::for_test(value).build();

        // The original value has 1 item, the split produces two.
        let mut s = managed.split(|_| vec![CountedValue(0), CountedValue(2)].into_iter());

        s.next().unwrap().accept(|_| {});
        handle.assert_no_outcomes();

        let r = std::panic::catch_unwind(move || {
            s.next();
        });

        assert!(
            r.is_err(),
            "expected split to panic because of mismatched (too many) outcomes"
        );
    }

    /// Producing items in a category the original value did not have is detected.
    #[test]
    fn test_split_changing_categories_should_panic() {
        /// Counts in the `Transaction` category, which `CountedVec` never reports.
        struct Special;
        impl Counted for Special {
            fn quantities(&self) -> Quantities {
                smallvec::smallvec![(DataCategory::Error, 1), (DataCategory::Transaction, 1)]
            }
        }

        let value = CountedVec(vec![0]);
        let (managed, _handle) = Managed::for_test(value).build();

        let mut s = managed.split(|value| value.0.into_iter().map(|_| Special));

        let r = std::panic::catch_unwind(move || {
            let _ = s.next();
        });

        assert!(
            r.is_err(),
            "expected split to panic because of mismatched outcome categories"
        );
    }

    /// Compile time check that a split over a fused iterator is fused itself.
    #[test]
    fn test_split_assert_fused() {
        fn only_fused<T: FusedIterator>(_: T) {}

        let (managed, mut handle) = Managed::for_test(CountedVec(vec![0])).build();
        only_fused(managed.split(|value| value.0.into_iter().map(CountedValue)));
        // The split was dropped without being consumed, the single item is rejected.
        handle.assert_internal_outcome(DataCategory::Error, 1);
    }
}