1#[cfg(debug_assertions)]
2use std::collections::BTreeMap;
3use std::convert::Infallible;
4use std::fmt;
5use std::iter::FusedIterator;
6use std::mem::ManuallyDrop;
7use std::net::IpAddr;
8use std::sync::Arc;
9use std::sync::atomic::{AtomicBool, Ordering};
10
11use chrono::{DateTime, Utc};
12use itertools::Either;
13use relay_event_schema::protocol::EventId;
14use relay_quotas::{DataCategory, Scoping};
15use relay_system::Addr;
16use smallvec::SmallVec;
17
18use crate::Envelope;
19use crate::managed::{Counted, ManagedEnvelope, Quantities};
20use crate::services::outcome::{DiscardReason, Outcome, TrackOutcome};
21use crate::services::processor::ProcessingError;
22
23#[cfg(debug_assertions)]
24mod debug;
25#[cfg(test)]
26mod test;
27
28#[cfg(test)]
29pub use self::test::*;
30
31pub trait OutcomeError {
33 type Error;
35
36 fn consume(self) -> (Option<Outcome>, Self::Error);
40}
41
42impl OutcomeError for Outcome {
43 type Error = ();
44
45 fn consume(self) -> (Option<Outcome>, Self::Error) {
46 (self, ()).consume()
47 }
48}
49
50impl OutcomeError for Option<Outcome> {
51 type Error = ();
52
53 fn consume(self) -> (Option<Outcome>, Self::Error) {
54 (self, ()).consume()
55 }
56}
57
58impl<E> OutcomeError for (Outcome, E) {
59 type Error = E;
60
61 fn consume(self) -> (Option<Outcome>, Self::Error) {
62 (Some(self.0), self.1)
63 }
64}
65
66impl<E> OutcomeError for (Option<Outcome>, E) {
67 type Error = E;
68
69 fn consume(self) -> (Option<Outcome>, Self::Error) {
70 self
71 }
72}
73
74impl OutcomeError for ProcessingError {
75 type Error = Self;
76
77 fn consume(self) -> (Option<Outcome>, Self::Error) {
78 (self.to_outcome(), self)
79 }
80}
81
82impl OutcomeError for Infallible {
83 type Error = Self;
84
85 fn consume(self) -> (Option<Outcome>, Self::Error) {
86 match self {}
87 }
88}
89
/// Wrapper around an error value which marks it as the result of a rejection.
///
/// The type is `#[must_use]`, a rejection is expected to be propagated by the caller.
#[derive(Debug, Clone, Copy)]
#[must_use = "a rejection must be propagated"]
pub struct Rejected<T>(T);

impl<T> Rejected<T> {
    /// Unwraps the rejection and returns the contained error value.
    pub fn into_inner(self) -> T {
        let Self(inner) = self;
        inner
    }

    /// Transforms the contained error value with `f`, keeping the rejection marker.
    pub fn map<F, S>(self, f: F) -> Rejected<S>
    where
        F: FnOnce(T) -> S,
    {
        let Self(inner) = self;
        Rejected(f(inner))
    }
}
112
113impl<T> std::error::Error for Rejected<T>
114where
115 T: std::error::Error,
116{
117 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
118 self.0.source()
119 }
120}
121
122impl<T> std::fmt::Display for Rejected<T>
123where
124 T: std::fmt::Display,
125{
126 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
127 self.0.fmt(f)
128 }
129}
130
131impl<T> axum::response::IntoResponse for Rejected<T>
132where
133 T: axum::response::IntoResponse,
134{
135 fn into_response(self) -> axum::response::Response {
136 self.0.into_response()
137 }
138}
139
140pub struct Managed<T: Counted> {
142 value: T,
143 meta: Arc<Meta>,
144 done: AtomicBool,
145}
146
147impl Managed<Box<Envelope>> {
148 pub fn from_envelope(envelope: Box<Envelope>, outcome_aggregator: Addr<TrackOutcome>) -> Self {
150 let meta = Arc::new(Meta {
151 outcome_aggregator,
152 received_at: envelope.received_at(),
153 scoping: envelope.meta().get_partial_scoping().into_scoping(),
154 event_id: envelope.event_id(),
155 remote_addr: envelope.meta().remote_addr(),
156 });
157
158 Self::from_parts(envelope, meta)
159 }
160}
161
/// Abstraction over collections which support in-place retention with mutable access.
pub trait RetainMut<I> {
    /// Keeps only the elements for which `f` returns `true`.
    ///
    /// The predicate receives a mutable reference and may modify retained elements.
    fn retain_mut(&mut self, f: impl FnMut(&mut I) -> bool);
}

/// Delegates to the inherent [`Vec::retain_mut`].
impl<I> RetainMut<I> for Vec<I> {
    fn retain_mut(&mut self, f: impl FnMut(&mut I) -> bool) {
        Vec::<I>::retain_mut(self, f)
    }
}
173impl<I, const N: usize> RetainMut<I> for SmallVec<[I; N]> {
174 fn retain_mut(&mut self, f: impl FnMut(&mut I) -> bool) {
175 SmallVec::retain_mut(self, f)
176 }
177}
178
179impl<T: Counted> Managed<T> {
180 pub fn with_meta_from(envelope: &ManagedEnvelope, value: T) -> Self {
185 Self::from_parts(
186 value,
187 Arc::new(Meta {
188 outcome_aggregator: envelope.outcome_aggregator().clone(),
189 received_at: envelope.received_at(),
190 scoping: envelope.scoping(),
191 event_id: envelope.envelope().event_id(),
192 remote_addr: envelope.meta().remote_addr(),
193 }),
194 )
195 }
196
197 pub fn wrap<S>(&self, other: S) -> Managed<S>
199 where
200 S: Counted,
201 {
202 Managed::from_parts(other, Arc::clone(&self.meta))
203 }
204
205 pub fn boxed(self) -> Managed<Box<T>> {
207 self.map(|value, _| Box::new(value))
208 }
209
210 pub fn received_at(&self) -> DateTime<Utc> {
212 self.meta.received_at
213 }
214
215 pub fn scoping(&self) -> Scoping {
217 self.meta.scoping
218 }
219
220 pub fn scope(&mut self, scoping: Scoping) {
230 let meta = Arc::make_mut(&mut self.meta);
231 meta.scoping = scoping;
232 }
233
234 pub fn split_once<F, S, U>(self, f: F) -> (Managed<S>, Managed<U>)
240 where
241 F: FnOnce(T, &mut RecordKeeper) -> (S, U),
242 S: Counted,
243 U: Counted,
244 {
245 debug_assert!(!self.is_done());
246
247 let (value, meta) = self.destructure();
248 let quantities = value.quantities();
249
250 let mut records = RecordKeeper::new(&meta, quantities);
251
252 let (a, b) = f(value, &mut records);
253
254 let mut quantities = a.quantities();
255 quantities.extend(b.quantities());
256 records.success(quantities);
257
258 (
259 Managed::from_parts(a, Arc::clone(&meta)),
260 Managed::from_parts(b, meta),
261 )
262 }
263
264 pub fn split<F, I, S>(self, f: F) -> Split<I::IntoIter, I::Item>
269 where
270 F: FnOnce(T) -> I,
271 I: IntoIterator<Item = S>,
272 S: Counted,
273 {
274 self.split_with_context(|value| (f(value), ())).0
275 }
276
277 pub fn split_with_context<F, I, S, C>(self, f: F) -> (Split<I::IntoIter, I::Item>, C)
282 where
283 F: FnOnce(T) -> (I, C),
284 I: IntoIterator<Item = S>,
285 S: Counted,
286 {
287 debug_assert!(!self.is_done());
288
289 let (value, meta) = self.destructure();
290 #[cfg(debug_assertions)]
291 let quantities = value.quantities();
292
293 let (items, context) = f(value);
294
295 (
296 Split {
297 #[cfg(debug_assertions)]
298 quantities,
299 items: items.into_iter(),
300 meta,
301 exhausted: false,
302 },
303 context,
304 )
305 }
306
307 pub fn retain<S, I, U, E, V>(&mut self, select: S, mut retain: U)
343 where
344 S: FnOnce(&mut T) -> &mut V,
345 I: Counted,
346 U: FnMut(&mut I, &mut RecordKeeper<'_>) -> Result<(), E>,
347 E: OutcomeError,
348 V: RetainMut<I>,
349 {
350 self.retain_with_context(
351 |inner| (select(inner), &()),
352 |item, _, records| retain(item, records),
353 );
354 }
355
356 pub fn retain_with_context<S, C, I, U, E, V>(&mut self, select: S, mut retain: U)
393 where
394 for<'a> S: FnOnce(&'a mut T) -> (&'a mut V, &'a C),
399 I: Counted,
400 U: FnMut(&mut I, &C, &mut RecordKeeper<'_>) -> Result<(), E>,
401 E: OutcomeError,
402 V: RetainMut<I>,
403 {
404 self.modify(|inner, records| {
405 let (items, ctx) = select(inner);
406 items.retain_mut(|item| match retain(item, ctx, records) {
407 Ok(()) => true,
408 Err(err) => {
409 records.reject_err(err, &*item);
410 false
411 }
412 })
413 });
414 }
415
416 pub fn map<S, F>(self, f: F) -> Managed<S>
420 where
421 F: FnOnce(T, &mut RecordKeeper) -> S,
422 S: Counted,
423 {
424 self.try_map(move |inner, records| Ok::<_, Infallible>(f(inner, records)))
425 .unwrap_or_else(|e| match e.0 {})
426 }
427
428 pub fn try_map<S, F, E>(self, f: F) -> Result<Managed<S>, Rejected<E::Error>>
436 where
437 F: FnOnce(T, &mut RecordKeeper) -> Result<S, E>,
438 S: Counted,
439 E: OutcomeError,
440 {
441 debug_assert!(!self.is_done());
442
443 let (value, meta) = self.destructure();
444 let quantities = value.quantities();
445
446 let mut records = RecordKeeper::new(&meta, quantities);
447
448 match f(value, &mut records) {
449 Ok(value) => {
450 records.success(value.quantities());
451 Ok(Managed::from_parts(value, meta))
452 }
453 Err(err) => Err(records.failure(err)),
454 }
455 }
456
457 pub fn modify<F>(&mut self, f: F)
461 where
462 F: FnOnce(&mut T, &mut RecordKeeper),
463 {
464 self.try_modify(move |inner, records| {
465 f(inner, records);
466 Ok::<_, Infallible>(())
467 })
468 .unwrap_or_else(|e| match e {})
469 }
470
471 pub fn try_modify<F, E>(&mut self, f: F) -> Result<(), Rejected<E::Error>>
479 where
480 F: FnOnce(&mut T, &mut RecordKeeper) -> Result<(), E>,
481 E: OutcomeError,
482 {
483 debug_assert!(!self.is_done());
484
485 let quantities = self.value.quantities();
486 let mut records = RecordKeeper::new(&self.meta, quantities);
487
488 match f(&mut self.value, &mut records) {
489 Ok(()) => {
490 records.success(self.value.quantities());
491 Ok(())
492 }
493 Err(err) => {
494 let err = records.failure(err);
495 self.done.store(true, Ordering::Relaxed);
496 Err(err)
497 }
498 }
499 }
500
501 pub fn accept<F, S>(self, f: F) -> S
509 where
510 F: FnOnce(T) -> S,
511 {
512 self.try_accept(|item| Ok::<_, Infallible>(f(item)))
513 .unwrap_or_else(|err| match err.0 {})
514 }
515
516 pub fn try_accept<F, S, E>(self, f: F) -> Result<S, Rejected<E::Error>>
524 where
525 F: FnOnce(T) -> Result<S, E>,
526 E: OutcomeError,
527 {
528 debug_assert!(!self.is_done());
529
530 let (value, meta) = self.destructure();
531 let records = RecordKeeper::new(&meta, value.quantities());
532
533 match f(value) {
534 Ok(value) => {
535 records.accept();
536 Ok(value)
537 }
538 Err(err) => Err(records.failure(err)),
539 }
540 }
541
542 #[track_caller]
550 pub fn internal_error(&self, reason: &'static str) -> Rejected<()> {
551 relay_log::error!("internal error: {reason}");
552 debug_assert!(false, "internal error: {reason}");
553 self.reject_err((Outcome::Invalid(DiscardReason::Internal), ()))
554 }
555
556 pub fn reject_err<E>(&self, error: E) -> Rejected<E::Error>
558 where
559 E: OutcomeError,
560 {
561 debug_assert!(!self.is_done());
562
563 let (outcome, error) = error.consume();
564 self.do_reject(outcome);
565 Rejected(error)
566 }
567
568 fn do_reject(&self, outcome: Option<Outcome>) {
569 let is_done = self.done.fetch_or(true, Ordering::Relaxed);
572
573 let Some(outcome) = outcome else {
575 return;
576 };
577
578 if !is_done {
584 for (category, quantity) in self.value.quantities() {
585 self.meta.track_outcome(outcome.clone(), category, quantity);
586 }
587 }
588 }
589
590 fn destructure(self) -> (T, Arc<Meta>) {
597 let this = ManuallyDrop::new(self);
607 let Managed { value, meta, done } = &*this;
608
609 let value = unsafe { std::ptr::read(value) };
610 let meta = unsafe { std::ptr::read(meta) };
611 let done = unsafe { std::ptr::read(done) };
612 debug_assert!(
615 !done.load(Ordering::Relaxed),
616 "a `done` managed should never be destructured"
617 );
618
619 (value, meta)
620 }
621
622 fn from_parts(value: T, meta: Arc<Meta>) -> Self {
623 Self {
624 value,
625 meta,
626 done: AtomicBool::new(false),
627 }
628 }
629
630 fn is_done(&self) -> bool {
631 self.done.load(Ordering::Relaxed)
632 }
633}
634
635impl<T: Counted> Drop for Managed<T> {
636 fn drop(&mut self) {
637 self.do_reject(Some(Outcome::Invalid(DiscardReason::Internal)));
638 }
639}
640
641impl<T: Counted + fmt::Debug> fmt::Debug for Managed<T> {
642 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
643 write!(f, "Managed<{}>[", std::any::type_name::<T>())?;
644 for (i, (category, quantity)) in self.value.quantities().iter().enumerate() {
645 if i > 0 {
646 write!(f, ",")?;
647 }
648 write!(f, "{category}:{quantity}")?;
649 }
650 write!(f, "](")?;
651 self.value.fmt(f)?;
652 write!(f, ")")
653 }
654}
655
656impl<T: Counted> Managed<Option<T>> {
657 pub fn transpose(self) -> Option<Managed<T>> {
659 let (o, meta) = self.destructure();
660 o.map(|t| Managed::from_parts(t, meta))
661 }
662}
663
664impl<L: Counted, R: Counted> Managed<Either<L, R>> {
665 pub fn transpose(self) -> Either<Managed<L>, Managed<R>> {
667 let (either, meta) = self.destructure();
668 match either {
669 Either::Left(value) => Either::Left(Managed::from_parts(value, meta)),
670 Either::Right(value) => Either::Right(Managed::from_parts(value, meta)),
671 }
672 }
673}
674
675impl From<Managed<Box<Envelope>>> for ManagedEnvelope {
676 fn from(value: Managed<Box<Envelope>>) -> Self {
677 let (value, meta) = value.destructure();
678 let mut envelope = ManagedEnvelope::new(value, meta.outcome_aggregator.clone());
679 envelope.scope(meta.scoping);
680 envelope
681 }
682}
683
684impl<T: Counted> AsRef<T> for Managed<T> {
685 fn as_ref(&self) -> &T {
686 &self.value
687 }
688}
689
690impl<T: Counted> std::ops::Deref for Managed<T> {
691 type Target = T;
692
693 fn deref(&self) -> &Self::Target {
694 &self.value
695 }
696}
697
698#[derive(Debug, Clone)]
700struct Meta {
701 outcome_aggregator: Addr<TrackOutcome>,
703 received_at: DateTime<Utc>,
707 scoping: Scoping,
709 event_id: Option<EventId>,
711 remote_addr: Option<IpAddr>,
713}
714
715impl Meta {
716 pub fn track_outcome(&self, outcome: Outcome, category: DataCategory, quantity: usize) {
717 self.outcome_aggregator.send(TrackOutcome {
718 timestamp: self.received_at,
719 scoping: self.scoping,
720 outcome,
721 event_id: self.event_id,
722 remote_addr: self.remote_addr,
723 category,
724 quantity: quantity.try_into().unwrap_or(u32::MAX),
725 });
726 }
727}
728
729pub struct RecordKeeper<'a> {
732 meta: &'a Meta,
733 on_drop: Quantities,
734 #[cfg(debug_assertions)]
735 lenient: SmallVec<[DataCategory; 1]>,
736 #[cfg(debug_assertions)]
737 modifications: BTreeMap<DataCategory, isize>,
738 in_flight: SmallVec<[(DataCategory, usize, Option<Outcome>); 2]>,
739}
740
741impl<'a> RecordKeeper<'a> {
742 fn new(meta: &'a Meta, quantities: Quantities) -> Self {
743 Self {
744 meta,
745 on_drop: quantities,
746 #[cfg(debug_assertions)]
747 lenient: Default::default(),
748 #[cfg(debug_assertions)]
749 modifications: Default::default(),
750 in_flight: Default::default(),
751 }
752 }
753
754 pub fn lenient(&mut self, category: DataCategory) {
761 let _category = category;
762 #[cfg(debug_assertions)]
763 self.lenient.push(_category);
764 }
765
766 pub fn modify_by(&mut self, category: DataCategory, offset: isize) {
774 let _category = category;
775 let _offset = offset;
776 #[cfg(debug_assertions)]
777 {
778 *self.modifications.entry(_category).or_default() += offset;
779 }
780 }
781
782 fn failure<E>(mut self, error: E) -> Rejected<E::Error>
786 where
787 E: OutcomeError,
788 {
789 let (outcome, error) = error.consume();
790
791 if let Some(outcome) = outcome {
792 for (category, quantity) in std::mem::take(&mut self.on_drop) {
793 self.meta.track_outcome(outcome.clone(), category, quantity);
794 }
795 }
796
797 Rejected(error)
798 }
799
800 fn accept(mut self) {
808 debug_assert!(
809 self.in_flight.is_empty(),
810 "records accepted, but intermediate outcomes tracked"
811 );
812 self.on_drop.clear();
813 }
814
815 fn success(mut self, new: Quantities) {
821 let original = std::mem::take(&mut self.on_drop);
822 self.assert_quantities(original, new);
823
824 self.on_drop.clear();
825 for (category, quantity, outcome) in std::mem::take(&mut self.in_flight) {
826 if let Some(outcome) = outcome {
827 self.meta.track_outcome(outcome, category, quantity);
828 }
829 }
830 }
831
832 #[cfg(debug_assertions)]
837 fn assert_quantities(&self, original: Quantities, new: Quantities) {
838 macro_rules! emit {
839 ($category:expr, $($tt:tt)*) => {{
840 match self.lenient.contains(&$category) {
841 true => relay_log::debug!($($tt)*),
844 false => {
845 relay_log::error!("Original: {original:?}");
846 relay_log::error!("New: {new:?}");
847 relay_log::error!("Modifications: {:?}", self.modifications);
848 relay_log::error!("In Flight: {:?}", self.in_flight);
849 panic!($($tt)*)
850 }
851 }
852 }};
853 }
854
855 let mut sums = debug::Quantities::from(&original).0;
856 for (category, offset) in &self.modifications {
857 let v = sums.entry(*category).or_default();
858 match v.checked_add_signed(*offset) {
859 Some(result) => *v = result,
860 None => emit!(
861 category,
862 "Attempted to modify original quantity {v} into the negative ({offset})"
863 ),
864 }
865 }
866
867 for (category, quantity, outcome) in &self.in_flight {
868 match sums.get_mut(category) {
869 Some(c) if *c >= *quantity => *c -= *quantity,
870 Some(c) => emit!(
871 category,
872 "Emitted {quantity} outcomes ({outcome:?}) for {category}, but there were only {c} items in the category originally"
873 ),
874 None => emit!(
875 category,
876 "Emitted {quantity} outcomes ({outcome:?}) for {category}, but there never was an item in this category"
877 ),
878 }
879 }
880
881 for (category, quantity) in &new {
882 match sums.get_mut(category) {
883 Some(c) if *c >= *quantity => *c -= *quantity,
884 Some(c) => emit!(
885 category,
886 "New item has {quantity} items in category '{category}', but original (after emitted outcomes) only has {c} left"
887 ),
888 None => emit!(
889 category,
890 "New item has {quantity} items in category '{category}', but after emitted outcomes there are none left"
891 ),
892 }
893 }
894
895 for (category, quantity) in sums {
896 if quantity > 0 {
897 emit!(
898 category,
899 "Missing outcomes or mismatched quantity in category '{category}', off by {quantity}"
900 );
901 }
902 }
903 }
904
905 #[cfg(not(debug_assertions))]
906 fn assert_quantities(&self, _: Quantities, _: Quantities) {}
907}
908
909impl<'a> Drop for RecordKeeper<'a> {
910 fn drop(&mut self) {
911 for (category, quantity) in std::mem::take(&mut self.on_drop) {
912 self.meta.track_outcome(
913 Outcome::Invalid(DiscardReason::Internal),
914 category,
915 quantity,
916 );
917 }
918 }
919}
920
921impl RecordKeeper<'_> {
922 pub fn or_default<T, E, Q>(&mut self, r: Result<T, E>, q: Q) -> T
927 where
928 T: Default,
929 E: OutcomeError,
930 Q: Counted,
931 {
932 match r {
933 Ok(result) => result,
934 Err(err) => {
935 self.reject_err(err, q);
936 T::default()
937 }
938 }
939 }
940
941 pub fn reject_err<E, Q>(&mut self, err: E, q: Q) -> E::Error
946 where
947 E: OutcomeError,
948 Q: Counted,
949 {
950 let (outcome, err) = err.consume();
951 for (category, quantity) in q.quantities() {
952 self.in_flight.push((category, quantity, outcome.clone()))
953 }
954 err
955 }
956
957 #[track_caller]
961 pub fn internal_error<E, Q>(&mut self, error: E, q: Q)
962 where
963 E: std::error::Error + 'static,
964 Q: Counted,
965 {
966 relay_log::error!(error = &error as &dyn std::error::Error, "internal error");
967 debug_assert!(false, "internal error: {error}");
968 self.reject_err((Outcome::Invalid(DiscardReason::Internal), ()), q);
969 }
970}
971
972pub struct Split<I, S>
974where
975 I: Iterator<Item = S>,
976 S: Counted,
977{
978 #[cfg(debug_assertions)]
979 quantities: Quantities,
980 items: I,
981 meta: Arc<Meta>,
982 exhausted: bool,
983}
984
985impl<I, S> Split<I, S>
986where
987 I: Iterator<Item = S>,
988 S: Counted,
989{
990 #[cfg(debug_assertions)]
993 fn subtract(&mut self, q: Quantities) {
994 for (category, quantities) in q {
995 let Some(orig_quantities) = self
996 .quantities
997 .iter_mut()
998 .find_map(|(c, q)| (*c == category).then_some(q))
999 else {
1000 debug_assert!(
1001 false,
1002 "mismatching quantities, item split into category {category}, \
1003 which originally was not present"
1004 );
1005 continue;
1006 };
1007
1008 if *orig_quantities >= quantities {
1009 *orig_quantities -= quantities;
1010 } else {
1011 debug_assert!(
1012 false,
1013 "in total more items produced in category {category} than originally available"
1014 );
1015 }
1016 }
1017 }
1018}
1019
1020impl<I, S> Iterator for Split<I, S>
1021where
1022 I: Iterator<Item = S>,
1023 S: Counted,
1024{
1025 type Item = Managed<S>;
1026
1027 fn next(&mut self) -> Option<Self::Item> {
1028 let next = match self.items.next() {
1029 Some(next) => next,
1030 None => {
1031 self.exhausted = true;
1032 return None;
1033 }
1034 };
1035
1036 #[cfg(debug_assertions)]
1037 self.subtract(next.quantities());
1038
1039 Some(Managed::from_parts(next, Arc::clone(&self.meta)))
1040 }
1041}
1042
1043impl<I, S> Drop for Split<I, S>
1044where
1045 I: Iterator<Item = S>,
1046 S: Counted,
1047{
1048 fn drop(&mut self) {
1049 #[cfg(debug_assertions)]
1051 if self.exhausted {
1052 for (category, quantities) in &self.quantities {
1053 debug_assert!(
1054 *quantities == 0,
1055 "items split, but still {quantities} remaining in category {category}"
1056 );
1057 }
1058 }
1059
1060 if self.exhausted {
1061 return;
1062 }
1063
1064 for item in &mut self.items {
1070 for (category, quantity) in item.quantities() {
1071 self.meta.track_outcome(
1072 Outcome::Invalid(DiscardReason::Internal),
1073 category,
1074 quantity,
1075 );
1076 }
1077 }
1078 }
1079}
1080
1081impl<I, S> FusedIterator for Split<I, S>
1082where
1083 I: Iterator<Item = S> + FusedIterator,
1084 S: Counted,
1085{
1086}
1087
#[cfg(test)]
mod tests {
    use super::*;

    /// Test value which counts one error per contained element.
    struct CountedVec(Vec<u32>);

    impl Counted for CountedVec {
        fn quantities(&self) -> Quantities {
            let errors = self.0.len();
            smallvec::smallvec![(DataCategory::Error, errors)]
        }
    }

    /// Test value which always counts as exactly one error.
    struct CountedValue(u32);

    impl Counted for CountedValue {
        fn quantities(&self) -> Quantities {
            smallvec::smallvec![(DataCategory::Error, 1)]
        }
    }

    #[test]
    fn test_reject_err_no_outcome() {
        let items = CountedVec(vec![0, 1, 2, 3, 4, 5]);
        let (managed, mut handle) = Managed::for_test(items).build();

        // Rejecting without an outcome must not emit anything.
        let _ = managed.reject_err((None, ()));
        handle.assert_no_outcomes();

        // The rejection marked the value as done, dropping it must not emit anything either.
        drop(managed);
        handle.assert_no_outcomes();
    }

    #[test]
    fn test_split_fully_consumed() {
        let items = CountedVec(vec![0, 1, 2, 3, 4, 5]);
        let (managed, mut handle) = Managed::for_test(items).build();

        let parts: Vec<_> = managed
            .split(|value| value.0.into_iter().map(CountedValue))
            .collect();

        // Splitting alone does not emit any outcomes.
        handle.assert_no_outcomes();

        for (index, part) in parts.into_iter().enumerate() {
            assert_eq!(part.as_ref().0, index as u32);
            let outcome = Outcome::Invalid(DiscardReason::Cors);
            let _ = part.reject_err((outcome.clone(), ()));
            handle.assert_outcome(&outcome, DataCategory::Error, 1);
        }
    }

    #[test]
    fn test_split_partially_consumed_emits_remaining() {
        let items = CountedVec(vec![0, 1, 2, 3, 4, 5]);
        let (managed, mut handle) = Managed::for_test(items).build();

        let mut split = managed.split(|value| value.0.into_iter().map(CountedValue));
        handle.assert_no_outcomes();

        // Every item which is yielded and dropped emits its own internal outcome.
        drop(split.next());
        handle.assert_internal_outcome(DataCategory::Error, 1);
        drop(split.next());
        handle.assert_internal_outcome(DataCategory::Error, 1);
        drop(split.next());
        handle.assert_internal_outcome(DataCategory::Error, 1);
        handle.assert_no_outcomes();

        // Dropping the iterator emits internal outcomes for the three remaining items.
        drop(split);

        handle.assert_internal_outcome(DataCategory::Error, 1);
        handle.assert_internal_outcome(DataCategory::Error, 1);
        handle.assert_internal_outcome(DataCategory::Error, 1);
    }

    #[test]
    fn test_split_changing_quantities_should_panic() {
        let items = CountedVec(vec![0, 1, 2, 3, 4, 5]);
        let (managed, mut handle) = Managed::for_test(items).build();

        // Six errors go in, but only a single item counting one error comes out.
        let mut split = managed.split(|_| std::iter::once(CountedValue(0)));

        split.next().unwrap().accept(|_| {});
        handle.assert_no_outcomes();

        assert!(split.next().is_none());

        // The exhausted iterator still has quantities left, which is asserted on drop.
        let result = std::panic::catch_unwind(move || {
            drop(split);
        });

        assert!(
            result.is_err(),
            "expected split to panic because of mismatched (not enough) outcomes"
        );
    }

    #[test]
    fn test_split_more_outcomes_than_before_should_panic() {
        let items = CountedVec(vec![0]);
        let (managed, mut handle) = Managed::for_test(items).build();

        // One error goes in, but two items counting one error each come out.
        let mut split = managed.split(|_| vec![CountedValue(0), CountedValue(2)].into_iter());

        split.next().unwrap().accept(|_| {});
        handle.assert_no_outcomes();

        // The second item exceeds the originally available quantity.
        let result = std::panic::catch_unwind(move || {
            split.next();
        });

        assert!(
            result.is_err(),
            "expected split to panic because of mismatched (too many) outcomes"
        );
    }

    #[test]
    fn test_split_changing_categories_should_panic() {
        /// Counts in a category which is not part of the original value.
        struct Special;
        impl Counted for Special {
            fn quantities(&self) -> Quantities {
                smallvec::smallvec![(DataCategory::Error, 1), (DataCategory::Transaction, 1)]
            }
        }

        let items = CountedVec(vec![0]);
        let (managed, _handle) = Managed::for_test(items).build();

        let mut split = managed.split(|value| value.0.into_iter().map(|_| Special));

        // The produced item contains a transaction, the original value only errors.
        let result = std::panic::catch_unwind(move || {
            let _ = split.next();
        });

        assert!(
            result.is_err(),
            "expected split to panic because of mismatched outcome categories"
        );
    }

    #[test]
    fn test_split_assert_fused() {
        fn only_fused<T: FusedIterator>(_: T) {}

        let (managed, mut handle) = Managed::for_test(CountedVec(vec![0])).build();
        only_fused(managed.split(|value| value.0.into_iter().map(CountedValue)));
        // The split is dropped unconsumed, the single remaining item is rejected.
        handle.assert_internal_outcome(DataCategory::Error, 1);
    }
}