1use std::cmp::Ordering;
2use std::collections::BTreeSet;
3use std::convert::Infallible;
4use std::error::Error;
5use std::mem;
6use std::time::Duration;
7
8use chrono::{DateTime, Utc};
9use hashbrown::HashSet;
10use relay_base_schema::project::ProjectKey;
11use relay_config::Config;
12use tokio::time::{Instant, timeout};
13
14use crate::envelope::Envelope;
15use crate::envelope::Item;
16use crate::services::buffer::common::ProjectKeyPair;
17use crate::services::buffer::envelope_stack::EnvelopeStack;
18use crate::services::buffer::envelope_stack::sqlite::SqliteEnvelopeStackError;
19use crate::services::buffer::envelope_store::sqlite::SqliteEnvelopeStoreError;
20use crate::services::buffer::stack_provider::memory::MemoryStackProvider;
21use crate::services::buffer::stack_provider::sqlite::SqliteStackProvider;
22use crate::services::buffer::stack_provider::{StackCreationType, StackProvider};
23use crate::statsd::{RelayDistributions, RelayGauges, RelayTimers};
24use crate::utils::MemoryChecker;
25
26#[derive(Debug)]
34#[allow(private_interfaces)]
35pub enum PolymorphicEnvelopeBuffer {
36 InMemory(EnvelopeBuffer<MemoryStackProvider>),
38 Sqlite(EnvelopeBuffer<SqliteStackProvider>),
40}
41
42impl PolymorphicEnvelopeBuffer {
43 pub fn is_memory(&self) -> bool {
45 match self {
46 Self::InMemory(_) => true,
47 Self::Sqlite(_) => false,
48 }
49 }
50
51 pub async fn from_config(
54 partition_id: u8,
55 config: &Config,
56 memory_checker: MemoryChecker,
57 ) -> Result<Self, EnvelopeBufferError> {
58 let buffer = if config.spool_envelopes_path(partition_id).is_some() {
59 relay_log::trace!("PolymorphicEnvelopeBuffer: initializing sqlite envelope buffer");
60 let buffer = EnvelopeBuffer::<SqliteStackProvider>::new(partition_id, config).await?;
61 Self::Sqlite(buffer)
62 } else {
63 relay_log::trace!("PolymorphicEnvelopeBuffer: initializing memory envelope buffer");
64 let buffer = EnvelopeBuffer::<MemoryStackProvider>::new(partition_id, memory_checker);
65 Self::InMemory(buffer)
66 };
67
68 Ok(buffer)
69 }
70
71 pub async fn initialize(&mut self) {
73 match self {
74 PolymorphicEnvelopeBuffer::InMemory(buffer) => buffer.initialize().await,
75 PolymorphicEnvelopeBuffer::Sqlite(buffer) => buffer.initialize().await,
76 }
77 }
78
79 pub async fn push(&mut self, envelope: Box<Envelope>) -> Result<(), EnvelopeBufferError> {
81 relay_statsd::metric!(
82 distribution(RelayDistributions::BufferEnvelopeBodySize) =
83 envelope.items().map(Item::len).sum::<usize>() as u64,
84 partition_id = self.partition_tag()
85 );
86
87 relay_statsd::metric!(
88 timer(RelayTimers::BufferPush),
89 partition_id = self.partition_tag(),
90 {
91 match self {
92 Self::Sqlite(buffer) => buffer.push(envelope).await,
93 Self::InMemory(buffer) => buffer.push(envelope).await,
94 }
95 }
96 )
97 }
98
99 pub async fn peek(&mut self) -> Result<Peek, EnvelopeBufferError> {
101 relay_statsd::metric!(
102 timer(RelayTimers::BufferPeek),
103 partition_id = self.partition_tag(),
104 {
105 match self {
106 Self::Sqlite(buffer) => buffer.peek().await,
107 Self::InMemory(buffer) => buffer.peek().await,
108 }
109 }
110 )
111 }
112
113 pub async fn pop(&mut self) -> Result<Option<Box<Envelope>>, EnvelopeBufferError> {
115 relay_statsd::metric!(
116 timer(RelayTimers::BufferPop),
117 partition_id = self.partition_tag(),
118 {
119 match self {
120 Self::Sqlite(buffer) => buffer.pop().await,
121 Self::InMemory(buffer) => buffer.pop().await,
122 }
123 }
124 )
125 }
126
127 pub fn mark_ready(&mut self, project: &ProjectKey, is_ready: bool) -> bool {
132 relay_log::trace!(
133 project_key = project.as_str(),
134 "buffer marked {}",
135 if is_ready { "ready" } else { "not ready" }
136 );
137 match self {
138 Self::Sqlite(buffer) => buffer.mark_ready(project, is_ready),
139 Self::InMemory(buffer) => buffer.mark_ready(project, is_ready),
140 }
141 }
142
143 pub fn mark_seen(&mut self, project_key_pair: &ProjectKeyPair, next_fetch: Duration) {
149 match self {
150 Self::Sqlite(buffer) => buffer.mark_seen(project_key_pair, next_fetch),
151 Self::InMemory(buffer) => buffer.mark_seen(project_key_pair, next_fetch),
152 }
153 }
154
155 pub fn has_capacity(&self) -> bool {
157 match self {
158 Self::Sqlite(buffer) => buffer.has_capacity(),
159 Self::InMemory(buffer) => buffer.has_capacity(),
160 }
161 }
162
163 pub fn item_count(&self) -> u64 {
166 match self {
167 Self::Sqlite(buffer) => buffer.tracked_count,
168 Self::InMemory(buffer) => buffer.tracked_count,
169 }
170 }
171
172 pub fn total_size(&self) -> Option<u64> {
175 match self {
176 Self::Sqlite(buffer) => buffer.stack_provider.total_size(),
177 Self::InMemory(buffer) => buffer.stack_provider.total_size(),
178 }
179 }
180
181 pub async fn shutdown(&mut self) -> bool {
183 match self {
187 Self::Sqlite(buffer) if !buffer.stack_provider.ephemeral() => {
188 buffer.flush().await;
189 true
190 }
191 _ => {
192 relay_log::trace!("shutdown procedure not needed");
193 false
194 }
195 }
196 }
197
198 fn partition_tag(&self) -> &str {
200 match self {
201 PolymorphicEnvelopeBuffer::InMemory(buffer) => &buffer.partition_tag,
202 PolymorphicEnvelopeBuffer::Sqlite(buffer) => &buffer.partition_tag,
203 }
204 }
205}
206
207#[derive(Debug, thiserror::Error)]
209pub enum EnvelopeBufferError {
210 #[error("sqlite")]
211 SqliteStore(#[from] SqliteEnvelopeStoreError),
212
213 #[error("sqlite")]
214 SqliteStack(#[from] SqliteEnvelopeStackError),
215
216 #[error("failed to push envelope to the buffer")]
217 PushFailed,
218}
219
220impl From<Infallible> for EnvelopeBufferError {
221 fn from(value: Infallible) -> Self {
222 match value {}
223 }
224}
225
226#[derive(Debug)]
231struct EnvelopeBuffer<P: StackProvider> {
232 priority_queue: priority_queue::PriorityQueue<QueueItem<ProjectKeyPair, P::Stack>, Priority>,
234 stacks_by_project: hashbrown::HashMap<ProjectKey, BTreeSet<ProjectKeyPair>>,
236 stack_provider: P,
241 total_count: i64,
248 tracked_count: u64,
254 total_count_initialized: bool,
259 partition_tag: String,
261}
262
263impl EnvelopeBuffer<MemoryStackProvider> {
264 pub fn new(partition_id: u8, memory_checker: MemoryChecker) -> Self {
266 Self {
267 stacks_by_project: Default::default(),
268 priority_queue: Default::default(),
269 stack_provider: MemoryStackProvider::new(memory_checker),
270 total_count: 0,
271 tracked_count: 0,
272 total_count_initialized: false,
273 partition_tag: partition_id.to_string(),
274 }
275 }
276}
277
278#[allow(dead_code)]
279impl EnvelopeBuffer<SqliteStackProvider> {
280 pub async fn new(partition_id: u8, config: &Config) -> Result<Self, EnvelopeBufferError> {
282 Ok(Self {
283 stacks_by_project: Default::default(),
284 priority_queue: Default::default(),
285 stack_provider: SqliteStackProvider::new(partition_id, config).await?,
286 total_count: 0,
287 tracked_count: 0,
288 total_count_initialized: false,
289 partition_tag: partition_id.to_string(),
290 })
291 }
292}
293
294impl<P: StackProvider> EnvelopeBuffer<P>
295where
296 EnvelopeBufferError: From<<P::Stack as EnvelopeStack>::Error>,
297{
298 pub async fn initialize(&mut self) {
301 relay_statsd::metric!(
302 timer(RelayTimers::BufferInitialization),
303 partition_id = &self.partition_tag,
304 {
305 let initialization_state = self.stack_provider.initialize().await;
306 self.load_stacks(initialization_state.project_key_pairs)
307 .await;
308 self.load_store_total_count().await;
309 }
310 );
311 }
312
313 pub async fn push(&mut self, envelope: Box<Envelope>) -> Result<(), EnvelopeBufferError> {
318 let received_at = envelope.received_at();
319
320 let project_key_pair = ProjectKeyPair::from_envelope(&envelope);
321 if let Some((
322 QueueItem {
323 key: _,
324 value: stack,
325 },
326 _,
327 )) = self.priority_queue.get_mut(&project_key_pair)
328 {
329 stack.push(envelope).await?;
330 } else {
331 self.push_stack(
334 StackCreationType::New,
335 ProjectKeyPair::from_envelope(&envelope),
336 Some(envelope),
337 )
338 .await?;
339 }
340 self.priority_queue
341 .change_priority_by(&project_key_pair, |prio| {
342 prio.received_at = received_at;
343 });
344
345 self.total_count += 1;
346 self.tracked_count += 1;
347 self.track_total_count();
348
349 Ok(())
350 }
351
352 pub async fn peek(&mut self) -> Result<Peek, EnvelopeBufferError> {
354 let Some((
355 QueueItem {
356 key: project_key_pair,
357 value: stack,
358 },
359 Priority {
360 readiness,
361 next_project_fetch,
362 ..
363 },
364 )) = self.priority_queue.peek_mut()
365 else {
366 return Ok(Peek::Empty);
367 };
368
369 let ready = readiness.ready();
370
371 Ok(match (stack.peek().await?, ready) {
372 (None, _) => Peek::Empty,
373 (Some(last_received_at), true) => Peek::Ready {
374 project_key_pair: *project_key_pair,
375 last_received_at,
376 },
377 (Some(last_received_at), false) => Peek::NotReady {
378 project_key_pair: *project_key_pair,
379 next_project_fetch: *next_project_fetch,
380 last_received_at,
381 },
382 })
383 }
384
385 pub async fn pop(&mut self) -> Result<Option<Box<Envelope>>, EnvelopeBufferError> {
390 let Some((QueueItem { key, value: stack }, _)) = self.priority_queue.peek_mut() else {
391 return Ok(None);
392 };
393 let project_key_pair = *key;
394 let envelope = stack.pop().await?.expect("found an empty stack");
395
396 let last_received_at = stack.peek().await?;
397
398 match last_received_at {
399 None => {
400 self.pop_stack(project_key_pair);
401 }
402 Some(last_received_at) => {
403 self.priority_queue
404 .change_priority_by(&project_key_pair, |prio| {
405 prio.received_at = last_received_at;
406 });
407 }
408 }
409
410 self.total_count -= 1;
414 self.tracked_count = self.tracked_count.saturating_sub(1);
415 self.track_total_count();
416
417 Ok(Some(envelope))
418 }
419
420 pub fn mark_ready(&mut self, project: &ProjectKey, is_ready: bool) -> bool {
424 let mut changed = false;
425 if let Some(project_key_pairs) = self.stacks_by_project.get(project) {
426 for project_key_pair in project_key_pairs {
427 self.priority_queue
428 .change_priority_by(project_key_pair, |stack| {
429 let mut found = false;
430 for (subkey, readiness) in [
431 (
432 project_key_pair.own_key,
433 &mut stack.readiness.own_project_ready,
434 ),
435 (
436 project_key_pair.sampling_key,
437 &mut stack.readiness.sampling_project_ready,
438 ),
439 ] {
440 if subkey == *project {
441 found = true;
442 if *readiness != is_ready {
443 changed = true;
444 *readiness = is_ready;
445 }
446 }
447 }
448 debug_assert!(found);
449 });
450 }
451 }
452
453 changed
454 }
455
456 pub fn mark_seen(&mut self, project_key_pair: &ProjectKeyPair, next_fetch: Duration) {
462 self.priority_queue
463 .change_priority_by(project_key_pair, |stack| {
464 stack.next_project_fetch = Instant::now() + next_fetch;
467 });
468 }
469
470 pub fn has_capacity(&self) -> bool {
472 self.stack_provider.has_store_capacity()
473 }
474
475 pub async fn flush(&mut self) {
477 let priority_queue = mem::take(&mut self.priority_queue);
478 self.stack_provider
479 .flush(priority_queue.into_iter().map(|(q, _)| q.value))
480 .await;
481 }
482
483 async fn push_stack(
485 &mut self,
486 stack_creation_type: StackCreationType,
487 project_key_pair: ProjectKeyPair,
488 envelope: Option<Box<Envelope>>,
489 ) -> Result<(), EnvelopeBufferError> {
490 let received_at = envelope.as_ref().map_or(Utc::now(), |e| e.received_at());
491
492 let mut stack = self
493 .stack_provider
494 .create_stack(stack_creation_type, project_key_pair);
495 if let Some(envelope) = envelope {
496 stack.push(envelope).await?;
497 }
498
499 let previous_entry = self.priority_queue.push(
500 QueueItem {
501 key: project_key_pair,
502 value: stack,
503 },
504 Priority::new(received_at),
505 );
506 debug_assert!(previous_entry.is_none());
507 for project_key in project_key_pair.iter() {
508 self.stacks_by_project
509 .entry(project_key)
510 .or_default()
511 .insert(project_key_pair);
512 }
513 relay_statsd::metric!(
514 gauge(RelayGauges::BufferStackCount) = self.priority_queue.len() as u64,
515 partition_id = &self.partition_tag
516 );
517
518 Ok(())
519 }
520
521 fn pop_stack(&mut self, project_key_pair: ProjectKeyPair) {
523 for project_key in project_key_pair.iter() {
524 self.stacks_by_project
525 .get_mut(&project_key)
526 .expect("project_key is missing from lookup")
527 .remove(&project_key_pair);
528 }
529 self.priority_queue.remove(&project_key_pair);
530
531 relay_statsd::metric!(
532 gauge(RelayGauges::BufferStackCount) = self.priority_queue.len() as u64,
533 partition_id = &self.partition_tag
534 );
535 }
536
537 async fn load_stacks(&mut self, project_key_pairs: HashSet<ProjectKeyPair>) {
539 for project_key_pair in project_key_pairs {
540 self.push_stack(StackCreationType::Initialization, project_key_pair, None)
541 .await
542 .expect("Pushing an empty stack raised an error");
543 }
544 }
545
546 async fn load_store_total_count(&mut self) {
552 let total_count = timeout(Duration::from_secs(1), async {
553 self.stack_provider.store_total_count().await
554 })
555 .await;
556 match total_count {
557 Ok(total_count) => {
558 self.total_count = total_count as i64;
559 self.total_count_initialized = true;
560 }
561 Err(error) => {
562 self.total_count_initialized = false;
563 relay_log::error!(
564 error = &error as &dyn Error,
565 "failed to load the total envelope count of the store",
566 );
567 }
568 };
569 self.track_total_count();
570 }
571
572 fn track_total_count(&self) {
574 let total_count = self.total_count as f64;
575 let initialized = match self.total_count_initialized {
576 true => "true",
577 false => "false",
578 };
579 relay_statsd::metric!(
580 gauge(RelayGauges::BufferEnvelopesCount) = total_count,
581 initialized = initialized,
582 stack_type = self.stack_provider.stack_type(),
583 partition_id = &self.partition_tag
584 );
585 }
586}
587
588pub enum Peek {
590 Empty,
591 Ready {
592 project_key_pair: ProjectKeyPair,
593 last_received_at: DateTime<Utc>,
594 },
595 NotReady {
596 project_key_pair: ProjectKeyPair,
597 next_project_fetch: Instant,
598 last_received_at: DateTime<Utc>,
599 },
600}
601
602impl Peek {
603 pub fn last_received_at(&self) -> Option<DateTime<Utc>> {
604 match self {
605 Self::Empty => None,
606 Self::Ready {
607 last_received_at, ..
608 }
609 | Self::NotReady {
610 last_received_at, ..
611 } => Some(*last_received_at),
612 }
613 }
614}
615
616#[derive(Debug)]
617struct QueueItem<K, V> {
618 key: K,
619 value: V,
620}
621
622impl<K, V> std::borrow::Borrow<K> for QueueItem<K, V> {
623 fn borrow(&self) -> &K {
624 &self.key
625 }
626}
627
628impl<K: std::hash::Hash, V> std::hash::Hash for QueueItem<K, V> {
629 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
630 self.key.hash(state);
631 }
632}
633
634impl<K: PartialEq, V> PartialEq for QueueItem<K, V> {
635 fn eq(&self, other: &Self) -> bool {
636 self.key == other.key
637 }
638}
639
640impl<K: PartialEq, V> Eq for QueueItem<K, V> {}
641
642#[derive(Debug, Clone)]
643struct Priority {
644 readiness: Readiness,
645 received_at: DateTime<Utc>,
646 next_project_fetch: Instant,
647}
648
649impl Priority {
650 fn new(received_at: DateTime<Utc>) -> Self {
651 Self {
652 readiness: Readiness::new(),
653 received_at,
654 next_project_fetch: Instant::now(),
655 }
656 }
657}
658
659impl Ord for Priority {
660 fn cmp(&self, other: &Self) -> Ordering {
661 match (self.readiness.ready(), other.readiness.ready()) {
662 (true, true) => self.received_at.cmp(&other.received_at),
666 (true, false) => Ordering::Greater,
667 (false, true) => Ordering::Less,
668 (false, false) => self
671 .next_project_fetch
672 .cmp(&other.next_project_fetch)
673 .reverse()
674 .then(self.received_at.cmp(&other.received_at).reverse()),
675 }
676 }
677}
678
679impl PartialOrd for Priority {
680 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
681 Some(self.cmp(other))
682 }
683}
684
685impl PartialEq for Priority {
686 fn eq(&self, other: &Self) -> bool {
687 self.cmp(other).is_eq()
688 }
689}
690
691impl Eq for Priority {}
692
693#[derive(Debug, Clone, Copy)]
694struct Readiness {
695 own_project_ready: bool,
696 sampling_project_ready: bool,
697}
698
699impl Readiness {
700 fn new() -> Self {
701 Self {
704 own_project_ready: true,
705 sampling_project_ready: true,
706 }
707 }
708
709 fn ready(&self) -> bool {
710 self.own_project_ready && self.sampling_project_ready
711 }
712}
713
714#[cfg(test)]
715mod tests {
716 use relay_base_schema::project::ProjectId;
717 use relay_common::Dsn;
718 use relay_event_schema::protocol::EventId;
719 use relay_sampling::DynamicSamplingContext;
720 use std::str::FromStr;
721 use std::sync::Arc;
722 use uuid::Uuid;
723
724 use crate::SqliteEnvelopeStore;
725 use crate::envelope::{Item, ItemType};
726 use crate::extractors::RequestMeta;
727 use crate::services::buffer::common::ProjectKeyPair;
728 use crate::services::buffer::envelope_store::sqlite::DatabaseEnvelope;
729 use crate::services::buffer::testutils::utils::mock_envelopes;
730 use crate::utils::MemoryStat;
731
732 use super::*;
733
734 impl Peek {
735 fn is_empty(&self) -> bool {
736 matches!(self, Peek::Empty)
737 }
738 }
739
740 fn new_envelope(
741 own_key: ProjectKey,
742 sampling_key: Option<ProjectKey>,
743 event_id: Option<EventId>,
744 ) -> Box<Envelope> {
745 let mut envelope = Envelope::from_request(
746 None,
747 RequestMeta::new(Dsn::from_str(&format!("http://{own_key}@localhost/1")).unwrap()),
748 );
749 if let Some(sampling_key) = sampling_key {
750 envelope.set_dsc(DynamicSamplingContext {
751 public_key: sampling_key,
752 project_id: Some(ProjectId::new(42)),
753 trace_id: "67e5504410b1426f9247bb680e5fe0c8".parse().unwrap(),
754 release: None,
755 user: Default::default(),
756 replay_id: None,
757 environment: None,
758 transaction: None,
759 sample_rate: None,
760 sampled: None,
761 other: Default::default(),
762 });
763 envelope.add_item(Item::new(ItemType::Transaction));
764 }
765 if let Some(event_id) = event_id {
766 envelope.set_event_id(event_id);
767 }
768 envelope
769 }
770
771 fn mock_config(path: &str) -> Arc<Config> {
772 Config::from_json_value(serde_json::json!({
773 "spool": {
774 "envelopes": {
775 "path": path
776 }
777 }
778 }))
779 .unwrap()
780 .into()
781 }
782
783 fn mock_memory_checker() -> MemoryChecker {
784 MemoryChecker::new(MemoryStat::default(), mock_config("my/db/path").clone())
785 }
786
787 async fn peek_received_at(buffer: &mut EnvelopeBuffer<MemoryStackProvider>) -> DateTime<Utc> {
788 buffer.peek().await.unwrap().last_received_at().unwrap()
789 }
790
791 #[tokio::test]
792 async fn test_insert_pop() {
793 let mut buffer = EnvelopeBuffer::<MemoryStackProvider>::new(0, mock_memory_checker());
794
795 let project_key1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap();
796 let project_key2 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
797 let project_key3 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fef").unwrap();
798
799 assert!(buffer.pop().await.unwrap().is_none());
800 assert!(buffer.peek().await.unwrap().is_empty());
801
802 let envelope1 = new_envelope(project_key1, None, None);
803 let time1 = envelope1.meta().received_at();
804 buffer.push(envelope1).await.unwrap();
805
806 let envelope2 = new_envelope(project_key2, None, None);
807 let time2 = envelope2.meta().received_at();
808 buffer.push(envelope2).await.unwrap();
809
810 assert_eq!(peek_received_at(&mut buffer).await, time2);
812
813 buffer.mark_ready(&project_key1, false);
814 buffer.mark_ready(&project_key2, false);
815
816 assert_eq!(peek_received_at(&mut buffer).await, time1);
818
819 let envelope3 = new_envelope(project_key3, None, None);
820 let time3 = envelope3.meta().received_at();
821 buffer.push(envelope3).await.unwrap();
822 buffer.mark_ready(&project_key3, false);
823
824 assert_eq!(peek_received_at(&mut buffer).await, time1);
826
827 buffer.mark_ready(&project_key3, true);
829 assert_eq!(peek_received_at(&mut buffer).await, time3);
830 assert_eq!(
831 buffer.pop().await.unwrap().unwrap().meta().public_key(),
832 project_key3
833 );
834
835 assert_eq!(peek_received_at(&mut buffer).await, time1);
837
838 buffer.mark_ready(&project_key1, true);
840 assert_eq!(peek_received_at(&mut buffer).await, time1);
841
842 buffer.mark_ready(&project_key2, true);
844 assert_eq!(peek_received_at(&mut buffer).await, time2);
845 assert_eq!(
846 buffer.pop().await.unwrap().unwrap().meta().public_key(),
847 project_key2
848 );
849
850 assert_eq!(
852 buffer.pop().await.unwrap().unwrap().meta().public_key(),
853 project_key1
854 );
855 assert!(buffer.pop().await.unwrap().is_none());
856 assert!(buffer.peek().await.unwrap().is_empty());
857 }
858
859 #[tokio::test]
860 async fn test_project_internal_order() {
861 let mut buffer = EnvelopeBuffer::<MemoryStackProvider>::new(0, mock_memory_checker());
862
863 let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap();
864
865 let envelope1 = new_envelope(project_key, None, None);
866 let time1 = envelope1.meta().received_at();
867 let envelope2 = new_envelope(project_key, None, None);
868 let time2 = envelope2.meta().received_at();
869
870 assert!(time2 > time1);
871
872 buffer.push(envelope1).await.unwrap();
873 buffer.push(envelope2).await.unwrap();
874
875 assert_eq!(
876 buffer.pop().await.unwrap().unwrap().meta().received_at(),
877 time2
878 );
879 assert_eq!(
880 buffer.pop().await.unwrap().unwrap().meta().received_at(),
881 time1
882 );
883 assert!(buffer.pop().await.unwrap().is_none());
884 }
885
886 #[tokio::test]
887 async fn test_sampling_projects() {
888 let mut buffer = EnvelopeBuffer::<MemoryStackProvider>::new(0, mock_memory_checker());
889
890 let project_key1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap();
891 let project_key2 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fef").unwrap();
892
893 let envelope1 = new_envelope(project_key1, None, None);
894 let time1 = envelope1.received_at();
895 buffer.push(envelope1).await.unwrap();
896
897 let envelope2 = new_envelope(project_key2, None, None);
898 let time2 = envelope2.received_at();
899 buffer.push(envelope2).await.unwrap();
900
901 let envelope3 = new_envelope(project_key1, Some(project_key2), None);
902 let time3 = envelope3.meta().received_at();
903 buffer.push(envelope3).await.unwrap();
904
905 buffer.mark_ready(&project_key1, false);
906 buffer.mark_ready(&project_key2, false);
907
908 assert_eq!(
910 buffer.peek().await.unwrap().last_received_at().unwrap(),
911 time1
912 );
913
914 buffer.mark_ready(&project_key2, true);
916 assert_eq!(
917 buffer.peek().await.unwrap().last_received_at().unwrap(),
918 time2
919 );
920
921 buffer.mark_ready(&project_key2, false);
923 assert_eq!(
924 buffer.peek().await.unwrap().last_received_at().unwrap(),
925 time1
926 );
927
928 buffer.mark_ready(&project_key1, true);
930 assert_eq!(
931 buffer.peek().await.unwrap().last_received_at().unwrap(),
932 time1
933 );
934
935 buffer.mark_ready(&project_key2, true);
937 assert_eq!(
938 buffer.pop().await.unwrap().unwrap().meta().received_at(),
939 time3
940 );
941 assert_eq!(
942 buffer.peek().await.unwrap().last_received_at().unwrap(),
943 time2
944 );
945
946 buffer.mark_ready(&project_key2, false);
947 assert_eq!(buffer.pop().await.unwrap().unwrap().received_at(), time1);
948 assert_eq!(buffer.pop().await.unwrap().unwrap().received_at(), time2);
949
950 assert!(buffer.pop().await.unwrap().is_none());
951 }
952
953 #[tokio::test]
954 async fn test_project_keys_distinct() {
955 let project_key1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap();
956 let project_key2 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fef").unwrap();
957
958 let project_key_pair1 = ProjectKeyPair::new(project_key1, project_key2);
959 let project_key_pair2 = ProjectKeyPair::new(project_key2, project_key1);
960
961 assert_ne!(project_key_pair1, project_key_pair2);
962
963 let mut buffer = EnvelopeBuffer::<MemoryStackProvider>::new(0, mock_memory_checker());
964 buffer
965 .push(new_envelope(project_key1, Some(project_key2), None))
966 .await
967 .unwrap();
968 buffer
969 .push(new_envelope(project_key2, Some(project_key1), None))
970 .await
971 .unwrap();
972 assert_eq!(buffer.priority_queue.len(), 2);
973 }
974
975 #[test]
976 fn test_total_order() {
977 let p1 = Priority {
978 readiness: Readiness {
979 own_project_ready: true,
980 sampling_project_ready: true,
981 },
982 received_at: Utc::now(),
983 next_project_fetch: Instant::now(),
984 };
985 let mut p2 = p1.clone();
986 p2.next_project_fetch += Duration::from_millis(1);
987
988 assert_eq!(p1.cmp(&p2), Ordering::Equal);
990 assert_eq!(p1, p2);
991 }
992
993 #[tokio::test]
994 async fn test_last_peek_internal_order() {
995 let mut buffer = EnvelopeBuffer::<MemoryStackProvider>::new(0, mock_memory_checker());
996
997 let project_key_1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap();
998 let event_id_1 = EventId::new();
999 let envelope1 = new_envelope(project_key_1, None, Some(event_id_1));
1000 let time1 = envelope1.received_at();
1001
1002 let project_key_2 = ProjectKey::parse("b56ae32be2584e0bbd7a4cbb95971fed").unwrap();
1003 let event_id_2 = EventId::new();
1004 let envelope2 = new_envelope(project_key_2, None, Some(event_id_2));
1005 let time2 = envelope2.received_at();
1006
1007 buffer.push(envelope1).await.unwrap();
1008 buffer.push(envelope2).await.unwrap();
1009
1010 buffer.mark_ready(&project_key_1, false);
1011 buffer.mark_ready(&project_key_2, false);
1012
1013 let Peek::NotReady {
1015 last_received_at, ..
1016 } = buffer.peek().await.unwrap()
1017 else {
1018 panic!();
1019 };
1020 assert_eq!(last_received_at, time1);
1021
1022 let Peek::NotReady {
1024 last_received_at,
1025 project_key_pair,
1026 ..
1027 } = buffer.peek().await.unwrap()
1028 else {
1029 panic!();
1030 };
1031 assert_eq!(last_received_at, time1);
1032 assert_ne!(last_received_at, time2);
1033
1034 buffer.mark_seen(&project_key_pair, Duration::ZERO);
1035
1036 let Peek::NotReady {
1038 last_received_at, ..
1039 } = buffer.peek().await.unwrap()
1040 else {
1041 panic!();
1042 };
1043 assert_eq!(last_received_at, time2);
1044 assert_ne!(last_received_at, time1);
1045
1046 let Peek::NotReady {
1047 last_received_at,
1048 project_key_pair,
1049 ..
1050 } = buffer.peek().await.unwrap()
1051 else {
1052 panic!();
1053 };
1054 assert_eq!(last_received_at, time2);
1055 assert_ne!(last_received_at, time1);
1056
1057 buffer.mark_seen(&project_key_pair, Duration::ZERO);
1058
1059 let Peek::NotReady {
1061 last_received_at, ..
1062 } = buffer.peek().await.unwrap()
1063 else {
1064 panic!();
1065 };
1066 assert_eq!(last_received_at, time1);
1067 assert_ne!(last_received_at, time2);
1068 }
1069
1070 #[tokio::test]
1071 async fn test_initialize_buffer() {
1072 let path = std::env::temp_dir()
1073 .join(Uuid::new_v4().to_string())
1074 .into_os_string()
1075 .into_string()
1076 .unwrap();
1077 let config = mock_config(&path);
1078 let mut store = SqliteEnvelopeStore::prepare(0, &config).await.unwrap();
1079 let mut buffer = EnvelopeBuffer::<SqliteStackProvider>::new(0, &config)
1080 .await
1081 .unwrap();
1082
1083 let envelopes = mock_envelopes(10);
1086 assert!(
1087 store
1088 .insert_batch(
1089 envelopes
1090 .into_iter()
1091 .map(|e| DatabaseEnvelope::try_from(e.as_ref()).unwrap())
1092 .collect::<Vec<_>>()
1093 .try_into()
1094 .unwrap()
1095 )
1096 .await
1097 .is_ok()
1098 );
1099
1100 assert!(buffer.priority_queue.is_empty());
1102 assert!(buffer.stacks_by_project.is_empty());
1103
1104 buffer.initialize().await;
1105
1106 assert_eq!(buffer.priority_queue.len(), 1);
1109 assert_eq!(buffer.stacks_by_project.len(), 2);
1112 }
1113}