Skip to main content

relay_server/utils/
multipart.rs

1use std::future::Future;
2use std::io;
3
4use axum::extract::Request;
5use bytes::Bytes;
6use futures::TryStreamExt;
7use multer::{Field, Multipart};
8use relay_config::Config;
9use relay_quotas::DataCategory;
10use relay_system::Addr;
11use serde::{Deserialize, Serialize};
12use tokio::io::AsyncReadExt;
13use tokio_util::io::StreamReader;
14
15use crate::endpoints::common::BadStoreRequest;
16use crate::envelope::{AttachmentType, ContentType, Item, ItemType, Items};
17use crate::extractors::RequestMeta;
18use crate::managed::Managed;
19use crate::services::outcome::{
20    DiscardAttachmentType, DiscardItemType, DiscardReason, Outcome, TrackOutcome,
21};
22
/// Integer type of the length prefix that precedes every serialized string.
type Len = u32;

/// Serializes a Pascal-style string with a 4 byte little-endian length prefix.
///
/// # Errors
///
/// Returns an [`io::ErrorKind::InvalidInput`] error, without writing anything, if the string is
/// longer than the length prefix can represent. Errors from the underlying writer are passed
/// through unchanged.
fn write_string<W>(mut writer: W, string: &str) -> io::Result<()>
where
    W: io::Write,
{
    // A plain `as` cast would silently truncate the length and thereby desynchronize every entry
    // that follows, so oversized strings are rejected before anything is written.
    let len = Len::try_from(string.len()).map_err(|_| {
        io::Error::new(
            io::ErrorKind::InvalidInput,
            "string exceeds maximum encodable length",
        )
    })?;

    writer.write_all(&len.to_le_bytes())?;
    writer.write_all(string.as_bytes())
}
36
/// Removes the first `len` bytes from `data` and returns them.
///
/// Returns `None` and leaves `data` empty if fewer than `len` bytes are available.
fn split_front<'a>(data: &mut &'a [u8], len: usize) -> Option<&'a [u8]> {
    // Taking the slice leaves `data` empty, which is exactly the state required on failure.
    let remaining = std::mem::take(data);
    if len > remaining.len() {
        return None;
    }

    let (front, rest) = remaining.split_at(len);
    *data = rest;
    Some(front)
}
48
49/// Consumes the 4-byte length prefix of a string.
50fn consume_len(data: &mut &[u8]) -> Option<usize> {
51    let len = std::mem::size_of::<Len>();
52    let slice = split_front(data, len)?;
53    let bytes = slice.try_into().ok();
54    bytes.map(|b| Len::from_le_bytes(b) as usize)
55}
56
57/// Consumes a Pascal-style string with a 4 byte little-endian length prefix.
58fn consume_string<'a>(data: &mut &'a [u8]) -> Option<&'a str> {
59    let len = consume_len(data)?;
60    let bytes = split_front(data, len)?;
61    std::str::from_utf8(bytes).ok()
62}
63
64/// An entry in a serialized form data item.
65#[derive(Clone, Debug, Eq, PartialEq, Deserialize, Serialize)]
66pub struct FormDataEntry<'a>(&'a str, &'a str);
67
68impl<'a> FormDataEntry<'a> {
69    pub fn new(key: &'a str, value: &'a str) -> Self {
70        Self(key, value)
71    }
72
73    pub fn key(&self) -> &'a str {
74        self.0
75    }
76
77    pub fn value(&self) -> &'a str {
78        self.1
79    }
80
81    fn to_writer<W: io::Write>(&self, mut writer: W) {
82        write_string(&mut writer, self.key()).ok();
83        write_string(&mut writer, self.value()).ok();
84    }
85
86    fn read(data: &mut &'a [u8]) -> Option<Self> {
87        let key = consume_string(data)?;
88        let value = consume_string(data)?;
89        Some(Self::new(key, value))
90    }
91}
92
93/// A writer for serialized form data.
94///
95/// This writer is used to serialize multiple plain fields from a multipart form data request into a
96/// single envelope item. Use `FormDataIter` to iterate all entries.
97struct FormDataWriter {
98    data: Vec<u8>,
99}
100
101impl FormDataWriter {
102    pub fn new() -> Self {
103        Self { data: Vec::new() }
104    }
105
106    pub fn append(&mut self, key: &str, value: &str) {
107        let entry = FormDataEntry::new(key, value);
108        entry.to_writer(&mut self.data);
109    }
110
111    pub fn into_inner(self) -> Vec<u8> {
112        self.data
113    }
114}
115
116/// Iterates through serialized form data written with `FormDataWriter`.
117pub struct FormDataIter<'a> {
118    data: &'a [u8],
119}
120
121impl<'a> FormDataIter<'a> {
122    pub fn new(data: &'a [u8]) -> Self {
123        Self { data }
124    }
125}
126
127impl<'a> Iterator for FormDataIter<'a> {
128    type Item = FormDataEntry<'a>;
129
130    fn next(&mut self) -> Option<Self::Item> {
131        while !self.data.is_empty() {
132            match FormDataEntry::read(&mut self.data) {
133                Some(entry) => return Some(entry),
134                None => relay_log::error!("form data deserialization failed"),
135            }
136        }
137
138        None
139    }
140}
141
/// Extracts the multipart boundary from the first non-empty line of `data`.
///
/// Leading blank lines (any mix of `\r` and `\n`) are skipped. The first non-empty line must
/// start with `--` (two dashes) followed by at least one more byte, and the part after the dashes
/// must be valid UTF-8; otherwise `None` is returned.
///
/// ```ignore
/// let boundary = get_multipart_boundary(b"--The boundary\r\n next line");
/// assert_eq!(Some("The boundary"), boundary);
///
/// let invalid_boundary = get_multipart_boundary(b"The boundary\r\n next line");
/// assert_eq!(None, invalid_boundary);
/// ```
pub fn get_multipart_boundary(data: &[u8]) -> Option<&str> {
    let first_line = data
        .split(|&byte| matches!(byte, b'\r' | b'\n'))
        .find(|line| !line.is_empty())?;

    match first_line.strip_prefix(b"--") {
        // A bare `--` without anything following it is not a boundary.
        Some(boundary) if !boundary.is_empty() => std::str::from_utf8(boundary).ok(),
        _ => None,
    }
}
164
165/// Strategy for how to infer attachment type and add a multipart attachment to an envelope item.
166///
167/// This enables different endpoints to have different ways of dealing with multipart attachments,
168/// for instance, one endpoint can upload attachments and add a ref to the item, while another
169/// endpoint can add attachments to items directly.
170pub trait AttachmentStrategy {
171    fn infer_type(&self, field: &Field) -> AttachmentType;
172
173    /// Defines how individual multipart items should be handled.
174    ///
175    /// Returns
176    ///  - `Ok(Some(item))` if everything was successful.
177    ///  - `Ok(None)` if there was an error adding the attachment, but the rest of the request
178    ///    should still be handled.
179    ///  - `Err(..)` if there was an unexpected error adding the attachment and the request should
180    ///    be cancelled.
181    fn add_to_item(
182        &self,
183        field: Field<'static>,
184        item: Managed<Item>,
185        config: &Config,
186    ) -> impl Future<Output = Result<Option<Managed<Item>>, multer::Error>> + Send;
187}
188
189pub async fn read_bytes_into_item(
190    field: Field<'static>,
191    mut item: Managed<Item>,
192    config: &Config,
193) -> Result<Managed<Item>, multer::Error> {
194    let content_type = field
195        .content_type()
196        .map(|ct| ct.as_ref().parse().unwrap_or(ContentType::OctetStream));
197    let field_name = field.name().map(String::from);
198    let limit = config.max_attachment_size();
199    let mut buf = Vec::new();
200    StreamReader::new(field.map_err(io::Error::other))
201        .take((limit + 1) as u64) // Extra byte needed to determine if limit was exceeded.
202        .read_to_end(&mut buf)
203        .await
204        .map_err(|e| multer::Error::StreamReadFailed(Box::new(e)))?;
205    let bytes = Bytes::from(buf);
206    let n_bytes = bytes.len();
207    item.modify(|inner, records| {
208        if let Some(content_type) = content_type {
209            inner.set_payload(content_type, bytes);
210        } else {
211            inner.set_payload_without_content_type(bytes);
212        };
213        records.lenient(DataCategory::Attachment);
214    });
215
216    if n_bytes > limit {
217        let attachment_type = item.attachment_type().unwrap_or(AttachmentType::Attachment);
218        let item_type = DiscardItemType::Attachment(DiscardAttachmentType::from(attachment_type));
219        let _ = item.reject_err(Outcome::Invalid(DiscardReason::ItemTooLarge(item_type)));
220
221        Err(multer::Error::FieldSizeExceeded {
222            limit: limit as u64,
223            field_name,
224        })
225    } else {
226        Ok(item)
227    }
228}
229
/// Converts the fields of a multipart request into envelope items.
///
/// Fields are handled depending on what they declare:
///  - Fields with a file name become attachment items. Their type is chosen by
///    `attachment_strategy.infer_type` and their contents are added by
///    `attachment_strategy.add_to_item`.
///  - Fields with only a name are collected into a single form data item that is appended after
///    all fields have been read.
///  - Fields with neither are ignored.
///
/// # Errors
///
///  - Any error returned while reading the next field or the text of a plain field.
///  - Any error returned by the attachment strategy. On [`multer::Error::FieldSizeExceeded`] the
///    collected items are rejected with an `ItemTooLarge` outcome first.
///  - [`multer::Error::StreamSizeExceeded`] if the combined size of the attachment items exceeds
///    `config.max_attachments_size()`. The collected items are rejected in this case as well.
pub async fn multipart_items(
    mut multipart: Multipart<'static>,
    config: &Config,
    attachment_strategy: impl AttachmentStrategy,
    request_meta: &RequestMeta,
    outcome_aggregator: &Addr<TrackOutcome>,
) -> Result<Managed<Items>, multer::Error> {
    let mut items =
        Managed::with_meta_from_request_meta(request_meta, outcome_aggregator, Items::new());
    let mut form_data = FormDataWriter::new();
    // Running total of `item.len()` over all attachment items merged so far.
    let mut attachments_size = 0;

    while let Some(field) = multipart.next_field().await? {
        if let Some(file_name) = field.file_name() {
            // The item metadata is filled in first, because the field itself is handed over to
            // the strategy below.
            let mut item = Item::new(ItemType::Attachment);
            let attachment_type = attachment_strategy.infer_type(&field);
            item.set_attachment_type(attachment_type);
            item.set_filename(file_name);
            let item = items.wrap(item);
            let item = attachment_strategy
                .add_to_item(field, item, config)
                .await
                .inspect_err(|e| {
                    // An oversized field aborts the request, so the items collected so far are
                    // rejected as well. Other errors are propagated without this rejection.
                    if let multer::Error::FieldSizeExceeded { .. } = e {
                        let attachment_type = DiscardAttachmentType::from(attachment_type);
                        let item_type = DiscardItemType::Attachment(attachment_type);
                        let discard_reason = DiscardReason::ItemTooLarge(item_type);
                        let _ = items.reject_err(Outcome::Invalid(discard_reason));
                    }
                })?;
            // `None` means the strategy could not add this attachment but the remaining fields
            // should still be processed.
            if let Some(item) = item {
                // This increases the attachments byte count even if the item is an attachment ref.
                // This is by design as the total number of bytes read into memory should be
                // constrained.
                attachments_size += item.len();
                items.merge_with(item, |items, item, _| items.push(item));
                if attachments_size > config.max_attachments_size() {
                    let item_type = DiscardItemType::Attachment(DiscardAttachmentType::Attachment);
                    let _ =
                        items.reject_err(Outcome::Invalid(DiscardReason::ItemTooLarge(item_type)));
                    return Err(multer::Error::StreamSizeExceeded {
                        limit: config.max_attachments_size() as u64,
                    });
                }
            }
        } else if let Some(field_name) = field.name().map(str::to_owned) {
            // Ensure to decode this SAFELY to match Django's POST data behavior. This allows us to
            // process sentry event payloads even if they contain invalid encoding.
            // NOTE(review): this relies on `Field::text` tolerating invalid byte sequences;
            // confirm against the multer documentation.
            let string = field.text().await?;
            form_data.append(&field_name, &string);
        } else {
            relay_log::trace!("multipart content without name or file_name");
        }
    }

    let form_data = form_data.into_inner();
    if !form_data.is_empty() {
        let mut item = Item::new(ItemType::FormData);
        // The payload is the buffer written by `FormDataWriter`: length-prefixed key and value
        // strings, one entry after the other. It is not JSON and is tagged as `Text`.
        item.set_payload(ContentType::Text, form_data);
        items.merge_with(items.wrap(item), |items, item, _| items.push(item));
    }

    Ok(items)
}
296
297pub fn multipart_from_request(request: Request) -> Result<Multipart<'static>, BadStoreRequest> {
298    let content_type = request
299        .headers()
300        .get("content-type")
301        .and_then(|v| v.to_str().ok())
302        .unwrap_or("");
303    let boundary =
304        multer::parse_boundary(content_type).map_err(BadStoreRequest::InvalidMultipart)?;
305    Ok(Multipart::new(
306        request.into_body().into_data_stream(),
307        boundary,
308    ))
309}
310
311#[cfg(test)]
312mod tests {
313    use std::convert::Infallible;
314
315    use super::*;
316
317    fn mock_request_meta() -> RequestMeta {
318        let dsn = "https://a94ae32be2582e0bbd7a4cbb95971fee:@sentry.io/42"
319            .parse()
320            .unwrap();
321        RequestMeta::new(dsn)
322    }
323
324    #[test]
325    fn test_get_boundary() {
326        let examples: &[(&[u8], Option<&str>)] = &[
327            (b"--some_val", Some("some_val")),
328            (b"--\nsecond line", None),
329            (b"\n\r--some_val", Some("some_val")),
330            (b"\n\r--some_val\nadfa", Some("some_val")),
331            (b"\n\r--some_val\rfasdf", Some("some_val")),
332            (b"\n\r--some_val\r\nfasdf", Some("some_val")),
333            (b"\n\rsome_val", None),
334            (b"", None),
335            (b"--", None),
336        ];
337
338        for (input, expected) in examples {
339            let boundary = get_multipart_boundary(input);
340            assert_eq!(*expected, boundary);
341        }
342    }
343
344    #[test]
345    fn test_formdata() {
346        let mut writer = FormDataWriter::new();
347        writer.append("foo", "foo");
348        writer.append("bar", "");
349        writer.append("blub", "blub");
350
351        let payload = writer.into_inner();
352        let iter = FormDataIter::new(&payload);
353        let entries: Vec<_> = iter.collect();
354
355        assert_eq!(
356            entries,
357            vec![
358                FormDataEntry::new("foo", "foo"),
359                FormDataEntry::new("bar", ""),
360                FormDataEntry::new("blub", "blub"),
361            ]
362        );
363    }
364
365    #[test]
366    fn test_empty_formdata() {
367        let writer = FormDataWriter::new();
368        let payload = writer.into_inner();
369
370        let iter = FormDataIter::new(&payload);
371        let entries: Vec<_> = iter.collect();
372
373        assert_eq!(entries, vec![]);
374    }
375
376    /// Regression test for multipart payloads without a trailing newline.
377    #[tokio::test]
378    async fn missing_trailing_newline() {
379        let data = "--X-BOUNDARY\r\nContent-Disposition: form-data; \
380        name=\"my_text_field\"\r\n\r\nabcd\r\n--X-BOUNDARY--"; // No trailing newline
381
382        let stream = futures::stream::once(async { Ok::<_, Infallible>(data) });
383        let mut multipart = Multipart::new(stream, "X-BOUNDARY");
384
385        assert!(multipart.next_field().await.unwrap().is_some());
386        assert!(multipart.next_field().await.unwrap().is_none());
387    }
388
389    #[tokio::test]
390    async fn test_individual_size_limit_exceeded() {
391        let data = "--X-BOUNDARY\r\n\
392              Content-Disposition: form-data; name=\"file\"; filename=\"large.txt\"\r\n\
393              Content-Type: text/plain\r\n\
394              \r\n\
395              content too large for limit\r\n\
396              --X-BOUNDARY\r\n\
397              Content-Disposition: form-data; name=\"small_file\"; filename=\"small.txt\"\r\n\
398              Content-Type: text/plain\r\n\
399              \r\n\
400              ok\r\n\
401              --X-BOUNDARY--\r\n";
402
403        let stream = futures::stream::once(async move { Ok::<_, Infallible>(data) });
404        let multipart = Multipart::new(stream, "X-BOUNDARY");
405
406        let config = Config::from_json_value(serde_json::json!({
407            "limits": {
408                "max_attachment_size": 5
409            }
410        }))
411        .unwrap();
412
413        struct MockAttachmentStrategy;
414        impl AttachmentStrategy for MockAttachmentStrategy {
415            async fn add_to_item(
416                &self,
417                field: Field<'static>,
418                item: Managed<Item>,
419                config: &Config,
420            ) -> Result<Option<Managed<Item>>, multer::Error> {
421                read_bytes_into_item(field, item, config).await.map(Some)
422            }
423
424            fn infer_type(&self, _: &Field) -> AttachmentType {
425                AttachmentType::Attachment
426            }
427        }
428
429        let res = multipart_items(
430            multipart,
431            &config,
432            MockAttachmentStrategy,
433            &mock_request_meta(),
434            &Addr::dummy(),
435        )
436        .await;
437        assert!(res.is_err_and(|x| matches!(x, multer::Error::FieldSizeExceeded { .. })));
438    }
439
440    #[tokio::test]
441    async fn test_collective_size_limit_exceeded() {
442        let data = "--X-BOUNDARY\r\n\
443              Content-Disposition: form-data; name=\"file\"; filename=\"large.txt\"\r\n\
444              Content-Type: text/plain\r\n\
445              \r\n\
446              content too large for limit\r\n\
447              --X-BOUNDARY\r\n\
448              Content-Disposition: form-data; name=\"small_file\"; filename=\"small.txt\"\r\n\
449              Content-Type: text/plain\r\n\
450              \r\n\
451              ok\r\n\
452              --X-BOUNDARY--\r\n";
453
454        let stream = futures::stream::once(async move { Ok::<_, Infallible>(data) });
455
456        let config = &Config::from_json_value(serde_json::json!({
457            "limits": {
458                "max_attachments_size": 5
459            }
460        }))
461        .unwrap();
462
463        let multipart = Multipart::new(stream, "X-BOUNDARY");
464
465        struct MockAttachmentStrategy;
466        impl AttachmentStrategy for MockAttachmentStrategy {
467            async fn add_to_item(
468                &self,
469                field: Field<'static>,
470                item: Managed<Item>,
471                config: &Config,
472            ) -> Result<Option<Managed<Item>>, multer::Error> {
473                read_bytes_into_item(field, item, config).await.map(Some)
474            }
475
476            fn infer_type(&self, _: &Field) -> AttachmentType {
477                AttachmentType::Attachment
478            }
479        }
480
481        let result = multipart_items(
482            multipart,
483            config,
484            MockAttachmentStrategy,
485            &mock_request_meta(),
486            &Addr::dummy(),
487        )
488        .await;
489
490        // Should be warned if the overall stream limit is being breached.
491        assert!(result.is_err_and(|x| matches!(x, multer::Error::StreamSizeExceeded { limit: _ })));
492    }
493}