1use std::io::{ErrorKind, Read};
2use std::path::Path;
3use std::sync::Arc;
4use std::sync::atomic::{AtomicU64, Ordering};
5use std::time::Duration;
6
7use crate::envelope::EnvelopeError;
8
9use crate::Envelope;
10use crate::services::buffer::common::ProjectKeyPair;
11use crate::statsd::{RelayGauges, RelayHistograms, RelayTimers};
12use bytes::{Buf, Bytes};
13use chrono::{DateTime, Utc};
14use futures::stream::StreamExt;
15use hashbrown::HashSet;
16use relay_base_schema::project::{ParseProjectKeyError, ProjectKey};
17use relay_config::Config;
18use serde::{Deserialize, Serialize};
19use sqlx::migrate::MigrateError;
20use sqlx::query::Query;
21use sqlx::sqlite::{
22 SqliteArguments, SqliteAutoVacuum, SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions,
23 SqliteRow, SqliteSynchronous,
24};
25use sqlx::{Pool, Row, Sqlite};
26use tokio::fs::DirBuilder;
27use tokio::time::sleep;
28
29const ZSTD_MAGIC_WORD: &[u8] = &[40, 181, 47, 253];
33
34#[derive(Clone, Debug, Serialize, Deserialize)]
36pub struct DatabaseEnvelope {
37 received_at: i64,
38 own_key: ProjectKey,
39 sampling_key: ProjectKey,
40 encoded_envelope: Box<[u8]>,
41}
42
43#[derive(Clone, Debug)]
44pub struct DatabaseBatch {
45 received_at: i64,
46 own_key: ProjectKey,
47 sampling_key: ProjectKey,
48 envelopes: Vec<DatabaseEnvelope>,
49}
50
51impl DatabaseBatch {
52 pub fn len(&self) -> usize {
53 self.envelopes.len()
54 }
55}
56
57impl TryFrom<Vec<DatabaseEnvelope>> for DatabaseBatch {
58 type Error = ();
59
60 fn try_from(envelopes: Vec<DatabaseEnvelope>) -> Result<Self, Self::Error> {
61 let Some(last) = envelopes.last() else {
62 return Err(());
63 };
64 Ok(Self {
65 received_at: last.received_at,
66 own_key: last.own_key,
67 sampling_key: last.sampling_key,
68 envelopes,
69 })
70 }
71}
72
73impl From<DatabaseBatch> for Vec<DatabaseEnvelope> {
74 fn from(value: DatabaseBatch) -> Self {
75 value.envelopes
76 }
77}
78
79#[derive(Debug, thiserror::Error)]
80pub enum InsertEnvelopeError {
81 #[error("envelope conversion error: {0}")]
82 Envelope(#[from] EnvelopeError),
83 #[error("compression error: {0}")]
84 Zstd(#[from] std::io::Error),
85}
86
87impl DatabaseEnvelope {
88 const COMPRESSION_LEVEL: i32 = 1;
93
94 pub fn len(&self) -> usize {
95 self.encoded_envelope.len()
96 }
97
98 pub fn received_at(&self) -> DateTime<Utc> {
99 DateTime::from_timestamp_millis(self.received_at).unwrap_or(Utc::now())
100 }
101}
102
103impl TryFrom<DatabaseEnvelope> for Box<Envelope> {
104 type Error = InsertEnvelopeError;
105
106 fn try_from(value: DatabaseEnvelope) -> Result<Self, Self::Error> {
107 let received_at = value.received_at();
108 let DatabaseEnvelope {
109 received_at: _,
110 own_key,
111 sampling_key,
112 mut encoded_envelope,
113 } = value;
114
115 if encoded_envelope.starts_with(ZSTD_MAGIC_WORD) {
116 relay_statsd::metric!(timer(RelayTimers::BufferEnvelopeDecompression), {
117 encoded_envelope = zstd::decode_all(&*encoded_envelope)?.into_boxed_slice();
118 });
119 }
120
121 let mut envelope = Envelope::parse_bytes(Bytes::from(encoded_envelope))?;
122 debug_assert_eq!(envelope.meta().public_key(), own_key);
123 debug_assert!(
124 envelope
125 .sampling_key()
126 .is_none_or(|key| key == sampling_key)
127 );
128
129 envelope.set_received_at(received_at);
130
131 Ok(envelope)
132 }
133}
134
135impl<'a> TryFrom<&'a Envelope> for DatabaseEnvelope {
136 type Error = InsertEnvelopeError;
137
138 fn try_from(value: &'a Envelope) -> Result<Self, Self::Error> {
139 let own_key = value.meta().public_key();
140 let sampling_key = value.sampling_key().unwrap_or(own_key);
141
142 let serialized_envelope = value.to_vec()?;
143 relay_statsd::metric!(
144 histogram(RelayHistograms::BufferEnvelopeSize) = serialized_envelope.len() as u64
145 );
146
147 let encoded_envelope =
148 relay_statsd::metric!(timer(RelayTimers::BufferEnvelopeCompression), {
149 zstd::encode_all(serialized_envelope.as_slice(), Self::COMPRESSION_LEVEL)?
150 });
151 relay_statsd::metric!(
152 histogram(RelayHistograms::BufferEnvelopeSizeCompressed) =
153 encoded_envelope.len() as u64
154 );
155
156 Ok(DatabaseEnvelope {
157 received_at: value.received_at().timestamp_millis(),
158 own_key,
159 sampling_key,
160 encoded_envelope: encoded_envelope.into_boxed_slice(),
161 })
162 }
163}
164
165#[derive(Debug, thiserror::Error)]
167pub enum SqliteEnvelopeStoreError {
168 #[error("failed to setup the database: {0}")]
169 SqlxSetupFailed(sqlx::Error),
170
171 #[error("failed to create the spool file: {0}")]
172 FileSetupError(std::io::Error),
173
174 #[error("failed to write to disk: {0}")]
175 WriteError(sqlx::Error),
176
177 #[error("failed to read from disk: {0}")]
178 FetchError(sqlx::Error),
179
180 #[error("failed to unpack envelopes: {0}")]
181 UnpackError(#[from] std::io::Error),
182
183 #[error("no file path for the spool was provided")]
184 NoFilePath,
185
186 #[error("failed to migrate the database: {0}")]
187 MigrationError(MigrateError),
188
189 #[error("failed to extract the envelope from the database")]
190 EnvelopeExtractionError,
191
192 #[error("failed to extract a project key from the database")]
193 ProjectKeyExtractionError(#[from] ParseProjectKeyError),
194
195 #[error("failed to get database file size: {0}")]
196 FileSizeReadFailed(sqlx::Error),
197}
198
199#[derive(Debug, Clone)]
200struct DiskUsage {
201 db: Pool<Sqlite>,
202 last_known_usage: Arc<AtomicU64>,
203 refresh_frequency: Duration,
204 partition_tag: String,
205}
206
207impl DiskUsage {
208 fn new(partition_id: u8, db: Pool<Sqlite>, refresh_frequency: Duration) -> Self {
210 Self {
211 db,
212 last_known_usage: Arc::new(AtomicU64::new(0)),
213 refresh_frequency,
214 partition_tag: partition_id.to_string(),
215 }
216 }
217
218 pub async fn prepare(
221 partition_id: u8,
222 db: Pool<Sqlite>,
223 refresh_frequency: Duration,
224 ) -> Result<Self, SqliteEnvelopeStoreError> {
225 let disk_usage = Self::new(partition_id, db.clone(), refresh_frequency);
226 let usage = Self::estimate_usage(&disk_usage.partition_tag, &db).await?;
227 disk_usage.last_known_usage.store(usage, Ordering::Relaxed);
228 disk_usage.start_background_refresh();
229
230 Ok(disk_usage)
231 }
232
233 fn usage(&self) -> u64 {
236 self.last_known_usage.load(Ordering::Relaxed)
237 }
238
239 fn start_background_refresh(&self) {
241 let db = self.db.clone();
242 let last_known_usage_weak = Arc::downgrade(&self.last_known_usage);
245 let refresh_frequency = self.refresh_frequency;
246
247 let partition_tag = self.partition_tag.clone();
248 relay_system::spawn!(async move {
249 loop {
250 let Some(last_known_usage) = last_known_usage_weak.upgrade() else {
253 break;
254 };
255
256 let usage = Self::estimate_usage(&partition_tag, &db).await;
257 let Ok(usage) = usage else {
258 relay_log::error!("failed to update the disk usage asynchronously");
259 return;
260 };
261
262 let current = last_known_usage.load(Ordering::Relaxed);
263 if last_known_usage
264 .compare_exchange_weak(current, usage, Ordering::Relaxed, Ordering::Relaxed)
265 .is_err()
266 {
267 relay_log::error!("failed to update the disk usage asynchronously");
268 };
269
270 sleep(refresh_frequency).await;
271 }
272 });
273 }
274
275 async fn estimate_usage(
277 partition_tag: &str,
278 db: &Pool<Sqlite>,
279 ) -> Result<u64, SqliteEnvelopeStoreError> {
280 let usage: i64 = build_estimate_size()
281 .fetch_one(db)
282 .await
283 .and_then(|r| r.try_get(0))
284 .map_err(SqliteEnvelopeStoreError::FileSizeReadFailed)?;
285
286 relay_statsd::metric!(
287 gauge(RelayGauges::BufferDiskUsed) = usage as u64,
288 partition_id = partition_tag
289 );
290
291 Ok(usage as u64)
292 }
293}
294
295#[derive(Debug, Clone)]
300pub struct SqliteEnvelopeStore {
301 db: Pool<Sqlite>,
302 disk_usage: DiskUsage,
303 partition_tag: String,
304}
305
306impl SqliteEnvelopeStore {
307 pub fn new(partition_id: u8, db: Pool<Sqlite>, refresh_frequency: Duration) -> Self {
309 Self {
310 db: db.clone(),
311 disk_usage: DiskUsage::new(partition_id, db, refresh_frequency),
312 partition_tag: partition_id.to_string(),
313 }
314 }
315
316 pub async fn prepare(
319 partition_id: u8,
320 config: &Config,
321 ) -> Result<SqliteEnvelopeStore, SqliteEnvelopeStoreError> {
322 let Some(path) = config.spool_envelopes_path(partition_id) else {
324 return Err(SqliteEnvelopeStoreError::NoFilePath);
325 };
326
327 relay_log::info!("buffer file {}", path.to_string_lossy());
328
329 Self::setup(&path).await?;
330
331 let options = SqliteConnectOptions::new()
332 .filename(&path)
333 .journal_mode(SqliteJournalMode::Wal)
342 .synchronous(SqliteSynchronous::Normal)
346 .auto_vacuum(SqliteAutoVacuum::Full)
352 .shared_cache(true);
356
357 let db = SqlitePoolOptions::new()
358 .max_connections(1)
359 .min_connections(1)
360 .connect_with(options)
361 .await
362 .map_err(SqliteEnvelopeStoreError::SqlxSetupFailed)?;
363
364 Ok(SqliteEnvelopeStore {
365 db: db.clone(),
366 disk_usage: DiskUsage::prepare(
367 partition_id,
368 db,
369 config.spool_disk_usage_refresh_frequency_ms(),
370 )
371 .await?,
372 partition_tag: partition_id.to_string(),
373 })
374 }
375
376 async fn setup(path: &Path) -> Result<(), SqliteEnvelopeStoreError> {
381 Self::create_spool_directory(path).await?;
382
383 let options = SqliteConnectOptions::new()
384 .filename(path)
385 .journal_mode(SqliteJournalMode::Wal)
386 .create_if_missing(true);
387
388 let db = SqlitePoolOptions::new()
389 .connect_with(options)
390 .await
391 .map_err(SqliteEnvelopeStoreError::SqlxSetupFailed)?;
392
393 sqlx::migrate!("../migrations")
394 .run(&db)
395 .await
396 .map_err(SqliteEnvelopeStoreError::MigrationError)?;
397
398 Ok(())
399 }
400
401 async fn create_spool_directory(path: &Path) -> Result<(), SqliteEnvelopeStoreError> {
403 let Some(parent) = path.parent() else {
404 return Ok(());
405 };
406
407 if !parent.as_os_str().is_empty() && !parent.exists() {
408 relay_log::debug!("creating directory for spooling file: {}", parent.display());
409 DirBuilder::new()
410 .recursive(true)
411 .create(&parent)
412 .await
413 .map_err(SqliteEnvelopeStoreError::FileSetupError)?;
414 }
415
416 Ok(())
417 }
418
419 pub async fn insert_batch(
421 &mut self,
422 envelopes: DatabaseBatch,
423 ) -> Result<(), SqliteEnvelopeStoreError> {
424 let DatabaseBatch {
425 received_at,
426 own_key,
427 sampling_key,
428 envelopes,
429 } = envelopes;
430
431 let count = envelopes.len();
432 let encoded = match count {
433 0 => {
434 debug_assert!(false, "should not be called with empty batch");
435 return Ok(());
436 }
437 1 => envelopes.into_iter().next().unwrap().encoded_envelope,
440 _more => pack_envelopes(envelopes),
441 };
442
443 let query = sqlx::query("INSERT INTO envelopes (received_at, own_key, sampling_key, count, envelope) VALUES (?, ?, ?, ?, ?);")
444 .bind(received_at)
445 .bind(own_key.as_str())
446 .bind(sampling_key.as_str())
447 .bind(count as u16)
448 .bind(encoded);
449
450 relay_statsd::metric!(
451 timer(RelayTimers::BufferSqlWrite),
452 partition_id = &self.partition_tag,
453 {
454 query
455 .execute(&self.db)
456 .await
457 .map_err(SqliteEnvelopeStoreError::WriteError)?;
458 }
459 );
460 Ok(())
461 }
462
463 pub async fn delete_batch(
465 &mut self,
466 own_key: ProjectKey,
467 sampling_key: ProjectKey,
468 ) -> Result<Option<DatabaseBatch>, SqliteEnvelopeStoreError> {
469 let mut rows = build_delete_and_fetch_many_envelopes(own_key, sampling_key).fetch(&self.db);
470 let Some(row) = rows.as_mut().next().await else {
471 return Ok(None);
472 };
473 let row = row.map_err(SqliteEnvelopeStoreError::FetchError)?;
474
475 Ok(Some(extract_batch(own_key, sampling_key, row)?))
476 }
477
478 pub async fn project_key_pairs(
481 &self,
482 ) -> Result<HashSet<ProjectKeyPair>, SqliteEnvelopeStoreError> {
483 let project_key_pairs = build_get_project_key_pairs()
484 .fetch_all(&self.db)
485 .await
486 .map_err(SqliteEnvelopeStoreError::FetchError)?;
487
488 let project_key_pairs = project_key_pairs
489 .into_iter()
490 .filter_map(|project_key_pair| extract_project_key_pair(project_key_pair).ok())
492 .collect();
493
494 Ok(project_key_pairs)
495 }
496
497 pub fn usage(&self) -> u64 {
499 self.disk_usage.usage()
500 }
501
502 pub async fn total_count(&self) -> Result<u64, SqliteEnvelopeStoreError> {
504 let row = build_count_all()
505 .fetch_one(&self.db)
506 .await
507 .map_err(SqliteEnvelopeStoreError::FetchError)?;
508
509 let total_count: i64 = row.get(0);
510 Ok(total_count as u64)
511 }
512}
513
514fn pack_envelopes(envelopes: Vec<DatabaseEnvelope>) -> Box<[u8]> {
515 let mut packed = vec![];
516 for envelope in envelopes {
517 packed.extend_from_slice(&envelope.received_at.to_le_bytes());
518 packed.extend_from_slice(&(envelope.encoded_envelope.len() as u32).to_le_bytes());
519 packed.extend_from_slice(&envelope.encoded_envelope);
520 }
521 packed.into_boxed_slice()
522}
523
524fn unpack_envelopes(
525 own_key: ProjectKey,
526 sampling_key: ProjectKey,
527 data: &[u8],
528) -> Result<Vec<DatabaseEnvelope>, std::io::Error> {
529 let mut envelopes = vec![];
530 let mut buf = data.reader();
531 loop {
532 let mut b = [0u8; 8];
533 match buf.read(&mut b)? {
534 0 => break,
536 n if n != b.len() => return Err(ErrorKind::UnexpectedEof.into()),
538 _ => {}
539 }
540 let received_at = i64::from_le_bytes(b);
541
542 let mut b = [0u8; 4];
543 buf.read_exact(&mut b)?;
544 let size = u32::from_le_bytes(b);
545
546 let mut b = vec![0u8; size as usize];
547 buf.read_exact(&mut b)?;
548
549 envelopes.push(DatabaseEnvelope {
550 received_at,
551 own_key,
552 sampling_key,
553 encoded_envelope: b.into_boxed_slice(),
554 });
555 }
556 Ok(envelopes)
557}
558
559fn extract_batch(
561 own_key: ProjectKey,
562 sampling_key: ProjectKey,
563 row: SqliteRow,
564) -> Result<DatabaseBatch, SqliteEnvelopeStoreError> {
565 let received_at: i64 = row
566 .try_get("received_at")
567 .map_err(SqliteEnvelopeStoreError::FetchError)?;
568 let data: Box<[u8]> = row
569 .try_get("envelope")
570 .map_err(SqliteEnvelopeStoreError::FetchError)?;
571 let count: u64 = row
572 .try_get("count")
573 .map_err(SqliteEnvelopeStoreError::FetchError)?;
574
575 let envelopes = match count {
576 0 => {
577 debug_assert!(false, "db should not contain empty row");
578 vec![]
579 }
580 1 => vec![DatabaseEnvelope {
581 received_at,
582 own_key,
583 sampling_key,
584 encoded_envelope: data,
585 }],
586 _more => unpack_envelopes(own_key, sampling_key, &data)?,
587 };
588
589 Ok(DatabaseBatch {
590 received_at,
591 own_key,
592 sampling_key,
593 envelopes,
594 })
595}
596
597fn extract_project_key_pair(row: SqliteRow) -> Result<ProjectKeyPair, SqliteEnvelopeStoreError> {
599 let own_key = row
600 .try_get("own_key")
601 .map_err(SqliteEnvelopeStoreError::FetchError)
602 .and_then(|key| {
603 ProjectKey::parse(key).map_err(SqliteEnvelopeStoreError::ProjectKeyExtractionError)
604 });
605 let sampling_key = row
606 .try_get("sampling_key")
607 .map_err(SqliteEnvelopeStoreError::FetchError)
608 .and_then(|key| {
609 ProjectKey::parse(key).map_err(SqliteEnvelopeStoreError::ProjectKeyExtractionError)
610 });
611
612 match (own_key, sampling_key) {
613 (Ok(own_key), Ok(sampling_key)) => Ok(ProjectKeyPair::new(own_key, sampling_key)),
614 (Err(err), _) | (_, Err(err)) => {
616 relay_log::error!("failed to extract a queue key from the spool record: {err}");
617
618 Err(err)
619 }
620 }
621}
622
623pub fn build_delete_and_fetch_many_envelopes<'a>(
625 own_key: ProjectKey,
626 project_key: ProjectKey,
627) -> Query<'a, Sqlite, SqliteArguments<'a>> {
628 sqlx::query(
629 "DELETE FROM
630 envelopes
631 WHERE id IN (SELECT id FROM envelopes WHERE own_key = ? AND sampling_key = ?
632 ORDER BY received_at DESC LIMIT 1)
633 RETURNING
634 received_at, own_key, sampling_key, envelope, count",
635 )
636 .bind(own_key.to_string())
637 .bind(project_key.to_string())
638}
639
640pub fn build_estimate_size<'a>() -> Query<'a, Sqlite, SqliteArguments<'a>> {
644 sqlx::query(
645 r#"SELECT (page_count - freelist_count) * page_size as size FROM pragma_page_count(), pragma_freelist_count(), pragma_page_size();"#,
646 )
647}
648
649pub fn build_get_project_key_pairs<'a>() -> Query<'a, Sqlite, SqliteArguments<'a>> {
651 sqlx::query("SELECT DISTINCT own_key, sampling_key FROM envelopes;")
652}
653
654pub fn build_count_all<'a>() -> Query<'a, Sqlite, SqliteArguments<'a>> {
659 sqlx::query("SELECT SUM(count) FROM envelopes;")
660}
661
662#[cfg(test)]
663mod tests {
664 use std::time::Duration;
665 use tokio::time::sleep;
666
667 use relay_base_schema::project::ProjectKey;
668
669 use super::*;
670 use crate::services::buffer::testutils::utils::{mock_envelopes, setup_db};
671
672 #[tokio::test]
673 async fn test_insert_and_delete_envelopes() {
674 let db = setup_db(true).await;
675 let mut envelope_store = SqliteEnvelopeStore::new(0, db, Duration::from_millis(100));
676
677 let own_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
678 let sampling_key = ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap();
679
680 let batches = [mock_envelopes(5), mock_envelopes(5)];
682 for batch in &batches {
683 assert!(
684 envelope_store
685 .insert_batch(
686 batch
687 .iter()
688 .map(|e| DatabaseEnvelope::try_from(e.as_ref()).unwrap())
689 .collect::<Vec<_>>()
690 .try_into()
691 .unwrap()
692 )
693 .await
694 .is_ok()
695 );
696 }
697
698 let batch = envelope_store
700 .delete_batch(own_key, sampling_key)
701 .await
702 .unwrap()
703 .unwrap();
704 assert_eq!(batch.len(), 5);
705 for (i, extracted_envelope) in batch.envelopes.iter().enumerate() {
706 assert_eq!(
707 extracted_envelope.received_at().timestamp_millis(),
708 (&batches[1])[i].received_at().timestamp_millis()
709 );
710 }
711
712 let batch = envelope_store
714 .delete_batch(own_key, sampling_key)
715 .await
716 .unwrap()
717 .unwrap();
718 assert_eq!(batch.len(), 5);
719 for (i, extracted_envelope) in batch.envelopes.iter().enumerate() {
720 assert_eq!(
721 extracted_envelope.received_at().timestamp_millis(),
722 (&batches[0])[i].received_at().timestamp_millis()
723 );
724 }
725
726 assert!(
728 envelope_store
729 .delete_batch(own_key, sampling_key)
730 .await
731 .unwrap()
732 .is_none()
733 );
734 }
735
736 #[tokio::test]
737 async fn test_insert_and_delete_single() {
738 let db = setup_db(true).await;
739 let mut envelope_store = SqliteEnvelopeStore::new(0, db, Duration::from_millis(100));
740
741 let own_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
742 let sampling_key = ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap();
743
744 let inserted = mock_envelopes(1);
746
747 assert!(
748 envelope_store
749 .insert_batch(
750 inserted
751 .iter()
752 .map(|e| DatabaseEnvelope::try_from(e.as_ref()).unwrap())
753 .collect::<Vec<_>>()
754 .try_into()
755 .unwrap()
756 )
757 .await
758 .is_ok()
759 );
760
761 let extracted = envelope_store
763 .delete_batch(own_key, sampling_key)
764 .await
765 .unwrap()
766 .unwrap();
767 assert_eq!(extracted.len(), 1);
768
769 assert_eq!(
770 extracted.envelopes[0].received_at().timestamp_millis(),
771 inserted[0].received_at().timestamp_millis()
772 );
773
774 assert!(
776 envelope_store
777 .delete_batch(own_key, sampling_key)
778 .await
779 .unwrap()
780 .is_none()
781 );
782 }
783
784 #[tokio::test]
785 async fn test_insert_and_get_project_keys_pairs() {
786 let db = setup_db(true).await;
787 let mut envelope_store = SqliteEnvelopeStore::new(0, db, Duration::from_millis(100));
788
789 let own_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
790 let sampling_key = ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap();
791
792 let envelopes = mock_envelopes(2);
794 envelope_store
795 .insert_batch(
796 envelopes
797 .into_iter()
798 .map(|e| DatabaseEnvelope::try_from(e.as_ref()).unwrap())
799 .collect::<Vec<_>>()
800 .try_into()
801 .unwrap(),
802 )
803 .await
804 .unwrap();
805
806 let project_key_pairs = envelope_store.project_key_pairs().await.unwrap();
809 assert_eq!(project_key_pairs.len(), 1);
810 assert_eq!(
811 project_key_pairs.into_iter().last().unwrap(),
812 ProjectKeyPair::new(own_key, sampling_key)
813 );
814 }
815
816 #[tokio::test]
817 async fn test_estimate_disk_usage() {
818 let db = setup_db(true).await;
819 let mut store = SqliteEnvelopeStore::new(0, db.clone(), Duration::from_millis(1));
820 let disk_usage = DiskUsage::prepare(0, db, Duration::from_millis(1))
821 .await
822 .unwrap();
823
824 let usage_1 = disk_usage.usage();
826 assert!(usage_1 > 0);
827
828 let envelopes = mock_envelopes(10);
830 store
831 .insert_batch(
832 envelopes
833 .into_iter()
834 .map(|e| DatabaseEnvelope::try_from(e.as_ref()).unwrap())
835 .collect::<Vec<_>>()
836 .try_into()
837 .unwrap(),
838 )
839 .await
840 .unwrap();
841
842 sleep(Duration::from_millis(2)).await;
844
845 let usage_2 = disk_usage.usage();
847 assert!(usage_2 >= usage_1);
848 }
849
850 #[tokio::test]
851 async fn test_total_count() {
852 let db = setup_db(true).await;
853 let mut store = SqliteEnvelopeStore::new(0, db.clone(), Duration::from_millis(1));
854
855 let envelopes = mock_envelopes(10);
856 store
857 .insert_batch(
858 envelopes
859 .iter()
860 .map(|e| DatabaseEnvelope::try_from(e.as_ref()).unwrap())
861 .collect::<Vec<_>>()
862 .try_into()
863 .unwrap(),
864 )
865 .await
866 .unwrap();
867
868 assert_eq!(store.total_count().await.unwrap(), envelopes.len() as u64);
869 }
870}