relay_system/service/
registry.rs

1use std::collections::BTreeMap;
2use std::sync::atomic::Ordering;
3use std::sync::{Arc, Mutex, PoisonError};
4use std::time::Duration;
5
6use crate::monitor::MonitoredFuture;
7
8use crate::service::status::{ServiceJoinHandle, ServiceStatusJoinHandle};
9use crate::{RawMetrics, ServiceObj, TaskId};
10
/// A point in time snapshot of all started services and their [`ServiceMetrics`].
///
/// Created via [`Registry::metrics`]. Backed by a [`BTreeMap`], so iterating
/// yields entries in [`ServiceId`] order deterministically.
pub struct ServicesMetrics(BTreeMap<ServiceId, ServiceMetrics>);
13
14impl ServicesMetrics {
15    /// Returns an iterator of all service identifiers and their [`ServiceMetrics`].
16    pub fn iter(&self) -> impl Iterator<Item = (ServiceId, ServiceMetrics)> + '_ {
17        self.0.iter().map(|(id, metrics)| (*id, *metrics))
18    }
19}
20
/// Collected metrics of a single service.
#[derive(Debug, Clone, Copy)]
pub struct ServiceMetrics {
    /// Amount of times the service was polled.
    pub poll_count: u64,
    /// Total amount of time the service was busy.
    ///
    /// The busy duration starts at zero when the service is created and is increased
    /// whenever the service is spending time processing work. Using this value can
    /// indicate the load of the given service.
    ///
    /// This number is monotonically increasing. It is never decremented or reset to zero.
    pub total_busy_duration: Duration,
    /// Approximate utilization of the service based on its [`Self::total_busy_duration`].
    ///
    /// This value is a percentage in the range `[0, 100]` and is recomputed periodically.
    ///
    /// The measure is only updated when the service is polled. A service which
    /// spends a long time idle may not have this measure updated for a long time.
    pub utilization: u8,
}
42
/// A per runtime unique identifier for a started service.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct ServiceId {
    /// Identifies the service/task this instance was started from.
    task: TaskId,
    /// Discriminates between multiple started instances of the same service;
    /// unique within the group identified by `task`.
    instance_id: u32,
}
49
50impl ServiceId {
51    /// Returns the name of the service.
52    pub fn name(&self) -> &'static str {
53        self.task.id()
54    }
55
56    /// Returns a for this service unique instance id.
57    ///
58    /// The combination of [`Self::name`] and [`Self::instance_id`] is unique for each runtime.
59    pub fn instance_id(&self) -> u32 {
60        self.instance_id
61    }
62}
63
/// Shared bookkeeping of all services started through the runtime.
///
/// Wraps the mutable registry state in a [`Mutex`], so services can be started
/// and metrics collected concurrently from multiple threads.
#[derive(Debug)]
pub(crate) struct Registry {
    /// Protected registry state; callers deliberately recover from lock
    /// poisoning (see `start_in`/`metrics`) since the state stays usable.
    inner: Mutex<Inner>,
}
68
69impl Registry {
70    pub fn new() -> Self {
71        Self {
72            inner: Mutex::new(Inner {
73                services: Default::default(),
74            }),
75        }
76    }
77
78    pub fn start_in(
79        &self,
80        handle: &tokio::runtime::Handle,
81        service: ServiceObj,
82    ) -> ServiceJoinHandle {
83        let mut inner = self.inner.lock().unwrap_or_else(PoisonError::into_inner);
84        inner.start_in(handle, service)
85    }
86
87    pub fn metrics(&self) -> ServicesMetrics {
88        let inner = self.inner.lock().unwrap_or_else(PoisonError::into_inner);
89        ServicesMetrics(inner.metrics().collect())
90    }
91}
92
/// The lock-protected state of the [`Registry`].
#[derive(Debug)]
struct Inner {
    /// All started services, grouped by their [`TaskId`]; each group tracks
    /// the individual instances of one service.
    services: BTreeMap<TaskId, ServiceGroup>,
}
97
98impl Inner {
99    fn start_in(
100        &mut self,
101        handle: &tokio::runtime::Handle,
102        service: ServiceObj,
103    ) -> ServiceJoinHandle {
104        let task_id = TaskId::from(&service);
105        let group = self.services.entry(task_id).or_default();
106
107        // Services are allowed to process as much work as possible before yielding to other,
108        // lower priority tasks. We want to prioritize service backlogs over creating more work
109        // for these services.
110        let future = tokio::task::unconstrained(service.future);
111        let future = MonitoredFuture::wrap(future);
112        let metrics = Arc::clone(future.metrics());
113
114        let task_handle = crate::runtime::spawn_in(handle, task_id, future);
115        let (status_handle, handle) = crate::service::status::split(task_handle);
116
117        group.add(metrics, status_handle);
118
119        handle
120    }
121
122    fn metrics(&self) -> impl Iterator<Item = (ServiceId, ServiceMetrics)> + '_ {
123        self.services.iter().flat_map(|(task_id, group)| {
124            group.iter().map(|service| {
125                let id = ServiceId {
126                    task: *task_id,
127                    instance_id: service.instance_id,
128                };
129
130                let metrics = ServiceMetrics {
131                    poll_count: service.metrics.poll_count.load(Ordering::Relaxed),
132                    total_busy_duration: Duration::from_nanos(
133                        service.metrics.total_duration_ns.load(Ordering::Relaxed),
134                    ),
135                    utilization: service.metrics.utilization.load(Ordering::Relaxed),
136                };
137
138                (id, metrics)
139            })
140        })
141    }
142}
143
/// Logical grouping for all service instances of the same service.
///
/// A single service can be started multiple times, each individual
/// instance of a specific service is tracked in this group.
///
/// The group keeps track of a unique per service identifier,
/// which stays unique for the duration of the runtime.
///
/// It also holds a list of all currently alive service instances.
#[derive(Debug, Default)]
struct ServiceGroup {
    /// Next unique per-service id.
    ///
    /// The next instance started for this group will be assigned the id
    /// and the id is incremented in preparation for the following instance.
    next_instance_id: u32,
    /// All currently alive service instances or instances that have stopped
    /// but are not yet removed from the list.
    instances: Vec<ServiceInstance>,
}
164
165impl ServiceGroup {
166    /// Adds a started service to the service group.
167    pub fn add(&mut self, metrics: Arc<RawMetrics>, handle: ServiceStatusJoinHandle) {
168        // Cleanup the group, evicting all finished services, while we're at it.
169        self.instances.retain(|s| !s.handle.is_finished());
170
171        let instance_id = self.next_instance_id;
172        self.next_instance_id += 1;
173
174        let service = ServiceInstance {
175            instance_id,
176            metrics,
177            handle,
178        };
179
180        self.instances.push(service);
181    }
182
183    /// Returns an iterator over all currently alive services.
184    pub fn iter(&self) -> impl Iterator<Item = &ServiceInstance> {
185        self.instances.iter().filter(|s| !s.handle.is_finished())
186    }
187}
188
/// Collection of metadata the registry tracks per service instance.
#[derive(Debug)]
struct ServiceInstance {
    /// The per-service group unique id for this instance.
    instance_id: u32,
    /// A raw handle for all metrics tracked for this instance.
    ///
    /// The handle gives raw access to all tracked metrics, these metrics
    /// should be treated as **read-only** (the monitored future owns the
    /// writes; the registry only reads them when building snapshots).
    metrics: Arc<RawMetrics>,
    /// A handle to the service instance.
    ///
    /// The handle has information about the completion status of the service,
    /// used to evict finished instances from the group.
    handle: ServiceStatusJoinHandle,
}