use std::io::{ErrorKind, Read};
use std::path::Path;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Duration;

use crate::envelope::EnvelopeError;

use crate::services::buffer::common::ProjectKeyPair;
use crate::statsd::{RelayGauges, RelayHistograms, RelayTimers};
use crate::Envelope;
use bytes::{Buf, Bytes};
use chrono::{DateTime, Utc};
use futures::stream::StreamExt;
use hashbrown::HashSet;
use relay_base_schema::project::{ParseProjectKeyError, ProjectKey};
use relay_config::Config;
use serde::{Deserialize, Serialize};
use sqlx::migrate::MigrateError;
use sqlx::query::Query;
use sqlx::sqlite::{
    SqliteArguments, SqliteAutoVacuum, SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions,
    SqliteRow, SqliteSynchronous,
};
use sqlx::{Pool, Row, Sqlite};
use tokio::fs::DirBuilder;
use tokio::time::sleep;

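/// The zstd frame magic number (`0x28 0xB5 0x2F 0xFD`), used to detect whether a stored
/// envelope payload is compressed before attempting to decode it.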
const ZSTD_MAGIC_WORD: &[u8] = &[40, 181, 47, 253];

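/// A single envelope as it is stored in the database: the receipt timestamp in milliseconds,
/// the owning and sampling project keys, and the zstd-compressed serialized envelope payload.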
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct DatabaseEnvelope {
    received_at: i64,
    own_key: ProjectKey,
    sampling_key: ProjectKey,
    encoded_envelope: Box<[u8]>,
}

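/// A batch of [`DatabaseEnvelope`]s that share the same project key pair and are written to the
/// database as a single row.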
#[derive(Clone, Debug)]
pub struct DatabaseBatch {
    received_at: i64,
    own_key: ProjectKey,
    sampling_key: ProjectKey,
    envelopes: Vec<DatabaseEnvelope>,
}

impl DatabaseBatch {
    pub fn len(&self) -> usize {
        self.envelopes.len()
    }
}

impl TryFrom<Vec<DatabaseEnvelope>> for DatabaseBatch {
    type Error = ();

    fn try_from(envelopes: Vec<DatabaseEnvelope>) -> Result<Self, Self::Error> {
        let Some(last) = envelopes.last() else {
            return Err(());
        };
        Ok(Self {
            received_at: last.received_at,
            own_key: last.own_key,
            sampling_key: last.sampling_key,
            envelopes,
        })
    }
}

impl From<DatabaseBatch> for Vec<DatabaseEnvelope> {
    fn from(value: DatabaseBatch) -> Self {
        value.envelopes
    }
}

#[derive(Debug, thiserror::Error)]
pub enum InsertEnvelopeError {
    #[error("envelope conversion error: {0}")]
    Envelope(#[from] EnvelopeError),
    #[error("compression error: {0}")]
    Zstd(#[from] std::io::Error),
}

impl DatabaseEnvelope {
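    // Lowest standard zstd compression level, favoring compression and decompression speed
    // over compression ratio.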
    const COMPRESSION_LEVEL: i32 = 1;

    pub fn len(&self) -> usize {
        self.encoded_envelope.len()
    }

    pub fn received_at(&self) -> DateTime<Utc> {
        DateTime::from_timestamp_millis(self.received_at).unwrap_or(Utc::now())
    }
}

impl TryFrom<DatabaseEnvelope> for Box<Envelope> {
    type Error = InsertEnvelopeError;

    fn try_from(value: DatabaseEnvelope) -> Result<Self, Self::Error> {
        let received_at = value.received_at();
        let DatabaseEnvelope {
            received_at: _,
            own_key,
            sampling_key,
            mut encoded_envelope,
        } = value;

        if encoded_envelope.starts_with(ZSTD_MAGIC_WORD) {
            relay_statsd::metric!(timer(RelayTimers::BufferEnvelopeDecompression), {
                encoded_envelope = zstd::decode_all(&*encoded_envelope)?.into_boxed_slice();
            });
        }

        let mut envelope = Envelope::parse_bytes(Bytes::from(encoded_envelope))?;
        debug_assert_eq!(envelope.meta().public_key(), own_key);
        debug_assert!(envelope
            .sampling_key()
            .is_none_or(|key| key == sampling_key));

        envelope.set_received_at(received_at);

        Ok(envelope)
    }
}

impl<'a> TryFrom<&'a Envelope> for DatabaseEnvelope {
    type Error = InsertEnvelopeError;

    fn try_from(value: &'a Envelope) -> Result<Self, Self::Error> {
        let own_key = value.meta().public_key();
        let sampling_key = value.sampling_key().unwrap_or(own_key);

        let serialized_envelope = value.to_vec()?;
        relay_statsd::metric!(
            histogram(RelayHistograms::BufferEnvelopeSize) = serialized_envelope.len() as u64
        );

        let encoded_envelope =
            relay_statsd::metric!(timer(RelayTimers::BufferEnvelopeCompression), {
                zstd::encode_all(serialized_envelope.as_slice(), Self::COMPRESSION_LEVEL)?
            });
        relay_statsd::metric!(
            histogram(RelayHistograms::BufferEnvelopeSizeCompressed) =
                encoded_envelope.len() as u64
        );

        Ok(DatabaseEnvelope {
            received_at: value.received_at().timestamp_millis(),
            own_key,
            sampling_key,
            encoded_envelope: encoded_envelope.into_boxed_slice(),
        })
    }
}

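/// An error that can occur while interacting with the SQLite-backed envelope store.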
#[derive(Debug, thiserror::Error)]
pub enum SqliteEnvelopeStoreError {
    #[error("failed to setup the database: {0}")]
    SqlxSetupFailed(sqlx::Error),

    #[error("failed to create the spool file: {0}")]
    FileSetupError(std::io::Error),

    #[error("failed to write to disk: {0}")]
    WriteError(sqlx::Error),

    #[error("failed to read from disk: {0}")]
    FetchError(sqlx::Error),

    #[error("failed to unpack envelopes: {0}")]
    UnpackError(#[from] std::io::Error),

    #[error("no file path for the spool was provided")]
    NoFilePath,

    #[error("failed to migrate the database: {0}")]
    MigrationError(MigrateError),

    #[error("failed to extract the envelope from the database")]
    EnvelopeExtractionError,

    #[error("failed to extract a project key from the database")]
    ProjectKeyExtractionError(#[from] ParseProjectKeyError),

    #[error("failed to get database file size: {0}")]
    FileSizeReadFailed(sqlx::Error),
}

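/// Tracks an estimate of the database's disk usage.
///
/// The estimate is refreshed periodically by a background task and cached in an atomic, so
/// reads of the current usage never have to touch the database.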
#[derive(Debug, Clone)]
struct DiskUsage {
    db: Pool<Sqlite>,
    last_known_usage: Arc<AtomicU64>,
    refresh_frequency: Duration,
    partition_tag: String,
}

impl DiskUsage {
    fn new(partition_id: u8, db: Pool<Sqlite>, refresh_frequency: Duration) -> Self {
        Self {
            db,
            last_known_usage: Arc::new(AtomicU64::new(0)),
            refresh_frequency,
            partition_tag: partition_id.to_string(),
        }
    }

    pub async fn prepare(
        partition_id: u8,
        db: Pool<Sqlite>,
        refresh_frequency: Duration,
    ) -> Result<Self, SqliteEnvelopeStoreError> {
        let disk_usage = Self::new(partition_id, db.clone(), refresh_frequency);
        let usage = Self::estimate_usage(&disk_usage.partition_tag, &db).await?;
        disk_usage.last_known_usage.store(usage, Ordering::Relaxed);
        disk_usage.start_background_refresh();

        Ok(disk_usage)
    }

    fn usage(&self) -> u64 {
        self.last_known_usage.load(Ordering::Relaxed)
    }

    fn start_background_refresh(&self) {
        let db = self.db.clone();
        let last_known_usage_weak = Arc::downgrade(&self.last_known_usage);
        let refresh_frequency = self.refresh_frequency;

        let partition_tag = self.partition_tag.clone();
        relay_system::spawn!(async move {
            loop {
                let Some(last_known_usage) = last_known_usage_weak.upgrade() else {
                    break;
                };

                let usage = Self::estimate_usage(&partition_tag, &db).await;
                let Ok(usage) = usage else {
                    relay_log::error!("failed to update the disk usage asynchronously");
                    return;
                };

                let current = last_known_usage.load(Ordering::Relaxed);
                if last_known_usage
                    .compare_exchange_weak(current, usage, Ordering::Relaxed, Ordering::Relaxed)
                    .is_err()
                {
                    relay_log::error!("failed to update the disk usage asynchronously");
                };

                sleep(refresh_frequency).await;
            }
        });
    }

    async fn estimate_usage(
        partition_tag: &str,
        db: &Pool<Sqlite>,
    ) -> Result<u64, SqliteEnvelopeStoreError> {
        let usage: i64 = build_estimate_size()
            .fetch_one(db)
            .await
            .and_then(|r| r.try_get(0))
            .map_err(SqliteEnvelopeStoreError::FileSizeReadFailed)?;

        relay_statsd::metric!(
            gauge(RelayGauges::BufferDiskUsed) = usage as u64,
            partition_id = partition_tag
        );

        Ok(usage as u64)
    }
}

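/// SQLite-backed envelope store for a single spool partition.
///
/// Wraps a connection pool to the partition's database file and a [`DiskUsage`] tracker that
/// periodically refreshes the estimated file size.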
#[derive(Debug, Clone)]
pub struct SqliteEnvelopeStore {
    db: Pool<Sqlite>,
    disk_usage: DiskUsage,
    partition_tag: String,
}

impl SqliteEnvelopeStore {
    pub fn new(partition_id: u8, db: Pool<Sqlite>, refresh_frequency: Duration) -> Self {
        Self {
            db: db.clone(),
            disk_usage: DiskUsage::new(partition_id, db, refresh_frequency),
            partition_tag: partition_id.to_string(),
        }
    }

    pub async fn prepare(
        partition_id: u8,
        config: &Config,
    ) -> Result<SqliteEnvelopeStore, SqliteEnvelopeStoreError> {
        let Some(path) = config.spool_envelopes_path(partition_id) else {
            return Err(SqliteEnvelopeStoreError::NoFilePath);
        };

        relay_log::info!("buffer file {}", path.to_string_lossy());

        Self::setup(&path).await?;

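        // Connection options for the spool database: WAL journaling, NORMAL synchronous mode,
        // full auto-vacuum, and a shared cache for the single-connection pool below.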
        let options = SqliteConnectOptions::new()
            .filename(&path)
            .journal_mode(SqliteJournalMode::Wal)
            .synchronous(SqliteSynchronous::Normal)
            .auto_vacuum(SqliteAutoVacuum::Full)
            .shared_cache(true);

        let db = SqlitePoolOptions::new()
            .max_connections(1)
            .min_connections(1)
            .connect_with(options)
            .await
            .map_err(SqliteEnvelopeStoreError::SqlxSetupFailed)?;

        Ok(SqliteEnvelopeStore {
            db: db.clone(),
            disk_usage: DiskUsage::prepare(
                partition_id,
                db,
                config.spool_disk_usage_refresh_frequency_ms(),
            )
            .await?,
            partition_tag: partition_id.to_string(),
        })
    }

    async fn setup(path: &Path) -> Result<(), SqliteEnvelopeStoreError> {
        Self::create_spool_directory(path).await?;

        let options = SqliteConnectOptions::new()
            .filename(path)
            .journal_mode(SqliteJournalMode::Wal)
            .create_if_missing(true);

        let db = SqlitePoolOptions::new()
            .connect_with(options)
            .await
            .map_err(SqliteEnvelopeStoreError::SqlxSetupFailed)?;

        sqlx::migrate!("../migrations")
            .run(&db)
            .await
            .map_err(SqliteEnvelopeStoreError::MigrationError)?;

        Ok(())
    }

    async fn create_spool_directory(path: &Path) -> Result<(), SqliteEnvelopeStoreError> {
        let Some(parent) = path.parent() else {
            return Ok(());
        };

        if !parent.as_os_str().is_empty() && !parent.exists() {
            relay_log::debug!("creating directory for spooling file: {}", parent.display());
            DirBuilder::new()
                .recursive(true)
                .create(&parent)
                .await
                .map_err(SqliteEnvelopeStoreError::FileSetupError)?;
        }

        Ok(())
    }

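    /// Inserts a batch of envelopes as a single row.
    ///
    /// A batch with exactly one envelope stores its payload directly; larger batches are packed
    /// into one blob with `pack_envelopes`.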
    pub async fn insert_batch(
        &mut self,
        envelopes: DatabaseBatch,
    ) -> Result<(), SqliteEnvelopeStoreError> {
        let DatabaseBatch {
            received_at,
            own_key,
            sampling_key,
            envelopes,
        } = envelopes;

        let count = envelopes.len();
        let encoded = match count {
            0 => {
                debug_assert!(false, "should not be called with empty batch");
                return Ok(());
            }
            1 => envelopes.into_iter().next().unwrap().encoded_envelope,
            _more => pack_envelopes(envelopes),
        };

        let query = sqlx::query("INSERT INTO envelopes (received_at, own_key, sampling_key, count, envelope) VALUES (?, ?, ?, ?, ?);")
            .bind(received_at)
            .bind(own_key.as_str())
            .bind(sampling_key.as_str())
            .bind(count as u16)
            .bind(encoded);

        relay_statsd::metric!(
            timer(RelayTimers::BufferSqlWrite),
            partition_id = &self.partition_tag,
            {
                query
                    .execute(&self.db)
                    .await
                    .map_err(SqliteEnvelopeStoreError::WriteError)?;
            }
        );
        Ok(())
    }

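    /// Deletes and returns the most recently received batch for the given project key pair, or
    /// `Ok(None)` if no batch is spooled for that pair.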
    pub async fn delete_batch(
        &mut self,
        own_key: ProjectKey,
        sampling_key: ProjectKey,
    ) -> Result<Option<DatabaseBatch>, SqliteEnvelopeStoreError> {
        let mut rows = build_delete_and_fetch_many_envelopes(own_key, sampling_key).fetch(&self.db);
        let Some(row) = rows.as_mut().next().await else {
            return Ok(None);
        };
        let row = row.map_err(SqliteEnvelopeStoreError::FetchError)?;

        Ok(Some(extract_batch(own_key, sampling_key, row)?))
    }

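    /// Returns the set of all distinct project key pairs that currently have spooled envelopes.
    ///
    /// Rows whose keys fail to parse are skipped.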
    pub async fn project_key_pairs(
        &self,
    ) -> Result<HashSet<ProjectKeyPair>, SqliteEnvelopeStoreError> {
        let project_key_pairs = build_get_project_key_pairs()
            .fetch_all(&self.db)
            .await
            .map_err(SqliteEnvelopeStoreError::FetchError)?;

        let project_key_pairs = project_key_pairs
            .into_iter()
            .filter_map(|project_key_pair| extract_project_key_pair(project_key_pair).ok())
            .collect();

        Ok(project_key_pairs)
    }

    pub fn usage(&self) -> u64 {
        self.disk_usage.usage()
    }

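    /// Returns the total number of spooled envelopes by summing the `count` column, since a
    /// single row may contain a packed batch of several envelopes.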
    pub async fn total_count(&self) -> Result<u64, SqliteEnvelopeStoreError> {
        let row = build_count_all()
            .fetch_one(&self.db)
            .await
            .map_err(SqliteEnvelopeStoreError::FetchError)?;

        let total_count: i64 = row.get(0);
        Ok(total_count as u64)
    }
}

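/// Packs multiple envelopes into a single blob.
///
/// Layout per envelope: the `received_at` timestamp as 8 little-endian bytes, the payload length
/// as 4 little-endian bytes, followed by the payload itself. `unpack_envelopes` reads the same
/// layout back.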
fn pack_envelopes(envelopes: Vec<DatabaseEnvelope>) -> Box<[u8]> {
    let mut packed = vec![];
    for envelope in envelopes {
        packed.extend_from_slice(&envelope.received_at.to_le_bytes());
        packed.extend_from_slice(&(envelope.encoded_envelope.len() as u32).to_le_bytes());
        packed.extend_from_slice(&envelope.encoded_envelope);
    }
    packed.into_boxed_slice()
}

fn unpack_envelopes(
    own_key: ProjectKey,
    sampling_key: ProjectKey,
    data: &[u8],
) -> Result<Vec<DatabaseEnvelope>, std::io::Error> {
    let mut envelopes = vec![];
    let mut buf = data.reader();
    loop {
        let mut b = [0u8; 8];
        match buf.read(&mut b)? {
            0 => break,
            n if n != b.len() => return Err(ErrorKind::UnexpectedEof.into()),
            _ => {}
        }
        let received_at = i64::from_le_bytes(b);

        let mut b = [0u8; 4];
        buf.read_exact(&mut b)?;
        let size = u32::from_le_bytes(b);

        let mut b = vec![0u8; size as usize];
        buf.read_exact(&mut b)?;

        envelopes.push(DatabaseEnvelope {
            received_at,
            own_key,
            sampling_key,
            encoded_envelope: b.into_boxed_slice(),
        });
    }
    Ok(envelopes)
}

fn extract_batch(
    own_key: ProjectKey,
    sampling_key: ProjectKey,
    row: SqliteRow,
) -> Result<DatabaseBatch, SqliteEnvelopeStoreError> {
    let received_at: i64 = row
        .try_get("received_at")
        .map_err(SqliteEnvelopeStoreError::FetchError)?;
    let data: Box<[u8]> = row
        .try_get("envelope")
        .map_err(SqliteEnvelopeStoreError::FetchError)?;
    let count: u64 = row
        .try_get("count")
        .map_err(SqliteEnvelopeStoreError::FetchError)?;

    let envelopes = match count {
        0 => {
            debug_assert!(false, "db should not contain empty row");
            vec![]
        }
        1 => vec![DatabaseEnvelope {
            received_at,
            own_key,
            sampling_key,
            encoded_envelope: data,
        }],
        _more => unpack_envelopes(own_key, sampling_key, &data)?,
    };

    Ok(DatabaseBatch {
        received_at,
        own_key,
        sampling_key,
        envelopes,
    })
}

fn extract_project_key_pair(row: SqliteRow) -> Result<ProjectKeyPair, SqliteEnvelopeStoreError> {
    let own_key = row
        .try_get("own_key")
        .map_err(SqliteEnvelopeStoreError::FetchError)
        .and_then(|key| {
            ProjectKey::parse(key).map_err(SqliteEnvelopeStoreError::ProjectKeyExtractionError)
        });
    let sampling_key = row
        .try_get("sampling_key")
        .map_err(SqliteEnvelopeStoreError::FetchError)
        .and_then(|key| {
            ProjectKey::parse(key).map_err(SqliteEnvelopeStoreError::ProjectKeyExtractionError)
        });

    match (own_key, sampling_key) {
        (Ok(own_key), Ok(sampling_key)) => Ok(ProjectKeyPair::new(own_key, sampling_key)),
        (Err(err), _) | (_, Err(err)) => {
            relay_log::error!("failed to extract a queue key from the spool record: {err}");

            Err(err)
        }
    }
}

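/// Creates a query that deletes the most recently received row for the given project key pair
/// and returns its contents.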
pub fn build_delete_and_fetch_many_envelopes<'a>(
    own_key: ProjectKey,
    project_key: ProjectKey,
) -> Query<'a, Sqlite, SqliteArguments<'a>> {
    sqlx::query(
        "DELETE FROM
            envelopes
         WHERE id IN (SELECT id FROM envelopes WHERE own_key = ? AND sampling_key = ?
            ORDER BY received_at DESC LIMIT 1)
         RETURNING
            received_at, own_key, sampling_key, envelope, count",
    )
    .bind(own_key.to_string())
    .bind(project_key.to_string())
}

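/// Creates a query that estimates the used size of the database file in bytes as
/// `(page_count - freelist_count) * page_size`, i.e. allocated pages minus pages on the free
/// list.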
pub fn build_estimate_size<'a>() -> Query<'a, Sqlite, SqliteArguments<'a>> {
    sqlx::query(
        r#"SELECT (page_count - freelist_count) * page_size as size FROM pragma_page_count(), pragma_freelist_count(), pragma_page_size();"#,
    )
}

pub fn build_get_project_key_pairs<'a>() -> Query<'a, Sqlite, SqliteArguments<'a>> {
    sqlx::query("SELECT DISTINCT own_key, sampling_key FROM envelopes;")
}

pub fn build_count_all<'a>() -> Query<'a, Sqlite, SqliteArguments<'a>> {
    sqlx::query("SELECT SUM(count) FROM envelopes;")
}

#[cfg(test)]
mod tests {
    use std::time::Duration;
    use tokio::time::sleep;

    use relay_base_schema::project::ProjectKey;

    use super::*;
    use crate::services::buffer::testutils::utils::{mock_envelopes, setup_db};

    #[tokio::test]
    async fn test_insert_and_delete_envelopes() {
        let db = setup_db(true).await;
        let mut envelope_store = SqliteEnvelopeStore::new(0, db, Duration::from_millis(100));

        let own_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
        let sampling_key = ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap();

        let batches = [mock_envelopes(5), mock_envelopes(5)];
        for batch in &batches {
            assert!(envelope_store
                .insert_batch(
                    batch
                        .iter()
                        .map(|e| DatabaseEnvelope::try_from(e.as_ref()).unwrap())
                        .collect::<Vec<_>>()
                        .try_into()
                        .unwrap()
                )
                .await
                .is_ok());
        }

        let batch = envelope_store
            .delete_batch(own_key, sampling_key)
            .await
            .unwrap()
            .unwrap();
        assert_eq!(batch.len(), 5);
        for (i, extracted_envelope) in batch.envelopes.iter().enumerate() {
            assert_eq!(
                extracted_envelope.received_at().timestamp_millis(),
                batches[1][i].received_at().timestamp_millis()
            );
        }

        let batch = envelope_store
            .delete_batch(own_key, sampling_key)
            .await
            .unwrap()
            .unwrap();
        assert_eq!(batch.len(), 5);
        for (i, extracted_envelope) in batch.envelopes.iter().enumerate() {
            assert_eq!(
                extracted_envelope.received_at().timestamp_millis(),
                batches[0][i].received_at().timestamp_millis()
            );
        }

        assert!(envelope_store
            .delete_batch(own_key, sampling_key)
            .await
            .unwrap()
            .is_none());
    }

    #[tokio::test]
    async fn test_insert_and_delete_single() {
        let db = setup_db(true).await;
        let mut envelope_store = SqliteEnvelopeStore::new(0, db, Duration::from_millis(100));

        let own_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
        let sampling_key = ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap();

        let inserted = mock_envelopes(1);

        assert!(envelope_store
            .insert_batch(
                inserted
                    .iter()
                    .map(|e| DatabaseEnvelope::try_from(e.as_ref()).unwrap())
                    .collect::<Vec<_>>()
                    .try_into()
                    .unwrap()
            )
            .await
            .is_ok());

        let extracted = envelope_store
            .delete_batch(own_key, sampling_key)
            .await
            .unwrap()
            .unwrap();
        assert_eq!(extracted.len(), 1);

        assert_eq!(
            extracted.envelopes[0].received_at().timestamp_millis(),
            inserted[0].received_at().timestamp_millis()
        );

        assert!(envelope_store
            .delete_batch(own_key, sampling_key)
            .await
            .unwrap()
            .is_none());
    }

    #[tokio::test]
    async fn test_insert_and_get_project_keys_pairs() {
        let db = setup_db(true).await;
        let mut envelope_store = SqliteEnvelopeStore::new(0, db, Duration::from_millis(100));

        let own_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
        let sampling_key = ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap();

        let envelopes = mock_envelopes(2);
        envelope_store
            .insert_batch(
                envelopes
                    .into_iter()
                    .map(|e| DatabaseEnvelope::try_from(e.as_ref()).unwrap())
                    .collect::<Vec<_>>()
                    .try_into()
                    .unwrap(),
            )
            .await
            .unwrap();

        let project_key_pairs = envelope_store.project_key_pairs().await.unwrap();
        assert_eq!(project_key_pairs.len(), 1);
        assert_eq!(
            project_key_pairs.into_iter().last().unwrap(),
            ProjectKeyPair::new(own_key, sampling_key)
        );
    }

    #[tokio::test]
    async fn test_estimate_disk_usage() {
        let db = setup_db(true).await;
        let mut store = SqliteEnvelopeStore::new(0, db.clone(), Duration::from_millis(1));
        let disk_usage = DiskUsage::prepare(0, db, Duration::from_millis(1))
            .await
            .unwrap();

        let usage_1 = disk_usage.usage();
        assert!(usage_1 > 0);

        let envelopes = mock_envelopes(10);
        store
            .insert_batch(
                envelopes
                    .into_iter()
                    .map(|e| DatabaseEnvelope::try_from(e.as_ref()).unwrap())
                    .collect::<Vec<_>>()
                    .try_into()
                    .unwrap(),
            )
            .await
            .unwrap();

        sleep(Duration::from_millis(2)).await;

        let usage_2 = disk_usage.usage();
        assert!(usage_2 >= usage_1);
    }

    #[tokio::test]
    async fn test_total_count() {
        let db = setup_db(true).await;
        let mut store = SqliteEnvelopeStore::new(0, db.clone(), Duration::from_millis(1));

        let envelopes = mock_envelopes(10);
        store
            .insert_batch(
                envelopes
                    .iter()
                    .map(|e| DatabaseEnvelope::try_from(e.as_ref()).unwrap())
                    .collect::<Vec<_>>()
                    .try_into()
                    .unwrap(),
            )
            .await
            .unwrap();

        assert_eq!(store.total_count().await.unwrap(), envelopes.len() as u64);
    }
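
    /// Minimal round-trip sketch of the packed batch layout: packing and then unpacking a few
    /// envelopes should preserve timestamps and payloads. Uses only the private helpers defined
    /// in this module with synthetic payload bytes.
    #[test]
    fn test_pack_unpack_roundtrip() {
        let own_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
        let sampling_key = ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap();

        // Build a few envelopes with distinct timestamps and payloads.
        let envelopes: Vec<_> = (0..3i64)
            .map(|i| DatabaseEnvelope {
                received_at: 1_000 + i,
                own_key,
                sampling_key,
                encoded_envelope: vec![i as u8; 16].into_boxed_slice(),
            })
            .collect();

        let packed = pack_envelopes(envelopes.clone());
        let unpacked = unpack_envelopes(own_key, sampling_key, &packed).unwrap();

        assert_eq!(unpacked.len(), envelopes.len());
        for (original, restored) in envelopes.iter().zip(&unpacked) {
            assert_eq!(original.received_at, restored.received_at);
            assert_eq!(original.encoded_envelope, restored.encoded_envelope);
        }
    }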
}