relay_server/services/
autoscaling.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
use crate::services::buffer::PartitionedEnvelopeBuffer;
use crate::MemoryStat;
use relay_system::{AsyncResponse, Controller, FromMessage, Handle, Interface, Sender, Service};

/// Service that tracks internal relay metrics so that they can be exposed.
pub struct AutoscalingMetricService {
    /// For exposing internal memory usage of relay.
    memory_stat: MemoryStat,
    /// Reference to the spooler to get item count and total used size.
    envelope_buffer: PartitionedEnvelopeBuffer,
    /// Runtime handle to expose service utilization metrics.
    handle: Handle,
    /// This will always report `1` unless the instance is shutting down.
    up: u8,
}

impl AutoscalingMetricService {
    pub fn new(
        memory_stat: MemoryStat,
        envelope_buffer: PartitionedEnvelopeBuffer,
        handle: Handle,
    ) -> Self {
        Self {
            memory_stat,
            envelope_buffer,
            handle,
            up: 1,
        }
    }
}

impl Service for AutoscalingMetricService {
    type Interface = AutoscalingMetrics;

    async fn run(mut self, mut rx: relay_system::Receiver<Self::Interface>) {
        let mut shutdown = Controller::shutdown_handle();
        loop {
            tokio::select! {
                _ = shutdown.notified() => {
                    self.up = 0;
                },
                Some(message) = rx.recv() => {
                    match message {
                        AutoscalingMetrics::Check(sender) => {
                            let memory_usage = self.memory_stat.memory();
                            let metrics = self.handle
                                .current_services_metrics()
                                .iter()
                                .map(|(id, metric)| ServiceUtilization(id.name(), metric.utilization))
                                .collect();
                            sender.send(AutoscalingData {
                                memory_usage: memory_usage.used_percent(),
                                up: self.up,
                                total_size: self.envelope_buffer.total_storage_size(),
                                item_count: self.envelope_buffer.item_count(),
                                services_metrics: metrics
                            });
                        }
                    }
                }
            }
        }
    }
}

/// Supported operations within the internal metrics service.
pub enum AutoscalingMessageKind {
    /// Requests the current data from the service.
    Check,
}

/// This mirrors the same messages as [`AutoscalingMessageKind`] but it can be augmented
/// with additional data necessary for the service framework, for example a Sender.
pub enum AutoscalingMetrics {
    Check(Sender<AutoscalingData>),
}

impl Interface for AutoscalingMetrics {}

impl FromMessage<AutoscalingMessageKind> for AutoscalingMetrics {
    type Response = AsyncResponse<AutoscalingData>;

    fn from_message(message: AutoscalingMessageKind, sender: Sender<AutoscalingData>) -> Self {
        match message {
            AutoscalingMessageKind::Check => AutoscalingMetrics::Check(sender),
        }
    }
}

pub struct AutoscalingData {
    pub memory_usage: f32,
    pub up: u8,
    pub total_size: u64,
    pub item_count: u64,
    pub services_metrics: Vec<ServiceUtilization>,
}

pub struct ServiceUtilization(pub &'static str, pub u8);