relay_server/utils/scheduled/
queue.rs

1use priority_queue::PriorityQueue;
2use std::cmp::Reverse;
3use std::collections::BinaryHeap;
4use std::fmt;
5use std::future::Future;
6use std::pin::Pin;
7use std::task::{Context, Poll};
8use std::time::Duration;
9
10use futures::stream::FusedStream;
11use tokio::time::Instant;
12
13use futures::Stream;
14
15/// A scheduled queue that can be polled for when the next item is ready.
16pub struct ScheduledQueue<T> {
17    queue: BinaryHeap<Item<T>>,
18    sleep: Pin<Box<tokio::time::Sleep>>,
19}
20
21impl<T> ScheduledQueue<T> {
22    /// Schedules a new item to be yielded at `when`.
23    pub fn schedule(&mut self, when: Instant, value: T) {
24        self.queue.push(Item { when, value });
25    }
26
27    fn peek_when(&self) -> Option<Instant> {
28        self.queue.peek().map(|item| item.when)
29    }
30
31    fn pop_value(&mut self) -> Option<T> {
32        self.queue.pop().map(|item| item.value)
33    }
34
35    fn iter(&self) -> impl Iterator<Item = (Instant, &T)> + '_ {
36        self.queue.iter().map(|item| (item.when, &item.value))
37    }
38}
39
40/// A scheduled queue that can be polled for when the next item is ready.
41///
42/// Unlike [`ScheduledQueue`] every unique `T` can only be scheduled once,
43/// scheduling a value again moves the deadline instead.
44pub struct UniqueScheduledQueue<T>
45where
46    T: std::hash::Hash + Eq,
47{
48    queue: PriorityQueue<T, Reverse<Instant>>,
49    sleep: Pin<Box<tokio::time::Sleep>>,
50}
51
52impl<T: std::hash::Hash + Eq> UniqueScheduledQueue<T> {
53    /// Schedules an item to be yielded at `when`.
54    ///
55    /// If the item was net yet scheduled, it is inserted into the queue,
56    /// otherwise the previous schedule is moved to the new deadline.
57    pub fn schedule(&mut self, when: Instant, value: T) {
58        self.queue.push(value, Reverse(when));
59    }
60
61    /// Removes a value from the queue.
62    pub fn remove(&mut self, value: &T) {
63        self.queue.remove(value);
64    }
65
66    fn peek_when(&self) -> Option<Instant> {
67        self.queue.peek().map(|(_, Reverse(when))| *when)
68    }
69
70    fn pop_value(&mut self) -> Option<T> {
71        self.queue.pop().map(|(value, _)| value)
72    }
73
74    fn iter(&self) -> impl Iterator<Item = (Instant, &T)> + '_ {
75        self.queue
76            .iter()
77            .map(|(value, Reverse(when))| (*when, value))
78    }
79}
80
81macro_rules! impl_queue {
82    ($name:ident, $($where:tt)*) => {
83        impl<T: $($where)*> $name<T> {
84            /// Creates a new, empty [`Self`].
85            pub fn new() -> Self {
86                Self {
87                    queue: Default::default(),
88                    sleep: Box::pin(tokio::time::sleep(Duration::MAX)),
89                }
90            }
91
92            /// Returns the current size of the queue.
93            #[allow(dead_code)]
94            pub fn len(&self) -> usize {
95                self.queue.len()
96            }
97
98            /// Returns true if there are no items in the queue.
99            #[allow(dead_code)]
100            pub fn is_empty(&self) -> bool {
101                self.len() == 0
102            }
103        }
104
105        impl<T: $($where)*> Default for $name<T> {
106            fn default() -> Self {
107                Self::new()
108            }
109        }
110
111        impl<T: $($where)*> Unpin for $name<T> {}
112
113        impl<T: $($where)*> FusedStream for $name<T> {
114            fn is_terminated(&self) -> bool {
115                // The stream never returns `Poll::Ready(None)`.
116                false
117            }
118        }
119
120        impl<T: $($where)*> Stream for $name<T> {
121            type Item = T;
122
123            fn poll_next(
124                mut self: Pin<&mut Self>,
125                cx: &mut Context<'_>,
126            ) -> Poll<Option<Self::Item>> {
127                if let Some(when) = self.peek_when() {
128                    // The head of the queue changed, reset the deadline.
129                    if self.sleep.deadline() != when {
130                        self.sleep.as_mut().reset(when);
131                    }
132
133                    // Poll and wait for the next item to be ready.
134                    if self.sleep.as_mut().poll(cx).is_ready() {
135                        // Item is ready, yield it.
136                        let value = self.pop_value().expect("pop after peek");
137                        return Poll::Ready(Some(value));
138                    }
139                }
140
141                Poll::Pending
142            }
143        }
144
145        impl<T: $($where)*>  fmt::Debug for $name<T> where T: fmt::Debug {
146            fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
147                let now = Instant::now();
148                let mut f = f.debug_list();
149                for (when, value) in self.iter() {
150                    f.entry(&(when.saturating_duration_since(now), value));
151                }
152                f.finish()
153            }
154        }
155    };
156}
157
158impl_queue!(ScheduledQueue, Sized);
159impl_queue!(UniqueScheduledQueue, std::hash::Hash + Eq);
160
/// A value paired with the instant at which it is due.
///
/// Equality and ordering only look at `when`; `value` never takes part in a
/// comparison. The ordering is reversed, so an earlier `when` compares as
/// greater.
struct Item<T> {
    when: Instant,
    value: T,
}

impl<T> PartialEq for Item<T> {
    fn eq(&self, other: &Self) -> bool {
        self.when == other.when
    }
}

impl<T> Eq for Item<T> {}

impl<T> PartialOrd for Item<T> {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(Ord::cmp(self, other))
    }
}

impl<T> Ord for Item<T> {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        // Operands are swapped on purpose: the earlier deadline is the greater item.
        other.when.cmp(&self.when)
    }
}

impl<T> fmt::Debug for Item<T>
where
    T: fmt::Debug,
{
    /// Formats only the wrapped value; the deadline is left out.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt::Debug::fmt(&self.value, f)
    }
}
193
#[cfg(test)]
mod tests {
    use futures::StreamExt;

    use super::*;

    /// Items are yielded by deadline, independent of the order they were scheduled in.
    #[tokio::test(start_paused = true)]
    async fn test_scheduled_queue() {
        let mut s = ScheduledQueue::new();

        let start = Instant::now();

        let initial = [
            (Duration::from_millis(100), 4),
            (Duration::from_millis(150), 5),
            (Duration::from_nanos(3), 2),
            (Duration::from_nanos(2), 2),
            (Duration::from_nanos(1), 1),
        ];
        for (offset, value) in initial {
            s.schedule(start + offset, value);
        }

        assert_eq!(s.len(), 5);
        for expected in [1, 2, 2] {
            assert_eq!(s.next().await, Some(expected));
        }

        // A deadline that already passed is due right away.
        s.schedule(start, 3);

        assert_eq!(s.len(), 3);
        for expected in [3, 4, 5] {
            assert_eq!(s.next().await, Some(expected));
        }

        assert_eq!(s.len(), 0);
        assert!(s.is_empty());
    }

    /// Re-scheduling a queued value moves it, removing a value drops it.
    #[tokio::test(start_paused = true)]
    async fn test_unique_scheduled_queue() {
        let mut s = UniqueScheduledQueue::new();

        let start = Instant::now();
        let after_nanos = |nanos| start + Duration::from_nanos(nanos);
        let after_millis = |millis| start + Duration::from_millis(millis);

        s.schedule(start, "xxx");
        s.schedule(after_nanos(1), "a");
        s.schedule(after_nanos(2), "b");
        s.schedule(after_millis(100), "c");
        s.schedule(after_millis(150), "d");
        s.schedule(after_millis(200), "e");

        assert_eq!(s.len(), 6);
        s.remove(&"xxx");
        assert_eq!(s.len(), 5);

        assert_eq!(s.next().await, Some("a"));
        assert_eq!(s.len(), 4);

        // `b` goes to the very end.
        s.schedule(start + Duration::from_secs(1), "b");
        // `d` now comes before `c`.
        s.schedule(after_millis(99), "d");
        // A brand new element that is due right away.
        s.schedule(start, "x");

        assert_eq!(s.len(), 5);
        for expected in ["x", "d", "c", "e", "b"] {
            assert_eq!(s.next().await, Some(expected));
        }

        assert_eq!(s.len(), 0);
        assert!(s.is_empty());
    }
}
267}