1use std::cmp::Ordering;
2use std::collections::BTreeSet;
3use std::convert::Infallible;
4use std::error::Error;
5use std::mem;
6use std::time::Duration;
7
8use chrono::{DateTime, Utc};
9use hashbrown::HashSet;
10use relay_base_schema::project::ProjectKey;
11use relay_config::Config;
12use tokio::time::{timeout, Instant};
13
14use crate::envelope::Envelope;
15use crate::envelope::Item;
16use crate::services::buffer::common::ProjectKeyPair;
17use crate::services::buffer::envelope_stack::sqlite::SqliteEnvelopeStackError;
18use crate::services::buffer::envelope_stack::EnvelopeStack;
19use crate::services::buffer::envelope_store::sqlite::SqliteEnvelopeStoreError;
20use crate::services::buffer::stack_provider::memory::MemoryStackProvider;
21use crate::services::buffer::stack_provider::sqlite::SqliteStackProvider;
22use crate::services::buffer::stack_provider::{StackCreationType, StackProvider};
23use crate::statsd::{RelayGauges, RelayHistograms, RelayTimers};
24use crate::utils::MemoryChecker;
25
/// Envelope buffer that dispatches every operation to one of two concrete backends.
///
/// [`PolymorphicEnvelopeBuffer::from_config`] selects the variant: the SQLite-backed buffer
/// when the configuration provides a spool path for the partition, and the in-memory buffer
/// otherwise.
#[derive(Debug)]
// `EnvelopeBuffer` is private to this module but appears in the public variants below.
#[allow(private_interfaces)]
pub enum PolymorphicEnvelopeBuffer {
    /// A buffer whose stacks are created by the [`MemoryStackProvider`].
    InMemory(EnvelopeBuffer<MemoryStackProvider>),
    /// A buffer whose stacks are created by the [`SqliteStackProvider`].
    Sqlite(EnvelopeBuffer<SqliteStackProvider>),
}
41
42impl PolymorphicEnvelopeBuffer {
43 pub fn is_memory(&self) -> bool {
45 match self {
46 PolymorphicEnvelopeBuffer::InMemory(_) => true,
47 PolymorphicEnvelopeBuffer::Sqlite(_) => false,
48 }
49 }
50
51 pub async fn from_config(
54 partition_id: u8,
55 config: &Config,
56 memory_checker: MemoryChecker,
57 ) -> Result<Self, EnvelopeBufferError> {
58 let buffer = if config.spool_envelopes_path(partition_id).is_some() {
59 relay_log::trace!("PolymorphicEnvelopeBuffer: initializing sqlite envelope buffer");
60 let buffer = EnvelopeBuffer::<SqliteStackProvider>::new(partition_id, config).await?;
61 Self::Sqlite(buffer)
62 } else {
63 relay_log::trace!("PolymorphicEnvelopeBuffer: initializing memory envelope buffer");
64 let buffer = EnvelopeBuffer::<MemoryStackProvider>::new(partition_id, memory_checker);
65 Self::InMemory(buffer)
66 };
67
68 Ok(buffer)
69 }
70
71 pub async fn initialize(&mut self) {
73 match self {
74 PolymorphicEnvelopeBuffer::InMemory(buffer) => buffer.initialize().await,
75 PolymorphicEnvelopeBuffer::Sqlite(buffer) => buffer.initialize().await,
76 }
77 }
78
79 pub async fn push(&mut self, envelope: Box<Envelope>) -> Result<(), EnvelopeBufferError> {
81 relay_statsd::metric!(
82 histogram(RelayHistograms::BufferEnvelopeBodySize) =
83 envelope.items().map(Item::len).sum::<usize>() as u64,
84 partition_id = self.partition_tag()
85 );
86
87 relay_statsd::metric!(
88 timer(RelayTimers::BufferPush),
89 partition_id = self.partition_tag(),
90 {
91 match self {
92 Self::Sqlite(buffer) => buffer.push(envelope).await,
93 Self::InMemory(buffer) => buffer.push(envelope).await,
94 }?;
95 }
96 );
97 Ok(())
98 }
99
100 pub async fn peek(&mut self) -> Result<Peek, EnvelopeBufferError> {
102 relay_statsd::metric!(
103 timer(RelayTimers::BufferPeek),
104 partition_id = self.partition_tag(),
105 {
106 match self {
107 Self::Sqlite(buffer) => buffer.peek().await,
108 Self::InMemory(buffer) => buffer.peek().await,
109 }
110 }
111 )
112 }
113
114 pub async fn pop(&mut self) -> Result<Option<Box<Envelope>>, EnvelopeBufferError> {
116 let envelope = relay_statsd::metric!(
117 timer(RelayTimers::BufferPop),
118 partition_id = self.partition_tag(),
119 {
120 match self {
121 Self::Sqlite(buffer) => buffer.pop().await,
122 Self::InMemory(buffer) => buffer.pop().await,
123 }?
124 }
125 );
126 Ok(envelope)
127 }
128
129 pub fn mark_ready(&mut self, project: &ProjectKey, is_ready: bool) -> bool {
134 relay_log::trace!(
135 project_key = project.as_str(),
136 "buffer marked {}",
137 if is_ready { "ready" } else { "not ready" }
138 );
139 match self {
140 Self::Sqlite(buffer) => buffer.mark_ready(project, is_ready),
141 Self::InMemory(buffer) => buffer.mark_ready(project, is_ready),
142 }
143 }
144
145 pub fn mark_seen(&mut self, project_key_pair: &ProjectKeyPair, next_fetch: Duration) {
151 match self {
152 Self::Sqlite(buffer) => buffer.mark_seen(project_key_pair, next_fetch),
153 Self::InMemory(buffer) => buffer.mark_seen(project_key_pair, next_fetch),
154 }
155 }
156
157 pub fn has_capacity(&self) -> bool {
159 match self {
160 Self::Sqlite(buffer) => buffer.has_capacity(),
161 Self::InMemory(buffer) => buffer.has_capacity(),
162 }
163 }
164
165 pub fn item_count(&self) -> u64 {
168 match self {
169 Self::Sqlite(buffer) => buffer.tracked_count,
170 Self::InMemory(buffer) => buffer.tracked_count,
171 }
172 }
173
174 pub fn total_size(&self) -> Option<u64> {
177 match self {
178 Self::Sqlite(buffer) => buffer.stack_provider.total_size(),
179 Self::InMemory(buffer) => buffer.stack_provider.total_size(),
180 }
181 }
182
183 pub async fn shutdown(&mut self) -> bool {
185 let Self::Sqlite(buffer) = self else {
189 relay_log::trace!("PolymorphicEnvelopeBuffer: shutdown procedure not needed");
190 return false;
191 };
192 buffer.flush().await;
193
194 true
195 }
196
197 fn partition_tag(&self) -> &str {
199 match self {
200 PolymorphicEnvelopeBuffer::InMemory(buffer) => &buffer.partition_tag,
201 PolymorphicEnvelopeBuffer::Sqlite(buffer) => &buffer.partition_tag,
202 }
203 }
204}
205
/// Errors that can be returned by the envelope buffer.
#[derive(Debug, thiserror::Error)]
pub enum EnvelopeBufferError {
    /// An error converted from a [`SqliteEnvelopeStoreError`].
    #[error("sqlite")]
    SqliteStore(#[from] SqliteEnvelopeStoreError),

    /// An error converted from a [`SqliteEnvelopeStackError`].
    // NOTE(review): this variant and `SqliteStore` share the same display text ("sqlite"),
    // so the two cannot be told apart from the message alone.
    #[error("sqlite")]
    SqliteStack(#[from] SqliteEnvelopeStackError),

    /// Pushing an envelope to the buffer failed.
    #[error("failed to push envelope to the buffer")]
    PushFailed,
}
218
219impl From<Infallible> for EnvelopeBufferError {
220 fn from(value: Infallible) -> Self {
221 match value {}
222 }
223}
224
/// An envelope buffer that holds an individual stack for each project key pair.
///
/// Stacks are organized in a priority queue and are re-prioritized when an envelope is pushed
/// or popped, when a project's readiness changes, or when a stack is marked as seen.
#[derive(Debug)]
struct EnvelopeBuffer<P: StackProvider> {
    /// The central priority queue, holding one stack per project key pair.
    priority_queue: priority_queue::PriorityQueue<QueueItem<ProjectKeyPair, P::Stack>, Priority>,
    /// A lookup table to find all project key pairs (and thereby stacks) involving a project.
    stacks_by_project: hashbrown::HashMap<ProjectKey, BTreeSet<ProjectKeyPair>>,
    /// The provider used to create stacks, check capacity, and flush stacks.
    stack_provider: P,
    /// The total count of envelopes the buffer is working with.
    ///
    /// Seeded from the store's total count during initialization. Signed because it is
    /// decremented on every pop without saturating, so it can become negative when the
    /// seeded value was missing or too low.
    total_count: i64,
    /// The count of envelopes pushed to and not yet popped from this buffer instance.
    ///
    /// Unlike `total_count`, this is never seeded from the store and saturates at zero.
    tracked_count: u64,
    /// Whether loading the store's total count during initialization succeeded.
    total_count_initialized: bool,
    /// The partition id rendered as a string, used as a tag when reporting metrics.
    partition_tag: String,
}
261
262impl EnvelopeBuffer<MemoryStackProvider> {
263 pub fn new(partition_id: u8, memory_checker: MemoryChecker) -> Self {
265 Self {
266 stacks_by_project: Default::default(),
267 priority_queue: Default::default(),
268 stack_provider: MemoryStackProvider::new(memory_checker),
269 total_count: 0,
270 tracked_count: 0,
271 total_count_initialized: false,
272 partition_tag: partition_id.to_string(),
273 }
274 }
275}
276
277#[allow(dead_code)]
278impl EnvelopeBuffer<SqliteStackProvider> {
279 pub async fn new(partition_id: u8, config: &Config) -> Result<Self, EnvelopeBufferError> {
281 Ok(Self {
282 stacks_by_project: Default::default(),
283 priority_queue: Default::default(),
284 stack_provider: SqliteStackProvider::new(partition_id, config).await?,
285 total_count: 0,
286 tracked_count: 0,
287 total_count_initialized: false,
288 partition_tag: partition_id.to_string(),
289 })
290 }
291}
292
293impl<P: StackProvider> EnvelopeBuffer<P>
294where
295 EnvelopeBufferError: From<<P::Stack as EnvelopeStack>::Error>,
296{
297 pub async fn initialize(&mut self) {
300 relay_statsd::metric!(
301 timer(RelayTimers::BufferInitialization),
302 partition_id = &self.partition_tag,
303 {
304 let initialization_state = self.stack_provider.initialize().await;
305 self.load_stacks(initialization_state.project_key_pairs)
306 .await;
307 self.load_store_total_count().await;
308 }
309 );
310 }
311
312 pub async fn push(&mut self, envelope: Box<Envelope>) -> Result<(), EnvelopeBufferError> {
317 let received_at = envelope.received_at();
318
319 let project_key_pair = ProjectKeyPair::from_envelope(&envelope);
320 if let Some((
321 QueueItem {
322 key: _,
323 value: stack,
324 },
325 _,
326 )) = self.priority_queue.get_mut(&project_key_pair)
327 {
328 stack.push(envelope).await?;
329 } else {
330 self.push_stack(
333 StackCreationType::New,
334 ProjectKeyPair::from_envelope(&envelope),
335 Some(envelope),
336 )
337 .await?;
338 }
339 self.priority_queue
340 .change_priority_by(&project_key_pair, |prio| {
341 prio.received_at = received_at;
342 });
343
344 self.total_count += 1;
345 self.tracked_count += 1;
346 self.track_total_count();
347
348 Ok(())
349 }
350
351 pub async fn peek(&mut self) -> Result<Peek, EnvelopeBufferError> {
353 let Some((
354 QueueItem {
355 key: project_key_pair,
356 value: stack,
357 },
358 Priority {
359 readiness,
360 next_project_fetch,
361 ..
362 },
363 )) = self.priority_queue.peek_mut()
364 else {
365 return Ok(Peek::Empty);
366 };
367
368 let ready = readiness.ready();
369
370 Ok(match (stack.peek().await?, ready) {
371 (None, _) => Peek::Empty,
372 (Some(last_received_at), true) => Peek::Ready {
373 project_key_pair: *project_key_pair,
374 last_received_at,
375 },
376 (Some(last_received_at), false) => Peek::NotReady {
377 project_key_pair: *project_key_pair,
378 next_project_fetch: *next_project_fetch,
379 last_received_at,
380 },
381 })
382 }
383
384 pub async fn pop(&mut self) -> Result<Option<Box<Envelope>>, EnvelopeBufferError> {
389 let Some((QueueItem { key, value: stack }, _)) = self.priority_queue.peek_mut() else {
390 return Ok(None);
391 };
392 let project_key_pair = *key;
393 let envelope = stack.pop().await?.expect("found an empty stack");
394
395 let last_received_at = stack.peek().await?;
396
397 match last_received_at {
398 None => {
399 self.pop_stack(project_key_pair);
400 }
401 Some(last_received_at) => {
402 self.priority_queue
403 .change_priority_by(&project_key_pair, |prio| {
404 prio.received_at = last_received_at;
405 });
406 }
407 }
408
409 self.total_count -= 1;
413 self.tracked_count = self.tracked_count.saturating_sub(1);
414 self.track_total_count();
415
416 Ok(Some(envelope))
417 }
418
419 pub fn mark_ready(&mut self, project: &ProjectKey, is_ready: bool) -> bool {
423 let mut changed = false;
424 if let Some(project_key_pairs) = self.stacks_by_project.get(project) {
425 for project_key_pair in project_key_pairs {
426 self.priority_queue
427 .change_priority_by(project_key_pair, |stack| {
428 let mut found = false;
429 for (subkey, readiness) in [
430 (
431 project_key_pair.own_key,
432 &mut stack.readiness.own_project_ready,
433 ),
434 (
435 project_key_pair.sampling_key,
436 &mut stack.readiness.sampling_project_ready,
437 ),
438 ] {
439 if subkey == *project {
440 found = true;
441 if *readiness != is_ready {
442 changed = true;
443 *readiness = is_ready;
444 }
445 }
446 }
447 debug_assert!(found);
448 });
449 }
450 }
451
452 changed
453 }
454
455 pub fn mark_seen(&mut self, project_key_pair: &ProjectKeyPair, next_fetch: Duration) {
461 self.priority_queue
462 .change_priority_by(project_key_pair, |stack| {
463 stack.next_project_fetch = Instant::now() + next_fetch;
466 });
467 }
468
469 pub fn has_capacity(&self) -> bool {
471 self.stack_provider.has_store_capacity()
472 }
473
474 pub async fn flush(&mut self) {
476 let priority_queue = mem::take(&mut self.priority_queue);
477 self.stack_provider
478 .flush(priority_queue.into_iter().map(|(q, _)| q.value))
479 .await;
480 }
481
482 async fn push_stack(
484 &mut self,
485 stack_creation_type: StackCreationType,
486 project_key_pair: ProjectKeyPair,
487 envelope: Option<Box<Envelope>>,
488 ) -> Result<(), EnvelopeBufferError> {
489 let received_at = envelope.as_ref().map_or(Utc::now(), |e| e.received_at());
490
491 let mut stack = self
492 .stack_provider
493 .create_stack(stack_creation_type, project_key_pair);
494 if let Some(envelope) = envelope {
495 stack.push(envelope).await?;
496 }
497
498 let previous_entry = self.priority_queue.push(
499 QueueItem {
500 key: project_key_pair,
501 value: stack,
502 },
503 Priority::new(received_at),
504 );
505 debug_assert!(previous_entry.is_none());
506 for project_key in project_key_pair.iter() {
507 self.stacks_by_project
508 .entry(project_key)
509 .or_default()
510 .insert(project_key_pair);
511 }
512 relay_statsd::metric!(
513 gauge(RelayGauges::BufferStackCount) = self.priority_queue.len() as u64,
514 partition_id = &self.partition_tag
515 );
516
517 Ok(())
518 }
519
520 fn pop_stack(&mut self, project_key_pair: ProjectKeyPair) {
522 for project_key in project_key_pair.iter() {
523 self.stacks_by_project
524 .get_mut(&project_key)
525 .expect("project_key is missing from lookup")
526 .remove(&project_key_pair);
527 }
528 self.priority_queue.remove(&project_key_pair);
529
530 relay_statsd::metric!(
531 gauge(RelayGauges::BufferStackCount) = self.priority_queue.len() as u64,
532 partition_id = &self.partition_tag
533 );
534 }
535
536 async fn load_stacks(&mut self, project_key_pairs: HashSet<ProjectKeyPair>) {
538 for project_key_pair in project_key_pairs {
539 self.push_stack(StackCreationType::Initialization, project_key_pair, None)
540 .await
541 .expect("Pushing an empty stack raised an error");
542 }
543 }
544
545 async fn load_store_total_count(&mut self) {
551 let total_count = timeout(Duration::from_secs(1), async {
552 self.stack_provider.store_total_count().await
553 })
554 .await;
555 match total_count {
556 Ok(total_count) => {
557 self.total_count = total_count as i64;
558 self.total_count_initialized = true;
559 }
560 Err(error) => {
561 self.total_count_initialized = false;
562 relay_log::error!(
563 error = &error as &dyn Error,
564 "failed to load the total envelope count of the store",
565 );
566 }
567 };
568 self.track_total_count();
569 }
570
571 fn track_total_count(&self) {
573 let total_count = self.total_count as f64;
574 let initialized = match self.total_count_initialized {
575 true => "true",
576 false => "false",
577 };
578 relay_statsd::metric!(
579 histogram(RelayHistograms::BufferEnvelopesCount) = total_count,
580 initialized = initialized,
581 stack_type = self.stack_provider.stack_type(),
582 partition_id = &self.partition_tag
583 );
584 }
585}
586
587pub enum Peek {
589 Empty,
590 Ready {
591 project_key_pair: ProjectKeyPair,
592 last_received_at: DateTime<Utc>,
593 },
594 NotReady {
595 project_key_pair: ProjectKeyPair,
596 next_project_fetch: Instant,
597 last_received_at: DateTime<Utc>,
598 },
599}
600
601impl Peek {
602 pub fn last_received_at(&self) -> Option<DateTime<Utc>> {
603 match self {
604 Self::Empty => None,
605 Self::Ready {
606 last_received_at, ..
607 }
608 | Self::NotReady {
609 last_received_at, ..
610 } => Some(*last_received_at),
611 }
612 }
613}
614
/// A key-value pair whose identity is determined by the key alone.
///
/// Hashing, equality, and borrowing all delegate to `key`, so an item can be looked up in a
/// hashed collection by its key while carrying an arbitrary `value` along.
#[derive(Debug)]
struct QueueItem<K, V> {
    /// The part that identifies the item.
    key: K,
    /// The payload; ignored by hashing and equality.
    value: V,
}

impl<K, V> std::borrow::Borrow<K> for QueueItem<K, V> {
    /// Borrows the item as its key.
    fn borrow(&self) -> &K {
        let Self { key, .. } = self;
        key
    }
}

impl<K, V> std::hash::Hash for QueueItem<K, V>
where
    K: std::hash::Hash,
{
    /// Feeds only the key into the hasher, matching the hash of the bare key.
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        std::hash::Hash::hash(&self.key, state);
    }
}

impl<K, V> PartialEq for QueueItem<K, V>
where
    K: PartialEq,
{
    /// Two items are equal when their keys are equal, regardless of the values.
    fn eq(&self, other: &Self) -> bool {
        PartialEq::eq(&self.key, &other.key)
    }
}

impl<K, V> Eq for QueueItem<K, V> where K: PartialEq {}
640
641#[derive(Debug, Clone)]
642struct Priority {
643 readiness: Readiness,
644 received_at: DateTime<Utc>,
645 next_project_fetch: Instant,
646}
647
648impl Priority {
649 fn new(received_at: DateTime<Utc>) -> Self {
650 Self {
651 readiness: Readiness::new(),
652 received_at,
653 next_project_fetch: Instant::now(),
654 }
655 }
656}
657
658impl Ord for Priority {
659 fn cmp(&self, other: &Self) -> Ordering {
660 match (self.readiness.ready(), other.readiness.ready()) {
661 (true, true) => self.received_at.cmp(&other.received_at),
665 (true, false) => Ordering::Greater,
666 (false, true) => Ordering::Less,
667 (false, false) => self
670 .next_project_fetch
671 .cmp(&other.next_project_fetch)
672 .reverse()
673 .then(self.received_at.cmp(&other.received_at).reverse()),
674 }
675 }
676}
677
678impl PartialOrd for Priority {
679 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
680 Some(self.cmp(other))
681 }
682}
683
684impl PartialEq for Priority {
685 fn eq(&self, other: &Self) -> bool {
686 self.cmp(other).is_eq()
687 }
688}
689
690impl Eq for Priority {}
691
/// Readiness flags for the two projects a stack depends on.
#[derive(Debug, Clone, Copy)]
struct Readiness {
    /// Whether the stack's own project is ready.
    own_project_ready: bool,
    /// Whether the stack's sampling project is ready.
    sampling_project_ready: bool,
}

impl Readiness {
    /// Creates a readiness that optimistically treats both projects as ready.
    fn new() -> Self {
        let optimistic = true;
        Self {
            own_project_ready: optimistic,
            sampling_project_ready: optimistic,
        }
    }

    /// Returns `true` only if both the own and the sampling project are ready.
    fn ready(&self) -> bool {
        let Self {
            own_project_ready,
            sampling_project_ready,
        } = *self;
        own_project_ready && sampling_project_ready
    }
}
712
#[cfg(test)]
mod tests {
    use relay_common::Dsn;
    use relay_event_schema::protocol::EventId;
    use relay_sampling::DynamicSamplingContext;
    use std::str::FromStr;
    use std::sync::Arc;
    use uuid::Uuid;

    use crate::envelope::{Item, ItemType};
    use crate::extractors::RequestMeta;
    use crate::services::buffer::common::ProjectKeyPair;
    use crate::services::buffer::envelope_store::sqlite::DatabaseEnvelope;
    use crate::services::buffer::testutils::utils::mock_envelopes;
    use crate::utils::MemoryStat;
    use crate::SqliteEnvelopeStore;

    use super::*;

    impl Peek {
        /// Test helper: returns `true` for [`Peek::Empty`].
        fn is_empty(&self) -> bool {
            matches!(self, Peek::Empty)
        }
    }

    /// Builds an envelope for `own_key`, optionally with a sampling key and an event id.
    ///
    /// When a sampling key is given, a dynamic sampling context carrying it is set and a
    /// transaction item is added to the envelope.
    fn new_envelope(
        own_key: ProjectKey,
        sampling_key: Option<ProjectKey>,
        event_id: Option<EventId>,
    ) -> Box<Envelope> {
        let mut envelope = Envelope::from_request(
            None,
            RequestMeta::new(Dsn::from_str(&format!("http://{own_key}@localhost/1")).unwrap()),
        );
        if let Some(sampling_key) = sampling_key {
            envelope.set_dsc(DynamicSamplingContext {
                public_key: sampling_key,
                trace_id: "67e5504410b1426f9247bb680e5fe0c8".parse().unwrap(),
                release: None,
                user: Default::default(),
                replay_id: None,
                environment: None,
                transaction: None,
                sample_rate: None,
                sampled: None,
                other: Default::default(),
            });
            envelope.add_item(Item::new(ItemType::Transaction));
        }
        if let Some(event_id) = event_id {
            envelope.set_event_id(event_id);
        }
        envelope
    }

    /// Builds a config whose envelope spool path is `path`.
    fn mock_config(path: &str) -> Arc<Config> {
        Config::from_json_value(serde_json::json!({
            "spool": {
                "envelopes": {
                    "path": path
                }
            }
        }))
        .unwrap()
        .into()
    }

    /// Builds a memory checker from default memory stats and a mock config.
    fn mock_memory_checker() -> MemoryChecker {
        MemoryChecker::new(MemoryStat::default(), mock_config("my/db/path").clone())
    }

    /// Peeks into the buffer and returns the peeked timestamp, panicking if there is none.
    async fn peek_received_at(buffer: &mut EnvelopeBuffer<MemoryStackProvider>) -> DateTime<Utc> {
        buffer.peek().await.unwrap().last_received_at().unwrap()
    }

    #[tokio::test]
    async fn test_insert_pop() {
        let mut buffer = EnvelopeBuffer::<MemoryStackProvider>::new(0, mock_memory_checker());

        let project_key1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap();
        let project_key2 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
        let project_key3 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fef").unwrap();

        // An empty buffer yields nothing.
        assert!(buffer.pop().await.unwrap().is_none());
        assert!(buffer.peek().await.unwrap().is_empty());

        let envelope1 = new_envelope(project_key1, None, None);
        let time1 = envelope1.meta().received_at();
        buffer.push(envelope1).await.unwrap();

        let envelope2 = new_envelope(project_key2, None, None);
        let time2 = envelope2.meta().received_at();
        buffer.push(envelope2).await.unwrap();

        // Both projects are ready, so project 2 is on top (it has the newest envelope).
        assert_eq!(peek_received_at(&mut buffer).await, time2);

        buffer.mark_ready(&project_key1, false);
        buffer.mark_ready(&project_key2, false);

        // Both projects are not ready, so project 1 is on top (it has the oldest envelope).
        assert_eq!(peek_received_at(&mut buffer).await, time1);

        let envelope3 = new_envelope(project_key3, None, None);
        let time3 = envelope3.meta().received_at();
        buffer.push(envelope3).await.unwrap();
        buffer.mark_ready(&project_key3, false);

        // All projects are not ready, so project 1 is still on top.
        assert_eq!(peek_received_at(&mut buffer).await, time1);

        // After marking a project ready, it goes to the top.
        buffer.mark_ready(&project_key3, true);
        assert_eq!(peek_received_at(&mut buffer).await, time3);
        assert_eq!(
            buffer.pop().await.unwrap().unwrap().meta().public_key(),
            project_key3
        );

        // After popping, project 1 is on top again.
        assert_eq!(peek_received_at(&mut buffer).await, time1);

        // Mark project 1 as ready (it stays on top).
        buffer.mark_ready(&project_key1, true);
        assert_eq!(peek_received_at(&mut buffer).await, time1);

        // Mark project 2 as ready as well (now on top because it is the most recent).
        buffer.mark_ready(&project_key2, true);
        assert_eq!(peek_received_at(&mut buffer).await, time2);
        assert_eq!(
            buffer.pop().await.unwrap().unwrap().meta().public_key(),
            project_key2
        );

        // Pop the last element.
        assert_eq!(
            buffer.pop().await.unwrap().unwrap().meta().public_key(),
            project_key1
        );
        assert!(buffer.pop().await.unwrap().is_none());
        assert!(buffer.peek().await.unwrap().is_empty());
    }

    #[tokio::test]
    async fn test_project_internal_order() {
        let mut buffer = EnvelopeBuffer::<MemoryStackProvider>::new(0, mock_memory_checker());

        let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap();

        let envelope1 = new_envelope(project_key, None, None);
        let time1 = envelope1.meta().received_at();
        let envelope2 = new_envelope(project_key, None, None);
        let time2 = envelope2.meta().received_at();

        assert!(time2 > time1);

        buffer.push(envelope1).await.unwrap();
        buffer.push(envelope2).await.unwrap();

        // Within a single project, the most recently pushed envelope is popped first.
        assert_eq!(
            buffer.pop().await.unwrap().unwrap().meta().received_at(),
            time2
        );
        assert_eq!(
            buffer.pop().await.unwrap().unwrap().meta().received_at(),
            time1
        );
        assert!(buffer.pop().await.unwrap().is_none());
    }

    #[tokio::test]
    async fn test_sampling_projects() {
        let mut buffer = EnvelopeBuffer::<MemoryStackProvider>::new(0, mock_memory_checker());

        let project_key1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap();
        let project_key2 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fef").unwrap();

        let envelope1 = new_envelope(project_key1, None, None);
        let time1 = envelope1.received_at();
        buffer.push(envelope1).await.unwrap();

        let envelope2 = new_envelope(project_key2, None, None);
        let time2 = envelope2.received_at();
        buffer.push(envelope2).await.unwrap();

        // This envelope belongs to project 1 but is sampled by project 2.
        let envelope3 = new_envelope(project_key1, Some(project_key2), None);
        let time3 = envelope3.meta().received_at();
        buffer.push(envelope3).await.unwrap();

        buffer.mark_ready(&project_key1, false);
        buffer.mark_ready(&project_key2, false);

        // Nothing is ready, envelope 1 is on top.
        assert_eq!(
            buffer.peek().await.unwrap().last_received_at().unwrap(),
            time1
        );

        // Mark project 2 ready: its own stack gets on top.
        buffer.mark_ready(&project_key2, true);
        assert_eq!(
            buffer.peek().await.unwrap().last_received_at().unwrap(),
            time2
        );

        // Revert.
        buffer.mark_ready(&project_key2, false);
        assert_eq!(
            buffer.peek().await.unwrap().last_received_at().unwrap(),
            time1
        );

        // Project 1 ready: envelope 1 is on top, envelope 3 still waits for project 2.
        buffer.mark_ready(&project_key1, true);
        assert_eq!(
            buffer.peek().await.unwrap().last_received_at().unwrap(),
            time1
        );

        // When both projects are ready, envelope 3 ends up on top.
        buffer.mark_ready(&project_key2, true);
        assert_eq!(
            buffer.pop().await.unwrap().unwrap().meta().received_at(),
            time3
        );
        assert_eq!(
            buffer.peek().await.unwrap().last_received_at().unwrap(),
            time2
        );

        // With project 2 not ready, the stack of project 1 is popped first.
        buffer.mark_ready(&project_key2, false);
        assert_eq!(buffer.pop().await.unwrap().unwrap().received_at(), time1);
        assert_eq!(buffer.pop().await.unwrap().unwrap().received_at(), time2);

        assert!(buffer.pop().await.unwrap().is_none());
    }

    #[tokio::test]
    async fn test_project_keys_distinct() {
        let project_key1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap();
        let project_key2 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fef").unwrap();

        // Swapping own and sampling key must yield a different pair.
        let project_key_pair1 = ProjectKeyPair::new(project_key1, project_key2);
        let project_key_pair2 = ProjectKeyPair::new(project_key2, project_key1);

        assert_ne!(project_key_pair1, project_key_pair2);

        let mut buffer = EnvelopeBuffer::<MemoryStackProvider>::new(0, mock_memory_checker());
        buffer
            .push(new_envelope(project_key1, Some(project_key2), None))
            .await
            .unwrap();
        buffer
            .push(new_envelope(project_key2, Some(project_key1), None))
            .await
            .unwrap();
        // Each pair gets its own stack.
        assert_eq!(buffer.priority_queue.len(), 2);
    }

    #[test]
    fn test_total_order() {
        let p1 = Priority {
            readiness: Readiness {
                own_project_ready: true,
                sampling_project_ready: true,
            },
            received_at: Utc::now(),
            next_project_fetch: Instant::now(),
        };
        let mut p2 = p1.clone();
        p2.next_project_fetch += Duration::from_millis(1);

        // The next project fetch does not matter because the project is ready.
        assert_eq!(p1.cmp(&p2), Ordering::Equal);
        assert_eq!(p1, p2);
    }

    #[tokio::test]
    async fn test_last_peek_internal_order() {
        let mut buffer = EnvelopeBuffer::<MemoryStackProvider>::new(0, mock_memory_checker());

        let project_key_1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap();
        let event_id_1 = EventId::new();
        let envelope1 = new_envelope(project_key_1, None, Some(event_id_1));
        let time1 = envelope1.received_at();

        let project_key_2 = ProjectKey::parse("b56ae32be2584e0bbd7a4cbb95971fed").unwrap();
        let event_id_2 = EventId::new();
        let envelope2 = new_envelope(project_key_2, None, Some(event_id_2));
        let time2 = envelope2.received_at();

        buffer.push(envelope1).await.unwrap();
        buffer.push(envelope2).await.unwrap();

        buffer.mark_ready(&project_key_1, false);
        buffer.mark_ready(&project_key_2, false);

        // Envelope 1 is the first element.
        let Peek::NotReady {
            last_received_at, ..
        } = buffer.peek().await.unwrap()
        else {
            panic!();
        };
        assert_eq!(last_received_at, time1);

        // A second peek returns the same element.
        let Peek::NotReady {
            last_received_at,
            project_key_pair,
            ..
        } = buffer.peek().await.unwrap()
        else {
            panic!();
        };
        assert_eq!(last_received_at, time1);
        assert_ne!(last_received_at, time2);

        buffer.mark_seen(&project_key_pair, Duration::ZERO);

        // After `mark_seen`, envelope 2 is on top.
        let Peek::NotReady {
            last_received_at, ..
        } = buffer.peek().await.unwrap()
        else {
            panic!();
        };
        assert_eq!(last_received_at, time2);
        assert_ne!(last_received_at, time1);

        // Again, a second peek returns the same element.
        let Peek::NotReady {
            last_received_at,
            project_key_pair,
            ..
        } = buffer.peek().await.unwrap()
        else {
            panic!();
        };
        assert_eq!(last_received_at, time2);
        assert_ne!(last_received_at, time1);

        buffer.mark_seen(&project_key_pair, Duration::ZERO);

        // After another `mark_seen`, the buffer cycles back to envelope 1.
        let Peek::NotReady {
            last_received_at, ..
        } = buffer.peek().await.unwrap()
        else {
            panic!();
        };
        assert_eq!(last_received_at, time1);
        assert_ne!(last_received_at, time2);
    }

    #[tokio::test]
    async fn test_initialize_buffer() {
        let path = std::env::temp_dir()
            .join(Uuid::new_v4().to_string())
            .into_os_string()
            .into_string()
            .unwrap();
        let config = mock_config(&path);
        let mut store = SqliteEnvelopeStore::prepare(0, &config).await.unwrap();
        let mut buffer = EnvelopeBuffer::<SqliteStackProvider>::new(0, &config)
            .await
            .unwrap();

        // Write 10 envelopes directly to the store, bypassing the buffer.
        let envelopes = mock_envelopes(10);
        assert!(store
            .insert_batch(
                envelopes
                    .into_iter()
                    .map(|e| DatabaseEnvelope::try_from(e.as_ref()).unwrap())
                    .collect::<Vec<_>>()
                    .try_into()
                    .unwrap()
            )
            .await
            .is_ok());

        // Before initialization the buffer knows nothing about the stored envelopes.
        assert!(buffer.priority_queue.is_empty());
        assert!(buffer.stacks_by_project.is_empty());

        buffer.initialize().await;

        // Assumes all mock envelopes share one project key pair made of two distinct keys
        // (TODO confirm against `mock_envelopes`): one stack, and one lookup entry per key.
        assert_eq!(buffer.priority_queue.len(), 1);
        assert_eq!(buffer.stacks_by_project.len(), 2);
    }
}