relay_server/services/buffer/envelope_buffer/
mod.rs

1use std::cmp::Ordering;
2use std::collections::BTreeSet;
3use std::convert::Infallible;
4use std::error::Error;
5use std::mem;
6use std::time::Duration;
7
8use chrono::{DateTime, Utc};
9use hashbrown::HashSet;
10use relay_base_schema::project::ProjectKey;
11use relay_config::Config;
12use tokio::time::{Instant, timeout};
13
14use crate::envelope::Envelope;
15use crate::envelope::Item;
16use crate::services::buffer::common::ProjectKeyPair;
17use crate::services::buffer::envelope_stack::EnvelopeStack;
18use crate::services::buffer::envelope_stack::sqlite::SqliteEnvelopeStackError;
19use crate::services::buffer::envelope_store::sqlite::SqliteEnvelopeStoreError;
20use crate::services::buffer::stack_provider::memory::MemoryStackProvider;
21use crate::services::buffer::stack_provider::sqlite::SqliteStackProvider;
22use crate::services::buffer::stack_provider::{StackCreationType, StackProvider};
23use crate::statsd::{RelayDistributions, RelayGauges, RelayTimers};
24use crate::utils::MemoryChecker;
25
26/// Polymorphic envelope buffering interface.
27///
28/// The underlying buffer can either be disk-based or memory-based,
29/// depending on the given configuration.
30///
31/// NOTE: This is implemented as an enum because a trait object with async methods would not be
32/// object safe.
33#[derive(Debug)]
34#[allow(private_interfaces)]
35pub enum PolymorphicEnvelopeBuffer {
36    /// An enveloper buffer that uses in-memory envelopes stacks.
37    InMemory(EnvelopeBuffer<MemoryStackProvider>),
38    /// An enveloper buffer that uses sqlite envelopes stacks.
39    Sqlite(EnvelopeBuffer<SqliteStackProvider>),
40}
41
42impl PolymorphicEnvelopeBuffer {
43    /// Returns true if the implementation stores all envelopes in RAM.
44    pub fn is_memory(&self) -> bool {
45        match self {
46            PolymorphicEnvelopeBuffer::InMemory(_) => true,
47            PolymorphicEnvelopeBuffer::Sqlite(_) => false,
48        }
49    }
50
51    /// Creates either a memory-based or a disk-based envelope buffer,
52    /// depending on the given configuration.
53    pub async fn from_config(
54        partition_id: u8,
55        config: &Config,
56        memory_checker: MemoryChecker,
57    ) -> Result<Self, EnvelopeBufferError> {
58        let buffer = if config.spool_envelopes_path(partition_id).is_some() {
59            relay_log::trace!("PolymorphicEnvelopeBuffer: initializing sqlite envelope buffer");
60            let buffer = EnvelopeBuffer::<SqliteStackProvider>::new(partition_id, config).await?;
61            Self::Sqlite(buffer)
62        } else {
63            relay_log::trace!("PolymorphicEnvelopeBuffer: initializing memory envelope buffer");
64            let buffer = EnvelopeBuffer::<MemoryStackProvider>::new(partition_id, memory_checker);
65            Self::InMemory(buffer)
66        };
67
68        Ok(buffer)
69    }
70
71    /// Initializes the envelope buffer.
72    pub async fn initialize(&mut self) {
73        match self {
74            PolymorphicEnvelopeBuffer::InMemory(buffer) => buffer.initialize().await,
75            PolymorphicEnvelopeBuffer::Sqlite(buffer) => buffer.initialize().await,
76        }
77    }
78
79    /// Adds an envelope to the buffer.
80    pub async fn push(&mut self, envelope: Box<Envelope>) -> Result<(), EnvelopeBufferError> {
81        relay_statsd::metric!(
82            distribution(RelayDistributions::BufferEnvelopeBodySize, sample = 0.01) =
83                envelope.items().map(Item::len).sum::<usize>() as u64,
84            partition_id = self.partition_tag()
85        );
86
87        relay_statsd::metric!(
88            timer(RelayTimers::BufferPush, sample = 0.01),
89            partition_id = self.partition_tag(),
90            {
91                match self {
92                    Self::Sqlite(buffer) => buffer.push(envelope).await,
93                    Self::InMemory(buffer) => buffer.push(envelope).await,
94                }
95            }
96        )
97    }
98
99    /// Returns a reference to the next-in-line envelope.
100    pub async fn peek(&mut self) -> Result<Peek, EnvelopeBufferError> {
101        relay_statsd::metric!(
102            timer(RelayTimers::BufferPeek, sample = 0.01),
103            partition_id = self.partition_tag(),
104            {
105                match self {
106                    Self::Sqlite(buffer) => buffer.peek().await,
107                    Self::InMemory(buffer) => buffer.peek().await,
108                }
109            }
110        )
111    }
112
113    /// Pops the next-in-line envelope.
114    pub async fn pop(&mut self) -> Result<Option<Box<Envelope>>, EnvelopeBufferError> {
115        relay_statsd::metric!(
116            timer(RelayTimers::BufferPop, sample = 0.01),
117            partition_id = self.partition_tag(),
118            {
119                match self {
120                    Self::Sqlite(buffer) => buffer.pop().await,
121                    Self::InMemory(buffer) => buffer.pop().await,
122                }
123            }
124        )
125    }
126
127    /// Marks a project as ready or not ready.
128    ///
129    /// The buffer re-prioritizes its envelopes based on this information.
130    /// Returns `true` if at least one priority was changed.
131    pub fn mark_ready(&mut self, project: &ProjectKey, is_ready: bool) -> bool {
132        relay_log::trace!(
133            project_key = project.as_str(),
134            "buffer marked {}",
135            if is_ready { "ready" } else { "not ready" }
136        );
137        match self {
138            Self::Sqlite(buffer) => buffer.mark_ready(project, is_ready),
139            Self::InMemory(buffer) => buffer.mark_ready(project, is_ready),
140        }
141    }
142
143    /// Marks a stack as seen.
144    ///
145    /// Non-ready stacks are deprioritized when they are marked as seen, such that
146    /// the next call to `.peek()` will look at a different stack. This prevents
147    /// head-of-line blocking.
148    pub fn mark_seen(&mut self, project_key_pair: &ProjectKeyPair, next_fetch: Duration) {
149        match self {
150            Self::Sqlite(buffer) => buffer.mark_seen(project_key_pair, next_fetch),
151            Self::InMemory(buffer) => buffer.mark_seen(project_key_pair, next_fetch),
152        }
153    }
154
155    /// Returns `true` whether the buffer has capacity to accept new [`Envelope`]s.
156    pub fn has_capacity(&self) -> bool {
157        match self {
158            Self::Sqlite(buffer) => buffer.has_capacity(),
159            Self::InMemory(buffer) => buffer.has_capacity(),
160        }
161    }
162
163    /// Returns the total number of envelopes that have been spooled since the startup. It does
164    /// not include the count that existed in a persistent spooler before.
165    pub fn item_count(&self) -> u64 {
166        match self {
167            Self::Sqlite(buffer) => buffer.tracked_count,
168            Self::InMemory(buffer) => buffer.tracked_count,
169        }
170    }
171
172    /// Returns the total number of bytes that the spooler storage uses or `None` if the number
173    /// cannot be reliably determined.
174    pub fn total_size(&self) -> Option<u64> {
175        match self {
176            Self::Sqlite(buffer) => buffer.stack_provider.total_size(),
177            Self::InMemory(buffer) => buffer.stack_provider.total_size(),
178        }
179    }
180
181    /// Shuts down the [`PolymorphicEnvelopeBuffer`].
182    pub async fn shutdown(&mut self) -> bool {
183        // Currently, we want to flush the buffer only for disk, since the in memory implementation
184        // tries to not do anything and pop as many elements as possible within the shutdown
185        // timeout.
186        let Self::Sqlite(buffer) = self else {
187            relay_log::trace!("PolymorphicEnvelopeBuffer: shutdown procedure not needed");
188            return false;
189        };
190        buffer.flush().await;
191
192        true
193    }
194
195    /// Returns the partition tag for this [`PolymorphicEnvelopeBuffer`].
196    fn partition_tag(&self) -> &str {
197        match self {
198            PolymorphicEnvelopeBuffer::InMemory(buffer) => &buffer.partition_tag,
199            PolymorphicEnvelopeBuffer::Sqlite(buffer) => &buffer.partition_tag,
200        }
201    }
202}
203
204/// Error that occurs while interacting with the envelope buffer.
205#[derive(Debug, thiserror::Error)]
206pub enum EnvelopeBufferError {
207    #[error("sqlite")]
208    SqliteStore(#[from] SqliteEnvelopeStoreError),
209
210    #[error("sqlite")]
211    SqliteStack(#[from] SqliteEnvelopeStackError),
212
213    #[error("failed to push envelope to the buffer")]
214    PushFailed,
215}
216
217impl From<Infallible> for EnvelopeBufferError {
218    fn from(value: Infallible) -> Self {
219        match value {}
220    }
221}
222
223/// An envelope buffer that holds an individual stack for each project/sampling project combination.
224///
225/// Envelope stacks are organized in a priority queue, and are re-prioritized every time an envelope
226/// is pushed, popped, or when a project becomes ready.
227#[derive(Debug)]
228struct EnvelopeBuffer<P: StackProvider> {
229    /// The central priority queue.
230    priority_queue: priority_queue::PriorityQueue<QueueItem<ProjectKeyPair, P::Stack>, Priority>,
231    /// A lookup table to find all stacks involving a project.
232    stacks_by_project: hashbrown::HashMap<ProjectKey, BTreeSet<ProjectKeyPair>>,
233    /// A provider of stacks that provides utilities to create stacks, check their capacity...
234    ///
235    /// This indirection is needed because different stack implementations might need different
236    /// initialization (e.g. a database connection).
237    stack_provider: P,
238    /// The total count of envelopes that the buffer is working with.
239    ///
240    /// Note that this count is not meant to be perfectly accurate since the initialization of the
241    /// count might not succeed if it takes more than a set timeout. For example, if we load the
242    /// count of all envelopes from disk, and it takes more than the time we set, we will mark the
243    /// initial count as 0 and just count incoming and outgoing envelopes from the buffer.
244    total_count: i64,
245    /// The total count of envelopes that the buffer is working with ignoring envelopes that
246    /// were previously stored on disk.
247    ///
248    /// On startup this will always be 0 and will only count incoming envelopes. If a reliable
249    /// count of currently buffered envelopes is required, prefer this over `total_count`
250    tracked_count: u64,
251    /// Whether the count initialization succeeded or not.
252    ///
253    /// This boolean is just used for tagging the metric that tracks the total count of envelopes
254    /// in the buffer.
255    total_count_initialized: bool,
256    /// The tag value of this partition which is used for reporting purposes.
257    partition_tag: String,
258}
259
260impl EnvelopeBuffer<MemoryStackProvider> {
261    /// Creates an empty memory-based buffer.
262    pub fn new(partition_id: u8, memory_checker: MemoryChecker) -> Self {
263        Self {
264            stacks_by_project: Default::default(),
265            priority_queue: Default::default(),
266            stack_provider: MemoryStackProvider::new(memory_checker),
267            total_count: 0,
268            tracked_count: 0,
269            total_count_initialized: false,
270            partition_tag: partition_id.to_string(),
271        }
272    }
273}
274
275#[allow(dead_code)]
276impl EnvelopeBuffer<SqliteStackProvider> {
277    /// Creates an empty sqlite-based buffer.
278    pub async fn new(partition_id: u8, config: &Config) -> Result<Self, EnvelopeBufferError> {
279        Ok(Self {
280            stacks_by_project: Default::default(),
281            priority_queue: Default::default(),
282            stack_provider: SqliteStackProvider::new(partition_id, config).await?,
283            total_count: 0,
284            tracked_count: 0,
285            total_count_initialized: false,
286            partition_tag: partition_id.to_string(),
287        })
288    }
289}
290
291impl<P: StackProvider> EnvelopeBuffer<P>
292where
293    EnvelopeBufferError: From<<P::Stack as EnvelopeStack>::Error>,
294{
295    /// Initializes the [`EnvelopeBuffer`] given the initialization state from the
296    /// [`StackProvider`].
297    pub async fn initialize(&mut self) {
298        relay_statsd::metric!(
299            timer(RelayTimers::BufferInitialization),
300            partition_id = &self.partition_tag,
301            {
302                let initialization_state = self.stack_provider.initialize().await;
303                self.load_stacks(initialization_state.project_key_pairs)
304                    .await;
305                self.load_store_total_count().await;
306            }
307        );
308    }
309
310    /// Pushes an envelope to the appropriate envelope stack and re-prioritizes the stack.
311    ///
312    /// If the envelope stack does not exist, a new stack is pushed to the priority queue.
313    /// The priority of the stack is updated with the envelope's received_at time.
314    pub async fn push(&mut self, envelope: Box<Envelope>) -> Result<(), EnvelopeBufferError> {
315        let received_at = envelope.received_at();
316
317        let project_key_pair = ProjectKeyPair::from_envelope(&envelope);
318        if let Some((
319            QueueItem {
320                key: _,
321                value: stack,
322            },
323            _,
324        )) = self.priority_queue.get_mut(&project_key_pair)
325        {
326            stack.push(envelope).await?;
327        } else {
328            // Since we have initialization code that creates all the necessary stacks, we assume
329            // that any new stack that is added during the envelope buffer's lifecycle, is recreated.
330            self.push_stack(
331                StackCreationType::New,
332                ProjectKeyPair::from_envelope(&envelope),
333                Some(envelope),
334            )
335            .await?;
336        }
337        self.priority_queue
338            .change_priority_by(&project_key_pair, |prio| {
339                prio.received_at = received_at;
340            });
341
342        self.total_count += 1;
343        self.tracked_count += 1;
344        self.track_total_count();
345
346        Ok(())
347    }
348
349    /// Returns a reference to the next-in-line envelope, if one exists.
350    pub async fn peek(&mut self) -> Result<Peek, EnvelopeBufferError> {
351        let Some((
352            QueueItem {
353                key: project_key_pair,
354                value: stack,
355            },
356            Priority {
357                readiness,
358                next_project_fetch,
359                ..
360            },
361        )) = self.priority_queue.peek_mut()
362        else {
363            return Ok(Peek::Empty);
364        };
365
366        let ready = readiness.ready();
367
368        Ok(match (stack.peek().await?, ready) {
369            (None, _) => Peek::Empty,
370            (Some(last_received_at), true) => Peek::Ready {
371                project_key_pair: *project_key_pair,
372                last_received_at,
373            },
374            (Some(last_received_at), false) => Peek::NotReady {
375                project_key_pair: *project_key_pair,
376                next_project_fetch: *next_project_fetch,
377                last_received_at,
378            },
379        })
380    }
381
382    /// Returns the next-in-line envelope, if one exists.
383    ///
384    /// The priority of the envelope's stack is updated with the next envelope's received_at
385    /// time. If the stack is empty after popping, it is removed from the priority queue.
386    pub async fn pop(&mut self) -> Result<Option<Box<Envelope>>, EnvelopeBufferError> {
387        let Some((QueueItem { key, value: stack }, _)) = self.priority_queue.peek_mut() else {
388            return Ok(None);
389        };
390        let project_key_pair = *key;
391        let envelope = stack.pop().await?.expect("found an empty stack");
392
393        let last_received_at = stack.peek().await?;
394
395        match last_received_at {
396            None => {
397                self.pop_stack(project_key_pair);
398            }
399            Some(last_received_at) => {
400                self.priority_queue
401                    .change_priority_by(&project_key_pair, |prio| {
402                        prio.received_at = last_received_at;
403                    });
404            }
405        }
406
407        // We are fine with the count going negative, since it represents that more data was popped,
408        // than it was initially counted, meaning that we had a wrong total count from
409        // initialization.
410        self.total_count -= 1;
411        self.tracked_count = self.tracked_count.saturating_sub(1);
412        self.track_total_count();
413
414        Ok(Some(envelope))
415    }
416
417    /// Re-prioritizes all stacks that involve the given project key by setting it to "ready".
418    ///
419    /// Returns `true` if at least one priority was changed.
420    pub fn mark_ready(&mut self, project: &ProjectKey, is_ready: bool) -> bool {
421        let mut changed = false;
422        if let Some(project_key_pairs) = self.stacks_by_project.get(project) {
423            for project_key_pair in project_key_pairs {
424                self.priority_queue
425                    .change_priority_by(project_key_pair, |stack| {
426                        let mut found = false;
427                        for (subkey, readiness) in [
428                            (
429                                project_key_pair.own_key,
430                                &mut stack.readiness.own_project_ready,
431                            ),
432                            (
433                                project_key_pair.sampling_key,
434                                &mut stack.readiness.sampling_project_ready,
435                            ),
436                        ] {
437                            if subkey == *project {
438                                found = true;
439                                if *readiness != is_ready {
440                                    changed = true;
441                                    *readiness = is_ready;
442                                }
443                            }
444                        }
445                        debug_assert!(found);
446                    });
447            }
448        }
449
450        changed
451    }
452
453    /// Marks a stack as seen.
454    ///
455    /// Non-ready stacks are deprioritized when they are marked as seen, such that
456    /// the next call to `.peek()` will look at a different stack. This prevents
457    /// head-of-line blocking.
458    pub fn mark_seen(&mut self, project_key_pair: &ProjectKeyPair, next_fetch: Duration) {
459        self.priority_queue
460            .change_priority_by(project_key_pair, |stack| {
461                // We use the next project fetch to debounce project fetching and avoid head of
462                // line blocking of non-ready stacks.
463                stack.next_project_fetch = Instant::now() + next_fetch;
464            });
465    }
466
467    /// Returns `true` if the underlying storage has the capacity to store more envelopes.
468    pub fn has_capacity(&self) -> bool {
469        self.stack_provider.has_store_capacity()
470    }
471
472    /// Flushes the envelope buffer.
473    pub async fn flush(&mut self) {
474        let priority_queue = mem::take(&mut self.priority_queue);
475        self.stack_provider
476            .flush(priority_queue.into_iter().map(|(q, _)| q.value))
477            .await;
478    }
479
480    /// Pushes a new [`EnvelopeStack`] with the given [`Envelope`] inserted.
481    async fn push_stack(
482        &mut self,
483        stack_creation_type: StackCreationType,
484        project_key_pair: ProjectKeyPair,
485        envelope: Option<Box<Envelope>>,
486    ) -> Result<(), EnvelopeBufferError> {
487        let received_at = envelope.as_ref().map_or(Utc::now(), |e| e.received_at());
488
489        let mut stack = self
490            .stack_provider
491            .create_stack(stack_creation_type, project_key_pair);
492        if let Some(envelope) = envelope {
493            stack.push(envelope).await?;
494        }
495
496        let previous_entry = self.priority_queue.push(
497            QueueItem {
498                key: project_key_pair,
499                value: stack,
500            },
501            Priority::new(received_at),
502        );
503        debug_assert!(previous_entry.is_none());
504        for project_key in project_key_pair.iter() {
505            self.stacks_by_project
506                .entry(project_key)
507                .or_default()
508                .insert(project_key_pair);
509        }
510        relay_statsd::metric!(
511            gauge(RelayGauges::BufferStackCount) = self.priority_queue.len() as u64,
512            partition_id = &self.partition_tag
513        );
514
515        Ok(())
516    }
517
518    /// Pops an [`EnvelopeStack`] with the supplied [`EnvelopeBufferError`].
519    fn pop_stack(&mut self, project_key_pair: ProjectKeyPair) {
520        for project_key in project_key_pair.iter() {
521            self.stacks_by_project
522                .get_mut(&project_key)
523                .expect("project_key is missing from lookup")
524                .remove(&project_key_pair);
525        }
526        self.priority_queue.remove(&project_key_pair);
527
528        relay_statsd::metric!(
529            gauge(RelayGauges::BufferStackCount) = self.priority_queue.len() as u64,
530            partition_id = &self.partition_tag
531        );
532    }
533
534    /// Creates all the [`EnvelopeStack`]s with no data given a set of [`ProjectKeyPair`].
535    async fn load_stacks(&mut self, project_key_pairs: HashSet<ProjectKeyPair>) {
536        for project_key_pair in project_key_pairs {
537            self.push_stack(StackCreationType::Initialization, project_key_pair, None)
538                .await
539                .expect("Pushing an empty stack raised an error");
540        }
541    }
542
543    /// Loads the total count from the store if it takes less than a specified duration.
544    ///
545    /// The total count returned by the store is related to the count of elements that the buffer
546    /// will process, besides the count of elements that will be added and removed during its
547    /// lifecycle
548    async fn load_store_total_count(&mut self) {
549        let total_count = timeout(Duration::from_secs(1), async {
550            self.stack_provider.store_total_count().await
551        })
552        .await;
553        match total_count {
554            Ok(total_count) => {
555                self.total_count = total_count as i64;
556                self.total_count_initialized = true;
557            }
558            Err(error) => {
559                self.total_count_initialized = false;
560                relay_log::error!(
561                    error = &error as &dyn Error,
562                    "failed to load the total envelope count of the store",
563                );
564            }
565        };
566        self.track_total_count();
567    }
568
569    /// Emits a metric to track the total count of envelopes that are in the envelope buffer.
570    fn track_total_count(&self) {
571        let total_count = self.total_count as f64;
572        let initialized = match self.total_count_initialized {
573            true => "true",
574            false => "false",
575        };
576        relay_statsd::metric!(
577            gauge(RelayGauges::BufferEnvelopesCount) = total_count,
578            initialized = initialized,
579            stack_type = self.stack_provider.stack_type(),
580            partition_id = &self.partition_tag
581        );
582    }
583}
584
585/// Contains the state of the first element in the buffer.
586pub enum Peek {
587    Empty,
588    Ready {
589        project_key_pair: ProjectKeyPair,
590        last_received_at: DateTime<Utc>,
591    },
592    NotReady {
593        project_key_pair: ProjectKeyPair,
594        next_project_fetch: Instant,
595        last_received_at: DateTime<Utc>,
596    },
597}
598
599impl Peek {
600    pub fn last_received_at(&self) -> Option<DateTime<Utc>> {
601        match self {
602            Self::Empty => None,
603            Self::Ready {
604                last_received_at, ..
605            }
606            | Self::NotReady {
607                last_received_at, ..
608            } => Some(*last_received_at),
609        }
610    }
611}
612
/// Key/value pair whose identity is defined by the key alone.
///
/// Hashing, equality and borrowing all delegate to `key` and ignore `value`, so an item can be
/// looked up through a reference to its key.
#[derive(Debug)]
struct QueueItem<K, V> {
    key: K,
    value: V,
}

impl<K, V> std::borrow::Borrow<K> for QueueItem<K, V> {
    fn borrow(&self) -> &K {
        let Self { key, .. } = self;
        key
    }
}

impl<K, V> std::hash::Hash for QueueItem<K, V>
where
    K: std::hash::Hash,
{
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        // Only the key contributes, mirroring the `PartialEq` implementation.
        std::hash::Hash::hash(&self.key, state);
    }
}

impl<K, V> PartialEq for QueueItem<K, V>
where
    K: PartialEq,
{
    fn eq(&self, other: &Self) -> bool {
        PartialEq::eq(&self.key, &other.key)
    }
}

impl<K, V> Eq for QueueItem<K, V> where K: PartialEq {}
638
639#[derive(Debug, Clone)]
640struct Priority {
641    readiness: Readiness,
642    received_at: DateTime<Utc>,
643    next_project_fetch: Instant,
644}
645
646impl Priority {
647    fn new(received_at: DateTime<Utc>) -> Self {
648        Self {
649            readiness: Readiness::new(),
650            received_at,
651            next_project_fetch: Instant::now(),
652        }
653    }
654}
655
656impl Ord for Priority {
657    fn cmp(&self, other: &Self) -> Ordering {
658        match (self.readiness.ready(), other.readiness.ready()) {
659            // Assuming that two priorities differ only w.r.t. the `last_peek`, we want to prioritize
660            // stacks that were the least recently peeked. The rationale behind this is that we want
661            // to keep cycling through different stacks while peeking.
662            (true, true) => self.received_at.cmp(&other.received_at),
663            (true, false) => Ordering::Greater,
664            (false, true) => Ordering::Less,
665            // For non-ready stacks, we invert the priority, such that projects that are not
666            // ready and did not receive envelopes recently can be evicted.
667            (false, false) => self
668                .next_project_fetch
669                .cmp(&other.next_project_fetch)
670                .reverse()
671                .then(self.received_at.cmp(&other.received_at).reverse()),
672        }
673    }
674}
675
676impl PartialOrd for Priority {
677    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
678        Some(self.cmp(other))
679    }
680}
681
682impl PartialEq for Priority {
683    fn eq(&self, other: &Self) -> bool {
684        self.cmp(other).is_eq()
685    }
686}
687
688impl Eq for Priority {}
689
/// Readiness flags of the two projects a stack depends on.
#[derive(Debug, Clone, Copy)]
struct Readiness {
    own_project_ready: bool,
    sampling_project_ready: bool,
}

impl Readiness {
    /// Creates a readiness state in which both projects count as ready.
    fn new() -> Self {
        // Start out optimistic: most stacks are created again right after the previous stack
        // for the same key pair was emptied.
        let (own_project_ready, sampling_project_ready) = (true, true);
        Self {
            own_project_ready,
            sampling_project_ready,
        }
    }

    /// Returns `true` only if both the own project and the sampling project are ready.
    fn ready(&self) -> bool {
        let Self {
            own_project_ready,
            sampling_project_ready,
        } = *self;
        own_project_ready && sampling_project_ready
    }
}
710
711#[cfg(test)]
712mod tests {
713    use relay_common::Dsn;
714    use relay_event_schema::protocol::EventId;
715    use relay_sampling::DynamicSamplingContext;
716    use std::str::FromStr;
717    use std::sync::Arc;
718    use uuid::Uuid;
719
720    use crate::SqliteEnvelopeStore;
721    use crate::envelope::{Item, ItemType};
722    use crate::extractors::RequestMeta;
723    use crate::services::buffer::common::ProjectKeyPair;
724    use crate::services::buffer::envelope_store::sqlite::DatabaseEnvelope;
725    use crate::services::buffer::testutils::utils::mock_envelopes;
726    use crate::utils::MemoryStat;
727
728    use super::*;
729
730    impl Peek {
731        fn is_empty(&self) -> bool {
732            matches!(self, Peek::Empty)
733        }
734    }
735
736    fn new_envelope(
737        own_key: ProjectKey,
738        sampling_key: Option<ProjectKey>,
739        event_id: Option<EventId>,
740    ) -> Box<Envelope> {
741        let mut envelope = Envelope::from_request(
742            None,
743            RequestMeta::new(Dsn::from_str(&format!("http://{own_key}@localhost/1")).unwrap()),
744        );
745        if let Some(sampling_key) = sampling_key {
746            envelope.set_dsc(DynamicSamplingContext {
747                public_key: sampling_key,
748                trace_id: "67e5504410b1426f9247bb680e5fe0c8".parse().unwrap(),
749                release: None,
750                user: Default::default(),
751                replay_id: None,
752                environment: None,
753                transaction: None,
754                sample_rate: None,
755                sampled: None,
756                other: Default::default(),
757            });
758            envelope.add_item(Item::new(ItemType::Transaction));
759        }
760        if let Some(event_id) = event_id {
761            envelope.set_event_id(event_id);
762        }
763        envelope
764    }
765
766    fn mock_config(path: &str) -> Arc<Config> {
767        Config::from_json_value(serde_json::json!({
768            "spool": {
769                "envelopes": {
770                    "path": path
771                }
772            }
773        }))
774        .unwrap()
775        .into()
776    }
777
778    fn mock_memory_checker() -> MemoryChecker {
779        MemoryChecker::new(MemoryStat::default(), mock_config("my/db/path").clone())
780    }
781
782    async fn peek_received_at(buffer: &mut EnvelopeBuffer<MemoryStackProvider>) -> DateTime<Utc> {
783        buffer.peek().await.unwrap().last_received_at().unwrap()
784    }
785
786    #[tokio::test]
787    async fn test_insert_pop() {
788        let mut buffer = EnvelopeBuffer::<MemoryStackProvider>::new(0, mock_memory_checker());
789
790        let project_key1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap();
791        let project_key2 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
792        let project_key3 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fef").unwrap();
793
794        assert!(buffer.pop().await.unwrap().is_none());
795        assert!(buffer.peek().await.unwrap().is_empty());
796
797        let envelope1 = new_envelope(project_key1, None, None);
798        let time1 = envelope1.meta().received_at();
799        buffer.push(envelope1).await.unwrap();
800
801        let envelope2 = new_envelope(project_key2, None, None);
802        let time2 = envelope2.meta().received_at();
803        buffer.push(envelope2).await.unwrap();
804
805        // Both projects are ready, so project 2 is on top (has the newest envelopes):
806        assert_eq!(peek_received_at(&mut buffer).await, time2);
807
808        buffer.mark_ready(&project_key1, false);
809        buffer.mark_ready(&project_key2, false);
810
811        // Both projects are not ready, so project 1 is on top (has the oldest envelopes):
812        assert_eq!(peek_received_at(&mut buffer).await, time1);
813
814        let envelope3 = new_envelope(project_key3, None, None);
815        let time3 = envelope3.meta().received_at();
816        buffer.push(envelope3).await.unwrap();
817        buffer.mark_ready(&project_key3, false);
818
819        // All projects are not ready, so project 1 is on top (has the oldest envelopes):
820        assert_eq!(peek_received_at(&mut buffer).await, time1);
821
822        // After marking a project ready, it goes to the top:
823        buffer.mark_ready(&project_key3, true);
824        assert_eq!(peek_received_at(&mut buffer).await, time3);
825        assert_eq!(
826            buffer.pop().await.unwrap().unwrap().meta().public_key(),
827            project_key3
828        );
829
830        // After popping, project 1 is on top again:
831        assert_eq!(peek_received_at(&mut buffer).await, time1);
832
833        // Mark project 1 as ready (still on top):
834        buffer.mark_ready(&project_key1, true);
835        assert_eq!(peek_received_at(&mut buffer).await, time1);
836
837        // Mark project 2 as ready as well (now on top because most recent):
838        buffer.mark_ready(&project_key2, true);
839        assert_eq!(peek_received_at(&mut buffer).await, time2);
840        assert_eq!(
841            buffer.pop().await.unwrap().unwrap().meta().public_key(),
842            project_key2
843        );
844
845        // Pop last element:
846        assert_eq!(
847            buffer.pop().await.unwrap().unwrap().meta().public_key(),
848            project_key1
849        );
850        assert!(buffer.pop().await.unwrap().is_none());
851        assert!(buffer.peek().await.unwrap().is_empty());
852    }
853
854    #[tokio::test]
855    async fn test_project_internal_order() {
856        let mut buffer = EnvelopeBuffer::<MemoryStackProvider>::new(0, mock_memory_checker());
857
858        let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap();
859
860        let envelope1 = new_envelope(project_key, None, None);
861        let time1 = envelope1.meta().received_at();
862        let envelope2 = new_envelope(project_key, None, None);
863        let time2 = envelope2.meta().received_at();
864
865        assert!(time2 > time1);
866
867        buffer.push(envelope1).await.unwrap();
868        buffer.push(envelope2).await.unwrap();
869
870        assert_eq!(
871            buffer.pop().await.unwrap().unwrap().meta().received_at(),
872            time2
873        );
874        assert_eq!(
875            buffer.pop().await.unwrap().unwrap().meta().received_at(),
876            time1
877        );
878        assert!(buffer.pop().await.unwrap().is_none());
879    }
880
881    #[tokio::test]
882    async fn test_sampling_projects() {
883        let mut buffer = EnvelopeBuffer::<MemoryStackProvider>::new(0, mock_memory_checker());
884
885        let project_key1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap();
886        let project_key2 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fef").unwrap();
887
888        let envelope1 = new_envelope(project_key1, None, None);
889        let time1 = envelope1.received_at();
890        buffer.push(envelope1).await.unwrap();
891
892        let envelope2 = new_envelope(project_key2, None, None);
893        let time2 = envelope2.received_at();
894        buffer.push(envelope2).await.unwrap();
895
896        let envelope3 = new_envelope(project_key1, Some(project_key2), None);
897        let time3 = envelope3.meta().received_at();
898        buffer.push(envelope3).await.unwrap();
899
900        buffer.mark_ready(&project_key1, false);
901        buffer.mark_ready(&project_key2, false);
902
903        // Nothing is ready, instant1 is on top:
904        assert_eq!(
905            buffer.peek().await.unwrap().last_received_at().unwrap(),
906            time1
907        );
908
909        // Mark project 2 ready, gets on top:
910        buffer.mark_ready(&project_key2, true);
911        assert_eq!(
912            buffer.peek().await.unwrap().last_received_at().unwrap(),
913            time2
914        );
915
916        // Revert
917        buffer.mark_ready(&project_key2, false);
918        assert_eq!(
919            buffer.peek().await.unwrap().last_received_at().unwrap(),
920            time1
921        );
922
923        // Project 1 ready:
924        buffer.mark_ready(&project_key1, true);
925        assert_eq!(
926            buffer.peek().await.unwrap().last_received_at().unwrap(),
927            time1
928        );
929
930        // when both projects are ready, event no 3 ends up on top:
931        buffer.mark_ready(&project_key2, true);
932        assert_eq!(
933            buffer.pop().await.unwrap().unwrap().meta().received_at(),
934            time3
935        );
936        assert_eq!(
937            buffer.peek().await.unwrap().last_received_at().unwrap(),
938            time2
939        );
940
941        buffer.mark_ready(&project_key2, false);
942        assert_eq!(buffer.pop().await.unwrap().unwrap().received_at(), time1);
943        assert_eq!(buffer.pop().await.unwrap().unwrap().received_at(), time2);
944
945        assert!(buffer.pop().await.unwrap().is_none());
946    }
947
948    #[tokio::test]
949    async fn test_project_keys_distinct() {
950        let project_key1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap();
951        let project_key2 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fef").unwrap();
952
953        let project_key_pair1 = ProjectKeyPair::new(project_key1, project_key2);
954        let project_key_pair2 = ProjectKeyPair::new(project_key2, project_key1);
955
956        assert_ne!(project_key_pair1, project_key_pair2);
957
958        let mut buffer = EnvelopeBuffer::<MemoryStackProvider>::new(0, mock_memory_checker());
959        buffer
960            .push(new_envelope(project_key1, Some(project_key2), None))
961            .await
962            .unwrap();
963        buffer
964            .push(new_envelope(project_key2, Some(project_key1), None))
965            .await
966            .unwrap();
967        assert_eq!(buffer.priority_queue.len(), 2);
968    }
969
970    #[test]
971    fn test_total_order() {
972        let p1 = Priority {
973            readiness: Readiness {
974                own_project_ready: true,
975                sampling_project_ready: true,
976            },
977            received_at: Utc::now(),
978            next_project_fetch: Instant::now(),
979        };
980        let mut p2 = p1.clone();
981        p2.next_project_fetch += Duration::from_millis(1);
982
983        // Last peek does not matter because project is ready:
984        assert_eq!(p1.cmp(&p2), Ordering::Equal);
985        assert_eq!(p1, p2);
986    }
987
988    #[tokio::test]
989    async fn test_last_peek_internal_order() {
990        let mut buffer = EnvelopeBuffer::<MemoryStackProvider>::new(0, mock_memory_checker());
991
992        let project_key_1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap();
993        let event_id_1 = EventId::new();
994        let envelope1 = new_envelope(project_key_1, None, Some(event_id_1));
995        let time1 = envelope1.received_at();
996
997        let project_key_2 = ProjectKey::parse("b56ae32be2584e0bbd7a4cbb95971fed").unwrap();
998        let event_id_2 = EventId::new();
999        let envelope2 = new_envelope(project_key_2, None, Some(event_id_2));
1000        let time2 = envelope2.received_at();
1001
1002        buffer.push(envelope1).await.unwrap();
1003        buffer.push(envelope2).await.unwrap();
1004
1005        buffer.mark_ready(&project_key_1, false);
1006        buffer.mark_ready(&project_key_2, false);
1007
1008        // event_id_1 is first element:
1009        let Peek::NotReady {
1010            last_received_at, ..
1011        } = buffer.peek().await.unwrap()
1012        else {
1013            panic!();
1014        };
1015        assert_eq!(last_received_at, time1);
1016
1017        // Second peek returns same element:
1018        let Peek::NotReady {
1019            last_received_at,
1020            project_key_pair,
1021            ..
1022        } = buffer.peek().await.unwrap()
1023        else {
1024            panic!();
1025        };
1026        assert_eq!(last_received_at, time1);
1027        assert_ne!(last_received_at, time2);
1028
1029        buffer.mark_seen(&project_key_pair, Duration::ZERO);
1030
1031        // After mark_seen, event 2 is on top:
1032        let Peek::NotReady {
1033            last_received_at, ..
1034        } = buffer.peek().await.unwrap()
1035        else {
1036            panic!();
1037        };
1038        assert_eq!(last_received_at, time2);
1039        assert_ne!(last_received_at, time1);
1040
1041        let Peek::NotReady {
1042            last_received_at,
1043            project_key_pair,
1044            ..
1045        } = buffer.peek().await.unwrap()
1046        else {
1047            panic!();
1048        };
1049        assert_eq!(last_received_at, time2);
1050        assert_ne!(last_received_at, time1);
1051
1052        buffer.mark_seen(&project_key_pair, Duration::ZERO);
1053
1054        // After another mark_seen, cycle back to event 1:
1055        let Peek::NotReady {
1056            last_received_at, ..
1057        } = buffer.peek().await.unwrap()
1058        else {
1059            panic!();
1060        };
1061        assert_eq!(last_received_at, time1);
1062        assert_ne!(last_received_at, time2);
1063    }
1064
1065    #[tokio::test]
1066    async fn test_initialize_buffer() {
1067        let path = std::env::temp_dir()
1068            .join(Uuid::new_v4().to_string())
1069            .into_os_string()
1070            .into_string()
1071            .unwrap();
1072        let config = mock_config(&path);
1073        let mut store = SqliteEnvelopeStore::prepare(0, &config).await.unwrap();
1074        let mut buffer = EnvelopeBuffer::<SqliteStackProvider>::new(0, &config)
1075            .await
1076            .unwrap();
1077
1078        // We write 5 envelopes to disk so that we can check if they are loaded. These envelopes
1079        // belong to the same project keys, so they belong to the same envelope stack.
1080        let envelopes = mock_envelopes(10);
1081        assert!(
1082            store
1083                .insert_batch(
1084                    envelopes
1085                        .into_iter()
1086                        .map(|e| DatabaseEnvelope::try_from(e.as_ref()).unwrap())
1087                        .collect::<Vec<_>>()
1088                        .try_into()
1089                        .unwrap()
1090                )
1091                .await
1092                .is_ok()
1093        );
1094
1095        // We assume that the buffer is empty.
1096        assert!(buffer.priority_queue.is_empty());
1097        assert!(buffer.stacks_by_project.is_empty());
1098
1099        buffer.initialize().await;
1100
1101        // We assume that we loaded only 1 envelope stack, because of the project keys combinations
1102        // of the envelopes we inserted above.
1103        assert_eq!(buffer.priority_queue.len(), 1);
1104        // We expect to have an entry per project key, since we have 1 pair, the total entries
1105        // should be 2.
1106        assert_eq!(buffer.stacks_by_project.len(), 2);
1107    }
1108}