1use relay_common::time::UnixTimestamp;
2use serde::Serialize;
3use serde::ser::{SerializeMap, SerializeSeq};
4
5use crate::{
6 BucketMetadata, CounterType, DistributionType, GaugeValue, MetricName, SetType, SetValue,
7};
8use relay_base_schema::metrics::MetricType;
9use std::collections::BTreeMap;
10use std::fmt;
11use std::ops::Range;
12
13use crate::BucketValue;
14use crate::bucket::Bucket;
15
16const BUCKET_SPLIT_FACTOR: usize = 32;
20
21const BUCKET_SIZE: usize = 50;
25
26const AVG_VALUE_SIZE: usize = 8;
28
29#[derive(Debug, Clone, Copy, PartialEq, Eq)]
33struct Index {
34 slice: usize,
36 bucket: usize,
38}
39
40#[derive(Clone, Copy)]
62pub struct BucketsView<T> {
63 inner: T,
65 start: Index,
70 end: Index,
75}
76
77impl<T> BucketsView<T>
78where
79 T: AsRef<[Bucket]>,
80{
81 pub fn new(buckets: T) -> Self {
83 let len = buckets.as_ref().len();
84 Self {
85 inner: buckets,
86 start: Index {
87 slice: 0,
88 bucket: 0,
89 },
90 end: Index {
91 slice: len,
92 bucket: 0,
93 },
94 }
95 }
96
97 pub fn len(&self) -> usize {
99 let mut len = self.end.slice - self.start.slice;
100 if self.end.bucket != 0 {
101 len += 1;
102 }
103 len
104 }
105
106 pub fn is_empty(&self) -> bool {
108 self.len() == 0
109 }
110
111 pub fn as_slice(&self) -> BucketsView<&[Bucket]> {
113 BucketsView {
114 inner: self.inner.as_ref(),
115 start: self.start,
116 end: self.end,
117 }
118 }
119
120 pub fn iter(&self) -> BucketsViewIter<'_> {
122 BucketsViewIter::new(self.inner.as_ref(), self.start, self.end)
123 }
124
125 pub fn by_size(self, size_in_bytes: usize) -> BucketsViewBySizeIter<T> {
127 BucketsViewBySizeIter::new(self.inner, self.start, self.end, size_in_bytes)
128 }
129}
130
131impl<'a> From<&'a [Bucket]> for BucketsView<&'a [Bucket]> {
132 fn from(value: &'a [Bucket]) -> Self {
133 Self::new(value)
134 }
135}
136
137impl<'a> From<&'a Vec<Bucket>> for BucketsView<&'a [Bucket]> {
138 fn from(value: &'a Vec<Bucket>) -> Self {
139 Self::new(value.as_slice())
140 }
141}
142
143impl<T> fmt::Debug for BucketsView<T>
144where
145 T: AsRef<[Bucket]>,
146{
147 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
148 let contents = self.iter().collect::<Vec<_>>();
149 f.debug_tuple("BucketsView").field(&contents).finish()
150 }
151}
152
153impl<'a> IntoIterator for BucketsView<&'a [Bucket]> {
154 type Item = BucketView<'a>;
155 type IntoIter = BucketsViewIter<'a>;
156
157 fn into_iter(self) -> Self::IntoIter {
158 BucketsViewIter::new(self.inner, self.start, self.end)
159 }
160}
161
162impl<'a, T> IntoIterator for &'a BucketsView<T>
163where
164 T: AsRef<[Bucket]>,
165{
166 type Item = BucketView<'a>;
167 type IntoIter = BucketsViewIter<'a>;
168
169 fn into_iter(self) -> Self::IntoIter {
170 BucketsViewIter::new(self.inner.as_ref(), self.start, self.end)
171 }
172}
173
174pub struct BucketsViewIter<'a> {
178 inner: &'a [Bucket],
180 current: Index,
182 end: Index,
184}
185
186impl<'a> BucketsViewIter<'a> {
187 fn new(inner: &'a [Bucket], start: Index, end: Index) -> Self {
191 Self {
192 inner,
193 end,
194 current: start,
195 }
196 }
197}
198
199impl<'a> Iterator for BucketsViewIter<'a> {
200 type Item = BucketView<'a>;
201
202 fn next(&mut self) -> Option<Self::Item> {
203 if self.current.slice == self.end.slice && self.end.bucket == 0 {
205 return None;
206 }
207 if self.current.slice > self.end.slice {
209 return None;
210 }
211
212 debug_assert!(
214 self.current.slice < self.inner.len(),
215 "invariant violated, iterator pointing past the slice"
216 );
217 let next = self.inner.get(self.current.slice)?;
218
219 let end = match self.current.slice == self.end.slice {
221 false => next.value.len(),
222 true => self.end.bucket,
223 };
224
225 let next = BucketView::new(next).select(self.current.bucket..end);
226 let Some(next) = next else {
227 debug_assert!(false, "invariant violated, invalid bucket split");
228 relay_log::error!(
229 "Internal invariant violated, invalid bucket split, dropping all remaining buckets."
230 );
231 return None;
232 };
233
234 self.current = Index {
237 slice: self.current.slice + 1,
238 bucket: 0,
239 };
240
241 Some(next)
242 }
243}
244
245pub struct BucketsViewBySizeIter<T> {
249 inner: T,
251 current: Index,
253 end: Index,
255 max_size_bytes: usize,
257}
258
259impl<T> BucketsViewBySizeIter<T> {
260 fn new(inner: T, start: Index, end: Index, max_size_bytes: usize) -> Self {
264 Self {
265 inner,
266 end,
267 current: start,
268 max_size_bytes,
269 }
270 }
271}
272
273impl<T> Iterator for BucketsViewBySizeIter<T>
274where
275 T: AsRef<[Bucket]>,
276 T: Clone,
277{
278 type Item = BucketsView<T>;
279
280 fn next(&mut self) -> Option<Self::Item> {
281 let start = self.current;
282
283 let mut remaining_bytes = self.max_size_bytes;
284 loop {
285 if (self.current.slice > self.end.slice)
287 || (self.current.slice == self.end.slice && self.end.bucket == 0)
288 {
289 break;
290 }
291
292 let inner = self.inner.as_ref();
293 debug_assert!(
297 self.current.slice < inner.len(),
298 "invariant violated, iterator pointing past the slice"
299 );
300 let bucket = inner.get(self.current.slice)?;
301
302 let bucket = BucketView::new(bucket).select(self.current.bucket..bucket.value.len());
305 let Some(bucket) = bucket else {
306 debug_assert!(false, "internal invariant violated, invalid bucket split");
307 relay_log::error!(
308 "Internal invariant violated, invalid bucket split, dropping all remaining buckets."
309 );
310 return None;
311 };
312
313 match split(
314 &bucket,
315 remaining_bytes,
316 self.max_size_bytes / BUCKET_SPLIT_FACTOR,
317 ) {
318 SplitDecision::BucketFits(size) => {
319 remaining_bytes -= size;
320 self.current = Index {
321 slice: self.current.slice + 1,
322 bucket: 0,
323 };
324 continue;
325 }
326 SplitDecision::MoveToNextBatch => break,
327 SplitDecision::Split(n) => {
328 self.current = Index {
329 slice: self.current.slice,
330 bucket: self.current.bucket + n,
331 };
332 break;
333 }
334 }
335 }
336
337 if start == self.current {
338 return None;
341 }
342
343 Some(BucketsView {
346 inner: self.inner.clone(),
347 start,
348 end: self.current,
349 })
350 }
351}
352
353impl<T> Serialize for BucketsView<T>
354where
355 T: AsRef<[Bucket]>,
356{
357 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
358 where
359 S: serde::Serializer,
360 {
361 let mut state = serializer.serialize_seq(Some(self.len()))?;
362
363 for bucket in self.iter() {
364 state.serialize_element(&bucket)?;
365 }
366
367 state.end()
368 }
369}
370
371#[derive(Clone)]
384pub struct BucketView<'a> {
385 inner: &'a Bucket,
387 range: Range<usize>,
390}
391
392impl<'a> BucketView<'a> {
393 pub fn new(bucket: &'a Bucket) -> Self {
397 Self {
398 inner: bucket,
399 range: 0..bucket.value.len(),
400 }
401 }
402
403 pub fn timestamp(&self) -> UnixTimestamp {
407 self.inner.timestamp
408 }
409
410 pub fn width(&self) -> u64 {
414 self.inner.width
415 }
416
417 pub fn name(&self) -> &'a MetricName {
421 &self.inner.name
422 }
423
424 pub fn value(&self) -> BucketViewValue<'a> {
426 match &self.inner.value {
427 BucketValue::Counter(c) => BucketViewValue::Counter(*c),
428 BucketValue::Distribution(d) => BucketViewValue::Distribution(&d[self.range.clone()]),
429 BucketValue::Set(s) => BucketViewValue::Set(SetView::new(s, self.range.clone())),
430 BucketValue::Gauge(g) => BucketViewValue::Gauge(*g),
431 }
432 }
433
434 pub fn ty(&self) -> MetricType {
436 match &self.inner.value {
437 BucketValue::Counter(_) => MetricType::Counter,
438 BucketValue::Distribution(_) => MetricType::Distribution,
439 BucketValue::Set(_) => MetricType::Set,
440 BucketValue::Gauge(_) => MetricType::Gauge,
441 }
442 }
443
444 pub fn tags(&self) -> &'a BTreeMap<String, String> {
448 &self.inner.tags
449 }
450
451 pub fn tag(&self, name: &str) -> Option<&'a str> {
455 self.inner.tag(name)
456 }
457
458 pub fn metadata(&self) -> BucketMetadata {
476 let merges = if self.range.start == 0 {
477 self.inner.metadata.merges
478 } else {
479 0
480 };
481
482 BucketMetadata {
483 merges,
484 ..self.inner.metadata
485 }
486 }
487
488 pub fn len(&self) -> usize {
492 self.range.len()
493 }
494
495 pub fn is_empty(&self) -> bool {
497 self.len() == 0
498 }
499
500 pub fn select(mut self, range: Range<usize>) -> Option<Self> {
506 if range.start < self.range.start || range.end > self.range.end {
507 return None;
508 }
509
510 if !self.can_split() && range != (0..self.inner.value.len()) {
512 return None;
513 }
514
515 self.range = range;
516 Some(self)
517 }
518
519 fn estimated_base_size(&self) -> usize {
524 BUCKET_SIZE + self.name().len() + crate::utils::tags_cost(self.tags())
525 }
526
527 pub fn estimated_size(&self) -> usize {
533 self.estimated_base_size() + self.len() * AVG_VALUE_SIZE
534 }
535
536 pub fn split(self, size: usize, max_size: Option<usize>) -> (Option<Self>, Option<Self>) {
549 match split(&self, size, max_size.unwrap_or(0) / BUCKET_SPLIT_FACTOR) {
550 SplitDecision::BucketFits(_) => (Some(self), None),
551 SplitDecision::MoveToNextBatch => (None, Some(self)),
552 SplitDecision::Split(n) => {
553 let Range { start, end } = self.range;
554 let at = start + n;
555 (self.clone().select(start..at), self.select(at..end))
556 }
557 }
558 }
559
560 fn can_split(&self) -> bool {
564 matches!(
565 self.inner.value,
566 BucketValue::Distribution(_) | BucketValue::Set(_)
567 )
568 }
569
570 fn is_full_bucket(&self) -> bool {
572 self.range.start == 0 && self.range.end == self.inner.value.len()
573 }
574}
575
576impl fmt::Debug for BucketView<'_> {
577 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
578 f.debug_struct("BucketView")
579 .field("timestamp", &self.inner.timestamp)
580 .field("width", &self.inner.width)
581 .field("name", &self.inner.name)
582 .field("value", &self.value())
583 .field("tags", &self.inner.tags)
584 .finish()
585 }
586}
587
588impl<'a> From<&'a Bucket> for BucketView<'a> {
589 fn from(value: &'a Bucket) -> Self {
590 BucketView::new(value)
591 }
592}
593
594impl Serialize for BucketView<'_> {
595 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
596 where
597 S: serde::Serializer,
598 {
599 let Bucket {
600 timestamp,
601 width,
602 name,
603 value: _,
604 tags,
605 metadata,
606 } = self.inner;
607
608 let len = match tags.is_empty() {
609 true => 4,
610 false => 5,
611 };
612
613 let mut state = serializer.serialize_map(Some(len))?;
614
615 state.serialize_entry("timestamp", timestamp)?;
616 state.serialize_entry("width", width)?;
617 state.serialize_entry("name", name)?;
618
619 if self.is_full_bucket() {
620 self.inner
621 .value
622 .serialize(serde::__private::ser::FlatMapSerializer(&mut state))?;
623 } else {
624 self.value()
625 .serialize(serde::__private::ser::FlatMapSerializer(&mut state))?;
626 }
627
628 if !tags.is_empty() {
629 state.serialize_entry("tags", tags)?;
630 }
631 if !metadata.is_default() {
632 state.serialize_entry("metadata", metadata)?;
633 }
634
635 state.end()
636 }
637}
638
639#[derive(Debug, Clone, PartialEq, Serialize)]
641#[serde(tag = "type", content = "value")]
642pub enum BucketViewValue<'a> {
643 #[serde(rename = "c")]
647 Counter(CounterType),
648 #[serde(rename = "d")]
652 Distribution(&'a [DistributionType]),
653 #[serde(rename = "s")]
657 Set(SetView<'a>),
658 #[serde(rename = "g")]
662 Gauge(GaugeValue),
663}
664
665impl<'a> From<&'a BucketValue> for BucketViewValue<'a> {
666 fn from(value: &'a BucketValue) -> Self {
667 match value {
668 BucketValue::Counter(c) => BucketViewValue::Counter(*c),
669 BucketValue::Distribution(d) => BucketViewValue::Distribution(d),
670 BucketValue::Set(s) => BucketViewValue::Set(SetView::new(s, 0..s.len())),
671 BucketValue::Gauge(g) => BucketViewValue::Gauge(*g),
672 }
673 }
674}
675
676#[derive(Clone)]
678pub struct SetView<'a> {
679 source: &'a SetValue,
680 range: Range<usize>,
681}
682
683impl<'a> SetView<'a> {
684 fn new(source: &'a SetValue, range: Range<usize>) -> Self {
685 Self { source, range }
686 }
687
688 pub fn len(&self) -> usize {
690 self.range.len()
691 }
692
693 pub fn is_empty(&self) -> bool {
695 self.len() == 0
696 }
697
698 pub fn iter(&self) -> impl Iterator<Item = &SetType> {
700 self.source
701 .iter()
702 .skip(self.range.start)
703 .take(self.range.len())
704 }
705}
706
707impl PartialEq for SetView<'_> {
708 fn eq(&self, other: &Self) -> bool {
709 self.iter().eq(other.iter())
710 }
711}
712
713impl fmt::Debug for SetView<'_> {
714 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
715 f.debug_tuple("SetView")
716 .field(&self.iter().collect::<Vec<_>>())
717 .finish()
718 }
719}
720
721impl Serialize for SetView<'_> {
722 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
723 where
724 S: serde::Serializer,
725 {
726 let mut state = serializer.serialize_seq(Some(self.len()))?;
727
728 for item in self.iter() {
729 state.serialize_element(item)?;
730 }
731
732 state.end()
733 }
734}
735
736enum SplitDecision {
738 BucketFits(usize),
742 MoveToNextBatch,
744 Split(usize),
746}
747
748fn split(bucket: &BucketView<'_>, max_size: usize, min_split_size: usize) -> SplitDecision {
760 let bucket_size = bucket.estimated_size();
762 if max_size >= bucket_size {
763 return SplitDecision::BucketFits(bucket_size);
764 }
765
766 if !bucket.can_split() {
767 return SplitDecision::MoveToNextBatch;
768 }
769
770 let own_size = bucket.estimated_base_size();
773 if max_size < (own_size + AVG_VALUE_SIZE) {
774 return SplitDecision::MoveToNextBatch;
776 }
777
778 if bucket_size < min_split_size {
779 return SplitDecision::MoveToNextBatch;
780 }
781
782 let split_at = (max_size - own_size) / AVG_VALUE_SIZE;
785 SplitDecision::Split(split_at)
786}
787
788#[cfg(test)]
789mod tests {
790 use std::sync::Arc;
791
792 use insta::assert_json_snapshot;
793
794 use super::*;
795
796 #[test]
797 fn test_bucket_view_select_counter() {
798 let bucket = Bucket::parse(b"b0:1|c", UnixTimestamp::from_secs(5000)).unwrap();
799
800 let view = BucketView::new(&bucket).select(0..1).unwrap();
801 assert_eq!(view.len(), 1);
802 assert_eq!(
803 serde_json::to_string(&view).unwrap(),
804 serde_json::to_string(&bucket).unwrap()
805 );
806 }
807
808 #[test]
809 fn test_bucket_view_select_invalid_counter() {
810 let bucket = Bucket::parse(b"b0:1|c", UnixTimestamp::from_secs(5000)).unwrap();
811
812 assert!(BucketView::new(&bucket).select(0..0).is_none());
813 assert!(BucketView::new(&bucket).select(0..2).is_none());
814 assert!(BucketView::new(&bucket).select(1..1).is_none());
815 }
816
817 #[test]
818 fn test_bucket_view_counter_metadata() {
819 let bucket = Bucket::parse(b"b0:1|c", UnixTimestamp::from_secs(5000)).unwrap();
820 assert_eq!(bucket.metadata, BucketView::new(&bucket).metadata());
821 }
822
823 #[test]
824 fn test_bucket_view_select_distribution() {
825 let bucket = Bucket::parse(b"b2:1:2:3:5:5|d", UnixTimestamp::from_secs(5000)).unwrap();
826
827 let view = BucketView::new(&bucket).select(0..3).unwrap();
828 assert_eq!(view.len(), 3);
829 assert_eq!(
830 view.value(),
831 BucketViewValue::Distribution(&[1.into(), 2.into(), 3.into()])
832 );
833 let view = BucketView::new(&bucket).select(1..3).unwrap();
834 assert_eq!(view.len(), 2);
835 assert_eq!(
836 view.value(),
837 BucketViewValue::Distribution(&[2.into(), 3.into()])
838 );
839 let view = BucketView::new(&bucket).select(1..5).unwrap();
840 assert_eq!(view.len(), 4);
841 assert_eq!(
842 view.value(),
843 BucketViewValue::Distribution(&[2.into(), 3.into(), 5.into(), 5.into()])
844 );
845 }
846
847 #[test]
848 fn test_bucket_view_select_invalid_distribution() {
849 let bucket = Bucket::parse(b"b2:1:2:3:5:5|d", UnixTimestamp::from_secs(5000)).unwrap();
850
851 assert!(BucketView::new(&bucket).select(0..6).is_none());
852 assert!(BucketView::new(&bucket).select(5..6).is_none());
853 assert!(BucketView::new(&bucket).select(77..99).is_none());
854 }
855
856 #[test]
857 fn test_bucket_view_distribution_metadata() {
858 let bucket = Bucket::parse(b"b2:1:2:3:5:5|d", UnixTimestamp::from_secs(5000)).unwrap();
859 assert_eq!(bucket.metadata, BucketView::new(&bucket).metadata());
860
861 assert_eq!(
862 BucketView::new(&bucket).select(0..3).unwrap().metadata(),
863 bucket.metadata
864 );
865
866 let m = BucketView::new(&bucket).select(1..3).unwrap().metadata();
867 assert_eq!(
868 m,
869 BucketMetadata {
870 merges: 0,
871 ..bucket.metadata
872 }
873 );
874 }
875
876 #[test]
877 fn test_bucket_view_select_set() {
878 let bucket = Bucket::parse(b"b3:42:75|s", UnixTimestamp::from_secs(5000)).unwrap();
879 let s = [42, 75].into();
880
881 let view = BucketView::new(&bucket).select(0..2).unwrap();
882 assert_eq!(view.len(), 2);
883 assert_eq!(view.value(), BucketViewValue::Set(SetView::new(&s, 0..2)));
884 let view = BucketView::new(&bucket).select(1..2).unwrap();
885 assert_eq!(view.len(), 1);
886 assert_eq!(view.value(), BucketViewValue::Set(SetView::new(&s, 1..2)));
887 let view = BucketView::new(&bucket).select(0..1).unwrap();
888 assert_eq!(view.len(), 1);
889 assert_eq!(view.value(), BucketViewValue::Set(SetView::new(&s, 0..1)));
890 }
891
892 #[test]
893 fn test_bucket_view_select_invalid_set() {
894 let bucket = Bucket::parse(b"b3:42:75|s", UnixTimestamp::from_secs(5000)).unwrap();
895
896 assert!(BucketView::new(&bucket).select(0..3).is_none());
897 assert!(BucketView::new(&bucket).select(2..5).is_none());
898 assert!(BucketView::new(&bucket).select(77..99).is_none());
899 }
900
901 #[test]
902 fn test_bucket_view_set_metadata() {
903 let bucket = Bucket::parse(b"b2:1:2:3:5:5|s", UnixTimestamp::from_secs(5000)).unwrap();
904 assert_eq!(bucket.metadata, BucketView::new(&bucket).metadata());
905
906 assert_eq!(
907 BucketView::new(&bucket).select(0..3).unwrap().metadata(),
908 bucket.metadata
909 );
910
911 let m = BucketView::new(&bucket).select(1..3).unwrap().metadata();
912 assert_eq!(
913 m,
914 BucketMetadata {
915 merges: 0,
916 ..bucket.metadata
917 }
918 );
919 }
920
921 #[test]
922 fn test_bucket_view_select_gauge() {
923 let bucket =
924 Bucket::parse(b"b4:25:17:42:220:85|g", UnixTimestamp::from_secs(5000)).unwrap();
925
926 let view = BucketView::new(&bucket).select(0..5).unwrap();
927 assert_eq!(view.len(), 5);
928 assert_eq!(
929 view.value(),
930 BucketViewValue::Gauge(GaugeValue {
931 last: 25.into(),
932 min: 17.into(),
933 max: 42.into(),
934 sum: 220.into(),
935 count: 85
936 })
937 );
938 }
939
940 #[test]
941 fn test_bucket_view_select_invalid_gauge() {
942 let bucket =
943 Bucket::parse(b"b4:25:17:42:220:85|g", UnixTimestamp::from_secs(5000)).unwrap();
944
945 assert!(BucketView::new(&bucket).select(0..1).is_none());
946 assert!(BucketView::new(&bucket).select(0..4).is_none());
947 assert!(BucketView::new(&bucket).select(5..5).is_none());
948 assert!(BucketView::new(&bucket).select(5..6).is_none());
949 }
950
951 #[test]
952 fn test_bucket_view_gauge_metadata() {
953 let bucket =
954 Bucket::parse(b"b4:25:17:42:220:85|g", UnixTimestamp::from_secs(5000)).unwrap();
955 assert_eq!(BucketView::new(&bucket).metadata(), bucket.metadata);
956 }
957
958 fn buckets<T>(s: &[u8]) -> T
959 where
960 T: FromIterator<Bucket>,
961 {
962 let timestamp = UnixTimestamp::from_secs(5000);
963 Bucket::parse_all(s, timestamp)
964 .collect::<Result<T, _>>()
965 .unwrap()
966 }
967
968 #[test]
969 fn test_buckets_view_empty() {
970 let view = BucketsView::new(Vec::new());
971 assert_eq!(view.len(), 0);
972 assert!(view.is_empty());
973 let partials = view.iter().collect::<Vec<_>>();
974 assert!(partials.is_empty());
975 }
976
977 #[test]
978 fn test_buckets_view_iter_full() {
979 let buckets: Vec<_> = buckets(b"b0:1|c\nb1:12|c\nb2:1:2:3:5:5|d\nb3:42:75|s");
980
981 let view = BucketsView::from(&buckets);
982 assert_eq!(view.len(), 4);
983 assert!(!view.is_empty());
984 let partials = view.iter().collect::<Vec<_>>();
985 assert_eq!(partials.len(), 4);
986 assert_eq!(partials[0].name(), "c:custom/b0@none");
987 assert_eq!(partials[0].len(), 1);
988 assert_eq!(partials[1].name(), "c:custom/b1@none");
989 assert_eq!(partials[1].len(), 1);
990 assert_eq!(partials[2].name(), "d:custom/b2@none");
991 assert_eq!(partials[2].len(), 5);
992 assert_eq!(partials[3].name(), "s:custom/b3@none");
993 assert_eq!(partials[3].len(), 2);
994 }
995
996 #[test]
997 fn test_buckets_view_iter_partial_end() {
998 let buckets: Vec<_> = buckets(b"b0:1|c\nb1:12|c\nb2:1:2:3:5:5|d\nb3:42:75|s");
999
1000 let mut view = BucketsView::new(&buckets);
1001 view.end.slice = 2;
1002 view.end.bucket = 3;
1003 assert_eq!(view.len(), 3);
1004 assert!(!view.is_empty());
1005
1006 let partials = view.iter().collect::<Vec<_>>();
1007 assert_eq!(partials.len(), 3);
1008 assert_eq!(partials[0].name(), "c:custom/b0@none");
1009 assert_eq!(partials[0].len(), 1);
1010 assert_eq!(partials[1].name(), "c:custom/b1@none");
1011 assert_eq!(partials[1].len(), 1);
1012 assert_eq!(partials[2].name(), "d:custom/b2@none");
1013 assert_eq!(partials[2].len(), 3);
1014 }
1015
1016 #[test]
1017 fn test_buckets_view_iter_partial_start() {
1018 let buckets: Vec<_> = buckets(b"b0:1|c\nb1:12|c\nb2:1:2:3:5:5|d\nb3:42:75|s");
1019
1020 let mut view = BucketsView::new(buckets);
1021 view.start.slice = 2;
1022 view.start.bucket = 3;
1023 assert_eq!(view.len(), 2);
1024 assert!(!view.is_empty());
1025
1026 let partials = view.iter().collect::<Vec<_>>();
1027 assert_eq!(partials.len(), 2);
1028 assert_eq!(partials[0].name(), "d:custom/b2@none");
1029 assert_eq!(partials[0].len(), 2);
1030 assert_eq!(partials[1].name(), "s:custom/b3@none");
1031 assert_eq!(partials[1].len(), 2);
1032 }
1033
1034 #[test]
1035 fn test_buckets_view_iter_partial_start_and_end() {
1036 let buckets: Vec<_> = buckets(b"b0:1|c\nb1:12|c\nb2:1:2:3:5:5|d\nb3:42:75|s");
1037
1038 let mut view = BucketsView::from(&buckets);
1039 view.start.slice = 2;
1040 view.start.bucket = 1;
1041 view.end.slice = 3;
1042 view.end.bucket = 1;
1043 assert_eq!(view.len(), 2);
1044 assert!(!view.is_empty());
1045
1046 let partials = view.iter().collect::<Vec<_>>();
1047 assert_eq!(partials.len(), 2);
1048 assert_eq!(partials[0].name(), "d:custom/b2@none");
1049 assert_eq!(partials[0].len(), 4);
1050 assert_eq!(partials[1].name(), "s:custom/b3@none");
1051 assert_eq!(partials[1].len(), 1);
1052 }
1053
1054 #[test]
1055 fn test_buckets_view_by_size_small() {
1056 let buckets: Vec<_> = buckets(b"b0:1|c\nb1:12|c\nb2:1:2:3:5:5|d\nb3:42:75|s");
1057
1058 let view = BucketsView::from(&buckets);
1059 let partials = view
1060 .by_size(100)
1061 .map(|bv| {
1062 let len: usize = bv.iter().map(|b| b.len()).sum();
1063 let size: usize = bv.iter().map(|b| b.estimated_size()).sum();
1064 (len, size)
1065 })
1066 .collect::<Vec<_>>();
1067
1068 assert_eq!(partials, vec![(1, 74), (1, 74), (4, 98), (1, 74), (2, 82),]);
1069 }
1070
1071 #[test]
1072 fn test_buckets_view_by_size_small_as_arc() {
1073 let buckets: Arc<_> = buckets(b"b0:1|c\nb1:12|c\nb2:1:2:3:5:5|d\nb3:42:75|s");
1074
1075 let view = BucketsView::new(buckets);
1076 let partials = view
1077 .by_size(100)
1078 .map(|bv| {
1079 let len: usize = bv.iter().map(|b| b.len()).sum();
1080 let size: usize = bv.iter().map(|b| b.estimated_size()).sum();
1081 (len, size)
1082 })
1083 .collect::<Vec<_>>();
1084
1085 assert_eq!(partials, vec![(1, 74), (1, 74), (4, 98), (1, 74), (2, 82),]);
1086 }
1087
1088 #[test]
1089 fn test_buckets_view_by_size_one_split() {
1090 let buckets: Vec<_> = buckets(b"b0:1|c\nb1:12|c\nb2:1:2:3:5:5|d\nb3:42:75|s");
1091
1092 let view = BucketsView::from(&buckets);
1093 let partials = view
1094 .by_size(250)
1095 .map(|bv| {
1096 let len: usize = bv.iter().map(|b| b.len()).sum();
1097 let size: usize = bv.iter().map(|b| b.estimated_size()).sum();
1098 (len, size)
1099 })
1100 .collect::<Vec<_>>();
1101
1102 assert_eq!(partials, vec![(6, 246), (3, 156)]);
1103 }
1104
1105 #[test]
1106 fn test_buckets_view_by_size_no_split() {
1107 let buckets: Vec<_> = buckets(b"b0:1|c\nb1:12|c\nb2:1:2:3:5:5|d\nb3:42:75|s");
1108
1109 let view = BucketsView::from(&buckets);
1110 let partials = view
1111 .by_size(500)
1112 .map(|bv| {
1113 let len: usize = bv.iter().map(|b| b.len()).sum();
1114 let size: usize = bv.iter().map(|b| b.estimated_size()).sum();
1115 (len, size)
1116 })
1117 .collect::<Vec<_>>();
1118
1119 assert_eq!(partials, vec![(9, 336)]);
1120 }
1121
1122 #[test]
1123 fn test_buckets_view_by_size_no_too_small_no_bucket_fits() {
1124 let buckets: Vec<_> = buckets(b"b0:1|c\nb1:12|c\nb2:1:2:3:5:5|d\nb3:42:75|s");
1125
1126 let view = BucketsView::from(&buckets);
1127 let partials = view
1128 .by_size(50) .count();
1130
1131 assert_eq!(partials, 0);
1132 }
1133
1134 #[test]
1135 fn test_buckets_view_by_size_do_not_split_gauge() {
1136 let buckets: Vec<_> = buckets(b"transactions/foo:25:17:42:220:85|g");
1137
1138 let view = BucketsView::from(&buckets);
1139 let partials = view.by_size(100).count();
1142
1143 assert_eq!(partials, 0);
1144 }
1145
1146 #[test]
1147 fn test_buckets_view_serialize_full() {
1148 let buckets: Vec<_> = buckets(b"b0:1|c\nb1:12|c|#foo,bar:baz\nb2:1:2:3:5:5|d|#foo,bar:baz b3:42:75|s\ntransactions/foo:25:17:42:220:85|g");
1149
1150 assert_eq!(
1151 serde_json::to_string(&BucketsView::from(&buckets)).unwrap(),
1152 serde_json::to_string(&buckets).unwrap()
1153 );
1154 }
1155
1156 #[test]
1157 fn test_buckets_view_serialize_partial() {
1158 let buckets: Arc<[_]> = buckets(
1159 b"b1:12|c|#foo,bar:baz\nb2:1:2:3:5:5|d|#foo,bar:baz\nb3:42:75|s\nb4:25:17:42:220:85|g",
1160 );
1161
1162 let view = BucketsView::new(buckets);
1163 let partials = view.by_size(178).collect::<Vec<_>>();
1166
1167 assert_json_snapshot!(partials);
1168 }
1169
1170 #[test]
1171 fn test_split_repeatedly() {
1172 let bucket = Bucket::parse(b"b2:1:2:3:5:5|d", UnixTimestamp::from_secs(5000)).unwrap();
1173 let view = BucketView::new(&bucket);
1174
1175 let split_size = view.estimated_base_size() + 2 * AVG_VALUE_SIZE;
1177
1178 let (first, rest) = view.split(split_size, None);
1179 let (second, rest) = rest.unwrap().split(split_size, None);
1180 let (third, rest) = rest.unwrap().split(split_size, None);
1181
1182 assert_eq!(first.unwrap().range, 0..2);
1183 assert_eq!(second.unwrap().range, 2..4);
1184 assert_eq!(third.unwrap().range, 4..5);
1185 assert!(rest.is_none());
1186 }
1187}