relay_system/service/registry.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203
use std::collections::BTreeMap;
use std::sync::atomic::Ordering;
use std::sync::{Arc, Mutex, PoisonError};
use std::time::Duration;
use crate::service::monitor::{RawMetrics, ServiceMonitor};
use crate::service::status::{ServiceJoinHandle, ServiceStatusJoinHandle};
use crate::{ServiceObj, TaskId};
/// A point in time snapshot of all started services and their [`ServiceMetrics`].
pub struct ServicesMetrics(BTreeMap<ServiceId, ServiceMetrics>);
impl ServicesMetrics {
/// Returns an iterator of all service identifiers and their [`ServiceMetrics`].
pub fn iter(&self) -> impl Iterator<Item = (ServiceId, ServiceMetrics)> + '_ {
self.0.iter().map(|(id, metrics)| (*id, *metrics))
}
}
/// Collected metrics of a single service.
#[derive(Debug, Clone, Copy)]
pub struct ServiceMetrics {
/// Amount of times the service was polled.
pub poll_count: u64,
/// Total amount of time the service was busy.
///
/// The busy duration starts at zero when the service is created and is increased
/// whenever the service is spending time processing work. Using this value can
/// indicate the load of the given service.
///
/// This number is monotonically increasing. It is never decremented or reset to zero.
pub total_busy_duration: Duration,
/// Approximate utilization of the service based on its [`Self::total_busy_duration`].
///
/// This value is a percentage in the range from `[0-100]` and recomputed periodically.
///
/// The measure is only updated when the when the service is polled. A service which
/// spends a long time idle may not have this measure updated for a long time.
pub utilization: u8,
}
/// A per runtime unique identifier for a started service.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct ServiceId {
task: TaskId,
instance_id: u32,
}
impl ServiceId {
/// Returns the name of the service.
pub fn name(&self) -> &'static str {
self.task.id()
}
/// Returns a for this service unique instance id.
///
/// The combination of [`Self::name`] and [`Self::instance_id`] is unique for each runtime.
pub fn instance_id(&self) -> u32 {
self.instance_id
}
}
#[derive(Debug)]
pub(crate) struct Registry {
inner: Mutex<Inner>,
}
impl Registry {
pub fn new() -> Self {
Self {
inner: Mutex::new(Inner {
services: Default::default(),
}),
}
}
pub fn start_in(
&self,
handle: &tokio::runtime::Handle,
service: ServiceObj,
) -> ServiceJoinHandle {
let mut inner = self.inner.lock().unwrap_or_else(PoisonError::into_inner);
inner.start_in(handle, service)
}
pub fn metrics(&self) -> ServicesMetrics {
let inner = self.inner.lock().unwrap_or_else(PoisonError::into_inner);
ServicesMetrics(inner.metrics().collect())
}
}
#[derive(Debug)]
struct Inner {
services: BTreeMap<TaskId, ServiceGroup>,
}
impl Inner {
fn start_in(
&mut self,
handle: &tokio::runtime::Handle,
service: ServiceObj,
) -> ServiceJoinHandle {
let task_id = TaskId::from(&service);
let group = self.services.entry(task_id).or_default();
// Services are allowed to process as much work as possible before yielding to other,
// lower priority tasks. We want to prioritize service backlogs over creating more work
// for these services.
let future = tokio::task::unconstrained(service.future);
let future = ServiceMonitor::wrap(future);
let metrics = Arc::clone(future.metrics());
let task_handle = crate::runtime::spawn_in(handle, task_id, future);
let (status_handle, handle) = crate::service::status::split(task_handle);
group.add(metrics, status_handle);
handle
}
fn metrics(&self) -> impl Iterator<Item = (ServiceId, ServiceMetrics)> + '_ {
self.services.iter().flat_map(|(task_id, group)| {
group.iter().map(|service| {
let id = ServiceId {
task: *task_id,
instance_id: service.instance_id,
};
let metrics = ServiceMetrics {
poll_count: service.metrics.poll_count.load(Ordering::Relaxed),
total_busy_duration: Duration::from_nanos(
service.metrics.total_duration_ns.load(Ordering::Relaxed),
),
utilization: service.metrics.utilization.load(Ordering::Relaxed),
};
(id, metrics)
})
})
}
}
/// Logical grouping for all service instances of the same service.
///
/// A single service can be started multiple times, each individual
/// instance of a specific service is tracked in this group.
///
/// The group keeps track of a unique per service identifier,
/// which stays unique for the duration of the runtime.
///
/// It also holds a list of all currently alive service instances.
#[derive(Debug, Default)]
struct ServiceGroup {
/// Next unique per-service id.
///
/// The next instance started for this group will be assigned the id
/// and the id is incremented in preparation for the following instance.
next_instance_id: u32,
/// All currently alive service instances or instances that have stopped
/// but are not yet remove from the list.
instances: Vec<ServiceInstance>,
}
impl ServiceGroup {
/// Adds a started service to the service group.
pub fn add(&mut self, metrics: Arc<RawMetrics>, handle: ServiceStatusJoinHandle) {
// Cleanup the group, evicting all finished services, while we're at it.
self.instances.retain(|s| !s.handle.is_finished());
let instance_id = self.next_instance_id;
self.next_instance_id += 1;
let service = ServiceInstance {
instance_id,
metrics,
handle,
};
self.instances.push(service);
}
/// Returns an iterator over all currently alive services.
pub fn iter(&self) -> impl Iterator<Item = &ServiceInstance> {
self.instances.iter().filter(|s| !s.handle.is_finished())
}
}
/// Collection of metadata the registry tracks per service instance.
#[derive(Debug)]
struct ServiceInstance {
/// The per service group unique id for this instance.
instance_id: u32,
/// A raw handle for all metrics tracked for this instance.
///
/// The handle gives raw access to all tracked metrics, these metrics
/// should be treated as **read-only**.
metrics: Arc<RawMetrics>,
/// A handle to the service instance.
///
/// The handle has information about the completion status of the service.
handle: ServiceStatusJoinHandle,
}