relay_server/utils/scheduled/
futures.rs

1use std::{
2    future::Future,
3    pin::Pin,
4    task::{Context, Poll},
5};
6
7use crate::utils::ScheduledQueue;
8use futures::{
9    Stream, StreamExt,
10    stream::{FusedStream, FuturesUnordered},
11};
12use tokio::time::Instant;
13
14/// A set of tasks/futures that can be scheduled for execution.
15#[derive(Debug)]
16#[must_use = "streams do nothing unless polled"]
17pub struct FuturesScheduled<T> {
18    queue: ScheduledQueue<T>,
19    tasks: FuturesUnordered<T>,
20}
21
22impl<T> FuturesScheduled<T> {
23    /// Creates a new, empty [`FuturesScheduled`].
24    pub fn new() -> Self {
25        Self {
26            queue: ScheduledQueue::new(),
27            tasks: FuturesUnordered::new(),
28        }
29    }
30
31    /// Returns the total amount of scheduled and active tasks.
32    pub fn len(&self) -> usize {
33        self.queue.len() + self.tasks.len()
34    }
35
36    /// Schedules a new `task`.
37    ///
38    /// A `None` `when` value indicates the task should be scheduled immediately.
39    pub fn schedule(&mut self, when: Option<Instant>, task: T) {
40        match when {
41            Some(when) => self.queue.schedule(when, task),
42            None => self.tasks.push(task),
43        }
44    }
45}
46
47impl<T> Default for FuturesScheduled<T> {
48    fn default() -> Self {
49        Self::new()
50    }
51}
52
53impl<T> Unpin for FuturesScheduled<T> {}
54
55impl<T> Stream for FuturesScheduled<T>
56where
57    T: Future,
58{
59    type Item = T::Output;
60
61    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
62        // Queue is fused, but also never terminating, it is okay to match on `Some` directly here.
63        if let Poll::Ready(Some(next)) = self.queue.poll_next_unpin(cx) {
64            self.tasks.push(next);
65        }
66
67        match self.tasks.poll_next_unpin(cx) {
68            // An item is ready, yield it.
69            Poll::Ready(Some(next)) => Poll::Ready(Some(next)),
70            // There are no more tasks in the queue, this is now pending waiting for another task.
71            // It is fine to just remember this as pending, `FuturesUnordered` is fused.
72            Poll::Ready(None) => Poll::Pending,
73            // No task ready, keep waiting.
74            Poll::Pending => Poll::Pending,
75        }
76    }
77}
78
79impl<T> FusedStream for FuturesScheduled<T>
80where
81    T: Future,
82{
83    fn is_terminated(&self) -> bool {
84        // The stream never returns `Poll::Ready(None)`.
85        false
86    }
87}
88
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use futures::future::{Ready, ready};

    use super::*;

    /// An empty set stays pending, no matter how often it is polled.
    #[tokio::test]
    async fn test_stask_empty() {
        let mut sched = FuturesScheduled::<Ready<()>>::new();
        assert_eq!(sched.len(), 0);

        let mut pending_next = sched.next();
        for _attempt in 0..10 {
            let polled = futures::poll!(&mut pending_next);
            assert_eq!(polled, Poll::Pending);
        }

        assert_eq!(sched.len(), 0);
    }

    /// A task without a deadline is available on the very first poll.
    #[tokio::test]
    async fn test_stask_immediate_task() {
        let mut sched = FuturesScheduled::new();
        sched.schedule(None, ready(()));
        assert_eq!(sched.len(), 1);

        let mut pending_next = sched.next();
        let polled = futures::poll!(&mut pending_next);
        assert_eq!(polled, Poll::Ready(Some(())));

        assert_eq!(sched.len(), 0);
    }

    /// A task with a deadline is only yielded once the deadline has passed.
    #[tokio::test(start_paused = true)]
    async fn test_stask_scheduled_task() {
        let mut sched = FuturesScheduled::new();

        let deadline = Instant::now() + Duration::from_secs(3);
        sched.schedule(Some(deadline), ready(()));
        assert_eq!(sched.len(), 1);

        let mut pending_next = sched.next();

        // Right after scheduling.
        assert_eq!(futures::poll!(&mut pending_next), Poll::Pending);

        // Shortly before the deadline.
        tokio::time::sleep(Duration::from_millis(2800)).await;
        assert_eq!(futures::poll!(&mut pending_next), Poll::Pending);

        // Just after the deadline.
        tokio::time::sleep(Duration::from_millis(201)).await;
        assert_eq!(futures::poll!(&mut pending_next), Poll::Ready(Some(())));

        assert_eq!(sched.len(), 0);
    }

    /// Dropping a pending `next()` future must not lose the scheduled task.
    #[tokio::test(start_paused = true)]
    async fn test_stask_scheduled_task_next_cancelled() {
        let mut sched = FuturesScheduled::new();

        let deadline = Instant::now() + Duration::from_secs(3);
        sched.schedule(Some(deadline), ready(()));
        assert_eq!(sched.len(), 1);

        let mut pending_next = sched.next();
        assert_eq!(futures::poll!(&mut pending_next), Poll::Pending);
        tokio::time::sleep(Duration::from_millis(2800)).await;
        assert_eq!(futures::poll!(&mut pending_next), Poll::Pending);
        // Cancel the in-flight `next()`.
        drop(pending_next);

        // The task is still there and becomes available through a fresh `next()`.
        assert_eq!(sched.len(), 1);
        tokio::time::sleep(Duration::from_millis(201)).await;
        assert_eq!(futures::poll!(sched.next()), Poll::Ready(Some(())));

        assert_eq!(sched.len(), 0);
    }

    /// Immediate and scheduled tasks can be mixed freely, also while consuming the stream.
    #[tokio::test(start_paused = true)]
    async fn test_stask_mixed_tasks() {
        let mut sched = FuturesScheduled::new();

        let now = Instant::now();
        // Deadline `secs` seconds after the start of the test.
        let after = |secs: u64| now + Duration::from_secs(secs);

        sched.schedule(None, ready(0));
        sched.schedule(Some(after(2)), ready(2));
        sched.schedule(Some(after(1)), ready(1));
        sched.schedule(Some(after(3)), ready(3));
        assert_eq!(sched.len(), 4);

        assert_eq!(sched.next().await, Some(0));
        assert_eq!(sched.next().await, Some(1));

        sched.schedule(None, ready(90));
        assert_eq!(sched.next().await, Some(90));

        // `now` is in the past by now.
        sched.schedule(Some(now), ready(91));
        assert_eq!(sched.next().await, Some(91));
        assert_eq!(sched.next().await, Some(2));

        sched.schedule(Some(after(4)), ready(4));
        assert_eq!(sched.len(), 2);
        assert_eq!(sched.next().await, Some(3));
        assert_eq!(sched.next().await, Some(4));

        assert_eq!(sched.len(), 0);
        assert!(Instant::now() < now + Duration::from_millis(4001));
        assert_eq!(futures::poll!(sched.next()), Poll::Pending);

        sched.schedule(Some(Instant::now()), ready(92));
        let yielded = tokio::time::timeout(Duration::from_millis(1), sched.next())
            .await
            .unwrap();
        assert_eq!(yielded, Some(92));

        assert_eq!(futures::poll!(sched.next()), Poll::Pending);
        assert_eq!(sched.len(), 0);
    }
}
197}