// relay_server/services/autoscaling.rs
use crate::MemoryStat;
use crate::services::buffer::PartitionedEnvelopeBuffer;
use crate::services::processor::EnvelopeProcessorServicePool;
use relay_system::{
    AsyncResponse, Controller, FromMessage, Handle, Interface, RuntimeMetrics, Sender, Service,
};
use tokio::time::Instant;
8
9pub struct AutoscalingMetricService {
11 memory_stat: MemoryStat,
13 envelope_buffer: PartitionedEnvelopeBuffer,
15 handle: Handle,
17 runtime_metrics: RuntimeMetrics,
19 last_runtime_check: Instant,
21 up: u8,
23 async_pool: EnvelopeProcessorServicePool,
25}
26
27impl AutoscalingMetricService {
28 pub fn new(
29 memory_stat: MemoryStat,
30 envelope_buffer: PartitionedEnvelopeBuffer,
31 handle: Handle,
32 async_pool: EnvelopeProcessorServicePool,
33 ) -> Self {
34 let runtime_metrics = handle.metrics();
35 Self {
36 memory_stat,
37 envelope_buffer,
38 handle,
39 runtime_metrics,
40 last_runtime_check: Instant::now(),
41 async_pool,
42 up: 1,
43 }
44 }
45}
46
47impl Service for AutoscalingMetricService {
48 type Interface = AutoscalingMetrics;
49
50 async fn run(mut self, mut rx: relay_system::Receiver<Self::Interface>) {
51 let mut shutdown = Controller::shutdown_handle();
52 loop {
53 tokio::select! {
54 _ = shutdown.notified() => {
55 self.up = 0;
56 },
57 Some(message) = rx.recv() => {
58 match message {
59 AutoscalingMetrics::Check(sender) => {
60 let memory_usage = self.memory_stat.memory();
61 let metrics = self.handle
62 .current_services_metrics()
63 .iter()
64 .map(|(id, metric)| ServiceUtilization {
65 name: id.name(),
66 instance_id: id.instance_id(),
67 utilization: metric.utilization
68 }
69 )
70 .collect();
71 let worker_pool_utilization = self.async_pool.metrics().total_utilization();
72 let runtime_utilization = self.runtime_utilization();
73
74 sender.send(AutoscalingData {
75 memory_usage: memory_usage.used_percent(),
76 up: self.up,
77 total_size: self.envelope_buffer.total_storage_size(),
78 item_count: self.envelope_buffer.item_count(),
79 services_metrics: metrics,
80 worker_pool_utilization,
81 runtime_utilization
82 });
83 }
84 }
85 }
86 }
87 }
88 }
89}
90
91impl AutoscalingMetricService {
92 fn runtime_utilization(&mut self) -> u8 {
93 let last_checked = self.last_runtime_check.elapsed().as_secs_f64();
94 if last_checked < 0.001 {
96 return 0;
97 }
98 let avg_utilization = (0..self.runtime_metrics.num_workers())
99 .map(|worker_id| self.runtime_metrics.worker_total_busy_duration(worker_id))
100 .map(|busy| busy.as_secs_f64())
101 .sum::<f64>()
102 / last_checked
103 / (self.runtime_metrics.num_workers() as f64);
104
105 self.last_runtime_check = Instant::now();
106
107 (avg_utilization * 100.0).min(100.0) as u8
108 }
109}
110
/// The kinds of request that can be sent to the autoscaling metrics service.
pub enum AutoscalingMessageKind {
    /// Requests a snapshot of the current metrics.
    Check,
}
116
117pub enum AutoscalingMetrics {
120 Check(Sender<AutoscalingData>),
121}
122
123impl Interface for AutoscalingMetrics {}
124
125impl FromMessage<AutoscalingMessageKind> for AutoscalingMetrics {
126 type Response = AsyncResponse<AutoscalingData>;
127
128 fn from_message(message: AutoscalingMessageKind, sender: Sender<AutoscalingData>) -> Self {
129 match message {
130 AutoscalingMessageKind::Check => AutoscalingMetrics::Check(sender),
131 }
132 }
133}
134
135pub struct AutoscalingData {
137 pub memory_usage: f32,
139 pub up: u8,
141 pub total_size: u64,
143 pub item_count: u64,
145 pub worker_pool_utilization: u8,
147 pub services_metrics: Vec<ServiceUtilization>,
149 pub runtime_utilization: u8,
151}
152
/// Utilization figure of a single service instance.
///
/// An entry is identified by the service `name` together with its `instance_id`.
pub struct ServiceUtilization {
    /// Name of the service.
    pub name: &'static str,
    /// Id of the specific service instance.
    pub instance_id: u32,
    /// Utilization value reported for this instance.
    pub utilization: u8,
}
165
166impl ServiceUtilization {
167 pub fn new(name: &'static str, instance_id: u32, utilization: u8) -> Self {
168 Self {
169 name,
170 instance_id,
171 utilization,
172 }
173 }
174}