use std::error::Error;
use std::path::Path;
use std::pin::pin;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Duration;
use crate::envelope::EnvelopeError;
use crate::extractors::StartTime;
use crate::services::buffer::common::ProjectKeyPair;
use crate::statsd::RelayGauges;
use crate::Envelope;
use futures::stream::StreamExt;
use hashbrown::HashSet;
use relay_base_schema::project::{ParseProjectKeyError, ProjectKey};
use relay_common::time::UnixTimestamp;
use relay_config::Config;
use sqlx::migrate::MigrateError;
use sqlx::query::Query;
use sqlx::sqlite::{
SqliteArguments, SqliteAutoVacuum, SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions,
SqliteRow, SqliteSynchronous,
};
use sqlx::{Pool, QueryBuilder, Row, Sqlite};
use tokio::fs::DirBuilder;
use tokio::time::sleep;
pub struct InsertEnvelope {
received_at: i64,
own_key: ProjectKey,
sampling_key: ProjectKey,
encoded_envelope: Vec<u8>,
}
impl<'a> TryFrom<&'a Envelope> for InsertEnvelope {
type Error = EnvelopeError;
fn try_from(value: &'a Envelope) -> Result<Self, Self::Error> {
let own_key = value.meta().public_key();
let sampling_key = value.sampling_key().unwrap_or(own_key);
let encoded_envelope = match value.to_vec() {
Ok(encoded_envelope) => encoded_envelope,
Err(err) => {
relay_log::error!(
error = &err as &dyn Error,
own_key = own_key.to_string(),
sampling_key = sampling_key.to_string(),
"failed to serialize envelope",
);
return Err(err);
}
};
Ok(InsertEnvelope {
received_at: value.received_at().timestamp_millis(),
own_key,
sampling_key,
encoded_envelope,
})
}
}
/// Errors produced by [`SqliteEnvelopeStore`] and its disk usage tracker.
#[derive(Debug, thiserror::Error)]
pub enum SqliteEnvelopeStoreError {
/// Opening a connection pool to the spool database failed.
#[error("failed to setup the database: {0}")]
SqlxSetupFailed(sqlx::Error),
/// Creating the parent directory of the spool file failed.
#[error("failed to create the spool file: {0}")]
FileSetupError(std::io::Error),
/// Inserting envelopes into the database failed.
#[error("failed to write to disk: {0}")]
WriteError(sqlx::Error),
/// Reading rows (or a column of a row) from the database failed.
#[error("failed to read from disk: {0}")]
FetchError(sqlx::Error),
/// The configuration does not contain a path for the spool file.
#[error("no file path for the spool was provided")]
NoFilePath,
/// Running the schema migrations failed.
#[error("failed to migrate the database: {0}")]
MigrationError(MigrateError),
/// The stored bytes could not be parsed back into an envelope.
#[error("failed to extract the envelope from the database")]
EnvelopeExtractionError,
/// A stored project key could not be parsed.
#[error("failed to extract a project key from the database")]
ProjectKeyExtractionError(#[from] ParseProjectKeyError),
/// Querying the estimated database size failed.
#[error("failed to get database file size: {0}")]
FileSizeReadFailed(sqlx::Error),
}
#[derive(Debug, Clone)]
struct DiskUsage {
db: Pool<Sqlite>,
last_known_usage: Arc<AtomicU64>,
refresh_frequency: Duration,
}
impl DiskUsage {
fn new(db: Pool<Sqlite>, refresh_frequency: Duration) -> Self {
Self {
db,
last_known_usage: Arc::new(AtomicU64::new(0)),
refresh_frequency,
}
}
pub async fn prepare(
db: Pool<Sqlite>,
refresh_frequency: Duration,
) -> Result<Self, SqliteEnvelopeStoreError> {
let usage = Self::estimate_usage(&db).await?;
let disk_usage = Self::new(db, refresh_frequency);
disk_usage.last_known_usage.store(usage, Ordering::Relaxed);
disk_usage.start_background_refresh();
Ok(disk_usage)
}
fn usage(&self) -> u64 {
self.last_known_usage.load(Ordering::Relaxed)
}
fn start_background_refresh(&self) {
let db = self.db.clone();
let last_known_usage_weak = Arc::downgrade(&self.last_known_usage);
let refresh_frequency = self.refresh_frequency;
tokio::spawn(async move {
loop {
let Some(last_known_usage) = last_known_usage_weak.upgrade() else {
break;
};
let usage = Self::estimate_usage(&db).await;
let Ok(usage) = usage else {
relay_log::error!("failed to update the disk usage asynchronously");
return;
};
let current = last_known_usage.load(Ordering::Relaxed);
if last_known_usage
.compare_exchange_weak(current, usage, Ordering::Relaxed, Ordering::Relaxed)
.is_err()
{
relay_log::error!("failed to update the disk usage asynchronously");
};
sleep(refresh_frequency).await;
}
});
}
async fn estimate_usage(db: &Pool<Sqlite>) -> Result<u64, SqliteEnvelopeStoreError> {
let usage: i64 = build_estimate_size()
.fetch_one(db)
.await
.and_then(|r| r.try_get(0))
.map_err(SqliteEnvelopeStoreError::FileSizeReadFailed)?;
relay_statsd::metric!(gauge(RelayGauges::BufferDiskUsed) = usage as u64);
Ok(usage as u64)
}
}
/// Envelope spool backed by an SQLite database.
///
/// Envelopes are stored in the `envelopes` table together with their `own_key` and
/// `sampling_key`, and are removed in batches per key pair.
#[derive(Debug, Clone)]
pub struct SqliteEnvelopeStore {
    db: Pool<Sqlite>,
    disk_usage: DiskUsage,
}

impl SqliteEnvelopeStore {
    /// Creates a store on top of an existing connection pool.
    ///
    /// This neither runs migrations nor starts the background disk usage refresh, so
    /// [`SqliteEnvelopeStore::usage`] reports `0` for a store created this way.
    pub fn new(db: Pool<Sqlite>, refresh_frequency: Duration) -> Self {
        Self {
            db: db.clone(),
            disk_usage: DiskUsage::new(db, refresh_frequency),
        }
    }

    /// Opens the spool database configured in `config`.
    ///
    /// Creates the parent directory and the database file if needed, runs the migrations, and
    /// starts the background disk usage refresh.
    ///
    /// # Errors
    ///
    /// - [`SqliteEnvelopeStoreError::NoFilePath`] if no spool path is configured.
    /// - Any error from directory creation, migrations, connecting the pool, or the initial
    ///   disk usage estimate.
    pub async fn prepare(config: &Config) -> Result<SqliteEnvelopeStore, SqliteEnvelopeStoreError> {
        let Some(path) = config.spool_envelopes_path() else {
            return Err(SqliteEnvelopeStoreError::NoFilePath);
        };

        relay_log::info!("buffer file {}", path.to_string_lossy());

        Self::setup(&path).await?;

        let options = SqliteConnectOptions::new()
            .filename(&path)
            .journal_mode(SqliteJournalMode::Wal)
            .synchronous(SqliteSynchronous::Normal)
            .auto_vacuum(SqliteAutoVacuum::Full)
            .shared_cache(true);

        let db = SqlitePoolOptions::new()
            .max_connections(config.spool_envelopes_max_connections())
            .min_connections(config.spool_envelopes_min_connections())
            .connect_with(options)
            .await
            .map_err(SqliteEnvelopeStoreError::SqlxSetupFailed)?;

        Ok(SqliteEnvelopeStore {
            db: db.clone(),
            disk_usage: DiskUsage::prepare(db, config.spool_disk_usage_refresh_frequency_ms())
                .await?,
        })
    }

    /// Creates the spool directory and database file (if missing) and runs the migrations.
    async fn setup(path: &Path) -> Result<(), SqliteEnvelopeStoreError> {
        Self::create_spool_directory(path).await?;

        let options = SqliteConnectOptions::new()
            .filename(path)
            .journal_mode(SqliteJournalMode::Wal)
            .create_if_missing(true);

        let db = SqlitePoolOptions::new()
            .connect_with(options)
            .await
            .map_err(SqliteEnvelopeStoreError::SqlxSetupFailed)?;

        let migrated = sqlx::migrate!("../migrations")
            .run(&db)
            .await
            .map_err(SqliteEnvelopeStoreError::MigrationError);

        // This pool is only needed for the migrations. Close it gracefully on both success and
        // failure before the caller opens the long-lived pool with different options.
        db.close().await;

        migrated
    }

    /// Creates the parent directory of `path` (recursively) if it does not exist yet.
    async fn create_spool_directory(path: &Path) -> Result<(), SqliteEnvelopeStoreError> {
        let Some(parent) = path.parent() else {
            return Ok(());
        };
        if !parent.as_os_str().is_empty() && !parent.exists() {
            relay_log::debug!("creating directory for spooling file: {}", parent.display());
            DirBuilder::new()
                .recursive(true)
                .create(&parent)
                .await
                .map_err(SqliteEnvelopeStoreError::FileSetupError)?;
        }
        Ok(())
    }

    /// Inserts all `envelopes` with a single multi-row `INSERT`.
    ///
    /// An empty input is a no-op.
    ///
    /// # Errors
    ///
    /// Returns [`SqliteEnvelopeStoreError::WriteError`] (after logging it) if the insert fails.
    pub async fn insert_many(
        &mut self,
        envelopes: impl IntoIterator<Item = InsertEnvelope>,
    ) -> Result<(), SqliteEnvelopeStoreError> {
        let mut envelopes = envelopes.into_iter().peekable();
        // Without any rows the builder renders `INSERT ... VALUES ` with no tuples, which is
        // invalid SQL. Nothing to write, so skip the round-trip entirely.
        if envelopes.peek().is_none() {
            return Ok(());
        }

        if let Err(err) = build_insert_many_envelopes(envelopes)
            .build()
            .execute(&self.db)
            .await
        {
            relay_log::error!(
                error = &err as &dyn Error,
                "failed to spool envelopes to disk",
            );

            return Err(SqliteEnvelopeStoreError::WriteError(err));
        }

        Ok(())
    }

    /// Deletes up to `limit` envelopes stored for `(own_key, sampling_key)` and returns them.
    ///
    /// The query selects the rows with the most recent `received_at`. The returned envelopes
    /// are sorted by receive time in ascending order, at second granularity.
    ///
    /// Rows are deleted before they are decoded. A row that fails to decode is logged and
    /// dropped, which means it is lost.
    ///
    /// # Errors
    ///
    /// Returns [`SqliteEnvelopeStoreError::FetchError`] only if a database error occurred and
    /// no envelope at all could be extracted. Otherwise the error is logged and the partial
    /// result is returned.
    pub async fn delete_many(
        &mut self,
        own_key: ProjectKey,
        sampling_key: ProjectKey,
        limit: i64,
    ) -> Result<Vec<Box<Envelope>>, SqliteEnvelopeStoreError> {
        let envelopes = build_delete_and_fetch_many_envelopes(own_key, sampling_key, limit)
            .fetch(&self.db)
            .peekable();

        let mut envelopes = pin!(envelopes);
        if envelopes.as_mut().peek().await.is_none() {
            return Ok(vec![]);
        }

        // `limit as usize` would turn a negative limit (which SQLite treats as "no limit") into
        // a huge capacity and panic, so only preallocate for non-negative limits.
        let mut extracted_envelopes = Vec::with_capacity(usize::try_from(limit).unwrap_or(0));
        let mut db_error = None;
        while let Some(envelope) = envelopes.as_mut().next().await {
            let envelope = match envelope {
                Ok(envelope) => envelope,
                Err(err) => {
                    relay_log::error!(
                        error = &err as &dyn Error,
                        "failed to unspool the envelopes from the disk",
                    );
                    db_error = Some(err);
                    continue;
                }
            };

            match extract_envelope(envelope) {
                Ok(envelope) => {
                    extracted_envelopes.push(envelope);
                }
                Err(err) => {
                    relay_log::error!(
                        error = &err as &dyn Error,
                        "failed to extract the envelope unspooled from disk",
                    )
                }
            }
        }

        if extracted_envelopes.is_empty() {
            if let Some(db_error) = db_error {
                return Err(SqliteEnvelopeStoreError::FetchError(db_error));
            }
        }

        // `RETURNING` gives no ordering guarantee, so sort explicitly. The fallback is only
        // computed for envelopes whose timestamp cannot be represented.
        extracted_envelopes.sort_by_key(|a| {
            UnixTimestamp::from_datetime(a.received_at()).unwrap_or_else(UnixTimestamp::now)
        });

        Ok(extracted_envelopes)
    }

    /// Returns every distinct `(own_key, sampling_key)` pair currently stored.
    ///
    /// Rows whose keys cannot be read or parsed are skipped; `extract_project_key_pair` logs
    /// them.
    pub async fn project_key_pairs(
        &self,
    ) -> Result<HashSet<ProjectKeyPair>, SqliteEnvelopeStoreError> {
        let project_key_pairs = build_get_project_key_pairs()
            .fetch_all(&self.db)
            .await
            .map_err(SqliteEnvelopeStoreError::FetchError)?;

        let project_key_pairs = project_key_pairs
            .into_iter()
            .filter_map(|project_key_pair| extract_project_key_pair(project_key_pair).ok())
            .collect();

        Ok(project_key_pairs)
    }

    /// Returns the last known disk usage estimate, in bytes.
    pub fn usage(&self) -> u64 {
        self.disk_usage.usage()
    }

    /// Counts all envelopes currently stored.
    ///
    /// # Errors
    ///
    /// Returns [`SqliteEnvelopeStoreError::FetchError`] if the query fails or its result
    /// cannot be decoded.
    pub async fn total_count(&self) -> Result<u64, SqliteEnvelopeStoreError> {
        let row = build_count_all()
            .fetch_one(&self.db)
            .await
            .map_err(SqliteEnvelopeStoreError::FetchError)?;

        // `try_get` instead of `get`: a decode failure becomes an error instead of a panic.
        let total_count: i64 = row
            .try_get(0)
            .map_err(SqliteEnvelopeStoreError::FetchError)?;
        // `COUNT` is never negative; clamp defensively rather than wrap.
        Ok(u64::try_from(total_count).unwrap_or(0))
    }
}
/// Decodes one row returned by the delete-and-fetch query into an [`Envelope`].
///
/// The envelope's start time is restored from the stored `received_at` milliseconds.
fn extract_envelope(row: SqliteRow) -> Result<Box<Envelope>, SqliteEnvelopeStoreError> {
    let raw_envelope: Vec<u8> = row
        .try_get("envelope")
        .map_err(SqliteEnvelopeStoreError::FetchError)?;

    // The parse error itself is not propagated, only the fact that parsing failed.
    let mut envelope = Envelope::parse_bytes(bytes::Bytes::from(raw_envelope))
        .map_err(|_| SqliteEnvelopeStoreError::EnvelopeExtractionError)?;

    let received_at_ms: i64 = row
        .try_get("received_at")
        .map_err(SqliteEnvelopeStoreError::FetchError)?;

    envelope.set_start_time(StartTime::from_timestamp_millis(received_at_ms as u64).into_inner());

    Ok(envelope)
}
fn extract_project_key_pair(row: SqliteRow) -> Result<ProjectKeyPair, SqliteEnvelopeStoreError> {
let own_key = row
.try_get("own_key")
.map_err(SqliteEnvelopeStoreError::FetchError)
.and_then(|key| {
ProjectKey::parse(key).map_err(SqliteEnvelopeStoreError::ProjectKeyExtractionError)
});
let sampling_key = row
.try_get("sampling_key")
.map_err(SqliteEnvelopeStoreError::FetchError)
.and_then(|key| {
ProjectKey::parse(key).map_err(SqliteEnvelopeStoreError::ProjectKeyExtractionError)
});
match (own_key, sampling_key) {
(Ok(own_key), Ok(sampling_key)) => Ok(ProjectKeyPair::new(own_key, sampling_key)),
(Err(err), _) | (_, Err(err)) => {
relay_log::error!("failed to extract a queue key from the spool record: {err}");
Err(err)
}
}
}
fn build_insert_many_envelopes<'a>(
envelopes: impl Iterator<Item = InsertEnvelope>,
) -> QueryBuilder<'a, Sqlite> {
let mut builder: QueryBuilder<Sqlite> =
QueryBuilder::new("INSERT INTO envelopes (received_at, own_key, sampling_key, envelope) ");
builder.push_values(envelopes, |mut b, envelope| {
b.push_bind(envelope.received_at)
.push_bind(envelope.own_key.to_string())
.push_bind(envelope.sampling_key.to_string())
.push_bind(envelope.encoded_envelope);
});
builder
}
/// Builds a query that deletes up to `batch_size` envelopes of the given key pair and returns
/// the deleted rows.
///
/// The rows with the highest `received_at` are selected first.
pub fn build_delete_and_fetch_many_envelopes<'a>(
    own_key: ProjectKey,
    sampling_key: ProjectKey,
    batch_size: i64,
) -> Query<'a, Sqlite, SqliteArguments<'a>> {
    // Each `\n\` keeps the newline and skips the following indentation, so the SQL text is
    // identical to a literal written flush-left.
    const DELETE_AND_RETURN: &str = "DELETE FROM\n\
         envelopes\n\
         WHERE id IN (SELECT id FROM envelopes WHERE own_key = ? AND sampling_key = ?\n\
         ORDER BY received_at DESC LIMIT ?)\n\
         RETURNING\n\
         received_at, own_key, sampling_key, envelope";

    let own_key = own_key.to_string();
    let sampling_key = sampling_key.to_string();

    sqlx::query(DELETE_AND_RETURN)
        .bind(own_key)
        .bind(sampling_key)
        .bind(batch_size)
}
/// Builds a query returning the used size of the database in bytes: the number of non-free
/// pages times the page size.
pub fn build_estimate_size<'a>() -> Query<'a, Sqlite, SqliteArguments<'a>> {
    const ESTIMATE_SIZE: &str = "SELECT (page_count - freelist_count) * page_size as size FROM pragma_page_count(), pragma_freelist_count(), pragma_page_size();";
    sqlx::query(ESTIMATE_SIZE)
}
/// Builds a query listing each distinct `(own_key, sampling_key)` pair in the store.
pub fn build_get_project_key_pairs<'a>() -> Query<'a, Sqlite, SqliteArguments<'a>> {
    const DISTINCT_KEY_PAIRS: &str = "SELECT DISTINCT own_key, sampling_key FROM envelopes;";
    sqlx::query(DISTINCT_KEY_PAIRS)
}
/// Builds a query counting every row of the `envelopes` table.
pub fn build_count_all<'a>() -> Query<'a, Sqlite, SqliteArguments<'a>> {
    const COUNT_ALL: &str = "SELECT COUNT(1) FROM envelopes;";
    sqlx::query(COUNT_ALL)
}
// Tests for the SQLite envelope store. They run against the database provided by the
// `setup_db` test utility and envelopes produced by `mock_envelopes`.
#[cfg(test)]
mod tests {
use std::time::Duration;
use tokio::time::sleep;
use relay_base_schema::project::ProjectKey;
use super::*;
use crate::services::buffer::testutils::utils::{mock_envelopes, setup_db};
// Inserts 10 envelopes, then pops them in two batches. The first batch is expected to hold the
// last five mock envelopes and the second the first five.
// NOTE(review): this relies on `mock_envelopes` producing envelopes with ascending
// `received_at` and using exactly these two project keys — confirm in testutils.
#[tokio::test]
async fn test_insert_and_delete_envelopes() {
let db = setup_db(true).await;
let mut envelope_store = SqliteEnvelopeStore::new(db, Duration::from_millis(100));
let own_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
let sampling_key = ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap();
let envelopes = mock_envelopes(10);
assert!(envelope_store
.insert_many(envelopes.iter().map(|e| e.as_ref().try_into().unwrap()))
.await
.is_ok());
// First batch: the five envelopes picked by `ORDER BY received_at DESC LIMIT 5`.
let extracted_envelopes = envelope_store
.delete_many(own_key, sampling_key, 5)
.await
.unwrap();
assert_eq!(extracted_envelopes.len(), 5);
for (i, extracted_envelope) in extracted_envelopes.iter().enumerate().take(5) {
assert_eq!(extracted_envelope.event_id(), envelopes[5..][i].event_id());
}
// Second batch: asking for 10 only yields the 5 envelopes that are left.
let extracted_envelopes = envelope_store
.delete_many(own_key, sampling_key, 10)
.await
.unwrap();
assert_eq!(extracted_envelopes.len(), 5);
for (i, extracted_envelope) in extracted_envelopes.iter().enumerate().take(5) {
assert_eq!(extracted_envelope.event_id(), envelopes[0..5][i].event_id());
}
}
// Inserting envelopes that share one key pair must yield exactly that pair.
#[tokio::test]
async fn test_insert_and_get_project_keys_pairs() {
let db = setup_db(true).await;
let mut envelope_store = SqliteEnvelopeStore::new(db, Duration::from_millis(100));
let own_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
let sampling_key = ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap();
let envelopes = mock_envelopes(2);
assert!(envelope_store
.insert_many(envelopes.iter().map(|e| e.as_ref().try_into().unwrap()))
.await
.is_ok());
let project_key_pairs = envelope_store.project_key_pairs().await.unwrap();
assert_eq!(project_key_pairs.len(), 1);
assert_eq!(
project_key_pairs.into_iter().last().unwrap(),
ProjectKeyPair::new(own_key, sampling_key)
);
}
// Checks that the initial estimate is non-zero and does not decrease after inserting data.
// NOTE(review): the refresh is timing-based (1ms refresh, 2ms sleep) and the `>=` assertion
// also holds if no refresh happened at all, so this does not prove that the refresh ran.
#[tokio::test]
async fn test_estimate_disk_usage() {
let db = setup_db(true).await;
let mut store = SqliteEnvelopeStore::new(db.clone(), Duration::from_millis(1));
let disk_usage = DiskUsage::prepare(db, Duration::from_millis(1))
.await
.unwrap();
let usage_1 = disk_usage.usage();
assert!(usage_1 > 0);
let envelopes = mock_envelopes(10);
store
.insert_many(envelopes.iter().map(|e| e.as_ref().try_into().unwrap()))
.await
.unwrap();
// Give the background task a chance to refresh the estimate.
sleep(Duration::from_millis(2)).await;
let usage_2 = disk_usage.usage();
assert!(usage_2 >= usage_1);
}
// `total_count` must match the number of inserted envelopes.
#[tokio::test]
async fn test_total_count() {
let db = setup_db(true).await;
let mut store = SqliteEnvelopeStore::new(db.clone(), Duration::from_millis(1));
let envelopes = mock_envelopes(10);
store
.insert_many(envelopes.iter().map(|e| e.as_ref().try_into().unwrap()))
.await
.unwrap();
assert_eq!(store.total_count().await.unwrap(), envelopes.len() as u64);
}
}