// relay_server/utils/multipart.rs

1use std::convert::Infallible;
2use std::io;
3use std::task::Poll;
4
5use axum::RequestExt;
6use axum::extract::{FromRequest, FromRequestParts, Request};
7use axum::http::StatusCode;
8use axum::response::{IntoResponse, Response};
9use bytes::{Bytes, BytesMut};
10use futures::{StreamExt, TryStreamExt};
11use multer::{Field, Multipart};
12use relay_config::Config;
13use relay_quotas::DataCategory;
14use relay_system::Addr;
15use serde::{Deserialize, Serialize};
16
17use crate::envelope::{AttachmentType, ContentType, Item, ItemType, Items};
18use crate::extractors::{BadEventMeta, PartialDsn, Remote, RequestMeta};
19use crate::service::ServiceState;
20use crate::services::outcome::{
21    DiscardAttachmentType, DiscardItemType, DiscardReason, Outcome, TrackOutcome,
22};
23use crate::utils::ApiErrorResponse;
24
/// Integer type of the little-endian length prefix written before every serialized string.
///
/// Being `u32`, the prefix is 4 bytes wide and limits a single encoded string to `u32::MAX`
/// bytes.
type Len = u32;

/// Serializes a Pascal-style string: a 4 byte little-endian length prefix holding the byte length
/// of `string`, followed by its raw UTF-8 bytes.
///
/// # Errors
///
/// Returns an [`io::ErrorKind::InvalidInput`] error, without writing anything, if the byte length
/// of `string` does not fit into [`Len`]. Errors reported by `writer` are propagated as-is.
fn write_string<W>(mut writer: W, string: &str) -> io::Result<()>
where
    W: io::Write,
{
    // A truncating `as` cast would emit a prefix that disagrees with the payload, after which
    // every following string in the buffer would be decoded from the wrong offset.
    let len = Len::try_from(string.len()).map_err(|_| {
        io::Error::new(
            io::ErrorKind::InvalidInput,
            "string too long for its length prefix",
        )
    })?;

    writer.write_all(&len.to_le_bytes())?;
    writer.write_all(string.as_bytes())?;

    Ok(())
}
38
/// Splits the first `len` bytes off the front of `data` and returns them.
///
/// On success `data` is advanced past the returned bytes. If fewer than `len` bytes remain,
/// `data` is reset to an empty slice and `None` is returned. Callers that loop until `data` is
/// empty therefore stop instead of retrying on truncated input.
fn split_front<'a>(data: &mut &'a [u8], len: usize) -> Option<&'a [u8]> {
    let remaining: &'a [u8] = *data;

    let Some(head) = remaining.get(..len) else {
        *data = &[];
        return None;
    };

    *data = &remaining[len..];
    Some(head)
}
50
51/// Consumes the 4-byte length prefix of a string.
52fn consume_len(data: &mut &[u8]) -> Option<usize> {
53    let len = std::mem::size_of::<Len>();
54    let slice = split_front(data, len)?;
55    let bytes = slice.try_into().ok();
56    bytes.map(|b| Len::from_le_bytes(b) as usize)
57}
58
59/// Consumes a Pascal-style string with a 4 byte little-endian length prefix.
60fn consume_string<'a>(data: &mut &'a [u8]) -> Option<&'a str> {
61    let len = consume_len(data)?;
62    let bytes = split_front(data, len)?;
63    std::str::from_utf8(bytes).ok()
64}
65
66/// An entry in a serialized form data item.
67#[derive(Clone, Debug, Eq, PartialEq, Deserialize, Serialize)]
68pub struct FormDataEntry<'a>(&'a str, &'a str);
69
70impl<'a> FormDataEntry<'a> {
71    pub fn new(key: &'a str, value: &'a str) -> Self {
72        Self(key, value)
73    }
74
75    pub fn key(&self) -> &'a str {
76        self.0
77    }
78
79    pub fn value(&self) -> &'a str {
80        self.1
81    }
82
83    fn to_writer<W: io::Write>(&self, mut writer: W) {
84        write_string(&mut writer, self.key()).ok();
85        write_string(&mut writer, self.value()).ok();
86    }
87
88    fn read(data: &mut &'a [u8]) -> Option<Self> {
89        let key = consume_string(data)?;
90        let value = consume_string(data)?;
91        Some(Self::new(key, value))
92    }
93}
94
95/// A writer for serialized form data.
96///
97/// This writer is used to serialize multiple plain fields from a multipart form data request into a
98/// single envelope item. Use `FormDataIter` to iterate all entries.
99struct FormDataWriter {
100    data: Vec<u8>,
101}
102
103impl FormDataWriter {
104    pub fn new() -> Self {
105        Self { data: Vec::new() }
106    }
107
108    pub fn append(&mut self, key: &str, value: &str) {
109        let entry = FormDataEntry::new(key, value);
110        entry.to_writer(&mut self.data);
111    }
112
113    pub fn into_inner(self) -> Vec<u8> {
114        self.data
115    }
116}
117
118/// Iterates through serialized form data written with `FormDataWriter`.
119pub struct FormDataIter<'a> {
120    data: &'a [u8],
121}
122
123impl<'a> FormDataIter<'a> {
124    pub fn new(data: &'a [u8]) -> Self {
125        Self { data }
126    }
127}
128
129impl<'a> Iterator for FormDataIter<'a> {
130    type Item = FormDataEntry<'a>;
131
132    fn next(&mut self) -> Option<Self::Item> {
133        while !self.data.is_empty() {
134            match FormDataEntry::read(&mut self.data) {
135                Some(entry) => return Some(entry),
136                None => relay_log::error!("form data deserialization failed"),
137            }
138        }
139
140        None
141    }
142}
143
/// Returns the multipart boundary found at the beginning of `data`, if any.
///
/// The boundary line is the first non-empty line (lines are split on `\r` and `\n`, so leading
/// blank lines are skipped). It must start with `--` (two dashes), and the remainder must be
/// non-empty, valid UTF-8. That remainder is returned.
///
/// ```ignore
/// let boundary = get_multipart_boundary(b"--The boundary\r\n next line");
/// assert_eq!(Some("The boundary"), boundary);
///
/// let invalid_boundary = get_multipart_boundary(b"The boundary\r\n next line");
/// assert_eq!(None, invalid_boundary);
/// ```
pub fn get_multipart_boundary(data: &[u8]) -> Option<&str> {
    let first_line = data
        .split(|&byte| matches!(byte, b'\r' | b'\n'))
        .find(|line| !line.is_empty())?;

    let boundary = first_line.strip_prefix(b"--")?;
    if boundary.is_empty() {
        return None;
    }

    std::str::from_utf8(boundary).ok()
}
166
/// Errors produced while extracting a multipart request or reading its fields.
#[derive(Debug, thiserror::Error)]
pub enum BadMultipart {
    /// Request metadata could not be extracted from the request head (see the `RequestMeta`
    /// extraction in `UnconstrainedMultipart::from_request`).
    #[error("event metadata error: {0}")]
    EventMeta(#[from] BadEventMeta),
    /// Any error reported by the `multer` parser, including field and stream size-limit
    /// violations.
    #[error("multipart error: {0}")]
    Multipart(#[from] multer::Error),
}
174
175impl From<Infallible> for BadMultipart {
176    fn from(infallible: Infallible) -> Self {
177        match infallible {}
178    }
179}
180
181impl IntoResponse for BadMultipart {
182    fn into_response(self) -> Response {
183        let status_code = match self {
184            BadMultipart::Multipart(
185                multer::Error::FieldSizeExceeded { .. } | multer::Error::StreamSizeExceeded { .. },
186            ) => StatusCode::PAYLOAD_TOO_LARGE,
187            _ => StatusCode::BAD_REQUEST,
188        };
189
190        (status_code, ApiErrorResponse::from_error(&self)).into_response()
191    }
192}
193
194async fn multipart_items<F, G>(
195    mut multipart: Multipart<'_>,
196    mut infer_type: F,
197    mut emit_outcome: G,
198    config: &Config,
199    ignore_large_fields: bool,
200) -> Result<Items, multer::Error>
201where
202    F: FnMut(Option<&str>, &str) -> AttachmentType,
203    G: FnMut(Outcome, u32),
204{
205    let mut items = Items::new();
206    let mut form_data = FormDataWriter::new();
207    let mut attachments_size = 0;
208
209    while let Some(field) = multipart.next_field().await? {
210        if let Some(file_name) = field.file_name() {
211            let mut item = Item::new(ItemType::Attachment);
212            item.set_attachment_type(infer_type(field.name(), file_name));
213            item.set_filename(file_name);
214
215            let content_type = field.content_type().cloned();
216            let field = LimitedField::new(field, config.max_attachment_size());
217            match field.bytes().await {
218                Err(multer::Error::FieldSizeExceeded { limit, .. }) if ignore_large_fields => {
219                    emit_outcome(
220                        Outcome::Invalid(DiscardReason::TooLarge(DiscardItemType::Attachment(
221                            DiscardAttachmentType::Attachment,
222                        ))),
223                        u32::try_from(limit).unwrap_or(u32::MAX),
224                    );
225                    continue;
226                }
227                Err(err) => return Err(err),
228                Ok(bytes) => {
229                    attachments_size += bytes.len();
230
231                    if attachments_size > config.max_attachments_size() {
232                        return Err(multer::Error::StreamSizeExceeded {
233                            limit: config.max_attachments_size() as u64,
234                        });
235                    }
236
237                    if let Some(content_type) = content_type {
238                        item.set_payload(content_type.as_ref().into(), bytes);
239                    } else {
240                        item.set_payload_without_content_type(bytes);
241                    }
242                }
243            }
244
245            items.push(item);
246        } else if let Some(field_name) = field.name().map(str::to_owned) {
247            // Ensure to decode this SAFELY to match Django's POST data behavior. This allows us to
248            // process sentry event payloads even if they contain invalid encoding.
249            let string = field.text().await?;
250            form_data.append(&field_name, &string);
251        } else {
252            relay_log::trace!("multipart content without name or file_name");
253        }
254    }
255
256    let form_data = form_data.into_inner();
257    if !form_data.is_empty() {
258        let mut item = Item::new(ItemType::FormData);
259        // Content type is `Text` (since it is not a json object but multiple
260        // json arrays serialized one after the other).
261        item.set_payload(ContentType::Text, form_data);
262        items.push(item);
263    }
264
265    Ok(items)
266}
267
268/// Wrapper around `multer::Field` which consumes the entire underlying stream even when the
269/// size limit is exceeded.
270///
271/// The idea being that you can process fields in a multi-part form even if one fields is too large.
272struct LimitedField<'a> {
273    field: Field<'a>,
274    consumed_size: usize,
275    size_limit: usize,
276    inner_finished: bool,
277}
278
279impl<'a> LimitedField<'a> {
280    fn new(field: Field<'a>, limit: usize) -> Self {
281        LimitedField {
282            field,
283            consumed_size: 0,
284            size_limit: limit,
285            inner_finished: false,
286        }
287    }
288
289    async fn bytes(self) -> Result<Bytes, multer::Error> {
290        self.try_fold(BytesMut::new(), |mut acc, x| async move {
291            acc.extend_from_slice(&x);
292            Ok(acc)
293        })
294        .await
295        .map(|x| x.freeze())
296    }
297}
298
299impl futures::Stream for LimitedField<'_> {
300    type Item = Result<Bytes, multer::Error>;
301
302    fn poll_next(
303        mut self: std::pin::Pin<&mut Self>,
304        cx: &mut std::task::Context<'_>,
305    ) -> std::task::Poll<Option<Self::Item>> {
306        if self.inner_finished {
307            return Poll::Ready(None);
308        }
309
310        match self.field.poll_next_unpin(cx) {
311            err @ Poll::Ready(Some(Err(_))) => err,
312            Poll::Ready(Some(Ok(t))) => {
313                self.consumed_size += t.len();
314                match self.consumed_size <= self.size_limit {
315                    true => Poll::Ready(Some(Ok(t))),
316                    false => {
317                        cx.waker().wake_by_ref();
318                        Poll::Pending
319                    }
320                }
321            }
322            Poll::Ready(None) if self.consumed_size > self.size_limit => {
323                self.inner_finished = true;
324                Poll::Ready(Some(Err(multer::Error::FieldSizeExceeded {
325                    limit: self.consumed_size as u64,
326                    field_name: self.field.name().map(Into::into),
327                })))
328            }
329            Poll::Ready(None) => {
330                self.inner_finished = true;
331                Poll::Ready(None)
332            }
333            Poll::Pending => Poll::Pending,
334        }
335    }
336}
337
338/// Wrapper around [`multer::Multipart`] that checks each field is smaller than
339/// `max_attachment_size` and that the combined size of all fields is smaller than
340/// 'max_attachments_size'.
341pub struct ConstrainedMultipart(pub Multipart<'static>);
342
343impl FromRequest<ServiceState> for ConstrainedMultipart {
344    type Rejection = Remote<multer::Error>;
345
346    async fn from_request(request: Request, state: &ServiceState) -> Result<Self, Self::Rejection> {
347        // Still want to enforce multer limits here so that we avoid parsing large fields.
348        let limits =
349            multer::SizeLimit::new().whole_stream(state.config().max_attachments_size() as u64);
350
351        multipart_from_request(request, multer::Constraints::new().size_limit(limits))
352            .map(Self)
353            .map_err(Remote)
354    }
355}
356
357impl ConstrainedMultipart {
358    pub async fn items<F>(self, infer_type: F, config: &Config) -> Result<Items, multer::Error>
359    where
360        F: FnMut(Option<&str>, &str) -> AttachmentType,
361    {
362        // The emit outcome closure here does nothing since in this code branch we don't want to
363        // emit outcomes as we already return an error to the request.
364        multipart_items(self.0, infer_type, |_, _| (), config, false).await
365    }
366}
367
/// Wrapper around [`multer::Multipart`] that skips over fields which are larger than
/// `max_attachment_size`. These fields are also not taken into account when checking that the
/// combined size of all fields is smaller than `max_attachments_size`.
///
/// Skipped attachments are reported as outcomes through `outcome_aggregator`, using the
/// timestamp, scoping and remote address taken from `request_meta`.
// NOTE(review): the parser is built with default `multer::Constraints`, and plain (non-file)
// form fields are read without any size check in `multipart_items`. Confirm that an upstream
// body limit protects against oversized text fields.
// The dead-code lint is silenced because this type appears to be used only in some build
// configurations (its `items` impl carries `expect(dead_code)` outside `test`/`sentry`).
#[allow(dead_code)]
pub struct UnconstrainedMultipart {
    /// Parser over the request body.
    multipart: Multipart<'static>,
    /// Receives outcomes for attachments that were dropped for being too large.
    outcome_aggregator: Addr<TrackOutcome>,
    /// Metadata of the originating request, used to fill in the emitted outcomes.
    request_meta: RequestMeta,
}
377
378impl FromRequest<ServiceState> for UnconstrainedMultipart {
379    type Rejection = BadMultipart;
380
381    async fn from_request(
382        mut request: Request,
383        state: &ServiceState,
384    ) -> Result<Self, Self::Rejection> {
385        let mut parts = request.extract_parts().await?;
386        let request_meta = RequestMeta::<PartialDsn>::from_request_parts(&mut parts, state).await?;
387
388        let multipart = multipart_from_request(request, multer::Constraints::new())?;
389        Ok(UnconstrainedMultipart {
390            multipart,
391            outcome_aggregator: state.outcome_aggregator().clone(),
392            request_meta,
393        })
394    }
395}
396
397#[cfg_attr(not(any(test, sentry)), expect(dead_code))]
398impl UnconstrainedMultipart {
399    pub async fn items<F>(self, infer_type: F, config: &Config) -> Result<Items, multer::Error>
400    where
401        F: FnMut(Option<&str>, &str) -> AttachmentType,
402    {
403        let UnconstrainedMultipart {
404            multipart,
405            outcome_aggregator,
406            request_meta,
407        } = self;
408
409        multipart_items(
410            multipart,
411            infer_type,
412            |outcome, quantity| {
413                outcome_aggregator.send(TrackOutcome {
414                    timestamp: request_meta.received_at(),
415                    scoping: request_meta.get_partial_scoping(),
416                    outcome,
417                    event_id: None,
418                    remote_addr: request_meta.remote_addr(),
419                    category: DataCategory::Attachment,
420                    quantity,
421                })
422            },
423            config,
424            true,
425        )
426        .await
427    }
428}
429
430pub fn multipart_from_request(
431    request: Request,
432    constraints: multer::Constraints,
433) -> Result<Multipart<'static>, multer::Error> {
434    let content_type = request
435        .headers()
436        .get("content-type")
437        .and_then(|v| v.to_str().ok())
438        .unwrap_or("");
439    let boundary = multer::parse_boundary(content_type)?;
440
441    Ok(Multipart::with_constraints(
442        request.into_body().into_data_stream(),
443        boundary,
444        constraints,
445    ))
446}
447
#[cfg(test)]
mod tests {
    use std::convert::Infallible;

    use super::*;

    /// `get_multipart_boundary` accepts the first non-empty line only if it starts with `--`
    /// and has a non-empty remainder.
    #[test]
    fn test_get_boundary() {
        let examples: &[(&[u8], Option<&str>)] = &[
            (b"--some_val", Some("some_val")),
            (b"--\nsecond line", None),
            (b"\n\r--some_val", Some("some_val")),
            (b"\n\r--some_val\nadfa", Some("some_val")),
            (b"\n\r--some_val\rfasdf", Some("some_val")),
            (b"\n\r--some_val\r\nfasdf", Some("some_val")),
            (b"\n\rsome_val", None),
            (b"", None),
            (b"--", None),
        ];

        for (input, expected) in examples {
            let boundary = get_multipart_boundary(input);
            assert_eq!(*expected, boundary);
        }
    }

    /// Entries written by `FormDataWriter` are read back in order by `FormDataIter`, including
    /// an entry with an empty value.
    #[test]
    fn test_formdata() {
        let mut writer = FormDataWriter::new();
        writer.append("foo", "foo");
        writer.append("bar", "");
        writer.append("blub", "blub");

        let payload = writer.into_inner();
        let iter = FormDataIter::new(&payload);
        let entries: Vec<_> = iter.collect();

        assert_eq!(
            entries,
            vec![
                FormDataEntry::new("foo", "foo"),
                FormDataEntry::new("bar", ""),
                FormDataEntry::new("blub", "blub"),
            ]
        );
    }

    /// A writer without entries produces a buffer that yields no entries.
    #[test]
    fn test_empty_formdata() {
        let writer = FormDataWriter::new();
        let payload = writer.into_inner();

        let iter = FormDataIter::new(&payload);
        let entries: Vec<_> = iter.collect();

        assert_eq!(entries, vec![]);
    }

    /// Regression test for multipart payloads without a trailing newline.
    #[tokio::test]
    async fn missing_trailing_newline() -> anyhow::Result<()> {
        let data = "--X-BOUNDARY\r\nContent-Disposition: form-data; \
        name=\"my_text_field\"\r\n\r\nabcd\r\n--X-BOUNDARY--"; // No trailing newline

        let stream = futures::stream::once(async { Ok::<_, Infallible>(data) });
        let mut multipart = Multipart::new(stream, "X-BOUNDARY");

        assert!(multipart.next_field().await?.is_some());
        assert!(multipart.next_field().await?.is_none());

        Ok(())
    }

    /// With `ignore_large_fields`, an attachment above `max_attachment_size` (5 bytes) is
    /// dropped and its actual size (27 bytes) is reported as an outcome. The following field is
    /// still parsed.
    #[tokio::test]
    async fn test_individual_size_limit_exceeded() -> anyhow::Result<()> {
        // The first payload, "content too large for limit", is 27 bytes long.
        let data = "--X-BOUNDARY\r\n\
              Content-Disposition: form-data; name=\"file\"; filename=\"large.txt\"\r\n\
              Content-Type: text/plain\r\n\
              \r\n\
              content too large for limit\r\n\
              --X-BOUNDARY\r\n\
              Content-Disposition: form-data; name=\"small_file\"; filename=\"small.txt\"\r\n\
              Content-Type: text/plain\r\n\
              \r\n\
              ok\r\n\
              --X-BOUNDARY--\r\n";

        let stream = futures::stream::once(async move { Ok::<_, Infallible>(data) });
        let multipart = Multipart::new(stream, "X-BOUNDARY");

        let config = Config::from_json_value(serde_json::json!({
            "limits": {
                "max_attachment_size": 5
            }
        }))?;

        let mut mock_outcomes = vec![];
        let items = multipart_items(
            multipart,
            |_, _| AttachmentType::Attachment,
            |_, x| mock_outcomes.push(x),
            &config,
            true,
        )
        .await?;

        // The large field is skipped so only the small one should make it through.
        assert_eq!(items.len(), 1);
        let item = &items[0];
        assert_eq!(item.filename(), Some("small.txt"));
        assert_eq!(item.payload(), Bytes::from("ok"));
        assert_eq!(mock_outcomes, vec![27]);

        Ok(())
    }

    /// Attachments that together exceed `max_attachments_size` (5 bytes) fail the whole
    /// request with `StreamSizeExceeded`, even through `UnconstrainedMultipart`.
    #[tokio::test]
    async fn test_collective_size_limit_exceeded() -> anyhow::Result<()> {
        let data = "--X-BOUNDARY\r\n\
              Content-Disposition: form-data; name=\"file\"; filename=\"large.txt\"\r\n\
              Content-Type: text/plain\r\n\
              \r\n\
              content too large for limit\r\n\
              --X-BOUNDARY\r\n\
              Content-Disposition: form-data; name=\"small_file\"; filename=\"small.txt\"\r\n\
              Content-Type: text/plain\r\n\
              \r\n\
              ok\r\n\
              --X-BOUNDARY--\r\n";

        let stream = futures::stream::once(async move { Ok::<_, Infallible>(data) });

        let config = Config::from_json_value(serde_json::json!({
            "limits": {
                "max_attachments_size": 5
            }
        }))?;

        let multipart = Multipart::new(stream, "X-BOUNDARY");

        let result = UnconstrainedMultipart {
            multipart,
            outcome_aggregator: Addr::dummy(),
            request_meta: RequestMeta::new(
                "https://a94ae32be2584e0bbd7a4cbb95971fee:@sentry.io/42"
                    .parse()
                    .unwrap(),
            ),
        }
        .items(|_, _| AttachmentType::Attachment, &config)
        .await;

        // The first attachment alone (27 bytes) breaches the combined limit, so the request
        // must fail with `StreamSizeExceeded` rather than skipping fields.
        assert!(result.is_err_and(|x| matches!(x, multer::Error::StreamSizeExceeded { limit: _ })));

        Ok(())
    }
}