relay_system/runtime/
metrics.rs

1use std::sync::Arc;
2use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
3use std::time::Duration;
4
/// Handle to the [`Runtime`](crate::Runtime)'s metrics.
///
/// Unlike [`tokio::runtime::RuntimeMetrics`], the counter-style metrics of this handle
/// return the relative increase in value since the last time the specific metric was
/// queried, rather than a running total.
///
/// This makes it easier to emit these metrics as counters through statsd.
///
/// It also exposes some other runtime metrics which Tokio does not directly
/// expose through the [`tokio::runtime::RuntimeMetrics`] handle.
#[derive(Debug)]
pub struct RuntimeMetrics {
    /// Name of the runtime these metrics belong to.
    name: &'static str,
    /// Counters maintained by callbacks installed on the Tokio runtime builder.
    cb: Arc<TokioCallbackMetrics>,
    /// Tokio's own metrics handle, the source of most reported values.
    tokio: tokio::runtime::RuntimeMetrics,
    // Metric state: the last observed values, used to compute differences.
    /// Last observed value of the runtime-wide `budget_forced_yield_count`.
    budget_forced_yield_count: AtomicU64,
    /// Last observed per-worker values, indexed by worker index.
    worker_state: Box<[WorkerState]>,
}
23
// Generates a `RuntimeMetrics` method reporting how much a monotonic Tokio counter
// increased since the previous call of the same method.
//
// Two forms are supported:
//
// - `impl_diff_metric!(/// docs... name)` reads `self.tokio.name()` and keeps the last
//   observed value in the field `self.name`.
// - `impl_diff_metric!(worker: /// docs... name)` reads `self.tokio.name(worker)` and keeps
//   the last observed value in `self.worker_state[worker].name`; indexing `worker_state`
//   panics if `worker` is out of range.
//
// The stored value is only ever advanced (`fetch_max`), never replaced (`swap`). With `swap`,
// a caller holding an older reading could roll the stored value back, and a later call would
// then report the same increase a second time. The underlying Tokio counters are monotonic,
// so for a single caller the result is identical to a plain swap.
macro_rules! impl_diff_metric {
    ($(#[$doc:meta])* $name:ident) => {
        $(#[$doc])*
        ///
        #[doc = concat!("See also: [`tokio::runtime::RuntimeMetrics::", stringify!($name), "`]")]
        ///
        /// Returns the difference since the function was last called.
        #[track_caller]
        pub fn $name(&self) -> u64 {
            let stat = self.tokio.$name() as u64;
            // Advance-only: a stale reading leaves the stored value untouched and yields 0.
            let prev = self.$name.fetch_max(stat, Ordering::Relaxed);
            stat.saturating_sub(prev)
        }
    };
    (worker: $(#[$doc:meta])* $name:ident) => {
        $(#[$doc])*
        ///
        #[doc = concat!("See also: [`tokio::runtime::RuntimeMetrics::", stringify!($name), "`]")]
        ///
        /// Returns the difference since the function was last called.
        #[track_caller]
        pub fn $name(&self, worker: usize) -> u64 {
            let stat = self.tokio.$name(worker) as u64;
            // Advance-only: a stale reading leaves the stored value untouched and yields 0.
            let prev = self.worker_state[worker]
                .$name
                .fetch_max(stat, Ordering::Relaxed);
            stat.saturating_sub(prev)
        }
    };
}
52
53impl RuntimeMetrics {
54    /// Returns the runtime name.
55    pub fn name(&self) -> &'static str {
56        self.name
57    }
58
59    /// Returns the amount of current threads idle.
60    pub fn num_idle_threads(&self) -> usize {
61        self.cb.idle_threads.load(Ordering::Relaxed)
62    }
63
64    /// Returns the amount of currently alive tasks in the runtime.
65    ///
66    /// See also: [`tokio::runtime::RuntimeMetrics::num_alive_tasks`].
67    pub fn num_alive_tasks(&self) -> usize {
68        self.tokio.num_alive_tasks()
69    }
70
71    /// Returns the number of tasks currently scheduled in the blocking
72    /// thread pool, spawned using `spawn_blocking`.
73    ///
74    /// See also: [`tokio::runtime::RuntimeMetrics::blocking_queue_depth`].
75    pub fn blocking_queue_depth(&self) -> usize {
76        self.tokio.blocking_queue_depth()
77    }
78
79    impl_diff_metric!(
80        /// Returns the number of times that tasks have been forced to yield back to the scheduler
81        /// after exhausting their task budgets.
82        budget_forced_yield_count
83    );
84
85    /// Returns the number of additional threads spawned by the runtime.
86    ///
87    /// See also: [`tokio::runtime::RuntimeMetrics::num_blocking_threads`].
88    pub fn num_blocking_threads(&self) -> usize {
89        self.tokio.num_blocking_threads()
90    }
91
92    /// Returns the number of idle threads, which have spawned by the runtime
93    /// for `spawn_blocking` calls.
94    ///
95    /// See also: [`tokio::runtime::RuntimeMetrics::num_idle_blocking_threads`].
96    pub fn num_idle_blocking_threads(&self) -> usize {
97        self.tokio.num_idle_blocking_threads()
98    }
99
100    /// Returns the number of worker threads used by the runtime.
101    ///
102    /// See also: [`tokio::runtime::RuntimeMetrics::num_workers`].
103    pub fn num_workers(&self) -> usize {
104        self.tokio.num_workers()
105    }
106
107    /// Returns the number of tasks currently scheduled in the given worker's
108    /// local queue.
109    ///
110    /// See also: [`tokio::runtime::RuntimeMetrics::worker_local_queue_depth`].
111    pub fn worker_local_queue_depth(&self, worker: usize) -> usize {
112        self.tokio.worker_local_queue_depth(worker)
113    }
114
115    /// Returns the mean duration of task polls, in nanoseconds.
116    ///
117    /// See also: [`tokio::runtime::RuntimeMetrics::worker_mean_poll_time`].
118    pub fn worker_mean_poll_time(&self, worker: usize) -> Duration {
119        self.tokio.worker_mean_poll_time(worker)
120    }
121
122    impl_diff_metric!(
123        worker:
124        /// Returns the number of tasks scheduled from **within** the runtime on the
125        /// given worker's local queue.
126        worker_local_schedule_count
127    );
128    impl_diff_metric!(
129        worker:
130        /// Returns the number of times the given worker thread unparked but
131        /// performed no work before parking again.
132        worker_noop_count
133    );
134    impl_diff_metric!(
135        worker:
136        /// Returns the number of times the given worker thread saturated its local
137        /// queue.
138        worker_overflow_count
139    );
140    impl_diff_metric!(
141        worker:
142        /// Returns the total number of times the given worker thread has parked.
143        worker_park_count
144    );
145    impl_diff_metric!(
146        worker:
147        /// Returns the number of tasks the given worker thread has polled.
148        worker_poll_count
149    );
150    impl_diff_metric!(
151        worker:
152        /// Returns the number of tasks the given worker thread stole from
153        /// another worker thread.
154        worker_steal_count
155    );
156    impl_diff_metric!(
157        worker:
158        /// Returns the number of times the given worker thread stole tasks from
159        /// another worker thread.
160        worker_steal_operations
161    );
162
163    /// Returns the amount of time the given worker thread has been busy, in seconds.
164    ///
165    /// See also: [`tokio::runtime::RuntimeMetrics::worker_total_busy_duration`].
166    ///
167    /// Returns the difference since the function was last called.
168    pub fn worker_total_busy_duration(&self, worker: usize) -> Duration {
169        let stat = self.tokio.worker_total_busy_duration(worker).as_secs_f64();
170        let prev = self.worker_state[worker]
171            .worker_total_busy_duration
172            .swap(stat.to_bits(), Ordering::Relaxed);
173        Duration::from_secs_f64(stat - f64::from_bits(prev))
174    }
175}
176
/// Per-worker bookkeeping for [`RuntimeMetrics`].
///
/// Each field holds the last value observed for the per-worker metric of the same
/// name. It is used to compute the difference reported on the next query.
#[derive(Debug, Default)]
struct WorkerState {
    worker_local_schedule_count: AtomicU64,
    worker_noop_count: AtomicU64,
    worker_overflow_count: AtomicU64,
    worker_park_count: AtomicU64,
    worker_poll_count: AtomicU64,
    worker_steal_count: AtomicU64,
    worker_steal_operations: AtomicU64,
    /// Last observed total busy duration, stored as raw `u64` bits. The encoding is
    /// defined by `RuntimeMetrics::worker_total_busy_duration`.
    worker_total_busy_duration: AtomicU64,
}
188
189impl WorkerState {
190    pub fn for_workers(num: usize) -> Box<[WorkerState]> {
191        let mut v = Vec::with_capacity(num);
192        v.resize_with(num, WorkerState::default);
193        v.into_boxed_slice()
194    }
195}
196
/// Keeps track of Tokio's callback metrics.
///
/// The counters are updated from callbacks installed with
/// [`TokioCallbackMetrics::register`]. They are read through the [`RuntimeMetrics`]
/// handle produced by [`TokioCallbackMetrics::into_metrics`].
#[derive(Debug, Default)]
pub struct TokioCallbackMetrics {
    /// Number of currently idle threads: incremented when a thread parks and
    /// decremented when it unparks.
    idle_threads: AtomicUsize,
}
202
203impl TokioCallbackMetrics {
204    pub fn register(self: &Arc<Self>, builder: &mut tokio::runtime::Builder) {
205        builder.on_thread_park({
206            let this = Arc::clone(self);
207            move || {
208                this.idle_threads.fetch_add(1, Ordering::Relaxed);
209            }
210        });
211        builder.on_thread_unpark({
212            let this = Arc::clone(self);
213            move || {
214                this.idle_threads.fetch_sub(1, Ordering::Relaxed);
215            }
216        });
217    }
218
219    pub fn into_metrics(
220        self: Arc<Self>,
221        name: &'static str,
222        tokio: tokio::runtime::RuntimeMetrics,
223    ) -> RuntimeMetrics {
224        let workers = tokio.num_workers();
225        RuntimeMetrics {
226            name,
227            cb: self,
228            tokio,
229            budget_forced_yield_count: AtomicU64::new(0),
230            worker_state: WorkerState::for_workers(workers),
231        }
232    }
233}
234
#[cfg(test)]
mod tests {
    #[cfg(target_os = "linux")] // Test relies on Tokio/Platform specific internals.
    use super::*;

    /// Checks that a diff metric of `RuntimeMetrics` reports the increase since the
    /// previous query, while Tokio's own handle keeps reporting the running total.
    #[test]
    #[cfg(target_os = "linux")] // Test relies on Tokio/Platform specific internals.
    fn test_metric_diff() {
        // A single worker thread, so worker index `0` is the only worker to inspect.
        let rt = crate::Runtime::builder("test").worker_threads(1).build();

        // Diffing wrapper under test; moved into the async block below.
        let metrics = rt.handle().metrics();

        rt.block_on(async move {
            // Raw Tokio handle for the same runtime, used as the reference total.
            let tokio_metrics = tokio::runtime::Handle::current().metrics();

            assert_eq!(metrics.num_workers(), 1);
            assert_eq!(tokio_metrics.num_workers(), 1);

            assert_eq!(metrics.worker_local_schedule_count(0), 0);
            assert_eq!(tokio_metrics.worker_local_schedule_count(0), 0);

            // Increase local worker schedule count by awaiting a timer.
            crate::spawn!(tokio::time::sleep(Duration::from_millis(1)))
                .await
                .unwrap();

            assert_eq!(metrics.worker_local_schedule_count(0), 1);
            assert_eq!(tokio_metrics.worker_local_schedule_count(0), 1);

            // Increase it again.
            crate::spawn!(tokio::time::sleep(Duration::from_millis(1)))
                .await
                .unwrap();

            // The difference is `1`.
            assert_eq!(metrics.worker_local_schedule_count(0), 1);
            // The total count is `2`.
            assert_eq!(tokio_metrics.worker_local_schedule_count(0), 2);
        });
    }
}