objectstore_service/
stream.rs

1//! Stream types and buffering utilities for object data.
2//!
3//! Data flows in streams to keep memory consumption low. Two distinct types
4//! cover the two directions of data flow:
5//!
6//! - [`ClientStream`] — incoming data from a client PUT request body. Uses
7//!   [`ClientError`] as the error type so backends can distinguish a broken
8//!   client connection from a backend I/O failure (400 vs 500).
9//! - [`PayloadStream`] — outgoing data returned from
10//!   [`Backend::get_object`](crate::backend::common::Backend::get_object).
11
12use std::error::Error;
13use std::fmt;
14use std::io;
15use std::sync::Arc;
16
17use bytes::{Bytes, BytesMut};
18use futures_util::stream::BoxStream;
19use futures_util::{Stream, StreamExt, TryStreamExt};
20
/// Outgoing byte stream returned by [`Backend::get_object`](crate::backend::common::Backend::get_object).
///
/// Every item is an `io::Result<Bytes>`: either the next chunk of payload
/// bytes or the [`io::Error`] that interrupted the read.
///
/// Use [`single`] to construct a single-chunk `PayloadStream` from an owned value.
pub type PayloadStream = BoxStream<'static, io::Result<Bytes>>;
25
/// Marks an error as having come from the client-supplied input stream.
///
/// Any error yielded by a [`ClientStream`] (the incoming HTTP request body) is
/// wrapped in this type. Backends can recover it with [`unpack_client_error`]
/// and answer with a 4xx response instead of reporting a 5xx backend failure.
///
/// The wrapped error is reference-counted, so cloning a `ClientError` shares
/// the same underlying error value.
#[derive(Clone, Debug)]
pub struct ClientError(Arc<dyn Error + Send + Sync + 'static>);

impl ClientError {
    /// Wraps `err` in a new [`ClientError`].
    pub fn new<E>(err: E) -> Self
    where
        E: Error + Send + Sync + 'static,
    {
        let shared: Arc<dyn Error + Send + Sync + 'static> = Arc::new(err);
        Self(shared)
    }
}

impl fmt::Display for ClientError {
    /// Prints exactly what the wrapped error prints, honoring the formatter's flags.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt::Display::fmt(&self.0, f)
    }
}

impl Error for ClientError {
    /// Returns the source of the wrapped error (not the wrapped error itself),
    /// which makes this wrapper transparent in a source chain.
    fn source(&self) -> Option<&(dyn Error + 'static)> {
        Error::source(self.0.as_ref())
    }
}

/// Required by [`tokio_util::io::StreamReader`] in the local filesystem backend.
///
/// The resulting `io::Error` carries the [`ClientError`] as its custom payload.
impl From<ClientError> for io::Error {
    fn from(err: ClientError) -> Self {
        Self::other(err)
    }
}
63
/// Incoming byte stream from a client PUT request body.
///
/// Uses [`ClientError`] as the error type so that a dropped or interrupted
/// client connection is distinguishable from a backend I/O failure. Backends
/// that detect a [`ClientError`] (via [`unpack_client_error`]) can surface it
/// as [`crate::error::Error::Client`], which the server maps to HTTP 400 rather
/// than 500.
///
/// Every item is a `Result<Bytes, ClientError>`: either the next chunk of the
/// request body or the [`ClientError`] that interrupted it.
///
/// Use [`single`] to construct a single-chunk `ClientStream` from an owned value.
pub type ClientStream = BoxStream<'static, Result<Bytes, ClientError>>;
74
75/// Walks the source chain of `err` looking for a [`ClientError`].
76///
77/// At each step, two locations are checked:
78///
79/// - **Direct**: the error itself is a `ClientError`.
80/// - **Packed in `io::Error`**: the error is an `io::Error` whose custom inner
81///   value is a `ClientError`.
82///
83/// Use this in `put_object` implementations to reclassify body-stream errors
84/// as [`crate::error::Error::Client`] instead of an opaque server error.
85pub fn unpack_client_error<E>(err: &E) -> Option<ClientError>
86where
87    E: Error + 'static,
88{
89    let mut source = Some(err as &(dyn Error + 'static));
90
91    while let Some(s) = source {
92        // The client error may be wrapped as custom `io::Error`, in which case it cannot be
93        // discovered by iterating `sources`.
94        let target = match s.downcast_ref::<io::Error>().and_then(|e| e.get_ref()) {
95            Some(inner) => inner,
96            None => s,
97        };
98
99        if let Some(client_error) = target.downcast_ref::<ClientError>() {
100            return Some(client_error.clone());
101        }
102
103        source = s.source();
104    }
105    None
106}
107
108#[derive(Debug, Default)]
109enum ChunkedBytesState {
110    #[default]
111    Empty,
112    Single(Bytes),
113    Multi(BytesMut),
114}
115
116/// Lazy stream buffer that avoids copying single chunks.
117///
118/// Tracks three internal states:
119/// - **Empty** — no allocation, no data.
120/// - **Single** — first chunk stored by move (zero-copy).
121/// - **Multi** — allocated on second `push`, coalesces all chunks.
122#[derive(Debug)]
123pub(crate) struct ChunkedBytes {
124    state: ChunkedBytesState,
125    capacity: usize,
126}
127
128impl ChunkedBytes {
129    /// Creates a new buffer with the given capacity hint.
130    ///
131    /// The capacity is used when transitioning from Single to Multi state
132    /// and is exposed via [`capacity()`](Self::capacity) for use as a buffer limit.
133    pub fn new(capacity: usize) -> Self {
134        Self {
135            state: ChunkedBytesState::Empty,
136            capacity,
137        }
138    }
139
140    /// Appends a chunk to the buffer.
141    ///
142    /// Empty→Single stores by move (zero-copy). Single→Multi allocates and
143    /// copies both chunks. Multi extends the existing allocation.
144    pub fn push(&mut self, chunk: Bytes) {
145        if chunk.is_empty() {
146            return;
147        }
148        self.state = match std::mem::take(&mut self.state) {
149            ChunkedBytesState::Empty => ChunkedBytesState::Single(chunk),
150            ChunkedBytesState::Single(first) => {
151                let capacity = self.capacity.max(first.len() + chunk.len());
152                let mut buf = BytesMut::with_capacity(capacity);
153                buf.extend_from_slice(&first);
154                buf.extend_from_slice(&chunk);
155                ChunkedBytesState::Multi(buf)
156            }
157            ChunkedBytesState::Multi(mut buf) => {
158                buf.extend_from_slice(&chunk);
159                ChunkedBytesState::Multi(buf)
160            }
161        };
162    }
163
164    /// Returns the configured capacity (used as the buffer limit).
165    pub fn capacity(&self) -> usize {
166        self.capacity
167    }
168
169    /// Returns the total number of buffered bytes.
170    pub fn len(&self) -> usize {
171        match &self.state {
172            ChunkedBytesState::Empty => 0,
173            ChunkedBytesState::Single(b) => b.len(),
174            ChunkedBytesState::Multi(b) => b.len(),
175        }
176    }
177
178    /// Returns `true` if no bytes have been buffered.
179    #[cfg(test)]
180    pub fn is_empty(&self) -> bool {
181        self.len() == 0
182    }
183
184    /// Consumes the buffer and returns the data as a single `Bytes`.
185    ///
186    /// Single-chunk data is returned by move (zero-copy). Multi-chunk data
187    /// is frozen from the internal `BytesMut`.
188    pub fn into_bytes(self) -> Bytes {
189        match self.state {
190            ChunkedBytesState::Empty => Bytes::new(),
191            ChunkedBytesState::Single(b) => b,
192            ChunkedBytesState::Multi(b) => b.freeze(),
193        }
194    }
195}
196
197/// Reads up to `limit` bytes from a stream to support size-based routing decisions.
198///
199/// Constructed via the async [`SizedPeek::new`], which reads eagerly so that
200/// [`is_exhausted()`](Self::is_exhausted) is always valid.
201/// The full stream (buffered prefix plus any unconsumed remainder) is recovered
202/// with [`into_stream()`](Self::into_stream).
203///
204/// The buffer never exceeds `limit` bytes: the chunk that would cause overflow
205/// is held separately and re-emitted by `into_stream` without copying.
206pub(crate) struct SizedPeek<S> {
207    buffer: ChunkedBytes,
208    /// The first chunk that exceeded the limit; `None` when the stream was exhausted.
209    pending: Option<Bytes>,
210    /// `None` when the stream was fully consumed within the limit.
211    stream: Option<S>,
212}
213
214impl<S> SizedPeek<S> {
215    /// Returns `true` if the stream was fully consumed within the peek limit.
216    pub fn is_exhausted(&self) -> bool {
217        self.pending.is_none()
218    }
219
220    /// Returns the number of bytes held in the buffer (at most `limit`).
221    #[cfg(test)]
222    pub fn len(&self) -> usize {
223        self.buffer.len()
224    }
225}
226
227impl<S, E> SizedPeek<S>
228where
229    S: Stream<Item = Result<Bytes, E>> + Unpin,
230{
231    /// Reads from `stream` into an internal buffer until `limit` bytes are accumulated
232    /// or the stream ends.
233    ///
234    /// Uses strictly-greater-than comparison: a stream of exactly `limit` bytes is
235    /// considered exhausted.
236    pub async fn new(mut stream: S, limit: usize) -> Result<Self, E> {
237        let mut buffer = ChunkedBytes::new(limit);
238
239        while let Some(chunk) = stream.try_next().await? {
240            if buffer.len() + chunk.len() > buffer.capacity() {
241                return Ok(Self {
242                    buffer,
243                    pending: Some(chunk),
244                    stream: Some(stream),
245                });
246            }
247            buffer.push(chunk);
248        }
249
250        Ok(Self {
251            buffer,
252            pending: None,
253            stream: None,
254        })
255    }
256
257    /// Consumes self and returns all bytes as a single [`Bytes`].
258    ///
259    /// If the peek limit was exceeded, drains the remaining stream before
260    /// returning. Always correct regardless of [`is_exhausted`](Self::is_exhausted).
261    pub async fn into_bytes(mut self) -> Result<Bytes, E> {
262        if let Some(pending) = self.pending.take() {
263            self.buffer.push(pending);
264        }
265
266        if let Some(mut stream) = self.stream.take() {
267            while let Some(chunk) = stream.try_next().await? {
268                self.buffer.push(chunk);
269            }
270        }
271
272        Ok(self.buffer.into_bytes())
273    }
274}
275
276impl<S, E> SizedPeek<S>
277where
278    S: Stream<Item = Result<Bytes, E>>,
279{
280    /// Consumes self and returns a stream that yields the buffered prefix first,
281    /// then any remaining data from the original stream.
282    pub fn into_stream(self) -> impl Stream<Item = Result<Bytes, E>> {
283        let leading = [self.buffer.into_bytes(), self.pending.unwrap_or_default()]
284            .into_iter()
285            .filter(|b| !b.is_empty())
286            .map(Ok);
287
288        let tail = futures_util::stream::iter(self.stream).flatten();
289        futures_util::stream::iter(leading).chain(tail)
290    }
291}
292
293/// Creates a single-chunk stream that yields `contents` as one item.
294pub fn single<E: Send + 'static>(
295    contents: impl Into<Bytes>,
296) -> BoxStream<'static, Result<Bytes, E>> {
297    futures_util::stream::once(std::future::ready(Ok(contents.into()))).boxed()
298}
299
/// Drains a stream of `Bytes` chunks and concatenates them into one `Vec<u8>`.
///
/// Stops at the first stream error and returns it converted into
/// [`crate::error::Error`].
#[cfg(test)]
pub(crate) async fn read_to_vec<S, E>(mut stream: S) -> crate::error::Result<Vec<u8>>
where
    S: Stream<Item = Result<Bytes, E>> + Unpin,
    E: Into<crate::error::Error>,
{
    let mut collected = Vec::new();
    while let Some(item) = stream.next().await {
        let chunk: Bytes = item.map_err(Into::into)?;
        collected.extend_from_slice(&chunk);
    }
    Ok(collected)
}
314
// Unit tests for `ChunkedBytes` and `SizedPeek`.
#[cfg(test)]
mod tests {
    use futures_util::stream;

    use super::*;

    // --- ChunkedBytes tests ---

    // A fresh buffer is empty and converts into an empty `Bytes`.
    #[test]
    fn empty_into_bytes() {
        let buf = ChunkedBytes::new(1024);
        assert!(buf.is_empty());
        assert_eq!(buf.len(), 0);
        assert_eq!(buf.into_bytes().len(), 0);
    }

    // A single pushed chunk is returned as the same allocation.
    #[test]
    fn single_chunk_zero_copy() {
        let original = Bytes::from_static(b"hello world");
        let ptr = original.as_ptr();

        let mut buf = ChunkedBytes::new(1024);
        buf.push(original);

        assert_eq!(buf.len(), 11);
        assert!(!buf.is_empty());

        let result = buf.into_bytes();
        assert_eq!(result.as_ref(), b"hello world");
        // Pointer equality proves zero-copy.
        assert_eq!(result.as_ptr(), ptr);
    }

    // Two pushed chunks are concatenated in push order.
    #[test]
    fn multi_chunk_coalesces() {
        let mut buf = ChunkedBytes::new(1024);
        buf.push(Bytes::from_static(b"hello "));
        buf.push(Bytes::from_static(b"world"));

        assert_eq!(buf.len(), 11);
        assert_eq!(buf.into_bytes().as_ref(), b"hello world");
    }

    // Empty chunks neither add data nor force the copy into a merged buffer.
    #[test]
    fn push_empty_chunk_is_noop() {
        let original = Bytes::from_static(b"data");
        let ptr = original.as_ptr();

        let mut buf = ChunkedBytes::new(1024);
        buf.push(Bytes::new());
        assert!(buf.is_empty());

        buf.push(original);
        buf.push(Bytes::new());
        // Still in Single state — empty pushes don't trigger Multi.
        let result = buf.into_bytes();
        assert_eq!(result.as_ptr(), ptr);
    }

    // `capacity()` reports the value passed to `new`.
    #[test]
    fn capacity_is_preserved() {
        let buf = ChunkedBytes::new(42);
        assert_eq!(buf.capacity(), 42);
    }

    // --- SizedPeek tests ---

    // A stream smaller than the limit is fully buffered and reported as exhausted.
    #[tokio::test]
    async fn exhausted_when_stream_fits() {
        let s = stream::once(std::future::ready(Ok::<_, io::Error>(Bytes::from_static(
            b"small payload",
        ))));

        let peeked = SizedPeek::new(s, 1024).await.unwrap();

        assert!(peeked.is_exhausted());
        assert_eq!(peeked.len(), 13);
    }

    // A chunk that would overflow the limit is set aside rather than buffered.
    #[tokio::test]
    async fn remaining_when_stream_exceeds_limit() {
        let chunks: Vec<io::Result<Bytes>> = vec![
            Ok(Bytes::from(vec![0u8; 600])),
            Ok(Bytes::from(vec![1u8; 600])),
        ];

        let peeked = SizedPeek::new(stream::iter(chunks), 1000).await.unwrap();

        assert!(!peeked.is_exhausted());
        // Only the first 600-byte chunk fits in the buffer; the second is pending.
        assert_eq!(peeked.len(), 600);
    }

    // `into_stream` yields buffer, pending chunk and remaining stream in original order.
    #[tokio::test]
    async fn into_stream_reassembles_data() {
        // "aaa" fits (3 ≤ 5), "bbb" overflows (3+3=6 > 5) → pending, "ccc" stays in stream.
        let chunks: Vec<io::Result<Bytes>> = vec![
            Ok(Bytes::from_static(b"aaa")),
            Ok(Bytes::from_static(b"bbb")),
            Ok(Bytes::from_static(b"ccc")),
        ];

        let peeked = SizedPeek::new(stream::iter(chunks), 5).await.unwrap();
        assert!(!peeked.is_exhausted());

        let output = read_to_vec(peeked.into_stream()).await.unwrap();
        assert_eq!(output, b"aaabbbccc");
    }

    // A single buffered chunk comes back out of `into_stream` as the same allocation.
    #[tokio::test]
    async fn into_stream_single_chunk_zero_copy() {
        let original = Bytes::from_static(b"zero-copy roundtrip");
        let ptr = original.as_ptr();
        let s = stream::once(std::future::ready(Ok::<_, io::Error>(original)));

        let peeked = SizedPeek::new(s, 1024).await.unwrap();
        let out = peeked.into_stream();
        futures_util::pin_mut!(out);
        let first = out.try_next().await.unwrap().unwrap();
        assert_eq!(first.as_ptr(), ptr);
    }

    // A first chunk larger than the limit bypasses the buffer and is re-emitted untouched.
    #[tokio::test]
    async fn oversized_single_chunk_goes_to_pending() {
        // A single chunk larger than the limit never enters the buffer.
        let big = Bytes::from(vec![0u8; 2000]);
        let ptr = big.as_ptr();
        let s = stream::once(std::future::ready(Ok::<_, io::Error>(big)));

        let peeked = SizedPeek::new(s, 1000).await.unwrap();
        assert!(!peeked.is_exhausted());
        assert_eq!(peeked.len(), 0); // nothing in buffer

        let out = peeked.into_stream();
        futures_util::pin_mut!(out);
        let first = out.try_next().await.unwrap().unwrap();
        // The pending chunk is emitted by move — same pointer, no copy.
        assert_eq!(first.as_ptr(), ptr);
        assert_eq!(first.len(), 2000);
    }

    // An error yielded while peeking makes `SizedPeek::new` fail.
    #[tokio::test]
    async fn error_propagation() {
        let chunks: Vec<io::Result<Bytes>> = vec![
            Ok(Bytes::from_static(b"ok")),
            Err(io::Error::new(io::ErrorKind::BrokenPipe, "broken")),
        ];

        let result = SizedPeek::new(stream::iter(chunks), 1024).await;
        assert!(result.is_err());
    }
}