relay_threading/metrics.rs
1use relay_system::RawMetrics;
2use std::cmp::max;
3use std::sync::Arc;
4use std::sync::atomic::{AtomicU64, Ordering};
5
6/// Metrics for a single thread in an asynchronous pool.
7///
8/// This struct provides a way to track and query metrics specific to an individual thread, such as
9/// the number of futures it has polled. It is designed for safe concurrent access across threads.
10#[derive(Debug, Default)]
11pub(crate) struct ThreadMetrics {
12 /// Number of futures that are currently being polled concurrently by the thread in the pool.
13 pub(crate) active_tasks: AtomicU64,
14 /// Number of tasks that have been successfully driven to completion.
15 ///
16 /// This number will monotonically grow if not reset.
17 pub(crate) finished_tasks: AtomicU64,
18 /// The raw metrics collected by the timed future.
19 pub(crate) raw_metrics: Arc<RawMetrics>,
20}
21
22impl ThreadMetrics {
23 /// Resets metrics that are monotonically increasing.
24 pub fn reset(&self) {
25 self.finished_tasks.store(0, Ordering::Relaxed);
26 }
27}
28
29/// Metrics for the asynchronous pool.
30#[derive(Debug)]
31pub struct AsyncPoolMetrics<'a> {
32 pub(crate) max_tasks: u64,
33 pub(crate) queue_size: u64,
34 pub(crate) threads_metrics: &'a [Arc<ThreadMetrics>],
35}
36
37impl AsyncPoolMetrics<'_> {
38 /// Returns the amount of tasks in the pool's queue.
39 pub fn queue_size(&self) -> u64 {
40 self.queue_size
41 }
42
43 /// Returns the total number of finished tasks since the last poll.
44 pub fn finished_tasks(&self) -> u64 {
45 let total_finished_tasks: u64 = self
46 .threads_metrics
47 .iter()
48 .map(|m| {
49 let finished_tasks = m.finished_tasks.load(Ordering::Relaxed);
50 m.reset();
51
52 finished_tasks
53 })
54 .sum();
55
56 total_finished_tasks
57 }
58
59 /// Returns the utilization metric for the pool.
60 ///
61 /// The cpu utilization is measured as the amount of busy work performed by each thread when polling
62 /// the futures.
63 ///
64 /// A cpu utilization of 100% indicates that the pool has been doing CPU-bound work for the duration
65 /// of the measurement.
66 /// A cpu utilization of 0% indicates that the pool didn't do any CPU-bound work for the duration
67 /// of the measurement.
68 ///
69 /// Note that this metric is collected and updated for each thread when the main future is polled,
70 /// thus if no work is being done, it will not be updated.
71 pub fn cpu_utilization(&self) -> u8 {
72 self.threads_metrics
73 .iter()
74 .map(|m| m.raw_metrics.utilization.load(Ordering::Relaxed))
75 .max()
76 .unwrap_or(100)
77 }
78
79 /// Returns the overall utilization of the pool.
80 ///
81 /// This metric provides a high-level view of how busy the pool is, combining both CPU-bound and
82 /// I/O-bound workloads into a single value. It is intended as a general signal of system load
83 /// and should be preferred when making scaling decisions.
84 ///
85 /// Unlike [`Self::cpu_utilization`] or [`Self::activity`], which reflect specific aspects of pool usage,
86 /// this method captures the maximum pressure observed in either dimension. This makes it more robust
87 /// in edge cases such as:
88 /// - High activity with low CPU utilization (e.g., I/O-bound workloads with many tasks waiting).
89 /// - Low activity with high CPU utilization (e.g., a few threads performing heavy computation).
90 pub fn total_utilization(&self) -> u8 {
91 max(self.cpu_utilization(), self.activity())
92 }
93
94 /// Returns the activity metric for the pool.
95 ///
96 /// The activity is measure as the amount of active tasks in the pool versus the maximum amount
97 /// of tasks that the pool can have active at the same time.
98 ///
99 /// An activity of 100% indicates that the pool is driving the maximum number of tasks that it
100 /// can.
101 /// An activity of 0% indicates that the pool is not driving any tasks.
102 pub fn activity(&self) -> u8 {
103 let total_polled_futures: u64 = self
104 .threads_metrics
105 .iter()
106 .map(|m| m.active_tasks.load(Ordering::Relaxed))
107 .sum();
108
109 ((total_polled_futures as f32 / self.max_tasks as f32).clamp(0.0, 1.0) * 100.0) as u8
110 }
111}