relay_server/services/buffer/stack_provider/
sqlite.rs

1use std::error::Error;
2
3use relay_config::Config;
4
5use crate::services::buffer::common::ProjectKeyPair;
6use crate::services::buffer::envelope_stack::caching::CachingEnvelopeStack;
7use crate::services::buffer::envelope_store::sqlite::{
8    SqliteEnvelopeStore, SqliteEnvelopeStoreError,
9};
10use crate::services::buffer::stack_provider::{
11    InitializationState, StackCreationType, StackProvider,
12};
13use crate::statsd::RelayTimers;
14use crate::{EnvelopeStack, SqliteEnvelopeStack};
15
16#[derive(Debug)]
17pub struct SqliteStackProvider {
18    envelope_store: SqliteEnvelopeStore,
19    batch_size_bytes: usize,
20    max_disk_size: usize,
21    partition_id: u8,
22    ephemeral: bool,
23}
24
25#[warn(dead_code)]
26impl SqliteStackProvider {
27    /// Creates a new [`SqliteStackProvider`] from the provided [`Config`].
28    pub async fn new(partition_id: u8, config: &Config) -> Result<Self, SqliteEnvelopeStoreError> {
29        let envelope_store = SqliteEnvelopeStore::prepare(partition_id, config).await?;
30        Ok(Self {
31            envelope_store,
32            batch_size_bytes: config.spool_envelopes_batch_size_bytes(),
33            max_disk_size: config.spool_envelopes_max_disk_size(),
34            partition_id,
35            ephemeral: config.spool_ephemeral(),
36        })
37    }
38
39    /// Returns `true` if data is stored on non-persistent disks.
40    pub fn ephemeral(&self) -> bool {
41        self.ephemeral
42    }
43
44    /// Returns `true` when there might be data residing on disk, `false` otherwise.
45    fn assume_data_on_disk(stack_creation_type: StackCreationType) -> bool {
46        matches!(stack_creation_type, StackCreationType::Initialization)
47    }
48}
49
50impl StackProvider for SqliteStackProvider {
51    type Stack = CachingEnvelopeStack<SqliteEnvelopeStack>;
52
53    async fn initialize(&self) -> InitializationState {
54        match self.envelope_store.project_key_pairs().await {
55            Ok(project_key_pairs) => InitializationState::new(project_key_pairs),
56            Err(error) => {
57                relay_log::error!(
58                    error = &error as &dyn Error,
59                    "failed to initialize the sqlite stack provider"
60                );
61                InitializationState::empty()
62            }
63        }
64    }
65
66    fn create_stack(
67        &self,
68        stack_creation_type: StackCreationType,
69        project_key_pair: ProjectKeyPair,
70    ) -> Self::Stack {
71        let inner = SqliteEnvelopeStack::new(
72            self.partition_id,
73            self.envelope_store.clone(),
74            self.batch_size_bytes,
75            project_key_pair.own_key,
76            project_key_pair.sampling_key,
77            // We want to check the disk by default if we are creating the stack for the first time,
78            // since we might have some data on disk.
79            // On the other hand, if we are recreating a stack, it means that we popped it because
80            // it was empty, or we never had data on disk for that stack, so we assume by default
81            // that there is no need to check disk until some data is spooled.
82            Self::assume_data_on_disk(stack_creation_type),
83        );
84
85        CachingEnvelopeStack::new(inner)
86    }
87
88    fn has_store_capacity(&self) -> bool {
89        (self.envelope_store.usage() as usize) < self.max_disk_size
90    }
91
92    async fn store_total_count(&self) -> u64 {
93        self.envelope_store
94            .total_count()
95            .await
96            .unwrap_or_else(|error| {
97                relay_log::error!(
98                    error = &error as &dyn Error,
99                    "failed to get the total count of envelopes for the sqlite envelope store",
100                );
101                // In case we have an error, we default to communicating a total count of 0.
102                0
103            })
104    }
105
106    fn total_size(&self) -> Option<u64> {
107        Some(self.envelope_store.usage())
108    }
109
110    fn stack_type<'a>(&self) -> &'a str {
111        "sqlite"
112    }
113
114    async fn flush(&mut self, envelope_stacks: impl IntoIterator<Item = Self::Stack>) {
115        relay_log::trace!("Flushing sqlite envelope buffer");
116
117        let partition_tag = self.partition_id.to_string();
118        relay_statsd::metric!(
119            timer(RelayTimers::BufferDrain),
120            partition_id = &partition_tag,
121            {
122                for envelope_stack in envelope_stacks {
123                    envelope_stack.flush().await;
124                }
125            }
126        );
127    }
128}
129
130#[cfg(test)]
131mod tests {
132    use std::sync::Arc;
133
134    use relay_base_schema::project::ProjectKey;
135    use relay_config::Config;
136    use uuid::Uuid;
137
138    use crate::EnvelopeStack;
139    use crate::services::buffer::common::ProjectKeyPair;
140    use crate::services::buffer::stack_provider::sqlite::SqliteStackProvider;
141    use crate::services::buffer::stack_provider::{StackCreationType, StackProvider};
142    use crate::services::buffer::testutils::utils::mock_envelopes;
143
144    fn mock_config() -> Arc<Config> {
145        let path = std::env::temp_dir()
146            .join(Uuid::new_v4().to_string())
147            .into_os_string()
148            .into_string()
149            .unwrap();
150
151        Config::from_json_value(serde_json::json!({
152            "spool": {
153                "envelopes": {
154                    "path": path,
155                    "disk_batch_size": 100,
156                    "max_batches": 1,
157                }
158            }
159        }))
160        .unwrap()
161        .into()
162    }
163
164    #[tokio::test]
165    async fn test_flush() {
166        let config = mock_config();
167        let mut stack_provider = SqliteStackProvider::new(0, &config).await.unwrap();
168
169        let own_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
170        let sampling_key = ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap();
171
172        let mut envelope_stack = stack_provider.create_stack(
173            StackCreationType::New,
174            ProjectKeyPair::new(own_key, sampling_key),
175        );
176
177        let envelopes = mock_envelopes(10);
178        for envelope in envelopes {
179            envelope_stack.push(envelope).await.unwrap();
180        }
181
182        let envelope_store = stack_provider.envelope_store.clone();
183
184        // We make sure that no data is on disk since we will spool when more than 100 elements are
185        // in the in-memory stack.
186        assert_eq!(envelope_store.total_count().await.unwrap(), 0);
187
188        // We drain the stack provider, and we expect all in-memory envelopes to be spooled to disk.
189        stack_provider.flush(vec![envelope_stack]).await;
190        assert_eq!(envelope_store.total_count().await.unwrap(), 10);
191    }
192}