1use std::cmp::Ordering;
2use std::collections::BTreeSet;
3use std::convert::Infallible;
4use std::error::Error;
5use std::mem;
6use std::time::Duration;
7
8use chrono::{DateTime, Utc};
9use hashbrown::HashSet;
10use relay_base_schema::project::ProjectKey;
11use relay_config::Config;
12use tokio::time::{Instant, timeout};
13
14use crate::envelope::Envelope;
15use crate::envelope::Item;
16use crate::services::buffer::common::ProjectKeyPair;
17use crate::services::buffer::envelope_stack::EnvelopeStack;
18use crate::services::buffer::envelope_stack::sqlite::SqliteEnvelopeStackError;
19use crate::services::buffer::envelope_store::sqlite::SqliteEnvelopeStoreError;
20use crate::services::buffer::stack_provider::memory::MemoryStackProvider;
21use crate::services::buffer::stack_provider::sqlite::SqliteStackProvider;
22use crate::services::buffer::stack_provider::{StackCreationType, StackProvider};
23use crate::statsd::{RelayDistributions, RelayGauges, RelayTimers};
24use crate::utils::MemoryChecker;
25
/// Envelope buffer that wraps one of the concrete [`EnvelopeBuffer`] backends.
///
/// Callers work with this enum instead of being generic over the stack provider.
#[derive(Debug)]
// The variants mention the private `EnvelopeBuffer` type from a public enum.
#[allow(private_interfaces)]
pub enum PolymorphicEnvelopeBuffer {
    /// Buffer whose stacks are created by a [`MemoryStackProvider`].
    InMemory(EnvelopeBuffer<MemoryStackProvider>),
    /// Buffer whose stacks are created by a [`SqliteStackProvider`].
    Sqlite(EnvelopeBuffer<SqliteStackProvider>),
}
41
42impl PolymorphicEnvelopeBuffer {
43 pub fn is_memory(&self) -> bool {
45 match self {
46 PolymorphicEnvelopeBuffer::InMemory(_) => true,
47 PolymorphicEnvelopeBuffer::Sqlite(_) => false,
48 }
49 }
50
51 pub async fn from_config(
54 partition_id: u8,
55 config: &Config,
56 memory_checker: MemoryChecker,
57 ) -> Result<Self, EnvelopeBufferError> {
58 let buffer = if config.spool_envelopes_path(partition_id).is_some() {
59 relay_log::trace!("PolymorphicEnvelopeBuffer: initializing sqlite envelope buffer");
60 let buffer = EnvelopeBuffer::<SqliteStackProvider>::new(partition_id, config).await?;
61 Self::Sqlite(buffer)
62 } else {
63 relay_log::trace!("PolymorphicEnvelopeBuffer: initializing memory envelope buffer");
64 let buffer = EnvelopeBuffer::<MemoryStackProvider>::new(partition_id, memory_checker);
65 Self::InMemory(buffer)
66 };
67
68 Ok(buffer)
69 }
70
71 pub async fn initialize(&mut self) {
73 match self {
74 PolymorphicEnvelopeBuffer::InMemory(buffer) => buffer.initialize().await,
75 PolymorphicEnvelopeBuffer::Sqlite(buffer) => buffer.initialize().await,
76 }
77 }
78
79 pub async fn push(&mut self, envelope: Box<Envelope>) -> Result<(), EnvelopeBufferError> {
81 relay_statsd::metric!(
82 distribution(RelayDistributions::BufferEnvelopeBodySize, sample = 0.01) =
83 envelope.items().map(Item::len).sum::<usize>() as u64,
84 partition_id = self.partition_tag()
85 );
86
87 relay_statsd::metric!(
88 timer(RelayTimers::BufferPush, sample = 0.01),
89 partition_id = self.partition_tag(),
90 {
91 match self {
92 Self::Sqlite(buffer) => buffer.push(envelope).await,
93 Self::InMemory(buffer) => buffer.push(envelope).await,
94 }
95 }
96 )
97 }
98
99 pub async fn peek(&mut self) -> Result<Peek, EnvelopeBufferError> {
101 relay_statsd::metric!(
102 timer(RelayTimers::BufferPeek, sample = 0.01),
103 partition_id = self.partition_tag(),
104 {
105 match self {
106 Self::Sqlite(buffer) => buffer.peek().await,
107 Self::InMemory(buffer) => buffer.peek().await,
108 }
109 }
110 )
111 }
112
113 pub async fn pop(&mut self) -> Result<Option<Box<Envelope>>, EnvelopeBufferError> {
115 relay_statsd::metric!(
116 timer(RelayTimers::BufferPop, sample = 0.01),
117 partition_id = self.partition_tag(),
118 {
119 match self {
120 Self::Sqlite(buffer) => buffer.pop().await,
121 Self::InMemory(buffer) => buffer.pop().await,
122 }
123 }
124 )
125 }
126
127 pub fn mark_ready(&mut self, project: &ProjectKey, is_ready: bool) -> bool {
132 relay_log::trace!(
133 project_key = project.as_str(),
134 "buffer marked {}",
135 if is_ready { "ready" } else { "not ready" }
136 );
137 match self {
138 Self::Sqlite(buffer) => buffer.mark_ready(project, is_ready),
139 Self::InMemory(buffer) => buffer.mark_ready(project, is_ready),
140 }
141 }
142
143 pub fn mark_seen(&mut self, project_key_pair: &ProjectKeyPair, next_fetch: Duration) {
149 match self {
150 Self::Sqlite(buffer) => buffer.mark_seen(project_key_pair, next_fetch),
151 Self::InMemory(buffer) => buffer.mark_seen(project_key_pair, next_fetch),
152 }
153 }
154
155 pub fn has_capacity(&self) -> bool {
157 match self {
158 Self::Sqlite(buffer) => buffer.has_capacity(),
159 Self::InMemory(buffer) => buffer.has_capacity(),
160 }
161 }
162
163 pub fn item_count(&self) -> u64 {
166 match self {
167 Self::Sqlite(buffer) => buffer.tracked_count,
168 Self::InMemory(buffer) => buffer.tracked_count,
169 }
170 }
171
172 pub fn total_size(&self) -> Option<u64> {
175 match self {
176 Self::Sqlite(buffer) => buffer.stack_provider.total_size(),
177 Self::InMemory(buffer) => buffer.stack_provider.total_size(),
178 }
179 }
180
181 pub async fn shutdown(&mut self) -> bool {
183 let Self::Sqlite(buffer) = self else {
187 relay_log::trace!("PolymorphicEnvelopeBuffer: shutdown procedure not needed");
188 return false;
189 };
190 buffer.flush().await;
191
192 true
193 }
194
195 fn partition_tag(&self) -> &str {
197 match self {
198 PolymorphicEnvelopeBuffer::InMemory(buffer) => &buffer.partition_tag,
199 PolymorphicEnvelopeBuffer::Sqlite(buffer) => &buffer.partition_tag,
200 }
201 }
202}
203
/// Error returned by the envelope buffer operations.
#[derive(Debug, thiserror::Error)]
pub enum EnvelopeBufferError {
    /// An error raised by the SQLite envelope store.
    #[error("sqlite")]
    SqliteStore(#[from] SqliteEnvelopeStoreError),

    /// An error raised by a SQLite envelope stack.
    #[error("sqlite")]
    SqliteStack(#[from] SqliteEnvelopeStackError),

    /// The envelope could not be pushed to the buffer.
    #[error("failed to push envelope to the buffer")]
    PushFailed,
}
216
217impl From<Infallible> for EnvelopeBufferError {
218 fn from(value: Infallible) -> Self {
219 match value {}
220 }
221}
222
/// Priority queue of envelope stacks, one stack per project key pair.
///
/// The stack at the top of the queue is the one that `peek` and `pop` operate on.
#[derive(Debug)]
struct EnvelopeBuffer<P: StackProvider> {
    /// The stacks, keyed by their project key pair and ordered by [`Priority`].
    priority_queue: priority_queue::PriorityQueue<QueueItem<ProjectKeyPair, P::Stack>, Priority>,
    /// Lookup from a single project key to all key pairs (stacks) it is part of.
    stacks_by_project: hashbrown::HashMap<ProjectKey, BTreeSet<ProjectKeyPair>>,
    /// Creates the stacks and answers capacity, size and count queries.
    stack_provider: P,
    /// Envelope count seeded from the provider's store count on initialization, incremented on
    /// push and decremented on pop. Signed because it is allowed to go below zero.
    total_count: i64,
    /// Envelopes pushed minus envelopes popped on this instance; never drops below zero.
    tracked_count: u64,
    /// Whether `total_count` was successfully loaded from the provider during initialization.
    total_count_initialized: bool,
    /// The partition id rendered as a string, used as the `partition_id` metric tag.
    partition_tag: String,
}
259
260impl EnvelopeBuffer<MemoryStackProvider> {
261 pub fn new(partition_id: u8, memory_checker: MemoryChecker) -> Self {
263 Self {
264 stacks_by_project: Default::default(),
265 priority_queue: Default::default(),
266 stack_provider: MemoryStackProvider::new(memory_checker),
267 total_count: 0,
268 tracked_count: 0,
269 total_count_initialized: false,
270 partition_tag: partition_id.to_string(),
271 }
272 }
273}
274
275#[allow(dead_code)]
276impl EnvelopeBuffer<SqliteStackProvider> {
277 pub async fn new(partition_id: u8, config: &Config) -> Result<Self, EnvelopeBufferError> {
279 Ok(Self {
280 stacks_by_project: Default::default(),
281 priority_queue: Default::default(),
282 stack_provider: SqliteStackProvider::new(partition_id, config).await?,
283 total_count: 0,
284 tracked_count: 0,
285 total_count_initialized: false,
286 partition_tag: partition_id.to_string(),
287 })
288 }
289}
290
291impl<P: StackProvider> EnvelopeBuffer<P>
292where
293 EnvelopeBufferError: From<<P::Stack as EnvelopeStack>::Error>,
294{
295 pub async fn initialize(&mut self) {
298 relay_statsd::metric!(
299 timer(RelayTimers::BufferInitialization),
300 partition_id = &self.partition_tag,
301 {
302 let initialization_state = self.stack_provider.initialize().await;
303 self.load_stacks(initialization_state.project_key_pairs)
304 .await;
305 self.load_store_total_count().await;
306 }
307 );
308 }
309
310 pub async fn push(&mut self, envelope: Box<Envelope>) -> Result<(), EnvelopeBufferError> {
315 let received_at = envelope.received_at();
316
317 let project_key_pair = ProjectKeyPair::from_envelope(&envelope);
318 if let Some((
319 QueueItem {
320 key: _,
321 value: stack,
322 },
323 _,
324 )) = self.priority_queue.get_mut(&project_key_pair)
325 {
326 stack.push(envelope).await?;
327 } else {
328 self.push_stack(
331 StackCreationType::New,
332 ProjectKeyPair::from_envelope(&envelope),
333 Some(envelope),
334 )
335 .await?;
336 }
337 self.priority_queue
338 .change_priority_by(&project_key_pair, |prio| {
339 prio.received_at = received_at;
340 });
341
342 self.total_count += 1;
343 self.tracked_count += 1;
344 self.track_total_count();
345
346 Ok(())
347 }
348
349 pub async fn peek(&mut self) -> Result<Peek, EnvelopeBufferError> {
351 let Some((
352 QueueItem {
353 key: project_key_pair,
354 value: stack,
355 },
356 Priority {
357 readiness,
358 next_project_fetch,
359 ..
360 },
361 )) = self.priority_queue.peek_mut()
362 else {
363 return Ok(Peek::Empty);
364 };
365
366 let ready = readiness.ready();
367
368 Ok(match (stack.peek().await?, ready) {
369 (None, _) => Peek::Empty,
370 (Some(last_received_at), true) => Peek::Ready {
371 project_key_pair: *project_key_pair,
372 last_received_at,
373 },
374 (Some(last_received_at), false) => Peek::NotReady {
375 project_key_pair: *project_key_pair,
376 next_project_fetch: *next_project_fetch,
377 last_received_at,
378 },
379 })
380 }
381
382 pub async fn pop(&mut self) -> Result<Option<Box<Envelope>>, EnvelopeBufferError> {
387 let Some((QueueItem { key, value: stack }, _)) = self.priority_queue.peek_mut() else {
388 return Ok(None);
389 };
390 let project_key_pair = *key;
391 let envelope = stack.pop().await?.expect("found an empty stack");
392
393 let last_received_at = stack.peek().await?;
394
395 match last_received_at {
396 None => {
397 self.pop_stack(project_key_pair);
398 }
399 Some(last_received_at) => {
400 self.priority_queue
401 .change_priority_by(&project_key_pair, |prio| {
402 prio.received_at = last_received_at;
403 });
404 }
405 }
406
407 self.total_count -= 1;
411 self.tracked_count = self.tracked_count.saturating_sub(1);
412 self.track_total_count();
413
414 Ok(Some(envelope))
415 }
416
417 pub fn mark_ready(&mut self, project: &ProjectKey, is_ready: bool) -> bool {
421 let mut changed = false;
422 if let Some(project_key_pairs) = self.stacks_by_project.get(project) {
423 for project_key_pair in project_key_pairs {
424 self.priority_queue
425 .change_priority_by(project_key_pair, |stack| {
426 let mut found = false;
427 for (subkey, readiness) in [
428 (
429 project_key_pair.own_key,
430 &mut stack.readiness.own_project_ready,
431 ),
432 (
433 project_key_pair.sampling_key,
434 &mut stack.readiness.sampling_project_ready,
435 ),
436 ] {
437 if subkey == *project {
438 found = true;
439 if *readiness != is_ready {
440 changed = true;
441 *readiness = is_ready;
442 }
443 }
444 }
445 debug_assert!(found);
446 });
447 }
448 }
449
450 changed
451 }
452
453 pub fn mark_seen(&mut self, project_key_pair: &ProjectKeyPair, next_fetch: Duration) {
459 self.priority_queue
460 .change_priority_by(project_key_pair, |stack| {
461 stack.next_project_fetch = Instant::now() + next_fetch;
464 });
465 }
466
467 pub fn has_capacity(&self) -> bool {
469 self.stack_provider.has_store_capacity()
470 }
471
472 pub async fn flush(&mut self) {
474 let priority_queue = mem::take(&mut self.priority_queue);
475 self.stack_provider
476 .flush(priority_queue.into_iter().map(|(q, _)| q.value))
477 .await;
478 }
479
480 async fn push_stack(
482 &mut self,
483 stack_creation_type: StackCreationType,
484 project_key_pair: ProjectKeyPair,
485 envelope: Option<Box<Envelope>>,
486 ) -> Result<(), EnvelopeBufferError> {
487 let received_at = envelope.as_ref().map_or(Utc::now(), |e| e.received_at());
488
489 let mut stack = self
490 .stack_provider
491 .create_stack(stack_creation_type, project_key_pair);
492 if let Some(envelope) = envelope {
493 stack.push(envelope).await?;
494 }
495
496 let previous_entry = self.priority_queue.push(
497 QueueItem {
498 key: project_key_pair,
499 value: stack,
500 },
501 Priority::new(received_at),
502 );
503 debug_assert!(previous_entry.is_none());
504 for project_key in project_key_pair.iter() {
505 self.stacks_by_project
506 .entry(project_key)
507 .or_default()
508 .insert(project_key_pair);
509 }
510 relay_statsd::metric!(
511 gauge(RelayGauges::BufferStackCount) = self.priority_queue.len() as u64,
512 partition_id = &self.partition_tag
513 );
514
515 Ok(())
516 }
517
518 fn pop_stack(&mut self, project_key_pair: ProjectKeyPair) {
520 for project_key in project_key_pair.iter() {
521 self.stacks_by_project
522 .get_mut(&project_key)
523 .expect("project_key is missing from lookup")
524 .remove(&project_key_pair);
525 }
526 self.priority_queue.remove(&project_key_pair);
527
528 relay_statsd::metric!(
529 gauge(RelayGauges::BufferStackCount) = self.priority_queue.len() as u64,
530 partition_id = &self.partition_tag
531 );
532 }
533
534 async fn load_stacks(&mut self, project_key_pairs: HashSet<ProjectKeyPair>) {
536 for project_key_pair in project_key_pairs {
537 self.push_stack(StackCreationType::Initialization, project_key_pair, None)
538 .await
539 .expect("Pushing an empty stack raised an error");
540 }
541 }
542
543 async fn load_store_total_count(&mut self) {
549 let total_count = timeout(Duration::from_secs(1), async {
550 self.stack_provider.store_total_count().await
551 })
552 .await;
553 match total_count {
554 Ok(total_count) => {
555 self.total_count = total_count as i64;
556 self.total_count_initialized = true;
557 }
558 Err(error) => {
559 self.total_count_initialized = false;
560 relay_log::error!(
561 error = &error as &dyn Error,
562 "failed to load the total envelope count of the store",
563 );
564 }
565 };
566 self.track_total_count();
567 }
568
569 fn track_total_count(&self) {
571 let total_count = self.total_count as f64;
572 let initialized = match self.total_count_initialized {
573 true => "true",
574 false => "false",
575 };
576 relay_statsd::metric!(
577 gauge(RelayGauges::BufferEnvelopesCount) = total_count,
578 initialized = initialized,
579 stack_type = self.stack_provider.stack_type(),
580 partition_id = &self.partition_tag
581 );
582 }
583}
584
/// Result of peeking into the envelope buffer.
pub enum Peek {
    /// There is nothing to peek at.
    Empty,
    /// The top stack has an envelope and both of its projects are marked ready.
    Ready {
        /// Key pair identifying the peeked stack.
        project_key_pair: ProjectKeyPair,
        /// Timestamp returned by peeking into the stack.
        last_received_at: DateTime<Utc>,
    },
    /// The top stack has an envelope but at least one of its projects is not ready.
    NotReady {
        /// Key pair identifying the peeked stack.
        project_key_pair: ProjectKeyPair,
        /// The next project fetch time stored in the stack's priority.
        next_project_fetch: Instant,
        /// Timestamp returned by peeking into the stack.
        last_received_at: DateTime<Utc>,
    },
}
598
599impl Peek {
600 pub fn last_received_at(&self) -> Option<DateTime<Utc>> {
601 match self {
602 Self::Empty => None,
603 Self::Ready {
604 last_received_at, ..
605 }
606 | Self::NotReady {
607 last_received_at, ..
608 } => Some(*last_received_at),
609 }
610 }
611}
612
/// Key-value pair whose identity is defined by the key alone.
///
/// Hashing, equality and borrowing all delegate to `key`, so a collection of `QueueItem`s can
/// be queried with a bare `&K` while each entry still carries a `value`.
#[derive(Debug)]
struct QueueItem<K, V> {
    /// The part that determines hashing and equality.
    key: K,
    /// Payload that is ignored by all comparisons.
    value: V,
}

/// Allows looking up an item by its key.
impl<K, V> std::borrow::Borrow<K> for QueueItem<K, V> {
    fn borrow(&self) -> &K {
        &self.key
    }
}

/// Hashes only the key, consistent with the `Borrow<K>` implementation.
impl<K: std::hash::Hash, V> std::hash::Hash for QueueItem<K, V> {
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        self.key.hash(state);
    }
}

/// Two items are equal if their keys are equal; values are not compared.
impl<K: PartialEq, V> PartialEq for QueueItem<K, V> {
    fn eq(&self, other: &Self) -> bool {
        self.key == other.key
    }
}

// `Eq` promises reflexive equality, which the item can only inherit from a key that is
// itself `Eq`; a merely `PartialEq` key (such as a float) must not qualify.
impl<K: Eq, V> Eq for QueueItem<K, V> {}
638
639#[derive(Debug, Clone)]
640struct Priority {
641 readiness: Readiness,
642 received_at: DateTime<Utc>,
643 next_project_fetch: Instant,
644}
645
646impl Priority {
647 fn new(received_at: DateTime<Utc>) -> Self {
648 Self {
649 readiness: Readiness::new(),
650 received_at,
651 next_project_fetch: Instant::now(),
652 }
653 }
654}
655
656impl Ord for Priority {
657 fn cmp(&self, other: &Self) -> Ordering {
658 match (self.readiness.ready(), other.readiness.ready()) {
659 (true, true) => self.received_at.cmp(&other.received_at),
663 (true, false) => Ordering::Greater,
664 (false, true) => Ordering::Less,
665 (false, false) => self
668 .next_project_fetch
669 .cmp(&other.next_project_fetch)
670 .reverse()
671 .then(self.received_at.cmp(&other.received_at).reverse()),
672 }
673 }
674}
675
676impl PartialOrd for Priority {
677 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
678 Some(self.cmp(other))
679 }
680}
681
682impl PartialEq for Priority {
683 fn eq(&self, other: &Self) -> bool {
684 self.cmp(other).is_eq()
685 }
686}
687
688impl Eq for Priority {}
689
/// Readiness flags of the two projects a stack depends on.
#[derive(Debug, Clone, Copy)]
struct Readiness {
    /// Whether the stack's own project is ready.
    own_project_ready: bool,
    /// Whether the stack's sampling project is ready.
    sampling_project_ready: bool,
}

impl Readiness {
    /// Creates the initial state, in which both projects count as ready.
    fn new() -> Self {
        let initially_ready = true;

        Self {
            own_project_ready: initially_ready,
            sampling_project_ready: initially_ready,
        }
    }

    /// Returns `true` only if both projects are ready.
    fn ready(&self) -> bool {
        let Self {
            own_project_ready,
            sampling_project_ready,
        } = *self;

        own_project_ready && sampling_project_ready
    }
}
710
711#[cfg(test)]
712mod tests {
713 use relay_common::Dsn;
714 use relay_event_schema::protocol::EventId;
715 use relay_sampling::DynamicSamplingContext;
716 use std::str::FromStr;
717 use std::sync::Arc;
718 use uuid::Uuid;
719
720 use crate::SqliteEnvelopeStore;
721 use crate::envelope::{Item, ItemType};
722 use crate::extractors::RequestMeta;
723 use crate::services::buffer::common::ProjectKeyPair;
724 use crate::services::buffer::envelope_store::sqlite::DatabaseEnvelope;
725 use crate::services::buffer::testutils::utils::mock_envelopes;
726 use crate::utils::MemoryStat;
727
728 use super::*;
729
730 impl Peek {
731 fn is_empty(&self) -> bool {
732 matches!(self, Peek::Empty)
733 }
734 }
735
736 fn new_envelope(
737 own_key: ProjectKey,
738 sampling_key: Option<ProjectKey>,
739 event_id: Option<EventId>,
740 ) -> Box<Envelope> {
741 let mut envelope = Envelope::from_request(
742 None,
743 RequestMeta::new(Dsn::from_str(&format!("http://{own_key}@localhost/1")).unwrap()),
744 );
745 if let Some(sampling_key) = sampling_key {
746 envelope.set_dsc(DynamicSamplingContext {
747 public_key: sampling_key,
748 trace_id: "67e5504410b1426f9247bb680e5fe0c8".parse().unwrap(),
749 release: None,
750 user: Default::default(),
751 replay_id: None,
752 environment: None,
753 transaction: None,
754 sample_rate: None,
755 sampled: None,
756 other: Default::default(),
757 });
758 envelope.add_item(Item::new(ItemType::Transaction));
759 }
760 if let Some(event_id) = event_id {
761 envelope.set_event_id(event_id);
762 }
763 envelope
764 }
765
766 fn mock_config(path: &str) -> Arc<Config> {
767 Config::from_json_value(serde_json::json!({
768 "spool": {
769 "envelopes": {
770 "path": path
771 }
772 }
773 }))
774 .unwrap()
775 .into()
776 }
777
778 fn mock_memory_checker() -> MemoryChecker {
779 MemoryChecker::new(MemoryStat::default(), mock_config("my/db/path").clone())
780 }
781
782 async fn peek_received_at(buffer: &mut EnvelopeBuffer<MemoryStackProvider>) -> DateTime<Utc> {
783 buffer.peek().await.unwrap().last_received_at().unwrap()
784 }
785
    /// Pushes envelopes of three projects and checks how readiness changes decide which stack
    /// is peeked and popped first.
    #[tokio::test]
    async fn test_insert_pop() {
        let mut buffer = EnvelopeBuffer::<MemoryStackProvider>::new(0, mock_memory_checker());

        let project_key1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap();
        let project_key2 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
        let project_key3 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fef").unwrap();

        // A fresh buffer has nothing to pop or peek.
        assert!(buffer.pop().await.unwrap().is_none());
        assert!(buffer.peek().await.unwrap().is_empty());

        let envelope1 = new_envelope(project_key1, None, None);
        let time1 = envelope1.meta().received_at();
        buffer.push(envelope1).await.unwrap();

        let envelope2 = new_envelope(project_key2, None, None);
        let time2 = envelope2.meta().received_at();
        buffer.push(envelope2).await.unwrap();

        // New stacks start out ready, so the stack with the newest envelope is on top.
        assert_eq!(peek_received_at(&mut buffer).await, time2);

        buffer.mark_ready(&project_key1, false);
        buffer.mark_ready(&project_key2, false);

        // Among non-ready stacks the ordering is inverted, so project 1 is on top.
        assert_eq!(peek_received_at(&mut buffer).await, time1);

        let envelope3 = new_envelope(project_key3, None, None);
        let time3 = envelope3.meta().received_at();
        buffer.push(envelope3).await.unwrap();
        buffer.mark_ready(&project_key3, false);

        // A third non-ready stack does not displace project 1.
        assert_eq!(peek_received_at(&mut buffer).await, time1);

        // A ready stack outranks every non-ready stack.
        buffer.mark_ready(&project_key3, true);
        assert_eq!(peek_received_at(&mut buffer).await, time3);
        assert_eq!(
            buffer.pop().await.unwrap().unwrap().meta().public_key(),
            project_key3
        );

        // Only non-ready stacks remain, so project 1 is on top again.
        assert_eq!(peek_received_at(&mut buffer).await, time1);

        // Project 1 is the only ready stack.
        buffer.mark_ready(&project_key1, true);
        assert_eq!(peek_received_at(&mut buffer).await, time1);

        // Both are ready; project 2 holds the newer envelope and wins.
        buffer.mark_ready(&project_key2, true);
        assert_eq!(peek_received_at(&mut buffer).await, time2);
        assert_eq!(
            buffer.pop().await.unwrap().unwrap().meta().public_key(),
            project_key2
        );

        // The last remaining envelope belongs to project 1.
        assert_eq!(
            buffer.pop().await.unwrap().unwrap().meta().public_key(),
            project_key1
        );
        assert!(buffer.pop().await.unwrap().is_none());
        assert!(buffer.peek().await.unwrap().is_empty());
    }
853
    /// Checks that envelopes of the same project are popped newest first.
    #[tokio::test]
    async fn test_project_internal_order() {
        let mut buffer = EnvelopeBuffer::<MemoryStackProvider>::new(0, mock_memory_checker());

        let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap();

        let envelope1 = new_envelope(project_key, None, None);
        let time1 = envelope1.meta().received_at();
        let envelope2 = new_envelope(project_key, None, None);
        let time2 = envelope2.meta().received_at();

        // The rest of the test relies on the two timestamps being distinct and ordered.
        assert!(time2 > time1);

        buffer.push(envelope1).await.unwrap();
        buffer.push(envelope2).await.unwrap();

        // The envelope pushed last comes out first.
        assert_eq!(
            buffer.pop().await.unwrap().unwrap().meta().received_at(),
            time2
        );
        assert_eq!(
            buffer.pop().await.unwrap().unwrap().meta().received_at(),
            time1
        );
        assert!(buffer.pop().await.unwrap().is_none());
    }
880
    /// Checks that a stack whose envelope has a sampling project only counts as ready when
    /// both its own project and its sampling project are ready.
    #[tokio::test]
    async fn test_sampling_projects() {
        let mut buffer = EnvelopeBuffer::<MemoryStackProvider>::new(0, mock_memory_checker());

        let project_key1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap();
        let project_key2 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fef").unwrap();

        let envelope1 = new_envelope(project_key1, None, None);
        let time1 = envelope1.received_at();
        buffer.push(envelope1).await.unwrap();

        let envelope2 = new_envelope(project_key2, None, None);
        let time2 = envelope2.received_at();
        buffer.push(envelope2).await.unwrap();

        // This envelope belongs to project 1 but is sampled by project 2.
        let envelope3 = new_envelope(project_key1, Some(project_key2), None);
        let time3 = envelope3.meta().received_at();
        buffer.push(envelope3).await.unwrap();

        buffer.mark_ready(&project_key1, false);
        buffer.mark_ready(&project_key2, false);

        // Nothing is ready; the inverted ordering puts the first envelope on top.
        assert_eq!(
            buffer.peek().await.unwrap().last_received_at().unwrap(),
            time1
        );

        // With project 2 ready, only the envelope that depends on project 2 alone is ready.
        buffer.mark_ready(&project_key2, true);
        assert_eq!(
            buffer.peek().await.unwrap().last_received_at().unwrap(),
            time2
        );

        // Back to no ready stack.
        buffer.mark_ready(&project_key2, false);
        assert_eq!(
            buffer.peek().await.unwrap().last_received_at().unwrap(),
            time1
        );

        // With project 1 ready, only the envelope that depends on project 1 alone is ready.
        buffer.mark_ready(&project_key1, true);
        assert_eq!(
            buffer.peek().await.unwrap().last_received_at().unwrap(),
            time1
        );

        // Everything is ready now, so the newest envelope (the sampled one) comes first.
        buffer.mark_ready(&project_key2, true);
        assert_eq!(
            buffer.pop().await.unwrap().unwrap().meta().received_at(),
            time3
        );
        assert_eq!(
            buffer.peek().await.unwrap().last_received_at().unwrap(),
            time2
        );

        // The ready stack is popped before the non-ready one; pop itself does not require
        // readiness.
        buffer.mark_ready(&project_key2, false);
        assert_eq!(buffer.pop().await.unwrap().unwrap().received_at(), time1);
        assert_eq!(buffer.pop().await.unwrap().unwrap().received_at(), time2);

        assert!(buffer.pop().await.unwrap().is_none());
    }
947
    /// Checks that swapping own key and sampling key yields a different key pair and thus a
    /// separate stack.
    #[tokio::test]
    async fn test_project_keys_distinct() {
        let project_key1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap();
        let project_key2 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fef").unwrap();

        let project_key_pair1 = ProjectKeyPair::new(project_key1, project_key2);
        let project_key_pair2 = ProjectKeyPair::new(project_key2, project_key1);

        // The pair is ordered: (1, 2) is not (2, 1).
        assert_ne!(project_key_pair1, project_key_pair2);

        let mut buffer = EnvelopeBuffer::<MemoryStackProvider>::new(0, mock_memory_checker());
        buffer
            .push(new_envelope(project_key1, Some(project_key2), None))
            .await
            .unwrap();
        buffer
            .push(new_envelope(project_key2, Some(project_key1), None))
            .await
            .unwrap();
        // Each envelope ended up in its own stack.
        assert_eq!(buffer.priority_queue.len(), 2);
    }
969
    /// Checks that `Ord` and `PartialEq` of `Priority` agree for ready priorities that differ
    /// only in their next project fetch time.
    #[test]
    fn test_total_order() {
        let p1 = Priority {
            readiness: Readiness {
                own_project_ready: true,
                sampling_project_ready: true,
            },
            received_at: Utc::now(),
            next_project_fetch: Instant::now(),
        };
        let mut p2 = p1.clone();
        p2.next_project_fetch += Duration::from_millis(1);

        // Ready priorities compare by `received_at` only, so the fetch time is ignored.
        assert_eq!(p1.cmp(&p2), Ordering::Equal);
        // Equality is derived from the ordering and must therefore hold as well.
        assert_eq!(p1, p2);
    }
987
    /// Checks that `mark_seen` rotates between non-ready stacks, while peeking alone does not
    /// change the order.
    #[tokio::test]
    async fn test_last_peek_internal_order() {
        let mut buffer = EnvelopeBuffer::<MemoryStackProvider>::new(0, mock_memory_checker());

        let project_key_1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap();
        let event_id_1 = EventId::new();
        let envelope1 = new_envelope(project_key_1, None, Some(event_id_1));
        let time1 = envelope1.received_at();

        let project_key_2 = ProjectKey::parse("b56ae32be2584e0bbd7a4cbb95971fed").unwrap();
        let event_id_2 = EventId::new();
        let envelope2 = new_envelope(project_key_2, None, Some(event_id_2));
        let time2 = envelope2.received_at();

        buffer.push(envelope1).await.unwrap();
        buffer.push(envelope2).await.unwrap();

        buffer.mark_ready(&project_key_1, false);
        buffer.mark_ready(&project_key_2, false);

        // Neither stack is ready; the stack created first is on top.
        let Peek::NotReady {
            last_received_at, ..
        } = buffer.peek().await.unwrap()
        else {
            panic!();
        };
        assert_eq!(last_received_at, time1);

        // Peeking a second time returns the same stack.
        let Peek::NotReady {
            last_received_at,
            project_key_pair,
            ..
        } = buffer.peek().await.unwrap()
        else {
            panic!();
        };
        assert_eq!(last_received_at, time1);
        assert_ne!(last_received_at, time2);

        // Marking the stack as seen moves its next fetch time to now, which is later than the
        // other stack's, so the other stack takes over the top.
        buffer.mark_seen(&project_key_pair, Duration::ZERO);

        let Peek::NotReady {
            last_received_at, ..
        } = buffer.peek().await.unwrap()
        else {
            panic!();
        };
        assert_eq!(last_received_at, time2);
        assert_ne!(last_received_at, time1);

        // Again, peeking alone keeps the order stable.
        let Peek::NotReady {
            last_received_at,
            project_key_pair,
            ..
        } = buffer.peek().await.unwrap()
        else {
            panic!();
        };
        assert_eq!(last_received_at, time2);
        assert_ne!(last_received_at, time1);

        // Marking the second stack as seen hands the top back to the first stack.
        buffer.mark_seen(&project_key_pair, Duration::ZERO);

        let Peek::NotReady {
            last_received_at, ..
        } = buffer.peek().await.unwrap()
        else {
            panic!();
        };
        assert_eq!(last_received_at, time1);
        assert_ne!(last_received_at, time2);
    }
1064
    /// Checks that initializing a SQLite-backed buffer creates stacks for envelopes that are
    /// already present in the store.
    #[tokio::test]
    async fn test_initialize_buffer() {
        // A unique path below the temp directory keeps test runs isolated from each other.
        let path = std::env::temp_dir()
            .join(Uuid::new_v4().to_string())
            .into_os_string()
            .into_string()
            .unwrap();
        let config = mock_config(&path);
        let mut store = SqliteEnvelopeStore::prepare(0, &config).await.unwrap();
        let mut buffer = EnvelopeBuffer::<SqliteStackProvider>::new(0, &config)
            .await
            .unwrap();

        let envelopes = mock_envelopes(10);
        // Write the envelopes straight into the store, bypassing the buffer.
        assert!(
            store
                .insert_batch(
                    envelopes
                        .into_iter()
                        .map(|e| DatabaseEnvelope::try_from(e.as_ref()).unwrap())
                        .collect::<Vec<_>>()
                        .try_into()
                        .unwrap()
                )
                .await
                .is_ok()
        );

        // Before initialization the buffer knows nothing about the stored envelopes.
        assert!(buffer.priority_queue.is_empty());
        assert!(buffer.stacks_by_project.is_empty());

        buffer.initialize().await;

        // One stack and two lookup entries: presumably all mock envelopes share a single key
        // pair whose own and sampling keys differ (confirm against `mock_envelopes`).
        assert_eq!(buffer.priority_queue.len(), 1);
        assert_eq!(buffer.stacks_by_project.len(), 2);
    }
1108}