1use std::collections::{BTreeMap, BTreeSet};
2use std::hash::Hash;
3use std::iter::FusedIterator;
4use std::{fmt, mem};
5
6use hash32::{FnvHasher, Hasher as _};
7use relay_cardinality::CardinalityItem;
8use relay_common::time::UnixTimestamp;
9use relay_protocol::FiniteF64;
10use serde::{Deserialize, Serialize};
11use smallvec::SmallVec;
12
13use crate::protocol::{
14 self, CounterType, DistributionType, GaugeType, MetricName, MetricResourceIdentifier,
15 MetricType, SetType, hash_set_value,
16};
17use crate::{MetricNamespace, ParseMetricError};
18
/// Separator between multiple packed values on a single statsd line, e.g. `foo:1:2:3|c`.
const VALUE_SEPARATOR: char = ':';

/// Tags attached to a [`Bucket`], mapping tag keys to tag values.
///
/// Being a `BTreeMap`, the tags are kept sorted by key.
pub type MetricTags = BTreeMap<String, String>;
23
/// A snapshot of values within a [`Bucket`] of type gauge.
///
/// Instead of storing every reported value, a gauge keeps summary statistics: the most recent
/// value plus the minimum, maximum, sum and count of all values.
#[derive(Clone, Copy, Debug, PartialEq, Deserialize, Serialize)]
pub struct GaugeValue {
    /// The most recently reported value.
    ///
    /// Unlike the other fields this depends on insertion/merge order: the value inserted or
    /// merged last wins.
    pub last: GaugeType,
    /// The minimum of all reported values.
    pub min: GaugeType,
    /// The maximum of all reported values.
    pub max: GaugeType,
    /// The (saturating) sum of all reported values.
    pub sum: GaugeType,
    /// The number of values reported into this gauge.
    pub count: u64,
}
40
41impl GaugeValue {
42 pub fn single(value: GaugeType) -> Self {
44 Self {
45 last: value,
46 min: value,
47 max: value,
48 sum: value,
49 count: 1,
50 }
51 }
52
53 pub fn insert(&mut self, value: GaugeType) {
55 self.last = value;
56 self.min = self.min.min(value);
57 self.max = self.max.max(value);
58 self.sum = self.sum.saturating_add(value);
59 self.count += 1;
60 }
61
62 pub fn merge(&mut self, other: Self) {
64 self.last = other.last;
65 self.min = self.min.min(other.min);
66 self.max = self.max.max(other.max);
67 self.sum = self.sum.saturating_add(other.sum);
68 self.count += other.count;
69 }
70
71 pub fn avg(&self) -> Option<GaugeType> {
73 self.sum / FiniteF64::new(self.count as f64)?
74 }
75}
76
/// A distribution of values within a [`Bucket`].
///
/// Distributions keep every reported value; merging two distributions concatenates them.
/// Up to three values are stored inline in the `SmallVec` before it spills to the heap.
pub type DistributionValue = SmallVec<[DistributionType; 3]>;

// Re-exported so that the `dist!` macro can reach `smallvec!` through `$crate`, without
// callers having to import `smallvec` themselves.
#[doc(hidden)]
pub use smallvec::smallvec as _smallvec;

/// Creates a [`DistributionValue`] from a comma-separated list of values.
///
/// Every argument is converted with `DistributionType::from`, so values with such a
/// conversion (for example integer literals) can be passed directly: `dist![1, 2, 3]`.
#[macro_export]
macro_rules! dist {
    ($($x:expr),*$(,)*) => {
        $crate::_smallvec!($($crate::DistributionType::from($x)),*) as $crate::DistributionValue
    };
}

/// A set of unique values within a [`Bucket`].
///
/// Values are stored as `SetType` hashes: when parsing statsd, values that parse as
/// `SetType` are kept as-is and all other strings are hashed with `hash_set_value`.
pub type SetValue = BTreeSet<SetType>;
137
/// The aggregated value of a metric [`Bucket`], tagged with its metric type.
///
/// In JSON the variant is written to the `type` field (`"c"`, `"d"`, `"s"` or `"g"`) and the
/// payload to the `value` field.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(tag = "type", content = "value")]
pub enum BucketValue {
    /// Counts instances of an event ([`MetricType::Counter`]).
    ///
    /// Merging adds counters with saturation. Statsd: `name:42|c`; packed values such as
    /// `name:42:17:21|c` are summed.
    #[serde(rename = "c")]
    Counter(CounterType),

    /// Keeps every reported value ([`MetricType::Distribution`]).
    ///
    /// Merging concatenates the values. Statsd: `name:17.5|d` (the `h` type is accepted as
    /// well); packed values such as `name:17.5:21.9|d` add one entry each.
    #[serde(rename = "d")]
    Distribution(DistributionValue),

    /// Counts unique values ([`MetricType::Set`]).
    ///
    /// Merging forms the union. Statsd: `name:value|s`; numeric values are stored as-is and
    /// any other string is hashed.
    #[serde(rename = "s")]
    Set(SetValue),

    /// Keeps summary statistics of reported values ([`MetricType::Gauge`]).
    ///
    /// Statsd: either a single value `name:42|g` or the packed form
    /// `name:last:min:max:sum:count|g`.
    #[serde(rename = "g")]
    Gauge(GaugeValue),
}
263
264impl BucketValue {
265 pub fn counter(value: CounterType) -> Self {
267 Self::Counter(value)
268 }
269
270 pub fn distribution(value: DistributionType) -> Self {
272 Self::Distribution(dist![value])
273 }
274
275 pub fn set(value: SetType) -> Self {
277 Self::Set(std::iter::once(value).collect())
278 }
279
280 pub fn set_from_str(string: &str) -> Self {
282 Self::set(hash_set_value(string))
283 }
284
285 pub fn set_from_display(display: impl fmt::Display) -> Self {
287 Self::set(hash_set_value(&display.to_string()))
288 }
289
290 pub fn gauge(value: GaugeType) -> Self {
292 Self::Gauge(GaugeValue::single(value))
293 }
294
295 pub fn ty(&self) -> MetricType {
297 match self {
298 Self::Counter(_) => MetricType::Counter,
299 Self::Distribution(_) => MetricType::Distribution,
300 Self::Set(_) => MetricType::Set,
301 Self::Gauge(_) => MetricType::Gauge,
302 }
303 }
304
305 pub fn len(&self) -> usize {
307 match self {
308 BucketValue::Counter(_) => 1,
309 BucketValue::Distribution(distribution) => distribution.len(),
310 BucketValue::Set(set) => set.len(),
311 BucketValue::Gauge(_) => 5,
312 }
313 }
314
315 pub fn is_empty(&self) -> bool {
317 self.len() == 0
318 }
319
320 pub fn cost(&self) -> usize {
325 let allocated_cost = match self {
328 Self::Counter(_) => 0,
329 Self::Set(s) => mem::size_of::<SetType>() * s.len(),
330 Self::Gauge(_) => 0,
331 Self::Distribution(d) => d.len() * mem::size_of::<DistributionType>(),
332 };
333
334 mem::size_of::<Self>() + allocated_cost
335 }
336
337 pub fn merge(&mut self, other: Self) -> Result<(), Self> {
342 match (self, other) {
343 (Self::Counter(slf), Self::Counter(other)) => *slf = slf.saturating_add(other),
344 (Self::Distribution(slf), Self::Distribution(other)) => slf.extend_from_slice(&other),
345 (Self::Set(slf), Self::Set(other)) => slf.extend(other),
346 (Self::Gauge(slf), Self::Gauge(other)) => slf.merge(other),
347 (_, other) => return Err(other),
348 }
349
350 Ok(())
351 }
352}
353
354fn parse_counter(string: &str) -> Option<CounterType> {
356 let mut sum = CounterType::default();
357 for component in string.split(VALUE_SEPARATOR) {
358 sum = sum.saturating_add(component.parse().ok()?);
359 }
360 Some(sum)
361}
362
363fn parse_distribution(string: &str) -> Option<DistributionValue> {
365 let mut dist = DistributionValue::default();
366 for component in string.split(VALUE_SEPARATOR) {
367 dist.push(component.parse().ok()?);
368 }
369 Some(dist)
370}
371
372fn parse_set(string: &str) -> Option<SetValue> {
374 let mut set = SetValue::default();
375 for component in string.split(VALUE_SEPARATOR) {
376 let hash = component
377 .parse()
378 .unwrap_or_else(|_| protocol::hash_set_value(component));
379 set.insert(hash);
380 }
381 Some(set)
382}
383
384fn parse_gauge(string: &str) -> Option<GaugeValue> {
389 let mut components = string.split(VALUE_SEPARATOR);
390
391 let last = components.next()?.parse().ok()?;
392 Some(if let Some(min) = components.next() {
393 GaugeValue {
394 last,
395 min: min.parse().ok()?,
396 max: components.next()?.parse().ok()?,
397 sum: components.next()?.parse().ok()?,
398 count: components.next()?.parse().ok()?,
399 }
400 } else {
401 GaugeValue::single(last)
402 })
403}
404
405fn parse_tags(string: &str) -> Option<MetricTags> {
409 let mut map = MetricTags::new();
410
411 for pair in string.split(',') {
412 let mut name_value = pair.splitn(2, ':');
413
414 let name = name_value.next()?;
415 if !protocol::is_valid_tag_key(name) {
416 continue;
417 }
418
419 if let Ok(value) = protocol::unescape_tag_value(name_value.next().unwrap_or_default()) {
420 map.insert(name.to_owned(), value);
421 }
422 }
423
424 Some(map)
425}
426
427fn parse_timestamp(string: &str) -> Option<UnixTimestamp> {
429 string.parse().ok().map(UnixTimestamp::from_secs)
430}
431
/// An aggregation of metric values.
///
/// A bucket holds the aggregated [`value`](Self::value) of one metric, identified by its
/// [`name`](Self::name) and [`tags`](Self::tags), at a [`timestamp`](Self::timestamp).
/// Buckets can be parsed from statsd text with [`Bucket::parse`] and [`Bucket::parse_all`], and
/// are (de)serializable with serde.
///
/// # Statsd Format
///
/// One bucket per line: `<name>[@<unit>]:<value>[:<value>...]|<type>[|#<tags>][|T<timestamp>]`.
/// Packed values are combined according to the metric type. Other `|`-separated components,
/// such as sample rates (`@0.1`), are ignored.
///
/// ```text
#[doc = include_str!("../tests/fixtures/buckets.statsd.txt")]
/// ```
///
/// # JSON Format
///
/// The statsd lines above parse into the same buckets as this JSON:
///
/// ```json
#[doc = include_str!("../tests/fixtures/buckets.json")]
/// ```
///
/// # Set Example
///
/// A set metric in statsd format:
///
/// ```text
#[doc = include_str!("../tests/fixtures/set.statsd.txt")]
/// ```
///
/// parses into the same bucket as this JSON:
///
/// ```json
#[doc = include_str!("../tests/fixtures/set.json")]
/// ```
#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
pub struct Bucket {
    /// The timestamp of the bucket.
    ///
    /// When parsing statsd this is the timestamp passed to the parser, unless the line carries
    /// its own `T<seconds>` component.
    pub timestamp: UnixTimestamp,

    /// The width of the bucket's time window.
    ///
    /// Buckets parsed from statsd start with a width of `0`.
    /// NOTE(review): presumably measured in seconds; confirm.
    pub width: u64,

    /// The name of the metric.
    ///
    /// Statsd parsing stores the full MRI, e.g. `c:transactions/foo@none`; buckets deserialized
    /// from JSON keep the name exactly as given.
    pub name: MetricName,

    /// The type and aggregated value, flattened into the `type` and `value` JSON fields.
    #[serde(flatten)]
    pub value: BucketValue,

    /// Tags of the metric; omitted from JSON when empty.
    #[serde(default, skip_serializing_if = "MetricTags::is_empty")]
    pub tags: MetricTags,

    /// Relay internal metadata; omitted from JSON when equal to the default.
    #[serde(default, skip_serializing_if = "BucketMetadata::is_default")]
    pub metadata: BucketMetadata,
}
623
624impl Bucket {
625 fn parse_str(string: &str, timestamp: UnixTimestamp) -> Option<Self> {
631 let mut components = string.split('|');
632
633 let (mri_str, values_str) = components.next()?.split_once(':')?;
634 let ty = components.next().and_then(|s| s.parse().ok())?;
635
636 let mri = MetricResourceIdentifier::parse_with_type(mri_str, ty).ok()?;
637 let value = match ty {
638 MetricType::Counter => BucketValue::Counter(parse_counter(values_str)?),
639 MetricType::Distribution => BucketValue::Distribution(parse_distribution(values_str)?),
640 MetricType::Set => BucketValue::Set(parse_set(values_str)?),
641 MetricType::Gauge => BucketValue::Gauge(parse_gauge(values_str)?),
642 };
643
644 let mut bucket = Bucket {
645 timestamp,
646 width: 0,
647 name: mri.to_string().into(),
648 value,
649 tags: Default::default(),
650 metadata: Default::default(),
651 };
652
653 for component in components {
654 match component.chars().next() {
655 Some('#') => {
656 bucket.tags = parse_tags(component.get(1..)?)?;
657 }
658 Some('T') => {
659 bucket.timestamp = parse_timestamp(component.get(1..)?)?;
660 }
661 _ => (),
662 }
663 }
664
665 Some(bucket)
666 }
667
668 pub fn parse(slice: &[u8], timestamp: UnixTimestamp) -> Result<Self, ParseMetricError> {
681 let string = std::str::from_utf8(slice).map_err(|_| ParseMetricError)?;
682 Self::parse_str(string, timestamp).ok_or(ParseMetricError)
683 }
684
685 pub fn parse_all(slice: &[u8], timestamp: UnixTimestamp) -> ParseBuckets<'_> {
710 ParseBuckets { slice, timestamp }
711 }
712
713 pub fn tag(&self, name: &str) -> Option<&str> {
715 self.tags.get(name).map(|s| s.as_str())
716 }
717
718 pub fn remove_tag(&mut self, name: &str) -> Option<String> {
722 self.tags.remove(name)
723 }
724}
725
impl CardinalityItem for Bucket {
    /// Delegates to `MetricName::try_namespace` on the bucket's name.
    fn namespace(&self) -> Option<MetricNamespace> {
        self.name.try_namespace()
    }

    /// Returns the bucket's metric name.
    fn name(&self) -> &MetricName {
        &self.name
    }

    /// Computes a 32-bit FNV hash identifying the bucket's series.
    ///
    /// Only `name` and `tags` contribute; timestamp, width, value and metadata do not, so all
    /// buckets with the same name and tags hash equally. The result depends on the exact order
    /// in which fields are fed to the hasher (name first, then tags), so keep it stable.
    /// NOTE(review): presumably these hashes are compared across processes or releases;
    /// confirm before changing anything here.
    fn to_hash(&self) -> u32 {
        let mut hasher = FnvHasher::default();
        self.name.hash(&mut hasher);
        self.tags.hash(&mut hasher);
        hasher.finish32()
    }
}
742
/// Relay internal metadata for a metric bucket.
///
/// Serialized as part of [`Bucket`] (and omitted there when equal to the default), except for
/// `extracted_from_indexed`, which serde skips.
#[derive(Clone, Copy, Debug, PartialEq, Deserialize, Serialize)]
pub struct BucketMetadata {
    /// How many buckets have been merged into this one; a single bucket starts at `1`.
    ///
    /// Merging adds both counts, saturating at `u32::MAX`.
    pub merges: u32,

    /// The receive time of the bucket, if known.
    ///
    /// Merging keeps the earliest of the known values.
    pub received_at: Option<UnixTimestamp>,

    /// Flag that is never serialized, so it is lost when a bucket goes through serde.
    ///
    /// NOTE(review): the name suggests it marks buckets extracted from an indexed payload; the
    /// merge logic does not touch it. Confirm both against the producers of this flag.
    #[serde(skip)]
    pub extracted_from_indexed: bool,
}
778
779impl BucketMetadata {
780 pub fn new(received_at: UnixTimestamp) -> Self {
784 Self {
785 merges: 1,
786 received_at: Some(received_at),
787 extracted_from_indexed: false,
788 }
789 }
790
791 pub fn is_default(&self) -> bool {
793 &Self::default() == self
794 }
795
796 pub fn merge(&mut self, other: Self) {
798 self.merges = self.merges.saturating_add(other.merges);
799 self.received_at = match (self.received_at, other.received_at) {
800 (Some(received_at), None) => Some(received_at),
801 (None, Some(received_at)) => Some(received_at),
802 (left, right) => left.min(right),
803 };
804 }
805}
806
/// Metadata of a single, unmerged bucket: `merges` is `1`, no receive time is known, and
/// `extracted_from_indexed` is unset.
impl Default for BucketMetadata {
    fn default() -> Self {
        Self {
            merges: 1,
            received_at: None,
            extracted_from_indexed: false,
        }
    }
}
816
/// Iterator over the buckets in newline-separated statsd text, created by
/// [`Bucket::parse_all`].
///
/// Yields one `Result` per non-empty line. A line that fails to parse yields
/// `Err(ParseMetricError)` without ending the iteration.
#[derive(Clone, Debug)]
pub struct ParseBuckets<'a> {
    /// The input that has not been consumed yet.
    slice: &'a [u8],
    /// Timestamp for lines that do not carry their own `T` component.
    timestamp: UnixTimestamp,
}
823
/// An iterator over empty input, which yields no items.
impl Default for ParseBuckets<'_> {
    fn default() -> Self {
        Self {
            slice: &[],
            // Never observed: with an empty slice, `next` returns `None` before parsing anything.
            timestamp: UnixTimestamp::from_secs(4711),
        }
    }
}
833
834impl Iterator for ParseBuckets<'_> {
835 type Item = Result<Bucket, ParseMetricError>;
836
837 fn next(&mut self) -> Option<Self::Item> {
838 loop {
839 if self.slice.is_empty() {
840 return None;
841 }
842
843 let mut split = self.slice.splitn(2, |&b| b == b'\n');
844 let current = split.next()?;
845 self.slice = split.next().unwrap_or_default();
846
847 let string = match std::str::from_utf8(current) {
848 Ok(string) => string.strip_suffix('\r').unwrap_or(string),
849 Err(_) => return Some(Err(ParseMetricError)),
850 };
851
852 if !string.is_empty() {
853 return Some(Bucket::parse_str(string, self.timestamp).ok_or(ParseMetricError));
854 }
855 }
856 }
857}
858
// `next` returns `None` only once `slice` is empty, and an empty slice is never refilled, so
// every later call returns `None` as well.
impl FusedIterator for ParseBuckets<'_> {}
860
#[cfg(test)]
mod tests {
    use similar_asserts::assert_eq;

    use crate::protocol::{DurationUnit, MetricUnit};

    use super::*;

    #[test]
    fn test_distribution_value_size() {
        // The inline storage of `DistributionValue` must not make `BucketValue` larger than the
        // `Gauge` variant already does, otherwise every bucket value pays for it.
        assert!(
            std::mem::size_of::<DistributionValue>() <= std::mem::size_of::<GaugeValue>(),
            "distribution value should not exceed gauge {}",
            std::mem::size_of::<DistributionValue>()
        );
    }

    #[test]
    fn test_bucket_value_merge_counter() {
        let mut value = BucketValue::Counter(42.into());
        value.merge(BucketValue::Counter(43.into())).unwrap();
        assert_eq!(value, BucketValue::Counter(85.into()));
    }

    #[test]
    fn test_bucket_value_merge_distribution() {
        let mut value = BucketValue::Distribution(dist![1, 2, 3]);
        value.merge(BucketValue::Distribution(dist![2, 4])).unwrap();
        assert_eq!(value, BucketValue::Distribution(dist![1, 2, 3, 2, 4]));
    }

    #[test]
    fn test_bucket_value_merge_set() {
        let mut value = BucketValue::Set(vec![1, 2].into_iter().collect());
        value.merge(BucketValue::Set([2, 3].into())).unwrap();
        assert_eq!(value, BucketValue::Set(vec![1, 2, 3].into_iter().collect()));
    }

    #[test]
    fn test_bucket_value_merge_gauge() {
        let mut value = BucketValue::Gauge(GaugeValue::single(42.into()));
        value.merge(BucketValue::gauge(43.into())).unwrap();

        assert_eq!(
            value,
            BucketValue::Gauge(GaugeValue {
                last: 43.into(),
                min: 42.into(),
                max: 43.into(),
                sum: 85.into(),
                count: 2,
            })
        );
    }

    #[test]
    fn test_parse_garbage() {
        let s = "x23-408j17z4232@#34d\nc3456y7^😎";
        let timestamp = UnixTimestamp::from_secs(4711);
        let result = Bucket::parse(s.as_bytes(), timestamp);
        assert!(result.is_err());
    }

    #[test]
    fn test_parse_counter() {
        let s = "transactions/foo:42|c";
        let timestamp = UnixTimestamp::from_secs(4711);
        let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap();
        insta::assert_debug_snapshot!(metric, @r###"
        Bucket {
            timestamp: UnixTimestamp(4711),
            width: 0,
            name: MetricName(
                "c:transactions/foo@none",
            ),
            value: Counter(
                42.0,
            ),
            tags: {},
            metadata: BucketMetadata {
                merges: 1,
                received_at: None,
                extracted_from_indexed: false,
            },
        }
        "###);
    }

    #[test]
    fn test_parse_counter_packed() {
        let s = "transactions/foo:42:17:21|c";
        let timestamp = UnixTimestamp::from_secs(4711);
        let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap();
        assert_eq!(metric.value, BucketValue::Counter(80.into()));
    }

    #[test]
    fn test_parse_distribution() {
        let s = "transactions/foo:17.5|d";
        let timestamp = UnixTimestamp::from_secs(4711);
        let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap();
        insta::assert_debug_snapshot!(metric, @r###"
        Bucket {
            timestamp: UnixTimestamp(4711),
            width: 0,
            name: MetricName(
                "d:transactions/foo@none",
            ),
            value: Distribution(
                [
                    17.5,
                ],
            ),
            tags: {},
            metadata: BucketMetadata {
                merges: 1,
                received_at: None,
                extracted_from_indexed: false,
            },
        }
        "###);
    }

    #[test]
    fn test_parse_distribution_packed() {
        let s = "transactions/foo:17.5:21.9:42.7|d";
        let timestamp = UnixTimestamp::from_secs(4711);
        let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap();
        assert_eq!(
            metric.value,
            BucketValue::Distribution(dist![
                FiniteF64::new(17.5).unwrap(),
                FiniteF64::new(21.9).unwrap(),
                FiniteF64::new(42.7).unwrap()
            ])
        );
    }

    #[test]
    fn test_parse_histogram() {
        // `h` is accepted as an alias for distributions.
        let s = "transactions/foo:17.5|h";
        let timestamp = UnixTimestamp::from_secs(4711);
        let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap();
        assert_eq!(
            metric.value,
            BucketValue::Distribution(dist![FiniteF64::new(17.5).unwrap()])
        );
    }

    #[test]
    fn test_parse_set() {
        let s = "transactions/foo:4267882815|s";
        let timestamp = UnixTimestamp::from_secs(4711);
        let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap();
        insta::assert_debug_snapshot!(metric, @r###"
        Bucket {
            timestamp: UnixTimestamp(4711),
            width: 0,
            name: MetricName(
                "s:transactions/foo@none",
            ),
            value: Set(
                {
                    4267882815,
                },
            ),
            tags: {},
            metadata: BucketMetadata {
                merges: 1,
                received_at: None,
                extracted_from_indexed: false,
            },
        }
        "###);
    }

    #[test]
    fn test_parse_set_hashed() {
        let s = "transactions/foo:e2546e4c-ecd0-43ad-ae27-87960e57a658|s";
        let timestamp = UnixTimestamp::from_secs(4711);
        let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap();
        assert_eq!(metric.value, BucketValue::Set([4267882815].into()));
    }

    #[test]
    fn test_parse_set_hashed_packed() {
        let s = "transactions/foo:e2546e4c-ecd0-43ad-ae27-87960e57a658:00449b66-d91f-4fb8-b324-4c8bdf2499f6|s";
        let timestamp = UnixTimestamp::from_secs(4711);
        let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap();
        assert_eq!(
            metric.value,
            BucketValue::Set([181348692, 4267882815].into())
        );
    }

    #[test]
    fn test_parse_set_packed() {
        let s = "transactions/foo:3182887624:4267882815|s";
        let timestamp = UnixTimestamp::from_secs(4711);
        let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap();
        assert_eq!(
            metric.value,
            BucketValue::Set([3182887624, 4267882815].into())
        )
    }

    #[test]
    fn test_parse_gauge() {
        let s = "transactions/foo:42|g";
        let timestamp = UnixTimestamp::from_secs(4711);
        let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap();
        insta::assert_debug_snapshot!(metric, @r###"
        Bucket {
            timestamp: UnixTimestamp(4711),
            width: 0,
            name: MetricName(
                "g:transactions/foo@none",
            ),
            value: Gauge(
                GaugeValue {
                    last: 42.0,
                    min: 42.0,
                    max: 42.0,
                    sum: 42.0,
                    count: 1,
                },
            ),
            tags: {},
            metadata: BucketMetadata {
                merges: 1,
                received_at: None,
                extracted_from_indexed: false,
            },
        }
        "###);
    }

    #[test]
    fn test_parse_gauge_packed() {
        let s = "transactions/foo:25:17:42:220:85|g";
        let timestamp = UnixTimestamp::from_secs(4711);
        let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap();
        insta::assert_debug_snapshot!(metric, @r###"
        Bucket {
            timestamp: UnixTimestamp(4711),
            width: 0,
            name: MetricName(
                "g:transactions/foo@none",
            ),
            value: Gauge(
                GaugeValue {
                    last: 25.0,
                    min: 17.0,
                    max: 42.0,
                    sum: 220.0,
                    count: 85,
                },
            ),
            tags: {},
            metadata: BucketMetadata {
                merges: 1,
                received_at: None,
                extracted_from_indexed: false,
            },
        }
        "###);
    }

    #[test]
    fn test_parse_implicit_namespace() {
        let s = "foo:42|c";
        let timestamp = UnixTimestamp::from_secs(4711);
        let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap();
        insta::assert_debug_snapshot!(metric, @r###"
        Bucket {
            timestamp: UnixTimestamp(4711),
            width: 0,
            name: MetricName(
                "c:custom/foo@none",
            ),
            value: Counter(
                42.0,
            ),
            tags: {},
            metadata: BucketMetadata {
                merges: 1,
                received_at: None,
                extracted_from_indexed: false,
            },
        }
        "###);
    }

    #[test]
    fn test_parse_unit() {
        let s = "transactions/foo@second:17.5|d";
        let timestamp = UnixTimestamp::from_secs(4711);
        let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap();
        let mri = MetricResourceIdentifier::parse(&metric.name).unwrap();
        assert_eq!(mri.unit, MetricUnit::Duration(DurationUnit::Second));
    }

    #[test]
    fn test_parse_unit_regression() {
        let s = "transactions/foo@s:17.5|d";
        let timestamp = UnixTimestamp::from_secs(4711);
        let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap();
        let mri = MetricResourceIdentifier::parse(&metric.name).unwrap();
        assert_eq!(mri.unit, MetricUnit::Duration(DurationUnit::Second));
    }

    #[test]
    fn test_parse_tags() {
        let s = "transactions/foo:17.5|d|#foo,bar:baz";
        let timestamp = UnixTimestamp::from_secs(4711);
        let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap();
        insta::assert_debug_snapshot!(metric.tags, @r#"
        {
            "bar": "baz",
            "foo": "",
        }
        "#);
    }

    #[test]
    fn test_parse_tags_escaped() {
        let s = "transactions/foo:17.5|d|#foo:😅\\u{2c}🚀";
        let timestamp = UnixTimestamp::from_secs(4711);
        let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap();
        insta::assert_debug_snapshot!(metric.tags, @r#"
        {
            "foo": "😅,🚀",
        }
        "#);
    }

    #[test]
    fn test_parse_timestamp() {
        let s = "transactions/foo:17.5|d|T1615889449";
        let timestamp = UnixTimestamp::from_secs(4711);
        let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap();
        assert_eq!(metric.timestamp, UnixTimestamp::from_secs(1615889449));
    }

    #[test]
    fn test_parse_sample_rate() {
        // Sample rates (`@...`) are not interpreted; parsing must still succeed.
        let s = "transactions/foo:17.5|d|@0.1";
        let timestamp = UnixTimestamp::from_secs(4711);
        Bucket::parse(s.as_bytes(), timestamp).unwrap();
    }

    #[test]
    fn test_parse_invalid_name() {
        let s = "foo#bar:42|c";
        let timestamp = UnixTimestamp::from_secs(4711);
        let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap();
        assert_eq!(metric.name.as_ref(), "c:custom/foo_bar@none");
    }

    #[test]
    fn test_parse_empty_name() {
        let s = ":42|c";
        let timestamp = UnixTimestamp::from_secs(4711);
        let metric = Bucket::parse(s.as_bytes(), timestamp);
        assert!(metric.is_err());
    }

    #[test]
    fn test_parse_invalid_name_with_leading_digit() {
        let s = "64bit:42|c";
        let timestamp = UnixTimestamp::from_secs(4711);
        let metric = Bucket::parse(s.as_bytes(), timestamp);
        assert!(metric.is_err());
    }

    #[test]
    fn test_parse_all() {
        let s = "transactions/foo:42|c\nbar:17|c";
        let timestamp = UnixTimestamp::from_secs(4711);

        let metrics: Vec<Bucket> = Bucket::parse_all(s.as_bytes(), timestamp)
            .collect::<Result<_, _>>()
            .unwrap();

        assert_eq!(metrics.len(), 2);
    }

    #[test]
    fn test_parse_all_crlf() {
        let s = "transactions/foo:42|c\r\nbar:17|c";
        let timestamp = UnixTimestamp::from_secs(4711);

        let metrics: Vec<Bucket> = Bucket::parse_all(s.as_bytes(), timestamp)
            .collect::<Result<_, _>>()
            .unwrap();

        assert_eq!(metrics.len(), 2);
    }

    #[test]
    fn test_parse_all_empty_lines() {
        let s = "transactions/foo:42|c\n\n\nbar:17|c";
        let timestamp = UnixTimestamp::from_secs(4711);

        let metric_count = Bucket::parse_all(s.as_bytes(), timestamp).count();
        assert_eq!(metric_count, 2);
    }

    #[test]
    fn test_parse_all_trailing() {
        let s = "transactions/foo:42|c\nbar:17|c\n";
        let timestamp = UnixTimestamp::from_secs(4711);

        let metric_count = Bucket::parse_all(s.as_bytes(), timestamp).count();
        assert_eq!(metric_count, 2);
    }

    #[test]
    fn test_parse_all_invalid_utf8_line() {
        // An undecodable line yields an error but does not stop the iteration.
        let s = b"transactions/foo:42|c\n\xff\nbar:17|c";
        let timestamp = UnixTimestamp::from_secs(4711);

        let results: Vec<_> = Bucket::parse_all(s, timestamp).collect();
        assert_eq!(results.len(), 3);
        assert!(results[0].is_ok());
        assert!(results[1].is_err());
        assert!(results[2].is_ok());
    }

    #[test]
    fn test_parse_buckets_default_is_empty() {
        assert_eq!(ParseBuckets::default().count(), 0);
    }

    #[test]
    fn test_metrics_docs() {
        let text = include_str!("../tests/fixtures/buckets.statsd.txt").trim_end();
        let json = include_str!("../tests/fixtures/buckets.json").trim_end();

        let timestamp = UnixTimestamp::from_secs(0);
        let statsd_metrics = Bucket::parse_all(text.as_bytes(), timestamp)
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        let json_metrics: Vec<Bucket> = serde_json::from_str(json).unwrap();

        assert_eq!(statsd_metrics, json_metrics);
    }

    #[test]
    fn test_set_docs() {
        let text = include_str!("../tests/fixtures/set.statsd.txt").trim_end();
        let json = include_str!("../tests/fixtures/set.json").trim_end();

        let timestamp = UnixTimestamp::from_secs(1615889449);
        let statsd_metric = Bucket::parse(text.as_bytes(), timestamp).unwrap();
        let json_metric: Bucket = serde_json::from_str(json).unwrap();

        assert_eq!(statsd_metric, json_metric);
    }

    #[test]
    fn test_parse_buckets() {
        let json = r#"[
          {
            "name": "endpoint.response_time",
            "unit": "millisecond",
            "value": [36, 49, 57, 68],
            "type": "d",
            "timestamp": 1615889440,
            "width": 10,
            "tags": {
                "route": "user_index"
            },
            "metadata": {
                "merges": 1,
                "received_at": 1615889440
            }
          }
        ]"#;

        let buckets = serde_json::from_str::<Vec<Bucket>>(json).unwrap();

        insta::assert_debug_snapshot!(buckets, @r###"
        [
            Bucket {
                timestamp: UnixTimestamp(1615889440),
                width: 10,
                name: MetricName(
                    "endpoint.response_time",
                ),
                value: Distribution(
                    [
                        36.0,
                        49.0,
                        57.0,
                        68.0,
                    ],
                ),
                tags: {
                    "route": "user_index",
                },
                metadata: BucketMetadata {
                    merges: 1,
                    received_at: Some(
                        UnixTimestamp(1615889440),
                    ),
                    extracted_from_indexed: false,
                },
            },
        ]
        "###);
    }

    #[test]
    fn test_parse_bucket_defaults() {
        let json = r#"[
          {
            "name": "endpoint.hits",
            "value": 4,
            "type": "c",
            "timestamp": 1615889440,
            "width": 10,
            "metadata": {
                "merges": 1,
                "received_at": 1615889440
            }
          }
        ]"#;

        let buckets = serde_json::from_str::<Vec<Bucket>>(json).unwrap();

        insta::assert_debug_snapshot!(buckets, @r###"
        [
            Bucket {
                timestamp: UnixTimestamp(1615889440),
                width: 10,
                name: MetricName(
                    "endpoint.hits",
                ),
                value: Counter(
                    4.0,
                ),
                tags: {},
                metadata: BucketMetadata {
                    merges: 1,
                    received_at: Some(
                        UnixTimestamp(1615889440),
                    ),
                    extracted_from_indexed: false,
                },
            },
        ]
        "###);
    }

    #[test]
    fn test_buckets_roundtrip() {
        // The expected text must match `serde_json::to_string_pretty` byte for byte, including
        // its 2-space indentation, so it is not indented along with the surrounding code.
        let json = r#"[
  {
    "timestamp": 1615889440,
    "width": 10,
    "name": "endpoint.response_time",
    "type": "d",
    "value": [
      36.0,
      49.0,
      57.0,
      68.0
    ],
    "tags": {
      "route": "user_index"
    }
  },
  {
    "timestamp": 1615889440,
    "width": 10,
    "name": "endpoint.hits",
    "type": "c",
    "value": 4.0,
    "tags": {
      "route": "user_index"
    }
  },
  {
    "timestamp": 1615889440,
    "width": 10,
    "name": "endpoint.parallel_requests",
    "type": "g",
    "value": {
      "last": 25.0,
      "min": 17.0,
      "max": 42.0,
      "sum": 2210.0,
      "count": 85
    }
  },
  {
    "timestamp": 1615889440,
    "width": 10,
    "name": "endpoint.users",
    "type": "s",
    "value": [
      3182887624,
      4267882815
    ],
    "tags": {
      "route": "user_index"
    }
  }
]"#;

        let buckets = serde_json::from_str::<Vec<Bucket>>(json).unwrap();
        let serialized = serde_json::to_string_pretty(&buckets).unwrap();
        assert_eq!(json, serialized);
    }

    #[test]
    fn test_bucket_docs_roundtrip() {
        let json = include_str!("../tests/fixtures/buckets.json")
            .trim_end()
            .replace("\r\n", "\n");
        let buckets = serde_json::from_str::<Vec<Bucket>>(&json).unwrap();

        let serialized = serde_json::to_string_pretty(&buckets).unwrap();
        assert_eq!(json, serialized);
    }

    #[test]
    fn test_bucket_metadata_is_default() {
        assert!(BucketMetadata::default().is_default());
        assert!(!BucketMetadata::new(UnixTimestamp::from_secs(10)).is_default());
    }

    #[test]
    fn test_bucket_metadata_merge() {
        let mut metadata = BucketMetadata::default();

        let other_metadata = BucketMetadata::default();
        metadata.merge(other_metadata);
        assert_eq!(
            metadata,
            BucketMetadata {
                merges: 2,
                received_at: None,
                extracted_from_indexed: false,
            }
        );

        let other_metadata = BucketMetadata::new(UnixTimestamp::from_secs(10));
        metadata.merge(other_metadata);
        assert_eq!(
            metadata,
            BucketMetadata {
                merges: 3,
                received_at: Some(UnixTimestamp::from_secs(10)),
                extracted_from_indexed: false,
            }
        );

        let other_metadata = BucketMetadata::new(UnixTimestamp::from_secs(20));
        metadata.merge(other_metadata);
        assert_eq!(
            metadata,
            BucketMetadata {
                merges: 4,
                received_at: Some(UnixTimestamp::from_secs(10)),
                extracted_from_indexed: false,
            }
        );
    }
}