relay_system/runtime/
handle.rs

1use std::sync::Arc;
2
3use futures::StreamExt as _;
4use futures::stream::FuturesUnordered;
5
6use crate::runtime::metrics::TokioCallbackMetrics;
7use crate::{RuntimeMetrics, ServiceJoinHandle, ServiceRegistry, ServiceSpawn, ServicesMetrics};
8
9#[derive(Debug)]
10struct HandleInner {
11    name: &'static str,
12    services: ServiceRegistry,
13    tokio: tokio::runtime::Handle,
14    tokio_cb_metrics: Arc<TokioCallbackMetrics>,
15}
16
17/// Handle to the [`Runtime`](crate::Runtime).
18///
19/// The handle is internally reference-counted and can be freely cloned.
20/// A handle can be obtained using the [`Runtime::handle`](crate::Runtime::handle) method.
21#[derive(Debug, Clone)]
22pub struct Handle {
23    inner: Arc<HandleInner>,
24}
25
26impl Handle {
27    pub(crate) fn new(
28        name: &'static str,
29        tokio: tokio::runtime::Handle,
30        tokio_cb_metrics: Arc<TokioCallbackMetrics>,
31    ) -> Self {
32        Self {
33            inner: Arc::new(HandleInner {
34                name,
35                services: ServiceRegistry::new(),
36                tokio,
37                tokio_cb_metrics,
38            }),
39        }
40    }
41
42    /// Returns a new [`RuntimeMetrics`] handle for this runtime.
43    pub fn metrics(&self) -> RuntimeMetrics {
44        Arc::clone(&self.inner.tokio_cb_metrics)
45            .into_metrics(self.inner.name, self.inner.tokio.metrics())
46    }
47
48    /// Returns all service metrics of all currently running services.
49    ///
50    /// Unlike [`Self::metrics`], this is not a handle to the metrics.
51    pub fn current_services_metrics(&self) -> ServicesMetrics {
52        self.inner.services.metrics()
53    }
54
55    /// Returns a new unique [`ServiceSet`] to spawn services and await their termination.
56    pub fn service_set(&self) -> ServiceSet {
57        ServiceSet {
58            handle: Arc::clone(&self.inner),
59            services: Default::default(),
60        }
61    }
62}
63
64impl ServiceSpawn for Handle {
65    fn start_obj(&self, service: crate::ServiceObj) {
66        self.inner.services.start_in(&self.inner.tokio, service);
67    }
68}
69
70/// Spawns and keeps track of running services.
71///
72/// A [`ServiceSet`] can be awaited for the completion of all started services
73/// on this [`ServiceSet`].
74///
75/// Every service started on this [`ServiceSet`] is attached to the [`Handle`]
76/// this set was created from, using [`Handle::service_set`].
77pub struct ServiceSet {
78    handle: Arc<HandleInner>,
79    services: FuturesUnordered<ServiceJoinHandle>,
80}
81
82impl ServiceSet {
83    /// Awaits until all services have finished.
84    ///
85    /// Panics if one of the spawned services has panicked.
86    pub async fn join(&mut self) {
87        while let Some(res) = self.services.next().await {
88            if let Some(panic) = res.err().and_then(|e| e.into_panic()) {
89                // Re-trigger panic to terminate the process:
90                std::panic::resume_unwind(panic);
91            }
92        }
93    }
94}
95
96impl ServiceSpawn for ServiceSet {
97    fn start_obj(&self, service: crate::ServiceObj) {
98        let handle = self.handle.services.start_in(&self.handle.tokio, service);
99        self.services.push(handle);
100    }
101}