use relay_common::time::UnixTimestamp;
use serde::ser::{SerializeMap, SerializeSeq};
use serde::Serialize;
use crate::{
BucketMetadata, CounterType, DistributionType, GaugeValue, MetricName, SetType, SetValue,
};
use relay_base_schema::metrics::MetricType;
use std::collections::BTreeMap;
use std::fmt;
use std::ops::Range;
use crate::bucket::Bucket;
use crate::BucketValue;
const BUCKET_SPLIT_FACTOR: usize = 32;
const BUCKET_SIZE: usize = 50;
const AVG_VALUE_SIZE: usize = 8;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct Index {
slice: usize,
bucket: usize,
}
#[derive(Clone, Copy)]
pub struct BucketsView<T> {
inner: T,
start: Index,
end: Index,
}
impl<T> BucketsView<T>
where
T: AsRef<[Bucket]>,
{
pub fn new(buckets: T) -> Self {
let len = buckets.as_ref().len();
Self {
inner: buckets,
start: Index {
slice: 0,
bucket: 0,
},
end: Index {
slice: len,
bucket: 0,
},
}
}
pub fn len(&self) -> usize {
let mut len = self.end.slice - self.start.slice;
if self.end.bucket != 0 {
len += 1;
}
len
}
pub fn is_empty(&self) -> bool {
self.len() == 0
}
pub fn as_slice(&self) -> BucketsView<&[Bucket]> {
BucketsView {
inner: self.inner.as_ref(),
start: self.start,
end: self.end,
}
}
pub fn iter(&self) -> BucketsViewIter<'_> {
BucketsViewIter::new(self.inner.as_ref(), self.start, self.end)
}
pub fn by_size(self, size_in_bytes: usize) -> BucketsViewBySizeIter<T> {
BucketsViewBySizeIter::new(self.inner, self.start, self.end, size_in_bytes)
}
}
impl<'a> From<&'a [Bucket]> for BucketsView<&'a [Bucket]> {
fn from(value: &'a [Bucket]) -> Self {
Self::new(value)
}
}
impl<'a> From<&'a Vec<Bucket>> for BucketsView<&'a [Bucket]> {
fn from(value: &'a Vec<Bucket>) -> Self {
Self::new(value.as_slice())
}
}
impl<T> fmt::Debug for BucketsView<T>
where
T: AsRef<[Bucket]>,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let contents = self.iter().collect::<Vec<_>>();
f.debug_tuple("BucketsView").field(&contents).finish()
}
}
impl<'a> IntoIterator for BucketsView<&'a [Bucket]> {
type Item = BucketView<'a>;
type IntoIter = BucketsViewIter<'a>;
fn into_iter(self) -> Self::IntoIter {
BucketsViewIter::new(self.inner, self.start, self.end)
}
}
impl<'a, T> IntoIterator for &'a BucketsView<T>
where
T: AsRef<[Bucket]>,
{
type Item = BucketView<'a>;
type IntoIter = BucketsViewIter<'a>;
fn into_iter(self) -> Self::IntoIter {
BucketsViewIter::new(self.inner.as_ref(), self.start, self.end)
}
}
pub struct BucketsViewIter<'a> {
inner: &'a [Bucket],
current: Index,
end: Index,
}
impl<'a> BucketsViewIter<'a> {
fn new(inner: &'a [Bucket], start: Index, end: Index) -> Self {
Self {
inner,
end,
current: start,
}
}
}
impl<'a> Iterator for BucketsViewIter<'a> {
type Item = BucketView<'a>;
fn next(&mut self) -> Option<Self::Item> {
if self.current.slice == self.end.slice && self.end.bucket == 0 {
return None;
}
if self.current.slice > self.end.slice {
return None;
}
debug_assert!(
self.current.slice < self.inner.len(),
"invariant violated, iterator pointing past the slice"
);
let next = self.inner.get(self.current.slice)?;
let end = match self.current.slice == self.end.slice {
false => next.value.len(),
true => self.end.bucket,
};
let next = BucketView::new(next).select(self.current.bucket..end);
let Some(next) = next else {
debug_assert!(false, "invariant violated, invalid bucket split");
relay_log::error!("Internal invariant violated, invalid bucket split, dropping all remaining buckets.");
return None;
};
self.current = Index {
slice: self.current.slice + 1,
bucket: 0,
};
Some(next)
}
}
pub struct BucketsViewBySizeIter<T> {
inner: T,
current: Index,
end: Index,
max_size_bytes: usize,
}
impl<T> BucketsViewBySizeIter<T> {
fn new(inner: T, start: Index, end: Index, max_size_bytes: usize) -> Self {
Self {
inner,
end,
current: start,
max_size_bytes,
}
}
}
impl<T> Iterator for BucketsViewBySizeIter<T>
where
T: AsRef<[Bucket]>,
T: Clone,
{
type Item = BucketsView<T>;
fn next(&mut self) -> Option<Self::Item> {
let start = self.current;
let mut remaining_bytes = self.max_size_bytes;
loop {
if (self.current.slice > self.end.slice)
|| (self.current.slice == self.end.slice && self.end.bucket == 0)
{
break;
}
let inner = self.inner.as_ref();
debug_assert!(
self.current.slice < inner.len(),
"invariant violated, iterator pointing past the slice"
);
let bucket = inner.get(self.current.slice)?;
let bucket = BucketView::new(bucket).select(self.current.bucket..bucket.value.len());
let Some(bucket) = bucket else {
debug_assert!(false, "internal invariant violated, invalid bucket split");
relay_log::error!("Internal invariant violated, invalid bucket split, dropping all remaining buckets.");
return None;
};
match split(
&bucket,
remaining_bytes,
self.max_size_bytes / BUCKET_SPLIT_FACTOR,
) {
SplitDecision::BucketFits(size) => {
remaining_bytes -= size;
self.current = Index {
slice: self.current.slice + 1,
bucket: 0,
};
continue;
}
SplitDecision::MoveToNextBatch => break,
SplitDecision::Split(n) => {
self.current = Index {
slice: self.current.slice,
bucket: self.current.bucket + n,
};
break;
}
}
}
if start == self.current {
return None;
}
Some(BucketsView {
inner: self.inner.clone(),
start,
end: self.current,
})
}
}
impl<T> Serialize for BucketsView<T>
where
T: AsRef<[Bucket]>,
{
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
let mut state = serializer.serialize_seq(Some(self.len()))?;
for bucket in self.iter() {
state.serialize_element(&bucket)?;
}
state.end()
}
}
#[derive(Clone)]
pub struct BucketView<'a> {
inner: &'a Bucket,
range: Range<usize>,
}
impl<'a> BucketView<'a> {
pub fn new(bucket: &'a Bucket) -> Self {
Self {
inner: bucket,
range: 0..bucket.value.len(),
}
}
pub fn timestamp(&self) -> UnixTimestamp {
self.inner.timestamp
}
pub fn width(&self) -> u64 {
self.inner.width
}
pub fn name(&self) -> &'a MetricName {
&self.inner.name
}
pub fn value(&self) -> BucketViewValue<'a> {
match &self.inner.value {
BucketValue::Counter(c) => BucketViewValue::Counter(*c),
BucketValue::Distribution(d) => BucketViewValue::Distribution(&d[self.range.clone()]),
BucketValue::Set(s) => BucketViewValue::Set(SetView::new(s, self.range.clone())),
BucketValue::Gauge(g) => BucketViewValue::Gauge(*g),
}
}
pub fn ty(&self) -> MetricType {
match &self.inner.value {
BucketValue::Counter(_) => MetricType::Counter,
BucketValue::Distribution(_) => MetricType::Distribution,
BucketValue::Set(_) => MetricType::Set,
BucketValue::Gauge(_) => MetricType::Gauge,
}
}
pub fn tags(&self) -> &'a BTreeMap<String, String> {
&self.inner.tags
}
pub fn tag(&self, name: &str) -> Option<&'a str> {
self.inner.tag(name)
}
pub fn metadata(&self) -> BucketMetadata {
let merges = if self.range.start == 0 {
self.inner.metadata.merges
} else {
0
};
BucketMetadata {
merges,
..self.inner.metadata
}
}
pub fn len(&self) -> usize {
self.range.len()
}
pub fn is_empty(&self) -> bool {
self.len() == 0
}
pub fn select(mut self, range: Range<usize>) -> Option<Self> {
if range.start < self.range.start || range.end > self.range.end {
return None;
}
if !self.can_split() && range != (0..self.inner.value.len()) {
return None;
}
self.range = range;
Some(self)
}
fn estimated_base_size(&self) -> usize {
BUCKET_SIZE + self.name().len() + crate::utils::tags_cost(self.tags())
}
pub fn estimated_size(&self) -> usize {
self.estimated_base_size() + self.len() * AVG_VALUE_SIZE
}
pub fn split(self, size: usize, max_size: Option<usize>) -> (Option<Self>, Option<Self>) {
match split(&self, size, max_size.unwrap_or(0) / BUCKET_SPLIT_FACTOR) {
SplitDecision::BucketFits(_) => (Some(self), None),
SplitDecision::MoveToNextBatch => (None, Some(self)),
SplitDecision::Split(n) => {
let Range { start, end } = self.range;
let at = start + n;
(self.clone().select(start..at), self.select(at..end))
}
}
}
fn can_split(&self) -> bool {
matches!(
self.inner.value,
BucketValue::Distribution(_) | BucketValue::Set(_)
)
}
fn is_full_bucket(&self) -> bool {
self.range.start == 0 && self.range.end == self.inner.value.len()
}
}
impl fmt::Debug for BucketView<'_> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("BucketView")
.field("timestamp", &self.inner.timestamp)
.field("width", &self.inner.width)
.field("name", &self.inner.name)
.field("value", &self.value())
.field("tags", &self.inner.tags)
.finish()
}
}
impl<'a> From<&'a Bucket> for BucketView<'a> {
fn from(value: &'a Bucket) -> Self {
BucketView::new(value)
}
}
impl Serialize for BucketView<'_> {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
let Bucket {
timestamp,
width,
name,
value: _,
tags,
metadata,
} = self.inner;
let len = match tags.is_empty() {
true => 4,
false => 5,
};
let mut state = serializer.serialize_map(Some(len))?;
state.serialize_entry("timestamp", timestamp)?;
state.serialize_entry("width", width)?;
state.serialize_entry("name", name)?;
if self.is_full_bucket() {
self.inner
.value
.serialize(serde::__private::ser::FlatMapSerializer(&mut state))?;
} else {
self.value()
.serialize(serde::__private::ser::FlatMapSerializer(&mut state))?;
}
if !tags.is_empty() {
state.serialize_entry("tags", tags)?;
}
if !metadata.is_default() {
state.serialize_entry("metadata", metadata)?;
}
state.end()
}
}
#[derive(Debug, Clone, PartialEq, Serialize)]
#[serde(tag = "type", content = "value")]
pub enum BucketViewValue<'a> {
#[serde(rename = "c")]
Counter(CounterType),
#[serde(rename = "d")]
Distribution(&'a [DistributionType]),
#[serde(rename = "s")]
Set(SetView<'a>),
#[serde(rename = "g")]
Gauge(GaugeValue),
}
impl<'a> From<&'a BucketValue> for BucketViewValue<'a> {
fn from(value: &'a BucketValue) -> Self {
match value {
BucketValue::Counter(c) => BucketViewValue::Counter(*c),
BucketValue::Distribution(d) => BucketViewValue::Distribution(d),
BucketValue::Set(s) => BucketViewValue::Set(SetView::new(s, 0..s.len())),
BucketValue::Gauge(g) => BucketViewValue::Gauge(*g),
}
}
}
#[derive(Clone)]
pub struct SetView<'a> {
source: &'a SetValue,
range: Range<usize>,
}
impl<'a> SetView<'a> {
fn new(source: &'a SetValue, range: Range<usize>) -> Self {
Self { source, range }
}
pub fn len(&self) -> usize {
self.range.len()
}
pub fn is_empty(&self) -> bool {
self.len() == 0
}
pub fn iter(&self) -> impl Iterator<Item = &SetType> {
self.source
.iter()
.skip(self.range.start)
.take(self.range.len())
}
}
impl PartialEq for SetView<'_> {
fn eq(&self, other: &Self) -> bool {
self.iter().eq(other.iter())
}
}
impl fmt::Debug for SetView<'_> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_tuple("SetView")
.field(&self.iter().collect::<Vec<_>>())
.finish()
}
}
impl Serialize for SetView<'_> {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
let mut state = serializer.serialize_seq(Some(self.len()))?;
for item in self.iter() {
state.serialize_element(item)?;
}
state.end()
}
}
enum SplitDecision {
BucketFits(usize),
MoveToNextBatch,
Split(usize),
}
fn split(bucket: &BucketView<'_>, max_size: usize, min_split_size: usize) -> SplitDecision {
let bucket_size = bucket.estimated_size();
if max_size >= bucket_size {
return SplitDecision::BucketFits(bucket_size);
}
if !bucket.can_split() {
return SplitDecision::MoveToNextBatch;
}
let own_size = bucket.estimated_base_size();
if max_size < (own_size + AVG_VALUE_SIZE) {
return SplitDecision::MoveToNextBatch;
}
if bucket_size < min_split_size {
return SplitDecision::MoveToNextBatch;
}
let split_at = (max_size - own_size) / AVG_VALUE_SIZE;
SplitDecision::Split(split_at)
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use insta::assert_json_snapshot;
use super::*;
#[test]
fn test_bucket_view_select_counter() {
let bucket = Bucket::parse(b"b0:1|c", UnixTimestamp::from_secs(5000)).unwrap();
let view = BucketView::new(&bucket).select(0..1).unwrap();
assert_eq!(view.len(), 1);
assert_eq!(
serde_json::to_string(&view).unwrap(),
serde_json::to_string(&bucket).unwrap()
);
}
#[test]
fn test_bucket_view_select_invalid_counter() {
let bucket = Bucket::parse(b"b0:1|c", UnixTimestamp::from_secs(5000)).unwrap();
assert!(BucketView::new(&bucket).select(0..0).is_none());
assert!(BucketView::new(&bucket).select(0..2).is_none());
assert!(BucketView::new(&bucket).select(1..1).is_none());
}
#[test]
fn test_bucket_view_counter_metadata() {
let bucket = Bucket::parse(b"b0:1|c", UnixTimestamp::from_secs(5000)).unwrap();
assert_eq!(bucket.metadata, BucketView::new(&bucket).metadata());
}
#[test]
fn test_bucket_view_select_distribution() {
let bucket = Bucket::parse(b"b2:1:2:3:5:5|d", UnixTimestamp::from_secs(5000)).unwrap();
let view = BucketView::new(&bucket).select(0..3).unwrap();
assert_eq!(view.len(), 3);
assert_eq!(
view.value(),
BucketViewValue::Distribution(&[1.into(), 2.into(), 3.into()])
);
let view = BucketView::new(&bucket).select(1..3).unwrap();
assert_eq!(view.len(), 2);
assert_eq!(
view.value(),
BucketViewValue::Distribution(&[2.into(), 3.into()])
);
let view = BucketView::new(&bucket).select(1..5).unwrap();
assert_eq!(view.len(), 4);
assert_eq!(
view.value(),
BucketViewValue::Distribution(&[2.into(), 3.into(), 5.into(), 5.into()])
);
}
#[test]
fn test_bucket_view_select_invalid_distribution() {
let bucket = Bucket::parse(b"b2:1:2:3:5:5|d", UnixTimestamp::from_secs(5000)).unwrap();
assert!(BucketView::new(&bucket).select(0..6).is_none());
assert!(BucketView::new(&bucket).select(5..6).is_none());
assert!(BucketView::new(&bucket).select(77..99).is_none());
}
#[test]
fn test_bucket_view_distribution_metadata() {
let bucket = Bucket::parse(b"b2:1:2:3:5:5|d", UnixTimestamp::from_secs(5000)).unwrap();
assert_eq!(bucket.metadata, BucketView::new(&bucket).metadata());
assert_eq!(
BucketView::new(&bucket).select(0..3).unwrap().metadata(),
bucket.metadata
);
let m = BucketView::new(&bucket).select(1..3).unwrap().metadata();
assert_eq!(
m,
BucketMetadata {
merges: 0,
..bucket.metadata
}
);
}
#[test]
fn test_bucket_view_select_set() {
let bucket = Bucket::parse(b"b3:42:75|s", UnixTimestamp::from_secs(5000)).unwrap();
let s = [42, 75].into();
let view = BucketView::new(&bucket).select(0..2).unwrap();
assert_eq!(view.len(), 2);
assert_eq!(view.value(), BucketViewValue::Set(SetView::new(&s, 0..2)));
let view = BucketView::new(&bucket).select(1..2).unwrap();
assert_eq!(view.len(), 1);
assert_eq!(view.value(), BucketViewValue::Set(SetView::new(&s, 1..2)));
let view = BucketView::new(&bucket).select(0..1).unwrap();
assert_eq!(view.len(), 1);
assert_eq!(view.value(), BucketViewValue::Set(SetView::new(&s, 0..1)));
}
#[test]
fn test_bucket_view_select_invalid_set() {
let bucket = Bucket::parse(b"b3:42:75|s", UnixTimestamp::from_secs(5000)).unwrap();
assert!(BucketView::new(&bucket).select(0..3).is_none());
assert!(BucketView::new(&bucket).select(2..5).is_none());
assert!(BucketView::new(&bucket).select(77..99).is_none());
}
#[test]
fn test_bucket_view_set_metadata() {
let bucket = Bucket::parse(b"b2:1:2:3:5:5|s", UnixTimestamp::from_secs(5000)).unwrap();
assert_eq!(bucket.metadata, BucketView::new(&bucket).metadata());
assert_eq!(
BucketView::new(&bucket).select(0..3).unwrap().metadata(),
bucket.metadata
);
let m = BucketView::new(&bucket).select(1..3).unwrap().metadata();
assert_eq!(
m,
BucketMetadata {
merges: 0,
..bucket.metadata
}
);
}
#[test]
fn test_bucket_view_select_gauge() {
let bucket =
Bucket::parse(b"b4:25:17:42:220:85|g", UnixTimestamp::from_secs(5000)).unwrap();
let view = BucketView::new(&bucket).select(0..5).unwrap();
assert_eq!(view.len(), 5);
assert_eq!(
view.value(),
BucketViewValue::Gauge(GaugeValue {
last: 25.into(),
min: 17.into(),
max: 42.into(),
sum: 220.into(),
count: 85
})
);
}
#[test]
fn test_bucket_view_select_invalid_gauge() {
let bucket =
Bucket::parse(b"b4:25:17:42:220:85|g", UnixTimestamp::from_secs(5000)).unwrap();
assert!(BucketView::new(&bucket).select(0..1).is_none());
assert!(BucketView::new(&bucket).select(0..4).is_none());
assert!(BucketView::new(&bucket).select(5..5).is_none());
assert!(BucketView::new(&bucket).select(5..6).is_none());
}
#[test]
fn test_bucket_view_gauge_metadata() {
let bucket =
Bucket::parse(b"b4:25:17:42:220:85|g", UnixTimestamp::from_secs(5000)).unwrap();
assert_eq!(BucketView::new(&bucket).metadata(), bucket.metadata);
}
fn buckets<T>(s: &[u8]) -> T
where
T: FromIterator<Bucket>,
{
let timestamp = UnixTimestamp::from_secs(5000);
Bucket::parse_all(s, timestamp)
.collect::<Result<T, _>>()
.unwrap()
}
#[test]
fn test_buckets_view_empty() {
let view = BucketsView::new(Vec::new());
assert_eq!(view.len(), 0);
assert!(view.is_empty());
let partials = view.iter().collect::<Vec<_>>();
assert!(partials.is_empty());
}
#[test]
fn test_buckets_view_iter_full() {
let buckets: Vec<_> = buckets(b"b0:1|c\nb1:12|c\nb2:1:2:3:5:5|d\nb3:42:75|s");
let view = BucketsView::from(&buckets);
assert_eq!(view.len(), 4);
assert!(!view.is_empty());
let partials = view.iter().collect::<Vec<_>>();
assert_eq!(partials.len(), 4);
assert_eq!(partials[0].name(), "c:custom/b0@none");
assert_eq!(partials[0].len(), 1);
assert_eq!(partials[1].name(), "c:custom/b1@none");
assert_eq!(partials[1].len(), 1);
assert_eq!(partials[2].name(), "d:custom/b2@none");
assert_eq!(partials[2].len(), 5);
assert_eq!(partials[3].name(), "s:custom/b3@none");
assert_eq!(partials[3].len(), 2);
}
#[test]
fn test_buckets_view_iter_partial_end() {
let buckets: Vec<_> = buckets(b"b0:1|c\nb1:12|c\nb2:1:2:3:5:5|d\nb3:42:75|s");
let mut view = BucketsView::new(&buckets);
view.end.slice = 2;
view.end.bucket = 3;
assert_eq!(view.len(), 3);
assert!(!view.is_empty());
let partials = view.iter().collect::<Vec<_>>();
assert_eq!(partials.len(), 3);
assert_eq!(partials[0].name(), "c:custom/b0@none");
assert_eq!(partials[0].len(), 1);
assert_eq!(partials[1].name(), "c:custom/b1@none");
assert_eq!(partials[1].len(), 1);
assert_eq!(partials[2].name(), "d:custom/b2@none");
assert_eq!(partials[2].len(), 3);
}
#[test]
fn test_buckets_view_iter_partial_start() {
let buckets: Vec<_> = buckets(b"b0:1|c\nb1:12|c\nb2:1:2:3:5:5|d\nb3:42:75|s");
let mut view = BucketsView::new(buckets);
view.start.slice = 2;
view.start.bucket = 3;
assert_eq!(view.len(), 2);
assert!(!view.is_empty());
let partials = view.iter().collect::<Vec<_>>();
assert_eq!(partials.len(), 2);
assert_eq!(partials[0].name(), "d:custom/b2@none");
assert_eq!(partials[0].len(), 2);
assert_eq!(partials[1].name(), "s:custom/b3@none");
assert_eq!(partials[1].len(), 2);
}
#[test]
fn test_buckets_view_iter_partial_start_and_end() {
let buckets: Vec<_> = buckets(b"b0:1|c\nb1:12|c\nb2:1:2:3:5:5|d\nb3:42:75|s");
let mut view = BucketsView::from(&buckets);
view.start.slice = 2;
view.start.bucket = 1;
view.end.slice = 3;
view.end.bucket = 1;
assert_eq!(view.len(), 2);
assert!(!view.is_empty());
let partials = view.iter().collect::<Vec<_>>();
assert_eq!(partials.len(), 2);
assert_eq!(partials[0].name(), "d:custom/b2@none");
assert_eq!(partials[0].len(), 4);
assert_eq!(partials[1].name(), "s:custom/b3@none");
assert_eq!(partials[1].len(), 1);
}
#[test]
fn test_buckets_view_by_size_small() {
let buckets: Vec<_> = buckets(b"b0:1|c\nb1:12|c\nb2:1:2:3:5:5|d\nb3:42:75|s");
let view = BucketsView::from(&buckets);
let partials = view
.by_size(100)
.map(|bv| {
let len: usize = bv.iter().map(|b| b.len()).sum();
let size: usize = bv.iter().map(|b| b.estimated_size()).sum();
(len, size)
})
.collect::<Vec<_>>();
assert_eq!(partials, vec![(1, 74), (1, 74), (4, 98), (1, 74), (2, 82),]);
}
#[test]
fn test_buckets_view_by_size_small_as_arc() {
let buckets: Arc<_> = buckets(b"b0:1|c\nb1:12|c\nb2:1:2:3:5:5|d\nb3:42:75|s");
let view = BucketsView::new(buckets);
let partials = view
.by_size(100)
.map(|bv| {
let len: usize = bv.iter().map(|b| b.len()).sum();
let size: usize = bv.iter().map(|b| b.estimated_size()).sum();
(len, size)
})
.collect::<Vec<_>>();
assert_eq!(partials, vec![(1, 74), (1, 74), (4, 98), (1, 74), (2, 82),]);
}
#[test]
fn test_buckets_view_by_size_one_split() {
let buckets: Vec<_> = buckets(b"b0:1|c\nb1:12|c\nb2:1:2:3:5:5|d\nb3:42:75|s");
let view = BucketsView::from(&buckets);
let partials = view
.by_size(250)
.map(|bv| {
let len: usize = bv.iter().map(|b| b.len()).sum();
let size: usize = bv.iter().map(|b| b.estimated_size()).sum();
(len, size)
})
.collect::<Vec<_>>();
assert_eq!(partials, vec![(6, 246), (3, 156)]);
}
#[test]
fn test_buckets_view_by_size_no_split() {
let buckets: Vec<_> = buckets(b"b0:1|c\nb1:12|c\nb2:1:2:3:5:5|d\nb3:42:75|s");
let view = BucketsView::from(&buckets);
let partials = view
.by_size(500)
.map(|bv| {
let len: usize = bv.iter().map(|b| b.len()).sum();
let size: usize = bv.iter().map(|b| b.estimated_size()).sum();
(len, size)
})
.collect::<Vec<_>>();
assert_eq!(partials, vec![(9, 336)]);
}
#[test]
fn test_buckets_view_by_size_no_too_small_no_bucket_fits() {
let buckets: Vec<_> = buckets(b"b0:1|c\nb1:12|c\nb2:1:2:3:5:5|d\nb3:42:75|s");
let view = BucketsView::from(&buckets);
let partials = view
.by_size(50) .count();
assert_eq!(partials, 0);
}
#[test]
fn test_buckets_view_by_size_do_not_split_gauge() {
let buckets: Vec<_> = buckets(b"transactions/foo:25:17:42:220:85|g");
let view = BucketsView::from(&buckets);
let partials = view.by_size(100).count();
assert_eq!(partials, 0);
}
#[test]
fn test_buckets_view_serialize_full() {
let buckets: Vec<_> = buckets(b"b0:1|c\nb1:12|c|#foo,bar:baz\nb2:1:2:3:5:5|d|#foo,bar:baz b3:42:75|s\ntransactions/foo:25:17:42:220:85|g");
assert_eq!(
serde_json::to_string(&BucketsView::from(&buckets)).unwrap(),
serde_json::to_string(&buckets).unwrap()
);
}
#[test]
fn test_buckets_view_serialize_partial() {
let buckets: Arc<[_]> = buckets(
b"b1:12|c|#foo,bar:baz\nb2:1:2:3:5:5|d|#foo,bar:baz\nb3:42:75|s\nb4:25:17:42:220:85|g",
);
let view = BucketsView::new(buckets);
let partials = view.by_size(178).collect::<Vec<_>>();
assert_json_snapshot!(partials);
}
#[test]
fn test_split_repeatedly() {
let bucket = Bucket::parse(b"b2:1:2:3:5:5|d", UnixTimestamp::from_secs(5000)).unwrap();
let view = BucketView::new(&bucket);
let split_size = view.estimated_base_size() + 2 * AVG_VALUE_SIZE;
let (first, rest) = view.split(split_size, None);
let (second, rest) = rest.unwrap().split(split_size, None);
let (third, rest) = rest.unwrap().split(split_size, None);
assert_eq!(first.unwrap().range, 0..2);
assert_eq!(second.unwrap().range, 2..4);
assert_eq!(third.unwrap().range, 4..5);
assert!(rest.is_none());
}
}