objectstore_service/
stream.rs

1//! Payload stream type and zero-copy buffering utilities.
2
3use std::io;
4
5use bytes::{Bytes, BytesMut};
6use futures_util::stream::BoxStream;
7use futures_util::{Stream, StreamExt, TryStreamExt};
8
9/// Type alias for data streams used in service APIs.
10pub type PayloadStream = BoxStream<'static, io::Result<Bytes>>;
11
12#[derive(Debug, Default)]
13enum ChunkedBytesState {
14    #[default]
15    Empty,
16    Single(Bytes),
17    Multi(BytesMut),
18}
19
20/// Lazy stream buffer that avoids copying single chunks.
21///
22/// Tracks three internal states:
23/// - **Empty** — no allocation, no data.
24/// - **Single** — first chunk stored by move (zero-copy).
25/// - **Multi** — allocated on second `push`, coalesces all chunks.
26#[derive(Debug)]
27pub(crate) struct ChunkedBytes {
28    state: ChunkedBytesState,
29    capacity: usize,
30}
31
32impl ChunkedBytes {
33    /// Creates a new buffer with the given capacity hint.
34    ///
35    /// The capacity is used when transitioning from Single to Multi state
36    /// and is exposed via [`capacity()`](Self::capacity) for use as a buffer limit.
37    pub fn new(capacity: usize) -> Self {
38        Self {
39            state: ChunkedBytesState::Empty,
40            capacity,
41        }
42    }
43
44    /// Appends a chunk to the buffer.
45    ///
46    /// Empty→Single stores by move (zero-copy). Single→Multi allocates and
47    /// copies both chunks. Multi extends the existing allocation.
48    pub fn push(&mut self, chunk: Bytes) {
49        if chunk.is_empty() {
50            return;
51        }
52        self.state = match std::mem::take(&mut self.state) {
53            ChunkedBytesState::Empty => ChunkedBytesState::Single(chunk),
54            ChunkedBytesState::Single(first) => {
55                let capacity = self.capacity.max(first.len() + chunk.len());
56                let mut buf = BytesMut::with_capacity(capacity);
57                buf.extend_from_slice(&first);
58                buf.extend_from_slice(&chunk);
59                ChunkedBytesState::Multi(buf)
60            }
61            ChunkedBytesState::Multi(mut buf) => {
62                buf.extend_from_slice(&chunk);
63                ChunkedBytesState::Multi(buf)
64            }
65        };
66    }
67
68    /// Returns the configured capacity (used as the buffer limit).
69    pub fn capacity(&self) -> usize {
70        self.capacity
71    }
72
73    /// Returns the total number of buffered bytes.
74    pub fn len(&self) -> usize {
75        match &self.state {
76            ChunkedBytesState::Empty => 0,
77            ChunkedBytesState::Single(b) => b.len(),
78            ChunkedBytesState::Multi(b) => b.len(),
79        }
80    }
81
82    /// Returns `true` if no bytes have been buffered.
83    #[cfg(test)]
84    pub fn is_empty(&self) -> bool {
85        self.len() == 0
86    }
87
88    /// Consumes the buffer and returns the data as a single `Bytes`.
89    ///
90    /// Single-chunk data is returned by move (zero-copy). Multi-chunk data
91    /// is frozen from the internal `BytesMut`.
92    pub fn into_bytes(self) -> Bytes {
93        match self.state {
94            ChunkedBytesState::Empty => Bytes::new(),
95            ChunkedBytesState::Single(b) => b,
96            ChunkedBytesState::Multi(b) => b.freeze(),
97        }
98    }
99}
100
101/// Reads up to `limit` bytes from a stream to support size-based routing decisions.
102///
103/// Constructed via the async [`SizedPeek::new`], which reads eagerly so that
104/// [`is_exhausted()`](Self::is_exhausted) and [`len()`](Self::len) are always valid.
105/// The full stream (buffered prefix plus any unconsumed remainder) is recovered
106/// with [`into_stream()`](Self::into_stream).
107///
108/// The buffer never exceeds `limit` bytes: the chunk that would cause overflow
109/// is held separately and re-emitted by `into_stream` without copying.
110pub(crate) struct SizedPeek<S> {
111    buffer: ChunkedBytes,
112    /// The first chunk that exceeded the limit; `None` when the stream was exhausted.
113    pending: Option<Bytes>,
114    /// `None` when the stream was fully consumed within the limit.
115    stream: Option<S>,
116}
117
118impl<S> SizedPeek<S> {
119    /// Returns `true` if the stream was fully consumed within the peek limit.
120    pub fn is_exhausted(&self) -> bool {
121        self.pending.is_none()
122    }
123
124    /// Returns the number of bytes held in the buffer (at most `limit`).
125    pub fn len(&self) -> usize {
126        self.buffer.len()
127    }
128}
129
130impl<S> SizedPeek<S>
131where
132    S: Stream<Item = io::Result<Bytes>> + Unpin,
133{
134    /// Reads from `stream` into an internal buffer until `limit` bytes are accumulated
135    /// or the stream ends.
136    ///
137    /// Uses strictly-greater-than comparison: a stream of exactly `limit` bytes is
138    /// considered exhausted.
139    pub async fn new(mut stream: S, limit: usize) -> io::Result<Self> {
140        let mut buffer = ChunkedBytes::new(limit);
141
142        while let Some(chunk) = stream.try_next().await? {
143            if buffer.len() + chunk.len() > buffer.capacity() {
144                return Ok(Self {
145                    buffer,
146                    pending: Some(chunk),
147                    stream: Some(stream),
148                });
149            }
150            buffer.push(chunk);
151        }
152
153        Ok(Self {
154            buffer,
155            pending: None,
156            stream: None,
157        })
158    }
159}
160
161impl<S> SizedPeek<S>
162where
163    S: Stream<Item = io::Result<Bytes>>,
164{
165    /// Consumes self and returns a stream that yields the buffered prefix first,
166    /// then any remaining data from the original stream.
167    pub fn into_stream(self) -> impl Stream<Item = io::Result<Bytes>> {
168        let leading = [self.buffer.into_bytes(), self.pending.unwrap_or_default()]
169            .into_iter()
170            .filter(|b| !b.is_empty())
171            .map(Ok);
172
173        let tail = futures_util::stream::iter(self.stream).flatten();
174        futures_util::stream::iter(leading).chain(tail)
175    }
176}
177
/// Test helper: wraps an owned copy of `contents` in a one-chunk [`PayloadStream`].
#[cfg(test)]
pub fn make_stream(contents: &[u8]) -> PayloadStream {
    let owned = Bytes::copy_from_slice(contents);
    tokio_stream::once(Ok(owned)).boxed()
}
183
/// Test helper: drains a stream of `Bytes` chunks into one `Vec<u8>`.
///
/// Stops at the first error, which is converted into the crate error type.
#[cfg(test)]
pub(crate) async fn read_to_vec<S>(stream: S) -> crate::error::Result<Vec<u8>>
where
    S: Stream<Item = io::Result<Bytes>> + Unpin,
{
    let collected = stream
        .try_fold(Vec::new(), |mut acc, chunk| async move {
            acc.extend_from_slice(&chunk);
            Ok(acc)
        })
        .await?;
    Ok(collected)
}
196
#[cfg(test)]
mod tests {
    use futures_util::stream;

    use super::*;

    // --- ChunkedBytes tests ---

    #[test]
    fn empty_into_bytes() {
        let buf = ChunkedBytes::new(1024);
        assert!(buf.is_empty());
        assert_eq!(buf.len(), 0);
        assert_eq!(buf.into_bytes().len(), 0);
    }

    #[test]
    fn single_chunk_zero_copy() {
        let original = Bytes::from_static(b"hello world");
        let ptr = original.as_ptr();

        let mut buf = ChunkedBytes::new(1024);
        buf.push(original);

        assert_eq!(buf.len(), 11);
        assert!(!buf.is_empty());

        let result = buf.into_bytes();
        assert_eq!(result.as_ref(), b"hello world");
        // Pointer equality proves zero-copy.
        assert_eq!(result.as_ptr(), ptr);
    }

    #[test]
    fn multi_chunk_coalesces() {
        let mut buf = ChunkedBytes::new(1024);
        buf.push(Bytes::from_static(b"hello "));
        buf.push(Bytes::from_static(b"world"));

        assert_eq!(buf.len(), 11);
        assert_eq!(buf.into_bytes().as_ref(), b"hello world");
    }

    #[test]
    fn push_empty_chunk_is_noop() {
        let original = Bytes::from_static(b"data");
        let ptr = original.as_ptr();

        let mut buf = ChunkedBytes::new(1024);
        buf.push(Bytes::new());
        assert!(buf.is_empty());

        buf.push(original);
        buf.push(Bytes::new());
        // Still in Single state — empty pushes don't trigger Multi.
        let result = buf.into_bytes();
        assert_eq!(result.as_ptr(), ptr);
    }

    #[test]
    fn capacity_is_preserved() {
        let buf = ChunkedBytes::new(42);
        assert_eq!(buf.capacity(), 42);
    }

    // --- SizedPeek tests ---

    #[tokio::test]
    async fn exhausted_when_stream_fits() {
        let s = stream::once(std::future::ready(Ok(Bytes::from_static(b"small payload"))));

        let peeked = SizedPeek::new(s, 1024).await.unwrap();

        assert!(peeked.is_exhausted());
        assert_eq!(peeked.len(), 13);
    }

    #[tokio::test]
    async fn remaining_when_stream_exceeds_limit() {
        let chunks: Vec<io::Result<Bytes>> = vec![
            Ok(Bytes::from(vec![0u8; 600])),
            Ok(Bytes::from(vec![1u8; 600])),
        ];

        let peeked = SizedPeek::new(stream::iter(chunks), 1000).await.unwrap();

        assert!(!peeked.is_exhausted());
        // Only the first 600-byte chunk fits in the buffer; the second is pending.
        assert_eq!(peeked.len(), 600);
    }

    #[tokio::test]
    async fn exact_limit_is_exhausted() {
        // 3 + 2 = 5 is not strictly greater than the limit, so both chunks are buffered.
        let chunks: Vec<io::Result<Bytes>> = vec![
            Ok(Bytes::from_static(b"abc")),
            Ok(Bytes::from_static(b"de")),
        ];

        let peeked = SizedPeek::new(stream::iter(chunks), 5).await.unwrap();
        assert!(peeked.is_exhausted());
        assert_eq!(peeked.len(), 5);

        let output = read_to_vec(peeked.into_stream()).await.unwrap();
        assert_eq!(output, b"abcde");
    }

    #[tokio::test]
    async fn empty_stream_is_exhausted() {
        let peeked = SizedPeek::new(stream::empty::<io::Result<Bytes>>(), 1024)
            .await
            .unwrap();
        assert!(peeked.is_exhausted());
        assert_eq!(peeked.len(), 0);

        // No empty chunk is emitted for the empty buffer.
        let output = read_to_vec(peeked.into_stream()).await.unwrap();
        assert!(output.is_empty());
    }

    #[tokio::test]
    async fn into_stream_reassembles_data() {
        // "aaa" fits (3 ≤ 5), "bbb" overflows (3+3=6 > 5) → pending, "ccc" stays in stream.
        let chunks: Vec<io::Result<Bytes>> = vec![
            Ok(Bytes::from_static(b"aaa")),
            Ok(Bytes::from_static(b"bbb")),
            Ok(Bytes::from_static(b"ccc")),
        ];

        let peeked = SizedPeek::new(stream::iter(chunks), 5).await.unwrap();
        assert!(!peeked.is_exhausted());

        let output = read_to_vec(peeked.into_stream()).await.unwrap();
        assert_eq!(output, b"aaabbbccc");
    }

    #[tokio::test]
    async fn into_stream_single_chunk_zero_copy() {
        let original = Bytes::from_static(b"zero-copy roundtrip");
        let ptr = original.as_ptr();
        let s = stream::once(std::future::ready(Ok(original)));

        let peeked = SizedPeek::new(s, 1024).await.unwrap();
        let out = peeked.into_stream();
        futures_util::pin_mut!(out);
        let first = out.try_next().await.unwrap().unwrap();
        assert_eq!(first.as_ptr(), ptr);
    }

    #[tokio::test]
    async fn oversized_single_chunk_goes_to_pending() {
        // A single chunk larger than the limit never enters the buffer.
        let big = Bytes::from(vec![0u8; 2000]);
        let ptr = big.as_ptr();
        let s = stream::once(std::future::ready(Ok(big)));

        let peeked = SizedPeek::new(s, 1000).await.unwrap();
        assert!(!peeked.is_exhausted());
        assert_eq!(peeked.len(), 0); // nothing in buffer

        let out = peeked.into_stream();
        futures_util::pin_mut!(out);
        let first = out.try_next().await.unwrap().unwrap();
        // The pending chunk is emitted by move — same pointer, no copy.
        assert_eq!(first.as_ptr(), ptr);
        assert_eq!(first.len(), 2000);
    }

    #[tokio::test]
    async fn error_propagation() {
        let chunks: Vec<io::Result<Bytes>> = vec![
            Ok(Bytes::from_static(b"ok")),
            Err(io::Error::new(io::ErrorKind::BrokenPipe, "broken")),
        ];

        let result = SizedPeek::new(stream::iter(chunks), 1024).await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn into_stream_propagates_tail_error() {
        // The error sits after the pending chunk. The peek itself succeeds, and the failure
        // only surfaces once the unread remainder is consumed through `into_stream`.
        let chunks: Vec<io::Result<Bytes>> = vec![
            Ok(Bytes::from_static(b"aaa")),
            Ok(Bytes::from_static(b"bbb")),
            Err(io::Error::new(io::ErrorKind::BrokenPipe, "broken")),
        ];

        let peeked = SizedPeek::new(stream::iter(chunks), 5).await.unwrap();
        assert!(!peeked.is_exhausted());
        assert!(read_to_vec(peeked.into_stream()).await.is_err());
    }
}