relay_metrics/aggregator/mod.rs

//! Core functionality of metrics aggregation.

use std::time::{Duration, SystemTime};

use hashbrown::HashMap;
use relay_base_schema::metrics::MetricNamespace;
use relay_base_schema::project::ProjectKey;
use relay_common::time::UnixTimestamp;

use crate::statsd::{MetricCounters, MetricGauges};
use crate::Bucket;

mod config;
mod inner;
mod stats;

pub use self::config::*;
use self::inner::{BucketData, BucketKey};

/// Default number of partitions per second when no partitions are configured.
const DEFAULT_PARTITIONS_PER_SECOND: u32 = 64;

/// Any error that may occur during aggregation.
#[derive(Debug, thiserror::Error, PartialEq)]
pub enum AggregateMetricsError {
    /// Internal error: Attempted to merge two metric buckets of different types.
    #[error("found incompatible metric types")]
    InvalidTypes,
    /// A metric bucket is too large for the global bytes limit.
    #[error("total metrics limit exceeded")]
    TotalLimitExceeded,
    /// A metric bucket is too large for the per-project bytes limit.
    #[error("project metrics limit exceeded")]
    ProjectLimitExceeded,
    /// The timestamp is outside the maximum allowed time range.
    #[error("the timestamp '{0}' is outside the maximum allowed time range")]
    InvalidTimestamp(UnixTimestamp),
}

/// A collector of [`Bucket`] submissions.
///
/// # Aggregation
///
/// Each metric is dispatched into a [`Bucket`] depending on its project key (DSN), name, type,
/// unit, tags, and timestamp. The bucket timestamp is rounded to the precision declared by the
/// `bucket_interval` field of the [`AggregatorConfig`].
///
/// Each bucket stores the accumulated value of submitted metrics:
///
/// - `Counter`: Sum of values.
/// - `Distribution`: A list of values.
/// - `Set`: A unique set of hashed values.
/// - `Gauge`: A summary of the reported values, see [`GaugeValue`](crate::GaugeValue).
///
/// # Conflicts
///
/// Metrics are uniquely identified by the combination of their name, type and unit. It is allowed
/// to send metrics of different types and units under the same name. For example, sending a metric
/// once as set and once as distribution will result in two actual metrics being recorded.
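///
/// # Example
///
/// A minimal usage sketch (assumes a default [`AggregatorConfig`] and example values; the metric
/// name format accepted by [`Bucket::parse`] depends on configuration and crate version):
///
/// ```ignore
/// use relay_base_schema::project::ProjectKey;
/// use relay_common::time::UnixTimestamp;
///
/// let config = AggregatorConfig::default();
/// let mut aggregator = Aggregator::named("default".to_owned(), &config);
///
/// let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
/// let bucket = Bucket::parse(b"c:custom/endpoint.hits@none:1|c", UnixTimestamp::now()).unwrap();
///
/// // Buckets with the same project key, name, type, unit, tags, and rounded timestamp
/// // are merged into a single entry; everything else ends up in separate buckets.
/// aggregator.merge(project_key, bucket.clone()).unwrap();
/// aggregator.merge(project_key, bucket).unwrap();
/// assert!(!aggregator.is_empty());
/// ```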
#[derive(Debug)]
pub struct Aggregator {
    name: String,
    inner: inner::Inner,
}

impl Aggregator {
    /// Creates a new named [`Self`].
    pub fn named(name: String, config: &AggregatorConfig) -> Self {
        let num_partitions = match config.flush_batching {
            FlushBatching::Project => config.flush_partitions,
            FlushBatching::Bucket => config.flush_partitions,
            FlushBatching::Partition => config.flush_partitions,
            FlushBatching::None => Some(0),
        }
        .unwrap_or(DEFAULT_PARTITIONS_PER_SECOND * config.bucket_interval.max(1));

        Self {
            name,
            inner: inner::Inner::new(inner::Config {
                start: UnixTimestamp::now(),
                bucket_interval: config.bucket_interval,
                num_time_slots: config.aggregator_size,
                num_partitions,
                delay: config.initial_delay,
                max_total_bucket_bytes: config.max_total_bucket_bytes,
                max_project_key_bucket_bytes: config.max_project_key_bucket_bytes,
                max_secs_in_past: Some(config.max_secs_in_past),
                max_secs_in_future: Some(config.max_secs_in_future),
                partition_by: config.flush_batching,
            }),
        }
    }

    /// Returns the name of the aggregator.
    pub fn name(&self) -> &str {
        &self.name
    }

    /// Returns `true` if the aggregator contains no metric buckets.
    pub fn is_empty(&self) -> bool {
        self.inner.is_empty()
    }

    /// Merges a bucket into this aggregator.
    pub fn merge(
        &mut self,
        project_key: ProjectKey,
        bucket: Bucket,
    ) -> Result<(), AggregateMetricsError> {
        let key = BucketKey {
            project_key,
            timestamp: bucket.timestamp,
            metric_name: bucket.name,
            tags: bucket.tags,
            extracted_from_indexed: bucket.metadata.extracted_from_indexed,
        };

        let value = BucketData {
            value: bucket.value,
            metadata: bucket.metadata,
        };

        self.inner.merge(key, value)
    }

    /// Attempts to flush the next batch from the aggregator.
    ///
    /// If it is too early to flush the next batch, the error contains the duration to wait before
    /// the flush should be retried. After a successful flush, retry immediately until such an
    /// error is returned; this makes sure time is eventually synchronized.
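    ///
    /// # Example
    ///
    /// A sketch of a flush loop (assumes `aggregator` is a populated [`Aggregator`] and that
    /// flushed buckets are forwarded elsewhere):
    ///
    /// ```ignore
    /// use std::time::SystemTime;
    ///
    /// loop {
    ///     match aggregator.try_flush_next(SystemTime::now()) {
    ///         // A partition is ready: forward its buckets and retry immediately.
    ///         Ok(partition) => {
    ///             for (project_key, bucket) in partition {
    ///                 // ... send `bucket` for `project_key` upstream ...
    ///             }
    ///         }
    ///         // Too early: wait for the returned duration before trying again.
    ///         Err(wait) => std::thread::sleep(wait),
    ///     }
    /// }
    /// ```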
    pub fn try_flush_next(&mut self, now: SystemTime) -> Result<Partition, Duration> {
        let next_flush = SystemTime::UNIX_EPOCH + self.inner.next_flush_at();

        if let Err(err) = now.duration_since(next_flush) {
            // The flush time is in the future; return the amount of time to wait before the next flush.
            return Err(err.duration());
        }

        // Emit global stats before flushing so they capture the maximum size.
        emit_stats(&self.name, self.inner.stats());

        let partition = self.inner.flush_next();
        emit_flush_partition_stats(&self.name, partition.stats);

        Ok(Partition {
            partition_key: partition.partition_key,
            buckets: partition.buckets,
            bucket_interval: self.inner.bucket_interval(),
        })
    }

    /// Returns the remaining time until the next partition is ready to be flushed
    /// using [`Self::try_flush_next`].
    pub fn next_flush_at(&mut self, now: SystemTime) -> Duration {
        let next_flush = SystemTime::UNIX_EPOCH + self.inner.next_flush_at();

        match now.duration_since(next_flush) {
            Ok(_) => Duration::ZERO,
            Err(err) => err.duration(),
        }
    }

    /// Consumes the aggregator and returns all contained partitions.
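    ///
    /// # Example
    ///
    /// A sketch of draining remaining metrics on shutdown (assumes `aggregator` is an
    /// [`Aggregator`] that no longer receives submissions):
    ///
    /// ```ignore
    /// for partition in aggregator.into_partitions() {
    ///     for (project_key, bucket) in partition {
    ///         // ... forward the remaining bucket upstream ...
    ///     }
    /// }
    /// ```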
    pub fn into_partitions(self) -> impl Iterator<Item = Partition> {
        let bucket_interval = self.inner.bucket_interval();

        emit_stats(&self.name, self.inner.stats());

        self.inner.into_partitions().map(move |p| Partition {
            partition_key: p.partition_key,
            buckets: p.buckets,
            bucket_interval,
        })
    }
}

/// A flushed partition from [`Aggregator::try_flush_next`].
///
/// The partition contains the partition key and all flushed buckets.
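///
/// # Example
///
/// A sketch of consuming a flushed partition (assumes `partition` was returned by
/// [`Aggregator::try_flush_next`]):
///
/// ```ignore
/// // The partition key can be used to route all buckets of this partition to the
/// // same downstream consumer.
/// let key = partition.partition_key;
///
/// for (project_key, bucket) in partition {
///     // ... serialize and forward `bucket` for `project_key` ...
/// }
/// ```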
pub struct Partition {
    /// The partition key.
    pub partition_key: u32,
    buckets: HashMap<BucketKey, BucketData>,
    bucket_interval: u64,
}

impl IntoIterator for Partition {
    type Item = (ProjectKey, Bucket);
    type IntoIter = PartitionIter;

    fn into_iter(self) -> Self::IntoIter {
        PartitionIter {
            inner: self.buckets.into_iter(),
            bucket_interval: self.bucket_interval,
        }
    }
}

/// Iterator yielded from [`Partition::into_iter`].
pub struct PartitionIter {
    inner: hashbrown::hash_map::IntoIter<BucketKey, BucketData>,
    bucket_interval: u64,
}

impl Iterator for PartitionIter {
    type Item = (ProjectKey, Bucket);

    fn next(&mut self) -> Option<Self::Item> {
        let (key, data) = self.inner.next()?;

        Some((
            key.project_key,
            Bucket {
                timestamp: key.timestamp,
                width: self.bucket_interval,
                name: key.metric_name,
                tags: key.tags,
                value: data.value,
                metadata: data.metadata,
            },
        ))
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        self.inner.size_hint()
    }
}

impl std::iter::ExactSizeIterator for PartitionIter {
    fn len(&self) -> usize {
        self.inner.len()
    }
}

impl std::iter::FusedIterator for PartitionIter {}

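/// Emits the aggregator's current bucket count and total bucket cost per namespace as gauges.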
fn emit_stats(name: &str, stats: inner::Stats) {
    for namespace in MetricNamespace::all() {
        relay_statsd::metric!(
            gauge(MetricGauges::Buckets) = *stats.count_by_namespace.get(namespace),
            namespace = namespace.as_str(),
            aggregator = name
        );
        relay_statsd::metric!(
            gauge(MetricGauges::BucketsCost) = *stats.cost_by_namespace.get(namespace),
            namespace = namespace.as_str(),
            aggregator = name
        );
    }
}

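/// Emits flush statistics for a flushed partition: a flush counter as well as per-namespace
/// merge misses (created buckets), merge hits, and flushed bucket cost.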
fn emit_flush_partition_stats(name: &str, stats: inner::PartitionStats) {
    relay_statsd::metric!(counter(MetricCounters::FlushCount) += 1, aggregator = name);

    for namespace in MetricNamespace::all() {
        relay_statsd::metric!(
            counter(MetricCounters::MergeMiss) += *stats.count_by_namespace.get(namespace),
            namespace = namespace.as_str(),
            aggregator = name,
        );
        relay_statsd::metric!(
            counter(MetricCounters::MergeHit) += *stats.merges_by_namespace.get(namespace),
            namespace = namespace.as_str(),
            aggregator = name,
        );
        relay_statsd::metric!(
            counter(MetricCounters::FlushCost) += *stats.cost_by_namespace.get(namespace),
            namespace = namespace.as_str(),
            aggregator = name,
        );
    }
}