relay_system/runtime/
handle.rsuse std::sync::Arc;
use futures::stream::FuturesUnordered;
use futures::StreamExt as _;
use crate::runtime::metrics::TokioCallbackMetrics;
use crate::{RuntimeMetrics, ServiceJoinHandle, ServiceRegistry, ServiceSpawn, ServicesMetrics};
#[derive(Debug)]
struct HandleInner {
name: &'static str,
services: ServiceRegistry,
tokio: tokio::runtime::Handle,
tokio_cb_metrics: Arc<TokioCallbackMetrics>,
}
#[derive(Debug, Clone)]
pub struct Handle {
inner: Arc<HandleInner>,
}
impl Handle {
pub(crate) fn new(
name: &'static str,
tokio: tokio::runtime::Handle,
tokio_cb_metrics: Arc<TokioCallbackMetrics>,
) -> Self {
Self {
inner: Arc::new(HandleInner {
name,
services: ServiceRegistry::new(),
tokio,
tokio_cb_metrics,
}),
}
}
pub fn metrics(&self) -> RuntimeMetrics {
Arc::clone(&self.inner.tokio_cb_metrics)
.into_metrics(self.inner.name, self.inner.tokio.metrics())
}
pub fn current_services_metrics(&self) -> ServicesMetrics {
self.inner.services.metrics()
}
pub fn service_set(&self) -> ServiceSet {
ServiceSet {
handle: Arc::clone(&self.inner),
services: Default::default(),
}
}
}
impl ServiceSpawn for Handle {
fn start_obj(&self, service: crate::ServiceObj) {
self.inner.services.start_in(&self.inner.tokio, service);
}
}
pub struct ServiceSet {
handle: Arc<HandleInner>,
services: FuturesUnordered<ServiceJoinHandle>,
}
impl ServiceSet {
pub async fn join(&mut self) {
while let Some(res) = self.services.next().await {
if let Some(panic) = res.err().and_then(|e| e.into_panic()) {
std::panic::resume_unwind(panic);
}
}
}
}
impl ServiceSpawn for ServiceSet {
fn start_obj(&self, service: crate::ServiceObj) {
let handle = self.handle.services.start_in(&self.handle.tokio, service);
self.services.push(handle);
}
}