relay_metrics/
view.rs

1use relay_common::time::UnixTimestamp;
2use serde::Serialize;
3use serde::ser::{SerializeMap, SerializeSeq};
4
5use crate::{
6    BucketMetadata, CounterType, DistributionType, GaugeValue, MetricName, SetType, SetValue,
7};
8use relay_base_schema::metrics::MetricType;
9use std::collections::BTreeMap;
10use std::fmt;
11use std::ops::Range;
12
13use crate::BucketValue;
14use crate::bucket::Bucket;
15
/// The fraction of the size passed to [`BucketsView::by_size()`] at which buckets will be split.
///
/// The batch size is divided by this factor to obtain a minimum split size. Buckets whose
/// estimated size is below that minimum are never split, they are moved to the next batch in
/// their entirety. For example, a value of `2` means that all buckets smaller than half of the
/// batch size will be moved in their entirety, and only larger buckets may be split up.
const BUCKET_SPLIT_FACTOR: usize = 32;

/// The base size of a serialized bucket in bytes.
///
/// This is the size of a bucket's fixed fields in JSON format, excluding the value and tags.
/// It is the static part of the estimate computed for a bucket, the length of the name and the
/// cost of the tags are added on top of it.
const BUCKET_SIZE: usize = 50;

/// The average size of values when serialized.
///
/// Every datapoint of a bucket view is assumed to take up this many bytes when the size of a
/// bucket is estimated and when the position of a split is calculated.
const AVG_VALUE_SIZE: usize = 8;
28
/// An internal type representing an index into a slice of buckets.
///
/// The index is two-dimensional: `slice` selects a bucket in the slice and `bucket` is an
/// offset into the datapoints of a bucket.
///
/// Note: the meaning of fields depends on the context of the index, for example whether the
/// index is used as the start or as the end of a view.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct Index {
    /// Index into the slice.
    slice: usize,
    /// Index into a bucket.
    bucket: usize,
}
39
/// A view into a slice of metric buckets.
///
/// The view can be used to iterate over a large slice
/// of metric data slicing even into the buckets themselves.
///
/// ```txt
///                    Full View
///  /---------------------------------------------\
/// [[C:1], [C:12], [D:0, 1, 2, 3, 5, 5], [S:42, 75]]
///  \--------------------------/  \---------------/
///            View 1                    View 2
/// ```
///
/// Iterating over a [`BucketsView`] yields [`BucketView`] items,
/// only the first and last elements may be partial buckets.
///
/// In the above example `View 1` has a partial bucket at the end and
/// `View 2` has a partial bucket in the beginning.
///
/// Using the above example, iterating over `View 1` yields the buckets:
/// `[C:1], [C:12], [D:0, 1, 2, 3]`.
///
/// The view is generic over its storage `T`, all operations are available for any storage
/// which can be borrowed as a slice of buckets.
#[derive(Clone, Copy)]
pub struct BucketsView<T> {
    /// The contained buckets.
    inner: T,
    /// Start index.
    ///
    /// - Slice index indicates bucket.
    /// - Bucket index indicates offset in the selected bucket.
    start: Index,
    /// End index.
    ///
    /// - Slice index indicates exclusive end.
    /// - Bucket index, indicates offset into the *next* bucket past the end.
    ///   A bucket index of `0` means there is no partial bucket at the end of the view.
    end: Index,
}
76
77impl<T> BucketsView<T>
78where
79    T: AsRef<[Bucket]>,
80{
81    /// Creates a new buckets view containing all data from the slice.
82    pub fn new(buckets: T) -> Self {
83        let len = buckets.as_ref().len();
84        Self {
85            inner: buckets,
86            start: Index {
87                slice: 0,
88                bucket: 0,
89            },
90            end: Index {
91                slice: len,
92                bucket: 0,
93            },
94        }
95    }
96
97    /// Returns the amount of partial or full buckets in the view.
98    pub fn len(&self) -> usize {
99        let mut len = self.end.slice - self.start.slice;
100        if self.end.bucket != 0 {
101            len += 1;
102        }
103        len
104    }
105
106    /// Returns whether the view contains any buckets.
107    pub fn is_empty(&self) -> bool {
108        self.len() == 0
109    }
110
111    /// Returns the same bucket view as a bucket view over a slice.
112    pub fn as_slice(&self) -> BucketsView<&[Bucket]> {
113        BucketsView {
114            inner: self.inner.as_ref(),
115            start: self.start,
116            end: self.end,
117        }
118    }
119
120    /// Iterator over all buckets in the view.
121    pub fn iter(&self) -> BucketsViewIter<'_> {
122        BucketsViewIter::new(self.inner.as_ref(), self.start, self.end)
123    }
124
125    /// Iterator which slices the source view into segments with an approximate size of `size_in_bytes`.
126    pub fn by_size(self, size_in_bytes: usize) -> BucketsViewBySizeIter<T> {
127        BucketsViewBySizeIter::new(self.inner, self.start, self.end, size_in_bytes)
128    }
129}
130
131impl<'a> From<&'a [Bucket]> for BucketsView<&'a [Bucket]> {
132    fn from(value: &'a [Bucket]) -> Self {
133        Self::new(value)
134    }
135}
136
137impl<'a> From<&'a Vec<Bucket>> for BucketsView<&'a [Bucket]> {
138    fn from(value: &'a Vec<Bucket>) -> Self {
139        Self::new(value.as_slice())
140    }
141}
142
143impl<T> fmt::Debug for BucketsView<T>
144where
145    T: AsRef<[Bucket]>,
146{
147    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
148        let contents = self.iter().collect::<Vec<_>>();
149        f.debug_tuple("BucketsView").field(&contents).finish()
150    }
151}
152
153impl<'a> IntoIterator for BucketsView<&'a [Bucket]> {
154    type Item = BucketView<'a>;
155    type IntoIter = BucketsViewIter<'a>;
156
157    fn into_iter(self) -> Self::IntoIter {
158        BucketsViewIter::new(self.inner, self.start, self.end)
159    }
160}
161
162impl<'a, T> IntoIterator for &'a BucketsView<T>
163where
164    T: AsRef<[Bucket]>,
165{
166    type Item = BucketView<'a>;
167    type IntoIter = BucketsViewIter<'a>;
168
169    fn into_iter(self) -> Self::IntoIter {
170        BucketsViewIter::new(self.inner.as_ref(), self.start, self.end)
171    }
172}
173
/// Iterator yielding all items contained in a [`BucketsView`].
///
/// First and/or last item may be partial buckets.
///
/// The iterator walks the source slice bucket by bucket, starting at `current`
/// and stopping once `end` has been reached.
pub struct BucketsViewIter<'a> {
    /// Source slice of buckets.
    inner: &'a [Bucket],
    /// Current index.
    ///
    /// The bucket offset is only non-zero for the very first item,
    /// it is reset after each yielded item.
    current: Index,
    /// End index.
    ///
    /// A non-zero bucket offset marks a partial bucket at the end.
    end: Index,
}
185
186impl<'a> BucketsViewIter<'a> {
187    /// Creates a new iterator.
188    ///
189    /// Start and end must be valid indices or iterator may end early.
190    fn new(inner: &'a [Bucket], start: Index, end: Index) -> Self {
191        Self {
192            inner,
193            end,
194            current: start,
195        }
196    }
197}
198
199impl<'a> Iterator for BucketsViewIter<'a> {
200    type Item = BucketView<'a>;
201
202    fn next(&mut self) -> Option<Self::Item> {
203        // We reached the exact end, there is no sub-bucket index.
204        if self.current.slice == self.end.slice && self.end.bucket == 0 {
205            return None;
206        }
207        // We are way past, including sub-bucket offset.
208        if self.current.slice > self.end.slice {
209            return None;
210        }
211
212        // This doesn't overflow because the last bucket in the inner slice will always have a 0 bucket index.
213        debug_assert!(
214            self.current.slice < self.inner.len(),
215            "invariant violated, iterator pointing past the slice"
216        );
217        let next = self.inner.get(self.current.slice)?;
218
219        // Choose the bucket end, this will always be the full bucket except if it is the last.
220        let end = match self.current.slice == self.end.slice {
221            false => next.value.len(),
222            true => self.end.bucket,
223        };
224
225        let next = BucketView::new(next).select(self.current.bucket..end);
226        let Some(next) = next else {
227            debug_assert!(false, "invariant violated, invalid bucket split");
228            relay_log::error!(
229                "Internal invariant violated, invalid bucket split, dropping all remaining buckets."
230            );
231            return None;
232        };
233
234        // Even if the current Bucket was partial, the next one will be full,
235        // except if it is the last one.
236        self.current = Index {
237            slice: self.current.slice + 1,
238            bucket: 0,
239        };
240
241        Some(next)
242    }
243}
244
/// Iterator slicing a [`BucketsView`] into smaller views constrained by a given size in bytes.
///
/// See [`BucketView::estimated_size`] for how the size of a bucket is calculated.
pub struct BucketsViewBySizeIter<T> {
    /// Source slice.
    inner: T,
    /// Current position in the slice.
    ///
    /// This is the start of the next view which will be yielded.
    current: Index,
    /// Terminal position.
    end: Index,
    /// Maximum size in bytes of each slice.
    max_size_bytes: usize,
}
258
259impl<T> BucketsViewBySizeIter<T> {
260    /// Creates a new iterator.
261    ///
262    /// Start and end must be valid indices or iterator may end early.
263    fn new(inner: T, start: Index, end: Index, max_size_bytes: usize) -> Self {
264        Self {
265            inner,
266            end,
267            current: start,
268            max_size_bytes,
269        }
270    }
271}
272
273impl<T> Iterator for BucketsViewBySizeIter<T>
274where
275    T: AsRef<[Bucket]>,
276    T: Clone,
277{
278    type Item = BucketsView<T>;
279
280    fn next(&mut self) -> Option<Self::Item> {
281        let start = self.current;
282
283        let mut remaining_bytes = self.max_size_bytes;
284        loop {
285            // Make sure, we don't shoot past the end ...
286            if (self.current.slice > self.end.slice)
287                || (self.current.slice == self.end.slice && self.end.bucket == 0)
288            {
289                break;
290            }
291
292            let inner = self.inner.as_ref();
293            // Select next potential bucket,
294            // this should never overflow because `end` will never go past the slice and
295            // we just validated that current is constrained by end.
296            debug_assert!(
297                self.current.slice < inner.len(),
298                "invariant violated, iterator pointing past the slice"
299            );
300            let bucket = inner.get(self.current.slice)?;
301
302            // Selection should never fail, because either we select the entire range,
303            // or we previously already split the bucket, which means this range is good.
304            let bucket = BucketView::new(bucket).select(self.current.bucket..bucket.value.len());
305            let Some(bucket) = bucket else {
306                debug_assert!(false, "internal invariant violated, invalid bucket split");
307                relay_log::error!(
308                    "Internal invariant violated, invalid bucket split, dropping all remaining buckets."
309                );
310                return None;
311            };
312
313            match split(
314                &bucket,
315                remaining_bytes,
316                self.max_size_bytes / BUCKET_SPLIT_FACTOR,
317            ) {
318                SplitDecision::BucketFits(size) => {
319                    remaining_bytes -= size;
320                    self.current = Index {
321                        slice: self.current.slice + 1,
322                        bucket: 0,
323                    };
324                    continue;
325                }
326                SplitDecision::MoveToNextBatch => break,
327                SplitDecision::Split(n) => {
328                    self.current = Index {
329                        slice: self.current.slice,
330                        bucket: self.current.bucket + n,
331                    };
332                    break;
333                }
334            }
335        }
336
337        if start == self.current {
338            // Either no progress could be made (not enough space to fit a bucket),
339            // or we're done.
340            return None;
341        }
342
343        // Current is the current for the next batch now,
344        // which means, current is the end for this batch.
345        Some(BucketsView {
346            inner: self.inner.clone(),
347            start,
348            end: self.current,
349        })
350    }
351}
352
353impl<T> Serialize for BucketsView<T>
354where
355    T: AsRef<[Bucket]>,
356{
357    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
358    where
359        S: serde::Serializer,
360    {
361        let mut state = serializer.serialize_seq(Some(self.len()))?;
362
363        for bucket in self.iter() {
364            state.serialize_element(&bucket)?;
365        }
366
367        state.end()
368    }
369}
370
/// A view into a metrics bucket. Sometimes also called a partial bucket.
/// A view contains a subset of datapoints of the original bucket.
///
/// ```txt
///             Full Bucket
/// /-------------------------------\
/// [0, 1, 2, 3, 5, 5, 5, 10, 11, 11]
/// \----------------/\-------------/
///       View 1          View 2
/// ```
///
/// A view can be split again into multiple smaller views.
///
/// The view borrows the source bucket, creating and cloning a view does not copy any datapoints.
#[derive(Clone)]
pub struct BucketView<'a> {
    /// The source bucket.
    inner: &'a Bucket,
    /// Non-empty and valid range into the bucket.
    /// The full range is constrained by `0..bucket.value.len()`
    range: Range<usize>,
}
391
392impl<'a> BucketView<'a> {
393    /// Creates a new bucket view of a bucket.
394    ///
395    /// The resulting view contains the entire bucket.
396    pub fn new(bucket: &'a Bucket) -> Self {
397        Self {
398            inner: bucket,
399            range: 0..bucket.value.len(),
400        }
401    }
402
403    /// Timestamp of the bucket.
404    ///
405    /// See also: [`Bucket::timestamp`]
406    pub fn timestamp(&self) -> UnixTimestamp {
407        self.inner.timestamp
408    }
409
410    /// Width of the bucket.
411    ///
412    /// See also: [`Bucket::width`]
413    pub fn width(&self) -> u64 {
414        self.inner.width
415    }
416
417    /// Name of the bucket.
418    ///
419    /// See also: [`Bucket::name`]
420    pub fn name(&self) -> &'a MetricName {
421        &self.inner.name
422    }
423
424    /// Value of the bucket view.
425    pub fn value(&self) -> BucketViewValue<'a> {
426        match &self.inner.value {
427            BucketValue::Counter(c) => BucketViewValue::Counter(*c),
428            BucketValue::Distribution(d) => BucketViewValue::Distribution(&d[self.range.clone()]),
429            BucketValue::Set(s) => BucketViewValue::Set(SetView::new(s, self.range.clone())),
430            BucketValue::Gauge(g) => BucketViewValue::Gauge(*g),
431        }
432    }
433
434    /// Type of the value of the bucket view.
435    pub fn ty(&self) -> MetricType {
436        match &self.inner.value {
437            BucketValue::Counter(_) => MetricType::Counter,
438            BucketValue::Distribution(_) => MetricType::Distribution,
439            BucketValue::Set(_) => MetricType::Set,
440            BucketValue::Gauge(_) => MetricType::Gauge,
441        }
442    }
443
444    /// Name of the bucket.
445    ///
446    /// See also: [`Bucket::tags`]
447    pub fn tags(&self) -> &'a BTreeMap<String, String> {
448        &self.inner.tags
449    }
450
451    /// Returns the value of the specified tag if it exists.
452    ///
453    /// See also: [`Bucket::tag()`]
454    pub fn tag(&self, name: &str) -> Option<&'a str> {
455        self.inner.tag(name)
456    }
457
458    /// Returns the metadata for this bucket.
459    ///
460    /// The aggregation process of metadata is inheritly lossy, which means
461    /// some metadata, for example the amount of merges, can not be accurately split
462    /// or divided over multiple bucket views.
463    ///
464    /// To compensate for this only a bucket view which contains the start of a bucket
465    /// will yield this metadata, all other views created from the bucket return an
466    /// identity value. Merging all metadata from non-overlapping bucket views must
467    /// yield the same values as stored on the original bucket.
468    ///
469    /// This causes some problems when operations on partial buckets are fallible,
470    /// for example transmitting two bucket views in separate http requests.
471    /// To deal with this Relay needs to prevent the splitting of buckets in the first place,
472    /// by never not creating too large buckets via aggregation in the first place.
473    ///
474    /// See also: [`Bucket::metadata`].
475    pub fn metadata(&self) -> BucketMetadata {
476        let merges = if self.range.start == 0 {
477            self.inner.metadata.merges
478        } else {
479            0
480        };
481
482        BucketMetadata {
483            merges,
484            ..self.inner.metadata
485        }
486    }
487
488    /// Number of raw datapoints in this view.
489    ///
490    /// See also: [`BucketValue::len()`]
491    pub fn len(&self) -> usize {
492        self.range.len()
493    }
494
495    /// Returns `true` if this bucket view contains no values.
496    pub fn is_empty(&self) -> bool {
497        self.len() == 0
498    }
499
500    /// Selects a sub-view of the current view.
501    ///
502    /// Returns `None` when:
503    /// - the passed range is not contained in the current view.
504    /// - trying to split a counter or gauge bucket.
505    pub fn select(mut self, range: Range<usize>) -> Option<Self> {
506        if range.start < self.range.start || range.end > self.range.end {
507            return None;
508        }
509
510        // Make sure the bucket can be split, or the entire bucket range is passed.
511        if !self.can_split() && range != (0..self.inner.value.len()) {
512            return None;
513        }
514
515        self.range = range;
516        Some(self)
517    }
518
519    /// Estimates the number of bytes needed to serialize the bucket without value.
520    ///
521    /// Note that this does not match the exact size of the serialized payload. Instead, the size is
522    /// approximated through tags and a static overhead.
523    fn estimated_base_size(&self) -> usize {
524        BUCKET_SIZE + self.name().len() + crate::utils::tags_cost(self.tags())
525    }
526
527    /// Estimates the number of bytes needed to serialize the bucket.
528    ///
529    /// Note that this does not match the exact size of the serialized payload. Instead, the size is
530    /// approximated through the number of contained values, assuming an average size of serialized
531    /// values.
532    pub fn estimated_size(&self) -> usize {
533        self.estimated_base_size() + self.len() * AVG_VALUE_SIZE
534    }
535
536    /// Calculates a split for this bucket if its estimated serialization size exceeds a threshold.
537    ///
538    /// There are three possible return values:
539    ///  - `(Some, None)` if the bucket fits entirely into the size budget. There is no split.
540    ///  - `(None, Some)` if the size budget cannot even hold the bucket name and tags. There is no
541    ///    split, the entire bucket is moved.
542    ///  - `(Some, Some)` if the bucket fits partially. Remaining values are moved into a new bucket
543    ///    with all other information cloned.
544    ///
545    /// This is an approximate function. The bucket is not actually serialized, but rather its
546    /// footprint is estimated through the number of data points contained. See
547    /// [`estimated_size`](Self::estimated_size) for more information.
548    pub fn split(self, size: usize, max_size: Option<usize>) -> (Option<Self>, Option<Self>) {
549        match split(&self, size, max_size.unwrap_or(0) / BUCKET_SPLIT_FACTOR) {
550            SplitDecision::BucketFits(_) => (Some(self), None),
551            SplitDecision::MoveToNextBatch => (None, Some(self)),
552            SplitDecision::Split(n) => {
553                let Range { start, end } = self.range;
554                let at = start + n;
555                (self.clone().select(start..at), self.select(at..end))
556            }
557        }
558    }
559
560    /// Whether the bucket can be split into multiple.
561    ///
562    /// Only set and distribution buckets can be split.
563    fn can_split(&self) -> bool {
564        matches!(
565            self.inner.value,
566            BucketValue::Distribution(_) | BucketValue::Set(_)
567        )
568    }
569
570    /// Returns `true` when this view contains the entire bucket.
571    fn is_full_bucket(&self) -> bool {
572        self.range.start == 0 && self.range.end == self.inner.value.len()
573    }
574}
575
576impl fmt::Debug for BucketView<'_> {
577    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
578        f.debug_struct("BucketView")
579            .field("timestamp", &self.inner.timestamp)
580            .field("width", &self.inner.width)
581            .field("name", &self.inner.name)
582            .field("value", &self.value())
583            .field("tags", &self.inner.tags)
584            .finish()
585    }
586}
587
588impl<'a> From<&'a Bucket> for BucketView<'a> {
589    fn from(value: &'a Bucket) -> Self {
590        BucketView::new(value)
591    }
592}
593
594impl Serialize for BucketView<'_> {
595    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
596    where
597        S: serde::Serializer,
598    {
599        let Bucket {
600            timestamp,
601            width,
602            name,
603            value: _,
604            tags,
605            metadata,
606        } = self.inner;
607
608        let len = match tags.is_empty() {
609            true => 4,
610            false => 5,
611        };
612
613        let mut state = serializer.serialize_map(Some(len))?;
614
615        state.serialize_entry("timestamp", timestamp)?;
616        state.serialize_entry("width", width)?;
617        state.serialize_entry("name", name)?;
618
619        if self.is_full_bucket() {
620            self.inner
621                .value
622                .serialize(serde::__private::ser::FlatMapSerializer(&mut state))?;
623        } else {
624            self.value()
625                .serialize(serde::__private::ser::FlatMapSerializer(&mut state))?;
626        }
627
628        if !tags.is_empty() {
629            state.serialize_entry("tags", tags)?;
630        }
631        if !metadata.is_default() {
632            state.serialize_entry("metadata", metadata)?;
633        }
634
635        state.end()
636    }
637}
638
/// A view into the datapoints of a [`BucketValue`].
///
/// Serialized with the short metric type in the `type` field and the datapoints in the `value` field.
#[derive(Debug, Clone, PartialEq, Serialize)]
#[serde(tag = "type", content = "value")]
pub enum BucketViewValue<'a> {
    /// A counter metric.
    ///
    /// See: [`BucketValue::Counter`].
    #[serde(rename = "c")]
    Counter(CounterType),
    /// A distribution metric.
    ///
    /// See: [`BucketValue::Distribution`].
    #[serde(rename = "d")]
    Distribution(&'a [DistributionType]),
    /// A set metric.
    ///
    /// See: [`BucketValue::Set`].
    #[serde(rename = "s")]
    Set(SetView<'a>),
    /// A gauge metric.
    ///
    /// See: [`BucketValue::Gauge`].
    #[serde(rename = "g")]
    Gauge(GaugeValue),
}
664
665impl<'a> From<&'a BucketValue> for BucketViewValue<'a> {
666    fn from(value: &'a BucketValue) -> Self {
667        match value {
668            BucketValue::Counter(c) => BucketViewValue::Counter(*c),
669            BucketValue::Distribution(d) => BucketViewValue::Distribution(d),
670            BucketValue::Set(s) => BucketViewValue::Set(SetView::new(s, 0..s.len())),
671            BucketValue::Gauge(g) => BucketViewValue::Gauge(*g),
672        }
673    }
674}
675
/// A view into the datapoints of a set metric.
///
/// The view selects the datapoints of the source set by their position in iteration order.
#[derive(Clone)]
pub struct SetView<'a> {
    /// The set this view is created from.
    source: &'a SetValue,
    /// Positions of the selected datapoints in iteration order of the source.
    range: Range<usize>,
}
682
683impl<'a> SetView<'a> {
684    fn new(source: &'a SetValue, range: Range<usize>) -> Self {
685        Self { source, range }
686    }
687
688    /// Amount of datapoints contained within the set view.
689    pub fn len(&self) -> usize {
690        self.range.len()
691    }
692
693    /// Returns `true` if this set contains no values.
694    pub fn is_empty(&self) -> bool {
695        self.len() == 0
696    }
697
698    /// Iterator over all datapoints contained in this set metric.
699    pub fn iter(&self) -> impl Iterator<Item = &SetType> {
700        self.source
701            .iter()
702            .skip(self.range.start)
703            .take(self.range.len())
704    }
705}
706
707impl PartialEq for SetView<'_> {
708    fn eq(&self, other: &Self) -> bool {
709        self.iter().eq(other.iter())
710    }
711}
712
713impl fmt::Debug for SetView<'_> {
714    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
715        f.debug_tuple("SetView")
716            .field(&self.iter().collect::<Vec<_>>())
717            .finish()
718    }
719}
720
721impl Serialize for SetView<'_> {
722    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
723    where
724        S: serde::Serializer,
725    {
726        let mut state = serializer.serialize_seq(Some(self.len()))?;
727
728        for item in self.iter() {
729            state.serialize_element(item)?;
730        }
731
732        state.end()
733    }
734}
735
/// Result of [`split`].
enum SplitDecision {
    /// Bucket fits within the current budget.
    ///
    /// Contains the size of the bucket to subtract from the budget.
    BucketFits(usize),
    /// Bucket does not fit within the current budget and cannot be split.
    ///
    /// The entire bucket has to go into the next batch.
    MoveToNextBatch,
    /// The bucket should be split at the specified position.
    ///
    /// The position is the amount of datapoints, counted from the start of the view,
    /// which still fit into the current budget.
    Split(usize),
}
747
748/// Calculates a split for this bucket if its estimated serialization size exceeds a threshold.
749///
750/// There are three possible return values:
751///  - `BucketFits(size)` if the bucket fits entirely into the budget and consumes `size` bytes.
752///  - `MoveToNextBatch` if the size budget cannot even hold the bucket name and tags. There is no
753///    split, the entire bucket is moved.
754///  - `Split(n)` if the bucket fits partially, the bucket should be split after `n` elements.
755///
756/// This is an approximate function. The bucket is not actually serialized, but rather its
757/// footprint is estimated through the number of data points contained. See
758/// `estimate_size` for more information.
759fn split(bucket: &BucketView<'_>, max_size: usize, min_split_size: usize) -> SplitDecision {
760    // If there's enough space for the entire bucket, do not perform a split.
761    let bucket_size = bucket.estimated_size();
762    if max_size >= bucket_size {
763        return SplitDecision::BucketFits(bucket_size);
764    }
765
766    if !bucket.can_split() {
767        return SplitDecision::MoveToNextBatch;
768    }
769
770    // If the bucket key can't even fit into the remaining length, move the entire bucket into
771    // the right-hand side.
772    let own_size = bucket.estimated_base_size();
773    if max_size < (own_size + AVG_VALUE_SIZE) {
774        // split_at must not be zero
775        return SplitDecision::MoveToNextBatch;
776    }
777
778    if bucket_size < min_split_size {
779        return SplitDecision::MoveToNextBatch;
780    }
781
782    // Perform a split with the remaining space after adding the key. We assume an average
783    // length of 8 bytes per value and compute the number of items fitting into the left side.
784    let split_at = (max_size - own_size) / AVG_VALUE_SIZE;
785    SplitDecision::Split(split_at)
786}
787
788#[cfg(test)]
789mod tests {
790    use std::sync::Arc;
791
792    use insta::assert_json_snapshot;
793
794    use super::*;
795
796    #[test]
797    fn test_bucket_view_select_counter() {
798        let bucket = Bucket::parse(b"b0:1|c", UnixTimestamp::from_secs(5000)).unwrap();
799
800        let view = BucketView::new(&bucket).select(0..1).unwrap();
801        assert_eq!(view.len(), 1);
802        assert_eq!(
803            serde_json::to_string(&view).unwrap(),
804            serde_json::to_string(&bucket).unwrap()
805        );
806    }
807
808    #[test]
809    fn test_bucket_view_select_invalid_counter() {
810        let bucket = Bucket::parse(b"b0:1|c", UnixTimestamp::from_secs(5000)).unwrap();
811
812        assert!(BucketView::new(&bucket).select(0..0).is_none());
813        assert!(BucketView::new(&bucket).select(0..2).is_none());
814        assert!(BucketView::new(&bucket).select(1..1).is_none());
815    }
816
817    #[test]
818    fn test_bucket_view_counter_metadata() {
819        let bucket = Bucket::parse(b"b0:1|c", UnixTimestamp::from_secs(5000)).unwrap();
820        assert_eq!(bucket.metadata, BucketView::new(&bucket).metadata());
821    }
822
823    #[test]
824    fn test_bucket_view_select_distribution() {
825        let bucket = Bucket::parse(b"b2:1:2:3:5:5|d", UnixTimestamp::from_secs(5000)).unwrap();
826
827        let view = BucketView::new(&bucket).select(0..3).unwrap();
828        assert_eq!(view.len(), 3);
829        assert_eq!(
830            view.value(),
831            BucketViewValue::Distribution(&[1.into(), 2.into(), 3.into()])
832        );
833        let view = BucketView::new(&bucket).select(1..3).unwrap();
834        assert_eq!(view.len(), 2);
835        assert_eq!(
836            view.value(),
837            BucketViewValue::Distribution(&[2.into(), 3.into()])
838        );
839        let view = BucketView::new(&bucket).select(1..5).unwrap();
840        assert_eq!(view.len(), 4);
841        assert_eq!(
842            view.value(),
843            BucketViewValue::Distribution(&[2.into(), 3.into(), 5.into(), 5.into()])
844        );
845    }
846
847    #[test]
848    fn test_bucket_view_select_invalid_distribution() {
849        let bucket = Bucket::parse(b"b2:1:2:3:5:5|d", UnixTimestamp::from_secs(5000)).unwrap();
850
851        assert!(BucketView::new(&bucket).select(0..6).is_none());
852        assert!(BucketView::new(&bucket).select(5..6).is_none());
853        assert!(BucketView::new(&bucket).select(77..99).is_none());
854    }
855
856    #[test]
857    fn test_bucket_view_distribution_metadata() {
858        let bucket = Bucket::parse(b"b2:1:2:3:5:5|d", UnixTimestamp::from_secs(5000)).unwrap();
859        assert_eq!(bucket.metadata, BucketView::new(&bucket).metadata());
860
861        assert_eq!(
862            BucketView::new(&bucket).select(0..3).unwrap().metadata(),
863            bucket.metadata
864        );
865
866        let m = BucketView::new(&bucket).select(1..3).unwrap().metadata();
867        assert_eq!(
868            m,
869            BucketMetadata {
870                merges: 0,
871                ..bucket.metadata
872            }
873        );
874    }
875
876    #[test]
877    fn test_bucket_view_select_set() {
878        let bucket = Bucket::parse(b"b3:42:75|s", UnixTimestamp::from_secs(5000)).unwrap();
879        let s = [42, 75].into();
880
881        let view = BucketView::new(&bucket).select(0..2).unwrap();
882        assert_eq!(view.len(), 2);
883        assert_eq!(view.value(), BucketViewValue::Set(SetView::new(&s, 0..2)));
884        let view = BucketView::new(&bucket).select(1..2).unwrap();
885        assert_eq!(view.len(), 1);
886        assert_eq!(view.value(), BucketViewValue::Set(SetView::new(&s, 1..2)));
887        let view = BucketView::new(&bucket).select(0..1).unwrap();
888        assert_eq!(view.len(), 1);
889        assert_eq!(view.value(), BucketViewValue::Set(SetView::new(&s, 0..1)));
890    }
891
892    #[test]
893    fn test_bucket_view_select_invalid_set() {
894        let bucket = Bucket::parse(b"b3:42:75|s", UnixTimestamp::from_secs(5000)).unwrap();
895
896        assert!(BucketView::new(&bucket).select(0..3).is_none());
897        assert!(BucketView::new(&bucket).select(2..5).is_none());
898        assert!(BucketView::new(&bucket).select(77..99).is_none());
899    }
900
901    #[test]
902    fn test_bucket_view_set_metadata() {
903        let bucket = Bucket::parse(b"b2:1:2:3:5:5|s", UnixTimestamp::from_secs(5000)).unwrap();
904        assert_eq!(bucket.metadata, BucketView::new(&bucket).metadata());
905
906        assert_eq!(
907            BucketView::new(&bucket).select(0..3).unwrap().metadata(),
908            bucket.metadata
909        );
910
911        let m = BucketView::new(&bucket).select(1..3).unwrap().metadata();
912        assert_eq!(
913            m,
914            BucketMetadata {
915                merges: 0,
916                ..bucket.metadata
917            }
918        );
919    }
920
921    #[test]
922    fn test_bucket_view_select_gauge() {
923        let bucket =
924            Bucket::parse(b"b4:25:17:42:220:85|g", UnixTimestamp::from_secs(5000)).unwrap();
925
926        let view = BucketView::new(&bucket).select(0..5).unwrap();
927        assert_eq!(view.len(), 5);
928        assert_eq!(
929            view.value(),
930            BucketViewValue::Gauge(GaugeValue {
931                last: 25.into(),
932                min: 17.into(),
933                max: 42.into(),
934                sum: 220.into(),
935                count: 85
936            })
937        );
938    }
939
940    #[test]
941    fn test_bucket_view_select_invalid_gauge() {
942        let bucket =
943            Bucket::parse(b"b4:25:17:42:220:85|g", UnixTimestamp::from_secs(5000)).unwrap();
944
945        assert!(BucketView::new(&bucket).select(0..1).is_none());
946        assert!(BucketView::new(&bucket).select(0..4).is_none());
947        assert!(BucketView::new(&bucket).select(5..5).is_none());
948        assert!(BucketView::new(&bucket).select(5..6).is_none());
949    }
950
951    #[test]
952    fn test_bucket_view_gauge_metadata() {
953        let bucket =
954            Bucket::parse(b"b4:25:17:42:220:85|g", UnixTimestamp::from_secs(5000)).unwrap();
955        assert_eq!(BucketView::new(&bucket).metadata(), bucket.metadata);
956    }
957
958    fn buckets<T>(s: &[u8]) -> T
959    where
960        T: FromIterator<Bucket>,
961    {
962        let timestamp = UnixTimestamp::from_secs(5000);
963        Bucket::parse_all(s, timestamp)
964            .collect::<Result<T, _>>()
965            .unwrap()
966    }
967
968    #[test]
969    fn test_buckets_view_empty() {
970        let view = BucketsView::new(Vec::new());
971        assert_eq!(view.len(), 0);
972        assert!(view.is_empty());
973        let partials = view.iter().collect::<Vec<_>>();
974        assert!(partials.is_empty());
975    }
976
977    #[test]
978    fn test_buckets_view_iter_full() {
979        let buckets: Vec<_> = buckets(b"b0:1|c\nb1:12|c\nb2:1:2:3:5:5|d\nb3:42:75|s");
980
981        let view = BucketsView::from(&buckets);
982        assert_eq!(view.len(), 4);
983        assert!(!view.is_empty());
984        let partials = view.iter().collect::<Vec<_>>();
985        assert_eq!(partials.len(), 4);
986        assert_eq!(partials[0].name(), "c:custom/b0@none");
987        assert_eq!(partials[0].len(), 1);
988        assert_eq!(partials[1].name(), "c:custom/b1@none");
989        assert_eq!(partials[1].len(), 1);
990        assert_eq!(partials[2].name(), "d:custom/b2@none");
991        assert_eq!(partials[2].len(), 5);
992        assert_eq!(partials[3].name(), "s:custom/b3@none");
993        assert_eq!(partials[3].len(), 2);
994    }
995
996    #[test]
997    fn test_buckets_view_iter_partial_end() {
998        let buckets: Vec<_> = buckets(b"b0:1|c\nb1:12|c\nb2:1:2:3:5:5|d\nb3:42:75|s");
999
1000        let mut view = BucketsView::new(&buckets);
1001        view.end.slice = 2;
1002        view.end.bucket = 3;
1003        assert_eq!(view.len(), 3);
1004        assert!(!view.is_empty());
1005
1006        let partials = view.iter().collect::<Vec<_>>();
1007        assert_eq!(partials.len(), 3);
1008        assert_eq!(partials[0].name(), "c:custom/b0@none");
1009        assert_eq!(partials[0].len(), 1);
1010        assert_eq!(partials[1].name(), "c:custom/b1@none");
1011        assert_eq!(partials[1].len(), 1);
1012        assert_eq!(partials[2].name(), "d:custom/b2@none");
1013        assert_eq!(partials[2].len(), 3);
1014    }
1015
1016    #[test]
1017    fn test_buckets_view_iter_partial_start() {
1018        let buckets: Vec<_> = buckets(b"b0:1|c\nb1:12|c\nb2:1:2:3:5:5|d\nb3:42:75|s");
1019
1020        let mut view = BucketsView::new(buckets);
1021        view.start.slice = 2;
1022        view.start.bucket = 3;
1023        assert_eq!(view.len(), 2);
1024        assert!(!view.is_empty());
1025
1026        let partials = view.iter().collect::<Vec<_>>();
1027        assert_eq!(partials.len(), 2);
1028        assert_eq!(partials[0].name(), "d:custom/b2@none");
1029        assert_eq!(partials[0].len(), 2);
1030        assert_eq!(partials[1].name(), "s:custom/b3@none");
1031        assert_eq!(partials[1].len(), 2);
1032    }
1033
1034    #[test]
1035    fn test_buckets_view_iter_partial_start_and_end() {
1036        let buckets: Vec<_> = buckets(b"b0:1|c\nb1:12|c\nb2:1:2:3:5:5|d\nb3:42:75|s");
1037
1038        let mut view = BucketsView::from(&buckets);
1039        view.start.slice = 2;
1040        view.start.bucket = 1;
1041        view.end.slice = 3;
1042        view.end.bucket = 1;
1043        assert_eq!(view.len(), 2);
1044        assert!(!view.is_empty());
1045
1046        let partials = view.iter().collect::<Vec<_>>();
1047        assert_eq!(partials.len(), 2);
1048        assert_eq!(partials[0].name(), "d:custom/b2@none");
1049        assert_eq!(partials[0].len(), 4);
1050        assert_eq!(partials[1].name(), "s:custom/b3@none");
1051        assert_eq!(partials[1].len(), 1);
1052    }
1053
1054    #[test]
1055    fn test_buckets_view_by_size_small() {
1056        let buckets: Vec<_> = buckets(b"b0:1|c\nb1:12|c\nb2:1:2:3:5:5|d\nb3:42:75|s");
1057
1058        let view = BucketsView::from(&buckets);
1059        let partials = view
1060            .by_size(100)
1061            .map(|bv| {
1062                let len: usize = bv.iter().map(|b| b.len()).sum();
1063                let size: usize = bv.iter().map(|b| b.estimated_size()).sum();
1064                (len, size)
1065            })
1066            .collect::<Vec<_>>();
1067
1068        assert_eq!(partials, vec![(1, 74), (1, 74), (4, 98), (1, 74), (2, 82),]);
1069    }
1070
1071    #[test]
1072    fn test_buckets_view_by_size_small_as_arc() {
1073        let buckets: Arc<_> = buckets(b"b0:1|c\nb1:12|c\nb2:1:2:3:5:5|d\nb3:42:75|s");
1074
1075        let view = BucketsView::new(buckets);
1076        let partials = view
1077            .by_size(100)
1078            .map(|bv| {
1079                let len: usize = bv.iter().map(|b| b.len()).sum();
1080                let size: usize = bv.iter().map(|b| b.estimated_size()).sum();
1081                (len, size)
1082            })
1083            .collect::<Vec<_>>();
1084
1085        assert_eq!(partials, vec![(1, 74), (1, 74), (4, 98), (1, 74), (2, 82),]);
1086    }
1087
1088    #[test]
1089    fn test_buckets_view_by_size_one_split() {
1090        let buckets: Vec<_> = buckets(b"b0:1|c\nb1:12|c\nb2:1:2:3:5:5|d\nb3:42:75|s");
1091
1092        let view = BucketsView::from(&buckets);
1093        let partials = view
1094            .by_size(250)
1095            .map(|bv| {
1096                let len: usize = bv.iter().map(|b| b.len()).sum();
1097                let size: usize = bv.iter().map(|b| b.estimated_size()).sum();
1098                (len, size)
1099            })
1100            .collect::<Vec<_>>();
1101
1102        assert_eq!(partials, vec![(6, 246), (3, 156)]);
1103    }
1104
1105    #[test]
1106    fn test_buckets_view_by_size_no_split() {
1107        let buckets: Vec<_> = buckets(b"b0:1|c\nb1:12|c\nb2:1:2:3:5:5|d\nb3:42:75|s");
1108
1109        let view = BucketsView::from(&buckets);
1110        let partials = view
1111            .by_size(500)
1112            .map(|bv| {
1113                let len: usize = bv.iter().map(|b| b.len()).sum();
1114                let size: usize = bv.iter().map(|b| b.estimated_size()).sum();
1115                (len, size)
1116            })
1117            .collect::<Vec<_>>();
1118
1119        assert_eq!(partials, vec![(9, 336)]);
1120    }
1121
1122    #[test]
1123    fn test_buckets_view_by_size_no_too_small_no_bucket_fits() {
1124        let buckets: Vec<_> = buckets(b"b0:1|c\nb1:12|c\nb2:1:2:3:5:5|d\nb3:42:75|s");
1125
1126        let view = BucketsView::from(&buckets);
1127        let partials = view
1128            .by_size(50) // Too small, a bucket requires at least 74 bytes
1129            .count();
1130
1131        assert_eq!(partials, 0);
1132    }
1133
1134    #[test]
1135    fn test_buckets_view_by_size_do_not_split_gauge() {
1136        let buckets: Vec<_> = buckets(b"transactions/foo:25:17:42:220:85|g");
1137
1138        let view = BucketsView::from(&buckets);
1139        // 100 is too small to fit the gauge, but it is big enough to fit half a gauage,
1140        // make sure the gauge does not actually get split.
1141        let partials = view.by_size(100).count();
1142
1143        assert_eq!(partials, 0);
1144    }
1145
1146    #[test]
1147    fn test_buckets_view_serialize_full() {
1148        let buckets: Vec<_> = buckets(b"b0:1|c\nb1:12|c|#foo,bar:baz\nb2:1:2:3:5:5|d|#foo,bar:baz b3:42:75|s\ntransactions/foo:25:17:42:220:85|g");
1149
1150        assert_eq!(
1151            serde_json::to_string(&BucketsView::from(&buckets)).unwrap(),
1152            serde_json::to_string(&buckets).unwrap()
1153        );
1154    }
1155
1156    #[test]
1157    fn test_buckets_view_serialize_partial() {
1158        let buckets: Arc<[_]> = buckets(
1159            b"b1:12|c|#foo,bar:baz\nb2:1:2:3:5:5|d|#foo,bar:baz\nb3:42:75|s\nb4:25:17:42:220:85|g",
1160        );
1161
1162        let view = BucketsView::new(buckets);
1163        // This creates 4 separate views, spanning 1-2, 2-3, 3, 4.
1164        // 4 is too big to fit into a view together with the remainder of 3.
1165        let partials = view.by_size(178).collect::<Vec<_>>();
1166
1167        assert_json_snapshot!(partials);
1168    }
1169
1170    #[test]
1171    fn test_split_repeatedly() {
1172        let bucket = Bucket::parse(b"b2:1:2:3:5:5|d", UnixTimestamp::from_secs(5000)).unwrap();
1173        let view = BucketView::new(&bucket);
1174
1175        // construct this so that we can take 2 values per split and result in 3 parts.
1176        let split_size = view.estimated_base_size() + 2 * AVG_VALUE_SIZE;
1177
1178        let (first, rest) = view.split(split_size, None);
1179        let (second, rest) = rest.unwrap().split(split_size, None);
1180        let (third, rest) = rest.unwrap().split(split_size, None);
1181
1182        assert_eq!(first.unwrap().range, 0..2);
1183        assert_eq!(second.unwrap().range, 2..4);
1184        assert_eq!(third.unwrap().range, 4..5);
1185        assert!(rest.is_none());
1186    }
1187}