relay_metrics/
bucket.rs

1use std::collections::{BTreeMap, BTreeSet};
2use std::hash::Hash;
3use std::iter::FusedIterator;
4use std::{fmt, mem};
5
6use hash32::{FnvHasher, Hasher as _};
7use relay_cardinality::CardinalityItem;
8use relay_common::time::UnixTimestamp;
9use serde::{Deserialize, Serialize};
10use smallvec::SmallVec;
11
12use crate::protocol::{
13    self, hash_set_value, CounterType, DistributionType, GaugeType, MetricName,
14    MetricResourceIdentifier, MetricType, SetType,
15};
16use crate::{FiniteF64, MetricNamespace, ParseMetricError};
17
/// Separator between the individual values of a metric in the statsd submission format.
///
/// For example, `4.5:21:17.0` contains three values.
const VALUE_SEPARATOR: char = ':';
19
/// Type of [`Bucket::tags`].
///
/// Maps tag keys to tag values. Since this is a [`BTreeMap`], tags are kept ordered by key.
pub type MetricTags = BTreeMap<String, String>;
22
/// A snapshot of values within a [`Bucket`].
///
/// Instead of retaining every reported value, a gauge only keeps the aggregates listed below.
#[derive(Clone, Copy, Debug, PartialEq, Deserialize, Serialize)]
pub struct GaugeValue {
    /// The last value reported in the bucket.
    ///
    /// This aggregation is not commutative.
    pub last: GaugeType,
    /// The minimum value reported in the bucket.
    pub min: GaugeType,
    /// The maximum value reported in the bucket.
    pub max: GaugeType,
    /// The sum of all values reported in the bucket.
    pub sum: GaugeType,
    /// The number of times this bucket was updated with a new value.
    pub count: u64,
}
39
40impl GaugeValue {
41    /// Creates a gauge snapshot from a single value.
42    pub fn single(value: GaugeType) -> Self {
43        Self {
44            last: value,
45            min: value,
46            max: value,
47            sum: value,
48            count: 1,
49        }
50    }
51
52    /// Inserts a new value into the gauge.
53    pub fn insert(&mut self, value: GaugeType) {
54        self.last = value;
55        self.min = self.min.min(value);
56        self.max = self.max.max(value);
57        self.sum = self.sum.saturating_add(value);
58        self.count += 1;
59    }
60
61    /// Merges two gauge snapshots.
62    pub fn merge(&mut self, other: Self) {
63        self.last = other.last;
64        self.min = self.min.min(other.min);
65        self.max = self.max.max(other.max);
66        self.sum = self.sum.saturating_add(other.sum);
67        self.count += other.count;
68    }
69
70    /// Returns the average of all values reported in this bucket.
71    pub fn avg(&self) -> Option<GaugeType> {
72        self.sum / FiniteF64::new(self.count as f64)?
73    }
74}
75
/// A distribution of values within a [`Bucket`].
///
/// Distributions logically store a histogram of values. Based on individual reported values,
/// distributions allow to query the maximum, minimum, or average of the reported values, as well as
/// statistical quantiles.
///
/// # Example
///
/// ```
/// use relay_metrics::dist;
///
/// let mut dist = dist![1, 1, 1, 2];
/// dist.push(5.into());
/// dist.extend(std::iter::repeat(3.into()).take(7));
/// ```
///
/// Logically, this distribution is equivalent to this visualization:
///
/// ```plain
/// value | count
/// 1.0   | ***
/// 2.0   | *
/// 3.0   | *******
/// 4.0   |
/// 5.0   | *
/// ```
///
/// # Serialization
///
/// Distributions serialize as lists of floating point values. The list contains one entry for each
/// value in the distribution, including duplicates.
///
/// # Storage
///
/// The values are kept in a [`SmallVec`] with an inline capacity of three elements.
pub type DistributionValue = SmallVec<[DistributionType; 3]>;
108
// Re-exported under a hidden name so that the exported `dist!` macro can refer to the `smallvec!`
// macro as `$crate::_smallvec!`.
#[doc(hidden)]
pub use smallvec::smallvec as _smallvec;
111
/// Builds a [`DistributionValue`] from a comma-separated list of values.
///
/// `dist!` mirrors the syntax of array expressions. Every argument is converted with
/// `DistributionType::from`, so plain numeric literals can be passed directly.
///
/// # Example
///
/// ```
/// let dist = relay_metrics::dist![1, 2];
/// ```
#[macro_export]
macro_rules! dist {
    ($($item:expr),*$(,)*) => {
        $crate::_smallvec!($($crate::DistributionType::from($item)),*) as $crate::DistributionValue
    };
}
127
/// A set of unique values.
///
/// Set values can be specified as strings in the submission protocol. They are always hashed
/// into a 32-bit value and the original value is dropped. If the submission protocol contains a
/// 32-bit integer, it will be used directly, instead.
///
/// Since this is a [`BTreeSet`], duplicate values are stored only once.
///
/// See the [bucket docs](crate::Bucket) for more information on set hashing.
pub type SetValue = BTreeSet<SetType>;
136
/// The [aggregated value](Bucket::value) of a metric bucket.
///
/// Serialized as an adjacently tagged enum: the variant is stored in the `type` field and the
/// payload in the `value` field.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(tag = "type", content = "value")]
pub enum BucketValue {
    /// Counts instances of an event ([`MetricType::Counter`]).
    ///
    /// Counters can be incremented and decremented. The default operation is to increment a counter
    /// by `1`, although increments by larger values are equally possible.
    ///
    /// # Statsd Format
    ///
    /// Counters are declared as `"c"`. Alternatively, `"m"` is allowed.
    ///
    /// There can be a variable number of floating point values. If more than one value is given,
    /// the values are summed into a single counter value:
    ///
    /// ```text
    /// endpoint.hits:4.5:21:17.0|c
    /// ```
    ///
    /// # Serialization
    ///
    /// This variant serializes to a double precision float.
    ///
    /// # Aggregation
    ///
    /// Counters aggregate by folding individual values into a single sum value per bucket. The sum
    /// is ingested and stored directly.
    #[serde(rename = "c")]
    Counter(CounterType),

    // NOTE(review): the alias list below repeats `"d"`; confirm the intended alternative type
    // identifiers against the `MetricType` parser.
    /// Builds a statistical distribution over values reported ([`MetricType::Distribution`]).
    ///
    /// Based on individual reported values, distributions allow to query the maximum, minimum, or
    /// average of the reported values, as well as statistical quantiles. With an increasing number
    /// of values in the distribution, its accuracy becomes approximate.
    ///
    /// # Statsd Format
    ///
    /// Distributions are declared as `"d"`. Alternatively, `"d"` and `"ms"` are allowed.
    ///
    /// There can be a variable number of floating point values. These values are collected directly
    /// in a list per bucket.
    ///
    /// ```text
    /// endpoint.response_time@millisecond:36:49:57:68|d
    /// ```
    ///
    /// # Serialization
    ///
    /// This variant serializes to a list of double precision floats, see [`DistributionValue`].
    ///
    /// # Aggregation
    ///
    /// During ingestion, all individual reported values are collected in a lossless format. In
    /// storage, these values are compressed into data sketches that allow to query quantiles.
    /// Separately, the count and sum of the reported values is stored, which makes distributions a
    /// strict superset of counters.
    #[serde(rename = "d")]
    Distribution(DistributionValue),

    /// Counts the number of unique reported values.
    ///
    /// Sets allow sending arbitrary discrete values, including strings, and store the deduplicated
    /// count. With an increasing number of unique values in the set, its accuracy becomes
    /// approximate. It is not possible to query individual values from a set.
    ///
    /// # Statsd Format
    ///
    /// Sets are declared as `"s"`. Values in the list should be deduplicated.
    ///
    /// ```text
    /// endpoint.users:3182887624:4267882815|s
    /// endpoint.users:e2546e4c-ecd0-43ad-ae27-87960e57a658|s
    /// ```
    ///
    /// # Serialization
    ///
    /// This variant serializes to a list of 32-bit integers.
    ///
    /// # Aggregation
    ///
    /// Set values are internally represented as 32-bit integer hashes of the original value. These
    /// hashes can be ingested directly as seen in the first example above. If raw strings are sent,
    /// they will be hashed on-the-fly.
    ///
    /// Internally, set metrics are stored in data sketches that expose an approximate cardinality.
    #[serde(rename = "s")]
    Set(SetValue),

    /// Stores absolute snapshots of values.
    ///
    /// In addition to plain [counters](Self::Counter), gauges store a snapshot of the maximum,
    /// minimum and sum of all values, as well as the last reported value. Note that the "last"
    /// component of this aggregation is not commutative. Which value is preserved as last value is
    /// implementation-defined.
    ///
    /// # Statsd Format
    ///
    /// Gauges are declared as `"g"`. There are two ways to ingest gauges:
    ///  1. As a single value. In this case, the provided value is assumed as the last, minimum,
    ///     maximum, and the sum.
    ///  2. As a sequence of five values in the order: `last`, `min`, `max`, `sum`, `count`.
    ///
    /// ```text
    /// endpoint.parallel_requests:25|g
    /// endpoint.parallel_requests:25:17:42:220:85|g
    /// ```
    ///
    /// # Serialization
    ///
    /// This variant serializes to a structure with named fields, see [`GaugeValue`].
    ///
    /// # Aggregation
    ///
    /// Gauges aggregate by folding each of the components based on their semantics:
    ///  - `last` assumes the newly added value
    ///  - `min` retains the smaller value
    ///  - `max` retains the larger value
    ///  - `sum` adds the new value to the existing sum
    ///  - `count` adds the count of the newly added gauge (defaulting to `1`)
    #[serde(rename = "g")]
    Gauge(GaugeValue),
}
262
263impl BucketValue {
264    /// Returns a bucket value representing a counter with the given value.
265    pub fn counter(value: CounterType) -> Self {
266        Self::Counter(value)
267    }
268
269    /// Returns a bucket value representing a distribution with a single given value.
270    pub fn distribution(value: DistributionType) -> Self {
271        Self::Distribution(dist![value])
272    }
273
274    /// Returns a bucket value representing a set with a single given hash value.
275    pub fn set(value: SetType) -> Self {
276        Self::Set(std::iter::once(value).collect())
277    }
278
279    /// Returns a bucket value representing a set with a single given string value.
280    pub fn set_from_str(string: &str) -> Self {
281        Self::set(hash_set_value(string))
282    }
283
284    /// Returns a bucket value representing a set with a single given value.
285    pub fn set_from_display(display: impl fmt::Display) -> Self {
286        Self::set(hash_set_value(&display.to_string()))
287    }
288
289    /// Returns a bucket value representing a gauge with a single given value.
290    pub fn gauge(value: GaugeType) -> Self {
291        Self::Gauge(GaugeValue::single(value))
292    }
293
294    /// Returns the type of this value.
295    pub fn ty(&self) -> MetricType {
296        match self {
297            Self::Counter(_) => MetricType::Counter,
298            Self::Distribution(_) => MetricType::Distribution,
299            Self::Set(_) => MetricType::Set,
300            Self::Gauge(_) => MetricType::Gauge,
301        }
302    }
303
304    /// Returns the number of raw data points in this value.
305    pub fn len(&self) -> usize {
306        match self {
307            BucketValue::Counter(_) => 1,
308            BucketValue::Distribution(distribution) => distribution.len(),
309            BucketValue::Set(set) => set.len(),
310            BucketValue::Gauge(_) => 5,
311        }
312    }
313
314    /// Returns `true` if this bucket contains no values.
315    pub fn is_empty(&self) -> bool {
316        self.len() == 0
317    }
318
319    /// Estimates the number of bytes needed to encode the bucket value.
320    ///
321    /// Note that this does not necessarily match the exact memory footprint of the value,
322    /// because data structures have a memory overhead.
323    pub fn cost(&self) -> usize {
324        // Beside the size of [`BucketValue`], we also need to account for the cost of values
325        // allocated dynamically.
326        let allocated_cost = match self {
327            Self::Counter(_) => 0,
328            Self::Set(s) => mem::size_of::<SetType>() * s.len(),
329            Self::Gauge(_) => 0,
330            Self::Distribution(d) => d.len() * mem::size_of::<DistributionType>(),
331        };
332
333        mem::size_of::<Self>() + allocated_cost
334    }
335
336    /// Merges the given `bucket_value` into `self`.
337    ///
338    /// Returns `Ok(())` if the two bucket values can be merged. This is the case when both bucket
339    /// values are of the same variant. Otherwise, this returns `Err(other)`.
340    pub fn merge(&mut self, other: Self) -> Result<(), Self> {
341        match (self, other) {
342            (Self::Counter(slf), Self::Counter(other)) => *slf = slf.saturating_add(other),
343            (Self::Distribution(slf), Self::Distribution(other)) => slf.extend_from_slice(&other),
344            (Self::Set(slf), Self::Set(other)) => slf.extend(other),
345            (Self::Gauge(slf), Self::Gauge(other)) => slf.merge(other),
346            (_, other) => return Err(other),
347        }
348
349        Ok(())
350    }
351}
352
353/// Parses a list of counter values separated by colons and sums them up.
354fn parse_counter(string: &str) -> Option<CounterType> {
355    let mut sum = CounterType::default();
356    for component in string.split(VALUE_SEPARATOR) {
357        sum = sum.saturating_add(component.parse().ok()?);
358    }
359    Some(sum)
360}
361
362/// Parses a distribution from a list of floating point values separated by colons.
363fn parse_distribution(string: &str) -> Option<DistributionValue> {
364    let mut dist = DistributionValue::default();
365    for component in string.split(VALUE_SEPARATOR) {
366        dist.push(component.parse().ok()?);
367    }
368    Some(dist)
369}
370
371/// Parses a set of hashed numeric values.
372fn parse_set(string: &str) -> Option<SetValue> {
373    let mut set = SetValue::default();
374    for component in string.split(VALUE_SEPARATOR) {
375        let hash = component
376            .parse()
377            .unwrap_or_else(|_| protocol::hash_set_value(component));
378        set.insert(hash);
379    }
380    Some(set)
381}
382
383/// Parses a gauge from a value.
384///
385/// The gauge can either be given as a single floating point value, or as a list of exactly five
386/// values in the order of [`GaugeValue`] fields.
387fn parse_gauge(string: &str) -> Option<GaugeValue> {
388    let mut components = string.split(VALUE_SEPARATOR);
389
390    let last = components.next()?.parse().ok()?;
391    Some(if let Some(min) = components.next() {
392        GaugeValue {
393            last,
394            min: min.parse().ok()?,
395            max: components.next()?.parse().ok()?,
396            sum: components.next()?.parse().ok()?,
397            count: components.next()?.parse().ok()?,
398        }
399    } else {
400        GaugeValue::single(last)
401    })
402}
403
404/// Parses tags in the format `tag1,tag2:value`.
405///
406/// Tag values are optional. For tags with missing values, an empty `""` value is assumed.
407fn parse_tags(string: &str) -> Option<MetricTags> {
408    let mut map = MetricTags::new();
409
410    for pair in string.split(',') {
411        let mut name_value = pair.splitn(2, ':');
412
413        let name = name_value.next()?;
414        if !protocol::is_valid_tag_key(name) {
415            continue;
416        }
417
418        if let Ok(value) = protocol::unescape_tag_value(name_value.next().unwrap_or_default()) {
419            map.insert(name.to_owned(), value);
420        }
421    }
422
423    Some(map)
424}
425
426/// Parses a unix UTC timestamp.
427fn parse_timestamp(string: &str) -> Option<UnixTimestamp> {
428    string.parse().ok().map(UnixTimestamp::from_secs)
429}
430
/// An aggregation of metric values.
///
/// As opposed to single metric values, bucket aggregations can carry multiple values. See
/// [`MetricType`] for a description on how values are aggregated in buckets. Values are aggregated
/// by metric name, type, time window, and all tags. Particularly, this allows metrics to have the
/// same name even if their types differ.
///
/// See the [crate documentation](crate) for general information on Metrics.
///
/// # Values
///
/// The contents of a bucket, especially their representation and serialization, depend on the
/// metric type:
///
/// - [Counters](BucketValue::Counter) store a single value, serialized as floating point.
/// - [Distributions](MetricType::Distribution) and [sets](MetricType::Set) store the full set of
///   reported values.
/// - [Gauges](BucketValue::Gauge) store a snapshot of reported values, see [`GaugeValue`].
///
/// # Submission Protocol
///
/// ```text
/// <name>[@unit]:<value>[:<value>...]|<type>[|#<tag_key>:<tag_value>,<tag>][|T<timestamp>]
/// ```
///
/// See the [field documentation](Bucket#fields) for more information on the components. An example
/// submission looks like this:
///
/// ```text
#[doc = include_str!("../tests/fixtures/buckets.statsd.txt")]
/// ```
///
/// To parse a submission payload, use [`Bucket::parse_all`].
///
/// # JSON Representation
///
/// Alternatively to the submission protocol, metrics can be represented as structured data in JSON.
/// The data type of the `value` field is determined by the metric type.
///
/// In addition to the submission protocol, buckets have a required [`width`](Self::width) field in
/// their JSON representation.
///
/// ```json
#[doc = include_str!("../tests/fixtures/buckets.json")]
/// ```
///
/// To parse a JSON payload, use [`serde_json`].
///
/// # Hashing of Sets
///
/// Set values can be specified as strings in the submission protocol. They are always hashed
/// into a 32-bit value and the original value is dropped. If the submission protocol contains a
/// 32-bit integer, it will be used directly, instead.
///
/// **Example**:
///
/// ```text
#[doc = include_str!("../tests/fixtures/set.statsd.txt")]
/// ```
///
/// The above submission is represented as:
///
/// ```json
#[doc = include_str!("../tests/fixtures/set.json")]
/// ```
#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
pub struct Bucket {
    /// The start time of the bucket's time window.
    ///
    /// If a timestamp is not supplied as part of the submission payload, the default timestamp
    /// supplied to [`Bucket::parse`] or [`Bucket::parse_all`] is associated with the metric. It is
    /// then aligned with the aggregation window.
    ///
    /// # Statsd Format
    ///
    /// In statsd, timestamps are part of the `|`-separated list following values. Timestamps start
    /// with the literal character `'T'` followed by the UNIX timestamp.
    ///
    /// The timestamp must be a positive integer in decimal notation representing the value of the
    /// UNIX timestamp.
    ///
    /// # Example
    ///
    /// ```text
    /// endpoint.hits:1|c|T1615889440
    /// ```
    pub timestamp: UnixTimestamp,

    /// The length of the time window in seconds.
    ///
    /// To initialize a new bucket, choose `0` as width. Once the bucket is tracked by Relay's
    /// aggregator, the width is aligned with configuration for the namespace and the timestamp is
    /// adjusted accordingly.
    ///
    /// # Statsd Format
    ///
    /// Specifying the bucket width in statsd is not supported.
    pub width: u64,

    /// The name of the metric in MRI (metric resource identifier) format.
    ///
    /// MRIs have the format `<type>:<ns>/<name>@<unit>`. See [`MetricResourceIdentifier`] for
    /// information on fields and representations.
    ///
    /// # Statsd Format
    ///
    /// MRIs are sent in a more relaxed format: `[<namespace>/]name[@unit]`. The value type is not
    /// part of the metric name and namespaces are optional.
    ///
    /// Namespaces and units must consist of ASCII characters and match the regular expression
    /// `/\w+/`. The name component of MRIs consist of unicode characters and must match the
    /// regular expression `/\w[\w\-.]*/`. Note that the name must begin with a letter.
    ///
    /// Per convention, dots separate metric names into components, where the leading components are
    /// considered namespaces and the final component is the name of the metric within its
    /// namespace.
    ///
    /// # Examples
    ///
    /// ```text
    /// endpoint.hits:1|c
    /// custom/endpoint.hits:1|c
    /// custom/endpoint.duration@millisecond:21.5|d
    /// ```
    pub name: MetricName,

    /// The type and aggregated values of this bucket.
    ///
    /// Buckets support multiple values that are aggregated and can be accessed using a range of
    /// aggregation functions depending on the value type. While always a variable number of values
    /// can be sent in, some aggregations reduce the raw values to a fixed set of aggregates.
    ///
    /// See [`BucketValue`] for more examples and semantics.
    ///
    /// # Statsd Payload
    ///
    /// The bucket value and its type are specified in separate fields following the metric name in
    /// the format: `<name>:<value>|<type>`. Values must be base-10 floating point numbers with
    /// optional decimal places.
    ///
    /// It is possible to pack multiple values into a single datagram, but note that the type and
    /// the value representation must match for this. Refer to the [`BucketValue`] docs for more
    /// examples.
    ///
    /// # Example
    ///
    /// ```text
    /// endpoint.hits:21|c
    /// endpoint.hits:4.5|c
    /// ```
    #[serde(flatten)]
    pub value: BucketValue,

    /// A list of tags adding dimensions to the metric for filtering and aggregation.
    ///
    /// Tags allow to compute separate aggregates to filter or group metric values by any number of
    /// dimensions. Tags consist of a unique tag key and one associated value. For tags with missing
    /// values, an empty `""` value is assumed at query time.
    ///
    /// # Statsd Format
    ///
    /// Tags are preceded with a hash `#` and specified in a comma (`,`) separated list. Each tag
    /// can either be a tag name, or a `name:value` combination. Tags are optional and can be
    /// omitted.
    ///
    /// Tag keys are restricted to ASCII characters and must match the regular expression
    /// `/[\w\-.\/]+/`.
    ///
    /// Tag values can contain unicode characters with the following escaping rules:
    ///  - Tab is escaped as `\t`.
    ///  - Carriage return is escaped as `\r`.
    ///  - Line feed is escaped as `\n`.
    ///  - Backslash is escaped as `\\`.
    ///  - Commas and pipes are given unicode escapes in the form `\u{2c}` and `\u{7c}`,
    ///    respectively.
    ///
    /// # Example
    ///
    /// ```text
    /// endpoint.hits:1|c|#route:user_index,environment:production,release:1.4.0
    /// ```
    #[serde(default, skip_serializing_if = "MetricTags::is_empty")]
    pub tags: MetricTags,

    /// Relay internal metadata for a metric bucket.
    ///
    /// The metadata contains meta information about the metric bucket itself,
    /// for example how many times this bucket has been aggregated in total.
    #[serde(default, skip_serializing_if = "BucketMetadata::is_default")]
    pub metadata: BucketMetadata,
}
622
623impl Bucket {
624    /// Parses a statsd-compatible payload.
625    ///
626    /// ```text
627    /// [<ns>/]<name>[@<unit>]:<value>|<type>[|#<tags>]`
628    /// ```
629    fn parse_str(string: &str, timestamp: UnixTimestamp) -> Option<Self> {
630        let mut components = string.split('|');
631
632        let (mri_str, values_str) = components.next()?.split_once(':')?;
633        let ty = components.next().and_then(|s| s.parse().ok())?;
634
635        let mri = MetricResourceIdentifier::parse_with_type(mri_str, ty).ok()?;
636        let value = match ty {
637            MetricType::Counter => BucketValue::Counter(parse_counter(values_str)?),
638            MetricType::Distribution => BucketValue::Distribution(parse_distribution(values_str)?),
639            MetricType::Set => BucketValue::Set(parse_set(values_str)?),
640            MetricType::Gauge => BucketValue::Gauge(parse_gauge(values_str)?),
641        };
642
643        let mut bucket = Bucket {
644            timestamp,
645            width: 0,
646            name: mri.to_string().into(),
647            value,
648            tags: Default::default(),
649            metadata: Default::default(),
650        };
651
652        for component in components {
653            match component.chars().next() {
654                Some('#') => {
655                    bucket.tags = parse_tags(component.get(1..)?)?;
656                }
657                Some('T') => {
658                    bucket.timestamp = parse_timestamp(component.get(1..)?)?;
659                }
660                _ => (),
661            }
662        }
663
664        Some(bucket)
665    }
666
667    /// Parses a single metric aggregate from the raw protocol.
668    ///
669    /// See the [`Bucket`] for more information on the protocol.
670    ///
671    /// # Example
672    ///
673    /// ```
674    /// use relay_metrics::{Bucket, UnixTimestamp};
675    ///
676    /// let bucket = Bucket::parse(b"response_time@millisecond:57|d", UnixTimestamp::now())
677    ///     .expect("metric should parse");
678    /// ```
679    pub fn parse(slice: &[u8], timestamp: UnixTimestamp) -> Result<Self, ParseMetricError> {
680        let string = std::str::from_utf8(slice).map_err(|_| ParseMetricError)?;
681        Self::parse_str(string, timestamp).ok_or(ParseMetricError)
682    }
683
684    /// Parses a set of metric aggregates from the raw protocol.
685    ///
686    /// Returns a metric result for each line in `slice`, ignoring empty lines. Both UNIX newlines
687    /// (`\n`) and Windows newlines (`\r\n`) are supported.
688    ///
689    /// It is possible to continue consuming the iterator after `Err` is yielded.
690    ///
691    /// See [`Bucket`] for more information on the protocol.
692    ///
693    /// # Example
694    ///
695    /// ```
696    /// use relay_metrics::{Bucket, UnixTimestamp};
697    ///
698    /// let data = br#"
699    /// endpoint.response_time@millisecond:57|d
700    /// endpoint.hits:1|c
701    /// "#;
702    ///
703    /// for metric_result in Bucket::parse_all(data, UnixTimestamp::now()) {
704    ///     let bucket = metric_result.expect("metric should parse");
705    ///     println!("Metric {}: {:?}", bucket.name, bucket.value);
706    /// }
707    /// ```
708    pub fn parse_all(slice: &[u8], timestamp: UnixTimestamp) -> ParseBuckets<'_> {
709        ParseBuckets { slice, timestamp }
710    }
711
712    /// Returns the value of the specified tag if it exists.
713    pub fn tag(&self, name: &str) -> Option<&str> {
714        self.tags.get(name).map(|s| s.as_str())
715    }
716
717    /// Removes the value of the specified tag.
718    ///
719    /// If the tag exists, the removed value is returned.
720    pub fn remove_tag(&mut self, name: &str) -> Option<String> {
721        self.tags.remove(name)
722    }
723}
724
725impl CardinalityItem for Bucket {
726    fn namespace(&self) -> Option<MetricNamespace> {
727        self.name.try_namespace()
728    }
729
730    fn name(&self) -> &MetricName {
731        &self.name
732    }
733
734    fn to_hash(&self) -> u32 {
735        let mut hasher = FnvHasher::default();
736        self.name.hash(&mut hasher);
737        self.tags.hash(&mut hasher);
738        hasher.finish32()
739    }
740}
741
/// Relay internal metadata for a metric bucket.
#[derive(Clone, Copy, Debug, PartialEq, Deserialize, Serialize)]
pub struct BucketMetadata {
    /// How many times the bucket was merged.
    ///
    /// Creating a new bucket is the first merge.
    /// Merging two buckets sums the amount of merges.
    ///
    /// For example: Merging two un-merged buckets will yield a total
    /// of `2` merges.
    ///
    /// Due to how Relay aggregates metrics and later splits them into multiple
    /// buckets again, the amount of merges can be zero.
    /// When splitting a bucket the total volume of the bucket may only be attributed
    /// to one part or distributed across the resulting buckets, in either case
    /// values of `0` are possible.
    pub merges: u32,

    /// Received timestamp of the first metric in this bucket.
    ///
    /// This field should be set to the time in which the first metric of a specific bucket was
    /// received in the outermost internal Relay.
    pub received_at: Option<UnixTimestamp>,

    /// Is `true` if this metric was extracted from a sampled/indexed envelope item.
    ///
    /// The final dynamic sampling decision is always made in processing Relays.
    /// If a metric was extracted from an item which is sampled (i.e. retained by dynamic sampling),
    /// this flag is `true`.
    ///
    /// Since these metrics from samples carry additional information, e.g. they don't
    /// require rate limiting since the sample they've been extracted from was already
    /// rate limited, this flag must be included in the aggregation key when aggregating buckets.
    ///
    /// This field is skipped during serialization and deserialization.
    #[serde(skip)]
    pub extracted_from_indexed: bool,
}
777
778impl BucketMetadata {
779    /// Creates a fresh metadata instance.
780    ///
781    /// The new metadata is initialized with `1` merge and a given `received_at` timestamp.
782    pub fn new(received_at: UnixTimestamp) -> Self {
783        Self {
784            merges: 1,
785            received_at: Some(received_at),
786            extracted_from_indexed: false,
787        }
788    }
789
790    /// Whether the metadata does not contain more information than the default.
791    pub fn is_default(&self) -> bool {
792        &Self::default() == self
793    }
794
795    /// Merges another metadata object into the current one.
796    pub fn merge(&mut self, other: Self) {
797        self.merges = self.merges.saturating_add(other.merges);
798        self.received_at = match (self.received_at, other.received_at) {
799            (Some(received_at), None) => Some(received_at),
800            (None, Some(received_at)) => Some(received_at),
801            (left, right) => left.min(right),
802        };
803    }
804}
805
806impl Default for BucketMetadata {
807    fn default() -> Self {
808        Self {
809            merges: 1,
810            received_at: None,
811            extracted_from_indexed: false,
812        }
813    }
814}
815
/// Iterator over parsed metrics returned from [`Bucket::parse_all`].
#[derive(Clone, Debug)]
pub struct ParseBuckets<'a> {
    /// The part of the payload that has not been parsed yet.
    slice: &'a [u8],
    /// The timestamp passed to the parser for every line of the payload.
    timestamp: UnixTimestamp,
}
822
823impl Default for ParseBuckets<'_> {
824    fn default() -> Self {
825        Self {
826            slice: &[],
827            // The timestamp will never be returned.
828            timestamp: UnixTimestamp::from_secs(4711),
829        }
830    }
831}
832
833impl Iterator for ParseBuckets<'_> {
834    type Item = Result<Bucket, ParseMetricError>;
835
836    fn next(&mut self) -> Option<Self::Item> {
837        loop {
838            if self.slice.is_empty() {
839                return None;
840            }
841
842            let mut split = self.slice.splitn(2, |&b| b == b'\n');
843            let current = split.next()?;
844            self.slice = split.next().unwrap_or_default();
845
846            let string = match std::str::from_utf8(current) {
847                Ok(string) => string.strip_suffix('\r').unwrap_or(string),
848                Err(_) => return Some(Err(ParseMetricError)),
849            };
850
851            if !string.is_empty() {
852                return Some(Bucket::parse_str(string, self.timestamp).ok_or(ParseMetricError));
853            }
854        }
855    }
856}
857
858impl FusedIterator for ParseBuckets<'_> {}
859
860#[cfg(test)]
861mod tests {
862    use similar_asserts::assert_eq;
863
864    use crate::protocol::{DurationUnit, MetricUnit};
865
866    use super::*;
867
868    #[test]
869    fn test_distribution_value_size() {
870        // DistributionValue uses a SmallVec internally to prevent an additional allocation and
871        // indirection in cases where it needs to store only a small number of items. This is
872        // enabled by a comparably large `GaugeValue`, which stores five atoms. Ensure that the
873        // `DistributionValue`'s size does not exceed that of `GaugeValue`.
874        assert!(
875            std::mem::size_of::<DistributionValue>() <= std::mem::size_of::<GaugeValue>(),
876            "distribution value should not exceed gauge {}",
877            std::mem::size_of::<DistributionValue>()
878        );
879    }
880
881    #[test]
882    fn test_bucket_value_merge_counter() {
883        let mut value = BucketValue::Counter(42.into());
884        value.merge(BucketValue::Counter(43.into())).unwrap();
885        assert_eq!(value, BucketValue::Counter(85.into()));
886    }
887
888    #[test]
889    fn test_bucket_value_merge_distribution() {
890        let mut value = BucketValue::Distribution(dist![1, 2, 3]);
891        value.merge(BucketValue::Distribution(dist![2, 4])).unwrap();
892        assert_eq!(value, BucketValue::Distribution(dist![1, 2, 3, 2, 4]));
893    }
894
895    #[test]
896    fn test_bucket_value_merge_set() {
897        let mut value = BucketValue::Set(vec![1, 2].into_iter().collect());
898        value.merge(BucketValue::Set([2, 3].into())).unwrap();
899        assert_eq!(value, BucketValue::Set(vec![1, 2, 3].into_iter().collect()));
900    }
901
902    #[test]
903    fn test_bucket_value_merge_gauge() {
904        let mut value = BucketValue::Gauge(GaugeValue::single(42.into()));
905        value.merge(BucketValue::gauge(43.into())).unwrap();
906
907        assert_eq!(
908            value,
909            BucketValue::Gauge(GaugeValue {
910                last: 43.into(),
911                min: 42.into(),
912                max: 43.into(),
913                sum: 85.into(),
914                count: 2,
915            })
916        );
917    }
918
919    #[test]
920    fn test_parse_garbage() {
921        let s = "x23-408j17z4232@#34d\nc3456y7^😎";
922        let timestamp = UnixTimestamp::from_secs(4711);
923        let result = Bucket::parse(s.as_bytes(), timestamp);
924        assert!(result.is_err());
925    }
926
927    #[test]
928    fn test_parse_counter() {
929        let s = "transactions/foo:42|c";
930        let timestamp = UnixTimestamp::from_secs(4711);
931        let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap();
932        insta::assert_debug_snapshot!(metric, @r###"
933        Bucket {
934            timestamp: UnixTimestamp(4711),
935            width: 0,
936            name: MetricName(
937                "c:transactions/foo@none",
938            ),
939            value: Counter(
940                42.0,
941            ),
942            tags: {},
943            metadata: BucketMetadata {
944                merges: 1,
945                received_at: None,
946                extracted_from_indexed: false,
947            },
948        }
949        "###);
950    }
951
952    #[test]
953    fn test_parse_counter_packed() {
954        let s = "transactions/foo:42:17:21|c";
955        let timestamp = UnixTimestamp::from_secs(4711);
956        let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap();
957        assert_eq!(metric.value, BucketValue::Counter(80.into()));
958    }
959
960    #[test]
961    fn test_parse_distribution() {
962        let s = "transactions/foo:17.5|d";
963        let timestamp = UnixTimestamp::from_secs(4711);
964        let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap();
965        insta::assert_debug_snapshot!(metric, @r###"
966        Bucket {
967            timestamp: UnixTimestamp(4711),
968            width: 0,
969            name: MetricName(
970                "d:transactions/foo@none",
971            ),
972            value: Distribution(
973                [
974                    17.5,
975                ],
976            ),
977            tags: {},
978            metadata: BucketMetadata {
979                merges: 1,
980                received_at: None,
981                extracted_from_indexed: false,
982            },
983        }
984        "###);
985    }
986
987    #[test]
988    fn test_parse_distribution_packed() {
989        let s = "transactions/foo:17.5:21.9:42.7|d";
990        let timestamp = UnixTimestamp::from_secs(4711);
991        let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap();
992        assert_eq!(
993            metric.value,
994            BucketValue::Distribution(dist![
995                FiniteF64::new(17.5).unwrap(),
996                FiniteF64::new(21.9).unwrap(),
997                FiniteF64::new(42.7).unwrap()
998            ])
999        );
1000    }
1001
1002    #[test]
1003    fn test_parse_histogram() {
1004        let s = "transactions/foo:17.5|h"; // common alias for distribution
1005        let timestamp = UnixTimestamp::from_secs(4711);
1006        let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap();
1007        assert_eq!(
1008            metric.value,
1009            BucketValue::Distribution(dist![FiniteF64::new(17.5).unwrap()])
1010        );
1011    }
1012
1013    #[test]
1014    fn test_parse_set() {
1015        let s = "transactions/foo:4267882815|s";
1016        let timestamp = UnixTimestamp::from_secs(4711);
1017        let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap();
1018        insta::assert_debug_snapshot!(metric, @r###"
1019        Bucket {
1020            timestamp: UnixTimestamp(4711),
1021            width: 0,
1022            name: MetricName(
1023                "s:transactions/foo@none",
1024            ),
1025            value: Set(
1026                {
1027                    4267882815,
1028                },
1029            ),
1030            tags: {},
1031            metadata: BucketMetadata {
1032                merges: 1,
1033                received_at: None,
1034                extracted_from_indexed: false,
1035            },
1036        }
1037        "###);
1038    }
1039
1040    #[test]
1041    fn test_parse_set_hashed() {
1042        let s = "transactions/foo:e2546e4c-ecd0-43ad-ae27-87960e57a658|s";
1043        let timestamp = UnixTimestamp::from_secs(4711);
1044        let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap();
1045        assert_eq!(metric.value, BucketValue::Set([4267882815].into()));
1046    }
1047
1048    #[test]
1049    fn test_parse_set_hashed_packed() {
1050        let s = "transactions/foo:e2546e4c-ecd0-43ad-ae27-87960e57a658:00449b66-d91f-4fb8-b324-4c8bdf2499f6|s";
1051        let timestamp = UnixTimestamp::from_secs(4711);
1052        let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap();
1053        assert_eq!(
1054            metric.value,
1055            BucketValue::Set([181348692, 4267882815].into())
1056        );
1057    }
1058
1059    #[test]
1060    fn test_parse_set_packed() {
1061        let s = "transactions/foo:3182887624:4267882815|s";
1062        let timestamp = UnixTimestamp::from_secs(4711);
1063        let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap();
1064        assert_eq!(
1065            metric.value,
1066            BucketValue::Set([3182887624, 4267882815].into())
1067        )
1068    }
1069
1070    #[test]
1071    fn test_parse_gauge() {
1072        let s = "transactions/foo:42|g";
1073        let timestamp = UnixTimestamp::from_secs(4711);
1074        let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap();
1075        insta::assert_debug_snapshot!(metric, @r###"
1076        Bucket {
1077            timestamp: UnixTimestamp(4711),
1078            width: 0,
1079            name: MetricName(
1080                "g:transactions/foo@none",
1081            ),
1082            value: Gauge(
1083                GaugeValue {
1084                    last: 42.0,
1085                    min: 42.0,
1086                    max: 42.0,
1087                    sum: 42.0,
1088                    count: 1,
1089                },
1090            ),
1091            tags: {},
1092            metadata: BucketMetadata {
1093                merges: 1,
1094                received_at: None,
1095                extracted_from_indexed: false,
1096            },
1097        }
1098        "###);
1099    }
1100
1101    #[test]
1102    fn test_parse_gauge_packed() {
1103        let s = "transactions/foo:25:17:42:220:85|g";
1104        let timestamp = UnixTimestamp::from_secs(4711);
1105        let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap();
1106        insta::assert_debug_snapshot!(metric, @r###"
1107        Bucket {
1108            timestamp: UnixTimestamp(4711),
1109            width: 0,
1110            name: MetricName(
1111                "g:transactions/foo@none",
1112            ),
1113            value: Gauge(
1114                GaugeValue {
1115                    last: 25.0,
1116                    min: 17.0,
1117                    max: 42.0,
1118                    sum: 220.0,
1119                    count: 85,
1120                },
1121            ),
1122            tags: {},
1123            metadata: BucketMetadata {
1124                merges: 1,
1125                received_at: None,
1126                extracted_from_indexed: false,
1127            },
1128        }
1129        "###);
1130    }
1131
1132    #[test]
1133    fn test_parse_implicit_namespace() {
1134        let s = "foo:42|c";
1135        let timestamp = UnixTimestamp::from_secs(4711);
1136        let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap();
1137        insta::assert_debug_snapshot!(metric, @r###"
1138        Bucket {
1139            timestamp: UnixTimestamp(4711),
1140            width: 0,
1141            name: MetricName(
1142                "c:custom/foo@none",
1143            ),
1144            value: Counter(
1145                42.0,
1146            ),
1147            tags: {},
1148            metadata: BucketMetadata {
1149                merges: 1,
1150                received_at: None,
1151                extracted_from_indexed: false,
1152            },
1153        }
1154        "###);
1155    }
1156
1157    #[test]
1158    fn test_parse_unit() {
1159        let s = "transactions/foo@second:17.5|d";
1160        let timestamp = UnixTimestamp::from_secs(4711);
1161        let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap();
1162        let mri = MetricResourceIdentifier::parse(&metric.name).unwrap();
1163        assert_eq!(mri.unit, MetricUnit::Duration(DurationUnit::Second));
1164    }
1165
1166    #[test]
1167    fn test_parse_unit_regression() {
1168        let s = "transactions/foo@s:17.5|d";
1169        let timestamp = UnixTimestamp::from_secs(4711);
1170        let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap();
1171        let mri = MetricResourceIdentifier::parse(&metric.name).unwrap();
1172        assert_eq!(mri.unit, MetricUnit::Duration(DurationUnit::Second));
1173    }
1174
1175    #[test]
1176    fn test_parse_tags() {
1177        let s = "transactions/foo:17.5|d|#foo,bar:baz";
1178        let timestamp = UnixTimestamp::from_secs(4711);
1179        let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap();
1180        insta::assert_debug_snapshot!(metric.tags, @r#"
1181            {
1182                "bar": "baz",
1183                "foo": "",
1184            }
1185            "#);
1186    }
1187
1188    #[test]
1189    fn test_parse_tags_escaped() {
1190        let s = "transactions/foo:17.5|d|#foo:😅\\u{2c}🚀";
1191        let timestamp = UnixTimestamp::from_secs(4711);
1192        let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap();
1193        insta::assert_debug_snapshot!(metric.tags, @r#"
1194            {
1195                "foo": "😅,🚀",
1196            }
1197            "#);
1198    }
1199
1200    #[test]
1201    fn test_parse_timestamp() {
1202        let s = "transactions/foo:17.5|d|T1615889449";
1203        let timestamp = UnixTimestamp::from_secs(4711);
1204        let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap();
1205        assert_eq!(metric.timestamp, UnixTimestamp::from_secs(1615889449));
1206    }
1207
1208    #[test]
1209    fn test_parse_sample_rate() {
1210        // Sample rate should be ignored
1211        let s = "transactions/foo:17.5|d|@0.1";
1212        let timestamp = UnixTimestamp::from_secs(4711);
1213        Bucket::parse(s.as_bytes(), timestamp).unwrap();
1214    }
1215
1216    #[test]
1217    fn test_parse_invalid_name() {
1218        let s = "foo#bar:42|c";
1219        let timestamp = UnixTimestamp::from_secs(4711);
1220        let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap();
1221        assert_eq!(metric.name.as_ref(), "c:custom/foo_bar@none");
1222    }
1223
1224    #[test]
1225    fn test_parse_empty_name() {
1226        let s = ":42|c";
1227        let timestamp = UnixTimestamp::from_secs(4711);
1228        let metric = Bucket::parse(s.as_bytes(), timestamp);
1229        assert!(metric.is_err());
1230    }
1231
1232    #[test]
1233    fn test_parse_invalid_name_with_leading_digit() {
1234        let s = "64bit:42|c";
1235        let timestamp = UnixTimestamp::from_secs(4711);
1236        let metric = Bucket::parse(s.as_bytes(), timestamp);
1237        assert!(metric.is_err());
1238    }
1239
1240    #[test]
1241    fn test_parse_all() {
1242        let s = "transactions/foo:42|c\nbar:17|c";
1243        let timestamp = UnixTimestamp::from_secs(4711);
1244
1245        let metrics: Vec<Bucket> = Bucket::parse_all(s.as_bytes(), timestamp)
1246            .collect::<Result<_, _>>()
1247            .unwrap();
1248
1249        assert_eq!(metrics.len(), 2);
1250    }
1251
1252    #[test]
1253    fn test_parse_all_crlf() {
1254        let s = "transactions/foo:42|c\r\nbar:17|c";
1255        let timestamp = UnixTimestamp::from_secs(4711);
1256
1257        let metrics: Vec<Bucket> = Bucket::parse_all(s.as_bytes(), timestamp)
1258            .collect::<Result<_, _>>()
1259            .unwrap();
1260
1261        assert_eq!(metrics.len(), 2);
1262    }
1263
1264    #[test]
1265    fn test_parse_all_empty_lines() {
1266        let s = "transactions/foo:42|c\n\n\nbar:17|c";
1267        let timestamp = UnixTimestamp::from_secs(4711);
1268
1269        let metric_count = Bucket::parse_all(s.as_bytes(), timestamp).count();
1270        assert_eq!(metric_count, 2);
1271    }
1272
1273    #[test]
1274    fn test_parse_all_trailing() {
1275        let s = "transactions/foo:42|c\nbar:17|c\n";
1276        let timestamp = UnixTimestamp::from_secs(4711);
1277
1278        let metric_count = Bucket::parse_all(s.as_bytes(), timestamp).count();
1279        assert_eq!(metric_count, 2);
1280    }
1281
1282    #[test]
1283    fn test_metrics_docs() {
1284        let text = include_str!("../tests/fixtures/buckets.statsd.txt").trim_end();
1285        let json = include_str!("../tests/fixtures/buckets.json").trim_end();
1286
1287        let timestamp = UnixTimestamp::from_secs(0);
1288        let statsd_metrics = Bucket::parse_all(text.as_bytes(), timestamp)
1289            .collect::<Result<Vec<_>, _>>()
1290            .unwrap();
1291
1292        let json_metrics: Vec<Bucket> = serde_json::from_str(json).unwrap();
1293
1294        assert_eq!(statsd_metrics, json_metrics);
1295    }
1296
1297    #[test]
1298    fn test_set_docs() {
1299        let text = include_str!("../tests/fixtures/set.statsd.txt").trim_end();
1300        let json = include_str!("../tests/fixtures/set.json").trim_end();
1301
1302        let timestamp = UnixTimestamp::from_secs(1615889449);
1303        let statsd_metric = Bucket::parse(text.as_bytes(), timestamp).unwrap();
1304        let json_metric: Bucket = serde_json::from_str(json).unwrap();
1305
1306        assert_eq!(statsd_metric, json_metric);
1307    }
1308
1309    #[test]
1310    fn test_parse_buckets() {
1311        let json = r#"[
1312          {
1313            "name": "endpoint.response_time",
1314            "unit": "millisecond",
1315            "value": [36, 49, 57, 68],
1316            "type": "d",
1317            "timestamp": 1615889440,
1318            "width": 10,
1319            "tags": {
1320                "route": "user_index"
1321            },
1322            "metadata": {
1323                "merges": 1,
1324                "received_at": 1615889440
1325            }
1326          }
1327        ]"#;
1328
1329        let buckets = serde_json::from_str::<Vec<Bucket>>(json).unwrap();
1330
1331        insta::assert_debug_snapshot!(buckets, @r###"
1332        [
1333            Bucket {
1334                timestamp: UnixTimestamp(1615889440),
1335                width: 10,
1336                name: MetricName(
1337                    "endpoint.response_time",
1338                ),
1339                value: Distribution(
1340                    [
1341                        36.0,
1342                        49.0,
1343                        57.0,
1344                        68.0,
1345                    ],
1346                ),
1347                tags: {
1348                    "route": "user_index",
1349                },
1350                metadata: BucketMetadata {
1351                    merges: 1,
1352                    received_at: Some(
1353                        UnixTimestamp(1615889440),
1354                    ),
1355                    extracted_from_indexed: false,
1356                },
1357            },
1358        ]
1359        "###);
1360    }
1361
1362    #[test]
1363    fn test_parse_bucket_defaults() {
1364        let json = r#"[
1365          {
1366            "name": "endpoint.hits",
1367            "value": 4,
1368            "type": "c",
1369            "timestamp": 1615889440,
1370            "width": 10,
1371            "metadata": {
1372                "merges": 1,
1373                "received_at": 1615889440
1374            }
1375          }
1376        ]"#;
1377
1378        let buckets = serde_json::from_str::<Vec<Bucket>>(json).unwrap();
1379
1380        insta::assert_debug_snapshot!(buckets, @r###"
1381        [
1382            Bucket {
1383                timestamp: UnixTimestamp(1615889440),
1384                width: 10,
1385                name: MetricName(
1386                    "endpoint.hits",
1387                ),
1388                value: Counter(
1389                    4.0,
1390                ),
1391                tags: {},
1392                metadata: BucketMetadata {
1393                    merges: 1,
1394                    received_at: Some(
1395                        UnixTimestamp(1615889440),
1396                    ),
1397                    extracted_from_indexed: false,
1398                },
1399            },
1400        ]
1401        "###);
1402    }
1403
1404    #[test]
1405    fn test_buckets_roundtrip() {
1406        let json = r#"[
1407  {
1408    "timestamp": 1615889440,
1409    "width": 10,
1410    "name": "endpoint.response_time",
1411    "type": "d",
1412    "value": [
1413      36.0,
1414      49.0,
1415      57.0,
1416      68.0
1417    ],
1418    "tags": {
1419      "route": "user_index"
1420    }
1421  },
1422  {
1423    "timestamp": 1615889440,
1424    "width": 10,
1425    "name": "endpoint.hits",
1426    "type": "c",
1427    "value": 4.0,
1428    "tags": {
1429      "route": "user_index"
1430    }
1431  },
1432  {
1433    "timestamp": 1615889440,
1434    "width": 10,
1435    "name": "endpoint.parallel_requests",
1436    "type": "g",
1437    "value": {
1438      "last": 25.0,
1439      "min": 17.0,
1440      "max": 42.0,
1441      "sum": 2210.0,
1442      "count": 85
1443    }
1444  },
1445  {
1446    "timestamp": 1615889440,
1447    "width": 10,
1448    "name": "endpoint.users",
1449    "type": "s",
1450    "value": [
1451      3182887624,
1452      4267882815
1453    ],
1454    "tags": {
1455      "route": "user_index"
1456    }
1457  }
1458]"#;
1459
1460        let buckets = serde_json::from_str::<Vec<Bucket>>(json).unwrap();
1461        let serialized = serde_json::to_string_pretty(&buckets).unwrap();
1462        assert_eq!(json, serialized);
1463    }
1464
1465    #[test]
1466    fn test_bucket_docs_roundtrip() {
1467        let json = include_str!("../tests/fixtures/buckets.json")
1468            .trim_end()
1469            .replace("\r\n", "\n");
1470        let buckets = serde_json::from_str::<Vec<Bucket>>(&json).unwrap();
1471
1472        let serialized = serde_json::to_string_pretty(&buckets).unwrap();
1473        assert_eq!(json, serialized);
1474    }
1475
1476    #[test]
1477    fn test_bucket_metadata_merge() {
1478        let mut metadata = BucketMetadata::default();
1479
1480        let other_metadata = BucketMetadata::default();
1481        metadata.merge(other_metadata);
1482        assert_eq!(
1483            metadata,
1484            BucketMetadata {
1485                merges: 2,
1486                received_at: None,
1487                extracted_from_indexed: false,
1488            }
1489        );
1490
1491        let other_metadata = BucketMetadata::new(UnixTimestamp::from_secs(10));
1492        metadata.merge(other_metadata);
1493        assert_eq!(
1494            metadata,
1495            BucketMetadata {
1496                merges: 3,
1497                received_at: Some(UnixTimestamp::from_secs(10)),
1498                extracted_from_indexed: false,
1499            }
1500        );
1501
1502        let other_metadata = BucketMetadata::new(UnixTimestamp::from_secs(20));
1503        metadata.merge(other_metadata);
1504        assert_eq!(
1505            metadata,
1506            BucketMetadata {
1507                merges: 4,
1508                received_at: Some(UnixTimestamp::from_secs(10)),
1509                extracted_from_indexed: false,
1510            }
1511        );
1512    }
1513}