use std::io::{ErrorKind, Read};
use std::path::Path;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Duration;
use crate::envelope::EnvelopeError;
use crate::services::buffer::common::ProjectKeyPair;
use crate::statsd::{RelayGauges, RelayHistograms, RelayTimers};
use crate::Envelope;
use bytes::{Buf, Bytes};
use chrono::{DateTime, Utc};
use futures::stream::StreamExt;
use hashbrown::HashSet;
use relay_base_schema::project::{ParseProjectKeyError, ProjectKey};
use relay_config::Config;
use serde::{Deserialize, Serialize};
use sqlx::migrate::MigrateError;
use sqlx::query::Query;
use sqlx::sqlite::{
SqliteArguments, SqliteAutoVacuum, SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions,
SqliteRow, SqliteSynchronous,
};
use sqlx::{Pool, Row, Sqlite};
use tokio::fs::DirBuilder;
use tokio::time::sleep;
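/// The first four bytes of a zstd frame, used to detect whether a stored envelope is compressed.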
const ZSTD_MAGIC_WORD: &[u8] = &[40, 181, 47, 253];
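/// An envelope in its database representation: the receive timestamp in milliseconds, the own
/// and sampling project keys, and the serialized envelope payload, optionally zstd-compressed.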
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct DatabaseEnvelope {
received_at: i64,
own_key: ProjectKey,
sampling_key: ProjectKey,
encoded_envelope: Box<[u8]>,
}
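/// A batch of [`DatabaseEnvelope`]s that share the same project key pair and are stored in a
/// single database row.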
#[derive(Clone, Debug)]
pub struct DatabaseBatch {
received_at: i64,
own_key: ProjectKey,
sampling_key: ProjectKey,
envelopes: Vec<DatabaseEnvelope>,
}
impl DatabaseBatch {
pub fn len(&self) -> usize {
self.envelopes.len()
}
}
impl TryFrom<Vec<DatabaseEnvelope>> for DatabaseBatch {
type Error = ();
fn try_from(envelopes: Vec<DatabaseEnvelope>) -> Result<Self, Self::Error> {
let Some(last) = envelopes.last() else {
return Err(());
};
Ok(Self {
received_at: last.received_at,
own_key: last.own_key,
sampling_key: last.sampling_key,
envelopes,
})
}
}
impl From<DatabaseBatch> for Vec<DatabaseEnvelope> {
fn from(value: DatabaseBatch) -> Self {
value.envelopes
}
}
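/// An error returned when converting an [`Envelope`] to or from its database representation.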
#[derive(Debug, thiserror::Error)]
pub enum InsertEnvelopeError {
#[error("envelope conversion error: {0}")]
Envelope(#[from] EnvelopeError),
#[error("compression error: {0}")]
Zstd(#[from] std::io::Error),
}
impl DatabaseEnvelope {
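    /// zstd compression level used when encoding envelopes; level 1 favors speed over ratio.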
const COMPRESSION_LEVEL: i32 = 1;
pub fn len(&self) -> usize {
self.encoded_envelope.len()
}
pub fn received_at(&self) -> DateTime<Utc> {
        DateTime::from_timestamp_millis(self.received_at).unwrap_or_else(Utc::now)
}
}
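/// Decompresses the payload if it starts with the zstd magic word, parses it back into an
/// [`Envelope`], and restores the original receive time.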
impl TryFrom<DatabaseEnvelope> for Box<Envelope> {
type Error = InsertEnvelopeError;
fn try_from(value: DatabaseEnvelope) -> Result<Self, Self::Error> {
let received_at = value.received_at();
let DatabaseEnvelope {
received_at: _,
own_key,
sampling_key,
mut encoded_envelope,
} = value;
if encoded_envelope.starts_with(ZSTD_MAGIC_WORD) {
relay_statsd::metric!(timer(RelayTimers::BufferEnvelopeDecompression), {
encoded_envelope = zstd::decode_all(&*encoded_envelope)?.into_boxed_slice();
});
}
let mut envelope = Envelope::parse_bytes(Bytes::from(encoded_envelope))?;
debug_assert_eq!(envelope.meta().public_key(), own_key);
debug_assert!(envelope
.sampling_key()
.map_or(true, |key| key == sampling_key));
envelope.set_received_at(received_at);
Ok(envelope)
}
}
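/// Serializes an [`Envelope`] and compresses it with zstd before it is spooled to disk.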
impl<'a> TryFrom<&'a Envelope> for DatabaseEnvelope {
type Error = InsertEnvelopeError;
fn try_from(value: &'a Envelope) -> Result<Self, Self::Error> {
let own_key = value.meta().public_key();
let sampling_key = value.sampling_key().unwrap_or(own_key);
let serialized_envelope = value.to_vec()?;
relay_statsd::metric!(
histogram(RelayHistograms::BufferEnvelopeSize) = serialized_envelope.len() as u64
);
let encoded_envelope =
relay_statsd::metric!(timer(RelayTimers::BufferEnvelopeCompression), {
zstd::encode_all(serialized_envelope.as_slice(), Self::COMPRESSION_LEVEL)?
});
relay_statsd::metric!(
histogram(RelayHistograms::BufferEnvelopeSizeCompressed) =
encoded_envelope.len() as u64
);
Ok(DatabaseEnvelope {
received_at: value.received_at().timestamp_millis(),
own_key,
sampling_key,
encoded_envelope: encoded_envelope.into_boxed_slice(),
})
}
}
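/// Errors that can occur while setting up or accessing the SQLite-based envelope store.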
#[derive(Debug, thiserror::Error)]
pub enum SqliteEnvelopeStoreError {
#[error("failed to setup the database: {0}")]
SqlxSetupFailed(sqlx::Error),
#[error("failed to create the spool file: {0}")]
FileSetupError(std::io::Error),
#[error("failed to write to disk: {0}")]
WriteError(sqlx::Error),
#[error("failed to read from disk: {0}")]
FetchError(sqlx::Error),
#[error("failed to unpack envelopes: {0}")]
UnpackError(#[from] std::io::Error),
#[error("no file path for the spool was provided")]
NoFilePath,
#[error("failed to migrate the database: {0}")]
MigrationError(MigrateError),
#[error("failed to extract the envelope from the database")]
EnvelopeExtractionError,
#[error("failed to extract a project key from the database")]
ProjectKeyExtractionError(#[from] ParseProjectKeyError),
#[error("failed to get database file size: {0}")]
FileSizeReadFailed(sqlx::Error),
}
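/// Tracks an estimate of the database size, periodically refreshed by a background task.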
#[derive(Debug, Clone)]
struct DiskUsage {
db: Pool<Sqlite>,
last_known_usage: Arc<AtomicU64>,
refresh_frequency: Duration,
partition_tag: String,
}
impl DiskUsage {
fn new(partition_id: u8, db: Pool<Sqlite>, refresh_frequency: Duration) -> Self {
Self {
db,
last_known_usage: Arc::new(AtomicU64::new(0)),
refresh_frequency,
partition_tag: partition_id.to_string(),
}
}
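    /// Creates a [`DiskUsage`] with an initial size estimate and starts the background refresh
    /// task.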
pub async fn prepare(
partition_id: u8,
db: Pool<Sqlite>,
refresh_frequency: Duration,
) -> Result<Self, SqliteEnvelopeStoreError> {
let disk_usage = Self::new(partition_id, db.clone(), refresh_frequency);
let usage = Self::estimate_usage(&disk_usage.partition_tag, &db).await?;
disk_usage.last_known_usage.store(usage, Ordering::Relaxed);
disk_usage.start_background_refresh();
Ok(disk_usage)
}
fn usage(&self) -> u64 {
self.last_known_usage.load(Ordering::Relaxed)
}
fn start_background_refresh(&self) {
let db = self.db.clone();
let last_known_usage_weak = Arc::downgrade(&self.last_known_usage);
let refresh_frequency = self.refresh_frequency;
let partition_tag = self.partition_tag.clone();
relay_system::spawn!(async move {
loop {
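                // Once the owning `DiskUsage` is dropped, the weak reference can no longer be
                // upgraded and the background task stops.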
let Some(last_known_usage) = last_known_usage_weak.upgrade() else {
break;
};
let usage = Self::estimate_usage(&partition_tag, &db).await;
let Ok(usage) = usage else {
relay_log::error!("failed to update the disk usage asynchronously");
return;
};
let current = last_known_usage.load(Ordering::Relaxed);
if last_known_usage
.compare_exchange_weak(current, usage, Ordering::Relaxed, Ordering::Relaxed)
.is_err()
{
relay_log::error!("failed to update the disk usage asynchronously");
};
sleep(refresh_frequency).await;
}
});
}
async fn estimate_usage(
partition_tag: &str,
db: &Pool<Sqlite>,
) -> Result<u64, SqliteEnvelopeStoreError> {
let usage: i64 = build_estimate_size()
.fetch_one(db)
.await
.and_then(|r| r.try_get(0))
.map_err(SqliteEnvelopeStoreError::FileSizeReadFailed)?;
relay_statsd::metric!(
gauge(RelayGauges::BufferDiskUsed) = usage as u64,
partition_id = partition_tag
);
Ok(usage as u64)
}
}
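/// A store that provides access to spooled [`Envelope`]s backed by a SQLite database.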
#[derive(Debug, Clone)]
pub struct SqliteEnvelopeStore {
db: Pool<Sqlite>,
disk_usage: DiskUsage,
partition_tag: String,
}
impl SqliteEnvelopeStore {
pub fn new(partition_id: u8, db: Pool<Sqlite>, refresh_frequency: Duration) -> Self {
Self {
db: db.clone(),
disk_usage: DiskUsage::new(partition_id, db, refresh_frequency),
partition_tag: partition_id.to_string(),
}
}
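    /// Prepares the store for the given partition by creating the spool file, running
    /// migrations, and connecting to the database.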
pub async fn prepare(
partition_id: u8,
config: &Config,
) -> Result<SqliteEnvelopeStore, SqliteEnvelopeStoreError> {
let Some(path) = config.spool_envelopes_path(partition_id) else {
return Err(SqliteEnvelopeStoreError::NoFilePath);
};
relay_log::info!("buffer file {}", path.to_string_lossy());
Self::setup(&path).await?;
let options = SqliteConnectOptions::new()
.filename(&path)
.journal_mode(SqliteJournalMode::Wal)
.synchronous(SqliteSynchronous::Normal)
.auto_vacuum(SqliteAutoVacuum::Full)
.shared_cache(true);
let db = SqlitePoolOptions::new()
.max_connections(1)
.min_connections(1)
.connect_with(options)
.await
.map_err(SqliteEnvelopeStoreError::SqlxSetupFailed)?;
Ok(SqliteEnvelopeStore {
db: db.clone(),
disk_usage: DiskUsage::prepare(
partition_id,
db,
config.spool_disk_usage_refresh_frequency_ms(),
)
.await?,
partition_tag: partition_id.to_string(),
})
}
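    /// Creates the spool directory and database file if missing and runs pending migrations.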
async fn setup(path: &Path) -> Result<(), SqliteEnvelopeStoreError> {
Self::create_spool_directory(path).await?;
let options = SqliteConnectOptions::new()
.filename(path)
.journal_mode(SqliteJournalMode::Wal)
.create_if_missing(true);
let db = SqlitePoolOptions::new()
.connect_with(options)
.await
.map_err(SqliteEnvelopeStoreError::SqlxSetupFailed)?;
sqlx::migrate!("../migrations")
.run(&db)
.await
.map_err(SqliteEnvelopeStoreError::MigrationError)?;
Ok(())
}
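    /// Creates the parent directory of the spool file if it does not exist yet.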
async fn create_spool_directory(path: &Path) -> Result<(), SqliteEnvelopeStoreError> {
let Some(parent) = path.parent() else {
return Ok(());
};
if !parent.as_os_str().is_empty() && !parent.exists() {
relay_log::debug!("creating directory for spooling file: {}", parent.display());
DirBuilder::new()
.recursive(true)
.create(&parent)
.await
.map_err(SqliteEnvelopeStoreError::FileSetupError)?;
}
Ok(())
}
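    /// Inserts a batch of envelopes as a single row, packing multiple envelopes into one blob.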
pub async fn insert_batch(
&mut self,
envelopes: DatabaseBatch,
) -> Result<(), SqliteEnvelopeStoreError> {
let DatabaseBatch {
received_at,
own_key,
sampling_key,
envelopes,
} = envelopes;
let count = envelopes.len();
let encoded = match count {
0 => {
debug_assert!(false, "should not be called with empty batch");
return Ok(());
}
1 => envelopes.into_iter().next().unwrap().encoded_envelope,
_more => pack_envelopes(envelopes),
};
let query = sqlx::query("INSERT INTO envelopes (received_at, own_key, sampling_key, count, envelope) VALUES (?, ?, ?, ?, ?);")
.bind(received_at)
.bind(own_key.as_str())
.bind(sampling_key.as_str())
.bind(count as u16)
.bind(encoded);
relay_statsd::metric!(
timer(RelayTimers::BufferSqlWrite),
partition_id = &self.partition_tag,
{
query
.execute(&self.db)
.await
.map_err(SqliteEnvelopeStoreError::WriteError)?;
}
);
Ok(())
}
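    /// Deletes and returns the most recently received batch for the given project key pair, or
    /// `None` if no batch is spooled for that pair.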
pub async fn delete_batch(
&mut self,
own_key: ProjectKey,
sampling_key: ProjectKey,
) -> Result<Option<DatabaseBatch>, SqliteEnvelopeStoreError> {
let mut rows = build_delete_and_fetch_many_envelopes(own_key, sampling_key).fetch(&self.db);
let Some(row) = rows.as_mut().next().await else {
return Ok(None);
};
let row = row.map_err(SqliteEnvelopeStoreError::FetchError)?;
Ok(Some(extract_batch(own_key, sampling_key, row)?))
}
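    /// Returns the set of distinct project key pairs that currently have spooled envelopes.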
pub async fn project_key_pairs(
&self,
) -> Result<HashSet<ProjectKeyPair>, SqliteEnvelopeStoreError> {
let project_key_pairs = build_get_project_key_pairs()
.fetch_all(&self.db)
.await
.map_err(SqliteEnvelopeStoreError::FetchError)?;
let project_key_pairs = project_key_pairs
.into_iter()
.filter_map(|project_key_pair| extract_project_key_pair(project_key_pair).ok())
.collect();
Ok(project_key_pairs)
}
pub fn usage(&self) -> u64 {
self.disk_usage.usage()
}
pub async fn total_count(&self) -> Result<u64, SqliteEnvelopeStoreError> {
let row = build_count_all()
.fetch_one(&self.db)
.await
.map_err(SqliteEnvelopeStoreError::FetchError)?;
let total_count: i64 = row.get(0);
Ok(total_count as u64)
}
}
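/// Packs multiple envelopes into a single blob: for each envelope, the little-endian receive
/// timestamp (8 bytes), the payload length (4 bytes), and the payload itself.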
fn pack_envelopes(envelopes: Vec<DatabaseEnvelope>) -> Box<[u8]> {
let mut packed = vec![];
for envelope in envelopes {
packed.extend_from_slice(&envelope.received_at.to_le_bytes());
packed.extend_from_slice(&(envelope.encoded_envelope.len() as u32).to_le_bytes());
packed.extend_from_slice(&envelope.encoded_envelope);
}
packed.into_boxed_slice()
}
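/// Splits a blob produced by [`pack_envelopes`] back into individual [`DatabaseEnvelope`]s.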
fn unpack_envelopes(
own_key: ProjectKey,
sampling_key: ProjectKey,
data: &[u8],
) -> Result<Vec<DatabaseEnvelope>, std::io::Error> {
let mut envelopes = vec![];
let mut buf = data.reader();
loop {
let mut b = [0u8; 8];
match buf.read(&mut b)? {
0 => break,
n if n != b.len() => return Err(ErrorKind::UnexpectedEof.into()),
_ => {}
}
let received_at = i64::from_le_bytes(b);
let mut b = [0u8; 4];
buf.read_exact(&mut b)?;
let size = u32::from_le_bytes(b);
let mut b = vec![0u8; size as usize];
buf.read_exact(&mut b)?;
envelopes.push(DatabaseEnvelope {
received_at,
own_key,
sampling_key,
encoded_envelope: b.into_boxed_slice(),
});
}
Ok(envelopes)
}
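/// Converts a database row into a [`DatabaseBatch`], unpacking the blob if it contains more
/// than one envelope.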
fn extract_batch(
own_key: ProjectKey,
sampling_key: ProjectKey,
row: SqliteRow,
) -> Result<DatabaseBatch, SqliteEnvelopeStoreError> {
let received_at: i64 = row
.try_get("received_at")
.map_err(SqliteEnvelopeStoreError::FetchError)?;
let data: Box<[u8]> = row
.try_get("envelope")
.map_err(SqliteEnvelopeStoreError::FetchError)?;
let count: u64 = row
.try_get("count")
.map_err(SqliteEnvelopeStoreError::FetchError)?;
let envelopes = match count {
0 => {
debug_assert!(false, "db should not contain empty row");
vec![]
}
1 => vec![DatabaseEnvelope {
received_at,
own_key,
sampling_key,
encoded_envelope: data,
}],
_more => unpack_envelopes(own_key, sampling_key, &data)?,
};
Ok(DatabaseBatch {
received_at,
own_key,
sampling_key,
envelopes,
})
}
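/// Extracts the own and sampling project keys from a database row.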
fn extract_project_key_pair(row: SqliteRow) -> Result<ProjectKeyPair, SqliteEnvelopeStoreError> {
let own_key = row
.try_get("own_key")
.map_err(SqliteEnvelopeStoreError::FetchError)
.and_then(|key| {
ProjectKey::parse(key).map_err(SqliteEnvelopeStoreError::ProjectKeyExtractionError)
});
let sampling_key = row
.try_get("sampling_key")
.map_err(SqliteEnvelopeStoreError::FetchError)
.and_then(|key| {
ProjectKey::parse(key).map_err(SqliteEnvelopeStoreError::ProjectKeyExtractionError)
});
match (own_key, sampling_key) {
(Ok(own_key), Ok(sampling_key)) => Ok(ProjectKeyPair::new(own_key, sampling_key)),
(Err(err), _) | (_, Err(err)) => {
relay_log::error!("failed to extract a queue key from the spool record: {err}");
Err(err)
}
}
}
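/// Builds a query that deletes the most recently received row for the given project key pair
/// and returns its contents.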
pub fn build_delete_and_fetch_many_envelopes<'a>(
own_key: ProjectKey,
    sampling_key: ProjectKey,
) -> Query<'a, Sqlite, SqliteArguments<'a>> {
sqlx::query(
"DELETE FROM
envelopes
WHERE id IN (SELECT id FROM envelopes WHERE own_key = ? AND sampling_key = ?
ORDER BY received_at DESC LIMIT 1)
RETURNING
received_at, own_key, sampling_key, envelope, count",
)
.bind(own_key.to_string())
    .bind(sampling_key.to_string())
}
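/// Builds a query that estimates the database size in bytes from the SQLite page pragmas.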
pub fn build_estimate_size<'a>() -> Query<'a, Sqlite, SqliteArguments<'a>> {
sqlx::query(
r#"SELECT (page_count - freelist_count) * page_size as size FROM pragma_page_count(), pragma_freelist_count(), pragma_page_size();"#,
)
}
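/// Builds a query that returns all distinct project key pairs in the spool.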
pub fn build_get_project_key_pairs<'a>() -> Query<'a, Sqlite, SqliteArguments<'a>> {
sqlx::query("SELECT DISTINCT own_key, sampling_key FROM envelopes;")
}
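/// Builds a query that sums the envelope counts of all rows in the spool.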
pub fn build_count_all<'a>() -> Query<'a, Sqlite, SqliteArguments<'a>> {
sqlx::query("SELECT SUM(count) FROM envelopes;")
}
#[cfg(test)]
mod tests {
use std::time::Duration;
use tokio::time::sleep;
use relay_base_schema::project::ProjectKey;
use super::*;
use crate::services::buffer::testutils::utils::{mock_envelopes, setup_db};
#[tokio::test]
async fn test_insert_and_delete_envelopes() {
let db = setup_db(true).await;
let mut envelope_store = SqliteEnvelopeStore::new(0, db, Duration::from_millis(100));
let own_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
let sampling_key = ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap();
let batches = [mock_envelopes(5), mock_envelopes(5)];
for batch in &batches {
assert!(envelope_store
.insert_batch(
batch
.iter()
.map(|e| DatabaseEnvelope::try_from(e.as_ref()).unwrap())
.collect::<Vec<_>>()
.try_into()
.unwrap()
)
.await
.is_ok());
}
let batch = envelope_store
.delete_batch(own_key, sampling_key)
.await
.unwrap()
.unwrap();
assert_eq!(batch.len(), 5);
for (i, extracted_envelope) in batch.envelopes.iter().enumerate() {
assert_eq!(
extracted_envelope.received_at().timestamp_millis(),
                batches[1][i].received_at().timestamp_millis()
);
}
let batch = envelope_store
.delete_batch(own_key, sampling_key)
.await
.unwrap()
.unwrap();
assert_eq!(batch.len(), 5);
for (i, extracted_envelope) in batch.envelopes.iter().enumerate() {
assert_eq!(
extracted_envelope.received_at().timestamp_millis(),
                batches[0][i].received_at().timestamp_millis()
);
}
assert!(envelope_store
.delete_batch(own_key, sampling_key)
.await
.unwrap()
.is_none());
}
#[tokio::test]
async fn test_insert_and_delete_single() {
let db = setup_db(true).await;
let mut envelope_store = SqliteEnvelopeStore::new(0, db, Duration::from_millis(100));
let own_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
let sampling_key = ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap();
let inserted = mock_envelopes(1);
assert!(envelope_store
.insert_batch(
inserted
.iter()
.map(|e| DatabaseEnvelope::try_from(e.as_ref()).unwrap())
.collect::<Vec<_>>()
.try_into()
.unwrap()
)
.await
.is_ok());
let extracted = envelope_store
.delete_batch(own_key, sampling_key)
.await
.unwrap()
.unwrap();
assert_eq!(extracted.len(), 1);
assert_eq!(
extracted.envelopes[0].received_at().timestamp_millis(),
inserted[0].received_at().timestamp_millis()
);
assert!(envelope_store
.delete_batch(own_key, sampling_key)
.await
.unwrap()
.is_none());
}
#[tokio::test]
async fn test_insert_and_get_project_keys_pairs() {
let db = setup_db(true).await;
let mut envelope_store = SqliteEnvelopeStore::new(0, db, Duration::from_millis(100));
let own_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
let sampling_key = ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap();
let envelopes = mock_envelopes(2);
envelope_store
.insert_batch(
envelopes
.into_iter()
.map(|e| DatabaseEnvelope::try_from(e.as_ref()).unwrap())
.collect::<Vec<_>>()
.try_into()
.unwrap(),
)
.await
.unwrap();
let project_key_pairs = envelope_store.project_key_pairs().await.unwrap();
assert_eq!(project_key_pairs.len(), 1);
assert_eq!(
project_key_pairs.into_iter().last().unwrap(),
ProjectKeyPair::new(own_key, sampling_key)
);
}
#[tokio::test]
async fn test_estimate_disk_usage() {
let db = setup_db(true).await;
let mut store = SqliteEnvelopeStore::new(0, db.clone(), Duration::from_millis(1));
let disk_usage = DiskUsage::prepare(0, db, Duration::from_millis(1))
.await
.unwrap();
let usage_1 = disk_usage.usage();
assert!(usage_1 > 0);
let envelopes = mock_envelopes(10);
store
.insert_batch(
envelopes
.into_iter()
.map(|e| DatabaseEnvelope::try_from(e.as_ref()).unwrap())
.collect::<Vec<_>>()
.try_into()
.unwrap(),
)
.await
.unwrap();
sleep(Duration::from_millis(2)).await;
let usage_2 = disk_usage.usage();
assert!(usage_2 >= usage_1);
}
#[tokio::test]
async fn test_total_count() {
let db = setup_db(true).await;
let mut store = SqliteEnvelopeStore::new(0, db.clone(), Duration::from_millis(1));
let envelopes = mock_envelopes(10);
store
.insert_batch(
envelopes
.iter()
.map(|e| DatabaseEnvelope::try_from(e.as_ref()).unwrap())
.collect::<Vec<_>>()
.try_into()
.unwrap(),
)
.await
.unwrap();
assert_eq!(store.total_count().await.unwrap(), envelopes.len() as u64);
}
}