1use std::sync::Arc;
2
3use relay_config::Config;
4use relay_system::{Addr, AsyncResponse, Controller, FromMessage, Interface, Sender, Service};
5use std::future::Future;
6use tokio::sync::watch;
7use tokio::time::{Instant, timeout};
8
9use crate::services::buffer::PartitionedEnvelopeBuffer;
10use crate::services::metrics::RouterHandle;
11use crate::services::upstream::{IsAuthenticated, UpstreamRelay};
12use crate::statsd::RelayTimers;
13use crate::utils::{MemoryCheck, MemoryChecker};
14
/// Kind of health check that can be requested from the health check service.
///
/// Deserialized from the strings `"live"` and `"ready"`.
#[derive(Clone, Copy, Debug, serde::Deserialize)]
pub enum IsHealthy {
    /// Liveness check, deserialized from `"live"`.
    ///
    /// The service answers this with `Status::Healthy` whenever it is handling messages at all;
    /// no probes are consulted.
    #[serde(rename = "live")]
    Liveness,
    /// Readiness check, deserialized from `"ready"`.
    ///
    /// The service answers this with the most recent result of its periodic readiness probes,
    /// or `Status::Unhealthy` if that result is too old.
    #[serde(rename = "ready")]
    Readiness,
}
26
/// Outcome of a health check, or of a single health probe.
///
/// Derives `PartialEq`/`Eq` so statuses can be compared directly with `==`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Status {
    /// Everything that was checked is healthy.
    Healthy,
    /// At least one checked component is unhealthy.
    Unhealthy,
}

impl From<bool> for Status {
    /// Maps `true` to [`Status::Healthy`] and `false` to [`Status::Unhealthy`].
    fn from(healthy: bool) -> Self {
        if healthy {
            Self::Healthy
        } else {
            Self::Unhealthy
        }
    }
}

impl FromIterator<Status> for Status {
    /// Aggregates many statuses into one.
    ///
    /// The result is [`Status::Healthy`] only if every item is healthy. An empty iterator
    /// therefore yields [`Status::Healthy`]. Evaluation stops at the first unhealthy item
    /// (via [`Iterator::all`]), so even an unbounded iterator terminates once it yields
    /// [`Status::Unhealthy`].
    fn from_iter<T: IntoIterator<Item = Status>>(iter: T) -> Self {
        Self::from(iter.into_iter().all(|status| status == Self::Healthy))
    }
}
53
54pub struct HealthCheck(IsHealthy, Sender<Status>);
56
57impl Interface for HealthCheck {}
58
59impl FromMessage<IsHealthy> for HealthCheck {
60 type Response = AsyncResponse<Status>;
61
62 fn from_message(message: IsHealthy, sender: Sender<Status>) -> Self {
63 Self(message, sender)
64 }
65}
66
67#[derive(Debug)]
68struct StatusUpdate {
69 status: Status,
70 instant: Instant,
71}
72
73impl StatusUpdate {
74 pub fn new(status: Status) -> Self {
75 Self {
76 status,
77 instant: Instant::now(),
78 }
79 }
80}
81
/// Service that periodically runs readiness probes and answers [`HealthCheck`] queries.
#[derive(Debug)]
pub struct HealthCheckService {
    /// Relay configuration: probe timeout, refresh interval, memory watermarks, and whether
    /// authentication is required.
    config: Arc<Config>,
    /// Checks system memory usage against the configured watermarks.
    memory_checker: MemoryChecker,
    /// Queried for whether metrics can still be accepted.
    aggregator: RouterHandle,
    /// Asked whether this Relay is authenticated with its upstream.
    upstream_relay: Addr<UpstreamRelay>,
    /// Queried for remaining envelope buffer capacity.
    envelope_buffer: PartitionedEnvelopeBuffer,
}
91
92impl HealthCheckService {
93 pub fn new(
95 config: Arc<Config>,
96 memory_checker: MemoryChecker,
97 aggregator: RouterHandle,
98 upstream_relay: Addr<UpstreamRelay>,
99 envelope_buffer: PartitionedEnvelopeBuffer,
100 ) -> Self {
101 Self {
102 config,
103 memory_checker,
104 aggregator,
105 upstream_relay,
106 envelope_buffer,
107 }
108 }
109
110 fn system_memory_probe(&mut self) -> Status {
111 if let MemoryCheck::Exceeded(memory) = self.memory_checker.check_memory_percent() {
112 relay_log::error!(
113 "Not enough memory, {} / {} ({:.2}% >= {:.2}%)",
114 memory.used,
115 memory.total,
116 memory.used_percent() * 100.0,
117 self.config.health_max_memory_watermark_percent() * 100.0,
118 );
119 return Status::Unhealthy;
120 }
121
122 if let MemoryCheck::Exceeded(memory) = self.memory_checker.check_memory_bytes() {
123 relay_log::error!(
124 "Not enough memory, {} / {} ({} >= {})",
125 memory.used,
126 memory.total,
127 memory.used,
128 self.config.health_max_memory_watermark_bytes(),
129 );
130 return Status::Unhealthy;
131 }
132
133 Status::Healthy
134 }
135
136 async fn auth_probe(&self) -> Status {
137 if !self.config.requires_auth() {
138 return Status::Healthy;
139 }
140
141 self.upstream_relay
142 .send(IsAuthenticated)
143 .await
144 .map_or(Status::Unhealthy, Status::from)
145 }
146
147 async fn aggregator_probe(&self) -> Status {
148 Status::from(self.aggregator.can_accept_metrics())
149 }
150
151 async fn spool_health_probe(&self) -> Status {
152 match self.envelope_buffer.has_capacity() {
153 true => Status::Healthy,
154 false => Status::Unhealthy,
155 }
156 }
157
158 async fn probe(&self, name: &'static str, fut: impl Future<Output = Status>) -> Status {
159 match timeout(self.config.health_probe_timeout(), fut).await {
160 Err(_) => {
161 relay_log::error!("Health check probe '{name}' timed out");
162 Status::Unhealthy
163 }
164 Ok(Status::Unhealthy) => {
165 relay_log::error!("Health check probe '{name}' failed");
166 Status::Unhealthy
167 }
168 Ok(Status::Healthy) => Status::Healthy,
169 }
170 }
171
172 async fn check_readiness(&mut self) -> Status {
173 let sys_mem = self.system_memory_probe();
175
176 let (sys_mem, auth, agg, proj) = tokio::join!(
177 self.probe("system memory", async { sys_mem }),
178 self.probe("auth", self.auth_probe()),
179 self.probe("aggregator", self.aggregator_probe()),
180 self.probe("spool health", self.spool_health_probe()),
181 );
182
183 Status::from_iter([sys_mem, auth, agg, proj])
184 }
185}
186
187impl Service for HealthCheckService {
188 type Interface = HealthCheck;
189
190 async fn run(mut self, mut rx: relay_system::Receiver<Self::Interface>) {
191 let (update_tx, update_rx) = watch::channel(StatusUpdate::new(Status::Unhealthy));
192 let check_interval = self.config.health_refresh_interval();
193 let status_timeout = (check_interval + self.config.health_probe_timeout()).mul_f64(1.1);
195
196 relay_system::spawn!(async move {
197 let shutdown = Controller::shutdown_handle();
198
199 while shutdown.get().is_none() {
200 let _ = update_tx.send(StatusUpdate::new(relay_statsd::metric!(
201 timer(RelayTimers::HealthCheckDuration),
202 type = "readiness",
203 { self.check_readiness().await }
204 )));
205
206 tokio::time::sleep(check_interval).await;
207 }
208
209 update_tx.send(StatusUpdate::new(Status::Unhealthy)).ok();
211 });
212
213 while let Some(HealthCheck(message, sender)) = rx.recv().await {
214 let update = update_rx.borrow();
215
216 sender.send(if matches!(message, IsHealthy::Liveness) {
217 Status::Healthy
218 } else if update.instant.elapsed() >= status_timeout {
219 Status::Unhealthy
220 } else {
221 update.status
222 });
223 }
224 }
225}
226
#[cfg(test)]
mod tests {
    use super::*;

    /// Returns `true` iff `status` is [`Status::Healthy`].
    fn is_healthy(status: Status) -> bool {
        matches!(status, Status::Healthy)
    }

    #[test]
    fn test_status() {
        // Conversion from `bool`.
        assert!(is_healthy(Status::from(true)));
        assert!(!is_healthy(Status::from(false)));

        // Any unhealthy member makes the aggregate unhealthy, regardless of position.
        assert!(!is_healthy([Status::Unhealthy, Status::Unhealthy].into_iter().collect()));
        assert!(!is_healthy([Status::Unhealthy, Status::Healthy].into_iter().collect()));
        assert!(!is_healthy([Status::Healthy, Status::Unhealthy].into_iter().collect()));
        assert!(!is_healthy([Status::Unhealthy].into_iter().collect()));

        // Aggregation short-circuits, so an endless unhealthy stream still terminates.
        assert!(!is_healthy(std::iter::repeat(Status::Unhealthy).collect()));

        // All-healthy and empty inputs aggregate to healthy.
        assert!(is_healthy([Status::Healthy, Status::Healthy].into_iter().collect()));
        assert!(is_healthy([Status::Healthy].into_iter().collect()));
        assert!(is_healthy([].into_iter().collect()));
    }
}