1use std::convert::Infallible;
2use std::io;
3use std::task::Poll;
4
5use axum::RequestExt;
6use axum::extract::{FromRequest, FromRequestParts, Request};
7use axum::http::StatusCode;
8use axum::response::{IntoResponse, Response};
9use bytes::{Bytes, BytesMut};
10use futures::{StreamExt, TryStreamExt};
11use multer::{Field, Multipart};
12use relay_config::Config;
13use relay_quotas::DataCategory;
14use relay_system::Addr;
15use serde::{Deserialize, Serialize};
16
17use crate::envelope::{AttachmentType, ContentType, Item, ItemType, Items};
18use crate::extractors::{BadEventMeta, PartialDsn, Remote, RequestMeta};
19use crate::service::ServiceState;
20use crate::services::outcome::{
21 DiscardAttachmentType, DiscardItemType, DiscardReason, Outcome, TrackOutcome,
22};
23use crate::utils::ApiErrorResponse;
24
/// Integer type used to encode the length prefix of serialized strings.
type Len = u32;
27
28fn write_string<W>(mut writer: W, string: &str) -> io::Result<()>
30where
31 W: io::Write,
32{
33 writer.write_all(&(string.len() as Len).to_le_bytes())?;
34 writer.write_all(string.as_bytes())?;
35
36 Ok(())
37}
38
/// Removes the first `len` bytes from `data` and returns them.
///
/// If fewer than `len` bytes remain, `data` is reset to an empty slice and `None` is returned.
fn split_front<'a>(data: &mut &'a [u8], len: usize) -> Option<&'a [u8]> {
    let remaining: &'a [u8] = *data;

    match (remaining.get(..len), remaining.get(len..)) {
        (Some(front), Some(rest)) => {
            *data = rest;
            Some(front)
        }
        _ => {
            // Not enough bytes left: discard everything so later reads see an empty buffer.
            *data = &[];
            None
        }
    }
}
50
51fn consume_len(data: &mut &[u8]) -> Option<usize> {
53 let len = std::mem::size_of::<Len>();
54 let slice = split_front(data, len)?;
55 let bytes = slice.try_into().ok();
56 bytes.map(|b| Len::from_le_bytes(b) as usize)
57}
58
59fn consume_string<'a>(data: &mut &'a [u8]) -> Option<&'a str> {
61 let len = consume_len(data)?;
62 let bytes = split_front(data, len)?;
63 std::str::from_utf8(bytes).ok()
64}
65
66#[derive(Clone, Debug, Eq, PartialEq, Deserialize, Serialize)]
68pub struct FormDataEntry<'a>(&'a str, &'a str);
69
70impl<'a> FormDataEntry<'a> {
71 pub fn new(key: &'a str, value: &'a str) -> Self {
72 Self(key, value)
73 }
74
75 pub fn key(&self) -> &'a str {
76 self.0
77 }
78
79 pub fn value(&self) -> &'a str {
80 self.1
81 }
82
83 fn to_writer<W: io::Write>(&self, mut writer: W) {
84 write_string(&mut writer, self.key()).ok();
85 write_string(&mut writer, self.value()).ok();
86 }
87
88 fn read(data: &mut &'a [u8]) -> Option<Self> {
89 let key = consume_string(data)?;
90 let value = consume_string(data)?;
91 Some(Self::new(key, value))
92 }
93}
94
95struct FormDataWriter {
100 data: Vec<u8>,
101}
102
103impl FormDataWriter {
104 pub fn new() -> Self {
105 Self { data: Vec::new() }
106 }
107
108 pub fn append(&mut self, key: &str, value: &str) {
109 let entry = FormDataEntry::new(key, value);
110 entry.to_writer(&mut self.data);
111 }
112
113 pub fn into_inner(self) -> Vec<u8> {
114 self.data
115 }
116}
117
118pub struct FormDataIter<'a> {
120 data: &'a [u8],
121}
122
123impl<'a> FormDataIter<'a> {
124 pub fn new(data: &'a [u8]) -> Self {
125 Self { data }
126 }
127}
128
129impl<'a> Iterator for FormDataIter<'a> {
130 type Item = FormDataEntry<'a>;
131
132 fn next(&mut self) -> Option<Self::Item> {
133 while !self.data.is_empty() {
134 match FormDataEntry::read(&mut self.data) {
135 Some(entry) => return Some(entry),
136 None => relay_log::error!("form data deserialization failed"),
137 }
138 }
139
140 None
141 }
142}
143
/// Extracts the multipart boundary from the first non-empty line of `data`.
///
/// Lines are separated by `\r` or `\n`. The first non-empty line must start with `--` followed
/// by at least one more byte; the returned boundary excludes that leading `--`. Returns `None`
/// if there is no such line, if it lacks the marker, or if the boundary is not valid UTF-8.
pub fn get_multipart_boundary(data: &[u8]) -> Option<&str> {
    let first_line = data
        .split(|&byte| matches!(byte, b'\r' | b'\n'))
        .find(|line| !line.is_empty())?;

    match first_line.strip_prefix(b"--") {
        Some(boundary) if !boundary.is_empty() => std::str::from_utf8(boundary).ok(),
        _ => None,
    }
}
166
/// Error type used as the rejection when extracting an [`UnconstrainedMultipart`].
#[derive(Debug, thiserror::Error)]
pub enum BadMultipart {
    /// Extracting the request metadata failed.
    #[error("event metadata error: {0}")]
    EventMeta(#[from] BadEventMeta),
    /// The multipart parser reported an error.
    #[error("multipart error: {0}")]
    Multipart(#[from] multer::Error),
}
174
// Lets `?` be applied to results whose error type is `Infallible`.
impl From<Infallible> for BadMultipart {
    fn from(infallible: Infallible) -> Self {
        // `Infallible` has no values, so the empty match covers every case.
        match infallible {}
    }
}
180
181impl IntoResponse for BadMultipart {
182 fn into_response(self) -> Response {
183 let status_code = match self {
184 BadMultipart::Multipart(
185 multer::Error::FieldSizeExceeded { .. } | multer::Error::StreamSizeExceeded { .. },
186 ) => StatusCode::PAYLOAD_TOO_LARGE,
187 _ => StatusCode::BAD_REQUEST,
188 };
189
190 (status_code, ApiErrorResponse::from_error(&self)).into_response()
191 }
192}
193
194async fn multipart_items<F, G>(
195 mut multipart: Multipart<'_>,
196 mut infer_type: F,
197 mut emit_outcome: G,
198 config: &Config,
199 ignore_large_fields: bool,
200) -> Result<Items, multer::Error>
201where
202 F: FnMut(Option<&str>, &str) -> AttachmentType,
203 G: FnMut(Outcome, u32),
204{
205 let mut items = Items::new();
206 let mut form_data = FormDataWriter::new();
207 let mut attachments_size = 0;
208
209 while let Some(field) = multipart.next_field().await? {
210 if let Some(file_name) = field.file_name() {
211 let mut item = Item::new(ItemType::Attachment);
212 item.set_attachment_type(infer_type(field.name(), file_name));
213 item.set_filename(file_name);
214
215 let content_type = field.content_type().cloned();
216 let field = LimitedField::new(field, config.max_attachment_size());
217 match field.bytes().await {
218 Err(multer::Error::FieldSizeExceeded { limit, .. }) if ignore_large_fields => {
219 emit_outcome(
220 Outcome::Invalid(DiscardReason::TooLarge(DiscardItemType::Attachment(
221 DiscardAttachmentType::Attachment,
222 ))),
223 u32::try_from(limit).unwrap_or(u32::MAX),
224 );
225 continue;
226 }
227 Err(err) => return Err(err),
228 Ok(bytes) => {
229 attachments_size += bytes.len();
230
231 if attachments_size > config.max_attachments_size() {
232 return Err(multer::Error::StreamSizeExceeded {
233 limit: config.max_attachments_size() as u64,
234 });
235 }
236
237 if let Some(content_type) = content_type {
238 item.set_payload(content_type.as_ref().into(), bytes);
239 } else {
240 item.set_payload_without_content_type(bytes);
241 }
242 }
243 }
244
245 items.push(item);
246 } else if let Some(field_name) = field.name().map(str::to_owned) {
247 let string = field.text().await?;
250 form_data.append(&field_name, &string);
251 } else {
252 relay_log::trace!("multipart content without name or file_name");
253 }
254 }
255
256 let form_data = form_data.into_inner();
257 if !form_data.is_empty() {
258 let mut item = Item::new(ItemType::FormData);
259 item.set_payload(ContentType::Text, form_data);
262 items.push(item);
263 }
264
265 Ok(items)
266}
267
268struct LimitedField<'a> {
273 field: Field<'a>,
274 consumed_size: usize,
275 size_limit: usize,
276 inner_finished: bool,
277}
278
279impl<'a> LimitedField<'a> {
280 fn new(field: Field<'a>, limit: usize) -> Self {
281 LimitedField {
282 field,
283 consumed_size: 0,
284 size_limit: limit,
285 inner_finished: false,
286 }
287 }
288
289 async fn bytes(self) -> Result<Bytes, multer::Error> {
290 self.try_fold(BytesMut::new(), |mut acc, x| async move {
291 acc.extend_from_slice(&x);
292 Ok(acc)
293 })
294 .await
295 .map(|x| x.freeze())
296 }
297}
298
299impl futures::Stream for LimitedField<'_> {
300 type Item = Result<Bytes, multer::Error>;
301
302 fn poll_next(
303 mut self: std::pin::Pin<&mut Self>,
304 cx: &mut std::task::Context<'_>,
305 ) -> std::task::Poll<Option<Self::Item>> {
306 if self.inner_finished {
307 return Poll::Ready(None);
308 }
309
310 match self.field.poll_next_unpin(cx) {
311 err @ Poll::Ready(Some(Err(_))) => err,
312 Poll::Ready(Some(Ok(t))) => {
313 self.consumed_size += t.len();
314 match self.consumed_size <= self.size_limit {
315 true => Poll::Ready(Some(Ok(t))),
316 false => {
317 cx.waker().wake_by_ref();
318 Poll::Pending
319 }
320 }
321 }
322 Poll::Ready(None) if self.consumed_size > self.size_limit => {
323 self.inner_finished = true;
324 Poll::Ready(Some(Err(multer::Error::FieldSizeExceeded {
325 limit: self.consumed_size as u64,
326 field_name: self.field.name().map(Into::into),
327 })))
328 }
329 Poll::Ready(None) => {
330 self.inner_finished = true;
331 Poll::Ready(None)
332 }
333 Poll::Pending => Poll::Pending,
334 }
335 }
336}
337
/// Multipart extractor that limits the whole request body to `max_attachments_size` and fails
/// with an error when a single attachment field exceeds `max_attachment_size`.
pub struct ConstrainedMultipart(pub Multipart<'static>);
342
343impl FromRequest<ServiceState> for ConstrainedMultipart {
344 type Rejection = Remote<multer::Error>;
345
346 async fn from_request(request: Request, state: &ServiceState) -> Result<Self, Self::Rejection> {
347 let limits =
349 multer::SizeLimit::new().whole_stream(state.config().max_attachments_size() as u64);
350
351 multipart_from_request(request, multer::Constraints::new().size_limit(limits))
352 .map(Self)
353 .map_err(Remote)
354 }
355}
356
357impl ConstrainedMultipart {
358 pub async fn items<F>(self, infer_type: F, config: &Config) -> Result<Items, multer::Error>
359 where
360 F: FnMut(Option<&str>, &str) -> AttachmentType,
361 {
362 multipart_items(self.0, infer_type, |_, _| (), config, false).await
365 }
366}
367
/// Multipart extractor without a parser-level size limit.
///
/// Its `items` method skips attachment fields that exceed the per-field limit and reports an
/// outcome for each skipped field instead of failing.
#[allow(dead_code)]
pub struct UnconstrainedMultipart {
    /// The multipart parser over the request body.
    multipart: Multipart<'static>,
    /// Receiver of the outcomes emitted for skipped fields.
    outcome_aggregator: Addr<TrackOutcome>,
    /// Request metadata used to populate emitted outcomes.
    request_meta: RequestMeta,
}
377
378impl FromRequest<ServiceState> for UnconstrainedMultipart {
379 type Rejection = BadMultipart;
380
381 async fn from_request(
382 mut request: Request,
383 state: &ServiceState,
384 ) -> Result<Self, Self::Rejection> {
385 let mut parts = request.extract_parts().await?;
386 let request_meta = RequestMeta::<PartialDsn>::from_request_parts(&mut parts, state).await?;
387
388 let multipart = multipart_from_request(request, multer::Constraints::new())?;
389 Ok(UnconstrainedMultipart {
390 multipart,
391 outcome_aggregator: state.outcome_aggregator().clone(),
392 request_meta,
393 })
394 }
395}
396
397#[cfg_attr(not(any(test, sentry)), expect(dead_code))]
398impl UnconstrainedMultipart {
399 pub async fn items<F>(self, infer_type: F, config: &Config) -> Result<Items, multer::Error>
400 where
401 F: FnMut(Option<&str>, &str) -> AttachmentType,
402 {
403 let UnconstrainedMultipart {
404 multipart,
405 outcome_aggregator,
406 request_meta,
407 } = self;
408
409 multipart_items(
410 multipart,
411 infer_type,
412 |outcome, quantity| {
413 outcome_aggregator.send(TrackOutcome {
414 timestamp: request_meta.received_at(),
415 scoping: request_meta.get_partial_scoping(),
416 outcome,
417 event_id: None,
418 remote_addr: request_meta.remote_addr(),
419 category: DataCategory::Attachment,
420 quantity,
421 })
422 },
423 config,
424 true,
425 )
426 .await
427 }
428}
429
430pub fn multipart_from_request(
431 request: Request,
432 constraints: multer::Constraints,
433) -> Result<Multipart<'static>, multer::Error> {
434 let content_type = request
435 .headers()
436 .get("content-type")
437 .and_then(|v| v.to_str().ok())
438 .unwrap_or("");
439 let boundary = multer::parse_boundary(content_type)?;
440
441 Ok(Multipart::with_constraints(
442 request.into_body().into_data_stream(),
443 boundary,
444 constraints,
445 ))
446}
447
#[cfg(test)]
mod tests {
    use std::convert::Infallible;

    use super::*;

    /// Boundary detection on the first non-empty line, including mixed line endings and inputs
    /// without a usable boundary.
    #[test]
    fn test_get_boundary() {
        let examples: &[(&[u8], Option<&str>)] = &[
            (b"--some_val", Some("some_val")),
            (b"--\nsecond line", None),
            (b"\n\r--some_val", Some("some_val")),
            (b"\n\r--some_val\nadfa", Some("some_val")),
            (b"\n\r--some_val\rfasdf", Some("some_val")),
            (b"\n\r--some_val\r\nfasdf", Some("some_val")),
            (b"\n\rsome_val", None),
            (b"", None),
            (b"--", None),
        ];

        for (input, expected) in examples {
            let boundary = get_multipart_boundary(input);
            assert_eq!(*expected, boundary);
        }
    }

    /// Entries written by `FormDataWriter` are read back unchanged and in order, including an
    /// entry with an empty value.
    #[test]
    fn test_formdata() {
        let mut writer = FormDataWriter::new();
        writer.append("foo", "foo");
        writer.append("bar", "");
        writer.append("blub", "blub");

        let payload = writer.into_inner();
        let iter = FormDataIter::new(&payload);
        let entries: Vec<_> = iter.collect();

        assert_eq!(
            entries,
            vec![
                FormDataEntry::new("foo", "foo"),
                FormDataEntry::new("bar", ""),
                FormDataEntry::new("blub", "blub"),
            ]
        );
    }

    /// A writer that never received an entry produces a payload without entries.
    #[test]
    fn test_empty_formdata() {
        let writer = FormDataWriter::new();
        let payload = writer.into_inner();

        let iter = FormDataIter::new(&payload);
        let entries: Vec<_> = iter.collect();

        assert_eq!(entries, vec![]);
    }

    /// A payload whose closing boundary is not followed by a newline still parses.
    #[tokio::test]
    async fn missing_trailing_newline() -> anyhow::Result<()> {
        // Note that the data ends directly after the closing boundary.
        let data = "--X-BOUNDARY\r\nContent-Disposition: form-data; \
        name=\"my_text_field\"\r\n\r\nabcd\r\n--X-BOUNDARY--";

        let stream = futures::stream::once(async { Ok::<_, Infallible>(data) });
        let mut multipart = Multipart::new(stream, "X-BOUNDARY");

        assert!(multipart.next_field().await?.is_some());
        assert!(multipart.next_field().await?.is_none());

        Ok(())
    }

    /// With `ignore_large_fields` enabled, an attachment above `max_attachment_size` is skipped
    /// and reported while the following attachment is still processed.
    #[tokio::test]
    async fn test_individual_size_limit_exceeded() -> anyhow::Result<()> {
        let data = "--X-BOUNDARY\r\n\
        Content-Disposition: form-data; name=\"file\"; filename=\"large.txt\"\r\n\
        Content-Type: text/plain\r\n\
        \r\n\
        content too large for limit\r\n\
        --X-BOUNDARY\r\n\
        Content-Disposition: form-data; name=\"small_file\"; filename=\"small.txt\"\r\n\
        Content-Type: text/plain\r\n\
        \r\n\
        ok\r\n\
        --X-BOUNDARY--\r\n";

        let stream = futures::stream::once(async move { Ok::<_, Infallible>(data) });
        let multipart = Multipart::new(stream, "X-BOUNDARY");

        let config = Config::from_json_value(serde_json::json!({
            "limits": {
                "max_attachment_size": 5
            }
        }))?;

        let mut mock_outcomes = vec![];
        let items = multipart_items(
            multipart,
            |_, _| AttachmentType::Attachment,
            |_, x| mock_outcomes.push(x),
            &config,
            true,
        )
        .await?;

        // Only the small attachment remains.
        assert_eq!(items.len(), 1);
        let item = &items[0];
        assert_eq!(item.filename(), Some("small.txt"));
        assert_eq!(item.payload(), Bytes::from("ok"));
        // The reported quantity is the byte length of the skipped field's content.
        assert_eq!(mock_outcomes, vec![27]);

        Ok(())
    }

    /// Exceeding `max_attachments_size` fails the whole request with `StreamSizeExceeded`.
    #[tokio::test]
    async fn test_collective_size_limit_exceeded() -> anyhow::Result<()> {
        let data = "--X-BOUNDARY\r\n\
        Content-Disposition: form-data; name=\"file\"; filename=\"large.txt\"\r\n\
        Content-Type: text/plain\r\n\
        \r\n\
        content too large for limit\r\n\
        --X-BOUNDARY\r\n\
        Content-Disposition: form-data; name=\"small_file\"; filename=\"small.txt\"\r\n\
        Content-Type: text/plain\r\n\
        \r\n\
        ok\r\n\
        --X-BOUNDARY--\r\n";

        let stream = futures::stream::once(async move { Ok::<_, Infallible>(data) });

        let config = Config::from_json_value(serde_json::json!({
            "limits": {
                "max_attachments_size": 5
            }
        }))?;

        let multipart = Multipart::new(stream, "X-BOUNDARY");

        let result = UnconstrainedMultipart {
            multipart,
            outcome_aggregator: Addr::dummy(),
            request_meta: RequestMeta::new(
                "https://a94ae32be2584e0bbd7a4cbb95971fee:@sentry.io/42"
                    .parse()
                    .unwrap(),
            ),
        }
        .items(|_, _| AttachmentType::Attachment, &config)
        .await;

        assert!(result.is_err_and(|x| matches!(x, multer::Error::StreamSizeExceeded { limit: _ })));

        Ok(())
    }
}