// relay_server/services/buffer/envelope_stack/sqlite.rs
use std::fmt::Debug;
use std::num::NonZeroUsize;
use chrono::{DateTime, Utc};
use relay_base_schema::project::ProjectKey;
use crate::envelope::Envelope;
use crate::services::buffer::envelope_stack::EnvelopeStack;
use crate::services::buffer::envelope_store::sqlite::{
DatabaseBatch, DatabaseEnvelope, InsertEnvelopeError, SqliteEnvelopeStore,
SqliteEnvelopeStoreError,
};
use crate::statsd::{RelayCounters, RelayTimers};
/// Errors returned by the operations of a [`SqliteEnvelopeStack`].
///
/// Both variants carry a `#[from]` source, so the wrapped error types convert into this
/// enum automatically when propagated with `?`.
#[derive(Debug, thiserror::Error)]
pub enum SqliteEnvelopeStackError {
/// The underlying [`SqliteEnvelopeStore`] reported a failure.
///
/// Spooling (`insert_batch`) and unspooling (`delete_batch`) surface their store errors
/// through this variant.
#[error("envelope store error: {0}")]
EnvelopeStoreError(#[from] SqliteEnvelopeStoreError),
/// Wraps an [`InsertEnvelopeError`].
///
/// NOTE(review): judging by the display message this is presumably raised when an
/// envelope cannot be encoded for storage — confirm against `InsertEnvelopeError`.
#[error("envelope encode error: {0}")]
Envelope(#[from] InsertEnvelopeError),
}
/// An envelope stack that keeps its most recent envelopes in memory and moves them to a
/// [`SqliteEnvelopeStore`] once the in-memory batch grows past a byte threshold.
///
/// The top of the stack is the last element of `batch`; `peek` and `pop` fall back to the
/// store only when `batch` is empty and `check_disk` is set.
#[derive(Debug)]
pub struct SqliteEnvelopeStack {
/// Store that batches are inserted into when spooling and deleted from when unspooling.
envelope_store: SqliteEnvelopeStore,
/// Threshold that the summed `len()` of all entries in `batch` is compared against.
batch_size_bytes: NonZeroUsize,
/// Own project key of the envelopes in this stack; passed to `delete_batch`.
own_key: ProjectKey,
/// Sampling project key of the envelopes in this stack; passed to `delete_batch`.
sampling_key: ProjectKey,
/// Encoded envelopes currently held in memory, newest last.
batch: Vec<DatabaseEnvelope>,
/// Whether `peek`/`pop` should try to load a batch from the store when `batch` is empty.
///
/// Set to `true` after a successful spool and to `false` when the store returns no batch.
check_disk: bool,
/// Stringified partition id, attached as the `partition_id` tag to emitted metrics.
partition_tag: String,
}
impl SqliteEnvelopeStack {
    /// Creates a stack for the given project key pair, starting with an empty in-memory batch.
    ///
    /// `partition_id` is converted to a string once and reused as the `partition_id` tag on
    /// every metric this stack emits. `check_disk` is the initial value of the flag that
    /// decides whether `peek`/`pop` consult the store when the in-memory batch is empty.
    ///
    /// # Panics
    ///
    /// Panics if `batch_size_bytes` is `0`.
    pub fn new(
        partition_id: u8,
        envelope_store: SqliteEnvelopeStore,
        batch_size_bytes: usize,
        own_key: ProjectKey,
        sampling_key: ProjectKey,
        check_disk: bool,
    ) -> Self {
        let batch_size_bytes =
            NonZeroUsize::new(batch_size_bytes).expect("batch bytes should be > 0");
        let partition_tag = partition_id.to_string();

        Self {
            envelope_store,
            batch_size_bytes,
            own_key,
            sampling_key,
            batch: Vec::new(),
            check_disk,
            partition_tag,
        }
    }

    /// Returns `true` when the summed `len()` of all in-memory envelopes is strictly greater
    /// than the configured `batch_size_bytes`.
    fn above_spool_threshold(&self) -> bool {
        let buffered_bytes: usize = self.batch.iter().map(|encoded| encoded.len()).sum();
        buffered_bytes > self.batch_size_bytes.get()
    }

    /// Moves the whole in-memory batch into the envelope store.
    ///
    /// The batch is taken out of `self` before anything else happens, so the in-memory batch
    /// is empty afterwards regardless of the outcome. If the taken envelopes cannot be turned
    /// into a [`DatabaseBatch`], nothing is written and `Ok(())` is returned. If the insert
    /// fails, the error is returned and the taken envelopes are not put back.
    async fn spool_to_disk(&mut self) -> Result<(), SqliteEnvelopeStackError> {
        let pending = std::mem::take(&mut self.batch);
        let batch = match DatabaseBatch::try_from(pending) {
            Ok(batch) => batch,
            // Nothing that can be written; treated as a successful no-op.
            Err(_) => return Ok(()),
        };

        relay_statsd::metric!(
            counter(RelayCounters::BufferSpooledEnvelopes) += batch.len() as u64,
            partition_id = &self.partition_tag
        );

        relay_statsd::metric!(
            timer(RelayTimers::BufferSpool),
            partition_id = &self.partition_tag,
            {
                // `?` converts the store error through the `#[from]` impl on
                // `SqliteEnvelopeStackError::EnvelopeStoreError`.
                self.envelope_store.insert_batch(batch).await?;
            }
        );

        // The insert succeeded, so an empty in-memory batch no longer means an empty stack.
        self.check_disk = true;

        Ok(())
    }

    /// Loads a batch for this stack's key pair from the envelope store into memory.
    ///
    /// Expects the in-memory batch to be empty (checked in debug builds only). When the store
    /// returns a batch it becomes the new in-memory batch; when it returns `None`,
    /// `check_disk` is cleared so that later `peek`/`pop` calls skip the store. The unspool
    /// counter is incremented by the number of envelopes held in memory afterwards.
    async fn unspool_from_disk(&mut self) -> Result<(), SqliteEnvelopeStackError> {
        debug_assert!(self.batch.is_empty());

        let unspooled = relay_statsd::metric!(
            timer(RelayTimers::BufferUnspool),
            partition_id = &self.partition_tag,
            {
                self.envelope_store
                    .delete_batch(self.own_key, self.sampling_key)
                    .await?
            }
        );

        if let Some(batch) = unspooled {
            self.batch = batch.into();
        } else {
            self.check_disk = false;
        }

        relay_statsd::metric!(
            counter(RelayCounters::BufferUnspooledEnvelopes) += self.batch.len() as u64,
            partition_id = &self.partition_tag
        );

        Ok(())
    }

    /// Returns `true` if the envelope's key pair matches the key pair of this stack.
    ///
    /// An envelope without a sampling key is treated as if its sampling key were its own
    /// public key.
    fn validate_envelope(&self, envelope: &Envelope) -> bool {
        let envelope_own_key = envelope.meta().public_key();
        let envelope_sampling_key = envelope.sampling_key().unwrap_or(envelope_own_key);

        self.own_key == envelope_own_key && self.sampling_key == envelope_sampling_key
    }
}
impl EnvelopeStack for SqliteEnvelopeStack {
type Error = SqliteEnvelopeStackError;
async fn push(&mut self, envelope: Box<Envelope>) -> Result<(), Self::Error> {
debug_assert!(self.validate_envelope(&envelope));
if self.above_spool_threshold() {
self.spool_to_disk().await?;
}
let encoded_envelope = relay_statsd::metric!(
timer(RelayTimers::BufferEnvelopesSerialization),
partition_id = &self.partition_tag,
{ DatabaseEnvelope::try_from(envelope.as_ref())? }
);
self.batch.push(encoded_envelope);
Ok(())
}
async fn peek(&mut self) -> Result<Option<DateTime<Utc>>, Self::Error> {
if self.batch.is_empty() && self.check_disk {
self.unspool_from_disk().await?
}
let Some(envelope) = self.batch.last() else {
return Ok(None);
};
Ok(Some(envelope.received_at()))
}
async fn pop(&mut self) -> Result<Option<Box<Envelope>>, Self::Error> {
if self.batch.is_empty() && self.check_disk {
self.unspool_from_disk().await?
}
let Some(envelope) = self.batch.pop() else {
return Ok(None);
};
let envelope = envelope.try_into()?;
Ok(Some(envelope))
}
async fn flush(mut self) {
if let Err(e) = self.spool_to_disk().await {
relay_log::error!(error = &e as &dyn std::error::Error, "flush error");
}
}
}
#[cfg(test)]
mod tests {
    use chrono::Utc;
    use relay_base_schema::project::ProjectKey;
    use std::time::Duration;

    use super::*;
    use crate::services::buffer::testutils::utils::{mock_envelope, mock_envelopes, setup_db};

    /// Own project key used by every stack built through [`new_stack`].
    const OWN_KEY: &str = "a94ae32be2584e0bbd7a4cbb95971fee";

    /// Sampling project key used by every stack built through [`new_stack`].
    const SAMPLING_KEY: &str = "b81ae32be2584e0bbd7a4cbb95971fe1";

    /// Multiplied by ten to pick a threshold for `test_drain`.
    ///
    /// NOTE(review): presumably the encoded size in bytes of one mock envelope — confirm.
    const COMPRESSED_ENVELOPE_SIZE: usize = 313;

    /// Parses a project key literal, panicking if it is malformed.
    fn key(raw: &str) -> ProjectKey {
        ProjectKey::parse(raw).unwrap()
    }

    /// Builds an envelope store on top of `setup_db(setup_flag)`.
    ///
    /// The flag is forwarded verbatim; the `*_db_is_not_valid` tests pass `false`.
    async fn new_store(setup_flag: bool) -> SqliteEnvelopeStore {
        let db = setup_db(setup_flag).await;
        SqliteEnvelopeStore::new(0, db, Duration::from_millis(100))
    }

    /// Builds a stack on partition `0` with the default key pair and `check_disk` enabled.
    fn new_stack(
        envelope_store: SqliteEnvelopeStore,
        batch_size_bytes: usize,
    ) -> SqliteEnvelopeStack {
        SqliteEnvelopeStack::new(
            0,
            envelope_store,
            batch_size_bytes,
            key(OWN_KEY),
            key(SAMPLING_KEY),
            true,
        )
    }

    /// Pushes a copy of every envelope, in order, asserting that each push succeeds.
    async fn push_all(stack: &mut SqliteEnvelopeStack, envelopes: &[Box<Envelope>]) {
        for envelope in envelopes {
            assert!(stack.push(envelope.clone()).await.is_ok());
        }
    }

    /// Pops `pushed.len()` envelopes and asserts they come out in reverse order of `pushed`,
    /// compared by event id.
    async fn assert_pops_reversed(stack: &mut SqliteEnvelopeStack, pushed: &[Box<Envelope>]) {
        for expected in pushed.iter().rev() {
            let popped = stack.pop().await.unwrap().unwrap();
            assert_eq!(popped.event_id().unwrap(), expected.event_id().unwrap());
        }
    }

    /// Sums the encoded length of the given envelopes.
    fn calculate_compressed_size(envelopes: &[Box<Envelope>]) -> usize {
        let mut total = 0;
        for envelope in envelopes {
            total += DatabaseEnvelope::try_from(envelope.as_ref()).unwrap().len();
        }
        total
    }

    #[tokio::test]
    #[should_panic]
    async fn test_push_with_mismatching_project_keys() {
        let envelope_store = new_store(false).await;
        // The sampling key deliberately differs from the one used by the other tests.
        let mut stack = SqliteEnvelopeStack::new(
            0,
            envelope_store,
            10,
            key(OWN_KEY),
            key("c25ae32be2584e0bbd7a4cbb95971fe1"),
            true,
        );

        let _ = stack.push(mock_envelope(Utc::now())).await;
    }

    #[tokio::test]
    async fn test_push_when_db_is_not_valid() {
        let envelope_store = new_store(false).await;
        let envelopes = mock_envelopes(4);
        let threshold_size = calculate_compressed_size(&envelopes) - 1;
        let mut stack = new_stack(envelope_store, threshold_size);

        // The threshold is checked before each push, so all four envelopes fit in memory.
        push_all(&mut stack, &envelopes).await;

        // The fifth push sees the batch above the threshold and tries to spool, which fails.
        let result = stack.push(mock_envelope(Utc::now())).await;
        assert!(matches!(
            result,
            Err(SqliteEnvelopeStackError::EnvelopeStoreError(_))
        ));

        // The failed spool emptied the in-memory batch, so this envelope is the only entry.
        let envelope = mock_envelope(Utc::now());
        assert!(stack.push(envelope.clone()).await.is_ok());
        assert_eq!(stack.batch.len(), 1);

        let popped = stack.pop().await.unwrap().unwrap();
        assert_eq!(popped.event_id().unwrap(), envelope.event_id().unwrap());
        assert_eq!(stack.batch.len(), 0);
    }

    #[tokio::test]
    async fn test_pop_when_db_is_not_valid() {
        let envelope_store = new_store(false).await;
        let mut stack = new_stack(envelope_store, 2);

        // The batch is empty and `check_disk` is set, so `pop` has to go to the store.
        let result = stack.pop().await;
        assert!(matches!(
            result,
            Err(SqliteEnvelopeStackError::EnvelopeStoreError(_))
        ));
    }

    #[tokio::test]
    async fn test_pop_when_stack_is_empty() {
        let envelope_store = new_store(true).await;
        let mut stack = new_stack(envelope_store, 2);

        assert!(stack.pop().await.unwrap().is_none());
    }

    #[tokio::test]
    async fn test_push_below_threshold_and_pop() {
        let envelope_store = new_store(true).await;
        let mut stack = new_stack(envelope_store, 9999);

        let envelopes = mock_envelopes(5);
        push_all(&mut stack, &envelopes).await;
        assert_eq!(stack.batch.len(), 5);

        // Peeking reports the most recently pushed envelope.
        let peeked = stack.peek().await.unwrap().unwrap();
        assert_eq!(
            peeked.timestamp_millis(),
            envelopes[4].received_at().timestamp_millis()
        );

        assert_pops_reversed(&mut stack, &envelopes).await;
        assert_eq!(stack.batch.len(), 0);
    }

    #[tokio::test]
    async fn test_push_above_threshold_and_pop() {
        let envelope_store = new_store(true).await;
        let envelopes = mock_envelopes(7);
        let threshold_size = calculate_compressed_size(&envelopes[..5]) - 1;
        let mut stack = new_stack(envelope_store, threshold_size);

        // The sixth push finds the first five above the threshold and spools them.
        push_all(&mut stack, &envelopes).await;
        assert_eq!(stack.batch.len(), 2);

        let peeked = stack.peek().await.unwrap().unwrap();
        assert_eq!(
            peeked.timestamp_millis(),
            envelopes[6].received_at().timestamp_millis()
        );

        // Drain the two envelopes that are still in memory.
        assert_pops_reversed(&mut stack, &envelopes[5..7]).await;
        assert_eq!(stack.batch.len(), 0);

        // With nothing left in memory, peeking has to bring the spooled envelopes back.
        let peeked = stack.peek().await.unwrap().unwrap();
        assert_eq!(
            peeked.timestamp_millis(),
            envelopes[4].received_at().timestamp_millis()
        );

        // A freshly pushed envelope is the first one to come back out.
        let envelope = mock_envelope(Utc::now());
        assert!(stack.push(envelope.clone()).await.is_ok());
        let popped = stack.pop().await.unwrap().unwrap();
        assert_eq!(popped.event_id().unwrap(), envelope.event_id().unwrap());

        // The remaining five envelopes come out in reverse push order.
        assert_pops_reversed(&mut stack, &envelopes[0..5]).await;
        assert_eq!(stack.batch.len(), 0);
    }

    #[tokio::test]
    async fn test_drain() {
        let envelope_store = new_store(true).await;
        let mut stack = new_stack(envelope_store.clone(), 10 * COMPRESSED_ENVELOPE_SIZE);

        let envelopes = mock_envelopes(5);
        push_all(&mut stack, &envelopes).await;

        // Nothing has been spooled yet: everything is in memory and the store is empty.
        assert_eq!(stack.batch.len(), 5);
        assert_eq!(envelope_store.total_count().await.unwrap(), 0);

        // Flushing moves all five envelopes into the store.
        stack.flush().await;
        assert_eq!(envelope_store.total_count().await.unwrap(), 5);
    }
}