relay_server/utils/scheduled/
queue.rs1use priority_queue::PriorityQueue;
2use std::cmp::Reverse;
3use std::collections::BinaryHeap;
4use std::fmt;
5use std::future::Future;
6use std::pin::Pin;
7use std::task::{Context, Poll};
8use std::time::Duration;
9
10use futures::stream::FusedStream;
11use tokio::time::Instant;
12
13use futures::Stream;
14
15pub struct ScheduledQueue<T> {
17 queue: BinaryHeap<Item<T>>,
18 sleep: Pin<Box<tokio::time::Sleep>>,
19}
20
21impl<T> ScheduledQueue<T> {
22 pub fn schedule(&mut self, when: Instant, value: T) {
24 self.queue.push(Item { when, value });
25 }
26
27 fn peek_when(&self) -> Option<Instant> {
28 self.queue.peek().map(|item| item.when)
29 }
30
31 fn pop_value(&mut self) -> Option<T> {
32 self.queue.pop().map(|item| item.value)
33 }
34
35 fn iter(&self) -> impl Iterator<Item = (Instant, &T)> + '_ {
36 self.queue.iter().map(|item| (item.when, &item.value))
37 }
38}
39
40pub struct UniqueScheduledQueue<T>
45where
46 T: std::hash::Hash + Eq,
47{
48 queue: PriorityQueue<T, Reverse<Instant>>,
49 sleep: Pin<Box<tokio::time::Sleep>>,
50}
51
52impl<T: std::hash::Hash + Eq> UniqueScheduledQueue<T> {
53 pub fn schedule(&mut self, when: Instant, value: T) {
58 self.queue.push(value, Reverse(when));
59 }
60
61 pub fn remove(&mut self, value: &T) {
63 self.queue.remove(value);
64 }
65
66 fn peek_when(&self) -> Option<Instant> {
67 self.queue.peek().map(|(_, Reverse(when))| *when)
68 }
69
70 fn pop_value(&mut self) -> Option<T> {
71 self.queue.pop().map(|(value, _)| value)
72 }
73
74 fn iter(&self) -> impl Iterator<Item = (Instant, &T)> + '_ {
75 self.queue
76 .iter()
77 .map(|(value, Reverse(when))| (*when, value))
78 }
79}
80
81macro_rules! impl_queue {
82 ($name:ident, $($where:tt)*) => {
83 impl<T: $($where)*> $name<T> {
84 pub fn new() -> Self {
86 Self {
87 queue: Default::default(),
88 sleep: Box::pin(tokio::time::sleep(Duration::MAX)),
89 }
90 }
91
92 #[allow(dead_code)]
94 pub fn len(&self) -> usize {
95 self.queue.len()
96 }
97
98 #[allow(dead_code)]
100 pub fn is_empty(&self) -> bool {
101 self.len() == 0
102 }
103 }
104
105 impl<T: $($where)*> Default for $name<T> {
106 fn default() -> Self {
107 Self::new()
108 }
109 }
110
111 impl<T: $($where)*> Unpin for $name<T> {}
112
113 impl<T: $($where)*> FusedStream for $name<T> {
114 fn is_terminated(&self) -> bool {
115 false
117 }
118 }
119
120 impl<T: $($where)*> Stream for $name<T> {
121 type Item = T;
122
123 fn poll_next(
124 mut self: Pin<&mut Self>,
125 cx: &mut Context<'_>,
126 ) -> Poll<Option<Self::Item>> {
127 if let Some(when) = self.peek_when() {
128 if self.sleep.deadline() != when {
130 self.sleep.as_mut().reset(when);
131 }
132
133 if self.sleep.as_mut().poll(cx).is_ready() {
135 let value = self.pop_value().expect("pop after peek");
137 return Poll::Ready(Some(value));
138 }
139 }
140
141 Poll::Pending
142 }
143 }
144
145 impl<T: $($where)*> fmt::Debug for $name<T> where T: fmt::Debug {
146 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
147 let now = Instant::now();
148 let mut f = f.debug_list();
149 for (when, value) in self.iter() {
150 f.entry(&(when.saturating_duration_since(now), value));
151 }
152 f.finish()
153 }
154 }
155 };
156}
157
158impl_queue!(ScheduledQueue, Sized);
159impl_queue!(UniqueScheduledQueue, std::hash::Hash + Eq);
160
161struct Item<T> {
162 when: Instant,
163 value: T,
164}
165
166impl<T> PartialEq for Item<T> {
167 fn eq(&self, other: &Self) -> bool {
168 other.when == self.when
169 }
170}
171impl<T> Eq for Item<T> {}
172
173impl<T> PartialOrd for Item<T> {
174 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
175 Some(self.cmp(other))
176 }
177}
178
179impl<T> Ord for Item<T> {
180 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
181 self.when.cmp(&other.when).reverse()
182 }
183}
184
185impl<T> fmt::Debug for Item<T>
186where
187 T: fmt::Debug,
188{
189 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
190 self.value.fmt(f)
191 }
192}
193
194#[cfg(test)]
195mod tests {
196 use futures::StreamExt;
197
198 use super::*;
199
200 #[tokio::test(start_paused = true)]
201 async fn test_scheduled_queue() {
202 let mut s = ScheduledQueue::new();
203
204 let start = Instant::now();
205
206 s.schedule(start + Duration::from_millis(100), 4);
207 s.schedule(start + Duration::from_millis(150), 5);
208
209 s.schedule(start + Duration::from_nanos(3), 2);
210 s.schedule(start + Duration::from_nanos(2), 2);
211 s.schedule(start + Duration::from_nanos(1), 1);
212
213 assert_eq!(s.len(), 5);
214 assert_eq!(s.next().await, Some(1));
215 assert_eq!(s.next().await, Some(2));
216 assert_eq!(s.next().await, Some(2));
217
218 s.schedule(start, 3);
220
221 assert_eq!(s.len(), 3);
222 assert_eq!(s.next().await, Some(3));
223 assert_eq!(s.next().await, Some(4));
224 assert_eq!(s.next().await, Some(5));
225
226 assert_eq!(s.len(), 0);
227 assert!(s.is_empty());
228 }
229
230 #[tokio::test(start_paused = true)]
231 async fn test_unique_scheduled_queue() {
232 let mut s = UniqueScheduledQueue::new();
233
234 let start = Instant::now();
235
236 s.schedule(start, "xxx");
237 s.schedule(start + Duration::from_nanos(1), "a");
238 s.schedule(start + Duration::from_nanos(2), "b");
239 s.schedule(start + Duration::from_millis(100), "c");
240 s.schedule(start + Duration::from_millis(150), "d");
241 s.schedule(start + Duration::from_millis(200), "e");
242
243 assert_eq!(s.len(), 6);
244 s.remove(&"xxx");
245 assert_eq!(s.len(), 5);
246
247 assert_eq!(s.next().await, Some("a"));
248 assert_eq!(s.len(), 4);
249
250 s.schedule(start + Duration::from_secs(1), "b");
252 s.schedule(start + Duration::from_millis(99), "d");
254 s.schedule(start, "x");
256
257 assert_eq!(s.len(), 5);
258 assert_eq!(s.next().await, Some("x"));
259 assert_eq!(s.next().await, Some("d"));
260 assert_eq!(s.next().await, Some("c"));
261 assert_eq!(s.next().await, Some("e"));
262 assert_eq!(s.next().await, Some("b"));
263
264 assert_eq!(s.len(), 0);
265 assert!(s.is_empty());
266 }
267}