use std::error::Error;
use std::num::NonZeroU8;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;
use ahash::RandomState;
use chrono::DateTime;
use chrono::Utc;
use relay_base_schema::project::ProjectKey;
use relay_config::Config;
use relay_system::{Addr, FromMessage, Interface, NoResponse, Service};
use relay_system::{Controller, Shutdown};
use relay_system::{Receiver, ServiceRunner};
use tokio::sync::mpsc::Permit;
use tokio::sync::{mpsc, watch};
use tokio::time::{timeout, Instant};
use crate::envelope::Envelope;
use crate::services::buffer::envelope_buffer::Peek;
use crate::services::global_config;
use crate::services::outcome::DiscardReason;
use crate::services::outcome::Outcome;
use crate::services::outcome::TrackOutcome;
use crate::services::processor::ProcessingGroup;
use crate::services::projects::cache::{legacy, ProjectCacheHandle, ProjectChange};
use crate::services::test_store::TestStore;
use crate::statsd::RelayTimers;
use crate::statsd::{RelayCounters, RelayHistograms};
use crate::utils::ManagedEnvelope;
use crate::MemoryChecker;
use crate::MemoryStat;
pub use envelope_buffer::EnvelopeBufferError;
pub use envelope_buffer::PolymorphicEnvelopeBuffer;
pub use envelope_stack::sqlite::SqliteEnvelopeStack;
pub use envelope_stack::EnvelopeStack;
pub use envelope_store::sqlite::SqliteEnvelopeStore;
pub use common::ProjectKeyPair;
mod common;
mod envelope_buffer;
mod envelope_stack;
mod envelope_store;
mod stack_provider;
mod testutils;
/// Seed for the hasher that maps a [`ProjectKeyPair`] to a partition in
/// [`PartitionedEnvelopeBuffer::buffer`].
///
/// The seed is a fixed constant, so every lookup builds its hasher from the same seed.
const PARTITIONING_HASHING_SEED: usize = 0;
/// Messages accepted by the [`EnvelopeBufferService`].
#[derive(Debug)]
pub enum EnvelopeBuffer {
/// Pushes an envelope into the buffer.
Push(Box<Envelope>),
/// Pushes an envelope back into the buffer and re-evaluates the readiness of the given
/// project key: the key is marked ready unless its cached project state is still pending.
NotReady(ProjectKey, Box<Envelope>),
}
impl EnvelopeBuffer {
fn name(&self) -> &'static str {
match &self {
EnvelopeBuffer::Push(_) => "push",
EnvelopeBuffer::NotReady(..) => "project_not_ready",
}
}
}
// Marks `EnvelopeBuffer` as the message interface of a service; the impl has no items.
impl Interface for EnvelopeBuffer {}
/// `EnvelopeBuffer` messages are sent as-is and produce no response.
impl FromMessage<Self> for EnvelopeBuffer {
    type Response = NoResponse;

    /// Returns the message unchanged; the unit sender is ignored.
    fn from_message(message: Self, _sender: ()) -> Self {
        message
    }
}
/// A set of envelope buffer partitions, each backed by its own [`EnvelopeBufferService`].
///
/// Cloning is cheap: the list of partition handles is shared behind an [`Arc`].
#[derive(Debug, Clone)]
pub struct PartitionedEnvelopeBuffer {
/// Handles to the partitions, indexed by partition id when built through `create`.
buffers: Arc<Vec<ObservableEnvelopeBuffer>>,
}
impl PartitionedEnvelopeBuffer {
#[allow(clippy::too_many_arguments)]
pub fn create(
partitions: NonZeroU8,
config: Arc<Config>,
memory_stat: MemoryStat,
global_config_rx: watch::Receiver<global_config::Status>,
envelopes_tx: mpsc::Sender<legacy::DequeuedEnvelope>,
project_cache_handle: ProjectCacheHandle,
outcome_aggregator: Addr<TrackOutcome>,
test_store: Addr<TestStore>,
runner: &mut ServiceRunner,
) -> Self {
let mut envelope_buffers = Vec::with_capacity(partitions.get() as usize);
for partition_id in 0..partitions.get() {
let envelope_buffer = EnvelopeBufferService::new(
partition_id,
config.clone(),
memory_stat.clone(),
global_config_rx.clone(),
Services {
envelopes_tx: envelopes_tx.clone(),
project_cache_handle: project_cache_handle.clone(),
outcome_aggregator: outcome_aggregator.clone(),
test_store: test_store.clone(),
},
)
.start_in(runner);
envelope_buffers.push(envelope_buffer);
}
Self {
buffers: Arc::new(envelope_buffers),
}
}
pub fn buffer(&self, project_key_pair: ProjectKeyPair) -> &ObservableEnvelopeBuffer {
let state = RandomState::with_seed(PARTITIONING_HASHING_SEED);
let buffer_index = (state.hash_one(project_key_pair) % self.buffers.len() as u64) as usize;
self.buffers
.get(buffer_index)
.expect("buffers should not be empty")
}
pub fn has_capacity(&self) -> bool {
if self.buffers.is_empty() {
return true;
}
self.buffers.iter().all(|buffer| buffer.has_capacity())
}
}
/// Handle to a started [`EnvelopeBufferService`]: its address plus a shared capacity flag.
#[derive(Debug, Clone)]
pub struct ObservableEnvelopeBuffer {
/// Address used to send [`EnvelopeBuffer`] messages to the service.
addr: Addr<EnvelopeBuffer>,
/// Flag shared with the service, which overwrites it at the end of each loop iteration.
has_capacity: Arc<AtomicBool>,
}
impl ObservableEnvelopeBuffer {
pub fn addr(&self) -> Addr<EnvelopeBuffer> {
self.addr.clone()
}
pub fn has_capacity(&self) -> bool {
self.has_capacity.load(Ordering::Relaxed)
}
}
/// Handles to other services and channels used by the [`EnvelopeBufferService`].
#[derive(Clone)]
pub struct Services {
/// Channel on which dequeued envelopes are sent out; a permit is reserved before popping.
pub envelopes_tx: mpsc::Sender<legacy::DequeuedEnvelope>,
/// Project cache handle used to look up project state, request fetches and observe changes.
pub project_cache_handle: ProjectCacheHandle,
/// Outcome aggregator handed to the `ManagedEnvelope` that rejects expired envelopes.
pub outcome_aggregator: Addr<TrackOutcome>,
/// Test store handed to the `ManagedEnvelope` that rejects expired envelopes.
pub test_store: Addr<TestStore>,
}
/// Service that owns one envelope buffer partition and forwards its envelopes onward.
pub struct EnvelopeBufferService {
/// Id of this partition; used to build the buffer and as a metric tag and in log lines.
partition_id: u8,
/// Relay configuration.
config: Arc<Config>,
/// Memory statistics used for the backpressure check and for the buffer's memory checker.
memory_stat: MemoryStat,
/// Global config status; popping waits until it is ready.
global_config_rx: watch::Receiver<global_config::Status>,
/// Handles to the services this service talks to.
services: Services,
/// Capacity flag shared with the [`ObservableEnvelopeBuffer`] returned by `start_in`.
has_capacity: Arc<AtomicBool>,
/// Delay applied before the next pop attempt; rewritten at the end of each loop iteration.
sleep: Duration,
}
/// Default back-off of the service: the poll interval while the system is not ready, the
/// delay after an empty or not-ready peek, and the interval passed to `mark_seen`.
const DEFAULT_SLEEP: Duration = Duration::from_secs(1);
impl EnvelopeBufferService {
/// Creates a service for the partition `partition_id` without starting it.
///
/// The service starts out reporting free capacity and with no pop delay.
pub fn new(
    partition_id: u8,
    config: Arc<Config>,
    memory_stat: MemoryStat,
    global_config_rx: watch::Receiver<global_config::Status>,
    services: Services,
) -> Self {
    let has_capacity = Arc::new(AtomicBool::new(true));
    let sleep = Duration::ZERO;

    Self {
        partition_id,
        config,
        memory_stat,
        global_config_rx,
        services,
        has_capacity,
        sleep,
    }
}
pub fn start_in(self, runner: &mut ServiceRunner) -> ObservableEnvelopeBuffer {
let has_capacity = self.has_capacity.clone();
let addr = runner.start(self);
ObservableEnvelopeBuffer { addr, has_capacity }
}
async fn ready_to_pop(
&mut self,
buffer: &PolymorphicEnvelopeBuffer,
dequeue: bool,
) -> Option<Permit<legacy::DequeuedEnvelope>> {
self.system_ready(buffer, dequeue).await;
if self.sleep > Duration::ZERO {
tokio::time::sleep(self.sleep).await;
}
self.services.envelopes_tx.reserve().await.ok()
}
/// Resolves once popping is allowed, re-checking every [`DEFAULT_SLEEP`].
///
/// Popping is allowed when all of the following hold:
/// - the buffer is in-memory, or memory usage is within the backpressure limit,
/// - the global config is ready,
/// - `dequeue` is `true`.
///
/// `dequeue` is a by-value snapshot: a running call never observes later changes, so with
/// `dequeue == false` this future only ends by being dropped.
async fn system_ready(&self, buffer: &PolymorphicEnvelopeBuffer, dequeue: bool) {
    loop {
        let memory_ok = buffer.is_memory() || self.memory_ready();
        let config_ok = self.global_config_rx.borrow().is_ready();

        let blocked = !(memory_ok && config_ok && dequeue);
        if !blocked {
            break;
        }

        tokio::time::sleep(DEFAULT_SLEEP).await;
    }
}
/// Returns `true` while the used memory percentage does not exceed the configured
/// spool backpressure limit.
fn memory_ready(&self) -> bool {
    let used = self.memory_stat.memory().used_percent();
    let limit = self.config.spool_max_backpressure_memory_percent();
    used <= limit
}
async fn try_pop<'a>(
partition_tag: &str,
config: &Config,
buffer: &mut PolymorphicEnvelopeBuffer,
services: &Services,
envelopes_tx_permit: Permit<'a, legacy::DequeuedEnvelope>,
) -> Result<Duration, EnvelopeBufferError> {
let sleep = match buffer.peek().await? {
Peek::Empty => {
relay_statsd::metric!(
counter(RelayCounters::BufferTryPop) += 1,
peek_result = "empty",
partition_id = partition_tag
);
DEFAULT_SLEEP }
Peek::Ready { last_received_at }
| Peek::NotReady {
last_received_at, ..
} if is_expired(last_received_at, config) => {
relay_statsd::metric!(
counter(RelayCounters::BufferTryPop) += 1,
peek_result = "expired",
partition_id = partition_tag
);
let envelope = buffer
.pop()
.await?
.expect("Element disappeared despite exclusive excess");
Self::drop_expired(envelope, services);
Duration::ZERO }
Peek::Ready { .. } => {
relay_log::trace!("EnvelopeBufferService: popping envelope");
relay_statsd::metric!(
counter(RelayCounters::BufferTryPop) += 1,
peek_result = "ready",
partition_id = partition_tag
);
let envelope = buffer
.pop()
.await?
.expect("Element disappeared despite exclusive excess");
envelopes_tx_permit.send(legacy::DequeuedEnvelope(envelope));
Duration::ZERO }
Peek::NotReady {
project_key_pair,
next_project_fetch,
last_received_at: _,
} => {
relay_log::trace!("EnvelopeBufferService: project(s) of envelope not ready");
relay_statsd::metric!(
counter(RelayCounters::BufferTryPop) += 1,
peek_result = "not_ready",
partition_id = partition_tag
);
if Instant::now() >= next_project_fetch {
relay_log::trace!("EnvelopeBufferService: requesting project(s) update");
let ProjectKeyPair {
own_key,
sampling_key,
} = project_key_pair;
services.project_cache_handle.fetch(own_key);
if sampling_key != own_key {
services.project_cache_handle.fetch(sampling_key);
}
buffer.mark_seen(&project_key_pair, DEFAULT_SLEEP);
}
DEFAULT_SLEEP }
};
Ok(sleep)
}
/// Rejects an expired envelope with an `Invalid(Timestamp)` outcome.
///
/// The envelope is wrapped in an ungrouped `ManagedEnvelope` built from the outcome
/// aggregator and test store in `services`, and then rejected.
fn drop_expired(envelope: Box<Envelope>, services: &Services) {
    let outcome_aggregator = services.outcome_aggregator.clone();
    let test_store = services.test_store.clone();

    ManagedEnvelope::new(
        envelope,
        outcome_aggregator,
        test_store,
        ProcessingGroup::Ungrouped,
    )
    .reject(Outcome::Invalid(DiscardReason::Timestamp));
}
/// Handles a single [`EnvelopeBuffer`] message.
///
/// Both variants push their envelope into the buffer. `NotReady` additionally increments
/// `RelayCounters::BufferEnvelopesReturned` and, after the push, marks the project key as
/// ready unless its cached project state is still pending.
async fn handle_message(
    partition_tag: &str,
    buffer: &mut PolymorphicEnvelopeBuffer,
    services: &Services,
    message: EnvelopeBuffer,
) {
    // Split the message into the envelope to push and, for returned envelopes, the
    // project key whose readiness is re-evaluated after the push.
    let (envelope, returned_for) = match message {
        EnvelopeBuffer::Push(envelope) => {
            relay_log::trace!("EnvelopeBufferService: received push message");
            (envelope, None)
        }
        EnvelopeBuffer::NotReady(project_key, envelope) => {
            relay_log::trace!(
                "EnvelopeBufferService: received project not ready message for project key {}",
                &project_key
            );
            relay_statsd::metric!(
                counter(RelayCounters::BufferEnvelopesReturned) += 1,
                partition_id = partition_tag
            );
            (envelope, Some(project_key))
        }
    };

    Self::push(buffer, envelope).await;

    if let Some(project_key) = returned_for {
        let project = services.project_cache_handle.get(project_key);
        buffer.mark_ready(&project_key, !project.state().is_pending());
    }
}
/// Shuts the buffer down if the shutdown message carries a timeout.
///
/// Returns the result of `buffer.shutdown()` when it completes within the timeout. Returns
/// `false` when the message has no timeout, or when the shutdown timed out (which is
/// logged as an error). The caller leaves its loop when this returns `true`.
async fn handle_shutdown(buffer: &mut PolymorphicEnvelopeBuffer, message: Shutdown) -> bool {
    let Some(shutdown_timeout) = message.timeout else {
        return false;
    };

    relay_log::trace!("EnvelopeBufferService: shutting down gracefully");

    match timeout(shutdown_timeout, buffer.shutdown()).await {
        Ok(finished) => finished,
        Err(error) => {
            relay_log::error!(
                error = &error as &dyn Error,
                "the envelope buffer didn't shut down in time, some envelopes might be lost",
            );
            false
        }
    }
}
/// Pushes `envelope` into the buffer, logging an error if the push fails.
///
/// Failures are not propagated to the caller.
async fn push(buffer: &mut PolymorphicEnvelopeBuffer, envelope: Box<Envelope>) {
    let Err(error) = buffer.push(envelope).await else {
        return;
    };

    relay_log::error!(
        error = &error as &dyn std::error::Error,
        "failed to push envelope"
    );
}
/// Publishes the buffer's current capacity state to the shared `has_capacity` flag.
///
/// The flag is written with relaxed ordering.
fn update_observable_state(&self, buffer: &mut PolymorphicEnvelopeBuffer) {
    let capacity_available = buffer.has_capacity();
    self.has_capacity
        .store(capacity_available, Ordering::Relaxed);
}
}
/// Returns `true` if the time since `last_received_at` exceeds the configured maximum
/// envelope age.
///
/// If the elapsed time cannot be converted to a `std` duration (for instance when
/// `last_received_at` lies in the future), the envelope is not considered expired.
fn is_expired(last_received_at: DateTime<Utc>, config: &Config) -> bool {
    let elapsed = Utc::now() - last_received_at;
    match elapsed.to_std() {
        Ok(age) => age > config.spool_envelopes_max_age(),
        Err(_) => false,
    }
}
impl Service for EnvelopeBufferService {
    type Interface = EnvelopeBuffer;

    /// Runs the service loop for this partition until shutdown.
    ///
    /// The buffer is created from the configuration and initialized first; the service
    /// panics if the buffer cannot be created. Each loop iteration then waits for
    /// whichever of these happens first:
    ///
    /// - popping becomes possible (see `ready_to_pop`), in which case `try_pop` runs,
    /// - the project cache reports a change; a `Ready` change marks that project as ready,
    /// - a message arrives on `rx`,
    /// - a shutdown is signalled; the loop ends if `handle_shutdown` returns `true`,
    /// - the global config changes,
    /// - dequeuing is toggled via `SIGUSR1` (unix only).
    ///
    /// After every iteration the pop delay and the shared capacity flag are updated.
    async fn run(mut self, mut rx: Receiver<Self::Interface>) {
        let config = self.config.clone();
        let memory_checker = MemoryChecker::new(self.memory_stat.clone(), config.clone());
        let mut global_config_rx = self.global_config_rx.clone();
        let services = self.services.clone();

        // Whether envelopes may currently be dequeued; toggled by `SIGUSR1`.
        let dequeue = Arc::<AtomicBool>::new(true.into());
        // Wakes the loop after a toggle. A pending `ready_to_pop` future holds a snapshot
        // of the flag and would otherwise keep waiting on the stale value until an
        // unrelated event restarts the loop.
        let dequeue_toggled = Arc::new(tokio::sync::Notify::new());

        let mut buffer =
            PolymorphicEnvelopeBuffer::from_config(self.partition_id, &config, memory_checker)
                .await
                .expect("failed to start the envelope buffer service");
        buffer.initialize().await;

        let partition_tag = self.partition_id.to_string();
        let mut shutdown = Controller::shutdown_handle();
        let mut project_changes = self.services.project_cache_handle.changes();

        #[cfg(unix)]
        let signal_dequeue = dequeue.clone();
        #[cfg(unix)]
        let signal_toggled = dequeue_toggled.clone();
        #[cfg(unix)]
        relay_system::spawn!(async move {
            use tokio::signal::unix::{signal, SignalKind};

            let Ok(mut signal) = signal(SignalKind::user_defined1()) else {
                return;
            };
            while let Some(()) = signal.recv().await {
                // `fetch_xor(true)` flips the flag in one atomic step and returns the
                // previous value, so the new value is its negation.
                let deq = !signal_dequeue.fetch_xor(true, Ordering::Relaxed);
                relay_log::info!("SIGUSR1 receive, dequeue={}", deq);
                signal_toggled.notify_one();
            }
        });

        relay_log::info!("EnvelopeBufferService {}: starting", self.partition_id);
        loop {
            let used_capacity =
                self.services.envelopes_tx.max_capacity() - self.services.envelopes_tx.capacity();
            relay_statsd::metric!(
                histogram(RelayHistograms::BufferBackpressureEnvelopesCount) = used_capacity as u64,
                partition_id = &partition_tag,
            );

            // Delay before the next pop attempt; branches that change what can be popped
            // reset it to zero.
            let mut sleep = DEFAULT_SLEEP;
            let start = Instant::now();
            tokio::select! {
                Some(permit) = self.ready_to_pop(&buffer, dequeue.load(Ordering::Relaxed)) => {
                    relay_statsd::metric!(timer(RelayTimers::BufferIdle) = start.elapsed(), input = "pop", partition_id = &partition_tag);
                    relay_statsd::metric!(timer(RelayTimers::BufferBusy), input = "pop", partition_id = &partition_tag, {
                        match Self::try_pop(&partition_tag, &config, &mut buffer, &services, permit).await {
                            Ok(new_sleep) => {
                                sleep = new_sleep;
                            }
                            Err(error) => {
                                relay_log::error!(
                                    error = &error as &dyn std::error::Error,
                                    "failed to pop envelope"
                                );
                            }
                        }});
                }
                change = project_changes.recv() => {
                    relay_statsd::metric!(timer(RelayTimers::BufferIdle) = start.elapsed(), input = "project_change", partition_id = &partition_tag);
                    relay_statsd::metric!(timer(RelayTimers::BufferBusy), input = "project_change", partition_id = &partition_tag, {
                        if let Ok(ProjectChange::Ready(project_key)) = change {
                            buffer.mark_ready(&project_key, true);
                        }
                        sleep = Duration::ZERO;
                    });
                }
                Some(message) = rx.recv() => {
                    relay_statsd::metric!(timer(RelayTimers::BufferIdle) = start.elapsed(), input = "handle_message", partition_id = &partition_tag);
                    let message_name = message.name();
                    relay_statsd::metric!(timer(RelayTimers::BufferBusy), input = message_name, partition_id = &partition_tag, {
                        Self::handle_message(&partition_tag, &mut buffer, &services, message).await;
                        sleep = Duration::ZERO;
                    });
                }
                shutdown = shutdown.notified() => {
                    relay_statsd::metric!(timer(RelayTimers::BufferIdle) = start.elapsed(), input = "shutdown", partition_id = &partition_tag);
                    relay_statsd::metric!(timer(RelayTimers::BufferBusy), input = "shutdown", partition_id = &partition_tag, {
                        if Self::handle_shutdown(&mut buffer, shutdown).await {
                            break;
                        }
                    });
                }
                Ok(()) = global_config_rx.changed() => {
                    relay_statsd::metric!(timer(RelayTimers::BufferIdle) = start.elapsed(), input = "global_config_change", partition_id = &partition_tag);
                    sleep = Duration::ZERO;
                }
                () = dequeue_toggled.notified() => {
                    // Restart the loop so `ready_to_pop` re-reads the dequeue flag.
                    relay_statsd::metric!(timer(RelayTimers::BufferIdle) = start.elapsed(), input = "dequeue_toggle", partition_id = &partition_tag);
                    sleep = Duration::ZERO;
                }
                else => break,
            }

            self.sleep = sleep;
            self.update_observable_state(&mut buffer);
        }

        relay_log::info!("EnvelopeBufferService {}: stopping", self.partition_id);
    }
}
#[cfg(test)]
mod tests {
use chrono::Utc;
use relay_dynamic_config::GlobalConfig;
use relay_quotas::DataCategory;
use std::time::Duration;
use tokio::sync::mpsc;
use uuid::Uuid;
use crate::services::projects::project::ProjectState;
use crate::testutils::new_envelope;
use crate::MemoryStat;
use super::*;
/// Everything a test needs to drive an [`EnvelopeBufferService`] built by
/// `envelope_buffer_service`.
struct EnvelopeBufferServiceResult {
/// The service under test, not yet started.
service: EnvelopeBufferService,
/// Sender side of the global config status watched by the service.
global_tx: watch::Sender<global_config::Status>,
/// Receiver side of the channel on which the service emits dequeued envelopes.
envelopes_rx: mpsc::Receiver<legacy::DequeuedEnvelope>,
/// Clone of the project cache test handle used by the service.
project_cache_handle: ProjectCacheHandle,
/// Receiver for outcomes sent to the custom outcome aggregator address.
outcome_aggregator_rx: mpsc::UnboundedReceiver<TrackOutcome>,
}
/// Builds an unstarted [`EnvelopeBufferService`] for partition `0` together with the
/// channel ends and handles a test needs to interact with it.
///
/// `config_json` overrides the default configuration when given. The output channel for
/// dequeued envelopes has a capacity of 5.
fn envelope_buffer_service(
    config_json: Option<serde_json::Value>,
    global_config_status: global_config::Status,
) -> EnvelopeBufferServiceResult {
    relay_log::init_test!();

    let config = match config_json {
        Some(json) => Config::from_json_value(json).unwrap(),
        None => Config::default(),
    };
    let config = Arc::new(config);
    let memory_stat = MemoryStat::default();

    let (global_tx, global_rx) = watch::channel(global_config_status);
    let (envelopes_tx, envelopes_rx) = mpsc::channel(5);
    let (outcome_aggregator, outcome_aggregator_rx) = Addr::custom();
    let project_cache_handle = ProjectCacheHandle::for_test();

    let services = Services {
        envelopes_tx,
        project_cache_handle: project_cache_handle.clone(),
        outcome_aggregator,
        test_store: Addr::dummy(),
    };
    let service = EnvelopeBufferService::new(0, config, memory_stat, global_rx, services);

    EnvelopeBufferServiceResult {
        service,
        global_tx,
        envelopes_rx,
        project_cache_handle,
        outcome_aggregator_rx,
    }
}
// Checks that the shared capacity flag is refreshed by the running service.
#[tokio::test(start_paused = true)]
async fn capacity_is_updated() {
let EnvelopeBufferServiceResult {
service,
global_tx: _global_tx,
envelopes_rx: _envelopes_rx,
project_cache_handle,
outcome_aggregator_rx: _outcome_aggregator_rx,
} = envelope_buffer_service(None, global_config::Status::Pending);
// Start from `false` so that a later `true` can only come from the service's update.
service.has_capacity.store(false, Ordering::Relaxed);
let ObservableEnvelopeBuffer { has_capacity, .. } =
service.start_in(&mut ServiceRunner::new());
assert!(!has_capacity.load(Ordering::Relaxed));
tokio::time::advance(Duration::from_millis(100)).await;
let some_project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
// Presumably emits a project change, which completes one loop iteration and thereby
// refreshes the capacity flag (verify against the test handle).
project_cache_handle.test_set_project_state(some_project_key, ProjectState::Disabled);
tokio::time::advance(Duration::from_millis(100)).await;
assert!(has_capacity.load(Ordering::Relaxed));
}
// Checks that no envelope is dequeued while the global config status is pending, and that
// the envelope is dequeued once the status becomes ready.
#[tokio::test(start_paused = true)]
async fn pop_requires_global_config() {
let EnvelopeBufferServiceResult {
service,
global_tx,
envelopes_rx,
project_cache_handle,
outcome_aggregator_rx: _outcome_aggregator_rx,
..
} = envelope_buffer_service(None, global_config::Status::Pending);
let addr = service.start_detached();
let envelope = new_envelope(false, "foo");
let project_key = envelope.meta().public_key();
addr.send(EnvelopeBuffer::Push(envelope.clone()));
project_cache_handle.test_set_project_state(project_key, ProjectState::Disabled);
tokio::time::sleep(Duration::from_millis(1000)).await;
// Global config is still pending: nothing may have been dequeued.
assert_eq!(envelopes_rx.len(), 0);
global_tx.send_replace(global_config::Status::Ready(Arc::new(
GlobalConfig::default(),
)));
tokio::time::sleep(Duration::from_millis(1000)).await;
assert_eq!(envelopes_rx.len(), 1);
}
// Checks that no envelope is dequeued under this configuration.
// NOTE(review): presumably the spool path selects a disk-backed buffer and a memory limit
// of 0 bytes makes the memory check fail; confirm against `Config`.
#[tokio::test(start_paused = true)]
async fn pop_requires_memory_capacity() {
let EnvelopeBufferServiceResult {
service,
envelopes_rx,
project_cache_handle,
outcome_aggregator_rx: _outcome_aggregator_rx,
global_tx: _global_tx,
..
} = envelope_buffer_service(
Some(serde_json::json!({
"spool": {
"envelopes": {
"path": std::env::temp_dir().join(Uuid::new_v4().to_string()),
}
},
"health": {
"max_memory_bytes": 0,
}
})),
global_config::Status::Ready(Arc::new(GlobalConfig::default())),
);
let addr = service.start_detached();
let envelope = new_envelope(false, "foo");
let project_key = envelope.meta().public_key();
addr.send(EnvelopeBuffer::Push(envelope.clone()));
project_cache_handle.test_set_project_state(project_key, ProjectState::Disabled);
tokio::time::sleep(Duration::from_millis(1000)).await;
assert_eq!(envelopes_rx.len(), 0);
}
// Checks that an envelope older than the configured maximum age is not dequeued and
// produces an outcome instead.
#[tokio::test(start_paused = true)]
async fn old_envelope_is_dropped() {
let EnvelopeBufferServiceResult {
service,
envelopes_rx,
project_cache_handle: _project_cache_handle,
mut outcome_aggregator_rx,
global_tx: _global_tx,
} = envelope_buffer_service(
Some(serde_json::json!({
"spool": {
"envelopes": {
"max_envelope_delay_secs": 1,
}
}
})),
global_config::Status::Ready(Arc::new(GlobalConfig::default())),
);
let config = service.config.clone();
let addr = service.start_detached();
tokio::time::sleep(Duration::from_millis(100)).await;
let mut envelope = new_envelope(false, "foo");
// Backdate the envelope to twice the maximum age so that it counts as expired.
envelope.meta_mut().set_received_at(
Utc::now()
- chrono::Duration::seconds(2 * config.spool_envelopes_max_age().as_secs() as i64),
);
addr.send(EnvelopeBuffer::Push(envelope));
tokio::time::sleep(Duration::from_millis(100)).await;
assert_eq!(envelopes_rx.len(), 0);
let outcome = outcome_aggregator_rx.try_recv().unwrap();
assert_eq!(outcome.category, DataCategory::TransactionIndexed);
assert_eq!(outcome.quantity, 1);
}
// Checks how many project fetches the service has requested after an envelope is pushed,
// dequeued, and then returned to the buffer as not ready.
#[tokio::test(start_paused = true)]
async fn test_update_project() {
let EnvelopeBufferServiceResult {
service,
mut envelopes_rx,
project_cache_handle,
global_tx: _global_tx,
outcome_aggregator_rx: _outcome_aggregator_rx,
} = envelope_buffer_service(
None,
global_config::Status::Ready(Arc::new(GlobalConfig::default())),
);
let addr = service.start_detached();
let envelope = new_envelope(false, "foo");
let project_key = envelope.meta().public_key();
tokio::time::sleep(Duration::from_secs(1)).await;
addr.send(EnvelopeBuffer::Push(envelope.clone()));
tokio::time::sleep(Duration::from_secs(3)).await;
let legacy::DequeuedEnvelope(envelope) = envelopes_rx.recv().await.unwrap();
// Hand the envelope back to the buffer as belonging to a project that is not ready.
addr.send(EnvelopeBuffer::NotReady(project_key, envelope));
tokio::time::sleep(Duration::from_millis(200)).await;
assert_eq!(project_cache_handle.test_num_fetches(), 2);
// NOTE(review): the extra fetch presumably comes from the not-ready retry after the
// `DEFAULT_SLEEP` back-off has elapsed; confirm against the buffer's fetch schedule.
tokio::time::sleep(Duration::from_millis(1300)).await;
assert_eq!(project_cache_handle.test_num_fetches(), 3);
}
// Checks that ten pushed envelopes are received in two batches of five.
// The output channel built by `envelope_buffer_service` has a capacity of 5.
#[tokio::test(start_paused = true)]
async fn output_is_throttled() {
let EnvelopeBufferServiceResult {
service,
mut envelopes_rx,
project_cache_handle,
global_tx: _global_tx,
outcome_aggregator_rx: _outcome_aggregator_rx,
..
} = envelope_buffer_service(
None,
global_config::Status::Ready(Arc::new(GlobalConfig::default())),
);
let addr = service.start_detached();
let envelope = new_envelope(false, "foo");
let project_key = envelope.meta().public_key();
for _ in 0..10 {
addr.send(EnvelopeBuffer::Push(envelope.clone()));
}
project_cache_handle.test_set_project_state(project_key, ProjectState::Disabled);
tokio::time::sleep(Duration::from_millis(100)).await;
// First batch: only five envelopes are available.
let mut messages = vec![];
envelopes_rx.recv_many(&mut messages, 100).await;
assert_eq!(
messages
.iter()
.filter(|message| matches!(message, legacy::DequeuedEnvelope(..)))
.count(),
5
);
tokio::time::sleep(Duration::from_millis(100)).await;
// Second batch: the remaining five envelopes.
let mut messages = vec![];
envelopes_rx.recv_many(&mut messages, 100).await;
assert_eq!(
messages
.iter()
.filter(|message| matches!(message, legacy::DequeuedEnvelope(..)))
.count(),
5
);
}
// Checks that two partitions sharing one output channel each deliver the envelope that
// was pushed to them.
#[tokio::test(start_paused = true)]
async fn test_partitioned_buffer() {
let mut runner = ServiceRunner::new();
let (_global_tx, global_rx) = watch::channel(global_config::Status::Ready(Arc::new(
GlobalConfig::default(),
)));
let (envelopes_tx, mut envelopes_rx) = mpsc::channel(10);
let (outcome_aggregator, _outcome_rx) = Addr::custom();
let project_cache_handle = ProjectCacheHandle::for_test();
let services = Services {
envelopes_tx,
project_cache_handle: project_cache_handle.clone(),
outcome_aggregator,
test_store: Addr::dummy(),
};
let config = Arc::new(Config::default());
let buffer1 = EnvelopeBufferService::new(
0,
config.clone(),
MemoryStat::default(),
global_rx.clone(),
services.clone(),
);
let buffer2 = EnvelopeBufferService::new(
1,
config.clone(),
MemoryStat::default(),
global_rx,
services,
);
let observable1 = buffer1.start_in(&mut runner);
let observable2 = buffer2.start_in(&mut runner);
// Assemble the partitioned buffer by hand instead of going through `create`.
let partitioned = PartitionedEnvelopeBuffer {
buffers: Arc::new(vec![observable1, observable2]),
};
let envelope1 = new_envelope(false, "foo");
let envelope2 = new_envelope(false, "bar");
let buffer1 = &partitioned.buffers[0];
let buffer2 = &partitioned.buffers[1];
buffer1.addr().send(EnvelopeBuffer::Push(envelope1));
buffer2.addr().send(EnvelopeBuffer::Push(envelope2));
// Exactly two envelopes arrive on the shared channel, one from each partition.
assert!(envelopes_rx.recv().await.is_some());
assert!(envelopes_rx.recv().await.is_some());
assert!(envelopes_rx.is_empty());
}
}