1use std::io::{ErrorKind, Read};
2use std::path::Path;
3use std::sync::Arc;
4use std::sync::atomic::{AtomicU64, Ordering};
5use std::time::Duration;
6
7use crate::envelope::EnvelopeError;
8
9use crate::Envelope;
10use crate::services::buffer::common::ProjectKeyPair;
11use crate::statsd::{RelayDistributions, RelayGauges, RelayTimers};
12use bytes::{Buf, Bytes};
13use chrono::{DateTime, Utc};
14use futures::stream::StreamExt;
15use hashbrown::HashSet;
16use relay_base_schema::project::{ParseProjectKeyError, ProjectKey};
17use relay_config::Config;
18use serde::{Deserialize, Serialize};
19use sqlx::migrate::MigrateError;
20use sqlx::query::Query;
21use sqlx::sqlite::{
22 SqliteArguments, SqliteAutoVacuum, SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions,
23 SqliteRow, SqliteSynchronous,
24};
25use sqlx::{Pool, Row, Sqlite};
26use tokio::fs::DirBuilder;
27use tokio::time::sleep;
28
/// Byte prefix used to recognise Zstandard-compressed payloads.
///
/// These four bytes are the Zstandard frame magic number `0xFD2FB528` in little-endian
/// order. A stored payload that starts with them is decompressed before it is parsed;
/// any other payload is parsed as-is.
const ZSTD_MAGIC_WORD: &[u8] = &[40, 181, 47, 253];
33
/// A single envelope in the form in which it is kept in the SQLite spool.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct DatabaseEnvelope {
    /// Receive time as a Unix timestamp in milliseconds.
    received_at: i64,
    /// Public key of the project the envelope was sent to.
    own_key: ProjectKey,
    /// Sampling key of the envelope; equal to `own_key` when the envelope has none.
    sampling_key: ProjectKey,
    /// Serialized envelope bytes; Zstandard-compressed when built from an [`Envelope`].
    encoded_envelope: Box<[u8]>,
}
42
/// A group of envelopes that is written to, and read from, the spool as one row.
///
/// When built through `TryFrom<Vec<DatabaseEnvelope>>`, the batch-level fields are
/// copied from the last envelope of the list.
#[derive(Clone, Debug)]
pub struct DatabaseBatch {
    /// Millisecond timestamp stored in the row's `received_at` column.
    received_at: i64,
    /// Project key stored in the row's `own_key` column.
    own_key: ProjectKey,
    /// Project key stored in the row's `sampling_key` column.
    sampling_key: ProjectKey,
    /// The envelopes contained in this batch.
    envelopes: Vec<DatabaseEnvelope>,
}
50
51impl DatabaseBatch {
52 pub fn len(&self) -> usize {
53 self.envelopes.len()
54 }
55}
56
57impl TryFrom<Vec<DatabaseEnvelope>> for DatabaseBatch {
58 type Error = ();
59
60 fn try_from(envelopes: Vec<DatabaseEnvelope>) -> Result<Self, Self::Error> {
61 let Some(last) = envelopes.last() else {
62 return Err(());
63 };
64 Ok(Self {
65 received_at: last.received_at,
66 own_key: last.own_key,
67 sampling_key: last.sampling_key,
68 envelopes,
69 })
70 }
71}
72
73impl From<DatabaseBatch> for Vec<DatabaseEnvelope> {
74 fn from(value: DatabaseBatch) -> Self {
75 value.envelopes
76 }
77}
78
/// Error produced when converting between an [`Envelope`] and a [`DatabaseEnvelope`].
///
/// Despite the name, it is the error type of both conversion directions in this file.
#[derive(Debug, thiserror::Error)]
pub enum InsertEnvelopeError {
    /// An [`EnvelopeError`] surfaced while serializing or parsing the envelope.
    #[error("envelope conversion error: {0}")]
    Envelope(#[from] EnvelopeError),
    /// An I/O error reported by `zstd` while compressing or decompressing the payload.
    #[error("compression error: {0}")]
    Zstd(#[from] std::io::Error),
}
86
87impl DatabaseEnvelope {
88 const COMPRESSION_LEVEL: i32 = 1;
93
94 pub fn len(&self) -> usize {
95 self.encoded_envelope.len()
96 }
97
98 pub fn received_at(&self) -> DateTime<Utc> {
99 DateTime::from_timestamp_millis(self.received_at).unwrap_or(Utc::now())
100 }
101}
102
103impl TryFrom<DatabaseEnvelope> for Box<Envelope> {
104 type Error = InsertEnvelopeError;
105
106 fn try_from(value: DatabaseEnvelope) -> Result<Self, Self::Error> {
107 let received_at = value.received_at();
108 let DatabaseEnvelope {
109 received_at: _,
110 own_key,
111 sampling_key,
112 mut encoded_envelope,
113 } = value;
114
115 if encoded_envelope.starts_with(ZSTD_MAGIC_WORD) {
116 relay_statsd::metric!(
117 timer(RelayTimers::BufferEnvelopeDecompression, sample = 0.01),
118 {
119 encoded_envelope = zstd::decode_all(&*encoded_envelope)?.into_boxed_slice();
120 }
121 );
122 }
123
124 let mut envelope = Envelope::parse_bytes(Bytes::from(encoded_envelope))?;
125 debug_assert_eq!(envelope.meta().public_key(), own_key);
126 debug_assert!(
127 envelope
128 .sampling_key()
129 .is_none_or(|key| key == sampling_key)
130 );
131
132 envelope.set_received_at(received_at);
133
134 Ok(envelope)
135 }
136}
137
138impl<'a> TryFrom<&'a Envelope> for DatabaseEnvelope {
139 type Error = InsertEnvelopeError;
140
141 fn try_from(value: &'a Envelope) -> Result<Self, Self::Error> {
142 let own_key = value.meta().public_key();
143 let sampling_key = value.sampling_key().unwrap_or(own_key);
144
145 let serialized_envelope = value.to_vec()?;
146 relay_statsd::metric!(
147 distribution(RelayDistributions::BufferEnvelopeSize, sample = 0.01) =
148 serialized_envelope.len() as u64
149 );
150
151 let encoded_envelope = relay_statsd::metric!(
152 timer(RelayTimers::BufferEnvelopeCompression, sample = 0.01),
153 { zstd::encode_all(serialized_envelope.as_slice(), Self::COMPRESSION_LEVEL)? }
154 );
155 relay_statsd::metric!(
156 distribution(
157 RelayDistributions::BufferEnvelopeSizeCompressed,
158 sample = 0.01
159 ) = encoded_envelope.len() as u64
160 );
161
162 Ok(DatabaseEnvelope {
163 received_at: value.received_at().timestamp_millis(),
164 own_key,
165 sampling_key,
166 encoded_envelope: encoded_envelope.into_boxed_slice(),
167 })
168 }
169}
170
/// Errors that can occur while setting up or using the [`SqliteEnvelopeStore`].
#[derive(Debug, thiserror::Error)]
pub enum SqliteEnvelopeStoreError {
    /// Connecting to the SQLite database failed.
    #[error("failed to setup the database: {0}")]
    SqlxSetupFailed(sqlx::Error),

    /// Creating the directory that holds the spool file failed.
    #[error("failed to create the spool file: {0}")]
    FileSetupError(std::io::Error),

    /// Executing the insert statement failed.
    #[error("failed to write to disk: {0}")]
    WriteError(sqlx::Error),

    /// Running a read query or decoding one of its columns failed.
    #[error("failed to read from disk: {0}")]
    FetchError(sqlx::Error),

    /// The packed payload of a multi-envelope row could not be split into envelopes.
    #[error("failed to unpack envelopes: {0}")]
    UnpackError(#[from] std::io::Error),

    /// The configuration yields no spool path for the requested partition.
    #[error("no file path for the spool was provided")]
    NoFilePath,

    /// Applying the database migrations failed.
    #[error("failed to migrate the database: {0}")]
    MigrationError(MigrateError),

    /// NOTE(review): not constructed anywhere in the visible part of this file.
    #[error("failed to extract the envelope from the database")]
    EnvelopeExtractionError,

    /// A key column held a value that does not parse as a [`ProjectKey`].
    #[error("failed to extract a project key from the database")]
    ProjectKeyExtractionError(#[from] ParseProjectKeyError),

    /// The query estimating the database size failed.
    #[error("failed to get database file size: {0}")]
    FileSizeReadFailed(sqlx::Error),
}
204
/// Caches an estimate of the database size so that reading it never hits the database.
///
/// Clones share the same cached value through the inner `Arc`.
#[derive(Debug, Clone)]
struct DiskUsage {
    /// Pool used to run the size estimation query.
    db: Pool<Sqlite>,
    /// Most recent estimate, in bytes (used pages multiplied by the page size).
    last_known_usage: Arc<AtomicU64>,
    /// Pause between two consecutive background refreshes.
    refresh_frequency: Duration,
    /// Partition id rendered as a string, attached as a tag to the usage gauge.
    partition_tag: String,
}
212
213impl DiskUsage {
214 fn new(partition_id: u8, db: Pool<Sqlite>, refresh_frequency: Duration) -> Self {
216 Self {
217 db,
218 last_known_usage: Arc::new(AtomicU64::new(0)),
219 refresh_frequency,
220 partition_tag: partition_id.to_string(),
221 }
222 }
223
224 pub async fn prepare(
227 partition_id: u8,
228 db: Pool<Sqlite>,
229 refresh_frequency: Duration,
230 ) -> Result<Self, SqliteEnvelopeStoreError> {
231 let disk_usage = Self::new(partition_id, db.clone(), refresh_frequency);
232 let usage = Self::estimate_usage(&disk_usage.partition_tag, &db).await?;
233 disk_usage.last_known_usage.store(usage, Ordering::Relaxed);
234 disk_usage.start_background_refresh();
235
236 Ok(disk_usage)
237 }
238
239 fn usage(&self) -> u64 {
242 self.last_known_usage.load(Ordering::Relaxed)
243 }
244
245 fn start_background_refresh(&self) {
247 let db = self.db.clone();
248 let last_known_usage_weak = Arc::downgrade(&self.last_known_usage);
251 let refresh_frequency = self.refresh_frequency;
252
253 let partition_tag = self.partition_tag.clone();
254 relay_system::spawn!(async move {
255 loop {
256 let Some(last_known_usage) = last_known_usage_weak.upgrade() else {
259 break;
260 };
261
262 let usage = Self::estimate_usage(&partition_tag, &db).await;
263 let Ok(usage) = usage else {
264 relay_log::error!("failed to update the disk usage asynchronously");
265 return;
266 };
267
268 let current = last_known_usage.load(Ordering::Relaxed);
269 if last_known_usage
270 .compare_exchange_weak(current, usage, Ordering::Relaxed, Ordering::Relaxed)
271 .is_err()
272 {
273 relay_log::error!("failed to update the disk usage asynchronously");
274 };
275
276 sleep(refresh_frequency).await;
277 }
278 });
279 }
280
281 async fn estimate_usage(
283 partition_tag: &str,
284 db: &Pool<Sqlite>,
285 ) -> Result<u64, SqliteEnvelopeStoreError> {
286 let usage: i64 = build_estimate_size()
287 .fetch_one(db)
288 .await
289 .and_then(|r| r.try_get(0))
290 .map_err(SqliteEnvelopeStoreError::FileSizeReadFailed)?;
291
292 relay_statsd::metric!(
293 gauge(RelayGauges::BufferDiskUsed) = usage as u64,
294 partition_id = partition_tag
295 );
296
297 Ok(usage as u64)
298 }
299}
300
/// Envelope store that keeps batches of envelopes in a SQLite database.
#[derive(Debug, Clone)]
pub struct SqliteEnvelopeStore {
    /// Pool through which all queries of the store are run.
    db: Pool<Sqlite>,
    /// Cached estimate of the space used by the database.
    disk_usage: DiskUsage,
    /// Partition id rendered as a string, attached as a tag to the write timer.
    partition_tag: String,
}
311
312impl SqliteEnvelopeStore {
313 pub fn new(partition_id: u8, db: Pool<Sqlite>, refresh_frequency: Duration) -> Self {
315 Self {
316 db: db.clone(),
317 disk_usage: DiskUsage::new(partition_id, db, refresh_frequency),
318 partition_tag: partition_id.to_string(),
319 }
320 }
321
322 pub async fn prepare(
325 partition_id: u8,
326 config: &Config,
327 ) -> Result<SqliteEnvelopeStore, SqliteEnvelopeStoreError> {
328 let Some(path) = config.spool_envelopes_path(partition_id) else {
330 return Err(SqliteEnvelopeStoreError::NoFilePath);
331 };
332
333 relay_log::info!("buffer file {}", path.to_string_lossy());
334
335 Self::setup(&path).await?;
336
337 let options = SqliteConnectOptions::new()
338 .filename(&path)
339 .journal_mode(SqliteJournalMode::Wal)
348 .synchronous(SqliteSynchronous::Normal)
352 .auto_vacuum(SqliteAutoVacuum::Full)
358 .shared_cache(true);
362
363 let db = SqlitePoolOptions::new()
364 .max_connections(1)
365 .min_connections(1)
366 .connect_with(options)
367 .await
368 .map_err(SqliteEnvelopeStoreError::SqlxSetupFailed)?;
369
370 Ok(SqliteEnvelopeStore {
371 db: db.clone(),
372 disk_usage: DiskUsage::prepare(
373 partition_id,
374 db,
375 config.spool_disk_usage_refresh_frequency_ms(),
376 )
377 .await?,
378 partition_tag: partition_id.to_string(),
379 })
380 }
381
382 async fn setup(path: &Path) -> Result<(), SqliteEnvelopeStoreError> {
387 Self::create_spool_directory(path).await?;
388
389 let options = SqliteConnectOptions::new()
390 .filename(path)
391 .journal_mode(SqliteJournalMode::Wal)
392 .create_if_missing(true);
393
394 let db = SqlitePoolOptions::new()
395 .connect_with(options)
396 .await
397 .map_err(SqliteEnvelopeStoreError::SqlxSetupFailed)?;
398
399 sqlx::migrate!("../migrations")
400 .run(&db)
401 .await
402 .map_err(SqliteEnvelopeStoreError::MigrationError)?;
403
404 Ok(())
405 }
406
407 async fn create_spool_directory(path: &Path) -> Result<(), SqliteEnvelopeStoreError> {
409 let Some(parent) = path.parent() else {
410 return Ok(());
411 };
412
413 if !parent.as_os_str().is_empty() && !parent.exists() {
414 relay_log::debug!("creating directory for spooling file: {}", parent.display());
415 DirBuilder::new()
416 .recursive(true)
417 .create(&parent)
418 .await
419 .map_err(SqliteEnvelopeStoreError::FileSetupError)?;
420 }
421
422 Ok(())
423 }
424
425 pub async fn insert_batch(
427 &mut self,
428 envelopes: DatabaseBatch,
429 ) -> Result<(), SqliteEnvelopeStoreError> {
430 let DatabaseBatch {
431 received_at,
432 own_key,
433 sampling_key,
434 envelopes,
435 } = envelopes;
436
437 let count = envelopes.len();
438 let encoded = match count {
439 0 => {
440 debug_assert!(false, "should not be called with empty batch");
441 return Ok(());
442 }
443 1 => envelopes.into_iter().next().unwrap().encoded_envelope,
446 _more => pack_envelopes(envelopes),
447 };
448
449 let query = sqlx::query("INSERT INTO envelopes (received_at, own_key, sampling_key, count, envelope) VALUES (?, ?, ?, ?, ?);")
450 .bind(received_at)
451 .bind(own_key.as_str())
452 .bind(sampling_key.as_str())
453 .bind(count as u16)
454 .bind(encoded);
455
456 relay_statsd::metric!(
457 timer(RelayTimers::BufferSqlWrite),
458 partition_id = &self.partition_tag,
459 {
460 query
461 .execute(&self.db)
462 .await
463 .map_err(SqliteEnvelopeStoreError::WriteError)?;
464 }
465 );
466 Ok(())
467 }
468
469 pub async fn delete_batch(
471 &mut self,
472 own_key: ProjectKey,
473 sampling_key: ProjectKey,
474 ) -> Result<Option<DatabaseBatch>, SqliteEnvelopeStoreError> {
475 let mut rows = build_delete_and_fetch_many_envelopes(own_key, sampling_key).fetch(&self.db);
476 let Some(row) = rows.as_mut().next().await else {
477 return Ok(None);
478 };
479 let row = row.map_err(SqliteEnvelopeStoreError::FetchError)?;
480
481 Ok(Some(extract_batch(own_key, sampling_key, row)?))
482 }
483
484 pub async fn project_key_pairs(
487 &self,
488 ) -> Result<HashSet<ProjectKeyPair>, SqliteEnvelopeStoreError> {
489 let project_key_pairs = build_get_project_key_pairs()
490 .fetch_all(&self.db)
491 .await
492 .map_err(SqliteEnvelopeStoreError::FetchError)?;
493
494 let project_key_pairs = project_key_pairs
495 .into_iter()
496 .filter_map(|project_key_pair| extract_project_key_pair(project_key_pair).ok())
498 .collect();
499
500 Ok(project_key_pairs)
501 }
502
503 pub fn usage(&self) -> u64 {
505 self.disk_usage.usage()
506 }
507
508 pub async fn total_count(&self) -> Result<u64, SqliteEnvelopeStoreError> {
510 let row = build_count_all()
511 .fetch_one(&self.db)
512 .await
513 .map_err(SqliteEnvelopeStoreError::FetchError)?;
514
515 let total_count: i64 = row.get(0);
516 Ok(total_count as u64)
517 }
518}
519
520fn pack_envelopes(envelopes: Vec<DatabaseEnvelope>) -> Box<[u8]> {
521 let mut packed = vec![];
522 for envelope in envelopes {
523 packed.extend_from_slice(&envelope.received_at.to_le_bytes());
524 packed.extend_from_slice(&(envelope.encoded_envelope.len() as u32).to_le_bytes());
525 packed.extend_from_slice(&envelope.encoded_envelope);
526 }
527 packed.into_boxed_slice()
528}
529
530fn unpack_envelopes(
531 own_key: ProjectKey,
532 sampling_key: ProjectKey,
533 data: &[u8],
534) -> Result<Vec<DatabaseEnvelope>, std::io::Error> {
535 let mut envelopes = vec![];
536 let mut buf = data.reader();
537 loop {
538 let mut b = [0u8; 8];
539 match buf.read(&mut b)? {
540 0 => break,
542 n if n != b.len() => return Err(ErrorKind::UnexpectedEof.into()),
544 _ => {}
545 }
546 let received_at = i64::from_le_bytes(b);
547
548 let mut b = [0u8; 4];
549 buf.read_exact(&mut b)?;
550 let size = u32::from_le_bytes(b);
551
552 let mut b = vec![0u8; size as usize];
553 buf.read_exact(&mut b)?;
554
555 envelopes.push(DatabaseEnvelope {
556 received_at,
557 own_key,
558 sampling_key,
559 encoded_envelope: b.into_boxed_slice(),
560 });
561 }
562 Ok(envelopes)
563}
564
565fn extract_batch(
567 own_key: ProjectKey,
568 sampling_key: ProjectKey,
569 row: SqliteRow,
570) -> Result<DatabaseBatch, SqliteEnvelopeStoreError> {
571 let received_at: i64 = row
572 .try_get("received_at")
573 .map_err(SqliteEnvelopeStoreError::FetchError)?;
574 let data: Box<[u8]> = row
575 .try_get("envelope")
576 .map_err(SqliteEnvelopeStoreError::FetchError)?;
577 let count: u64 = row
578 .try_get("count")
579 .map_err(SqliteEnvelopeStoreError::FetchError)?;
580
581 let envelopes = match count {
582 0 => {
583 debug_assert!(false, "db should not contain empty row");
584 vec![]
585 }
586 1 => vec![DatabaseEnvelope {
587 received_at,
588 own_key,
589 sampling_key,
590 encoded_envelope: data,
591 }],
592 _more => unpack_envelopes(own_key, sampling_key, &data)?,
593 };
594
595 Ok(DatabaseBatch {
596 received_at,
597 own_key,
598 sampling_key,
599 envelopes,
600 })
601}
602
603fn extract_project_key_pair(row: SqliteRow) -> Result<ProjectKeyPair, SqliteEnvelopeStoreError> {
605 let own_key = row
606 .try_get("own_key")
607 .map_err(SqliteEnvelopeStoreError::FetchError)
608 .and_then(|key| {
609 ProjectKey::parse(key).map_err(SqliteEnvelopeStoreError::ProjectKeyExtractionError)
610 });
611 let sampling_key = row
612 .try_get("sampling_key")
613 .map_err(SqliteEnvelopeStoreError::FetchError)
614 .and_then(|key| {
615 ProjectKey::parse(key).map_err(SqliteEnvelopeStoreError::ProjectKeyExtractionError)
616 });
617
618 match (own_key, sampling_key) {
619 (Ok(own_key), Ok(sampling_key)) => Ok(ProjectKeyPair::new(own_key, sampling_key)),
620 (Err(err), _) | (_, Err(err)) => {
622 relay_log::error!("failed to extract a queue key from the spool record: {err}");
623
624 Err(err)
625 }
626 }
627}
628
/// Builds the query that deletes one row for the given key pair and returns its columns.
///
/// The inner `SELECT` picks, among the rows matching both keys, the one with the
/// greatest `received_at`, so a single execution removes at most one row. Note that
/// `project_key` is bound to the `sampling_key` placeholder.
pub fn build_delete_and_fetch_many_envelopes<'a>(
    own_key: ProjectKey,
    project_key: ProjectKey,
) -> Query<'a, Sqlite, SqliteArguments<'a>> {
    sqlx::query(
        "DELETE FROM
 envelopes
 WHERE id IN (SELECT id FROM envelopes WHERE own_key = ? AND sampling_key = ?
 ORDER BY received_at DESC LIMIT 1)
 RETURNING
 received_at, own_key, sampling_key, envelope, count",
    )
    .bind(own_key.to_string())
    .bind(project_key.to_string())
}
645
/// Builds the query that estimates the space used by the database, in bytes.
///
/// The estimate is the number of pages that are not on the free list multiplied by the
/// page size, all taken from SQLite pragmas.
pub fn build_estimate_size<'a>() -> Query<'a, Sqlite, SqliteArguments<'a>> {
    sqlx::query(
        r#"SELECT (page_count - freelist_count) * page_size as size FROM pragma_page_count(), pragma_freelist_count(), pragma_page_size();"#,
    )
}
654
/// Builds the query that lists every distinct `(own_key, sampling_key)` combination
/// present in the `envelopes` table.
pub fn build_get_project_key_pairs<'a>() -> Query<'a, Sqlite, SqliteArguments<'a>> {
    sqlx::query("SELECT DISTINCT own_key, sampling_key FROM envelopes;")
}
659
/// Builds the query that sums the `count` column over all rows of the `envelopes` table.
///
/// The query has no `WHERE` clause, so it covers the whole table. SQLite's `SUM`
/// yields NULL rather than `0` when the table has no rows.
pub fn build_count_all<'a>() -> Query<'a, Sqlite, SqliteArguments<'a>> {
    sqlx::query("SELECT SUM(count) FROM envelopes;")
}
667
#[cfg(test)]
mod tests {
    use std::time::Duration;
    use tokio::time::sleep;

    use relay_base_schema::project::ProjectKey;

    use super::*;
    use crate::services::buffer::testutils::utils::{mock_envelopes, setup_db};

    /// Returns the `(own_key, sampling_key)` pair the tests query the store with.
    fn key_pair() -> (ProjectKey, ProjectKey) {
        (
            ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
            ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(),
        )
    }

    /// Converts envelopes into the batch form accepted by `insert_batch`.
    fn to_batch<I, E>(envelopes: I) -> DatabaseBatch
    where
        I: IntoIterator<Item = E>,
        E: AsRef<Envelope>,
    {
        let encoded: Vec<DatabaseEnvelope> = envelopes
            .into_iter()
            .map(|envelope| DatabaseEnvelope::try_from(envelope.as_ref()).unwrap())
            .collect();

        DatabaseBatch::try_from(encoded).unwrap()
    }

    #[tokio::test]
    async fn test_insert_and_delete_envelopes() {
        let db = setup_db(true).await;
        let mut envelope_store = SqliteEnvelopeStore::new(0, db, Duration::from_millis(100));
        let (own_key, sampling_key) = key_pair();

        let batches = [mock_envelopes(5), mock_envelopes(5)];
        for batch in &batches {
            let inserted = envelope_store.insert_batch(to_batch(batch)).await;
            assert!(inserted.is_ok());
        }

        // The batch inserted last is expected to come out first.
        for expected in batches.iter().rev() {
            let batch = envelope_store
                .delete_batch(own_key, sampling_key)
                .await
                .unwrap()
                .unwrap();
            assert_eq!(batch.len(), 5);

            for (extracted_envelope, original) in batch.envelopes.iter().zip(expected) {
                assert_eq!(
                    extracted_envelope.received_at().timestamp_millis(),
                    original.received_at().timestamp_millis()
                );
            }
        }

        // Both batches are gone, so nothing is left for this key pair.
        let leftover = envelope_store
            .delete_batch(own_key, sampling_key)
            .await
            .unwrap();
        assert!(leftover.is_none());
    }

    #[tokio::test]
    async fn test_insert_and_delete_single() {
        let db = setup_db(true).await;
        let mut envelope_store = SqliteEnvelopeStore::new(0, db, Duration::from_millis(100));
        let (own_key, sampling_key) = key_pair();

        let inserted = mock_envelopes(1);
        let outcome = envelope_store.insert_batch(to_batch(&inserted)).await;
        assert!(outcome.is_ok());

        let extracted = envelope_store
            .delete_batch(own_key, sampling_key)
            .await
            .unwrap()
            .unwrap();
        assert_eq!(extracted.len(), 1);
        assert_eq!(
            extracted.envelopes[0].received_at().timestamp_millis(),
            inserted[0].received_at().timestamp_millis()
        );

        // The only batch was removed, so a second delete finds nothing.
        let leftover = envelope_store
            .delete_batch(own_key, sampling_key)
            .await
            .unwrap();
        assert!(leftover.is_none());
    }

    #[tokio::test]
    async fn test_insert_and_get_project_keys_pairs() {
        let db = setup_db(true).await;
        let mut envelope_store = SqliteEnvelopeStore::new(0, db, Duration::from_millis(100));
        let (own_key, sampling_key) = key_pair();

        envelope_store
            .insert_batch(to_batch(mock_envelopes(2)))
            .await
            .unwrap();

        let project_key_pairs = envelope_store.project_key_pairs().await.unwrap();
        assert_eq!(project_key_pairs.len(), 1);

        let only_pair = project_key_pairs.into_iter().next().unwrap();
        assert_eq!(only_pair, ProjectKeyPair::new(own_key, sampling_key));
    }

    #[tokio::test]
    async fn test_estimate_disk_usage() {
        let db = setup_db(true).await;
        let mut store = SqliteEnvelopeStore::new(0, db.clone(), Duration::from_millis(1));
        let disk_usage = DiskUsage::prepare(0, db, Duration::from_millis(1))
            .await
            .unwrap();

        // The initial estimate is read during `prepare`.
        let usage_1 = disk_usage.usage();
        assert!(usage_1 > 0);

        store
            .insert_batch(to_batch(mock_envelopes(10)))
            .await
            .unwrap();

        // Give the background refresh a chance to run.
        sleep(Duration::from_millis(2)).await;

        let usage_2 = disk_usage.usage();
        assert!(usage_2 >= usage_1);
    }

    #[tokio::test]
    async fn test_total_count() {
        let db = setup_db(true).await;
        let mut store = SqliteEnvelopeStore::new(0, db.clone(), Duration::from_millis(1));

        let envelopes = mock_envelopes(10);
        store.insert_batch(to_batch(&envelopes)).await.unwrap();

        let expected = envelopes.len() as u64;
        assert_eq!(store.total_count().await.unwrap(), expected);
    }
}