relay_server/services/buffer/envelope_store/
sqlite.rs

1use std::io::{ErrorKind, Read};
2use std::path::Path;
3use std::sync::atomic::{AtomicU64, Ordering};
4use std::sync::Arc;
5use std::time::Duration;
6
7use crate::envelope::EnvelopeError;
8
9use crate::services::buffer::common::ProjectKeyPair;
10use crate::statsd::{RelayGauges, RelayHistograms, RelayTimers};
11use crate::Envelope;
12use bytes::{Buf, Bytes};
13use chrono::{DateTime, Utc};
14use futures::stream::StreamExt;
15use hashbrown::HashSet;
16use relay_base_schema::project::{ParseProjectKeyError, ProjectKey};
17use relay_config::Config;
18use serde::{Deserialize, Serialize};
19use sqlx::migrate::MigrateError;
20use sqlx::query::Query;
21use sqlx::sqlite::{
22    SqliteArguments, SqliteAutoVacuum, SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions,
23    SqliteRow, SqliteSynchronous,
24};
25use sqlx::{Pool, Row, Sqlite};
26use tokio::fs::DirBuilder;
27use tokio::time::sleep;
28
/// The four-byte magic number that every zstd frame starts with (`0xFD2FB528`, little-endian).
///
/// Kept for backward compatibility: it lets the reader check whether an envelope stored on disk
/// is zstd-encoded.
const ZSTD_MAGIC_WORD: &[u8] = &[0x28, 0xB5, 0x2F, 0xFD];
33
/// Struct that contains all the fields of an [`Envelope`] that are mapped to the database columns.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct DatabaseEnvelope {
    /// Time at which the envelope was received, as a Unix timestamp in milliseconds.
    received_at: i64,
    /// Public key of the project the envelope belongs to.
    own_key: ProjectKey,
    /// Sampling key of the envelope; equals `own_key` when the envelope has no sampling key.
    sampling_key: ProjectKey,
    /// Serialized envelope payload; zstd-compressed when built from an [`Envelope`].
    encoded_envelope: Box<[u8]>,
}
42
/// A non-empty group of [`DatabaseEnvelope`]s that is stored as a single database row.
///
/// The batch-level metadata is taken from the last envelope of the batch when it is built via
/// `TryFrom<Vec<DatabaseEnvelope>>`.
#[derive(Clone, Debug)]
pub struct DatabaseBatch {
    /// Receive timestamp (Unix milliseconds) used for the row.
    received_at: i64,
    /// Own project key used for the row.
    own_key: ProjectKey,
    /// Sampling project key used for the row.
    sampling_key: ProjectKey,
    /// The envelopes contained in this batch.
    envelopes: Vec<DatabaseEnvelope>,
}

impl DatabaseBatch {
    /// Returns the number of envelopes in the batch.
    pub fn len(&self) -> usize {
        self.envelopes.len()
    }
}
56
57impl TryFrom<Vec<DatabaseEnvelope>> for DatabaseBatch {
58    type Error = ();
59
60    fn try_from(envelopes: Vec<DatabaseEnvelope>) -> Result<Self, Self::Error> {
61        let Some(last) = envelopes.last() else {
62            return Err(());
63        };
64        Ok(Self {
65            received_at: last.received_at,
66            own_key: last.own_key,
67            sampling_key: last.sampling_key,
68            envelopes,
69        })
70    }
71}
72
73impl From<DatabaseBatch> for Vec<DatabaseEnvelope> {
74    fn from(value: DatabaseBatch) -> Self {
75        value.envelopes
76    }
77}
78
/// An error returned when converting between an [`Envelope`] and a [`DatabaseEnvelope`].
#[derive(Debug, thiserror::Error)]
pub enum InsertEnvelopeError {
    /// The envelope could not be serialized or parsed.
    #[error("envelope conversion error: {0}")]
    Envelope(#[from] EnvelopeError),
    /// The zstd encoder or decoder reported an I/O error.
    #[error("compression error: {0}")]
    Zstd(#[from] std::io::Error),
}
86
87impl DatabaseEnvelope {
88    // Use the lowest level of compression.
89    //
90    // Experiments showed that level 3 is significantly slower than level 1 while offering
91    // no significant size reduction for our use case.
92    const COMPRESSION_LEVEL: i32 = 1;
93
94    pub fn len(&self) -> usize {
95        self.encoded_envelope.len()
96    }
97
98    pub fn received_at(&self) -> DateTime<Utc> {
99        DateTime::from_timestamp_millis(self.received_at).unwrap_or(Utc::now())
100    }
101}
102
103impl TryFrom<DatabaseEnvelope> for Box<Envelope> {
104    type Error = InsertEnvelopeError;
105
106    fn try_from(value: DatabaseEnvelope) -> Result<Self, Self::Error> {
107        let received_at = value.received_at();
108        let DatabaseEnvelope {
109            received_at: _,
110            own_key,
111            sampling_key,
112            mut encoded_envelope,
113        } = value;
114
115        if encoded_envelope.starts_with(ZSTD_MAGIC_WORD) {
116            relay_statsd::metric!(timer(RelayTimers::BufferEnvelopeDecompression), {
117                encoded_envelope = zstd::decode_all(&*encoded_envelope)?.into_boxed_slice();
118            });
119        }
120
121        let mut envelope = Envelope::parse_bytes(Bytes::from(encoded_envelope))?;
122        debug_assert_eq!(envelope.meta().public_key(), own_key);
123        debug_assert!(envelope
124            .sampling_key()
125            .is_none_or(|key| key == sampling_key));
126
127        envelope.set_received_at(received_at);
128
129        Ok(envelope)
130    }
131}
132
133impl<'a> TryFrom<&'a Envelope> for DatabaseEnvelope {
134    type Error = InsertEnvelopeError;
135
136    fn try_from(value: &'a Envelope) -> Result<Self, Self::Error> {
137        let own_key = value.meta().public_key();
138        let sampling_key = value.sampling_key().unwrap_or(own_key);
139
140        let serialized_envelope = value.to_vec()?;
141        relay_statsd::metric!(
142            histogram(RelayHistograms::BufferEnvelopeSize) = serialized_envelope.len() as u64
143        );
144
145        let encoded_envelope =
146            relay_statsd::metric!(timer(RelayTimers::BufferEnvelopeCompression), {
147                zstd::encode_all(serialized_envelope.as_slice(), Self::COMPRESSION_LEVEL)?
148            });
149        relay_statsd::metric!(
150            histogram(RelayHistograms::BufferEnvelopeSizeCompressed) =
151                encoded_envelope.len() as u64
152        );
153
154        Ok(DatabaseEnvelope {
155            received_at: value.received_at().timestamp_millis(),
156            own_key,
157            sampling_key,
158            encoded_envelope: encoded_envelope.into_boxed_slice(),
159        })
160    }
161}
162
/// An error returned when doing an operation on [`SqliteEnvelopeStore`].
#[derive(Debug, thiserror::Error)]
pub enum SqliteEnvelopeStoreError {
    /// Connecting to the SQLite database failed.
    #[error("failed to setup the database: {0}")]
    SqlxSetupFailed(sqlx::Error),

    /// The directory for the spool file could not be created.
    #[error("failed to create the spool file: {0}")]
    FileSetupError(std::io::Error),

    /// Inserting a batch into the database failed.
    #[error("failed to write to disk: {0}")]
    WriteError(sqlx::Error),

    /// Running a read query or reading a column from a row failed.
    #[error("failed to read from disk: {0}")]
    FetchError(sqlx::Error),

    /// A packed batch read from the database was truncated or otherwise unreadable.
    #[error("failed to unpack envelopes: {0}")]
    UnpackError(#[from] std::io::Error),

    /// The configuration does not provide a spool path for the partition.
    #[error("no file path for the spool was provided")]
    NoFilePath,

    /// Running the database migrations failed.
    #[error("failed to migrate the database: {0}")]
    MigrationError(MigrateError),

    /// An envelope could not be extracted from a database row.
    #[error("failed to extract the envelope from the database")]
    EnvelopeExtractionError,

    /// A project key stored in the database could not be parsed.
    #[error("failed to extract a project key from the database")]
    ProjectKeyExtractionError(#[from] ParseProjectKeyError),

    /// The query estimating the database size failed.
    #[error("failed to get database file size: {0}")]
    FileSizeReadFailed(sqlx::Error),
}
196
197#[derive(Debug, Clone)]
198struct DiskUsage {
199    db: Pool<Sqlite>,
200    last_known_usage: Arc<AtomicU64>,
201    refresh_frequency: Duration,
202    partition_tag: String,
203}
204
205impl DiskUsage {
206    /// Creates a new empty [`DiskUsage`].
207    fn new(partition_id: u8, db: Pool<Sqlite>, refresh_frequency: Duration) -> Self {
208        Self {
209            db,
210            last_known_usage: Arc::new(AtomicU64::new(0)),
211            refresh_frequency,
212            partition_tag: partition_id.to_string(),
213        }
214    }
215
216    /// Prepares a [`DiskUsage`] instance with an initial reading of the database usage and fails
217    /// if not reading can be made.
218    pub async fn prepare(
219        partition_id: u8,
220        db: Pool<Sqlite>,
221        refresh_frequency: Duration,
222    ) -> Result<Self, SqliteEnvelopeStoreError> {
223        let disk_usage = Self::new(partition_id, db.clone(), refresh_frequency);
224        let usage = Self::estimate_usage(&disk_usage.partition_tag, &db).await?;
225        disk_usage.last_known_usage.store(usage, Ordering::Relaxed);
226        disk_usage.start_background_refresh();
227
228        Ok(disk_usage)
229    }
230
231    /// Returns the disk usage and asynchronously updates it in case a `refresh_frequency_ms`
232    /// elapsed.
233    fn usage(&self) -> u64 {
234        self.last_known_usage.load(Ordering::Relaxed)
235    }
236
237    /// Starts a background tokio task to update the database usage.
238    fn start_background_refresh(&self) {
239        let db = self.db.clone();
240        // We get a weak reference, to make sure that if `DiskUsage` is dropped, the reference can't
241        // be upgraded, causing the loop in the tokio task to exit.
242        let last_known_usage_weak = Arc::downgrade(&self.last_known_usage);
243        let refresh_frequency = self.refresh_frequency;
244
245        let partition_tag = self.partition_tag.clone();
246        relay_system::spawn!(async move {
247            loop {
248                // When our `Weak` reference can't be upgraded to an `Arc`, it means that the value
249                // is not referenced anymore by self, meaning that `DiskUsage` was dropped.
250                let Some(last_known_usage) = last_known_usage_weak.upgrade() else {
251                    break;
252                };
253
254                let usage = Self::estimate_usage(&partition_tag, &db).await;
255                let Ok(usage) = usage else {
256                    relay_log::error!("failed to update the disk usage asynchronously");
257                    return;
258                };
259
260                let current = last_known_usage.load(Ordering::Relaxed);
261                if last_known_usage
262                    .compare_exchange_weak(current, usage, Ordering::Relaxed, Ordering::Relaxed)
263                    .is_err()
264                {
265                    relay_log::error!("failed to update the disk usage asynchronously");
266                };
267
268                sleep(refresh_frequency).await;
269            }
270        });
271    }
272
273    /// Estimates the disk usage of the SQLite database.
274    async fn estimate_usage(
275        partition_tag: &str,
276        db: &Pool<Sqlite>,
277    ) -> Result<u64, SqliteEnvelopeStoreError> {
278        let usage: i64 = build_estimate_size()
279            .fetch_one(db)
280            .await
281            .and_then(|r| r.try_get(0))
282            .map_err(SqliteEnvelopeStoreError::FileSizeReadFailed)?;
283
284        relay_statsd::metric!(
285            gauge(RelayGauges::BufferDiskUsed) = usage as u64,
286            partition_id = partition_tag
287        );
288
289        Ok(usage as u64)
290    }
291}
292
293/// Struct that offers access to a SQLite-based store of [`Envelope`]s.
294///
295/// The goal of this struct is to hide away all the complexity of dealing with the database for
296/// reading and writing envelopes.
297#[derive(Debug, Clone)]
298pub struct SqliteEnvelopeStore {
299    db: Pool<Sqlite>,
300    disk_usage: DiskUsage,
301    partition_tag: String,
302}
303
304impl SqliteEnvelopeStore {
305    /// Initializes the [`SqliteEnvelopeStore`] with a supplied [`Pool`].
306    pub fn new(partition_id: u8, db: Pool<Sqlite>, refresh_frequency: Duration) -> Self {
307        Self {
308            db: db.clone(),
309            disk_usage: DiskUsage::new(partition_id, db, refresh_frequency),
310            partition_tag: partition_id.to_string(),
311        }
312    }
313
314    /// Prepares the [`SqliteEnvelopeStore`] by running all the necessary migrations and preparing
315    /// the folders where data will be stored.
316    pub async fn prepare(
317        partition_id: u8,
318        config: &Config,
319    ) -> Result<SqliteEnvelopeStore, SqliteEnvelopeStoreError> {
320        // If no path is provided, we can't do disk spooling.
321        let Some(path) = config.spool_envelopes_path(partition_id) else {
322            return Err(SqliteEnvelopeStoreError::NoFilePath);
323        };
324
325        relay_log::info!("buffer file {}", path.to_string_lossy());
326
327        Self::setup(&path).await?;
328
329        let options = SqliteConnectOptions::new()
330            .filename(&path)
331            // The WAL journaling mode uses a write-ahead log instead of a rollback journal to implement transactions.
332            // The WAL journaling mode is persistent; after being set it stays in effect
333            // across multiple database connections and after closing and reopening the database.
334            //
335            // 1. WAL is significantly faster in most scenarios.
336            // 2. WAL provides more concurrency as readers do not block writers and a writer does not block readers. Reading and writing can proceed concurrently.
337            // 3. Disk I/O operations tends to be more sequential using WAL.
338            // 4. WAL uses many fewer fsync() operations and is thus less vulnerable to problems on systems where the fsync() system call is broken.
339            .journal_mode(SqliteJournalMode::Wal)
340            // WAL mode is safe from corruption with synchronous=NORMAL.
341            // When synchronous is NORMAL, the SQLite database engine will still sync at the most critical moments, but less often than in FULL mode.
342            // Which guarantees good balance between safety and speed.
343            .synchronous(SqliteSynchronous::Normal)
344            // The freelist pages are moved to the end of the database file and the database file is truncated to remove the freelist pages at every
345            // transaction commit. Note, however, that auto-vacuum only truncates the freelist pages from the file.
346            // Auto-vacuum does not de-fragment the database nor repack individual database pages the way that the VACUUM command does.
347            //
348            // This will help us to keep the file size under some control.
349            .auto_vacuum(SqliteAutoVacuum::Full)
350            // If shared-cache mode is enabled and a thread establishes multiple
351            // connections to the same database, the connections share a single data and schema cache.
352            // This can significantly reduce the quantity of memory and IO required by the system.
353            .shared_cache(true);
354
355        let db = SqlitePoolOptions::new()
356            .max_connections(1)
357            .min_connections(1)
358            .connect_with(options)
359            .await
360            .map_err(SqliteEnvelopeStoreError::SqlxSetupFailed)?;
361
362        Ok(SqliteEnvelopeStore {
363            db: db.clone(),
364            disk_usage: DiskUsage::prepare(
365                partition_id,
366                db,
367                config.spool_disk_usage_refresh_frequency_ms(),
368            )
369            .await?,
370            partition_tag: partition_id.to_string(),
371        })
372    }
373
374    /// Set up the database and return the current number of envelopes.
375    ///
376    /// The directories and spool file will be created if they don't already
377    /// exist.
378    async fn setup(path: &Path) -> Result<(), SqliteEnvelopeStoreError> {
379        Self::create_spool_directory(path).await?;
380
381        let options = SqliteConnectOptions::new()
382            .filename(path)
383            .journal_mode(SqliteJournalMode::Wal)
384            .create_if_missing(true);
385
386        let db = SqlitePoolOptions::new()
387            .connect_with(options)
388            .await
389            .map_err(SqliteEnvelopeStoreError::SqlxSetupFailed)?;
390
391        sqlx::migrate!("../migrations")
392            .run(&db)
393            .await
394            .map_err(SqliteEnvelopeStoreError::MigrationError)?;
395
396        Ok(())
397    }
398
399    /// Creates the directories for the spool file.
400    async fn create_spool_directory(path: &Path) -> Result<(), SqliteEnvelopeStoreError> {
401        let Some(parent) = path.parent() else {
402            return Ok(());
403        };
404
405        if !parent.as_os_str().is_empty() && !parent.exists() {
406            relay_log::debug!("creating directory for spooling file: {}", parent.display());
407            DirBuilder::new()
408                .recursive(true)
409                .create(&parent)
410                .await
411                .map_err(SqliteEnvelopeStoreError::FileSetupError)?;
412        }
413
414        Ok(())
415    }
416
417    /// Inserts one or more envelopes into the database.
418    pub async fn insert_batch(
419        &mut self,
420        envelopes: DatabaseBatch,
421    ) -> Result<(), SqliteEnvelopeStoreError> {
422        let DatabaseBatch {
423            received_at,
424            own_key,
425            sampling_key,
426            envelopes,
427        } = envelopes;
428
429        let count = envelopes.len();
430        let encoded = match count {
431            0 => {
432                debug_assert!(false, "should not be called with empty batch");
433                return Ok(());
434            }
435            // special-casing single envelopes shaves off a little bit of time for large envelopes,
436            // but it's mainly for backward compatibility.
437            1 => envelopes.into_iter().next().unwrap().encoded_envelope,
438            _more => pack_envelopes(envelopes),
439        };
440
441        let query = sqlx::query("INSERT INTO envelopes (received_at, own_key, sampling_key, count, envelope) VALUES (?, ?, ?, ?, ?);")
442            .bind(received_at)
443            .bind(own_key.as_str())
444            .bind(sampling_key.as_str())
445            .bind(count as u16)
446            .bind(encoded);
447
448        relay_statsd::metric!(
449            timer(RelayTimers::BufferSqlWrite),
450            partition_id = &self.partition_tag,
451            {
452                query
453                    .execute(&self.db)
454                    .await
455                    .map_err(SqliteEnvelopeStoreError::WriteError)?;
456            }
457        );
458        Ok(())
459    }
460
461    /// Deletes and returns at most `limit` [`Envelope`]s from the database.
462    pub async fn delete_batch(
463        &mut self,
464        own_key: ProjectKey,
465        sampling_key: ProjectKey,
466    ) -> Result<Option<DatabaseBatch>, SqliteEnvelopeStoreError> {
467        let mut rows = build_delete_and_fetch_many_envelopes(own_key, sampling_key).fetch(&self.db);
468        let Some(row) = rows.as_mut().next().await else {
469            return Ok(None);
470        };
471        let row = row.map_err(SqliteEnvelopeStoreError::FetchError)?;
472
473        Ok(Some(extract_batch(own_key, sampling_key, row)?))
474    }
475
476    /// Returns a set of project key pairs, representing all the unique combinations of
477    /// `own_key` and `project_key` that are found in the database.
478    pub async fn project_key_pairs(
479        &self,
480    ) -> Result<HashSet<ProjectKeyPair>, SqliteEnvelopeStoreError> {
481        let project_key_pairs = build_get_project_key_pairs()
482            .fetch_all(&self.db)
483            .await
484            .map_err(SqliteEnvelopeStoreError::FetchError)?;
485
486        let project_key_pairs = project_key_pairs
487            .into_iter()
488            // Collect only keys we can extract.
489            .filter_map(|project_key_pair| extract_project_key_pair(project_key_pair).ok())
490            .collect();
491
492        Ok(project_key_pairs)
493    }
494
495    /// Returns an approximate measure of the used size of the database.
496    pub fn usage(&self) -> u64 {
497        self.disk_usage.usage()
498    }
499
500    /// Returns the total count of envelopes stored in the database.
501    pub async fn total_count(&self) -> Result<u64, SqliteEnvelopeStoreError> {
502        let row = build_count_all()
503            .fetch_one(&self.db)
504            .await
505            .map_err(SqliteEnvelopeStoreError::FetchError)?;
506
507        let total_count: i64 = row.get(0);
508        Ok(total_count as u64)
509    }
510}
511
512fn pack_envelopes(envelopes: Vec<DatabaseEnvelope>) -> Box<[u8]> {
513    let mut packed = vec![];
514    for envelope in envelopes {
515        packed.extend_from_slice(&envelope.received_at.to_le_bytes());
516        packed.extend_from_slice(&(envelope.encoded_envelope.len() as u32).to_le_bytes());
517        packed.extend_from_slice(&envelope.encoded_envelope);
518    }
519    packed.into_boxed_slice()
520}
521
522fn unpack_envelopes(
523    own_key: ProjectKey,
524    sampling_key: ProjectKey,
525    data: &[u8],
526) -> Result<Vec<DatabaseEnvelope>, std::io::Error> {
527    let mut envelopes = vec![];
528    let mut buf = data.reader();
529    loop {
530        let mut b = [0u8; 8];
531        match buf.read(&mut b)? {
532            // done:
533            0 => break,
534            // additional trailing bytes:
535            n if n != b.len() => return Err(ErrorKind::UnexpectedEof.into()),
536            _ => {}
537        }
538        let received_at = i64::from_le_bytes(b);
539
540        let mut b = [0u8; 4];
541        buf.read_exact(&mut b)?;
542        let size = u32::from_le_bytes(b);
543
544        let mut b = vec![0u8; size as usize];
545        buf.read_exact(&mut b)?;
546
547        envelopes.push(DatabaseEnvelope {
548            received_at,
549            own_key,
550            sampling_key,
551            encoded_envelope: b.into_boxed_slice(),
552        });
553    }
554    Ok(envelopes)
555}
556
557/// Loads a [`DatabaseEnvelope`] from a database row.
558fn extract_batch(
559    own_key: ProjectKey,
560    sampling_key: ProjectKey,
561    row: SqliteRow,
562) -> Result<DatabaseBatch, SqliteEnvelopeStoreError> {
563    let received_at: i64 = row
564        .try_get("received_at")
565        .map_err(SqliteEnvelopeStoreError::FetchError)?;
566    let data: Box<[u8]> = row
567        .try_get("envelope")
568        .map_err(SqliteEnvelopeStoreError::FetchError)?;
569    let count: u64 = row
570        .try_get("count")
571        .map_err(SqliteEnvelopeStoreError::FetchError)?;
572
573    let envelopes = match count {
574        0 => {
575            debug_assert!(false, "db should not contain empty row");
576            vec![]
577        }
578        1 => vec![DatabaseEnvelope {
579            received_at,
580            own_key,
581            sampling_key,
582            encoded_envelope: data,
583        }],
584        _more => unpack_envelopes(own_key, sampling_key, &data)?,
585    };
586
587    Ok(DatabaseBatch {
588        received_at,
589        own_key,
590        sampling_key,
591        envelopes,
592    })
593}
594
595/// Deserializes a pair of [`ProjectKey`] from the database.
596fn extract_project_key_pair(row: SqliteRow) -> Result<ProjectKeyPair, SqliteEnvelopeStoreError> {
597    let own_key = row
598        .try_get("own_key")
599        .map_err(SqliteEnvelopeStoreError::FetchError)
600        .and_then(|key| {
601            ProjectKey::parse(key).map_err(SqliteEnvelopeStoreError::ProjectKeyExtractionError)
602        });
603    let sampling_key = row
604        .try_get("sampling_key")
605        .map_err(SqliteEnvelopeStoreError::FetchError)
606        .and_then(|key| {
607            ProjectKey::parse(key).map_err(SqliteEnvelopeStoreError::ProjectKeyExtractionError)
608        });
609
610    match (own_key, sampling_key) {
611        (Ok(own_key), Ok(sampling_key)) => Ok(ProjectKeyPair::new(own_key, sampling_key)),
612        // Report the first found error.
613        (Err(err), _) | (_, Err(err)) => {
614            relay_log::error!("failed to extract a queue key from the spool record: {err}");
615
616            Err(err)
617        }
618    }
619}
620
621/// Builds a query that deletes many [`Envelope`] from the database.
622pub fn build_delete_and_fetch_many_envelopes<'a>(
623    own_key: ProjectKey,
624    project_key: ProjectKey,
625) -> Query<'a, Sqlite, SqliteArguments<'a>> {
626    sqlx::query(
627        "DELETE FROM
628            envelopes
629         WHERE id IN (SELECT id FROM envelopes WHERE own_key = ? AND sampling_key = ?
630            ORDER BY received_at DESC LIMIT 1)
631         RETURNING
632            received_at, own_key, sampling_key, envelope, count",
633    )
634    .bind(own_key.to_string())
635    .bind(project_key.to_string())
636}
637
638/// Creates a query which fetches the number of used database pages multiplied by the page size.
639///
640/// This info used to estimate the current allocated database size.
641pub fn build_estimate_size<'a>() -> Query<'a, Sqlite, SqliteArguments<'a>> {
642    sqlx::query(
643        r#"SELECT (page_count - freelist_count) * page_size as size FROM pragma_page_count(), pragma_freelist_count(), pragma_page_size();"#,
644    )
645}
646
647/// Returns the query to select all the unique combinations of own and sampling keys.
648pub fn build_get_project_key_pairs<'a>() -> Query<'a, Sqlite, SqliteArguments<'a>> {
649    sqlx::query("SELECT DISTINCT own_key, sampling_key FROM envelopes;")
650}
651
652/// Returns the query to count the number of envelopes on disk.
653///
654/// Please note that this query is SLOW because SQLite doesn't use any metadata to satisfy it,
655/// meaning that it has to scan through all the rows and count them.
656pub fn build_count_all<'a>() -> Query<'a, Sqlite, SqliteArguments<'a>> {
657    sqlx::query("SELECT SUM(count) FROM envelopes;")
658}
659
#[cfg(test)]
mod tests {
    use std::time::Duration;
    use tokio::time::sleep;

    use relay_base_schema::project::ProjectKey;

    use super::*;
    use crate::services::buffer::testutils::utils::{mock_envelopes, setup_db};

    /// Converts mock envelopes into a [`DatabaseBatch`] that can be inserted into the store.
    fn to_batch<E: AsRef<Envelope>>(envelopes: &[E]) -> DatabaseBatch {
        envelopes
            .iter()
            .map(|envelope| DatabaseEnvelope::try_from(envelope.as_ref()).unwrap())
            .collect::<Vec<_>>()
            .try_into()
            .unwrap()
    }

    #[tokio::test]
    async fn test_insert_and_delete_envelopes() {
        let db = setup_db(true).await;
        let mut envelope_store = SqliteEnvelopeStore::new(0, db, Duration::from_millis(100));

        let own_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
        let sampling_key = ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap();

        // We insert 10 envelopes, split into two batches of 5.
        let batches = [mock_envelopes(5), mock_envelopes(5)];
        for batch in &batches {
            envelope_store.insert_batch(to_batch(batch)).await.unwrap();
        }

        // The first batch we get back is the one inserted last.
        let batch = envelope_store
            .delete_batch(own_key, sampling_key)
            .await
            .unwrap()
            .unwrap();
        assert_eq!(batch.len(), 5);
        for (extracted, inserted) in batch.envelopes.iter().zip(&batches[1]) {
            assert_eq!(
                extracted.received_at().timestamp_millis(),
                inserted.received_at().timestamp_millis()
            );
        }

        // The second batch we get back is the one inserted first.
        let batch = envelope_store
            .delete_batch(own_key, sampling_key)
            .await
            .unwrap()
            .unwrap();
        assert_eq!(batch.len(), 5);
        for (extracted, inserted) in batch.envelopes.iter().zip(&batches[0]) {
            assert_eq!(
                extracted.received_at().timestamp_millis(),
                inserted.received_at().timestamp_millis()
            );
        }

        // Store is empty.
        assert!(envelope_store
            .delete_batch(own_key, sampling_key)
            .await
            .unwrap()
            .is_none());
    }

    #[tokio::test]
    async fn test_insert_and_delete_single() {
        let db = setup_db(true).await;
        let mut envelope_store = SqliteEnvelopeStore::new(0, db, Duration::from_millis(100));

        let own_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
        let sampling_key = ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap();

        // We insert a single envelope, which is stored without packing.
        let inserted = mock_envelopes(1);
        envelope_store
            .insert_batch(to_batch(&inserted))
            .await
            .unwrap();

        // We check that we get back exactly the envelope we inserted.
        let extracted = envelope_store
            .delete_batch(own_key, sampling_key)
            .await
            .unwrap()
            .unwrap();
        assert_eq!(extracted.len(), 1);

        assert_eq!(
            extracted.envelopes[0].received_at().timestamp_millis(),
            inserted[0].received_at().timestamp_millis()
        );

        // Store is empty.
        assert!(envelope_store
            .delete_batch(own_key, sampling_key)
            .await
            .unwrap()
            .is_none());
    }

    #[tokio::test]
    async fn test_insert_and_get_project_keys_pairs() {
        let db = setup_db(true).await;
        let mut envelope_store = SqliteEnvelopeStore::new(0, db, Duration::from_millis(100));

        let own_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
        let sampling_key = ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap();

        // We insert 2 envelopes.
        let envelopes = mock_envelopes(2);
        envelope_store
            .insert_batch(to_batch(&envelopes))
            .await
            .unwrap();

        // We check that we get back only one pair of project keys, since all envelopes have the
        // same pair.
        let project_key_pairs = envelope_store.project_key_pairs().await.unwrap();
        assert_eq!(project_key_pairs.len(), 1);
        assert_eq!(
            project_key_pairs.into_iter().last().unwrap(),
            ProjectKeyPair::new(own_key, sampling_key)
        );
    }

    #[tokio::test]
    async fn test_estimate_disk_usage() {
        let db = setup_db(true).await;
        let mut store = SqliteEnvelopeStore::new(0, db.clone(), Duration::from_millis(1));
        let disk_usage = DiskUsage::prepare(0, db, Duration::from_millis(1))
            .await
            .unwrap();

        // We read the disk usage without envelopes stored.
        let usage_1 = disk_usage.usage();
        assert!(usage_1 > 0);

        // We write 10 envelopes to increase the disk usage.
        let envelopes = mock_envelopes(10);
        store.insert_batch(to_batch(&envelopes)).await.unwrap();

        // We wait for the refresh timeout of the disk usage task.
        sleep(Duration::from_millis(2)).await;

        // The usage must not have shrunk after writing the 10 envelopes.
        let usage_2 = disk_usage.usage();
        assert!(usage_2 >= usage_1);
    }

    #[tokio::test]
    async fn test_total_count() {
        let db = setup_db(true).await;
        let mut store = SqliteEnvelopeStore::new(0, db.clone(), Duration::from_millis(1));

        let envelopes = mock_envelopes(10);
        store.insert_batch(to_batch(&envelopes)).await.unwrap();

        assert_eq!(store.total_count().await.unwrap(), envelopes.len() as u64);
    }
}
860}