use std::collections::VecDeque;
use std::fmt::Debug;
use std::num::NonZeroUsize;
use relay_base_schema::project::ProjectKey;
use crate::envelope::Envelope;
use crate::services::buffer::envelope_stack::EnvelopeStack;
use crate::services::buffer::envelope_store::sqlite::{
SqliteEnvelopeStore, SqliteEnvelopeStoreError,
};
use crate::statsd::{RelayCounters, RelayTimers};
/// Error returned by the operations of a [`SqliteEnvelopeStack`].
#[derive(Debug, thiserror::Error)]
pub enum SqliteEnvelopeStackError {
/// A call into the underlying [`SqliteEnvelopeStore`] failed; the store's error is wrapped
/// unchanged and included in the message.
#[error("an error occurred in the envelope store: {0}")]
EnvelopeStoreError(#[from] SqliteEnvelopeStoreError),
}
/// Stack of envelopes that keeps batches in memory and uses a [`SqliteEnvelopeStore`] to move
/// whole batches out of and back into memory.
#[derive(Debug)]
pub struct SqliteEnvelopeStack {
/// Store that batches are inserted into when spooling and read back from when unspooling.
envelope_store: SqliteEnvelopeStore,
/// Number of buffered envelopes at or above which `push` first writes the front batch to the
/// store.
spool_threshold: NonZeroUsize,
/// Maximum number of envelopes in one in-memory batch; also the limit passed to the store
/// when envelopes are read back.
batch_size: NonZeroUsize,
/// Project key that pushed envelopes are checked against (via `debug_assert!`) and that is
/// passed to the store when reading envelopes back.
own_key: ProjectKey,
/// Sampling key counterpart of `own_key`. An envelope without a sampling key is compared
/// using its own public key.
sampling_key: ProjectKey,
/// In-memory batches. New envelopes are appended to the back batch, `peek`/`pop` read from
/// the back, and spooling removes the front batch.
// `push` and `pop` exchange `Box<Envelope>` values, so the batches keep the boxes as they are.
#[allow(clippy::vec_box)]
batches_buffer: VecDeque<Vec<Box<Envelope>>>,
/// Total number of envelopes across all batches in `batches_buffer`.
batches_buffer_size: usize,
/// Whether `peek`/`pop` query the store while the in-memory buffer is empty. Set after a
/// successful spool and cleared when the store returns no envelopes.
check_disk: bool,
}
impl SqliteEnvelopeStack {
    /// Creates an empty stack that spools to `envelope_store`.
    ///
    /// * `disk_batch_size` - number of envelopes per batch, both in memory and per store read.
    /// * `max_batches` - number of full batches kept in memory before `push` starts spooling.
    ///   The spool threshold is `disk_batch_size * max_batches`, saturating at `usize::MAX`.
    /// * `own_key` / `sampling_key` - project keys passed to the store when reading back and
    ///   used to validate pushed envelopes.
    /// * `check_disk` - whether `peek`/`pop` should query the store while the buffer is empty.
    ///
    /// # Panics
    ///
    /// Panics if `disk_batch_size` or `max_batches` is zero.
    pub fn new(
        envelope_store: SqliteEnvelopeStore,
        disk_batch_size: usize,
        max_batches: usize,
        own_key: ProjectKey,
        sampling_key: ProjectKey,
        check_disk: bool,
    ) -> Self {
        // Validate the batch size first so that a zero batch size is reported as such rather
        // than surfacing as a zero spool threshold.
        let batch_size =
            NonZeroUsize::new(disk_batch_size).expect("the disk batch size must be > 0");

        // Saturate instead of overflowing: a product that does not fit into `usize` would
        // otherwise wrap in release builds and yield an arbitrary, far too small threshold.
        let spool_threshold = NonZeroUsize::new(disk_batch_size.saturating_mul(max_batches))
            .expect("the spool threshold must be > 0");

        Self {
            envelope_store,
            spool_threshold,
            batch_size,
            own_key,
            sampling_key,
            batches_buffer: VecDeque::with_capacity(max_batches),
            batches_buffer_size: 0,
            check_disk,
        }
    }

    /// Returns `true` once the in-memory buffer holds at least `spool_threshold` envelopes.
    fn above_spool_threshold(&self) -> bool {
        self.batches_buffer_size >= self.spool_threshold.get()
    }

    /// Returns `true` when the in-memory buffer holds no envelopes.
    fn below_unspool_threshold(&self) -> bool {
        self.batches_buffer_size == 0
    }

    /// Removes the front batch from the in-memory buffer and inserts it into the store.
    ///
    /// Does nothing if the buffer has no batches.
    ///
    /// # Errors
    ///
    /// Returns [`SqliteEnvelopeStackError::EnvelopeStoreError`] if the insert fails. The batch
    /// has already been removed from the buffer at that point and is dropped, so its envelopes
    /// are lost.
    async fn spool_to_disk(&mut self) -> Result<(), SqliteEnvelopeStackError> {
        let Some(batch) = self.batches_buffer.pop_front() else {
            return Ok(());
        };
        self.batches_buffer_size -= batch.len();

        relay_statsd::metric!(
            counter(RelayCounters::BufferSpooledEnvelopes) += batch.len() as u64
        );

        // Envelopes whose conversion into the store's insert format fails are skipped silently;
        // they are still included in the counter above.
        let rows = batch.iter().filter_map(|e| e.as_ref().try_into().ok());

        relay_statsd::metric!(timer(RelayTimers::BufferSpool), {
            self.envelope_store
                .insert_many(rows)
                .await
                .map_err(SqliteEnvelopeStackError::EnvelopeStoreError)?;
        });

        // Something was written, so later `peek`/`pop` calls have to consult the store again.
        self.check_disk = true;

        Ok(())
    }

    /// Reads up to `batch_size` envelopes for this stack's project keys back from the store
    /// via `delete_many` and places them into the in-memory buffer as one batch.
    ///
    /// If the store returns no envelopes, `check_disk` is cleared so that the store is not
    /// queried again until the next successful spool.
    ///
    /// # Errors
    ///
    /// Returns [`SqliteEnvelopeStackError::EnvelopeStoreError`] if the store call fails.
    async fn unspool_from_disk(&mut self) -> Result<(), SqliteEnvelopeStackError> {
        // The store takes the limit as `i64`. Clamp instead of casting so that an oversized
        // batch size can never wrap around to a negative limit.
        let limit = i64::try_from(self.batch_size.get()).unwrap_or(i64::MAX);

        let envelopes = relay_statsd::metric!(timer(RelayTimers::BufferUnspool), {
            self.envelope_store
                .delete_many(self.own_key, self.sampling_key, limit)
                .await
                .map_err(SqliteEnvelopeStackError::EnvelopeStoreError)?
        });

        if envelopes.is_empty() {
            self.check_disk = false;
            return Ok(());
        }

        relay_statsd::metric!(
            counter(RelayCounters::BufferUnspooledEnvelopes) += envelopes.len() as u64
        );

        // Placed at the front so that it sits below anything already buffered. The callers in
        // this file only unspool while the buffer is empty, where front and back coincide.
        self.batches_buffer_size += envelopes.len();
        self.batches_buffer.push_front(envelopes);

        Ok(())
    }

    /// Returns `true` if `envelope` carries the project keys of this stack.
    ///
    /// An envelope without a sampling key is compared using its own public key as the
    /// sampling key.
    fn validate_envelope(&self, envelope: &Envelope) -> bool {
        let own_key = envelope.meta().public_key();
        let sampling_key = envelope.sampling_key().unwrap_or(own_key);

        self.own_key == own_key && self.sampling_key == sampling_key
    }
}
impl EnvelopeStack for SqliteEnvelopeStack {
    type Error = SqliteEnvelopeStackError;

    /// Pushes `envelope` on top of the stack.
    ///
    /// If the in-memory buffer has reached the spool threshold, the front batch is written to
    /// the store first. When that write fails the error is returned and `envelope` is not
    /// added to the buffer.
    async fn push(&mut self, envelope: Box<Envelope>) -> Result<(), Self::Error> {
        debug_assert!(self.validate_envelope(&envelope));

        if self.above_spool_threshold() {
            self.spool_to_disk().await?;
        }

        let batch_size = self.batch_size.get();
        match self.batches_buffer.back_mut() {
            // The newest batch still has room, so the envelope goes on top of it.
            Some(open_batch) if open_batch.len() < batch_size => open_batch.push(envelope),
            // No batch yet, or the newest one is full: start a new batch at the back.
            _ => {
                let mut fresh_batch = Vec::with_capacity(batch_size);
                fresh_batch.push(envelope);
                self.batches_buffer.push_back(fresh_batch);
            }
        }

        self.batches_buffer_size += 1;
        Ok(())
    }

    /// Returns a reference to the envelope on top of the stack without removing it.
    ///
    /// When the in-memory buffer is empty and `check_disk` is set, a batch is first read back
    /// from the store. Returns `Ok(None)` if no envelope is available afterwards.
    async fn peek(&mut self) -> Result<Option<&Envelope>, Self::Error> {
        if self.below_unspool_threshold() && self.check_disk {
            self.unspool_from_disk().await?;
        }

        let newest = self
            .batches_buffer
            .back()
            .and_then(|newest_batch| newest_batch.last());

        Ok(newest.map(|boxed| &**boxed))
    }

    /// Removes and returns the envelope on top of the stack.
    ///
    /// When the in-memory buffer is empty and `check_disk` is set, a batch is first read back
    /// from the store. Returns `Ok(None)` if no envelope is available afterwards.
    async fn pop(&mut self) -> Result<Option<Box<Envelope>>, Self::Error> {
        if self.below_unspool_threshold() && self.check_disk {
            relay_log::trace!("Unspool from disk");
            self.unspool_from_disk().await?;
        }

        let Some(newest_batch) = self.batches_buffer.back_mut() else {
            return Ok(None);
        };

        self.batches_buffer_size -= 1;
        relay_log::trace!("Popping from memory");
        let Some(envelope) = newest_batch.pop() else {
            return Ok(None);
        };

        // Do not leave a drained batch behind at the back of the buffer.
        if newest_batch.is_empty() {
            self.batches_buffer.pop_back();
        }

        Ok(Some(envelope))
    }

    /// Consumes the stack and returns the envelopes currently held in memory, front batch
    /// first. The store is not accessed.
    fn flush(self) -> Vec<Box<Envelope>> {
        let mut envelopes = Vec::new();
        for batch in self.batches_buffer {
            envelopes.extend(batch);
        }
        envelopes
    }
}
#[cfg(test)]
mod tests {
    use std::time::{Duration, Instant};

    use relay_base_schema::project::ProjectKey;

    use super::*;
    use crate::services::buffer::testutils::utils::{mock_envelope, mock_envelopes, setup_db};

    /// Pushing an envelope whose project keys differ from the stack's keys must trip the
    /// `debug_assert!` in `push`.
    // The assertion only exists when debug assertions are enabled, so the test is compiled
    // out otherwise (e.g. `cargo test --release`), where it could never panic.
    #[cfg(debug_assertions)]
    #[tokio::test]
    #[should_panic]
    async fn test_push_with_mismatching_project_keys() {
        let db = setup_db(false).await;
        let envelope_store = SqliteEnvelopeStore::new(db, Duration::from_millis(100));
        let mut stack = SqliteEnvelopeStack::new(
            envelope_store,
            2,
            2,
            ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
            ProjectKey::parse("c25ae32be2584e0bbd7a4cbb95971fe1").unwrap(),
            true,
        );

        let envelope = mock_envelope(Instant::now());
        let _ = stack.push(envelope).await;
    }

    /// A failing spool returns an error and loses both the spooled batch and the envelope
    /// being pushed; the stack keeps working with what is left in memory.
    #[tokio::test]
    async fn test_push_when_db_is_not_valid() {
        let db = setup_db(false).await;
        let envelope_store = SqliteEnvelopeStore::new(db, Duration::from_millis(100));
        let mut stack = SqliteEnvelopeStack::new(
            envelope_store,
            2,
            2,
            ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
            ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(),
            true,
        );

        let envelopes = mock_envelopes(4);

        // Fill the buffer up to the spool threshold (2 * 2) without touching the store.
        for envelope in envelopes.clone() {
            assert!(stack.push(envelope).await.is_ok());
        }

        // The next push has to spool the front batch first, which fails on this database.
        let envelope = mock_envelope(Instant::now());
        assert!(matches!(
            stack.push(envelope).await,
            Err(SqliteEnvelopeStackError::EnvelopeStoreError(_))
        ));

        // The front batch is gone, so there is room again and pushing succeeds.
        let envelope = mock_envelope(Instant::now());
        assert!(stack.push(envelope.clone()).await.is_ok());
        assert_eq!(stack.batches_buffer_size, 3);

        // Pop everything that is left: the latest push plus the surviving second batch.
        let popped_envelope_1 = stack.pop().await.unwrap().unwrap();
        let popped_envelope_2 = stack.pop().await.unwrap().unwrap();
        let popped_envelope_3 = stack.pop().await.unwrap().unwrap();
        assert_eq!(
            popped_envelope_1.event_id().unwrap(),
            envelope.event_id().unwrap()
        );
        assert_eq!(
            popped_envelope_2.event_id().unwrap(),
            envelopes[3].event_id().unwrap()
        );
        assert_eq!(
            popped_envelope_3.event_id().unwrap(),
            envelopes[2].event_id().unwrap()
        );
        assert_eq!(stack.batches_buffer_size, 0);
    }

    /// Popping from an empty buffer with `check_disk` set queries the store and propagates
    /// its failure.
    #[tokio::test]
    async fn test_pop_when_db_is_not_valid() {
        let db = setup_db(false).await;
        let envelope_store = SqliteEnvelopeStore::new(db, Duration::from_millis(100));
        let mut stack = SqliteEnvelopeStack::new(
            envelope_store,
            2,
            2,
            ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
            ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(),
            true,
        );

        assert!(matches!(
            stack.pop().await,
            Err(SqliteEnvelopeStackError::EnvelopeStoreError(_))
        ));
    }

    /// Popping when neither the buffer nor the store holds an envelope yields `None`.
    #[tokio::test]
    async fn test_pop_when_stack_is_empty() {
        let db = setup_db(true).await;
        let envelope_store = SqliteEnvelopeStore::new(db, Duration::from_millis(100));
        let mut stack = SqliteEnvelopeStack::new(
            envelope_store,
            2,
            2,
            ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
            ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(),
            true,
        );

        assert!(stack.pop().await.unwrap().is_none());
    }

    /// Below the spool threshold everything stays in memory and comes back in LIFO order.
    #[tokio::test]
    async fn test_push_below_threshold_and_pop() {
        let db = setup_db(true).await;
        let envelope_store = SqliteEnvelopeStore::new(db, Duration::from_millis(100));
        let mut stack = SqliteEnvelopeStack::new(
            envelope_store,
            5,
            2,
            ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
            ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(),
            true,
        );

        let envelopes = mock_envelopes(5);

        for envelope in envelopes.clone() {
            assert!(stack.push(envelope).await.is_ok());
        }
        assert_eq!(stack.batches_buffer_size, 5);

        // Peeking shows the most recently pushed envelope.
        let peeked_envelope = stack.peek().await.unwrap().unwrap();
        assert_eq!(
            peeked_envelope.event_id().unwrap(),
            envelopes[4].event_id().unwrap()
        );

        // Popping returns the envelopes in reverse push order.
        for envelope in envelopes.iter().rev() {
            let popped_envelope = stack.pop().await.unwrap().unwrap();
            assert_eq!(
                popped_envelope.event_id().unwrap(),
                envelope.event_id().unwrap()
            );
        }
    }

    /// Above the spool threshold the oldest batch goes to the store and is read back once the
    /// in-memory buffer has been drained.
    #[tokio::test]
    async fn test_push_above_threshold_and_pop() {
        let db = setup_db(true).await;
        let envelope_store = SqliteEnvelopeStore::new(db, Duration::from_millis(100));
        let mut stack = SqliteEnvelopeStack::new(
            envelope_store,
            5,
            2,
            ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
            ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(),
            true,
        );

        let envelopes = mock_envelopes(15);

        // 15 pushes with a threshold of 10: the first batch of 5 is spooled, 10 stay in memory.
        for envelope in envelopes.clone() {
            assert!(stack.push(envelope).await.is_ok());
        }
        assert_eq!(stack.batches_buffer_size, 10);

        let peeked_envelope = stack.peek().await.unwrap().unwrap();
        assert_eq!(
            peeked_envelope.event_id().unwrap(),
            envelopes[14].event_id().unwrap()
        );

        // Drain the 10 in-memory envelopes in reverse push order.
        for envelope in envelopes[5..15].iter().rev() {
            let popped_envelope = stack.pop().await.unwrap().unwrap();
            assert_eq!(
                popped_envelope.event_id().unwrap(),
                envelope.event_id().unwrap()
            );
        }
        assert_eq!(stack.batches_buffer_size, 0);

        // Peeking on the empty buffer reads the spooled batch back from the store.
        let peeked_envelope = stack.peek().await.unwrap().unwrap();
        assert_eq!(
            peeked_envelope.event_id().unwrap(),
            envelopes[4].event_id().unwrap()
        );

        // A new envelope lands on top of the unspooled batch and is popped first.
        let envelope = mock_envelope(Instant::now());
        assert!(stack.push(envelope.clone()).await.is_ok());

        let popped_envelope = stack.pop().await.unwrap().unwrap();
        assert_eq!(
            popped_envelope.event_id().unwrap(),
            envelope.event_id().unwrap()
        );

        // The unspooled envelopes follow in reverse push order.
        for envelope in envelopes[0..5].iter().rev() {
            let popped_envelope = stack.pop().await.unwrap().unwrap();
            assert_eq!(
                popped_envelope.event_id().unwrap(),
                envelope.event_id().unwrap()
            );
        }
        assert_eq!(stack.batches_buffer_size, 0);
    }

    /// `flush` hands back the in-memory envelopes and leaves the store untouched.
    #[tokio::test]
    async fn test_drain() {
        let db = setup_db(true).await;
        let envelope_store = SqliteEnvelopeStore::new(db, Duration::from_millis(100));
        let mut stack = SqliteEnvelopeStack::new(
            envelope_store.clone(),
            5,
            1,
            ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
            ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(),
            true,
        );

        let envelopes = mock_envelopes(5);

        // Five pushes reach the threshold but never exceed it, so nothing is spooled.
        for envelope in envelopes.clone() {
            assert!(stack.push(envelope).await.is_ok());
        }
        assert_eq!(stack.batches_buffer_size, 5);
        assert_eq!(envelope_store.total_count().await.unwrap(), 0);

        let drained_envelopes = stack.flush();
        assert_eq!(drained_envelopes.len(), 5);
        assert_eq!(envelope_store.total_count().await.unwrap(), 0);
    }
}