relay_system/
controller.rs

1use std::io;
2use std::time::Duration;
3
4use once_cell::sync::Lazy;
5use tokio::sync::watch;
6
/// Determines how to shut down the Relay system.
///
/// To initiate a shutdown, use [`Controller::shutdown`].
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum ShutdownMode {
    /// Shut down gracefully within the configured timeout.
    ///
    /// This signals all components to finish their work and leaves time to submit pending data
    /// to the upstream or to preserve it for a restart.
    Graceful,
    /// Shut down immediately without finishing pending work.
    ///
    /// Pending data may be lost.
    Immediate,
}
22
/// Shutdown notification broadcast by the [`Controller`] to all [`ShutdownHandle`]s.
///
/// Services receive this message through [`ShutdownHandle::notified`]. On receipt, a service
/// should stop accepting new work and finish or persist its open work in an orderly manner,
/// taking no longer than `timeout`.
///
/// After the timeout the system will shut down regardless of what the receivers of this message
/// do.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Shutdown {
    /// The timeout for this shutdown.
    ///
    /// `Some` carries the grace period of a graceful shutdown. `None` indicates an immediate,
    /// forced shutdown.
    pub timeout: Option<Duration>,
}
39
40type Channel<T> = (watch::Sender<Option<T>>, watch::Receiver<Option<T>>);
41
42/// Global channel to notify all services of a shutdown.
43static SHUTDOWN: Lazy<Channel<Shutdown>> = Lazy::new(|| watch::channel(None));
44
45/// Internal channel to trigger a manual shutdown via [`Controller::shutdown`].
46static MANUAL_SHUTDOWN: Lazy<Channel<ShutdownMode>> = Lazy::new(|| watch::channel(None));
47
48/// Notifies a service about an upcoming shutdown.
49///
50/// This handle is returned by [`Controller::shutdown_handle`].
51// TODO: The receiver of this message can not yet signal they have completed shutdown.
52pub struct ShutdownHandle(watch::Receiver<Option<Shutdown>>);
53
54impl ShutdownHandle {
55    /// Returns the current shutdown state.
56    pub fn get(&self) -> Option<Shutdown> {
57        self.0.borrow().clone()
58    }
59
60    /// Wait for a shutdown.
61    ///
62    /// This receives all shutdown signals since the [`Controller`] has been started, even before
63    /// this shutdown handle has been obtained.
64    ///
65    /// # Cancel safety
66    ///
67    /// This method is cancellation safe and can be used in `select!`.
68    pub async fn notified(&mut self) -> Shutdown {
69        while self.0.changed().await.is_ok() {
70            if let Some(shutdown) = &*self.0.borrow() {
71                return shutdown.clone();
72            }
73        }
74
75        Shutdown { timeout: None }
76    }
77
78    /// Wait for the shutdown and timeout to complete.
79    ///
80    /// This waits for the first shutdown signal and then conditionally waits for the shutdown
81    /// timeout. If the shutdown timeout is interrupted by another signal, this function resolves
82    /// immediately.
83    ///
84    /// # Cancel safety
85    ///
86    /// This method is **not** cancel safe.
87    pub async fn finished(mut self) {
88        // Wait for the first signal to initiate shutdown.
89        let shutdown = self.notified().await;
90
91        // If this is a graceful signal, wait for either the timeout to elapse, or any other signal
92        // to upgrade to an immediate shutdown.
93        if let Some(timeout) = shutdown.timeout {
94            tokio::select! {
95                _ = self.notified() => (),
96                _ = tokio::time::sleep(timeout) => (),
97            }
98        }
99    }
100}
101
/// Entry point for starting shutdown monitoring and stopping the system runtime gracefully.
///
/// The controller is used purely through associated functions: obtain a
/// [`ShutdownHandle`] via [`Controller::shutdown_handle`], or trigger a shutdown manually via
/// [`Controller::shutdown`]. Process signals are only turned into shutdown notifications after
/// the controller has been started with [`Controller::start`].
///
/// Services that want to stop gracefully register through [`Controller::shutdown_handle`]. Once
/// a shutdown is signaled, every handle observes a [`Shutdown`] message carrying an optional
/// timeout. To wait for the complete shutdown sequence instead, including the timeout, use
/// [`finished`](ShutdownHandle::finished), which resolves once the shutdown has completed.
///
/// ## Signals
///
/// Out of the box, the controller listens for process signals and maps each of them to either a
/// graceful or an immediate shutdown. The mapping depends on the platform:
///
///  - Unix: `SIGINT` and `SIGQUIT` trigger an immediate shutdown, `SIGTERM` a graceful one.
///  - Windows: `CTRL-C`, `CTRL-BREAK`, `CTRL-CLOSE` all trigger an immediate shutdown.
///
/// ### Example
///
/// ```
/// # #[cfg(feature = "test")]
/// use relay_system::{Controller, Service, ServiceSpawnExt, Shutdown, ShutdownMode, TokioServiceSpawn};
/// # #[cfg(not(feature = "test"))]
/// # use relay_system::{Controller, Service, ServiceSpawnExt, Shutdown, ShutdownMode};
/// use std::time::Duration;
///
/// struct MyService;
///
/// impl Service for MyService {
///     type Interface = ();
///
///     async fn run(self, mut rx: relay_system::Receiver<Self::Interface>) {
///         let mut shutdown = Controller::shutdown_handle();
///         loop {
///             tokio::select! {
///                 shutdown = shutdown.notified() => break, // Handle shutdown here
///                 Some(message) = rx.recv() => (),         // Process incoming message
///             }
///         }
///     }
/// }
///
/// #[tokio::main(flavor = "current_thread")]
/// async fn main() {
///     // Start the controller near the beginning of application bootstrap. This allows other
///     // services to register for shutdown messages.
///     Controller::start(Duration::from_millis(10));
///
///     // Start all other services. Controller::shutdown_handle will use the same controller
///     // instance and receives the configured shutdown timeout.
///     # #[cfg(feature = "test")]
///     let addr = TokioServiceSpawn.start(MyService);
///
///     // By triggering a shutdown, all subscribed services will be notified. This happens
///     // automatically when a signal is sent to the process (e.g. SIGINT or SIGTERM).
///     Controller::shutdown(ShutdownMode::Graceful);
///
///     // Wait for the system to shut down before winding down the application.
///     Controller::shutdown_handle().finished().await;
/// }
/// ```
#[derive(Debug)]
pub struct Controller;
167
168impl Controller {
169    /// Starts a controller that monitors shutdown signals.
170    #[track_caller]
171    pub fn start(shutdown_timeout: Duration) {
172        crate::spawn!(monitor_shutdown(shutdown_timeout));
173    }
174
175    /// Manually initiates the shutdown process of the system.
176    pub fn shutdown(mode: ShutdownMode) {
177        let (ref tx, _) = *MANUAL_SHUTDOWN;
178        tx.send(Some(mode)).ok();
179    }
180
181    /// Returns a [handle](ShutdownHandle) to receive shutdown notifications.
182    pub fn shutdown_handle() -> ShutdownHandle {
183        let (_, ref rx) = *SHUTDOWN;
184        ShutdownHandle(rx.clone())
185    }
186}
187
188#[cfg(unix)]
189async fn monitor_shutdown(timeout: Duration) -> io::Result<()> {
190    use tokio::signal::unix::{SignalKind, signal};
191
192    let mut sig_int = signal(SignalKind::interrupt())?;
193    let mut sig_quit = signal(SignalKind::quit())?;
194    let mut sig_term = signal(SignalKind::terminate())?;
195
196    let (ref tx, _) = *SHUTDOWN;
197    let mut manual = MANUAL_SHUTDOWN.1.clone();
198
199    loop {
200        let timeout = tokio::select! {
201            biased;
202
203            Some(()) = sig_int.recv() => {
204                relay_log::info!("SIGINT received, exiting");
205                None
206            }
207            Some(()) = sig_quit.recv() => {
208                relay_log::info!("SIGQUIT received, exiting");
209                None
210            }
211            Some(()) = sig_term.recv() => {
212                relay_log::info!("SIGTERM received, stopping in {}s", timeout.as_secs());
213                Some(timeout)
214            }
215            Ok(()) = manual.changed() => match *manual.borrow() {
216                Some(ShutdownMode::Graceful) => {
217                    relay_log::info!("Graceful shutdown initiated, stopping in {}s", timeout.as_secs());
218                    Some(timeout)
219                }
220                Some(ShutdownMode::Immediate) => {
221                    relay_log::info!("Immediate shutdown initiated");
222                    None
223                },
224                None => continue,
225            },
226
227            else => break,
228        };
229
230        tx.send(Some(Shutdown { timeout })).ok();
231    }
232
233    Ok(())
234}
235
/// Converts Windows console events and manual requests into [`Shutdown`] broadcasts.
///
/// `CTRL-C`, `CTRL-BREAK` and `CTRL-CLOSE` broadcast an immediate shutdown (`timeout: None`).
/// [`ShutdownMode::Graceful`] broadcasts a graceful shutdown with the given `timeout`. Every
/// signal is forwarded, so a later signal can upgrade a graceful shutdown to an immediate one.
///
/// # Errors
///
/// Returns an error if one of the console event listeners cannot be registered.
#[cfg(windows)]
async fn monitor_shutdown(timeout: Duration) -> io::Result<()> {
    use tokio::signal::windows::{ctrl_break, ctrl_c, ctrl_close};

    let mut ctrl_c = ctrl_c()?;
    let mut ctrl_break = ctrl_break()?;
    let mut ctrl_close = ctrl_close()?;

    let (ref tx, _) = *SHUTDOWN;
    let mut manual = MANUAL_SHUTDOWN.1.clone();

    loop {
        let timeout = tokio::select! {
            biased;

            Some(()) = ctrl_c.recv() => {
                relay_log::info!("CTRL-C received, exiting");
                None
            }
            Some(()) = ctrl_break.recv() => {
                relay_log::info!("CTRL-BREAK received, exiting");
                None
            }
            Some(()) = ctrl_close.recv() => {
                relay_log::info!("CTRL-CLOSE received, exiting");
                None
            }
            Ok(()) = manual.changed() => {
                // Copy the mode out so the watch read lock is released before logging, and mark
                // exactly this value as seen so it is not processed twice.
                let mode = *manual.borrow_and_update();
                match mode {
                    Some(ShutdownMode::Graceful) => {
                        relay_log::info!(
                            "Graceful shutdown initiated, stopping in {}s",
                            timeout.as_secs()
                        );
                        Some(timeout)
                    }
                    Some(ShutdownMode::Immediate) => {
                        relay_log::info!("Immediate shutdown initiated");
                        None
                    }
                    None => continue,
                }
            }

            else => break,
        };

        tx.send(Some(Shutdown { timeout })).ok();
    }

    Ok(())
}
283
284/*
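TODO: Tests disabled since there is no isolation. Should be re-enabled once Controller instances are
passed into services. Before re-enabling, note that `receives_graceful_shutdown` triggers
`ShutdownMode::Immediate` although it asserts a graceful timeout; it must send
`ShutdownMode::Graceful` instead.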
287
288#[cfg(test)]
289mod tests {
290    use tokio::time::Instant;
291
292    use super::*;
293
294    #[tokio::test]
295    async fn handle_receives_immediate_shutdown() {
296        tokio::time::pause();
297
298        Controller::start(Duration::from_secs(1));
299        let mut handle = Controller::shutdown_handle();
300
301        Controller::shutdown(ShutdownMode::Immediate);
302        let shutdown = handle.notified().await;
303        assert_eq!(shutdown.timeout, None);
304    }
305
306    #[tokio::test]
307    async fn receives_graceful_shutdown() {
308        tokio::time::pause();
309
310        let timeout = Duration::from_secs(1);
311        Controller::start(timeout);
312        let mut handle = Controller::shutdown_handle();
313
314        Controller::shutdown(ShutdownMode::Immediate);
315        let shutdown = handle.notified().await;
316        assert_eq!(shutdown.timeout, Some(timeout));
317    }
318
319    #[tokio::test]
320    async fn handle_receives_past_shutdown() {
321        tokio::time::pause();
322
323        Controller::start(Duration::from_secs(1));
324        Controller::shutdown(ShutdownMode::Immediate);
325
326        Controller::shutdown_handle().notified().await;
327        // should not block
328    }
329
330    #[tokio::test]
331    async fn handle_waits_for_timeout() {
332        tokio::time::pause();
333
334        let timeout = Duration::from_secs(1);
335        Controller::start(timeout);
336        let shutdown = Controller::shutdown_handle();
337
338        let start = Instant::now();
339        Controller::shutdown(ShutdownMode::Graceful);
340        shutdown.finished().await;
341
342        assert_eq!(Instant::now() - start, timeout);
343    }
344
345    #[tokio::test]
346    async fn finish_exits_early() {
347        tokio::time::pause();
348
349        Controller::start(Duration::from_secs(1));
350        let shutdown = Controller::shutdown_handle();
351
352        let start = Instant::now();
353        Controller::shutdown(ShutdownMode::Graceful);
354        Controller::shutdown(ShutdownMode::Immediate);
355
356        shutdown.finished().await;
357        assert_eq!(Instant::now(), start);
358    }
359}
360*/