relay_system/controller.rs
1use std::io;
2use std::time::Duration;
3
4use once_cell::sync::Lazy;
5use tokio::sync::watch;
6
/// Determines how to shut down the Relay system.
///
/// To initiate a shutdown, use [`Controller::shutdown`].
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum ShutdownMode {
    /// Shut down gracefully within the configured timeout.
    ///
    /// This signals all components to finish their work and leaves time to submit pending data
    /// to the upstream or to preserve it for a restart.
    Graceful,
    /// Shut down immediately without finishing pending work.
    ///
    /// Pending data may be lost.
    Immediate,
}
22
/// Shutdown request sent by the [`Controller`] to subscribed services.
///
/// Services receive this message through a [`ShutdownHandle`]. On receipt, a service should stop
/// accepting new requests and finish or persist its open work in an orderly manner. If a
/// `timeout` is set, the service must complete this within that duration.
///
/// After the timeout, the system shuts down regardless of what the receivers of this message do.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Shutdown {
    /// The timeout for this shutdown. `None` indicates an immediate forced shutdown.
    pub timeout: Option<Duration>,
}
39
40type Channel<T> = (watch::Sender<Option<T>>, watch::Receiver<Option<T>>);
41
42/// Global channel to notify all services of a shutdown.
43static SHUTDOWN: Lazy<Channel<Shutdown>> = Lazy::new(|| watch::channel(None));
44
45/// Internal channel to trigger a manual shutdown via [`Controller::shutdown`].
46static MANUAL_SHUTDOWN: Lazy<Channel<ShutdownMode>> = Lazy::new(|| watch::channel(None));
47
48/// Notifies a service about an upcoming shutdown.
49///
50/// This handle is returned by [`Controller::shutdown_handle`].
51// TODO: The receiver of this message can not yet signal they have completed shutdown.
52pub struct ShutdownHandle(watch::Receiver<Option<Shutdown>>);
53
54impl ShutdownHandle {
55 /// Returns the current shutdown state.
56 pub fn get(&self) -> Option<Shutdown> {
57 self.0.borrow().clone()
58 }
59
60 /// Wait for a shutdown.
61 ///
62 /// This receives all shutdown signals since the [`Controller`] has been started, even before
63 /// this shutdown handle has been obtained.
64 ///
65 /// # Cancel safety
66 ///
67 /// This method is cancellation safe and can be used in `select!`.
68 pub async fn notified(&mut self) -> Shutdown {
69 while self.0.changed().await.is_ok() {
70 if let Some(shutdown) = &*self.0.borrow() {
71 return shutdown.clone();
72 }
73 }
74
75 Shutdown { timeout: None }
76 }
77
78 /// Wait for the shutdown and timeout to complete.
79 ///
80 /// This waits for the first shutdown signal and then conditionally waits for the shutdown
81 /// timeout. If the shutdown timeout is interrupted by another signal, this function resolves
82 /// immediately.
83 ///
84 /// # Cancel safety
85 ///
86 /// This method is **not** cancel safe.
87 pub async fn finished(mut self) {
88 // Wait for the first signal to initiate shutdown.
89 let shutdown = self.notified().await;
90
91 // If this is a graceful signal, wait for either the timeout to elapse, or any other signal
92 // to upgrade to an immediate shutdown.
93 if let Some(timeout) = shutdown.timeout {
94 tokio::select! {
95 _ = self.notified() => (),
96 _ = tokio::time::sleep(timeout) => (),
97 }
98 }
99 }
100}
101
/// Entry point for starting the system runtime and shutting it down in an orderly fashion.
///
/// All functionality is exposed through associated functions, so no instance is required. The
/// controller must first be started with [`Controller::start`]; afterwards, shutdowns can be
/// awaited or initiated manually from anywhere.
///
/// Services that want to shut down gracefully obtain a handle via [`Controller::shutdown_handle`].
/// Once a shutdown is signaled to the process, every handle yields a [`Shutdown`] message that
/// optionally carries a timeout. To instead wait until the complete shutdown sequence, including
/// that timeout, is over, await [`finished`](ShutdownHandle::finished).
///
/// ## Signals
///
/// Besides manual requests, the controller watches for process signals and translates them into
/// graceful or immediate shutdown messages. The handled signals depend on the platform:
///
/// - Unix: `SIGTERM` requests a graceful shutdown, while `SIGINT` and `SIGQUIT` request an
///   immediate one.
/// - Windows: `CTRL-C`, `CTRL-BREAK` and `CTRL-CLOSE` each request an immediate shutdown.
///
/// ### Example
///
/// ```
/// # #[cfg(feature = "test")]
/// use relay_system::{Controller, Service, ServiceSpawnExt, Shutdown, ShutdownMode, TokioServiceSpawn};
/// # #[cfg(not(feature = "test"))]
/// # use relay_system::{Controller, Service, ServiceSpawnExt, Shutdown, ShutdownMode};
/// use std::time::Duration;
///
/// struct MyService;
///
/// impl Service for MyService {
///     type Interface = ();
///
///     async fn run(self, mut rx: relay_system::Receiver<Self::Interface>) {
///         let mut shutdown = Controller::shutdown_handle();
///         loop {
///             tokio::select! {
///                 shutdown = shutdown.notified() => break, // Handle shutdown here
///                 Some(message) = rx.recv() => (), // Process incoming message
///             }
///         }
///     }
/// }
///
/// #[tokio::main(flavor = "current_thread")]
/// async fn main() {
///     // Start the controller near the beginning of application bootstrap. This allows other
///     // services to register for shutdown messages.
///     Controller::start(Duration::from_millis(10));
///
///     // Start all other services. Controller::shutdown_handle will use the same controller
///     // instance and receives the configured shutdown timeout.
///     # #[cfg(feature = "test")]
///     let addr = TokioServiceSpawn.start(MyService);
///
///     // By triggering a shutdown, all subscribed services will be notified. This happens
///     // automatically when a signal is sent to the process (e.g. SIGINT or SIGTERM).
///     Controller::shutdown(ShutdownMode::Graceful);
///
///     // Wait for the system to shut down before winding down the application.
///     Controller::shutdown_handle().finished().await;
/// }
/// ```
#[derive(Debug)]
pub struct Controller;
167
168impl Controller {
169 /// Starts a controller that monitors shutdown signals.
170 #[track_caller]
171 pub fn start(shutdown_timeout: Duration) {
172 crate::spawn!(monitor_shutdown(shutdown_timeout));
173 }
174
175 /// Manually initiates the shutdown process of the system.
176 pub fn shutdown(mode: ShutdownMode) {
177 let (ref tx, _) = *MANUAL_SHUTDOWN;
178 tx.send(Some(mode)).ok();
179 }
180
181 /// Returns a [handle](ShutdownHandle) to receive shutdown notifications.
182 pub fn shutdown_handle() -> ShutdownHandle {
183 let (_, ref rx) = *SHUTDOWN;
184 ShutdownHandle(rx.clone())
185 }
186}
187
188#[cfg(unix)]
189async fn monitor_shutdown(timeout: Duration) -> io::Result<()> {
190 use tokio::signal::unix::{SignalKind, signal};
191
192 let mut sig_int = signal(SignalKind::interrupt())?;
193 let mut sig_quit = signal(SignalKind::quit())?;
194 let mut sig_term = signal(SignalKind::terminate())?;
195
196 let (ref tx, _) = *SHUTDOWN;
197 let mut manual = MANUAL_SHUTDOWN.1.clone();
198
199 loop {
200 let timeout = tokio::select! {
201 biased;
202
203 Some(()) = sig_int.recv() => {
204 relay_log::info!("SIGINT received, exiting");
205 None
206 }
207 Some(()) = sig_quit.recv() => {
208 relay_log::info!("SIGQUIT received, exiting");
209 None
210 }
211 Some(()) = sig_term.recv() => {
212 relay_log::info!("SIGTERM received, stopping in {}s", timeout.as_secs());
213 Some(timeout)
214 }
215 Ok(()) = manual.changed() => match *manual.borrow() {
216 Some(ShutdownMode::Graceful) => {
217 relay_log::info!("Graceful shutdown initiated, stopping in {}s", timeout.as_secs());
218 Some(timeout)
219 }
220 Some(ShutdownMode::Immediate) => {
221 relay_log::info!("Immediate shutdown initiated");
222 None
223 },
224 None => continue,
225 },
226
227 else => break,
228 };
229
230 tx.send(Some(Shutdown { timeout })).ok();
231 }
232
233 Ok(())
234}
235
/// Forwards console control events and manual shutdown requests to all shutdown handles
/// (Windows).
///
/// `CTRL-C`, `CTRL-BREAK`, `CTRL-CLOSE` and [`ShutdownMode::Immediate`] result in a shutdown
/// without a timeout. [`ShutdownMode::Graceful`] results in a shutdown with the given `timeout`.
///
/// # Errors
///
/// Returns an error if one of the console event handlers cannot be registered.
#[cfg(windows)]
async fn monitor_shutdown(timeout: Duration) -> io::Result<()> {
    use tokio::signal::windows::{ctrl_break, ctrl_c, ctrl_close};

    let mut on_ctrl_c = ctrl_c()?;
    let mut on_ctrl_break = ctrl_break()?;
    let mut on_ctrl_close = ctrl_close()?;

    let shutdown_tx = &SHUTDOWN.0;
    let mut manual_rx = MANUAL_SHUTDOWN.1.clone();

    loop {
        // With `biased`, branches are polled in declaration order, so console events take
        // precedence over manual requests when several are ready at the same time.
        let shutdown_timeout = tokio::select! {
            biased;

            Some(()) = on_ctrl_c.recv() => {
                relay_log::info!("CTRL-C received, exiting");
                None
            }
            Some(()) = on_ctrl_break.recv() => {
                relay_log::info!("CTRL-BREAK received, exiting");
                None
            }
            Some(()) = on_ctrl_close.recv() => {
                relay_log::info!("CTRL-CLOSE received, exiting");
                None
            }
            Ok(()) = manual_rx.changed() => {
                let requested = *manual_rx.borrow();
                match requested {
                    Some(ShutdownMode::Graceful) => {
                        relay_log::info!("Graceful shutdown initiated, stopping in {}s", timeout.as_secs());
                        Some(timeout)
                    }
                    Some(ShutdownMode::Immediate) => {
                        relay_log::info!("Immediate shutdown initiated");
                        None
                    }
                    // No mode has been requested; keep waiting for the next event.
                    None => continue,
                }
            }

            // Every branch is disabled: the event streams ended and the manual channel closed.
            else => break,
        };

        let _ = shutdown_tx.send(Some(Shutdown { timeout: shutdown_timeout }));
    }

    Ok(())
}
283
284/*
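TODO: These tests are disabled because the global controller state provides no isolation between
tests. They should be re-enabled once `Controller` instances are passed into services. When
re-enabling them, note that `receives_graceful_shutdown` currently triggers
`ShutdownMode::Immediate` although it asserts a graceful timeout.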
287
288#[cfg(test)]
289mod tests {
290 use tokio::time::Instant;
291
292 use super::*;
293
294 #[tokio::test]
295 async fn handle_receives_immediate_shutdown() {
296 tokio::time::pause();
297
298 Controller::start(Duration::from_secs(1));
299 let mut handle = Controller::shutdown_handle();
300
301 Controller::shutdown(ShutdownMode::Immediate);
302 let shutdown = handle.notified().await;
303 assert_eq!(shutdown.timeout, None);
304 }
305
306 #[tokio::test]
307 async fn receives_graceful_shutdown() {
308 tokio::time::pause();
309
310 let timeout = Duration::from_secs(1);
311 Controller::start(timeout);
312 let mut handle = Controller::shutdown_handle();
313
314 Controller::shutdown(ShutdownMode::Immediate);
315 let shutdown = handle.notified().await;
316 assert_eq!(shutdown.timeout, Some(timeout));
317 }
318
319 #[tokio::test]
320 async fn handle_receives_past_shutdown() {
321 tokio::time::pause();
322
323 Controller::start(Duration::from_secs(1));
324 Controller::shutdown(ShutdownMode::Immediate);
325
326 Controller::shutdown_handle().notified().await;
327 // should not block
328 }
329
330 #[tokio::test]
331 async fn handle_waits_for_timeout() {
332 tokio::time::pause();
333
334 let timeout = Duration::from_secs(1);
335 Controller::start(timeout);
336 let shutdown = Controller::shutdown_handle();
337
338 let start = Instant::now();
339 Controller::shutdown(ShutdownMode::Graceful);
340 shutdown.finished().await;
341
342 assert_eq!(Instant::now() - start, timeout);
343 }
344
345 #[tokio::test]
346 async fn finish_exits_early() {
347 tokio::time::pause();
348
349 Controller::start(Duration::from_secs(1));
350 let shutdown = Controller::shutdown_handle();
351
352 let start = Instant::now();
353 Controller::shutdown(ShutdownMode::Graceful);
354 Controller::shutdown(ShutdownMode::Immediate);
355
356 shutdown.finished().await;
357 assert_eq!(Instant::now(), start);
358 }
359}
360*/