relay_metrics/aggregator/
stats.rs1use hashbrown::HashMap;
2use relay_base_schema::metrics::MetricNamespace;
3use relay_base_schema::project::ProjectKey;
4
5use crate::aggregator::AggregateMetricsError;
6use crate::utils::ByNamespace;
7
8#[derive(Default, Debug)]
10pub struct Total {
11 pub count: u64,
13 pub count_by_namespace: ByNamespace<u64>,
15
16 pub cost: u64,
18 pub cost_by_namespace: ByNamespace<u64>,
20}
21
22impl Total {
23 pub fn remove_slot(&mut self, slot: &Slot) {
24 let Self {
25 count,
26 count_by_namespace,
27 cost,
28 cost_by_namespace,
29 } = self;
30
31 *count -= slot.count;
32 *count_by_namespace -= slot.count_by_namespace;
33 *cost -= slot.cost;
34 *cost_by_namespace -= slot.cost_by_namespace;
35 }
36}
37
38#[derive(Default, Debug, PartialEq, Eq)]
40pub struct Slot {
41 pub count: u64,
43 pub count_by_namespace: ByNamespace<u64>,
45 pub merges: u64,
47 pub merges_by_namespace: ByNamespace<u64>,
49
50 pub cost: u64,
52 pub cost_by_namespace: ByNamespace<u64>,
54 pub cost_by_project: HashMap<ProjectKey, u64>,
56}
57
58impl Slot {
59 pub fn reset(&mut self) {
61 let Self {
62 count,
63 count_by_namespace,
64 merges,
65 merges_by_namespace,
66 cost,
67 cost_by_namespace,
68 cost_by_project,
69 } = self;
70
71 *count = 0;
72 *count_by_namespace = Default::default();
73 *merges = 0;
74 *merges_by_namespace = Default::default();
75
76 *cost = 0;
77 *cost_by_namespace = Default::default();
78 cost_by_project.shrink_to_fit();
80 cost_by_project.clear();
81 }
82
83 pub fn incr_count(&mut self, total: &mut Total, namespace: MetricNamespace) {
85 self.count += 1;
86 *self.count_by_namespace.get_mut(namespace) += 1;
87 total.count += 1;
88 *total.count_by_namespace.get_mut(namespace) += 1;
89 }
90
91 pub fn incr_merges(&mut self, namespace: MetricNamespace) {
93 self.merges += 1;
94 *self.merges_by_namespace.get_mut(namespace) += 1;
95 }
96
97 pub fn reserve<'a>(
101 &'a mut self,
102 total: &'a mut Total,
103 project_key: ProjectKey,
104 namespace: MetricNamespace,
105 cost: u64,
106 limits: &Limits,
107 ) -> Result<Reservation<'a>, AggregateMetricsError> {
108 if total.cost + cost > limits.max_total {
109 return Err(AggregateMetricsError::TotalLimitExceeded);
110 }
111 let project = self.cost_by_project.entry(project_key).or_insert(0);
112 if *project + cost > limits.max_partition_project {
113 return Err(AggregateMetricsError::ProjectLimitExceeded);
114 }
115
116 Ok(Reservation {
117 total,
118 cost_partition: &mut self.cost,
119 cost_partition_by_namespace: &mut self.cost_by_namespace,
120 cost_partition_project: project,
121 namespace,
122 reserved: cost,
123 })
124 }
125}
126
127#[derive(Debug, Clone, Copy)]
128pub struct Limits {
129 pub max_total: u64,
130 pub max_partition_project: u64,
131}
132
133#[must_use = "a reservation does nothing if it is not used"]
134pub struct Reservation<'a> {
135 total: &'a mut Total,
136
137 cost_partition: &'a mut u64,
138 cost_partition_by_namespace: &'a mut ByNamespace<u64>,
139 cost_partition_project: &'a mut u64,
140
141 namespace: MetricNamespace,
142
143 reserved: u64,
144}
145
146impl Reservation<'_> {
147 pub fn consume(self) {
148 let reserved = self.reserved;
149 self.consume_with(reserved);
150 }
151
152 pub fn consume_with(self, cost: u64) {
153 debug_assert!(cost <= self.reserved, "less reserved than used");
154 self.total.cost += cost;
156 *self.total.cost_by_namespace.get_mut(self.namespace) += cost;
157
158 *self.cost_partition += cost;
160 *self.cost_partition_by_namespace.get_mut(self.namespace) += cost;
161 *self.cost_partition_project += cost;
162 }
163}