relay_server/services/buffer/stack_provider/
sqlite.rs1use std::error::Error;
2
3use relay_config::Config;
4
5use crate::services::buffer::common::ProjectKeyPair;
6use crate::services::buffer::envelope_stack::caching::CachingEnvelopeStack;
7use crate::services::buffer::envelope_store::sqlite::{
8 SqliteEnvelopeStore, SqliteEnvelopeStoreError,
9};
10use crate::services::buffer::stack_provider::{
11 InitializationState, StackCreationType, StackProvider,
12};
13use crate::statsd::RelayTimers;
14use crate::{EnvelopeStack, SqliteEnvelopeStack};
15
16#[derive(Debug)]
17pub struct SqliteStackProvider {
18 envelope_store: SqliteEnvelopeStore,
19 batch_size_bytes: usize,
20 max_disk_size: usize,
21 partition_id: u8,
22}
23
24#[warn(dead_code)]
25impl SqliteStackProvider {
26 pub async fn new(partition_id: u8, config: &Config) -> Result<Self, SqliteEnvelopeStoreError> {
28 let envelope_store = SqliteEnvelopeStore::prepare(partition_id, config).await?;
29 Ok(Self {
30 envelope_store,
31 batch_size_bytes: config.spool_envelopes_batch_size_bytes(),
32 max_disk_size: config.spool_envelopes_max_disk_size(),
33 partition_id,
34 })
35 }
36
37 fn assume_data_on_disk(stack_creation_type: StackCreationType) -> bool {
39 matches!(stack_creation_type, StackCreationType::Initialization)
40 }
41}
42
43impl StackProvider for SqliteStackProvider {
44 type Stack = CachingEnvelopeStack<SqliteEnvelopeStack>;
45
46 async fn initialize(&self) -> InitializationState {
47 match self.envelope_store.project_key_pairs().await {
48 Ok(project_key_pairs) => InitializationState::new(project_key_pairs),
49 Err(error) => {
50 relay_log::error!(
51 error = &error as &dyn Error,
52 "failed to initialize the sqlite stack provider"
53 );
54 InitializationState::empty()
55 }
56 }
57 }
58
59 fn create_stack(
60 &self,
61 stack_creation_type: StackCreationType,
62 project_key_pair: ProjectKeyPair,
63 ) -> Self::Stack {
64 let inner = SqliteEnvelopeStack::new(
65 self.partition_id,
66 self.envelope_store.clone(),
67 self.batch_size_bytes,
68 project_key_pair.own_key,
69 project_key_pair.sampling_key,
70 Self::assume_data_on_disk(stack_creation_type),
76 );
77
78 CachingEnvelopeStack::new(inner)
79 }
80
81 fn has_store_capacity(&self) -> bool {
82 (self.envelope_store.usage() as usize) < self.max_disk_size
83 }
84
85 async fn store_total_count(&self) -> u64 {
86 self.envelope_store
87 .total_count()
88 .await
89 .unwrap_or_else(|error| {
90 relay_log::error!(
91 error = &error as &dyn Error,
92 "failed to get the total count of envelopes for the sqlite envelope store",
93 );
94 0
96 })
97 }
98
99 fn total_size(&self) -> Option<u64> {
100 Some(self.envelope_store.usage())
101 }
102
103 fn stack_type<'a>(&self) -> &'a str {
104 "sqlite"
105 }
106
107 async fn flush(&mut self, envelope_stacks: impl IntoIterator<Item = Self::Stack>) {
108 relay_log::trace!("Flushing sqlite envelope buffer");
109
110 let partition_tag = self.partition_id.to_string();
111 relay_statsd::metric!(
112 timer(RelayTimers::BufferDrain),
113 partition_id = &partition_tag,
114 {
115 for envelope_stack in envelope_stacks {
116 envelope_stack.flush().await;
117 }
118 }
119 );
120 }
121}
122
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use relay_base_schema::project::ProjectKey;
    use relay_config::Config;
    use uuid::Uuid;

    use crate::EnvelopeStack;
    use crate::services::buffer::common::ProjectKeyPair;
    use crate::services::buffer::stack_provider::sqlite::SqliteStackProvider;
    use crate::services::buffer::stack_provider::{StackCreationType, StackProvider};
    use crate::services::buffer::testutils::utils::mock_envelopes;

    /// Builds a config whose envelope spool path is a fresh, UUID-named entry in the temp dir.
    fn mock_config() -> Arc<Config> {
        let spool_path = std::env::temp_dir().join(Uuid::new_v4().to_string());
        let path = spool_path.into_os_string().into_string().unwrap();

        let config = Config::from_json_value(serde_json::json!({
            "spool": {
                "envelopes": {
                    "path": path,
                    "disk_batch_size": 100,
                    "max_batches": 1,
                }
            }
        }))
        .unwrap();

        Arc::new(config)
    }

    /// Pushes ten envelopes onto a stack and checks they reach the store only after a flush.
    #[tokio::test]
    async fn test_flush() {
        let config = mock_config();
        let mut provider = SqliteStackProvider::new(0, &config).await.unwrap();

        let key_pair = ProjectKeyPair::new(
            ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
            ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(),
        );
        let mut stack = provider.create_stack(StackCreationType::New, key_pair);

        for envelope in mock_envelopes(10) {
            stack.push(envelope).await.unwrap();
        }

        let store = provider.envelope_store.clone();

        // Before the flush, the store reports no envelopes.
        assert_eq!(store.total_count().await.unwrap(), 0);

        // After flushing the stack through the provider, all ten envelopes are in the store.
        provider.flush(vec![stack]).await;
        assert_eq!(store.total_count().await.unwrap(), 10);
    }
}