relay_system/controller.rs
1use std::io;
2use std::sync::LazyLock;
3use std::time::Duration;
4
5use tokio::sync::watch;
6
/// Determines how to shut down the Relay system.
///
/// To initiate a shutdown, use [`Controller::shutdown`].
///
/// Modes can be compared with `==`, which allows asserting on the requested mode in tests.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ShutdownMode {
    /// Shut down gracefully within the configured timeout.
    ///
    /// This signals all components to finish their work and leaves time to submit pending data
    /// to the upstream or preserve it for restart.
    Graceful,
    /// Shut down immediately without finishing pending work.
    ///
    /// Pending data may be lost.
    Immediate,
}
22
/// Shutdown request message sent by the [`Controller`] to subscribed services.
///
/// Services observe this message through a [`ShutdownHandle`], either by awaiting
/// [`ShutdownHandle::notified`] or by reading the current state with [`ShutdownHandle::get`].
///
/// A receiver has to ensure that it doesn't take longer than `timeout` to wind down. Ideally,
/// open work is persisted or finished in an orderly manner but no new requests are accepted
/// anymore.
///
/// After the timeout the system will shut down regardless of what the receivers of this message
/// do. There is currently no way for a receiver to report back that it has completed its
/// shutdown.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Shutdown {
    /// The timeout for this shutdown. `None` indicates an immediate forced shutdown.
    pub timeout: Option<Duration>,
}
39
40type Channel<T> = (watch::Sender<Option<T>>, watch::Receiver<Option<T>>);
41
42/// Global channel to notify all services of a shutdown.
43static SHUTDOWN: LazyLock<Channel<Shutdown>> = LazyLock::new(|| watch::channel(None));
44
45/// Internal channel to trigger a manual shutdown via [`Controller::shutdown`].
46static MANUAL_SHUTDOWN: LazyLock<Channel<ShutdownMode>> = LazyLock::new(|| watch::channel(None));
47
48/// Notifies a service about an upcoming shutdown.
49///
50/// This handle is returned by [`Controller::shutdown_handle`].
51// TODO: The receiver of this message can not yet signal they have completed shutdown.
52#[derive(Debug, Clone)]
53pub struct ShutdownHandle(watch::Receiver<Option<Shutdown>>);
54
55impl ShutdownHandle {
56 /// Returns the current shutdown state.
57 pub fn get(&self) -> Option<Shutdown> {
58 self.0.borrow().clone()
59 }
60
61 /// Wait for a shutdown.
62 ///
63 /// This receives all shutdown signals since the [`Controller`] has been started, even before
64 /// this shutdown handle has been obtained.
65 ///
66 /// # Cancel safety
67 ///
68 /// This method is cancellation safe and can be used in `select!`.
69 pub async fn notified(&mut self) -> Shutdown {
70 while self.0.changed().await.is_ok() {
71 if let Some(shutdown) = &*self.0.borrow() {
72 return shutdown.clone();
73 }
74 }
75
76 Shutdown { timeout: None }
77 }
78
79 /// Wait for the shutdown and timeout to complete.
80 ///
81 /// This waits for the first shutdown signal and then conditionally waits for the shutdown
82 /// timeout. If the shutdown timeout is interrupted by another signal, this function resolves
83 /// immediately.
84 ///
85 /// # Cancel safety
86 ///
87 /// This method is **not** cancel safe.
88 pub async fn finished(mut self) {
89 // Wait for the first signal to initiate shutdown.
90 let shutdown = self.notified().await;
91
92 // If this is a graceful signal, wait for either the timeout to elapse, or any other signal
93 // to upgrade to an immediate shutdown.
94 if let Some(timeout) = shutdown.timeout {
95 tokio::select! {
96 _ = self.notified() => (),
97 _ = tokio::time::sleep(timeout) => (),
98 }
99 }
100 }
101
102 /// Returns `true` when the shutdown signal has been sent.
103 pub fn shutting_down(&self) -> bool {
104 self.0.borrow().is_some()
105 }
106}
107
108/// Service to start and gracefully stop the system runtime.
109///
110/// This service offers a static API to wait for a shutdown signal or manually initiate the Relay
111/// shutdown. To use this functionality, it first needs to be started with [`Controller::start`].
112///
113/// To shut down gracefully, other services can register with [`Controller::shutdown_handle`]. When
114/// a shutdown signal is sent to the process, every service will receive a [`Shutdown`] message with
115/// an optional timeout. To wait for the entire shutdown sequence including the shutdown timeout
116/// instead, use [`finished`](ShutdownHandle::finished). It resolves when the shutdown has
117/// completed.
118///
119/// ## Signals
120///
121/// By default, the controller watches for process signals and converts them into graceful or
122/// immediate shutdown messages. These signals are platform-dependent:
123///
124/// - Unix: `SIGINT` and `SIGQUIT` trigger an immediate shutdown, `SIGTERM` a graceful one.
125/// - Windows: `CTRL-C`, `CTRL-BREAK`, `CTRL-CLOSE` all trigger an immediate shutdown.
126///
127/// ### Example
128///
129/// ```
130/// # #[cfg(feature = "test")]
131/// use relay_system::{Controller, Service, ServiceSpawnExt, Shutdown, ShutdownMode, TokioServiceSpawn};
132/// # #[cfg(not(feature = "test"))]
133/// # use relay_system::{Controller, Service, ServiceSpawnExt, Shutdown, ShutdownMode};
134/// use std::time::Duration;
135///
136/// struct MyService;
137///
138/// impl Service for MyService {
139/// type Interface = ();
140///
141/// async fn run(self, mut rx: relay_system::Receiver<Self::Interface>) {
142/// let mut shutdown = Controller::shutdown_handle();
143/// loop {
144/// tokio::select! {
145/// shutdown = shutdown.notified() => break, // Handle shutdown here
146/// Some(message) = rx.recv() => (), // Process incoming message
147/// }
148/// }
149/// }
150/// }
151///
152/// #[tokio::main(flavor = "current_thread")]
153/// async fn main() {
154/// // Start the controller near the beginning of application bootstrap. This allows other
155/// // services to register for shutdown messages.
156/// Controller::start(Duration::from_millis(10));
157///
158/// // Start all other services. Controller::shutdown_handle will use the same controller
159/// // instance and receives the configured shutdown timeout.
160/// # #[cfg(feature = "test")]
161/// let addr = TokioServiceSpawn.start(MyService);
162///
163/// // By triggering a shutdown, all subscribed services will be notified. This happens
164/// // automatically when a signal is sent to the process (e.g. SIGINT or SIGTERM).
165/// Controller::shutdown(ShutdownMode::Graceful);
166///
167/// // Wait for the system to shut down before winding down the application.
168/// Controller::shutdown_handle().finished().await;
169/// }
170/// ```
171#[derive(Debug)]
172pub struct Controller;
173
174impl Controller {
175 /// Starts a controller that monitors shutdown signals.
176 #[track_caller]
177 pub fn start(shutdown_timeout: Duration) {
178 crate::spawn!(monitor_shutdown(shutdown_timeout));
179 }
180
181 /// Manually initiates the shutdown process of the system.
182 pub fn shutdown(mode: ShutdownMode) {
183 let (ref tx, _) = *MANUAL_SHUTDOWN;
184 tx.send(Some(mode)).ok();
185 }
186
187 /// Returns a [handle](ShutdownHandle) to receive shutdown notifications.
188 pub fn shutdown_handle() -> ShutdownHandle {
189 let (_, ref rx) = *SHUTDOWN;
190 ShutdownHandle(rx.clone())
191 }
192}
193
194#[cfg(unix)]
195async fn monitor_shutdown(timeout: Duration) -> io::Result<()> {
196 use tokio::signal::unix::{SignalKind, signal};
197
198 let mut sig_int = signal(SignalKind::interrupt())?;
199 let mut sig_quit = signal(SignalKind::quit())?;
200 let mut sig_term = signal(SignalKind::terminate())?;
201
202 let (ref tx, _) = *SHUTDOWN;
203 let mut manual = MANUAL_SHUTDOWN.1.clone();
204
205 loop {
206 let timeout = tokio::select! {
207 biased;
208
209 Some(()) = sig_int.recv() => {
210 relay_log::info!("SIGINT received, exiting");
211 None
212 }
213 Some(()) = sig_quit.recv() => {
214 relay_log::info!("SIGQUIT received, exiting");
215 None
216 }
217 Some(()) = sig_term.recv() => {
218 relay_log::info!("SIGTERM received, stopping in {}s", timeout.as_secs());
219 Some(timeout)
220 }
221 Ok(()) = manual.changed() => match *manual.borrow() {
222 Some(ShutdownMode::Graceful) => {
223 relay_log::info!("Graceful shutdown initiated, stopping in {}s", timeout.as_secs());
224 Some(timeout)
225 }
226 Some(ShutdownMode::Immediate) => {
227 relay_log::info!("Immediate shutdown initiated");
228 None
229 },
230 None => continue,
231 },
232
233 else => break,
234 };
235
236 tx.send(Some(Shutdown { timeout })).ok();
237 }
238
239 Ok(())
240}
241
/// Translates Windows console events and manual requests into [`Shutdown`] messages.
///
/// `CTRL-C`, `CTRL-BREAK` and `CTRL-CLOSE` all publish an immediate shutdown. Requests made
/// through [`Controller::shutdown`] are mapped according to their [`ShutdownMode`], where a
/// graceful request carries `timeout`. Every resulting message is sent on the global shutdown
/// channel.
///
/// # Errors
///
/// Returns an error if one of the console event listeners cannot be registered.
#[cfg(windows)]
async fn monitor_shutdown(timeout: Duration) -> io::Result<()> {
    use tokio::signal::windows::{ctrl_break, ctrl_c, ctrl_close};

    let mut on_ctrl_c = ctrl_c()?;
    let mut on_ctrl_break = ctrl_break()?;
    let mut on_ctrl_close = ctrl_close()?;

    let shutdown_tx = &SHUTDOWN.0;
    let mut manual_rx = MANUAL_SHUTDOWN.1.clone();

    loop {
        // `biased` polls the branches in declaration order, so console events take precedence
        // over manual requests.
        let grace_period = tokio::select! {
            biased;

            Some(()) = on_ctrl_c.recv() => {
                relay_log::info!("CTRL-C received, exiting");
                None
            }
            Some(()) = on_ctrl_break.recv() => {
                relay_log::info!("CTRL-BREAK received, exiting");
                None
            }
            Some(()) = on_ctrl_close.recv() => {
                relay_log::info!("CTRL-CLOSE received, exiting");
                None
            }
            Ok(()) = manual_rx.changed() => {
                // Copy the mode out so the watch borrow is released before logging.
                let requested = *manual_rx.borrow();
                match requested {
                    Some(ShutdownMode::Graceful) => {
                        relay_log::info!("Graceful shutdown initiated, stopping in {}s", timeout.as_secs());
                        Some(timeout)
                    }
                    Some(ShutdownMode::Immediate) => {
                        relay_log::info!("Immediate shutdown initiated");
                        None
                    }
                    // No mode has been requested; keep waiting.
                    None => continue,
                }
            }

            // Reached once every branch above has been disabled by a failed pattern match.
            else => break,
        };

        shutdown_tx.send(Some(Shutdown { timeout: grace_period })).ok();
    }

    Ok(())
}
289
290/*
291TODO: Tests disabled since there is no isolation. Should be re-enabled once Controller-instances are
292passed into services.
293
294#[cfg(test)]
295mod tests {
296 use tokio::time::Instant;
297
298 use super::*;
299
300 #[tokio::test]
301 async fn handle_receives_immediate_shutdown() {
302 tokio::time::pause();
303
304 Controller::start(Duration::from_secs(1));
305 let mut handle = Controller::shutdown_handle();
306
307 Controller::shutdown(ShutdownMode::Immediate);
308 let shutdown = handle.notified().await;
309 assert_eq!(shutdown.timeout, None);
310 }
311
312 #[tokio::test]
313 async fn receives_graceful_shutdown() {
314 tokio::time::pause();
315
316 let timeout = Duration::from_secs(1);
317 Controller::start(timeout);
318 let mut handle = Controller::shutdown_handle();
319
        Controller::shutdown(ShutdownMode::Graceful);
321 let shutdown = handle.notified().await;
322 assert_eq!(shutdown.timeout, Some(timeout));
323 }
324
325 #[tokio::test]
326 async fn handle_receives_past_shutdown() {
327 tokio::time::pause();
328
329 Controller::start(Duration::from_secs(1));
330 Controller::shutdown(ShutdownMode::Immediate);
331
332 Controller::shutdown_handle().notified().await;
333 // should not block
334 }
335
336 #[tokio::test]
337 async fn handle_waits_for_timeout() {
338 tokio::time::pause();
339
340 let timeout = Duration::from_secs(1);
341 Controller::start(timeout);
342 let shutdown = Controller::shutdown_handle();
343
344 let start = Instant::now();
345 Controller::shutdown(ShutdownMode::Graceful);
346 shutdown.finished().await;
347
348 assert_eq!(Instant::now() - start, timeout);
349 }
350
351 #[tokio::test]
352 async fn finish_exits_early() {
353 tokio::time::pause();
354
355 Controller::start(Duration::from_secs(1));
356 let shutdown = Controller::shutdown_handle();
357
358 let start = Instant::now();
359 Controller::shutdown(ShutdownMode::Graceful);
360 Controller::shutdown(ShutdownMode::Immediate);
361
362 shutdown.finished().await;
363 assert_eq!(Instant::now(), start);
364 }
365}
366*/