relay_system/
controller.rs

1use std::io;
2use std::sync::LazyLock;
3use std::time::Duration;
4
5use tokio::sync::watch;
6
/// Determines how to shut down the Relay system.
///
/// To initiate a shutdown, use [`Controller::shutdown`].
///
/// The mode is a plain `Copy` value that supports equality comparison, so callers can compare
/// a requested mode directly instead of pattern matching on it.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ShutdownMode {
    /// Shut down gracefully within the configured timeout.
    ///
    /// This signals all components to finish their work and leaves time to submit pending data
    /// to the upstream or preserve it for restart.
    Graceful,
    /// Shut down immediately without finishing pending work.
    ///
    /// Pending data may be lost.
    Immediate,
}
22
/// Shutdown request message sent by the [`Controller`] to subscribed services.
///
/// Services observe this message through a [`ShutdownHandle`]. On receipt, a service should stop
/// accepting new requests and persist or finish open work in an orderly manner, taking no longer
/// than `timeout` to do so.
///
/// After the timeout the system will shut down regardless of what the receivers of this message
/// do. Receivers currently have no way to report that they have completed their shutdown.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Shutdown {
    /// The timeout for this shutdown. `None` indicates an immediate forced shutdown.
    pub timeout: Option<Duration>,
}
39
40type Channel<T> = (watch::Sender<Option<T>>, watch::Receiver<Option<T>>);
41
42/// Global channel to notify all services of a shutdown.
43static SHUTDOWN: LazyLock<Channel<Shutdown>> = LazyLock::new(|| watch::channel(None));
44
45/// Internal channel to trigger a manual shutdown via [`Controller::shutdown`].
46static MANUAL_SHUTDOWN: LazyLock<Channel<ShutdownMode>> = LazyLock::new(|| watch::channel(None));
47
48/// Notifies a service about an upcoming shutdown.
49///
50/// This handle is returned by [`Controller::shutdown_handle`].
51// TODO: The receiver of this message can not yet signal they have completed shutdown.
52#[derive(Debug, Clone)]
53pub struct ShutdownHandle(watch::Receiver<Option<Shutdown>>);
54
55impl ShutdownHandle {
56    /// Returns the current shutdown state.
57    pub fn get(&self) -> Option<Shutdown> {
58        self.0.borrow().clone()
59    }
60
61    /// Wait for a shutdown.
62    ///
63    /// This receives all shutdown signals since the [`Controller`] has been started, even before
64    /// this shutdown handle has been obtained.
65    ///
66    /// # Cancel safety
67    ///
68    /// This method is cancellation safe and can be used in `select!`.
69    pub async fn notified(&mut self) -> Shutdown {
70        while self.0.changed().await.is_ok() {
71            if let Some(shutdown) = &*self.0.borrow() {
72                return shutdown.clone();
73            }
74        }
75
76        Shutdown { timeout: None }
77    }
78
79    /// Wait for the shutdown and timeout to complete.
80    ///
81    /// This waits for the first shutdown signal and then conditionally waits for the shutdown
82    /// timeout. If the shutdown timeout is interrupted by another signal, this function resolves
83    /// immediately.
84    ///
85    /// # Cancel safety
86    ///
87    /// This method is **not** cancel safe.
88    pub async fn finished(mut self) {
89        // Wait for the first signal to initiate shutdown.
90        let shutdown = self.notified().await;
91
92        // If this is a graceful signal, wait for either the timeout to elapse, or any other signal
93        // to upgrade to an immediate shutdown.
94        if let Some(timeout) = shutdown.timeout {
95            tokio::select! {
96                _ = self.notified() => (),
97                _ = tokio::time::sleep(timeout) => (),
98            }
99        }
100    }
101
102    /// Returns `true` when the shutdown signal has been sent.
103    pub fn shutting_down(&self) -> bool {
104        self.0.borrow().is_some()
105    }
106}
107
108/// Service to start and gracefully stop the system runtime.
109///
110/// This service offers a static API to wait for a shutdown signal or manually initiate the Relay
111/// shutdown. To use this functionality, it first needs to be started with [`Controller::start`].
112///
113/// To shut down gracefully, other services can register with [`Controller::shutdown_handle`]. When
114/// a shutdown signal is sent to the process, every service will receive a [`Shutdown`] message with
115/// an optional timeout. To wait for the entire shutdown sequence including the shutdown timeout
116/// instead, use [`finished`](ShutdownHandle::finished). It resolves when the shutdown has
117/// completed.
118///
119/// ## Signals
120///
121/// By default, the controller watches for process signals and converts them into graceful or
122/// immediate shutdown messages. These signals are platform-dependent:
123///
124///  - Unix: `SIGINT` and `SIGQUIT` trigger an immediate shutdown, `SIGTERM` a graceful one.
125///  - Windows: `CTRL-C`, `CTRL-BREAK`, `CTRL-CLOSE` all trigger an immediate shutdown.
126///
127/// ### Example
128///
129/// ```
130/// # #[cfg(feature = "test")]
131/// use relay_system::{Controller, Service, ServiceSpawnExt, Shutdown, ShutdownMode, TokioServiceSpawn};
132/// # #[cfg(not(feature = "test"))]
133/// # use relay_system::{Controller, Service, ServiceSpawnExt, Shutdown, ShutdownMode};
134/// use std::time::Duration;
135///
136/// struct MyService;
137///
138/// impl Service for MyService {
139///     type Interface = ();
140///
141///     async fn run(self, mut rx: relay_system::Receiver<Self::Interface>) {
142///         let mut shutdown = Controller::shutdown_handle();
143///         loop {
144///             tokio::select! {
145///                 shutdown = shutdown.notified() => break, // Handle shutdown here
146///                 Some(message) = rx.recv() => (),         // Process incoming message
147///             }
148///         }
149///     }
150/// }
151///
152/// #[tokio::main(flavor = "current_thread")]
153/// async fn main() {
154///     // Start the controller near the beginning of application bootstrap. This allows other
155///     // services to register for shutdown messages.
156///     Controller::start(Duration::from_millis(10));
157///
158///     // Start all other services. Controller::shutdown_handle will use the same controller
159///     // instance and receives the configured shutdown timeout.
160///     # #[cfg(feature = "test")]
161///     let addr = TokioServiceSpawn.start(MyService);
162///
163///     // By triggering a shutdown, all subscribed services will be notified. This happens
164///     // automatically when a signal is sent to the process (e.g. SIGINT or SIGTERM).
165///     Controller::shutdown(ShutdownMode::Graceful);
166///
167///     // Wait for the system to shut down before winding down the application.
168///     Controller::shutdown_handle().finished().await;
169/// }
170/// ```
171#[derive(Debug)]
172pub struct Controller;
173
174impl Controller {
175    /// Starts a controller that monitors shutdown signals.
176    #[track_caller]
177    pub fn start(shutdown_timeout: Duration) {
178        crate::spawn!(monitor_shutdown(shutdown_timeout));
179    }
180
181    /// Manually initiates the shutdown process of the system.
182    pub fn shutdown(mode: ShutdownMode) {
183        let (ref tx, _) = *MANUAL_SHUTDOWN;
184        tx.send(Some(mode)).ok();
185    }
186
187    /// Returns a [handle](ShutdownHandle) to receive shutdown notifications.
188    pub fn shutdown_handle() -> ShutdownHandle {
189        let (_, ref rx) = *SHUTDOWN;
190        ShutdownHandle(rx.clone())
191    }
192}
193
194#[cfg(unix)]
195async fn monitor_shutdown(timeout: Duration) -> io::Result<()> {
196    use tokio::signal::unix::{SignalKind, signal};
197
198    let mut sig_int = signal(SignalKind::interrupt())?;
199    let mut sig_quit = signal(SignalKind::quit())?;
200    let mut sig_term = signal(SignalKind::terminate())?;
201
202    let (ref tx, _) = *SHUTDOWN;
203    let mut manual = MANUAL_SHUTDOWN.1.clone();
204
205    loop {
206        let timeout = tokio::select! {
207            biased;
208
209            Some(()) = sig_int.recv() => {
210                relay_log::info!("SIGINT received, exiting");
211                None
212            }
213            Some(()) = sig_quit.recv() => {
214                relay_log::info!("SIGQUIT received, exiting");
215                None
216            }
217            Some(()) = sig_term.recv() => {
218                relay_log::info!("SIGTERM received, stopping in {}s", timeout.as_secs());
219                Some(timeout)
220            }
221            Ok(()) = manual.changed() => match *manual.borrow() {
222                Some(ShutdownMode::Graceful) => {
223                    relay_log::info!("Graceful shutdown initiated, stopping in {}s", timeout.as_secs());
224                    Some(timeout)
225                }
226                Some(ShutdownMode::Immediate) => {
227                    relay_log::info!("Immediate shutdown initiated");
228                    None
229                },
230                None => continue,
231            },
232
233            else => break,
234        };
235
236        tx.send(Some(Shutdown { timeout })).ok();
237    }
238
239    Ok(())
240}
241
/// Watches Windows console events and manual shutdown requests and publishes a [`Shutdown`]
/// message for each of them.
///
/// `CTRL-C`, `CTRL-BREAK` and `CTRL-CLOSE` all result in an immediate shutdown
/// (`timeout: None`). Manual requests map according to their [`ShutdownMode`]; a graceful
/// request uses `timeout`.
///
/// Returns an error if one of the event listeners cannot be registered. Returns `Ok(())` once
/// none of the watched sources can produce further events.
#[cfg(windows)]
async fn monitor_shutdown(timeout: Duration) -> io::Result<()> {
    use tokio::signal::windows::{ctrl_break, ctrl_c, ctrl_close};

    let mut on_ctrl_c = ctrl_c()?;
    let mut on_ctrl_break = ctrl_break()?;
    let mut on_ctrl_close = ctrl_close()?;

    let notifier = &SHUTDOWN.0;
    let mut requests = MANUAL_SHUTDOWN.1.clone();

    loop {
        // `biased` polls the branches in declaration order, so console events are checked
        // before manual requests.
        let grace_period = tokio::select! {
            biased;

            Some(()) = on_ctrl_c.recv() => {
                relay_log::info!("CTRL-C received, exiting");
                None
            }
            Some(()) = on_ctrl_break.recv() => {
                relay_log::info!("CTRL-BREAK received, exiting");
                None
            }
            Some(()) = on_ctrl_close.recv() => {
                relay_log::info!("CTRL-CLOSE received, exiting");
                None
            }
            Ok(()) = requests.changed() => {
                // Copy the requested mode out of the channel before acting on it.
                let requested = *requests.borrow();
                match requested {
                    // Nothing has been requested; keep waiting.
                    None => continue,
                    Some(ShutdownMode::Immediate) => {
                        relay_log::info!("Immediate shutdown initiated");
                        None
                    }
                    Some(ShutdownMode::Graceful) => {
                        relay_log::info!("Graceful shutdown initiated, stopping in {}s", timeout.as_secs());
                        Some(timeout)
                    }
                }
            }

            else => break,
        };

        // The send result is intentionally ignored.
        let _ = notifier.send(Some(Shutdown { timeout: grace_period }));
    }

    Ok(())
}
289
290/*
291TODO: Tests disabled since there is no isolation. Should be re-enabled once Controller-instances are
292passed into services.
293
294#[cfg(test)]
295mod tests {
296    use tokio::time::Instant;
297
298    use super::*;
299
300    #[tokio::test]
301    async fn handle_receives_immediate_shutdown() {
302        tokio::time::pause();
303
304        Controller::start(Duration::from_secs(1));
305        let mut handle = Controller::shutdown_handle();
306
307        Controller::shutdown(ShutdownMode::Immediate);
308        let shutdown = handle.notified().await;
309        assert_eq!(shutdown.timeout, None);
310    }
311
312    #[tokio::test]
313    async fn receives_graceful_shutdown() {
314        tokio::time::pause();
315
316        let timeout = Duration::from_secs(1);
317        Controller::start(timeout);
318        let mut handle = Controller::shutdown_handle();
319
        Controller::shutdown(ShutdownMode::Graceful);
321        let shutdown = handle.notified().await;
322        assert_eq!(shutdown.timeout, Some(timeout));
323    }
324
325    #[tokio::test]
326    async fn handle_receives_past_shutdown() {
327        tokio::time::pause();
328
329        Controller::start(Duration::from_secs(1));
330        Controller::shutdown(ShutdownMode::Immediate);
331
332        Controller::shutdown_handle().notified().await;
333        // should not block
334    }
335
336    #[tokio::test]
337    async fn handle_waits_for_timeout() {
338        tokio::time::pause();
339
340        let timeout = Duration::from_secs(1);
341        Controller::start(timeout);
342        let shutdown = Controller::shutdown_handle();
343
344        let start = Instant::now();
345        Controller::shutdown(ShutdownMode::Graceful);
346        shutdown.finished().await;
347
348        assert_eq!(Instant::now() - start, timeout);
349    }
350
351    #[tokio::test]
352    async fn finish_exits_early() {
353        tokio::time::pause();
354
355        Controller::start(Duration::from_secs(1));
356        let shutdown = Controller::shutdown_handle();
357
358        let start = Instant::now();
359        Controller::shutdown(ShutdownMode::Graceful);
360        Controller::shutdown(ShutdownMode::Immediate);
361
362        shutdown.finished().await;
363        assert_eq!(Instant::now(), start);
364    }
365}
366*/