// relay_server/services/health_check.rs

1use std::sync::Arc;
2
3use relay_config::Config;
4use relay_system::{Addr, AsyncResponse, Controller, FromMessage, Interface, Sender, Service};
5use std::future::Future;
6use tokio::sync::watch;
7use tokio::time::{Instant, timeout};
8
9use crate::services::buffer::PartitionedEnvelopeBuffer;
10use crate::services::metrics::RouterHandle;
11use crate::services::upstream::{IsAuthenticated, UpstreamRelay};
12use crate::statsd::RelayTimers;
13use crate::utils::{MemoryCheck, MemoryChecker};
14
15/// Checks whether Relay is alive and healthy based on its variant.
16#[derive(Clone, Copy, Debug, serde::Deserialize)]
17pub enum IsHealthy {
18    /// Check if the Relay is alive at all.
19    #[serde(rename = "live")]
20    Liveness,
21    /// Check if the Relay is in a state where the load balancer should route traffic to it (i.e.
22    /// it's both live/alive and not too busy).
23    #[serde(rename = "ready")]
24    Readiness,
25}
26
/// Health check status.
///
/// Several statuses can be combined by collecting them into a single `Status`; the
/// combined result is [`Status::Healthy`] only if every collected status is healthy.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub enum Status {
    /// Relay is healthy.
    Healthy,
    /// Relay is unhealthy.
    Unhealthy,
}
35
36impl From<bool> for Status {
37    fn from(value: bool) -> Self {
38        match value {
39            true => Self::Healthy,
40            false => Self::Unhealthy,
41        }
42    }
43}
44
45impl FromIterator<Status> for Status {
46    fn from_iter<T: IntoIterator<Item = Status>>(iter: T) -> Self {
47        let healthy = iter
48            .into_iter()
49            .all(|status| matches!(status, Self::Healthy));
50        Self::from(healthy)
51    }
52}
53
54/// Service interface for the [`IsHealthy`] message.
55pub struct HealthCheck(IsHealthy, Sender<Status>);
56
57impl Interface for HealthCheck {}
58
59impl FromMessage<IsHealthy> for HealthCheck {
60    type Response = AsyncResponse<Status>;
61
62    fn from_message(message: IsHealthy, sender: Sender<Status>) -> Self {
63        Self(message, sender)
64    }
65}
66
67#[derive(Debug)]
68struct StatusUpdate {
69    status: Status,
70    instant: Instant,
71}
72
73impl StatusUpdate {
74    pub fn new(status: Status) -> Self {
75        Self {
76            status,
77            instant: Instant::now(),
78        }
79    }
80}
81
82/// Service implementing the [`HealthCheck`] interface.
83#[derive(Debug)]
84pub struct HealthCheckService {
85    config: Arc<Config>,
86    memory_checker: MemoryChecker,
87    aggregator: Option<RouterHandle>,
88    upstream_relay: Addr<UpstreamRelay>,
89    envelope_buffer: PartitionedEnvelopeBuffer,
90}
91
92impl HealthCheckService {
93    /// Creates a new instance of the HealthCheck service.
94    pub fn new(
95        config: Arc<Config>,
96        memory_checker: MemoryChecker,
97        aggregator: Option<RouterHandle>,
98        upstream_relay: Addr<UpstreamRelay>,
99        envelope_buffer: PartitionedEnvelopeBuffer,
100    ) -> Self {
101        Self {
102            config,
103            memory_checker,
104            aggregator,
105            upstream_relay,
106            envelope_buffer,
107        }
108    }
109
110    fn system_memory_probe(&mut self) -> Status {
111        if let MemoryCheck::Exceeded(memory) = self.memory_checker.check_memory_percent() {
112            relay_log::error!(
113                "Not enough memory, {} / {} ({:.2}% >= {:.2}%)",
114                memory.used,
115                memory.total,
116                memory.used_percent() * 100.0,
117                self.config.health_max_memory_watermark_percent() * 100.0,
118            );
119            return Status::Unhealthy;
120        }
121
122        if let MemoryCheck::Exceeded(memory) = self.memory_checker.check_memory_bytes() {
123            relay_log::error!(
124                "Not enough memory, {} / {} ({} >= {})",
125                memory.used,
126                memory.total,
127                memory.used,
128                self.config.health_max_memory_watermark_bytes(),
129            );
130            return Status::Unhealthy;
131        }
132
133        Status::Healthy
134    }
135
136    async fn auth_probe(&self) -> Status {
137        if !self.config.requires_auth() {
138            return Status::Healthy;
139        }
140
141        self.upstream_relay
142            .send(IsAuthenticated)
143            .await
144            .map_or(Status::Unhealthy, Status::from)
145    }
146
147    async fn aggregator_probe(&self) -> Status {
148        self.aggregator
149            .as_ref()
150            .map(|agg| Status::from(agg.can_accept_metrics()))
151            .unwrap_or(Status::Healthy)
152    }
153
154    async fn spool_health_probe(&self) -> Status {
155        match self.envelope_buffer.has_capacity() {
156            true => Status::Healthy,
157            false => Status::Unhealthy,
158        }
159    }
160
161    async fn probe(&self, name: &'static str, fut: impl Future<Output = Status>) -> Status {
162        match timeout(self.config.health_probe_timeout(), fut).await {
163            Err(_) => {
164                relay_log::error!("Health check probe '{name}' timed out");
165                Status::Unhealthy
166            }
167            Ok(Status::Unhealthy) => {
168                relay_log::error!("Health check probe '{name}' failed");
169                Status::Unhealthy
170            }
171            Ok(Status::Healthy) => Status::Healthy,
172        }
173    }
174
175    async fn check_readiness(&mut self) -> Status {
176        // System memory is sync and requires mutable access, but we still want to log errors.
177        let sys_mem = self.system_memory_probe();
178
179        let (sys_mem, auth, agg, proj) = tokio::join!(
180            self.probe("system memory", async { sys_mem }),
181            self.probe("auth", self.auth_probe()),
182            self.probe("aggregator", self.aggregator_probe()),
183            self.probe("spool health", self.spool_health_probe()),
184        );
185
186        Status::from_iter([sys_mem, auth, agg, proj])
187    }
188}
189
190impl Service for HealthCheckService {
191    type Interface = HealthCheck;
192
193    async fn run(mut self, mut rx: relay_system::Receiver<Self::Interface>) {
194        let (update_tx, update_rx) = watch::channel(StatusUpdate::new(Status::Unhealthy));
195        let check_interval = self.config.health_refresh_interval();
196        // Add 10% buffer to the internal timeouts to avoid race conditions.
197        let status_timeout = (check_interval + self.config.health_probe_timeout()).mul_f64(1.1);
198
199        relay_system::spawn!(async move {
200            let shutdown = Controller::shutdown_handle();
201
202            while shutdown.get().is_none() {
203                let _ = update_tx.send(StatusUpdate::new(relay_statsd::metric!(
204                    timer(RelayTimers::HealthCheckDuration),
205                    type = "readiness",
206                    { self.check_readiness().await }
207                )));
208
209                tokio::time::sleep(check_interval).await;
210            }
211
212            // Shutdown marks readiness health check as unhealthy.
213            update_tx.send(StatusUpdate::new(Status::Unhealthy)).ok();
214        });
215
216        while let Some(HealthCheck(message, sender)) = rx.recv().await {
217            let update = update_rx.borrow();
218
219            sender.send(if matches!(message, IsHealthy::Liveness) {
220                Status::Healthy
221            } else if update.instant.elapsed() >= status_timeout {
222                Status::Unhealthy
223            } else {
224                update.status
225            });
226        }
227    }
228}
229
#[cfg(test)]
mod tests {
    use super::*;

    /// Collects `statuses` into one aggregate [`Status`] and reports whether it is healthy.
    fn aggregate_is_healthy(statuses: impl IntoIterator<Item = Status>) -> bool {
        matches!(statuses.into_iter().collect::<Status>(), Status::Healthy)
    }

    #[test]
    fn test_status() {
        // Conversion from `bool`.
        assert!(matches!(Status::from(true), Status::Healthy));
        assert!(matches!(Status::from(false), Status::Unhealthy));

        // Any unhealthy member makes the aggregate unhealthy, wherever it appears.
        assert!(!aggregate_is_healthy([Status::Unhealthy, Status::Unhealthy]));
        assert!(!aggregate_is_healthy([Status::Unhealthy, Status::Healthy]));
        assert!(!aggregate_is_healthy([Status::Healthy, Status::Unhealthy]));
        assert!(!aggregate_is_healthy([Status::Unhealthy]));

        // Aggregation must short-circuit; otherwise this infinite iterator would never end.
        assert!(!aggregate_is_healthy(std::iter::repeat(Status::Unhealthy)));

        // Only all-healthy input, including no input at all, aggregates to healthy.
        assert!(aggregate_is_healthy([Status::Healthy, Status::Healthy]));
        assert!(aggregate_is_healthy([Status::Healthy]));
        assert!(aggregate_is_healthy(std::iter::empty()));
    }
}