relay_metrics/aggregator/config.rs

1use serde::{Deserialize, Serialize};
2
3/// Configuration value for [`AggregatorConfig::flush_batching`].
4#[derive(Clone, Copy, Debug, Default, Deserialize, Serialize)]
5#[serde(rename_all = "lowercase")]
6pub enum FlushBatching {
7    /// Shifts the flush time by an offset based on the project key.
8    ///
9    /// This allows buckets from the same project to be flushed together.
10    #[default]
11    Project,
12
13    /// Shifts the flush time by an offset based on the bucket key itself.
14    ///
15    /// This allows for a completely random distribution of bucket flush times.
16    ///
17    /// It should only be used in processing Relays since this flushing behavior it's better
18    /// suited for how Relay emits metrics to Kafka.
19    Bucket,
20
21    /// Shifts the flush time by an offset based on the partition key.
22    ///
23    /// This allows buckets belonging to the same partition to be flushed together. Requires
24    /// [`flush_partitions`](AggregatorConfig::flush_partitions) to be set, otherwise this will fall
25    /// back to [`FlushBatching::Project`].
26    ///
27    /// It should only be used in Relays with the `http.global_metrics` option set since when
28    /// encoding metrics via the global endpoint we can leverage partitioning.
29    Partition,
30
31    /// Do not apply shift.
32    None,
33}
34
/// Parameters used by the [`crate::aggregator::Aggregator`].
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(default)]
pub struct AggregatorConfig {
    /// Determines the wall clock time interval for buckets in seconds.
    ///
    /// Defaults to `10` seconds. Every metric is sorted into a bucket of this size based on its
    /// timestamp. This defines the minimum granularity with which metrics can be queried later.
    pub bucket_interval: u32,

    /// The aggregator size.
    ///
    /// This determines how many individual bucket intervals are stored in the aggregator.
    /// Increasing this number will add additional delay for backdated and future buckets.
    ///
    /// Defaults to `1`.
    pub aggregator_size: u32,

    /// The initial delay in seconds to wait before flushing a bucket.
    ///
    /// Defaults to `30` seconds. Before sending an aggregated bucket, this is the time Relay waits
    /// for buckets that are being reported in real time.
    pub initial_delay: u32,

    /// The age in seconds of the oldest allowed bucket timestamp.
    ///
    /// Defaults to 5 days.
    pub max_secs_in_past: u64,

    /// The time in seconds that a timestamp may be in the future.
    ///
    /// Defaults to 1 minute.
    pub max_secs_in_future: u64,

    /// Maximum amount of bytes used for metrics aggregation per project key.
    ///
    /// Similar measuring technique to [`Self::max_total_bucket_bytes`], but instead of a
    /// global/process-wide limit, it is enforced per project key.
    ///
    /// Defaults to `None`, i.e. no limit.
    pub max_project_key_bucket_bytes: Option<u64>,

    /// Maximum amount of bytes used for metrics aggregation.
    ///
    /// When aggregating metrics, Relay keeps track of how many bytes a metric takes in memory.
    /// This is only an approximation and does not take into account things such as pre-allocation
    /// in hashmaps.
    ///
    /// Defaults to `None`, i.e. no limit.
    pub max_total_bucket_bytes: Option<u64>,

    /// The number of logical partitions that can receive flushed buckets.
    ///
    /// Defaults to `None`, i.e. no partitioning.
    pub flush_partitions: Option<u32>,

    /// The batching mode for the flushing of the aggregator.
    ///
    /// Batching is applied via shifts to the flushing time that is determined when the first bucket
    /// is inserted. Thanks to the shifts, Relay is able to prevent flushing all buckets from a
    /// bucket interval at the same time.
    ///
    /// For example, the aggregator can choose to shift by the same value all buckets within a given
    /// partition, effectively allowing all the elements of that partition to be flushed together.
    #[serde(alias = "shift_key")]
    pub flush_batching: FlushBatching,
}
100
101impl Default for AggregatorConfig {
102    fn default() -> Self {
103        Self {
104            bucket_interval: 10,
105            aggregator_size: 1,
106            initial_delay: 30,
107            max_secs_in_past: 5 * 24 * 60 * 60, // 5 days
108            max_secs_in_future: 60,             // 1 minute
109            max_project_key_bucket_bytes: None,
110            max_total_bucket_bytes: None,
111            flush_batching: FlushBatching::default(),
112            flush_partitions: None,
113        }
114    }
115}