relay_system/service/
status.rs

1use std::fmt::{self, Debug};
2use std::future::Future;
3use std::pin::Pin;
4use std::task::{Context, Poll};
5
6use tokio::task::JoinHandle;
7
8use futures::future::{FutureExt as _, Shared};
9
/// The service failed.
///
/// Wraps the [`tokio::task::JoinError`] produced by the service task. Use
/// [`ServiceError::is_panic`] and [`ServiceError::into_panic`] to inspect whether
/// (and with which payload) the service panicked.
#[derive(Debug)]
pub struct ServiceError(tokio::task::JoinError);
13
14impl ServiceError {
15    /// Returns true if the error was caused by a panic.
16    pub fn is_panic(&self) -> bool {
17        self.0.is_panic()
18    }
19
20    /// Consumes the error and returns the panic that caused it.
21    ///
22    /// Returns `None` if the error was not caused by a panic.
23    pub fn into_panic(self) -> Option<Box<dyn std::any::Any + Send + 'static>> {
24        self.0.try_into_panic().ok()
25    }
26}
27
28impl fmt::Display for ServiceError {
29    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
30        write!(f, "{}", &self.0)
31    }
32}
33
// Relies on the trait's default methods only; the wrapped join error is already
// surfaced through the `Display` implementation above.
impl std::error::Error for ServiceError {}
35
36/// An owned handle to await the termination of a service.
37///
38/// This is very similar to a [`std::thread::JoinHandle`] or [`tokio::task::JoinHandle`].
39///
40/// The handle does not need to be awaited or polled for the service to start execution.
41/// On drop, the join handle will detach from the service and the service will continue execution.
42pub struct ServiceJoinHandle {
43    fut: Option<Shared<MapJoinResult>>,
44    error_rx: tokio::sync::oneshot::Receiver<tokio::task::JoinError>,
45    handle: tokio::task::AbortHandle,
46}
47
48impl ServiceJoinHandle {
49    /// Returns `true` if the service has finished.
50    pub fn is_finished(&self) -> bool {
51        self.handle.is_finished()
52    }
53}
54
55impl Future for ServiceJoinHandle {
56    type Output = Result<(), ServiceError>;
57
58    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
59        if let Some(fut) = &mut self.fut {
60            if let Ok(()) = futures::ready!(fut.poll_unpin(cx)) {
61                return Poll::Ready(Ok(()));
62            }
63        }
64        self.fut = None;
65
66        match futures::ready!(self.error_rx.poll_unpin(cx)) {
67            Ok(error) => Poll::Ready(Err(ServiceError(error))),
68            Err(_) => Poll::Ready(Ok(())),
69        }
70    }
71}
72
/// Status-only counterpart of [`ServiceError`].
///
/// It carries neither the original error nor a panic payload, only the
/// information whether the failure was a panic.
#[derive(Debug, Clone)]
pub struct ServiceStatusError {
    is_panic: bool,
}

impl ServiceStatusError {
    /// Reports whether the service failed because of a panic.
    pub fn is_panic(&self) -> bool {
        let Self { is_panic } = *self;
        is_panic
    }
}

impl fmt::Display for ServiceStatusError {
    /// Writes `service panic` for panics and `service failed` for any other failure.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let message = if self.is_panic() {
            "service panic"
        } else {
            "service failed"
        };
        f.write_str(message)
    }
}

// Relies on the trait's default methods only; there is no underlying error to expose.
impl std::error::Error for ServiceStatusError {}
98
/// A companion handle to [`ServiceJoinHandle`].
///
/// The handle can also be awaited and queried for the termination status of a service,
/// but unlike the [`ServiceJoinHandle`] it only reports an error status and not
/// the original error/panic.
///
/// This handle can also be freely cloned and therefore awaited multiple times.
#[derive(Debug, Clone)]
pub struct ServiceStatusJoinHandle {
    /// Shared completion future; every clone of the handle polls the same future.
    fut: Shared<MapJoinResult>,
    /// Used to query synchronously whether the service task has finished.
    handle: tokio::task::AbortHandle,
}
111
112impl ServiceStatusJoinHandle {
113    /// Returns `true` if the service has finished.
114    pub fn is_finished(&self) -> bool {
115        self.handle.is_finished()
116    }
117}
118
119impl Future for ServiceStatusJoinHandle {
120    type Output = Result<(), ServiceStatusError>;
121
122    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
123        self.fut.poll_unpin(cx)
124    }
125}
126
127/// Turns a [`tokio::task::JoinHandle<()>`] from a service task into two separate handles.
128///
129/// Each returned handle can be awaited for the termination of a service
130/// and be queried for early termination synchronously using `is_terminated`,
131/// but only the [`ServiceJoinHandle`] yields the original error/panic.
132pub(crate) fn split(
133    handle: tokio::task::JoinHandle<()>,
134) -> (ServiceStatusJoinHandle, ServiceJoinHandle) {
135    let (tx, rx) = tokio::sync::oneshot::channel();
136
137    let handle1 = handle.abort_handle();
138    let handle2 = handle.abort_handle();
139
140    let shared = MapJoinResult {
141        handle,
142        error: Some(tx),
143    }
144    .shared();
145
146    (
147        ServiceStatusJoinHandle {
148            handle: handle1,
149            fut: shared.clone(),
150        },
151        ServiceJoinHandle {
152            error_rx: rx,
153            handle: handle2,
154            fut: Some(shared),
155        },
156    )
157}
158
/// Utility future which detaches the error/panic of a [`JoinHandle`].
///
/// It resolves to a status-only result and forwards the original join error
/// through the `error` channel instead.
#[derive(Debug)]
struct MapJoinResult {
    /// Join handle of the service task being awaited.
    handle: JoinHandle<()>,
    /// Sender for the original join error; taken when the task fails.
    error: Option<tokio::sync::oneshot::Sender<tokio::task::JoinError>>,
}
165
166impl Future for MapJoinResult {
167    type Output = Result<(), ServiceStatusError>;
168
169    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
170        let ret = match futures::ready!(self.handle.poll_unpin(cx)) {
171            Ok(()) => Ok(()),
172            Err(error) => {
173                let status = ServiceStatusError {
174                    is_panic: error.is_panic(),
175                };
176
177                let _ = self
178                    .error
179                    .take()
180                    .expect("shared future to not be ready multiple times")
181                    .send(error);
182
183                Err(status)
184            }
185        };
186
187        Poll::Ready(ret)
188    }
189}
190
#[cfg(test)]
mod tests {
    use super::*;

    // Polls the given future 30 times and asserts that every poll returns `Poll::Pending`.
    macro_rules! assert_pending {
        ($fut:expr) => {
            // The `match` binds a mutable reference to the future so it can be re-polled.
            match &mut $fut {
                fut => {
                    for _ in 0..30 {
                        assert!(matches!(futures::poll!(&mut *fut), Poll::Pending));
                    }
                }
            }
        };
    }

    // A service that returns normally resolves both handles with `Ok`.
    #[tokio::test]
    async fn test_split_no_error() {
        // The spawned task waits on `rx`, so the test controls when it completes.
        let (tx, rx) = tokio::sync::oneshot::channel();

        let (mut status, mut error) = split(crate::spawn!(async move {
            rx.await.unwrap();
        }));

        // Nothing has been sent yet, so neither handle may resolve.
        assert_pending!(status);
        assert_pending!(error);

        assert!(!status.is_finished());
        assert!(!error.is_finished());

        // Let the task run to completion.
        tx.send(()).unwrap();

        assert!(status.await.is_ok());
        assert!(error.is_finished());
        assert!(error.await.is_ok());
    }

    // A panicking service: the status handle is awaited before the error handle.
    #[tokio::test]
    async fn test_split_with_error_await_status_first() {
        // The spawned task waits on `rx` and then panics.
        let (tx, rx) = tokio::sync::oneshot::channel();

        let (mut status, mut error) = split(crate::spawn!(async move {
            rx.await.unwrap();
            panic!("test panic");
        }));

        // Nothing has been sent yet, so neither handle may resolve.
        assert_pending!(status);
        assert_pending!(error);

        assert!(!status.is_finished());
        assert!(!error.is_finished());

        // Unblock the task so it reaches the panic.
        tx.send(()).unwrap();

        // The status handle only reports that a panic happened.
        let status = status.await.unwrap_err();
        assert!(status.is_panic());

        assert!(error.is_finished());

        // The error handle still yields the original panic payload.
        let error = error.await.unwrap_err();
        assert!(error.is_panic());
        assert!(error.into_panic().unwrap().downcast_ref() == Some(&"test panic"));
    }

    // A panicking service: the error handle is awaited before the status handle.
    #[tokio::test]
    async fn test_split_with_error_await_error_first() {
        // The spawned task waits on `rx` and then panics.
        let (tx, rx) = tokio::sync::oneshot::channel();

        let (mut status, mut error) = split(crate::spawn!(async move {
            rx.await.unwrap();
            panic!("test panic");
        }));

        // Nothing has been sent yet, so neither handle may resolve.
        assert_pending!(status);
        assert_pending!(error);

        assert!(!status.is_finished());
        assert!(!error.is_finished());

        // Unblock the task so it reaches the panic.
        tx.send(()).unwrap();

        // The error handle yields the original panic payload.
        let error = error.await.unwrap_err();
        assert!(error.is_panic());
        assert!(error.into_panic().unwrap().downcast_ref() == Some(&"test panic"));

        assert!(status.is_finished());

        // The status handle still reports the panic after the error handle consumed the payload.
        let status = status.await.unwrap_err();
        assert!(status.is_panic());
    }
}