relay_system/runtime/
metrics.rs1use std::sync::Arc;
2use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
3use std::time::Duration;
4
5#[derive(Debug)]
15pub struct RuntimeMetrics {
16 name: &'static str,
17 cb: Arc<TokioCallbackMetrics>,
18 tokio: tokio::runtime::RuntimeMetrics,
19 budget_forced_yield_count: AtomicU64,
21 worker_state: Box<[WorkerState]>,
22}
23
// Generates a `pub fn <metric>` accessor that reads the tokio metric of the same name, stores
// the reading in the atomic field of the same name, and returns the difference to the reading
// stored by the previous call. The subtraction saturates, so a reading smaller than the stored
// one yields `0`.
//
// Must be invoked inside an `impl` block whose `self` has a `tokio` field (and, for the
// `worker:` form, a `worker_state` slice) and with `Ordering` in scope.
macro_rules! impl_diff_metric {
    // Runtime-wide metric: the previous reading lives directly on `self`.
    ($(#[$attr:meta])* $metric:ident) => {
        $(#[$attr])*
        #[doc = concat!("See also: [`tokio::runtime::RuntimeMetrics::", stringify!($metric), "`]")]
        #[track_caller]
        pub fn $metric(&self) -> u64 {
            let current = self.tokio.$metric() as u64;
            let previous = self.$metric.swap(current, Ordering::Relaxed);
            current.saturating_sub(previous)
        }
    };
    // Per-worker metric: the previous reading lives in `self.worker_state[worker]`, so an
    // out-of-range `worker` panics on the index operation.
    (worker: $(#[$attr:meta])* $metric:ident) => {
        $(#[$attr])*
        #[doc = concat!("See also: [`tokio::runtime::RuntimeMetrics::", stringify!($metric), "`]")]
        #[track_caller]
        pub fn $metric(&self, worker: usize) -> u64 {
            let current = self.tokio.$metric(worker) as u64;
            let previous = self.worker_state[worker]
                .$metric
                .swap(current, Ordering::Relaxed);
            current.saturating_sub(previous)
        }
    };
}
52
53impl RuntimeMetrics {
54 pub fn name(&self) -> &'static str {
56 self.name
57 }
58
59 pub fn num_idle_threads(&self) -> usize {
61 self.cb.idle_threads.load(Ordering::Relaxed)
62 }
63
64 pub fn num_alive_tasks(&self) -> usize {
68 self.tokio.num_alive_tasks()
69 }
70
71 pub fn blocking_queue_depth(&self) -> usize {
76 self.tokio.blocking_queue_depth()
77 }
78
79 impl_diff_metric!(
80 budget_forced_yield_count
83 );
84
85 pub fn num_blocking_threads(&self) -> usize {
89 self.tokio.num_blocking_threads()
90 }
91
92 pub fn num_idle_blocking_threads(&self) -> usize {
97 self.tokio.num_idle_blocking_threads()
98 }
99
100 pub fn num_workers(&self) -> usize {
104 self.tokio.num_workers()
105 }
106
107 pub fn worker_local_queue_depth(&self, worker: usize) -> usize {
112 self.tokio.worker_local_queue_depth(worker)
113 }
114
115 pub fn worker_mean_poll_time(&self, worker: usize) -> Duration {
119 self.tokio.worker_mean_poll_time(worker)
120 }
121
122 impl_diff_metric!(
123 worker:
124 worker_local_schedule_count
127 );
128 impl_diff_metric!(
129 worker:
130 worker_noop_count
133 );
134 impl_diff_metric!(
135 worker:
136 worker_overflow_count
139 );
140 impl_diff_metric!(
141 worker:
142 worker_park_count
144 );
145 impl_diff_metric!(
146 worker:
147 worker_poll_count
149 );
150 impl_diff_metric!(
151 worker:
152 worker_steal_count
155 );
156 impl_diff_metric!(
157 worker:
158 worker_steal_operations
161 );
162
163 pub fn worker_total_busy_duration(&self, worker: usize) -> Duration {
169 let stat = self.tokio.worker_total_busy_duration(worker).as_secs_f64();
170 let prev = self.worker_state[worker]
171 .worker_total_busy_duration
172 .swap(stat.to_bits(), Ordering::Relaxed);
173 Duration::from_secs_f64(stat - f64::from_bits(prev))
174 }
175}
176
/// Last observed tokio readings for a single worker.
///
/// Every field starts at zero (via `Default`) and is overwritten with the newest reading each
/// time the accessor of the same name is called for this worker.
#[derive(Debug, Default)]
struct WorkerState {
    /// Last observed `worker_local_schedule_count`.
    worker_local_schedule_count: AtomicU64,
    /// Last observed `worker_noop_count`.
    worker_noop_count: AtomicU64,
    /// Last observed `worker_overflow_count`.
    worker_overflow_count: AtomicU64,
    /// Last observed `worker_park_count`.
    worker_park_count: AtomicU64,
    /// Last observed `worker_poll_count`.
    worker_poll_count: AtomicU64,
    /// Last observed `worker_steal_count`.
    worker_steal_count: AtomicU64,
    /// Last observed `worker_steal_operations`.
    worker_steal_operations: AtomicU64,
    /// Last observed total busy duration, stored as the bit pattern of `f64` seconds.
    worker_total_busy_duration: AtomicU64,
}

impl WorkerState {
    /// Creates a boxed slice holding `num` default-initialised worker states.
    pub fn for_workers(num: usize) -> Box<[WorkerState]> {
        std::iter::repeat_with(WorkerState::default)
            .take(num)
            .collect()
    }
}
196
/// Counters that are maintained through callbacks installed on a tokio runtime builder.
#[derive(Debug, Default)]
pub struct TokioCallbackMetrics {
    /// Incremented when a runtime thread parks and decremented when it unparks.
    idle_threads: AtomicUsize,
}
202
203impl TokioCallbackMetrics {
204 pub fn register(self: &Arc<Self>, builder: &mut tokio::runtime::Builder) {
205 builder.on_thread_park({
206 let this = Arc::clone(self);
207 move || {
208 this.idle_threads.fetch_add(1, Ordering::Relaxed);
209 }
210 });
211 builder.on_thread_unpark({
212 let this = Arc::clone(self);
213 move || {
214 this.idle_threads.fetch_sub(1, Ordering::Relaxed);
215 }
216 });
217 }
218
219 pub fn into_metrics(
220 self: Arc<Self>,
221 name: &'static str,
222 tokio: tokio::runtime::RuntimeMetrics,
223 ) -> RuntimeMetrics {
224 let workers = tokio.num_workers();
225 RuntimeMetrics {
226 name,
227 cb: self,
228 tokio,
229 budget_forced_yield_count: AtomicU64::new(0),
230 worker_state: WorkerState::for_workers(workers),
231 }
232 }
233}
234
#[cfg(test)]
mod tests {
    // NOTE(review): the import and the test are compiled on Linux only; the reason is not
    // visible in this file.
    #[cfg(target_os = "linux")]
    use super::*;

    /// Verifies that the wrapper reports the change since its previous read, while tokio's own
    /// metrics keep reporting the running total.
    #[test]
    #[cfg(target_os = "linux")]
    fn test_metric_diff() {
        let runtime = crate::Runtime::builder("test").worker_threads(1).build();

        let diffed = runtime.handle().metrics();

        runtime.block_on(async move {
            let raw = tokio::runtime::Handle::current().metrics();

            assert_eq!(diffed.num_workers(), 1);
            assert_eq!(raw.num_workers(), 1);

            // Nothing has been scheduled yet: both views agree on zero.
            assert_eq!(diffed.worker_local_schedule_count(0), 0);
            assert_eq!(raw.worker_local_schedule_count(0), 0);

            crate::spawn!(tokio::time::sleep(Duration::from_millis(1)))
                .await
                .unwrap();

            // After the first spawned task, delta and total are both one.
            assert_eq!(diffed.worker_local_schedule_count(0), 1);
            assert_eq!(raw.worker_local_schedule_count(0), 1);

            crate::spawn!(tokio::time::sleep(Duration::from_millis(1)))
                .await
                .unwrap();

            // After the second task the delta is still one, while the total has grown to two.
            assert_eq!(diffed.worker_local_schedule_count(0), 1);
            assert_eq!(raw.worker_local_schedule_count(0), 2);
        });
    }
}