relay_server/services/buffer/envelope_stack/sqlite.rs

1use std::fmt::Debug;
2use std::num::NonZeroUsize;
3
4use chrono::{DateTime, Utc};
5use relay_base_schema::project::ProjectKey;
6
7use crate::envelope::Envelope;
8use crate::services::buffer::envelope_stack::EnvelopeStack;
9use crate::services::buffer::envelope_store::sqlite::{
10    DatabaseBatch, DatabaseEnvelope, InsertEnvelopeError, SqliteEnvelopeStore,
11    SqliteEnvelopeStoreError,
12};
13use crate::statsd::{RelayCounters, RelayTimers};
14
15/// An error returned when doing an operation on [`SqliteEnvelopeStack`].
16#[derive(Debug, thiserror::Error)]
17pub enum SqliteEnvelopeStackError {
18    #[error("envelope store error: {0}")]
19    EnvelopeStoreError(#[from] SqliteEnvelopeStoreError),
20    #[error("envelope encode error: {0}")]
21    Envelope(#[from] InsertEnvelopeError),
22}
23
24#[derive(Debug)]
25/// An [`EnvelopeStack`] that is implemented on an SQLite database.
26///
27/// For efficiency reasons, the implementation has an in-memory buffer that is periodically spooled
28/// to disk in a batched way.
29pub struct SqliteEnvelopeStack {
30    /// Shared SQLite database pool which will be used to read and write from disk.
31    envelope_store: SqliteEnvelopeStore,
32    /// Maximum number of bytes in the in-memory cache before we write to disk.
33    batch_size_bytes: NonZeroUsize,
34    /// The project key of the project to which all the envelopes belong.
35    own_key: ProjectKey,
36    /// The project key of the root project of the trace to which all the envelopes belong.
37    sampling_key: ProjectKey,
38    /// In-memory stack containing a batch of envelopes that either have not been written to disk yet, or have been read from disk recently.
39    batch: Vec<DatabaseEnvelope>,
40    /// Boolean representing whether calls to `push()` and `peek()` check disk in case not enough
41    /// elements are available in the `batches_buffer`.
42    check_disk: bool,
43    /// The tag value of this partition which is used for reporting purposes.
44    partition_tag: String,
45}
46
47impl SqliteEnvelopeStack {
48    /// Creates a new empty [`SqliteEnvelopeStack`].
49    pub fn new(
50        partition_id: u8,
51        envelope_store: SqliteEnvelopeStore,
52        batch_size_bytes: usize,
53        own_key: ProjectKey,
54        sampling_key: ProjectKey,
55        check_disk: bool,
56    ) -> Self {
57        Self {
58            envelope_store,
59            batch_size_bytes: NonZeroUsize::new(batch_size_bytes)
60                .expect("batch bytes should be > 0"),
61            own_key,
62            sampling_key,
63            batch: vec![],
64            check_disk,
65            partition_tag: partition_id.to_string(),
66        }
67    }
68
69    /// Threshold above which the [`SqliteEnvelopeStack`] will spool data from the `buffer` to disk.
70    fn above_spool_threshold(&self) -> bool {
71        self.batch.iter().map(|e| e.len()).sum::<usize>() > self.batch_size_bytes.get()
72    }
73
74    /// Spools to disk a batch of envelopes from the `batch`.
75    ///
76    /// In case there is a failure while writing envelopes, all the envelopes that were enqueued
77    /// to be written to disk are lost. The explanation for this behavior can be found in the body
78    /// of the method.
79    async fn spool_to_disk(&mut self) -> Result<(), SqliteEnvelopeStackError> {
80        let batch = std::mem::take(&mut self.batch);
81        let Ok(batch) = DatabaseBatch::try_from(batch) else {
82            return Ok(());
83        };
84
85        relay_statsd::metric!(
86            counter(RelayCounters::BufferSpooledEnvelopes) += batch.len() as u64,
87            partition_id = &self.partition_tag
88        );
89
90        // When early return here, we are acknowledging that the elements that we popped from
91        // the buffer are lost in case of failure. We are doing this on purposes, since if we were
92        // to have a database corruption during runtime, and we were to put the values back into
93        // the buffer we will end up with an infinite cycle.
94        relay_statsd::metric!(
95            timer(RelayTimers::BufferSpool),
96            partition_id = &self.partition_tag,
97            {
98                self.envelope_store
99                    .insert_batch(batch)
100                    .await
101                    .map_err(SqliteEnvelopeStackError::EnvelopeStoreError)?;
102            }
103        );
104
105        // If we successfully spooled to disk, we know that data should be there.
106        self.check_disk = true;
107
108        Ok(())
109    }
110
111    /// Unspools from disk a batch of envelopes and appends them to the `batch`.
112    ///
113    /// In case there is a failure while deleting envelopes, the envelopes will be lost.
114    async fn unspool_from_disk(&mut self) -> Result<(), SqliteEnvelopeStackError> {
115        debug_assert!(self.batch.is_empty());
116        let batch = relay_statsd::metric!(
117            timer(RelayTimers::BufferUnspool),
118            partition_id = &self.partition_tag,
119            {
120                self.envelope_store
121                    .delete_batch(self.own_key, self.sampling_key)
122                    .await
123                    .map_err(SqliteEnvelopeStackError::EnvelopeStoreError)?
124            }
125        );
126
127        match batch {
128            Some(batch) => {
129                self.batch = batch.into();
130            }
131            None => self.check_disk = false,
132        }
133
134        relay_statsd::metric!(
135            counter(RelayCounters::BufferUnspooledEnvelopes) += self.batch.len() as u64,
136            partition_id = &self.partition_tag
137        );
138        Ok(())
139    }
140
141    /// Validates that the incoming [`Envelope`] has the same project keys at the
142    /// [`SqliteEnvelopeStack`].
143    fn validate_envelope(&self, envelope: &Envelope) -> bool {
144        let own_key = envelope.meta().public_key();
145        let sampling_key = envelope.sampling_key().unwrap_or(own_key);
146
147        self.own_key == own_key && self.sampling_key == sampling_key
148    }
149}
150
151impl EnvelopeStack for SqliteEnvelopeStack {
152    type Error = SqliteEnvelopeStackError;
153
154    async fn push(&mut self, envelope: Box<Envelope>) -> Result<(), Self::Error> {
155        debug_assert!(self.validate_envelope(&envelope));
156
157        if self.above_spool_threshold() {
158            self.spool_to_disk().await?;
159        }
160
161        let encoded_envelope = relay_statsd::metric!(
162            timer(RelayTimers::BufferEnvelopesSerialization),
163            partition_id = &self.partition_tag,
164            { DatabaseEnvelope::try_from(envelope.as_ref())? }
165        );
166        self.batch.push(encoded_envelope);
167
168        Ok(())
169    }
170
171    async fn peek(&mut self) -> Result<Option<DateTime<Utc>>, Self::Error> {
172        if self.batch.is_empty() && self.check_disk {
173            self.unspool_from_disk().await?
174        }
175
176        let Some(envelope) = self.batch.last() else {
177            return Ok(None);
178        };
179
180        Ok(Some(envelope.received_at()))
181    }
182
183    async fn pop(&mut self) -> Result<Option<Box<Envelope>>, Self::Error> {
184        if self.batch.is_empty() && self.check_disk {
185            self.unspool_from_disk().await?
186        }
187
188        let Some(envelope) = self.batch.pop() else {
189            return Ok(None);
190        };
191        let envelope = envelope.try_into()?;
192
193        Ok(Some(envelope))
194    }
195
196    async fn flush(mut self) {
197        if let Err(e) = self.spool_to_disk().await {
198            relay_log::error!(error = &e as &dyn std::error::Error, "flush error");
199        }
200    }
201}
202
#[cfg(test)]
mod tests {
    use chrono::Utc;
    use relay_base_schema::project::ProjectKey;
    use std::time::Duration;

    use super::*;
    use crate::services::buffer::testutils::utils::{mock_envelope, mock_envelopes, setup_db};

    /// Helper function to calculate the total encoded size of a slice of envelopes, measured the
    /// same way as the stack's spool threshold (sum of `DatabaseEnvelope::len`).
    fn calculate_compressed_size(envelopes: &[Box<Envelope>]) -> usize {
        envelopes
            .iter()
            .map(|e| DatabaseEnvelope::try_from(e.as_ref()).unwrap().len())
            .sum()
    }

    // The project key check in `push` is a `debug_assert!`, so the expected panic only happens
    // when debug assertions are enabled; without this gate the test fails in release runs.
    #[cfg(debug_assertions)]
    #[tokio::test]
    #[should_panic]
    async fn test_push_with_mismatching_project_keys() {
        let db = setup_db(false).await;
        let envelope_store = SqliteEnvelopeStore::new(0, db, Duration::from_millis(100));
        let mut stack = SqliteEnvelopeStack::new(
            0,
            envelope_store,
            10,
            ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
            ProjectKey::parse("c25ae32be2584e0bbd7a4cbb95971fe1").unwrap(),
            true,
        );

        let envelope = mock_envelope(Utc::now());
        let _ = stack.push(envelope).await;
    }

    #[tokio::test]
    async fn test_push_when_db_is_not_valid() {
        let db = setup_db(false).await;
        let envelope_store = SqliteEnvelopeStore::new(0, db, Duration::from_millis(100));

        // Create envelopes first so we can calculate actual size
        let envelopes = mock_envelopes(4);
        let threshold_size = calculate_compressed_size(&envelopes) - 1;

        let mut stack = SqliteEnvelopeStack::new(
            0,
            envelope_store,
            threshold_size,
            ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
            ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(),
            true,
        );

        // The threshold is checked before each push and is only exceeded once all 4 envelopes
        // are in memory, so these pushes never touch the (invalid) database.
        for envelope in envelopes.clone() {
            assert!(stack.push(envelope).await.is_ok());
        }

        // We push 1 more envelope which results in spooling, which fails because of a database
        // problem.
        let envelope = mock_envelope(Utc::now());
        assert!(matches!(
            stack.push(envelope).await,
            Err(SqliteEnvelopeStackError::EnvelopeStoreError(_))
        ));

        // The failed spool dropped the 4 buffered envelopes and the 5th one was never added, so
        // the stack is now empty. Pushing a new envelope leaves exactly 1 in memory.
        let envelope = mock_envelope(Utc::now());
        assert!(stack.push(envelope.clone()).await.is_ok());
        assert_eq!(stack.batch.len(), 1);

        // We pop the remaining element, expecting it to be the last added envelope.
        let popped_envelope_1 = stack.pop().await.unwrap().unwrap();
        assert_eq!(
            popped_envelope_1.event_id().unwrap(),
            envelope.event_id().unwrap()
        );
        assert_eq!(stack.batch.len(), 0);
    }

    #[tokio::test]
    async fn test_pop_when_db_is_not_valid() {
        let db = setup_db(false).await;
        let envelope_store = SqliteEnvelopeStore::new(0, db, Duration::from_millis(100));
        let mut stack = SqliteEnvelopeStack::new(
            0,
            envelope_store,
            2,
            ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
            ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(),
            true,
        );

        // We pop with an invalid db.
        assert!(matches!(
            stack.pop().await,
            Err(SqliteEnvelopeStackError::EnvelopeStoreError(_))
        ));
    }

    #[tokio::test]
    async fn test_pop_when_stack_is_empty() {
        let db = setup_db(true).await;
        let envelope_store = SqliteEnvelopeStore::new(0, db, Duration::from_millis(100));
        let mut stack = SqliteEnvelopeStack::new(
            0,
            envelope_store,
            2,
            ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
            ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(),
            true,
        );

        // We pop with no elements.
        assert!(stack.pop().await.unwrap().is_none());
    }

    #[tokio::test]
    async fn test_push_below_threshold_and_pop() {
        let db = setup_db(true).await;
        let envelope_store = SqliteEnvelopeStore::new(0, db, Duration::from_millis(100));
        let mut stack = SqliteEnvelopeStack::new(
            0,
            envelope_store,
            9999,
            ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
            ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(),
            true,
        );

        let envelopes = mock_envelopes(5);

        // We push 5 envelopes.
        for envelope in envelopes.clone() {
            assert!(stack.push(envelope).await.is_ok());
        }
        assert_eq!(stack.batch.len(), 5);

        // We peek the top element.
        let peeked = stack.peek().await.unwrap().unwrap();
        assert_eq!(
            peeked.timestamp_millis(),
            envelopes[4].received_at().timestamp_millis()
        );

        // We pop 5 envelopes.
        for envelope in envelopes.iter().rev() {
            let popped_envelope = stack.pop().await.unwrap().unwrap();
            assert_eq!(
                popped_envelope.event_id().unwrap(),
                envelope.event_id().unwrap()
            );
        }

        assert_eq!(stack.batch.len(), 0);
    }

    #[tokio::test]
    async fn test_push_above_threshold_and_pop() {
        let db = setup_db(true).await;
        let envelope_store = SqliteEnvelopeStore::new(0, db, Duration::from_millis(100));

        // Create envelopes first so we can calculate actual size
        let envelopes = mock_envelopes(7);
        let threshold_size = calculate_compressed_size(&envelopes[..5]) - 1;

        // Create stack with threshold just below the size of first 5 envelopes
        let mut stack = SqliteEnvelopeStack::new(
            0,
            envelope_store,
            threshold_size,
            ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
            ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(),
            true,
        );

        // We push 7 envelopes.
        for envelope in envelopes.clone() {
            assert!(stack.push(envelope).await.is_ok());
        }
        assert_eq!(stack.batch.len(), 2);

        // We peek the top element.
        let peeked = stack.peek().await.unwrap().unwrap();
        assert_eq!(
            peeked.timestamp_millis(),
            envelopes[6].received_at().timestamp_millis()
        );

        // We pop envelopes, and we expect that the last 2 are in memory, since the first 5
        // should have been spooled to disk.
        for envelope in envelopes[5..7].iter().rev() {
            let popped_envelope = stack.pop().await.unwrap().unwrap();
            assert_eq!(
                popped_envelope.event_id().unwrap(),
                envelope.event_id().unwrap()
            );
        }
        assert_eq!(stack.batch.len(), 0);

        // We peek the top element, which since the buffer is empty should result in a disk load.
        let peeked = stack.peek().await.unwrap().unwrap();
        assert_eq!(
            peeked.timestamp_millis(),
            envelopes[4].received_at().timestamp_millis()
        );

        // We insert a new envelope after the disk load performed by `peek()`. Depending on how
        // many envelopes were loaded, this push may first spool them back to disk; either way
        // the new envelope must end up on top of the stack.
        let envelope = mock_envelope(Utc::now());
        assert!(stack.push(envelope.clone()).await.is_ok());

        // We pop and expect the newly inserted element.
        let popped_envelope = stack.pop().await.unwrap().unwrap();
        assert_eq!(
            popped_envelope.event_id().unwrap(),
            envelope.event_id().unwrap()
        );

        // We pop the 5 older envelopes in reverse insertion order, loading them from disk again
        // if the push above spooled them back.
        for envelope in envelopes[0..5].iter().rev() {
            let popped_envelope = stack.pop().await.unwrap().unwrap();
            assert_eq!(
                popped_envelope.event_id().unwrap(),
                envelope.event_id().unwrap()
            );
        }
        assert_eq!(stack.batch.len(), 0);
    }

    #[tokio::test]
    async fn test_drain() {
        let db = setup_db(true).await;
        let envelope_store = SqliteEnvelopeStore::new(0, db, Duration::from_millis(100));

        let envelopes = mock_envelopes(5);
        // Use a threshold comfortably above the size of all envelopes so that pushing never
        // spools, independent of the exact encoded size of the mock envelopes.
        let threshold_size = 2 * calculate_compressed_size(&envelopes);

        let mut stack = SqliteEnvelopeStack::new(
            0,
            envelope_store.clone(),
            threshold_size,
            ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
            ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(),
            true,
        );

        // We push 5 envelopes and check that there is nothing on disk.
        for envelope in envelopes.clone() {
            assert!(stack.push(envelope).await.is_ok());
        }
        assert_eq!(stack.batch.len(), 5);
        assert_eq!(envelope_store.total_count().await.unwrap(), 0);

        // We drain the stack and make sure everything was spooled to disk.
        stack.flush().await;
        assert_eq!(envelope_store.total_count().await.unwrap(), 5);
    }
}
464}