1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
use std::io;
use std::time::Duration;

use once_cell::sync::Lazy;
use tokio::sync::watch;

/// Determines how to shut down the Relay system.
///
/// To initiate a shutdown, pass the desired mode to [`Controller::shutdown`].
///
/// Modes can be copied freely and compared for equality.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ShutdownMode {
    /// Shut down gracefully within the configured timeout.
    ///
    /// This signals all components to finish their work and leaves time to submit pending data
    /// to the upstream or preserve it for restart.
    Graceful,
    /// Shut down immediately without finishing pending work.
    ///
    /// Pending data may be lost.
    Immediate,
}

/// Shutdown request message sent by the [`Controller`] to subscribed services.
///
/// A service receiving this message should stop accepting new requests and finish or persist its
/// open work in an orderly manner, taking no longer than `timeout` to do so.
///
/// Once the timeout has elapsed, [`ShutdownHandle::finished`] resolves regardless of what the
/// receivers of this message do.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct Shutdown {
    /// The timeout for this shutdown. `None` indicates an immediate forced shutdown.
    pub timeout: Option<Duration>,
}

/// Sender and receiver halves of a watch channel carrying an optional value.
///
/// Both halves are stored together so that a static can own the sender as well as a receiver from
/// which further receivers are cloned.
type Channel<T> = (watch::Sender<Option<T>>, watch::Receiver<Option<T>>);

/// Global channel to notify all services of a shutdown.
///
/// The channel is created with `None` as its value. Handles returned from
/// [`Controller::shutdown_handle`] wrap clones of the receiver stored here.
static SHUTDOWN: Lazy<Channel<Shutdown>> = Lazy::new(|| watch::channel(None));

/// Internal channel to trigger a manual shutdown via [`Controller::shutdown`].
///
/// The monitoring task spawned by [`Controller::start`] listens on a clone of the receiver stored
/// here and translates the requested [`ShutdownMode`] into a [`Shutdown`] message.
static MANUAL_SHUTDOWN: Lazy<Channel<ShutdownMode>> = Lazy::new(|| watch::channel(None));

/// Notifies a service about an upcoming shutdown.
///
/// This handle is returned by [`Controller::shutdown_handle`]. Use [`notified`](Self::notified) to
/// wait for the next shutdown signal, or [`finished`](Self::finished) to wait for the entire
/// shutdown sequence including the timeout.
// TODO: The receiver of this message can not yet signal they have completed shutdown.
#[derive(Debug)]
pub struct ShutdownHandle(watch::Receiver<Option<Shutdown>>);

impl ShutdownHandle {
    /// Returns the most recent shutdown message, or `None` if no shutdown has been signaled yet.
    ///
    /// This does not wait and does not mark the message as seen.
    pub fn get(&self) -> Option<Shutdown> {
        let current = self.0.borrow();
        (*current).clone()
    }

    /// Waits for the next shutdown signal and returns its message.
    ///
    /// Signals sent since the [`Controller`] has been started are also received, even if they
    /// were sent before this shutdown handle has been obtained.
    ///
    /// If the underlying channel is closed, this resolves with an immediate shutdown (a message
    /// without timeout).
    ///
    /// # Cancel safety
    ///
    /// This method is cancellation safe and can be used in `select!`.
    pub async fn notified(&mut self) -> Shutdown {
        loop {
            if self.0.changed().await.is_err() {
                // The sender is gone, so no further signal can arrive. Treat this like an
                // immediate shutdown.
                return Shutdown { timeout: None };
            }

            // Copy the value out so that the channel's guard is released right away.
            let signaled = self.0.borrow().clone();
            if let Some(shutdown) = signaled {
                return shutdown;
            }
        }
    }

    /// Waits for the shutdown and its timeout to complete.
    ///
    /// This first waits for a shutdown signal. If that signal carries a timeout, it then waits
    /// until either the timeout has elapsed or another signal arrives, whichever happens first.
    /// A signal without timeout resolves this function immediately.
    ///
    /// # Cancel safety
    ///
    /// This method is **not** cancel safe.
    pub async fn finished(mut self) {
        // The first signal initiates the shutdown. Without a timeout there is nothing to wait for.
        let grace_period = match self.notified().await.timeout {
            Some(duration) => duration,
            None => return,
        };

        // A graceful shutdown ends when the timeout elapses or when any further signal upgrades it
        // to an immediate shutdown.
        tokio::select! {
            _ = self.notified() => (),
            _ = tokio::time::sleep(grace_period) => (),
        }
    }
}

/// Service to start and gracefully stop the system runtime.
///
/// This service offers a static API to wait for a shutdown signal or manually initiate the Relay
/// shutdown. To use this functionality, it first needs to be started with [`Controller::start`].
///
/// To shut down gracefully, other services can register with [`Controller::shutdown_handle`]. When
/// a shutdown signal is sent to the process, every service will receive a [`Shutdown`] message with
/// an optional timeout. To wait for the entire shutdown sequence including the shutdown timeout
/// instead, use [`finished`](ShutdownHandle::finished). It resolves when the shutdown has
/// completed.
///
/// A shutdown can also be requested from within the application with [`Controller::shutdown`].
/// Registered services then receive a [`Shutdown`] message in the same way as for a process
/// signal.
///
/// ## Signals
///
/// By default, the controller watches for process signals and converts them into graceful or
/// immediate shutdown messages. These signals are platform-dependent:
///
///  - Unix: `SIGINT` and `SIGQUIT` trigger an immediate shutdown, `SIGTERM` a graceful one.
///  - Windows: `CTRL-C`, `CTRL-BREAK`, `CTRL-CLOSE` all trigger an immediate shutdown.
///
/// ## Example
///
/// ```
/// use relay_system::{Controller, Service, Shutdown, ShutdownMode};
/// use std::time::Duration;
///
/// struct MyService;
///
/// impl Service for MyService {
///     type Interface = ();
///
///     fn spawn_handler(self, mut rx: relay_system::Receiver<Self::Interface>) {
///         tokio::spawn(async move {
///             let mut shutdown = Controller::shutdown_handle();
///
///             loop {
///                 tokio::select! {
///                     shutdown = shutdown.notified() => break, // Handle shutdown here
///                     Some(message) = rx.recv() => (),         // Process incoming message
///                 }
///             }
///         });
///     }
/// }
///
/// #[tokio::main(flavor = "current_thread")]
/// async fn main() {
///     // Start the controller near the beginning of application bootstrap. This allows other
///     // services to register for shutdown messages.
///     Controller::start(Duration::from_millis(10));
///
///     // Start all other services. Controller::shutdown_handle will use the same controller
///     // instance and receives the configured shutdown timeout.
///     let addr = MyService.start();
///
///     // By triggering a shutdown, all attached services will be notified. This happens
///     // automatically when a signal is sent to the process (e.g. SIGINT or SIGTERM).
///     Controller::shutdown(ShutdownMode::Graceful);
///
///     // Wait for the system to shut down before winding down the application.
///     Controller::shutdown_handle().finished().await;
/// }
/// ```
#[derive(Debug)]
pub struct Controller;

impl Controller {
    /// Starts a controller that monitors shutdown signals.
    pub fn start(shutdown_timeout: Duration) {
        tokio::spawn(monitor_shutdown(shutdown_timeout));
    }

    /// Manually initiates the shutdown process of the system.
    pub fn shutdown(mode: ShutdownMode) {
        let (ref tx, _) = *MANUAL_SHUTDOWN;
        tx.send(Some(mode)).ok();
    }

    /// Returns a [handle](ShutdownHandle) to receive shutdown notifications.
    pub fn shutdown_handle() -> ShutdownHandle {
        let (_, ref rx) = *SHUTDOWN;
        ShutdownHandle(rx.clone())
    }
}

#[cfg(unix)]
async fn monitor_shutdown(timeout: Duration) -> io::Result<()> {
    use tokio::signal::unix::{signal, SignalKind};

    let mut sig_int = signal(SignalKind::interrupt())?;
    let mut sig_quit = signal(SignalKind::quit())?;
    let mut sig_term = signal(SignalKind::terminate())?;

    let (ref tx, _) = *SHUTDOWN;
    let mut manual = MANUAL_SHUTDOWN.1.clone();

    loop {
        let timeout = tokio::select! {
            biased;

            Some(()) = sig_int.recv() => {
                relay_log::info!("SIGINT received, exiting");
                None
            }
            Some(()) = sig_quit.recv() => {
                relay_log::info!("SIGQUIT received, exiting");
                None
            }
            Some(()) = sig_term.recv() => {
                relay_log::info!("SIGTERM received, stopping in {}s", timeout.as_secs());
                Some(timeout)
            }
            Ok(()) = manual.changed() => match *manual.borrow() {
                Some(ShutdownMode::Graceful) => {
                    relay_log::info!("Graceful shutdown initiated, stopping in {}s", timeout.as_secs());
                    Some(timeout)
                }
                Some(ShutdownMode::Immediate) => {
                    relay_log::info!("Immediate shutdown initiated");
                    None
                },
                None => continue,
            },

            else => break,
        };

        tx.send(Some(Shutdown { timeout })).ok();
    }

    Ok(())
}

/// Listens for Windows console events and manual shutdown requests and broadcasts them as
/// [`Shutdown`] messages.
///
/// `CTRL-C`, `CTRL-BREAK`, `CTRL-CLOSE` and [`ShutdownMode::Immediate`] result in a message
/// without timeout, while [`ShutdownMode::Graceful`] announces the given `timeout`. The loop keeps
/// running after a message has been broadcast, so later events are delivered as well.
///
/// # Errors
///
/// Returns an error if one of the console event listeners cannot be registered.
#[cfg(windows)]
async fn monitor_shutdown(timeout: Duration) -> io::Result<()> {
    use tokio::signal::windows::{ctrl_break, ctrl_c, ctrl_close};

    let mut on_ctrl_c = ctrl_c()?;
    let mut on_ctrl_break = ctrl_break()?;
    let mut on_ctrl_close = ctrl_close()?;

    let notifier = &SHUTDOWN.0;
    let mut manual = MANUAL_SHUTDOWN.1.clone();

    loop {
        // `None` stands for an immediate shutdown, `Some` carries the grace period.
        let grace_period = tokio::select! {
            // Poll the branches in declaration order.
            biased;

            Some(()) = on_ctrl_c.recv() => {
                relay_log::info!("CTRL-C received, exiting");
                None
            }
            Some(()) = on_ctrl_break.recv() => {
                relay_log::info!("CTRL-BREAK received, exiting");
                None
            }
            Some(()) = on_ctrl_close.recv() => {
                relay_log::info!("CTRL-CLOSE received, exiting");
                None
            }
            Ok(()) = manual.changed() => {
                // Copy the requested mode out of the channel before acting on it.
                let requested = *manual.borrow();
                match requested {
                    Some(ShutdownMode::Graceful) => {
                        relay_log::info!("Graceful shutdown initiated, stopping in {}s", timeout.as_secs());
                        Some(timeout)
                    }
                    Some(ShutdownMode::Immediate) => {
                        relay_log::info!("Immediate shutdown initiated");
                        None
                    }
                    // No mode has been requested, keep waiting.
                    None => continue,
                }
            }

            // None of the sources yielded a value, stop monitoring.
            else => break,
        };

        notifier.send(Some(Shutdown { timeout: grace_period })).ok();
    }

    Ok(())
}

/*
TODO: Tests disabled since there is no isolation. Should be re-enabled once Controller instances are
passed into services.

#[cfg(test)]
mod tests {
    use tokio::time::Instant;

    use super::*;

    #[tokio::test]
    async fn handle_receives_immediate_shutdown() {
        tokio::time::pause();

        Controller::start(Duration::from_secs(1));
        let mut handle = Controller::shutdown_handle();

        Controller::shutdown(ShutdownMode::Immediate);
        let shutdown = handle.notified().await;
        assert_eq!(shutdown.timeout, None);
    }

    #[tokio::test]
    async fn receives_graceful_shutdown() {
        tokio::time::pause();

        let timeout = Duration::from_secs(1);
        Controller::start(timeout);
        let mut handle = Controller::shutdown_handle();

        Controller::shutdown(ShutdownMode::Graceful);
        let shutdown = handle.notified().await;
        assert_eq!(shutdown.timeout, Some(timeout));
    }

    #[tokio::test]
    async fn handle_receives_past_shutdown() {
        tokio::time::pause();

        Controller::start(Duration::from_secs(1));
        Controller::shutdown(ShutdownMode::Immediate);

        Controller::shutdown_handle().notified().await;
        // should not block
    }

    #[tokio::test]
    async fn handle_waits_for_timeout() {
        tokio::time::pause();

        let timeout = Duration::from_secs(1);
        Controller::start(timeout);
        let shutdown = Controller::shutdown_handle();

        let start = Instant::now();
        Controller::shutdown(ShutdownMode::Graceful);
        shutdown.finished().await;

        assert_eq!(Instant::now() - start, timeout);
    }

    #[tokio::test]
    async fn finish_exits_early() {
        tokio::time::pause();

        Controller::start(Duration::from_secs(1));
        let shutdown = Controller::shutdown_handle();

        let start = Instant::now();
        Controller::shutdown(ShutdownMode::Graceful);
        Controller::shutdown(ShutdownMode::Immediate);

        shutdown.finished().await;
        assert_eq!(Instant::now(), start);
    }
}
*/