relay_server/services/
health_check.rs

1use std::sync::Arc;
2
3use relay_config::Config;
4use relay_system::{Addr, AsyncResponse, Controller, FromMessage, Interface, Sender, Service};
5use std::future::Future;
6use tokio::sync::watch;
7use tokio::time::{Instant, timeout};
8
9use crate::services::buffer::PartitionedEnvelopeBuffer;
10use crate::services::metrics::RouterHandle;
11use crate::services::upstream::{IsAuthenticated, UpstreamRelay};
12use crate::statsd::RelayTimers;
13use crate::utils::{MemoryCheck, MemoryChecker};
14
15/// Checks whether Relay is alive and healthy based on its variant.
16#[derive(Clone, Copy, Debug, serde::Deserialize)]
17pub enum IsHealthy {
18    /// Check if the Relay is alive at all.
19    #[serde(rename = "live")]
20    Liveness,
21    /// Check if the Relay is in a state where the load balancer should route traffic to it (i.e.
22    /// it's both live/alive and not too busy).
23    #[serde(rename = "ready")]
24    Readiness,
25}
26
/// Health check status.
///
/// Implements [`PartialEq`] and [`Eq`] so that statuses can be compared directly with `==` or
/// `assert_eq!` rather than only through pattern matching.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum Status {
    /// Relay is healthy.
    Healthy,
    /// Relay is unhealthy.
    Unhealthy,
}
35
36impl From<bool> for Status {
37    fn from(value: bool) -> Self {
38        match value {
39            true => Self::Healthy,
40            false => Self::Unhealthy,
41        }
42    }
43}
44
45impl FromIterator<Status> for Status {
46    fn from_iter<T: IntoIterator<Item = Status>>(iter: T) -> Self {
47        let healthy = iter
48            .into_iter()
49            .all(|status| matches!(status, Self::Healthy));
50        Self::from(healthy)
51    }
52}
53
54/// Service interface for the [`IsHealthy`] message.
55pub struct HealthCheck(IsHealthy, Sender<Status>);
56
57impl Interface for HealthCheck {}
58
59impl FromMessage<IsHealthy> for HealthCheck {
60    type Response = AsyncResponse<Status>;
61
62    fn from_message(message: IsHealthy, sender: Sender<Status>) -> Self {
63        Self(message, sender)
64    }
65}
66
67#[derive(Debug)]
68struct StatusUpdate {
69    status: Status,
70    instant: Instant,
71}
72
73impl StatusUpdate {
74    pub fn new(status: Status) -> Self {
75        Self {
76            status,
77            instant: Instant::now(),
78        }
79    }
80}
81
82/// Service implementing the [`HealthCheck`] interface.
83#[derive(Debug)]
84pub struct HealthCheckService {
85    config: Arc<Config>,
86    memory_checker: MemoryChecker,
87    aggregator: RouterHandle,
88    upstream_relay: Addr<UpstreamRelay>,
89    envelope_buffer: PartitionedEnvelopeBuffer,
90}
91
92impl HealthCheckService {
93    /// Creates a new instance of the HealthCheck service.
94    pub fn new(
95        config: Arc<Config>,
96        memory_checker: MemoryChecker,
97        aggregator: RouterHandle,
98        upstream_relay: Addr<UpstreamRelay>,
99        envelope_buffer: PartitionedEnvelopeBuffer,
100    ) -> Self {
101        Self {
102            config,
103            memory_checker,
104            aggregator,
105            upstream_relay,
106            envelope_buffer,
107        }
108    }
109
110    fn system_memory_probe(&mut self) -> Status {
111        if let MemoryCheck::Exceeded(memory) = self.memory_checker.check_memory_percent() {
112            relay_log::error!(
113                "Not enough memory, {} / {} ({:.2}% >= {:.2}%)",
114                memory.used,
115                memory.total,
116                memory.used_percent() * 100.0,
117                self.config.health_max_memory_watermark_percent() * 100.0,
118            );
119            return Status::Unhealthy;
120        }
121
122        if let MemoryCheck::Exceeded(memory) = self.memory_checker.check_memory_bytes() {
123            relay_log::error!(
124                "Not enough memory, {} / {} ({} >= {})",
125                memory.used,
126                memory.total,
127                memory.used,
128                self.config.health_max_memory_watermark_bytes(),
129            );
130            return Status::Unhealthy;
131        }
132
133        Status::Healthy
134    }
135
136    async fn auth_probe(&self) -> Status {
137        if !self.config.requires_auth() {
138            return Status::Healthy;
139        }
140
141        self.upstream_relay
142            .send(IsAuthenticated)
143            .await
144            .map_or(Status::Unhealthy, Status::from)
145    }
146
147    async fn aggregator_probe(&self) -> Status {
148        Status::from(self.aggregator.can_accept_metrics())
149    }
150
151    async fn spool_health_probe(&self) -> Status {
152        match self.envelope_buffer.has_capacity() {
153            true => Status::Healthy,
154            false => Status::Unhealthy,
155        }
156    }
157
158    async fn probe(&self, name: &'static str, fut: impl Future<Output = Status>) -> Status {
159        match timeout(self.config.health_probe_timeout(), fut).await {
160            Err(_) => {
161                relay_log::error!("Health check probe '{name}' timed out");
162                Status::Unhealthy
163            }
164            Ok(Status::Unhealthy) => {
165                relay_log::error!("Health check probe '{name}' failed");
166                Status::Unhealthy
167            }
168            Ok(Status::Healthy) => Status::Healthy,
169        }
170    }
171
172    async fn check_readiness(&mut self) -> Status {
173        // System memory is sync and requires mutable access, but we still want to log errors.
174        let sys_mem = self.system_memory_probe();
175
176        let (sys_mem, auth, agg, proj) = tokio::join!(
177            self.probe("system memory", async { sys_mem }),
178            self.probe("auth", self.auth_probe()),
179            self.probe("aggregator", self.aggregator_probe()),
180            self.probe("spool health", self.spool_health_probe()),
181        );
182
183        Status::from_iter([sys_mem, auth, agg, proj])
184    }
185}
186
187impl Service for HealthCheckService {
188    type Interface = HealthCheck;
189
190    async fn run(mut self, mut rx: relay_system::Receiver<Self::Interface>) {
191        let (update_tx, update_rx) = watch::channel(StatusUpdate::new(Status::Unhealthy));
192        let check_interval = self.config.health_refresh_interval();
193        // Add 10% buffer to the internal timeouts to avoid race conditions.
194        let status_timeout = (check_interval + self.config.health_probe_timeout()).mul_f64(1.1);
195
196        relay_system::spawn!(async move {
197            let shutdown = Controller::shutdown_handle();
198
199            while shutdown.get().is_none() {
200                let _ = update_tx.send(StatusUpdate::new(relay_statsd::metric!(
201                    timer(RelayTimers::HealthCheckDuration),
202                    type = "readiness",
203                    { self.check_readiness().await }
204                )));
205
206                tokio::time::sleep(check_interval).await;
207            }
208
209            // Shutdown marks readiness health check as unhealthy.
210            update_tx.send(StatusUpdate::new(Status::Unhealthy)).ok();
211        });
212
213        while let Some(HealthCheck(message, sender)) = rx.recv().await {
214            let update = update_rx.borrow();
215
216            sender.send(if matches!(message, IsHealthy::Liveness) {
217                Status::Healthy
218            } else if update.instant.elapsed() >= status_timeout {
219                Status::Unhealthy
220            } else {
221                update.status
222            });
223        }
224    }
225}
226
#[cfg(test)]
mod tests {
    use super::*;

    /// Covers the `bool` conversion and the collection of statuses into a single status.
    #[test]
    fn test_status() {
        assert!(matches!(Status::from(true), Status::Healthy));
        assert!(matches!(Status::from(false), Status::Unhealthy));

        // Any unhealthy item makes the collected status unhealthy.
        let unhealthy_cases: [&[Status]; 4] = [
            &[Status::Unhealthy, Status::Unhealthy],
            &[Status::Unhealthy, Status::Healthy],
            &[Status::Healthy, Status::Unhealthy],
            &[Status::Unhealthy],
        ];
        for case in unhealthy_cases {
            let collected: Status = case.iter().copied().collect();
            assert!(matches!(collected, Status::Unhealthy), "{case:?}");
        }

        // The iterator should short circuit.
        let collected: Status = std::iter::repeat(Status::Unhealthy).collect();
        assert!(matches!(collected, Status::Unhealthy));

        // All-healthy input, including the empty input, collects to healthy.
        let healthy_cases: [&[Status]; 3] = [
            &[Status::Healthy, Status::Healthy],
            &[Status::Healthy],
            &[],
        ];
        for case in healthy_cases {
            let collected: Status = case.iter().copied().collect();
            assert!(matches!(collected, Status::Healthy), "{case:?}");
        }
    }
}