1use std::collections::{BTreeMap, BTreeSet};
2use std::hash::Hash;
3use std::iter::FusedIterator;
4use std::{fmt, mem};
5
6use hash32::{FnvHasher, Hasher as _};
7use relay_cardinality::CardinalityItem;
8use relay_common::time::UnixTimestamp;
9use serde::{Deserialize, Serialize};
10use smallvec::SmallVec;
11
12use crate::protocol::{
13 self, hash_set_value, CounterType, DistributionType, GaugeType, MetricName,
14 MetricResourceIdentifier, MetricType, SetType,
15};
16use crate::{FiniteF64, MetricNamespace, ParseMetricError};
17
/// Separator between multiple values packed into the value section of one metric line,
/// for example `42:17:21`.
const VALUE_SEPARATOR: char = ':';
19
/// Tags attached to a bucket, mapping tag keys to tag values (ordered by key).
pub type MetricTags = BTreeMap<String, String>;
22
/// Pre-aggregated summary of the values reported to a gauge metric.
///
/// Rather than retaining every reported value, a gauge keeps the five statistics below. They
/// are updated by [`GaugeValue::insert`] and [`GaugeValue::merge`].
#[derive(Clone, Copy, Debug, PartialEq, Deserialize, Serialize)]
pub struct GaugeValue {
    /// The value that was inserted or merged most recently.
    pub last: GaugeType,
    /// The smallest value seen.
    pub min: GaugeType,
    /// The largest value seen.
    pub max: GaugeType,
    /// The (saturating) sum of all values seen.
    pub sum: GaugeType,
    /// The number of values that were aggregated into this gauge.
    pub count: u64,
}
39
40impl GaugeValue {
41 pub fn single(value: GaugeType) -> Self {
43 Self {
44 last: value,
45 min: value,
46 max: value,
47 sum: value,
48 count: 1,
49 }
50 }
51
52 pub fn insert(&mut self, value: GaugeType) {
54 self.last = value;
55 self.min = self.min.min(value);
56 self.max = self.max.max(value);
57 self.sum = self.sum.saturating_add(value);
58 self.count += 1;
59 }
60
61 pub fn merge(&mut self, other: Self) {
63 self.last = other.last;
64 self.min = self.min.min(other.min);
65 self.max = self.max.max(other.max);
66 self.sum = self.sum.saturating_add(other.sum);
67 self.count += other.count;
68 }
69
70 pub fn avg(&self) -> Option<GaugeType> {
72 self.sum / FiniteF64::new(self.count as f64)?
73 }
74}
75
/// The values of a distribution metric, kept in insertion order and including duplicates.
///
/// Backed by a [`SmallVec`] with inline capacity for three values.
pub type DistributionValue = SmallVec<[DistributionType; 3]>;
108
109#[doc(hidden)]
110pub use smallvec::smallvec as _smallvec;
111
/// Creates a `DistributionValue` from a comma-separated list of values.
///
/// Every argument is converted with `DistributionType::from`. Trailing commas are accepted.
#[macro_export]
macro_rules! dist {
    ($($x:expr),*$(,)*) => {
        $crate::_smallvec!($($crate::DistributionType::from($x)),*) as $crate::DistributionValue
    };
}
127
/// The unique members of a set metric, stored as an ordered set of `SetType` values.
pub type SetValue = BTreeSet<SetType>;
136
/// The aggregated value of a metric bucket.
///
/// Serialized as an adjacently tagged enum: the variant is written to the `type` key using the
/// one-letter codes below, and the payload is written to the `value` key.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(tag = "type", content = "value")]
pub enum BucketValue {
    /// A counter (`"c"`). Merging two counters adds them with saturation.
    #[serde(rename = "c")]
    Counter(CounterType),

    /// A distribution (`"d"`). Merging appends the other bucket's values.
    #[serde(rename = "d")]
    Distribution(DistributionValue),

    /// A set of unique values (`"s"`). Merging builds the union of both sets.
    #[serde(rename = "s")]
    Set(SetValue),

    /// A gauge summary (`"g"`). Merging delegates to [`GaugeValue::merge`].
    #[serde(rename = "g")]
    Gauge(GaugeValue),
}
262
263impl BucketValue {
264 pub fn counter(value: CounterType) -> Self {
266 Self::Counter(value)
267 }
268
269 pub fn distribution(value: DistributionType) -> Self {
271 Self::Distribution(dist![value])
272 }
273
274 pub fn set(value: SetType) -> Self {
276 Self::Set(std::iter::once(value).collect())
277 }
278
279 pub fn set_from_str(string: &str) -> Self {
281 Self::set(hash_set_value(string))
282 }
283
284 pub fn set_from_display(display: impl fmt::Display) -> Self {
286 Self::set(hash_set_value(&display.to_string()))
287 }
288
289 pub fn gauge(value: GaugeType) -> Self {
291 Self::Gauge(GaugeValue::single(value))
292 }
293
294 pub fn ty(&self) -> MetricType {
296 match self {
297 Self::Counter(_) => MetricType::Counter,
298 Self::Distribution(_) => MetricType::Distribution,
299 Self::Set(_) => MetricType::Set,
300 Self::Gauge(_) => MetricType::Gauge,
301 }
302 }
303
304 pub fn len(&self) -> usize {
306 match self {
307 BucketValue::Counter(_) => 1,
308 BucketValue::Distribution(distribution) => distribution.len(),
309 BucketValue::Set(set) => set.len(),
310 BucketValue::Gauge(_) => 5,
311 }
312 }
313
314 pub fn is_empty(&self) -> bool {
316 self.len() == 0
317 }
318
319 pub fn cost(&self) -> usize {
324 let allocated_cost = match self {
327 Self::Counter(_) => 0,
328 Self::Set(s) => mem::size_of::<SetType>() * s.len(),
329 Self::Gauge(_) => 0,
330 Self::Distribution(d) => d.len() * mem::size_of::<DistributionType>(),
331 };
332
333 mem::size_of::<Self>() + allocated_cost
334 }
335
336 pub fn merge(&mut self, other: Self) -> Result<(), Self> {
341 match (self, other) {
342 (Self::Counter(slf), Self::Counter(other)) => *slf = slf.saturating_add(other),
343 (Self::Distribution(slf), Self::Distribution(other)) => slf.extend_from_slice(&other),
344 (Self::Set(slf), Self::Set(other)) => slf.extend(other),
345 (Self::Gauge(slf), Self::Gauge(other)) => slf.merge(other),
346 (_, other) => return Err(other),
347 }
348
349 Ok(())
350 }
351}
352
353fn parse_counter(string: &str) -> Option<CounterType> {
355 let mut sum = CounterType::default();
356 for component in string.split(VALUE_SEPARATOR) {
357 sum = sum.saturating_add(component.parse().ok()?);
358 }
359 Some(sum)
360}
361
362fn parse_distribution(string: &str) -> Option<DistributionValue> {
364 let mut dist = DistributionValue::default();
365 for component in string.split(VALUE_SEPARATOR) {
366 dist.push(component.parse().ok()?);
367 }
368 Some(dist)
369}
370
371fn parse_set(string: &str) -> Option<SetValue> {
373 let mut set = SetValue::default();
374 for component in string.split(VALUE_SEPARATOR) {
375 let hash = component
376 .parse()
377 .unwrap_or_else(|_| protocol::hash_set_value(component));
378 set.insert(hash);
379 }
380 Some(set)
381}
382
383fn parse_gauge(string: &str) -> Option<GaugeValue> {
388 let mut components = string.split(VALUE_SEPARATOR);
389
390 let last = components.next()?.parse().ok()?;
391 Some(if let Some(min) = components.next() {
392 GaugeValue {
393 last,
394 min: min.parse().ok()?,
395 max: components.next()?.parse().ok()?,
396 sum: components.next()?.parse().ok()?,
397 count: components.next()?.parse().ok()?,
398 }
399 } else {
400 GaugeValue::single(last)
401 })
402}
403
404fn parse_tags(string: &str) -> Option<MetricTags> {
408 let mut map = MetricTags::new();
409
410 for pair in string.split(',') {
411 let mut name_value = pair.splitn(2, ':');
412
413 let name = name_value.next()?;
414 if !protocol::is_valid_tag_key(name) {
415 continue;
416 }
417
418 if let Ok(value) = protocol::unescape_tag_value(name_value.next().unwrap_or_default()) {
419 map.insert(name.to_owned(), value);
420 }
421 }
422
423 Some(map)
424}
425
426fn parse_timestamp(string: &str) -> Option<UnixTimestamp> {
428 string.parse().ok().map(UnixTimestamp::from_secs)
429}
430
/// An aggregation of metric values for one metric name and one set of tags.
///
/// Buckets can be parsed from a pipe-delimited text protocol (see [`Bucket::parse`] and
/// [`Bucket::parse_all`]) or (de)serialized as JSON through serde.
///
/// The test suite asserts that this text-protocol fixture:
///
/// ```text
#[doc = include_str!("../tests/fixtures/buckets.statsd.txt")]
/// ```
///
/// parses into the same buckets as this JSON fixture:
///
/// ```json
#[doc = include_str!("../tests/fixtures/buckets.json")]
/// ```
///
/// Likewise, this single set metric in the text protocol:
///
/// ```text
#[doc = include_str!("../tests/fixtures/set.statsd.txt")]
/// ```
///
/// is equal to this JSON bucket:
///
/// ```json
#[doc = include_str!("../tests/fixtures/set.json")]
/// ```
#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
pub struct Bucket {
    /// The timestamp of this bucket.
    ///
    /// When parsing the text protocol, this is the timestamp passed to the parser unless the
    /// line carries its own `T<seconds>` component.
    pub timestamp: UnixTimestamp,

    /// The width of the bucket's time window. Buckets parsed from the text protocol use `0`.
    ///
    /// NOTE(review): the unit is presumably seconds, confirm against the aggregator.
    pub width: u64,

    /// The name of the metric.
    ///
    /// Buckets parsed from the text protocol store the full resource identifier here, for
    /// example `c:transactions/foo@none`.
    pub name: MetricName,

    /// The aggregated value. It is flattened, so its `type` and `value` keys appear next to
    /// the other fields of the bucket when serialized.
    #[serde(flatten)]
    pub value: BucketValue,

    /// Tags of the bucket. Optional on input and omitted from the output when empty.
    #[serde(default, skip_serializing_if = "MetricTags::is_empty")]
    pub tags: MetricTags,

    /// Bookkeeping data about the bucket. Optional on input and omitted from the output when
    /// equal to [`BucketMetadata::default`].
    #[serde(default, skip_serializing_if = "BucketMetadata::is_default")]
    pub metadata: BucketMetadata,
}
622
623impl Bucket {
624 fn parse_str(string: &str, timestamp: UnixTimestamp) -> Option<Self> {
630 let mut components = string.split('|');
631
632 let (mri_str, values_str) = components.next()?.split_once(':')?;
633 let ty = components.next().and_then(|s| s.parse().ok())?;
634
635 let mri = MetricResourceIdentifier::parse_with_type(mri_str, ty).ok()?;
636 let value = match ty {
637 MetricType::Counter => BucketValue::Counter(parse_counter(values_str)?),
638 MetricType::Distribution => BucketValue::Distribution(parse_distribution(values_str)?),
639 MetricType::Set => BucketValue::Set(parse_set(values_str)?),
640 MetricType::Gauge => BucketValue::Gauge(parse_gauge(values_str)?),
641 };
642
643 let mut bucket = Bucket {
644 timestamp,
645 width: 0,
646 name: mri.to_string().into(),
647 value,
648 tags: Default::default(),
649 metadata: Default::default(),
650 };
651
652 for component in components {
653 match component.chars().next() {
654 Some('#') => {
655 bucket.tags = parse_tags(component.get(1..)?)?;
656 }
657 Some('T') => {
658 bucket.timestamp = parse_timestamp(component.get(1..)?)?;
659 }
660 _ => (),
661 }
662 }
663
664 Some(bucket)
665 }
666
667 pub fn parse(slice: &[u8], timestamp: UnixTimestamp) -> Result<Self, ParseMetricError> {
680 let string = std::str::from_utf8(slice).map_err(|_| ParseMetricError)?;
681 Self::parse_str(string, timestamp).ok_or(ParseMetricError)
682 }
683
684 pub fn parse_all(slice: &[u8], timestamp: UnixTimestamp) -> ParseBuckets<'_> {
709 ParseBuckets { slice, timestamp }
710 }
711
712 pub fn tag(&self, name: &str) -> Option<&str> {
714 self.tags.get(name).map(|s| s.as_str())
715 }
716
717 pub fn remove_tag(&mut self, name: &str) -> Option<String> {
721 self.tags.remove(name)
722 }
723}
724
725impl CardinalityItem for Bucket {
726 fn namespace(&self) -> Option<MetricNamespace> {
727 self.name.try_namespace()
728 }
729
730 fn name(&self) -> &MetricName {
731 &self.name
732 }
733
734 fn to_hash(&self) -> u32 {
735 let mut hasher = FnvHasher::default();
736 self.name.hash(&mut hasher);
737 self.tags.hash(&mut hasher);
738 hasher.finish32()
739 }
740}
741
/// Bookkeeping data that accompanies a [`Bucket`].
#[derive(Clone, Copy, Debug, PartialEq, Deserialize, Serialize)]
pub struct BucketMetadata {
    /// Merge counter. It starts at `1` and is added up (saturating) whenever two metadata
    /// values are merged.
    ///
    /// NOTE(review): presumably the number of original buckets represented by this one.
    pub merges: u32,

    /// The time at which the bucket was received, if known. Merging keeps the earlier of the
    /// two timestamps.
    pub received_at: Option<UnixTimestamp>,

    /// Never (de)serialized; deserialization yields `false`.
    ///
    /// NOTE(review): presumably marks metrics extracted from indexed payloads, confirm with
    /// the code that sets this flag.
    #[serde(skip)]
    pub extracted_from_indexed: bool,
}
777
778impl BucketMetadata {
779 pub fn new(received_at: UnixTimestamp) -> Self {
783 Self {
784 merges: 1,
785 received_at: Some(received_at),
786 extracted_from_indexed: false,
787 }
788 }
789
790 pub fn is_default(&self) -> bool {
792 &Self::default() == self
793 }
794
795 pub fn merge(&mut self, other: Self) {
797 self.merges = self.merges.saturating_add(other.merges);
798 self.received_at = match (self.received_at, other.received_at) {
799 (Some(received_at), None) => Some(received_at),
800 (None, Some(received_at)) => Some(received_at),
801 (left, right) => left.min(right),
802 };
803 }
804}
805
impl Default for BucketMetadata {
    /// Returns metadata with `merges` set to `1`, no `received_at` timestamp and
    /// `extracted_from_indexed` set to `false`.
    fn default() -> Self {
        Self {
            // A bucket counts as one merge from the start, hence `1` rather than `0`.
            merges: 1,
            received_at: None,
            extracted_from_indexed: false,
        }
    }
}
815
/// Iterator over the buckets parsed from a newline-separated text-protocol payload.
///
/// Created by [`Bucket::parse_all`]. Each non-empty line yields either a parsed [`Bucket`] or a
/// [`ParseMetricError`]. Empty lines are skipped and a trailing `\r` is stripped from each line.
#[derive(Clone, Debug)]
pub struct ParseBuckets<'a> {
    /// The part of the payload that has not been consumed yet.
    slice: &'a [u8],
    /// The timestamp passed on to every parsed line.
    timestamp: UnixTimestamp,
}
822
impl Default for ParseBuckets<'_> {
    /// Returns an iterator over an empty payload, which yields no items.
    fn default() -> Self {
        Self {
            slice: &[],
            // Arbitrary placeholder: with an empty slice no line is ever parsed, so this
            // timestamp is never assigned to a bucket.
            timestamp: UnixTimestamp::from_secs(4711),
        }
    }
}
832
833impl Iterator for ParseBuckets<'_> {
834 type Item = Result<Bucket, ParseMetricError>;
835
836 fn next(&mut self) -> Option<Self::Item> {
837 loop {
838 if self.slice.is_empty() {
839 return None;
840 }
841
842 let mut split = self.slice.splitn(2, |&b| b == b'\n');
843 let current = split.next()?;
844 self.slice = split.next().unwrap_or_default();
845
846 let string = match std::str::from_utf8(current) {
847 Ok(string) => string.strip_suffix('\r').unwrap_or(string),
848 Err(_) => return Some(Err(ParseMetricError)),
849 };
850
851 if !string.is_empty() {
852 return Some(Bucket::parse_str(string, self.timestamp).ok_or(ParseMetricError));
853 }
854 }
855 }
856}
857
// Once the remaining slice is empty it stays empty, so `next` keeps returning `None`.
impl FusedIterator for ParseBuckets<'_> {}
859
860#[cfg(test)]
861mod tests {
862 use similar_asserts::assert_eq;
863
864 use crate::protocol::{DurationUnit, MetricUnit};
865
866 use super::*;
867
    /// Guards the size of `DistributionValue`: its inline `SmallVec` storage must not make it
    /// larger than `GaugeValue`, the other multi-field payload of `BucketValue`.
    #[test]
    fn test_distribution_value_size() {
        assert!(
            std::mem::size_of::<DistributionValue>() <= std::mem::size_of::<GaugeValue>(),
            "distribution value should not exceed gauge {}",
            std::mem::size_of::<DistributionValue>()
        );
    }
880
    /// Merging two counters adds their values.
    #[test]
    fn test_bucket_value_merge_counter() {
        let mut value = BucketValue::Counter(42.into());
        value.merge(BucketValue::Counter(43.into())).unwrap();
        assert_eq!(value, BucketValue::Counter(85.into()));
    }
887
    /// Merging two distributions appends the values, keeping order and duplicates.
    #[test]
    fn test_bucket_value_merge_distribution() {
        let mut value = BucketValue::Distribution(dist![1, 2, 3]);
        value.merge(BucketValue::Distribution(dist![2, 4])).unwrap();
        assert_eq!(value, BucketValue::Distribution(dist![1, 2, 3, 2, 4]));
    }
894
    /// Merging two sets yields their union.
    #[test]
    fn test_bucket_value_merge_set() {
        let mut value = BucketValue::Set(vec![1, 2].into_iter().collect());
        value.merge(BucketValue::Set([2, 3].into())).unwrap();
        assert_eq!(value, BucketValue::Set(vec![1, 2, 3].into_iter().collect()));
    }
901
    /// Merging two gauges takes `last` from the merged-in gauge and combines the other fields.
    #[test]
    fn test_bucket_value_merge_gauge() {
        let mut value = BucketValue::Gauge(GaugeValue::single(42.into()));
        value.merge(BucketValue::gauge(43.into())).unwrap();

        assert_eq!(
            value,
            BucketValue::Gauge(GaugeValue {
                last: 43.into(),
                min: 42.into(),
                max: 43.into(),
                sum: 85.into(),
                count: 2,
            })
        );
    }
918
    /// Input that is not a well-formed metric line is rejected with an error.
    #[test]
    fn test_parse_garbage() {
        let s = "x23-408j17z4232@#34d\nc3456y7^😎";
        let timestamp = UnixTimestamp::from_secs(4711);
        let result = Bucket::parse(s.as_bytes(), timestamp);
        assert!(result.is_err());
    }
926
927 #[test]
928 fn test_parse_counter() {
929 let s = "transactions/foo:42|c";
930 let timestamp = UnixTimestamp::from_secs(4711);
931 let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap();
932 insta::assert_debug_snapshot!(metric, @r###"
933 Bucket {
934 timestamp: UnixTimestamp(4711),
935 width: 0,
936 name: MetricName(
937 "c:transactions/foo@none",
938 ),
939 value: Counter(
940 42.0,
941 ),
942 tags: {},
943 metadata: BucketMetadata {
944 merges: 1,
945 received_at: None,
946 extracted_from_indexed: false,
947 },
948 }
949 "###);
950 }
951
    /// Multiple `:`-separated counter values on one line are summed up.
    #[test]
    fn test_parse_counter_packed() {
        let s = "transactions/foo:42:17:21|c";
        let timestamp = UnixTimestamp::from_secs(4711);
        let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap();
        assert_eq!(metric.value, BucketValue::Counter(80.into()));
    }
959
960 #[test]
961 fn test_parse_distribution() {
962 let s = "transactions/foo:17.5|d";
963 let timestamp = UnixTimestamp::from_secs(4711);
964 let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap();
965 insta::assert_debug_snapshot!(metric, @r###"
966 Bucket {
967 timestamp: UnixTimestamp(4711),
968 width: 0,
969 name: MetricName(
970 "d:transactions/foo@none",
971 ),
972 value: Distribution(
973 [
974 17.5,
975 ],
976 ),
977 tags: {},
978 metadata: BucketMetadata {
979 merges: 1,
980 received_at: None,
981 extracted_from_indexed: false,
982 },
983 }
984 "###);
985 }
986
    /// Multiple `:`-separated distribution values on one line are all retained, in order.
    #[test]
    fn test_parse_distribution_packed() {
        let s = "transactions/foo:17.5:21.9:42.7|d";
        let timestamp = UnixTimestamp::from_secs(4711);
        let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap();
        assert_eq!(
            metric.value,
            BucketValue::Distribution(dist![
                FiniteF64::new(17.5).unwrap(),
                FiniteF64::new(21.9).unwrap(),
                FiniteF64::new(42.7).unwrap()
            ])
        );
    }
1001
    /// The type code `h` is accepted and produces a distribution.
    #[test]
    fn test_parse_histogram() {
        let s = "transactions/foo:17.5|h";
        let timestamp = UnixTimestamp::from_secs(4711);
        let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap();
        assert_eq!(
            metric.value,
            BucketValue::Distribution(dist![FiniteF64::new(17.5).unwrap()])
        );
    }
1012
1013 #[test]
1014 fn test_parse_set() {
1015 let s = "transactions/foo:4267882815|s";
1016 let timestamp = UnixTimestamp::from_secs(4711);
1017 let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap();
1018 insta::assert_debug_snapshot!(metric, @r###"
1019 Bucket {
1020 timestamp: UnixTimestamp(4711),
1021 width: 0,
1022 name: MetricName(
1023 "s:transactions/foo@none",
1024 ),
1025 value: Set(
1026 {
1027 4267882815,
1028 },
1029 ),
1030 tags: {},
1031 metadata: BucketMetadata {
1032 merges: 1,
1033 received_at: None,
1034 extracted_from_indexed: false,
1035 },
1036 }
1037 "###);
1038 }
1039
    /// A set member that is not a number is hashed into a `SetType`.
    #[test]
    fn test_parse_set_hashed() {
        let s = "transactions/foo:e2546e4c-ecd0-43ad-ae27-87960e57a658|s";
        let timestamp = UnixTimestamp::from_secs(4711);
        let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap();
        assert_eq!(metric.value, BucketValue::Set([4267882815].into()));
    }
1047
    /// Several non-numeric set members on one line are hashed individually.
    #[test]
    fn test_parse_set_hashed_packed() {
        let s = "transactions/foo:e2546e4c-ecd0-43ad-ae27-87960e57a658:00449b66-d91f-4fb8-b324-4c8bdf2499f6|s";
        let timestamp = UnixTimestamp::from_secs(4711);
        let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap();
        assert_eq!(
            metric.value,
            BucketValue::Set([181348692, 4267882815].into())
        );
    }
1058
    /// Several numeric set members on one line are stored verbatim.
    #[test]
    fn test_parse_set_packed() {
        let s = "transactions/foo:3182887624:4267882815|s";
        let timestamp = UnixTimestamp::from_secs(4711);
        let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap();
        assert_eq!(
            metric.value,
            BucketValue::Set([3182887624, 4267882815].into())
        )
    }
1069
1070 #[test]
1071 fn test_parse_gauge() {
1072 let s = "transactions/foo:42|g";
1073 let timestamp = UnixTimestamp::from_secs(4711);
1074 let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap();
1075 insta::assert_debug_snapshot!(metric, @r###"
1076 Bucket {
1077 timestamp: UnixTimestamp(4711),
1078 width: 0,
1079 name: MetricName(
1080 "g:transactions/foo@none",
1081 ),
1082 value: Gauge(
1083 GaugeValue {
1084 last: 42.0,
1085 min: 42.0,
1086 max: 42.0,
1087 sum: 42.0,
1088 count: 1,
1089 },
1090 ),
1091 tags: {},
1092 metadata: BucketMetadata {
1093 merges: 1,
1094 received_at: None,
1095 extracted_from_indexed: false,
1096 },
1097 }
1098 "###);
1099 }
1100
1101 #[test]
1102 fn test_parse_gauge_packed() {
1103 let s = "transactions/foo:25:17:42:220:85|g";
1104 let timestamp = UnixTimestamp::from_secs(4711);
1105 let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap();
1106 insta::assert_debug_snapshot!(metric, @r###"
1107 Bucket {
1108 timestamp: UnixTimestamp(4711),
1109 width: 0,
1110 name: MetricName(
1111 "g:transactions/foo@none",
1112 ),
1113 value: Gauge(
1114 GaugeValue {
1115 last: 25.0,
1116 min: 17.0,
1117 max: 42.0,
1118 sum: 220.0,
1119 count: 85,
1120 },
1121 ),
1122 tags: {},
1123 metadata: BucketMetadata {
1124 merges: 1,
1125 received_at: None,
1126 extracted_from_indexed: false,
1127 },
1128 }
1129 "###);
1130 }
1131
1132 #[test]
1133 fn test_parse_implicit_namespace() {
1134 let s = "foo:42|c";
1135 let timestamp = UnixTimestamp::from_secs(4711);
1136 let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap();
1137 insta::assert_debug_snapshot!(metric, @r###"
1138 Bucket {
1139 timestamp: UnixTimestamp(4711),
1140 width: 0,
1141 name: MetricName(
1142 "c:custom/foo@none",
1143 ),
1144 value: Counter(
1145 42.0,
1146 ),
1147 tags: {},
1148 metadata: BucketMetadata {
1149 merges: 1,
1150 received_at: None,
1151 extracted_from_indexed: false,
1152 },
1153 }
1154 "###);
1155 }
1156
    /// A unit given after `@` in the metric name ends up in the parsed resource identifier.
    #[test]
    fn test_parse_unit() {
        let s = "transactions/foo@second:17.5|d";
        let timestamp = UnixTimestamp::from_secs(4711);
        let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap();
        let mri = MetricResourceIdentifier::parse(&metric.name).unwrap();
        assert_eq!(mri.unit, MetricUnit::Duration(DurationUnit::Second));
    }
1165
    /// Regression test: the short unit `s` is understood as seconds.
    #[test]
    fn test_parse_unit_regression() {
        let s = "transactions/foo@s:17.5|d";
        let timestamp = UnixTimestamp::from_secs(4711);
        let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap();
        let mri = MetricResourceIdentifier::parse(&metric.name).unwrap();
        assert_eq!(mri.unit, MetricUnit::Duration(DurationUnit::Second));
    }
1174
1175 #[test]
1176 fn test_parse_tags() {
1177 let s = "transactions/foo:17.5|d|#foo,bar:baz";
1178 let timestamp = UnixTimestamp::from_secs(4711);
1179 let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap();
1180 insta::assert_debug_snapshot!(metric.tags, @r#"
1181 {
1182 "bar": "baz",
1183 "foo": "",
1184 }
1185 "#);
1186 }
1187
1188 #[test]
1189 fn test_parse_tags_escaped() {
1190 let s = "transactions/foo:17.5|d|#foo:😅\\u{2c}🚀";
1191 let timestamp = UnixTimestamp::from_secs(4711);
1192 let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap();
1193 insta::assert_debug_snapshot!(metric.tags, @r#"
1194 {
1195 "foo": "😅,🚀",
1196 }
1197 "#);
1198 }
1199
    /// A `T<seconds>` component overrides the timestamp passed to the parser.
    #[test]
    fn test_parse_timestamp() {
        let s = "transactions/foo:17.5|d|T1615889449";
        let timestamp = UnixTimestamp::from_secs(4711);
        let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap();
        assert_eq!(metric.timestamp, UnixTimestamp::from_secs(1615889449));
    }
1207
    /// A sample-rate component (`@0.1`) is ignored and does not make parsing fail.
    #[test]
    fn test_parse_sample_rate() {
        let s = "transactions/foo:17.5|d|@0.1";
        let timestamp = UnixTimestamp::from_secs(4711);
        Bucket::parse(s.as_bytes(), timestamp).unwrap();
    }
1215
    /// An invalid character in the metric name (`#` here) is replaced by `_`.
    #[test]
    fn test_parse_invalid_name() {
        let s = "foo#bar:42|c";
        let timestamp = UnixTimestamp::from_secs(4711);
        let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap();
        assert_eq!(metric.name.as_ref(), "c:custom/foo_bar@none");
    }
1223
    /// A line with an empty metric name is rejected.
    #[test]
    fn test_parse_empty_name() {
        let s = ":42|c";
        let timestamp = UnixTimestamp::from_secs(4711);
        let metric = Bucket::parse(s.as_bytes(), timestamp);
        assert!(metric.is_err());
    }
1231
    /// A metric name that starts with a digit is rejected.
    #[test]
    fn test_parse_invalid_name_with_leading_digit() {
        let s = "64bit:42|c";
        let timestamp = UnixTimestamp::from_secs(4711);
        let metric = Bucket::parse(s.as_bytes(), timestamp);
        assert!(metric.is_err());
    }
1239
    /// `parse_all` yields one bucket per `\n`-separated line.
    #[test]
    fn test_parse_all() {
        let s = "transactions/foo:42|c\nbar:17|c";
        let timestamp = UnixTimestamp::from_secs(4711);

        let metrics: Vec<Bucket> = Bucket::parse_all(s.as_bytes(), timestamp)
            .collect::<Result<_, _>>()
            .unwrap();

        assert_eq!(metrics.len(), 2);
    }
1251
    /// `parse_all` also accepts `\r\n` line endings.
    #[test]
    fn test_parse_all_crlf() {
        let s = "transactions/foo:42|c\r\nbar:17|c";
        let timestamp = UnixTimestamp::from_secs(4711);

        let metrics: Vec<Bucket> = Bucket::parse_all(s.as_bytes(), timestamp)
            .collect::<Result<_, _>>()
            .unwrap();

        assert_eq!(metrics.len(), 2);
    }
1263
    /// Empty lines between metrics are skipped and produce neither buckets nor errors.
    #[test]
    fn test_parse_all_empty_lines() {
        let s = "transactions/foo:42|c\n\n\nbar:17|c";
        let timestamp = UnixTimestamp::from_secs(4711);

        let metric_count = Bucket::parse_all(s.as_bytes(), timestamp).count();
        assert_eq!(metric_count, 2);
    }
1272
    /// A trailing newline does not produce an additional item.
    #[test]
    fn test_parse_all_trailing() {
        let s = "transactions/foo:42|c\nbar:17|c\n";
        let timestamp = UnixTimestamp::from_secs(4711);

        let metric_count = Bucket::parse_all(s.as_bytes(), timestamp).count();
        assert_eq!(metric_count, 2);
    }
1281
    /// The text and JSON fixtures embedded in the `Bucket` docs describe the same buckets.
    #[test]
    fn test_metrics_docs() {
        let text = include_str!("../tests/fixtures/buckets.statsd.txt").trim_end();
        let json = include_str!("../tests/fixtures/buckets.json").trim_end();

        let timestamp = UnixTimestamp::from_secs(0);
        let statsd_metrics = Bucket::parse_all(text.as_bytes(), timestamp)
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        let json_metrics: Vec<Bucket> = serde_json::from_str(json).unwrap();

        assert_eq!(statsd_metrics, json_metrics);
    }
1296
    /// The set fixtures embedded in the `Bucket` docs describe the same bucket in both formats.
    #[test]
    fn test_set_docs() {
        let text = include_str!("../tests/fixtures/set.statsd.txt").trim_end();
        let json = include_str!("../tests/fixtures/set.json").trim_end();

        let timestamp = UnixTimestamp::from_secs(1615889449);
        let statsd_metric = Bucket::parse(text.as_bytes(), timestamp).unwrap();
        let json_metric: Bucket = serde_json::from_str(json).unwrap();

        assert_eq!(statsd_metric, json_metric);
    }
1308
1309 #[test]
1310 fn test_parse_buckets() {
1311 let json = r#"[
1312 {
1313 "name": "endpoint.response_time",
1314 "unit": "millisecond",
1315 "value": [36, 49, 57, 68],
1316 "type": "d",
1317 "timestamp": 1615889440,
1318 "width": 10,
1319 "tags": {
1320 "route": "user_index"
1321 },
1322 "metadata": {
1323 "merges": 1,
1324 "received_at": 1615889440
1325 }
1326 }
1327 ]"#;
1328
1329 let buckets = serde_json::from_str::<Vec<Bucket>>(json).unwrap();
1330
1331 insta::assert_debug_snapshot!(buckets, @r###"
1332 [
1333 Bucket {
1334 timestamp: UnixTimestamp(1615889440),
1335 width: 10,
1336 name: MetricName(
1337 "endpoint.response_time",
1338 ),
1339 value: Distribution(
1340 [
1341 36.0,
1342 49.0,
1343 57.0,
1344 68.0,
1345 ],
1346 ),
1347 tags: {
1348 "route": "user_index",
1349 },
1350 metadata: BucketMetadata {
1351 merges: 1,
1352 received_at: Some(
1353 UnixTimestamp(1615889440),
1354 ),
1355 extracted_from_indexed: false,
1356 },
1357 },
1358 ]
1359 "###);
1360 }
1361
1362 #[test]
1363 fn test_parse_bucket_defaults() {
1364 let json = r#"[
1365 {
1366 "name": "endpoint.hits",
1367 "value": 4,
1368 "type": "c",
1369 "timestamp": 1615889440,
1370 "width": 10,
1371 "metadata": {
1372 "merges": 1,
1373 "received_at": 1615889440
1374 }
1375 }
1376 ]"#;
1377
1378 let buckets = serde_json::from_str::<Vec<Bucket>>(json).unwrap();
1379
1380 insta::assert_debug_snapshot!(buckets, @r###"
1381 [
1382 Bucket {
1383 timestamp: UnixTimestamp(1615889440),
1384 width: 10,
1385 name: MetricName(
1386 "endpoint.hits",
1387 ),
1388 value: Counter(
1389 4.0,
1390 ),
1391 tags: {},
1392 metadata: BucketMetadata {
1393 merges: 1,
1394 received_at: Some(
1395 UnixTimestamp(1615889440),
1396 ),
1397 extracted_from_indexed: false,
1398 },
1399 },
1400 ]
1401 "###);
1402 }
1403
1404 #[test]
1405 fn test_buckets_roundtrip() {
1406 let json = r#"[
1407 {
1408 "timestamp": 1615889440,
1409 "width": 10,
1410 "name": "endpoint.response_time",
1411 "type": "d",
1412 "value": [
1413 36.0,
1414 49.0,
1415 57.0,
1416 68.0
1417 ],
1418 "tags": {
1419 "route": "user_index"
1420 }
1421 },
1422 {
1423 "timestamp": 1615889440,
1424 "width": 10,
1425 "name": "endpoint.hits",
1426 "type": "c",
1427 "value": 4.0,
1428 "tags": {
1429 "route": "user_index"
1430 }
1431 },
1432 {
1433 "timestamp": 1615889440,
1434 "width": 10,
1435 "name": "endpoint.parallel_requests",
1436 "type": "g",
1437 "value": {
1438 "last": 25.0,
1439 "min": 17.0,
1440 "max": 42.0,
1441 "sum": 2210.0,
1442 "count": 85
1443 }
1444 },
1445 {
1446 "timestamp": 1615889440,
1447 "width": 10,
1448 "name": "endpoint.users",
1449 "type": "s",
1450 "value": [
1451 3182887624,
1452 4267882815
1453 ],
1454 "tags": {
1455 "route": "user_index"
1456 }
1457 }
1458]"#;
1459
1460 let buckets = serde_json::from_str::<Vec<Bucket>>(json).unwrap();
1461 let serialized = serde_json::to_string_pretty(&buckets).unwrap();
1462 assert_eq!(json, serialized);
1463 }
1464
    /// Deserializing and pretty-printing the JSON docs fixture reproduces it exactly.
    #[test]
    fn test_bucket_docs_roundtrip() {
        // Normalize line endings so the comparison also holds for CRLF checkouts.
        let json = include_str!("../tests/fixtures/buckets.json")
            .trim_end()
            .replace("\r\n", "\n");
        let buckets = serde_json::from_str::<Vec<Bucket>>(&json).unwrap();

        let serialized = serde_json::to_string_pretty(&buckets).unwrap();
        assert_eq!(json, serialized);
    }
1475
    /// Merging metadata adds up `merges` and keeps the earliest known `received_at`.
    #[test]
    fn test_bucket_metadata_merge() {
        let mut metadata = BucketMetadata::default();

        // Neither side has a timestamp: it stays unset.
        let other_metadata = BucketMetadata::default();
        metadata.merge(other_metadata);
        assert_eq!(
            metadata,
            BucketMetadata {
                merges: 2,
                received_at: None,
                extracted_from_indexed: false,
            }
        );

        // Only the merged-in side has a timestamp: it is adopted.
        let other_metadata = BucketMetadata::new(UnixTimestamp::from_secs(10));
        metadata.merge(other_metadata);
        assert_eq!(
            metadata,
            BucketMetadata {
                merges: 3,
                received_at: Some(UnixTimestamp::from_secs(10)),
                extracted_from_indexed: false,
            }
        );

        // Both sides have a timestamp: the earlier one is kept.
        let other_metadata = BucketMetadata::new(UnixTimestamp::from_secs(20));
        metadata.merge(other_metadata);
        assert_eq!(
            metadata,
            BucketMetadata {
                merges: 4,
                received_at: Some(UnixTimestamp::from_secs(10)),
                extracted_from_indexed: false,
            }
        );
    }
1513}