relay_server/services/buffer/stack_provider/sqlite.rs

use std::error::Error;

use relay_config::Config;

use crate::services::buffer::common::ProjectKeyPair;
use crate::services::buffer::envelope_stack::caching::CachingEnvelopeStack;
use crate::services::buffer::envelope_store::sqlite::{
    SqliteEnvelopeStore, SqliteEnvelopeStoreError,
};
use crate::services::buffer::stack_provider::{
    InitializationState, StackCreationType, StackProvider,
};
use crate::statsd::RelayTimers;
use crate::{EnvelopeStack, SqliteEnvelopeStack};

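/// A [`StackProvider`] implementation backed by a [`SqliteEnvelopeStore`], which allows the
/// envelope stacks it creates to spool their contents to disk.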
#[derive(Debug)]
pub struct SqliteStackProvider {
    envelope_store: SqliteEnvelopeStore,
    batch_size_bytes: usize,
    max_disk_size: usize,
    partition_id: u8,
}

#[warn(dead_code)]
impl SqliteStackProvider {
    /// Creates a new [`SqliteStackProvider`] from the provided [`Config`].
    pub async fn new(partition_id: u8, config: &Config) -> Result<Self, SqliteEnvelopeStoreError> {
        let envelope_store = SqliteEnvelopeStore::prepare(partition_id, config).await?;
        Ok(Self {
            envelope_store,
            batch_size_bytes: config.spool_envelopes_batch_size_bytes(),
            max_disk_size: config.spool_envelopes_max_disk_size(),
            partition_id,
        })
    }

    /// Returns `true` when there might be data residing on disk, `false` otherwise.
    fn assume_data_on_disk(stack_creation_type: StackCreationType) -> bool {
        matches!(stack_creation_type, StackCreationType::Initialization)
    }
}

impl StackProvider for SqliteStackProvider {
    type Stack = CachingEnvelopeStack<SqliteEnvelopeStack>;

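    /// Seeds the initialization state with the project key pairs that are already spooled on
    /// disk, falling back to an empty state if the store cannot be read.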
    async fn initialize(&self) -> InitializationState {
        match self.envelope_store.project_key_pairs().await {
            Ok(project_key_pairs) => InitializationState::new(project_key_pairs),
            Err(error) => {
                relay_log::error!(
                    error = &error as &dyn Error,
                    "failed to initialize the sqlite stack provider"
                );
                InitializationState::empty()
            }
        }
    }

    fn create_stack(
        &self,
        stack_creation_type: StackCreationType,
        project_key_pair: ProjectKeyPair,
    ) -> Self::Stack {
        let inner = SqliteEnvelopeStack::new(
            self.partition_id,
            self.envelope_store.clone(),
            self.batch_size_bytes,
            project_key_pair.own_key,
            project_key_pair.sampling_key,
            // We want to check the disk by default if we are creating the stack for the first time,
            // since we might have some data on disk.
            // On the other hand, if we are recreating a stack, it means that we popped it because
            // it was empty, or we never had data on disk for that stack, so we assume by default
            // that there is no need to check disk until some data is spooled.
            Self::assume_data_on_disk(stack_creation_type),
        );

        CachingEnvelopeStack::new(inner)
    }

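    /// Reports whether the current on-disk usage is still below the configured maximum disk size.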
    fn has_store_capacity(&self) -> bool {
        (self.envelope_store.usage() as usize) < self.max_disk_size
    }

    async fn store_total_count(&self) -> u64 {
        self.envelope_store
            .total_count()
            .await
            .unwrap_or_else(|error| {
                relay_log::error!(
                    error = &error as &dyn Error,
                    "failed to get the total count of envelopes for the sqlite envelope store",
                );
                // In case we have an error, we default to communicating a total count of 0.
                0
            })
    }

    fn total_size(&self) -> Option<u64> {
        Some(self.envelope_store.usage())
    }

    fn stack_type<'a>(&self) -> &'a str {
        "sqlite"
    }

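    /// Flushes the contents of each provided stack to disk, timing the drain per partition.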
    async fn flush(&mut self, envelope_stacks: impl IntoIterator<Item = Self::Stack>) {
        relay_log::trace!("Flushing sqlite envelope buffer");

        let partition_tag = self.partition_id.to_string();
        relay_statsd::metric!(
            timer(RelayTimers::BufferDrain),
            partition_id = &partition_tag,
            {
                for envelope_stack in envelope_stacks {
                    envelope_stack.flush().await;
                }
            }
        );
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use relay_base_schema::project::ProjectKey;
    use relay_config::Config;
    use uuid::Uuid;

    use crate::EnvelopeStack;
    use crate::services::buffer::common::ProjectKeyPair;
    use crate::services::buffer::stack_provider::sqlite::SqliteStackProvider;
    use crate::services::buffer::stack_provider::{StackCreationType, StackProvider};
    use crate::services::buffer::testutils::utils::mock_envelopes;

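    /// Builds a [`Config`] that points the spool at a unique temporary path, so each test run
    /// gets its own SQLite database.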
    fn mock_config() -> Arc<Config> {
        let path = std::env::temp_dir()
            .join(Uuid::new_v4().to_string())
            .into_os_string()
            .into_string()
            .unwrap();

        Config::from_json_value(serde_json::json!({
            "spool": {
                "envelopes": {
                    "path": path,
                    "disk_batch_size": 100,
                    "max_batches": 1,
                }
            }
        }))
        .unwrap()
        .into()
    }

    #[tokio::test]
    async fn test_flush() {
        let config = mock_config();
        let mut stack_provider = SqliteStackProvider::new(0, &config).await.unwrap();

        let own_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
        let sampling_key = ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap();

        let mut envelope_stack = stack_provider.create_stack(
            StackCreationType::New,
            ProjectKeyPair::new(own_key, sampling_key),
        );

        let envelopes = mock_envelopes(10);
        for envelope in envelopes {
            envelope_stack.push(envelope).await.unwrap();
        }

        let envelope_store = stack_provider.envelope_store.clone();

        // We make sure that no data is on disk since we will spool when more than 100 elements are
        // in the in-memory stack.
        assert_eq!(envelope_store.total_count().await.unwrap(), 0);

        // We drain the stack provider, and we expect all in-memory envelopes to be spooled to disk.
        stack_provider.flush(vec![envelope_stack]).await;
        assert_eq!(envelope_store.total_count().await.unwrap(), 10);
    }
}