relay_metrics/
bucket.rs

1use std::collections::{BTreeMap, BTreeSet};
2use std::iter::FusedIterator;
3use std::{fmt, mem};
4
5use relay_common::time::UnixTimestamp;
6use relay_protocol::FiniteF64;
7use serde::{Deserialize, Serialize};
8use smallvec::SmallVec;
9
10use crate::ParseMetricError;
11use crate::protocol::{
12    self, CounterType, DistributionType, GaugeType, MetricName, MetricResourceIdentifier,
13    MetricType, SetType, hash_set_value,
14};
15
/// Separates multiple values within the value section of a statsd line, for example
/// `endpoint.hits:4.5:21:17.0|c`.
const VALUE_SEPARATOR: char = ':';
17
/// Type of [`Bucket::tags`].
///
/// Maps tag keys to tag values. As a [`BTreeMap`], entries are kept sorted by key.
pub type MetricTags = BTreeMap<String, String>;
20
/// A snapshot of gauge values aggregated within a [`Bucket`].
///
/// See [`BucketValue::Gauge`] for the statsd format and the aggregation semantics.
#[derive(Clone, Copy, Debug, PartialEq, Deserialize, Serialize)]
pub struct GaugeValue {
    /// The last value reported in the bucket.
    ///
    /// This aggregation is not commutative: merging snapshots in a different order can leave a
    /// different value here.
    pub last: GaugeType,
    /// The minimum value reported in the bucket.
    pub min: GaugeType,
    /// The maximum value reported in the bucket.
    pub max: GaugeType,
    /// The sum of all values reported in the bucket.
    pub sum: GaugeType,
    /// The number of times this bucket was updated with a new value.
    pub count: u64,
}
37
38impl GaugeValue {
39    /// Creates a gauge snapshot from a single value.
40    pub fn single(value: GaugeType) -> Self {
41        Self {
42            last: value,
43            min: value,
44            max: value,
45            sum: value,
46            count: 1,
47        }
48    }
49
50    /// Inserts a new value into the gauge.
51    pub fn insert(&mut self, value: GaugeType) {
52        self.last = value;
53        self.min = self.min.min(value);
54        self.max = self.max.max(value);
55        self.sum = self.sum.saturating_add(value);
56        self.count += 1;
57    }
58
59    /// Merges two gauge snapshots.
60    pub fn merge(&mut self, other: Self) {
61        self.last = other.last;
62        self.min = self.min.min(other.min);
63        self.max = self.max.max(other.max);
64        self.sum = self.sum.saturating_add(other.sum);
65        self.count += other.count;
66    }
67
68    /// Returns the average of all values reported in this bucket.
69    pub fn avg(&self) -> Option<GaugeType> {
70        self.sum / FiniteF64::new(self.count as f64)?
71    }
72}
73
/// A distribution of values within a [`Bucket`].
///
/// Distributions logically store a histogram of values. Based on individual reported values,
/// distributions allow to query the maximum, minimum, or average of the reported values, as well as
/// statistical quantiles.
///
/// # Example
///
/// ```
/// use relay_metrics::dist;
///
/// let mut dist = dist![1, 1, 1, 2];
/// dist.push(5.into());
/// dist.extend(std::iter::repeat(3.into()).take(7));
/// ```
///
/// Logically, this distribution is equivalent to this visualization:
///
/// ```plain
/// value | count
/// 1.0   | ***
/// 2.0   | *
/// 3.0   | *******
/// 4.0   |
/// 5.0   | *
/// ```
///
/// # Serialization
///
/// Distributions serialize as lists of floating point values. The list contains one entry for each
/// value in the distribution, including duplicates.
///
/// # Storage
///
/// Backed by a [`SmallVec`] with an inline capacity of three values, so small distributions do
/// not require a separate heap allocation.
pub type DistributionValue = SmallVec<[DistributionType; 3]>;
106
// Hidden re-export so that the `dist!` macro below can invoke `smallvec!` through
// `$crate::_smallvec`.
#[doc(hidden)]
pub use smallvec::smallvec as _smallvec;

/// Creates a [`DistributionValue`] containing the given arguments.
///
/// `dist!` allows `DistributionValue` to be defined with the same syntax as array expressions.
/// Every argument is converted with `DistributionType::from`, and trailing commas are accepted.
///
/// # Example
///
/// ```
/// let dist = relay_metrics::dist![1, 2];
/// ```
#[macro_export]
macro_rules! dist {
    ($($x:expr),*$(,)*) => {
        $crate::_smallvec!($($crate::DistributionType::from($x)),*) as $crate::DistributionValue
    };
}
125
/// A set of unique values.
///
/// Set values can be specified as strings in the submission protocol. They are always hashed
/// into a 32-bit value and the original value is dropped. If the submission protocol contains a
/// 32-bit integer, it will be used directly, instead.
///
/// As a [`BTreeSet`], each hash is stored at most once and entries are kept in ascending order.
///
/// See the [bucket docs](crate::Bucket) for more information on set hashing.
pub type SetValue = BTreeSet<SetType>;
134
/// The [aggregated value](Bucket::value) of a metric bucket.
///
/// Serialized as an adjacently tagged enum: the short type name of the variant (`"c"`, `"d"`,
/// `"s"`, or `"g"`) is stored under `type`, and the payload under `value`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(tag = "type", content = "value")]
pub enum BucketValue {
    /// Counts instances of an event ([`MetricType::Counter`]).
    ///
    /// Counters can be incremented and decremented. The default operation is to increment a counter
    /// by `1`, although increments by larger values are equally possible.
    ///
    /// # Statsd Format
    ///
    /// Counters are declared as `"c"`. Alternatively, `"m"` is allowed.
    ///
    /// There can be a variable number of floating point values. If more than one value is given,
    /// the values are summed into a single counter value:
    ///
    /// ```text
    /// endpoint.hits:4.5:21:17.0|c
    /// ```
    ///
    /// # Serialization
    ///
    /// This variant serializes to a double precision float.
    ///
    /// # Aggregation
    ///
    /// Counters aggregate by folding individual values into a single sum value per bucket. The sum
    /// is ingested and stored directly.
    #[serde(rename = "c")]
    Counter(CounterType),

    /// Builds a statistical distribution over values reported ([`MetricType::Distribution`]).
    ///
    /// Based on individual reported values, distributions allow to query the maximum, minimum, or
    /// average of the reported values, as well as statistical quantiles. With an increasing number
    /// of values in the distribution, its accuracy becomes approximate.
    ///
    /// # Statsd Format
    ///
    /// Distributions are declared as `"d"`. Alternatively, `"ms"` is allowed.
    // NOTE(review): this line previously read "Alternatively, `"d"` and `"ms"` are allowed",
    // listing `"d"` twice. The duplicate may have been meant as a further alias (e.g. `"h"`);
    // confirm against the `MetricType` parser and extend the list if so.
    ///
    /// There can be a variable number of floating point values. These values are collected directly
    /// in a list per bucket.
    ///
    /// ```text
    /// endpoint.response_time@millisecond:36:49:57:68|d
    /// ```
    ///
    /// # Serialization
    ///
    /// This variant serializes to a list of double precision floats, see [`DistributionValue`].
    ///
    /// # Aggregation
    ///
    /// During ingestion, all individual reported values are collected in a lossless format. In
    /// storage, these values are compressed into data sketches that allow to query quantiles.
    /// Separately, the count and sum of the reported values is stored, which makes distributions a
    /// strict superset of counters.
    #[serde(rename = "d")]
    Distribution(DistributionValue),

    /// Counts the number of unique reported values.
    ///
    /// Sets allow sending arbitrary discrete values, including strings, and store the deduplicated
    /// count. With an increasing number of unique values in the set, its accuracy becomes
    /// approximate. It is not possible to query individual values from a set.
    ///
    /// # Statsd Format
    ///
    /// Sets are declared as `"s"`. Values are either 32-bit integer hashes or arbitrary strings,
    /// which are hashed on ingestion. Duplicate values collapse into a single entry.
    ///
    /// ```text
    /// endpoint.users:3182887624:4267882815|s
    /// endpoint.users:e2546e4c-ecd0-43ad-ae27-87960e57a658|s
    /// ```
    ///
    /// # Serialization
    ///
    /// This variant serializes to a list of 32-bit integers.
    ///
    /// # Aggregation
    ///
    /// Set values are internally represented as 32-bit integer hashes of the original value. These
    /// hashes can be ingested directly as seen in the first example above. If raw strings are sent,
    /// they will be hashed on-the-fly.
    ///
    /// Internally, set metrics are stored in data sketches that expose an approximate cardinality.
    #[serde(rename = "s")]
    Set(SetValue),

    /// Stores absolute snapshots of values.
    ///
    /// In addition to plain [counters](Self::Counter), gauges store a snapshot of the maximum,
    /// minimum and sum of all values, as well as the last reported value. Note that the "last"
    /// component of this aggregation is not commutative. Which value is preserved as last value is
    /// implementation-defined.
    ///
    /// # Statsd Format
    ///
    /// Gauges are declared as `"g"`. There are two ways to ingest gauges:
    ///  1. As a single value. In this case, the provided value is assumed as the last, minimum,
    ///     maximum, and the sum.
    ///  2. As a sequence of five values in the order: `last`, `min`, `max`, `sum`, `count`.
    ///
    /// ```text
    /// endpoint.parallel_requests:25|g
    /// endpoint.parallel_requests:25:17:42:220:85|g
    /// ```
    ///
    /// # Serialization
    ///
    /// This variant serializes to a structure with named fields, see [`GaugeValue`].
    ///
    /// # Aggregation
    ///
    /// Gauges aggregate by folding each of the components based on their semantics:
    ///  - `last` assumes the newly added value
    ///  - `min` retains the smaller value
    ///  - `max` retains the larger value
    ///  - `sum` adds the new value to the existing sum
    ///  - `count` adds the count of the newly added gauge (defaulting to `1`)
    #[serde(rename = "g")]
    Gauge(GaugeValue),
}
260
261impl BucketValue {
262    /// Returns a bucket value representing a counter with the given value.
263    pub fn counter(value: CounterType) -> Self {
264        Self::Counter(value)
265    }
266
267    /// Returns a bucket value representing a distribution with a single given value.
268    pub fn distribution(value: DistributionType) -> Self {
269        Self::Distribution(dist![value])
270    }
271
272    /// Returns a bucket value representing a set with a single given hash value.
273    pub fn set(value: SetType) -> Self {
274        Self::Set(std::iter::once(value).collect())
275    }
276
277    /// Returns a bucket value representing a set with a single given string value.
278    pub fn set_from_str(string: &str) -> Self {
279        Self::set(hash_set_value(string))
280    }
281
282    /// Returns a bucket value representing a set with a single given value.
283    pub fn set_from_display(display: impl fmt::Display) -> Self {
284        Self::set(hash_set_value(&display.to_string()))
285    }
286
287    /// Returns a bucket value representing a gauge with a single given value.
288    pub fn gauge(value: GaugeType) -> Self {
289        Self::Gauge(GaugeValue::single(value))
290    }
291
292    /// Returns the type of this value.
293    pub fn ty(&self) -> MetricType {
294        match self {
295            Self::Counter(_) => MetricType::Counter,
296            Self::Distribution(_) => MetricType::Distribution,
297            Self::Set(_) => MetricType::Set,
298            Self::Gauge(_) => MetricType::Gauge,
299        }
300    }
301
302    /// Returns the number of raw data points in this value.
303    pub fn len(&self) -> usize {
304        match self {
305            BucketValue::Counter(_) => 1,
306            BucketValue::Distribution(distribution) => distribution.len(),
307            BucketValue::Set(set) => set.len(),
308            BucketValue::Gauge(_) => 5,
309        }
310    }
311
312    /// Returns `true` if this bucket contains no values.
313    pub fn is_empty(&self) -> bool {
314        self.len() == 0
315    }
316
317    /// Estimates the number of bytes needed to encode the bucket value.
318    ///
319    /// Note that this does not necessarily match the exact memory footprint of the value,
320    /// because data structures have a memory overhead.
321    pub fn cost(&self) -> usize {
322        // Beside the size of [`BucketValue`], we also need to account for the cost of values
323        // allocated dynamically.
324        let allocated_cost = match self {
325            Self::Counter(_) => 0,
326            Self::Set(s) => mem::size_of::<SetType>() * s.len(),
327            Self::Gauge(_) => 0,
328            Self::Distribution(d) => d.len() * mem::size_of::<DistributionType>(),
329        };
330
331        mem::size_of::<Self>() + allocated_cost
332    }
333
334    /// Merges the given `bucket_value` into `self`.
335    ///
336    /// Returns `Ok(())` if the two bucket values can be merged. This is the case when both bucket
337    /// values are of the same variant. Otherwise, this returns `Err(other)`.
338    pub fn merge(&mut self, other: Self) -> Result<(), Self> {
339        match (self, other) {
340            (Self::Counter(slf), Self::Counter(other)) => *slf = slf.saturating_add(other),
341            (Self::Distribution(slf), Self::Distribution(other)) => slf.extend_from_slice(&other),
342            (Self::Set(slf), Self::Set(other)) => slf.extend(other),
343            (Self::Gauge(slf), Self::Gauge(other)) => slf.merge(other),
344            (_, other) => return Err(other),
345        }
346
347        Ok(())
348    }
349}
350
351/// Parses a list of counter values separated by colons and sums them up.
352fn parse_counter(string: &str) -> Option<CounterType> {
353    let mut sum = CounterType::default();
354    for component in string.split(VALUE_SEPARATOR) {
355        sum = sum.saturating_add(component.parse().ok()?);
356    }
357    Some(sum)
358}
359
360/// Parses a distribution from a list of floating point values separated by colons.
361fn parse_distribution(string: &str) -> Option<DistributionValue> {
362    let mut dist = DistributionValue::default();
363    for component in string.split(VALUE_SEPARATOR) {
364        dist.push(component.parse().ok()?);
365    }
366    Some(dist)
367}
368
369/// Parses a set of hashed numeric values.
370fn parse_set(string: &str) -> Option<SetValue> {
371    let mut set = SetValue::default();
372    for component in string.split(VALUE_SEPARATOR) {
373        let hash = component
374            .parse()
375            .unwrap_or_else(|_| protocol::hash_set_value(component));
376        set.insert(hash);
377    }
378    Some(set)
379}
380
381/// Parses a gauge from a value.
382///
383/// The gauge can either be given as a single floating point value, or as a list of exactly five
384/// values in the order of [`GaugeValue`] fields.
385fn parse_gauge(string: &str) -> Option<GaugeValue> {
386    let mut components = string.split(VALUE_SEPARATOR);
387
388    let last = components.next()?.parse().ok()?;
389    Some(if let Some(min) = components.next() {
390        GaugeValue {
391            last,
392            min: min.parse().ok()?,
393            max: components.next()?.parse().ok()?,
394            sum: components.next()?.parse().ok()?,
395            count: components.next()?.parse().ok()?,
396        }
397    } else {
398        GaugeValue::single(last)
399    })
400}
401
402/// Parses tags in the format `tag1,tag2:value`.
403///
404/// Tag values are optional. For tags with missing values, an empty `""` value is assumed.
405fn parse_tags(string: &str) -> Option<MetricTags> {
406    let mut map = MetricTags::new();
407
408    for pair in string.split(',') {
409        let mut name_value = pair.splitn(2, ':');
410
411        let name = name_value.next()?;
412        if !protocol::is_valid_tag_key(name) {
413            continue;
414        }
415
416        if let Ok(value) = protocol::unescape_tag_value(name_value.next().unwrap_or_default()) {
417            map.insert(name.to_owned(), value);
418        }
419    }
420
421    Some(map)
422}
423
424/// Parses a unix UTC timestamp.
425fn parse_timestamp(string: &str) -> Option<UnixTimestamp> {
426    string.parse().ok().map(UnixTimestamp::from_secs)
427}
428
/// An aggregation of metric values.
///
/// As opposed to single metric values, bucket aggregations can carry multiple values. See
/// [`BucketValue`] for a description of how values are aggregated in buckets. Values are aggregated
/// by metric name, type, time window, and all tags. Particularly, this allows metrics to have the
/// same name even if their types differ.
///
/// See the [crate documentation](crate) for general information on Metrics.
///
/// # Values
///
/// The contents of a bucket, especially their representation and serialization, depend on the
/// metric type:
///
/// - [Counters](BucketValue::Counter) store a single value, serialized as floating point.
/// - [Distributions](BucketValue::Distribution) store all reported values, including duplicates.
/// - [Sets](BucketValue::Set) store the unique (hashed) reported values.
/// - [Gauges](BucketValue::Gauge) store a snapshot of reported values, see [`GaugeValue`].
///
/// # Submission Protocol
///
/// ```text
/// <name>[@unit]:<value>[:<value>...]|<type>[|#<tag_key>:<tag_value>,<tag>][|T<timestamp>]
/// ```
///
/// See the [field documentation](Bucket#fields) for more information on the components. An example
/// submission looks like this:
///
/// ```text
#[doc = include_str!("../tests/fixtures/buckets.statsd.txt")]
/// ```
///
/// To parse a submission payload, use [`Bucket::parse_all`].
///
/// # JSON Representation
///
/// Alternatively to the submission protocol, metrics can be represented as structured data in JSON.
/// The data type of the `value` field is determined by the metric type.
///
/// In addition to the submission protocol, buckets have a required [`width`](Self::width) field in
/// their JSON representation.
///
/// ```json
#[doc = include_str!("../tests/fixtures/buckets.json")]
/// ```
///
/// To parse a JSON payload, use [`serde_json`].
///
/// # Hashing of Sets
///
/// Set values can be specified as strings in the submission protocol. They are always hashed
/// into a 32-bit value and the original value is dropped. If the submission protocol contains a
/// 32-bit integer, it will be used directly, instead.
///
/// **Example**:
///
/// ```text
#[doc = include_str!("../tests/fixtures/set.statsd.txt")]
/// ```
///
/// The above submission is represented as:
///
/// ```json
#[doc = include_str!("../tests/fixtures/set.json")]
/// ```
#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
pub struct Bucket {
    /// The start time of the bucket's time window.
    ///
    /// If a timestamp is not supplied as part of the submission payload, the default timestamp
    /// supplied to [`Bucket::parse`] or [`Bucket::parse_all`] is associated with the metric. It is
    /// then aligned with the aggregation window.
    ///
    /// # Statsd Format
    ///
    /// In statsd, timestamps are part of the `|`-separated list following values. Timestamps start
    /// with the literal character `'T'` followed by the UNIX timestamp.
    ///
    /// The timestamp must be a positive integer in decimal notation representing the value of the
    /// UNIX timestamp.
    ///
    /// # Example
    ///
    /// ```text
    /// endpoint.hits:1|c|T1615889440
    /// ```
    pub timestamp: UnixTimestamp,

    /// The length of the time window in seconds.
    ///
    /// To initialize a new bucket, choose `0` as width. Once the bucket is tracked by Relay's
    /// aggregator, the width is aligned with configuration for the namespace and the timestamp is
    /// adjusted accordingly.
    ///
    /// # Statsd Format
    ///
    /// Specifying the bucket width in statsd is not supported.
    pub width: u64,

    /// The name of the metric in MRI (metric resource identifier) format.
    ///
    /// MRIs have the format `<type>:<ns>/<name>@<unit>`. See [`MetricResourceIdentifier`] for
    /// information on fields and representations.
    ///
    /// # Statsd Format
    ///
    /// MRIs are sent in a more relaxed format: `[<namespace>/]name[@unit]`. The value type is not
    /// part of the metric name and namespaces are optional.
    ///
    /// Namespaces and units must consist of ASCII characters and match the regular expression
    /// `/\w+/`. The name component of MRIs consist of unicode characters and must match the
    /// regular expression `/\w[\w\-.]*/`. Note that the name must begin with a letter.
    ///
    /// Per convention, dots separate metric names into components, where the leading components are
    /// considered namespaces and the final component is the name of the metric within its
    /// namespace.
    ///
    /// # Examples
    ///
    /// ```text
    /// endpoint.hits:1|c
    /// custom/endpoint.hits:1|c
    /// custom/endpoint.duration@millisecond:21.5|d
    /// ```
    pub name: MetricName,

    /// The type and aggregated values of this bucket.
    ///
    /// Buckets support multiple values that are aggregated and can be accessed using a range of
    /// aggregation functions depending on the value type. While always a variable number of values
    /// can be sent in, some aggregations reduce the raw values to a fixed set of aggregates.
    ///
    /// See [`BucketValue`] for more examples and semantics.
    ///
    /// # Statsd Payload
    ///
    /// The bucket value and its type are specified in separate fields following the metric name in
    /// the format: `<name>:<value>|<type>`. Values must be base-10 floating point numbers with
    /// optional decimal places. Set values may alternatively be arbitrary strings, which are
    /// hashed.
    ///
    /// It is possible to pack multiple values into a single datagram, but note that the type and
    /// the value representation must match for this. Refer to the [`BucketValue`] docs for more
    /// examples.
    ///
    /// # Example
    ///
    /// ```text
    /// endpoint.hits:21|c
    /// endpoint.hits:4.5|c
    /// ```
    #[serde(flatten)]
    pub value: BucketValue,

    /// A list of tags adding dimensions to the metric for filtering and aggregation.
    ///
    /// Tags allow to compute separate aggregates to filter or group metric values by any number of
    /// dimensions. Tags consist of a unique tag key and one associated value. For tags with missing
    /// values, an empty `""` value is assumed at query time.
    ///
    /// # Statsd Format
    ///
    /// Tags are preceded with a hash `#` and specified in a comma (`,`) separated list. Each tag
    /// can either be a tag name, or a `name:value` combination. Tags are optional and can be
    /// omitted.
    ///
    /// Tag keys are restricted to ASCII characters and must match the regular expression
    /// `/[\w\-.\/]+/`.
    ///
    /// Tag values can contain unicode characters with the following escaping rules:
    ///  - Tab is escaped as `\t`.
    ///  - Carriage return is escaped as `\r`.
    ///  - Line feed is escaped as `\n`.
    ///  - Backslash is escaped as `\\`.
    ///  - Commas and pipes are given unicode escapes in the form `\u{2c}` and `\u{7c}`,
    ///    respectively.
    ///
    /// # Example
    ///
    /// ```text
    /// endpoint.hits:1|c|#route:user_index,environment:production,release:1.4.0
    /// ```
    #[serde(default, skip_serializing_if = "MetricTags::is_empty")]
    pub tags: MetricTags,

    /// Relay internal metadata for a metric bucket.
    ///
    /// The metadata contains meta information about the metric bucket itself,
    /// for example how many times this bucket has been merged in total.
    #[serde(default, skip_serializing_if = "BucketMetadata::is_default")]
    pub metadata: BucketMetadata,
}
620
621impl Bucket {
622    /// Parses a statsd-compatible payload.
623    ///
624    /// ```text
625    /// [<ns>/]<name>[@<unit>]:<value>|<type>[|#<tags>]`
626    /// ```
627    fn parse_str(string: &str, timestamp: UnixTimestamp) -> Option<Self> {
628        let mut components = string.split('|');
629
630        let (mri_str, values_str) = components.next()?.split_once(':')?;
631        let ty = components.next().and_then(|s| s.parse().ok())?;
632
633        let mri = MetricResourceIdentifier::parse_with_type(mri_str, ty).ok()?;
634        let value = match ty {
635            MetricType::Counter => BucketValue::Counter(parse_counter(values_str)?),
636            MetricType::Distribution => BucketValue::Distribution(parse_distribution(values_str)?),
637            MetricType::Set => BucketValue::Set(parse_set(values_str)?),
638            MetricType::Gauge => BucketValue::Gauge(parse_gauge(values_str)?),
639        };
640
641        let mut bucket = Bucket {
642            timestamp,
643            width: 0,
644            name: mri.to_string().into(),
645            value,
646            tags: Default::default(),
647            metadata: Default::default(),
648        };
649
650        for component in components {
651            match component.chars().next() {
652                Some('#') => {
653                    bucket.tags = parse_tags(component.get(1..)?)?;
654                }
655                Some('T') => {
656                    bucket.timestamp = parse_timestamp(component.get(1..)?)?;
657                }
658                _ => (),
659            }
660        }
661
662        Some(bucket)
663    }
664
665    /// Parses a single metric aggregate from the raw protocol.
666    ///
667    /// See the [`Bucket`] for more information on the protocol.
668    ///
669    /// # Example
670    ///
671    /// ```
672    /// use relay_metrics::{Bucket, UnixTimestamp};
673    ///
674    /// let bucket = Bucket::parse(b"response_time@millisecond:57|d", UnixTimestamp::now())
675    ///     .expect("metric should parse");
676    /// ```
677    pub fn parse(slice: &[u8], timestamp: UnixTimestamp) -> Result<Self, ParseMetricError> {
678        let string = std::str::from_utf8(slice).map_err(|_| ParseMetricError)?;
679        Self::parse_str(string, timestamp).ok_or(ParseMetricError)
680    }
681
682    /// Parses a set of metric aggregates from the raw protocol.
683    ///
684    /// Returns a metric result for each line in `slice`, ignoring empty lines. Both UNIX newlines
685    /// (`\n`) and Windows newlines (`\r\n`) are supported.
686    ///
687    /// It is possible to continue consuming the iterator after `Err` is yielded.
688    ///
689    /// See [`Bucket`] for more information on the protocol.
690    ///
691    /// # Example
692    ///
693    /// ```
694    /// use relay_metrics::{Bucket, UnixTimestamp};
695    ///
696    /// let data = br#"
697    /// endpoint.response_time@millisecond:57|d
698    /// endpoint.hits:1|c
699    /// "#;
700    ///
701    /// for metric_result in Bucket::parse_all(data, UnixTimestamp::now()) {
702    ///     let bucket = metric_result.expect("metric should parse");
703    ///     println!("Metric {}: {:?}", bucket.name, bucket.value);
704    /// }
705    /// ```
706    pub fn parse_all(slice: &[u8], timestamp: UnixTimestamp) -> ParseBuckets<'_> {
707        ParseBuckets { slice, timestamp }
708    }
709
710    /// Returns the value of the specified tag if it exists.
711    pub fn tag(&self, name: &str) -> Option<&str> {
712        self.tags.get(name).map(|s| s.as_str())
713    }
714
715    /// Removes the value of the specified tag.
716    ///
717    /// If the tag exists, the removed value is returned.
718    pub fn remove_tag(&mut self, name: &str) -> Option<String> {
719        self.tags.remove(name)
720    }
721}
722
/// Relay internal metadata for a metric bucket.
#[derive(Clone, Copy, Debug, PartialEq, Deserialize, Serialize)]
pub struct BucketMetadata {
    /// How many times the bucket was merged.
    ///
    /// Creating a new bucket is the first merge.
    /// Merging two buckets sums the amount of merges.
    ///
    /// For example: Merging two un-merged buckets will yield a total
    /// of `2` merges.
    ///
    /// Due to how Relay aggregates metrics and later splits them into multiple
    /// buckets again, the amount of merges can be zero.
    /// When splitting a bucket the total volume of the bucket may only be attributed
    /// to one part or distributed across the resulting buckets, in either case
    /// values of `0` are possible.
    pub merges: u32,

    /// Received timestamp of the first metric in this bucket.
    ///
    /// This field should be set to the time in which the first metric of a specific bucket was
    /// received in the outermost internal Relay.
    pub received_at: Option<UnixTimestamp>,

    /// Is `true` if this metric was extracted from a sampled/indexed envelope item.
    ///
    /// The final dynamic sampling decision is always made in processing Relays.
    /// If a metric was extracted from an item which is sampled (i.e. retained by dynamic
    /// sampling), this flag is `true`.
    ///
    /// Since these metrics from samples carry additional information, e.g. they don't
    /// require rate limiting since the sample they've been extracted from was already
    /// rate limited, this flag must be included in the aggregation key when aggregating buckets.
    ///
    /// This field is skipped by serde: it is never serialized and is `false` after
    /// deserialization.
    #[serde(skip)]
    pub extracted_from_indexed: bool,
}
758
759impl BucketMetadata {
760    /// Creates a fresh metadata instance.
761    ///
762    /// The new metadata is initialized with `1` merge and a given `received_at` timestamp.
763    pub fn new(received_at: UnixTimestamp) -> Self {
764        Self {
765            merges: 1,
766            received_at: Some(received_at),
767            extracted_from_indexed: false,
768        }
769    }
770
771    /// Whether the metadata does not contain more information than the default.
772    pub fn is_default(&self) -> bool {
773        &Self::default() == self
774    }
775
776    /// Merges another metadata object into the current one.
777    pub fn merge(&mut self, other: Self) {
778        self.merges = self.merges.saturating_add(other.merges);
779        self.received_at = match (self.received_at, other.received_at) {
780            (Some(received_at), None) => Some(received_at),
781            (None, Some(received_at)) => Some(received_at),
782            (left, right) => left.min(right),
783        };
784    }
785}
786
787impl Default for BucketMetadata {
788    fn default() -> Self {
789        Self {
790            merges: 1,
791            received_at: None,
792            extracted_from_indexed: false,
793        }
794    }
795}
796
/// Iterator over parsed metrics returned from [`Bucket::parse_all`].
///
/// Yields one result per non-empty line of the input.
#[derive(Clone, Debug)]
pub struct ParseBuckets<'a> {
    /// The input that has not been consumed yet.
    slice: &'a [u8],
    /// Default timestamp handed to every parsed line; a line's own `T` section overrides it.
    timestamp: UnixTimestamp,
}
803
804impl Default for ParseBuckets<'_> {
805    fn default() -> Self {
806        Self {
807            slice: &[],
808            // The timestamp will never be returned.
809            timestamp: UnixTimestamp::from_secs(4711),
810        }
811    }
812}
813
814impl Iterator for ParseBuckets<'_> {
815    type Item = Result<Bucket, ParseMetricError>;
816
817    fn next(&mut self) -> Option<Self::Item> {
818        loop {
819            if self.slice.is_empty() {
820                return None;
821            }
822
823            let mut split = self.slice.splitn(2, |&b| b == b'\n');
824            let current = split.next()?;
825            self.slice = split.next().unwrap_or_default();
826
827            let string = match std::str::from_utf8(current) {
828                Ok(string) => string.strip_suffix('\r').unwrap_or(string),
829                Err(_) => return Some(Err(ParseMetricError)),
830            };
831
832            if !string.is_empty() {
833                return Some(Bucket::parse_str(string, self.timestamp).ok_or(ParseMetricError));
834            }
835        }
836    }
837}
838
// `next` returns `None` only once `slice` is empty, and `slice` is never refilled afterwards,
// so every later call also returns `None`.
impl FusedIterator for ParseBuckets<'_> {}
840
#[cfg(test)]
mod tests {
    //! Tests for bucket value merging, statsd parsing, JSON (de)serialization and metadata
    //! merging.

    use similar_asserts::assert_eq;

    use crate::protocol::{DurationUnit, MetricUnit};

    use super::*;

    #[test]
    fn test_distribution_value_size() {
        // DistributionValue uses a SmallVec internally to prevent an additional allocation and
        // indirection in cases where it needs to store only a small number of items. This is
        // enabled by a comparably large `GaugeValue`, which stores five atoms. Ensure that the
        // `DistributionValue`'s size does not exceed that of `GaugeValue`.
        assert!(
            std::mem::size_of::<DistributionValue>() <= std::mem::size_of::<GaugeValue>(),
            "distribution value should not exceed gauge {}",
            std::mem::size_of::<DistributionValue>()
        );
    }

    #[test]
    fn test_bucket_value_merge_counter() {
        let mut value = BucketValue::Counter(42.into());
        value.merge(BucketValue::Counter(43.into())).unwrap();
        assert_eq!(value, BucketValue::Counter(85.into()));
    }

    #[test]
    fn test_bucket_value_merge_distribution() {
        let mut value = BucketValue::Distribution(dist![1, 2, 3]);
        value.merge(BucketValue::Distribution(dist![2, 4])).unwrap();
        assert_eq!(value, BucketValue::Distribution(dist![1, 2, 3, 2, 4]));
    }

    #[test]
    fn test_bucket_value_merge_set() {
        let mut value = BucketValue::Set(vec![1, 2].into_iter().collect());
        value.merge(BucketValue::Set([2, 3].into())).unwrap();
        assert_eq!(value, BucketValue::Set(vec![1, 2, 3].into_iter().collect()));
    }

    #[test]
    fn test_bucket_value_merge_gauge() {
        let mut value = BucketValue::Gauge(GaugeValue::single(42.into()));
        value.merge(BucketValue::gauge(43.into())).unwrap();

        assert_eq!(
            value,
            BucketValue::Gauge(GaugeValue {
                last: 43.into(),
                min: 42.into(),
                max: 43.into(),
                sum: 85.into(),
                count: 2,
            })
        );
    }

    #[test]
    fn test_parse_garbage() {
        let s = "x23-408j17z4232@#34d\nc3456y7^😎";
        let timestamp = UnixTimestamp::from_secs(4711);
        let result = Bucket::parse(s.as_bytes(), timestamp);
        assert!(result.is_err());
    }

    #[test]
    fn test_parse_counter() {
        let s = "spans/foo:42|c";
        let timestamp = UnixTimestamp::from_secs(4711);
        let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap();
        insta::assert_debug_snapshot!(metric, @r#"
        Bucket {
            timestamp: UnixTimestamp(4711),
            width: 0,
            name: MetricName(
                "c:spans/foo@none",
            ),
            value: Counter(
                42.0,
            ),
            tags: {},
            metadata: BucketMetadata {
                merges: 1,
                received_at: None,
                extracted_from_indexed: false,
            },
        }
        "#);
    }

    #[test]
    fn test_parse_counter_packed() {
        let s = "spans/foo:42:17:21|c";
        let timestamp = UnixTimestamp::from_secs(4711);
        let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap();
        assert_eq!(metric.value, BucketValue::Counter(80.into()));
    }

    #[test]
    fn test_parse_distribution() {
        let s = "spans/foo:17.5|d";
        let timestamp = UnixTimestamp::from_secs(4711);
        let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap();
        insta::assert_debug_snapshot!(metric, @r#"
        Bucket {
            timestamp: UnixTimestamp(4711),
            width: 0,
            name: MetricName(
                "d:spans/foo@none",
            ),
            value: Distribution(
                [
                    17.5,
                ],
            ),
            tags: {},
            metadata: BucketMetadata {
                merges: 1,
                received_at: None,
                extracted_from_indexed: false,
            },
        }
        "#);
    }

    #[test]
    fn test_parse_distribution_packed() {
        let s = "transactions/foo:17.5:21.9:42.7|d";
        let timestamp = UnixTimestamp::from_secs(4711);
        let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap();
        assert_eq!(
            metric.value,
            BucketValue::Distribution(dist![
                FiniteF64::new(17.5).unwrap(),
                FiniteF64::new(21.9).unwrap(),
                FiniteF64::new(42.7).unwrap()
            ])
        );
    }

    #[test]
    fn test_parse_histogram() {
        let s = "transactions/foo:17.5|h"; // common alias for distribution
        let timestamp = UnixTimestamp::from_secs(4711);
        let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap();
        assert_eq!(
            metric.value,
            BucketValue::Distribution(dist![FiniteF64::new(17.5).unwrap()])
        );
    }

    #[test]
    fn test_parse_set() {
        let s = "spans/foo:4267882815|s";
        let timestamp = UnixTimestamp::from_secs(4711);
        let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap();
        insta::assert_debug_snapshot!(metric, @r#"
        Bucket {
            timestamp: UnixTimestamp(4711),
            width: 0,
            name: MetricName(
                "s:spans/foo@none",
            ),
            value: Set(
                {
                    4267882815,
                },
            ),
            tags: {},
            metadata: BucketMetadata {
                merges: 1,
                received_at: None,
                extracted_from_indexed: false,
            },
        }
        "#);
    }

    #[test]
    fn test_parse_set_hashed() {
        let s = "transactions/foo:e2546e4c-ecd0-43ad-ae27-87960e57a658|s";
        let timestamp = UnixTimestamp::from_secs(4711);
        let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap();
        assert_eq!(metric.value, BucketValue::Set([4267882815].into()));
    }

    #[test]
    fn test_parse_set_hashed_packed() {
        let s = "transactions/foo:e2546e4c-ecd0-43ad-ae27-87960e57a658:00449b66-d91f-4fb8-b324-4c8bdf2499f6|s";
        let timestamp = UnixTimestamp::from_secs(4711);
        let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap();
        assert_eq!(
            metric.value,
            BucketValue::Set([181348692, 4267882815].into())
        );
    }

    #[test]
    fn test_parse_set_packed() {
        let s = "transactions/foo:3182887624:4267882815|s";
        let timestamp = UnixTimestamp::from_secs(4711);
        let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap();
        assert_eq!(
            metric.value,
            BucketValue::Set([3182887624, 4267882815].into())
        )
    }

    #[test]
    fn test_parse_gauge() {
        let s = "spans/foo:42|g";
        let timestamp = UnixTimestamp::from_secs(4711);
        let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap();
        insta::assert_debug_snapshot!(metric, @r#"
        Bucket {
            timestamp: UnixTimestamp(4711),
            width: 0,
            name: MetricName(
                "g:spans/foo@none",
            ),
            value: Gauge(
                GaugeValue {
                    last: 42.0,
                    min: 42.0,
                    max: 42.0,
                    sum: 42.0,
                    count: 1,
                },
            ),
            tags: {},
            metadata: BucketMetadata {
                merges: 1,
                received_at: None,
                extracted_from_indexed: false,
            },
        }
        "#);
    }

    #[test]
    fn test_parse_gauge_packed() {
        let s = "spans/foo:25:17:42:220:85|g";
        let timestamp = UnixTimestamp::from_secs(4711);
        let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap();
        insta::assert_debug_snapshot!(metric, @r#"
        Bucket {
            timestamp: UnixTimestamp(4711),
            width: 0,
            name: MetricName(
                "g:spans/foo@none",
            ),
            value: Gauge(
                GaugeValue {
                    last: 25.0,
                    min: 17.0,
                    max: 42.0,
                    sum: 220.0,
                    count: 85,
                },
            ),
            tags: {},
            metadata: BucketMetadata {
                merges: 1,
                received_at: None,
                extracted_from_indexed: false,
            },
        }
        "#);
    }

    #[test]
    fn test_parse_implicit_namespace() {
        let s = "foo:42|c";
        let timestamp = UnixTimestamp::from_secs(4711);
        let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap();
        insta::assert_debug_snapshot!(metric, @r###"
        Bucket {
            timestamp: UnixTimestamp(4711),
            width: 0,
            name: MetricName(
                "c:custom/foo@none",
            ),
            value: Counter(
                42.0,
            ),
            tags: {},
            metadata: BucketMetadata {
                merges: 1,
                received_at: None,
                extracted_from_indexed: false,
            },
        }
        "###);
    }

    #[test]
    fn test_parse_unit() {
        let s = "transactions/foo@second:17.5|d";
        let timestamp = UnixTimestamp::from_secs(4711);
        let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap();
        let mri = MetricResourceIdentifier::parse(&metric.name).unwrap();
        assert_eq!(mri.unit, MetricUnit::Duration(DurationUnit::Second));
    }

    #[test]
    fn test_parse_unit_regression() {
        let s = "transactions/foo@s:17.5|d";
        let timestamp = UnixTimestamp::from_secs(4711);
        let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap();
        let mri = MetricResourceIdentifier::parse(&metric.name).unwrap();
        assert_eq!(mri.unit, MetricUnit::Duration(DurationUnit::Second));
    }

    #[test]
    fn test_parse_tags() {
        let s = "transactions/foo:17.5|d|#foo,bar:baz";
        let timestamp = UnixTimestamp::from_secs(4711);
        let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap();
        insta::assert_debug_snapshot!(metric.tags, @r###"
        {
            "bar": "baz",
            "foo": "",
        }
        "###);
    }

    #[test]
    fn test_parse_tags_escaped() {
        let s = "transactions/foo:17.5|d|#foo:😅\\u{2c}🚀";
        let timestamp = UnixTimestamp::from_secs(4711);
        let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap();
        insta::assert_debug_snapshot!(metric.tags, @r###"
        {
            "foo": "😅,🚀",
        }
        "###);
    }

    #[test]
    fn test_parse_timestamp() {
        let s = "transactions/foo:17.5|d|T1615889449";
        let timestamp = UnixTimestamp::from_secs(4711);
        let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap();
        assert_eq!(metric.timestamp, UnixTimestamp::from_secs(1615889449));
    }

    #[test]
    fn test_parse_sample_rate() {
        // Sample rate should be ignored
        let s = "transactions/foo:17.5|d|@0.1";
        let timestamp = UnixTimestamp::from_secs(4711);
        Bucket::parse(s.as_bytes(), timestamp).unwrap();
    }

    #[test]
    fn test_parse_invalid_name() {
        let s = "foo#bar:42|c";
        let timestamp = UnixTimestamp::from_secs(4711);
        let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap();
        assert_eq!(metric.name.as_ref(), "c:custom/foo_bar@none");
    }

    #[test]
    fn test_parse_empty_name() {
        let s = ":42|c";
        let timestamp = UnixTimestamp::from_secs(4711);
        let metric = Bucket::parse(s.as_bytes(), timestamp);
        assert!(metric.is_err());
    }

    #[test]
    fn test_parse_invalid_name_with_leading_digit() {
        let s = "64bit:42|c";
        let timestamp = UnixTimestamp::from_secs(4711);
        let metric = Bucket::parse(s.as_bytes(), timestamp);
        assert!(metric.is_err());
    }

    #[test]
    fn test_parse_all() {
        let s = "transactions/foo:42|c\nbar:17|c";
        let timestamp = UnixTimestamp::from_secs(4711);

        let metrics: Vec<Bucket> = Bucket::parse_all(s.as_bytes(), timestamp)
            .collect::<Result<_, _>>()
            .unwrap();

        assert_eq!(metrics.len(), 2);
    }

    #[test]
    fn test_parse_all_crlf() {
        let s = "transactions/foo:42|c\r\nbar:17|c";
        let timestamp = UnixTimestamp::from_secs(4711);

        let metrics: Vec<Bucket> = Bucket::parse_all(s.as_bytes(), timestamp)
            .collect::<Result<_, _>>()
            .unwrap();

        assert_eq!(metrics.len(), 2);
    }

    #[test]
    fn test_parse_all_empty_lines() {
        let s = "transactions/foo:42|c\n\n\nbar:17|c";
        let timestamp = UnixTimestamp::from_secs(4711);

        // Collect into a `Result` so an error item fails the test instead of being counted.
        let metrics: Vec<Bucket> = Bucket::parse_all(s.as_bytes(), timestamp)
            .collect::<Result<_, _>>()
            .unwrap();

        assert_eq!(metrics.len(), 2);
    }

    #[test]
    fn test_parse_all_trailing() {
        let s = "transactions/foo:42|c\nbar:17|c\n";
        let timestamp = UnixTimestamp::from_secs(4711);

        // Collect into a `Result` so an error item fails the test instead of being counted.
        let metrics: Vec<Bucket> = Bucket::parse_all(s.as_bytes(), timestamp)
            .collect::<Result<_, _>>()
            .unwrap();

        assert_eq!(metrics.len(), 2);
    }

    #[test]
    fn test_parse_all_invalid_utf8() {
        // A line that is not valid UTF-8 yields an error, and parsing resumes on the next line.
        let s = b"transactions/foo:42|c\n\xff\nbar:17|c";
        let timestamp = UnixTimestamp::from_secs(4711);

        let results: Vec<_> = Bucket::parse_all(&s[..], timestamp).collect();

        assert_eq!(results.len(), 3);
        assert!(results[0].is_ok());
        assert!(results[1].is_err());
        assert!(results[2].is_ok());
    }

    #[test]
    fn test_metrics_docs() {
        let text = include_str!("../tests/fixtures/buckets.statsd.txt").trim_end();
        let json = include_str!("../tests/fixtures/buckets.json").trim_end();

        let timestamp = UnixTimestamp::from_secs(0);
        let statsd_metrics = Bucket::parse_all(text.as_bytes(), timestamp)
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        let json_metrics: Vec<Bucket> = serde_json::from_str(json).unwrap();

        assert_eq!(statsd_metrics, json_metrics);
    }

    #[test]
    fn test_set_docs() {
        let text = include_str!("../tests/fixtures/set.statsd.txt").trim_end();
        let json = include_str!("../tests/fixtures/set.json").trim_end();

        let timestamp = UnixTimestamp::from_secs(1615889449);
        let statsd_metric = Bucket::parse(text.as_bytes(), timestamp).unwrap();
        let json_metric: Bucket = serde_json::from_str(json).unwrap();

        assert_eq!(statsd_metric, json_metric);
    }

    #[test]
    fn test_parse_buckets() {
        let json = r#"[
          {
            "name": "endpoint.response_time",
            "unit": "millisecond",
            "value": [36, 49, 57, 68],
            "type": "d",
            "timestamp": 1615889440,
            "width": 10,
            "tags": {
                "route": "user_index"
            },
            "metadata": {
                "merges": 1,
                "received_at": 1615889440
            }
          }
        ]"#;

        let buckets = serde_json::from_str::<Vec<Bucket>>(json).unwrap();

        insta::assert_debug_snapshot!(buckets, @r###"
        [
            Bucket {
                timestamp: UnixTimestamp(1615889440),
                width: 10,
                name: MetricName(
                    "endpoint.response_time",
                ),
                value: Distribution(
                    [
                        36.0,
                        49.0,
                        57.0,
                        68.0,
                    ],
                ),
                tags: {
                    "route": "user_index",
                },
                metadata: BucketMetadata {
                    merges: 1,
                    received_at: Some(
                        UnixTimestamp(1615889440),
                    ),
                    extracted_from_indexed: false,
                },
            },
        ]
        "###);
    }

    #[test]
    fn test_parse_bucket_defaults() {
        let json = r#"[
          {
            "name": "endpoint.hits",
            "value": 4,
            "type": "c",
            "timestamp": 1615889440,
            "width": 10,
            "metadata": {
                "merges": 1,
                "received_at": 1615889440
            }
          }
        ]"#;

        let buckets = serde_json::from_str::<Vec<Bucket>>(json).unwrap();

        insta::assert_debug_snapshot!(buckets, @r###"
        [
            Bucket {
                timestamp: UnixTimestamp(1615889440),
                width: 10,
                name: MetricName(
                    "endpoint.hits",
                ),
                value: Counter(
                    4.0,
                ),
                tags: {},
                metadata: BucketMetadata {
                    merges: 1,
                    received_at: Some(
                        UnixTimestamp(1615889440),
                    ),
                    extracted_from_indexed: false,
                },
            },
        ]
        "###);
    }

    #[test]
    fn test_buckets_roundtrip() {
        let json = r#"[
  {
    "timestamp": 1615889440,
    "width": 10,
    "name": "endpoint.response_time",
    "type": "d",
    "value": [
      36.0,
      49.0,
      57.0,
      68.0
    ],
    "tags": {
      "route": "user_index"
    }
  },
  {
    "timestamp": 1615889440,
    "width": 10,
    "name": "endpoint.hits",
    "type": "c",
    "value": 4.0,
    "tags": {
      "route": "user_index"
    }
  },
  {
    "timestamp": 1615889440,
    "width": 10,
    "name": "endpoint.parallel_requests",
    "type": "g",
    "value": {
      "last": 25.0,
      "min": 17.0,
      "max": 42.0,
      "sum": 2210.0,
      "count": 85
    }
  },
  {
    "timestamp": 1615889440,
    "width": 10,
    "name": "endpoint.users",
    "type": "s",
    "value": [
      3182887624,
      4267882815
    ],
    "tags": {
      "route": "user_index"
    }
  }
]"#;

        let buckets = serde_json::from_str::<Vec<Bucket>>(json).unwrap();
        let serialized = serde_json::to_string_pretty(&buckets).unwrap();
        assert_eq!(json, serialized);
    }

    #[test]
    fn test_bucket_docs_roundtrip() {
        let json = include_str!("../tests/fixtures/buckets.json")
            .trim_end()
            .replace("\r\n", "\n");
        let buckets = serde_json::from_str::<Vec<Bucket>>(&json).unwrap();

        let serialized = serde_json::to_string_pretty(&buckets).unwrap();
        assert_eq!(json, serialized);
    }

    #[test]
    fn test_bucket_metadata_merge() {
        let mut metadata = BucketMetadata::default();

        let other_metadata = BucketMetadata::default();
        metadata.merge(other_metadata);
        assert_eq!(
            metadata,
            BucketMetadata {
                merges: 2,
                received_at: None,
                extracted_from_indexed: false,
            }
        );

        let other_metadata = BucketMetadata::new(UnixTimestamp::from_secs(10));
        metadata.merge(other_metadata);
        assert_eq!(
            metadata,
            BucketMetadata {
                merges: 3,
                received_at: Some(UnixTimestamp::from_secs(10)),
                extracted_from_indexed: false,
            }
        );

        // A later timestamp does not replace an earlier one.
        let other_metadata = BucketMetadata::new(UnixTimestamp::from_secs(20));
        metadata.merge(other_metadata);
        assert_eq!(
            metadata,
            BucketMetadata {
                merges: 4,
                received_at: Some(UnixTimestamp::from_secs(10)),
                extracted_from_indexed: false,
            }
        );

        // An earlier timestamp replaces a later one.
        let other_metadata = BucketMetadata::new(UnixTimestamp::from_secs(5));
        metadata.merge(other_metadata);
        assert_eq!(
            metadata,
            BucketMetadata {
                merges: 5,
                received_at: Some(UnixTimestamp::from_secs(5)),
                extracted_from_indexed: false,
            }
        );
    }
}