relay_server/services/buffer/envelope_store/
sqlite.rs

1use std::io::{ErrorKind, Read};
2use std::path::Path;
3use std::sync::Arc;
4use std::sync::atomic::{AtomicU64, Ordering};
5use std::time::Duration;
6
7use crate::envelope::EnvelopeError;
8
9use crate::Envelope;
10use crate::services::buffer::common::ProjectKeyPair;
11use crate::statsd::{RelayGauges, RelayHistograms, RelayTimers};
12use bytes::{Buf, Bytes};
13use chrono::{DateTime, Utc};
14use futures::stream::StreamExt;
15use hashbrown::HashSet;
16use relay_base_schema::project::{ParseProjectKeyError, ProjectKey};
17use relay_config::Config;
18use serde::{Deserialize, Serialize};
19use sqlx::migrate::MigrateError;
20use sqlx::query::Query;
21use sqlx::sqlite::{
22    SqliteArguments, SqliteAutoVacuum, SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions,
23    SqliteRow, SqliteSynchronous,
24};
25use sqlx::{Pool, Row, Sqlite};
26use tokio::fs::DirBuilder;
27use tokio::time::sleep;
28
/// The zstd frame magic number (`0xFD2FB528`) in its on-disk, little-endian byte order.
///
/// Used for backward compatibility: a stored envelope is only treated as zstd-encoded when its
/// payload starts with these four bytes.
const ZSTD_MAGIC_WORD: &[u8] = &[0x28, 0xB5, 0x2F, 0xFD];
33
/// Struct that contains all the fields of an [`Envelope`] that are mapped to the database columns.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct DatabaseEnvelope {
    /// Receive time in milliseconds since the Unix epoch.
    received_at: i64,
    /// Public key of the envelope's own project (`meta().public_key()`).
    own_key: ProjectKey,
    /// Sampling project key; equals `own_key` when the envelope has no sampling key.
    sampling_key: ProjectKey,
    /// The serialized envelope payload.
    ///
    /// zstd-compressed when created from an [`Envelope`]; payloads that do not start with the
    /// zstd magic bytes are treated as uncompressed when read back.
    encoded_envelope: Box<[u8]>,
}
42
/// A group of [`DatabaseEnvelope`]s that is written to and read from the database as one row.
///
/// `received_at`, `own_key` and `sampling_key` describe the batch as a whole. When built from a
/// `Vec<DatabaseEnvelope>` they are copied from the last envelope; presumably all envelopes
/// share the same key pair, but this is not checked here.
#[derive(Clone, Debug)]
pub struct DatabaseBatch {
    /// Receive time of the batch in milliseconds since the Unix epoch; stored in the row's
    /// `received_at` column, which the delete query orders by.
    received_at: i64,
    /// Own project key stored in the row's `own_key` column.
    own_key: ProjectKey,
    /// Sampling project key stored in the row's `sampling_key` column.
    sampling_key: ProjectKey,
    /// The envelopes contained in this batch.
    envelopes: Vec<DatabaseEnvelope>,
}
50
51impl DatabaseBatch {
52    pub fn len(&self) -> usize {
53        self.envelopes.len()
54    }
55}
56
57impl TryFrom<Vec<DatabaseEnvelope>> for DatabaseBatch {
58    type Error = ();
59
60    fn try_from(envelopes: Vec<DatabaseEnvelope>) -> Result<Self, Self::Error> {
61        let Some(last) = envelopes.last() else {
62            return Err(());
63        };
64        Ok(Self {
65            received_at: last.received_at,
66            own_key: last.own_key,
67            sampling_key: last.sampling_key,
68            envelopes,
69        })
70    }
71}
72
73impl From<DatabaseBatch> for Vec<DatabaseEnvelope> {
74    fn from(value: DatabaseBatch) -> Self {
75        value.envelopes
76    }
77}
78
/// An error returned when converting between an [`Envelope`] and a [`DatabaseEnvelope`].
#[derive(Debug, thiserror::Error)]
pub enum InsertEnvelopeError {
    /// Serializing or parsing the envelope failed.
    #[error("envelope conversion error: {0}")]
    Envelope(#[from] EnvelopeError),
    /// zstd compression or decompression of the envelope payload failed.
    #[error("compression error: {0}")]
    Zstd(#[from] std::io::Error),
}
86
87impl DatabaseEnvelope {
88    // Use the lowest level of compression.
89    //
90    // Experiments showed that level 3 is significantly slower than level 1 while offering
91    // no significant size reduction for our use case.
92    const COMPRESSION_LEVEL: i32 = 1;
93
94    pub fn len(&self) -> usize {
95        self.encoded_envelope.len()
96    }
97
98    pub fn received_at(&self) -> DateTime<Utc> {
99        DateTime::from_timestamp_millis(self.received_at).unwrap_or(Utc::now())
100    }
101}
102
103impl TryFrom<DatabaseEnvelope> for Box<Envelope> {
104    type Error = InsertEnvelopeError;
105
106    fn try_from(value: DatabaseEnvelope) -> Result<Self, Self::Error> {
107        let received_at = value.received_at();
108        let DatabaseEnvelope {
109            received_at: _,
110            own_key,
111            sampling_key,
112            mut encoded_envelope,
113        } = value;
114
115        if encoded_envelope.starts_with(ZSTD_MAGIC_WORD) {
116            relay_statsd::metric!(timer(RelayTimers::BufferEnvelopeDecompression), {
117                encoded_envelope = zstd::decode_all(&*encoded_envelope)?.into_boxed_slice();
118            });
119        }
120
121        let mut envelope = Envelope::parse_bytes(Bytes::from(encoded_envelope))?;
122        debug_assert_eq!(envelope.meta().public_key(), own_key);
123        debug_assert!(
124            envelope
125                .sampling_key()
126                .is_none_or(|key| key == sampling_key)
127        );
128
129        envelope.set_received_at(received_at);
130
131        Ok(envelope)
132    }
133}
134
135impl<'a> TryFrom<&'a Envelope> for DatabaseEnvelope {
136    type Error = InsertEnvelopeError;
137
138    fn try_from(value: &'a Envelope) -> Result<Self, Self::Error> {
139        let own_key = value.meta().public_key();
140        let sampling_key = value.sampling_key().unwrap_or(own_key);
141
142        let serialized_envelope = value.to_vec()?;
143        relay_statsd::metric!(
144            histogram(RelayHistograms::BufferEnvelopeSize) = serialized_envelope.len() as u64
145        );
146
147        let encoded_envelope =
148            relay_statsd::metric!(timer(RelayTimers::BufferEnvelopeCompression), {
149                zstd::encode_all(serialized_envelope.as_slice(), Self::COMPRESSION_LEVEL)?
150            });
151        relay_statsd::metric!(
152            histogram(RelayHistograms::BufferEnvelopeSizeCompressed) =
153                encoded_envelope.len() as u64
154        );
155
156        Ok(DatabaseEnvelope {
157            received_at: value.received_at().timestamp_millis(),
158            own_key,
159            sampling_key,
160            encoded_envelope: encoded_envelope.into_boxed_slice(),
161        })
162    }
163}
164
165/// An error returned when doing an operation on [`SqliteEnvelopeStore`].
166#[derive(Debug, thiserror::Error)]
167pub enum SqliteEnvelopeStoreError {
168    #[error("failed to setup the database: {0}")]
169    SqlxSetupFailed(sqlx::Error),
170
171    #[error("failed to create the spool file: {0}")]
172    FileSetupError(std::io::Error),
173
174    #[error("failed to write to disk: {0}")]
175    WriteError(sqlx::Error),
176
177    #[error("failed to read from disk: {0}")]
178    FetchError(sqlx::Error),
179
180    #[error("failed to unpack envelopes: {0}")]
181    UnpackError(#[from] std::io::Error),
182
183    #[error("no file path for the spool was provided")]
184    NoFilePath,
185
186    #[error("failed to migrate the database: {0}")]
187    MigrationError(MigrateError),
188
189    #[error("failed to extract the envelope from the database")]
190    EnvelopeExtractionError,
191
192    #[error("failed to extract a project key from the database")]
193    ProjectKeyExtractionError(#[from] ParseProjectKeyError),
194
195    #[error("failed to get database file size: {0}")]
196    FileSizeReadFailed(sqlx::Error),
197}
198
/// Tracks an estimate of the disk space used by the SQLite database.
///
/// Clones share the same `last_known_usage` value (it is behind an [`Arc`]).
#[derive(Debug, Clone)]
struct DiskUsage {
    /// Pool used to query the database size.
    db: Pool<Sqlite>,
    /// Most recent usage estimate in bytes; the background refresh task holds a weak reference.
    last_known_usage: Arc<AtomicU64>,
    /// Interval between two background refreshes.
    refresh_frequency: Duration,
    /// Partition id rendered as a string, used as a metric tag.
    partition_tag: String,
}
206
207impl DiskUsage {
208    /// Creates a new empty [`DiskUsage`].
209    fn new(partition_id: u8, db: Pool<Sqlite>, refresh_frequency: Duration) -> Self {
210        Self {
211            db,
212            last_known_usage: Arc::new(AtomicU64::new(0)),
213            refresh_frequency,
214            partition_tag: partition_id.to_string(),
215        }
216    }
217
218    /// Prepares a [`DiskUsage`] instance with an initial reading of the database usage and fails
219    /// if not reading can be made.
220    pub async fn prepare(
221        partition_id: u8,
222        db: Pool<Sqlite>,
223        refresh_frequency: Duration,
224    ) -> Result<Self, SqliteEnvelopeStoreError> {
225        let disk_usage = Self::new(partition_id, db.clone(), refresh_frequency);
226        let usage = Self::estimate_usage(&disk_usage.partition_tag, &db).await?;
227        disk_usage.last_known_usage.store(usage, Ordering::Relaxed);
228        disk_usage.start_background_refresh();
229
230        Ok(disk_usage)
231    }
232
233    /// Returns the disk usage and asynchronously updates it in case a `refresh_frequency_ms`
234    /// elapsed.
235    fn usage(&self) -> u64 {
236        self.last_known_usage.load(Ordering::Relaxed)
237    }
238
239    /// Starts a background tokio task to update the database usage.
240    fn start_background_refresh(&self) {
241        let db = self.db.clone();
242        // We get a weak reference, to make sure that if `DiskUsage` is dropped, the reference can't
243        // be upgraded, causing the loop in the tokio task to exit.
244        let last_known_usage_weak = Arc::downgrade(&self.last_known_usage);
245        let refresh_frequency = self.refresh_frequency;
246
247        let partition_tag = self.partition_tag.clone();
248        relay_system::spawn!(async move {
249            loop {
250                // When our `Weak` reference can't be upgraded to an `Arc`, it means that the value
251                // is not referenced anymore by self, meaning that `DiskUsage` was dropped.
252                let Some(last_known_usage) = last_known_usage_weak.upgrade() else {
253                    break;
254                };
255
256                let usage = Self::estimate_usage(&partition_tag, &db).await;
257                let Ok(usage) = usage else {
258                    relay_log::error!("failed to update the disk usage asynchronously");
259                    return;
260                };
261
262                let current = last_known_usage.load(Ordering::Relaxed);
263                if last_known_usage
264                    .compare_exchange_weak(current, usage, Ordering::Relaxed, Ordering::Relaxed)
265                    .is_err()
266                {
267                    relay_log::error!("failed to update the disk usage asynchronously");
268                };
269
270                sleep(refresh_frequency).await;
271            }
272        });
273    }
274
275    /// Estimates the disk usage of the SQLite database.
276    async fn estimate_usage(
277        partition_tag: &str,
278        db: &Pool<Sqlite>,
279    ) -> Result<u64, SqliteEnvelopeStoreError> {
280        let usage: i64 = build_estimate_size()
281            .fetch_one(db)
282            .await
283            .and_then(|r| r.try_get(0))
284            .map_err(SqliteEnvelopeStoreError::FileSizeReadFailed)?;
285
286        relay_statsd::metric!(
287            gauge(RelayGauges::BufferDiskUsed) = usage as u64,
288            partition_id = partition_tag
289        );
290
291        Ok(usage as u64)
292    }
293}
294
/// Struct that offers access to a SQLite-based store of [`Envelope`]s.
///
/// The goal of this struct is to hide away all the complexity of dealing with the database for
/// reading and writing envelopes.
#[derive(Debug, Clone)]
pub struct SqliteEnvelopeStore {
    /// Connection pool to the spool database.
    db: Pool<Sqlite>,
    /// Cached estimate of the database size, exposed via `usage()`.
    disk_usage: DiskUsage,
    /// Partition id rendered as a string, used as a metric tag.
    partition_tag: String,
}
305
306impl SqliteEnvelopeStore {
307    /// Initializes the [`SqliteEnvelopeStore`] with a supplied [`Pool`].
308    pub fn new(partition_id: u8, db: Pool<Sqlite>, refresh_frequency: Duration) -> Self {
309        Self {
310            db: db.clone(),
311            disk_usage: DiskUsage::new(partition_id, db, refresh_frequency),
312            partition_tag: partition_id.to_string(),
313        }
314    }
315
316    /// Prepares the [`SqliteEnvelopeStore`] by running all the necessary migrations and preparing
317    /// the folders where data will be stored.
318    pub async fn prepare(
319        partition_id: u8,
320        config: &Config,
321    ) -> Result<SqliteEnvelopeStore, SqliteEnvelopeStoreError> {
322        // If no path is provided, we can't do disk spooling.
323        let Some(path) = config.spool_envelopes_path(partition_id) else {
324            return Err(SqliteEnvelopeStoreError::NoFilePath);
325        };
326
327        relay_log::info!("buffer file {}", path.to_string_lossy());
328
329        Self::setup(&path).await?;
330
331        let options = SqliteConnectOptions::new()
332            .filename(&path)
333            // The WAL journaling mode uses a write-ahead log instead of a rollback journal to implement transactions.
334            // The WAL journaling mode is persistent; after being set it stays in effect
335            // across multiple database connections and after closing and reopening the database.
336            //
337            // 1. WAL is significantly faster in most scenarios.
338            // 2. WAL provides more concurrency as readers do not block writers and a writer does not block readers. Reading and writing can proceed concurrently.
339            // 3. Disk I/O operations tends to be more sequential using WAL.
340            // 4. WAL uses many fewer fsync() operations and is thus less vulnerable to problems on systems where the fsync() system call is broken.
341            .journal_mode(SqliteJournalMode::Wal)
342            // WAL mode is safe from corruption with synchronous=NORMAL.
343            // When synchronous is NORMAL, the SQLite database engine will still sync at the most critical moments, but less often than in FULL mode.
344            // Which guarantees good balance between safety and speed.
345            .synchronous(SqliteSynchronous::Normal)
346            // The freelist pages are moved to the end of the database file and the database file is truncated to remove the freelist pages at every
347            // transaction commit. Note, however, that auto-vacuum only truncates the freelist pages from the file.
348            // Auto-vacuum does not de-fragment the database nor repack individual database pages the way that the VACUUM command does.
349            //
350            // This will help us to keep the file size under some control.
351            .auto_vacuum(SqliteAutoVacuum::Full)
352            // If shared-cache mode is enabled and a thread establishes multiple
353            // connections to the same database, the connections share a single data and schema cache.
354            // This can significantly reduce the quantity of memory and IO required by the system.
355            .shared_cache(true);
356
357        let db = SqlitePoolOptions::new()
358            .max_connections(1)
359            .min_connections(1)
360            .connect_with(options)
361            .await
362            .map_err(SqliteEnvelopeStoreError::SqlxSetupFailed)?;
363
364        Ok(SqliteEnvelopeStore {
365            db: db.clone(),
366            disk_usage: DiskUsage::prepare(
367                partition_id,
368                db,
369                config.spool_disk_usage_refresh_frequency_ms(),
370            )
371            .await?,
372            partition_tag: partition_id.to_string(),
373        })
374    }
375
376    /// Set up the database and return the current number of envelopes.
377    ///
378    /// The directories and spool file will be created if they don't already
379    /// exist.
380    async fn setup(path: &Path) -> Result<(), SqliteEnvelopeStoreError> {
381        Self::create_spool_directory(path).await?;
382
383        let options = SqliteConnectOptions::new()
384            .filename(path)
385            .journal_mode(SqliteJournalMode::Wal)
386            .create_if_missing(true);
387
388        let db = SqlitePoolOptions::new()
389            .connect_with(options)
390            .await
391            .map_err(SqliteEnvelopeStoreError::SqlxSetupFailed)?;
392
393        sqlx::migrate!("../migrations")
394            .run(&db)
395            .await
396            .map_err(SqliteEnvelopeStoreError::MigrationError)?;
397
398        Ok(())
399    }
400
401    /// Creates the directories for the spool file.
402    async fn create_spool_directory(path: &Path) -> Result<(), SqliteEnvelopeStoreError> {
403        let Some(parent) = path.parent() else {
404            return Ok(());
405        };
406
407        if !parent.as_os_str().is_empty() && !parent.exists() {
408            relay_log::debug!("creating directory for spooling file: {}", parent.display());
409            DirBuilder::new()
410                .recursive(true)
411                .create(&parent)
412                .await
413                .map_err(SqliteEnvelopeStoreError::FileSetupError)?;
414        }
415
416        Ok(())
417    }
418
419    /// Inserts one or more envelopes into the database.
420    pub async fn insert_batch(
421        &mut self,
422        envelopes: DatabaseBatch,
423    ) -> Result<(), SqliteEnvelopeStoreError> {
424        let DatabaseBatch {
425            received_at,
426            own_key,
427            sampling_key,
428            envelopes,
429        } = envelopes;
430
431        let count = envelopes.len();
432        let encoded = match count {
433            0 => {
434                debug_assert!(false, "should not be called with empty batch");
435                return Ok(());
436            }
437            // special-casing single envelopes shaves off a little bit of time for large envelopes,
438            // but it's mainly for backward compatibility.
439            1 => envelopes.into_iter().next().unwrap().encoded_envelope,
440            _more => pack_envelopes(envelopes),
441        };
442
443        let query = sqlx::query("INSERT INTO envelopes (received_at, own_key, sampling_key, count, envelope) VALUES (?, ?, ?, ?, ?);")
444            .bind(received_at)
445            .bind(own_key.as_str())
446            .bind(sampling_key.as_str())
447            .bind(count as u16)
448            .bind(encoded);
449
450        relay_statsd::metric!(
451            timer(RelayTimers::BufferSqlWrite),
452            partition_id = &self.partition_tag,
453            {
454                query
455                    .execute(&self.db)
456                    .await
457                    .map_err(SqliteEnvelopeStoreError::WriteError)?;
458            }
459        );
460        Ok(())
461    }
462
463    /// Deletes and returns at most `limit` [`Envelope`]s from the database.
464    pub async fn delete_batch(
465        &mut self,
466        own_key: ProjectKey,
467        sampling_key: ProjectKey,
468    ) -> Result<Option<DatabaseBatch>, SqliteEnvelopeStoreError> {
469        let mut rows = build_delete_and_fetch_many_envelopes(own_key, sampling_key).fetch(&self.db);
470        let Some(row) = rows.as_mut().next().await else {
471            return Ok(None);
472        };
473        let row = row.map_err(SqliteEnvelopeStoreError::FetchError)?;
474
475        Ok(Some(extract_batch(own_key, sampling_key, row)?))
476    }
477
478    /// Returns a set of project key pairs, representing all the unique combinations of
479    /// `own_key` and `project_key` that are found in the database.
480    pub async fn project_key_pairs(
481        &self,
482    ) -> Result<HashSet<ProjectKeyPair>, SqliteEnvelopeStoreError> {
483        let project_key_pairs = build_get_project_key_pairs()
484            .fetch_all(&self.db)
485            .await
486            .map_err(SqliteEnvelopeStoreError::FetchError)?;
487
488        let project_key_pairs = project_key_pairs
489            .into_iter()
490            // Collect only keys we can extract.
491            .filter_map(|project_key_pair| extract_project_key_pair(project_key_pair).ok())
492            .collect();
493
494        Ok(project_key_pairs)
495    }
496
497    /// Returns an approximate measure of the used size of the database.
498    pub fn usage(&self) -> u64 {
499        self.disk_usage.usage()
500    }
501
502    /// Returns the total count of envelopes stored in the database.
503    pub async fn total_count(&self) -> Result<u64, SqliteEnvelopeStoreError> {
504        let row = build_count_all()
505            .fetch_one(&self.db)
506            .await
507            .map_err(SqliteEnvelopeStoreError::FetchError)?;
508
509        let total_count: i64 = row.get(0);
510        Ok(total_count as u64)
511    }
512}
513
514fn pack_envelopes(envelopes: Vec<DatabaseEnvelope>) -> Box<[u8]> {
515    let mut packed = vec![];
516    for envelope in envelopes {
517        packed.extend_from_slice(&envelope.received_at.to_le_bytes());
518        packed.extend_from_slice(&(envelope.encoded_envelope.len() as u32).to_le_bytes());
519        packed.extend_from_slice(&envelope.encoded_envelope);
520    }
521    packed.into_boxed_slice()
522}
523
524fn unpack_envelopes(
525    own_key: ProjectKey,
526    sampling_key: ProjectKey,
527    data: &[u8],
528) -> Result<Vec<DatabaseEnvelope>, std::io::Error> {
529    let mut envelopes = vec![];
530    let mut buf = data.reader();
531    loop {
532        let mut b = [0u8; 8];
533        match buf.read(&mut b)? {
534            // done:
535            0 => break,
536            // additional trailing bytes:
537            n if n != b.len() => return Err(ErrorKind::UnexpectedEof.into()),
538            _ => {}
539        }
540        let received_at = i64::from_le_bytes(b);
541
542        let mut b = [0u8; 4];
543        buf.read_exact(&mut b)?;
544        let size = u32::from_le_bytes(b);
545
546        let mut b = vec![0u8; size as usize];
547        buf.read_exact(&mut b)?;
548
549        envelopes.push(DatabaseEnvelope {
550            received_at,
551            own_key,
552            sampling_key,
553            encoded_envelope: b.into_boxed_slice(),
554        });
555    }
556    Ok(envelopes)
557}
558
559/// Loads a [`DatabaseEnvelope`] from a database row.
560fn extract_batch(
561    own_key: ProjectKey,
562    sampling_key: ProjectKey,
563    row: SqliteRow,
564) -> Result<DatabaseBatch, SqliteEnvelopeStoreError> {
565    let received_at: i64 = row
566        .try_get("received_at")
567        .map_err(SqliteEnvelopeStoreError::FetchError)?;
568    let data: Box<[u8]> = row
569        .try_get("envelope")
570        .map_err(SqliteEnvelopeStoreError::FetchError)?;
571    let count: u64 = row
572        .try_get("count")
573        .map_err(SqliteEnvelopeStoreError::FetchError)?;
574
575    let envelopes = match count {
576        0 => {
577            debug_assert!(false, "db should not contain empty row");
578            vec![]
579        }
580        1 => vec![DatabaseEnvelope {
581            received_at,
582            own_key,
583            sampling_key,
584            encoded_envelope: data,
585        }],
586        _more => unpack_envelopes(own_key, sampling_key, &data)?,
587    };
588
589    Ok(DatabaseBatch {
590        received_at,
591        own_key,
592        sampling_key,
593        envelopes,
594    })
595}
596
597/// Deserializes a pair of [`ProjectKey`] from the database.
598fn extract_project_key_pair(row: SqliteRow) -> Result<ProjectKeyPair, SqliteEnvelopeStoreError> {
599    let own_key = row
600        .try_get("own_key")
601        .map_err(SqliteEnvelopeStoreError::FetchError)
602        .and_then(|key| {
603            ProjectKey::parse(key).map_err(SqliteEnvelopeStoreError::ProjectKeyExtractionError)
604        });
605    let sampling_key = row
606        .try_get("sampling_key")
607        .map_err(SqliteEnvelopeStoreError::FetchError)
608        .and_then(|key| {
609            ProjectKey::parse(key).map_err(SqliteEnvelopeStoreError::ProjectKeyExtractionError)
610        });
611
612    match (own_key, sampling_key) {
613        (Ok(own_key), Ok(sampling_key)) => Ok(ProjectKeyPair::new(own_key, sampling_key)),
614        // Report the first found error.
615        (Err(err), _) | (_, Err(err)) => {
616            relay_log::error!("failed to extract a queue key from the spool record: {err}");
617
618            Err(err)
619        }
620    }
621}
622
623/// Builds a query that deletes many [`Envelope`] from the database.
624pub fn build_delete_and_fetch_many_envelopes<'a>(
625    own_key: ProjectKey,
626    project_key: ProjectKey,
627) -> Query<'a, Sqlite, SqliteArguments<'a>> {
628    sqlx::query(
629        "DELETE FROM
630            envelopes
631         WHERE id IN (SELECT id FROM envelopes WHERE own_key = ? AND sampling_key = ?
632            ORDER BY received_at DESC LIMIT 1)
633         RETURNING
634            received_at, own_key, sampling_key, envelope, count",
635    )
636    .bind(own_key.to_string())
637    .bind(project_key.to_string())
638}
639
640/// Creates a query which fetches the number of used database pages multiplied by the page size.
641///
642/// This info used to estimate the current allocated database size.
643pub fn build_estimate_size<'a>() -> Query<'a, Sqlite, SqliteArguments<'a>> {
644    sqlx::query(
645        r#"SELECT (page_count - freelist_count) * page_size as size FROM pragma_page_count(), pragma_freelist_count(), pragma_page_size();"#,
646    )
647}
648
649/// Returns the query to select all the unique combinations of own and sampling keys.
650pub fn build_get_project_key_pairs<'a>() -> Query<'a, Sqlite, SqliteArguments<'a>> {
651    sqlx::query("SELECT DISTINCT own_key, sampling_key FROM envelopes;")
652}
653
654/// Returns the query to count the number of envelopes on disk.
655///
656/// Please note that this query is SLOW because SQLite doesn't use any metadata to satisfy it,
657/// meaning that it has to scan through all the rows and count them.
658pub fn build_count_all<'a>() -> Query<'a, Sqlite, SqliteArguments<'a>> {
659    sqlx::query("SELECT SUM(count) FROM envelopes;")
660}
661
#[cfg(test)]
mod tests {
    // Tests for the SQLite envelope store, run against a database from `setup_db`.
    use std::time::Duration;
    use tokio::time::sleep;

    use relay_base_schema::project::ProjectKey;

    use super::*;
    use crate::services::buffer::testutils::utils::{mock_envelopes, setup_db};

    #[tokio::test]
    async fn test_insert_and_delete_envelopes() {
        let db = setup_db(true).await;
        let mut envelope_store = SqliteEnvelopeStore::new(0, db, Duration::from_millis(100));

        // Presumably the key pair used by `mock_envelopes` — verify against the test utils.
        let own_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
        let sampling_key = ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap();

        // We insert 10 envelopes as two batches (two rows) of 5.
        let batches = [mock_envelopes(5), mock_envelopes(5)];
        for batch in &batches {
            assert!(
                envelope_store
                    .insert_batch(
                        batch
                            .iter()
                            .map(|e| DatabaseEnvelope::try_from(e.as_ref()).unwrap())
                            .collect::<Vec<_>>()
                            .try_into()
                            .unwrap()
                    )
                    .await
                    .is_ok()
            );
        }

        // The first deletion returns one whole batch: the one inserted last (rows are ordered by
        // `received_at` descending; assumes `mock_envelopes` produces increasing timestamps).
        let batch = envelope_store
            .delete_batch(own_key, sampling_key)
            .await
            .unwrap()
            .unwrap();
        assert_eq!(batch.len(), 5);
        for (i, extracted_envelope) in batch.envelopes.iter().enumerate() {
            assert_eq!(
                extracted_envelope.received_at().timestamp_millis(),
                (&batches[1])[i].received_at().timestamp_millis()
            );
        }

        // The second deletion returns the remaining, older batch.
        let batch = envelope_store
            .delete_batch(own_key, sampling_key)
            .await
            .unwrap()
            .unwrap();
        assert_eq!(batch.len(), 5);
        for (i, extracted_envelope) in batch.envelopes.iter().enumerate() {
            assert_eq!(
                extracted_envelope.received_at().timestamp_millis(),
                (&batches[0])[i].received_at().timestamp_millis()
            );
        }

        // Store is empty.
        assert!(
            envelope_store
                .delete_batch(own_key, sampling_key)
                .await
                .unwrap()
                .is_none()
        );
    }

    #[tokio::test]
    async fn test_insert_and_delete_single() {
        let db = setup_db(true).await;
        let mut envelope_store = SqliteEnvelopeStore::new(0, db, Duration::from_millis(100));

        let own_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
        let sampling_key = ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap();

        // We insert a single envelope, which exercises the unpacked single-envelope row format.
        let inserted = mock_envelopes(1);

        assert!(
            envelope_store
                .insert_batch(
                    inserted
                        .iter()
                        .map(|e| DatabaseEnvelope::try_from(e.as_ref()).unwrap())
                        .collect::<Vec<_>>()
                        .try_into()
                        .unwrap()
                )
                .await
                .is_ok()
        );

        // Deleting returns exactly the envelope that was inserted.
        let extracted = envelope_store
            .delete_batch(own_key, sampling_key)
            .await
            .unwrap()
            .unwrap();
        assert_eq!(extracted.len(), 1);

        assert_eq!(
            extracted.envelopes[0].received_at().timestamp_millis(),
            inserted[0].received_at().timestamp_millis()
        );

        // Store is empty.
        assert!(
            envelope_store
                .delete_batch(own_key, sampling_key)
                .await
                .unwrap()
                .is_none()
        );
    }

    #[tokio::test]
    async fn test_insert_and_get_project_keys_pairs() {
        let db = setup_db(true).await;
        let mut envelope_store = SqliteEnvelopeStore::new(0, db, Duration::from_millis(100));

        let own_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
        let sampling_key = ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap();

        // We insert 2 envelopes as one batch.
        let envelopes = mock_envelopes(2);
        envelope_store
            .insert_batch(
                envelopes
                    .into_iter()
                    .map(|e| DatabaseEnvelope::try_from(e.as_ref()).unwrap())
                    .collect::<Vec<_>>()
                    .try_into()
                    .unwrap(),
            )
            .await
            .unwrap();

        // We check that we get back only one pair of project keys, since all envelopes have the
        // same pair.
        let project_key_pairs = envelope_store.project_key_pairs().await.unwrap();
        assert_eq!(project_key_pairs.len(), 1);
        assert_eq!(
            project_key_pairs.into_iter().last().unwrap(),
            ProjectKeyPair::new(own_key, sampling_key)
        );
    }

    #[tokio::test]
    async fn test_estimate_disk_usage() {
        let db = setup_db(true).await;
        // `store` and `disk_usage` share the same database.
        let mut store = SqliteEnvelopeStore::new(0, db.clone(), Duration::from_millis(1));
        let disk_usage = DiskUsage::prepare(0, db, Duration::from_millis(1))
            .await
            .unwrap();

        // We read the disk usage without envelopes stored.
        let usage_1 = disk_usage.usage();
        assert!(usage_1 > 0);

        // We write 10 envelopes to increase the disk usage.
        let envelopes = mock_envelopes(10);
        store
            .insert_batch(
                envelopes
                    .into_iter()
                    .map(|e| DatabaseEnvelope::try_from(e.as_ref()).unwrap())
                    .collect::<Vec<_>>()
                    .try_into()
                    .unwrap(),
            )
            .await
            .unwrap();

        // We wait for the refresh timeout of the disk usage task.
        sleep(Duration::from_millis(2)).await;

        // The usage must not have decreased; only `>=` is asserted, since the refresh may not
        // have run yet or the insert may fit into already allocated pages.
        let usage_2 = disk_usage.usage();
        assert!(usage_2 >= usage_1);
    }

    #[tokio::test]
    async fn test_total_count() {
        let db = setup_db(true).await;
        let mut store = SqliteEnvelopeStore::new(0, db.clone(), Duration::from_millis(1));

        // A single packed batch of 10 envelopes must count as 10, not as one row.
        let envelopes = mock_envelopes(10);
        store
            .insert_batch(
                envelopes
                    .iter()
                    .map(|e| DatabaseEnvelope::try_from(e.as_ref()).unwrap())
                    .collect::<Vec<_>>()
                    .try_into()
                    .unwrap(),
            )
            .await
            .unwrap();

        assert_eq!(store.total_count().await.unwrap(), envelopes.len() as u64);
    }
}