relay_server/services/buffer/envelope_buffer/mod.rs

1use std::cmp::Ordering;
2use std::collections::BTreeSet;
3use std::convert::Infallible;
4use std::error::Error;
5use std::mem;
6use std::time::Duration;
7
8use chrono::{DateTime, Utc};
9use hashbrown::HashSet;
10use relay_base_schema::project::ProjectKey;
11use relay_config::Config;
12use tokio::time::{Instant, timeout};
13
14use crate::envelope::Envelope;
15use crate::envelope::Item;
16use crate::services::buffer::common::ProjectKeyPair;
17use crate::services::buffer::envelope_stack::EnvelopeStack;
18use crate::services::buffer::envelope_stack::sqlite::SqliteEnvelopeStackError;
19use crate::services::buffer::envelope_store::sqlite::SqliteEnvelopeStoreError;
20use crate::services::buffer::stack_provider::memory::MemoryStackProvider;
21use crate::services::buffer::stack_provider::sqlite::SqliteStackProvider;
22use crate::services::buffer::stack_provider::{StackCreationType, StackProvider};
23use crate::statsd::{RelayDistributions, RelayGauges, RelayTimers};
24use crate::utils::MemoryChecker;
25
26/// Polymorphic envelope buffering interface.
27///
28/// The underlying buffer can either be disk-based or memory-based,
29/// depending on the given configuration.
30///
31/// NOTE: This is implemented as an enum because a trait object with async methods would not be
32/// object safe.
33#[derive(Debug)]
34#[allow(private_interfaces)]
35pub enum PolymorphicEnvelopeBuffer {
36    /// An enveloper buffer that uses in-memory envelopes stacks.
37    InMemory(EnvelopeBuffer<MemoryStackProvider>),
38    /// An enveloper buffer that uses sqlite envelopes stacks.
39    Sqlite(EnvelopeBuffer<SqliteStackProvider>),
40}
41
42impl PolymorphicEnvelopeBuffer {
43    /// Returns true if the implementation stores all envelopes in RAM.
44    pub fn is_memory(&self) -> bool {
45        match self {
46            Self::InMemory(_) => true,
47            Self::Sqlite(_) => false,
48        }
49    }
50
51    /// Creates either a memory-based or a disk-based envelope buffer,
52    /// depending on the given configuration.
53    pub async fn from_config(
54        partition_id: u8,
55        config: &Config,
56        memory_checker: MemoryChecker,
57    ) -> Result<Self, EnvelopeBufferError> {
58        let buffer = if config.spool_envelopes_path(partition_id).is_some() {
59            relay_log::trace!("PolymorphicEnvelopeBuffer: initializing sqlite envelope buffer");
60            let buffer = EnvelopeBuffer::<SqliteStackProvider>::new(partition_id, config).await?;
61            Self::Sqlite(buffer)
62        } else {
63            relay_log::trace!("PolymorphicEnvelopeBuffer: initializing memory envelope buffer");
64            let buffer = EnvelopeBuffer::<MemoryStackProvider>::new(partition_id, memory_checker);
65            Self::InMemory(buffer)
66        };
67
68        Ok(buffer)
69    }
70
71    /// Initializes the envelope buffer.
72    pub async fn initialize(&mut self) {
73        match self {
74            PolymorphicEnvelopeBuffer::InMemory(buffer) => buffer.initialize().await,
75            PolymorphicEnvelopeBuffer::Sqlite(buffer) => buffer.initialize().await,
76        }
77    }
78
79    /// Adds an envelope to the buffer.
80    pub async fn push(&mut self, envelope: Box<Envelope>) -> Result<(), EnvelopeBufferError> {
81        relay_statsd::metric!(
82            distribution(RelayDistributions::BufferEnvelopeBodySize) =
83                envelope.items().map(Item::len).sum::<usize>() as u64,
84            partition_id = self.partition_tag()
85        );
86
87        relay_statsd::metric!(
88            timer(RelayTimers::BufferPush),
89            partition_id = self.partition_tag(),
90            {
91                match self {
92                    Self::Sqlite(buffer) => buffer.push(envelope).await,
93                    Self::InMemory(buffer) => buffer.push(envelope).await,
94                }
95            }
96        )
97    }
98
99    /// Returns a reference to the next-in-line envelope.
100    pub async fn peek(&mut self) -> Result<Peek, EnvelopeBufferError> {
101        relay_statsd::metric!(
102            timer(RelayTimers::BufferPeek),
103            partition_id = self.partition_tag(),
104            {
105                match self {
106                    Self::Sqlite(buffer) => buffer.peek().await,
107                    Self::InMemory(buffer) => buffer.peek().await,
108                }
109            }
110        )
111    }
112
113    /// Pops the next-in-line envelope.
114    pub async fn pop(&mut self) -> Result<Option<Box<Envelope>>, EnvelopeBufferError> {
115        relay_statsd::metric!(
116            timer(RelayTimers::BufferPop),
117            partition_id = self.partition_tag(),
118            {
119                match self {
120                    Self::Sqlite(buffer) => buffer.pop().await,
121                    Self::InMemory(buffer) => buffer.pop().await,
122                }
123            }
124        )
125    }
126
127    /// Marks a project as ready or not ready.
128    ///
129    /// The buffer re-prioritizes its envelopes based on this information.
130    /// Returns `true` if at least one priority was changed.
131    pub fn mark_ready(&mut self, project: &ProjectKey, is_ready: bool) -> bool {
132        relay_log::trace!(
133            project_key = project.as_str(),
134            "buffer marked {}",
135            if is_ready { "ready" } else { "not ready" }
136        );
137        match self {
138            Self::Sqlite(buffer) => buffer.mark_ready(project, is_ready),
139            Self::InMemory(buffer) => buffer.mark_ready(project, is_ready),
140        }
141    }
142
143    /// Marks a stack as seen.
144    ///
145    /// Non-ready stacks are deprioritized when they are marked as seen, such that
146    /// the next call to `.peek()` will look at a different stack. This prevents
147    /// head-of-line blocking.
148    pub fn mark_seen(&mut self, project_key_pair: &ProjectKeyPair, next_fetch: Duration) {
149        match self {
150            Self::Sqlite(buffer) => buffer.mark_seen(project_key_pair, next_fetch),
151            Self::InMemory(buffer) => buffer.mark_seen(project_key_pair, next_fetch),
152        }
153    }
154
155    /// Returns `true` whether the buffer has capacity to accept new [`Envelope`]s.
156    pub fn has_capacity(&self) -> bool {
157        match self {
158            Self::Sqlite(buffer) => buffer.has_capacity(),
159            Self::InMemory(buffer) => buffer.has_capacity(),
160        }
161    }
162
163    /// Returns the total number of envelopes that have been spooled since the startup. It does
164    /// not include the count that existed in a persistent spooler before.
165    pub fn item_count(&self) -> u64 {
166        match self {
167            Self::Sqlite(buffer) => buffer.tracked_count,
168            Self::InMemory(buffer) => buffer.tracked_count,
169        }
170    }
171
172    /// Returns the total number of bytes that the spooler storage uses or `None` if the number
173    /// cannot be reliably determined.
174    pub fn total_size(&self) -> Option<u64> {
175        match self {
176            Self::Sqlite(buffer) => buffer.stack_provider.total_size(),
177            Self::InMemory(buffer) => buffer.stack_provider.total_size(),
178        }
179    }
180
181    /// Shuts down the [`PolymorphicEnvelopeBuffer`].
182    pub async fn shutdown(&mut self) -> bool {
183        // Currently, we want to flush the buffer only for disk, since the in memory implementation
184        // tries to not do anything and pop as many elements as possible within the shutdown
185        // timeout.
186        match self {
187            Self::Sqlite(buffer) if !buffer.stack_provider.ephemeral() => {
188                buffer.flush().await;
189                true
190            }
191            _ => {
192                relay_log::trace!("shutdown procedure not needed");
193                false
194            }
195        }
196    }
197
198    /// Returns the partition tag for this [`PolymorphicEnvelopeBuffer`].
199    fn partition_tag(&self) -> &str {
200        match self {
201            PolymorphicEnvelopeBuffer::InMemory(buffer) => &buffer.partition_tag,
202            PolymorphicEnvelopeBuffer::Sqlite(buffer) => &buffer.partition_tag,
203        }
204    }
205}
206
207/// Error that occurs while interacting with the envelope buffer.
208#[derive(Debug, thiserror::Error)]
209pub enum EnvelopeBufferError {
210    #[error("sqlite")]
211    SqliteStore(#[from] SqliteEnvelopeStoreError),
212
213    #[error("sqlite")]
214    SqliteStack(#[from] SqliteEnvelopeStackError),
215
216    #[error("failed to push envelope to the buffer")]
217    PushFailed,
218}
219
220impl From<Infallible> for EnvelopeBufferError {
221    fn from(value: Infallible) -> Self {
222        match value {}
223    }
224}
225
226/// An envelope buffer that holds an individual stack for each project/sampling project combination.
227///
228/// Envelope stacks are organized in a priority queue, and are re-prioritized every time an envelope
229/// is pushed, popped, or when a project becomes ready.
230#[derive(Debug)]
231struct EnvelopeBuffer<P: StackProvider> {
232    /// The central priority queue.
233    priority_queue: priority_queue::PriorityQueue<QueueItem<ProjectKeyPair, P::Stack>, Priority>,
234    /// A lookup table to find all stacks involving a project.
235    stacks_by_project: hashbrown::HashMap<ProjectKey, BTreeSet<ProjectKeyPair>>,
236    /// A provider of stacks that provides utilities to create stacks, check their capacity...
237    ///
238    /// This indirection is needed because different stack implementations might need different
239    /// initialization (e.g. a database connection).
240    stack_provider: P,
241    /// The total count of envelopes that the buffer is working with.
242    ///
243    /// Note that this count is not meant to be perfectly accurate since the initialization of the
244    /// count might not succeed if it takes more than a set timeout. For example, if we load the
245    /// count of all envelopes from disk, and it takes more than the time we set, we will mark the
246    /// initial count as 0 and just count incoming and outgoing envelopes from the buffer.
247    total_count: i64,
248    /// The total count of envelopes that the buffer is working with ignoring envelopes that
249    /// were previously stored on disk.
250    ///
251    /// On startup this will always be 0 and will only count incoming envelopes. If a reliable
252    /// count of currently buffered envelopes is required, prefer this over `total_count`
253    tracked_count: u64,
254    /// Whether the count initialization succeeded or not.
255    ///
256    /// This boolean is just used for tagging the metric that tracks the total count of envelopes
257    /// in the buffer.
258    total_count_initialized: bool,
259    /// The tag value of this partition which is used for reporting purposes.
260    partition_tag: String,
261}
262
263impl EnvelopeBuffer<MemoryStackProvider> {
264    /// Creates an empty memory-based buffer.
265    pub fn new(partition_id: u8, memory_checker: MemoryChecker) -> Self {
266        Self {
267            stacks_by_project: Default::default(),
268            priority_queue: Default::default(),
269            stack_provider: MemoryStackProvider::new(memory_checker),
270            total_count: 0,
271            tracked_count: 0,
272            total_count_initialized: false,
273            partition_tag: partition_id.to_string(),
274        }
275    }
276}
277
278#[allow(dead_code)]
279impl EnvelopeBuffer<SqliteStackProvider> {
280    /// Creates an empty sqlite-based buffer.
281    pub async fn new(partition_id: u8, config: &Config) -> Result<Self, EnvelopeBufferError> {
282        Ok(Self {
283            stacks_by_project: Default::default(),
284            priority_queue: Default::default(),
285            stack_provider: SqliteStackProvider::new(partition_id, config).await?,
286            total_count: 0,
287            tracked_count: 0,
288            total_count_initialized: false,
289            partition_tag: partition_id.to_string(),
290        })
291    }
292}
293
294impl<P: StackProvider> EnvelopeBuffer<P>
295where
296    EnvelopeBufferError: From<<P::Stack as EnvelopeStack>::Error>,
297{
298    /// Initializes the [`EnvelopeBuffer`] given the initialization state from the
299    /// [`StackProvider`].
300    pub async fn initialize(&mut self) {
301        relay_statsd::metric!(
302            timer(RelayTimers::BufferInitialization),
303            partition_id = &self.partition_tag,
304            {
305                let initialization_state = self.stack_provider.initialize().await;
306                self.load_stacks(initialization_state.project_key_pairs)
307                    .await;
308                self.load_store_total_count().await;
309            }
310        );
311    }
312
313    /// Pushes an envelope to the appropriate envelope stack and re-prioritizes the stack.
314    ///
315    /// If the envelope stack does not exist, a new stack is pushed to the priority queue.
316    /// The priority of the stack is updated with the envelope's received_at time.
317    pub async fn push(&mut self, envelope: Box<Envelope>) -> Result<(), EnvelopeBufferError> {
318        let received_at = envelope.received_at();
319
320        let project_key_pair = ProjectKeyPair::from_envelope(&envelope);
321        if let Some((
322            QueueItem {
323                key: _,
324                value: stack,
325            },
326            _,
327        )) = self.priority_queue.get_mut(&project_key_pair)
328        {
329            stack.push(envelope).await?;
330        } else {
331            // Since we have initialization code that creates all the necessary stacks, we assume
332            // that any new stack that is added during the envelope buffer's lifecycle, is recreated.
333            self.push_stack(
334                StackCreationType::New,
335                ProjectKeyPair::from_envelope(&envelope),
336                Some(envelope),
337            )
338            .await?;
339        }
340        self.priority_queue
341            .change_priority_by(&project_key_pair, |prio| {
342                prio.received_at = received_at;
343            });
344
345        self.total_count += 1;
346        self.tracked_count += 1;
347        self.track_total_count();
348
349        Ok(())
350    }
351
352    /// Returns a reference to the next-in-line envelope, if one exists.
353    pub async fn peek(&mut self) -> Result<Peek, EnvelopeBufferError> {
354        let Some((
355            QueueItem {
356                key: project_key_pair,
357                value: stack,
358            },
359            Priority {
360                readiness,
361                next_project_fetch,
362                ..
363            },
364        )) = self.priority_queue.peek_mut()
365        else {
366            return Ok(Peek::Empty);
367        };
368
369        let ready = readiness.ready();
370
371        Ok(match (stack.peek().await?, ready) {
372            (None, _) => Peek::Empty,
373            (Some(last_received_at), true) => Peek::Ready {
374                project_key_pair: *project_key_pair,
375                last_received_at,
376            },
377            (Some(last_received_at), false) => Peek::NotReady {
378                project_key_pair: *project_key_pair,
379                next_project_fetch: *next_project_fetch,
380                last_received_at,
381            },
382        })
383    }
384
385    /// Returns the next-in-line envelope, if one exists.
386    ///
387    /// The priority of the envelope's stack is updated with the next envelope's received_at
388    /// time. If the stack is empty after popping, it is removed from the priority queue.
389    pub async fn pop(&mut self) -> Result<Option<Box<Envelope>>, EnvelopeBufferError> {
390        let Some((QueueItem { key, value: stack }, _)) = self.priority_queue.peek_mut() else {
391            return Ok(None);
392        };
393        let project_key_pair = *key;
394        let envelope = stack.pop().await?.expect("found an empty stack");
395
396        let last_received_at = stack.peek().await?;
397
398        match last_received_at {
399            None => {
400                self.pop_stack(project_key_pair);
401            }
402            Some(last_received_at) => {
403                self.priority_queue
404                    .change_priority_by(&project_key_pair, |prio| {
405                        prio.received_at = last_received_at;
406                    });
407            }
408        }
409
410        // We are fine with the count going negative, since it represents that more data was popped,
411        // than it was initially counted, meaning that we had a wrong total count from
412        // initialization.
413        self.total_count -= 1;
414        self.tracked_count = self.tracked_count.saturating_sub(1);
415        self.track_total_count();
416
417        Ok(Some(envelope))
418    }
419
420    /// Re-prioritizes all stacks that involve the given project key by setting it to "ready".
421    ///
422    /// Returns `true` if at least one priority was changed.
423    pub fn mark_ready(&mut self, project: &ProjectKey, is_ready: bool) -> bool {
424        let mut changed = false;
425        if let Some(project_key_pairs) = self.stacks_by_project.get(project) {
426            for project_key_pair in project_key_pairs {
427                self.priority_queue
428                    .change_priority_by(project_key_pair, |stack| {
429                        let mut found = false;
430                        for (subkey, readiness) in [
431                            (
432                                project_key_pair.own_key,
433                                &mut stack.readiness.own_project_ready,
434                            ),
435                            (
436                                project_key_pair.sampling_key,
437                                &mut stack.readiness.sampling_project_ready,
438                            ),
439                        ] {
440                            if subkey == *project {
441                                found = true;
442                                if *readiness != is_ready {
443                                    changed = true;
444                                    *readiness = is_ready;
445                                }
446                            }
447                        }
448                        debug_assert!(found);
449                    });
450            }
451        }
452
453        changed
454    }
455
456    /// Marks a stack as seen.
457    ///
458    /// Non-ready stacks are deprioritized when they are marked as seen, such that
459    /// the next call to `.peek()` will look at a different stack. This prevents
460    /// head-of-line blocking.
461    pub fn mark_seen(&mut self, project_key_pair: &ProjectKeyPair, next_fetch: Duration) {
462        self.priority_queue
463            .change_priority_by(project_key_pair, |stack| {
464                // We use the next project fetch to debounce project fetching and avoid head of
465                // line blocking of non-ready stacks.
466                stack.next_project_fetch = Instant::now() + next_fetch;
467            });
468    }
469
470    /// Returns `true` if the underlying storage has the capacity to store more envelopes.
471    pub fn has_capacity(&self) -> bool {
472        self.stack_provider.has_store_capacity()
473    }
474
475    /// Flushes the envelope buffer.
476    pub async fn flush(&mut self) {
477        let priority_queue = mem::take(&mut self.priority_queue);
478        self.stack_provider
479            .flush(priority_queue.into_iter().map(|(q, _)| q.value))
480            .await;
481    }
482
483    /// Pushes a new [`EnvelopeStack`] with the given [`Envelope`] inserted.
484    async fn push_stack(
485        &mut self,
486        stack_creation_type: StackCreationType,
487        project_key_pair: ProjectKeyPair,
488        envelope: Option<Box<Envelope>>,
489    ) -> Result<(), EnvelopeBufferError> {
490        let received_at = envelope.as_ref().map_or(Utc::now(), |e| e.received_at());
491
492        let mut stack = self
493            .stack_provider
494            .create_stack(stack_creation_type, project_key_pair);
495        if let Some(envelope) = envelope {
496            stack.push(envelope).await?;
497        }
498
499        let previous_entry = self.priority_queue.push(
500            QueueItem {
501                key: project_key_pair,
502                value: stack,
503            },
504            Priority::new(received_at),
505        );
506        debug_assert!(previous_entry.is_none());
507        for project_key in project_key_pair.iter() {
508            self.stacks_by_project
509                .entry(project_key)
510                .or_default()
511                .insert(project_key_pair);
512        }
513        relay_statsd::metric!(
514            gauge(RelayGauges::BufferStackCount) = self.priority_queue.len() as u64,
515            partition_id = &self.partition_tag
516        );
517
518        Ok(())
519    }
520
521    /// Pops an [`EnvelopeStack`] with the supplied [`EnvelopeBufferError`].
522    fn pop_stack(&mut self, project_key_pair: ProjectKeyPair) {
523        for project_key in project_key_pair.iter() {
524            self.stacks_by_project
525                .get_mut(&project_key)
526                .expect("project_key is missing from lookup")
527                .remove(&project_key_pair);
528        }
529        self.priority_queue.remove(&project_key_pair);
530
531        relay_statsd::metric!(
532            gauge(RelayGauges::BufferStackCount) = self.priority_queue.len() as u64,
533            partition_id = &self.partition_tag
534        );
535    }
536
537    /// Creates all the [`EnvelopeStack`]s with no data given a set of [`ProjectKeyPair`].
538    async fn load_stacks(&mut self, project_key_pairs: HashSet<ProjectKeyPair>) {
539        for project_key_pair in project_key_pairs {
540            self.push_stack(StackCreationType::Initialization, project_key_pair, None)
541                .await
542                .expect("Pushing an empty stack raised an error");
543        }
544    }
545
546    /// Loads the total count from the store if it takes less than a specified duration.
547    ///
548    /// The total count returned by the store is related to the count of elements that the buffer
549    /// will process, besides the count of elements that will be added and removed during its
550    /// lifecycle
551    async fn load_store_total_count(&mut self) {
552        let total_count = timeout(Duration::from_secs(1), async {
553            self.stack_provider.store_total_count().await
554        })
555        .await;
556        match total_count {
557            Ok(total_count) => {
558                self.total_count = total_count as i64;
559                self.total_count_initialized = true;
560            }
561            Err(error) => {
562                self.total_count_initialized = false;
563                relay_log::error!(
564                    error = &error as &dyn Error,
565                    "failed to load the total envelope count of the store",
566                );
567            }
568        };
569        self.track_total_count();
570    }
571
572    /// Emits a metric to track the total count of envelopes that are in the envelope buffer.
573    fn track_total_count(&self) {
574        let total_count = self.total_count as f64;
575        let initialized = match self.total_count_initialized {
576            true => "true",
577            false => "false",
578        };
579        relay_statsd::metric!(
580            gauge(RelayGauges::BufferEnvelopesCount) = total_count,
581            initialized = initialized,
582            stack_type = self.stack_provider.stack_type(),
583            partition_id = &self.partition_tag
584        );
585    }
586}
587
588/// Contains the state of the first element in the buffer.
589pub enum Peek {
590    Empty,
591    Ready {
592        project_key_pair: ProjectKeyPair,
593        last_received_at: DateTime<Utc>,
594    },
595    NotReady {
596        project_key_pair: ProjectKeyPair,
597        next_project_fetch: Instant,
598        last_received_at: DateTime<Utc>,
599    },
600}
601
602impl Peek {
603    pub fn last_received_at(&self) -> Option<DateTime<Utc>> {
604        match self {
605            Self::Empty => None,
606            Self::Ready {
607                last_received_at, ..
608            }
609            | Self::NotReady {
610                last_received_at, ..
611            } => Some(*last_received_at),
612        }
613    }
614}
615
/// An entry of the priority queue: a `value` (the stack) identified by its `key`.
///
/// Equality, hashing and [`Borrow`](std::borrow::Borrow) only consider the `key`, so the queue
/// can be queried with a bare key (e.g. a [`ProjectKeyPair`]).
#[derive(Debug)]
struct QueueItem<K, V> {
    /// Identity of the entry; the only field used for equality and hashing.
    key: K,
    /// Payload stored alongside the key.
    value: V,
}

impl<K, V> std::borrow::Borrow<K> for QueueItem<K, V> {
    fn borrow(&self) -> &K {
        &self.key
    }
}

impl<K: std::hash::Hash, V> std::hash::Hash for QueueItem<K, V> {
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        // Must hash exactly like `K` so that lookups through `Borrow<K>` find the entry.
        self.key.hash(state);
    }
}

impl<K: PartialEq, V> PartialEq for QueueItem<K, V> {
    fn eq(&self, other: &Self) -> bool {
        self.key == other.key
    }
}

// `Eq` promises a reflexive equality, which only holds if `K` itself is `Eq`.
impl<K: Eq, V> Eq for QueueItem<K, V> {}
641
642#[derive(Debug, Clone)]
643struct Priority {
644    readiness: Readiness,
645    received_at: DateTime<Utc>,
646    next_project_fetch: Instant,
647}
648
649impl Priority {
650    fn new(received_at: DateTime<Utc>) -> Self {
651        Self {
652            readiness: Readiness::new(),
653            received_at,
654            next_project_fetch: Instant::now(),
655        }
656    }
657}
658
659impl Ord for Priority {
660    fn cmp(&self, other: &Self) -> Ordering {
661        match (self.readiness.ready(), other.readiness.ready()) {
662            // Assuming that two priorities differ only w.r.t. the `last_peek`, we want to prioritize
663            // stacks that were the least recently peeked. The rationale behind this is that we want
664            // to keep cycling through different stacks while peeking.
665            (true, true) => self.received_at.cmp(&other.received_at),
666            (true, false) => Ordering::Greater,
667            (false, true) => Ordering::Less,
668            // For non-ready stacks, we invert the priority, such that projects that are not
669            // ready and did not receive envelopes recently can be evicted.
670            (false, false) => self
671                .next_project_fetch
672                .cmp(&other.next_project_fetch)
673                .reverse()
674                .then(self.received_at.cmp(&other.received_at).reverse()),
675        }
676    }
677}
678
679impl PartialOrd for Priority {
680    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
681        Some(self.cmp(other))
682    }
683}
684
685impl PartialEq for Priority {
686    fn eq(&self, other: &Self) -> bool {
687        self.cmp(other).is_eq()
688    }
689}
690
691impl Eq for Priority {}
692
/// Readiness flags of the two projects (own and sampling) involved in a stack.
#[derive(Debug, Clone, Copy)]
struct Readiness {
    /// Whether the project the envelopes belong to is ready.
    own_project_ready: bool,
    /// Whether the sampling project is ready.
    sampling_project_ready: bool,
}

impl Readiness {
    /// Returns a readiness with both projects marked as ready.
    ///
    /// The ready state is set optimistically: the large majority of stack creations are
    /// re-creations after a stack was emptied.
    fn new() -> Self {
        Self {
            own_project_ready: true,
            sampling_project_ready: true,
        }
    }

    /// Returns `true` only if both the own project and the sampling project are ready.
    fn ready(&self) -> bool {
        let Self {
            own_project_ready,
            sampling_project_ready,
        } = *self;
        own_project_ready && sampling_project_ready
    }
}
713
714#[cfg(test)]
715mod tests {
716    use relay_common::Dsn;
717    use relay_event_schema::protocol::EventId;
718    use relay_sampling::DynamicSamplingContext;
719    use std::str::FromStr;
720    use std::sync::Arc;
721    use uuid::Uuid;
722
723    use crate::SqliteEnvelopeStore;
724    use crate::envelope::{Item, ItemType};
725    use crate::extractors::RequestMeta;
726    use crate::services::buffer::common::ProjectKeyPair;
727    use crate::services::buffer::envelope_store::sqlite::DatabaseEnvelope;
728    use crate::services::buffer::testutils::utils::mock_envelopes;
729    use crate::utils::MemoryStat;
730
731    use super::*;
732
733    impl Peek {
734        fn is_empty(&self) -> bool {
735            matches!(self, Peek::Empty)
736        }
737    }
738
739    fn new_envelope(
740        own_key: ProjectKey,
741        sampling_key: Option<ProjectKey>,
742        event_id: Option<EventId>,
743    ) -> Box<Envelope> {
744        let mut envelope = Envelope::from_request(
745            None,
746            RequestMeta::new(Dsn::from_str(&format!("http://{own_key}@localhost/1")).unwrap()),
747        );
748        if let Some(sampling_key) = sampling_key {
749            envelope.set_dsc(DynamicSamplingContext {
750                public_key: sampling_key,
751                trace_id: "67e5504410b1426f9247bb680e5fe0c8".parse().unwrap(),
752                release: None,
753                user: Default::default(),
754                replay_id: None,
755                environment: None,
756                transaction: None,
757                sample_rate: None,
758                sampled: None,
759                other: Default::default(),
760            });
761            envelope.add_item(Item::new(ItemType::Transaction));
762        }
763        if let Some(event_id) = event_id {
764            envelope.set_event_id(event_id);
765        }
766        envelope
767    }
768
769    fn mock_config(path: &str) -> Arc<Config> {
770        Config::from_json_value(serde_json::json!({
771            "spool": {
772                "envelopes": {
773                    "path": path
774                }
775            }
776        }))
777        .unwrap()
778        .into()
779    }
780
781    fn mock_memory_checker() -> MemoryChecker {
782        MemoryChecker::new(MemoryStat::default(), mock_config("my/db/path").clone())
783    }
784
785    async fn peek_received_at(buffer: &mut EnvelopeBuffer<MemoryStackProvider>) -> DateTime<Utc> {
786        buffer.peek().await.unwrap().last_received_at().unwrap()
787    }
788
789    #[tokio::test]
790    async fn test_insert_pop() {
791        let mut buffer = EnvelopeBuffer::<MemoryStackProvider>::new(0, mock_memory_checker());
792
793        let project_key1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap();
794        let project_key2 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
795        let project_key3 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fef").unwrap();
796
797        assert!(buffer.pop().await.unwrap().is_none());
798        assert!(buffer.peek().await.unwrap().is_empty());
799
800        let envelope1 = new_envelope(project_key1, None, None);
801        let time1 = envelope1.meta().received_at();
802        buffer.push(envelope1).await.unwrap();
803
804        let envelope2 = new_envelope(project_key2, None, None);
805        let time2 = envelope2.meta().received_at();
806        buffer.push(envelope2).await.unwrap();
807
808        // Both projects are ready, so project 2 is on top (has the newest envelopes):
809        assert_eq!(peek_received_at(&mut buffer).await, time2);
810
811        buffer.mark_ready(&project_key1, false);
812        buffer.mark_ready(&project_key2, false);
813
814        // Both projects are not ready, so project 1 is on top (has the oldest envelopes):
815        assert_eq!(peek_received_at(&mut buffer).await, time1);
816
817        let envelope3 = new_envelope(project_key3, None, None);
818        let time3 = envelope3.meta().received_at();
819        buffer.push(envelope3).await.unwrap();
820        buffer.mark_ready(&project_key3, false);
821
822        // All projects are not ready, so project 1 is on top (has the oldest envelopes):
823        assert_eq!(peek_received_at(&mut buffer).await, time1);
824
825        // After marking a project ready, it goes to the top:
826        buffer.mark_ready(&project_key3, true);
827        assert_eq!(peek_received_at(&mut buffer).await, time3);
828        assert_eq!(
829            buffer.pop().await.unwrap().unwrap().meta().public_key(),
830            project_key3
831        );
832
833        // After popping, project 1 is on top again:
834        assert_eq!(peek_received_at(&mut buffer).await, time1);
835
836        // Mark project 1 as ready (still on top):
837        buffer.mark_ready(&project_key1, true);
838        assert_eq!(peek_received_at(&mut buffer).await, time1);
839
840        // Mark project 2 as ready as well (now on top because most recent):
841        buffer.mark_ready(&project_key2, true);
842        assert_eq!(peek_received_at(&mut buffer).await, time2);
843        assert_eq!(
844            buffer.pop().await.unwrap().unwrap().meta().public_key(),
845            project_key2
846        );
847
848        // Pop last element:
849        assert_eq!(
850            buffer.pop().await.unwrap().unwrap().meta().public_key(),
851            project_key1
852        );
853        assert!(buffer.pop().await.unwrap().is_none());
854        assert!(buffer.peek().await.unwrap().is_empty());
855    }
856
857    #[tokio::test]
858    async fn test_project_internal_order() {
859        let mut buffer = EnvelopeBuffer::<MemoryStackProvider>::new(0, mock_memory_checker());
860
861        let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap();
862
863        let envelope1 = new_envelope(project_key, None, None);
864        let time1 = envelope1.meta().received_at();
865        let envelope2 = new_envelope(project_key, None, None);
866        let time2 = envelope2.meta().received_at();
867
868        assert!(time2 > time1);
869
870        buffer.push(envelope1).await.unwrap();
871        buffer.push(envelope2).await.unwrap();
872
873        assert_eq!(
874            buffer.pop().await.unwrap().unwrap().meta().received_at(),
875            time2
876        );
877        assert_eq!(
878            buffer.pop().await.unwrap().unwrap().meta().received_at(),
879            time1
880        );
881        assert!(buffer.pop().await.unwrap().is_none());
882    }
883
884    #[tokio::test]
885    async fn test_sampling_projects() {
886        let mut buffer = EnvelopeBuffer::<MemoryStackProvider>::new(0, mock_memory_checker());
887
888        let project_key1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap();
889        let project_key2 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fef").unwrap();
890
891        let envelope1 = new_envelope(project_key1, None, None);
892        let time1 = envelope1.received_at();
893        buffer.push(envelope1).await.unwrap();
894
895        let envelope2 = new_envelope(project_key2, None, None);
896        let time2 = envelope2.received_at();
897        buffer.push(envelope2).await.unwrap();
898
899        let envelope3 = new_envelope(project_key1, Some(project_key2), None);
900        let time3 = envelope3.meta().received_at();
901        buffer.push(envelope3).await.unwrap();
902
903        buffer.mark_ready(&project_key1, false);
904        buffer.mark_ready(&project_key2, false);
905
906        // Nothing is ready, instant1 is on top:
907        assert_eq!(
908            buffer.peek().await.unwrap().last_received_at().unwrap(),
909            time1
910        );
911
912        // Mark project 2 ready, gets on top:
913        buffer.mark_ready(&project_key2, true);
914        assert_eq!(
915            buffer.peek().await.unwrap().last_received_at().unwrap(),
916            time2
917        );
918
919        // Revert
920        buffer.mark_ready(&project_key2, false);
921        assert_eq!(
922            buffer.peek().await.unwrap().last_received_at().unwrap(),
923            time1
924        );
925
926        // Project 1 ready:
927        buffer.mark_ready(&project_key1, true);
928        assert_eq!(
929            buffer.peek().await.unwrap().last_received_at().unwrap(),
930            time1
931        );
932
933        // when both projects are ready, event no 3 ends up on top:
934        buffer.mark_ready(&project_key2, true);
935        assert_eq!(
936            buffer.pop().await.unwrap().unwrap().meta().received_at(),
937            time3
938        );
939        assert_eq!(
940            buffer.peek().await.unwrap().last_received_at().unwrap(),
941            time2
942        );
943
944        buffer.mark_ready(&project_key2, false);
945        assert_eq!(buffer.pop().await.unwrap().unwrap().received_at(), time1);
946        assert_eq!(buffer.pop().await.unwrap().unwrap().received_at(), time2);
947
948        assert!(buffer.pop().await.unwrap().is_none());
949    }
950
951    #[tokio::test]
952    async fn test_project_keys_distinct() {
953        let project_key1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap();
954        let project_key2 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fef").unwrap();
955
956        let project_key_pair1 = ProjectKeyPair::new(project_key1, project_key2);
957        let project_key_pair2 = ProjectKeyPair::new(project_key2, project_key1);
958
959        assert_ne!(project_key_pair1, project_key_pair2);
960
961        let mut buffer = EnvelopeBuffer::<MemoryStackProvider>::new(0, mock_memory_checker());
962        buffer
963            .push(new_envelope(project_key1, Some(project_key2), None))
964            .await
965            .unwrap();
966        buffer
967            .push(new_envelope(project_key2, Some(project_key1), None))
968            .await
969            .unwrap();
970        assert_eq!(buffer.priority_queue.len(), 2);
971    }
972
973    #[test]
974    fn test_total_order() {
975        let p1 = Priority {
976            readiness: Readiness {
977                own_project_ready: true,
978                sampling_project_ready: true,
979            },
980            received_at: Utc::now(),
981            next_project_fetch: Instant::now(),
982        };
983        let mut p2 = p1.clone();
984        p2.next_project_fetch += Duration::from_millis(1);
985
986        // Last peek does not matter because project is ready:
987        assert_eq!(p1.cmp(&p2), Ordering::Equal);
988        assert_eq!(p1, p2);
989    }
990
991    #[tokio::test]
992    async fn test_last_peek_internal_order() {
993        let mut buffer = EnvelopeBuffer::<MemoryStackProvider>::new(0, mock_memory_checker());
994
995        let project_key_1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap();
996        let event_id_1 = EventId::new();
997        let envelope1 = new_envelope(project_key_1, None, Some(event_id_1));
998        let time1 = envelope1.received_at();
999
1000        let project_key_2 = ProjectKey::parse("b56ae32be2584e0bbd7a4cbb95971fed").unwrap();
1001        let event_id_2 = EventId::new();
1002        let envelope2 = new_envelope(project_key_2, None, Some(event_id_2));
1003        let time2 = envelope2.received_at();
1004
1005        buffer.push(envelope1).await.unwrap();
1006        buffer.push(envelope2).await.unwrap();
1007
1008        buffer.mark_ready(&project_key_1, false);
1009        buffer.mark_ready(&project_key_2, false);
1010
1011        // event_id_1 is first element:
1012        let Peek::NotReady {
1013            last_received_at, ..
1014        } = buffer.peek().await.unwrap()
1015        else {
1016            panic!();
1017        };
1018        assert_eq!(last_received_at, time1);
1019
1020        // Second peek returns same element:
1021        let Peek::NotReady {
1022            last_received_at,
1023            project_key_pair,
1024            ..
1025        } = buffer.peek().await.unwrap()
1026        else {
1027            panic!();
1028        };
1029        assert_eq!(last_received_at, time1);
1030        assert_ne!(last_received_at, time2);
1031
1032        buffer.mark_seen(&project_key_pair, Duration::ZERO);
1033
1034        // After mark_seen, event 2 is on top:
1035        let Peek::NotReady {
1036            last_received_at, ..
1037        } = buffer.peek().await.unwrap()
1038        else {
1039            panic!();
1040        };
1041        assert_eq!(last_received_at, time2);
1042        assert_ne!(last_received_at, time1);
1043
1044        let Peek::NotReady {
1045            last_received_at,
1046            project_key_pair,
1047            ..
1048        } = buffer.peek().await.unwrap()
1049        else {
1050            panic!();
1051        };
1052        assert_eq!(last_received_at, time2);
1053        assert_ne!(last_received_at, time1);
1054
1055        buffer.mark_seen(&project_key_pair, Duration::ZERO);
1056
1057        // After another mark_seen, cycle back to event 1:
1058        let Peek::NotReady {
1059            last_received_at, ..
1060        } = buffer.peek().await.unwrap()
1061        else {
1062            panic!();
1063        };
1064        assert_eq!(last_received_at, time1);
1065        assert_ne!(last_received_at, time2);
1066    }
1067
1068    #[tokio::test]
1069    async fn test_initialize_buffer() {
1070        let path = std::env::temp_dir()
1071            .join(Uuid::new_v4().to_string())
1072            .into_os_string()
1073            .into_string()
1074            .unwrap();
1075        let config = mock_config(&path);
1076        let mut store = SqliteEnvelopeStore::prepare(0, &config).await.unwrap();
1077        let mut buffer = EnvelopeBuffer::<SqliteStackProvider>::new(0, &config)
1078            .await
1079            .unwrap();
1080
1081        // We write 5 envelopes to disk so that we can check if they are loaded. These envelopes
1082        // belong to the same project keys, so they belong to the same envelope stack.
1083        let envelopes = mock_envelopes(10);
1084        assert!(
1085            store
1086                .insert_batch(
1087                    envelopes
1088                        .into_iter()
1089                        .map(|e| DatabaseEnvelope::try_from(e.as_ref()).unwrap())
1090                        .collect::<Vec<_>>()
1091                        .try_into()
1092                        .unwrap()
1093                )
1094                .await
1095                .is_ok()
1096        );
1097
1098        // We assume that the buffer is empty.
1099        assert!(buffer.priority_queue.is_empty());
1100        assert!(buffer.stacks_by_project.is_empty());
1101
1102        buffer.initialize().await;
1103
1104        // We assume that we loaded only 1 envelope stack, because of the project keys combinations
1105        // of the envelopes we inserted above.
1106        assert_eq!(buffer.priority_queue.len(), 1);
1107        // We expect to have an entry per project key, since we have 1 pair, the total entries
1108        // should be 2.
1109        assert_eq!(buffer.stacks_by_project.len(), 2);
1110    }
1111}