relay_server/services/buffer/envelope_store/sqlite.rs

1use std::io::{ErrorKind, Read};
2use std::path::Path;
3use std::sync::Arc;
4use std::sync::atomic::{AtomicU64, Ordering};
5use std::time::Duration;
6
7use crate::envelope::EnvelopeError;
8
9use crate::Envelope;
10use crate::services::buffer::common::ProjectKeyPair;
11use crate::statsd::{RelayDistributions, RelayGauges, RelayTimers};
12use bytes::{Buf, Bytes};
13use chrono::{DateTime, Utc};
14use futures::stream::StreamExt;
15use hashbrown::HashSet;
16use relay_base_schema::project::{ParseProjectKeyError, ProjectKey};
17use relay_config::Config;
18use serde::{Deserialize, Serialize};
19use sqlx::migrate::MigrateError;
20use sqlx::query::Query;
21use sqlx::sqlite::{
22    SqliteArguments, SqliteAutoVacuum, SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions,
23    SqliteRow, SqliteSynchronous,
24};
25use sqlx::{Pool, Row, Sqlite};
26use tokio::fs::DirBuilder;
27use tokio::time::sleep;
28
/// Fixed first 4 bytes for zstd compressed envelopes.
///
/// Used for backward compatibility to check whether an envelope on disk is zstd-encoded:
/// payloads that do not start with these bytes (`0x28 0xB5 0x2F 0xFD`) are treated as
/// uncompressed legacy data and parsed directly.
const ZSTD_MAGIC_WORD: &[u8] = &[40, 181, 47, 253];
33
/// Struct that contains all the fields of an [`Envelope`] that are mapped to the database columns.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct DatabaseEnvelope {
    // Reception timestamp in milliseconds since the Unix epoch.
    received_at: i64,
    // Public key of the project that sent the envelope.
    own_key: ProjectKey,
    // Key used for sampling decisions; falls back to `own_key` when the envelope has none.
    sampling_key: ProjectKey,
    // Serialized envelope payload; zstd-compressed on write. Legacy rows may be
    // uncompressed, which is detected via `ZSTD_MAGIC_WORD` on read.
    encoded_envelope: Box<[u8]>,
}
42
/// A batch of [`DatabaseEnvelope`]s sharing the same project key pair, stored as a single
/// database row.
///
/// The top-level `received_at`, `own_key` and `sampling_key` are taken from the last
/// envelope of the batch (see the `TryFrom<Vec<DatabaseEnvelope>>` impl).
#[derive(Clone, Debug)]
pub struct DatabaseBatch {
    received_at: i64,
    own_key: ProjectKey,
    sampling_key: ProjectKey,
    envelopes: Vec<DatabaseEnvelope>,
}
50
51impl DatabaseBatch {
52    pub fn len(&self) -> usize {
53        self.envelopes.len()
54    }
55}
56
57impl TryFrom<Vec<DatabaseEnvelope>> for DatabaseBatch {
58    type Error = ();
59
60    fn try_from(envelopes: Vec<DatabaseEnvelope>) -> Result<Self, Self::Error> {
61        let Some(last) = envelopes.last() else {
62            return Err(());
63        };
64        Ok(Self {
65            received_at: last.received_at,
66            own_key: last.own_key,
67            sampling_key: last.sampling_key,
68            envelopes,
69        })
70    }
71}
72
73impl From<DatabaseBatch> for Vec<DatabaseEnvelope> {
74    fn from(value: DatabaseBatch) -> Self {
75        value.envelopes
76    }
77}
78
/// An error returned when converting between [`Envelope`] and [`DatabaseEnvelope`].
#[derive(Debug, thiserror::Error)]
pub enum InsertEnvelopeError {
    /// The envelope could not be serialized or parsed.
    #[error("envelope conversion error: {0}")]
    Envelope(#[from] EnvelopeError),
    /// zstd compression or decompression of the payload failed.
    #[error("compression error: {0}")]
    Zstd(#[from] std::io::Error),
}
86
87impl DatabaseEnvelope {
88    // Use the lowest level of compression.
89    //
90    // Experiments showed that level 3 is significantly slower than level 1 while offering
91    // no significant size reduction for our use case.
92    const COMPRESSION_LEVEL: i32 = 1;
93
94    pub fn len(&self) -> usize {
95        self.encoded_envelope.len()
96    }
97
98    pub fn received_at(&self) -> DateTime<Utc> {
99        DateTime::from_timestamp_millis(self.received_at).unwrap_or(Utc::now())
100    }
101}
102
impl TryFrom<DatabaseEnvelope> for Box<Envelope> {
    type Error = InsertEnvelopeError;

    /// Decodes a stored envelope back into an [`Envelope`].
    ///
    /// Decompresses the payload when it carries the zstd magic bytes (legacy rows may be
    /// uncompressed), parses it, and restores the original reception timestamp.
    fn try_from(value: DatabaseEnvelope) -> Result<Self, Self::Error> {
        let received_at = value.received_at();
        let DatabaseEnvelope {
            received_at: _,
            own_key,
            sampling_key,
            mut encoded_envelope,
        } = value;

        // Only decompress payloads that were written compressed; older spool files may
        // contain raw envelope bytes.
        if encoded_envelope.starts_with(ZSTD_MAGIC_WORD) {
            relay_statsd::metric!(timer(RelayTimers::BufferEnvelopeDecompression), {
                encoded_envelope = zstd::decode_all(&*encoded_envelope)?.into_boxed_slice();
            });
        }

        let mut envelope = Envelope::parse_bytes(Bytes::from(encoded_envelope))?;
        // Sanity-check (debug builds only) that the stored keys match what the parsed
        // envelope reports.
        debug_assert_eq!(envelope.meta().public_key(), own_key);
        debug_assert!(
            envelope
                .sampling_key()
                .is_none_or(|key| key == sampling_key)
        );

        // The timestamp is not part of the serialized payload, so restore it explicitly.
        envelope.set_received_at(received_at);

        Ok(envelope)
    }
}
134
impl<'a> TryFrom<&'a Envelope> for DatabaseEnvelope {
    type Error = InsertEnvelopeError;

    /// Serializes and zstd-compresses an [`Envelope`] for storage, recording size metrics
    /// for both the raw and compressed payload.
    fn try_from(value: &'a Envelope) -> Result<Self, Self::Error> {
        let own_key = value.meta().public_key();
        // Envelopes without an explicit sampling key fall back to their own key.
        let sampling_key = value.sampling_key().unwrap_or(own_key);

        let serialized_envelope = value.to_vec()?;
        relay_statsd::metric!(
            distribution(RelayDistributions::BufferEnvelopeSize) = serialized_envelope.len() as u64
        );

        let encoded_envelope =
            relay_statsd::metric!(timer(RelayTimers::BufferEnvelopeCompression), {
                zstd::encode_all(serialized_envelope.as_slice(), Self::COMPRESSION_LEVEL)?
            });
        relay_statsd::metric!(
            distribution(RelayDistributions::BufferEnvelopeSizeCompressed) =
                encoded_envelope.len() as u64
        );

        Ok(DatabaseEnvelope {
            received_at: value.received_at().timestamp_millis(),
            own_key,
            sampling_key,
            encoded_envelope: encoded_envelope.into_boxed_slice(),
        })
    }
}
164
/// An error returned when doing an operation on [`SqliteEnvelopeStore`].
#[derive(Debug, thiserror::Error)]
pub enum SqliteEnvelopeStoreError {
    /// Connecting to or configuring the SQLite database failed.
    #[error("failed to setup the database: {0}")]
    SqlxSetupFailed(sqlx::Error),

    /// Creating the spool file or its parent directories failed.
    #[error("failed to create the spool file: {0}")]
    FileSetupError(std::io::Error),

    /// An INSERT into the envelopes table failed.
    #[error("failed to write to disk: {0}")]
    WriteError(sqlx::Error),

    /// A SELECT/DELETE query against the envelopes table failed.
    #[error("failed to read from disk: {0}")]
    FetchError(sqlx::Error),

    /// A packed multi-envelope row could not be decoded.
    #[error("failed to unpack envelopes: {0}")]
    UnpackError(#[from] std::io::Error),

    /// The configuration did not provide a spool path for this partition.
    #[error("no file path for the spool was provided")]
    NoFilePath,

    /// Running the database migrations failed.
    #[error("failed to migrate the database: {0}")]
    MigrationError(MigrateError),

    /// A stored envelope payload could not be converted back into an [`Envelope`].
    #[error("failed to extract the envelope from the database")]
    EnvelopeExtractionError,

    /// A stored key column did not parse as a valid [`ProjectKey`].
    #[error("failed to extract a project key from the database")]
    ProjectKeyExtractionError(#[from] ParseProjectKeyError),

    /// The database size estimation query failed.
    #[error("failed to get database file size: {0}")]
    FileSizeReadFailed(sqlx::Error),
}
198
/// Tracks an estimate of the SQLite database's disk usage, refreshed by a background task.
#[derive(Debug, Clone)]
struct DiskUsage {
    db: Pool<Sqlite>,
    // Latest usage estimate in bytes, shared with the background refresh task.
    last_known_usage: Arc<AtomicU64>,
    // Interval between background refreshes of the estimate.
    refresh_frequency: Duration,
    // Partition id rendered once for use as a metric tag.
    partition_tag: String,
}
206
207impl DiskUsage {
208    /// Creates a new empty [`DiskUsage`].
209    fn new(partition_id: u8, db: Pool<Sqlite>, refresh_frequency: Duration) -> Self {
210        Self {
211            db,
212            last_known_usage: Arc::new(AtomicU64::new(0)),
213            refresh_frequency,
214            partition_tag: partition_id.to_string(),
215        }
216    }
217
218    /// Prepares a [`DiskUsage`] instance with an initial reading of the database usage and fails
219    /// if not reading can be made.
220    pub async fn prepare(
221        partition_id: u8,
222        db: Pool<Sqlite>,
223        refresh_frequency: Duration,
224    ) -> Result<Self, SqliteEnvelopeStoreError> {
225        let disk_usage = Self::new(partition_id, db.clone(), refresh_frequency);
226        let usage = Self::estimate_usage(&disk_usage.partition_tag, &db).await?;
227        disk_usage.last_known_usage.store(usage, Ordering::Relaxed);
228        disk_usage.start_background_refresh();
229
230        Ok(disk_usage)
231    }
232
233    /// Returns the disk usage and asynchronously updates it in case a `refresh_frequency_ms`
234    /// elapsed.
235    fn usage(&self) -> u64 {
236        self.last_known_usage.load(Ordering::Relaxed)
237    }
238
239    /// Starts a background tokio task to update the database usage.
240    fn start_background_refresh(&self) {
241        let db = self.db.clone();
242        // We get a weak reference, to make sure that if `DiskUsage` is dropped, the reference can't
243        // be upgraded, causing the loop in the tokio task to exit.
244        let last_known_usage_weak = Arc::downgrade(&self.last_known_usage);
245        let refresh_frequency = self.refresh_frequency;
246
247        let partition_tag = self.partition_tag.clone();
248        relay_system::spawn!(async move {
249            // When our `Weak` reference can't be upgraded to an `Arc`, it means that the value
250            // is not referenced anymore by self, meaning that `DiskUsage` was dropped.
251            while let Some(last_known_usage) = last_known_usage_weak.upgrade() {
252                let usage = Self::estimate_usage(&partition_tag, &db).await;
253                let Ok(usage) = usage else {
254                    relay_log::error!("failed to update the disk usage asynchronously");
255                    return;
256                };
257
258                let current = last_known_usage.load(Ordering::Relaxed);
259                if last_known_usage
260                    .compare_exchange_weak(current, usage, Ordering::Relaxed, Ordering::Relaxed)
261                    .is_err()
262                {
263                    relay_log::error!("failed to update the disk usage asynchronously");
264                };
265
266                sleep(refresh_frequency).await;
267            }
268        });
269    }
270
271    /// Estimates the disk usage of the SQLite database.
272    async fn estimate_usage(
273        partition_tag: &str,
274        db: &Pool<Sqlite>,
275    ) -> Result<u64, SqliteEnvelopeStoreError> {
276        let usage: i64 = build_estimate_size()
277            .fetch_one(db)
278            .await
279            .and_then(|r| r.try_get(0))
280            .map_err(SqliteEnvelopeStoreError::FileSizeReadFailed)?;
281
282        relay_statsd::metric!(
283            gauge(RelayGauges::BufferDiskUsed) = usage as u64,
284            partition_id = partition_tag
285        );
286
287        Ok(usage as u64)
288    }
289}
290
/// Struct that offers access to a SQLite-based store of [`Envelope`]s.
///
/// The goal of this struct is to hide away all the complexity of dealing with the database for
/// reading and writing envelopes.
#[derive(Debug, Clone)]
pub struct SqliteEnvelopeStore {
    db: Pool<Sqlite>,
    // Periodically refreshed estimate of the database size on disk.
    disk_usage: DiskUsage,
    // Partition id rendered once for use as a metric tag.
    partition_tag: String,
}
301
impl SqliteEnvelopeStore {
    /// Initializes the [`SqliteEnvelopeStore`] with a supplied [`Pool`].
    pub fn new(partition_id: u8, db: Pool<Sqlite>, refresh_frequency: Duration) -> Self {
        Self {
            db: db.clone(),
            disk_usage: DiskUsage::new(partition_id, db, refresh_frequency),
            partition_tag: partition_id.to_string(),
        }
    }

    /// Prepares the [`SqliteEnvelopeStore`] by running all the necessary migrations and preparing
    /// the folders where data will be stored.
    pub async fn prepare(
        partition_id: u8,
        config: &Config,
    ) -> Result<SqliteEnvelopeStore, SqliteEnvelopeStoreError> {
        // If no path is provided, we can't do disk spooling.
        let Some(path) = config.spool_envelopes_path(partition_id) else {
            return Err(SqliteEnvelopeStoreError::NoFilePath);
        };

        relay_log::info!("buffer file {}", path.to_string_lossy());

        Self::setup(&path).await?;

        let options = SqliteConnectOptions::new()
            .filename(&path)
            // The WAL journaling mode uses a write-ahead log instead of a rollback journal to implement transactions.
            // The WAL journaling mode is persistent; after being set it stays in effect
            // across multiple database connections and after closing and reopening the database.
            //
            // 1. WAL is significantly faster in most scenarios.
            // 2. WAL provides more concurrency as readers do not block writers and a writer does not block readers. Reading and writing can proceed concurrently.
            // 3. Disk I/O operations tends to be more sequential using WAL.
            // 4. WAL uses many fewer fsync() operations and is thus less vulnerable to problems on systems where the fsync() system call is broken.
            .journal_mode(SqliteJournalMode::Wal)
            // WAL mode is safe from corruption with synchronous=NORMAL.
            // When synchronous is NORMAL, the SQLite database engine will still sync at the most critical moments, but less often than in FULL mode.
            // Which guarantees good balance between safety and speed.
            .synchronous(SqliteSynchronous::Normal)
            // The freelist pages are moved to the end of the database file and the database file is truncated to remove the freelist pages at every
            // transaction commit. Note, however, that auto-vacuum only truncates the freelist pages from the file.
            // Auto-vacuum does not de-fragment the database nor repack individual database pages the way that the VACUUM command does.
            //
            // This will help us to keep the file size under some control.
            .auto_vacuum(SqliteAutoVacuum::Full)
            // If shared-cache mode is enabled and a thread establishes multiple
            // connections to the same database, the connections share a single data and schema cache.
            // This can significantly reduce the quantity of memory and IO required by the system.
            .shared_cache(true);

        // A single pooled connection serializes all reads and writes to the spool file.
        let db = SqlitePoolOptions::new()
            .max_connections(1)
            .min_connections(1)
            .connect_with(options)
            .await
            .map_err(SqliteEnvelopeStoreError::SqlxSetupFailed)?;

        Ok(SqliteEnvelopeStore {
            db: db.clone(),
            disk_usage: DiskUsage::prepare(
                partition_id,
                db,
                config.spool_disk_usage_refresh_frequency_ms(),
            )
            .await?,
            partition_tag: partition_id.to_string(),
        })
    }

    /// Sets up the database by creating the spool file and running all pending migrations.
    ///
    /// The directories and spool file will be created if they don't already
    /// exist.
    async fn setup(path: &Path) -> Result<(), SqliteEnvelopeStoreError> {
        Self::create_spool_directory(path).await?;

        let options = SqliteConnectOptions::new()
            .filename(path)
            .journal_mode(SqliteJournalMode::Wal)
            .create_if_missing(true);

        let db = SqlitePoolOptions::new()
            .connect_with(options)
            .await
            .map_err(SqliteEnvelopeStoreError::SqlxSetupFailed)?;

        sqlx::migrate!("../migrations")
            .run(&db)
            .await
            .map_err(SqliteEnvelopeStoreError::MigrationError)?;

        Ok(())
    }

    /// Creates the directories for the spool file.
    async fn create_spool_directory(path: &Path) -> Result<(), SqliteEnvelopeStoreError> {
        // A path without a parent component needs no directory creation.
        let Some(parent) = path.parent() else {
            return Ok(());
        };

        if !parent.as_os_str().is_empty() && !parent.exists() {
            relay_log::debug!("creating directory for spooling file: {}", parent.display());
            DirBuilder::new()
                .recursive(true)
                .create(&parent)
                .await
                .map_err(SqliteEnvelopeStoreError::FileSetupError)?;
        }

        Ok(())
    }

    /// Inserts one or more envelopes into the database as a single row.
    ///
    /// Batches with more than one envelope are packed into a single binary blob
    /// (see `pack_envelopes`).
    pub async fn insert_batch(
        &mut self,
        envelopes: DatabaseBatch,
    ) -> Result<(), SqliteEnvelopeStoreError> {
        let DatabaseBatch {
            received_at,
            own_key,
            sampling_key,
            envelopes,
        } = envelopes;

        let count = envelopes.len();
        let encoded = match count {
            0 => {
                debug_assert!(false, "should not be called with empty batch");
                return Ok(());
            }
            // special-casing single envelopes shaves off a little bit of time for large envelopes,
            // but it's mainly for backward compatibility.
            1 => envelopes.into_iter().next().unwrap().encoded_envelope,
            _more => pack_envelopes(envelopes),
        };

        let query = sqlx::query("INSERT INTO envelopes (received_at, own_key, sampling_key, count, envelope) VALUES (?, ?, ?, ?, ?);")
            .bind(received_at)
            .bind(own_key.as_str())
            .bind(sampling_key.as_str())
            // NOTE(review): the count is narrowed to `u16`; assumes batches never exceed
            // 65535 envelopes — confirm the upstream batching bound.
            .bind(count as u16)
            .bind(encoded);

        relay_statsd::metric!(
            timer(RelayTimers::BufferSqlWrite),
            partition_id = &self.partition_tag,
            {
                query
                    .execute(&self.db)
                    .await
                    .map_err(SqliteEnvelopeStoreError::WriteError)?;
            }
        );
        Ok(())
    }

    /// Deletes and returns the most recently received batch of envelopes for the given
    /// project key pair, or `None` if no matching row exists.
    ///
    /// Only a single database row is removed per call; a row may contain a packed batch
    /// of multiple envelopes.
    pub async fn delete_batch(
        &mut self,
        own_key: ProjectKey,
        sampling_key: ProjectKey,
    ) -> Result<Option<DatabaseBatch>, SqliteEnvelopeStoreError> {
        let mut rows = build_delete_and_fetch_many_envelopes(own_key, sampling_key).fetch(&self.db);
        let Some(row) = rows.as_mut().next().await else {
            return Ok(None);
        };
        let row = row.map_err(SqliteEnvelopeStoreError::FetchError)?;

        Ok(Some(extract_batch(own_key, sampling_key, row)?))
    }

    /// Returns a set of project key pairs, representing all the unique combinations of
    /// `own_key` and `sampling_key` that are found in the database.
    pub async fn project_key_pairs(
        &self,
    ) -> Result<HashSet<ProjectKeyPair>, SqliteEnvelopeStoreError> {
        let project_key_pairs = build_get_project_key_pairs()
            .fetch_all(&self.db)
            .await
            .map_err(SqliteEnvelopeStoreError::FetchError)?;

        let project_key_pairs = project_key_pairs
            .into_iter()
            // Collect only keys we can extract.
            .filter_map(|project_key_pair| extract_project_key_pair(project_key_pair).ok())
            .collect();

        Ok(project_key_pairs)
    }

    /// Returns an approximate measure of the used size of the database.
    pub fn usage(&self) -> u64 {
        self.disk_usage.usage()
    }

    /// Returns the total count of envelopes stored in the database.
    ///
    /// Note that this issues a full-table scan (see [`build_count_all`]) and can be slow.
    pub async fn total_count(&self) -> Result<u64, SqliteEnvelopeStoreError> {
        let row = build_count_all()
            .fetch_one(&self.db)
            .await
            .map_err(SqliteEnvelopeStoreError::FetchError)?;

        let total_count: i64 = row.get(0);
        Ok(total_count as u64)
    }
}
509
510fn pack_envelopes(envelopes: Vec<DatabaseEnvelope>) -> Box<[u8]> {
511    let mut packed = vec![];
512    for envelope in envelopes {
513        packed.extend_from_slice(&envelope.received_at.to_le_bytes());
514        packed.extend_from_slice(&(envelope.encoded_envelope.len() as u32).to_le_bytes());
515        packed.extend_from_slice(&envelope.encoded_envelope);
516    }
517    packed.into_boxed_slice()
518}
519
/// Unpacks a buffer produced by `pack_envelopes` back into individual envelopes.
///
/// Reads records of (8-byte little-endian timestamp, 4-byte little-endian length,
/// payload) until the buffer is exhausted. Returns an error if the buffer ends in the
/// middle of a record.
fn unpack_envelopes(
    own_key: ProjectKey,
    sampling_key: ProjectKey,
    data: &[u8],
) -> Result<Vec<DatabaseEnvelope>, std::io::Error> {
    let mut envelopes = vec![];
    let mut buf = data.reader();
    loop {
        let mut b = [0u8; 8];
        match buf.read(&mut b)? {
            // No bytes left: all records have been consumed.
            0 => break,
            // 1..7 bytes left: trailing bytes that don't form a complete timestamp header.
            n if n != b.len() => return Err(ErrorKind::UnexpectedEof.into()),
            _ => {}
        }
        let received_at = i64::from_le_bytes(b);

        let mut b = [0u8; 4];
        buf.read_exact(&mut b)?;
        let size = u32::from_le_bytes(b);

        // `read_exact` fails with `UnexpectedEof` if the payload is truncated.
        let mut b = vec![0u8; size as usize];
        buf.read_exact(&mut b)?;

        envelopes.push(DatabaseEnvelope {
            received_at,
            own_key,
            sampling_key,
            encoded_envelope: b.into_boxed_slice(),
        });
    }
    Ok(envelopes)
}
554
/// Loads a [`DatabaseBatch`] from a database row.
///
/// Rows with `count == 1` store the envelope payload directly; rows with a larger count
/// store a packed blob that is split via `unpack_envelopes`.
fn extract_batch(
    own_key: ProjectKey,
    sampling_key: ProjectKey,
    row: SqliteRow,
) -> Result<DatabaseBatch, SqliteEnvelopeStoreError> {
    let received_at: i64 = row
        .try_get("received_at")
        .map_err(SqliteEnvelopeStoreError::FetchError)?;
    let data: Box<[u8]> = row
        .try_get("envelope")
        .map_err(SqliteEnvelopeStoreError::FetchError)?;
    let count: u64 = row
        .try_get("count")
        .map_err(SqliteEnvelopeStoreError::FetchError)?;

    let envelopes = match count {
        // A zero count should never be written (see `insert_batch`); tolerate it in
        // release builds by returning an empty batch.
        0 => {
            debug_assert!(false, "db should not contain empty row");
            vec![]
        }
        1 => vec![DatabaseEnvelope {
            received_at,
            own_key,
            sampling_key,
            encoded_envelope: data,
        }],
        _more => unpack_envelopes(own_key, sampling_key, &data)?,
    };

    Ok(DatabaseBatch {
        received_at,
        own_key,
        sampling_key,
        envelopes,
    })
}
592
593/// Deserializes a pair of [`ProjectKey`] from the database.
594fn extract_project_key_pair(row: SqliteRow) -> Result<ProjectKeyPair, SqliteEnvelopeStoreError> {
595    let own_key = row
596        .try_get("own_key")
597        .map_err(SqliteEnvelopeStoreError::FetchError)
598        .and_then(|key| {
599            ProjectKey::parse(key).map_err(SqliteEnvelopeStoreError::ProjectKeyExtractionError)
600        });
601    let sampling_key = row
602        .try_get("sampling_key")
603        .map_err(SqliteEnvelopeStoreError::FetchError)
604        .and_then(|key| {
605            ProjectKey::parse(key).map_err(SqliteEnvelopeStoreError::ProjectKeyExtractionError)
606        });
607
608    match (own_key, sampling_key) {
609        (Ok(own_key), Ok(sampling_key)) => Ok(ProjectKeyPair::new(own_key, sampling_key)),
610        // Report the first found error.
611        (Err(err), _) | (_, Err(err)) => {
612            relay_log::error!("failed to extract a queue key from the spool record: {err}");
613
614            Err(err)
615        }
616    }
617}
618
619/// Builds a query that deletes many [`Envelope`] from the database.
620pub fn build_delete_and_fetch_many_envelopes<'a>(
621    own_key: ProjectKey,
622    project_key: ProjectKey,
623) -> Query<'a, Sqlite, SqliteArguments<'a>> {
624    sqlx::query(
625        "DELETE FROM
626            envelopes
627         WHERE id IN (SELECT id FROM envelopes WHERE own_key = ? AND sampling_key = ?
628            ORDER BY received_at DESC LIMIT 1)
629         RETURNING
630            received_at, own_key, sampling_key, envelope, count",
631    )
632    .bind(own_key.to_string())
633    .bind(project_key.to_string())
634}
635
/// Creates a query which fetches the number of used database pages multiplied by the page size.
///
/// This info used to estimate the current allocated database size. Freelist pages are
/// subtracted since they are allocated but unused.
pub fn build_estimate_size<'a>() -> Query<'a, Sqlite, SqliteArguments<'a>> {
    sqlx::query(
        r#"SELECT (page_count - freelist_count) * page_size as size FROM pragma_page_count(), pragma_freelist_count(), pragma_page_size();"#,
    )
}
644
/// Returns the query to select all the unique combinations of own and sampling keys.
///
/// Used to discover which project key pairs currently have spooled envelopes.
pub fn build_get_project_key_pairs<'a>() -> Query<'a, Sqlite, SqliteArguments<'a>> {
    sqlx::query("SELECT DISTINCT own_key, sampling_key FROM envelopes;")
}
649
650/// Returns the query to count the number of envelopes on disk.
651///
652/// Please note that this query is SLOW because SQLite doesn't use any metadata to satisfy it,
653/// meaning that it has to scan through all the rows and count them.
654pub fn build_count_all<'a>() -> Query<'a, Sqlite, SqliteArguments<'a>> {
655    sqlx::query("SELECT SUM(count) FROM envelopes;")
656}
657
#[cfg(test)]
mod tests {
    use std::time::Duration;
    use tokio::time::sleep;

    use relay_base_schema::project::ProjectKey;

    use super::*;
    use crate::services::buffer::testutils::utils::{mock_envelopes, setup_db};

    #[tokio::test]
    async fn test_insert_and_delete_envelopes() {
        let db = setup_db(true).await;
        let mut envelope_store = SqliteEnvelopeStore::new(0, db, Duration::from_millis(100));

        let own_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
        let sampling_key = ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap();

        // We insert 10 envelopes in two batches of 5.
        let batches = [mock_envelopes(5), mock_envelopes(5)];
        for batch in &batches {
            assert!(
                envelope_store
                    .insert_batch(
                        batch
                            .iter()
                            .map(|e| DatabaseEnvelope::try_from(e.as_ref()).unwrap())
                            .collect::<Vec<_>>()
                            .try_into()
                            .unwrap()
                    )
                    .await
                    .is_ok()
            );
        }

        // We check that the first deletion returns the newest batch of 5 (rows are popped
        // in descending `received_at` order).
        let batch = envelope_store
            .delete_batch(own_key, sampling_key)
            .await
            .unwrap()
            .unwrap();
        assert_eq!(batch.len(), 5);
        for (i, extracted_envelope) in batch.envelopes.iter().enumerate() {
            assert_eq!(
                extracted_envelope.received_at().timestamp_millis(),
                (&batches[1])[i].received_at().timestamp_millis()
            );
        }

        // We check that the second deletion returns the oldest batch of 5.
        let batch = envelope_store
            .delete_batch(own_key, sampling_key)
            .await
            .unwrap()
            .unwrap();
        assert_eq!(batch.len(), 5);
        for (i, extracted_envelope) in batch.envelopes.iter().enumerate() {
            assert_eq!(
                extracted_envelope.received_at().timestamp_millis(),
                (&batches[0])[i].received_at().timestamp_millis()
            );
        }

        // Store is empty.
        assert!(
            envelope_store
                .delete_batch(own_key, sampling_key)
                .await
                .unwrap()
                .is_none()
        );
    }

    #[tokio::test]
    async fn test_insert_and_delete_single() {
        let db = setup_db(true).await;
        let mut envelope_store = SqliteEnvelopeStore::new(0, db, Duration::from_millis(100));

        let own_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
        let sampling_key = ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap();

        // We insert a single envelope, exercising the unpacked single-envelope row format.
        let inserted = mock_envelopes(1);

        assert!(
            envelope_store
                .insert_batch(
                    inserted
                        .iter()
                        .map(|e| DatabaseEnvelope::try_from(e.as_ref()).unwrap())
                        .collect::<Vec<_>>()
                        .try_into()
                        .unwrap()
                )
                .await
                .is_ok()
        );

        // We expect to get back exactly the envelope we inserted.
        let extracted = envelope_store
            .delete_batch(own_key, sampling_key)
            .await
            .unwrap()
            .unwrap();
        assert_eq!(extracted.len(), 1);

        assert_eq!(
            extracted.envelopes[0].received_at().timestamp_millis(),
            inserted[0].received_at().timestamp_millis()
        );

        // Store is empty.
        assert!(
            envelope_store
                .delete_batch(own_key, sampling_key)
                .await
                .unwrap()
                .is_none()
        );
    }

    #[tokio::test]
    async fn test_insert_and_get_project_keys_pairs() {
        let db = setup_db(true).await;
        let mut envelope_store = SqliteEnvelopeStore::new(0, db, Duration::from_millis(100));

        let own_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
        let sampling_key = ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap();

        // We insert 2 envelopes sharing the same project key pair.
        let envelopes = mock_envelopes(2);
        envelope_store
            .insert_batch(
                envelopes
                    .into_iter()
                    .map(|e| DatabaseEnvelope::try_from(e.as_ref()).unwrap())
                    .collect::<Vec<_>>()
                    .try_into()
                    .unwrap(),
            )
            .await
            .unwrap();

        // We check that we get back only one pair of project keys, since all envelopes have the
        // same pair.
        let project_key_pairs = envelope_store.project_key_pairs().await.unwrap();
        assert_eq!(project_key_pairs.len(), 1);
        assert_eq!(
            project_key_pairs.into_iter().last().unwrap(),
            ProjectKeyPair::new(own_key, sampling_key)
        );
    }

    #[tokio::test]
    async fn test_estimate_disk_usage() {
        let db = setup_db(true).await;
        let mut store = SqliteEnvelopeStore::new(0, db.clone(), Duration::from_millis(1));
        let disk_usage = DiskUsage::prepare(0, db, Duration::from_millis(1))
            .await
            .unwrap();

        // We read the disk usage without envelopes stored.
        let usage_1 = disk_usage.usage();
        assert!(usage_1 > 0);

        // We write 10 envelopes to increase the disk usage.
        let envelopes = mock_envelopes(10);
        store
            .insert_batch(
                envelopes
                    .into_iter()
                    .map(|e| DatabaseEnvelope::try_from(e.as_ref()).unwrap())
                    .collect::<Vec<_>>()
                    .try_into()
                    .unwrap(),
            )
            .await
            .unwrap();

        // We wait for the refresh timeout of the disk usage task.
        sleep(Duration::from_millis(2)).await;

        // We now expect to read more disk usage because of the 10 elements.
        let usage_2 = disk_usage.usage();
        assert!(usage_2 >= usage_1);
    }

    #[tokio::test]
    async fn test_total_count() {
        let db = setup_db(true).await;
        let mut store = SqliteEnvelopeStore::new(0, db.clone(), Duration::from_millis(1));

        let envelopes = mock_envelopes(10);
        store
            .insert_batch(
                envelopes
                    .iter()
                    .map(|e| DatabaseEnvelope::try_from(e.as_ref()).unwrap())
                    .collect::<Vec<_>>()
                    .try_into()
                    .unwrap(),
            )
            .await
            .unwrap();

        assert_eq!(store.total_count().await.unwrap(), envelopes.len() as u64);
    }
}