relay_server/services/buffer/envelope_buffer/
mod.rs

1use std::cmp::Ordering;
2use std::collections::BTreeSet;
3use std::convert::Infallible;
4use std::error::Error;
5use std::mem;
6use std::time::Duration;
7
8use chrono::{DateTime, Utc};
9use hashbrown::HashSet;
10use relay_base_schema::project::ProjectKey;
11use relay_config::Config;
12use tokio::time::{timeout, Instant};
13
14use crate::envelope::Envelope;
15use crate::envelope::Item;
16use crate::services::buffer::common::ProjectKeyPair;
17use crate::services::buffer::envelope_stack::sqlite::SqliteEnvelopeStackError;
18use crate::services::buffer::envelope_stack::EnvelopeStack;
19use crate::services::buffer::envelope_store::sqlite::SqliteEnvelopeStoreError;
20use crate::services::buffer::stack_provider::memory::MemoryStackProvider;
21use crate::services::buffer::stack_provider::sqlite::SqliteStackProvider;
22use crate::services::buffer::stack_provider::{StackCreationType, StackProvider};
23use crate::statsd::{RelayGauges, RelayHistograms, RelayTimers};
24use crate::utils::MemoryChecker;
25
/// Polymorphic envelope buffering interface.
///
/// The underlying buffer can either be disk-based or memory-based,
/// depending on the given configuration.
///
/// NOTE: This is implemented as an enum because a trait object with async methods would not be
/// object safe.
#[derive(Debug)]
// The variants wrap the module-private `EnvelopeBuffer` type, hence the lint exemption.
#[allow(private_interfaces)]
pub enum PolymorphicEnvelopeBuffer {
    /// An envelope buffer that uses in-memory envelope stacks.
    InMemory(EnvelopeBuffer<MemoryStackProvider>),
    /// An envelope buffer that uses sqlite envelope stacks.
    Sqlite(EnvelopeBuffer<SqliteStackProvider>),
}
41
42impl PolymorphicEnvelopeBuffer {
43    /// Returns true if the implementation stores all envelopes in RAM.
44    pub fn is_memory(&self) -> bool {
45        match self {
46            PolymorphicEnvelopeBuffer::InMemory(_) => true,
47            PolymorphicEnvelopeBuffer::Sqlite(_) => false,
48        }
49    }
50
51    /// Creates either a memory-based or a disk-based envelope buffer,
52    /// depending on the given configuration.
53    pub async fn from_config(
54        partition_id: u8,
55        config: &Config,
56        memory_checker: MemoryChecker,
57    ) -> Result<Self, EnvelopeBufferError> {
58        let buffer = if config.spool_envelopes_path(partition_id).is_some() {
59            relay_log::trace!("PolymorphicEnvelopeBuffer: initializing sqlite envelope buffer");
60            let buffer = EnvelopeBuffer::<SqliteStackProvider>::new(partition_id, config).await?;
61            Self::Sqlite(buffer)
62        } else {
63            relay_log::trace!("PolymorphicEnvelopeBuffer: initializing memory envelope buffer");
64            let buffer = EnvelopeBuffer::<MemoryStackProvider>::new(partition_id, memory_checker);
65            Self::InMemory(buffer)
66        };
67
68        Ok(buffer)
69    }
70
71    /// Initializes the envelope buffer.
72    pub async fn initialize(&mut self) {
73        match self {
74            PolymorphicEnvelopeBuffer::InMemory(buffer) => buffer.initialize().await,
75            PolymorphicEnvelopeBuffer::Sqlite(buffer) => buffer.initialize().await,
76        }
77    }
78
    /// Adds an envelope to the buffer.
    ///
    /// Before pushing, the summed length of all items of the envelope is reported as a
    /// histogram. The push into the wrapped buffer itself is wrapped in a timer metric. Both
    /// metrics are tagged with the partition of this buffer.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the wrapped buffer's `push`.
    pub async fn push(&mut self, envelope: Box<Envelope>) -> Result<(), EnvelopeBufferError> {
        relay_statsd::metric!(
            histogram(RelayHistograms::BufferEnvelopeBodySize) =
                envelope.items().map(Item::len).sum::<usize>() as u64,
            partition_id = self.partition_tag()
        );

        relay_statsd::metric!(
            timer(RelayTimers::BufferPush),
            partition_id = self.partition_tag(),
            {
                // NOTE(review): the `?` returns from `push` out of the timed block; whether the
                // timer is still emitted on that path depends on the macro expansion - confirm.
                match self {
                    Self::Sqlite(buffer) => buffer.push(envelope).await,
                    Self::InMemory(buffer) => buffer.push(envelope).await,
                }?;
            }
        );
        Ok(())
    }
99
    /// Reports the state of the next-in-line envelope without removing it.
    ///
    /// The result is a [`Peek`] value rather than a reference to the envelope itself. The call
    /// into the wrapped buffer is wrapped in a timer metric tagged with the partition.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the wrapped buffer's `peek`.
    pub async fn peek(&mut self) -> Result<Peek, EnvelopeBufferError> {
        relay_statsd::metric!(
            timer(RelayTimers::BufferPeek),
            partition_id = self.partition_tag(),
            {
                match self {
                    Self::Sqlite(buffer) => buffer.peek().await,
                    Self::InMemory(buffer) => buffer.peek().await,
                }
            }
        )
    }
113
    /// Pops the next-in-line envelope.
    ///
    /// Returns `Ok(None)` when the wrapped buffer has nothing to pop. The call into the wrapped
    /// buffer is wrapped in a timer metric tagged with the partition.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the wrapped buffer's `pop`.
    pub async fn pop(&mut self) -> Result<Option<Box<Envelope>>, EnvelopeBufferError> {
        let envelope = relay_statsd::metric!(
            timer(RelayTimers::BufferPop),
            partition_id = self.partition_tag(),
            {
                // NOTE(review): the `?` returns from `pop` out of the timed block; whether the
                // timer is still emitted on that path depends on the macro expansion - confirm.
                match self {
                    Self::Sqlite(buffer) => buffer.pop().await,
                    Self::InMemory(buffer) => buffer.pop().await,
                }?
            }
        );
        Ok(envelope)
    }
128
129    /// Marks a project as ready or not ready.
130    ///
131    /// The buffer re-prioritizes its envelopes based on this information.
132    /// Returns `true` if at least one priority was changed.
133    pub fn mark_ready(&mut self, project: &ProjectKey, is_ready: bool) -> bool {
134        relay_log::trace!(
135            project_key = project.as_str(),
136            "buffer marked {}",
137            if is_ready { "ready" } else { "not ready" }
138        );
139        match self {
140            Self::Sqlite(buffer) => buffer.mark_ready(project, is_ready),
141            Self::InMemory(buffer) => buffer.mark_ready(project, is_ready),
142        }
143    }
144
145    /// Marks a stack as seen.
146    ///
147    /// Non-ready stacks are deprioritized when they are marked as seen, such that
148    /// the next call to `.peek()` will look at a different stack. This prevents
149    /// head-of-line blocking.
150    pub fn mark_seen(&mut self, project_key_pair: &ProjectKeyPair, next_fetch: Duration) {
151        match self {
152            Self::Sqlite(buffer) => buffer.mark_seen(project_key_pair, next_fetch),
153            Self::InMemory(buffer) => buffer.mark_seen(project_key_pair, next_fetch),
154        }
155    }
156
157    /// Returns `true` whether the buffer has capacity to accept new [`Envelope`]s.
158    pub fn has_capacity(&self) -> bool {
159        match self {
160            Self::Sqlite(buffer) => buffer.has_capacity(),
161            Self::InMemory(buffer) => buffer.has_capacity(),
162        }
163    }
164
165    /// Returns the total number of envelopes that have been spooled since the startup. It does
166    /// not include the count that existed in a persistent spooler before.
167    pub fn item_count(&self) -> u64 {
168        match self {
169            Self::Sqlite(buffer) => buffer.tracked_count,
170            Self::InMemory(buffer) => buffer.tracked_count,
171        }
172    }
173
174    /// Returns the total number of bytes that the spooler storage uses or `None` if the number
175    /// cannot be reliably determined.
176    pub fn total_size(&self) -> Option<u64> {
177        match self {
178            Self::Sqlite(buffer) => buffer.stack_provider.total_size(),
179            Self::InMemory(buffer) => buffer.stack_provider.total_size(),
180        }
181    }
182
183    /// Shuts down the [`PolymorphicEnvelopeBuffer`].
184    pub async fn shutdown(&mut self) -> bool {
185        // Currently, we want to flush the buffer only for disk, since the in memory implementation
186        // tries to not do anything and pop as many elements as possible within the shutdown
187        // timeout.
188        let Self::Sqlite(buffer) = self else {
189            relay_log::trace!("PolymorphicEnvelopeBuffer: shutdown procedure not needed");
190            return false;
191        };
192        buffer.flush().await;
193
194        true
195    }
196
197    /// Returns the partition tag for this [`PolymorphicEnvelopeBuffer`].
198    fn partition_tag(&self) -> &str {
199        match self {
200            PolymorphicEnvelopeBuffer::InMemory(buffer) => &buffer.partition_tag,
201            PolymorphicEnvelopeBuffer::Sqlite(buffer) => &buffer.partition_tag,
202        }
203    }
204}
205
/// Error that occurs while interacting with the envelope buffer.
#[derive(Debug, thiserror::Error)]
pub enum EnvelopeBufferError {
    /// An error raised by the sqlite envelope store.
    ///
    /// NOTE(review): this variant has the same display message as [`Self::SqliteStack`], so the
    /// message alone does not tell the two apart.
    #[error("sqlite")]
    SqliteStore(#[from] SqliteEnvelopeStoreError),

    /// An error raised by a sqlite envelope stack.
    #[error("sqlite")]
    SqliteStack(#[from] SqliteEnvelopeStackError),

    /// An envelope could not be pushed to the buffer.
    #[error("failed to push envelope to the buffer")]
    PushFailed,
}
218
/// Conversion from an error type that has no values.
///
/// NOTE(review): presumably this exists so that stacks whose error type is [`Infallible`]
/// satisfy the `EnvelopeBufferError: From<..>` bound on the buffer implementation - confirm.
impl From<Infallible> for EnvelopeBufferError {
    fn from(value: Infallible) -> Self {
        // `Infallible` has no variants, so this empty match is exhaustive and never runs.
        match value {}
    }
}
224
/// An envelope buffer that holds an individual stack for each project/sampling project combination.
///
/// Envelope stacks are organized in a priority queue, and are re-prioritized every time an envelope
/// is pushed, popped, or when a project becomes ready.
#[derive(Debug)]
struct EnvelopeBuffer<P: StackProvider> {
    /// The central priority queue.
    ///
    /// Each entry pairs a [`ProjectKeyPair`] with its stack and is looked up by that key pair.
    priority_queue: priority_queue::PriorityQueue<QueueItem<ProjectKeyPair, P::Stack>, Priority>,
    /// A lookup table to find all stacks involving a project.
    stacks_by_project: hashbrown::HashMap<ProjectKey, BTreeSet<ProjectKeyPair>>,
    /// A provider of stacks that provides utilities to create stacks, check their capacity...
    ///
    /// This indirection is needed because different stack implementations might need different
    /// initialization (e.g. a database connection).
    stack_provider: P,
    /// The total count of envelopes that the buffer is working with.
    ///
    /// Note that this count is not meant to be perfectly accurate since the initialization of the
    /// count might not succeed if it takes more than a set timeout. For example, if we load the
    /// count of all envelopes from disk, and it takes more than the time we set, we will mark the
    /// initial count as 0 and just count incoming and outgoing envelopes from the buffer.
    ///
    /// The count is signed because it may become negative when more envelopes are popped than
    /// were initially counted.
    total_count: i64,
    /// The total count of envelopes that the buffer is working with ignoring envelopes that
    /// were previously stored on disk.
    ///
    /// On startup this will always be 0 and will only count incoming envelopes. If a reliable
    /// count of currently buffered envelopes is required, prefer this over `total_count`.
    tracked_count: u64,
    /// Whether the count initialization succeeded or not.
    ///
    /// This boolean is just used for tagging the metric that tracks the total count of envelopes
    /// in the buffer.
    total_count_initialized: bool,
    /// The tag value of this partition which is used for reporting purposes.
    partition_tag: String,
}
261
262impl EnvelopeBuffer<MemoryStackProvider> {
263    /// Creates an empty memory-based buffer.
264    pub fn new(partition_id: u8, memory_checker: MemoryChecker) -> Self {
265        Self {
266            stacks_by_project: Default::default(),
267            priority_queue: Default::default(),
268            stack_provider: MemoryStackProvider::new(memory_checker),
269            total_count: 0,
270            tracked_count: 0,
271            total_count_initialized: false,
272            partition_tag: partition_id.to_string(),
273        }
274    }
275}
276
277#[allow(dead_code)]
278impl EnvelopeBuffer<SqliteStackProvider> {
279    /// Creates an empty sqlite-based buffer.
280    pub async fn new(partition_id: u8, config: &Config) -> Result<Self, EnvelopeBufferError> {
281        Ok(Self {
282            stacks_by_project: Default::default(),
283            priority_queue: Default::default(),
284            stack_provider: SqliteStackProvider::new(partition_id, config).await?,
285            total_count: 0,
286            tracked_count: 0,
287            total_count_initialized: false,
288            partition_tag: partition_id.to_string(),
289        })
290    }
291}
292
293impl<P: StackProvider> EnvelopeBuffer<P>
294where
295    EnvelopeBufferError: From<<P::Stack as EnvelopeStack>::Error>,
296{
297    /// Initializes the [`EnvelopeBuffer`] given the initialization state from the
298    /// [`StackProvider`].
299    pub async fn initialize(&mut self) {
300        relay_statsd::metric!(
301            timer(RelayTimers::BufferInitialization),
302            partition_id = &self.partition_tag,
303            {
304                let initialization_state = self.stack_provider.initialize().await;
305                self.load_stacks(initialization_state.project_key_pairs)
306                    .await;
307                self.load_store_total_count().await;
308            }
309        );
310    }
311
312    /// Pushes an envelope to the appropriate envelope stack and re-prioritizes the stack.
313    ///
314    /// If the envelope stack does not exist, a new stack is pushed to the priority queue.
315    /// The priority of the stack is updated with the envelope's received_at time.
316    pub async fn push(&mut self, envelope: Box<Envelope>) -> Result<(), EnvelopeBufferError> {
317        let received_at = envelope.received_at();
318
319        let project_key_pair = ProjectKeyPair::from_envelope(&envelope);
320        if let Some((
321            QueueItem {
322                key: _,
323                value: stack,
324            },
325            _,
326        )) = self.priority_queue.get_mut(&project_key_pair)
327        {
328            stack.push(envelope).await?;
329        } else {
330            // Since we have initialization code that creates all the necessary stacks, we assume
331            // that any new stack that is added during the envelope buffer's lifecycle, is recreated.
332            self.push_stack(
333                StackCreationType::New,
334                ProjectKeyPair::from_envelope(&envelope),
335                Some(envelope),
336            )
337            .await?;
338        }
339        self.priority_queue
340            .change_priority_by(&project_key_pair, |prio| {
341                prio.received_at = received_at;
342            });
343
344        self.total_count += 1;
345        self.tracked_count += 1;
346        self.track_total_count();
347
348        Ok(())
349    }
350
351    /// Returns a reference to the next-in-line envelope, if one exists.
352    pub async fn peek(&mut self) -> Result<Peek, EnvelopeBufferError> {
353        let Some((
354            QueueItem {
355                key: project_key_pair,
356                value: stack,
357            },
358            Priority {
359                readiness,
360                next_project_fetch,
361                ..
362            },
363        )) = self.priority_queue.peek_mut()
364        else {
365            return Ok(Peek::Empty);
366        };
367
368        let ready = readiness.ready();
369
370        Ok(match (stack.peek().await?, ready) {
371            (None, _) => Peek::Empty,
372            (Some(last_received_at), true) => Peek::Ready {
373                project_key_pair: *project_key_pair,
374                last_received_at,
375            },
376            (Some(last_received_at), false) => Peek::NotReady {
377                project_key_pair: *project_key_pair,
378                next_project_fetch: *next_project_fetch,
379                last_received_at,
380            },
381        })
382    }
383
384    /// Returns the next-in-line envelope, if one exists.
385    ///
386    /// The priority of the envelope's stack is updated with the next envelope's received_at
387    /// time. If the stack is empty after popping, it is removed from the priority queue.
388    pub async fn pop(&mut self) -> Result<Option<Box<Envelope>>, EnvelopeBufferError> {
389        let Some((QueueItem { key, value: stack }, _)) = self.priority_queue.peek_mut() else {
390            return Ok(None);
391        };
392        let project_key_pair = *key;
393        let envelope = stack.pop().await?.expect("found an empty stack");
394
395        let last_received_at = stack.peek().await?;
396
397        match last_received_at {
398            None => {
399                self.pop_stack(project_key_pair);
400            }
401            Some(last_received_at) => {
402                self.priority_queue
403                    .change_priority_by(&project_key_pair, |prio| {
404                        prio.received_at = last_received_at;
405                    });
406            }
407        }
408
409        // We are fine with the count going negative, since it represents that more data was popped,
410        // than it was initially counted, meaning that we had a wrong total count from
411        // initialization.
412        self.total_count -= 1;
413        self.tracked_count = self.tracked_count.saturating_sub(1);
414        self.track_total_count();
415
416        Ok(Some(envelope))
417    }
418
419    /// Re-prioritizes all stacks that involve the given project key by setting it to "ready".
420    ///
421    /// Returns `true` if at least one priority was changed.
422    pub fn mark_ready(&mut self, project: &ProjectKey, is_ready: bool) -> bool {
423        let mut changed = false;
424        if let Some(project_key_pairs) = self.stacks_by_project.get(project) {
425            for project_key_pair in project_key_pairs {
426                self.priority_queue
427                    .change_priority_by(project_key_pair, |stack| {
428                        let mut found = false;
429                        for (subkey, readiness) in [
430                            (
431                                project_key_pair.own_key,
432                                &mut stack.readiness.own_project_ready,
433                            ),
434                            (
435                                project_key_pair.sampling_key,
436                                &mut stack.readiness.sampling_project_ready,
437                            ),
438                        ] {
439                            if subkey == *project {
440                                found = true;
441                                if *readiness != is_ready {
442                                    changed = true;
443                                    *readiness = is_ready;
444                                }
445                            }
446                        }
447                        debug_assert!(found);
448                    });
449            }
450        }
451
452        changed
453    }
454
455    /// Marks a stack as seen.
456    ///
457    /// Non-ready stacks are deprioritized when they are marked as seen, such that
458    /// the next call to `.peek()` will look at a different stack. This prevents
459    /// head-of-line blocking.
460    pub fn mark_seen(&mut self, project_key_pair: &ProjectKeyPair, next_fetch: Duration) {
461        self.priority_queue
462            .change_priority_by(project_key_pair, |stack| {
463                // We use the next project fetch to debounce project fetching and avoid head of
464                // line blocking of non-ready stacks.
465                stack.next_project_fetch = Instant::now() + next_fetch;
466            });
467    }
468
469    /// Returns `true` if the underlying storage has the capacity to store more envelopes.
470    pub fn has_capacity(&self) -> bool {
471        self.stack_provider.has_store_capacity()
472    }
473
474    /// Flushes the envelope buffer.
475    pub async fn flush(&mut self) {
476        let priority_queue = mem::take(&mut self.priority_queue);
477        self.stack_provider
478            .flush(priority_queue.into_iter().map(|(q, _)| q.value))
479            .await;
480    }
481
482    /// Pushes a new [`EnvelopeStack`] with the given [`Envelope`] inserted.
483    async fn push_stack(
484        &mut self,
485        stack_creation_type: StackCreationType,
486        project_key_pair: ProjectKeyPair,
487        envelope: Option<Box<Envelope>>,
488    ) -> Result<(), EnvelopeBufferError> {
489        let received_at = envelope.as_ref().map_or(Utc::now(), |e| e.received_at());
490
491        let mut stack = self
492            .stack_provider
493            .create_stack(stack_creation_type, project_key_pair);
494        if let Some(envelope) = envelope {
495            stack.push(envelope).await?;
496        }
497
498        let previous_entry = self.priority_queue.push(
499            QueueItem {
500                key: project_key_pair,
501                value: stack,
502            },
503            Priority::new(received_at),
504        );
505        debug_assert!(previous_entry.is_none());
506        for project_key in project_key_pair.iter() {
507            self.stacks_by_project
508                .entry(project_key)
509                .or_default()
510                .insert(project_key_pair);
511        }
512        relay_statsd::metric!(
513            gauge(RelayGauges::BufferStackCount) = self.priority_queue.len() as u64,
514            partition_id = &self.partition_tag
515        );
516
517        Ok(())
518    }
519
520    /// Pops an [`EnvelopeStack`] with the supplied [`EnvelopeBufferError`].
521    fn pop_stack(&mut self, project_key_pair: ProjectKeyPair) {
522        for project_key in project_key_pair.iter() {
523            self.stacks_by_project
524                .get_mut(&project_key)
525                .expect("project_key is missing from lookup")
526                .remove(&project_key_pair);
527        }
528        self.priority_queue.remove(&project_key_pair);
529
530        relay_statsd::metric!(
531            gauge(RelayGauges::BufferStackCount) = self.priority_queue.len() as u64,
532            partition_id = &self.partition_tag
533        );
534    }
535
536    /// Creates all the [`EnvelopeStack`]s with no data given a set of [`ProjectKeyPair`].
537    async fn load_stacks(&mut self, project_key_pairs: HashSet<ProjectKeyPair>) {
538        for project_key_pair in project_key_pairs {
539            self.push_stack(StackCreationType::Initialization, project_key_pair, None)
540                .await
541                .expect("Pushing an empty stack raised an error");
542        }
543    }
544
545    /// Loads the total count from the store if it takes less than a specified duration.
546    ///
547    /// The total count returned by the store is related to the count of elements that the buffer
548    /// will process, besides the count of elements that will be added and removed during its
549    /// lifecycle
550    async fn load_store_total_count(&mut self) {
551        let total_count = timeout(Duration::from_secs(1), async {
552            self.stack_provider.store_total_count().await
553        })
554        .await;
555        match total_count {
556            Ok(total_count) => {
557                self.total_count = total_count as i64;
558                self.total_count_initialized = true;
559            }
560            Err(error) => {
561                self.total_count_initialized = false;
562                relay_log::error!(
563                    error = &error as &dyn Error,
564                    "failed to load the total envelope count of the store",
565                );
566            }
567        };
568        self.track_total_count();
569    }
570
571    /// Emits a metric to track the total count of envelopes that are in the envelope buffer.
572    fn track_total_count(&self) {
573        let total_count = self.total_count as f64;
574        let initialized = match self.total_count_initialized {
575            true => "true",
576            false => "false",
577        };
578        relay_statsd::metric!(
579            histogram(RelayHistograms::BufferEnvelopesCount) = total_count,
580            initialized = initialized,
581            stack_type = self.stack_provider.stack_type(),
582            partition_id = &self.partition_tag
583        );
584    }
585}
586
587/// Contains the state of the first element in the buffer.
588pub enum Peek {
589    Empty,
590    Ready {
591        project_key_pair: ProjectKeyPair,
592        last_received_at: DateTime<Utc>,
593    },
594    NotReady {
595        project_key_pair: ProjectKeyPair,
596        next_project_fetch: Instant,
597        last_received_at: DateTime<Utc>,
598    },
599}
600
601impl Peek {
602    pub fn last_received_at(&self) -> Option<DateTime<Utc>> {
603        match self {
604            Self::Empty => None,
605            Self::Ready {
606                last_received_at, ..
607            }
608            | Self::NotReady {
609                last_received_at, ..
610            } => Some(*last_received_at),
611        }
612    }
613}
614
/// A key/value entry whose identity is defined by its key alone.
///
/// Borrowing, hashing and equality all delegate to `key` and ignore `value`, so an entry can be
/// looked up through its key.
#[derive(Debug)]
struct QueueItem<K, V> {
    key: K,
    value: V,
}

impl<K, V> std::borrow::Borrow<K> for QueueItem<K, V> {
    fn borrow(&self) -> &K {
        let Self { key, .. } = self;
        key
    }
}

impl<K: std::hash::Hash, V> std::hash::Hash for QueueItem<K, V> {
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        // Must hash exactly like the key to stay consistent with the `Borrow` impl.
        std::hash::Hash::hash(&self.key, state);
    }
}

impl<K: PartialEq, V> PartialEq for QueueItem<K, V> {
    fn eq(&self, other: &Self) -> bool {
        PartialEq::eq(&self.key, &other.key)
    }
}

impl<K: PartialEq, V> Eq for QueueItem<K, V> {}
640
641#[derive(Debug, Clone)]
642struct Priority {
643    readiness: Readiness,
644    received_at: DateTime<Utc>,
645    next_project_fetch: Instant,
646}
647
648impl Priority {
649    fn new(received_at: DateTime<Utc>) -> Self {
650        Self {
651            readiness: Readiness::new(),
652            received_at,
653            next_project_fetch: Instant::now(),
654        }
655    }
656}
657
658impl Ord for Priority {
659    fn cmp(&self, other: &Self) -> Ordering {
660        match (self.readiness.ready(), other.readiness.ready()) {
661            // Assuming that two priorities differ only w.r.t. the `last_peek`, we want to prioritize
662            // stacks that were the least recently peeked. The rationale behind this is that we want
663            // to keep cycling through different stacks while peeking.
664            (true, true) => self.received_at.cmp(&other.received_at),
665            (true, false) => Ordering::Greater,
666            (false, true) => Ordering::Less,
667            // For non-ready stacks, we invert the priority, such that projects that are not
668            // ready and did not receive envelopes recently can be evicted.
669            (false, false) => self
670                .next_project_fetch
671                .cmp(&other.next_project_fetch)
672                .reverse()
673                .then(self.received_at.cmp(&other.received_at).reverse()),
674        }
675    }
676}
677
678impl PartialOrd for Priority {
679    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
680        Some(self.cmp(other))
681    }
682}
683
684impl PartialEq for Priority {
685    fn eq(&self, other: &Self) -> bool {
686        self.cmp(other).is_eq()
687    }
688}
689
690impl Eq for Priority {}
691
/// Readiness flags of the two projects involved in a stack.
#[derive(Debug, Clone, Copy)]
struct Readiness {
    own_project_ready: bool,
    sampling_project_ready: bool,
}

impl Readiness {
    /// Creates a readiness with both projects flagged as ready.
    fn new() -> Self {
        // The ready state is set optimistically, since the large majority of stack creations
        // are re-creations after a stack was emptied.
        let ready = true;
        Self {
            own_project_ready: ready,
            sampling_project_ready: ready,
        }
    }

    /// Returns `true` only if both the own project and the sampling project are ready.
    fn ready(&self) -> bool {
        let Self {
            own_project_ready,
            sampling_project_ready,
        } = *self;

        matches!((own_project_ready, sampling_project_ready), (true, true))
    }
}
712
713#[cfg(test)]
714mod tests {
715    use relay_common::Dsn;
716    use relay_event_schema::protocol::EventId;
717    use relay_sampling::DynamicSamplingContext;
718    use std::str::FromStr;
719    use std::sync::Arc;
720    use uuid::Uuid;
721
722    use crate::envelope::{Item, ItemType};
723    use crate::extractors::RequestMeta;
724    use crate::services::buffer::common::ProjectKeyPair;
725    use crate::services::buffer::envelope_store::sqlite::DatabaseEnvelope;
726    use crate::services::buffer::testutils::utils::mock_envelopes;
727    use crate::utils::MemoryStat;
728    use crate::SqliteEnvelopeStore;
729
730    use super::*;
731
732    impl Peek {
733        fn is_empty(&self) -> bool {
734            matches!(self, Peek::Empty)
735        }
736    }
737
738    fn new_envelope(
739        own_key: ProjectKey,
740        sampling_key: Option<ProjectKey>,
741        event_id: Option<EventId>,
742    ) -> Box<Envelope> {
743        let mut envelope = Envelope::from_request(
744            None,
745            RequestMeta::new(Dsn::from_str(&format!("http://{own_key}@localhost/1")).unwrap()),
746        );
747        if let Some(sampling_key) = sampling_key {
748            envelope.set_dsc(DynamicSamplingContext {
749                public_key: sampling_key,
750                trace_id: "67e5504410b1426f9247bb680e5fe0c8".parse().unwrap(),
751                release: None,
752                user: Default::default(),
753                replay_id: None,
754                environment: None,
755                transaction: None,
756                sample_rate: None,
757                sampled: None,
758                other: Default::default(),
759            });
760            envelope.add_item(Item::new(ItemType::Transaction));
761        }
762        if let Some(event_id) = event_id {
763            envelope.set_event_id(event_id);
764        }
765        envelope
766    }
767
768    fn mock_config(path: &str) -> Arc<Config> {
769        Config::from_json_value(serde_json::json!({
770            "spool": {
771                "envelopes": {
772                    "path": path
773                }
774            }
775        }))
776        .unwrap()
777        .into()
778    }
779
780    fn mock_memory_checker() -> MemoryChecker {
781        MemoryChecker::new(MemoryStat::default(), mock_config("my/db/path").clone())
782    }
783
784    async fn peek_received_at(buffer: &mut EnvelopeBuffer<MemoryStackProvider>) -> DateTime<Utc> {
785        buffer.peek().await.unwrap().last_received_at().unwrap()
786    }
787
    /// Walks a memory-based buffer through pushes, readiness changes and pops for three
    /// projects and checks which stack is on top after every step.
    #[tokio::test]
    async fn test_insert_pop() {
        let mut buffer = EnvelopeBuffer::<MemoryStackProvider>::new(0, mock_memory_checker());

        let project_key1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap();
        let project_key2 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
        let project_key3 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fef").unwrap();

        // A fresh buffer has nothing to pop or peek at:
        assert!(buffer.pop().await.unwrap().is_none());
        assert!(buffer.peek().await.unwrap().is_empty());

        let envelope1 = new_envelope(project_key1, None, None);
        let time1 = envelope1.meta().received_at();
        buffer.push(envelope1).await.unwrap();

        let envelope2 = new_envelope(project_key2, None, None);
        let time2 = envelope2.meta().received_at();
        buffer.push(envelope2).await.unwrap();

        // Both projects are ready, so project 2 is on top (has the newest envelopes):
        assert_eq!(peek_received_at(&mut buffer).await, time2);

        buffer.mark_ready(&project_key1, false);
        buffer.mark_ready(&project_key2, false);

        // Both projects are not ready, so project 1 is on top (has the oldest envelopes):
        assert_eq!(peek_received_at(&mut buffer).await, time1);

        let envelope3 = new_envelope(project_key3, None, None);
        let time3 = envelope3.meta().received_at();
        buffer.push(envelope3).await.unwrap();
        buffer.mark_ready(&project_key3, false);

        // All projects are not ready, so project 1 is on top (has the oldest envelopes):
        assert_eq!(peek_received_at(&mut buffer).await, time1);

        // After marking a project ready, it goes to the top:
        buffer.mark_ready(&project_key3, true);
        assert_eq!(peek_received_at(&mut buffer).await, time3);
        assert_eq!(
            buffer.pop().await.unwrap().unwrap().meta().public_key(),
            project_key3
        );

        // After popping, project 1 is on top again:
        assert_eq!(peek_received_at(&mut buffer).await, time1);

        // Mark project 1 as ready (still on top):
        buffer.mark_ready(&project_key1, true);
        assert_eq!(peek_received_at(&mut buffer).await, time1);

        // Mark project 2 as ready as well (now on top because most recent):
        buffer.mark_ready(&project_key2, true);
        assert_eq!(peek_received_at(&mut buffer).await, time2);
        assert_eq!(
            buffer.pop().await.unwrap().unwrap().meta().public_key(),
            project_key2
        );

        // Pop last element:
        assert_eq!(
            buffer.pop().await.unwrap().unwrap().meta().public_key(),
            project_key1
        );
        // The buffer is drained again:
        assert!(buffer.pop().await.unwrap().is_none());
        assert!(buffer.peek().await.unwrap().is_empty());
    }
855
856    #[tokio::test]
857    async fn test_project_internal_order() {
858        let mut buffer = EnvelopeBuffer::<MemoryStackProvider>::new(0, mock_memory_checker());
859
860        let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap();
861
862        let envelope1 = new_envelope(project_key, None, None);
863        let time1 = envelope1.meta().received_at();
864        let envelope2 = new_envelope(project_key, None, None);
865        let time2 = envelope2.meta().received_at();
866
867        assert!(time2 > time1);
868
869        buffer.push(envelope1).await.unwrap();
870        buffer.push(envelope2).await.unwrap();
871
872        assert_eq!(
873            buffer.pop().await.unwrap().unwrap().meta().received_at(),
874            time2
875        );
876        assert_eq!(
877            buffer.pop().await.unwrap().unwrap().meta().received_at(),
878            time1
879        );
880        assert!(buffer.pop().await.unwrap().is_none());
881    }
882
883    #[tokio::test]
884    async fn test_sampling_projects() {
885        let mut buffer = EnvelopeBuffer::<MemoryStackProvider>::new(0, mock_memory_checker());
886
887        let project_key1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap();
888        let project_key2 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fef").unwrap();
889
890        let envelope1 = new_envelope(project_key1, None, None);
891        let time1 = envelope1.received_at();
892        buffer.push(envelope1).await.unwrap();
893
894        let envelope2 = new_envelope(project_key2, None, None);
895        let time2 = envelope2.received_at();
896        buffer.push(envelope2).await.unwrap();
897
898        let envelope3 = new_envelope(project_key1, Some(project_key2), None);
899        let time3 = envelope3.meta().received_at();
900        buffer.push(envelope3).await.unwrap();
901
902        buffer.mark_ready(&project_key1, false);
903        buffer.mark_ready(&project_key2, false);
904
905        // Nothing is ready, instant1 is on top:
906        assert_eq!(
907            buffer.peek().await.unwrap().last_received_at().unwrap(),
908            time1
909        );
910
911        // Mark project 2 ready, gets on top:
912        buffer.mark_ready(&project_key2, true);
913        assert_eq!(
914            buffer.peek().await.unwrap().last_received_at().unwrap(),
915            time2
916        );
917
918        // Revert
919        buffer.mark_ready(&project_key2, false);
920        assert_eq!(
921            buffer.peek().await.unwrap().last_received_at().unwrap(),
922            time1
923        );
924
925        // Project 1 ready:
926        buffer.mark_ready(&project_key1, true);
927        assert_eq!(
928            buffer.peek().await.unwrap().last_received_at().unwrap(),
929            time1
930        );
931
932        // when both projects are ready, event no 3 ends up on top:
933        buffer.mark_ready(&project_key2, true);
934        assert_eq!(
935            buffer.pop().await.unwrap().unwrap().meta().received_at(),
936            time3
937        );
938        assert_eq!(
939            buffer.peek().await.unwrap().last_received_at().unwrap(),
940            time2
941        );
942
943        buffer.mark_ready(&project_key2, false);
944        assert_eq!(buffer.pop().await.unwrap().unwrap().received_at(), time1);
945        assert_eq!(buffer.pop().await.unwrap().unwrap().received_at(), time2);
946
947        assert!(buffer.pop().await.unwrap().is_none());
948    }
949
950    #[tokio::test]
951    async fn test_project_keys_distinct() {
952        let project_key1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap();
953        let project_key2 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fef").unwrap();
954
955        let project_key_pair1 = ProjectKeyPair::new(project_key1, project_key2);
956        let project_key_pair2 = ProjectKeyPair::new(project_key2, project_key1);
957
958        assert_ne!(project_key_pair1, project_key_pair2);
959
960        let mut buffer = EnvelopeBuffer::<MemoryStackProvider>::new(0, mock_memory_checker());
961        buffer
962            .push(new_envelope(project_key1, Some(project_key2), None))
963            .await
964            .unwrap();
965        buffer
966            .push(new_envelope(project_key2, Some(project_key1), None))
967            .await
968            .unwrap();
969        assert_eq!(buffer.priority_queue.len(), 2);
970    }
971
972    #[test]
973    fn test_total_order() {
974        let p1 = Priority {
975            readiness: Readiness {
976                own_project_ready: true,
977                sampling_project_ready: true,
978            },
979            received_at: Utc::now(),
980            next_project_fetch: Instant::now(),
981        };
982        let mut p2 = p1.clone();
983        p2.next_project_fetch += Duration::from_millis(1);
984
985        // Last peek does not matter because project is ready:
986        assert_eq!(p1.cmp(&p2), Ordering::Equal);
987        assert_eq!(p1, p2);
988    }
989
990    #[tokio::test]
991    async fn test_last_peek_internal_order() {
992        let mut buffer = EnvelopeBuffer::<MemoryStackProvider>::new(0, mock_memory_checker());
993
994        let project_key_1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap();
995        let event_id_1 = EventId::new();
996        let envelope1 = new_envelope(project_key_1, None, Some(event_id_1));
997        let time1 = envelope1.received_at();
998
999        let project_key_2 = ProjectKey::parse("b56ae32be2584e0bbd7a4cbb95971fed").unwrap();
1000        let event_id_2 = EventId::new();
1001        let envelope2 = new_envelope(project_key_2, None, Some(event_id_2));
1002        let time2 = envelope2.received_at();
1003
1004        buffer.push(envelope1).await.unwrap();
1005        buffer.push(envelope2).await.unwrap();
1006
1007        buffer.mark_ready(&project_key_1, false);
1008        buffer.mark_ready(&project_key_2, false);
1009
1010        // event_id_1 is first element:
1011        let Peek::NotReady {
1012            last_received_at, ..
1013        } = buffer.peek().await.unwrap()
1014        else {
1015            panic!();
1016        };
1017        assert_eq!(last_received_at, time1);
1018
1019        // Second peek returns same element:
1020        let Peek::NotReady {
1021            last_received_at,
1022            project_key_pair,
1023            ..
1024        } = buffer.peek().await.unwrap()
1025        else {
1026            panic!();
1027        };
1028        assert_eq!(last_received_at, time1);
1029        assert_ne!(last_received_at, time2);
1030
1031        buffer.mark_seen(&project_key_pair, Duration::ZERO);
1032
1033        // After mark_seen, event 2 is on top:
1034        let Peek::NotReady {
1035            last_received_at, ..
1036        } = buffer.peek().await.unwrap()
1037        else {
1038            panic!();
1039        };
1040        assert_eq!(last_received_at, time2);
1041        assert_ne!(last_received_at, time1);
1042
1043        let Peek::NotReady {
1044            last_received_at,
1045            project_key_pair,
1046            ..
1047        } = buffer.peek().await.unwrap()
1048        else {
1049            panic!();
1050        };
1051        assert_eq!(last_received_at, time2);
1052        assert_ne!(last_received_at, time1);
1053
1054        buffer.mark_seen(&project_key_pair, Duration::ZERO);
1055
1056        // After another mark_seen, cycle back to event 1:
1057        let Peek::NotReady {
1058            last_received_at, ..
1059        } = buffer.peek().await.unwrap()
1060        else {
1061            panic!();
1062        };
1063        assert_eq!(last_received_at, time1);
1064        assert_ne!(last_received_at, time2);
1065    }
1066
1067    #[tokio::test]
1068    async fn test_initialize_buffer() {
1069        let path = std::env::temp_dir()
1070            .join(Uuid::new_v4().to_string())
1071            .into_os_string()
1072            .into_string()
1073            .unwrap();
1074        let config = mock_config(&path);
1075        let mut store = SqliteEnvelopeStore::prepare(0, &config).await.unwrap();
1076        let mut buffer = EnvelopeBuffer::<SqliteStackProvider>::new(0, &config)
1077            .await
1078            .unwrap();
1079
1080        // We write 5 envelopes to disk so that we can check if they are loaded. These envelopes
1081        // belong to the same project keys, so they belong to the same envelope stack.
1082        let envelopes = mock_envelopes(10);
1083        assert!(store
1084            .insert_batch(
1085                envelopes
1086                    .into_iter()
1087                    .map(|e| DatabaseEnvelope::try_from(e.as_ref()).unwrap())
1088                    .collect::<Vec<_>>()
1089                    .try_into()
1090                    .unwrap()
1091            )
1092            .await
1093            .is_ok());
1094
1095        // We assume that the buffer is empty.
1096        assert!(buffer.priority_queue.is_empty());
1097        assert!(buffer.stacks_by_project.is_empty());
1098
1099        buffer.initialize().await;
1100
1101        // We assume that we loaded only 1 envelope stack, because of the project keys combinations
1102        // of the envelopes we inserted above.
1103        assert_eq!(buffer.priority_queue.len(), 1);
1104        // We expect to have an entry per project key, since we have 1 pair, the total entries
1105        // should be 2.
1106        assert_eq!(buffer.stacks_by_project.len(), 2);
1107    }
1108}