relay_metrics/aggregator/
mod.rs1use std::time::{Duration, SystemTime};
4
5use hashbrown::HashMap;
6use relay_base_schema::metrics::MetricNamespace;
7use relay_base_schema::project::ProjectKey;
8use relay_common::time::UnixTimestamp;
9
10use crate::statsd::{MetricCounters, MetricGauges};
11use crate::Bucket;
12
13mod config;
14mod inner;
15mod stats;
16
17pub use self::config::*;
18use self::inner::{BucketData, BucketKey};
19
/// Number of partitions per second of bucket interval, used as the fallback when the
/// configuration does not provide an explicit partition count.
const DEFAULT_PARTITIONS_PER_SECOND: u32 = 64;
22
23#[derive(Debug, thiserror::Error, PartialEq)]
25pub enum AggregateMetricsError {
26 #[error("found incompatible metric types")]
28 InvalidTypes,
29 #[error("total metrics limit exceeded")]
31 TotalLimitExceeded,
32 #[error("project metrics limit exceeded")]
34 ProjectLimitExceeded,
35 #[error("the timestamp '{0}' is outside the maximum allowed time range")]
37 InvalidTimestamp(UnixTimestamp),
38}
39
40#[derive(Debug)]
61pub struct Aggregator {
62 name: String,
63 inner: inner::Inner,
64}
65
66impl Aggregator {
67 pub fn named(name: String, config: &AggregatorConfig) -> Self {
69 let num_partitions = match config.flush_batching {
70 FlushBatching::Project => config.flush_partitions,
71 FlushBatching::Bucket => config.flush_partitions,
72 FlushBatching::Partition => config.flush_partitions,
73 FlushBatching::None => Some(0),
74 }
75 .unwrap_or(DEFAULT_PARTITIONS_PER_SECOND * config.bucket_interval.max(1));
76
77 Self {
78 name,
79 inner: inner::Inner::new(inner::Config {
80 start: UnixTimestamp::now(),
81 bucket_interval: config.bucket_interval,
82 num_time_slots: config.aggregator_size,
83 num_partitions,
84 delay: config.initial_delay,
85 max_total_bucket_bytes: config.max_total_bucket_bytes,
86 max_project_key_bucket_bytes: config.max_project_key_bucket_bytes,
87 max_secs_in_past: Some(config.max_secs_in_past),
88 max_secs_in_future: Some(config.max_secs_in_future),
89 partition_by: config.flush_batching,
90 }),
91 }
92 }
93
94 pub fn name(&self) -> &str {
96 &self.name
97 }
98
99 pub fn is_empty(&self) -> bool {
101 self.inner.is_empty()
102 }
103
104 pub fn merge(
106 &mut self,
107 project_key: ProjectKey,
108 bucket: Bucket,
109 ) -> Result<(), AggregateMetricsError> {
110 let key = BucketKey {
111 project_key,
112 timestamp: bucket.timestamp,
113 metric_name: bucket.name,
114 tags: bucket.tags,
115 extracted_from_indexed: bucket.metadata.extracted_from_indexed,
116 };
117
118 let value = BucketData {
119 value: bucket.value,
120 metadata: bucket.metadata,
121 };
122
123 self.inner.merge(key, value)
124 }
125
126 pub fn try_flush_next(&mut self, now: SystemTime) -> Result<Partition, Duration> {
132 let next_flush = SystemTime::UNIX_EPOCH + self.inner.next_flush_at();
133
134 if let Err(err) = now.duration_since(next_flush) {
135 return Err(err.duration());
137 }
138
139 emit_stats(&self.name, self.inner.stats());
141
142 let partition = self.inner.flush_next();
143 emit_flush_partition_stats(&self.name, partition.stats);
144
145 Ok(Partition {
146 partition_key: partition.partition_key,
147 buckets: partition.buckets,
148 bucket_interval: self.inner.bucket_interval(),
149 })
150 }
151
152 pub fn next_flush_at(&mut self, now: SystemTime) -> Duration {
154 let next_flush = SystemTime::UNIX_EPOCH + self.inner.next_flush_at();
155
156 match now.duration_since(next_flush) {
157 Ok(_) => Duration::ZERO,
158 Err(err) => err.duration(),
159 }
160 }
161
162 pub fn into_partitions(self) -> impl Iterator<Item = Partition> {
164 let bucket_interval = self.inner.bucket_interval();
165
166 emit_stats(&self.name, self.inner.stats());
167
168 self.inner.into_partitions().map(move |p| Partition {
169 partition_key: p.partition_key,
170 buckets: p.buckets,
171 bucket_interval,
172 })
173 }
174}
175
176pub struct Partition {
180 pub partition_key: u32,
182 buckets: HashMap<BucketKey, BucketData>,
183 bucket_interval: u64,
184}
185
186impl IntoIterator for Partition {
187 type Item = (ProjectKey, Bucket);
188 type IntoIter = PartitionIter;
189
190 fn into_iter(self) -> Self::IntoIter {
191 PartitionIter {
192 inner: self.buckets.into_iter(),
193 bucket_interval: self.bucket_interval,
194 }
195 }
196}
197
198pub struct PartitionIter {
200 inner: hashbrown::hash_map::IntoIter<BucketKey, BucketData>,
201 bucket_interval: u64,
202}
203
204impl Iterator for PartitionIter {
205 type Item = (ProjectKey, Bucket);
206
207 fn next(&mut self) -> Option<Self::Item> {
208 let (key, data) = self.inner.next()?;
209
210 Some((
211 key.project_key,
212 Bucket {
213 timestamp: key.timestamp,
214 width: self.bucket_interval,
215 name: key.metric_name,
216 tags: key.tags,
217 value: data.value,
218 metadata: data.metadata,
219 },
220 ))
221 }
222
223 fn size_hint(&self) -> (usize, Option<usize>) {
224 self.inner.size_hint()
225 }
226}
227
228impl std::iter::ExactSizeIterator for PartitionIter {
229 fn len(&self) -> usize {
230 self.inner.len()
231 }
232}
233
234impl std::iter::FusedIterator for PartitionIter {}
235
236fn emit_stats(name: &str, stats: inner::Stats) {
237 for namespace in MetricNamespace::all() {
238 relay_statsd::metric!(
239 gauge(MetricGauges::Buckets) = *stats.count_by_namespace.get(namespace),
240 namespace = namespace.as_str(),
241 aggregator = name
242 );
243 relay_statsd::metric!(
244 gauge(MetricGauges::BucketsCost) = *stats.cost_by_namespace.get(namespace),
245 namespace = namespace.as_str(),
246 aggregator = name
247 );
248 }
249}
250
251fn emit_flush_partition_stats(name: &str, stats: inner::PartitionStats) {
252 relay_statsd::metric!(counter(MetricCounters::FlushCount) += 1, aggregator = name);
253
254 for namespace in MetricNamespace::all() {
255 relay_statsd::metric!(
256 counter(MetricCounters::MergeMiss) += *stats.count_by_namespace.get(namespace),
257 namespace = namespace.as_str(),
258 aggregator = name,
259 );
260 relay_statsd::metric!(
261 counter(MetricCounters::MergeHit) += *stats.merges_by_namespace.get(namespace),
262 namespace = namespace.as_str(),
263 aggregator = name,
264 );
265 relay_statsd::metric!(
266 counter(MetricCounters::FlushCost) += *stats.cost_by_namespace.get(namespace),
267 namespace = namespace.as_str(),
268 aggregator = name,
269 );
270 }
271}