relay_system/
monitor.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::sync::Arc;
4use std::sync::atomic::{AtomicU8, AtomicU64, Ordering};
5use std::task::{Context, Poll};
6use tokio::time::{Duration, Instant};
7
/// Minimum time that has to pass since the last utilization update before the utilization is
/// recalculated.
///
/// The recalculation is driven by polls: it happens on the first poll that finishes at least this
/// long after the previous update.
const UTILIZATION_UPDATE_THRESHOLD: Duration = Duration::from_secs(5);
10
11pin_project_lite::pin_project! {
12    /// A future that tracks metrics.
13    pub struct MonitoredFuture<F> {
14        #[pin]
15        inner: F,
16        metrics: Arc<RawMetrics>,
17        last_utilization_update: Instant,
18        poll_duration_accumulated_ns: u64
19    }
20}
21
22impl<F> MonitoredFuture<F> {
23    /// Wraps a future with the [`MonitoredFuture`].
24    pub fn wrap(inner: F) -> Self {
25        Self::wrap_with_metrics(inner, Arc::new(RawMetrics::default()))
26    }
27
28    /// Wraps a future with the [`MonitoredFuture`].
29    pub fn wrap_with_metrics(inner: F, metrics: Arc<RawMetrics>) -> Self {
30        Self {
31            inner,
32            metrics,
33            // The last time the utilization was updated.
34            last_utilization_update: Instant::now(),
35            // The poll duration that was accumulated across zero or more polls since the last
36            // refresh.
37            poll_duration_accumulated_ns: 0,
38        }
39    }
40
41    /// Provides access to the raw metrics tracked in this monitor.
42    pub fn metrics(&self) -> &Arc<RawMetrics> {
43        &self.metrics
44    }
45}
46
47impl<F> Future for MonitoredFuture<F>
48where
49    F: Future,
50{
51    type Output = F::Output;
52
53    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
54        let poll_start = Instant::now();
55
56        let this = self.project();
57        this.metrics.poll_count.fetch_add(1, Ordering::Relaxed);
58
59        let ret = this.inner.poll(cx);
60
61        let poll_end = Instant::now();
62        let poll_duration = poll_end - poll_start;
63        let poll_duration_ns = poll_duration.as_nanos().try_into().unwrap_or(u64::MAX);
64
65        this.metrics
66            .total_duration_ns
67            .fetch_add(poll_duration_ns, Ordering::Relaxed);
68        *this.poll_duration_accumulated_ns += poll_duration_ns;
69
70        let utilization_duration = poll_end - *this.last_utilization_update;
71        if utilization_duration >= UTILIZATION_UPDATE_THRESHOLD {
72            // The maximum possible time spent busy is the total time between the last measurement
73            // and the current measurement. We can extract a percentage from this.
74            let percentage = (*this.poll_duration_accumulated_ns * 100)
75                .div_ceil(utilization_duration.as_nanos().max(1) as u64);
76            this.metrics
77                .utilization
78                .store(percentage.min(100) as u8, Ordering::Relaxed);
79
80            *this.poll_duration_accumulated_ns = 0;
81            *this.last_utilization_update = poll_end;
82        }
83
84        ret
85    }
86}
87
/// Raw metrics reported by a [`MonitoredFuture`].
///
/// The fields are public so that they can be loaded by observers. Only the [`MonitoredFuture`]
/// may write to them, all other access must be *read* only.
#[derive(Debug, Default)]
pub struct RawMetrics {
    /// Number of times the monitored future was polled.
    pub poll_count: AtomicU64,
    /// Total time in nanoseconds spent in the poll function, summed over all polls.
    pub total_duration_ns: AtomicU64,
    /// Estimated utilization as a percentage in the range `[0-100]`.
    pub utilization: AtomicU8,
}
100
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks poll counting, duration tracking and the utilization update using paused time.
    #[tokio::test(start_paused = true)]
    async fn test_monitor() {
        // The inner future never completes and moves the paused clock forward by 500ms at a time.
        let mut monitor = MonitoredFuture::wrap(Box::pin(async {
            loop {
                tokio::time::advance(Duration::from_millis(500)).await;
            }
        }));
        let metrics = Arc::clone(monitor.metrics());

        // Nothing is recorded before the first poll, afterwards every poll is counted.
        assert_eq!(metrics.poll_count.load(Ordering::Relaxed), 0);
        for expected_polls in 1..=3u64 {
            let _ = futures::poll!(&mut monitor);
            assert_eq!(metrics.poll_count.load(Ordering::Relaxed), expected_polls);
        }

        // The threshold has not been reached yet, so the utilization still has its initial value.
        assert_eq!(metrics.utilization.load(Ordering::Relaxed), 0);
        assert_eq!(
            metrics.total_duration_ns.load(Ordering::Relaxed),
            1_500_000_000
        );

        // 1.5s have been recorded so far. Advancing by another 3s lets the next poll, which
        // itself takes 500ms, end exactly at the 5s update threshold.
        tokio::time::advance(UTILIZATION_UPDATE_THRESHOLD - Duration::from_secs(2)).await;

        let _ = futures::poll!(&mut monitor);
        assert_eq!(metrics.poll_count.load(Ordering::Relaxed), 4);
        // 2s of poll time within a 5s window.
        assert_eq!(metrics.utilization.load(Ordering::Relaxed), 40);
        assert_eq!(
            metrics.total_duration_ns.load(Ordering::Relaxed),
            2_000_000_000
        );
    }
}