bench_buffer/
main.rs

1use bytes::Bytes;
2use chrono::Utc;
3use clap::{Parser, ValueEnum};
4use rand::RngCore;
5use relay_config::Config;
6use relay_server::{Envelope, MemoryChecker, MemoryStat, PolymorphicEnvelopeBuffer};
7use std::path::PathBuf;
8use std::sync::Arc;
9use std::time::{Duration, Instant};
10
11#[derive(Clone, Copy, Debug, ValueEnum)]
12enum Impl {
13    Memory,
14    Sqlite,
15}
16
17#[derive(Clone, Copy, Debug, ValueEnum)]
18enum Mode {
19    Sequential,
20    Interleaved,
21}
22
23#[derive(Parser, Debug)]
24struct Args {
25    #[arg(long)]
26    envelope_size_bytes: usize,
27    #[arg(long)]
28    compression_ratio: f64,
29    #[arg(long)]
30    batch_size_kib: usize,
31    #[arg(long)]
32    implementation: Impl,
33    #[arg(long)]
34    mode: Mode,
35    #[arg(long)]
36    projects: usize,
37    #[arg(long, default_value_t = 60)]
38    duration_secs: u64,
39    #[arg(long, default_value = None)]
40    db: Option<PathBuf>,
41}
42
43#[tokio::main]
44async fn main() {
45    let args = Args::parse();
46    println!("{:?}", &args);
47    let Args {
48        envelope_size_bytes,
49        compression_ratio,
50        batch_size_kib,
51        implementation,
52        mode,
53        projects,
54        duration_secs,
55        db,
56    } = args;
57    unsafe {
58        relay_log::init(&Default::default(), &Default::default());
59    }
60
61    let dir = tempfile::tempdir().unwrap();
62    let path = db.unwrap_or(dir.path().join("envelopes.db"));
63
64    let config = Arc::new(
65        Config::from_json_value(serde_json::json!({
66            "spool": {
67                "envelopes": {
68                    "buffer_strategy": match implementation {
69                        Impl::Memory => "memory",
70                        Impl::Sqlite => "sqlite",
71                    },
72                    "path": match implementation {
73                        Impl::Memory => None,
74                        Impl::Sqlite => Some(path),
75                    },
76                    "batch_size_bytes": batch_size_kib * 1024,
77                }
78            }
79        }))
80        .unwrap(),
81    );
82
83    let memory_checker = MemoryChecker::new(MemoryStat::default(), config.clone());
84    let buffer = PolymorphicEnvelopeBuffer::from_config(0, &config, memory_checker)
85        .await
86        .unwrap();
87
88    match mode {
89        Mode::Sequential => {
90            run_sequential(
91                buffer,
92                envelope_size_bytes,
93                compression_ratio,
94                projects,
95                Duration::from_secs(duration_secs),
96            )
97            .await
98        }
99        Mode::Interleaved => {
100            run_interleaved(
101                buffer,
102                envelope_size_bytes,
103                compression_ratio,
104                projects,
105                Duration::from_secs(duration_secs),
106            )
107            .await
108        }
109    };
110
111    println!("Cleaning up temporary files...");
112    drop(dir);
113    println!("Done...");
114}
115
116async fn run_sequential(
117    mut buffer: PolymorphicEnvelopeBuffer,
118    envelope_size: usize,
119    compression_ratio: f64,
120    project_count: usize,
121    duration: Duration,
122) {
123    // Determine envelope size once:
124    let proto_envelope = mock_envelope(envelope_size, project_count, compression_ratio);
125    let bytes_per_envelope = proto_envelope.to_vec().unwrap().len();
126
127    let start_time = Instant::now();
128
129    let mut last_check = Instant::now();
130    let mut write_duration = Duration::ZERO;
131    let mut writes = 0;
132    while start_time.elapsed() < duration / 2 {
133        let envelope = mock_envelope(envelope_size, project_count, compression_ratio);
134
135        let before = Instant::now();
136        buffer.push(envelope).await.unwrap();
137        let after = Instant::now();
138
139        write_duration += after - before;
140        writes += 1;
141
142        if (after - last_check) > Duration::from_secs(1) {
143            let throughput = (writes * bytes_per_envelope) as f64 / write_duration.as_secs_f64();
144            let throughput = throughput / 1024.0 / 1024.0;
145            println!("{throughput:.2}");
146            write_duration = Duration::ZERO;
147            writes = 0;
148            last_check = after;
149        }
150    }
151
152    let mut last_check = Instant::now();
153    let mut read_duration = Duration::ZERO;
154    let mut reads = 0;
155    while start_time.elapsed() < duration {
156        let before = Instant::now();
157        if buffer.pop().await.unwrap().is_none() {
158            break;
159        };
160        let after = Instant::now();
161
162        read_duration += after - before;
163        reads += 1;
164
165        if (after - last_check) > Duration::from_secs(1) {
166            let throughput = (reads * bytes_per_envelope) as f64 / read_duration.as_secs_f64();
167            let throughput = throughput / 1024.0 / 1024.0;
168            println!("Read throughput: {throughput:.2} MiB / s");
169            read_duration = Duration::ZERO;
170            reads = 0;
171            last_check = after;
172        }
173    }
174}
175
176async fn run_interleaved(
177    mut buffer: PolymorphicEnvelopeBuffer,
178    envelope_size: usize,
179    compression_ratio: f64,
180    project_count: usize,
181    duration: Duration,
182) {
183    // Determine envelope size once:
184    let proto_envelope = mock_envelope(envelope_size, project_count, compression_ratio);
185    let bytes_per_envelope = proto_envelope.to_vec().unwrap().len();
186
187    let start_time = Instant::now();
188
189    let mut last_check = Instant::now();
190    let mut write_duration = Duration::ZERO;
191    let mut read_duration = Duration::ZERO;
192    let mut iterations = 0;
193    while start_time.elapsed() < duration {
194        let envelope = mock_envelope(envelope_size, project_count, compression_ratio);
195
196        let before = Instant::now();
197        buffer.push(envelope).await.unwrap();
198        let after_write = Instant::now();
199        buffer.pop().await.unwrap();
200        let after_read = Instant::now();
201
202        write_duration += after_write - before;
203        read_duration += after_read - after_write;
204        iterations += 1;
205
206        if (after_read - last_check) > Duration::from_secs(1) {
207            let write_throughput =
208                (iterations * bytes_per_envelope) as f64 / write_duration.as_secs_f64();
209            let write_throughput = write_throughput / 1024.0 / 1024.0;
210            let read_throughput =
211                (iterations * bytes_per_envelope) as f64 / read_duration.as_secs_f64();
212            let read_throughput = read_throughput / 1024.0 / 1024.0;
213            println!("Write throughput: {write_throughput:.2} MiB / s");
214
215            println!("Read throughput: {read_throughput:.2} MiB / s");
216            write_duration = Duration::ZERO;
217            read_duration = Duration::ZERO;
218            iterations = 0;
219
220            last_check = after_read;
221        }
222    }
223}
224
225fn mock_envelope(
226    payload_size: usize,
227    project_count: usize,
228    compression_ratio: f64,
229) -> Box<Envelope> {
230    let project_key = (rand::random::<f64>() * project_count as f64) as u128;
231    let mut envelope = format!(
232        "\
233            {{\"event_id\":\"9ec79c33ec9942ab8353589fcb2e04dc\",\"dsn\":\"https://{project_key:032x}:@sentry.io/42\"}}\n\
234            {{\"type\":\"attachment\", \"length\":{payload_size}}}\n",
235    ).into_bytes();
236
237    // Fill with random bytes to get estimated compression ratio:
238    let mut payload = vec![0u8; payload_size];
239    let fraction = (payload_size as f64 / compression_ratio) as usize;
240    rand::thread_rng().fill_bytes(&mut payload[..fraction]);
241    envelope.extend(payload);
242
243    let mut envelope = Envelope::parse_bytes(Bytes::from(envelope)).unwrap();
244    envelope.set_received_at(Utc::now());
245    envelope
246}