1use core::fmt;
2use std::collections::{BTreeMap, VecDeque};
3use std::mem;
4use std::time::Duration;
5
6use ahash::RandomState;
7use hashbrown::HashMap;
8use hashbrown::hash_map::Entry;
9use relay_base_schema::metrics::MetricName;
10use relay_base_schema::project::ProjectKey;
11use relay_common::time::UnixTimestamp;
12
13use crate::aggregator::stats;
14use crate::aggregator::{AggregateMetricsError, FlushBatching};
15use crate::utils::ByNamespace;
16use crate::{BucketMetadata, BucketValue, DistributionType, SetType};
17
/// A single partition of buckets ready to be flushed.
///
/// Produced by [`Inner::flush_next`] and [`Inner::into_partitions`].
#[derive(Default)]
pub struct Partition {
    /// Index of the partition within a time slot.
    pub partition_key: u32,
    /// All buckets aggregated into this partition.
    pub buckets: HashMap<BucketKey, BucketData>,
    /// Statistics accumulated for this partition while it was buffered.
    pub stats: PartitionStats,
}
24
impl fmt::Debug for Partition {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // In tests, collect the hash map into a BTreeMap so the debug output
        // is deterministically ordered for snapshot assertions.
        #[cfg(test)]
        let buckets = &self.buckets.iter().collect::<BTreeMap<_, _>>();
        #[cfg(not(test))]
        let buckets = &self.buckets;

        f.debug_struct("Partition")
            .field("partition_key", &self.partition_key)
            .field("stats", &self.stats)
            .field("buckets", buckets)
            .finish()
    }
}
39
/// Statistics of a single [`Partition`], captured at flush time.
#[derive(Default, Debug)]
pub struct PartitionStats {
    /// Total number of buckets in the partition.
    #[expect(unused, reason = "used for snapshot tests")]
    pub count: u64,
    /// Number of buckets in the partition, split by namespace.
    pub count_by_namespace: ByNamespace<u64>,
    /// Total number of merges that happened in this partition.
    #[expect(unused, reason = "used for snapshot tests")]
    pub merges: u64,
    /// Number of merges, split by namespace.
    pub merges_by_namespace: ByNamespace<u64>,
    /// Total cost (in bytes) of the buckets in this partition.
    #[expect(unused, reason = "used for snapshot tests")]
    pub cost: u64,
    /// Cost (in bytes) of the buckets, split by namespace.
    pub cost_by_namespace: ByNamespace<u64>,
}
58
impl From<&stats::Slot> for PartitionStats {
    /// Copies the per-slot counters into the public, flush-time representation.
    fn from(value: &stats::Slot) -> Self {
        Self {
            count: value.count,
            count_by_namespace: value.count_by_namespace,
            merges: value.merges,
            merges_by_namespace: value.merges_by_namespace,
            cost: value.cost,
            cost_by_namespace: value.cost_by_namespace,
        }
    }
}
71
/// Aggregate statistics over the entire buffer, see [`Inner::stats`].
#[derive(Default, Debug)]
pub struct Stats {
    /// Total number of buckets currently buffered.
    #[expect(unused, reason = "used for snapshot tests")]
    pub count: u64,
    /// Number of buckets, split by namespace.
    pub count_by_namespace: ByNamespace<u64>,
    /// Total cost (in bytes) of all buckets currently buffered.
    #[expect(unused, reason = "used for snapshot tests")]
    pub cost: u64,
    /// Cost (in bytes), split by namespace.
    pub cost_by_namespace: ByNamespace<u64>,
}
85
/// Uniquely identifies a bucket within a project and time window.
#[derive(Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct BucketKey {
    /// Project the bucket belongs to.
    pub project_key: ProjectKey,
    /// Bucket timestamp, aligned to the bucket interval on merge.
    pub timestamp: UnixTimestamp,
    /// Name of the metric.
    pub metric_name: MetricName,
    /// Tags of the bucket; `BTreeMap` keeps them ordered for stable hashing.
    pub tags: BTreeMap<String, String>,
    /// Whether this bucket was extracted from an indexed payload.
    pub extracted_from_indexed: bool,
}
94
95impl BucketKey {
96 pub fn cost(&self) -> usize {
101 std::mem::size_of::<Self>() + self.metric_name.len() + crate::utils::tags_cost(&self.tags)
102 }
103}
104
105impl fmt::Debug for BucketKey {
106 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
107 write!(
108 f,
109 "{}-{}-{}",
110 self.timestamp, self.project_key, self.metric_name
111 )
112 }
113}
114
/// The aggregated payload of a bucket.
pub struct BucketData {
    /// The aggregated metric value.
    pub value: BucketValue,
    /// Metadata accumulated across merges.
    pub metadata: BucketMetadata,
}
119
120impl fmt::Debug for BucketData {
121 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
122 self.value.fmt(f)
123 }
124}
125
126impl BucketData {
127 fn merge(&mut self, other: Self) -> Result<usize, AggregateMetricsError> {
131 let cost_before = self.value.cost();
132
133 self.value
134 .merge(other.value)
135 .map_err(|_| AggregateMetricsError::InvalidTypes)?;
136 self.metadata.merge(other.metadata);
137
138 Ok(self.value.cost().saturating_sub(cost_before))
139 }
140}
141
/// Configuration for the aggregator ring buffer, see [`Inner::new`].
#[derive(Debug)]
pub struct Config {
    /// Initial timestamp; determines the slot considered "now" at startup.
    pub start: UnixTimestamp,
    /// Width of a single bucket in seconds. Clamped to a minimum of 1.
    pub bucket_interval: u32,
    /// Number of time slots kept in the buffer. Clamped to a minimum of 1;
    /// extra slots are added internally to accommodate `delay`.
    pub num_time_slots: u32,
    /// Number of partitions per time slot. Clamped to a minimum of 1.
    pub num_partitions: u32,
    /// Additional delay in seconds applied before a slot is flushed.
    pub delay: u32,
    /// Maximum total cost (in bytes) of all buffered buckets. `None` disables the limit.
    pub max_total_bucket_bytes: Option<u64>,
    /// Maximum cost (in bytes) buffered per project key. `None` disables the limit.
    pub max_project_key_bucket_bytes: Option<u64>,
    /// Maximum age (in seconds) of an accepted bucket timestamp. `None` disables the limit.
    pub max_secs_in_past: Option<u64>,
    /// Maximum distance into the future (in seconds) of an accepted bucket
    /// timestamp. `None` disables the limit.
    pub max_secs_in_future: Option<u64>,
    /// Strategy used to assign incoming buckets to partitions.
    pub partition_by: FlushBatching,
}
172
/// Ring buffer of aggregation slots.
///
/// Slots are ordered by flush time: the slot at the front of `slots` is the
/// next one to flush. Each time slot consists of `num_partitions` consecutive
/// entries, one per partition.
pub struct Inner {
    /// The slot ring buffer; `flush_next` pops from the front and pushes a
    /// fresh slot to the back.
    slots: VecDeque<Slot>,
    /// Number of partitions per time slot.
    num_partitions: u64,

    /// Monotonic flush counter: `head / num_partitions` is the current time
    /// slot, `head % num_partitions` the next partition within it.
    head: u64,

    /// Width of a bucket in seconds.
    bucket_interval: u64,
    /// Flush delay in seconds, added on top of each slot's flush time.
    delay: u64,

    /// Running totals over all slots.
    stats: stats::Total,
    /// Cost limits enforced on every merge.
    limits: stats::Limits,

    /// Accepted distance of incoming bucket timestamps relative to the
    /// current slot, measured in time slots.
    slot_range: RelativeRange,

    /// Strategy used to assign buckets to partitions.
    partition_by: FlushBatching,
    /// Hasher with fixed seeds (see `build_hasher`) for partition assignment.
    hasher: ahash::RandomState,
}
234
235impl Inner {
236 pub fn new(config: Config) -> Self {
237 let bucket_interval = config.bucket_interval.max(1);
238 let num_time_slots = config.num_time_slots.max(1) + config.delay.div_ceil(bucket_interval);
245 let num_partitions = config.num_partitions.max(1);
246
247 let mut slots = Vec::with_capacity((num_time_slots * num_partitions) as usize);
248 for _ in 0..num_time_slots {
249 for partition_key in 0..num_partitions {
250 slots.push(Slot {
251 partition_key,
252 ..Default::default()
253 });
254 }
255 }
256 let total_slots = slots.len();
257
258 let bucket_interval = u64::from(bucket_interval);
259 let num_partitions = u64::from(num_partitions);
260
261 let slot_diff = RelativeRange {
262 max_in_past: config
263 .max_secs_in_past
264 .map_or(u64::MAX, |v| v.div_ceil(bucket_interval)),
265 max_in_future: config
266 .max_secs_in_future
267 .map_or(u64::MAX, |v| v.div_ceil(bucket_interval)),
268 };
269
270 let max_partition_project = {
272 let ratio_per_partition = match config.partition_by {
273 FlushBatching::None | FlushBatching::Project => u64::from(num_time_slots),
276 _ => total_slots as u64,
278 };
279
280 config
281 .max_project_key_bucket_bytes
282 .map(|c| c.div_ceil(ratio_per_partition))
283 .unwrap_or(u64::MAX)
284 };
285
286 Self {
287 slots: VecDeque::from(slots),
288 num_partitions,
289 head: config.start.as_secs() / bucket_interval * num_partitions,
290 bucket_interval,
291 delay: u64::from(config.delay),
292 stats: stats::Total::default(),
293 limits: stats::Limits {
294 max_total: config.max_total_bucket_bytes.unwrap_or(u64::MAX),
295 max_partition_project,
296 },
297 slot_range: slot_diff,
298 partition_by: config.partition_by,
299 hasher: build_hasher(),
300 }
301 }
302
303 pub fn bucket_interval(&self) -> u64 {
305 self.bucket_interval
306 }
307
308 pub fn stats(&self) -> Stats {
310 Stats {
311 count: self.stats.count,
312 count_by_namespace: self.stats.count_by_namespace,
313 cost: self.stats.count,
314 cost_by_namespace: self.stats.cost_by_namespace,
315 }
316 }
317
318 pub fn is_empty(&self) -> bool {
320 self.stats.count == 0
321 }
322
323 pub fn next_flush_at(&self) -> Duration {
325 let offset = Duration::from_secs(self.head + 1) / self.num_partitions as u32
330 * self.bucket_interval as u32;
331 offset + Duration::from_secs(self.delay)
332 }
333
334 pub fn merge(
336 &mut self,
337 mut key: BucketKey,
338 value: BucketData,
339 ) -> Result<(), AggregateMetricsError> {
340 let project_key = key.project_key;
341 let namespace = key.metric_name.namespace();
342
343 let time_slot = key.timestamp.as_secs() / self.bucket_interval;
344 key.timestamp = UnixTimestamp::from_secs(time_slot * self.bucket_interval);
346
347 let now_slot = self.head / self.num_partitions;
348 if !self.slot_range.contains(now_slot, time_slot) {
349 return Err(AggregateMetricsError::InvalidTimestamp(key.timestamp));
350 }
351
352 let assigned_partition = match self.partition_by {
353 FlushBatching::None => 0,
354 FlushBatching::Project => self.hasher.hash_one(key.project_key),
355 FlushBatching::Bucket => self.hasher.hash_one(&key),
356 FlushBatching::Partition => {
357 self.hasher
358 .hash_one((key.project_key, &key.metric_name, &key.tags))
359 }
360 } % self.num_partitions;
361
362 let slot = time_slot * self.num_partitions + assigned_partition;
364 let index = sub_rem_euclid(slot, self.head, self.slots.len() as u64);
367
368 let slot = self
369 .slots
370 .get_mut(index as usize)
371 .expect("index should always be a valid slot index");
372
373 debug_assert_eq!(
374 u64::from(slot.partition_key),
375 assigned_partition,
376 "assigned partition does not match selected partition"
377 );
378
379 let key_cost = key.cost() as u64;
380 match slot.buckets.entry(key) {
381 Entry::Occupied(occupied_entry) => {
382 let estimated_cost = match &value.value {
383 BucketValue::Counter(_) | BucketValue::Gauge(_) => 0,
385 BucketValue::Distribution(d) => d.len() * mem::size_of::<DistributionType>(),
387 BucketValue::Set(s) => s.len() * mem::size_of::<SetType>(),
389 };
390
391 let reservation = slot.stats.reserve(
393 &mut self.stats,
394 project_key,
395 namespace,
396 estimated_cost as u64,
397 &self.limits,
398 )?;
399
400 let actual_cost = occupied_entry.into_mut().merge(value)?;
401
402 reservation.consume_with(actual_cost as u64);
404 slot.stats.incr_merges(namespace);
405 }
406 Entry::Vacant(vacant_entry) => {
407 let reservation = slot.stats.reserve(
408 &mut self.stats,
409 project_key,
410 namespace,
411 key_cost + value.value.cost() as u64,
412 &self.limits,
413 )?;
414
415 vacant_entry.insert(value);
416
417 reservation.consume();
418 slot.stats.incr_count(&mut self.stats, namespace);
419 }
420 };
421
422 debug_assert_eq!(slot.stats.count, slot.buckets.len() as u64);
423
424 Ok(())
425 }
426
427 pub fn flush_next(&mut self) -> Partition {
429 let mut slot @ Slot { partition_key, .. } = self
430 .slots
431 .pop_front()
432 .expect("there should always be at least one partition");
433
434 let stats = PartitionStats::from(&slot.stats);
435
436 self.stats.remove_slot(&slot.stats);
438 slot.stats.reset();
439
440 self.slots.push_back(Slot {
446 buckets: HashMap::with_capacity_and_hasher(slot.buckets.len(), *slot.buckets.hasher()),
447 ..slot
448 });
449
450 self.head += 1;
454
455 Partition {
456 partition_key,
457 buckets: slot.buckets,
458 stats,
459 }
460 }
461
462 pub fn into_partitions(self) -> impl Iterator<Item = Partition> {
464 self.slots.into_iter().map(|slot| Partition {
465 partition_key: slot.partition_key,
466 buckets: slot.buckets,
467 stats: PartitionStats::from(&slot.stats),
468 })
469 }
470}
471
472impl fmt::Debug for Inner {
473 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
474 let mut list = f.debug_list();
475 list.entry(&self.stats);
476 for (i, v) in self.slots.iter().enumerate() {
477 let head_partitions = self.head % self.num_partitions;
478 let head_time = self.head / self.num_partitions;
479
480 let time_offset = (head_partitions + i as u64) / self.num_partitions;
481 let time = (head_time + time_offset) * self.bucket_interval;
482
483 match v.is_empty() {
484 true => list.entry(&format!("({time}, {v:?})")),
486 false => list.entry(&(time, v)),
487 };
488 }
489 list.finish()
490 }
491}
492
/// A single slot of the ring buffer: one partition of one time slot.
#[derive(Default)]
struct Slot {
    /// Index of the partition this slot belongs to.
    pub partition_key: u32,
    /// Statistics of this slot, kept in sync with `buckets` on merge.
    pub stats: stats::Slot,
    /// The buckets aggregated into this slot.
    pub buckets: HashMap<BucketKey, BucketData>,
}
499
500impl Slot {
501 fn is_empty(&self) -> bool {
502 self.stats == Default::default() && self.buckets.is_empty()
503 }
504}
505
impl fmt::Debug for Slot {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        if self.is_empty() {
            // Compact form keeps debug output of the full ring buffer short.
            write!(f, "Slot({})", self.partition_key)
        } else {
            // In tests, sort buckets into a BTreeMap so snapshot output is
            // deterministic regardless of hash iteration order.
            #[cfg(test)]
            let buckets = &self.buckets.iter().collect::<BTreeMap<_, _>>();
            #[cfg(not(test))]
            let buckets = &self.buckets;

            f.debug_struct("Slot")
                .field("partition_key", &self.partition_key)
                .field("stats", &self.stats)
                .field("buckets", buckets)
                .finish()
        }
    }
}
524
/// An inclusive acceptance window around a moving "now", in slot units.
struct RelativeRange {
    /// Maximum allowed distance into the past.
    max_in_past: u64,
    /// Maximum allowed distance into the future.
    max_in_future: u64,
}

impl RelativeRange {
    /// Returns `true` if `target` is within the allowed distance of `now`,
    /// boundaries included.
    fn contains(&self, now: u64, target: u64) -> bool {
        let (diff, limit) = if target < now {
            (now - target, self.max_in_past)
        } else {
            (target - now, self.max_in_future)
        };
        diff <= limit
    }
}
543
/// Computes `(a - b) mod m` without underflowing, i.e. the Euclidean
/// remainder of the difference.
///
/// Adding `m` before subtracting `b % m` keeps the intermediate value
/// non-negative; requires `a + m` not to overflow `u64`.
fn sub_rem_euclid(a: u64, b: u64, m: u64) -> u64 {
    (a + m - b % m) % m
}
560
/// Builds the hasher used for partition assignment.
///
/// Uses fixed seeds instead of random ones so that the same bucket key is
/// always assigned to the same partition, deterministically across process
/// restarts and instances.
fn build_hasher() -> RandomState {
    // Fixed, arbitrary seed values.
    const K0: u64 = 0x06459b7d5da84ed8;
    const K1: u64 = 0x3321ce2636c567cc;
    const K2: u64 = 0x56c94d7107c49765;
    const K3: u64 = 0x685bf5f9abbea5ab;

    ahash::RandomState::with_seeds(K0, K1, K2, K3)
}
570
571#[cfg(test)]
572mod tests {
573 use super::*;
574
    /// Creates a [`BucketKey`] with the given timestamp and metric name and
    /// an all-zero project key, no tags.
    fn bucket_key(ts: u64, name: &str) -> BucketKey {
        BucketKey {
            project_key: ProjectKey::parse("00000000000000000000000000000000").unwrap(),
            timestamp: UnixTimestamp::from_secs(ts),
            metric_name: name.into(),
            tags: Default::default(),
            extracted_from_indexed: false,
        }
    }
584
    /// Creates counter [`BucketData`] with the given value and default metadata.
    fn counter(value: f64) -> BucketData {
        BucketData {
            value: BucketValue::counter(value.try_into().unwrap()),
            metadata: Default::default(),
        }
    }
591
592 #[test]
593 fn test_merge_flush() -> Result<(), AggregateMetricsError> {
594 let mut buckets = Inner::new(Config {
595 bucket_interval: 10,
596 num_time_slots: 6,
597 num_partitions: 2,
598 delay: 0,
599 max_secs_in_past: None,
600 max_secs_in_future: None,
601 max_total_bucket_bytes: None,
602 max_project_key_bucket_bytes: None,
603 start: UnixTimestamp::from_secs(70),
604 partition_by: FlushBatching::Partition,
605 });
606
607 buckets.merge(bucket_key(70, "a"), counter(1.0))?;
609 buckets.merge(bucket_key(80, "b"), counter(1.0))?;
610 buckets.merge(bucket_key(80, "b"), counter(2.0))?;
611 buckets.merge(bucket_key(32, "c"), counter(1.0))?;
613 buckets.merge(bucket_key(42, "d"), counter(1.0))?;
614 buckets.merge(bucket_key(171, "e"), counter(1.0))?;
616 buckets.merge(bucket_key(181, "f"), counter(1.0))?;
617 buckets.merge(bucket_key(191, "a"), counter(1.0))?;
618
619 insta::assert_debug_snapshot!(buckets);
620
621 let partition = buckets.flush_next();
622 insta::assert_debug_snapshot!(partition, @r###"
623 Partition {
624 partition_key: 0,
625 stats: PartitionStats {
626 count: 2,
627 count_by_namespace: (unsupported:2),
628 merges: 0,
629 merges_by_namespace: (0),
630 cost: 274,
631 cost_by_namespace: (unsupported:274),
632 },
633 buckets: {
634 70-00000000000000000000000000000000-a: Counter(
635 1.0,
636 ),
637 190-00000000000000000000000000000000-a: Counter(
638 1.0,
639 ),
640 },
641 }
642 "###);
643
644 buckets.merge(bucket_key(70, "a"), counter(1.0))?;
646
647 insta::assert_debug_snapshot!(buckets);
648
649 let partition = buckets.flush_next();
650 insta::assert_debug_snapshot!(partition, @r###"
651 Partition {
652 partition_key: 1,
653 stats: PartitionStats {
654 count: 0,
655 count_by_namespace: (0),
656 merges: 0,
657 merges_by_namespace: (0),
658 cost: 0,
659 cost_by_namespace: (0),
660 },
661 buckets: {},
662 }
663 "###);
664
665 insta::assert_debug_snapshot!(buckets);
666
667 insta::assert_debug_snapshot!(buckets.stats(), @r###"
668 Stats {
669 count: 6,
670 count_by_namespace: (unsupported:6),
671 cost: 6,
672 cost_by_namespace: (unsupported:822),
673 }
674 "###);
675
676 Ok(())
677 }
678
    #[test]
    fn test_merge_flush_project() -> Result<(), AggregateMetricsError> {
        let mut buckets = Inner::new(Config {
            bucket_interval: 10,
            num_time_slots: 1,
            num_partitions: 2,
            delay: 0,
            max_secs_in_past: None,
            max_secs_in_future: None,
            max_total_bucket_bytes: None,
            max_project_key_bucket_bytes: None,
            start: UnixTimestamp::from_secs(70),
            partition_by: FlushBatching::Project,
        });

        // 1000 projects with 10 buckets each, all in the same time slot.
        for i in 0..1_000 {
            for j in 0..10 {
                let bucket = BucketKey {
                    project_key: ProjectKey::parse(&format!("{i:0width$x}", width = 32)).unwrap(),
                    ..bucket_key(70, &format!("b_{j}"))
                };
                buckets.merge(bucket, counter(1.0))?;
            }
        }

        let mut by_project1 = HashMap::new();
        for (key, _) in buckets.flush_next().buckets {
            *by_project1.entry(key.project_key).or_insert(0u64) += 1;
        }

        let mut by_project2 = HashMap::new();
        for (key, _) in buckets.flush_next().buckets {
            *by_project2.entry(key.project_key).or_insert(0u64) += 1;
        }

        // Every project ends up in exactly one of the two partitions.
        assert_eq!(by_project1.len() + by_project2.len(), 1_000);
        // The exact split depends on the fixed seeds in `build_hasher`.
        assert_eq!(by_project1.len(), 509);

        // All 10 buckets of a project land in the same partition.
        for (pk, v) in by_project1 {
            assert_eq!(v, 10);
            assert!(!by_project2.contains_key(&pk));
        }
        for (_, v) in by_project2 {
            assert_eq!(v, 10);
        }

        Ok(())
    }
730
731 #[test]
732 fn test_merge_flush_cost_limits() -> Result<(), AggregateMetricsError> {
733 const ONE_BUCKET_COST: u64 = 137;
734
735 let mut buckets = Inner::new(Config {
736 bucket_interval: 10,
737 num_time_slots: 3,
738 num_partitions: 1,
739 delay: 0,
740 max_secs_in_past: None,
741 max_secs_in_future: None,
742 max_total_bucket_bytes: Some(ONE_BUCKET_COST * 2),
743 max_project_key_bucket_bytes: Some(ONE_BUCKET_COST * 3),
745 start: UnixTimestamp::from_secs(70),
746 partition_by: FlushBatching::Partition,
747 });
748
749 buckets.merge(bucket_key(70, "a"), counter(1.0))?;
750 assert_eq!(
752 buckets
753 .merge(bucket_key(70, "b"), counter(1.0))
754 .unwrap_err(),
755 AggregateMetricsError::ProjectLimitExceeded
756 );
757 buckets.merge(bucket_key(70, "a"), counter(2.0))?;
759
760 let other_project = BucketKey {
762 project_key: ProjectKey::parse("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb").unwrap(),
763 ..bucket_key(70, "a")
764 };
765 buckets.merge(other_project, counter(3.0))?;
766
767 assert_eq!(
769 buckets
770 .merge(bucket_key(80, "c"), counter(1.0))
771 .unwrap_err(),
772 AggregateMetricsError::TotalLimitExceeded
773 );
774 insta::assert_debug_snapshot!(buckets.flush_next(), @r###"
776 Partition {
777 partition_key: 0,
778 stats: PartitionStats {
779 count: 2,
780 count_by_namespace: (unsupported:2),
781 merges: 1,
782 merges_by_namespace: (unsupported:1),
783 cost: 274,
784 cost_by_namespace: (unsupported:274),
785 },
786 buckets: {
787 70-00000000000000000000000000000000-a: Counter(
788 3.0,
789 ),
790 70-bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb-a: Counter(
791 3.0,
792 ),
793 },
794 }
795 "###);
796 buckets.merge(bucket_key(80, "c"), counter(1.0))?;
797
798 insta::assert_debug_snapshot!(buckets, @r###"
799 [
800 Total {
801 count: 1,
802 count_by_namespace: (unsupported:1),
803 cost: 137,
804 cost_by_namespace: (unsupported:137),
805 },
806 (
807 80,
808 Slot {
809 partition_key: 0,
810 stats: Slot {
811 count: 1,
812 count_by_namespace: (unsupported:1),
813 merges: 0,
814 merges_by_namespace: (0),
815 cost: 137,
816 cost_by_namespace: (unsupported:137),
817 cost_by_project: {
818 ProjectKey("00000000000000000000000000000000"): 137,
819 },
820 },
821 buckets: {
822 80-00000000000000000000000000000000-c: Counter(
823 1.0,
824 ),
825 },
826 },
827 ),
828 "(90, Slot(0))",
829 "(100, Slot(0))",
830 ]
831 "###);
832
833 insta::assert_debug_snapshot!(buckets.stats(), @r###"
834 Stats {
835 count: 1,
836 count_by_namespace: (unsupported:1),
837 cost: 1,
838 cost_by_namespace: (unsupported:137),
839 }
840 "###);
841
842 Ok(())
843 }
844
    #[test]
    fn test_merge_flush_with_delay() {
        let mut buckets = Inner::new(Config {
            bucket_interval: 20,
            num_time_slots: 3,
            num_partitions: 2,
            delay: 20,
            max_total_bucket_bytes: None,
            max_project_key_bucket_bytes: None,
            max_secs_in_past: None,
            max_secs_in_future: None,
            // Not aligned to the bucket interval on purpose.
            start: UnixTimestamp::from_secs(63),
            partition_by: FlushBatching::Partition,
        });

        buckets.merge(bucket_key(60, "a"), counter(1.0)).unwrap();
        buckets.merge(bucket_key(120, "b"), counter(2.0)).unwrap();

        // Slot [60, 80) flushes at 80 + 20s delay, partitions spread within it.
        assert_eq!(buckets.next_flush_at(), Duration::from_secs(90));
        insta::assert_debug_snapshot!(buckets.flush_next(), @r###"
        Partition {
            partition_key: 0,
            stats: PartitionStats {
                count: 1,
                count_by_namespace: (unsupported:1),
                merges: 0,
                merges_by_namespace: (0),
                cost: 137,
                cost_by_namespace: (unsupported:137),
            },
            buckets: {
                60-00000000000000000000000000000000-a: Counter(
                    1.0,
                ),
            },
        }
        "###);
        assert!(buckets.flush_next().buckets.is_empty());

        // Intermediate empty slots flush on schedule.
        assert_eq!(buckets.next_flush_at(), Duration::from_secs(110));
        assert!(buckets.flush_next().buckets.is_empty());
        assert!(buckets.flush_next().buckets.is_empty());

        assert_eq!(buckets.next_flush_at(), Duration::from_secs(130));
        assert!(buckets.flush_next().buckets.is_empty());
        assert!(buckets.flush_next().buckets.is_empty());

        // The bucket at 120 becomes ready only after the 20s delay.
        assert_eq!(buckets.next_flush_at(), Duration::from_secs(150));
        insta::assert_debug_snapshot!(buckets.flush_next(), @r###"
        Partition {
            partition_key: 0,
            stats: PartitionStats {
                count: 1,
                count_by_namespace: (unsupported:1),
                merges: 0,
                merges_by_namespace: (0),
                cost: 137,
                cost_by_namespace: (unsupported:137),
            },
            buckets: {
                120-00000000000000000000000000000000-b: Counter(
                    2.0,
                ),
            },
        }
        "###);
        assert!(buckets.flush_next().buckets.is_empty());

        assert_eq!(buckets.next_flush_at(), Duration::from_secs(170));
    }
927
    #[test]
    fn test_next_flush() {
        let mut buckets = Inner::new(Config {
            bucket_interval: 10,
            num_time_slots: 6,
            num_partitions: 2,
            delay: 0,
            max_secs_in_past: None,
            max_secs_in_future: None,
            max_total_bucket_bytes: None,
            max_project_key_bucket_bytes: None,
            start: UnixTimestamp::from_secs(70),
            partition_by: FlushBatching::Partition,
        });

        // With 2 partitions per 10s slot, flushes are spaced 5s apart and the
        // partition key alternates. `next_flush_at` only advances on flush.
        assert_eq!(buckets.next_flush_at(), Duration::from_secs(75));
        assert_eq!(buckets.flush_next().partition_key, 0);
        assert_eq!(buckets.next_flush_at(), Duration::from_secs(80));
        assert_eq!(buckets.flush_next().partition_key, 1);
        assert_eq!(buckets.next_flush_at(), Duration::from_secs(85));
        assert_eq!(buckets.flush_next().partition_key, 0);
        assert_eq!(buckets.next_flush_at(), Duration::from_secs(90));
        assert_eq!(buckets.next_flush_at(), Duration::from_secs(90));
    }
952
    #[test]
    fn test_next_flush_with_delay() {
        let mut buckets = Inner::new(Config {
            bucket_interval: 10,
            num_time_slots: 6,
            num_partitions: 2,
            delay: 3,
            max_secs_in_past: None,
            max_secs_in_future: None,
            max_total_bucket_bytes: None,
            max_project_key_bucket_bytes: None,
            start: UnixTimestamp::from_secs(70),
            partition_by: FlushBatching::Partition,
        });

        // Same cadence as `test_next_flush`, shifted by the 3s delay.
        assert_eq!(buckets.next_flush_at(), Duration::from_secs(78));
        assert_eq!(buckets.flush_next().partition_key, 0);
        assert_eq!(buckets.next_flush_at(), Duration::from_secs(83));
        assert_eq!(buckets.flush_next().partition_key, 1);
        assert_eq!(buckets.next_flush_at(), Duration::from_secs(88));
        assert_eq!(buckets.flush_next().partition_key, 0);
        assert_eq!(buckets.next_flush_at(), Duration::from_secs(93));
        assert_eq!(buckets.next_flush_at(), Duration::from_secs(93));
    }
977
    #[test]
    fn test_merge_flush_time_limits() -> Result<(), AggregateMetricsError> {
        let mut buckets = Inner::new(Config {
            bucket_interval: 10,
            num_time_slots: 6,
            num_partitions: 2,
            delay: 0,
            // 33s past / 22s future round up to 4 / 3 slots respectively.
            max_secs_in_past: Some(33),
            max_secs_in_future: Some(22),
            max_total_bucket_bytes: None,
            max_project_key_bucket_bytes: None,
            start: UnixTimestamp::from_secs(70),
            partition_by: FlushBatching::Partition,
        });

        buckets.merge(bucket_key(70, "a"), counter(1.0))?;

        // Up to 4 slots in the past are accepted.
        buckets.merge(bucket_key(60, "a"), counter(1.0))?;
        buckets.merge(bucket_key(50, "a"), counter(1.0))?;
        buckets.merge(bucket_key(40, "a"), counter(1.0))?;
        buckets.merge(bucket_key(30, "a"), counter(1.0))?;
        // The error reports the timestamp aligned to the bucket interval.
        assert_eq!(
            buckets
                .merge(bucket_key(29, "a"), counter(1.0))
                .unwrap_err(),
            AggregateMetricsError::InvalidTimestamp(UnixTimestamp::from_secs(20))
        );

        // Up to 3 slots in the future are accepted.
        buckets.merge(bucket_key(80, "a"), counter(1.0))?;
        buckets.merge(bucket_key(90, "a"), counter(1.0))?;
        buckets.merge(bucket_key(109, "a"), counter(1.0))?;
        assert_eq!(
            buckets
                .merge(bucket_key(110, "a"), counter(1.0))
                .unwrap_err(),
            AggregateMetricsError::InvalidTimestamp(UnixTimestamp::from_secs(110))
        );

        Ok(())
    }
1020
    #[test]
    fn test_sub_rem_euclid() {
        // Triples of (head, slot, expected ring index) for a ring of size 5.
        for (head, slot, expected) in [
            // Equal values map to index 0.
            (253, 253, 0),
            // Slots in the future wrap around the ring.
            (253, 273, 0),
            (253, 274, 1),
            (253, 275, 2),
            (253, 276, 3),
            (253, 277, 4),
            // Slots in the past also wrap (Euclidean remainder, never negative).
            (253, 233, 0),
            (253, 234, 1),
            (253, 235, 2),
            (253, 236, 3),
            (253, 237, 4),
        ] {
            assert_eq!(sub_rem_euclid(slot, head, 5), expected);
        }
    }
1042}