// relay_server/services/autoscaling.rs
use crate::services::buffer::PartitionedEnvelopeBuffer;
use crate::MemoryStat;
use relay_system::{AsyncResponse, Controller, FromMessage, Handle, Interface, Sender, Service};
/// Service that answers autoscaling metric checks with a snapshot of
/// memory usage, envelope buffer statistics and per-service utilization.
pub struct AutoscalingMetricService {
/// Source of the memory reading included in every response.
memory_stat: MemoryStat,
/// Envelope buffer queried for its total storage size and item count.
envelope_buffer: PartitionedEnvelopeBuffer,
/// Handle used to read the current metrics of the running services.
handle: Handle,
/// Liveness flag reported to callers: `1` after construction, set to `0`
/// once a shutdown notification has been observed.
up: u8,
}
impl AutoscalingMetricService {
    /// Creates a new autoscaling metric service.
    ///
    /// The service takes ownership of the memory statistics source, the
    /// envelope buffer and the runtime handle. It starts out reporting
    /// itself as up (`up == 1`).
    pub fn new(
        memory_stat: MemoryStat,
        envelope_buffer: PartitionedEnvelopeBuffer,
        handle: Handle,
    ) -> Self {
        // A freshly created service is considered alive until shutdown is signalled.
        let up = 1;

        Self {
            up,
            handle,
            envelope_buffer,
            memory_stat,
        }
    }
}
impl Service for AutoscalingMetricService {
    type Interface = AutoscalingMetrics;

    /// Serves [`AutoscalingMetrics`] messages until the task is dropped.
    ///
    /// Each `Check` message is answered with an [`AutoscalingData`] snapshot.
    /// A shutdown notification does not stop the loop; it only flips `up` to
    /// `0` so that later checks report the service as going down. The loop
    /// contains no `break`.
    async fn run(mut self, mut rx: relay_system::Receiver<Self::Interface>) {
        let mut shutdown = Controller::shutdown_handle();

        loop {
            tokio::select! {
                _ = shutdown.notified() => {
                    // Keep answering checks, but advertise that we are shutting down.
                    self.up = 0;
                },
                Some(request) = rx.recv() => {
                    // `Check` is the only variant of the interface.
                    let AutoscalingMetrics::Check(reply) = request;

                    let memory = self.memory_stat.memory();
                    let services_metrics: Vec<ServiceUtilization> = self
                        .handle
                        .current_services_metrics()
                        .iter()
                        .map(|(service, stats)| {
                            ServiceUtilization(service.name(), stats.utilization)
                        })
                        .collect();

                    reply.send(AutoscalingData {
                        memory_usage: memory.used_percent(),
                        up: self.up,
                        total_size: self.envelope_buffer.total_storage_size(),
                        item_count: self.envelope_buffer.item_count(),
                        services_metrics,
                    });
                }
            }
        }
    }
}
/// Kinds of requests that can be sent to the autoscaling metric service.
///
/// Converted into an [`AutoscalingMetrics`] message via `FromMessage`.
pub enum AutoscalingMessageKind {
/// Request the current autoscaling data snapshot.
Check,
}
/// Message interface of the autoscaling metric service.
pub enum AutoscalingMetrics {
/// Request for a snapshot; the carried sender receives the resulting
/// [`AutoscalingData`].
Check(Sender<AutoscalingData>),
}
// Marks `AutoscalingMetrics` as a service interface; the trait needs no items here.
impl Interface for AutoscalingMetrics {}
impl FromMessage<AutoscalingMessageKind> for AutoscalingMetrics {
    /// Checks are answered asynchronously with an [`AutoscalingData`] value.
    type Response = AsyncResponse<AutoscalingData>;

    /// Wraps the request kind together with its response sender into the
    /// matching interface message.
    fn from_message(message: AutoscalingMessageKind, sender: Sender<AutoscalingData>) -> Self {
        match message {
            AutoscalingMessageKind::Check => Self::Check(sender),
        }
    }
}
/// Snapshot returned in response to an autoscaling check.
pub struct AutoscalingData {
/// Value of `used_percent()` from the memory reading taken for this check.
pub memory_usage: f32,
/// Liveness flag of the service: `1` while running, `0` after a shutdown
/// notification has been observed.
pub up: u8,
/// Total storage size reported by the envelope buffer.
/// NOTE(review): unit is not visible here (presumably bytes) — confirm.
pub total_size: u64,
/// Item count reported by the envelope buffer.
pub item_count: u64,
/// One entry per service returned by the runtime handle's current metrics.
pub services_metrics: Vec<ServiceUtilization>,
}
/// Utilization of a single service.
///
/// Field `0` is the service name and field `1` is its utilization value as
/// reported by the service metrics.
/// NOTE(review): the scale of the utilization value is not visible here
/// (presumably a percentage) — confirm.
///
/// The type is plain data, so it derives the common traits to make it easy
/// to log, copy and compare.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ServiceUtilization(pub &'static str, pub u8);