relay_system/runtime/
metrics.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;

/// Handle to the [`Runtime`](crate::Runtime)'s metrics.
///
/// Unlike [`tokio::runtime::RuntimeMetrics`] this handle returns the relative
/// increase in values since the last time the specific metric was queried.
///
/// This makes it possible to emit these metrics as counters more easily through statsd.
///
/// It also exposes some other runtime metrics which Tokio does not directly
/// expose through the [`tokio::runtime::RuntimeMetrics`] handle.
#[derive(Debug)]
pub struct RuntimeMetrics {
    /// Static name of the runtime, returned by [`RuntimeMetrics::name`].
    name: &'static str,
    /// Shared callback-driven metrics; the source of the idle thread count.
    cb: Arc<TokioCallbackMetrics>,
    /// The underlying Tokio metrics handle all other values are read from.
    tokio: tokio::runtime::RuntimeMetrics,
    // Metric state:
    /// Total `budget_forced_yield_count` observed at the previous query,
    /// used to compute the difference on the next query.
    budget_forced_yield_count: AtomicU64,
    /// Previously observed per-worker totals, indexed by worker.
    worker_state: Box<[WorkerState]>,
}

// Generates a "difference since last call" accessor for a Tokio counter.
//
// The generated method reads the current total from `self.tokio`, stores it in
// the atomic of the same name (on `self`, or on `self.worker_state[worker]` for
// the `worker:` form) and returns how much the total grew compared to the
// previously stored value.
macro_rules! impl_diff_metric {
    // Runtime-wide counter: state lives in a field on `self` named like the metric.
    ($(#[$doc:meta])* $name:ident) => {
        $(#[$doc])*
        ///
        #[doc = concat!("See also: [`tokio::runtime::RuntimeMetrics::", stringify!($name), "`]")]
        ///
        /// The returned value is the increase observed since the previous call
        /// of this method.
        #[track_caller]
        pub fn $name(&self) -> u64 {
            let current = self.tokio.$name() as u64;
            let previous = self.$name.swap(current, Ordering::Relaxed);
            // Saturate so that an out-of-order swap can never underflow.
            current.saturating_sub(previous)
        }
    };
    // Per-worker counter: state lives in `self.worker_state[worker]`.
    (worker: $(#[$doc:meta])* $name:ident) => {
        $(#[$doc])*
        ///
        #[doc = concat!("See also: [`tokio::runtime::RuntimeMetrics::", stringify!($name), "`]")]
        ///
        /// The returned value is the increase observed since the previous call
        /// of this method for the same worker.
        #[track_caller]
        pub fn $name(&self, worker: usize) -> u64 {
            let current = self.tokio.$name(worker) as u64;
            let previous = self.worker_state[worker]
                .$name
                .swap(current, Ordering::Relaxed);
            // Saturate so that an out-of-order swap can never underflow.
            current.saturating_sub(previous)
        }
    };
}

impl RuntimeMetrics {
    /// Returns the runtime name.
    pub fn name(&self) -> &'static str {
        self.name
    }

    /// Returns the number of currently idle threads.
    ///
    /// The value is maintained by the park/unpark callbacks registered through
    /// [`TokioCallbackMetrics::register`].
    pub fn num_idle_threads(&self) -> usize {
        self.cb.idle_threads.load(Ordering::Relaxed)
    }

    /// Returns the amount of currently alive tasks in the runtime.
    ///
    /// See also: [`tokio::runtime::RuntimeMetrics::num_alive_tasks`].
    pub fn num_alive_tasks(&self) -> usize {
        self.tokio.num_alive_tasks()
    }

    /// Returns the number of tasks currently scheduled in the blocking
    /// thread pool, spawned using `spawn_blocking`.
    ///
    /// See also: [`tokio::runtime::RuntimeMetrics::blocking_queue_depth`].
    pub fn blocking_queue_depth(&self) -> usize {
        self.tokio.blocking_queue_depth()
    }

    impl_diff_metric!(
        /// Returns the number of times that tasks have been forced to yield back to the scheduler
        /// after exhausting their task budgets.
        budget_forced_yield_count
    );

    /// Returns the number of additional threads spawned by the runtime.
    ///
    /// See also: [`tokio::runtime::RuntimeMetrics::num_blocking_threads`].
    pub fn num_blocking_threads(&self) -> usize {
        self.tokio.num_blocking_threads()
    }

    /// Returns the number of idle threads, which have been spawned by the runtime
    /// for `spawn_blocking` calls.
    ///
    /// See also: [`tokio::runtime::RuntimeMetrics::num_idle_blocking_threads`].
    pub fn num_idle_blocking_threads(&self) -> usize {
        self.tokio.num_idle_blocking_threads()
    }

    /// Returns the number of worker threads used by the runtime.
    ///
    /// See also: [`tokio::runtime::RuntimeMetrics::num_workers`].
    pub fn num_workers(&self) -> usize {
        self.tokio.num_workers()
    }

    /// Returns the number of tasks currently scheduled in the given worker's
    /// local queue.
    ///
    /// See also: [`tokio::runtime::RuntimeMetrics::worker_local_queue_depth`].
    pub fn worker_local_queue_depth(&self, worker: usize) -> usize {
        self.tokio.worker_local_queue_depth(worker)
    }

    /// Returns the mean duration of task polls on the given worker.
    ///
    /// See also: [`tokio::runtime::RuntimeMetrics::worker_mean_poll_time`].
    pub fn worker_mean_poll_time(&self, worker: usize) -> Duration {
        self.tokio.worker_mean_poll_time(worker)
    }

    impl_diff_metric!(
        worker:
        /// Returns the number of tasks scheduled from **within** the runtime on the
        /// given worker's local queue.
        worker_local_schedule_count
    );
    impl_diff_metric!(
        worker:
        /// Returns the number of times the given worker thread unparked but
        /// performed no work before parking again.
        worker_noop_count
    );
    impl_diff_metric!(
        worker:
        /// Returns the number of times the given worker thread saturated its local
        /// queue.
        worker_overflow_count
    );
    impl_diff_metric!(
        worker:
        /// Returns the total number of times the given worker thread has parked.
        worker_park_count
    );
    impl_diff_metric!(
        worker:
        /// Returns the number of tasks the given worker thread has polled.
        worker_poll_count
    );
    impl_diff_metric!(
        worker:
        /// Returns the number of tasks the given worker thread stole from
        /// another worker thread.
        worker_steal_count
    );
    impl_diff_metric!(
        worker:
        /// Returns the number of times the given worker thread stole tasks from
        /// another worker thread.
        worker_steal_operations
    );

    /// Returns the amount of time the given worker thread has been busy.
    ///
    /// See also: [`tokio::runtime::RuntimeMetrics::worker_total_busy_duration`].
    ///
    /// Returns the difference since the function was last called. If the
    /// previously stored total is larger than the one just read (which can
    /// happen when several callers query the same worker concurrently), a zero
    /// duration is returned.
    ///
    /// # Panics
    ///
    /// Panics if `worker` is not a valid index into the per-worker state.
    pub fn worker_total_busy_duration(&self, worker: usize) -> Duration {
        let total = self.tokio.worker_total_busy_duration(worker).as_secs_f64();
        // The total is stored as the raw bits of an `f64` number of seconds.
        let previous = f64::from_bits(
            self.worker_state[worker]
                .worker_total_busy_duration
                .swap(total.to_bits(), Ordering::Relaxed),
        );
        // `Duration::from_secs_f64` panics on negative input, so clamp the
        // delta at zero, mirroring the `saturating_sub` of the integer metrics.
        Duration::from_secs_f64((total - previous).max(0.0))
    }
}

/// Per-worker bookkeeping for the difference-based worker metrics.
///
/// Every field holds the total that was observed when the metric of the same
/// name was last queried; all fields start out at zero.
#[derive(Debug, Default)]
struct WorkerState {
    worker_local_schedule_count: AtomicU64,
    worker_noop_count: AtomicU64,
    worker_overflow_count: AtomicU64,
    worker_park_count: AtomicU64,
    worker_poll_count: AtomicU64,
    worker_steal_count: AtomicU64,
    worker_steal_operations: AtomicU64,
    /// Stored as the bit pattern of an `f64` number of seconds.
    worker_total_busy_duration: AtomicU64,
}

impl WorkerState {
    /// Creates a boxed slice with `num` default-initialized worker states.
    pub fn for_workers(num: usize) -> Box<[WorkerState]> {
        (0..num).map(|_| WorkerState::default()).collect()
    }
}

/// Keeps track of Tokio's callback metrics.
///
/// The counters are updated by callbacks installed on a
/// [`tokio::runtime::Builder`] via [`TokioCallbackMetrics::register`].
#[derive(Debug, Default)]
pub struct TokioCallbackMetrics {
    /// Incremented by the thread-park callback and decremented by the
    /// thread-unpark callback.
    idle_threads: AtomicUsize,
}

impl TokioCallbackMetrics {
    /// Installs the thread park/unpark callbacks on `builder`.
    ///
    /// The park callback increments the idle thread counter, the unpark
    /// callback decrements it again. Each callback holds its own clone of the
    /// `Arc`.
    pub fn register(self: &Arc<Self>, builder: &mut tokio::runtime::Builder) {
        let on_park = Arc::clone(self);
        let on_unpark = Arc::clone(self);

        builder
            .on_thread_park(move || {
                on_park.idle_threads.fetch_add(1, Ordering::Relaxed);
            })
            .on_thread_unpark(move || {
                on_unpark.idle_threads.fetch_sub(1, Ordering::Relaxed);
            });
    }

    /// Combines the callback metrics with a Tokio metrics handle into a
    /// [`RuntimeMetrics`] named `name`.
    ///
    /// All difference-based counters start at zero and one [`WorkerState`] is
    /// allocated per worker reported by `tokio`.
    pub fn into_metrics(
        self: Arc<Self>,
        name: &'static str,
        tokio: tokio::runtime::RuntimeMetrics,
    ) -> RuntimeMetrics {
        // Size the per-worker state before `tokio` is moved into the struct.
        let worker_state = WorkerState::for_workers(tokio.num_workers());

        RuntimeMetrics {
            name,
            cb: self,
            tokio,
            budget_forced_yield_count: AtomicU64::new(0),
            worker_state,
        }
    }
}

// Tests for the difference-based metric accessors.
#[cfg(test)]
mod tests {
    // The import is gated like the only test using it, to avoid an unused
    // import on other platforms.
    #[cfg(target_os = "linux")] // Test relies on Tokio/Platform specific internals.
    use super::*;

    /// Checks that the wrapper reports the increase since the previous query
    /// while the raw Tokio handle keeps reporting the running total.
    #[test]
    #[cfg(target_os = "linux")] // Test relies on Tokio/Platform specific internals.
    fn test_metric_diff() {
        // A single worker keeps the per-worker counters deterministic.
        let rt = crate::Runtime::builder("test").worker_threads(1).build();

        let metrics = rt.metrics();

        rt.block_on(async move {
            // Raw Tokio handle used as the reference for total counts.
            let tokio_metrics = tokio::runtime::Handle::current().metrics();

            assert_eq!(metrics.num_workers(), 1);
            assert_eq!(tokio_metrics.num_workers(), 1);

            // Nothing has been scheduled locally yet.
            assert_eq!(metrics.worker_local_schedule_count(0), 0);
            assert_eq!(tokio_metrics.worker_local_schedule_count(0), 0);

            // Increase local worker schedule count by awaiting a timer.
            crate::spawn!(tokio::time::sleep(Duration::from_nanos(10)))
                .await
                .unwrap();

            // First query after one local schedule: diff and total agree.
            assert_eq!(metrics.worker_local_schedule_count(0), 1);
            assert_eq!(tokio_metrics.worker_local_schedule_count(0), 1);

            // Increase it again.
            crate::spawn!(tokio::time::sleep(Duration::from_nanos(10)))
                .await
                .unwrap();

            // The difference is `1`.
            assert_eq!(metrics.worker_local_schedule_count(0), 1);
            // The total count is `2`.
            assert_eq!(tokio_metrics.worker_local_schedule_count(0), 2);
        });
    }
}