relay_server/services/buffer/envelope_stack/
caching.rs

1use chrono::{DateTime, Utc};
2
3use super::EnvelopeStack;
4use crate::envelope::Envelope;
5
6/// An envelope stack implementation that caches one element in memory and delegates
7/// to another envelope stack for additional storage.
8#[derive(Debug)]
9pub struct CachingEnvelopeStack<S> {
10    /// The underlying envelope stack
11    inner: S,
12    /// The cached envelope (if any)
13    cached: Option<Box<Envelope>>,
14}
15
16impl<S> CachingEnvelopeStack<S>
17where
18    S: EnvelopeStack,
19{
20    /// Creates a new [`CachingEnvelopeStack`] wrapping the provided envelope stack
21    pub fn new(inner: S) -> Self {
22        Self {
23            inner,
24            cached: None,
25        }
26    }
27}
28
29impl<S> EnvelopeStack for CachingEnvelopeStack<S>
30where
31    S: EnvelopeStack,
32{
33    type Error = S::Error;
34
35    async fn push(&mut self, envelope: Box<Envelope>) -> Result<(), Self::Error> {
36        if let Some(cached) = self.cached.take() {
37            self.inner.push(cached).await?;
38        }
39        self.cached = Some(envelope);
40
41        Ok(())
42    }
43
44    async fn peek(&mut self) -> Result<Option<DateTime<Utc>>, Self::Error> {
45        if let Some(ref envelope) = self.cached {
46            Ok(Some(envelope.received_at()))
47        } else {
48            self.inner.peek().await
49        }
50    }
51
52    async fn pop(&mut self) -> Result<Option<Box<Envelope>>, Self::Error> {
53        if let Some(envelope) = self.cached.take() {
54            Ok(Some(envelope))
55        } else {
56            self.inner.pop().await
57        }
58    }
59
60    async fn flush(mut self) {
61        if let Some(envelope) = self.cached {
62            if self.inner.push(envelope).await.is_err() {
63                relay_log::error!(
64                    "error while pushing the cached envelope in the inner stack during flushing",
65                );
66            }
67        }
68        self.inner.flush().await;
69    }
70}
71
#[cfg(test)]
mod tests {
    use super::*;
    use crate::services::buffer::envelope_stack::memory::MemoryEnvelopeStack;
    use crate::services::buffer::testutils::utils::mock_envelope;

    /// Exercises push, peek and pop across the cache slot and the inner stack.
    #[tokio::test]
    async fn test_caching_stack() {
        let mut stack = CachingEnvelopeStack::new(MemoryEnvelopeStack::new());

        // Build both envelopes up front, each stamped with the current time.
        let first = mock_envelope(Utc::now());
        let second = mock_envelope(Utc::now());

        // After two pushes one envelope sits in the cache slot and the other in
        // the inner stack.
        stack.push(first).await.unwrap();
        stack.push(second).await.unwrap();

        // The first pop is served from the cache slot.
        let from_cache = stack.pop().await.unwrap();
        assert!(from_cache.is_some());

        // With the slot empty, peeking falls through to the inner stack.
        let peeked = stack.peek().await.unwrap();
        assert!(peeked.is_some());

        // The second pop drains the inner stack.
        let from_inner = stack.pop().await.unwrap();
        assert!(from_inner.is_some());

        // Nothing is left to peek or pop.
        let peeked_empty = stack.peek().await.unwrap();
        assert!(peeked_empty.is_none());
        let popped_empty = stack.pop().await.unwrap();
        assert!(popped_empty.is_none());
    }
}