Skip to main content

relay_server/services/buffer/envelope_buffer/
mod.rs

1use std::cmp::Ordering;
2use std::collections::BTreeSet;
3use std::convert::Infallible;
4use std::error::Error;
5use std::mem;
6use std::time::Duration;
7
8use chrono::{DateTime, Utc};
9use hashbrown::HashSet;
10use relay_base_schema::project::ProjectKey;
11use relay_config::Config;
12use tokio::time::{Instant, timeout};
13
14use crate::envelope::Envelope;
15use crate::envelope::Item;
16use crate::services::buffer::common::ProjectKeyPair;
17use crate::services::buffer::envelope_stack::EnvelopeStack;
18use crate::services::buffer::envelope_stack::sqlite::SqliteEnvelopeStackError;
19use crate::services::buffer::envelope_store::sqlite::SqliteEnvelopeStoreError;
20use crate::services::buffer::stack_provider::memory::MemoryStackProvider;
21use crate::services::buffer::stack_provider::sqlite::SqliteStackProvider;
22use crate::services::buffer::stack_provider::{StackCreationType, StackProvider};
23use crate::statsd::{RelayDistributions, RelayGauges, RelayTimers};
24use crate::utils::MemoryChecker;
25
/// Polymorphic envelope buffering interface.
///
/// The underlying buffer can either be disk-based or memory-based,
/// depending on the given configuration.
///
/// NOTE: This is implemented as an enum because a trait object with async methods would not be
/// object safe.
#[derive(Debug)]
// The variants wrap `EnvelopeBuffer`, which is a private type, inside this public enum.
#[allow(private_interfaces)]
pub enum PolymorphicEnvelopeBuffer {
    /// An envelope buffer that uses in-memory envelope stacks.
    InMemory(EnvelopeBuffer<MemoryStackProvider>),
    /// An envelope buffer that uses sqlite envelope stacks.
    Sqlite(EnvelopeBuffer<SqliteStackProvider>),
}
41
impl PolymorphicEnvelopeBuffer {
    /// Returns `true` if the implementation stores all envelopes in RAM.
    pub fn is_memory(&self) -> bool {
        match self {
            Self::InMemory(_) => true,
            Self::Sqlite(_) => false,
        }
    }

    /// Creates either a memory-based or a disk-based envelope buffer,
    /// depending on the given configuration.
    ///
    /// If `config` defines a spool envelopes path for `partition_id`, a sqlite-based buffer is
    /// created; otherwise an in-memory buffer is created with the given `memory_checker`.
    ///
    /// # Errors
    ///
    /// Returns an error if creating the sqlite-based buffer fails.
    pub async fn from_config(
        partition_id: u8,
        config: &Config,
        memory_checker: MemoryChecker,
    ) -> Result<Self, EnvelopeBufferError> {
        let buffer = if config.spool_envelopes_path(partition_id).is_some() {
            relay_log::trace!("PolymorphicEnvelopeBuffer: initializing sqlite envelope buffer");
            let buffer = EnvelopeBuffer::<SqliteStackProvider>::new(partition_id, config).await?;
            Self::Sqlite(buffer)
        } else {
            relay_log::trace!("PolymorphicEnvelopeBuffer: initializing memory envelope buffer");
            let buffer = EnvelopeBuffer::<MemoryStackProvider>::new(partition_id, memory_checker);
            Self::InMemory(buffer)
        };

        Ok(buffer)
    }

    /// Initializes the envelope buffer.
    ///
    /// Delegates to the `initialize` method of the wrapped buffer.
    pub async fn initialize(&mut self) {
        match self {
            PolymorphicEnvelopeBuffer::InMemory(buffer) => buffer.initialize().await,
            PolymorphicEnvelopeBuffer::Sqlite(buffer) => buffer.initialize().await,
        }
    }

    /// Adds an envelope to the buffer.
    ///
    /// Records the summed length of the envelope's items as a distribution and times the push.
    /// Both metrics are tagged with this buffer's partition.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the wrapped buffer's `push`.
    pub async fn push(&mut self, envelope: Box<Envelope>) -> Result<(), EnvelopeBufferError> {
        relay_statsd::metric!(
            distribution(RelayDistributions::BufferEnvelopeBodySize) =
                envelope.items().map(Item::len).sum::<usize>() as u64,
            partition_id = self.partition_tag()
        );

        relay_statsd::metric!(
            timer(RelayTimers::BufferPush),
            partition_id = self.partition_tag(),
            {
                match self {
                    Self::Sqlite(buffer) => buffer.push(envelope).await,
                    Self::InMemory(buffer) => buffer.push(envelope).await,
                }
            }
        )
    }

    /// Returns the state of the next-in-line envelope without removing it.
    ///
    /// The call is timed and tagged with this buffer's partition.
    pub async fn peek(&mut self) -> Result<Peek, EnvelopeBufferError> {
        relay_statsd::metric!(
            timer(RelayTimers::BufferPeek),
            partition_id = self.partition_tag(),
            {
                match self {
                    Self::Sqlite(buffer) => buffer.peek().await,
                    Self::InMemory(buffer) => buffer.peek().await,
                }
            }
        )
    }

    /// Pops the next-in-line envelope.
    ///
    /// Returns `Ok(None)` if the buffer holds no envelopes. The call is timed and tagged with
    /// this buffer's partition.
    pub async fn pop(&mut self) -> Result<Option<Box<Envelope>>, EnvelopeBufferError> {
        relay_statsd::metric!(
            timer(RelayTimers::BufferPop),
            partition_id = self.partition_tag(),
            {
                match self {
                    Self::Sqlite(buffer) => buffer.pop().await,
                    Self::InMemory(buffer) => buffer.pop().await,
                }
            }
        )
    }

    /// Marks a project as ready or not ready.
    ///
    /// The buffer re-prioritizes its envelopes based on this information.
    /// Returns `true` if at least one priority was changed.
    pub fn mark_ready(&mut self, project: &ProjectKey, is_ready: bool) -> bool {
        relay_log::trace!(
            project_key = project.as_str(),
            "buffer marked {}",
            if is_ready { "ready" } else { "not ready" }
        );
        match self {
            Self::Sqlite(buffer) => buffer.mark_ready(project, is_ready),
            Self::InMemory(buffer) => buffer.mark_ready(project, is_ready),
        }
    }

    /// Marks a stack as seen.
    ///
    /// Non-ready stacks are deprioritized when they are marked as seen, such that
    /// the next call to `.peek()` will look at a different stack. This prevents
    /// head-of-line blocking.
    pub fn mark_seen(&mut self, project_key_pair: &ProjectKeyPair, next_fetch: Duration) {
        match self {
            Self::Sqlite(buffer) => buffer.mark_seen(project_key_pair, next_fetch),
            Self::InMemory(buffer) => buffer.mark_seen(project_key_pair, next_fetch),
        }
    }

    /// Returns `true` if the buffer has capacity to accept new [`Envelope`]s.
    ///
    /// The answer comes from the stack provider's store capacity check.
    pub fn has_capacity(&self) -> bool {
        match self {
            Self::Sqlite(buffer) => buffer.has_capacity(),
            Self::InMemory(buffer) => buffer.has_capacity(),
        }
    }

    /// Returns the total number of envelopes that have been spooled since the startup. It does
    /// not include the count that existed in a persistent spooler before.
    pub fn item_count(&self) -> u64 {
        match self {
            Self::Sqlite(buffer) => buffer.tracked_count,
            Self::InMemory(buffer) => buffer.tracked_count,
        }
    }

    /// Returns the total number of bytes that the spooler storage uses or `None` if the number
    /// cannot be reliably determined.
    pub fn total_size(&self) -> Option<u64> {
        match self {
            Self::Sqlite(buffer) => buffer.stack_provider.total_size(),
            Self::InMemory(buffer) => buffer.stack_provider.total_size(),
        }
    }

    /// Shuts down the [`PolymorphicEnvelopeBuffer`].
    ///
    /// Returns `true` if the buffer was flushed. This only happens for a sqlite buffer whose
    /// stack provider is not ephemeral. Returns `false` when no shutdown work was needed.
    pub async fn shutdown(&mut self) -> bool {
        // Currently, we want to flush the buffer only for disk. The in-memory implementation
        // does nothing here and instead tries to pop as many elements as possible within the
        // shutdown timeout.
        match self {
            Self::Sqlite(buffer) if !buffer.stack_provider.ephemeral() => {
                buffer.flush().await;
                true
            }
            _ => {
                relay_log::trace!("shutdown procedure not needed");
                false
            }
        }
    }

    /// Returns the partition tag for this [`PolymorphicEnvelopeBuffer`].
    fn partition_tag(&self) -> &str {
        match self {
            PolymorphicEnvelopeBuffer::InMemory(buffer) => &buffer.partition_tag,
            PolymorphicEnvelopeBuffer::Sqlite(buffer) => &buffer.partition_tag,
        }
    }
}
206
207/// Error that occurs while interacting with the envelope buffer.
208#[derive(Debug, thiserror::Error)]
209pub enum EnvelopeBufferError {
210    #[error("sqlite")]
211    SqliteStore(#[from] SqliteEnvelopeStoreError),
212
213    #[error("sqlite")]
214    SqliteStack(#[from] SqliteEnvelopeStackError),
215
216    #[error("failed to push envelope to the buffer")]
217    PushFailed,
218}
219
220impl From<Infallible> for EnvelopeBufferError {
221    fn from(value: Infallible) -> Self {
222        match value {}
223    }
224}
225
/// An envelope buffer that holds an individual stack for each project/sampling project combination.
///
/// Envelope stacks are organized in a priority queue, and are re-prioritized every time an envelope
/// is pushed, popped, or when the readiness of a project changes.
#[derive(Debug)]
struct EnvelopeBuffer<P: StackProvider> {
    /// The central priority queue.
    ///
    /// Each item pairs a [`ProjectKeyPair`] with its envelope stack. Lookups are done by the
    /// key pair alone.
    priority_queue: priority_queue::PriorityQueue<QueueItem<ProjectKeyPair, P::Stack>, Priority>,
    /// A lookup table to find all stacks involving a project.
    ///
    /// Maps each project key yielded by `ProjectKeyPair::iter` to the set of key pairs whose
    /// stacks involve that project.
    stacks_by_project: hashbrown::HashMap<ProjectKey, BTreeSet<ProjectKeyPair>>,
    /// A provider of stacks that provides utilities to create stacks, check their capacity...
    ///
    /// This indirection is needed because different stack implementations might need different
    /// initialization (e.g. a database connection).
    stack_provider: P,
    /// The total count of envelopes that the buffer is working with.
    ///
    /// Note that this count is not meant to be perfectly accurate since the initialization of the
    /// count might not succeed if it takes more than a set timeout. For example, if we load the
    /// count of all envelopes from disk, and it takes more than the time we set, we will mark the
    /// initial count as 0 and just count incoming and outgoing envelopes from the buffer. For
    /// this reason the count is signed and is allowed to become negative.
    total_count: i64,
    /// The total count of envelopes that the buffer is working with ignoring envelopes that
    /// were previously stored on disk.
    ///
    /// On startup this will always be 0; afterwards it is incremented on every push and
    /// decremented (saturating at 0) on every pop. If a reliable count of currently buffered
    /// envelopes is required, prefer this over `total_count`.
    tracked_count: u64,
    /// Whether the count initialization succeeded or not.
    ///
    /// This boolean is just used for tagging the metric that tracks the total count of envelopes
    /// in the buffer.
    total_count_initialized: bool,
    /// The tag value of this partition which is used for reporting purposes.
    partition_tag: String,
}
262
263impl EnvelopeBuffer<MemoryStackProvider> {
264    /// Creates an empty memory-based buffer.
265    pub fn new(partition_id: u8, memory_checker: MemoryChecker) -> Self {
266        Self {
267            stacks_by_project: Default::default(),
268            priority_queue: Default::default(),
269            stack_provider: MemoryStackProvider::new(memory_checker),
270            total_count: 0,
271            tracked_count: 0,
272            total_count_initialized: false,
273            partition_tag: partition_id.to_string(),
274        }
275    }
276}
277
278#[allow(dead_code)]
279impl EnvelopeBuffer<SqliteStackProvider> {
280    /// Creates an empty sqlite-based buffer.
281    pub async fn new(partition_id: u8, config: &Config) -> Result<Self, EnvelopeBufferError> {
282        Ok(Self {
283            stacks_by_project: Default::default(),
284            priority_queue: Default::default(),
285            stack_provider: SqliteStackProvider::new(partition_id, config).await?,
286            total_count: 0,
287            tracked_count: 0,
288            total_count_initialized: false,
289            partition_tag: partition_id.to_string(),
290        })
291    }
292}
293
294impl<P: StackProvider> EnvelopeBuffer<P>
295where
296    EnvelopeBufferError: From<<P::Stack as EnvelopeStack>::Error>,
297{
298    /// Initializes the [`EnvelopeBuffer`] given the initialization state from the
299    /// [`StackProvider`].
300    pub async fn initialize(&mut self) {
301        relay_statsd::metric!(
302            timer(RelayTimers::BufferInitialization),
303            partition_id = &self.partition_tag,
304            {
305                let initialization_state = self.stack_provider.initialize().await;
306                self.load_stacks(initialization_state.project_key_pairs)
307                    .await;
308                self.load_store_total_count().await;
309            }
310        );
311    }
312
313    /// Pushes an envelope to the appropriate envelope stack and re-prioritizes the stack.
314    ///
315    /// If the envelope stack does not exist, a new stack is pushed to the priority queue.
316    /// The priority of the stack is updated with the envelope's received_at time.
317    pub async fn push(&mut self, envelope: Box<Envelope>) -> Result<(), EnvelopeBufferError> {
318        let received_at = envelope.received_at();
319
320        let project_key_pair = ProjectKeyPair::from_envelope(&envelope);
321        if let Some((
322            QueueItem {
323                key: _,
324                value: stack,
325            },
326            _,
327        )) = self.priority_queue.get_mut(&project_key_pair)
328        {
329            stack.push(envelope).await?;
330        } else {
331            // Since we have initialization code that creates all the necessary stacks, we assume
332            // that any new stack that is added during the envelope buffer's lifecycle, is recreated.
333            self.push_stack(
334                StackCreationType::New,
335                ProjectKeyPair::from_envelope(&envelope),
336                Some(envelope),
337            )
338            .await?;
339        }
340        self.priority_queue
341            .change_priority_by(&project_key_pair, |prio| {
342                prio.received_at = received_at;
343            });
344
345        self.total_count += 1;
346        self.tracked_count += 1;
347        self.track_total_count();
348
349        Ok(())
350    }
351
352    /// Returns a reference to the next-in-line envelope, if one exists.
353    pub async fn peek(&mut self) -> Result<Peek, EnvelopeBufferError> {
354        let Some((
355            QueueItem {
356                key: project_key_pair,
357                value: stack,
358            },
359            Priority {
360                readiness,
361                next_project_fetch,
362                ..
363            },
364        )) = self.priority_queue.peek_mut()
365        else {
366            return Ok(Peek::Empty);
367        };
368
369        let ready = readiness.ready();
370
371        Ok(match (stack.peek().await?, ready) {
372            (None, _) => Peek::Empty,
373            (Some(last_received_at), true) => Peek::Ready {
374                project_key_pair: *project_key_pair,
375                last_received_at,
376            },
377            (Some(last_received_at), false) => Peek::NotReady {
378                project_key_pair: *project_key_pair,
379                next_project_fetch: *next_project_fetch,
380                last_received_at,
381            },
382        })
383    }
384
385    /// Returns the next-in-line envelope, if one exists.
386    ///
387    /// The priority of the envelope's stack is updated with the next envelope's received_at
388    /// time. If the stack is empty after popping, it is removed from the priority queue.
389    pub async fn pop(&mut self) -> Result<Option<Box<Envelope>>, EnvelopeBufferError> {
390        let Some((QueueItem { key, value: stack }, _)) = self.priority_queue.peek_mut() else {
391            return Ok(None);
392        };
393        let project_key_pair = *key;
394        let envelope = stack.pop().await?.expect("found an empty stack");
395
396        let last_received_at = stack.peek().await?;
397
398        match last_received_at {
399            None => {
400                self.pop_stack(project_key_pair);
401            }
402            Some(last_received_at) => {
403                self.priority_queue
404                    .change_priority_by(&project_key_pair, |prio| {
405                        prio.received_at = last_received_at;
406                    });
407            }
408        }
409
410        // We are fine with the count going negative, since it represents that more data was popped,
411        // than it was initially counted, meaning that we had a wrong total count from
412        // initialization.
413        self.total_count -= 1;
414        self.tracked_count = self.tracked_count.saturating_sub(1);
415        self.track_total_count();
416
417        Ok(Some(envelope))
418    }
419
420    /// Re-prioritizes all stacks that involve the given project key by setting it to "ready".
421    ///
422    /// Returns `true` if at least one priority was changed.
423    pub fn mark_ready(&mut self, project: &ProjectKey, is_ready: bool) -> bool {
424        let mut changed = false;
425        if let Some(project_key_pairs) = self.stacks_by_project.get(project) {
426            for project_key_pair in project_key_pairs {
427                self.priority_queue
428                    .change_priority_by(project_key_pair, |stack| {
429                        let mut found = false;
430                        for (subkey, readiness) in [
431                            (
432                                project_key_pair.own_key,
433                                &mut stack.readiness.own_project_ready,
434                            ),
435                            (
436                                project_key_pair.sampling_key,
437                                &mut stack.readiness.sampling_project_ready,
438                            ),
439                        ] {
440                            if subkey == *project {
441                                found = true;
442                                if *readiness != is_ready {
443                                    changed = true;
444                                    *readiness = is_ready;
445                                }
446                            }
447                        }
448                        debug_assert!(found);
449                    });
450            }
451        }
452
453        changed
454    }
455
456    /// Marks a stack as seen.
457    ///
458    /// Non-ready stacks are deprioritized when they are marked as seen, such that
459    /// the next call to `.peek()` will look at a different stack. This prevents
460    /// head-of-line blocking.
461    pub fn mark_seen(&mut self, project_key_pair: &ProjectKeyPair, next_fetch: Duration) {
462        self.priority_queue
463            .change_priority_by(project_key_pair, |stack| {
464                // We use the next project fetch to debounce project fetching and avoid head of
465                // line blocking of non-ready stacks.
466                stack.next_project_fetch = Instant::now() + next_fetch;
467            });
468    }
469
470    /// Returns `true` if the underlying storage has the capacity to store more envelopes.
471    pub fn has_capacity(&self) -> bool {
472        self.stack_provider.has_store_capacity()
473    }
474
475    /// Flushes the envelope buffer.
476    pub async fn flush(&mut self) {
477        let priority_queue = mem::take(&mut self.priority_queue);
478        self.stack_provider
479            .flush(priority_queue.into_iter().map(|(q, _)| q.value))
480            .await;
481    }
482
483    /// Pushes a new [`EnvelopeStack`] with the given [`Envelope`] inserted.
484    async fn push_stack(
485        &mut self,
486        stack_creation_type: StackCreationType,
487        project_key_pair: ProjectKeyPair,
488        envelope: Option<Box<Envelope>>,
489    ) -> Result<(), EnvelopeBufferError> {
490        let received_at = envelope.as_ref().map_or(Utc::now(), |e| e.received_at());
491
492        let mut stack = self
493            .stack_provider
494            .create_stack(stack_creation_type, project_key_pair);
495        if let Some(envelope) = envelope {
496            stack.push(envelope).await?;
497        }
498
499        let previous_entry = self.priority_queue.push(
500            QueueItem {
501                key: project_key_pair,
502                value: stack,
503            },
504            Priority::new(received_at),
505        );
506        debug_assert!(previous_entry.is_none());
507        for project_key in project_key_pair.iter() {
508            self.stacks_by_project
509                .entry(project_key)
510                .or_default()
511                .insert(project_key_pair);
512        }
513        relay_statsd::metric!(
514            gauge(RelayGauges::BufferStackCount) = self.priority_queue.len() as u64,
515            partition_id = &self.partition_tag
516        );
517
518        Ok(())
519    }
520
521    /// Pops an [`EnvelopeStack`] with the supplied [`EnvelopeBufferError`].
522    fn pop_stack(&mut self, project_key_pair: ProjectKeyPair) {
523        for project_key in project_key_pair.iter() {
524            self.stacks_by_project
525                .get_mut(&project_key)
526                .expect("project_key is missing from lookup")
527                .remove(&project_key_pair);
528        }
529        self.priority_queue.remove(&project_key_pair);
530
531        relay_statsd::metric!(
532            gauge(RelayGauges::BufferStackCount) = self.priority_queue.len() as u64,
533            partition_id = &self.partition_tag
534        );
535    }
536
537    /// Creates all the [`EnvelopeStack`]s with no data given a set of [`ProjectKeyPair`].
538    async fn load_stacks(&mut self, project_key_pairs: HashSet<ProjectKeyPair>) {
539        for project_key_pair in project_key_pairs {
540            self.push_stack(StackCreationType::Initialization, project_key_pair, None)
541                .await
542                .expect("Pushing an empty stack raised an error");
543        }
544    }
545
546    /// Loads the total count from the store if it takes less than a specified duration.
547    ///
548    /// The total count returned by the store is related to the count of elements that the buffer
549    /// will process, besides the count of elements that will be added and removed during its
550    /// lifecycle
551    async fn load_store_total_count(&mut self) {
552        let total_count = timeout(Duration::from_secs(1), async {
553            self.stack_provider.store_total_count().await
554        })
555        .await;
556        match total_count {
557            Ok(total_count) => {
558                self.total_count = total_count as i64;
559                self.total_count_initialized = true;
560            }
561            Err(error) => {
562                self.total_count_initialized = false;
563                relay_log::error!(
564                    error = &error as &dyn Error,
565                    "failed to load the total envelope count of the store",
566                );
567            }
568        };
569        self.track_total_count();
570    }
571
572    /// Emits a metric to track the total count of envelopes that are in the envelope buffer.
573    fn track_total_count(&self) {
574        let total_count = self.total_count as f64;
575        let initialized = match self.total_count_initialized {
576            true => "true",
577            false => "false",
578        };
579        relay_statsd::metric!(
580            gauge(RelayGauges::BufferEnvelopesCount) = total_count,
581            initialized = initialized,
582            stack_type = self.stack_provider.stack_type(),
583            partition_id = &self.partition_tag
584        );
585    }
586}
587
/// Contains the state of the first element in the buffer.
pub enum Peek {
    /// No envelope is available: either the priority queue is empty or the stack at its top
    /// has no envelope.
    Empty,
    /// The top stack has an envelope and all of its projects are ready.
    Ready {
        /// The key pair of the stack at the top of the queue.
        project_key_pair: ProjectKeyPair,
        /// The `received_at` time reported when peeking the top stack.
        last_received_at: DateTime<Utc>,
    },
    /// The top stack has an envelope, but at least one of its projects is not ready.
    NotReady {
        /// The key pair of the stack at the top of the queue.
        project_key_pair: ProjectKeyPair,
        /// The project fetch debounce deadline of the stack. It is set when the stack is
        /// created and moved forward by `mark_seen`.
        next_project_fetch: Instant,
        /// The `received_at` time reported when peeking the top stack.
        last_received_at: DateTime<Utc>,
    },
}
601
602impl Peek {
603    pub fn last_received_at(&self) -> Option<DateTime<Utc>> {
604        match self {
605            Self::Empty => None,
606            Self::Ready {
607                last_received_at, ..
608            }
609            | Self::NotReady {
610                last_received_at, ..
611            } => Some(*last_received_at),
612        }
613    }
614}
615
/// An entry of the priority queue: a `value` identified solely by its `key`.
///
/// [`Borrow`](std::borrow::Borrow) exposes the key, so the queue can be queried with a plain
/// `&K`. For such lookups to be correct, `Hash` and `Eq` must behave exactly like the key's,
/// which is why both ignore `value`.
#[derive(Debug)]
struct QueueItem<K, V> {
    /// The identity of the item.
    key: K,
    /// The payload. It does not take part in equality or hashing.
    value: V,
}

impl<K, V> std::borrow::Borrow<K> for QueueItem<K, V> {
    fn borrow(&self) -> &K {
        &self.key
    }
}

impl<K: std::hash::Hash, V> std::hash::Hash for QueueItem<K, V> {
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        // Must hash exactly like `K` so that lookups through `Borrow<K>` find the item.
        self.key.hash(state);
    }
}

impl<K: PartialEq, V> PartialEq for QueueItem<K, V> {
    fn eq(&self, other: &Self) -> bool {
        self.key == other.key
    }
}

// `Eq` promises a reflexive equality, which only holds if the key's equality is itself `Eq`.
impl<K: Eq, V> Eq for QueueItem<K, V> {}
641
642#[derive(Debug, Clone)]
643struct Priority {
644    readiness: Readiness,
645    received_at: DateTime<Utc>,
646    next_project_fetch: Instant,
647}
648
649impl Priority {
650    fn new(received_at: DateTime<Utc>) -> Self {
651        Self {
652            readiness: Readiness::new(),
653            received_at,
654            next_project_fetch: Instant::now(),
655        }
656    }
657}
658
659impl Ord for Priority {
660    fn cmp(&self, other: &Self) -> Ordering {
661        match (self.readiness.ready(), other.readiness.ready()) {
662            // Assuming that two priorities differ only w.r.t. the `last_peek`, we want to prioritize
663            // stacks that were the least recently peeked. The rationale behind this is that we want
664            // to keep cycling through different stacks while peeking.
665            (true, true) => self.received_at.cmp(&other.received_at),
666            (true, false) => Ordering::Greater,
667            (false, true) => Ordering::Less,
668            // For non-ready stacks, we invert the priority, such that projects that are not
669            // ready and did not receive envelopes recently can be evicted.
670            (false, false) => self
671                .next_project_fetch
672                .cmp(&other.next_project_fetch)
673                .reverse()
674                .then(self.received_at.cmp(&other.received_at).reverse()),
675        }
676    }
677}
678
679impl PartialOrd for Priority {
680    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
681        Some(self.cmp(other))
682    }
683}
684
685impl PartialEq for Priority {
686    fn eq(&self, other: &Self) -> bool {
687        self.cmp(other).is_eq()
688    }
689}
690
691impl Eq for Priority {}
692
/// Readiness of the two projects a stack depends on: its own project and its sampling project.
#[derive(Debug, Clone, Copy)]
struct Readiness {
    own_project_ready: bool,
    sampling_project_ready: bool,
}

impl Readiness {
    /// Creates a readiness in which both projects count as ready.
    ///
    /// Starting optimistic pays off because the large majority of stack creations are
    /// re-creations after a stack was emptied.
    fn new() -> Self {
        let optimistic = true;
        Self {
            own_project_ready: optimistic,
            sampling_project_ready: optimistic,
        }
    }

    /// Returns `true` only if both projects are ready.
    fn ready(&self) -> bool {
        let Self {
            own_project_ready,
            sampling_project_ready,
        } = *self;
        own_project_ready && sampling_project_ready
    }
}
713
714#[cfg(test)]
715mod tests {
716    use relay_base_schema::project::ProjectId;
717    use relay_common::Dsn;
718    use relay_event_schema::protocol::EventId;
719    use relay_sampling::DynamicSamplingContext;
720    use std::str::FromStr;
721    use std::sync::Arc;
722    use uuid::Uuid;
723
724    use crate::SqliteEnvelopeStore;
725    use crate::envelope::{Item, ItemType};
726    use crate::extractors::RequestMeta;
727    use crate::services::buffer::common::ProjectKeyPair;
728    use crate::services::buffer::envelope_store::sqlite::DatabaseEnvelope;
729    use crate::services::buffer::testutils::utils::mock_envelopes;
730    use crate::utils::MemoryStat;
731
732    use super::*;
733
734    impl Peek {
735        fn is_empty(&self) -> bool {
736            matches!(self, Peek::Empty)
737        }
738    }
739
740    fn new_envelope(
741        own_key: ProjectKey,
742        sampling_key: Option<ProjectKey>,
743        event_id: Option<EventId>,
744    ) -> Box<Envelope> {
745        let mut envelope = Envelope::from_request(
746            None,
747            RequestMeta::new(Dsn::from_str(&format!("http://{own_key}@localhost/1")).unwrap()),
748        );
749        if let Some(sampling_key) = sampling_key {
750            envelope.set_dsc(DynamicSamplingContext {
751                public_key: sampling_key,
752                project_id: Some(ProjectId::new(42)),
753                trace_id: "67e5504410b1426f9247bb680e5fe0c8".parse().unwrap(),
754                release: None,
755                user: Default::default(),
756                replay_id: None,
757                environment: None,
758                transaction: None,
759                sample_rate: None,
760                sampled: None,
761                other: Default::default(),
762            });
763            envelope.add_item(Item::new(ItemType::Transaction));
764        }
765        if let Some(event_id) = event_id {
766            envelope.set_event_id(event_id);
767        }
768        envelope
769    }
770
771    fn mock_config(path: &str) -> Arc<Config> {
772        Config::from_json_value(serde_json::json!({
773            "spool": {
774                "envelopes": {
775                    "path": path
776                }
777            }
778        }))
779        .unwrap()
780        .into()
781    }
782
783    fn mock_memory_checker() -> MemoryChecker {
784        MemoryChecker::new(MemoryStat::default(), mock_config("my/db/path").clone())
785    }
786
787    async fn peek_received_at(buffer: &mut EnvelopeBuffer<MemoryStackProvider>) -> DateTime<Utc> {
788        buffer.peek().await.unwrap().last_received_at().unwrap()
789    }
790
791    #[tokio::test]
792    async fn test_insert_pop() {
793        let mut buffer = EnvelopeBuffer::<MemoryStackProvider>::new(0, mock_memory_checker());
794
795        let project_key1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap();
796        let project_key2 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
797        let project_key3 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fef").unwrap();
798
799        assert!(buffer.pop().await.unwrap().is_none());
800        assert!(buffer.peek().await.unwrap().is_empty());
801
802        let envelope1 = new_envelope(project_key1, None, None);
803        let time1 = envelope1.meta().received_at();
804        buffer.push(envelope1).await.unwrap();
805
806        let envelope2 = new_envelope(project_key2, None, None);
807        let time2 = envelope2.meta().received_at();
808        buffer.push(envelope2).await.unwrap();
809
810        // Both projects are ready, so project 2 is on top (has the newest envelopes):
811        assert_eq!(peek_received_at(&mut buffer).await, time2);
812
813        buffer.mark_ready(&project_key1, false);
814        buffer.mark_ready(&project_key2, false);
815
816        // Both projects are not ready, so project 1 is on top (has the oldest envelopes):
817        assert_eq!(peek_received_at(&mut buffer).await, time1);
818
819        let envelope3 = new_envelope(project_key3, None, None);
820        let time3 = envelope3.meta().received_at();
821        buffer.push(envelope3).await.unwrap();
822        buffer.mark_ready(&project_key3, false);
823
824        // All projects are not ready, so project 1 is on top (has the oldest envelopes):
825        assert_eq!(peek_received_at(&mut buffer).await, time1);
826
827        // After marking a project ready, it goes to the top:
828        buffer.mark_ready(&project_key3, true);
829        assert_eq!(peek_received_at(&mut buffer).await, time3);
830        assert_eq!(
831            buffer.pop().await.unwrap().unwrap().meta().public_key(),
832            project_key3
833        );
834
835        // After popping, project 1 is on top again:
836        assert_eq!(peek_received_at(&mut buffer).await, time1);
837
838        // Mark project 1 as ready (still on top):
839        buffer.mark_ready(&project_key1, true);
840        assert_eq!(peek_received_at(&mut buffer).await, time1);
841
842        // Mark project 2 as ready as well (now on top because most recent):
843        buffer.mark_ready(&project_key2, true);
844        assert_eq!(peek_received_at(&mut buffer).await, time2);
845        assert_eq!(
846            buffer.pop().await.unwrap().unwrap().meta().public_key(),
847            project_key2
848        );
849
850        // Pop last element:
851        assert_eq!(
852            buffer.pop().await.unwrap().unwrap().meta().public_key(),
853            project_key1
854        );
855        assert!(buffer.pop().await.unwrap().is_none());
856        assert!(buffer.peek().await.unwrap().is_empty());
857    }
858
859    #[tokio::test]
860    async fn test_project_internal_order() {
861        let mut buffer = EnvelopeBuffer::<MemoryStackProvider>::new(0, mock_memory_checker());
862
863        let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap();
864
865        let envelope1 = new_envelope(project_key, None, None);
866        let time1 = envelope1.meta().received_at();
867        let envelope2 = new_envelope(project_key, None, None);
868        let time2 = envelope2.meta().received_at();
869
870        assert!(time2 > time1);
871
872        buffer.push(envelope1).await.unwrap();
873        buffer.push(envelope2).await.unwrap();
874
875        assert_eq!(
876            buffer.pop().await.unwrap().unwrap().meta().received_at(),
877            time2
878        );
879        assert_eq!(
880            buffer.pop().await.unwrap().unwrap().meta().received_at(),
881            time1
882        );
883        assert!(buffer.pop().await.unwrap().is_none());
884    }
885
886    #[tokio::test]
887    async fn test_sampling_projects() {
888        let mut buffer = EnvelopeBuffer::<MemoryStackProvider>::new(0, mock_memory_checker());
889
890        let project_key1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap();
891        let project_key2 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fef").unwrap();
892
893        let envelope1 = new_envelope(project_key1, None, None);
894        let time1 = envelope1.received_at();
895        buffer.push(envelope1).await.unwrap();
896
897        let envelope2 = new_envelope(project_key2, None, None);
898        let time2 = envelope2.received_at();
899        buffer.push(envelope2).await.unwrap();
900
901        let envelope3 = new_envelope(project_key1, Some(project_key2), None);
902        let time3 = envelope3.meta().received_at();
903        buffer.push(envelope3).await.unwrap();
904
905        buffer.mark_ready(&project_key1, false);
906        buffer.mark_ready(&project_key2, false);
907
908        // Nothing is ready, instant1 is on top:
909        assert_eq!(
910            buffer.peek().await.unwrap().last_received_at().unwrap(),
911            time1
912        );
913
914        // Mark project 2 ready, gets on top:
915        buffer.mark_ready(&project_key2, true);
916        assert_eq!(
917            buffer.peek().await.unwrap().last_received_at().unwrap(),
918            time2
919        );
920
921        // Revert
922        buffer.mark_ready(&project_key2, false);
923        assert_eq!(
924            buffer.peek().await.unwrap().last_received_at().unwrap(),
925            time1
926        );
927
928        // Project 1 ready:
929        buffer.mark_ready(&project_key1, true);
930        assert_eq!(
931            buffer.peek().await.unwrap().last_received_at().unwrap(),
932            time1
933        );
934
935        // when both projects are ready, event no 3 ends up on top:
936        buffer.mark_ready(&project_key2, true);
937        assert_eq!(
938            buffer.pop().await.unwrap().unwrap().meta().received_at(),
939            time3
940        );
941        assert_eq!(
942            buffer.peek().await.unwrap().last_received_at().unwrap(),
943            time2
944        );
945
946        buffer.mark_ready(&project_key2, false);
947        assert_eq!(buffer.pop().await.unwrap().unwrap().received_at(), time1);
948        assert_eq!(buffer.pop().await.unwrap().unwrap().received_at(), time2);
949
950        assert!(buffer.pop().await.unwrap().is_none());
951    }
952
953    #[tokio::test]
954    async fn test_project_keys_distinct() {
955        let project_key1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap();
956        let project_key2 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fef").unwrap();
957
958        let project_key_pair1 = ProjectKeyPair::new(project_key1, project_key2);
959        let project_key_pair2 = ProjectKeyPair::new(project_key2, project_key1);
960
961        assert_ne!(project_key_pair1, project_key_pair2);
962
963        let mut buffer = EnvelopeBuffer::<MemoryStackProvider>::new(0, mock_memory_checker());
964        buffer
965            .push(new_envelope(project_key1, Some(project_key2), None))
966            .await
967            .unwrap();
968        buffer
969            .push(new_envelope(project_key2, Some(project_key1), None))
970            .await
971            .unwrap();
972        assert_eq!(buffer.priority_queue.len(), 2);
973    }
974
975    #[test]
976    fn test_total_order() {
977        let p1 = Priority {
978            readiness: Readiness {
979                own_project_ready: true,
980                sampling_project_ready: true,
981            },
982            received_at: Utc::now(),
983            next_project_fetch: Instant::now(),
984        };
985        let mut p2 = p1.clone();
986        p2.next_project_fetch += Duration::from_millis(1);
987
988        // Last peek does not matter because project is ready:
989        assert_eq!(p1.cmp(&p2), Ordering::Equal);
990        assert_eq!(p1, p2);
991    }
992
993    #[tokio::test]
994    async fn test_last_peek_internal_order() {
995        let mut buffer = EnvelopeBuffer::<MemoryStackProvider>::new(0, mock_memory_checker());
996
997        let project_key_1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap();
998        let event_id_1 = EventId::new();
999        let envelope1 = new_envelope(project_key_1, None, Some(event_id_1));
1000        let time1 = envelope1.received_at();
1001
1002        let project_key_2 = ProjectKey::parse("b56ae32be2584e0bbd7a4cbb95971fed").unwrap();
1003        let event_id_2 = EventId::new();
1004        let envelope2 = new_envelope(project_key_2, None, Some(event_id_2));
1005        let time2 = envelope2.received_at();
1006
1007        buffer.push(envelope1).await.unwrap();
1008        buffer.push(envelope2).await.unwrap();
1009
1010        buffer.mark_ready(&project_key_1, false);
1011        buffer.mark_ready(&project_key_2, false);
1012
1013        // event_id_1 is first element:
1014        let Peek::NotReady {
1015            last_received_at, ..
1016        } = buffer.peek().await.unwrap()
1017        else {
1018            panic!();
1019        };
1020        assert_eq!(last_received_at, time1);
1021
1022        // Second peek returns same element:
1023        let Peek::NotReady {
1024            last_received_at,
1025            project_key_pair,
1026            ..
1027        } = buffer.peek().await.unwrap()
1028        else {
1029            panic!();
1030        };
1031        assert_eq!(last_received_at, time1);
1032        assert_ne!(last_received_at, time2);
1033
1034        buffer.mark_seen(&project_key_pair, Duration::ZERO);
1035
1036        // After mark_seen, event 2 is on top:
1037        let Peek::NotReady {
1038            last_received_at, ..
1039        } = buffer.peek().await.unwrap()
1040        else {
1041            panic!();
1042        };
1043        assert_eq!(last_received_at, time2);
1044        assert_ne!(last_received_at, time1);
1045
1046        let Peek::NotReady {
1047            last_received_at,
1048            project_key_pair,
1049            ..
1050        } = buffer.peek().await.unwrap()
1051        else {
1052            panic!();
1053        };
1054        assert_eq!(last_received_at, time2);
1055        assert_ne!(last_received_at, time1);
1056
1057        buffer.mark_seen(&project_key_pair, Duration::ZERO);
1058
1059        // After another mark_seen, cycle back to event 1:
1060        let Peek::NotReady {
1061            last_received_at, ..
1062        } = buffer.peek().await.unwrap()
1063        else {
1064            panic!();
1065        };
1066        assert_eq!(last_received_at, time1);
1067        assert_ne!(last_received_at, time2);
1068    }
1069
1070    #[tokio::test]
1071    async fn test_initialize_buffer() {
1072        let path = std::env::temp_dir()
1073            .join(Uuid::new_v4().to_string())
1074            .into_os_string()
1075            .into_string()
1076            .unwrap();
1077        let config = mock_config(&path);
1078        let mut store = SqliteEnvelopeStore::prepare(0, &config).await.unwrap();
1079        let mut buffer = EnvelopeBuffer::<SqliteStackProvider>::new(0, &config)
1080            .await
1081            .unwrap();
1082
1083        // We write 5 envelopes to disk so that we can check if they are loaded. These envelopes
1084        // belong to the same project keys, so they belong to the same envelope stack.
1085        let envelopes = mock_envelopes(10);
1086        assert!(
1087            store
1088                .insert_batch(
1089                    envelopes
1090                        .into_iter()
1091                        .map(|e| DatabaseEnvelope::try_from(e.as_ref()).unwrap())
1092                        .collect::<Vec<_>>()
1093                        .try_into()
1094                        .unwrap()
1095                )
1096                .await
1097                .is_ok()
1098        );
1099
1100        // We assume that the buffer is empty.
1101        assert!(buffer.priority_queue.is_empty());
1102        assert!(buffer.stacks_by_project.is_empty());
1103
1104        buffer.initialize().await;
1105
1106        // We assume that we loaded only 1 envelope stack, because of the project keys combinations
1107        // of the envelopes we inserted above.
1108        assert_eq!(buffer.priority_queue.len(), 1);
1109        // We expect to have an entry per project key, since we have 1 pair, the total entries
1110        // should be 2.
1111        assert_eq!(buffer.stacks_by_project.len(), 2);
1112    }
1113}