relay_system/controller.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356
use std::io;
use std::time::Duration;
use once_cell::sync::Lazy;
use tokio::sync::watch;
/// Determines how to shut down the Relay system.
///
/// To initiate a shutdown, use [`Controller::shutdown`].
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ShutdownMode {
    /// Shut down gracefully within the configured timeout.
    ///
    /// This signals all components to finish their work and leaves time to submit pending data
    /// to the upstream or to preserve it for a restart.
    Graceful,
    /// Shut down immediately without finishing pending work.
    ///
    /// Pending data may be lost.
    Immediate,
}
/// Shutdown request message sent by the [`Controller`] to subscribed services.
///
/// Services receive this message through a [`ShutdownHandle`], for example via
/// [`ShutdownHandle::notified`]. A receiver should finish or persist its open work within
/// `timeout` and stop accepting new requests.
///
/// After the timeout, the system shuts down regardless of what the receivers of this message
/// do.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Shutdown {
    /// The timeout for this shutdown. `None` indicates an immediate forced shutdown.
    pub timeout: Option<Duration>,
}
/// A `watch` sender/receiver pair carrying an optional value.
type Channel<T> = (watch::Sender<Option<T>>, watch::Receiver<Option<T>>);

/// Global channel to notify all services of a shutdown.
///
/// Starts out as `None`. Its receiver half stays in this static, and
/// [`Controller::shutdown_handle`] hands out clones of it.
static SHUTDOWN: Lazy<Channel<Shutdown>> =
    Lazy::new(|| watch::channel(None::<Shutdown>));

/// Internal channel to trigger a manual shutdown via [`Controller::shutdown`].
///
/// Starts out as `None`. The monitor task started by [`Controller::start`] watches a clone of
/// its receiver half.
static MANUAL_SHUTDOWN: Lazy<Channel<ShutdownMode>> =
    Lazy::new(|| watch::channel(None::<ShutdownMode>));
/// Notifies a service about an upcoming shutdown.
///
/// This handle is returned by [`Controller::shutdown_handle`].
// TODO: The receiver of this message can not yet signal they have completed shutdown.
#[derive(Debug)]
pub struct ShutdownHandle(watch::Receiver<Option<Shutdown>>);

impl ShutdownHandle {
    /// Returns the current shutdown state.
    ///
    /// Returns `None` while no shutdown has been initiated. This does not wait.
    pub fn get(&self) -> Option<Shutdown> {
        self.0.borrow().clone()
    }

    /// Wait for a shutdown.
    ///
    /// This receives all shutdown signals since the [`Controller`] has been started, even before
    /// this shutdown handle has been obtained.
    ///
    /// If the shutdown channel is closed before a shutdown arrives, this resolves with an
    /// immediate shutdown (`timeout: None`).
    ///
    /// # Cancel safety
    ///
    /// This method is cancellation safe and can be used in `select!`.
    pub async fn notified(&mut self) -> Shutdown {
        while self.0.changed().await.is_ok() {
            // Only a populated value represents a shutdown; ignore changes that leave it unset.
            if let Some(shutdown) = &*self.0.borrow() {
                return shutdown.clone();
            }
        }

        // The sending side is gone and no further signals can arrive, so fall back to an
        // immediate shutdown.
        Shutdown { timeout: None }
    }

    /// Wait for the shutdown and timeout to complete.
    ///
    /// This waits for the first shutdown signal and then conditionally waits for the shutdown
    /// timeout. If the shutdown timeout is interrupted by another signal, this function resolves
    /// immediately.
    ///
    /// # Cancel safety
    ///
    /// This method is **not** cancel safe.
    pub async fn finished(mut self) {
        // Wait for the first signal to initiate shutdown.
        let shutdown = self.notified().await;

        // If this is a graceful signal, wait for either the timeout to elapse, or any other signal
        // to upgrade to an immediate shutdown. An immediate signal (`None`) returns right away.
        if let Some(timeout) = shutdown.timeout {
            tokio::select! {
                _ = self.notified() => (),
                _ = tokio::time::sleep(timeout) => (),
            }
        }
    }
}
/// Service to start and gracefully stop the system runtime.
///
/// This service offers a static API to wait for a shutdown signal or manually initiate the Relay
/// shutdown. To use this functionality, it first needs to be started with [`Controller::start`].
///
/// To shut down gracefully, other services can register with [`Controller::shutdown_handle`]. When
/// a shutdown signal is sent to the process, every service will receive a [`Shutdown`] message with
/// an optional timeout. To wait for the entire shutdown sequence including the shutdown timeout
/// instead, use [`finished`](ShutdownHandle::finished). It resolves when the shutdown has
/// completed.
///
/// ## Signals
///
/// By default, the controller watches for process signals and converts them into graceful or
/// immediate shutdown messages. These signals are platform-dependent:
///
/// - Unix: `SIGINT` and `SIGQUIT` trigger an immediate shutdown, `SIGTERM` a graceful one.
/// - Windows: `CTRL-C`, `CTRL-BREAK`, `CTRL-CLOSE` all trigger an immediate shutdown.
///
/// ### Example
///
/// ```
/// use relay_system::{Controller, Service, ServiceRunner, Shutdown, ShutdownMode};
/// use std::time::Duration;
///
/// struct MyService;
///
/// impl Service for MyService {
///     type Interface = ();
///
///     async fn run(self, mut rx: relay_system::Receiver<Self::Interface>) {
///         let mut shutdown = Controller::shutdown_handle();
///         loop {
///             tokio::select! {
///                 _ = shutdown.notified() => break, // Handle shutdown here
///                 Some(message) = rx.recv() => (), // Process incoming message
///             }
///         }
///     }
/// }
///
/// #[tokio::main(flavor = "current_thread")]
/// async fn main() {
///     // Start the controller near the beginning of application bootstrap. This allows other
///     // services to register for shutdown messages.
///     Controller::start(Duration::from_millis(10));
///
///     // Start all other services. Controller::shutdown_handle will use the same controller
///     // instance and receives the configured shutdown timeout.
///     let addr = ServiceRunner::new().start(MyService);
///
///     // By triggering a shutdown, all subscribed services will be notified. This happens
///     // automatically when a signal is sent to the process (e.g. SIGINT or SIGTERM).
///     Controller::shutdown(ShutdownMode::Graceful);
///
///     // Wait for the system to shut down before winding down the application.
///     Controller::shutdown_handle().finished().await;
/// }
/// ```
#[derive(Debug)]
pub struct Controller;

impl Controller {
    /// Starts a controller that monitors shutdown signals.
    ///
    /// This spawns a background task that converts process signals and manual requests from
    /// [`Controller::shutdown`] into [`Shutdown`] messages for every [`ShutdownHandle`]. Graceful
    /// shutdowns carry `shutdown_timeout`.
    ///
    /// If the signal handlers cannot be installed, the error is logged and the monitoring task
    /// exits. No shutdown messages will be forwarded in that case.
    #[track_caller]
    pub fn start(shutdown_timeout: Duration) {
        crate::spawn!(async move {
            // Without this, a failure to register signal handlers would be dropped silently
            // together with the task's result.
            if let Err(error) = monitor_shutdown(shutdown_timeout).await {
                relay_log::error!("failed to monitor shutdown signals: {}", error);
            }
        });
    }

    /// Manually initiates the shutdown process of the system.
    pub fn shutdown(mode: ShutdownMode) {
        let (ref tx, _) = *MANUAL_SHUTDOWN;
        // The static keeps a receiver alive, so sending is not expected to fail.
        tx.send(Some(mode)).ok();
    }

    /// Returns a [handle](ShutdownHandle) to receive shutdown notifications.
    pub fn shutdown_handle() -> ShutdownHandle {
        let (_, ref rx) = *SHUTDOWN;
        ShutdownHandle(rx.clone())
    }
}
/// Watches Unix process signals and manual shutdown requests, and broadcasts each one as a
/// [`Shutdown`] on the global shutdown channel.
///
/// `SIGINT` and `SIGQUIT` produce an immediate shutdown (`timeout: None`). `SIGTERM` and
/// [`ShutdownMode::Graceful`] produce a graceful shutdown with the given `timeout`.
///
/// # Errors
///
/// Returns an error if any of the signal handlers cannot be registered.
#[cfg(unix)]
async fn monitor_shutdown(timeout: Duration) -> io::Result<()> {
    use tokio::signal::unix::{signal, SignalKind};

    let mut sig_int = signal(SignalKind::interrupt())?;
    let mut sig_quit = signal(SignalKind::quit())?;
    let mut sig_term = signal(SignalKind::terminate())?;

    let (ref tx, _) = *SHUTDOWN;
    let mut manual = MANUAL_SHUTDOWN.1.clone();

    loop {
        // `None` requests an immediate shutdown, `Some(_)` a graceful one.
        let shutdown_timeout = tokio::select! {
            // Poll branches in declaration order, so process signals take precedence over manual
            // requests when several are ready at once.
            biased;

            Some(()) = sig_int.recv() => {
                relay_log::info!("SIGINT received, exiting");
                None
            }
            Some(()) = sig_quit.recv() => {
                relay_log::info!("SIGQUIT received, exiting");
                None
            }
            Some(()) = sig_term.recv() => {
                // `Debug` keeps sub-second precision ("10ms"), where `as_secs()` would print "0s".
                relay_log::info!("SIGTERM received, stopping in {:?}", timeout);
                Some(timeout)
            }
            Ok(()) = manual.changed() => match *manual.borrow() {
                Some(ShutdownMode::Graceful) => {
                    relay_log::info!("Graceful shutdown initiated, stopping in {:?}", timeout);
                    Some(timeout)
                }
                Some(ShutdownMode::Immediate) => {
                    relay_log::info!("Immediate shutdown initiated");
                    None
                }
                None => continue,
            },
            // Every signal stream and the manual channel are exhausted.
            else => break,
        };

        // The static keeps a receiver alive, so sending is not expected to fail.
        tx.send(Some(Shutdown { timeout: shutdown_timeout })).ok();
    }

    Ok(())
}
/// Watches Windows console events and manual shutdown requests, and broadcasts each one as a
/// [`Shutdown`] on the global shutdown channel.
///
/// `CTRL-C`, `CTRL-BREAK` and `CTRL-CLOSE` produce an immediate shutdown (`timeout: None`).
/// [`ShutdownMode::Graceful`] produces a graceful shutdown with the given `timeout`.
///
/// # Errors
///
/// Returns an error if any of the console event handlers cannot be registered.
#[cfg(windows)]
async fn monitor_shutdown(timeout: Duration) -> io::Result<()> {
    use tokio::signal::windows::{ctrl_break, ctrl_c, ctrl_close};

    let mut ctrl_c = ctrl_c()?;
    let mut ctrl_break = ctrl_break()?;
    let mut ctrl_close = ctrl_close()?;

    let (ref tx, _) = *SHUTDOWN;
    let mut manual = MANUAL_SHUTDOWN.1.clone();

    loop {
        // `None` requests an immediate shutdown, `Some(_)` a graceful one.
        let shutdown_timeout = tokio::select! {
            // Poll branches in declaration order, so console events take precedence over manual
            // requests when several are ready at once.
            biased;

            Some(()) = ctrl_c.recv() => {
                relay_log::info!("CTRL-C received, exiting");
                None
            }
            Some(()) = ctrl_break.recv() => {
                relay_log::info!("CTRL-BREAK received, exiting");
                None
            }
            Some(()) = ctrl_close.recv() => {
                relay_log::info!("CTRL-CLOSE received, exiting");
                None
            }
            Ok(()) = manual.changed() => match *manual.borrow() {
                Some(ShutdownMode::Graceful) => {
                    // `Debug` keeps sub-second precision ("10ms"), where `as_secs()` would print
                    // "0s".
                    relay_log::info!("Graceful shutdown initiated, stopping in {:?}", timeout);
                    Some(timeout)
                }
                Some(ShutdownMode::Immediate) => {
                    relay_log::info!("Immediate shutdown initiated");
                    None
                }
                None => continue,
            },
            // Every event stream and the manual channel are exhausted.
            else => break,
        };

        // The static keeps a receiver alive, so sending is not expected to fail.
        tx.send(Some(Shutdown { timeout: shutdown_timeout })).ok();
    }

    Ok(())
}
/*
TODO: These tests are disabled because they share global state and are not isolated from each
other. Re-enable them once `Controller` instances are passed into services.
#[cfg(test)]
mod tests {
use tokio::time::Instant;
use super::*;
#[tokio::test]
async fn handle_receives_immediate_shutdown() {
tokio::time::pause();
Controller::start(Duration::from_secs(1));
let mut handle = Controller::shutdown_handle();
Controller::shutdown(ShutdownMode::Immediate);
let shutdown = handle.notified().await;
assert_eq!(shutdown.timeout, None);
}
#[tokio::test]
async fn receives_graceful_shutdown() {
tokio::time::pause();
let timeout = Duration::from_secs(1);
Controller::start(timeout);
let mut handle = Controller::shutdown_handle();
Controller::shutdown(ShutdownMode::Graceful);
let shutdown = handle.notified().await;
assert_eq!(shutdown.timeout, Some(timeout));
}
#[tokio::test]
async fn handle_receives_past_shutdown() {
tokio::time::pause();
Controller::start(Duration::from_secs(1));
Controller::shutdown(ShutdownMode::Immediate);
Controller::shutdown_handle().notified().await;
// should not block
}
#[tokio::test]
async fn handle_waits_for_timeout() {
tokio::time::pause();
let timeout = Duration::from_secs(1);
Controller::start(timeout);
let shutdown = Controller::shutdown_handle();
let start = Instant::now();
Controller::shutdown(ShutdownMode::Graceful);
shutdown.finished().await;
assert_eq!(Instant::now() - start, timeout);
}
#[tokio::test]
async fn finish_exits_early() {
tokio::time::pause();
Controller::start(Duration::from_secs(1));
let shutdown = Controller::shutdown_handle();
let start = Instant::now();
Controller::shutdown(ShutdownMode::Graceful);
Controller::shutdown(ShutdownMode::Immediate);
shutdown.finished().await;
assert_eq!(Instant::now(), start);
}
}
*/