relay_threading/
metrics.rs

1use relay_system::RawMetrics;
2use std::cmp::max;
3use std::sync::Arc;
4use std::sync::atomic::{AtomicU64, Ordering};
5
6/// Metrics for a single thread in an asynchronous pool.
7///
8/// This struct provides a way to track and query metrics specific to an individual thread, such as
9/// the number of futures it has polled. It is designed for safe concurrent access across threads.
10#[derive(Debug, Default)]
11pub(crate) struct ThreadMetrics {
12    /// Number of futures that are currently being polled concurrently by the thread in the pool.
13    pub(crate) active_tasks: AtomicU64,
14    /// Number of tasks that have been successfully driven to completion.
15    ///
16    /// This number will monotonically grow if not reset.
17    pub(crate) finished_tasks: AtomicU64,
18    /// The raw metrics collected by the timed future.
19    pub(crate) raw_metrics: Arc<RawMetrics>,
20}
21
22impl ThreadMetrics {
23    /// Resets metrics that are monotonically increasing.
24    pub fn reset(&self) {
25        self.finished_tasks.store(0, Ordering::Relaxed);
26    }
27}
28
29/// Metrics for the asynchronous pool.
30#[derive(Debug)]
31pub struct AsyncPoolMetrics<'a> {
32    pub(crate) max_tasks: u64,
33    pub(crate) queue_size: u64,
34    pub(crate) threads_metrics: &'a [Arc<ThreadMetrics>],
35}
36
37impl AsyncPoolMetrics<'_> {
38    /// Returns the amount of tasks in the pool's queue.
39    pub fn queue_size(&self) -> u64 {
40        self.queue_size
41    }
42
43    /// Returns the total number of finished tasks since the last poll.
44    pub fn finished_tasks(&self) -> u64 {
45        let total_finished_tasks: u64 = self
46            .threads_metrics
47            .iter()
48            .map(|m| {
49                let finished_tasks = m.finished_tasks.load(Ordering::Relaxed);
50                m.reset();
51
52                finished_tasks
53            })
54            .sum();
55
56        total_finished_tasks
57    }
58
59    /// Returns the utilization metric for the pool.
60    ///
61    /// The cpu utilization is measured as the amount of busy work performed by each thread when polling
62    /// the futures.
63    ///
64    /// A cpu utilization of 100% indicates that the pool has been doing CPU-bound work for the duration
65    /// of the measurement.
66    /// A cpu utilization of 0% indicates that the pool didn't do any CPU-bound work for the duration
67    /// of the measurement.
68    ///
69    /// Note that this metric is collected and updated for each thread when the main future is polled,
70    /// thus if no work is being done, it will not be updated.
71    pub fn cpu_utilization(&self) -> u8 {
72        self.threads_metrics
73            .iter()
74            .map(|m| m.raw_metrics.utilization.load(Ordering::Relaxed))
75            .max()
76            .unwrap_or(100)
77    }
78
79    /// Returns the overall utilization of the pool.
80    ///
81    /// This metric provides a high-level view of how busy the pool is, combining both CPU-bound and
82    /// I/O-bound workloads into a single value. It is intended as a general signal of system load
83    /// and should be preferred when making scaling decisions.
84    ///
85    /// Unlike [`Self::cpu_utilization`] or [`Self::activity`], which reflect specific aspects of pool usage,
86    /// this method captures the maximum pressure observed in either dimension. This makes it more robust
87    /// in edge cases such as:
88    /// - High activity with low CPU utilization (e.g., I/O-bound workloads with many tasks waiting).
89    /// - Low activity with high CPU utilization (e.g., a few threads performing heavy computation).
90    pub fn total_utilization(&self) -> u8 {
91        max(self.cpu_utilization(), self.activity())
92    }
93
94    /// Returns the activity metric for the pool.
95    ///
96    /// The activity is measure as the amount of active tasks in the pool versus the maximum amount
97    /// of tasks that the pool can have active at the same time.
98    ///
99    /// An activity of 100% indicates that the pool is driving the maximum number of tasks that it
100    /// can.
101    /// An activity of 0% indicates that the pool is not driving any tasks.
102    pub fn activity(&self) -> u8 {
103        let total_polled_futures: u64 = self
104            .threads_metrics
105            .iter()
106            .map(|m| m.active_tasks.load(Ordering::Relaxed))
107            .sum();
108
109        ((total_polled_futures as f32 / self.max_tasks as f32).clamp(0.0, 1.0) * 100.0) as u8
110    }
111}