1use std::future::Future;
2use std::io;
3
4use axum::extract::Request;
5use bytes::Bytes;
6use futures::TryStreamExt;
7use multer::{Field, Multipart};
8use relay_config::Config;
9use relay_quotas::DataCategory;
10use relay_system::Addr;
11use serde::{Deserialize, Serialize};
12use tokio::io::AsyncReadExt;
13use tokio_util::io::StreamReader;
14
15use crate::endpoints::common::BadStoreRequest;
16use crate::envelope::{AttachmentType, ContentType, Item, ItemType, Items};
17use crate::extractors::RequestMeta;
18use crate::managed::Managed;
19use crate::services::outcome::{
20 DiscardAttachmentType, DiscardItemType, DiscardReason, Outcome, TrackOutcome,
21};
22
/// Integer type of the little-endian length prefix written before every string in the
/// serialized form data format (see `write_string` and `consume_len`).
type Len = u32;
25
26fn write_string<W>(mut writer: W, string: &str) -> io::Result<()>
28where
29 W: io::Write,
30{
31 writer.write_all(&(string.len() as Len).to_le_bytes())?;
32 writer.write_all(string.as_bytes())?;
33
34 Ok(())
35}
36
/// Detaches the first `len` bytes of `data`, returning them and advancing `data` past them.
///
/// When fewer than `len` bytes remain, `data` is reset to an empty slice and `None` is
/// returned, so callers cannot keep reading from a truncated buffer.
fn split_front<'a>(data: &mut &'a [u8], len: usize) -> Option<&'a [u8]> {
    let remaining: &'a [u8] = *data;
    match remaining.get(..len) {
        Some(head) => {
            *data = &remaining[len..];
            Some(head)
        }
        None => {
            *data = &[];
            None
        }
    }
}
48
49fn consume_len(data: &mut &[u8]) -> Option<usize> {
51 let len = std::mem::size_of::<Len>();
52 let slice = split_front(data, len)?;
53 let bytes = slice.try_into().ok();
54 bytes.map(|b| Len::from_le_bytes(b) as usize)
55}
56
57fn consume_string<'a>(data: &mut &'a [u8]) -> Option<&'a str> {
59 let len = consume_len(data)?;
60 let bytes = split_front(data, len)?;
61 std::str::from_utf8(bytes).ok()
62}
63
64#[derive(Clone, Debug, Eq, PartialEq, Deserialize, Serialize)]
66pub struct FormDataEntry<'a>(&'a str, &'a str);
67
68impl<'a> FormDataEntry<'a> {
69 pub fn new(key: &'a str, value: &'a str) -> Self {
70 Self(key, value)
71 }
72
73 pub fn key(&self) -> &'a str {
74 self.0
75 }
76
77 pub fn value(&self) -> &'a str {
78 self.1
79 }
80
81 fn to_writer<W: io::Write>(&self, mut writer: W) {
82 write_string(&mut writer, self.key()).ok();
83 write_string(&mut writer, self.value()).ok();
84 }
85
86 fn read(data: &mut &'a [u8]) -> Option<Self> {
87 let key = consume_string(data)?;
88 let value = consume_string(data)?;
89 Some(Self::new(key, value))
90 }
91}
92
93struct FormDataWriter {
98 data: Vec<u8>,
99}
100
101impl FormDataWriter {
102 pub fn new() -> Self {
103 Self { data: Vec::new() }
104 }
105
106 pub fn append(&mut self, key: &str, value: &str) {
107 let entry = FormDataEntry::new(key, value);
108 entry.to_writer(&mut self.data);
109 }
110
111 pub fn into_inner(self) -> Vec<u8> {
112 self.data
113 }
114}
115
116pub struct FormDataIter<'a> {
118 data: &'a [u8],
119}
120
121impl<'a> FormDataIter<'a> {
122 pub fn new(data: &'a [u8]) -> Self {
123 Self { data }
124 }
125}
126
127impl<'a> Iterator for FormDataIter<'a> {
128 type Item = FormDataEntry<'a>;
129
130 fn next(&mut self) -> Option<Self::Item> {
131 while !self.data.is_empty() {
132 match FormDataEntry::read(&mut self.data) {
133 Some(entry) => return Some(entry),
134 None => relay_log::error!("form data deserialization failed"),
135 }
136 }
137
138 None
139 }
140}
141
/// Recovers the multipart boundary from the opening delimiter line of a raw body.
///
/// Leading empty lines (`\r` / `\n`) are skipped. The first non-empty line must have the form
/// `--<boundary>`. Trailing spaces and tabs after the boundary are dropped; RFC 2046 allows
/// this "transport padding", and a boundary can never end in whitespace.
///
/// Returns `None` if that line does not start with `--`, leaves an empty boundary, or is not
/// valid UTF-8.
pub fn get_multipart_boundary(data: &[u8]) -> Option<&str> {
    let line = data
        .split(|&byte| byte == b'\r' || byte == b'\n')
        .find(|slice| !slice.is_empty())?;
    let boundary = line.strip_prefix(b"--")?;

    // Strip RFC 2046 transport padding between the boundary and the line break.
    let end = boundary
        .iter()
        .rposition(|&byte| byte != b' ' && byte != b'\t')
        .map_or(0, |last| last + 1);
    let boundary = &boundary[..end];
    if boundary.is_empty() {
        return None;
    }

    std::str::from_utf8(boundary).ok()
}
164
165pub trait AttachmentStrategy {
171 fn infer_type(&self, field: &Field) -> AttachmentType;
172
173 fn add_to_item(
182 &self,
183 field: Field<'static>,
184 item: Managed<Item>,
185 config: &Config,
186 ) -> impl Future<Output = Result<Option<Managed<Item>>, multer::Error>> + Send;
187}
188
189pub async fn read_bytes_into_item(
190 field: Field<'static>,
191 mut item: Managed<Item>,
192 config: &Config,
193) -> Result<Managed<Item>, multer::Error> {
194 let content_type = field
195 .content_type()
196 .map(|ct| ct.as_ref().parse().unwrap_or(ContentType::OctetStream));
197 let field_name = field.name().map(String::from);
198 let limit = config.max_attachment_size();
199 let mut buf = Vec::new();
200 StreamReader::new(field.map_err(io::Error::other))
201 .take((limit + 1) as u64) .read_to_end(&mut buf)
203 .await
204 .map_err(|e| multer::Error::StreamReadFailed(Box::new(e)))?;
205 let bytes = Bytes::from(buf);
206 let n_bytes = bytes.len();
207 item.modify(|inner, records| {
208 if let Some(content_type) = content_type {
209 inner.set_payload(content_type, bytes);
210 } else {
211 inner.set_payload_without_content_type(bytes);
212 };
213 records.lenient(DataCategory::Attachment);
214 });
215
216 if n_bytes > limit {
217 let attachment_type = item.attachment_type().unwrap_or(AttachmentType::Attachment);
218 let item_type = DiscardItemType::Attachment(DiscardAttachmentType::from(attachment_type));
219 let _ = item.reject_err(Outcome::Invalid(DiscardReason::ItemTooLarge(item_type)));
220
221 Err(multer::Error::FieldSizeExceeded {
222 limit: limit as u64,
223 field_name,
224 })
225 } else {
226 Ok(item)
227 }
228}
229
/// Collects the parts of a multipart upload into managed envelope items.
///
/// Field handling:
/// - A field with a file name becomes an [`ItemType::Attachment`] item. Its attachment type
///   comes from `attachment_strategy.infer_type`, its file name is copied from the field, and
///   its data is read by `attachment_strategy.add_to_item`.
/// - A named field without a file name is read as text. All such fields are serialized into
///   a single [`ItemType::FormData`] item appended after all attachments.
/// - A field with neither a name nor a file name is skipped (trace-logged).
///
/// The returned items are created from `request_meta` and `outcome_aggregator` via
/// `Managed::with_meta_from_request_meta`.
///
/// # Errors
///
/// - Errors from reading the multipart stream, or from `add_to_item`, are propagated.
/// - If `add_to_item` fails with [`multer::Error::FieldSizeExceeded`], the items collected so
///   far are rejected with an `ItemTooLarge` outcome before the error is returned.
/// - If the summed size of the collected attachments exceeds `max_attachments_size`, all
///   collected items are rejected with an `ItemTooLarge` outcome and
///   [`multer::Error::StreamSizeExceeded`] is returned.
pub async fn multipart_items(
    mut multipart: Multipart<'static>,
    config: &Config,
    attachment_strategy: impl AttachmentStrategy,
    request_meta: &RequestMeta,
    outcome_aggregator: &Addr<TrackOutcome>,
) -> Result<Managed<Items>, multer::Error> {
    let mut items =
        Managed::with_meta_from_request_meta(request_meta, outcome_aggregator, Items::new());
    let mut form_data = FormDataWriter::new();
    // Running total of collected attachment bytes, checked against `max_attachments_size`.
    let mut attachments_size = 0;

    while let Some(field) = multipart.next_field().await? {
        if let Some(file_name) = field.file_name() {
            // File fields become attachment items.
            let mut item = Item::new(ItemType::Attachment);
            let attachment_type = attachment_strategy.infer_type(&field);
            item.set_attachment_type(attachment_type);
            item.set_filename(file_name);
            let item = items.wrap(item);
            let item = attachment_strategy
                .add_to_item(field, item, config)
                .await
                .inspect_err(|e| {
                    // An oversized attachment aborts the upload: reject everything collected
                    // so far before the error propagates.
                    if let multer::Error::FieldSizeExceeded { .. } = e {
                        let attachment_type = DiscardAttachmentType::from(attachment_type);
                        let item_type = DiscardItemType::Attachment(attachment_type);
                        let discard_reason = DiscardReason::ItemTooLarge(item_type);
                        let _ = items.reject_err(Outcome::Invalid(discard_reason));
                    }
                })?;
            // `None` means the strategy produced no item to collect.
            if let Some(item) = item {
                attachments_size += item.len();
                items.merge_with(item, |items, item, _| items.push(item));
                // Enforce the collective limit after merging, so the rejection below also
                // covers the attachment that crossed the limit.
                if attachments_size > config.max_attachments_size() {
                    let item_type = DiscardItemType::Attachment(DiscardAttachmentType::Attachment);
                    let _ =
                        items.reject_err(Outcome::Invalid(DiscardReason::ItemTooLarge(item_type)));
                    return Err(multer::Error::StreamSizeExceeded {
                        limit: config.max_attachments_size() as u64,
                    });
                }
            }
        } else if let Some(field_name) = field.name().map(str::to_owned) {
            // The name is copied out because `text()` consumes the field.
            let string = field.text().await?;
            form_data.append(&field_name, &string);
        } else {
            relay_log::trace!("multipart content without name or file_name");
        }
    }

    // All plain form fields are shipped together as a single FormData item.
    let form_data = form_data.into_inner();
    if !form_data.is_empty() {
        let mut item = Item::new(ItemType::FormData);
        item.set_payload(ContentType::Text, form_data);
        items.merge_with(items.wrap(item), |items, item, _| items.push(item));
    }

    Ok(items)
}
296
297pub fn multipart_from_request(request: Request) -> Result<Multipart<'static>, BadStoreRequest> {
298 let content_type = request
299 .headers()
300 .get("content-type")
301 .and_then(|v| v.to_str().ok())
302 .unwrap_or("");
303 let boundary =
304 multer::parse_boundary(content_type).map_err(BadStoreRequest::InvalidMultipart)?;
305 Ok(Multipart::new(
306 request.into_body().into_data_stream(),
307 boundary,
308 ))
309}
310
#[cfg(test)]
mod tests {
    use std::convert::Infallible;

    use super::*;

    /// Multipart body with two file fields: `large.txt` (27 bytes of content) followed by
    /// `small.txt` (2 bytes of content).
    const TWO_ATTACHMENTS: &str = "--X-BOUNDARY\r\n\
        Content-Disposition: form-data; name=\"file\"; filename=\"large.txt\"\r\n\
        Content-Type: text/plain\r\n\
        \r\n\
        content too large for limit\r\n\
        --X-BOUNDARY\r\n\
        Content-Disposition: form-data; name=\"small_file\"; filename=\"small.txt\"\r\n\
        Content-Type: text/plain\r\n\
        \r\n\
        ok\r\n\
        --X-BOUNDARY--\r\n";

    /// Attachment strategy that reads every file field into a plain attachment item.
    struct MockAttachmentStrategy;

    impl AttachmentStrategy for MockAttachmentStrategy {
        async fn add_to_item(
            &self,
            field: Field<'static>,
            item: Managed<Item>,
            config: &Config,
        ) -> Result<Option<Managed<Item>>, multer::Error> {
            read_bytes_into_item(field, item, config).await.map(Some)
        }

        fn infer_type(&self, _: &Field) -> AttachmentType {
            AttachmentType::Attachment
        }
    }

    fn mock_request_meta() -> RequestMeta {
        let dsn = "https://a94ae32be2582e0bbd7a4cbb95971fee:@sentry.io/42"
            .parse()
            .unwrap();
        RequestMeta::new(dsn)
    }

    /// Builds a multipart reader over [`TWO_ATTACHMENTS`].
    fn two_attachments() -> Multipart<'static> {
        let stream = futures::stream::once(async { Ok::<_, Infallible>(TWO_ATTACHMENTS) });
        Multipart::new(stream, "X-BOUNDARY")
    }

    #[test]
    fn test_get_boundary() {
        let examples: &[(&[u8], Option<&str>)] = &[
            (b"--some_val", Some("some_val")),
            (b"--\nsecond line", None),
            (b"\n\r--some_val", Some("some_val")),
            (b"\n\r--some_val\nadfa", Some("some_val")),
            (b"\n\r--some_val\rfasdf", Some("some_val")),
            (b"\n\r--some_val\r\nfasdf", Some("some_val")),
            (b"\n\rsome_val", None),
            (b"", None),
            (b"--", None),
        ];

        for (input, expected) in examples {
            let boundary = get_multipart_boundary(input);
            assert_eq!(*expected, boundary);
        }
    }

    #[test]
    fn test_formdata() {
        let mut writer = FormDataWriter::new();
        writer.append("foo", "foo");
        writer.append("bar", "");
        writer.append("blub", "blub");

        let payload = writer.into_inner();
        let iter = FormDataIter::new(&payload);
        let entries: Vec<_> = iter.collect();

        assert_eq!(
            entries,
            vec![
                FormDataEntry::new("foo", "foo"),
                FormDataEntry::new("bar", ""),
                FormDataEntry::new("blub", "blub"),
            ]
        );
    }

    #[test]
    fn test_empty_formdata() {
        let writer = FormDataWriter::new();
        let payload = writer.into_inner();

        let iter = FormDataIter::new(&payload);
        let entries: Vec<_> = iter.collect();

        assert_eq!(entries, vec![]);
    }

    /// A closing delimiter that is not followed by a line break still ends the body cleanly.
    #[tokio::test]
    async fn missing_trailing_newline() {
        let data = "--X-BOUNDARY\r\nContent-Disposition: form-data; \
            name=\"my_text_field\"\r\n\r\nabcd\r\n--X-BOUNDARY--";
        let stream = futures::stream::once(async { Ok::<_, Infallible>(data) });
        let mut multipart = Multipart::new(stream, "X-BOUNDARY");

        assert!(multipart.next_field().await.unwrap().is_some());
        assert!(multipart.next_field().await.unwrap().is_none());
    }

    /// `large.txt` on its own exceeds the per-attachment limit of 5 bytes.
    #[tokio::test]
    async fn test_individual_size_limit_exceeded() {
        let config = Config::from_json_value(serde_json::json!({
            "limits": {
                "max_attachment_size": 5
            }
        }))
        .unwrap();

        let res = multipart_items(
            two_attachments(),
            &config,
            MockAttachmentStrategy,
            &mock_request_meta(),
            &Addr::dummy(),
        )
        .await;
        assert!(res.is_err_and(|x| matches!(x, multer::Error::FieldSizeExceeded { .. })));
    }

    /// Only the collective limit is lowered to 5 bytes, so the summed attachment size trips it.
    #[tokio::test]
    async fn test_collective_size_limit_exceeded() {
        let config = Config::from_json_value(serde_json::json!({
            "limits": {
                "max_attachments_size": 5
            }
        }))
        .unwrap();

        let result = multipart_items(
            two_attachments(),
            &config,
            MockAttachmentStrategy,
            &mock_request_meta(),
            &Addr::dummy(),
        )
        .await;

        assert!(result.is_err_and(|x| matches!(x, multer::Error::StreamSizeExceeded { limit: _ })));
    }
}