relay_metrics/aggregator/
stats.rs

1use hashbrown::HashMap;
2use relay_base_schema::metrics::MetricNamespace;
3use relay_base_schema::project::ProjectKey;
4
5use crate::aggregator::AggregateMetricsError;
6use crate::utils::ByNamespace;
7
8/// Total stats tracked for the aggregator.
9#[derive(Default, Debug)]
10pub struct Total {
11    /// Total amount of buckets in the aggregator.
12    pub count: u64,
13    /// Total amount of buckets in the aggregator by namespace.
14    pub count_by_namespace: ByNamespace<u64>,
15
16    /// Total cost of buckets in the aggregator.
17    pub cost: u64,
18    /// Total cost of buckets in the aggregator by namespace.
19    pub cost_by_namespace: ByNamespace<u64>,
20}
21
22impl Total {
23    pub fn remove_slot(&mut self, slot: &Slot) {
24        let Self {
25            count,
26            count_by_namespace,
27            cost,
28            cost_by_namespace,
29        } = self;
30
31        *count -= slot.count;
32        *count_by_namespace -= slot.count_by_namespace;
33        *cost -= slot.cost;
34        *cost_by_namespace -= slot.cost_by_namespace;
35    }
36}
37
38/// Stats tracked by slot in the aggregator.
39#[derive(Default, Debug, PartialEq, Eq)]
40pub struct Slot {
41    /// Amount of buckets created in this slot.
42    pub count: u64,
43    /// Amount of buckets created in this slot by namespace.
44    pub count_by_namespace: ByNamespace<u64>,
45    /// Amount of merges happened in this slot.
46    pub merges: u64,
47    /// Amount of merges happened in this slot by namespace.
48    pub merges_by_namespace: ByNamespace<u64>,
49
50    /// Cost of buckets in this slot.
51    pub cost: u64,
52    /// Cost of buckets in this slot by namespace.
53    pub cost_by_namespace: ByNamespace<u64>,
54    /// Cost of buckets in this slot by project.
55    pub cost_by_project: HashMap<ProjectKey, u64>,
56}
57
58impl Slot {
59    /// Resets the slot to its initial empty state.
60    pub fn reset(&mut self) {
61        let Self {
62            count,
63            count_by_namespace,
64            merges,
65            merges_by_namespace,
66            cost,
67            cost_by_namespace,
68            cost_by_project,
69        } = self;
70
71        *count = 0;
72        *count_by_namespace = Default::default();
73        *merges = 0;
74        *merges_by_namespace = Default::default();
75
76        *cost = 0;
77        *cost_by_namespace = Default::default();
78        // Keep the allocation around but at the same time make it possible for it to shrink.
79        cost_by_project.shrink_to_fit();
80        cost_by_project.clear();
81    }
82
83    /// Increments the count by one.
84    pub fn incr_count(&mut self, total: &mut Total, namespace: MetricNamespace) {
85        self.count += 1;
86        *self.count_by_namespace.get_mut(namespace) += 1;
87        total.count += 1;
88        *total.count_by_namespace.get_mut(namespace) += 1;
89    }
90
91    /// Increments the amount of merges in the slot by one.
92    pub fn incr_merges(&mut self, namespace: MetricNamespace) {
93        self.merges += 1;
94        *self.merges_by_namespace.get_mut(namespace) += 1;
95    }
96
97    /// Tries to reserve a certain amount of cost.
98    ///
99    /// Returns an error if there is not enough budget left.
100    pub fn reserve<'a>(
101        &'a mut self,
102        total: &'a mut Total,
103        project_key: ProjectKey,
104        namespace: MetricNamespace,
105        cost: u64,
106        limits: &Limits,
107    ) -> Result<Reservation<'a>, AggregateMetricsError> {
108        if total.cost + cost > limits.max_total {
109            return Err(AggregateMetricsError::TotalLimitExceeded);
110        }
111        let project = self.cost_by_project.entry(project_key).or_insert(0);
112        if *project + cost > limits.max_partition_project {
113            return Err(AggregateMetricsError::ProjectLimitExceeded);
114        }
115
116        Ok(Reservation {
117            total,
118            cost_partition: &mut self.cost,
119            cost_partition_by_namespace: &mut self.cost_by_namespace,
120            cost_partition_project: project,
121            namespace,
122            reserved: cost,
123        })
124    }
125}
126
127#[derive(Debug, Clone, Copy)]
128pub struct Limits {
129    pub max_total: u64,
130    pub max_partition_project: u64,
131}
132
133#[must_use = "a reservation does nothing if it is not used"]
134pub struct Reservation<'a> {
135    total: &'a mut Total,
136
137    cost_partition: &'a mut u64,
138    cost_partition_by_namespace: &'a mut ByNamespace<u64>,
139    cost_partition_project: &'a mut u64,
140
141    namespace: MetricNamespace,
142
143    reserved: u64,
144}
145
146impl Reservation<'_> {
147    pub fn consume(self) {
148        let reserved = self.reserved;
149        self.consume_with(reserved);
150    }
151
152    pub fn consume_with(self, cost: u64) {
153        debug_assert!(cost <= self.reserved, "less reserved than used");
154        // Update total costs.
155        self.total.cost += cost;
156        *self.total.cost_by_namespace.get_mut(self.namespace) += cost;
157
158        // Update all partition costs.
159        *self.cost_partition += cost;
160        *self.cost_partition_by_namespace.get_mut(self.namespace) += cost;
161        *self.cost_partition_project += cost;
162    }
163}