1use std::io::{ErrorKind, Read};
2use std::path::Path;
3use std::sync::Arc;
4use std::sync::atomic::{AtomicU64, Ordering};
5use std::time::Duration;
6
7use crate::envelope::EnvelopeError;
8
9use crate::Envelope;
10use crate::services::buffer::common::ProjectKeyPair;
11use crate::statsd::{RelayDistributions, RelayGauges, RelayTimers};
12use bytes::{Buf, Bytes};
13use chrono::{DateTime, Utc};
14use futures::stream::StreamExt;
15use hashbrown::HashSet;
16use relay_base_schema::project::{ParseProjectKeyError, ProjectKey};
17use relay_config::Config;
18use serde::{Deserialize, Serialize};
19use sqlx::migrate::MigrateError;
20use sqlx::query::Query;
21use sqlx::sqlite::{
22 SqliteArguments, SqliteAutoVacuum, SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions,
23 SqliteRow, SqliteSynchronous,
24};
25use sqlx::{Pool, Row, Sqlite};
26use tokio::fs::DirBuilder;
27use tokio::time::sleep;
28
/// Magic bytes that open a zstd frame (`0xFD2FB528` in little-endian byte order).
///
/// Stored payloads starting with these bytes are decompressed before parsing; all other
/// payloads are parsed as-is.
const ZSTD_MAGIC_WORD: &[u8] = &[40, 181, 47, 253];
33
/// A single envelope in the form in which it is written to and read from the spool.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct DatabaseEnvelope {
    /// Time at which the envelope was received, in milliseconds since the Unix epoch.
    received_at: i64,
    /// Public key taken from the envelope's request meta.
    own_key: ProjectKey,
    /// Sampling key of the envelope; equals `own_key` when the envelope has none.
    sampling_key: ProjectKey,
    /// Serialized envelope bytes. Zstd-compressed when built from an `Envelope`; payloads
    /// read back from the database are only decompressed if they carry the zstd magic bytes.
    encoded_envelope: Box<[u8]>,
}
42
/// A group of envelopes that is written to, and read from, the database as one row.
#[derive(Clone, Debug)]
pub struct DatabaseBatch {
    /// Receive timestamp (milliseconds since the Unix epoch) stored for the whole row.
    received_at: i64,
    /// Own project key stored for the whole row.
    own_key: ProjectKey,
    /// Sampling project key stored for the whole row.
    sampling_key: ProjectKey,
    /// The envelopes that make up this batch.
    envelopes: Vec<DatabaseEnvelope>,
}
50
51impl DatabaseBatch {
52 pub fn len(&self) -> usize {
53 self.envelopes.len()
54 }
55}
56
57impl TryFrom<Vec<DatabaseEnvelope>> for DatabaseBatch {
58 type Error = ();
59
60 fn try_from(envelopes: Vec<DatabaseEnvelope>) -> Result<Self, Self::Error> {
61 let Some(last) = envelopes.last() else {
62 return Err(());
63 };
64 Ok(Self {
65 received_at: last.received_at,
66 own_key: last.own_key,
67 sampling_key: last.sampling_key,
68 envelopes,
69 })
70 }
71}
72
73impl From<DatabaseBatch> for Vec<DatabaseEnvelope> {
74 fn from(value: DatabaseBatch) -> Self {
75 value.envelopes
76 }
77}
78
/// Error raised when converting between an `Envelope` and its [`DatabaseEnvelope`] form.
#[derive(Debug, thiserror::Error)]
pub enum InsertEnvelopeError {
    /// The envelope could not be serialized or parsed.
    #[error("envelope conversion error: {0}")]
    Envelope(#[from] EnvelopeError),
    /// Zstd failed; raised for both compression and decompression of the payload.
    #[error("compression error: {0}")]
    Zstd(#[from] std::io::Error),
}
86
87impl DatabaseEnvelope {
88 const COMPRESSION_LEVEL: i32 = 1;
93
94 pub fn len(&self) -> usize {
95 self.encoded_envelope.len()
96 }
97
98 pub fn received_at(&self) -> DateTime<Utc> {
99 DateTime::from_timestamp_millis(self.received_at).unwrap_or(Utc::now())
100 }
101}
102
103impl TryFrom<DatabaseEnvelope> for Box<Envelope> {
104 type Error = InsertEnvelopeError;
105
106 fn try_from(value: DatabaseEnvelope) -> Result<Self, Self::Error> {
107 let received_at = value.received_at();
108 let DatabaseEnvelope {
109 received_at: _,
110 own_key,
111 sampling_key,
112 mut encoded_envelope,
113 } = value;
114
115 if encoded_envelope.starts_with(ZSTD_MAGIC_WORD) {
116 relay_statsd::metric!(timer(RelayTimers::BufferEnvelopeDecompression), {
117 encoded_envelope = zstd::decode_all(&*encoded_envelope)?.into_boxed_slice();
118 });
119 }
120
121 let mut envelope = Envelope::parse_bytes(Bytes::from(encoded_envelope))?;
122 debug_assert_eq!(envelope.meta().public_key(), own_key);
123 debug_assert!(
124 envelope
125 .sampling_key()
126 .is_none_or(|key| key == sampling_key)
127 );
128
129 envelope.set_received_at(received_at);
130
131 Ok(envelope)
132 }
133}
134
135impl<'a> TryFrom<&'a Envelope> for DatabaseEnvelope {
136 type Error = InsertEnvelopeError;
137
138 fn try_from(value: &'a Envelope) -> Result<Self, Self::Error> {
139 let own_key = value.meta().public_key();
140 let sampling_key = value.sampling_key().unwrap_or(own_key);
141
142 let serialized_envelope = value.to_vec()?;
143 relay_statsd::metric!(
144 distribution(RelayDistributions::BufferEnvelopeSize) = serialized_envelope.len() as u64
145 );
146
147 let encoded_envelope =
148 relay_statsd::metric!(timer(RelayTimers::BufferEnvelopeCompression), {
149 zstd::encode_all(serialized_envelope.as_slice(), Self::COMPRESSION_LEVEL)?
150 });
151 relay_statsd::metric!(
152 distribution(RelayDistributions::BufferEnvelopeSizeCompressed) =
153 encoded_envelope.len() as u64
154 );
155
156 Ok(DatabaseEnvelope {
157 received_at: value.received_at().timestamp_millis(),
158 own_key,
159 sampling_key,
160 encoded_envelope: encoded_envelope.into_boxed_slice(),
161 })
162 }
163}
164
/// Errors produced by the [`SqliteEnvelopeStore`] and its helpers.
#[derive(Debug, thiserror::Error)]
pub enum SqliteEnvelopeStoreError {
    /// Connecting to the SQLite database failed.
    #[error("failed to setup the database: {0}")]
    SqlxSetupFailed(sqlx::Error),

    /// The directory that should contain the spool file could not be created.
    #[error("failed to create the spool file: {0}")]
    FileSetupError(std::io::Error),

    /// Inserting into the database failed.
    #[error("failed to write to disk: {0}")]
    WriteError(sqlx::Error),

    /// Reading rows or columns from the database failed.
    #[error("failed to read from disk: {0}")]
    FetchError(sqlx::Error),

    /// A packed batch payload could not be split into individual envelopes.
    #[error("failed to unpack envelopes: {0}")]
    UnpackError(#[from] std::io::Error),

    /// The configuration does not provide a spool path.
    #[error("no file path for the spool was provided")]
    NoFilePath,

    /// Running the database migrations failed.
    #[error("failed to migrate the database: {0}")]
    MigrationError(MigrateError),

    /// An envelope could not be extracted from a database row.
    #[error("failed to extract the envelope from the database")]
    EnvelopeExtractionError,

    /// A key column did not contain a valid project key.
    #[error("failed to extract a project key from the database")]
    ProjectKeyExtractionError(#[from] ParseProjectKeyError),

    /// The query that estimates the database size failed.
    #[error("failed to get database file size: {0}")]
    FileSizeReadFailed(sqlx::Error),
}
198
/// Cached estimate of the disk space used by the spool database.
///
/// Clones share the same cached value because it lives behind an `Arc`.
#[derive(Debug, Clone)]
struct DiskUsage {
    /// Pool used to query the database size.
    db: Pool<Sqlite>,
    /// Most recently measured usage in bytes.
    last_known_usage: Arc<AtomicU64>,
    /// Pause between two measurements of the background refresh task.
    refresh_frequency: Duration,
    /// Partition id rendered as a string, used as a metric tag.
    partition_tag: String,
}
206
207impl DiskUsage {
208 fn new(partition_id: u8, db: Pool<Sqlite>, refresh_frequency: Duration) -> Self {
210 Self {
211 db,
212 last_known_usage: Arc::new(AtomicU64::new(0)),
213 refresh_frequency,
214 partition_tag: partition_id.to_string(),
215 }
216 }
217
218 pub async fn prepare(
221 partition_id: u8,
222 db: Pool<Sqlite>,
223 refresh_frequency: Duration,
224 ) -> Result<Self, SqliteEnvelopeStoreError> {
225 let disk_usage = Self::new(partition_id, db.clone(), refresh_frequency);
226 let usage = Self::estimate_usage(&disk_usage.partition_tag, &db).await?;
227 disk_usage.last_known_usage.store(usage, Ordering::Relaxed);
228 disk_usage.start_background_refresh();
229
230 Ok(disk_usage)
231 }
232
233 fn usage(&self) -> u64 {
236 self.last_known_usage.load(Ordering::Relaxed)
237 }
238
239 fn start_background_refresh(&self) {
241 let db = self.db.clone();
242 let last_known_usage_weak = Arc::downgrade(&self.last_known_usage);
245 let refresh_frequency = self.refresh_frequency;
246
247 let partition_tag = self.partition_tag.clone();
248 relay_system::spawn!(async move {
249 while let Some(last_known_usage) = last_known_usage_weak.upgrade() {
252 let usage = Self::estimate_usage(&partition_tag, &db).await;
253 let Ok(usage) = usage else {
254 relay_log::error!("failed to update the disk usage asynchronously");
255 return;
256 };
257
258 let current = last_known_usage.load(Ordering::Relaxed);
259 if last_known_usage
260 .compare_exchange_weak(current, usage, Ordering::Relaxed, Ordering::Relaxed)
261 .is_err()
262 {
263 relay_log::error!("failed to update the disk usage asynchronously");
264 };
265
266 sleep(refresh_frequency).await;
267 }
268 });
269 }
270
271 async fn estimate_usage(
273 partition_tag: &str,
274 db: &Pool<Sqlite>,
275 ) -> Result<u64, SqliteEnvelopeStoreError> {
276 let usage: i64 = build_estimate_size()
277 .fetch_one(db)
278 .await
279 .and_then(|r| r.try_get(0))
280 .map_err(SqliteEnvelopeStoreError::FileSizeReadFailed)?;
281
282 relay_statsd::metric!(
283 gauge(RelayGauges::BufferDiskUsed) = usage as u64,
284 partition_id = partition_tag
285 );
286
287 Ok(usage as u64)
288 }
289}
290
/// Envelope store backed by a SQLite database.
///
/// Envelopes are written and removed in batches, where each batch occupies one row.
#[derive(Debug, Clone)]
pub struct SqliteEnvelopeStore {
    /// Pool used for all reads and writes.
    db: Pool<Sqlite>,
    /// Cached estimate of the database size.
    disk_usage: DiskUsage,
    /// Partition id rendered as a string, used as a metric tag.
    partition_tag: String,
}
301
302impl SqliteEnvelopeStore {
303 pub fn new(partition_id: u8, db: Pool<Sqlite>, refresh_frequency: Duration) -> Self {
305 Self {
306 db: db.clone(),
307 disk_usage: DiskUsage::new(partition_id, db, refresh_frequency),
308 partition_tag: partition_id.to_string(),
309 }
310 }
311
312 pub async fn prepare(
315 partition_id: u8,
316 config: &Config,
317 ) -> Result<SqliteEnvelopeStore, SqliteEnvelopeStoreError> {
318 let Some(path) = config.spool_envelopes_path(partition_id) else {
320 return Err(SqliteEnvelopeStoreError::NoFilePath);
321 };
322
323 relay_log::info!("buffer file {}", path.to_string_lossy());
324
325 Self::setup(&path).await?;
326
327 let options = SqliteConnectOptions::new()
328 .filename(&path)
329 .journal_mode(SqliteJournalMode::Wal)
338 .synchronous(SqliteSynchronous::Normal)
342 .auto_vacuum(SqliteAutoVacuum::Full)
348 .shared_cache(true);
352
353 let db = SqlitePoolOptions::new()
354 .max_connections(1)
355 .min_connections(1)
356 .connect_with(options)
357 .await
358 .map_err(SqliteEnvelopeStoreError::SqlxSetupFailed)?;
359
360 Ok(SqliteEnvelopeStore {
361 db: db.clone(),
362 disk_usage: DiskUsage::prepare(
363 partition_id,
364 db,
365 config.spool_disk_usage_refresh_frequency_ms(),
366 )
367 .await?,
368 partition_tag: partition_id.to_string(),
369 })
370 }
371
372 async fn setup(path: &Path) -> Result<(), SqliteEnvelopeStoreError> {
377 Self::create_spool_directory(path).await?;
378
379 let options = SqliteConnectOptions::new()
380 .filename(path)
381 .journal_mode(SqliteJournalMode::Wal)
382 .create_if_missing(true);
383
384 let db = SqlitePoolOptions::new()
385 .connect_with(options)
386 .await
387 .map_err(SqliteEnvelopeStoreError::SqlxSetupFailed)?;
388
389 sqlx::migrate!("../migrations")
390 .run(&db)
391 .await
392 .map_err(SqliteEnvelopeStoreError::MigrationError)?;
393
394 Ok(())
395 }
396
397 async fn create_spool_directory(path: &Path) -> Result<(), SqliteEnvelopeStoreError> {
399 let Some(parent) = path.parent() else {
400 return Ok(());
401 };
402
403 if !parent.as_os_str().is_empty() && !parent.exists() {
404 relay_log::debug!("creating directory for spooling file: {}", parent.display());
405 DirBuilder::new()
406 .recursive(true)
407 .create(&parent)
408 .await
409 .map_err(SqliteEnvelopeStoreError::FileSetupError)?;
410 }
411
412 Ok(())
413 }
414
415 pub async fn insert_batch(
417 &mut self,
418 envelopes: DatabaseBatch,
419 ) -> Result<(), SqliteEnvelopeStoreError> {
420 let DatabaseBatch {
421 received_at,
422 own_key,
423 sampling_key,
424 envelopes,
425 } = envelopes;
426
427 let count = envelopes.len();
428 let encoded = match count {
429 0 => {
430 debug_assert!(false, "should not be called with empty batch");
431 return Ok(());
432 }
433 1 => envelopes.into_iter().next().unwrap().encoded_envelope,
436 _more => pack_envelopes(envelopes),
437 };
438
439 let query = sqlx::query("INSERT INTO envelopes (received_at, own_key, sampling_key, count, envelope) VALUES (?, ?, ?, ?, ?);")
440 .bind(received_at)
441 .bind(own_key.as_str())
442 .bind(sampling_key.as_str())
443 .bind(count as u16)
444 .bind(encoded);
445
446 relay_statsd::metric!(
447 timer(RelayTimers::BufferSqlWrite),
448 partition_id = &self.partition_tag,
449 {
450 query
451 .execute(&self.db)
452 .await
453 .map_err(SqliteEnvelopeStoreError::WriteError)?;
454 }
455 );
456 Ok(())
457 }
458
459 pub async fn delete_batch(
461 &mut self,
462 own_key: ProjectKey,
463 sampling_key: ProjectKey,
464 ) -> Result<Option<DatabaseBatch>, SqliteEnvelopeStoreError> {
465 let mut rows = build_delete_and_fetch_many_envelopes(own_key, sampling_key).fetch(&self.db);
466 let Some(row) = rows.as_mut().next().await else {
467 return Ok(None);
468 };
469 let row = row.map_err(SqliteEnvelopeStoreError::FetchError)?;
470
471 Ok(Some(extract_batch(own_key, sampling_key, row)?))
472 }
473
474 pub async fn project_key_pairs(
477 &self,
478 ) -> Result<HashSet<ProjectKeyPair>, SqliteEnvelopeStoreError> {
479 let project_key_pairs = build_get_project_key_pairs()
480 .fetch_all(&self.db)
481 .await
482 .map_err(SqliteEnvelopeStoreError::FetchError)?;
483
484 let project_key_pairs = project_key_pairs
485 .into_iter()
486 .filter_map(|project_key_pair| extract_project_key_pair(project_key_pair).ok())
488 .collect();
489
490 Ok(project_key_pairs)
491 }
492
493 pub fn usage(&self) -> u64 {
495 self.disk_usage.usage()
496 }
497
498 pub async fn total_count(&self) -> Result<u64, SqliteEnvelopeStoreError> {
500 let row = build_count_all()
501 .fetch_one(&self.db)
502 .await
503 .map_err(SqliteEnvelopeStoreError::FetchError)?;
504
505 let total_count: i64 = row.get(0);
506 Ok(total_count as u64)
507 }
508}
509
510fn pack_envelopes(envelopes: Vec<DatabaseEnvelope>) -> Box<[u8]> {
511 let mut packed = vec![];
512 for envelope in envelopes {
513 packed.extend_from_slice(&envelope.received_at.to_le_bytes());
514 packed.extend_from_slice(&(envelope.encoded_envelope.len() as u32).to_le_bytes());
515 packed.extend_from_slice(&envelope.encoded_envelope);
516 }
517 packed.into_boxed_slice()
518}
519
520fn unpack_envelopes(
521 own_key: ProjectKey,
522 sampling_key: ProjectKey,
523 data: &[u8],
524) -> Result<Vec<DatabaseEnvelope>, std::io::Error> {
525 let mut envelopes = vec![];
526 let mut buf = data.reader();
527 loop {
528 let mut b = [0u8; 8];
529 match buf.read(&mut b)? {
530 0 => break,
532 n if n != b.len() => return Err(ErrorKind::UnexpectedEof.into()),
534 _ => {}
535 }
536 let received_at = i64::from_le_bytes(b);
537
538 let mut b = [0u8; 4];
539 buf.read_exact(&mut b)?;
540 let size = u32::from_le_bytes(b);
541
542 let mut b = vec![0u8; size as usize];
543 buf.read_exact(&mut b)?;
544
545 envelopes.push(DatabaseEnvelope {
546 received_at,
547 own_key,
548 sampling_key,
549 encoded_envelope: b.into_boxed_slice(),
550 });
551 }
552 Ok(envelopes)
553}
554
555fn extract_batch(
557 own_key: ProjectKey,
558 sampling_key: ProjectKey,
559 row: SqliteRow,
560) -> Result<DatabaseBatch, SqliteEnvelopeStoreError> {
561 let received_at: i64 = row
562 .try_get("received_at")
563 .map_err(SqliteEnvelopeStoreError::FetchError)?;
564 let data: Box<[u8]> = row
565 .try_get("envelope")
566 .map_err(SqliteEnvelopeStoreError::FetchError)?;
567 let count: u64 = row
568 .try_get("count")
569 .map_err(SqliteEnvelopeStoreError::FetchError)?;
570
571 let envelopes = match count {
572 0 => {
573 debug_assert!(false, "db should not contain empty row");
574 vec![]
575 }
576 1 => vec![DatabaseEnvelope {
577 received_at,
578 own_key,
579 sampling_key,
580 encoded_envelope: data,
581 }],
582 _more => unpack_envelopes(own_key, sampling_key, &data)?,
583 };
584
585 Ok(DatabaseBatch {
586 received_at,
587 own_key,
588 sampling_key,
589 envelopes,
590 })
591}
592
593fn extract_project_key_pair(row: SqliteRow) -> Result<ProjectKeyPair, SqliteEnvelopeStoreError> {
595 let own_key = row
596 .try_get("own_key")
597 .map_err(SqliteEnvelopeStoreError::FetchError)
598 .and_then(|key| {
599 ProjectKey::parse(key).map_err(SqliteEnvelopeStoreError::ProjectKeyExtractionError)
600 });
601 let sampling_key = row
602 .try_get("sampling_key")
603 .map_err(SqliteEnvelopeStoreError::FetchError)
604 .and_then(|key| {
605 ProjectKey::parse(key).map_err(SqliteEnvelopeStoreError::ProjectKeyExtractionError)
606 });
607
608 match (own_key, sampling_key) {
609 (Ok(own_key), Ok(sampling_key)) => Ok(ProjectKeyPair::new(own_key, sampling_key)),
610 (Err(err), _) | (_, Err(err)) => {
612 relay_log::error!("failed to extract a queue key from the spool record: {err}");
613
614 Err(err)
615 }
616 }
617}
618
619pub fn build_delete_and_fetch_many_envelopes<'a>(
621 own_key: ProjectKey,
622 project_key: ProjectKey,
623) -> Query<'a, Sqlite, SqliteArguments<'a>> {
624 sqlx::query(
625 "DELETE FROM
626 envelopes
627 WHERE id IN (SELECT id FROM envelopes WHERE own_key = ? AND sampling_key = ?
628 ORDER BY received_at DESC LIMIT 1)
629 RETURNING
630 received_at, own_key, sampling_key, envelope, count",
631 )
632 .bind(own_key.to_string())
633 .bind(project_key.to_string())
634}
635
636pub fn build_estimate_size<'a>() -> Query<'a, Sqlite, SqliteArguments<'a>> {
640 sqlx::query(
641 r#"SELECT (page_count - freelist_count) * page_size as size FROM pragma_page_count(), pragma_freelist_count(), pragma_page_size();"#,
642 )
643}
644
645pub fn build_get_project_key_pairs<'a>() -> Query<'a, Sqlite, SqliteArguments<'a>> {
647 sqlx::query("SELECT DISTINCT own_key, sampling_key FROM envelopes;")
648}
649
650pub fn build_count_all<'a>() -> Query<'a, Sqlite, SqliteArguments<'a>> {
655 sqlx::query("SELECT SUM(count) FROM envelopes;")
656}
657
#[cfg(test)]
mod tests {
    use std::time::Duration;
    use tokio::time::sleep;

    use relay_base_schema::project::ProjectKey;

    use super::*;
    use crate::services::buffer::testutils::utils::{mock_envelopes, setup_db};

    /// Two batches are inserted and must come back one at a time, last inserted first.
    #[tokio::test]
    async fn test_insert_and_delete_envelopes() {
        let db = setup_db(true).await;
        let mut envelope_store = SqliteEnvelopeStore::new(0, db, Duration::from_millis(100));

        // NOTE(review): presumably the keys that `mock_envelopes` assigns — confirm in testutils.
        let own_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
        let sampling_key = ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap();

        // Insert two batches of five envelopes each.
        let batches = [mock_envelopes(5), mock_envelopes(5)];
        for batch in &batches {
            assert!(
                envelope_store
                    .insert_batch(
                        batch
                            .iter()
                            .map(|e| DatabaseEnvelope::try_from(e.as_ref()).unwrap())
                            .collect::<Vec<_>>()
                            .try_into()
                            .unwrap()
                    )
                    .await
                    .is_ok()
            );
        }

        // The first delete must return the second batch, with its envelopes in order.
        let batch = envelope_store
            .delete_batch(own_key, sampling_key)
            .await
            .unwrap()
            .unwrap();
        assert_eq!(batch.len(), 5);
        for (i, extracted_envelope) in batch.envelopes.iter().enumerate() {
            assert_eq!(
                extracted_envelope.received_at().timestamp_millis(),
                (&batches[1])[i].received_at().timestamp_millis()
            );
        }

        // The second delete must return the first batch.
        let batch = envelope_store
            .delete_batch(own_key, sampling_key)
            .await
            .unwrap()
            .unwrap();
        assert_eq!(batch.len(), 5);
        for (i, extracted_envelope) in batch.envelopes.iter().enumerate() {
            assert_eq!(
                extracted_envelope.received_at().timestamp_millis(),
                (&batches[0])[i].received_at().timestamp_millis()
            );
        }

        // Nothing is left for this key pair.
        assert!(
            envelope_store
                .delete_batch(own_key, sampling_key)
                .await
                .unwrap()
                .is_none()
        );
    }

    /// A batch with a single envelope takes the non-packed storage path and round-trips.
    #[tokio::test]
    async fn test_insert_and_delete_single() {
        let db = setup_db(true).await;
        let mut envelope_store = SqliteEnvelopeStore::new(0, db, Duration::from_millis(100));

        let own_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
        let sampling_key = ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap();

        // Insert a batch that holds exactly one envelope.
        let inserted = mock_envelopes(1);

        assert!(
            envelope_store
                .insert_batch(
                    inserted
                        .iter()
                        .map(|e| DatabaseEnvelope::try_from(e.as_ref()).unwrap())
                        .collect::<Vec<_>>()
                        .try_into()
                        .unwrap()
                )
                .await
                .is_ok()
        );

        // Read the batch back and compare the timestamp of its only envelope.
        let extracted = envelope_store
            .delete_batch(own_key, sampling_key)
            .await
            .unwrap()
            .unwrap();
        assert_eq!(extracted.len(), 1);

        assert_eq!(
            extracted.envelopes[0].received_at().timestamp_millis(),
            inserted[0].received_at().timestamp_millis()
        );

        // Nothing is left for this key pair.
        assert!(
            envelope_store
                .delete_batch(own_key, sampling_key)
                .await
                .unwrap()
                .is_none()
        );
    }

    /// After inserting one batch, exactly one key pair must be reported.
    #[tokio::test]
    async fn test_insert_and_get_project_keys_pairs() {
        let db = setup_db(true).await;
        let mut envelope_store = SqliteEnvelopeStore::new(0, db, Duration::from_millis(100));

        let own_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
        let sampling_key = ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap();

        // Insert a batch of two envelopes.
        let envelopes = mock_envelopes(2);
        envelope_store
            .insert_batch(
                envelopes
                    .into_iter()
                    .map(|e| DatabaseEnvelope::try_from(e.as_ref()).unwrap())
                    .collect::<Vec<_>>()
                    .try_into()
                    .unwrap(),
            )
            .await
            .unwrap();

        // Both envelopes share the same keys, so a single pair is expected.
        let project_key_pairs = envelope_store.project_key_pairs().await.unwrap();
        assert_eq!(project_key_pairs.len(), 1);
        assert_eq!(
            project_key_pairs.into_iter().last().unwrap(),
            ProjectKeyPair::new(own_key, sampling_key)
        );
    }

    /// The reported disk usage is non-zero and does not shrink after inserting envelopes.
    #[tokio::test]
    async fn test_estimate_disk_usage() {
        let db = setup_db(true).await;
        let mut store = SqliteEnvelopeStore::new(0, db.clone(), Duration::from_millis(1));
        let disk_usage = DiskUsage::prepare(0, db, Duration::from_millis(1))
            .await
            .unwrap();

        // The initial measurement taken by `prepare` must already be non-zero.
        let usage_1 = disk_usage.usage();
        assert!(usage_1 > 0);

        // Insert a batch of ten envelopes.
        let envelopes = mock_envelopes(10);
        store
            .insert_batch(
                envelopes
                    .into_iter()
                    .map(|e| DatabaseEnvelope::try_from(e.as_ref()).unwrap())
                    .collect::<Vec<_>>()
                    .try_into()
                    .unwrap(),
            )
            .await
            .unwrap();

        // Wait longer than the refresh interval so the background task can run again.
        sleep(Duration::from_millis(2)).await;

        // The usage must not have decreased.
        let usage_2 = disk_usage.usage();
        assert!(usage_2 >= usage_1);
    }

    /// The total count equals the number of envelopes inserted, not the number of rows.
    #[tokio::test]
    async fn test_total_count() {
        let db = setup_db(true).await;
        let mut store = SqliteEnvelopeStore::new(0, db.clone(), Duration::from_millis(1));

        // Ten envelopes are stored as a single batch.
        let envelopes = mock_envelopes(10);
        store
            .insert_batch(
                envelopes
                    .iter()
                    .map(|e| DatabaseEnvelope::try_from(e.as_ref()).unwrap())
                    .collect::<Vec<_>>()
                    .try_into()
                    .unwrap(),
            )
            .await
            .unwrap();

        assert_eq!(store.total_count().await.unwrap(), envelopes.len() as u64);
    }
}