// relay_system/service/status.rs
use std::fmt::{self, Debug};
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

use tokio::task::JoinHandle;

use futures::future::{FutureExt as _, Shared};
9
10#[derive(Debug)]
12pub struct ServiceError(tokio::task::JoinError);
13
14impl ServiceError {
15 pub fn is_panic(&self) -> bool {
17 self.0.is_panic()
18 }
19
20 pub fn into_panic(self) -> Option<Box<dyn std::any::Any + Send + 'static>> {
24 self.0.try_into_panic().ok()
25 }
26}
27
28impl fmt::Display for ServiceError {
29 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
30 write!(f, "{}", &self.0)
31 }
32}
33
34impl std::error::Error for ServiceError {}
35
36pub struct ServiceJoinHandle {
43 fut: Option<Shared<MapJoinResult>>,
44 error_rx: tokio::sync::oneshot::Receiver<tokio::task::JoinError>,
45 handle: tokio::task::AbortHandle,
46}
47
48impl ServiceJoinHandle {
49 pub fn is_finished(&self) -> bool {
51 self.handle.is_finished()
52 }
53}
54
55impl Future for ServiceJoinHandle {
56 type Output = Result<(), ServiceError>;
57
58 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
59 if let Some(fut) = &mut self.fut {
60 if let Ok(()) = futures::ready!(fut.poll_unpin(cx)) {
61 return Poll::Ready(Ok(()));
62 }
63 }
64 self.fut = None;
65
66 match futures::ready!(self.error_rx.poll_unpin(cx)) {
67 Ok(error) => Poll::Ready(Err(ServiceError(error))),
68 Err(_) => Poll::Ready(Ok(())),
69 }
70 }
71}
72
/// Cloneable summary of a failed service task.
///
/// Unlike [`ServiceError`], this type does not carry the underlying join
/// error; it only records whether the failure was a panic.
#[derive(Debug, Clone)]
pub struct ServiceStatusError {
    /// `true` when the failure was reported as a panic.
    is_panic: bool,
}

impl ServiceStatusError {
    /// Returns `true` if the service failed because of a panic.
    pub fn is_panic(&self) -> bool {
        self.is_panic
    }
}

/// Renders `"service panic"` for panics and `"service failed"` otherwise.
impl fmt::Display for ServiceStatusError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let message = if self.is_panic() {
            "service panic"
        } else {
            "service failed"
        };
        f.write_str(message)
    }
}

impl std::error::Error for ServiceStatusError {}
98
99#[derive(Debug, Clone)]
107pub struct ServiceStatusJoinHandle {
108 fut: Shared<MapJoinResult>,
109 handle: tokio::task::AbortHandle,
110}
111
112impl ServiceStatusJoinHandle {
113 pub fn is_finished(&self) -> bool {
115 self.handle.is_finished()
116 }
117}
118
119impl Future for ServiceStatusJoinHandle {
120 type Output = Result<(), ServiceStatusError>;
121
122 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
123 self.fut.poll_unpin(cx)
124 }
125}
126
127pub(crate) fn split(
133 handle: tokio::task::JoinHandle<()>,
134) -> (ServiceStatusJoinHandle, ServiceJoinHandle) {
135 let (tx, rx) = tokio::sync::oneshot::channel();
136
137 let handle1 = handle.abort_handle();
138 let handle2 = handle.abort_handle();
139
140 let shared = MapJoinResult {
141 handle,
142 error: Some(tx),
143 }
144 .shared();
145
146 (
147 ServiceStatusJoinHandle {
148 handle: handle1,
149 fut: shared.clone(),
150 },
151 ServiceJoinHandle {
152 error_rx: rx,
153 handle: handle2,
154 fut: Some(shared),
155 },
156 )
157}
158
159#[derive(Debug)]
161struct MapJoinResult {
162 handle: JoinHandle<()>,
163 error: Option<tokio::sync::oneshot::Sender<tokio::task::JoinError>>,
164}
165
166impl Future for MapJoinResult {
167 type Output = Result<(), ServiceStatusError>;
168
169 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
170 let ret = match futures::ready!(self.handle.poll_unpin(cx)) {
171 Ok(()) => Ok(()),
172 Err(error) => {
173 let status = ServiceStatusError {
174 is_panic: error.is_panic(),
175 };
176
177 let _ = self
178 .error
179 .take()
180 .expect("shared future to not be ready multiple times")
181 .send(error);
182
183 Err(status)
184 }
185 };
186
187 Poll::Ready(ret)
188 }
189}
190
#[cfg(test)]
mod tests {
    use super::*;

    // Polls the given future 30 times and asserts it is pending every time.
    macro_rules! assert_pending {
        ($fut:expr) => {{
            let fut = &mut $fut;
            for _ in 0..30 {
                assert!(futures::poll!(&mut *fut).is_pending());
            }
        }};
    }

    /// A task that finishes normally resolves both handles with `Ok`.
    #[tokio::test]
    async fn test_split_no_error() {
        let (release, gate) = tokio::sync::oneshot::channel();

        // The task blocks until `release` fires.
        let (mut status_handle, mut join_handle) = split(crate::spawn!(async move {
            gate.await.unwrap();
        }));

        assert_pending!(status_handle);
        assert_pending!(join_handle);

        assert!(!status_handle.is_finished());
        assert!(!join_handle.is_finished());

        release.send(()).unwrap();

        assert!(status_handle.await.is_ok());
        assert!(join_handle.is_finished());
        assert!(join_handle.await.is_ok());
    }

    /// A panicking task is reported by both handles; status is awaited first.
    #[tokio::test]
    async fn test_split_with_error_await_status_first() {
        let (release, gate) = tokio::sync::oneshot::channel();

        let (mut status_handle, mut join_handle) = split(crate::spawn!(async move {
            gate.await.unwrap();
            panic!("test panic");
        }));

        assert_pending!(status_handle);
        assert_pending!(join_handle);

        assert!(!status_handle.is_finished());
        assert!(!join_handle.is_finished());

        release.send(()).unwrap();

        let status_err = status_handle.await.unwrap_err();
        assert!(status_err.is_panic());

        assert!(join_handle.is_finished());

        let join_err = join_handle.await.unwrap_err();
        assert!(join_err.is_panic());
        let payload = join_err.into_panic().unwrap();
        assert!(payload.downcast_ref::<&str>() == Some(&"test panic"));
    }

    /// A panicking task is reported by both handles; the error is awaited first.
    #[tokio::test]
    async fn test_split_with_error_await_error_first() {
        let (release, gate) = tokio::sync::oneshot::channel();

        let (mut status_handle, mut join_handle) = split(crate::spawn!(async move {
            gate.await.unwrap();
            panic!("test panic");
        }));

        assert_pending!(status_handle);
        assert_pending!(join_handle);

        assert!(!status_handle.is_finished());
        assert!(!join_handle.is_finished());

        release.send(()).unwrap();

        let join_err = join_handle.await.unwrap_err();
        assert!(join_err.is_panic());
        let payload = join_err.into_panic().unwrap();
        assert!(payload.downcast_ref::<&str>() == Some(&"test panic"));

        assert!(status_handle.is_finished());

        let status_err = status_handle.await.unwrap_err();
        assert!(status_err.is_panic());
    }
}