relay_server/services/buffer/stack_provider/
sqlite.rs1use std::error::Error;
2
3use relay_config::Config;
4
5use crate::services::buffer::common::ProjectKeyPair;
6use crate::services::buffer::envelope_stack::caching::CachingEnvelopeStack;
7use crate::services::buffer::envelope_store::sqlite::{
8 SqliteEnvelopeStore, SqliteEnvelopeStoreError,
9};
10use crate::services::buffer::stack_provider::{
11 InitializationState, StackCreationType, StackProvider,
12};
13use crate::statsd::RelayTimers;
14use crate::{EnvelopeStack, SqliteEnvelopeStack};
15
16#[derive(Debug)]
17pub struct SqliteStackProvider {
18 envelope_store: SqliteEnvelopeStore,
19 batch_size_bytes: usize,
20 max_disk_size: usize,
21 partition_id: u8,
22 ephemeral: bool,
23}
24
25#[warn(dead_code)]
26impl SqliteStackProvider {
27 pub async fn new(partition_id: u8, config: &Config) -> Result<Self, SqliteEnvelopeStoreError> {
29 let envelope_store = SqliteEnvelopeStore::prepare(partition_id, config).await?;
30 Ok(Self {
31 envelope_store,
32 batch_size_bytes: config.spool_envelopes_batch_size_bytes(),
33 max_disk_size: config.spool_envelopes_max_disk_size(),
34 partition_id,
35 ephemeral: config.spool_ephemeral(),
36 })
37 }
38
39 pub fn ephemeral(&self) -> bool {
41 self.ephemeral
42 }
43
44 fn assume_data_on_disk(stack_creation_type: StackCreationType) -> bool {
46 matches!(stack_creation_type, StackCreationType::Initialization)
47 }
48}
49
50impl StackProvider for SqliteStackProvider {
51 type Stack = CachingEnvelopeStack<SqliteEnvelopeStack>;
52
53 async fn initialize(&self) -> InitializationState {
54 match self.envelope_store.project_key_pairs().await {
55 Ok(project_key_pairs) => InitializationState::new(project_key_pairs),
56 Err(error) => {
57 relay_log::error!(
58 error = &error as &dyn Error,
59 "failed to initialize the sqlite stack provider"
60 );
61 InitializationState::empty()
62 }
63 }
64 }
65
66 fn create_stack(
67 &self,
68 stack_creation_type: StackCreationType,
69 project_key_pair: ProjectKeyPair,
70 ) -> Self::Stack {
71 let inner = SqliteEnvelopeStack::new(
72 self.partition_id,
73 self.envelope_store.clone(),
74 self.batch_size_bytes,
75 project_key_pair.own_key,
76 project_key_pair.sampling_key,
77 Self::assume_data_on_disk(stack_creation_type),
83 );
84
85 CachingEnvelopeStack::new(inner)
86 }
87
88 fn has_store_capacity(&self) -> bool {
89 (self.envelope_store.usage() as usize) < self.max_disk_size
90 }
91
92 async fn store_total_count(&self) -> u64 {
93 self.envelope_store
94 .total_count()
95 .await
96 .unwrap_or_else(|error| {
97 relay_log::error!(
98 error = &error as &dyn Error,
99 "failed to get the total count of envelopes for the sqlite envelope store",
100 );
101 0
103 })
104 }
105
106 fn total_size(&self) -> Option<u64> {
107 Some(self.envelope_store.usage())
108 }
109
110 fn stack_type<'a>(&self) -> &'a str {
111 "sqlite"
112 }
113
114 async fn flush(&mut self, envelope_stacks: impl IntoIterator<Item = Self::Stack>) {
115 relay_log::trace!("Flushing sqlite envelope buffer");
116
117 let partition_tag = self.partition_id.to_string();
118 relay_statsd::metric!(
119 timer(RelayTimers::BufferDrain),
120 partition_id = &partition_tag,
121 {
122 for envelope_stack in envelope_stacks {
123 envelope_stack.flush().await;
124 }
125 }
126 );
127 }
128}
129
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use relay_base_schema::project::ProjectKey;
    use relay_config::Config;
    use uuid::Uuid;

    use crate::EnvelopeStack;
    use crate::services::buffer::common::ProjectKeyPair;
    use crate::services::buffer::stack_provider::sqlite::SqliteStackProvider;
    use crate::services::buffer::stack_provider::{StackCreationType, StackProvider};
    use crate::services::buffer::testutils::utils::mock_envelopes;

    /// Builds a configuration whose envelope spool path is a fresh, randomly named entry inside
    /// the system temporary directory.
    fn mock_config() -> Arc<Config> {
        let spool_path = std::env::temp_dir().join(Uuid::new_v4().to_string());
        let spool_path = spool_path.into_os_string().into_string().unwrap();

        let settings = serde_json::json!({
            "spool": {
                "envelopes": {
                    "path": spool_path,
                    "disk_batch_size": 100,
                    "max_batches": 1,
                }
            }
        });

        Arc::new(Config::from_json_value(settings).unwrap())
    }

    /// Pushing envelopes must not touch the store until the provider is flushed.
    #[tokio::test]
    async fn test_flush() {
        let config = mock_config();
        let mut provider = SqliteStackProvider::new(0, &config).await.unwrap();

        let key_pair = ProjectKeyPair::new(
            ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
            ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(),
        );
        let mut stack = provider.create_stack(StackCreationType::New, key_pair);

        for envelope in mock_envelopes(10) {
            stack.push(envelope).await.unwrap();
        }

        let store = provider.envelope_store.clone();

        // Before the flush the store is expected to contain no envelopes at all.
        assert_eq!(store.total_count().await.unwrap(), 0);

        provider.flush(vec![stack]).await;

        // After the flush all ten pushed envelopes are expected to be in the store.
        assert_eq!(store.total_count().await.unwrap(), 10);
    }
}