1use bytes::Bytes;
2use chrono::Utc;
3use clap::{Parser, ValueEnum};
4use rand::RngCore;
5use relay_config::Config;
6use relay_server::{Envelope, MemoryChecker, MemoryStat, PolymorphicEnvelopeBuffer};
7use std::path::PathBuf;
8use std::sync::Arc;
9use std::time::{Duration, Instant};
10
11#[derive(Clone, Copy, Debug, ValueEnum)]
12enum Impl {
13 Memory,
14 Sqlite,
15}
16
17#[derive(Clone, Copy, Debug, ValueEnum)]
18enum Mode {
19 Sequential,
20 Interleaved,
21}
22
23#[derive(Parser, Debug)]
24struct Args {
25 #[arg(long)]
26 envelope_size_bytes: usize,
27 #[arg(long)]
28 compression_ratio: f64,
29 #[arg(long)]
30 batch_size_kib: usize,
31 #[arg(long)]
32 implementation: Impl,
33 #[arg(long)]
34 mode: Mode,
35 #[arg(long)]
36 projects: usize,
37 #[arg(long, default_value_t = 60)]
38 duration_secs: u64,
39 #[arg(long, default_value = None)]
40 db: Option<PathBuf>,
41}
42
43#[tokio::main]
44async fn main() {
45 let args = Args::parse();
46 println!("{:?}", &args);
47 let Args {
48 envelope_size_bytes,
49 compression_ratio,
50 batch_size_kib,
51 implementation,
52 mode,
53 projects,
54 duration_secs,
55 db,
56 } = args;
57 unsafe {
58 relay_log::init(&Default::default(), &Default::default());
59 }
60
61 let dir = tempfile::tempdir().unwrap();
62 let path = db.unwrap_or(dir.path().join("envelopes.db"));
63
64 let config = Arc::new(
65 Config::from_json_value(serde_json::json!({
66 "spool": {
67 "envelopes": {
68 "buffer_strategy": match implementation {
69 Impl::Memory => "memory",
70 Impl::Sqlite => "sqlite",
71 },
72 "path": match implementation {
73 Impl::Memory => None,
74 Impl::Sqlite => Some(path),
75 },
76 "batch_size_bytes": batch_size_kib * 1024,
77 }
78 }
79 }))
80 .unwrap(),
81 );
82
83 let memory_checker = MemoryChecker::new(MemoryStat::default(), config.clone());
84 let buffer = PolymorphicEnvelopeBuffer::from_config(0, &config, memory_checker)
85 .await
86 .unwrap();
87
88 match mode {
89 Mode::Sequential => {
90 run_sequential(
91 buffer,
92 envelope_size_bytes,
93 compression_ratio,
94 projects,
95 Duration::from_secs(duration_secs),
96 )
97 .await
98 }
99 Mode::Interleaved => {
100 run_interleaved(
101 buffer,
102 envelope_size_bytes,
103 compression_ratio,
104 projects,
105 Duration::from_secs(duration_secs),
106 )
107 .await
108 }
109 };
110
111 println!("Cleaning up temporary files...");
112 drop(dir);
113 println!("Done...");
114}
115
116async fn run_sequential(
117 mut buffer: PolymorphicEnvelopeBuffer,
118 envelope_size: usize,
119 compression_ratio: f64,
120 project_count: usize,
121 duration: Duration,
122) {
123 let proto_envelope = mock_envelope(envelope_size, project_count, compression_ratio);
125 let bytes_per_envelope = proto_envelope.to_vec().unwrap().len();
126
127 let start_time = Instant::now();
128
129 let mut last_check = Instant::now();
130 let mut write_duration = Duration::ZERO;
131 let mut writes = 0;
132 while start_time.elapsed() < duration / 2 {
133 let envelope = mock_envelope(envelope_size, project_count, compression_ratio);
134
135 let before = Instant::now();
136 buffer.push(envelope).await.unwrap();
137 let after = Instant::now();
138
139 write_duration += after - before;
140 writes += 1;
141
142 if (after - last_check) > Duration::from_secs(1) {
143 let throughput = (writes * bytes_per_envelope) as f64 / write_duration.as_secs_f64();
144 let throughput = throughput / 1024.0 / 1024.0;
145 println!("{throughput:.2}");
146 write_duration = Duration::ZERO;
147 writes = 0;
148 last_check = after;
149 }
150 }
151
152 let mut last_check = Instant::now();
153 let mut read_duration = Duration::ZERO;
154 let mut reads = 0;
155 while start_time.elapsed() < duration {
156 let before = Instant::now();
157 if buffer.pop().await.unwrap().is_none() {
158 break;
159 };
160 let after = Instant::now();
161
162 read_duration += after - before;
163 reads += 1;
164
165 if (after - last_check) > Duration::from_secs(1) {
166 let throughput = (reads * bytes_per_envelope) as f64 / read_duration.as_secs_f64();
167 let throughput = throughput / 1024.0 / 1024.0;
168 println!("Read throughput: {throughput:.2} MiB / s");
169 read_duration = Duration::ZERO;
170 reads = 0;
171 last_check = after;
172 }
173 }
174}
175
176async fn run_interleaved(
177 mut buffer: PolymorphicEnvelopeBuffer,
178 envelope_size: usize,
179 compression_ratio: f64,
180 project_count: usize,
181 duration: Duration,
182) {
183 let proto_envelope = mock_envelope(envelope_size, project_count, compression_ratio);
185 let bytes_per_envelope = proto_envelope.to_vec().unwrap().len();
186
187 let start_time = Instant::now();
188
189 let mut last_check = Instant::now();
190 let mut write_duration = Duration::ZERO;
191 let mut read_duration = Duration::ZERO;
192 let mut iterations = 0;
193 while start_time.elapsed() < duration {
194 let envelope = mock_envelope(envelope_size, project_count, compression_ratio);
195
196 let before = Instant::now();
197 buffer.push(envelope).await.unwrap();
198 let after_write = Instant::now();
199 buffer.pop().await.unwrap();
200 let after_read = Instant::now();
201
202 write_duration += after_write - before;
203 read_duration += after_read - after_write;
204 iterations += 1;
205
206 if (after_read - last_check) > Duration::from_secs(1) {
207 let write_throughput =
208 (iterations * bytes_per_envelope) as f64 / write_duration.as_secs_f64();
209 let write_throughput = write_throughput / 1024.0 / 1024.0;
210 let read_throughput =
211 (iterations * bytes_per_envelope) as f64 / read_duration.as_secs_f64();
212 let read_throughput = read_throughput / 1024.0 / 1024.0;
213 println!("Write throughput: {write_throughput:.2} MiB / s");
214
215 println!("Read throughput: {read_throughput:.2} MiB / s");
216 write_duration = Duration::ZERO;
217 read_duration = Duration::ZERO;
218 iterations = 0;
219
220 last_check = after_read;
221 }
222 }
223}
224
225fn mock_envelope(
226 payload_size: usize,
227 project_count: usize,
228 compression_ratio: f64,
229) -> Box<Envelope> {
230 let project_key = (rand::random::<f64>() * project_count as f64) as u128;
231 let mut envelope = format!(
232 "\
233 {{\"event_id\":\"9ec79c33ec9942ab8353589fcb2e04dc\",\"dsn\":\"https://{project_key:032x}:@sentry.io/42\"}}\n\
234 {{\"type\":\"attachment\", \"length\":{payload_size}}}\n",
235 ).into_bytes();
236
237 let mut payload = vec![0u8; payload_size];
239 let fraction = (payload_size as f64 / compression_ratio) as usize;
240 rand::thread_rng().fill_bytes(&mut payload[..fraction]);
241 envelope.extend(payload);
242
243 let mut envelope = Envelope::parse_bytes(Bytes::from(envelope)).unwrap();
244 envelope.set_received_at(Utc::now());
245 envelope
246}