relay_metrics/bucket.rs
1use std::collections::{BTreeMap, BTreeSet};
2use std::iter::FusedIterator;
3use std::{fmt, mem};
4
5use relay_common::time::UnixTimestamp;
6use relay_protocol::FiniteF64;
7use serde::{Deserialize, Serialize};
8use smallvec::SmallVec;
9
10use crate::ParseMetricError;
11use crate::protocol::{
12 self, CounterType, DistributionType, GaugeType, MetricName, MetricResourceIdentifier,
13 MetricType, SetType, hash_set_value,
14};
15
// Separator between packed values in the statsd payload, e.g. `1:2:3`.
const VALUE_SEPARATOR: char = ':';

/// Type of [`Bucket::tags`].
pub type MetricTags = BTreeMap<String, String>;
20
/// A snapshot of values within a [`Bucket`].
///
/// Gauges store the last, minimum, and maximum reported value, as well as the sum and count of
/// all reported values. See [`BucketValue::Gauge`] for aggregation semantics.
#[derive(Clone, Copy, Debug, PartialEq, Deserialize, Serialize)]
pub struct GaugeValue {
    /// The last value reported in the bucket.
    ///
    /// This aggregation is not commutative: which value survives a merge depends on merge order.
    pub last: GaugeType,
    /// The minimum value reported in the bucket.
    pub min: GaugeType,
    /// The maximum value reported in the bucket.
    pub max: GaugeType,
    /// The sum of all values reported in the bucket.
    pub sum: GaugeType,
    /// The number of times this bucket was updated with a new value.
    pub count: u64,
}
37
38impl GaugeValue {
39 /// Creates a gauge snapshot from a single value.
40 pub fn single(value: GaugeType) -> Self {
41 Self {
42 last: value,
43 min: value,
44 max: value,
45 sum: value,
46 count: 1,
47 }
48 }
49
50 /// Inserts a new value into the gauge.
51 pub fn insert(&mut self, value: GaugeType) {
52 self.last = value;
53 self.min = self.min.min(value);
54 self.max = self.max.max(value);
55 self.sum = self.sum.saturating_add(value);
56 self.count += 1;
57 }
58
59 /// Merges two gauge snapshots.
60 pub fn merge(&mut self, other: Self) {
61 self.last = other.last;
62 self.min = self.min.min(other.min);
63 self.max = self.max.max(other.max);
64 self.sum = self.sum.saturating_add(other.sum);
65 self.count += other.count;
66 }
67
68 /// Returns the average of all values reported in this bucket.
69 pub fn avg(&self) -> Option<GaugeType> {
70 self.sum / FiniteF64::new(self.count as f64)?
71 }
72}
73
/// A distribution of values within a [`Bucket`].
///
/// Distributions logically store a histogram of values. Based on individual reported values,
/// distributions allow to query the maximum, minimum, or average of the reported values, as well as
/// statistical quantiles.
///
/// # Example
///
/// ```
/// use relay_metrics::dist;
///
/// let mut dist = dist![1, 1, 1, 2];
/// dist.push(5.into());
/// dist.extend(std::iter::repeat(3.into()).take(7));
/// ```
///
/// Logically, this distribution is equivalent to this visualization:
///
/// ```plain
/// value | count
/// 1.0   | ***
/// 2.0   | *
/// 3.0   | *******
/// 4.0   |
/// 5.0   | *
/// ```
///
/// # Serialization
///
/// Distributions serialize as lists of floating point values. The list contains one entry for each
/// value in the distribution, including duplicates.
// The inline capacity of 3 keeps small distributions heap-free without exceeding the size of
// `GaugeValue`; see `test_distribution_value_size` in this module's tests.
pub type DistributionValue = SmallVec<[DistributionType; 3]>;

// Re-exported only for use inside the `dist!` macro expansion; not public API.
#[doc(hidden)]
pub use smallvec::smallvec as _smallvec;
109
/// Creates a [`DistributionValue`] containing the given arguments.
///
/// `dist!` allows `DistributionValue` to be defined with the same syntax as array expressions.
///
/// # Example
///
/// ```
/// let dist = relay_metrics::dist![1, 2];
/// ```
#[macro_export]
macro_rules! dist {
    ($($x:expr),*$(,)*) => {
        // Convert each element into `DistributionType` and build the inline `SmallVec`.
        // The `as` cast pins the macro's result to `DistributionValue`.
        $crate::_smallvec!($($crate::DistributionType::from($x)),*) as $crate::DistributionValue
    };
}
125
/// A set of unique values.
///
/// Set values can be specified as strings in the submission protocol. They are always hashed
/// into a 32-bit value and the original value is dropped. If the submission protocol contains a
/// 32-bit integer, it will be used directly, instead.
///
/// See the [bucket docs](crate::Bucket) for more information on set hashing.
pub type SetValue = BTreeSet<SetType>;
134
/// The [aggregated value](Bucket::value) of a metric bucket.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(tag = "type", content = "value")]
pub enum BucketValue {
    /// Counts instances of an event ([`MetricType::Counter`]).
    ///
    /// Counters can be incremented and decremented. The default operation is to increment a counter
    /// by `1`, although increments by larger values are equally possible.
    ///
    /// # Statsd Format
    ///
    /// Counters are declared as `"c"`. Alternatively, `"m"` is allowed.
    ///
    /// There can be a variable number of floating point values. If more than one value is given,
    /// the values are summed into a single counter value:
    ///
    /// ```text
    /// endpoint.hits:4.5:21:17.0|c
    /// ```
    ///
    /// # Serialization
    ///
    /// This variant serializes to a double precision float.
    ///
    /// # Aggregation
    ///
    /// Counters aggregate by folding individual values into a single sum value per bucket. The sum
    /// is ingested and stored directly.
    #[serde(rename = "c")]
    Counter(CounterType),

    /// Builds a statistical distribution over values reported ([`MetricType::Distribution`]).
    ///
    /// Based on individual reported values, distributions allow to query the maximum, minimum, or
    /// average of the reported values, as well as statistical quantiles. With an increasing number
    /// of values in the distribution, its accuracy becomes approximate.
    ///
    /// # Statsd Format
    ///
    /// Distributions are declared as `"d"`. Alternatively, `"h"` and `"ms"` are allowed.
    ///
    /// There can be a variable number of floating point values. These values are collected directly
    /// in a list per bucket.
    ///
    /// ```text
    /// endpoint.response_time@millisecond:36:49:57:68|d
    /// ```
    ///
    /// # Serialization
    ///
    /// This variant serializes to a list of double precision floats, see [`DistributionValue`].
    ///
    /// # Aggregation
    ///
    /// During ingestion, all individual reported values are collected in a lossless format. In
    /// storage, these values are compressed into data sketches that allow to query quantiles.
    /// Separately, the count and sum of the reported values is stored, which makes distributions a
    /// strict superset of counters.
    #[serde(rename = "d")]
    Distribution(DistributionValue),

    /// Counts the number of unique reported values.
    ///
    /// Sets allow sending arbitrary discrete values, including strings, and store the deduplicated
    /// count. With an increasing number of unique values in the set, its accuracy becomes
    /// approximate. It is not possible to query individual values from a set.
    ///
    /// # Statsd Format
    ///
    /// Sets are declared as `"s"`. Values in the list should be deduplicated.
    ///
    /// ```text
    /// endpoint.users:3182887624:4267882815|s
    /// endpoint.users:e2546e4c-ecd0-43ad-ae27-87960e57a658|s
    /// ```
    ///
    /// # Serialization
    ///
    /// This variant serializes to a list of 32-bit integers.
    ///
    /// # Aggregation
    ///
    /// Set values are internally represented as 32-bit integer hashes of the original value. These
    /// hashes can be ingested directly as seen in the first example above. If raw strings are sent,
    /// they will be hashed on-the-fly.
    ///
    /// Internally, set metrics are stored in data sketches that expose an approximate cardinality.
    #[serde(rename = "s")]
    Set(SetValue),

    /// Stores absolute snapshots of values.
    ///
    /// In addition to plain [counters](Self::Counter), gauges store a snapshot of the maximum,
    /// minimum and sum of all values, as well as the last reported value. Note that the "last"
    /// component of this aggregation is not commutative. Which value is preserved as last value is
    /// implementation-defined.
    ///
    /// # Statsd Format
    ///
    /// Gauges are declared as `"g"`. There are two ways to ingest gauges:
    /// 1. As a single value. In this case, the provided value is assumed as the last, minimum,
    ///    maximum, and the sum.
    /// 2. As a sequence of five values in the order: `last`, `min`, `max`, `sum`, `count`.
    ///
    /// ```text
    /// endpoint.parallel_requests:25|g
    /// endpoint.parallel_requests:25:17:42:220:85|g
    /// ```
    ///
    /// # Serialization
    ///
    /// This variant serializes to a structure with named fields, see [`GaugeValue`].
    ///
    /// # Aggregation
    ///
    /// Gauges aggregate by folding each of the components based on their semantics:
    /// - `last` assumes the newly added value
    /// - `min` retains the smaller value
    /// - `max` retains the larger value
    /// - `sum` adds the new value to the existing sum
    /// - `count` adds the count of the newly added gauge (defaulting to `1`)
    #[serde(rename = "g")]
    Gauge(GaugeValue),
}
260
261impl BucketValue {
262 /// Returns a bucket value representing a counter with the given value.
263 pub fn counter(value: CounterType) -> Self {
264 Self::Counter(value)
265 }
266
267 /// Returns a bucket value representing a distribution with a single given value.
268 pub fn distribution(value: DistributionType) -> Self {
269 Self::Distribution(dist![value])
270 }
271
272 /// Returns a bucket value representing a set with a single given hash value.
273 pub fn set(value: SetType) -> Self {
274 Self::Set(std::iter::once(value).collect())
275 }
276
277 /// Returns a bucket value representing a set with a single given string value.
278 pub fn set_from_str(string: &str) -> Self {
279 Self::set(hash_set_value(string))
280 }
281
282 /// Returns a bucket value representing a set with a single given value.
283 pub fn set_from_display(display: impl fmt::Display) -> Self {
284 Self::set(hash_set_value(&display.to_string()))
285 }
286
287 /// Returns a bucket value representing a gauge with a single given value.
288 pub fn gauge(value: GaugeType) -> Self {
289 Self::Gauge(GaugeValue::single(value))
290 }
291
292 /// Returns the type of this value.
293 pub fn ty(&self) -> MetricType {
294 match self {
295 Self::Counter(_) => MetricType::Counter,
296 Self::Distribution(_) => MetricType::Distribution,
297 Self::Set(_) => MetricType::Set,
298 Self::Gauge(_) => MetricType::Gauge,
299 }
300 }
301
302 /// Returns the number of raw data points in this value.
303 pub fn len(&self) -> usize {
304 match self {
305 BucketValue::Counter(_) => 1,
306 BucketValue::Distribution(distribution) => distribution.len(),
307 BucketValue::Set(set) => set.len(),
308 BucketValue::Gauge(_) => 5,
309 }
310 }
311
312 /// Returns `true` if this bucket contains no values.
313 pub fn is_empty(&self) -> bool {
314 self.len() == 0
315 }
316
317 /// Estimates the number of bytes needed to encode the bucket value.
318 ///
319 /// Note that this does not necessarily match the exact memory footprint of the value,
320 /// because data structures have a memory overhead.
321 pub fn cost(&self) -> usize {
322 // Beside the size of [`BucketValue`], we also need to account for the cost of values
323 // allocated dynamically.
324 let allocated_cost = match self {
325 Self::Counter(_) => 0,
326 Self::Set(s) => mem::size_of::<SetType>() * s.len(),
327 Self::Gauge(_) => 0,
328 Self::Distribution(d) => d.len() * mem::size_of::<DistributionType>(),
329 };
330
331 mem::size_of::<Self>() + allocated_cost
332 }
333
334 /// Merges the given `bucket_value` into `self`.
335 ///
336 /// Returns `Ok(())` if the two bucket values can be merged. This is the case when both bucket
337 /// values are of the same variant. Otherwise, this returns `Err(other)`.
338 pub fn merge(&mut self, other: Self) -> Result<(), Self> {
339 match (self, other) {
340 (Self::Counter(slf), Self::Counter(other)) => *slf = slf.saturating_add(other),
341 (Self::Distribution(slf), Self::Distribution(other)) => slf.extend_from_slice(&other),
342 (Self::Set(slf), Self::Set(other)) => slf.extend(other),
343 (Self::Gauge(slf), Self::Gauge(other)) => slf.merge(other),
344 (_, other) => return Err(other),
345 }
346
347 Ok(())
348 }
349}
350
351/// Parses a list of counter values separated by colons and sums them up.
352fn parse_counter(string: &str) -> Option<CounterType> {
353 let mut sum = CounterType::default();
354 for component in string.split(VALUE_SEPARATOR) {
355 sum = sum.saturating_add(component.parse().ok()?);
356 }
357 Some(sum)
358}
359
360/// Parses a distribution from a list of floating point values separated by colons.
361fn parse_distribution(string: &str) -> Option<DistributionValue> {
362 let mut dist = DistributionValue::default();
363 for component in string.split(VALUE_SEPARATOR) {
364 dist.push(component.parse().ok()?);
365 }
366 Some(dist)
367}
368
369/// Parses a set of hashed numeric values.
370fn parse_set(string: &str) -> Option<SetValue> {
371 let mut set = SetValue::default();
372 for component in string.split(VALUE_SEPARATOR) {
373 let hash = component
374 .parse()
375 .unwrap_or_else(|_| protocol::hash_set_value(component));
376 set.insert(hash);
377 }
378 Some(set)
379}
380
381/// Parses a gauge from a value.
382///
383/// The gauge can either be given as a single floating point value, or as a list of exactly five
384/// values in the order of [`GaugeValue`] fields.
385fn parse_gauge(string: &str) -> Option<GaugeValue> {
386 let mut components = string.split(VALUE_SEPARATOR);
387
388 let last = components.next()?.parse().ok()?;
389 Some(if let Some(min) = components.next() {
390 GaugeValue {
391 last,
392 min: min.parse().ok()?,
393 max: components.next()?.parse().ok()?,
394 sum: components.next()?.parse().ok()?,
395 count: components.next()?.parse().ok()?,
396 }
397 } else {
398 GaugeValue::single(last)
399 })
400}
401
402/// Parses tags in the format `tag1,tag2:value`.
403///
404/// Tag values are optional. For tags with missing values, an empty `""` value is assumed.
405fn parse_tags(string: &str) -> Option<MetricTags> {
406 let mut map = MetricTags::new();
407
408 for pair in string.split(',') {
409 let mut name_value = pair.splitn(2, ':');
410
411 let name = name_value.next()?;
412 if !protocol::is_valid_tag_key(name) {
413 continue;
414 }
415
416 if let Ok(value) = protocol::unescape_tag_value(name_value.next().unwrap_or_default()) {
417 map.insert(name.to_owned(), value);
418 }
419 }
420
421 Some(map)
422}
423
424/// Parses a unix UTC timestamp.
425fn parse_timestamp(string: &str) -> Option<UnixTimestamp> {
426 string.parse().ok().map(UnixTimestamp::from_secs)
427}
428
/// An aggregation of metric values.
///
/// As opposed to single metric values, bucket aggregations can carry multiple values. See
/// [`MetricType`] for a description on how values are aggregated in buckets. Values are aggregated
/// by metric name, type, time window, and all tags. Particularly, this allows metrics to have the
/// same name even if their types differ.
///
/// See the [crate documentation](crate) for general information on Metrics.
///
/// # Values
///
/// The contents of a bucket, especially their representation and serialization, depend on the
/// metric type:
///
/// - [Counters](BucketValue::Counter) store a single value, serialized as floating point.
/// - [Distributions](MetricType::Distribution) and [sets](MetricType::Set) store the full set of
///   reported values.
/// - [Gauges](BucketValue::Gauge) store a snapshot of reported values, see [`GaugeValue`].
///
/// # Submission Protocol
///
/// ```text
/// <name>[@unit]:<value>[:<value>...]|<type>[|#<tag_key>:<tag_value>,<tag>][|T<timestamp>]
/// ```
///
/// See the [field documentation](Bucket#fields) for more information on the components. An example
/// submission looks like this:
///
/// ```text
#[doc = include_str!("../tests/fixtures/buckets.statsd.txt")]
/// ```
///
/// To parse a submission payload, use [`Bucket::parse_all`].
///
/// # JSON Representation
///
/// Alternatively to the submission protocol, metrics can be represented as structured data in JSON.
/// The data type of the `value` field is determined by the metric type.
///
/// In addition to the submission protocol, buckets have a required [`width`](Self::width) field in
/// their JSON representation.
///
/// ```json
#[doc = include_str!("../tests/fixtures/buckets.json")]
/// ```
///
/// To parse a JSON payload, use [`serde_json`].
///
/// # Hashing of Sets
///
/// Set values can be specified as strings in the submission protocol. They are always hashed
/// into a 32-bit value and the original value is dropped. If the submission protocol contains a
/// 32-bit integer, it will be used directly, instead.
///
/// **Example**:
///
/// ```text
#[doc = include_str!("../tests/fixtures/set.statsd.txt")]
/// ```
///
/// The above submission is represented as:
///
/// ```json
#[doc = include_str!("../tests/fixtures/set.json")]
/// ```
#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
pub struct Bucket {
    /// The start time of the bucket's time window.
    ///
    /// If a timestamp is not supplied as part of the submission payload, the default timestamp
    /// supplied to [`Bucket::parse`] or [`Bucket::parse_all`] is associated with the metric. It is
    /// then aligned with the aggregation window.
    ///
    /// # Statsd Format
    ///
    /// In statsd, timestamps are part of the `|`-separated list following values. Timestamps start
    /// with the literal character `'T'` followed by the UNIX timestamp.
    ///
    /// The timestamp must be a positive integer in decimal notation representing the value of the
    /// UNIX timestamp.
    ///
    /// # Example
    ///
    /// ```text
    /// endpoint.hits:1|c|T1615889440
    /// ```
    pub timestamp: UnixTimestamp,

    /// The length of the time window in seconds.
    ///
    /// To initialize a new bucket, choose `0` as width. Once the bucket is tracked by Relay's
    /// aggregator, the width is aligned with configuration for the namespace and the timestamp is
    /// adjusted accordingly.
    ///
    /// # Statsd Format
    ///
    /// Specifying the bucket width in statsd is not supported.
    pub width: u64,

    /// The name of the metric in MRI (metric resource identifier) format.
    ///
    /// MRIs have the format `<type>:<ns>/<name>@<unit>`. See [`MetricResourceIdentifier`] for
    /// information on fields and representations.
    ///
    /// # Statsd Format
    ///
    /// MRIs are sent in a more relaxed format: `[<namespace/]name[@unit]`. The value type is not
    /// part of the metric name and namespaces are optional.
    ///
    /// Namespaces and units must consist of ASCII characters and match the regular expression
    /// `/\w+/`. The name component of MRIs consist of unicode characters and must match the
    /// regular expression `/\w[\w\-.]*/`. Note that the name must begin with a letter.
    ///
    /// Per convention, dots separate metric names into components, where the leading components are
    /// considered namespaces and the final component is the name of the metric within its
    /// namespace.
    ///
    /// # Examples
    ///
    /// ```text
    /// endpoint.hits:1|c
    /// custom/endpoint.hits:1|c
    /// custom/endpoint.duration@millisecond:21.5|d
    /// ```
    pub name: MetricName,

    /// The type and aggregated values of this bucket.
    ///
    /// Buckets support multiple values that are aggregated and can be accessed using a range of
    /// aggregation functions depending on the value type. While always a variable number of values
    /// can be sent in, some aggregations reduce the raw values to a fixed set of aggregates.
    ///
    /// See [`BucketValue`] for more examples and semantics.
    ///
    /// # Statsd Payload
    ///
    /// The bucket value and its type are specified in separate fields following the metric name in
    /// the format: `<name>:<value>|<type>`. Values must be base-10 floating point numbers with
    /// optional decimal places.
    ///
    /// It is possible to pack multiple values into a single datagram, but note that the type and
    /// the value representation must match for this. Refer to the [`BucketValue`] docs for more
    /// examples.
    ///
    /// # Example
    ///
    /// ```text
    /// endpoint.hits:21|c
    /// endpoint.hits:4.5|c
    /// ```
    #[serde(flatten)]
    pub value: BucketValue,

    /// A list of tags adding dimensions to the metric for filtering and aggregation.
    ///
    /// Tags allow to compute separate aggregates to filter or group metric values by any number of
    /// dimensions. Tags consist of a unique tag key and one associated value. For tags with missing
    /// values, an empty `""` value is assumed at query time.
    ///
    /// # Statsd Format
    ///
    /// Tags are preceded with a hash `#` and specified in a comma (`,`) separated list. Each tag
    /// can either be a tag name, or a `name:value` combination. Tags are optional and can be
    /// omitted.
    ///
    /// Tag keys are restricted to ASCII characters and must match the regular expression
    /// `/[\w\-.\/]+/`.
    ///
    /// Tag values can contain unicode characters with the following escaping rules:
    /// - Tab is escaped as `\t`.
    /// - Carriage return is escaped as `\r`.
    /// - Line feed is escaped as `\n`.
    /// - Backslash is escaped as `\\`.
    /// - Commas and pipes are given unicode escapes in the form `\u{2c}` and `\u{7c}`,
    ///   respectively.
    ///
    /// # Example
    ///
    /// ```text
    /// endpoint.hits:1|c|#route:user_index,environment:production,release:1.4.0
    /// ```
    #[serde(default, skip_serializing_if = "MetricTags::is_empty")]
    pub tags: MetricTags,

    /// Relay internal metadata for a metric bucket.
    ///
    /// The metadata contains meta information about the metric bucket itself,
    /// for example how often this bucket has been aggregated in total.
    #[serde(default, skip_serializing_if = "BucketMetadata::is_default")]
    pub metadata: BucketMetadata,
}
620
621impl Bucket {
622 /// Parses a statsd-compatible payload.
623 ///
624 /// ```text
625 /// [<ns>/]<name>[@<unit>]:<value>|<type>[|#<tags>]`
626 /// ```
627 fn parse_str(string: &str, timestamp: UnixTimestamp) -> Option<Self> {
628 let mut components = string.split('|');
629
630 let (mri_str, values_str) = components.next()?.split_once(':')?;
631 let ty = components.next().and_then(|s| s.parse().ok())?;
632
633 let mri = MetricResourceIdentifier::parse_with_type(mri_str, ty).ok()?;
634 let value = match ty {
635 MetricType::Counter => BucketValue::Counter(parse_counter(values_str)?),
636 MetricType::Distribution => BucketValue::Distribution(parse_distribution(values_str)?),
637 MetricType::Set => BucketValue::Set(parse_set(values_str)?),
638 MetricType::Gauge => BucketValue::Gauge(parse_gauge(values_str)?),
639 };
640
641 let mut bucket = Bucket {
642 timestamp,
643 width: 0,
644 name: mri.to_string().into(),
645 value,
646 tags: Default::default(),
647 metadata: Default::default(),
648 };
649
650 for component in components {
651 match component.chars().next() {
652 Some('#') => {
653 bucket.tags = parse_tags(component.get(1..)?)?;
654 }
655 Some('T') => {
656 bucket.timestamp = parse_timestamp(component.get(1..)?)?;
657 }
658 _ => (),
659 }
660 }
661
662 Some(bucket)
663 }
664
665 /// Parses a single metric aggregate from the raw protocol.
666 ///
667 /// See the [`Bucket`] for more information on the protocol.
668 ///
669 /// # Example
670 ///
671 /// ```
672 /// use relay_metrics::{Bucket, UnixTimestamp};
673 ///
674 /// let bucket = Bucket::parse(b"response_time@millisecond:57|d", UnixTimestamp::now())
675 /// .expect("metric should parse");
676 /// ```
677 pub fn parse(slice: &[u8], timestamp: UnixTimestamp) -> Result<Self, ParseMetricError> {
678 let string = std::str::from_utf8(slice).map_err(|_| ParseMetricError)?;
679 Self::parse_str(string, timestamp).ok_or(ParseMetricError)
680 }
681
682 /// Parses a set of metric aggregates from the raw protocol.
683 ///
684 /// Returns a metric result for each line in `slice`, ignoring empty lines. Both UNIX newlines
685 /// (`\n`) and Windows newlines (`\r\n`) are supported.
686 ///
687 /// It is possible to continue consuming the iterator after `Err` is yielded.
688 ///
689 /// See [`Bucket`] for more information on the protocol.
690 ///
691 /// # Example
692 ///
693 /// ```
694 /// use relay_metrics::{Bucket, UnixTimestamp};
695 ///
696 /// let data = br#"
697 /// endpoint.response_time@millisecond:57|d
698 /// endpoint.hits:1|c
699 /// "#;
700 ///
701 /// for metric_result in Bucket::parse_all(data, UnixTimestamp::now()) {
702 /// let bucket = metric_result.expect("metric should parse");
703 /// println!("Metric {}: {:?}", bucket.name, bucket.value);
704 /// }
705 /// ```
706 pub fn parse_all(slice: &[u8], timestamp: UnixTimestamp) -> ParseBuckets<'_> {
707 ParseBuckets { slice, timestamp }
708 }
709
710 /// Returns the value of the specified tag if it exists.
711 pub fn tag(&self, name: &str) -> Option<&str> {
712 self.tags.get(name).map(|s| s.as_str())
713 }
714
715 /// Removes the value of the specified tag.
716 ///
717 /// If the tag exists, the removed value is returned.
718 pub fn remove_tag(&mut self, name: &str) -> Option<String> {
719 self.tags.remove(name)
720 }
721}
722
/// Relay internal metadata for a metric bucket.
#[derive(Clone, Copy, Debug, PartialEq, Deserialize, Serialize)]
pub struct BucketMetadata {
    /// How many times the bucket was merged.
    ///
    /// Creating a new bucket is the first merge.
    /// Merging two buckets sums the amount of merges.
    ///
    /// For example: Merging two un-merged buckets will yield a total
    /// of `2` merges.
    ///
    /// Due to how Relay aggregates metrics and later splits them into multiple
    /// buckets again, the amount of merges can be zero.
    /// When splitting a bucket the total volume of the bucket may only be attributed
    /// to one part or distributed across the resulting buckets, in either case
    /// values of `0` are possible.
    pub merges: u32,

    /// Received timestamp of the first metric in this bucket.
    ///
    /// This field should be set to the time in which the first metric of a specific bucket was
    /// received in the outermost internal Relay.
    pub received_at: Option<UnixTimestamp>,

    /// Is `true` if this metric was extracted from a sampled/indexed envelope item.
    ///
    /// The final dynamic sampling decision is always made in processing Relays.
    /// If a metric was extracted from an item which is sampled (i.e. retained by dynamic sampling), this flag is `true`.
    ///
    /// Since these metrics from samples carry additional information, e.g. they don't
    /// require rate limiting since the sample they've been extracted from was already
    /// rate limited, this flag must be included in the aggregation key when aggregating buckets.
    // Not serialized: this flag is only meaningful within a single Relay instance.
    #[serde(skip)]
    pub extracted_from_indexed: bool,
}
758
759impl BucketMetadata {
760 /// Creates a fresh metadata instance.
761 ///
762 /// The new metadata is initialized with `1` merge and a given `received_at` timestamp.
763 pub fn new(received_at: UnixTimestamp) -> Self {
764 Self {
765 merges: 1,
766 received_at: Some(received_at),
767 extracted_from_indexed: false,
768 }
769 }
770
771 /// Whether the metadata does not contain more information than the default.
772 pub fn is_default(&self) -> bool {
773 &Self::default() == self
774 }
775
776 /// Merges another metadata object into the current one.
777 pub fn merge(&mut self, other: Self) {
778 self.merges = self.merges.saturating_add(other.merges);
779 self.received_at = match (self.received_at, other.received_at) {
780 (Some(received_at), None) => Some(received_at),
781 (None, Some(received_at)) => Some(received_at),
782 (left, right) => left.min(right),
783 };
784 }
785}
786
impl Default for BucketMetadata {
    fn default() -> Self {
        Self {
            // Creating a bucket counts as its first merge, see the `merges` field docs.
            merges: 1,
            received_at: None,
            extracted_from_indexed: false,
        }
    }
}
796
/// Iterator over parsed metrics returned from [`Bucket::parse_all`].
#[derive(Clone, Debug)]
pub struct ParseBuckets<'a> {
    /// The remaining unparsed input; shrinks as lines are consumed.
    slice: &'a [u8],
    /// Default timestamp assigned to metrics without an explicit `T` component.
    timestamp: UnixTimestamp,
}
803
impl Default for ParseBuckets<'_> {
    fn default() -> Self {
        Self {
            slice: &[],
            // The slice is empty, so iteration ends immediately and this placeholder
            // timestamp will never be assigned to a bucket.
            timestamp: UnixTimestamp::from_secs(4711),
        }
    }
}
813
814impl Iterator for ParseBuckets<'_> {
815 type Item = Result<Bucket, ParseMetricError>;
816
817 fn next(&mut self) -> Option<Self::Item> {
818 loop {
819 if self.slice.is_empty() {
820 return None;
821 }
822
823 let mut split = self.slice.splitn(2, |&b| b == b'\n');
824 let current = split.next()?;
825 self.slice = split.next().unwrap_or_default();
826
827 let string = match std::str::from_utf8(current) {
828 Ok(string) => string.strip_suffix('\r').unwrap_or(string),
829 Err(_) => return Some(Err(ParseMetricError)),
830 };
831
832 if !string.is_empty() {
833 return Some(Bucket::parse_str(string, self.timestamp).ok_or(ParseMetricError));
834 }
835 }
836 }
837}
838
839impl FusedIterator for ParseBuckets<'_> {}
840
841#[cfg(test)]
842mod tests {
843 use similar_asserts::assert_eq;
844
845 use crate::protocol::{DurationUnit, MetricUnit};
846
847 use super::*;
848
849 #[test]
850 fn test_distribution_value_size() {
851 // DistributionValue uses a SmallVec internally to prevent an additional allocation and
852 // indirection in cases where it needs to store only a small number of items. This is
853 // enabled by a comparably large `GaugeValue`, which stores five atoms. Ensure that the
854 // `DistributionValue`'s size does not exceed that of `GaugeValue`.
855 assert!(
856 std::mem::size_of::<DistributionValue>() <= std::mem::size_of::<GaugeValue>(),
857 "distribution value should not exceed gauge {}",
858 std::mem::size_of::<DistributionValue>()
859 );
860 }
861
862 #[test]
863 fn test_bucket_value_merge_counter() {
864 let mut value = BucketValue::Counter(42.into());
865 value.merge(BucketValue::Counter(43.into())).unwrap();
866 assert_eq!(value, BucketValue::Counter(85.into()));
867 }
868
869 #[test]
870 fn test_bucket_value_merge_distribution() {
871 let mut value = BucketValue::Distribution(dist![1, 2, 3]);
872 value.merge(BucketValue::Distribution(dist![2, 4])).unwrap();
873 assert_eq!(value, BucketValue::Distribution(dist![1, 2, 3, 2, 4]));
874 }
875
876 #[test]
877 fn test_bucket_value_merge_set() {
878 let mut value = BucketValue::Set(vec![1, 2].into_iter().collect());
879 value.merge(BucketValue::Set([2, 3].into())).unwrap();
880 assert_eq!(value, BucketValue::Set(vec![1, 2, 3].into_iter().collect()));
881 }
882
883 #[test]
884 fn test_bucket_value_merge_gauge() {
885 let mut value = BucketValue::Gauge(GaugeValue::single(42.into()));
886 value.merge(BucketValue::gauge(43.into())).unwrap();
887
888 assert_eq!(
889 value,
890 BucketValue::Gauge(GaugeValue {
891 last: 43.into(),
892 min: 42.into(),
893 max: 43.into(),
894 sum: 85.into(),
895 count: 2,
896 })
897 );
898 }
899
900 #[test]
901 fn test_parse_garbage() {
902 let s = "x23-408j17z4232@#34d\nc3456y7^😎";
903 let timestamp = UnixTimestamp::from_secs(4711);
904 let result = Bucket::parse(s.as_bytes(), timestamp);
905 assert!(result.is_err());
906 }
907
    #[test]
    fn test_parse_counter() {
        // A plain statsd counter: the raw name is normalized into the full MRI
        // `c:spans/foo@none`, the value becomes `Counter(42.0)`, and fresh
        // metadata records a single merge.
        let s = "spans/foo:42|c";
        let timestamp = UnixTimestamp::from_secs(4711);
        let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap();
        insta::assert_debug_snapshot!(metric, @r#"
        Bucket {
            timestamp: UnixTimestamp(4711),
            width: 0,
            name: MetricName(
                "c:spans/foo@none",
            ),
            value: Counter(
                42.0,
            ),
            tags: {},
            metadata: BucketMetadata {
                merges: 1,
                received_at: None,
                extracted_from_indexed: false,
            },
        }
        "#);
    }
932
933 #[test]
934 fn test_parse_counter_packed() {
935 let s = "spans/foo:42:17:21|c";
936 let timestamp = UnixTimestamp::from_secs(4711);
937 let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap();
938 assert_eq!(metric.value, BucketValue::Counter(80.into()));
939 }
940
    #[test]
    fn test_parse_distribution() {
        // A single `d`-typed value yields a one-element distribution with the
        // MRI `d:spans/foo@none`.
        let s = "spans/foo:17.5|d";
        let timestamp = UnixTimestamp::from_secs(4711);
        let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap();
        insta::assert_debug_snapshot!(metric, @r#"
        Bucket {
            timestamp: UnixTimestamp(4711),
            width: 0,
            name: MetricName(
                "d:spans/foo@none",
            ),
            value: Distribution(
                [
                    17.5,
                ],
            ),
            tags: {},
            metadata: BucketMetadata {
                merges: 1,
                received_at: None,
                extracted_from_indexed: false,
            },
        }
        "#);
    }
967
968 #[test]
969 fn test_parse_distribution_packed() {
970 let s = "transactions/foo:17.5:21.9:42.7|d";
971 let timestamp = UnixTimestamp::from_secs(4711);
972 let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap();
973 assert_eq!(
974 metric.value,
975 BucketValue::Distribution(dist![
976 FiniteF64::new(17.5).unwrap(),
977 FiniteF64::new(21.9).unwrap(),
978 FiniteF64::new(42.7).unwrap()
979 ])
980 );
981 }
982
983 #[test]
984 fn test_parse_histogram() {
985 let s = "transactions/foo:17.5|h"; // common alias for distribution
986 let timestamp = UnixTimestamp::from_secs(4711);
987 let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap();
988 assert_eq!(
989 metric.value,
990 BucketValue::Distribution(dist![FiniteF64::new(17.5).unwrap()])
991 );
992 }
993
    #[test]
    fn test_parse_set() {
        // A numeric set member is stored verbatim under the MRI `s:spans/foo@none`.
        let s = "spans/foo:4267882815|s";
        let timestamp = UnixTimestamp::from_secs(4711);
        let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap();
        insta::assert_debug_snapshot!(metric, @r#"
        Bucket {
            timestamp: UnixTimestamp(4711),
            width: 0,
            name: MetricName(
                "s:spans/foo@none",
            ),
            value: Set(
                {
                    4267882815,
                },
            ),
            tags: {},
            metadata: BucketMetadata {
                merges: 1,
                received_at: None,
                extracted_from_indexed: false,
            },
        }
        "#);
    }
1020
    #[test]
    fn test_parse_set_hashed() {
        // Non-numeric set members (here a UUID string) are hashed into a
        // numeric set value rather than rejected.
        let s = "transactions/foo:e2546e4c-ecd0-43ad-ae27-87960e57a658|s";
        let timestamp = UnixTimestamp::from_secs(4711);
        let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap();
        assert_eq!(metric.value, BucketValue::Set([4267882815].into()));
    }
1028
    #[test]
    fn test_parse_set_hashed_packed() {
        // Packed non-numeric set members are each hashed and collected into
        // one set.
        let s = "transactions/foo:e2546e4c-ecd0-43ad-ae27-87960e57a658:00449b66-d91f-4fb8-b324-4c8bdf2499f6|s";
        let timestamp = UnixTimestamp::from_secs(4711);
        let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap();
        assert_eq!(
            metric.value,
            BucketValue::Set([181348692, 4267882815].into())
        );
    }
1039
1040 #[test]
1041 fn test_parse_set_packed() {
1042 let s = "transactions/foo:3182887624:4267882815|s";
1043 let timestamp = UnixTimestamp::from_secs(4711);
1044 let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap();
1045 assert_eq!(
1046 metric.value,
1047 BucketValue::Set([3182887624, 4267882815].into())
1048 )
1049 }
1050
    #[test]
    fn test_parse_gauge() {
        // A single gauge value initializes last/min/max/sum to that value with
        // a count of 1, under the MRI `g:spans/foo@none`.
        let s = "spans/foo:42|g";
        let timestamp = UnixTimestamp::from_secs(4711);
        let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap();
        insta::assert_debug_snapshot!(metric, @r#"
        Bucket {
            timestamp: UnixTimestamp(4711),
            width: 0,
            name: MetricName(
                "g:spans/foo@none",
            ),
            value: Gauge(
                GaugeValue {
                    last: 42.0,
                    min: 42.0,
                    max: 42.0,
                    sum: 42.0,
                    count: 1,
                },
            ),
            tags: {},
            metadata: BucketMetadata {
                merges: 1,
                received_at: None,
                extracted_from_indexed: false,
            },
        }
        "#);
    }
1081
    #[test]
    fn test_parse_gauge_packed() {
        // A five-component packed gauge is interpreted positionally as
        // `last:min:max:sum:count` rather than as five separate updates.
        let s = "spans/foo:25:17:42:220:85|g";
        let timestamp = UnixTimestamp::from_secs(4711);
        let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap();
        insta::assert_debug_snapshot!(metric, @r#"
        Bucket {
            timestamp: UnixTimestamp(4711),
            width: 0,
            name: MetricName(
                "g:spans/foo@none",
            ),
            value: Gauge(
                GaugeValue {
                    last: 25.0,
                    min: 17.0,
                    max: 42.0,
                    sum: 220.0,
                    count: 85,
                },
            ),
            tags: {},
            metadata: BucketMetadata {
                merges: 1,
                received_at: None,
                extracted_from_indexed: false,
            },
        }
        "#);
    }
1112
    #[test]
    fn test_parse_implicit_namespace() {
        // A name without an explicit namespace is placed in the `custom`
        // namespace: `foo` becomes `c:custom/foo@none`.
        let s = "foo:42|c";
        let timestamp = UnixTimestamp::from_secs(4711);
        let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap();
        insta::assert_debug_snapshot!(metric, @r###"
        Bucket {
            timestamp: UnixTimestamp(4711),
            width: 0,
            name: MetricName(
                "c:custom/foo@none",
            ),
            value: Counter(
                42.0,
            ),
            tags: {},
            metadata: BucketMetadata {
                merges: 1,
                received_at: None,
                extracted_from_indexed: false,
            },
        }
        "###);
    }
1137
1138 #[test]
1139 fn test_parse_unit() {
1140 let s = "transactions/foo@second:17.5|d";
1141 let timestamp = UnixTimestamp::from_secs(4711);
1142 let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap();
1143 let mri = MetricResourceIdentifier::parse(&metric.name).unwrap();
1144 assert_eq!(mri.unit, MetricUnit::Duration(DurationUnit::Second));
1145 }
1146
1147 #[test]
1148 fn test_parse_unit_regression() {
1149 let s = "transactions/foo@s:17.5|d";
1150 let timestamp = UnixTimestamp::from_secs(4711);
1151 let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap();
1152 let mri = MetricResourceIdentifier::parse(&metric.name).unwrap();
1153 assert_eq!(mri.unit, MetricUnit::Duration(DurationUnit::Second));
1154 }
1155
    #[test]
    fn test_parse_tags() {
        // Tags follow `#`, separated by commas. A tag without a `:value`
        // (`foo`) is kept with an empty string value.
        let s = "transactions/foo:17.5|d|#foo,bar:baz";
        let timestamp = UnixTimestamp::from_secs(4711);
        let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap();
        insta::assert_debug_snapshot!(metric.tags, @r###"
        {
            "bar": "baz",
            "foo": "",
        }
        "###);
    }
1168
    #[test]
    fn test_parse_tags_escaped() {
        // Unicode escape sequences in tag values (here `\u{2c}`, a comma) are
        // decoded, so an escaped comma does not split the tag list.
        let s = "transactions/foo:17.5|d|#foo:😅\\u{2c}🚀";
        let timestamp = UnixTimestamp::from_secs(4711);
        let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap();
        insta::assert_debug_snapshot!(metric.tags, @r###"
        {
            "foo": "😅,🚀",
        }
        "###);
    }
1180
1181 #[test]
1182 fn test_parse_timestamp() {
1183 let s = "transactions/foo:17.5|d|T1615889449";
1184 let timestamp = UnixTimestamp::from_secs(4711);
1185 let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap();
1186 assert_eq!(metric.timestamp, UnixTimestamp::from_secs(1615889449));
1187 }
1188
1189 #[test]
1190 fn test_parse_sample_rate() {
1191 // Sample rate should be ignored
1192 let s = "transactions/foo:17.5|d|@0.1";
1193 let timestamp = UnixTimestamp::from_secs(4711);
1194 Bucket::parse(s.as_bytes(), timestamp).unwrap();
1195 }
1196
1197 #[test]
1198 fn test_parse_invalid_name() {
1199 let s = "foo#bar:42|c";
1200 let timestamp = UnixTimestamp::from_secs(4711);
1201 let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap();
1202 assert_eq!(metric.name.as_ref(), "c:custom/foo_bar@none");
1203 }
1204
1205 #[test]
1206 fn test_parse_empty_name() {
1207 let s = ":42|c";
1208 let timestamp = UnixTimestamp::from_secs(4711);
1209 let metric = Bucket::parse(s.as_bytes(), timestamp);
1210 assert!(metric.is_err());
1211 }
1212
1213 #[test]
1214 fn test_parse_invalid_name_with_leading_digit() {
1215 let s = "64bit:42|c";
1216 let timestamp = UnixTimestamp::from_secs(4711);
1217 let metric = Bucket::parse(s.as_bytes(), timestamp);
1218 assert!(metric.is_err());
1219 }
1220
1221 #[test]
1222 fn test_parse_all() {
1223 let s = "transactions/foo:42|c\nbar:17|c";
1224 let timestamp = UnixTimestamp::from_secs(4711);
1225
1226 let metrics: Vec<Bucket> = Bucket::parse_all(s.as_bytes(), timestamp)
1227 .collect::<Result<_, _>>()
1228 .unwrap();
1229
1230 assert_eq!(metrics.len(), 2);
1231 }
1232
1233 #[test]
1234 fn test_parse_all_crlf() {
1235 let s = "transactions/foo:42|c\r\nbar:17|c";
1236 let timestamp = UnixTimestamp::from_secs(4711);
1237
1238 let metrics: Vec<Bucket> = Bucket::parse_all(s.as_bytes(), timestamp)
1239 .collect::<Result<_, _>>()
1240 .unwrap();
1241
1242 assert_eq!(metrics.len(), 2);
1243 }
1244
1245 #[test]
1246 fn test_parse_all_empty_lines() {
1247 let s = "transactions/foo:42|c\n\n\nbar:17|c";
1248 let timestamp = UnixTimestamp::from_secs(4711);
1249
1250 let metric_count = Bucket::parse_all(s.as_bytes(), timestamp).count();
1251 assert_eq!(metric_count, 2);
1252 }
1253
1254 #[test]
1255 fn test_parse_all_trailing() {
1256 let s = "transactions/foo:42|c\nbar:17|c\n";
1257 let timestamp = UnixTimestamp::from_secs(4711);
1258
1259 let metric_count = Bucket::parse_all(s.as_bytes(), timestamp).count();
1260 assert_eq!(metric_count, 2);
1261 }
1262
    #[test]
    fn test_metrics_docs() {
        // The statsd and JSON fixture files describe the same buckets; parsing
        // both representations must yield equal results. Keeps the documented
        // examples in `tests/fixtures` consistent with the parser.
        let text = include_str!("../tests/fixtures/buckets.statsd.txt").trim_end();
        let json = include_str!("../tests/fixtures/buckets.json").trim_end();

        let timestamp = UnixTimestamp::from_secs(0);
        let statsd_metrics = Bucket::parse_all(text.as_bytes(), timestamp)
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        let json_metrics: Vec<Bucket> = serde_json::from_str(json).unwrap();

        assert_eq!(statsd_metrics, json_metrics);
    }
1277
    #[test]
    fn test_set_docs() {
        // Same fixture-consistency check as `test_metrics_docs`, for a single
        // set metric in statsd vs. JSON form.
        let text = include_str!("../tests/fixtures/set.statsd.txt").trim_end();
        let json = include_str!("../tests/fixtures/set.json").trim_end();

        let timestamp = UnixTimestamp::from_secs(1615889449);
        let statsd_metric = Bucket::parse(text.as_bytes(), timestamp).unwrap();
        let json_metric: Bucket = serde_json::from_str(json).unwrap();

        assert_eq!(statsd_metric, json_metric);
    }
1289
    #[test]
    fn test_parse_buckets() {
        // Deserializes a full JSON bucket including tags and metadata; the
        // numeric `received_at` becomes `Some(UnixTimestamp)`.
        let json = r#"[
          {
            "name": "endpoint.response_time",
            "unit": "millisecond",
            "value": [36, 49, 57, 68],
            "type": "d",
            "timestamp": 1615889440,
            "width": 10,
            "tags": {
              "route": "user_index"
            },
            "metadata": {
              "merges": 1,
              "received_at": 1615889440
            }
          }
        ]"#;

        let buckets = serde_json::from_str::<Vec<Bucket>>(json).unwrap();

        insta::assert_debug_snapshot!(buckets, @r###"
        [
            Bucket {
                timestamp: UnixTimestamp(1615889440),
                width: 10,
                name: MetricName(
                    "endpoint.response_time",
                ),
                value: Distribution(
                    [
                        36.0,
                        49.0,
                        57.0,
                        68.0,
                    ],
                ),
                tags: {
                    "route": "user_index",
                },
                metadata: BucketMetadata {
                    merges: 1,
                    received_at: Some(
                        UnixTimestamp(1615889440),
                    ),
                    extracted_from_indexed: false,
                },
            },
        ]
        "###);
    }
1342
    #[test]
    fn test_parse_bucket_defaults() {
        // A bucket without a `tags` field deserializes with empty tags.
        let json = r#"[
          {
            "name": "endpoint.hits",
            "value": 4,
            "type": "c",
            "timestamp": 1615889440,
            "width": 10,
            "metadata": {
              "merges": 1,
              "received_at": 1615889440
            }
          }
        ]"#;

        let buckets = serde_json::from_str::<Vec<Bucket>>(json).unwrap();

        insta::assert_debug_snapshot!(buckets, @r###"
        [
            Bucket {
                timestamp: UnixTimestamp(1615889440),
                width: 10,
                name: MetricName(
                    "endpoint.hits",
                ),
                value: Counter(
                    4.0,
                ),
                tags: {},
                metadata: BucketMetadata {
                    merges: 1,
                    received_at: Some(
                        UnixTimestamp(1615889440),
                    ),
                    extracted_from_indexed: false,
                },
            },
        ]
        "###);
    }
1384
    #[test]
    fn test_buckets_roundtrip() {
        // Deserialize -> serialize must reproduce the input byte-for-byte, so
        // this literal is written in exact `serde_json::to_string_pretty`
        // format (2-space indent, field order matching serialization order).
        // Default metadata is omitted on serialization, hence no `metadata`
        // fields here.
        let json = r#"[
  {
    "timestamp": 1615889440,
    "width": 10,
    "name": "endpoint.response_time",
    "type": "d",
    "value": [
      36.0,
      49.0,
      57.0,
      68.0
    ],
    "tags": {
      "route": "user_index"
    }
  },
  {
    "timestamp": 1615889440,
    "width": 10,
    "name": "endpoint.hits",
    "type": "c",
    "value": 4.0,
    "tags": {
      "route": "user_index"
    }
  },
  {
    "timestamp": 1615889440,
    "width": 10,
    "name": "endpoint.parallel_requests",
    "type": "g",
    "value": {
      "last": 25.0,
      "min": 17.0,
      "max": 42.0,
      "sum": 2210.0,
      "count": 85
    }
  },
  {
    "timestamp": 1615889440,
    "width": 10,
    "name": "endpoint.users",
    "type": "s",
    "value": [
      3182887624,
      4267882815
    ],
    "tags": {
      "route": "user_index"
    }
  }
]"#;

        let buckets = serde_json::from_str::<Vec<Bucket>>(json).unwrap();
        let serialized = serde_json::to_string_pretty(&buckets).unwrap();
        assert_eq!(json, serialized);
    }
1445
    #[test]
    fn test_bucket_docs_roundtrip() {
        // The JSON fixture must survive a deserialize/serialize roundtrip
        // unchanged. Line endings are normalized first so the test also passes
        // on Windows checkouts with CRLF conversion.
        let json = include_str!("../tests/fixtures/buckets.json")
            .trim_end()
            .replace("\r\n", "\n");
        let buckets = serde_json::from_str::<Vec<Bucket>>(&json).unwrap();

        let serialized = serde_json::to_string_pretty(&buckets).unwrap();
        assert_eq!(json, serialized);
    }
1456
    #[test]
    fn test_bucket_metadata_merge() {
        // Each merge increments the `merges` counter. `received_at` adopts the
        // first concrete timestamp and then keeps the smaller (earlier) one on
        // subsequent merges.
        let mut metadata = BucketMetadata::default();

        // None merged with None stays None; merges: 1 + 1 = 2.
        let other_metadata = BucketMetadata::default();
        metadata.merge(other_metadata);
        assert_eq!(
            metadata,
            BucketMetadata {
                merges: 2,
                received_at: None,
                extracted_from_indexed: false,
            }
        );

        // None merged with Some(10) adopts Some(10).
        let other_metadata = BucketMetadata::new(UnixTimestamp::from_secs(10));
        metadata.merge(other_metadata);
        assert_eq!(
            metadata,
            BucketMetadata {
                merges: 3,
                received_at: Some(UnixTimestamp::from_secs(10)),
                extracted_from_indexed: false,
            }
        );

        // Some(10) merged with Some(20) keeps the earlier timestamp.
        let other_metadata = BucketMetadata::new(UnixTimestamp::from_secs(20));
        metadata.merge(other_metadata);
        assert_eq!(
            metadata,
            BucketMetadata {
                merges: 4,
                received_at: Some(UnixTimestamp::from_secs(10)),
                extracted_from_indexed: false,
            }
        );
    }
1494}