relay_metrics/
view.rs

1use relay_common::time::UnixTimestamp;
2use serde::ser::{SerializeMap, SerializeSeq};
3use serde::Serialize;
4
5use crate::{
6    BucketMetadata, CounterType, DistributionType, GaugeValue, MetricName, SetType, SetValue,
7};
8use relay_base_schema::metrics::MetricType;
9use std::collections::BTreeMap;
10use std::fmt;
11use std::ops::Range;
12
13use crate::bucket::Bucket;
14use crate::BucketValue;
15
/// Divisor applied to the size passed to [`BucketsView::by_size()`] to derive the minimum size a
/// bucket must have in order to be split.
///
/// Buckets whose estimated size is smaller than `size / BUCKET_SPLIT_FACTOR` are never split;
/// if they do not fit into the remaining budget they are moved to the next batch in their
/// entirety. For example, a value of `2` would mean that all buckets smaller than half of the
/// batch size are moved in their entirety, and only larger buckets are split up.
const BUCKET_SPLIT_FACTOR: usize = 32;
20
/// The base size of a serialized bucket in bytes.
///
/// This approximates the size of a bucket's fixed fields in JSON format. The name, the tags and
/// the value of a bucket are not included; they are added separately when a size is estimated.
const BUCKET_SIZE: usize = 50;
25
/// The assumed average size in bytes of a single value when serialized.
///
/// Used to estimate the size of a bucket's datapoints and to compute how many datapoints fit
/// into a given size budget when a bucket is split.
const AVG_VALUE_SIZE: usize = 8;
28
/// An internal type representing an index into a slice of buckets.
///
/// An index addresses a position in two dimensions: a bucket within the slice and an offset
/// into that bucket. It is used as start/end marker of a [`BucketsView`] and as cursor of the
/// iterators in this module.
///
/// Note: the meaning of fields depends on the context of the index.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct Index {
    /// Index into the slice.
    slice: usize,
    /// Index into a bucket.
    bucket: usize,
}
39
/// A view into a slice of metric buckets.
///
/// The view can be used to iterate over a large slice of metric data,
/// slicing even into the buckets themselves.
///
/// ```txt
///                    Full View
///  /---------------------------------------------\
/// [[C:1], [C:12], [D:0, 1, 2, 3, 5, 5], [S:42, 75]]
///  \--------------------------/  \---------------/
///            View 1                    View 2
/// ```
///
/// Iterating over a [`BucketsView`] yields [`BucketView`] items,
/// only the first and last elements may be partial buckets.
///
/// In the above example `View 1` has a partial bucket at the end and
/// `View 2` has a partial bucket in the beginning.
///
/// Using the above example, iterating over `View 1` yields the buckets:
/// `[C:1], [C:12], [D:0, 1, 2, 3]`.
#[derive(Clone, Copy)]
pub struct BucketsView<T> {
    /// The contained buckets.
    ///
    /// The view does not copy any buckets, `start` and `end` select a window of this data.
    inner: T,
    /// Start index.
    ///
    /// - Slice index indicates bucket.
    /// - Bucket index indicates offset in the selected bucket.
    start: Index,
    /// End index.
    ///
    /// - Slice index indicates exclusive end.
    /// - Bucket index indicates offset into the *next* bucket past the end,
    ///   `0` means that the view ends exactly on a bucket boundary.
    end: Index,
}
76
77impl<T> BucketsView<T>
78where
79    T: AsRef<[Bucket]>,
80{
81    /// Creates a new buckets view containing all data from the slice.
82    pub fn new(buckets: T) -> Self {
83        let len = buckets.as_ref().len();
84        Self {
85            inner: buckets,
86            start: Index {
87                slice: 0,
88                bucket: 0,
89            },
90            end: Index {
91                slice: len,
92                bucket: 0,
93            },
94        }
95    }
96
97    /// Returns the amount of partial or full buckets in the view.
98    pub fn len(&self) -> usize {
99        let mut len = self.end.slice - self.start.slice;
100        if self.end.bucket != 0 {
101            len += 1;
102        }
103        len
104    }
105
106    /// Returns whether the view contains any buckets.
107    pub fn is_empty(&self) -> bool {
108        self.len() == 0
109    }
110
111    /// Returns the same bucket view as a bucket view over a slice.
112    pub fn as_slice(&self) -> BucketsView<&[Bucket]> {
113        BucketsView {
114            inner: self.inner.as_ref(),
115            start: self.start,
116            end: self.end,
117        }
118    }
119
120    /// Iterator over all buckets in the view.
121    pub fn iter(&self) -> BucketsViewIter<'_> {
122        BucketsViewIter::new(self.inner.as_ref(), self.start, self.end)
123    }
124
125    /// Iterator which slices the source view into segments with an approximate size of `size_in_bytes`.
126    pub fn by_size(self, size_in_bytes: usize) -> BucketsViewBySizeIter<T> {
127        BucketsViewBySizeIter::new(self.inner, self.start, self.end, size_in_bytes)
128    }
129}
130
131impl<'a> From<&'a [Bucket]> for BucketsView<&'a [Bucket]> {
132    fn from(value: &'a [Bucket]) -> Self {
133        Self::new(value)
134    }
135}
136
137impl<'a> From<&'a Vec<Bucket>> for BucketsView<&'a [Bucket]> {
138    fn from(value: &'a Vec<Bucket>) -> Self {
139        Self::new(value.as_slice())
140    }
141}
142
143impl<T> fmt::Debug for BucketsView<T>
144where
145    T: AsRef<[Bucket]>,
146{
147    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
148        let contents = self.iter().collect::<Vec<_>>();
149        f.debug_tuple("BucketsView").field(&contents).finish()
150    }
151}
152
153impl<'a> IntoIterator for BucketsView<&'a [Bucket]> {
154    type Item = BucketView<'a>;
155    type IntoIter = BucketsViewIter<'a>;
156
157    fn into_iter(self) -> Self::IntoIter {
158        BucketsViewIter::new(self.inner, self.start, self.end)
159    }
160}
161
162impl<'a, T> IntoIterator for &'a BucketsView<T>
163where
164    T: AsRef<[Bucket]>,
165{
166    type Item = BucketView<'a>;
167    type IntoIter = BucketsViewIter<'a>;
168
169    fn into_iter(self) -> Self::IntoIter {
170        BucketsViewIter::new(self.inner.as_ref(), self.start, self.end)
171    }
172}
173
/// Iterator yielding all items contained in a [`BucketsView`].
///
/// First and/or last item may be partial buckets.
pub struct BucketsViewIter<'a> {
    /// Source slice of buckets.
    inner: &'a [Bucket],
    /// Current index.
    ///
    /// Points to the bucket (and the offset within it) which is yielded next.
    current: Index,
    /// End index.
    ///
    /// A non-zero bucket offset means the bucket at the end slice index is yielded partially.
    end: Index,
}
185
186impl<'a> BucketsViewIter<'a> {
187    /// Creates a new iterator.
188    ///
189    /// Start and end must be valid indices or iterator may end early.
190    fn new(inner: &'a [Bucket], start: Index, end: Index) -> Self {
191        Self {
192            inner,
193            end,
194            current: start,
195        }
196    }
197}
198
199impl<'a> Iterator for BucketsViewIter<'a> {
200    type Item = BucketView<'a>;
201
202    fn next(&mut self) -> Option<Self::Item> {
203        // We reached the exact end, there is no sub-bucket index.
204        if self.current.slice == self.end.slice && self.end.bucket == 0 {
205            return None;
206        }
207        // We are way past, including sub-bucket offset.
208        if self.current.slice > self.end.slice {
209            return None;
210        }
211
212        // This doesn't overflow because the last bucket in the inner slice will always have a 0 bucket index.
213        debug_assert!(
214            self.current.slice < self.inner.len(),
215            "invariant violated, iterator pointing past the slice"
216        );
217        let next = self.inner.get(self.current.slice)?;
218
219        // Choose the bucket end, this will always be the full bucket except if it is the last.
220        let end = match self.current.slice == self.end.slice {
221            false => next.value.len(),
222            true => self.end.bucket,
223        };
224
225        let next = BucketView::new(next).select(self.current.bucket..end);
226        let Some(next) = next else {
227            debug_assert!(false, "invariant violated, invalid bucket split");
228            relay_log::error!("Internal invariant violated, invalid bucket split, dropping all remaining buckets.");
229            return None;
230        };
231
232        // Even if the current Bucket was partial, the next one will be full,
233        // except if it is the last one.
234        self.current = Index {
235            slice: self.current.slice + 1,
236            bucket: 0,
237        };
238
239        Some(next)
240    }
241}
242
/// Iterator slicing a [`BucketsView`] into smaller views constrained by a given size in bytes.
///
/// See [`BucketView::estimated_size`] for how the size of a bucket is calculated.
pub struct BucketsViewBySizeIter<T> {
    /// Source slice.
    inner: T,
    /// Current position in the slice.
    ///
    /// This is the start of the next view which is yielded.
    current: Index,
    /// Terminal position.
    end: Index,
    /// Maximum size in bytes of each slice.
    max_size_bytes: usize,
}
256
257impl<T> BucketsViewBySizeIter<T> {
258    /// Creates a new iterator.
259    ///
260    /// Start and end must be valid indices or iterator may end early.
261    fn new(inner: T, start: Index, end: Index, max_size_bytes: usize) -> Self {
262        Self {
263            inner,
264            end,
265            current: start,
266            max_size_bytes,
267        }
268    }
269}
270
271impl<T> Iterator for BucketsViewBySizeIter<T>
272where
273    T: AsRef<[Bucket]>,
274    T: Clone,
275{
276    type Item = BucketsView<T>;
277
278    fn next(&mut self) -> Option<Self::Item> {
279        let start = self.current;
280
281        let mut remaining_bytes = self.max_size_bytes;
282        loop {
283            // Make sure, we don't shoot past the end ...
284            if (self.current.slice > self.end.slice)
285                || (self.current.slice == self.end.slice && self.end.bucket == 0)
286            {
287                break;
288            }
289
290            let inner = self.inner.as_ref();
291            // Select next potential bucket,
292            // this should never overflow because `end` will never go past the slice and
293            // we just validated that current is constrained by end.
294            debug_assert!(
295                self.current.slice < inner.len(),
296                "invariant violated, iterator pointing past the slice"
297            );
298            let bucket = inner.get(self.current.slice)?;
299
300            // Selection should never fail, because either we select the entire range,
301            // or we previously already split the bucket, which means this range is good.
302            let bucket = BucketView::new(bucket).select(self.current.bucket..bucket.value.len());
303            let Some(bucket) = bucket else {
304                debug_assert!(false, "internal invariant violated, invalid bucket split");
305                relay_log::error!("Internal invariant violated, invalid bucket split, dropping all remaining buckets.");
306                return None;
307            };
308
309            match split(
310                &bucket,
311                remaining_bytes,
312                self.max_size_bytes / BUCKET_SPLIT_FACTOR,
313            ) {
314                SplitDecision::BucketFits(size) => {
315                    remaining_bytes -= size;
316                    self.current = Index {
317                        slice: self.current.slice + 1,
318                        bucket: 0,
319                    };
320                    continue;
321                }
322                SplitDecision::MoveToNextBatch => break,
323                SplitDecision::Split(n) => {
324                    self.current = Index {
325                        slice: self.current.slice,
326                        bucket: self.current.bucket + n,
327                    };
328                    break;
329                }
330            }
331        }
332
333        if start == self.current {
334            // Either no progress could be made (not enough space to fit a bucket),
335            // or we're done.
336            return None;
337        }
338
339        // Current is the current for the next batch now,
340        // which means, current is the end for this batch.
341        Some(BucketsView {
342            inner: self.inner.clone(),
343            start,
344            end: self.current,
345        })
346    }
347}
348
349impl<T> Serialize for BucketsView<T>
350where
351    T: AsRef<[Bucket]>,
352{
353    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
354    where
355        S: serde::Serializer,
356    {
357        let mut state = serializer.serialize_seq(Some(self.len()))?;
358
359        for bucket in self.iter() {
360            state.serialize_element(&bucket)?;
361        }
362
363        state.end()
364    }
365}
366
/// A view into a metrics bucket. Sometimes also called a partial bucket.
/// A view contains a subset of datapoints of the original bucket.
///
/// ```txt
///             Full Bucket
/// /-------------------------------\
/// [0, 1, 2, 3, 5, 5, 5, 10, 11, 11]
/// \----------------/\-------------/
///       View 1          View 2
/// ```
///
/// A view can be split again into multiple smaller views.
#[derive(Clone)]
pub struct BucketView<'a> {
    /// The source bucket.
    inner: &'a Bucket,
    /// Valid range of datapoints selected from the source bucket.
    ///
    /// The full range is constrained by `0..bucket.value.len()`.
    /// The range is intended to be non-empty.
    range: Range<usize>,
}
387
388impl<'a> BucketView<'a> {
389    /// Creates a new bucket view of a bucket.
390    ///
391    /// The resulting view contains the entire bucket.
392    pub fn new(bucket: &'a Bucket) -> Self {
393        Self {
394            inner: bucket,
395            range: 0..bucket.value.len(),
396        }
397    }
398
399    /// Timestamp of the bucket.
400    ///
401    /// See also: [`Bucket::timestamp`]
402    pub fn timestamp(&self) -> UnixTimestamp {
403        self.inner.timestamp
404    }
405
406    /// Width of the bucket.
407    ///
408    /// See also: [`Bucket::width`]
409    pub fn width(&self) -> u64 {
410        self.inner.width
411    }
412
413    /// Name of the bucket.
414    ///
415    /// See also: [`Bucket::name`]
416    pub fn name(&self) -> &'a MetricName {
417        &self.inner.name
418    }
419
420    /// Value of the bucket view.
421    pub fn value(&self) -> BucketViewValue<'a> {
422        match &self.inner.value {
423            BucketValue::Counter(c) => BucketViewValue::Counter(*c),
424            BucketValue::Distribution(d) => BucketViewValue::Distribution(&d[self.range.clone()]),
425            BucketValue::Set(s) => BucketViewValue::Set(SetView::new(s, self.range.clone())),
426            BucketValue::Gauge(g) => BucketViewValue::Gauge(*g),
427        }
428    }
429
430    /// Type of the value of the bucket view.
431    pub fn ty(&self) -> MetricType {
432        match &self.inner.value {
433            BucketValue::Counter(_) => MetricType::Counter,
434            BucketValue::Distribution(_) => MetricType::Distribution,
435            BucketValue::Set(_) => MetricType::Set,
436            BucketValue::Gauge(_) => MetricType::Gauge,
437        }
438    }
439
440    /// Name of the bucket.
441    ///
442    /// See also: [`Bucket::tags`]
443    pub fn tags(&self) -> &'a BTreeMap<String, String> {
444        &self.inner.tags
445    }
446
447    /// Returns the value of the specified tag if it exists.
448    ///
449    /// See also: [`Bucket::tag()`]
450    pub fn tag(&self, name: &str) -> Option<&'a str> {
451        self.inner.tag(name)
452    }
453
454    /// Returns the metadata for this bucket.
455    ///
456    /// The aggregation process of metadata is inheritly lossy, which means
457    /// some metadata, for example the amount of merges, can not be accurately split
458    /// or divided over multiple bucket views.
459    ///
460    /// To compensate for this only a bucket view which contains the start of a bucket
461    /// will yield this metadata, all other views created from the bucket return an
462    /// identity value. Merging all metadata from non-overlapping bucket views must
463    /// yield the same values as stored on the original bucket.
464    ///
465    /// This causes some problems when operations on partial buckets are fallible,
466    /// for example transmitting two bucket views in separate http requests.
467    /// To deal with this Relay needs to prevent the splitting of buckets in the first place,
468    /// by never not creating too large buckets via aggregation in the first place.
469    ///
470    /// See also: [`Bucket::metadata`].
471    pub fn metadata(&self) -> BucketMetadata {
472        let merges = if self.range.start == 0 {
473            self.inner.metadata.merges
474        } else {
475            0
476        };
477
478        BucketMetadata {
479            merges,
480            ..self.inner.metadata
481        }
482    }
483
484    /// Number of raw datapoints in this view.
485    ///
486    /// See also: [`BucketValue::len()`]
487    pub fn len(&self) -> usize {
488        self.range.len()
489    }
490
491    /// Returns `true` if this bucket view contains no values.
492    pub fn is_empty(&self) -> bool {
493        self.len() == 0
494    }
495
496    /// Selects a sub-view of the current view.
497    ///
498    /// Returns `None` when:
499    /// - the passed range is not contained in the current view.
500    /// - trying to split a counter or gauge bucket.
501    pub fn select(mut self, range: Range<usize>) -> Option<Self> {
502        if range.start < self.range.start || range.end > self.range.end {
503            return None;
504        }
505
506        // Make sure the bucket can be split, or the entire bucket range is passed.
507        if !self.can_split() && range != (0..self.inner.value.len()) {
508            return None;
509        }
510
511        self.range = range;
512        Some(self)
513    }
514
515    /// Estimates the number of bytes needed to serialize the bucket without value.
516    ///
517    /// Note that this does not match the exact size of the serialized payload. Instead, the size is
518    /// approximated through tags and a static overhead.
519    fn estimated_base_size(&self) -> usize {
520        BUCKET_SIZE + self.name().len() + crate::utils::tags_cost(self.tags())
521    }
522
523    /// Estimates the number of bytes needed to serialize the bucket.
524    ///
525    /// Note that this does not match the exact size of the serialized payload. Instead, the size is
526    /// approximated through the number of contained values, assuming an average size of serialized
527    /// values.
528    pub fn estimated_size(&self) -> usize {
529        self.estimated_base_size() + self.len() * AVG_VALUE_SIZE
530    }
531
532    /// Calculates a split for this bucket if its estimated serialization size exceeds a threshold.
533    ///
534    /// There are three possible return values:
535    ///  - `(Some, None)` if the bucket fits entirely into the size budget. There is no split.
536    ///  - `(None, Some)` if the size budget cannot even hold the bucket name and tags. There is no
537    ///    split, the entire bucket is moved.
538    ///  - `(Some, Some)` if the bucket fits partially. Remaining values are moved into a new bucket
539    ///    with all other information cloned.
540    ///
541    /// This is an approximate function. The bucket is not actually serialized, but rather its
542    /// footprint is estimated through the number of data points contained. See
543    /// [`estimated_size`](Self::estimated_size) for more information.
544    pub fn split(self, size: usize, max_size: Option<usize>) -> (Option<Self>, Option<Self>) {
545        match split(&self, size, max_size.unwrap_or(0) / BUCKET_SPLIT_FACTOR) {
546            SplitDecision::BucketFits(_) => (Some(self), None),
547            SplitDecision::MoveToNextBatch => (None, Some(self)),
548            SplitDecision::Split(n) => {
549                let Range { start, end } = self.range;
550                let at = start + n;
551                (self.clone().select(start..at), self.select(at..end))
552            }
553        }
554    }
555
556    /// Whether the bucket can be split into multiple.
557    ///
558    /// Only set and distribution buckets can be split.
559    fn can_split(&self) -> bool {
560        matches!(
561            self.inner.value,
562            BucketValue::Distribution(_) | BucketValue::Set(_)
563        )
564    }
565
566    /// Returns `true` when this view contains the entire bucket.
567    fn is_full_bucket(&self) -> bool {
568        self.range.start == 0 && self.range.end == self.inner.value.len()
569    }
570}
571
572impl fmt::Debug for BucketView<'_> {
573    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
574        f.debug_struct("BucketView")
575            .field("timestamp", &self.inner.timestamp)
576            .field("width", &self.inner.width)
577            .field("name", &self.inner.name)
578            .field("value", &self.value())
579            .field("tags", &self.inner.tags)
580            .finish()
581    }
582}
583
584impl<'a> From<&'a Bucket> for BucketView<'a> {
585    fn from(value: &'a Bucket) -> Self {
586        BucketView::new(value)
587    }
588}
589
590impl Serialize for BucketView<'_> {
591    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
592    where
593        S: serde::Serializer,
594    {
595        let Bucket {
596            timestamp,
597            width,
598            name,
599            value: _,
600            tags,
601            metadata,
602        } = self.inner;
603
604        let len = match tags.is_empty() {
605            true => 4,
606            false => 5,
607        };
608
609        let mut state = serializer.serialize_map(Some(len))?;
610
611        state.serialize_entry("timestamp", timestamp)?;
612        state.serialize_entry("width", width)?;
613        state.serialize_entry("name", name)?;
614
615        if self.is_full_bucket() {
616            self.inner
617                .value
618                .serialize(serde::__private::ser::FlatMapSerializer(&mut state))?;
619        } else {
620            self.value()
621                .serialize(serde::__private::ser::FlatMapSerializer(&mut state))?;
622        }
623
624        if !tags.is_empty() {
625            state.serialize_entry("tags", tags)?;
626        }
627        if !metadata.is_default() {
628            state.serialize_entry("metadata", metadata)?;
629        }
630
631        state.end()
632    }
633}
634
/// A view into the datapoints of a [`BucketValue`].
///
/// Serialized with the metric type in a `type` field and the datapoints in a `value` field.
#[derive(Debug, Clone, PartialEq, Serialize)]
#[serde(tag = "type", content = "value")]
pub enum BucketViewValue<'a> {
    /// A counter metric.
    ///
    /// See: [`BucketValue::Counter`].
    #[serde(rename = "c")]
    Counter(CounterType),
    /// A distribution metric.
    ///
    /// See: [`BucketValue::Distribution`].
    #[serde(rename = "d")]
    Distribution(&'a [DistributionType]),
    /// A set metric.
    ///
    /// See: [`BucketValue::Set`].
    #[serde(rename = "s")]
    Set(SetView<'a>),
    /// A gauge metric.
    ///
    /// See: [`BucketValue::Gauge`].
    #[serde(rename = "g")]
    Gauge(GaugeValue),
}
660
661impl<'a> From<&'a BucketValue> for BucketViewValue<'a> {
662    fn from(value: &'a BucketValue) -> Self {
663        match value {
664            BucketValue::Counter(c) => BucketViewValue::Counter(*c),
665            BucketValue::Distribution(d) => BucketViewValue::Distribution(d),
666            BucketValue::Set(s) => BucketViewValue::Set(SetView::new(s, 0..s.len())),
667            BucketValue::Gauge(g) => BucketViewValue::Gauge(*g),
668        }
669    }
670}
671
/// A view into the datapoints of a set metric.
#[derive(Clone)]
pub struct SetView<'a> {
    /// The set containing all datapoints.
    source: &'a SetValue,
    /// Positions of the selected datapoints, in iteration order of `source`.
    range: Range<usize>,
}
678
679impl<'a> SetView<'a> {
680    fn new(source: &'a SetValue, range: Range<usize>) -> Self {
681        Self { source, range }
682    }
683
684    /// Amount of datapoints contained within the set view.
685    pub fn len(&self) -> usize {
686        self.range.len()
687    }
688
689    /// Returns `true` if this set contains no values.
690    pub fn is_empty(&self) -> bool {
691        self.len() == 0
692    }
693
694    /// Iterator over all datapoints contained in this set metric.
695    pub fn iter(&self) -> impl Iterator<Item = &SetType> {
696        self.source
697            .iter()
698            .skip(self.range.start)
699            .take(self.range.len())
700    }
701}
702
703impl PartialEq for SetView<'_> {
704    fn eq(&self, other: &Self) -> bool {
705        self.iter().eq(other.iter())
706    }
707}
708
709impl fmt::Debug for SetView<'_> {
710    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
711        f.debug_tuple("SetView")
712            .field(&self.iter().collect::<Vec<_>>())
713            .finish()
714    }
715}
716
717impl Serialize for SetView<'_> {
718    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
719    where
720        S: serde::Serializer,
721    {
722        let mut state = serializer.serialize_seq(Some(self.len()))?;
723
724        for item in self.iter() {
725            state.serialize_element(item)?;
726        }
727
728        state.end()
729    }
730}
731
/// Result of [`split`].
enum SplitDecision {
    /// Bucket fits within the current budget.
    ///
    /// Contains the size of the bucket to subtract from the budget.
    BucketFits(usize),
    /// Bucket does not fit within the current budget and is not split.
    ///
    /// The entire bucket has to be moved to the next batch.
    MoveToNextBatch,
    /// The bucket should be split at the specified position.
    ///
    /// Contains the number of datapoints, counted from the start of the bucket view,
    /// which fit into the current budget.
    Split(usize),
}
743
744/// Calculates a split for this bucket if its estimated serialization size exceeds a threshold.
745///
746/// There are three possible return values:
747///  - `BucketFits(size)` if the bucket fits entirely into the budget and consumes `size` bytes.
748///  - `MoveToNextBatch` if the size budget cannot even hold the bucket name and tags. There is no
749///    split, the entire bucket is moved.
750///  - `Split(n)` if the bucket fits partially, the bucket should be split after `n` elements.
751///
752/// This is an approximate function. The bucket is not actually serialized, but rather its
753/// footprint is estimated through the number of data points contained. See
754/// `estimate_size` for more information.
755fn split(bucket: &BucketView<'_>, max_size: usize, min_split_size: usize) -> SplitDecision {
756    // If there's enough space for the entire bucket, do not perform a split.
757    let bucket_size = bucket.estimated_size();
758    if max_size >= bucket_size {
759        return SplitDecision::BucketFits(bucket_size);
760    }
761
762    if !bucket.can_split() {
763        return SplitDecision::MoveToNextBatch;
764    }
765
766    // If the bucket key can't even fit into the remaining length, move the entire bucket into
767    // the right-hand side.
768    let own_size = bucket.estimated_base_size();
769    if max_size < (own_size + AVG_VALUE_SIZE) {
770        // split_at must not be zero
771        return SplitDecision::MoveToNextBatch;
772    }
773
774    if bucket_size < min_split_size {
775        return SplitDecision::MoveToNextBatch;
776    }
777
778    // Perform a split with the remaining space after adding the key. We assume an average
779    // length of 8 bytes per value and compute the number of items fitting into the left side.
780    let split_at = (max_size - own_size) / AVG_VALUE_SIZE;
781    SplitDecision::Split(split_at)
782}
783
784#[cfg(test)]
785mod tests {
786    use std::sync::Arc;
787
788    use insta::assert_json_snapshot;
789
790    use super::*;
791
792    #[test]
793    fn test_bucket_view_select_counter() {
794        let bucket = Bucket::parse(b"b0:1|c", UnixTimestamp::from_secs(5000)).unwrap();
795
796        let view = BucketView::new(&bucket).select(0..1).unwrap();
797        assert_eq!(view.len(), 1);
798        assert_eq!(
799            serde_json::to_string(&view).unwrap(),
800            serde_json::to_string(&bucket).unwrap()
801        );
802    }
803
804    #[test]
805    fn test_bucket_view_select_invalid_counter() {
806        let bucket = Bucket::parse(b"b0:1|c", UnixTimestamp::from_secs(5000)).unwrap();
807
808        assert!(BucketView::new(&bucket).select(0..0).is_none());
809        assert!(BucketView::new(&bucket).select(0..2).is_none());
810        assert!(BucketView::new(&bucket).select(1..1).is_none());
811    }
812
813    #[test]
814    fn test_bucket_view_counter_metadata() {
815        let bucket = Bucket::parse(b"b0:1|c", UnixTimestamp::from_secs(5000)).unwrap();
816        assert_eq!(bucket.metadata, BucketView::new(&bucket).metadata());
817    }
818
819    #[test]
820    fn test_bucket_view_select_distribution() {
821        let bucket = Bucket::parse(b"b2:1:2:3:5:5|d", UnixTimestamp::from_secs(5000)).unwrap();
822
823        let view = BucketView::new(&bucket).select(0..3).unwrap();
824        assert_eq!(view.len(), 3);
825        assert_eq!(
826            view.value(),
827            BucketViewValue::Distribution(&[1.into(), 2.into(), 3.into()])
828        );
829        let view = BucketView::new(&bucket).select(1..3).unwrap();
830        assert_eq!(view.len(), 2);
831        assert_eq!(
832            view.value(),
833            BucketViewValue::Distribution(&[2.into(), 3.into()])
834        );
835        let view = BucketView::new(&bucket).select(1..5).unwrap();
836        assert_eq!(view.len(), 4);
837        assert_eq!(
838            view.value(),
839            BucketViewValue::Distribution(&[2.into(), 3.into(), 5.into(), 5.into()])
840        );
841    }
842
843    #[test]
844    fn test_bucket_view_select_invalid_distribution() {
845        let bucket = Bucket::parse(b"b2:1:2:3:5:5|d", UnixTimestamp::from_secs(5000)).unwrap();
846
847        assert!(BucketView::new(&bucket).select(0..6).is_none());
848        assert!(BucketView::new(&bucket).select(5..6).is_none());
849        assert!(BucketView::new(&bucket).select(77..99).is_none());
850    }
851
852    #[test]
853    fn test_bucket_view_distribution_metadata() {
854        let bucket = Bucket::parse(b"b2:1:2:3:5:5|d", UnixTimestamp::from_secs(5000)).unwrap();
855        assert_eq!(bucket.metadata, BucketView::new(&bucket).metadata());
856
857        assert_eq!(
858            BucketView::new(&bucket).select(0..3).unwrap().metadata(),
859            bucket.metadata
860        );
861
862        let m = BucketView::new(&bucket).select(1..3).unwrap().metadata();
863        assert_eq!(
864            m,
865            BucketMetadata {
866                merges: 0,
867                ..bucket.metadata
868            }
869        );
870    }
871
872    #[test]
873    fn test_bucket_view_select_set() {
874        let bucket = Bucket::parse(b"b3:42:75|s", UnixTimestamp::from_secs(5000)).unwrap();
875        let s = [42, 75].into();
876
877        let view = BucketView::new(&bucket).select(0..2).unwrap();
878        assert_eq!(view.len(), 2);
879        assert_eq!(view.value(), BucketViewValue::Set(SetView::new(&s, 0..2)));
880        let view = BucketView::new(&bucket).select(1..2).unwrap();
881        assert_eq!(view.len(), 1);
882        assert_eq!(view.value(), BucketViewValue::Set(SetView::new(&s, 1..2)));
883        let view = BucketView::new(&bucket).select(0..1).unwrap();
884        assert_eq!(view.len(), 1);
885        assert_eq!(view.value(), BucketViewValue::Set(SetView::new(&s, 0..1)));
886    }
887
888    #[test]
889    fn test_bucket_view_select_invalid_set() {
890        let bucket = Bucket::parse(b"b3:42:75|s", UnixTimestamp::from_secs(5000)).unwrap();
891
892        assert!(BucketView::new(&bucket).select(0..3).is_none());
893        assert!(BucketView::new(&bucket).select(2..5).is_none());
894        assert!(BucketView::new(&bucket).select(77..99).is_none());
895    }
896
897    #[test]
898    fn test_bucket_view_set_metadata() {
899        let bucket = Bucket::parse(b"b2:1:2:3:5:5|s", UnixTimestamp::from_secs(5000)).unwrap();
900        assert_eq!(bucket.metadata, BucketView::new(&bucket).metadata());
901
902        assert_eq!(
903            BucketView::new(&bucket).select(0..3).unwrap().metadata(),
904            bucket.metadata
905        );
906
907        let m = BucketView::new(&bucket).select(1..3).unwrap().metadata();
908        assert_eq!(
909            m,
910            BucketMetadata {
911                merges: 0,
912                ..bucket.metadata
913            }
914        );
915    }
916
917    #[test]
918    fn test_bucket_view_select_gauge() {
919        let bucket =
920            Bucket::parse(b"b4:25:17:42:220:85|g", UnixTimestamp::from_secs(5000)).unwrap();
921
922        let view = BucketView::new(&bucket).select(0..5).unwrap();
923        assert_eq!(view.len(), 5);
924        assert_eq!(
925            view.value(),
926            BucketViewValue::Gauge(GaugeValue {
927                last: 25.into(),
928                min: 17.into(),
929                max: 42.into(),
930                sum: 220.into(),
931                count: 85
932            })
933        );
934    }
935
936    #[test]
937    fn test_bucket_view_select_invalid_gauge() {
938        let bucket =
939            Bucket::parse(b"b4:25:17:42:220:85|g", UnixTimestamp::from_secs(5000)).unwrap();
940
941        assert!(BucketView::new(&bucket).select(0..1).is_none());
942        assert!(BucketView::new(&bucket).select(0..4).is_none());
943        assert!(BucketView::new(&bucket).select(5..5).is_none());
944        assert!(BucketView::new(&bucket).select(5..6).is_none());
945    }
946
947    #[test]
948    fn test_bucket_view_gauge_metadata() {
949        let bucket =
950            Bucket::parse(b"b4:25:17:42:220:85|g", UnixTimestamp::from_secs(5000)).unwrap();
951        assert_eq!(BucketView::new(&bucket).metadata(), bucket.metadata);
952    }
953
954    fn buckets<T>(s: &[u8]) -> T
955    where
956        T: FromIterator<Bucket>,
957    {
958        let timestamp = UnixTimestamp::from_secs(5000);
959        Bucket::parse_all(s, timestamp)
960            .collect::<Result<T, _>>()
961            .unwrap()
962    }
963
964    #[test]
965    fn test_buckets_view_empty() {
966        let view = BucketsView::new(Vec::new());
967        assert_eq!(view.len(), 0);
968        assert!(view.is_empty());
969        let partials = view.iter().collect::<Vec<_>>();
970        assert!(partials.is_empty());
971    }
972
973    #[test]
974    fn test_buckets_view_iter_full() {
975        let buckets: Vec<_> = buckets(b"b0:1|c\nb1:12|c\nb2:1:2:3:5:5|d\nb3:42:75|s");
976
977        let view = BucketsView::from(&buckets);
978        assert_eq!(view.len(), 4);
979        assert!(!view.is_empty());
980        let partials = view.iter().collect::<Vec<_>>();
981        assert_eq!(partials.len(), 4);
982        assert_eq!(partials[0].name(), "c:custom/b0@none");
983        assert_eq!(partials[0].len(), 1);
984        assert_eq!(partials[1].name(), "c:custom/b1@none");
985        assert_eq!(partials[1].len(), 1);
986        assert_eq!(partials[2].name(), "d:custom/b2@none");
987        assert_eq!(partials[2].len(), 5);
988        assert_eq!(partials[3].name(), "s:custom/b3@none");
989        assert_eq!(partials[3].len(), 2);
990    }
991
992    #[test]
993    fn test_buckets_view_iter_partial_end() {
994        let buckets: Vec<_> = buckets(b"b0:1|c\nb1:12|c\nb2:1:2:3:5:5|d\nb3:42:75|s");
995
996        let mut view = BucketsView::new(&buckets);
997        view.end.slice = 2;
998        view.end.bucket = 3;
999        assert_eq!(view.len(), 3);
1000        assert!(!view.is_empty());
1001
1002        let partials = view.iter().collect::<Vec<_>>();
1003        assert_eq!(partials.len(), 3);
1004        assert_eq!(partials[0].name(), "c:custom/b0@none");
1005        assert_eq!(partials[0].len(), 1);
1006        assert_eq!(partials[1].name(), "c:custom/b1@none");
1007        assert_eq!(partials[1].len(), 1);
1008        assert_eq!(partials[2].name(), "d:custom/b2@none");
1009        assert_eq!(partials[2].len(), 3);
1010    }
1011
1012    #[test]
1013    fn test_buckets_view_iter_partial_start() {
1014        let buckets: Vec<_> = buckets(b"b0:1|c\nb1:12|c\nb2:1:2:3:5:5|d\nb3:42:75|s");
1015
1016        let mut view = BucketsView::new(buckets);
1017        view.start.slice = 2;
1018        view.start.bucket = 3;
1019        assert_eq!(view.len(), 2);
1020        assert!(!view.is_empty());
1021
1022        let partials = view.iter().collect::<Vec<_>>();
1023        assert_eq!(partials.len(), 2);
1024        assert_eq!(partials[0].name(), "d:custom/b2@none");
1025        assert_eq!(partials[0].len(), 2);
1026        assert_eq!(partials[1].name(), "s:custom/b3@none");
1027        assert_eq!(partials[1].len(), 2);
1028    }
1029
1030    #[test]
1031    fn test_buckets_view_iter_partial_start_and_end() {
1032        let buckets: Vec<_> = buckets(b"b0:1|c\nb1:12|c\nb2:1:2:3:5:5|d\nb3:42:75|s");
1033
1034        let mut view = BucketsView::from(&buckets);
1035        view.start.slice = 2;
1036        view.start.bucket = 1;
1037        view.end.slice = 3;
1038        view.end.bucket = 1;
1039        assert_eq!(view.len(), 2);
1040        assert!(!view.is_empty());
1041
1042        let partials = view.iter().collect::<Vec<_>>();
1043        assert_eq!(partials.len(), 2);
1044        assert_eq!(partials[0].name(), "d:custom/b2@none");
1045        assert_eq!(partials[0].len(), 4);
1046        assert_eq!(partials[1].name(), "s:custom/b3@none");
1047        assert_eq!(partials[1].len(), 1);
1048    }
1049
1050    #[test]
1051    fn test_buckets_view_by_size_small() {
1052        let buckets: Vec<_> = buckets(b"b0:1|c\nb1:12|c\nb2:1:2:3:5:5|d\nb3:42:75|s");
1053
1054        let view = BucketsView::from(&buckets);
1055        let partials = view
1056            .by_size(100)
1057            .map(|bv| {
1058                let len: usize = bv.iter().map(|b| b.len()).sum();
1059                let size: usize = bv.iter().map(|b| b.estimated_size()).sum();
1060                (len, size)
1061            })
1062            .collect::<Vec<_>>();
1063
1064        assert_eq!(partials, vec![(1, 74), (1, 74), (4, 98), (1, 74), (2, 82),]);
1065    }
1066
1067    #[test]
1068    fn test_buckets_view_by_size_small_as_arc() {
1069        let buckets: Arc<_> = buckets(b"b0:1|c\nb1:12|c\nb2:1:2:3:5:5|d\nb3:42:75|s");
1070
1071        let view = BucketsView::new(buckets);
1072        let partials = view
1073            .by_size(100)
1074            .map(|bv| {
1075                let len: usize = bv.iter().map(|b| b.len()).sum();
1076                let size: usize = bv.iter().map(|b| b.estimated_size()).sum();
1077                (len, size)
1078            })
1079            .collect::<Vec<_>>();
1080
1081        assert_eq!(partials, vec![(1, 74), (1, 74), (4, 98), (1, 74), (2, 82),]);
1082    }
1083
1084    #[test]
1085    fn test_buckets_view_by_size_one_split() {
1086        let buckets: Vec<_> = buckets(b"b0:1|c\nb1:12|c\nb2:1:2:3:5:5|d\nb3:42:75|s");
1087
1088        let view = BucketsView::from(&buckets);
1089        let partials = view
1090            .by_size(250)
1091            .map(|bv| {
1092                let len: usize = bv.iter().map(|b| b.len()).sum();
1093                let size: usize = bv.iter().map(|b| b.estimated_size()).sum();
1094                (len, size)
1095            })
1096            .collect::<Vec<_>>();
1097
1098        assert_eq!(partials, vec![(6, 246), (3, 156)]);
1099    }
1100
1101    #[test]
1102    fn test_buckets_view_by_size_no_split() {
1103        let buckets: Vec<_> = buckets(b"b0:1|c\nb1:12|c\nb2:1:2:3:5:5|d\nb3:42:75|s");
1104
1105        let view = BucketsView::from(&buckets);
1106        let partials = view
1107            .by_size(500)
1108            .map(|bv| {
1109                let len: usize = bv.iter().map(|b| b.len()).sum();
1110                let size: usize = bv.iter().map(|b| b.estimated_size()).sum();
1111                (len, size)
1112            })
1113            .collect::<Vec<_>>();
1114
1115        assert_eq!(partials, vec![(9, 336)]);
1116    }
1117
1118    #[test]
1119    fn test_buckets_view_by_size_no_too_small_no_bucket_fits() {
1120        let buckets: Vec<_> = buckets(b"b0:1|c\nb1:12|c\nb2:1:2:3:5:5|d\nb3:42:75|s");
1121
1122        let view = BucketsView::from(&buckets);
1123        let partials = view
1124            .by_size(50) // Too small, a bucket requires at least 74 bytes
1125            .count();
1126
1127        assert_eq!(partials, 0);
1128    }
1129
1130    #[test]
1131    fn test_buckets_view_by_size_do_not_split_gauge() {
1132        let buckets: Vec<_> = buckets(b"transactions/foo:25:17:42:220:85|g");
1133
1134        let view = BucketsView::from(&buckets);
1135        // 100 is too small to fit the gauge, but it is big enough to fit half a gauage,
1136        // make sure the gauge does not actually get split.
1137        let partials = view.by_size(100).count();
1138
1139        assert_eq!(partials, 0);
1140    }
1141
1142    #[test]
1143    fn test_buckets_view_serialize_full() {
1144        let buckets: Vec<_> = buckets(b"b0:1|c\nb1:12|c|#foo,bar:baz\nb2:1:2:3:5:5|d|#foo,bar:baz b3:42:75|s\ntransactions/foo:25:17:42:220:85|g");
1145
1146        assert_eq!(
1147            serde_json::to_string(&BucketsView::from(&buckets)).unwrap(),
1148            serde_json::to_string(&buckets).unwrap()
1149        );
1150    }
1151
1152    #[test]
1153    fn test_buckets_view_serialize_partial() {
1154        let buckets: Arc<[_]> = buckets(
1155            b"b1:12|c|#foo,bar:baz\nb2:1:2:3:5:5|d|#foo,bar:baz\nb3:42:75|s\nb4:25:17:42:220:85|g",
1156        );
1157
1158        let view = BucketsView::new(buckets);
1159        // This creates 4 separate views, spanning 1-2, 2-3, 3, 4.
1160        // 4 is too big to fit into a view together with the remainder of 3.
1161        let partials = view.by_size(178).collect::<Vec<_>>();
1162
1163        assert_json_snapshot!(partials);
1164    }
1165
1166    #[test]
1167    fn test_split_repeatedly() {
1168        let bucket = Bucket::parse(b"b2:1:2:3:5:5|d", UnixTimestamp::from_secs(5000)).unwrap();
1169        let view = BucketView::new(&bucket);
1170
1171        // construct this so that we can take 2 values per split and result in 3 parts.
1172        let split_size = view.estimated_base_size() + 2 * AVG_VALUE_SIZE;
1173
1174        let (first, rest) = view.split(split_size, None);
1175        let (second, rest) = rest.unwrap().split(split_size, None);
1176        let (third, rest) = rest.unwrap().split(split_size, None);
1177
1178        assert_eq!(first.unwrap().range, 0..2);
1179        assert_eq!(second.unwrap().range, 2..4);
1180        assert_eq!(third.unwrap().range, 4..5);
1181        assert!(rest.is_none());
1182    }
1183}