1use std::error::Error;
13use std::fmt;
14use std::io;
15use std::sync::Arc;
16
17use bytes::{Bytes, BytesMut};
18use futures_util::stream::BoxStream;
19use futures_util::{Stream, StreamExt, TryStreamExt};
20
21pub type PayloadStream = BoxStream<'static, io::Result<Bytes>>;
25
/// Type-erased error wrapper that is cheap to clone.
///
/// The wrapped error is stored behind an [`Arc`], so cloning only bumps a reference count.
/// Formatting via [`fmt::Display`] and the [`Error::source`] chain are both forwarded to the
/// wrapped error.
#[derive(Clone, Debug)]
pub struct ClientError(Arc<dyn Error + Send + Sync + 'static>);

impl ClientError {
    /// Wraps `err` in a new `ClientError`.
    pub fn new<E>(err: E) -> Self
    where
        E: Error + Send + Sync + 'static,
    {
        let shared: Arc<dyn Error + Send + Sync + 'static> = Arc::new(err);
        ClientError(shared)
    }
}

impl fmt::Display for ClientError {
    /// Formats the wrapped error using its own `Display` implementation.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Passing the formatter straight through keeps the caller's width/fill/precision flags.
        fmt::Display::fmt(&self.0, f)
    }
}

impl Error for ClientError {
    /// Returns the source of the wrapped error (not the wrapped error itself).
    fn source(&self) -> Option<&(dyn Error + 'static)> {
        self.0.as_ref().source()
    }
}

impl From<ClientError> for io::Error {
    /// Stores the `ClientError` as the payload of an [`io::ErrorKind::Other`] error.
    fn from(err: ClientError) -> Self {
        io::Error::new(io::ErrorKind::Other, err)
    }
}
63
/// Boxed, `'static` stream of [`Bytes`] chunks whose items fail with [`ClientError`].
pub type ClientStream = BoxStream<'static, Result<Bytes, ClientError>>;
74
75pub fn unpack_client_error<E>(err: &E) -> Option<ClientError>
86where
87 E: Error + 'static,
88{
89 let mut source = Some(err as &(dyn Error + 'static));
90
91 while let Some(s) = source {
92 let target = match s.downcast_ref::<io::Error>().and_then(|e| e.get_ref()) {
95 Some(inner) => inner,
96 None => s,
97 };
98
99 if let Some(client_error) = target.downcast_ref::<ClientError>() {
100 return Some(client_error.clone());
101 }
102
103 source = s.source();
104 }
105 None
106}
107
108#[derive(Debug, Default)]
109enum ChunkedBytesState {
110 #[default]
111 Empty,
112 Single(Bytes),
113 Multi(BytesMut),
114}
115
116#[derive(Debug)]
123pub(crate) struct ChunkedBytes {
124 state: ChunkedBytesState,
125 capacity: usize,
126}
127
128impl ChunkedBytes {
129 pub fn new(capacity: usize) -> Self {
134 Self {
135 state: ChunkedBytesState::Empty,
136 capacity,
137 }
138 }
139
140 pub fn push(&mut self, chunk: Bytes) {
145 if chunk.is_empty() {
146 return;
147 }
148 self.state = match std::mem::take(&mut self.state) {
149 ChunkedBytesState::Empty => ChunkedBytesState::Single(chunk),
150 ChunkedBytesState::Single(first) => {
151 let capacity = self.capacity.max(first.len() + chunk.len());
152 let mut buf = BytesMut::with_capacity(capacity);
153 buf.extend_from_slice(&first);
154 buf.extend_from_slice(&chunk);
155 ChunkedBytesState::Multi(buf)
156 }
157 ChunkedBytesState::Multi(mut buf) => {
158 buf.extend_from_slice(&chunk);
159 ChunkedBytesState::Multi(buf)
160 }
161 };
162 }
163
164 pub fn capacity(&self) -> usize {
166 self.capacity
167 }
168
169 pub fn len(&self) -> usize {
171 match &self.state {
172 ChunkedBytesState::Empty => 0,
173 ChunkedBytesState::Single(b) => b.len(),
174 ChunkedBytesState::Multi(b) => b.len(),
175 }
176 }
177
178 #[cfg(test)]
180 pub fn is_empty(&self) -> bool {
181 self.len() == 0
182 }
183
184 pub fn into_bytes(self) -> Bytes {
189 match self.state {
190 ChunkedBytesState::Empty => Bytes::new(),
191 ChunkedBytesState::Single(b) => b,
192 ChunkedBytesState::Multi(b) => b.freeze(),
193 }
194 }
195}
196
197pub(crate) struct SizedPeek<S> {
207 buffer: ChunkedBytes,
208 pending: Option<Bytes>,
210 stream: Option<S>,
212}
213
214impl<S> SizedPeek<S> {
215 pub fn is_exhausted(&self) -> bool {
217 self.pending.is_none()
218 }
219
220 #[cfg(test)]
222 pub fn len(&self) -> usize {
223 self.buffer.len()
224 }
225}
226
227impl<S, E> SizedPeek<S>
228where
229 S: Stream<Item = Result<Bytes, E>> + Unpin,
230{
231 pub async fn new(mut stream: S, limit: usize) -> Result<Self, E> {
237 let mut buffer = ChunkedBytes::new(limit);
238
239 while let Some(chunk) = stream.try_next().await? {
240 if buffer.len() + chunk.len() > buffer.capacity() {
241 return Ok(Self {
242 buffer,
243 pending: Some(chunk),
244 stream: Some(stream),
245 });
246 }
247 buffer.push(chunk);
248 }
249
250 Ok(Self {
251 buffer,
252 pending: None,
253 stream: None,
254 })
255 }
256
257 pub async fn into_bytes(mut self) -> Result<Bytes, E> {
262 if let Some(pending) = self.pending.take() {
263 self.buffer.push(pending);
264 }
265
266 if let Some(mut stream) = self.stream.take() {
267 while let Some(chunk) = stream.try_next().await? {
268 self.buffer.push(chunk);
269 }
270 }
271
272 Ok(self.buffer.into_bytes())
273 }
274}
275
276impl<S, E> SizedPeek<S>
277where
278 S: Stream<Item = Result<Bytes, E>>,
279{
280 pub fn into_stream(self) -> impl Stream<Item = Result<Bytes, E>> {
283 let leading = [self.buffer.into_bytes(), self.pending.unwrap_or_default()]
284 .into_iter()
285 .filter(|b| !b.is_empty())
286 .map(Ok);
287
288 let tail = futures_util::stream::iter(self.stream).flatten();
289 futures_util::stream::iter(leading).chain(tail)
290 }
291}
292
293pub fn single<E: Send + 'static>(
295 contents: impl Into<Bytes>,
296) -> BoxStream<'static, Result<Bytes, E>> {
297 futures_util::stream::once(std::future::ready(Ok(contents.into()))).boxed()
298}
299
/// Test helper: drains `stream` and concatenates every chunk into one `Vec<u8>`.
///
/// Stops at the first error and returns it converted into [`crate::error::Error`].
#[cfg(test)]
pub(crate) async fn read_to_vec<S, E>(mut stream: S) -> crate::error::Result<Vec<u8>>
where
    S: Stream<Item = Result<Bytes, E>> + Unpin,
    E: Into<crate::error::Error>,
{
    let mut collected: Vec<u8> = Vec::new();

    loop {
        let Some(item) = stream.next().await else {
            break;
        };
        let chunk = item.map_err(Into::into)?;
        collected.extend_from_slice(&chunk);
    }

    Ok(collected)
}
314
#[cfg(test)]
mod tests {
    use futures_util::stream;

    use super::*;

    /// Builds a stream that yields `chunk` once, successfully, and then ends.
    fn one_chunk(chunk: Bytes) -> impl Stream<Item = io::Result<Bytes>> + Unpin {
        stream::once(std::future::ready(Ok::<_, io::Error>(chunk)))
    }

    // An accumulator that never received data reports zero length and yields empty bytes.
    #[test]
    fn empty_into_bytes() {
        let acc = ChunkedBytes::new(1024);
        assert!(acc.is_empty());
        assert_eq!(acc.len(), 0);
        assert_eq!(acc.into_bytes().len(), 0);
    }

    // A single pushed chunk comes back with the same backing pointer, i.e. without a copy.
    #[test]
    fn single_chunk_zero_copy() {
        let source = Bytes::from_static(b"hello world");
        let source_ptr = source.as_ptr();

        let mut acc = ChunkedBytes::new(1024);
        acc.push(source);

        assert!(!acc.is_empty());
        assert_eq!(acc.len(), 11);

        let merged = acc.into_bytes();
        assert_eq!(merged.as_ptr(), source_ptr);
        assert_eq!(merged.as_ref(), b"hello world");
    }

    // Two chunks are joined into one contiguous value.
    #[test]
    fn multi_chunk_coalesces() {
        let mut acc = ChunkedBytes::new(1024);
        acc.push(Bytes::from_static(b"hello "));
        acc.push(Bytes::from_static(b"world"));

        assert_eq!(acc.len(), 11);

        let merged = acc.into_bytes();
        assert_eq!(merged.as_ref(), b"hello world");
    }

    // Empty chunks neither change the length nor force a copy of the real chunk.
    #[test]
    fn push_empty_chunk_is_noop() {
        let source = Bytes::from_static(b"data");
        let source_ptr = source.as_ptr();

        let mut acc = ChunkedBytes::new(1024);
        acc.push(Bytes::new());
        assert!(acc.is_empty());

        acc.push(source);
        acc.push(Bytes::new());

        let merged = acc.into_bytes();
        assert_eq!(merged.as_ptr(), source_ptr);
    }

    #[test]
    fn capacity_is_preserved() {
        assert_eq!(ChunkedBytes::new(42).capacity(), 42);
    }

    // A stream that fits within the limit is fully buffered.
    #[tokio::test]
    async fn exhausted_when_stream_fits() {
        let input = one_chunk(Bytes::from_static(b"small payload"));

        let peeked = SizedPeek::new(input, 1024).await.unwrap();

        assert_eq!(peeked.len(), 13);
        assert!(peeked.is_exhausted());
    }

    // Only the first chunk fits; the second one is held back.
    #[tokio::test]
    async fn remaining_when_stream_exceeds_limit() {
        let first: io::Result<Bytes> = Ok(Bytes::from(vec![0u8; 600]));
        let second: io::Result<Bytes> = Ok(Bytes::from(vec![1u8; 600]));
        let input = stream::iter(vec![first, second]);

        let peeked = SizedPeek::new(input, 1000).await.unwrap();

        assert_eq!(peeked.len(), 600);
        assert!(!peeked.is_exhausted());
    }

    // Buffered data, pending chunk and remaining stream come back in their original order.
    #[tokio::test]
    async fn into_stream_reassembles_data() {
        let input: Vec<io::Result<Bytes>> = vec![
            Ok(Bytes::from_static(b"aaa")),
            Ok(Bytes::from_static(b"bbb")),
            Ok(Bytes::from_static(b"ccc")),
        ];

        let peeked = SizedPeek::new(stream::iter(input), 5).await.unwrap();
        assert!(!peeked.is_exhausted());

        let roundtrip = read_to_vec(peeked.into_stream()).await.unwrap();
        assert_eq!(roundtrip, b"aaabbbccc");
    }

    // A lone chunk that fits is streamed back out with the same backing pointer.
    #[tokio::test]
    async fn into_stream_single_chunk_zero_copy() {
        let source = Bytes::from_static(b"zero-copy roundtrip");
        let source_ptr = source.as_ptr();

        let peeked = SizedPeek::new(one_chunk(source), 1024).await.unwrap();

        let mut out = std::pin::pin!(peeked.into_stream());
        let first = out.try_next().await.unwrap().unwrap();
        assert_eq!(first.as_ptr(), source_ptr);
    }

    // A first chunk larger than the limit is never copied into the buffer.
    #[tokio::test]
    async fn oversized_single_chunk_goes_to_pending() {
        let big = Bytes::from(vec![0u8; 2000]);
        let big_ptr = big.as_ptr();

        let peeked = SizedPeek::new(one_chunk(big), 1000).await.unwrap();
        assert!(!peeked.is_exhausted());
        assert_eq!(peeked.len(), 0);

        let mut out = std::pin::pin!(peeked.into_stream());
        let first = out.try_next().await.unwrap().unwrap();
        assert_eq!(first.len(), 2000);
        assert_eq!(first.as_ptr(), big_ptr);
    }

    // An error yielded while peeking is returned from `SizedPeek::new`.
    #[tokio::test]
    async fn error_propagation() {
        let input: Vec<io::Result<Bytes>> = vec![
            Ok(Bytes::from_static(b"ok")),
            Err(io::Error::new(io::ErrorKind::BrokenPipe, "broken")),
        ];

        let outcome = SizedPeek::new(stream::iter(input), 1024).await;
        assert!(outcome.is_err());
    }
}