relay_server/utils/scheduled/
futures.rs1use std::{
2 future::Future,
3 pin::Pin,
4 task::{Context, Poll},
5};
6
7use crate::utils::ScheduledQueue;
8use futures::{
9 Stream, StreamExt,
10 stream::{FusedStream, FuturesUnordered},
11};
12use tokio::time::Instant;
13
14#[derive(Debug)]
16#[must_use = "streams do nothing unless polled"]
17pub struct FuturesScheduled<T> {
18 queue: ScheduledQueue<T>,
19 tasks: FuturesUnordered<T>,
20}
21
22impl<T> FuturesScheduled<T> {
23 pub fn new() -> Self {
25 Self {
26 queue: ScheduledQueue::new(),
27 tasks: FuturesUnordered::new(),
28 }
29 }
30
31 pub fn len(&self) -> usize {
33 self.queue.len() + self.tasks.len()
34 }
35
36 pub fn schedule(&mut self, when: Option<Instant>, task: T) {
40 match when {
41 Some(when) => self.queue.schedule(when, task),
42 None => self.tasks.push(task),
43 }
44 }
45}
46
47impl<T> Default for FuturesScheduled<T> {
48 fn default() -> Self {
49 Self::new()
50 }
51}
52
53impl<T> Unpin for FuturesScheduled<T> {}
54
55impl<T> Stream for FuturesScheduled<T>
56where
57 T: Future,
58{
59 type Item = T::Output;
60
61 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
62 if let Poll::Ready(Some(next)) = self.queue.poll_next_unpin(cx) {
64 self.tasks.push(next);
65 }
66
67 match self.tasks.poll_next_unpin(cx) {
68 Poll::Ready(Some(next)) => Poll::Ready(Some(next)),
70 Poll::Ready(None) => Poll::Pending,
73 Poll::Pending => Poll::Pending,
75 }
76 }
77}
78
79impl<T> FusedStream for FuturesScheduled<T>
80where
81 T: Future,
82{
83 fn is_terminated(&self) -> bool {
84 false
86 }
87}
88
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use futures::future::{Ready, ready};

    use super::*;

    /// An empty set stays `Pending` no matter how often it is polled.
    #[tokio::test]
    async fn test_stask_empty() {
        let mut sched = FuturesScheduled::<Ready<()>>::new();
        assert_eq!(sched.len(), 0);

        let mut pending_next = sched.next();
        for _attempt in 0..10 {
            assert_eq!(futures::poll!(&mut pending_next), Poll::Pending);
        }

        assert_eq!(sched.len(), 0);
    }

    /// A task scheduled without an instant completes on the first poll.
    #[tokio::test]
    async fn test_stask_immediate_task() {
        let mut sched = FuturesScheduled::new();
        sched.schedule(None, ready(()));
        assert_eq!(sched.len(), 1);

        let mut pending_next = sched.next();
        assert_eq!(futures::poll!(&mut pending_next), Poll::Ready(Some(())));
        assert_eq!(sched.len(), 0);
    }

    /// A task scheduled three seconds ahead is not yielded before that time.
    #[tokio::test(start_paused = true)]
    async fn test_stask_scheduled_task() {
        let mut sched = FuturesScheduled::new();

        let deadline = Instant::now() + Duration::from_secs(3);
        sched.schedule(Some(deadline), ready(()));
        assert_eq!(sched.len(), 1);

        let mut pending_next = sched.next();
        assert_eq!(futures::poll!(&mut pending_next), Poll::Pending);

        // 2.8 s in: still too early.
        tokio::time::sleep(Duration::from_millis(2800)).await;
        assert_eq!(futures::poll!(&mut pending_next), Poll::Pending);

        // 3.001 s in: the task is due.
        tokio::time::sleep(Duration::from_millis(201)).await;
        assert_eq!(futures::poll!(&mut pending_next), Poll::Ready(Some(())));

        assert_eq!(sched.len(), 0);
    }

    /// Dropping a pending `next()` future must not lose the scheduled task.
    #[tokio::test(start_paused = true)]
    async fn test_stask_scheduled_task_next_cancelled() {
        let mut sched = FuturesScheduled::new();

        let deadline = Instant::now() + Duration::from_secs(3);
        sched.schedule(Some(deadline), ready(()));
        assert_eq!(sched.len(), 1);

        let mut pending_next = sched.next();
        assert_eq!(futures::poll!(&mut pending_next), Poll::Pending);
        tokio::time::sleep(Duration::from_millis(2800)).await;
        assert_eq!(futures::poll!(&mut pending_next), Poll::Pending);
        drop(pending_next);

        // The task is still held by the set after the cancellation.
        assert_eq!(sched.len(), 1);
        tokio::time::sleep(Duration::from_millis(201)).await;
        assert_eq!(futures::poll!(sched.next()), Poll::Ready(Some(())));

        assert_eq!(sched.len(), 0);
    }

    /// Immediate and scheduled tasks interleave in deadline order.
    #[tokio::test(start_paused = true)]
    async fn test_stask_mixed_tasks() {
        let mut sched = FuturesScheduled::new();

        let start = Instant::now();
        // Deadline `secs` seconds after `start`, wrapped for `schedule`.
        let after = |secs: u64| Some(start + Duration::from_secs(secs));

        sched.schedule(None, ready(0));
        sched.schedule(after(2), ready(2));
        sched.schedule(after(1), ready(1));
        sched.schedule(after(3), ready(3));
        assert_eq!(sched.len(), 4);

        assert_eq!(sched.next().await, Some(0));
        assert_eq!(sched.next().await, Some(1));

        sched.schedule(None, ready(90));
        assert_eq!(sched.next().await, Some(90));

        // A deadline that already lies in the past is served next.
        sched.schedule(Some(start), ready(91));
        assert_eq!(sched.next().await, Some(91));

        assert_eq!(sched.next().await, Some(2));

        sched.schedule(after(4), ready(4));
        assert_eq!(sched.len(), 2);
        assert_eq!(sched.next().await, Some(3));
        assert_eq!(sched.next().await, Some(4));

        assert_eq!(sched.len(), 0);
        assert!(Instant::now() < start + Duration::from_millis(4001));
        assert_eq!(futures::poll!(sched.next()), Poll::Pending);

        sched.schedule(Some(Instant::now()), ready(92));
        let yielded = tokio::time::timeout(Duration::from_millis(1), sched.next())
            .await
            .unwrap();
        assert_eq!(yielded, Some(92));

        assert_eq!(futures::poll!(sched.next()), Poll::Pending);
        assert_eq!(sched.len(), 0);
    }
}