1use relay_common::time::UnixTimestamp;
2use serde::ser::{SerializeMap, SerializeSeq};
3use serde::Serialize;
4
5use crate::{
6 BucketMetadata, CounterType, DistributionType, GaugeValue, MetricName, SetType, SetValue,
7};
8use relay_base_schema::metrics::MetricType;
9use std::collections::BTreeMap;
10use std::fmt;
11use std::ops::Range;
12
13use crate::bucket::Bucket;
14use crate::BucketValue;
15
16const BUCKET_SPLIT_FACTOR: usize = 32;
20
21const BUCKET_SIZE: usize = 50;
25
26const AVG_VALUE_SIZE: usize = 8;
28
29#[derive(Debug, Clone, Copy, PartialEq, Eq)]
33struct Index {
34 slice: usize,
36 bucket: usize,
38}
39
40#[derive(Clone, Copy)]
62pub struct BucketsView<T> {
63 inner: T,
65 start: Index,
70 end: Index,
75}
76
77impl<T> BucketsView<T>
78where
79 T: AsRef<[Bucket]>,
80{
81 pub fn new(buckets: T) -> Self {
83 let len = buckets.as_ref().len();
84 Self {
85 inner: buckets,
86 start: Index {
87 slice: 0,
88 bucket: 0,
89 },
90 end: Index {
91 slice: len,
92 bucket: 0,
93 },
94 }
95 }
96
97 pub fn len(&self) -> usize {
99 let mut len = self.end.slice - self.start.slice;
100 if self.end.bucket != 0 {
101 len += 1;
102 }
103 len
104 }
105
106 pub fn is_empty(&self) -> bool {
108 self.len() == 0
109 }
110
111 pub fn as_slice(&self) -> BucketsView<&[Bucket]> {
113 BucketsView {
114 inner: self.inner.as_ref(),
115 start: self.start,
116 end: self.end,
117 }
118 }
119
120 pub fn iter(&self) -> BucketsViewIter<'_> {
122 BucketsViewIter::new(self.inner.as_ref(), self.start, self.end)
123 }
124
125 pub fn by_size(self, size_in_bytes: usize) -> BucketsViewBySizeIter<T> {
127 BucketsViewBySizeIter::new(self.inner, self.start, self.end, size_in_bytes)
128 }
129}
130
131impl<'a> From<&'a [Bucket]> for BucketsView<&'a [Bucket]> {
132 fn from(value: &'a [Bucket]) -> Self {
133 Self::new(value)
134 }
135}
136
137impl<'a> From<&'a Vec<Bucket>> for BucketsView<&'a [Bucket]> {
138 fn from(value: &'a Vec<Bucket>) -> Self {
139 Self::new(value.as_slice())
140 }
141}
142
143impl<T> fmt::Debug for BucketsView<T>
144where
145 T: AsRef<[Bucket]>,
146{
147 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
148 let contents = self.iter().collect::<Vec<_>>();
149 f.debug_tuple("BucketsView").field(&contents).finish()
150 }
151}
152
153impl<'a> IntoIterator for BucketsView<&'a [Bucket]> {
154 type Item = BucketView<'a>;
155 type IntoIter = BucketsViewIter<'a>;
156
157 fn into_iter(self) -> Self::IntoIter {
158 BucketsViewIter::new(self.inner, self.start, self.end)
159 }
160}
161
162impl<'a, T> IntoIterator for &'a BucketsView<T>
163where
164 T: AsRef<[Bucket]>,
165{
166 type Item = BucketView<'a>;
167 type IntoIter = BucketsViewIter<'a>;
168
169 fn into_iter(self) -> Self::IntoIter {
170 BucketsViewIter::new(self.inner.as_ref(), self.start, self.end)
171 }
172}
173
174pub struct BucketsViewIter<'a> {
178 inner: &'a [Bucket],
180 current: Index,
182 end: Index,
184}
185
186impl<'a> BucketsViewIter<'a> {
187 fn new(inner: &'a [Bucket], start: Index, end: Index) -> Self {
191 Self {
192 inner,
193 end,
194 current: start,
195 }
196 }
197}
198
199impl<'a> Iterator for BucketsViewIter<'a> {
200 type Item = BucketView<'a>;
201
202 fn next(&mut self) -> Option<Self::Item> {
203 if self.current.slice == self.end.slice && self.end.bucket == 0 {
205 return None;
206 }
207 if self.current.slice > self.end.slice {
209 return None;
210 }
211
212 debug_assert!(
214 self.current.slice < self.inner.len(),
215 "invariant violated, iterator pointing past the slice"
216 );
217 let next = self.inner.get(self.current.slice)?;
218
219 let end = match self.current.slice == self.end.slice {
221 false => next.value.len(),
222 true => self.end.bucket,
223 };
224
225 let next = BucketView::new(next).select(self.current.bucket..end);
226 let Some(next) = next else {
227 debug_assert!(false, "invariant violated, invalid bucket split");
228 relay_log::error!("Internal invariant violated, invalid bucket split, dropping all remaining buckets.");
229 return None;
230 };
231
232 self.current = Index {
235 slice: self.current.slice + 1,
236 bucket: 0,
237 };
238
239 Some(next)
240 }
241}
242
243pub struct BucketsViewBySizeIter<T> {
247 inner: T,
249 current: Index,
251 end: Index,
253 max_size_bytes: usize,
255}
256
257impl<T> BucketsViewBySizeIter<T> {
258 fn new(inner: T, start: Index, end: Index, max_size_bytes: usize) -> Self {
262 Self {
263 inner,
264 end,
265 current: start,
266 max_size_bytes,
267 }
268 }
269}
270
271impl<T> Iterator for BucketsViewBySizeIter<T>
272where
273 T: AsRef<[Bucket]>,
274 T: Clone,
275{
276 type Item = BucketsView<T>;
277
278 fn next(&mut self) -> Option<Self::Item> {
279 let start = self.current;
280
281 let mut remaining_bytes = self.max_size_bytes;
282 loop {
283 if (self.current.slice > self.end.slice)
285 || (self.current.slice == self.end.slice && self.end.bucket == 0)
286 {
287 break;
288 }
289
290 let inner = self.inner.as_ref();
291 debug_assert!(
295 self.current.slice < inner.len(),
296 "invariant violated, iterator pointing past the slice"
297 );
298 let bucket = inner.get(self.current.slice)?;
299
300 let bucket = BucketView::new(bucket).select(self.current.bucket..bucket.value.len());
303 let Some(bucket) = bucket else {
304 debug_assert!(false, "internal invariant violated, invalid bucket split");
305 relay_log::error!("Internal invariant violated, invalid bucket split, dropping all remaining buckets.");
306 return None;
307 };
308
309 match split(
310 &bucket,
311 remaining_bytes,
312 self.max_size_bytes / BUCKET_SPLIT_FACTOR,
313 ) {
314 SplitDecision::BucketFits(size) => {
315 remaining_bytes -= size;
316 self.current = Index {
317 slice: self.current.slice + 1,
318 bucket: 0,
319 };
320 continue;
321 }
322 SplitDecision::MoveToNextBatch => break,
323 SplitDecision::Split(n) => {
324 self.current = Index {
325 slice: self.current.slice,
326 bucket: self.current.bucket + n,
327 };
328 break;
329 }
330 }
331 }
332
333 if start == self.current {
334 return None;
337 }
338
339 Some(BucketsView {
342 inner: self.inner.clone(),
343 start,
344 end: self.current,
345 })
346 }
347}
348
349impl<T> Serialize for BucketsView<T>
350where
351 T: AsRef<[Bucket]>,
352{
353 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
354 where
355 S: serde::Serializer,
356 {
357 let mut state = serializer.serialize_seq(Some(self.len()))?;
358
359 for bucket in self.iter() {
360 state.serialize_element(&bucket)?;
361 }
362
363 state.end()
364 }
365}
366
367#[derive(Clone)]
380pub struct BucketView<'a> {
381 inner: &'a Bucket,
383 range: Range<usize>,
386}
387
388impl<'a> BucketView<'a> {
389 pub fn new(bucket: &'a Bucket) -> Self {
393 Self {
394 inner: bucket,
395 range: 0..bucket.value.len(),
396 }
397 }
398
399 pub fn timestamp(&self) -> UnixTimestamp {
403 self.inner.timestamp
404 }
405
406 pub fn width(&self) -> u64 {
410 self.inner.width
411 }
412
413 pub fn name(&self) -> &'a MetricName {
417 &self.inner.name
418 }
419
420 pub fn value(&self) -> BucketViewValue<'a> {
422 match &self.inner.value {
423 BucketValue::Counter(c) => BucketViewValue::Counter(*c),
424 BucketValue::Distribution(d) => BucketViewValue::Distribution(&d[self.range.clone()]),
425 BucketValue::Set(s) => BucketViewValue::Set(SetView::new(s, self.range.clone())),
426 BucketValue::Gauge(g) => BucketViewValue::Gauge(*g),
427 }
428 }
429
430 pub fn ty(&self) -> MetricType {
432 match &self.inner.value {
433 BucketValue::Counter(_) => MetricType::Counter,
434 BucketValue::Distribution(_) => MetricType::Distribution,
435 BucketValue::Set(_) => MetricType::Set,
436 BucketValue::Gauge(_) => MetricType::Gauge,
437 }
438 }
439
440 pub fn tags(&self) -> &'a BTreeMap<String, String> {
444 &self.inner.tags
445 }
446
447 pub fn tag(&self, name: &str) -> Option<&'a str> {
451 self.inner.tag(name)
452 }
453
454 pub fn metadata(&self) -> BucketMetadata {
472 let merges = if self.range.start == 0 {
473 self.inner.metadata.merges
474 } else {
475 0
476 };
477
478 BucketMetadata {
479 merges,
480 ..self.inner.metadata
481 }
482 }
483
484 pub fn len(&self) -> usize {
488 self.range.len()
489 }
490
491 pub fn is_empty(&self) -> bool {
493 self.len() == 0
494 }
495
496 pub fn select(mut self, range: Range<usize>) -> Option<Self> {
502 if range.start < self.range.start || range.end > self.range.end {
503 return None;
504 }
505
506 if !self.can_split() && range != (0..self.inner.value.len()) {
508 return None;
509 }
510
511 self.range = range;
512 Some(self)
513 }
514
515 fn estimated_base_size(&self) -> usize {
520 BUCKET_SIZE + self.name().len() + crate::utils::tags_cost(self.tags())
521 }
522
523 pub fn estimated_size(&self) -> usize {
529 self.estimated_base_size() + self.len() * AVG_VALUE_SIZE
530 }
531
532 pub fn split(self, size: usize, max_size: Option<usize>) -> (Option<Self>, Option<Self>) {
545 match split(&self, size, max_size.unwrap_or(0) / BUCKET_SPLIT_FACTOR) {
546 SplitDecision::BucketFits(_) => (Some(self), None),
547 SplitDecision::MoveToNextBatch => (None, Some(self)),
548 SplitDecision::Split(n) => {
549 let Range { start, end } = self.range;
550 let at = start + n;
551 (self.clone().select(start..at), self.select(at..end))
552 }
553 }
554 }
555
556 fn can_split(&self) -> bool {
560 matches!(
561 self.inner.value,
562 BucketValue::Distribution(_) | BucketValue::Set(_)
563 )
564 }
565
566 fn is_full_bucket(&self) -> bool {
568 self.range.start == 0 && self.range.end == self.inner.value.len()
569 }
570}
571
572impl fmt::Debug for BucketView<'_> {
573 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
574 f.debug_struct("BucketView")
575 .field("timestamp", &self.inner.timestamp)
576 .field("width", &self.inner.width)
577 .field("name", &self.inner.name)
578 .field("value", &self.value())
579 .field("tags", &self.inner.tags)
580 .finish()
581 }
582}
583
584impl<'a> From<&'a Bucket> for BucketView<'a> {
585 fn from(value: &'a Bucket) -> Self {
586 BucketView::new(value)
587 }
588}
589
590impl Serialize for BucketView<'_> {
591 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
592 where
593 S: serde::Serializer,
594 {
595 let Bucket {
596 timestamp,
597 width,
598 name,
599 value: _,
600 tags,
601 metadata,
602 } = self.inner;
603
604 let len = match tags.is_empty() {
605 true => 4,
606 false => 5,
607 };
608
609 let mut state = serializer.serialize_map(Some(len))?;
610
611 state.serialize_entry("timestamp", timestamp)?;
612 state.serialize_entry("width", width)?;
613 state.serialize_entry("name", name)?;
614
615 if self.is_full_bucket() {
616 self.inner
617 .value
618 .serialize(serde::__private::ser::FlatMapSerializer(&mut state))?;
619 } else {
620 self.value()
621 .serialize(serde::__private::ser::FlatMapSerializer(&mut state))?;
622 }
623
624 if !tags.is_empty() {
625 state.serialize_entry("tags", tags)?;
626 }
627 if !metadata.is_default() {
628 state.serialize_entry("metadata", metadata)?;
629 }
630
631 state.end()
632 }
633}
634
635#[derive(Debug, Clone, PartialEq, Serialize)]
637#[serde(tag = "type", content = "value")]
638pub enum BucketViewValue<'a> {
639 #[serde(rename = "c")]
643 Counter(CounterType),
644 #[serde(rename = "d")]
648 Distribution(&'a [DistributionType]),
649 #[serde(rename = "s")]
653 Set(SetView<'a>),
654 #[serde(rename = "g")]
658 Gauge(GaugeValue),
659}
660
661impl<'a> From<&'a BucketValue> for BucketViewValue<'a> {
662 fn from(value: &'a BucketValue) -> Self {
663 match value {
664 BucketValue::Counter(c) => BucketViewValue::Counter(*c),
665 BucketValue::Distribution(d) => BucketViewValue::Distribution(d),
666 BucketValue::Set(s) => BucketViewValue::Set(SetView::new(s, 0..s.len())),
667 BucketValue::Gauge(g) => BucketViewValue::Gauge(*g),
668 }
669 }
670}
671
672#[derive(Clone)]
674pub struct SetView<'a> {
675 source: &'a SetValue,
676 range: Range<usize>,
677}
678
679impl<'a> SetView<'a> {
680 fn new(source: &'a SetValue, range: Range<usize>) -> Self {
681 Self { source, range }
682 }
683
684 pub fn len(&self) -> usize {
686 self.range.len()
687 }
688
689 pub fn is_empty(&self) -> bool {
691 self.len() == 0
692 }
693
694 pub fn iter(&self) -> impl Iterator<Item = &SetType> {
696 self.source
697 .iter()
698 .skip(self.range.start)
699 .take(self.range.len())
700 }
701}
702
703impl PartialEq for SetView<'_> {
704 fn eq(&self, other: &Self) -> bool {
705 self.iter().eq(other.iter())
706 }
707}
708
709impl fmt::Debug for SetView<'_> {
710 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
711 f.debug_tuple("SetView")
712 .field(&self.iter().collect::<Vec<_>>())
713 .finish()
714 }
715}
716
717impl Serialize for SetView<'_> {
718 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
719 where
720 S: serde::Serializer,
721 {
722 let mut state = serializer.serialize_seq(Some(self.len()))?;
723
724 for item in self.iter() {
725 state.serialize_element(item)?;
726 }
727
728 state.end()
729 }
730}
731
732enum SplitDecision {
734 BucketFits(usize),
738 MoveToNextBatch,
740 Split(usize),
742}
743
744fn split(bucket: &BucketView<'_>, max_size: usize, min_split_size: usize) -> SplitDecision {
756 let bucket_size = bucket.estimated_size();
758 if max_size >= bucket_size {
759 return SplitDecision::BucketFits(bucket_size);
760 }
761
762 if !bucket.can_split() {
763 return SplitDecision::MoveToNextBatch;
764 }
765
766 let own_size = bucket.estimated_base_size();
769 if max_size < (own_size + AVG_VALUE_SIZE) {
770 return SplitDecision::MoveToNextBatch;
772 }
773
774 if bucket_size < min_split_size {
775 return SplitDecision::MoveToNextBatch;
776 }
777
778 let split_at = (max_size - own_size) / AVG_VALUE_SIZE;
781 SplitDecision::Split(split_at)
782}
783
784#[cfg(test)]
785mod tests {
786 use std::sync::Arc;
787
788 use insta::assert_json_snapshot;
789
790 use super::*;
791
792 #[test]
793 fn test_bucket_view_select_counter() {
794 let bucket = Bucket::parse(b"b0:1|c", UnixTimestamp::from_secs(5000)).unwrap();
795
796 let view = BucketView::new(&bucket).select(0..1).unwrap();
797 assert_eq!(view.len(), 1);
798 assert_eq!(
799 serde_json::to_string(&view).unwrap(),
800 serde_json::to_string(&bucket).unwrap()
801 );
802 }
803
804 #[test]
805 fn test_bucket_view_select_invalid_counter() {
806 let bucket = Bucket::parse(b"b0:1|c", UnixTimestamp::from_secs(5000)).unwrap();
807
808 assert!(BucketView::new(&bucket).select(0..0).is_none());
809 assert!(BucketView::new(&bucket).select(0..2).is_none());
810 assert!(BucketView::new(&bucket).select(1..1).is_none());
811 }
812
813 #[test]
814 fn test_bucket_view_counter_metadata() {
815 let bucket = Bucket::parse(b"b0:1|c", UnixTimestamp::from_secs(5000)).unwrap();
816 assert_eq!(bucket.metadata, BucketView::new(&bucket).metadata());
817 }
818
819 #[test]
820 fn test_bucket_view_select_distribution() {
821 let bucket = Bucket::parse(b"b2:1:2:3:5:5|d", UnixTimestamp::from_secs(5000)).unwrap();
822
823 let view = BucketView::new(&bucket).select(0..3).unwrap();
824 assert_eq!(view.len(), 3);
825 assert_eq!(
826 view.value(),
827 BucketViewValue::Distribution(&[1.into(), 2.into(), 3.into()])
828 );
829 let view = BucketView::new(&bucket).select(1..3).unwrap();
830 assert_eq!(view.len(), 2);
831 assert_eq!(
832 view.value(),
833 BucketViewValue::Distribution(&[2.into(), 3.into()])
834 );
835 let view = BucketView::new(&bucket).select(1..5).unwrap();
836 assert_eq!(view.len(), 4);
837 assert_eq!(
838 view.value(),
839 BucketViewValue::Distribution(&[2.into(), 3.into(), 5.into(), 5.into()])
840 );
841 }
842
843 #[test]
844 fn test_bucket_view_select_invalid_distribution() {
845 let bucket = Bucket::parse(b"b2:1:2:3:5:5|d", UnixTimestamp::from_secs(5000)).unwrap();
846
847 assert!(BucketView::new(&bucket).select(0..6).is_none());
848 assert!(BucketView::new(&bucket).select(5..6).is_none());
849 assert!(BucketView::new(&bucket).select(77..99).is_none());
850 }
851
852 #[test]
853 fn test_bucket_view_distribution_metadata() {
854 let bucket = Bucket::parse(b"b2:1:2:3:5:5|d", UnixTimestamp::from_secs(5000)).unwrap();
855 assert_eq!(bucket.metadata, BucketView::new(&bucket).metadata());
856
857 assert_eq!(
858 BucketView::new(&bucket).select(0..3).unwrap().metadata(),
859 bucket.metadata
860 );
861
862 let m = BucketView::new(&bucket).select(1..3).unwrap().metadata();
863 assert_eq!(
864 m,
865 BucketMetadata {
866 merges: 0,
867 ..bucket.metadata
868 }
869 );
870 }
871
872 #[test]
873 fn test_bucket_view_select_set() {
874 let bucket = Bucket::parse(b"b3:42:75|s", UnixTimestamp::from_secs(5000)).unwrap();
875 let s = [42, 75].into();
876
877 let view = BucketView::new(&bucket).select(0..2).unwrap();
878 assert_eq!(view.len(), 2);
879 assert_eq!(view.value(), BucketViewValue::Set(SetView::new(&s, 0..2)));
880 let view = BucketView::new(&bucket).select(1..2).unwrap();
881 assert_eq!(view.len(), 1);
882 assert_eq!(view.value(), BucketViewValue::Set(SetView::new(&s, 1..2)));
883 let view = BucketView::new(&bucket).select(0..1).unwrap();
884 assert_eq!(view.len(), 1);
885 assert_eq!(view.value(), BucketViewValue::Set(SetView::new(&s, 0..1)));
886 }
887
888 #[test]
889 fn test_bucket_view_select_invalid_set() {
890 let bucket = Bucket::parse(b"b3:42:75|s", UnixTimestamp::from_secs(5000)).unwrap();
891
892 assert!(BucketView::new(&bucket).select(0..3).is_none());
893 assert!(BucketView::new(&bucket).select(2..5).is_none());
894 assert!(BucketView::new(&bucket).select(77..99).is_none());
895 }
896
897 #[test]
898 fn test_bucket_view_set_metadata() {
899 let bucket = Bucket::parse(b"b2:1:2:3:5:5|s", UnixTimestamp::from_secs(5000)).unwrap();
900 assert_eq!(bucket.metadata, BucketView::new(&bucket).metadata());
901
902 assert_eq!(
903 BucketView::new(&bucket).select(0..3).unwrap().metadata(),
904 bucket.metadata
905 );
906
907 let m = BucketView::new(&bucket).select(1..3).unwrap().metadata();
908 assert_eq!(
909 m,
910 BucketMetadata {
911 merges: 0,
912 ..bucket.metadata
913 }
914 );
915 }
916
917 #[test]
918 fn test_bucket_view_select_gauge() {
919 let bucket =
920 Bucket::parse(b"b4:25:17:42:220:85|g", UnixTimestamp::from_secs(5000)).unwrap();
921
922 let view = BucketView::new(&bucket).select(0..5).unwrap();
923 assert_eq!(view.len(), 5);
924 assert_eq!(
925 view.value(),
926 BucketViewValue::Gauge(GaugeValue {
927 last: 25.into(),
928 min: 17.into(),
929 max: 42.into(),
930 sum: 220.into(),
931 count: 85
932 })
933 );
934 }
935
936 #[test]
937 fn test_bucket_view_select_invalid_gauge() {
938 let bucket =
939 Bucket::parse(b"b4:25:17:42:220:85|g", UnixTimestamp::from_secs(5000)).unwrap();
940
941 assert!(BucketView::new(&bucket).select(0..1).is_none());
942 assert!(BucketView::new(&bucket).select(0..4).is_none());
943 assert!(BucketView::new(&bucket).select(5..5).is_none());
944 assert!(BucketView::new(&bucket).select(5..6).is_none());
945 }
946
947 #[test]
948 fn test_bucket_view_gauge_metadata() {
949 let bucket =
950 Bucket::parse(b"b4:25:17:42:220:85|g", UnixTimestamp::from_secs(5000)).unwrap();
951 assert_eq!(BucketView::new(&bucket).metadata(), bucket.metadata);
952 }
953
954 fn buckets<T>(s: &[u8]) -> T
955 where
956 T: FromIterator<Bucket>,
957 {
958 let timestamp = UnixTimestamp::from_secs(5000);
959 Bucket::parse_all(s, timestamp)
960 .collect::<Result<T, _>>()
961 .unwrap()
962 }
963
964 #[test]
965 fn test_buckets_view_empty() {
966 let view = BucketsView::new(Vec::new());
967 assert_eq!(view.len(), 0);
968 assert!(view.is_empty());
969 let partials = view.iter().collect::<Vec<_>>();
970 assert!(partials.is_empty());
971 }
972
973 #[test]
974 fn test_buckets_view_iter_full() {
975 let buckets: Vec<_> = buckets(b"b0:1|c\nb1:12|c\nb2:1:2:3:5:5|d\nb3:42:75|s");
976
977 let view = BucketsView::from(&buckets);
978 assert_eq!(view.len(), 4);
979 assert!(!view.is_empty());
980 let partials = view.iter().collect::<Vec<_>>();
981 assert_eq!(partials.len(), 4);
982 assert_eq!(partials[0].name(), "c:custom/b0@none");
983 assert_eq!(partials[0].len(), 1);
984 assert_eq!(partials[1].name(), "c:custom/b1@none");
985 assert_eq!(partials[1].len(), 1);
986 assert_eq!(partials[2].name(), "d:custom/b2@none");
987 assert_eq!(partials[2].len(), 5);
988 assert_eq!(partials[3].name(), "s:custom/b3@none");
989 assert_eq!(partials[3].len(), 2);
990 }
991
992 #[test]
993 fn test_buckets_view_iter_partial_end() {
994 let buckets: Vec<_> = buckets(b"b0:1|c\nb1:12|c\nb2:1:2:3:5:5|d\nb3:42:75|s");
995
996 let mut view = BucketsView::new(&buckets);
997 view.end.slice = 2;
998 view.end.bucket = 3;
999 assert_eq!(view.len(), 3);
1000 assert!(!view.is_empty());
1001
1002 let partials = view.iter().collect::<Vec<_>>();
1003 assert_eq!(partials.len(), 3);
1004 assert_eq!(partials[0].name(), "c:custom/b0@none");
1005 assert_eq!(partials[0].len(), 1);
1006 assert_eq!(partials[1].name(), "c:custom/b1@none");
1007 assert_eq!(partials[1].len(), 1);
1008 assert_eq!(partials[2].name(), "d:custom/b2@none");
1009 assert_eq!(partials[2].len(), 3);
1010 }
1011
1012 #[test]
1013 fn test_buckets_view_iter_partial_start() {
1014 let buckets: Vec<_> = buckets(b"b0:1|c\nb1:12|c\nb2:1:2:3:5:5|d\nb3:42:75|s");
1015
1016 let mut view = BucketsView::new(buckets);
1017 view.start.slice = 2;
1018 view.start.bucket = 3;
1019 assert_eq!(view.len(), 2);
1020 assert!(!view.is_empty());
1021
1022 let partials = view.iter().collect::<Vec<_>>();
1023 assert_eq!(partials.len(), 2);
1024 assert_eq!(partials[0].name(), "d:custom/b2@none");
1025 assert_eq!(partials[0].len(), 2);
1026 assert_eq!(partials[1].name(), "s:custom/b3@none");
1027 assert_eq!(partials[1].len(), 2);
1028 }
1029
1030 #[test]
1031 fn test_buckets_view_iter_partial_start_and_end() {
1032 let buckets: Vec<_> = buckets(b"b0:1|c\nb1:12|c\nb2:1:2:3:5:5|d\nb3:42:75|s");
1033
1034 let mut view = BucketsView::from(&buckets);
1035 view.start.slice = 2;
1036 view.start.bucket = 1;
1037 view.end.slice = 3;
1038 view.end.bucket = 1;
1039 assert_eq!(view.len(), 2);
1040 assert!(!view.is_empty());
1041
1042 let partials = view.iter().collect::<Vec<_>>();
1043 assert_eq!(partials.len(), 2);
1044 assert_eq!(partials[0].name(), "d:custom/b2@none");
1045 assert_eq!(partials[0].len(), 4);
1046 assert_eq!(partials[1].name(), "s:custom/b3@none");
1047 assert_eq!(partials[1].len(), 1);
1048 }
1049
1050 #[test]
1051 fn test_buckets_view_by_size_small() {
1052 let buckets: Vec<_> = buckets(b"b0:1|c\nb1:12|c\nb2:1:2:3:5:5|d\nb3:42:75|s");
1053
1054 let view = BucketsView::from(&buckets);
1055 let partials = view
1056 .by_size(100)
1057 .map(|bv| {
1058 let len: usize = bv.iter().map(|b| b.len()).sum();
1059 let size: usize = bv.iter().map(|b| b.estimated_size()).sum();
1060 (len, size)
1061 })
1062 .collect::<Vec<_>>();
1063
1064 assert_eq!(partials, vec![(1, 74), (1, 74), (4, 98), (1, 74), (2, 82),]);
1065 }
1066
1067 #[test]
1068 fn test_buckets_view_by_size_small_as_arc() {
1069 let buckets: Arc<_> = buckets(b"b0:1|c\nb1:12|c\nb2:1:2:3:5:5|d\nb3:42:75|s");
1070
1071 let view = BucketsView::new(buckets);
1072 let partials = view
1073 .by_size(100)
1074 .map(|bv| {
1075 let len: usize = bv.iter().map(|b| b.len()).sum();
1076 let size: usize = bv.iter().map(|b| b.estimated_size()).sum();
1077 (len, size)
1078 })
1079 .collect::<Vec<_>>();
1080
1081 assert_eq!(partials, vec![(1, 74), (1, 74), (4, 98), (1, 74), (2, 82),]);
1082 }
1083
1084 #[test]
1085 fn test_buckets_view_by_size_one_split() {
1086 let buckets: Vec<_> = buckets(b"b0:1|c\nb1:12|c\nb2:1:2:3:5:5|d\nb3:42:75|s");
1087
1088 let view = BucketsView::from(&buckets);
1089 let partials = view
1090 .by_size(250)
1091 .map(|bv| {
1092 let len: usize = bv.iter().map(|b| b.len()).sum();
1093 let size: usize = bv.iter().map(|b| b.estimated_size()).sum();
1094 (len, size)
1095 })
1096 .collect::<Vec<_>>();
1097
1098 assert_eq!(partials, vec![(6, 246), (3, 156)]);
1099 }
1100
1101 #[test]
1102 fn test_buckets_view_by_size_no_split() {
1103 let buckets: Vec<_> = buckets(b"b0:1|c\nb1:12|c\nb2:1:2:3:5:5|d\nb3:42:75|s");
1104
1105 let view = BucketsView::from(&buckets);
1106 let partials = view
1107 .by_size(500)
1108 .map(|bv| {
1109 let len: usize = bv.iter().map(|b| b.len()).sum();
1110 let size: usize = bv.iter().map(|b| b.estimated_size()).sum();
1111 (len, size)
1112 })
1113 .collect::<Vec<_>>();
1114
1115 assert_eq!(partials, vec![(9, 336)]);
1116 }
1117
1118 #[test]
1119 fn test_buckets_view_by_size_no_too_small_no_bucket_fits() {
1120 let buckets: Vec<_> = buckets(b"b0:1|c\nb1:12|c\nb2:1:2:3:5:5|d\nb3:42:75|s");
1121
1122 let view = BucketsView::from(&buckets);
1123 let partials = view
1124 .by_size(50) .count();
1126
1127 assert_eq!(partials, 0);
1128 }
1129
1130 #[test]
1131 fn test_buckets_view_by_size_do_not_split_gauge() {
1132 let buckets: Vec<_> = buckets(b"transactions/foo:25:17:42:220:85|g");
1133
1134 let view = BucketsView::from(&buckets);
1135 let partials = view.by_size(100).count();
1138
1139 assert_eq!(partials, 0);
1140 }
1141
1142 #[test]
1143 fn test_buckets_view_serialize_full() {
1144 let buckets: Vec<_> = buckets(b"b0:1|c\nb1:12|c|#foo,bar:baz\nb2:1:2:3:5:5|d|#foo,bar:baz b3:42:75|s\ntransactions/foo:25:17:42:220:85|g");
1145
1146 assert_eq!(
1147 serde_json::to_string(&BucketsView::from(&buckets)).unwrap(),
1148 serde_json::to_string(&buckets).unwrap()
1149 );
1150 }
1151
1152 #[test]
1153 fn test_buckets_view_serialize_partial() {
1154 let buckets: Arc<[_]> = buckets(
1155 b"b1:12|c|#foo,bar:baz\nb2:1:2:3:5:5|d|#foo,bar:baz\nb3:42:75|s\nb4:25:17:42:220:85|g",
1156 );
1157
1158 let view = BucketsView::new(buckets);
1159 let partials = view.by_size(178).collect::<Vec<_>>();
1162
1163 assert_json_snapshot!(partials);
1164 }
1165
1166 #[test]
1167 fn test_split_repeatedly() {
1168 let bucket = Bucket::parse(b"b2:1:2:3:5:5|d", UnixTimestamp::from_secs(5000)).unwrap();
1169 let view = BucketView::new(&bucket);
1170
1171 let split_size = view.estimated_base_size() + 2 * AVG_VALUE_SIZE;
1173
1174 let (first, rest) = view.split(split_size, None);
1175 let (second, rest) = rest.unwrap().split(split_size, None);
1176 let (third, rest) = rest.unwrap().split(split_size, None);
1177
1178 assert_eq!(first.unwrap().range, 0..2);
1179 assert_eq!(second.unwrap().range, 2..4);
1180 assert_eq!(third.unwrap().range, 4..5);
1181 assert!(rest.is_none());
1182 }
1183}