use std::cmp::Ordering;
use std::collections::BTreeSet;
use std::convert::Infallible;
use std::error::Error;
use std::mem;
use std::sync::atomic::AtomicI64;
use std::sync::atomic::Ordering as AtomicOrdering;
use std::sync::Arc;
use std::time::Duration;
use chrono::{DateTime, Utc};
use hashbrown::HashSet;
use relay_base_schema::project::ProjectKey;
use relay_config::Config;
use tokio::time::{timeout, Instant};
use crate::envelope::Envelope;
use crate::envelope::Item;
use crate::services::buffer::common::ProjectKeyPair;
use crate::services::buffer::envelope_stack::sqlite::SqliteEnvelopeStackError;
use crate::services::buffer::envelope_stack::EnvelopeStack;
use crate::services::buffer::envelope_store::sqlite::SqliteEnvelopeStoreError;
use crate::services::buffer::stack_provider::memory::MemoryStackProvider;
use crate::services::buffer::stack_provider::sqlite::SqliteStackProvider;
use crate::services::buffer::stack_provider::{StackCreationType, StackProvider};
use crate::statsd::{RelayGauges, RelayHistograms, RelayTimers};
use crate::utils::MemoryChecker;
/// An envelope buffer backed by one of two stack providers.
///
/// Each variant wraps an [`EnvelopeBuffer`] specialized for a concrete stack provider; the
/// methods on this type dispatch to the wrapped buffer of the active variant.
#[derive(Debug)]
// `EnvelopeBuffer` is a private type that appears in the public variants below.
#[allow(private_interfaces)]
pub enum PolymorphicEnvelopeBuffer {
/// A buffer whose stacks are created by a [`MemoryStackProvider`].
InMemory(EnvelopeBuffer<MemoryStackProvider>),
/// A buffer whose stacks are created by a [`SqliteStackProvider`].
Sqlite(EnvelopeBuffer<SqliteStackProvider>),
}
impl PolymorphicEnvelopeBuffer {
pub fn is_memory(&self) -> bool {
match self {
PolymorphicEnvelopeBuffer::InMemory(_) => true,
PolymorphicEnvelopeBuffer::Sqlite(_) => false,
}
}
/// Builds the buffer variant selected by the configuration.
///
/// When `config` provides a spool path for `partition_id`, a SQLite-backed buffer is
/// created; otherwise a memory-backed buffer using `memory_checker` is returned.
///
/// # Errors
///
/// Returns an error if constructing the SQLite-backed buffer fails.
pub async fn from_config(
    partition_id: u8,
    config: &Config,
    memory_checker: MemoryChecker,
) -> Result<Self, EnvelopeBufferError> {
    // Without a spool path there is nothing to set up on disk.
    if config.spool_envelopes_path(partition_id).is_none() {
        relay_log::trace!("PolymorphicEnvelopeBuffer: initializing memory envelope buffer");
        let memory = EnvelopeBuffer::<MemoryStackProvider>::new(partition_id, memory_checker);
        return Ok(Self::InMemory(memory));
    }

    relay_log::trace!("PolymorphicEnvelopeBuffer: initializing sqlite envelope buffer");
    let sqlite = EnvelopeBuffer::<SqliteStackProvider>::new(partition_id, config).await?;
    Ok(Self::Sqlite(sqlite))
}
pub async fn initialize(&mut self) {
match self {
PolymorphicEnvelopeBuffer::InMemory(buffer) => buffer.initialize().await,
PolymorphicEnvelopeBuffer::Sqlite(buffer) => buffer.initialize().await,
}
}
/// Pushes an envelope onto the wrapped buffer.
///
/// Reports the summed length of the envelope's items as a histogram metric and times the
/// push itself; both metrics are tagged with the partition id.
///
/// # Errors
///
/// Returns the error produced by the wrapped buffer's `push`.
pub async fn push(&mut self, envelope: Box<Envelope>) -> Result<(), EnvelopeBufferError> {
relay_statsd::metric!(
histogram(RelayHistograms::BufferEnvelopeBodySize) =
envelope.items().map(Item::len).sum::<usize>() as u64,
partition_id = self.partition_tag()
);
relay_statsd::metric!(
timer(RelayTimers::BufferPush),
partition_id = self.partition_tag(),
{
// NOTE(review): the `?` is inside the timed block, so a failed push returns from
// this function right here; presumably the timer is then not recorded; confirm
// against the `relay_statsd::metric!` expansion.
match self {
Self::Sqlite(buffer) => buffer.push(envelope).await,
Self::InMemory(buffer) => buffer.push(envelope).await,
}?;
}
);
Ok(())
}
/// Peeks at the wrapped buffer and returns the resulting [`Peek`] state.
///
/// The call is timed under `RelayTimers::BufferPeek`, tagged with the partition id.
pub async fn peek(&mut self) -> Result<Peek, EnvelopeBufferError> {
    relay_statsd::metric!(
        timer(RelayTimers::BufferPeek),
        partition_id = self.partition_tag(),
        {
            match self {
                Self::InMemory(inner) => inner.peek().await,
                Self::Sqlite(inner) => inner.peek().await,
            }
        }
    )
}
/// Pops the next envelope from the wrapped buffer, if there is one.
///
/// The call is timed under `RelayTimers::BufferPop`, tagged with the partition id.
///
/// # Errors
///
/// Returns the error produced by the wrapped buffer's `pop`.
pub async fn pop(&mut self) -> Result<Option<Box<Envelope>>, EnvelopeBufferError> {
    let popped = relay_statsd::metric!(
        timer(RelayTimers::BufferPop),
        partition_id = self.partition_tag(),
        {
            // The `?` deliberately stays inside the timed block, as in every other
            // fallible operation of this type.
            match self {
                Self::InMemory(inner) => inner.pop().await,
                Self::Sqlite(inner) => inner.pop().await,
            }?
        }
    );
    Ok(popped)
}
/// Marks `project` as ready or not ready on the wrapped buffer.
///
/// Returns the value reported by the wrapped buffer, i.e. whether any readiness flag
/// actually changed.
pub fn mark_ready(&mut self, project: &ProjectKey, is_ready: bool) -> bool {
    let state = match is_ready {
        true => "ready",
        false => "not ready",
    };
    relay_log::trace!(
        project_key = project.as_str(),
        "buffer marked {}",
        state
    );

    match self {
        Self::InMemory(inner) => inner.mark_ready(project, is_ready),
        Self::Sqlite(inner) => inner.mark_ready(project, is_ready),
    }
}
/// Forwards to the wrapped buffer's `mark_seen` for the given project key pair.
pub fn mark_seen(&mut self, project_key_pair: &ProjectKeyPair, next_fetch: Duration) {
    match self {
        Self::InMemory(inner) => inner.mark_seen(project_key_pair, next_fetch),
        Self::Sqlite(inner) => inner.mark_seen(project_key_pair, next_fetch),
    }
}
/// Returns whether the wrapped buffer reports that it has capacity.
pub fn has_capacity(&self) -> bool {
    match self {
        Self::InMemory(inner) => inner.has_capacity(),
        Self::Sqlite(inner) => inner.has_capacity(),
    }
}
/// Runs the shutdown procedure of the buffer.
///
/// Only the SQLite variant is flushed. Returns `true` if a flush was performed and `false`
/// if no shutdown work was needed.
pub async fn shutdown(&mut self) -> bool {
    match self {
        Self::Sqlite(inner) => {
            inner.flush().await;
            true
        }
        Self::InMemory(_) => {
            relay_log::trace!("PolymorphicEnvelopeBuffer: shutdown procedure not needed");
            false
        }
    }
}
fn partition_tag(&self) -> &str {
match self {
PolymorphicEnvelopeBuffer::InMemory(buffer) => &buffer.partition_tag,
PolymorphicEnvelopeBuffer::Sqlite(buffer) => &buffer.partition_tag,
}
}
}
/// Errors returned by the envelope buffer.
///
/// Both SQLite variants render the same `sqlite` message; the wrapped source error tells
/// them apart.
#[derive(Debug, thiserror::Error)]
pub enum EnvelopeBufferError {
/// An error converted from a [`SqliteEnvelopeStoreError`].
#[error("sqlite")]
SqliteStore(#[from] SqliteEnvelopeStoreError),
/// An error converted from a [`SqliteEnvelopeStackError`].
#[error("sqlite")]
SqliteStack(#[from] SqliteEnvelopeStackError),
/// Pushing an envelope to the buffer failed.
#[error("failed to push envelope to the buffer")]
PushFailed,
}
/// Lets a stack whose error type is [`Infallible`] satisfy the
/// `EnvelopeBufferError: From<...>` bound required by the buffer implementation.
impl From<Infallible> for EnvelopeBufferError {
    fn from(never: Infallible) -> Self {
        // `Infallible` has no values, so this match has no arms and can never execute.
        match never {}
    }
}
/// A priority queue of envelope stacks, one stack per project key pair.
///
/// The concrete stack type is supplied by the stack provider `P`.
#[derive(Debug)]
struct EnvelopeBuffer<P: StackProvider> {
/// Stacks keyed by their project key pair and ordered by [`Priority`].
priority_queue: priority_queue::PriorityQueue<QueueItem<ProjectKeyPair, P::Stack>, Priority>,
/// Lookup from a single project key to every key pair (stack) that contains it.
stacks_by_project: hashbrown::HashMap<ProjectKey, BTreeSet<ProjectKeyPair>>,
/// Creates stacks, flushes them, and reports store capacity and total count.
stack_provider: P,
/// Envelope counter: incremented on push, decremented on pop, and overwritten with the
/// store's total count during initialization.
total_count: Arc<AtomicI64>,
/// Whether the store's total count was loaded successfully during initialization.
total_count_initialized: bool,
/// The partition id rendered as a string, used to tag metrics.
partition_tag: String,
}
impl EnvelopeBuffer<MemoryStackProvider> {
    /// Creates an empty buffer for `partition_id` whose stacks come from a
    /// [`MemoryStackProvider`] built with `memory_checker`.
    pub fn new(partition_id: u8, memory_checker: MemoryChecker) -> Self {
        let stack_provider = MemoryStackProvider::new(memory_checker);
        let partition_tag = partition_id.to_string();

        Self {
            priority_queue: Default::default(),
            stacks_by_project: Default::default(),
            stack_provider,
            total_count: Arc::new(AtomicI64::new(0)),
            total_count_initialized: false,
            partition_tag,
        }
    }
}
// NOTE(review): the reason for allowing dead code here is not visible in this file.
#[allow(dead_code)]
impl EnvelopeBuffer<SqliteStackProvider> {
    /// Creates an empty buffer for `partition_id` whose stacks come from a
    /// [`SqliteStackProvider`] built from `config`.
    ///
    /// # Errors
    ///
    /// Returns an error if the SQLite stack provider cannot be created.
    pub async fn new(partition_id: u8, config: &Config) -> Result<Self, EnvelopeBufferError> {
        let stack_provider = SqliteStackProvider::new(partition_id, config).await?;
        let partition_tag = partition_id.to_string();

        Ok(Self {
            priority_queue: Default::default(),
            stacks_by_project: Default::default(),
            stack_provider,
            total_count: Arc::new(AtomicI64::new(0)),
            total_count_initialized: false,
            partition_tag,
        })
    }
}
impl<P: StackProvider> EnvelopeBuffer<P>
where
EnvelopeBufferError: From<<P::Stack as EnvelopeStack>::Error>,
{
/// Initializes the buffer from the state returned by the stack provider's `initialize`.
///
/// Creates one stack per reported project key pair and then loads the store's total
/// envelope count. The whole procedure is timed under
/// `RelayTimers::BufferInitialization`.
pub async fn initialize(&mut self) {
    relay_statsd::metric!(
        timer(RelayTimers::BufferInitialization),
        partition_id = &self.partition_tag,
        {
            let state = self.stack_provider.initialize().await;
            let known_pairs = state.project_key_pairs;
            self.load_stacks(known_pairs).await;
            self.load_store_total_count().await;
        }
    );
}
/// Pushes an envelope onto the stack of its project key pair, creating the stack if needed.
///
/// After the push, the stack's priority is updated with the envelope's `received_at`, and
/// the total envelope count is incremented and reported.
///
/// # Errors
///
/// Returns an error if pushing onto the existing or newly created stack fails.
pub async fn push(&mut self, envelope: Box<Envelope>) -> Result<(), EnvelopeBufferError> {
    let received_at = envelope.received_at();
    let project_key_pair = ProjectKeyPair::from_envelope(&envelope);

    if let Some((QueueItem { value: stack, .. }, _)) =
        self.priority_queue.get_mut(&project_key_pair)
    {
        stack.push(envelope).await?;
    } else {
        // No stack exists for this pair yet. The pair computed above is reused instead of
        // deriving it from the envelope a second time.
        self.push_stack(StackCreationType::New, project_key_pair, Some(envelope))
            .await?;
    }

    // The pushed envelope is now the most recent one of its stack.
    self.priority_queue
        .change_priority_by(&project_key_pair, |prio| {
            prio.received_at = received_at;
        });

    self.total_count.fetch_add(1, AtomicOrdering::SeqCst);
    self.track_total_count();

    Ok(())
}
/// Inspects the top-priority stack without removing anything.
///
/// Returns [`Peek::Empty`] if the queue has no stack or the top stack yields nothing on
/// peek. Otherwise returns [`Peek::Ready`] or [`Peek::NotReady`] depending on the
/// readiness stored in the stack's priority.
///
/// # Errors
///
/// Returns an error if peeking the stack fails.
pub async fn peek(&mut self) -> Result<Peek, EnvelopeBufferError> {
    let Some((item, priority)) = self.priority_queue.peek_mut() else {
        return Ok(Peek::Empty);
    };

    let is_ready = priority.readiness.ready();
    let Some(last_received_at) = item.value.peek().await? else {
        return Ok(Peek::Empty);
    };

    let peeked = if is_ready {
        Peek::Ready { last_received_at }
    } else {
        Peek::NotReady {
            project_key_pair: item.key,
            next_project_fetch: priority.next_project_fetch,
            last_received_at,
        }
    };
    Ok(peeked)
}
pub async fn pop(&mut self) -> Result<Option<Box<Envelope>>, EnvelopeBufferError> {
let Some((QueueItem { key, value: stack }, _)) = self.priority_queue.peek_mut() else {
return Ok(None);
};
let project_key_pair = *key;
let envelope = stack.pop().await?.expect("found an empty stack");
let last_received_at = stack.peek().await?;
match last_received_at {
None => {
self.pop_stack(project_key_pair);
}
Some(last_received_at) => {
self.priority_queue
.change_priority_by(&project_key_pair, |prio| {
prio.received_at = last_received_at;
});
}
}
self.total_count.fetch_sub(1, AtomicOrdering::SeqCst);
self.track_total_count();
Ok(Some(envelope))
}
/// Sets the readiness of `project` on every stack that involves it.
///
/// A stack tracks readiness separately for its own key and its sampling key; whichever of
/// the two equals `project` is updated. Returns `true` if at least one flag changed value.
pub fn mark_ready(&mut self, project: &ProjectKey, is_ready: bool) -> bool {
    let Some(project_key_pairs) = self.stacks_by_project.get(project) else {
        return false;
    };

    let mut changed = false;
    for pair in project_key_pairs {
        let is_own = pair.own_key == *project;
        let is_sampling = pair.sampling_key == *project;

        self.priority_queue.change_priority_by(pair, |priority| {
            let readiness = &mut priority.readiness;

            if is_own && readiness.own_project_ready != is_ready {
                readiness.own_project_ready = is_ready;
                changed = true;
            }
            if is_sampling && readiness.sampling_project_ready != is_ready {
                readiness.sampling_project_ready = is_ready;
                changed = true;
            }

            // Every pair registered under `project` must contain it in some role.
            debug_assert!(is_own || is_sampling);
        });
    }

    changed
}
/// Sets the stack's `next_project_fetch` to `next_fetch` from now.
///
/// This affects the ordering among stacks that are not ready.
pub fn mark_seen(&mut self, project_key_pair: &ProjectKeyPair, next_fetch: Duration) {
    let queue = &mut self.priority_queue;
    queue.change_priority_by(project_key_pair, |priority| {
        priority.next_project_fetch = Instant::now() + next_fetch;
    });
}
/// Returns whether the stack provider reports that its store has capacity.
pub fn has_capacity(&self) -> bool {
    let provider = &self.stack_provider;
    provider.has_store_capacity()
}
/// Takes all stacks out of the priority queue and hands them to the stack provider's
/// `flush`, leaving the queue empty.
pub async fn flush(&mut self) {
    let drained = mem::take(&mut self.priority_queue);
    let stacks = drained.into_iter().map(|(item, _priority)| item.value);
    self.stack_provider.flush(stacks).await;
}
/// Creates a new stack for `project_key_pair` and registers it in the buffer.
///
/// If `envelope` is given, it is pushed onto the new stack and its `received_at` becomes the
/// initial priority timestamp; otherwise the current time is used. The pair is also added to
/// the per-project lookup, and the stack count gauge is reported.
///
/// # Errors
///
/// Returns an error if pushing the envelope onto the new stack fails.
async fn push_stack(
    &mut self,
    stack_creation_type: StackCreationType,
    project_key_pair: ProjectKeyPair,
    envelope: Option<Box<Envelope>>,
) -> Result<(), EnvelopeBufferError> {
    // Only read the clock when there is no envelope to take the timestamp from.
    let received_at = envelope
        .as_ref()
        .map_or_else(Utc::now, |envelope| envelope.received_at());

    let mut stack = self
        .stack_provider
        .create_stack(stack_creation_type, project_key_pair);
    if let Some(envelope) = envelope {
        stack.push(envelope).await?;
    }

    let previous_entry = self.priority_queue.push(
        QueueItem {
            key: project_key_pair,
            value: stack,
        },
        Priority::new(received_at),
    );
    // Callers only create a stack for a pair that is not in the queue yet.
    debug_assert!(previous_entry.is_none());

    for project_key in project_key_pair.iter() {
        self.stacks_by_project
            .entry(project_key)
            .or_default()
            .insert(project_key_pair);
    }

    relay_statsd::metric!(
        gauge(RelayGauges::BufferStackCount) = self.priority_queue.len() as u64,
        partition_id = &self.partition_tag
    );

    Ok(())
}
/// Removes the stack of `project_key_pair` from the per-project lookup and the queue, then
/// reports the stack count gauge.
///
/// # Panics
///
/// Panics if one of the pair's project keys has no entry in the lookup.
fn pop_stack(&mut self, project_key_pair: ProjectKeyPair) {
    for project_key in project_key_pair.iter() {
        let pairs_of_project = self
            .stacks_by_project
            .get_mut(&project_key)
            .expect("project_key is missing from lookup");
        pairs_of_project.remove(&project_key_pair);
    }

    self.priority_queue.remove(&project_key_pair);

    let stack_count = self.priority_queue.len() as u64;
    relay_statsd::metric!(
        gauge(RelayGauges::BufferStackCount) = stack_count,
        partition_id = &self.partition_tag
    );
}
/// Creates one stack, without pushing any envelope, for each given project key pair.
///
/// # Panics
///
/// Panics if creating any of the stacks returns an error.
async fn load_stacks(&mut self, project_key_pairs: HashSet<ProjectKeyPair>) {
    for pair in project_key_pairs {
        let pushed = self
            .push_stack(StackCreationType::Initialization, pair, None)
            .await;
        pushed.expect("Pushing an empty stack raised an error");
    }
}
/// Loads the store's total envelope count into `total_count`, waiting at most one second.
///
/// On success the counter is overwritten and marked as initialized. On timeout the counter
/// is left untouched, marked as not initialized, and an error is logged. The count metric is
/// reported in both cases.
async fn load_store_total_count(&mut self) {
    let loaded = timeout(Duration::from_secs(1), async {
        self.stack_provider.store_total_count().await
    })
    .await;

    match loaded {
        Err(error) => {
            self.total_count_initialized = false;
            relay_log::error!(
                error = &error as &dyn Error,
                "failed to load the total envelope count of the store",
            );
        }
        Ok(count) => {
            self.total_count.store(count as i64, AtomicOrdering::SeqCst);
            self.total_count_initialized = true;
        }
    }

    self.track_total_count();
}
/// Reports the current total envelope count as a histogram metric.
///
/// The metric is tagged with whether the count was initialized from the store, the stack
/// type of the provider, and the partition id.
fn track_total_count(&self) {
    let initialized = if self.total_count_initialized {
        "true"
    } else {
        "false"
    };
    let total_count = self.total_count.load(AtomicOrdering::SeqCst) as f64;

    relay_statsd::metric!(
        histogram(RelayHistograms::BufferEnvelopesCount) = total_count,
        initialized = initialized,
        stack_type = self.stack_provider.stack_type(),
        partition_id = &self.partition_tag
    );
}
}
/// The outcome of peeking at the envelope buffer.
pub enum Peek {
/// The queue holds no stack, or the top-priority stack yielded nothing on peek.
Empty,
/// The top-priority stack is ready.
Ready {
/// The timestamp returned by peeking the top-priority stack.
last_received_at: DateTime<Utc>,
},
/// At least one project of the top-priority stack is marked as not ready.
NotReady {
/// The key pair that identifies the stack.
project_key_pair: ProjectKeyPair,
/// The `next_project_fetch` instant stored in the stack's priority.
next_project_fetch: Instant,
/// The timestamp returned by peeking the top-priority stack.
last_received_at: DateTime<Utc>,
},
}
impl Peek {
    /// Returns the `last_received_at` timestamp carried by this result, or `None` for
    /// [`Peek::Empty`].
    pub fn last_received_at(&self) -> Option<DateTime<Utc>> {
        match *self {
            Self::Ready { last_received_at } | Self::NotReady { last_received_at, .. } => {
                Some(last_received_at)
            }
            Self::Empty => None,
        }
    }
}
/// An entry of the priority queue that pairs a lookup key with an associated value.
///
/// Hashing, equality, and borrowing are all defined by `key` alone, so a collection of
/// `QueueItem`s can be queried with just a `&K` while `value` is carried along.
#[derive(Debug)]
struct QueueItem<K, V> {
    key: K,
    value: V,
}

impl<K, V> std::borrow::Borrow<K> for QueueItem<K, V> {
    fn borrow(&self) -> &K {
        &self.key
    }
}

impl<K: std::hash::Hash, V> std::hash::Hash for QueueItem<K, V> {
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        self.key.hash(state);
    }
}

impl<K: PartialEq, V> PartialEq for QueueItem<K, V> {
    fn eq(&self, other: &Self) -> bool {
        self.key == other.key
    }
}

// `Eq` promises a total equivalence relation. Since equality is delegated to the key, that
// promise only holds when the key itself is `Eq`, not merely `PartialEq`.
impl<K: Eq, V> Eq for QueueItem<K, V> {}
/// The priority of a stack in the buffer's priority queue.
///
/// Ordering is defined by the manual `Ord` implementation rather than derived from the
/// fields.
#[derive(Debug, Clone)]
struct Priority {
/// Per-project readiness flags of the stack.
readiness: Readiness,
/// Timestamp of the stack's top envelope, as last updated by push or pop.
received_at: DateTime<Utc>,
/// Instant updated by `mark_seen`; only compared between stacks that are not ready.
next_project_fetch: Instant,
}
impl Priority {
    /// Creates the priority of a new stack: fresh readiness, the given `received_at`, and
    /// `next_project_fetch` set to the current instant.
    fn new(received_at: DateTime<Utc>) -> Self {
        let readiness = Readiness::new();
        let next_project_fetch = Instant::now();

        Self {
            readiness,
            received_at,
            next_project_fetch,
        }
    }
}
impl Ord for Priority {
    /// Orders priorities so that the greatest one belongs to the stack to handle next.
    ///
    /// - A ready stack always outranks one that is not ready.
    /// - Among ready stacks, the later `received_at` ranks higher.
    /// - Among not-ready stacks, the earlier `next_project_fetch` ranks higher, with ties
    ///   broken in favor of the earlier `received_at`.
    fn cmp(&self, other: &Self) -> Ordering {
        let self_ready = self.readiness.ready();
        let other_ready = other.readiness.ready();

        if self_ready != other_ready {
            return if self_ready {
                Ordering::Greater
            } else {
                Ordering::Less
            };
        }

        if self_ready {
            self.received_at.cmp(&other.received_at)
        } else {
            // Swapping the operands inverts the comparison: smaller values rank higher.
            other
                .next_project_fetch
                .cmp(&self.next_project_fetch)
                .then_with(|| other.received_at.cmp(&self.received_at))
        }
    }
}
impl PartialOrd for Priority {
    /// Delegates to the total order defined by `Ord`, so the result is always `Some`.
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(Ord::cmp(self, other))
    }
}
impl PartialEq for Priority {
fn eq(&self, other: &Self) -> bool {
self.cmp(other).is_eq()
}
}
// Marker impl: equality is defined through `cmp`, which is a total order.
impl Eq for Priority {}
/// Readiness flags for the two projects a stack can depend on.
#[derive(Debug, Clone, Copy)]
struct Readiness {
    /// Whether the stack's own project is marked ready.
    own_project_ready: bool,
    /// Whether the stack's sampling project is marked ready.
    sampling_project_ready: bool,
}

impl Readiness {
    /// Creates a readiness state in which both projects start out as ready.
    fn new() -> Self {
        let ready = true;
        Self {
            own_project_ready: ready,
            sampling_project_ready: ready,
        }
    }

    /// Returns `true` only if both the own project and the sampling project are ready.
    fn ready(&self) -> bool {
        let Self {
            own_project_ready,
            sampling_project_ready,
        } = *self;
        own_project_ready && sampling_project_ready
    }
}
#[cfg(test)]
mod tests {
use relay_common::Dsn;
use relay_event_schema::protocol::EventId;
use relay_sampling::DynamicSamplingContext;
use std::str::FromStr;
use std::sync::Arc;
use uuid::Uuid;
use crate::envelope::{Item, ItemType};
use crate::extractors::RequestMeta;
use crate::services::buffer::common::ProjectKeyPair;
use crate::services::buffer::envelope_store::sqlite::DatabaseEnvelope;
use crate::services::buffer::testutils::utils::mock_envelopes;
use crate::utils::MemoryStat;
use crate::SqliteEnvelopeStore;
use super::*;
impl Peek {
fn is_empty(&self) -> bool {
matches!(self, Peek::Empty)
}
}
/// Builds a test envelope whose DSN carries `own_key`.
///
/// If `sampling_key` is given, a dynamic sampling context with that public key and a random
/// trace id is attached and a transaction item is added. If `event_id` is given, it is set
/// on the envelope.
fn new_envelope(
own_key: ProjectKey,
sampling_key: Option<ProjectKey>,
event_id: Option<EventId>,
) -> Box<Envelope> {
let mut envelope = Envelope::from_request(
None,
RequestMeta::new(Dsn::from_str(&format!("http://{own_key}@localhost/1")).unwrap()),
);
if let Some(sampling_key) = sampling_key {
envelope.set_dsc(DynamicSamplingContext {
public_key: sampling_key,
trace_id: Uuid::new_v4(),
release: None,
user: Default::default(),
replay_id: None,
environment: None,
transaction: None,
sample_rate: None,
sampled: None,
other: Default::default(),
});
envelope.add_item(Item::new(ItemType::Transaction));
}
if let Some(event_id) = event_id {
envelope.set_event_id(event_id);
}
envelope
}
/// Builds a test configuration whose `spool.envelopes.path` is set to `path`.
fn mock_config(path: &str) -> Arc<Config> {
    let raw = serde_json::json!({
        "spool": {
            "envelopes": {
                "path": path
            }
        }
    });
    let parsed = Config::from_json_value(raw).unwrap();
    parsed.into()
}
/// Builds a memory checker from default memory stats and a mock configuration.
fn mock_memory_checker() -> MemoryChecker {
    // `mock_config` already returns a fresh `Arc`, so it is passed on without cloning.
    MemoryChecker::new(MemoryStat::default(), mock_config("my/db/path"))
}
/// Peeks the buffer and returns the `last_received_at` of the result.
///
/// Panics if peeking fails or the buffer is empty.
async fn peek_received_at(buffer: &mut EnvelopeBuffer<MemoryStackProvider>) -> DateTime<Utc> {
    let peeked = buffer.peek().await.unwrap();
    peeked.last_received_at().unwrap()
}
/// Exercises push, peek, and pop across three single-project stacks while toggling their
/// readiness.
#[tokio::test]
async fn test_insert_pop() {
let mut buffer = EnvelopeBuffer::<MemoryStackProvider>::new(0, mock_memory_checker());
let project_key1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap();
let project_key2 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
let project_key3 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fef").unwrap();
// A fresh buffer has nothing to pop or peek.
assert!(buffer.pop().await.unwrap().is_none());
assert!(buffer.peek().await.unwrap().is_empty());
let envelope1 = new_envelope(project_key1, None, None);
let time1 = envelope1.meta().received_at();
buffer.push(envelope1).await.unwrap();
let envelope2 = new_envelope(project_key2, None, None);
let time2 = envelope2.meta().received_at();
buffer.push(envelope2).await.unwrap();
// Both projects are ready, so project 2 (the newest envelope) is on top.
assert_eq!(peek_received_at(&mut buffer).await, time2);
buffer.mark_ready(&project_key1, false);
buffer.mark_ready(&project_key2, false);
// Neither project is ready: the not-ready ordering is inverted, so project 1 is on top.
assert_eq!(peek_received_at(&mut buffer).await, time1);
let envelope3 = new_envelope(project_key3, None, None);
let time3 = envelope3.meta().received_at();
buffer.push(envelope3).await.unwrap();
buffer.mark_ready(&project_key3, false);
// All three projects are not ready; project 1 stays on top.
assert_eq!(peek_received_at(&mut buffer).await, time1);
// Marking a project ready moves it to the top.
buffer.mark_ready(&project_key3, true);
assert_eq!(peek_received_at(&mut buffer).await, time3);
assert_eq!(
buffer.pop().await.unwrap().unwrap().meta().public_key(),
project_key3
);
// After popping project 3, project 1 is on top again.
assert_eq!(peek_received_at(&mut buffer).await, time1);
// Project 1 becomes the only ready project and therefore stays on top.
buffer.mark_ready(&project_key1, true);
assert_eq!(peek_received_at(&mut buffer).await, time1);
// With project 2 ready as well, it wins because its envelope is newer.
buffer.mark_ready(&project_key2, true);
assert_eq!(peek_received_at(&mut buffer).await, time2);
assert_eq!(
buffer.pop().await.unwrap().unwrap().meta().public_key(),
project_key2
);
assert_eq!(
buffer.pop().await.unwrap().unwrap().meta().public_key(),
project_key1
);
// Everything has been popped.
assert!(buffer.pop().await.unwrap().is_none());
assert!(buffer.peek().await.unwrap().is_empty());
}
/// Checks that envelopes of the same project are popped newest-first.
#[tokio::test]
async fn test_project_internal_order() {
let mut buffer = EnvelopeBuffer::<MemoryStackProvider>::new(0, mock_memory_checker());
let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap();
let envelope1 = new_envelope(project_key, None, None);
let time1 = envelope1.meta().received_at();
let envelope2 = new_envelope(project_key, None, None);
let time2 = envelope2.meta().received_at();
// NOTE(review): assumes the two envelopes get strictly increasing timestamps.
assert!(time2 > time1);
buffer.push(envelope1).await.unwrap();
buffer.push(envelope2).await.unwrap();
// The envelope pushed last comes out first.
assert_eq!(
buffer.pop().await.unwrap().unwrap().meta().received_at(),
time2
);
assert_eq!(
buffer.pop().await.unwrap().unwrap().meta().received_at(),
time1
);
assert!(buffer.pop().await.unwrap().is_none());
}
/// Checks readiness handling for a stack that depends on both an own and a sampling project.
#[tokio::test]
async fn test_sampling_projects() {
let mut buffer = EnvelopeBuffer::<MemoryStackProvider>::new(0, mock_memory_checker());
let project_key1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap();
let project_key2 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fef").unwrap();
let envelope1 = new_envelope(project_key1, None, None);
let time1 = envelope1.received_at();
buffer.push(envelope1).await.unwrap();
let envelope2 = new_envelope(project_key2, None, None);
let time2 = envelope2.received_at();
buffer.push(envelope2).await.unwrap();
// The third envelope belongs to project 1 but is sampled by project 2.
let envelope3 = new_envelope(project_key1, Some(project_key2), None);
let time3 = envelope3.meta().received_at();
buffer.push(envelope3).await.unwrap();
buffer.mark_ready(&project_key1, false);
buffer.mark_ready(&project_key2, false);
// Nothing is ready, so the first envelope is on top.
assert_eq!(
buffer.peek().await.unwrap().last_received_at().unwrap(),
time1
);
// Only project 2's own stack becomes ready and moves to the top.
buffer.mark_ready(&project_key2, true);
assert_eq!(
buffer.peek().await.unwrap().last_received_at().unwrap(),
time2
);
// Revert: nothing is ready again.
buffer.mark_ready(&project_key2, false);
assert_eq!(
buffer.peek().await.unwrap().last_received_at().unwrap(),
time1
);
// Project 1 is ready; the mixed stack still waits for project 2.
buffer.mark_ready(&project_key1, true);
assert_eq!(
buffer.peek().await.unwrap().last_received_at().unwrap(),
time1
);
// With both projects ready, the newest envelope (the mixed stack) is popped first.
buffer.mark_ready(&project_key2, true);
assert_eq!(
buffer.pop().await.unwrap().unwrap().meta().received_at(),
time3
);
assert_eq!(
buffer.peek().await.unwrap().last_received_at().unwrap(),
time2
);
// Popping works even for stacks that are not ready; the ready stack goes first.
buffer.mark_ready(&project_key2, false);
assert_eq!(buffer.pop().await.unwrap().unwrap().received_at(), time1);
assert_eq!(buffer.pop().await.unwrap().unwrap().received_at(), time2);
assert!(buffer.pop().await.unwrap().is_none());
}
/// Checks that swapping own and sampling key yields a different key pair and therefore a
/// separate stack.
#[tokio::test]
async fn test_project_keys_distinct() {
let project_key1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap();
let project_key2 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fef").unwrap();
let project_key_pair1 = ProjectKeyPair::new(project_key1, project_key2);
let project_key_pair2 = ProjectKeyPair::new(project_key2, project_key1);
assert_ne!(project_key_pair1, project_key_pair2);
let mut buffer = EnvelopeBuffer::<MemoryStackProvider>::new(0, mock_memory_checker());
buffer
.push(new_envelope(project_key1, Some(project_key2), None))
.await
.unwrap();
buffer
.push(new_envelope(project_key2, Some(project_key1), None))
.await
.unwrap();
// Each ordering of the keys gets its own stack.
assert_eq!(buffer.priority_queue.len(), 2);
}
/// Checks that `next_project_fetch` is ignored when comparing two ready priorities.
#[test]
fn test_total_order() {
let p1 = Priority {
readiness: Readiness {
own_project_ready: true,
sampling_project_ready: true,
},
received_at: Utc::now(),
next_project_fetch: Instant::now(),
};
let mut p2 = p1.clone();
p2.next_project_fetch += Duration::from_millis(1);
// Both priorities are ready, so only `received_at` is compared, and it is identical.
assert_eq!(p1.cmp(&p2), Ordering::Equal);
assert_eq!(p1, p2);
}
/// Checks that `mark_seen` rotates which not-ready stack is returned by `peek`, while
/// repeated peeks without `mark_seen` keep returning the same stack.
#[tokio::test]
async fn test_last_peek_internal_order() {
let mut buffer = EnvelopeBuffer::<MemoryStackProvider>::new(0, mock_memory_checker());
let project_key_1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap();
let event_id_1 = EventId::new();
let envelope1 = new_envelope(project_key_1, None, Some(event_id_1));
let time1 = envelope1.received_at();
let project_key_2 = ProjectKey::parse("b56ae32be2584e0bbd7a4cbb95971fed").unwrap();
let event_id_2 = EventId::new();
let envelope2 = new_envelope(project_key_2, None, Some(event_id_2));
let time2 = envelope2.received_at();
buffer.push(envelope1).await.unwrap();
buffer.push(envelope2).await.unwrap();
buffer.mark_ready(&project_key_1, false);
buffer.mark_ready(&project_key_2, false);
// Neither project is ready; the first stack is on top.
let Peek::NotReady {
last_received_at, ..
} = buffer.peek().await.unwrap()
else {
panic!();
};
assert_eq!(last_received_at, time1);
// Peeking again without `mark_seen` returns the same stack.
let Peek::NotReady {
last_received_at,
project_key_pair,
..
} = buffer.peek().await.unwrap()
else {
panic!();
};
assert_eq!(last_received_at, time1);
assert_ne!(last_received_at, time2);
// Marking the first stack as seen moves its next fetch to now, so the second stack wins.
buffer.mark_seen(&project_key_pair, Duration::ZERO);
let Peek::NotReady {
last_received_at, ..
} = buffer.peek().await.unwrap()
else {
panic!();
};
assert_eq!(last_received_at, time2);
assert_ne!(last_received_at, time1);
// Again, peeking without `mark_seen` is stable.
let Peek::NotReady {
last_received_at,
project_key_pair,
..
} = buffer.peek().await.unwrap()
else {
panic!();
};
assert_eq!(last_received_at, time2);
assert_ne!(last_received_at, time1);
// Marking the second stack as seen rotates back to the first one.
buffer.mark_seen(&project_key_pair, Duration::ZERO);
let Peek::NotReady {
last_received_at, ..
} = buffer.peek().await.unwrap()
else {
panic!();
};
assert_eq!(last_received_at, time1);
assert_ne!(last_received_at, time2);
}
/// Checks that `initialize` creates stacks for envelopes that are already in the SQLite
/// store.
#[tokio::test]
async fn test_initialize_buffer() {
// Use a unique path in the temp directory for the database.
let path = std::env::temp_dir()
.join(Uuid::new_v4().to_string())
.into_os_string()
.into_string()
.unwrap();
let config = mock_config(&path);
let mut store = SqliteEnvelopeStore::prepare(0, &config).await.unwrap();
let mut buffer = EnvelopeBuffer::<SqliteStackProvider>::new(0, &config)
.await
.unwrap();
// Insert envelopes directly into the store, bypassing the buffer.
let envelopes = mock_envelopes(10);
assert!(store
.insert_batch(
envelopes
.into_iter()
.map(|e| DatabaseEnvelope::try_from(e.as_ref()).unwrap())
.collect::<Vec<_>>()
.try_into()
.unwrap()
)
.await
.is_ok());
// Before initialization the buffer does not know about any stack.
assert!(buffer.priority_queue.is_empty());
assert!(buffer.stacks_by_project.is_empty());
buffer.initialize().await;
// NOTE(review): presumably `mock_envelopes` yields envelopes that share one key pair made
// of two distinct project keys; confirm against the test utilities.
assert_eq!(buffer.priority_queue.len(), 1);
assert_eq!(buffer.stacks_by_project.len(), 2);
}
}