relay_server/services/buffer/envelope_stack/
caching.rs1use chrono::{DateTime, Utc};
2
3use super::EnvelopeStack;
4use crate::envelope::Envelope;
5
6#[derive(Debug)]
9pub struct CachingEnvelopeStack<S> {
10 inner: S,
12 cached: Option<Box<Envelope>>,
14}
15
16impl<S> CachingEnvelopeStack<S>
17where
18 S: EnvelopeStack,
19{
20 pub fn new(inner: S) -> Self {
22 Self {
23 inner,
24 cached: None,
25 }
26 }
27}
28
29impl<S> EnvelopeStack for CachingEnvelopeStack<S>
30where
31 S: EnvelopeStack,
32{
33 type Error = S::Error;
34
35 async fn push(&mut self, envelope: Box<Envelope>) -> Result<(), Self::Error> {
36 if let Some(cached) = self.cached.take() {
37 self.inner.push(cached).await?;
38 }
39 self.cached = Some(envelope);
40
41 Ok(())
42 }
43
44 async fn peek(&mut self) -> Result<Option<DateTime<Utc>>, Self::Error> {
45 if let Some(ref envelope) = self.cached {
46 Ok(Some(envelope.received_at()))
47 } else {
48 self.inner.peek().await
49 }
50 }
51
52 async fn pop(&mut self) -> Result<Option<Box<Envelope>>, Self::Error> {
53 if let Some(envelope) = self.cached.take() {
54 Ok(Some(envelope))
55 } else {
56 self.inner.pop().await
57 }
58 }
59
60 async fn flush(mut self) {
61 if let Some(envelope) = self.cached {
62 if self.inner.push(envelope).await.is_err() {
63 relay_log::error!(
64 "error while pushing the cached envelope in the inner stack during flushing",
65 );
66 }
67 }
68 self.inner.flush().await;
69 }
70}
71
#[cfg(test)]
mod tests {
    use super::*;
    use crate::services::buffer::envelope_stack::memory::MemoryEnvelopeStack;
    use crate::services::buffer::testutils::utils::mock_envelope;

    /// Pushes two envelopes through the caching stack and checks that both can
    /// be observed and retrieved again, after which the stack reports empty.
    #[tokio::test]
    async fn test_caching_stack() {
        let mut stack = CachingEnvelopeStack::new(MemoryEnvelopeStack::new());

        let first = mock_envelope(Utc::now());
        let second = mock_envelope(Utc::now());

        // The first push fills the cache slot; the second one displaces the
        // first envelope into the inner stack.
        stack.push(first).await.unwrap();
        stack.push(second).await.unwrap();

        // Served from the cache slot.
        let popped = stack.pop().await.unwrap();
        assert!(popped.is_some());

        // The cache slot is empty now, so this is answered by the inner stack.
        let peeked = stack.peek().await.unwrap();
        assert!(peeked.is_some());

        // Retrieves the remaining envelope from the inner stack.
        let popped = stack.pop().await.unwrap();
        assert!(popped.is_some());

        // Nothing is left in either the cache slot or the inner stack.
        let peeked = stack.peek().await.unwrap();
        assert!(peeked.is_none());
        let popped = stack.pop().await.unwrap();
        assert!(popped.is_none());
    }
}