relay_server/services/buffer/envelope_stack/
sqlite.rs1use std::fmt::Debug;
2use std::num::NonZeroUsize;
3
4use chrono::{DateTime, Utc};
5use relay_base_schema::project::ProjectKey;
6
7use crate::envelope::Envelope;
8use crate::services::buffer::envelope_stack::EnvelopeStack;
9use crate::services::buffer::envelope_store::sqlite::{
10 DatabaseBatch, DatabaseEnvelope, InsertEnvelopeError, SqliteEnvelopeStore,
11 SqliteEnvelopeStoreError,
12};
13use crate::statsd::{RelayCounters, RelayTimers};
14
15#[derive(Debug, thiserror::Error)]
17pub enum SqliteEnvelopeStackError {
18 #[error("envelope store error: {0}")]
19 EnvelopeStoreError(#[from] SqliteEnvelopeStoreError),
20 #[error("envelope encode error: {0}")]
21 Envelope(#[from] InsertEnvelopeError),
22}
23
24#[derive(Debug)]
25pub struct SqliteEnvelopeStack {
30 envelope_store: SqliteEnvelopeStore,
32 batch_size_bytes: NonZeroUsize,
34 own_key: ProjectKey,
36 sampling_key: ProjectKey,
38 batch: Vec<DatabaseEnvelope>,
40 check_disk: bool,
43 partition_tag: String,
45}
46
47impl SqliteEnvelopeStack {
48 pub fn new(
50 partition_id: u8,
51 envelope_store: SqliteEnvelopeStore,
52 batch_size_bytes: usize,
53 own_key: ProjectKey,
54 sampling_key: ProjectKey,
55 check_disk: bool,
56 ) -> Self {
57 Self {
58 envelope_store,
59 batch_size_bytes: NonZeroUsize::new(batch_size_bytes)
60 .expect("batch bytes should be > 0"),
61 own_key,
62 sampling_key,
63 batch: vec![],
64 check_disk,
65 partition_tag: partition_id.to_string(),
66 }
67 }
68
69 fn above_spool_threshold(&self) -> bool {
71 self.batch.iter().map(|e| e.len()).sum::<usize>() > self.batch_size_bytes.get()
72 }
73
74 async fn spool_to_disk(&mut self) -> Result<(), SqliteEnvelopeStackError> {
80 let batch = std::mem::take(&mut self.batch);
81 let Ok(batch) = DatabaseBatch::try_from(batch) else {
82 return Ok(());
83 };
84
85 relay_statsd::metric!(
86 counter(RelayCounters::BufferSpooledEnvelopes) += batch.len() as u64,
87 partition_id = &self.partition_tag
88 );
89
90 relay_statsd::metric!(
95 timer(RelayTimers::BufferSpool),
96 partition_id = &self.partition_tag,
97 {
98 self.envelope_store
99 .insert_batch(batch)
100 .await
101 .map_err(SqliteEnvelopeStackError::EnvelopeStoreError)?;
102 }
103 );
104
105 self.check_disk = true;
107
108 Ok(())
109 }
110
111 async fn unspool_from_disk(&mut self) -> Result<(), SqliteEnvelopeStackError> {
115 debug_assert!(self.batch.is_empty());
116 let batch = relay_statsd::metric!(
117 timer(RelayTimers::BufferUnspool),
118 partition_id = &self.partition_tag,
119 {
120 self.envelope_store
121 .delete_batch(self.own_key, self.sampling_key)
122 .await
123 .map_err(SqliteEnvelopeStackError::EnvelopeStoreError)?
124 }
125 );
126
127 match batch {
128 Some(batch) => {
129 self.batch = batch.into();
130 }
131 None => self.check_disk = false,
132 }
133
134 relay_statsd::metric!(
135 counter(RelayCounters::BufferUnspooledEnvelopes) += self.batch.len() as u64,
136 partition_id = &self.partition_tag
137 );
138 Ok(())
139 }
140
141 fn validate_envelope(&self, envelope: &Envelope) -> bool {
144 let own_key = envelope.meta().public_key();
145 let sampling_key = envelope.sampling_key().unwrap_or(own_key);
146
147 self.own_key == own_key && self.sampling_key == sampling_key
148 }
149}
150
151impl EnvelopeStack for SqliteEnvelopeStack {
152 type Error = SqliteEnvelopeStackError;
153
154 async fn push(&mut self, envelope: Box<Envelope>) -> Result<(), Self::Error> {
155 debug_assert!(self.validate_envelope(&envelope));
156
157 if self.above_spool_threshold() {
158 self.spool_to_disk().await?;
159 }
160
161 let encoded_envelope = relay_statsd::metric!(
162 timer(RelayTimers::BufferEnvelopesSerialization),
163 partition_id = &self.partition_tag,
164 { DatabaseEnvelope::try_from(envelope.as_ref())? }
165 );
166 self.batch.push(encoded_envelope);
167
168 Ok(())
169 }
170
171 async fn peek(&mut self) -> Result<Option<DateTime<Utc>>, Self::Error> {
172 if self.batch.is_empty() && self.check_disk {
173 self.unspool_from_disk().await?
174 }
175
176 let Some(envelope) = self.batch.last() else {
177 return Ok(None);
178 };
179
180 Ok(Some(envelope.received_at()))
181 }
182
183 async fn pop(&mut self) -> Result<Option<Box<Envelope>>, Self::Error> {
184 if self.batch.is_empty() && self.check_disk {
185 self.unspool_from_disk().await?
186 }
187
188 let Some(envelope) = self.batch.pop() else {
189 return Ok(None);
190 };
191 let envelope = envelope.try_into()?;
192
193 Ok(Some(envelope))
194 }
195
196 async fn flush(mut self) {
197 if let Err(e) = self.spool_to_disk().await {
198 relay_log::error!(error = &e as &dyn std::error::Error, "flush error");
199 }
200 }
201}
202
#[cfg(test)]
mod tests {
    use chrono::Utc;
    use relay_base_schema::project::ProjectKey;
    use std::time::Duration;

    use super::*;
    use crate::services::buffer::testutils::utils::{mock_envelope, mock_envelopes, setup_db};

    // NOTE(review): presumably the encoded size of one mock envelope; hard-coded, so confirm it
    // still matches when the fixtures change.
    const COMPRESSED_ENVELOPE_SIZE: usize = 313;

    /// Sums the encoded sizes of `envelopes`, i.e. the value `above_spool_threshold` compares
    /// against the configured threshold.
    fn calculate_compressed_size(envelopes: &[Box<Envelope>]) -> usize {
        envelopes
            .iter()
            .map(|e| DatabaseEnvelope::try_from(e.as_ref()).unwrap().len())
            .sum()
    }

    /// Builds a stack on partition `0` with `check_disk` enabled, using the key pair shared by
    /// all tests that expect `push` to accept the mock envelopes.
    fn new_stack(
        envelope_store: SqliteEnvelopeStore,
        batch_size_bytes: usize,
    ) -> SqliteEnvelopeStack {
        SqliteEnvelopeStack::new(
            0,
            envelope_store,
            batch_size_bytes,
            ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
            ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(),
            true,
        )
    }

    /// Pushing an envelope whose keys differ from the stack's keys trips the `debug_assert!`
    /// in `push`, so this only panics in builds with debug assertions enabled.
    #[tokio::test]
    #[should_panic]
    async fn test_push_with_mismatching_project_keys() {
        let db = setup_db(false).await;
        let envelope_store = SqliteEnvelopeStore::new(0, db, Duration::from_millis(100));
        // Deliberately not built through `new_stack`: the sampling key differs here.
        let mut stack = SqliteEnvelopeStack::new(
            0,
            envelope_store,
            10,
            ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
            ProjectKey::parse("c25ae32be2584e0bbd7a4cbb95971fe1").unwrap(),
            true,
        );

        let envelope = mock_envelope(Utc::now());
        let _ = stack.push(envelope).await;
    }

    #[tokio::test]
    async fn test_push_when_db_is_not_valid() {
        let db = setup_db(false).await;
        let envelope_store = SqliteEnvelopeStore::new(0, db, Duration::from_millis(100));

        let envelopes = mock_envelopes(4);
        // One byte short of all four envelopes: the batch only exceeds the threshold once the
        // fourth envelope is in memory.
        let threshold_size = calculate_compressed_size(&envelopes) - 1;

        let mut stack = new_stack(envelope_store, threshold_size);

        // The threshold is checked before each push, so all four stay in memory.
        for envelope in envelopes {
            assert!(stack.push(envelope).await.is_ok());
        }

        // The fifth push finds the batch above the threshold and tries to spool, which fails.
        let envelope = mock_envelope(Utc::now());
        assert!(matches!(
            stack.push(envelope).await,
            Err(SqliteEnvelopeStackError::EnvelopeStoreError(_))
        ));

        // The failed spool dropped the in-memory batch, so this push lands in an empty batch.
        let envelope = mock_envelope(Utc::now());
        assert!(stack.push(envelope.clone()).await.is_ok());
        assert_eq!(stack.batch.len(), 1);

        // The envelope comes from memory, so no store access is needed to pop it.
        let popped_envelope_1 = stack.pop().await.unwrap().unwrap();
        assert_eq!(
            popped_envelope_1.event_id().unwrap(),
            envelope.event_id().unwrap()
        );
        assert_eq!(stack.batch.len(), 0);
    }

    #[tokio::test]
    async fn test_pop_when_db_is_not_valid() {
        let db = setup_db(false).await;
        let envelope_store = SqliteEnvelopeStore::new(0, db, Duration::from_millis(100));
        let mut stack = new_stack(envelope_store, 2);

        // Memory is empty and `check_disk` is set, so `pop` goes to the store, which fails.
        assert!(matches!(
            stack.pop().await,
            Err(SqliteEnvelopeStackError::EnvelopeStoreError(_))
        ));
    }

    #[tokio::test]
    async fn test_pop_when_stack_is_empty() {
        let db = setup_db(true).await;
        let envelope_store = SqliteEnvelopeStore::new(0, db, Duration::from_millis(100));
        let mut stack = new_stack(envelope_store, 2);

        // Neither memory nor the store holds anything for this key pair.
        assert!(stack.pop().await.unwrap().is_none());
    }

    #[tokio::test]
    async fn test_push_below_threshold_and_pop() {
        let db = setup_db(true).await;
        let envelope_store = SqliteEnvelopeStore::new(0, db, Duration::from_millis(100));
        let mut stack = new_stack(envelope_store, 9999);

        let envelopes = mock_envelopes(5);

        // Everything fits below the threshold, so nothing is spooled.
        for envelope in envelopes.clone() {
            assert!(stack.push(envelope).await.is_ok());
        }
        assert_eq!(stack.batch.len(), 5);

        // Peeking reports the most recently pushed envelope.
        let peeked = stack.peek().await.unwrap().unwrap();
        assert_eq!(
            peeked.timestamp_millis(),
            envelopes[4].received_at().timestamp_millis()
        );

        // Popping returns the envelopes in reverse push order.
        for envelope in envelopes.iter().rev() {
            let popped_envelope = stack.pop().await.unwrap().unwrap();
            assert_eq!(
                popped_envelope.event_id().unwrap(),
                envelope.event_id().unwrap()
            );
        }

        assert_eq!(stack.batch.len(), 0);
    }

    #[tokio::test]
    async fn test_push_above_threshold_and_pop() {
        let db = setup_db(true).await;
        let envelope_store = SqliteEnvelopeStore::new(0, db, Duration::from_millis(100));

        let envelopes = mock_envelopes(7);
        // One byte short of the first five envelopes: the sixth push spools those five.
        let threshold_size = calculate_compressed_size(&envelopes[..5]) - 1;

        let mut stack = new_stack(envelope_store, threshold_size);

        // After all seven pushes, five envelopes are on disk and two remain in memory.
        for envelope in envelopes.clone() {
            assert!(stack.push(envelope).await.is_ok());
        }
        assert_eq!(stack.batch.len(), 2);

        // The top of the stack is the last envelope pushed.
        let peeked = stack.peek().await.unwrap().unwrap();
        assert_eq!(
            peeked.timestamp_millis(),
            envelopes[6].received_at().timestamp_millis()
        );

        // Drain the two in-memory envelopes; this must not touch the store.
        for envelope in envelopes[5..7].iter().rev() {
            let popped_envelope = stack.pop().await.unwrap().unwrap();
            assert_eq!(
                popped_envelope.event_id().unwrap(),
                envelope.event_id().unwrap()
            );
        }
        assert_eq!(stack.batch.len(), 0);

        // Memory is empty, so peeking loads the spooled batch back from disk.
        let peeked = stack.peek().await.unwrap().unwrap();
        assert_eq!(
            peeked.timestamp_millis(),
            envelopes[4].received_at().timestamp_millis()
        );

        // A newly pushed envelope must still come out before the ones loaded from disk.
        let envelope = mock_envelope(Utc::now());
        assert!(stack.push(envelope.clone()).await.is_ok());

        let popped_envelope = stack.pop().await.unwrap().unwrap();
        assert_eq!(
            popped_envelope.event_id().unwrap(),
            envelope.event_id().unwrap()
        );

        // The remaining five envelopes come out in reverse push order.
        for envelope in envelopes[0..5].iter().rev() {
            let popped_envelope = stack.pop().await.unwrap().unwrap();
            assert_eq!(
                popped_envelope.event_id().unwrap(),
                envelope.event_id().unwrap()
            );
        }
        assert_eq!(stack.batch.len(), 0);
    }

    #[tokio::test]
    async fn test_drain() {
        let db = setup_db(true).await;
        let envelope_store = SqliteEnvelopeStore::new(0, db, Duration::from_millis(100));
        let mut stack = new_stack(envelope_store.clone(), 10 * COMPRESSED_ENVELOPE_SIZE);

        let envelopes = mock_envelopes(5);

        // The threshold is never exceeded, so nothing reaches the store while pushing.
        for envelope in envelopes {
            assert!(stack.push(envelope).await.is_ok());
        }
        assert_eq!(stack.batch.len(), 5);
        assert_eq!(envelope_store.total_count().await.unwrap(), 0);

        // Flushing writes the whole in-memory batch to the store.
        stack.flush().await;
        assert_eq!(envelope_store.total_count().await.unwrap(), 5);
    }
}