relay_metrics/
bucket.rs

1use std::collections::{BTreeMap, BTreeSet};
2use std::hash::Hash;
3use std::iter::FusedIterator;
4use std::{fmt, mem};
5
6use hash32::{FnvHasher, Hasher as _};
7use relay_cardinality::CardinalityItem;
8use relay_common::time::UnixTimestamp;
9use relay_protocol::FiniteF64;
10use serde::{Deserialize, Serialize};
11use smallvec::SmallVec;
12
13use crate::protocol::{
14    self, CounterType, DistributionType, GaugeType, MetricName, MetricResourceIdentifier,
15    MetricType, SetType, hash_set_value,
16};
17use crate::{MetricNamespace, ParseMetricError};
18
/// Separator between the individual values in the value section of a statsd payload.
///
/// The value parsers in this module split their input on this character, so `4.5:21:17.0`
/// yields three components.
const VALUE_SEPARATOR: char = ':';
20
/// Type of [`Bucket::tags`].
///
/// Maps each tag key to its tag value. Keys are unique and iterated in sorted order.
pub type MetricTags = BTreeMap<String, String>;
23
/// A snapshot of values within a [`Bucket`].
///
/// Instead of keeping every reported value, a gauge only tracks the aggregates listed in the
/// fields below. It serializes as a structure with these named fields.
#[derive(Clone, Copy, Debug, PartialEq, Deserialize, Serialize)]
pub struct GaugeValue {
    /// The last value reported in the bucket.
    ///
    /// This aggregation is not commutative.
    pub last: GaugeType,
    /// The minimum value reported in the bucket.
    pub min: GaugeType,
    /// The maximum value reported in the bucket.
    pub max: GaugeType,
    /// The sum of all values reported in the bucket.
    pub sum: GaugeType,
    /// The number of times this bucket was updated with a new value.
    pub count: u64,
}
40
41impl GaugeValue {
42    /// Creates a gauge snapshot from a single value.
43    pub fn single(value: GaugeType) -> Self {
44        Self {
45            last: value,
46            min: value,
47            max: value,
48            sum: value,
49            count: 1,
50        }
51    }
52
53    /// Inserts a new value into the gauge.
54    pub fn insert(&mut self, value: GaugeType) {
55        self.last = value;
56        self.min = self.min.min(value);
57        self.max = self.max.max(value);
58        self.sum = self.sum.saturating_add(value);
59        self.count += 1;
60    }
61
62    /// Merges two gauge snapshots.
63    pub fn merge(&mut self, other: Self) {
64        self.last = other.last;
65        self.min = self.min.min(other.min);
66        self.max = self.max.max(other.max);
67        self.sum = self.sum.saturating_add(other.sum);
68        self.count += other.count;
69    }
70
71    /// Returns the average of all values reported in this bucket.
72    pub fn avg(&self) -> Option<GaugeType> {
73        self.sum / FiniteF64::new(self.count as f64)?
74    }
75}
76
/// A distribution of values within a [`Bucket`].
///
/// Distributions logically store a histogram of values. Based on individual reported values,
/// distributions allow to query the maximum, minimum, or average of the reported values, as well as
/// statistical quantiles.
///
/// The values are kept in a [`SmallVec`] with an inline capacity of three elements.
///
/// # Example
///
/// ```
/// use relay_metrics::dist;
///
/// let mut dist = dist![1, 1, 1, 2];
/// dist.push(5.into());
/// dist.extend(std::iter::repeat(3.into()).take(7));
/// ```
///
/// Logically, this distribution is equivalent to this visualization:
///
/// ```plain
/// value | count
/// 1.0   | ***
/// 2.0   | *
/// 3.0   | *******
/// 4.0   |
/// 5.0   | *
/// ```
///
/// # Serialization
///
/// Distributions serialize as lists of floating point values. The list contains one entry for each
/// value in the distribution, including duplicates.
pub type DistributionValue = SmallVec<[DistributionType; 3]>;
109
// Re-exported under a hidden name so that the exported `dist!` macro can reach `smallvec!`
// through `$crate`.
#[doc(hidden)]
pub use smallvec::smallvec as _smallvec;
112
/// Creates a [`DistributionValue`] containing the given arguments.
///
/// `dist!` allows `DistributionValue` to be defined with the same syntax as array expressions.
/// Every argument is converted with `DistributionType::from`, and trailing commas are accepted.
///
/// # Example
///
/// ```
/// let dist = relay_metrics::dist![1, 2];
/// ```
#[macro_export]
macro_rules! dist {
    ($($x:expr),*$(,)*) => {
        $crate::_smallvec!($($crate::DistributionType::from($x)),*) as $crate::DistributionValue
    };
}
128
/// A set of unique values.
///
/// Set values can be specified as strings in the submission protocol. They are always hashed
/// into a 32-bit value and the original value is dropped. If the submission protocol contains a
/// 32-bit integer, it will be used directly, instead.
///
/// The hashes are stored in an ordered set, so inserting the same hash more than once has no
/// effect.
///
/// See the [bucket docs](crate::Bucket) for more information on set hashing.
pub type SetValue = BTreeSet<SetType>;
137
/// The [aggregated value](Bucket::value) of a metric bucket.
///
/// In serialized form, the `type` field carries the one-letter type code of the variant and the
/// `value` field carries the variant's payload.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(tag = "type", content = "value")]
pub enum BucketValue {
    /// Counts instances of an event ([`MetricType::Counter`]).
    ///
    /// Counters can be incremented and decremented. The default operation is to increment a counter
    /// by `1`, although increments by larger values are equally possible.
    ///
    /// # Statsd Format
    ///
    /// Counters are declared as `"c"`. Alternatively, `"m"` is allowed.
    ///
    /// There can be a variable number of floating point values. If more than one value is given,
    /// the values are summed into a single counter value:
    ///
    /// ```text
    /// endpoint.hits:4.5:21:17.0|c
    /// ```
    ///
    /// # Serialization
    ///
    /// This variant serializes to a double precision float.
    ///
    /// # Aggregation
    ///
    /// Counters aggregate by folding individual values into a single sum value per bucket. The sum
    /// is ingested and stored directly.
    #[serde(rename = "c")]
    Counter(CounterType),

    /// Builds a statistical distribution over values reported ([`MetricType::Distribution`]).
    ///
    /// Based on individual reported values, distributions allow to query the maximum, minimum, or
    /// average of the reported values, as well as statistical quantiles. With an increasing number
    /// of values in the distribution, its accuracy becomes approximate.
    ///
    /// # Statsd Format
    ///
    /// Distributions are declared as `"d"`. Alternatively, `"d"` and `"ms"` are allowed.
    // NOTE(review): the first alternative repeats `"d"`; presumably a different alias was meant.
    // Confirm against the `MetricType` parser before correcting the sentence above.
    ///
    /// There can be a variable number of floating point values. These values are collected directly
    /// in a list per bucket.
    ///
    /// ```text
    /// endpoint.response_time@millisecond:36:49:57:68|d
    /// ```
    ///
    /// # Serialization
    ///
    /// This variant serializes to a list of double precision floats, see [`DistributionValue`].
    ///
    /// # Aggregation
    ///
    /// During ingestion, all individual reported values are collected in a lossless format. In
    /// storage, these values are compressed into data sketches that allow to query quantiles.
    /// Separately, the count and sum of the reported values is stored, which makes distributions a
    /// strict superset of counters.
    #[serde(rename = "d")]
    Distribution(DistributionValue),

    /// Counts the number of unique reported values.
    ///
    /// Sets allow sending arbitrary discrete values, including strings, and store the deduplicated
    /// count. With an increasing number of unique values in the set, its accuracy becomes
    /// approximate. It is not possible to query individual values from a set.
    ///
    /// # Statsd Format
    ///
    /// Sets are declared as `"s"`. Values in the list should be deduplicated.
    ///
    /// ```text
    /// endpoint.users:3182887624:4267882815|s
    /// endpoint.users:e2546e4c-ecd0-43ad-ae27-87960e57a658|s
    /// ```
    ///
    /// # Serialization
    ///
    /// This variant serializes to a list of 32-bit integers.
    ///
    /// # Aggregation
    ///
    /// Set values are internally represented as 32-bit integer hashes of the original value. These
    /// hashes can be ingested directly as seen in the first example above. If raw strings are sent,
    /// they will be hashed on-the-fly.
    ///
    /// Internally, set metrics are stored in data sketches that expose an approximate cardinality.
    #[serde(rename = "s")]
    Set(SetValue),

    /// Stores absolute snapshots of values.
    ///
    /// In addition to plain [counters](Self::Counter), gauges store a snapshot of the maximum,
    /// minimum and sum of all values, as well as the last reported value. Note that the "last"
    /// component of this aggregation is not commutative. Which value is preserved as last value is
    /// implementation-defined.
    ///
    /// # Statsd Format
    ///
    /// Gauges are declared as `"g"`. There are two ways to ingest gauges:
    ///  1. As a single value. In this case, the provided value is assumed as the last, minimum,
    ///     maximum, and the sum.
    ///  2. As a sequence of five values in the order: `last`, `min`, `max`, `sum`, `count`.
    ///
    /// ```text
    /// endpoint.parallel_requests:25|g
    /// endpoint.parallel_requests:25:17:42:220:85|g
    /// ```
    ///
    /// # Serialization
    ///
    /// This variant serializes to a structure with named fields, see [`GaugeValue`].
    ///
    /// # Aggregation
    ///
    /// Gauges aggregate by folding each of the components based on their semantics:
    ///  - `last` assumes the newly added value
    ///  - `min` retains the smaller value
    ///  - `max` retains the larger value
    ///  - `sum` adds the new value to the existing sum
    ///  - `count` adds the count of the newly added gauge (defaulting to `1`)
    #[serde(rename = "g")]
    Gauge(GaugeValue),
}
263
264impl BucketValue {
265    /// Returns a bucket value representing a counter with the given value.
266    pub fn counter(value: CounterType) -> Self {
267        Self::Counter(value)
268    }
269
270    /// Returns a bucket value representing a distribution with a single given value.
271    pub fn distribution(value: DistributionType) -> Self {
272        Self::Distribution(dist![value])
273    }
274
275    /// Returns a bucket value representing a set with a single given hash value.
276    pub fn set(value: SetType) -> Self {
277        Self::Set(std::iter::once(value).collect())
278    }
279
280    /// Returns a bucket value representing a set with a single given string value.
281    pub fn set_from_str(string: &str) -> Self {
282        Self::set(hash_set_value(string))
283    }
284
285    /// Returns a bucket value representing a set with a single given value.
286    pub fn set_from_display(display: impl fmt::Display) -> Self {
287        Self::set(hash_set_value(&display.to_string()))
288    }
289
290    /// Returns a bucket value representing a gauge with a single given value.
291    pub fn gauge(value: GaugeType) -> Self {
292        Self::Gauge(GaugeValue::single(value))
293    }
294
295    /// Returns the type of this value.
296    pub fn ty(&self) -> MetricType {
297        match self {
298            Self::Counter(_) => MetricType::Counter,
299            Self::Distribution(_) => MetricType::Distribution,
300            Self::Set(_) => MetricType::Set,
301            Self::Gauge(_) => MetricType::Gauge,
302        }
303    }
304
305    /// Returns the number of raw data points in this value.
306    pub fn len(&self) -> usize {
307        match self {
308            BucketValue::Counter(_) => 1,
309            BucketValue::Distribution(distribution) => distribution.len(),
310            BucketValue::Set(set) => set.len(),
311            BucketValue::Gauge(_) => 5,
312        }
313    }
314
315    /// Returns `true` if this bucket contains no values.
316    pub fn is_empty(&self) -> bool {
317        self.len() == 0
318    }
319
320    /// Estimates the number of bytes needed to encode the bucket value.
321    ///
322    /// Note that this does not necessarily match the exact memory footprint of the value,
323    /// because data structures have a memory overhead.
324    pub fn cost(&self) -> usize {
325        // Beside the size of [`BucketValue`], we also need to account for the cost of values
326        // allocated dynamically.
327        let allocated_cost = match self {
328            Self::Counter(_) => 0,
329            Self::Set(s) => mem::size_of::<SetType>() * s.len(),
330            Self::Gauge(_) => 0,
331            Self::Distribution(d) => d.len() * mem::size_of::<DistributionType>(),
332        };
333
334        mem::size_of::<Self>() + allocated_cost
335    }
336
337    /// Merges the given `bucket_value` into `self`.
338    ///
339    /// Returns `Ok(())` if the two bucket values can be merged. This is the case when both bucket
340    /// values are of the same variant. Otherwise, this returns `Err(other)`.
341    pub fn merge(&mut self, other: Self) -> Result<(), Self> {
342        match (self, other) {
343            (Self::Counter(slf), Self::Counter(other)) => *slf = slf.saturating_add(other),
344            (Self::Distribution(slf), Self::Distribution(other)) => slf.extend_from_slice(&other),
345            (Self::Set(slf), Self::Set(other)) => slf.extend(other),
346            (Self::Gauge(slf), Self::Gauge(other)) => slf.merge(other),
347            (_, other) => return Err(other),
348        }
349
350        Ok(())
351    }
352}
353
354/// Parses a list of counter values separated by colons and sums them up.
355fn parse_counter(string: &str) -> Option<CounterType> {
356    let mut sum = CounterType::default();
357    for component in string.split(VALUE_SEPARATOR) {
358        sum = sum.saturating_add(component.parse().ok()?);
359    }
360    Some(sum)
361}
362
363/// Parses a distribution from a list of floating point values separated by colons.
364fn parse_distribution(string: &str) -> Option<DistributionValue> {
365    let mut dist = DistributionValue::default();
366    for component in string.split(VALUE_SEPARATOR) {
367        dist.push(component.parse().ok()?);
368    }
369    Some(dist)
370}
371
372/// Parses a set of hashed numeric values.
373fn parse_set(string: &str) -> Option<SetValue> {
374    let mut set = SetValue::default();
375    for component in string.split(VALUE_SEPARATOR) {
376        let hash = component
377            .parse()
378            .unwrap_or_else(|_| protocol::hash_set_value(component));
379        set.insert(hash);
380    }
381    Some(set)
382}
383
384/// Parses a gauge from a value.
385///
386/// The gauge can either be given as a single floating point value, or as a list of exactly five
387/// values in the order of [`GaugeValue`] fields.
388fn parse_gauge(string: &str) -> Option<GaugeValue> {
389    let mut components = string.split(VALUE_SEPARATOR);
390
391    let last = components.next()?.parse().ok()?;
392    Some(if let Some(min) = components.next() {
393        GaugeValue {
394            last,
395            min: min.parse().ok()?,
396            max: components.next()?.parse().ok()?,
397            sum: components.next()?.parse().ok()?,
398            count: components.next()?.parse().ok()?,
399        }
400    } else {
401        GaugeValue::single(last)
402    })
403}
404
405/// Parses tags in the format `tag1,tag2:value`.
406///
407/// Tag values are optional. For tags with missing values, an empty `""` value is assumed.
408fn parse_tags(string: &str) -> Option<MetricTags> {
409    let mut map = MetricTags::new();
410
411    for pair in string.split(',') {
412        let mut name_value = pair.splitn(2, ':');
413
414        let name = name_value.next()?;
415        if !protocol::is_valid_tag_key(name) {
416            continue;
417        }
418
419        if let Ok(value) = protocol::unescape_tag_value(name_value.next().unwrap_or_default()) {
420            map.insert(name.to_owned(), value);
421        }
422    }
423
424    Some(map)
425}
426
427/// Parses a unix UTC timestamp.
428fn parse_timestamp(string: &str) -> Option<UnixTimestamp> {
429    string.parse().ok().map(UnixTimestamp::from_secs)
430}
431
/// An aggregation of metric values.
///
/// As opposed to single metric values, bucket aggregations can carry multiple values. See
/// [`MetricType`] for a description on how values are aggregated in buckets. Values are aggregated
/// by metric name, type, time window, and all tags. Particularly, this allows metrics to have the
/// same name even if their types differ.
///
/// See the [crate documentation](crate) for general information on Metrics.
///
/// # Values
///
/// The contents of a bucket, especially their representation and serialization, depend on the
/// metric type:
///
/// - [Counters](BucketValue::Counter) store a single value, serialized as floating point.
/// - [Distributions](MetricType::Distribution) and [sets](MetricType::Set) store the full set of
///   reported values.
/// - [Gauges](BucketValue::Gauge) store a snapshot of reported values, see [`GaugeValue`].
///
/// # Submission Protocol
///
/// ```text
/// <name>[@unit]:<value>[:<value>...]|<type>[|#<tag_key>:<tag_value>,<tag>][|T<timestamp>]
/// ```
///
/// See the [field documentation](Bucket#fields) for more information on the components. An example
/// submission looks like this:
///
/// ```text
#[doc = include_str!("../tests/fixtures/buckets.statsd.txt")]
/// ```
///
/// To parse a submission payload, use [`Bucket::parse_all`].
///
/// # JSON Representation
///
/// Alternatively to the submission protocol, metrics can be represented as structured data in JSON.
/// The data type of the `value` field is determined by the metric type.
///
/// In addition to the submission protocol, buckets have a required [`width`](Self::width) field in
/// their JSON representation.
///
/// ```json
#[doc = include_str!("../tests/fixtures/buckets.json")]
/// ```
///
/// To parse a JSON payload, use [`serde_json`].
///
/// # Hashing of Sets
///
/// Set values can be specified as strings in the submission protocol. They are always hashed
/// into a 32-bit value and the original value is dropped. If the submission protocol contains a
/// 32-bit integer, it will be used directly, instead.
///
/// **Example**:
///
/// ```text
#[doc = include_str!("../tests/fixtures/set.statsd.txt")]
/// ```
///
/// The above submission is represented as:
///
/// ```json
#[doc = include_str!("../tests/fixtures/set.json")]
/// ```
#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
pub struct Bucket {
    /// The start time of the bucket's time window.
    ///
    /// If a timestamp is not supplied as part of the submission payload, the default timestamp
    /// supplied to [`Bucket::parse`] or [`Bucket::parse_all`] is associated with the metric. It is
    /// then aligned with the aggregation window.
    ///
    /// # Statsd Format
    ///
    /// In statsd, timestamps are part of the `|`-separated list following values. Timestamps start
    /// with the literal character `'T'` followed by the UNIX timestamp.
    ///
    /// The timestamp must be a positive integer in decimal notation representing the value of the
    /// UNIX timestamp.
    ///
    /// # Example
    ///
    /// ```text
    /// endpoint.hits:1|c|T1615889440
    /// ```
    pub timestamp: UnixTimestamp,

    /// The length of the time window in seconds.
    ///
    /// To initialize a new bucket, choose `0` as width. Once the bucket is tracked by Relay's
    /// aggregator, the width is aligned with configuration for the namespace and the timestamp is
    /// adjusted accordingly.
    ///
    /// # Statsd Format
    ///
    /// Specifying the bucket width in statsd is not supported.
    pub width: u64,

    /// The name of the metric in MRI (metric resource identifier) format.
    ///
    /// MRIs have the format `<type>:<ns>/<name>@<unit>`. See [`MetricResourceIdentifier`] for
    /// information on fields and representations.
    ///
    /// # Statsd Format
    ///
    /// MRIs are sent in a more relaxed format: `[<namespace>/]<name>[@<unit>]`. The value type is
    /// not part of the metric name and namespaces are optional.
    ///
    /// Namespaces and units must consist of ASCII characters and match the regular expression
    /// `/\w+/`. The name component of MRIs consist of unicode characters and must match the
    /// regular expression `/\w[\w\-.]*/`. Note that the name must begin with a letter.
    ///
    /// Per convention, dots separate metric names into components, where the leading components are
    /// considered namespaces and the final component is the name of the metric within its
    /// namespace.
    ///
    /// # Examples
    ///
    /// ```text
    /// endpoint.hits:1|c
    /// custom/endpoint.hits:1|c
    /// custom/endpoint.duration@millisecond:21.5|d
    /// ```
    pub name: MetricName,

    /// The type and aggregated values of this bucket.
    ///
    /// Buckets support multiple values that are aggregated and can be accessed using a range of
    /// aggregation functions depending on the value type. While always a variable number of values
    /// can be sent in, some aggregations reduce the raw values to a fixed set of aggregates.
    ///
    /// See [`BucketValue`] for more examples and semantics.
    ///
    /// # Statsd Payload
    ///
    /// The bucket value and its type are specified in separate fields following the metric name in
    /// the format: `<name>:<value>|<type>`. Values must be base-10 floating point numbers with
    /// optional decimal places.
    ///
    /// It is possible to pack multiple values into a single datagram, but note that the type and
    /// the value representation must match for this. Refer to the [`BucketValue`] docs for more
    /// examples.
    ///
    /// # Example
    ///
    /// ```text
    /// endpoint.hits:21|c
    /// endpoint.hits:4.5|c
    /// ```
    #[serde(flatten)]
    pub value: BucketValue,

    /// A list of tags adding dimensions to the metric for filtering and aggregation.
    ///
    /// Tags allow to compute separate aggregates to filter or group metric values by any number of
    /// dimensions. Tags consist of a unique tag key and one associated value. For tags with missing
    /// values, an empty `""` value is assumed at query time.
    ///
    /// # Statsd Format
    ///
    /// Tags are preceded with a hash `#` and specified in a comma (`,`) separated list. Each tag
    /// can either be a tag name, or a `name:value` combination. Tags are optional and can be
    /// omitted.
    ///
    /// Tag keys are restricted to ASCII characters and must match the regular expression
    /// `/[\w\-.\/]+/`.
    ///
    /// Tag values can contain unicode characters with the following escaping rules:
    ///  - Tab is escaped as `\t`.
    ///  - Carriage return is escaped as `\r`.
    ///  - Line feed is escaped as `\n`.
    ///  - Backslash is escaped as `\\`.
    ///  - Commas and pipes are given unicode escapes in the form `\u{2c}` and `\u{7c}`,
    ///    respectively.
    ///
    /// # Example
    ///
    /// ```text
    /// endpoint.hits:1|c|#route:user_index,environment:production,release:1.4.0
    /// ```
    #[serde(default, skip_serializing_if = "MetricTags::is_empty")]
    pub tags: MetricTags,

    /// Relay internal metadata for a metric bucket.
    ///
    /// The metadata contains meta information about the metric bucket itself,
    /// for example how many times this bucket has been aggregated in total.
    #[serde(default, skip_serializing_if = "BucketMetadata::is_default")]
    pub metadata: BucketMetadata,
}
623
624impl Bucket {
625    /// Parses a statsd-compatible payload.
626    ///
627    /// ```text
628    /// [<ns>/]<name>[@<unit>]:<value>|<type>[|#<tags>]`
629    /// ```
630    fn parse_str(string: &str, timestamp: UnixTimestamp) -> Option<Self> {
631        let mut components = string.split('|');
632
633        let (mri_str, values_str) = components.next()?.split_once(':')?;
634        let ty = components.next().and_then(|s| s.parse().ok())?;
635
636        let mri = MetricResourceIdentifier::parse_with_type(mri_str, ty).ok()?;
637        let value = match ty {
638            MetricType::Counter => BucketValue::Counter(parse_counter(values_str)?),
639            MetricType::Distribution => BucketValue::Distribution(parse_distribution(values_str)?),
640            MetricType::Set => BucketValue::Set(parse_set(values_str)?),
641            MetricType::Gauge => BucketValue::Gauge(parse_gauge(values_str)?),
642        };
643
644        let mut bucket = Bucket {
645            timestamp,
646            width: 0,
647            name: mri.to_string().into(),
648            value,
649            tags: Default::default(),
650            metadata: Default::default(),
651        };
652
653        for component in components {
654            match component.chars().next() {
655                Some('#') => {
656                    bucket.tags = parse_tags(component.get(1..)?)?;
657                }
658                Some('T') => {
659                    bucket.timestamp = parse_timestamp(component.get(1..)?)?;
660                }
661                _ => (),
662            }
663        }
664
665        Some(bucket)
666    }
667
668    /// Parses a single metric aggregate from the raw protocol.
669    ///
670    /// See the [`Bucket`] for more information on the protocol.
671    ///
672    /// # Example
673    ///
674    /// ```
675    /// use relay_metrics::{Bucket, UnixTimestamp};
676    ///
677    /// let bucket = Bucket::parse(b"response_time@millisecond:57|d", UnixTimestamp::now())
678    ///     .expect("metric should parse");
679    /// ```
680    pub fn parse(slice: &[u8], timestamp: UnixTimestamp) -> Result<Self, ParseMetricError> {
681        let string = std::str::from_utf8(slice).map_err(|_| ParseMetricError)?;
682        Self::parse_str(string, timestamp).ok_or(ParseMetricError)
683    }
684
685    /// Parses a set of metric aggregates from the raw protocol.
686    ///
687    /// Returns a metric result for each line in `slice`, ignoring empty lines. Both UNIX newlines
688    /// (`\n`) and Windows newlines (`\r\n`) are supported.
689    ///
690    /// It is possible to continue consuming the iterator after `Err` is yielded.
691    ///
692    /// See [`Bucket`] for more information on the protocol.
693    ///
694    /// # Example
695    ///
696    /// ```
697    /// use relay_metrics::{Bucket, UnixTimestamp};
698    ///
699    /// let data = br#"
700    /// endpoint.response_time@millisecond:57|d
701    /// endpoint.hits:1|c
702    /// "#;
703    ///
704    /// for metric_result in Bucket::parse_all(data, UnixTimestamp::now()) {
705    ///     let bucket = metric_result.expect("metric should parse");
706    ///     println!("Metric {}: {:?}", bucket.name, bucket.value);
707    /// }
708    /// ```
709    pub fn parse_all(slice: &[u8], timestamp: UnixTimestamp) -> ParseBuckets<'_> {
710        ParseBuckets { slice, timestamp }
711    }
712
713    /// Returns the value of the specified tag if it exists.
714    pub fn tag(&self, name: &str) -> Option<&str> {
715        self.tags.get(name).map(|s| s.as_str())
716    }
717
718    /// Removes the value of the specified tag.
719    ///
720    /// If the tag exists, the removed value is returned.
721    pub fn remove_tag(&mut self, name: &str) -> Option<String> {
722        self.tags.remove(name)
723    }
724}
725
726impl CardinalityItem for Bucket {
727    fn namespace(&self) -> Option<MetricNamespace> {
728        self.name.try_namespace()
729    }
730
731    fn name(&self) -> &MetricName {
732        &self.name
733    }
734
735    fn to_hash(&self) -> u32 {
736        let mut hasher = FnvHasher::default();
737        self.name.hash(&mut hasher);
738        self.tags.hash(&mut hasher);
739        hasher.finish32()
740    }
741}
742
/// Relay internal metadata for a metric bucket.
#[derive(Clone, Copy, Debug, PartialEq, Deserialize, Serialize)]
pub struct BucketMetadata {
    /// How many times the bucket was merged.
    ///
    /// Creating a new bucket is the first merge.
    /// Merging two buckets sums the amount of merges.
    ///
    /// For example: Merging two un-merged buckets will yield a total
    /// of `2` merges.
    ///
    /// Due to how Relay aggregates metrics and later splits them into multiple
    /// buckets again, the amount of merges can be zero.
    /// When splitting a bucket the total volume of the bucket may only be attributed
    /// to one part or distributed across the resulting buckets, in either case
    /// values of `0` are possible.
    pub merges: u32,

    /// Received timestamp of the first metric in this bucket.
    ///
    /// This field should be set to the time in which the first metric of a specific bucket was
    /// received in the outermost internal Relay.
    pub received_at: Option<UnixTimestamp>,

    /// Is `true` if this metric was extracted from a sampled/indexed envelope item.
    ///
    /// The final dynamic sampling decision is always made in processing Relays.
    /// If a metric was extracted from an item which is sampled (i.e. retained by dynamic
    /// sampling), this flag is `true`.
    ///
    /// Since these metrics from samples carry additional information, e.g. they don't
    /// require rate limiting since the sample they've been extracted from was already
    /// rate limited, this flag must be included in the aggregation key when aggregating buckets.
    ///
    /// This flag is skipped by serde and therefore not part of the serialized metadata.
    #[serde(skip)]
    pub extracted_from_indexed: bool,
}
778
779impl BucketMetadata {
780    /// Creates a fresh metadata instance.
781    ///
782    /// The new metadata is initialized with `1` merge and a given `received_at` timestamp.
783    pub fn new(received_at: UnixTimestamp) -> Self {
784        Self {
785            merges: 1,
786            received_at: Some(received_at),
787            extracted_from_indexed: false,
788        }
789    }
790
791    /// Whether the metadata does not contain more information than the default.
792    pub fn is_default(&self) -> bool {
793        &Self::default() == self
794    }
795
796    /// Merges another metadata object into the current one.
797    pub fn merge(&mut self, other: Self) {
798        self.merges = self.merges.saturating_add(other.merges);
799        self.received_at = match (self.received_at, other.received_at) {
800            (Some(received_at), None) => Some(received_at),
801            (None, Some(received_at)) => Some(received_at),
802            (left, right) => left.min(right),
803        };
804    }
805}
806
807impl Default for BucketMetadata {
808    fn default() -> Self {
809        Self {
810            merges: 1,
811            received_at: None,
812            extracted_from_indexed: false,
813        }
814    }
815}
816
/// Iterator over parsed metrics returned from [`Bucket::parse_all`].
#[derive(Clone, Debug)]
pub struct ParseBuckets<'a> {
    /// The part of the input that has not been consumed yet.
    slice: &'a [u8],
    /// The default timestamp passed to the parser for every line.
    timestamp: UnixTimestamp,
}
823
824impl Default for ParseBuckets<'_> {
825    fn default() -> Self {
826        Self {
827            slice: &[],
828            // The timestamp will never be returned.
829            timestamp: UnixTimestamp::from_secs(4711),
830        }
831    }
832}
833
834impl Iterator for ParseBuckets<'_> {
835    type Item = Result<Bucket, ParseMetricError>;
836
837    fn next(&mut self) -> Option<Self::Item> {
838        loop {
839            if self.slice.is_empty() {
840                return None;
841            }
842
843            let mut split = self.slice.splitn(2, |&b| b == b'\n');
844            let current = split.next()?;
845            self.slice = split.next().unwrap_or_default();
846
847            let string = match std::str::from_utf8(current) {
848                Ok(string) => string.strip_suffix('\r').unwrap_or(string),
849                Err(_) => return Some(Err(ParseMetricError)),
850            };
851
852            if !string.is_empty() {
853                return Some(Bucket::parse_str(string, self.timestamp).ok_or(ParseMetricError));
854            }
855        }
856    }
857}
858
// `next` returns `None` only when the remaining slice is empty, and the slice is only ever
// replaced by a remainder of itself, so every later call returns `None` as well.
impl FusedIterator for ParseBuckets<'_> {}
860
861#[cfg(test)]
862mod tests {
863    use similar_asserts::assert_eq;
864
865    use crate::protocol::{DurationUnit, MetricUnit};
866
867    use super::*;
868
869    #[test]
870    fn test_distribution_value_size() {
871        // DistributionValue uses a SmallVec internally to prevent an additional allocation and
872        // indirection in cases where it needs to store only a small number of items. This is
873        // enabled by a comparably large `GaugeValue`, which stores five atoms. Ensure that the
874        // `DistributionValue`'s size does not exceed that of `GaugeValue`.
875        assert!(
876            std::mem::size_of::<DistributionValue>() <= std::mem::size_of::<GaugeValue>(),
877            "distribution value should not exceed gauge {}",
878            std::mem::size_of::<DistributionValue>()
879        );
880    }
881
882    #[test]
883    fn test_bucket_value_merge_counter() {
884        let mut value = BucketValue::Counter(42.into());
885        value.merge(BucketValue::Counter(43.into())).unwrap();
886        assert_eq!(value, BucketValue::Counter(85.into()));
887    }
888
889    #[test]
890    fn test_bucket_value_merge_distribution() {
891        let mut value = BucketValue::Distribution(dist![1, 2, 3]);
892        value.merge(BucketValue::Distribution(dist![2, 4])).unwrap();
893        assert_eq!(value, BucketValue::Distribution(dist![1, 2, 3, 2, 4]));
894    }
895
896    #[test]
897    fn test_bucket_value_merge_set() {
898        let mut value = BucketValue::Set(vec![1, 2].into_iter().collect());
899        value.merge(BucketValue::Set([2, 3].into())).unwrap();
900        assert_eq!(value, BucketValue::Set(vec![1, 2, 3].into_iter().collect()));
901    }
902
903    #[test]
904    fn test_bucket_value_merge_gauge() {
905        let mut value = BucketValue::Gauge(GaugeValue::single(42.into()));
906        value.merge(BucketValue::gauge(43.into())).unwrap();
907
908        assert_eq!(
909            value,
910            BucketValue::Gauge(GaugeValue {
911                last: 43.into(),
912                min: 42.into(),
913                max: 43.into(),
914                sum: 85.into(),
915                count: 2,
916            })
917        );
918    }
919
920    #[test]
921    fn test_parse_garbage() {
922        let s = "x23-408j17z4232@#34d\nc3456y7^😎";
923        let timestamp = UnixTimestamp::from_secs(4711);
924        let result = Bucket::parse(s.as_bytes(), timestamp);
925        assert!(result.is_err());
926    }
927
928    #[test]
929    fn test_parse_counter() {
930        let s = "transactions/foo:42|c";
931        let timestamp = UnixTimestamp::from_secs(4711);
932        let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap();
933        insta::assert_debug_snapshot!(metric, @r###"
934        Bucket {
935            timestamp: UnixTimestamp(4711),
936            width: 0,
937            name: MetricName(
938                "c:transactions/foo@none",
939            ),
940            value: Counter(
941                42.0,
942            ),
943            tags: {},
944            metadata: BucketMetadata {
945                merges: 1,
946                received_at: None,
947                extracted_from_indexed: false,
948            },
949        }
950        "###);
951    }
952
953    #[test]
954    fn test_parse_counter_packed() {
955        let s = "transactions/foo:42:17:21|c";
956        let timestamp = UnixTimestamp::from_secs(4711);
957        let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap();
958        assert_eq!(metric.value, BucketValue::Counter(80.into()));
959    }
960
961    #[test]
962    fn test_parse_distribution() {
963        let s = "transactions/foo:17.5|d";
964        let timestamp = UnixTimestamp::from_secs(4711);
965        let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap();
966        insta::assert_debug_snapshot!(metric, @r###"
967        Bucket {
968            timestamp: UnixTimestamp(4711),
969            width: 0,
970            name: MetricName(
971                "d:transactions/foo@none",
972            ),
973            value: Distribution(
974                [
975                    17.5,
976                ],
977            ),
978            tags: {},
979            metadata: BucketMetadata {
980                merges: 1,
981                received_at: None,
982                extracted_from_indexed: false,
983            },
984        }
985        "###);
986    }
987
988    #[test]
989    fn test_parse_distribution_packed() {
990        let s = "transactions/foo:17.5:21.9:42.7|d";
991        let timestamp = UnixTimestamp::from_secs(4711);
992        let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap();
993        assert_eq!(
994            metric.value,
995            BucketValue::Distribution(dist![
996                FiniteF64::new(17.5).unwrap(),
997                FiniteF64::new(21.9).unwrap(),
998                FiniteF64::new(42.7).unwrap()
999            ])
1000        );
1001    }
1002
1003    #[test]
1004    fn test_parse_histogram() {
1005        let s = "transactions/foo:17.5|h"; // common alias for distribution
1006        let timestamp = UnixTimestamp::from_secs(4711);
1007        let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap();
1008        assert_eq!(
1009            metric.value,
1010            BucketValue::Distribution(dist![FiniteF64::new(17.5).unwrap()])
1011        );
1012    }
1013
1014    #[test]
1015    fn test_parse_set() {
1016        let s = "transactions/foo:4267882815|s";
1017        let timestamp = UnixTimestamp::from_secs(4711);
1018        let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap();
1019        insta::assert_debug_snapshot!(metric, @r###"
1020        Bucket {
1021            timestamp: UnixTimestamp(4711),
1022            width: 0,
1023            name: MetricName(
1024                "s:transactions/foo@none",
1025            ),
1026            value: Set(
1027                {
1028                    4267882815,
1029                },
1030            ),
1031            tags: {},
1032            metadata: BucketMetadata {
1033                merges: 1,
1034                received_at: None,
1035                extracted_from_indexed: false,
1036            },
1037        }
1038        "###);
1039    }
1040
1041    #[test]
1042    fn test_parse_set_hashed() {
1043        let s = "transactions/foo:e2546e4c-ecd0-43ad-ae27-87960e57a658|s";
1044        let timestamp = UnixTimestamp::from_secs(4711);
1045        let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap();
1046        assert_eq!(metric.value, BucketValue::Set([4267882815].into()));
1047    }
1048
1049    #[test]
1050    fn test_parse_set_hashed_packed() {
1051        let s = "transactions/foo:e2546e4c-ecd0-43ad-ae27-87960e57a658:00449b66-d91f-4fb8-b324-4c8bdf2499f6|s";
1052        let timestamp = UnixTimestamp::from_secs(4711);
1053        let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap();
1054        assert_eq!(
1055            metric.value,
1056            BucketValue::Set([181348692, 4267882815].into())
1057        );
1058    }
1059
1060    #[test]
1061    fn test_parse_set_packed() {
1062        let s = "transactions/foo:3182887624:4267882815|s";
1063        let timestamp = UnixTimestamp::from_secs(4711);
1064        let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap();
1065        assert_eq!(
1066            metric.value,
1067            BucketValue::Set([3182887624, 4267882815].into())
1068        )
1069    }
1070
1071    #[test]
1072    fn test_parse_gauge() {
1073        let s = "transactions/foo:42|g";
1074        let timestamp = UnixTimestamp::from_secs(4711);
1075        let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap();
1076        insta::assert_debug_snapshot!(metric, @r###"
1077        Bucket {
1078            timestamp: UnixTimestamp(4711),
1079            width: 0,
1080            name: MetricName(
1081                "g:transactions/foo@none",
1082            ),
1083            value: Gauge(
1084                GaugeValue {
1085                    last: 42.0,
1086                    min: 42.0,
1087                    max: 42.0,
1088                    sum: 42.0,
1089                    count: 1,
1090                },
1091            ),
1092            tags: {},
1093            metadata: BucketMetadata {
1094                merges: 1,
1095                received_at: None,
1096                extracted_from_indexed: false,
1097            },
1098        }
1099        "###);
1100    }
1101
1102    #[test]
1103    fn test_parse_gauge_packed() {
1104        let s = "transactions/foo:25:17:42:220:85|g";
1105        let timestamp = UnixTimestamp::from_secs(4711);
1106        let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap();
1107        insta::assert_debug_snapshot!(metric, @r###"
1108        Bucket {
1109            timestamp: UnixTimestamp(4711),
1110            width: 0,
1111            name: MetricName(
1112                "g:transactions/foo@none",
1113            ),
1114            value: Gauge(
1115                GaugeValue {
1116                    last: 25.0,
1117                    min: 17.0,
1118                    max: 42.0,
1119                    sum: 220.0,
1120                    count: 85,
1121                },
1122            ),
1123            tags: {},
1124            metadata: BucketMetadata {
1125                merges: 1,
1126                received_at: None,
1127                extracted_from_indexed: false,
1128            },
1129        }
1130        "###);
1131    }
1132
1133    #[test]
1134    fn test_parse_implicit_namespace() {
1135        let s = "foo:42|c";
1136        let timestamp = UnixTimestamp::from_secs(4711);
1137        let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap();
1138        insta::assert_debug_snapshot!(metric, @r###"
1139        Bucket {
1140            timestamp: UnixTimestamp(4711),
1141            width: 0,
1142            name: MetricName(
1143                "c:custom/foo@none",
1144            ),
1145            value: Counter(
1146                42.0,
1147            ),
1148            tags: {},
1149            metadata: BucketMetadata {
1150                merges: 1,
1151                received_at: None,
1152                extracted_from_indexed: false,
1153            },
1154        }
1155        "###);
1156    }
1157
1158    #[test]
1159    fn test_parse_unit() {
1160        let s = "transactions/foo@second:17.5|d";
1161        let timestamp = UnixTimestamp::from_secs(4711);
1162        let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap();
1163        let mri = MetricResourceIdentifier::parse(&metric.name).unwrap();
1164        assert_eq!(mri.unit, MetricUnit::Duration(DurationUnit::Second));
1165    }
1166
1167    #[test]
1168    fn test_parse_unit_regression() {
1169        let s = "transactions/foo@s:17.5|d";
1170        let timestamp = UnixTimestamp::from_secs(4711);
1171        let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap();
1172        let mri = MetricResourceIdentifier::parse(&metric.name).unwrap();
1173        assert_eq!(mri.unit, MetricUnit::Duration(DurationUnit::Second));
1174    }
1175
1176    #[test]
1177    fn test_parse_tags() {
1178        let s = "transactions/foo:17.5|d|#foo,bar:baz";
1179        let timestamp = UnixTimestamp::from_secs(4711);
1180        let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap();
1181        insta::assert_debug_snapshot!(metric.tags, @r#"
1182            {
1183                "bar": "baz",
1184                "foo": "",
1185            }
1186            "#);
1187    }
1188
1189    #[test]
1190    fn test_parse_tags_escaped() {
1191        let s = "transactions/foo:17.5|d|#foo:😅\\u{2c}🚀";
1192        let timestamp = UnixTimestamp::from_secs(4711);
1193        let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap();
1194        insta::assert_debug_snapshot!(metric.tags, @r#"
1195            {
1196                "foo": "😅,🚀",
1197            }
1198            "#);
1199    }
1200
1201    #[test]
1202    fn test_parse_timestamp() {
1203        let s = "transactions/foo:17.5|d|T1615889449";
1204        let timestamp = UnixTimestamp::from_secs(4711);
1205        let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap();
1206        assert_eq!(metric.timestamp, UnixTimestamp::from_secs(1615889449));
1207    }
1208
1209    #[test]
1210    fn test_parse_sample_rate() {
1211        // Sample rate should be ignored
1212        let s = "transactions/foo:17.5|d|@0.1";
1213        let timestamp = UnixTimestamp::from_secs(4711);
1214        Bucket::parse(s.as_bytes(), timestamp).unwrap();
1215    }
1216
1217    #[test]
1218    fn test_parse_invalid_name() {
1219        let s = "foo#bar:42|c";
1220        let timestamp = UnixTimestamp::from_secs(4711);
1221        let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap();
1222        assert_eq!(metric.name.as_ref(), "c:custom/foo_bar@none");
1223    }
1224
1225    #[test]
1226    fn test_parse_empty_name() {
1227        let s = ":42|c";
1228        let timestamp = UnixTimestamp::from_secs(4711);
1229        let metric = Bucket::parse(s.as_bytes(), timestamp);
1230        assert!(metric.is_err());
1231    }
1232
1233    #[test]
1234    fn test_parse_invalid_name_with_leading_digit() {
1235        let s = "64bit:42|c";
1236        let timestamp = UnixTimestamp::from_secs(4711);
1237        let metric = Bucket::parse(s.as_bytes(), timestamp);
1238        assert!(metric.is_err());
1239    }
1240
1241    #[test]
1242    fn test_parse_all() {
1243        let s = "transactions/foo:42|c\nbar:17|c";
1244        let timestamp = UnixTimestamp::from_secs(4711);
1245
1246        let metrics: Vec<Bucket> = Bucket::parse_all(s.as_bytes(), timestamp)
1247            .collect::<Result<_, _>>()
1248            .unwrap();
1249
1250        assert_eq!(metrics.len(), 2);
1251    }
1252
1253    #[test]
1254    fn test_parse_all_crlf() {
1255        let s = "transactions/foo:42|c\r\nbar:17|c";
1256        let timestamp = UnixTimestamp::from_secs(4711);
1257
1258        let metrics: Vec<Bucket> = Bucket::parse_all(s.as_bytes(), timestamp)
1259            .collect::<Result<_, _>>()
1260            .unwrap();
1261
1262        assert_eq!(metrics.len(), 2);
1263    }
1264
1265    #[test]
1266    fn test_parse_all_empty_lines() {
1267        let s = "transactions/foo:42|c\n\n\nbar:17|c";
1268        let timestamp = UnixTimestamp::from_secs(4711);
1269
1270        let metric_count = Bucket::parse_all(s.as_bytes(), timestamp).count();
1271        assert_eq!(metric_count, 2);
1272    }
1273
1274    #[test]
1275    fn test_parse_all_trailing() {
1276        let s = "transactions/foo:42|c\nbar:17|c\n";
1277        let timestamp = UnixTimestamp::from_secs(4711);
1278
1279        let metric_count = Bucket::parse_all(s.as_bytes(), timestamp).count();
1280        assert_eq!(metric_count, 2);
1281    }
1282
1283    #[test]
1284    fn test_metrics_docs() {
1285        let text = include_str!("../tests/fixtures/buckets.statsd.txt").trim_end();
1286        let json = include_str!("../tests/fixtures/buckets.json").trim_end();
1287
1288        let timestamp = UnixTimestamp::from_secs(0);
1289        let statsd_metrics = Bucket::parse_all(text.as_bytes(), timestamp)
1290            .collect::<Result<Vec<_>, _>>()
1291            .unwrap();
1292
1293        let json_metrics: Vec<Bucket> = serde_json::from_str(json).unwrap();
1294
1295        assert_eq!(statsd_metrics, json_metrics);
1296    }
1297
1298    #[test]
1299    fn test_set_docs() {
1300        let text = include_str!("../tests/fixtures/set.statsd.txt").trim_end();
1301        let json = include_str!("../tests/fixtures/set.json").trim_end();
1302
1303        let timestamp = UnixTimestamp::from_secs(1615889449);
1304        let statsd_metric = Bucket::parse(text.as_bytes(), timestamp).unwrap();
1305        let json_metric: Bucket = serde_json::from_str(json).unwrap();
1306
1307        assert_eq!(statsd_metric, json_metric);
1308    }
1309
1310    #[test]
1311    fn test_parse_buckets() {
1312        let json = r#"[
1313          {
1314            "name": "endpoint.response_time",
1315            "unit": "millisecond",
1316            "value": [36, 49, 57, 68],
1317            "type": "d",
1318            "timestamp": 1615889440,
1319            "width": 10,
1320            "tags": {
1321                "route": "user_index"
1322            },
1323            "metadata": {
1324                "merges": 1,
1325                "received_at": 1615889440
1326            }
1327          }
1328        ]"#;
1329
1330        let buckets = serde_json::from_str::<Vec<Bucket>>(json).unwrap();
1331
1332        insta::assert_debug_snapshot!(buckets, @r###"
1333        [
1334            Bucket {
1335                timestamp: UnixTimestamp(1615889440),
1336                width: 10,
1337                name: MetricName(
1338                    "endpoint.response_time",
1339                ),
1340                value: Distribution(
1341                    [
1342                        36.0,
1343                        49.0,
1344                        57.0,
1345                        68.0,
1346                    ],
1347                ),
1348                tags: {
1349                    "route": "user_index",
1350                },
1351                metadata: BucketMetadata {
1352                    merges: 1,
1353                    received_at: Some(
1354                        UnixTimestamp(1615889440),
1355                    ),
1356                    extracted_from_indexed: false,
1357                },
1358            },
1359        ]
1360        "###);
1361    }
1362
1363    #[test]
1364    fn test_parse_bucket_defaults() {
1365        let json = r#"[
1366          {
1367            "name": "endpoint.hits",
1368            "value": 4,
1369            "type": "c",
1370            "timestamp": 1615889440,
1371            "width": 10,
1372            "metadata": {
1373                "merges": 1,
1374                "received_at": 1615889440
1375            }
1376          }
1377        ]"#;
1378
1379        let buckets = serde_json::from_str::<Vec<Bucket>>(json).unwrap();
1380
1381        insta::assert_debug_snapshot!(buckets, @r###"
1382        [
1383            Bucket {
1384                timestamp: UnixTimestamp(1615889440),
1385                width: 10,
1386                name: MetricName(
1387                    "endpoint.hits",
1388                ),
1389                value: Counter(
1390                    4.0,
1391                ),
1392                tags: {},
1393                metadata: BucketMetadata {
1394                    merges: 1,
1395                    received_at: Some(
1396                        UnixTimestamp(1615889440),
1397                    ),
1398                    extracted_from_indexed: false,
1399                },
1400            },
1401        ]
1402        "###);
1403    }
1404
1405    #[test]
1406    fn test_buckets_roundtrip() {
1407        let json = r#"[
1408  {
1409    "timestamp": 1615889440,
1410    "width": 10,
1411    "name": "endpoint.response_time",
1412    "type": "d",
1413    "value": [
1414      36.0,
1415      49.0,
1416      57.0,
1417      68.0
1418    ],
1419    "tags": {
1420      "route": "user_index"
1421    }
1422  },
1423  {
1424    "timestamp": 1615889440,
1425    "width": 10,
1426    "name": "endpoint.hits",
1427    "type": "c",
1428    "value": 4.0,
1429    "tags": {
1430      "route": "user_index"
1431    }
1432  },
1433  {
1434    "timestamp": 1615889440,
1435    "width": 10,
1436    "name": "endpoint.parallel_requests",
1437    "type": "g",
1438    "value": {
1439      "last": 25.0,
1440      "min": 17.0,
1441      "max": 42.0,
1442      "sum": 2210.0,
1443      "count": 85
1444    }
1445  },
1446  {
1447    "timestamp": 1615889440,
1448    "width": 10,
1449    "name": "endpoint.users",
1450    "type": "s",
1451    "value": [
1452      3182887624,
1453      4267882815
1454    ],
1455    "tags": {
1456      "route": "user_index"
1457    }
1458  }
1459]"#;
1460
1461        let buckets = serde_json::from_str::<Vec<Bucket>>(json).unwrap();
1462        let serialized = serde_json::to_string_pretty(&buckets).unwrap();
1463        assert_eq!(json, serialized);
1464    }
1465
1466    #[test]
1467    fn test_bucket_docs_roundtrip() {
1468        let json = include_str!("../tests/fixtures/buckets.json")
1469            .trim_end()
1470            .replace("\r\n", "\n");
1471        let buckets = serde_json::from_str::<Vec<Bucket>>(&json).unwrap();
1472
1473        let serialized = serde_json::to_string_pretty(&buckets).unwrap();
1474        assert_eq!(json, serialized);
1475    }
1476
1477    #[test]
1478    fn test_bucket_metadata_merge() {
1479        let mut metadata = BucketMetadata::default();
1480
1481        let other_metadata = BucketMetadata::default();
1482        metadata.merge(other_metadata);
1483        assert_eq!(
1484            metadata,
1485            BucketMetadata {
1486                merges: 2,
1487                received_at: None,
1488                extracted_from_indexed: false,
1489            }
1490        );
1491
1492        let other_metadata = BucketMetadata::new(UnixTimestamp::from_secs(10));
1493        metadata.merge(other_metadata);
1494        assert_eq!(
1495            metadata,
1496            BucketMetadata {
1497                merges: 3,
1498                received_at: Some(UnixTimestamp::from_secs(10)),
1499                extracted_from_indexed: false,
1500            }
1501        );
1502
1503        let other_metadata = BucketMetadata::new(UnixTimestamp::from_secs(20));
1504        metadata.merge(other_metadata);
1505        assert_eq!(
1506            metadata,
1507            BucketMetadata {
1508                merges: 4,
1509                received_at: Some(UnixTimestamp::from_secs(10)),
1510                extracted_from_indexed: false,
1511            }
1512        );
1513    }
1514}