objectstore_service/
stream.rs1use std::io;
4
5use bytes::{Bytes, BytesMut};
6use futures_util::stream::BoxStream;
7use futures_util::{Stream, StreamExt, TryStreamExt};
8
9pub type PayloadStream = BoxStream<'static, io::Result<Bytes>>;
11
12#[derive(Debug, Default)]
13enum ChunkedBytesState {
14 #[default]
15 Empty,
16 Single(Bytes),
17 Multi(BytesMut),
18}
19
20#[derive(Debug)]
27pub(crate) struct ChunkedBytes {
28 state: ChunkedBytesState,
29 capacity: usize,
30}
31
32impl ChunkedBytes {
33 pub fn new(capacity: usize) -> Self {
38 Self {
39 state: ChunkedBytesState::Empty,
40 capacity,
41 }
42 }
43
44 pub fn push(&mut self, chunk: Bytes) {
49 if chunk.is_empty() {
50 return;
51 }
52 self.state = match std::mem::take(&mut self.state) {
53 ChunkedBytesState::Empty => ChunkedBytesState::Single(chunk),
54 ChunkedBytesState::Single(first) => {
55 let capacity = self.capacity.max(first.len() + chunk.len());
56 let mut buf = BytesMut::with_capacity(capacity);
57 buf.extend_from_slice(&first);
58 buf.extend_from_slice(&chunk);
59 ChunkedBytesState::Multi(buf)
60 }
61 ChunkedBytesState::Multi(mut buf) => {
62 buf.extend_from_slice(&chunk);
63 ChunkedBytesState::Multi(buf)
64 }
65 };
66 }
67
68 pub fn capacity(&self) -> usize {
70 self.capacity
71 }
72
73 pub fn len(&self) -> usize {
75 match &self.state {
76 ChunkedBytesState::Empty => 0,
77 ChunkedBytesState::Single(b) => b.len(),
78 ChunkedBytesState::Multi(b) => b.len(),
79 }
80 }
81
82 #[cfg(test)]
84 pub fn is_empty(&self) -> bool {
85 self.len() == 0
86 }
87
88 pub fn into_bytes(self) -> Bytes {
93 match self.state {
94 ChunkedBytesState::Empty => Bytes::new(),
95 ChunkedBytesState::Single(b) => b,
96 ChunkedBytesState::Multi(b) => b.freeze(),
97 }
98 }
99}
100
101pub(crate) struct SizedPeek<S> {
111 buffer: ChunkedBytes,
112 pending: Option<Bytes>,
114 stream: Option<S>,
116}
117
118impl<S> SizedPeek<S> {
119 pub fn is_exhausted(&self) -> bool {
121 self.pending.is_none()
122 }
123
124 pub fn len(&self) -> usize {
126 self.buffer.len()
127 }
128}
129
130impl<S> SizedPeek<S>
131where
132 S: Stream<Item = io::Result<Bytes>> + Unpin,
133{
134 pub async fn new(mut stream: S, limit: usize) -> io::Result<Self> {
140 let mut buffer = ChunkedBytes::new(limit);
141
142 while let Some(chunk) = stream.try_next().await? {
143 if buffer.len() + chunk.len() > buffer.capacity() {
144 return Ok(Self {
145 buffer,
146 pending: Some(chunk),
147 stream: Some(stream),
148 });
149 }
150 buffer.push(chunk);
151 }
152
153 Ok(Self {
154 buffer,
155 pending: None,
156 stream: None,
157 })
158 }
159}
160
161impl<S> SizedPeek<S>
162where
163 S: Stream<Item = io::Result<Bytes>>,
164{
165 pub fn into_stream(self) -> impl Stream<Item = io::Result<Bytes>> {
168 let leading = [self.buffer.into_bytes(), self.pending.unwrap_or_default()]
169 .into_iter()
170 .filter(|b| !b.is_empty())
171 .map(Ok);
172
173 let tail = futures_util::stream::iter(self.stream).flatten();
174 futures_util::stream::iter(leading).chain(tail)
175 }
176}
177
178#[cfg(test)]
180pub fn make_stream(contents: &[u8]) -> PayloadStream {
181 tokio_stream::once(Ok(contents.to_vec().into())).boxed()
182}
183
184#[cfg(test)]
186pub(crate) async fn read_to_vec<S>(mut stream: S) -> crate::error::Result<Vec<u8>>
187where
188 S: Stream<Item = io::Result<Bytes>> + Unpin,
189{
190 let mut payload = Vec::new();
191 while let Some(chunk) = stream.try_next().await? {
192 payload.extend(&chunk);
193 }
194 Ok(payload)
195}
196
197#[cfg(test)]
198mod tests {
199 use futures_util::stream;
200
201 use super::*;
202
203 #[test]
206 fn empty_into_bytes() {
207 let buf = ChunkedBytes::new(1024);
208 assert!(buf.is_empty());
209 assert_eq!(buf.len(), 0);
210 assert_eq!(buf.into_bytes().len(), 0);
211 }
212
213 #[test]
214 fn single_chunk_zero_copy() {
215 let original = Bytes::from_static(b"hello world");
216 let ptr = original.as_ptr();
217
218 let mut buf = ChunkedBytes::new(1024);
219 buf.push(original);
220
221 assert_eq!(buf.len(), 11);
222 assert!(!buf.is_empty());
223
224 let result = buf.into_bytes();
225 assert_eq!(result.as_ref(), b"hello world");
226 assert_eq!(result.as_ptr(), ptr);
228 }
229
230 #[test]
231 fn multi_chunk_coalesces() {
232 let mut buf = ChunkedBytes::new(1024);
233 buf.push(Bytes::from_static(b"hello "));
234 buf.push(Bytes::from_static(b"world"));
235
236 assert_eq!(buf.len(), 11);
237 assert_eq!(buf.into_bytes().as_ref(), b"hello world");
238 }
239
240 #[test]
241 fn push_empty_chunk_is_noop() {
242 let original = Bytes::from_static(b"data");
243 let ptr = original.as_ptr();
244
245 let mut buf = ChunkedBytes::new(1024);
246 buf.push(Bytes::new());
247 assert!(buf.is_empty());
248
249 buf.push(original);
250 buf.push(Bytes::new());
251 let result = buf.into_bytes();
253 assert_eq!(result.as_ptr(), ptr);
254 }
255
256 #[test]
257 fn capacity_is_preserved() {
258 let buf = ChunkedBytes::new(42);
259 assert_eq!(buf.capacity(), 42);
260 }
261
262 #[tokio::test]
265 async fn exhausted_when_stream_fits() {
266 let s = stream::once(std::future::ready(Ok(Bytes::from_static(b"small payload"))));
267
268 let peeked = SizedPeek::new(s, 1024).await.unwrap();
269
270 assert!(peeked.is_exhausted());
271 assert_eq!(peeked.len(), 13);
272 }
273
274 #[tokio::test]
275 async fn remaining_when_stream_exceeds_limit() {
276 let chunks: Vec<io::Result<Bytes>> = vec![
277 Ok(Bytes::from(vec![0u8; 600])),
278 Ok(Bytes::from(vec![1u8; 600])),
279 ];
280
281 let peeked = SizedPeek::new(stream::iter(chunks), 1000).await.unwrap();
282
283 assert!(!peeked.is_exhausted());
284 assert_eq!(peeked.len(), 600);
286 }
287
288 #[tokio::test]
289 async fn into_stream_reassembles_data() {
290 let chunks: Vec<io::Result<Bytes>> = vec![
292 Ok(Bytes::from_static(b"aaa")),
293 Ok(Bytes::from_static(b"bbb")),
294 Ok(Bytes::from_static(b"ccc")),
295 ];
296
297 let peeked = SizedPeek::new(stream::iter(chunks), 5).await.unwrap();
298 assert!(!peeked.is_exhausted());
299
300 let output = read_to_vec(peeked.into_stream()).await.unwrap();
301 assert_eq!(output, b"aaabbbccc");
302 }
303
304 #[tokio::test]
305 async fn into_stream_single_chunk_zero_copy() {
306 let original = Bytes::from_static(b"zero-copy roundtrip");
307 let ptr = original.as_ptr();
308 let s = stream::once(std::future::ready(Ok(original)));
309
310 let peeked = SizedPeek::new(s, 1024).await.unwrap();
311 let out = peeked.into_stream();
312 futures_util::pin_mut!(out);
313 let first = out.try_next().await.unwrap().unwrap();
314 assert_eq!(first.as_ptr(), ptr);
315 }
316
317 #[tokio::test]
318 async fn oversized_single_chunk_goes_to_pending() {
319 let big = Bytes::from(vec![0u8; 2000]);
321 let ptr = big.as_ptr();
322 let s = stream::once(std::future::ready(Ok(big)));
323
324 let peeked = SizedPeek::new(s, 1000).await.unwrap();
325 assert!(!peeked.is_exhausted());
326 assert_eq!(peeked.len(), 0); let out = peeked.into_stream();
329 futures_util::pin_mut!(out);
330 let first = out.try_next().await.unwrap().unwrap();
331 assert_eq!(first.as_ptr(), ptr);
333 assert_eq!(first.len(), 2000);
334 }
335
336 #[tokio::test]
337 async fn error_propagation() {
338 let chunks: Vec<io::Result<Bytes>> = vec![
339 Ok(Bytes::from_static(b"ok")),
340 Err(io::Error::new(io::ErrorKind::BrokenPipe, "broken")),
341 ];
342
343 let result = SizedPeek::new(stream::iter(chunks), 1024).await;
344 assert!(result.is_err());
345 }
346}