relay_threading/
metrics.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

/// Metrics for a single thread in an asynchronous pool.
///
/// This struct provides a way to track and query metrics specific to an individual thread, such as
/// the number of futures it has polled. It is designed for safe concurrent access across threads.
#[derive(Debug, Default)]
pub(crate) struct ThreadMetrics {
    /// Number of futures that are currently being polled concurrently by the thread in the pool.
    pub(crate) active_tasks: AtomicU64,
    /// Number of tasks that have been successfully driven to completion.
    ///
    /// This number will monotonically grow if not reset.
    pub(crate) finished_tasks: AtomicU64,
}

impl ThreadMetrics {
    /// Resets metrics that are monotonically increasing.
    pub fn reset(&self) {
        self.finished_tasks.store(0, Ordering::Relaxed);
    }
}

/// Metrics for the asynchronous pool.
#[derive(Debug)]
pub struct AsyncPoolMetrics<'a> {
    pub(crate) max_tasks: u64,
    pub(crate) queue_size: u64,
    pub(crate) threads_metrics: &'a [Arc<ThreadMetrics>],
}

impl AsyncPoolMetrics<'_> {
    /// Returns the amount of tasks in the pool's queue.
    pub fn queue_size(&self) -> u64 {
        self.queue_size
    }

    /// Returns the total number of finished tasks since the last poll.
    pub fn finished_tasks(&self) -> u64 {
        let total_finished_tasks: u64 = self
            .threads_metrics
            .iter()
            .map(|m| {
                let finished_tasks = m.finished_tasks.load(Ordering::Relaxed);
                m.reset();

                finished_tasks
            })
            .sum();

        total_finished_tasks
    }

    /// Returns the utilization metric for the pool.
    pub fn utilization(&self) -> f32 {
        let total_polled_futures: u64 = self
            .threads_metrics
            .iter()
            .map(|m| m.active_tasks.load(Ordering::Relaxed))
            .sum();

        (total_polled_futures as f32 / self.max_tasks as f32).clamp(0.0, 1.0) * 100.0
    }
}