relay_system/runtime/
handle.rs1use std::sync::Arc;
2
3use futures::StreamExt as _;
4use futures::stream::FuturesUnordered;
5
6use crate::runtime::metrics::TokioCallbackMetrics;
7use crate::{RuntimeMetrics, ServiceJoinHandle, ServiceRegistry, ServiceSpawn, ServicesMetrics};
8
9#[derive(Debug)]
10struct HandleInner {
11 name: &'static str,
12 services: ServiceRegistry,
13 tokio: tokio::runtime::Handle,
14 tokio_cb_metrics: Arc<TokioCallbackMetrics>,
15}
16
17#[derive(Debug, Clone)]
22pub struct Handle {
23 inner: Arc<HandleInner>,
24}
25
26impl Handle {
27 pub(crate) fn new(
28 name: &'static str,
29 tokio: tokio::runtime::Handle,
30 tokio_cb_metrics: Arc<TokioCallbackMetrics>,
31 ) -> Self {
32 Self {
33 inner: Arc::new(HandleInner {
34 name,
35 services: ServiceRegistry::new(),
36 tokio,
37 tokio_cb_metrics,
38 }),
39 }
40 }
41
42 pub fn metrics(&self) -> RuntimeMetrics {
44 Arc::clone(&self.inner.tokio_cb_metrics)
45 .into_metrics(self.inner.name, self.inner.tokio.metrics())
46 }
47
48 pub fn current_services_metrics(&self) -> ServicesMetrics {
52 self.inner.services.metrics()
53 }
54
55 pub fn service_set(&self) -> ServiceSet {
57 ServiceSet {
58 handle: Arc::clone(&self.inner),
59 services: Default::default(),
60 }
61 }
62}
63
64impl ServiceSpawn for Handle {
65 fn start_obj(&self, service: crate::ServiceObj) {
66 self.inner.services.start_in(&self.inner.tokio, service);
67 }
68}
69
70pub struct ServiceSet {
78 handle: Arc<HandleInner>,
79 services: FuturesUnordered<ServiceJoinHandle>,
80}
81
82impl ServiceSet {
83 pub async fn join(&mut self) {
87 while let Some(res) = self.services.next().await {
88 if let Some(panic) = res.err().and_then(|e| e.into_panic()) {
89 std::panic::resume_unwind(panic);
91 }
92 }
93 }
94}
95
96impl ServiceSpawn for ServiceSet {
97 fn start_obj(&self, service: crate::ServiceObj) {
98 let handle = self.handle.services.start_in(&self.handle.tokio, service);
99 self.services.push(handle);
100 }
101}