relay_system/service/
registry.rs1use std::collections::BTreeMap;
2use std::sync::atomic::Ordering;
3use std::sync::{Arc, Mutex, PoisonError};
4use std::time::Duration;
5
6use crate::monitor::MonitoredFuture;
7
8use crate::service::status::{ServiceJoinHandle, ServiceStatusJoinHandle};
9use crate::{RawMetrics, ServiceObj, TaskId};
10
11pub struct ServicesMetrics(BTreeMap<ServiceId, ServiceMetrics>);
13
14impl ServicesMetrics {
15 pub fn iter(&self) -> impl Iterator<Item = (ServiceId, ServiceMetrics)> + '_ {
17 self.0.iter().map(|(id, metrics)| (*id, *metrics))
18 }
19}
20
/// Metrics of a single service instance, captured at one point in time.
///
/// All fields are plain values copied out of the instance's raw (atomic)
/// metrics when the snapshot is taken, so the struct is cheap to copy around.
#[derive(Clone, Copy, Debug)]
pub struct ServiceMetrics {
    /// Value of the raw `poll_count` counter when the snapshot was taken.
    pub poll_count: u64,
    /// Busy time of the service, built from the raw `total_duration_ns`
    /// counter interpreted as nanoseconds.
    pub total_busy_duration: Duration,
    /// Value of the raw `utilization` gauge when the snapshot was taken.
    ///
    /// NOTE(review): presumably a percentage; confirm the unit and range
    /// against the code that maintains the raw metrics.
    pub utilization: u8,
}
42
43#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
45pub struct ServiceId {
46 task: TaskId,
47 instance_id: u32,
48}
49
50impl ServiceId {
51 pub fn name(&self) -> &'static str {
53 self.task.id()
54 }
55
56 pub fn instance_id(&self) -> u32 {
60 self.instance_id
61 }
62}
63
64#[derive(Debug)]
65pub(crate) struct Registry {
66 inner: Mutex<Inner>,
67}
68
69impl Registry {
70 pub fn new() -> Self {
71 Self {
72 inner: Mutex::new(Inner {
73 services: Default::default(),
74 }),
75 }
76 }
77
78 pub fn start_in(
79 &self,
80 handle: &tokio::runtime::Handle,
81 service: ServiceObj,
82 ) -> ServiceJoinHandle {
83 let mut inner = self.inner.lock().unwrap_or_else(PoisonError::into_inner);
84 inner.start_in(handle, service)
85 }
86
87 pub fn metrics(&self) -> ServicesMetrics {
88 let inner = self.inner.lock().unwrap_or_else(PoisonError::into_inner);
89 ServicesMetrics(inner.metrics().collect())
90 }
91}
92
93#[derive(Debug)]
94struct Inner {
95 services: BTreeMap<TaskId, ServiceGroup>,
96}
97
98impl Inner {
99 fn start_in(
100 &mut self,
101 handle: &tokio::runtime::Handle,
102 service: ServiceObj,
103 ) -> ServiceJoinHandle {
104 let task_id = TaskId::from(&service);
105 let group = self.services.entry(task_id).or_default();
106
107 let future = tokio::task::unconstrained(service.future);
111 let future = MonitoredFuture::wrap(future);
112 let metrics = Arc::clone(future.metrics());
113
114 let task_handle = crate::runtime::spawn_in(handle, task_id, future);
115 let (status_handle, handle) = crate::service::status::split(task_handle);
116
117 group.add(metrics, status_handle);
118
119 handle
120 }
121
122 fn metrics(&self) -> impl Iterator<Item = (ServiceId, ServiceMetrics)> + '_ {
123 self.services.iter().flat_map(|(task_id, group)| {
124 group.iter().map(|service| {
125 let id = ServiceId {
126 task: *task_id,
127 instance_id: service.instance_id,
128 };
129
130 let metrics = ServiceMetrics {
131 poll_count: service.metrics.poll_count.load(Ordering::Relaxed),
132 total_busy_duration: Duration::from_nanos(
133 service.metrics.total_duration_ns.load(Ordering::Relaxed),
134 ),
135 utilization: service.metrics.utilization.load(Ordering::Relaxed),
136 };
137
138 (id, metrics)
139 })
140 })
141 }
142}
143
144#[derive(Debug, Default)]
154struct ServiceGroup {
155 next_instance_id: u32,
160 instances: Vec<ServiceInstance>,
163}
164
165impl ServiceGroup {
166 pub fn add(&mut self, metrics: Arc<RawMetrics>, handle: ServiceStatusJoinHandle) {
168 self.instances.retain(|s| !s.handle.is_finished());
170
171 let instance_id = self.next_instance_id;
172 self.next_instance_id += 1;
173
174 let service = ServiceInstance {
175 instance_id,
176 metrics,
177 handle,
178 };
179
180 self.instances.push(service);
181 }
182
183 pub fn iter(&self) -> impl Iterator<Item = &ServiceInstance> {
185 self.instances.iter().filter(|s| !s.handle.is_finished())
186 }
187}
188
189#[derive(Debug)]
191struct ServiceInstance {
192 instance_id: u32,
194 metrics: Arc<RawMetrics>,
199 handle: ServiceStatusJoinHandle,
203}