relay_metrics/aggregator/inner.rs

use core::fmt;
use std::collections::{BTreeMap, VecDeque};
use std::mem;
use std::time::Duration;

use ahash::RandomState;
use hashbrown::HashMap;
use hashbrown::hash_map::Entry;
use relay_base_schema::metrics::MetricName;
use relay_base_schema::project::ProjectKey;
use relay_common::time::UnixTimestamp;

use crate::aggregator::stats;
use crate::aggregator::{AggregateMetricsError, FlushBatching};
use crate::utils::ByNamespace;
use crate::{BucketMetadata, BucketValue, DistributionType, SetType};

#[derive(Default)]
pub struct Partition {
    pub partition_key: u32,
    pub buckets: HashMap<BucketKey, BucketData>,
    pub stats: PartitionStats,
}

impl fmt::Debug for Partition {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        #[cfg(test)]
        let buckets = &self.buckets.iter().collect::<BTreeMap<_, _>>();
        #[cfg(not(test))]
        let buckets = &self.buckets;

        f.debug_struct("Partition")
            .field("partition_key", &self.partition_key)
            .field("stats", &self.stats)
            .field("buckets", buckets)
            .finish()
    }
}

#[derive(Default, Debug)]
pub struct PartitionStats {
    /// Number of unique buckets in the partition.
    #[expect(unused, reason = "used for snapshot tests")]
    pub count: u64,
    /// Number of unique buckets in the partition by namespace.
    pub count_by_namespace: ByNamespace<u64>,
    /// Number of times a bucket was merged in the partition.
    #[expect(unused, reason = "used for snapshot tests")]
    pub merges: u64,
    /// Number of times a bucket was merged in the partition by namespace.
    pub merges_by_namespace: ByNamespace<u64>,
    /// Cost of buckets in the partition.
    #[expect(unused, reason = "used for snapshot tests")]
    pub cost: u64,
    /// Cost of buckets in the partition by namespace.
    pub cost_by_namespace: ByNamespace<u64>,
}

impl From<&stats::Slot> for PartitionStats {
    fn from(value: &stats::Slot) -> Self {
        Self {
            count: value.count,
            count_by_namespace: value.count_by_namespace,
            merges: value.merges,
            merges_by_namespace: value.merges_by_namespace,
            cost: value.cost,
            cost_by_namespace: value.cost_by_namespace,
        }
    }
}

#[derive(Default, Debug)]
pub struct Stats {
    /// Total number of buckets in the aggregator.
    #[expect(unused, reason = "used for snapshot tests")]
    pub count: u64,
    /// Total number of buckets in the aggregator by namespace.
    pub count_by_namespace: ByNamespace<u64>,
    /// Total bucket cost in the aggregator.
    #[expect(unused, reason = "used for snapshot tests")]
    pub cost: u64,
    /// Total bucket cost in the aggregator by namespace.
    pub cost_by_namespace: ByNamespace<u64>,
}

#[derive(Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct BucketKey {
    pub project_key: ProjectKey,
    pub timestamp: UnixTimestamp,
    pub metric_name: MetricName,
    pub tags: BTreeMap<String, String>,
    pub extracted_from_indexed: bool,
}

impl BucketKey {
    /// Estimates the number of bytes needed to encode the bucket key.
    ///
    /// Note that this does not necessarily match the exact memory footprint of the key,
    /// because data structures have a memory overhead.
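    ///
    /// For example, the `counter` fixtures in the tests below (a one-character metric name and
    /// no tags) come out, together with their counter value, to 137 bytes on a 64-bit target;
    /// see `ONE_BUCKET_COST` in the tests.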
    pub fn cost(&self) -> usize {
        std::mem::size_of::<Self>() + self.metric_name.len() + crate::utils::tags_cost(&self.tags)
    }
}

impl fmt::Debug for BucketKey {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "{}-{}-{}",
            self.timestamp, self.project_key, self.metric_name
        )
    }
}

pub struct BucketData {
    pub value: BucketValue,
    pub metadata: BucketMetadata,
}

impl fmt::Debug for BucketData {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        self.value.fmt(f)
    }
}

impl BucketData {
    /// Merges another bucket's data into this one.
    ///
    /// Returns the value cost increase on success.
    fn merge(&mut self, other: Self) -> Result<usize, AggregateMetricsError> {
        let cost_before = self.value.cost();

        self.value
            .merge(other.value)
            .map_err(|_| AggregateMetricsError::InvalidTypes)?;
        self.metadata.merge(other.metadata);

        Ok(self.value.cost().saturating_sub(cost_before))
    }
}

/// Config used to create an [`Inner`] instance.
#[derive(Debug)]
pub struct Config {
    /// Initial position/time of the aggregator.
    ///
    /// Except in testing, this should always be [`UnixTimestamp::now`].
    pub start: UnixTimestamp,
    /// Size of each individual bucket in seconds; input timestamps are truncated to a multiple
    /// of this value.
    pub bucket_interval: u32,
    /// The number of time slots to keep track of in the aggregator.
    ///
    /// The size of a time slot is defined by [`Self::bucket_interval`].
    pub num_time_slots: u32,
    /// The number of partitions per time slot.
    pub num_partitions: u32,
    /// The delay, in seconds, for how long a bucket should remain in the aggregator before
    /// being flushed.
    ///
    /// Ideally the delay is a multiple of [`Self::bucket_interval`].
    pub delay: u32,
    /// Maximum amount of bytes the aggregator can grow to.
    pub max_total_bucket_bytes: Option<u64>,
    /// Maximum amount of bytes the aggregator allows per project key.
    pub max_project_key_bucket_bytes: Option<u64>,
    /// The age in seconds of the oldest allowed bucket timestamp.
    pub max_secs_in_past: Option<u64>,
    /// The time in seconds that a timestamp may be in the future.
    pub max_secs_in_future: Option<u64>,
    /// Determines how partitions are assigned based on the input bucket.
    pub partition_by: FlushBatching,
}

/// A metrics aggregator.
///
/// The aggregator is unaware of the current time and needs to be driven by periodic flushes using
/// [`Self::flush_next`]. Each flush advances the internal clock by the configured
/// [`Config::bucket_interval`].
///
/// The internal time is set on construction to [`Config::start`].
///
/// Use [`Self::next_flush_at`] to determine the time when to call [`Self::flush_next`].
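///
/// # Example
///
/// A minimal sketch of a driver loop (the async runtime, `epoch` and `sleep_until` are
/// assumptions for illustration, not part of this module):
///
/// ```ignore
/// let mut inner = Inner::new(config);
/// loop {
///     // Wait until the next flush deadline, measured as a duration since the Unix epoch.
///     sleep_until(epoch + inner.next_flush_at());
///     // Flush a single partition; this also advances the aggregator's internal clock.
///     let partition = inner.flush_next();
///     // ... hand `partition.buckets` to the consumer ...
/// }
/// ```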
pub struct Inner {
    /// Ring-buffer of aggregation slots.
    ///
    /// This is treated as a ring-buffer of a two dimensional array. The first dimension is a
    /// "time slot" and the second dimension is the assigned partition.
    ///
    /// The total length of the ring-buffer is then determined by the number of time slots times
    /// the number of partitions. In other words, every time slot has [`Self::num_partitions`]
    /// partitions.
    ///
    /// Layout:
    /// Time slots: [            ][            ][            ]
    /// Partitions:  [    ][    ]  [    ][    ]  [    ][    ]
    ///
    /// An item is assigned by first determining its time slot and then assigning it to a partition
    /// based on the selected [`Self::partition_by`] strategy.
    ///
    /// The first item in the buffer is tracked by [`Self::head`], which is, at any time, the index
    /// of the current partition counted from the beginning "zero", where the beginning refers to
    /// the beginning of the epoch. The [`Self::head`] at time `t` is calculated with
    /// `f(t) = t / bucket_interval * num_partitions`.
    ///
    /// Flushing a partition advances the [`Self::head`] by a single value `+1`, meaning
    /// effectively time is advanced by `bucket_interval / num_partitions`.
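    ///
    /// For example, with `bucket_interval = 10` and `num_partitions = 2` (the configuration used
    /// by most tests below), a start time of `t = 70` yields `head = 70 / 10 * 2 = 14`.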
    slots: VecDeque<Slot>,
    /// The number of partitions per time slot.
    num_partitions: u64,

    /// Position of the first element in [`Self::slots`].
    head: u64,

    /// Size of each individual bucket in seconds; input timestamps are truncated to a multiple
    /// of this value.
    bucket_interval: u64,
    /// Number of seconds which are added to a bucket's flush time as a delay.
    ///
    /// This is a fixed delay which is added to the time returned by [`Self::next_flush_at`].
    delay: u64,

    /// Total stats of the aggregator.
    stats: stats::Total,
    /// Configured limits based on aggregator stats.
    limits: stats::Limits,

    /// The maximum number of slots (each the size of a `bucket_interval`) that a timestamp is
    /// allowed to be in the past or future.
    slot_range: RelativeRange,

    /// Determines how partitions are assigned based on the input bucket.
    partition_by: FlushBatching,
    /// Hasher used to calculate partitions.
    hasher: ahash::RandomState,
}

impl Inner {
    pub fn new(config: Config) -> Self {
        let bucket_interval = config.bucket_interval.max(1);
        // Extend the configured time slots with the delay (in slots), to make sure there is
        // enough space to satisfy the delay.
        //
        // We cannot just reserve space for enough partitions to satisfy the delay, because we have
        // no control over which partition a bucket is assigned to, so we have to prepare for the
        // 'worst' case, and that is full extra time slots.
        let num_time_slots = config.num_time_slots.max(1) + config.delay.div_ceil(bucket_interval);
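        // For example (see `test_merge_flush_with_delay` below): `num_time_slots = 3`,
        // `delay = 20` and `bucket_interval = 20` result in `3 + ceil(20 / 20) = 4` time slots.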
        let num_partitions = config.num_partitions.max(1);

        let mut slots = Vec::with_capacity((num_time_slots * num_partitions) as usize);
        for _ in 0..num_time_slots {
            for partition_key in 0..num_partitions {
                slots.push(Slot {
                    partition_key,
                    ..Default::default()
                });
            }
        }
        let total_slots = slots.len();

        let bucket_interval = u64::from(bucket_interval);
        let num_partitions = u64::from(num_partitions);

        let slot_diff = RelativeRange {
            max_in_past: config
                .max_secs_in_past
                .map_or(u64::MAX, |v| v.div_ceil(bucket_interval)),
            max_in_future: config
                .max_secs_in_future
                .map_or(u64::MAX, |v| v.div_ceil(bucket_interval)),
        };

        // Break down the maximum project cost to a maximum cost per partition.
        let max_partition_project = {
            let ratio_per_partition = match config.partition_by {
                // All buckets of a project in the same time slot are in the same partition.
                // -> The maximum is determined by the number of time slots alone.
                FlushBatching::None | FlushBatching::Project => u64::from(num_time_slots),
                // Buckets are evenly distributed across all existing partitions and time slots.
                _ => total_slots as u64,
            };

            config
                .max_project_key_bucket_bytes
                .map(|c| c.div_ceil(ratio_per_partition))
                .unwrap_or(u64::MAX)
        };
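        // For example (see `test_merge_flush_cost_limits` below): 3 time slots, 1 partition and
        // `FlushBatching::Partition` give `total_slots = 3`, so a project limit of `3 * 137`
        // bytes breaks down to 137 bytes per partition.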

        Self {
            slots: VecDeque::from(slots),
            num_partitions,
            head: config.start.as_secs() / bucket_interval * num_partitions,
            bucket_interval,
            delay: u64::from(config.delay),
            stats: stats::Total::default(),
            limits: stats::Limits {
                max_total: config.max_total_bucket_bytes.unwrap_or(u64::MAX),
                max_partition_project,
            },
            slot_range: slot_diff,
            partition_by: config.partition_by,
            hasher: build_hasher(),
        }
    }

    /// Returns the configured bucket interval.
    pub fn bucket_interval(&self) -> u64 {
        self.bucket_interval
    }

    /// Returns total aggregator stats.
    pub fn stats(&self) -> Stats {
        Stats {
            count: self.stats.count,
            count_by_namespace: self.stats.count_by_namespace,
            cost: self.stats.cost,
            cost_by_namespace: self.stats.cost_by_namespace,
        }
    }

    /// Returns `true` if the aggregator contains no metric buckets.
    pub fn is_empty(&self) -> bool {
        self.stats.count == 0
    }

    /// Returns the time as a duration since epoch when the next flush is supposed to happen.
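    ///
    /// For example, with `bucket_interval = 10`, `num_partitions = 2`, `delay = 0` and
    /// `head = 14` (a start time of 70 seconds), the next flush is due at
    /// `(14 + 1) / 2 * 10 = 75` seconds; see `test_next_flush` below.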
    pub fn next_flush_at(&self) -> Duration {
        // `head + 1` to get the end time of the slot not the start, convert `head` to a duration
        // first, to have enough precision for the division.
        //
        // Casts do not wrap, configuration requires them to be `u32`.
        let offset = Duration::from_secs(self.head + 1) / self.num_partitions as u32
            * self.bucket_interval as u32;
        offset + Duration::from_secs(self.delay)
    }

    /// Merges a metric bucket.
    pub fn merge(
        &mut self,
        mut key: BucketKey,
        value: BucketData,
    ) -> Result<(), AggregateMetricsError> {
        let project_key = key.project_key;
        let namespace = key.metric_name.namespace();

        let time_slot = key.timestamp.as_secs() / self.bucket_interval;
        // Make sure the timestamp is normalized to the correct interval as well.
        key.timestamp = UnixTimestamp::from_secs(time_slot * self.bucket_interval);

        let now_slot = self.head / self.num_partitions;
        if !self.slot_range.contains(now_slot, time_slot) {
            return Err(AggregateMetricsError::InvalidTimestamp(key.timestamp));
        }

        let assigned_partition = match self.partition_by {
            FlushBatching::None => 0,
            FlushBatching::Project => self.hasher.hash_one(key.project_key),
            FlushBatching::Bucket => self.hasher.hash_one(&key),
            FlushBatching::Partition => {
                self.hasher
                    .hash_one((key.project_key, &key.metric_name, &key.tags))
            }
        } % self.num_partitions;

        // Calculate the slot of the bucket based on its time and shift it by its assigned partition.
        let slot = time_slot * self.num_partitions + assigned_partition;
        // Transform the slot to an offset/index into the ring-buffer, by calculating:
        // `(slot - self.head).rem_euclid(self.slots.len())`.
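        // For example, with 6 time slots and 2 partitions (12 slots total) and `head = 14`:
        // a bucket in time slot 8 assigned to partition 1 has `slot = 8 * 2 + 1 = 17` and
        // lands at index `(17 + 12 - 14 % 12) % 12 = 3`.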
        let index = sub_rem_euclid(slot, self.head, self.slots.len() as u64);

        let slot = self
            .slots
            .get_mut(index as usize)
            .expect("index should always be a valid slot index");

        debug_assert_eq!(
            u64::from(slot.partition_key),
            assigned_partition,
            "assigned partition does not match selected partition"
        );

        let key_cost = key.cost() as u64;
        match slot.buckets.entry(key) {
            Entry::Occupied(occupied_entry) => {
                let estimated_cost = match &value.value {
                    // Counters and Gauges aggregate without additional costs.
                    BucketValue::Counter(_) | BucketValue::Gauge(_) => 0,
                    // Distributions are an accurate estimation, all values will be added.
                    BucketValue::Distribution(d) => d.len() * mem::size_of::<DistributionType>(),
                    // Sets are an upper bound.
                    BucketValue::Set(s) => s.len() * mem::size_of::<SetType>(),
                };

                // Reserve for the upper bound of the value.
                let reservation = slot.stats.reserve(
                    &mut self.stats,
                    project_key,
                    namespace,
                    estimated_cost as u64,
                    &self.limits,
                )?;

                let actual_cost = occupied_entry.into_mut().merge(value)?;

                // Track the actual cost increase, not just the reservation.
                reservation.consume_with(actual_cost as u64);
                slot.stats.incr_merges(namespace);
            }
            Entry::Vacant(vacant_entry) => {
                let reservation = slot.stats.reserve(
                    &mut self.stats,
                    project_key,
                    namespace,
                    key_cost + value.value.cost() as u64,
                    &self.limits,
                )?;

                vacant_entry.insert(value);

                reservation.consume();
                slot.stats.incr_count(&mut self.stats, namespace);
            }
        };

        debug_assert_eq!(slot.stats.count, slot.buckets.len() as u64);

        Ok(())
    }

    /// Flushes the next partition and advances time.
    pub fn flush_next(&mut self) -> Partition {
        let mut slot @ Slot { partition_key, .. } = self
            .slots
            .pop_front()
            .expect("there should always be at least one partition");

        let stats = PartitionStats::from(&slot.stats);

        // Remove the partition cost from the total cost and reset the partition cost.
        self.stats.remove_slot(&slot.stats);
        slot.stats.reset();

        // Create a new, empty slot with a capacity similar to the original one, but still
        // shrinking a little bit, so that over time we free up memory again without
        // re-allocating too many times.
        //
        // Also re-use the original hasher, no need to create a new/expensive one.
        self.slots.push_back(Slot {
            buckets: HashMap::with_capacity_and_hasher(slot.buckets.len(), *slot.buckets.hasher()),
            ..slot
        });

        // Increment to the next partition.
        //
        // Effectively advance time by `bucket_interval / num_partitions`.
        self.head += 1;

        Partition {
            partition_key,
            buckets: slot.buckets,
            stats,
        }
    }

    /// Consumes the aggregator and returns an iterator over all contained partitions.
    pub fn into_partitions(self) -> impl Iterator<Item = Partition> {
        self.slots.into_iter().map(|slot| Partition {
            partition_key: slot.partition_key,
            buckets: slot.buckets,
            stats: PartitionStats::from(&slot.stats),
        })
    }
}

impl fmt::Debug for Inner {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut list = f.debug_list();
        list.entry(&self.stats);
        for (i, v) in self.slots.iter().enumerate() {
            let head_partitions = self.head % self.num_partitions;
            let head_time = self.head / self.num_partitions;

            let time_offset = (head_partitions + i as u64) / self.num_partitions;
            let time = (head_time + time_offset) * self.bucket_interval;

            match v.is_empty() {
                // Make the output shorter with a string until `entry_with` is stable.
                true => list.entry(&format!("({time}, {v:?})")),
                false => list.entry(&(time, v)),
            };
        }
        list.finish()
    }
}

#[derive(Default)]
struct Slot {
    pub partition_key: u32,
    pub stats: stats::Slot,
    pub buckets: HashMap<BucketKey, BucketData>,
}

impl Slot {
    fn is_empty(&self) -> bool {
        self.stats == Default::default() && self.buckets.is_empty()
    }
}

impl fmt::Debug for Slot {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        if self.is_empty() {
            write!(f, "Slot({})", self.partition_key)
        } else {
            #[cfg(test)]
            let buckets = &self.buckets.iter().collect::<BTreeMap<_, _>>();
            #[cfg(not(test))]
            let buckets = &self.buckets;

            f.debug_struct("Slot")
                .field("partition_key", &self.partition_key)
                .field("stats", &self.stats)
                .field("buckets", buckets)
                .finish()
        }
    }
}

struct RelativeRange {
    max_in_past: u64,
    max_in_future: u64,
}

impl RelativeRange {
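    /// Checks whether `target` lies within the allowed range around `now`, in slots.
    ///
    /// For example, with `max_in_past = 4` and `max_in_future = 3` (the slot limits derived in
    /// `test_merge_flush_time_limits` below), `contains(7, 3)` is `true` while `contains(7, 2)`
    /// is `false`.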
    fn contains(&self, now: u64, target: u64) -> bool {
        if target < now {
            // Timestamp/target in the past.
            let diff = now - target;
            diff <= self.max_in_past
        } else {
            // Timestamp/target in the future.
            let diff = target - now;
            diff <= self.max_in_future
        }
    }
}

/// Calculates `(a - b).rem_euclid(m)` for `u64` values.
///
/// Since `a - b` can be negative, a naive implementation would have to temporarily switch from
/// the unsigned to the signed number space and convert back after the modulo.
///
/// This function instead operates purely with unsigned arithmetic and makes sure the subtraction
/// is always positive.
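///
/// # Example
///
/// ```ignore
/// // Values taken from `test_sub_rem_euclid` below.
/// assert_eq!(sub_rem_euclid(274, 253, 5), 1);
/// assert_eq!(sub_rem_euclid(233, 253, 5), 0);
/// ```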
fn sub_rem_euclid(a: u64, b: u64, m: u64) -> u64 {
    (
        // Shift a by `m`: `[m, inf)`.
        (a + m)
        -
        // Modulo b: `[0, m)`.
        (b % m)
    ) % m
}

fn build_hasher() -> RandomState {
    // A fixed, consistent seed across all instances of Relay.
    const K0: u64 = 0x06459b7d5da84ed8;
    const K1: u64 = 0x3321ce2636c567cc;
    const K2: u64 = 0x56c94d7107c49765;
    const K3: u64 = 0x685bf5f9abbea5ab;

    ahash::RandomState::with_seeds(K0, K1, K2, K3)
}

#[cfg(test)]
mod tests {
    use super::*;

    fn bucket_key(ts: u64, name: &str) -> BucketKey {
        BucketKey {
            project_key: ProjectKey::parse("00000000000000000000000000000000").unwrap(),
            timestamp: UnixTimestamp::from_secs(ts),
            metric_name: name.into(),
            tags: Default::default(),
            extracted_from_indexed: false,
        }
    }

    fn counter(value: f64) -> BucketData {
        BucketData {
            value: BucketValue::counter(value.try_into().unwrap()),
            metadata: Default::default(),
        }
    }

    #[test]
    fn test_merge_flush() -> Result<(), AggregateMetricsError> {
        let mut buckets = Inner::new(Config {
            bucket_interval: 10,
            num_time_slots: 6,
            num_partitions: 2,
            delay: 0,
            max_secs_in_past: None,
            max_secs_in_future: None,
            max_total_bucket_bytes: None,
            max_project_key_bucket_bytes: None,
            start: UnixTimestamp::from_secs(70),
            partition_by: FlushBatching::Partition,
        });

        // Within the time range.
        buckets.merge(bucket_key(70, "a"), counter(1.0))?;
        buckets.merge(bucket_key(80, "b"), counter(1.0))?;
        buckets.merge(bucket_key(80, "b"), counter(2.0))?;
        // Too early.
        buckets.merge(bucket_key(32, "c"), counter(1.0))?;
        buckets.merge(bucket_key(42, "d"), counter(1.0))?;
        // Too late.
        buckets.merge(bucket_key(171, "e"), counter(1.0))?;
        buckets.merge(bucket_key(181, "f"), counter(1.0))?;
        buckets.merge(bucket_key(191, "a"), counter(1.0))?;

        insta::assert_debug_snapshot!(buckets);

        let partition = buckets.flush_next();
        insta::assert_debug_snapshot!(partition, @r###"
        Partition {
            partition_key: 0,
            stats: PartitionStats {
                count: 2,
                count_by_namespace: (unsupported:2),
                merges: 0,
                merges_by_namespace: (0),
                cost: 274,
                cost_by_namespace: (unsupported:274),
            },
            buckets: {
                70-00000000000000000000000000000000-a: Counter(
                    1.0,
                ),
                190-00000000000000000000000000000000-a: Counter(
                    1.0,
                ),
            },
        }
        "###);

        // This was just flushed and now is supposed to be at the end.
        buckets.merge(bucket_key(70, "a"), counter(1.0))?;

        insta::assert_debug_snapshot!(buckets);

        let partition = buckets.flush_next();
        insta::assert_debug_snapshot!(partition, @r###"
        Partition {
            partition_key: 1,
            stats: PartitionStats {
                count: 0,
                count_by_namespace: (0),
                merges: 0,
                merges_by_namespace: (0),
                cost: 0,
                cost_by_namespace: (0),
            },
            buckets: {},
        }
        "###);

        insta::assert_debug_snapshot!(buckets);

        insta::assert_debug_snapshot!(buckets.stats(), @r###"
        Stats {
            count: 6,
            count_by_namespace: (unsupported:6),
            cost: 822,
            cost_by_namespace: (unsupported:822),
        }
        "###);

        Ok(())
    }

    #[test]
    fn test_merge_flush_project() -> Result<(), AggregateMetricsError> {
        let mut buckets = Inner::new(Config {
            bucket_interval: 10,
            num_time_slots: 1,
            num_partitions: 2,
            delay: 0,
            max_secs_in_past: None,
            max_secs_in_future: None,
            max_total_bucket_bytes: None,
            max_project_key_bucket_bytes: None,
            start: UnixTimestamp::from_secs(70),
            partition_by: FlushBatching::Project,
        });

        for i in 0..1_000 {
            for j in 0..10 {
                let bucket = BucketKey {
                    project_key: ProjectKey::parse(&format!("{i:0width$x}", width = 32)).unwrap(),
                    ..bucket_key(70, &format!("b_{j}"))
                };
                buckets.merge(bucket, counter(1.0))?;
            }
        }

        let mut by_project1 = HashMap::new();
        for (key, _) in buckets.flush_next().buckets {
            *by_project1.entry(key.project_key).or_insert(0u64) += 1;
        }

        let mut by_project2 = HashMap::new();
        for (key, _) in buckets.flush_next().buckets {
            *by_project2.entry(key.project_key).or_insert(0u64) += 1;
        }

        assert_eq!(by_project1.len() + by_project2.len(), 1_000);
        // This assertion needs to be updated when the hashing changes.
        assert_eq!(by_project1.len(), 509);

        for (pk, v) in by_project1 {
            // 10 buckets per project.
            assert_eq!(v, 10);
            // Make sure the entries do not overlap.
            assert!(!by_project2.contains_key(&pk));
        }
        for (_, v) in by_project2 {
            assert_eq!(v, 10);
        }

        Ok(())
    }

    #[test]
    fn test_merge_flush_cost_limits() -> Result<(), AggregateMetricsError> {
        const ONE_BUCKET_COST: u64 = 137;

        let mut buckets = Inner::new(Config {
            bucket_interval: 10,
            num_time_slots: 3,
            num_partitions: 1,
            delay: 0,
            max_secs_in_past: None,
            max_secs_in_future: None,
            max_total_bucket_bytes: Some(ONE_BUCKET_COST * 2),
            // Enough for one bucket per partition.
            max_project_key_bucket_bytes: Some(ONE_BUCKET_COST * 3),
            start: UnixTimestamp::from_secs(70),
            partition_by: FlushBatching::Partition,
        });

        buckets.merge(bucket_key(70, "a"), counter(1.0))?;
        // Adding a new bucket exceeds the cost.
        assert_eq!(
            buckets
                .merge(bucket_key(70, "b"), counter(1.0))
                .unwrap_err(),
            AggregateMetricsError::ProjectLimitExceeded
        );
        // Merging a counter bucket is fine.
        buckets.merge(bucket_key(70, "a"), counter(2.0))?;

        // There is still room in the total budget and for a different project.
        let other_project = BucketKey {
            project_key: ProjectKey::parse("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb").unwrap(),
            ..bucket_key(70, "a")
        };
        buckets.merge(other_project, counter(3.0))?;

        // Add a bucket to a different partition, but the total limit is exceeded.
        assert_eq!(
            buckets
                .merge(bucket_key(80, "c"), counter(1.0))
                .unwrap_err(),
            AggregateMetricsError::TotalLimitExceeded
        );
        // Flush some data and make space.
        insta::assert_debug_snapshot!(buckets.flush_next(), @r###"
        Partition {
            partition_key: 0,
            stats: PartitionStats {
                count: 2,
                count_by_namespace: (unsupported:2),
                merges: 1,
                merges_by_namespace: (unsupported:1),
                cost: 274,
                cost_by_namespace: (unsupported:274),
            },
            buckets: {
                70-00000000000000000000000000000000-a: Counter(
                    3.0,
                ),
                70-bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb-a: Counter(
                    3.0,
                ),
            },
        }
        "###);
        buckets.merge(bucket_key(80, "c"), counter(1.0))?;

        insta::assert_debug_snapshot!(buckets, @r###"
        [
            Total {
                count: 1,
                count_by_namespace: (unsupported:1),
                cost: 137,
                cost_by_namespace: (unsupported:137),
            },
            (
                80,
                Slot {
                    partition_key: 0,
                    stats: Slot {
                        count: 1,
                        count_by_namespace: (unsupported:1),
                        merges: 0,
                        merges_by_namespace: (0),
                        cost: 137,
                        cost_by_namespace: (unsupported:137),
                        cost_by_project: {
                            ProjectKey("00000000000000000000000000000000"): 137,
                        },
                    },
                    buckets: {
                        80-00000000000000000000000000000000-c: Counter(
                            1.0,
                        ),
                    },
                },
            ),
            "(90, Slot(0))",
            "(100, Slot(0))",
        ]
        "###);

        insta::assert_debug_snapshot!(buckets.stats(), @r###"
        Stats {
            count: 1,
            count_by_namespace: (unsupported:1),
            cost: 137,
            cost_by_namespace: (unsupported:137),
        }
        "###);

        Ok(())
    }

    #[test]
    fn test_merge_flush_with_delay() {
        let mut buckets = Inner::new(Config {
            // 20 seconds.
            bucket_interval: 20,
            // Slots for 1 minute.
            num_time_slots: 3,
            // A partition for every 10 seconds.
            num_partitions: 2,
            // 20 second delay -> 1 extra time slot.
            delay: 20,
            max_total_bucket_bytes: None,
            max_project_key_bucket_bytes: None,
            max_secs_in_past: None,
            max_secs_in_future: None,
            // Truncated to 60 seconds.
            start: UnixTimestamp::from_secs(63),
            partition_by: FlushBatching::Partition,
        });

        // Add a bucket now -> should be flushed 30 seconds in the future.
        buckets.merge(bucket_key(60, "a"), counter(1.0)).unwrap();
        // Add a bucket 1 minute into the future, this one should still have a delay.
        buckets.merge(bucket_key(120, "b"), counter(2.0)).unwrap();

        // First flush is in 20 seconds + 10 seconds for the end of the partition.
        assert_eq!(buckets.next_flush_at(), Duration::from_secs(90));
        insta::assert_debug_snapshot!(buckets.flush_next(), @r###"
        Partition {
            partition_key: 0,
            stats: PartitionStats {
                count: 1,
                count_by_namespace: (unsupported:1),
                merges: 0,
                merges_by_namespace: (0),
                cost: 137,
                cost_by_namespace: (unsupported:137),
            },
            buckets: {
                60-00000000000000000000000000000000-a: Counter(
                    1.0,
                ),
            },
        }
        "###);
        assert!(buckets.flush_next().buckets.is_empty());

        // We're now at 100s.
        assert_eq!(buckets.next_flush_at(), Duration::from_secs(110));
        assert!(buckets.flush_next().buckets.is_empty());
        assert!(buckets.flush_next().buckets.is_empty());

        // We're now at 120s.
        assert_eq!(buckets.next_flush_at(), Duration::from_secs(130));
        assert!(buckets.flush_next().buckets.is_empty());
        assert!(buckets.flush_next().buckets.is_empty());

        // We're now at 140s -> our second bucket is ready (120s + 20s delay).
        assert_eq!(buckets.next_flush_at(), Duration::from_secs(150));
        insta::assert_debug_snapshot!(buckets.flush_next(), @r###"
        Partition {
            partition_key: 0,
            stats: PartitionStats {
                count: 1,
                count_by_namespace: (unsupported:1),
                merges: 0,
                merges_by_namespace: (0),
                cost: 137,
                cost_by_namespace: (unsupported:137),
            },
            buckets: {
                120-00000000000000000000000000000000-b: Counter(
                    2.0,
                ),
            },
        }
        "###);
        assert!(buckets.flush_next().buckets.is_empty());

        // We're now at 160s.
        assert_eq!(buckets.next_flush_at(), Duration::from_secs(170));
    }

    #[test]
    fn test_next_flush() {
        let mut buckets = Inner::new(Config {
            bucket_interval: 10,
            num_time_slots: 6,
            num_partitions: 2,
            delay: 0,
            max_secs_in_past: None,
            max_secs_in_future: None,
            max_total_bucket_bytes: None,
            max_project_key_bucket_bytes: None,
            start: UnixTimestamp::from_secs(70),
            partition_by: FlushBatching::Partition,
        });

        assert_eq!(buckets.next_flush_at(), Duration::from_secs(75));
        assert_eq!(buckets.flush_next().partition_key, 0);
        assert_eq!(buckets.next_flush_at(), Duration::from_secs(80));
        assert_eq!(buckets.flush_next().partition_key, 1);
        assert_eq!(buckets.next_flush_at(), Duration::from_secs(85));
        assert_eq!(buckets.flush_next().partition_key, 0);
        assert_eq!(buckets.next_flush_at(), Duration::from_secs(90));
        assert_eq!(buckets.next_flush_at(), Duration::from_secs(90));
    }

    #[test]
    fn test_next_flush_with_delay() {
        let mut buckets = Inner::new(Config {
            bucket_interval: 10,
            num_time_slots: 6,
            num_partitions: 2,
            delay: 3,
            max_secs_in_past: None,
            max_secs_in_future: None,
            max_total_bucket_bytes: None,
            max_project_key_bucket_bytes: None,
            start: UnixTimestamp::from_secs(70),
            partition_by: FlushBatching::Partition,
        });

        assert_eq!(buckets.next_flush_at(), Duration::from_secs(78));
        assert_eq!(buckets.flush_next().partition_key, 0);
        assert_eq!(buckets.next_flush_at(), Duration::from_secs(83));
        assert_eq!(buckets.flush_next().partition_key, 1);
        assert_eq!(buckets.next_flush_at(), Duration::from_secs(88));
        assert_eq!(buckets.flush_next().partition_key, 0);
        assert_eq!(buckets.next_flush_at(), Duration::from_secs(93));
        assert_eq!(buckets.next_flush_at(), Duration::from_secs(93));
    }

    #[test]
    fn test_merge_flush_time_limits() -> Result<(), AggregateMetricsError> {
        let mut buckets = Inner::new(Config {
            bucket_interval: 10,
            num_time_slots: 6,
            num_partitions: 2,
            delay: 0,
            max_secs_in_past: Some(33), // -> Upgraded to 4 slots (40 seconds).
            max_secs_in_future: Some(22), // -> Upgraded to 3 slots (30 seconds).
            max_total_bucket_bytes: None,
            max_project_key_bucket_bytes: None,
            start: UnixTimestamp::from_secs(70),
            partition_by: FlushBatching::Partition,
        });

        buckets.merge(bucket_key(70, "a"), counter(1.0))?;

        // 4 slots in the past.
        buckets.merge(bucket_key(60, "a"), counter(1.0))?;
        buckets.merge(bucket_key(50, "a"), counter(1.0))?;
        buckets.merge(bucket_key(40, "a"), counter(1.0))?;
        buckets.merge(bucket_key(30, "a"), counter(1.0))?;
        assert_eq!(
            buckets
                .merge(bucket_key(29, "a"), counter(1.0))
                .unwrap_err(),
            AggregateMetricsError::InvalidTimestamp(UnixTimestamp::from_secs(20))
        );

        // 3 slots in the future.
        buckets.merge(bucket_key(80, "a"), counter(1.0))?;
        buckets.merge(bucket_key(90, "a"), counter(1.0))?;
        buckets.merge(bucket_key(109, "a"), counter(1.0))?;
        assert_eq!(
            buckets
                .merge(bucket_key(110, "a"), counter(1.0))
                .unwrap_err(),
            AggregateMetricsError::InvalidTimestamp(UnixTimestamp::from_secs(110))
        );

        Ok(())
    }

    #[test]
    fn test_sub_rem_euclid() {
        for (head, slot, expected) in [
            // head == slot
            (253, 253, 0),
            // head < slot
            (253, 273, 0),
            (253, 274, 1),
            (253, 275, 2),
            (253, 276, 3),
            (253, 277, 4),
            // head > slot
            (253, 233, 0),
            (253, 234, 1),
            (253, 235, 2),
            (253, 236, 3),
            (253, 237, 4),
        ] {
            assert_eq!(sub_rem_euclid(slot, head, 5), expected);
        }
    }
}
1042}