objectstore_client/
get.rs

1use std::{fmt, io};
2
3use async_compression::tokio::bufread::ZstdDecoder;
4use bytes::BytesMut;
5use futures_util::{StreamExt, TryStreamExt};
6use objectstore_types::metadata::{Compression, Metadata};
7use reqwest::StatusCode;
8use tokio_util::io::{ReaderStream, StreamReader};
9
10use crate::{ClientStream, ObjectKey, Session};
11
12/// The result from a successful [`get()`](Session::get) call.
13///
14/// This carries the response as a stream, plus the compression algorithm of the data.
15pub struct GetResponse {
16    /// The metadata attached to this object, including the compression algorithm used for the payload.
17    pub metadata: Metadata,
18    /// The response stream.
19    pub stream: ClientStream,
20}
21
22impl GetResponse {
23    /// Loads the object payload fully into memory.
24    pub async fn payload(self) -> crate::Result<bytes::Bytes> {
25        let bytes: BytesMut = self.stream.try_collect().await?;
26        Ok(bytes.freeze())
27    }
28
29    /// Loads the object payload fully into memory and interprets it as UTF-8 text.
30    pub async fn text(self) -> crate::Result<String> {
31        let bytes = self.payload().await?;
32        Ok(String::from_utf8(bytes.to_vec())?)
33    }
34}
35
36impl fmt::Debug for GetResponse {
37    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
38        f.debug_struct("GetResponse")
39            .field("metadata", &self.metadata)
40            .field("stream", &format_args!("[Stream]"))
41            .finish()
42    }
43}
44
45impl Session {
46    /// Retrieves the object with the given `key`.
47    pub fn get(&self, key: &str) -> GetBuilder {
48        GetBuilder {
49            session: self.clone(),
50            key: key.to_owned(),
51            decompress: true,
52            accept_encoding: vec![],
53        }
54    }
55}
56
57/// A [`get`](Session::get) request builder.
58#[derive(Debug)]
59pub struct GetBuilder {
60    pub(crate) session: Session,
61    pub(crate) key: ObjectKey,
62    pub(crate) decompress: bool,
63    pub(crate) accept_encoding: Vec<Compression>,
64}
65
66impl GetBuilder {
67    /// Indicates whether the request should automatically handle decompression of known algorithms,
68    /// or rather return the payload as it is stored, along with the compression algorithm it is stored in.
69    ///
70    /// By default, automatic decompression is enabled.
71    pub fn decompress(mut self, decompress: bool) -> Self {
72        self.decompress = decompress;
73        self
74    }
75
76    /// Specifies compression encodings the caller can handle natively.
77    ///
78    /// When the stored object's compression matches one of these, the payload
79    /// is returned still compressed and `metadata.compression` is preserved.
80    /// An empty list (the default) means the client does not accept any
81    /// compressed encoding, so automatic decompression applies as usual.
82    pub fn accept_encoding(mut self, encodings: impl IntoIterator<Item = Compression>) -> Self {
83        self.accept_encoding = encodings.into_iter().collect();
84        self
85    }
86
87    /// Sends the get request.
88    pub async fn send(self) -> crate::Result<Option<GetResponse>> {
89        let response = self
90            .session
91            .request(reqwest::Method::GET, &self.key)?
92            .send()
93            .await?;
94        if response.status() == StatusCode::NOT_FOUND {
95            return Ok(None);
96        }
97        let response = response.error_for_status()?;
98
99        let mut metadata = Metadata::from_headers(response.headers(), "")?;
100
101        let stream = response.bytes_stream().map_err(io::Error::other).boxed();
102        let stream = maybe_decompress(
103            stream,
104            &mut metadata,
105            self.decompress,
106            &self.accept_encoding,
107        );
108
109        Ok(Some(GetResponse { metadata, stream }))
110    }
111}
112
113/// Wraps a stream in a zstd decompression layer.
114///
115/// Decompresses if the metadata indicates zstd compression, `decompress` is `true`,
116/// and the stored encoding is not listed in `accept_encoding`. When the stored encoding
117/// is in `accept_encoding`, the payload is returned compressed and `metadata.compression`
118/// is preserved. Clears `metadata.compression` when decompression is applied.
119pub(crate) fn maybe_decompress(
120    stream: ClientStream,
121    metadata: &mut Metadata,
122    decompress: bool,
123    accept_encoding: &[Compression],
124) -> ClientStream {
125    let encoding_accepted = metadata
126        .compression
127        .is_some_and(|c| accept_encoding.contains(&c));
128    match (metadata.compression, decompress && !encoding_accepted) {
129        (Some(Compression::Zstd), true) => {
130            metadata.compression = None;
131            ReaderStream::new(ZstdDecoder::new(StreamReader::new(stream))).boxed()
132        }
133        _ => stream,
134    }
135}
136
137#[cfg(test)]
138mod tests {
139    use futures_util::{StreamExt as _, TryStreamExt as _};
140    use objectstore_types::metadata::{Compression, Metadata};
141
142    use super::maybe_decompress;
143    use crate::ClientStream;
144
145    fn compressed_zstd_stream(data: &[u8]) -> ClientStream {
146        let mut encoder = zstd::Encoder::new(vec![], 0).unwrap();
147        std::io::copy(&mut std::io::Cursor::new(data), &mut encoder).unwrap();
148        let compressed = encoder.finish().unwrap();
149        futures_util::stream::once(async move {
150            Ok::<_, std::io::Error>(bytes::Bytes::from(compressed))
151        })
152        .boxed()
153    }
154
155    fn raw_stream(data: &[u8]) -> ClientStream {
156        let bytes = bytes::Bytes::copy_from_slice(data);
157        futures_util::stream::once(async move { Ok::<_, std::io::Error>(bytes) }).boxed()
158    }
159
160    async fn collect(stream: ClientStream) -> Vec<u8> {
161        let chunks: bytes::BytesMut = stream.try_collect().await.unwrap();
162        chunks.to_vec()
163    }
164
165    fn zstd_metadata() -> Metadata {
166        Metadata {
167            compression: Some(Compression::Zstd),
168            ..Default::default()
169        }
170    }
171
172    fn no_compression_metadata() -> Metadata {
173        Metadata::default()
174    }
175
176    #[tokio::test]
177    async fn empty_accept_decompress_true_decompresses() {
178        let payload = b"hello world";
179        let stream = compressed_zstd_stream(payload);
180        let mut metadata = zstd_metadata();
181
182        let out = maybe_decompress(stream, &mut metadata, true, &[]);
183        assert_eq!(collect(out).await, payload);
184        assert_eq!(metadata.compression, None);
185    }
186
187    #[tokio::test]
188    async fn empty_accept_decompress_false_returns_compressed() {
189        let payload = b"hello world";
190        let compressed_bytes = collect(compressed_zstd_stream(payload)).await;
191        let stream = compressed_zstd_stream(payload);
192
193        let mut metadata = zstd_metadata();
194        let out = maybe_decompress(stream, &mut metadata, false, &[]);
195        assert_eq!(collect(out).await, compressed_bytes);
196        assert_eq!(metadata.compression, Some(Compression::Zstd));
197    }
198
199    #[tokio::test]
200    async fn zstd_accept_decompress_true_skips_decompression() {
201        let payload = b"hello world";
202        let compressed_bytes = collect(compressed_zstd_stream(payload)).await;
203        let stream = compressed_zstd_stream(payload);
204
205        let mut metadata = zstd_metadata();
206        let out = maybe_decompress(stream, &mut metadata, true, &[Compression::Zstd]);
207        assert_eq!(collect(out).await, compressed_bytes);
208        assert_eq!(metadata.compression, Some(Compression::Zstd));
209    }
210
211    #[tokio::test]
212    async fn zstd_accept_decompress_false_returns_compressed() {
213        let payload = b"hello world";
214        let compressed_bytes = collect(compressed_zstd_stream(payload)).await;
215        let stream = compressed_zstd_stream(payload);
216
217        let mut metadata = zstd_metadata();
218        let out = maybe_decompress(stream, &mut metadata, false, &[Compression::Zstd]);
219        assert_eq!(collect(out).await, compressed_bytes);
220        assert_eq!(metadata.compression, Some(Compression::Zstd));
221    }
222
223    #[tokio::test]
224    async fn no_compression_returns_raw_regardless_of_accept() {
225        let payload = b"hello world";
226        let stream = raw_stream(payload);
227
228        let mut metadata = no_compression_metadata();
229        let out = maybe_decompress(stream, &mut metadata, true, &[Compression::Zstd]);
230        assert_eq!(collect(out).await, payload);
231        assert_eq!(metadata.compression, None);
232    }
233}