relay_server/services/buffer/envelope_store/
sqlite.rs

1use std::io::{ErrorKind, Read};
2use std::path::Path;
3use std::sync::Arc;
4use std::sync::atomic::{AtomicU64, Ordering};
5use std::time::Duration;
6
7use crate::envelope::EnvelopeError;
8
9use crate::Envelope;
10use crate::services::buffer::common::ProjectKeyPair;
11use crate::statsd::{RelayDistributions, RelayGauges, RelayTimers};
12use bytes::{Buf, Bytes};
13use chrono::{DateTime, Utc};
14use futures::stream::StreamExt;
15use hashbrown::HashSet;
16use relay_base_schema::project::{ParseProjectKeyError, ProjectKey};
17use relay_config::Config;
18use serde::{Deserialize, Serialize};
19use sqlx::migrate::MigrateError;
20use sqlx::query::Query;
21use sqlx::sqlite::{
22    SqliteArguments, SqliteAutoVacuum, SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions,
23    SqliteRow, SqliteSynchronous,
24};
25use sqlx::{Pool, Row, Sqlite};
26use tokio::fs::DirBuilder;
27use tokio::time::sleep;
28
/// The four bytes every zstd-compressed envelope starts with.
///
/// Kept for backward compatibility: the prefix is checked when reading an envelope from disk to
/// decide whether the stored payload is zstd-encoded.
const ZSTD_MAGIC_WORD: &[u8] = &[0x28, 0xB5, 0x2F, 0xFD];
33
34/// Struct that contains all the fields of an [`Envelope`] that are mapped to the database columns.
35#[derive(Clone, Debug, Serialize, Deserialize)]
36pub struct DatabaseEnvelope {
37    received_at: i64,
38    own_key: ProjectKey,
39    sampling_key: ProjectKey,
40    encoded_envelope: Box<[u8]>,
41}
42
43#[derive(Clone, Debug)]
44pub struct DatabaseBatch {
45    received_at: i64,
46    own_key: ProjectKey,
47    sampling_key: ProjectKey,
48    envelopes: Vec<DatabaseEnvelope>,
49}
50
51impl DatabaseBatch {
52    pub fn len(&self) -> usize {
53        self.envelopes.len()
54    }
55}
56
57impl TryFrom<Vec<DatabaseEnvelope>> for DatabaseBatch {
58    type Error = ();
59
60    fn try_from(envelopes: Vec<DatabaseEnvelope>) -> Result<Self, Self::Error> {
61        let Some(last) = envelopes.last() else {
62            return Err(());
63        };
64        Ok(Self {
65            received_at: last.received_at,
66            own_key: last.own_key,
67            sampling_key: last.sampling_key,
68            envelopes,
69        })
70    }
71}
72
73impl From<DatabaseBatch> for Vec<DatabaseEnvelope> {
74    fn from(value: DatabaseBatch) -> Self {
75        value.envelopes
76    }
77}
78
79#[derive(Debug, thiserror::Error)]
80pub enum InsertEnvelopeError {
81    #[error("envelope conversion error: {0}")]
82    Envelope(#[from] EnvelopeError),
83    #[error("compression error: {0}")]
84    Zstd(#[from] std::io::Error),
85}
86
87impl DatabaseEnvelope {
88    // Use the lowest level of compression.
89    //
90    // Experiments showed that level 3 is significantly slower than level 1 while offering
91    // no significant size reduction for our use case.
92    const COMPRESSION_LEVEL: i32 = 1;
93
94    pub fn len(&self) -> usize {
95        self.encoded_envelope.len()
96    }
97
98    pub fn received_at(&self) -> DateTime<Utc> {
99        DateTime::from_timestamp_millis(self.received_at).unwrap_or(Utc::now())
100    }
101}
102
103impl TryFrom<DatabaseEnvelope> for Box<Envelope> {
104    type Error = InsertEnvelopeError;
105
106    fn try_from(value: DatabaseEnvelope) -> Result<Self, Self::Error> {
107        let received_at = value.received_at();
108        let DatabaseEnvelope {
109            received_at: _,
110            own_key,
111            sampling_key,
112            mut encoded_envelope,
113        } = value;
114
115        if encoded_envelope.starts_with(ZSTD_MAGIC_WORD) {
116            relay_statsd::metric!(
117                timer(RelayTimers::BufferEnvelopeDecompression, sample = 0.01),
118                {
119                    encoded_envelope = zstd::decode_all(&*encoded_envelope)?.into_boxed_slice();
120                }
121            );
122        }
123
124        let mut envelope = Envelope::parse_bytes(Bytes::from(encoded_envelope))?;
125        debug_assert_eq!(envelope.meta().public_key(), own_key);
126        debug_assert!(
127            envelope
128                .sampling_key()
129                .is_none_or(|key| key == sampling_key)
130        );
131
132        envelope.set_received_at(received_at);
133
134        Ok(envelope)
135    }
136}
137
138impl<'a> TryFrom<&'a Envelope> for DatabaseEnvelope {
139    type Error = InsertEnvelopeError;
140
141    fn try_from(value: &'a Envelope) -> Result<Self, Self::Error> {
142        let own_key = value.meta().public_key();
143        let sampling_key = value.sampling_key().unwrap_or(own_key);
144
145        let serialized_envelope = value.to_vec()?;
146        relay_statsd::metric!(
147            distribution(RelayDistributions::BufferEnvelopeSize, sample = 0.01) =
148                serialized_envelope.len() as u64
149        );
150
151        let encoded_envelope = relay_statsd::metric!(
152            timer(RelayTimers::BufferEnvelopeCompression, sample = 0.01),
153            { zstd::encode_all(serialized_envelope.as_slice(), Self::COMPRESSION_LEVEL)? }
154        );
155        relay_statsd::metric!(
156            distribution(
157                RelayDistributions::BufferEnvelopeSizeCompressed,
158                sample = 0.01
159            ) = encoded_envelope.len() as u64
160        );
161
162        Ok(DatabaseEnvelope {
163            received_at: value.received_at().timestamp_millis(),
164            own_key,
165            sampling_key,
166            encoded_envelope: encoded_envelope.into_boxed_slice(),
167        })
168    }
169}
170
171/// An error returned when doing an operation on [`SqliteEnvelopeStore`].
172#[derive(Debug, thiserror::Error)]
173pub enum SqliteEnvelopeStoreError {
174    #[error("failed to setup the database: {0}")]
175    SqlxSetupFailed(sqlx::Error),
176
177    #[error("failed to create the spool file: {0}")]
178    FileSetupError(std::io::Error),
179
180    #[error("failed to write to disk: {0}")]
181    WriteError(sqlx::Error),
182
183    #[error("failed to read from disk: {0}")]
184    FetchError(sqlx::Error),
185
186    #[error("failed to unpack envelopes: {0}")]
187    UnpackError(#[from] std::io::Error),
188
189    #[error("no file path for the spool was provided")]
190    NoFilePath,
191
192    #[error("failed to migrate the database: {0}")]
193    MigrationError(MigrateError),
194
195    #[error("failed to extract the envelope from the database")]
196    EnvelopeExtractionError,
197
198    #[error("failed to extract a project key from the database")]
199    ProjectKeyExtractionError(#[from] ParseProjectKeyError),
200
201    #[error("failed to get database file size: {0}")]
202    FileSizeReadFailed(sqlx::Error),
203}
204
205#[derive(Debug, Clone)]
206struct DiskUsage {
207    db: Pool<Sqlite>,
208    last_known_usage: Arc<AtomicU64>,
209    refresh_frequency: Duration,
210    partition_tag: String,
211}
212
213impl DiskUsage {
214    /// Creates a new empty [`DiskUsage`].
215    fn new(partition_id: u8, db: Pool<Sqlite>, refresh_frequency: Duration) -> Self {
216        Self {
217            db,
218            last_known_usage: Arc::new(AtomicU64::new(0)),
219            refresh_frequency,
220            partition_tag: partition_id.to_string(),
221        }
222    }
223
224    /// Prepares a [`DiskUsage`] instance with an initial reading of the database usage and fails
225    /// if not reading can be made.
226    pub async fn prepare(
227        partition_id: u8,
228        db: Pool<Sqlite>,
229        refresh_frequency: Duration,
230    ) -> Result<Self, SqliteEnvelopeStoreError> {
231        let disk_usage = Self::new(partition_id, db.clone(), refresh_frequency);
232        let usage = Self::estimate_usage(&disk_usage.partition_tag, &db).await?;
233        disk_usage.last_known_usage.store(usage, Ordering::Relaxed);
234        disk_usage.start_background_refresh();
235
236        Ok(disk_usage)
237    }
238
239    /// Returns the disk usage and asynchronously updates it in case a `refresh_frequency_ms`
240    /// elapsed.
241    fn usage(&self) -> u64 {
242        self.last_known_usage.load(Ordering::Relaxed)
243    }
244
245    /// Starts a background tokio task to update the database usage.
246    fn start_background_refresh(&self) {
247        let db = self.db.clone();
248        // We get a weak reference, to make sure that if `DiskUsage` is dropped, the reference can't
249        // be upgraded, causing the loop in the tokio task to exit.
250        let last_known_usage_weak = Arc::downgrade(&self.last_known_usage);
251        let refresh_frequency = self.refresh_frequency;
252
253        let partition_tag = self.partition_tag.clone();
254        relay_system::spawn!(async move {
255            loop {
256                // When our `Weak` reference can't be upgraded to an `Arc`, it means that the value
257                // is not referenced anymore by self, meaning that `DiskUsage` was dropped.
258                let Some(last_known_usage) = last_known_usage_weak.upgrade() else {
259                    break;
260                };
261
262                let usage = Self::estimate_usage(&partition_tag, &db).await;
263                let Ok(usage) = usage else {
264                    relay_log::error!("failed to update the disk usage asynchronously");
265                    return;
266                };
267
268                let current = last_known_usage.load(Ordering::Relaxed);
269                if last_known_usage
270                    .compare_exchange_weak(current, usage, Ordering::Relaxed, Ordering::Relaxed)
271                    .is_err()
272                {
273                    relay_log::error!("failed to update the disk usage asynchronously");
274                };
275
276                sleep(refresh_frequency).await;
277            }
278        });
279    }
280
281    /// Estimates the disk usage of the SQLite database.
282    async fn estimate_usage(
283        partition_tag: &str,
284        db: &Pool<Sqlite>,
285    ) -> Result<u64, SqliteEnvelopeStoreError> {
286        let usage: i64 = build_estimate_size()
287            .fetch_one(db)
288            .await
289            .and_then(|r| r.try_get(0))
290            .map_err(SqliteEnvelopeStoreError::FileSizeReadFailed)?;
291
292        relay_statsd::metric!(
293            gauge(RelayGauges::BufferDiskUsed) = usage as u64,
294            partition_id = partition_tag
295        );
296
297        Ok(usage as u64)
298    }
299}
300
301/// Struct that offers access to a SQLite-based store of [`Envelope`]s.
302///
303/// The goal of this struct is to hide away all the complexity of dealing with the database for
304/// reading and writing envelopes.
305#[derive(Debug, Clone)]
306pub struct SqliteEnvelopeStore {
307    db: Pool<Sqlite>,
308    disk_usage: DiskUsage,
309    partition_tag: String,
310}
311
312impl SqliteEnvelopeStore {
313    /// Initializes the [`SqliteEnvelopeStore`] with a supplied [`Pool`].
314    pub fn new(partition_id: u8, db: Pool<Sqlite>, refresh_frequency: Duration) -> Self {
315        Self {
316            db: db.clone(),
317            disk_usage: DiskUsage::new(partition_id, db, refresh_frequency),
318            partition_tag: partition_id.to_string(),
319        }
320    }
321
322    /// Prepares the [`SqliteEnvelopeStore`] by running all the necessary migrations and preparing
323    /// the folders where data will be stored.
324    pub async fn prepare(
325        partition_id: u8,
326        config: &Config,
327    ) -> Result<SqliteEnvelopeStore, SqliteEnvelopeStoreError> {
328        // If no path is provided, we can't do disk spooling.
329        let Some(path) = config.spool_envelopes_path(partition_id) else {
330            return Err(SqliteEnvelopeStoreError::NoFilePath);
331        };
332
333        relay_log::info!("buffer file {}", path.to_string_lossy());
334
335        Self::setup(&path).await?;
336
337        let options = SqliteConnectOptions::new()
338            .filename(&path)
339            // The WAL journaling mode uses a write-ahead log instead of a rollback journal to implement transactions.
340            // The WAL journaling mode is persistent; after being set it stays in effect
341            // across multiple database connections and after closing and reopening the database.
342            //
343            // 1. WAL is significantly faster in most scenarios.
344            // 2. WAL provides more concurrency as readers do not block writers and a writer does not block readers. Reading and writing can proceed concurrently.
345            // 3. Disk I/O operations tends to be more sequential using WAL.
346            // 4. WAL uses many fewer fsync() operations and is thus less vulnerable to problems on systems where the fsync() system call is broken.
347            .journal_mode(SqliteJournalMode::Wal)
348            // WAL mode is safe from corruption with synchronous=NORMAL.
349            // When synchronous is NORMAL, the SQLite database engine will still sync at the most critical moments, but less often than in FULL mode.
350            // Which guarantees good balance between safety and speed.
351            .synchronous(SqliteSynchronous::Normal)
352            // The freelist pages are moved to the end of the database file and the database file is truncated to remove the freelist pages at every
353            // transaction commit. Note, however, that auto-vacuum only truncates the freelist pages from the file.
354            // Auto-vacuum does not de-fragment the database nor repack individual database pages the way that the VACUUM command does.
355            //
356            // This will help us to keep the file size under some control.
357            .auto_vacuum(SqliteAutoVacuum::Full)
358            // If shared-cache mode is enabled and a thread establishes multiple
359            // connections to the same database, the connections share a single data and schema cache.
360            // This can significantly reduce the quantity of memory and IO required by the system.
361            .shared_cache(true);
362
363        let db = SqlitePoolOptions::new()
364            .max_connections(1)
365            .min_connections(1)
366            .connect_with(options)
367            .await
368            .map_err(SqliteEnvelopeStoreError::SqlxSetupFailed)?;
369
370        Ok(SqliteEnvelopeStore {
371            db: db.clone(),
372            disk_usage: DiskUsage::prepare(
373                partition_id,
374                db,
375                config.spool_disk_usage_refresh_frequency_ms(),
376            )
377            .await?,
378            partition_tag: partition_id.to_string(),
379        })
380    }
381
382    /// Set up the database and return the current number of envelopes.
383    ///
384    /// The directories and spool file will be created if they don't already
385    /// exist.
386    async fn setup(path: &Path) -> Result<(), SqliteEnvelopeStoreError> {
387        Self::create_spool_directory(path).await?;
388
389        let options = SqliteConnectOptions::new()
390            .filename(path)
391            .journal_mode(SqliteJournalMode::Wal)
392            .create_if_missing(true);
393
394        let db = SqlitePoolOptions::new()
395            .connect_with(options)
396            .await
397            .map_err(SqliteEnvelopeStoreError::SqlxSetupFailed)?;
398
399        sqlx::migrate!("../migrations")
400            .run(&db)
401            .await
402            .map_err(SqliteEnvelopeStoreError::MigrationError)?;
403
404        Ok(())
405    }
406
407    /// Creates the directories for the spool file.
408    async fn create_spool_directory(path: &Path) -> Result<(), SqliteEnvelopeStoreError> {
409        let Some(parent) = path.parent() else {
410            return Ok(());
411        };
412
413        if !parent.as_os_str().is_empty() && !parent.exists() {
414            relay_log::debug!("creating directory for spooling file: {}", parent.display());
415            DirBuilder::new()
416                .recursive(true)
417                .create(&parent)
418                .await
419                .map_err(SqliteEnvelopeStoreError::FileSetupError)?;
420        }
421
422        Ok(())
423    }
424
425    /// Inserts one or more envelopes into the database.
426    pub async fn insert_batch(
427        &mut self,
428        envelopes: DatabaseBatch,
429    ) -> Result<(), SqliteEnvelopeStoreError> {
430        let DatabaseBatch {
431            received_at,
432            own_key,
433            sampling_key,
434            envelopes,
435        } = envelopes;
436
437        let count = envelopes.len();
438        let encoded = match count {
439            0 => {
440                debug_assert!(false, "should not be called with empty batch");
441                return Ok(());
442            }
443            // special-casing single envelopes shaves off a little bit of time for large envelopes,
444            // but it's mainly for backward compatibility.
445            1 => envelopes.into_iter().next().unwrap().encoded_envelope,
446            _more => pack_envelopes(envelopes),
447        };
448
449        let query = sqlx::query("INSERT INTO envelopes (received_at, own_key, sampling_key, count, envelope) VALUES (?, ?, ?, ?, ?);")
450            .bind(received_at)
451            .bind(own_key.as_str())
452            .bind(sampling_key.as_str())
453            .bind(count as u16)
454            .bind(encoded);
455
456        relay_statsd::metric!(
457            timer(RelayTimers::BufferSqlWrite),
458            partition_id = &self.partition_tag,
459            {
460                query
461                    .execute(&self.db)
462                    .await
463                    .map_err(SqliteEnvelopeStoreError::WriteError)?;
464            }
465        );
466        Ok(())
467    }
468
469    /// Deletes and returns at most `limit` [`Envelope`]s from the database.
470    pub async fn delete_batch(
471        &mut self,
472        own_key: ProjectKey,
473        sampling_key: ProjectKey,
474    ) -> Result<Option<DatabaseBatch>, SqliteEnvelopeStoreError> {
475        let mut rows = build_delete_and_fetch_many_envelopes(own_key, sampling_key).fetch(&self.db);
476        let Some(row) = rows.as_mut().next().await else {
477            return Ok(None);
478        };
479        let row = row.map_err(SqliteEnvelopeStoreError::FetchError)?;
480
481        Ok(Some(extract_batch(own_key, sampling_key, row)?))
482    }
483
484    /// Returns a set of project key pairs, representing all the unique combinations of
485    /// `own_key` and `project_key` that are found in the database.
486    pub async fn project_key_pairs(
487        &self,
488    ) -> Result<HashSet<ProjectKeyPair>, SqliteEnvelopeStoreError> {
489        let project_key_pairs = build_get_project_key_pairs()
490            .fetch_all(&self.db)
491            .await
492            .map_err(SqliteEnvelopeStoreError::FetchError)?;
493
494        let project_key_pairs = project_key_pairs
495            .into_iter()
496            // Collect only keys we can extract.
497            .filter_map(|project_key_pair| extract_project_key_pair(project_key_pair).ok())
498            .collect();
499
500        Ok(project_key_pairs)
501    }
502
503    /// Returns an approximate measure of the used size of the database.
504    pub fn usage(&self) -> u64 {
505        self.disk_usage.usage()
506    }
507
508    /// Returns the total count of envelopes stored in the database.
509    pub async fn total_count(&self) -> Result<u64, SqliteEnvelopeStoreError> {
510        let row = build_count_all()
511            .fetch_one(&self.db)
512            .await
513            .map_err(SqliteEnvelopeStoreError::FetchError)?;
514
515        let total_count: i64 = row.get(0);
516        Ok(total_count as u64)
517    }
518}
519
520fn pack_envelopes(envelopes: Vec<DatabaseEnvelope>) -> Box<[u8]> {
521    let mut packed = vec![];
522    for envelope in envelopes {
523        packed.extend_from_slice(&envelope.received_at.to_le_bytes());
524        packed.extend_from_slice(&(envelope.encoded_envelope.len() as u32).to_le_bytes());
525        packed.extend_from_slice(&envelope.encoded_envelope);
526    }
527    packed.into_boxed_slice()
528}
529
530fn unpack_envelopes(
531    own_key: ProjectKey,
532    sampling_key: ProjectKey,
533    data: &[u8],
534) -> Result<Vec<DatabaseEnvelope>, std::io::Error> {
535    let mut envelopes = vec![];
536    let mut buf = data.reader();
537    loop {
538        let mut b = [0u8; 8];
539        match buf.read(&mut b)? {
540            // done:
541            0 => break,
542            // additional trailing bytes:
543            n if n != b.len() => return Err(ErrorKind::UnexpectedEof.into()),
544            _ => {}
545        }
546        let received_at = i64::from_le_bytes(b);
547
548        let mut b = [0u8; 4];
549        buf.read_exact(&mut b)?;
550        let size = u32::from_le_bytes(b);
551
552        let mut b = vec![0u8; size as usize];
553        buf.read_exact(&mut b)?;
554
555        envelopes.push(DatabaseEnvelope {
556            received_at,
557            own_key,
558            sampling_key,
559            encoded_envelope: b.into_boxed_slice(),
560        });
561    }
562    Ok(envelopes)
563}
564
565/// Loads a [`DatabaseEnvelope`] from a database row.
566fn extract_batch(
567    own_key: ProjectKey,
568    sampling_key: ProjectKey,
569    row: SqliteRow,
570) -> Result<DatabaseBatch, SqliteEnvelopeStoreError> {
571    let received_at: i64 = row
572        .try_get("received_at")
573        .map_err(SqliteEnvelopeStoreError::FetchError)?;
574    let data: Box<[u8]> = row
575        .try_get("envelope")
576        .map_err(SqliteEnvelopeStoreError::FetchError)?;
577    let count: u64 = row
578        .try_get("count")
579        .map_err(SqliteEnvelopeStoreError::FetchError)?;
580
581    let envelopes = match count {
582        0 => {
583            debug_assert!(false, "db should not contain empty row");
584            vec![]
585        }
586        1 => vec![DatabaseEnvelope {
587            received_at,
588            own_key,
589            sampling_key,
590            encoded_envelope: data,
591        }],
592        _more => unpack_envelopes(own_key, sampling_key, &data)?,
593    };
594
595    Ok(DatabaseBatch {
596        received_at,
597        own_key,
598        sampling_key,
599        envelopes,
600    })
601}
602
603/// Deserializes a pair of [`ProjectKey`] from the database.
604fn extract_project_key_pair(row: SqliteRow) -> Result<ProjectKeyPair, SqliteEnvelopeStoreError> {
605    let own_key = row
606        .try_get("own_key")
607        .map_err(SqliteEnvelopeStoreError::FetchError)
608        .and_then(|key| {
609            ProjectKey::parse(key).map_err(SqliteEnvelopeStoreError::ProjectKeyExtractionError)
610        });
611    let sampling_key = row
612        .try_get("sampling_key")
613        .map_err(SqliteEnvelopeStoreError::FetchError)
614        .and_then(|key| {
615            ProjectKey::parse(key).map_err(SqliteEnvelopeStoreError::ProjectKeyExtractionError)
616        });
617
618    match (own_key, sampling_key) {
619        (Ok(own_key), Ok(sampling_key)) => Ok(ProjectKeyPair::new(own_key, sampling_key)),
620        // Report the first found error.
621        (Err(err), _) | (_, Err(err)) => {
622            relay_log::error!("failed to extract a queue key from the spool record: {err}");
623
624            Err(err)
625        }
626    }
627}
628
629/// Builds a query that deletes many [`Envelope`] from the database.
630pub fn build_delete_and_fetch_many_envelopes<'a>(
631    own_key: ProjectKey,
632    project_key: ProjectKey,
633) -> Query<'a, Sqlite, SqliteArguments<'a>> {
634    sqlx::query(
635        "DELETE FROM
636            envelopes
637         WHERE id IN (SELECT id FROM envelopes WHERE own_key = ? AND sampling_key = ?
638            ORDER BY received_at DESC LIMIT 1)
639         RETURNING
640            received_at, own_key, sampling_key, envelope, count",
641    )
642    .bind(own_key.to_string())
643    .bind(project_key.to_string())
644}
645
646/// Creates a query which fetches the number of used database pages multiplied by the page size.
647///
648/// This info used to estimate the current allocated database size.
649pub fn build_estimate_size<'a>() -> Query<'a, Sqlite, SqliteArguments<'a>> {
650    sqlx::query(
651        r#"SELECT (page_count - freelist_count) * page_size as size FROM pragma_page_count(), pragma_freelist_count(), pragma_page_size();"#,
652    )
653}
654
655/// Returns the query to select all the unique combinations of own and sampling keys.
656pub fn build_get_project_key_pairs<'a>() -> Query<'a, Sqlite, SqliteArguments<'a>> {
657    sqlx::query("SELECT DISTINCT own_key, sampling_key FROM envelopes;")
658}
659
660/// Returns the query to count the number of envelopes on disk.
661///
662/// Please note that this query is SLOW because SQLite doesn't use any metadata to satisfy it,
663/// meaning that it has to scan through all the rows and count them.
664pub fn build_count_all<'a>() -> Query<'a, Sqlite, SqliteArguments<'a>> {
665    sqlx::query("SELECT SUM(count) FROM envelopes;")
666}
667
#[cfg(test)]
mod tests {
    use std::time::Duration;
    use tokio::time::sleep;

    use relay_base_schema::project::ProjectKey;

    use super::*;
    use crate::services::buffer::testutils::utils::{mock_envelopes, setup_db};

    /// Returns the own key and sampling key that the tests look envelopes up by.
    fn mock_key_pair() -> (ProjectKey, ProjectKey) {
        let own_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
        let sampling_key = ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap();
        (own_key, sampling_key)
    }

    /// Converts envelopes into a single [`DatabaseBatch`], panicking if that is not possible.
    fn to_batch<I>(envelopes: I) -> DatabaseBatch
    where
        I: IntoIterator,
        I::Item: AsRef<Envelope>,
    {
        envelopes
            .into_iter()
            .map(|envelope| DatabaseEnvelope::try_from(envelope.as_ref()).unwrap())
            .collect::<Vec<_>>()
            .try_into()
            .unwrap()
    }

    #[tokio::test]
    async fn test_insert_and_delete_envelopes() {
        let db = setup_db(true).await;
        let mut envelope_store = SqliteEnvelopeStore::new(0, db, Duration::from_millis(100));
        let (own_key, sampling_key) = mock_key_pair();

        // We insert 10 envelopes, split into two batches of 5.
        let batches = [mock_envelopes(5), mock_envelopes(5)];
        for batch in &batches {
            assert!(envelope_store.insert_batch(to_batch(batch)).await.is_ok());
        }

        // The first deletion returns the newest batch, the second one the oldest.
        for expected in batches.iter().rev() {
            let batch = envelope_store
                .delete_batch(own_key, sampling_key)
                .await
                .unwrap()
                .unwrap();
            assert_eq!(batch.len(), 5);
            for (extracted_envelope, inserted_envelope) in batch.envelopes.iter().zip(expected) {
                assert_eq!(
                    extracted_envelope.received_at().timestamp_millis(),
                    inserted_envelope.received_at().timestamp_millis()
                );
            }
        }

        // Store is empty.
        assert!(
            envelope_store
                .delete_batch(own_key, sampling_key)
                .await
                .unwrap()
                .is_none()
        );
    }

    #[tokio::test]
    async fn test_insert_and_delete_single() {
        let db = setup_db(true).await;
        let mut envelope_store = SqliteEnvelopeStore::new(0, db, Duration::from_millis(100));
        let (own_key, sampling_key) = mock_key_pair();

        // We insert a batch that consists of a single envelope.
        let inserted = mock_envelopes(1);
        assert!(
            envelope_store
                .insert_batch(to_batch(&inserted))
                .await
                .is_ok()
        );

        // We check that we get exactly that envelope back.
        let extracted = envelope_store
            .delete_batch(own_key, sampling_key)
            .await
            .unwrap()
            .unwrap();
        assert_eq!(extracted.len(), 1);
        assert_eq!(
            extracted.envelopes[0].received_at().timestamp_millis(),
            inserted[0].received_at().timestamp_millis()
        );

        // Store is empty.
        assert!(
            envelope_store
                .delete_batch(own_key, sampling_key)
                .await
                .unwrap()
                .is_none()
        );
    }

    #[tokio::test]
    async fn test_insert_and_get_project_keys_pairs() {
        let db = setup_db(true).await;
        let mut envelope_store = SqliteEnvelopeStore::new(0, db, Duration::from_millis(100));
        let (own_key, sampling_key) = mock_key_pair();

        // We insert 2 envelopes as one batch.
        envelope_store
            .insert_batch(to_batch(mock_envelopes(2)))
            .await
            .unwrap();

        // We check that we get back only one pair of project keys, since all envelopes have the
        // same pair.
        let project_key_pairs = envelope_store.project_key_pairs().await.unwrap();
        assert_eq!(project_key_pairs.len(), 1);
        assert_eq!(
            project_key_pairs.into_iter().last().unwrap(),
            ProjectKeyPair::new(own_key, sampling_key)
        );
    }

    #[tokio::test]
    async fn test_estimate_disk_usage() {
        let db = setup_db(true).await;
        let mut store = SqliteEnvelopeStore::new(0, db.clone(), Duration::from_millis(1));
        let disk_usage = DiskUsage::prepare(0, db, Duration::from_millis(1))
            .await
            .unwrap();

        // We read the disk usage without envelopes stored.
        let usage_1 = disk_usage.usage();
        assert!(usage_1 > 0);

        // We write 10 envelopes to increase the disk usage.
        store
            .insert_batch(to_batch(mock_envelopes(10)))
            .await
            .unwrap();

        // We wait for the refresh timeout of the disk usage task.
        sleep(Duration::from_millis(2)).await;

        // The usage must not have shrunk after writing the 10 envelopes.
        let usage_2 = disk_usage.usage();
        assert!(usage_2 >= usage_1);
    }

    #[tokio::test]
    async fn test_total_count() {
        let db = setup_db(true).await;
        let mut store = SqliteEnvelopeStore::new(0, db.clone(), Duration::from_millis(1));

        let envelopes = mock_envelopes(10);
        store.insert_batch(to_batch(&envelopes)).await.unwrap();

        assert_eq!(store.total_count().await.unwrap(), envelopes.len() as u64);
    }
}