// relay_metrics/aggregator/mod.rs

use std::time::{Duration, SystemTime};

use hashbrown::HashMap;
use relay_base_schema::metrics::MetricNamespace;
use relay_base_schema::project::ProjectKey;
use relay_common::time::UnixTimestamp;

use crate::Bucket;
use crate::statsd::{MetricCounters, MetricGauges};

mod config;
mod inner;
mod stats;

pub use self::config::*;
use self::inner::{BucketData, BucketKey};
/// Default number of partitions per second of bucket interval.
///
/// Used in [`Aggregator::named`] as a fallback when the configuration does not
/// specify an explicit `flush_partitions` count: the effective partition count
/// is this value multiplied by `bucket_interval` (clamped to at least 1).
const DEFAULT_PARTITIONS_PER_SECOND: u32 = 64;

/// Errors that can occur when merging a [`Bucket`] into an [`Aggregator`].
#[derive(Debug, thiserror::Error, PartialEq)]
pub enum AggregateMetricsError {
    /// The bucket's metric type does not match the type of the bucket it is
    /// being merged into.
    #[error("found incompatible metric types")]
    InvalidTypes,
    /// The aggregator's total size limit was exceeded
    /// (see `max_total_bucket_bytes` in the configuration).
    #[error("total metrics limit exceeded")]
    TotalLimitExceeded,
    /// The per-project size limit was exceeded
    /// (see `max_project_key_bucket_bytes` in the configuration).
    #[error("project metrics limit exceeded")]
    ProjectLimitExceeded,
    /// The bucket's timestamp is outside the allowed range
    /// (see `max_secs_in_past` / `max_secs_in_future` in the configuration).
    #[error("the timestamp '{0}' is outside the maximum allowed time range")]
    InvalidTimestamp(UnixTimestamp),
}

/// A named metrics aggregator that buffers [`Bucket`]s and flushes them in
/// [`Partition`]s.
#[derive(Debug)]
pub struct Aggregator {
    // Name used to tag all statsd metrics emitted by this aggregator.
    name: String,
    // The underlying aggregation state.
    inner: inner::Inner,
}

66impl Aggregator {
67    pub fn named(name: String, config: &AggregatorConfig) -> Self {
69        let num_partitions = match config.flush_batching {
70            FlushBatching::Project => config.flush_partitions,
71            FlushBatching::Bucket => config.flush_partitions,
72            FlushBatching::Partition => config.flush_partitions,
73            FlushBatching::None => Some(0),
74        }
75        .unwrap_or(DEFAULT_PARTITIONS_PER_SECOND * config.bucket_interval.max(1));
76
77        Self {
78            name,
79            inner: inner::Inner::new(inner::Config {
80                start: UnixTimestamp::now(),
81                bucket_interval: config.bucket_interval,
82                num_time_slots: config.aggregator_size,
83                num_partitions,
84                delay: config.initial_delay,
85                max_total_bucket_bytes: config.max_total_bucket_bytes,
86                max_project_key_bucket_bytes: config.max_project_key_bucket_bytes,
87                max_secs_in_past: Some(config.max_secs_in_past),
88                max_secs_in_future: Some(config.max_secs_in_future),
89                partition_by: config.flush_batching,
90            }),
91        }
92    }
93
94    pub fn name(&self) -> &str {
96        &self.name
97    }
98
99    pub fn is_empty(&self) -> bool {
101        self.inner.is_empty()
102    }
103
104    pub fn merge(
106        &mut self,
107        project_key: ProjectKey,
108        bucket: Bucket,
109    ) -> Result<(), AggregateMetricsError> {
110        let key = BucketKey {
111            project_key,
112            timestamp: bucket.timestamp,
113            metric_name: bucket.name,
114            tags: bucket.tags,
115            extracted_from_indexed: bucket.metadata.extracted_from_indexed,
116        };
117
118        let value = BucketData {
119            value: bucket.value,
120            metadata: bucket.metadata,
121        };
122
123        self.inner.merge(key, value)
124    }
125
126    pub fn try_flush_next(&mut self, now: SystemTime) -> Result<Partition, Duration> {
132        let next_flush = SystemTime::UNIX_EPOCH + self.inner.next_flush_at();
133
134        if let Err(err) = now.duration_since(next_flush) {
135            return Err(err.duration());
137        }
138
139        emit_stats(&self.name, self.inner.stats());
141
142        let partition = self.inner.flush_next();
143        emit_flush_partition_stats(&self.name, partition.stats);
144
145        Ok(Partition {
146            partition_key: partition.partition_key,
147            buckets: partition.buckets,
148            bucket_interval: self.inner.bucket_interval(),
149        })
150    }
151
152    pub fn next_flush_at(&mut self, now: SystemTime) -> Duration {
154        let next_flush = SystemTime::UNIX_EPOCH + self.inner.next_flush_at();
155
156        match now.duration_since(next_flush) {
157            Ok(_) => Duration::ZERO,
158            Err(err) => err.duration(),
159        }
160    }
161
162    pub fn into_partitions(self) -> impl Iterator<Item = Partition> {
164        let bucket_interval = self.inner.bucket_interval();
165
166        emit_stats(&self.name, self.inner.stats());
167
168        self.inner.into_partitions().map(move |p| Partition {
169            partition_key: p.partition_key,
170            buckets: p.buckets,
171            bucket_interval,
172        })
173    }
174}
175
/// A flushed partition of aggregated metric buckets.
///
/// Iterate over it to obtain `(ProjectKey, Bucket)` pairs.
pub struct Partition {
    /// Key identifying this partition within the aggregator.
    pub partition_key: u32,
    // The aggregated buckets contained in this partition.
    buckets: HashMap<BucketKey, BucketData>,
    // Assigned as the `width` of every bucket produced from this partition.
    bucket_interval: u64,
}

186impl IntoIterator for Partition {
187    type Item = (ProjectKey, Bucket);
188    type IntoIter = PartitionIter;
189
190    fn into_iter(self) -> Self::IntoIter {
191        PartitionIter {
192            inner: self.buckets.into_iter(),
193            bucket_interval: self.bucket_interval,
194        }
195    }
196}
197
/// Iterator over the buckets of a [`Partition`].
///
/// Created by [`Partition::into_iter`]; yields `(ProjectKey, Bucket)` pairs.
pub struct PartitionIter {
    // Owning iterator over the partition's bucket map.
    inner: hashbrown::hash_map::IntoIter<BucketKey, BucketData>,
    // Assigned as the `width` of every yielded bucket.
    bucket_interval: u64,
}

204impl Iterator for PartitionIter {
205    type Item = (ProjectKey, Bucket);
206
207    fn next(&mut self) -> Option<Self::Item> {
208        let (key, data) = self.inner.next()?;
209
210        Some((
211            key.project_key,
212            Bucket {
213                timestamp: key.timestamp,
214                width: self.bucket_interval,
215                name: key.metric_name,
216                tags: key.tags,
217                value: data.value,
218                metadata: data.metadata,
219            },
220        ))
221    }
222
223    fn size_hint(&self) -> (usize, Option<usize>) {
224        self.inner.size_hint()
225    }
226}
227
impl std::iter::ExactSizeIterator for PartitionIter {
    // Delegates to the underlying map iterator, which knows its exact length.
    fn len(&self) -> usize {
        self.inner.len()
    }
}

// The underlying hashbrown iterator keeps returning `None` once exhausted, so
// `PartitionIter` can be marked fused as well.
impl std::iter::FusedIterator for PartitionIter {}

236fn emit_stats(name: &str, stats: inner::Stats) {
237    for namespace in MetricNamespace::all() {
238        relay_statsd::metric!(
239            gauge(MetricGauges::Buckets) = *stats.count_by_namespace.get(namespace),
240            namespace = namespace.as_str(),
241            aggregator = name
242        );
243        relay_statsd::metric!(
244            gauge(MetricGauges::BucketsCost) = *stats.cost_by_namespace.get(namespace),
245            namespace = namespace.as_str(),
246            aggregator = name
247        );
248    }
249}
250
251fn emit_flush_partition_stats(name: &str, stats: inner::PartitionStats) {
252    relay_statsd::metric!(counter(MetricCounters::FlushCount) += 1, aggregator = name);
253
254    for namespace in MetricNamespace::all() {
255        relay_statsd::metric!(
256            counter(MetricCounters::MergeMiss) += *stats.count_by_namespace.get(namespace),
257            namespace = namespace.as_str(),
258            aggregator = name,
259        );
260        relay_statsd::metric!(
261            counter(MetricCounters::MergeHit) += *stats.merges_by_namespace.get(namespace),
262            namespace = namespace.as_str(),
263            aggregator = name,
264        );
265        relay_statsd::metric!(
266            counter(MetricCounters::FlushCost) += *stats.cost_by_namespace.get(namespace),
267            namespace = namespace.as_str(),
268            aggregator = name,
269        );
270    }
271}