// relay_server/services/autoscaling.rs

use crate::MemoryStat;
use crate::services::buffer::PartitionedEnvelopeBuffer;
use crate::services::processor::EnvelopeProcessorServicePool;
use relay_system::{
    AsyncResponse, Controller, FromMessage, Handle, Interface, RuntimeMetrics, Sender, Service,
};
use tokio::time::Instant;

9/// Service that tracks internal relay metrics so that they can be exposed.
10pub struct AutoscalingMetricService {
11    /// For exposing internal memory usage of relay.
12    memory_stat: MemoryStat,
13    /// Reference to the spooler to get item count and total used size.
14    envelope_buffer: PartitionedEnvelopeBuffer,
15    /// Runtime handle to expose service utilization metrics.
16    handle: Handle,
17    /// Gives access to runtime metrics.
18    runtime_metrics: RuntimeMetrics,
19    /// The last time the runtime utilization was checked.
20    last_runtime_check: Instant,
21    /// This will always report `1` unless the instance is shutting down.
22    up: u8,
23    /// Gives access to AsyncPool metrics.
24    async_pool: EnvelopeProcessorServicePool,
25}
26
27impl AutoscalingMetricService {
28    pub fn new(
29        memory_stat: MemoryStat,
30        envelope_buffer: PartitionedEnvelopeBuffer,
31        handle: Handle,
32        async_pool: EnvelopeProcessorServicePool,
33    ) -> Self {
34        let runtime_metrics = handle.metrics();
35        Self {
36            memory_stat,
37            envelope_buffer,
38            handle,
39            runtime_metrics,
40            last_runtime_check: Instant::now(),
41            async_pool,
42            up: 1,
43        }
44    }
45}
46
47impl Service for AutoscalingMetricService {
48    type Interface = AutoscalingMetrics;
49
50    async fn run(mut self, mut rx: relay_system::Receiver<Self::Interface>) {
51        let mut shutdown = Controller::shutdown_handle();
52        loop {
53            tokio::select! {
54                _ = shutdown.notified() => {
55                    self.up = 0;
56                },
57                Some(message) = rx.recv() => {
58                    match message {
59                        AutoscalingMetrics::Check(sender) => {
60                            let memory_usage = self.memory_stat.memory();
61                            let metrics = self.handle
62                                .current_services_metrics()
63                                .iter()
64                                .map(|(id, metric)| ServiceUtilization {
65                                    name: id.name(),
66                                    instance_id: id.instance_id(),
67                                    utilization: metric.utilization
68                                }
69                            )
70                                .collect();
71                            let worker_pool_utilization = self.async_pool.metrics().total_utilization();
72                            let runtime_utilization = self.runtime_utilization();
73
74                            sender.send(AutoscalingData {
75                                memory_usage: memory_usage.used_percent(),
76                                up: self.up,
77                                total_size: self.envelope_buffer.total_storage_size(),
78                                item_count: self.envelope_buffer.item_count(),
79                                services_metrics: metrics,
80                                worker_pool_utilization,
81                                runtime_utilization
82                            });
83                        }
84                    }
85                }
86            }
87        }
88    }
89}
90
91impl AutoscalingMetricService {
92    fn runtime_utilization(&mut self) -> u8 {
93        let last_checked = self.last_runtime_check.elapsed().as_secs_f64();
94        // Prevent division by 0 in case it's checked in rapid succession.
95        if last_checked < 0.001 {
96            return 0;
97        }
98        let avg_utilization = (0..self.runtime_metrics.num_workers())
99            .map(|worker_id| self.runtime_metrics.worker_total_busy_duration(worker_id))
100            .map(|busy| busy.as_secs_f64())
101            .sum::<f64>()
102            / last_checked
103            / (self.runtime_metrics.num_workers() as f64);
104
105        self.last_runtime_check = Instant::now();
106
107        (avg_utilization * 100.0).min(100.0) as u8
108    }
109}
110
/// Supported operations within the internal metrics service.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AutoscalingMessageKind {
    /// Requests the current data from the service.
    Check,
}

117/// This mirrors the same messages as [`AutoscalingMessageKind`] but it can be augmented
118/// with additional data necessary for the service framework, for example a Sender.
119pub enum AutoscalingMetrics {
120    Check(Sender<AutoscalingData>),
121}
122
123impl Interface for AutoscalingMetrics {}
124
125impl FromMessage<AutoscalingMessageKind> for AutoscalingMetrics {
126    type Response = AsyncResponse<AutoscalingData>;
127
128    fn from_message(message: AutoscalingMessageKind, sender: Sender<AutoscalingData>) -> Self {
129        match message {
130            AutoscalingMessageKind::Check => AutoscalingMetrics::Check(sender),
131        }
132    }
133}
134
135/// Contains data that is used for autoscaling.
136pub struct AutoscalingData {
137    /// Memory usage of relay.
138    pub memory_usage: f32,
139    /// Is `1` if relay is running, `0` if it's shutting down.
140    pub up: u8,
141    /// The total number of bytes used by the spooler.
142    pub total_size: u64,
143    /// The total number of envelopes in the spooler.
144    pub item_count: u64,
145    /// Worker pool utilization in percent.
146    pub worker_pool_utilization: u8,
147    /// List of service utilization.
148    pub services_metrics: Vec<ServiceUtilization>,
149    /// Utilization of the async runtime.
150    pub runtime_utilization: u8,
151}
152
/// Contains the minimal required information for service utilization.
///
/// A service can have multiple instances which will all have the same name.
/// Those instances are distinguished by the `instance_id`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ServiceUtilization {
    /// The service name.
    pub name: &'static str,
    /// The id of the specific service instance.
    pub instance_id: u32,
    /// Utilization as percentage.
    pub utilization: u8,
}

166impl ServiceUtilization {
167    pub fn new(name: &'static str, instance_id: u32, utilization: u8) -> Self {
168        Self {
169            name,
170            instance_id,
171            utilization,
172        }
173    }
174}