relay_metrics/aggregator/config.rs

use serde::{Deserialize, Serialize};

/// Configuration value for [`AggregatorConfig::flush_batching`].
#[derive(Clone, Copy, Debug, Default, Deserialize, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum FlushBatching {
    /// Shifts the flush time by an offset based on the project key.
    ///
    /// This allows buckets from the same project to be flushed together.
    #[default]
    Project,

    /// Shifts the flush time by an offset based on the bucket key itself.
    ///
    /// This allows for a completely random distribution of bucket flush times.
    ///
    /// It should only be used in processing Relays, since this flushing behavior is better
    /// suited to how Relay emits metrics to Kafka.
    Bucket,

    /// Shifts the flush time by an offset based on the partition key.
    ///
    /// This allows buckets belonging to the same partition to be flushed together. Requires
    /// [`flush_partitions`](AggregatorConfig::flush_partitions) to be set, otherwise this will fall
    /// back to [`FlushBatching::Project`].
    ///
    /// It should only be used in Relays with the `http.global_metrics` option set, since
    /// partitioning can be leveraged when encoding metrics via the global endpoint.
    Partition,

    /// Do not apply a shift.
    None,
}
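
// A minimal sketch (not part of the original module) of how the
// `rename_all = "lowercase"` attribute above plays out when deserializing.
// `serde_json` is assumed purely for illustration; any self-describing serde
// format behaves the same way:
//
//     let batching: FlushBatching = serde_json::from_str("\"partition\"").unwrap();
//     assert!(matches!(batching, FlushBatching::Partition));
//
// When the field is omitted entirely, `#[serde(default)]` on
// [`AggregatorConfig`] falls back to `FlushBatching::Project` via the
// `#[default]` attribute on that variant.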

/// Parameters used by the [`crate::aggregator::Aggregator`].
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(default)]
pub struct AggregatorConfig {
    /// Determines the wall clock time interval for buckets in seconds.
    ///
    /// Defaults to `10` seconds. Every metric is sorted into a bucket of this size based on its
    /// timestamp. This defines the minimum granularity with which metrics can be queried later.
    pub bucket_interval: u32,

    /// The aggregator size.
    ///
    /// This determines how many individual bucket intervals are stored in the aggregator.
    /// Increasing this number adds additional delay for backdated and future buckets.
    ///
    /// Defaults to `1`.
    pub aggregator_size: u32,

    /// The initial delay in seconds to wait before flushing a bucket.
    ///
    /// Defaults to `30` seconds. Before sending an aggregated bucket, this is the time Relay waits
    /// for buckets that are being reported in real time.
    pub initial_delay: u32,

    /// The age in seconds of the oldest allowed bucket timestamp.
    ///
    /// Defaults to 5 days.
    pub max_secs_in_past: u64,

    /// The time in seconds that a timestamp may be in the future.
    ///
    /// Defaults to 1 minute.
    pub max_secs_in_future: u64,

    /// Maximum amount of bytes used for metrics aggregation per project key.
    ///
    /// Uses the same measuring technique as [`Self::max_total_bucket_bytes`], but the limit
    /// is enforced per project key instead of globally for the whole process.
    ///
    /// Defaults to `None`, i.e. no limit.
    pub max_project_key_bucket_bytes: Option<u64>,

    /// Maximum amount of bytes used for metrics aggregation.
    ///
    /// When aggregating metrics, Relay keeps track of how many bytes a metric takes in memory.
    /// This is only an approximation and does not take into account things such as pre-allocation
    /// in hashmaps.
    ///
    /// Defaults to `None`, i.e. no limit.
    pub max_total_bucket_bytes: Option<u64>,

    /// The number of logical partitions that can receive flushed buckets.
    pub flush_partitions: Option<u32>,

    /// The batching mode for the flushing of the aggregator.
    ///
    /// Batching is applied by shifting the flush time, which is determined when the first
    /// bucket is inserted. These shifts prevent Relay from flushing all buckets of a bucket
    /// interval at the same time.
    ///
    /// For example, the aggregator can choose to shift by the same value all buckets within a given
    /// partition, effectively allowing all the elements of that partition to be flushed together.
    #[serde(alias = "shift_key")]
    pub flush_batching: FlushBatching,
}
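
// A minimal sketch (not part of the original module) showing how
// `#[serde(default)]` and the `shift_key` alias interact when parsing a
// partial config. `serde_json` is again assumed purely for illustration:
//
//     let config: AggregatorConfig = serde_json::from_str(
//         r#"{"bucket_interval": 30, "shift_key": "bucket"}"#,
//     ).unwrap();
//     assert_eq!(config.bucket_interval, 30);
//     assert!(matches!(config.flush_batching, FlushBatching::Bucket));
//     // Every omitted field takes its value from `AggregatorConfig::default()`:
//     assert_eq!(config.initial_delay, 30);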

impl Default for AggregatorConfig {
    fn default() -> Self {
        Self {
            bucket_interval: 10,
            aggregator_size: 1,
            initial_delay: 30,
            max_secs_in_past: 5 * 24 * 60 * 60, // 5 days
            max_secs_in_future: 60,             // 1 minute
            max_project_key_bucket_bytes: None,
            max_total_bucket_bytes: None,
            flush_batching: FlushBatching::default(),
            flush_partitions: None,
        }
    }
}
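
#[cfg(test)]
mod tests {
    use super::*;

    // A hypothetical regression test (not from the original source) that keeps
    // the `Default` impl in sync with the defaults documented on the fields.
    #[test]
    fn default_matches_documented_values() {
        let config = AggregatorConfig::default();
        assert_eq!(config.bucket_interval, 10);
        assert_eq!(config.aggregator_size, 1);
        assert_eq!(config.initial_delay, 30);
        assert_eq!(config.max_secs_in_past, 5 * 24 * 60 * 60);
        assert_eq!(config.max_secs_in_future, 60);
        assert!(config.max_project_key_bucket_bytes.is_none());
        assert!(config.max_total_bucket_bytes.is_none());
        assert!(matches!(config.flush_batching, FlushBatching::Project));
    }
}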