1use std::cmp::Ordering;
2use std::collections::BTreeSet;
3use std::convert::Infallible;
4use std::error::Error;
5use std::mem;
6use std::time::Duration;
7
8use chrono::{DateTime, Utc};
9use hashbrown::HashSet;
10use relay_base_schema::project::ProjectKey;
11use relay_config::Config;
12use tokio::time::{Instant, timeout};
13
14use crate::envelope::Envelope;
15use crate::envelope::Item;
16use crate::services::buffer::common::ProjectKeyPair;
17use crate::services::buffer::envelope_stack::EnvelopeStack;
18use crate::services::buffer::envelope_stack::sqlite::SqliteEnvelopeStackError;
19use crate::services::buffer::envelope_store::sqlite::SqliteEnvelopeStoreError;
20use crate::services::buffer::stack_provider::memory::MemoryStackProvider;
21use crate::services::buffer::stack_provider::sqlite::SqliteStackProvider;
22use crate::services::buffer::stack_provider::{StackCreationType, StackProvider};
23use crate::statsd::{RelayDistributions, RelayGauges, RelayTimers};
24use crate::utils::MemoryChecker;
25
26#[derive(Debug)]
34#[allow(private_interfaces)]
35pub enum PolymorphicEnvelopeBuffer {
36 InMemory(EnvelopeBuffer<MemoryStackProvider>),
38 Sqlite(EnvelopeBuffer<SqliteStackProvider>),
40}
41
42impl PolymorphicEnvelopeBuffer {
43 pub fn is_memory(&self) -> bool {
45 match self {
46 Self::InMemory(_) => true,
47 Self::Sqlite(_) => false,
48 }
49 }
50
51 pub async fn from_config(
54 partition_id: u8,
55 config: &Config,
56 memory_checker: MemoryChecker,
57 ) -> Result<Self, EnvelopeBufferError> {
58 let buffer = if config.spool_envelopes_path(partition_id).is_some() {
59 relay_log::trace!("PolymorphicEnvelopeBuffer: initializing sqlite envelope buffer");
60 let buffer = EnvelopeBuffer::<SqliteStackProvider>::new(partition_id, config).await?;
61 Self::Sqlite(buffer)
62 } else {
63 relay_log::trace!("PolymorphicEnvelopeBuffer: initializing memory envelope buffer");
64 let buffer = EnvelopeBuffer::<MemoryStackProvider>::new(partition_id, memory_checker);
65 Self::InMemory(buffer)
66 };
67
68 Ok(buffer)
69 }
70
71 pub async fn initialize(&mut self) {
73 match self {
74 PolymorphicEnvelopeBuffer::InMemory(buffer) => buffer.initialize().await,
75 PolymorphicEnvelopeBuffer::Sqlite(buffer) => buffer.initialize().await,
76 }
77 }
78
79 pub async fn push(&mut self, envelope: Box<Envelope>) -> Result<(), EnvelopeBufferError> {
81 relay_statsd::metric!(
82 distribution(RelayDistributions::BufferEnvelopeBodySize) =
83 envelope.items().map(Item::len).sum::<usize>() as u64,
84 partition_id = self.partition_tag()
85 );
86
87 relay_statsd::metric!(
88 timer(RelayTimers::BufferPush),
89 partition_id = self.partition_tag(),
90 {
91 match self {
92 Self::Sqlite(buffer) => buffer.push(envelope).await,
93 Self::InMemory(buffer) => buffer.push(envelope).await,
94 }
95 }
96 )
97 }
98
99 pub async fn peek(&mut self) -> Result<Peek, EnvelopeBufferError> {
101 relay_statsd::metric!(
102 timer(RelayTimers::BufferPeek),
103 partition_id = self.partition_tag(),
104 {
105 match self {
106 Self::Sqlite(buffer) => buffer.peek().await,
107 Self::InMemory(buffer) => buffer.peek().await,
108 }
109 }
110 )
111 }
112
113 pub async fn pop(&mut self) -> Result<Option<Box<Envelope>>, EnvelopeBufferError> {
115 relay_statsd::metric!(
116 timer(RelayTimers::BufferPop),
117 partition_id = self.partition_tag(),
118 {
119 match self {
120 Self::Sqlite(buffer) => buffer.pop().await,
121 Self::InMemory(buffer) => buffer.pop().await,
122 }
123 }
124 )
125 }
126
127 pub fn mark_ready(&mut self, project: &ProjectKey, is_ready: bool) -> bool {
132 relay_log::trace!(
133 project_key = project.as_str(),
134 "buffer marked {}",
135 if is_ready { "ready" } else { "not ready" }
136 );
137 match self {
138 Self::Sqlite(buffer) => buffer.mark_ready(project, is_ready),
139 Self::InMemory(buffer) => buffer.mark_ready(project, is_ready),
140 }
141 }
142
143 pub fn mark_seen(&mut self, project_key_pair: &ProjectKeyPair, next_fetch: Duration) {
149 match self {
150 Self::Sqlite(buffer) => buffer.mark_seen(project_key_pair, next_fetch),
151 Self::InMemory(buffer) => buffer.mark_seen(project_key_pair, next_fetch),
152 }
153 }
154
155 pub fn has_capacity(&self) -> bool {
157 match self {
158 Self::Sqlite(buffer) => buffer.has_capacity(),
159 Self::InMemory(buffer) => buffer.has_capacity(),
160 }
161 }
162
163 pub fn item_count(&self) -> u64 {
166 match self {
167 Self::Sqlite(buffer) => buffer.tracked_count,
168 Self::InMemory(buffer) => buffer.tracked_count,
169 }
170 }
171
172 pub fn total_size(&self) -> Option<u64> {
175 match self {
176 Self::Sqlite(buffer) => buffer.stack_provider.total_size(),
177 Self::InMemory(buffer) => buffer.stack_provider.total_size(),
178 }
179 }
180
181 pub async fn shutdown(&mut self) -> bool {
183 match self {
187 Self::Sqlite(buffer) if !buffer.stack_provider.ephemeral() => {
188 buffer.flush().await;
189 true
190 }
191 _ => {
192 relay_log::trace!("shutdown procedure not needed");
193 false
194 }
195 }
196 }
197
198 fn partition_tag(&self) -> &str {
200 match self {
201 PolymorphicEnvelopeBuffer::InMemory(buffer) => &buffer.partition_tag,
202 PolymorphicEnvelopeBuffer::Sqlite(buffer) => &buffer.partition_tag,
203 }
204 }
205}
206
207#[derive(Debug, thiserror::Error)]
209pub enum EnvelopeBufferError {
210 #[error("sqlite")]
211 SqliteStore(#[from] SqliteEnvelopeStoreError),
212
213 #[error("sqlite")]
214 SqliteStack(#[from] SqliteEnvelopeStackError),
215
216 #[error("failed to push envelope to the buffer")]
217 PushFailed,
218}
219
220impl From<Infallible> for EnvelopeBufferError {
221 fn from(value: Infallible) -> Self {
222 match value {}
223 }
224}
225
226#[derive(Debug)]
231struct EnvelopeBuffer<P: StackProvider> {
232 priority_queue: priority_queue::PriorityQueue<QueueItem<ProjectKeyPair, P::Stack>, Priority>,
234 stacks_by_project: hashbrown::HashMap<ProjectKey, BTreeSet<ProjectKeyPair>>,
236 stack_provider: P,
241 total_count: i64,
248 tracked_count: u64,
254 total_count_initialized: bool,
259 partition_tag: String,
261}
262
263impl EnvelopeBuffer<MemoryStackProvider> {
264 pub fn new(partition_id: u8, memory_checker: MemoryChecker) -> Self {
266 Self {
267 stacks_by_project: Default::default(),
268 priority_queue: Default::default(),
269 stack_provider: MemoryStackProvider::new(memory_checker),
270 total_count: 0,
271 tracked_count: 0,
272 total_count_initialized: false,
273 partition_tag: partition_id.to_string(),
274 }
275 }
276}
277
278#[allow(dead_code)]
279impl EnvelopeBuffer<SqliteStackProvider> {
280 pub async fn new(partition_id: u8, config: &Config) -> Result<Self, EnvelopeBufferError> {
282 Ok(Self {
283 stacks_by_project: Default::default(),
284 priority_queue: Default::default(),
285 stack_provider: SqliteStackProvider::new(partition_id, config).await?,
286 total_count: 0,
287 tracked_count: 0,
288 total_count_initialized: false,
289 partition_tag: partition_id.to_string(),
290 })
291 }
292}
293
294impl<P: StackProvider> EnvelopeBuffer<P>
295where
296 EnvelopeBufferError: From<<P::Stack as EnvelopeStack>::Error>,
297{
298 pub async fn initialize(&mut self) {
301 relay_statsd::metric!(
302 timer(RelayTimers::BufferInitialization),
303 partition_id = &self.partition_tag,
304 {
305 let initialization_state = self.stack_provider.initialize().await;
306 self.load_stacks(initialization_state.project_key_pairs)
307 .await;
308 self.load_store_total_count().await;
309 }
310 );
311 }
312
313 pub async fn push(&mut self, envelope: Box<Envelope>) -> Result<(), EnvelopeBufferError> {
318 let received_at = envelope.received_at();
319
320 let project_key_pair = ProjectKeyPair::from_envelope(&envelope);
321 if let Some((
322 QueueItem {
323 key: _,
324 value: stack,
325 },
326 _,
327 )) = self.priority_queue.get_mut(&project_key_pair)
328 {
329 stack.push(envelope).await?;
330 } else {
331 self.push_stack(
334 StackCreationType::New,
335 ProjectKeyPair::from_envelope(&envelope),
336 Some(envelope),
337 )
338 .await?;
339 }
340 self.priority_queue
341 .change_priority_by(&project_key_pair, |prio| {
342 prio.received_at = received_at;
343 });
344
345 self.total_count += 1;
346 self.tracked_count += 1;
347 self.track_total_count();
348
349 Ok(())
350 }
351
352 pub async fn peek(&mut self) -> Result<Peek, EnvelopeBufferError> {
354 let Some((
355 QueueItem {
356 key: project_key_pair,
357 value: stack,
358 },
359 Priority {
360 readiness,
361 next_project_fetch,
362 ..
363 },
364 )) = self.priority_queue.peek_mut()
365 else {
366 return Ok(Peek::Empty);
367 };
368
369 let ready = readiness.ready();
370
371 Ok(match (stack.peek().await?, ready) {
372 (None, _) => Peek::Empty,
373 (Some(last_received_at), true) => Peek::Ready {
374 project_key_pair: *project_key_pair,
375 last_received_at,
376 },
377 (Some(last_received_at), false) => Peek::NotReady {
378 project_key_pair: *project_key_pair,
379 next_project_fetch: *next_project_fetch,
380 last_received_at,
381 },
382 })
383 }
384
385 pub async fn pop(&mut self) -> Result<Option<Box<Envelope>>, EnvelopeBufferError> {
390 let Some((QueueItem { key, value: stack }, _)) = self.priority_queue.peek_mut() else {
391 return Ok(None);
392 };
393 let project_key_pair = *key;
394 let envelope = stack.pop().await?.expect("found an empty stack");
395
396 let last_received_at = stack.peek().await?;
397
398 match last_received_at {
399 None => {
400 self.pop_stack(project_key_pair);
401 }
402 Some(last_received_at) => {
403 self.priority_queue
404 .change_priority_by(&project_key_pair, |prio| {
405 prio.received_at = last_received_at;
406 });
407 }
408 }
409
410 self.total_count -= 1;
414 self.tracked_count = self.tracked_count.saturating_sub(1);
415 self.track_total_count();
416
417 Ok(Some(envelope))
418 }
419
420 pub fn mark_ready(&mut self, project: &ProjectKey, is_ready: bool) -> bool {
424 let mut changed = false;
425 if let Some(project_key_pairs) = self.stacks_by_project.get(project) {
426 for project_key_pair in project_key_pairs {
427 self.priority_queue
428 .change_priority_by(project_key_pair, |stack| {
429 let mut found = false;
430 for (subkey, readiness) in [
431 (
432 project_key_pair.own_key,
433 &mut stack.readiness.own_project_ready,
434 ),
435 (
436 project_key_pair.sampling_key,
437 &mut stack.readiness.sampling_project_ready,
438 ),
439 ] {
440 if subkey == *project {
441 found = true;
442 if *readiness != is_ready {
443 changed = true;
444 *readiness = is_ready;
445 }
446 }
447 }
448 debug_assert!(found);
449 });
450 }
451 }
452
453 changed
454 }
455
456 pub fn mark_seen(&mut self, project_key_pair: &ProjectKeyPair, next_fetch: Duration) {
462 self.priority_queue
463 .change_priority_by(project_key_pair, |stack| {
464 stack.next_project_fetch = Instant::now() + next_fetch;
467 });
468 }
469
470 pub fn has_capacity(&self) -> bool {
472 self.stack_provider.has_store_capacity()
473 }
474
475 pub async fn flush(&mut self) {
477 let priority_queue = mem::take(&mut self.priority_queue);
478 self.stack_provider
479 .flush(priority_queue.into_iter().map(|(q, _)| q.value))
480 .await;
481 }
482
483 async fn push_stack(
485 &mut self,
486 stack_creation_type: StackCreationType,
487 project_key_pair: ProjectKeyPair,
488 envelope: Option<Box<Envelope>>,
489 ) -> Result<(), EnvelopeBufferError> {
490 let received_at = envelope.as_ref().map_or(Utc::now(), |e| e.received_at());
491
492 let mut stack = self
493 .stack_provider
494 .create_stack(stack_creation_type, project_key_pair);
495 if let Some(envelope) = envelope {
496 stack.push(envelope).await?;
497 }
498
499 let previous_entry = self.priority_queue.push(
500 QueueItem {
501 key: project_key_pair,
502 value: stack,
503 },
504 Priority::new(received_at),
505 );
506 debug_assert!(previous_entry.is_none());
507 for project_key in project_key_pair.iter() {
508 self.stacks_by_project
509 .entry(project_key)
510 .or_default()
511 .insert(project_key_pair);
512 }
513 relay_statsd::metric!(
514 gauge(RelayGauges::BufferStackCount) = self.priority_queue.len() as u64,
515 partition_id = &self.partition_tag
516 );
517
518 Ok(())
519 }
520
521 fn pop_stack(&mut self, project_key_pair: ProjectKeyPair) {
523 for project_key in project_key_pair.iter() {
524 self.stacks_by_project
525 .get_mut(&project_key)
526 .expect("project_key is missing from lookup")
527 .remove(&project_key_pair);
528 }
529 self.priority_queue.remove(&project_key_pair);
530
531 relay_statsd::metric!(
532 gauge(RelayGauges::BufferStackCount) = self.priority_queue.len() as u64,
533 partition_id = &self.partition_tag
534 );
535 }
536
537 async fn load_stacks(&mut self, project_key_pairs: HashSet<ProjectKeyPair>) {
539 for project_key_pair in project_key_pairs {
540 self.push_stack(StackCreationType::Initialization, project_key_pair, None)
541 .await
542 .expect("Pushing an empty stack raised an error");
543 }
544 }
545
546 async fn load_store_total_count(&mut self) {
552 let total_count = timeout(Duration::from_secs(1), async {
553 self.stack_provider.store_total_count().await
554 })
555 .await;
556 match total_count {
557 Ok(total_count) => {
558 self.total_count = total_count as i64;
559 self.total_count_initialized = true;
560 }
561 Err(error) => {
562 self.total_count_initialized = false;
563 relay_log::error!(
564 error = &error as &dyn Error,
565 "failed to load the total envelope count of the store",
566 );
567 }
568 };
569 self.track_total_count();
570 }
571
572 fn track_total_count(&self) {
574 let total_count = self.total_count as f64;
575 let initialized = match self.total_count_initialized {
576 true => "true",
577 false => "false",
578 };
579 relay_statsd::metric!(
580 gauge(RelayGauges::BufferEnvelopesCount) = total_count,
581 initialized = initialized,
582 stack_type = self.stack_provider.stack_type(),
583 partition_id = &self.partition_tag
584 );
585 }
586}
587
588pub enum Peek {
590 Empty,
591 Ready {
592 project_key_pair: ProjectKeyPair,
593 last_received_at: DateTime<Utc>,
594 },
595 NotReady {
596 project_key_pair: ProjectKeyPair,
597 next_project_fetch: Instant,
598 last_received_at: DateTime<Utc>,
599 },
600}
601
602impl Peek {
603 pub fn last_received_at(&self) -> Option<DateTime<Utc>> {
604 match self {
605 Self::Empty => None,
606 Self::Ready {
607 last_received_at, ..
608 }
609 | Self::NotReady {
610 last_received_at, ..
611 } => Some(*last_received_at),
612 }
613 }
614}
615
/// An entry of the priority queue that is identified by its key alone.
///
/// Hashing, equality and borrowing all ignore `value`. This allows looking an item up in the
/// queue with nothing but a `&K`.
#[derive(Debug)]
struct QueueItem<K, V> {
    /// The identity of the item.
    key: K,
    /// The payload, which takes no part in hashing or comparisons.
    value: V,
}

impl<K, V> std::borrow::Borrow<K> for QueueItem<K, V> {
    fn borrow(&self) -> &K {
        &self.key
    }
}

// `Borrow<K>` requires that the item hashes exactly like its key.
impl<K: std::hash::Hash, V> std::hash::Hash for QueueItem<K, V> {
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        self.key.hash(state);
    }
}

// `Borrow<K>` requires that items compare exactly like their keys.
impl<K: PartialEq, V> PartialEq for QueueItem<K, V> {
    fn eq(&self, other: &Self) -> bool {
        self.key == other.key
    }
}

// `Eq` promises a full equivalence relation, which holds only if the key itself is `Eq`.
impl<K: Eq, V> Eq for QueueItem<K, V> {}
641
642#[derive(Debug, Clone)]
643struct Priority {
644 readiness: Readiness,
645 received_at: DateTime<Utc>,
646 next_project_fetch: Instant,
647}
648
649impl Priority {
650 fn new(received_at: DateTime<Utc>) -> Self {
651 Self {
652 readiness: Readiness::new(),
653 received_at,
654 next_project_fetch: Instant::now(),
655 }
656 }
657}
658
659impl Ord for Priority {
660 fn cmp(&self, other: &Self) -> Ordering {
661 match (self.readiness.ready(), other.readiness.ready()) {
662 (true, true) => self.received_at.cmp(&other.received_at),
666 (true, false) => Ordering::Greater,
667 (false, true) => Ordering::Less,
668 (false, false) => self
671 .next_project_fetch
672 .cmp(&other.next_project_fetch)
673 .reverse()
674 .then(self.received_at.cmp(&other.received_at).reverse()),
675 }
676 }
677}
678
679impl PartialOrd for Priority {
680 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
681 Some(self.cmp(other))
682 }
683}
684
685impl PartialEq for Priority {
686 fn eq(&self, other: &Self) -> bool {
687 self.cmp(other).is_eq()
688 }
689}
690
691impl Eq for Priority {}
692
/// Tracks whether the two projects of a stack's key pair are ready.
#[derive(Debug, Clone, Copy)]
struct Readiness {
    /// Readiness of the project behind the pair's own key.
    own_project_ready: bool,
    /// Readiness of the project behind the pair's sampling key.
    sampling_project_ready: bool,
}

impl Readiness {
    /// Creates a readiness in which both projects start out as ready.
    fn new() -> Self {
        let ready = true;

        Self {
            own_project_ready: ready,
            sampling_project_ready: ready,
        }
    }

    /// Returns `true` only if both the own and the sampling project are ready.
    fn ready(&self) -> bool {
        let Self {
            own_project_ready,
            sampling_project_ready,
        } = *self;

        own_project_ready && sampling_project_ready
    }
}
713
#[cfg(test)]
mod tests {
    use relay_common::Dsn;
    use relay_event_schema::protocol::EventId;
    use relay_sampling::DynamicSamplingContext;
    use std::str::FromStr;
    use std::sync::Arc;
    use uuid::Uuid;

    use crate::SqliteEnvelopeStore;
    use crate::envelope::{Item, ItemType};
    use crate::extractors::RequestMeta;
    use crate::services::buffer::common::ProjectKeyPair;
    use crate::services::buffer::envelope_store::sqlite::DatabaseEnvelope;
    use crate::services::buffer::testutils::utils::mock_envelopes;
    use crate::utils::MemoryStat;

    use super::*;

    impl Peek {
        /// Returns `true` if nothing could be peeked.
        fn is_empty(&self) -> bool {
            matches!(self, Peek::Empty)
        }
    }

    /// Builds an envelope for `own_key`.
    ///
    /// If `sampling_key` is given, a dynamic sampling context with that public key and a
    /// transaction item are added. If `event_id` is given, it is set on the envelope.
    fn new_envelope(
        own_key: ProjectKey,
        sampling_key: Option<ProjectKey>,
        event_id: Option<EventId>,
    ) -> Box<Envelope> {
        let mut envelope = Envelope::from_request(
            None,
            RequestMeta::new(Dsn::from_str(&format!("http://{own_key}@localhost/1")).unwrap()),
        );
        if let Some(sampling_key) = sampling_key {
            envelope.set_dsc(DynamicSamplingContext {
                public_key: sampling_key,
                trace_id: "67e5504410b1426f9247bb680e5fe0c8".parse().unwrap(),
                release: None,
                user: Default::default(),
                replay_id: None,
                environment: None,
                transaction: None,
                sample_rate: None,
                sampled: None,
                other: Default::default(),
            });
            envelope.add_item(Item::new(ItemType::Transaction));
        }
        if let Some(event_id) = event_id {
            envelope.set_event_id(event_id);
        }
        envelope
    }

    /// Builds a config whose envelope spool points at `path`.
    fn mock_config(path: &str) -> Arc<Config> {
        Config::from_json_value(serde_json::json!({
            "spool": {
                "envelopes": {
                    "path": path
                }
            }
        }))
        .unwrap()
        .into()
    }

    /// Builds a memory checker with default memory stats and a mock config.
    fn mock_memory_checker() -> MemoryChecker {
        MemoryChecker::new(MemoryStat::default(), mock_config("my/db/path"))
    }

    /// Peeks the buffer and returns the `received_at` time of the peeked envelope.
    async fn peek_received_at(buffer: &mut EnvelopeBuffer<MemoryStackProvider>) -> DateTime<Utc> {
        buffer.peek().await.unwrap().last_received_at().unwrap()
    }

    /// Peeks the buffer and returns key pair and `received_at` of a non-ready stack.
    ///
    /// Panics if the peek yields anything but [`Peek::NotReady`].
    async fn peek_not_ready(
        buffer: &mut EnvelopeBuffer<MemoryStackProvider>,
    ) -> (ProjectKeyPair, DateTime<Utc>) {
        match buffer.peek().await.unwrap() {
            Peek::NotReady {
                project_key_pair,
                last_received_at,
                ..
            } => (project_key_pair, last_received_at),
            Peek::Empty | Peek::Ready { .. } => panic!("expected a non-ready stack"),
        }
    }

    #[tokio::test]
    async fn test_insert_pop() {
        let mut buffer = EnvelopeBuffer::<MemoryStackProvider>::new(0, mock_memory_checker());

        let project_key1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap();
        let project_key2 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
        let project_key3 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fef").unwrap();

        assert!(buffer.pop().await.unwrap().is_none());
        assert!(buffer.peek().await.unwrap().is_empty());

        let envelope1 = new_envelope(project_key1, None, None);
        let time1 = envelope1.meta().received_at();
        buffer.push(envelope1).await.unwrap();

        let envelope2 = new_envelope(project_key2, None, None);
        let time2 = envelope2.meta().received_at();
        buffer.push(envelope2).await.unwrap();

        // Both stacks are ready, so the most recent envelope is on top.
        assert_eq!(peek_received_at(&mut buffer).await, time2);

        buffer.mark_ready(&project_key1, false);
        buffer.mark_ready(&project_key2, false);

        // Nothing is ready and the order of non-ready stacks is inverted.
        assert_eq!(peek_received_at(&mut buffer).await, time1);

        let envelope3 = new_envelope(project_key3, None, None);
        let time3 = envelope3.meta().received_at();
        buffer.push(envelope3).await.unwrap();
        buffer.mark_ready(&project_key3, false);

        // Still nothing is ready, the first stack stays on top.
        assert_eq!(peek_received_at(&mut buffer).await, time1);

        buffer.mark_ready(&project_key3, true);
        // The only ready stack wins.
        assert_eq!(peek_received_at(&mut buffer).await, time3);
        assert_eq!(
            buffer.pop().await.unwrap().unwrap().meta().public_key(),
            project_key3
        );

        // Projects 1 and 2 are still not ready.
        assert_eq!(peek_received_at(&mut buffer).await, time1);

        buffer.mark_ready(&project_key1, true);
        // Project 1 is the only ready one.
        assert_eq!(peek_received_at(&mut buffer).await, time1);

        buffer.mark_ready(&project_key2, true);
        // Both are ready, the most recent envelope wins again.
        assert_eq!(peek_received_at(&mut buffer).await, time2);
        assert_eq!(
            buffer.pop().await.unwrap().unwrap().meta().public_key(),
            project_key2
        );

        assert_eq!(
            buffer.pop().await.unwrap().unwrap().meta().public_key(),
            project_key1
        );
        assert!(buffer.pop().await.unwrap().is_none());
        assert!(buffer.peek().await.unwrap().is_empty());
    }

    #[tokio::test]
    async fn test_project_internal_order() {
        let mut buffer = EnvelopeBuffer::<MemoryStackProvider>::new(0, mock_memory_checker());

        let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap();

        let envelope1 = new_envelope(project_key, None, None);
        let time1 = envelope1.meta().received_at();
        let envelope2 = new_envelope(project_key, None, None);
        let time2 = envelope2.meta().received_at();

        assert!(time2 > time1);

        buffer.push(envelope1).await.unwrap();
        buffer.push(envelope2).await.unwrap();

        // Within one project the envelope pushed last is popped first.
        assert_eq!(
            buffer.pop().await.unwrap().unwrap().meta().received_at(),
            time2
        );
        assert_eq!(
            buffer.pop().await.unwrap().unwrap().meta().received_at(),
            time1
        );
        assert!(buffer.pop().await.unwrap().is_none());
    }

    #[tokio::test]
    async fn test_sampling_projects() {
        let mut buffer = EnvelopeBuffer::<MemoryStackProvider>::new(0, mock_memory_checker());

        let project_key1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap();
        let project_key2 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fef").unwrap();

        let envelope1 = new_envelope(project_key1, None, None);
        let time1 = envelope1.received_at();
        buffer.push(envelope1).await.unwrap();

        let envelope2 = new_envelope(project_key2, None, None);
        let time2 = envelope2.received_at();
        buffer.push(envelope2).await.unwrap();

        // This envelope belongs to project 1 but is sampled by project 2.
        let envelope3 = new_envelope(project_key1, Some(project_key2), None);
        let time3 = envelope3.meta().received_at();
        buffer.push(envelope3).await.unwrap();

        buffer.mark_ready(&project_key1, false);
        buffer.mark_ready(&project_key2, false);

        // Nothing is ready, the first stack is on top.
        assert_eq!(
            buffer.peek().await.unwrap().last_received_at().unwrap(),
            time1
        );

        buffer.mark_ready(&project_key2, true);
        // Only the stack that depends on project 2 alone is ready.
        assert_eq!(
            buffer.peek().await.unwrap().last_received_at().unwrap(),
            time2
        );

        buffer.mark_ready(&project_key2, false);
        // Nothing is ready again.
        assert_eq!(
            buffer.peek().await.unwrap().last_received_at().unwrap(),
            time1
        );

        buffer.mark_ready(&project_key1, true);
        // Only the stack that depends on project 1 alone is ready.
        assert_eq!(
            buffer.peek().await.unwrap().last_received_at().unwrap(),
            time1
        );

        buffer.mark_ready(&project_key2, true);
        // Everything is ready, the most recent envelope is popped first.
        assert_eq!(
            buffer.pop().await.unwrap().unwrap().meta().received_at(),
            time3
        );
        assert_eq!(
            buffer.peek().await.unwrap().last_received_at().unwrap(),
            time2
        );

        // Popping works even when the projects are not ready.
        buffer.mark_ready(&project_key2, false);
        assert_eq!(buffer.pop().await.unwrap().unwrap().received_at(), time1);
        assert_eq!(buffer.pop().await.unwrap().unwrap().received_at(), time2);

        assert!(buffer.pop().await.unwrap().is_none());
    }

    #[tokio::test]
    async fn test_project_keys_distinct() {
        let project_key1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap();
        let project_key2 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fef").unwrap();

        let project_key_pair1 = ProjectKeyPair::new(project_key1, project_key2);
        let project_key_pair2 = ProjectKeyPair::new(project_key2, project_key1);

        // Swapping own and sampling key yields a different pair.
        assert_ne!(project_key_pair1, project_key_pair2);

        let mut buffer = EnvelopeBuffer::<MemoryStackProvider>::new(0, mock_memory_checker());
        buffer
            .push(new_envelope(project_key1, Some(project_key2), None))
            .await
            .unwrap();
        buffer
            .push(new_envelope(project_key2, Some(project_key1), None))
            .await
            .unwrap();
        assert_eq!(buffer.priority_queue.len(), 2);
    }

    #[test]
    fn test_total_order() {
        let p1 = Priority {
            readiness: Readiness {
                own_project_ready: true,
                sampling_project_ready: true,
            },
            received_at: Utc::now(),
            next_project_fetch: Instant::now(),
        };
        let mut p2 = p1.clone();
        p2.next_project_fetch += Duration::from_millis(1);

        // The next project fetch is ignored when both priorities are ready.
        assert_eq!(p1.cmp(&p2), Ordering::Equal);
        assert_eq!(p1, p2);
    }

    #[tokio::test]
    async fn test_last_peek_internal_order() {
        let mut buffer = EnvelopeBuffer::<MemoryStackProvider>::new(0, mock_memory_checker());

        let project_key_1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap();
        let event_id_1 = EventId::new();
        let envelope1 = new_envelope(project_key_1, None, Some(event_id_1));
        let time1 = envelope1.received_at();

        let project_key_2 = ProjectKey::parse("b56ae32be2584e0bbd7a4cbb95971fed").unwrap();
        let event_id_2 = EventId::new();
        let envelope2 = new_envelope(project_key_2, None, Some(event_id_2));
        let time2 = envelope2.received_at();

        buffer.push(envelope1).await.unwrap();
        buffer.push(envelope2).await.unwrap();

        buffer.mark_ready(&project_key_1, false);
        buffer.mark_ready(&project_key_2, false);

        // No stack was marked as seen yet, so the first one is peeked.
        let (_, last_received_at) = peek_not_ready(&mut buffer).await;
        assert_eq!(last_received_at, time1);

        // Peeking alone does not change the order.
        let (project_key_pair, last_received_at) = peek_not_ready(&mut buffer).await;
        assert_eq!(last_received_at, time1);
        assert_ne!(last_received_at, time2);

        // Marking the first stack as seen moves the second stack to the top.
        buffer.mark_seen(&project_key_pair, Duration::ZERO);

        let (_, last_received_at) = peek_not_ready(&mut buffer).await;
        assert_eq!(last_received_at, time2);
        assert_ne!(last_received_at, time1);

        let (project_key_pair, last_received_at) = peek_not_ready(&mut buffer).await;
        assert_eq!(last_received_at, time2);
        assert_ne!(last_received_at, time1);

        // Marking the second stack as seen brings the first stack back to the top.
        buffer.mark_seen(&project_key_pair, Duration::ZERO);

        let (_, last_received_at) = peek_not_ready(&mut buffer).await;
        assert_eq!(last_received_at, time1);
        assert_ne!(last_received_at, time2);
    }

    #[tokio::test]
    async fn test_initialize_buffer() {
        let path = std::env::temp_dir()
            .join(Uuid::new_v4().to_string())
            .into_os_string()
            .into_string()
            .unwrap();
        let config = mock_config(&path);
        let mut store = SqliteEnvelopeStore::prepare(0, &config).await.unwrap();
        let mut buffer = EnvelopeBuffer::<SqliteStackProvider>::new(0, &config)
            .await
            .unwrap();

        // Write the envelopes to the store directly, bypassing the buffer.
        let envelopes = mock_envelopes(10);
        assert!(
            store
                .insert_batch(
                    envelopes
                        .into_iter()
                        .map(|e| DatabaseEnvelope::try_from(e.as_ref()).unwrap())
                        .collect::<Vec<_>>()
                        .try_into()
                        .unwrap()
                )
                .await
                .is_ok()
        );

        // Before initialization the buffer knows nothing about the stored envelopes.
        assert!(buffer.priority_queue.is_empty());
        assert!(buffer.stacks_by_project.is_empty());

        buffer.initialize().await;

        // All mock envelopes end up in a single stack ...
        assert_eq!(buffer.priority_queue.len(), 1);
        // ... that is registered under two project keys.
        assert_eq!(buffer.stacks_by_project.len(), 2);
    }
}