1use std::future::Future;
2use std::pin::Pin;
3use std::sync::Arc;
4use std::sync::atomic::{AtomicU8, AtomicU64, Ordering};
5use std::task::{Context, Poll};
6use tokio::time::{Duration, Instant};
7
/// Minimum amount of time that must elapse since the previous utilization
/// update before a poll publishes a new utilization value.
const UTILIZATION_UPDATE_THRESHOLD: Duration = Duration::from_millis(5_000);
10
11pin_project_lite::pin_project! {
12 pub struct MonitoredFuture<F> {
14 #[pin]
15 inner: F,
16 metrics: Arc<RawMetrics>,
17 last_utilization_update: Instant,
18 poll_duration_accumulated_ns: u64
19 }
20}
21
22impl<F> MonitoredFuture<F> {
23 pub fn wrap(inner: F) -> Self {
25 Self::wrap_with_metrics(inner, Arc::new(RawMetrics::default()))
26 }
27
28 pub fn wrap_with_metrics(inner: F, metrics: Arc<RawMetrics>) -> Self {
30 Self {
31 inner,
32 metrics,
33 last_utilization_update: Instant::now(),
35 poll_duration_accumulated_ns: 0,
38 }
39 }
40
41 pub fn metrics(&self) -> &Arc<RawMetrics> {
43 &self.metrics
44 }
45}
46
47impl<F> Future for MonitoredFuture<F>
48where
49 F: Future,
50{
51 type Output = F::Output;
52
53 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
54 let poll_start = Instant::now();
55
56 let this = self.project();
57 this.metrics.poll_count.fetch_add(1, Ordering::Relaxed);
58
59 let ret = this.inner.poll(cx);
60
61 let poll_end = Instant::now();
62 let poll_duration = poll_end - poll_start;
63 let poll_duration_ns = poll_duration.as_nanos().try_into().unwrap_or(u64::MAX);
64
65 this.metrics
66 .total_duration_ns
67 .fetch_add(poll_duration_ns, Ordering::Relaxed);
68 *this.poll_duration_accumulated_ns += poll_duration_ns;
69
70 let utilization_duration = poll_end - *this.last_utilization_update;
71 if utilization_duration >= UTILIZATION_UPDATE_THRESHOLD {
72 let percentage = (*this.poll_duration_accumulated_ns * 100)
75 .div_ceil(utilization_duration.as_nanos().max(1) as u64);
76 this.metrics
77 .utilization
78 .store(percentage.min(100) as u8, Ordering::Relaxed);
79
80 *this.poll_duration_accumulated_ns = 0;
81 *this.last_utilization_update = poll_end;
82 }
83
84 ret
85 }
86}
87
/// Counters written by [`MonitoredFuture`] while it is being polled.
///
/// All fields are atomics, so a shared reference is enough to read or update
/// them. A default-constructed value has every counter at zero.
#[derive(Default, Debug)]
pub struct RawMetrics {
    /// Number of times the monitored future has been polled.
    pub poll_count: AtomicU64,
    /// Sum of the measured durations of all polls, in nanoseconds.
    pub total_duration_ns: AtomicU64,
    /// Most recently published utilization, as a percentage in `0..=100`.
    pub utilization: AtomicU8,
}
100
#[cfg(test)]
mod tests {
    use super::*;

    /// Drives a monitored future by hand under a paused clock and checks the
    /// poll counter, the accumulated poll time and the published utilization.
    #[tokio::test(start_paused = true)]
    async fn test_monitor() {
        // Every poll of this future moves the paused clock forward by 500 ms,
        // so each poll is measured as taking exactly that long.
        let inner = Box::pin(async {
            loop {
                tokio::time::advance(Duration::from_millis(500)).await;
            }
        });
        let mut monitor = MonitoredFuture::wrap(inner);
        let metrics = Arc::clone(monitor.metrics());

        let polls = || metrics.poll_count.load(Ordering::Relaxed);
        let total_ns = || metrics.total_duration_ns.load(Ordering::Relaxed);
        let utilization = || metrics.utilization.load(Ordering::Relaxed);

        assert_eq!(polls(), 0);
        for expected_polls in 1..=3u64 {
            let _ = futures::poll!(&mut monitor);
            assert_eq!(polls(), expected_polls);
        }

        // Only 1.5 s have elapsed, which is below the update threshold, so no
        // utilization value has been published yet.
        assert_eq!(utilization(), 0);
        assert_eq!(total_ns(), 1_500_000_000);

        // Idle for 3 s; the next 500 ms poll then ends exactly at the 5 s
        // threshold with 2 s of accumulated poll time, i.e. 40 %.
        let idle = UTILIZATION_UPDATE_THRESHOLD - Duration::from_secs(2);
        tokio::time::advance(idle).await;

        let _ = futures::poll!(&mut monitor);
        assert_eq!(polls(), 4);
        assert_eq!(utilization(), 40);
        assert_eq!(total_ns(), 2_000_000_000);
    }
}